Best Practices for Collecting Business Logs with Kafka


Introduction

Apache Kafka is a distributed streaming platform primarily used to build real-time data pipelines and streaming applications. In business-log collection scenarios, Kafka serves as a message middleware that receives, stores, and forwards large volumes of log data, and it can be integrated with other systems (such as Elasticsearch, Flume, and Spark Streaming) to provide richer log processing and analytics. This article covers the integration with Guance (观测云): the Guance collector, DataKit, consumes business logs from Kafka. The examples below show how quickly this integration can be set up and what the results look like.

Environment

Prerequisites

  • A Guance (观测云) account

Software and Middleware

  • Kafka 3.2.0
  • DataKit collector
  • JDK 8

Hardware

  • One cloud server: CentOS 7.9 64-bit, 4 vCPUs, 8 GB RAM, 100 GB cloud disk.

Integration Approach

Prepare the Kafka Environment

Install Kafka

Download version 3.2.0 and extract it; no further installation is required.

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.13-3.2.0.tgz
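Then extract the archive and change into the resulting directory (the directory name simply follows the tarball name):

tar -xzf kafka_2.13-3.2.0.tgz
cd kafka_2.13-3.2.0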

Note: DataKit currently supports Kafka versions 0.8.2 through 3.2.0.

Start the ZooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka server
$ bin/kafka-server-start.sh config/server.properties
Create a Topic

Create a topic named testlog.

$ bin/kafka-topics.sh --create --topic testlog --bootstrap-server localhost:9092
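To confirm that the topic exists, it can be inspected with the standard Kafka CLI:

$ bin/kafka-topics.sh --describe --topic testlog --bootstrap-server localhost:9092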
Start a Producer
$ bin/kafka-console-producer.sh --topic testlog --bootstrap-server localhost:9092

Install DataKit

Install the DataKit collector by following the official documentation.

Fill in the TOKEN according to your Guance workspace:
DK_DATAWAY=https://openway.guance.com?token=<TOKEN> bash -c "$(curl -L https://static.guance.com/datakit/install.sh)"
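Once the installer finishes, you can check that the collector is running; the command below assumes the systemd service named datakit registered by the default Linux installation:

$ systemctl status datakit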

Enable the Kafka Collector

Go to the conf.d/kafkamq directory under the DataKit installation directory (the default configuration path is /usr/local/datakit/conf.d/), then copy kafkamq.conf.sample and name the copy kafkamq.conf.
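For example, assuming the default installation path:

$ cd /usr/local/datakit/conf.d/kafkamq
$ cp kafkamq.conf.sample kafkamq.conf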

The directory listing will then look similar to the following:

-rwxr-xr-x 1 root root 2574 Apr 30 23:52 kafkamq.conf
-rwxr-xr-x 1 root root 2579 May  1 00:40 kafkamq.conf.sample

Adjust the Kafka collector configuration as follows:

  • addrs = ["localhost:9092"],该文采集器 DataKit 和 Kafka 安装到同一台操作系统中,localhost 即可。
  • kafka_version = "3.2.0",该文使用 Kafka 的版本。
  • [inputs.kafkamq.custom],删除注释符号“#”。
  • [inputs.kafkamq.custom.log_topic_map],删除注释符号“#”。
  • "testlog"="log.p",testlog 为 Topic 的名字,log.p 为观测云 Pipeline 可编程数据处理器的日志字段提取规则配置。涉及的业务日志和 log.p 的内容详细见下面的《使用 Pipeline》。
# {"version": "1.28.1", "desc": "do NOT edit this line"}[[inputs.kafkamq]]addrs = ["localhost:9092"]# your kafka version:0.8.2 ~ 3.2.0kafka_version = "3.2.0"group_id = "datakit-group"# consumer group partition assignment strategy (range, roundrobin, sticky)assignor = "roundrobin"## rate limit.#limit_sec = 100## sample# sampling_rate = 1.0## kafka tls config# tls_enable = true# tls_security_protocol = "SASL_PLAINTEXT"# tls_sasl_mechanism = "PLAIN"# tls_sasl_plain_username = "user"# tls_sasl_plain_password = "pw"## -1:Offset Newest, -2:Offset Oldestoffsets=-1## skywalking custom#[inputs.kafkamq.skywalking]## Required!send to datakit skywalking input.#dk_endpoint="http://localhost:9529"#thread = 8 #topics = [#  "skywalking-metrics",#  "skywalking-profilings",#  "skywalking-segments",#  "skywalking-managements",#  "skywalking-meters",#  "skywalking-logging",#]#namespace = ""## Jaeger from kafka. Please make sure your Datakit Jaeger collector is open !!!#[inputs.kafkamq.jaeger]## Required! ipv6 is "[::1]:9529"#dk_endpoint="http://localhost:9529"#thread = 8 #source: agent,otel,others...#source = "agent"## Required! topics#topics=["jaeger-spans","jaeger-my-spans"]## user custom message with PL script.[inputs.kafkamq.custom]#spilt_json_body = true#thread = 8 ## spilt_topic_map determines whether to enable log splitting for specific topic based on the values in the spilt_topic_map[topic].#[inputs.kafkamq.custom.spilt_topic_map]#  "log_topic"=true#  "log01"=false[inputs.kafkamq.custom.log_topic_map]"testlog"="log.p"#  "log01"="log_01.p"#[inputs.kafkamq.custom.metric_topic_map]#  "metric_topic"="metric.p"#  "metric01"="rum_apm.p"#[inputs.kafkamq.custom.rum_topic_map]#  "rum_topic"="rum_01.p"#  "rum_02"="rum_02.p"#[inputs.kafkamq.remote_handle]## Required!#endpoint="http://localhost:8080"## Required! topics#topics=["spans","my-spans"]# send_message_count = 100# debug = false# is_response_point = true# header_check = false## Receive and consume OTEL data from kafka.#[inputs.kafkamq.otel]#dk_endpoint="http://localhost:9529"#trace_api="/otel/v1/trace"#metric_api="/otel/v1/metric"#trace_topics=["trace1","trace2"]#metric_topics=["otel-metric","otel-metric1"]#thread = 8 ## todo: add other input-mq

Note: after enabling or changing any DataKit configuration, restart the collector (run datakit service -R in the shell).

Using Pipeline

Contents of the log.p rule:

# parse the JSON message body
data = load_json(message)

# extract the fields of interest and attach them as tags
protocol = data["protocol"]
response_code = data["response_code"]
set_tag(protocol, protocol)
set_tag(response_code, response_code)

# map response_code ranges onto the log status
group_between(response_code, [200, 300], "info", "status")
group_between(response_code, [400, 499], "warning", "status")
group_between(response_code, [500, 599], "error", "status")

# use the log's start_time as the record timestamp
time = data["start_time"]
set_tag(time, time)
default_time(time)
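Save the rule above as a file named log.p so that the "testlog"="log.p" mapping in kafkamq.conf can resolve it. The path below assumes DataKit's default pipeline directory; adjust it if your installation differs:

$ vim /usr/local/datakit/pipeline/log.p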

Results

Sample Business Logs

The sample business log entries are as follows:

#info
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:37:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
#error
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:39:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
#warn
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:38:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}

Sending the Logs

After the producer has started, send the following three log entries: one at info level ("response_code":204), one at error level ("response_code":504), and one at warn level ("response_code":404).

>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
>
>
>
>
>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
>
>
>
>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}

The three business logs collected from Kafka by DataKit

Field Extraction from Business Logs with Pipeline

In the screenshot below, the protocol, response_code, and time fields are all the result of Pipeline extraction.

