Hadoop之DataNode启动源码解析

server/2024/9/24 17:27:52/

Hadoop之DataNode启动源码解析

添加依赖

为了能够编译和运行 Hadoop 的 DataNode 组件,我们需要在项目的 pom.xml 文件中添加以下依赖:

<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.1.3</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs-client</artifactId><version>3.1.3</version><scope>provided</scope></dependency>
</dependencies>
DataNode 类介绍

DataNode 类是 Hadoop 分布式文件系统 (HDFS) 中的一个核心组件,它负责存储文件系统的数据块。每个部署可以包含一个或多个 DataNode 实例。DataNodeNameNode 通信以报告其存储状态,并响应来自 NameNode 的指令,如删除或复制块等操作。此外,DataNode 还需要与客户端代码和其他 DataNode 进行交互。

DataNode 主程序入口

DataNode 的主程序入口点位于 main 方法中,该方法首先检查命令行参数是否存在帮助请求,然后调用 secureMain 方法初始化 DataNode 实例。

public static void main(String args[]) {if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {System.exit(0);}secureMain(args, null);
}public static void secureMain(String args[], SecureResources resources) {int errorCode = 0;try {StringUtils.startupShutdownMessage(DataNode.class, args, LOG);DataNode datanode = createDataNode(args, null, resources);… …} catch (Throwable e) {LOG.error("Exception in secureMain", e);terminate(1, e);} finally {LOG.warn("Exiting Datanode");terminate(errorCode);}
}
DataNode 实例化

createDataNode 方法用于实例化 DataNode 对象,并启动其守护进程。

public static DataNode createDataNode(String args[], Configuration conf,SecureResources resources) throws IOException {// 初始化DNDataNode dn = instantiateDataNode(args, conf, resources);if (dn != null) {// 启动DN进程dn.runDatanodeDaemon();}return dn;
}public static DataNode instantiateDataNode(String args[], Configuration conf,SecureResources resources) throws IOException {... ...return makeInstance(dataLocations, conf, resources);
}static DataNode makeInstance(Collection<StorageLocation> dataDirs,Configuration conf, SecureResources resources) throws IOException {... ...return new DataNode(conf, locations, storageLocationChecker, resources);
}
DataNode 构造函数

构造函数初始化了 DataNode 的主要属性,并启动了必要的组件。

