「Kafka」Kafka基础知识入门介绍(三)

devtools/2024/11/14 12:30:42/

「Kafka」Kafka基础知识入门介绍(三)

  • 一、消息主题
    • 1. 创建主题
  • 二、生产数据
    • 1. 命令行模式
    • 2. Java代码模式
  • 三、消费数据
    • 1. 命令行模式
    • 2. Java代码模式

「Kafka」Kafka理论知识解读(一)
「Kafka」Kafka安装和启动(二)

一、消息主题

消息主题(Message Topic)是Kafka中用于组织和存储消息的基本单元。它类似于一个命名的消息队列,生产者可以向主题发布消息,而消费者可以从主题订阅并接收消息。

每个主题可以分为多个分区(Partitions)每个分区可以在不同的服务器上进行复制以提供容错性。消息被附加到主题的分区中,并根据消息键(Key)进行分配和存储。这种设计允许Kafka在分布式环境中实现高吞吐量和水平扩展。

主题的名称在集群中必须是唯一的,并且可以根据需求创建任意数量的主题。通常,主题的名称会反映其中包含的消息类型或者业务逻辑。例如,一个电子商务应用可能会创建名为 “orders”、“payments” 和 “shipments” 的主题来存储订单、支付和发货相关的消息。

将不同的消息进行分类,分成不同的主题(Topic),然后消息生产者在生成消息时,就会向指定的主题(Topic)中发送。而消息消费者也可以订阅自己感兴趣的主题(Topic)并从中获取消息。
有很多种方式都可以操作Kafka消息中的主题(Topic):命令行、第三方工具、Java API、自动创建。下面将介绍命令行和Java API模式

1. 创建主题

  • 创建主题

在这里插入图片描述

# Kafka是通过kafka-topics.bat指令文件进行消息主题操作的。其中包含了对主题的查询,创建,删除等功能。
# 调用指令创建主题时,需要传递多个参数,而且参数的前缀为两个横线。因为参数比较多,为了演示方便,这里我们只说明必须传递的参数,其他参数后面课程中会进行讲解
# --bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开
# --create : 表示对主题的创建操作,是个操作参数,后面无需增加参数值
# --topic : 主题的名称,后面接的参数值一般就是见名知意的字符串名称,类似于java中的字符串类型标识符名称,当然也可以使用数字,只不过最后还是当成数字字符串使用。
# 指令
kafka-topics.bat --bootstrap-server localhost:9092 --create --topic test
  • 查询主题
# 查看所有主题
# --list : 表示对所有主题的查询操作,是个操作参数,后面无需增加参数值
kafka-topics.bat --bootstrap-server localhost:9092 --list
# 查看某个主题的详细信息
# --describe : 查看主题的详细信息
# --topic : 查询的主题名称
kafka-topics.bat --bootstrap-server localhost:9092 --describe --topic test123

在这里插入图片描述

  • 修改主题
# --alter : 表示对所有主题的查询操作,是个操作参数,后面无需增加参数值
# --topic : 修改的主题名称
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --alter --partitions 2

在这里插入图片描述

  • 删除主题
# --delete: 表示对主题的删除操作,是个操作参数,后面无需增加参数值。默认情况下,删除操作是逻辑删除,也就是说数据存储的文件依然存在,但是通过指令查询不出来。如果想要直接删除,需要在server.properties文件中设置参数delete.topic.enable=true
# --topic : 删除的主题名称
kafka-topics.bat --bootstrap-server localhost:9092 --topic test --delete

注意:windows系统中由于权限或进程锁定的问题,删除topic会导致kafka服务节点异常关闭。

二、生产数据

1. 命令行模式

Kafka是通过kafka-console-producer.bat文件进行消息生产者操作的。
调用指令时,需要传递多个参数,而且参数的前缀为两个横线,因为参数比较多。为了演示方便,这里我们只说明必须传递的参数,其他参数后面课程中会进行讲解
--bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。早期版本的Kafka也可以通过 --broker-list参数进行连接,当前版本已经不推荐使用了。
--topic : 主题的名称,后面接的参数值就是之前已经创建好的主题名称。

kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test123

注意:这里的数据需要回车后,才能真正将数据发送到Kafka服务器。
在这里插入图片描述

2. Java代码模式

