Shipping Container stdout/stderr Logs to Kafka with Fluent Bit


 What is Fluent Bit?

Fluent Bit is an open-source, lightweight log processor and forwarder designed for embedded systems, containerized environments, and distributed architectures. Its core capabilities are log collection, filtering, aggregation, and delivery, with support for a wide range of input sources (files, syslog, HTTP endpoints) and output targets (Elasticsearch, Kafka, cloud storage services).

 Workflow

Logs travel through a data pipeline from source to destination. A pipeline is composed of input, parser, filter, buffer, routing, and output stages.

  • input plugin: extracts data from a source; a pipeline can contain multiple inputs

  • parser component: converts the unstructured data collected by an input into structured records; each input can define its own parser

  • filter plugin: filters and modifies the parsed records; a pipeline can contain multiple filters, which run in the order they appear in the configuration file

  • buffer component: caches records that have passed through the filters; by default the buffer holds input data in memory until it is routed to an output

  • routing component: routes buffered records to the appropriate outputs

  • output plugin: delivers records to their destinations; a pipeline can contain multiple outputs
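As a rough illustration only (not Fluent Bit's actual implementation), the stages above can be sketched as a tiny Python pipeline in which records flow input → parser → filter → buffer → output; all names here are invented for the sketch:

```python
import json

def input_tail():
    # input: pretend we tailed two raw JSON lines from a container log file
    return ['{"log":"hello\\n","stream":"stdout"}',
            '{"log":"world\\n","stream":"stderr"}']

def parse_docker(raw):
    # parser: turn an unstructured line into a structured record
    return json.loads(raw)

def filter_tag(record, tag):
    # filter: enrich the record (Fluent Bit's kubernetes filter adds pod metadata here)
    record["tag"] = tag
    return record

# buffer: records accumulate here until they are routed to an output
buffer = [filter_tag(parse_docker(line), "k8s.demo") for line in input_tail()]

def output_flush(buf):
    # output: deliver the buffered records to a destination (here, JSON strings)
    return [json.dumps(r) for r in buf]

flushed = output_flush(buffer)
print(len(flushed))
```

In the real pipeline each stage is a plugin configured in fluent-bit.conf, as shown in the deployment below.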

Deploying the Kafka Cluster

Install the JDK

[root@k8s-master ~]# java -version
java version "21.0.6" 2025-01-21 LTS
Java(TM) SE Runtime Environment (build 21.0.6+8-LTS-188)
Java HotSpot(TM) 64-Bit Server VM (build 21.0.6+8-LTS-188, mixed mode, sharing)

Deploy Kafka

[root@k8s-master kafaka-zookeeper]# tar -xvf kafka_2.12-3.8.0.tgz
[root@k8s-master kafaka-zookeeper]# mkdir -p /home/kafaka-zookeeper/kafka_2.12-3.8.0/{data,logs}
[root@k8s-master kafaka-zookeeper]#  cd /home/kafaka-zookeeper/kafka_2.12-3.8.0/config/
[root@k8s-master config]#  cp server.properties server.properties.bak
[root@k8s-master config]# sed -i "/#/d" server.properties
[root@k8s-master config]# ll
total 80
-rw-r--r-- 1 root root  906 Jul 23  2024 connect-console-sink.properties
-rw-r--r-- 1 root root  909 Jul 23  2024 connect-console-source.properties
-rw-r--r-- 1 root root 5475 Jul 23  2024 connect-distributed.properties
-rw-r--r-- 1 root root  883 Jul 23  2024 connect-file-sink.properties
-rw-r--r-- 1 root root  881 Jul 23  2024 connect-file-source.properties
-rw-r--r-- 1 root root 2063 Jul 23  2024 connect-log4j.properties
-rw-r--r-- 1 root root 2540 Jul 23  2024 connect-mirror-maker.properties
-rw-r--r-- 1 root root 2262 Jul 23  2024 connect-standalone.properties
-rw-r--r-- 1 root root 1221 Jul 23  2024 consumer.properties
drwxr-xr-x 2 root root 4096 Jul 23  2024 kraft
-rw-r--r-- 1 root root 4917 Jul 23  2024 log4j.properties
-rw-r--r-- 1 root root 2065 Jul 23  2024 producer.properties
-rw-r--r-- 1 root root  544 Mar 14 14:27 server.properties
-rw-r--r-- 1 root root 6896 Mar 14 14:27 server.properties.bak
-rw-r--r-- 1 root root 1094 Jul 23  2024 tools-log4j.properties
-rw-r--r-- 1 root root 1169 Jul 23  2024 trogdor.conf
-rw-r--r-- 1 root root 1205 Jul 23  2024 zookeeper.properties
[root@k8s-master config]# vim server.properties
[root@k8s-master config]# cd ..
[root@k8s-master kafka_2.12-3.8.0]# nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
[1] 16270

Deploy ZooKeeper

[root@k8s-node2 ~]# mkdir -p /home/kafaka-zookeeper
[root@k8s-node2 ~]# cd /home/kafaka-zookeeper/
[root@k8s-node2 kafaka-zookeeper]# mv apache-zookeeper-3.8.4-bin apache-zookeeper-3.8.4
[root@k8s-node2 kafaka-zookeeper]# mkdir -p /home/kafaka-zookeeper/apache-zookeeper-3.8.4/{data,logs}
[root@k8s-node2 kafaka-zookeeper]# cd /home/kafaka-zookeeper/apache-zookeeper-3.8.4/conf/
[root@k8s-node2 conf]#  mv zoo_sample.cfg zoo.cfg
[root@k8s-node2 data]# /home/kafaka-zookeeper/apache-zookeeper-3.8.4/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/kafaka-zookeeper/apache-zookeeper-3.8.4/bin/../conf/zoo.cfg
Starting zookeeper ... already running as process 11484.

Configure time synchronization:

sudo ntpdate ntp.aliyun.com

Deploying Fluent Bit on Kubernetes

fluent-bit-config.yaml 

[root@k8s-master ~]# cat fluent-bit-config.yaml 
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: kube-system
data:
  fluent-bit.conf: |
    [SERVICE]
        Flush        5
        Log_Level    info
        Daemon       off
        Parsers_File parsers.conf

    [INPUT]
        Name              tail
        Path              /var/log/containers/*.log
        Parser            docker
        Tag               k8s.*
        Refresh_Interval  5
        Mem_Buf_Limit     5MB
        Skip_Long_Lines   On

    [FILTER]
        Name                kubernetes
        Match               k8s.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
        Kube_Tag_Prefix     k8s.
        Merge_Log           On
        Keep_Log            Off

    [OUTPUT]
        Name            kafka
        Match           *
        Brokers         192.168.9.128:9092,192.168.9.129:9092,192.168.9.130:9092
        Topics          k8s-logs
        rdkafka.compression.codec snappy

  parsers.conf: |
    [PARSER]
        Name        docker
        Format      json
        Time_Key    time
        Time_Format %Y-%m-%dT%H:%M:%S.%LZ
        Decode_Field_As   escaped_utf8    log
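The `docker` parser above reads each json-file log line and extracts the timestamp from its `time` field. As a sanity check, the same parse can be sketched in Python (the sample line is made up; Fluent Bit's `%L` millisecond specifier roughly corresponds to Python's `%f` here):

```python
import json
from datetime import datetime

# A made-up Docker json-file log line of the shape the tail input reads
raw = '{"log":"test message\\n","stream":"stdout","time":"2025-02-11T08:58:15.123Z"}'

record = json.loads(raw)                          # Format      json
ts = datetime.strptime(record["time"],
                       "%Y-%m-%dT%H:%M:%S.%fZ")   # Time_Format %Y-%m-%dT%H:%M:%S.%LZ
print(ts.isoformat(), record["log"].rstrip())
```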

fluent-bit-daemonset.yaml 

[root@k8s-master ~]# cat fluent-bit-daemonset.yaml 
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluent-bit
  namespace: kube-system
  labels:
    app: fluent-bit
spec:
  selector:
    matchLabels:
      app: fluent-bit
  template:
    metadata:
      labels:
        app: fluent-bit
    spec:
      serviceAccountName: fluent-bit
      tolerations:  # allow scheduling on all nodes, including the master
      - key: node-role.kubernetes.io/master
        operator: Exists
        effect: NoSchedule
      containers:
      - name: fluent-bit
        image: fluent/fluent-bit:1.9.10
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: dockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: fluent-bit-config
          mountPath: /fluent-bit/etc/
      volumes:
      - name: varlog
        hostPath:
          path: /var/log
      - name: dockercontainers
        hostPath:
          path: /var/lib/docker/containers  # with containerd, use /var/log/pods
      - name: fluent-bit-config
        configMap:
          name: fluent-bit-config
---
# Create the RBAC permissions
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluent-bit
  namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluent-bit-read
rules:
- apiGroups: [""]
  resources: ["pods", "namespaces"]
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fluent-bit-read
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: fluent-bit-read
subjects:
- kind: ServiceAccount
  name: fluent-bit
  namespace: kube-system
[root@k8s-master ~]# kubectl get pod -n kube-system | grep flu
fluent-bit-4rvfx                           1/1     Running   0          31d
fluent-bit-lqhcm                           1/1     Running   0          31d
fluent-bit-r6xrh                           1/1     Running   0          31d

Check the pod logs to verify the connections succeeded:

[root@k8s-master bin]# kubectl logs fluent-bit-lqhcm -n kube-system
Fluent Bit v1.9.10
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io
[2025/02/11 08:58:15] [ info] [fluent bit] version=1.9.10, commit=760956f50c, pid=1
[2025/02/11 08:58:15] [ info] [storage] version=1.3.0, type=memory-only, sync=normal, checksum=disabled, max_chunks_up=128
[2025/02/11 08:58:15] [ info] [cmetrics] version=0.3.7
[2025/02/11 08:58:15] [ info] [filter:kubernetes:kubernetes.0] https=1 host=kubernetes.default.svc port=443
[2025/02/11 08:58:15] [ info] [filter:kubernetes:kubernetes.0]  token updated
[2025/02/11 08:58:15] [ info] [filter:kubernetes:kubernetes.0] local POD info OK
[2025/02/11 08:58:15] [ info] [filter:kubernetes:kubernetes.0] testing connectivity with API server...
[2025/02/11 08:58:15] [ info] [filter:kubernetes:kubernetes.0] connectivity OK
[2025/02/11 08:58:15] [ info] [output:kafka:kafka.0] brokers='192.168.9.128:9092,192.168.9.129:9092,192.168.9.130:9092' topics='k8s-logs'
[2025/02/11 08:58:15] [ info] [sp] stream processor started

Testing:

Publish a message from the producer host:

[root@k8s-master bin]#  echo "test message" | /home/kafaka-zookeeper/kafka_2.12-3.8.0/bin/kafka-console-producer.sh   --broker-list 192.168.9.128:9092,192.168.9.129:9092,192.168.9.130:9092   --topic k8s-logs

Check it from the consumer host:

/home/kafaka-zookeeper/kafka_2.12-3.8.0/bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.9.128:9092,192.168.9.129:9092,192.168.9.130:9092 \
--topic k8s-logs \
--from-beginning

The container logs can be seen arriving in Kafka:
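With Merge_Log On, each Kafka message is a JSON object carrying the original log line plus the pod metadata added by the kubernetes filter. A consumer might unpack one like this (the field names are the filter's defaults; the sample values are invented):

```python
import json

# Invented sample of the kind of record Fluent Bit's kafka output produces
msg = ('{"log":"test message\\n","stream":"stdout",'
       '"kubernetes":{"pod_name":"demo-pod","namespace_name":"default",'
       '"container_name":"demo"}}')

record = json.loads(msg)
source = "{}/{}".format(record["kubernetes"]["namespace_name"],
                        record["kubernetes"]["pod_name"])
print(source, record["log"].rstrip())  # default/demo-pod test message
```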

 

