目录
- 1. 优化
- 1.1 扩展序列化算法
- 2 参数调优
- 2.1 CONNECT_TIMEOUT_MILLIS
- 2.2 SO_BACKLOG
- 2.3 ulimit -n
- 2.4 TCP_NODELAY
- 2.5 SO_SNDBUF & SO_RCVBUF
- 2.6 ALLOCATOR
- 2.7 RCVBUF_ALLOCATOR
1. 优化
1.1 扩展序列化算法
序列化,反序列化主要用在消息正文的转换上
- 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])
- 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理
目前的代码仅支持 Java 自带的序列化,反序列化机制,核心代码如下
// 反序列化
byte[] body = new byte[bodyLength];
byteByf.readBytes(body);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body));
Message message = (Message) in.readObject();
message.setSequenceId(sequenceId);// 序列化
ByteArrayOutputStream out = new ByteArrayOutputStream();
new ObjectOutputStream(out).writeObject(message);
byte[] bytes = out.toByteArray();
为了支持更多序列化算法,抽象一个 Serializer 接口
public interface Serializer {// 反序列化方法<T> T deserialize(Class<T> clazz, byte[] bytes);// 序列化方法<T> byte[] serialize(T object);}
提供两个实现,我这里直接将实现加入了枚举类 Serializer.Algorithm 中
enum SerializerAlgorithm implements Serializer {// Java 实现Java {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {try {ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));Object object = in.readObject();return (T) object;} catch (IOException | ClassNotFoundException e) {throw new RuntimeException("SerializerAlgorithm.Java 反序列化错误", e);}}@Overridepublic <T> byte[] serialize(T object) {try {ByteArrayOutputStream out = new ByteArrayOutputStream();new ObjectOutputStream(out).writeObject(object);return out.toByteArray();} catch (IOException e) {throw new RuntimeException("SerializerAlgorithm.Java 序列化错误", e);}}}, // Json 实现(引入了 Gson 依赖)Json {@Overridepublic <T> T deserialize(Class<T> clazz, byte[] bytes) {return new Gson().fromJson(new String(bytes, StandardCharsets.UTF_8), clazz);}@Overridepublic <T> byte[] serialize(T object) {return new Gson().toJson(object).getBytes(StandardCharsets.UTF_8);}};// 需要从协议的字节中得到是哪种序列化算法public static SerializerAlgorithm getByInt(int type) {SerializerAlgorithm[] array = SerializerAlgorithm.values();if (type < 0 || type > array.length - 1) {throw new IllegalArgumentException("超过 SerializerAlgorithm 范围");}return array[type];}
}
增加配置类和配置文件
public abstract class Config {static Properties properties;static {try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {properties = new Properties();properties.load(in);} catch (IOException e) {throw new ExceptionInInitializerError(e);}}public static int getServerPort() {String value = properties.getProperty("server.port");if(value == null) {return 8080;} else {return Integer.parseInt(value);}}public static Serializer.Algorithm getSerializerAlgorithm() {String value = properties.getProperty("serializer.algorithm");if(value == null) {return Serializer.Algorithm.Java;} else {return Serializer.Algorithm.valueOf(value);}}
}
配置文件
serializer.algorithm=Json
修改编解码器
/*** 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {@Overridepublic void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {ByteBuf out = ctx.alloc().buffer();// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(Config.getSerializerAlgorithm().ordinal());// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);outList.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerAlgorithm = in.readByte(); // 0 或 1byte messageType = in.readByte(); // 0,1,2...int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);// 找到反序列化算法Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];// 确定具体消息类型Class<? extends Message> messageClass = Message.getMessageClass(messageType);Message message = algorithm.deserialize(messageClass, bytes);
// log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
// log.debug("{}", message);out.add(message);}
}
其中确定具体消息类型,可以根据 消息类型字节
获取到对应的 消息 class
@Data
public abstract class Message implements Serializable {/*** 根据消息类型字节,获得对应的消息 class* @param messageType 消息类型字节* @return 消息 class*/public static Class<? extends Message> getMessageClass(int messageType) {return messageClasses.get(messageType);}private int sequenceId;private int messageType;public abstract int getMessageType();public static final int LoginRequestMessage = 0;public static final int LoginResponseMessage = 1;public static final int ChatRequestMessage = 2;public static final int ChatResponseMessage = 3;public static final int GroupCreateRequestMessage = 4;public static final int GroupCreateResponseMessage = 5;public static final int GroupJoinRequestMessage = 6;public static final int GroupJoinResponseMessage = 7;public static final int GroupQuitRequestMessage = 8;public static final int GroupQuitResponseMessage = 9;public static final int GroupChatRequestMessage = 10;public static final int GroupChatResponseMessage = 11;public static final int GroupMembersRequestMessage = 12;public static final int GroupMembersResponseMessage = 13;public static final int PingMessage = 14;public static final int PongMessage = 15;private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();static {messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);}
}
2 参数调优
2.1 CONNECT_TIMEOUT_MILLIS
-
属于 SocketChannal 参数
-
用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
-
SO_TIMEOUT 主要用在阻塞 IO,阻塞 IO 中 accept,read 等都是无限等待的,如果不希望永远阻塞,使用它调整超时时间
@Slf4j
public class TestConnectionTimeout {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap().group(group).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300).channel(NioSocketChannel.class).handler(new LoggingHandler());ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);future.sync().channel().closeFuture().sync(); // 断点1} catch (Exception e) {e.printStackTrace();log.debug("timeout");} finally {group.shutdownGracefully();}}
}
另外源码部分 io.netty.channel.nio.AbstractNioChannel.AbstractNioUnsafe#connect
@Override
public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {// ...// Schedule connect timeout.int connectTimeoutMillis = config().getConnectTimeoutMillis();if (connectTimeoutMillis > 0) {connectTimeoutFuture = eventLoop().schedule(new Runnable() {@Overridepublic void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;ConnectTimeoutException cause =new ConnectTimeoutException("connection timed out: " + remoteAddress); // 断点2if (connectPromise != null && connectPromise.tryFailure(cause)) {close(voidPromise());}}}, connectTimeoutMillis, TimeUnit.MILLISECONDS);}// ...
}
2.2 SO_BACKLOG
- 属于 ServerSocketChannal 参数
- 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
- 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
- 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue
其中
-
在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 2.2 之后,分别用下面两个参数来控制
-
sync queue - 半连接队列
- 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在
syncookies
启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
- 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在
-
accept queue - 全连接队列
- 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
- 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client
netty 中
可以通过 option(ChannelOption.SO_BACKLOG, 值) 来设置大小
可以通过下面源码查看默认大小
public class DefaultServerSocketChannelConfig extends DefaultChannelConfigimplements ServerSocketChannelConfig {private volatile int backlog = NetUtil.SOMAXCONN;// ...
}
课堂调试关键断点为:io.netty.channel.nio.NioEventLoop#processSelectedKey
oio 中更容易说明,不用 debug 模式
public class Server {public static void main(String[] args) throws IOException {ServerSocket ss = new ServerSocket(8888, 2);Socket accept = ss.accept();System.out.println(accept);System.in.read();}
}
客户端启动 4 个
public class Client {public static void main(String[] args) throws IOException {try {Socket s = new Socket();System.out.println(new Date()+" connecting...");s.connect(new InetSocketAddress("localhost", 8888),1000);System.out.println(new Date()+" connected...");s.getOutputStream().write(1);System.in.read();} catch (IOException e) {System.out.println(new Date()+" connecting timeout...");e.printStackTrace();}}
}
第 1,2,3 个客户端都打印,但除了第一个处于 accpet 外,其它两个都处于 accept queue 中
Tue Apr 21 20:30:28 CST 2020 connecting...
Tue Apr 21 20:30:28 CST 2020 connected...
第 4 个客户端连接时
Tue Apr 21 20:53:58 CST 2020 connecting...
Tue Apr 21 20:53:59 CST 2020 connecting timeout...
java.net.SocketTimeoutException: connect timed out
2.3 ulimit -n
- 属于操作系统参数
2.4 TCP_NODELAY
- 属于 SocketChannal 参数
2.5 SO_SNDBUF & SO_RCVBUF
- SO_SNDBUF 属于 SocketChannal 参数
- SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
2.6 ALLOCATOR
- 属于 SocketChannal 参数
- 用来分配 ByteBuf, ctx.alloc()
2.7 RCVBUF_ALLOCATOR
- 属于 SocketChannal 参数
- 控制 netty 接收缓冲区大小
- 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定