Python线程入门:多线程并发的基础与实践

server/2025/2/4 6:33:07/

线程概念

我们在日常开发中经常会听到使用多线程/多进程的方式完成并发任务。那么什么是进程?什么是线程?进程与线程之间有什么关系?接下来我们通过日常场景简单的了解一下进程与线程。

一个工厂,至少有一个车间,一个车间中最少有一个工人,最终是工人在工作。

一个程序,至少有一个进程,一个进程中最少有一个线程,最终是线程在工作。

在一个车间中有最基本的工作工具,人通过操作工具的方式来完成工作。

一个进程中有最基本的运行代码的资源(内存、cpu...),线程通过进程中提供的资源来运行代码。

通过上述描述,我们来总结一下线程与进程的关系:

  • 线程是计算机可以被cpu调度的最小单元
  • 进程是计算机分配资源的的最小单元,进程可以为线程提供运行资源。

一个进程中可以有多个线程,同一个进程中的线程可以共享当前进程中的资源。

使用线程完成并发任务

我们之前编写的代码都被称为同步代码,排队依次运行。如果前一个任务没有完成,那么之后的任务也无法继续。

import timedef work_1():print('任务1...')time.sleep(2)def work_2():print('任务2...')time.sleep(2)work_1()
work_2()

使用线程的方式优化代码

import time
import threadingdef work_1():print('任务1...')time.sleep(2)def work_2():print('任务2...')time.sleep(2)# 通过Thread方法创建线程对象,并使用target指定线程对象要运行的任务
t1 = threading.Thread(target=work_1)
t2 = threading.Thread(target=work_2)# 运行线程
t1.start()
t2.start()

实现一个简单的单线程同步爬虫

import requestsdef get_image(url):response = requests.get(url).contentfile_name = url.rsplit('/')[-1]with open(file_name, 'wb') as f:f.write(response)print('下载完成...')url_list = ['http://pic.bizhi360.com/bbpic/98/10798.jpg','http://pic.bizhi360.com/bbpic/92/10792.jpg','http://pic.bizhi360.com/bbpic/86/10386.jpg']for url in url_list:get_image(url)

对上述单线程同步爬虫进行优化

import requests
import threadingdef get_image(url):response = requests.get(url).contentfile_name = url.rsplit('/')[-1]with open(file_name, 'wb') as f:f.write(response)print('下载完成...')url_list = ['http://pic.bizhi360.com/bbpic/98/10798.jpg','http://pic.bizhi360.com/bbpic/92/10792.jpg','http://pic.bizhi360.com/bbpic/86/10386.jpg']for url in url_list:t = threading.Thread(target=get_image, args=(url,))t.start()

除了上述使用线程的方式提升运行速度之外,我们还可以使用多进程的方式完成并发任务。

import requests
import multiprocessingdef get_image(url):response = requests.get(url).contentfile_name = url.rsplit('/')[-1]with open(file_name, 'wb') as f:f.write(response)print('下载完成...')url_list = ['http://pic.bizhi360.com/bbpic/98/10798.jpg','http://pic.bizhi360.com/bbpic/92/10792.jpg','http://pic.bizhi360.com/bbpic/86/10386.jpg']if __name__ == "__main__":for url in url_list:p = multiprocessing.Process(target=get_image, args=(url,))p.start()
GIL锁

在刚刚的一些案例中我们发现无论使用多线程还是多进程的方式都可以实现并发任务,那么在什么业务场景下去使用多线程/多进程?在学习本小结内容之前,先给出结论:

  • 如果任务是属于IO密集型任务那么优先使用线程方式
  • 如果任务是属于计算密集型任务那么优先使用进程方式

在Python中存在全局解释器锁,简称为GILGILcpython解释器独有的。主要的功能是:让一个进程在同一时刻只能有一个线程被执行。

例如在一个进程中创建了多个线程,在运行当前程序时在同一时刻只有一个线程被执行,其他线程等待cpu调度。这种情况无法利用多核cpu的优势,如果想要绕开GIL,那么可以使用多进程的方式,创建多个进程,每个进程中只有一个线程。但是请注意,创建多进程消耗的资源比多线程消耗的资源大。

线程方法 - 重点

在正式学习线程方法之前,我们首先回顾一下刚刚学习到的内容。

