跳到主要内容

进程与线程

信息
2024年8月10日 · ·
  • 线程,最小的执行单元;
  • 进程,最小的资源分配单元,至少由一个线程组成;

1. 多进程

Unix/Linux 操作系统的 fork() 可以把当前进程(父进程)复制一份(子进程),然后分别在父子进程内返回,其中子进程返回的是 0,父进程返回的是子进程 ID,子进程通过 getppid() 获取父进程 ID;

os 模块封装的系统调用包含 fork

import os

print(f'Process {os.getpid()} start...')
pid = os.fork()
if pid = 0:
print(f'child process {os.getpid()}, parent is {os.getppid()}')
else:
print(f'process {os.getpid()} created a child process {pid}')
Process 1798 start...
child process 1799, parent is 1798
process 1798 created a child process 1799

multiprocessing

跨平台的多进程模板;

from multiprocessing import Process

import os
import time


def proc(name):
# time.sleep(5)
print(f'run child process {name} ({os.getpid()})')


if __name__ == "__main__":
print(f'parent process {os.getpid()}')
# 创建 Process 实例
p = Process(target=proc, args=('test', ))
print('child process will start.')
# 启动 p 子进程
p.start()
# 阻塞,等待 p 子进程结束后才继续往下执行
p.join()
print('child process end.')
parent process 4872
child process will start.
run child process test (14360)
child process end.

子进程结束后主进程才会退出;

Pool

进程池,用于批量启动子进程;

from multiprocessing import Pool
import os, time, random


def proc(name):
print(f'run task {name} ({os.getpid()})')
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print(f'task {name} runs: {end-start}')


if __name__ == "__main__":
print(f'parent process {os.getpid()}')
# 正在运行的子进程个数
p = Pool(4)
for i in range(5):
p.apply_async(proc, args=(i, ))
print('waiting for all subprocesses done...')
# close 后不能继续添加新的 process
p.close()
# 等待所有子进程执行完毕,必须在 close 后调用
p.join()
print('all subprocess done.')
parent process 14360
waiting for all subprocesses done...
run task 0 (10800)
run task 1 (7996)
run task 2 (4084)
run task 3 (3492)
task 0 runs: 1.7698190212249756
run task 4 (10800)
task 1 runs: 1.9010038375854492
task 2 runs: 2.1044681072235107
task 3 runs: 2.431309700012207
task 4 runs: 2.9220056533813477
all subprocess done.

subprocess

创建外部进程,并控制其输入输出,如调用命令nslookup,并与之交互;

# 调用 snlookup 子进程
p = subprocess.call(['nslookup', 'www.python.org'])

# 启动 nslookup 子进程,并设置其输入输出流
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# 向 p 子进程输入命令,并接收输出
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
# 打印输出
print(output.decode('gbk'))

进程间通行

Python 的multiprocessing模块提供了QueuePipes等交换数据的方式;

from multiprocessing import Process, Queue
import os, time, random


def write(q):
print(f'process to write: {os.getpid()}')
for value in ['A', 'B', 'C']:
print(f'put {value} to queue...')
q.put(value)
time.sleep(random.random())


def read(q):
print(f'process to read: {os.getpid()}')
while True:
value = q.get(True)
print(f'get {value} from queue.')


if __name__ == "__main__":
q = Queue()
pw = Process(target=write, args=(q, ))
pr = Process(target=read, args=(q, ))
pw.start()
pr.start()
pw.join()
# 写完即可关闭读进程
pr.terminate()
process to write: 9472
put A to queue...
process to read: 10084
get A from queue.
put B to queue...
get B from queue.
put C to queue...
get C from queue.

2. 多线程

threading

Python 标准库提供了两个线程模块:_thread(低级模块)、threading(高级模块,_thread的封装);

import time, threading


def loop():
print(f'thread {threading.current_thread().name} is running...')
n = 0
while n < 5:
n = n + 1
print(f'thread {threading.current_thread().name} >>> {n}')
time.sleep(1)
print(f'thread {threading.current_thread().name} ended.')


