ELK中L的filebeat配置及使用(超详细)

embedded/2024/12/22 20:41:40/

上一次讲解了如何在linux服务器上使用docker配置ELK中的E和K,这期着重讲解一下L怎么配置。

首先L在elk中指的是一个数据处理管道,可以从多种来源收集数据,进行处理和转换,然后将数据发送到 Elasticsearch。L的全称就是:Logstash,Logstsh的功能特别多,提供了大量的输入,过滤和输出插件,但是他的资源占用也比较多。

08_对接kafka案例展示_哔哩哔哩_bilibili

简单说一下什么是Filebeat

​ Filebeat是一种轻量型日志采集器,内置有多种模块(auditd、Apache、NGINX、System、MySQL 等等),可针对常见格式的日志大大简化收集、解析和可视化过程,只需一条命令即可。根据采集的数据形式不同,形成了由多个模块组成的Beats。Beats是开源数据传输程序集,可以将其作为代理安装在服务器上,将操作数据发送给Elasticsearch,或者通过Logstash,在Kibana中可视化数据之前,在Logstash中进一步处理和增强数据。

白话说:Filebeat占用内存小的日志采集器,它的作用是:1、可以代替Logstash将日志直接发送到Elasticsearch.2、可以在Filebeat收集完数据后统一发送给Logstash,然后又Logstash进一步处理数据后再发送给Elasticsearch。

Filebeat特点

(1)轻量型日志采集器,占用资源更少,对机器配置要求极低。

(2)操作简便,可将采集到的日志信息直接发送到ES集群、Logstash、Kafka集群等消息队列中。

(3)异常中断重启后会继续上次停止的位置。(通过${filebeat_home}\data\registry文件来记录日志的偏移量)。

(4)使用压力敏感协议(backpressure-sensitive)来传输数据,在 logstash 忙的时候,Filebeat 会减慢读取-传输速度,一旦 logstash 恢复,则 Filebeat 恢复原来的速度。

(5)Filebeat带有内部模块(auditd,Apache,Nginx,System和MySQL),可通过一个指定命令来简化通用日志格式的收集,解析和可视化。

bin/logstash -e ‘input { stdin{} } output { stdout{} }’

Filebeat与Logstash对比

(1)Filebeat是轻量级数据托运者,您可以在服务器上将其作为代理安装,以将特定类型的操作数据发送到Elasticsearch。与Logstash相比,其占用空间小,使用的系统资源更少。

(2)Logstash具有更大的占用空间,但提供了大量的输入,过滤和输出插件,用于收集,丰富和转换来自各种来源的数据。

(3)Logstash是使用Java编写,插件是使用jruby编写,对机器的资源要求会比较高。在采集日志方面,对CPU、内存上都要比Filebeat高很多。

为什么要先用Filebeat收集再发送到logstash呢?

假设我们有多个节点都需要收集日志数据,如果每一个节点都安装一个Logstash,那么消耗的内存就浪费了,但是如果每个都安装一个占用内存很小的Filebeat,然后将所有的Filebeat收集的数据都发送给统一的一个Logstash的话,那么Logstash只需要下载一遍就可以了。

演示如何安装部署L部分

前言:记得elk中的几个工具都用一样版本的,不同版本可能会出现版本不兼容,看上一篇博客说的,我们继续使用 7.12.1版本的。

上一个服务器也就是ELK中EK都是在ip: 121.196.217.190上的,E在9200端口,K在5601端口。

我要把L也就是收集器就模拟放到一个节点上,放到另外一个服务器上,ip: 8.148.4.43,注意提前关闭防火墙和打开端口访问,

这个服务器使用到的端口是5044端口。

下载Filebeat
# 进入文件夹
cd /www/zzntest/# 下载
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.12.1-linux-x86_64.tar.gz# 解压文件
tar -zxvf filebeat-7.12.1-linux-x86_64.tar.gz # 重命名
mv filebeat-7.12.1-linux-x86_64  filebeat

下载好了之后

#进入filebeat
cd filebeat#指定监控日志的输入输出路径
vim filebeat.yml

