IDEA编写各种WordCount运行

news/2024/11/28 21:40:14/

目录

一、编写WordCount(Spark_scala)提交到spark高可用集群

1.项目结构

2.导入依赖

3.编写scala版的WordCount

4.maven打包

5.运行jar包

​6.查询hdfs的输出结果

二、本地编写WordCount(Spark_scala)读取本地文件

1.项目结构

2.编写scala版的WordCount

3.编辑Edit Configurations配置文件

4.直接本地运行LocalWordCount.scala文件

三、本地编写WordCount(Spark_java版)读取本地文件,并输出到本地

1.项目结构

2.编写java版的WordCount

3.编辑Edit Configurations配置文件

4.运行后查看结果

四、本地编写WordCount(Spark_java_lambda版)读取本地文件,并输出到本地

1.项目结构

2.编写java-lambda版的WordCount

3.编辑Edit Configurations配置文件

4.运行后查看结果


搞了一个晚上加一个白天,总算搞出来了,呼~~

本地编写IDEA之前需要在windows下安装scala、hadoop和spark环境,参考文章如下:

《Scala安装》 《Windows环境部署Hadoop-3.3.2和Spark3.3.2》

一、编写WordCount(Spark_scala)提交到spark高可用集群

首先安装好scala,然后在IDEA创建一个maven项目,开始编写代码

1.项目结构

2.导入依赖

<name>spark-in-action</name><url>http://maven.apache.org</url><!-- 定义的一些常量 --><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><encoding>UTF-8</encoding><spark.version>3.3.2</spark.version><scala.version>2.12.15</scala.version></properties><dependencies><!-- scala的依赖 --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><!-- spark core 即为spark内核 ,其他高级组件都要依赖spark core --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>${spark.version}</version></dependency></dependencies><!-- 配置Maven的镜像库 --><!-- 依赖下载国内镜像库 --><repositories><repository><id>nexus-aliyun</id><name>Nexus aliyun</name><layout>default</layout><url>http://maven.aliyun.com/nexus/content/groups/public</url><snapshots><enabled>false</enabled><updatePolicy>never</updatePolicy></snapshots><releases><enabled>true</enabled><updatePolicy>never</updatePolicy></releases></repository></repositories><!-- maven插件下载国内镜像库 --><pluginRepositories><pluginRepository><id>ali-plugin</id><url>http://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled><updatePolicy>never</updatePolicy></snapshots><releases><enabled>true</enabled><updatePolicy>never</updatePolicy></releases></pluginRepository></pluginRepositories><build><pluginManagement><plugins><!-- 编译scala的插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version></plugin><!-- 编译java的插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version></plugin></plugins></pluginManagement><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>scala-test-compile</id><phase>process-test-resources</phase><goals><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><executions><execution><phase>compile</phase><goals><goal>compile</goal></goals></execution></executions></plugin><!-- 打jar插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>

3.编写scala版的WordCount

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("WordCount")// 1.创建SparkContextval sc = new SparkContext(conf)// 2.创建RDD(神奇的大集合,该集合中不装真正要计算的数据,而是装描述信息)val lines: RDD[String] = sc.textFile(args(0))// 3.对RDD进行操作,调用RDD的方法// -------------Transformation (转换算子开始) --------------// 切分压平val words: RDD[String] = lines.flatMap(_.split(" "))// 将单词和1组合放入到元组中val wordAndOne: RDD[(String, Int)] = words.map((_, 1))// 将key相同的数据进行分组聚合val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)// 按照次数排序val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) // 降序// -------------Transformation (转换算子结束) --------------// 4.调用Action// 将数据写入到外部的存储系统中sorted.saveAsTextFile(args(1))// 5.释放资源sc.stop()}
}

4.maven打包

这个是胖包,除了项目本身的依赖,还有其他依赖,上面的original是瘦包,只有项目本身的依赖

5.运行jar包

首先启动zk、hdfs和spark高可用集群,这里我搭建的是standalone模式的高可用集群,不是on Yarn的

创建/opt/soft/spark-3.2.3/submit目录,将jar包上传到该目录下

提交命令

[root@node141 submit]# ../bin/spark-submit --master spark://node141:7077 --class cn.doitedu.day01.WordCount --executor-memory 1g --total-executor-cores 4 ./spark-in-action-1.0.jar hdfs://node141:9000/words.txt hdfs://node141:9000/out-1

--master spark://node141:7077     spark的master节点

--class cn.doitedu.day01.WordCount   运行的类名

--executor-memory 1g   占用的内存

--total-executor-cores 4  占用的核数

./spark-in-action-1.0.jar  运行的jar包地址

hdfs://node141:9000/words.txt   代码中args[0]对应的参数

hdfs://node141:9000/out-1  代码中args[1]对应的参数

6.查询hdfs的输出结果

二、本地编写WordCount(Spark_scala)读取本地文件

1.项目结构

2.编写scala版的WordCount

