Table of Contents
- I. Source
- 1. Predefined Sources
- 2. Custom Sources [Important]
- 3. Kafka Source [Important]
- II. Transformation Operators
- 1. union and connect (Merging and Connecting)
- 2. Side Outputs (Stream Splitting)
I. Source
1. Predefined Sources
Collection-based sources [for testing]
- 1. env.fromElements(varargs);
- 2. env.fromCollection(any Collection);
- 3. env.fromSequence(from, to);
File-based source
env.readTextFile(file system path, including HDFS paths);
Socket-based source (socketTextStream) [for testing]
socketTextStream(hostname, port): a non-parallel source
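The predefined sources above can be combined into one small job. This is a minimal sketch assuming a Flink 1.12+ streaming dependency; the HDFS path and the hostname/port in the commented-out lines are placeholders, not real endpoints:

```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

public class PredefinedSources {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. from a varargs list of elements
        DataStreamSource<String> fromElements = env.fromElements("a", "b", "c");

        // 2. from any java.util Collection
        DataStreamSource<Integer> fromCollection = env.fromCollection(Arrays.asList(1, 2, 3));

        // 3. from an inclusive range of longs
        DataStreamSource<Long> fromSequence = env.fromSequence(1, 10);

        // File-based source; also accepts HDFS paths (placeholder path)
        // env.readTextFile("hdfs://node01:9820/wordcount/input");

        // Socket source: non-parallel, requires a server listening on the port
        // env.socketTextStream("node01", 9999);

        fromElements.print("elements");
        fromCollection.print("collection");
        fromSequence.print("sequence");
        env.execute("PredefinedSources");
    }
}
```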
2. Custom Sources [Important]
- SourceFunction: non-parallel source (parallelism can only be 1) -- interface
- RichSourceFunction: non-parallel source with lifecycle methods (parallelism can only be 1) -- class
- ParallelSourceFunction: parallel source (parallelism can be >= 1) -- interface
- RichParallelSourceFunction: parallel source with lifecycle methods (parallelism can be >= 1) -- class [recommended]
Code example:
package com.bigdata.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import java.util.UUID;

@Data
@NoArgsConstructor
@AllArgsConstructor
class Order {
    private String orderId;
    private int uid;
    private int money;
    private long timeStamp;
}

/**
 * Requirement: generate one random order (order ID, user ID, amount, timestamp) every second.
 * - order ID: a random UUID
 * - user ID: a random int in [0, 2]
 * - amount: a random int in [0, 100]
 * - timestamp: the current system time
 */
// Extend RichParallelSourceFunction and supply the element type as the generic parameter
class AutoCreateOrderSource extends RichParallelSourceFunction<Order> {

    private volatile boolean flag = true;
    private final Random random = new Random();

    @Override
    public void run(SourceContext<Order> sourceContext) throws Exception {
        // Loop on the flag so that cancel() actually stops the source
        while (flag) {
            String orderId = UUID.randomUUID().toString();
            int userId = random.nextInt(3);
            int money = random.nextInt(101);
            long ts = System.currentTimeMillis();
            sourceContext.collect(new Order(orderId, userId, money, ts));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}

public class CustomSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Register the custom source with the environment
        DataStreamSource<Order> dataStreamSource = env.addSource(new AutoCreateOrderSource());
        dataStreamSource.print();
        env.execute();
    }
}
3. Kafka Source [Important]
Code example:
package com.bigdata.source;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "node01:9092");
        properties.put("group.id", "g1");

        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>("topicA", new SimpleStringSchema(), properties);
        DataStreamSource<String> stringDataStreamSource = env.addSource(kafkaConsumer);
        stringDataStreamSource.print();

        env.execute("KafkaSource");
    }
}
II. Transformation Operators
The available operators include map, flatMap, filter, keyBy, reduce, union and connect (merging and connecting), and Side Outputs (stream splitting).
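The simpler operators in this list can be illustrated with a classic word count that chains flatMap, filter, map, keyBy, and reduce. A minimal sketch; the class name and input strings are illustrative, and the `.returns(...)` calls are needed because Java lambdas lose generic type information to erasure:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BasicOperators {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("hello world", "hello flink")
                // flatMap: split each line into individual words
                .flatMap((String line, Collector<String> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(word);
                    }
                })
                .returns(Types.STRING)
                // filter: keep only non-empty words
                .filter(word -> !word.isEmpty())
                // map: wrap each word in a (word, 1) tuple
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // keyBy: partition the stream by the word
                .keyBy(tuple -> tuple.f0)
                // reduce: sum the counts per word, emitting a running total
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
                .print();

        env.execute("BasicOperators");
    }
}
```

Note that in a streaming job the reduce emits a new running total for every incoming element rather than a single final count per key.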
1. union and connect (Merging and Connecting)
- union merges DataStreams of the same element type; it takes the union of the streams but does not deduplicate
- connect joins exactly two streams whose element types may differ; like union it combines streams, but the connected result must be processed (e.g. with a CoMapFunction) before output, and each of the two streams can be given its own processing logic
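The two bullets above can be sketched side by side. A minimal sketch with illustrative element values; note that `union` keeps the duplicate "b", while `connect` handles each input stream with its own map method:

```java
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class UnionAndConnect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> words1 = env.fromElements("a", "b");
        DataStream<String> words2 = env.fromElements("b", "c");
        DataStream<Integer> numbers = env.fromElements(1, 2, 3);

        // union: both streams must have the same type; "b" is kept twice (no deduplication)
        DataStream<String> unioned = words1.union(words2);
        unioned.print("union");

        // connect: exactly two streams, types may differ; each side gets its own logic
        ConnectedStreams<String, Integer> connected = words1.connect(numbers);
        DataStream<String> merged = connected.map(new CoMapFunction<String, Integer, String>() {
            @Override
            public String map1(String value) {  // handles elements of the first stream
                return "word:" + value;
            }

            @Override
            public String map2(Integer value) { // handles elements of the second stream
                return "number:" + value;
            }
        });
        merged.print("connect");

        env.execute("UnionAndConnect");
    }
}
```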
2. Side Outputs (Stream Splitting)
Example: split the numbers in a stream into odds and evens, then retrieve each split stream.
package com.bigdata.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @program: FlinkDemo
 * @author: hang
 * @create: 2024-11-22 11:19:46
 */
public class SideOutputs {
    public static void main(String[] args) throws Exception {
        // 1. env: prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source: load the data
        DataStreamSource<Long> dataStreamSource = env.fromSequence(1, 100);

        // Define one tag per side output
        OutputTag<Long> even = new OutputTag<>("even", TypeInformation.of(Long.class));
        OutputTag<Long> odd = new OutputTag<>("odd", TypeInformation.of(Long.class));

        // 3. transformation: route each element to the matching side output
        SingleOutputStreamOperator<Long> process = dataStreamSource.process(new ProcessFunction<Long, Long>() {
            @Override
            public void processElement(Long value, ProcessFunction<Long, Long>.Context ctx, Collector<Long> out) throws Exception {
                if (value % 2 == 0) {
                    // tag the element as even
                    ctx.output(even, value);
                } else {
                    // tag the element as odd
                    ctx.output(odd, value);
                }
            }
        });

        // 4. sink: pull each side output from the processed stream and print it
        DataStream<Long> evenStream = process.getSideOutput(even);
        evenStream.print("even");
        DataStream<Long> oddStream = process.getSideOutput(odd);
        oddStream.print("odd");

        // 5. execute: run the job
        env.execute();
    }
}