一个简单的kafka 消费者

devtools/2024/9/24 13:15:09/

写一个简单的kafka 消费者

1. 依赖

 		<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2. 消费者

import com.xxxx.npi.module.common.msg.enums.Topic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;import java.util.Arrays;
import java.util.Properties;public class ConsumerTest {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "xxxx.xxxx.xxxx.xxxx:9092");props.put("group.id", "gedigital");props.put("enable.auto.commit", "false");props.put("auto.offset.reset", "earliest");props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 后面安全相关的配置,如果没有,可以不用配置;如果有,必须配props.put("security.protocol", "SASL_SSL");props.put("sasl.mechanism", "SCRAM-SHA-512");props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "e:\\client_truststore.jks"); //Save the certificate (dowload client_truststore.jks) in trust store to local server directoy (only for Java client)props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "passwdconfig");props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';");//注意passwod结尾的分号一定不要漏props.put("ssl.endpoint.identification.algorithm", "");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);String topic = Topic.FBR.getTopic();consumer.subscribe(Arrays.asList(topic));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {System.out.printf("partition= %d, offset = %d, key = %s, value = %s\n", record.partition(), record.offset(), record.key(), record.value());}consumer.commitSync();}}
}

http://www.ppmy.cn/devtools/13175.html

相关文章

Linux 安装 JDK17

1、切换到目录 /usr/local/src cd /usr/local/src 2、下载 wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz 3、解压 tar -zxvf jdk-17_linux-x64_bin.tar.gz 4、添加环境变量 vim /etc/profile 在最后一行添加以下内容&#xff1a; …

学习记录694@java 多个文件zip压缩后下载

实际应用中需要下载多个文件&#xff0c;这个时候最好将这些文件打包成zip&#xff0c;然后再下载。其实非常的简单&#xff0c;只要借助hutool包即可&#xff0c;另外需要对基本的输入输出流了解。 代码 以下代码的基本逻辑是&#xff0c;或者要压缩打包的文件的输入流&…

比特币成长的代价

作者&#xff1a;Jeffrey Tucker&#xff0c;作家和总裁。曾就经济、技术、社会哲学和文化等话题广泛发表演讲。编译&#xff1a;秦晋 2017 年之后参与比特币市场的人遇到了与之前的人不同的操作和理想。如今&#xff0c;没有人会太在意之前的事情&#xff0c;说的是 2010-2016…

抛弃历史兼容对于Windows和X86而言是一项困难的挑战。

在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「 Windows的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#xff01;微软、英特尔和AMD如果能够放…

武汉星起航:深耕亚马逊,一站式孵化新手,开启跨境新篇章

在全球经济一体化的时代背景下&#xff0c;跨境电商行业以其独特的优势&#xff0c;正成为推动经济增长的新引擎。武汉星起航电子商务有限公司&#xff0c;作为一家专注于自营亚马逊跨境电商及亚马逊卖家孵化服务的公司&#xff0c;凭借创始人张振邦先生深厚的电子商务运营经验…

Linux 上清理 SSSD Cache

1. 简介 系统安全服务守护程序 (SSSD) 提供对身份和身份验证提供程序的访问。 基本上&#xff0c;SSSD 不依赖于本地配置的身份验证&#xff0c;而是用于查找其本地缓存。 此缓存中的条目可能来自不同的远程身份提供商&#xff0c;例如 LDAP 目录、FreeIPA 或 Active Director…

【Linux】权限

博主首页&#xff1a; 有趣的中国人 专栏首页&#xff1a; Linux 本篇文章主要讲解 Linux权限 的相关内容 1. Linux权限的概念 什么是权限&#xff1f; 权限就是通过一定的条件&#xff0c;拦住一部分人&#xff0c;给另一部分人权力来访问某一种资源。 例如&#xff1a;你是…

雪花算法改造: 兼容JS截短位数的53bit分布式ID生成器

一、基本介绍 雪花算法是一种生成分布式ID的算法。此种算法由Twitter创建&#xff0c;并应用于推文的ID。 一个SnowFlake有64位&#xff1a; • 符号位&#xff08;1&#xff09; &#xff1a;正数0&#xff0c;负数1。一般生成的ID 都为正数&#xff0c;所以默认为0. • 时…