2、zookeeper和kafka

embedded/2025/1/8 20:40:43/

zookeeper

zookeeper基本知识

zookeeper:开源的分布式框架协调服务

zookeeper的工作机制

基于观察者模式设计的分布式结构,复制存储和管理架构当中的元信息,架构当中的应用接受观察者的监控,一旦数据有变化,通知对于的zookeeper,保存变化的信息。

zookeeper特点

1、最少要有3台服务器,一个领导者(leader),多个跟随者(follower)。

2、zookeeper要有半数以上的节点存货,整个架构就可以正常工作,所有都是奇数台部署

3、全局数据一致

4、数据更新的原子性,要么都成功,要么都失败。

5、实时性

zookeeper的数据结构

1、统一的命名服务,不是以IP来记录,可以用域名也可以是主机名来记录信息。

2、统一配置管理,所有的节点信息的配置要是一致的。

3、统一的集群管理,在整个分布式的环境中,必须实时的掌握每个节点的状态,如果状态发生变化,要及时更新。

zookeeper安装实操

这里我们使用三台服务器完成zookeeper架构,leader和follow是随机选举的。

实验架构如下:

zw6:192.168.254.16

tomcat:192.168.254.21

mysql:192.168.254.33

1、三台服务器都解压zookeeper包并安装java依赖环境

tar -xf apache-zookeeper-3.5.7-bin.tar.gz

apt -y install openjdk-8-jre-headless

2、三台服务器都创建配置文件zoo.cfg

3、三台服务器修改配置文件zoo.cfg

定义zookeeper的数据目录和日志目录

  • server.1:指的是服务器的序号
  • 3188:leader和follower之间交换信息的通信端口
  • 3288:选举的端口

4、三台服务器都创建数据目录和日志目录并赋权

5、三台服务器分别在/data/目录下,创建myid文件,并且里面输入分别是1、2、3,和我们配置文件里面1、2、3对应。

6、三台主机都启动zookeeper并查看状态,这里我们mysql3服务器是leader,两外两台都是follow

kafka

消息队列

kafka消息队列:服务端向客户端发送一个指令,客户端收到指令并且通过这个指令反馈到服务端,完成一个异步方式的通信的流程。

kafka消息队列:处理大数据场景非常合适

rabbitMQ消息队列:处理小数据场景合适

activeMQ消息队列:处理小数据场景合适

消息队列的应用场景

1、异步处理:用于用户的短信验证码,邮件通知。

2、系统解耦:用于微服务架构中的服务之间通信。可以降低各个组件之间的依赖程度(耦合度),可以提高组件的灵活性和可维护性。

3、负载均衡:用于高并发系统的任务处理。消息队列把多个任务分发到多个消费者实列,如电商平台的订单系统。

4、流量控制和限流:根据api请求来进行处理。通过控制消息的生产速度和消费者的处理速度来完成限流。

  • 端口:应用和应用之间通信
  • api接口:应用程序内部各个组件之间的通信的方式

5、任务调度和定时任务:消息队列可以定时的进行任务调度,按照消费者的计划生产对于的数据

6、数据同步和分发:用于日志收集和数据收集。可以远程的实现数据的统一管理。

7、实时数据处理

8、备份和恢复

消息队列的模式

1、点对点:一对一,生产者生产消息,消费者消费消息,这个是一对一的。

2、发布/订阅模式:消息的生产者发布一个主题,其他的消费者订阅这个主题,从而实现一对多。

  • 主题:topic

kafka的组件名称

1、主题 topic:主题是kafka数据流的一个基本的单元,类似于数据的管道,生产者将消息发布到主题,其他消费者订阅主题来消费消息,主题可以被分区,分区有偏移量。

2、生产者:将消息写入主题和分区

3、消费者:从主题和分区中接受发布的消息,一个消费者可以订阅多个主题。

4、分区:一个主题可以被分为多个分区,每个分区都是数据的有序的子集,分区越多,消费者消费的速度越快,可以避免生产者的消费堆积。分区当中有偏移量,按照偏移量进行有序存储,消费者可以独立的读写每个分区的数据。

  • 如何读取分区的数据,一般是不考虑的。只有在消息出现堆积的时候,会考虑扩展分区数。
  • 堆积:消费者没有及时处理掉生产者发布的消息,导致消费堆积

kafka的消费堆积出现如何处理:

1、扩展分区数

2、偏移量:消息在分区当中的唯一标识,根据偏移量指定消费者获取消息的位置。

3、经纪人zookeeper:存储分区的消息,kafka集群的元数据。

kafka数据流向

kafka安装实操

这里我们依然使用上面zookeeper实验的三台服务器完成kafka实操

kafka非常占内存,可以在安装使用前清理下内存,echo 3 > /proc/sys/vm/drop_caches

1、三台服务器解压软件包后移动至/usr/local/kafka

2、三台服务端修改kafka配置文件server.properties

首先配置三台服务器不一样的地方

  • #全局唯一编号,每一台服务器都不能相同,这里我们定0、1、2即可

  • #地址分别对应自己主机的IP地址

接下里都是一样的配置

3、三台服务器添加kafka全局变量

4、三台服务器一起启动kafka

./kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

看端口可以看出我们kafka服务已启,并且推选了mysql3这个主机为主

kafka补充应用

创建主题

kafka-topics.sh --create --bootstrap-server 192.168.233.61:9092,192.168.233.62:9092,192.168.233.63:9092 --replication-factor 2 --partitions 3 --topic test1

  • bootstrap-server:这里的地址一般是一个集群当中的地址即可,约定是全写。
  • replication-factor 2:定义主题的副本数,2个副本,一般2、4个,副本是偶数。
  • partitions 3:定义分区数,一般3、6个

查询当前集群中的主题数

./kafka-topics.sh --list --bootstrap-server 192.168.233.61:9092

发布消息

kafka-console-producer.sh --broker-list 192.168.233.61:9092,192.168.233.62:9092,192.168.233.63:9092 --topic test1

消费消息

kafka-console-consumer.sh --bootstrap-server 192.168.233.61:9092,192.168.233.62:9092,192.168.233.63:9092 --topic test1 --from-beginning

修改主题分区数

查看分区

Leader:每一个分区都有一个Leader,领导者负责处理分区的读写操作

Replicas:副本数,0  1  2 分别对象每一个Leader

Isr:标识和哪个Leader进行同步

Partition:分区的编号

修改分区数

kafka-topics.sh --bootstrap-server 192.168.233.61:9092,192.168.233.62:9092,192.168.233.63:9092 --alter --topic test1 --partitions 6

删除主题

kafka-topics.sh --delete --bootstrap-server 192.168.233.61:9092,192.168.233.62:9092,192.168.233.63:9092 --topic test1

elk+filebeat+kafka实操

数据流向如下

这里我们结合之前ELK+filebeat实验和上面kafka实验,具体架构如下

es1:zw4,192.168.254.14

es2:zw5,192.168.254.15

logstash、kibana:mysql1,192.168.254.31

filebeat:mysql2,192.168.254.32

kafka:zw6,192.168.254.16

kafka:tomcat,192.168.254.21

kafka:mysql,192.168.254.33

1、修改filebeat配置文件 /usr/local/filebeat/filebeat.yml,更换下标签,区分下之前做的实验,具体修改如下

2、创建logstash的日志收集文件/etc/logstash/conf.d/kafka.sh

  • codec => "json":指定数据的格式是json
  • auto_offser_reset => "latest":latest,从尾部开始;earliest的话就是从头开始拉取
  • decorate_events => true:传递给es的数据额外的附加kafka的属性数据

3、启动和验证

启动logstash:logstash -f kafka.conf --path.data /opt/testa &

打开kafka集群的消费集群

kafka-topics.sh --bootstrap-server 192.168.254.16:9092,192.168.254.21:9092,192.168.254.33:9092 --topic nginx_mysql --from-beginning

启动filebeat:./filebeat -e -c filebeat.yml

显示filebeat与kafka集群连接成功

最后我们登录kibana,显示日志收集成功


http://www.ppmy.cn/embedded/152374.html

相关文章

使用宝塔面板,安装 Nginx、MySQL 和 Node.js

使用ssh远程链接服务器 在完成使用ssh远程链接服务器后 可使用宝塔面板,安装 Nginx、MySQL 和 Node.js 宝塔网站 一、远程链接服务器 二、根据服务器系统安装宝塔 wget -O install.sh https://download.bt.cn/install/install_lts.sh && sudo bash inst…

『SQLite』SELECT语句查询数据

通过 SQLite 的 SELECT 语法从指定数据库表中查询数据。 查询数据 SELECT 语句查询数据。 查询案例 sqlite_master 表简介 该表是 SQLite 数据库用于存储创建表、索引等的一张表。 注意 上述内容详讲见文章:SQLite 的 SELECT 操作(内含案例&#xff…

ES_如何设置ElasticSearch 8.0版本的匿名访问以及https_http模式的互相切换

总结: 设置匿名访问,只需要设置xpack.security.authc.anonymous.username和xpack.security.authc.anonymous.roles参数就行,设置好后,可以匿名访问也可以非匿名访问,但是非匿名访问的情况下必须保证用户名和密码正确 取…

用户界面的UML建模10

非正常的可视反馈可伴随着同步事件发生,而同步事件可由系统动作产生。但是,可以分别对它们进行建模。 在下节中将对这些特殊的事件依次进行论述。 6.1 异常处理建模 异常,由Meyer 定义[16],其作为运行时事件(run-time events&a…

mysql性能测试优化

1.压测工具 MySQL 压测与结果分析_数据库压测报告-CSDN博客 thds: 30 tps: 1145.99 qps: 22943.36 (r/w/o: 16063.50/4585.07/2294.79) lat (ms,95%): 29.72 err/s: 0.00 reconn/s: 0.00 2.代码压测 DROP TABLE IF EXISTS user_info; CREATE TABLE user_info (id bigint(20…

Kubernetes Gateway API-4-TCPRoute和GRPCRoute

1 TCPRoute 目前 TCP routing 还处于实验阶段。 Gateway API 被设计为与多个协议一起工作,TCPRoute 就是这样一个允许管理TCP流量的路由。 在这个例子中,我们有一个 Gateway 资源和两个 TCPRoute 资源,它们按照以下规则分配流量&#xff1…

【Vaadin flow 实战】第3讲-快速上手构建VaadinFlow+Springboot的全栈web项目

快速构建VaadinFlowSpringboot的全栈web项目 温馨提示,本文讲解比较精炼,主要以快速上手开发为主。 官方提供了与本文类似的教程讲解,地址https://vaadin.com/docs/latest/getting-started 1访问vaadin官方提供的start网站(类似于 spring i…

003__系统共享工具、服务器的使用

[ 基本难度系数 ]:★★★★☆ 零、各种共享工具、服务器的说明 一、Vmware-tools工具安装和共享文件夹的设置 (1)、Vmware-tools工具安装 // 前提说明: 在没有安装此工具之前,VMware软件里的ubuntu系统和外面的Windows系统两者没有相互连通的(比如&a…