ELK中L的filebeat配置及使用(超详细)

server/2024/10/18 19:04:11/

上一次讲解了如何在linux服务器上使用docker配置ELK中的E和K,这期着重讲解一下L怎么配置。

首先L在elk中指的是一个数据处理管道,可以从多种来源收集数据,进行处理和转换,然后将数据发送到 Elasticsearch。L的全称就是:Logstash,Logstsh的功能特别多,提供了大量的输入,过滤和输出插件,但是他的资源占用也比较多。

08_对接kafka案例展示_哔哩哔哩_bilibili

简单说一下什么是Filebeat

​ Filebeat是一种轻量型日志采集器,内置有多种模块(auditd、Apache、NGINX、System、MySQL 等等),可针对常见格式的日志大大简化收集、解析和可视化过程,只需一条命令即可。根据采集的数据形式不同,形成了由多个模块组成的Beats。Beats是开源数据传输程序集,可以将其作为代理安装在服务器上,将操作数据发送给Elasticsearch,或者通过Logstash,在Kibana中可视化数据之前,在Logstash中进一步处理和增强数据。

白话说:Filebeat占用内存小的日志采集器,它的作用是:1、可以代替Logstash将日志直接发送到Elasticsearch.2、可以在Filebeat收集完数据后统一发送给Logstash,然后又Logstash进一步处理数据后再发送给Elasticsearch。

Filebeat特点

(1)轻量型日志采集器,占用资源更少,对机器配置要求极低。

(2)操作简便,可将采集到的日志信息直接发送到ES集群、Logstash、Kafka集群等消息队列中。

(3)异常中断重启后会继续上次停止的位置。(通过${filebeat_home}\data\registry文件来记录日志的偏移量)。

(4)使用压力敏感协议(backpressure-sensitive)来传输数据,在 logstash 忙的时候,Filebeat 会减慢读取-传输速度,一旦 logstash 恢复,则 Filebeat 恢复原来的速度。

(5)Filebeat带有内部模块(auditd,Apache,Nginx,System和MySQL),可通过一个指定命令来简化通用日志格式的收集,解析和可视化。

bin/logstash -e ‘input { stdin{} } output { stdout{} }’

Filebeat与Logstash对比

(1)Filebeat是轻量级数据托运者,您可以在服务器上将其作为代理安装,以将特定类型的操作数据发送到Elasticsearch。与Logstash相比,其占用空间小,使用的系统资源更少。

(2)Logstash具有更大的占用空间,但提供了大量的输入,过滤和输出插件,用于收集,丰富和转换来自各种来源的数据。

(3)Logstash是使用Java编写,插件是使用jruby编写,对机器的资源要求会比较高。在采集日志方面,对CPU、内存上都要比Filebeat高很多。

为什么要先用Filebeat收集再发送到logstash呢?

假设我们有多个节点都需要收集日志数据,如果每一个节点都安装一个Logstash,那么消耗的内存就浪费了,但是如果每个都安装一个占用内存很小的Filebeat,然后将所有的Filebeat收集的数据都发送给统一的一个Logstash的话,那么Logstash只需要下载一遍就可以了。

演示如何安装部署L部分

前言:记得elk中的几个工具都用一样版本的,不同版本可能会出现版本不兼容,看上一篇博客说的,我们继续使用 7.12.1版本的。

上一个服务器也就是ELK中EK都是在ip: 121.196.217.190上的,E在9200端口,K在5601端口。

我要把L也就是收集器就模拟放到一个节点上,放到另外一个服务器上,ip: 8.148.4.43,注意提前关闭防火墙和打开端口访问,

这个服务器使用到的端口是5044端口。

下载Filebeat
# 进入文件夹
cd /www/zzntest/# 下载
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.12.1-linux-x86_64.tar.gz# 解压文件
tar -zxvf filebeat-7.12.1-linux-x86_64.tar.gz # 重命名
mv filebeat-7.12.1-linux-x86_64  filebeat

下载好了之后

#进入filebeat
cd filebeat#指定监控日志的输入输出路径
vim filebeat.yml

