Kafka 架构深入介绍 及搭建Filebeat+Kafka+ELK

embedded/2024/9/20 9:18:52/ 标签: kafka, 架构, elk

目录

一        架构深入介绍

(一)Kafka 工作流程及文件存储机制

(二)数据可靠性保证

(三)数据一致性问题

(四)故障问题

(五)ack 应答机制

二      实验搭建Filebeat+Kafka+ELK

(一)实验环境

(二)架构

(三)实验模拟

1,部署 Zookeeper+Kafka 集群

2,66机器部署 Filebeat 

3,  66机器安装httpd

4,部署 ELK,在 Logstash 组件所在节点上新建一个 Logstash 配置文件

5,生产黑屏操作es时查看所有的索引


一        架构深入介绍

(一)Kafka 工作流程及文件存储机制

1,Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的。

2,topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。Producer 生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset。 消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费。

3,由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment 对应两个文件:“.index” 文件和 “.log” 文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,test 这个 topic 有三个分区, 则其对应的文件夹为 test-0、test-1、test-2。

4,ndex 和 log 文件以当前 segment 的第一条消息的 offset 命名。

5,“.index” 文件存储大量的索引信息,“.log” 文件存储大量的数据,索引文件中的元数据指向对应数据文件中 message 的物理偏移地址。

(二)数据可靠性保证

为保证 producer 发送的数据,能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后, 都需要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则重新发送数据。
 

(三)数据一致性问题

LEO:指的是每个副本最大的 offset; 
HW:指的是消费者能见到的最大的 offset,所有副本中最小的 LEO。
 

(四)故障问题

(1)follower 故障 
follower 发生故障后会被临时踢出 ISR(Leader 维护的一个和 Leader 保持同步的 Follower 集合),待该 follower 恢复后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的部分截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。

(2)leader 故障 
leader 发生故障之后,会从 ISR 中选出一个新的 leader, 之后,为保证多个副本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据。

注:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。 

(五)ack 应答机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等 ISR 中的 follower 全部接收成功。所以 Kafka 为用户提供了三种可靠性级别,用户根据对可靠性和延迟的要求进行权衡选择。

当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:

●0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。当broker故障时有可能丢失数据。

●1(默认配置):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果在follower同步成功之前leader故障,那么将会丢失数据。

●-1(或者是all):producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是如果在 follower 同步完成后,broker 发送ack 之前,leader 发生故障,那么会造成数据重复。

三种机制性能依次递减,数据可靠性依次递增。

 注:在 0.11 版本以前的Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。在 0.11 及以后版本的 Kafka,引入了一项重大特性:幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据, Server 端都只会持久化一条。

二      实验搭建Filebeat+Kafka+ELK

(一)实验环境

Node1节点(2C/4G):node1/192.168.217.77                   Elasticsearch  Kibana
Node2节点(2C/4G):node2/192.168.217.88                    Elasticsearch
Apache节点:apache/192.168.217.99                                  Logstash  Apache
Filebeat节点:filebeat/192.168.217.66                                   Filebeat

zookeeper 集群;         192.168.217.22 /44/55

kafka 集群 :               192.168.217.22 /44/55

(二)架构

(三)实验模拟

1,部署 Zookeeper+Kafka 集群

上章有详细介绍

2,66机器部署 Filebeat 

写filebeat 的配置文件:

vim /etc/filebeat/filebeat.yml

重启 filebeat

3,  66机器安装httpd

4,部署 ELK,在 Logstash 组件所在节点上新建一个 Logstash 配置文件

99机器:

代码如下:

