源码地址:
https://gitee.com/liuhaizhang/django2-configuring-websocket
https://gitee.com/liuhaizhang/django2-configuring-websocket
python3.9.0
django==2.2.1
channels==2.2.0
项目结构:
test_websocket_django2
-chat
-home
-test_websocket_django2
-manage.py
一、配置settings.py
INSTALLED_APPS = ['django.contrib.admin','django.contrib.auth','django.contrib.contenttypes','django.contrib.sessions','django.contrib.messages','django.contrib.staticfiles','channels', #1、注册channels'chat.apps.ChatConfig','home.apps.HomeConfig',]WSGI_APPLICATION = 'test_websocket_django2.wsgi.application'
#2、新增ASGI应用
ASGI_APPLICATION = 'test_websocket_django2.asgi.application'
二、chat应用:所有websocket都放在这里
新建
routings.py : 存放websocket路由
from django.urls import path
from . import consumers# 这个变量是存放websocket的路由
websocket_urlpatterns = [path('chat/socket/', consumers.ChatView),
]
consumers.py :写websocket的类
from channels.generic.websocket import WebsocketConsumer
from channels.exceptions import StopConsumer
from asgiref.sync import async_to_sync
import timeclass ChatView(WebsocketConsumer):def websocket_connect(self, message):# 客户端与服务端进行握手时,会触发这个方法# 服务端允许客户端进行连接,就是握手成功self.accept()def websocket_receive(self, message):# 接收到客户端发送的数据recv = message.get('text')print('接收到的数据,', recv)if recv == 'close':# 服务的主动断开连接print('服务器断开连接')self.close()else:# 客户端向服务端发送数据self.send(f'我收到了,{time.strftime("%Y-%m-%d %H:%M:%S")}')def websocket_disconnect(self, message):# 客户端端口连接时,会触发该方法,断开连接print('客户端断开连接')raise StopConsumer()
三、在settings.py同级目录下创建asgi.py
写入内容:
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import chat.routings #导入websocket的路由application = ProtocolTypeRouter({'websocket': AuthMiddlewareStack(URLRouter(chat.routings.websocket_urlpatterns, #把websocket的路由注册进去)),
})
四、启动项目,测试
1、启动
python manage.py runsever
启动显示,应该是这样:ASGI/Channels
2、测试
测试网站:
EasySwoole-WebSocket在线测试工具