Preface
In the previous article we made a first pass at probing a kernel function with kprobe. This article builds on that work with one modification: we attach a kprobe to the kernel function tcp_sendmsg to count the traffic sent by a TCP server. Without further ado, let's get to it.
Environment
The TCP server runs on Ubuntu 22, listening on port 6230; its kernel is 5.19.0-26-generic. The eBPF program runs on the same Ubuntu 22 machine.
The TCP client runs on CentOS 7, kernel 3.10.0-1160.el7.x86_64.
Code
As before, the eBPF code is split into two files. The kernel-side code probes the kernel function and writes the relevant information into a map; in this article that is the number of bytes sent by the TCP server. The user-side code periodically reads the map and prints the traffic size (in bytes).
The code of for_flow_kern.c is as follows:
// for_flow_kern.c
#include <linux/skbuff.h>
#include <linux/netdevice.h>
#include <linux/version.h>
#include <uapi/linux/bpf.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
#include <bpf/bpf_core_read.h>
#include <net/sock.h>
#include "trace_common.h"

#define MAX_ENTRIES 64

// key: bpf_get_current_pid_tgid(), value: local port
struct {
    __uint(type, BPF_MAP_TYPE_HASH);
    __type(key, u64);
    __type(value, u32);
    __uint(max_entries, MAX_ENTRIES);
} pid_2_port_map SEC(".maps");

// key: bpf_get_current_pid_tgid(), value: accumulated bytes sent
struct {
    __uint(type, BPF_MAP_TYPE_HASH);
    __type(key, u64);
    __type(value, u64);
    __uint(max_entries, MAX_ENTRIES);
} pid_2_flow_map SEC(".maps");

SEC("kprobe/tcp_sendmsg")
int trace_sys_send(struct pt_regs *ctx)
{
    int ret = 0;
    u16 family = 0;
    struct sock *sock = (struct sock *)PT_REGS_PARM1_CORE(ctx);

    if ((ret = bpf_probe_read_kernel(&family, sizeof(family), &sock->sk_family))) {
        return 0;
    }
    if (family != AF_INET) {
        return 0;
    }

    u16 port_tmp = 0;
    if ((ret = bpf_probe_read_kernel(&port_tmp, sizeof(port_tmp), &sock->sk_num))) {
        return 0;
    }
    // only trace sockets whose local port is 6230
    if (port_tmp == 6230) {
        u64 pid = bpf_get_current_pid_tgid(); // tgid << 32 | tid
        u32 port = port_tmp;
        bpf_map_update_elem(&pid_2_port_map, (const void *)&pid, &port, BPF_ANY);
    }
    return 0;
}

SEC("kretprobe/tcp_sendmsg")
int trace_sys_send_ret(struct pt_regs *ctx)
{
    u64 pid = bpf_get_current_pid_tgid(); // same key as in the kprobe

    // only count pids that passed the port filter in the kprobe
    u32 *value_ptr = bpf_map_lookup_elem(&pid_2_port_map, &pid);
    if (!value_ptr) {
        return 0;
    }

    // return value of tcp_sendmsg: bytes sent, or a negative error
    int size = PT_REGS_RC(ctx);
    if (size > 0) {
        // accumulate into the per-pid flow counter
        u64 *flow_ptr = bpf_map_lookup_elem(&pid_2_flow_map, &pid);
        u64 sum = flow_ptr == NULL ? (0 + size) : (*flow_ptr + size);
        bpf_map_update_elem(&pid_2_flow_map, &pid, &sum, BPF_ANY);
    }
    return 0;
}

char _license[] SEC("license") = "GPL";
u32 _version SEC("version") = LINUX_VERSION_CODE;
The code of for_flow_user.c is as follows:
// for_flow_user.c
#define _GNU_SOURCE
#include <sched.h>
#include <stdio.h>
#include <sys/types.h>
#include <asm/unistd.h>
#include <unistd.h>
#include <assert.h>
#include <sys/wait.h>
#include <stdlib.h>
#include <signal.h>
#include <string.h>
#include <time.h>
#include <arpa/inet.h>
#include <errno.h>

#include <bpf/bpf.h>
#include <bpf/libbpf.h>

int main(int argc, char **argv)
{
    struct bpf_object *obj;
    int i = 0;
    int mfd1 = 0;
    struct bpf_link *link1 = NULL;
    struct bpf_program *prog1;
    int mfd2 = 0;
    struct bpf_link *link2 = NULL;
    struct bpf_program *prog2;
    char filename[256];

    snprintf(filename, sizeof(filename), "%s_kern.o", argv[0]);

    // open and load the BPF object
    obj = bpf_object__open_file(filename, NULL);
    if (libbpf_get_error(obj)) {
        fprintf(stderr, "ERROR: opening BPF object file failed\n");
        return -1;
    }
    if (bpf_object__load(obj)) {
        fprintf(stderr, "ERROR: loading BPF object file failed\n");
        goto END;
    }

    // find the kprobe and kretprobe programs
    prog1 = bpf_object__find_program_by_name(obj, "trace_sys_send");
    if (!prog1) {
        printf("finding a prog1 in obj file failed\n");
        goto END;
    }
    prog2 = bpf_object__find_program_by_name(obj, "trace_sys_send_ret");
    if (!prog2) {
        printf("finding a prog2 in obj file failed\n");
        goto END;
    }

    // find the two maps
    mfd1 = bpf_object__find_map_fd_by_name(obj, "pid_2_port_map");
    if (mfd1 < 0) {
        fprintf(stderr, "ERROR: finding a map mfd1 in obj file failed\n");
        goto END;
    }
    mfd2 = bpf_object__find_map_fd_by_name(obj, "pid_2_flow_map");
    if (mfd2 < 0) {
        fprintf(stderr, "ERROR: finding a map mfd2 in obj file failed\n");
        goto END;
    }

    // attach both programs
    link1 = bpf_program__attach(prog1);
    if (libbpf_get_error(link1)) {
        fprintf(stderr, "ERROR: bpf_program__attach link1 failed\n");
        link1 = NULL;
        goto END;
    }
    link2 = bpf_program__attach(prog2);
    if (libbpf_get_error(link2)) {
        fprintf(stderr, "ERROR: bpf_program__attach link2 failed\n");
        link2 = NULL;
        goto END;
    }

    for (i = 0; i < 1000; i++) {
        unsigned long long key = 0;
        unsigned long long next_key = 0;
        int j = 0;

        // dump pid -> port
        while (bpf_map_get_next_key(mfd1, &key, &next_key) == 0) {
            unsigned int value = 0;
            bpf_map_lookup_elem(mfd1, &next_key, &value);
            fprintf(stdout, "pid%d: %llu, port: %u\n", ++j, next_key, value);
            key = next_key;
        }

        key = 0;
        next_key = 0;
        j = 0;
        // dump pid -> flow (bytes sent)
        while (bpf_map_get_next_key(mfd2, &key, &next_key) == 0) {
            unsigned long long value = 0;
            bpf_map_lookup_elem(mfd2, &next_key, &value);
            fprintf(stdout, "pid%d: %llu, flow: %llu\n", ++j, next_key, value);
            key = next_key;
        }
        printf("-----------------------\n");
        sleep(2);
    }

END:
    bpf_link__destroy(link1);
    bpf_link__destroy(link2);
    bpf_object__close(obj);
    return 0;
}
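One note on reading the output: the key printed as "pid" is the raw return value of bpf_get_current_pid_tgid(), i.e. the tgid in the upper 32 bits and the thread id in the lower 32 bits, so each server thread gets its own counter. If you want to show process and thread separately, a minimal sketch (my addition, not part of the original program) could look like this:

// sketch: split the 64-bit key printed by for_flow_user.c
// assumes the key comes from bpf_get_current_pid_tgid()
#include <stdio.h>
#include <stdint.h>

static void print_key(uint64_t key)
{
    uint32_t tgid = (uint32_t)(key >> 32); // process id, as shown by ps
    uint32_t tid  = (uint32_t)key;         // kernel thread id
    printf("tgid: %u, tid: %u\n", tgid, tid);
}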
Add for_flow_kern.c and for_flow_user.c to the Makefile and build them.
Here is a TCP client/server test program I wrote myself. It may have bugs, but it is good enough for this test. The client and server talk to each other: the client writes 1024 bytes and reads 512 bytes at a time, while the server reads 1024 bytes and writes 512 bytes at a time. The code is as follows:
// tcp_test.c
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <signal.h>
#include <sys/ioctl.h>
#include <linux/tcp.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>

#define EPOLL_SIZE 1024
#define PARALLEL_MAX 16
#define READ_SIZE 1024
#define WRITE_SIZE 512

#define SET_NONBLOCK(fd) ({fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);})

#define EPOLL_ET_CTL(node, op, event) \
({\
    struct epoll_event ev = {0x00}; \
    ev.events = (event) | EPOLLET; \
    ev.data.ptr = (node); \
    epoll_ctl((node)->ep_fd, (op), (node)->fd, &ev); \
})

#define EPOLL_ET_DEL(node) \
({\
    epoll_ctl((node)->ep_fd, EPOLL_CTL_DEL, (node)->fd, NULL); \
})

/*
 * command-line arguments
 */
typedef struct config
{
    int mode;
    char *addr;
    unsigned short int port;
    int parallel;
    struct sockaddr_in addr_in;
    socklen_t addr_in_len;
}config_t;

/*
 * node on the epoll tree
 */
typedef struct ep_node
{
    struct sockaddr_in addr_in;
    int ep_fd;
    int sk_fd;
    int fd;
    long long r_sum;
    long long w_sum;
}ep_node_t;

typedef struct ep_instance
{
    config_t *conf;
    int position;              // 0 means the trunk (main) thread
    int ep_fd;                 // epoll instance fd
    union {
        int sk_fd;             // socket owned by the trunk epoll
        int r_pipe;            // read end of the pipe owned by a branch epoll, paired with w_pipes
    };
    int w_pipes[PARALLEL_MAX]; // write ends of the worker threads' pipes, kept by the trunk thread
    long long count;
}ep_instance_t;

typedef struct fd_and_addr
{
    int fd;
    struct sockaddr_in addr_in;
}fd_and_addr_t;

static int tcp_socket(const config_t *conf)
{
    int sfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sfd == -1) {
        printf("socket failed, err msg: %s\n", strerror(errno));
        return -1;
    }

    int val1 = 1;
    if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR | (conf->mode == 0 ? SO_REUSEPORT : 0), (void *)&val1, sizeof(val1)) == -1) {
        printf("setsockopt failed, err msg: %s\n", strerror(errno));
        goto FAILED;
    }

    if (conf->mode == 0) {
        if (bind(sfd, (struct sockaddr*)&conf->addr_in, conf->addr_in_len) == -1) {
            printf("bind failed, err msg: %s\n", strerror(errno));
            goto FAILED;
        }
        if (listen(sfd, 1024)) {
            printf("listen failed, err msg: %s\n", strerror(errno));
            goto FAILED;
        }
    } else {
        if (connect(sfd, (struct sockaddr*)&conf->addr_in, conf->addr_in_len)) {
            printf("connect failed, err msg: %s\n", strerror(errno));
            goto FAILED;
        }
    }

    int val2 = 1;
    if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&val2, sizeof(val2)) == -1) {
        printf("setsockopt failed, err msg: %s\n", strerror(errno));
        goto FAILED;
    }
    SET_NONBLOCK(sfd);
    return sfd;

FAILED:
    close(sfd);
    return -1;
}

ep_node_t *accept_cb(ep_instance_t *ins, ep_node_t *node)
{
    ep_node_t *new_node = (ep_node_t *)malloc(sizeof(ep_node_t));
    if (!new_node) {
        return NULL;
    }

    fd_and_addr_t addr;
    if (ins->position == 0) {
        socklen_t remote_len = sizeof(addr.addr_in);
        addr.fd = accept(ins->sk_fd, (struct sockaddr *)&addr.addr_in, &remote_len);
        if (addr.fd == -1) {
            goto FREE;
        }
        int index = ins->count++ % ins->conf->parallel;
        if (index != 0) {
            // hand the new connection over to a worker thread through its pipe;
            // the worker owns it from here on, so only the unused node is freed
            write(((int *)(ins->w_pipes))[index], (void *)&addr, sizeof(addr));
            goto FREE;
        }
    } else {
        int ret = read(ins->r_pipe, &addr, sizeof(addr));
        if (ret != sizeof(addr)) {
            goto CLOSE;
        }
    }

    SET_NONBLOCK(addr.fd);
    new_node->addr_in = addr.addr_in;
    new_node->ep_fd = ins->ep_fd;
    new_node->sk_fd = ins->position == 0 ? ins->sk_fd : ins->r_pipe;
    new_node->fd = addr.fd;
    return new_node;

CLOSE:
    close(addr.fd);
FREE:
    free(new_node);
    new_node = NULL;
    return new_node;
}

static int server_read_cb(ep_node_t *node)
{
    unsigned char read_data[READ_SIZE];
    memset(read_data, 0x00, READ_SIZE);
    int ret = read(node->fd, read_data, READ_SIZE);
    if (ret > 0) {
        node->r_sum += ret;
        EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLOUT);
    } else if (ret <= 0) {
        close(node->fd);
        EPOLL_ET_DEL(node);
        free(node);
        node = NULL;
    }
    return ret;
}

static int server_write_cb(ep_node_t *node)
{
    unsigned char write_data[WRITE_SIZE];
    memset(write_data, 0x30, WRITE_SIZE);
    int ret = write(node->fd, write_data, WRITE_SIZE);
    if (ret >= 0) {
        node->w_sum += ret;
        EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLIN);
    } else {
        printf("write finished, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);
        close(node->fd);
        EPOLL_ET_DEL(node);
        free(node);
        node = NULL;
    }
    return 0;
}

void *tcp_server_process(void *arg)
{
    ep_instance_t *ins = (ep_instance_t *)arg;
    if (!ins) {
        return NULL;
    }

    ins->ep_fd = epoll_create(EPOLL_SIZE);
    if (ins->ep_fd == -1) {
        return NULL;
    }

    int sk_fd = ins->position == 0 ? ins->sk_fd : ins->r_pipe;
    ep_node_t sk_fd_node = {.ep_fd = ins->ep_fd, .sk_fd = sk_fd, .fd = sk_fd, };
    if (EPOLL_ET_CTL(&sk_fd_node, EPOLL_CTL_ADD, EPOLLIN)) {
        return NULL;
    }

    struct epoll_event active_events[EPOLL_SIZE + 1];
    memset(&active_events, 0x00, sizeof(active_events));
    int i = 0;
    for (;;) {
        int active = epoll_wait(ins->ep_fd, active_events, EPOLL_SIZE + 1, -1);
        for (i = 0; i < active; i++) {
            ep_node_t *node = (ep_node_t *)active_events[i].data.ptr;
            // new connection
            if (node->fd == node->sk_fd) {
                ep_node_t *new_node = accept_cb(ins, node);
                if (new_node) {
                    if (EPOLL_ET_CTL(new_node, EPOLL_CTL_ADD, EPOLLIN)) {
                        close(new_node->fd);
                        free(new_node);
                        new_node = NULL;
                    } else {
                        printf("pos: %d, fd: %d, remote ip: %s, remote port: %d\n",
                               ins->position, new_node->fd,
                               inet_ntoa(new_node->addr_in.sin_addr),
                               ntohs(new_node->addr_in.sin_port));
                    }
                }
                EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLIN);
            }
            // read event
            else if (active_events[i].events & EPOLLIN) {
                unsigned char read_data[READ_SIZE];
                memset(read_data, 0x00, READ_SIZE);
                int ret = read(node->fd, read_data, READ_SIZE);
                if (ret <= 0) {
                    printf("peer closed or read failed and then close peer, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);
                    close(node->fd);
                    EPOLL_ET_DEL(node);
                    free(node);
                    node = NULL;
                    continue;
                }
                node->r_sum += ret;
                EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLOUT);
            }
            // write event
            else if (active_events[i].events & EPOLLOUT) {
                unsigned char write_data[WRITE_SIZE];
                memset(write_data, 0x39, WRITE_SIZE);
                int ret = write(node->fd, write_data, WRITE_SIZE);
                if (ret < 0) {
                    printf("write failed and then close peer, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);
                    close(node->fd);
                    EPOLL_ET_DEL(node);
                    free(node);
                    node = NULL;
                    continue;
                }
                node->w_sum += ret;
                EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLIN);
            }
        }
    }
    return NULL;
}

int tcp_server(config_t *conf)
{
    int i = 0, tmp_sk_fd = 0;
    if ((tmp_sk_fd = tcp_socket(conf)) < 0) {
        return -1;
    }

    int tmp_pipes_fd[PARALLEL_MAX][2];
    memset(tmp_pipes_fd, 0x00, sizeof(tmp_pipes_fd));
    ep_instance_t tmp_ins_arr[PARALLEL_MAX];
    memset(tmp_ins_arr, 0x00, sizeof(tmp_ins_arr));
    int tmp_w_pipes[PARALLEL_MAX];
    memset(tmp_w_pipes, 0x00, sizeof(tmp_w_pipes));
    pthread_t pids[PARALLEL_MAX];
    memset(pids, 0x00, sizeof(pids));

    // worker threads: each has its own epoll and receives new connections over a pipe
    for (i = 1; i < conf->parallel; i++) {
        pipe(tmp_pipes_fd[i]);
        SET_NONBLOCK(tmp_pipes_fd[i][0]);
        SET_NONBLOCK(tmp_pipes_fd[i][1]);
        tmp_ins_arr[i].conf = conf;
        tmp_ins_arr[i].position = i;
        tmp_ins_arr[i].r_pipe = tmp_pipes_fd[i][0];
        tmp_w_pipes[i] = tmp_pipes_fd[i][1];
        pthread_create(&pids[i], NULL, tcp_server_process, (void *)&tmp_ins_arr[i]);
    }

    // the trunk thread owns the listening socket
    tmp_ins_arr[0].conf = conf;
    tmp_ins_arr[0].position = 0;
    tmp_ins_arr[0].sk_fd = tmp_sk_fd;
    memcpy(tmp_ins_arr[0].w_pipes, tmp_w_pipes, sizeof(tmp_w_pipes));
    tcp_server_process((void *)&tmp_ins_arr[0]);

    for (i = 1; i < conf->parallel; i++) {
        pthread_join(pids[i], NULL);
    }
    return 0;
}

void* tcp_client(void *arg)
{
    config_t *conf = (config_t *)arg;
    ep_node_t fd_node;
    memset(&fd_node, 0x00, sizeof(fd_node));

    int ep_fd = epoll_create(EPOLL_SIZE);
    if (ep_fd == -1) {
        return NULL;
    }
    int fd = tcp_socket(conf);
    if (fd < 0) {
        return NULL;
    }
    fd_node.ep_fd = ep_fd;
    fd_node.fd = fd;
    if (EPOLL_ET_CTL(&fd_node, EPOLL_CTL_ADD, EPOLLOUT)) {
        return NULL;
    }

    struct epoll_event active_events[EPOLL_SIZE + 1];
    memset(&active_events, 0x00, sizeof(active_events));
    int i = 0;
    for (;;) {
        int active = epoll_wait(ep_fd, active_events, EPOLL_SIZE + 1, -1);
        for (i = 0; i < active; i++) {
            ep_node_t *node = (ep_node_t *)active_events[i].data.ptr;
            if (active_events[i].events & EPOLLIN) {
                unsigned char read_data[WRITE_SIZE];
                memset(read_data, 0x00, WRITE_SIZE);
                int ret = read(node->fd, read_data, WRITE_SIZE);
                if (ret <= 0) {
                    printf("peer closed or read failed and then close peer, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);
                    close(node->fd);
                    EPOLL_ET_DEL(node);
                    continue;
                }
                node->r_sum += ret;
                if (node->r_sum >= 1024 * 1024 * 512 && node->w_sum >= 1024 * 1024 * 1024) {
                    printf("rw finished, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);
                    close(node->fd);
                    EPOLL_ET_DEL(node);
                    return NULL;
                }
                EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLOUT);
            } else if (active_events[i].events & EPOLLOUT) {
                unsigned char write_data[READ_SIZE];
                memset(write_data, 0x39, READ_SIZE);
                int ret = write(node->fd, write_data, READ_SIZE);
                if (ret < 0) {
                    printf("write failed and then close peer, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);
                    close(node->fd);
                    EPOLL_ET_DEL(node);
                    continue;
                }
                node->w_sum += ret;
                if (node->r_sum >= 1024 * 1024 * 512 && node->w_sum >= 1024 * 1024 * 1024) {
                    printf("rw finished, read size: %lld, write size: %lld\n", node->r_sum, node->w_sum);
                    close(node->fd);
                    EPOLL_ET_DEL(node);
                    return NULL;
                }
                EPOLL_ET_CTL(node, EPOLL_CTL_MOD, EPOLLIN);
            }
        }
    }
    return NULL;
}

void sig_cb(int sig)
{
    printf("capture sig: %d\n", sig);
}

int main(int argc, char *argv[])
{
    if (argc != 5) {
        printf("Usage: %s <mode: 0=server, 1=client> <ip> <port> <parallel>\n", argv[0]);
        return -1;
    }
    signal(SIGPIPE, sig_cb);

    config_t conf = {
        .mode = atoi(argv[1]),
        .addr = argv[2],
        .port = atoi(argv[3]),
        .parallel = atoi(argv[4]) > PARALLEL_MAX ? PARALLEL_MAX : atoi(argv[4]),
        .addr_in.sin_family = AF_INET,
        .addr_in.sin_addr.s_addr = inet_addr(argv[2]),
        .addr_in.sin_port = htons(atoi(argv[3])),
        .addr_in_len = sizeof(struct sockaddr_in),
    };

    int i = 0;
    if (conf.mode == 0) {
        tcp_server(&conf);
    } else {
        pthread_t pids[PARALLEL_MAX];
        for (i = 1; i < conf.parallel; i++) {
            pthread_create(&pids[i], NULL, tcp_client, &conf);
        }
        tcp_client(&conf);
        for (i = 1; i < conf.parallel; i++) {
            pthread_join(pids[i], NULL);
        }
        printf("after pthread_join\n");
    }
    return 0;
}
Compile command: gcc tcp_test.c -o tcp_test -lpthread
Testing
Start the eBPF program on Ubuntu 22: ./for_flow
Start the TCP server on Ubuntu 22: ./tcp_test 0 192.168.20.11 6230 2    // 0 means server mode, 2 is the number of threads
Start the TCP client on CentOS 7: ./tcp_test 1 192.168.20.11 6230 2    // 1 means client mode, 2 is the number of threads, one TCP connection per thread
Results
The results are shown in the figure below:
As the results show, the client read 536870912 bytes (512 MB) and the server wrote 536870912 bytes (512 MB); the test program's cutoff check is likewise 1024*1024*512 bytes, and the eBPF program also counted 536870912 bytes (512 MB). Everything matches, so counting the TCP server's transmit traffic with eBPF works.
With a few changes this code can also count the bytes the TCP server reads: just switch the probed function to tcp_recvmsg and adjust the surrounding logic. There are a couple of pitfalls along the way; I have already done the conversion successfully, so I won't post my version here. If you are interested, give it a try, and feel free to reach out if you run into problems.
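For anyone who wants a starting point, here is a rough, untested sketch of the receive-side idea. This is my own approximation, not the working version mentioned above, and it assumes that on this kernel tcp_recvmsg still takes the struct sock * as its first argument and returns the number of bytes copied to user space. It also reuses the two maps from for_flow_kern.c purely to keep the sketch short; in practice you would give the receive path its own maps so send and receive counts don't get mixed together.

// sketch only: count bytes received on local port 6230 via tcp_recvmsg
SEC("kprobe/tcp_recvmsg")
int trace_sys_recv(struct pt_regs *ctx)
{
    u16 family = 0, port = 0;
    struct sock *sock = (struct sock *)PT_REGS_PARM1_CORE(ctx);

    if (bpf_probe_read_kernel(&family, sizeof(family), &sock->sk_family) || family != AF_INET)
        return 0;
    if (bpf_probe_read_kernel(&port, sizeof(port), &sock->sk_num) || port != 6230)
        return 0;

    u64 pid = bpf_get_current_pid_tgid();
    u32 p = port;
    bpf_map_update_elem(&pid_2_port_map, &pid, &p, BPF_ANY);
    return 0;
}

SEC("kretprobe/tcp_recvmsg")
int trace_sys_recv_ret(struct pt_regs *ctx)
{
    u64 pid = bpf_get_current_pid_tgid();

    // only count pids that passed the port filter in the kprobe
    if (!bpf_map_lookup_elem(&pid_2_port_map, &pid))
        return 0;

    int size = PT_REGS_RC(ctx); // bytes copied to user space, or a negative error
    if (size > 0) {
        u64 *flow_ptr = bpf_map_lookup_elem(&pid_2_flow_map, &pid);
        u64 sum = flow_ptr ? *flow_ptr + size : (u64)size;
        bpf_map_update_elem(&pid_2_flow_map, &pid, &sum, BPF_ANY);
    }
    return 0;
}

If you load this alongside the tcp_sendmsg pair, remember to look the new programs up by name and attach them in for_flow_user.c as well.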