大数据ELK(十九):使用FileBeat采集Kafka日志到Elasticsearch

news/2024/11/24 5:06:13/

文章目录

使用FileBeat采集Kafka日志到Elasticsearch

一、需求分析

二、配置FileBeats

1、input配置

2、output配置

三、配置文件

1、创建配置文件

2、复制一下到配置文件中

四、运行FileBeat

1、运行FileBeat

2、将日志数据上传到/var/kafka/log,并解压

五、查询数据

1、查看索引信息

六、解决一个日志涉及到多行问题

1、导入错误日志

2、问题分析

3、FileBeat多行配置选项

4、重新配置FileBeat


使用FileBeat采集Kafka日志到Elasticsearch

一、需求分析

在资料中有一个kafka_server.log.tar.gz压缩包,里面包含了很多的Kafka服务器日志,现在我们为了通过在Elasticsearch中快速查询这些日志,定位问题。我们需要用FileBeats将日志数据上传到Elasticsearch中。

问题:

  • 首先,我们要指定FileBeat采集哪些Kafka日志,因为FileBeats中必须知道采集存放在哪儿的日志,才能进行采集。
  • 其次,采集到这些数据后,还需要指定FileBeats将采集到的日志输出到Elasticsearch,那么Elasticsearch的地址也必须指定。

二、配置FileBeats

FileBeats配置文件主要分为两个部分。

  • inputs
  • output

从名字就能看出来,一个是用来输入数据的,一个是用来输出数据的。

1、input配置

