大数据应用开发——实时数据处理(一)

server/2024/11/14 20:00:19/

前言

大数据应用开发——实时数据采集

大数据应用开发——实时数据处理

        Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中

        并在HBase中进行备份

大数据应用开发——数据可视化

hadoop,zookeeper,kafka,flink要开启

目录

        题目

        Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中


题目

按照任务书要求使用Java语言基于Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中,并在HBase中进行备份同时建立Hive外表,基于Flink完成相关的数据指标计算并将计算结果存入Redis、ClickHouse中

Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中

在IDEA下用maven创建flink项目:

# 用cmd执行,创建在当前目录下
# java版本
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=flink版本号# scala版本
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=flink版本号

修改pox.xml文件,将flink-connector-kafka_...依赖移出来

 demo包下有两个.java

PS:一个用于批处理,另一个用于流处理

public class StreamingJob {public static void main(String[] args) throws Exception {// set up the streaming execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置发送的KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("master:9092").setTopics("order").setGroupId("my_group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();// 配置接收的KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("master:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("dwd_order").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.NONE).build();// 指定的源创建一个数据流DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 将数据里的'符号去掉DataStream<String> text = stream.map(new MapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {return s.replace("'","");}});// 打印处理结果到控制台text.print();// 发送text.sinkTo(sink);// execute programenv.execute("Flink Streaming Java API Skeleton");}
}

将代码打包成.jar,可以先clean,再package

生成位置在当前项目位置/target/项目名称-...jar

 放进主节点

# /usr/flink/bin/flink run -c 包名.运行class名 放在主节点的位置
/usr/flink/bin/flink run -c demo.StreamingJob /opt/flink-java-1.0-SNAPSHOT.jar

最后,可以用flink控制台或kafka-console-consumer.sh查看 


http://www.ppmy.cn/server/141939.html

相关文章

Chrome自动打开hao123解决

两个检查 检查是否主页被篡改 进入谷歌浏览器的设置&#xff0c;查看起始界面选项&#xff0c;看其是否被设置成hao123 如果有删掉它&#xff0c;改为自己想设置的主页或者关闭该选项。 检查是否快捷方式被动了手脚 如果在快捷方式目标位置后指定网址&#xff0c;会在打开浏…

pdf转excel;pdf中表格提取

一、问题描述 在工作中或多或少会遇到&#xff1a;需要将某份pdf中的表格数据提取出来&#xff0c;以便能够“修改使用”数据 可将pdf中的表格提取出来&#xff0c;解决办法还有点复杂 尤其涉及“pdf中表格不是标准的单元格”的时候&#xff0c;提取数据到excel不太容易 比…

效率工具-tig的使用

1、tig的安装 apt-get install tig 或者 brew install tig2、常用指令 2.1 tig 进入tig界面 在git 的repository 输入tig 进入tig界面 2.2 【 r 】进入 refs view 模式&#xff0c;查看所有分支&#xff0c;使用 【 j/k 】上下切换&#xff0c; 【 Enter 】查看分支演化 2.2…

HTML之列表学习记录

练习题&#xff1a; 图所示为一个问卷调查网页&#xff0c;请制作出来。要求&#xff1a;大标题用h1标签&#xff1b;小题目用h3标签&#xff1b;前两个问题使用有序列表&#xff1b;最后一个问题使用无序列表。 代码&#xff1a; <!DOCTYPE html> <html> <he…

自动化测试工具Ranorex Studio(三十二)-阅读RANOREX报告

在Ranorex Studio执行测试套件后&#xff0c;生成的报告文件以文件视图打开&#xff0c;如下图所示。 运行测试套件后的Ranorex报告 该报告在运行测试套件后&#xff0c;提供了一个通用的概述&#xff0c;多少测试用例执行成功&#xff0c;失败或被中断。 每个执行的测试用例及…

【网络安全】记一次APP登录爆破

使用工具 安卓12 jadx-gui 抓取登录HTTP请求包 安装burp证书&#xff0c;并抓取登录请求。 POST /loginUser HTTP/1.1 Host: api.xxxx.xxxxx.comapiaccountvrpuc-aaf91f835147ce2d01216bd3bd5c3516&phonexxxx&sign72C132B392873B3F4F6C0872E5EC4B5A&encM%2F8h…

单片机中的BootLoader(重要的概念讲解)

文章目录 一、链接地址和执行地址1. 链接地址(Load Address)2. 执行地址(Execution Address)链接地址与执行地址的关系实际工作流程总结二、相对跳转和绝对跳转1. 相对跳转(Relative Jump)2. 绝对跳转(Absolute Jump)3. `BX` 和 `BL` 指令总结三、散列文件1. 散列文件的…

【论文阅读】小样本学习相关研究

相关文献 Generalizing from a Few Examples: A Survey on Few-Shot Learning Author: YAQING WANG、QUANMING YAO、JAMES T. KWOK、LIONEL M. NIAbstract: Artificial intelligence succeeds in data-intensive applications, but it lacks the ability of learning from a …