Redis 为什么要引入 Pipeline机制?

ops/2025/1/14 4:31:37/

在 Redis 中有一种 Pipeline(管道)机制,其目的是提高数据传输效率和吞吐量。那么,Pipeline是如何工作的?它又是如何提高性能的?Pipeline有什么优缺点?我们该如何使用 Pipeline?

1、Redis Pipeline是什么?

Redis Pipeline 是一种批量执行命令的技术,允许客户端在不等待服务器响应的情况下,一次性发送多个命令到 Redis 服务器。传统的请求-响应模式中,客户端每发送一个命令,就需要等待服务器响应后才能发送下一个命令,这种模式在高延迟网络环境下,严重影响 Redis 的性能表现。

Pipeline 通过消除或减少网络往返次数(Round-Trip Time, RTT),能够显著提高命令执行的吞吐量,客户端可以将多个命令打包发送,服务器则依次执行这些命令并将结果返回给客户端,从而有效地提升了网络利用率和整体性能。

2、为什么引入 Pipeline?
在了解 Redis为什么引入 Pipeline之前,我们先来了解传统请求-响应模式,在传统的请求-响应模式中,客户端与服务器之间的通信流程如下:

  • 客户端发送一个命令到服务器。
  • 服务器接收命令并执行。
  • 服务器将执行结果返回给客户端。
  • 客户端接收结果后,发送下一个命令。

在这里插入图片描述

在这种传统的模式下,每个命令都需要经历完整的 RTT,这在高延迟网络环境下会导致显著的性能瓶颈。

而 Pipeline的核心思想是“命令打包,高效传输”。其工作流程可以总结成下面 5个步骤:

  • 打包命令: 客户端将多个 Redis 命令按照特定的格式打包成一个请求包。
  • 发送命令: 将打包好的请求一次性发送给 Redis 服务器。
  • 执行命令: Redis 服务器按顺序执行接收到的所有命令。
  • 接收响应: 服务器将所有命令的执行结果按顺序返回给客户端。
  • 解析响应: 客户端解析接收到的响应,并将结果对应到各个命令。

在这里插入图片描述

这种方式通过减少网络往返次数,有效降低网络延迟对性能的影响,特别适合于需要执行大量 Redis 命令的高并发场景。

尽管 Pipeline带来了性能的提升,但它也有一些缺点:

  • 资源消耗: 发送大量命令一次性执行,可能会消耗较多的服务器资源,导致 Redis 其他操作的响应时间增加。
  • 错误处理复杂: 在批量执行命令时,单个命令的错误处理可能变得复杂,需要逐一检查每个命令的执行结果。
  • 顺序依赖: 如果命令之间存在顺序依赖,Pipeline 的批量执行需要确保正确的命令顺序。
  • 不支持事务功能: Pipeline 只是批量执行命令的工具,不具备事务的原子性和隔离性。
  • 客户端支持: 不同的 Redis 客户端对 Pipeline 的支持程度不同,使用时需考虑所选客户端库的特性和限制。

3、源码分析
在 Java中,常见的 Redis 客户端库有 Jedis 和 Lettuce两种,下面我们将分别分析这两个库实现 Pipeline功能。

