基于Netty实现TCP通信

news/2024/10/25 19:33:46/

创建一个Maven项目添加下面依赖

    <dependencies><!-- 日志依赖 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.32</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.3</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version><optional>true</optional></dependency><dependency><groupId>org.codehaus.jackson</groupId><artifactId>jackson-mapper-asl</artifactId><version>1.9.13</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.78</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.84.Final</version></dependency></dependencies>

编码解码器

package com.example.nettydemo.coder;import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;import java.nio.charset.StandardCharsets;public class NettyEncoder extends MessageToByteEncoder<String> {public NettyEncoder() {}@Overrideprotected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {byte[] byteMsg = msg.getBytes(StandardCharsets.UTF_8);int msgLength = byteMsg.length;ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(4 + byteMsg.length);buf.writeInt(msgLength);buf.writeBytes(byteMsg, 0, msgLength);out.writeBytes(buf);buf.release();}
}
package com.example.nettydemo.coder;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;import java.nio.charset.StandardCharsets;
import java.util.List;@Slf4j
public class NettyDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int beginReader = in.readerIndex();int dataLength = in.readInt();if (in.readableBytes() < dataLength) {in.readerIndex(beginReader);} else {byte[] data = new byte[dataLength];in.readBytes(data);String str = new String(data, 0, dataLength, StandardCharsets.UTF_8);out.add(str);}}
}

服务端

