Spring架构篇--2.5.2 远程通信基础Select 源码篇--window--sokcet.register

news/2024/11/25 17:45:44/

前言:通过Selector.open() 获取到Selector 的选择器后,服务端和客户的socket 都可以通过register 进行socker 的注册;

服务端 ServerSocketChannel 的注册:

  ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);// 设置非阻塞serverSocketChannel.socket().bind(new InetSocketAddress(8080));// 连接事件注册到多路复用serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

ServerSocketChannel 的类关系:
1)ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl;
2)ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel;
3) SelChImpl extends Channe;

先看下ServerSocketChannel 实例的初始化: ServerSocketChannel.open()
ServerSocketChannel 类:

public static ServerSocketChannel open() throws IOException {return SelectorProvider.provider().openServerSocketChannel();
}

可以看到先拿到了Select.open() 时创建的SelectorProvider 实例,然后在调用openServerSocketChannel;
SelectorProviderImpl 类:

 public ServerSocketChannel openServerSocketChannel() throws IOException {// 创建ServerSocketChannelImpl 实例return new ServerSocketChannelImpl(this);
}public SocketChannel openSocketChannel() throws IOException {return new SocketChannelImpl(this);
}

ServerSocketChannelImpl 类:

 private static NativeDispatcher nd;private final FileDescriptor fd;private int fdVal;private volatile long thread = 0L;private final Object lock = new Object();private final Object stateLock = new Object();private static final int ST_UNINITIALIZED = -1;private static final int ST_INUSE = 0;private static final int ST_KILLED = 1;private int state = -1;private InetSocketAddress localAddress;private boolean isReuseAddress;ServerSocket socket;ServerSocketChannelImpl(SelectorProvider var1) throws IOException {// 传递创建的 SelectorProvider 实例super(var1);this.fd = Net.serverSocket(true);this.fdVal = IOUtil.fdVal(this.fd);this.state = 0;}

可以看到 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();中 serverSocketChannel 实际指向的是其子类(ServerSocketChannelImpl)对象。
要说明的是:ServerSocketChannel ssc = ServerSocketChannel.open()创建的这个新的Channel中的Socket是最初的,必须对这个Socket通过bind方法绑定指定的地址之后才能接收连接;

再来看服务端socket 的建立: serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.socket():
ServerSocketChannelImpl 类中:

public ServerSocket socket() {synchronized(this.stateLock) {// 加锁保证 同一个ServerSocketChannel socket 的唯一性if (this.socket == null) {this.socket = ServerSocketAdaptor.create(this);}return this.socket;}
}

serverSocketChannel.socket().bind() 绑定要监听的端口;

接下来看serverSocketChannel实例的注册:
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
SelectableChannel 类中register 方法返回注册的SelectionKey:

 public final SelectionKey register(Selector sel, int ops)throws ClosedChannelException{return register(sel, ops, null);}

调用: AbstractSelectableChannel 类中register 方法:

 public final SelectionKey register(Selector sel, int ops,Object att)throws ClosedChannelException
{synchronized (regLock) {// 获取锁if (!isOpen())// serverSocketChannel 通道是否打开throw new ClosedChannelException();if ((ops & ~validOps()) != 0)// serverSocketChannel 的操作行验证throw new IllegalArgumentException();if (blocking)// 如果建立的serverSocketChannel 通道是阻塞的直接抛出异常throw new IllegalBlockingModeException();//  当前通道有没在 Selector进行过注册,如果已经注册则修改属性后直接返回SelectionKey k = findKey(sel);if (k != null) {k.interestOps(ops);k.attach(att);}// 当前通道没有在当前selector 注册if (k == null) {// New registrationsynchronized (keyLock) {if (!isOpen())throw new ClosedChannelException();// 当前管道注册到selector 选择器上k = ((AbstractSelector)sel).register(this, ops, att);addKey(k);}}return k;}
}

validOps:

public final int validOps() {return SelectionKey.OP_ACCEPT;
}

再来看下 ((AbstractSelector)sel).register(this, ops, att):
实现将当前管道注册到当前的Selector 上并返回SelectionKey

protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {if (!(var1 instanceof SelChImpl)) {throw new IllegalSelectorException();} else {// 新建 SelectionKeyImpl 赋值属性SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);var4.attach(var3);synchronized(this.publicKeys) {// 防止并发多个管道同时 向selector 注册的安全性问题this.implRegister(var4);}var4.interestOps(var2);// 返回新建的SelectionKey 对新return var4;}
}

SelectionKeyImpl的构造方法:可以看出将当前selector 和要注册的管道封装到了SelectionKeyImpl 对象中

final SelChImpl channel;
public final SelectorImpl selector;private int index;private volatile int interestOps;private int readyOps;SelectionKeyImpl(SelChImpl var1, SelectorImpl var2) {this.channel = var1;this.selector = var2;}

继续看 this.implRegister(var4) 具体的注册方法:
WindowsSelectorImpl类

 protected void implRegister(SelectionKeyImpl var1) {// 传入上一步封装的SelectionKeyImpl synchronized(this.closeLock) {// 获取selector 的关闭锁,防止正在注册的时候 ,selector关闭if (this.pollWrapper == null) {throw new ClosedSelectorException();} else {// SelectionKeyImpl  管道数组是否需要进行扩容this.growIfNeeded();// 当前管道放入channelArray数组中this.channelArray[this.totalChannels] = var1;var1.setIndex(this.totalChannels);this.fdMap.put(var1);// publicKeys 增加注册的管道SelectionKeythis.keys.add(var1);// 管道增加到 pollWrapperthis.pollWrapper.addEntry(this.totalChannels, var1);// 管道数量增加++this.totalChannels;}}
}

将管道放入到channelArray 和pollWrapper 中,并增加管道的数量;

growIfNeeded 扩容方法:

private void growIfNeeded() {if (this.channelArray.length == this.totalChannels) {// 2 倍扩容int var1 = this.totalChannels * 2;SelectionKeyImpl[] var2 = new SelectionKeyImpl[var1];System.arraycopy(this.channelArray, 1, var2, 1, this.totalChannels - 1);this.channelArray = var2;this.pollWrapper.grow(var1);}// 辅助线程的数量,当注册到selector 的数量每次达到1024个,就将辅助线程+1if (this.totalChannels % 1024 == 0) {this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, this.totalChannels);++this.totalChannels;++this.threadsCount;}}

通过register 可以看出,它的作用就是将管道注册到selector上,selector 中的publicKeys 记录所有注册的SelectionKey;

再来看下客户端的注册:

  SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);socketChannel.connect(new InetSocketAddress(8080));// 连接事件注册到多路复用socketChannel.register(selector, SelectionKey.OP_CONNECT);

SocketChannel.open() 打开连接:

public static SocketChannel open() throws IOException {return SelectorProvider.provider().openSocketChannel();
}

SelectorProviderImpl 类中获取socket:

public SocketChannel openSocketChannel() throws IOException {return new SocketChannelImpl(this);
}

SocketChannelImpl 的构造方法:

private static NativeDispatcher nd;private final FileDescriptor fd;private final int fdVal;private volatile long readerThread = 0L;private volatile long writerThread = 0L;private final Object readLock = new Object();private final Object writeLock = new Object();private final Object stateLock = new Object();private boolean isReuseAddress;private static final int ST_UNINITIALIZED = -1;private static final int ST_UNCONNECTED = 0;private static final int ST_PENDING = 1;private static final int ST_CONNECTED = 2;private static final int ST_KILLPENDING = 3;private static final int ST_KILLED = 4;private int state = -1;private InetSocketAddress localAddress;private InetSocketAddress remoteAddress;private boolean isInputOpen = true;private boolean isOutputOpen = true;private boolean readyToConnect = false;private Socket socket;SocketChannelImpl(SelectorProvider var1) throws IOException {super(var1);this.fd = Net.socket(true);this.fdVal = IOUtil.fdVal(this.fd);this.state = 0;}

可以看到 new SocketChannelImpl(this)也仅仅是做了初始化,所以返回的SocketChannel 要有自己建立连接的操作;
继续看 socketChannel.register(selector, SelectionKey.OP_CONNECT);
同样调用AbstractSelectableChannel 的register 完成注册:

 public final SelectionKey register(Selector sel, int ops,Object att)throws ClosedChannelException{synchronized (regLock) {if (!isOpen())//  检查SocketChannel  是否建立连接throw new ClosedChannelException();if ((ops & ~validOps()) != 0)// 检查SocketChannel 的操作符throw new IllegalArgumentException();if (blocking)// 检查SocketChannel  通道是否阻塞throw new IllegalBlockingModeException();// 当前SocketChannel 通道是否已经注册到 当前的selector 对象上SelectionKey k = findKey(sel);if (k != null) {k.interestOps(ops);k.attach(att);}if (k == null) {// New registrationsynchronized (keyLock) {if (!isOpen())throw new ClosedChannelException();k = ((AbstractSelector)sel).register(this, ops, att);addKey(k);}}return k;}}

总结:
1 服务端 ServerSocketChannel.open() 和客户端的SocketChannel.open() 只是做了初始化,它们的连接并没有建立;
2 register 方法用来将当前的通道注册到selector对象上,selector 的WindowsSelectorImpl 实例记录了注册的通道信息;


http://www.ppmy.cn/news/27205.html

相关文章

手写线程池实例并测试

前言:在之前的文章中介绍过线程池的核心原理,在一次面试中面试官让手写线程池,这块知识忘记的差不多了,因此本篇文章做一个回顾。 希望能够加深自己的印象以及帮助到其他的小伙伴儿们😉😉。 如果文章有什么…

【二叉树】

1,利用类来构建结点,利用函数递归来构建树2,因为左子树的结点编号是父节点的2倍,右子树的结点编号是父节点的2倍1,所以可以用数组模拟建树的过程构建二叉树第一种构建方式class treenode():#二叉树节点def __init__(se…

canvas样式与颜色,字体,图片,状态,形变

色彩 fillStyle color 设置图形的填充颜色。 strokeStyle color 设置图形轮廓的颜色。 备注: 一旦您设置了 strokeStyle 或者 fillStyle 的值,那么这个新值就会成为新绘制的图形的默认值。如果你要给每个图形上不同的颜色,你需要重新设置…

JAVA练习58-汉明距离、颠倒二进制位

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 目录 前言 一、题目1-汉明距离 1.题目描述 2.思路与代码 2.1 思路 2.2 代码 二、题目2-颠倒二进制位 1.题目描述 2.思路与代码 2.1 思路 2.2 代码 总结 前言 提示…

【第一章 - 绪论】- 数据结构(近八千字详解)

目录 一、 数据结构的研究内容 二、基本概念和术语 2.1 - 数据、数据元素、数据项和数据对象 2.2 - 数据结构 2.2.1 - 逻辑结构 2.2.2 - 存储结构 2.3 - 数据类型和抽象数据类型 三、抽象数据类型的表现与实现 四、算法和算法分析 4.1 - 算法的定义及特性 4.2 - 评价…

Scala的变量声明

文章目录变量声明(一)简单说明(二)利用val声明变量1,声明方式2,案例演示(三)利用var声明变量1,声明方式2,案例演示(四)换行输入语句&a…

acwing 2058笨拙的手指

acwing 2058笨拙的手指 奶牛贝茜正在学习如何在不同进制之间转换数字。 但是她总是犯错误,因为她无法轻易的用两个前蹄握住笔。 每当贝茜将数字转换为一个新的进制并写下结果时,她总是将其中的某一位数字写错。 例如,如果她将数字 14 转…

[qiankun]实战问题汇总

[qiankun]实战问题汇总ERROR SyntaxError: Cannot use import statement outside a module问题分析解决方案子应用命名问题问题分析解决方案jsonpFunction详细错误信息问题分析解决方案微应用的注册问题Uncaught Error: application cli5-beta6-test-name died in status LOADI…