提高错误日志处理效率!使用Python和钉钉机器人实现自动告警聚合

news/2024/11/17 18:42:15/

1、背景

日志是非常重要的信息资源。它们记录了应用程序的运行状态、错误和异常情况,帮助我们了解系统的健康状况以及发现潜在的问题。为了高效地管理和分析日志数据,许多组织采用了Elasticsearch、Logstash和Kibana(ELK)堆栈作为日志收集和分析的解决方案。

开发一个实时监控和告警脚本,专门用于监控ELK平台中的错误日志,并及时发送告警通知给相关人员。该系统将通过扫描Elasticsearch中的日志数据,筛选出等级为ERROR的错误日志,并根据预设的告警规则进行处理。

2、目的

使用Python从Elasticsearch中查询特定级别为ERROR的错误日志,并通过钉钉机器人实现告警聚合和发送,以提高错误日志的处理效率和及时响应能力。

为什么开发这个脚本?
因为目前我们这边没有监控日志的信息,出现问题不能及时发现 和预知
优势
1、消息进行聚合,每个项目的多条告警信息,汇总一条发送。突破钉钉机器人每分钟只能发送20条的限制
2、告警信息you太多的重复,进行去重处理,添加告警次数发送。防止被钉钉限流
在这里插入图片描述

3、原理

  1. 使用Python的Elasticsearch库连接到Elasticsearch集群。
  2. 构建Elasticsearch查询DSL(领域专用语言),过滤出级别为ERROR的日志记录。
  3. 执行查询并获取结果。
  4. 对查询结果进行聚合,统计每个项目的错误次数。
  5. 根据聚合结果,生成告警消息的Markdown格式内容。
  6. 使用钉钉机器人发送告警消息到指定的钉钉群。

4、流程

  1. 导入必要的Python库,包括elasticsearchrequests
  2. 创建Elasticsearch连接,指定Elasticsearch集群的主机和端口。
  3. 构建Elasticsearch查询DSL,设置查询条件为日志级别为ERROR。
  4. 执行查询,获取查询结果。
  5. 对查询结果进行处理,聚合每个项目的错误次数。
  6. 根据聚合结果生成告警消息的Markdown内容。
  7. 使用钉钉机器人API发送告警消息到指定的钉钉群。

5、实现代码

在这里插入图片描述