package cn.doitedu.day01import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}/*** 使用本地模式运行Spark程序(开发调试的时候使用)*/
object LocalWordCount {def main(args: Array[String]): Unit = {// 指定当前用户为rootSystem.setProperty("HADOOP_USER_NAME", "root")val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]") // 本地模式,*表示根据当前机器的核数开多个线程// 1.创建SparkContextval sc = new SparkContext(conf)// 2.创建RDD(神奇的大集合,该集合中不装真正要计算的数据,而是装描述信息)val lines: RDD[String] = sc.textFile(args(0))// 3.对RDD进行操作,调用RDD的方法// -------------Transformation (转换算子开始) --------------// 切分压平val words: RDD[String] = lines.flatMap(line => {val words = line.split(" ")println(words)  // debugwords})// 将单词和1组合放入到元祖中val wordAndOne: RDD[(String, Int)] = words.map((_, 1))// 将key相同的数据进行分组聚合val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)// 按照次数排序val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) // 降序// -------------Transformation (转换算子结束) --------------// 4.调用Action// 将数据写入到外部的存储系统中sorted.saveAsTextFile(args(1))// 5.释放资源sc.stop()}
}

3.编辑Edit Configurations配置文件

4.直接本地运行LocalWordCount.scala文件

查看运行结果

三、本地编写WordCount(Spark_java版)读取本地文件,并输出到本地

1.项目结构

2.编写java版的WordCount

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;import java.util.Arrays;
import java.util.Iterator;public class JavaWordCount {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local[*]");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> lines = jsc.textFile(args[0]);JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String lines) throws Exception {String[] words = lines.split("\\s+");return Arrays.asList(words).iterator();}});JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String word) throws Exception {return Tuple2.apply(word, 1);}});JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}});// 将原来的kv顺序颠倒  (flink,3)  ----> (3,flink)JavaPairRDD<Integer, String> swapped = reduced.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {@Overridepublic Tuple2<Integer, String> call(Tuple2<String, Integer> tp) throws Exception {return tp.swap(); // 交换}});JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);JavaPairRDD<String, Integer> result = sorted.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> tp) throws Exception {return tp.swap();}});result.saveAsTextFile(args[1]);jsc.stop();}
}

3.编辑Edit Configurations配置文件

4.运行后查看结果

四、本地编写WordCount(Spark_java_lambda版)读取本地文件,并输出到本地

1.项目结构

2.编写java-lambda版的WordCount

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;import java.util.Arrays;public class JavaLambdaWordCount {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("JavaLambdaWordCount").setMaster("local[*]");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<String> lines = jsc.textFile(args[0]);JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(w -> Tuple2.apply(w, 1));
//        JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey((a, b) -> a + b);JavaPairRDD<String, Integer> reduced = wordAndOne.reduceByKey(Integer::sum);JavaPairRDD<Integer, String> swapped = reduced.mapToPair(Tuple2::swap);// 排序JavaPairRDD<Integer, String> sorted = swapped.sortByKey(false);// 调回来JavaPairRDD<String, Integer> result = sorted.mapToPair(Tuple2::swap);result.saveAsTextFile(args[1]);jsc.stop();}
}

3.编辑Edit Configurations配置文件

4.运行后查看结果

好啦~~


http://www.ppmy.cn/news/1379708.html

相关文章

软件测试 基础(2)

文章目录 1. 软件测试&软件开发生命周期2. 如何描述一个 BUG3. 如何定义 BUG 的级别4. BUG 的生命周期5. 如何进行第一次测试6. 测试的执行和 BUG 管理7. 产生争执怎么办&#xff08;处理人际关系&#xff09; 1. 软件测试&软件开发生命周期 软件测试的生命周期&#…

手机群控软件开发必备源代码分享!

随着移动互联网的飞速发展&#xff0c;手机群控技术在市场推广、自动化测试、应用管理等领域的应用越来越广泛&#xff0c;手机群控软件作为一种能够同时控制多台手机设备的工具&#xff0c;其开发过程中&#xff0c;源代码的编写显得尤为重要。 1、设备连接与识别模块 设备连…

计算机网络基础【信息系统监理师】

计算机网络基础【信息系统监理师】 1、OSI七层参考模型2、TCP/IP协议3、网络拓扑结构分类4、网络传输介质分类5、网络交换技术6、网络存储技术7、网络规划技术8、综合布线系统8.1、综合布线工程内容8.1、隐蔽工程-金属线槽安装8.2、隐蔽工程-管道安装槽道与各种管线间的最小净距…

22 Dytechlab Cup 2022C. Ela and Crickets(思维、找规律、模拟)

思路就是找规律 可以发现&#xff0c;当拐点在角落时的情况和不在角落的情况是不同 当拐点在角落时&#xff0c;只有目标点的横纵坐标其中的一个和它相同时&#xff0c;这时才可能到达。 否则&#xff0c;我们就简单的例子可以看一下&#xff0c;当一个 2 ∗ 2 2*2 2∗2的矩阵的…

【智能家居】东胜物联ODM定制ZigBee网关,助力能源管理解决方案商,提升市场占有率

背景 本文案例服务的客户是专业从事智能家居能源管理的解决方案商&#xff0c;其产品与服务旨在帮助用户监测、管理和优化能源消耗&#xff0c;以提高能源使用效率。 随着公司的扩张&#xff0c;为了增加市场占有率&#xff0c;他们希望找到更好的硬件服务支持&#xff0c;以…

CSS 02

1.复合选择器 &#xff08;1.1&#xff09;后代选择器 代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0&q…

Flink实时数仓之用户埋点系统(一)

需求分析及框架选型 需求分析数据采集用户行为采集业务数据采集 行为日志分析用户行为日志页面日志启动日志APP在线日志 业务数据分析用户Insert数据用户Update数据 技术选型Nginx配置Flume配置MaxWellHadoopFlink架构图 需求分析 数据采集 用户行为采集 行为数据&#xff1…

FPGA TestBench编写学习

1 timescale 1.1 简介 timescale指令用于指定编译器在处理仿真时的时间单位和时间精度。这个指令通常在模块的顶层声明中使用&#xff0c;它告诉编译器和仿真器如何解释代码中的时间值。 timescale指令的语法如下&#xff1a; timescale <time_unit> <time_precis…