python - 多进程

多线程: 同一进程中,创建多个线程,执行添加的任务列表
多进程: 创建任意个进程,各自执行添加的任务列表


假如cpu只有一个(早期的计算机确实如此),也能保证支持(伪)并发的能力,将一个单独的cpu变成多个虚拟的cpu(多道技术:时间多路复用和空间多路复用+硬件上支持隔离),没有进程的抽象,现代计算机将不复存在。


对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程。


有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)。


由于每个进程至少要干一件事,所以,一个进程至少有一个线程。当然,像Word这种复杂的进程可以有多个线程,多个线程可以同时执行,多线程的执行方式和多进程是一样的,也是由操作系统在多个线程之间快速切换,让每个线程都短暂地交替运行,看起来就像同时执行一样。当然,真正地同时执行多线程需要多核CPU才可能实现。


如果我们并没有在程序中加入进程,线程,协程等,程序都是执行单任务的进程,也就是只有一个线程。如果我们要同时执行多个任务怎么办?


有两种解决方案:
一种是启动多个进程,每个进程虽然只有一个线程,但多个进程可以一块执行多个任务。
还有一种方法是启动一个进程,在一个进程内启动多个线程,这样,多个线程也可以一块执行多个任务。
当然还有第三种方法,就是启动多个进程,每个进程再启动多个线程,这样同时执行的任务就更多了,当然这种模型更复杂,实际很少采用。


总结一下就是,多任务的实现有3种方式:
多进程模式;
多线程模式;
多进程+多线程模式。


同时执行多个任务通常各个任务之间并不是没有关联的,而是需要相互通信和协调,有时,任务1必须暂停等待任务2完成后才能继续执行,有时,任务3和任务4又不能同时执行,所以,多进程和多线程的程序的复杂度要远远高于我们前面写的单进程单线程的程序。


因为复杂度高,调试困难,所以,不是迫不得已,我们也不想编写多任务。但是,有很多时候,没有多任务还真不行。想想在电脑上看电影,就必须由一个线程播放视频,另一个线程播放音频,否则,单线程实现的话就只能先把视频播放完再播放音频,或者先把音频播放完再播放视频,这显然是不行的。

什么时候用多进程

1.多线程使用场景:IO密集型
2.多进程使用场景:CPU密集型


涉及并发的场景,大家想到使用多线程或多进程解决并发问题;
一般情况下,解决多并发场景问题,多数语言采用多线程编程模式(线程是轻量级的进程,共用一份进程空间)。
也同样适用于Python多并发处理吗? 不是的,针对并发处理,Python多线程和多进程是有很大差异的!


Python多线程和多进程差异
Python多线程不能使用CPU多核资源,即同一时刻,只有一个线程使用CPU资源,所以使用Python多线程不能算是并发。
如果想要充分利用CPU多核资源,做到多并发,这就需要Python多进程的了!
也就是说:只有Python多进程才能利用CPU多核资源,做到真正的多并发!

Python多线程和多进程应用场景

Python多线程适用于I/O密集型场景,如解决网络IO、磁盘IO阻塞问题,例如文件读写、网络数据传输等;
而Python多进程更适用于计算密集型场景,多并发,大量计算任务等。


注意:Python多线程和多进程在平时开发过程中,需要注意使用,如果使用Python多线程方式处理计算密集型任务,它比实际单进程处理性能还要慢!所以要注意,看场景类型。

多进程-fork()

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。


子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。


Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:

1
2
3
4
5
6
7
8
9
import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))


结果:

1
2
3
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.


由于Windows没有fork调用,上面的代码在Windows上无法运行。由于Mac系统是基于BSD(Unix的一种)内核,所以,在Mac下运行是没有问题的,推荐大家用Mac学Python!


有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。

多进程-multiprocessing

进程基础版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')


执行结果如下:

1
2
3
4
Parent process 928.
Process will start.
Run child process test (929)...
Process end.

进程池:

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')


代码解读:
对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。


请注意输出的结果,task 0,1,2,3是立刻执行的,而task 4要等待前面某个task完成后才执行,这是因为Pool的默认大小在我的电脑上是4,因此,最多同时执行4个进程。这是Pool有意设计的限制,并不是操作系统的限制。如果改成:
p = Pool(5)
就可以同时跑5个进程。


