springboot第75集:kafka,线程,进程,容器化服务,线程池

devtools/2024/10/11 4:49:32/

消息中间件在异步通信中⽤的最多,很多业务流程中,如果所有步骤都同步进⾏可能会导致核⼼流程耗时⾮常⻓,更重

要的是所有步骤都同步进⾏⼀旦⾮核⼼步骤失败会导致核⼼流程整体失败,因此在很多业务流程中Kafka就充当了异步

通信⻆⾊。

⼤规模分布式系统中的机器⾮常多⽽且分散在不同机房中,分布式系统带来的⼀个明显问题就是业务⽇志的查看、追踪

和分析等⾏为变得⼗分困难,对于集群规模在百台以上的系统,查询线上⽇志很恐怖。

为了应对这种场景统⼀⽇志系统应运⽽⽣,⽇志数据都是海量数据,通常为了不给系统带来额外负担⼀般会采⽤异步上

报,这⾥Kafka以其⾼吞吐量在⽇志处理中得到了很好的应⽤。

随着据量的增加,离线的计算会越来越慢,难以满⾜⽤户在某些场景下的实时性要求,因此很多解决⽅案中引⼊了实时

计算。

很多时候,即使是海量数据,我们也希望即时去查看⼀些数据指标,实时流计算应运⽽⽣。

实时流计算有两个特点,⼀个是实时,随时可以看数据;另⼀个是流。实时流计算有两个特点,⼀个是实时,随时可以看数据;另⼀个是流。

push模式由broker决定消息推送的速率,对于不同消费速率的consumer就不太好处理了。

消息系统都致⼒于让consumer以最⼤的速率最快速的消费消息,push模式下,当broker推送的速率远⼤于consumer消息系统都致⼒于让consumer以最⼤的速率最快速的消费消息,push模式下,当broker推送的速率远⼤于consumer

消费的速率时,consumer恐怕就要崩溃了。

例如消息发送设置了重试机制,并且异步发送,消息A和B设置相同的key,业务上A先发,B后发,由于⽹络或者其他

原因A发送失败,B发送成功;A由于发送失败就会重试且重试成功,这时候消息顺序B在前A在后,与业务发送顺序不

⼀致,如果需要解决这个问题,需要设置参数 max.in.flight.requests.per.connection=1 ,其含义是限制客户

端在单个连接上能够发送的未响应请求的个数,设置此值是1表示kafka broker在响应请求之前client不能再向同⼀个

broker发送请求,这个参数默认值是5

kafka的消息是不断追加到⽂件中的,这个特性使 kafka 可以充分利⽤磁盘的顺序读写性能

顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写

Kafka 可以配置异步刷盘,不开启同步刷盘,异步刷盘不需要等写⼊磁盘后返回消息投递的 ACK,所以它提⾼了消息发

送的吞吐量,降低了请求的延时

传统的 IO 流程,需要先把数据拷⻉到内核缓冲区,再从内核缓冲拷⻉到⽤户空间,应⽤程序处理完成以后,再拷⻉回

内核缓冲区内核缓冲区

这个过程中发⽣了多次数据拷⻉

将数据保存到内存中的 Map(或其他数据结构)后,服务重启时这些数据不会自动保留,因为内存中的数据在进程结束时会被清除。要保留数据,可以考虑以下几种方法:

1. 持久化存储

将数据从 Map 持久化到文件或数据库中,以便在服务重启时能够恢复。

a. 使用文件存储

可以将 Map 的数据序列化并写入文件,例如使用 JSON 格式:

import java.io.*;
import java.util.HashMap;
import java.util.Map;public class DataStorage {private Map<String, String> dataMap = new HashMap<>();// 保存数据到文件public void saveToFile() throws IOException {try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream("data.ser"))) {oos.writeObject(dataMap);}}// 从文件加载数据public void loadFromFile() throws IOException, ClassNotFoundException {try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream("data.ser"))) {dataMap = (Map<String, String>) ois.readObject();}}
}
// 使用 Redis 的 Java 客户端(如 Jedis)存储 Map 数据
import redis.clients.jedis.Jedis;// 保存数据到 Redis
public void saveToRedis() {try (Jedis jedis = new Jedis("localhost")) {for (Map.Entry<String, String> entry : dataMap.entrySet()) {jedis.set(entry.getKey(), entry.getValue());}}
}// 从 Redis 加载数据
public void loadFromRedis() {try (Jedis jedis = new Jedis("localhost")) {for (String key : jedis.keys("*")) {String value = jedis.get(key);dataMap.put(key, value);}}
}

