Xpack-Watchers基本语法与使用

news/2025/1/13 10:53:57/

一、Xpack-Watchs基本简介:

Elasticsearch 提供了数据的存储及快速的搜索,而其中的 Watcher 功能可以实时地根据一些条件来发送警报。与 Elasticsearch 的使用一样,可以通过 Resfulapi 调用创建、管理、更新预警任务。

  • 查看 watcher 插件是否正常运行,返回含有 "watcher_state": "started" 则表示正常运行
GET /_cluster/health

1. 首先看一个watcher的例子:

## 创建一个watcher,比如定义一个trigger 每隔10s查一下input里的数据
PUT _xpack/watcher/watch/school_watcher
{"trigger": {"schedule": {"interval": "10s"}},## 查看任务信息"input": {"search": {"request": {## 监控具体索引"indices": ["school*"],## body里面具体些搜索语句"body": {"size": 0,"query": {"match": {## 比如索引里面name 有 hello 则进行报警"name": "hello"}}}}}},## 对于上面的查询结果进行比较:"condition": {## compare进行比较"compare": {## 上面的query查询的结果会放入到ctx.payload中:## 比如获取 ctx.payload.hits.total  ctx.payload._shards.total 等等"ctx.payload.hits.total": {"gt": 0}}},## transform作用:重新查询出文档内容赋值给ctx.payload"transform": {"search": {"request": {"indices": ["school*"],"body": {"size": 10,"query": {"match": {"name": "hello"}}}}}},## 根据上面的查询、比较结果,执行actions里面定义的动作(定义多种报警类型)"actions": {## 报警名字"log_hello": {## 防止报警风暴: 设置阈值 15m内曾经报警过, 则不报警"throttle_period": "15m",## 报警方式:logging、mail、http等"logging": {## 报警具体内容:使用 {{ 查询参数 }} 进行赋值:"text": "Found {{ctx.payload.hits.total}} hello in the school"}}}
}

2. search搜索示例:

比如我们进行search搜索school里面name=zhangsan的数据:

## payload取值规范:比如我们进行search搜索school:
GET school/_search
{"query": {"match": {"name": "zhangsan"}}
}

查询结果如下:

{"took": 14,"timed_out": false,"_shards": {"total": 2,"successful": 2,"skipped": 0,"failed": 0},"hits": {"total": 1,"max_score": 1.5404451,"hits": [{"_index": "school","_type": "student","_id": "1","_score": 1.5404451,"_source": {"name": "zhangsan","age": 25,"course": "elasticsearch","study_date": "2018-06-15T20:30:50","mark": "today is a good day"}}]}
}

如果想使用查询结果集中的数据,即从payload中取值使用,就可以使用如下方式(表示查询方式):

