《十堂课学习 Flink》第九章:Flink Stream 的实战案例一:CPU 平均使用率监控告警案例

news/2024/9/19 10:56:59/ 标签: 学习, flink, linq

9.1 本章概述

本章的所有需求、设计、开发仅是模拟真实业务场景,因为实际业务需求、现场环境更加复杂,并且考虑到本系列课程本身就偏向于基础内容,因此这里我们对自己假设的业务场景进行设计与开发,整个流程虽然简单,但涉及到的内容较多,通过这个案例可以初步了解整个flink流计算开发案例的基本过程。

9.2 需求描述

这个案例中,我们假设,需要对 多个机器集群 进行监控,即定期采样这 若干个集群 中的每台机器的CPU使用率,并将采样结果写入 kafka 。我们的任务是开发一个 flink 项目,监听 kafka 作为输入数据,并且满足 特定条件 时进行告警。

9.2.1 通讯流程

在这里插入图片描述
在这里插入图片描述

9.2.2 通讯协议 —— 采样数据

我们需要与负责采样的上游约定通讯协议,按照指定的 json 格式进行通讯。上游按照指定的格式将样本数据写入 kafka 后,flink 根据kafka消息进行聚合、触发检测、返回告警结果等。

{"taskId": "39xr4d2dnb9x72d","clusterId": "49xrt","itemId": "38fx2d","clusterSize": 4,"currentIndex": 0,"data": {"timestamp": 1715003462,"value": 0.43},"thresholdConfig": {"cpuUsageThresholdAverage": 0.93,"cpuUsageThresholdMax": 0.99}
}

其中,

  • taskId 是每个集群检测任务的唯一标识,同一个集群中的不同机器具有相同taskId,同一个集群中同一台机器在不同时间的采样结果对应的样本编号均不同。在排查问题时可以根据这个定位到哪条消息有问题等。
  • clusterId 是指集群编号,同一个集群中的不同机器具有相同的集群编号,集群编号不因为采样时间而改变。
  • itemId:集群中某机器的唯一标识;
  • clusterSize:集群大小,即集群中含有多少台机器。
  • currentIndex:当前消息在集群中的索引,从0开始。比如一个集群中共 4 台机器,索引分别为 0, 1, 2, 3。由于 kafka 集群、flink 集群等环境原因,接收到 kafka 的消息可能不会严格按照索引从小到大的顺序。
  • data:采样实体类,即某个时刻采样得到CPU使用率的值
    • timestamp: 采样时的时间戳(10位,精度为秒)
    • value:采样的值,即CPU使用率。
  • thresholdConfig:检查时的触发风险阈值,因为不同的业务场景下不同集群可能导致 CPU 使用率情况不同,所以应该对不同集群有相应的触发配置。
    • cpuUsageThresholdAverage:集群中平均CPU使用率阈值;
    • cpuUsageThresholdMax:集群中单个CPU使用率阈值。

9.2.3 通讯协议 —— 结果数据

经过检测以后,算法返回对集群的检查结果,检查规则我们在后面介绍。这里只定义大致的检查结果实体结构:

{"taskId": "xejcfl34w23mfs""clusterId": "49xrt","results": {"code": 0,"message": "success","data": {"riskType": 0,"timestamp": 1715003462,"riskItems": []}}
}

其中,

  • taskId:任务唯一编号,与接收到任务编号保持一致。
  • clusterId:集群编号。
  • results:结果实体类
    • code:结果通讯码,成功为 0 ,样本数据缺失为1,执行过程未知异常为 -1。
    • message:结果消息说明。
    • data:结果实体类
      • riskType:结果类型;
      • timestamp:检查数据的采样时间;
      • riskItems:有风险的机器编号链表,即输入数据中的 itemId

9.2.4 风险检查规则

对 CPU集群 进行检测,检测规则包括

    1. 不满足以下条件,无风险
    1. 集群中 CPU 采样数据是否存在缺失,比如 CPU 1 采集到了,CPU 2 采集不到
    1. 集群中 CPU 平均使用率是否超过阈值(阈值在接收数据中配置)
    1. 集群中单个CPU使用率是否超过阈值(阈值在接收数据中配置)

