Spark Streaming的核心功能及其示例PySpark代码

server/2025/1/19 7:09:22/

Spark Streaming是Apache Spark中用于实时流数据处理的模块。以下是一些常见功能的实用PySpark代码示例:

  1. 基础流处理:从TCP套接字读取数据并统计单词数量
from pyspark import `SparkContext
from pyspark.streaming import StreamingContext# 创建SparkContext和StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)  # 1秒的批处理间隔# 创建一个DStream,从TCP源读取数据
lines = ssc.socketTextStream("localhost", 9999)# 对每一行数据进行分词,映射为(word, 1)的键值对,然后按单词统计数量
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)# 打印每个RDD中的前10个元素
word_counts.pprint()# 启动流计算
ssc.start()
# 等待流计算结束
ssc.awaitTermination()

在上述代码中:

  • sc 是 SparkContext ,用于与Spark集群交互。
  • ssc 是 StreamingContext ,定义了批处理间隔。
  • lines 是一个 DStream ,从指定的TCP套接字读取数据。
  • words 对每行数据进行分词, word_counts 统计每个单词出现的次数。
  • pprint 方法打印每个批次的前10个元素。
  1. 使用窗口函数
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext("local[2]", "WindowedWordCount")
ssc = StreamingContext(sc, 1)lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))# 使用窗口函数,窗口大小为3秒,滑动间隔为1秒
windowed_word_counts = word_counts.reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 3, 1)windowed_word_counts.pprint()ssc.start()
ssc.awaitTermination()

在这个示例中:

  • reduceByKeyAndWindow 方法用于在窗口上进行聚合操作。
  • 第一个参数是用于合并窗口内元素的函数,第二个参数是用于移除窗口外元素的函数。
  1. 状态更新
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext("local[2]", "StatefulWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")  # 启用检查点def updateFunction(new_values, running_count):if running_count is None:running_count = 0return sum(new_values, running_count)lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))# 使用updateStateByKey进行状态更新
stateful_word_counts = word_counts.updateStateByKey(updateFunction)stateful_word_counts.pprint()ssc.start()
ssc.awaitTermination()

在上述代码中:

  • updateStateByKey 方法用于维护每个键的状态。
  • updateFunction 定义了如何根据新值和现有状态更新状态。
  1. 与Kafka集成
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtilssc = SparkContext("local[2]", "KafkaWordCount")
ssc = StreamingContext(sc, 1)# Kafka参数
kafkaParams = {"metadata.broker.list": "localhost:9092"}
topics = ["test"]# 创建Kafka输入DStream
kvs = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)
lines = kvs.map(lambda x: x[1])words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)word_counts.pprint()ssc.start()
ssc.awaitTermination()

在这个示例中:

  • KafkaUtils.createDirectStream 用于从Kafka主题读取数据。
  • kvs 是一个包含Kafka消息的DStream, lines 提取消息内容。

http://www.ppmy.cn/server/159565.html

相关文章

windows蓝牙驱动开发-BLE音频(二)

详细设计 音频格式要求 音频帧持续时间 蓝牙 LE 音频配置文件允许实现支持音频帧持续时间为 7.5 毫秒或 10 毫秒的音频流式处理。 Windows 要求 IHV 提供的编解码器支持这两个帧持续时间,以确保与蓝牙 LE 音频配件设备的互操作性,并与连接到系统的其他…

【Rust的2种线程锁 阻塞 vs 挂起】

async_std::sync::Mutex 和 std::sync::Mutex 之间的主要区别在于它们如何处理线程阻塞和异步编程模型。以下是两者的关键差异: 标准库的 Mutex (std::sync::Mutex) 同步阻塞:当一个线程尝试获取 std::sync::Mutex 的锁时,如果锁已经被其他线…

数字化时代,传统代理模式的变革之路

在数字化飞速发展的今天,线上线下融合(O2O)成了商业领域的大趋势。这股潮流,正猛烈冲击着传统代理模式,给它带来了新的改变。 咱们先看看线上线下融合现在啥情况。线上渠道那是越来越多,企业纷纷在电商平台…

一文读懂单片机的串口

目录 串口通信的基本概念 串口通信的关键参数 单片机串口的硬件连接 单片机串口的工作原理 数据发送过程 数据接收过程 单片机串口的编程实现 以51单片机为例 硬件连接 初始化串口 发送数据 接收数据 串口中断服务函数 代码示例 单片机串口的应用实例 单片机与…

第十二章:算法与程序设计

文章目录: 一:基本概念 1.算法与程序 1.1 算法 1.2 程序 2.编译预处理 3.面向对象技术 4.程序设计方法 5.SOP标志作业流程 6.工具 6.1 自然语言 6.2 流程图 6.3 N/S图 6.4 伪代码 6.5 计算机语言 二:程序设计 基础 1.常数 …

百度热力图数据原理,处理及论文应用9

目录 0、数据简介0、示例数据1、百度热力图数据日期如何选择1.1、其他实验数据的时间1.2、看日历1.3、看天气 2、百度热力图几天够研究?部分文章统计3、数据原理3.1.1 ** 这个比较重要,后面还会再次出现。核密度的值怎么理解?**3.1.2 Csv->…

Asp.Net Core 8.0 使用 Serilog 按日志级别写入日志文件的两种方式

1、所需的Nuget包 本文项目的版本是.NET 8.0,如果使用其它版本安装适配版本即可。 Serilog.AspNetCore(8.0.2) Serilog.Sinks.File(5.0.0) Serilog.Expressions(5.0.0) 2、两种配置方式 2.1 代码形式(Program.cs) 在Program.cs文件中&am…

C++/C语言判断重复数组(zznu)⭐

问题描述 如果一个数组中不包含重复的元素&#xff0c;那么我们称这个数组是独ONE无TWO的数组。给定一个数组&#xff0c;请你判断这个数组是否是独ONE无TWO的。 输入 首先输入一个正整数n表示数组的长度&#xff08;0<n<100&#xff09;。 接下来输入n个整数&#xff08…