Flume环境搭建和简介

news/2024/11/30 2:52:31/

这里写目录标题

  • 一、Flume环境搭建
    • 1.前提准备
    • 2.搭建
  • 二、Flume介绍
    • 1.简介
    • 2.Flume NG介绍
      • Flume特点
      • Flume的核心概念
    • 3.Flume简单实用示例
      • 安装netcat和telnet
      • netcat 数据源测试 Flume
      • Exec数据源测试Flume
      • Spooling Directory Source
      • Spooling Directory Source数据源测试Flume,并上传至HDFS
      • Spooling Directory Source数据源带有过滤器

一、Flume环境搭建

1.前提准备

  • hadoop环境搭建完成
    可参考:Centos7中安装配置Hadoop(伪分布式搭建)
  • 安装包
    apache-flume-1.6.0-bin.tar.gz ,需要自取:Flume安装包(提取码:6z6z)

2.搭建

  • 将压缩包上传至/opt下
  • 解压压缩包
    tar -zxvf flume-ng-1.6.0-cdh5.14.0.tar.gz
  • 修改名称
    mv apache-flume-1.6.0-cdh5.14.0-bin/ flume160
  • 修改配置
    ①进入conf目录:cd /opt/flume160/conf
    ②复制flume-env.sh.template --> flume-env.sh:cp ./flume-env.sh.template flume-env.sh
    ③修改 flume-env.sh
    vi /opt/flume160/conf/ flume-env.sh
    修改JAVA_HOME:export JAVA_HOME=/opt/jdk1.8.0_221
    调整内存大小:export JAVA_OPTS="-Xms2048m -Xmx2048m -Dcom.sun.management.jmxremote"

二、Flume介绍

1.简介

Flume 是 cloudera 开发的实时日志收集系统,与之相似的还有facebook的scribe,apache的chukwa,linkedin的kafka等。
本次将对Flume进行介绍。Flume 初始的发行版本目前被统称为 Flume OG(original generation),但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等缺点暴露出来。为了解决这些问题,cloudera 对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本统称为 Flume NG(next generation),并纳入了apache旗下。
Flume 在0.9.x and 1.x之间有较大的架构调整,1.x版本之后的改称Flume NG,0.9.x的称为Flume OG。

2.Flume NG介绍

Flume特点

Flume的数据流由事件(Event)贯穿始终。事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。可以把Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source
在这里插入图片描述

  • Flume的可靠性
    当节点出现故障时,日志能够被传送到其他节点上而不会丢失
    Flume提供了三种级别的可靠性保障,从强到弱依次分别为:end-to-end、Store on failure、Besteffort
    ①end-to-end:收到数据agent首先将event写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。
    ②Store on failure:这也是scribe采用的策略,当数据接收方crash时,将数据写到本地,待恢复后,继续发送。
    ③Besteffort:数据发送到接收方后,不会进行确认。

  • Flume的可恢复性
    依靠Channel,Flume中的数据可以恢复。推荐使用FileChannel,事件持久化在本地文件系统里(性能较差)。

Flume的核心概念

  • Client:Client生产数据,运行在一个独立的线程
  • Event: 一个数据单元,消息头和消息体组成。Events可以是日志记录、 avro 对象等。
  • Flow: Event从源点到达目的点的迁移的抽象。
  • Agent: 一个独立的Flume进程,包含组件Source、 Channel、 Sink。Agent使用JVM运行Flume,每台机器运行一个agent,但是可以在一个agent中包含多个sources和sinks。
  • Source: 数据收集组件。Flume提供了各种source的实现,包括Avro Source、Exec Source、Spooling Directory Source、NetCat Source、Syslog Source、Syslog TCP Source、Syslog UDP Source、HTTP Source、HDFS Source,etc。如果内置的Source无法满足需要, Flume还支持自定义Source
  • Channel: 中转Event的一个临时存储,保存由Source组件传递过来的Event
  • Sink: 从Channel中读取并移除Event,也可以将Event传递到FlowPipeline中的下一个Agent。Flume也提供了各种sink的实现,包括HDFS sink、Logger sink、Avro sink、File Roll sink、Null sink、HBase sink,etc。

3.Flume简单实用示例