# threading_test.py
import time
import threadingdef work():time.sleep(2)print('子线程任务...')# 当前只是创建了线程对象,并不会执行线程
t = threading.Thread(target=work)
# 通过start运行线程
t.start()
print('主线程打印信息...')'''
在上述代码中,我们要明确:1.一个py文件被解释器执行时会在操作系统中创建进程。2.在进程中创建线程来执行当前文件中的代码,我们把最初创建的线程称之为主线程。3.当主线程执行到Thread代码时会创建一个新的线程,我们一般称之为子线程。4.当前代码中的主线程与子线程交替执行。5.子线程被执行时主线程不会等待,继续往下执行到没有代码时等待子线程执行完毕再退出。
'''

thread_obj.start()

当前线程准备就绪,等待cpu调度,具体调度时间由cpu决定。

import threadingnum = 0def add():global numfor i in range(100000):num += it = threading.Thread(target=add)
t.start()
print(num)

thread_obj.join()

等待子线程任务执行完毕后再继续向下执行

import threadingnum = 0def add():global numfor i in range(100000):num += it = threading.Thread(target=add)
t.start()
t.join()  # 主线程等待子线程任务结束时解堵塞
print(num)
import threadingnum = 0def add_():global numfor i in range(100000):num += idef sub_():global numfor i in range(100000):num -= it1 = threading.Thread(target=add_)
t2 = threading.Thread(target=sub_)
t1.start()
t1.join()  # 主线程等待子线程任务结束时解堵塞t2.start()
t2.join()
print(num)

thread_obj.setDaemon(bool: attr)

设置守护线程,需要在线程启动之前设置。

如果一个线程为守护线程则主线程执行完毕后不管子线程任务是否结束都自动退出。

当前参数默认为:False

import time
import threadingdef work():for i in range(5):print(i)time.sleep(1)t = threading.Thread(target=work)
t.setDaemon(True)
t.start()
print('主线程即将退出...')

thread_obj.current_thread()

获取当前运行的线程对象的引用

import threadingdef work():name = threading.current_thread().getName()print(name)for i in range(5):t = threading.Thread(target=work)t.setName(f'线程: {i}')t.start()

自定义线程类完成并发爬虫

import requests
import threadingclass ThreadSpider(threading.Thread):def __init__(self, url):super().__init__()  # 子类继承线程类如果需要重写构造函数必须先调用父类的构造函数self.url = urldef run(self):response = requests.get(self.url).contentfile_name = self.url.rsplit('/')[-1]with open(file_name, 'wb') as f:f.write(response)print('下载完成...')url_list = ['http://pic.bizhi360.com/bbpic/98/10798.jpg','http://pic.bizhi360.com/bbpic/92/10792.jpg','http://pic.bizhi360.com/bbpic/86/10386.jpg'
]for url in url_list:ts = ThreadSpider(url)ts.start()
线程安全

在之前的案例中我们使用了多个线程对一个全局变量进行修改的操作,如果多个线程都对一个全局变量进行操作的话会出现资源竞争的问题,会导致计算错误。

import threadingnum = 0def add_():global numfor i in range(100000):num += idef sub_():global numfor i in range(100000):num -= it1 = threading.Thread(target=add_)
t2 = threading.Thread(target=sub_)
t1.start()
t2.start()t1.join()
t2.join()
print(num)

使用线程锁解决上述问题

from threading import Thread, RLocknum = 0
lock_obj = RLock()def add_():global numfor i in range(100000):lock_obj.acquire()  # 申请锁,申请成功会让其他线程等待直到当前线程释放num += ilock_obj.release()  # 释放锁,当锁被释放后其他线程才能被cpu挂起执行def sub_():global numfor i in range(100000):lock_obj.acquire()num -= ilock_obj.release()t1 = Thread(target=add_)
t2 = Thread(target=sub_)
t1.start()
t2.start()t1.join()
t2.join()
print(num)

在上述案例中,我们在代码中手动申请锁与释放锁,RLock方法支持上下文管理协议,可以使用with语句帮助我们申请和释放锁。

from threading import Thread, RLocknum = 0
lock_obj = RLock()def add_():global numfor i in range(100000):with lock_obj:  # 使用上下文管理锁的申请与释放num += idef sub_():global numfor i in range(100000):with lock_obj:num -= it1 = Thread(target=add_)
t2 = Thread(target=sub_)
t1.start()
t2.start()t1.join()
t2.join()
print(num)

