头歌:Spark Streaming

embedded/2024/10/18 16:46:02/

第1关:套接字流实现黑名单过滤
 

简介

套接字流是通过监听Socket端口接收的数据,相当于Socket之间的通信,任何用户在用Socket(套接字)通信之前,首先要先申请一个Socket号,Socket号相当于该用户的电话号码。同时要知道对方的Socket,相当于对方也有一个电话号码。然后向对方拨号呼叫,相当于发出连接请求。对方假如在场并空闲,相当于通信的另一主机开机且可以接受连接请求,拿起电话话筒,双方就可以正式通话,相当于连接成功。Spark Streaming通过监听套接字端口获取流数据信息并处理。
黑名单过滤是获取套接字流发送的名单信息,通过指定的黑名单信息对其进行过滤,输出结果为没有黑名单的名单信息。

任务描述


本关任务:本关是利用套接字流监听方法,监听名单信息并过滤黑名单信息。首先需要模拟名单的生成,首先需要建一个文档,每行为一个姓名。然后编写代码,当有指定套接字连接产生时,从文件中依次选取所有名单,发送给套接字端口。另外在编写代码,通过连接套接字端口,监听端口的数据,获取发送的名单,并过滤黑名单。需用提供的文档文件内的名单来完成此实验。名单文档内容如下:
Jim
Mary
Tom
Jack
Abby
Bee
Belle
Babs
Carla
Dale
Dan
Gary
Ken
Jane
Paige

相关知识


1.名单生成器的创建


代码依次获取指定文件中的一个名单。内部是首先获取指定名单文件,然后指定监听端口,然后等待连接建立,连接建立后依次获取一个名单总个数内的整数,并索引到名单,将其发送至套接字端口。

import java.io.{PrintWriter}
import java.net.ServerSocket
import scala.io.Source
object NameProducer {def main(args: Array[String]): Unit = {//获取指定文件val filename = "/root/data/Namelist"//指定文件按行切分为listval lines = Source.fromFile(filename).getLines.toList//获取list长度val filerow = lines.length//指定监听窗口,当外部程序请求时建立连接val listener = new ServerSocket(5566)//等待socket连接成功println("bengin11111111111!!!")val socket = listener.accept()new Thread() {override def run = {println("Got client connected from: " + socket.getInetAddress)//创建写入socket对象val out = new PrintWriter(socket.getOutputStream(), true)println("bengin222222222222222222!!!")for (i <- 0 to filerow-1) {//逐行取一个名字val content = lines(i)println(content)//写入socketout.write(content + '\n')//清空缓冲区数据out.flush()}socket.close()}}.start()}
}

2.监听套接字端口过滤黑名单


首先要设置需要过滤的黑名单:

val BlackList = Array("name1", "name2")

接着把名单转换成RDD,然后连接套接字端口,连接端口代码如下:

ssc.socketTextStream("localhost", 5566)

编程要求


根据提示,补充监听套接字并过滤黑名单代码文TransformBlackList。

测试说明


平台会对你编写的代码进行测试:
测试输入:自动获取指定的Namelist文件。
预期输出:

Jim
Tom
Jack
Bee
Belle
Babs
Dale
Dan
Gary
Jane

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Secondsobject TransformBlackList {def main(args: Array[String]): Unit = {/********** Begin **********/// 初始化val sparkConf = new SparkConf().setAppName("TransformBlackList").setMaster("local[2]")// 创建StreamingContext,设置每五秒刷新一次val ssc = new StreamingContext(sparkConf, Seconds(5))ssc.sparkContext.setLogLevel("ERROR")// 设置需要过滤的黑名单(Abby、 Paige、 Carla、 Mary、 Ken)val BlackList = Array("Abby", "Paige", "Carla", "Mary", "Ken")// 把黑名单数组转换成rddval BlackListRdd = ssc.sparkContext.parallelize(BlackList)// 设置主机名localhost,端口号5566val NameList = ssc.socketTextStream("localhost", 5566)// 过滤黑名单算法val ValidName = NameList.transform(rdd => {rdd.filter(name => !BlackList.contains(name))})ValidName.print()ssc.start()// 等待足够长的时间以触发至少一次计算Thread.sleep(5000)ssc.stop(false, false)/********** End **********/}
}


http://www.ppmy.cn/embedded/26442.html

相关文章

等保测评:网络安全合规的基石

在数字化时代&#xff0c;网络安全已成为国家安全战略的重要组成部分。信息安全等级保护测评&#xff08;等保测评&#xff09;作为网络安全合规的核心&#xff0c;对于维护网络空间的安全稳定、保护企业和个人的信息资产具有不可替代的作用。 ## 一、等保测评的法律地位 等保…

【补充】图神经网络前传——图论

本文作为对图神经网络的补充。主要内容是看书。 仅包含Introduction to Graph Theory前五章以及其他相关书籍的相关内容&#xff08;如果后续在实践中发现前五章不够&#xff0c;会补上剩余内容&#xff09; 引入 什么是图&#xff1f; 如上图所示的路线图和电路图都可以使用…

【linuxC语言】fcntl和ioctl函数

文章目录 前言一、功能介绍二、具体使用2.1 fcntl函数2.2 ioctl函数 三、拓展&#xff1a;填写arg总结 前言 在Linux系统编程中&#xff0c;经常会涉及到对文件描述符、套接字以及设备的控制操作。fcntl和ioctl函数就是用来进行这些控制操作的两个重要的系统调用。它们提供了对…

【vite】执行npm create vue@latest卡死

目录标题 现场呈现解决方法 现场呈现 如图所示&#xff0c;执行上面命令之后一直卡这里不动了。 解决方法 查看镜像 npm config get registry切换镜像 npm config set registryhttps://registry.npmmirror.com

MongoDB聚合运算符:$strLenBytes

MongoDB聚合运算符&#xff1a;$strLenBytes 文章目录 MongoDB聚合运算符&#xff1a;$strLenBytes语法使用举例单字节和多字节字符集 $strLenBytes聚合运算符返回指定字符串中 UTF-8 编码的字节数。 语法 { $strLenBytes: <string expression> }<expression>为可…

四:物联网ARM开发

软件安装 keil5软件的安装包&#xff08;软件开发&#xff09;-> Keil.STM32F0XX_DFP.1.4.0.pack &#xff08;这是STM32F0设备安装包&#xff09;-> KEIL_Lic(破解keil的安装包) -> 安装 ST_LINK 烧写工具 -> 安装Java运行环境 -> 选择STM32CubeMX(微控制器图…

PHP利用JWT refresh_token获取新access_token

PHP利用JWT refresh_token获取新token 在PHP中使用JWT&#xff08;JSON Web Tokens&#xff09;来刷新refresh_token并获取新的access_token&#xff0c;你需要实现以下步骤&#xff1a; 当用户登录时&#xff0c;生成一个access_token和一个refresh_token。 设置refresh_tok…

servlet生命周期

生命周期过程介绍 serlvet 是单实例【产生一次&#xff0c;销毁一次】多线程 默认第一次访问的时候,服务器创建 servlet,并调用 init 主方法实现初始化操作,只要请求来的时候,服务器就会自动创建一个线程,去调用 service 方法执行业务逻辑代码&#xff0c;而当 serlvet 被移除…