Apache Beam构建批处理任务

news/2025/1/18 8:49:59/

在之前的博客中我介绍了如何用Beam来构建一个实时处理Kafka消息的流处理任务,Apache Beam构建流处理任务_gzroy的博客-CSDN博客,这次我将介绍一下如何构建批处理的任务。对于不同的业务场景来说,我们需要选择一个合适的方案,并不是说只有流处理任务是最好的。例如对于一些实时性要求不高的场景,例如定期汇总生成报表数据,这时候用批处理任务更加合适,因为这样对于系统资源的利用更充分,费用也更少。我比较过在Google云平台采用流处理和批处理的方式对报表数据进行处理,批处理可以大大节省费用。

下面以一个车辆里程报表的数据处理为例,介绍一下如何用Beam来编写一个批处理任务。

业务场景介绍

平台需要定期生成车辆上报的里程信息,并汇总为小时、天、月等不同时间粒度的里程数据。车辆上报的里程消息将通过Kafka平台传递。平台收到里程消息之后,需要每个小时进行一次处理,剔除异常的里程数据,计算每小时车辆的行驶里程。车辆的ETL消息和每小时的里程数据将保存为Parquet文件,以节省费用。

车辆的消息格式如下:

{"telemetry": {"odometer": {"odometer": 1234,}}, "timestamp": 1682563540419,"deviceId": "abc123",
}

生成Pipeline

这里基于JAVA来实现,首先新建一个项目,然后新建一个MileageBatch.java的文件,然后在里面定义一个Options接口,用于设置Pipeline运行时所需要的参数。

public interface Options extends PipelineOptions {@Description("Apache Kafka topic to read from.")@Default.String("test")@Validation.RequiredString getInputTopic();void setInputTopic(String value);@Description("ProjectID")@Default.String("gcpcvcsolutionintegdev01-c7bb")String getProjectId();@Description("Apache Kafka bootstrap servers in the form 'hostname:port'.")@Default.String("localhost:9092")String getBootstrapServer();void setBootstrapServer(String value);@Description("Define max_speed km/h for distance abnormal.")@Default.Integer(300)Integer getMaxSpeed();void setMaxSpeed(Integer value);@Description("Use start time to set start offset")long getStartTime();void setStartTime(long value);@Description("Use stop time to set end offset")long getStopTime();void setStopTime(long value);@Description("Use start time to filter out the required data")long getProcessStartTime();void setProcessStartTime(long value);@Description("Use start time to filter out the required data")long getProcessStopTime();void setProcessStopTime(long value);@Description("GS storage to store the raw data")@Default.String("gs://test/raw")String getBucket();void setBucket(String value);
}    

读取Kafka消息

KafkaIO读取的消息默认是无边界的数据,但是Batch任务是处理的有边界数据,因此我们需要设定要处理的Kafka信息的开始和结束的offset。KafkaIO提供了withStartReadTime和withStopReadTime这两个函数来获取和指定时间戳相等或更大的对应的offset。例如我们要获取2023年5月28日9点到10点范围的数据,那么我们可以把9点和10点对应的时间戳传入这两个函数。有一点需要注意的是,如果KafkaIO找不到比大于指定时间戳的offset消息,那就会报错。因此通常对于withStopReadTime,我们最好先判断一下消息最晚到的时间戳是多少。以下代码是读取Kafka指定时间范围的数据并转换为PCollection

PCollection<String> input =pipeline.apply("Read messages from Kafka",KafkaIO.<String, String>read().withBootstrapServers(options.getBootstrapServer()).withTopic(options.getInputTopic()).withKeyDeserializer(StringDeserializer.class).withValueDeserializer(StringDeserializer.class).withConsumerConfigUpdates(ImmutableMap.of("group.id", "mileage_app")).withStartReadTime(Instant.ofEpochMilli(start_read_time)).withStopReadTime(Instant.ofEpochMilli(stop_read_time)).withoutMetadata()).apply("Get message contents", Values.<String>create());

解析并过滤消息