改下面这两部分内容:意思就是允许从/www/zzntest/log/*.log 路径下的日志输入到filebeat

  # Unique ID among all inputs, an ID is required.id: my-filestream-id# Change to true to enable this input configuration.enabled: true# Paths that should be crawled and fetched. Glob based paths.paths:- /www/zzntest/log/*.log 

此时如果启动filebeat他就会监听/www/zzntest/log/*.log 的日志文件了,但是我们还没有写输出到哪里,所以先不启动,

先测试第一种情况,

1、将日志用filebeat收集后输出到logstash上
下载logstash

下载filebeat差不多

cd /www/zzntest/# 下载
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.12.1-linux-x86_64.tar.gz# 解压文件
tar -zxvf logstash-7.12.1-linux-x86_64.tar.gz# 重命名
mv  logstash-7.12.1 logstash

在logstash里面创建文件夹job编写数据采集文件:filebeat_to_logstash.conf

mkdir job

vim filebeat_to_logstash.conf

#本次采用的数据为数仓项目中的模拟数据
#1562065564549|{"cm":{"ln":"-54.7","sv":"V2.9.8","os":"8.1.0","g":"9NY0AL0L@gmail.com","mid":"m140","nw":"WIFI","l":"pt","vc":"13","hw":"1080*1920","ar":"MX","uid":"u737","t":"1562030978430","la":"-7.4","md":"HTC-14","vn":"1.1.3","ba":"HTC","sr":"M"},"ap":"gmall","et":[{"ett":"1561996979060","en":"display","kv":{"newsid":"n925","action":"1","extend1":"1","place":"4","category":"37"}},{"ett":"1562031053551","en":"newsdetail","kv":{"entry":"3","newsid":"n332","news_staytime":"10","loading_time":"8","action":"4","showtype":"3","category":"11","type1":"433"}},{"ett":"1561986545246","en":"loading","kv":{"extend2":"","loading_time":"7","action":"1","extend1":"","type":"1","type1":"102","loading_way":"1"}},{"ett":"1562053433842","en":"active_foreground","kv":{"access":"1","push_id":"1"}},{"ett":"1562030443443","en":"favorites","kv":{"course_id":2,"id":0,"add_time":"1562049124751","userid":0}}]}
#上面是即将使用的模拟数据,下面就是输入输出使用的端口5044,语言默认“GBK”,过滤器filter就是接收的message使用“|”切分,最后拿到数据转换为json,然后output->stdout即输出到控制台。
input {beats {port=>5044codec=>plain{charset=>"GBK"}}
}
filter {mutate{split=>["message","|"]add_field => {"field1" => "%{[message][0]}"}add_field => {"field2" => "%{[message][1]}"}remove_field => ["message"]}json{source => "field1"target => "field2"}
}
#输出数据到控制台
output{stdout{codec=>rubydebug}
}

这个文件写好之后就回到logstash的目录下启动logstash

bin/logstash -f job/filebeat_to_logstash.conf

启动完之后不要关闭这个窗口,再开一个窗口进入到filebeat文件夹中

进入之后再次打开配置修改 vim filebeat.yml

要修改的内容就是将output部分中的Elasticsearch给注释掉,然后把Logstash需要的部分给打开和输入要传输的目的地。

image-20240928155801503

此时就可以去运行filebeat了, ./filebeat -e 即携带日志运行,

注意一下,此时开了两个页面,窗口1是运行logstash的,窗口2是运行filebeat的,都不要关,去打开窗口3进入到zzntest/log/文件夹下

准备输入日志 echo hello >> 1.log 输入之后,就会看到窗口2是这样的

注意:在这里是输入数据到端口,一定要把端口使用权限打开,不然会报error说连接端口失败。

image-20240928161745547

端口2将数据处理好之后就会发送到5044端口,然后窗口1就会看到刚才输入的hello,但格式虽然和最初的不太符合,但这起码映射成功了。

image-20240928161912300

如果此时把最初准备好的日志内容输入vim test.log

1562065564549|{"cm":{"ln":"-54.7","sv":"V2.9.8","os":"8.1.0","g":"9NY0AL0L@gmail.com","mid":"m140","nw":"WIFI","l":"pt","vc":"13","hw":"1080*1920","ar":"MX","uid":"u737","t":"1562030978430","la":"-7.4","md":"HTC-14","vn":"1.1.3","ba":"HTC","sr":"M"},"ap":"gmall","et":[{"ett":"1561996979060","en":"display","kv":{"newsid":"n925","action":"1","extend1":"1","place":"4","category":"37"}},{"ett":"1562031053551","en":"newsdetail","kv":{"entry":"3","newsid":"n332","news_staytime":"10","loading_time":"8","action":"4","showtype":"3","category":"11","type1":"433"}},{"ett":"1561986545246","en":"loading","kv":{"extend2":"","loading_time":"7","action":"1","extend1":"","type":"1","type1":"102","loading_way":"1"}},{"ett":"1562053433842","en":"active_foreground","kv":{"access":"1","push_id":"1"}},{"ett":"1562030443443","en":"favorites","kv":{"course_id":2,"id":0,"add_time":"1562049124751","userid":0}}]}

在窗户一就可以看到想要的结果了。也就是完成了。

image-20240928162144514

2、将filebeat的内容直接放到es里

如果发送到logstash的话就是不需要考虑什么索引表名什么都不用考虑,但是es是要注意发送到哪一个索引(index)

先去修改Filebeat中的配置,因为我们是日志,就简单做一下分区

把logstash给注释掉,然后打开elasticsearch,和添加下面的索引,大概意思就是:如果输入的日志字段含有WARN字段,就放到索引warning里面就可以。如果包含ERR就放到,,,,索引中另外两个也是同样的作用。

output.elasticsearch:# Array of hosts to connect to.hosts: ["8.148.4.43:9200"]# Protocol - either `http` (default) or `https`.#protocol: "https"# Authentication credentials - either API key or username/password.#api_key: "id:api_key"#username: "elastic"#password: "changeme"#通过判断数据中包含的字符串 可以分流数据#通过语法%{}可以调用元数据信息和特殊信息
#error-8.5.2-2022-12-02indices:- index: "warning"when.contains:message: "WARN"- index: "error-%{[agent.version]}-%{+yyyy.MM.dd}"when.contains:message: "ERR"- index: "info"when.contains:message: "INFO"

修改好之后就打开es和重新运行filebeat。./filebeat -e 然后再用重新的方法在窗口3里输入日志。

输出好之后就打开es所在地,因为之前已经配置好kibana,所以就直接打开kibana的端口,看有没有把数据写进去就行

注意:看上面的配置是把日志文件分成了三个索引存入,在输入日志之前要提前在es里面创建好这三个索引的,这样filebeat才能够成功将日志输入到es中。

121.196.217.190:5601

ik安装

  1. 下载 IK 分词器插件:(很麻烦且速度慢,不建议)
    你可以在本地下载与你的 Elasticsearch 版本对应的 IK 分词器插件文件(.zip 格式)。可以在 IK Analyzer 的 GitHub releases 页面 中找到对应版本。

  2. 创建本地目录
    假设你已经下载了 IK 插件压缩包(例如 elasticsearch-analysis-ik-7.12.1.zip),将其解压缩后命名ik放在一个本地目录中。例如,将其放在 /www/plugins 目录。

    此时因为之前就已经启动了es了,此时需要新添加一个容器卷,所以要把之前的给删掉

    docker rm -f es

    之后去linux的elk目录添加一个plugins的文件夹,等会把它挂载到docker里面

    然后添加这个容器卷重新运行容器

  3. 使用 Docker 启动 Elasticsearch 挂载插件
    在启动 Elasticsearch Docker 容器时,可以将本地的插件目录挂载到容器中的 plugins 目录下。以下是一个示例命令:

    前面是自己的目录路径:后面是不用改的是容器内的路径

    docker run -d --name es \
    --net elk \
    -p 9200:9200 -p 9300:9300 \
    -e "discovery.type=single-node" \
    --privileged=true \
    -v $PWD/elasticsearch/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
    -v $PWD/elasticsearch/data/:/usr/share/elasticsearch/data \
    -v $PWD/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
    elasticsearch:7.12.1
    
    docker run -d --name your_elasticsearch_container \-v /path/to/your/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \-v /path/to/your/data:/usr/share/elasticsearch/data \-v /path/to/your/plugins:/usr/share/elasticsearch/plugins \-e "discovery.type=single-node" \docker.elastic.co/elasticsearch/elasticsearch:7.10.1  
    

    确保将命令中的版本号和路径替换为你实际使用的。

    之后就进入容器后测试一下是否加载(不行的话可以再重启一下: docker rm -f es )

  4. 确认插件是否加载
    启动容器后,你可以进入容器内部,查看插件是否成功加载。执行以下命令:

    docker exec -it your_elasticsearch_container /bin/bash  
    ./bin/elasticsearch-plugin list  
    

    如果 ik 显示在列表中,那么你就成功安装了 IK 分词器。

  5. 验证功能
    启动 Es 后,可以在 Kibana 或使用 curl 来测试 IK 分词器是否正常工作。例如:

    POST /_analyze  
    {  "analyzer": "ik_max_word",  "text": "中文分词器"  
    }  
    

如果一切顺利,你应该能够在 Docker 中通过挂载方式成功安装并使用 IK 分词器。这样做的好处是避免了网络问题导致的插件下载失败。

你会在结果中看到是中文、分词、器 也就是分成了三个词

如果使用的是默认的分词器就会分成中、文、分、词、器五个词了。

ik分词器中的词汇是可以修改的打开挂载的容器卷所在的ik文件夹就可以找到。

image-20241001203042886


http://www.ppmy.cn/server/131991.html

相关文章

神经网络反向传播交叉熵 计算损失函数对隐藏层权重w1的梯度

本文是交叉熵损失函数为代表的两层神经网络的反向传播量化求导计算公式中的一个公式,单独拿出来做一下解释说明。 公式 8-18 是反向传播算法中的一个重要步骤,它计算的是损失函数 L L L 对隐藏层的权重 w 1 w_1 w1​ 的梯度。这一步是通过链式法则&…

[网络基础]—深入理解ARP(地址解析协议)详解

🏡作者主页:点击! 🤖网络通信基础TCP/IP专栏:点击! ⏰️创作时间:2024年10月14日15点23分 ARP(地址解析协议)扮演着至关重要的角色。它负责将网络层的IP地址转换为数据…

机器学习_线性回归_线性回归过拟合和欠拟合+正则化线性模型学习总结

线性回归的缺陷--欠拟合和过拟合 欠拟合: 简介 训练集和测试集表现都不怎么样, 模型太简单 产生原因: 学习到的特征太少 改进方法: 1.添加其他特征 组合 泛化 相关性 上下文特征,平台特征等 2.添加多项式特征, 将低次项模型变成高次项模型 过拟合: 简介 原始特征过多,存在嘈杂特…

黑龙江网络安全等级保护办理机制

黑龙江的网络安全等级保护机制根据《网络安全法》和相关法规要求,信息系统按照安全等级从低到高分为五级,分别为一般、重要、非常重要、特别重要和特别敏感。不同等级的信息系统必须实施相应的安全措施,以确保系统免受内外部威胁,…

Linux内核 -- 读写同步机制之序列锁 read_seqbegin 作用与用法

read_seqbegin 函数的作用与用法 read_seqbegin 是 Linux 内核中的一个函数,用于在内核实现轻量级的读写同步机制时读取序列锁(sequence lock)的状态。序列锁是一种适合读多写少的场景的锁机制,允许多个读取者并发读取&#xff0…

第二十五:IP网络层的数据,IP数据报

在数据链路层传输的数据叫帧,帧是数据链路层的传输单元。 那么在IP网络层的数据也有一个叫法IP数据报。 IP数据报 IP数据报首部 数据。 数据是传输层传递过来的报文;IP数据报首部格式如下: IP 报头的最小长度为 20 字节,上图…

WebGl 如何给页面绑定点击事件

在WebGL中给页面绑定点击事件&#xff0c;可以通过为WebGL的绘图上下文所在的<canvas>元素添加事件监听器来实现点击事件的处理。 1. 画布添加点击事件 const ctx document.getElementById(canvas) const gl ctx.getContext(webgl)ctx.onclick function (e) {// 给ca…

SpringSecurity(二)——授权实现

一、授权基本思路 在SpringSecurity中&#xff0c;会使用默认的FilterSecurityInterceptor来进行权限校验。在 FilterSecurityInterceptor中会从SecurityContextHolder获取其中的Authentication&#xff0c;然后获取其中的 权限信息。当前用户是否拥有访问当前资源所需的权限。…