关于otter监控告警使用

server/2024/12/3 0:19:50/

一、背景       

        近期在使用otter完成单机房单向同步时,常常遇到channel假死的情况,导致Pipeline同步停止,系统表数据同步停止,影响生产环境用户数据查询相关的功能,虽然事后能够通过停channel后再启用channel重新启用同步任务,恢复需要同步的数据,但时常出现该问题而不能及时发现让人头疼不已。通过查询相关的资料有发现早有同学遇到过类似的问题,初步认为是otter的调度算法导致死锁导致任务停止,但目前仍然没有石锤,且otter开源团队目前未对该问题进行官方解答。具体问题描述可参考:https://github.com/alibaba/otter/issues/911

       为了解决该问题,曾尝试在线下复现该问题,但是以失败告终,然后就换了一个思路:是否能及时发现同步停止的问题呢?按照这个思路我看到了官方其实目前支持五种同步的告警:延迟、Pipeline延迟、Process延迟、Position延迟、异常监控告警,同时还提供了告警自我恢复机制;通过使用测试其中的Pipeline延迟机制并开启自我恢复机制,发现确实能够及时的完成告警,而且在触发自我恢复阀的情况,系统能够自动完成channel的stop,然后自动再start,channel的同步任务恢复正常,现就针对otter监控告警机制进行说明。

二、otter监控机制解析:

       首先得看一下otter监控机制的流程:1.首先我们通过otter的控制台为channel下的Pipeline配置监控。2.otter-manager在启动时会启用一个单线程的定时任务线程池,定时任务每120秒执行一次。2.1 该线程池任务在执行时会查询当前所有启用的监控记录;2.2 然后对监控规则按照Pipeline进行分组;2.3 遍历分组的后的具体规则。2.4 查询Pipeline信息,然后根据Pipeline中的channelId查询zookeeper上对应channel的状态,若channel的状态节点为null或者未停止状态则不执行后监控逻辑。2.5 根据不同的监控类型执行具体的判断逻辑;若当前的统计的数据满足监控告警的条件则执行告警逻辑,若开启了自我恢复机制则尝试恢复channel任务同步。源码分析如下:

