基于python-socket构建任务服务器(基于socket发送指令创建、停止任务)

news/2024/12/23 2:25:32/

在实现ia业务服务器时需要构建一个python-socket客户端,1、要求能与服务器保持心跳连接,每10秒钟发送一次心跳信号;2、要求能根据socket服务器发送的指令创建或终止一个定时任务。
为此以3个类实现该功能,分别为socket通信类(用于实现通信连接与任务创建)、任务池类(用于管理任务)、任务类(用于实现具体任务)。

1、socket通信客户端

这里定义的MySocket类主体结构如下图所示,共包含4个函数,2个线程(其本身继承Thread类实现主任务流程——run函数、接收服务器信息并创建任务添加到任务池 或者接收服务器返回的心跳数据;同时又在__init__函数中将self.thread_msg类封装为一个线程,每隔10秒钟向socket服务器发送一次心跳包)。check_connection函数用于检测socket是否与服务器断开连接,在send_msg函数中调用,当发现客户端掉线后则立刻进行重连。send_msg函数用于发送信息给服务器,因为run函数与thread_msg函数2个线程都需要调用连接与服务器发送数据,为避免冲突故而定义为函数在内部进行加锁。这里的关键点在于,有多个线程可以发送数据(thread_msg与run线程),但是只有一个线程可以接收数据(run函数),单一线程接收数据可以避免服务器发送的数据存在冲突(两个线程同时接收数据就会存在死锁)
在这里插入图片描述

#socket客户端
class MySocket(Thread):def __init__(self,config):super().__init__()# 1.创建套接字self.tcp_socket = socket(AF_INET,SOCK_STREAM)self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护# 2.准备连接服务器,建立连接self.serve_ip = config["serve_ip"]#当前"118.24.111,149"self.serve_port = config["serve_port"]  #端口当前7900self.sleep_time = config["sleep_time"]print("connect to : ",self.serve_ip,self.serve_port)self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式self.lock = threading.RLock()self.taskpool=TaskPool()task_msg=threading.Thread(target=self.thread_msg)task_msg.daemon = Truetask_msg.start()#定时发送信息def run(self):while True:a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1ka=a.decode('utf-8')if len(a)<66:#此时的数据为服务器返回的心跳数据continueprint("------主线程-----",a)jdata=json.loads(a)#jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}task=OCRTask(jdata)self.taskpool.append(task)json_data={  "type":"OCR_STATE_ACK","timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"streamAddr": jdata["streamAddr"]}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)self.send_msg(data)def check_connection(self):try:self.tcp_socket.getpeername()return Trueexcept socket.error:return False#定时发送心跳信息def thread_msg(self):while True:#message=input('You can say:')#json标注的模板json_data={  "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"type":"HEARBEAT"}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)#进行定时发送self.send_msg(data)def send_msg(self,msg):if self.check_connection() is False:print('服务器掉线!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式try:#进行定时发送self.lock.acquire()self.tcp_socket.send(msg)self.lock.release()except ConnectionRefusedError:print('服务器拒绝本次连接!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式except TimeoutError:print('连接超时!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式except OSError:self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式print('智能终端无网络连接!!!!!')

2、任务池实现

任务池的实现代码如下所示,主要包含3个函数(其中将remove_task封装为一个子线程,用于实时移除已经完成计算任务的线程),append函数用于将新创建的任务添加大任务池pool中,stop函数用于停止并移除正在运行中的任务。
在这里插入图片描述
其具体实现代码如下所示,其作为MySocket类中的一个成员属性,每当MySocket接收到服务器信息创建任务ocrtask后都调用TaskPool.append(ocrtask)将任务添加到任务池中。由任务池管理任务的声明周期,具体可见其append函数可以启动task或终止task。remove_task线程会自动将已经完成的任务移除。

#ocr任务线程池
class TaskPool:def __init__(self,sleep_time=0.5):self.pool=[]self.sleep_time=sleep_timetask_msg=threading.Thread(target=self.remove_task)task_msg.daemon = Truetask_msg.start()#删除已经结束的任务def remove_task(self):while True:names=[]for task in self.pool:if task.get_count()==0: #生存时间为0,认为该任务已经完成需要被删除task.stop()self.pool.remove(task)else:names.append(task.taskname)if len(names)>0:print(names)time.sleep(self.sleep_time)def append(self,ocrtask):if ocrtask.state==0:#终止任务self.stop(ocrtask)else:#启动任务ocrtask.start()self.pool.append(ocrtask)#终止任务def stop(self,ocrtask):for task in self.pool:if task.taskname==ocrtask.taskname:task.stop()self.pool.remove(task)

3、具体任务线程