定期将 Map 中的数据快照保存到文件或数据库中,以便在服务重启时快速恢复。

在服务启动时,添加加载数据的逻辑,从持久化存储中读取数据并填充 Map

  • 内存中的数据(如 Map)在重启时不会保留。

  • 通过文件存储、数据库、或内存数据库等方法将数据持久化,可以在服务重启时恢复数据。

举例:单次拉取11条消息,每条消息耗时30s,11条消息耗时5分钟30秒,由于 max.poll.interval.ms 默认值5分

钟,所以消费者⽆法在5分钟内消费完,consumer会离开组,导致rebalance。

在消费完11条消息后,consumer会重新连接broker,再次rebalance,因为上次消费的offset未提交,再次拉取的消

息是之前消费过的消息,造成重复消费。

1、提⾼消费能⼒,提⾼单条消息的处理速度;根据实际场景可讲 max.poll.interval.ms 值设置⼤⼀点,避免不必

要的rebalance;可适当减⼩ max.poll.records 的值,默认值是500,可根据实际消息速率适当调⼩。

2、⽣成消息时,可加⼊唯⼀标识符如消息id,在消费端,保存最近的1000条消息id存⼊到redis或mysql中,消费的消

息时通过前置去重。

消费者内部根据线程数量创建等量的内存队列,对于需要顺序的⼀系列业务数据,根据key或者业务数据,放到同⼀个

内存队列中,然后线程从对应的内存队列中取出并操作

org.apache.kafka.clients.producer.ProducerInterceptor 接⼝。

该接⼝是Kafka提供的,⾥⾯有两个核⼼的⽅法。

  1. onSend:该⽅法会在消息发送之前被调⽤。

  2. onAcknowledgement:该⽅法会在消息成功提交或发送失败之后被调⽤。onAcknowledgement的调⽤要早于

callback的调⽤。值得注意的是,这个⽅法和onSend不是在同⼀个线程中被调⽤的,因此如果你在这两个⽅法中

调⽤了某个共享可变对象,⼀定要保证线程安全

在服务器上,进程和线程的产生通常与应用的运行模式和用户请求有关。以下是一些常见情况:

1. 进程
  • 服务启动时:当后端服务(如 web 服务器、数据库等)启动时,操作系统会为其创建一个进程。

  • 进程间通信:如果需要通过进程间通信(IPC)来处理任务,可能会启动新的进程。

2. 线程
  • 多线程服务:在多线程应用中,服务会在启动时创建多个线程,准备处理并发请求。

  • 用户请求处理:当用户请求接口时,后端服务通常会为每个请求分配一个线程来处理,从而提高响应能力。

3. 请求接口时的具体行为
  • 单线程模式:在单线程服务中,所有请求可能会在同一个线程中排队处理。

  • 多线程/进程模式:在多线程或多进程模式下,当一个请求到达时,可能会立即生成一个新的线程或进程,或者从线程池中借用一个现有的线程。

4. 异步处理
  • 一些现代框架采用异步处理方式,通过事件循环和回调机制来处理请求,减少线程或进程的使用。

在服务器中启动多个服务时,通常会产生多个进程。以下是一些具体情况:

1. 每个服务一个进程
  • 独立服务:大多数后端服务(如 web 服务器、数据库等)会以独立进程的形式运行。每个服务启动时,操作系统会为其分配一个新的进程。

  • 资源隔离:进程之间相互独立,具有自己的内存空间,有助于提高安全性和稳定性。

2. 多线程服务
  • 服务内部的多线程:一些服务可能内部实现为多线程结构,服务的每个进程可以包含多个线程来处理并发请求。这种情况下,服务的进程数量和线程数量是分开的。

  • 线程池:在一些高性能服务中,可能会使用线程池来管理线程,以便复用已有的线程来处理请求,减少线程创建和销毁的开销。

