spark stream:从Kafka中读取数据

news/2024/12/22 13:12:43/

一、添加依赖

ThisBuild / version := "0.1.0-SNAPSHOT"ThisBuild / scalaVersion := "2.12.12"
libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.12" % "3.0.0")
libraryDependencies ++= Seq( "org.apache.spark" % "spark-sql_2.12" % "3.0.0")
libraryDependencies ++= Seq( "org.apache.spark" % "spark-streaming_2.12" % "3.0.0")
libraryDependencies ++= Seq( "org.apache.spark" % "spark-streaming-kafka-0-10_2.12" % "3.0.0")
libraryDependencies ++= Seq( "com.fasterxml.jackson.core" % "jackson-core" % "2.10.1")libraryDependencies ++= Seq( "mysql" % "mysql-connector-java" % "5.1.30")lazy val root = (project in file(".")).settings(name := "scala-proj")

二、demo程序

package example3import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}import scala.collection.mutable
import scala.util.Randomobject HelloStreaming04 {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaConsumer")val ssc = new StreamingContext(sparkConf, Seconds(5))val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "xx.xx.xx.xx:9092", ConsumerConfig.GROUP_ID_CONFIG -> "myGroup","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")val kafkaDataDs: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("myTopic"), kafkaPara))kafkaDataDs.map(_.value()).print()ssc.start();ssc.awaitTermination();}}

http://www.ppmy.cn/news/1364602.html

相关文章

Bootloader开发

Bootloader开发 Bootloader开发目标 1) 要求Bootloader程序在升级过程中不被擦除,保证即使升级失败也能进入下次升级流程; 2) 上电后先进入Bootloader程序,在程序接收并存储完毕后进入main()函数; 3) 要求Bootloader程序可以作为CCS应用程序工程的一部分,而不需要将B…

Maven编译报processing instruction can not have PITarget with reserveld xml name

在java项目中,平时我们会执行mvn clean package命令来编译我们的java项目,可是博主今天执行编译时突然报了 processing instruction can not have PITarget with reserveld xml name 这个错,网上也说法不一,但是绝大绝大部分是因…

vue3的echarts从后端获取数据,用于绘制图表

场景需求:后端采用flask通过pymysql从数据库获取数据,并返回给前端。前端vue3利用axios获取数据并运用到echarts绘制图表。 第一步,vue中引入echarts 首先vue下载echarts npm install echarts 然后在main.js文件写如下代码 import {create…

【Nginx笔记02】通过Nginx服务器转发客户端的WebSocket接口到后端服务

这篇文章,主要介绍如何通过Nginx服务器转发客户端的WebSocket接口到后端服务【知识星球】。 目录 一、Nginx配置WebSocket 1.1、Nginx配置内容 1.2、客户端请求地址 1.3、创建WebSocket测试工程 1.4、启动测试 1.5、WebSocket超时问题 1.5.1、设置超时时间 …

日本韩国媒体宣发稿渠道平台怎么找?跨境出海推广新闻报道营销公司告诉你

【本篇由言同数字科技有限公司原创】随着全球化和互联网的快速发展,品牌出海已经成为众多企业的共同目标。在这个过程中,通过在日本和韩国的媒体上发表文章,可以带来许多重要的意义和益处。在本文中,我们将探讨一下这些意义。 首…

新学期新小艺,华为nova 12 Pro 你的学习好搭子上线啦~

又是一年返校季,全国的莘莘学子开启了自己的新学期,伴随着人工智能快速发展,越来越多的功能正在改变我们的学习和生活。接入了盘古大模型的小艺,以其卓越的智能功能,为我们的生活带来了前所未有的便捷。深受学生党们喜…

浅谈 Linux 网络编程 - 网络字节序

文章目录 前言核心知识关于 小端法关于 大端法网络字节序的转换 函数 前言 在进行 socket 网络编程时,会用到字节流的转换函数、例如 inet_pton、htons 等,那么为什么要用到这些函数呢,本篇主要就是对这部分进行介绍。 核心知识 重点需要记…

LDR6020双盲插音频随便插充电听歌随便插

随着智能手机的普及和功能的日益丰富,手机已经成为我们日常生活中不可或缺的一部分。音乐、电影、游戏等娱乐内容更是丰富了手机的使用体验。而在这其中,音频转接器的作用愈发凸显,特别是在边听边充的场景下,一款高效且便捷的手机…