Kafka 下载安装及使用总结

embedded/2024/9/22 20:18:29/

1. 下载安装

官网下载地址:Apache Kafka

下载对应的文件

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

上传到服务器上,解压

tar -xzf kafka_2.13-3.7.0.tgz

目录结果如下

├── bin
│   └── windows
├── config
│   └── kraft
├── libs
├── licenses
└── site-docs

官方文档:Apache Kafka

kafka 有两种启动方式,ZooKeeper 和 KRaft,这里采用 KRaft 的方式,使用 kraft 目录下的配置文件

config
├── connect-console-sink.properties
├── connect-console-source.properties
├── connect-distributed.properties
├── connect-file-sink.properties
├── connect-file-source.properties
├── connect-log4j.properties
├── connect-mirror-maker.properties
├── connect-standalone.properties
├── consumer.properties
├── kraft
│   ├── broker.properties
│   ├── controller.properties
│   └── server.properties
├── log4j.properties
├── producer.properties
├── server.properties
├── tools-log4j.properties
├── trogdor.conf
└── zookeeper.properties

修改 server.properties 文件

  • process.roles:KRaft 模式角色
    • broker
    • controller
    • broker,controller
  • node.id:节点 ID,需要为每个节点分配一个唯一的 ID
  • controller.quorum.voters:Controller 的投票者配置
  • log.dirs:日志目录

初始化集群,先生成一个 UUID

./bin/kafka-storage.sh random-uuid

再执行命令,使用生成的 UUID 完成集群初始化

./bin/kafka-storage.sh format -t thCDFveGRleJro7zTaOOGA -c ./config/kraft/server.properties

然后启动 Kafka

./bin/kafka-server-start.sh -daemon ./config/kraft/server.properties

2. Spring Boot 集成

2.1 引入依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.1.4</version><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.7.0</version></dependency>
</dependencies>

2.2 配置文件

spring:kafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: fable-group

2.3 生产者

使用 KafkaTemplate 实现消息的生产

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ProducerController {private final KafkaTemplate<String, String> kafkaTemplate;@Autowiredpublic ProducerController(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}@GetMapping("/send/message")public String sendMessage() {kafkaTemplate.send("test", "Hello, Kafka!");return "Message sent successfully";}
}

2.4 消费者

添加 @KafkaListener 注解,监听消息

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class ConsumerService {@KafkaListener(topics = "test", groupId = "fable-group")public void listen(ConsumerRecord<String, String> consumerRecord) {System.out.println("Received message: " + consumerRecord.value());}
}

2.5 启动类

添加 @EnableKafka 注解

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;@SpringBootApplication
@EnableKafka
public class FableApplication {public static void main(String[] args) {SpringApplication.run(FableApplication.class, args);}
}

2.6 测试

访问 /send/message 接口,可以看到控制台打印出接收到的消息

Received message: Hello, Kafka!

http://www.ppmy.cn/embedded/115217.html

相关文章

【加密算法基础——AES加密CBC模式和CFB模式的差异分析】

AES 解密实践之代码实现 上篇提到对于AES解密&#xff0c;命令行无法处理key截断的问题。 看一下实测代码&#xff0c;目前只测试到OpenSSL可以正确解密&#xff0c;但是库函数无法正确解密。 1. CFB模式代码展示 from Crypto.Cipher import AES import base64 import binas…

【Python报错已解决】 Requests.exceptions.ProxyError: HTTPSConnectionPool

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 专栏介绍 在软件开发和日常使用中&#xff0c;BUG是不可避免的。本专栏致力于为广大开发者和技术爱好者提供一个关于BUG解决的经…

JavaEE: 创造无限连接——网络编程中的套接字

文章目录 Socket套接字TCP和UDP的区别有连接/无连接可靠传输/不可靠传输面向字节流/面向数据报全双工/半双工 UDP/TCP api的使用UDPDatagramSocketDatagramPacketInetSocketAddress练习 TCPServerSocketSocket练习 Socket套接字 Socket是计算机网络中的一种通信机制&#xff0…

Kafka 为什么这么快?

Kafka 是一款性能非常优秀的消息队列&#xff0c;每秒处理的消息体量可以达到千万级别。今天来聊一聊 Kafka 高性能背后的技术原理。 1 批量发送 Kafka 收发消息都是批量进行处理的。我们看一下 Kafka 生产者发送消息的代码&#xff1a; private Future<RecordMetadata>…

C# 中Faker

在 C# 中&#xff0c;Faker 类通常用于生成模拟数据&#xff08;也称为虚拟数据、测试数据&#xff09;&#xff0c;这对于开发、测试以及演示应用程序非常有用。一个流行的库叫做 Faker&#xff0c;它提供了一种简单的方式来生成各种随机数据。 安装 Faker 库 要使用 Faker …

【webpack4系列】webpack基础用法(二)

文章目录 entryoutputloaderpluginmode前端构建基础配置关联HTML插件html-webpack-plugin构建 CSS 解析 ES6和React JSX解析 ES6解析 React JSX 解析CSS、Less和Sass解析CSS解析Less解析sass 解析图片和字体资源解析&#xff1a;解析图片资源解析&#xff1a;解析字体资源解析&…

【JVM】符号引用 和 直接引用

符号引用 vs. 直接引用 在计算机科学中&#xff0c;特别是在编译原理和虚拟机技术中&#xff0c;涉及到两个概念&#xff1a;符号引用&#xff08;Symbolic Reference&#xff09;和直接引用&#xff08;Direct Reference&#xff09;。 符号引用&#xff08;Symbolic Refere…

AWS 将 OpenSearch 纳入 Linux 基金会旗下

AWS 今天宣布&#xff0c;随着OpenSearch 基金会的成立&#xff0c;它将把OpenSearch&#xff08;流行的 Elasticsearch 搜索和分析引擎的开源分叉&#xff09;移交给 Linux 基金会。在 Elastic 将其 Elasticsearch 和 Kibana 项目的许可证更改为自己的专有许可证 Elastic Lice…