【大数据技术】Spark分布式实现词频统计(hadoop+python+spark)

news/2025/2/12 9:35:20/

hadooppythonspark_0">Spark分布式实现词频统计(hadoop+python+spark)

  • 搭建完全分布式高可用大数据集群(VMware+CentOS+FinalShell)

  • 搭建完全分布式高可用大数据集群(Hadoop+MapReduce+Yarn)

  • 本机PyCharm远程连接CentOS虚拟机(Python)

  • 搭建完全分布式高可用大数据集群(Scala+Spark)

在阅读本文前,请确保已经阅读过以上4篇文章,成功搭建了Hadoop+MapReduce+Yarn+Python+Spark大数据集群环境。

写在前面

本文主要介绍基于hadoop+spark技术,自己编写python代码实现单词词频统计的详细步骤。

  • 电脑系统:Windows

  • 技术需求:HadoopMapReduceYarnPythonSpark

  • 使用软件:VMwareFinalShellPyCharm

注:本文的所有操作均在虚拟机master中进行,不涉及另外两台虚拟机。

启动Hadoop

  1. 使用finalshell连接并启动masterslave01slave02三台虚拟机。

  2. 在虚拟机master的终端输入命令start-all.sh启动hadoop、mapreduce和yarn。

  3. 随后可以用命令jps查看是否成功启动集群。

准备数据

注意:该部分的数据文件为/data/word.txt,如果做过之前的案例,已经拥有该数据文件,可以跳过该部分。

  1. 创建文本数据

① 在虚拟机master的终端输入命令mkdir /data创建一个/data目录。

00

② 在虚拟机master的终端输入命令 vi /data/word.txt 创建并打开word.txt文件,填入以下内容。

hello world
hello hadoop
hello hdfs
hello yarn

01

  1. 创建目录

① 在终端输入以下命令,可以在HDFS中创建/wordcount/input目录,用于存放文件word.txt

hdfs dfs -mkdir -p /wordcount/input

② 在终端输入以下命令验证是否创建/wordcount/input目录。

hdfs dfs -ls /

02

  1. 上传文件

① 在终端执行以下命令将文件word.txt上传到HDFS的/wordcount/input目录。

hdfs dfs -put /data/word.txt /wordcount/input

② 在终端输入以下命令验证是否成功将文件word.txt上传到HDFS的/wordcount/input目录。

hdfs dfs -ls /wordcount/input

03

③ 可以使用以下命令查看上传的word.txt文件的内容。

hdfs dfs -cat /wordcount/input/word.txt

04

