Kafka应用Demo:按主题订阅消费消息

devtools/2024/9/25 23:19:41/

安装环境

  Kafka安装可参考官方网站的指导(https://kafka.apache.org/quickstart), 按步骤解压压缩包,修改配置。然后再启动zookeeper和kafka-server即可。

  需要注意的一点:如果是在VMware虚拟机上启动的kafka, 需要修改一下server.properties配置文件,增加如下配置:
在这里插入图片描述
  advertised.listener指定访问kafka的IP和端口,IP设置为虚拟机暴露给外部访问的IP。通过本地代码连接kafka,需要使用该配置

生产者代码样例

public class KafkaProducerService {private static final String NEO_TOPIC = "elon-topic";private KafkaProducer<String, String> producer = null;public KafkaProducerService() {Properties props = new Properties();props.put("bootstrap.servers", "192.168.5.128:9092");props.put("acks", "0");props.put("group.id", "1111");props.put("retries", "2");//设置key和value序列化方式props.put("key.serializer", StringSerializer.class);props.put("value.serializer", StringSerializer.class);//生产者实例producer = new KafkaProducer<>(props);}/*** 外部调用的发消息接口*/public void sendMessage() {for (int i = 0; i < 10; ++i) {int p = i % 2;ProducerRecord<String, String> record = new ProducerRecord(NEO_TOPIC, p, "neo", JSON.toJSONString(i));producer.send(record);}}
}

 发送消息时,将10个数据分别发送到0分区和1分区。

消费者代码样例

public class KafkaConsumerService {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);private static final String NEO_TOPIC = "elon-topic";Properties properties = new Properties();private KafkaConsumer consumer = null;public KafkaConsumerService() {properties.put("bootstrap.servers","192.168.5.128:9092");  // 指定 Brokerproperties.put("group.id", "neo1");              // 指定消费组群 IDproperties.put("max.poll.records", "5");properties.put("enable.auto.commit", "false");properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象properties.put("value.deserializer", StringDeserializer.class);  // 将 value 的字节数组转成 Java 对象consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Collections.singletonList(NEO_TOPIC));  // 订阅主题 order-eventsnew Thread(this::receiveMessage).start();}public void receiveMessage() {try {while (true) {synchronized (this) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));LOGGER.info("Fetch record num:{}", records.count());for (ConsumerRecord<String,String> record: records) {String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",record.topic(), record.partition(), record.offset(), record.key(), record.value());LOGGER.info("Received:" + info);Thread.sleep(100);}consumer.commitSync();}}} catch (Exception e){} finally {consumer.close();}}

 消费者按主题订阅。从打印的结果可以看到,消费者循环从topic下取出各个分区的消息依次消费。

在这里插入图片描述


http://www.ppmy.cn/devtools/37135.html

相关文章

商城系统秒杀功能设计思想

业务特点 1、瞬时并发量大&#xff0c;秒杀时会有大量用户在同一时间进行抢购&#xff0c;瞬时并发访问量突增几倍、甚至几十倍以上 2、库存量少&#xff0c;一般秒杀活动商品量很少&#xff0c;这就导致了只有极少量用户能成功购买到。 3、业务和流程较为常见&#xff0c;一般…

ros安装cartographer

安装 当然是先去看cartograpger官方文档了&#xff0c;照着说明一步步下来。 执行以下语句会报错&#xff0c; wstool merge -t src https://raw.githubusercontent.com/cartographer-project/cartographer_ros/master/cartographer_ros.rosinstall wstool update -t src参看…

Redis(Redis配置和订阅发布)

文章目录 1.Redis配置1.网络配置1.配置文件位置 /etc/redis.conf2.bind&#xff08;注销支持远程访问&#xff09;1.默认情况bind 127.0.0.1 只能接受本机的访问2.首先编辑配置文件3.进入命令模式输入/bind定位&#xff0c;输入n查找下一个&#xff0c;shift n查找上一个&…

机器学习项目实践-基础知识部分

环境建立 我们做项目第一步就是单独创建一个python环境&#xff0c;Python新的隔离环境 创建&#xff1a;python -m venv ml 使用&#xff1a;.\Scripts\activate python -m venv ml 是在创建一个名为 ml 的虚拟环境&#xff0c;这样系统会自动创建一个文件夹ml&#xff0c;…

自学错误合集--项目打包报错,运行报错持续更新中

java后端自学错误总结 一.项目打包报错2.项目打包之后运行报错 二.项目运行报错 一.项目打包报错 javac: &#xfffd;Ҳ&#xfffd;&#xfffd;&#xfffd;&#xfffd;ļ&#xfffd;: E:\xx\xx\xx\docer-xx\src\main\java\xx\xx\xx\xx\xx\xx.java &#xfffd;&#xff…

简易录制视频做3D高斯

系统环境 ubuntu20 &#xff0c;cuda11.8&#xff0c;anaconda配置好了3D高斯的环境。 具体参考3D高斯环境配置&#xff1a;https://blog.csdn.net/Son_of_the_Bronx/article/details/138527329?spm1001.2014.3001.5501 colmap安装&#xff1a;https://blog.csdn.net/Son_of…

Python 全栈体系【四阶】(四十一)

第五章 深度学习 九、图像分割 1. 基本介绍 1.1 什么是图像分割 图像分割&#xff08;Segmentation&#xff09;是图像处理和机器视觉一个重要分支&#xff0c;其目标是精确理解图像场景与内容。图像分割是在像素级别上的分类&#xff0c;属于同一类的像素都要被归为一类&a…

【计组OS】访存过程以及存储层次化结构

苏泽 本专栏纯个人笔记作用 用于记录408 学习的笔记记录&#xff08;敲了两年码实在不习惯手写笔记了&#xff09; 如果能帮助到大家当然最好 但由于是工作后退下来备考 很多说法和想法都会结合实际开发的思想 可能不是那么的纯粹应试哈 希望大家挑选自己喜欢的口味食用…