kafka原理剖析及实战演练

news/2025/1/15 18:28:11/

一、消息系统概述

  一)消息系统按消息发送模型分类

  1、peer-to-peer(单播)

  

    特点:
  • 一般基于pull或polling接收消息
  • 发送对队列中的消息被一个而且仅仅一个接收者所接收,即使有多个接收者在同一队列中侦听同一消息
  • 即支持异步“即发即弃”的消息传送方式,也支持同步请求/应答传送方式
  2、发布/订阅(支持单播和多播)

  

    特点:
  • 发布到一个主题的消息,可被多个订阅者所接收
  • 发布/订阅可基于push消费数据,也可基于pull或者polling消费数据
  • 解耦能力比P2P模型更强

  二)消息系统使用场景

  • 解耦 各位系统之间通过消息系统这个统一的接口交换数据,无须了解彼此的存在
  • 冗余 部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险
  • 扩展 消息系统是统一的数据接口,各系统可独立扩展
  • 峰值处理能力 消息系统可顶住峰值流量,业务系统可根据处理能力从消息系统中获取并处理对应量的请求
  • 可恢复性 系统中部分组件失效并不会影响整个系统,它恢复后仍然可从消息系统中获取并处理数据
  • 异步通信 在不需要立即处理请求的场景下,可以将请求放入消息系统,合适的时候再处理

  三)常用的消息系统对比

  1、RabbitMQ

  Erlang编写,支持多协议 AMQP,XMPP,SMTP,STOMP。 支持负载均衡、数据持久化。同时支持Peer-to-Peer和发布/订阅模式

  使用场景:比较重量级,企业开发中。

  2、Redis

  基于Key-Value对的NoSQL数据库,同时支持MQ功能,可做轻量级队列服务使用。就入队操作而言,Redis对短消息(小于10KB)的性能比RabbitMQ好,长消息的性能比RabbitMQ差。

  3、ZeroMQ

  轻量级,不需要单独的消息服务器或中间件,应用程序本身扮演该角色,Peer-to-Peer。它实质上是一个库,需要开发人员自己组合多种技术,使用复杂度高。

  特点:不支持数据的持久化,很难做到异步发送,做到的是点对点异步缓存。

  4、ActiveMQ

  JMS实现,Peer-to-Peer,支持持久化、 XA事务

  5、Kafka/Jafka

  高性能跨语言的分布式发布/订阅消息系统,数据持久化,全分布式,同时支持在线和离线处理

  7、MetaQ/RocketMQ

  纯Java实现,发布/订阅消息系统,支持本地事务和XA分布式事务

二、kafka

  http://www.jasongj.com/tags/Kafka/

  一)kafka概述

  1、kafka简介  

  Apache Kafka是分布式发布-订阅消息系统,在 kafka官网上对 kafka 的定义:一个分布式发布-订阅消息传递系统。 它最初由LinkedIn公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

  2、kafka的设计目标
  • 高吞吐量、低延迟:在廉价的商用机器上单机可支持每秒100万条消息的读写
  • 消息持久化、可靠性:所有消息均被持久化到磁盘,无消息丢失,支持消息重放
  • 可扩展性,完全分布式:producer、broker、consumer均支持水平扩展
  • 高并发:支持数千个客户端同时读写
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
  • 同时满足适应在线流处理和离线批处理
  3、kafka2.0.0版本新增改变了哪些功能详细

  kafka2.0.0版本新增改变了哪些功能详细:http://www.aboutyun.com/forum.php?mod=viewthread&tid=24981

  4、kafka的适应场景
  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streaming和storm
  • 事件源
  5、kafka最详细原理总结

  http://www.itkeyword.com/doc/3033455819328241799/kafka-apache-scala

  二)kafka架构

  1、kafka架构图

  

  2、kafka架构组件
  • 话题(Topic):是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名;
  • 生产者(Producer):是能够发布消息到话题的任何对象;
  • 服务代理(Broker):已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群;
  • 消费者(Consumer):可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息;
    1、topic

  

  • 逻辑概念:同一个topic的消息可分布在一个或多个节点(broker)上
  • 一个topic包含一个或者多个partition(partition均匀分布在集群中)
  • 每条消息都属于且仅属于一个topic
  • producer发布数据时,必须指定将改消息发布到哪一个topic
  • consumer订阅消息时,也必须指定订阅那个topic的消息
    2、partition

  

  • 物理概念:一个partition只分布在一个broker上(不考虑备份的情况)
  • 一个partition物理上对应一个文件夹
  • 一个partition包含多个segment(线段、部分)
  • 一个segment对应一个文件
  • segment由一个个不可变记录组成
  • 记录只会被append到segment中,不会被单独删除或者修改
  • 清除过期日志时,直接删除一个或多个segment

  kafka的最小物理单位是partition,所以offset是记录在partition中的(segment index中),那么partition是跨机器的,offset的是partiton内管理的。

  kafka 提供两种分配策略 range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。本文只讨论range策略。所谓的range其实就是按照阶段平均分配。

  3、sync(同步) producer和async(异步) producer
    1、sync producer特点
  • 低延迟
  • 低吞吐率
  • 无数据丢失
    2、async producer特点
  • 高延迟
  • 高吞吐率
  • 可能会有数据丢失 
  4、consumer和partition

  kafka的配置要点:https://yq.aliyun.com/ziliao/417900

  更多内容见:http://www.open-open.com/lib/view/open1434551761926.html

  1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
  2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀
  3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
  4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
  5. High-level接口中获取不到数据的时候是会block的

  三)单点版kafka的安装使用

   1、kafka部署方式
  • 在虚拟机上部署kafka
    • 使用kafka带的zookeeper起kafka:适用单独部署kafka
    • 使用单独的zookeeper起kafka:公司架构已有zookeeper
  • 使用docker部署kafka
    • 使用kafka带的zookeeper起kafka:适用单独部署kafka
    • 使用单独的zookeeper起kafka:公司架构已有zookeeper
   2、下载kafka

  kafka下载最新版地址:Apache Kafka

  kafka下载指定版本:Index of /dist/kafka

  zookeeper下载地址:Index of /apache/zookeeper/stable

  下载二进制包

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgzwget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
  3、kafka的目录结构
kafka_2.11-2.0.0
├── bin
├── config
├── libs
├── LICENSE
├── NOTICE
└── site-docs4 directories, 2 files

  kafka配置文件

############################# Server Basics
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0# Switch to enable topic deletion or not, default value is false
delete.topic.enable=true############################# Socket Server Settings
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://kafka01.test.com:9092# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://kafka01.test.com:9092# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600############################# Log Basics# A comma seperated list of directories under which to store log files
log.dirs=/opt/ytd_data01/kafkanum.partitions=5
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1############################# Internal Topic Settings  
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1############################# Log Flush Policy
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000############################# Log Retention Policy 
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=24
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824log.retention.check.interval.ms=300000############################# Zookeeper
# root directory for all kafka znodes.
zookeeper.connect=zk01.test.com:2181,zk02.test.com:2181,zk03.test.com:2181# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000############################# Group Coordinator Settings 
group.initial.rebalance.delay.ms=0
  4、四种部署方式
    1、在虚拟上部署

  使用kafka带的zookeeper起kafka

cd /opt/ytd_soft
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
tar xvf kafka_2.11-2.0.0.tgz
cd kafka_2.11-2.0.0
#zookeeper默认是前台启动,让其后台启动使用nohup command &&
#前台启动
#./bin/zookeeper-server-start.sh config/zookeeper.properties
#后台启动
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties &&
ss -lutnp |grep 2181#启动kafka,默认非daemon启动,-daemon启动
./bin/kafka-server-start.sh -daemon config/server.properties
ss -lutnp|grep 9092

  测试kafka是否可用

#创建topic
# cd /opt/ytd_soft/kafka_2.11-2.0.0
# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 3 --replication-factor 1
Created topic "test1".
#查看topic详情
# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1
Topic:test1    PartitionCount:3    ReplicationFactor:1    Configs:Topic: test1    Partition: 0    Leader: 0    Replicas: 0    Isr: 0Topic: test1    Partition: 1    Leader: 0    Replicas: 0    Isr: 0Topic: test1    Partition: 2    Leader: 0    Replicas: 0    Isr: 0#模拟消费者consumer(kafka之前的版本参数是:bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1  --from-beginning#另外一个窗口,模拟生产者producer
# cd /opt/ytd_soft/kafka_2.11-2.0.0
# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
>11
>111
>222

  使用单独的zookeeper起kafka

  安装启动zookeeper

cd /opt/ytd_soft
wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
tar xvf zookeeper-3.4.12.tar.gz 
cd zookeeper-3.4.12/
bin/zkServer.sh start conf/zoo_sample.cfg 
bin/zkServer.sh status conf/zoo_sample.cfg 
ss -lutnp|grep 2181

  部署kafka,更改kafka配置文件server.properties(若zookeeper是集群或zookeeper不在本机上必须更改),其他步骤和上面的一样

zookeeper.connect=localhost:2181
    2、使用docker部署(自己创建镜像)

  使用kafka带的zookeeper起kafka

  使用kafka带的zookeeper起kafka

   部署zookeeper

jdk和zookeeper创建镜像时下载

FROM centos:7.9RUN cp -rp /etc/yum.repos.d/CentOS-Base.repo{,.bak} && curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repoRUN yum -y install vim lsof wget tar bzip2 unzip vim-enhanced passwd sudo yum-utils hostname net-tools rsync man git make automake cmake patch logrotate python-devel libpng-devel libjpeg-devel pwgen python-pipRUN mkdir /opt/java &&\wget --no-check-certificate --no-cookies --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u191-b12/2787e4a523244c269598db4e85c51e0c/jdk-8u191-linux-x64.tar.gz -P /opt/javaRUN tar zxvf /opt/java/jdk-8u191-linux-x64.tar.gz -C /opt/java &&\JAVA_HOME=/opt/java/jdk1.8.0_191 &&\sed -i "/^PATH/i export JAVA_HOME=$JAVA_HOME" /root/.bash_profile &&\sed -i "s%^PATH.*$%&:$JAVA_HOME/bin%g" /root/.bash_profile &&\source /root/.bash_profileENV ZOOKEEPER_VERSION "3.4.12"RUN mkdir /opt/zookeeper &&\wget http://mirror.olnevhost.net/pub/apache/zookeeper/zookeeper-$ZOOKEEPER_VERSION/zookeeper-$ZOOKEEPER_VERSION.tar.gz -P /opt/zookeeperRUN tar zxvf /opt/zookeeper/zookeeper*.tar.gz -C /opt/zookeeperRUN echo "source /root/.bash_profile" > /opt/zookeeper/start.sh &&\echo "cp /opt/zookeeper/zookeeper-"$ZOOKEEPER_VERSION"/conf/zoo_sample.cfg /opt/zookeeper/zookeeper-"$ZOOKEEPER_VERSION"/conf/zoo.cfg" >> /opt/zookeeper/start.sh &&\echo "/opt/zookeeper/zookeeper-$"ZOOKEEPER_VERSION"/bin/zkServer.sh start-foreground" >> /opt/zookeeper/start.shEXPOSE 2181ENTRYPOINT ["sh", "/opt/zookeeper/start.sh"]

 

  部署kafka(jdk和kafka安装包提前下载)

FROM centos:7.9ENV KAFKA_VERSION "2.11-2.0.0"RUN cp -rp /etc/yum.repos.d/CentOS-Base.repo{,.bak} && curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-6.repoRUN yum -y install vim lsof wget tar bzip2 unzip vim-enhanced passwd sudo yum-utils hostname net-tools rsync man git make automake cmake patch logrotate python-devel libpng-devel libjpeg-devel pwgen python-pipRUN mkdir /opt/javaADD jdk-8u191-linux-x64.tar.gz /opt/javaRUN JAVA_HOME=/opt/java/jdk1.8.0_191 &&\sed -i "/^PATH/i export JAVA_HOME=$JAVA_HOME" /root/.bash_profile &&\sed -i "s%^PATH.*$%&:$JAVA_HOME/bin%g" /root/.bash_profile &&\source /root/.bash_profileRUN mkdir /opt/kafkaADD kafka_$KAFKA_VERSION.tgz /opt/kafka RUN sed -i 's/num.partitions.*$/num.partitions=3/g' /opt/kafka/kafka_$KAFKA_VERSION/config/server.propertiesRUN echo "source /root/.bash_profile" > /opt/kafka/start.sh &&\echo "cd /opt/kafka/kafka_"$KAFKA_VERSION >> /opt/kafka/start.sh &&\echo "sed -i 's%zookeeper.connect=.*$%zookeeper.connect=zookeeper:2181%g'  /opt/kafka/kafka_"$KAFKA_VERSION"/config/server.properties" >> /opt/kafka/start.sh &&\echo "bin/kafka-server-start.sh config/server.properties" >> /opt/kafka/start.sh &&\chmod a+x /opt/kafka/start.shEXPOSE 9092ENTRYPOINT ["sh", "/opt/kafka/start.sh"]

  创建镜像

cd /opt/kafka/dockerfile
docker build -t zookeeper:v1.1 -f zookeeper.dockerfile .
docker build -t kafka:v1.0 -f kafka.dockerfile .

  启动kafka和zookeeper容器,先启动zookeeper

docker images|egrep "zookeeper|kafka"docker run -itd  --name  zookeeper -h zookeeper -p 2181:2181 zookeeper:v1.1 /bin/bash
#--link 关联容器
docker run -itd --name kafka -h kafka -p 9092:9092 --link zookeeper kafka:v1.0 /bin/bashss -lutnp|egrep "2181|9092"

  测试kafka的可用性

docker exec -it kafka /bin/bash
source /root/.bash_profile#创建topic
cd /opt/kafka/kafka_2.11-2.0.0
bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test1 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --topic test2 --partitions 3 --replication-factor 1#查看topic详情
bin/kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test2bin/kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test1#起consumer(注意版本差异,有可能参数不同)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1  --from-beginning#单开一个会话,起producer(输入测试数据,看consumer是否全部接收)
docker exec -it kafka /bin/bash
source /root/.bash_profile
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1

  四)kafka 高可用

  参考:http://www.jasongj.com/2015/04/24/KafkaColumn2/#ACK%E5%89%8D%E9%9C%80%E8%A6%81%E4%BF%9D%E8%AF%81%E6%9C%89%E5%A4%9A%E5%B0%91%E4%B8%AA%E5%A4%87%E4%BB%BD

  1、CAP理论(原则或定理)

  CAP理论和BASE理论详细信息见链接:https://www.cnblogs.com/duanxz/p/5229352.html

    1、简述CAP理论

  CAP原则又称CAP定理,指的是在一个分布式系统中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼。

  CAP原则是NOSQL数据库的基石。Consistency(一致性)。 Availability(可用性)。Partition tolerance(分区容错性)。

  分布式系统的CAP理论:理论首先把分布式系统中的三个特性进行了如下归纳:

  • 一致性(C):在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)
    • 通过某个节点的写操作结果对后面通过其他节点的读操作可见
    • 若更新数据后,并发访问情况下可立即感知该更新,称为强制一致性
    • 若允许之后部分或者全部感知不到该更新,称为弱一致性
    • 若之后的一段时间(通常该事件不固定)后,一定可以感知该更新,称为最终一致性
  • 可用性(A):在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)
    • 任何一个个没有发生故障的节点,必须在有限的时间内返回合理的结果
  • 分区容忍性(P):以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。
    • 部分节点宕机或者无法与节点通信时,各分区间还可保持分布式系统的功能
    2、CAP理论特征

  CAP理论:分布式系统中,一致性、可用性、分区容忍性最多只可同时满足两个

  一般分区容忍性都要求有保障,因此很多时候在可用性与一致性之间做权衡。

  

   2、一致性方案
     1、master-slave
  • RDBMS的读写分离是典型的master-slave方案
  • 同步复制可保证一致性,但会影响可用性
  • 异步复制可提高可用性,但会降低一致性
    2、WNR
  • 主要用于去中心话(P2P)的分布式系统中。dynamoDB和Cassandra是采用此方案
  • N代表副本数,W代表每次写操作要保证的最少写成功的副本数,R代表每次读至少读取的副本数
  • 当W+R>N时,可保证每次读取的数据至少有一个副本具有最新的更新(大于)
  • 多个写操作的顺序难以保证,可能导致多副本间的写操作书序不一致,dynamo通过向量适中保证最终一致性
    3、paxos及其变种一致性算法(更多使用的是其变种)
  • Google的chubby,zookeeper的Zab,RAFT
  3、replica(复制使用pull)
  • 当某个topic的replication-factor为N且N大于1时,每个partion都会有N个副本(replica)
  • replica的个数小于等于broker数:对每个partition而言每个broker上只会有一个replica,因此broker ID表示replica
  • 所有partition的所有replica默认情况会均匀分布到所有broker上

  

  4、何时commit——ISR

  Kafka HA一致性重要机制ISR:Kafka HA Kafka一致性重要机制之ISR(kafka replica)_1. rerplica.lag.time.max.ms-CSDN博客

  如何处理replica恢复?

  

  五)docker集群版kafka的安装使用

  1、创建zookeeper和kafka镜像

  实验环境是MAC的注意:docker在MAC电脑实现,它不支持MAC通过hosts绑定或IP直接访问kafka集群的

    解决(前提配置文件中写的hostname):1、hosts绑定改成:127.0.0.1 kafka01

       2、连接程序连接地址写成:localhost:9092

  六)kafka如何使用zookeeper

  zookeeper:https://www.cnblogs.com/happy-king/p/9921458.html

  1、配置管理
  2、leader election
  3、服务发现

  七)kafka高性能之道

  1、高效使用磁盘
  • 顺序写磁盘 顺序写磁盘性能高于随机写内存
  • Append Only 数据不更新,无记录级的数据删除(只会整个segment删除)
  • 充分利用Page Cache
    • I/O Scheduler将连续的小块写组装成大块的物理写从而提高性能
    • I/O Scheduler会尝试将一些写操作重新按顺序排好,从而减少磁盘头的移动时间
    • 充分利用所有空闲内存(非JVM内存)
      • 应用层cache也会有对应的page cache与之对应,直接使用page cache可增大可用cache
      • 如使用heap内的cache,会增加GC负担
  • 读操作可直接在page cache内进行。如果进程重启,JVM内的cache会失效,但page cache仍然可用
  • 可通过如下参数强制flush,但不建议
    • log.flush.interval.messages=10000
    • log.flush.interval.ms=1000
  • 支持多directory(多使用多drive)
  2、零拷贝
    1、传统模式下的拷贝

    传统模式下的拷贝:数据从文件传输到网络需要4次数据拷贝,4次上下文切换(用户态和内核态)和2次系统调用

File.read(fileDesc, buf, len); 
Socket.send(socket, buf, len);

  

    2、零拷贝

  通过NIO的transferTo/transferFrom调用操作系统的sendfile实现零拷贝。总共发生2次内核数据拷贝(没有CPU参与(或没有用户态的)的拷贝),2次上下文切换和1次系统调用,消除了CPU数据拷贝 

伪代码

  

  

 

  3、批处理和压缩
  • Producer和Consumer均支持批量处理数据,从而减少了网络传输的开销 (少次多量)
  • Producer可将数据压缩后发送给broker,从而减少网络传输代价。目前支持Snappy, Gzip和LZ4压缩 
  4、partition(可以设置成broker数量一致)
  • 通过Partition实现了并行处理和水平扩展
  • Partition是Kafka(包括Kafka Stream)并行处理的最小单位
  • 不同Partition可处于不同的Broker(节点),充分利用多机资源
  • 同一Broker(节点)上的不同Partition可置于不同的Directory,如果节点上有多个Disk Drive,可将不同的Drive对应不同的Directory,从而使Kafka充分利用多Disk Drive的磁盘优势
     
  5、ISR

   ISR(In-Sync Replicas)

  对每个消息都做f+1的备份:以单个消息为进行备份的基本单位,进行可靠性保障

  ISR最核心的思想:以一段时间而非以一个消息为基本单位,进行可靠性保障

    1、ISR实现了可用性和一致性的动态平衡 (会话失效后10秒删除节点)
replica.lag.time.max.ms=10000
    2、ISR可容忍更多的节点失败 
  • Majority Quorum如果要容忍f个节点失败,则至少需要2f+1个节点
  • ISR如果要容忍f个节点失败,至少需要f+1个节点
    3、如何处理Replica Crash 
  • Leader crash后,ISR中的任何replica皆可竞选成为Leader
  • 如果所有replica都crash,可选择让第一个recover的replica或者第一个在ISR中的replica成为leader
  • unclean.leader.election.enable=true 

  八)kafka数据迁移

  kafka 扩容之后迁移topic:https://my.oschina.net/remainsu/blog/1816010

  kafka集群中数据自动迁移:Kafka集群中数据自动迁移-powerful_boy-ChinaUnix博客

  kafka配置:https://www.cnblogs.com/angellst/p/9368493.html


http://www.ppmy.cn/news/1526072.html

相关文章

若依后端正常启动但是uniapp移动端提示后端接口异常

pc端能用模拟器也能正常连接接口,手机端真机调试连不上接口 解决: 1. 先看config.js的 填自己的ip地址 module.exports { // baseUrl: https://vue.ruoyi.vip/prod-api, baseUrl: "http://192.168.101.5:8080", } 2.网络环境问题&#…

探索自动化的魔法:Python中的pyautogui库

文章目录 探索自动化的魔法:Python中的 pyautogui 库背景:为什么选择pyautogui?pyautogui是什么?如何安装pyautogui?五个简单的库函数使用方法场景应用常见Bug及解决方案总结 探索自动化的魔法:Python中的 …

vue+el-table 可输入表格使用上下键进行input框切换

使用上下键进行完工数量这一列的切换 <el-table :data"form.detailList" selection-change"handleChildSelection" ref"bChangeOrderChild" max-height"500"><!-- <el-table-column type"selection" width&quo…

基于ESP32的管道检修机器人:MQTT协议、SLAM技术栈设计流程

一、项目概述 项目目标和用途 随着城市基础设施的发展&#xff0c;管道的检修与维护变得越来越重要。传统的人工检修方式不仅效率低下&#xff0c;还存在安全隐患。本项目旨在设计一款基于ESP32单片机的物联网管道检修机器人&#xff0c;集成先进的传感器、无线通信和人工智能…

微信小程序开发第三课

1 wxml语法 1.1 模版语法 # 1 在页面 xx.js 的 Page() 方法的 data 对象中进行声明定义 # 2 在xx.wxml 中使用 {{}} 包裹&#xff0c;显示数据 # 3 可以显示如下&#xff0c;不能编写js语句或js方法-变量-算数运算-三元运算-逻辑判断# 4 只是单纯通过赋值&#xff0c;js中…

上图为是否色发

&#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01; &#x1f4e2;本文由 JohnKi 原创&#xff0c;首发于 CSDN&#x1f649; &#x1f4e2;未来很长&#…

Rocky Linux 8通过chrony模块设置时间同步

内网有一台Rocky Linux8.4的服务器&#xff0c;想设置时间同步&#xff0c;发现ntpdate不能用&#xff0c;经查&#xff0c;可使用chrony模块来实现。 查看系统版本 [rootSVN ~]# cat /etc/redhat-release Rocky Linux release 8.4 (Green Obsidian) 修改chrony配置文件 vim…

深入理解 JWT、OAuth2 和 Spring Security 的特性及其相互关系

在现代应用程序中&#xff0c;安全性是至关重要的。随着微服务架构的兴起和对无状态认证的需求增加&#xff0c;JSON Web Token (JWT)、OAuth2 和 Spring Security 已成为实现安全认证和授权的关键技术。本文将详细探讨这三者的特性及其相互关系&#xff0c;帮助你理解如何利用…