PySpark3.4.4_基于StreamingContext实现网络字节流统计分析

ops/2024/12/14 0:42:11/

网络字节流与嵌套字节流的区别

  1. 概念解释

    • 网络嵌套字节流
      • 网络编程的情境下,网络嵌套字节流通常是指将字节流(字节序列)以一种分层或者包含的方式进行组织,用于在网络传输过程中更好地处理数据。例如,在一个复杂的网络协议栈中,高层协议的数据单元(往往也是字节流形式)可以嵌套在底层协议的字节流之中。这就好比包裹的嵌套,外层包裹可能包含了内层包裹的相关信息以及内层包裹本身。以 HTTP 协议在 TCP/IP 协议之上传输为例,HTTP 消息(本身是字节流)被嵌套在 TCP 的字节流中进行传输。TCP 协议负责将 HTTP 消息切割成合适的片段(字节流形式),加上 TCP 头信息(也是字节流),然后通过网络发送。接收端的 TCP 协议先处理接收到的字节流,提取出 HTTP 消息的字节流部分,再交给上层的 HTTP 协议处理。
    • 套字节流
      • 这个概念不是很常见,如果理解为 “包裹字节流” 的意思,和网络嵌套字节流有相似之处。不过,“套字节流” 可能更强调简单的封装形式,即将一个字节流作为另一个字节流的一部分进行简单包装。比如,在加密通信中,原始的字节流(如要传输的文件内容字节流)被加密算法处理后,会生成一个新的字节流,这个新字节流可以看作是原始字节流被 “套” 上了一层加密后的字节流。它可能没有像网络嵌套字节流那样涉及复杂的网络协议层次关系。
  2. 应用场景区别

    • 网络嵌套字节流
      • 广泛应用于网络通信的各个层次。在构建网络服务器和客户端应用时,不同层次的网络协议交互都涉及网络嵌套字节流。例如,在电子邮件传输(SMTP、POP3 等协议)中,邮件内容字节流被嵌套在相应的协议字节流中在网络上传输。它主要用于保证数据在不同网络环境和协议间的正确传递和解析,确保数据能够从源端的应用层通过层层协议封装,经过网络传输,最终在目的端的应用层被正确还原。
    • 套字节流
      • 更多地用于数据安全和简单的数据封装场景。如在数字签名的应用中,消息的字节流被 “套” 上签名信息的字节流,用于验证消息的来源和完整性。或者在数据存储中,为了区分不同类型的数据,将数据字节流 “套” 上一个标识头字节流进行存储,方便后续读取和分类处理。
  3. 处理方式区别

    • 网络嵌套字节流
      • 需要严格按照网络协议栈的规则进行处理。在发送端,数据从高层协议开始,一层一层地进行字节流的嵌套和封装,添加每层协议所需的头部、尾部等信息。在接收端,则是相反的过程,从最外层的协议字节流开始,逐步解包和解析,根据每层协议的规范提取出内层协议的字节流,直到最终得到应用层的数据字节流。这需要对各种网络协议的格式、功能和交互流程有深入的了解。
    • 套字节流
      • 处理相对简单,主要关注封装和提取两个操作。在封装时,根据具体的需求添加包裹字节流(如加密后的字节流添加到原始字节流外层)。在提取时,按照预先定义的规则(如加密算法对应的解密规则、数据标识头的解析规则等)去除外层字节流,获取内部的原始字节流或者所需的数据。

PySpark代码开发

需要在ubuntu环境下或windows环境下,提前安装好spark执行环境

软件说明:

  1. spark 3.4.4
  2. python 3.9.20
  3. java jdk1.8.0_431

代码说明

DataSourceSoket.py 用于模拟生成实时字节流数据的脚本

python"># coding:utf8
import random
from socket import socketserver = socket()server.bind(('localhost', 9999))
server.listen(1)
while True:# 为了方便识别,输出一个"I'm waiting the connect ..."print("I'm waiting the connect ...")conn, addr = server.accept()print("Connected by {0}".format(addr))print(f"Connected by {addr}")# 输出发送数据# 自定义10条中文数据在一个数据容器里,并随机选取一条中文数据集输出# 步骤1:创建一个列表作为数据容器data_container = []# 步骤2:向列表中添加10条不同的中文数据chinese_data = ["你好,世界","今天天气真好","学习是一件快乐的事","分享知识,传递快乐","探索未知的世界","坚持就是胜利","努力不懈,梦想终会实现","失败乃成功之母","平凡造就非凡","相信自己,你是最棒的","I like Spark","I like Flink","I like Hadoop"]data_container.extend(chinese_data)# 步骤3:使用random.choice()随机选择并输出一条数据random_item = random.choice(data_container)print(random_item)conn.sendall(random_item.encode())conn.close()print("Connection closed")

pysparkStreamingNetwordCountCN.py  SparkStreaming处理实时数据流

