1、添加不同数据源
package com.baidu.keyue.deepsight.memory.test;import com.baidu.keyue.deepsight.memory.WordCount;
import com.baidu.keyue.deepsight.memory.WordCountData;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class EnvDemo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.set(RestOptions.BIND_PORT, "8082");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);env.setRuntimeMode(RuntimeExecutionMode.STREAMING);env.setParallelism(1);
DataStream<String> text = env.fromSource(createDataGeneratorSource(), WatermarkStrategy.noWatermarks(), "datagen source");DataStream<Tuple2<String, Integer>> counts =text.flatMap(new WordCount.Tokenizer()).keyBy(0).sum(1);counts.print();env.execute("WordCount");}public static FileSource crateFileSource() {FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineInputFormat(),new Path("input/word.txt")).build();return fileSource;}public static KafkaSource<String> crateKafkaSource() {KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("ip-port").setTopics("topic").setGroupId("groupId").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();return kafkaSource;}public static DataGeneratorSource<String> createDataGeneratorSource() {DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>((GeneratorFunction<Long, String>) value -> "hello" + value,100, RateLimiterStrategy.perSecond(5), Types.STRING);return dataGeneratorSource;}
}
2、数据处理
package com.baidu.keyue.deepsight.memory.test;import com.baidu.keyue.deepsight.memory.WordCount;
import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class DataProcessDemo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.set(RestOptions.BIND_PORT, "8082");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);env.setRuntimeMode(RuntimeExecutionMode.STREAMING);env.setParallelism(1);DataStream<WaterSensor> sensorDs = env.fromElements(new WaterSensor("s1", 1L, 1),new WaterSensor("s1", 100L, 100),new WaterSensor("s1", 1000L, 1000),new WaterSensor("s3", 3L, 3));SingleOutputStreamOperator<String> map = sensorDs.map(new MapFunction<WaterSensor, String>() {@Overridepublic String map(WaterSensor waterSensor) throws Exception {return waterSensor.getId() + " : " + waterSensor.getVc();}});
SingleOutputStreamOperator<WaterSensor> filter = sensorDs.filter(new FilterFunction<WaterSensor>() {@Overridepublic boolean filter(WaterSensor waterSensor) throws Exception {return waterSensor.getVc() > 1; }});
SingleOutputStreamOperator<String> flatMap = sensorDs.flatMap(new FlatMapFunction<WaterSensor, String>() {@Overridepublic void flatMap(WaterSensor waterSensor, Collector<String> collector) throws Exception {if ("s1".equals(waterSensor.getId())) {collector.collect(waterSensor.getId());} else {collector.collect(waterSensor.getId());collector.collect(waterSensor.getVc().toString());}}});
KeyedStream<WaterSensor, String> keyBy = sensorDs.keyBy(new KeySelector<WaterSensor, String>() {@Overridepublic String getKey(WaterSensor waterSensor) throws Exception {return waterSensor.getId(); }});
keyBy.reduce(new ReduceFunction<WaterSensor>() {@Overridepublic WaterSensor reduce(WaterSensor t2, WaterSensor t1) throws Exception {System.out.println("t1=" + t1);System.out.println("t2=" + t2);return new WaterSensor(t1.getId(), t1.getTs() + t2.getTs(), t1.getVc() + t2.getVc());}}).print();env.execute("WordCount");}
}
3、分流/合流
package com.baidu.keyue.deepsight.memory.test;import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Demo of splitting a stream with filters and merging the parts back with union.
 */
public class FenliuDemo {

    /**
     * Splits the sensor stream into even-vc and odd-vc streams, prints each, then
     * prints their union.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, "8082");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);

        DataStream<WaterSensor> sensorDs = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3));

        // Even branch.
        SingleOutputStreamOperator<WaterSensor> oushu =
                sensorDs.filter(waterSensor -> waterSensor.getVc() % 2 == 0);

        // Odd branch: compare against 0 rather than 1, because Java's % yields -1
        // for negative odd values and "== 1" would silently drop those records
        // from both branches.
        SingleOutputStreamOperator<WaterSensor> jishu =
                sensorDs.filter(waterSensor -> waterSensor.getVc() % 2 != 0);

        oushu.print("偶数流");
        jishu.print("奇数流");

        // union requires both streams to carry the same element type.
        oushu.union(jishu).print("合并流");

        env.execute("WordCount");
    }
}
4、输出流 sink
package com.baidu.keyue.deepsight.memory.test;import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class SinkDemo {public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();configuration.set(RestOptions.BIND_PORT, "8082");StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 设置流处理还是批处理env.setRuntimeMode(RuntimeExecutionMode.STREAMING);env.setParallelism(1);// 读取数据DataStream<WaterSensor> sensorDs = env.fromElements(new WaterSensor("s1", 1L, 1),new WaterSensor("s1", 100L, 100),new WaterSensor("s1", 1000L, 1000),new WaterSensor("s3", 3L, 3));FileSink<WaterSensor> fileSink = FileSink.<WaterSensor>forRowFormat(new Path("/Users/chengyong03/Downloads/output/flink"),new SimpleStringEncoder<>("UTF-8")).build();sensorDs.sinkTo(fileSink);env.execute("WordCount");}
}
package com.baidu.keyue.deepsight.memory.test;import com.baidu.keyue.deepsight.memory.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo of querying a DataStream with SQL through the Table API bridge.
 */
public class SqlDemo {

    /**
     * Prints the raw sensor stream, registers it as the view {@code sensorTable},
     * selects the rows with {@code vc > 10} via SQL, and prints the result converted
     * back to {@code WaterSensor} records.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        final DataStream<WaterSensor> readings = streamEnv.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s1", 100L, 100),
                new WaterSensor("s1", 1000L, 1000),
                new WaterSensor("s3", 3L, 3));
        readings.print();

        // Expose the stream to SQL under the name used in the query below.
        tableEnv.createTemporaryView("sensorTable", tableEnv.fromDataStream(readings));

        final Table filtered = tableEnv.sqlQuery("select * from sensorTable where vc > 10");

        // Convert the query result back into a typed stream and print it.
        tableEnv.toDataStream(filtered, WaterSensor.class).print();

        streamEnv.execute();
    }
}