java socket bio 改造为 netty nio

news/2024/12/21 5:21:57/

        公司早些时候接入一款健康监测设备,由于业务原因近日把端口暴露在公网后,每当被恶意连接时系统会创建大量线程,在排查问题是发现是使用了厂家提供的服务端demo代码,在代码中使用的是java 原生socket,在发现连接后使用独立线程处理后续通信,占用系统资源造成了服务宕机,因此需要进行改造。

        厂家提供的demo代码如下:

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;public class Demo {public static void main(String[] args) {int port = 8003;if (args.length == 1) {port = Integer.parseInt(args[0]);}ServerSocket ss;try {ss = new ServerSocket(port);}catch (Exception e) {System.out.println("服务端socket失败 port = " + port);return;}System.out.println("启动socket监听 端口:" + port);List<Socket> socketList = new ArrayList<>();while (true) {try {Socket socket = ss.accept();if (socket == null || socket.isClosed()) {socketList.remove(socket);continue;}if (socketList.contains(socket)) {continue;}socketList.add(socket);System.out.println("socket连接 address = " + socket.getInetAddress().toString() + " port = " + socket.getPort());new Thread(new HealthReadThread(socket)).start();}catch (IOException e) {System.out.println(e.getMessage());}}}
}
import java.io.*;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;public class HealthReadThread implements Runnable {private Socket socket;HealthReadThread(Socket socket) {this.socket = socket;}private static String message = "";@Overridepublic void run() {try {//输入InputStream inPutStream = socket.getInputStream();BufferedInputStream bis = new BufferedInputStream(inPutStream);
//            BufferedReader br = new BufferedReader(new InputStreamReader(inPutStream));//输出OutputStream outputStream = socket.getOutputStream();BufferedOutputStream bw = new BufferedOutputStream(outputStream);String ip = socket.getInetAddress().getHostAddress();int port = socket.getPort();String readStr = "";
//            char[] buf;byte[] buf;int readLen = 0;while (true) {if (socket.isClosed()) {break;}buf = new byte[1024];try {readLen = bis.read(buf);if (readLen <= 0) {
//                        System.out.println(Thread.currentThread().getId() + "线程: " + "ip地址:" + ip + " 端口地址:" + port + "暂无接收数据");continue;}System.out.println(Thread.currentThread().getId() + "线程: " + "ip地址:" + ip + " 端口地址:" + port + " 接收到原始命令长度:" + readLen);readStr = StringUtils.byteToHexString(buf, readLen);
//                    readStr = new String(buf ,0 , readLen);} catch (IOException e) {System.out.println(e.getMessage());socket.close();
//                    continue;}if (readStr == null || "".equals(readStr)) {continue;}// 省略业务代码}}catch (Exception e) {System.out.println(e.getMessage());}}
}

使用netty进行改造:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class DeviceNettyServer implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {start();}public void start() {Thread thread = new Thread(() -> {// 配置服务端的NIO线程组EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup(4);ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)// 使用 NIO 方式进行网络通信.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {// 添加自己的处理器ch.pipeline().addLast(new DeviceMsgHandler());}});try {int port1 = 8081;int port2 = 8082;// 绑定一个端口并且同步,生成一个ChannelFuture对象ChannelFuture f1 = b.bind(port1).sync();ChannelFuture f2 = b.bind(port2).sync();log.info("启动监听, 端口:" + port1 + "、" + port2);// 对关闭通道进行监听f1.channel().closeFuture().sync();f2.channel().closeFuture().sync();} catch (Exception e) {log.error("启动监听失败", e);} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}});thread.setName("DeviceNettyServer");thread.start();}
}
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;import java.util.*;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
public class DeviceMsgHandler extends SimpleChannelInboundHandler<ByteBuf> {/*** 已连接的设备*/private static final ConcurrentHashMap<Channel, DeviceDTO> CONNECTION_DEVICE_MAP = new ConcurrentHashMap<>(8);/*** 一旦连接,第一个被执行*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {String remoteAddress = ctx.channel().remoteAddress().toString();log.info("发现连接, remoteAddress: " + remoteAddress);// 发送查询设备信息指令sendQuery(ctx.channel());}/*** 读取数据*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {byte[] bytes = new byte[msg.readableBytes()];msg.readBytes(bytes);// 忽略业务处理代码// 传递给下一个处理器ctx.fireChannelRead(msg);}/*** 连接断开** @param ctx*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) {log.info("连接断开, remoteAddress: " + ctx.channel().remoteAddress());CONNECTION_DEVICE_MAP.remove(ctx.channel());}/*** 连接异常** @param ctx* @param cause*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.info("连接异常, remoteAddress: " + ctx.channel().remoteAddress());CONNECTION_DEVICE_MAP.remove(ctx.channel());}

经过改造后使用了4个worker线程进行读写,消除了原先恶意连接造成线程数无线扩大的问题,使用nio也极大的提高了系统资源利用率。


http://www.ppmy.cn/news/1534777.html

相关文章

一个简单的SQL面试题

最近面试遇到一个SQL题&#xff0c;复述如下&#xff1a; SQL面试题 现在有两张表&#xff0c;结构如下&#xff1a; 学生表&#xff08;student&#xff09; 学号sid姓名name1张三2李四3王五 成绩表&#xff08;score&#xff09; 序号id学号sid科目subject分数score11语…

vue 不是spa 单页面应用吗? 配置路由工作模式为history 后 ,为什么配置Nginx的 try_files 可以根据url 找到对应的文件?

免责申明 记录用&#xff0c;本人主要是后端,可能理解有误 Vue.js 是一个前端框架&#xff0c;主要用于构建单页面应用程序&#xff08;SPA&#xff09;。然而&#xff0c;Nginx 是一个服务器端的应用程序&#xff0c;负责处理 HTTP 请求并返回相应的资源。 当在 Vue.js 应用…

【Qt】控件概述 (1)—— Widget属性

控件概述 1. QWidget核心属性1.1核心属性概述1.2 enable1.3 geometry——窗口坐标1.4 window frame的影响1.4 windowTitle——窗口标题1.5 windowIcon——窗口图标1.6 windowOpacity——透明度设置1.7 cursor——光标设置1.8 font——字体设置1.9 toolTip——鼠标悬停提示设置1…

数据订阅与消费中间件Canal 服务搭建(docker)

MySQL Bin-log开启 进入mysql容器 docker exec -it mysql5.7 bash开启mysql的binlog cd /etc/mysql/mysql.conf.dvi mysqld.cnf #在文件末尾处添加如下配置&#xff08;如果没有这个文件就创建一个&#xff09; [mysqld] # 开启 binlog log-binmysql-bin #log-bin/var/lib/mys…

第18场小白入门赛(蓝桥杯)

第 18 场 小白入门赛 6 武功秘籍 考察进制理解。 对于第 i i i 位&#xff0c;设 b i t i x bit_ix biti​x &#xff0c;每一位的最大值是 b j b_j bj​ &#xff0c;也就是说每一位是 b j 1 b_j1 bj​1 进制 &#xff0c;那么第 i i i 位的大小就是 x ∑ j i 1…

react+antdMobie实现消息通知页面样式

一、实现效果 二、代码 import React, { useEffect, useState } from react; import style from ./style/index.less; import { CapsuleTabs, Ellipsis, Empty, SearchBar, Tag } from antd-mobile; //消息通知页面 export default function Notification(props) {const [opti…

【数据库】 MongoDB 用户分配新的角色和权限

在 MongoDB 中&#xff0c;可以通过简单的命令为用户分配新的角色和权限。这对于调整用户的访问能力和管理数据库安全至关重要。以下是如何为用户分配新的角色和权限的详细步骤。 1. 使用 MongoDB Shell 分配角色 1.1 修改用户角色 要为现有用户分配新的角色&#xff0c;可以…

Zig开发环境搭建

简介 对于程序员来说&#xff0c;最重要的工具之一代码编辑器&#xff0c;一个好用的开发环境能编程过程无比顺畅丝滑&#xff0c;尤其是在学习Zig 这样的新编程语言时。而Visual Studio Code 开发环境就提供了最简单的设置&#xff0c;可以快速获得代码自动补全和代码生成等功…