Kafka的安装及接入SpringBoot

news/2024/9/23 5:45:05/

环境:windows、jdk1.8、springboot2

Apache KafkaApache Kafka: A Distributed Streaming Platform.icon-default.png?t=N7T8https://kafka.apache.org/

1.概述

        Kafka 是一种高性能、分布式的消息队列系统,最初由 LinkedIn 公司开发,并于2011年成为 Apache 顶级项目。它设计用于处理大规模的实时数据流,具有高吞吐量、低延迟、持久性等特点,被广泛应用于构建实时数据管道、日志收集、事件驱动架构等场景。

        详细概述见Kafka概述:

1.1 Kafka的作用

  • 发布和订阅记录流
  • 持久存储记录流,Kafka中的数据即使消费后也不会消失
  • 在系统或应用之间构建可靠获取数据的实时流数据管道
  • 构建转换或响应数据流的实时流应用程序
  • Kafka可以处理源源不断产生的数据

1.2 Kafka的一些概念

  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic 就是Rabbitmq中的queue)

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)

  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)

  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

2.Kafka下载安装

Apache KafkaApache Kafka: A Distributed Streaming Platform.icon-default.png?t=N7T8https://kafka.apache.org/downloads        选择最新版就可以

2.1 配置kafka

        解压下载的文件,修改 config 文件夹下的 zookeeper.properties

        修改 config 文件夹下的 server.properties

        当需要外网访问时要配置advertised.listeners(比如连云服务器的kafka

advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092

 

2.2 启动 zookeeper

        Zookeeper 在 Kafka 中充当了分布式协调服务的角色,帮助 Kafka 实现了集群管理、元数据存储、故障恢复、领导者选举等功能,是 Kafka 高可用性、可靠性和分布式特性的重要支撑。

        kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

zookeeper-server-start.bat ../../config/zookeeper.properties

        可以本地访问看一下:http://localhost:2181/ 

2.3 启动Kafka 

        kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

kafka-server-start.sh ../../config/server.properties

        访问路径: http://localhost:9092/ 

2.4 便捷启动脚本

        两个脚本放到Kafka的目录(kafka_2.13-3.7.0)中

cd bin\windows

zookeeper-server-start.bat ../../config/zookeeper.properties

cd bin\windows

kafka-server-start.bat ../../config/server.properties

3.springboot集成Kafka

3.1 环境搭建

(1)添加pom依赖

<!-- 继承Spring boot工程 -->
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.8.RELEASE</version>
</parent>
<properties><fastjson.version>1.2.58</fastjson.version>
</properties>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
</dependencies>

(2)配置类application.yml

        生产者:

spring:kafka:bootstrap-servers: xxx.xxx.xxx.xxx:9092producer:retries: 0key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

        消费者:

spring:kafka:bootstrap-servers: xxx.xxx.xxx.xxx:9092consumer:group-id: kafka-demo-kafka-groupkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

(3)启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaApp {public static void main(String[] args) {SpringApplication.run(KafkaApp.class, args);}
}

3.2 消息生产者

        junit测试,新建消息发送方

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
​
​
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaSendTest {@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate; //如果这里有红色波浪线,那是假错误
​@Testpublic void sendMsg(){String topic = "spring_test";kafkaTemplate.send(topic,"hello spring boot kafka!");System.out.println("发送成功.");while (true){ //保存加载ioc容器
​}}
}

3.3 消息消费者

        新建监听类:

​
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
​
@Component
public class MyKafkaListener {
​//    以下两种方法都行// 指定监听的主题
//    @KafkaListener(topics = "spring_test")
//    public void receiveMsg(String message){
//        System.out.println("接收到的消息:"+message);
//    }
​@KafkaListener(topics = "spring_test")public void handleMessage(ConsumerRecord<String, String> record) {System.out.println("接收到消息,偏移量为: " + record.offset() + " 消息为: " + record.value());}
}

http://www.ppmy.cn/news/1459549.html

相关文章

保研面试408复习 3——操作系统

文章目录 1、操作系统一、进程有哪几种状态&#xff0c;状态之间的转换、二、调度策略a.处理机调度分为三级&#xff1a;b.调度算法 标记文字记忆&#xff0c;加粗文字注意&#xff0c;普通文字理解。 为什么越写越少&#xff1f; 问就是在打瓦。(bushi) 1、操作系统 一、进程…

前端测试策略与实践:单元测试、E2E测试与可访问性审计

前端测试策略是确保Web应用程序质量、性能和用户体验的关键组成部分。有效的测试策略通常包括单元测试、端到端&#xff08;E2E&#xff09;测试以及可访问性审计等多个层面。以下是关于这三类测试的策略与实践建议&#xff1a; 单元测试 定义与目的&#xff1a; 单元测试是针…

Verilog_学习路线(小白)

#前言&#xff1a; 自从专心学习专业课后&#xff0c;发现知识点得用&#xff0c;越用越熟练&#xff0c;工具也一样&#xff0c;高级工具的学习可帮助我们在工作中极大地提高效率&#xff0c;但这里要记住一点&#xff0c;任何工具都是为解决实际问题出现的&#xff0c;即落脚…

2024年第四届电子信息工程与计算机科学国际会议(EIECS 2024)

2024年第四届电子信息工程与计算机科学国际会议(EIECS 2024) 2024 4th International Conference on Electronic Information Engineering and Computer Science 中国延吉 | 2024年9月27-29日 投稿截止日期&#xff1a;2023年7月15日 收录检索&#xff1a;EI Compendex和Sc…

uniapp picker组件的样式更改

不知道有没有小伙伴遇到过这个问题 我是各种穿透和层级都尝试了更改不了其样式 梳理一下 H5端 在全局app.vue下添加如下代码 .uni-picker-container .uni-picker-header{ background-color: $uni-color-pink; //picker头部背景色}.uni-picker-container .…

Milvus Cloud 的RAG 的广泛应用及其独特优势

一个典型的 RAG 框架可以分为检索器(Retriever)和生成器(Generator)两块,检索过程包括为数据(如 Documents)做切分、嵌入向量(Embedding)、并构建索引(Chunks Vectors),再通过向量检索以召回相关结果,而生成过程则是利用基于检索结果(Context)增强的 Prompt 来激…

Unity 单例模式

Unity中单例模式是非常常用的写法&#xff0c;可以基于C#语言的几种不同方法来实现。 下面我将列出几种常见的实现方式&#xff1a; 1. 经典的单例模式 public class SingletonExample : MonoBehaviour {private static SingletonExample instance;public static SingletonEx…

Qt之摄像头操作

简单的摄像头测试类 头文件camerawidget.h #ifndef CAMERAWIDGET_H #define CAMERAWIDGET_H#include <QWidget> #include <QList> #include <QCamera> #include <QCameraInfo> #include <QCameraViewfinder> #include <QCameraImageCapture…