Logstash笔记

ops/2024/9/22 20:20:24/

目录​​​​​​​

一、简介

二、单个输入和输出插件

三、多个输入和输出插件

四、pipeline结构

五、队列和数据弹性

六、内存队列

七、持久化队列

八、死信队列 (DLQ)

九、输入插件

1)、beats

2)、dead_letter_queue

 3)、elasticsearch 

 4)、file

5)、redis

十、输出插件

1)、elasticsearch

2)、file

 3)、kafka

4)、redis

十一、过滤插件

1)、age

2)、bytes

3)、date

4)、grok


一、简介

Logstash 是一个具有实时管道功能的开源数据收集引擎。 Logstash 可以动态地统一来自不同来源的数据,并将数据规范化到您选择的目的地。

Logstash 管道有两个必需元素(输入和输出)以及一个可选元素(过滤器)。输入插件使用来自源的数据,过滤器插件根据您的指定修改数据,输出插件将数据写入目标。
例如:

image.png


最基本的 Logstash 管道:

cd logstash-8.12.2
# 示例中的管道从标准输入 stdin 获取输入,并以结构化格式将该输入移动到标准输出 stdout。
bin/logstash -e 'input { stdin { } } output { stdout {} }'

二、单个输入和输出插件

在创建 Logstash 管道之前,您将配置 Filebeat 以将日志行发送到 Logstash。 Filebeat 客户端是一个轻量级、资源友好型工具,它从服务器上的文件收集日志并将这些日志转发到 Logstash 实例进行处理。 Filebeat 专为可靠性和低延迟而设计。 Filebeat 在主机上的资源占用很少,Beats 输入插件最大限度地减少了 Logstash 实例的资源需求。

安装Filebeat后,需要对其进行配置。打开位于 Filebeat 安装目录中的 filebeat.yml 文件:

#输出logstash中filebeat.inputs:
- type: logpaths:- /path/to/file/logstash-tutorial.log    # Filebeat 处理的一个或多个文件的绝对路径
output.logstash:hosts: ["localhost:5044"]   #输出logstash

创建一个 Logstash 配置管道,该管道使用 Beats 输入插件从 Beats 接收事件,first-pipeline.conf 的文件:

