【大数据学习 | flume】flume Sink Processors与拦截器Interceptor

embedded/2024/11/22 4:28:30/

1. Failover Sink Processor

故障转移处理器可以同时指定多个sink输出,按照优先级高低进行数据的分发,并具有故障转移能力。

需要修改第一台服务器agent

a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-1
a1.sinks.k1.port = 55555a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-2
a1.sinks.k1.port = 55555a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1

第二台和第三台agent编写如下:

a1.sources=r1
a1.sinks=k1
a1.channels=c1a1.sources.r1.type=avro
a1.sources.r1.bind=11.147.251.96
a1.sources.r1.port=55555a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=loggera1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

从后往前分别启动三台agent

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./avro.agent -Dflume.root.logger=INFO,console

测试给第一台flume发送数据,由于第三台节点的优先级高,所以第三台会打印数据到控制台

如果此时第三台flume宕机,则会将数据发送到优先级略低的第二台服务器上

2. Load balancing Sink Processor

负载平衡处理器提供了在多个sink负载平衡流量的能力。支持两种模式:round_robin and random 。round_robin 可以将数据负载均衡到多个sink上,random支持随机分发到不同的sink上。

需要修改第一台服务器agent

a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-1
a1.sinks.k1.port = 55555a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-2
a1.sinks.k1.port = 55555a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = randoma1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1

第二台和第三台agent编写如下:

a1.sources=r1
a1.sinks=k1
a1.channels=c1a1.sources.r1.type=avro
a1.sources.r1.bind=11.147.251.96
a1.sources.r1.port=55555a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=loggera1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

从后往前分别启动三台agent

flume-ng agent -n a1 -c /usr/local/flume/conf/ -f ./avro.agent -Dflume.root.logger=INFO,console

测试给第一台flume发送数据,第二台和第一台会随机收集数据

还支持轮询分发数据到两个sink中,这里的轮询是的是sink的轮询,不是event的轮询。

需要修改第一台服务器agent

a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-1
a1.sinks.k1.port = 55555a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-2
a1.sinks.k1.port = 55555a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robina1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c1

3. Multiplexing Channel Selector

多路复用信道选择器,source是通过 event header 来决定传输到哪一个 channel。

比如:一个日志文件(多个系统的日志都在该文件中),根据日志中某个字段值,比如type=1,是系统A日志,sink to hdfs;type=2,是系统B日志,sink to kafka,此时就可以使用Flume多路复用,通过event header 来决定传输到哪个Channel

a1.sources=r1
a1.sinks=k1 k2 
a1.channels=c1  c2a1.sources.r1.type=http
a1.sources.r1.bind=worker-1
a1.sources.r1.port=44444a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.1= c1
a1.sources.r1.selector.mapping.2 = c2
a1.sources.r1.selector.default = c2a1.channels.c1.type=memory
a1.channels.c1.capacity=100000
a1.channels.c1.transactionCapacity=100a1.channels.c2.type=memory
a1.channels.c2.capacity=100000
a1.channels.c2.transactionCapacity=100a1.sinks.k1.type=avro
a1.sinks.k1.hostname = worke-1
a1.sinks.k1.port = 55555a1.sinks.k2.type=avro
a1.sinks.k2.hostname = worke-2
a1.sinks.k2.port = 55555a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

测试:

通过http协议并携带type头信息,测试type=1,type=2,type=3去往哪一台服务器

第二台服务器接收:

4. Interceptor拦截器

拦截器可以将flume收集到的event进行拦截,并使用对应的拦截器,对event进行简单修改,过滤。同时可以配置多个拦截器实现不同的功能,按照配置的先后顺序进行拦截处理。

常见的 Interceptor描述
timestamp Interceptor给event的头信息中添加时间戳
Static Interceptor给event的头信息中添加自定义键值
Host Interceptor给event的头信息中添加主机名或者ip信息
Search and Replace Interceptor拦截信息进行匹配和替换
Regex Filtering Interceptor拦截信息进行过滤

5. Timestamp Interceptor

此拦截器将插入事件标头,即它处理事件的时间(毫秒)到event中。

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.142.160
a1.sources.r1.port = 22222
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100#定义sink
a1.sinks.k1.type=logger#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

6. Host Interceptor

此拦截器器插入主机的主机名或IP地址。插入带有key为host标头,值是主机的主机名称或IP地址。

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.142.160
a1.sources.r1.port = 22222
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = host#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100#定义sink
a1.sinks.k1.type=logger#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

7. Static Interceptor

静态拦截器允许用户将带有静态值的静态标头附加到所有事件。

当前实现不允许同时指定多个标头。相反,用户可以使用多个静态拦截器,每个拦截器定义一个静态标头。

# Static Interceptor
#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=55555a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = name
a1.sources.r1.interceptors.i1.value = zhangsan#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100#定义sink
a1.sinks.k1.type=logger#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

8. Search and Replace Interceptor