安装netcat和telnet

  • netcate
    1.监听服务器端口,并与客户端通信(最多只能接收一个客户端)
    2.对指定服务器进行端口扫描
    3.作为客户端连接到远程服务器进行通信
  • telnet
    1.连接服务器端口,并进行通信
    2.登录远程telnet服务器,使用命令行对其进行控制
  • 安装命令
    yum install -y nc
    yum install telnet-server.x86_64
    yum install telnet.x86_64
  • 测试
    开启两个窗口
    一个充当server端,输入:nc -lk 7777
    一个充当client端,输入:telnet localhost 7777
    在客户端发送消息,服务端可以接收到,则成功
    在这里插入图片描述

netcat 数据源测试 Flume

这个source十分像nc -k -l [host] [port]这个命令,监听一个指定的端口,把从该端口收到的TCP协议的文本数据按行转换为Event,它能识别的是带换行符的文本数据,同其他Source一样,解析成功的Event数据会发送到channel中

  • 在Flume的conf目录下新建一个文件夹,用于存放配置文件,我创建的为jobFile
    mkdir /opt/flume160/conf/jobFile
  • 在jobFile中新建 netcat-flume-loggere.conf
    vi /opt/flume160/conf/jobFile/netcat-flume-logger.conf
# 配置Agent a1各个组件的名称
a1.sources=r1    #Agent a1 的source有一个,叫做r1
a1.channels=c1   #Agent a1 的channel也有一个,叫做c1
a1.sinks=k1		 #Agent a1 的sink有一个,叫做k1# 配置Agent a1的source r1的属性
a1.sources.r1.type=netcat #使用的是NetCat TCP Source,这个的是别名,Flume内置的一些组件都是有别名的,没有别名填全限定类名
a1.sources.r1.bind=localhost  #NetCat TCP Source监听的hostname,这个是本机
a1.sources.r1.port=7777  #监听的端口# 配置Agent a1的sink k1的属性
a1.sinks.k1.type=logger # sink使用的是Logger Sink,这个配的也是别名# 配置Agent a1的channel c1的属性,channel是用来缓冲Event数据的
a1.channels.c1.type=memory #channel的类型是内存channel
a1.channels.c1.capacity=1000 #缓冲数据容量
a1.channels.c1.transactionCapacity=1000 #事务容量# 把source和sink绑定到channel上
a1.sources.r1.channels=c1  #与source r1绑定的channel有一个,叫做c1
a1.sinks.k1.channel=c1 #与sink k1绑定的channel有一个,叫做c1
  • 加载这个配置文件启动Flume
    bin目录下的flume-ng是Flume的启动脚本,启动时需要指定Agent的名字、配置文件的目录和配置文件的名称。
    进入flume目录执行以下语句:
// --conf/-c:表示配置文件存储在conf/目录
// --name/-n:表示给agent起名为a1,需要与自定义conf文件中的一致
// --conf-file/-f:flume本次启动读取的配置文件是在job文件夹下的flume-telnet.conf文件
// -Dflume.root.logger=INFO,console:-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error
./bin/flume-ng agent --name a1 --conf ./conf/ --conf-file ./conf/jobFile/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
  • 打开另一个窗口,开启客户端
    telnet localhost 7777
  • 测试
    在这里插入图片描述

Exec数据源测试Flume

这个source在启动时运行给定的Unix命令,并期望该进程在标准输出上连续生成数据。 如果进程因任何原因退出, 则source也会退出并且不会继续生成数据。 综上来看cat [named pipe]或tail -F [file]这两个命令符合要求可以产生所需的结果

  • 在jobFile中新建 file-flume-logger.conf
    vi /opt/flume160/conf/jobFile/file-flume-logger.conf
a2.sources=r1
a2.channels=c1
a2.sinks=k1
// 指定source的type为exec类型
a2.sources.r1.type=exec
// tail -f会把文件里的最尾部的内容显示在屏幕上,并且不断刷新
a2.sources.r1.command=tail -f /opt/bigdata/flume160/conf/jobFile/tmp/tmp.txta2.channels.c1.type=memory
a2.channels.c1.capacity=1000
a2.channels.c1.transactionCapacity=1000a2.sinks.k1.type=loggera2.sources.r1.channels=c1
a2.sinks.k1.channel=c1
  • 加载配置文件启动Flume
 ./bin/flume-ng agent --name a2 --conf ./conf/ --conf-file ./conf/jobFile/file-flume-logger.conf -Dflume.root.logger=INFO,console

在这里插入图片描述

Spooling Directory Source