任务的实现代码如下所示,其支持3中任务模式,使用state区分任务,state为0-停止识别,1-连续识别count张,2-持续识别(故而在state为2时将count设置的特别大)。这里以count控制任务的运行,任务每运行一次count减少1。当count小于等于0,则表示任务运行完成。在TaskPool的remove_task中检测到count为0时则会自动删除任务。

#ocr任务
class OCRTask(Thread):def __init__(self,json):super().__init__()self.streamAddr=json["streamAddr"]self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别if json["state"]==2:self.count=9999999999999999999999999else:self.count=json["count"]if "taskname" in json.keys():self.taskname=json["taskname"]else:self.taskname=json["streamAddr"]self.jsonname=json["jsonname"]self.lock = threading.RLock()def run(self):while self.get_count()>0:print('run %s'%self.taskname,end='*')time.sleep(2)self.lock.acquire()self.count-=1self.lock.release()print('%s finish!! '%self.taskname)#获取任务的生存时间def get_count(self):self.lock.acquire()now_count=self.countself.lock.release()#削减countreturn now_count#停止任务def stop(self):self.lock.acquire()self.count=-1self.lock.release()#停止任务pass

4、完整代码与使用效果

完整代码如下所示

from socket import *
import time,json
import yaml
import threading,struct
from threading import Threaddef hex_to_bytes(hex_str):""":param hex_str: 16进制字符串:return: byte_data 字节流数据"""bytes_data = bytes()while hex_str :"""16进制字符串转换为字节流"""temp = hex_str[0:2]s = int(temp, 16)bytes_data += struct.pack('B', s)hex_str = hex_str[2:]return bytes_data# 读取Yaml文件方法
def read_yaml(yaml_path):with open(yaml_path, encoding="utf-8", mode="r") as f:result = yaml.load(stream=f,Loader=yaml.FullLoader)return result#ocr任务
class OCRTask(Thread):def __init__(self,json):super().__init__()self.streamAddr=json["streamAddr"]self.state=json["state"] # 0-停止识别,1-连续识别count张,2-持续识别if json["state"]==2:self.count=9999999999999999999999999else:self.count=json["count"]if "taskname" in json.keys():self.taskname=json["taskname"]else:self.taskname=json["streamAddr"]self.jsonname=json["jsonname"]self.lock = threading.RLock()def run(self):while self.get_count()>0:print('run %s'%self.taskname,end='*')time.sleep(2)self.lock.acquire()self.count-=1self.lock.release()print('%s finish!! '%self.taskname)#获取任务的生存时间def get_count(self):self.lock.acquire()now_count=self.countself.lock.release()#削减countreturn now_count#停止任务def stop(self):self.lock.acquire()self.count=-1self.lock.release()#停止任务pass#ocr任务线程池
class TaskPool:def __init__(self,sleep_time=0.5):self.pool=[]self.sleep_time=sleep_timetask_msg=threading.Thread(target=self.remove_task)task_msg.daemon = Truetask_msg.start()#删除已经结束的任务def remove_task(self):while True:names=[]for task in self.pool:if task.get_count()==0:task.stop()self.pool.remove(task)else:names.append(task.taskname)if len(names)>0:print(names)time.sleep(self.sleep_time)def append(self,ocrtask):if ocrtask.state==0:#终止任务self.stop(ocrtask)else:#启动任务ocrtask.start()self.pool.append(ocrtask)#终止任务def stop(self,ocrtask):for task in self.pool:if task.taskname==ocrtask.taskname:task.stop()self.pool.remove(task)#socket客户端
class MySocket(Thread):def __init__(self,config):super().__init__()# 1.创建套接字self.tcp_socket = socket(AF_INET,SOCK_STREAM)self.tcp_socket.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1) #在客户端开启心跳维护# 2.准备连接服务器,建立连接self.serve_ip = config["serve_ip"]#当前"118.24.111,149"self.serve_port = config["serve_port"]  #端口当前7900self.sleep_time = config["sleep_time"]print("connect to : ",self.serve_ip,self.serve_port)self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式self.lock = threading.RLock()self.taskpool=TaskPool()task_msg=threading.Thread(target=self.thread_msg)task_msg.daemon = Truetask_msg.start()#定时发送信息#通信线程-用于接收服务器的指令def run(self):while True:a=self.tcp_socket.recv(1024)#接受服务端的信息,最大数据为1ka=a.decode('utf-8')if len(a)<66:#服务器返回的心跳包,不予处理continueprint("------主线程-----",a)jdata=json.loads(a)#jdata={"streamAddr":"rtmp://adasdasdxcvsdfj.sdfdsfsd","state":1,"count":5,"taskname":"aaa","jsonname":"a.json"}task=OCRTask(jdata)self.taskpool.append(task)json_data={  "type":"OCR_STATE_ACK","timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"streamAddr": jdata["streamAddr"]}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)self.send_msg(data)#检测socket连接是否断开def check_connection(self):try:self.tcp_socket.getpeername()return Trueexcept socket.error:return False#定时发送心跳信息--子线程def thread_msg(self):while True:#message=input('You can say:')#json标注的模板json_data={  "timestamp": int(time.time()*10),#时间戳放大一位和格式要求的长度保持一致"type":"HEARBEAT"}#print( json_data)message = json.dumps(json_data)data='{:08X}'.format(len(message))+message.encode('utf-8').hex().upper()data=hex_to_bytes(data)#进行定时发送self.send_msg(data)#发送信息def send_msg(self,msg):if self.check_connection() is False:print('服务器掉线!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式try:#进行定时发送self.lock.acquire()self.tcp_socket.send(msg)self.lock.release()except ConnectionRefusedError:print('服务器拒绝本次连接!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式except TimeoutError:print('连接超时!!!!!')self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式except OSError:self.tcp_socket.connect((self.serve_ip,self.serve_port))  # 连接服务器,建立连接,参数是元组形式print('智能终端无网络连接!!!!!')if "__main__"==__name__:#进行定时通信测试config=read_yaml("config.yaml")socket_client=MySocket(config)socket_client.start()

