尝试Redis发布-订阅模型

news/2024/11/27 17:52:25/

场景

我有程序,功能大概类似于一个程序进行生产数据,一个程序进行消费,起初我考虑到了各种MQ去解决这件事情,我们现有资源有Redis,引入MQ可能会导致资源,系统复杂性,实时性的一个问题,所以依然考虑使用Redis的发布-订阅模型来解决这问题。

发布-订阅模型

1. 订阅通道

订阅者使用SUBSCRIBE命令来订阅一个或多个通道。例如:

SUBSCRIBE channel1 channel2

2. 取消订阅通道

订阅者可以使用UNSUBSCRIBE命令来取消订阅一个或多个通道。例如:

UNSUBSCRIBE channel1 channel2

3. 接收消息

一旦订阅了通道,订阅者将开始接收发布者发送到这些通道的消息。消息将以异步方式传递给订阅者。

4.发布消息

发布消费到通道,发布到了之后订阅者就可用监听到这个消息了。

PUBLISH my_channel "Hello, subscribers!"

5.通配符的使用

Redis还支持通配符订阅,让订阅者可以使用通配符来匹配多个通道。通配符有两种:

:匹配一个通道名,例如 SUBSCRIBE news. 将订阅所有以 “news.” 开头的通道。

?:匹配一个通道名中的一个字符,例如 SUBSCRIBE news.?? 将订阅以 “news.” 开头且后面有两个字符的通道。

存在的问题

消息持久化和顺序问题

如果Redis挂壁了,那么消息也会丢失的,这个其实可用采用Redis Stream来解决这个问题。

发布者

我们采用c#语言来简单写一个demo

using StackExchange.Redis;
using System;class Publisher
{static void Main(){// 建立到Redis服务器的连接ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("your_redis_connection_string");IDatabase db = redis.GetDatabase();string streamName = "mystream";for (int i = 1; i <= 10; i++){string messageId = db.StreamAdd(streamName, new[]{new NameValueEntry("message", $"Message {i}")});Console.WriteLine($"Published: {messageId}");}// 关闭连接redis.Close();}
}

在上述示例中,我们使用Redis的StreamAdd方法将消息发布到名为 “mystream” 的流中。每条消息都有一个唯一的消息ID,消息将按照它们添加到流的顺序进行排序。

订阅者
using StackExchange.Redis;
using System;class Subscriber
{static void Main(){// 建立到Redis服务器的连接ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("your_redis_connection_string");IDatabase db = redis.GetDatabase();string streamName = "mystream";string consumerGroup = "mygroup";string consumerName = "mysubscriber";// 创建消费者组db.StreamCreateConsumerGroup(streamName, consumerGroup, "0");while (true){var messages = db.StreamReadGroup(streamName, consumerGroup, consumerName, "0", 10);foreach (var message in messages){Console.WriteLine($"Received: {message.Values[1]}");// 在这里处理消息// 可以实现消息确认等逻辑}}// 关闭连接redis.Close();}
}

在上述示例中,我们首先创建了一个消费者组,然后循环读取来自流 “mystream” 的消息。通过使用消费者组,我们可以确保每条消息只会被一个订阅者处理,并且即使订阅者离线一段时间,它也可以获取未处理的消息。

使用Redis Streams,消息将持久化存储在Redis中,即使Redis服务器重启,消息也不会丢失。这使得Redis Streams成为处理消息的可靠工具,适用于日志记录、事件处理和消息队列等应用。

安全性问题

Redis的消息其实是裸奔的。解决这个问题的核心在于可以在Redis上设置访问控制,只允许授权的发布者和订阅者连接到Redis。此外,可以考虑使用TLS/SSL来加密连接。

启用密码验证

首先,可以配置Redis以限制访问只允许授权的客户端连接。在Redis的配置文件中(通常是redis.conf),可以使用以下配置项来启用访问控制:

# 启用密码认证
requirepass your_password

也可以使用SSL加密协议来限制,这个要取得加密证书。

无法保证消息可靠性

这个问题事实上是无解的,接受不了就不要用,有人可能说发布消息之后,订阅者回电机制,这个事实上是伪的不能再伪的逻辑,我都收不到了,我甜蜜的怎么回电?还有人说存数据库,我就笑笑。

结束

我没有转语言,只是掌握的不只是一门语言,一个nice的IT工作者应该是不被语言限制的!


http://www.ppmy.cn/news/1121475.html

相关文章

Flink DataStream API

DataStream API是Flink的核心层API。一个Flink程序&#xff0c;其实就是对DataStream的各种转换。具体来说&#xff0c;代码基本上都由以下几部分构成&#xff1a; package com.atguigu.env;import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.a…

Java Web框架,如Spring MVC,是一种用于构建Web应用程序的软件框架:学生考试Web应用程序

文章目录 什么是Java Web框架&#xff1f;MVC模式在Spring MVC中的应用简单的学生考试Web应用程序设置Spring MVC项目创建实体类创建考试实体类创建控制器创建服务层创建数据库创建视图配置Spring MVC实现功能运行应用程序运行应用程序 &#x1f388;个人主页&#xff1a;程序员…

想要精通算法和SQL的成长之路 - 双指针【数组】

想要精通算法和SQL的成长之路 - 双指针【数组】 前言一. 合并两个有序数组二. 删除有序数组中的重复项 II 前言 想要精通算法和SQL的成长之路 - 系列导航 一. 合并两个有序数组 原题链接 抓住重点信息&#xff1a; 两个数组都是非递减顺序排列。num1数组&#xff0c;末尾包…

用《斗破苍穹》的视角打开C#多线程开发1(斗帝之路)

Thread.Start() 是的&#xff0c;我就是乌坦城那个斗之气三段的落魄少爷&#xff0c;在我捡到那个色眯眯的老爷爷后&#xff0c;斗气终于开始增长了。在各种软磨硬泡下&#xff0c;我终于学会了我人生中的第一个黄阶斗技——吸掌。 using System.Threading;namespace Framewo…

玩玩“小藤”开发者套件 Atlas 200I DK A2 之挂载 m2 硬盘

玩玩“小藤”开发者套件 Atlas 200I DK A2 之挂载 m2 硬盘 0. 背景1. 列出所有可用块设备的信息2. 格式化磁盘3. 创建 XFS 文件系统4. 挂载格式化的卷 0. 背景 总所周知&#xff0c;英伟达的GPU供不应求&#xff0c;还各种限制。华为推出了升腾AI可以提供AI算力&#xff0c;那…

【蓝桥杯选拔赛真题62】Scratch判断小球 少儿编程scratch图形化编程 蓝桥杯选拔赛真题解析

目录 scratch判断小球 一、题目要求 编程实现 二、案例分析 1、角色分析

Java笔记:看清类加载过程

1 类加载的过程 1.1 加载 “加载”是“类加载”(Class Loading)过程的第一步。这个加载过程主要就是靠类器实现的&#xff0c;包括用户自定义类加载器。 加载的过程 在加载的过程中&#xff0c;JVM主要做3件事情 1&#xff09;通过一个类的全限定名来获取定义此类的二进制字节…

PMP-项目收尾过程组的重要性

一、什么是项目收尾过程组 收尾过程组包括为正式完成或关闭项目、阶段或合同而开展的过程。本过程组旨在核实为完成项目或阶段所需的所有过程组的全部过程均已完成&#xff0c;并正式宣告项目或阶段关闭。本过程组的主要作用是&#xff0c;确保恰当地关闭阶段、项目和合同。虽然…