Reactor线程模型
- 1. 概念
- 2.角色
- 3.模型类型
- 单Reactor-单线程
- 单Reactor-多线程
- ⭐主从Reactor-多线程
- Code design(参考zk)
- 1. 创建服务, 创建服务上下文工厂
- 2. 初始化服务上下文工厂配置
- 3. 启动服务, 从里往外(worker > selector > accept)依次进行启动线程
- 4. Accept Thread中处理客户端请求连接
- 5.SelectorThread中检测IO事件
- 6.Worker线程处理IO时间
1. 概念
一种并发编程模型, 是一种思想, 也叫1+M+N线程模式, 应用广泛, 如Nginx、Memcached、Netty等
2.角色
Reactor: 负责监听和分配事件, 将IO事件分派给对应的Handler, 新的事件包括连接建立就绪、读就绪、写就绪等
Acceptor: 处理客户端新连接, 并分派请求到处理器中
Handler: 将自身与事件绑定, 执行非阻塞IO任务, 完成channel读写, 一级业务逻辑
3.模型类型
单Reactor-单线程
缺点: 所有接受连接, 处理数据操作都在一个线程中完成, 有性能瓶颈
缺点: 将比较耗时的数据编码、解码、计算等操作放入线程池中执行, 虽然提升性能但还不是最好的方式
单Reactor-多线程
⭐主从Reactor-多线程
主从多线程, 对于服务器来说, 接收客户端的链接单独用线程操作
工作流程:
- Reactor主线程MainReactor通过select监听客户端连接时间, 接收到事件后, 通过Acceptor处理客户端链接事件
- 当Acceptor处理完成链接事件后, MainReactor将连接分配到SubReactor
- SubReactor将连接加入到自己的连接队列进行监听, 并创建Handler各种事件进行处理
- 当连接上有新事件发生, Subreactor就会调用对应的Handler处理
- Handler通过read从通道上获取数据, 将请求分发到Worker线程池处理业务
- Worker线程池会分配独立线程完成业务处理, 并将处理请求返回给Handler, Handler通过send向客户端响应数据
- 一个MainReactor可以对应多个SubReactor, 即一个MainReactor线程对应多个SubReactor线程
Code design(参考zk)
1. 创建服务, 创建服务上下文工厂
Server server = new Server();
CountDownLatch latch = new CountDownLatch(1);
server.registerShutDownHandler(new ServerShutDownHandler(latch));// 创建连接管理器工厂
if (config.getClientPortAddress() != null) {cnxnFactory = ServerCnxnFactory.createFactory();// 初始化配置cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
2. 初始化服务上下文工厂配置
// 初始化最大连接数
this.maxClientCnxns = maxClientCnxns;
initMaxCnxns();// 准备相关资源 获取cpu核数
int coreNum = Runtime.getRuntime().availableProcessors();
// 计算用于检测客户端IO事件的线程数量
numSelectorThreads = Integer.getInteger(NIO_NUM_SELECTOR_THREADS,Math.max((int) Math.sqrt((double) coreNum / 2), 1));
// 计算用于处理客户端IO事件的线程数量
numWorkerThreads = Integer.getInteger(NIO_NUM_WORKER_THREADS, 2 * coreNum);//准备好SelectThread
for (int i = 0; i < numSelectorThreads; i++) {selectorThreads.add(new SelectorThread(i));
}
// 打开一个服务端的ServerSocketChannel
this.ss = ServerSocketChannel.open();
this.ss.configureBlocking(false);
// 关闭立即释放端口
ss.socket().setReuseAddress(true);
if ((listenBacklog = clientPortListenBacklog) == -1) {ss.socket().bind(clientPortAddress);
} else {ss.socket().bind(clientPortAddress, listenBacklog);
}
//创建AcceptThread
acceptThread = new AcceptThread(this.ss, selectorThreads, clientPortAddress);
创建固定长度Selector IO检测线程, 创建ServerSocketChannel, listenBacklog请求队列大小, 创建accept线程
3. 启动服务, 从里往外(worker > selector > accept)依次进行启动线程
// 初始化配置
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
//启动连接管理器
cnxnFactory.startup(server);
// 服务停止标识
stopped = false;
// 启动IO工作线程池
if (workerPool == null) {workerPool = new IOWorkerService(numWorkerThreads, "IOWorker");
}// 启动selector thread
for (SelectorThread selectorThread : selectorThreads) {if (selectorThread.getState() == Thread.State.NEW) {selectorThread.start();}
}
// 启动accept thread
if (acceptThread.getState() == Thread.State.NEW) {acceptThread.start();
}
4. Accept Thread中处理客户端请求连接
/*** AcceptThread 检测是否有新连接*/
@Override
public void run() {try {while (!stopped && !serverSocketChannel.socket().isClosed()) {//执行select程序select();}} finally {closeSelector();}
}private void select() {try {selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (!stopped && iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (!key.isValid()) {continue;}// 判断是否是acceptif (key.isAcceptable()) {// 接收连接if (!doAccept()) {pauseAccept(10);}}}} catch (IOException e) {e.printStackTrace();}
}/*** 接收新连接,并绑定到一个selector thread** @return*/
private boolean doAccept() {boolean accepted = false;SocketChannel socketChannel = null;try {socketChannel = serverSocketChannel.accept();accepted = true;// TODO cnxn over limit// 设置非阻塞socketChannel.configureBlocking(false);// 将当前连接 绑定给一个 selector thread round robinif (!selectorIterator.hasNext()) {selectorIterator = selectorThreads.iterator();}SelectorThread selectorThread = selectorIterator.next();if (!selectorThread.addAcceptedConnection(socketChannel)) {throw new IOException("unable to add connection to selector thread");}} catch (IOException e) {e.printStackTrace();fastCloseSock(socketChannel);}return accepted;
}
doAccept为主要处理连接接收代码, selector接收到客户端请求, 接收到一个SocketChannel, 将其添加到selecor thread(IO检测线程)中, 其中获取selecor thread时采用迭代器实现轮询(round robin)的效果, 目的将SocketChannel平均分配到selecor thread的队列中
5.SelectorThread中检测IO事件
@Override
public void run() {while (!stopped) {try {// IO selectselect();// 处理新接收的连接processAcceptedConnections();processInterestOpsUpdateRequests();} catch (Exception e) {e.printStackTrace();}}//// Close connections still pending on the selector. Any others// with in-flight work, let drain out of the work queue.for (SelectionKey key : selector.keys()) {NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();if (cnxn.isSelectable()) {cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);}cleanupSelectionKey(key);}SocketChannel accepted;while ((accepted = acceptedQueue.poll()) != null) {fastCloseSock(accepted);}
}private void select() {try {selector.select();Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();while (!stopped && iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();if (!key.isValid()) {cleanupSelectionKey(key);continue;}//检测是否有 IOif (key.isReadable() || key.isWritable()) {handleIO(key);}}} catch (IOException e) {e.printStackTrace();}
}private void processAcceptedConnections() {SocketChannel sc;while (!stopped && (sc = acceptedQueue.poll()) != null) {SelectionKey key = null;try {key = sc.register(selector, SelectionKey.OP_READ);// 封装连接NIOServerCnxn cnxn = createConnection(sc, key, this);key.attach(cnxn);} catch (IOException e) {e.printStackTrace();cleanupSelectionKey(key);fastCloseSock(sc);}}
}// 包装 selectionKey, 提交给IOWorkPool
private void handleIO(SelectionKey key) {// 封装 IOWorkRequestIOWorkRequest ioWorkRequest = new IOWorkRequest(this, key);NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();// Stop selecting this key while processing on its connectioncnxn.disableSelectable();// IO处理是在异步线程中,Select线程会不断select该key ,在当前IO未完成前暂停检测key.interestOps(0);//交给 workerPoolworkerPool.schedule(ioWorkRequest);
}
select轮序检查有无新的读写事件, processAcceptedConnections处理新连接的时间绑定, 在select中遍历获取到的SelectionKey, 进行IO操作, 主要实现在handleIO中; 注意的是IO处理是在异步线程中, Select线程会不断select该key, 可以使用interestOps方法标记SelectionKey没有任何兴趣操作, selector.select()时就不会检测到该SelectionKey, 最终将上下文信息封装到IOWorkRequest进行工作线程的提交
6.Worker线程处理IO时间
// 处理 IOWork
public void schedule(IOWork ioWork) {if (stopped) {ioWork.cleanup();return;}// 将IOWork 包装成一个handlerIOWorkHandler ioWorkHandler = new IOWorkHandler(ioWork);if (ioWorkerPool != null) {try {ioWorkerPool.execute(ioWorkHandler);} catch (RejectedExecutionException e) {e.printStackTrace();ioWork.cleanup();}} else {ioWorkHandler.run();}
}
将ioWork封装到IOWorkHandler中提交事务, IOWorkHandler.run()->IOWork.doWork()
// 真正处理 IO
@Override
public void doWork() throws Exception {if (!key.isValid()) {selectorThread.cleanupSelectionKey(key);return;}if (key.isReadable() || key.isWritable()) {// 执行IO操作cnxn.doIO(key);if (stopped) {cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);return;}if (!key.isValid()) {selectorThread.cleanupSelectionKey(key);return;}}// Mark this connection as once again ready for selectioncnxn.enableSelectable();cnxn.requestInterestOpsUpdate();
}
/*** Handles read/write IO on connection.*/
public void doIO(SelectionKey key) {try {if (!isSocketOpen()) {return;}//处理读事件if (key.isReadable()) {int i = sc.read(incomingBuffer);if (i < 0) {handleFailedRead();}// 等到 incomingBuffer 读满if (incomingBuffer.remaining() == 0) {boolean isBody = false;// 如果header 读满if (incomingBuffer == headerBuffer) {incomingBuffer.flip();// 读 body 的长度isBody = readHeader(key);incomingBuffer.clear();} else {isBody = true;}// 如果 body 读满if (isBody) {readBody();} else {return;}}}//处理可写事件if (key.isWritable()) {handleWrite(key);}} catch (EndOfStreamException e) {e.printStackTrace();close(e.getReason());} catch (IOException e) {e.printStackTrace();close(DisconnectReason.IO_EXCEPTION);}}
获取客户端数据, readBody()>doRequest()>processPacket(), 讲读到的数据封装到BusinessHandler中, 提交业务线程
/*** 服务端处理数据** @param cnxn* @param incomingBuffer*/public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) {// 封装 requestRequest request = new Request(cnxn, incomingBuffer);// TODO throttled// submit requestbusinessService.submitRequest(request);}
/*** 提交请求** @param request*/
public void submitRequest(Request request) {BusinessHandler handler = new BusinessHandler(request);executorService.execute(handler);
}public class BusinessHandler implements Runnable {private final ServerCnxn cnxn;private final Request request;public BusinessHandler(Request request) {this.request = request;this.cnxn = request.getCnxn();}@Overridepublic void run() {try {Serializer serializer = server.getSerializer();ByteBuffer reqBuf = this.request.getRequest();byte[] bytes = new byte[reqBuf.remaining()];reqBuf.get(bytes);RequestData requestData = null;if (serializer instanceof RequestSerializer) {RequestSerializer requestSerializer = (RequestSerializer) serializer;requestData = requestSerializer.decodeRequest(bytes);}System.out.println("服务端接收到的请求数据为:" + requestData);SecureRandom secureRandom = new SecureRandom();int i = secureRandom.nextInt(2000);System.out.println("即将模拟请求执行耗费" + i + "毫秒");Thread.currentThread().sleep(i);//回写数据ResponseData response = new ResponseData();response.setId(requestData.getId());response.setStatus(200);response.setMsg("hello " + i);System.out.println("服务端要返回的响应为:" + response);if (serializer instanceof ResponseSerializer) {ResponseSerializer rs = (ResponseSerializer) serializer;byte[] rsbytes = rs.encodeResponse(response);ByteBuffer buffer = ByteBuffer.allocate(4 + rsbytes.length);buffer.putInt(rsbytes.length);buffer.put(rsbytes);buffer.flip();//发送buffercnxn.sendBuffer(buffer);// TODO throttled}} catch (Exception e) {e.printStackTrace();}}
}
在业务线程BusinessHandler中, 反序列化接收到的数据, 处理业务逻辑, 最终在上下文中写出响应数据
ServerCnxn.sendBuffer() > requestInterestOpsUpdate() > SelectorThread.addInterestOpsUpdateRequest() 最终将要写出的数据写入到IO检测线程的update队列中, 在执行一遍handlerIO操作, 将数据写回到客户端
线程比例1(accept thread) : M(selector thread): N(busness thread)
变种reactor线程比例1(accept thread) : M(selector thread) : N(ioWorker thread) : K(busness thread)