基于Spark3.4.4开发StructuredStreaming读取socket数据

news/2024/11/24 16:22:39/

maven依赖文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.lh.pblh123</groupId><artifactId>spark2024</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><!--    设置国内maven下载镜像源--><repositories><repository><id>alimaven</id><name>aliyun maven</name><url>https://maven.aliyun.com/repository/public</url></repository></repositories><dependencies><dependency> <!-- Spark dependency --><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.4.4</version><exclusions>  <!--设置日志级别--><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency> <!-- Spark dependency --><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.4.4</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>3.4.4</version> <!-- 请根据实际版本调整 --></dependency><!--        添加spark streaming依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.4.4</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><plugins><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal></goals></execution></executions><configuration><scalaVersion>2.12.17</scalaVersion><args><arg>-target:jvm-1.8</arg></args></configuration></plugin></plugins></build></project>

源码如下:

package cn.lh.pblh123.spark2024.theorycourse.charpter8import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQueryException, Trigger}object StructureNetworkWordCount {def main(args: Array[String]): Unit = {if (args.length != 1 || args(0).trim.isEmpty) {System.err.println(s"Usage: ${this.getClass.getSimpleName} <master_url>")System.exit(5)}val murl = args(0)val spark = SparkSession.builder().appName(s"${this.getClass.getSimpleName}").master(murl).getOrCreate()// 从配置文件或环境变量中读取主机名和端口号val host = sys.env.getOrElse("SOCKET_HOST", "localhost")val port = sys.env.getOrElse("SOCKET_PORT", "9999").toInttry {val lines = readSocketStream(spark, host, port)import spark.implicits._// 导入Spark隐式转换,使得可以使用Spark SQL和Dataset相关操作val words = lines.as[String].flatMap(_.split(" "))val wordCounts = words.groupBy("value").count()val query = wordCounts.writeStream.outputMode("complete").format("console").trigger(Trigger.ProcessingTime("5 seconds")).start()query.awaitTermination()} catch {case e: StreamingQueryException =>println(s"Streaming query failed with exception: ${e.getMessage}")// 可以在这里添加更多的错误处理逻辑,例如重试机制case e: Exception =>println(s"An unexpected error occurred: ${e.getMessage}")// 可以在这里添加更多的错误处理逻辑}spark.stop()}/*** 读取 Socket 流数据** @param spark SparkSession 实例* @param host  主机名* @param port  端口号* @return 读取的 DataFrame*/def readSocketStream(spark: SparkSession, host: String, port: Int): org.apache.spark.sql.DataFrame = {// 读取来自指定主机和端口的socket数据流spark.readStream.format("socket").options(Map("host" -> host, "port" -> port.toString)).load()}//    待优化代码如下//    val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()}

终端启动nc服务

(base) pblh123@LeginR7:~$ nc -lk 9999
i like hadoop
i like spark
你好世界 你

代码运行效果如下,需要先启动nc服务后在启动


http://www.ppmy.cn/news/1549584.html

相关文章

深度学习day1-Tensor 1

深度学习 一 初识Torch 1基础介绍 PyTorch是一个基于Python的深度学习框架&#xff0c;最初由Facebook开发&#xff0c;广泛用于计算机视觉、自然语言处理、语音识别等领域。用张量&#xff08;tensor&#xff09;来表示数据&#xff0c;可以在GPU上加速&#xff0c;处理大规…

软件工程9、10章小测

一 单项选择题(8分) 1、软件体系结构定义为&#xff08;&#xff09;&#xff08;1分&#xff09; {component, connector, configuration} {models, connector} {object, collaboration, message, } 正确答案&#xff1a;{component, connector, configuration} 2、软件产品…

(长期更新)《零基础入门 ArcGIS(ArcMap) 》实验一(下)----空间数据的编辑与处理(超超超详细!!!)

续上篇博客&#xff08;长期更新&#xff09;《零基础入门 ArcGIS(ArcMap) 》实验一&#xff08;上&#xff09;----空间数据的编辑与处理&#xff08;超超超详细&#xff01;&#xff01;&#xff01;&#xff09;-CSDN博客 继续更新 目录 什么是拓扑&#xff1f; 1.3.5道路…

【深度学习之回归预测篇】 深度极限学习机DELM多特征回归拟合预测(Matlab源代码)

深度极限学习机 (DELM) 作为一种新型的深度学习算法&#xff0c;凭借其独特的结构和训练方式&#xff0c;在诸多领域展现出优异的性能。本文将重点探讨DELM在多输入单输出 (MISO) 场景下的应用&#xff0c;深入分析其算法原理、性能特点以及未来发展前景。 1、 DELM算法原理及其…

《用 Pygame 制作浪漫的告白气球》

《用 Pygame 制作浪漫的告白气球》 表白是一件浪漫且需要仪式感的事情&#xff01;如果你是一位程序员&#xff0c;为什么不亲手为特别的人制作一个专属的告白动画呢&#xff1f;今天&#xff0c;我将带你用 Python 的 Pygame 库制作一个简单且充满浪漫气息的 “告白气球”动画…

MySql四种事务隔离级别以及使用什么隔离级别可以解决幻读

一、四种事务隔离级别 InnoDB 存储引擎支持四种事务隔离级别&#xff0c;每种级别的主要目的是为了在多用户环境中保证数据的一致性和并发性。以下是这四种隔离级别的名称及其之间的主要区别&#xff1a; 1. 读未提交 定义&#xff1a;这是最低的隔离级别&#xff0c;在这种模…

Halo 正式开源: 使用可穿戴设备进行开源健康追踪

在飞速发展的可穿戴技术领域&#xff0c;我们正处于一个十字路口——市场上充斥着各式时尚、功能丰富的设备&#xff0c;声称能够彻底改变我们对健康和健身的方式。 然而&#xff0c;在这些光鲜的外观和营销宣传背后&#xff0c;隐藏着一个令人担忧的现实&#xff1a;大多数这些…

如何在Word文件中设置水印以及如何禁止修改水印

在日常办公和学习中&#xff0c;我们经常需要在Word文档中设置水印&#xff0c;以保护文件的版权或标明文件的机密性。水印可以是文字形式&#xff0c;也可以是图片形式&#xff0c;能够灵活地适应不同的需求。但仅仅设置水印是不够的&#xff0c;有时我们还需要确保水印不被随…