logstash同步数据从kafka到es集群

news/2024/11/20 10:29:28/

背景:需求是这样的,原始文件是txt文件(每天300个文件),最终想要的结果是每天将txt中的数据加载到es中,开始的想法是通过logstash加载数据到es中,但是对logstash不太熟悉,不知道怎么讲程序弄成读取一个txt文件到es中以后,就将这个txt原始文件备份并且删除掉,然后就想到了通过一个中间件来做,Python读取成功一个txt文件,并且加载到kafka中以后,就将这个txt文件备份然后删除掉原始文件。

第一步:向kafka中添加数据,我用Python连接kafka集群,向其中加载数据

# -*- coding: utf-8 -*-import json
import json
import msgpack
from loguru import logger
from kafka import KafkaProducer
from kafka.errors import KafkaErrordef kfk_produce_1():"""发送 json 格式数据:return:"""producer = KafkaProducer(bootstrap_servers='192.168.85.109:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))#logstash-topic-one#producer.send('python_test_topic', {'key': 'value'})producer.send('logstash-topic-one', {'name': 'value'})kfk_produce_1()
执行完的结果,来界面工具上看,显示这样,说明数据已经加载进来了

在这里插入图片描述

第二步:配置logstash,将kafka中的数据加载到es集群中

编写的logstash.conf配置如下;
input{kafka{bootstrap_servers => "192.168.85.109:9092"client_id => "consumer_id"group_id => "consumer_group"auto_offset_reset => "latest"consumer_threads => 1decorate_events => truetopics => ["logstash-topic-one","logstash-topic-two"]}
}
output {if [@metadata][kafka][topic] == "logstash-topic-one" {elasticsearch {hosts => "http://192.168.85.109:9200"index => "kafka-one-data"timeout => 300}}if [@metadata][kafka][topic] == "logstash-topic-two" {elasticsearch {hosts => "http://192.168.85.109:9200"index => "kafka-two-data"timeout => 300}}stdout {}
}

第三步:执行logstash,通过kibana查看数据是否在es集群中,展示如下,则说明配置是正确的

在这里插入图片描述

在这里插入图片描述

问题1:现在发现,name字段是在message下面,如果是多个字段的话,不方便查询,想着怎么讲字段从message中弄出来,修改的配置如下,增加一段这样的代码就OK了

type => "json"codec => json {charset => "UTF-8"}

完整的配置文件logstash.conf代码如下;

input{kafka{bootstrap_servers => "192.168.85.109:9092"client_id => "consumer_id"group_id => "consumer_group"auto_offset_reset => "latest"consumer_threads => 1decorate_events => truetopics => ["logstash-topic-one","logstash-topic-two"]type => "json"codec => json {charset => "UTF-8"}}
}
output {if [@metadata][kafka][topic] == "logstash-topic-one" {elasticsearch {hosts => "http://192.168.85.109:9200"index => "kafka-one-data"timeout => 300}}if [@metadata][kafka][topic] == "logstash-topic-two" {elasticsearch {hosts => "http://192.168.85.109:9200"index => "kafka-two-data"timeout => 300}}stdout {}
}

然后我又造了一个多字段的场景如下;

在这里插入图片描述

我先去logstash中查看日志如下,字段已经分离出来了

{"name" => "value","@version" => "1","type" => "json","@timestamp" => 2023-05-17T06:13:48.825Z
}
{"@version" => "1","type" => "json","@timestamp" => 2023-05-17T06:20:57.729Z,"name" => "令狐冲","age" => "30","height" => "180cm"
}

去kibana中去查询,显示如下,测试成功喽,😄

在这里插入图片描述
在这里插入图片描述

问题2:在查询结果中发现,有些字段是没有用的,看看怎么去掉?

在配置文件中增加一个过滤器就可以解决了

filter { mutate {remove_field => ["@version","@timestamp","type"] # 删除字段}}

然后再去kibana中去查看,就发现这会儿的字段格式非常好看了,😄

在这里插入图片描述

文档后续再继续完善,有好的建议或者问题可以留言交流,😄


http://www.ppmy.cn/news/79390.html

相关文章

【Linux系列P4】Linux需要什么?编辑器?软件包?一文帮你了解掌握 [yum][vim]———基础开发工具篇

前言 大家好,这里是YY的Linux系列part4;本章主要内容面向接触过Linux的老铁,主要内容含【学习yum工具,进行软件安装】【拓展yum源安装】【掌握vim编辑器使用,基本命令】【命令集】【懒人配置文件安装教程】 在下一章节…

openGauss5.0.0单节点安装

创建用户组dbgroup groupadd dbgroup创建用户组dbgroup下的普通用户omm,并设置普通用户omm的密码,密码建议设置为omm123 useradd -g dbgroup omm passwd omm创建安装目录 mkdir -p /usr/local/openGauss修改安装目录的权限 cd /usr/local chown omm:…

【数据结构与算法】- 期末考试

课程链接: 清华大学驭风计划 代码仓库:Victor94-king/MachineLearning: MachineLearning basic introduction (github.com) 驭风计划是由清华大学老师教授的,其分为四门课,包括: 机器学习(张敏教授) , 深度学习(胡晓林教授), 计算…

了解和使用Docker

前提 本文对 Docker 进行全面阐述,详细介绍 Docker 的作用、其基本使用,如常用命令、Dockerfile 的作用及使用、Docker Compose 的作用及使用。常用的基本上都会涉及,其他可以在 Docker 官网进行查漏补缺。 下面还有投票,一起参…

MyBatis-Plus精讲和使用注意事项

🍓 简介:java系列技术分享(👉持续更新中…🔥) 🍓 初衷:一起学习、一起进步、坚持不懈 🍓 如果文章内容有误与您的想法不一致,欢迎大家在评论区指正🙏 🍓 希望这篇文章对你有所帮助,欢…

让input框只输入英文

解决扫码枪在中文输入法时扫码冲突 扫码枪在扫完码时会自动回车,这时如果是中文输入法就会触发输入法联想,再加一个回车,那么input框输入的就成中文了。如果可以控制input框只能输入英文那就好了。css有一个属性(ime-mode&#xf…

draw.io如何绘制带箭头的弧线

好长时间没有写draw.io相关的技巧了。今天再补充一个小技巧。 如何绘制像下图中蓝色的带箭头的弧线? 本来以为这个问题应该很简单,但是在仔细研究了很久之后我发现这个问题并没有想像得那么容易。 众所周知,draw.io中带箭头的线叫作“connect…

网络安全管理员证书有什么用?2023证书怎么考?证书报考条件?

网络安全管理员是做什么工作的呢?现如今,网络高速发展,带动了很多行业的兴起,比如说电商行业,今天已经步入到足不出户即可购物的时代了,当然网络也是一把“双刃剑”,带来了好处的同时&#xff0…