在单线程环境下一个线程虽然可以执行任务但是所有的任务都交给一个线程来做当任务积累起来时,前面的任务会影响后续任务的执行,并且现在都是多核处理器我们需要尽可能利用cpu所以多线程 的优化就是一个不错的选择。
我们选择多线程后可以对任务进行分类,一个线程只执行某一特定任务,比如我们设置boss线程来处理客户端的连接请求,设置worker线程来处理客户端的读写操作等
但是引入多线程后线程间的执行顺利我们需要控制,比如当boss线程接收到一个请求后执行SocketChannel sc = ssc.accept();语句获取到连接,但是还需要将这个连接交给worker线程来处理读写请求,这就要求我们一定存在worker线程并且正在工作,但是工作状态的worker线程在没有任务进来的情况下会被多路选择器(selector)阻塞在select方法处,而我们又需要将连接sc注册到worker的selector上让它去监听这个连接上的事件,由于selector被阻塞我们无法完成注册,这就尬住了。
解决方法就是我们控制register和select方法,使register方法能再select方法执行后执行,我们这时可以将register方法放到worker线程的注册方法中执行,并且唤醒一次selector就可以执行到register了
package cn.itcast.mytest;import javafx.concurrent.Worker;
import lombok.extern.slf4j.Slf4j;import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;import static cn.itcast.nio.c2.ByteBufferUtil.debugAll;@Slf4j
public class MultiThreadServer {public static void main(String[] args) throws IOException {//创建一个boss线程,专门用来监听有没有连接请求Thread.currentThread().setName("boss");//开启服务器ServerSocketChannel ssc = ServerSocketChannel.open();ssc.configureBlocking(false);ssc.bind(new InetSocketAddress(8080));//创建一个selectorSelector boss = Selector.open();SelectionKey bossKey = ssc.register(boss, SelectionKey.OP_ACCEPT, null);//创建多个worker线程(固定数量的),专门用来处理读写事件Worker[] workers = new Worker[2];for (int i = 0; i < workers.length; i++) {workers[i] = new Worker("worker-" + i);}//不断轮询看是否有新客户接入while(true){boss.select();//有新连接接入Iterator<SelectionKey> iterator = boss.selectedKeys().iterator();while(iterator.hasNext()){SelectionKey key = iterator.next();//删除任务iterator.remove();if(key.isAcceptable()){//获取连接SocketChannel sc = ssc.accept();sc.configureBlocking(false);//将连接交给worker线程处理int index = (int) (sc.hashCode() % workers.length);log.debug("分配给worker-{}", index);workers[index].register(sc);//这个方法是在boss线程中被调用的,所以register方法中的内容实际还是由boss线程执行log.debug("connected...{}", sc.getRemoteAddress());}}}}static class Worker implements Runnable{private Thread thread;private Selector worker;private String name;private boolean start = false;//保证我们一个worker线程只创建一次线程,避免创建太多线程private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();public Worker(String name) {this.name = name;}public void register(SocketChannel sc) throws IOException {if (!start) {this.thread = new Thread(this, name);this.worker = Selector.open();thread.start();start = true;}//将channel注册到worker的selector上,为了这条语句不被select()阻塞我们需要他在select()方法前执行,所以最好是在同一个线程被执行这样我们容易控制先后关系//通过线程安全的队列来实现,向队列中添加任务但先不执行queue.add(() -> {try {sc.register(worker, SelectionKey.OP_READ, null);} catch (ClosedChannelException e) {throw new RuntimeException(e);}});//因为在thread.start();后已经执行了worker.select();方法所以我们需要主动唤醒select来执行registerworker.wakeup();}@Overridepublic void run() {//worker的职责就是专门关注读写事件while(true) {try {//监听worker.select();Runnable task = queue.poll();if(task != null){task.run();//执行注册任务}//获取事件集Iterator<SelectionKey> iterator = worker.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey workerKey = iterator.next();iterator.remove();if (workerKey.isReadable()) {//读取数据try {ByteBuffer buffer = ByteBuffer.allocate(16);SocketChannel sc = (SocketChannel) workerKey.channel();log.debug("read...{}", sc.getRemoteAddress());int read = sc.read(buffer);if (read == -1) {//客户端断开连接workerKey.cancel();} else {buffer.flip();debugAll(buffer);}} catch (IOException e) {e.printStackTrace();}}}} catch (IOException e) {throw new RuntimeException(e);}}}}
}