Max.Bai
2024.10
0. 背景
Locust 是性能测试工具,但是默认只支持http协议,就是默认只有http的client,需要其他协议的测试必须自己扩展对于的client,比如下面的WebSocket client。
1. WebSocket test Client
python">“”“
Max.Bai
Websocket Client
”“”
import json
import logging
import secrets
import threading
import time
from typing import Callable, Optionalimport websocket
from locust import eventslogger = logging.getLogger(__name__)class WebSocketClient:def __init__(self, host: str, log_messages: bool = False):self._host: str = hostself._id: str = secrets.token_hex(8)self._alias: Optional[str] = Noneself._ws: Optional[websocket.WebSocketApp] = Noneself.log_messages = log_messagesself.count_recv_type = Falseself.heartbeat_auto_respond = Falseself._recv_messages: list = []self.messages: list = []self._sent_messages: list = []def __enter__(self):self.connect()return selfdef __exit__(self, type, value, traceback):self.disconnect()@propertydef tag(self) -> str:tag = f"{self._host} <{self._id}>"if self._alias:tag += f"({self._alias})"return tagdef connect(self, alias: Optional[str] = None, headers: Optional[dict] = None, on_message: Optional[Callable] = None):if not self._ws:self._alias = aliasself._ws = websocket.WebSocketApp(url=self._host,header=headers,on_open=self._on_open,on_message=on_message if on_message else self._on_message,on_close=self._on_close,)thread = threading.Thread(target=self._ws.run_forever)thread.daemon = Truethread.start()time.sleep(3)else:logger.warning("An active WebSocket connection is already established.")def is_connected(self) -> bool:return self._ws is not Nonedef disconnect(self):if self._ws:self._ws.close()self._alias = Noneelse:logger.warning("No active WebSocket connection established.")def _on_open(self, ws):logger.debug(f"[WebSocket] {self.tag} connected.")events.request.fire(request_type="ws_client",name="connect",response_time=0,response_length=0,)def _on_message(self, ws, message):recv_time = time.time()recv_time_ms = int(recv_time * 1000)recv_time_ns = int(recv_time * 1000000)logger.debug(f"[WebSocket] {self.tag} message received: {message}")if self.log_messages:self._recv_messages.append(message)self.messages.append(message)# public/respond-heartbeatif self.heartbeat_auto_respond:if "public/heartbeat" in message:self.send(message.replace("public/heartbeat", "public/respond-heartbeat"))if self.count_recv_type:try:msg = json.loads(message)id = str(msg.get("id", 0))if len(id) == 13:resp_time = recv_time_ms - int(id)elif len(id) == 16:resp_time = (recv_time_ns - int(id)) / 1000elif len(id) > 13:resp_time = recv_time_ms - int(id[:13])else:resp_time = 0method = msg.get("method", "unknown")code = msg.get("code", "unknown")error = msg.get("message", "unknown")# send_time = int(msg.get("nonce", 0))if method in ["public/heartbeat", "private/set-cancel-on-disconnect"]:events.request.fire(request_type="ws_client",name=f"recv {method}",response_time=0,response_length=len(msg),)elif code == 0:events.request.fire(request_type="ws_client",name=f"recv {method} {code}",# response_time=recv_time - send_time,response_time=resp_time,response_length=len(msg),)else:events.request.fire(request_type="ws_client",name=f"recv {method} {code}",response_time=resp_time,response_length=len(msg),exception=error,)except Exception as e:events.request.fire(request_type="ws_client",name="recv error",response_time=0,response_length=len(msg),exception=str(e),)def _on_close(self, ws, close_status_code, close_msg):logger.debug(f"[WebSocket] {self.tag} closed.")self._ws = Noneevents.request.fire(request_type="ws_client",name="close",response_time=0,response_length=0,)def set_on_message(self, on_message: Callable):self._ws.on_message = on_messagedef send(self, message: str):if self._ws:self._ws.send(data=message)if self.log_messages:self._sent_messages.append(message)logger.debug(f"[WebSocket] {self.tag} message sent: {message}")else:logger.warning(f"No active [WebSocket] {self.tag} connection established.")raise ConnectionError("No active [WebSocket] connection established.")def clear(self):self._recv_messages = []self._sent_messages = []self.messages = []def expect_messages(self,matcher: Callable[..., bool],count: int = 1,timeout: int = 10,interval: int = 1,) -> list:"""Expect to receive one or more filtered messages.Args:matcher (Callable): A matcher function used to filter the received messages.count (int, optional): Number of messages to be expected before timeout. Defaults to 1.timeout (int, optional): Timeout in seconds. Defaults to 10.interval (int, optional): Interval in seconds. Defaults to 1.Returns:list: A list of messages filtered by the matcher."""deadline: float = time.time() + timeoutresult: list = [] # messages filtered by the matcherseen: list = [] # messages already seen by the matcher to be excluded from further matchingwhile time.time() < deadline:snapshot: list = [*self._recv_messages]for element in seen:if element in snapshot:snapshot.remove(element)result.extend(filter(matcher, snapshot))if len(result) >= count:breakseen.extend(snapshot)time.sleep(interval)if len(result) < count:logger.warning(f"({self.tag}) Expected to receive {count} messages, but received only {len(result)} messages.")return result
2. 如何使用
python">class PrivateWsUser(User):def on_start(self):self.ws_client=WebSocketClient("wss://abc.pp.com/chat", log_message=True)self.ws_client.connect()@taskdef send_hello()self.ws_client.send("hello world")
3. 扩展
可自行扩展on_message 方法,上面的on_message 方法是json 格式的信息处理