StructuredStreamingKafka中的实时ETL案例及常见问题

devtools/2024/11/19 23:39:22/

实时ETL

在 java 项目中,导入 jar 包:

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.50</version></dependency>
package com.bigdata.moni;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@NoArgsConstructor
@AllArgsConstructor
public class StationLog {private String stationId;private String callOut;private String callIn;private String callStatus;private long callTime;private int duration;}
package com.bigdata.smartedu;import com.alibaba.fastjson.JSON;
import com.bigdata.moni.StationLog;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.Random;public class SendStationLogProducer {public static void main(String[] args) throws InterruptedException {Properties properties = new Properties();properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");String[] arr = {"fail", "busy", "barring", "success", "success", "success","success", "success", "success", "success", "success", "success"};Random random = new Random();KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);while(true){String callOut = "1860000"+ String.format("%04d",random.nextInt(10000));String callIn = "1890000"+ String.format("%04d",random.nextInt(10000));String callStatus = arr[random.nextInt(arr.length)];int callDuration = "success".equals(callStatus) ? (1 + random.nextInt(10)) * 1000 : 0;// 随机产生一条基站日志数据StationLog stationLog = new StationLog("station_" + random.nextInt(10),callOut,callIn,callStatus,System.currentTimeMillis(),callDuration);// 将一个对象变为jsonString jsonString = JSON.toJSONString(stationLog);System.out.println(jsonString);ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("topicA",jsonString);kafkaProducer.send(producerRecord);Thread.sleep(2000);}//kafkaProducer.close();}
}

也可以 python 发送 kafka 数据(将以上 java 代码替换为 python 代码)

首先安装 kafka 环境:

pip install kafka-python

接着编写代码:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import random
import timeclass StationLog:def __init__(self, station_id, call_out, call_in, call_status, timestamp, call_duration):self.station_id = station_idself.call_out = call_outself.call_in = call_inself.call_status = call_statusself.timestamp = timestampself.call_duration = call_durationdef to_string(self):return json.dumps(self.__dict__)def main():# 设置连接kafka集群的ip和端口producer = KafkaProducer(bootstrap_servers='bigdata01:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))arr = ["fail", "busy", "barring", "success", "success", "success", "success", "success", "success", "success", "success", "success"]while True:call_out = "1860000" + str(random.randint(0, 9999)).zfill(4)call_in = "1890000" + str(random.randint(0, 9999)).zfill(4)call_status = random.choice(arr)call_duration = 1000 * (10 + random.randint(0, 9)) if call_status == "success" else 0# 随机产生一条基站日志数据station_log = StationLog("station_" + str(random.randint(0, 9)),call_out,call_in,call_status,int(time.time() * 1000),  # 当前时间戳call_duration)print(station_log.to_string())time.sleep(0.1 + random.randint(0, 99) / 100)try:# 发送数据到Kafkaproducer.send('topicA', station_log.to_string())except KafkaError as e:print(f"Failed to send message: {e}")# 确保所有异步消息都被发送producer.flush()if __name__ == "__main__":main()

可以使用本地的 kafka-ui 的工具进行消费,查看是否可以正常发送和接收消息:

解压kafka-ui安装包,双击打开bin目录下的kafkaUI.bat(注意:一直保持打开的状态,不要关掉)

通过端口http://localhost:8889/#/进入ui界面

 

接着编写 pyspark 中的 StructStreaming 代码:

import osfrom pyspark.sql import SparkSession
import pyspark.sql.functions as Fif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'spark = SparkSession.builder.master("local[2]").appName("streamingkafka").config("spark.sql.shuffle.partitions", 2).getOrCreate()readDf = spark.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "bigdata01:9092") \.option("subscribe", "topicA") \.load()#etlDf = readDf.selectExpr("CAST(value AS STRING)").filter(F.col("value").contains("success"))readDf.createOrReplaceTempView("temp_donghu")etlDf = spark.sql("""select cast(value as string) from temp_donghu where cast(value as string) like '%success%'""")etlDf.writeStream \.format("kafka") \.option("kafka.bootstrap.servers", "bigdata01:9092") \.option("topic", "etlTopic") \.option("checkpointLocation", "./ckp") \.start().awaitTermination()spark.stop()

cast 函数可以将一个 byte[] 结构的字符串变为一个普通的字符串。

报错 : org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

解决:这个是因为缺少了Kafka和Spark的集成包,前往https://mvnrepository.com/artifact/org.apache.spark

下载对应的jar包即可,比如我是SparkSql写入的Kafka,那么我就需要下载Spark-Sql-Kafka.x.x.x.jar

接着运行又报错:

Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializerat java.net.URLClassLoader.findClass(URLClassLoader.java:382)at java.lang.ClassLoader.loadClass(ClassLoader.java:418)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)at java.lang.ClassLoader.loadClass(ClassLoader.java:351)... 21 more

