kafka快速入门+应用

embedded/2024/12/22 14:16:13/

Kafka, 构建TB级异步消息系统

0.mq优劣

优点:应用解耦、异步提速、削峰填谷

缺点:系统可用性降低、系统复杂度提高、一致性问题

mq特点:消息不丢失、支持消息存储

1.快速入门

1.1 阻塞队列

        在生产线程  和  消费线程  之间起到了 , 缓冲作用,即避免CPU  资源被浪费掉

  • BlockingQueue
    • 解决  线程通信  的问题
    • 阻塞方法  put 、 take
  • 生产者、消费者 模式
    • 生产者:产生数据的线程
    • 消费者:使用数据的线程
  • 实现类
    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • PriorityBlockingQueue、SynchronousQueue、DelayQueue等

1.2 Kafaka 入门

Kafaka 简介

  • Kafaka 是一个分布式的(消息队列    发展到---》 功能更加丰富) 流媒体平台
  • 应用:消息系统、日志收集、用户行为追踪、流式处理

Kafaka 特点

  • 高吞吐量(对于硬盘的顺序读写,性能很高,高于对于内存的随机读写)
  • 消息持久化(将数据存储在硬盘中)
  • 高可靠性分布式集群部署,一台服务器挂了,切换到其他服务器)
  • 高扩展性(服务器不够用,则加一台服务器)

Kafaka 术语

  • Broker(集群中的每一台服务器称为一个Broker)、Zookeeper(管理集群的软件)
  • Topic(发布订阅模式,生产者把消息发送的位置-》topic)、Partition(表示对于主题位置的分区,每一个分区按照顺序往其中追加数据)、Offset(消息在分区内存在的索引)
  • Leader Replica(主副本可以提供想要读取某个分区的数据,如果主副本挂了,则从集群中选用一个从副本作为主副本) 、Follower Replica(从副本只是 备份,不负责做响应)
    • 每一个分区有多个副本,如果一个副本坏了,还有备份,提高容错率

1.3 Spring 整合 Kafka

  1. 引入依赖
    1. spring-kafka
  2. 配置Kafka
    1. 配置server
    2. 配置consumer
  3. 访问Kafka
  • 生产者
    • kafkaTemplate.send(topic, data);
  • 消费者
    • @KafkaListener(topics = {"test"})  public void handleMessage(ConsumerRecord record) {}

2.应用

2.1 安装

(1)安装包位置

链接:https://pan.baidu.com/s/1QjZdOUYdmSCSf0zjprJQIQ
提取码:9630

下载后解压缩

(2)相关配置

1

 2

2.2 启动+使用

(1)启动zookeeper
C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

(2)启动kafka
C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0>bin\windows\kafka-server-start.bat config\server.properties

(3)创建主题

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-topics.bat --create --bootstrap-server localhost:9092 -replication-factor 1 --partitions 1 --topic test1

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-topics.bat --list --bootstrap-server localhost:9092
__consumer_offsets
test
test1

(4)创建生产者

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test1

(5)创建消费者

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning

再次生产消息,会自动消费消息

2.3 spring中整合kafka

(1)配置Properties

#9. kafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
# 是否自动提交(记录)   消费者偏移量
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000

其中spring.kafka.consumer.group-id=community-consumer-group与。。。保持一致

(2)导入依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

(3)test

1.定义producer

@Component
class KafkaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, String content) {kafkaTemplate.send(topic,content);}
}

2.定义consumer

@Component
class KafkaConsumer {@KafkaListener(topics ={"test"})public void handleMessage(ConsumerRecord record){System.out.println(record.value());//PHDVB干嘛呢//呕吼}
}

3.编写测试类

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {@Autowiredprivate KafkaProducer kafkaProducer;@Testpublic void testKafka() {kafkaProducer.sendMessage("test","PHDVB干嘛呢");kafkaProducer.sendMessage("test","呕吼");// 生产者发消息是  主动的 ,消费者收消息是  被动地try {Thread.sleep(1000 * 10);} catch (InterruptedException e) {e.printStackTrace();}}
}


http://www.ppmy.cn/embedded/1892.html

相关文章

“好”玩游戏让我本能痴迷游戏编程

源地址&#xff1a;https://www.ctvol.com/c-cdevelopment/5842.html 我的游戏生涯是从最开始的热血传奇开始的&#xff0c;那时候&#xff0c;我们年少轻狂&#xff0c;不知道多少80后的青春都洒在了这个游戏上面&#xff0c;那时候&#xff0c;热血传奇的热度比现在的英雄联…

【位运算】Leetcode 判定字符是否唯一

题目解析 算法讲解 正常思路&#xff1a;使用unordered_map判断并保存每一个字符出现的次数&#xff0c;如果当前的字符在添加到Hash之前已经出现了一次了&#xff0c;直接返回false&#xff0c;反之循环结束返回true 优化思路&#xff1a;可以使用位图来充当Hash表&#xff…

项目之旅(前两周)

文章目录 学习总结input1.text 文本框2.password 密码框3.button 按钮4.file 文件还可定义上传类型 5.日期6.radio 单选框7. checkbox 复选框 项目总结生活总结 学习总结 input 本次写项目时才发现&#xff0c;input有很多种用法&#xff0c;这里列举几种 1.text 文本框 不…

阿里云4核16G服务器可以用来做什么?

阿里云4核16G服务器可以用来做什么&#xff1f;可用来搭建游戏服务器&#xff0c;阿里云4核16G服务器10M带宽30元1个月、90元3个月&#xff0c;优惠活动 aliyunfuwuqi.com/go/youhui 阿里云4核16G服务器可以用来做什么&#xff1f;除了搭建游戏服务器&#xff0c;还可以用来哪…

网络协议学习——以太网协议

目录 ​编辑 一&#xff0c;以太网简介 二&#xff0c;以太网通信的过程 为什么不用IP地址&#xff1f; 过程 MAC帧 MAC帧的字段介绍 ARP协议 传输过程的一些问题 RARP协议 提高效率 三&#xff0c;其他问题 ARP诈骗问题 URL解析过程 一&#xff0c;以太网简介 …

Java刷题API

因为经常用Java刷题&#xff0c;记录一下常用到的API 数组 1. 定义&#xff08;两种方法&#xff09; type arrayName[]; //第一种 type[] arrayName; //第二种//eg int arrayName[] new int[5]; int[] arrayName new int[5];//二维数组 int arrayName[][] new int[5][5]…

数据结构从入门到实战——顺序表

目录 前言 一、顺序表的概念及结构 1.1 线性表 二、顺序表分类 三、动态顺序表的实现 3.1 顺序表结构的创建以及初始化 3.2 顺序表的销毁 3.3 顺序表的打印 3.4 尾插数据 ——最困难的 3.5 头插数据 3.6 尾删数据 3.7 头部删除数据 前言 在计算机科学和数据结…

深入浅出理解CSS中的文本样式与排版:探索文字之美

在Web设计与开发的广阔天地里&#xff0c;CSS&#xff08;层叠样式表&#xff09;对于网页文本的呈现与排版起着举足轻重的作用。CSS中与文本相关的属性繁多而细致&#xff0c;涵盖了字体选择、大小调整、颜色填充、对齐方式、缩进、间距调整以及更多高级样式处理。本文将以通俗…