springboot实现websocket长连接(四)
demo下载地址:多种websocket实现方式,其中有基于spring-websocket的,也有基于netty框架的,即下即用。
之前的博客使用spring-websocket实现了websocket服务端,现在我们利用netty框架来实现,更灵活,性能也更好。在一些复杂场景下,可以通过调整参数提高效率。
之前的实现可以参考:
springboot实现websocket(一)
springboot实现websocket(二)
首先定义一个netty的server端,用于启动端口。
说明一下,netty需要占用一个端口,如果你的项目也提供了web服务,两者端口不能一样。
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 网状服务器** @author lukou* @date 2023/05/17*/
public class NettyServer {private static final Logger log = LoggerFactory.getLogger(NettyServer.class);private int port;private Channel channel;private EventLoopGroup bossGroup;private EventLoopGroup workGroup;private ChannelInitializer<SocketChannel> channelInitializer;public NettyServer(int port, ChannelInitializer<SocketChannel> channelInitializer) {this.port = port;this.channelInitializer = channelInitializer;bossGroup = new NioEventLoopGroup();workGroup = new NioEventLoopGroup();}/*** 开始** @throws Exception 异常*/public void start() throws Exception {try {ServerBootstrap sb = new ServerBootstrap();//绑定线程池sb.group(bossGroup, workGroup)//指定使用的channel.channel(NioServerSocketChannel.class)//临时存放已完成三次握手的请求的队列的最大长度.option(ChannelOption.SO_BACKLOG, 1024)//禁用nagle算法,不等待,立即发送.childOption(ChannelOption.TCP_NODELAY, true)//当没有数据包过来时超过一定时间主动发送一个ack探测包.childOption(ChannelOption.SO_KEEPALIVE, true)//允许共用端口.childOption(ChannelOption.SO_REUSEADDR, true)//绑定监听端口.localAddress(this.port)//添加自定义处理器.childHandler(this.channelInitializer);//服务器异步创建绑定ChannelFuture cf = sb.bind().sync();channel = cf.channel();log.info("netty服务启动。。正在监听:[{}]", channel.localAddress());//关闭服务器通道channel.closeFuture().sync();} catch (Exception e) {throw new Exception("启动netty服务发生异常,端口号:" + this.port, e);}}/*** 摧毁** @throws Exception 异常*/public void destroy() throws Exception {try {channel.close().sync();workGroup.shutdownGracefully().sync();bossGroup.shutdownGracefully().sync();} catch (Exception e) {throw new Exception("停止netty服务发生异常,端口号:" + this.port, e);}}}
接下来,需要实现业务处理逻辑的类,首先定义一个抽象类,将一些公共逻辑放到里面
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;import static io.netty.handler.codec.http.HttpMethod.GET;/*** 基本套接字服务器* 抽象了一层.<br>** @author lukou* @date 2023/05/17*/
public abstract class BaseSocketServer extends SimpleChannelInboundHandler<Object> {private static final Logger log = LoggerFactory.getLogger(BaseSocketServer.class);/**websocket协议内容*/public static final String WEBSOCKET = "websocket";public static final String UPGRADE = "Upgrade";/*** 客户端连接地址*/public static final String ENDPOINT = "/example4/ws";/*** 连接唯一id,方便链路追踪*/protected String taskId;/*** 上下文*/protected ChannelHandlerContext context;/*** websocket握手处理器*/private WebSocketServerHandshaker webSocketServerHandshaker;/*** 通道活性* 客户端与服务端创建链接的时候调用.<br>** @param context 上下文*/@Overridepublic abstract void channelActive(ChannelHandlerContext context);/*** 频道不活跃* 客户端与服务端断开连接的时候调用.<br>** @param context 上下文*/@Overridepublic abstract void channelInactive(ChannelHandlerContext context);/*** 通道读完整* 服务端接收客户端发送过来的数据结束之后调用.<br>** @param context 上下文*/@Overridepublic void channelReadComplete(ChannelHandlerContext context) {context.flush();}/*** 例外了* 工程出现异常的时候调用.<br>** @param context 上下文* @param throwable throwable*/@Overridepublic void exceptionCaught(ChannelHandlerContext context, Throwable throwable) {context.close();log.info("taskId:[{}]中发生错误,原因:[{}]", this.taskId, throwable.toString(), throwable);}/*** 通道read0* 连接和帧信息.<br>** @param ctx ctx* @param msg 味精*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) {if (msg instanceof WebSocketFrame) {this.handWebSocketFrame(ctx, (WebSocketFrame) msg);return;}if (msg instanceof FullHttpRequest) {log.info("taskId:[{}]开始处理websocket握手请求。。", taskId);this.httpRequestHandler(ctx, (FullHttpRequest) msg);log.info("taskId:[{}]处理websocket握手请求结束。。", taskId);}}/*** 用户事件触发* 这里设置了一个读超时事件,可以参考{@link Example4WebSocketChannelHandler}中设置** @param ctx ctx* @param evt evt*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) 
{ctx.close();log.info("taskId:[{}]读操作超时。。断开连接。。", this.taskId);}}}/*** 处理客户端与服务端之间的websocket业务.<br>** @param context 上下文* @param webSocketFrame 网络套接字框架*/public void handWebSocketFrame(ChannelHandlerContext context, WebSocketFrame webSocketFrame) {//判断是否是关闭websocket的指令if (webSocketFrame instanceof CloseWebSocketFrame) {webSocketServerHandshaker.close(context.channel(), (CloseWebSocketFrame) webSocketFrame.retain());log.info("taskId:[{}]接收到关闭帧。。断开连接。。", this.taskId);return;}//判断是否是ping消息if (webSocketFrame instanceof PingWebSocketFrame) {context.channel().write(new PongWebSocketFrame(webSocketFrame.content().retain()));log.info("taskId:[{}]接收到心跳帧。。", this.taskId);return;}//判断是否是二进制消息if (webSocketFrame instanceof TextWebSocketFrame) {this.handTextWebSocketFrame(context, webSocketFrame);}}/*** http请求处理程序* http握手请求校验.<br>** @param context 上下文* @param fullHttpRequest 完整http请求*/private void httpRequestHandler(ChannelHandlerContext context, FullHttpRequest fullHttpRequest) {//判断是否http握手请求if (!fullHttpRequest.decoderResult().isSuccess() || !(WEBSOCKET.equals(fullHttpRequest.headers().get(UPGRADE)))|| !GET.equals(fullHttpRequest.method())) {sendHttpResponse(context, new DefaultFullHttpResponse(fullHttpRequest.protocolVersion(), HttpResponseStatus.BAD_REQUEST));log.error("taskId:{{}}websocket握手内容不正确。。响应并关闭。。", taskId);return;}String uri = fullHttpRequest.uri();log.info("taskId:{{}}websocket握手uri[{}]", taskId, uri);if (!ENDPOINT.equals(getBasePath(uri))) {sendHttpResponse(context, new DefaultFullHttpResponse(fullHttpRequest.protocolVersion(), HttpResponseStatus.NOT_FOUND));log.info("taskId:[{}]websocket握手协议不正确。。响应并关闭。。", taskId);return;}WebSocketServerHandshakerFactory webSocketServerHandshakerFactory = new WebSocketServerHandshakerFactory("", null, false);webSocketServerHandshaker = webSocketServerHandshakerFactory.newHandshaker(fullHttpRequest);if (webSocketServerHandshaker == null) 
{WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(context.channel());log.info("taskId:[{}]websocket握手协议版本不正确。。响应并关闭。。", taskId);return;}webSocketServerHandshaker.handshake(context.channel(), fullHttpRequest);this.checkOpenInfo(context, fullHttpRequest);}/*** 得到基本路径** @param url url* @return {@link String}*/public static String getBasePath(String url) {if (StringUtils.isEmpty(url)) {return null;}int idx = url.indexOf("?");if (idx == -1) {return url;}return url.substring(0, idx);}/*** 发送http响应* 服务端发送响应消息.<br>** @param context 上下文* @param defaultFullHttpResponse 默认完整http响应*/private void sendHttpResponse(ChannelHandlerContext context, DefaultFullHttpResponse defaultFullHttpResponse) {if (defaultFullHttpResponse.status().code() != 200) {ByteBuf buf = Unpooled.copiedBuffer(defaultFullHttpResponse.status().toString(), CharsetUtil.UTF_8);defaultFullHttpResponse.content().writeBytes(buf);buf.release();}//服务端向客户端发送数据ChannelFuture future = context.channel().writeAndFlush(defaultFullHttpResponse);if (defaultFullHttpResponse.status().code() != 200) {future.addListener(ChannelFutureListener.CLOSE);}}/*** 回复消息给客户端.<br>** @param message 消息* @return {@link ChannelFuture}*/protected ChannelFuture reply( String message) {ChannelFuture channelFuture = context.writeAndFlush(new TextWebSocketFrame(message));log.info("taskId:[{}]回复给客户端消息完成:[{}]", this.taskId, message);return channelFuture;}/*** 检查打开信息* 检验连接打开时的信息.<br>** @param context 上下文* @param fullHttpRequest 完整http请求*/protected abstract void checkOpenInfo(ChannelHandlerContext context, FullHttpRequest fullHttpRequest);/*** 手文本框架网络套接字* 文本帧处理.<br>** @param context 上下文* @param webSocketFrame 网络套接字框架*/protected abstract void handTextWebSocketFrame(ChannelHandlerContext context, WebSocketFrame webSocketFrame);}
实例化
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import java.util.UUID;/*** 服务段实例化** @author lukou* @date 2023/05/17*/
@Component
public class MyWebSocketServer extends BaseSocketServer {private static final Logger log = LoggerFactory.getLogger(MyWebSocketServer.class);@Overridepublic void channelActive(ChannelHandlerContext context) {this.taskId = UUID.randomUUID().toString().replaceAll("-", "");this.context = context;log.info("taskId:[{}]有一个新请求进来了。。开始初始化上下文。。。", this.taskId);}@Overridepublic void channelInactive(ChannelHandlerContext context) {log.info("taskId:[{}]识别服务触发关闭事件.", this.taskId);// 这边可以收尾处理}@Overrideprotected void checkOpenInfo(ChannelHandlerContext context, FullHttpRequest fullHttpRequest) {log.info("taskId:[{}]识别服务中websocket握手协议正确。。开始校验其它。。", this.taskId);}@Overrideprotected void handTextWebSocketFrame(ChannelHandlerContext context, WebSocketFrame webSocketFrame) {String text = ((TextWebSocketFrame) webSocketFrame).text();this.reply(this.taskId + " : " + text + System.currentTimeMillis());}
}
之后,将业务处理层绑定到netty的channel上
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;import java.util.concurrent.TimeUnit;/*** example4网络套接字通道处理程序** @author lukou* @date 2023/05/17*/
public class Example4WebSocketChannelHandler extends ChannelInitializer<SocketChannel> {private static final EventExecutorGroup EVENT_EXECUTOR_GROUP = new DefaultEventExecutorGroup(100);@Overrideprotected void initChannel(SocketChannel ch) {// 设置30秒没有读到数据,则触发一个READER_IDLE事件。ch.pipeline().addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));// websocket协议本身就是基于http协议的,所以这边也要使用http编解码器ch.pipeline().addLast(new HttpServerCodec());// 以块的方式来写处理器ch.pipeline().addLast(new ChunkedWriteHandler());// netty是基于分段请求的,HttpObjectAggregator的作用是将请求分段再聚合,参数是聚合字节的最大长度ch.pipeline().addLast(new HttpObjectAggregator(8192));// 在管道中添加我们自己的接收数据实现方法ch.pipeline().addLast(EVENT_EXECUTOR_GROUP, new MyWebSocketServer());ch.pipeline().addLast(new WebSocketServerProtocolHandler(BaseSocketServer.ENDPOINT, null, true, 65536 * 10));}}
之后,就是真正的使用了,这里是选择项目一启动就执行netty服务端,并注入到容器中(这里看个人选择,不一定非要注入到spring中,直接new也一样)。
import com.example.wsdemo.websocketserver.example4.netty.Example4WebSocketChannelHandler;
import com.example.wsdemo.websocketserver.example4.netty.NettyServer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that registers the Netty WebSocket server and its channel
 * initializer as beans.
 */
@Configuration
public class ServerConfig {

    /** Port for the Netty server; read from {@code netty.websocket.port}, default 8081. */
    @Value("${netty.websocket.port:8081}")
    private int port;

    /**
     * @return the channel initializer that builds the pipeline of each connection
     */
    @Bean("example4WebSocketChannelHandler")
    public Example4WebSocketChannelHandler example4WebSocketChannelHandler() {
        Example4WebSocketChannelHandler initializer = new Example4WebSocketChannelHandler();
        return initializer;
    }

    /**
     * @param example4WebSocketChannelHandler initializer applied to accepted channels
     * @return the (not yet started) Netty server bound to the configured port
     */
    @Bean("nettyServer")
    public NettyServer nettyServer(Example4WebSocketChannelHandler example4WebSocketChannelHandler) {
        NettyServer server = new NettyServer(port, example4WebSocketChannelHandler);
        return server;
    }
}
import com.example.wsdemo.websocketserver.example4.netty.NettyServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;
import javax.annotation.Resource;/*** example4网络套接字服务端启动初始化** @author lukou* @date 2023/05/17*/
@Component
public class Example4WebSocketStartInit implements CommandLineRunner {private static final Logger log = LoggerFactory.getLogger(Example4WebSocketStartInit.class);@Resourceprivate NettyServer nettyServer;/*** 需要异步启动,不然会阻塞主线程* 这里自定义一个线程启动,也可以在方法上加上注解@Async,一样的效果** @param args arg游戏*/@Overridepublic void run(String... args) {new Thread(() -> {try {nettyServer.start();} catch (Exception e) {log.error("识别服务中netty服务启动报错!", e);}}).start();}@PreDestroypublic void destroy() {if (nettyServer != null) {try {nettyServer.destroy();} catch (Exception e) {log.error("停止netty服务发生异常!", e);}}log.info("netty识别服务已经销毁。。");}
}
启动测试,出现如下就代表服务启动成功了。
. ____ _ __ _ _/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \\\/ ___)| |_)| | | | | || (_| | ) ) ) )' |____| .__|_| |_|_| |_\__, | / / / /=========|_|==============|___/=/_/_/_/:: Spring Boot :: (v2.3.5.RELEASE)2023-05-18 14:36:06.423 INFO 3624 --- [ main] c.e.w.w.WebsocketServerApplication : Starting WebsocketServerApplication on qianpeng with PID 3624 (D:\projects\websocket-max\websocket-server\target\classes started by 钱鹏 in D:\projects\websocket-max)
2023-05-18 14:36:06.425 INFO 3624 --- [ main] c.e.w.w.WebsocketServerApplication : No active profile set, falling back to default profiles: default
2023-05-18 14:36:06.957 INFO 3624 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 9000 (http)
2023-05-18 14:36:06.963 INFO 3624 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2023-05-18 14:36:06.963 INFO 3624 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.39]
2023-05-18 14:36:07.012 INFO 3624 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2023-05-18 14:36:07.012 INFO 3624 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 560 ms
2023-05-18 14:36:07.078 INFO 3624 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'clientInboundChannelExecutor'
2023-05-18 14:36:07.079 INFO 3624 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'clientOutboundChannelExecutor'
2023-05-18 14:36:07.164 INFO 3624 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'defaultSockJsTaskScheduler'
2023-05-18 14:36:07.181 INFO 3624 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'messageBrokerTaskScheduler'
2023-05-18 14:36:07.186 INFO 3624 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'brokerChannelExecutor'
2023-05-18 14:36:07.319 INFO 3624 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 9000 (http) with context path ''
2023-05-18 14:36:07.320 INFO 3624 --- [ main] o.s.m.s.b.SimpleBrokerMessageHandler : Starting...
2023-05-18 14:36:07.320 INFO 3624 --- [ main] o.s.m.s.b.SimpleBrokerMessageHandler : BrokerAvailabilityEvent[available=true, SimpleBrokerMessageHandler [DefaultSubscriptionRegistry[cache[0 destination(s)], registry[0 sessions]]]]
2023-05-18 14:36:07.320 INFO 3624 --- [ main] o.s.m.s.b.SimpleBrokerMessageHandler : Started.
2023-05-18 14:36:07.325 INFO 3624 --- [ main] c.e.w.w.WebsocketServerApplication : Started WebsocketServerApplication in 1.11 seconds (JVM running for 1.619)
2023-05-18 14:36:07.679 INFO 3624 --- [ Thread-4] c.e.w.w.example4.netty.NettyServer : netty服务启动。。正在监听:[/0:0:0:0:0:0:0:0:8081]
websocket的访问地址为:ws://localhost:8081/example4/ws
端口不再是项目的端口了。
另外,启动nettyserver的时候需要另起一个线程,不能直接在主线程中,不然会阻塞在那边的。
欢迎指正!