在多线程编程中,不是任何地方都需要加锁处理。有些内置类型已经具有了加锁的机制,例如:列表。

import threadingdata_list = list()def add_list():for _ in range(100000):data_list.append(i)print(len(data_list))for i in range(2):t = threading.Thread(target=add_list)t.start()

在线程中一般使用两种锁机制:LockRLock

Lock

同步锁:同步锁一般很少使用,不支持锁嵌套

from threading import Thread, Locknum = 0
lock_obj = Lock()def add_num():global numfor i in range(10000000):lock_obj.acquire()num += 1lock_obj.release()print(num)for _ in range(2):t = Thread(target=add_num)t.start()

RLock

递归锁:支持锁嵌套

from threading import Thread, RLocknum = 0
lock_obj = RLock()def add_num():global numfor i in range(100000):lock_obj.acquire()lock_obj.acquire()num += 1lock_obj.release()lock_obj.release()print(num)for _ in range(2):t = Thread(target=add_num)t.start()
死锁

在使用锁的过程中我们发现如果在同步锁中使用了嵌套则程序会卡死,这种情况我们称之为死锁。

死锁:由于资源竞争造成的一种堵塞现象。

# coding=utf-8
import threading
import timemutexA = threading.Lock()
mutexB = threading.Lock()class MyThread1(threading.Thread):def run(self):# 对mutexA上锁mutexA.acquire()# mutexA上锁后,延时1秒,等待另外那个线程 把mutexB上锁print(self.name + '----do1---up----')time.sleep(1)# 此时会堵塞,因为这个mutexB已经被另外的线程抢先上锁了mutexB.acquire()print(self.name + '----do1---down----')mutexB.release()# 对mutexA解锁mutexA.release()class MyThread2(threading.Thread):def run(self):# 对mutexB上锁mutexB.acquire()# mutexB上锁后,延时1秒,等待另外那个线程 把mutexA上锁print(self.name + '----do2---up----')time.sleep(1)# 此时会堵塞,因为这个mutexA已经被另外的线程抢先上锁了mutexA.acquire()print(self.name + '----do2---down----')mutexA.release()# 对mutexB解锁mutexB.release()if __name__ == '__main__':t1 = MyThread1()t2 = MyThread2()t1.start()t2.start()
线程池

创建线程对象的期间会损耗时间,尤其是在需要开辟大量线程对象的时候会发生性能下降的情况。那么我们能否让程序创建一定数量的线程对象,并且在执行完某一个任务后不会被解释器销毁,下一个任务重复使用之前所创建的线程对象。

像这种需要创建大量线程对象的场景推荐使用线程池。

线程池的创建

# 模拟网页请求
import time
from concurrent.futures import ThreadPoolExecutordef get_html(time_attr):time.sleep(time_attr)print(f'get page {time_attr} success')return time_attr# 创建线程池对象
executor = ThreadPoolExecutor(max_workers=2)# 通过submit提交需要执行的函数到线程池中,并且submit是立即返回对象不会堵塞
task_1 = executor.submit(get_html, 3)
task_2 = executor.submit(get_html, 2)# done方法用于判定某个任务是否完成
print('task_1完成情况:', task_1.done())# 可以使用cancel取消任务 但是运行中的任务无法取消,可以将线程数量修改成1
# print('task_2任务取消:', task_2.cancel())# result方法可以获取任务的返回值 当前获取为阻塞
print('task_1返回结果:', task_1.result())

as_completed

获取已经执行成功的task的返回值

# 模拟网页请求
import time
from concurrent.futures import ThreadPoolExecutor, as_completeddef get_html(time_attr):time.sleep(time_attr)print(f'get page {time_attr} success')return time_attr# 创建线程池对象
executor = ThreadPoolExecutor(max_workers=2)# 批量提交任务并获取已经执行成功的task的返回值
time_attr_list = [3, 2, 5, 4]
all_task = [executor.submit(get_html, time_attr) for time_attr in time_attr_list]
for future in as_completed(all_task):data = future.result()print(f'get page {data}')  # 线程任务只要执行完就能获取到返回值

map

获取已经执行成功的task的返回值 - 代码更精简