改下面这两部分内容:意思就是允许从/www/zzntest/log/*.log 路径下的日志输入到filebeat

  # Unique ID among all inputs, an ID is required.id: my-filestream-id# Change to true to enable this input configuration.enabled: true# Paths that should be crawled and fetched. Glob based paths.paths:- /www/zzntest/log/*.log 

此时如果启动filebeat他就会监听/www/zzntest/log/*.log 的日志文件了,但是我们还没有写输出到哪里,所以先不启动,

先测试第一种情况,

1、将日志用filebeat收集后输出到logstash上
下载logstash

下载filebeat差不多

cd /www/zzntest/# 下载
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.12.1-linux-x86_64.tar.gz# 解压文件
tar -zxvf logstash-7.12.1-linux-x86_64.tar.gz# 重命名
mv  logstash-7.12.1 logstash

在logstash里面创建文件夹job编写数据采集文件:filebeat_to_logstash.conf

mkdir job

vim filebeat_to_logstash.conf

#本次采用的数据为数仓项目中的模拟数据
#1562065564549|{"cm":{"ln":"-54.7","sv":"V2.9.8","os":"8.1.0","g":"9NY0AL0L@gmail.com","mid":"m140","nw":"WIFI","l":"pt","vc":"13","hw":"1080*1920","ar":"MX","uid":"u737","t":"1562030978430","la":"-7.4","md":"HTC-14","vn":"1.1.3","ba":"HTC","sr":"M"},"ap":"gmall","et":[{"ett":"1561996979060","en":"display","kv":{"newsid":"n925","action":"1","extend1":"1","place":"4","category":"37"}},{"ett":"1562031053551","en":"newsdetail","kv":{"entry":"3","newsid":"n332","news_staytime":"10","loading_time":"8","action":"4","showtype":"3","category":"11","type1":"433"}},{"ett":"1561986545246","en":"loading","kv":{"extend2":"","loading_time":"7","action":"1","extend1":"","type":"1","type1":"102","loading_way":"1"}},{"ett":"1562053433842","en":"active_foreground","kv":{"access":"1","push_id":"1"}},{"ett":"1562030443443","en":"favorites","kv":{"course_id":2,"id":0,"add_time":"1562049124751","userid":0}}]}
#上面是即将使用的模拟数据,下面就是输入输出使用的端口5044,语言默认“GBK”,过滤器filter就是接收的message使用“|”切分,最后拿到数据转换为json,然后output->stdout即输出到控制台。
input {beats {port=>5044codec=>plain{charset=>"GBK"}}
}
filter {mutate{split=>["message","|"]add_field => {"field1" => "%{[message][0]}"}add_field => {"field2" => "%{[message][1]}"}remove_field => ["message"]}json{source => "field1"target => "field2"}
}
#输出数据到控制台
output{stdout{codec=>rubydebug}
}

这个文件写好之后就回到logstash的目录下启动logstash

bin/logstash -f job/filebeat_to_logstash.conf

启动完之后不要关闭这个窗口,再开一个窗口进入到filebeat文件夹中

进入之后再次打开配置修改 vim filebeat.yml

要修改的内容就是将output部分中的Elasticsearch给注释掉,然后把Logstash需要的部分给打开和输入要传输的目的地。

image-20240928155801503

此时就可以去运行filebeat了, ./filebeat -e 即携带日志运行,

注意一下,此时开了两个页面,窗口1是运行logstash的,窗口2是运行filebeat的,都不要关,去打开窗口3进入到zzntest/log/文件夹下

准备输入日志 echo hello >> 1.log 输入之后,就会看到窗口2是这样的

注意:在这里是输入数据到端口,一定要把端口使用权限打开,不然会报error说连接端口失败。

image-20240928161745547

端口2将数据处理好之后就会发送到5044端口,然后窗口1就会看到刚才输入的hello,但格式虽然和最初的不太符合,但这起码映射成功了。

image-20240928161912300

如果此时把最初准备好的日志内容输入vim test.log

1562065564549|{"cm":{"ln":"-54.7","sv":"V2.9.8","os":"8.1.0","g":"9NY0AL0L@gmail.com","mid":"m140","nw":"WIFI","l":"pt","vc":"13","hw":"1080*1920","ar":"MX","uid":"u737","t":"1562030978430","la":"-7.4","md":"HTC-14","vn":"1.1.3","ba":"HTC","sr":"M"},"ap":"gmall","et":[{"ett":"1561996979060","en":"display","kv":{"newsid":"n925","action":"1","extend1":"1","place":"4","category":"37"}},{"ett":"1562031053551","en":"newsdetail","kv":{"entry":"3","newsid":"n332","news_staytime":"10","loading_time":"8","action":"4","showtype":"3","category":"11","type1":"433"}},{"ett":"1561986545246","en":"loading","kv":{"extend2":"","loading_time":"7","action":"1","extend1":"","type":"1","type1":"102","loading_way":"1"}},{"ett":"1562053433842","en":"active_foreground","kv":{"access":"1","push_id":"1"}},{"ett":"1562030443443","en":"favorites","kv":{"course_id":2,"id":0,"add_time":"1562049124751","userid":0}}]}

在窗户一就可以看到想要的结果了。也就是完成了。

image-20240928162144514

2、将filebeat的内容直接放到es里

如果发送到logstash的话就是不需要考虑什么索引表名什么都不用考虑,但是es是要注意发送到哪一个索引(index)

先去修改Filebeat中的配置,因为我们是日志,就简单做一下分区

把logstash给注释掉,然后打开elasticsearch,和添加下面的索引,大概意思就是:如果输入的日志字段含有WARN字段,就放到索引warning里面就可以。如果包含ERR就放到,,,,索引中另外两个也是同样的作用。

output.elasticsearch:# Array of hosts to connect to.hosts: ["8.148.4.43:9200"]# Protocol - either `http` (default) or `https`.#protocol: "https"# Authentication credentials - either API key or username/password.#api_key: "id:api_key"#username: "elastic"#password: "changeme"#通过判断数据中包含的字符串 可以分流数据#通过语法%{}可以调用元数据信息和特殊信息
#error-8.5.2-2022-12-02indices:- index: "warning"when.contains:message: "WARN"- index: "error-%{[agent.version]}-%{+yyyy.MM.dd}"when.contains:message: "ERR"- index: "info"when.contains:message: "INFO"

修改好之后就打开es和重新运行filebeat。./filebeat -e 然后再用重新的方法在窗口3里输入日志。

输出好之后就打开es所在地,因为之前已经配置好kibana,所以就直接打开kibana的端口,看有没有把数据写进去就行

注意:看上面的配置是把日志文件分成了三个索引存入,在输入日志之前要提前在es里面创建好这三个索引的,这样filebeat才能够成功将日志输入到es中。

121.196.217.190:5601

ik安装

  1. 下载 IK 分词器插件:(很麻烦且速度慢,不建议)
    你可以在本地下载与你的 Elasticsearch 版本对应的 IK 分词器插件文件(.zip 格式)。可以在 IK Analyzer 的 GitHub releases 页面 中找到对应版本。

  2. 创建本地目录
    假设你已经下载了 IK 插件压缩包(例如 elasticsearch-analysis-ik-7.12.1.zip),将其解压缩后命名ik放在一个本地目录中。例如,将其放在 /www/plugins 目录。

    此时因为之前就已经启动了es了,此时需要新添加一个容器卷,所以要把之前的给删掉

    docker rm -f es

    之后去linux的elk目录添加一个plugins的文件夹,等会把它挂载到docker里面

    然后添加这个容器卷重新运行容器

  3. 使用 Docker 启动 Elasticsearch 挂载插件
    在启动 Elasticsearch Docker 容器时,可以将本地的插件目录挂载到容器中的 plugins 目录下。以下是一个示例命令:

    前面是自己的目录路径:后面是不用改的是容器内的路径

    docker run -d --name es \
    --net elk \
    -p 9200:9200 -p 9300:9300 \
    -e "discovery.type=single-node" \
    --privileged=true \
    -v $PWD/elasticsearch/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
    -v $PWD/elasticsearch/data/:/usr/share/elasticsearch/data \
    -v $PWD/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
    elasticsearch:7.12.1
    
    docker run -d --name your_elasticsearch_container \-v /path/to/your/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \-v /path/to/your/data:/usr/share/elasticsearch/data \-v /path/to/your/plugins:/usr/share/elasticsearch/plugins \-e "discovery.type=single-node" \docker.elastic.co/elasticsearch/elasticsearch:7.10.1  
    

    确保将命令中的版本号和路径替换为你实际使用的。

    之后就进入容器后测试一下是否加载(不行的话可以再重启一下: docker rm -f es )

  4. 确认插件是否加载
    启动容器后,你可以进入容器内部,查看插件是否成功加载。执行以下命令:

    docker exec -it your_elasticsearch_container /bin/bash  
    ./bin/elasticsearch-plugin list  
    

    如果 ik 显示在列表中,那么你就成功安装了 IK 分词器。

  5. 验证功能
    启动 Es 后,可以在 Kibana 或使用 curl 来测试 IK 分词器是否正常工作。例如:

    POST /_analyze  
    {  "analyzer": "ik_max_word",  "text": "中文分词器"  
    }  
    

如果一切顺利,你应该能够在 Docker 中通过挂载方式成功安装并使用 IK 分词器。这样做的好处是避免了网络问题导致的插件下载失败。

你会在结果中看到是中文、分词、器 也就是分成了三个词

如果使用的是默认的分词器就会分成中、文、分、词、器五个词了。

ik分词器中的词汇是可以修改的打开挂载的容器卷所在的ik文件夹就可以找到。

image-20241001203042886


http://www.ppmy.cn/embedded/126038.html

相关文章

【华为】静态路由配置

1.配置接入层: LSW1(LSW3同理): vlan batch 10 20 in g0/0/1 port link-type ac port default vlan 10 in g0/0/2 port link-type ac port default vlan 20 in g0/0/24 port link-type tr port tr allow-pass vlan 10 202.配置汇聚层&#xf…

Solon 3.0 新特性:SqlUtils

Solon 3.0 引入了新的 SqlUtils 用于数据库基础操作,SqlUtils 是对 JDBC 较为原始的封装,极为反朴归真。 特性有: 支持事务管理支持多数据源支持流式输出支持批量执行支持存储过程 一、概述 SqlUtils 是一个轻量的数据库操作框架&#xff…

NVIDIA Ampere 架构

全球超强弹性数据中心的核心。 文章目录 前言一、突破性创新1. 第三代 Tensor 核心2. 多实例 GPU (MIG)3. 第三代 NVLink4. 结构化稀疏5. 第二代 RT 核心6. 更聪明、快速的内存二、为规模化部署而优化1. 为各种服务器优化性能2. 统一计算和网络加速3. 密度优化的设计4. 安全部署…

Python:一个列表分割成多个列表

大家在使用Python处理数据时,有时候需要将一个大列表分割成多个小列表,这样可以更加方便地管理和操作数据。执行列表分割的方法主要有使用循环、利用切片操作、借助标准库中的工具如itertools.groupby()以及numpy库的array_split()方法。这些方法各有特点…

从零学编程-C语言-第17天

今天是学习C语言的第17天 时间:2024/10/6 21:16分 使用编译器:vs2019 此贴记录自己的成长 今天学习内容如下 1.自定义类型-结构体 结构体 枚举 联合 //结构体 struct stu {char name[20]; }s1, s2; 这里是全局变量 int main() {struct stu s1,s2 …

简单理解Python代码的重构

版本一: 实现存储用户的名字: remember_me.py #易择365 from pathlib import Path import json username input("请输入你的名字:") path Path(username.json) contents json.dumps(username) path.write_text(contents) print(…

【PostgreSQL】运维篇——定期备份与恢复策略

在数据库管理中,定期备份是确保数据安全性和可恢复性的关键措施。制定一个有效的备份计划可以帮助您在发生数据丢失、损坏或其他灾难性事件时快速恢复数据。 以下是制定定期备份计划的步骤,以及使用pg_dump和其他工具进行数据备份和恢复的详细介绍。 1…

HUAWEI_HCIA_实验指南_Lib1.4_配置通过Telnet登录系统

一、原理概述 Telnet(Telecommunication Network Protocol)起源于ARPANET,是最早的Internet应用之一。 Telnet 通常用在远程登录应用中,以便对本地或远端运行的网络设备进行配置、监控和维护。如网络中有多台设备需要配置和管理,用户无需为每一台设备…