大数据应用开发——实时数据处理(一)

ops/2024/11/17 21:43:48/

前言

大数据应用开发——实时数据采集

大数据应用开发——实时数据处理

        Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中

        并在HBase中进行备份

大数据应用开发——数据可视化

hadoop,zookeeper,kafka,flink要开启

目录

        题目

        Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中


题目

按照任务书要求使用Java语言基于Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中,并在HBase中进行备份同时建立Hive外表,基于Flink完成相关的数据指标计算并将计算结果存入Redis、ClickHouse中

Flink完成Kafka中的数据消费,将数据分发至Kafka的dwd层中

在IDEA下用maven创建flink项目:

# 用cmd执行,创建在当前目录下
# java版本
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=flink版本号# scala版本
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=flink版本号

修改pox.xml文件,将flink-connector-kafka_...依赖移出来

 demo包下有两个.java

PS:一个用于批处理,另一个用于流处理

public class StreamingJob {public static void main(String[] args) throws Exception {// set up the streaming execution environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置发送的KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("master:9092").setTopics("order").setGroupId("my_group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();// 配置接收的KafkaSink<String> sink = KafkaSink.<String>builder().setBootstrapServers("master:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("dwd_order").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.NONE).build();// 指定的源创建一个数据流DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 将数据里的'符号去掉DataStream<String> text = stream.map(new MapFunction<String, String>() {@Overridepublic String map(String s) throws Exception {return s.replace("'","");}});// 打印处理结果到控制台text.print();// 发送text.sinkTo(sink);// execute programenv.execute("Flink Streaming Java API Skeleton");}
}

将代码打包成.jar,可以先clean,再package

生成位置在当前项目位置/target/项目名称-...jar

 放进主节点

# /usr/flink/bin/flink run -c 包名.运行class名 放在主节点的位置
/usr/flink/bin/flink run -c demo.StreamingJob /opt/flink-java-1.0-SNAPSHOT.jar

最后,可以用flink控制台或kafka-console-consumer.sh查看 


http://www.ppmy.cn/ops/134540.html

相关文章

SpringCloud Feign 报错 Request method ‘POST‘ not supported 的解决办法

Request method POST not supportedorg.springframework.web.HttpRequestMethodNotSupportedException: Request method POST not supported解决办法: 在远程调用fegin使用GET请求时 应该附加注解 RequestParam(“pgQuery”) 实体类或者单个参数同样适用 在controller接受参数…

处理继承自QWidget类的自定义类背景样式不生效问题【Qt】

处理继承自QWidget类的自定义类背景样式不生效问题 问题解答 问题 问题抛出&#xff1a;   当我们定义一个自定义类&#xff0c;并且继承自QWidget类&#xff1a;   为我们的自定义类进行构造&#xff1a;   这是运行后的表现&#xff0c;其中每一份测试人物&#xff…

富格林:正确应付阻挠虚假交易

富格林指出&#xff0c;投资者进入现货黄金市场的第一选择&#xff0c;应该是要学会正确阻挠虚假交易应对市场风险。市场千变万化&#xff0c;投资风险也随之而来&#xff0c;几乎每天都会有数据或消息公布&#xff0c;这也就使得该市场变得十分活跃。投资者要想正确应付阻挠虚…

批量重命名Excel文件并排序

批量重命名Excel文件并排序 python环境&#xff1a;3.5.2 import os import logging# 配置日志记录 logging.basicConfig(levellogging.INFO, format%(asctime)s - %(levelname)s - %(message)s)def rename_files_with_sequence(directory):# 检查文件夹是否存在if not os.pa…

【python】掌握 Flask:轻量级 Web 开发框架解析

【Python】掌握 Flask&#xff1a;轻量级 Web 开发框架解析 引言 Flask 是一个轻量级、灵活且广受欢迎的 Python Web 开发框架。它以其简单易用、模块化和强大的扩展功能而闻名&#xff0c;适合用于小型应用和快速原型设计。同时&#xff0c;Flask 提供了一系列工具和库&…

【小白可懂】微信小程序---课表渲染

结果展示&#xff1a;&#xff08;代码在最后&#xff09; WeChat_20241116174431 项目简介 在数字化校园建设的大背景下&#xff0c;为了更好地服务于在校师生&#xff0c;我们开发了一款基于微信小程序的课表管理系统。该系统采用了现代化的前端技术和优雅的设计风格&#x…

6. Keepalived配置Nginx自动重启,实现7x24提供服务

一. Keepalived配置Nginx自动重启,实现7x24提供服务 1.编写不停的检查nginx服务器状态,停止并重启,重启失败后则停止keepalived脚本 cd /etc/keepalived/ vim check_nginx_alive_or_not.sh #---内容如下:--------------- #!/bin/bash A=`ps -C nginx --no-header |wc -l

23 种设计模式详解

设计模式的分类 总体来说设计模式分为三大类&#xff1a; 创建型模式&#xff0c;共五种&#xff1a;单例模式、工厂方法模式、抽象工厂模式、建造者模式、原型模式。 结构型模式&#xff0c;共七种&#xff1a;适配器模式、装饰器模式、代理模式、外观模式、桥接模式、 组合模…