本文是一篇学习笔记,学习内容主要来源于莫凡python的文档:https://mofanpy.com/tutorials/python-basic/threading/thread
多线程
线程基本结构
开启子线程的简单方式如下:
import threadingdef thread_job():print('This is a thread of %s' % threading.current_thread())def main():thread = threading.Thread(target=thread_job, ) # 定义线程thread.start() # 让线程开始工作if __name__ == '__main__':main()
线程阻塞
下面是一个双线程的示例,期望效果是先运行完两个子线程,再输出all done
。
import threading
import timedef T1_job():print("T1 start\n")for i in range(10):time.sleep(0.1)print("T1 finish\n")def T2_job():print("T2 start\n")print("T2 finish\n")if __name__ == '__main__':thread_1 = threading.Thread(target=T1_job, name='T1')thread_2 = threading.Thread(target=T2_job, name='T2')thread_1.start() # 开启T1thread_2.start() # 开启T2print("all done\n")
输出:
T1 start
T2 start
all done
T2 finish
T1 finish
实际结果发现,主线程没有“等待”子线程执行完就已经结束。
为了达到预期效果,需要通过join()
方法来设定线程阻塞。
下面再开启T2之前,插入thread_1.join()
if __name__ == '__main__':thread_1 = threading.Thread(target=T1_job, name='T1')thread_2 = threading.Thread(target=T2_job, name='T2')thread_1.start() # 开启T1thread_1.join() thread_2.start() # 开启T2print("all done\n")
输出:
T1 start
T1 finish
T2 start
all done
T2 finish
可以看到,T2在等待T1结束后再开始运行。
为了达到预期情况,可以使用1221
的V型排布:
if __name__ == '__main__':thread_1 = threading.Thread(target=T1_job, name='T1')thread_2 = threading.Thread(target=T2_job, name='T2')thread_1.start()thread_2.start()thread_2.join()thread_1.join()print("all done\n")
输出:
T1 start
T2 start
T2 finish
T1 finish
all done
线程通信
在定义的子线程任务函数job
中,无法通过return
的方式将计算完成的结果返回出来。
此时,可以使用队列(Queue)这种数据结构来获取子线程的结果数据,实现线程之间的通信,下面是一个示例:
import threading
from queue import Queuedef job(l, q):for i in range(len(l)):l[i] = l[i] ** 2q.put(l)def multithreading():q = Queue()threads = []data = [[1, 2, 3], [3, 4, 5], [4, 4, 4], [5, 5, 5]]for i in range(4):t = threading.Thread(target=job, args=(data[i], q))t.start()threads.append(t)for thread in threads:thread.join()results = []for _ in range(4):results.append(q.get())print(results)if __name__ == '__main__':multithreading()
线程锁
为了防止多线程输出结果混乱,除了添加线程阻塞之外,还可以使用线程锁。
同时,线程锁还可以确保当前线程执行时,内存不会被其他线程访问,执行运算完毕后,可以打开锁共享内存。
下面是一个不添加线程锁的示例:
import threadingdef job1():global Afor i in range(10):A += 1print('job1', A)def job2():global Afor i in range(10):A += 10print('job2', A)if __name__ == '__main__':A = 0t1 = threading.Thread(target=job1)t2 = threading.Thread(target=job2)t1.start()t2.start()t1.join()t2.join()
输出:
job1 1
job1 2
job1 3
job1 4
job1 5
job1 6
job1 7
job1 8
job2job1 19
18
job2 29
job2 39job1
job2 50
job240
60
job2 70
job2 80
job2 90
job2 100
job2 110
添加线程锁之后:
import threadingdef job1():global A, locklock.acquire()for i in range(10):A += 1print('job1', A)lock.release()def job2():global A, locklock.acquire()for i in range(10):A += 10print('job2', A)lock.release()if __name__ == '__main__':lock = threading.Lock()A = 0t1 = threading.Thread(target=job1)t2 = threading.Thread(target=job2)t1.start()t2.start()t1.join()t2.join()
输出:
job1 1
job1 2
job1 3
job1 4
job1 5
job1 6
job1 7
job1 8
job1 9
job1 10
job2 20
job2 30
job2 40
job2 50
job2 60
job2 70
job2 80
job2 90
job2 100
job2 110
多进程
进程基本结构
import multiprocessing as mpdef job(a, d):print('aaaaa')if __name__ == '__main__':p1 = mp.Process(target=job, args=(1, 2))p1.start()p1.join()
进程通信
和多线程类似,进程之间同样可以通过队列queue
形式进行通信,并且,在multiprocessing
库中,直接包含了Queue()
结构。
import multiprocessing as mpdef job(q):res = 0for i in range(1000):res += i + i ** 2 + i ** 3q.put(res) # queueif __name__ == '__main__':q = mp.Queue()p1 = mp.Process(target=job, args=(q,))p2 = mp.Process(target=job, args=(q,))p1.start()p2.start()p1.join()p2.join()res1 = q.get()res2 = q.get()print(res1 + res2)
进程池
进程池(Pool)就是将所要运行的东西,放到池子里,Python会自行解决多进程的问题。
下面是一个简单示例,processes参数指定进程池中的进程数。
import multiprocessing as mpdef job(x):return x * xdef multicore():pool = mp.Pool(processes=6)# res = pool.map(job, range(10))# print(res)res = pool.apply_async(job, (2,))print(res.get())if __name__ == '__main__':multicore()
进程池运算结果有两种获取方式:
- 第一种是
pool.map
,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果; - 第二种是
pool.apply_async()
,在apply_async()中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值。
pool.apply_async()
只能传递一个值,如果要传递多个值,可以使用迭代器
,下面的代码通过迭代器实现了两种取值方式的等效结果:
import multiprocessing as mpdef job(x):return x * xdef multicore():pool = mp.Pool(processes=2)res = pool.map(job, range(10))print(res)res = pool.apply_async(job, (2,))# 用get获得结果print(res.get())# 迭代器,i=0时apply一次,i=1时apply一次等等multi_res = [pool.apply_async(job, (i,)) for i in range(10)]# 从迭代器中取出print([res.get() for res in multi_res])if __name__ == '__main__':multicore()
进程锁
在不同进程中,可以通过变量.value
的方式共享变量内存。
如果多个进程对同一个变量进行操控,不加进程锁,就会让结果混乱。
下面是一个不加进程锁的示例:
import multiprocessing as mp
import timedef job(v, num):for _ in range(5):time.sleep(0.1) # 暂停0.1秒,让输出效果更明显v.value += num # v.value获取共享变量值print(v.value)def multicore():v = mp.Value('i', 0) # 定义共享变量p1 = mp.Process(target=job, args=(v, 1))p2 = mp.Process(target=job, args=(v, 3)) # 设定不同的number看如何抢夺内存p1.start()p2.start()p1.join()p2.join()if __name__ == '__main__':multicore()
输出:
1
2
3
4
7
8
11
14
17
20
添加进程锁之后:
import multiprocessing as mp
import timedef job(v, num, l):l.acquire() # 锁住for _ in range(5):time.sleep(0.1)v.value += num # 获取共享内存print(v.value)l.release() # 释放def multicore():l = mp.Lock() # 定义一个进程锁v = mp.Value('i', 0) # 定义共享内存p1 = mp.Process(target=job, args=(v, 1, l)) # 需要将lock传入p2 = mp.Process(target=job, args=(v, 3, l))p1.start()p2.start()p1.join()p2.join()if __name__ == '__main__':multicore()
输出:
1
2
3
4
5
8
11
14
17
20
进程锁保证了进程p1的完整运行,然后才进行了进程p2的运行。
多线程和多进程的效率对比
在python语言中,并无法做到实际的多线程,这是由于Python中内置了全局解释器锁(GIL),让任何时候只有一个线程进行执行。下面是一段具体解释:
尽管Python完全支持多线程编程, 但是解释器的C语言实现部分在完全并行执行时并不是线程安全的。 实际上,解释器被一个全局解释器锁保护着,它确保任何时候都只有一个Python线程执行。 GIL最大的问题就是Python的多线程程序并不能利用多核CPU的优势 (比如一个使用了多个线程的计算密集型程序只会在一个单CPU上面运行)。
在讨论普通的GIL之前,有一点要强调的是GIL只会影响到那些严重依赖CPU的程序(比如计算型的)。 如果你的程序大部分只会涉及到I/O,比如网络交互,那么使用多线程就很合适, 因为它们大部分时间都在等待。实际上,你完全可以放心的创建几千个Python线程, 现代操作系统运行这么多线程没有任何压力,没啥可担心的。
下面是一段测试程序,对比常规计算,双线程,双进程的计算效率:
import multiprocessing as mp
import threading as td
import timedef job(q):res = 0for i in range(1000000):res += i + i ** 2 + i ** 3q.put(res) # queuedef multicore():q = mp.Queue()p1 = mp.Process(target=job, args=(q,))p2 = mp.Process(target=job, args=(q,))p1.start()p2.start()p1.join()p2.join()res1 = q.get()res2 = q.get()print('multicore:', res1 + res2)def multithread():q = mp.Queue() # thread可放入process同样的queue中t1 = td.Thread(target=job, args=(q,))t2 = td.Thread(target=job, args=(q,))t1.start()t2.start()t1.join()t2.join()res1 = q.get()res2 = q.get()print('multithread:', res1 + res2)def normal():res = 0for _ in range(2):for i in range(1000000):res += i + i ** 2 + i ** 3print('normal:', res)if __name__ == '__main__':st = time.time()normal()st1 = time.time()print('normal time:', st1 - st)multithread()st2 = time.time()print('multithread time:', st2 - st1)multicore()print('multicore time:', time.time() - st2)
输出:
normal: 499999666667166666000000
normal time: 0.9803786277770996
multithread: 499999666667166666000000
multithread time: 0.9883582592010498
multicore: 499999666667166666000000
multicore time: 1.4371891021728516
结果发现,双线程的所花时间和单线程相差不大,论证了python的多线程是“伪多线程”。然而,多进程的所花时间却更多,这是由于该运算较简单,启动线程的时间消耗过大。
把计算数扩大十倍,输出结果:
normal: 4999999666666716666660000000
normal time: 10.019219160079956
multithread: 4999999666666716666660000000
multithread time: 9.802824974060059
multicore: 4999999666666716666660000000
multicore time: 6.478690147399902
此时发现多进程的速度有了明显提升。