python - 多线程

巴拉巴拉

最近在搞爬虫项目,架构和程序设计都是由我来决定和设计,所以发挥空间还是很自由的。程序语言选择的是python,在语言选型上也考虑过golang,golang的效率和速度肯定是要比python好很多的。但是在爬虫领域,python的易用性,超多的第三方扩展库,而且python支持协程后,效率速度方面上的表现,也是很不错的。那么在python做爬虫的过程中,多线程,多进程,协程,异步等肯定都是逃不过的。所以抽出一些时间,写一写线程,进程,协程,异步等方法在爬虫中的表现。

多进程和多线程

我们常见的 Linux、Windows、Mac OS 操作系统,都是支持多进程的多核操作系统。所谓多进程,就是系统可以同时运行多个任务。例如我们的电脑上运行着 QQ、浏览器、音乐播放器、影音播放器等。在操作系统中,每个任务就是一个进程。每个进程至少做一件事,多数进程会做很多事,例如影音播放器,要播放画面,同时要播放声音,在一个进程中,就有很多线程,每个线程做一件事,在一个进程中有多个线程运行就是多线程。可以在实验环境终端执行 ps -ef 命令来查看当前系统中正在运行的进程。


计算机的两大核心为运算器和存储器。常说的手机配置四核、八核,指的就是 CPU 的数量,它决定了手机的运算能力;128G、256G 超大存储空间,指的就是手机存储数据的能力。当我们运行一个程序来计算 3 + 5,计算机操作系统会启动一个进程,并要求运算器派过来一个 CPU 来完成任务;当我们运行一个程序来打开文件,操作系统会启动存储器的功能将硬盘中的文件数据导入到内存中。


一个 CPU 在某一时刻只能做一项任务,即在一个进程(或线程)中工作,当它闲置时,会被系统派到其它进程中。单核计算机也可以实现多进程,原理是第 1 秒的时间段内运行 A 进程,其它进程等待:第 2 秒的时间段内运行 B 进程,其它进程等待。。。第 5 秒的时间段内又运行 A 进程,往复循环。当然实际上 CPU 在各个进程间的切换是极快的,在毫秒(千分之一)、微秒(百万分之一)级,以至于我们看起来这些程序就像在同时运行。现代的计算机都是多核配置,四核八核等,但计算机启动的瞬间,往往就有几十上百个进程在运行了,所以进程切换是一定会发生的,CPU 在忙不迭停地到处赶场。注意,什么时候进行进程 、线程切换是由操作系统决定的,无法人为干预。

线程安全

我们都知道在 MySQL 中有 “原子操作” 的概念,打个比方:韩梅向李红转账 100 块钱,在 MySQL 中需要两步操作:韩梅账户减少 100 元,李红账户增加 100 元。如果第一步操作完成后,意外情况导致第二步没有做,这是不允许发生的,如何保证其不允许发生呢?将两步操作设计成一个事务,事务里可以有多个步骤,其中任何一步出现问题,事务都将失败,前面的步骤全部回滚,就像什么事都没发生。这种操作就叫做原子操作,这种特性就叫做原子性。
在 Python 多线程中,变量是共享的,这也是相较多进程的一个优点,线程占用资源要少得多,但也导致多个 CPU 同时操作多个线程时会引起结果无法预测的问题,也就是说 Python 的线程不安全。
在多线程与多进程的时候,因为一般情况下都是各自完成各自的任务,各个子线程或者各个子进程之前并没有太多的联系,如果需要通信的话我会使用队列或者数据库来完成,但是最近我在写一些多线程与多进程的代码时,发现如果它们需要用到共享变量的话,需要有一些注意的地方
接下来用实例讲解一下线程共享的问题

标准数据类型在线程间共享

下面的实例,在主线程中创建变量d,在5个子线程中引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# coding:utf-8
import threading


def test(name, data):
print("in thread {} name is {}".format(threading.current_thread(), name))
print("data is {} id(data) is {}".format(data, id(data)))


if __name__ == '__main__':
d = 5
name = "cpeixin"
for i in range(5):
th = threading.Thread(target=test, args=(name, d))
th.start()


下面的结果中显示,5个子线程中打印出变量d的id相同,表示引用的同一个变量,所以说明在主线程中创建了变量d,在子线程中是可以共享的,在子线程中对共享元素的改变是会影响到其它线程的,所以如果要对共享变量进行修改时,也就是线程不安全的,需要加锁。

1
2
3
4
5
6
7
8
9
10
11
/Users/cpeixin/venv/pythonCode/bin/python /Users/cpeixin/PycharmProjects/pythonCode/thread/variable_thread.py
in thread <Thread(Thread-1, started 123145519386624)> name is cpeixin
data is 5 id(data) is 4304846080
in thread <Thread(Thread-2, started 123145524641792)> name is cpeixin
data is 5 id(data) is 4304846080
in thread <Thread(Thread-3, started 123145519386624)> name is cpeixin
data is 5 id(data) is 4304846080
in thread <Thread(Thread-4, started 123145519386624)> name is cpeixin
data is 5 id(data) is 4304846080
in thread <Thread(Thread-5, started 123145524641792)> name is cpeixin
data is 5 id(data) is 4304846080

