0%

python多线程

由于python GIL(全局解释器锁,是python原生解释器的限制)的限制,python的多进程更适合于I/O密集型的应用,而不是计算密集型。为了实现更好的并发,应该使用多进程。
多线程编程能够降低程序的复杂性,每一个线程完成特定工作。

多线程适用于以下任务:

  • 本质上是异步的
  • 需要多个并发活动,每个活动的处理次序可能不确定的、随机的或不可预测的
  • 计算密集型任务
  • I/O密集型任务

使用线程

调用threading模块的Thread类即可初始化线程实例,使用target指定需要线程运行的函数,使用args传递参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import threading
import time


def sleep_sometime(x, nsec):
print('{0} going to sleep {1} seconds.'.format(x, nsec))
time.sleep(nsec)


nsec_list = [2, 4, 3]
threads = []
print('start at:{0}'.format(time.ctime()))

for x in range(len(nsec_list)):
t = threading.Thread(target=sleep_sometime, args=(x, nsec_list[x]))
threads.append(t)
t.start()

for t in threads:
t.join()

print('end at:{0}'.format(time.ctime()))

运行结果:

1
2
3
4
5
start at:Tue Aug 14 14:38:35 2018
0 going to sleep 2 seconds.
1 going to sleep 4 seconds.
2 going to sleep 3 seconds.
end at:Tue Aug 14 14:38:39 2018

上例中使用多线程运行3次sleep_sometime函数,最终花费的时间基本等同于花费最长时间的那次所消耗的时间。
t.start()开始执行线程;t.join()直至启动的线程终止之前一直挂起,除非给定timeout。

线程同步(同步原语),线程锁

多个线程访问同一个数据可能会因为次序问题导致不一致,线程锁可以保证同一时间只有一个线程能够执行获得锁与释放锁之间的代码。

多进程修改数据导致不一致实例。

本例参考了廖雪峰的官方网站

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import threading
import time

sum = 0


def change_it(n):
global sum
sum += n
sum -= n


def do_change(n):
for i in range(1000000):
change_it(n)


t1 = threading.Thread(target=do_change, args=(5, ))
t2 = threading.Thread(target=do_change, args=(8, ))
t1.start()
t2.start()
t1.join()
t2.join()
print('sum: {0}'.format(sum))

运行结果:(如果你的结果是0,请尝试继续加大range的值试试)

1
sum: -17

加锁实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import threading

sum = 0
lock = threading.Lock()


def change_it(n):
global sum
lock.acquire()
sum += n
sum -= n
lock.release()


def do_change(n):
for i in range(1000000):
change_it(n)


t1 = threading.Thread(target=do_change, args=(5, ))
t2 = threading.Thread(target=do_change, args=(8, ))
t1.start()
t2.start()
t1.join()
t2.join()
print('sum: {0}'.format(sum))

我们在修改sum的值前后添加了lock.acquire()lock.release(),此时修改sum的值的操作仅允许一个进程进行。

使用with加锁:

with语句经常被用在I/O操作中,加锁/释放锁的操作也可以使用with语句替代。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import threading

sum = 0
lock = threading.Lock()


def change_it(n):
global sum
with lock:
sum += n
sum -= n


def do_change(n):
for i in range(1000000):
change_it(n)


t1 = threading.Thread(target=do_change, args=(5, ))
t2 = threading.Thread(target=do_change, args=(8, ))
t1.start()
t2.start()
t1.join()
t2.join()
print('sum: {0}'.format(sum))

信号量 BoundedSemaphore

BoundedSemaphore允许一定数量的线程(初始化时指定)同时运行指定代码段。

模拟一个20个人排队点餐的例子,有3个点餐窗口,1个取餐窗口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import threading
import time
import random


class People(object):
def __init__(self, haoma):
self.haoma = haoma
self.jiaohao = False

def set_jiaohao(self):
self.jiaohao = True


def diancan(diancan_sam, people):
# 与锁类似,acquire()和release()之间的代码被限制部分进程同时运行
diancan_sam.acquire()
print('{0}用户点餐。'.format(people.haoma))
# 随机数模拟点餐时间
time.sleep(random.randint(1, 2))
print('{0}用户点餐完毕。'.format(people.haoma))
people.set_jiaohao()
diancan_sam.release()


def qucan(qucan_sam, people):
while not people.jiaohao:
time.sleep(1)
qucan_sam.acquire()
print('{0}用户取餐。'.format(people.haoma))
# 随机数模拟点餐时间
time.sleep(random.randint(2, 3))
print('{0}用户取餐完毕。'.format(people.haoma))
qucan_sam.release()


def main():
peoples = []
peoples_num = 20
diancan_chuangkou = 3
qucan_chuangkou = 1
diancan_sam = threading.BoundedSemaphore(diancan_chuangkou)
qucan_sam = threading.BoundedSemaphore(qucan_chuangkou)
for i in range(1, peoples_num+1):
p = People(i)
peoples.append(p)

diancan_threads = []
for i in range(peoples_num):
t = threading.Thread(target=diancan, args=(diancan_sam, peoples[i]))
diancan_threads.append(t)
t.start()

qucan_threads = []
for p in peoples:
t = threading.Thread(target=qucan, args=(qucan_sam, p))
qucan_threads.append(t)
t.start()

