1. Kafka
Steps:
- Start ZooKeeper and Kafka
- Test the Kafka producer and consumer from the shell
- Start HDFS and YARN
- Submit a Spark job that consumes the Kafka messages
1.1 Start ZooKeeper and Kafka
[root@bogon bin]# cd /usr/local/src/zookeeper-3.4.14/bin
[root@bogon bin]# ./zkServer.sh start
# Check the status (ps -ef | grep zookeeper also works)
[root@localhost bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/src/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: standalone
# Start Kafka
[root@localhost bin]# cd /usr/local/src/kafka_2.11-2.4.1/bin/
[root@localhost bin]# ./kafka-server-start.sh -daemon /usr/local/src/kafka_2.11-2.4.1/config/server.properties
# Check that it started
[root@localhost bin]# jps
20817 QuorumPeerMain
24954 Kafka
26364 Jps
1.2 Test Kafka
1. Create a topic:
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic2
Created topic topic2.
2. List topics:
# List the existing topics
[root@localhost bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
topic2
3. Start a producer:
# Produce messages
[root@bogon bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic topic2
>[2021-01-23 13:18:52,684] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
>hello world
>hello python
>hello spark
Once started, the producer blocks and waits for input. Three messages have been entered above; now start a consumer to read them.
4. Start a consumer:
# Consume messages
[root@bogon bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic2 --from-beginning
>hello world
>hello python
>hello spark
1.3 Start HDFS and YARN
1. Start HDFS:
[root@bogon sbin]# cd /home/hj/app/hadoop-2.6.0-cdh5.7.0/sbin/
[root@bogon sbin]# ./start-dfs.sh
[root@bogon sbin]# jps
8881 SecondaryNameNode
7379 DataNode
8664 NameNode
9261 Jps
1054 QuorumPeerMain
2. Start YARN:
[root@bogon sbin]# ./start-yarn.sh
starting yarn daemons
starting resourcemanager, logging to /home/hj/app/hadoop-2.6.0-cdh5.7.0/logs/yarn-root-resourcemanager-bogon.out
localhost: /usr/bin/python: No module named virtualenvwrapper
localhost: virtualenvwrapper.sh: There was a problem running the initialization hooks.
localhost:
localhost: If Python could not import the module virtualenvwrapper.hook_loader,
localhost: check that virtualenvwrapper has been installed for
localhost: VIRTUALENVWRAPPER_PYTHON=/usr/bin/python and that PATH is
localhost: set properly.
localhost: starting nodemanager, logging to /home/hj/app/hadoop-2.6.0-cdh5.7.0/logs/yarn-root-nodemanager-bogon.out
[root@bogon sbin]# jps
12945 DataNode
13089 SecondaryNameNode
14065 Jps
13924 ResourceManager
12840 NameNode
14031 NodeManager
1.4 Consume Kafka messages with Spark
1.4.1 Configure the dependency jars
Spark Streaming needs two jars to connect to Kafka:
- spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar (the 2.11 is the Scala version)
- spark-streaming-kafka-0-8_2.11-2.4.4.jar
Without them the connection to Kafka fails; see the official docs: http://spark.apache.org/docs/2.4.4/streaming-kafka-integration.html
After downloading, copy them into the spark/jars directory:
[root@bogon jars]# cd /home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars
[root@bogon jars]# cp ../../spark-streaming-kafka-0-8-assembly_2.11-2.4.7.jar .
[root@bogon jars]# cp ../../spark-streaming-kafka-0-8_2.11-2.4.4.jar .
1.4.2 Write the Spark Streaming program
# coding=utf-8
import sys

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: test_spark_kafka_streaming.py topic")
        sys.exit(-1)

    conf = SparkConf().setAppName("test_kafka_streaming").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)  # 10-second batch interval

    brokers = "localhost:9092"
    topic = sys.argv[-1]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

    lines_rdd = kvs.map(lambda x: x[1])
    counts = lines_rdd.flatMap(lambda x: x.split(" ")) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda x, y: x + y)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
This is a simple word-count program: it takes a single argument, topic (the Kafka topic to read), and processes the Kafka messages in 10-second batches. Each element of the direct stream is a (key, value) pair, so kvs.map(lambda x: x[1]) keeps just the message body.
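If each batch should also be persisted instead of only printed, DStream offers saveAsTextFiles; a minimal sketch of what could be added to the program above just before ssc.start() (the output prefix is an assumed example path, not part of the original setup):
# Hypothetical addition to the word-count program above (not in the original):
# each 10-second batch is written out as text files named <prefix>-<batch timestamp>.
counts.saveAsTextFiles("/home/hj/app/streaming_output/wordcount")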
Two modes of reading from Kafka
Spark Streaming can receive data from Kafka in two ways:
- the Direct API, which uses the lower-level Kafka API
- the receiver-based approach, which uses the higher-level consumer API
brokers = "localhost:9092"
zk_quorum = "localhost:2181"
topic = sys.argv[-1]

# Receiver mode: createStream takes the ZooKeeper quorum and a consumer group id
line = KafkaUtils.createStream(ssc, zk_quorum, 'test', {topic: 1})

# Direct (no-receiver) mode: talks to the Kafka brokers directly
line = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
The two approaches have different trade-offs; for more detail see:
https://blog.csdn.net/pysense/article/details/104179736/
https://www.cnblogs.com/heml/p/6796414.html
1.4.3 Submit the Spark job
The Kafka producer from section 1.2 is still running (if not, start one as shown there). Now submit test_spark_kafka_streaming.py with spark-submit; the command below runs it with --master local[2] (to actually run on YARN you would pass --master yarn):
[root@bogon bin]# ./spark-submit --master local[2] --name test_kafka_streaming /home/hj/app/test_spark_kafka_streaming.py topic2
1. Kafka producer:
[root@bogon bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic topic2
>hello spark hello python hello world hello alina
2. The streaming job's log output:
(Screenshot of the job log: https://hubery624.oss-cn-shenzhen.aliyuncs.com/ddb6cfe195cc8cd30d52496f8a4eecf.png)
1.5 References
- Spark2.1.0+入门:Apache Kafka作为DStream数据源(Python版)
- pyspark streaming简介 和 消费 kafka示例
- 基于PySpark整合Spark Streaming与Kafka
2. Socket stream
1. test_spark_steaming_socket.py:
# coding=utf-8
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test_socket_streaming").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)

    lines_rdd = ssc.socketTextStream("localhost", 9999)
    counts = lines_rdd.flatMap(lambda x: x.split(" ")) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda x, y: x + y)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
2. Install and start nc:
[root@bogon app]# yum install nc
[root@bogon app]# nc -lk 9999
hello world
hello spark
hello python
hello spark hello python hello world
3. Submit the Spark job:
[root@bogon bin]# ./spark-submit --master local[2] --name test_socket_streaming /home/hj/app/test_spark_steaming_socket.py
Note: start nc before submitting the Spark job, otherwise Spark reports an error.
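If nc is not available, a small Python stand-in can serve the same purpose. This is only a sketch under the assumptions that port 9999 is free and that one line every two seconds is enough for testing; it is not part of the original setup:
# coding=utf-8
# Minimal substitute for `nc -lk 9999`: accept one client and keep sending it lines.
import socket
import time

if __name__ == '__main__':
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(("localhost", 9999))   # same host/port that socketTextStream reads from
    server.listen(1)
    conn, _ = server.accept()          # blocks until the Spark job connects
    try:
        while True:
            conn.send("hello world hello spark\n".encode("utf-8"))
            time.sleep(2)              # send a new line every 2 seconds (arbitrary)
    finally:
        conn.close()
        server.close()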
3. File stream
1. test_spark_streaming_file.py:
# coding=utf-8
import sys

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: test_spark_streaming_file.py file_path")
        sys.exit(-1)

    conf = SparkConf().setAppName("test_socket_streaming").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)

    lines_rdd = ssc.textFileStream(sys.argv[-1])
    counts = lines_rdd.flatMap(lambda x: x.split(" ")) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda x, y: x + y)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
2. Submit the Spark job:
# Monitor the /home/hj/app/logfile/ directory (create the logfile directory first)
[root@bogon bin]# ./spark-submit --master local[2] --name test_file_streaming /home/hj/app/test_spark_streaming_file.py /home/hj/app/logfile/
3. Create a file with the following content:
[root@bogon app]# mkdir logfile
[root@bogon app]# cd logfile/
[root@bogon logfile]# vim test
I love Hadoop
I love Spark
Spark is fast
Note: the file stream only picks up files created after the Spark job has started; files that already existed in the directory are not processed (see the helper sketch below for one way to generate such files).
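Since only files that appear after startup are processed, each test input should arrive as a brand-new file. A small helper sketch for that (the directory matches the command above; the file-name pattern and contents are just assumptions):
# coding=utf-8
# Drop a new file into the monitored directory so textFileStream picks it up.
import os
import time

LOG_DIR = "/home/hj/app/logfile"   # the directory passed to the Spark job

if __name__ == '__main__':
    ts = int(time.time())
    tmp = os.path.join(LOG_DIR, ".input-%d.tmp" % ts)   # '.'-prefixed files are typically ignored while being written
    dst = os.path.join(LOG_DIR, "input-%d.txt" % ts)
    with open(tmp, "w") as f:
        f.write("I love Hadoop\nI love Spark\nSpark is fast\n")
    os.rename(tmp, dst)   # rename so the file appears atomically under its final name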
Visit http://192.168.131.131:4040/jobs/ to see the job details:
(Screenshot of the Spark jobs UI: https://hubery624.oss-cn-shenzhen.aliyuncs.com/20210131194416.png)
4. HBase pseudo-distributed installation
1. Download: http://archive.apache.org/dist/hbase/1.1.2/
2. Unpack it and configure the environment variables:
tar -zxvf hbase-1.1.2-bin.tar.gz -C /home/hj/app/
[root@bogon app]# vim ~/.bash_profile
export HBASE_HOME=/home/hj/app/hbase-1.1.2
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HBASE_HOME/bin
# Reload the profile so the change takes effect
[root@bogon app]# source ~/.bash_profile
3. Configure hbase-env.sh:
# Set HBASE_CLASSPATH to the conf directory of the local Hadoop installation (i.e. /home/hj/app/hadoop-2.6.0-cdh5.7.0/conf)
export JAVA_HOME=/home/hj/app/jdk1.8.0_261
export HBASE_CLASSPATH=/home/hj/app/hadoop-2.6.0-cdh5.7.0/conf
export HBASE_MANAGES_ZK=true
4. Configure conf/hbase-site.xml:
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <!-- <value>hdfs://localhost:9000/hbase</value> was wrong here -->
    <value>hdfs://localhost/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
</configuration>
- hbase.rootdir specifies the HDFS path where HBase stores its data; setting hbase.cluster.distributed to true puts HBase in distributed mode. This assumes the Hadoop cluster runs in pseudo-distributed mode on the local machine (the usual examples put the NameNode on port 9000, but on this setup the value without the explicit port is what worked, as noted in the comment above).
5. Start Hadoop:
/home/hj/app/hadoop-2.6.0-cdh5.7.0/sbin/start-dfs.sh
6. Start HBase:
[root@bogon hbase-1.1.2]# ./bin/start-hbase.sh
localhost: /usr/bin/python: No module named virtualenvwrapper
localhost: virtualenvwrapper.sh: There was a problem running the initialization hooks.
localhost:
localhost: If Python could not import the module virtualenvwrapper.hook_loader,
localhost: check that virtualenvwrapper has been installed for
localhost: VIRTUALENVWRAPPER_PYTHON=/usr/bin/python and that PATH is
localhost: set properly.
localhost: starting zookeeper, logging to /home/hj/app/hbase-1.1.2/bin/../logs/hbase-root-zookeeper-bogon.out
starting master, logging to /home/hj/app/hbase-1.1.2/logs/hbase-root-master-bogon.out
starting regionserver, logging to /home/hj/app/hbase-1.1.2/logs/hbase-root-1-regionserver-bogon.out
# Check the running processes; the key ones are HRegionServer and HMaster
[root@bogon hbase-1.1.2]# jps
5586 NodeManager
5045 SecondaryNameNode
112135 Kafka
126087 HRegionServer
125977 HMaster
126376 Jps
1082 QuorumPeerMain
4730 NameNode
5468 ResourceManager
4847 DataNode
4.1 Pitfalls
4.1.1 No HMaster process after startup
After startup there is no HMaster process; the hbase-1.1.2/bin/../logs/hbase-root-zookeeper-bogon.out log shows:
java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:444)
        at sun.nio.ch.Net.bind(Net.java:436)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:225)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
        at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
        at org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:111)
        at org.apache.hadoop.hbase.zookeeper.HQuorumPeer.runZKServer(HQuorumPeer.java:94)
        at org.apache.hadoop.hbase.zookeeper.HQuorumPeer.main(HQuorumPeer.java:79)
The reason is that HBase starts its own ZooKeeper on startup, but ZooKeeper was already running, so the address was already in use.
Solutions:
- Stop the standalone ZooKeeper service and let HBase start ZooKeeper itself, or
- In hbase-env.sh, set HBASE_MANAGES_ZK to false so HBase does not manage ZooKeeper, and keep starting ZooKeeper manually:
[root@bogon conf]# vim hbase-env.sh
export JAVA_HOME=/home/hj/app/jdk1.8.0_261
export HBASE_CLASSPATH=/home/hj/app/hadoop-2.6.0-cdh5.7.0/conf
export HBASE_MANAGES_ZK=false
4.1.2 Startup reports: regionserver running as process 16996. Stop it first.
The full error:
[root@bogon bin]# sh start-hbase.sh
starting master, logging to /home/hj/app/hbase-1.1.2/logs/hbase-root-master-bogon.out
regionserver running as process 16996. Stop it first.
Solution: kill the running regionserver first (kill -9 16996), then start HBase again.
4.1.3 Startup reports SLF4J: Class path contains multiple SLF4J bindings
Cause: the HBase and Hadoop SLF4J binding jars conflict; rename HBase's copy:
mv slf4j-log4j12-1.7.25.jar slf4j-log4j12-1.7.25.jar-copy
Reference: https://blog.csdn.net/qq_45135120/article/details/107048944
4.1.4 Startup warns: Java HotSpot™ 64-Bit Server VM warning: ignoring option PermSize=128m
Reference: https://blog.csdn.net/tiankong_12345/article/details/93585463
4.1.5 hbase shell reports Can't get master address from ZooKeeper; znode data == null
Problem: the HMaster process has died and is not running.
Solution: http://blog.51yip.com/hadoop/2193.html
HBase shell operations
[root@bogon hbase-1.1.2]# ./bin/hbase shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hj/app/hbase-1.1.2/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hj/app/hadoop-2.6.0-cdh5.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2021-01-17 11:53:03,530 WARN [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.1.2, rcc2b70cf03e3378800661ec5cab11eb43fafe0fc, Wed Aug 26 20:11:27 PDT 2015
[root@bogon bin]# ./hbase shell
2021-02-06 21:54:05,371 WARN [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.1.2, rcc2b70cf03e3378800661ec5cab11eb43fafe0fc, Wed Aug 26 20:11:27 PDT 2015
hbase(main):001:0> list
TABLE
0 row(s) in 0.7260 seconds
=> []
hbase(main):005:0> create 'student', 'info'
0 row(s) in 2.4760 seconds
=> Hbase::Table - student
hbase(main):006:0> describe 'student'
Table student is ENABLED
student
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BL
OCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZ
E => '65536', REPLICATION_SCOPE => '0'}
1 row(s) in 0.3350 seconds
hbase(main):002:0> put 'student','1','info:name','Xueqian'
0 row(s) in 0.5210 seconds
hbase(main):003:0> put 'student','1','info:gender','F'
0 row(s) in 0.0340 seconds
hbase(main):004:0> put 'student','1','info:age','23'
0 row(s) in 0.0490 seconds
hbase(main):005:0> put 'student','2','info:name','Weiliang'
0 row(s) in 0.0480 seconds
hbase(main):006:0> put 'student','2','info:gender','M'
0 row(s) in 0.0570 seconds
hbase(main):007:0> put 'student','2','info:age','24'
0 row(s) in 0.0270 seconds
hbase(main):008:0> get 'student', '1'
COLUMN                  CELL
 info:age               timestamp=1612620059368, value=23
 info:gender            timestamp=1612620052093, value=F
 info:name              timestamp=1612620042398, value=Xueqian
3 row(s) in 0.1330 seconds
hbase(main):010:0> scan 'student'
ROW                     COLUMN+CELL
 1                      column=info:age, timestamp=1612620059368, value=23
 1                      column=info:gender, timestamp=1612620052093, value=F
 1                      column=info:name, timestamp=1612620042398, value=Xueqian
 2                      column=info:age, timestamp=1612620095417, value=24
 2                      column=info:gender, timestamp=1612620086286, value=M
 2                      column=info:name, timestamp=1612620076564, value=Weiliang
2 row(s) in 0.0870 seconds
So that Spark can find the Hadoop and HBase classes, the following export is used (typically placed in spark-env.sh; the hbase jar directory it references is created in 4.2.1 below):
export SPARK_DIST_CLASSPATH=$(/home/hj/app/hadoop-2.6.0-cdh5.7.0/bin/hadoop classpath):$(/home/hj/app/hbase-1.1.2/bin/hbase classpath):/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars/hbase/*
4.2 Read and write HBase from PySpark
4.2.1 Configure Spark
For Spark to read and write HBase, the relevant HBase jars must be copied into spark/jars. The jars needed are: every jar whose name starts with hbase, plus guava-12.0.1.jar, htrace-core-3.1.0-incubating.jar and protobuf-java-2.5.0.jar.
1. In the jars directory of the Spark installation, create an hbase subdirectory and copy the jars in:
[root@bogon spark-2.2.0-bin-2.6.0-cdh5.7.0]# cd jars
[root@bogon jars]# pwd
/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars
mkdir hbase
cp /home/hj/app/hbase-1.1.2/lib/hbase*.jar hbase/
cp /home/hj/app/hbase-1.1.2/lib/guava-12.0.1.jar hbase/
cp /home/hj/app/hbase-1.1.2/lib/htrace-core-3.1.0-incubating.jar hbase/
cp /home/hj/app/hbase-1.1.2/lib/protobuf-java-2.5.0.jar hbase/
2. Spark 2.0+ no longer ships the jar that converts HBase data into a form Python can read, so download spark-example-1.6.0.jar separately and copy it into hbase/ as well:
mv ~/下载/spark-examples* hbase/
4.2.2 Read and write HBase
1. Open a pyspark shell:
# Change to spark/bin and start the pyspark shell
[root@bogon spark-2.2.0-bin-2.6.0-cdh5.7.0]# cd bin/
[root@bogon bin]# pwd
/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/bin
[root@bogon bin]# ls
beeline load-spark-env.cmd pyspark2.cmd spark-class sparkR2.cmd spark-shell.cmd spark-submit.cmd
beeline.cmd load-spark-env.sh pyspark.cmd spark-class2.cmd sparkR.cmd spark-sql
derby.log metastore_db run-example spark-class.cmd spark-shell spark-submit
find-spark-home pyspark run-example.cmd sparkR spark-shell2.cmd spark-submit2.cmd
[root@bogon bin]# ./pyspark
2. The existing data in HBase:
hbase(main):002:0> scan 'student'
ROW                     COLUMN+CELL
 1                      column=info:age, timestamp=1612620059368, value=23
 1                      column=info:gender, timestamp=1612620052093, value=F
 1                      column=info:name, timestamp=1612620042398, value=Xueqian
 2                      column=info:age, timestamp=1612620095417, value=24
 2                      column=info:gender, timestamp=1612620086286, value=M
 2                      column=info:name, timestamp=1612620076564, value=Weiliang
2 row(s) in 3.3040 seconds
3. Read:
>>> host = 'localhost'
>>> table = 'student'
>>> conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
>>> keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
>>> valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
>>> hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
>>> count = hbase_rdd.count()
>>> hbase_rdd.cache()
MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:208
>>> output = hbase_rdd.collect()
>>> for (k, v) in output:
... print(k, v)
...
(u'1', u'{"qualifier" : "age", "timestamp" : "1612620059368", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "23"}\n{"qualifier" : "gender", "timestamp" : "1612620052093", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "F"}\n{"qualifier" : "name", "timestamp" : "1612620042398", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "Xueqian"}')
(u'2', u'{"qualifier" : "age", "timestamp" : "1612620095417", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "24"}\n{"qualifier" : "gender", "timestamp" : "1612620086286", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "M"}\n{"qualifier" : "name", "timestamp" : "1612620076564", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "Weiliang"}')
>>> count
2
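As the output above shows, each value produced by HBaseResultToStringConverter is a newline-separated string of per-cell JSON documents. A small sketch (the helper name is made up for illustration) for turning hbase_rdd into one dict per row:
import json

def parse_row(kv):
    """Turn (row_key, 'json\njson\n...') into (row_key, {qualifier: value, ...})."""
    row_key, cells = kv
    record = {}
    for cell in cells.split("\n"):
        c = json.loads(cell)
        record[c["qualifier"]] = c["value"]
    return (row_key, record)

students = hbase_rdd.map(parse_row)
# students.collect() would give e.g. [(u'1', {u'name': u'Xueqian', u'gender': u'F', u'age': u'23'}), ...]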
4. Write data:
>>> k1_conv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
>>> v1_conv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
>>> conf_1 = {"hbase.zookeeper.quorum": host,"hbase.mapred.outputtable": table,"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat","mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable","mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
>>> rawData = ['3,info,name,Rongcheng','4,info,name,Guanhua']
>>> sc.parallelize(rawData).map(lambda x: (x[0],x.split(','))).saveAsNewAPIHadoopDataset(conf=conf_1,keyConverter=k1_conv,valueConverter=v1_conv)
21/02/09 22:31:50 ERROR io.SparkHadoopMapReduceWriter: Aborting job job_20210209223147_0008.
java.lang.IllegalArgumentException: Can not create a Path from a null string
        at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
        at org.apache.hadoop.fs.Path.<init>(Path.java:135)
        at org.apache.hadoop.fs.Path.<init>(Path.java:89)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:132)
        at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:101)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
        at org.apache.spark.api.python.PythonRDD$.saveAsHadoopDataset(PythonRDD.scala:861)
        at org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/python/pyspark/rdd.py", line 1393, in saveAsNewAPIHadoopDataset
    keyConverter, valueConverter, True)
  File "/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/python/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: u'Can not create a Path from a null string'
Check whether the write succeeded:
# Two new rows appear, so the write succeeded
hbase(main):003:0> scan 'student'
ROW                     COLUMN+CELL
 1                      column=info:age, timestamp=1612620059368, value=23
 1                      column=info:gender, timestamp=1612620052093, value=F
 1                      column=info:name, timestamp=1612620042398, value=Xueqian
 2                      column=info:age, timestamp=1612620095417, value=24
 2                      column=info:gender, timestamp=1612620086286, value=M
 2                      column=info:name, timestamp=1612620076564, value=Weiliang
 3                      column=info:name, timestamp=1612881109880, value=Rongcheng
 4                      column=info:name, timestamp=1612881109879, value=Guanhua
4 row(s) in 0.1310 seconds
Note: the write raises pyspark.sql.utils.IllegalArgumentException: u'Can not create a Path from a null string', but the data is still written successfully.
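Each element handed to StringListToPutConverter is a [row key, column family, qualifier, value] list, i.e. one cell per element, so writing several columns for one row just means emitting several elements with the same row key. A hedged sketch reusing conf_1, k1_conv and v1_conv from above (row '5' and its values are made-up examples):
# One element per cell: row key '5' appears once for each column being written.
new_rows = [
    '5,info,name,Alina',
    '5,info,gender,F',
    '5,info,age,22',
]
sc.parallelize(new_rows) \
  .map(lambda x: (x.split(',')[0], x.split(','))) \
  .saveAsNewAPIHadoopDataset(conf=conf_1, keyConverter=k1_conv, valueConverter=v1_conv)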