#一、SelfMonitor中的start()开启监控
private synchronized void start() {if (executor == null) {// 创建定时任务线程池,单线程executor = new ScheduledThreadPoolExecutor(DEFAULT_POOL, new NamedThreadFactory("Self-Monitor"),new ThreadPoolExecutor.CallerRunsPolicy());}if (future == null) {// 每120秒执行一次future = executor.scheduleWithFixedDelay(new Runnable() {public void run() {try {// 调用GlobalMonitor#explore()monitor.explore();} catch (Exception e) {log.error("self-monitor failed.", e);}}}, interval, interval, TimeUnit.SECONDS);}}#二、GlobalMonitor#explore() 获取监控列表,默认是并行执行监控任务
public void explore() {// 查询了所有启用的监控列表通知根据Pipeline进行分组Map<Long, List<AlarmRule>> rules = alarmRuleService.getAlarmRules(AlarmRuleStatus.ENABLE);if (!CollectionUtils.isEmpty(rules)) {if (needConcurrent) {concurrentProcess(rules);} else {// 串行serialProcess(rules);}} else {log.warn("no enabled alarm rule at all. Check the rule setting please!");}// 自动恢复机制if (recoveryPaused) {List<Long> channelIds = channelService.listAllChannelId();if (needConcurrent) {concurrentProcess(channelIds);} else {// 串行serialProcess(channelIds);}}}#三、GlobalMonitor#concurrentProcess()将每个Pipeline的监控列表提交给线程池去执行
private void concurrentProcess(Map<Long, List<AlarmRule>> rules) {ExecutorCompletionService completionExecutor = new ExecutorCompletionService(executor);List<Future> futures = new ArrayList<Future>();for (Entry<Long, List<AlarmRule>> entry : rules.entrySet()) {final List<AlarmRule> alarmRules = entry.getValue();futures.add(completionExecutor.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {pipelineMonitor.explore(alarmRules);return null;}}));}List<Throwable> exceptions = new ArrayList<Throwable>();int index = 0;int size = futures.size();while (index < size) {try {Future<?> future = completionExecutor.take();future.get();} catch (InterruptedException e) {exceptions.add(e);} catch (ExecutionException e) {exceptions.add(e);}index++;}if (!exceptions.isEmpty()) {StringBuilder sb = new StringBuilder(exceptions.size() + " exception happens in global monitor\n");sb.append("exception stack start :\n");for (Throwable t : exceptions) {sb.append(ExceptionUtils.getStackTrace(t));}sb.append("exception stack end \n");throw new IllegalStateException(sb.toString());}}#四、PipelineMonitor#explore(List<AlarmRule> rules)完成不同类型的监控分发执行
public void explore(List<AlarmRule> rules) {Long pipelineId = rules.get(0).getPipelineId();Pipeline pipeline = pipelineService.findById(pipelineId);// 如果处于stop状态,则忽略报警ChannelStatus status = arbitrateManageService.channelEvent().status(pipeline.getChannelId());if (status == null || status.isStop()) {return;}List<AlarmRule> delayTimeRules = new LinkedList<AlarmRule>();List<AlarmRule> exceptonRules = new LinkedList<AlarmRule>();List<AlarmRule> pipelineTimeoutRules = new LinkedList<AlarmRule>();List<AlarmRule> processTimeoutRules = new LinkedList<AlarmRule>();List<AlarmRule> positionTimeoutRules = new LinkedList<AlarmRule>();Date now = new Date();for (AlarmRule rule : rules) {switch (rule.getMonitorName()) {case DELAYTIME:if (checkEnable(rule, now)) {delayTimeRules.add(rule);}break;case EXCEPTION:if (checkEnable(rule, now)) {exceptonRules.add(rule);}break;case PIPELINETIMEOUT:if (checkEnable(rule, now)) {pipelineTimeoutRules.add(rule);}break;case PROCESSTIMEOUT:if (checkEnable(rule, now)) {processTimeoutRules.add(rule);}break;case POSITIONTIMEOUT:if (checkEnable(rule, now)) {positionTimeoutRules.add(rule);}break;default:break;}}if (!delayTimeRules.isEmpty()) {delayStatRuleMonitor.explore(delayTimeRules);}if (!pipelineTimeoutRules.isEmpty()) {pipelineTimeoutRuleMonitor.explore(pipelineTimeoutRules);}if (!processTimeoutRules.isEmpty()) {processTimeoutRuleMonitor.explore(processTimeoutRules);}if (!positionTimeoutRules.isEmpty()) {positionTimeoutRuleMonitor.explore(positionTimeoutRules);}}#五、具体的监控实现类完成监控规则检查实施告警和自我恢复机制处理

       上面是otter监控告警类体系,每个监控实现都支持自我恢复机制,告警邮件发送,可配置单独的告警阀值,告警间隔等个性化参数。

       otter的node节点在完成数据同步同时,还会统计同步数据,监控告警机制目前主要是通过这些统计的数据进行的逻辑处理实现告警:

      1)Process延迟:zookeeper维护了节点/otter/channel/{0}/{1}/process信息,从该节点中获取延迟时间;

      2)Position延迟:binlog日志的位点信息是维护在zookeeper节点:

/otter/canal/destinations/%s/%s/cursor,通过监控该节点存储的位点信息可以及时发现Position是否延迟。

      3)Pipeline延迟:node节点会记录每个Pipeline同步数据记录耗时情况,通过监控Pipeline最近一次的同步的记录,判断是否存在Pipeline没有同步的情况,数据表为:throughput_stat。

      4)Exception监控:DeadNodeListener在监控到对应的node节点变化时,若是对应的节点出现删除变化,则会执行该Exception监控逻辑。

      5)延迟告警:数据来源于表:delay_stat,node节点会维护每一个Pipeline的同步延迟时间,通过监控该表可以及时发现同步延迟情况。

