多线程(异步线程)是我们常用的技术,例如在执行耗时操作、又不想卡住主程序(界面)时,就可以把任务放到子线程中执行。
1. QThread
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QApplication, QLabel, QVBoxLayout, QWidget, QPushButton
import time


class WorkerThread(QThread):
    """Background thread that performs 100 work steps and reports each one.

    Every step calls ``main_instance.excuteSomeThing()`` and then emits
    ``progress`` with the step number (1 through 100).
    """

    progress = pyqtSignal(int)  # emitted after each step with the step number

    def __init__(self, main_instance):
        """Remember the object whose ``excuteSomeThing`` method does the work.

        Args:
            main_instance: Object providing an ``excuteSomeThing()`` method.
        """
        super().__init__()
        self.main_instance = main_instance

    def run(self):
        """Thread body: run the steps in order, emitting progress after each."""
        for i in range(1, 101):
            # This is a plain method call, so it executes in this worker
            # thread even though the method is defined on the main window.
            self.main_instance.excuteSomeThing()
            self.progress.emit(i)


class MainWindow(QWidget):
    """Window with a progress label and a button that starts the worker."""

    def __init__(self):
        super().__init__()
        # No worker exists until the button is clicked for the first time.
        self.worker = None
        self.resize(800, 600)
        self.initUI()

    def initUI(self):
        """Create the label and the button and stack them vertically."""
        self.label = QLabel("进度: 0")
        self.button = QPushButton("开始任务")
        self.button.clicked.connect(self.start_task)

        layout = QVBoxLayout()
        layout.addWidget(self.label)
        layout.addWidget(self.button)
        self.setLayout(layout)

    def excuteSomeThing(self):
        """Simulate one unit of slow work by sleeping for 0.1 seconds."""
        time.sleep(0.1)

    def start_task(self):
        """Start a new worker thread unless one is still running.

        Rebinding ``self.worker`` while the previous thread is running would
        drop the last reference to a live QThread, which Qt treats as a fatal
        error, so clicks are ignored until the current run has finished.
        """
        if self.worker is not None and self.worker.isRunning():
            return
        self.worker = WorkerThread(self)
        self.worker.progress.connect(self.update_label)
        self.worker.start()

    def update_label(self, value):
        """Slot for ``progress``: show the latest step number in the label."""
        self.label.setText(f"进度: {value}")


app = QApplication([])
window = MainWindow()
window.show()
app.exec_()
注意:在子线程中直接调用主线程对象的函数(如上例中的 excuteSomeThing)时,该函数仍然在子线程中执行;只有通过信号连接的槽函数(如 update_label)才会回到主线程执行。
2. QThreadPool
from PyQt5.QtCore import QRunnable, QThreadPool, pyqtSignal, QObject
from PyQt5.QtWidgets import QApplication, QLabel, QVBoxLayout, QWidget, QPushButton
import time


class WorkerSignals(QObject):
    """Holder for the signals used by ``Worker``."""

    progress = pyqtSignal(int)


class Worker(QRunnable):
    """Pool task that counts from 1 to 100, emitting each value."""

    def __init__(self):
        super().__init__()
        self.signals = WorkerSignals()

    def run(self):
        """Sleep briefly before each step, then emit the step number."""
        report = self.signals.progress.emit
        for step in range(1, 101):
            time.sleep(0.01)  # stand-in for slow work
            report(step)


class MainWindow(QWidget):
    """Window with a progress label and a button that queues a ``Worker``."""

    def __init__(self):
        super().__init__()
        self.resize(800, 600)
        self.initUI()
        self.thread_pool = QThreadPool()

    def initUI(self):
        """Create the label and button and place them in a vertical layout."""
        self.label = QLabel("进度: 0")
        self.button = QPushButton("开始任务")
        self.button.clicked.connect(self.start_task)

        column = QVBoxLayout()
        for widget in (self.label, self.button):
            column.addWidget(widget)
        self.setLayout(column)

    def start_task(self):
        """Create a worker, connect its progress signal, and hand it to the pool."""
        task = Worker()
        task.signals.progress.connect(self.update_label)
        self.thread_pool.start(task)

    def update_label(self, value):
        """Show the received progress value in the label."""
        text = f"进度: {value}"
        self.label.setText(text)


app = QApplication([])
window = MainWindow()
window.show()
app.exec_()
3. concurrent.futures(ThreadPoolExecutor)
from PyQt5.QtCore import pyqtSignal, QObject
from PyQt5.QtWidgets import QApplication, QLabel, QVBoxLayout, QWidget, QPushButton
from concurrent.futures import ThreadPoolExecutor
import time


class Worker(QObject):
    """Object whose ``do_work`` method counts to 100 and emits each value."""

    progress = pyqtSignal(int)

    def do_work(self):
        """Sleep briefly before each step, then emit the step number."""
        step = 1
        while step <= 100:
            time.sleep(0.021)  # stand-in for slow work
            self.progress.emit(step)
            step += 1


class MainWindow(QWidget):
    """Window with a progress label and a button that submits work to an executor."""

    def __init__(self):
        super().__init__()
        self.resize(800, 600)
        self.initUI()
        self.executor = ThreadPoolExecutor(max_workers=10)

    def initUI(self):
        """Create the label and button and place them in a vertical layout."""
        self.label = QLabel("进度: 0")
        self.button = QPushButton("开始任务")
        self.button.clicked.connect(self.start_task)

        box = QVBoxLayout()
        for item in (self.label, self.button):
            box.addWidget(item)
        self.setLayout(box)

    def start_task(self):
        """Create a worker, connect its signal, and submit ``do_work`` to the executor."""
        self.worker = Worker()
        self.worker.progress.connect(self.update_label)
        job = self.worker.do_work
        self.executor.submit(job)

    def update_label(self, value):
        """Show the received progress value in the label."""
        text = f"进度: {value}"
        self.label.setText(text)


app = QApplication([])
window = MainWindow()
window.show()
app.exec_()
总结
QThread:适合需要自定义线程逻辑的场景。
QRunnable + QThreadPool:适合轻量级、高并发任务。
concurrent.futures:简单结合信号与槽机制使用线程池。