如何使用ebpf统计某个端口的流量

news/2024/11/24 11:53:36/

前言

        上篇文章我们已经初步使用kprobe来探测内核函数了, 这篇文章就在上篇文章的基础上做个修改, 通过kprobe探测内核函数tcp_sendmsg来统计tcp服务端的发送流量. 废话不多说, 直接上正文. 

环境

        tcp服务端运行在ubuntu22, 监听端口为6230, 其内核为5.19.0-26-generic, ebpf程序同样运行在ubuntu22.

        tcp客户端运行在centos7, 其内核为3.10.0-1160.el7.x86_64. 

代码

        ebpf代码同样分为两个文件, 一个是内核态代码, 探测内核函数, 并把相关信息写入map, 在本文中是tcp服务端发送的流量大小(Byte). 一个是用户态代码, 定时读取map并打印流量大小(Byte).

        for_flow_kern.c代码如下:

// for_flow_kern.c#include <linux/skbuff.h>
#include <linux/netdevice.h>
#include <linux/version.h>
#include <uapi/linux/bpf.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
#include <bpf/bpf_core_read.h>
#include <net/sock.h>
#include "trace_common.h"#define MAX_ENTRIES 64struct {__uint(type, BPF_MAP_TYPE_HASH);__type(key, u64);__type(value, u32);__uint(max_entries, MAX_ENTRIES);
} pid_2_port_map SEC(".maps");struct {__uint(type, BPF_MAP_TYPE_HASH);__type(key, u64);__type(value, u64);__uint(max_entries, MAX_ENTRIES);
} pid_2_flow_map SEC(".maps");SEC("kprobe/tcp_sendmsg")
int trace_sys_send(struct pt_regs *ctx)
{int ret = 0;u16 family = 0;struct sock *sock = (struct sock *)PT_REGS_PARM1_CORE(ctx);if ((ret = bpf_probe_read_kernel(&family, sizeof(family), &sock->sk_family))){return 0;}if (family != AF_INET){return 0;}u16 port_tmp = 0;if ((ret = bpf_probe_read_kernel(&port_tmp, sizeof(port_tmp), &sock->sk_num))){return 0;}if (port_tmp == 6230){u64 pid = bpf_get_current_pid_tgid();u32 port = port_tmp;bpf_map_update_elem(&pid_2_port_map, (const void *)&pid, &port, BPF_ANY);}return 0;
}SEC("kretprobe/tcp_sendmsg")
int trace_sys_send_ret(struct pt_regs *ctx)
{int ret = 0;u64 pid = bpf_get_current_pid_tgid();// 获取pidu32 *value_ptr = bpf_map_lookup_elem(&pid_2_port_map, &pid);if (!value_ptr){return 0;}// 获取tcp_sendmsg返回值int size = PT_REGS_RC(ctx);if (size > 0){// 查找flowu64 *flow_ptr = bpf_map_lookup_elem(&pid_2_flow_map, &pid);u64 sum = flow_ptr == NULL ? (0 + size) : (*flow_ptr + size); bpf_map_update_elem(&pid_2_flow_map, &pid, &sum, BPF_ANY);}return 0;
}char _license[] SEC("license") = "GPL";
u32 _version SEC("version") = LINUX_VERSION_CODE;

        for_flow_user.c代码如下:

