1、背景
日志是非常重要的信息资源。它们记录了应用程序的运行状态、错误和异常情况,帮助我们了解系统的健康状况以及发现潜在的问题。为了高效地管理和分析日志数据,许多组织采用了Elasticsearch、Logstash和Kibana(ELK)堆栈作为日志收集和分析的解决方案。
开发一个实时监控和告警脚本,专门用于监控ELK平台中的错误日志,并及时发送告警通知给相关人员。该系统将通过扫描Elasticsearch中的日志数据,筛选出等级为ERROR的错误日志,并根据预设的告警规则进行处理。
2、目的
使用Python从Elasticsearch中查询特定级别为ERROR的错误日志,并通过钉钉机器人实现告警聚合和发送,以提高错误日志的处理效率和及时响应能力。
为什么开发这个脚本?
因为目前我们这边没有监控日志的信息,出现问题不能及时发现 和预知
优势
1、消息进行聚合,每个项目的多条告警信息,汇总一条发送。突破钉钉机器人每分钟只能发送20条的限制
2、告警信息you太多的重复,进行去重处理,添加告警次数发送。防止被钉钉限流
3、原理
- 使用Python的Elasticsearch库连接到Elasticsearch集群。
- 构建Elasticsearch查询DSL(领域专用语言),过滤出级别为ERROR的日志记录。
- 执行查询并获取结果。
- 对查询结果进行聚合,统计每个项目的错误次数。
- 根据聚合结果,生成告警消息的Markdown格式内容。
- 使用钉钉机器人发送告警消息到指定的钉钉群。
4、流程
- 导入必要的Python库,包括
elasticsearch
和requests
。 - 创建Elasticsearch连接,指定Elasticsearch集群的主机和端口。
- 构建Elasticsearch查询DSL,设置查询条件为日志级别为ERROR。
- 执行查询,获取查询结果。
- 对查询结果进行处理,聚合每个项目的错误次数。
- 根据聚合结果生成告警消息的Markdown内容。
- 使用钉钉机器人API发送告警消息到指定的钉钉群。
5、实现代码
# -*- coding: utf-8 -*-
# @Time : 2023/6/17 18:11
# @Author : 南宫乘风
# @Email : 1794748404@qq.com
# @File : all_es.py
# @Software: PyCharm
from collections import Counter
from datetime import datetime, timedeltaimport requests
from elasticsearch import Elasticsearchfrom monitor.es_ding import send_pretty_message# Elasticsearch客户端实例
es = Elasticsearch(hosts=['http://172.18.xxx.xxxx:9200'], http_auth=('elastic', 'xxxxx'),sniff_on_start=True, # 连接前测试sniff_on_connection_fail=True, # 节点无响应时刷新节点sniff_timeout=300, # 设置超时时间headers={'Content-Type': 'application/json'})def format_timestamp(timestamp):"""格式化时间为Elasticsearch接受的字符串格式"""return timestamp.strftime("%Y-%m-%d %H:%M:%S")def search_errors():"""执行查询,获取错误日志数据"""current_time = datetime.now()one_minute_ago = current_time - timedelta(minutes=10)current_time_str = format_timestamp(current_time)one_minute_ago_str = format_timestamp(one_minute_ago)index = 'app-prod-*' # 替换为实际的索引名称query = {"query": {"bool": {"filter": [{"range": {"@timestamp": {"gte": one_minute_ago_str,"lt": current_time_str,"format": "yyyy-MM-dd HH:mm:ss","time_zone": "+08:00"}}},{"match": {"loglevel": "ERROR" #匹配项目错误等级}},{"bool": {"must_not": [{"match": {"projectname": "fox-data-spiderman" # 需要屏蔽的项目}}]}}]}},"_source": [ ## 输出的字段"date","projectname","threadname","msg"],"from": 0,"size": 10000, # 返回查询的条数}result = es.search(index=index, body=query)total_documents = result["hits"]["total"]["value"]print(f"总共匹配到 {total_documents} 条文档")result = result['hits']['hits']all_result = []for i in result:all_result.append(i['_source'])msg_counter = Counter(d['msg'] for d in all_result if 'msg' in d)results = []for d in all_result:if 'msg' in d and d['msg'] in msg_counter:count = msg_counter[d['msg']]del msg_counter[d['msg']]d['count'] = countd['msg'] = d['msg'][:100] + ('...' if len(d['msg']) > 100 else '')results.append(d)return resultsdef aggregate_errors(results):"""按项目名称聚合错误日志"""aggregated_data = {}for d in results:projectname = d.get('projectname')if projectname:if projectname not in aggregated_data:aggregated_data[projectname] = []aggregated_data[projectname].append({'date': d.get('date'), 'msg': d.get('msg'), 'count': d.get('count')})return aggregated_datadef generate_summary(projectname, messages):"""生成Markdown格式的消息摘要"""markdown_text = f'### {projectname} \n\n'for message in messages:markdown_text += f"**时间:** {message['date']}\n\n"markdown_text += f"**告警次数:** <font color='red'><b>{message['count']}</b></font>\n\n"markdown_text += f"{message['msg']}\n\n---\n\n"return markdown_textdef send_message_summary(projectname, messages):"""发送摘要消息给钉钉机器人"""summary = generate_summary(projectname, messages)data = {'msgtype': 'markdown','markdown': {'title': f'{projectname}消息告警','text': summary}}webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxxxxxxxxx' # 替换为实际的Webhook URLresponse = requests.post(webhook_url, json=data)if response.status_code == 200:print('消息发送成功')else:print('消息发送失败')if __name__ == '__main__':errors = search_errors()aggregated_errors = aggregate_errors(errors)for projectname, messages in aggregated_errors.items():print(f"{projectname}:")print(messages)
6、Crontab添加定时任务
也可以用采用:Jenkins与GitLab的定时任务工作流程
https://blog.csdn.net/heian_99/article/details/131164591?spm=1001.2014.3001.5501
#日志
*/2 * * * * cd /python_app/elasticsearch; /opt/anaconda3/envs/py38/bin/python -u es_monitor.py >> es_error_info.log 2>&1
该定时任务的含义是每隔2分钟执行一次指定目录下的 es_monitor.py 脚本,并将输出信息追加到 es_error_info.log 文件中。这样可以定期监控 Elasticsearch 的错误日志,并记录相关信息以便后续查看和分析。
7、总结
本博客,为我们构建了一个完整的应用日志监控和告警系统,通过ELK技术栈和钉钉机器人的结合,使得我们能够及时发现和处理应用中的错误,提高了团队的工作效率和系统的稳定性。