使用Python和PyQt5实现Modbus TCP协议下的量程切换功能
在工业自动化系统中,设备的量程切换是一个常见的需求。本文将介绍如何使用Python和PyQt5开发一个图形用户界面(GUI)应用程序,以通过Modbus TCP协议向设备发送特定的指令,实现量程的切换(从大量程到小量程,反之亦然)。
一、需求概述
根据您的需求,我们需要实现以下功能:
- 切换指令:
- 大量程指令:
[0x01, 0x00, 0x00, 0x00, 0x04, 0x06, 0x01, 0x10, 0x00, 0xC8, 0x00, 0x02, 0x04, 0x00, 0x02, 0x00, 0x02]
- 小量程指令:
[0x01, 0x00, 0x00, 0x00, 0x04, 0x06, 0x01, 0x10, 0x00, 0xC8, 0x00, 0x02, 0x04, 0x00, 0x01, 0x00, 0x01]
- 大量程指令:
- Modbus TCP服务器配置:
- IP地址:
192.168.0.150
- 端口:
6789
- IP地址:
- 开发工具:
- 编程语言:Python
- GUI框架:PyQt5
我们将通过发送上述字节序列的方式,向指定的Modbus TCP服务器发送指令,实现量程的切换。
二、技术实现方案
为了满足需求,我们将采取以下步骤:
- 建立GUI界面:使用PyQt5创建一个简单的界面,包含两个按钮:“设置大量程”和“设置小量程”。
- 发送Modbus TCP指令:使用Python的
socket
库,通过TCP连接向指定的服务器发送相应的指令。 - 异步处理:为了确保GUI的响应性,我们将使用多线程或异步编程,防止发送指令时界面卡顿。
- 反馈机制:在发送指令后,向用户显示成功或失败的信息。
三、详细实现步骤
3.1 安装必要的库
确保已安装PyQt5
库。如果尚未安装,可以使用以下命令进行安装:
pip install PyQt5
3.2 编写Python代码
以下是完整的Python代码示例,使用PyQt5创建GUI,并通过TCP发送Modbus指令:
python">import sys
import socket
from PyQt5.QtWidgets import (QApplication, QWidget, QPushButton, QVBoxLayout, QLabel, QMessageBox
)
from PyQt5.QtCore import Qt, QThread, pyqtSignalclass ModbusThread(QThread):# 定义信号,用于向主线程发送消息finished = pyqtSignal(str)def __init__(self, command, ip, port):super().__init__()self.command = commandself.ip = ipself.port = portdef run(self):try:# 创建TCP/IP套接字with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:# 设置连接超时时间为5秒sock.settimeout(5)# 连接到服务器sock.connect((self.ip, self.port))# 发送指令sock.sendall(bytearray(self.command))# 接收响应(根据具体设备的响应而定,此处假设响应长度为8字节)response = sock.recv(1024)# 可以根据需要解析响应,这里仅简单地返回成功信息self.finished.emit("指令发送成功,收到响应:" + response.hex())except socket.timeout:self.finished.emit("连接超时,请检查服务器是否可达。")except socket.error as e:self.finished.emit(f"Socket错误: {e}")except Exception as e:self.finished.emit(f"发生错误: {e}")class ModbusApp(QWidget):def __init__(self):super().__init__()self.initUI()# 配置Modbus服务器的IP和端口self.modbus_ip = '192.168.0.150'self.modbus_port = 6789# 定义指令self.large_scale_command = [0x01, 0x00, 0x00, 0x00, 0x04, 0x06, 0x01, 0x10, 0x00, 0xC8, 0x00, 0x02, 0x04, 0x00, 0x02, 0x00, 0x02]self.small_scale_command = [0x01, 0x00, 0x00, 0x00, 0x04, 0x06, 0x01, 0x10, 0x00, 0xC8, 0x00, 0x02, 0x04, 0x00, 0x01, 0x00, 0x01]def initUI(self):self.setWindowTitle('Modbus TCP量程切换')self.setGeometry(100, 100, 300, 200)layout = QVBoxLayout()self.label = QLabel('请选择要设置的量程模式:', self)self.label.setAlignment(Qt.AlignCenter)layout.addWidget(self.label)self.large_scale_btn = QPushButton('设置大量程', self)self.large_scale_btn.clicked.connect(self.set_large_scale)layout.addWidget(self.large_scale_btn)self.small_scale_btn = QPushButton('设置小量程', self)self.small_scale_btn.clicked.connect(self.set_small_scale)layout.addWidget(self.small_scale_btn)self.status_label = QLabel('', self)self.status_label.setAlignment(Qt.AlignCenter)layout.addWidget(self.status_label)self.setLayout(layout)def set_large_scale(self):self.status_label.setText('正在发送大量程指令...')self.send_command(self.large_scale_command)def set_small_scale(self):self.status_label.setText('正在发送小量程指令...')self.send_command(self.small_scale_command)def send_command(self, command):# 创建并启动Modbus线程self.thread = ModbusThread(command, self.modbus_ip, self.modbus_port)self.thread.finished.connect(self.on_finished)self.thread.start()def on_finished(self, message):# 更新状态标签self.status_label.setText(message)# 弹出消息框QMessageBox.information(self, '信息', message)def main():app = QApplication(sys.argv)window = ModbusApp()window.show()sys.exit(app.exec_())if __name__ == '__main__':main()
3.3 代码解析
3.3.1 模块导入
python">import sys
import socket
from PyQt5.QtWidgets import (QApplication, QWidget, QPushButton, QVBoxLayout, QLabel, QMessageBox
)
from PyQt5.QtCore import Qt, QThread, pyqtSignal
sys
:用于系统相关的功能,如退出应用。socket
:用于建立TCP连接和发送接收数据。PyQt5.QtWidgets
:用于创建GUI组件。PyQt5.QtCore
:用于多线程和信号通信。
3.3.2 创建ModbusThread类
python">class ModbusThread(QThread):# 定义信号,用于向主线程发送消息finished = pyqtSignal(str)def __init__(self, command, ip, port):super().__init__()self.command = commandself.ip = ipself.port = portdef run(self):try:# 创建TCP/IP套接字with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:# 设置连接超时时间为5秒sock.settimeout(5)# 连接到服务器sock.connect((self.ip, self.port))# 发送指令sock.sendall(bytearray(self.command))# 接收响应(根据具体设备的响应而定,此处假设响应长度为1024字节)response = sock.recv(1024)# 可以根据需要解析响应,这里仅简单地返回成功信息self.finished.emit("指令发送成功,收到响应:" + response.hex())except socket.timeout:self.finished.emit("连接超时,请检查服务器是否可达。")except socket.error as e:self.finished.emit(f"Socket错误: {e}")except Exception as e:self.finished.emit(f"发生错误: {e}")
- ModbusThread:继承自
QThread
,用于在子线程中执行发送指令的操作,避免阻塞主线程。 - 信号:定义
finished
信号,用于将操作结果传递回主线程。 run
方法:执行发送指令的具体逻辑,包括建立连接、发送指令、接收响应,并通过信号传递结果。
3.3.3 创建ModbusApp类
python">class ModbusApp(QWidget):def __init__(self):super().__init__()self.initUI()# 配置Modbus服务器的IP和端口self.modbus_ip = '192.168.0.150'self.modbus_port = 6789# 定义指令self.large_scale_command = [0x01, 0x00, 0x00, 0x00, 0x04, 0x06, 0x01, 0x10, 0x00, 0xC8, 0x00, 0x02, 0x04, 0x00, 0x02, 0x00, 0x02]self.small_scale_command = [0x01, 0x00, 0x00, 0x00, 0x04, 0x06, 0x01, 0x10, 0x00, 0xC8, 0x00, 0x02, 0x04, 0x00, 0x01, 0x00, 0x01]
- 初始化:设置服务器的IP和端口,以及定义切换量程所需的指令。
3.3.4 初始化GUI界面
python"> def initUI(self):self.setWindowTitle('Modbus TCP量程切换')self.setGeometry(100, 100, 300, 200)layout = QVBoxLayout()self.label = QLabel('请选择要设置的量程模式:', self)self.label.setAlignment(Qt.AlignCenter)layout.addWidget(self.label)self.large_scale_btn = QPushButton('设置大量程', self)self.large_scale_btn.clicked.connect(self.set_large_scale)layout.addWidget(self.large_scale_btn)self.small_scale_btn = QPushButton('设置小量程', self)self.small_scale_btn.clicked.connect(self.set_small_scale)layout.addWidget(self.small_scale_btn)self.status_label = QLabel('', self)self.status_label.setAlignment(Qt.AlignCenter)layout.addWidget(self.status_label)self.setLayout(layout)
- 窗口标题和大小:设置窗口的标题和初始大小。
- 布局:使用垂直布局(
QVBoxLayout
)组织组件。 - 组件:
- 标签:提示用户选择量程模式。
- 按钮:两个按钮分别用于设置大量程和小量程。
- 状态标签:显示操作结果。
3.3.5 定义按钮点击事件
python"> def set_large_scale(self):self.status_label.setText('正在发送大量程指令...')self.send_command(self.large_scale_command)def set_small_scale(self):self.status_label.setText('正在发送小量程指令...')self.send_command(self.small_scale_command)
set_large_scale
和set_small_scale
:分别对应“设置大量程”和“设置小量程”按钮的点击事件,更新状态标签并调用send_command
方法发送指令。
3.3.6 发送指令的方法
python"> def send_command(self, command):# 创建并启动Modbus线程self.thread = ModbusThread(command, self.modbus_ip, self.modbus_port)self.thread.finished.connect(self.on_finished)self.thread.start()def on_finished(self, message):# 更新状态标签self.status_label.setText(message)# 弹出消息框QMessageBox.information(self, '信息', message)
send_command
:创建一个ModbusThread
线程,传递指令、IP和端口,连接信号,并启动线程。on_finished
:接收线程发送的信号,更新状态标签并弹出消息框显示操作结果。
3.3.7 主函数
python">def main():app = QApplication(sys.argv)window = ModbusApp()window.show()sys.exit(app.exec_())if __name__ == '__main__':main()
main
函数:初始化并运行PyQt5应用程序。
3.4 运行代码
将上述代码保存为modbus_switch.py
,然后在终端或命令提示符中运行:
python modbus_switch.py
四、代码运行效果
运行程序后,您将看到一个简单的窗口,包含以下组件:
- 标签:提示用户选择量程模式。
- “设置大量程”按钮:点击后,程序将发送大量程指令。
- “设置小量程”按钮:点击后,程序将发送小量程指令。
- 状态标签:显示当前操作的状态信息。
- 消息框:在指令发送完成后,弹出消息框显示操作结果。
运行结果
点击切换大量程:
点击切换小量程:
与网络串口助手收的结果一致:
五、注意事项
- 指令正确性:确保发送的指令与设备的协议规范一致。根据您提供的指令,程序将发送特定的字节序列。如果设备有特定的响应格式,您可能需要根据实际情况解析响应。
- 网络配置:确保运行该程序的计算机与Modbus TCP服务器在同一网络或能够相互访问。检查防火墙设置,确保端口
6789
未被阻塞。 - 错误处理:程序中已包含基本的错误处理,如连接超时和Socket错误。根据需要,您可以扩展错误处理逻辑。
- 多线程:使用子线程发送指令,确保GUI的响应性。如果发送指令的时间较长,考虑进一步优化线程管理。
- 安全性:在生产环境中,考虑使用加密和身份验证机制,确保通信的安全性。
六、扩展功能建议
- 指令验证:在发送指令前,添加对指令格式和内容的验证,确保指令的合法性。
- 日志记录:添加日志功能,记录发送的指令和接收的响应,便于后续分析和故障排查。
- 配置选项:将Modbus服务器的IP和端口、指令等配置项外部化(如使用配置文件),提高程序的灵活性。
- 高级响应解析:根据设备的响应协议,解析并显示更多详细的信息,如错误码、执行状态等。
- 用户认证:在GUI中添加用户认证功能,确保只有授权用户可以操作量程切换。
七、结语
通过本文的介绍,您可以使用Python和PyQt5开发一个简单而有效的Modbus TCP量程切换工具。该工具通过发送特定的Modbus指令,实现设备量程的切换,并提供友好的用户界面和基本的错误处理。根据实际需求,您可以进一步扩展和优化该工具,以满足更复杂的应用场景。
如果在开发过程中遇到问题,建议参考以下资源:
- PyQt5官方文档
- Python
socket
库文档 - Modbus协议规范