3.1 使用 Jedis 库
Jedis 是一个简单、直观的 Redis 客户端,支持 Pipeline 功能。下面的示例展示如何使用 Jedis实现 Pipeline操作。

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;import java.util.ArrayList;
import java.util.List;publicclass JedisPipelineExample {public static void main(String[] args) {// Redis 连接参数String redisHost = "localhost";int redisPort = 6379;String redisPassword = null; // 若有密码,填写密码// 连接 Redistry (Jedis jedis = new Jedis(redisHost, redisPort)) {if (redisPassword != null && !redisPassword.isEmpty()) {jedis.auth(redisPassword);}// 批量设置键值对batchSet(jedis);// 批量获取键值对batchGet(jedis);} catch (Exception e) {e.printStackTrace();}}/*** 使用 Pipeline 批量设置键值对** @param jedis Jedis 实例*/public static void batchSet(Jedis jedis) {System.out.println("开始批量设置键值对...");Pipeline pipeline = jedis.pipelined();int numCommands = 1000;for (int i = 0; i < numCommands; i++) {pipeline.set("key-" + i, "value-" + i);}// 执行所有命令pipeline.sync();System.out.println("批量设置完成,共设置 " + numCommands + " 个键值对。");}/*** 使用 Pipeline 批量获取键值对*/public static void batchGet(Jedis jedis) {System.out.println("开始批量获取键值对...");Pipeline pipeline = jedis.pipelined();int numCommands = 1000;List<Response<String>> responses = new ArrayList<>(numCommands);for (int i = 0; i < numCommands; i++) {Response<String> response = pipeline.get("key-" + i);responses.add(response);}// 执行所有命令pipeline.sync();// 处理结果for (int i = 0; i < numCommands; i++) {String value = responses.get(i).get();System.out.println("key-" + i + " = " + value);}System.out.println("批量获取完成,共获取 " + numCommands + " 个键值对。");}
}

代码解析

上面的代码主要总结为 4个步骤:

1、连接 Redis:

使用 Jedis 类连接 Redis 服务器。如果 Redis 服务器设置了密码,需要调用 jedis.auth 进行认证。

2、批量设置键值对:

  • 调用 jedis.pipelined() 获取一个 Pipeline 对象。
  • 使用循环将多个 set 命令添加到 Pipeline 中。
  • 调用 pipeline.sync() 发送所有命令并等待执行结果。
  • 通过 Pipeline 一次性提交所有命令,减少了网络往返次数。

3、批量获取键值对:

  • 同样使用 pipelines 获取 Pipeline 对象。
  • 使用 pipeline.get 方法批量添加 get 命令,并将 Response 对象保存到列表中。
  • 调用 pipeline.sync() 发送所有命令并等待执行结果。
  • 遍历 Response 对象列表,获取每个键的值。

4、关闭连接:

使用 try-with-resources 语法自动关闭 Jedis 连接,确保资源的正确释放。

3.2 使用 Lettuce 库
Lettuce 是一个基于 Netty 的可伸缩、多线程的 Redis 客户端,支持异步和反应式编程模型,同样支持 Pipeline 功能。

以下示例展示如何使用 Lettuce 实现 Pipeline 操作,包括批量设置和获取键值对。

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.api.sync.SyncCommands;
import io.lettuce.core.api.sync.RedisScriptingCommands;
import io.lettuce.core.api.sync.RedisClusterCommands;import java.util.ArrayList;
import java.util.List;publicclass LettucePipelineExample {public static void main(String[] args) {// Redis 连接参数String redisHost = "localhost";int redisPort = 6379;String redisPassword = null; // 若有密码,填写密码// 创建 RedisURIRedisURI redisURI = RedisURI.Builder.redis(redisHost).withPort(redisPort).withPassword(redisPassword != null ? redisPassword.toCharArray() : null).build();// 创建 RedisClientRedisClient redisClient = RedisClient.create(redisURI);// 建立连接try (StatefulRedisConnection<String, String> connection = redisClient.connect()) {RedisCommands<String, String> syncCommands = connection.sync();// 批量设置键值对batchSet(syncCommands);// 批量获取键值对batchGet(syncCommands);} catch (Exception e) {e.printStackTrace();} finally {// 关闭客户端redisClient.shutdown();}}/*** 使用 Lettuce 的 Pipeline 批量设置键值对** @param syncCommands 同步命令接口*/public static void batchSet(RedisCommands<String, String> syncCommands) {System.out.println("开始批量设置键值对...");int numCommands = 1000;for (int i = 0; i < numCommands; i++) {syncCommands.set("key-" + i, "value-" + i);}// 批量执行所有命令syncCommands.getStatefulConnection().flushCommands();System.out.println("批量设置完成,共设置 " + numCommands + " 个键值对。");}/*** 使用 Lettuce 的 Pipeline 批量获取键值对** @param syncCommands 同步命令接口*/public static void batchGet(RedisCommands<String, String> syncCommands) {System.out.println("开始批量获取键值对...");int numCommands = 1000;List<String> keys = new ArrayList<>(numCommands);for (int i = 0; i < numCommands; i++) {keys.add("key-" + i);}List<String> values = syncCommands.mget(keys.toArray(new String[0])).stream().map(res -> res.getValue()).toList();for (int i = 0; i < numCommands; i++) {System.out.println(keys.get(i) + " = " + values.get(i));}System.out.println("批量获取完成,共获取 " + numCommands + " 个键值对。");}
}

代码解析

上面的代码主要总结为 4个步骤:

1、连接 Redis:

使用 RedisClient 创建连接,RedisURI 封装了连接参数。如果 Redis 服务器设置了密码,需要在 RedisURI 中指定。

2、批量设置键值对:

  • 使用 syncCommands.set 方法批量添加 set 命令。
  • 调用 flushCommands() 方法将所有积累的命令一次性发送到服务器。

注:Lettuce 的 Pipeline 支持隐式的 Pipeline,即没有显式的 Pipeline API,通过积累命令并调用 flushCommands() 实现批量发送。

3、批量获取键值对:

使用 mget 方法一次性获取多个键的值,这是 Lettuce 提供的批量获取命令,天然支持 Pipeline。
mget 返回一个包含每个键值的 List,通过流处理提取值。

4、关闭连接:

使用 try-with-resources 语法自动关闭连接,最后调用 redisClient.shutdown() 关闭 Redis 客户端。

尽管 Lettuce 支持 Pipeline,但其 API 不如 Jedis 那样显式。要实现更细粒度的 Pipeline 控制,可以使用 Lettuce 的命令缓冲机制或异步 API。上述示例中展示的是同步方式,适用于简单的批量操作。

4、使用场景

  • 批量设置键值对: 将大量键值对一次性写入 Redis,适用于数据初始化或大规模更新。
  • 批量获取键值对: 在需要同时获取多个键的值时,通过 Pipeline 减少请求次数,提高效率。
  • 分布式计数器: 高并发情况下,使用 Pipeline 聚合多个计数操作,提升吞吐量。
  • 缓存预热: 在应用启动或重启时,通过 Pipeline 将常用数据加载到缓存中,提高应用启动性能。

http://www.ppmy.cn/ops/149917.html

相关文章

VTK知识学习(33)-交互问题2

1、前言 主要是针对前面有过实现不了交互的情况进行说明&#xff0c;经过一些尝试和分析调用API&#xff0c;总算实现RenderWindowControl函数回调正常串接&#xff0c;当然这个移动处理事件的效果目前也没有确认。 2、使用 vtkImageReslice reslice vtkImageReslice.New();p…

“深入浅出”系列之QT:(6)如何在一个项目中调用另一个项目

在Qt中&#xff0c;如果想在一个项目中调用另一个项目&#xff0c;这通常意味着想要在一个CMake构建的项目中集成或依赖另一个CMake构建的项目。 1‌.子模块或子目录方式‌&#xff1a; 如果另一个项目可以作为一个子模块或子目录包含在当前项目中&#xff0c;可以使用add_sub…

【Docker】入门教程

目录 一、Docker的安装 二、Docker的命令 Docker命令实验 1.下载镜像 2.启动容器 3.修改页面 4.保存镜像 5.分享社区 三、Docker存储 1.目录挂载 2.卷映射 四、Docker网络 1.容器间相互访问 2.Redis主从同步集群 3.启动MySQL 五、Docker Compose 1.命令式安装 …

【数据库】一、数据库系统概述

文章目录 一、数据库系统概述1 基本概念2 现实世界的信息化过程3 数据库系统内部体系结构4 数据库系统外部体系结构5 数据管理方式 一、数据库系统概述 1 基本概念 数据&#xff1a;描述事物的符号记录 数据库&#xff08;DB&#xff09;&#xff1a;长期存储在计算机内的、…

docker安装rabbit后访问报错最佳的几种解决方案

错误通常是由于RabbitMQ的安全配置导致的&#xff0c;RabbitMQ默认配置允许的用户仅能通过localhost访问。这通常出现在RabbitMQ的guest用户上&#xff0c;guest用户默认只能从localhost登录&#xff0c;而无法从其他IP地址进行远程访问。 解决方法&#xff1a; 1. **创建一个…

java 查询树结构数据,无限层级树结构通用方法

1、数据库表数据 2、controller层TestTree简单测试 RestController RequestMapping("/test") public class testTreeController {Autowiredprivate TestTreeService testTreeService;GetMapping("/list")public List<TestTree> List(TestTree tree)…

【Python】edge-tts :便捷语音合成

edge-tts 是一个功能强大的 Python 库&#xff0c;利用 Microsoft Azure 的云端文本到语音&#xff08;TTS&#xff09;服务&#xff0c;支持多种语言和声音选项&#xff0c;能够生成高质量、自然听感的语音输出。它支持多种音频格式&#xff0c;包括 MP3、WAV 和 OGG&#xff…

深入解析 C++ 类型转换

简介 C 类型转换是开发者必须掌握的重要技能之一, 无论是处理隐式转换还是显式转换, 理解其背后的机制与用法至关重要. 本篇博客旨在从基础到高级全面解析 C 的类型转换, 包括实际开发中的应用场景和性能分析. 自动转换 隐式类型转换 编译器可以在无需明确指示的情况下, 将一…