Kafka消费者如何优雅下线

news/2025/1/16 0:03:36/

一、背景

我们在Kafka消费程序中,可能会调用dubbo接口,也可能会使用线程池,连接池等,但是在服务下线的时候,kafka的消费总是会报错。比如dubbo接口就会抛出异常RpcException: The channel is closed. 这说明kafka还在消费,但是dubbo rpc接口已经销毁了。

com.alibaba.dubbo.rpc.RpcException: Failed to invoke the method ****************cause: The channel com.alibaba.dubbo.remoting.transport.netty4.NettyClient [10.69.14.13:0 -> /10.21.105.4:8898] is closed!at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:111) ~[dubbo-2.6.18.jar!/:2.6.18]at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:244) ~[dubbo-2.6.18.jar!/:2.6.18]at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:75) ~[dubbo-2.6.18.jar!/:2.6.18]at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke$original$WNyopBb1(InvokerInvocationHandler.java:52) ~[dubbo-2.6.18.jar!/:2.6.18]at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke$original$WNyopBb1$accessor$b7jdPeHU(InvokerInvocationHandler.java) ~[dubbo-2.6.18.jar!/:2.6.18]at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler$auxiliary$dhLDgYzF.call(Unknown Source) ~[dubbo-2.6.18.jar!/:2.6.18]at cn.techwolf.observer.instrument.interceptor.enhance.InstMethodsBeforeInter.intercept(InstMethodsBeforeInter.java:73) ~[twl-observer-agent.jar:1.9.1-SNAPSHOT]at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java) ~[dubbo-2.6.18.jar!/:2.6.18]at com.alibaba.dubbo.common.bytecode.proxy0.getTags(proxy0.java) ~[dubbo-2.6.18.jar!/:2.6.18]at com.zhipin.zeus.engine.data.proxy.AbacusBizTagProxy.batchQueryTagValue(AbacusBizTagProxy.java:93) ~[classes!/:0.0.1-SNAPSHOT]at com.zhipin.zeus.engine.data.observer.bg.F1GrcdListBossObserver.handle(F1GrcdListBossObserver.java:94) ~[classes!/:0.0.1-SNAPSHOT]at com.zhipin.zeus.engine.data.consumer.BgActionConsumer.lambda$processBgAction$0(BgActionConsumer.java:97) ~[classes!/:0.0.1-SNAPSHOT]at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_342]

二、原因分析

我们直观的想一下,kakfa消费时调用了dubbo接口,那么项目启动时,dubbo接口bean应该早于kafka消费者加载;销毁时,顺序反过来,先销毁kafka消费者,再销毁dubbo接口bean才合理,为什么下线时,会出现kafka还在消费,dubbo bean却先一步销毁的情况呢?
dubbo关闭的钩子函数为DubboShutdownHook,项目启动时,com.alibaba.dubbo.config.AbstractConfig会将DubboShutdownHook注册到JVM,同时也会基于com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory.ShutdownHookListener将其注册到spring中去,而dubbo的钩子函数默认是早于spring的钩子函数触发的,都由JVM控制,两者是同级的。
那么将dubbo的钩子函数完全交于spring管理,是否能解决这个问题?
基于spring-context:5.3.31进行源码分析,可以看到org.springframework.context.support.AbstractApplicationContext#doClose方法中先发布了ContextClosedEvent事件,然后再关闭Kafka消费者,而ContextClosedEvent发布时,就会触发dubbo接口bean的关闭,导致出现dubbo接口先关闭而kafka消费者还在消费的情况。
在这里插入图片描述
com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory.ShutdownHookListener中可以看到监听到ContextClosedEvent事件则会执行dubbo的关闭。
在这里插入图片描述

三、解决方案

首先第一步,将dubbo的钩子函数从JVM中移除,由spring管理

DubboShutdownHook dubboShutdownHook = DubboShutdownHook.getDubboShutdownHook();
Runtime.getRuntime().removeShutdownHook(dubboShutdownHook);

第二步,我们可以基于SmartApplicationListener控制kafka消费者销毁早于dubbo接口,只要其getOrder()方法返回值小于Integer.MAX_VALUE(默认值)即可