使用效果如下所示,这里基于socket调试工具作为客户端

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


http://www.ppmy.cn/news/1365017.html

相关文章

【算法与数据结构】1971、LeetCode寻找图中是否存在路径

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;本题应用并查集的理论直接就可以解决&#xff1a;【算法与数据结构】回溯算法、贪心算法、动态规划、图…

【Java EE初阶二十六】简单的表白墙(二)

2. 后端服务器部分 2.1 服务器分析 2.2 代码编写 2.2.2 前端发起一个ajax请求 2.2.3 服务器读取上述请求,并计算出响应 服务器需要使用 jackson 读取到前端这里的数据,并且进行解析&#xff1a; 代码运行图&#xff1a; 2.2.4 回到前端代码&#xff0c;处理服务器返回的响应…

kubectl 声明式资源管理方式

目录 介绍 YAML 语法格式 命令 应用yaml文件指定的资源 删除yaml文件指定的资源 查看资源的yaml格式信息 查看yaml文件字段说明 查看 api 资源版本标签 修改yaml文件指定的资源 离线修改 在线修改 编写yaml文件 创建资源对象 查看创建的pod资源 创建service服务对…

【数据结构】从链表到LinkedList类

&#x1f9e7;&#x1f9e7;&#x1f9e7;&#x1f9e7;&#x1f9e7;个人主页&#x1f388;&#x1f388;&#x1f388;&#x1f388;&#x1f388; &#x1f9e7;&#x1f9e7;&#x1f9e7;&#x1f9e7;&#x1f9e7;数据结构专栏&#x1f388;&#x1f388;&#x1f388;&…

机器视觉运动控制一体机在光伏汇流焊机器人系统的解决方案

一、市场应用背景 汇流焊是光伏太阳能电池板中段加工工艺&#xff0c;其前道工序为串焊&#xff0c;在此环节流程中&#xff0c;需要在多个太阳能电池片表面以平行方式串焊多条焊带&#xff0c;形成电池串。串焊好的多组电池串被有序排列输送到汇流焊接工作台&#xff0c;通过…

低功耗设计——门控时钟

1. 前言 芯片功耗组成中&#xff0c;有高达40%甚至更多是由时钟树消耗掉的。这个结果的原因也很直观&#xff0c;因为这些时钟树在系统中具有最高的切换频率&#xff0c;而且有很多时钟buffer&#xff0c;而且为了最小化时钟延时&#xff0c;它们通常具有很高的驱动强度。此外&…

在Redhat 7 Linux上安装llama.cpp

正常安装的错误信息 安装 gcc 和gcc-c 之后&#xff0c;你运行Make 命令编译llama.cpp的时候&#xff0c;你会发现下面问题。 yum -y install gcc --nogpgcheckyum -y install gcc-c --nogpgcheck 错误信息&#xff0c; 因为gcc 的版本是4.8 cc -I. -Icommon -D_XOPEN_SOU…

Rocky Linux 运维工具yum

一、yum的简介 ​​yum​是用于在基于RPM包管理系统的包管理工具。用户可以通过 ​yum​来搜索、安装、更新和删除软件包&#xff0c;自动处理依赖关系&#xff0c;方便快捷地管理系统上的软件。 二、yum的参数说明 1、install 用于在系统的上安装一个或多个软件包 2、seach 用…