需求背景
告警分析处理流程
通常我们收到 Prometheus 告警事件通知后,往往都需要登录 Alertmanager 页面查看当前激活的告警,如果需要分析告警历史数据信息,还需要登录 Prometheus 页面的在 Alerts 中查询告警 promQL 表达式,然后复制到 Graph 中查询数据。
这样做无疑大大降低了故障分析和处理流程,此时有些聪明的小伙伴就在想,能不能在告警事件推送的时候,将这些历史数据绘制成图表一块推送过来,方便我们第一时间了解指标变化趋势,便于更快的处理问题。
功能需求分析
- 可靠性要求:无论告警图表数据是否能获取到,都不能影响告警信息的及时推送。(这是 webhook 最核心的功能,告警带图只是锦上添花,不可本末倒置,不能因为获取图表失败而导致告警事件无法正常推送。)
- 数据折线图显示:希望可以查询指定时间范围内数据变化趋势,便于分析排查问题,这是最核心的功能。
- 只显示异常 instance 数据:每个 job 都会对应很多 targets,我们只希望显示异常 instance 相关的数据,避免整个图表因数据过多导致显示混乱。
- 最大值最小值最近值显示:这三个值是我们最关注的信息,尤其是最近值,在告警触发时还是告警恢复后,我们都需要知道这个指标当前值是多少。
- 时间范围灵活:有些告警比较灵敏,for 时间会较短,通常不到 1 分钟(例如网络探针检测 http 状态码)。而有些告警比较迟钝,for 时间会设置的比较长,通常以为小时为单位(例如证书有效期监控)。而此时告警图表时间范围不应该统一设置为某个指定的值,而是根据 for 配置的时间灵活变化。
实现思路与效果
实现效果
废话不多说,先展示一下最终的告警效果。
grafana 渲染指标图:
方案对比分析
针对这个需求,目前主流的实现方案主要有以下两种。
方案 1:开发程序,查询 prometheus 指标数据,然后使用图表库渲染出图片,以 python 为例,可以使用 Matplotlib 或 Pandas 实现 。
优点:实现起来较为灵活,可以很好的满足各种功能需求。
缺点:图表样式单一,如果需要修改图表样式需要变更代码。
方案 2:使用 grafana-image-renderer插件渲染图片。
优点:图表样式美观,图表配置灵活方便。
缺点:需要指定 dashboard 和 panel 的 id 才能渲染,也就意味着每个告警规则都需要创建一个图表,工作量巨大。
实现思路
既然两种方案都各有优缺点,能否将两种方案的优点整合起来,既要配置实现起来灵活方便又要图表样式美观且易于配置呢?毕竟只有小孩子才做选择题,大人都是全都要。
具体实现思路如下:
- 收到 alertmanager 推送告警数据
- 根据告警数据,提取告警标题和异常的 instance 。
- 根据告警标题请求 prometheus 的 rules 接口 ,获取 for 和表达式配置。接口文档:https://prometheus.ac.cn/docs/prometheus/latest/querying/api/#rules
- 正则处理表达式,去除条件判断值,并新增 instance 标签选择,例如原始的告警规则表达式为,处理后的查询语句为
- 请求 grafana 的 api 接口,获取 grafana dashboard 配置。接口文档:https://grafana.com/docs/grafana/latest/developers/http_api/dashboard/
- 根据获取到的 dashboard 配置数据,替换表达式,时间范围, 告警标题。
- 再次 post 请求 grafana 的 api 接口,更新 dashboard 配置。
- 调用grafana-image-renderer插件,传入 dashboard 和 panels 参数,渲染图片并下载到本地。
- 将本地图片推送至企业微信、钉钉或 teams(也可上传至公有云对象存储,直接返回图片公网 url 地址)。
流程图如下:
核心代码与配置
grafana 配置
- 创建 api key 用于调用 api 接口请求 grafana。
- 创建 dashboard 和图表,并配置图表样式。
表达式、标题、时间范围可以随便写,主要是为了调试图表配置。
- 安装grafana-image-renderer 插件。
参考文档:https://grafana.com/grafana/plugins/grafana-image-renderer/。安装完成后点击渲染图像验证是否可以正常渲染。
webhook 程序
核心代码如下,需要注意的是为了确保可以正常获取图片和推送消息,建议添加重试和异常处理逻辑,增加程序可靠性。
import base64
import hashlib
import json
import re
import time
import os
import httpx
from log import logger
from config import vx_conf, grafana_conf, alertmanager_conf, prometheus_conf, aliyun_conf
from oss2 import Auth, Bucket, exceptionsclass VxRobot:"""企业微信推送"""def __init__(self, team, max_retries=3, backoff_factor=2):self.url = vx_conf[team]self.headers = {'Content-Type': 'application/json'}self.params = {}self.max_retries = max_retriesself.backoff_factor = backoff_factordef __send_data(self):"""推送企业微信数据:return:"""retries = 0with httpx.Client(verify=False) as client:while retries < self.max_retries:try:# 发送 POST 请求response = client.post(self.url, headers=self.headers, json=self.params, timeout=10)response.raise_for_status() # 如果状态码不是 2xx,会抛出异常if response.json()['errcode'] == 0:logger.info("企业微信告警推送完成")returnelse:logger.error(f"企业微信告警推送内容有误: {response.json()['errmsg']}")breakexcept (httpx.RequestError, httpx.HTTPStatusError) as exc:retries += 1logger.error(f"企业微信告警推送失败 (尝试 {retries}/{self.max_retries}): {exc}")if retries < self.max_retries:sleep_time = self.backoff_factor * retrieslogger.info(f"等待 {sleep_time} 秒后重试...")time.sleep(sleep_time) # 实现退避策略else:logger.error("达到最大重试次数,企业微信告警推送失败")raise Exception(f"无法使用企业微信告警推送: {exc}")def send_img(self, image_path):"""推送图片信息:param image_path:图片路径:return: None"""try:# 读取图片文件内容with open(image_path, "rb") as f:image_data = f.read()# 计算图片内容的 MD5 值md5_hash = hashlib.md5(image_data).hexdigest()# 生成图片内容的 Base64 编码base64_encoded = base64.b64encode(image_data).decode("utf-8")except FileNotFoundError:raise FileNotFoundError(f"图片文件 {image_path} 未找到")except Exception as e:raise RuntimeError(f"处理图片时发生错误: {str(e)}")params = {"msgtype": "image","image": {"base64": "DATA","md5": "MD5"}}# 数据替换params["image"]["base64"] = base64_encodedparams["image"]["md5"] = md5_hashlogger.info("推送企业微信图片内容,图片地址为%s" % image_path)self.params = paramsself.__send_data()def get_panel(alert_name, instance):"""获取指标图表信息:return:"""prometheus = PrometheusTools()config = prometheus.get_alert_config(alert_name)config['instance'] = instanceconfig['time_range'] = 'now-' + str(2 * config['duration']) + 's'if 2 * config['duration'] > 60:config['alert_name'] = config['alert_name'] + '(最近' + str(round(2 * config['duration'] / 60)) + '分钟)'else:config['alert_name'] = config['alert_name'] + '(最近' + str(round(2 * config['duration'])) + '秒)'grafana = GrafanaTools()dashboard_data = grafana.fetch_dashboard()dashboard_conf = grafana.update_panel_query(dashboard_data, config)grafana.update_dashboard(dashboard_conf)local_img_path = grafana.render_image(config['alert_name'])return local_img_path# aliyun_oss = AliyunTools()# oss_path = local_img_path.replace("img/", "alert_img/")# aliyun_oss.upload_image(local_img_path, oss_path)class PrometheusTools:def __init__(self, max_retries=3, backoff_factor=2):"""初始化 Prometheus 客户端"""self.url = prometheus_conf['url']self.username = prometheus_conf['username']self.password = prometheus_conf['password']self.max_retries = max_retriesself.backoff_factor = backoff_factor# 生成认证头auth_str = f"{self.username}:{self.password}"auth_bytes = base64.b64encode(auth_str.encode("utf-8")).decode("utf-8")self.headers = {"Authorization": f"Basic {auth_bytes}"}def get_alert_config(self, alert_name):"""获取告警配置:param alert_name: 告警名称:return: 告警配置"""url = f"{self.url}/api/v1/rules"params = {"rule_name[]": alert_name}retries = 0with httpx.Client(verify=False) as client:while retries < self.max_retries:try:# 发起请求response = client.get(url, params=params, headers=self.headers, timeout=10)response.raise_for_status() # 如果状态码不是 2xx,会抛出异常logger.info("成功获取 Prometheus 告警数据")logger.debug(json.dumps(response.json()))# 提取关键数据config = {'alert_name': response.json()['data']['groups'][0]['rules'][0]['name'],'query': response.json()['data']['groups'][0]['rules'][0]['query'],'duration': response.json()['data']['groups'][0]['rules'][0]['duration']}logger.debug(config)return configexcept (httpx.RequestError, httpx.HTTPStatusError) as exc:logger.error(f"Attempt {retries + 1} failed: {exc}")retries += 1if retries < self.max_retries:time.sleep(self.backoff_factor * retries) # 退避策略else:raise Exception(f"Failed to fetch alert config after {self.max_retries} retries.")class GrafanaTools:"""grafana渲染图表工具"""def __init__(self, max_retries=3, backoff_factor=2):self.grafana_url = grafana_conf['url']self.api_key = grafana_conf['api_key']self.dashboard_uid = grafana_conf['dashboard_id']self.panel_id = grafana_conf['panel_id']self.headers = {"Authorization": f"Bearer {self.api_key}","Content-Type": "application/json",}self.max_retries = max_retriesself.backoff_factor = backoff_factordef fetch_dashboard(self):"""获取仪表盘的完整配置:return: 仪表盘的完整配置"""url = f"{self.grafana_url}/api/dashboards/uid/{self.dashboard_uid}"logger.info("开始获取dashboard配置, url: %s" % url)retries = 0with httpx.Client(verify=False) as client:while retries < self.max_retries:try:# 发送请求response = client.get(url, headers=self.headers, timeout=10)response.raise_for_status() # 如果状态码不是 2xx,会抛出异常logger.info("成功获取dashboard配置")logger.debug(json.dumps(response.json(), indent=2)) # 打印详细配置return response.json()except (httpx.RequestError, httpx.HTTPStatusError) as exc:retries += 1logger.error(f"请求失败 (尝试 {retries}/{self.max_retries}): {exc}")if retries < self.max_retries:sleep_time = self.backoff_factor * retrieslogger.info(f"等待 {sleep_time} 秒后重试...")time.sleep(sleep_time) # 实现退避策略else:logger.error("达到最大重试次数,获取dashboard配置失败")raise Exception(f"无法获取仪表盘配置: {exc}")def update_panel_query(self, dashboard_data, config):"""修改grafana dashboard配置内容:param dashboard_data: dashboard配置:param config: 告警配置:return: 新的grafana dashboard配置"""# logger.error(config)for panel in dashboard_data["dashboard"]["panels"]:if panel["id"] == self.panel_id:# 修改面板查询,动态添加 instance 变量panel["title"] = config['alert_name']instance = config['instance']query = config['query']# 去除规则条件query_prom = re.sub(r'\s*(==|!=|>=|<=|>|<)\s*([0-9]+(?:\.[0-9]+)?)$', '', query)# 正则表达式,匹配 Prometheus 查询语句中的指标名称pattern = r'([a-z_][a-z0-9_]*)(?=\s*(\{|\[|$))'# 查找匹配的指标名称match = re.search(pattern, query_prom)# 如果找到了匹配项if match and instance != "none":# 获取指标名称的结束位置end_pos = match.end(1) # end(1) 获取捕获组1(指标名称)的结束位置# 判断原始指标是否存在标签选择if '{' in query_prom:query_cleaned = query_prom[:end_pos + 1] + 'instance="' + instance + '",' + query_prom[end_pos + 1:]else:query_cleaned = query_prom[:end_pos] + '{instance="' + instance + '"}' + query_prom[end_pos:]else:logger.error("指标匹配失败,使用默认指标%s" % query_prom)query_cleaned = query_prom# logger.error(query_cleaned)panel["targets"][0]["expr"] = query_cleanedbreaklogger.info("dashboard配置替换完成")logger.debug(json.dumps(dashboard_data))return dashboard_datadef update_dashboard(self, dashboard):"""更新仪表盘配置到 Grafana:param dashboard: 要更新的仪表盘配置 (dict):return: None"""url = f"{self.grafana_url}/api/dashboards/db"logger.info("开始更新Dashboard配置, url: %s" % url)retries = 0with httpx.Client(verify=False) as client:while retries < self.max_retries:try:# 发送 POST 请求response = client.post(url, headers=self.headers, json=dashboard, timeout=10)response.raise_for_status() # 如果状态码不是 2xx,会抛出异常logger.info("Dashboard更新完成")returnexcept (httpx.RequestError, httpx.HTTPStatusError) as exc:retries += 1logger.error(f"更新Dashboard失败 (尝试 {retries}/{self.max_retries}): {exc}")if retries < self.max_retries:sleep_time = self.backoff_factor * retrieslogger.info(f"等待 {sleep_time} 秒后重试...")time.sleep(sleep_time) # 实现退避策略else:logger.error("达到最大重试次数,Dashboard更新失败")raise Exception(f"无法更新仪表盘配置: {exc}")def render_image(self, alert_name):"""调用 Grafana 渲染图片 API:param alert_name: 告警标题:return: 图片保存路径"""output_path = "img/" + alert_name + ".png"render_url = (f"{self.grafana_url}/render/d-solo/{self.dashboard_uid}"f"?orgId=1&panelId={self.panel_id}&width=2000&height=1000")logger.info("开始渲染图片, render_url: %s" % render_url)retries = 0with httpx.Client(verify=False) as client:while retries < self.max_retries:try:# 发起请求response = client.get(render_url, headers=self.headers, timeout=20)response.raise_for_status() # 如果状态码不是 2xx,会抛出异常# 确保输出目录存在os.makedirs(os.path.dirname(output_path), exist_ok=True)with open(output_path, "wb") as f:f.write(response.content)logger.info(f"告警图片下载至 {output_path}")return output_pathexcept (httpx.RequestError, httpx.HTTPStatusError) as exc:retries += 1logger.error(f"渲染图片失败 (尝试 {retries}/{self.max_retries}): {exc}")if retries < self.max_retries:sleep_time = self.backoff_factor * retrieslogger.info(f"等待 {sleep_time} 秒后重试...")time.sleep(sleep_time) # 实现退避策略else:logger.error("达到最大重试次数,渲染图片失败")raise Exception(f"无法渲染图片: {exc}")class AliyunTools:def __init__(self, max_retries=3, backoff_factor=2):self.auth = Auth(aliyun_conf['access_key_id'], aliyun_conf['access_key_secret'])self.bucket = Bucket(self.auth, aliyun_conf['endpoint'], aliyun_conf['bucket_name'])self.bucket_name = aliyun_conf['bucket_name']self.endpoint = aliyun_conf['endpoint']self.max_retries = max_retriesself.backoff_factor = backoff_factordef upload_image(self, local_image_path, oss_object_name):"""上传图片到阿里云 OSS:param local_image_path: 本地图片路径:param oss_object_name: OSS 中的对象名 (路径/文件名):return: 图片的公开访问地址"""if not os.path.exists(local_image_path):raise FileNotFoundError(f"本地文件 {local_image_path} 不存在")retries = 0while retries < self.max_retries:try:logger.info(f"开始上传图片 {local_image_path} 到 OSS,目标对象名: {oss_object_name}")# 上传图片到 OSSself.bucket.put_object_from_file(oss_object_name, local_image_path)# 获取图片访问地址image_url = self.bucket.sign_url('GET', oss_object_name, 60 * 60 * 24)logger.info(f"图片成功上传至 OSS,访问地址: {image_url}")return image_urlexcept (exceptions.RequestError, exceptions.ServerError) as exc:retries += 1logger.error(f"上传图片到 OSS 失败 (尝试 {retries}/{self.max_retries}): {exc}")if retries < self.max_retries:sleep_time = self.backoff_factor * retrieslogger.info(f"等待 {sleep_time} 秒后重试...")time.sleep(sleep_time)else:logger.error("达到最大重试次数,上传图片失败")raise Exception(f"无法上传图片到 OSS: {exc}")
查看更多
崔亮的博客-专注devops自动化运维,传播优秀it运维技术文章。更多原创运维开发相关文章,欢迎访问https://www.cuiliangblog.cn