filebeat.inputs:
- type: logenabled: truepaths:- /var/log/*.log#- c:\programdata\elasticsearch\logs\*

在FileBeats中,可以读取一个或多个数据源。

2、output配置

默认FileBeat会将日志数据放入到名称为:filebeat-%filebeat版本号%-yyyy.MM.dd 的索引中。

PS:

FileBeats中的filebeat.reference.yml包含了FileBeats所有支持的配置选项。

三、配置文件

1、创建配置文件

cd /export/server/es/filebeat-7.6.1-linux-x86_64
vim filebeat_kafka_log.yml

2、复制一下到配置文件中

filebeat.inputs:
- type: logenabled: truepaths:- /export/server/es/data/kafka/server.log.*output.elasticsearch:hosts: ["node1:9200", "node2:9200", "node3:9200"]

四、运行FileBeat

1、运行FileBeat

./filebeat -c filebeat_kafka_log.yml -e

2、将日志数据上传到/var/kafka/log,并解压

mkdir -p /export/server/es/data/kafka/tar -xvzf kafka_server.log.tar.gz

注意: 文件权限的报错

如果在启动fileBeat的时候, 报了一个配置文件权限的错误, 请修改其权限为 -rw-r--r--

五、查询数据

1、查看索引信息

GET /_cat/indices?v

    {"health": "green","status": "open","index": "filebeat-7.6.1-2021.12.05-000001","uuid": "dplqB_hTQq2XeSk6S4tccQ","pri": "1","rep": "1","docs.count": "213780","docs.deleted": "0","store.size": "71.9mb","pri.store.size": "35.8mb"}

GET /filebeat-7.6.1-2021.12.05-000001/_search

            {"_index": "filebeat-7.6.1-2021.12.05-000001","_type": "_doc","_id": "-72pX3IBjTeClvZff0CB","_score": 1,"_source": {"@timestamp": "2021-12-05T09:00:40.041Z","log": {"offset": 55433,"file": {"path": "/var/kafka/log/server.log.2021-12-05-16"}},"message": "[2021-12-05 09:01:30,682] INFO Socket connection established, initiating session, client: /192.168.88.100:46762, server: node1.cn/192.168.88.100:2181 (org.apache.zookeeper.ClientCnxn)","input": {"type": "log"},"ecs": {"version": "1.4.0"},"host": {"name": "node1"},"agent": {"id": "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64","version": "7.6.1","type": "filebeat","ephemeral_id": "b8fbf7ab-bc37-46dd-86c7-fa7d74d36f63","hostname": "node1"}}}

FileBeat自动给我们添加了一些关于日志、采集类型、Host各种字段。

六、​​​​​​​解决一个日志涉及到多行问题

我们在日常日志的处理中,经常会碰到日志中出现异常的情况。类似下面的情况:

[2021-12-05 14:00:05,725] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error when sending leader epoch request for Map(test_10m-2 -> (currentLeaderEpoch=Optional[161], leaderEpoch=158)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to node2:9092 (id: 1 rack: null) failed.at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:102)at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310)at kafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208)at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173)at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-05 14:00:05,725] INFO [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Retrying leaderEpoch request for partition test_10m-2 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)
[2021-12-05 14:00:08,731] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Connection to node 1 (node2/192.168.88.101:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

在FileBeat中,Harvest是逐行读取日志文件的。但上述的日志会出现一条日志,跨多行的情况。有异常信息时,肯定会出现多行。我们先来看一下,如果默认不处理这种情况会出现什么问题。

1、​​​​​​​导入错误日志

1)在/export/server/es/data/kafka/中创建名为server.log.2021-12-05的日志文件

2)将资料中的err.txt日志文本贴入到该文件中

观察FileBeat,发现FileBeat已经针对该日志文件启动了Harvester,并读取到数据数据。

2021-12-05T19:11:01.236+0800    INFO    log/harvester.go:297    Harvester started for file: /var/kafka/log/server.log.2021-12-05

3)在Elasticsearch检索该文件

我们发现,原本是一条日志中的异常信息,都被作为一条单独的消息来处理了~

"message":"java.io.IOException:Connection to node2:9092 (id;

这明显是不符合我们的预期的,我们想要的是将所有的异常消息合并到一条日志中。那针对这种情况该如何处理呢?

2、​​​​​​​问题分析

每条日志都是有统一格式的开头的,就拿Kafka的日志消息来说,[2021-12-05 14:00:05,725]这是一个统一的格式,如果不是以这样的形式开头,说明这一行肯定是属于某一条日志,而不是独立的一条日志。所以,我们可以通过日志的开头来判断某一行是否为新的一条日志。

3、​​​​​​​FileBeat多行配置选项

在FileBeat的配置中,专门有一个解决一条日志跨多行问题的配置。主要为以下三个配置:

multiline.pattern: ^\[
multiline.negate: false
multiline.match: after

multiline.pattern表示能够匹配一条日志的模式,默认配置的是以[开头的才认为是一条新的日志。

multiline.negate:配置该模式是否生效,默认为false。

multiline.match:表示是否将未匹配到的行追加到上一日志,还是追加到下一个日志。

4、​​​​​​​重新配置FileBeat

1)修改filebeat.yml,并添加以下内容

filebeat.inputs:
- type: logenabled: truepaths:- /var/kafka/log/server.log.*multiline.pattern: '^\['multiline.negate: truemultiline.match: afteroutput.elasticsearch:hosts: ["node1:9200", "node2:9200", "node3:9200"]

2)修改「注册表」/data.json,将server.log.2021-12-05对应的offset设置为0

cd /export/server/es/filebeat-7.6.1-linux-x86_64/data/registry/filebeatvim data.json

3)删除之前创建的文档

// 删除指定文件的文档
POST /filebeat-7.6.1-2021.12.05-000001/_delete_by_query
{"query": {"match": {"log.file.path": "/var/kafka/log/server.log.2021-12-05"}}
}

 4)重新启动FileBeat

./filebeat -e

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

http://www.ppmy.cn/news/608424.html

相关文章

三宝

产品有三宝,弹窗,浮层加引导。 设计有三宝,透明,阴影加圆角。 运营有三宝,短信,推送加红包。 码农有一宝,这个做不了。 转载于:https://www.cnblogs.com/Oldz/p/10314769.html

项目引入 mqtt 报错 Uncaught ReferenceError: Buffer is not defined

项目引入 mqtt 报错 Uncaught ReferenceError: Buffer is not defined 最近在做一个 vite4.x react18 的项目,需要引入 mqtt.js , 基操直接引入 import mqtt from mqtt结果浏览器直接报错 Uncaught ReferenceError: Buffer is not defined. 首先想到的是不是 mqtt.js 库不能…

大数据ELK(二十):FileBeat是如何工作的

文章目录 FileBeat是如何工作的 一、input和harvester 1、inputs(输入)

重型车辆盲区行为检查Behaviours – Heavy Vehicle Blind Spots

重型车辆盲区行为检查Behaviours – Heavy Vehicle Blind Spots VISIBILITY AROUND HEAVY VEHICLES A blind spot is an area of the road outside the driver’s field of vision that cannot be seen in the rearview mirrors or through the windows. The taller and longer…

大数据ELK(二十一):Logstash简介和安装

文章目录 Logstash简介和安装 一、简介 1、经典架构 2、对比Flume

大数据ELK(二十二):采集Apache Web服务器日志

文章目录 采集Apache Web服务器日志 一、需求 二、准备日志数据

Mobileye高级驾驶辅助系统(ADAS)

Mobileye高级驾驶辅助系统(ADAS) Mobileye is the global leader in the development of vision technology for Advanced Driver Assistance Systems (ADAS) and autonomous driving. We have over 1,700 employees continuing our two-decade traditio…

大数据ELK(二十三):Kibana简介

文章目录 Kibana简介 Kibana简介 通过上面的这张图就可以看到,Kibana可以用来展示丰富的图表。 Kibana是一个开源的数据分析和可视化平台,使用Kibana可以用来搜索Elasticsearch中的数据,构建漂亮的可视化图形、以及制作一些好看的仪表盘Kibana是用来管理Elastic stack组件的…