股票量化实时行情接口WebSocket接入Python封装

news/2024/11/13 1:21:14/

Python做量化,如果是日内策略,需要更实时的行情数据,不然策略滑点太大,容易跑偏结果。

之前用行情网站提供的level1行情接口,实测平均更新延迟达到了6秒,超过10只股票并发请求频率过快很容易封IP。后面又尝试了买代理IP来请求,成本太高而且不稳定。

在Github上看到一个可转债的Golang高频T+0策略,对接的是WebSocket协议,拿来改了改,封装了一个Python版本的包,记录一下:

#!python3
# -*- coding:utf-8 -*-
import time
import datetime
import websocket
import zlib
import requests
import threading# 行情订阅推送封装
class Construct:__token = ""__server_req_url = "http://jvquant.com/query/server?market=ab&type=websocket&token="__ws_ser_addr = ""__ws_conn = ""__lv1_field = ["time", "name", "price", "ratio", "volume", "amount", "b1", "b1p", "b2", "b2p", "b3", "b3p", "b4","b4p", "b5", "b5p", "s1", "s1p", "s2", "s2p", "s3", "s3p", "s4", "s4p", "s5", "s5p"]__lv2_field = ["time", "oid", "price", "vol"]def __init__(self, logHandle, token, onRevLv1, onRevLv2):if logHandle == "" or token == "" or onRevLv1 == "" or onRevLv2 == "":msg = "行情初始化失败:logHandle或token或onRevLv1或onRevLv2必要参数缺失。"print(msg)exit(-1)self.__log = logHandleself.__token = tokenself.__deal_lv1 = onRevLv1self.__deal_lv2 = onRevLv2self.__getSerAddr()self.__conn_event = threading.Event()self.th_handle = threading.Thread(target=self.__conn)self.th_handle.start()self.__conn_event.wait()def __getSerAddr(self):url = self.__server_req_url + self.__tokentry:res = requests.get(url=url)except Exception as e:self.__log(e)returnif (res.json()["code"] == "0"):self.__ws_ser_addr = res.json()["server"]print("获取行情服务器地址成功:" + self.__ws_ser_addr)else:msg = "获取行情服务器地址失败:" + res.textself.__log(msg)exit(-1)def __conn(self):wsUrl = self.__ws_ser_addr + "?token=" + self.__tokenself.__ws_conn = websocket.WebSocketApp(wsUrl,on_open=self.__on_open,on_data=self.__on_message,on_error=self.__on_error,on_close=self.__on_close)self.__ws_conn.run_forever()self.__conn_event.set()self.__log("ws thread exited")def addLv1(self, codeArr):cmd = "add="lv1Codes = []for code in codeArr:lv1Codes.append("lv1_" + code)cmd = cmd + ",".join(lv1Codes)self.__log("cmd:" + cmd)self.__ws_conn.send(cmd)def addLv2(self, codeArr):cmd = "add="lv1Codes = []for code in codeArr:lv1Codes.append("lv2_" + code)cmd = cmd + ",".join(lv1Codes)self.__log("cmd:" + cmd)self.__ws_conn.send(cmd)def dealLv1(self, data):self.__deal_lv1(data)def dealLv2(self, data):self.__deal_lv1(data)def __on_open(self, ws):self.__conn_event.set()self.__log("行情连接已创建")def __on_error(self, ws, error):self.__log("行情处理error:", error)def __on_close(self, ws, code, msg):self.__log("行情服务未连接")def close(self):self.__ws_conn.close()def __on_message(self, ws, message, type, flag):# 命令返回文本消息if type == websocket.ABNF.OPCODE_TEXT:self.__log("Text响应:" + message)# 行情推送压缩二进制消息,在此解压缩if type == websocket.ABNF.OPCODE_BINARY:now = datetime.datetime.now()nStamp = time.mktime(now.timetuple())date = now.strftime('%Y-%m-%d')rb = zlib.decompress(message, -zlib.MAX_WBITS)text = rb.decode("utf-8")# self.__log("Binary响应:" + text)ex1 = text.split("\n")for e1 in ex1:ex2 = e1.split("=")if len(ex2) != 2:continuecode = ex2[0]hqs = ex2[1]if code.startswith("lv1_"):code = code.replace("lv1_", "")hq = hqs.split(",")if len(hq) == len(self.__lv1_field):hqMap = dict(zip(self.__lv1_field, hq))timeStr = hqMap['time']date_obj = datetime.datetime.strptime(date + ' ' + timeStr, '%Y-%m-%d %H:%M:%S')tStamp = int(time.mktime(date_obj.timetuple()))if abs(tStamp - nStamp) <= 2:self.__deal_lv1(code, hqMap)if code.startswith("lv2_"):code = code.replace("lv2_", "")hqArr = hqs.split("|")for hq in hqArr:hqEx = hq.split(",")if len(hqEx) == len(self.__lv2_field):hqMap = dict(zip(self.__lv2_field, hqEx))timeEx = hqMap['time'].split('.')if len(timeEx) == 2:timeStr = timeEx[0]date_obj = datetime.datetime.strptime(date + ' ' + timeStr, '%Y-%m-%d %H:%M:%S')tStamp = int(time.mktime(date_obj.timetuple()))if abs(tStamp - nStamp) <= 2:self.__deal_lv2(code, hqMap)

