当kafka消费的数据滞后1000条时,打印告警信息

server/2024/11/17 18:03:31/

要在 Kafka
消费者中实现当数据滞后1000条时打印告警信息,你需要在消费循环中添加逻辑来检查当前消费者的偏移量与主题中的最新偏移量之间的差异。如果这个差异大于1000,就打印告警信息。以下是修改后的代码示例:

package com.mita.web.core.config.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** @author sunpeiyang* @date 2024/11/12 14:54*/
public class KafkaConsumerDemo {public static void main(String[] args) {int numConsumers = 5; // 增加消费者的数量for (int i = 0; i < numConsumers; i++) {new Thread(new KafkaConsumerThread()).start();}}static class KafkaConsumerThread implements Runnable {private static final int ALERT_THRESHOLD = 1000; // 设置告警阈值@Overridepublic void run() {// 配置消费者属性Properties props = new Properties();props.put("bootstrap.servers", "4.15.18.14:9092");props.put("group.id", "test-group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "5000"); // 增加自动提交偏移量的间隔props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 调整消费者配置props.put("fetch.min.bytes", "1"); // 减少最小获取字节数props.put("fetch.max.wait.ms", "100"); // 减少最大等待时间props.put("max.poll.records", "500"); // 增加一次拉取的最大记录数// 创建消费者实例KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {processRecords(records); // 异步处理消息checkLag(ALERT_THRESHOLD, consumer, "test-topic"); // 检查滞后并告警consumer.commitAsync(); // 异步提交偏移量}}}private void processRecords(ConsumerRecords<String, String> records) {// 异步处理消息的逻辑for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());// 这里可以添加消息处理逻辑,例如使用线程池并行处理}}private void checkLag(int threshold, KafkaConsumer<String, String> consumer, String topic) {for (TopicPartition partition : consumer.assignment()) {long currentOffset = consumer.position(partition);long endOffset = consumer.endOffsets(Collections.singleton(partition)).values().iterator().next();long lag = endOffset - currentOffset;if (lag > threshold) {System.out.printf("Alert: Consumer lag for partition %s is %d, which exceeds the threshold of %d%n", partition, lag, threshold);}}}}
}

这里你可以发送钉钉消息等告警信息

在这里插入图片描述
在这里插入图片描述

其实我的积压很多,哈哈

在这里插入图片描述
积压的数据还有400多万,怎么快速的处理积压数据,当前代码也有做处理哈


http://www.ppmy.cn/server/142701.html

相关文章

第二十一周机器学习笔记:动手深度学习之——数据操作、数据预处理

第二十周周报 摘要Abstract一、动手深度学习1. 数据操作1.1 数据基本操作1.2 数据运算1.2.1 广播机制 1.3 索引和切片 2. 数据预处理 二、复习RNN与LSTM1. Recurrent Neural Network&#xff08;RNN&#xff0c;循环神经网络&#xff09;1.1 词汇vector的编码方式1.2 RNN的变形…

常见git命令记录

记录一些常见的git操作 下载代码 下载 git clone [代码连接] 切分支 git branch -b [分支名] 提交代码 添加 git add [需要提交的代码路径] 提交 git commit -m "一些骚话" push git push origin HEAD:refs/for/[仓名称] 通过diff文件&#xff0c;同步修…

PCL 点云拟合 最小二乘法拟合平面

目录 一、概述 1.1原理 1.2实现步骤 1.3应用场景 二、代码实现 2.1关键函数 2.1.1计算点云质心 2.1.2最小二乘法拟合平面 2.1.3可视化函数 2.2完整代码 三、实现效果 PCL点云算法汇总及实战案例汇总的目录地址链接: PCL点云算法与项目实战案例汇总(长期更新) 一…

Dubbo 使用轻量的 Java SDK 开发 RPC Server 和 Client

本示例演示如何使用轻量 Dubbo SDK 开发 RPC Server 与 Client&#xff0c;示例使用 Java Interface 方式定义、发布和访问 RPC 服务&#xff0c;底层使用 Triple 协议通信。本示例完整代码请参见 dubbo-samples。 基于 Dubbo 定义的 Triple 协议&#xff0c;你可以轻松编写浏…

mindtorch study

安装 pip install mindtorch mindtorch 用于帮助迁移torch模型到mindspore 大部分都可以直接把mindtorch的torch搞成torch&#xff0c;就和以前的代码一致&#xff0c;注意下面 只有静态图有点点差异 step也有差异 自定义优化器就麻烦了。 pyttorch还是牛啊 并行计算还是用的…

Python →爬虫实践

爬取研究中心的书目 现在&#xff0c;想要把如下网站中的书目信息爬取出来。 案例一 耶鲁 Publications | Yale Law School 分析网页&#xff0c;如下图所示&#xff0c;需要爬取的页面&#xff0c;标签信息是“<p>”&#xff0c;所以用 itemssoup.find_all("p&…

String类型

String类 在Java中&#xff0c;String 类是一个非常核心且常用的类&#xff0c;它用于表示文本值&#xff0c;即字符序列或者说字符串。 1.1 类的声明 public final class Stringimplements java.io.Serializable, Comparable<String>, CharSequence 解释&#xff1a…

llamaindex实战-Agent-在Agent中和数据库对话(本地部署)

概述 本文实现了一个简单的智能Agent&#xff0c;该Agent先从数据库中查询出数据&#xff0c;然后再通过工具函数来对数据进行处理。这是一个非常常见的场景。从这个场景可以扩展到多个实际的场景。 同样&#xff0c;本文的实验都是在本地一台&#xff1a;16C32G的linux机器(…