3. 容器化服务
  • 容器化:在使用 Docker 等容器技术时,通常每个服务在不同的容器中运行,每个容器也是一个独立的进程。这种方式可以方便地管理和扩展服务。

总结
  • 启动多个服务时,通常是产生多个进程。

  • 每个服务进程内部可以使用多线程来处理并发请求。

确保 replication.factor > min.insync.replicas ,如果两者相等,那么只要有⼀个副本挂机,整个分区就

⽆法正常⼯作了,我们不仅要改善消息的持久性,防⽌数据丢失,还要在不降低可⽤性的基础上完成,推荐设置

成 replication.factor = min.insync.replicas + 1 。

加群联系作者vx:xiaoda0423

仓库地址:github.com/webVueBlog/…


http://www.ppmy.cn/devtools/123988.html

相关文章

接口测试-day3-jmeter-2组件和元件

组件和元件&#xff1a; 组件&#xff1a;组件指的是jmeter里面任意一个可以使用的功能。比如说查看结果树或者是http请求 元件&#xff1a;元件指是提对组件的分类 组件的作用域&#xff1a;组件放的位置不一样生效也不一样。 作用域取决于组件的的层级结构并不取决于组件的…

【解决办法】git clone报错unable to access ‘xxx‘: SSL certificate problem

git clone 是 Git 版本控制系统中的一个基本命令&#xff0c;用于从远程仓库复制一个完整的版本库到本地。这个命令不仅复制远程仓库中的所有文件&#xff0c;还复制仓库的历史记录&#xff0c;使得你可以在本地进行版本控制操作&#xff0c;如提交&#xff08;commit&#xff…

【Linux】线程与线程安全知识总结

向外张望的人在做梦&#xff0c; 向内审视的人才是清醒的。 --- 荣格 --- 我最近复习了线程安全这部分知识&#xff0c;将不明白的问题总结出来&#xff0c;并通过AI进行问答帮助我进行学习巩固。本人能力有限 &#xff0c;可能有些内容不准确&#xff0c;望各位大佬海涵&am…

TadTR(TIP 2022)视频动作检测方法详解

前言 论文&#xff1a;End-to-end Temporal Action Detection with Transformer 代码&#xff1a;TadTR 从论文题目可以看出 TadTR 是基于 Transformer 的端到端的方法&#xff0c;TAD 在视频动作分类任务上更进一步&#xff0c;不仅对动作分类&#xff0c;还要检测动作发生的…

王道408考研数据结构-图-第六章

6.1 图的基本概念 6.1.1 图的定义 图G由顶点集V和边集 E组成&#xff0c;记为G(V,E),其中 V(G)表示图G中顶点的有限非空集&#xff1b;E(G)表示图G中顶点之间的关系(边)集合。若V{v?,v?,…,vn},则用|M表示图G中顶第6章 点的个数&#xff0c;E{(u,v) | uεV,vεV},用|E|表示图…

vue 入门二

参考&#xff1a;丁丁的哔哩哔哩 11.组件基础 传递 props 父组件 <BlogPost title"My journey with Vue" />子组件 <script setup> defineProps([title]) </script><template><h4>{{ title }}</h4> </template>props第…

【重学 MySQL】四十九、阿里 MySQL 命名规范及 MySQL8 DDL 的原子化

【重学 MySQL】四十九、阿里 MySQL 命名规范及 MySQL8 DDL 的原子化 阿里 MySQL 命名规范MySQL8 DDL的原子化 阿里 MySQL 命名规范 【强制】表名、字段名必须使用小写字母或数字&#xff0c;禁止出现数字开头&#xff0c;禁止两个下划线中间只出现数字。数据库字段名的修改代价…

Ubuntu 22.04.4 LTS更换下载源

方法1&#xff1a;使用图形界面更换下载源 1. 打开软件和更新应用 2. 在Ubuntu 软件标签中&#xff0c;点击“下载自”旁边的下拉菜单&#xff0c;选择“其他” 3. 点击“选择最佳服务器”来自动选择最快的服务器 4. 选择服务器 5. 确定并关闭窗口&#xff0c;系统会提示您重新…