在之前的博客中我介绍了如何用Beam来构建一个实时处理Kafka消息的流处理任务,Apache Beam构建流处理任务_gzroy的博客-CSDN博客,这次我将介绍一下如何构建批处理的任务。对于不同的业务场景来说,我们需要选择一个合适的方案,并不是说只有流处理任务是最好的。例如对于一些实时性要求不高的场景,例如定期汇总生成报表数据,这时候用批处理任务更加合适,因为这样对于系统资源的利用更充分,费用也更少。我比较过在Google云平台采用流处理和批处理的方式对报表数据进行处理,批处理可以大大节省费用。
下面以一个车辆里程报表的数据处理为例,介绍一下如何用Beam来编写一个批处理任务。
业务场景介绍
平台需要定期生成车辆上报的里程信息,并汇总为小时、天、月等不同时间粒度的里程数据。车辆上报的里程消息将通过Kafka平台传递。平台收到里程消息之后,需要每个小时进行一次处理,剔除异常的里程数据,计算每小时车辆的行驶里程。车辆的ETL消息和每小时的里程数据将保存为Parquet文件,以节省费用。
车辆的消息格式如下:
{"telemetry": {"odometer": {"odometer": 1234,}}, "timestamp": 1682563540419,"deviceId": "abc123",
}
生成Pipeline
这里基于JAVA来实现,首先新建一个项目,然后新建一个MileageBatch.java的文件,然后在里面定义一个Options接口,用于设置Pipeline运行时所需要的参数。
public interface Options extends PipelineOptions {@Description("Apache Kafka topic to read from.")@Default.String("test")@Validation.RequiredString getInputTopic();void setInputTopic(String value);@Description("ProjectID")@Default.String("gcpcvcsolutionintegdev01-c7bb")String getProjectId();@Description("Apache Kafka bootstrap servers in the form 'hostname:port'.")@Default.String("localhost:9092")String getBootstrapServer();void setBootstrapServer(String value);@Description("Define max_speed km/h for distance abnormal.")@Default.Integer(300)Integer getMaxSpeed();void setMaxSpeed(Integer value);@Description("Use start time to set start offset")long getStartTime();void setStartTime(long value);@Description("Use stop time to set end offset")long getStopTime();void setStopTime(long value);@Description("Use start time to filter out the required data")long getProcessStartTime();void setProcessStartTime(long value);@Description("Use start time to filter out the required data")long getProcessStopTime();void setProcessStopTime(long value);@Description("GS storage to store the raw data")@Default.String("gs://test/raw")String getBucket();void setBucket(String value);
}
读取Kafka消息
KafkaIO读取的消息默认是无边界的数据,但是Batch任务是处理的有边界数据,因此我们需要设定要处理的Kafka信息的开始和结束的offset。KafkaIO提供了withStartReadTime和withStopReadTime这两个函数来获取和指定时间戳相等或更大的对应的offset。例如我们要获取2023年5月28日9点到10点范围的数据,那么我们可以把9点和10点对应的时间戳传入这两个函数。有一点需要注意的是,如果KafkaIO找不到比大于指定时间戳的offset消息,那就会报错。因此通常对于withStopReadTime,我们最好先判断一下消息最晚到的时间戳是多少。以下代码是读取Kafka指定时间范围的数据并转换为PCollection
PCollection<String> input =pipeline.apply("Read messages from Kafka",KafkaIO.<String, String>read().withBootstrapServers(options.getBootstrapServer()).withTopic(options.getInputTopic()).withKeyDeserializer(StringDeserializer.class).withValueDeserializer(StringDeserializer.class).withConsumerConfigUpdates(ImmutableMap.of("group.id", "mileage_app")).withStartReadTime(Instant.ofEpochMilli(start_read_time)).withStopReadTime(Instant.ofEpochMilli(stop_read_time)).withoutMetadata()).apply("Get message contents", Values.<String>create());
解析并过滤消息
因为Kafka消息的时间戳和消息正文的时间戳通常是不一致的。因此在上一步获取的数据中,我们需要进一步过滤出我们需要的消息。例如车辆在8:59分发了一条消息,正文的时间戳是8:59分,但是消息到达Kafa的时候,消息的创建时间戳是9:00,那么我们需要丢弃这条消息。以下代码是把获取到的Kafka字符串消息处理为JSON对象,并过滤掉不符合要求的消息。
private static final Gson GSON = new Gson();
private static final TupleTag<UtilizationMsg> utilizationMsgTag = new TupleTag<UtilizationMsg>(){};
private static final TupleTag<String> otherMsgTag = new TupleTag<String>(){};PCollectionTuple msgTuple = input.apply("Filter message", ParDo.of(new DoFn<String, UtilizationMsg>() {@ProcessElementpublic void processElement(@Element String element, MultiOutputReceiver out) {UtilizationMsg msg = GSON.fromJson(element, UtilizationMsg.class);if (msg.timestamp==0 || msg.deviceId==null || msg.telemetry.odometer==null) {out.get(otherMsgTag).output(element);} else {if (msg.timestamp<start_process_time | msg.timestamp>stop_process_time) {out.get(otherMsgTag).output(element);} else {out.get(utilizationMsgTag).output(msg);}}}}).withOutputTags(utilizationMsgTag, TupleTagList.of(otherMsgTag))); // Get the filter out msg
PCollection<UtilizationMsg> utilizationMsg = msgTuple.get(utilizationMsgTag);
消息的格式定义如下:
public class TelemetryMsg {@DefaultCoder(AvroCoder.class)public static class UtilizationMsg {public long timestamp;public String deviceId;public Telemetry telemetry;}public static class Odometer {public int usageMode;public float odometer;}public static class Telemetry {public Odometer odometer;}
}
保存ETL数据
过滤处理完之后的数据,我们可以保存下来,以备以后数据分析之用。通常ETL的数据都是很多的,为了节省费用,我们可以把数据保存为Parquet格式。这里我把数据保存为Parquet,保存在GCS的文件桶中。在保存的时候,我们要把PCollection里面的数据转化为GenericRecord的格式才能保存,如以下代码:
// Save the raw records to Parquet files on GCS
utilizationMsg.apply("Convert to generic record", ParDo.of(new DoFn<UtilizationMsg, GenericRecord>() {@ProcessElementpublic void processElement(@Element UtilizationMsg element, OutputReceiver<GenericRecord> out) {GenericRecord record = new GenericData.Record(ReflectData.get().getSchema(UtilizationMsg.class));GenericRecord telemetry = new GenericData.Record(ReflectData.get().getSchema(TelemetryMsg.Telemetry.class));GenericRecord odometer = new GenericData.Record(ReflectData.get().getSchema(TelemetryMsg.Odometer.class));odometer.put("odometer", element.telemetry.odometer.odometer);odometer.put("usageMode", element.telemetry.odometer.usageMode);telemetry.put("odometer", odometer);record.put("timestamp", element.timestamp);record.put("deviceId", element.deviceId);record.put("telemetry", telemetry);out.output(record);//LOG.info("deviceId: {}, timestamp: {}, odometer:{}", element.deviceId, element.timestamp, element.telemetry.odometer.odometer);}})).setCoder(AvroCoder.of(GenericRecord.class, AvroCoder.of(UtilizationMsg.class).getSchema())).apply("Save to parquet files", FileIO.<GenericRecord>write().via(ParquetIO.sink(AvroCoder.of(UtilizationMsg.class).getSchema())).to(options.getBucket()).withSuffix(".parquet"));
数据分组
ETL得到的数据是每辆车的里程表的数据。我们需要对数据进行分组,按照车辆的vehicle_id来进行分组,以便后面步骤的计算。这里需要给数据添加Key,然后再按照Key来进行分组。
// Add deviceid as key and group
PCollection<KV<String, Iterable<UtilizationMsg>>> grouped_records = utilizationMsg.apply("Add DeviceID as Key", ParDo.of(new DoFn<UtilizationMsg, KV<String, UtilizationMsg>>() {@ProcessElementpublic void processElement(@Element UtilizationMsg element, OutputReceiver<KV<String, UtilizationMsg>> out) {out.output(KV.of(element.deviceId, element));//LOG.info("deviceId: {}, timestamp: {}, odometer:{}", element.deviceId, element.timestamp, element.telemetry.odometer.odometer);}})).apply(GroupByKey.<String, UtilizationMsg>create());
里程计算
最后我们可以对分组后的数据进行计算了。因为分组后的每一组数据代表某一辆车在一个小时内的所有里程表数据,所以我们可以对这个组的数据进行按照时间戳排序,然后以下一个时间点减上一个时间点的数据得到某一段时间的行驶里程,最后汇总即得到一个小时内的总里程。在计算某一段里程的时候,我们可以根据时速是否超过预定义的最大时速来排除掉问题的数据。代码如下:
// Calculate the distance
PCollectionTuple distance = grouped_records.apply("Calculate distance", ParDo.of(new DoFn<KV<String, Iterable<UtilizationMsg>>, DistanceObj>() {@ProcessElementpublic void processElement(@Element KV<String, Iterable<UtilizationMsg>> element, MultiOutputReceiver out) {Iterator<UtilizationMsg> iterator = element.getValue().iterator();List<UtilizationMsg> records = new ArrayList<UtilizationMsg>();while(iterator.hasNext()) {records.add(iterator.next());}Collections.sort(records, new UtilizationMsgCompare()); // Sort the records by timestampfloat total_distance = 0f;float pre_odometer = 0f;long pre_timestamp = 0L;Boolean has_abnormal_data = false;for (Iterator<UtilizationMsg> iter = records.iterator();iter.hasNext();) {UtilizationMsg record = (UtilizationMsg) iter.next();int usageMode = record.telemetry.odometer.usageMode;if (usageMode != min_usage_mode) {continue;}float odometer = record.telemetry.odometer.odometer;if (pre_odometer==0) {pre_odometer = odometer;pre_timestamp = record.timestamp;continue;}if (odometer >= pre_odometer) {float distance = (float) (record.telemetry.odometer.odometer - pre_odometer);float duration = (record.timestamp - pre_timestamp)/1000.0f; //secondsif(distance <= duration * max_speed) {total_distance += distance;} else {LOG.error("Odometer exceed speed limit: deviceId:{}, timestamp:{}, prevTimestamp:{}, odometer:{}, prevOdometer:{}", record.deviceId, String.valueOf(record.timestamp), String.valueOf(pre_timestamp), String.valueOf(odometer), String.valueOf(pre_odometer));has_abnormal_data = true;}} else {has_abnormal_data = true;LOG.error("Odometer same with previous: deviceId:{}, timestamp:{}, prevTimestamp:{}, odometer:{}, prevOdometer:{}", record.deviceId, String.valueOf(record.timestamp), String.valueOf(pre_timestamp), String.valueOf(odometer), String.valueOf(pre_odometer));}pre_odometer = odometer;pre_timestamp = record.timestamp;}DistanceObj d = new DistanceObj(element.getKey(), Long.parseLong(DateFormat.format(pre_timestamp)), (int)total_distance);if (!has_abnormal_data) {out.get(normalDistanceTag).output(d);}else {String errorMsg = String.format("Abnormal distance found for device: %s, period: %s - %s", element.getKey(), Instant.ofEpochMilli(start_process_time).toDateTime().toString(), Instant.ofEpochMilli(stop_process_time).toDateTime().toString());out.get(abnormalDistanceTag).output(errorMsg);}}}).withOutputTags(normalDistanceTag, TupleTagList.of(abnormalDistanceTag)));
保存里程数据
把计算得到的正常里程数据保存到数据库之中。这里我把数据保存到Postgresql数据库中
PCollection<DistanceObj> normalDistance = distance.get(normalDistanceTag);
// Save the distance records to PG
normalDistance.apply(JdbcIO.<DistanceObj>write().withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("org.postgresql.Driver", "jdbc:postgresql://127.0.0.1:5432/telematics").withUsername("postgres").withPassword("postgres")).withStatement("insert into distance Values (?, ?, ?, ?) ON CONFLICT (deviceId, hour) DO UPDATE SET (distance, process_time) = (excluded.distance, excluded.process_time);").withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<DistanceObj>() {public void setParameters(DistanceObj element, PreparedStatement query) throws SQLException {Timestamp process_time = new Timestamp(new Date().getTime());query.setString(1, element.getDeviceId());query.setString(2, String.valueOf(element.getHour()));query.setInt(3, element.getDistance());query.setString(4, process_time.toString());}}));
运行Pipeline
至此,整个Pipeline已经编写完成。最后就可以运行pipeline了,代码如下:
PipelineResult result = pipeline.run();
try {result.getState();result.waitUntilFinish();
} catch (UnsupportedOperationException e) {// do nothing
} catch (Exception e) {e.printStackTrace();
}