HDFS Write Flow Source Code Analysis
- 1. Client
- 2. NameNode Side
- (1) create
- (2) addBlock
The environment is Hadoop 3.1.3.
1. Client
HDFS Write Flow Source Code Analysis (1) - Client
2. NameNode Side
(1) create
First, locate the NameNode's RPC server side and enter `NameNodeRpcServer.create()`.
```java
public HdfsFileStatus create(String src, FsPermission masked, String clientName,
    EnumSetWritable<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, CryptoProtocolVersion[] supportedVersions,
    String ecPolicyName) throws IOException {
  checkNNStartup();
  // IP of the client issuing the request
  String clientMachine = getClientMachine();
  if (stateChangeLog.isDebugEnabled()) {
    stateChangeLog.debug("*DIR* NameNode.create: file " + src + " for "
        + clientName + " at " + clientMachine);
  }
  // Check whether the path length (8000) and depth (1000) exceed the limits
  if (!checkPathLength(src)) {
    throw new IOException("create: Pathname too long. Limit "
        + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
  }
  // Check whether the current NameNode state (active, backup, standby)
  // allows this operation
  namesystem.checkOperation(OperationCategory.WRITE);
  CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
  if (cacheEntry != null && cacheEntry.isSuccess()) {
    return (HdfsFileStatus) cacheEntry.getPayload();
  }
  HdfsFileStatus status = null;
  try {
    PermissionStatus perm = new PermissionStatus(
        getRemoteUser().getShortUserName(), null, masked);
    // Create the file
    status = namesystem.startFile(src, perm, clientName, clientMachine,
        flag.get(), createParent, replication, blockSize, supportedVersions,
        ecPolicyName, cacheEntry != null);
  } finally {
    RetryCache.setState(cacheEntry, status != null, status);
  }
  metrics.incrFilesCreated();
  metrics.incrCreateFileOps();
  return status;
}
```
This method creates the file and returns the fileId, permissions, and other file metadata the client needs to construct its output stream. Let's focus on `FSNamesystem.startFile()`.
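Note the `RetryCache` logic in `create()` above: it deduplicates retried non-idempotent RPCs, so if a retried `create` already succeeded, the cached `HdfsFileStatus` is returned instead of re-executing the operation. A minimal, single-threaded sketch of that pattern (the class and method names below are illustrative, not Hadoop's):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for Hadoop's RetryCache: keyed by call id,
// it stores the payload of a completed non-idempotent operation.
class SimpleRetryCache {
    private final Map<Long, Object> completed = new HashMap<>();

    // Returns the cached payload for a retried call, or null on a first attempt.
    Object lookup(long callId) { return completed.get(callId); }

    void recordSuccess(long callId, Object payload) { completed.put(callId, payload); }
}

public class RetryCacheDemo {
    static final SimpleRetryCache cache = new SimpleRetryCache();
    static final AtomicInteger executions = new AtomicInteger();

    // A non-idempotent operation: executing it twice would be wrong,
    // so a retry with the same callId must return the cached result.
    static String createFile(long callId, String src) {
        Object cached = cache.lookup(callId);
        if (cached != null) {
            return (String) cached; // retry: short-circuit
        }
        executions.incrementAndGet();
        String status = "created:" + src;
        cache.recordSuccess(callId, status);
        return status;
    }

    public static void main(String[] args) {
        String first = createFile(42L, "/user/a.txt");
        String retry = createFile(42L, "/user/a.txt"); // simulated client retry
        // Same result returned, but the operation executed only once
        System.out.println(first.equals(retry) && executions.get() == 1);
    }
}
```

The real implementation additionally handles concurrent in-flight retries (`waitForCompletion` blocks the retry until the first attempt finishes) and entry expiration, which this sketch omits.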
```java
HdfsFileStatus startFile(String src, PermissionStatus permissions,
    String holder, String clientMachine, EnumSet<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
    boolean logRetryCache) throws IOException {
  HdfsFileStatus status;
  try {
    // Create the file
    status = startFileInt(src, permissions, holder, clientMachine, flag,
        createParent, replication, blockSize, supportedVersions, ecPolicyName,
        logRetryCache);
  } catch (AccessControlException e) {
    logAuditEvent(false, "create", src);
    throw e;
  }
  logAuditEvent(true, "create", src, status);
  return status;
}
```
We do not need to pay attention to the ecPolicy-related parameters; they implement striped storage via erasure coding (EC), which reduces storage space overhead, and we will set them aside here. Continue into `startFileInt()`.
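As a side note on why erasure coding saves space: 3-way replication stores every byte three times, while an RS(6,3) striped layout writes 9 cells for every 6 cells of payload, a 1.5x multiplier. A quick back-of-the-envelope comparison (standard EC arithmetic, not code from the Hadoop source):

```java
public class EcOverheadDemo {
    // Storage multiplier for plain replication:
    // each byte is stored `replicas` times.
    static double replicationOverhead(int replicas) {
        return replicas;
    }

    // Storage multiplier for a Reed-Solomon (data, parity) striped layout:
    // (data + parity) cells are written for every `data` cells of payload.
    static double ecOverhead(int dataUnits, int parityUnits) {
        return (double) (dataUnits + parityUnits) / dataUnits;
    }

    public static void main(String[] args) {
        System.out.println(replicationOverhead(3)); // 3.0
        System.out.println(ecOverhead(6, 3));       // RS(6,3): 1.5
        System.out.println(ecOverhead(10, 4));      // RS(10,4): 1.4
    }
}
```

The trade-off is that striped reads and writes touch more DataNodes and reconstruction is CPU-intensive, which is why CONTIGUOUS replication remains the common case in the code below.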
```java
private HdfsFileStatus startFileInt(String src,
    PermissionStatus permissions, String holder, String clientMachine,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, CryptoProtocolVersion[] supportedVersions,
    String ecPolicyName, boolean logRetryCache) throws IOException {
  if (NameNode.stateChangeLog.isDebugEnabled()) {
    StringBuilder builder = new StringBuilder();
    builder.append("DIR* NameSystem.startFile: src=").append(src)
        .append(", holder=").append(holder)
        .append(", clientMachine=").append(clientMachine)
        .append(", createParent=").append(createParent)
        .append(", replication=").append(replication)
        .append(", createFlag=").append(flag)
        .append(", blockSize=").append(blockSize)
        .append(", supportedVersions=")
        .append(Arrays.toString(supportedVersions));
    NameNode.stateChangeLog.debug(builder.toString());
  }
  if (!DFSUtil.isValidName(src) ||            // is the path valid?
      FSDirectory.isExactReservedName(src) || // is the path reserved?
      (FSDirectory.isReservedName(src)        // likewise reserved
          && !FSDirectory.isReservedRawName(src)        // reserved raw?
          && !FSDirectory.isReservedInodesName(src))) { // reserved inode?
    throw new InvalidPathException(src);
  }
  boolean shouldReplicate = flag.contains(CreateFlag.SHOULD_REPLICATE);
  if (shouldReplicate &&
      (!org.apache.commons.lang.StringUtils.isEmpty(ecPolicyName))) {
    throw new HadoopIllegalArgumentException("SHOULD_REPLICATE flag and " +
        "ecPolicyName are exclusive parameters. Set both is not allowed!");
  }

  INodesInPath iip = null;
  boolean skipSync = true; // until we do something that might create edits
  HdfsFileStatus stat = null;
  BlocksMapUpdateInfo toRemoveBlocks = null;

  checkOperation(OperationCategory.WRITE);
  final FSPermissionChecker pc = getPermissionChecker();
  writeLock();
  try {
    checkOperation(OperationCategory.WRITE);
    checkNameNodeSafeMode("Cannot create file" + src);

    // Resolve the inodes on the path; INodesInPath holds the inode at
    // each level from the root down to the target file
    iip = FSDirWriteFileOp.resolvePathForStartFile(
        dir, pc, src, flag, createParent);

    if (blockSize < minBlockSize) {
      throw new IOException("Specified block size is less than configured" +
          " minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
          + "): " + blockSize + " < " + minBlockSize);
    }

    if (shouldReplicate) {
      blockManager.verifyReplication(src, replication, clientMachine);
    } else {
      final ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp
          .getErasureCodingPolicy(this, ecPolicyName, iip);
      if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
        checkErasureCodingSupported("createWithEC");
        if (blockSize < ecPolicy.getCellSize()) {
          throw new IOException("Specified block size (" + blockSize
              + ") is less than the cell size (" + ecPolicy.getCellSize()
              + ") of the erasure coding policy (" + ecPolicy + ").");
        }
      } else {
        // Check whether the replication factor exceeds the configured limits
        blockManager.verifyReplication(src, replication, clientMachine);
      }
    }

    FileEncryptionInfo feInfo = null;
    if (!iip.isRaw() && provider != null) {
      EncryptionKeyInfo ezInfo = FSDirEncryptionZoneOp.getEncryptionKeyInfo(
          this, iip, supportedVersions);
      // if the path has an encryption zone, the lock was released while
      // generating the EDEK. re-resolve the path to ensure the namesystem
      // and/or EZ has not mutated
      if (ezInfo != null) {
        checkOperation(OperationCategory.WRITE);
        iip = FSDirWriteFileOp.resolvePathForStartFile(
            dir, pc, iip.getPath(), flag, createParent);
        feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(dir, iip, ezInfo);
      }
    }

    skipSync = false; // following might generate edits
    toRemoveBlocks = new BlocksMapUpdateInfo();
    // Take the write lock on the directory
    dir.writeLock();
    try {
      // Create the file
      stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
          clientMachine, flag, createParent, replication, blockSize, feInfo,
          toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
    } catch (IOException e) {
      skipSync = e instanceof StandbyException;
      throw e;
    } finally {
      dir.writeUnlock();
    }
  } finally {
    writeUnlock("create");
    // There might be transactions logged while trying to recover the lease.
    // They need to be sync'ed even when an exception was thrown.
    if (!skipSync) {
      // Flush the edit log to disk; effectively a write-ahead log
      getEditLog().logSync();
      // If an existing file was overwritten, clean up its blocks
      if (toRemoveBlocks != null) {
        removeBlocks(toRemoveBlocks);
        toRemoveBlocks.clear();
      }
    }
  }
  return stat;
}
```
Let's focus on `FSDirWriteFileOp.startFile()`.
```java
static HdfsFileStatus startFile(FSNamesystem fsn, INodesInPath iip,
    PermissionStatus permissions, String holder, String clientMachine,
    EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize,
    FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
    boolean shouldReplicate, String ecPolicyName, boolean logRetryEntry)
    throws IOException {
  assert fsn.hasWriteLock();
  boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
  boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
  final String src = iip.getPath();
  // The directory tree
  FSDirectory fsd = fsn.getFSDirectory();
  // If the target file already exists
  if (iip.getLastINode() != null) {
    // Overwrite it
    if (overwrite) {
      List<INode> toRemoveINodes = new ChunkedArrayList<>();
      List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
      // 1. Remove the file from the namespace
      // 2. Delete the file's blocks (toRemoveBlocks is cleaned up by the
      //    caller once the delete flow finishes without errors)
      long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
          toRemoveINodes, toRemoveUCFiles, now());
      if (ret >= 0) {
        // Drop the last inode from INodesInPath, i.e. the overwritten file
        iip = INodesInPath.replace(iip, iip.length() - 1, null);
        FSDirDeleteOp.incrDeletedFileCount(ret);
        // Remove the lease and the inodes
        fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
      }
    } else {
      // If lease soft limit time is expired, recover the lease
      fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
          src, holder, clientMachine, false);
      throw new FileAlreadyExistsException(src + " for client " +
          clientMachine + " already exists");
    }
  }
  // Check whether the object (inode and block) count exceeds the limit
  fsn.checkFsObjectLimit();
  INodeFile newNode = null;
  INodesInPath parent =
      FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
  if (parent != null) {
    // If the parent directory exists, create the target file
    iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
        replication, blockSize, holder, clientMachine, shouldReplicate,
        ecPolicyName);
    newNode = iip != null ? iip.getLastINode().asFile() : null;
  }
  if (newNode == null) {
    throw new IOException("Unable to add " + src + " to namespace");
  }
  // Add the lease: clientName -> files
  fsn.leaseManager.addLease(
      newNode.getFileUnderConstructionFeature().getClientName(),
      newNode.getId());
  if (feInfo != null) {
    FSDirEncryptionZoneOp.setFileEncryptionInfo(fsd, iip, feInfo,
        XAttrSetFlag.CREATE);
  }
  // Set the storage policy
  setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
  // Write-ahead log
  fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
  if (NameNode.stateChangeLog.isDebugEnabled()) {
    NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
        src + " inode " + newNode.getId() + " " + holder);
  }
  return FSDirStatAndListingOp.getFileInfo(fsd, iip, false, false);
}
```
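The `leaseManager.addLease()` call above enforces HDFS's single-writer/multi-reader semantics: only the lease holder may write the file until the lease is released or expires. A stripped-down sketch of that idea (the `SimpleLeaseManager` below is illustrative and ignores soft/hard limits and expiry):

```java
import java.util.HashMap;
import java.util.Map;

// Toy lease table: one writer (holder) per file id at a time.
class SimpleLeaseManager {
    private final Map<Long, String> holderByFile = new HashMap<>();

    // Grant the lease if the file is unclaimed or already held by this client.
    boolean addLease(String holder, long fileId) {
        String current = holderByFile.get(fileId);
        if (current == null || current.equals(holder)) {
            holderByFile.put(fileId, holder);
            return true;
        }
        return false; // another client is still writing this file
    }

    void releaseLease(long fileId) { holderByFile.remove(fileId); }
}

public class LeaseDemo {
    public static void main(String[] args) {
        SimpleLeaseManager lm = new SimpleLeaseManager();
        System.out.println(lm.addLease("DFSClient_A", 1001L)); // granted
        System.out.println(lm.addLease("DFSClient_B", 1001L)); // rejected: A holds it
        lm.releaseLease(1001L);
        System.out.println(lm.addLease("DFSClient_B", 1001L)); // granted after release
    }
}
```

In the real code the lease carries soft and hard expiration limits; `recoverLeaseInternal` in the snippet above is exactly the path that reclaims an expired lease before failing with `FileAlreadyExistsException`.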
Continue into `addFile()`.
```java
private static INodesInPath addFile(
    FSDirectory fsd, INodesInPath existing, byte[] localName,
    PermissionStatus permissions, short replication, long preferredBlockSize,
    String clientName, String clientMachine, boolean shouldReplicate,
    String ecPolicyName) throws IOException {
  Preconditions.checkNotNull(existing);
  long modTime = now();
  INodesInPath newiip;
  fsd.writeLock();
  try {
    boolean isStriped = false;
    ErasureCodingPolicy ecPolicy = null;
    if (!shouldReplicate) {
      ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
          fsd.getFSNamesystem(), ecPolicyName, existing);
      if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
        isStriped = true;
      }
    }
    final BlockType blockType = isStriped ?
        BlockType.STRIPED : BlockType.CONTIGUOUS;
    final Short replicationFactor = (!isStriped ? replication : null);
    final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
    // Create the inode
    INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
        modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
        blockType);
    newNode.setLocalName(localName);
    newNode.toUnderConstruction(clientName, clientMachine);
    // Add the inode to the namespace
    newiip = fsd.addINode(existing, newNode, permissions.getPermission());
  } finally {
    fsd.writeUnlock();
  }
  if (newiip == null) {
    NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
        existing.getPath() + "/" + DFSUtil.bytes2String(localName));
    return null;
  }
  if (NameNode.stateChangeLog.isDebugEnabled()) {
    NameNode.stateChangeLog.debug("DIR* addFile: " +
        DFSUtil.bytes2String(localName) + " is added");
  }
  return newiip;
}
```
This method creates the inode for the target file and adds it to the directory tree.
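Conceptually, the NameNode's namespace is an in-memory tree of inodes, and `addFile` attaches a new leaf under its parent directory. A stripped-down sketch of that structure (illustrative only; Hadoop's `INode` hierarchy, quotas, and snapshots are far richer):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal inode: a directory holds children by name; a file is a leaf.
class ToyINode {
    final String name;
    final boolean isDir;
    final Map<String, ToyINode> children = new HashMap<>();

    ToyINode(String name, boolean isDir) {
        this.name = name;
        this.isDir = isDir;
    }
}

public class NamespaceDemo {
    // Walk the tree from the root and attach a new file inode at the end,
    // mirroring what FSDirectory.addINode does for the resolved path.
    static boolean addFile(ToyINode root, String path) {
        String[] parts = path.substring(1).split("/");
        ToyINode cur = root;
        for (int i = 0; i < parts.length - 1; i++) {
            cur = cur.children.get(parts[i]);
            if (cur == null || !cur.isDir) {
                return false; // a parent component is missing or not a directory
            }
        }
        String fileName = parts[parts.length - 1];
        if (cur.children.containsKey(fileName)) {
            return false; // already exists
        }
        cur.children.put(fileName, new ToyINode(fileName, false));
        return true;
    }

    public static void main(String[] args) {
        ToyINode root = new ToyINode("", true);
        root.children.put("user", new ToyINode("user", true));
        System.out.println(addFile(root, "/user/a.txt")); // added
        System.out.println(addFile(root, "/user/a.txt")); // false: exists
        System.out.println(addFile(root, "/tmp/b.txt"));  // false: /tmp missing
    }
}
```

The `INodesInPath` objects passed around in the real code are essentially the list of nodes visited by such a walk, cached so each step of the operation does not re-resolve the path.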
(2) addBlock
First look at the `addBlock()` method of `NameNodeRpcServer`, the server-side implementation of the RPC.
```java
public LocatedBlock addBlock(String src, String clientName,
    ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
    String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
    throws IOException {
  // Is the NameNode fully started?
  checkNNStartup();
  // Allocate the block and get the DataNodes that will store it
  LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
      clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
  if (locatedBlock != null) {
    metrics.incrAddBlockOps();
  }
  return locatedBlock;
}
```
Step into `namesystem.getAdditionalBlock()`.
```java
LocatedBlock getAdditionalBlock(
    String src, long fileId, String clientName, ExtendedBlock previous,
    DatanodeInfo[] excludedNodes, String[] favoredNodes,
    EnumSet<AddBlockFlag> flags) throws IOException {
  final String operationName = "getAdditionalBlock";
  NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {} inodeId {}" +
      " for {}", src, fileId, clientName);

  // Used to detect whether the current block is a retried block
  LocatedBlock[] onRetryBlock = new LocatedBlock[1];
  FSDirWriteFileOp.ValidateAddBlockResult r;
  // Check whether the NameNode's current state (active, backup, standby)
  // allows read operations
  checkOperation(OperationCategory.READ);
  final FSPermissionChecker pc = getPermissionChecker();
  readLock();
  try {
    checkOperation(OperationCategory.READ);
    // 1. May a block be added?
    // 2. Is there a potential retried block?
    // 3. Prepare for DataNode allocation
    r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName,
        previous, onRetryBlock);
  } finally {
    readUnlock(operationName);
  }

  // If this is a retried block, return it directly
  if (r == null) {
    assert onRetryBlock[0] != null : "Retry block is null";
    // This is a retry. Just return the last block.
    return onRetryBlock[0];
  }

  // Choose the target storage nodes
  DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
      blockManager, src, excludedNodes, favoredNodes, flags, r);

  checkOperation(OperationCategory.WRITE);
  writeLock();
  LocatedBlock lb;
  try {
    checkOperation(OperationCategory.WRITE);
    // Add the block to the blocksMap, record the number of blocks each
    // DataNode is currently transferring, etc.
    lb = FSDirWriteFileOp.storeAllocatedBlock(
        this, src, fileId, clientName, previous, targets);
  } finally {
    writeUnlock(operationName);
  }
  getEditLog().logSync();
  return lb;
}
```
This method does quite a lot, so let's go through it piece by piece. First, `FSDirWriteFileOp.validateAddBlock()`.
```java
static ValidateAddBlockResult validateAddBlock(
    FSNamesystem fsn, FSPermissionChecker pc,
    String src, long fileId, String clientName,
    ExtendedBlock previous, LocatedBlock[] onRetryBlock) throws IOException {
  final long blockSize;
  final short numTargets;
  final byte storagePolicyID;
  String clientMachine;
  final BlockType blockType;

  // Resolve the inode at each level from the root to the target file
  INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId);
  /*
   * Analyze the file state:
   * 1. Is the previous block in the same block pool as this namespace?
   * 2. Does the object (inode and block) count exceed the limit?
   * 3. Check the lease (single-writer, multi-reader)
   * 4. Verify, across several cases, whether the previous block is valid
   *    and whether this is a retried block
   */
  FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
      previous, onRetryBlock);
  if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
    // This is a retry. No need to generate new locations.
    // Use the last block if it has locations.
    return null;
  }

  final INodeFile pendingFile = fileState.inode;
  // Check whether all the previous blocks are complete,
  // i.e. whether a new block may be added
  if (!fsn.checkFileProgress(src, pendingFile, false)) {
    throw new NotReplicatedYetException("Not replicated yet: " + src);
  }
  // The file is too large
  if (pendingFile.getBlocks().length >= fsn.maxBlocksPerFile) {
    throw new IOException("File has reached the limit on maximum number of"
        + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
        + "): " + pendingFile.getBlocks().length + " >= "
        + fsn.maxBlocksPerFile);
  }

  blockSize = pendingFile.getPreferredBlockSize(); // block size (default 128 MB)
  clientMachine = pendingFile.getFileUnderConstructionFeature() // client IP
      .getClientMachine();
  // Block type:
  // CONTIGUOUS: contiguous storage, the common case
  // STRIPED: striped storage with erasure coding, which reduces storage space
  blockType = pendingFile.getBlockType();
  ErasureCodingPolicy ecPolicy = null;
  // Erasure-coding handling for striped storage
  if (blockType == BlockType.STRIPED) {
    ecPolicy =
        FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy(fsn, iip);
    numTargets = (short) (ecPolicy.getSchema().getNumDataUnits()
        + ecPolicy.getSchema().getNumParityUnits());
  } else {
    // The number of replicas required
    numTargets = pendingFile.getFileReplication();
  }
  storagePolicyID = pendingFile.getStoragePolicyID();
  return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
      clientMachine, blockType, ecPolicy);
}
```
This method mainly validates the file state (whether the previous block belongs to the same block pool as the current namespace, whether the object (inode and block) count exceeds the limit, the lease check that enforces single-writer/multi-reader semantics, and whether the previous block is valid or a retried block across various cases) and packages up the block-related information. Back in the caller, the next step is `FSDirWriteFileOp.chooseTargetForNewBlock()`.
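Two of the guards above are easy to reproduce in isolation: a new block may only be allocated once the previous blocks are complete, and only while the file stays under the `dfs.namenode.fs-limits.max-blocks-per-file` cap. A hedged sketch of that gatekeeping (the names and the tiny limit are illustrative, not Hadoop's):

```java
import java.io.IOException;
import java.util.List;

public class AddBlockGuardDemo {
    // Illustrative cap; the real default is far larger
    static final int MAX_BLOCKS_PER_FILE = 3;

    // A block is "complete" once enough replicas have reported in;
    // simplified to a flag here.
    record ToyBlock(boolean complete) {}

    // Mirrors the two checks: previous blocks complete, count under limit.
    static void validateAddBlock(List<ToyBlock> blocks) throws IOException {
        for (ToyBlock b : blocks) {
            if (!b.complete()) {
                throw new IOException("Not replicated yet");
            }
        }
        if (blocks.size() >= MAX_BLOCKS_PER_FILE) {
            throw new IOException("File has reached the block limit");
        }
    }

    public static void main(String[] args) {
        try { // an incomplete previous block blocks allocation
            validateAddBlock(List.of(new ToyBlock(true), new ToyBlock(false)));
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
        try { // the file is already at the block cap
            validateAddBlock(List.of(new ToyBlock(true), new ToyBlock(true),
                new ToyBlock(true)));
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the real code the "complete" test (`checkFileProgress`) also tolerates the penultimate block still being under minimal replication, a nuance this sketch leaves out.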
```java
static DatanodeStorageInfo[] chooseTargetForNewBlock(
    BlockManager bm, String src, DatanodeInfo[] excludedNodes,
    String[] favoredNodes, EnumSet<AddBlockFlag> flags,
    ValidateAddBlockResult r) throws IOException {
  Node clientNode = null;

  boolean ignoreClientLocality = (flags != null
      && flags.contains(AddBlockFlag.IGNORE_CLIENT_LOCALITY));
  // If client locality is ignored, clientNode remains 'null' to indicate
  // Should the client's own machine be considered? The client may itself
  // be a DataNode.
  if (!ignoreClientLocality) {
    clientNode = bm.getDatanodeManager().getDatanodeByHost(r.clientMachine);
    if (clientNode == null) {
      clientNode = getClientNode(bm, r.clientMachine);
    }
  }

  // DataNodes to exclude
  Set<Node> excludedNodesSet =
      (excludedNodes == null) ? new HashSet<>()
          : new HashSet<>(Arrays.asList(excludedNodes));
  // Preferred DataNodes
  List<String> favoredNodesList =
      (favoredNodes == null) ? Collections.emptyList()
          : Arrays.asList(favoredNodes);

  // choose targets for the new block to be allocated.
  // Select the DataNodes
  return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
      excludedNodesSet, r.blockSize,
      favoredNodesList, r.storagePolicyID,
      r.blockType, r.ecPolicy, flags);
}
```
This method selects the DataNodes that will store the block. Both `excludedNodes` and `favoredNodes` are decided by the client. For example, when the client cannot connect to a DataNode the NameNode assigned for a block, it adds that DataNode to `excludedNodes` and calls `addBlock` again, so that an unreachable DataNode is not chosen as a replica target. Step into `bm.chooseTarget4NewBlock()`.
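The client-side retry just described can be sketched as a loop: ask for an allocation, probe the assigned pipeline, and on failure retry with the dead node excluded. Everything below (`allocateBlock`, the host names, the reachability probe) is hypothetical scaffolding, not the DFSClient API:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ExcludedNodesDemo {
    static final List<String> LIVE_NODES = List.of("dn1", "dn2", "dn3", "dn4");

    // Stand-in for the NameNode side: pick `numTargets` live nodes,
    // skipping anything the client has excluded.
    static List<String> allocateBlock(int numTargets, Set<String> excluded) {
        List<String> targets = new ArrayList<>();
        for (String dn : LIVE_NODES) {
            if (!excluded.contains(dn)) {
                targets.add(dn);
            }
            if (targets.size() == numTargets) {
                break;
            }
        }
        return targets;
    }

    // Stand-in for the client probing the write pipeline; here dn2 is "down".
    static boolean reachable(String dn) {
        return !dn.equals("dn2");
    }

    public static void main(String[] args) {
        Set<String> excluded = new HashSet<>();
        List<String> targets;
        // Retry addBlock until every assigned DataNode is reachable.
        while (true) {
            targets = allocateBlock(3, excluded);
            String dead = targets.stream()
                .filter(dn -> !reachable(dn)).findFirst().orElse(null);
            if (dead == null) {
                break;
            }
            excluded.add(dead); // exclude it and ask the NameNode again
        }
        System.out.println(targets);
    }
}
```

The exclusion only applies to this client's allocations; the NameNode still considers the node healthy for others until heartbeats say otherwise, which is why the decision is left to the client in the first place.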