input {kafka {bootstrap_servers => "192.168.217.22:9092,192.168.217.44:9092,192.168.217.55:9092"  #kafka集群地址topics  => "httpd"     #拉取的kafka的指定topictype => "httpd_kafka"  #指定 type 字段codec => "json"        #解析json格式的日志数据auto_offset_reset => "latest"  #拉取最近数据,earliest为从头开始拉取decorate_events => true   #传递给elasticsearch的数据额外增加kafka的属性数据}
}output {if "access" in [tags] {elasticsearch {hosts => ["192.168.217.77:9200"]index => "httpd_access-%{+YYYY.MM.dd}"}}if "error" in [tags] {elasticsearch {hosts => ["192.168.217.77:9200"]index => "httpd_error-%{+YYYY.MM.dd}"}}stdout { codec => rubydebug }
}

启动logstash

5,生产黑屏操作es时查看所有的索引

去到77 es node1 

curl -X GET "localhost:9200/_cat/indices?v"

三    常见问题报错

 例如如图所示:  在logstash 节点 启动 logstash时,报以下错误:

[root@logstash logstash]/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka.conf ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path //usr/share/logstash/config/log4j2.properties. Using default config which logs to console
16:23:10.325 [LogStash::Runner] FATAL logstash.runner - Logstash could not be started because there is already another instance using the configured data directory.  If you wish to run multiple instances, you must change the "path.data" setting.

解决方法:

ps -aux |grep logstash 找到进程号   然后kill -9 关闭

再重新启动logstash


http://www.ppmy.cn/embedded/7095.html

相关文章

Android活动之Intent

Intent Intent是Android程序中各组件之间进行交互的一种重要方式,它不仅可以指明当前组件想要执行的动作,还可以在不同组件之间传递数据。Intent一般可被用于启动活动、启动服务以及发送广播等场景, 显示intent 第一个参数Context要求提供…

设计模式-抽象工厂模式

作者持续关注WPS二次开发专题系列,持续为大家带来更多有价值的WPS开发技术细节,如果能够帮助到您,请帮忙来个一键三连,更多问题请联系我(QQ:250325397) 目录 定义 特点 使用场景 优缺点 (1) 优点 (2) …

爬虫 | 基于 Python 实现有道翻译工具

Hi,大家好,我是半亩花海。本项目旨在利用 Python 语言实现一个简单的有道翻译工具。有道翻译是一款常用的在线翻译服务,能够实现多种语言的互译,提供高质量的翻译结果。 目录 一、项目功能 二、注意事项 三、代码解析 1. 导入…

5、JVM-G1详解

G1收集器 -XX:UseG1GC G1 (Garbage-First)是一款面向服务器的垃圾收集器,主要针对配备多颗处理器及大容量内存的机器. 以极高概率满足GC停顿时间要求的同时,还具备高吞吐量性能特征. G1将Java堆划分为多个大小相等的独立区域(Region),JVM目标…

SPISPI

#include "stm32f10x.h" void MySPI_W_SS(uint8_t BitValue) //写SS的引脚 { GPIO_WriteBit(GPIOA,GPIO_Pin_4,(BitAction)BitValue);//BitAction表示非0即1 } void MySPI_Init(void) { //开启SPI和GPIO的时钟 RCC_APB2PeriphClockCmd(RCC_APB2Periph…

iOS 动态对UIImageView加载到的图片进行偏移裁剪处理

关键点:图片偏移重绘、图片重绘时机处理 1. 图片偏移重绘问题 原因:UIImageView的现有填充模式已经不能满足应用需求,需自定义偏移量结合填充模式实现效果 方法:按需求样式重绘,设置偏移量比例,偏移阈值&a…

docker 容器数据在盘与盘之间迁移

docker 容器数据在盘与盘之间迁移 1、停止容器systemctl stop docker2、docker目录备份# 备份 切记一定要先备份, /var/lib/docker目录为原docker数据目录, /data/docker_data_bak 为新盘的备份目录 cp -r /var/lib/docker /data/docker_data_bak # 移动…

在Python中使用gmssl包实现SM2加密和解密

1.安装gmssl包 pip install gmssl安装完成后,您可以使用 gmssl 提供的函数来修改 User 类中的 set_password 和 verify_password 方法,以便使用 SM2 加密和解密密码。以下是使用 gmssl 的 User 类示例: import datetime from tortoise.model…

Leetcode215_数组中的第K个最大元素

1.leetcode原题链接:. - 力扣(LeetCode) 2.题目描述 给定整数数组 nums 和整数 k,请返回数组中第 k 个最大的元素。 请注意,你需要找的是数组排序后的第 k 个最大的元素,而不是第 k 个不同的元素。 你必…

ORAN C平面 Section Extension 23

ORAN C平面Section扩展23用于任意symbol模式的调制压缩参数。此section扩展允许为一个或多个“SymPrbPatterns”指定多组“mcScaleReMask、csf和mcScaleOffset”值。“SymPrbPattern”用于指定一组PRB,这些PRB可以跨越使用prbPattern指定的整个PRB范围(频…

Java关键字和API

1 this和super关键字 1.this和super的意义 this:当前对象 在构造器和非静态代码块中,表示正在new的对象 在实例方法中,表示调用当前方法的对象 super:引用父类声明的成员 无论是this和super都是和对象有关的。 2.this和sup…

2024.4.18 Python爬虫复习day06 可视化2

day06_数据可视化 Map_地图 基础地图 知识点: 制作地图步骤:1.导包2.创建对象3.添加数据,设置格式4.设置全局选项5.渲染页面基础示例: # 1.导包 from pyecharts.charts import Map import pyecharts.options as opts# 2.创建对象 map Map() # 3.给对象添加数据,设置格式 …

3D视觉技术如何助力惯性环精准上料

随着制造业的快速发展,对自动化和智能化生产的需求日益增强。特别是在高精度、高效率的生产场景中,传统的上料方式已经难以满足生产需求。而3D视觉技术的出现,为惯性环等复杂工件的精准上料提供了全新的解决方案。 3D视觉技术的基本原理是通过…

4月21敲一篇猜数字游戏,封装函数,void,无限循环,快去体验体验

今天敲一篇猜数字游戏 目录 今天敲一篇猜数字游戏 1.打开先学goto语句: 2.开干: 首次我们学习随机数: 讲解一下: 改用srand; 加入时间变量: 获取时间:哈​编辑 3.我本来想已近够完美了&#xff0…

【R语言】组合图:散点图+箱线图+平滑曲线图+柱状图

用算数运算符轻松组合不同的ggplot图,如图: 具体代码如下: install.packages("devtools")#安装devtools包 devtools::install_github("thomasp85/patchwork")#安装patchwork包 library(ggplot2) library(patchwork) #p1是…

笔试狂刷--Day2(模拟高精度算法)

大家好,我是LvZi,今天带来笔试狂刷--Day2(模拟高精度算法) 一.二进制求和 题目链接:二进制求和 分析: 代码实现: class Solution {public String addBinary(String a, String b) {int c1 a.length() - 1, c2 b.length() - 1, t 0;StringBuffer ret new StringBuffer()…

内网代理技术总结

代理技术就是解决外网和内网的通信问题,例如,我的一个外网主机想要找到另外一个网段下的一个内网主机,理论上是无法找到的。如果我们想要进行通信的话就要使用代理技术。我们可以找到一个与目标内网主机在容易网段下可以通信的外网主机&#…

2024第二十一届五一数学建模C题思路 五一杯建模思路

文章目录 1 赛题思路2 比赛日期和时间3 组织机构4 建模常见问题类型4.1 分类问题4.2 优化问题4.3 预测问题4.4 评价问题 5 建模资料 1 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 2 比赛日期和时间 报名截止时间:2024…

用Qt+NetCDF 读取NC文件

用QtNetCDF 读取NC文件_ivqtnc-CSDN博客 基于NetCDF-CXX4 封装&#xff0c;对NC文件进行读取&#xff0c;读取内存放到vector中。 ncBase.h #ifndef __NC_BASE_H__ #define __NC_BASE_H__#include <QVariant> #include <vector> #include <map> using na…

物联网网关

物联网网关类似我们常见的路由器&#xff0c;具有网络接入能力&#xff0c;但是功能比较简单&#xff0c;通讯速率要求不高&#xff0c;看重移动网络接入能力一般只是做数据透传使用&#xff0c;将物联网的数据转发到服务器上&#xff0c;做进一步分析和处理网关是物联网非常重…