Redis高级——批处理优化

news/2025/1/18 11:57:50/

2、批处理优化

2.1、Pipeline

2.1.1、我们的客户端与redis服务器是这样交互的

单个命令的执行流程

在这里插入图片描述

N条命令的执行流程

在这里插入图片描述

redis处理指令是很快的,主要花费的时候在于网络传输。于是乎很容易想到将多条指令批量的传输给redis

在这里插入图片描述

2.1.2、MSet

Redis提供了很多Mxxx这样的命令,可以实现批量插入数据,例如:

  • mset
  • hmset

利用mset批量插入10万条数据

@Test
void testMxx() {String[] arr = new String[2000];int j;long b = System.currentTimeMillis();for (int i = 1; i <= 100000; i++) {j = (i % 1000) << 1;arr[j] = "test:key_" + i;arr[j + 1] = "value_" + i;if (j == 0) {jedis.mset(arr);}}long e = System.currentTimeMillis();System.out.println("time: " + (e - b));
}

2.1.3、Pipeline

MSET虽然可以批处理,但是却只能操作部分数据类型,因此如果有对复杂数据类型的批处理需要,建议使用Pipeline

@Test
void testPipeline() {// 创建管道Pipeline pipeline = jedis.pipelined();long b = System.currentTimeMillis();for (int i = 1; i <= 100000; i++) {// 放入命令到管道pipeline.set("test:key_" + i, "value_" + i);if (i % 1000 == 0) {// 每放入1000条命令,批量执行pipeline.sync();}}long e = System.currentTimeMillis();System.out.println("time: " + (e - b));
}

2.2、集群下的批处理

如MSET或Pipeline这样的批处理需要在一次请求中携带多条命令,而此时如果Redis是一个集群,那批处理命令的多个key必须落在一个插槽中,否则就会导致执行失败。大家可以想一想这样的要求其实很难实现,因为我们在批处理时,可能一次要插入很多条数据,这些数据很有可能不会都落在相同的节点上,这就会导致报错了

这个时候,我们可以找到4种解决方案

在这里插入图片描述

第一种方案:串行执行,所以这种方式没有什么意义,当然,执行起来就很简单了,缺点就是耗时过久。

第二种方案:串行slot,简单来说,就是执行前,客户端先计算一下对应的key的slot,一样slot的key就放到一个组里边,不同的,就放到不同的组里边,然后对每个组执行pipeline的批处理,他就能串行执行各个组的命令,这种做法比第一种方法耗时要少,但是缺点呢,相对来说复杂一点,所以这种方案还需要优化一下

第三种方案:并行slot,相较于第二种方案,在分组完成后串行执行,第三种方案,就变成了并行执行各个命令,所以他的耗时就非常短,但是实现呢,也更加复杂。

第四种:hash_tag,redis计算key的slot的时候,其实是根据key的有效部分来计算的,通过这种方式就能一次处理所有的key,这种方式耗时最短,实现也简单,但是如果通过操作key的有效部分,那么就会导致所有的key都落在一个节点上,产生数据倾斜的问题,所以我们推荐使用第三种方式。

2.2.1 串行化执行代码实践

public class JedisClusterTest {private JedisCluster jedisCluster;@BeforeEachvoid setUp() {// 配置连接池JedisPoolConfig poolConfig = new JedisPoolConfig();poolConfig.setMaxTotal(8);poolConfig.setMaxIdle(8);poolConfig.setMinIdle(0);poolConfig.setMaxWaitMillis(1000);HashSet<HostAndPort> nodes = new HashSet<>();nodes.add(new HostAndPort("192.168.150.101", 7001));nodes.add(new HostAndPort("192.168.150.101", 7002));nodes.add(new HostAndPort("192.168.150.101", 7003));nodes.add(new HostAndPort("192.168.150.101", 8001));nodes.add(new HostAndPort("192.168.150.101", 8002));nodes.add(new HostAndPort("192.168.150.101", 8003));jedisCluster = new JedisCluster(nodes, poolConfig);}@Testvoid testMSet() {jedisCluster.mset("name", "Jack", "age", "21", "sex", "male");}@Testvoid testMSet2() {Map<String, String> map = new HashMap<>(3);map.put("name", "Jack");map.put("age", "21");map.put("sex", "Male");//对Map数据进行分组。根据相同的slot放在一个分组//key就是slot,value就是一个组Map<Integer, List<Map.Entry<String, String>>> result = map.entrySet().stream().collect(Collectors.groupingBy(entry -> ClusterSlotHashUtil.calculateSlot(entry.getKey())));//串行的去执行mset的逻辑for (List<Map.Entry<String, String>> list : result.values()) {String[] arr = new String[list.size() * 2];int j = 0;for (int i = 0; i < list.size(); i++) {j = i<<2;Map.Entry<String, String> e = list.get(0);arr[j] = e.getKey();arr[j + 1] = e.getValue();}jedisCluster.mset(arr);}}@AfterEachvoid tearDown() {if (jedisCluster != null) {jedisCluster.close();}}
}

2.2.2 Spring集群环境下批处理代码

