Spark Streaming的核心功能及其示例PySpark代码

news/2025/1/19 16:42:06/

Spark Streaming是Apache Spark中用于实时流数据处理的模块。以下是一些常见功能的实用PySpark代码示例:

  1. 基础流处理:从TCP套接字读取数据并统计单词数量
from pyspark import `SparkContext
from pyspark.streaming import StreamingContext# 创建SparkContext和StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)  # 1秒的批处理间隔# 创建一个DStream,从TCP源读取数据
lines = ssc.socketTextStream("localhost", 9999)# 对每一行数据进行分词,映射为(word, 1)的键值对,然后按单词统计数量
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)# 打印每个RDD中的前10个元素
word_counts.pprint()# 启动流计算
ssc.start()
# 等待流计算结束
ssc.awaitTermination()

在上述代码中:

  • sc 是 SparkContext ,用于与Spark集群交互。
  • ssc 是 StreamingContext ,定义了批处理间隔。
  • lines 是一个 DStream ,从指定的TCP套接字读取数据。
  • words 对每行数据进行分词, word_counts 统计每个单词出现的次数。
  • pprint 方法打印每个批次的前10个元素。
  1. 使用窗口函数
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext("local[2]", "WindowedWordCount")
ssc = StreamingContext(sc, 1)lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))# 使用窗口函数,窗口大小为3秒,滑动间隔为1秒
windowed_word_counts = word_counts.reduceByKeyAndWindow(lambda a, b: a + b, lambda a, b: a - b, 3, 1)windowed_word_counts.pprint()ssc.start()
ssc.awaitTermination()

在这个示例中:

  • reduceByKeyAndWindow 方法用于在窗口上进行聚合操作。
  • 第一个参数是用于合并窗口内元素的函数,第二个参数是用于移除窗口外元素的函数。
  1. 状态更新
from pyspark import SparkContext
from pyspark.streaming import StreamingContextsc = SparkContext("local[2]", "StatefulWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")  # 启用检查点def updateFunction(new_values, running_count):if running_count is None:running_count = 0return sum(new_values, running_count)lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1))# 使用updateStateByKey进行状态更新
stateful_word_counts = word_counts.updateStateByKey(updateFunction)stateful_word_counts.pprint()ssc.start()
ssc.awaitTermination()

在上述代码中:

  • updateStateByKey 方法用于维护每个键的状态。
  • updateFunction 定义了如何根据新值和现有状态更新状态。
  1. 与Kafka集成
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtilssc = SparkContext("local[2]", "KafkaWordCount")
ssc = StreamingContext(sc, 1)# Kafka参数
kafkaParams = {"metadata.broker.list": "localhost:9092"}
topics = ["test"]# 创建Kafka输入DStream
kvs = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)
lines = kvs.map(lambda x: x[1])words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)word_counts.pprint()ssc.start()
ssc.awaitTermination()

在这个示例中:

  • KafkaUtils.createDirectStream 用于从Kafka主题读取数据。
  • kvs 是一个包含Kafka消息的DStream, lines 提取消息内容。

http://www.ppmy.cn/news/1564458.html

相关文章

Docker 搭建mysql 连接超时问题,xxl-job启动mysql连接报错,禁用dns

1.本地连接Navicat报错信息,猜测是navicat默认连接超时导致的,后面换成idea一个插件虽然慢但连接上了 2013 - Lost connection to MySQL server at reading initial communication packet 2.启动xxl-job会报错,网上有人mysql驱动与数据库不匹…

PyTorch使用教程(6)一文讲清楚torch.nn和torch.nn.functional的区别

torch.nn 和 torch.nn.functional 在 PyTorch 中都是用于构建神经网络的重要组件,但它们在设计理念、使用方式和功能上存在一些显著的区别。以下是关于这两个模块的详细区别: 1. 继承方式与结构 torch.nn torch.nn 中的模块大多数是通过继承 torch.nn…

使用jupyter notebook没有正常打开浏览器的几种情况解决

迅速记录前期 1.下载 https://www.anaconda.com/products/individual 2.安装 直接默认安装就行 3.打开jupyter notebook 在开始菜单里面可以找到 4.遇到的问题解决 1.运行jupyter notebook,黑窗口自动关了 每次黑窗口迅速的加载完就自己关掉了 也没有打开新…

麒麟kylin YUM配置管理

一、概述 YUM(Yellowdog Updater Modified)是一个用于RPM包管理系统的自动化更新工具,最初由Duke University的Linux实验室开发。它简化了基于RPM的Linux发行版(如Red Hat Enterprise Linux, CentOS, Fedora等)上的软…

基于django中医药数据可视化平台(源码+lw+部署文档+讲解),源码可白嫖!

摘要 时代在飞速进步,每个行业都在努力发展现在先进技术,通过这些先进的技术来提高自己的水平和优势,中医药管理平台当然不能排除在外。中医药数据可视化平台是在实际应用和软件工程的开发原理之上,运用Python语言、ECharts技术、…

hydra破解密码

hydra九头蛇是常用的密码破解工具 1、破解centos ssh密码 hydra -l root -P password.txt ssh://192.168.1.107:2222 hydra -l root -P password.txt -s 2222 192.168.1.107 ssh2、破解ftp hydra -l allen -P e:\aa.txt ftp://127.0.0.1 hydra -l allen -P e:\aa.txt ftp:…

Banana Pi BPI-RV2 RISC-V路由开发板采用矽昌通信SF2H8898芯片

Banana Pi BPI-RV2 开源网关是⼀款基于矽昌SF2H8898 SoC的设备,1 2.5 G WAN⽹络接⼝、5 个千兆LAN ⽹络接⼝、板载 512MB DDR3 内存 、128 MiB NAND、16 MiB NOR、M.2接⼝,MINI PCIE和USB 2.0接⼝等。 Banana Pi BPI-RV2 开源网关是矽昌和⾹蕉派开源社…

leetcode118.杨辉三角

给定一个非负整数 numRows,生成「杨辉三角」的前 numRows 行。在「杨辉三角」中,每个数是它左上方和右上方的数的和。 示例 1: 输入: numRows 5 输出: [[1],[1,1],[1,2,1],[1,3,3,1],[1,4,6,4,1]]示例 2: 输入: numRows 1 输出: [[1]] public List&l…