Java NIO实现高性能HTTP代理

server/2024/11/14 16:26:36/
http://www.w3.org/2000/svg" style="display: none;">
NIO采用多路复用IO模型,相比传统BIO(阻塞IO),通过轮询机制检测注册的Channel是否有事件发生,可以实现一个线程处理客户端的多个连接,极大提升了并发性能。
在5年前,本人出于对HTTP正向代理的好奇新,那时候也在学习JAVA,了解到了NIO,就想用NIO写一个正向代理软件,当时虽然实现了正向代理,但是代码逻辑及其混乱,而且没有经过测试也许有不少的bug
近期因为找工作,又复习起了以往的一些JAVA知识,包括JVM内存模型、GC垃圾回收机制等等,其中也包括NIO。现在回头再看NIO,理解也更深刻了一点。
在多路复用IO模型中,会有一个线程不断去轮询多个socket的状态,只有当socket真正有读写事件时,才真正调用实际的IO读写操作。因为在多路复用IO模型中,只需要使用一个线程就可以管理多个socket,系统不需要建立新的进程或者线程,也不必维护这些线程和进程,并且只有在真正有socket 读写事件进行时,才会使用IO资源,所以它大大减少了资源占用。在Java NIO中,是通过selector.select()去查询每个通道是否有到达事件,如果没有事件,则一直阻塞在那里,因此这种方式会导致用户线程的阻塞。多路复用IO模式,通过一个线程就可以管理多个socket,只有当socket 真正有读写事件发生才会占用资源来进行实际的读写操作。因此,多路复用IO比较适合连接数比较多的情况。
本HTTP代理软件只能代理HTTP和HTTPS协议,分享出来共广大网友参考和学习
1.Bootstrap类
此类用于创建和启动一个HTTP代理服务
java">package org.example;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;public class Bootstrap {private final Logger logger = LogManager.getLogger(Bootstrap.class);private AbstractEventLoop serverEventLoop;private int port;public Bootstrap() {port = 8888;serverEventLoop = new ServerEventLoop(this);}public Bootstrap bindPort(int port) {try {this.port = port;this.serverEventLoop.bind(port);} catch (Exception e) {logger.error("open server socket channel error.", e);}return this;}public void start() {serverEventLoop.getSelector().wakeup();logger.info("Proxy server started at port {}.", port);}public AbstractEventLoop getServerEventLoop() {return serverEventLoop;}
}
2.ServerEventLoop
事件循环,单线程处理事件循环。包括客户端的连接和读写请求,目标服务器的连接和读写事件,在同一个事件循环中处理。
java">package org.example;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.example.common.HttpRequestParser;import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;public class ServerEventLoop extends AbstractEventLoop {private final Logger logger = LogManager.getLogger(ServerEventLoop.class);public ServerEventLoop(Bootstrap bootstrap) {super(bootstrap);}@Overrideprotected void processSelectedKey(SelectionKey key) {if (key.isValid() && key.isAcceptable()) {if (key.attachment() instanceof Acceptor acceptor) {acceptor.accept();}}if (key.isValid() && key.isReadable()) {if (key.attachment() instanceof ChannelHandler channelHandler) {channelHandler.handleRead();}}if (key.isValid() && key.isConnectable()) {key.interestOpsAnd(~SelectionKey.OP_CONNECT);if (key.attachment() instanceof ChannelHandler channelHandler) {channelHandler.handleConnect();}}if (key.isValid() && key.isWritable()) {key.interestOpsAnd(~SelectionKey.OP_WRITE);if (key.attachment() instanceof ChannelHandler channelHandler) {channelHandler.handleWrite();}}}@Overridepublic void bind(int port) throws Exception {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);SelectionKey key = serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);key.attach(new Acceptor(serverSocketChannel));serverSocketChannel.bind(new InetSocketAddress(port));}class Acceptor {ServerSocketChannel ssc;public Acceptor(ServerSocketChannel ssc) {this.ssc = ssc;}public void accept() {try {SocketChannel socketChannel = ssc.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ, new ClientChannelHandler(socketChannel));logger.info("accept client connection");} catch (IOException e) {logger.error("accept error");}}}abstract class ChannelHandler {Logger logger;SocketChannel channel;ByteBuffer writeBuffer;public ChannelHandler(SocketChannel channel) {this.logger = LogManager.getLogger(this.getClass());this.channel = channel;this.writeBuffer = null;}abstract void handleRead();public void handleWrite() {doWrite();}public abstract void onChannelClose();public ByteBuffer doRead() {ByteBuffer buffer = ByteBuffer.allocate(4096);try {int len = channel.read(buffer);if (len == -1) {logger.info("read end-of-stream, close channel {}", channel);channel.close();onChannelClose();}if (len > 0) {buffer.flip();}} catch (IOException e) {logger.error("read channel error");try {channel.close();onChannelClose();} catch (IOException ex) {logger.error("close channel error.");}}return buffer;}public void doWrite() {if (writeBuffer != null) {try {while (writeBuffer.hasRemaining()) {channel.write(writeBuffer);}} catch (IOException e) {logger.error("write channel error.");try {channel.close();onChannelClose();} catch (IOException ex) {logger.error("close channel error");}}writeBuffer = null;}}public void handleConnect() {}}class ClientChannelHandler extends ChannelHandler {HttpRequestParser requestParser;private SelectableChannel proxyChannel;public ClientChannelHandler(SocketChannel sc) {super(sc);this.channel = sc;this.requestParser = new HttpRequestParser();this.proxyChannel = null;}@Overridepublic void handleRead() {if (requestParser.isParsed()) {if (proxyChannel != null) {SelectionKey proxyKey = proxyChannel.keyFor(selector);if (proxyKey != null && proxyKey.isValid() && proxyKey.attachment() instanceof ProxyChannelHandler proxyHandler) {//需要等待ProxyHandler的写入缓存为空后才可读取客户端的数据if (proxyHandler.writeBuffer == null) {ByteBuffer buffer = doRead();if (buffer.hasRemaining() && proxyKey.isValid()) {proxyHandler.writeBuffer = buffer;proxyKey.interestOpsOr(SelectionKey.OP_WRITE);}}}}} else {ByteBuffer buffer = doRead();requestParser.putFromByteBuffer(buffer);if (requestParser.isParsed()) {//连接到目标服务器ByteBuffer buf = null;if (requestParser.getMethod().equals(HttpRequestParser.HTTP_METHOD_CONNECT)) {//回写客户端连接成功SelectionKey clientKey = channel.keyFor(selector);if (clientKey != null && clientKey.isValid() && clientKey.attachment() instanceof ClientChannelHandler clientHandler) {clientHandler.writeBuffer = ByteBuffer.wrap((requestParser.getProtocol() + " 200 Connection Established\r\n\r\n").getBytes());clientKey.interestOpsOr(SelectionKey.OP_WRITE);}} else {//将缓存的客户端的数据通过代理转发byte[] allBytes = requestParser.getAllBytes();buf = ByteBuffer.wrap(allBytes);}this.proxyChannel = connect(requestParser.getAddress(), buf);}}}@Overridepublic void onChannelClose() {try {if (proxyChannel != null) {proxyChannel.close();}} catch (IOException e) {logger.error("close channel error");}}private SocketChannel connect(String address, ByteBuffer buffer) {String host = address;int port = 80;if (address.contains(":")) {host = address.split(":")[0].trim();port = Integer.parseInt(address.split(":")[1].trim());}SocketAddress target = new InetSocketAddress(host, port);SocketChannel socketChannel = null;SelectionKey proxyKey = null;int step = 0;try {socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);step = 1;ProxyChannelHandler proxyHandler = new ProxyChannelHandler(socketChannel);proxyHandler.setClientChannel(channel);proxyHandler.writeBuffer = buffer;proxyKey = socketChannel.register(selector, SelectionKey.OP_CONNECT, proxyHandler);proxyKey.interestOpsOr(SelectionKey.OP_WRITE);step = 2;socketChannel.connect(target);} catch (IOException e) {logger.error("connect error.");switch (step) {case 2:proxyKey.cancel();case 1:try {socketChannel.close();} catch (IOException ex) {logger.error("close channel error.");}socketChannel = null;break;}}return socketChannel;}}class ProxyChannelHandler extends ChannelHandler {private SelectableChannel clientChannel;public ProxyChannelHandler(SocketChannel sc) {super(sc);clientChannel = null;}@Overridepublic void handleConnect() {try {if (channel.isConnectionPending() && channel.finishConnect()) {SelectionKey proxyKey = channel.keyFor(selector);proxyKey.interestOpsOr(SelectionKey.OP_READ);}} catch (IOException e) {try {channel.close();onChannelClose();} catch (IOException ex) {logger.error("close channel error.");}logger.error("finish connection error.");}}@Overridepublic void handleRead() {if (clientChannel != null) {SelectionKey clientKey = clientChannel.keyFor(selector);if (clientKey != null && clientKey.isValid() && clientKey.attachment() instanceof ClientChannelHandler clientHandler) {if (clientHandler.writeBuffer == null) {ByteBuffer buffer = doRead();if (buffer.hasRemaining() && clientKey.isValid()) {clientHandler.writeBuffer = buffer;clientKey.interestOpsOr(SelectionKey.OP_WRITE);}}}}}@Overridepublic void onChannelClose() {try {if (clientChannel != null) {clientChannel.close();}} catch (IOException e) {logger.error("close channel error");}}public void setClientChannel(SocketChannel client) {this.clientChannel = client;}}
}
3.AbstractEventLoop
事件循环的抽象类
java">package org.example;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executors;public abstract class AbstractEventLoop implements Runnable {private final Logger logger = LogManager.getLogger(AbstractEventLoop.class);protected Selector selector;protected Bootstrap bootstrap;public AbstractEventLoop(Bootstrap bootstrap) {this.bootstrap = bootstrap;openSelector();Executors.newSingleThreadExecutor().submit(this);}public void bind(int port) throws Exception {throw new Exception("not support");}@Overridepublic void run() {while (true) {try {if (selector.select() > 0) {processSelectedKeys();}} catch (Exception e) {logger.error("select error.", e);}}}private void processSelectedKeys() {Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> iterator = keys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();processSelectedKey(key);}}protected abstract void processSelectedKey(SelectionKey key);public Selector openSelector() {try {this.selector = Selector.open();return this.selector;} catch (IOException e) {logger.error("open selector error.", e);}return null;}public Selector getSelector() {return selector;}
}
4.HttpRequestParser
用于解析HTTP请求报文中的请求头,可以获取主机和端口号
java">package org.example.common;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;public class HttpRequestParser {private final Logger logger = LogManager.getLogger(HttpRequestParser.class);public static final String COLON = ":";public static final String REQUEST_HEADER_HOST_PREFIX = "host:";private UnboundedByteBuffer requestBytes = new UnboundedByteBuffer();private List<String> headers = new ArrayList<>();public static final String HTTP_METHOD_GET = "GET";public static final String HTTP_METHOD_POST = "POST";public static final String HTTP_METHOD_PUT = "PUT";public static final String HTTP_METHOD_DELETE = "DELETE";public static final String HTTP_METHOD_TRACE = "TRACE";public static final String HTTP_METHOD_OPTIONS = "OPTIONS";public static final String HTTP_METHOD_HEAD = "HEAD";public static final String HTTP_METHOD_CONNECT = "CONNECT";private String address;private String protocol;private String method;private boolean parsed = false;private StringBuffer reqHeaderBuffer = new StringBuffer();public void putFromByteBuffer(ByteBuffer buffer) {for (; buffer.hasRemaining(); ) {byte b = buffer.get();requestBytes.addByte(b);reqHeaderBuffer.append((char) b);if (b == '\n' && reqHeaderBuffer.charAt(reqHeaderBuffer.length() - 2) == '\r') {if (reqHeaderBuffer.length() == 2) {parsed = true;logger.debug("Request header line end.");break;}String headerLine = reqHeaderBuffer.substring(0, reqHeaderBuffer.length() - 2);logger.debug("Request header line parsed {}", headerLine);headers.add(headerLine);if (headerLine.startsWith(HTTP_METHOD_GET)|| headerLine.startsWith(HTTP_METHOD_POST)|| headerLine.startsWith(HTTP_METHOD_PUT)|| headerLine.startsWith(HTTP_METHOD_DELETE)|| headerLine.startsWith(HTTP_METHOD_TRACE)|| headerLine.startsWith(HTTP_METHOD_OPTIONS)|| headerLine.startsWith(HTTP_METHOD_HEAD)|| headerLine.startsWith(HTTP_METHOD_CONNECT)) {this.protocol = headerLine.split(" ")[2].trim();this.method = headerLine.split(" ")[0].trim();} else if (headerLine.toLowerCase().startsWith(REQUEST_HEADER_HOST_PREFIX)) {this.address = headerLine.toLowerCase().replace(REQUEST_HEADER_HOST_PREFIX, "").trim();}reqHeaderBuffer.delete(0, reqHeaderBuffer.length());}}}public boolean isParsed() {return parsed;}public String getAddress() {return address;}public String getProtocol() {return protocol;}public String getMethod() {return method;}public byte[] getAllBytes() {return requestBytes.toByteArray();}
}
5.UnboundedByteBuffer
无界的字节缓冲区,每次会以两倍的容量扩容,可以用于追加存入客户端的请求数据,实现粘包
java">package org.example.common;public class UnboundedByteBuffer {private byte[] bytes;private int size;private int cap;private final int DEFAULT_CAP = 4096;private final int MAX_CAP = 1 << 30;public UnboundedByteBuffer() {this.cap = DEFAULT_CAP;this.bytes = new byte[this.cap];this.size = 0;}public void addBytes(byte[] data) {ensureCapacity(data.length);System.arraycopy(data, 0, bytes, size, data.length);this.size += data.length;}private void ensureCapacity(int scale) {if (scale + this.size > this.cap) {int tmpCap = this.cap;while (scale + this.size > tmpCap) {tmpCap = tmpCap << 1;}if (tmpCap > MAX_CAP) {return;}byte[] newBytes = new byte[tmpCap];System.arraycopy(this.bytes, 0, newBytes, 0, this.size);this.bytes = newBytes;}}public byte[] toByteArray() {byte[] ret = new byte[this.size];System.arraycopy(this.bytes, 0, ret, 0, this.size);return ret;}public void addByte(byte b) {ensureCapacity(1);this.bytes[this.size++] = b;}
}
以上实现是在单个事件循环线程中处理所有事件,一个更好的方案是将客户端的Channel和代理服务器与目标服务器的Channel区分开,分别在两个事件循环中处理。基本实现也和本文中的代码大体一致,两者在理论上应该存在性能差距,实际经过本人测试可以每秒处理客户端的上千个连接。代码https://download.csdn.net/download/Yuriey/89971662">传送门