因为Kafka消息的时间戳和消息正文的时间戳通常是不一致的。因此在上一步获取的数据中,我们需要进一步过滤出我们需要的消息。例如车辆在8:59分发了一条消息,正文的时间戳是8:59分,但是消息到达Kafa的时候,消息的创建时间戳是9:00,那么我们需要丢弃这条消息。以下代码是把获取到的Kafka字符串消息处理为JSON对象,并过滤掉不符合要求的消息。

private static final Gson GSON = new Gson();
private static final TupleTag<UtilizationMsg> utilizationMsgTag = new TupleTag<UtilizationMsg>(){};
private static final TupleTag<String> otherMsgTag = new TupleTag<String>(){};PCollectionTuple msgTuple = input.apply("Filter message", ParDo.of(new DoFn<String, UtilizationMsg>() {@ProcessElementpublic void processElement(@Element String element, MultiOutputReceiver out) {UtilizationMsg msg = GSON.fromJson(element, UtilizationMsg.class);if (msg.timestamp==0 || msg.deviceId==null || msg.telemetry.odometer==null) {out.get(otherMsgTag).output(element);} else {if (msg.timestamp<start_process_time | msg.timestamp>stop_process_time) {out.get(otherMsgTag).output(element);} else {out.get(utilizationMsgTag).output(msg);}}}}).withOutputTags(utilizationMsgTag, TupleTagList.of(otherMsgTag)));  // Get the filter out msg
PCollection<UtilizationMsg> utilizationMsg = msgTuple.get(utilizationMsgTag);

消息的格式定义如下:

public class TelemetryMsg {@DefaultCoder(AvroCoder.class)public static class UtilizationMsg {public long timestamp;public String deviceId;public Telemetry telemetry;}public static class Odometer {public int usageMode;public float odometer;}public static class Telemetry {public Odometer odometer;}
}

保存ETL数据

过滤处理完之后的数据,我们可以保存下来,以备以后数据分析之用。通常ETL的数据都是很多的,为了节省费用,我们可以把数据保存为Parquet格式。这里我把数据保存为Parquet,保存在GCS的文件桶中。在保存的时候,我们要把PCollection里面的数据转化为GenericRecord的格式才能保存,如以下代码:

// Save the raw records to Parquet files on GCS
utilizationMsg.apply("Convert to generic record", ParDo.of(new DoFn<UtilizationMsg, GenericRecord>() {@ProcessElementpublic void processElement(@Element UtilizationMsg element, OutputReceiver<GenericRecord> out) {GenericRecord record = new GenericData.Record(ReflectData.get().getSchema(UtilizationMsg.class));GenericRecord telemetry = new GenericData.Record(ReflectData.get().getSchema(TelemetryMsg.Telemetry.class));GenericRecord odometer = new GenericData.Record(ReflectData.get().getSchema(TelemetryMsg.Odometer.class));odometer.put("odometer", element.telemetry.odometer.odometer);odometer.put("usageMode", element.telemetry.odometer.usageMode);telemetry.put("odometer", odometer);record.put("timestamp", element.timestamp);record.put("deviceId", element.deviceId);record.put("telemetry", telemetry);out.output(record);//LOG.info("deviceId: {}, timestamp: {}, odometer:{}", element.deviceId, element.timestamp, element.telemetry.odometer.odometer);}})).setCoder(AvroCoder.of(GenericRecord.class, AvroCoder.of(UtilizationMsg.class).getSchema())).apply("Save to parquet files", FileIO.<GenericRecord>write().via(ParquetIO.sink(AvroCoder.of(UtilizationMsg.class).getSchema())).to(options.getBucket()).withSuffix(".parquet"));

数据分组

ETL得到的数据是每辆车的里程表的数据。我们需要对数据进行分组,按照车辆的vehicle_id来进行分组,以便后面步骤的计算。这里需要给数据添加Key,然后再按照Key来进行分组。

// Add deviceid as key and group
PCollection<KV<String, Iterable<UtilizationMsg>>> grouped_records = utilizationMsg.apply("Add DeviceID as Key", ParDo.of(new DoFn<UtilizationMsg, KV<String, UtilizationMsg>>() {@ProcessElementpublic void processElement(@Element UtilizationMsg element, OutputReceiver<KV<String, UtilizationMsg>> out) {out.output(KV.of(element.deviceId, element));//LOG.info("deviceId: {}, timestamp: {}, odometer:{}", element.deviceId, element.timestamp, element.telemetry.odometer.odometer);}})).apply(GroupByKey.<String, UtilizationMsg>create());

里程计算

最后我们可以对分组后的数据进行计算了。因为分组后的每一组数据代表某一辆车在一个小时内的所有里程表数据,所以我们可以对这个组的数据进行按照时间戳排序,然后以下一个时间点减上一个时间点的数据得到某一段时间的行驶里程,最后汇总即得到一个小时内的总里程。在计算某一段里程的时候,我们可以根据时速是否超过预定义的最大时速来排除掉问题的数据。代码如下:

// Calculate the distance
PCollectionTuple distance = grouped_records.apply("Calculate distance", ParDo.of(new DoFn<KV<String, Iterable<UtilizationMsg>>, DistanceObj>() {@ProcessElementpublic void processElement(@Element KV<String, Iterable<UtilizationMsg>> element, MultiOutputReceiver out) {Iterator<UtilizationMsg> iterator = element.getValue().iterator();List<UtilizationMsg> records = new ArrayList<UtilizationMsg>();while(iterator.hasNext()) {records.add(iterator.next());}Collections.sort(records, new UtilizationMsgCompare());    // Sort the records by timestampfloat total_distance = 0f;float pre_odometer = 0f;long pre_timestamp = 0L;Boolean has_abnormal_data = false;for (Iterator<UtilizationMsg> iter = records.iterator();iter.hasNext();) {UtilizationMsg record = (UtilizationMsg) iter.next();int usageMode = record.telemetry.odometer.usageMode;if (usageMode != min_usage_mode) {continue;}float odometer = record.telemetry.odometer.odometer;if (pre_odometer==0) {pre_odometer = odometer;pre_timestamp = record.timestamp;continue;}if (odometer >= pre_odometer) {float distance = (float) (record.telemetry.odometer.odometer - pre_odometer);float duration = (record.timestamp - pre_timestamp)/1000.0f;   //secondsif(distance <= duration * max_speed) {total_distance += distance;} else {LOG.error("Odometer exceed speed limit: deviceId:{}, timestamp:{}, prevTimestamp:{}, odometer:{}, prevOdometer:{}", record.deviceId, String.valueOf(record.timestamp), String.valueOf(pre_timestamp), String.valueOf(odometer), String.valueOf(pre_odometer));has_abnormal_data = true;}} else {has_abnormal_data = true;LOG.error("Odometer same with previous: deviceId:{}, timestamp:{}, prevTimestamp:{}, odometer:{}, prevOdometer:{}", record.deviceId, String.valueOf(record.timestamp), String.valueOf(pre_timestamp), String.valueOf(odometer), String.valueOf(pre_odometer));}pre_odometer = odometer;pre_timestamp = record.timestamp;}DistanceObj d = new DistanceObj(element.getKey(), Long.parseLong(DateFormat.format(pre_timestamp)), (int)total_distance);if (!has_abnormal_data) {out.get(normalDistanceTag).output(d);}else {String errorMsg = String.format("Abnormal distance found for device: %s, period: %s - %s", element.getKey(), Instant.ofEpochMilli(start_process_time).toDateTime().toString(), Instant.ofEpochMilli(stop_process_time).toDateTime().toString());out.get(abnormalDistanceTag).output(errorMsg);}}}).withOutputTags(normalDistanceTag, TupleTagList.of(abnormalDistanceTag)));

保存里程数据

把计算得到的正常里程数据保存到数据库之中。这里我把数据保存到Postgresql数据库中

PCollection<DistanceObj> normalDistance = distance.get(normalDistanceTag);
// Save the distance records to PG
normalDistance.apply(JdbcIO.<DistanceObj>write().withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("org.postgresql.Driver", "jdbc:postgresql://127.0.0.1:5432/telematics").withUsername("postgres").withPassword("postgres")).withStatement("insert into distance Values (?, ?, ?, ?) ON CONFLICT (deviceId, hour) DO UPDATE SET (distance, process_time) = (excluded.distance, excluded.process_time);").withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<DistanceObj>() {public void setParameters(DistanceObj element, PreparedStatement query) throws SQLException {Timestamp process_time = new Timestamp(new Date().getTime());query.setString(1, element.getDeviceId());query.setString(2, String.valueOf(element.getHour()));query.setInt(3, element.getDistance());query.setString(4, process_time.toString());}}));

运行Pipeline

至此,整个Pipeline已经编写完成。最后就可以运行pipeline了,代码如下:

PipelineResult result = pipeline.run();
try {result.getState();result.waitUntilFinish();
} catch (UnsupportedOperationException e) {// do nothing
} catch (Exception e) {e.printStackTrace();
}


http://www.ppmy.cn/news/97486.html

相关文章

ubuntu22.04安装Docker 基于 Docker搭建测试用例平台 TestLink

两种方式&#xff0c;一般选择官方 一、官方 更新包管理器&#xff1a;sudo apt update安装 Docker 的依赖包&#xff1a;sudo apt install apt-transport-https ca-certificates curl gnupg lsb-release添加 Docker GPG 密钥&#xff1a;curl -fsSL https://download.docker…

Python潮流周刊#3:PyPI 的安全问题

△点击上方“Python猫”关注 &#xff0c;回复“1”领取电子书 你好&#xff0c;我是猫哥。这里记录每周值得分享的 Python 及通用技术内容&#xff0c;部分为英文&#xff0c;已在小标题注明。&#xff08;标题取自其中一则分享&#xff0c;不代表全部内容都是该主题&#xff…

为什么很多企业把35岁视为分水岭

(点击即可收听) 为什么很多企业把35岁视为分水岭 有时候,别人的故事,若干年后,就是自己的故事,只要身在互联网这个行业里,可以说,每个人都避免不了35岁危机 不要五十步笑百步 前阵子,朋友圈一位行业知名大佬,35岁,每天兢兢业业,任劳任怨,本以为安稳渡过3个月试用期,正快要转正时…

全面SOA化的电子电气架构是什么样?

交流群 | 进“传感器群/滑板底盘群/汽车基础软件群/域控制器群”请扫描文末二维码&#xff0c;添加九章小助手&#xff0c;务必备注交流群名称 真实姓名 公司 职位&#xff08;不备注无法通过好友验证&#xff09; 作者 | 张萌宇 在汽车产业向智能化转型的过程中&#xff0c…

Pyside6-第一篇-创建第一个窗口

Hi&#xff0c;今天起开始更新Pyside6教程了&#xff0c;从0-1开始更新&#xff0c;过程比较的久&#xff0c;一点点来。 今天&#xff0c;我们先来搭建环境。 我的环境&#xff1a; ❝ pycharm 2021.3.3(版本随意&#xff0c;只要不是很低就行)Python版本3.95Pyside版本6.50 ❞…

AutoEncoder GAN

AE Auto-Encoder (AE) 是20世纪80年代晚期提出的&#xff0c;它是一种无监督学习算法&#xff0c;使用了反向传播算法&#xff0c;让目标值等于输入值。 是神经网络的一种&#xff0c;经过训练后能尝试将输入复制到输出。三层网络结构&#xff1a;输入- 隐层- 输出自编码网络…

对于 CRC 校验的 学习笔记

参考资料 CRC校验原理及实现 - 知乎 (zhihu.com) <-- 这个讲的特别好&#xff0c;我的博客主要是抄他的&#xff0c;最后加了一点代码库的分析。 [CRC校验]手算与直观演示_哔哩哔哩_bilibili <-- 这个视频非常直观 【FPGA】CRC校验算法从数学原理到代码实现 CRC 参数…

[创业之路-71] :创业思维与打工思维的区别

其实打工思维和创业思维最核心的本质区别是你是否愿意去尝试。 很多时候我打工的时候老板没发现我的潜质&#xff0c;所以我去创业了&#xff0c;这个没有&#xff0c;你打工的时候一定有一项极其长的长项&#xff0c;只不过当时你可能也没意识到&#xff0c;老板没意识到。 …