Django websocket 进行实时通信(消费者)

news/2024/11/29 7:07:20/

1. settings.py 增加

ASGI_APPLICATION = "django_template_v1.routing.application"CHANNEL_LAYERS = {"default": {# This example apps uses the Redis channel layer implementation channels_redis"BACKEND": "channels_redis.core.RedisChannelLayer","CONFIG": {"hosts": ["{}0".format(REDIS_URL)],},},
}

2. 跟settings.py同级目录下,添加routing.py文件

from django.urls import path, re_path
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from apps.message_manage.consumers import ChatConsumer, NotificationChatConsumer# QueryAuthMiddlewareStack、AuthMiddlewareStackapplication = ProtocolTypeRouter({"websocket": URLRouter([re_path(r'^ws/chat/(?P<recipient>\w+)/$', ChatConsumer),# re_path(r'^ws/chatting/(?P<recipient>\w+)/$', NotificationChatConsumer),re_path(r'^ws/chatting/(?P<recipient>\w+)/(?P<platform_key>\w+)/$', NotificationChatConsumer),]),
})

3. 跟settings.py同级目录下,添加asgi.py文件

"""
ASGI config for django_template_v1 project.It exposes the ASGI callable as a module-level variable named ``application``.For more information on this file, see
https://docs.djangoproject.com/en/3.2/howto/deployment/asgi/
"""import os
import django
from channels.routing import get_default_applicationos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_template_v1.settings')
django.setup()
application = get_default_application()

4. 编写消费者,添加consumers.py文件(不是和settings.py一个目录了)

# -*- coding: utf-8 -*-
"""
-------------------------------------------------File Name:      consumersDescription:Author:          Administratordate:           2018/6/6
-------------------------------------------------Change Activity:2018/6/6:
-------------------------------------------------
"""
import json
import logging
import aioredis
import asyncio
from channels.generic.websocket import AsyncWebsocketConsumer
from django_template_v1.settings import REDIS_URLlogger = logging.getLogger("websocket")class ChatConsumer(AsyncWebsocketConsumer):async def connect(self):self.recipient = self.scope['url_route']['kwargs']['recipient']logger.info(f"websocket建立连接成功,连接的用户={self.recipient}")self.room_group_name = f'{self.recipient}'# Join room groupawait self.channel_layer.group_add(self.room_group_name,self.channel_name)await self.accept()# 执行定时任务self.periodic_task = asyncio.create_task(self.send_periodic_message())# 获取并发送历史未读消息# unread_notifications = NotificationRecord.objects.filter(recipient=self.recipient, is_read=False)# for notification in unread_notifications:#     await self.send(text_data=json.dumps({#         'message': notification.subject,#         'is_unread': True,#         'recipient': self.recipient,#         "receive_time": notification.receive_time.strftime('%Y-%m-%d %H:%M:%S')#     }))#     notification.is_read = True#     notification.save()async def disconnect(self, close_code):# Leave room groupprint(f"disconnect websocket")if hasattr(self, 'periodic_task') and not self.periodic_task.done():self.periodic_task.cancel()await self.channel_layer.group_discard(self.room_group_name,self.channel_name)await super().disconnect(close_code)# Receive message from WebSocketasync def receive(self, text_data):text_data_json = json.loads(text_data)message = text_data_json['message']print(f"Received message: {message}")# Send message to room groupawait self.channel_layer.group_send(self.room_group_name,{'type': 'chat_message','message': message})async def send_periodic_message(self):"""Periodically sends a message to the client."""while True:await asyncio.sleep(10)  # Sleep for a minuteawait self.send(text_data=json.dumps({'message': f'每隔一分钟的消息: {self.recipient}'}))async def chat_message(self, event):"""Handler for messages sent through channel layer."""message = '测试聊天室:' + event['message']# Send message to WebSocketawait self.send(text_data=json.dumps({'message': message}))class NotificationChatConsumer(AsyncWebsocketConsumer):def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self.redis_conn = Noneself.online_users_set_name = Noneself.room_group_name = Noneasync def connect(self):self.recipient = self.scope['url_route']['kwargs']['recipient']self.platform_key = self.scope['url_route']['kwargs']['platform_key']logger.info(f"WebSocket建立连接成功, recipient: {self.recipient}, platform_key: {self.platform_key}")# 使用aioredis创建异步连接self.redis_conn = await aioredis.from_url("{}12".format(REDIS_URL))logger.info(f"使用aioredis 进行redis连接")# 构建特定于平台的在线用户集合名称self.online_users_set_name = f'online_users_{self.platform_key}'# 异步添加recipient到特定于平台的online_users集合await self.redis_conn.sadd(self.online_users_set_name, self.recipient)logger.info(f"websocket 添加recipient到{self.online_users_set_name}集合")self.room_group_name = f'{self.recipient}_{self.platform_key}'# 加入room组await self.channel_layer.group_add(self.room_group_name,self.channel_name)await self.accept()async def disconnect(self, close_code):logger.info(f"disconnect WebSocket, close_code: {close_code}")if self.redis_conn and self.recipient:logger.info(f"websocket disconnect,从{self.online_users_set_name}集合移除{self.recipient}")await self.redis_conn.srem(self.online_users_set_name, self.recipient)# 离开room组await self.channel_layer.group_discard(self.room_group_name,self.channel_name)# 关闭Redis连接if self.redis_conn:self.redis_conn.close()await self.redis_conn.wait_closed()# Receive message from WebSocketasync def receive(self, text_data):text_data_json = json.loads(text_data)message = text_data_json['message']logger.info(f"Received message: {message}")# Send message to room groupawait self.channel_layer.group_send(self.room_group_name,{'type': 'chat_message','message': message})async def notification_message(self, event):logger.info(f"notification_message: {event}")# 直接发送event作为消息内容await self.send(text_data=json.dumps(event))