http://www.ppmy.cn/server/141200.html

相关文章

Flink介绍

什么是Flink Apache Flink 是一个开源的流处理框架&#xff0c;用于处理实时数据流和批处理数据。它具有高吞吐量、低延迟和容错性强的特点&#xff0c;适用于各种实时数据处理场景&#xff0c;如实时分析、事件驱动应用和数据管道等。Flink 提供了丰富的 API&#xff0c;支持…

基础数据结构——队列(链表实现)

队列的性质 先进先出&#xff08;FIFO - First In First Out&#xff09;&#xff1a;最先加入队列的元素最先被移出后进后出&#xff08;后来的元素排在队尾&#xff09;只允许在队尾插入元素&#xff0c;在队首删除元素具有先来先服务的特点 链表实现队列 和之前创建链表相…

【ROS2】usb摄像头识别二维码

1、安装依赖 1)安装usb摄像头 ROS包: sudo apt install ros-humble-usb-cam2)安装二维码识别库 sudo apt install libzbar-dev3)安装opencv cv::Mat和sensor_msgs转换库 sudo apt install ros-humble-cv-bridge4)安装pydantic库 pip3 install pydantic==1.10.7注意版…

ImportError: cannot import name ‘packaging‘ from ‘pkg_resources‘ 的参考解决方法