package com.example.nettydemo.server;import com.example.nettydemo.coder.NettyDecoder;
import com.example.nettydemo.coder.NettyEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.ObjectMapper;import java.io.IOException;
import java.util.Map;@Slf4j
public class TcpServer {private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;private ServerBootstrap server;private ChannelFuture channelFuture;private Integer port;public TcpServer(Integer port) {this.port = port;// nio连接处理池this.bossGroup = new NioEventLoopGroup();// 处理事件池this.workerGroup = new NioEventLoopGroup();server = new ServerBootstrap();server.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 自定义处理类ch.pipeline().addLast(new NettyDecoder());ch.pipeline().addLast(new NettyEncoder());ch.pipeline().addLast(new TcpServerHandler());}});server.option(ChannelOption.SO_BACKLOG, 128);server.childOption(ChannelOption.SO_KEEPALIVE, true);}public synchronized void startListen() {try {// 绑定到指定端口channelFuture = server.bind(port).sync();log.info("netty服务器在[{}]端口启动监听", port);} catch (Exception e) {log.error("netty服务器在[{}]端口启动监听失败", port);e.printStackTrace();}}public void sendMessageToClient(String clientIp, Object msg) {Map<String, Channel> channelMap = TcpServerHandler.channelSkipMap.get(port);Channel channel = channelMap.get(clientIp);String sendStr;try {sendStr = OBJECT_MAPPER.writeValueAsString(msg);} catch (JsonGenerationException e) {throw new RuntimeException(e);} catch (IOException e) {throw new RuntimeException(e);}try {log.info("向客户端 {} 发送消息内容:{}", clientIp, sendStr);channel.writeAndFlush(sendStr);} catch (Exception var4) {log.error("向客户端 {} 发送消息失败,消息内容:{}", clientIp, sendStr);throw new RuntimeException(var4);}}public void pushMessageToClients(Object msg) {Map<String, Channel> channelMap = TcpServerHandler.channelSkipMap.get(port);if (channelMap != null && !channelMap.isEmpty()) {channelMap.forEach((k, v) -> sendMessageToClient(k, msg));}}
}
package com.example.nettydemo.server;import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;@Slf4j
public class TcpServerHandler extends SimpleChannelInboundHandler<String> {/*** 用跳表存储连接channel*/public static Map<Integer, Map<String, Channel>> channelSkipMap = new ConcurrentSkipListMap<>();@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("应用程序的监听通道异常!");cause.printStackTrace();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();// 获取每个用户端连接的ipInetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();String clientIp = ipSocket.getAddress().getHostAddress();InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress();// 本地端口做键int localPort = localSocket.getPort();Map<String, Channel> channelMap = channelSkipMap.get(localPort);if (channelMap == null || channelMap.isEmpty()) {channelMap = new HashMap<>(4);}channelMap.put(clientIp, channel);channelSkipMap.put(localPort, channelMap);log.info("应用程序添加监听通道,与客户端:{} 建立连接成功!", clientIp);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {// 获取每个用户端连接的ipChannel channel = ctx.channel();InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress();int localPort = localSocket.getPort();InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();String clientIp = ipSocket.getAddress().getHostAddress();Map<String, Channel> channelMap = channelSkipMap.get(localPort);channelMap.remove(clientIp);log.info("应用程序移除监听通道,与客户端:{} 断开连接!", clientIp);}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {Channel channel = channelHandlerContext.channel();// 获取每个用户端连接的ipInetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();log.info("接收到客户端: {} 应用数据:{}", ipSocket, msg);}
}
package com.example.nettydemo.server;public class ServerTest {public static void main(String[] args) {TcpServer tcpServer = new TcpServer(40001);tcpServer.startListen();while (true) {try {// 每5秒向客户端发送一次 "test-朱上林123"Thread.sleep(5000);tcpServer.pushMessageToClients("test-朱上林123");} catch (Exception e) {e.printStackTrace();}}}
}

客户端

package com.example.nettydemo.client;import com.example.nettydemo.coder.NettyDecoder;
import com.example.nettydemo.coder.NettyEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;import java.io.IOException;@Slf4j
public class TcpClient {private EventLoopGroup group;private ChannelFuture channelFuture;private final String ip;private final Integer port;private final ObjectMapper objectMapper = new ObjectMapper();public TcpClient(String ip, Integer port) {this.ip = ip;this.port = port;}/*** 建立连接**/public synchronized void connectServer() {log.info("开始建立连接,ip:{}, port:{}", ip, port);// 生命nio连接池this.group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();// 配置解码器以及消息处理类b.group(this.group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new NettyEncoder());pipeline.addLast(new NettyDecoder());pipeline.addLast(new TcpClientHandler());}});// 开始连接this.channelFuture = b.connect(ip, port).sync();} catch (Exception var4) {log.error("连接建立失败,ip:{}, port:{}", ip, port);this.group.shutdownGracefully();var4.printStackTrace();}}/*** 关闭连接*/public void close() {this.group.shutdownGracefully();}/*** 发送消息** @param msg*/public synchronized void sendCommonMsg(Object msg) {String sendStr;if (!getConnectStatus()) {connectServer();}try {sendStr = objectMapper.writeValueAsString(msg);} catch (JsonMappingException e) {throw new RuntimeException(e);} catch (JsonGenerationException e) {throw new RuntimeException(e);} catch (IOException e) {throw new RuntimeException(e);}try {log.info("发送消息内容:{}", sendStr);this.channelFuture.channel().writeAndFlush(sendStr);} catch (Exception var4) {log.error("发送消息失败,消息内容:{}", sendStr);throw new RuntimeException(var4);}}/*** 获取当前连接状态*/public Boolean getConnectStatus() {return group != null && !group.isShutdown() && !group.isShuttingDown();}
}
package com.example.nettydemo.client;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class TcpClientHandler extends SimpleChannelInboundHandler<String> {/*** 读取事件** @param channelHandlerContext* @param msg*/@Overridepublic void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) {log.info("服务返回消息 :{}", msg);}/*** 发生异常** @param channelHandlerContext* @param throwable*/@Overridepublic void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) {log.error("通信发生异常:" + throwable.getMessage());channelHandlerContext.close();}
}
package com.example.nettydemo.client;public class TcpClientTest {public static void main(String[] args) {TcpClient tcpClient = new TcpClient("127.0.0.1", 40001);// 客户端连接到服务器后,向服务器发送一条消息:tcpClient.connectServer();tcpClient.sendCommonMsg("我是Client,刚刚是我连接到你的!");}
}

启动服务端和客户端实现通信

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
下课!


http://www.ppmy.cn/news/1249849.html

相关文章

开源免费跨平台数据同步工具-Syncthing

Syncthing是一款开源免费跨平台的文件同步工具&#xff0c;是基于P2P技术实现设备间的文件同步&#xff0c;所以它的同步是去中心化的&#xff0c;即你并不需要一个服务器&#xff0c;故不需要担心这个中心的服务器给你带来的种种限制&#xff0c;而且类似于torrent协议&#x…

WEB渗透—反序列化(六)

Web渗透—反序列化 课程学习分享&#xff08;课程非本人制作&#xff0c;仅提供学习分享&#xff09; 靶场下载地址&#xff1a;GitHub - mcc0624/php_ser_Class: php反序列化靶场课程&#xff0c;基于课程制作的靶场 课程地址&#xff1a;PHP反序列化漏洞学习_哔哩哔_…

Android帝国之进程杀手--lmkd

本文概要 这是Android系统启动的第三篇文章&#xff0c;本文以自述的方式来讲解lmkd进程&#xff0c;通过本文您将了解到lmkd进程在安卓系统中存在的意义&#xff0c;以及它是如何杀进程的。&#xff08;文中的代码是基于android13&#xff09; 我是谁 init&#xff1a;“大…

QT配合CSS隐藏按钮

第一种方法 在Qt的CSS样式表中&#xff0c;使用 visibility 属性来隐藏按钮。设置 visibility 为 hidden 不可见&#xff0c;而设置为 visible 则可见。 隐藏所有 QPushButton QPushButton {visibility: hidden; }隐藏特定的按钮&#xff0c;用按钮的名称或样式类进行定位就…

解码 SQL:深入探索 Antlr4 语法解析器背后的奥秘

探寻SQL的背后机制 前言 在数据领域&#xff0c;SQL&#xff08;Structured Query Language&#xff09;是一门广泛使用的语言&#xff0c;用于查询和处理数据。你可能已经使用过诸如MySQL、Hive、ClickHouse、Doris、Spark和Flink等工具来编写SQL查询。 每一种框架都提供了…

第五届全国高校计算机能力挑战赛-程序设计挑战赛(C语言模拟题)

1、已有定义“int a[10]{1,2},i0;”&#xff0c;下面语句中与“ a[i]a[i1],i;”等价的是()。 A. a[i]a[i1]; B. a[i]a[i]; C. a[i]a[i1]; D. i,a[i-1]a[i]; 2、两次运行下面的程序&#xff0c;如果从键盘上分别输入6和4&#xff0c;则输出结果是(&#xff09;。 A. 7和5 …

指向脚本时报错: ./install-oatpp-modules.sh: 17: Syntax error: “(“ unexpected

报错: ./install-oatpp-modules.sh: 17: Syntax error: “(” unexpected 解决办法 将 #!/bin/sh改为 #!/bin/bash或 将 #!/bin/bash改为 #!/bin/sh报错原因 The function keyword is supported in many modern shells, including Bash, which is an extended version …

CE认证关于电动滑板车安全标准EN17128和电动自行车EN15194电磁兼容测试解析

本标准适用于有或没有自平衡系统的全部或部分由自给式电源供电的个人轻型电动汽车&#xff0c;除无人值守站值守站租用的电动汽车外。自平衡系统完全或部分由最高100VDC电池电压的独立电源供电&#xff0c;并配备或无输入电压高达240VAC的集成电池充电器。该标准规定了与个人轻…