测试集群内主机链路带宽并生成网络拓扑图
- 一、背景
- 二、操作步骤
- 1.修改 Docker 源
- 2.启动 Redis 服务
- 3.创建 Python 3.10 容器
- 4.在容器内安装依赖
- 5.创建删除 Redis 集合的脚本 `reset.py`
- 6.创建发布本地 IP 的脚本 `publish.py`
- 7.创建带宽测试的脚本 `benchmark.py`
- 8.创建生成网络拓扑图的脚本 `summary.py`
- 8.创建自动化脚本 `auto.py`
- 9.提交 Docker 镜像
- 10.重新进入 `net_scan` 容器,运行自动化脚本
一、背景
我们希望生成集群内主机之间每条链路的带宽图。方案如下:
- 通过 Redis 共享数据
- 枚举所有的网卡,向 Redis 发布自己的 IP,监听请求,用于客户端测试发送带宽
- 从 Redis 接收已发布的 IP 并记录,尝试连接,如果连接成功则发送 2MB 的数据,测试发送带宽
- 将测试记录(主机名、源 IP、目的 IP、发送带宽)发送到 Redis 服务器
- 使用独立的 Python 程序读取 Redis 上的测试记录,生成网络拓扑图,节点为主机名,边为某个网卡对另一个节点的带宽
二、操作步骤
1.修改 Docker 源
sudo tee /etc/docker/daemon.json <<EOF
{"registry-mirrors":["https://docker.1ms.run","https://docker.xuanyuan.me"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker
2.启动 Redis 服务
docker run -d --rm -p 6379:6379 redis:6.2
3.创建 Python 3.10 容器
docker run --shm-size=32g -it --privileged --net=host -v $PWD:/home -w /home --name net_scan python:3.10 /bin/bash
4.在容器内安装依赖
pip install redis
pip install psutil
pip install paramiko
pip install netifaces
pip install networkx
pip install matplotlib
apt install dmidecode -y
5.创建删除 Redis 集合的脚本 reset.py
cat > /mnt/reset.py <<-'EOF'
import redis
import sys
# Connect to the Redis server whose address is the first command-line argument.
r = redis.Redis(host=sys.argv[1], port=6379)
# DEL silently ignores keys that do not exist, so both keys can be removed in
# one round trip without probing for them first.
r.delete('host_info', 'test_results')
print("reset end")
EOF
6.创建发布本地 IP 的脚本 publish.py
cat > /mnt/publish.py <<-'EOF'
import redis
import sys
import socket
import redis
import netifaces
import threading
import subprocess
import os


def gethostname():
    """Return the name under which this node publishes itself.

    The ``NODENAME`` environment variable takes precedence when it is set;
    otherwise the baseboard serial number reported by ``dmidecode`` is used.
    """
    node_name = os.environ.get('NODENAME')
    if node_name is not None:
        # Return early so a node with NODENAME set does not depend on
        # dmidecode being installed and runnable.
        return node_name
    serial_number = subprocess.check_output(
        ["dmidecode", "-s", "baseboard-serial-number"])
    return serial_number.decode().strip()


def get_local_ips():
    """Return the IPv4 addresses of every non-loopback, non-virtual interface."""
    virtual_prefixes = ('docker', 'br-', 'veth', 'virbr', 'vnet', 'vmnet', 'vboxnet')
    ips = []
    for iface in netifaces.interfaces():
        # Skip the loopback interface and well-known virtual interfaces.
        if iface == 'lo' or iface.startswith(virtual_prefixes):
            continue
        addrs = netifaces.ifaddresses(iface)
        for addr_info in addrs.get(netifaces.AF_INET, []):
            ips.append(addr_info['addr'])
    return ips


def publish_host_info(redis_client, hostname, ips):
    """Add this host's name and IP list to the Redis set ``host_info``.

    The record is stored as the ``str()`` of a dict so that the benchmark
    script can read it back with ``ast.literal_eval``.
    """
    host_info = {'hostname': hostname, 'ips': ips}
    redis_client.sadd('host_info', str(host_info))


def start_server(ip, port):
    """Listen on ``ip:port`` forever, handing each connection to a new thread."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow an immediate rebind when the publisher is restarted.
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((ip, port))
    server.listen(5)
    print(f"服务器在 {ip}:{port} 上监听中...")
    while True:
        client_socket, addr = server.accept()
        threading.Thread(target=handle_client, args=(client_socket, addr)).start()


def handle_client(client_socket, addr):
    """Read and discard everything the client sends, then close the socket.

    ``addr`` is accepted for symmetry with ``socket.accept()`` but not used.
    """
    # The context manager closes the socket even if recv() raises.
    with client_socket:
        while True:
            data = client_socket.recv(2 * 1024 * 1024)  # read up to 2 MB per call
            if not data:
                break


def main():
    """Publish this host to Redis and start one sink server per local IP."""
    redis_client = redis.Redis(host=sys.argv[1], port=6379)
    hostname = gethostname()
    ips = get_local_ips()
    publish_host_info(redis_client, hostname, ips)
    port = 5000  # port the benchmark clients connect to
    for ip in ips:
        threading.Thread(target=start_server, args=(ip, port)).start()


if __name__ == '__main__':
    main()
EOF
7.创建带宽测试的脚本 benchmark.py
cat > /mnt/benchmark.py <<-'EOF'
import redis
import sys
import socket
import time
import ast
import threading
import netifaces
import subprocess
import os


def gethostname():
    """Return the name under which this node reports its measurements.

    The ``NODENAME`` environment variable takes precedence when it is set;
    otherwise the baseboard serial number reported by ``dmidecode`` is used.
    """
    node_name = os.environ.get('NODENAME')
    if node_name is not None:
        # Return early so a node with NODENAME set does not depend on
        # dmidecode being installed and runnable.
        return node_name
    serial_number = subprocess.check_output(
        ["dmidecode", "-s", "baseboard-serial-number"])
    return serial_number.decode().strip()


def get_local_ips():
    """Return the IPv4 addresses of every non-loopback, non-virtual interface."""
    virtual_prefixes = ('docker', 'br-', 'veth', 'virbr', 'vnet', 'vmnet', 'vboxnet')
    ips = []
    for iface in netifaces.interfaces():
        # Skip the loopback interface and well-known virtual interfaces.
        if iface == 'lo' or iface.startswith(virtual_prefixes):
            continue
        addrs = netifaces.ifaddresses(iface)
        for addr_info in addrs.get(netifaces.AF_INET, []):
            ips.append(addr_info['addr'])
    return ips


def test_bandwidth(redis_client, local_hostname, local_ip):
    """Measure send bandwidth from ``local_ip`` to every published remote IP.

    For each remote address found in the Redis set ``host_info`` a 2 MB
    payload is sent to port 5000. Each successful measurement is appended to
    the Redis list ``test_results`` and printed.

    Returns:
        The list of result dicts recorded during this call.
    """
    port = 5000
    local_port = 5001
    payload = b'x' * (2 * 1024 * 1024)  # 2 MB of data
    results = []
    for info_str in redis_client.smembers('host_info'):
        info = ast.literal_eval(info_str.decode('utf-8'))
        remote_hostname = info['hostname']
        if remote_hostname == local_hostname:
            continue
        for ip in info['ips']:
            if ip == local_ip:
                continue
            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                client.bind((local_ip, local_port))
                client.settimeout(5)
                client.connect((ip, port))
                # Start timing after the handshake so that connection setup
                # is not counted as transfer time.
                start_time = time.perf_counter()
                client.sendall(payload)
                client.shutdown(socket.SHUT_WR)
                duration = time.perf_counter() - start_time
            except OSError:
                # Not every local NIC can reach every remote address; an
                # unreachable pair is skipped and simply has no edge.
                continue
            finally:
                client.close()
            if duration <= 0:
                continue
            bandwidth = (2 * 8) / duration  # 2 MB = 16 Mbit -> Mbps
            result = {
                'source_hostname': local_hostname,
                'source_ip': local_ip,
                'dest_hostname': remote_hostname,
                'dest_ip': ip,
                'bandwidth': round(bandwidth, 2),
            }
            results.append(result)
            # Publish the record; a Redis failure is allowed to propagate
            # because the measurement would otherwise be lost silently.
            redis_client.rpush('test_results', str(result))
            print(f"{local_hostname}:[{local_ip}] 与 {remote_hostname}:[{ip}] 的带宽为 {bandwidth:.2f} Mbps")
    return results


def main():
    """Run the bandwidth test from every local IP address."""
    redis_client = redis.Redis(host=sys.argv[1], port=6379)
    local_hostname = gethostname()
    local_ips = get_local_ips()
    for local_ip in local_ips:
        test_bandwidth(redis_client, local_hostname, local_ip)


if __name__ == '__main__':
    main()
EOF
8.创建生成网络拓扑图的脚本 summary.py
cat > /mnt/summary.py <<-'EOF'
import redis
import sys
import ast
import networkx as nx
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
MAX_EDGE_WIDTH = 8    # line width used for the fastest link
LABEL_OFFSET_PX = 64  # distance of an edge label from its source node, in pixels


def edge_label_position(start, end, start_px, end_px, offset_px):
    """Return the data-space point ``offset_px`` pixels along an edge.

    ``start``/``end`` are the edge end points in data coordinates and
    ``start_px``/``end_px`` the same points in pixel coordinates. Returns
    ``None`` for a zero-length edge, which has no direction to offset along.
    """
    dx_px = end_px[0] - start_px[0]
    dy_px = end_px[1] - start_px[1]
    length_px = (dx_px ** 2 + dy_px ** 2) ** 0.5
    if length_px == 0:
        return None
    fraction = offset_px / length_px
    return (start[0] + (end[0] - start[0]) * fraction,
            start[1] + (end[1] - start[1]) * fraction)


def _load_results(redis_client):
    """Read and parse every record in the Redis list ``test_results``."""
    raw_results = redis_client.lrange('test_results', 0, -1)
    return [ast.literal_eval(raw.decode('utf-8')) for raw in raw_results]


def _build_graph(records):
    """Build a directed multigraph with one edge per measurement."""
    graph = nx.MultiDiGraph()
    for entry in records:
        src_hostname = entry['source_hostname'].replace("NODE", "")
        dest_hostname = entry['dest_hostname'].replace("NODE", "")
        # add_edge creates missing nodes; bandwidth is stored as an attribute.
        graph.add_edge(src_hostname, dest_hostname, bandwidth=entry['bandwidth'])
    return graph


def main():
    """Render the measured link bandwidths as ``topo.png``."""
    redis_client = redis.Redis(host=sys.argv[1], port=6379)
    records = _load_results(redis_client)
    if not records:
        sys.exit("no test results found in Redis; run the benchmark first")

    graph = _build_graph(records)
    edges = list(graph.edges(data=True))
    bandwidths = [d['bandwidth'] for (u, v, d) in edges]

    # Normalize edge widths so the fastest link is MAX_EDGE_WIDTH wide.
    max_bandwidth = max(bandwidths)
    if max_bandwidth > 0:
        edge_widths = [(bw / max_bandwidth) * MAX_EDGE_WIDTH for bw in bandwidths]
    else:
        edge_widths = [1.0] * len(bandwidths)

    # Several NICs can link the same pair of hosts; collect all of their
    # bandwidths so no measurement is dropped from the label.
    edge_labels = {}
    for u, v, d in edges:
        edge_labels.setdefault((u, v), []).append(f"{d['bandwidth']}")

    # 19.2 x 10.8 inches at 100 dpi gives a 1920x1080 pixel image.
    plt.figure(figsize=(19.2, 10.8), dpi=100)
    pos = nx.circular_layout(graph)
    nx.draw_networkx_nodes(graph, pos, node_size=1000, node_color='red')
    # Node labels are the host names.
    nx.draw_networkx_labels(graph, pos, font_size=12)
    nx.draw_networkx_edges(graph, pos, width=edge_widths, arrowstyle='->', arrowsize=20)

    # Place each label a fixed pixel distance from the source node so that the
    # labels of u->v and v->u do not sit on top of each other.
    ax = plt.gca()
    for (u, v), values in edge_labels.items():
        point = edge_label_position(
            pos[u], pos[v],
            ax.transData.transform(pos[u]), ax.transData.transform(pos[v]),
            LABEL_OFFSET_PX)
        if point is None:
            continue
        plt.text(point[0], point[1], " / ".join(values), fontsize=10,
                 ha='center', va='center',
                 bbox=dict(facecolor='white', alpha=0.5, edgecolor='none'))

    plt.axis('off')
    plt.tight_layout()
    # The Agg backend selected above cannot open a window, so the figure is
    # only written to disk.
    plt.savefig('topo.png', dpi=100)


if __name__ == '__main__':
    main()
EOF
8.创建自动化脚本 auto.py
cat > /mnt/auto.py <<-'EOF'
import paramiko
import threading
import sys
import time
import os

if len(sys.argv) != 4:
    print(f"{sys.argv[0]} redis_server docker_image_url hostfile")
    # sys.exit flushes stdout and reports the usage error with a non-zero status.
    sys.exit(1)

redis_server = sys.argv[1]
docker_image_url = sys.argv[2]

# Each hostfile line is: <nodename> <ip> <port> <username> <password>
hosts = []
with open(sys.argv[3], "r") as f:
    for line in f:
        fields = line.split()
        if not fields:
            continue  # tolerate blank lines
        if len(fields) != 5 or not fields[2].isdigit():
            sys.exit(f"invalid hostfile line: {line.rstrip()}")
        fields[2] = int(fields[2])
        hosts.append(fields)


def ssh_interactive_shell(hostname, port, username, password, ssh_requests, flog):
    """Run ``ssh_requests`` in an interactive shell on a remote host.

    Remote output is echoed to stdout and appended, as raw bytes, to ``flog``.
    Connection or execution errors are reported and swallowed so that the
    remaining hosts are still processed.
    """
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(hostname=hostname, port=port, username=username, password=password)
        channel = ssh.invoke_shell()

        def recv_data():
            while True:
                # Sample the exit flag before checking for data, so output
                # that arrived ahead of the exit status is drained first.
                exited = channel.exit_status_ready()
                if channel.recv_ready():
                    data = channel.recv(1024)
                    if not data:
                        break
                    # A multi-byte character may be split across chunks.
                    sys.stdout.write(data.decode(errors='replace'))
                    sys.stdout.flush()
                    flog.write(data)
                elif exited:
                    break
                else:
                    time.sleep(0.01)  # avoid spinning while waiting for output

        recv_thread = threading.Thread(target=recv_data)
        recv_thread.start()
        channel.send(ssh_requests + '\n')
        recv_thread.join()
    except Exception as e:
        # The password is deliberately left out of the message.
        print(f"Error: {e} {hostname},{port},{username}")
    finally:
        ssh.close()


def remote_exec(alias, nodename, hostname, port, username, password, cmdline, flog, is_blocking):
    """Run ``cmdline`` on a remote host inside a container of the benchmark image.

    With ``is_blocking`` false the container is started detached (``-d``).
    """
    mode = "" if is_blocking else "-d"
    ssh_requests = f'''
docker run --rm {mode} --name {alias} -e NODENAME={nodename} --privileged --net=host {docker_image_url} {cmdline}
sleep 1
exit'''
    ssh_interactive_shell(hostname, port, username, password, ssh_requests, flog)


def stop_docker(hostname, port, username, password, flog):
    """Stop the ``publish`` container on a host and pull the benchmark image."""
    print(f"stop_docker:{hostname}")
    ssh_interactive_shell(hostname, port, username, password,
                          f"docker stop publish;docker pull {docker_image_url};exit", flog)


if __name__ == "__main__":
    output_file = "log.txt"
    with open(output_file, 'wb') as flog:
        for nodename, hostname, port, username, password in hosts:
            stop_docker(hostname, port, username, password, flog)

        for nodename, hostname, port, username, password in hosts:
            cmdline = f"python /mnt/reset.py {redis_server}"
            remote_exec("reset", nodename, hostname, port, username, password, cmdline, flog, True)

        for nodename, hostname, port, username, password in hosts:
            cmdline = f"python /mnt/publish.py {redis_server}"
            remote_exec("publish", nodename, hostname, port, username, password, cmdline, flog, False)

        # Give the publishers a moment to start listening before benchmarking.
        time.sleep(2)

        for nodename, hostname, port, username, password in hosts:
            cmdline = f"python /mnt/benchmark.py {redis_server}"
            remote_exec("benchmark", nodename, hostname, port, username, password, cmdline, flog, True)

        for nodename, hostname, port, username, password in hosts:
            stop_docker(hostname, port, username, password, flog)
EOF
9.提交 Docker 镜像
docker commit net_scan <镜像地址>
docker push <镜像地址>
10.重新进入 net_scan 容器,运行自动化脚本
docker exec -it net_scan bash
创建 hostfile 文件:
cat > hostfile << 'EOF'
<NODE1> <IP> <端口> <用户名> <密码>
<NODE2> <IP> <端口> <用户名> <密码>
EOF
运行自动化脚本:
python /mnt/auto.py <Redis 服务 IP> <镜像地址> hostfile
python /mnt/summary.py <Redis 服务 IP>
执行完上述步骤后,将生成集群内主机链路带宽的拓扑图 topo.png,可用于分析集群网络性能。