原因是缺少 kafka-clients.jar 包:

遇到如下错误:

Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdaterat java.net.URLClassLoader.findClass(URLClassLoader.java:382)at java.lang.ClassLoader.loadClass(ClassLoader.java:418)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)at java.lang.ClassLoader.loadClass(ClassLoader.java:351)... 44 more

在Spark 3.0.0环境下运行StructuredStreaming程序时遇到版本不兼容错误,需要额外添加commons-pools2和spark-token-provider-kafka jar包

将这些 jar 包都下载下来,放入 pyspark 中的 jars 目录下,代码即可运行

情况解决!!!


http://www.ppmy.cn/devtools/135324.html

相关文章

富格林:正确应付阻挠虚假交易

富格林指出&#xff0c;投资者进入现货黄金市场的第一选择&#xff0c;应该是要学会正确阻挠虚假交易应对市场风险。市场千变万化&#xff0c;投资风险也随之而来&#xff0c;几乎每天都会有数据或消息公布&#xff0c;这也就使得该市场变得十分活跃。投资者要想正确应付阻挠虚…

【jvm】为什么要用元空间替代永久代

目录 1. 说明2. 永久代的限制与问题2.1 内存管理限制2.2 垃圾收集效率2.3 类的卸载问题 3. 元空间的优势 1. 说明 1.Java使用元空间替代永久代&#xff0c;这一变化主要源于永久代在实现上存在的限制和问题&#xff0c;以及元空间所提供的更优性能和更高灵活性。2.Java使用元空…

ISUP协议视频平台EasyCVR私有化部署视频平台如何实现RTMP推流将大疆无人机的视频画面回传?

在现代视频监控和流媒体技术领域&#xff0c;EasyCVR视频融合云平台以其卓越的性能和灵活性&#xff0c;成为了跨区域、网络化视频监控综合管理的理想选择。作为TSINGSEE青犀视频“云边端”架构体系中的核心组件&#xff0c;私有化部署视频平台EasyCVR不仅能够实现视频数据的集…

基于Java Springboot宠物救助管理系统

一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 数据…

Android 开发与救砖工具介绍

Android 开发与救砖工具介绍 在 Android 开发和设备维护中&#xff0c;fastboot、adb 和 9008 模式是三个非常重要的工具和模式。它们各自有不同的用途和操作方式&#xff0c;对于开发者和技术支持人员来说&#xff0c;了解它们的功能和使用方法是必不可少的。 1. Fastboot …

Springboot采用jasypt加密配置

目录 前言 一、Jasypt简介 二、运用场景 三、整合Jasypt 2.1.环境配置 2.2.添加依赖 2.3.添加Jasypt配置 2.4.编写加/解密工具类 2.5.自定义加密属性前缀和后缀 2.6.防止密码泄露措施 2.61.自定义加密器 2.6.2通过环境变量指定加密盐值 总结 前言 在以往的多数项目中&#xff0…

沃丰科技呼叫中心质检:定义、重要性及选择策略

一、引言 随着客户服务行业的不断发展&#xff0c;呼叫中心成为了企业与客户之间沟通的重要桥梁。而呼叫中心质检&#xff0c;作为保障服务质量的关键环节&#xff0c;越来越受到企业的重视。本文将深入探讨呼叫中心质检的定义、重要性&#xff0c;以及如何选择适合企业需求的…

React 教程第一节 简介概述 以及 特点

概述&#xff1a; 一个用于构建web与原生交互界面的UI库&#xff0c;无论是独立开发者&#xff0c;还是团队协作&#xff0c;React 都可以轻松的组合由不同人群开发的组件&#xff0c;随写随插随用&#xff0c;方便快捷&#xff1b; 特点&#xff1a; 1、声明式设计&#xf…