Redis 简单的消息队列

news/2024/12/22 15:17:38/

使用redis 进行简单的队列很容易,不需要使用较为复杂的MQ队列,直接使用redis 进行,不过唯一不足的需要自己构造生产者消费者,这里使用while True的方法进行消费者操作

目录

  • 介绍
  • 数据类型
    • String
    • Hash
  • 重要命令
  • 消息队列

介绍

key-value 存储系统,是跨平台的非关系型数据库。Redis 通常被称为数据结构服务器,因为值(value)可以是字符串(String)、哈希(Hash)、列表(list)、集合(sets)和有序集合(sorted sets)等类型。

常用于缓存、消息队列、会话存储等应用场景。

  • **性能极高:**Redis 以其极高的性能而著称,能够支持每秒数十万次的读写操作24。这使得Redis成为处理高并发请求的理想选择,尤其是在需要快速响应的场景中,如缓存、会话管理、排行榜等。
  • **丰富的数据类型:**Redis 不仅支持基本的键值存储,还提供了丰富的数据类型,包括字符串、列表、集合、哈希表、有序集合等。这些数据类型为开发者提供了灵活的数据操作能力,使得Redis可以适应各种不同的应用场景。
  • **原子性操作:**Redis 的所有操作都是原子性的,这意味着操作要么完全执行,要么完全不执行。这种特性对于确保数据的一致性和完整性至关重要,尤其是在高并发环境下处理事务时。
  • **持久化:**Redis 支持数据的持久化,可以将内存中的数据保存到磁盘中,以便在系统重启后恢复数据。这为 Redis 提供了数据安全性,确保数据不会因为系统故障而丢失。
  • **支持发布/订阅模式:**Redis 内置了发布/订阅模式(Pub/Sub),允许客户端之间通过消息传递进行通信。这使得 Redis 可以作为消息队列和实时数据传输的平台。
  • **单线程模型:**尽管 Redis 是单线程的,但它通过高效的事件驱动模型来处理并发请求,确保了高性能和低延迟。单线程模型也简化了并发控制的复杂性。
  • **主从复制:**Redis 支持主从复制,可以通过从节点来备份数据或分担读请求,提高数据的可用性和系统的伸缩性。
  • **应用场景广泛:**Redis 被广泛应用于各种场景,包括但不限于缓存系统、会话存储、排行榜、实时分析、地理空间数据索引等。
  • **社区支持:**Redis 拥有一个活跃的开发者社区,提供了大量的文档、教程和第三方库,这为开发者提供了强大的支持和丰富的资源。
  • **跨平台兼容性:**Redis 可以在多种操作系统上运行,包括 Linux、macOS 和 Windows,这使得它能够在不同的技术栈中灵活部署。
    • **支持 Lua 脚本:**Redis 支持使用 Lua 脚本来编写复杂的操作,这些脚本可以在服务器端执行,提供了更多的灵活性和强大的功能。

数据类型

Redis主要支持以下几种数据类型:

  • string(字符串):

    基本的数据存储单元,可以存储字符串、整数或者浮点数。

  • hash(哈希):

    一个键值对集合,可以存储多个字段。

  • list(列表):

    一个简单的列表,可以存储一系列的字符串元素。

  • set(集合):

    一个无序集合,可以存储不重复的字符串元素。

  • zset(sorted set:社群集合):

    相似集合,但是每个元素都有一个分数(score)关联。

  • 位图(Bitmaps):

    基于操作字符串类型,可以对每个位进行。

  • 超日志(HyperLogLogs):

    用于基本统计,可以提示集合中的唯一元素数量。

  • 地理空间(Geospatial):

    用于存储断层信息。

  • 发布/订阅(Pub/Sub):

    一种消息通信模式,允许客户端订阅消息通道,并接收发布到该通道的消息。

  • 流(Streams):

    用于消息队列和日志存储,支持消息的持久化和时间排序。

  • 模块(Modules):

    Redis支持动态加载模块,可以扩展Redis的功能。

String

string 是 redis 最基本的类型,string类型最大存储512MB

  • SET key value:设置键的值。
  • GET key:获取键的值。
  • INCR key:将键的值加 1。
  • DECR key:将键的值减 1。
  • APPEND key value:将值追加到键的值之后。
> set run "1"

Hash

Redis hash 是一个键值(key=>value)对集合,类似于一个小型的 NoSQL 数据库。

Redis hash 是一个 string 类型的 field 和 value 的映射表,hash 特别适合用于存储对象。

  • HSET key field value:设置哈希表中字段的值。
  • HGET key field:获取哈希表中字段的值。
  • HGETALL key:获取哈希表中所有字段和值。
  • HDEL key field:删除哈希表中的一个或多个字段。

最常用的还是string,其他用到了再说。

Redis支持多个数据库,并且每个数据库的数据是隔离的不能共享,并且基于单机才有,如果是集群就没有数据库的概念。

Redis是一个字典结构的存储服务器,而实际上一个Redis实例提供了多个用来存储数据的字典,客户端可以指定将数据存储在哪个字典中。这与我们熟知的在一个关系数据库实例中可以创建多个数据库类似,所以可以将其中的每个字典都理解成一个独立的数据库。

每个数据库对外都是一个从0开始的递增数字命名,Redis默认支持16个数据库(可以通过配置文件支持更多,无上限),可以通过配置databases来修改这一数字。客户端与Redis建立连接后会自动选择0号数据库,不过可以随时使用SELECT命令更换数据库

重要命令

命令执行后输出 (integer) 1,否则将输出 (integer) 0

set
del
get
keys * 
lrange queue 0 -1  从第一个元素 (0) 到最后一个元素 (-1)BLPOP key1 [key2 ] timeout
移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。