引用地址:https://github.com/freevolunteer/bondTrader/blob/main/pyscript/jvUtil/HanqQing.py

订阅指令参考:https://jvquant.com/wiki.html#--9

原文地址:https://zhuanlan.zhihu.com/p/6059899873


http://www.ppmy.cn/news/1546006.html

相关文章

漫谈分布式唯一ID

文章目录 本系列前言UUIDDB自增主键Redis incr命令号段模式雪花算法 本系列 漫谈分布式唯一ID&#xff08;本文&#xff09;分布式唯一ID生成&#xff08;二&#xff09;&#xff1a;leaf分布式唯一ID生成&#xff08;三&#xff09;&#xff1a;uid-generator分布式唯一ID生成…

解析Eureka的架构

1. 引言 1.1 Eureka的定义与背景 Eureka是由Netflix开发的一个RESTful服务&#xff0c;用于服务发现。它是微服务架构中的一个核心组件&#xff0c;主要用于管理服务的注册和发现。Eureka允许服务提供者注册自己的服务信息&#xff0c;同时也允许服务消费者查询可用的服务&am…

数据结构之排序--选择排序详解

选择排序 每一次从待排序的数据元素中选出最小&#xff08;或最大&#xff09;的一个元素&#xff0c;存放在序列的起始位置&#xff0c;直到全部待排序的数据元素排完 。 直接选择排序: 在元素集合 array[i]--array[n-1] 中选择关键码最大 ( 小 ) 的数据元素 若它不是这组元…

React native Text Webview 处理字体大小的变化

If the user has set a custom font size in the Android system, an undesirable scale of the site interface in WebView occurs. 如果用户在Android系统中设置了自定义字体大小&#xff0c;会导致WebView中的站点界面出现不良比例 When setting the standard textZoom (100…

前端学习Day12 CSS盒子的定位(相对定位篇“附练习”)

一、相对定位 使用相对定位的盒子会相对于自身原本的位置&#xff0c;通过偏移指定的距离&#xff0c;到达新的位置。盒子的本体仍处于文档流中。使用相对定位&#xff0c;除了要将 position 属性值设置为 relative 外&#xff0c;还需要指定一定的偏移量。其中&#xff0c;水…

【freertos】FreeRTOS任务管理

FreeRTOS任务管理 一、任务的创建和删除1、函数xTaskCreate2、函数xTaskCreateStatic3、函数xTaskCreateRestricted4、函数vTaskDelete 二、任务的挂起和恢复1、函数vTaskSuspend2、函数vTaskResume3、函数vTaskResumeFromISR4、函数vTaskSuspendAll5、函数xTaskResumeAll 三、…

Vite与Vue Cli的区别与详解

它们的功能非常相似&#xff0c;都是提供基本项目脚手架和开发服务器的构建工具。 主要区别 Vite在开发环境下基于浏览器原生ES6 Modules提供功能支持&#xff0c;在生产环境下基于Rollup打包&#xff1b; Vue Cli不区分环境&#xff0c;都是基于Webpack。 在生产环境下&…

C#:强大而优雅的编程语言

在当今的软件开发领域&#xff0c;C#作为一种广泛应用的编程语言&#xff0c;以其强大的功能、优雅的语法和丰富的生态系统&#xff0c;受到了众多开发者的喜爱。本文将深入探讨 C#的各个方面&#xff0c;展示它的魅力和优势。 一、C#的历史与发展 C#是由微软公司开发的一种面…