kafka 安装快速入门

news/2024/9/22 21:22:49/

直接上干货,我们公司最近要进行消息推送指定软件kafka,直接走起。

1.下载

        kafka 是apache的项目。下载地址:kafka.apache.org/

点击download kafka 进入查看相关版本进行下载。

我这里用的版本比窘旧一点,公司技术一切求稳。

下载好安装包就已经实现了。

2.安装

 说安装就是免安装版本,简单配置一下就可以使用了

直接解压就可以,但是需要我们修改一些配置文件。

kafka 集成了 zookeeper 的软件包,不需要安装,配置就好。

1):配置zookeeper.properties

进入到config文件夹里面,找到zookeeper.properties文件,进行编辑,找到dataDir,修改这个参数的路径为:

dataDir=D:\kafka\kafka_2.13-3.2.1\zookeeper-data (安装目录\zookeeper-data)

2):配置server.properties

进入到config文件夹里面,找到server.properties文件,进行编辑,找到log.dirs,修改这个参数的路径为:

log.dirs=D:\kafka\kafka_2.13-3.2.1\kafka-logs (安装目录\kefke-log)

完成了配置,现在就可以启动了

3.启动

1):启动zookeeper

 打开cmd,进入kafka安装目录,执行:

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

2):启动kafka

再打开一个cmd,进入kafka安装目录,执行:

.\bin\windows\kafka-server-start.bat .\config\server.properties

完成启动就可以访问了,但是我们访问有端口限制,所以我又进行了端口修改

kafka默认端口号:9092  我这里给它改为8081 方便测试访问。

4.配置端口(可以不修改)

config下

1、service.properties (添加)                               port = 8081    不指定的话,按照默认9092

2、connect-distributed.properties (集群)            bootstrap.servers=localhost:8081

3、producer.properties (生产者)                         bootstrap.servers=localhost:8081

4、connect-standalone.properties (单机)            bootstrap.servers=localhost:8081

5、consumer.properties (消费者)                        bootstrap.servers=localhost:8081

根据情况配置端口

5.编码

maven坐标

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.1</version></dependency>

1)生产者代码

package com.java;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;public class MyKafKaProducer extends Thread{KafkaProducer<Integer, String> producer;String topic; // 主题public MyKafKaProducer(String topic) {// 构建连接配置Properties properties = new Properties();// 若要配多个服务器,用逗号隔开// 注:服务器要开放端口,若云服务器还要在server.properties配置内网IP和外网IPproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.32:8081");properties.put(ProducerConfig.CLIENT_ID_CONFIG, "my-producer");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 构造Client无非是:new 或 工厂模式producer = new KafkaProducer<Integer, String>(properties);this.topic = topic;}public void run() {int num = 0;String msg = "kafka practice msg: " + num;while (num < 20) {try {// 发送消息send()!!! 同步调用// Future.get()会阻塞,等待返回结果......RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, msg)).get();// 等上面get到结果了,才能执行这里System.out.println(recordMetadata.offset() + "->" + recordMetadata.partition() + "->" + recordMetadata.topic());TimeUnit.SECONDS.sleep(2);num++;} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}public static void main(String[] args) {// 传入test主题new MyKafKaProducer("test").start();}
} 

2)消费者代码