自定义类型对象在线程间共享

如果我们要自定义一个类呢,将一个对象作为变量在子线程中传递呢?会是什么效果呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# coding:utf-8
import threading


class Data:
def __init__(self, data=None):
self.data = data

def get(self):
return self.data

def set(self, data):
self.data = data


def test(name, data):
print("in thread {} name is {}".format(threading.current_thread(), name))
print("data is {} id(data) is {}".format(data.get(), id(data)))


if __name__ == '__main__':
d = Data(10)
name = "cpeixin"
print("in main thread id(data) is {}".format(id(d)))
for i in range(5):
th = threading.Thread(target=test, args=(name, d))
th.start()
1
2
3
4
5
6
7
8
9
10
11
in main thread id(data) is 4348194152
in thread <Thread(Thread-1, started 123145427701760)> name is cpeixin
data is 10 id(data) is 4348194152
in thread <Thread(Thread-2, started 123145427701760)> name is cpeixin
data is 10 id(data) is 4348194152
in thread <Thread(Thread-3, started 123145427701760)> name is cpeixin
data is 10 id(data) is 4348194152
in thread <Thread(Thread-4, started 123145427701760)> name is cpeixin
data is 10 id(data) is 4348194152
in thread <Thread(Thread-5, started 123145427701760)> name is cpeixin
data is 10 id(data) is 4348194152


我们看到,在主线程和子线程中,这个对象的id是一样的,说明它们用的是同一个对象。


无论是标准数据类型还是复杂的自定义数据类型,它们在多线程之间是共享同一个的


以上就是在多线程中,变量共享的码上说明

GIL 全局解释器锁

如何解决线程安全问题?CPython 解释器使用了加锁的方法。每个进程有一把锁,启动线程先加锁,结束线程释放锁。打个比方,进程是一个厂房,厂房大门是开着的,门内有锁,工人进入大门后可以在内部上锁。厂房里面有 10 个车间对应 10 个线程,每个 CPU 就是一个工人。GIL(Global Interpreter Lock)全局锁就相当于厂房规定:工人要到车间工作,从厂房大门进去后要在里面反锁,完成工作后开锁出门,下一个工人再进门上锁。也就是说,任意时刻厂房里只能有一个工人,但这样就保证了工作的安全性,这就是 GIL 的原理。当然了,GIL 的存在有很多其它益处,包括简化 CPython 解释器和大量扩展的实现。
根据上面的例子可以看出 GIL 实现了线程操作的安全性,但多线程的效率被大打折扣,一个工厂里只能有一个工人干活,很难想象。这也是 David Beazley(《Python 参考手册》和《Python Cookbook》的作者)说 “Python 线程毫无用处” 的原因。
注意,GIL 不是语言特性,而是解释器的设计特点,有些 Python 解释器例如 JPython 就没有 GIL ,除了 Python 其它语言也有 GIL 设计,例如 Ruby 。

线程锁

  • 为什么需要线程锁?

多个线程对同一个数据进行修改时, 可能会出现不可预料的情况.

例如实现银行转账功能,money += 1 这句其实有三个步骤 money; money+1; money=money+1;假如这三步骤还没完成money-=1的线程就开始执行了,后果可想而知,money的值肯定时乱的

  • 如何实现线程锁?
  1. 实例化一个锁对象;
    lock = threading.Lock()
  2. 操作变量之前进行加锁
    lock.acquire()
  3. 操作变量之后进行解锁
    lock.release()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import threading


# 银行存钱和取钱
def add(lock):
global money # 生命money为全局变量
for i in range(1000000):
# 2. 操作变量之前进行加锁
lock.acquire()
money += 1 # money; money+1; money=money+1;
# 3. 操作变量之后进行解锁
lock.release()


def reduce(lock):
global money
for i in range(1000000):
# 2. 操作变量之前进行加锁
lock.acquire()
money -= 1
# 3. 操作变量之后进行解锁
lock.release()


if __name__ == '__main__':
money = 0
# 1. 实例化一个锁对象;
lock = threading.Lock()

t1 = threading.Thread(target=add, args=(lock,))
t2 = threading.Thread(target=reduce, args=(lock,))
t1.start()
t2.start()
t1.join()
t2.join()

print("当前金额:", money)

多线程提高工作效率

