前言
本文介绍并发编程中另一个重要的知识 - 线程。
线程介绍
我们知道一个程序的运行过程是一个进程,在操作系统中每个进程都有一个地址空间,而且每个进程默认有一个控制线程,打个比方,在一个车间中有很多原材料通过流水线加工产品,而线程就是这个车间中的流水线,而这个车间就是进程,原材料就是内存中的数据,每个车间至少有一条流水线。
因此,进程只是将资源(原材料)集中到一起是资源单位,线程才是CPU具体的执行单位,一个进程中可以存在多个线程,这多个线程会共享该进程内的所有资源,因此同一个进程内开启多线程会产生资源争抢。
为了单核实现并发已经有了多进程为何还要有多线程呢?进程相当于一个一个的车间,创建一个进程就需要创建一个车间,而线程是车间中的流水线,创建一个线程只是在车间中创建一条流水线无需申请另外的内存空间,创建开销相对进程来说小很多。
开启线程 - threading模块
开启线程和开启进程的方式基本一致,只是使用的模块不同,并且在开多线程时无需在if __name__ == '__main__':
下开设,但是约定俗成还是建议在if __name__ == '__main__':
分支下开设线程。
from threading import Threaddef task():print('i am sub_thread')# 方式1:直接使用Thread实例化线程对象
if __name__ == '__main__':t = Thread(target=task)t.start()print('i am main-thread')# 方式2:继承Therad,自定自己的线程类,重写run方法
class MyThread(Thread):def run(self):task()if __name__ == '__main__':t = MyThread()t.start()print('i am main-thread')
有了开启多线程的方式,针对之前学习的socket就可以实现TCP服务端的并发:
import socket
from threading import Threadserver = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)# 线程的任务:通信循环
def task(conn):while 1:try:data = conn.recv(1024)if not data: breakconn.send(data.upper())except Exception as e:print(e)breakconn.close()while True:conn, addr = server.accept()t = Thread(target=task, args=(conn, )) # 来一个连接请求,就开一个线程处理这个连接t.start() # 这种并发方式有缺陷:容易内存溢出(线程个数有限)
join方法
多线程中也有join
方法,主线程等待子线程执行结束才执行主线程
from threading import Thread
import timeclass MyThread(Thread):def __init__(self,name):super().__init__()self.name = namedef run(self):print(f'{self.name} is run')time.sleep(2)print(f'{self.name} is over')if __name__ == '__main__':t = MyThread('tank')t.start()t.join()print('主')
多线程共享数据
我们知道多个进程之间的数据是不会共享的,但是线程是开设在进程内,一个进程内如果有多个线程,那么这多个线程就会共享该进程内的所有资源数据,如下
from threading import Threadmoney = 100
def foo():global moneymoney = 666print('子',money)if __name__ == '__main__':t = Thread(target=foo)t.start()print('主',money)# 程序运行结果
子 666
主 666
守护线程
主线程运行结束后不会立刻结束,会等待进程中的其他子线程全部运行完毕后才会结束,因为主线程结束就意味着所在的进程结束,进程结束的话内存空间就会被回收,那么其他子线程就无法工作了。因此如果子线程不是守护线程,主线程结束后会等待子线程运行完毕,但是如果子线程属于守护线程,那么主线程结束,守护线程也结束。
from threading import Thread
import timedef foo():print('太监逍遥自在!')time.sleep(3)print('老子寿终正寝')if __name__ == '__main__':t = Thread(target=foo)t.daemon = Truet.start()print('吾主驾崩')# 运行结果,表示太监没有寿终正寝
太监逍遥自在!
吾主驾崩
线程互斥锁
多线程共享统一进程内的资源,因此就会发生数据争抢,造成数据错乱,为了防止这一现象的发生,可以进行加锁处理,让多个线程进行抢锁,比如:
from threading import Thread, Lock
import timemoney = 100
mutex = Lock() # 实例化得到锁# 线程需要执行的任务
def task():global moneymutex.acquire() # 线程获取锁tmp = moneytime.sleep(0.01)money = tmp - 1mutex.release() # 任务执行完成后释放锁,由其他线程继续争抢if __name__ == '__main__':t_list = []for i in range(100):t = Thread(target=task)t.start()t_list.append(t)for t in t_list:t.join() # 等待子线程完成后才运行主线程print(money)
死锁和递归锁
我们在说进程加锁的时候有提到过,锁不能轻易使用,容易出现死锁现象,不管是进程加锁还是线程加锁都容易出现死锁,所谓死锁就是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁:
from threading import Thread,Lock
import time
mutexA = Lock()
mutexB = Lock()class MyThread(Thread):def run(self):self.func1()self.func2()def func1(self):mutexA.acquire()print(f'{self.name}抢到A锁')# self.name获取当前线程的名字mutexB.acquire()print(f'{self.name}抢到B锁')mutexB.release()mutexA.release()def func2(self):mutexB.acquire()print(f'{self.name}抢到B锁')time.sleep(2)mutexA.acquire()print(f'{self.name}抢到A锁')mutexA.release()mutexB.release()if __name__ == '__main__':for i in range(10):t = MyThread()t.start()# 运行结果
Thread-1抢到A锁
Thread-1抢到B锁
Thread-1抢到B锁
Thread-2抢到A锁
....这里阻塞住了,出现死锁现象
可以对上述执行结果进行分析:
1.共有10个线程开启,开启后会自动执行run() 2.首先执行func1功能 3.执行func1,10个线程中会有一个首先抢到A锁,另外的9个线程需要等A锁释放才能争抢A锁 4.抢到A锁的线程会很顺利的抢到B锁,然后依次释放B锁和B锁 5.A锁释放完毕后,第一个线程就可以执行到func2,再次抢到B锁,与此同时其他的9个线程执行func1时,又会有一个线程抢到A锁 6.第二个线程拿着A锁想要抢到B锁,而此时正在执行func2的线程拿着B锁想要抢到A锁 7.由此产生了死锁现象
为了解决死锁的问题,我们可以使用递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
导入RLock
将两把锁变为一把锁:mutexA = Lock()mutexB = Lock()# 换成mutexA = mutexB = RLock()
GIL
python解释器有多个版本,比如Cpython/Jpython/Pypython
,经常使用的版本就是Cpython
,GIL(全局解释器锁)就是Cpython解释器中的一把互斥锁,GIL的作用就是用来阻止同一进程下多个线程的同时执行。
GIL存在的原因是Cpython的内存管理不是线程安全的。程序的代码需要交给解释器解释执行,由于进程中所有线程是共享该进程内的资源,这就有了竞争,而垃圾回收机制也是当前进程中的一个线程,这个线程会和当前进程内的其他线程争抢数据,为了保证数据安全,进程内同一时间只有一个线程在运行就有了GIL。
可能有小伙伴会问,python既然有了GIL保证同一时间只有一个线程运行,为什么还有互斥锁呢?首先需要知道加锁的目的是为了保护数据,同一时间只能有一个线程修改共享的数据,而保护不同的数据应该加不同的锁
,GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock。
锁通常被用来实现对共享资源的同步访问。为每一个共享资源创建一个Lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果其它线程已经获得了该锁,则当前线程需等待其被释放),待资源访问完后,再调用release方法释放锁。
可能又有小伙伴问了,既然加锁会让运行变成串行,那么我在start之后立即使用join,不用加锁也是串行的效果,为什么还要用锁呢?在start之后立刻使用jion,肯定会将多个任务的执行变成串行,是安全的,但问题是start后立即join:任务内的所有代码都是串行执行的,而加锁,只是加锁的部分即修改共享数据的部分是串行的,单从保证数据安全方面,二者都可以实现,但很明显是加锁的效率更高.比如下述代码用来验证join
方式的效率,可以发现最后执行时间非常久。
from threading import current_thread,Thread,Lock
import os,time
def task():time.sleep(3)print('%s start to run' %current_thread().getName())global ntemp=ntime.sleep(0.5)n=temp-1if __name__ == '__main__':n=100lock=Lock()start_time=time.time()for i in range(100):t=Thread(target=task)t.start()t.join()stop_time=time.time()print('主:%s n:%s' %(stop_time-start_time,n))'''
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 # 耗时会非常久
'''
python多线程的应用
我们知道由于GIL的原因,Cpython无法利用计算机的多核优势,是不是就意味着Cpython没有用了呢?想要回答这个问题需要明确CPU到底是用来做计算的还是用来做IO操作的,多个CPU意味着可以有多个核并行完成计算,因此多核可以提升计算速度,但是每个CPU一旦遇到IO阻塞仍然需要等待,此时多核CPU也没什么用。
举例来说,一个工人相当于cpu,此时计算相当于工人在干活,I/O阻塞相当于为工人干活提供所需原材料的过程,工人干活的过程中如果没有原材料了,则工人干活的过程需要停止,直到等待原材料的到来。 如果你的工厂干的大多数任务都要有准备原材料的过程(I/O密集型),那么你有再多的工人,意义也不大,还不如一个人,在等材料的过程中让工人去干别的活, 反过来讲,如果你的工厂原材料都齐全,那当然是工人越多,效率越高。
因此对于计算密集型的程序来说肯定是CPU越多越好,但是对于IO密集型的程序来讲再多的CPU也没用。