// for_flow_user.c#define _GNU_SOURCE
#include <sched.h>
#include <stdio.h>
#include <sys/types.h>
#include <asm/unistd.h>
#include <unistd.h>
#include <assert.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <signal.h>
#include <string.h>
#include <time.h>
#include <arpa/inet.h>
#include <errno.h>#include <bpf/bpf.h>
#include <bpf/libbpf.h>int main(int argc, char **argv)
{struct bpf_object *obj;int i = 0;int mfd1 = 0;struct bpf_link *link1 = NULL;struct bpf_program *prog1;int mfd2 = 0;struct bpf_link *link2 = NULL;struct bpf_program *prog2;char filename[256];snprintf(filename, sizeof(filename), "%s_kern.o", argv[0]);// objobj = bpf_object__open_file(filename, NULL);if (libbpf_get_error(obj)) {fprintf(stderr, "ERROR: opening BPF object file failed\n");return -1;}if (bpf_object__load(obj)) {fprintf(stderr, "ERROR: loading BPF object file failed\n");goto END;}// ------------- //prog1 = bpf_object__find_program_by_name(obj, "trace_sys_send");if (!prog1) {printf("finding a prog1 in obj file failed\n");goto END;}prog2 = bpf_object__find_program_by_name(obj, "trace_sys_send_ret");if (!prog2) {printf("finding a prog2 in obj file failed\n");goto END;}// ------------- //mfd1 = bpf_object__find_map_fd_by_name(obj, "pid_2_port_map");if (mfd1 < 0){fprintf(stderr, "ERROR: finding a map mfd1 in obj file failed\n");goto END;    }mfd2 = bpf_object__find_map_fd_by_name(obj, "pid_2_flow_map");if (mfd2 < 0){fprintf(stderr, "ERROR: finding a map mfd2 in obj file failed\n");goto END;    }// ------------- //link1 = bpf_program__attach(prog1);if (libbpf_get_error(link1)) {fprintf(stderr, "ERROR: bpf_program__attach link1 failed\n");link1 = NULL;goto END;}link2 = bpf_program__attach(prog2);if (libbpf_get_error(link2)) {fprintf(stderr, "ERROR: bpf_program__attach link2 failed\n");link2 = NULL;goto END;}for (i = 0; i < 1000; i++){unsigned long long key = 0;unsigned long long next_key = 0;int j = 0;while (bpf_map_get_next_key(mfd2, &key, &next_key) == 0) {unsigned int value = 0;bpf_map_lookup_elem(mfd1, &next_key, &value);fprintf(stdout, "pid%d: %llu, flow: %u\n", ++j, next_key, value);key = next_key;}key = 0;next_key = 0;j = 0;while (bpf_map_get_next_key(mfd2, &key, &next_key) == 0) {unsigned long long value = 0;bpf_map_lookup_elem(mfd2, &next_key, &value);fprintf(stdout, "pid%d: %llu, flow: %llu\n", ++j, next_key, value);key = next_key;}printf("-----------------------\n");sleep(2);}END:bpf_link__destroy(link1);bpf_link__destroy(link2);bpf_object__close(obj);return 0;
}

        把for_flow_kern.c和for_flow_user.c两个文件加入Makefile后编译.

        在这里提供一个自己编写的tcp客户端和服务端测试程序, 可能会有bug, 但是应付这个测试是没问题的. 客户端和服务端互相通信, 客户端每次写1024字节, 每次读512字节. 服务端每次读1024字节, 每次写512字节. 代码如下:

// tcp_test.c#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <signal.h>
#include <sys/ioctl.h>
#include <linux/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>#define EPOLL_SIZE 1024
#define PARALLEL_MAX 16
#define READ_SIZE 1024
#define WRITE_SIZE 512#define SET_NONBLOCK(fd) ({fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);})#define EPOLL_ET_CTL(node, op, event) \
({\struct epoll_event ev = {0x00}; \ev.events = (event) | EPOLLET; \ev.data.ptr = (node); \epoll_ctl((node)->ep_fd, (op), (node)->fd, &ev); \
})#define EPOLL_ET_DEL(node) \
({\epoll_ctl((node)->ep_fd, EPOLL_CTL_DEL, (node)->fd, NULL); \
})/*
* 命令行参数
*/
typedef struct config
{int mode;char *addr;unsigned short int port;int parallel;struct sockaddr_in addr_in;socklen_t  addr_in_len;
}config_t;/*
* epoll树的节点
*/
typedef struct ep_node
{struct sockaddr_in addr_in;int ep_fd;int sk_fd;int fd;long long r_sum;long long w_sum;
}ep_node_t;typedef struct ep_instance
{config_t *conf;int position;                   // 0代表主干线程                                    int ep_fd;                      // epoll树节点union {int sk_fd;              // 主干epoll维护的socketint r_pipe;             // 分支epoll维护的读pipe, 与w_pipes对应};int w_pipes[PARALLEL_MAX];      // 主干线程维护的各子线程pipe的写端long long count;
}ep_instance_t;typedef struct fd_and_addr
{int fd;struct sockaddr_in addr_in;
}fd_and_addr_t;static int tcp_socket(const config_t *conf)
{int sfd = socket(AF_INET, SOCK_STREAM, 0);if (sfd == -1){printf("socket failed, err msg: %s\n", strerror(errno));return -1;}int val1 = 1;if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR | (conf->mode == 0 ? SO_REUSEPORT : 0), (void *)&val1, sizeof(val1)) == -1){printf("setsockopt failed, err msg: %s\n", strerror(errno));goto FAILED;               }if (conf->mode == 0){if (bind(sfd, (struct sockaddr*)&conf->addr_in, conf->addr_in_len) == -1){printf("bind failed, err msg: %s\n", strerror(errno));goto FAILED;               }if (listen(sfd, 1024)){printf("bind failed, err msg: %s\n", strerror(errno));goto FAILED;                              }                       }else{if (connect(sfd, (struct sockaddr*)&conf->addr_in, conf->addr_in_len)){printf("connect failed, err msg: %s\n", strerror(errno));goto FAILED;                        }                }int val2 = 1;if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&val2, sizeof(val2)) == -1){printf("setsockopt failed, err msg: %s\n", strerror(errno));goto FAILED;               }SET_NONBLOCK(sfd);return sfd;FAILED:close(sfd);return -1;
}ep_node_t *accept_cb(ep_instance_t *ins, ep_node_t *node)
{ep_node_t *new_node = (ep_node_t *)malloc(sizeof(ep_node_t));if (!new_node){       return NULL;}fd_and_addr_t addr;if (ins->position == 0){socklen_t remote_len = sizeof(addr.addr_in);addr.fd = accept(ins->sk_fd, (struct sockaddr *)&addr.addr_in, &remote_len);if (addr.fd == -1){goto FREE;}int index = ins->count++ % ins->conf->parallel;if (index != 0){write(((int *)(ins->w_pipes))[index], (void *)&addr, sizeof(addr));return NULL;}}else{int ret = read(ins->r_pipe, &addr, sizeof(addr));if (ret != sizeof(addr)){goto CLOSE;}}SET_NONBLOCK(addr.fd);new_node->addr_in = addr.addr_in;new_node->ep_fd = ins->ep_fd;new_node->sk_fd = ins->position == 0 ? ins->sk_fd : ins->r_pipe;new_node->fd = addr.fd;return new_node;CLOSE:close(addr.fd);
FREE:free(new_node);new_node = NULL;return new_node;
}static int server_read_cb(ep_node_t *node)
{unsigned char read_data[READ_SIZE];memset(read_data, 0x00, READ_SIZE);int ret = read(node->fd, read_data, READ_SIZE);if (ret > 0){node->r_sum += ret;EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLOUT);}else if (ret <= 0){close(node->fd);EPOLL_ET_DEL(node);free(node);node = NULL;}return ret;
}static int server_write_cb(ep_node_t *node)
{unsigned char write_data[WRITE_SIZE];memset(write_data, 0x30, WRITE_SIZE);int ret = write(node->fd, write_data, WRITE_SIZE);if (ret >= 0){node->w_sum += ret;EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLIN);}else{printf("write finished, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);close(node->fd);EPOLL_ET_DEL(node);free(node);node = NULL;}return 0;
}void *tcp_server_process(void *arg)
{ep_instance_t *ins = (ep_instance_t *)arg;if (!ins){return NULL;}ins->ep_fd = epoll_create(EPOLL_SIZE);if (ins->ep_fd == -1){return NULL;                }int sk_fd = ins->position == 0 ? ins->sk_fd : ins->r_pipe;ep_node_t sk_fd_node = {.ep_fd = ins->ep_fd, .sk_fd = sk_fd, .fd = sk_fd, };if (EPOLL_ET_CTL(&sk_fd_node, EPOLL_CTL_ADD, EPOLLIN)){return NULL;}struct epoll_event active_events[EPOLL_SIZE + 1];memset(&active_events, 0x00, sizeof(active_events));int i = 0;for(;;){int active = epoll_wait(ins->ep_fd, active_events, EPOLL_SIZE + 1, -1);for (i = 0; i < active; i++){ep_node_t *node = (ep_node_t *)active_events[i].data.ptr;// 新连接if (node->fd == node->sk_fd){ep_node_t *new_node = accept_cb(ins, node);if (new_node){if (EPOLL_ET_CTL(new_node, EPOLL_CTL_ADD, EPOLLIN)){close(new_node->fd);free(new_node);new_node = NULL;}else{printf("pos: %d, fd: %d, remote ip: %s, remote port: %d\n", ins->position, new_node->fd, inet_ntoa(new_node->addr_in.sin_addr), ntohs(new_node->addr_in.sin_port));}}EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLIN);}// 读事件else if (active_events[i].events & EPOLLIN){unsigned char read_data[READ_SIZE];memset(read_data, 0x00, READ_SIZE);int ret = read(node->fd, read_data, READ_SIZE);if (ret <= 0){printf("peer closed or read failed and then close peer, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);close(node->fd);EPOLL_ET_DEL(node);free(node);node = NULL;continue;}node->r_sum += ret;EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLOUT);}// 写事件else if (active_events[i].events & EPOLLOUT){unsigned char write_data[WRITE_SIZE];memset(write_data, 0x39, WRITE_SIZE);int ret = write(node->fd, write_data, WRITE_SIZE);if (ret < 0){printf("write failed and then close peer, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);close(node->fd);EPOLL_ET_DEL(node);free(node);node = NULL;continue;}node->w_sum += ret;EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLIN);}}}return NULL;
}int tcp_server(config_t *conf)
{int i = 0, tmp_sk_fd = 0;if ((tmp_sk_fd = tcp_socket(conf)) < 0){return -1;}int tmp_pipes_fd[PARALLEL_MAX][2];memset(tmp_pipes_fd, 0x00, sizeof(tmp_pipes_fd));ep_instance_t tmp_ins_arr[PARALLEL_MAX];memset(tmp_ins_arr, 0x00, sizeof(tmp_ins_arr));int tmp_w_pipes[PARALLEL_MAX];memset(tmp_w_pipes, 0x00, sizeof(tmp_w_pipes));pthread_t pids[PARALLEL_MAX];memset(pids, 0x00, sizeof(pids));for (i = 1; i < conf->parallel; i++){pipe(tmp_pipes_fd[i]);SET_NONBLOCK(tmp_pipes_fd[i][0]);SET_NONBLOCK(tmp_pipes_fd[i][1]);tmp_ins_arr[i].conf = conf;tmp_ins_arr[i].position = i;tmp_ins_arr[i].r_pipe = tmp_pipes_fd[i][0];tmp_w_pipes[i] = tmp_pipes_fd[i][1];pthread_create(&pids[i], NULL, tcp_server_process, (void *)&tmp_ins_arr[i]);}tmp_ins_arr[0].conf = conf;tmp_ins_arr[0].position = 0;tmp_ins_arr[0].sk_fd = tmp_sk_fd;memcpy(tmp_ins_arr[0].w_pipes, tmp_w_pipes, sizeof(tmp_w_pipes));tcp_server_process((void *)&tmp_ins_arr[0]);for (i = 1; i < conf->parallel; i++){pthread_join(pids[i], NULL);}return 0;
}void* tcp_client(void *arg)
{config_t *conf = (config_t *)arg;ep_node_t fd_node;memset(&fd_node, 0x00, sizeof(fd_node));int ep_fd = epoll_create(EPOLL_SIZE);if (ep_fd == -1){return NULL;                }int fd = tcp_socket(conf);if (fd < 0){return NULL;}fd_node.ep_fd = ep_fd;fd_node.fd = fd;if (EPOLL_ET_CTL(&fd_node, EPOLL_CTL_ADD, EPOLLOUT)){return NULL;}struct epoll_event active_events[EPOLL_SIZE + 1];memset(&active_events, 0x00, sizeof(active_events));int i = 0;for(;;){int active = epoll_wait(ep_fd, active_events, EPOLL_SIZE + 1, -1);for (i = 0; i < active; i++){ep_node_t *node = (ep_node_t *)active_events[i].data.ptr;if (active_events[i].events & EPOLLIN){unsigned char read_data[WRITE_SIZE];memset(read_data, 0x00, WRITE_SIZE);int ret = read(node->fd, read_data, WRITE_SIZE);if (ret <= 0){printf("peer closed or read failed and then close peer, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);close(node->fd);EPOLL_ET_DEL(node);continue;}node->r_sum += ret;if (node->r_sum >= 1024 * 1024 * 512 && node->w_sum >= 1024 * 1024 * 1024){printf("rw finished, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);close(node->fd);EPOLL_ET_DEL(node);return NULL;}EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLOUT);}else if (active_events[i].events & EPOLLOUT){unsigned char write_data[READ_SIZE];memset(write_data, 0x39, READ_SIZE);int ret = write(node->fd, write_data, READ_SIZE);if (ret < 0){printf("write failed and then close peer, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);close(node->fd);EPOLL_ET_DEL(node);continue;}node->w_sum += ret;if (node->r_sum >= 1024 * 1024 * 512 && node->w_sum >= 1024 * 1024 * 1024){printf("rw finished, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);close(node->fd);EPOLL_ET_DEL(node);return NULL;}EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLIN);}}}return 0;
}void sig_cb(int sig)
{printf("capture sig: %d\n", sig);
}int main(int argc, char *argv[])
{if (argc != 5){printf("Argc failed\n");return -1;}signal(SIGPIPE, sig_cb);config_t conf = {.mode = atoi(argv[1]),.addr = argv[2],.port = atoi(argv[3]),.parallel = atoi(argv[4]) > PARALLEL_MAX ? PARALLEL_MAX : atoi(argv[4]),.addr_in.sin_family = AF_INET,.addr_in.sin_addr.s_addr = inet_addr(argv[2]),.addr_in.sin_port = htons(atoi(argv[3])),.addr_in_len = sizeof(struct sockaddr_in),};int i = 0;if (conf.mode == 0){tcp_server(&conf);}else{pthread_t pids[PARALLEL_MAX];for (i = 1; i < conf.parallel; i++){pthread_create(&pids[i], NULL, tcp_client, &conf);}tcp_client(&conf);for (i = 1; i < conf.parallel; i++){pthread_join(pids[i], NULL);}printf("after pthread_join\n");}return 0;
}

        编译命令: gcc tcp_test.c -o tcp_test -lpthread

测试

        ubuntu22上启动ebpf程序: ./for_flow

        ubuntu22上启动tcp服务端: ./tcp_test 0 192.168.20.11 6230 2 // 0代表服务端, 2代表线程数

        centos7上启动tcp客户端: ./tcp_test 1 192.168.20.11 6230 2 // 1代表客户端, 2代表线程数, 且每个线程一个tcp连接

结果

        结果入下图:

         从结果可以看到, 客户端读取536870912字节(512MB), 服务端写入536870912字节(512MB), tcp测试程序代码里做的判断的也是1024*1024*512字节, 而最后ebpf统计的也是536870912字节(512MB), 结果匹配, ebpf统计tcp服务端写流量成功.

        其实这个代码改一改也可以统计tcp服务端读取的字节数, 只要探测函数改成tcp_recvmsg, 其他的逻辑判断再改一改即可. 但是改造的过程中可能要踩一下坑, 我已经改造成功, 就不放出来了. 如果有同学感兴趣可以尝试下. 如果改造的过程中遇到问题, 也可以交流交流. 


http://www.ppmy.cn/news/283595.html

相关文章

【周赛318 LeetCode 6230】长度为 K 子数组中的最大和

题目描述 给你一个整数数组 nums 和一个整数 k 。请你从 nums 中满足下述条件的全部子数组中找出最大子数组和&#xff1a; 子数组的长度是 k&#xff0c;且子数组中的所有元素 各不相同 。返回满足题面要求的最大子数组和。如果不存在子数组满足这些条件&#xff0c;返回 0 …

pyinstaller OSError: could not get source code

pyinstaller打包pytorch框架分类模型报错&#xff0c; 报错&#xff1a;pyinstaller OSError: could not get source code 解决思路&#xff1a;网上大部分资料都是降低torchvision&#xff0c;然后继续解决新的报错&#xff0c;这里提供另一个思路&#xff0c;就是直接把报错…

osgi 学习系列(十一)非eclipse环境启动osgi配置

先说下bundle打jar包 选中要打包的bundle&#xff0c;export&#xff0c;在弹出的view中选中下图所示 在Destination中设置导出路径&#xff0c;Options修改如下&#xff0c;直接Finish即可 下面看下启动配置的目录结构 run.bat内容如下 java -Dosgi.noShutdowntrue -Dse…

Python基于指定范围筛选并剔除Excel表格中的数据

本文介绍基于Python语言&#xff0c;读取Excel表格文件&#xff0c;基于我们给定的规则&#xff0c;对其中的数据加以筛选&#xff0c;将不在指定数据范围内的数据剔除&#xff0c;保留符合我们需要的数据的方法。 首先&#xff0c;我们来明确一下本文的具体需求。现有一个Exce…

Linux ARM平台开发系列讲解(网络篇) 2.1 Marvell 88EA6321/6320 Switch 数据手册阅读之了解芯片功能

1. 概述 注意:88EA6321/88EA6320 和 88E6321/88E6320软件上无区别,区别在于硬件外围电源上 Marvell 88EA6321/88EA6320设备是一个集成了7端口千兆以太网交换机的单芯片,带有两个集成的千兆以太网收发器。该设备支持最新的IEEE 802.1音频视频桥接(AVB)标准。这些设备使用这…

如何使用ebpf kprobe探测内核函数

前言 在这之前, 我也曾使用过ebpf来改造我自己的项目, 最后也成功引入了项目, 有兴趣的同学可以查看此文章. 如何用ebpf开启tun网卡的TUNSETSTEERINGEBPF功能_我不买vip的博客-CSDN博客 但是该文章里并没有实质性的内容, 比如ebpf的map未曾涉及, 探测类型也未曾涉及, 只是一个空…

网络协议分析(2)判断两个ip数据包是不是同一个数据包分片

一个节点收到两个IP包的首部如下&#xff1a; &#xff08;1&#xff09;45 00 05 dc 18 56 20 00 40 01 bb 12 c0 a8 00 01 c0 a8 00 67 &#xff08;2&#xff09;45 00 00 15 18 56 00 b9 49 01 e0 20 c0 a8 00 01 c0 a8 00 67 分析并判断这两个IP包是不是同一个数据报的分片…

leetcode周赛第二题6230. 长度为 K 子数组中的最大和

题目&#xff1a; 给你一个整数数组 nums 和一个整数 k 。请你从 nums 中满足下述条件的全部子数组中找出最大子数组和&#xff1a; 子数组的长度是 k&#xff0c;且 子数组中的所有元素 各不相同 。 返回满足题面要求的最大子数组和。如果不存在子数组满足这些条件&#xff0…