「Kafka」Kafka基础知识入门介绍(三)
- 一、消息主题
- 1. 创建主题
- 二、生产数据
- 1. 命令行模式
- 2. Java代码模式
- 三、消费数据
- 1. 命令行模式
- 2. Java代码模式
「Kafka」Kafka理论知识解读(一)
「Kafka」Kafka安装和启动(二)
一、消息主题
消息主题(Message Topic)
是Kafka中用于组织和存储消息的基本单元。它类似于一个命名的消息队列,生产者可以向主题发布消息,而消费者可以从主题订阅并接收消息。
每个主题可以分为多个分区(Partitions)
,每个分区可以在不同的服务器上进行复制以提供容错性
。消息被附加到主题的分区中,并根据消息键(Key)进行分配和存储。这种设计允许Kafka在分布式环境中实现高吞吐量和水平扩展。
主题的名称在集群中必须是唯一的
,并且可以根据需求创建任意数量的主题。通常,主题的名称会反映其中包含的消息类型或者业务逻辑。例如,一个电子商务应用可能会创建名为 “orders”、“payments” 和 “shipments” 的主题来存储订单、支付和发货相关的消息。
将不同的消息进行分类,分成不同的主题(Topic)
,然后消息生产者在生成消息时,就会向指定的主题(Topic)中发送。而消息消费者也可以订阅自己感兴趣的主题(Topic)并从中获取消息。
有很多种方式都可以操作Kafka消息中的主题(Topic):命令行、第三方工具、Java API、自动创建。下面将介绍命令行和Java API模式
1. 创建主题
- 创建主题
# Kafka是通过kafka-topics.bat指令文件进行消息主题操作的。其中包含了对主题的查询,创建,删除等功能。
# 调用指令创建主题时,需要传递多个参数,而且参数的前缀为两个横线。因为参数比较多,为了演示方便,这里我们只说明必须传递的参数,其他参数后面课程中会进行讲解
# --bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开
# --create : 表示对主题的创建操作,是个操作参数,后面无需增加参数值
# --topic : 主题的名称,后面接的参数值一般就是见名知意的字符串名称,类似于java中的字符串类型标识符名称,当然也可以使用数字,只不过最后还是当成数字字符串使用。
# 指令
kafka-topics.bat --bootstrap-server localhost:9092 --create --topic test
- 查询主题
# 查看所有主题
# --list : 表示对所有主题的查询操作,是个操作参数,后面无需增加参数值
kafka-topics.bat --bootstrap-server localhost:9092 --list
# 查看某个主题的详细信息
# --describe : 查看主题的详细信息
# --topic : 查询的主题名称
kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic test123
- 修改主题
# --alter : 表示对所有主题的查询操作,是个操作参数,后面无需增加参数值
# --topic : 修改的主题名称
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --alter --partitions 2
- 删除主题
# --delete: 表示对主题的删除操作,是个操作参数,后面无需增加参数值。默认情况下,删除操作是逻辑删除,也就是说数据存储的文件依然存在,但是通过指令查询不出来。如果想要直接删除,需要在server.properties文件中设置参数delete.topic.enable=true
# --topic : 删除的主题名称
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --delete
注意:
windows系统中由于权限或进程锁定的问题,删除topic会导致kafka服务节点异常关闭。
二、生产数据
1. 命令行模式
Kafka是通过
kafka-console-producer.bat
文件进行消息生产者操作的。
调用指令时,需要传递多个参数,而且参数的前缀为两个横线,因为参数比较多。为了演示方便,这里我们只说明必须传递的参数,其他参数后面课程中会进行讲解
--bootstrap-server :
把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。早期版本的Kafka也可以通过 --broker-list参数进行连接,当前版本已经不推荐使用了。
--topic :
主题的名称,后面接的参数值就是之前已经创建好的主题名称。
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test123
注意:
这里的数据需要回车后,才能真正将数据发送到Kafka服务器。
2. Java代码模式
public class Producer {public static void main(String[] args) {// TODO 配置属性集合Map<String, Object> configMap = new HashMap<>();// TODO 配置属性:Kafka服务器集群地址configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// TODO 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// TODO 创建Kafka生产者对象,建立Kafka连接// 构造对象时,需要传递配置参数KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 准备数据,定义泛型// 构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数ProducerRecord<String, String> record = new ProducerRecord<String, String>("test123", "key11111111", "value1111111");// TODO 生产(发送)数据producer.send(record);// TODO 关闭生产者连接producer.close();}
}
三、消费数据
1. 命令行模式
Kafka是通过
kafka-console-consumer.bat
文件进行消息消费者操作的。
调用指令时,需要传递多个参数,而且参数的前缀为两个横线,因为参数比较多。为了演示方便,这里我们只说明必须传递的参数,其他参数后面课程中会进行讲解
--bootstrap-server :
把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。早期版本的Kafka也可以通过 --broker-list参数进行连接,当前版本已经不推荐使用了。
--topic :
主题的名称,后面接的参数值就是之前已经创建好的主题名称。其实这个参数并不是必须传递的参数,因为如果不传递这个参数的话,那么消费者会消费所有主题的消息。如果传递这个参数,那么消费者只能消费到指定主题的消息数据。
--from-beginning :
从第一条数据开始消费,无参数值,是一个标记参数。默认情况下,消费者客户端连接上服务器后,是不会消费到连接之前所生产的数据的。也就意味着如果生产者客户端在消费者客户端连接前已经生产了数据,那么这部分数据消费者是无法正常消费到的。所以在实际环境中,应该是先启动消费者客户端,再启动生产者客户端,保证消费数据的完整性。增加参数后,Kafka就会从第一条数据开始消费,保证消息数据的完整性。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
注意:
控制台消费端或许会有乱码,自行解决即可,因为一般不会在控制台获取消息
2. Java代码模式
public class Consumer{public static void main(String[] args) {// TODO 配置属性集合Map<String, Object> configMap = new HashMap<String, Object>();// TODO 配置属性:Kafka集群地址configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// TODO 配置属性: Kafka传输的数据为KV对,所以需要对获取的数据分别进行反序列化configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");// TODO 配置属性: 读取数据的位置 ,取值为earliest(最早),latest(最晚)configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// TODO 配置属性: 消费者组configMap.put("group.id", "atguigu");// TODO 配置属性: 自动提交偏移量configMap.put("enable.auto.commit", "true");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configMap);// TODO 消费者订阅指定主题的数据consumer.subscribe(Collections.singletonList("test123"));while (true) {// TODO 每隔100毫秒,抓取一次数据ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(100));// TODO 打印抓取的数据for (ConsumerRecord<String, String> record : records) {System.out.println("K = " + record.key() + ", V = " + record.value());}}}
}