前言:通过Selector.open() 获取到Selector 的选择器后,服务端和客户的socket 都可以通过register 进行socker 的注册;
服务端 ServerSocketChannel 的注册:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);// 设置非阻塞serverSocketChannel.socket().bind(new InetSocketAddress(8080));// 连接事件注册到多路复用serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
ServerSocketChannel 的类关系:
1)ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl;
2)ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel;
3) SelChImpl extends Channe;
先看下ServerSocketChannel 实例的初始化: ServerSocketChannel.open()
ServerSocketChannel 类:
public static ServerSocketChannel open() throws IOException {return SelectorProvider.provider().openServerSocketChannel();
}
可以看到先拿到了Select.open() 时创建的SelectorProvider 实例,然后在调用openServerSocketChannel;
SelectorProviderImpl 类:
public ServerSocketChannel openServerSocketChannel() throws IOException {// 创建ServerSocketChannelImpl 实例return new ServerSocketChannelImpl(this);
}public SocketChannel openSocketChannel() throws IOException {return new SocketChannelImpl(this);
}
ServerSocketChannelImpl 类:
private static NativeDispatcher nd;private final FileDescriptor fd;private int fdVal;private volatile long thread = 0L;private final Object lock = new Object();private final Object stateLock = new Object();private static final int ST_UNINITIALIZED = -1;private static final int ST_INUSE = 0;private static final int ST_KILLED = 1;private int state = -1;private InetSocketAddress localAddress;private boolean isReuseAddress;ServerSocket socket;ServerSocketChannelImpl(SelectorProvider var1) throws IOException {// 传递创建的 SelectorProvider 实例super(var1);this.fd = Net.serverSocket(true);this.fdVal = IOUtil.fdVal(this.fd);this.state = 0;}
可以看到 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();中 serverSocketChannel 实际指向的是其子类(ServerSocketChannelImpl)对象。
要说明的是:ServerSocketChannel ssc = ServerSocketChannel.open()创建的这个新的Channel中的Socket是最初的,必须对这个Socket通过bind方法绑定指定的地址之后才能接收连接;
再来看服务端socket 的建立: serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.socket():
ServerSocketChannelImpl 类中:
public ServerSocket socket() {synchronized(this.stateLock) {// 加锁保证 同一个ServerSocketChannel socket 的唯一性if (this.socket == null) {this.socket = ServerSocketAdaptor.create(this);}return this.socket;}
}
serverSocketChannel.socket().bind() 绑定要监听的端口;
接下来看serverSocketChannel实例的注册:
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
SelectableChannel 类中register 方法返回注册的SelectionKey:
public final SelectionKey register(Selector sel, int ops)throws ClosedChannelException{return register(sel, ops, null);}
调用: AbstractSelectableChannel 类中register 方法:
public final SelectionKey register(Selector sel, int ops,Object att)throws ClosedChannelException
{synchronized (regLock) {// 获取锁if (!isOpen())// serverSocketChannel 通道是否打开throw new ClosedChannelException();if ((ops & ~validOps()) != 0)// serverSocketChannel 的操作行验证throw new IllegalArgumentException();if (blocking)// 如果建立的serverSocketChannel 通道是阻塞的直接抛出异常throw new IllegalBlockingModeException();// 当前通道有没在 Selector进行过注册,如果已经注册则修改属性后直接返回SelectionKey k = findKey(sel);if (k != null) {k.interestOps(ops);k.attach(att);}// 当前通道没有在当前selector 注册if (k == null) {// New registrationsynchronized (keyLock) {if (!isOpen())throw new ClosedChannelException();// 当前管道注册到selector 选择器上k = ((AbstractSelector)sel).register(this, ops, att);addKey(k);}}return k;}
}
validOps:
public final int validOps() {return SelectionKey.OP_ACCEPT;
}
再来看下 ((AbstractSelector)sel).register(this, ops, att):
实现将当前管道注册到当前的Selector 上并返回SelectionKey
protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {if (!(var1 instanceof SelChImpl)) {throw new IllegalSelectorException();} else {// 新建 SelectionKeyImpl 赋值属性SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);var4.attach(var3);synchronized(this.publicKeys) {// 防止并发多个管道同时 向selector 注册的安全性问题this.implRegister(var4);}var4.interestOps(var2);// 返回新建的SelectionKey 对新return var4;}
}
SelectionKeyImpl的构造方法:可以看出将当前selector 和要注册的管道封装到了SelectionKeyImpl 对象中
final SelChImpl channel;
public final SelectorImpl selector;private int index;private volatile int interestOps;private int readyOps;SelectionKeyImpl(SelChImpl var1, SelectorImpl var2) {this.channel = var1;this.selector = var2;}
继续看 this.implRegister(var4) 具体的注册方法:
WindowsSelectorImpl类
protected void implRegister(SelectionKeyImpl var1) {// 传入上一步封装的SelectionKeyImpl synchronized(this.closeLock) {// 获取selector 的关闭锁,防止正在注册的时候 ,selector关闭if (this.pollWrapper == null) {throw new ClosedSelectorException();} else {// SelectionKeyImpl 管道数组是否需要进行扩容this.growIfNeeded();// 当前管道放入channelArray数组中this.channelArray[this.totalChannels] = var1;var1.setIndex(this.totalChannels);this.fdMap.put(var1);// publicKeys 增加注册的管道SelectionKeythis.keys.add(var1);// 管道增加到 pollWrapperthis.pollWrapper.addEntry(this.totalChannels, var1);// 管道数量增加++this.totalChannels;}}
}
将管道放入到channelArray 和pollWrapper 中,并增加管道的数量;
growIfNeeded 扩容方法:
private void growIfNeeded() {if (this.channelArray.length == this.totalChannels) {// 2 倍扩容int var1 = this.totalChannels * 2;SelectionKeyImpl[] var2 = new SelectionKeyImpl[var1];System.arraycopy(this.channelArray, 1, var2, 1, this.totalChannels - 1);this.channelArray = var2;this.pollWrapper.grow(var1);}// 辅助线程的数量,当注册到selector 的数量每次达到1024个,就将辅助线程+1if (this.totalChannels % 1024 == 0) {this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, this.totalChannels);++this.totalChannels;++this.threadsCount;}}
通过register 可以看出,它的作用就是将管道注册到selector上,selector 中的publicKeys 记录所有注册的SelectionKey;
再来看下客户端的注册:
SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);socketChannel.connect(new InetSocketAddress(8080));// 连接事件注册到多路复用socketChannel.register(selector, SelectionKey.OP_CONNECT);
SocketChannel.open() 打开连接:
public static SocketChannel open() throws IOException {return SelectorProvider.provider().openSocketChannel();
}
SelectorProviderImpl 类中获取socket:
public SocketChannel openSocketChannel() throws IOException {return new SocketChannelImpl(this);
}
SocketChannelImpl 的构造方法:
private static NativeDispatcher nd;private final FileDescriptor fd;private final int fdVal;private volatile long readerThread = 0L;private volatile long writerThread = 0L;private final Object readLock = new Object();private final Object writeLock = new Object();private final Object stateLock = new Object();private boolean isReuseAddress;private static final int ST_UNINITIALIZED = -1;private static final int ST_UNCONNECTED = 0;private static final int ST_PENDING = 1;private static final int ST_CONNECTED = 2;private static final int ST_KILLPENDING = 3;private static final int ST_KILLED = 4;private int state = -1;private InetSocketAddress localAddress;private InetSocketAddress remoteAddress;private boolean isInputOpen = true;private boolean isOutputOpen = true;private boolean readyToConnect = false;private Socket socket;SocketChannelImpl(SelectorProvider var1) throws IOException {super(var1);this.fd = Net.socket(true);this.fdVal = IOUtil.fdVal(this.fd);this.state = 0;}
可以看到 new SocketChannelImpl(this)也仅仅是做了初始化,所以返回的SocketChannel 要有自己建立连接的操作;
继续看 socketChannel.register(selector, SelectionKey.OP_CONNECT);
同样调用AbstractSelectableChannel 的register 完成注册:
public final SelectionKey register(Selector sel, int ops,Object att)throws ClosedChannelException{synchronized (regLock) {if (!isOpen())// 检查SocketChannel 是否建立连接throw new ClosedChannelException();if ((ops & ~validOps()) != 0)// 检查SocketChannel 的操作符throw new IllegalArgumentException();if (blocking)// 检查SocketChannel 通道是否阻塞throw new IllegalBlockingModeException();// 当前SocketChannel 通道是否已经注册到 当前的selector 对象上SelectionKey k = findKey(sel);if (k != null) {k.interestOps(ops);k.attach(att);}if (k == null) {// New registrationsynchronized (keyLock) {if (!isOpen())throw new ClosedChannelException();k = ((AbstractSelector)sel).register(this, ops, att);addKey(k);}}return k;}}
总结:
1 服务端 ServerSocketChannel.open() 和客户端的SocketChannel.open() 只是做了初始化,它们的连接并没有建立;
2 register 方法用来将当前的通道注册到selector对象上,selector 的WindowsSelectorImpl 实例记录了注册的通道信息;