一.websocket简介及安装
使用pip命令安装websocket库:pip install websocket-client
websocket.WebSocketApp
是对 websocket.WebSocket
的封装,支持自动定时发送 PING 帧,支持事件驱动方式的数据帧接收,可用于长期的 WebSocket 连接。
websocket中就有建立连接connect、发送消息send等函数可供使用,但是websocket.WebSocketApp将这些都封装好了,只用在实例化的时候传入自定义函数即可,更方便。
WebSocketApp也是websocket中的一个类。要使用WebSocketApp中的回调函数需要传入一系列的可调用对象。在实例化该类时传入构造函数中的on_open、on_message、on_error就需要传入一系列的可调用对象,例如自定义的函数。
二.方法
运行WebSocketApp的事件循环,先创建webSocket对象,然后connect连接服务器,之后一直循环运行接收数据帧,回调对应函数处理数据帧;当websocket客户端被关闭后,将调用on_close()方法然后结束循环返回;当循环中发生异常时被捕捉,然后依次调用on_error(),on_close()方法,然后结束循环返回。
python"># -*- coding:utf-8 -*-import websocket
import time
import threading
import multiprocessing
from threadpool import ThreadPool, makeRequests"""
websocket长链接
"""# websocket地址
WS_URL = "ws://124.222.224.186:8800"
# 定义进程数
processes = 2
# 定义线程数(每个文件可能限制1024个,可以修改fs.file等参数)
thread_num = 3
# 拼接发送的消息
sendMsg = '{"appid":"futures","cover":0,"event":[\{"type":"exchange_rate","toggle":1,"expireTime":86400},\{"type":"accountInfo_USDT","toggle":1,"expireTime":'def on_message(ws, message):print("***message***", message)def on_error(ws, error):print("***error***", error)def on_close(ws, *args):print("***closed***")def on_open(ws):def send_thread():# 设置你websocket的内容# 每隔10秒发送一下数据使链接不中断while True:# ws.send('hello服务器' + str(time.time()))ws.send(sendMsg + str(time.time()) + '}]}')time.sleep(10)t = threading.Thread(target=send_thread)t.start()def on_start(num):time.sleep(5)ws = websocket.WebSocketApp(WS_URL,on_message=on_message,on_error=on_error,on_close=on_close)ws.on_open = on_openws.run_forever()def thread_web_socket():# 线程池pool_list = ThreadPool(thread_num)num = list()# 设置开启线程的数量for ir in range(thread_num):num.append(ir)requests = makeRequests(on_start, num)[pool_list.putRequest(req) for req in requests]pool_list.wait()if __name__ == "__main__":# 进程池pool = multiprocessing.Pool(processes=processes)# 设置开启进程的数量for i in range(processes):pool.apply_async(thread_web_socket)pool.close()pool.join()
2个进程运行,各运行3个线程,建立6个连接,每10秒发送一次数据: