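Video: 【尚硅谷】Kafka3.x教程(从入门到调优,深入全面) (Atguigu Kafka 3.x tutorial, from basics to tuning), bilibili
Contents
01_尚硅谷大数据技术之Kafka (Atguigu Big Data Technology: Kafka)
Chapter 1 Kafka Overview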
p001
p002
p003
p004
p005
Chapter 2 Kafka Quick Start
p006
p007
p008
p009
Chapter 3 Kafka Producer
p010
p011
p012
p013
p014
Chapter 4 Kafka Broker
Chapter 5 Kafka Consumer
Chapter 6 Kafka-Eagle Monitoring
Chapter 7 Kafka KRaft Mode
01_尚硅谷大数据技术之Kafka (Atguigu Big Data Technology: Kafka)
Chapter 1 Kafka Overview
p001
p002
p003
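- flume: continuously watches data files for changes, captures every new log record as soon as it is written, and ships the data to the Hadoop cluster.
- kafka: when the data volume is too large for the downstream to absorb at once, Kafka buffers the data.
- Synchronous processing: everything is handled immediately, one step after another, until the whole job is done.
- Asynchronous processing: the core work is handled first; the remaining, non-essential steps are handed to the message queue and completed later.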
p004
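Two modes of message queues:
- Point-to-point mode:
  - the queue carries data for a single topic;
  - a message is deleted once it has been consumed.
- Publish/subscribe mode:
  - there can be data for multiple topics;
  - messages are not deleted after they are consumed;
  - consumers are independent of each other, so each one can read the same data.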
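The difference between the two modes can be sketched as a toy model in plain bash. No Kafka is involved: the arrays, the message names m1 to m3 and the function names are all hypothetical. In point-to-point mode a message leaves the queue once it is consumed; in publish/subscribe mode the data stays in place and each subscriber only advances its own read position, which is also how Kafka consumers track their offsets.

```bash
#!/usr/bin/env bash
# Toy model of the two message-queue modes; plain bash arrays stand in for a broker.

# Point-to-point: one shared queue, and a polled message is deleted from it.
queue=(m1 m2 m3)
p2p_poll() {                       # takes the head of the queue into $polled
  polled=${queue[0]}
  queue=("${queue[@]:1}")          # drop the consumed message
}

# Publish/subscribe: the log is never shrunk; every subscriber keeps its own offset.
log=(m1 m2 m3)
offset=(0 0)                       # offset[0] = subscriber 0, offset[1] = subscriber 1
sub_poll() {                       # usage: sub_poll <subscriber-index>; result in $polled
  local s=$1
  polled=${log[${offset[s]}]}
  offset[s]=$(( offset[s] + 1 ))   # only this subscriber's position moves
}

p2p_poll; echo "p2p consumer 1 got $polled"
p2p_poll; echo "p2p consumer 2 got $polled"
echo "queue now holds: ${queue[*]}"
sub_poll 0; echo "subscriber 0 got $polled"
sub_poll 1; echo "subscriber 1 got $polled"
echo "log still holds: ${log[*]}"
```

Running it, the two point-to-point consumers get m1 and m2 and only m3 is left in the queue, while both subscribers get m1 and the log still holds all three messages.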
p005
Chapter 2 Kafka Quick Start
p006
- Apache Kafka
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
#zookeeper.connect=localhost:2181
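zookeeper.connect=node001:2181,node002:2181,node003:2181/kafka
ZooKeeper stores its data as a directory tree, and the root already contains the zookeeper node. If the /kafka suffix (a chroot) is left off, Kafka's znodes are written directly under the root and mixed with everything else, so decommissioning or deleting the Kafka cluster later means removing them one by one, which makes management harder.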
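Note that the chroot is appended once, after the last host:port pair, not to every server. A small helper that builds the value from a host list makes this explicit. It is a hypothetical sketch: `zk_connect` is not part of Kafka, and port 2181 is taken from the config line above.

```bash
#!/usr/bin/env bash
# Hypothetical helper: build a zookeeper.connect value from a chroot and a list of hosts.
zk_connect() {                     # usage: zk_connect <chroot> <host>...
  local chroot=$1 h
  shift
  local pairs=()
  for h in "$@"; do
    pairs+=("$h:2181")             # every ZooKeeper server listens on 2181 here
  done
  local IFS=,                      # join the pairs with commas
  echo "${pairs[*]}$chroot"        # the chroot goes once, after the last pair
}

echo "zookeeper.connect=$(zk_connect /kafka node001 node002 node003)"
```

It prints the same line as the config above. Once a broker has started, running `ls /kafka` in ZooKeeper's zkCli.sh should show Kafka's znodes (for example brokers) grouped under that one directory instead of at the root.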
[atguigu@node001 ~]$ vim /opt/module/kafka/kafka_2.12-3.0.0/config/server.properties
[atguigu@node001 ~]$ sudo vim /etc/profile.d/my_env.sh
[atguigu@node001 ~]$ source /etc/profile
[atguigu@node001 ~]$ sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh
==================== node001 ====================
sending incremental file list
sent 47 bytes received 12 bytes 39.33 bytes/sec
total size is 1,201 speedup is 20.36
==================== node002 ====================
sending incremental file list
my_env.sh
sent 599 bytes received 47 bytes 1,292.00 bytes/sec
total size is 1,201 speedup is 1.86
==================== node003 ====================
sending incremental file list
my_env.sh
sent 599 bytes received 47 bytes 1,292.00 bytes/sec
total size is 1,201 speedup is 1.86
[atguigu@node001 ~]$
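The notes do not show what was added to my_env.sh. A minimal sketch of what the Kafka part of it might look like, assuming only KAFKA_HOME and PATH are needed and using the install path from these notes:

```bash
# /etc/profile.d/my_env.sh, Kafka section (assumed contents; adjust to your install)
# KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka/kafka_2.12-3.0.0
export PATH=$PATH:$KAFKA_HOME/bin
```

After `source /etc/profile`, `echo $KAFKA_HOME` should print the install path, and the scripts in bin (for example kafka-topics.sh) can be run without the bin/ prefix.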
[atguigu@node001 ~]$ zookeeper.sh start
---------- zookeeper node001 启动 ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper node002 启动 ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper node003 启动 ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[atguigu@node001 ~]$
[atguigu@node001 ~]$
[atguigu@node001 ~]$ xcall jps
=============== node001 ===============
4291 QuorumPeerMain
4346 Jps
=============== node002 ===============
3570 QuorumPeerMain
3630 Jps
=============== node003 ===============
3426 QuorumPeerMain
3478 Jps
[atguigu@node001 ~]$ cd /opt/module/kafka/kafka_2.12-3.0.0/
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-server-start.sh
USAGE: bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-server-start.sh -daemon config/server.properties
[atguigu@node001 kafka_2.12-3.0.0]$ jpsall
================ node001 ================
4817 Jps
4291 QuorumPeerMain
4756 Kafka
================ node002 ================
3570 QuorumPeerMain
3724 Jps
================ node003 ================
3426 QuorumPeerMain
3564 Jps
[atguigu@node001 kafka_2.12-3.0.0]$
p007
#!/bin/bash
# Start, stop, or check the Kafka brokers on all three nodes: start | stop | status
case $1 in
"start"){
    for i in node001 node002 node003
    do
        echo "--------------- $i Kafka start ---------------"
        ssh $i "/opt/module/kafka/kafka_2.12-3.0.0/bin/kafka-server-start.sh -daemon /opt/module/kafka/kafka_2.12-3.0.0/config/server.properties"
    done
};;
"stop"){
    for i in node001 node002 node003
    do
        echo "--------------- $i Kafka stop ---------------"
        ssh $i "/opt/module/kafka/kafka_2.12-3.0.0/bin/kafka-server-stop.sh"
    done
};;
"status"){
    for i in node001 node002 node003
    do
        echo "--------------- $i Kafka status ---------------"
        # a running broker shows up in jps as a process named Kafka
        ssh $i "source /etc/profile; jps | grep -w Kafka || echo 'Kafka is not running'"
    done
};;
*)
    echo "Usage: $0 {start|stop|status}"
;;
esac
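When shutting the cluster down, it is safer to stop Kafka first and stop ZooKeeper only after every broker JVM has exited; if ZooKeeper disappears while brokers are still shutting down, they may not close cleanly. A sketch of a wrapper that waits for that, assuming the script above is saved as ~/bin/kf.sh and that ZooKeeper is stopped with `zookeeper.sh stop` (the counterpart of the `zookeeper.sh start` used earlier). The function names and the 60-second timeout are hypothetical choices.

```bash
#!/usr/bin/env bash
# Hypothetical wrapper: stop Kafka, wait until no broker JVM is left, then stop ZooKeeper.
NODES=(node001 node002 node003)

kafka_count() {                    # prints how many "Kafka" processes jps reports on all nodes
  local total=0 i n
  for i in "${NODES[@]}"; do
    n=$(ssh "$i" "source /etc/profile; jps" | grep -cw Kafka)
    total=$(( total + n ))
  done
  echo "$total"
}

wait_until_zero() {                # usage: wait_until_zero <probe-function> <timeout-seconds>
  local probe=$1 left=$2
  while [ "$("$probe")" -gt 0 ]; do
    if (( left <= 0 )); then
      echo "timed out waiting for $probe" >&2
      return 1
    fi
    left=$(( left - 1 ))
    sleep 1
  done
}

stop_cluster() {
  ~/bin/kf.sh stop                 # assumed location of the script above
  wait_until_zero kafka_count 60 && zookeeper.sh stop
}
```

Calling `stop_cluster` replaces running the two stop commands back to back; ZooKeeper is left running if the brokers do not exit within the timeout.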
p008
2.2 Kafka Command-Line Operations
[atguigu@node001 kafka_2.12-3.0.0]$ pwd
/opt/module/kafka/kafka_2.12-3.0.0
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh # show the options of the topic command
Create, delete, describe, or change a topic.
Option Description
------ -----------
--alter Alter the number of partitions, replica assignment, and/or configuration for the topic.
--at-min-isr-partitions if set when describing topics, only show partitions whose isr count is equal to the configured minimum.
--bootstrap-server <String: server to connect to> REQUIRED: The Kafka server to connect to.
--command-config <String: command config property file> Property file containing configs to be passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs.
--config <String: name=value> A topic configuration override for the topic being created or altered. The following is a list of valid configurations: cleanup.policy compression.type delete.retention.ms file.delete.delay.ms flush.messages flush.ms follower.replication.throttled.replicas index.interval.bytes leader.replication.throttled.replicas local.retention.bytes local.retention.ms max.compaction.lag.ms max.message.bytes message.downconversion.enable message.format.version message.timestamp.difference.max.ms message.timestamp.type min.cleanable.dirty.ratio min.compaction.lag.ms min.insync.replicas preallocate remote.storage.enable retention.bytes retention.ms segment.bytes segment.index.bytes segment.jitter.ms segment.ms unclean.leader.election.enable See the Kafka documentation for full details on the topic configs. It is supported only in combination with --create if --bootstrap-server option is used (the kafka-configs CLI supports altering topic configs with a --bootstrap-server option).
--create Create a new topic.
--delete Delete a topic
--delete-config <String: name> A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). Not supported with the --bootstrap-server option.
--describe List details for the given topics.
--disable-rack-aware Disable rack aware replica assignment
--exclude-internal exclude internal topics when running list or describe command. The internal topics will be listed by default
--help Print usage information.
--if-exists if set when altering or deleting or describing topics, the action will only execute if the topic exists.
--if-not-exists if set when creating topics, the action will only execute if the topic does not already exist.
--list List all available topics.
--partitions <Integer: # of partitions> The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default.
--replica-assignment <String: broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...> A list of manual partition-to-broker assignments for the topic being created or altered.
--replication-factor <Integer: replication factor> The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.
--topic <String: topic> The topic to create, alter, describe or delete. It also accepts a regular expression, except for --create option. Put topic name in double quotes and use the '\' prefix to escape regular expression symbols; e.g. "test\.topic".
--topics-with-overrides if set when describing topics, only show topics that have overridden configs
--unavailable-partitions if set when describing topics, only show partitions whose leader is not available
--under-min-isr-partitions if set when describing topics, only show partitions whose isr count is less than the configured minimum.
--under-replicated-partitions if set when describing topics, only show under replicated partitions
--version Display Kafka version.
[atguigu@node001 kafka_2.12-3.0.0]$
The two options used by almost every command below:
--bootstrap-server <String: server to connect to> REQUIRED: The Kafka server to connect to.
--topic <String: topic> The topic to create, alter, describe or delete. It also accepts a regular expression, except for --create option. Put topic name in double quotes and use the '\' prefix to escape regular expression symbols; e.g. "test\.topic".
- [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --list # list all topics
- [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first --create --partitions 1 --replication-factor 3 # create topic first with 1 partition and 3 replicas
- [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --describe # show partitions, leader, replicas and ISR of first01
- [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --alter --partitions 3 # increase first01 to 3 partitions
- [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --describe
- [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --alter --partitions 1 # error: partitions can only be increased, never decreased!
- [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --alter --replication-factor 2 # rejected: kafka-topics.sh cannot change the replication factor
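Since `--alter --partitions` only accepts a larger count, a script can read PartitionCount from the `--describe` output first and refuse a shrink before sending the alter. A sketch under these assumptions: the install path and bootstrap server are the ones used in these notes, and `kt`, `partition_count` and `grow_partitions` are hypothetical helper names.

```bash
#!/usr/bin/env bash
# Hypothetical wrapper: only call --alter when the new partition count is larger.
KAFKA_HOME=${KAFKA_HOME:-/opt/module/kafka/kafka_2.12-3.0.0}
BOOTSTRAP=node001:9092

kt() {                             # kafka-topics.sh with the bootstrap server filled in
  "$KAFKA_HOME/bin/kafka-topics.sh" --bootstrap-server "$BOOTSTRAP" "$@"
}

partition_count() {                # reads --describe output on stdin, prints PartitionCount
  grep -o 'PartitionCount:[[:space:]]*[0-9]*' | head -n 1 | awk '{print $2}'
}

grow_partitions() {                # usage: grow_partitions <topic> <new-count>
  local current
  current=$(kt --topic "$1" --describe | partition_count)
  if [ -z "$current" ]; then
    echo "cannot read the partition count of $1" >&2
    return 1
  fi
  if [ "$2" -le "$current" ]; then
    echo "refusing: $1 already has $current partition(s), and partitions can only grow" >&2
    return 1
  fi
  kt --topic "$1" --alter --partitions "$2"
}
```

`grow_partitions first01 3` performs the same alter as the list above, while `grow_partitions first01 1` stops after the describe call and never sends the alter request.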
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --list
__consumer_offsets
__transaction_state
action_topic
appVideo_topic
display_topic
dwd_examination_test_paper
dwd_examination_test_question
dwd_interaction_comment
dwd_interaction_favor_add
dwd_interaction_review
dwd_learn_play
dwd_trade_cart_add
dwd_trade_order_detail
dwd_trade_pay_suc_detail
dwd_traffic_action_log
dwd_traffic_display_log
dwd_traffic_error_log
dwd_traffic_page_log
dwd_traffic_play_pre_process
dwd_traffic_start_log
dwd_traffic_unique_visitor_detail
dwd_traffic_user_jump_detail
dwd_user_user_login
dwd_user_user_register
error_topic
first
maxwell
nifi
nifiOutput
page_topic
start_topic
topic_db
topic_log
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first --create --partitions 1 --replication-factor 3
Error while executing topic command : Topic 'first' already exists.
[2024-03-04 16:59:58,015] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'first' already exists.(kafka.admin.TopicCommand$)
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --create --partitions 1 --replication-factor 3
Created topic first01.
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --describe
Topic: first01 TopicId: 8_ayAUYdRbODZCeFMBE8Cg PartitionCount: 1 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: first01 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
[atguigu@node001 kafka_2.12-3.0.0]$
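The TopicExistsException above can be avoided in scripts with the `--if-not-exists` flag from the help output, which turns creation into a no-op when the topic is already there. A sketch; the function name is hypothetical, and the path and bootstrap server are the ones used in these notes.

```bash
#!/usr/bin/env bash
# Hypothetical helper: create a topic only if it does not exist yet.
KAFKA_HOME=${KAFKA_HOME:-/opt/module/kafka/kafka_2.12-3.0.0}
BOOTSTRAP=node001:9092

ensure_topic() {                   # usage: ensure_topic <topic> <partitions> <replication-factor>
  "$KAFKA_HOME/bin/kafka-topics.sh" --bootstrap-server "$BOOTSTRAP" \
    --topic "$1" --create --if-not-exists \
    --partitions "$2" --replication-factor "$3"
}
```

`ensure_topic first 1 3` can then be rerun safely: if first already exists, nothing is created and the command should finish without the error shown above.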
p009
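[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first --create --partitions 1 --replication-factor 3 # create topic first with 1 partition and 3 replicas
Next, start a producer that sends data to the topic. Run without any options, kafka-console-producer.sh refuses to start and prints its option list: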
[atguigu@node001 ~]$ cd /opt/module/kafka/kafka_2.12-3.0.0/bin
[atguigu@node001 bin]$ ./kafka-console-producer.sh
Missing required option(s) [bootstrap-server]
Option Description
------ -----------
--batch-size <Integer: size> Number of messages to send in a single batch if they are not being sent synchronously. (default: 200)
--bootstrap-server <String: server to connect to> REQUIRED unless --broker-list (deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--broker-list <String: broker-list> DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String: compression-codec] The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. If specified without value, then it defaults to 'gzip'
--help Print usage information.
--line-reader <String: reader_class> The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka.tools.ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on send> The max time that the producer will block for during a send request (default: 60000)
--max-memory-bytes <Long: total memory in bytes> The total memory used by the producer to buffer records waiting to be sent to the server. (default: 33554432)
--max-partition-memory-bytes <Long: memory in bytes per partition> The buffer size allocated for a partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. (default: 16384)
--message-send-max-retries <Integer> Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message. (default: 3)
--metadata-expiry-ms <Long: metadata expiration interval> The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. (default: 300000)
--producer-property <String: producer_prop> A mechanism to pass user-defined properties in the form key=value to the producer.
--producer.config <String: config file> Producer config properties file. Note that [producer-property] takes precedence over this config.
--property <String: prop> A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader. Default properties include: parse.key=true|false key.separator=<key.separator> ignore.error=true|false
--request-required-acks <String: request required acks> The required acks of the producer requests (default: 1)
--request-timeout-ms <Integer: request timeout ms> The ack timeout of the producer requests. Value must be non-negative and non-zero (default: 1500)
--retry-backoff-ms <Integer> Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. (default: 100)
--socket-buffer-size <Integer: size> The size of the tcp RECV size. (default: 102400)
--sync If set message send requests to the brokers are synchronously, one at a time as they arrive.
--timeout <Integer: timeout_ms> If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. (default: 1000)
--topic <String: topic> REQUIRED: The topic id to produce messages to.
--version Display Kafka version.
[atguigu@node001 bin]$
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-console-producer.sh --bootstrap-server node001:9092 --topic first01 # producer
>hello
>123
[atguigu@node002 kafka_2.12-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server node001:9092 --topic first01 # consumer (on node002)
hello
123
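The console producer above sends each line without a key. With the parse.key and key.separator properties listed in the producer help, each line can carry a key, and the consumer's print.key property displays it; under the default partitioner, records with the same key land in the same partition. A sketch; the function names are hypothetical and ':' as the separator is an arbitrary choice.

```bash
#!/usr/bin/env bash
# Hypothetical helpers: produce and consume keyed messages with the console tools.
KAFKA_HOME=${KAFKA_HOME:-/opt/module/kafka/kafka_2.12-3.0.0}
BOOTSTRAP=node001:9092

produce_keyed() {                  # usage: produce_keyed <topic>; then type lines like user1:hello
  "$KAFKA_HOME/bin/kafka-console-producer.sh" --bootstrap-server "$BOOTSTRAP" --topic "$1" \
    --property parse.key=true --property key.separator=:
}

consume_with_keys() {              # usage: consume_with_keys <topic>
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" --bootstrap-server "$BOOTSTRAP" --topic "$1" \
    --property print.key=true
}
```

With `produce_keyed first01` on node001 and `consume_with_keys first01` on node002, typing `user1:hello` should arrive with key user1 and value hello.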
--------------------------------------------------
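[atguigu@node002 kafka_2.12-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server node001:9092 --topic first01 --from-beginning # --from-beginning: read everything in the topic, including historical data; without it, a new consumer only sees messages produced after it starts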
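The console consumers above each join their own randomly named consumer group, so every one of them receives all messages (publish/subscribe). Giving consumers the same `--group` makes them share the topic's partitions instead, so each message reaches only one of them, which behaves like point-to-point delivery. A sketch; the function name and the group names g1 and g2 are hypothetical.

```bash
#!/usr/bin/env bash
# Hypothetical helper: run a console consumer inside a named consumer group.
KAFKA_HOME=${KAFKA_HOME:-/opt/module/kafka/kafka_2.12-3.0.0}
BOOTSTRAP=node001:9092

consume_in_group() {               # usage: consume_in_group <group> <topic>
  "$KAFKA_HOME/bin/kafka-console-consumer.sh" --bootstrap-server "$BOOTSTRAP" \
    --topic "$2" --group "$1"
}
```

Run `consume_in_group g1 first01` in two terminals and `consume_in_group g2 first01` in a third. first01 has 3 partitions after the --alter above, so the two g1 consumers split the partitions between them, while the single g2 consumer receives everything. The split can look uneven, because each partition belongs to exactly one consumer in a group and the producer may send many un-keyed messages to the same partition.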
Chapter 3 Kafka Producer
p010
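Kafka consists of three parts: producers, brokers, and consumers.
3.1.1 How Messages Are Sent
Sending a message involves two threads: the main thread and the Sender thread. The main thread creates a double-ended queue, the RecordAccumulator (one deque per partition), and appends messages to it; the Sender thread continuously pulls messages from the RecordAccumulator and sends them to the Kafka broker.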
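When the Sender thread ships a batch is governed by producer settings: batch.size (the per-partition batch size in bytes, 16384 by default), linger.ms (how long to wait for a batch to fill before sending it anyway) and buffer.memory (the total size of the RecordAccumulator, 33554432 bytes by default). A batch is sent once it is full or once linger.ms has elapsed. The console producer accepts these settings through `--producer-property`. A sketch for experimenting with them; the function name is hypothetical and the values are examples only.

```bash
#!/usr/bin/env bash
# Hypothetical helper: console producer with explicit batching settings.
KAFKA_HOME=${KAFKA_HOME:-/opt/module/kafka/kafka_2.12-3.0.0}
BOOTSTRAP=node001:9092

produce_tuned() {                  # usage: produce_tuned <topic> <linger-ms> [batch-bytes]
  "$KAFKA_HOME/bin/kafka-console-producer.sh" --bootstrap-server "$BOOTSTRAP" --topic "$1" \
    --producer-property linger.ms="$2" \
    --producer-property batch.size="${3:-16384}" \
    --producer-property buffer.memory=33554432
}
```

A larger linger.ms lets the RecordAccumulator gather more records per batch, at the cost of extra latency for each message.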
p011
3.2 Asynchronous Send API
3.2.1 Plain Asynchronous Send