print(f'thread {threading.current_thread().name} is running...')
# 传入一个函数以创建 Thread 实例
t = threading.Thread(target=loop, name='LoopThread')
# 启动子线程
t.start()
# 阻塞等待子线程执行结束
t.join()
print(f'thread {threading.current_thread().name} ended.')
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.

Lock

多进程中,同一个变量会被拷贝到每个进程中,互不影响;

多线程中,所有变量由所有线程共享;

高级语言的一条语句在 CPU 执行时可能是若干条语句,当多个线程同时操作同一个变量时,变量在缓存状态下可能已被修改,此时执行结果无法满足"一致性"(类比事务一致性);

import threading
balance = 0
lock = threading.Lock()

def run_thread(n):
for i in range(10000):
# 获取锁
lock.acquire()
try:
change_it(n)
finally:
# 释放锁
lock.release()
  • ,可以避免上述问题,同一时刻最多只有一个线程持有同一个锁,只有成功获得锁,才能继续执行代码,其他尝试获取该锁的线程会被阻塞,知道获得锁的线程释放该锁;

坏处

阻止多线程的并发执行;

死锁

存在多个锁,多个线程试图获取对方持有的锁,会导致多个线程同时挂起,只能靠操作系统强制终止;

GIL

Global Interpreter Lock 全局解释器锁,任何 Python 线程在执行前必须先获得GIL,每执行 100 条字节码自动释放GIL,这导致多线程在同一时间实际永远不能利用多核 CPU,只能交替执行;

Python 中多线程并发不能利用多核 CPU;

3. ThreadLocal

多线程环境下,全局变量必须加锁,相较使用局部变量会更好;

局部变量需要在函数调用建以参数传递,十分麻烦,此时可引入ThreadLocal变量;

  • ThreadLocal,变量虽是全局变量,但每个线程只能读取自己线程独立副本,互不干扰;
import threading

# 创建全局ThreadLocal对象:
local_school = threading.local()


def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))


def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()


t1 = threading.Thread(target=process_thread, args=('Alice', ), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob', ), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

4. 进程 vs. 线程

多进程

  • 优点:稳定性好,子进程挂掉不影响其他;
  • 缺点:创建,切换等操作开销巨大;

子线程

  • 优点:开销稍小;
  • 缺点:所有线程共享其所在进程的内存,一个线程挂掉,整个进程崩溃;

线程切换

多线程的并行实际是单核快速在多线程间切换的效果,当线程多到一定程度,线程切换会消耗系统大量资源,使效率急剧下降;

计算密集型 vs. IO 密集型

计算密集型

消耗都在 CPU,最高效率利用 CPU,同时进行的任务数应等于 CPU 核数;

Python 的运行效率较低,不适合计算型程序,最好使用 C 语言;

IO 密集型

消耗在等待 IO 上,CPU 消耗少,任务越多,CPU 效率越高;

Python 与 C 语言在 IO 密集型程序上的运行效率表现相差不大,Python 开发效率往往更高;

异步 IO

操作系统提供的异步 IO 支持,事件驱动模型;

Python 中单线程的异步编程模型称为协程

5. 分布式进程

Process 可以分布到多台机器上(分布式),Thread 最多只能分布在同一机器的多个 CPU;

Python 的multiprocessing模块的子模块managers对分布式做了支持;

# task_master.py

import random, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()


def get_task_queue():
return task_queue


def get_result_queue():
return result_queue


# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
pass


if __name__ == "__main__":

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=get_task_queue)
QueueManager.register('get_result_queue', callable=get_result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=100)
print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')
import time, queue
from multiprocessing.managers import BaseManager


# 创建类似的QueueManager:
class QueueManager(BaseManager):
pass


if __name__ == "__main__":

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
manager = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
manager.connect()
# 获取Queue的对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n * n)
time.sleep(1)
result.put(r)
except queue.Queue.Empty:
print('task queue is empty.')
# 处理结束:
print('worker exit.')

通过managers模块把Queue暴露到网络以便其他机器的进程访问,worker中并没有保存QueueQueue存储在master中;

Queue是用来传递任务和接收结果的,任务的描述数据要尽量小,如果要传输体量较大的数据,可通过共享磁盘等,让worker自行读取;


PS:感谢每一位志同道合者的阅读,欢迎关注、评论、赞!