由于Pool的默认大小是CPU的核数,如果你不幸拥有8核CPU,你要提交至少9个子进程才能看到上面的等待效果。
进程池实例方法:
apply(func[, args[, kwds]]):同步进程池
apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池

Lock互斥锁

当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突
进程之间数据隔离,但是共享一套文件系统,因而可以通过文件来实现进程直接的通信,但问题是必须自己加锁处理。


注意:加锁的目的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,没错,速度是慢了,牺牲了速度而保证了数据安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import json
import time
import random
import os
from multiprocessing import Process,Lock

def chakan():
dic = json.load(open('piao',)) # 先查看票数,也就是打开那个文件
print('剩余票数:%s' % dic['count']) # 查看剩余的票数

def buy():
dic = json.load(open('piao',))
if dic['count']>0: #如果还有票
dic['count']-=1 #就修改里面的值-1
time.sleep(random.randint(1,3)) #执行里面买票的一系列操作就先不执行了,让睡一会代替(并且随机的睡)
json.dump(dic,open('piao','w'))
print('%s 购票成功' % os.getpid()) # 当前的那个id购票成功

def task(mutex): #抢票

# 第一种加锁:
# mutex.acquire() #加锁
# chakan() # 因为查看的时候大家都可以看到,不需要加锁
# buy() #买的时候必须一个一个的买,先等一个人买完了,后面的人在买
# mutex.release() #取消锁

# 第二种加锁:
#with表示自动打开自动释放锁
with mutex:
chakan() # 因为查看的时候大家都可以看到,不需要加锁
buy() #买的时候必须一个一个的买,先等一个人买完了,后面的人在买


if __name__ == '__main__':
mutex = Lock()
for i in range(50):#让50个人去访问那个票数
p = Process(target=task,args=(mutex,))
p.start()

进程池加锁

还是上面抢票的例子,这次使用了进程池,由于进程之间不共享内存,所以进程之间的通信不能像线程之间直接引用,使用进程池异步对共享变量进行操作,异步操作lock锁,会引起冲突,因而需要采取一些策略来完成进程之间的数据通信。


所以要引入进程Manager来完成进程间通信的方式
这里举两个例子:
大家自行创建 piao 文件,内容为 {“count”:320}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import json
import time
import random
import os
from multiprocessing import Process,Lock,Pool

def chakan():
dic = json.load(open('piao',)) # 先查看票数,也就是打开那个文件
print('剩余票数:%s' % dic['count']) # 查看剩余的票数


def buy():
dic = json.load(open('piao',))
if dic['count']>0: #如果还有票
dic['count']-=1 #就修改里面的值-1
time.sleep(random.randint(1,3)) #执行里面买票的一系列操作就先不执行了,让睡一会代替(并且随机的睡)
json.dump(dic,open('piao','w'))
print('%s 购票成功' % os.getpid()) # 当前的那个id购票成功


def task(mutex): #抢票
# mutex.acquire() #加锁
# chakan() # 因为查看的时候大家都可以看到,不需要加锁
# buy() #买的时候必须一个一个的买,先等一个人买完了,后面的人在买
# mutex.release() #取消锁

with mutex:
chakan() # 因为查看的时候大家都可以看到,不需要加锁
buy() #买的时候必须一个一个的买,先等一个人买完了,后面的人在买


if __name__ == '__main__':
mutex = Lock()
pool = Pool(20)

"""进程池加锁
"""
from multiprocessing import Pool, Manager, Lock

manager = Manager()
mutex = manager.Lock()


for i in range(50):#让50个人去访问那个票数
pool.apply_async(task,args=(mutex, ))
pool.close()
pool.join()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from multiprocessing import Process,Manager
import os
# 这里实现的就是多个进程之间共享内存,并修改数据
# 这里不需要加锁,因为manager已经默认给你加锁了

def f(d,l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.append(os.getpid())
print(l)

if __name__ == '__main__':
with Manager() as manager:
d = manager.dict() #生成一个字典
l = manager.list(range(5)) #生成一个列表
p_list = []
for i in range(10):
p = Process(target=f,args=(d,l))
p.start()
p_list.append(p)
for res in p_list:
res.join()
print(d)
print(l)