福利:kafka--生产者消费者

news/2024/9/20 2:07:43/ 标签: kafka

kafka是一个分布式流媒体平台,类似于消息队列或企业消息传递系统

案例一:生产者--消费者

1.导入依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency>

2.生产者发送消息

 /*** 生产者*/public class ProducerQuickStart {public static void main(String[] args) throws ExecutionException,InterruptedException {//1.kafka链接配置信息Properties prop = new Properties();//kafka链接地址prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");//key和value的序列化prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//2.创建kafka生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String,String>(prop);//3.发送消息/*** 第一个参数 :topic* 第二个参数:消息的key* 第三个参数:消息的value*/ProducerRecord<String,String> kvProducerRecord = newProducerRecord<String,String>("topic-first","key-001","hello kafka");//同步发送消息(3)消费者接收消息RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();System.out.println(recordMetadata.offset());//4.关闭消息通道 必须要关闭,否则消息发送不成功producer.close();}}

3.消费者接收消息

/*** 消费者*/public class ConsumerQuickStart {public static void main(String[] args) {//1.kafka的配置信息Properties prop = new Properties();//链接地址prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//key和value的反序列化器prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");//设置消费者组//组一样,则属于生产者--消费者(一对一)prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");//组不一样,则属于生产者--消费者(一对多)//prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");//2.创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(prop);//3.订阅主题consumer.subscribe(Collections.singletonList("topic-first"));//4.拉取消息while (true) {ConsumerRecords<String, String> consumerRecords =consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> consumerRecord :consumerRecords) {System.out.println(consumerRecord.key());System.out.println(consumerRecord.value());}}}}

总结

  • 生产者发送消息,多个消费者订阅同一个主题(多个消费者都是一个组)只能有一个消费者收到消息 (一对一)

  • 生产者发送消息,多个消费者订阅同一个主题(多个消费者不是一个组)所有消费者都能收到消息 (一对多)


http://www.ppmy.cn/news/1527835.html

相关文章

小琳AI课堂:强化学习初阶

大家好&#xff0c;这里是小琳AI课堂。今天我们来聊聊强化学习&#xff0c;一种让机器通过“实践”学习的方法。&#x1f916; 强化学习&#xff0c;听起来就像是给机器装上了成长的心智。想象一下&#xff0c;有个小机器人在迷宫里探险&#xff0c;它要找到出口。每次尝试走一…

6年前倒闭的机器人独角兽,再次杀入AGV市场

导语 大家好&#xff0c;我是社长&#xff0c;老K。专注分享智能制造和智能仓储物流等内容。 新书《智能物流系统构成与技术实践》 在科技创新的浪潮中&#xff0c;一个曾经辉煌又迅速陨落的企业正悄然重生&#xff0c;引发业界广泛关注。 曾经的协作机器人鼻祖Rethink Robotic…

专业学习|GERT网络概览(学习资源、原理介绍、变体介绍)

一、GERT 网络概览 GERT(Graphical Evaluation Review Technique&#xff0c;图示评审技术)是一种结合流线图理论(Flow Graphical Theory)、矩母函数(Moment Generating Function)、计划评审技术(Program Evaluation Review Technique)解决随机网络问题的方法&#xff0c;描述各…

leetcode41. 缺失的第一个正数,原地哈希表

leetcode41. 缺失的第一个正数 给你一个未排序的整数数组 nums &#xff0c;请你找出其中没有出现的最小的正整数。 请你实现时间复杂度为 O(n) 并且只使用常数级别额外空间的解决方案。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,0] 输出&#xff1a;3 解释&#xf…

【高等数学学习记录】数列的极限

【高等数学&学习记录】数列的极限 从事测绘工作多年&#xff0c;深刻感受到基础知识的重要及自身在这方面的短板。 为此&#xff0c;打算重温测绘工作所需基础知识。练好基本功&#xff0c;为测绘工作赋能。 1 知识点 1.1 数列极限的定义 设 { x n } \lbrace x_n \rbrace…

将有序数组——>二叉搜索树

给你一个整数数组 nums &#xff0c;其中元素已经按 升序 排列&#xff0c;请你将其转换为一棵平衡二叉搜索树。 示例 1&#xff1a; 输入&#xff1a;nums [-10,-3,0,5,9] 输出&#xff1a;[0,-3,9,-10,null,5] 解释&#xff1a;[0,-10,5,null,-3,null,9] 也将被视为正确答案…

光芯片版图绘制软件测评

光芯片版图绘制软件测评 正文KLayout优点缺点IPKISS优点缺点PIC Studio优点缺点GDSFactory优点缺点正文 KLayout KLayout 是光芯片版图绘制软件的最底层软件。市面上的大多数版图绘制软件都是围绕这一软件展开的。版图绘制软件最终生成的文件均以 .gds 结尾。 优点 免费可以…

【西电电装实习】6. 手装无人机的蓝牙断连debug

文章目录 前言零、闪灯状态零零、翻滚角&#xff0c;俯仰角&#xff0c;偏航角一、问题描述二、现象解释三、解决方案参考文献 前言 在 西电无人机电装实习 时遇到的问题使用蓝牙芯片 CH582F。沁恒的蓝牙芯片CH582F是一款集成了BLE&#xff08;Bluetooth Low Energy&#xff0…

基于微信小程序的健身房管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、SSM项目源码 系统展示 基于微信小程序JavaSpringBootVueMySQL的健…

Linux上使用touch修改文件时间属性的限制

缘由 在Linux上我想多个进程对于同一个文件进行访问和修改&#xff0c;并且根据文件的最后修改时间来判断时间是否需要更新缓存中的文件&#xff0c;这样能够达到减少每次加载文件时间的损耗。 尝试的做法 每当我修改文件后&#xff0c;为了确保文件的最后修改时间的属性生效…

QUIC的丢包处理

QUIC的重试数据包&#xff08;Retry Packet&#xff09;为什么会触发重启另一个连接 安全性考量 防止重放攻击 重试数据包&#xff08;Retry Packet&#xff09;是在初始握手过程中由服务端发送给客户端&#xff0c;用于验证客户端的IP地址&#xff0c;以防止重放攻击。 在一…

Java 23 的12 个新特性!!

Java 23 来啦&#xff01;和 Java 22 一样&#xff0c;这也是一个非 LTS&#xff08;长期支持&#xff09;版本&#xff0c;Oracle 仅提供六个月的支持。下一个长期支持版是 Java 25&#xff0c;预计明年 9 月份发布。 Java 23 一共有 12 个新特性&#xff01; 有同学表示&…

Spring Boot集成Akka Cluster快速入门Demo

1.什么是Akka Cluster&#xff1f; Akka Cluster将多个JVM连接整合在一起&#xff0c;实现消息地址的透明化和统一化使用管理&#xff0c;集成一体化的消息驱动系统。最终目的是将一个大型程序分割成若干子程序&#xff0c;部署到很多JVM上去实现程序的分布式并行运算&#xf…

Linux服务器及应用环境快速部署、调试、迁移、维护、监控

1. 请解释什么是Linux&#xff1f; Linux是一种开源操作系统&#xff08;Operating System&#xff0c;OS&#xff09;&#xff0c;它最初由Linus Torvalds于1991年创建。自那时起&#xff0c;Linux逐渐发展成为全球最广泛使用的操作系统之一。以下是对Linux的详细解释&#x…

颠覆想象!ReHiFace-S实现实时高保真换脸

颠覆想象&#xff01;ReHiFace-S实现实时高保真换脸 ReHiFace-S&#x1f680;&#xff0c;实时高保真换脸技术&#x1f31f;&#xff0c;开源易用&#x1f4bb;&#xff0c;支持ONNX和摄像头模式&#x1f4f8;&#xff0c;让数字人生成更真实✨&#xff01;体验前沿科技&#…

7. 在Java中集合mysql如何执行一条简单的SELECT查询,并获取结果集?

在Java中&#xff0c;使用JDBC&#xff08;Java Database Connectivity&#xff09;可以执行SQL查询&#xff0c;并获取结果集&#xff08;ResultSet&#xff09;。以下是执行一条简单的SELECT查询&#xff0c;并获取和处理结果集的详细步骤&#xff1a; 1. 导入必要的包 首先…

搭建 PHP

快速搭建 PHP 环境指南 PHP 是一种广泛用于 Web 开发的后端脚本语言&#xff0c;因其灵活性和易用性而受到开发者的青睐。无论是开发个人项目还是企业级应用&#xff0c;PHP 环境的搭建都是一个不可忽视的基础步骤。本指南将带您快速学习如何在不同平台上搭建 PHP 环境&#x…

苹果cms多语言插件,插件配置前端默认语言采集语言等

苹果CMS&#xff08;maccmscn&#xff09;是一款功能强大的内容管理系统&#xff0c;广泛应用于视频网站和其他内容发布平台。为了满足全球用户的需求&#xff0c;苹果CMS支持多语言插件&#xff0c;使得网站能够方便地提供多语言版本。以下是关于苹果CMS多语言插件的详细介绍&…

HarmonyOS Next鸿蒙扫一扫功能实现

直接使用的是华为官方提供的api&#xff0c;封装成一个工具类方便调用。 import { common } from kit.AbilityKit; import { scanBarcode, scanCore } from kit.ScanKit;export namespace ScanUtil {export async function startScan(context: common.Context) : Promise<s…

Doris相关记录

Doris工作整理 Doris索引、分区及物化视图踩坑 Doris向量化引擎理解与Clickhouse对比 Flink写Doris的checkpoint及label问题 Doris新增节点分片数据自动迁移