for t in diancan_threads:
t.join()

for t in qucan_threads:
t.join()


if __name__ == '__main__':
main()

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
1用户点餐。
2用户点餐。
3用户点餐。
1用户点餐完毕。
2用户点餐完毕。
3用户点餐完毕。
4用户点餐。
5用户点餐。
6用户点餐。
1用户取餐。
4用户点餐完毕。
7用户点餐。
6用户点餐完毕。
8用户点餐。
5用户点餐完毕。
9用户点餐。
7用户点餐完毕。
10用户点餐。
8用户点餐完毕。
11用户点餐。
1用户取餐完毕。
8用户取餐。
9用户点餐完毕。
10用户点餐完毕。
12用户点餐。
13用户点餐。
13用户点餐完毕。
14用户点餐。
12用户点餐完毕。
15用户点餐。
11用户点餐完毕。
16用户点餐。
8用户取餐完毕。
3用户取餐。
16用户点餐完毕。
17用户点餐。
14用户点餐完毕。
15用户点餐完毕。
18用户点餐。
19用户点餐。
17用户点餐完毕。
20用户点餐。
19用户点餐完毕。
20用户点餐完毕。
18用户点餐完毕。
3用户取餐完毕。
4用户取餐。
4用户取餐完毕。
5用户取餐。
5用户取餐完毕。
7用户取餐。
7用户取餐完毕。
6用户取餐。
6用户取餐完毕。
2用户取餐。
2用户取餐完毕。
10用户取餐。
10用户取餐完毕。
9用户取餐。
9用户取餐完毕。
11用户取餐。
11用户取餐完毕。
12用户取餐。
12用户取餐完毕。
13用户取餐。
13用户取餐完毕。
16用户取餐。
16用户取餐完毕。
14用户取餐。
14用户取餐完毕。
15用户取餐。
15用户取餐完毕。
17用户取餐。
17用户取餐完毕。
18用户取餐。
18用户取餐完毕。
20用户取餐。
20用户取餐完毕。
19用户取餐。
19用户取餐完毕。

队列 queue

使用队列模拟一个仅能存放4条数据的先进先出的存储桶。
队列一般用在多进程间共享数据(进程具有单独的内存空间)。队列常规用法是存储线程运行结果,以便后续进行统计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from multiprocessing import Process
num = 0


def change_sum(n):
global num
num += n
print('sum add {0} is {1}'.format(n, num))


def main():
p1 = Process(target=change_sum, args=(1, ))
p2 = Process(target=change_sum, args=(2, ))
p1.start()
p2.start()
p1.join()
p2.join()


if __name__ == '__main__':
main()
print('num is {0}'.format(num))

查看运行结果,可以看到虽然在指定了global,但num的值并没有被修改。每一个进程都会将num拷贝一份到自己的内存空间
多进程修改数据导致不一致实例已经表明多线程共享内存空间。

1
2
3
num add 1 is 1
num add 2 is 2
num is 0

下面代码利用queue来做统计,每个线程在运行结束后往queue里面写入进程编号和耗时,在所有进程运行完成后程序读取queue中的数据,给出使用多线程节省了多少时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from threading import Thread
from queue import Queue
import random
import time


def sleep_sometime(x, q):
sometime = random.randint(1, 5)
print('%d thread is going to sleep %d seconds.' % (x, sometime))
time.sleep(sometime)
print('%d thread done.' % x)
q.put((x, sometime))


def main():
q = Queue()
t_list = []
threads = 10
start_time = time.time()
for x in range(threads):
t = Thread(target=sleep_sometime, args=(x, q))
t_list.append(t)
for t in t_list:
t.start()
for t in t_list:
t.join()

end_time = time.time()
print('all done....')
total_time = 0
while q.qsize() > 0:
x, sometime = q.get()
total_time += sometime
print('总线程: {0}\n耗时:{1}s\n节省:{2}s'.format(threads,end_time-start_time, total_time-(end_time-start_time)))


if __name__ == '__main__':
main()

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
0 thread is going to sleep 1 seconds.
1 thread is going to sleep 3 seconds.
2 thread is going to sleep 4 seconds.
3 thread is going to sleep 3 seconds.
4 thread is going to sleep 1 seconds.
5 thread is going to sleep 4 seconds.
6 thread is going to sleep 1 seconds.
7 thread is going to sleep 5 seconds.
8 thread is going to sleep 5 seconds.
9 thread is going to sleep 5 seconds.
0 thread done.
6 thread done.
4 thread done.
3 thread done.
1 thread done.
2 thread done.
5 thread done.
9 thread done.
8 thread done.
7 thread done.


all done....
总线程: 10
耗时:5.003232479095459s
节省:26.99676752090454s

多进程编程

我们知道由于全局解释锁(GIL)的束缚,同一时刻只有一个线程可以和python对象交互,通过使用进程,我们并行运行一定数量的python解释器,每一个进程都有自己的内存空间和GIL,每一个进程都可以串行运行(没有GIL之间的竞争),可以显著提高cpu密集型任务速度。

通常我们使用multiprocessing模块来编写多进程程序,默认情况下multiprocessing将使用它能见到的尽可能多的内核。所以应该尽量避免使用指定进程数的创建。

详细请参考转发的另一篇文章

参考:《python核心编程》第四章 多线程编程