文章目录 写在前面一、问题描述二、解决方法参考链接 写在前面 自己的测试环境&#xff1a; Ubuntu20.04 ROS-Noetic 一、问题描述 自己在通过 pip install 安装module时 &#xff08;使用的是 pip install mmcv&#xff09;遇到如下问题&#xff1a; ImportError: cannot …

rust字符串

Rust字符串 在Rust中&#xff0c;字符串(string)是一种非常重要的数据类型&#xff0c;用于表示文本数据。Rust中的字符串有两种类型&#xff1a;String和&str。 String类型是可变的、堆分配的字符串类型&#xff0c;可以动态地增加或删除字符。例如&#xff0c;下面创建了…

Vite与Vue Cli的区别与详解

它们的功能非常相似&#xff0c;都是提供基本项目脚手架和开发服务器的构建工具。 主要区别 Vite在开发环境下基于浏览器原生ES6 Modules提供功能支持&#xff0c;在生产环境下基于Rollup打包&#xff1b; Vue Cli不区分环境&#xff0c;都是基于Webpack。 在生产环境下&…

初识Electron 进程通信

概述 Electron chromium nodejs native API&#xff0c;也就是将node环境和浏览器环境整合到了一起&#xff0c;这样就构成了桌面端&#xff08;chromium负责渲染、node负责操作系统API等&#xff09; 流程模型 预加载脚本&#xff1a;运行在浏览器环境下&#xff0c;但是…

半导体制造技术导论(第二版)萧宏 第五章集成电路加热工艺答案

本章要求 1. 至少列出三种重要的加热工艺 水平式高温炉加热、垂直式高温炉加热&#xff08;即传统的管式炉加热&#xff09;、快速加热工艺RTP、激光镭射 退火LSA 2. 说明直立式和水平式炉管的基本系统&#xff0c;以及直立式炉管的优点 基本系统&#xff1a;控制系统…