由浅入深Netty代码调优

news/2024/11/25 17:22:39/

目录

  • 1. 优化
    • 1.1 扩展序列化算法
  • 2 参数调优
    • 2.1 CONNECT_TIMEOUT_MILLIS
    • 2.2 SO_BACKLOG
    • 2.3 ulimit -n
    • 2.4 TCP_NODELAY
    • 2.5 SO_SNDBUF & SO_RCVBUF
    • 2.6 ALLOCATOR
    • 2.7 RCVBUF_ALLOCATOR


1. 优化

在这里插入图片描述

1.1 扩展序列化算法

序列化,反序列化主要用在消息正文的转换上

  • 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
  • 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理

目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下

// 反序列化
byte[] body = new byte[bodyLength];
byteByf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);// 序列化
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();

为了支持更多序列化算法,抽象一个 Serializer 接口

public interface Serializer {// 反序列化方法<T> T deserialize(Class<T> clazz, byte[] bytes);// 序列化方法<T> byte[] serialize(T object);}

提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中

enum SerializerAlgorithm implements Serializer {// Java 实现Java {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {try {ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));Object object = in.readObject();return (T) object;} catch (IOException | ClassNotFoundException e) {throw new RuntimeException("SerializerAlgorithm.Java 反序列化错误", e);}}@Overridepublic <T> byte[] serialize(T object) {try {ByteArrayOutputStream out = new ByteArrayOutputStream();new ObjectOutputStream(out).writeObject(object);return out.toByteArray();} catch (IOException e) {throw new RuntimeException("SerializerAlgorithm.Java 序列化错误", e);}}}, // Json 实现(引入了 Gson 依赖)Json {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);}@Overridepublic <T> byte[] serialize(T object) {return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);}};// 需要从协议的字节中得到是哪种序列化算法public static SerializerAlgorithm getByInt(int type) {SerializerAlgorithm[] array = SerializerAlgorithm.values();if (type < 0 || type > array.length - 1) {throw new IllegalArgumentException("超过 SerializerAlgorithm 范围");}return array[type];}
}

增加配置类和配置文件

public abstract class Config {static Properties properties;static {try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {properties = new Properties();properties.load(in);} catch (IOException e) {throw new ExceptionInInitializerError(e);}}public static int getServerPort() {String value = properties.getProperty("server.port");if(value == null) {return 8080;} else {return Integer.parseInt(value);}}public static Serializer.Algorithm getSerializerAlgorithm() {String value = properties.getProperty("serializer.algorithm");if(value == null) {return Serializer.Algorithm.Java;} else {return Serializer.Algorithm.valueOf(value);}}
}

配置文件

serializer.algorithm=Json

修改编解码器

/*** 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {@Overridepublic void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {ByteBuf out = ctx.alloc().buffer();// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(Config.getSerializerAlgorithm().ordinal());// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);outList.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerAlgorithm = in.readByte(); // 0 或 1byte messageType = in.readByte(); // 0,1,2...int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);// 找到反序列化算法Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];// 确定具体消息类型Class<? extends Message> messageClass = Message.getMessageClass(messageType);Message message = algorithm.deserialize(messageClass, bytes);
//        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
//        log.debug("{}", message);out.add(message);}
}

其中确定具体消息类型,可以根据 消息类型字节 获取到对应的 消息 class

@Data
public abstract class Message implements Serializable {/*** 根据消息类型字节,获得对应的消息 class* @param messageType 消息类型字节* @return 消息 class*/public static Class<? extends Message> getMessageClass(int messageType) {return messageClasses.get(messageType);}private int sequenceId;private int messageType;public abstract int getMessageType();public static final int LoginRequestMessage = 0;public static final int LoginResponseMessage = 1;public static final int ChatRequestMessage = 2;public static final int ChatResponseMessage = 3;public static final int GroupCreateRequestMessage = 4;public static final int GroupCreateResponseMessage = 5;public static final int GroupJoinRequestMessage = 6;public static final int GroupJoinResponseMessage = 7;public static final int GroupQuitRequestMessage = 8;public static final int GroupQuitResponseMessage = 9;public static final int GroupChatRequestMessage = 10;public static final int GroupChatResponseMessage = 11;public static final int GroupMembersRequestMessage = 12;public static final int GroupMembersResponseMessage = 13;public static final int PingMessage = 14;public static final int PongMessage = 15;private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();static {messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);}
}

2 参数调优

2.1 CONNECT_TIMEOUT_MILLIS

  • 属于 SocketChannal 参数

  • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常

  • SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间

@Slf4j
public class TestConnectionTimeout {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap().group(group).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300).channel(NioSocketChannel.class).handler(new LoggingHandler());ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);future.sync().channel().closeFuture().sync(); // 断点1} catch (Exception e) {e.printStackTrace();log.debug("timeout");} finally {group.shutdownGracefully();}}
}

另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect

@Override
public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {// ...// Schedule connect timeout.int connectTimeoutMillis = config().getConnectTimeoutMillis();if (connectTimeoutMillis > 0) {connectTimeoutFuture = eventLoop().schedule(new Runnable() {@Overridepublic void run() {                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;ConnectTimeoutException cause =new ConnectTimeoutException("connection timed out: " + remoteAddress); // 断点2if (connectPromise != null && connectPromise.tryFailure(cause)) {close(voidPromise());}}}, connectTimeoutMillis, TimeUnit.MILLISECONDS);}// ...
}