package com.java;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class MykafkaConsumer extends Thread{KafkaConsumer<Integer, String> consumer;String topic;public MykafkaConsumer(String topic) {// 构建连接配置,这里是ConsumerConfigProperties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.32:8081");properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer");// 反序列化,这里是Deserializerproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 以下是Producer没有的配置properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-gid"); // 要加入的groupproperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); // 超时,心跳properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); // 自动提交(批量)properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 新group消费为位置consumer = new KafkaConsumer<>(properties);this.topic = topic;}public void run() {// 死循环不断消费消息while (true) {// 绑定订阅主题// 注:Collections.singleton返回一个单元素&不可修改Set集合,// 同样的还有singletonList,singletonMapconsumer.subscribe(Collections.singleton(this.topic));// 接收消息 POLL()!!!ConsumerRecords<Integer, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));// 注:一行的lamada表达式可以不用{}consumerRecords.forEach(record -> System.out.println(record.key() + "->" + record.value() + "->" + record.offset()));}}public static void main(String[] args) {// 拉取test主题的消息new MykafkaConsumer("test").start();}
}

 topic 主题必须一致才可以实现生产者和消费者的互通


http://www.ppmy.cn/news/279090.html

相关文章

世界杯 | 中国首次承建世界杯主赛场,基建狂魔用BIM征服世界杯

hi&#xff0c;家人们&#xff0c;我是建模助手。 呐&#xff0c;一如既往地&#xff0c;这次世界杯中国除了球队&#xff0c;其他基本都去了卡塔尔&#xff0c;我可不是在搞幽默&#xff0c;本次世界杯可真是中国元素满满—— 包括但不限于Made in China的纯电大巴、无人驾驶地…

2018世界杯所有场次记录

比赛日程&#xff1a;2018年6月14日——2018年7月15日&#xff0c;共32比赛日&#xff0c;64场比赛。其中48场小组赛&#xff0c;15场淘汰赛&#xff0c;1场排位赛。 冠军&#xff1a;法国。亚军&#xff1a;克罗地亚。季军&#xff1a;比利时。第四名&#xff1a;英格兰。 八强…

世界杯决赛倒计时,你最看好谁

一、期待终极决赛 C站肯定有许多看球的朋友吧。明天即将迎来2022年世界杯终极对决&#xff0c;阿根廷队对战法国。来赶个晚集&#xff0c;也来聊聊世界杯&#xff0c;下一个四年就是2026&#xff0c;会不会还有这种精神呢。先来回顾一下本轮世界杯比赛中的对阵吧。 本轮世界杯中…

卡塔尔世界杯有哪些高科技?-世界杯征文

世界杯来了&#xff0c;估计大多数人都看过一两场比赛吧。可是&#xff0c;你有没有发现好多越位和点球都不是现场裁判现吹的&#xff0c;而是现场的视频裁判提示主裁判有类似的犯规&#xff0c;再由主裁判定夺&#xff01;视频真的有那么强大么&#xff0c;恐怕不然。不过&…

NFT+体育,卡塔尔世界杯有哪些NFT看点!

有人说没有冷门的世界杯不是真正的世界杯&#xff01;11月22日&#xff0c;卡塔尔世界杯小组赛C组第1轮比赛中&#xff0c;沙特爆冷2:1逆转阿根廷队&#xff0c;成了今年世界杯的第一个冷门。世界排名第51位的沙特队战胜了排名第3的阿根廷队&#xff0c;结束了阿根廷队此前的36…

2022,世界杯,来了

文 / 王不留&#xff08;微信公众号&#xff1a;考研英语笔记&#xff09; The World in 2022: time for football to up its game The FIFA World Cup starts in November in Qatar. It used to be the only competition to involve all of the world’s best footballers, pl…

卡塔尔世界杯︱足球游戏的打开方式

距离卡塔尔世界杯开始还有一周的时间&#xff0c;你为此做好什么准备了吗&#xff1f; 一些狂热的球迷为了追求极致的观感和身临其境感&#xff0c;不惜在电视、音响上下重金&#xff1b;也有一些为了享受世界杯热闹氛围的人&#xff0c;囤好零食&#xff0c;或是和好友早早约…

我与世界杯足球那些事——世界杯征文

征文活动链接&#xff1a; https://bbs.csdn.net/topics/609601920 目录 第一次了解世界杯 第一次观看世界杯 世界杯主题曲 我最热爱的球员 今年世界杯 预测冠军 第一次了解世界杯 提起世界杯&#xff0c;我可能了解的比较晚一些&#xff0c;是在2014年的巴西世界杯的时…