   @Testvoid testMSetInCluster() {Map<String, String> map = new HashMap<>(3);map.put("name", "Rose");map.put("age", "21");map.put("sex", "Female");stringRedisTemplate.opsForValue().multiSet(map);List<String> strings = stringRedisTemplate.opsForValue().multiGet(Arrays.asList("name", "age", "sex"));strings.forEach(System.out::println);}

原理分析

在RedisAdvancedClusterAsyncCommandsImpl 类中

首先根据slotHash算出来一个partitioned的map,map中的key就是slot,而他的value就是对应的对应相同slot的key对应的数据

通过 RedisFuture mset = super.mset(op);进行异步的消息发送

@Override
public RedisFuture<String> mset(Map<K, V> map) {Map<Integer, List<K>> partitioned = SlotHash.partition(codec, map.keySet());if (partitioned.size() < 2) {return super.mset(map);}Map<Integer, RedisFuture<String>> executions = new HashMap<>();for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {Map<K, V> op = new HashMap<>();entry.getValue().forEach(k -> op.put(k, map.get(k)));RedisFuture<String> mset = super.mset(op);executions.put(entry.getKey(), mset);}return MultiNodeExecution.firstOfAsync(executions);
}

http://www.ppmy.cn/news/60054.html

相关文章

跟着我学 AI丨让计算机看懂世界

计算机视觉是一种利用计算机和数学算法来处理、分析和识别数字影像的技术。这项技术在近年来得到了快速发展&#xff0c;应用范围也越来越广泛&#xff0c;它已经成为了人工智能领域中的重要分支之一。 技术原理 计算机视觉技术主要涉及图像处理、模式识别和机器学习等方面的技…

Java8中DateTimeFormatter真的是线程安全的吗?

文章目录 [toc] 1.背景2.解决办法2.1办法一&#xff1a;换姿势或者升级JDK的版本2.1办法二&#xff1a;更换文件名称字生成策略 Java8中DateTimeFormatter真的是线程安全的吗&#xff1f; 答案是否定的 1.背景 由于之前写了一个旷世的ocr的服务,接入了旷世的FaceID的人脸比对…

RN系统精讲-----基础了解

原生基础 安装SDK与Tools preference > appearance > systemSetting > Android sdk 如何连接设备&#xff0c;以及开发中的常用的adb命令 USB连接设备 adb devices 查看连接设备 wifi网络连接设备 adb connect ip&#xff08;手机自己的ip地址&#xff0c;可以通过…

基于J2EE的B2C电子商务系统开发与实现

摘要 当今社会,科学技术突飞猛进,知识经济初见端倪。电子商务作为一种新型的贸易方式,极大地促进了全球经济贸易的发展,同时也正在改变人们的生活方式和思想观念。电子商务是指整个贸易活动实现电子化,交易各方以电子交易方式而进行的商业交易。世界贸易组织电子商务专题报告定…

python 统计pdf页数

python 统计pdf页数 import os import PyPDF2 import sys, jm, traceback from PyQt5.QtWidgets import QApplication, QMainWindow, QFileDialog, QAction from PyQt5 import QtCore, QtGui, QtWidgets def get_all_file_by_type(path, type(), get_all_dirs True): # 获得以…

【华为OD机试 2023最新 】统一限载货物数最小值(C语言题解 100%)

文章目录 题目描述输入描述输出描述备注用例题目解析代码思路C语言题目描述 火车站附近的货物中转站负责将到站货物运往仓库,小明在中转站负责调度2K辆中转车(K辆干货中转车,K辆湿货中转车)。 货物由不同供货商从各地发来,各地的货物是依次进站,然后小明按照卸货顺序依…

【网络安全】记一次杀猪盘渗透实战

看起来非常假的网站&#xff0c;这个网站是没有 cdn 的用的是 thinkphpk 框架搭建的。 先打一波 poc 没有效果 访问一下后台直接在 url 后面加/admin。 一个开源的 cms 还没有验证码尝试用 burp 进行爆破&#xff0c;首先在火狐上设置代理 ip 为 127.0.0.1 代理端口为 8081。 B…

1、Flutter使用总结(RichText、Container)

1、创建Flutter项目 flutter create DemoName 2、运行项目 flutter run -d ‘iPhone 14 Pro Max’ 注: 当运用Android Studio时、选择安卓模拟器运行项目、如果项目路径有中文名称: 那么运行报错、如果直接在项目路径下,采用终端运行安卓模拟器、可执行如下命令 flutter ru…