# -*- coding: utf-8 -*-
# @Time    : 2023/6/17 18:11
# @Author  : 南宫乘风
# @Email   : 1794748404@qq.com
# @File    : all_es.py
# @Software: PyCharm
from collections import Counter
from datetime import datetime, timedeltaimport requests
from elasticsearch import Elasticsearchfrom monitor.es_ding import send_pretty_message# Elasticsearch客户端实例
es = Elasticsearch(hosts=['http://172.18.xxx.xxxx:9200'], http_auth=('elastic', 'xxxxx'),sniff_on_start=True,  # 连接前测试sniff_on_connection_fail=True,  # 节点无响应时刷新节点sniff_timeout=300,  # 设置超时时间headers={'Content-Type': 'application/json'})def format_timestamp(timestamp):"""格式化时间为Elasticsearch接受的字符串格式"""return timestamp.strftime("%Y-%m-%d %H:%M:%S")def search_errors():"""执行查询,获取错误日志数据"""current_time = datetime.now()one_minute_ago = current_time - timedelta(minutes=10)current_time_str = format_timestamp(current_time)one_minute_ago_str = format_timestamp(one_minute_ago)index = 'app-prod-*'  # 替换为实际的索引名称query = {"query": {"bool": {"filter": [{"range": {"@timestamp": {"gte": one_minute_ago_str,"lt": current_time_str,"format": "yyyy-MM-dd HH:mm:ss","time_zone": "+08:00"}}},{"match": {"loglevel": "ERROR" #匹配项目错误等级}},{"bool": {"must_not": [{"match": {"projectname": "fox-data-spiderman" # 需要屏蔽的项目}}]}}]}},"_source": [  ## 输出的字段"date","projectname","threadname","msg"],"from": 0,"size": 10000, # 返回查询的条数}result = es.search(index=index, body=query)total_documents = result["hits"]["total"]["value"]print(f"总共匹配到 {total_documents} 条文档")result = result['hits']['hits']all_result = []for i in result:all_result.append(i['_source'])msg_counter = Counter(d['msg'] for d in all_result if 'msg' in d)results = []for d in all_result:if 'msg' in d and d['msg'] in msg_counter:count = msg_counter[d['msg']]del msg_counter[d['msg']]d['count'] = countd['msg'] = d['msg'][:100] + ('...' if len(d['msg']) > 100 else '')results.append(d)return resultsdef aggregate_errors(results):"""按项目名称聚合错误日志"""aggregated_data = {}for d in results:projectname = d.get('projectname')if projectname:if projectname not in aggregated_data:aggregated_data[projectname] = []aggregated_data[projectname].append({'date': d.get('date'), 'msg': d.get('msg'), 'count': d.get('count')})return aggregated_datadef generate_summary(projectname, messages):"""生成Markdown格式的消息摘要"""markdown_text = f'### {projectname} \n\n'for message in messages:markdown_text += f"**时间:** {message['date']}\n\n"markdown_text += f"**告警次数:** <font color='red'><b>{message['count']}</b></font>\n\n"markdown_text += f"{message['msg']}\n\n---\n\n"return markdown_textdef send_message_summary(projectname, messages):"""发送摘要消息给钉钉机器人"""summary = generate_summary(projectname, messages)data = {'msgtype': 'markdown','markdown': {'title': f'{projectname}消息告警','text': summary}}webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxxxxxxxxx'  # 替换为实际的Webhook URLresponse = requests.post(webhook_url, json=data)if response.status_code == 200:print('消息发送成功')else:print('消息发送失败')if __name__ == '__main__':errors = search_errors()aggregated_errors = aggregate_errors(errors)for projectname, messages in aggregated_errors.items():print(f"{projectname}:")print(messages)

在这里插入图片描述

6、Crontab添加定时任务

也可以用采用:Jenkins与GitLab的定时任务工作流程
https://blog.csdn.net/heian_99/article/details/131164591?spm=1001.2014.3001.5501

#日志
*/2 * * * * cd /python_app/elasticsearch; /opt/anaconda3/envs/py38/bin/python -u  es_monitor.py >> es_error_info.log 2>&1

该定时任务的含义是每隔2分钟执行一次指定目录下的 es_monitor.py 脚本,并将输出信息追加到 es_error_info.log 文件中。这样可以定期监控 Elasticsearch 的错误日志,并记录相关信息以便后续查看和分析。

7、总结

本博客,为我们构建了一个完整的应用日志监控和告警系统,通过ELK技术栈和钉钉机器人的结合,使得我们能够及时发现和处理应用中的错误,提高了团队的工作效率和系统的稳定性。


http://www.ppmy.cn/news/438187.html

相关文章

大疆哪吒飞控naza-m等无法解锁的问题遥控无法启动电机不转解决疑难杂症。

大疆的飞控都是闭源飞控。操作简单&#xff0c;稳定性好。通常情况下&#xff0c;不会出现无法解锁的情况。假如出现了这个情况&#xff0c;可以证明可能你设计了你的飞行器只是用了他的飞控而已。但是下一次就无法解锁。当然&#xff0c;遥控器。以及其他的硬件&#xff0c;包…

第7讲2 --- 求解相机的运动

上一篇博客中学习了特征提取和匹配的概念&#xff0c;并且调用OpenCV库实现了ORB特征的提取和匹配。 找到了匹配点后&#xff0c;我们希望能够根据匹配的 点对 来估计相机的运动。一共有三种方法可以用来估计相机的运动&#xff0c;分别为对极几何约束、PnP、ICP。这三种方法…

大疆3508、2006的输出轴的角度获取

因为大疆3508、2006电调的报文只有转子的角度&#xff0c;在使用过程中可能需要用的到输出轴的角度&#xff0c;比如用PID控制电机的输出轴角度。 把转子的角度累积到输出轴上 算法思路如下&#xff1a; 1、我们知道转子每周的角度随着方向不一样有两种情况&#xff0c;一种累…

大疆 DJI SDK 开发介绍

大疆 DJI SDK 开发介绍 转自&#xff1a;http://blog.sina.com.cn/s/blog_6266a8840102xn4x.html 大疆SDK开发分为三种&#xff1a;Mobile SDK&#xff0c;Onboard SDK&#xff0c;Guidance SDK。 1.Mobile SDK以大疆遥控器作为通信设备&#xff0c;以高数据速率传输所有数据…

在DJI大疆做算法的日常工作与体验~

大家好&#xff0c;我是对白。 今天给大家分享一位朋友在大疆做算法工程师的日常&#xff0c;工作内容还是比较有趣的&#xff0c;也给我们科普了大疆的一些福利&#xff0c;以下为原文。 作者&#xff1a;全之 | 编辑&#xff1a;对白的算法屋 https://zhuanlan.zhihu.com/p/…

机器人运动学【2】

目录 1.刚体状态的表达2.顺向运动学及DH表3.逆向运动学 1.刚体状态的表达 我们前面已经学习了刚体移动和转动的表达&#xff0c;那么怎么将两者在数学上结合呢&#xff1f;这里我们开始构造如下矩阵&#xff0c;记作: 下面我们来看一下只有移动情况下的刚体的描述&#xff1a;…

JVM知识点整理

JVM 回收哪个区域&#xff1f;关联面试题&#xff1a;fullgc会回收方法区&#xff08;元空间&#xff09;吗? 怎么判断对象可以被回收了关联面试题&#xff1a;哪些对象可以作为 GC Root &#xff08;两栈两方法&#xff09; JVM GC什么时候执行&#xff1f;分代回收机制思考&…

大疆无人机支持移动开发二次开发的设备支持

Mobile SDK&#xff1a;旨在让开发者能够访问DJI无人机和手持相机产品的丰富功能。该SDK通过兼顾更底层的功能&#xff0c;诸如飞行稳定&#xff0c;电池管理&#xff0c;信号传输和通信等&#xff0c;简化了应用程序开发的过程。连接框图及设备支持如下图&#xff1a; 参考htt…