对kv类型的RDD数据集进行操作。
keys
"""
获取所有的key转换算子"""inputRdd = sc.parallelize([('laoda', 11), ('laoer', 22), ('laosan', 33), ('laosi', 44)])
print(inputRdd.keys().collect())
# ['laoda', 'laoer', 'laosan', 'laosi']
values
"""
获取所有的value转换算子"""inputRdd = sc.parallelize([('laoda', 11), ('laoer', 22), ('laosan', 33), ('laosi', 44)])
print(inputRdd.values().collect())
# [11, 22, 33, 44]
mapValues
"""
拿到所有的value值 对value进行改变 返回值 仍是以前的map转换算子
"""
inputRdd = sc.parallelize([('laoda', 11), ('laoer', 22), ('laosan', 33), ('laosi', 44)])
print(inputRdd.mapValues(lambda values: values + 1).collect())# [('laoda', 12), ('laoer', 23), ('laosan', 34), ('laosi', 45)]
collectAsMap
"""
将二元组类型的RDD转换成一个Dict字典
必须是二元组 从表中查询后的结果需要先转为rdd ,再使用map将其转为二元组
触发算子
"""inputRdd = sc.parallelize([('laoda', 11), ('laoer', 22), ('laosan', 33), ('laosi', 44)])print(inputRdd.collectAsMap())# [('laoda', 11), ('laoer', 22), ('laosan', 33), ('laosi', 44)]
# {'laoda': 11, 'laoer': 22, 'laosan': 33, 'laosi': 44}dimMap = spark.sql("""
select * from dim.area_geo
""").rdd.map(lambda row:(row.geohash5,row.province+"-"+row.city+"-"+row.street)).collectAsMap()
print(dimMap)
join
join也可以视为kv类型的算子,因为是通过key值进行join操作的
rdd_singer_age = sc.parallelize([("周杰伦", 43), ("陈奕迅", 47), ("蔡依林", 41), ("林子祥", 74), ("陈升", 63)])rdd_singer_music = sc.parallelize([("周杰伦", "青花瓷"), ("陈奕迅", "孤勇者"), ("蔡依林", "日不落"), ("林子祥", "男儿当自强"),("动力火车", "当")])# leftOuterJoin 左为主 否则为None 外连接
print(rdd_singer_age.leftOuterJoin(rdd_singer_music).collectAsMap())
# join 内连接
print(rdd_singer_age.join(rdd_singer_music).collectAsMap())
# fullOuterJoin 全外连接 连接不上为None
print(rdd_singer_age.fullOuterJoin(rdd_singer_music).collectAsMap()){'蔡依林': (41, '日不落'), '陈升': (63, None), '陈奕迅': (47, '孤勇者'), '林子祥': (74, '男儿当自强'), '周杰伦': (43, '青花瓷')}{'蔡依林': (41, '日不落'), '陈奕迅': (47, '孤勇者'), '林子祥': (74, '男儿当自强'), '周杰伦': (43, '青花瓷')}{'动力火车': (None, '当'), '蔡依林': (41, '日不落'), '陈升': (63, None), '陈奕迅': (47, '孤勇者'), '林子祥': (74, '男儿当自强'), '周杰伦': (43, '青花瓷')}