三、具体使用

3.1 选在需要配置告警的channel,点击对应Pipeline后面的监控

3.2 进入监控页面选择添加按钮(提供了一件添加-将五种类型的告警配置全部新增)

3.3 保存后一定要启用对应的监控

四、针对otter的监控的一些其他想法

       1.otter自身维护了同步相关的统计数据,并保存在数据库中,可以监控这些数据进行告警,otter本身也是基于这些数据进行的告警,可以根据自己的需要合理使用这些统计数据进行监控。

       2.otter在zookeeper上保存了许多的数据,其中关于binlog的位点信息、各node的节点在启动时就会在zookeeper上注册信息,可以通过监控zookeeper的上的这些节点信息进行自定义监控。


http://www.ppmy.cn/server/146877.html

相关文章

基于PHP的音乐网站的设计与实现

摘 要 本系统采用PHP编程语言和MySQL数据库技术搭载了Apache服务器&#xff0c;完成了基于PHP的音乐网站设计&#xff0c;通过此次毕 业论文的撰写我明白了对于论文的选题要精确&#xff0c;要明确&#xff0c;要有明确的见解&#xff0c;要有足够的论证和创意&#xff0c;必须…

Fastify装饰器:增强你的路由处理功能加入日志

Fastify以其出色的性能和扩展性脱颖而出。装饰器是Fastify提供的一个强大功能&#xff0c;它允许开发者在不修改核心代码的情况下&#xff0c;向请求&#xff08;Request&#xff09;和响应&#xff08;Response&#xff09;对象添加自定义属性和方法。本文将通过一个简单的示例…

linux桌面qt应用程序UI自动化实现之dogtail

1. 前言 Dogtail适用于Linux 系统上进行 GUI 自动化测试,利用 Accessibility 技术与桌面程序通信;Dogtail 包含一个名为 sniff 的组件,这是一个嗅探器,用于 GUI 程序追踪; 源码下载:​​dogtail PyPI 可通过sudo python setup.py install安装或sudo pip install dogt…

Network Link Conditioner Mac 上模拟网络环境工具的安装和使用

前言 Xcode 的模拟器本身是不支持模拟网络环境的&#xff0c;在开发界面的时候&#xff0c;设计会出无网、弱网这种情况的设计图&#xff0c;为了方便在开发过程中实现这些情况的代码逻辑&#xff0c;Network Link Conditioner 就是模拟网络环境的好帮手。 安装 Network Lin…

修改插槽样式,el-input 插槽 append 的样式

需缩少插槽 append 的 宽度 方法1、使用内联样式直接修改&#xff0c;指定 width 为 30px <el-input v-model"props.applyBasicInfo.outerApplyId" :disabled"props.operateCommandType input-modify"><template #append><el-button click…

Leetcode3232:判断是否可以赢得数字游戏

题目描述&#xff1a; 给你一个 正整数 数组 nums。 Alice 和 Bob 正在玩游戏。在游戏中&#xff0c;Alice 可以从 nums 中选择所有个位数 或 所有两位数&#xff0c;剩余的数字归 Bob 所有。如果 Alice 所选数字之和 严格大于 Bob 的数字之和&#xff0c;则 Alice 获胜。 如…

Ubuntu中的apt update 和 apt upgrade

apt update 和 apt upgrade 是 Debian 及其衍生发行版&#xff08;如 Ubuntu&#xff09;中常用的两个 APT 包管理命令&#xff0c;它们各自执行不同的任务&#xff1a; apt update: 这个命令用于更新本地软件包列表。当你运行 apt update 时&#xff0c;APT 会从配置的源&…

PyTorch:神经网络的基本骨架 nn.Module的使用

神经网络的基本骨架 nn.Module的使用 为了更全面地展示如何使用 nn.Module 构建一个适用于现代图像处理任务的卷积神经网络&#xff08;CNN&#xff09;&#xff0c;我们将设计一个针对手写数字识别&#xff08;如MNIST数据集&#xff09;的简单CNN模型。CNN非常适合处理图像数…