## 表示查询:对ctx.payload结果集使用表示查询方式:
{{#ctx.payload.hits.hits}} {{_source.name}} {{_source.course}} {{/ctx.payload.hits.hits}}

3. 表示查询方式

比如我们进行search搜索school并采用聚合的方式来查询terms:course数据:

GET school/_search
{"size": 0, "aggs": {"myterms": {"terms": {"field": "course","size": 10}}}
}

查询结果:

{"took": 11,"timed_out": false,"_shards": {"total": 2,"successful": 2,"skipped": 0,"failed": 0},"hits": {"total": 10,"max_score": 0,"hits": []},"aggregations": {"myterms": {"doc_count_error_upper_bound": 0,"sum_other_doc_count": 0,"buckets": [{"key": "elasticsearch","doc_count": 7},{"key": "good","doc_count": 1},{"key": "spring","doc_count": 1},{"key": "spring elasticsearch","doc_count": 1}]}}
}

从payload中取值使用:现在想取得上面的hits.hits里面的数据内容,就可以使用如下方式,即表示查询方式:

## 表示查询:ctx.payload结果集:
{{#ctx.payload.aggregations.aggsname.buckets}} {{key}} {{doc_count}} {{/ctx.payload.aggregations.aggsname.buckets}}## 针对这里内容就是:
{{#ctx.payload.aggregations.myterms.buckets}} {{key}} {{doc_count}} {{/ctx.payload.aggregations.myterms.buckets}}

二、watcher API使用:

API使用方法:

#查看一个watcher
GET _xpack/watcher/watch/school_watcher#删除一个watcher
DELETE _xpack/watcher/watch/school_watcher#执行watcher
POST _xpack/watcher/watch/school_watcher/_execute#查看执行结果
GET /.watcher-history*/_search?pretty
# 查看执行结果的入参
{"sort" : [{ "result.execution_time" : "desc" }],"query": {"match": {"watch_id": "school_watcher"}}
}

三、配置 Watcher

配置一个 Watcher,我们必须配置如下的几个:

PUT _watcher/watch/log_error_watch
{"trigger": {},"input": {},"condition": {},"transform" {},"actions": {}
}

即一个 Watcher 由五个部分组成:

  • trigger 定义多长时间 watcher 运行一次
  • input 获取评估的数据
  • condition 评估你加载到 watch 中的数据并确定是否需要执行actions操作,比如已经将日志 error 加载到 watch 中,你可以定义一个条件来检查是否发现了某个特定的 error
  • actions 操作,watch 的操作定义了当 watch 条件评估为真时要做什么

下面分别看一下各个部分的语法:

1. trigger

triggers的几种类型:hourly、daily、weekly、monthly、yearly、cron、interval

#hourly:每小时执行#例如:12:00, 12:15, 12:30, 12:45, 1:00, 1:15{"trigger" : {"schedule" : {"hourly" : { "minute" : [ 0, 15, 30, 45 ] }}}}#daily:每天执行#每天00:00, 12:00, and 17:00{"trigger" : {"schedule" : {"daily" : { "at" : [ "midnight", "noon", "17:00" ] }}}}#每天00:00, 00:30, 12:00, 12:30, 17:00 and 17:30{"trigger" : {"schedule" : {"daily" : {"at" {"hour" : [ 0, 12, 17 ],"minute" : [0, 30]}}}}}#weekly:指定星期几#周二12:00,周五17:00{"trigger" : {"schedule" : {"weekly" : [{ "on" : "tuesday", "at" : "noon" },{ "on" : "friday", "at" : "17:00" }]}}}#周二、周五的17:00{"trigger" : {"schedule" : {"weekly" : {"on" : [ "tuesday", "friday" ],"at" : [ "noon", "17:00" ]}}}}#monthly:指定每月哪天执行#每月10号中午、每月20号17:00{"trigger" : {"schedule" : {"monthly" : [{ "on" : 10, "at" : "noon" },{ "on" : 20, "at" : "17:00" }]}}}#每月10号、20号的00:00,12:00{"trigger" : {"schedule" : {"monthly" : {"on" : [ 10, 20 ],"at" : [ "midnight", "noon" ]}}}}#yearly-指定月、日、时#每年的1月10日12:00,每年的7月20日17:00{"trigger" : {"schedule" : {"yearly" : [{ "in" : "january", "on" : 10, "at" : "noon" },{ "in" : "july", "on" : 20, "at" : "17:00" }]}}}#每年1月10日,1月20日,12月10日,12月20日的12:00,00:00{"trigger" : {"schedule" : {"yearly" : {"in" : [ "jan", "dec" ],"on" : [ 10, 20 ],"at" : [ "midnight", "noon" ]}}}}

而cron表达式语法如下:

   #cron-表达式<seconds> <minutes> <hours> <day_of_month> <month> <day_of_week> [year]0 5 9 * * ?0 0-5 9 * * ?0 0/15 9 * * ?

 interval 表示周期的,间隔单位:s:秒、m:分钟、h:小时、d:天、w:星期

2. input

Input的几种类型:  Simple、Search、HTTP、Chain

   #Simple Input-静态数据#每天12点触发{"trigger" : {"schedule" : {"daily" : { "at" : "noon" }}},"input" : {"simple" : {"name" : "John"}},"actions" : {"reminder_email" : {"email" : {"to" : "to@host.domain","subject" : "Reminder","body" : "Dear {{ctx.payload.name}}, by the time you read these lines, I'll be gone"}}}}#Search-搜索{"input" : {"search" : {"request" : {"indices" : [ "logs" ],"body" : {"query" : { "match_all" : {} }}}}},"condition" : {"compare" : { "ctx.payload.hits.total" : { "gt" : 5 }}}...}

HTTP包含的参数有:#request.host

  • request.port
  • request.path
  • request.headers
  • request.params
  • request.url:request.scheme, request.host, request.port and request.params
  • request.method:head、get、post、put、delete
  • request.auth
  • request.body
  • request.proxy.host
  • request.proxy.port
  • request.connection_timeout
  • request.read_timeout
  • response_content_type:json, yaml and text
  • extract
  • get

语法格式示例如下:

  
# Http-请求{"input" : {"http" : {"request" : {"host" : "example.com","port" : 9200,"path" : "/idx/_search"}}}}#含有body体内容{"input" : {"http" : {"request" : {"host" : "host.domain","port" : 9200,"path" : "/idx/_search","body" :  "{\"query\" :  {  \"match\" : { \"category\" : \"event\"}}}"}}}}#含有参数的{"input" : {"http" : {"request" : {"host" : "host.domain","port" : "9200","path" : "/_cluster/stats","params" : {"human" : "true" }}}}}#含有用户密码{"input" : {"http" : {"request" : {"host" : "host.domain","port" : "9200","path" : "/myservice","auth" : {"basic" : {"username" : "user","password" : "pass"}}}}}}#直接请求url的{"input" : {"http" : {"request" : {"url" : "http://api.openweathermap.org/data/2.5/weather","params" : {"lat" : "52.374031","lon" : "4.88969","appid" : "<your openweathermap appid>"}}}}}

Chain-input-可以同时设置多个input,串行

   
#Chain-input-同时设置多个input,串行{"input" : {"chain" : {"inputs" : [ ## 第一步input{"first" : {"simple" : { "path" : "/_search" }}},## 第二步input (可以去使用第一步input返回的结果){"second" : {"http" : {"request" : {"host" : "localhost","port" : 9200,"path" : "{{ctx.payload.first.path}}" }}}}]}}}

3. condition条件设置

如果condition条件返回true 则会触发action,如果返回 false 则就停止,不执行action

#--------------------条件设置--------------------
#Always Condition
"condition" : {"always" : {}
}
#Never Condition"condition" : {"never" : {}
}#Compare Condition (进行和查询的结果进行比较语法如下:)
# eq:、not_eq、gt、gte、lt、lte
## 比如错误条数超过了5条进行报警、响应长时间超过多少毫秒进行报警等
{"condition" : {"compare" : {"ctx.payload.hits.total" : { "gte" : 5 }}
}#<{expression}> 正则表达式 使用 <> 中写正则表达式: 比如 当前时间 - 5分钟 进行比较,如下:
{"condition" : {"compare" : {"ctx.execution_time" : {"gte" : "<{now-5m}>"}}
}#{{参数的path}} 比较,这个就是最开始的示例里面的获取参数方式,如下:
{"condition" : {"compare" : {"ctx.payload.aggregations.status.buckets.error.doc_count" : {"not_eq" : "{{ctx.payload.aggregations.handled.buckets.true.doc_count}}"}}
}#Array Compare Condition 数组比较: 比如当前的doc_count大于25 就进行报警
{"condition": {"array_compare": {"ctx.payload.aggregations.top_tweeters.buckets" : { "path": "doc_count" ,"gte": { "value": 25, }}}}
}#Script Condition 脚本方式
{"input" : {"search" : {"indices" : "log-events","body" : {"size" : 0,"query" : { "match" : { "status" : "error" } }}}},"condition" : {"script" : {## 当前返回的条数是否大于阈值,进行报警"inline" : "return ctx.payload.hits.total > threshold","params" : {"threshold" : 5}}}
}

4. Actions 触发器

#--------------------Actions--------------------
#Email Action--发送邮件 #如果使用发送邮件的报警,则需要在elasticsearch.yml中配置发送邮件服务的信息
xpack.notification.email:default_account: gmail_accountaccount:gmail_account:profile: gmailsmtp:auth: truestarttls.enable: truehost: smtp.gmail.comport: 587user: <username>password: <password>outlook_account:profile: outlooksmtp:auth: truestarttls.enable: truehost: smtp-mail.outlook.comport: 587user: <username>password: <password>:exchange_account:profile: outlookemail_defaults:from: <email address of service account> smtp:auth: truestarttls.enable: truehost: <your exchange server>port: 587user: <email address of service account> password: <password>#发送邮件
"actions" : {## actions名字"send_email" : { "email" : { "to" : "'Recipient Name <recipient@example.com>'", #"to" : ['Personal Name <user1@host.domain>', 'user2@host.domain'], "subject" : "Watcher Notification", "body" : "{{ctx.payload.hits.total}} error logs found" }}
}#发送含有附件信息的邮件
"actions" : {"email_admin" : {"email": {"to": "'John Doe <john.doe@example.com>'","attachments" : {## 附件方式"my_image.png" : { "http" : { "content_type" : "image.png","request" : {"url": "http://example.org/foo/my-image.png" }}},## xpack reporting插件生成方式:"dashboard.pdf" : {"reporting" : {"url": "http://example.org:5601/api/reporting/generate/dashboard/Error-Monitoring"}},## 自定义附件"data.yml" : {"data" : {"format" : "yaml" }}}}}
}#Webhook Action,发送一个http请求
#发送github的issue
"actions" : {"create_github_issue" : {## 因为发邮件到达率不是特别高,所以可以使用外部的接口调用方式## 比如这里调用url为外部的手机短信接口进行发送 "webhook" : {## 请求方式"method" : "POST",## 外部请求地址"url" : "https://api.github.com/repos/<owner>/<repo>/issues",## 请求报文"body" : "{\"title\": \"Found errors in 'contact.html'\",\"body\": \"Found {{ctx.payload.hits.total}} errors in the last 5 minutes\",\"assignee\": \"web-admin\",\"labels\": [ \"bug\", \"sev2\" ]}",## 用户名密码"auth" : {"basic" : {"username" : "<username>", "password" : "<password>"}}}}
}#带有url参数的请求
"actions" : {"my_webhook" : {"webhook" : {"method" : "POST","host" : "mylisteningserver","port" : 9200,"path": ":/alert","params" : {"watch_id" : "{{ctx.watch_id}}" }}}
}#自定义header
"actions" : {"my_webhook" : {"webhook" : {"method" : "POST","host" : "mylisteningserver","port" : 9200,"path": ":/alert/{{ctx.watch_id}}","headers" : {"Content-Type" : "application/yaml" },"body" : "count: {{ctx.payload.hits.total}}"}}
}#Index Action--创建索引文档
"actions" : {"index_payload" : { "index" : {"index" : "my-index", "doc_type" : "my-type", "doc_id": "my-id" }}
}#Logging Action--记录日志
#level:error, warn, info, debug and trace
## 日志种类:
#category:xpack.watcher.actions.logging
"actions" : {"log" : { "transform" : { ... }, ## 日志报警"logging" : {"text" : "executed at {{ctx.execution_time}}",## 日志级别"level": "info"}}
}

除了上面几种actions类型,还有:

  • Jira Action: 与jira集成
  • HipChat Action
  • Slack Action
  • PagerDuty Action

5. 示例:

使用接口的形式创建一个watcher, 进行模拟

(1)执行 Watcher 脚本

## 查询school
GET school/student/_search
{"query": {"match_all":{}}
}## 创建school_watcher
PUT _xpack/watcher/watch/school_watcher
{"trigger": {"schedule": {"interval": "10s"}},"input": {"search": {"request": {"indices": ["school*"],"body": {"size": 0,"query": {"match": {"name": "hello"}}}}}},"condition": {"compare": {"ctx.payload.hits.total": {"gt": 0}}},"transform": {"search": {"request": {"indices": ["school*"],"body": {"size": 10,"query": {"match": {"name": "hello"}}}}}},"actions": {"log_hello": {"throttle_period": "15m","logging": {"text": "Found {{ctx.payload.hits.total}} hello in the school"}}}
}## 查看watcher执行结果
GET /.watcher-history*/_search?pretty
{"sort" : [{ "result.execution_time" : "desc" }],"query": {"match": {"watch_id": "school_watcher"}}
}## 进行数据测试:
POST /school/student
{"name": "hello","age": 18,"course": "elasticsearch","study_date": "2018-08-20T20:30:50","mark": "take care day day"
}

(2)可视化操作 watcher

可以在 Kibana 进行可视化操作,可以启用、禁用、添加修改、删除watcher;


http://www.ppmy.cn/news/72492.html

相关文章

【FMC202】基于FMC标准的1路CameraLink Full 输入、1路DVI输出 子卡模块

产品概述 FMC202是一款基于FMC接口标准的1路CameraLink Full模式&#xff08;或者2路CameraLink Base模式&#xff09;采集、1路HDMI&#xff08;DVI&#xff09;视频输出的子卡模块&#xff0c;该模块具有2个CameraLink端口&#xff08;SDR&#xff0c;26PIN&#xff09;&…

信息安全-数据安全-字节大数据平台安全与权限治理实践

导读&#xff1a;本次分享题目为字节跳动大数据平台安全与权限治理实践&#xff0c;文章会围绕下面四点展开&#xff1a; 字节大数据安全体系现状和难点 细粒度权限管控和治理 资产保护能力 数据删除能力 分享嘉宾&#xff5c;许从余 火山引擎 数据平台产品经理 编辑整理&#…

C# 匿名方法

C# 匿名方法 我们已经提到过&#xff0c;委托是用于引用与其具有相同标签的方法。换句话说&#xff0c;您可以使用委托对象调用可由委托引用的方法。 匿名方法&#xff08;Anonymous methods&#xff09; 提供了一种传递代码块作为委托参数的技术。匿名方法是没有名称只有主体…

Java NIO和IO的主要区别

当学习了Java NIO和IO的API后&#xff0c;一个问题马上涌入脑海&#xff1a; 我应该何时使用IO&#xff0c;何时使用NIO呢&#xff1f;在本文中&#xff0c;我会尽量清晰地解析Java NIO和IO的差异、它们的使用场景&#xff0c;以及它们如何影响您的代码设计。 下表总结了Java N…

深度解析接口自动化框架封装项目:封装层级,关联调用,极限改进

目录 前言&#xff1a; 一、接口封装与封装层级 二、接口关联和数据准备 三、接口封装极限改进 四、代码示例 五、总结 前言&#xff1a; 接口自动化是软件测试领域中的一个重要环节&#xff0c;它可以自动化执行接口测试用例&#xff0c;快速发现和定位接口问题&#xf…

实验室检验系统源码,集检验业务、质量控制、报告、统计分析、两癌等模块于一体

云 LIS 系统针对区域化 LIS 而设计&#xff0c;依托底层云架构&#xff0c;将传统的 LIS 功能模块进行“云化”。 该系统是集检验业务、科室管理、质量控制、报告、统计分析、两癌等模块于一体的数据检验信息平台。通过计算机联网&#xff0c;实现各类仪器数据结果的实时自动接…

【数据结构 -- C语言】 双向带头循环链表的实现

目录 1、双向带头循环链表的介绍 2、双向带头循环链表的接口 3、接口实现 3.1 开辟结点 3.2 创建返回链表的头结点 3.3 判断链表是否为空 3.4 打印 3.5 双向链表查找 3.6 双向链表在pos的前面进行插入 3.6.1 头插 3.6.2 尾插 3.6.3 更新头插、尾插写法 3.7 双向链…

Bias(偏差),Variance(方差),Error(误差)

目录 一、Bias(偏差) 二、Variance(方差) 三、Error(误差) 一、Bias(偏差) Bias描述的是根据样本拟合出的模型的输出值与真实值之间的差距,就是模型在样本上拟合的好坏。Bias反映了模型本身的精准度。 Bias越小,即模型在样本上拟合的越好。但是若要减小Bias,则…