Kafka消费者如何优雅下线

embedded/2025/1/15 6:51:00/

一、背景

我们在Kafka消费程序中,可能会调用dubbo接口,也可能会使用线程池,连接池等,但是在服务下线的时候,kafka的消费总是会报错。比如dubbo接口就会抛出异常RpcException: The channel is closed. 这说明kafka还在消费,但是dubbo rpc接口已经销毁了。

com.alibaba.dubbo.rpc.RpcException: Failed to invoke the method ****************cause: The channel com.alibaba.dubbo.remoting.transport.netty4.NettyClient [10.69.14.13:0 -> /10.21.105.4:8898] is closed!at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:111) ~[dubbo-2.6.18.jar!/:2.6.18]at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:244) ~[dubbo-2.6.18.jar!/:2.6.18]at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:75) ~[dubbo-2.6.18.jar!/:2.6.18]at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke$original$WNyopBb1(InvokerInvocationHandler.java:52) ~[dubbo-2.6.18.jar!/:2.6.18]at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke$original$WNyopBb1$accessor$b7jdPeHU(InvokerInvocationHandler.java) ~[dubbo-2.6.18.jar!/:2.6.18]at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler$auxiliary$dhLDgYzF.call(Unknown Source) ~[dubbo-2.6.18.jar!/:2.6.18]at cn.techwolf.observer.instrument.interceptor.enhance.InstMethodsBeforeInter.intercept(InstMethodsBeforeInter.java:73) ~[twl-observer-agent.jar:1.9.1-SNAPSHOT]at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java) ~[dubbo-2.6.18.jar!/:2.6.18]at com.alibaba.dubbo.common.bytecode.proxy0.getTags(proxy0.java) ~[dubbo-2.6.18.jar!/:2.6.18]at com.zhipin.zeus.engine.data.proxy.AbacusBizTagProxy.batchQueryTagValue(AbacusBizTagProxy.java:93) ~[classes!/:0.0.1-SNAPSHOT]at com.zhipin.zeus.engine.data.observer.bg.F1GrcdListBossObserver.handle(F1GrcdListBossObserver.java:94) ~[classes!/:0.0.1-SNAPSHOT]at com.zhipin.zeus.engine.data.consumer.BgActionConsumer.lambda$processBgAction$0(BgActionConsumer.java:97) ~[classes!/:0.0.1-SNAPSHOT]at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_342]

二、原因分析

我们直观的想一下,kakfa消费时调用了dubbo接口,那么项目启动时,dubbo接口bean应该早于kafka消费者加载;销毁时,顺序反过来,先销毁kafka消费者,再销毁dubbo接口bean才合理,为什么下线时,会出现kafka还在消费,dubbo bean却先一步销毁的情况呢?
dubbo关闭的钩子函数为DubboShutdownHook,项目启动时,com.alibaba.dubbo.config.AbstractConfig会将DubboShutdownHook注册到JVM,同时也会基于com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory.ShutdownHookListener将其注册到spring中去,而dubbo的钩子函数默认是早于spring的钩子函数触发的,都由JVM控制,两者是同级的。
那么将dubbo的钩子函数完全交于spring管理,是否能解决这个问题?
基于spring-context:5.3.31进行源码分析,可以看到org.springframework.context.support.AbstractApplicationContext#doClose方法中先发布了ContextClosedEvent事件,然后再关闭Kafka消费者,而ContextClosedEvent发布时,就会触发dubbo接口bean的关闭,导致出现dubbo接口先关闭而kafka消费者还在消费的情况。
在这里插入图片描述
com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory.ShutdownHookListener中可以看到监听到ContextClosedEvent事件则会执行dubbo的关闭。
在这里插入图片描述

三、解决方案

首先第一步,将dubbo的钩子函数从JVM中移除,由spring管理

DubboShutdownHook dubboShutdownHook = DubboShutdownHook.getDubboShutdownHook();
Runtime.getRuntime().removeShutdownHook(dubboShutdownHook);

第二步,我们可以基于SmartApplicationListener控制kafka消费者销毁早于dubbo接口,只要其getOrder()方法返回值小于Integer.MAX_VALUE(默认值)即可

