场景
我有程序,功能大概类似于一个程序进行生产数据,一个程序进行消费,起初我考虑到了各种MQ去解决这件事情,我们现有资源有Redis,引入MQ可能会导致资源,系统复杂性,实时性的一个问题,所以依然考虑使用Redis的发布-订阅模型来解决这问题。
发布-订阅模型
1. 订阅通道
订阅者使用SUBSCRIBE命令来订阅一个或多个通道。例如:
SUBSCRIBE channel1 channel2
2. 取消订阅通道
订阅者可以使用UNSUBSCRIBE命令来取消订阅一个或多个通道。例如:
UNSUBSCRIBE channel1 channel2
3. 接收消息
一旦订阅了通道,订阅者将开始接收发布者发送到这些通道的消息。消息将以异步方式传递给订阅者。
4.发布消息
发布消费到通道,发布到了之后订阅者就可用监听到这个消息了。
PUBLISH my_channel "Hello, subscribers!"
5.通配符的使用
Redis还支持通配符订阅,让订阅者可以使用通配符来匹配多个通道。通配符有两种:
:匹配一个通道名,例如 SUBSCRIBE news. 将订阅所有以 “news.” 开头的通道。
?:匹配一个通道名中的一个字符,例如 SUBSCRIBE news.?? 将订阅以 “news.” 开头且后面有两个字符的通道。
存在的问题
消息持久化和顺序问题
如果Redis挂壁了,那么消息也会丢失的,这个其实可用采用Redis Stream来解决这个问题。
发布者
我们采用c#语言来简单写一个demo
using StackExchange.Redis;
using System;class Publisher
{static void Main(){// 建立到Redis服务器的连接ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("your_redis_connection_string");IDatabase db = redis.GetDatabase();string streamName = "mystream";for (int i = 1; i <= 10; i++){string messageId = db.StreamAdd(streamName, new[]{new NameValueEntry("message", $"Message {i}")});Console.WriteLine($"Published: {messageId}");}// 关闭连接redis.Close();}
}
在上述示例中,我们使用Redis的StreamAdd方法将消息发布到名为 “mystream” 的流中。每条消息都有一个唯一的消息ID,消息将按照它们添加到流的顺序进行排序。
订阅者
using StackExchange.Redis;
using System;class Subscriber
{static void Main(){// 建立到Redis服务器的连接ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("your_redis_connection_string");IDatabase db = redis.GetDatabase();string streamName = "mystream";string consumerGroup = "mygroup";string consumerName = "mysubscriber";// 创建消费者组db.StreamCreateConsumerGroup(streamName, consumerGroup, "0");while (true){var messages = db.StreamReadGroup(streamName, consumerGroup, consumerName, "0", 10);foreach (var message in messages){Console.WriteLine($"Received: {message.Values[1]}");// 在这里处理消息// 可以实现消息确认等逻辑}}// 关闭连接redis.Close();}
}
在上述示例中,我们首先创建了一个消费者组,然后循环读取来自流 “mystream” 的消息。通过使用消费者组,我们可以确保每条消息只会被一个订阅者处理,并且即使订阅者离线一段时间,它也可以获取未处理的消息。
使用Redis Streams,消息将持久化存储在Redis中,即使Redis服务器重启,消息也不会丢失。这使得Redis Streams成为处理消息的可靠工具,适用于日志记录、事件处理和消息队列等应用。
安全性问题
Redis的消息其实是裸奔的。解决这个问题的核心在于可以在Redis上设置访问控制,只允许授权的发布者和订阅者连接到Redis。此外,可以考虑使用TLS/SSL来加密连接。
启用密码验证
首先,可以配置Redis以限制访问只允许授权的客户端连接。在Redis的配置文件中(通常是redis.conf),可以使用以下配置项来启用访问控制:
# 启用密码认证
requirepass your_password
也可以使用SSL加密协议来限制,这个要取得加密证书。
无法保证消息可靠性
这个问题事实上是无解的,接受不了就不要用,有人可能说发布消息之后,订阅者回电机制,这个事实上是伪的不能再伪的逻辑,我都收不到了,我甜蜜的怎么回电?还有人说存数据库,我就笑笑。
结束
我没有转语言,只是掌握的不只是一门语言,一个nice的IT工作者应该是不被语言限制的!