DataNode(final Configuration conf,final List<StorageLocation> dataDirs,final StorageLocationChecker storageLocationChecker,final SecureResources resources) throws IOException {super(conf);... ...try {hostName = getHostName(conf);LOG.info("Configured hostname is {}", hostName);// 启动DNstartDataNode(dataDirs, resources);} catch (IOException ie) {shutdown();throw ie;}... ...
}
DataNode 启动过程

startDataNode 方法初始化了 DataNode 的关键组件,包括数据存储 (DataStorage)、MXBean 注册、DataXceiver 服务器、HTTP 服务器等。

void startDataNode(List<StorageLocation> dataDirectories,SecureResources resources) throws IOException {... ...// 创建数据存储对象storage = new DataStorage();// global DN settingsregisterMXBean();// 初始化DataXceiverinitDataXceiver();// 启动HttpServerstartInfoServer();pauseMonitor = new JvmPauseMonitor();pauseMonitor.init(getConf());pauseMonitor.start();// BlockPoolTokenSecretManager is required to create ipc server.this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();// Login is done by now. Set the DN user name.dnUserName = UserGroupInformation.getCurrentUser().getUserName();LOG.info("dnUserName = {}", dnUserName);LOG.info("supergroup = {}", supergroup);// 初始化RPC服务initIpcServer();metrics = DataNodeMetrics.create(getConf(), getDisplayName());peerMetrics = dnConf.peerStatsEnabled ?DataNodePeerMetrics.create(getDisplayName(), getConf()) : null;metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);ecWorker = new ErasureCodingWorker(getConf(), this);blockRecoveryWorker = new BlockRecoveryWorker(this);// 创建BlockPoolManagerblockPoolManager = new BlockPoolManager(this);// 心跳管理blockPoolManager.refreshNamenodes(getConf());// Create the ReadaheadPool from the DataNode context so we can// exit without having to explicitly shutdown its thread pool.readaheadPool = ReadaheadPool.getInstance();saslClient = new SaslDataTransferClient(dnConf.getConf(),dnConf.saslPropsResolver, dnConf.trustedChannelResolver);saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);startMetricsLogger();if (dnConf.diskStatsEnabled) {diskMetrics = new DataNodeDiskMetrics(this,dnConf.outliersReportIntervalMs);}
}
初始化DataXceiverServer

initDataXceiver 方法创建并启动了 DataXceiverServer,它是 DataNode 用来接收客户端和其他 DataNode 发送过来的数据的服务。

private void initDataXceiver() throws IOException {// dataXceiverServer是一个服务,DN用来接收客户端和其他DN发送过来的数据服务this.dataXceiverServer = new Daemon(threadGroup, xserver);this.threadGroup.setDaemon(true); // auto destroy when empty... ...
}
初始化HTTP服务

startInfoServer 方法初始化并启动了 HTTP 服务器,用于提供有关 DataNode 的信息和服务。

private void startInfoServer()throws IOException {// SecureDataNodeStarter will bind the privileged port to the channel if// the DN is started by JSVC, pass it along.ServerSocketChannel httpServerChannel = secureResources != null ?secureResources.getHttpServerChannel() : null;httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);httpServer.start();if (httpServer.getHttpAddress() != null) {infoPort = httpServer.getHttpAddress().getPort();}if (httpServer.getHttpsAddress() != null) {infoSecurePort = httpServer.getHttpsAddress().getPort();}
}
DatanodeHttpServer 构造函数

DatanodeHttpServer 构造函数用于创建 HTTP 服务器实例。

public DatanodeHttpServer(final Configuration conf,final DataNode datanode,final ServerSocketChannel externalHttpChannel)throws IOException {... ...HttpServer2.Builder builder = new HttpServer2.Builder().setName("datanode").setConf(confForInfoServer).setACL(new AccessControlList(conf.get(DFS_ADMIN, " "))).hostName(getHostnameForSpnegoPrincipal(confForInfoServer)).addEndpoint(URI.create("http://localhost:" + proxyPort)).setFindPort(true);... ...
}
初始化DataNode的RPC服务端

初始化 DataNode 的 RPC 服务端涉及到配置和启动相关的服务,以供客户端连接。

private void initIpcServer() throws IOException {InetSocketAddress ipcAddr = NetUtils.createSocketAddr(getConf().getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));... ...ipcServer = new RPC.Builder(getConf()).setProtocol(ClientDatanodeProtocolPB.class).setInstance(service).setBindAddress(ipcAddr.getHostName()).setPort(ipcAddr.getPort()).setNumHandlers(getConf().getInt(DFS_DATANODE_HANDLER_COUNT_KEY,DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false).setSecretManager(blockPoolTokenSecretManager).build();... ...
}
DataNode 向 NameNode 注册

DataNode 需要向 NameNode 注册自身,以便 NameNode 可以跟踪和管理集群中的所有 DataNode。

void refreshNamenodes(Configuration conf)throws IOException {... ...synchronized (refreshNamenodesLock) {doRefreshNamenodes(newAddressMap, newLifelineAddressMap);}
}private void doRefreshNamenodes(Map<String, Map<String, InetSocketAddress>> addrMap,Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)throws IOException {… …synchronized (this) {… …// Step 3. Start new nameservicesif (!toAdd.isEmpty()) {for (String nsToAdd : toAdd) {… …BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);bpByNameserviceId.put(nsToAdd, bpos);offerServices.add(bpos);}}startAll();}… …
}protected BPOfferService createBPOS(final String nameserviceId,List<InetSocketAddress> nnAddrs,List<InetSocketAddress> lifelineNnAddrs) {// 根据NameNode个数创建对应的服务return new BPOfferService(nameserviceId, nnAddrs, lifelineNnAddrs, dn);
}
启动所有的服务

启动所有的服务涉及到创建服务实例并开始它们的生命周期。

synchronized void startAll() throws IOException {try {UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Object>() {@Overridepublic Object run() throws Exception {for (BPOfferService bpos : offerServices) {// 启动服务bpos.start();}return null;}});} catch (InterruptedException ex) {... ...}
}void start() {for (BPServiceActor actor : bpServices) {actor.start();}
}void start() {... ...bpThread = new Thread(this);bpThread.setDaemon(true); // needed for JUnit testing// 表示开启一个线程,所有查找该线程的 run 方法bpThread.start();if (lifelineSender != null) {lifelineSender.start();}
}
DataNode 服务线程运行

DataNode 服务线程运行涉及初始化和心跳发送。

public void run() {LOG.info(this + " starting to offer service");try {while (true) {// init stufftry {// setup storage// 向NN 注册connectToNNAndHandshake();break;} catch (IOException ioe) {// Initial handshake, storage recovery or registration failedrunningState = RunningState.INIT_FAILED;if (shouldRetryInit()) {// Retry until all namenode's of BPOS failed initializationLOG.error("Initialization failed for " + this + " "+ ioe.getLocalizedMessage());// 注册失败,5s后重试sleepAndLogInterrupts(5000, "initializing");} else {runningState = RunningState.FAILED;LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);return;}}}… …while (shouldRun()) {try {// 发送心跳offerService();} catch (Exception ex) {... ...}}
}private void connectToNNAndHandshake() throws IOException {// get NN proxy 获取NN的RPC客户端对象bpNamenode = dn.connectToNN(nnAddr);// First phase of the handshake with NN - get the namespace// info.NamespaceInfo nsInfo = retrieveNamespaceInfo();// Verify that this matches the other NN in this HA pair.// This also initializes our block pool in the DN if we are// the first NN connection for this BP.bpos.verifyAndSetNamespaceInfo(this, nsInfo);/* set thread name again to include NamespaceInfo when it's available. */this.bpThread.setName(formatThreadName("heartbeating", nnAddr));// 注册register(nsInfo);
}
DataNode 注册至 NameNode

DataNode 注册至 NameNode 包括创建注册信息以及实际的注册过程。

void register(NamespaceInfo nsInfo) throws IOException {// 创建注册信息DatanodeRegistration newBpRegistration = bpos.createRegistration();LOG.info(this + " beginning handshake with NN");while (shouldRun()) {try {// Use returned registration from namenode with updated fields// 把注册信息发送给NN(DN调用接口方法,执行在NN)newBpRegistration = bpNamenode.registerDatanode(newBpRegistration);newBpRegistration.setNamespaceInfo(nsInfo);bpRegistration = newBpRegistration;break;} catch(EOFException e) {  // namenode might have just restartedLOG.info("Problem connecting to server: " + nnAddr + " :"+ e.getLocalizedMessage());sleepAndLogInterrupts(1000, "connecting to server");} catch(SocketTimeoutException e) {  // namenode is busyLOG.info("Problem connecting to server: " + nnAddr);sleepAndLogInterrupts(1000, "connecting to server");}}… …
}
NameNode 处理注册

NameNode 接收到 DataNode 的注册请求后,会进行处理。

public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)throws IOException {checkNNStartup();verifySoftwareVersion(nodeReg);// 注册DNnamesystem.registerDatanode(nodeReg);return nodeReg;
}void registerDatanode(DatanodeRegistration nodeReg) throws IOException {writeLock();try {blockManager.registerDatanode(nodeReg);} finally {writeUnlock("registerDatanode");}
}public void registerDatanode(DatanodeRegistration nodeReg)throws DisallowedDatanodeException, UnresolvedTopologyException {... ...// register new datanode 注册DNaddDatanode(nodeDescr);blockManager.getBlockReportLeaseManager().register(nodeDescr);// also treat the registration message as a heartbeat// no need to update its timestamp// because its is done when the descriptor is created// 将DN添加到心跳管理heartbeatManager.addDatanode(nodeDescr);heartbeatManager.updateDnStat(nodeDescr);incrementVersionCount(nodeReg.getSoftwareVersion());startAdminOperationIfNecessary(nodeDescr);success = true;... ...
}void addDatanode(final DatanodeDescriptor node) {// To keep host2DatanodeMap consistent with datanodeMap,// remove  from host2DatanodeMap the datanodeDescriptor removed// from datanodeMap before adding node to host2DatanodeMap.synchronized(this) {host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));}networktopology.add(node); // may throw InvalidTopologyExceptionhost2DatanodeMap.add(node);checkIfClusterIsNowMultiRack(node);resolveUpgradeDomain(node);… …
}

http://www.ppmy.cn/server/101898.html

相关文章

农业物联网6公里WiFi远距离传输模块,信号传输不再受限,抗干扰、连接快

在数字化时代&#xff0c;WiFi已成为我们生活中不可或缺的一部分。然而&#xff0c;你是否曾遇到过这样的困扰&#xff1a;在户外或大型场所&#xff0c;WiFi信号总是那么微弱&#xff0c;甚至完全无法连接。这时候&#xff0c;一种能够实现6公里远距离WiFi传输的神秘模块便应运…

uniapp本地打包app安装说明

uniapp本地打包app安装说明 目录 uniapp本地打包app安装说明一、打包说明1.HBuilder X 生成本地打包资源2.Android Studio和App离线SDK环境准备2.1 下载Android Studio和 App离线SDK2.2 资源替换2.3 id属性值修改。2.4 添加provider信息到AndroidManifest.xml中的<applicati…

HarmonyOS NEXT - Navigation组件封装BaseNavigation

demo 地址: https://github.com/iotjin/JhHarmonyDemo 代码不定时更新&#xff0c;请前往github查看最新代码 在demo中这些组件和工具类都通过module实现了&#xff0c;具体可以参考HarmonyOS NEXT - 通过 module 模块化引用公共组件和utils 官方介绍 组件导航 (Navigation)(推…

GNU/Linux - GNU Software之ncurses

下载了Linux Kernel&#xff0c;要make menuconfig时&#xff0c;出现错误&#xff1a; $ make menuconfig HOSTCC scripts/basic/fixdep HOSTCC scripts/kconfig/mconf.o <command-line>: fatal error: curses.h: No such file or directory compilation terminated…

【python】OpenCV—Optical Flow

文章目录 1、光流2、Opencv 中光流的实现3、稀疏光流4、密集光流4.1、farneback4.2、lucaskanade_dense4.3、rlof 5、涉及到的库5.1、cv2.goodFeaturesToTrack5.2、cv2.calcOpticalFlowPyrLK5.3、cv2.optflow.calcOpticalFlowSparseToDense5.4、cv2.calcOpticalFlowFarneback5.…

c语言中比较特殊的输入函数

目录 一.getchar()函数 1.基本功能 2.使用方法 (1).读取单个字符 (2).读取多个字符&#xff08;直到遇到换行符&#xff09; (3).处理输入中的空白字符 3.返回值 4.应用场景 5.注意事项 二.fgets()函数 1.函数原型 2.工作原理 3.使用示例 (1).从标准输入读取一行…

外卖系统小程序安卓app如何开发运营?

外卖系统的有效开发是一个复杂而系统的过程&#xff0c;需要综合考虑需求分析、系统设计、技术选型、开发实施、测试上线以及后期维护等多个方面。以下是一个详细的开发流程建议&#xff1a; 一、需求分析 用户调研&#xff1a;深入了解目标用户群体&#xff08;包括消费者、…

【网络编程】 TCP机械臂测试(C语言)

目录 前言&#xff1a; 代码实现&#xff1a; 输出结果如下&#xff1a; 前言&#xff1a; 1、通过以下操作实现机械臂控制 w(红色臂角度增大) s(红色臂角度减小) d(蓝色臂角度增大) a(蓝色臂角度减小)按键控制机械臂 >>需要对机械臂发…