1.概述
Zookeeper作为一个服务器,需要与客户端进行网络通信,Zookeeper使用ServerCnxFactory管理与客户端的连接,其中有两个实现,一个是NIOServerCnxnFactory,使用java原生Nio实现,一个是NettyServerCnxnFactory,使用netty实现。
2.NIOServerCnxnFactory
使用的是java的NIO思路,1个accept Thread,该线程主要接收客户端的连接,并将其分配给selector thread; selector thread:该线程执行select(),由于在处理大量连接时,select()会成为性能瓶颈,因此启动多个selector thread,使用zookeeper.nio.numSelectorThreads来进行配置该类线程数,默认个数为核心数/2;worker thread:该线程执行基本的操作,可以理解为真正的处理线程,使用zookeeper.nio.numWorkerThreads来进行配置该线程的线程数,默认的线程数为:核心线程数*2;connection expiration thread:连接上的session过期,则关闭该连接。
上面是对这个NIOServerCnxnFactory类类上的注释说明
/*** NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using* NIO non-blocking socket calls. Communication between threads is handled via* queues.** - 1 accept thread, which accepts new connections and assigns to a* selector thread* - 1-N selector threads, each of which selects on 1/N of the connections.* The reason the factory supports more than one selector thread is that* with large numbers of connections, select() itself can become a* performance bottleneck.* - 0-M socket I/O worker threads, which perform basic socket reads and* writes. If configured with 0 worker threads, the selector threads* do the socket I/O directly.* - 1 connection expiration thread, which closes idle connections; this is* necessary to expire connections on which no session is established.** Typical (default) thread counts are: on a 32 core machine, 1 accept thread,* 1 connection expiration thread, 4 selector threads, and 64 worker threads.*/
public class NIOServerCnxnFactory extends ServerCnxnFactory
2.1 AcceptThread
下面是AcceptThread线程的源码,我先把里面的实现方法给删除了。该线程的执行流程:run方法执行selector.select(),并进行调用doAccept()接收客户端连接,因此我们需要重点关注doAccept()方法
private class AcceptThread extends AbstractSelectThread {private final ServerSocketChannel acceptSocket;private final SelectionKey acceptKey;private final RateLogger acceptErrorLogger = new RateLogger(LOG);private final Collection<SelectorThread> selectorThreads;private Iterator<SelectorThread> selectorIterator;private volatile boolean reconfiguring = false;public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr, Set<SelectorThread> selectorThreads) throws IOException {}public void run() {}public void setReconfiguring() {reconfiguring = true;}private void select() {