④ 也可以通过HDFS的Web UI(http://master:9870)查看文件word.txt是否上传成功。

05

编写Python脚本

打开PyCharm专业版,远程连接虚拟机master,创建脚本/wordcount/wordspark.py,填入以下代码。

from pyspark.sql import SparkSession# 创建 SparkSession,连接到 Hadoop 集群
spark = SparkSession.builder \.appName("WordCount") \.getOrCreate()# 从 HDFS 读取输入文件
text_file = spark.sparkContext.textFile("hdfs://master:9000/wordcount/input/word.txt")# 计算词频
counts = text_file.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.reduceByKey(lambda a, b: a + b)# 将结果保存到 HDFS
counts.saveAsTextFile("hdfs://master:9000/wordcount/output")# 停止 SparkSession
spark.stop()

这段代码是一个典型的使用 PySpark 实现的词频统计程序,具体分析如下。

  1. 导入 PySpark 模块
python">from pyspark.sql import SparkSession
  • SparkSession 是 PySpark 中的入口点,用于创建和配置 Spark 应用程序。SparkSession 提供了多种方法,允许我们与 Spark 集群进行交互,包括读取数据、执行转换、管理 Spark 作业等。
  1. 创建 SparkSession
python">spark = SparkSession.builder \.appName("WordCount") \.getOrCreate()
  • SparkSession.builder 是用来配置并构建一个 SparkSession 实例。通过 appName 方法,给当前的 Spark 应用程序指定一个名称(这里是 “WordCount”)。getOrCreate() 会返回一个现有的 SparkSession 或创建一个新的实例,如果 Spark 会话已经存在,它将返回该会话。
  1. 读取输入文件
python">text_file = spark.sparkContext.textFile("hdfs://master:9000/wordcount/input/input.txt")
  • sparkContext.textFile() 用于读取文本文件,并将其分割成多个行(行级数据)。这里的输入文件位于 HDFS 路径 hdfs://master:9000/wordcount/input/input.txt
  • sparkContext 是 SparkSession 的底层对象,它是与底层 Spark 集群进行交互的接口。
  • HDFS(Hadoop 分布式文件系统)作为分布式存储系统,存储着待处理的文件数据。
  1. 计算词频
python">counts = text_file.flatMap(lambda line: line.split(" ")) \.map(lambda word: (word, 1)) \.reduceByKey(lambda a, b: a + b)
  • flatMap(lambda line: line.split(" "))
    • flatMap 是一种转换操作,它会将每一行的文本通过空格分割成多个单词,并返回一个由单词构成的平坦化列表。例如,一行文本 "hello world" 会变成 ["hello", "world"]
    • flatMap 的特点是会扁平化返回的列表,生成的 RDD(弹性分布式数据集)将包含所有的单词。
  • map(lambda word: (word, 1))
    • map 操作对每个单词进行转换,返回一个键值对 (word, 1),其中 word 是单词,1 表示出现的次数。
    • 这样,对于每个单词,都会创建一个键值对,后续会对相同的单词进行聚合操作。
  • reduceByKey(lambda a, b: a + b)
    • reduceByKey 是对相同键(单词)进行归约操作。它会将具有相同键的所有值(次数)加起来,得到每个单词的总词频。
    • lambda a, b: a + b 表示对于同一单词的多个 1 值,执行求和操作。
  1. 保存结果
python">counts.saveAsTextFile("hdfs://master:9000/wordcount/output")
  • saveAsTextFile() 方法将结果保存到指定路径。在此,计算得到的词频统计结果会被保存到 HDFS 路径 hdfs://master:9000/wordcount/output
  • 结果会以文本文件的形式保存,每个文件包含一部分输出数据,Spark 会自动将结果分布在多个文件中。
  1. 停止 SparkSession
python">spark.stop()
  • stop() 方法用于停止当前的 SparkSession,这样可以释放占用的资源。
  • 停止 SparkSession 是良好的实践,特别是在处理完 Spark 作业后,防止资源泄漏。

总的来说,这段代码完成了一个简单的分布式词频统计任务,其基本步骤包括:

  1. 初始化 SparkSession。
  2. 从 HDFS 中读取输入数据。
  3. 对输入数据进行词频统计:拆分单词、生成键值对、按键聚合计算词频。
  4. 将统计结果保存回 HDFS。
  5. 最后关闭 SparkSession,释放资源。

这种类型的作业常见于大数据处理和日志分析等场景。通过 Spark 的分布式计算能力,能够高效地处理大量文本数据并进行复杂计算。

运行Python脚本

注意:运行Python脚本前请确保已经启动hadoop集群。

  1. 输入以下命令查看虚拟机是否有pip工具。
pip --version

06

注意:在虚拟机master中输入命令pip --version ,如果提示没有pip,请根据提示安装pip。

07

  1. 输入以下命令安装pyspark库。
pip install pyspark -i https://pypi.tuna.tsinghua.edu.cn/simple

08

  1. 在PyCharm中运行wordspark.py程序。

09

  1. 在HDFS的Web UI(http://master:9870/explorer.html#/wordcount/output)中查看程序运行结果。

注意:有part-00000part-00001两个文件,因为是分布式存储。

10

11

运行Spark程序

注意:建议在运行spark程序前,三台虚拟机的配置为:

  • master:8G内存,4个CPU
  • slave01:4G内存,2个CPU
  • slave02:4G内存,2个CPU

  1. 在运行Spark程序前,请先删除http://master:9870/explorer.html#/wordcount/output目录。

12

  1. 输入以下命令关闭HDFS的安全模式。
hdfs dfsadmin -safemode leave

13

  1. 输入以下命令运行spark代码。

注意:运行前请确保HDFS中/wordcount/output文件不存在,如果存在,请将其删除。

spark-submit --master yarn /opt/python/code/wordcount/wordspark.py
  • spark-submit :这是启动 Spark 应用程序的命令。无论你的应用程序是使用 Scala、Java、Python 还是 R 编写的,都需要通过这个命令来提交。

  • --master yarn :这个参数指定了要使用的集群管理器(master)。在这里指定的是 YARN (Yet Another Resource Negotiator),这意味着你希望在配置为使用 YARN 作为资源管理器的 Hadoop 集群上运行此 Spark 应用程序。YARN 负责管理集群中的资源(如内存、CPU等)以及调度任务。

  • /opt/python/code/wordcount/wordspark.py :这是你想要运行的 Spark 应用程序的入口脚本。在这个例子中,它是一个 Python 文件,实现了 WordCount 算法,通常用于计算输入数据集中每个单词出现的次数。WordCount 是一个经典的入门示例,常用来展示大数据处理框架的基本使用方法。

总结一下,这条命令的作用是告诉 Spark 以客户端模式向 YARN 集群提交 wordspark.py 这个 Spark 应用程序,并由 YARN 来负责分配资源和调度作业执行。

14

  1. 重新启动安全模式。
hdfs dfsadmin -safemode enter

查看程序运行状态和结果

  1. 程序运行过程中,可以使用浏览器访问Spark的Web UI(http://master:4040/jobs/)查看程序的运行状态。

15

  1. 程序运行过程中,也可以使用浏览器访问YARN的Web UI(http://master:8088)查看程序的运行状态。

16

  1. 程序运行结束后,可以在HDFS的Web UI(http://master:9870)查看词频统计的结果。

17

  1. 当然,也可以在master的终端输入以下命令查看程序运行结果。
hdfs dfs -cat /wordcount/output/part-00000
hdfs dfs -cat /wordcount/output/part-00001

18

写在后面

本文仅供学习使用,原创文章,请勿转载,谢谢配合。


http://www.ppmy.cn/news/1570747.html

相关文章

力扣LeetCode: 63 不同路径Ⅱ

题目: 给定一个 m x n 的整数数组 grid。一个机器人初始位于 左上角(即 grid[0][0])。机器人尝试移动到 右下角(即 grid[m - 1][n - 1])。机器人每次只能向下或者向右移动一步。 网格中的障碍物和空位置分别用 1 和 …

在亚马逊云科技上云原生部署DeepSeek-R1模型(下)

在本系列的上篇中,我们介绍了如何通过Amazon Bedrock部署并测试使用了DeepSeek模型。在接下来的下篇中小李哥将继续介绍,如何利用亚马逊的AI模型训练平台SageMaker AI中的,Amazon Sagemaker JumpStart通过脚本轻松一键式部署DeepSeek预训练模…

android 动态库加载机制

省流:android 不兼容 glibc,而是写了一套独立的 c 运行时库 (bionic libc),为移动设备和 google 自己推的东西做了大量优化。在这套工具链里,aosp 实现了一个兼容 bionic libc 的链接器,放到系统中代替 ld。 这个链接…

【含文档+PPT+源码】基于Python校园跑腿管理系统设计与实现

项目介绍 本课程演示的是一款基于Python校园跑腿管理系统设计与实现,主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的 Python学习者。 1.包含:项目源码、项目文档、数据库脚本、软件工具等所有资料 2.带你从零开始部署运行本套系统 3.…

深度整理总结MySQL——事务简介

事务简介 什么是事务为什么需要事务事务特性原子性现实世界的转账操作是不可分割的数据库世界的转账操作可能是多个步骤可能发生的错误和故障 隔离性一致性数据库的一致性 持久性 事务的状态状态分析活动的(Active)部分提交的(Partially Committed)提交的…

67.为日志添加行号,第一行不加 C#例子

事先要在本地创建一个叫该名称的文件,在代码路径下。TempFile.txt 你可以自由的输入一些换行符,或者复制一片文章进去,然后运行代码就会发现有行号。 using System; class Program {static void Main(string[] args){string FilePath &quo…

pytest.fixture

pytest.fixture 是 pytest 测试框架中的一个非常强大的功能,它允许你在测试函数运行前后执行一些设置或清理代码。以下是关于 pytest.fixture 的详细介绍: 一、定义与用途 pytest.fixture 是一个装饰器,用于标记一个函数为 fixture。Fixture 函数中的代码可以在测试函数运…

AIGC-辅助小说(斗破苍穹为例)创作智能体完整指令(DeepSeek,豆包,千问,Kimi,GPT)

Unity3D特效百例案例项目实战源码Android-Unity实战问题汇总游戏脚本-辅助自动化Android控件全解手册再战Android系列Scratch编程案例软考全系列Unity3D学习专栏蓝桥系列AIGC(GPT、DeepSeek、豆包、千问、Kimi)👉关于作者 专注于Android/Unity和各种游戏开发技巧,以及各种资…