Hadoop DataNode Startup Source Code Analysis
Adding Dependencies
To compile and run Hadoop's DataNode component, we need to add the following dependencies to the project's pom.xml file:
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs-client</artifactId>
        <version>3.1.3</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
Introduction to the DataNode Class
The DataNode class is a core component of the Hadoop Distributed File System (HDFS). It is responsible for storing the file system's data blocks, and a deployment can contain one or more DataNode instances. A DataNode communicates with the NameNode to report its storage state and responds to instructions from the NameNode, such as deleting or replicating blocks. In addition, a DataNode also interacts with client code and with other DataNodes.
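The three server-side components discussed in the rest of this article (the streaming data transfer service, the HTTP info server, and the RPC server) each bind to a configurable address. As a quick orientation, here is a small standalone sketch, not part of the Hadoop source, that prints those addresses; the defaults shown are the usual Hadoop 3.x ports and should be treated as assumptions about your environment:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class PrintDataNodePorts {
  public static void main(String[] args) {
    // HdfsConfiguration loads hdfs-default.xml / hdfs-site.xml from the classpath
    Configuration conf = new HdfsConfiguration();
    // Streaming data transfer (DataXceiver) address
    System.out.println("data port : " + conf.get("dfs.datanode.address", "0.0.0.0:9866"));
    // Web UI / JMX address
    System.out.println("http port : " + conf.get("dfs.datanode.http.address", "0.0.0.0:9864"));
    // Client- and NameNode-facing RPC (IPC) address
    System.out.println("ipc port  : " + conf.get("dfs.datanode.ipc.address", "0.0.0.0:9867"));
  }
}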
DataNode Main Entry Point
The DataNode program entry point is the main method, which first checks the command-line arguments for a help request and then calls secureMain to initialize the DataNode instance.
public static void main(String args[]) {
  if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
    System.exit(0);
  }
  secureMain(args, null);
}

public static void secureMain(String args[], SecureResources resources) {
  int errorCode = 0;
  try {
    StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
    DataNode datanode = createDataNode(args, null, resources);
    ... ...
  } catch (Throwable e) {
    LOG.error("Exception in secureMain", e);
    terminate(1, e);
  } finally {
    LOG.warn("Exiting Datanode");
    terminate(errorCode);
  }
}
DataNode Instantiation
The createDataNode method instantiates the DataNode object and starts its daemon threads.
public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  // Instantiate the DataNode
  DataNode dn = instantiateDataNode(args, conf, resources);
  if (dn != null) {
    // Start the DataNode daemon threads
    dn.runDatanodeDaemon();
  }
  return dn;
}

public static DataNode instantiateDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  ... ...
  return makeInstance(dataLocations, conf, resources);
}

static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
  ... ...
  return new DataNode(conf, locations, storageLocationChecker, resources);
}
DataNode Constructor
The constructor initializes the DataNode's main fields and starts the required components.
DataNode(final Configuration conf,
         final List<StorageLocation> dataDirs,
         final StorageLocationChecker storageLocationChecker,
         final SecureResources resources) throws IOException {
  super(conf);
  ... ...
  try {
    hostName = getHostName(conf);
    LOG.info("Configured hostname is {}", hostName);
    // Start the DataNode
    startDataNode(dataDirs, resources);
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }
  ... ...
}
DataNode Startup Process
The startDataNode method initializes the DataNode's key components, including the data storage layer (DataStorage), MXBean registration, the DataXceiver server, the HTTP server, and more.
void startDataNode(List<StorageLocation> dataDirectories,
                   SecureResources resources) throws IOException {
  ... ...
  // Create the data storage object
  storage = new DataStorage();

  // global DN settings
  registerMXBean();
  // Initialize the DataXceiver server
  initDataXceiver();
  // Start the HTTP info server
  startInfoServer();

  pauseMonitor = new JvmPauseMonitor();
  pauseMonitor.init(getConf());
  pauseMonitor.start();

  // BlockPoolTokenSecretManager is required to create ipc server.
  this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();

  // Login is done by now. Set the DN user name.
  dnUserName = UserGroupInformation.getCurrentUser().getUserName();
  LOG.info("dnUserName = {}", dnUserName);
  LOG.info("supergroup = {}", supergroup);

  // Initialize the RPC server
  initIpcServer();

  metrics = DataNodeMetrics.create(getConf(), getDisplayName());
  peerMetrics = dnConf.peerStatsEnabled ?
      DataNodePeerMetrics.create(getDisplayName(), getConf()) : null;
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  ecWorker = new ErasureCodingWorker(getConf(), this);
  blockRecoveryWorker = new BlockRecoveryWorker(this);

  // Create the BlockPoolManager
  blockPoolManager = new BlockPoolManager(this);
  // Heartbeat management: refresh the list of NameNodes
  blockPoolManager.refreshNamenodes(getConf());

  // Create the ReadaheadPool from the DataNode context so we can
  // exit without having to explicitly shutdown its thread pool.
  readaheadPool = ReadaheadPool.getInstance();

  saslClient = new SaslDataTransferClient(dnConf.getConf(),
      dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
  saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
  startMetricsLogger();

  if (dnConf.diskStatsEnabled) {
    diskMetrics = new DataNodeDiskMetrics(this,
        dnConf.outliersReportIntervalMs);
  }
}
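Several of the components created in startDataNode follow the same init-then-start service lifecycle. As a small illustration, and only as a sketch outside the DataNode code, the JvmPauseMonitor used above can be driven on its own like this:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.JvmPauseMonitor;

public class PauseMonitorDemo {
  public static void main(String[] args) throws InterruptedException {
    Configuration conf = new Configuration();
    JvmPauseMonitor pauseMonitor = new JvmPauseMonitor();
    pauseMonitor.init(conf);   // same lifecycle calls as in startDataNode()
    pauseMonitor.start();      // spawns the background monitoring thread
    Thread.sleep(10_000);      // let it watch for long JVM/GC pauses for a while
    pauseMonitor.stop();       // shut the service down cleanly
  }
}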
Initializing the DataXceiverServer
The initDataXceiver method creates and starts the DataXceiverServer, the service the DataNode uses to receive data sent by clients and by other DataNodes.
private void initDataXceiver() throws IOException {
  // dataXceiverServer is the service the DN uses to receive data
  // sent by clients and by other DataNodes
  this.dataXceiverServer = new Daemon(threadGroup, xserver);
  this.threadGroup.setDaemon(true); // auto destroy when empty
  ... ...
}
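The Daemon used here is simply a Thread subclass from org.apache.hadoop.util that marks itself as a daemon thread, so a running DataXceiverServer does not keep the JVM alive on its own. A minimal sketch of that wrapping pattern, with a stand-in Runnable instead of the real DataXceiverServer, might look like this:
import org.apache.hadoop.util.Daemon;

public class DaemonPatternDemo {
  public static void main(String[] args) throws InterruptedException {
    ThreadGroup threadGroup = new ThreadGroup("dataXceiverServer");
    threadGroup.setDaemon(true); // auto destroy when empty, as in initDataXceiver()

    // Stand-in for the real DataXceiverServer Runnable that accepts connections
    Runnable xserver = () -> {
      while (!Thread.currentThread().isInterrupted()) {
        // a real server would accept a connection and hand it to a DataXceiver thread here
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    };

    Daemon dataXceiverServer = new Daemon(threadGroup, xserver);
    dataXceiverServer.start(); // runs as a daemon thread; JVM exit is not blocked
    Thread.sleep(3000);        // main thread exits afterwards and the daemon dies with it
  }
}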
Initializing the HTTP Server
The startInfoServer method initializes and starts the HTTP server, which serves information about the DataNode.
private void startInfoServer() throws IOException {
  // SecureDataNodeStarter will bind the privileged port to the channel if
  // the DN is started by JSVC, pass it along.
  ServerSocketChannel httpServerChannel = secureResources != null ?
      secureResources.getHttpServerChannel() : null;

  httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);
  httpServer.start();
  if (httpServer.getHttpAddress() != null) {
    infoPort = httpServer.getHttpAddress().getPort();
  }
  if (httpServer.getHttpsAddress() != null) {
    infoSecurePort = httpServer.getHttpsAddress().getPort();
  }
}
DatanodeHttpServer Constructor
The DatanodeHttpServer constructor creates the HTTP server instance.
public DatanodeHttpServer(final Configuration conf,
    final DataNode datanode,
    final ServerSocketChannel externalHttpChannel)
    throws IOException {
  ... ...
  HttpServer2.Builder builder = new HttpServer2.Builder()
      .setName("datanode")
      .setConf(confForInfoServer)
      .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
      .hostName(getHostnameForSpnegoPrincipal(confForInfoServer))
      .addEndpoint(URI.create("http://localhost:" + proxyPort))
      .setFindPort(true);
  ... ...
}
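Once the info server is up, its servlets (for example /jmx) can be queried over plain HTTP. The snippet below is only an illustrative client; the host and port are assumptions for a local, non-secure setup (9864 is the usual Hadoop 3.x default for dfs.datanode.http.address):
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class DataNodeJmxProbe {
  public static void main(String[] args) throws Exception {
    // Assumed address of a locally running DataNode web server
    URL url = new URL("http://localhost:9864/jmx");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");

    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream()))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line); // JSON dump of the DataNode's JMX beans
      }
    } finally {
      conn.disconnect();
    }
  }
}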
Initializing the DataNode RPC Server
Initializing the DataNode's RPC server involves configuring and starting the service that clients connect to.
private void initIpcServer() throws IOException {
  InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
      getConf().getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));
  ... ...
  ipcServer = new RPC.Builder(getConf())
      .setProtocol(ClientDatanodeProtocolPB.class)
      .setInstance(service)
      .setBindAddress(ipcAddr.getHostName())
      .setPort(ipcAddr.getPort())
      .setNumHandlers(getConf().getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
          DFS_DATANODE_HANDLER_COUNT_DEFAULT))
      .setVerbose(false)
      .setSecretManager(blockPoolTokenSecretManager)
      .build();
  ... ...
}
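The bind address for this RPC server comes directly from dfs.datanode.ipc.address. As a rough sketch outside the DataNode code (with 0.0.0.0:9867 assumed as the Hadoop 3.x default), the address resolution step looks roughly like this:
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;

public class IpcAddressDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // DFS_DATANODE_IPC_ADDRESS_KEY resolves to "dfs.datanode.ipc.address";
    // 0.0.0.0:9867 is assumed here as the usual Hadoop 3.x default
    String addr = conf.getTrimmed("dfs.datanode.ipc.address", "0.0.0.0:9867");
    InetSocketAddress ipcAddr = NetUtils.createSocketAddr(addr);
    System.out.println("bind host = " + ipcAddr.getHostName());
    System.out.println("bind port = " + ipcAddr.getPort());
  }
}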
DataNode Registration with the NameNode
The DataNode must register itself with the NameNode so that the NameNode can track and manage every DataNode in the cluster.
void refreshNamenodes(Configuration conf) throws IOException {
  ... ...
  synchronized (refreshNamenodesLock) {
    doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
  }
}

private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap,
    Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
    throws IOException {
  ... ...
  synchronized (this) {
    ... ...
    // Step 3. Start new nameservices
    if (!toAdd.isEmpty()) {
      for (String nsToAdd : toAdd) {
        ... ...
        BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);
        bpByNameserviceId.put(nsToAdd, bpos);
        offerServices.add(bpos);
      }
    }
    startAll();
  }
  ... ...
}

protected BPOfferService createBPOS(final String nameserviceId,
    List<InetSocketAddress> nnAddrs,
    List<InetSocketAddress> lifelineNnAddrs) {
  // Create the corresponding service based on the NameNodes of this nameservice
  return new BPOfferService(nameserviceId, nnAddrs, lifelineNnAddrs, dn);
}
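doRefreshNamenodes creates one BPOfferService per nameservice, and each BPOfferService in turn owns one BPServiceActor per NameNode address, as the startAll code in the next section shows. The following sketch is not the Hadoop implementation (the real code uses DFSUtil helpers); it only illustrates, with made-up nameservice and NameNode ids, how those per-nameservice addresses can be derived from standard HA configuration keys:
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;

public class NameserviceAddressDemo {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Example HA configuration; "mycluster", "nn1" and "nn2" are made-up ids
    conf.set("dfs.nameservices", "mycluster");
    conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.mycluster.nn1", "nn1.example.com:8020");
    conf.set("dfs.namenode.rpc-address.mycluster.nn2", "nn2.example.com:8020");

    // One BPOfferService would be created per nameservice ...
    for (String ns : conf.getTrimmedStrings("dfs.nameservices")) {
      System.out.println("nameservice " + ns + " -> one BPOfferService");
      // ... and one BPServiceActor per NameNode inside that nameservice
      for (String nn : conf.getTrimmedStrings("dfs.ha.namenodes." + ns)) {
        InetSocketAddress addr = NetUtils.createSocketAddr(
            conf.get("dfs.namenode.rpc-address." + ns + "." + nn));
        System.out.println("  NameNode " + nn + " at " + addr + " -> one BPServiceActor");
      }
    }
  }
}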
Starting All the Services
Starting all the services means creating the service instances and kicking off their lifecycles.
synchronized void startAll() throws IOException {
  try {
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            for (BPOfferService bpos : offerServices) {
              // Start each block pool offer service
              bpos.start();
            }
            return null;
          }
        });
  } catch (InterruptedException ex) {
    ... ...
  }
}

// BPOfferService.start()
void start() {
  for (BPServiceActor actor : bpServices) {
    actor.start();
  }
}

// BPServiceActor.start()
void start() {
  ... ...
  bpThread = new Thread(this);
  bpThread.setDaemon(true); // needed for JUnit testing
  // Start a new thread, which will execute this actor's run() method
  bpThread.start();

  if (lifelineSender != null) {
    lifelineSender.start();
  }
}
Running the DataNode Service Thread
Running the DataNode service thread involves initialization (the handshake with the NameNode) and heartbeat sending.
public void run() {
  LOG.info(this + " starting to offer service");
  try {
    while (true) {
      // init stuff
      try {
        // setup storage
        // Register with the NameNode (handshake)
        connectToNNAndHandshake();
        break;
      } catch (IOException ioe) {
        // Initial handshake, storage recovery or registration failed
        runningState = RunningState.INIT_FAILED;
        if (shouldRetryInit()) {
          // Retry until all namenode's of BPOS failed initialization
          LOG.error("Initialization failed for " + this + " "
              + ioe.getLocalizedMessage());
          // Registration failed; retry after 5 seconds
          sleepAndLogInterrupts(5000, "initializing");
        } else {
          runningState = RunningState.FAILED;
          LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);
          return;
        }
      }
    }
    ... ...
    while (shouldRun()) {
      try {
        // Send heartbeats to the NameNode
        offerService();
      } catch (Exception ex) {
        ... ...
      }
    }
    ... ...
}

private void connectToNNAndHandshake() throws IOException {
  // get NN proxy: obtain the RPC client object for the NameNode
  bpNamenode = dn.connectToNN(nnAddr);

  // First phase of the handshake with NN - get the namespace
  // info.
  NamespaceInfo nsInfo = retrieveNamespaceInfo();

  // Verify that this matches the other NN in this HA pair.
  // This also initializes our block pool in the DN if we are
  // the first NN connection for this BP.
  bpos.verifyAndSetNamespaceInfo(this, nsInfo);

  /* set thread name again to include NamespaceInfo when it's available. */
  this.bpThread.setName(formatThreadName("heartbeating", nnAddr));

  // Register with the NameNode
  register(nsInfo);
}
Registering the DataNode with the NameNode
Registering the DataNode with the NameNode consists of building the registration information and then performing the actual registration.
void register(NamespaceInfo nsInfo) throws IOException {
  // Build the registration information
  DatanodeRegistration newBpRegistration = bpos.createRegistration();

  LOG.info(this + " beginning handshake with NN");

  while (shouldRun()) {
    try {
      // Use returned registration from namenode with updated fields
      // Send the registration to the NameNode (the DN calls the RPC interface
      // method, which executes on the NameNode)
      newBpRegistration = bpNamenode.registerDatanode(newBpRegistration);
      newBpRegistration.setNamespaceInfo(nsInfo);
      bpRegistration = newBpRegistration;
      break;
    } catch (EOFException e) { // namenode might have just restarted
      LOG.info("Problem connecting to server: " + nnAddr + " :"
          + e.getLocalizedMessage());
      sleepAndLogInterrupts(1000, "connecting to server");
    } catch (SocketTimeoutException e) { // namenode is busy
      LOG.info("Problem connecting to server: " + nnAddr);
      sleepAndLogInterrupts(1000, "connecting to server");
    }
  }
  ... ...
}
NameNode Registration Handling
After the NameNode receives the DataNode's registration request, it processes it as follows.
// NameNodeRpcServer
public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
  checkNNStartup();
  verifySoftwareVersion(nodeReg);
  // Register the DataNode
  namesystem.registerDatanode(nodeReg);
  return nodeReg;
}

// FSNamesystem
void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
  writeLock();
  try {
    blockManager.registerDatanode(nodeReg);
  } finally {
    writeUnlock("registerDatanode");
  }
}

// DatanodeManager
public void registerDatanode(DatanodeRegistration nodeReg)
    throws DisallowedDatanodeException, UnresolvedTopologyException {
  ... ...
  // register new datanode
  addDatanode(nodeDescr);
  blockManager.getBlockReportLeaseManager().register(nodeDescr);
  // also treat the registration message as a heartbeat
  // no need to update its timestamp
  // because its is done when the descriptor is created
  // Add the DataNode to heartbeat management
  heartbeatManager.addDatanode(nodeDescr);
  heartbeatManager.updateDnStat(nodeDescr);
  incrementVersionCount(nodeReg.getSoftwareVersion());
  startAdminOperationIfNecessary(nodeDescr);
  success = true;
  ... ...
}

void addDatanode(final DatanodeDescriptor node) {
  // To keep host2DatanodeMap consistent with datanodeMap,
  // remove from host2DatanodeMap the datanodeDescriptor removed
  // from datanodeMap before adding node to host2DatanodeMap.
  synchronized (this) {
    host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
  }

  networktopology.add(node); // may throw InvalidTopologyException
  host2DatanodeMap.add(node);
  checkIfClusterIsNowMultiRack(node);
  resolveUpgradeDomain(node);
  ... ...
}