这个Source允许你把要收集的文件放入磁盘上的某个指定目录。它会将监视这个目录中产生的新文件,并在新文件出现时从新文件中解析数据出来。数据解析逻辑是可配置的。在新文件被完全读入Channel之后会重命名该文件以示完成(1.txt被Flume收集完成后会重命名为1.txt.COMPLETED)。与Exec Source不同,Spooling Directory Source是可靠的,即使Flume重新启动或被kill,也不会丢失数据。同时作为这种可靠性的代价,指定目录中的文件必须是不可变的、唯一命名的。

  • 在jobFile中新建events-flume-logger.conf
    `vi /opt/flume160/conf/jobFile/events-flume-logger.conf
events.sources=eventsSource
events.channels=eventsChannel
events.sinks=eventsSinkevents.sources.eventsSource.type=spooldir
events.sources.eventsSource.spoolDir=/opt/bigdata/flume160/conf/jobFile/dataSourceFile/events
events.sources.eventsSource.deserializer=LINE
events.sources.eventsSource.deserializer.maxLineLength=10000
// 指定会被收集的文件名正则表达式,此处格式必须满足如:events_2020-12-03.csv形式
events.sources.eventsSource.includePattern=events_[0-9]{4}-[0-9]{2}-[0-9]{2}.csvevents.channels.eventsChannel.type=file
// 记录检查点的文件的存储目录
events.channels.eventsChannel.checkpointDir=/opt/bigdata/flume160/conf/jobFile/checkPointFile/events
用于存储data文件
events.channels.eventsChannel.dataDirs=/opt/bigdata/flume160/conf/jobFile/dataChannelFile/eventsevents.sinks.eventsSink.type=loggerevents.sources.eventsSource.channels=eventsChannel
events.sinks.eventsSink.channel=eventsChannel
  • 加载配置文件启动Flume
 ./bin/flume-ng agent --name events --conf ./conf/ --conf-file ./conf/jobFile/events-flume-logger.conf -Dflume.root.logger=INFO,console
  • 将符合要求的文件名称的文件传入dataSource/events下
cp ./events.csv /opt/bigdata/flume160/conf/jobkb09/dataSourceFile/events/events_2020-11-30.csv

Spooling Directory Source数据源测试Flume,并上传至HDFS

  • 在jobFile中新建events-flume-logger2.conf
    `vi /opt/flume160/conf/jobFile/events-flume-logger2.conf
events.sources=eSource
events.channels=eChannel
events.sinks=eSinkevents.sources.eSource.type=spooldir
events.sources.eSource.spoolDir=/opt/flumevents60/conf/jobFile/dataSourceFile/events2
events.sources.eSource.deserializer=LINE
events.sources.eSource.deserializer.maxLineLength=10000
events.sources.eSource.includePattern=es_[0-9]{4}-[0-9]{2}-[0-9]{2}.csvevents.channels.eChannel.type=file
events.channels.eChannel.checkpointDir=/opt/flumevents60/conf/jobFile/checkPointFile/events2
events.channels.eChannel.dataDirs=/opt/flumevents60/conf/jobFile/dataChannelFile/events2events.sinks.eSink.type=hdfs // 组件类型,这个Sink将Event写入Hadoop分布式文件系统
events.sinks.eSink.hdfs.fileType=DataStream // 文件格式
events.sinks.eSink.hdfs.filePrefix=events // Flume在HDFS文件夹下创建新文件的固定前缀
events.sinks.eSink.hdfs.fileSuffix=.csv // Flume在HDFS文件夹下创建新文件的后缀
events.sinks.eSink.hdfs.path=hdfs://192.168.233.133:9000/testfile/user/event/%Y-%m-%d  // HDFS目录路径,%Y-%m-%d为日期转义符,可根据本地时间显示日期
events.sinks.eSink.hdfs.useLocalTimeStamp=true // 使用日期时间转义符时是否使用本地时间戳
events.sinks.eSink.hdfs.batchSize=640  // 读取并向channel发送数据时单次发送的最大数量
events.sinks.eSink.hdfs.rollInterval=20 // 当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件)
events.sinks.eSink.hdfs.rollCount=0 // 当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
events.sinks.eSink.hdfs.rollSize=120000000 // 当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件)events.sources.eSource.channels=eChannel
events.sinks.eSink.channel=eChannel
  • 加载配置文件启动Flume
 ./bin/flume-ng agent --name events --conf ./conf/ --conf-file ./conf/jobFile/events-flume-logger2.conf -Dflume.root.logger=INFO,console
  • 将符合要求的文件名称的文件传入dataSource/events下
cp ./events.csv /opt/bigdata/flume160/conf/jobkb09/dataSourceFile/events2/events_2020-11-30.csv

