性能测试 - Locust WebSocket client

devtools/2025/1/16 14:18:20/

Max.Bai

2024.10

0. 背景

Locust性能测试工具,但是默认只支持http协议,就是默认只有http的client,需要其他协议的测试必须自己扩展对于的client,比如下面的WebSocket client。

1. WebSocket test Client
python">“”“
Max.Bai
Websocket Client
”“”
import json
import logging
import secrets
import threading
import time
from typing import Callable, Optionalimport websocket
from locust import eventslogger = logging.getLogger(__name__)class WebSocketClient:def __init__(self, host: str, log_messages: bool = False):self._host: str = hostself._id: str = secrets.token_hex(8)self._alias: Optional[str] = Noneself._ws: Optional[websocket.WebSocketApp] = Noneself.log_messages = log_messagesself.count_recv_type = Falseself.heartbeat_auto_respond = Falseself._recv_messages: list = []self.messages: list = []self._sent_messages: list = []def __enter__(self):self.connect()return selfdef __exit__(self, type, value, traceback):self.disconnect()@propertydef tag(self) -> str:tag = f"{self._host} <{self._id}>"if self._alias:tag += f"({self._alias})"return tagdef connect(self, alias: Optional[str] = None, headers: Optional[dict] = None, on_message: Optional[Callable] = None):if not self._ws:self._alias = aliasself._ws = websocket.WebSocketApp(url=self._host,header=headers,on_open=self._on_open,on_message=on_message if on_message else self._on_message,on_close=self._on_close,)thread = threading.Thread(target=self._ws.run_forever)thread.daemon = Truethread.start()time.sleep(3)else:logger.warning("An active WebSocket connection is already established.")def is_connected(self) -> bool:return self._ws is not Nonedef disconnect(self):if self._ws:self._ws.close()self._alias = Noneelse:logger.warning("No active WebSocket connection established.")def _on_open(self, ws):logger.debug(f"[WebSocket] {self.tag} connected.")events.request.fire(request_type="ws_client",name="connect",response_time=0,response_length=0,)def _on_message(self, ws, message):recv_time = time.time()recv_time_ms = int(recv_time * 1000)recv_time_ns = int(recv_time * 1000000)logger.debug(f"[WebSocket] {self.tag} message received: {message}")if self.log_messages:self._recv_messages.append(message)self.messages.append(message)# public/respond-heartbeatif self.heartbeat_auto_respond:if "public/heartbeat" in message:self.send(message.replace("public/heartbeat", "public/respond-heartbeat"))if self.count_recv_type:try:msg = json.loads(message)id = str(msg.get("id", 0))if len(id) == 13:resp_time = recv_time_ms - int(id)elif len(id) == 16:resp_time = (recv_time_ns - int(id)) / 1000elif len(id) > 13:resp_time = recv_time_ms - int(id[:13])else:resp_time = 0method = msg.get("method", "unknown")code = msg.get("code", "unknown")error = msg.get("message", "unknown")# send_time = int(msg.get("nonce", 0))if method in ["public/heartbeat", "private/set-cancel-on-disconnect"]:events.request.fire(request_type="ws_client",name=f"recv {method}",response_time=0,response_length=len(msg),)elif code == 0:events.request.fire(request_type="ws_client",name=f"recv {method} {code}",# response_time=recv_time - send_time,response_time=resp_time,response_length=len(msg),)else:events.request.fire(request_type="ws_client",name=f"recv {method} {code}",response_time=resp_time,response_length=len(msg),exception=error,)except Exception as e:events.request.fire(request_type="ws_client",name="recv error",response_time=0,response_length=len(msg),exception=str(e),)def _on_close(self, ws, close_status_code, close_msg):logger.debug(f"[WebSocket] {self.tag} closed.")self._ws = Noneevents.request.fire(request_type="ws_client",name="close",response_time=0,response_length=0,)def set_on_message(self, on_message: Callable):self._ws.on_message = on_messagedef send(self, message: str):if self._ws:self._ws.send(data=message)if self.log_messages:self._sent_messages.append(message)logger.debug(f"[WebSocket] {self.tag} message sent: {message}")else:logger.warning(f"No active [WebSocket] {self.tag} connection established.")raise ConnectionError("No active [WebSocket] connection established.")def clear(self):self._recv_messages = []self._sent_messages = []self.messages = []def expect_messages(self,matcher: Callable[..., bool],count: int = 1,timeout: int = 10,interval: int = 1,) -> list:"""Expect to receive one or more filtered messages.Args:matcher (Callable): A matcher function used to filter the received messages.count (int, optional): Number of messages to be expected before timeout. Defaults to 1.timeout (int, optional): Timeout in seconds. Defaults to 10.interval (int, optional): Interval in seconds. Defaults to 1.Returns:list: A list of messages filtered by the matcher."""deadline: float = time.time() + timeoutresult: list = []  # messages filtered by the matcherseen: list = []  # messages already seen by the matcher to be excluded from further matchingwhile time.time() < deadline:snapshot: list = [*self._recv_messages]for element in seen:if element in snapshot:snapshot.remove(element)result.extend(filter(matcher, snapshot))if len(result) >= count:breakseen.extend(snapshot)time.sleep(interval)if len(result) < count:logger.warning(f"({self.tag}) Expected to receive {count} messages, but received only {len(result)} messages.")return result
2. 如何使用
python">class PrivateWsUser(User):def on_start(self):self.ws_client=WebSocketClient("wss://abc.pp.com/chat", log_message=True)self.ws_client.connect()@taskdef send_hello()self.ws_client.send("hello world")
3. 扩展

