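I. X-Pack Watcher: a basic introduction
Elasticsearch stores data and searches it quickly, and its Watcher feature can send alerts in near real time when certain conditions are met. As with the rest of Elasticsearch, alerting tasks (watches) are created, managed, and updated through RESTful API calls.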
- Check whether the Watcher plugin is running normally; a response containing "watcher_state": "started" means it is running:
GET _xpack/watcher/stats
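1. First, an example watch:
## Create a watch whose trigger checks the data defined in input every 10 seconds
PUT _xpack/watcher/watch/school_watcher
{
  "trigger": {
    "schedule": {
      "interval": "10s"
    }
  },
  ## Input: the data the watch examines
  "input": {
    "search": {
      "request": {
        ## the indices to monitor
        "indices": ["school*"],
        ## the search request goes in body
        "body": {
          "size": 0,
          "query": {
            "match": {
              ## e.g. alert when a document's name contains hello
              "name": "hello"
            }
          }
        }
      }
    }
  },
  ## Compare against the result of the query above:
  "condition": {
    ## compare performs the comparison
    "compare": {
      ## The result of the query above is placed in ctx.payload,
      ## e.g. ctx.payload.hits.total, ctx.payload._shards.total, and so on
      "ctx.payload.hits.total": {
        "gt": 0
      }
    }
  },
  ## transform: query the documents again and assign the result to ctx.payload
  "transform": {
    "search": {
      "request": {
        "indices": ["school*"],
        "body": {
          "size": 10,
          "query": {
            "match": {
              "name": "hello"
            }
          }
        }
      }
    }
  },
  ## Based on the query and comparison above, run the actions defined here (several alert types can be defined)
  "actions": {
    ## alert name
    "log_hello": {
      ## Prevent alert storms: if this action already fired within the last 15m, do not fire again
      "throttle_period": "15m",
      ## alert channel: logging, email, HTTP, and so on
      "logging": {
        ## alert text; use {{ path }} to substitute values from the context:
        "text": "Found {{ctx.payload.hits.total}} hello in the school"
      }
    }
  }
}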
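The `##` annotations in the request above are explanatory and are not valid JSON, so a script that submits the watch needs a clean body. The following is a minimal sketch that builds the same body as a Python dict and serializes it; the function name `build_logging_watch` and its parameters are hypothetical helpers introduced here, not part of any Elasticsearch client.

```python
import json


def build_logging_watch(index_pattern, field, term, interval="10s", throttle="15m"):
    """Build a watch body shaped like the school_watcher example (illustrative helper)."""
    query = {"match": {field: term}}

    def search(size):
        # The input only needs the hit count (size 0); the transform reloads documents.
        return {"search": {"request": {"indices": [index_pattern],
                                       "body": {"size": size, "query": query}}}}

    return {
        "trigger": {"schedule": {"interval": interval}},
        "input": search(0),
        "condition": {"compare": {"ctx.payload.hits.total": {"gt": 0}}},
        "transform": search(10),
        "actions": {
            "log_hello": {
                "throttle_period": throttle,
                "logging": {
                    "text": "Found {{ctx.payload.hits.total}} %s in the school" % term
                },
            }
        },
    }


watch = build_logging_watch("school*", "name", "hello")
body = json.dumps(watch, indent=2)  # comment-free JSON, ready to send as the PUT body
print(body)
```

The printed JSON can be pasted into the `PUT _xpack/watcher/watch/school_watcher` request in place of the annotated version.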
2. Search example:
Suppose we search the school index for documents whose name is zhangsan:
## Reference for payload values: a search against school
GET school/_search
{
  "query": {
    "match": {
      "name": "zhangsan"
    }
  }
}
The result is:
{
  "took": 14,
  "timed_out": false,
  "_shards": {
    "total": 2,
    "successful": 2,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1.5404451,
    "hits": [
      {
        "_index": "school",
        "_type": "student",
        "_id": "1",
        "_score": 1.5404451,
        "_source": {
          "name": "zhangsan",
          "age": 25,
          "course": "elasticsearch",
          "study_date": "2018-06-15T20:30:50",
          "mark": "today is a good day"
        }
      }
    ]
  }
}
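To use data from the result set, that is, to read values from the payload, use Mustache template notation as follows:
## Template notation: iterate over the hits in the ctx.payload result set
{{#ctx.payload.hits.hits}} {{_source.name}} {{_source.course}} {{/ctx.payload.hits.hits}}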
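To make the behaviour of the section tags concrete, here is a rough sketch of a tiny renderer that supports only `{{#path}}...{{/path}}` sections and `{{path}}` variables with dotted paths. It is an illustration written for this article, not the template engine Watcher uses, and it skips features such as HTML escaping and inverted sections.

```python
import re

SECTION = re.compile(r"\{\{#([\w.]+)\}\}(.*?)\{\{/\1\}\}", re.S)
VARIABLE = re.compile(r"\{\{([\w.]+)\}\}")


def lookup(stack, path):
    """Resolve a dotted path, starting from the innermost context that has its first key."""
    first, *rest = path.split(".")
    for context in reversed(stack):
        if isinstance(context, dict) and first in context:
            value = context[first]
            for key in rest:
                value = value.get(key) if isinstance(value, dict) else None
            return value
    return None


def render_variables(text, stack):
    def replace(match):
        value = lookup(stack, match.group(1))
        return "" if value is None else str(value)
    return VARIABLE.sub(replace, text)


def render(template, stack):
    """Render sections by repeating their body once per list item, then fill variables."""
    out, pos = [], 0
    for match in SECTION.finditer(template):
        out.append(render_variables(template[pos:match.start()], stack))
        value = lookup(stack, match.group(1))
        items = value if isinstance(value, list) else ([value] if value else [])
        for item in items:
            out.append(render(match.group(2), stack + [item]))
        pos = match.end()
    out.append(render_variables(template[pos:], stack))
    return "".join(out)


payload = {"hits": {"total": 1, "hits": [
    {"_source": {"name": "zhangsan", "course": "elasticsearch"}}]}}
template = ("{{#ctx.payload.hits.hits}} {{_source.name}} {{_source.course}} "
            "{{/ctx.payload.hits.hits}}")
rendered = render(template, [{"ctx": {"payload": payload}}])
print(rendered)
```

Inside the section, each hit becomes the current context, which is why `_source.name` can be written without the `ctx.payload.hits.hits` prefix.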
3. Template notation with aggregations
Suppose we search school with a terms aggregation on the course field:
GET school/_search
{
  "size": 0,
  "aggs": {
    "myterms": {
      "terms": {
        "field": "course",
        "size": 10
      }
    }
  }
}
The result is:
{
  "took": 11,
  "timed_out": false,
  "_shards": {
    "total": 2,
    "successful": 2,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 10,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "myterms": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        { "key": "elasticsearch", "doc_count": 7 },
        { "key": "good", "doc_count": 1 },
        { "key": "spring", "doc_count": 1 },
        { "key": "spring elasticsearch", "doc_count": 1 }
      ]
    }
  }
}
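Reading values from the payload: to get the contents of the aggregation buckets above (not hits.hits, which is empty here), use the same template notation:
## Template notation over the ctx.payload result set; replace aggsname with the aggregation name:
{{#ctx.payload.aggregations.aggsname.buckets}} {{key}} {{doc_count}} {{/ctx.payload.aggregations.aggsname.buckets}}
## For this example that is:
{{#ctx.payload.aggregations.myterms.buckets}} {{key}} {{doc_count}} {{/ctx.payload.aggregations.myterms.buckets}}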
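The same traversal can be written out in plain Python to show which part of the response the section iterates over. This is only an illustration of the data path; `summarize_buckets` is a name made up for this sketch.

```python
def summarize_buckets(payload, agg_name):
    """Return one 'key doc_count' string per bucket of the named terms aggregation."""
    buckets = payload["aggregations"][agg_name]["buckets"]
    return ["%s %d" % (bucket["key"], bucket["doc_count"]) for bucket in buckets]


# The aggregation part of the response shown above.
payload = {
    "aggregations": {
        "myterms": {
            "buckets": [
                {"key": "elasticsearch", "doc_count": 7},
                {"key": "good", "doc_count": 1},
                {"key": "spring", "doc_count": 1},
                {"key": "spring elasticsearch", "doc_count": 1},
            ]
        }
    }
}

for line in summarize_buckets(payload, "myterms"):
    print(line)
```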
II. Using the Watcher API:
API usage:
# View a watch
GET _xpack/watcher/watch/school_watcher
# Delete a watch
DELETE _xpack/watcher/watch/school_watcher
# Execute a watch
POST _xpack/watcher/watch/school_watcher/_execute
# View execution results; the request body sorts by execution time and filters by watch ID
GET /.watcher-history*/_search?pretty
{
  "sort" : [
    { "result.execution_time" : "desc" }
  ],
  "query": {
    "match": {
      "watch_id": "school_watcher"
    }
  }
}
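For scripting these calls, the sketch below prepares the requests with Python's standard library without sending them. The base URL `http://localhost:9200` and the helper names are assumptions for illustration; the paths are the ones used in this article. The history search is prepared as a POST, which `_search` also accepts, so that the body travels with a method that always allows one.

```python
import json
from urllib.request import Request

BASE_URL = "http://localhost:9200"  # assumption: a local, unsecured node


def watcher_request(method, path, body=None):
    """Prepare (but do not send) an HTTP request against the cluster."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    headers = {"Content-Type": "application/json"} if data else {}
    return Request(BASE_URL + path, data=data, headers=headers, method=method)


def history_query(watch_id):
    """Body for searching the watch history, newest execution first."""
    return {
        "sort": [{"result.execution_time": "desc"}],
        "query": {"match": {"watch_id": watch_id}},
    }


prepared = [
    watcher_request("GET", "/_xpack/watcher/watch/school_watcher"),
    watcher_request("POST", "/_xpack/watcher/watch/school_watcher/_execute"),
    watcher_request("POST", "/.watcher-history*/_search", history_query("school_watcher")),
]

for request in prepared:
    print(request.get_method(), request.full_url)
```

Passing one of these objects to `urllib.request.urlopen` would send it; that step is left out so the sketch runs without a cluster.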
III. Configuring a watch
To configure a watch, we fill in the following sections:
PUT _watcher/watch/log_error_watch
{
  "trigger": {},
  "input": {},
  "condition": {},
  "transform": {},
  "actions": {}
}
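In other words, a watch consists of five parts:
- trigger: defines how often the watch runs
- input: loads the data to be evaluated
- condition: evaluates the data loaded into the watch and decides whether the actions need to run; for example, once error logs have been loaded into the watch, you can define a condition that checks whether a specific error was found
- transform: processes the payload before the actions run, for example by querying the documents again and assigning the result to ctx.payload
- actions: define what to do when the watch condition evaluates to true
The syntax of each part is covered below: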
1. trigger
Schedule types for a trigger: hourly, daily, weekly, monthly, yearly, cron, interval
# hourly: run every hour
# e.g. 12:00, 12:15, 12:30, 12:45, 1:00, 1:15
{
  "trigger" : {
    "schedule" : {
      "hourly" : { "minute" : [ 0, 15, 30, 45 ] }
    }
  }
}
# daily: run every day
# every day at 00:00, 12:00 and 17:00
{
  "trigger" : {
    "schedule" : {
      "daily" : { "at" : [ "midnight", "noon", "17:00" ] }
    }
  }
}
# every day at 00:00, 00:30, 12:00, 12:30, 17:00 and 17:30
{
  "trigger" : {
    "schedule" : {
      "daily" : {
        "at" : {
          "hour" : [ 0, 12, 17 ],
          "minute" : [ 0, 30 ]
        }
      }
    }
  }
}
# weekly: run on given days of the week
# Tuesday at 12:00 and Friday at 17:00
{
  "trigger" : {
    "schedule" : {
      "weekly" : [
        { "on" : "tuesday", "at" : "noon" },
        { "on" : "friday", "at" : "17:00" }
      ]
    }
  }
}
# Tuesday and Friday, at 12:00 and 17:00 on each
{
  "trigger" : {
    "schedule" : {
      "weekly" : {
        "on" : [ "tuesday", "friday" ],
        "at" : [ "noon", "17:00" ]
      }
    }
  }
}
# monthly: run on given days of the month
# the 10th at noon and the 20th at 17:00
{
  "trigger" : {
    "schedule" : {
      "monthly" : [
        { "on" : 10, "at" : "noon" },
        { "on" : 20, "at" : "17:00" }
      ]
    }
  }
}
# the 10th and the 20th, at 00:00 and 12:00 on each
{
  "trigger" : {
    "schedule" : {
      "monthly" : {
        "on" : [ 10, 20 ],
        "at" : [ "midnight", "noon" ]
      }
    }
  }
}
# yearly: run in given months, on given days, at given times
# January 10 at 12:00 and July 20 at 17:00, every year
{
  "trigger" : {
    "schedule" : {
      "yearly" : [
        { "in" : "january", "on" : 10, "at" : "noon" },
        { "in" : "july", "on" : 20, "at" : "17:00" }
      ]
    }
  }
}
# January 10, January 20, December 10 and December 20, at 00:00 and 12:00 on each, every year
{
  "trigger" : {
    "schedule" : {
      "yearly" : {
        "in" : [ "jan", "dec" ],
        "on" : [ 10, 20 ],
        "at" : [ "midnight", "noon" ]
      }
    }
  }
}
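When `at` is given as arrays of hours and minutes, every hour is combined with every minute, which is how three hours and two minutes yield six run times in the daily example above. A small sketch of that expansion follows; `expand_at` is a hypothetical helper, and it only handles the `midnight` and `noon` names used in this article.

```python
from itertools import product

NAMED_TIMES = {"midnight": "00:00", "noon": "12:00"}


def as_list(value):
    return value if isinstance(value, list) else [value]


def expand_at(at):
    """Expand the `at` value of a daily schedule into sorted HH:MM strings."""
    if isinstance(at, dict):
        # Object form: every hour is paired with every minute.
        pairs = product(as_list(at["hour"]), as_list(at["minute"]))
        times = ["%02d:%02d" % (hour, minute) for hour, minute in pairs]
    else:
        # String or array form: named times are mapped, HH:MM strings pass through.
        times = [NAMED_TIMES.get(value, value) for value in as_list(at)]
    return sorted(times)


print(expand_at({"hour": [0, 12, 17], "minute": [0, 30]}))
print(expand_at(["midnight", "noon", "17:00"]))
```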
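The cron expression syntax is:
# cron expression
<seconds> <minutes> <hours> <day_of_month> <month> <day_of_week> [year]
# every day at 9:05
0 5 9 * * ?
# every minute from 9:00 through 9:05, every day
0 0-5 9 * * ?
# every 15 minutes from 9:00 through 9:45, every day
0 0/15 9 * * ?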
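The range (`0-5`) and step (`0/15`) forms in the minutes field can be unpacked mechanically. The sketch below expands a single numeric cron field into the values it matches; it is a simplified illustration that handles only `*`, single values, comma lists, ranges and `start/step`, not the full cron grammar.

```python
def expand_cron_field(field, low, high):
    """Expand one numeric cron field into the sorted list of values it matches."""
    values = set()
    for part in field.split(","):
        if part == "*":
            values.update(range(low, high + 1))
        elif "/" in part:
            start, step = part.split("/")
            first = low if start == "*" else int(start)
            values.update(range(first, high + 1, int(step)))
        elif "-" in part:
            begin, end = part.split("-")
            values.update(range(int(begin), int(end) + 1))
        else:
            values.add(int(part))
    return sorted(values)


# Minutes field of the three expressions above (valid minute values are 0..59).
print(expand_cron_field("5", 0, 59))
print(expand_cron_field("0-5", 0, 59))
print(expand_cron_field("0/15", 0, 59))
```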
interval runs the watch at a fixed period; the units are s (seconds), m (minutes), h (hours), d (days) and w (weeks)
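As a quick illustration of the units, the following sketch converts an interval string such as `10s` or `15m` into seconds. The function name is made up for this example, and it accepts only a whole number followed by one of the five unit letters listed above.

```python
import re

UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400, "w": 604800}


def interval_seconds(text):
    """Convert an interval like '10s' or '2h' into a number of seconds."""
    match = re.fullmatch(r"(\d+)([smhdw])", text)
    if match is None:
        raise ValueError("unsupported interval: %r" % text)
    return int(match.group(1)) * UNIT_SECONDS[match.group(2)]


print(interval_seconds("10s"), interval_seconds("15m"), interval_seconds("1w"))
```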
2. input
Input types: Simple, Search, HTTP, Chain
# Simple input: static data
# triggered every day at 12:00
{
  "trigger" : {
    "schedule" : {
      "daily" : { "at" : "noon" }
    }
  },
  "input" : {
    "simple" : {
      "name" : "John"
    }
  },
  "actions" : {
    "reminder_email" : {
      "email" : {
        "to" : "to@host.domain",
        "subject" : "Reminder",
        "body" : "Dear {{ctx.payload.name}}, by the time you read these lines, I'll be gone"
      }
    }
  }
}
# Search input: runs a search
{
  "input" : {
    "search" : {
      "request" : {
        "indices" : [ "logs" ],
        "body" : {
          "query" : { "match_all" : {} }
        }
      }
    }
  },
  "condition" : {
    "compare" : { "ctx.payload.hits.total" : { "gt" : 5 }}
  }
  ...
}
The HTTP input accepts the following parameters:
- request.host
- request.port
- request.path
- request.headers
- request.params
- request.url: shorthand that sets request.scheme, request.host, request.port and request.params from a single URL
- request.method: head, get, post, put, delete
- request.auth
- request.body
- request.proxy.host
- request.proxy.port
- request.connection_timeout
- request.read_timeout
- response_content_type: json, yaml and text
- extract
- get
Examples of the syntax:
# HTTP request
{
  "input" : {
    "http" : {
      "request" : {
        "host" : "example.com",
        "port" : 9200,
        "path" : "/idx/_search"
      }
    }
  }
}
# with a request body
{
  "input" : {
    "http" : {
      "request" : {
        "host" : "host.domain",
        "port" : 9200,
        "path" : "/idx/_search",
        "body" : "{\"query\" : { \"match\" : { \"category\" : \"event\"}}}"
      }
    }
  }
}
# with query parameters
{
  "input" : {
    "http" : {
      "request" : {
        "host" : "host.domain",
        "port" : "9200",
        "path" : "/_cluster/stats",
        "params" : {
          "human" : "true"
        }
      }
    }
  }
}
# with a username and password
{
  "input" : {
    "http" : {
      "request" : {
        "host" : "host.domain",
        "port" : "9200",
        "path" : "/myservice",
        "auth" : {
          "basic" : {
            "username" : "user",
            "password" : "pass"
          }
        }
      }
    }
  }
}
# requesting a URL directly
{
  "input" : {
    "http" : {
      "request" : {
        "url" : "http://api.openweathermap.org/data/2.5/weather",
        "params" : {
          "lat" : "52.374031",
          "lon" : "4.88969",
          "appid" : "<your openweathermap appid>"
        }
      }
    }
  }
}
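The separate `host`, `port`, `path` and `params` fields describe the same thing as a single URL. The sketch below assembles that URL from a request object with Python's standard library, which can help when checking what an HTTP input will call. `build_url` is a hypothetical helper, and the `http` fallback for a missing scheme is an assumption of this sketch.

```python
from urllib.parse import urlencode, urlunsplit


def build_url(request):
    """Assemble the URL described by an HTTP input's host/port/path/params fields."""
    scheme = request.get("scheme", "http")  # assumption: plain http unless stated
    netloc = "%s:%s" % (request["host"], request["port"])
    query = urlencode(request.get("params", {}))
    return urlunsplit((scheme, netloc, request.get("path", ""), query, ""))


stats_request = {
    "host": "host.domain",
    "port": "9200",
    "path": "/_cluster/stats",
    "params": {"human": "true"},
}
print(build_url(stats_request))
```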
Chain input: several inputs can be configured together and are executed in sequence
# Chain input: several inputs, executed in sequence
{
  "input" : {
    "chain" : {
      "inputs" : [
        ## first input
        {
          "first" : {
            "simple" : { "path" : "/_search" }
          }
        },
        ## second input (it can use the result returned by the first input)
        {
          "second" : {
            "http" : {
              "request" : {
                "host" : "localhost",
                "port" : 9200,
                "path" : "{{ctx.payload.first.path}}"
              }
            }
          }
        }
      ]
    }
  }
}
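The following sketch simulates how the chain above passes data along: each input's result is stored in the payload under its name, and later inputs can reference it through `{{ctx.payload.<name>...}}`. It is a simplified model for illustration; `run_chain` and the `fetch` callable are stand-ins invented here, and no HTTP request is actually made.

```python
import re


def resolve(path, root):
    """Follow a dotted path such as 'ctx.payload.first.path' through nested dicts."""
    value = root
    for key in path.split("."):
        value = value[key]
    return value


def fill(text, root):
    """Replace every {{path}} in text with the value found at that path."""
    return re.sub(r"\{\{([\w.]+)\}\}", lambda m: str(resolve(m.group(1), root)), text)


def run_chain(inputs, fetch):
    """Run inputs in order; each result is stored in the payload under the input's name."""
    payload = {}
    for entry in inputs:
        (name, spec), = entry.items()
        if "simple" in spec:
            payload[name] = spec["simple"]
        elif "http" in spec:
            root = {"ctx": {"payload": payload}}
            request = {key: fill(value, root) if isinstance(value, str) else value
                       for key, value in spec["http"]["request"].items()}
            payload[name] = fetch(request)
    return payload


inputs = [
    {"first": {"simple": {"path": "/_search"}}},
    {"second": {"http": {"request": {"host": "localhost", "port": 9200,
                                     "path": "{{ctx.payload.first.path}}"}}}},
]

# Stand-in for the real HTTP call: it just echoes the path it was asked for.
result = run_chain(inputs, fetch=lambda request: {"requested_path": request["path"]})
print(result)
```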
3. condition settings
If the condition returns true, the actions are triggered; if it returns false, execution stops and no action runs.
#--------------------Condition settings--------------------
# Always Condition
"condition" : {
  "always" : {}
}
# Never Condition
"condition" : {
  "never" : {}
}
# Compare Condition (compares against the query result; syntax below)
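# operators: eq, not_eq, gt, gte, lt, lte
## e.g. alert when the number of errors reaches 5, or when a response time exceeds some number of milliseconds
{
  "condition" : {
    "compare" : {
      "ctx.payload.hits.total" : { "gte" : 5 }
    }
  }
}
# <{expression}>: a date math expression written inside <{ }>; e.g. compare against the current time minus 5 minutes:
{
  "condition" : {
    "compare" : {
      "ctx.execution_time" : {
        "gte" : "<{now-5m}>"
      }
    }
  }
}
# {{path}}: compare against another value in the context, using the same {{ }} notation as in the first example:
{
  "condition" : {
    "compare" : {
      "ctx.payload.aggregations.status.buckets.error.doc_count" : {
        "not_eq" : "{{ctx.payload.aggregations.handled.buckets.true.doc_count}}"
      }
    }
  }
}
# Array Compare Condition: compares the elements of an array; e.g. alert when a bucket's doc_count is 25 or more
{
  "condition": {
    "array_compare": {
      "ctx.payload.aggregations.top_tweeters.buckets" : {
        "path": "doc_count",
        "gte": {
          "value": 25
        }
      }
    }
  }
}
# Script Condition: uses a script
{
  "input" : {
    "search" : {
      "request" : {
        "indices" : "log-events",
        "body" : {
          "size" : 0,
          "query" : { "match" : { "status" : "error" } }
        }
      }
    }
  },
  "condition" : {
    "script" : {
      ## alert when the number of hits exceeds the threshold; script parameters are read through params
      ## (newer versions name this field "source" instead of "inline")
      "inline" : "return ctx.payload.hits.total > params.threshold",
      "params" : {
        "threshold" : 5
      }
    }
  }
}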
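To show what the compare and array_compare conditions check, here is a rough Python model of their evaluation against a payload. It covers only literal values with the six operators listed above (no date math, no `{{path}}` references), and it assumes array_compare passes when at least one element matches unless `"quantifier": "all"` is given. The function names are invented for this sketch.

```python
import operator

OPERATORS = {
    "eq": operator.eq, "not_eq": operator.ne,
    "gt": operator.gt, "gte": operator.ge,
    "lt": operator.lt, "lte": operator.le,
}


def resolve(path, root):
    """Follow a dotted path through nested dicts."""
    value = root
    for key in path.split("."):
        value = value[key]
    return value


def compare(condition, ctx):
    """True when every path/operator pair in a compare condition holds."""
    root = {"ctx": ctx}
    return all(
        OPERATORS[op](resolve(path, root), expected)
        for path, checks in condition.items()
        for op, expected in checks.items()
    )


def array_compare(condition, ctx):
    """True when the array elements satisfy the comparison (some element by default)."""
    root = {"ctx": ctx}
    for array_path, spec in condition.items():
        checks = {key: rule for key, rule in spec.items() if key != "path"}
        values = [resolve(spec["path"], item) for item in resolve(array_path, root)]
        for op, rule in checks.items():
            quantifier = all if rule.get("quantifier", "some") == "all" else any
            if not quantifier(OPERATORS[op](value, rule["value"]) for value in values):
                return False
    return True


ctx = {"payload": {
    "hits": {"total": 7},
    "aggregations": {"top_tweeters": {"buckets": [{"doc_count": 30}, {"doc_count": 5}]}},
}}
print(compare({"ctx.payload.hits.total": {"gte": 5}}, ctx))
print(array_compare({"ctx.payload.aggregations.top_tweeters.buckets":
                     {"path": "doc_count", "gte": {"value": 25}}}, ctx))
```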
4. Actions
#--------------------Actions--------------------
# Email Action: sends an email
# To alert by email, the mail service must be configured in elasticsearch.yml
xpack.notification.email:
  default_account: gmail_account
  account:
    gmail_account:
      profile: gmail
      smtp:
        auth: true
        starttls.enable: true
        host: smtp.gmail.com
        port: 587
        user: <username>
        password: <password>
    outlook_account:
      profile: outlook
      smtp:
        auth: true
        starttls.enable: true
        host: smtp-mail.outlook.com
        port: 587
        user: <username>
        password: <password>
    exchange_account:
      profile: outlook
      email_defaults:
        from: <email address of service account>
      smtp:
        auth: true
        starttls.enable: true
        host: <your exchange server>
        port: 587
        user: <email address of service account>
        password: <password>
# Send an email
"actions" : {
  ## action name
  "send_email" : {
    "email" : {
      "to" : "'Recipient Name <recipient@example.com>'",
      ## alternative form with several recipients:
      #"to" : ['Personal Name <user1@host.domain>', 'user2@host.domain'],
      "subject" : "Watcher Notification",
      "body" : "{{ctx.payload.hits.total}} error logs found"
    }
  }
}
# Send an email with attachments
"actions" : {
  "email_admin" : {
    "email": {
      "to": "'John Doe <john.doe@example.com>'",
      "attachments" : {
        ## attachment fetched over HTTP
        "my_image.png" : {
          "http" : {
            "content_type" : "image.png",
            "request" : {
              "url": "http://example.org/foo/my-image.png"
            }
          }
        },
        ## attachment generated by the X-Pack reporting plugin:
        "dashboard.pdf" : {
          "reporting" : {
            "url": "http://example.org:5601/api/reporting/generate/dashboard/Error-Monitoring"
          }
        },
        ## custom data attachment
        "data.yml" : {
          "data" : {
            "format" : "yaml"
          }
        }
      }
    }
  }
}
# Webhook Action: sends an HTTP request
# Create a GitHub issue
"actions" : {
  "create_github_issue" : {
    ## Email delivery is not always reliable, so an external HTTP interface can be called instead;
    ## for example, the url could point to an external SMS gateway
    "webhook" : {
      ## request method
      "method" : "POST",
      ## external request URL
      "url" : "https://api.github.com/repos/<owner>/<repo>/issues",
      ## request body
      "body" : "{\"title\": \"Found errors in 'contact.html'\",\"body\": \"Found {{ctx.payload.hits.total}} errors in the last 5 minutes\",\"assignee\": \"web-admin\",\"labels\": [ \"bug\", \"sev2\" ]}",
      ## username and password
      "auth" : {
        "basic" : {
          "username" : "<username>",
          "password" : "<password>"
        }
      }
    }
  }
}
# Request with URL parameters
"actions" : {
  "my_webhook" : {
    "webhook" : {
      "method" : "POST",
      "host" : "mylisteningserver",
      "port" : 9200,
      "path": "/alert",
      "params" : {
        "watch_id" : "{{ctx.watch_id}}"
      }
    }
  }
}
# Custom headers
"actions" : {
  "my_webhook" : {
    "webhook" : {
      "method" : "POST",
      "host" : "mylisteningserver",
      "port" : 9200,
      "path": "/alert/{{ctx.watch_id}}",
      "headers" : {
        "Content-Type" : "application/yaml"
      },
      "body" : "count: {{ctx.payload.hits.total}}"
    }
  }
}
# Index Action: indexes a document
"actions" : {
  "index_payload" : {
    "index" : {
      "index" : "my-index",
      "doc_type" : "my-type",
      "doc_id": "my-id"
    }
  }
}
# Logging Action: writes a log entry
# level: error, warn, info, debug and trace
## log category:
# category: xpack.watcher.actions.logging
"actions" : {
  "log" : {
    "transform" : { ... },
    ## log-based alert
    "logging" : {
      "text" : "executed at {{ctx.execution_time}}",
      ## log level
      "level": "info"
    }
  }
}
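The webhook `body` in the GitHub issue example is a JSON document embedded as a hand-escaped string, which is easy to get wrong. One way to avoid manual escaping, sketched here, is to build the inner document as a Python dict and let `json.dumps` produce the string; the helper name is made up for this example, and the field values are the ones from the article.

```python
import json


def github_issue_body(title, message, assignee, labels):
    """Serialize the issue fields into the string used as the webhook body."""
    return json.dumps({"title": title, "body": message,
                       "assignee": assignee, "labels": labels})


webhook = {
    "method": "POST",
    "url": "https://api.github.com/repos/<owner>/<repo>/issues",
    "body": github_issue_body(
        "Found errors in 'contact.html'",
        "Found {{ctx.payload.hits.total}} errors in the last 5 minutes",
        "web-admin",
        ["bug", "sev2"],
    ),
}

# Serializing the action escapes the inner quotes automatically.
print(json.dumps({"actions": {"create_github_issue": {"webhook": webhook}}}, indent=2))
```

The Mustache placeholder passes through untouched, so Watcher can still substitute the hit count when the action runs.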
Besides the action types above, there are also:
- Jira Action: integration with Jira
- HipChat Action
- Slack Action
- PagerDuty Action
5. Example:
Create a watch through the API and simulate an alert
(1) Run the watch requests
## Query school
GET school/student/_search
{
  "query": {
    "match_all": {}
  }
}
## Create school_watcher
PUT _xpack/watcher/watch/school_watcher
{
  "trigger": {
    "schedule": {
      "interval": "10s"
    }
  },
  "input": {
    "search": {
      "request": {
        "indices": ["school*"],
        "body": {
          "size": 0,
          "query": {
            "match": {
              "name": "hello"
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gt": 0
      }
    }
  },
  "transform": {
    "search": {
      "request": {
        "indices": ["school*"],
        "body": {
          "size": 10,
          "query": {
            "match": {
              "name": "hello"
            }
          }
        }
      }
    }
  },
  "actions": {
    "log_hello": {
      "throttle_period": "15m",
      "logging": {
        "text": "Found {{ctx.payload.hits.total}} hello in the school"
      }
    }
  }
}
## View the watch execution results
GET /.watcher-history*/_search?pretty
{
  "sort" : [
    { "result.execution_time" : "desc" }
  ],
  "query": {
    "match": {
      "watch_id": "school_watcher"
    }
  }
}
## Index a test document:
POST /school/student
{
  "name": "hello",
  "age": 18,
  "course": "elasticsearch",
  "study_date": "2018-08-20T20:30:50",
  "mark": "take care day day"
}
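With a 10s interval and a 15m throttle_period, the watch in this example is evaluated far more often than its action runs. The sketch below is a simplified model of that interaction, assuming an action may run again once a full throttle period has passed since its last run; it illustrates the idea and is not a description of Watcher's internal scheduling.

```python
def action_times(interval_s, throttle_s, duration_s, condition_met=lambda t: True):
    """Return the trigger times (in seconds) at which the action would actually run."""
    fired, last = [], None
    for t in range(0, duration_s, interval_s):
        if not condition_met(t):
            continue  # condition false: the action is not considered at all
        if last is None or t - last >= throttle_s:
            fired.append(t)
            last = t
    return fired


# One hour of 10-second triggers with a 15-minute throttle, condition always true.
times = action_times(interval_s=10, throttle_s=15 * 60, duration_s=60 * 60)
print(len(range(0, 3600, 10)), "triggers,", len(times), "action runs at", times)
```

In this model the test document keeps matching on every trigger, yet the log line is written only a handful of times per hour.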
(2) Managing watches visually
Watches can also be managed visually in Kibana, where you can enable, disable, add, modify, and delete them.