import com.zhipin.zeus.engine.data.config.RocketMQConsumerConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.util.StopWatch;/*** @Author: Sun Zhenghao* @Date: 2025/1/9*/
@Configuration
@Slf4j
public class GracefulShutDownListener implements SmartApplicationListener {/*** 对于@KafkaListenr注解实现的方法会自动注册到KafkaListenerEndpointRegistry中*/@Autowiredprivate KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;/*** 这里自己维护了所有的RocketMQ的consumer*/@Autowiredprivate RocketMQConsumerConfig rocketMQConsumerConfig;/*** 设置Listener被什么事件触发*/@Overridepublic boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {return eventType == ContextClosedEvent.class;}@Overridepublic boolean supportsSourceType(Class<?> sourceType) {return SmartApplicationListener.super.supportsSourceType(sourceType);}/*** 设置优先级,这里只说作为shutdownhook存在的Listener* return的数字越小,优先级越高,越靠前执行* @return*/@Overridepublic int getOrder() {return LOWEST_PRECEDENCE - 100;}@Overridepublic String getListenerId() {return SmartApplicationListener.super.getListenerId();}/*** 实现真正优雅下线的逻辑* @param event the event to respond to*/@Overridepublic void onApplicationEvent(ApplicationEvent event) {if (event instanceof ContextClosedEvent) {// 将全部的kafka consumer关闭log.info("kafka consumer size: {}", kafkaListenerEndpointRegistry.getListenerContainers().size());kafkaListenerEndpointRegistry.getListenerContainers().forEach(messageListenerContainer -> {log.info("kafka stop consume, groupId: {}", messageListenerContainer.getGroupId());messageListenerContainer.stop();});// 同样的rocketMQ也可以优雅关闭log.info("rocketMQ consumer size: {}", rocketMQConsumerConfig.getTopicConsumerMap().size());rocketMQConsumerConfig.getTopicConsumerMap().forEach((topic, consumer) -> {StopWatch stopWatch = new StopWatch();stopWatch.start();// 尝试关闭,会等待awaitTerminationMillisWhenShutdown msconsumer.shutdown();stopWatch.stop();log.info("DefaultMQPushConsumer shutdown success, topic: {}, cost: {}ms", topic, stopWatch.getTotalTimeMillis());});}}
}

四、总结

本文主要以kafka消费者和dubbo接口为例,给出了优雅下线的解决方案,实际上也适用于线程池、连接池、rocketMQ消费者等的优雅下线。


http://www.ppmy.cn/news/1563456.html

相关文章

uniapp火车票样式

<template><view class"train-ticket"><view class"header"><view class"header-left"><text class"logo">铁路</text><text class"ticket-type">电子客票</text></vie…

C#中颜色的秘密

颜色的秘密: 颜色Color是一个调色板, 所有颜色都是由透明度Alpha,红Red,绿Green,蓝Blue按不同比例调色混合而成,如果不考虑透明度Alpha,颜色共有256*256*25616777216种 ColorARGB A,R,G,B都为byte型[8位],因此可以用整体的32个整数[Int32]来表示一种颜色 Color 所属命名空…

Spring bean的生命周期和扩展

接AnnotationConfigApplicationContext流程看实例化的beanPostProcessor-CSDN博客&#xff0c;以具体实例看bean生命周期的一些执行阶段 bean生命周期流程 生命周期扩展处理说明实例化:createBeanInstance 构造方法&#xff0c; 如Autowired的构造方法注入依赖bean 如UserSer…

利用 NATIVE SQL 实现不区分供应商名字大小写进行模糊查询

公司有个需求 &#xff0c;当按用英文名字来进行查询时&#xff0c;可以实现不区分供应商名字大小写进行模糊查询。 例如&#xff1a;如果用户输入‘br’ 那么可以查出名字含有 ‘BR’、‘bR’、‘Br’ 、‘br’ 的供应商来。利用SAP 常规的 Open SQL 是实现不了的。 只能利用…

各种特种无人机快速发展,无人机反制技术面临挑战

随着科技的飞速发展&#xff0c;各种特种无人机在军事、民用等领域得到了广泛应用&#xff0c;其性能不断提升&#xff0c;应用场景也日益丰富。然而&#xff0c;无人机反制技术的发展确实面临一定的挑战&#xff0c;难以完全跟上无人机技术的快速发展步伐。以下是对这一问题的…

基于YOLOv8的高空无人机小目标检测系统(python+pyside6界面+系统源码+可训练的数据集+也完成的训练模型

目标检测系统【环境搭建过程】&#xff08;GPU版本&#xff09;-CSDN博客 摘要 本文提出了一种基于YOLOv8算法的高空无人机小目标检测系统&#xff0c;利用VisDrone数据集中的7765张图片&#xff08;6903张训练集&#xff0c;862张验证集&#xff09;进行模型训练&#xff0c;…

shell脚本(二)

1、需求&#xff1a;判断192.168.1.0/24网络中&#xff0c;当前在线的ip有哪些&#xff0c;并编写脚本打印出来。 #!/bin/bashNetwork"192.168.1"for i in {1..254}; doip"${Network}.${i}"if ping -c 1 -W 1 "$ip" > /dev/null 2>&…

认识机器学习中的结构风险最小化准则

上一篇文章我们学习了关于经验风险最小化准则&#xff0c;其核心思想是通过最小化训练数据上的损失函数来优化模型参数&#xff0c;从而提高模型在训练集上的表现。但是这也会导致一个问题&#xff0c;经验风险最小化原则很容易导致模型在训练集上错误率很低&#xff0c;但在未…