Elasticsearch Source Code Analysis 05: Shard Allocation


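Shard Allocation

At this point the cluster-level and index-level state have been recovered. The next step is to allocate the index shards, recover the shard-level metadata, and build the routingTable.
The allocationService is created in the ClusterModule constructor: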

public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins,
                     ClusterInfoService clusterInfoService) {
    this.clusterPlugins = clusterPlugins;
    // allocation deciders
    this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
    this.allocationDeciders = new AllocationDeciders(deciderList);
    // shards allocator
    this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
    this.clusterService = clusterService;
    this.indexNameExpressionResolver = new IndexNameExpressionResolver();
    // allocation service
    this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService);
}

The allocation deciders are initialized here:

Map<Class, AllocationDecider> deciders = new LinkedHashMap<>();
addAllocationDecider(deciders, new MaxRetryAllocationDecider());
addAllocationDecider(deciders, new ResizeAllocationDecider());
addAllocationDecider(deciders, new ReplicaAfterPrimaryActiveAllocationDecider());
addAllocationDecider(deciders, new RebalanceOnlyWhenActiveAllocationDecider());
addAllocationDecider(deciders, new ClusterRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeVersionAllocationDecider());
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));

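Two kinds of components take part in shard allocation in ES: allocators and deciders. Allocators try to find the best node for a shard, while deciders judge whether a proposed allocation is permitted.
All deciders extend the abstract class AllocationDecider, which provides several decision methods:

  • canRebalance: whether the shard may be rebalanced in the given allocation
  • canAllocate: whether the shard may be allocated to the given node
  • canRemain: whether the shard may remain on the given node
  • shouldAutoExpandToNode: whether shards of the index may auto-expand to the given node
  • canForceAllocatePrimary: whether the primary shard may be force-allocated to the given node

A decision is one of ALWAYS, YES, NO, or THROTTLE; most of these methods return ALWAYS by default.
The deciders fall roughly into the following categories:

  • Load balancing
// copies of the same shard must not be allocated to the same node
SameShardAllocationDecider
// limits how many shards of the same index a node may hold
ShardsLimitAllocationDecider
// rack awareness: spread shard copies across different racks where possible
AwarenessAllocationDecider
  • Concurrency control
// limits the number of concurrent rebalance operations
ConcurrentRebalanceAllocationDecider
// decides based on available disk space
DiskThresholdDecider
// throttles concurrent recoveries on a node
ThrottlingAllocationDecider
  • Conditional restrictions
// rebalance only when all copies of the shard are active
RebalanceOnlyWhenActiveAllocationDecider
// filters nodes by configured attributes such as node IP or name
FilterAllocationDecider
// allocates replicas only after the primary has been allocated
ReplicaAfterPrimaryActiveAllocationDecider
// decides whether to rebalance based on which shards are active
ClusterRebalanceAllocationDecider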
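
As a rough illustration of how several deciders can produce one verdict, the sketch below combines individual results so that any NO rejects the node, otherwise any THROTTLE delays it, and otherwise the answer is YES. This is a simplified model written for this article: DecisionSketch, Verdict, Decider, and combine are hypothetical names, not the Elasticsearch API.

```java
import java.util.List;

// Simplified model of decider aggregation; every name here is hypothetical.
class DecisionSketch {
    enum Verdict { YES, THROTTLE, NO }

    // A decider inspects a (shard, node) pair and returns a verdict.
    interface Decider {
        Verdict canAllocate(String shard, String node);
    }

    // NO from any decider rejects the node; otherwise THROTTLE delays it; otherwise YES.
    static Verdict combine(List<Decider> deciders, String shard, String node) {
        Verdict result = Verdict.YES;
        for (Decider decider : deciders) {
            Verdict verdict = decider.canAllocate(shard, node);
            if (verdict == Verdict.NO) {
                return Verdict.NO; // a single veto is enough
            }
            if (verdict == Verdict.THROTTLE) {
                result = Verdict.THROTTLE; // remember it, but keep looking for a NO
            }
        }
        return result;
    }
}
```

The early return on NO keeps the loop cheap when a node is clearly unsuitable, at the cost of not collecting an explanation from the remaining deciders.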

Back in the ClusterModule constructor, the shardsAllocator is created; it is a BalancedShardsAllocator:

// shards allocator
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);

// balanced shards allocator
allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(settings, clusterSettings));

Back to the reroute method:

allocationService.reroute(newState, "state recovered");

public ClusterState reroute(ClusterState clusterState, String reason) {
    ClusterState fixedClusterState = adaptAutoExpandReplicas(clusterState);
    RoutingNodes routingNodes = getMutableRoutingNodes(fixedClusterState);
    // shuffle the unassigned nodes, just so we won't have things like poison failed shards
    // (this randomizes the order of the unassigned shards)
    routingNodes.unassigned().shuffle();
    RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, fixedClusterState,
        clusterInfoService.getClusterInfo(), currentNanoTime());
    // allocate the shards
    reroute(allocation);
    if (fixedClusterState == clusterState && allocation.routingNodesChanged() == false) {
        return clusterState;
    }
    return buildResultAndLogHealthChange(clusterState, allocation, reason);
}

The unassigned shards are first shuffled into random order, and then allocation is performed:

private void reroute(RoutingAllocation allocation) {
    // remove the delay markers from delayed shards
    removeDelayMarkers(allocation);
    // try to allocate existing shard copies first
    allocateExistingUnassignedShards(allocation);
    // balanced allocation
    shardsAllocator.allocate(allocation);
    assert RoutingNodes.assertShardStats(allocation.routingNodes());
}

Delay markers are removed first, then the service tries to allocate existing shard copies:

private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
    // sort for priority ordering
    allocation.routingNodes().unassigned().sort(PriorityComparator.getAllocationComparator(allocation));
    for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
        existingShardsAllocator.beforeAllocation(allocation);
    }
    // first allocate the primaries that were allocated before
    final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
    while (primaryIterator.hasNext()) {
        final ShardRouting shardRouting = primaryIterator.next();
        if (shardRouting.primary()) {
            // allocate the primary shard
            getAllocatorForShard(shardRouting, allocation).allocateUnassigned(shardRouting, allocation, primaryIterator);
        }
    }
    for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
        existingShardsAllocator.afterPrimariesBeforeReplicas(allocation);
    }
    // then allocate the unassigned replicas
    final RoutingNodes.UnassignedShards.UnassignedIterator replicaIterator = allocation.routingNodes().unassigned().iterator();
    while (replicaIterator.hasNext()) {
        final ShardRouting shardRouting = replicaIterator.next();
        if (shardRouting.primary() == false) {
            getAllocatorForShard(shardRouting, allocation).allocateUnassigned(shardRouting, allocation, replicaIterator);
        }
    }
}
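
The two passes over the unassigned list can be pictured with a small sketch: every primary is offered to the allocator before any replica, regardless of where it sits in the list. The class TwoPassSketch and its Shard type are hypothetical stand-ins, and the priority sort and the before/after hooks are left out on purpose.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of "primaries first, then replicas".
class TwoPassSketch {
    static final class Shard {
        final String name;
        final boolean primary;

        Shard(String name, boolean primary) {
            this.name = name;
            this.primary = primary;
        }
    }

    // Returns the order in which shards would be offered to the allocator.
    static List<String> allocationOrder(List<Shard> unassigned) {
        List<String> order = new ArrayList<>();
        // First pass: primaries only.
        for (Shard shard : unassigned) {
            if (shard.primary) {
                order.add(shard.name);
            }
        }
        // Second pass: replicas only.
        for (Shard shard : unassigned) {
            if (!shard.primary) {
                order.add(shard.name);
            }
        }
        return order;
    }
}
```

Handling primaries first matters because a replica can only recover once its primary is active.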

GatewayAllocator performs the allocation of existing shard copies:

// allocation of existing shards
public void allocateUnassigned(ShardRouting shardRouting, final RoutingAllocation allocation,
                               UnassignedAllocationHandler unassignedAllocationHandler) {
    assert primaryShardAllocator != null;
    assert replicaShardAllocator != null;
    innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler);
}

protected static void innerAllocatedUnassigned(RoutingAllocation allocation,
                                               PrimaryShardAllocator primaryShardAllocator,
                                               ReplicaShardAllocator replicaShardAllocator,
                                               ShardRouting shardRouting,
                                               ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler) {
    assert shardRouting.unassigned();
    if (shardRouting.primary()) {
        // allocate the primary shard
        primaryShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler);
    } else {
        // allocate the replica shard
        replicaShardAllocator.allocateUnassigned(shardRouting, allocation, unassignedAllocationHandler);
    }
}

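Both primaryShardAllocator and replicaShardAllocator extend the BaseGatewayShardAllocator class. Primary and replica allocation run the same allocateUnassigned method and differ only in how the decision is made (makeAllocationDecision):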

public void allocateUnassigned(ShardRouting shardRouting, RoutingAllocation allocation,
                               ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler) {
    // fetch the shard information and let the deciders determine on which node the shard can be allocated
    final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);
    if (allocateUnassignedDecision.isDecisionTaken() == false) {
        // no decision was taken by this allocator
        return;
    }
    // the overall decision is YES
    if (allocateUnassignedDecision.getAllocationDecision() == AllocationDecision.YES) {
        unassignedAllocationHandler.initialize(allocateUnassignedDecision.getTargetNode().getId(),
            allocateUnassignedDecision.getAllocationId(),
            shardRouting.primary() ? ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE :
                allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
            allocation.changes());
    } else {
        unassignedAllocationHandler.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes());
    }
}

The primary shard allocation decision:

@Override
public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unassignedShard,
                                                         final RoutingAllocation allocation,
                                                         final Logger logger) {
    if (isResponsibleFor(unassignedShard) == false) {
        // this allocator is not responsible for allocating this shard
        return AllocateUnassignedDecision.NOT_TAKEN;
    }
    // whether to record per-node explanations of the decision (debug)
    final boolean explain = allocation.debugDecision();
    // fetch the shard state from the nodes
    final FetchResult<NodeGatewayStartedShards> shardState = fetchData(unassignedShard, allocation);
    // no data yet means the fetch is still in progress
    if (shardState.hasData() == false) {
        allocation.setHasPendingAsyncFetch();
        List<NodeAllocationResult> nodeDecisions = null;
        if (explain) {
            nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation);
        }
        return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions);
    }

    // don't create a new IndexSetting object for every shard as this could cause a lot of garbage
    // on cluster restart if we allocate a boat load of shards
    final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index());
    // get the set of in-sync allocation IDs for this shard ID
    final Set<String> inSyncAllocationIds = indexMetadata.inSyncAllocationIds(unassignedShard.id());
    // is this a restore from a snapshot
    final boolean snapshotRestore = unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT;

    assert inSyncAllocationIds.isEmpty() == false;
    // use in-sync allocation ids to select nodes
    final NodeShardsResult nodeShardsResult = buildNodeShardsResult(unassignedShard, snapshotRestore,
        allocation.getIgnoreNodes(unassignedShard.shardId()), inSyncAllocationIds, shardState, logger);
    // were any allocation candidates found
    final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0;
    logger.debug("[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]", unassignedShard.index(),
        unassignedShard.id(), nodeShardsResult.orderedAllocationCandidates.size(), unassignedShard, inSyncAllocationIds);

    if (enoughAllocationsFound == false) {
        if (snapshotRestore) {
            // let BalancedShardsAllocator take care of allocating this shard
            logger.debug("[{}][{}]: missing local data, will restore from [{}]",
                unassignedShard.index(), unassignedShard.id(), unassignedShard.recoverySource());
            return AllocateUnassignedDecision.NOT_TAKEN;
        } else {
            // We have a shard that was previously allocated, but we could not find a valid shard copy to allocate the primary.
            // We could just be waiting for the node that holds the primary to start back up, in which case the allocation for
            // this shard will be picked up when the node joins and we do another allocation reroute
            logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]",
                unassignedShard.index(), unassignedShard.id(), nodeShardsResult.allocationsFound);
            return AllocateUnassignedDecision.no(AllocationStatus.NO_VALID_SHARD_COPY,
                explain ? buildNodeDecisions(null, shardState, inSyncAllocationIds) : null);
        }
    }

    // ask the deciders about each candidate node
    NodesToAllocate nodesToAllocate = buildNodesToAllocate(
        allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, false
    );
    DiscoveryNode node = null;
    String allocationId = null;
    boolean throttled = false;
    // at least one node can take the shard
    if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
        // take the first one
        DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
        logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation",
            unassignedShard.index(), unassignedShard.id(), unassignedShard, decidedNode.nodeShardState.getNode());
        // target node
        node = decidedNode.nodeShardState.getNode();
        // allocation ID of the shard copy
        allocationId = decidedNode.nodeShardState.allocationId();
    } else if (nodesToAllocate.throttleNodeShards.isEmpty() && !nodesToAllocate.noNodeShards.isEmpty()) {
        // The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard
        // can be force-allocated to one of the nodes.
        nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true);
        if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
            final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
            final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState;
            logger.debug("[{}][{}]: allocating [{}] to [{}] on forced primary allocation",
                unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeShardState.getNode());
            node = nodeShardState.getNode();
            allocationId = nodeShardState.allocationId();
        } else if (nodesToAllocate.throttleNodeShards.isEmpty() == false) {
            logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on forced primary allocation",
                unassignedShard.index(), unassignedShard.id(), unassignedShard, nodesToAllocate.throttleNodeShards);
            throttled = true;
        } else {
            logger.debug("[{}][{}]: forced primary allocation denied [{}]",
                unassignedShard.index(), unassignedShard.id(), unassignedShard);
        }
    } else {
        // we are throttling this, since we are allowed to allocate to this node but there are enough allocations
        // taking place on the node currently, ignore it for now
        logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation",
            unassignedShard.index(), unassignedShard.id(), unassignedShard, nodesToAllocate.throttleNodeShards);
        throttled = true;
    }

    List<NodeAllocationResult> nodeResults = null;
    if (explain) {
        // build the per-node decisions
        nodeResults = buildNodeDecisions(nodesToAllocate, shardState, inSyncAllocationIds);
    }
    // an async fetch is still pending
    if (allocation.hasPendingAsyncFetch()) {
        return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeResults);
    } else if (node != null) {
        return AllocateUnassignedDecision.yes(node, allocationId, nodeResults, false);
    } else if (throttled) {
        return AllocateUnassignedDecision.throttle(nodeResults);
    } else {
        return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, nodeResults, true);
    }
}

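When no shard state data is available yet, a request has to be sent to all nodes to fetch the shard information, because it is not known which node holds the shard's data. On startup a fetchData request is issued for every shard, so a large cluster with many shards has to send a large number of requests.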

// fetch the shard state from the nodes
final FetchResult<NodeGatewayStartedShards> shardState = fetchData(unassignedShard, allocation);

@Override
protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>
fetchData(ShardRouting shard, RoutingAllocation allocation) {
    AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetch =
        asyncFetchStarted.computeIfAbsent(shard.shardId(),
            shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId,
                // custom data path configured for the index
                IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()),
                startedAction));
    // shard metadata
    AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState =
        fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));
    // process the returned data
    if (shardState.hasData()) {
        shardState.processAllocation(allocation);
    }
    return shardState;
}
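
The fetch-then-retry pattern can be sketched as a small cache: the first call for a shard starts a fetch and reports that no data is available, and once the responses have been stored a later call returns them without sending anything. This is a minimal single-threaded model; FetchCacheSketch, its methods, and the requestsSent counter are hypothetical and only stand in for the real asynchronous transport call.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of a per-shard fetch cache.
class FetchCacheSketch {
    static final class FetchResult {
        final Map<String, String> data; // nodeId -> allocationId, null while the fetch is in flight

        FetchResult(Map<String, String> data) {
            this.data = data;
        }

        boolean hasData() {
            return data != null;
        }
    }

    private final Map<String, Map<String, String>> cache = new HashMap<>();
    private final Set<String> inFlight = new HashSet<>();
    int requestsSent = 0; // stand-in for real network requests

    FetchResult fetchData(String shardId) {
        Map<String, String> cached = cache.get(shardId);
        if (cached != null) {
            return new FetchResult(cached); // served from the cache, nothing is sent
        }
        if (inFlight.add(shardId)) {
            requestsSent++; // only the first caller triggers a request
        }
        return new FetchResult(null);
    }

    // Called when the node responses arrive; the real code would then schedule a reroute.
    void onResponse(String shardId, Map<String, String> nodeStates) {
        inFlight.remove(shardId);
        cache.put(shardId, nodeStates);
    }
}
```

A caller that sees hasData() == false simply gives up for this round and relies on the next reroute to try again.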

The request is sent asynchronously:

void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) {
    logger.trace("{} fetching [{}] from {}", shardId, type, nodes);
    // fetch the shard information
    action.list(shardId, customDataPath, nodes, new ActionListener<BaseNodesResponse<T>>() {
        // all nodes have responded, start processing
        @Override
        public void onResponse(BaseNodesResponse<T> response) {
            // process the fetched results
            processAsyncFetch(response.getNodes(), response.failures(), fetchingRound);
        }

        @Override
        public void onFailure(Exception e) {
            List<FailedNodeException> failures = new ArrayList<>(nodes.length);
            for (final DiscoveryNode node : nodes) {
                failures.add(new FailedNodeException(node.getId(), "total failure in fetching", e));
            }
            processAsyncFetch(null, failures, fetchingRound);
        }
    });
}

private final TransportNodesListGatewayStartedShards startedAction;

asyncFetchStarted.computeIfAbsent(shard.shardId(),
    shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId,
        // custom data path configured for the index
        IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()),
        startedAction));

The action is TransportNodesListGatewayStartedShards. Its list method sends the request, and the action name is:

public static final String ACTION_NAME = "internal:gateway/local/started_shards";

This calls the doExecute method of TransportNodesAction:

@Override
protected void doExecute(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
    // execute
    new AsyncAction(task, request, listener).start();
}

The receiving node handles the request in the nodeOperation method of TransportNodesListGatewayStartedShards:

@Override
protected NodeGatewayStartedShards nodeOperation(NodeRequest request) {
    try {
        // shard identifier
        final ShardId shardId = request.getShardId();
        logger.trace("{} loading local shard state info", shardId);
        // load the shard-level allocation metadata
        ShardStateMetadata shardStateMetadata = ShardStateMetadata.FORMAT.loadLatestState(logger, namedXContentRegistry,
            nodeEnv.availableShardPaths(request.shardId));
        if (shardStateMetadata != null) {
            if (indicesService.getShardOrNull(shardId) == null) {
                final String customDataPath;
                if (request.getCustomDataPath() != null) {
                    customDataPath = request.getCustomDataPath();
                } else {
                    // TODO: Fallback for BWC with older ES versions. Remove once request.getCustomDataPath() always returns non-null
                    final IndexMetadata metadata = clusterService.state().metadata().index(shardId.getIndex());
                    if (metadata != null) {
                        customDataPath = new IndexSettings(metadata, settings).customDataPath();
                    } else {
                        logger.trace("{} node doesn't have meta data for the requests index", shardId);
                        throw new ElasticsearchException("node doesn't have meta data for index " + shardId.getIndex());
                    }
                }
                // we don't have an open shard on the store, validate the files on disk are openable
                ShardPath shardPath = null;
                try {
                    shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath);
                    if (shardPath == null) {
                        throw new IllegalStateException(shardId + " no shard path found");
                    }
                    Store.tryOpenIndex(shardPath.resolveIndex(), shardId, nodeEnv::shardLock, logger);
                } catch (Exception exception) {
                    final ShardPath finalShardPath = shardPath;
                    logger.trace(() -> new ParameterizedMessage(
                            "{} can't open index for shard [{}] in path [{}]",
                            shardId,
                            shardStateMetadata,
                            (finalShardPath != null) ? finalShardPath.resolveIndex() : ""),
                        exception);
                    String allocationId = shardStateMetadata.allocationId != null ?
                        shardStateMetadata.allocationId.getId() : null;
                    return new NodeGatewayStartedShards(clusterService.localNode(), allocationId, shardStateMetadata.primary,
                        exception);
                }
            }
            logger.debug("{} shard state info found: [{}]", shardId, shardStateMetadata);
            String allocationId = shardStateMetadata.allocationId != null ?
                shardStateMetadata.allocationId.getId() : null;
            return new NodeGatewayStartedShards(clusterService.localNode(), allocationId, shardStateMetadata.primary);
        }
        // no shard metadata found
        logger.trace("{} no local shard info found", shardId);
        return new NodeGatewayStartedShards(clusterService.localNode(), null, false);
    } catch (Exception e) {
        throw new ElasticsearchException("failed to load started shards", e);
    }
}

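Here the node loads its local shard-level metadata and returns it. When the responses arrive, the data is put into a cache so that later reroute calls do not have to fetch it again, and then reroute(shardId, "post_response") is called: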

@Override
protected void reroute(ShardId shardId, String reason) {
    logger.trace("{} scheduling reroute for {}", shardId, reason);
    assert rerouteService != null;
    rerouteService.reroute("async_shard_fetch", Priority.HIGH, ActionListener.wrap(
        r -> logger.trace("{} scheduled reroute completed for {}", shardId, reason),
        e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", shardId, reason), e)));
}

This submits a task through clusterService.submitStateUpdateTask:

// cluster_reroute
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",
    new ClusterStateUpdateTask(priority) {
        @Override
        public ClusterState execute(ClusterState currentState) {
            final boolean currentListenersArePending;
            synchronized (mutex) {
                assert currentListeners.isEmpty() == (pendingRerouteListeners != currentListeners)
                    : "currentListeners=" + currentListeners + ", pendingRerouteListeners=" + pendingRerouteListeners;
                currentListenersArePending = pendingRerouteListeners == currentListeners;
                if (currentListenersArePending) {
                    pendingRerouteListeners = null;
                }
            }
            if (currentListenersArePending) {
                logger.trace("performing batched reroute [{}]", reason);
                return reroute.apply(currentState, reason);
            } else {
                logger.trace("batched reroute [{}] was promoted", reason);
                return currentState;
            }
        }
        ......
    });

The execute method runs return reroute.apply(currentState, reason). The reroute function is set in the Node constructor and is the reroute method of AllocationService:

// cluster reroute service
final RerouteService rerouteService
    = new BatchedRerouteService(clusterService, clusterModule.getAllocationService()::reroute);
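
The batching idea can be sketched as follows: while one reroute task is queued, further requests only register their listener on the pending batch, so many fetch responses end up causing a single reroute. This is a deliberately simplified model without priorities or promotion; BatchedRerouteSketch, taskQueue, and runQueuedTasks are hypothetical names, and the queue merely stands in for the cluster-state update task queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of batching reroute requests into one queued task.
class BatchedRerouteSketch {
    private List<Runnable> pending = null;                        // listeners waiting for the queued reroute
    private final Deque<Runnable> taskQueue = new ArrayDeque<>(); // stand-in for the cluster-state task queue
    int reroutesPerformed = 0;

    synchronized void reroute(Runnable listener) {
        if (pending != null) {
            pending.add(listener); // a reroute is already queued, just join it
            return;
        }
        pending = new ArrayList<>();
        pending.add(listener);
        final List<Runnable> batch = pending;
        taskQueue.add(() -> {
            synchronized (this) {
                if (pending == batch) {
                    pending = null; // later requests start a new batch
                }
            }
            reroutesPerformed++; // one reroute serves the whole batch
            batch.forEach(Runnable::run);
        });
    }

    // Drains the queue on the calling thread; the demo is single-threaded.
    void runQueuedTasks() {
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
        }
    }
}
```

Three calls to reroute followed by one runQueuedTasks perform a single reroute and notify all three listeners.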

AllocationService's reroute method is called again. The cache now holds the shard metadata, so primary shard allocation can proceed.
Back in PrimaryShardAllocator.makeAllocationDecision:

final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index());
// get the set of in-sync allocation IDs for this shard ID
final Set<String> inSyncAllocationIds = indexMetadata.inSyncAllocationIds(unassignedShard.id());

This obtains the in-sync allocation IDs of the shard, which are then used to select candidate nodes:

// use in-sync allocation ids to select nodes
final NodeShardsResult nodeShardsResult = buildNodeShardsResult(unassignedShard, snapshotRestore,
    allocation.getIgnoreNodes(unassignedShard.shardId()), inSyncAllocationIds, shardState, logger);

protected static NodeShardsResult buildNodeShardsResult(ShardRouting shard, boolean matchAnyShard,
                                                        Set<String> ignoreNodes, Set<String> inSyncAllocationIds,
                                                        FetchResult<NodeGatewayStartedShards> shardState,
                                                        Logger logger) {
    List<NodeGatewayStartedShards> nodeShardStates = new ArrayList<>();
    int numberOfAllocationsFound = 0;
    for (NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
        DiscoveryNode node = nodeShardState.getNode();
        String allocationId = nodeShardState.allocationId();

        if (ignoreNodes.contains(node.getId())) {
            continue;
        }
        // no store exception occurred
        if (nodeShardState.storeException() == null) {
            if (allocationId == null) {
                logger.trace("[{}] on node [{}] has no shard state information", shard, nodeShardState.getNode());
            } else {
                logger.trace("[{}] on node [{}] has allocation id [{}]", shard, nodeShardState.getNode(), allocationId);
            }
        } else {
            final String finalAllocationId = allocationId;
            if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) {
                logger.trace(() -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be " +
                    "opened as it's locked, treating as valid shard", shard, nodeShardState.getNode(), finalAllocationId),
                    nodeShardState.storeException());
            } else {
                logger.trace(() -> new ParameterizedMessage("[{}] on node [{}] has allocation id [{}] but the store can not be " +
                    "opened, treating as no allocation id", shard, nodeShardState.getNode(), finalAllocationId),
                    nodeShardState.storeException());
                allocationId = null;
            }
        }

        if (allocationId != null) {
            assert nodeShardState.storeException() == null ||
                nodeShardState.storeException() instanceof ShardLockObtainFailedException :
                "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a " +
                    "store throwing " + nodeShardState.storeException();
            // count how many allocated copies were found
            numberOfAllocationsFound++;
            // the copy reported by the node is in-sync (or any copy is acceptable)
            if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) {
                nodeShardStates.add(nodeShardState);
            }
        }
    }

    final Comparator<NodeGatewayStartedShards> comparator; // allocation preference
    if (matchAnyShard) {
        // prefer shards with matching allocation ids
        Comparator<NodeGatewayStartedShards> matchingAllocationsFirst = Comparator.comparing(
            (NodeGatewayStartedShards state) -> inSyncAllocationIds.contains(state.allocationId())).reversed();
        comparator = matchingAllocationsFirst.thenComparing(NO_STORE_EXCEPTION_FIRST_COMPARATOR)
            .thenComparing(PRIMARY_FIRST_COMPARATOR);
    } else {
        // copies without a store exception come first
        comparator = NO_STORE_EXCEPTION_FIRST_COMPARATOR.thenComparing(PRIMARY_FIRST_COMPARATOR);
    }

    nodeShardStates.sort(comparator);

    if (logger.isTraceEnabled()) {
        logger.trace("{} candidates for allocation: {}", shard, nodeShardStates.stream().map(s -> s.getNode().getName())
            .collect(Collectors.joining(", ")));
    }
    return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound);
}

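This builds the list of candidate nodes. If matchAnyShard is false, only copies whose allocation ID is in the in-sync set are added to the list. Otherwise every node that holds a copy with an allocation ID is added, with the in-sync copies sorted to the front.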
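
The filtering and ordering rules can be tried out in isolation. The sketch below assumes a hypothetical Copy type with just the four properties the ordering depends on, and it models the two named comparators as "no store exception first" and "previous primary first", which is what their names suggest; it is not the Elasticsearch class.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

// Hypothetical model of candidate selection and ordering.
class CandidateOrderSketch {
    static final class Copy {
        final String node;
        final String allocationId; // null means no usable shard state on this node
        final boolean primary;
        final boolean storeException;

        Copy(String node, String allocationId, boolean primary, boolean storeException) {
            this.node = node;
            this.allocationId = allocationId;
            this.primary = primary;
            this.storeException = storeException;
        }
    }

    static List<String> order(List<Copy> copies, Set<String> inSyncIds, boolean matchAnyShard) {
        // Boolean keys sort false before true, so reversed() puts "true" first.
        Comparator<Copy> noExceptionFirst = Comparator.comparing((Copy c) -> !c.storeException).reversed();
        Comparator<Copy> primaryFirst = Comparator.comparing((Copy c) -> c.primary).reversed();
        Comparator<Copy> comparator = noExceptionFirst.thenComparing(primaryFirst);
        if (matchAnyShard) {
            Comparator<Copy> inSyncFirst =
                Comparator.comparing((Copy c) -> inSyncIds.contains(c.allocationId)).reversed();
            comparator = inSyncFirst.thenComparing(comparator);
        }

        List<Copy> candidates = new ArrayList<>();
        for (Copy copy : copies) {
            if (copy.allocationId == null) {
                continue; // nothing to recover from on this node
            }
            if (matchAnyShard || inSyncIds.contains(copy.allocationId)) {
                candidates.add(copy);
            }
        }
        candidates.sort(comparator);

        List<String> nodes = new ArrayList<>();
        for (Copy copy : candidates) {
            nodes.add(copy.node);
        }
        return nodes;
    }
}
```

With in-sync IDs {a1, a2}, a stale copy a3 is dropped when matchAnyShard is false and is kept but sorted last when it is true.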

// ask the deciders about each candidate node
NodesToAllocate nodesToAllocate = buildNodesToAllocate(
    allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, false
);

private static NodesToAllocate buildNodesToAllocate(RoutingAllocation allocation,
                                                    List<NodeGatewayStartedShards> nodeShardStates,
                                                    ShardRouting shardRouting,
                                                    boolean forceAllocate) {
    List<DecidedNode> yesNodeShards = new ArrayList<>();
    List<DecidedNode> throttledNodeShards = new ArrayList<>();
    List<DecidedNode> noNodeShards = new ArrayList<>();
    for (NodeGatewayStartedShards nodeShardState : nodeShardStates) {
        RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId());
        if (node == null) {
            continue;
        }
        // use the force-allocation check when forced primary allocation is requested
        Decision decision = forceAllocate ? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation) :
            allocation.deciders().canAllocate(shardRouting, node, allocation);
        // record the decision for this node
        DecidedNode decidedNode = new DecidedNode(nodeShardState, decision);
        if (decision.type() == Type.THROTTLE) {
            throttledNodeShards.add(decidedNode);
        } else if (decision.type() == Type.NO) {
            noNodeShards.add(decidedNode);
        } else {
            yesNodeShards.add(decidedNode);
        }
    }
    return new NodesToAllocate(Collections.unmodifiableList(yesNodeShards), Collections.unmodifiableList(throttledNodeShards),
        Collections.unmodifiableList(noNodeShards));
}

Each candidate node is evaluated by the deciders, and the results are sorted into three groups (THROTTLE, NO, and YES) before being returned:

// at least one node can take the shard
if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
    // take the first one
    DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
    logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation",
        unassignedShard.index(), unassignedShard.id(), unassignedShard, decidedNode.nodeShardState.getNode());
    // target node
    node = decidedNode.nodeShardState.getNode();
    // allocation ID of the shard copy
    allocationId = decidedNode.nodeShardState.allocationId();
} else if (nodesToAllocate.throttleNodeShards.isEmpty() && !nodesToAllocate.noNodeShards.isEmpty()) {
    // The deciders returned a NO decision for all nodes with shard copies, so we check if primary shard
    // can be force-allocated to one of the nodes.
    nodesToAllocate = buildNodesToAllocate(allocation, nodeShardsResult.orderedAllocationCandidates, unassignedShard, true);
    if (nodesToAllocate.yesNodeShards.isEmpty() == false) {
        final DecidedNode decidedNode = nodesToAllocate.yesNodeShards.get(0);
        final NodeGatewayStartedShards nodeShardState = decidedNode.nodeShardState;
        logger.debug("[{}][{}]: allocating [{}] to [{}] on forced primary allocation",
            unassignedShard.index(), unassignedShard.id(), unassignedShard, nodeShardState.getNode());
        node = nodeShardState.getNode();
        allocationId = nodeShardState.allocationId();
    } else if (nodesToAllocate.throttleNodeShards.isEmpty() == false) {
        logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on forced primary allocation",
            unassignedShard.index(), unassignedShard.id(), unassignedShard, nodesToAllocate.throttleNodeShards);
        throttled = true;
    } else {
        logger.debug("[{}][{}]: forced primary allocation denied [{}]",
            unassignedShard.index(), unassignedShard.id(), unassignedShard);
    }
} else {
    // we are throttling this, since we are allowed to allocate to this node but there are enough allocations
    // taking place on the node currently, ignore it for now
    logger.debug("[{}][{}]: throttling allocation [{}] to [{}] on primary allocation",
        unassignedShard.index(), unassignedShard.id(), unassignedShard, nodesToAllocate.throttleNodeShards);
    throttled = true;
}

List<NodeAllocationResult> nodeResults = null;
if (explain) {
    // build the per-node decisions
    nodeResults = buildNodeDecisions(nodesToAllocate, shardState, inSyncAllocationIds);
}
// an async fetch is still pending
if (allocation.hasPendingAsyncFetch()) {
    return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeResults);
} else if (node != null) {
    return AllocateUnassignedDecision.yes(node, allocationId, nodeResults, false);
} else if (throttled) {
    return AllocateUnassignedDecision.throttle(nodeResults);
} else {
    return AllocateUnassignedDecision.no(AllocationStatus.DECIDERS_NO, nodeResults, true);
}

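If some node can take the primary shard, the primary is allocated to the first such node. If every candidate node returned NO and none was throttled, the allocator checks whether the primary can be force-allocated. Otherwise, when no node can take the primary right now but some nodes were throttled, a throttled result is returned.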
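
The branching described above can be condensed into one small function over the sizes of the three groups. This is only a sketch of the control flow: PrimaryDecisionSketch, Outcome, and decide are hypothetical, and the forced counts represent the result of asking the deciders a second time with forced allocation enabled.

```java
// Hypothetical condensation of the primary allocation branches.
class PrimaryDecisionSketch {
    enum Outcome { ALLOCATE, FORCE_ALLOCATE, THROTTLE, NO }

    static Outcome decide(int yes, int throttled, int no, int forcedYes, int forcedThrottled) {
        if (yes > 0) {
            return Outcome.ALLOCATE; // take the first node that said YES
        }
        if (throttled == 0 && no > 0) {
            // Every candidate said NO: retry with the force-allocation check.
            if (forcedYes > 0) {
                return Outcome.FORCE_ALLOCATE;
            }
            if (forcedThrottled > 0) {
                return Outcome.THROTTLE;
            }
            return Outcome.NO;
        }
        // Remaining case, mirroring the final else branch of the quoted code: wait and retry later.
        return Outcome.THROTTLE;
    }
}
```

For example, decide(0, 0, 2, 1, 0) yields FORCE_ALLOCATE, while decide(0, 1, 1, 0, 0) yields THROTTLE because one candidate is merely busy.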
Back to the allocateUnassigned method of BaseGatewayShardAllocator:

public void allocateUnassigned(ShardRouting shardRouting, RoutingAllocation allocation,
                               ExistingShardsAllocator.UnassignedAllocationHandler unassignedAllocationHandler) {
    // fetch the shard information and let the deciders determine on which node the shard can be allocated
    final AllocateUnassignedDecision allocateUnassignedDecision = makeAllocationDecision(shardRouting, allocation, logger);
    if (allocateUnassignedDecision.isDecisionTaken() == false) {
        // no decision was taken by this allocator
        return;
    }
    // a node was found for which the overall decision is YES
    if (allocateUnassignedDecision.getAllocationDecision() == AllocationDecision.YES) {
        unassignedAllocationHandler.initialize(allocateUnassignedDecision.getTargetNode().getId(),
            allocateUnassignedDecision.getAllocationId(),
            shardRouting.primary() ? ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE :
                allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
            allocation.changes());
    } else {
        unassignedAllocationHandler.removeAndIgnore(allocateUnassignedDecision.getAllocationStatus(), allocation.changes());
    }
}

initialize is called to start initializing the primary shard:

public ShardRouting initializeShard(ShardRouting unassignedShard, String nodeId, @Nullable String existingAllocationId,
                                    long expectedSize, RoutingChangesObserver routingChangesObserver) {
    ensureMutable();
    assert unassignedShard.unassigned() : "expected an unassigned shard " + unassignedShard;
    // create the initializing ShardRouting
    ShardRouting initializedShard = unassignedShard.initialize(nodeId, existingAllocationId, expectedSize);
    // add the shard to its node (node-to-shard mapping)
    node(nodeId).add(initializedShard);
    // count of inactive shards
    inactiveShardCount++;
    if (initializedShard.primary()) {
        inactivePrimaryCount++;
    }
    // register the recovery
    addRecovery(initializedShard);
    assignedShardsAdd(initializedShard);
    routingChangesObserver.shardInitialized(unassignedShard, initializedShard);
    return initializedShard;
}

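Replica allocation follows the same logic as primary allocation: replicaShardAllocator.allocateUnassigned is called, and ReplicaShardAllocator's makeAllocationDecision decides where the replica goes. It first checks whether the replica can be allocated on at least one node; if not, the remaining steps are skipped. Replica allocation also has to fetch shard information from the nodes; as with primaries, the results are cached, so later reroute rounds can reuse them instead of fetching again. If no suitable node is found, it checks whether the allocation should be delayed. Once a node has been chosen, initializeShard (shown above) runs and in turn calls ShardRouting.initialize: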

public ShardRouting initialize(String nodeId, @Nullable String existingAllocationId, long expectedShardSize) {
    assert state == ShardRoutingState.UNASSIGNED : this;
    assert relocatingNodeId == null : this;
    final AllocationId allocationId;
    // create a new allocationId if no existing one was passed in
    if (existingAllocationId == null) {
        allocationId = AllocationId.newInitializing();
    } else {
        allocationId = AllocationId.newInitializing(existingAllocationId);
    }
    return new ShardRouting(shardId, nodeId, null, primary, ShardRoutingState.INITIALIZING, recoverySource,
        unassignedInfo, allocationId, expectedShardSize);
}

The shard enters the INITIALIZING state.
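
The transition can be modelled with an immutable routing entry: initializing returns a new object in the INITIALIZING state and either reuses the allocation ID found on disk or generates a fresh one. RoutingSketch and its members are hypothetical, and the random UUID is only a stand-in for however a new allocation ID is actually generated.

```java
import java.util.UUID;

// Hypothetical model of the UNASSIGNED -> INITIALIZING transition.
class RoutingSketch {
    enum State { UNASSIGNED, INITIALIZING, STARTED }

    static final class Routing {
        final State state;
        final String nodeId;
        final String allocationId;

        Routing(State state, String nodeId, String allocationId) {
            this.state = state;
            this.nodeId = nodeId;
            this.allocationId = allocationId;
        }

        // Returns a new entry; the current one is left untouched.
        Routing initialize(String targetNodeId, String existingAllocationId) {
            if (state != State.UNASSIGNED) {
                throw new IllegalStateException("expected an unassigned shard but was " + state);
            }
            String id = existingAllocationId != null
                ? existingAllocationId              // reuse the ID of the copy found on disk
                : UUID.randomUUID().toString();     // assumption: any fresh unique ID will do here
            return new Routing(State.INITIALIZING, targetNodeId, id);
        }
    }

    static Routing unassigned() {
        return new Routing(State.UNASSIGNED, null, null);
    }
}
```

Reusing the existing allocation ID is what ties the initializing shard back to the on-disk copy that was selected as a candidate.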
Once the existing shard copies have been allocated, the shards that have no existing copy are allocated:

// balanced allocation
shardsAllocator.allocate(allocation);

public void allocate(RoutingAllocation allocation) {
    if (allocation.routingNodes().size() == 0) {
        failAllocationOfNewPrimaries(allocation);
        return;
    }
    final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
    // allocate the remaining unassigned shards
    balancer.allocateUnassigned();
    // move shards
    balancer.moveShards();
    // rebalance
    balancer.balance();
}

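At the end of reroute, the new shardRouting entries are added to the cluster state and a new cluster state is returned. That cluster state is then published, and the recovery process begins.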