5. 进行实时发送,添加task.py文件

from channels.layers import get_channel_layer
from asgiref.sync import async_to_syncchannel_layer = get_channel_layer()# 发送websocket消息
def send_ws_msg(group_name, data):async_to_sync(channel_layer.group_send)(group_name,{'type': 'notification_message', 'message': data})returndef test_send(account, platform_key, message_dict):send_ws_msg(f"{account}_{platform_key}", message_dict)

http://www.ppmy.cn/news/1550842.html

相关文章

代码随想录算法训练营第六十天|Day60 图论

Bellman_ford 队列优化算法&#xff08;又名SPFA&#xff09; https://www.programmercarl.com/kamacoder/0094.%E5%9F%8E%E5%B8%82%E9%97%B4%E8%B4%A7%E7%89%A9%E8%BF%90%E8%BE%93I-SPFA.html 本题我们来系统讲解 Bellman_ford 队列优化算法 &#xff0c;也叫SPFA算法&#xf…

Zookeeper学习心得

本人学zookeeper时按照此文路线学的 Zookeeper学习大纲 - 似懂非懂视为不懂 - 博客园 一、Zookeeper安装 ZooKeeper 入门教程 - Java陈序员 - 博客园 Docker安装Zookeeper教程&#xff08;超详细&#xff09;_docker 安装zk-CSDN博客 二、 zookeeper的数据模型 ZooKeepe…

免费下载 | 2024年中国网络安全产业分析报告

《2024年中国网络安全产业分析报告》由中国网络安全产业联盟&#xff08;CCIA&#xff09;发布&#xff0c;主要内容包括&#xff1a; 前言&#xff1a;强调网络安全是国家安全的重要组成部分&#xff0c;概述了中国在网络安全治理方面的进展和挑战。 网络安全产业发展形势&am…

【linux】(21)进程端口排查-fuser

fuser 是一个用于显示进程使用的文件、套接字或端口的 Linux 命令。它可以帮助诊断某个文件、目录、端口或设备被哪个进程占用。 基本语法 fuser [选项] 文件或端口常用选项 选项说明-a显示所有指定文件或端口的进程信息。-k杀死占用指定文件或端口的进程。-i在杀死进程前询问…

机器学习-决策树(ID3算法及详细计算推导过程)

决策树是一种基于树结构进行决策的机器学习算法 &#xff0c;以下是关于它的详细介绍&#xff1a; 1.基本原理 决策树通过一系列的条件判断对样本进行分类或预测数值。它从根节点开始&#xff0c;根据不同的属性值逐步将样本划分到不同的分支&#xff0c;直到到达叶节点&…

【C++】简单数据类型详解

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;字符型&#xff08;char&#xff09;1.1 ASCII 码表 &#x1f4af;整型&#xff08;int&#xff09;2.1 整型的分类2.2 有符号和无符号整型2.3 跨平台差异2.4 整型数据类型…

Android获取状态栏、导航栏的高度

Android获取状态栏的高度&#xff1a; 方法一&#xff1a;通过资源名称获取&#xff0c; getDimensionPixelSize&#xff0c;获取系统中"status_bar_height"的值&#xff0c;方法如下&#xff1a; Java&#xff1a; public static int getStatusBarHeight(Context…

【操作文档】mysql分区操作步骤.docx

1、建立分区表 执行 tb_intercept_notice表-重建-添加分区.sql 文件&#xff1b; DROP TABLE IF EXISTS tb_intercept_notice_20241101_new; CREATE TABLE tb_intercept_notice_20241101_new (id char(32) NOT NULL COMMENT id,number varchar(30) NOT NULL COMMENT 号码,cre…