python"># coding:utf8from __future__ import print_functionimport os
import sys
import jieba
from pyspark import SparkContext
from pyspark.streaming import StreamingContext# 设置环境变量,确保指向正确的 Java 解释器
os.environ['JAVA_HOME'] = '/opt/HadoopEco/jdk1.8.0_431'  # 替换为你的 JDK 8 安装路径
os.environ['SPARK_HOME'] = '/opt/HadoopEco/spark-3.4.4-bin-without-hadoop'# 加载停用词表
def load_stopwords(file_path):"""从指定文件或文件夹中加载停用词列表。参数:file_path (str): 停用词文件或文件夹的路径。返回:set: 包含停用词的集合。"""stopwords = set()try:if os.path.isfile(file_path):with open(file_path, 'r', encoding='utf-8') as f:stopwords.update(line.strip() for line in f)elif os.path.isdir(file_path):for filename in os.listdir(file_path):file_full_path = os.path.join(file_path, filename)if os.path.isfile(file_full_path):with open(file_full_path, 'r', encoding='utf-8') as f:stopwords.update(line.strip() for line in f)else:print(f"Error: The path {file_path} is neither a file nor a directory.")except FileNotFoundError:print(f"Error: The file or directory {file_path} does not exist.")except PermissionError:print(f"Error: Permission denied for the file or directory {file_path}.")except Exception as e:print(f"An unexpected error occurred: {e}")return stopwords# 替换为你的停用词表路径或文件夹路径
stopwords = load_stopwords(sys.argv[3])  # 或 'path/to/stopwords_folder'def sparkstreamingnetworkcount():global sc, ssc, linessc = SparkContext(appName="PythonStreamingNetworkWordCount")ssc = StreamingContext(sc, 10)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))def split_words(line):try:# 使用 jieba 进行中文分词chinese_words = jieba.lcut(line.strip())# 使用空格进行英文分词english_words = line.strip().split(" ")# 合并分词结果并过滤掉空字符串words = set(chinese_words + english_words) - {''}# 过滤掉停用词filtered_words = [word.lower() for word in words if word not in stopwords]return filtered_wordsexcept Exception as e:print(f"Error processing line: {line}, Error: {e}", file=sys.stderr)return []counts = lines.flatMap(split_words).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)counts.pprint()ssc.start()ssc.awaitTermination()if __name__ == "__main__":if len(sys.argv) != 4:print("Usage: networkcount.py <hostname> <port> <stopwords>", file=sys.stderr)exit(-1)sparkstreamingnetworkcount()

运行时的运行参数配置

运行结果如下

DataSourceSoket.py

pysparkStreamingNetwordCountCN.py 运行结果

注意事项:

1. 需要先启动 DataSourceSocket.py, 在启动 pysparkStreamingNetwordCountCN.py


http://www.ppmy.cn/ops/141671.html

相关文章

python 下载 b站视频 和音频

video_bvid&#xff1a; import os import requests import json import re from bs4 import BeautifulSoup import subprocess # from detail_video import video_bvid# video_bvid 是一个从外部得到的单个视频ID video_bvid BV1cx421Q7veclass BilibiliVideoAudio:def __in…

新手上路,学Go还是Python

对于新手来说&#xff0c;Go和Python都是很好的编程语言&#xff0c;它们各有特点&#xff0c;以下是详细的对比来帮助你决定先学哪一个&#xff1a; 一、语法和学习难度 Python 语法简洁易懂&#xff1a;Python以其简洁、优雅的语法而闻名&#xff0c;代码的可读性很高。例如…

[Redis#19] 集群 | 数据分片 | docker模拟 | 故障转移 | 集群扩容

目录 集群 数据分片算法 1. 哈希求余 2 一致性哈希算法 3. 哈希槽分区算法 (Redis 使用) Docker搭建集群 i&#xff1a;创建目录和配置文件 编写 generate.sh 脚本 执行生成命令 ii&#xff1a;编写 docker-compose.yml 网络配置 iii: 构建集群 连接并验证集群 重…

TPM 2.0:安全固件的新标准

得益于可信计算组 ( TCG ) 推出的全新 TPM 2.0规范&#xff0c;联网设备可以更好地抵御网络攻击&#xff0c;并且不太可能受到错误的攻击。 制造商将可信平台模块 (TPM) 附加到设备上&#xff0c;以帮助用户和管理员验证其身份、生成和存储加密密钥以及确保平台完整性。 在 T…

【Innodb阅读笔记】之 本地搭建多个MYSQL

一、背景 在开展工作与学习任务的进程中&#xff0c;时常会涉及到运用多个 MySQL 实例执行特定操作的需求。例如&#xff0c;在深入研习主从复制机制时&#xff0c;借助多个 MySQL 实例能够更为直观地观察数据的传输与同步过程&#xff0c;有效加深对其原理及应用场景的理…

SpringBoot启动执行操作实现

伴随SpringBoot启动执行操作实现方式总结 引言方案使用EventListener监听使用ApplicationRunner 或者CommandLineRunner接口使用Spring Bean初始化方法使用Async注解Async 的主要效果和特性 使用SmartLifecycle接口 总结 引言 在实际项目中&#xff0c;经常需要在当项目启动的…

JavaScript 写css的内联样式

一、使用style属性-直接设置单个 CSS 属性 // 获取元素 var element document.getElementById("myElement");// 设置样式 element.style.color "red"; element.style.backgroundColor "blue"; element.style.fontSize "20px"; 二…

Qt源码阅读(六) ⏱️QTimer

Qt源码阅读(六) ⏱️QTimer Qt 为我们提供了一个非常实用的定时器&#xff08;QTimer&#xff09;&#xff0c;而在标准库中却没有类似的通用定时器。网络上有很多文章教你如何实现一个定时器&#xff0c;但本着就近原则&#xff0c;今天我们将深入阅读 Qt 中 QTimer 的源码&a…