input {beats {port => "5044"   # 使用 Filebeat 将 Apache Web 日志作为输入}
}filter {   # 解析这些日志以从日志中创建特定的命名字段grok {match => { "message" => "%{COMBINEDAPACHELOG}"}}geoip {source => "clientip"}
}
output {elasticsearch {hosts => [ "localhost:9200" ]   # 将解析后的数据写入 Elasticsearch 集群}
}

三、多个输入和输出插件

创建一个 Logstash 管道,该管道从 Twitter 源和 Filebeat 客户端获取输入,然后将信息发送到 Elasticsearch 集群以及将信息直接写入文件。

input {twitter {consumer_key => "enter_your_consumer_key_here"consumer_secret => "enter_your_secret_here"keywords => ["cloud"]oauth_token => "enter_your_access_token_here"oauth_token_secret => "enter_your_access_token_secret_here"}beats {port => "5044"}
}
output {elasticsearch {hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]}file {path => "/path/to/target/file"}
}

四、pipeline结构

input {...
}filter {...
}output {...
}

每个部分都包含一个或多个插件的配置选项。如果指定多个过滤器,它们将按照它们在配置文件中出现的顺序应用。如果指定多个输出,事件将按照它们在配置文件中出现的顺序按顺序发送到每个目标。

五、队列和数据弹性

默认情况下,Logstash 在管道阶段(输入 → 管道工作线程)之间使用内存中的有界队列来缓冲事件如果 Logstash 遇到临时机器故障,内存队列的内容将会丢失。(临时机器故障是指 Logstash 或其主机异常终止但能够重新启动的情况。)为了防止数据丢失并确保事件在管道中不间断地流动,Logstash 提供了数据弹性功能。

  • 持久队列 (PQ) 通过将事件存储在磁盘上的内部队列中来防止数据丢失。
  • 死信队列 (DLQ) 为 Logstash 无法处理的事件提供磁盘存储,以便您可以评估它们。您可以使用 dead_letter_queue 输入插件轻松地重新处理死信队列中的事件。

默认情况下,这些弹性功能处于禁用状态。要打开这些功能,您必须在 Logstash 设置文件中显式启用它们。

六、内存队列

内存队列大小不是直接配置的。相反,这取决于您如何调整 Logstash。

其上限由 pipeline.workers (默认值:CPU 数量)乘以 pipeline.batch.size (默认值:125)事件定义。该值称为“飞行计数”,确定每个内存队列中可以保存的最大事件数。

当队列已满时,Logstash 对输入施加反压,以阻止数据流入 Logstash。

七、持久化队列

Logstash 持久队列通过将运行中的消息队列存储到磁盘来帮助防止异常终止期间的数据丢失。

  • 有助于防止正常关闭期间和 Logstash 异常终止期间消息丢失。如果在事件正在进行时 Logstash 重新启动,Logstash 会尝试传送存储在持久队列中的消息,直到传送至少成功一次。
  • 可以吸收突发事件,而不需要 Redis 或 Apache Kafka 等外部缓冲机制。

默认情况下禁用持久队列。

持久队列不能解决以下问题:

  • 不使用请求-响应协议的输入插件无法防止数据丢失。 Tcp、udp、zeromq 推+拉和许多其他输入没有向发送方确认收到的机制。 (beats和http等插件确实具有确认功能,因此受到该队列的良好保护。)
  • 如果在提交检查点文件之前发生异常关闭,则可能会丢失数据。
  • 持久队列不处理永久性机器故障,例如磁盘损坏、磁盘故障和机器丢失。保留到磁盘的数据不会被复制。

八、死信队列 (DLQ)

死信队列(DLQ)被设计为临时写入无法处理的事件的地方。 DLQ 使您能够灵活地调查有问题的事件,而不会阻塞管道或丢失事件。可以使用 dead_letter_queue 输入插件处理来自 DLQ 的事件。

默认情况下,当 Logstash 遇到由于数据包含映射错误或其他问题而无法处理的事件时,Logstash 管道会挂起或删除不成功的事件。为了防止这种情况下的数据丢失,您可以配置 Logstash 将不成功的事件写入死信队列而不是丢弃它们。

默认情况下禁用死信队列。要启用死信队列,请在logstash.yml设置文件中设置dead_letter_queue_enable选项:

dead_letter_queue.enable: true

当您准备好处理死信队列中的事件时,您可以创建一个使用 dead_letter_queue 输入插件从死信队列中读取事件的管道。

input {dead_letter_queue {path => "/path/to/data/dead_letter_queue"   # 包含死信队列的顶级目录的路径commit_offsets => true   # 当 true 时,保存偏移量。当管道重新启动时,它将继续从上次停止的位置读取,而不是重新处理队列中的所有项目pipeline_id => "main"   # 写入死信队列的管道的 ID。默认为“main”clean_consumed => true   # 删除已完全消耗的段,从而在处理时释放空间start_timestamp => "2017-06-06T23:40:37"  # 使用 start_timestamp 选项在队列中的特定点开始处理事件}
}output {stdout {codec => rubydebug { metadata => true }   # 将事件(包括元数据)写入标准输出}
}

九、输入插件

1)、beats

在端口 5044 上侦听传入的 Beats 连接并为 Elasticsearch 建立索引:

input {beats {port => 5044}
}output {elasticsearch {hosts => ["http://localhost:9200"]index => "%{[@metadata][beat]}-%{[@metadata][version]}"  # %{[@metadata][beat]} 将索引名称的第一部分设置为元数据字段的值,%{[@metadata][version]} 将第二部分设置为 Beat 版本。例如:metricbeat-6.1.6。}
}

2)、dead_letter_queue

用于从 Logstash 的死信队列中读取事件: 

input {dead_letter_queue {path => "/var/logstash/data/dead_letter_queue"start_timestamp => "2017-04-04T23:40:37"}
}

 3)、elasticsearch 

input {# Read all documents from Elasticsearch matching the given queryelasticsearch {hosts => "localhost"query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'}}

 4)、file

input {file {path => "/path/log/*.log"}
}

5)、redis

input {redis {# Redis的key的类型。值可以是以下任意值:list、channel、pattern_channeldata_type => "list"# Redis 数据库编号db => 5# Redis的数据库地址,默认为127.0.0.1host => "10.0.0.10"# Redis的端口号port => 6379# Redis的认证密码password => "admin"# Redis 列表或通道的名称key => "logstash-input-redis"}
}

十、输出插件

输出插件将事件数据发送到特定目的地。输出是事件管道的最后阶段。

1)、elasticsearch

使用 mutate 过滤器和条件来添加 [@metadata] 字段来设置每个事件的目标索引:

filter {if [log_type] in [ "test", "staging" ] {mutate { add_field => { "[@metadata][target_index]" => "test-%{+YYYY.MM}" } }} else if [log_type] == "production" {mutate { add_field => { "[@metadata][target_index]" => "prod-%{+YYYY.MM.dd}" } }} else {mutate { add_field => { "[@metadata][target_index]" => "unknown-%{+YYYY}" } }}}output {elasticsearch {index => "%{[@metadata][target_index]}"}}

2)、file