实际情况并非上面讲得那么惨,Python 多线程可以成倍提高程序的运行速度,而且在多数情况下都是有效的。接着上面的例子说,一个工厂里同一时刻只能有一个工人在工作,如果这个工厂里各个车间的自动化程度极高且任务耦合度极低,工人进去只是按几下按钮,就可以等待机器完成其余工作,那情况就不一样了,这种场景下一个工人可以管理好多个车间,而且大多数时间都是等,甚至还能抽空打打羽毛球看场电影。
比如爬虫程序爬取页面数据这个场景中,CPU 做的事就是发起页面请求和处理响应数据,这两步是极快的,中间网络传输数据的过程是耗时且不占用 CPU 的。一个工人可以在吃完早饭后一分钟内快速到 1000 个车间按下发起请求的按钮,吃完午饭睡一觉,日薄西山时差不多收到网络传回的数据,又用一分钟处理数据,整个程序完成。
上面的场景中,CPU 再多也没有用处,一个 CPU 抽空就能完成整个任务了,毕竟程序中需要 CPU 做的事并不多。这就涉及复杂程序的分类:CPU 密集型和 IO 密集型。爬虫程序就是 IO 密集型程序。CPU 密集型程序全是手工操作,工人一刻也不能停歇,这种情况下 Python 多线程就真可以说是毫无用处了。
我们可以使用 time.sleep 方法模拟 IO 操作来写一段程序证明多线程可以提高程序的运行效率:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File Name: thread.py

import threading
import time
import requests


def crawl_url(): # 假设这是爬虫程序,爬取一个 URL
time.sleep(0.02) # 模拟 IO 操作
# res = requests.get("http://www.ip111.cn").status_code
# print(res)


def main1(): # 单线程程序
for i in range(100):
crawl_url()


def main2(): # 多线程程序
thread_list = []
for i in range(100):
t = threading.Thread(target=crawl_url)
t.start()
thread_list.append(t)
for t in thread_list:
t.join()


if __name__ == '__main__':
start = time.time()
main1()
end = time.time()
print('单线程耗时:{:.4f}s'.format(end - start))
start = time.time()
main2()
end = time.time()
print('多线程耗时:{:.4f}s'.format(end - start))
1
2
单线程耗时:2.4027s
多线程耗时:0.0323s


理论上,main1 的耗时是 main2 的 100 倍,考虑到 main2 创建多线程、线程切换的开销,这个结果也是相当可观的,IO 操作耗时越长,多线程的威力越大。

线程池

ThreadPool

在使用多线程处理任务时也不是线程越多越好,由于在切换线程的时候,需要切换上下文环境,依然会造成cpu的大量开销。为解决这个问题,线程池的概念被提出来了。预先创建好一个较为优化的数量的线程,让过来的任务立刻能够使用,就形成了线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# coding: utf-8
from concurrent.futures import ThreadPoolExecutor
import time


def spider(page):
time.sleep(page)
print(f"crawl task{page} finished")
return page

with ThreadPoolExecutor(max_workers=5) as t: # 创建一个最大容纳数量为5的线程池
task1 = t.submit(spider, 1)
task2 = t.submit(spider, 2) # 通过submit提交执行的函数到线程池中
task3 = t.submit(spider, 3)


使用 with 语句 ,通过 ThreadPoolExecutor 构造实例,同时传入 max_workers 参数来设置线程池中最多能同时运行的线程数目。


使用 submit 函数来提交线程需要执行的任务到线程池中,并返回该任务的句柄(类似于文件、画图),注意 submit() 不是阻塞的,而是立即返回。

multiprocessing.dummy.Pool

multiprocessing.dummy.Pool 是个什么东东?
看起来像一个进程池的样子啊~~
看了源码中的代码和注释

1
2
3
4
5
6
7
8
9
# Support for the API of the multiprocessing package using threads
#
# multiprocessing/dummy/__init__.py

class DummyProcess(threading.Thread)

def Pool(processes=None, initializer=None, initargs=()):
from ..pool import ThreadPool
return ThreadPool(processes, initializer, initargs)


这只是是以multiprocessing相同API实现的多线程模块。继承了Thread,内部是封装调用了ThreadPool,使用起来更加方便。

异步和同步,阻塞和非阻塞

上文的模拟爬虫示例代码中,main1 中的 for 循环运行 100 次爬取网页的操作,前一个完成后才能运行下一个,这就是同步的概念,在 crawl_url 函数内部的 IO 操作为阻塞操作,线程无法向下执行。


main2 中的第一个 for 循环,_创建 100 个线程并启动,这步操作是非阻塞的_,不会等一个线程运行完成才创建下一个线程,它会一气儿创建 100 个线程;第二个 for 循环将主线程挂起,直到全部子线程完成,此时的主线程就是阻塞的。这种程序运行方式叫做异步,CPU 在遇到 IO 阻塞时不会站在那儿傻等,而是被操作系统派往其它线程中看看有什么事可做。


所谓的异步,就是 CPU 在当前线程阻塞时可以去其它线程中工作,不管怎么设计,在一个线程内部代码都是顺序执行的,遇到 IO 都得阻塞,所谓的非阻塞,是遇到当前线程阻塞时,CPU 去其它线程工作。