# 模拟网页请求
import time
from concurrent.futures import ThreadPoolExecutor, as_completeddef get_html(time_attr):time.sleep(time_attr)print(f'get page {time_attr} success')return time_attr# 创建线程池对象
executor = ThreadPoolExecutor(max_workers=2)# 批量提交任务并获取已经执行成功的task的返回值
time_attr_list = [3, 2, 5, 4]# 通过executor对象中的map获取已经完成的任务的返回值
for data in executor.map(get_html, time_attr_list):print(f'get page {data}')  # 当前打印的返回值顺序与列表顺序一致# all_task = [executor.submit(get_html, time_attr) for time_attr in time_attr_list]
# for future in as_completed(all_task):
#     data = future.result()
#     print(f'get page {data}')  # 线程任务只要执行完就能获取到返回值

wait

等待指定任务完成后主线程解堵塞

import time
from concurrent.futures import ThreadPoolExecutor, waitdef get_html(time_attr):time.sleep(time_attr)print(f'get page {time_attr} success')return time_attr# 创建线程池对象
executor = ThreadPoolExecutor(max_workers=2)# 批量提交任务并获取已经执行成功的task的返回值
time_attr_list = [3, 2, 5, 4]all_task = [executor.submit(get_html, time_attr) for time_attr in time_attr_list]
wait(all_task)
print('主线程执行...')

http://www.ppmy.cn/server/164822.html

相关文章

[EAI-028] Diffusion-VLA,能够进行多模态推理和机器人动作预测的VLA模型

Paper Card 论文标题:Diffusion-VLA: Scaling Robot Foundation Models via Unified Diffusion and Autoregression 论文作者:Junjie Wen, Minjie Zhu, Yichen Zhu, Zhibin Tang, Jinming Li, Zhongyi Zhou, Chengmeng Li, Xiaoyu Liu, Yaxin Peng, Chao…

【线程】基于环形队列的生产者消费者模型

1 环形队列 环形队列采用数组来模拟,用取模运算来模拟环状特性。 1.如何判断环形队列为空或者为满? 当环形队列为空时,头和尾都指向同一个位置。当环形队列为满时,头和尾也都指向同一个位置。 因此, 可以通过加计数器或者标记…

使用Ollama 在Ubuntu运行deepseek大模型:以deepseek-r1为例

deepseek大模型上热搜啦! 咱们来亲身感受下DeepSeek模型的魅力吧! 整个操作流程非常简单方便,只需要2步,先安装Ollama,然后执行大模型即可。 支持的deepseek-r1模型 deepseek-r1 DeepSeek-R1-Distill-Qwen-1.5B …

利用Vue和javascript分别编写一个“Hello World”的定时更新

目录 一、利用Vue编写一个“Hello World”的定时更新(1)vue编码在Html文件中(2)vue编码在js文件中 二、利用javascript编写一个“Hello World”的定时更新 一、利用Vue编写一个“Hello World”的定时更新 (1&#xff…

院校联合以项目驱动联合培养医工计算机AI人才路径探析

一、引言 1.1 研究背景与意义 在科技飞速发展的当下,医疗人工智能作为一个极具潜力的新兴领域,正深刻地改变着传统医疗模式。从疾病的早期诊断、个性化治疗方案的制定,到药物研发的加速,人工智能技术的应用极大地提升了医疗服务…

OceanBase 读写分离探讨

版本信息 OceanBase: 4.2.1.10 OBProxy: 4.3.3.0 租户类型: MySQL租户 弱一致性读 官方声明 默认情况下,所有的请求都是发送到数据的 Leader 副本上,即强一致性的请求,因为 OLAP 的分析计算,一般对于数据的一致性要求不高&…

(9)下:学习与验证 linux 里的 epoll 对象里的 EPOLLIN、 EPOLLHUP 与 EPOLLRDHUP 的不同。小例子的实验

(4)本实验代码的蓝本,是伊圣雨老师里的课本里的代码,略加改动而来的。 以下是 服务器端的代码: 每当收到客户端的报文时,就测试一下对应的 epoll 事件里的事件标志,不读取报文内容,…

【Spark速通】

Spark介绍 Spark诞生的背景与原因: MapReduce的局限性:MapReduce语义简单,仅支持map和reduce两种操作,编程范式强制拆分,语义死板且不丰富;运行速度慢,大量与磁盘交互以节约内存,却…