Spark Streaming: pulling data from Kafka and using window functions to compute traffic statistics


1. Application scenario

In Spark Streaming we normally compute over data from a fixed batch interval. For example, in the word-count case at http://blog.csdn.net/tototuzuoquan/article/details/75094540, data is read from Kafka and the counts are updated every 5 seconds. Some scenarios are different: in traffic accounting, the log records may be collected and produced once every 30 seconds, but we sometimes need to see how traffic changes over 2.5 minutes (150 seconds). This is where Spark Streaming's window functions come in.
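A minimal, self-contained sketch (not from the original post) of how the batch interval, window length, and slide interval relate; the socket source and the word-count logic are placeholders just to keep it runnable:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowDemo {
  def main(args: Array[String]): Unit = {
    // A new micro-batch is produced every 30 seconds
    val conf = new SparkConf().setAppName("WindowDemo").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(30))

    // Placeholder source: any DStream of (key, value) pairs works here
    val pairs = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1))

    // Every 150 seconds (slide interval), sum the last 150 seconds of data (window length).
    // Both durations must be multiples of the 30-second batch interval.
    val windowedCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(150), Seconds(150))
    windowedCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}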

2. About the traffic log file

Each record is a single comma-separated line. The fields consumed by the job below are the record timestamp, the client IP, the session count, the retransmission count, the connection time, the response delay, the total packet count, the retransmitted packet count, and the uplink/downlink byte counts.


3. Log format (sample records)

2016-06-30 00:18:57,10.242.91.130,117.135.250.52,33799,80,1,0,8270,mmbiz.qpic.cn,/mmbiz/3tsIcoqhsab2ybEANKia7va097HPwphsEOU7DjyQGTXA5LGzjJElVMKaRMJFiaAnDLVzN3j5libBbAzPrgky95libQ/0?wx_fmt=gif&tp=webp&wxfrom=5&wx_lazy=1,17,638,0,200,0,9698,618685,0,1,0,0,2,4,1,211,Mozilla/5.0 (Linux; Android 5.1; m1 metal Build/LMY47I) AppleWebKit/537.36 (KHTML%2C like Gecko) Version/4.0 Chrome/37.0.0.0 Mobile MQQBrowser/6.2 TBS/036524 Safari/537.36 MicroMessenger/6.3.18.800 NetType/WIFI Language/zh_CN,601647,0,0,8270,437,219,419
2016-06-30 00:19:00,10.242.149.112,120.198.203.230,64736,80,1,1,5247,szshort.weixin.qq.com,/mmtls/7dd6aa5f,94,20,1,200,0,1316,2940,4,1,5,1,45,5494,341,94,MicroMessenger Client,589,0,1,5247,3093,10,10
2016-06-30 00:18:52,10.242.162.104,117.187.22.24,10032,80,1,0,92,comm.inner.bbk.com,/clientRequest/detectCollectControlMsg?apiVison=2&model=vivo+Y27&netType=WIFI&elapsedtime=177150983&imei=867534026587667&sysvison=PD1410BL_PD1410LMA_1.15.5&plan_switch=1&modelNumber=vivo+Y27&cs=0,78,11,0,500,0,533,509,1,1,1,1,2,10,3,78,IQooAppstore,4,0,0,92,78,5,6
2016-06-30 00:18:33,10.242.237.196,117.135.196.194,59561,80,1,1,1703,117.135.196.194,/videos/v0/20160629/00/0b/e6e8417d9074fa7d262f537980ca4f08.f4v?key=050b684298dc90e6319fd3f97f654a4a5&src=iqiyi.com&qd_tvid=502276800&qd_vipres=0&qd_index=6&qd_aid=204159701&qd_stert=1850267&qd_scc=4ab54c5ef32351913d522de9958615f0&qd_sc=e30b195312b2975a136a0900265bbd40&qd_src=02022001010000000000&qd_ip=7587d460&qd_uid=1296545976&qd_tm=1467216601880&qd_vip=1&qyid=860734037201152&qypid=&la=CMNET|GuiZhou&li=guiyang_cmnet&lsp=-1&lc=18&uuid=7587d460-5773f471-ce,2,3205,1,206,0,75700,2163473,0,1,0,0,2,5,1,2,HCDNClient_ANDROID_MOBILE;0.0.0.0;QK/0.0.0.0,2097152,0,1,1703,1693,1588,1617
2016-06-30 00:14:52,10.242.7.143,117.135.250.56,56012,80,1,0,1,i.gtimg.cn,/qqlive/images/20141217/corner_pic_lianzai.png?rand=1467216908,1,8,0,65535,0,528,0,8,0,0,0,1,1,1,1,Mozilla/4.0 (compatible; MSIE 5.00; Windows 98),0,0,0,1,1,8,0
2016-06-30 00:14:53,10.242.210.84,183.232.10.27,60160,80,1,0,278,i0.letvimg.com,/lc07_isvrs/201606/29/09/49/8609ad16-4a6a-4957-8147-62440bfc1429/thumb/2_400_300.jpg,1,9,0,65535,0,736,0,60,0,0,0,1,1,1,1,Dalvik/2.1.0 (Linux; U; Android 5.0.1; HUAWEI GRA-UL00 Build/HUAWEIGRA-UL00),0,0,0,278,1,9,0
2016-06-30 00:14:54,10.242.210.84,183.232.10.27,32898,80,1,0,1,i1.letvimg.com,/lc07_isvrs/201606/29/18/26/a8975515-5b03-4c75-af89-e1dc3487d04c/thumb/2_400_300.jpg,1,17,0,65535,0,1160,0,17,0,0,0,1,1,1,1,Dalvik/2.1.0 (Linux; U; Android 5.0.1; HUAWEI GRA-UL00 Build/HUAWEIGRA-UL00),0,0,0,1,1,17,0
2016-06-30 00:18:49,10.242.132.12,117.135.252.141,55817,80,1,0,3282,ww3.sinaimg.cn,/or480/eb5de2dfjw1f56d2p47ujj20id5221kx.jpg,3,74,0,200,0,854,85431,0,1,0,0,1,6,8,2,Weibo/7174 CFNetwork/758.2.8 Darwin/15.0.0,82462,0,0,3282,67,15,59
2016-06-30 00:19:05,10.242.72.31,183.232.119.170,37365,80,1,0,398,wap.mpush.qq.com,/push/conn2?mid=0e0e75865dc4c9c44a86b446518fd8714ab8406e&devid=861916035175798&mac=1c%253A77%253Af6%253Ab5%253Ab3%253Ae7&did=861916035175798&qqnetwork=gsm&bid=10001&msgid=66108462&store=303&screen_height=1920&Cookie=%20lskey%3D%3B%20luin%3D%3B%20skey%3D%3B%20uin%3D%3B%20logintype%3D0%20&apptype=android&netstate=4&hw=OPPO_OPPOR9tm&appver=22_android_5.0.1&uid=846d4c65230ae371&screen_width=1080&qn-sig=d74d29d89304463d9d17a7dd1eee91a8&qn-rid=935575748&imsi=460021808763237&auth=3e0c10fff52338d72bc23450,25,10,0,200,0,1022,965,1,1,1,1,25,102,13,25,%E8%85%BE%E8%AE%AF%E6%96%B0%E9%97%BB501(android),589,0,0,398,25,5,5
2016-06-30 00:15:05,10.242.153.62,183.232.90.90,47509,80,1,0,13,monitor.uu.qq.com,/analytics/rqdsync,1,7,0,65535,0,958,0,7,0,0,0,1,1,1,1,,369,0,0,13,1,7,0
2016-06-30 00:12:31,10.242.169.95,117.135.250.120,56104,80,1,1,376073,117.135.250.120,/cache.p4p.com/k0017irptuc.mp4?vkey=345ED8D9B599A30FCF19C84D138517D9BB48CB597C4400D5F204B38101C098F616CCF866B944D6FDAD0DC4A10C3ABA92F2D663EC34B591C64E93754872E4F776A4F5AEE545A3D0A1A6B357F5B91C3AB61BFEF45EBD95ABF3&sdtfrom=v5060&type=mp4&platform=60301&fmt=mp4&level=0&br=60&sp=0,2,37433,1,206,0,706581,36054392,1,1,1,1,2,253,1,2,Mozilla/4.0 (compatible; MSIE 5.00; Windows 98),46926315,0,1,376073,376073,13070,24363
2016-06-30 00:18:47,10.242.98.209,117.135.250.55,50244,80,1,0,161,nlogtj.zuoyebang.cc,/nlogtj/rule/zuoye_android_1.0.0.rule,86,10,0,200,0,459,589,1,1,1,1,2,159,30,86,Dalvik/1.6.0 (Linux; U; Android 4.4.2; find7 Build/JDQ39),215,0,0,161,86,6,4
2016-06-30 00:18:02,10.242.174.78,112.12.6.139,24629,80,1,1,45199,c.f1zd.com,/b/1/656/fid9fid.swf?uid=519327,62,31,1,200,0,796,24305,1,1,1,1,62,2,8459,62,Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML%2C like Gecko) Chrome/45.0.2454.101 Safari/537.36,23168,1,0,45199,128,11,20
2016-06-30 00:15:09,10.242.37.84,117.135.250.55,54828,80,1,0,1,d1.sina.com.cn,/litong/lijun/0525/mm_15890324_8176878_30042709.js?uuid=wap_photo_pics,1,5,0,65535,0,773,0,0,0,0,0,1,1,1,1,Mozilla/5.0 (iPhone; CPU iPhone OS 9_3_2 like Mac OS X) AppleWebKit/601.1.46 (KHTML%2C like Gecko) Version/9.0 Mobile/13F69 Safari/601.1,0,0,0,1,1,5,0
2016-06-30 00:16:30,10.242.151.53,117.187.19.46,47439,80,1,0,7,cfgstatic.51y5.net,/php/configunion.html,3,19,0,200,0,476,1032,1,1,1,1,2,4,2,3,,165,0,0,7,3,8,11
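Splitting one of these records on commas yields the values the job aggregates; the field indices below are the ones used in the code in section 5, and commas inside URLs are percent-encoded, so a plain split is safe:

val line = "2016-06-30 00:19:00,10.242.149.112,120.198.203.230,64736,80,1,1,5247,szshort.weixin.qq.com,/mmtls/7dd6aa5f,94,20,1,200,0,1316,2940,4,1,5,1,45,5494,341,94,MicroMessenger Client,589,0,1,5247,3093,10,10"
val fields = line.split(",")
// fields(0)  -> record timestamp        fields(1)  -> client IP        (these two form the key)
// fields(5)  -> session count           fields(6)  -> retransmission count
// fields(7)  -> connection time         fields(10) -> response delay
// fields(11) -> total packets           fields(12) -> retransmitted packets
// fields(16) -> uplink bytes            fields(17) -> downlink bytes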

4. Creating the project

For project setup, see: http://blog.csdn.net/tototuzuoquan/article/details/75094540
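If you build with sbt instead of following the setup in the linked post, a minimal dependency sketch is shown below. The Spark version (1.6.x) and Scala version are assumptions and should match whatever the referenced setup uses; the receiver-based KafkaUtils.createStream API used later lives in the spark-streaming-kafka artifact of the Spark 1.x line.

// build.sbt — versions are assumptions, align them with the linked setup post
name := "traffic-count"
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.2"
)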


5. Project code

package cn.toto.spark

import cn.toto.spark.streams.LoggerLevels
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by toto on 2017/7/14.
  */
case class DataBean(session_count: Int,     // session count
                    retrans_count: Long,    // retransmission count
                    connect_time: Long,     // connection time
                    resp_delay: Long,       // response delay
                    total_packets: Long,    // total packets
                    retrans_packets: Int,   // retransmitted packets
                    up_bytes: Int,          // uplink traffic in bytes
                    down_bytes: Int         // downlink traffic in bytes
                   ) extends Serializable

/**
  * Created by ZX on 2016/7/13.
  */
object TrafficCount {

  // Aggregation function: merge DataBean a and DataBean b into a new DataBean
  val reduceFunc = (a: DataBean, b: DataBean) => {
    DataBean(a.session_count + b.session_count,
      a.retrans_count + b.retrans_count,
      a.connect_time + b.connect_time,
      a.resp_delay + b.resp_delay,
      a.total_packets + b.total_packets,
      a.retrans_packets + b.retrans_packets,
      a.up_bytes + b.up_bytes,
      a.down_bytes + b.down_bytes)
  }

  def main(args: Array[String]) {
    LoggerLevels.setStreamingLogLevels()
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("TrafficCount").setMaster("local[2]")
    // Batch interval: pull data from Kafka every 30 seconds
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    ssc.checkpoint("E://workspace//trafficResult")
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    // Take the records line by line; _._2 is the message value read from Kafka
    val lines: DStream[String] = data.map(_._2)
    val tupleData: DStream[((String, String), DataBean)] = lines.map(line => {
      // Split the record into fields
      val fields = line.split(",")
      // Use the timestamp and IP as the key; wrap the metric fields in a DataBean as the value
      // fields(0): record timestamp
      // fields(1): client IP
      ((fields(0), fields(1)), DataBean(fields(5).toInt,
        fields(6).toLong,
        fields(7).toLong,
        fields(10).toLong,
        fields(11).toLong,
        fields(12).toInt,
        fields(16).toInt,
        fields(17).toInt))
    })
    val result: DStream[((String, String), DataBean)] = tupleData.reduceByKeyAndWindow(
      reduceFunc,
      Seconds(180),    // window length (6 batch intervals, since each batch above is 30 seconds)
      Seconds(150))    // slide interval (5 batch intervals)
    // Save the results to HBase, Redis, or another external store
    result.foreachRDD(rdd => {
      rdd.foreachPartition(part => {
        // Uncomment and adjust the table name to write to HBase
        // val hConf = new HBaseConfiguration()
        // val hTable = new HTable(hConf, "table")
        part.foreach(x => {
          // Adjust the column names as needed
          // val thePut = new Put(Bytes.toBytes(row(0)))
          // thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
          // hTable.put(thePut)
        })
        // hTable.close()
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
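The write-out above is left as a commented-out skeleton. A hedged sketch of what that HBase branch might look like with the classic HTable/Put API the comments reference is shown below; the table name traffic_stats, the column family cf, the chosen columns, and the row-key layout are hypothetical.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.dstream.DStream

// Call this in place of the commented-out foreachRDD body in TrafficCount above
def saveToHBase(result: DStream[((String, String), DataBean)]): Unit = {
  result.foreachRDD(rdd => {
    rdd.foreachPartition(part => {
      // One table handle per partition, mirroring the commented skeleton above
      val hConf = HBaseConfiguration.create()
      val hTable = new HTable(hConf, "traffic_stats")           // hypothetical table name
      part.foreach { case ((time, ip), bean) =>
        val thePut = new Put(Bytes.toBytes(ip + "|" + time))    // row key: client IP plus record time
        thePut.add(Bytes.toBytes("cf"), Bytes.toBytes("up_bytes"), Bytes.toBytes(bean.up_bytes.toString))
        thePut.add(Bytes.toBytes("cf"), Bytes.toBytes("down_bytes"), Bytes.toBytes(bean.down_bytes.toString))
        thePut.add(Bytes.toBytes("cf"), Bytes.toBytes("session_count"), Bytes.toBytes(bean.session_count.toString))
        hTable.put(thePut)
      }
      hTable.close()
    })
  })
}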
6. Running the project

See http://blog.csdn.net/tototuzuoquan/article/details/75094540; the job is run the same way as described there. Simply push the log lines shown above into Kafka and the computation will start.
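One way to push the sample records into Kafka from code (instead of the console producer) is a small producer like the sketch below; the broker address, topic name, and log file path are placeholders, not values from the original post:

import java.util.Properties
import scala.io.Source
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TrafficLogProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")   // placeholder broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Send every line of the traffic log file to the topic the streaming job consumes
    Source.fromFile("traffic.log").getLines().foreach { line =>          // placeholder file path
      producer.send(new ProducerRecord[String, String]("traffic", line)) // placeholder topic name
    }
    producer.close()
  }
}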

