一个简单的OPC UA/ModbusTCP 网关(Python)

news/2024/11/28 22:38:31/

        使用我前面几篇博文的内容,能够使用Python编写一个最简单的OPC UA /ModbusTCP网关。

从这个程序可以看出:

  1. 应用OPC UA 并不难,现在我们就可以应用到工程应用中,甚至DIY项目也可以。不必采用复杂的工具软件。
  2. 使用Python 来构建工业自动化领域的原型机程序是不错的选择。

OPCUA_modbus 网关

import sys
sys.path.insert(0, "..")
import time
from opcua import  Server
from pyModbusTCP.client import ModbusClient # Modbus TCP Client
from pyModbusTCP import utils 
if __name__ == "__main__":# setup our serverserver = Server()server.set_endpoint("opc.tcp://127.0.0.1:48400/freeopcua/server/")# setup our own namespace, not really necessary but should as specuri = "http://examples.freeopcua.github.io"idx = server.register_namespace(uri)# get Objects node, this is where we should put our nodesobjects = server.get_objects_node()# populating our address spacemyobj = objects.add_object(idx, "MyObject")myvar = myobj.add_variable(idx, "MyVariable", 6.7)myvar.set_writable()    # Set MyVariable to be writable by clientsModbusInterface = ModbusClient(host="localhost", port=502, unit_id=1, auto_open=True, auto_close=False) # starting!server.start()try:count = 0while True:time.sleep(1)reg_l=ModbusInterface.read_input_registers(0,2)val=utils.word_list_to_long(reg_l)print(utils.decode_ieee(val[0],False)) myvar.set_value(utils.decode_ieee(val[0],False))finally:#close connection, remove subcsriptions, etcserver.stop()

modbusTCP服务器程序

import argparse
from pyModbusTCP.server import ModbusServer, DataBank
from pyModbusTCP import utils
from datetime import datetimeimport numpy as np 
Fs = 8000
f = 50
x=0
coil_state=True 
class MyDataBank(DataBank):"""A custom ModbusServerDataBank for override get_holding_registers method."""def __init__(self):# turn off allocation of memory for standard modbus object types# only "holding registers" space will be replaced by dynamic build values.super().__init__(virtual_mode=True)def get_coils(self, address, number=1, srv_info=None):global coil_statecoil_state=not coil_statereturn coil_statedef get_holding_registers(self, address, number=1, srv_info=None):"""Get virtual holding registers."""# populate virtual registers dict with current datetime valuesnow = datetime.now()return now.seconddef get_input_registers(self, address, number=1, srv_info=None):global xwave=np.sin(2 * np.pi * f * x / Fs)*10x=x+1b32_l=[utils.encode_ieee(wave,False)]b16_l = utils.long_list_to_word(b32_l)print(b16_l)              return  b16_lif __name__ == '__main__':# parse argsparser = argparse.ArgumentParser()parser.add_argument('-H', '--host', type=str, default='localhost', help='Host (default: localhost)')parser.add_argument('-p', '--port', type=int, default=502, help='TCP port (default: 502)')args = parser.parse_args()# init modbus server and start itserver = ModbusServer(host=args.host, port=args.port, data_bank=MyDataBank())server.start()

        最后透过uaExpert 程序访问OPCUA Server。可以看到myVar 变量在变化。

 OPCUA 客户端 

import sys
sys.path.insert(0, "..")
import numpy as np 
import matplotlib.pyplot as plt
from opcua import Clientx = np.arange(0,1000,1,dtype=np.int16)
y=np.arange(-10,10,0.02,dtype=np.float32)
if __name__ == "__main__":client = Client("opc.tcp://localhost:48400/freeopcua/server/")# client = Client("opc.tcp://admin@localhost:4840/freeopcua/server/") #connect using a usertry:client.connect()# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objectsroot = client.get_root_node()print("Objects node is: ", root)# Node objects have methods to read and write node attributes as well as browse or populate address spaceprint("Children of root are: ", root.get_children())while True:myvar = root.get_child(["0:Objects", "2:MyObject", "2:MyVariable"])obj = root.get_child(["0:Objects", "2:MyObject"])print("myvar is: ", myvar.get_value())y=np.append(y,myvar.get_value())y=np.delete(y, 0, axis=0)      plt.clf()plt.plot(x, y, ls="-", lw=2, label="plot figure")plt.legend()plt.show()plt.pause(0.1)# Stacked myvar access# print("myvar is: ", root.get_children()[0].get_children()[1].get_variables()[0].get_value())finally:client.disconnect()

看出来了吧这些程序都短小精悍。编写程序是学习计算机网络协议最好的方法。


http://www.ppmy.cn/news/676613.html

相关文章

彻底清除Trojan.DL.Delf.cxw的方法

症状:瑞星监控经常检测到C:/%windows/%system32/brlmon.dll 感染Trojan.DL.Delf.cxw,病毒总是出现,查杀不干净。 解决方法: 1、启动计算机,按F8键进入系统安全模式; 2、鼠标双击打开“我的电脑”&#xff0…

简单的登录系统(Java版)

简单的登录系统 class Check{public boolean validate(String name, String password){return name.equals("cxw") && password.equals("msyq");} }class Operate{private final String[] info;public Operate(String[] info){this.infoinfo;}pub…

它来了,Flutter 应用内调试工具 UME 开源啦

作者:字节跳动终端技术 —— 赵瑞 先说重点 Pub 地址:https://pub.dev/packages/flutter_umeGitHub 地址:https://github.com/bytedance/flutter_ume 背景 字节跳动已有累计超过 70 款 App 使用了 Flutter 技术,公司内有超过 …

基于TransportClient整合

1、什么是ElasticSearch? ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业…

Java避免过多的if else

避免过多的if else 在Java 代码里会遇见很多的if else, 最近在坐企业微信回调处理的时候, 根据不同事件状态进行不同的业务, 状态大概有8个, 所以if else 也有8个, 特别不优雅, 所以整理一下常见的方法&…

字节跳动出品的 Flutter 应用内调试工具 UME 正式开源

2020 年 11 月,西瓜 Flutter 团队在字节跳动技术团队公众号上发布了 UME 工具的功能介绍和预览,今年 3 月 25 日的 Flutter Engage China 活动上,字节跳动 Flutter 技术负责人袁辉辉在 介绍 Flutter 业务的大规模落地的演讲 中也到了 UME 工具…

ArrayList使用详解

概述 ArrayList是Java中使用率很高的集合容器,面试频率也很高,本篇文章主要对ArrayList这个容器从功能到源码做一个深入解析,使用的是jdk8来讲解。 功能介绍 ArrayList是一个有顺序的容器,底层是一个数组,不过它是会…

CVPR2022车道线检测Efficient Lane Detection via Curve Modeling

分享前段时间看的一篇车道线检测方向的新工作,也是中了最近公开结果的2022CVPR,是上海交大、华东师大、香港城市大学和商汤科技合作完成的,代码已经开源。关于车道线检测任务,我之前也分享过几篇文章: 相关链接&#…