Kafka-Controller角色需要做什么?

embedded/2024/11/24 1:24:11/

一、上下文

《Kafka-Controller选举》博客中分析了Controller是如何选举出来的,且比如会执行onControllerFailover()。接下来让我们看看Controller角色都承担了哪些职责。

二、注册监听器

在从zookeeper读取资源前,注册监听器以获取 broker/topic 的回调。

    val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler,isrChangeNotificationHandler)//依次注册这些 Handler//子节点变化监听childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)//节点变化监听nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)

三、初始化ControllerContext

1、获取所有的broker

其实就是获取brokers/ids/ 目录下的id来得到broker列表

val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
  def getAllBrokerAndEpochsInCluster: Map[Broker, Long] = {//从 brokers/ids 目录下获取所有 brokerid  且排好序val brokerIds = getSortedBrokerList//为每个 brokerid 都封装一个 请求val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))val getDataResponses = retryRequestsUntilConnected(getDataRequests)getDataResponses.flatMap { getDataResponse =>val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]getDataResponse.resultCode match {case Code.OK =>// decode 解读 将json 合 brokerid 构建成 BrokerInfo//{//    "version":5,//    "host":"localhost",//    "port":9092,//    "jmx_port":9999,//    "timestamp":"2233345666",//    "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],//    *"rack":"dc1",//    "features": {"feature": {"min_version":1, "first_active_version":2, "max_version":3}}//   }Some((BrokerIdZNode.decode(brokerId, getDataResponse.data).broker, getDataResponse.stat.getCzxid))case Code.NONODE => Nonecase _ => throw getDataResponse.resultException.get}}.toMap}

这一步会得到一个Map[Broker, Long]

Broker中有brokerid,也有这台broker的连接信息、机架信息,此时Controller已经知道自己需要管理的broker有哪些,且可以建立通信

2、判断这些broker是否兼容

val (compatibleBrokerAndEpochs, incompatibleBrokerAndEpochs) = partitionOnFeatureCompatibility(curBrokerAndEpochs)

返回的结果中:

compatibleBrokerAndEpochs 为兼容的borker map

incompatibleBrokerAndEpochs 为不兼容的borker map

那么怎么判断一个broker是否兼容呢?我们看看下面的代码:

  private def partitionOnFeatureCompatibility(brokersAndEpochs: Map[Broker, Long]): (Map[Broker, Long], Map[Broker, Long]) = {//partition 方法://一对元素,首先,所有满足谓词p的元素,其次,所有不满足谓词p的元素。//这两个可迭代集合分别对应filter和filterNot的结果。//这里提供的默认实现需要遍历该集合两次。严格集合在StrictOptimizedIterableOps中有一个重写版本的分区,只需要一次遍历。brokersAndEpochs.partition {case (broker, _) =>!config.isFeatureVersioningSupported ||!featureCache.getFeatureOption.exists(latestFinalizedFeatures =>BrokerFeatures.hasIncompatibleFeatures(broker.features,latestFinalizedFeatures.finalizedFeatures().asScala.map(kv => (kv._1, kv._2.toShort)).toMap))}}
def isFeatureVersioningSupported = interBrokerProtocolVersion.isFeatureVersioningSupported
public enum MetadataVersion {IBP_0_8_0(-1, "0.8.0", ""),//.....IBP_2_7_IV0(-1, "2.7", "IV0"),public boolean isFeatureVersioningSupported() {return this.isAtLeast(IBP_2_7_IV0);}
}

MetadataVersion包含不同的Kafka版本,其中2.7就表示该Kafka集群可以兼容的最小版本。如果某个broker的代码版本低于这个版本,就会判定为不兼容。

3、将兼容的broker设置成live状态

controllerContext.setLiveBrokers(compatibleBrokerAndEpochs)
class ControllerContext extends ControllerChannelContext {private val liveBrokers = mutable.Set.empty[Broker]private val liveBrokerEpochs = mutable.Map.empty[Int, Long]def setLiveBrokers(brokerAndEpochs: Map[Broker, Long]): Unit = {clearLiveBrokers()addLiveBrokers(brokerAndEpochs)}def addLiveBrokers(brokerAndEpochs: Map[Broker, Long]): Unit = {liveBrokers ++= brokerAndEpochs.keySetliveBrokerEpochs ++= brokerAndEpochs.map { case (broker, brokerEpoch) => (broker.id, brokerEpoch) }}}

其实就是在其内部维护了一个map(liveBrokers ),将存活的broker都放入其中。

4、获取所有的topic

controllerContext.setAllTopics(zkClient.getAllTopicsInCluster(true))
  def getAllTopicsInCluster(registerWatch: Boolean = false): Set[String] = {//查看 brokers/topics 下有哪些信息val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(TopicsZNode.path, registerWatch))getChildrenResponse.resultCode match {case Code.OK => getChildrenResponse.children.toSetcase Code.NONODE => Set.emptycase _ => throw getChildrenResponse.resultException.get}}

从zookeeper的brokers/topics目录下获取所有的topic

  val allTopics = mutable.Set.empty[String]def setAllTopics(topics: Set[String]): Unit = {allTopics.clear()allTopics ++= topics}

维护了一个set,将所有的topic都放进去

5、检测每个topic下partition的变化

  registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)private def registerPartitionModificationsHandlers(topics: Seq[String]): Unit = {topics.foreach { topic =>//依次将每个 topic 注册ControllerEventManager中的 队列中 ,监控topic的改变val partitionModificationsHandler = new PartitionModificationsHandler(eventManager, topic)partitionModificationsHandlers.put(topic, partitionModificationsHandler)}partitionModificationsHandlers.values.foreach(zkClient.registerZNodeChangeHandler)}

topic下有topic_id、partitions、adding_replicas、removing_replicas等信息,当这些发生改变时,kafka也要对正在使用它们的producer、consumer进行调整,详细可以查看processPartitionModifications()

6、获取TopicPartition副本分配信息

val replicaAssignmentAndTopicIds = zkClient.getReplicaAssignmentAndTopicIdForTopics(controllerContext.allTopics.toSet)
  def getReplicaAssignmentAndTopicIdForTopics(topics: Set[String]): Set[TopicIdReplicaAssignment] = {val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)getDataResponses.map { getDataResponse =>val topic = getDataResponse.ctx.get.asInstanceOf[String]getDataResponse.resultCode match {case Code.OK => TopicZNode.decode(topic, getDataResponse.data)case Code.NONODE => TopicIdReplicaAssignment(topic, None, Map.empty[TopicPartition, ReplicaAssignment])case _ => throw getDataResponse.resultException.get}}.toSet}

brokers/topic/partitions下有每个分区对应的副本信息,此时可以到达 TopicPartition -> 副本信息的对应关系,并将最新的对应关系更新到ControllerContext中

    replicaAssignmentAndTopicIds.foreach { case TopicIdReplicaAssignment(_, _, assignments) =>assignments.foreach { case (topicPartition, replicaAssignment) =>//更新分区中的所有副本分配controllerContext.updatePartitionFullReplicaAssignment(topicPartition, replicaAssignment)if (replicaAssignment.isBeingReassigned)controllerContext.partitionsBeingReassigned.add(topicPartition)}}

7、检测broker改变

  registerBrokerModificationsHandler(controllerContext.liveOrShuttingDownBrokerIds)private def registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = {debug(s"Register BrokerModifications handler for $brokerIds")//循环每个broker,如果broker发生改变,会触发processBrokerModification(brokerId) 进行处理brokerIds.foreach { brokerId =>val brokerModificationsHandler = new BrokerModificationsHandler(eventManager, brokerId)zkClient.registerZNodeChangeHandlerAndCheckExistence(brokerModificationsHandler)brokerModificationsHandlers.put(brokerId, brokerModificationsHandler)}}

《Kafka-Controller选举》博客中我们已经知道它内部有一个队列,并且有一个循环线程,不停的处理队列中的事件,上面分析的broker改变、partition改变、topic改变都会触发事件,而这些事件都会放入这个对立,进行对应的处理

8、更新所有现有分区的leader和isr缓存

updateLeaderAndIsrCache()
  private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq): Unit = {//TopicPartitionStateZNode.decode   json中有 leader 、leader_epoch、isr、leader_recovery_state、controller_epoch//返回一个 map Map[TopicPartition, LeaderIsrAndControllerEpoch]// LeaderIsrAndControllerEpoch 中有 leader、ISR、LeaderEpoch 、ControllerEpoch 、ZkVersion、LeaderRecoveryStateval leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)leaderIsrAndControllerEpochs.forKeyValue { (partition, leaderIsrAndControllerEpoch) =>controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)}}
class ControllerContext extends ControllerChannelContext {private val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]def putPartitionLeadershipInfo(partition: TopicPartition,leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {//设置 分区 leader 信息val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)val replicaAssignment = partitionFullReplicaAssignment(partition)updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))}
}

在ControllerContext中维护了一个map(TopicPartition -> LeaderIsrAndControllerEpoch)来存放每个分区的leader和isr

9、与每个broker建立通信

controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers)
class ControllerChannelManager(...){protected val brokerStateInfo = new mutable.HashMap[Int, ControllerBrokerStateInfo]def startup(initialBrokers: Set[Broker]):Unit = {//controller 会与每个broker建立连接 ,这一步只会将 brokerStateInfo 进行填充initialBrokers.foreach(addNewBroker)//为每个broker启动一个线程进行连接连接brokerLock synchronized {brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))}}private def addNewBroker(broker: Broker): Unit = {//......val requestThread = new RequestSendThread(config.brokerId, controllerEpoch, messageQueue, networkClient,brokerNode, config, time, requestRateAndQueueTimeMetrics, stateChangeLogger, threadName)requestThread.setDaemon(false)brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics, reconfigurableChannelBuilder))}private def startRequestSendThread(brokerId: Int): Unit = {val requestThread = brokerStateInfo(brokerId).requestSendThreadif (requestThread.getState == Thread.State.NEW)requestThread.start()}
}

下面我们看下 RequestSendThread中都做了什么?

RequestSendThread继承了ShutdownableThread,它里面会循环调起doWork();

  override def doWork(): Unit = {def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take()requestRateAndQueueTimeMetrics.update(time.milliseconds() - enqueueTimeMs, TimeUnit.MILLISECONDS)var clientResponse: ClientResponse = nulltry {var isSendSuccessful = falsewhile (isRunning && !isSendSuccessful) {// 如果代理长时间关闭,那么在某个时候,控制器的zookeeper监听器将触发removeBroker,该监听器将在该线程上调用shutdown()。到那时,我们将停止重试。try {if (!brokerReady()) {isSendSuccessful = falsebackoff()}else {val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,time.milliseconds(), true)clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)isSendSuccessful = true}} catch {case e: Throwable => //如果发送不成功,请重新连接到代理并重新发送消息networkClient.close(brokerNode.idString)isSendSuccessful = falsebackoff()}}if (clientResponse != null) {val requestHeader = clientResponse.requestHeader//controller 与broker 之间使用的api有这三个// ApiKeys.LEADER_AND_ISR 、ApiKeys.LEADER_AND_ISR 、ApiKeys.UPDATE_METADATA)val api = requestHeader.apiKeyif (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.UPDATE_METADATA)throw new KafkaException(s"Unexpected apiKey received: $apiKey")val response = clientResponse.responseBodystateChangeLogger.withControllerEpoch(controllerEpoch()).trace(s"Received response " +s"$response for request $api with correlation id " +s"${requestHeader.correlationId} sent to broker $brokerNode")if (callback != null) {callback(response)}}} catch {case e: Throwable =>//如果出现任何socket错误(例如socket超时),则连接不再可用,需要重新创建。networkClient.close(brokerNode.idString)}}

四、过滤并删除topic

    //topicsToBeDeleted : 要删除的topics列表//topicsIneligibleForDeletion : 不符合删除条件的 topics 列表val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()info("Initializing topic deletion manager")//正在初始化 topic删除 状态机。topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
  private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {///admin/delete_topics  下 被标记为删除的 topic// 说明 topic被删除,就是在 zk 的目录下 进行标记val topicsToBeDeleted = zkClient.getTopicDeletions.toSetval topicsWithOfflineReplicas = controllerContext.allTopics.filter { topic => {//从controllerContext 获取topic的副本 也就是 TopicPartitionval replicasForTopic = controllerContext.replicasForTopic(topic)//判断是否存在  broker 在线  &&  副本的状态是不在线replicasForTopic.exists(r => !controllerContext.isReplicaOnline(r.replica, r.topicPartition))}}//topic 重新分配中val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.map(_.topic)// 什么叫不符合删除条件?// 副本的状态是不在线  或者 在重新分配 就将该topic标记为 不符合删除条件的topicval topicsIneligibleForDeletion = topicsWithOfflineReplicas | topicsForWhichPartitionReassignmentIsInProgress//要删除的topics列表info(s"List of topics to be deleted: ${topicsToBeDeleted.mkString(",")}")//不符合删除条件的topics列表info(s"List of topics ineligible for deletion: ${topicsIneligibleForDeletion.mkString(",")}")(topicsToBeDeleted, topicsIneligibleForDeletion)}

五、更新元数据

sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
  private[controller] def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicPartition]): Unit = {try {brokerRequestBatch.newBatch()brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)//通过api向每个borker发送一下请求//ApiKeys.LEADER_AND_ISR//ApiKeys.UPDATE_METADATA//ApiKeys.STOP_REPLICAbrokerRequestBatch.sendRequestsToBrokers(epoch)} catch {case e: IllegalStateException =>handleIllegalState(e)}}

ApiKeys.LEADER_AND_ISR:我们在《Kafka-确定broker中的分区是leader还是follower》中分析过,它会让每个borker启动对所属分区的leader角色或者follower角色,并开始各自角色所负责的任务。

ApiKeys.UPDATE_METADATA:会像每个broker同步最新的元数据

六、启动副本状态机、分区状态机

  val replicaStateMachine: ReplicaStateMachine = new ZkReplicaStateMachine(config, stateChangeLogger, controllerContext, zkClient,new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))replicaStateMachine.startup()partitionStateMachine.startup()

1、副本状态机

其中定义了副本可以处于的状态和转换状态的前置状态。replica可能处于的不同状态如下:

1、NewReplica

        controller可以在partition重新分配期间创建新的replicas。在这种状态下,副本只能成为follower状态更改请求。有效的前置状态为NonExistentReplica

2、OnlineReplica

        一旦启动了replica并为其partition分配了部分replica,它就处于这种状态。在这种状态下,它可以成为leader或follower状态更改请求。有效的前置状态为NewReplica、OnlineReplica、OfflineReplica和ReplicaDeletionIneligible

3、OfflineReplica

        如果replica死亡,它将移动到此状态。当承载replica的broker关闭时,就会发生这种情况。有效的前置状态为NewReplica、OnlineReplica、OfflineReplica和ReplicaDeletionIneligible

4、ReplicaDeletionStarted

        如果开始删除replica,它将移动到此状态。有效的前置状态为OfflineReplica

5、ReplicaDeletionSuccessful

        如果replica在响应删除replica请求时没有错误代码,则将其移动到此状态。有效的前置状态为ReplicaDelegationStarted

6、ReplicaDeletionSuccessful

        如果replica删除失败,则将其移动到此状态。有效的前置状态为ReplicaDelegationStarted和OfflineReplica

7、NonExistentReplica

        如果replica被成功删除,它将移动到此状态。有效的前置状态为ReplicaDelegationSuccessful

2、分区状态机

其中定义了分区可以处于的状态和转换状态的前置状态。分区可能处于的不同状态如下:

1、NonExistentPartition

        此状态表示分区从未创建或创建后删除。有效的前置状态(如果存在)是OfflinePartition

2、NewPartition

        创建后,分区处于NewPartition状态。在这种状态下,分区应该分配了副本,但还没有leader/isr。有效的前置状态为NonExistentPartition

3、OnlinePartition

        一旦为分区选出了领导者,它就处于OnlinePartition状态。有效的前置状态为NewPartition/OfflinePartition

4、OfflinePartition

        如果在成功选举领导者后,分区的领导者死亡,则分区将移动到OfflinePartition状态。有效的先前状态为NewPartition/OnlinePartition

七、分区重分配

initializePartitionReassignments()

通过检测/admin/revariation_partions的变化来对未来的分区进行重分配

  private def initializePartitionReassignments(): Unit = {//当controller 进行故障转移时,新的重新分配可能已通过Zookeeper提交val zkPartitionsResumed = processZkPartitionReassignment()// 我们可能还有一些基于API的重新分配需要重新启动maybeResumeReassignments { (tp, _) =>!zkPartitionsResumed.contains(tp)}}

八、从副本中选举leader

val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
//尝试为每个给定分区选择一个副本作为领导者。
onReplicaElection(pendingPreferredReplicaElections, ElectionType.PREFERRED, ZkTriggered)private def fetchPendingPreferredReplicaElections(): Set[TopicPartition] = {// 从 zk的 admin/preferred_replica_election 获取首选副本// PreferredReplicaElectionZNode.decode//  admin/preferred_replica_election/partitions/topic//  admin/preferred_replica_election/partitions/partition//  封装成TopicPartition(topic, partition)val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection// 检查是否已完成或主题是否已删除val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>val replicas = controllerContext.partitionReplicaAssignment(partition)val topicDeleted = replicas.isEmptyval successful =if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).get.leaderAndIsr.leader == replicas.head else falsesuccessful || topicDeleted}val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection -- partitionsThatCompletedPreferredReplicaElectionval pendingPreferredReplicaElectionsSkippedFromTopicDeletion = pendingPreferredReplicaElectionsIgnoringTopicDeletion.filter(partition => topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))val pendingPreferredReplicaElections = pendingPreferredReplicaElectionsIgnoringTopicDeletion -- pendingPreferredReplicaElectionsSkippedFromTopicDeletion//正在进行首选副本选择的分区info(s"Partitions undergoing preferred replica election: ${partitionsUndergoingPreferredReplicaElection.mkString(",")}")//已完成首选副本选择的分区info(s"Partitions that completed preferred replica election: ${partitionsThatCompletedPreferredReplicaElection.mkString(",")}")//由于主题删除,跳过分区的首选副本选择info(s"Skipping preferred replica election for partitions due to topic deletion: ${pendingPreferredReplicaElectionsSkippedFromTopicDeletion.mkString(",")}")//恢复分区的首选副本选择info(s"Resuming preferred replica election for partitions: ${pendingPreferredReplicaElections.mkString(",")}")pendingPreferredReplicaElections}

九、开始调度

private[controller] val kafkaScheduler = new KafkaScheduler(1)kafkaScheduler.startup()
    public void startup() {log.debug("Initializing task scheduler.");synchronized (this) {if (isStarted())throw new IllegalStateException("This scheduler has already been started.");//初始化线程池//kafkaController 中的 background.threads  默认 10  既 threads = 10 ,但这里是1// 用于各种后台处理任务的线程数ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threads);executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);executor.setRemoveOnCancelPolicy(true);executor.setThreadFactory(runnable ->new KafkaThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon));this.executor = executor;}}

 十、启动leader自动再平衡任务

auto.leader.rebalance.enable 默认 true ,可以设置成false将其关闭

启用自动 leader 平衡。后台线程定期检查分区 leader 的分布,可由5s配置。如果领导者不平衡超过5s,则触发领导者重新平衡到分区的首选领导者

    if (config.autoLeaderRebalanceEnable) {//分区lieader 再平衡任务scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)}

十一、检测Controller变动

《Kafka-Controller选举》中提到,KafkaController启动时注册了两个事件:RegisterBrokerAndReelect 和 Startup

RegisterBrokerAndReelect事件是从众多的broker启动时选举出Controller,而Startup事件是检测zookeeper的controller目录,并再次进行controller的选举。因为Controller是集群的核心,必须在有故障时里面选举出来。

  private def processStartup(): Unit = {zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)elect()}

http://www.ppmy.cn/embedded/139993.html

相关文章

雷达图像用dB使图像细节更好

数字例子:雷达图像转换为dB尺度的影响 假设我们有一个雷达图像,其Sigma0值的线性表示如下: 像素A的Sigma0值为0.2(较暗)像素B的Sigma0值为1.0(中等亮度)像素C的Sigma0值为5.0(较亮…

Nginx通过url获取代理地址,动态代理

目的 底层服务返回的ws地址代理成wss的, 但是我们不知道底层服务返回的地址ip port所以 通过拼接的方式来通过url信息中获取到ws地址信息,进行动态代理 nginx.conf server {listen 11081 ssl;server_name localhost11081;rewrite ^/old-url$ /new-url permanent;add_…

如何在WPF中嵌入其它程序

在WPF中嵌入其它程序&#xff0c;这里提供两种方案 一、使用WindowsFormHost 使用步骤如下 1、添加WindowsFormsIntegration和System.Windows.Forms引用 2、在界面上放置WindowsFormHost和System.Windows.Forms.Panel 1 <Grid> 2 <WindowsFormsHost> 3…

Oracle - 多区间按权重取值逻辑 ,分时区-多层级-取配置方案(三)

本篇紧跟第一篇&#xff0c; 和 第二篇无关 Oracle - 多区间按权重取值逻辑 &#xff0c;分时区-多层级-取配置方案 Oracle - 多区间按权重取值逻辑 &#xff0c;分时区-多层级-取配置方案(二) 先说需求&#xff1a; 某业务配置表&#xff0c;按配置的时间区间及组织层级取方…

Ubuntu上安装MySQL并且实现远程登录

目录 下载网络工具 查看网络连接 更新系统软件包&#xff1b; 安装mysql数据库 查看mysql数据库状态 以数字ip形式显示mysql的监听状态。&#xff08;默认监听端口是3306&#xff09; 查看安装mysql数据库时系统创建的目录信息。 根据查询到的系统用户名以及随机密码&a…

超高流量多级缓存架构设计!

文章内容已经收录在《面试进阶之路》&#xff0c;从原理出发&#xff0c;直击面试难点&#xff0c;实现更高维度的降维打击&#xff01; 文章目录 电商-多级缓存架构设计多级缓存架构介绍多级缓存请求流程负载均衡算法的选择轮询负载均衡一致性哈希负载均衡算法选择 应用层 Ngi…

Perfetto学习大全

Perfetto 是一个功能强大的性能分析和追踪工具&#xff0c;主要用于捕获和分析复杂系统中的事件和性能数据&#xff0c;特别是在 Android 和 Linux 环境下。它的核心目标是帮助开发者深入了解系统和应用程序的运行状态&#xff0c;以便优化性能和诊断问题。 Perfetto的主要作用…

【蓝桥杯备赛】深秋的苹果

# 4.1.1. 题目解析 要求某个区间内的数字两两相乘的总和想到前缀和&#xff0c;但是这题重点在于两两相乘先硬算&#xff0c;找找规律&#xff1a; 比如要算这串数字的两两相乘的积之和&#xff1a; 1, 2, 3 1*2 1*3 2*3 1*(23) 2*3 前缀和数组&#xff1a; 1 3 6 发现…