9.3 开发过程设计

首先这里引入相关依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.smileyan.demos</groupId><artifactId>flink-cpu-demo</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><scala.binary.version>2.12</scala.binary.version><lombok.version>1.18.30</lombok.version><flink.version>1.14.6</flink.version><slf4j.version>2.0.9</slf4j.version><logback.version>1.3.14</logback.version></properties><dependencies><!-- flink 相关 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.45</version></dependency><!-- 编译工具 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version><scope>provided</scope></dependency><!-- log 相关 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>${logback.version}</version><scope>provided</scope></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>${logback.version}</version><scope>provided</scope></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${maven.compiler.source}</source><target>${maven.compiler.target}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude><exclude>ch.qos.logback:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude><exclude>logback.xml</exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>
</project>

9.3.1 main 方法入口

这里是总体流程,我们大概可以分为3个过程:

  1. 初始化kafka连接(包括source与sink对象)。
  2. 编写收集kafka数据以及窗口化过程,前面需求中有提到我们检测的是集群中各机器的CPU使用率情况。因此可能接收同个集群中的多条消息,需要进行窗口化聚合,进而进行检测。
  3. 检测过程,根据实际数据表现对风险等级进行评分。
  4. 返回风险结果到 kafka 中。
package cn.smileyan.demos;import cn.smileyan.demos.core.CpuCheckMapFunction;
import cn.smileyan.demos.core.TaskProcessingFunction;
import cn.smileyan.demos.entity.TaskClusterData;
import cn.smileyan.demos.entity.TaskInput;
import cn.smileyan.demos.entity.TaskOutput;
import cn.smileyan.demos.core.CountAndTimeTrigger;
import cn.smileyan.demos.io.KafkaArgsBuilder;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.DynamicEventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;import java.util.Objects;/*** flink 任务入口* @author smileyan*/
public class CpuCheckJob {/*** 参数解释:*  -bs broker 地址 localhost:9092*  -kcg kafka consumer group*  -it kafka 输入数据 topic test-input-topic*  -ot kafka 输出数据 topic test-output-topic*  -ct 可选,是否自动创建 topic,使用方法 添加  -ct 即可,无需指定其值*  -pt topic 可选,分区数 1*  -rf topic 可选,副本数 1*  example:*  -bs localhost:9092 -it test-input-topic -ot test-output-topic -pt 1 -rf 1 -ct*/public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();final KafkaArgsBuilder kafkaArgsBuilder = new KafkaArgsBuilder(args);final KafkaSource<TaskInput> source = kafkaArgsBuilder.buildSource(TaskInput.class);final KafkaSink<TaskOutput> kafkaSink = kafkaArgsBuilder.buildSink(TaskOutput.class);final long gapSeconds = 10L;final DynamicEventTimeSessionWindows<TaskInput> dynamicWindow = DynamicEventTimeSessionWindows.withDynamicGap((SessionWindowTimeGapExtractor<TaskInput>) element -> gapSeconds * element.getClusterSize());final DataStreamSource<TaskInput> dataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");SingleOutputStreamOperator<TaskClusterData> mergedTaskData = dataStreamSource.filter(Objects::nonNull).keyBy(TaskInput::getTaskId).window(dynamicWindow).trigger(new CountAndTimeTrigger<>()).process(new TaskProcessingFunction()).name("taskProcessing");SingleOutputStreamOperator<TaskOutput> resultData = mergedTaskData.filter(Objects::nonNull).map(new CpuCheckMapFunction()).name("cpu usage check");resultData.sinkTo(kafkaSink);env.execute("Flink Kafka Example");}}

9.3.2 编写 kafka 序列化与反序列化类

package cn.smileyan.demos.io;import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;import java.nio.charset.StandardCharsets;/*** 将字节码数据进行序列化,以及将实体类转换* @author smileyan* @param <O> 实体类*/
@Slf4j
public class CommonEntitySchema<O> implements DeserializationSchema<O>, SerializationSchema<O> {private final Class<O> clazz;public CommonEntitySchema(Class<O> clazz) {this.clazz = clazz;}@Overridepublic O deserialize(byte[] message) {try {String str = new String(message, StandardCharsets.UTF_8);log.info("kafka received message: {}", str);return JSON.parseObject(str, clazz);} catch (Exception e) {log.error(e.getMessage());}return null;}@Overridepublic boolean isEndOfStream(O nextElement) {return false;}@Overridepublic TypeInformation<O> getProducedType() {return TypeInformation.of(clazz);}@Overridepublic byte[] serialize(O element) {return JSON.toJSONBytes(element);}
}

9.3.3 编写 kafka 的 source 与 sink 构建器

package cn.smileyan.demos.io;import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;/**
* 通过参数构建通用的 kafka 通讯序列化与反序列化实体
* @author smileyan
*/
@Slf4j
public class KafkaArgsBuilder {/*** 构建参数*/private final MultipleParameterTool parameterTool;public KafkaArgsBuilder(String[] args) {parameterTool = MultipleParameterTool.fromArgs(args);}/*** 构建kafka sink* @param clazz 实体类class* @param <E> 实体类泛型* @return kafka sink 对象*/public <E> KafkaSink<E> buildSink(Class<E> clazz) {final String bs = parameterTool.getRequired(KafkaArgs.BOOTSTRAP_SERVER.key);final String ot = parameterTool.getRequired(KafkaArgs.OUTPUT_TOPIC.key);return KafkaSink.<E>builder().setBootstrapServers(bs).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(ot).setValueSerializationSchema(new CommonEntitySchema<>(clazz)).build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();}/*** 构建kafka source* @param clazz 实体类class* @param <E> 实体类泛型* @return kafka source 对象*/public <E> KafkaSource<E> buildSource(Class<E> clazz) throws ExecutionException, InterruptedException {final String kafkaConsumerGroup = parameterTool.getRequired(KafkaArgs.KAFKA_CONSUMER_GROUP.key);final String bootstrapServer = parameterTool.getRequired(KafkaArgs.BOOTSTRAP_SERVER.key);final String inputTopic = parameterTool.getRequired(KafkaArgs.INPUT_TOPIC.key);final boolean createTopic = parameterTool.has(KafkaArgs.CREATE_TOPIC.key);if (createTopic) {final int partition = parameterTool.getInt(KafkaArgs.CREATE_TOPIC_PARTITION.key, 1);final short replicationFactor = parameterTool.getShort(KafkaArgs.REPLICATION_FACTOR.key, (short) 1);createTopic(bootstrapServer, inputTopic, partition, replicationFactor);}return KafkaSource.<E>builder().setGroupId(kafkaConsumerGroup).setStartingOffsets(OffsetsInitializer.latest()).setBootstrapServers(bootstrapServer).setTopics(inputTopic).setValueOnlyDeserializer(new CommonEntitySchema<>(clazz)).build();}public enum KafkaArgs {/** kafka 服务地址*/BOOTSTRAP_SERVER("bs"),/** kafka 消费者组*/KAFKA_CONSUMER_GROUP("kcg"),/** kafka 输入主题*/INPUT_TOPIC("it"),/** kafka 输出主题*/OUTPUT_TOPIC("ot"),/** 是否自动创建主题*/CREATE_TOPIC("ct"),/** 分区数*/CREATE_TOPIC_PARTITION("pt"),/** 副本数*/REPLICATION_FACTOR("rf");private final String key;KafkaArgs(String key) {this.key = key;}}/*** 如果 TOPIC 不存在则创建该 TOPIC* @param bootstrapServer kafka broker 地址* @param topic 想要创建的 TOPIC* @param partitions 并行度* @param replicationFactor 副本数*/public static void createTopic(String bootstrapServer,String topic,int partitions,int replicationFactor) throws ExecutionException, InterruptedException {Properties adminProperties = new Properties();adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);try (AdminClient adminClient = AdminClient.create(adminProperties)) {if (!adminClient.listTopics().names().get().contains(topic)) {NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);adminClient.createTopics(Collections.singletonList(newTopic)).all().get();log.info("created topic: {}", topic);}}}
}