这个拦截器基于Java正则表达式提供了简单的基于字符串的搜索和替换功能

# Search and Replace Interceptor
#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=55555a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type = search_replace
a1.sources.r1.interceptors.i1.searchPattern = [a-z]
a1.sources.r1.interceptors.i1.replaceString =*#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100#定义sink
a1.sinks.k1.type=logger#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

9. Regex Filtering Interceptor

该拦截器通过将event解释为文本并将文本与配置的正则表达式匹配来选择性地过滤事件。提供的正则表达式可用于包含事件或排除事件。

# Regex Filtering Interceptor
#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=55555
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=regex_filter
a1.sources.r1.interceptors.i1.regex=^jp.*
a1.sources.r1.interceptors.i1.excludeEvents=true#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100#定义sink
a1.sinks.k1.type=logger#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

10. Regex Extractor Interceptor

此拦截器使用指定的正则表达式提取正则表达式匹配组,并将匹配组作为标头附加到事件上。

#给agent组件起名
a1.sources=r1
a1.sinks=k1
a1.channels=c1#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=worker-1
a1.sources.r1.port=55555
a1.sources.r1.interceptors = i1  
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = (^[a-zA-Z]*)\\s([0-9]*$)
a1.sources.r1.interceptors.i1.serializers = s1 s2
# key name
a1.sources.r1.interceptors.i1.serializers.s1.name = word
a1.sources.r1.interceptors.i1.serializers.s2.name = digital 
#定义channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000000
a1.channels.c1.transactionCapacity=100#定义sink
a1.sinks.k1.type=logger#绑定
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

测试:

收到:


http://www.ppmy.cn/embedded/139523.html

相关文章

word转pdf保存高清图技巧

1. PPT编辑的图片转矢量图&#xff0c;导入word 2.PPT的图全选复制到visio&#xff0c;再从visio复制到word 3.使用adboe PDF word插件&#xff0c;另存为ADOBE PDF。 图片保存质量 另存为DPF< 导入PDF < 另存为ADOBE PDF 但是ADOBE PDF有时候图片保存出来会一片空白…

uni-app获取安全区域

2024年8月2日 使用自定义导航栏的uni-app项目在真机演示时&#xff0c;可能会出现页面内容被手机的刘海或者状态栏给遮挡了&#xff0c;或者在小程序上给胶囊按钮给档住了。这时候就需要获取刘海屏状态了和胶囊按钮的高度来获取安全渲染区域. 1. CSS内置变量 来自uni-aap官网&…

32.2 prometheus倒排索引统计功能

本节重点介绍 : 获取采集端的高基数metrics的tsdb页面解析tsdb统计函数Stats源码解读 依赖倒排索引统计 获取采集端的高基数metrics tsdb页面解析 Top 10 label names with value count&#xff1a; 标签中value最多的10个Top 10 series count by metric names&#xff1a…

如何提高代理IP的并发能力

在数据采集的领域&#xff0c;时间就是金钱&#xff0c;效率就是生命。代理IP的并发能力直接关系到我们能否快速、高效地获取数据。那么&#xff0c;如何提高代理IP的并发能力呢&#xff1f;今天&#xff0c;我们就来聊聊这个话题&#xff0c;为你的数据采集项目提速。 并发&a…

Vue项目开发 element-UI 前端实现 1到10排列选择的按钮

在 Element UI 中&#xff0c;你可以通过 el-button 来实现按钮的排列选择&#xff0c;例如让用户选择 1 到 10 之间的数字。为了实现这一功能&#xff0c;我们可以使用 v-for 来动态生成 1 到 10 的按钮&#xff0c;并通过按钮点击事件来更新所选的数字。 以下是一个基本的实…

【高等数学学习记录】隐函数及由参数方程所确定的函数的导数、相关变化率

一、知识点 &#xff08;一&#xff09;隐函数的导数 显函数 对于形如 y s i n x ysinx ysinx 这种等号左端是因变量&#xff0c;右端是含有自变量的式子&#xff0c;当自变量取定义域内任一值时&#xff0c;由这个式子能确定对应的函数值&#xff0c;这种方式表达的函数叫做…

FPGA上板项目(六)——UART测试,串口收发

目录 实验内容串口接收模块模块框图时序波形仿真结果 顶层模块设计模块框图时序波形代码调整仿真结果上板测试 实验内容 将接收到的数据发送出去&#xff0c;实现串口回环。 串口发送的内容 FPGA上板项目&#xff08;五&#xff09;——UART测试&#xff0c;串口发送 已做过阐…

力扣 LeetCode 111. 二叉树的最小深度(Day7:二叉树)

解题思路&#xff1a; 用后序遍历 题目要求的最小深度为根节点到叶子节点的最小深度&#xff0c;注意是到根节点&#xff0c;所以如图所示假设&#xff08;没有9这个节点&#xff09;是需要返回3的&#xff0c;而不是1&#xff08;根节点左子树为空的情况&#xff09;&#x…