第1关:SparkSql 数据清洗
# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession
if __name__ =='__main__':spark = SparkSession.builder.appName("demo").master("local").getOrCreate()#**********begin**********#df = spark.read.option("header",True).option("delimiter","\t").csv("/root/data.csv")df.createTempView("data")spark.sql("""select regexp_replace(TRIP_ID,'\\\W+','') as TRIP_ID ,regexp_replace(CALL_TYPE,'\\\W+','') as CALL_TYPE ,regexp_replace(ORIGIN_CALL,'\\\W+','') as ORIGIN_CALL ,regexp_replace(TAXI_ID,'\\\W+','') as TAXI_ID ,regexp_replace(ORIGIN_STAND,'\\\W+','') as ORIGIN_STAND ,regexp_replace(TIMESTAMP,'\\\W+','') as TIMESTAMP ,regexp_replace(POLYLINE,'\\\W+','') as POLYLINEfrom data""").show()#**********end**********#spark.stop()
第2关:SparkSql数据分析
# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession
import jsonif __name__ == '__main__' :spark = SparkSession.builder.master("local").appName("demo").getOrCreate()#**********begin**********#df = spark.read.option("header",True).option("delimiter","\t").csv("/root/data2.csv")df.createTempView("data")spark.sql("select TRIP_ID,CALL_TYPE,ORIGIN_CALL, TAXI_ID, ORIGIN_STAND, from_unixtime(TIMESTAMP,'yyyy-MM-dd') as TIME ,POLYLINE from data").show()spark.udf.register("timeLen", lambda x: {(len(json.loads(x)) - 1) * 15 if len(json.loads(x)) > 0 else 8})spark.udf.register("startLocation", lambda x: {str(json.loads(x)[0]) if len(json.loads(x)) > 0 else ""})spark.udf.register( "endLocation", lambda x: {str(json.loads(x)[len(json.loads(x)) - 1]) if len(json.loads(x)) > 0 else ""})df.createTempView("data2")res=spark.sql("select TRIP_ID,CALL_TYPE,ORIGIN_CALL,TAXI_ID,ORIGIN_STAND,from_unixtime(TIMESTAMP,'yyyy-MM-dd') as TIME, POLYLINE, timeLen(POLYLINE) as TIMELEN, startLocation(POLYLINE) as STARTLOCATION, endLocation(POLYLINE) as ENDLOCATION from data2")res.createTempView("data3")res.show()spark.sql("select CALL_TYPE,TIME,count(1) as NUM from data3 group by TIME,CALL_TYPE order by CALL_TYPE,TIME").show()#**********end**********#