Netty实战(十三)

news/2024/10/21 3:51:40/

WebSocket协议(一)

  • 一、什么是WebSocket 协议
  • 二、简单的 WebSocket 程序示例
    • 2.1 程序逻辑
    • 2.2 添加 WebSocket 支持
    • 2.3 处理 HTTP 请求
    • 2.4 处理 WebSocket 帧

一、什么是WebSocket 协议

WebSocket 协议是完全重新设计的协议,旨在为 Web 上的双向数据传输问题提供一个切实可行的解决方案,使得客户端和服务器之间可以在任意时刻传输消息,因此,这也就要求它们异步地处理消息回执。

二、简单的 WebSocket 程序示例

2.1 程序逻辑

我们先设计一个基于浏览器的聊天程序来更好的理解WebSocket ,它的逻辑如下:

(1)客户端发送一个消息;
(2)该消息将被广播到所有其他连接的客户端。

像这样:
在这里插入图片描述

2.2 添加 WebSocket 支持

在从标准的HTTP或者HTTPS协议切换到WebSocket时,将会使用一种称为升级握手的机制。因此,使用WebSocket的应用程序将始终以HTTP/S作为开始,然后再执行升级。这个升级动作发生的确切时刻特定于应用程序;它可能会发生在启动时,也可能会发生在请求了某个特定的URL之后。

我们先这样设定:如果被请求的 URL 以/ws 结尾,那么我们把该协议升级为 WebSocket;否则,服务器将使用基本的 HTTP/S。在连接已经升级完成之后,所有数据都将会使用 WebSocket 进行传输。

在这里插入图片描述

2.3 处理 HTTP 请求

上一节我们说使用WebSocket的应用程序将始终以HTTP/S作为开始,然后再执行升级。所以我们应该先实现处理 HTTP 请求的组件。让它提供用于访问聊天室并显示由连接的客户端发送的消息的网页。