9.3.4 自定义 window 触发器

这里我们不使用默认的触发器,而是自定义一个更加方便的触发器。当接收到相同 taskId 的数据时,我们需要确定,什么时候确定接收完成,并触发检测过程。

需要注意的地方包括:

  1. 消息接收完成就触发。比如一个集群中总共有4台机器,当接收到这个4台机器的样本数据时,就应该触发检测过程。
  2. 消息接收到一半突然终止。比如一个集群中总共有4台机器,接收到3台机器的采样数据以后,等了很久没有收到第四条消息。等待超时后触发检查。
package cn.smileyan.demos.core;import cn.smileyan.demos.entity.TaskInput;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;/*** 自定义 window 触发器* @author smileyan*/
@Slf4j
public class CountAndTimeTrigger<T extends TaskInput, W extends TimeWindow> extends Trigger<T, W> {/*** ReducingStateDescriptor 的 key 字段,上下文根据这个字段获取状态指*/private static final String COUNT_KEY = "count";/*** ReducingStateDescriptor 根据聚合过程更新 count 结果*/private final ReducingStateDescriptor<Long> stateDesc =new ReducingStateDescriptor<>(COUNT_KEY, new Sum(), LongSerializer.INSTANCE);@Overridepublic TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {ctx.registerEventTimeTimer(window.getEnd());ctx.registerProcessingTimeTimer(window.getEnd());final int size = element.getClusterSize();final String id = element.getTaskId();ReducingState<Long> count = ctx.getPartitionedState(stateDesc);count.add(1L);log.info("[{}] window: ({}, {}) -> merged {}", ctx.getCurrentWatermark(), window.getStart(), window.getEnd(), count.get());if (count.get().intValue() == size) {log.info("[{} -> {}] merged successfully.", id, ctx.getCurrentWatermark());clear(window, ctx);ctx.getPartitionedState(stateDesc).clear();return TriggerResult.FIRE_AND_PURGE;} else if (count.get() > size) {log.warn("[{} -> {}] sent more than need {}", id, ctx.getCurrentWatermark(), size);return TriggerResult.PURGE;}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {if (time >= window.getEnd()) {log.debug("[ -> {}] onProcessingTime", ctx.getCurrentWatermark());return TriggerResult.FIRE_AND_PURGE;}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {if (time >= window.getEnd()) {log.debug("[ -> {}] onEventTime", ctx.getCurrentWatermark());return TriggerResult.FIRE_AND_PURGE;}return TriggerResult.CONTINUE;}@Overridepublic void clear(W window, TriggerContext ctx) throws Exception {log.debug("[ -> {}] cleaning window ({}, {})", ctx.getCurrentWatermark(), window.getStart(), window.getEnd());ctx.deleteEventTimeTimer(window.getEnd());ctx.deleteProcessingTimeTimer(window.getEnd());}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {log.debug("[ -> {}] onMerge ({}, {})", ctx.getCurrentWatermark(), window.getStart(), window.getEnd());ctx.mergePartitionedState(stateDesc);}private static class Sum implements ReduceFunction<Long> {private static final long serialVersionUID = 1L;@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return value1 + value2;}}
}

此处有两个地方需要额外强调:

  1. 必须重写 canMerge 与 onMerge 方法。
  2. 必须使用ReducingStateDescriptor状态描述器而不是对象的属性。这里牵扯到 flink 的特性,比如4条消息每到来一次,对应的应该是再次创建 CountAndTimeTrigger 对象进行 trigger 检查,并根据检查结果决定是否触发风险。

9.3.5 将集群中每台机器的采样结果进行合并

package cn.smileyan.demos.core;import cn.smileyan.demos.entity.CpuDataItem;
import cn.smileyan.demos.entity.TaskClusterData;
import cn.smileyan.demos.entity.TaskInput;
import cn.smileyan.demos.entity.ThresholdConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;/*** 合并任务数据为集群数据* @author smileyan*/
@Slf4j
public class TaskProcessingFunction extends ProcessWindowFunction<TaskInput, TaskClusterData, String, TimeWindow> {@Overridepublic void process(String key,ProcessWindowFunction<TaskInput, TaskClusterData, String, TimeWindow>.Context context,Iterable<TaskInput> elements,Collector<TaskClusterData> out) throws Exception {log.info("[{}] starting merge processing", key);final List<CpuDataItem> cpuDataItems = new LinkedList<>();Iterator<TaskInput> inputIterator = elements.iterator();TaskInput first = inputIterator.next();cpuDataItems.add(new CpuDataItem(first));String clusterId = first.getClusterId();String taskId = first.getTaskId();Integer clusterSize = first.getClusterSize();ThresholdConfig thresholdConfig = first.getThresholdConfig();while(inputIterator.hasNext()) {cpuDataItems.add(new CpuDataItem(inputIterator.next()));}log.info("[{}] finished merge processing", key);out.collect(new TaskClusterData(taskId, clusterId, clusterSize, thresholdConfig, cpuDataItems));}
}

9.3.6 最核心的风险检查过程

package cn.smileyan.demos.core;import cn.smileyan.demos.entity.CpuDataItem;
import cn.smileyan.demos.entity.TaskClusterData;
import cn.smileyan.demos.entity.TaskOutput;
import cn.smileyan.demos.entity.TaskResult;
import cn.smileyan.demos.entity.TaskResultData;
import org.apache.flink.api.common.functions.MapFunction;import java.util.LinkedList;
import java.util.List;
import java.util.OptionalDouble;
import java.util.stream.Collectors;/*** 对 CPU集群 进行检测,检测规则包括*    0. 不满足以下条件,无风险*    1. 集群中 CPU 采样数据是否存在缺失,比如 CPU 1 采集到了,CPU 2 采集不到*    2. 集群中 CPU 平均使用率是否超过阈值(阈值在接收数据中配置)*    3. 集群中单个CPU使用率是否超过阈值(阈值在接收数据中配置)* @author smileyan*/
public class CpuCheckMapFunction implements MapFunction<TaskClusterData, TaskOutput> {@Overridepublic TaskOutput map(TaskClusterData taskClusterData) {TaskOutput taskOutput = new TaskOutput();taskOutput.setTaskId(taskClusterData.getTaskId());taskOutput.setClusterId(taskClusterData.getClusterId());TaskResult taskResult = new TaskResult();taskOutput.setResults(taskResult);TaskResultData taskResultData = new TaskResultData();taskResultData.setTimestamp(taskClusterData.getCpuDataItems().get(0).getTimestamp());List<String> items = taskClusterData.getCpuDataItems().stream().map(CpuDataItem::getItemId).collect(Collectors.toList());/** 1. 集群中 CPU 采样数据是否存在缺失,比如 CPU 1 采集到了,CPU 2 采集不到*/if (taskClusterData.getClusterSize() != taskClusterData.getCpuDataItems().size()) {taskResult.setCode(ResultCodeEnum.MISSING.getCode());taskResult.setMessage(ResultCodeEnum.MISSING.getMessage());return taskOutput;}taskResult.setCode(ResultCodeEnum.SUCCESS.getCode());taskResult.setMessage(ResultCodeEnum.SUCCESS.getMessage());taskResultData.setRiskType(RiskTypeEnum.NONE.getValue());/** 2. 集群中 CPU 平均使用率是否超过阈值(阈值在接收数据中配置)*/OptionalDouble average = taskClusterData.getCpuDataItems().stream().mapToDouble(CpuDataItem::getValue).average();if (average.isPresent()) {if (average.getAsDouble() > taskClusterData.getThresholdConfig().getCpuUsageThresholdAverage()) {taskResultData.setRiskItems(items);taskResultData.setRiskType(RiskTypeEnum.CPU_USAGE_AVERAGE.getValue());return taskOutput;}} else {taskResult.setCode(ResultCodeEnum.UNKNOWN_ERROR.getCode());taskResult.setMessage(ResultCodeEnum.UNKNOWN_ERROR.getMessage());return taskOutput;}// 3. 集群中单个CPU使用率是否超过阈值(阈值在接收数据中配置)List<String> riskItems = new LinkedList<>();for (CpuDataItem cpuDataItem : taskClusterData.getCpuDataItems()) {if (cpuDataItem.getValue() > taskClusterData.getThresholdConfig().getCpuUsageThresholdMax()) {riskItems.add(cpuDataItem.getItemId());}}if (!riskItems.isEmpty()) {taskResultData.setRiskItems(riskItems);taskResultData.setRiskType(RiskTypeEnum.CPU_USAGE_MAX.getValue());return taskOutput;}return taskOutput;}
}

9.3.7 其他部分代码未展示

由于篇幅问题,这里省略了一部分不那么重要的代码,具体内容请参考 https://gitee.com/smile-yan/flink-cpu-demo

9.4 编写测试

9.4.1 测试数据准备

我们准备了一些测试数据,主要包括以下几种情况:

  1. 数据 size 不够聚合,比如期望达到 4 条消息时,才进行合并并检测;
  2. 数据 size 足够聚合,并且数据正常;
  3. 数据 size 足够聚合,但存在个别数据有风险;
  4. 数据 size 足够聚合,所有数据都存在风险。

具体内容请参考我的开源地址:https://gitee.com/smile-yan/flink-cpu-demo

9.4.2 测试脚本准备

我们需要将前面准备的 json 数据按照顺序写入 kafka ,以触发数据的聚合以及检测。

在这里插入图片描述

这里我们准备了一份python 脚本,将 json 文件写入 kafka 中。

import os
import sysfrom kafka import KafkaProducer
import jsonif __name__ == '__main__':kafka_broker, kafka_topic = sys.argv[1], sys.argv[2]files_dir = sys.argv[3]producer = KafkaProducer(bootstrap_servers=kafka_broker, value_serializer=lambda v: json.dumps(v).encode('utf-8'))files = os.listdir(files_dir)for file in files:if file.endswith(".json"):with open(f'{files_dir}/{file}', 'r') as f:data = json.load(f)producer.send(kafka_topic, data)producer.flush()producer.close()

注意,这里我们考虑到不同用户的存放数据的位置不同、绝对路径不同,将存放数据的路径、kafka 的相关参数以参数的形式存放在 main 方法的参数中,其中:

  • 参数 1:kafka 的服务地址,比如 localhost:9092
  • 参数 2:kafka 的通讯 topic,也就是 flink 任务监听的 topic。比如 input-test-data
  • 参数 3:json 文件所在文件夹,前面有提到四个场景中存放在四个文件夹中。比如 /Users/smileyan/me/flink-cpu-demo/scripts/normal

flink__791">9.4.3 运行 flink 任务

运行时需要注意启动参数:

-bs
localhost:9092
-kcg
flink-consumer
-it
test-input-topic
-ot
test-output-topic
-ct

在这里插入图片描述
运行时还需要添加
在这里插入图片描述

在这里插入图片描述

9.5 运行效果展示

9.5.1 启动 kafka 过程

这个过程不再重复介绍,本地启动 kafka 两行命令即可。

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties

flink__kafka__820">9.5.2 运行 flink 与 kafka 过程录屏

flink-kafka检测CPU使用率案例

这里案例中,我们通过脚本将每个文件夹中的文件发送到 kafka 中,触发检测过程。

在这里插入图片描述

9.5.3 源码地址

https://gitee.com/smile-yan/flink-cpu-demo

除了 java 源码,还包括生成测试数据的脚本,以及发送数据的脚本,以及已经生成的数据文件夹。

9.5 总结

本章内容提供了一个非常简单的Flink Stream 计算案例,涉及内容包括:kafka 通讯,flink 的动态时间窗口,自定义窗口聚合条件以及 flink 的窗口聚合处理等。此外,本章通过录屏的方式验证整个项目运行正常。

希望作为 flink 的初学者能够提供一个简单案例。感谢各位小伙伴们的支持 ~ 共勉 ~

如果认为本章节写得还行,一定记得点击下方免费的赞 ~ 感谢 !
在这里插入图片描述

Smileyan
2024.05.12 18:19


http://www.ppmy.cn/news/1460439.html

相关文章

项目管理-计算题公式【复习】

1.【进度】相关公式 1.1三点估算 PERT 三点估算法是基于 任务成本的三种估算值&#xff08;最可能成本CM&#xff0c;最乐观成本CO&#xff0c;最悲观成本CP&#xff09;来计算预期成本的方法。 三角 分布&#xff1a;预期成本&#xff08;最乐观成本最可能成本最悲观成本&am…

java项目之企业OA管理系统源码(springboot+vue+mysql)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的企业OA管理系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; 企业OA管理系统的主要使用…

使用 Parallels Desktop 在 Mac 上畅玩 PC 游戏

我们不再需要接受 “Mac 不是为游戏而打造” 这一事实&#xff1b;Parallels Desktop 通过将电脑变成高性能的游戏设备&#xff0c;从而改变了一切。 Parallels Desktop 充分利用 Mac 硬件的强大功能&#xff0c;让您无缝畅玩 Windows 专享游戏。 性能得到提升&#xff0c;可玩…

sql update 多表关联 inner join

当您需要更新一个表或者多个表中的数据&#xff0c;而多个表又存在关联时&#xff0c;可以使用 INNER JOIN 子句将多个表关联起来&#xff0c;并使用 SET更新。 格式如下&#xff1a; UPDATE table1 INNER JOIN table2 ON table1.column1 table2.column1 SET table1.column2…

蓝桥杯成绩已出

蓝桥杯的成绩早就已经出来了&#xff0c;虽然没有十分惊艳 &#xff0c;但是对于最终的结果我是心满意足的&#xff0c;感谢各位的陪伴&#xff0c;关于蓝桥杯的刷题笔记我已经坚持更新了49篇&#xff0c;但是现在即将会告别一段落&#xff0c;人生即将进入下一个规划。我们一起…

Django国际化与本地化指南

title: Django国际化与本地化指南 date: 2024/5/12 16:51:04 updated: 2024/5/12 16:51:04 categories: 后端开发 tags: Django-i18n本地化-L10n多语言国际化翻译工具表单验证性能优化 引言 在数字化时代&#xff0c;网站和应用程序必须跨越地域限制&#xff0c;服务于全球…

【算法刷题day44】Leetcode:518. 零钱兑换 II、377. 组合总和 Ⅳ

文章目录 Leetcode 518. 零钱兑换 II解题思路代码总结 Leetcode 377. 组合总和 Ⅳ解题思路代码总结 草稿图网站 java的Deque Leetcode 518. 零钱兑换 II 题目&#xff1a;518. 零钱兑换 II 解析&#xff1a;代码随想录解析 解题思路 先遍历物品&#xff0c;再遍历背包。 代码…

【教程向】从零开始创建浏览器插件(二)深入理解 Chrome 扩展的 manifest.json 配置文件

第二步&#xff1a;深入理解 Chrome 扩展的 manifest.json 配置文件 上一次我们已经着手完成了一个自己的浏览器插件&#xff0c;链接在这里&#xff1a;我是链接 在本篇博客中&#xff0c;我们将更详细地探讨 Chrome 扩展中的 manifest.json 文件。这个文件是每个浏览器扩展…

除了GET方法,新的HTTP QUERY方法规范发布

近日IETF提交新规范讨论&#xff1a;定义了一种新的 HTTP 方法QUERY&#xff0c;QUERY方法作为一种安全、幂等的请求方法&#xff0c;可以携带请求内容。 大多数情况下&#xff0c;当请求中传送的数据量太大而无法编码到请求的 URI 中时&#xff0c;推荐采取QUERY方式。 例如&a…

中金:如何把握不断轮动的资产“风口”

从比特币到日股&#xff0c;到黄金与铜再到当前的港股&#xff0c;每次超预期大涨后都透支回调。 今年以来资产的“风口”不断轮动&#xff0c;从比特币到日股&#xff0c;到黄金与铜&#xff0c;再到当前的港股&#xff0c;资产仿佛“接力”般交替领先&#xff0c;同时“风口”…

20240506金融读报:知识产权质押贷款CBAM碳关税统一授信系统低代码助手二季度金融政策趋势受益人管理法

1、北京农商银行知识产权质押贷款&#xff0c;同业是如何评审这个的额度呢&#xff0c;抄&#xff01; 2、欧盟碳边境调节机制&#xff08;CBAM&#xff09;&#xff08;企业需要支付与产品碳足迹相关的税务成本或退还碳排放配额&#xff09;已通过一年。此类单边措施不利于竞品…

百度云内容审核快速配置 (java)

为什么要选择百度云 &#xff1f; 因为他免费用一年 首先要先开通百度云内容安全服务 按照操作指引走完整套 ContentCensor Java SDK目录结构** com.baidu.aip├── auth //签名相关类├── http //Http通…

uniapp开发小程序使用vue的v-html解析富文本图片过大过宽显示超过屏幕解决办法

如果没有设置的话&#xff0c;就会导致图片溢出&#xff0c;过宽显示或者错位显示&#xff0c;显示效果非常的丑陋&#xff1a; 修改后显示的效果&#xff1a; 网上比较low的解决办法&#xff1a;网上各种解决方法核心思想就是在数据层把数据模板上的img数据加上style样式&…

数据结构与算法===回溯法

文章目录 原理使用场景括号生成代码 小结 原理 回溯法是采用试错的思想&#xff0c;它尝试分步骤的去解决一个问题。在分步骤解决问题的过程中&#xff0c;当它通过尝试发现现有的分步答案不能得到有效的正确的解答的时候&#xff0c;它将取消上一步甚至是上几步的计算&#x…

运维:SSH常用命令简介

SH&#xff0c;全称为Secure Shell&#xff0c;是建立在应用层和传输层基础上的安全协议。SSH 是目前较可靠&#xff0c;专为远程登录会话和其他网络服务提供安全性的协议。利用 SSH 协议可以有效防止远程管理过程中的信息泄露问题。通过 SSH 可以对所有传输的数据进行加密&…

“A”分心得:我的云计算HCIE学习之路

大家好&#xff0c;我是誉天云计算HCIE周末班梁同学&#xff0c;在誉天老师和同学们的帮助下&#xff0c;我终于在4月24日顺利通过了云计算3.0 HCIE的认证考试&#xff0c;而且获得了A&#xff0c;这是让我特别惊喜的&#xff0c;功夫不负有心人。 我日常的工作是网络运维&…

MySQL查询篇-聚合函数-窗口函数

文章目录 distinct 关键字聚合函数常见的聚合函数group by和having 分组过滤 窗口函数with as窗口聚合函数排名窗口函数值窗口函数 distinct 关键字 distinct 去重数据&#xff0c;ps:null值也会查出来 select distinct column from table;聚合函数 常见的聚合函数 select …

运维:CentOS常见命令详解

CentOS&#xff08;Community ENTerprise Operating System&#xff0c;中文意思是社区企业操作系统&#xff09;是Linux发行版之一&#xff0c;它是来自于Red Hat Enterprise Linux依照开放源代码规定释出的源代码所编译而成。由于出自同样的源代码&#xff0c;因此有些要求高…

前端AJAX与后台交互技术知识点及案例(续2)

以下笔记均为学习哔站黑马程序员AJAX视频所得&#xff01;&#xff01;&#xff01; AJAX作用&#xff1a;浏览器和服务器之间通信&#xff0c;动态数据交互 axios函数 先引入axios库&#xff0c;可在bootcdn中寻找相关js文件或者对应的script标签 axios({url:http://hmajax…

MFC编程之设计美丽的对话框

目录 写在前面&#xff1a; Part 1&#xff1a;美美的设计一下计算器的布局 1.描述文字&#xff1a; ​编辑 2.ID&#xff1a; Part 2&#xff1a;美美熟悉一下计算器的工作流程 Part 3&#xff1a;美美设计一下控件功能 1.edit control&#xff1a; 2.相关变量初始化&…