前言
大数据应用开发——实时数据采集
大数据应用开发——实时数据处理
Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中
并在HBase中进行备份
大数据应用开发——数据可视化
hadoop,zookeeper,kafka,flink要开启
目录
题目
Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中
题目
按照任务书要求使用Java语言基于Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中,并在HBase中进行备份同时建立Hive外表,基于Flink完成相关的数据指标计算并将计算结果存入Redis、ClickHouse中
Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中
在IDEA下用maven创建flink项目:
# 用cmd执行,创建在当前目录下
# java版本
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=flink版本号# scala版本
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=flink版本号
修改pox.xml文件,将flink-connector-kafka_...依赖移出来
demo包下有两个.java
PS:一个用于批处理,另一个用于流处理
public class StreamingJob {public static void main(String[] args) throws Exception {// set up the streaming execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置发送的KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("master:9092").setTopics("order").setGroupId("my_group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();// 配置接收的KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("master:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("dwd_order").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.NONE).build();// 指定的源创建一个数据流DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 将数据里的'符号去掉DataStream<String> text = stream.map(new MapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {return s.replace("'","");}});// 打印处理结果到控制台text.print();// 发送text.sinkTo(sink);// execute programenv.execute("Flink Streaming Java API Skeleton");}
}
将代码打包成.jar,可以先clean,再package
生成位置在当前项目位置/target/项目名称-...jar
放进主节点
# /usr/flink/bin/flink run -c 包名.运行class名 放在主节点的位置
/usr/flink/bin/flink run -c demo.StreamingJob /opt/flink-java-1.0-SNAPSHOT.jar
最后,可以用flink控制台或kafka-console-consumer.sh查看