output {file {path => ...codec => line { format => "custom format: %{message}"}}
}

 3)、kafka

output {kafka {codec => jsontopic_id => "mytopic"   # 生成消息的主题}}

4)、redis

output {redis {# 指定写入数据的类型data_type => "list"# Redis 数据库编号db => 5# Redis主机地址host => "10.0.0.10"# Redis的端口号port => 6379# Redis的认证密码password => "admin"# Redis写入的key值key => "logstash-input-redis"}
}

十一、过滤插件

过滤器插件对事件执行中间处理。通常根据事件的特征有条件地应用过滤器。

1)、age

用于计算事件年龄的简单过滤器。
此过滤器通过从当前时间戳减去事件时间戳来计算事件的年龄。您可以将此插件与删除过滤器插件一起使用来删除早于某个阈值的 Logstash 事件。 

filter {age {add_field => {   # 一次添加多个字段"foo_%{somefield}" => "Hello world, from %{host}""new_field" => "new_static_value"}}if [@metadata][age] > 86400 {drop {}}
}

2)、bytes

将计算机存储大小的字符串表示形式(例如“123 MB”或“5.6gb”)解析为其以字节为单位的数值。

filter {bytes {source => "my_bytes_string_field"   # 包含存储大小的源字段的名称target => "my_bytes_numeric_field"  # 将包含存储大小(以字节为单位)的目标字段的名称}}

3)、date

日期过滤器用于解析字段中的日期,然后使用该日期或时间戳作为事件的 Logstash 时间戳。 

 filter {date {match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]}}

4)、grok

解析任意文本并对其进行结构化。Grok 是将非结构化日志数据解析为结构化且可查询的数据的好方法。

虚构的 http 请求日志:

55.3.244.1 GET /index.html 15824 0.043
filter {grok {match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }}}

 在 grok 过滤器之后,事件中将有一些额外的字段:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

http://www.ppmy.cn/ops/44841.html

相关文章

JavaWeb基础(一)-IO操作

Java I/O工作机制: 注:简要笔记,示例代码可能较少,甚至没有。 1、Java 的 I/O 类库的基本架构。 ​ Java 的 I/O 操作类在包 java.io 下,大概有将近80个类,这些类大概可以分为如下四组。 基于字节操作的…

文本生成流程图 泰酷啦 Excalidraw Mermaid Obsidian

前言 介绍一个很酷的工具,Mermaid to Excalidraw 。作用是用代码生成流程图。 Mermaid 是一款强大的、轻量级的文本到图表的转换工具,它允许用户使用简单的Markdown风格的语法编写文本描述,然后通过JavaScript引擎将其转换成美观的图表。Mer…

【LeetCode】438.找到字符串中所有字母异位词

找到字符串中所有字母异位词 题目描述: 给定两个字符串 s 和 p,找到 s 中所有 p 的 异位词 的子串,返回这些子串的起始索引。不考虑答案输出的顺序。 异位词 指由相同字母重排列形成的字符串(包括相同的字符串)。 示…

包和final

一.什么是包? 包就是文件夹,用来管理各种不同功能的Java类,方便后期代码维护。 二.包名的规则 公司域名反写包的作用,需要全部英文小写,见名知意。 com.xxx.domain domain:这个包是干什么的。 三.使用其他类的规则 1.使用同一个包中的类时,不需要导…

vmware中Ubuntu虚拟机和本地电脑Win10互相ping通

初始状态 使用vmware17版本安装的Ubuntu的20版本,安装之后什么配置都要不懂,然后进行下述配置。 初始的时候是NAT,没动的. 设置 点击右键编辑“属性” 常规选择“启用”: 高级选择全部: 打开网络配置,右键属…

数据库设计实例---学习数据库最重要的应用之一

一、引言【可忽略】 在学习“数据库系统概述”这门课程时,我一直很好奇,这样一门必修课,究竟教会了我什么呢? 由于下课后,,没有拓展自己的眼界,上课时又局限于课堂上老师的讲课水平,…

风萧萧兮易水寒,壮士一去兮不复还 的 rm 命令

风萧萧兮易水寒,壮士一去兮不复还 的 rm 命令 风萧萧兮易水寒,壮士一去兮不复还 的 rm语法几个示例/bin/rm Argument list too long – Linux”配合find与xargs完成删除海量文件使用find的delete选项 快速删除大文件 风萧萧兮易水寒,壮士一去…

【数据结构】探索树中的奇妙世界

专栏介绍: 哈喽大家好,我是野生的编程萌新,首先感谢大家的观看。数据结构的学习者大多有这样的想法:数据结构很重要,一定要学好,但数据结构比较抽象,有些算法理解起来很困难,学的很累…