可自行扩展on_message 方法,上面的on_message 方法是json 格式的信息处理


http://www.ppmy.cn/devtools/150972.html

相关文章

模之屋模型导入到UE5

去模之屋随便下个模型 安装Blender2.8 插件 cats-blender-plugin &#xff0c; 打开blender 2.8转换 pmx转换fbx https://github.com/absolute-quantum/cats-blender-plugin Index of /release/Blender2.80/ 修改单位 修复贴图 更高清了 点fix model 修复模型 改为编辑模式…

mysql5.7+实现分组排行实现分组排行,自动序列,分组求最值

在做导出时&#xff0c;遇到一个根据价格最低数统计&#xff0c;所以用到了序号排行&#xff0c;mysql 8.0以上版本支持窗口函数&#xff0c;8.0可以参看我另一篇文章 一下总结5.7怎么实现一下几种函数&#xff1a; 1、根据分组的结果是每一行记录生成一个序号&#xff0c;每…

django在线考试系统

Django在线考试系统是一种基于Django框架开发的在线考试平台&#xff0c;它提供了完整的在线考试解决方案。 一、系统概述 Django在线考试系统旨在为用户提供便捷、高效的在线考试环境&#xff0c;满足教育机构、企业、个人等不同场景下的考试需求。通过该系统&#xff0c;用…

504 Gateway Timeout:网关超时解决方法

一、什么是 504Gateway Timeout&#xff1f; 1. 错误定义 504 Gateway Timeout 是 HTTP 状态码的一种&#xff0c;表示网关或代理服务器在等待上游服务器响应时超时。通俗来说&#xff0c;这是服务器之间“对话失败”导致的。 2. 常见触发场景 Nginx 超时&#xff1a;反向代…

72_List列表原理

1.List列表介绍 在Redis的List数据类型中,元素以字符串形式存在,并按照它们被插入的顺序进行有序排列。List允许元素重复,即相同元素可以被多次添加到列表中。每个List的容量上限为2的32次方减1,,也就是4294967295个元素。我们可以添加一个新元素到List列表的头部(左边)…

Plyr 在 vue3 中的使用

1. 安装 Plyr pnpm add plyr 2. 引入 Plyr 在需要使用 Plyr 的 Vue 组件中&#xff0c;引入 Plyr 的 CSS 和 JavaScript 文件。 import "plyr/dist/plyr.css";import Plyr from "plyr"; 3. 在模板中使用 Plyr 在 Vue 组件的<template>中&#…

链家房价数据爬虫和机器学习数据可视化预测

完整源码项目包获取→点击文章末尾名片&#xff01;

征服Windows版nginx(2)

1.配置Nginx 编辑Nginx的配置文件&#xff08;通常是nginx.conf&#xff09;&#xff0c;找到安装Nginx位置&#xff0c;如下路径&#xff1a; D:\nginx-1.26.2\conf 双击打开nginx.CONF编辑&#xff0c;在http块中添加一个新的server块&#xff0c;用于指定Vue项目的静态文件…