F、 客户端开发,在maven中引入canal的依赖
com.alibaba.otter
canal.client
1.0.21
代码示例:
package com.example;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CanalClientExample {
public static void main(String[] args) {
while (true) {
//连接canal
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), “example”, “canal”, “canal”);
connector.connect();
//订阅 监控的 数据库.表
connector.subscribe(“demo_db.user_tab”);
//一次取10条
Message msg = connector.getWithoutAck(10);
long batchId = msg.getId();
int size = msg.getEntries().size();
if (batchId < 0 || size == 0) {
System.out.println(“没有消息,休眠5秒”);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//
CanalEntry.RowChange row = null;
for (CanalEntry.Entry entry : msg.getEntries()) {
try {
row = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List<CanalEntry.RowData> rowDatasList = row.getRowDatasList();
for (CanalEntry.RowData rowdata : rowDatasList) {
List<CanalEntry.Column> afterColumnsList = rowdata.getAfterColumnsList();
Map<String, Object> dataMap = transforListToMap(afterColumnsList);
if (row.getEventType() == CanalEntry.EventType.INSERT) {
//具体业务操作
System.out.println(dataMap);
} else if (row.getEventType() == CanalEntry.EventType.UPDATE) {
//具体业务操作
System.out.println(dataMap);
} else if (row.getEventType() == CanalEntry.EventType.DELETE) {
List<CanalEntry.Column> beforeColumnsList = rowdata.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
if (“id”.equals(column.getName())) {
//具体业务操作
System.out.println(“删除的id:” + column.getValue());
}
}
} else {
System.out.println(“其他操作类型不做处理”);
}
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
//确认消息
connector.ack(batchId);
}
}
}
public static Map<String, Object> transforListToMap(List<CanalEntry.Column> afterColumnsList) {
Map map = new HashMap();
if (afterColumnsList != null && afterColumnsList.size() > 0) {
for (CanalEntry.Column column : afterColumnsList) {
map.put(column.getName(), column.getValue());
}
}
return map;
}
}
2)、基于BulkLoad的数据同步,比如从hive同步数据到hbase
我们有两种方式可以实现,
A、 使用spark任务,通过HQl读取数据,然后再通过hbase的Api插入到hbase中
但是这种做法,效率很低,而且大批量的数据同时插入Hbase,对Hbase的性能影响很大。
在大数据量的情况下,使用BulkLoad可以快速导入,BulkLoad主要是借用了hbase的存储设计思想,因为hbase本质是存储在hdfs上的一个文件夹,然后底层是以一个个的Hfile存在的。HFile的形式存在。Hfile的路径格式一般是这样的:
/hbase/data/default(默认是这个,如果hbase的表没有指定命名空间的话,如果指定了,这个就是命名空间的名字)/<tbl_name>/<region_id>//<hfile_id>
B、 BulkLoad实现的原理就是按照HFile格式存储数据到HDFS上,生成Hfile可以使用hadoop的MapReduce来实现。如果不是hive中的数据,比如外部的数据,那么我们可以将外部的数据生成文件,然后上传到hdfs中,组装RowKey,然后将封装后的数据在回写到HDFS上,以HFile的形式存储到HDFS指定的目录中。
当然我们也可以不事先生成hfile,可以使用spark任务直接从hive中读取数据转换成RDD,然后使用HbaseContext的自动生成Hfile文件,部分关键代码如下:
//将DataFrame转换bulkload需要的RDD格式
val rddnew = datahiveDF.rdd.map(row => {
val rowKey = row.getAsString
fields.map(field => {
val fieldValue = row.getAsString
(Bytes.toBytes(rowKey), Array((Bytes.toBytes(“info”), Bytes.toBytes(field), Bytes.toBytes(fieldValue))))
})
}).flatMap(array => {
(array)
})
…
//使用HBaseContext的bulkload生成HFile文件
hbaseContext.bulkLoad[Put](rddnew.map(record => {
val put = new Put(record._1)
record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
put
}), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), “/tmp/bulkload”)
val conn = ConnectionFactory.createConnection(hBaseConf)
val hbTableName = TableName.valueOf(hBaseTempTable.getBytes())
val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn))
val realTable = conn.getTable(hbTableName)
HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator)
// bulk load start
val loader = new LoadIncrementalHFiles(hBaseConf)
val admin = conn.getAdmin()
loader.doBulkLoad(new Path(“/tmp/bulkload”),admin,realTable,regionLocator)
sc.stop()
}
…
def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()
import scala.collection.JavaConversions._
for (cells <- put.getFamilyCellMap.entrySet().iterator()) {
val family = cells.getKey
for (value <- cells.getValue) {
val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))
ret.+=((kfq, CellUtil.cloneValue(value)))
}
}
ret.iterator
}
}
C、pg_bulkload的使用
这是一个支持pg库(PostgreSQL)批量导入的插件工具,它的思想也是通过外部文件加载的方式,这个工具笔者没有亲自去用过,详细的介绍可以参考:https://my.oschina.net/u/3317105/blog/852785 pg_bulkload项目的地址:http://pgfoundry.org/projects/pgbulkload/
3)、基于sqoop的全量导入
Sqoop 是hadoop生态中的一个工具,专门用于外部数据导入进入到hdfs中,外部数据导出时,支持很多常见的关系型数据库,也是在大数据中常用的一个数据导出导入的交换工具。
Sqoop从外部导入数据的流程图如下:
Sqoop将hdfs中的数据导出的流程如下:
本质都是用了大数据的数据分布式处理来快速的导入和导出数据。
4)、HBase中建表,然后Hive中建一个外部表,这样当Hive中写入数据后,HBase中也会同时更新,但是需要注意
A、hbase中的空cell在hive中会补null
B、hive和hbase中不匹配的字段会补null
我们可以在hbase的shell 交互模式下,创建一张hbse表
create ‘bokeyuan’,‘zhangyongqing’
使用这个命令,我们可以创建一张叫bokeyuan的表,并且里面有一个列族zhangyongqing,hbase创建表时,可以不用指定字段,但是需要指定表名以及列族
我们可以使用的hbase的put命令插入一些数据
put ‘bokeyuan’,‘001’,‘zhangyongqing:name’,‘robot’
put ‘bokeyuan’,‘001’,‘zhangyongqing:age’,‘20’
put ‘bokeyuan’,‘002’,‘zhangyongqing:name’,‘spring’
put ‘bokeyuan’,‘002’,‘zhangyongqing:age’,‘18’
可以通过hbase的scan 全表扫描的方式查看我们插入的数据
scan ’ bokeyuan’
我们继续创建一张hive外部表
create external table bokeyuan (id int, name string, age int)
STORED BY ‘org.apache.hadoop.hive.hbase.HBaseStorageHandler’
WITH SERDEPROPERTIES (“hbase.columns.mapping” = “:key,user:name,user:age”)
TBLPROPERTIES(“hbase.table.name” = " bokeyuan");
外部表创建好了后,我们可以使用HQL语句来查询hive中的数据了
select * from classes;
OK
1 robot 20
2 spring 18
5)、Debezium+bireme:Debezium for PostgreSQL to Kafka Debezium也是一个通过监控数据库的日志变化,通过对行级日志的处理来达到数据同步,而且Debezium 可以通过把数据放入到kafka,这样就可以通过消费kafka的数据来达到数据同步的目的。而且还可以给多个地方进行消费使用。
Debezium是一个开源项目,为捕获数据更改(change data capture,CDC)提供了一个低延迟的流式处理平台。你可以安装并且配置Debezium去监控你的数据库,然后你的应用就可以消费对数据库的每一个行级别(row-level)的更改。只有已提交的更改才是可见的,所以你的应用不用担心事务(transaction)或者更改被回滚(roll back)。Debezium为所有的数据库更改事件提供了一个统一的模型,所以你的应用不用担心每一种数据库管理系统的错综复杂性。另外,由于Debezium用持久化的、有副本备份的日志来记录数据库数据变化的历史,因此,你的应用可以随时停止再重启,而不会错过它停止运行时发生的事件,保证了所有的事件都能被正确地、完全地处理掉。
该项目的GitHub地址为:https://github.com/debezium/debezium 这是一个开源的项目。
本来监控数据库,并且在数据变动的时候获得通知其实一直是一件很复杂的事情。关系型数据库的触发器可以做到,但是只对特定的数据库有效,而且通常只能更新数据库内的状态(无法和外部的进程通信)。一些数据库提供了监控数据变动的API或者框架,但是没有一个标准,每种数据库的实现方式都是不同的,并且需要大量特定的知识和理解特定的代码才能运用。确保以相同的顺序查看和处理所有更改,同时最小化影响数据库仍然非常具有挑战性。
Debezium正好提供了模块为你做这些复杂的工作。一些模块是通用的,并且能够适用多种数据库管理系统,但在功能和性能方面仍有一些限制。另一些模块是为特定的数据库管理系统定制的,所以他们通常可以更多地利用数据库系统本身的特性来提供更多功能,Debezium提供了对MongoDB,mysql,pg,sqlserver的支持。
Debezium是一个捕获数据更改(CDC)平台,并且利用Kafka和Kafka Connect实现了自己的持久性、可靠性和容错性。每一个部署在Kafka Connect分布式的、可扩展的、容错性的服务中的connector监控一个上游数据库服务器,捕获所有的数据库更改,然后记录到一个或者多个Kafka topic(通常一个数据库表对应一个kafka topic)。Kafka确保所有这些数据更改事件都能够多副本并且总体上有序(Kafka只能保证一个topic的单个分区内有序),这样,更多的客户端可以独立消费同样的数据更改
事件而对上游数据库系统造成的影响降到很小(如果N个应用都直接去监控数据库更改,对数据库的压力为N,而用debezium汇报数据库更改事件到kafka,所有的应用都去消费kafka中的消息,可以把对数据库的压力降到1)。另外,客户端可以随时停止消费,然后重启,从上次停止消费的地方接着消费。每个客户端可以自行决定他们是否需要exactly-once或者at-least-once消息交付语义保证,并且所有的数据库或者表的更改事件是按照上游数据库发生的顺序被交付的。
对于不需要或者不想要这种容错级别、性能、可扩展性、可靠性的应用,他们可以使用内嵌的Debezium connector引擎来直接在应用内部运行connector。这种应用仍需要消费数据库更改事件,但更希望connector直接传递给它,而不是持久化到Kafka里。
更详细的介绍可以参考:https://www.jianshu.com/p/f86219b1ab98
bireme 的github 地址 https://github.com/HashDataInc/bireme
bireme 的介绍:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md
另外Maxwell也是可以实现MySQL到Kafka的消息中间件,消息格式采用Json:
Download:
https://github.com/zendesk/maxwell/releases/download/v1.22.5/maxwell-1.22.5.tar.gz
Source:
https://github.com/zendesk/maxwell
6)、datax
datax 是阿里开源的etl 工具,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能,采用java+python进行开发,核心是java语言实现。
github地址:https://github.com/alibaba/DataX
A、设计架构:
数据交换通过DataX进行中转,任何数据源只要和DataX连接上即可以和已实现的任意数据源同步
B、框架
核心模块介绍:
-
DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
-
DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
-
切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
-
每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
-
DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
DataX调度流程:
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。 DataX的调度决策思路是:
-
DataXJob根据分库分表切分成了100个Task。
-
根据20个并发,DataX计算共需要分配4个TaskGroup。
-
4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
优势:
-
每种插件都有自己的数据转换策略,放置数据失真;
-
提供作业全链路的流量以及数据量运行时监控,包括作业本身状态、数据流量、数据速度、执行进度等。
-
由于各种原因导致传输报错的脏数据,DataX可以实现精确的过滤、识别、采集、展示,为用户提过多种脏数据处理模式;
-
精确的速度控制
-
健壮的容错机制,包括线程内部重试、线程级别重试;
从插件视角看框架
-
Job:是DataX用来描述从一个源头到目的的同步作业,是DataX数据同步的最小业务单元;
-
Task:为最大化而把Job拆分得到最小的执行单元,进行并发执行;
-
TaskGroup:一组Task集合,在同一个TaskGroupContainer执行下的Task集合称为TaskGroup;
-
JobContainer:Job执行器,负责Job全局拆分、调度、前置语句和后置语句等工作的工作单元。类似Yarn中的JobTracker;
-
TaskGroupContainer:TaskGroup执行器,负责执行一组Task的工作单元,类似Yarn中的TAskTacker。
总之,Job拆分为Task,分别在框架提供的容器中执行,插件只需要实现Job和Task两部分逻辑。
物理执行有三种运行模式:
-
Standalone:单进程运行,没有外部依赖;
-
Local:单进程运行,统计信息,错误信息汇报到集中存储;
-
Distrubuted:分布式多线程运行,依赖DataX Service服务;
总体来说,当JobContainer和TaskGroupContainer运行在同一个进程内的时候就是单机模式,在不同进程执行就是分布式模式。
如果需要开发插件,可以看zhege这个插件开发指南: https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md
数据源支持情况:
| 类型 | 数据源 | Reader(读) | Writer(写) | 文档 |
| — | — | — | — | — |
| RDBMS 关系型数据库 | MySQL | √ | √ | 读 、写 |
| | Oracle | √ | √ | 读 、写 |
| | SQLServer | √ | √ | 读 、写 |
| | PostgreSQL | √ | √ | 读 、写 |
| | DRDS | √ | √ | 读 、写 |
| | 通用RDBMS(支持所有关系型数据库) | √ | √ | 读 、写 |
| 阿里云数仓数据存储 | ODPS | √ | √ | 读 、写 |
| | ADS | | √ | 写 |
| | OSS | √ | √ | 读 、写 |
| | OCS | √ | √ | 读 、写 |
| NoSQL数据存储 | OTS | √ | √ | 读 、写 |
| | Hbase0.94 | √ | √ | 读 、写 |
| | Hbase1.1 | √ | √ | 读 、写 |
| | Phoenix4.x | √ | √ | 读 、写 |
| | Phoenix5.x | √ | √ | 读 、写 |
| | MongoDB | √ | √ | 读 、写 |
| | Hive | √ | √ | 读 、写 |
| 无结构化数据存储 | TxtFile | √ | √ | 读 、写 |
| | FTP | √ | √ | 读 、写 |
| | HDFS | √ | √ | 读 、写 |
| | Elasticsearch | | √ | 写 |
| 时间序列数据库 | OpenTSDB | √ | | 读 |
| | TSDB | | √ | 写 |
7)、OGG
OGG 一般主要用于Oracle数据库。即Oracle GoldenGate是Oracle的同步工具 ,可以实现两个Oracle数据库之间的数据的同步,也可以实现Oracle数据同步到Kafka,相关的配置操作可以参考如下:
https://blog.csdn.net/dkl12/article/details/80447154
https://www.jianshu.com/p/446ed2f267fa
http://blog.itpub.net/15412087/viewspace-2154644/
8)、databus
Databus是一个实时的、可靠的、支持事务的、保持一致性的数据变更抓取系统。 2011年在LinkedIn正式进入生产系统,2013年开源。
Databus通过挖掘数据库日志的方式,将数据库变更实时、可靠的从数据库拉取出来,业务可以通过定制化client实时获取变更。
Databus的传输层端到端延迟是微秒级的,每台服务器每秒可以处理数千次数据吞吐变更事件,同时还支持无限回溯能力和丰富的变更订阅功能。
github:https://github.com/linkedin/databus
databus架构设计:
-
来源独立:Databus支持多种数据来源的变更抓取,包括Oracle和MySQL。
-
可扩展、高度可用:Databus能扩展到支持数千消费者和事务数据来源,同时保持高度可用性。
-
事务按序提交:Databus能保持来源数据库中的事务完整性,并按照事务分组和来源的提交顺寻交付变更事件。
-
低延迟、支持多种订阅机制:数据源变更完成后,Databus能在微秒级内将事务提交给消费者。同时,消费者使用Databus中的服务器端过滤功能,可以只获取自己需要的特定数据。
-
无限回溯:这是Databus最具创新性的组件之一,对消费者支持无限回溯能力。当消费者需要产生数据的完整拷贝时(比如新的搜索索引),它不会对数据库产生任何额外负担,就可以达成目的。当消费者的数据大大落后于来源数据库时,也可以使用该功能。
Databus Relay中继的功能主要包括:
-
从Databus来源读取变更行,并在内存缓存内将其序列化为Databus变更事件
-
监听来自Databus客户端(包括Bootstrap Producer)的请求,并传输新的Databus数据变更事件
Databus客户端的功能主要包括:
-
检查Relay上新的数据变更事件,并执行特定业务逻辑的回调
-
如果落后Relay太多,向Bootstrap Server发起查询
-
新Databus客户端会向Bootstrap Server发起bootstrap启动查询,然后切换到向中继发起查询,以完成最新的数据变更事件
-
单一客户端可以处理整个Databus数据流,或者可以成为消费者集群的一部分,其中每个消费者只处理一部分流数据
Databus Bootstrap Producer的功能有:
-
检查中继上的新数据变更事件
-
将变更存储在MySQL数据库中
-
MySQL数据库供Bootstrap和客户端使用
Databus Bootstrap Server的主要功能,监听来自Databus客户端的请求,并返回长期回溯数据变更事件。
更多可以参考 databus社区wiki主页:https://github.com/linkedin/Databus/wiki
Databus和canal的功能对比:
|
对比项
|
|
Databus
|
canal
|
结论
|
| — | — | — | — | — |
|
支持的数据库
|
|
mysql, oracle
|
mysql(据说内部版本支持oracle)
|
Databus目前支持的数据源更多
|
|
业务开发
|
|
业务只需要实现事件处理接口
|
事件处理外,需要处理ack/rollback,
反序列化异常等
|
Databus开发接口用户友好度更高
|
|
服务模型
|
relay
|
relay可以同时服务多个client
|
一个server instance只能服务一个client
(受限于server端保存拉取位点)
|
Databus服务模式更灵活
|
|
|
client
|
client可以拉取多个relay的变更,
访问的relay可以指定拉取某些表某些分片的变更
|
client只能从一个server拉取变更,
而且只能是拉取全量的变更
|
|
可扩展性
|
|
client可以线性扩展,处理能力也能线性扩展
(Databus可识别pk,自动做数据分片)
|
client无法扩展
|
Databus扩展性更好
|
|
可用性
|
client ha
|
client支持cluster模式,每个client处理一部分数据,
某个client挂掉,其他client自动接管对应分片数据
|
主备client模式,主client消费,
如果主client挂掉,备client可自动接管
|
Databus实时热备方案更成熟
|
|
|
relay/server ha
|
多个relay可连接到同一个数据库,
client可以配置多个relay,relay故障启动切换
|
主备relay模式,relay通过zk进行failover
|
canal主备模式对数据库影响更小
|
|
|
故障对上游
数据库的影响
|
client故障,bootstrap会继续拉取变更,
client恢复后直接从bootstrap拉取历史变更
|
client故障会阻塞server拉取变更,
client恢复会导致server瞬时从数据库拉取大量变更
|
Databus本身的故障对数据库影响几乎为0
|
|
系统状态监控
|
|
程序通过http接口将运行状态暴露给外部
|
暂无
|
Databus程序可监控性更好
|
|
开发语言
|
|
java,核心代码16w,测试代码6w
|
java,4.2w核心代码,6k测试代码
|
Databus项目更成熟,当然学习成本也更大
|
9)、gobblin
Gobblin是用来整合各种数据源的通用型ETL框架,在某种意义上,各种数据都可以在这里“一站式”的解决ETL整个过程,专为大数据采集而生,易于操作和监控,提供流式抽取支持。主要用于Kafka的数据同步到HDFS。
该框架来源于kafka的东家LinkedIn。大体的架构如下:
Gobblin的功能真的是非常的全。底层支持三种部署方式,分别是standalone,mapreduce,mapreduce on yarn。可以方便快捷的与Hadoop进行集成,上层有运行时任务调度和状态管理层,可以与Oozie,Azkaban进行整合,同时也支持使用Quartz来调度(standalone模式默认使用Quartz进行调度)。对于失败的任务还拥有多种级别的重试机制,可以充分满足我们的需求。再上层呢就是由6大组件组成的执行单元了。这6大组件的设计也正是Gobblin高度可扩展的原因。
Gobblin组件
Gobblin提供了6个不同的组件接口,因此易于扩展并进行定制化开发。分别是:
-
source
-
extractor
-
convertor
-
quality checker
-
writer
-
publisher
Source主要负责将源数据整合到一系列workunits中,并指出对应的extractor是什么。这有点类似于Hadoop的InputFormat。
Extractor则通过workunit指定数据源的信息,例如kafka,指出topic中每个partition的起始offset,用于本次抽取使用。Gobblin使用了watermark的概念,记录每次抽取的数据的起始位置信息。
Converter顾名思义是转换器的意思,即对抽取的数据进行一些过滤、转换操作,例如将byte arrays 或者JSON格式的数据转换为需要输出的格式。转换操作也可以将一条数据映射成0条或多条数据(类似于flatmap操作)。
Quality Checker即质量检测器,有2中类型的checker:record-level和task-level的策略。通过手动策略或可选的策略,将被check的数据输出到外部文件或者给出warning。
Writer就是把导出的数据写出,但是这里并不是直接写出到output file,而是写到一个缓冲路径( staging directory)中。当所有的数据被写完后,才写到输出路径以便被publisher发布。Sink的路径可以包括HDFS或者kafka或者S3中,而格式可以是Avro,Parquet,或者CSV格式。同时Writer也可是根据时间戳,将输出的文件输出到按照“小时”或者“天”命名的目录中。
Publisher就是根据writer写出的路径,将数据输出到最终的路径。同时其提供2种提交机制:完全提交和部分提交;如果是完全提交,则需要等到task成功后才pub,如果是部分提交模式,则当task失败时,有部分在staging directory的数据已经被pub到输出路径了。
Gobblin执行流程
以方便快捷的与Hadoop进行集成,上层有运行时任务调度和状态管理层,可以与Oozie,Azkaban进行整合,同时也支持使用Quartz来调度(standalone模式默认使用Quartz进行调度)。对于失败的任务还拥有多种级别的重试机制,可以充分满足我们的需求。再上层呢就是由6大组件组成的执行单元了。这6大组件的设计也正是Gobblin高度可扩展的原因。
Gobblin组件
Gobblin提供了6个不同的组件接口,因此易于扩展并进行定制化开发。分别是:
-
source
-
extractor
-
convertor
-
quality checker
-
writer
-
publisher
Source主要负责将源数据整合到一系列workunits中,并指出对应的extractor是什么。这有点类似于Hadoop的InputFormat。
Extractor则通过workunit指定数据源的信息,例如kafka,指出topic中每个partition的起始offset,用于本次抽取使用。Gobblin使用了watermark的概念,记录每次抽取的数据的起始位置信息。
Converter顾名思义是转换器的意思,即对抽取的数据进行一些过滤、转换操作,例如将byte arrays 或者JSON格式的数据转换为需要输出的格式。转换操作也可以将一条数据映射成0条或多条数据(类似于flatmap操作)。
Quality Checker即质量检测器,有2中类型的checker:record-level和task-level的策略。通过手动策略或可选的策略,将被check的数据输出到外部文件或者给出warning。
Writer就是把导出的数据写出,但是这里并不是直接写出到output file,而是写到一个缓冲路径( staging directory)中。当所有的数据被写完后,才写到输出路径以便被publisher发布。Sink的路径可以包括HDFS或者kafka或者S3中,而格式可以是Avro,Parquet,或者CSV格式。同时Writer也可是根据时间戳,将输出的文件输出到按照“小时”或者“天”命名的目录中。
Publisher就是根据writer写出的路径,将数据输出到最终的路径。同时其提供2种提交机制:完全提交和部分提交;如果是完全提交,则需要等到task成功后才pub,如果是部分提交模式,则当task失败时,有部分在staging directory的数据已经被pub到输出路径了。
Gobblin执行流程