Spooling Directory Source数据源带有过滤器

  • 在jobFile中新建user-flume-hdfs.conf
    vi user-flume-hdfs.conf
users.sources=usersSource
users.channels=usersChannel
users.sinks=usersSinkusers.sources.usersSource.type=spooldir
users.sources.usersSource.spoolDir=/opt/flume160/conf/jobFile/dataSourceFile/users
users.sources.usersSource.includePattern=users_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
users.sources.usersSource.deserializer=LINE
users.sources.usersSource.deserializer.maxLineLength=10000
users.sources.usersSource.interceptors=head_filter // 定义过滤器名称
users.sources.usersSource.interceptors.head_filter.type=regex_filter // 过滤器类型,正则过滤拦截器
users.sources.usersSource.interceptors.head_filter.regex=^user_id* // 过滤拦截内容:以user_id开头的文件
users.sources.usersSource.interceptors.head_filter.excludeEvents=true 如果为true,被正则匹配到的Event会被丢弃;如果为false,不被正则匹配到的Event会被丢弃users.channels.usersChannel.type=file
users.channels.usersChannel.checkpointDir=/opt/flume160/conf/jobFile/cheakPointFile/users
users.channels.usersChannel.dataDirs=/opt/flume160/conf/jobFile/dataChannelFile/usersusers.sinks.usersSink.type=hdfs
users.sinks.usersSink.hdfs.fileType=DataStream
users.sinks.usersSink.hdfs.filePrefix=users
users.sinks.usersSink.hdfs.fileSuffix=.csv
users.sinks.usersSink.hdfs.path=hdfs://192.168.233.133:9000/testfile/users/users/%Y-%m-%d
users.sinks.usersSink.hdfs.useLocalTimeStamp=true
users.sinks.usersSink.hdfs.batchSize=640
users.sinks.usersSink.hdfs.rollCount=0
users.sinks.usersSink.hdfs.rollSize=120000000
users.sinks.usersSink.hdfs.rollInterval=20users.sources.usersSource.channels=usersChannel
users.sinks.usersSink.channel=usersChannel

更多参数和使用方法可以参照Flume使用手册:Flume使用手册(英文版)、Flume使用手册(中文版)


http://www.ppmy.cn/news/560170.html

相关文章

【验证码逆向专栏】某验二代滑块验证码逆向分析

声明 本文章中所有内容仅供学习交流,抓包内容、敏感网址、数据接口均已做脱敏处理,严禁用于商业用途和非法用途,否则由此产生的一切后果均与作者无关,若有侵权,请联系我立即删除! 本文章未经许可禁止转载…

ThreadPoolExecutor源码剖析

ThreadPoolExecutor源码涉及到的内容比较多,需要一点点的去啃和查看… ThreadPoolExecutor的核心属性 ThreadPoolExecutor的核心属性主要就是CTL。基于CTL获取到线程池的状态以及工作线程个数。 ctl是一个int类型的整数,內部基于AtomicInteger&#xff0…

Windows——Win10系统笔记本的触摸板失灵怎么办?

Win10系统用笔记本触摸板失灵怎么办?

笔记本触摸板双指失灵

华硕笔记本是驱动出现问题,找到驱动自行修复即可,如图

笔记本触摸板没反应怎么回事?笔记本触控板失灵解决办法

华硕飞行堡垒FX504GE-FX80GE 系统WIN10专业版64位 驱动貌似正常,可以尝试以下几方面检查排除故障: 1、去华硕官网下载最新的触控板驱动 2、进入bios检查触控板是否被禁用 3、按快捷键打开触控板 4、用驱动人生更新驱动 5、检查系统和触控板控制面板设置…

笔记本 触摸板无法使用 解决办法

拔掉全部外设,U盘,电源线 关机后 按住开机键20秒以上 之后在开机 我是这样解决的 来源 https://tieba.baidu.com/p/6663546687

java面试题 --- IO

1. IO流的分类有哪些? 按照流向分为输入流和输出流;按照实现功能分为节点流和处理流。节点就是从一个特定的地方读写数据,包括数组操作,管道操作和文件操作;处理流对已存在的流进行封装和处理,包括缓冲操作…

联想笔记本触摸板失灵的一个原因及解决方法

今天不小心点击了笔记本上面的飞行模式,关闭飞行模式后,发现触摸板没反应了,没有禁用、更新驱动都没用。最后将设备管理器里面**“鼠标和其他指针设备”中的“ELAN pointing device”禁用就可以了。**