2.2 SO_BACKLOG

  • 属于 ServerSocketChannal 参数
client server syns queue accept queue bind() listen() connect() 1. SYN SYN_SEND put SYN_RCVD 2. SYN + ACK ESTABLISHED 3. ACK put ESTABLISHED accept() client server syns queue accept queue
  1. 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
  2. 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
  3. 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue

其中

  • 在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制

  • sync queue - 半连接队列

    • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
  • accept queue - 全连接队列

    • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
    • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client

netty 中

可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小

可以通过下面源码查看默认大小

public class DefaultServerSocketChannelConfig extends DefaultChannelConfigimplements ServerSocketChannelConfig {private volatile int backlog = NetUtil.SOMAXCONN;// ...
}

课堂调试关键断点为:io.netty.channel.nio.NioEventLoop#processSelectedKey

oio 中更容易说明,不用 debug 模式

public class Server {public static void main(String[] args) throws IOException {ServerSocket ss = new ServerSocket(8888, 2);Socket accept = ss.accept();System.out.println(accept);System.in.read();}
}

客户端启动 4 个

public class Client {public static void main(String[] args) throws IOException {try {Socket s = new Socket();System.out.println(new Date()+" connecting...");s.connect(new InetSocketAddress("localhost", 8888),1000);System.out.println(new Date()+" connected...");s.getOutputStream().write(1);System.in.read();} catch (IOException e) {System.out.println(new Date()+" connecting timeout...");e.printStackTrace();}}
}

第 1,2,3 个客户端都打印,但除了第一个处于 accpet 外,其它两个都处于 accept queue 中

Tue Apr 21 20:30:28 CST 2020 connecting...
Tue Apr 21 20:30:28 CST 2020 connected...

第 4 个客户端连接时

Tue Apr 21 20:53:58 CST 2020 connecting...
Tue Apr 21 20:53:59 CST 2020 connecting timeout...
java.net.SocketTimeoutException: connect timed out

2.3 ulimit -n

  • 属于操作系统参数

2.4 TCP_NODELAY

  • 属于 SocketChannal 参数

2.5 SO_SNDBUF & SO_RCVBUF

  • SO_SNDBUF 属于 SocketChannal 参数
  • SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)

2.6 ALLOCATOR

  • 属于 SocketChannal 参数
  • 用来分配 ByteBuf, ctx.alloc()

2.7 RCVBUF_ALLOCATOR

  • 属于 SocketChannal 参数
  • 控制 netty 接收缓冲区大小
  • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定

http://www.ppmy.cn/news/82493.html

相关文章

Flink有状态计算的状态容错

状态容错 State Fault Tolerance 首先来说一说状态容错。Flink 支持有状态的计算&#xff0c;可以把数据流的结果一直维持在内存&#xff08;或 disk&#xff09;中&#xff0c;比如累加一个点击数&#xff0c;如果某一时刻计算程序挂掉了&#xff0c;如何保证下次重启的时候&…

CMake常用命令总结与练习

CMake常用命令总结 前言cmake_minimum_required (VERSION XX):CMake最低版本project (demo)&#xff1a;CMake工程名add_executable(main main.c):生成可执行文件aux_source_directory(dir var)&#xff1a;指定源文件放入变量set(val src):指定源文件放入变量include_director…

【AUTOSAR】【以太网】SomeIpTp

目录 一、概述 二、限制与约束 三、功能说明 3.1 SOME/IP帧头 3.1.1 消息类型字段

【设计模式】习题选择

设计模式习题 统一建模语言基础知识 1、在UML提供的图中&#xff0c;&#xff08; &#xff09;用于描述系统与外部系统及用户之间的交互&#xff1b; A、用例图 B、类图 C、对象图 D、部署图 2【单选题】 在UML提供的图中&#xff0c;&#xff08; &#xff09;用于按时间顺序…

测试计划编写说明

第1章 引言 1.1目的 简述本计划的目的,旨在说明各种测试阶段任务、人员分配和时间安排、工作规范等。 测试计划在策略和方法的高度说明如何计划、组织和管理测试项目。测试计划包含足够的信息使测试人员明白项目需要做什么是如何运作的。另外,清晰的文档结构能使任何一个读…

阿里云数据库独有功能

RDS MySQL读写分离如何确保数据读取的时效性 阿里云内部网络会确保同步日志在主实例和只读实例间的实时传输&#xff0c;正常情况下只读实例不会有延迟产生。但受限于MySQL本身的复制机制&#xff0c;若同步日志的应用时间较久&#xff0c;会产生数据同步的延迟&#xff0c;这个…

A small program development similar to the WeChat small program type

A WeChat Mini Program is indeed different from developing a small-scale traditional software program. Let’s look at the steps involved in developing a WeChat Mini Program: Account Setup: To develop a WeChat Mini Program, you’ll first need to sign up for…

sql注入详解+docker靶场搭建举例

本文将详细介绍 SQL 注入攻击的概念、原理、危害以及防御措施&#xff0c;包括常见的 SQL 注入攻击方式&#xff0c;并实际演示 SQL 注入攻击的过程&#xff0c;帮助读者深入了解 SQL 注入攻击并学会防范和应对。 一、SQL 注入攻击概述 1.1 什么是 SQL 注入攻击&#xff1f; …