import com.zhipin.zeus.engine.data.config.RocketMQConsumerConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.util.StopWatch;/*** @Author: Sun Zhenghao* @Date: 2025/1/9*/
@Configuration
@Slf4j
public class GracefulShutDownListener implements SmartApplicationListener {/*** 对于@KafkaListenr注解实现的方法会自动注册到KafkaListenerEndpointRegistry中*/@Autowiredprivate KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;/*** 这里自己维护了所有的RocketMQ的consumer*/@Autowiredprivate RocketMQConsumerConfig rocketMQConsumerConfig;/*** 设置Listener被什么事件触发*/@Overridepublic boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {return eventType == ContextClosedEvent.class;}@Overridepublic boolean supportsSourceType(Class<?> sourceType) {return SmartApplicationListener.super.supportsSourceType(sourceType);}/*** 设置优先级,这里只说作为shutdownhook存在的Listener* return的数字越小,优先级越高,越靠前执行* @return*/@Overridepublic int getOrder() {return LOWEST_PRECEDENCE - 100;}@Overridepublic String getListenerId() {return SmartApplicationListener.super.getListenerId();}/*** 实现真正优雅下线的逻辑* @param event the event to respond to*/@Overridepublic void onApplicationEvent(ApplicationEvent event) {if (event instanceof ContextClosedEvent) {// 将全部的kafka consumer关闭log.info("kafka consumer size: {}", kafkaListenerEndpointRegistry.getListenerContainers().size());kafkaListenerEndpointRegistry.getListenerContainers().forEach(messageListenerContainer -> {log.info("kafka stop consume, groupId: {}", messageListenerContainer.getGroupId());messageListenerContainer.stop();});// 同样的rocketMQ也可以优雅关闭log.info("rocketMQ consumer size: {}", rocketMQConsumerConfig.getTopicConsumerMap().size());rocketMQConsumerConfig.getTopicConsumerMap().forEach((topic, consumer) -> {StopWatch stopWatch = new StopWatch();stopWatch.start();// 尝试关闭,会等待awaitTerminationMillisWhenShutdown msconsumer.shutdown();stopWatch.stop();log.info("DefaultMQPushConsumer shutdown success, topic: {}, cost: {}ms", topic, stopWatch.getTotalTimeMillis());});}}
}

四、总结

本文主要以kafka消费者和dubbo接口为例,给出了优雅下线的解决方案,实际上也适用于线程池、连接池、rocketMQ消费者等的优雅下线。


http://www.ppmy.cn/embedded/154047.html

相关文章

django基于Python的校园个人闲置物品换购平台

Django 基于 Python 的校园个人闲置物品换购平台 一、平台概述 Django 基于 Python 的校园个人闲置物品换购平台是专为校园师生打造的一个便捷、环保且充满活力的线上交易场所。它借助 Django 这一强大的 Python Web 开发框架&#xff0c;整合了校园内丰富的闲置物品资源&…

WEB攻防-通用漏洞_XSS跨站_MXSS_UXSS_FlashXSS_PDFXSS

目录 MXSS攻击 UXSS攻击 FlashXSS PDFXSS MXSS攻击 MXSS&#xff0c;全称“Mutation XSS”&#xff0c;MXSS攻击是一种特殊的XSS攻击类型&#xff0c;简单来说&#xff0c;就是XSS攻击的一种特殊形式&#xff0c;它通过利用网页内容的动态变化或特定条件触发&#xff0c;使…

Golang 并发之 Goroutine

Goroutine 是 Go 编程语言中的一个重要概念。它是 Go 语言实现并发的基础,可以简单地理解为 Go 语言中的轻量级线程。 具体来说,Goroutine 有以下特点: 1.轻量级: Goroutine 的创建和切换都非常快速,只需要几微秒。这与操作系统级别的线程相比要快得多。 2.并发性: Gorout…

工业视觉2-相机选型

工业视觉2-相机选型 一、按芯片类型二、按传感器结构特征三、按扫描方式四、按分辨率大小五、按输出信号六、按输出色彩接口类型 这张图片对工业相机的分类方式进行了总结&#xff0c;具体如下&#xff1a; 一、按芯片类型 CCD相机&#xff1a;采用电荷耦合器件&#xff08;CC…

【深度学习】PyTorch:手写数字识别

在这个技术博客中,我们将一起探索如何使用PyTorch来实现一个手写数字识别系统。这个系统将基于经典的MNIST数据集,这是一个包含60,000个训练样本和10,000个测试样本的手写数字(0-9)数据库。通过这个项目,你将了解如何使用PyTorch进行深度学习模型的构建、训练和评估。 文…

问题记录-Linux 下.sh脚本中变量不识别-2025-1-14

源文件: CROSS_COMPILE=/opt/cross_chain/gcc-linaro-7.5.0-2019.12-x86_64_aarch64-linux-gnu/bin/aarch64-linux-gnu- CC = $(CROSS_COMPILE)g++ 运行 ./auto.sh: line 4: CROSS_COMPILE: command not found ./auto.sh: line 4: CC: command not found 分析: 在 sh 脚本中…

Linux 高级路由 —— 筑梦之路

Linux 高级路由详解 本文将基于您提供的 Linux 高级路由极简教程 文章&#xff0c;深入探讨 Linux 高级路由的概念、配置方法以及应用场景。 一、什么是 Linux 高级路由&#xff1f; Linux 高级路由是指利用 Linux 内核提供的强大网络功能&#xff0c;实现超越传统路由表和默…

鸿蒙面试 2025-01-11

ArkTs 和TS的关系&#xff1f; ArkTS&#xff08;方舟开发语言&#xff09;与 TypeScript&#xff08;TS&#xff09;存在紧密联系&#xff0c;同时也有显著区别&#xff1a; 联系 语法基础&#xff1a;ArkTS 在语法层面大量借鉴了 TypeScript &#xff0c;TypeScript 里诸如…