public class Producer {public static void main(String[] args) {// TODO 配置属性集合Map<String, Object> configMap = new HashMap<>();// TODO 配置属性:Kafka服务器集群地址configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// TODO 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// TODO 创建Kafka生产者对象,建立Kafka连接//      构造对象时,需要传递配置参数KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);// TODO 准备数据,定义泛型//      构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数ProducerRecord<String, String> record = new ProducerRecord<String, String>("test123", "key11111111", "value1111111");// TODO 生产(发送)数据producer.send(record);// TODO 关闭生产者连接producer.close();}
}

三、消费数据

1. 命令行模式

Kafka是通过kafka-console-consumer.bat文件进行消息消费者操作的。
调用指令时,需要传递多个参数,而且参数的前缀为两个横线,因为参数比较多。为了演示方便,这里我们只说明必须传递的参数,其他参数后面课程中会进行讲解
--bootstrap-server : 把当前的DOS窗口当成Kafka的客户端,那么进行操作前,就需要连接服务器,这里的参数就表示服务器的连接方式,因为我们在本机启动Kafka服务进程,且Kafka默认端口为9092,所以此处,后面接的参数值为localhost:9092,用空格隔开。早期版本的Kafka也可以通过 --broker-list参数进行连接,当前版本已经不推荐使用了。
--topic : 主题的名称,后面接的参数值就是之前已经创建好的主题名称。其实这个参数并不是必须传递的参数,因为如果不传递这个参数的话,那么消费者会消费所有主题的消息。如果传递这个参数,那么消费者只能消费到指定主题的消息数据。
--from-beginning :从第一条数据开始消费,无参数值,是一个标记参数。默认情况下,消费者客户端连接上服务器后,是不会消费到连接之前所生产的数据的。也就意味着如果生产者客户端在消费者客户端连接前已经生产了数据,那么这部分数据消费者是无法正常消费到的。所以在实际环境中,应该是先启动消费者客户端,再启动生产者客户端,保证消费数据的完整性。增加参数后,Kafka就会从第一条数据开始消费,保证消息数据的完整性。

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

在这里插入图片描述
注意:控制台消费端或许会有乱码,自行解决即可,因为一般不会在控制台获取消息

2. Java代码模式

public class Consumer{public static void main(String[] args) {// TODO 配置属性集合Map<String, Object> configMap = new HashMap<String, Object>();// TODO 配置属性:Kafka集群地址configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");// TODO 配置属性: Kafka传输的数据为KV对,所以需要对获取的数据分别进行反序列化configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");// TODO 配置属性: 读取数据的位置 ,取值为earliest(最早),latest(最晚)configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// TODO 配置属性: 消费者组configMap.put("group.id", "atguigu");// TODO 配置属性: 自动提交偏移量configMap.put("enable.auto.commit", "true");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configMap);// TODO 消费者订阅指定主题的数据consumer.subscribe(Collections.singletonList("test123"));while (true) {// TODO 每隔100毫秒,抓取一次数据ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis(100));// TODO 打印抓取的数据for (ConsumerRecord<String, String> record : records) {System.out.println("K = " + record.key() + ", V = " + record.value());}}}
}

http://www.ppmy.cn/devtools/23836.html

相关文章

数据分析-numpy

数据分析 numpy Numpy 是一个开源的 Python 科学计算库&#xff0c;用于快速处理任意维度的数组。Numpy 支持常见的数组和矩阵操作&#xff0c;对于同样的数值计算任务&#xff0c;使用 NumPy 不仅代码要简洁的多&#xff0c;而且 NumPy 在性能上也远远优于原生 Python&#…

springcloud Ribbon的详解

1、Ribbon是什么 Ribbon是Netflix发布的开源项目&#xff0c;Spring Cloud Ribbon是基于Netflix Ribbon实现的一套客户端负载均衡的框架。 2、Ribbon能干什么 LB负载均衡(Load Balance)是什么&#xff1f;简单的说就是将用户的请求平摊的分配到多个服务上&#xff0c;从而达…

Postman的安装与汉化(超级详细!!!)

1、下载安装包 链接&#xff1a;百度网盘 请输入提取码 提取码&#xff1a;ywmk --来自百度网盘超级会员V5的分享 下载后的目录如图所示 2、Postman安装 双击目录下的 Postman-win64-9.10.0-Setup.exe 即可自动安装 3、Postman汉化 找到postman的安装目录&#xff0c;然后…

day84 json中实现简单验证码

项目中的问题: 1 修改JS或CSS后,页面不能及时更新? 把浏览器-->"开发者工具"-->"网络"-->选中"禁用缓存" 2 如何把HTML页面转为JSP页面 将jsp页面中的<% page contentType"text/html;charse…

vue +antvX6 根据节点与线,动态设置节点坐标生成流程图

需求 vue2 antvX6完成流程图&#xff0c;但只有节点与线&#xff0c;没有节点的坐标&#xff0c;需要根据节点的顺序显示流程图。 需求&#xff1a; 1.根据数据动态生成对应的节点与线&#xff1b; 2.节点不能重叠&#xff1b; 3.节点与线可拖拽&#xff1b; 4.因为线存在重…

关于前后端一体项目SpringSecurity框架登陆失效,HTTPS重定向登陆页面异常的问题

现有环境是基于SpringBoot 2.6.8&#xff0c;然后是前后台一体化的项目。 安全框架使用的是内置版本的SpringSecurity。 场景&#xff1a;用户登陆&#xff0c;系统重启导致用户的session失效。但前端并没有跳转到对应的登录页&#xff0c;在HTTP的环境下可以正常跳转&#x…

【bug已解决】发生错误,导致虚拟 CPU 进入关闭状态。如果虚拟机外部发生此错误,则可能已导致物理计算机重新启动......

本bug报错已找到原因,并成功解决。 项目场景: vmware安装ubuntu报错。 如下: 发生错误,导致虚拟 CPU 进入关闭状态。如果虚拟机外部发生此错误,则可能已导致物理计算机重新启动。错误配置虚拟机、客户机操作系统中的错误或 VMware Workstation 中的问题都可以导致关闭状…

Stable Diffusion教程:文生图

最近几天AI绘画没有什么大动作&#xff0c;正好有时间总结下Stable Diffusion的一些基础知识&#xff0c;今天就给大家再唠叨一下文生图这个功能&#xff0c;会详细说明其中的各个参数。 文生图是Stable Diffusion的核心功能&#xff0c;它的核心能力就是根据提示词生成相应的…