Atguigu Big Data Technology - Kafka Video Tutorial - Notes 01 [Getting Started with Kafka]


Video: [Atguigu] Kafka 3.x Tutorial (from getting started to tuning, deep and comprehensive), on bilibili

  1. Atguigu Big Data Technology - Kafka Video Tutorial - Notes 01 [Getting Started with Kafka]
  2. Atguigu Big Data Technology - Kafka Video Tutorial - Notes 02 [Integrating Kafka with External Systems]
  3. Atguigu Big Data Technology - Kafka Video Tutorial - Notes 03 [Kafka Production Tuning Handbook]
  4. Atguigu Big Data Technology - Kafka Video Tutorial - Notes 04 [Kafka Source Code Analysis]

Contents

01_Atguigu Big Data Technology: Kafka

Chapter 1: Kafka Overview

p001

p002

p003

p004

p005

Chapter 2: Kafka Quick Start

p006

p007

p008

p009

Chapter 3: Kafka Producers

p010

p011

p012

p013

p014

Chapter 4: Kafka Broker

Chapter 5: Kafka Consumers

Chapter 6: Kafka-Eagle Monitoring

Chapter 7: Kafka KRaft Mode


01_Atguigu Big Data Technology: Kafka

Chapter 1: Kafka Overview

p001

p002

p003

  1. Flume: monitors data files continuously; every new log record is picked up and forwarded to the Hadoop cluster.
  2. Kafka: buffers the data when the volume is too large for downstream systems to absorb directly.

  1. Synchronous processing: everything is handled immediately, step by step, until the whole task is done.
  2. Asynchronous processing: handle the core transaction first and defer the rest.

p004

The two messaging modes of a message queue:

  1. Point-to-point mode:
    1. only a single topic's data is produced;
    2. a message is deleted once it has been consumed.
  2. Publish/subscribe mode:
    1. there can be data for multiple topics;
    2. messages are not deleted after being consumed;
    3. multiple consumers are independent of each other.
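Since consumed messages are not deleted, two consumers in different consumer groups each receive the complete stream independently. A minimal sketch of this with the console consumer (the group names g1 and g2 and the topic first01 are illustrative, and it assumes the cluster set up in Chapter 2):

# terminal 1: a consumer in group g1
bin/kafka-console-consumer.sh --bootstrap-server node001:9092 --topic first01 --group g1
# terminal 2: a consumer in group g2 receives the same messages, independently of g1
bin/kafka-console-consumer.sh --bootstrap-server node001:9092 --topic first01 --group g2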

p005

  1. ZooKeeper: part of Kafka's metadata is stored in ZooKeeper; ZooKeeper records which broker nodes are up and running, and which replica is the leader.
  2. Kafka: data is stored split into partitions.

Chapter 2: Kafka Quick Start

p006

  1. Apache Kafka official site / downloads: https://kafka.apache.org

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
#zookeeper.connect=localhost:2181
zookeeper.connect=node001:2181,node002:2181,node003:2181/kafka

ZooKeeper stores its data as a directory tree, with the zookeeper node under the root. Without the chroot suffix in node003:2181/kafka, Kafka's znodes would be scattered directly under ZooKeeper's root; unregistering or deleting the Kafka cluster would then mean deleting those znodes one by one, which makes later administration painful.
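To check the chroot layout, one can open a ZooKeeper client and list the znodes. A sketch assuming the ZooKeeper install path used elsewhere in these notes (output abbreviated):

[atguigu@node001 ~]$ /opt/module/zookeeper/zookeeper-3.5.7/bin/zkCli.sh -server node001:2181
# everything Kafka writes now lives under the /kafka chroot
[zk: node001:2181(CONNECTED) 0] ls /kafka
[admin, brokers, cluster, config, consumers, controller_epoch, ...]
[zk: node001:2181(CONNECTED) 1] ls /kafka/brokers/ids
[0, 1, 2]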

[atguigu@node001 ~]$ vim /opt/module/kafka/kafka_2.12-3.0.0/config/server.properties 
[atguigu@node001 ~]$ sudo vim /etc/profile.d/my_env.sh
[atguigu@node001 ~]$ source /etc/profile
[atguigu@node001 ~]$ sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh
==================== node001 ====================
sending incremental file list

sent 47 bytes  received 12 bytes  39.33 bytes/sec
total size is 1,201  speedup is 20.36
==================== node002 ====================
sending incremental file list
my_env.sh

sent 599 bytes  received 47 bytes  1,292.00 bytes/sec
total size is 1,201  speedup is 1.86
==================== node003 ====================
sending incremental file list
my_env.sh

sent 599 bytes  received 47 bytes  1,292.00 bytes/sec
total size is 1,201  speedup is 1.86
[atguigu@node001 ~]$ 
[atguigu@node001 ~]$ zookeeper.sh start
---------- zookeeper node001 start ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper node002 start ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper node003 start ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[atguigu@node001 ~]$ 
[atguigu@node001 ~]$ 
[atguigu@node001 ~]$ xcall jps
=============== node001 ===============
4291 QuorumPeerMain
4346 Jps
=============== node002 ===============
3570 QuorumPeerMain
3630 Jps
=============== node003 ===============
3426 QuorumPeerMain
3478 Jps
[atguigu@node001 ~]$ cd /opt/module/kafka/kafka_2.12-3.0.0/
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-server-start.sh 
USAGE: bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-server-start.sh -daemon config/server.properties
[atguigu@node001 kafka_2.12-3.0.0]$ jpsall 
================ node001 ================
4817 Jps
4291 QuorumPeerMain
4756 Kafka
================ node002 ================
3570 QuorumPeerMain
3724 Jps
================ node003 ================
3426 QuorumPeerMain
3564 Jps
[atguigu@node001 kafka_2.12-3.0.0]$ 
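The transcript above edits /etc/profile.d/my_env.sh but never shows its contents; for this directory layout it would plausibly add a KAFKA_HOME entry like the following (a sketch, not the file from the course):

# KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka/kafka_2.12-3.0.0
export PATH=$PATH:$KAFKA_HOME/bin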

p007

Cluster management script kafka.sh:

#!/bin/bash

case $1 in
"start"){
    for i in node001 node002 node003
    do
        echo "--------------- $i Kafka start ---------------"
        ssh $i "/opt/module/kafka/kafka_2.12-3.0.0/bin/kafka-server-start.sh -daemon /opt/module/kafka/kafka_2.12-3.0.0/config/server.properties"
    done
};;
"stop"){
    for i in node001 node002 node003
    do
        echo "--------------- $i Kafka stop ---------------"
        ssh $i "/opt/module/kafka/kafka_2.12-3.0.0/bin/kafka-server-stop.sh"
    done
};;
"status"){
    for i in node001 node002 node003
    do
        echo "--------------- $i Kafka status ---------------"
        # note: without arguments kafka-topics.sh just prints its usage text
        ssh $i "/opt/module/kafka/kafka_2.12-3.0.0/bin/kafka-topics.sh"
    done
};;
esac
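Assuming the script is saved as kafka.sh in a directory on the PATH (e.g. ~/bin), it needs execute permission before use:

[atguigu@node001 ~]$ chmod +x ~/bin/kafka.sh
[atguigu@node001 ~]$ kafka.sh start    # start the brokers on all three nodes
[atguigu@node001 ~]$ kafka.sh stop     # stop them again

When shutting the whole cluster down, stop Kafka first and let the brokers exit completely before stopping ZooKeeper; a broker cannot finish a clean shutdown once its ZooKeeper ensemble is gone.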

p008

2.2 Kafka Command-Line Operations

[atguigu@node001 kafka_2.12-3.0.0]$ pwd
/opt/module/kafka/kafka_2.12-3.0.0
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh # list the options of the topic command
Create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter
    Alter the number of partitions, replica assignment, and/or configuration for the topic.
--at-min-isr-partitions
    if set when describing topics, only show partitions whose isr count is equal to the configured minimum.
--bootstrap-server <String: server to connect to>
    REQUIRED: The Kafka server to connect to.
--command-config <String: command config property file>
    Property file containing configs to be passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs.
--config <String: name=value>
    A topic configuration override for the topic being created or altered. The following is a list of valid configurations: cleanup.policy, compression.type, delete.retention.ms, file.delete.delay.ms, flush.messages, flush.ms, follower.replication.throttled.replicas, index.interval.bytes, leader.replication.throttled.replicas, local.retention.bytes, local.retention.ms, max.compaction.lag.ms, max.message.bytes, message.downconversion.enable, message.format.version, message.timestamp.difference.max.ms, message.timestamp.type, min.cleanable.dirty.ratio, min.compaction.lag.ms, min.insync.replicas, preallocate, remote.storage.enable, retention.bytes, retention.ms, segment.bytes, segment.index.bytes, segment.jitter.ms, segment.ms, unclean.leader.election.enable. See the Kafka documentation for full details on the topic configs. It is supported only in combination with --create if --bootstrap-server option is used (the kafka-configs CLI supports altering topic configs with a --bootstrap-server option).
--create
    Create a new topic.
--delete
    Delete a topic
--delete-config <String: name>
    A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option). Not supported with the --bootstrap-server option.
--describe
    List details for the given topics.
--disable-rack-aware
    Disable rack aware replica assignment
--exclude-internal
    exclude internal topics when running list or describe command. The internal topics will be listed by default
--help
    Print usage information.
--if-exists
    if set when altering or deleting or describing topics, the action will only execute if the topic exists.
--if-not-exists
    if set when creating topics, the action will only execute if the topic does not already exist.
--list
    List all available topics.
--partitions <Integer: # of partitions>
    The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected). If not supplied for create, defaults to the cluster default.
--replica-assignment <String: broker_id_for_part1_replica1:broker_id_for_part1_replica2, broker_id_for_part2_replica1:broker_id_for_part2_replica2, ...>
    A list of manual partition-to-broker assignments for the topic being created or altered.
--replication-factor <Integer: replication factor>
    The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.
--topic <String: topic>
    The topic to create, alter, describe or delete. It also accepts a regular expression, except for --create option. Put topic name in double quotes and use the '\' prefix to escape regular expression symbols; e.g. "test\.topic".
--topics-with-overrides
    if set when describing topics, only show topics that have overridden configs
--unavailable-partitions
    if set when describing topics, only show partitions whose leader is not available
--under-min-isr-partitions
    if set when describing topics, only show partitions whose isr count is less than the configured minimum.
--under-replicated-partitions
    if set when describing topics, only show under replicated partitions
--version
    Display Kafka version.
[atguigu@node001 kafka_2.12-3.0.0]$ 
The two options used in essentially every topic command:

--bootstrap-server <String: server to connect to>
    REQUIRED: The Kafka server to connect to.
--topic <String: topic>
    The topic to create, alter, describe or delete. It also accepts a regular expression, except for --create option. Put topic name in double quotes and use the '\' prefix to escape regular expression symbols; e.g. "test\.topic".
  1. [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --list
  2. [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first --create --partitions 1 --replication-factor 3 # create the first topic with 1 partition and 3 replicas
  3. [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --describe
  4. [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --alter --partitions 3
  5. [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --describe
  6. [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --alter --partitions 1 # fails: the partition count can only be increased, never decreased!
  7. [atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --alter --replication-factor 2 # fails: the replication factor cannot be changed from the command line (see the reassignment sketch after the transcript below)
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --list
__consumer_offsets
__transaction_state
action_topic
appVideo_topic
display_topic
dwd_examination_test_paper
dwd_examination_test_question
dwd_interaction_comment
dwd_interaction_favor_add
dwd_interaction_review
dwd_learn_play
dwd_trade_cart_add
dwd_trade_order_detail
dwd_trade_pay_suc_detail
dwd_traffic_action_log
dwd_traffic_display_log
dwd_traffic_error_log
dwd_traffic_page_log
dwd_traffic_play_pre_process
dwd_traffic_start_log
dwd_traffic_unique_visitor_detail
dwd_traffic_user_jump_detail
dwd_user_user_login
dwd_user_user_register
error_topic
first
maxwell
nifi
nifiOutput
page_topic
start_topic
topic_db
topic_log
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first --create --partitions 1 --replication-factor 3
Error while executing topic command : Topic 'first' already exists.
[2024-03-04 16:59:58,015] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'first' already exists. (kafka.admin.TopicCommand$)
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --create --partitions 1 --replication-factor 3
Created topic first01.
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first01 --describe
Topic: first01  TopicId: 8_ayAUYdRbODZCeFMBE8Cg PartitionCount: 1       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: first01  Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
[atguigu@node001 kafka_2.12-3.0.0]$ 
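As noted in item 7 above, --alter cannot change the replication factor. What does work is a manual partition reassignment with kafka-reassign-partitions.sh; a sketch (the JSON file name and the replica list in it are illustrative):

# increase-replication-factor.json (hypothetical contents):
# {"version":1,"partitions":[{"topic":"first01","partition":0,"replicas":[2,1,0]}]}
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-reassign-partitions.sh --bootstrap-server node001:9092 --reassignment-json-file increase-replication-factor.json --execute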

p009

[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-topics.sh --bootstrap-server node001:9092 --topic first --create --partitions 1 --replication-factor 3 # create the first topic with 1 partition and 3 replicas

Then create a producer and send data to the first topic:

[atguigu@node001 ~]$ cd /opt/module/kafka/kafka_2.12-3.0.0/bin
[atguigu@node001 bin]$ ./kafka-console-producer.sh
Missing required option(s) [bootstrap-server]
Option                                   Description
------                                   -----------
--batch-size <Integer: size>
    Number of messages to send in a single batch if they are not being sent synchronously. (default: 200)
--bootstrap-server <String: server to connect to>
    REQUIRED unless --broker-list (deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--broker-list <String: broker-list>
    DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String: compression-codec]
    The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. If specified without value, then it defaults to 'gzip'
--help
    Print usage information.
--line-reader <String: reader_class>
    The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka.tools.ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on send>
    The max time that the producer will block for during a send request (default: 60000)
--max-memory-bytes <Long: total memory in bytes>
    The total memory used by the producer to buffer records waiting to be sent to the server. (default: 33554432)
--max-partition-memory-bytes <Long: memory in bytes per partition>
    The buffer size allocated for a partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. (default: 16384)
--message-send-max-retries <Integer>
    Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer give up and drop this message. (default: 3)
--metadata-expiry-ms <Long: metadata expiration interval>
    The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. (default: 300000)
--producer-property <String: producer_prop>
    A mechanism to pass user-defined properties in the form key=value to the producer.
--producer.config <String: config file>
    Producer config properties file. Note that [producer-property] takes precedence over this config.
--property <String: prop>
    A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader. Default properties include: parse.key=true|false, key.separator=<key.separator>, ignore.error=true|false
--request-required-acks <String: request required acks>
    The required acks of the producer requests (default: 1)
--request-timeout-ms <Integer: request timeout ms>
    The ack timeout of the producer requests. Value must be non-negative and non-zero (default: 1500)
--retry-backoff-ms <Integer>
    Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. (default: 100)
--socket-buffer-size <Integer: size>
    The size of the tcp RECV size. (default: 102400)
--sync
    If set message send requests to the brokers are synchronously, one at a time as they arrive.
--timeout <Integer: timeout_ms>
    If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. (default: 1000)
--topic <String: topic>
    REQUIRED: The topic id to produce messages to.
--version
    Display Kafka version.
[atguigu@node001 bin]$ 
[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-console-producer.sh --bootstrap-server node001:9092 --topic first01 # producer
>hello
>123
[atguigu@node002 kafka_2.12-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server node001:9092 --topic first01 # consumer
hello
123
--------------------------------------------------
[atguigu@node002 kafka_2.12-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server node001:9092 --topic first01 --from-beginning # --from-beginning: read all the data in the topic from the start, including historical messages
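By default the console consumer attaches at the latest offset, so it only sees messages produced after it starts; --from-beginning replays the whole log instead. Giving the consumer a fixed group id makes it commit its offsets, so a restarted consumer resumes where it left off rather than starting over (the group name g1 is illustrative):

[atguigu@node002 kafka_2.12-3.0.0]$ bin/kafka-console-consumer.sh --bootstrap-server node001:9092 --topic first01 --group g1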

Chapter 3: Kafka Producers

p010

Kafka consists of three components: producers, brokers, and consumers.

3.1.1 How Sending Works

Message sending involves two threads: the main thread and the Sender thread. The main thread creates a double-ended queue, the RecordAccumulator. The main thread writes messages into the RecordAccumulator, and the Sender thread continuously pulls messages from the RecordAccumulator and sends them to the Kafka broker.
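This batching is visible from the command line: the console producer passes arbitrary producer configs through --producer-property, including batch.size (default 16384 bytes) and linger.ms (default 0), the two settings that decide when the Sender thread ships a batch out of the RecordAccumulator. A sketch with illustrative values:

[atguigu@node001 kafka_2.12-3.0.0]$ bin/kafka-console-producer.sh --bootstrap-server node001:9092 --topic first01 --producer-property batch.size=32768 --producer-property linger.ms=5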

p011

3.2 Asynchronous Send API

3.2.1 Basic Asynchronous Send

p012

p013

p014

Chapter 4: Kafka Broker

Chapter 5: Kafka Consumers

Chapter 6: Kafka-Eagle Monitoring

Chapter 7: Kafka KRaft Mode