发布订阅 pub/sub 模式

在这里插入图片描述

需要开启两个redis-cli 客户端

第一个:

subscribe chat 
订阅该频道

第二个:

publish chat "redis-test"
往chat频道发消息

消息队列

使用redis 进行消息队列,关键就是有消费者、生产者的动作,这里以python一段代码进行启发:

#redis_aclient.py
from contextlib import asynccontextmanagerfrom django.conf import settings
from redis.asyncio import from_url as redis_from_urlclass RedisClient:def __init__(self):self._client = Nonedef connect(self):if self._client is None:# logger.info("Connecting to Redis...")self._client = redis_from_url(settings.REDIS_URL, decode_responses=True)@asynccontextmanagerasync def get_client(self):if self._client is None:self.connect()try:yield self._clientfinally:await self.close()async def close(self):if self._client:# logger.info("Shutting down Redis connection...")await self._client.aclose()# logger.info("Redis connection closed.")self._client = None# 单例模式 - 实例化 RedisClient
redis_client_manager = RedisClient()

调用生产者消费者的逻辑

import asyncio
import json
import osimport django
from asgiref.sync import sync_to_asyncasync def generate_by_queue(answer_id, answer_data):"""向 Redis 队列中推送任务"""task_data = {"answer_id": answer_id,"answer_data": answer_data}try:async with redis_client_manager.get_client() as redis_client:logger.info(f"Pushing task to queue answer_id: {answer_id}")await redis_client.rpush("ai_report_task_queue", json.dumps(task_data))except Exception as e:logger.info(f"Error pushing task to queue: {e}")async def process_tasks_by_ai_explain_answer():"""从 Redis 队列中获取任务并进行处理"""try:async with redis_client_manager.get_client() as redis_client:while True:try:# 阻塞直到有任务出现task = await redis_client.blpop("queue")if task:task_data = json.loads(task[1])  # 解析任务数据answer_id = task_data.get("answer_id")logger.info(f"Processing task answer_id {answer_id}")answer_data = task_data.get("answer_data")ai_explain = await generate_ai_report(answer_data)await update_answer_record(answer_id, ai_explain)logger.info(f"Task answer_id {answer_id} processed")except Exception as e:logger.info(f"Error processing task {e}")except asyncio.CancelledError:logger.info("Task processing cancelled.")except Exception as e:logger.info(f"Unexpected error: {e}")if __name__ == "__main__":try:asyncio.run(process_tasks_by_ai_explain_answer())except KeyboardInterrupt:asyncio.run(redis_client_manager.close())

http://www.ppmy.cn/news/1532722.html

相关文章

Llama微调以及Ollama部署

1 Llama微调 在基础模型的基础上,通过一些特定的数据集,将具有特定功能加在原有的模型上。 1.1 效果对比 特定数据集 未使用微调的基础模型的回答 使用微调后的回答 1.2 基础模型 基础大模型我选择Mistral-7B-v0.3-Chinese-Chat-uncensored&#x…

uni-app - - - - - 实现锚点定位和滚动监听功能(滚动监听功能暂未添加,待后续更新)

实现锚点定位和滚动监听功能 1. 思路解析2. 代码示例 效果截图示例: 点击左侧menu,右侧列表数据实现锚点定位 1. 思路解析 点击左侧按钮,更新右侧scroll-view对应的scroll-into-view的值,即可实现右侧锚点定位滚动右侧区域&am…

联通云 - 国产化全栈解决方案

中国联通国际有限公司产品之联通云 在数字化转型的时代浪潮中,中国联通国际有限公司凭借其强大的网络基础和技术创新能力,推出了旗下云服务品牌——联通云。联通云不仅承载着让数据安全无处不在的使命,同时也开启了智能云无限可能的新篇章。…

uniapp中h5环境添加console.log输出

<script src"https://unpkg.com/vconsolelatest/dist/vconsole.min.js"></script><script>// VConsole 默认会挂载到 window.VConsole 上window.vConsole new window.VConsole({defaultPlugins: [system, network, element, storage],// 可以在此…

Python将ONNX转为Json脚本

Python脚本 import onnx from onnx.shape_inference import infer_shapes import numpy as npfrom google.protobuf.json_format import MessageToJson, Parse import argparse import osdef convertToJson(onnx_model_path):onnx_model onnx.load(onnx_model_path)message …

排序个人总结

插入排序 思路&#xff1b;定义 i 和 j&#xff0c;默认 i 前面的数都是有序的&#xff0c;j 定义为 i 的前一个数&#xff0c;把 i 的值给tmp&#xff0c;tmp与j对应的值进行比较&#xff0c;如果arr[j] > tmp,将arr[j] (大的数前移一位)&#xff0c;如下图 代码&#xf…

Elasticsearch 开放推理 API 增加了对 Google AI Studio 的支持

作者&#xff1a;来自 Elastic Jeff Vestal 我们很高兴地宣布 Elasticsearch 的开放推理 API 支持 Gemini 开发者 API。使用 Google AI Studio 时&#xff0c;开发者现在可以与 Elasticsearch 索引中的数据进行聊天、运行实验并使用 Google Cloud 的模型&#xff08;例如 Gemin…

深度拆解:如何在Facebook上做跨境电商?

国内社交媒体正在逐渐兴盛&#xff0c;海外也不例外。在数字营销的新时代&#xff0c;Facebook已成为跨境电商不可或缺的平台之一。通过Facebook的巨大流量&#xff0c;卖家可以更好的触及潜在消费者&#xff0c;以实现销售增长。本文就深度拆解一下&#xff0c;卖家如何利用Fb…