import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedNioFile;import java.io.File;
import java.io.RandomAccessFile;
import java.net.URISyntaxException;
import java.net.URL;/*** Author: lhd* Data: 2023/6/12* Annotate: http请求处理*/
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { //扩展 SimpleChannelInboundHandler 以处理 FullHttpRequest 消息private final String wsUri;private static final File INDEX;static {URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();try {String path = location.toURI() + "index.html";path = !path.contains("file:") ? path : path.substring(5);INDEX = new File(path);} catch (URISyntaxException e) {throw new IllegalStateException("Unable to locate index.html", e);}}public HttpRequestHandler(String wsUri) {this.wsUri = wsUri;}@Overridepublic void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {//如果请求了 WebSocket协议升级,则增加引用计数(调用 retain()方法),并将它传递给下一个ChannelInboundHandlerif (wsUri.equalsIgnoreCase(request.uri())) {ctx.fireChannelRead(request.retain());} else {//处理 100 Continue   请求以符合 HTTP 1.1 规范if (HttpUtil.is100ContinueExpected(request)) {send100Continue(ctx);}//读取 index.htmlRandomAccessFile file = new RandomAccessFile(INDEX, "r");HttpResponse response = new DefaultHttpResponse(request.protocolVersion(), HttpResponseStatus.OK);response.headers().set(HttpHeaderNames .CONTENT_TYPE, "text/plain; charset=UTF-8");boolean keepAlive = HttpUtil.isKeepAlive(request);//如果请求了keep-alive,则添加所需要的 HTTP头信息if (keepAlive) {response.headers().set(HttpHeaderNames.CONTENT_LENGTH, file.length());response.headers().set(HttpHeaderNames .CONNECTION, HttpHeaderValues.KEEP_ALIVE);}//将 HttpResponse写到客户端ctx.write(response);if (ctx.pipeline().get(SslHandler.class) == null) {//将 index.html 写到客户端ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));} else {ctx.write(new ChunkedNioFile(file.getChannel()));}//写 LastHttpContent 并冲刷至客户端ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);if (!keepAlive) {//如果没有请求 keep-alive,则在写操作完成后关闭 Channelfuture.addListener(ChannelFutureListener.CLOSE);}}}private static void send100Continue(ChannelHandlerContext ctx) {FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);ctx.writeAndFlush(response);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}

我们简单的说一下上面代码的流程:

(1) 如果该 HTTP 请求指向了地址为/ws 的 URI,那么 HttpRequestHandler 将调用 FullHttpRequest 对象上的 retain()方法,并通过调用 fireChannelRead(msg)方法将它转发给下一个 ChannelInboundHandler 。之所以需要调用 retain()方法,是因为调用 channelRead()方法完成之后,它将调用 FullHttpRequest 对象上的 release()方法以释放它的资源。

(2)如果客户端发送了 HTTP 1.1 的 HTTP 头信息 Expect: 100-continue,那么 HttpRequestHandler 将会发送一个 100 Continue 响应。在该 HTTP 头信息被设置之后,HttpRequestHandler 将会写回一个 HttpResponse 给客户端。这不是一个 FullHttpResponse,因为它只是响应的第一个部分。此外,这里也不会调writeAndFlush()方法,在结束的时候才会调用。

(3)如果不需要加密和压缩,那么可以通过将 index.html 的内容存储到 DefaultFileRegion 中来达到最佳效率。这将会利用零拷贝特性来进行内容的传输。为此,可以检查一下,是否有 SslHandler 存在于在ChannelPipeline 中。否则,可以使用 ChunkedNioFile。

(4)HttpRequestHandler 将写一个 LastHttpContent 来标记响应的结束。如果没有请求 keep-alive ,那么HttpRequestHandler 将会添加一个 ChannelFutureListener到最后一次写出动作的 ChannelFuture,并关闭该连接。在这里,将调用 writeAndFlush()方法以冲刷所有之前写入的消息。

这部分代码纯粹的是 HTTP 请求和响应,下面我们来处理 实际传输聊天内容的 WebSocket 帧。

WEBSOCKET 帧: WebSocket 以帧的方式传输数据,每一帧代表消息的一部分。一个完整的消息可能会包含许多帧。

2.4 处理 WebSocket 帧

IETF 发布的 WebSocket RFC,定义了 6 种帧,Netty 为它们每种都提供了一个 POJO 实现。

类 型描 述
BinaryWebSocketFrame包含了二进制数据
TextWebSocketFrame包含了文本数据
ContinuationWebSocketFrame包含属于上一个BinaryWebSocketFrame或TextWebSocketFrame 的文本数据或者二进制数据
CloseWebSocketFrame表示一个 CLOSE 请求,包含一个关闭的状态码和关闭的原因
PingWebSocketFrame请求传输一个 PongWebSocketFrame
PongWebSocketFrame作为一个对于 PingWebSocketFrame 的响应被发送

我们的聊天应用程序将使用下面几种帧类型:

  • CloseWebSocketFrame;
  • PingWebSocketFrame;
  • PongWebSocketFrame;
  • TextWebSocketFrame。

TextWebSocketFrame 是我们唯一真正需要处理的帧类型。为了符合 WebSocket RFC,Netty 提供了 WebSocketServerProtocolHandler 来处理其他类型的帧。

下面我们来处理 TextWebSocketFrame 的 ChannelInboundHandler,再将它的 ChannelGroup 中跟踪所有活动的 WebSocket 连接:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;/*** Author: lhd* Data: 2023/6/12* Annotate: 处理文本帧*/
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { //扩展 SimpleChannelInboundHandler, 并处理 TextWebSocketFrame 消息private final ChannelGroup group;public TextWebSocketFrameHandler(ChannelGroup group) {this.group = group;}//重写 userEventTriggered() 方法以处理自定义事件@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {//如果该事件表示握手成功,则从该Channelipeline中移除 HttpRequestHandler,因为将不会接收到任何 HTTP 消息了ctx.pipeline().remove(HttpRequestHandler.class);//通知所有已经连接的WebSocket 客户端新的客户端已经连接上了group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));//将新的 WebSocket Channel添加到 ChannelGroup 中,以便它可以接收到所有的消息group.add(ctx.channel());} else {super.userEventTriggered(ctx, evt);}}@Overridepublic void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {//增加消息的引用计数,并将它写到 ChannelGroup 中所有已经连接的客户端group.writeAndFlush(msg.retain());}
}

简单的说一下:

(1)TextWebSocketFrameHandler 只有少量的责任。当和新客户端的 WebSocket握手成功完成之后 ,它将通过把通知消息写到 ChannelGroup 中的所有 Channel 来通知所有已经连接的客户端,然后它将把这个新 Channel 加入到该 ChannelGroup 中 。

(2)如果接收到了 TextWebSocketFrame 消息 ,TextWebSocketFrameHandler 将调用TextWebSocketFrame 消息上的 retain()方法,并使用 writeAndFlush()方法来将它传输给 ChannelGroup,以便所有已经连接的 WebSocket Channel 都将接收到它。

(3)然后调用 retain()方法,因为当 channelRead0()方法返回时,TextWebSocketFrame 的引用计数将会被减少。由于所有的操作都是异步的,因此,writeAndFlush()方法可能会在 channelRead0()方法返回之后完成,而且它绝对不能访问一个已经失效的引用。

(4)因为 Netty 在内部处理了大部分剩下的功能,所以现在剩下唯一需要做的事情就是为每个新创建的 Channel 初始化其 ChannelPipeline。所以我们需要一个 ChannelInitializer。


http://www.ppmy.cn/news/351257.html

相关文章

常用数学符号读音表(中英双语)

下表整理了数学中常见的希腊字母符号&#xff0c; 序号大写小写英文注音国际音标注音中文注音1Ααalphaa:lf阿尔法2Ββbetabet贝塔3Γγgammaga:m伽马4Δδdeltadelt德尔塔5Εεepsilonepsilon伊普西龙6Ζζzetazat截塔7Ηηetaeit艾塔8Θθthetθit西塔9Ιιiotaiot约塔10Κ…

科学计算机的英文怎么拼读,科学的英语读音,科学的英文怎么读谐音。

科学用英语怎么拼读 科学 science, scientific knowledge 博物馆 museum 邮局 post office 医院 hospital 书店 bookstore 电影院 cinema; the movies; a movie house科学的英语单词是science。英式读法是[ˈsʌɪəns]&#xff1b;美式读法是[ˈsaɪəns]。作名词时意思是科学…

【Python】集合 set ① ( 集合定义 | 集合特点 | 代码示例 - 集合定义 )

文章目录 一、集合特点二、集合定义三、代码示例 - 集合定义 一、集合特点 在之前 的博客中 介绍了 列表 / 元组 / 字符串 数据容器 , 列表 支持 定义后 , 增加元素 / 修改元素 / 删除元素 , 并且 列表中可以存储 重复 / 有序 的元素 ;元组 定义后 不能 进行 增加元素 / 修改元…

python查单词音标_有没有通过读音或音标就能查出英语单词的办法,比如发音查词软件?...

展开全部 英语发音软件可以通过读音或音标就能查出英语单词。 英语发音软件带一个独e68a84e8a2ad3231313335323631343130323136353331333431373239特的音标词典&#xff0c;可以根据发音查单词&#xff0c;哪怕音标记不太清楚也能查到。在听到某个单词想查的时候&#xff0c;可…

Netty实战(十四)

WebSocket协议&#xff08;二&#xff09; 一、初始化 ChannelPipeline二、引导三、加密 一、初始化 ChannelPipeline 我们之前说过为了将 ChannelHandler 安装到 ChannelPipeline 中&#xff0c;需要扩展了ChannelInitializer&#xff0c;并实现 initChannel()方法。 下面我…

解决git提交时候出现的错误提示“modified:xxxxx (modified content, untracked content)“方法

今天来分享一个关于自己在使用git从本地仓库提交至远程仓库时候遇到的一个错误。话不多说&#xff0c;先来看一下这个错误提示&#xff1a;“modified:xxxxx (modified content, untracked content)”。这个错误提示我&#xff0c;xxxxx里面有未跟踪且已修改的内容&#xff0c;…

【C++】optional 用法

返回值可接受为空&#xff0c;用以表示状态失败 举个栗子&#xff0c; #include <iostream> #include <optional>using namespace std;class User{string name;optional<string> nickName;optional<int> age;public:User(const string& name,opti…

大数据分析平台释疑专用帖

大数据分析平台是大数据时代&#xff0c;企业数字化运营决策的一大关键平台&#xff0c;但初次接触大数据分析平台时&#xff0c;大家必然是问题多多的&#xff0c;本文将就几个比较常见的问题进行总结回答&#xff0c;希望对正在了解、选型大数据分析平台的大家有所帮助。 首…