Spark Streaming


1. Kafka

Steps:

  • Start ZooKeeper and Kafka
  • Test the Kafka producer and consumer from the shell
  • Start HDFS and YARN
  • Submit a Spark job that consumes the Kafka messages

1.1 Start ZooKeeper and Kafka

[root@bogon bin]# cd /usr/local/src/zookeeper-3.4.14/bin
[root@bogon bin]# ./zkServer.sh start

# Check the status; ps -ef | grep zookeeper also works
[root@localhost bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/src/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: standalone

# Start Kafka
[root@localhost bin]# cd /usr/local/src/kafka_2.11-2.4.1/bin/
[root@localhost bin]# ./kafka-server-start.sh -daemon /usr/local/src/kafka_2.11-2.4.1/config/server.properties

# Check that it started successfully
[root@localhost bin]# jps
20817 QuorumPeerMain
24954 Kafka
26364 Jps

1.2 Test Kafka

1. Create a topic

[root@localhost bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic2
Created topic topic2.

2. List topics

# List the existing topics
[root@localhost bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
topic2

3. Start a producer:

# Produce messages
[root@bogon bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic topic2
>[2021-01-23 13:18:52,684] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
>hello world
>hello python
>hello spark

Once the producer starts it blocks, waiting for input. Three messages were entered above; now start a consumer to read them.

4. Start a consumer:

# Consume messages
[root@bogon bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic2 --from-beginning
>hello world
>hello python
>hello spark
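
The console scripts are enough for a quick smoke test. If you would rather produce test messages from Python, a minimal sketch using the third-party kafka-python package could look like the following (kafka-python is an assumption here, it is not used elsewhere in this post; install it with pip install kafka-python):

# send_test_messages.py -- push a few test messages into topic2 (hypothetical helper script)
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
for line in ["hello world", "hello python", "hello spark"]:
    producer.send("topic2", line.encode("utf-8"))   # message value must be bytes
producer.flush()    # make sure everything is actually sent before exiting
producer.close()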

1.3 Start HDFS and YARN

1. Start HDFS

[root@bogon sbin]# cd /home/hj/app/hadoop-2.6.0-cdh5.7.0/sbin/
[root@bogon sbin]# ./start-dfs.sh 
[root@bogon sbin]# jps
8881 SecondaryNameNode
7379 DataNode
8664 NameNode
9261 Jps
1054 QuorumPeerMain

2. Start YARN

[root@bogon sbin]# ./start-yarn.sh 
starting yarn daemons
starting resourcemanager, logging to /home/hj/app/hadoop-2.6.0-cdh5.7.0/logs/yarn-root-resourcemanager-bogon.out
localhost: /usr/bin/python: No module named virtualenvwrapper
localhost: virtualenvwrapper.sh: There was a problem running the initialization hooks.
localhost: 
localhost: If Python could not import the module virtualenvwrapper.hook_loader,
localhost: check that virtualenvwrapper has been installed for
localhost: VIRTUALENVWRAPPER_PYTHON=/usr/bin/python and that PATH is
localhost: set properly.
localhost: starting nodemanager, logging to /home/hj/app/hadoop-2.6.0-cdh5.7.0/logs/yarn-root-nodemanager-bogon.out

[root@bogon sbin]# jps
12945 DataNode
13089 SecondaryNameNode
14065 Jps
13924 ResourceManager
12840 NameNode
14031 NodeManager

1.4 Spark consuming Kafka messages

1.4.1 Configure dependency JARs

Spark Streaming needs two JARs to connect to Kafka:

  • spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar: here 2.11 is the Scala version
  • spark-streaming-kafka-0-8_2.11-2.4.4.jar

Without them the connection to Kafka fails; see the official docs for details: http://spark.apache.org/docs/2.4.4/streaming-kafka-integration.html

After downloading, copy them into the spark/jars directory:

[root@bogon jars]# cd /home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars
[root@bogon jars]# cp ../../spark-streaming-kafka-0-8-assembly_2.11-2.4.7.jar .
[root@bogon jars]# cp ../../spark-streaming-kafka-0-8_2.11-2.4.4.jar .

1.4.2 Write the Spark Streaming program

# coding=utf-8
import sys

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: test_spark_kafka_streaming.py topic")
        sys.exit(-1)

    conf = SparkConf().setAppName("test_kafka_streaming").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)  # batch interval: 10s

    brokers = "localhost:9092"
    topic = sys.argv[-1]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines_rdd = kvs.map(lambda x: x[1])
    counts = lines_rdd.flatMap(lambda x: x.split(" ")) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda x, y: x + y)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

This is a simple word-count program. It takes one argument, the Kafka topic, and processes the Kafka messages in 10-second batches.
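
If you want counts over a sliding window instead of independent 10-second batches, the same DStream can be windowed. A minimal sketch, reusing ssc and lines_rdd from the program above; the 60-second window, 20-second slide and the checkpoint path are illustrative assumptions:

# Word counts over the last 60 seconds, recomputed every 20 seconds.
# The inverse-reduce form of reduceByKeyAndWindow requires checkpointing.
ssc.checkpoint("/tmp/spark_streaming_ckpt")   # hypothetical checkpoint directory

windowed_counts = lines_rdd.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKeyAndWindow(lambda a, b: a + b,   # add counts entering the window
                          lambda a, b: a - b,   # subtract counts leaving the window
                          60, 20)               # window length, slide interval (seconds)
windowed_counts.pprint()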

Two ways to consume from Kafka

Spark Streaming can receive data from Kafka in two ways:

  • the Direct API, the lower-level approach (no receivers)

  • the receiver-based approach, the higher-level API

brokers = "localhost:9092"
topic = sys.argv[-1]

# receiver mode: createStream takes the ZooKeeper quorum, a consumer group id, and {topic: partitions}
line = KafkaUtils.createStream(ssc, "localhost:2181", 'test', {topic: 1})

# direct (no-receiver) mode
line = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

Each approach has its pros and cons; for more detail see:

  • https://blog.csdn.net/pysense/article/details/104179736/
  • https://www.cnblogs.com/heml/p/6796414.html
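
One practical difference: with the direct approach the consumed Kafka offsets are exposed on each batch's RDD, so they can be logged or stored for recovery. A minimal sketch following the pattern in the Spark 2.4 kafka-0-8 integration guide, reusing kvs from the program in 1.4.2 (it must be set up before ssc.start()):

# Record and print the offset range each batch covered (direct stream only).
offset_ranges = []

def store_offset_ranges(rdd):
    global offset_ranges
    offset_ranges = rdd.offsetRanges()   # available on RDDs produced by createDirectStream
    return rdd

def print_offset_ranges(_):
    for o in offset_ranges:
        print("%s partition %d: offsets [%d, %d)" % (o.topic, o.partition, o.fromOffset, o.untilOffset))

kvs.transform(store_offset_ranges).foreachRDD(print_offset_ranges)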

1.4.3 Submit the Spark job

The Kafka producer from section 1.2 is still running (start one as described there if not). Now submit test_spark_kafka_streaming.py:

[root@bogon bin]# ./spark-submit --master local[2] --name test_kafka_streaming /home/hj/app/test_spark_kafka_streaming.py topic2

1. Kafka producer

[root@bogon bin]# ./kafka-console-producer.sh --broker-list localhost:9092 --topic topic2
>hello spark hello python hello world hello alina

2. Job output:

(Screenshot of the streaming word-count output: https://hubery624.oss-cn-shenzhen.aliyuncs.com/ddb6cfe195cc8cd30d52496f8a4eecf.png)

1.5 References

  • Spark 2.1.0+ Getting Started: Apache Kafka as a DStream data source (Python version)
  • Introduction to pyspark streaming, with a Kafka consumption example
  • Integrating Spark Streaming with Kafka using PySpark

2. Socket stream

1. test_spark_steaming_socket.py

# coding=utf-8
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("test_socket_streaming").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)

    lines_rdd = ssc.socketTextStream("localhost", 9999)
    counts = lines_rdd.flatMap(lambda x: x.split(" ")) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda x, y: x + y)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

2. Install and start nc

[root@bogon app]# yum install nc
[root@bogon app]# nc -lk 9999
hello world
hello spark
hello python
hello spark hello python hello world

3. Submit the Spark job:

[root@bogon bin]# ./spark-submit --master local[2] --name test_socket_streaming /home/hj/app/test_spark_steaming_socket.py 

Note: start nc before submitting, otherwise Spark throws an error!
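
If nc is not available, a small Python script can stand in for it as the data source; a minimal sketch (the file name and message text are illustrative), started before the Spark job just like nc:

# fake_nc.py -- listen on localhost:9999 and feed the Spark socket receiver one line per second.
import socket
import time

if __name__ == '__main__':
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(("localhost", 9999))
    server.listen(1)
    conn, addr = server.accept()              # blocks until the Spark receiver connects
    try:
        while True:
            conn.send(b"hello spark hello python hello world\n")   # one record per line
            time.sleep(1)
    finally:
        conn.close()
        server.close()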

3. File stream

1. test_spark_streaming_file.py

# coding=utf-8
import sys

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: test_spark_streaming_file.py file_path")
        sys.exit(-1)

    conf = SparkConf().setAppName("test_file_streaming").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)

    lines_rdd = ssc.textFileStream(sys.argv[-1])
    counts = lines_rdd.flatMap(lambda x: x.split(" ")) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda x, y: x + y)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

2. Submit the Spark job:

# Monitor the /home/hj/app/logfile/ directory (create it first)
[root@bogon bin]# ./spark-submit --master local[2] --name test_file_streaming /home/hj/app/test_spark_streaming_file.py /home/hj/app/logfile/

3. Create a file with the following content:

[root@bogon app]# mkdir logfile
[root@bogon app]# cd logfile/
[root@bogon logfile]# vim test
I love Hadoop
I love Spark
Spark is fast

Note: the file stream only picks up files created after the Spark job has started; files that already existed are not detected!
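
Because of this, a generator script should create each file somewhere else and then move it into the monitored directory, so Spark only ever sees complete, newly created files. A minimal sketch (the staging directory, file names and contents are illustrative):

# gen_logfiles.py -- drop a new, fully written file into the monitored directory every 10 seconds.
import os
import time

MONITORED_DIR = "/home/hj/app/logfile"          # directory passed to textFileStream
STAGING_DIR = "/home/hj/app/logfile_staging"    # same filesystem, so os.rename is atomic

if __name__ == '__main__':
    if not os.path.isdir(STAGING_DIR):
        os.makedirs(STAGING_DIR)
    for i in range(5):
        name = "batch_%d.txt" % i
        staged = os.path.join(STAGING_DIR, name)
        with open(staged, "w") as f:
            f.write("I love Hadoop\nI love Spark\nSpark is fast\n")
        os.rename(staged, os.path.join(MONITORED_DIR, name))   # file appears atomically
        time.sleep(10)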

Visit http://192.168.131.131:4040/jobs/ to see the job details:

(Screenshot of the Spark UI jobs page: https://hubery624.oss-cn-shenzhen.aliyuncs.com/20210131194416.png)

4. HBase pseudo-distributed installation

1. Download: http://archive.apache.org/dist/hbase/1.1.2/

2. Extract and configure environment variables:

tar -zxvf hbase-1.1.2-bin.tar.gz -C /home/hj/app/

[root@bogon app]# vim ~/.bash_profile
export HBASE_HOME=/home/hj/app/hbase-1.1.2
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HBASE_HOME/bin

# Make it take effect
[root@bogon app]# source ~/.bash_profile

3. Configure hbase-env.sh

# Set HBASE_CLASSPATH to the conf directory of the local Hadoop installation (i.e. /home/hj/app/hadoop-2.6.0-cdh5.7.0/conf)
export JAVA_HOME=/home/hj/app/jdk1.8.0_261
export HBASE_CLASSPATH=/home/hj/app/hadoop-2.6.0-cdh5.7.0/conf
export HBASE_MANAGES_ZK=true

4. Configure conf/hbase-site.xml

<configuration>
  <property>
    <name>hbase.rootdir</name>
    <!-- <value>hdfs://localhost:9000/hbase</value>  (wrong in this setup) -->
    <value>hdfs://localhost/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
</configuration>

  • hbase.rootdir sets where HBase stores its data on HDFS; this assumes the Hadoop cluster runs in pseudo-distributed mode on this machine.
  • hbase.cluster.distributed set to true puts HBase into (pseudo-)distributed mode.

5. Start Hadoop

/home/hj/app/hadoop-2.6.0-cdh5.7.0/sbin/start-dfs.sh

6. Start HBase

[root@bogon hbase-1.1.2]# ./bin/start-hbase.sh
localhost: /usr/bin/python: No module named virtualenvwrapper
localhost: virtualenvwrapper.sh: There was a problem running the initialization hooks.
localhost: 
localhost: If Python could not import the module virtualenvwrapper.hook_loader,
localhost: check that virtualenvwrapper has been installed for
localhost: VIRTUALENVWRAPPER_PYTHON=/usr/bin/python and that PATH is
localhost: set properly.
localhost: starting zookeeper, logging to /home/hj/app/hbase-1.1.2/bin/../logs/hbase-root-zookeeper-bogon.out
starting master, logging to /home/hj/app/hbase-1.1.2/logs/hbase-root-master-bogon.out
starting regionserver, logging to /home/hj/app/hbase-1.1.2/logs/hbase-root-1-regionserver-bogon.out

# Check the running processes; the key ones are HRegionServer and HMaster
[root@bogon hbase-1.1.2]# jps
5586 NodeManager
5045 SecondaryNameNode
112135 Kafka
126087 HRegionServer
125977 HMaster
126376 Jps
1082 QuorumPeerMain
4730 NameNode
5468 ResourceManager
4847 DataNode

4.1 Pitfalls

4.1.1 No HMaster process after startup

After starting HBase there is no HMaster process. The hbase-1.1.2/bin/../logs/hbase-root-zookeeper-bogon.out log shows:

java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:444)
        at sun.nio.ch.Net.bind(Net.java:436)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:225)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
        at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
        at org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:111)
        at org.apache.hadoop.hbase.zookeeper.HQuorumPeer.runZKServer(HQuorumPeer.java:94)
        at org.apache.hadoop.hbase.zookeeper.HQuorumPeer.main(HQuorumPeer.java:79)

The cause: HBase starts its own ZooKeeper on startup, but a ZooKeeper instance was already running, so the address was already in use.

Solution

  • Stop the standalone ZooKeeper service and let HBase start ZooKeeper itself, or
  • in hbase-env.sh set HBASE_MANAGES_ZK to false so HBase does not manage ZooKeeper and you start it manually:
[root@bogon conf]# vim hbase-env.sh 
export JAVA_HOME=/home/hj/app/jdk1.8.0_261
export HBASE_CLASSPATH=/home/hj/app/hadoop-2.6.0-cdh5.7.0/conf
export HBASE_MANAGES_ZK=false

4.1.2 Startup reports: regionserver running as process 16996. Stop it first.

The full error:

[root@bogon bin]# sh start-hbase.sh
starting master, logging to /home/hj/app/hbase-1.1.2/logs/hbase-root-master-bogon.out
regionserver running as process 16996. Stop it first.

Solution

Kill the running regionserver, then start HBase again:

kill -9 16996


4.1.3 Startup reports: SLF4J: Class path contains multiple SLF4J bindings

Cause: HBase and Hadoop both ship an SLF4J binding JAR, so the names clash on the classpath. Rename HBase's copy:

mv slf4j-log4j12-1.7.25.jar slf4j-log4j12-1.7.25.jar-copy

Reference: https://blog.csdn.net/qq_45135120/article/details/107048944

4.1.4 Startup warns: Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m

Reference: https://blog.csdn.net/tiankong_12345/article/details/93585463

4.1.5 hbase shell reports: Can't get master address from ZooKeeper; znode data == null

Cause: the HMaster process has died and is not running.

Solution: http://blog.51yip.com/hadoop/2193.html

HBase shell operations

[root@bogon hbase-1.1.2]# ./bin/hbase shell
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hj/app/hbase-1.1.2/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hj/app/hadoop-2.6.0-cdh5.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2021-01-17 11:53:03,530 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.1.2, rcc2b70cf03e3378800661ec5cab11eb43fafe0fc, Wed Aug 26 20:11:27 PDT 2015
[root@bogon bin]# ./hbase shell
2021-02-06 21:54:05,371 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.1.2, rcc2b70cf03e3378800661ec5cab11eb43fafe0fc, Wed Aug 26 20:11:27 PDT 2015

hbase(main):001:0> list
TABLE
0 row(s) in 0.7260 seconds

=> []

hbase(main):005:0> create 'student', 'info'
0 row(s) in 2.4760 seconds

=> Hbase::Table - student

hbase(main):006:0> describe 'student'
Table student is ENABLED
student
COLUMN FAMILIES DESCRIPTION
{NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
1 row(s) in 0.3350 seconds

hbase(main):002:0> put 'student','1','info:name','Xueqian'
0 row(s) in 0.5210 seconds

hbase(main):003:0> put 'student','1','info:gender','F'
0 row(s) in 0.0340 seconds

hbase(main):004:0> put 'student','1','info:age','23'
0 row(s) in 0.0490 seconds

hbase(main):005:0> put 'student','2','info:name','Weiliang'
0 row(s) in 0.0480 seconds

hbase(main):006:0> put 'student','2','info:gender','M'
0 row(s) in 0.0570 seconds

hbase(main):007:0> put 'student','2','info:age','24'
0 row(s) in 0.0270 seconds

hbase(main):008:0> get 'student', '1'
COLUMN                             CELL
 info:age                          timestamp=1612620059368, value=23
 info:gender                       timestamp=1612620052093, value=F
 info:name                         timestamp=1612620042398, value=Xueqian
3 row(s) in 0.1330 seconds

hbase(main):010:0> scan 'student'
ROW                                COLUMN+CELL
 1                                 column=info:age, timestamp=1612620059368, value=23
 1                                 column=info:gender, timestamp=1612620052093, value=F
 1                                 column=info:name, timestamp=1612620042398, value=Xueqian
 2                                 column=info:age, timestamp=1612620095417, value=24
 2                                 column=info:gender, timestamp=1612620086286, value=M
 2                                 column=info:name, timestamp=1612620076564, value=Weiliang
2 row(s) in 0.0870 seconds
Also add Hadoop and HBase to Spark's classpath (e.g. in spark-env.sh):

export SPARK_DIST_CLASSPATH=$(/home/hj/app/hadoop-2.6.0-cdh5.7.0/bin/hadoop classpath):$(/home/hj/app/hbase-1.1.2/bin/hbase classpath):/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars/hbase/*

4.2 Reading and writing HBase from PySpark

4.2.1 Configure Spark

For Spark to read and write HBase, copy the relevant HBase JARs into spark/jars: all JARs whose names start with hbase, plus guava-12.0.1.jar, htrace-core-3.1.0-incubating.jar and protobuf-java-2.5.0.jar.

1. Create an hbase directory under Spark's jars directory:

[root@bogon spark-2.2.0-bin-2.6.0-cdh5.7.0]# cd jars
[root@bogon jars]# pwd
/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/jars

mkdir hbase
cp /home/hj/app/hbase-1.1.2/lib/hbase*.jar hbase/
cp /home/hj/app/hbase-1.1.2/lib/guava-12.0.1.jar  hbase/
cp /home/hj/app/hbase-1.1.2/lib/htrace-core-3.1.0-incubating.jar  hbase/
cp /home/hj/app/hbase-1.1.2/lib/protobuf-java-2.5.0.jar  hbase/

2. Spark 2.x no longer ships the JAR that converts HBase data into a form Python can read, so download the spark-examples 1.6.0 JAR separately and copy it into hbase/ as well:

mv ~/下载/spark-examples* hbase/

4.2.2 Read and write HBase

1. Open a PySpark shell:

# Switch to spark/bin and start the PySpark shell
[root@bogon spark-2.2.0-bin-2.6.0-cdh5.7.0]# cd bin/
[root@bogon bin]# pwd
/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/bin
[root@bogon bin]# ls
beeline          load-spark-env.cmd  pyspark2.cmd     spark-class       sparkR2.cmd       spark-shell.cmd    spark-submit.cmd
beeline.cmd      load-spark-env.sh   pyspark.cmd      spark-class2.cmd  sparkR.cmd        spark-sql
derby.log        metastore_db        run-example      spark-class.cmd   spark-shell       spark-submit
find-spark-home  pyspark             run-example.cmd  sparkR            spark-shell2.cmd  spark-submit2.cmd
[root@bogon bin]# ./pyspark

2. The data already stored in HBase:

hbase(main):002:0> scan 'student'
ROW                                COLUMN+CELL
 1                                 column=info:age, timestamp=1612620059368, value=23
 1                                 column=info:gender, timestamp=1612620052093, value=F
 1                                 column=info:name, timestamp=1612620042398, value=Xueqian
 2                                 column=info:age, timestamp=1612620095417, value=24
 2                                 column=info:gender, timestamp=1612620086286, value=M
 2                                 column=info:name, timestamp=1612620076564, value=Weiliang
2 row(s) in 3.3040 seconds

3. Read:

>>> host = 'localhost'
>>> table = 'student'
>>> conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
>>> keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
>>> valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
>>> hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat","org.apache.hadoop.hbase.io.ImmutableBytesWritable","org.apache.hadoop.hbase.client.Result",keyConverter=keyConv,valueConverter=valueConv,conf=conf)
>>> count = hbase_rdd.count()
>>> hbase_rdd.cache()
MapPartitionsRDD[2] at mapPartitions at SerDeUtil.scala:208
>>> output = hbase_rdd.collect()
>>> for (k, v) in output:
...     print(k, v)
... 
(u'1', u'{"qualifier" : "age", "timestamp" : "1612620059368", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "23"}\n{"qualifier" : "gender", "timestamp" : "1612620052093", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "F"}\n{"qualifier" : "name", "timestamp" : "1612620042398", "columnFamily" : "info", "row" : "1", "type" : "Put", "value" : "Xueqian"}')
(u'2', u'{"qualifier" : "age", "timestamp" : "1612620095417", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "24"}\n{"qualifier" : "gender", "timestamp" : "1612620086286", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "M"}\n{"qualifier" : "name", "timestamp" : "1612620076564", "columnFamily" : "info", "row" : "2", "type" : "Put", "value" : "Weiliang"}')
>>> count
2
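
Each value returned by HBaseResultToStringConverter is a string of newline-separated JSON objects, one per cell, so it can be unpacked into (row, column, value) records. A minimal sketch continuing the same PySpark shell session (hbase_rdd is the RDD read above):

import json

# One JSON object per cell; split on newlines, then parse.
cells = hbase_rdd.flatMap(lambda kv: kv[1].split("\n")).map(json.loads)
triples = cells.map(lambda c: (c["row"], c["columnFamily"] + ":" + c["qualifier"], c["value"]))
for row, column, value in triples.collect():
    print(row, column, value)   # e.g. row '1', column 'info:age', value '23'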

4. Write data:

>>> k1_conv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
>>> v1_conv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
>>> conf_1 = {"hbase.zookeeper.quorum": host,"hbase.mapred.outputtable": table,"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat","mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable","mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
>>> rawData = ['3,info,name,Rongcheng','4,info,name,Guanhua']
>>> sc.parallelize(rawData).map(lambda x: (x[0],x.split(','))).saveAsNewAPIHadoopDataset(conf=conf_1,keyConverter=k1_conv,valueConverter=v1_conv)
21/02/09 22:31:50 ERROR io.SparkHadoopMapReduceWriter: Aborting job job_20210209223147_0008.
java.lang.IllegalArgumentException: Can not create a Path from a null string
        at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
        at org.apache.hadoop.fs.Path.<init>(Path.java:135)
        at org.apache.hadoop.fs.Path.<init>(Path.java:89)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:132)
        at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:101)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
        at org.apache.spark.api.python.PythonRDD$.saveAsHadoopDataset(PythonRDD.scala:861)
        at org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/python/pyspark/rdd.py", line 1393, in saveAsNewAPIHadoopDataset
    keyConverter, valueConverter, True)
  File "/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/home/hj/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/python/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: u'Can not create a Path from a null string'

Check that the write succeeded:

# Two new rows appear, so the write succeeded
hbase(main):003:0> scan 'student'
ROW                                COLUMN+CELL
 1                                 column=info:age, timestamp=1612620059368, value=23
 1                                 column=info:gender, timestamp=1612620052093, value=F
 1                                 column=info:name, timestamp=1612620042398, value=Xueqian
 2                                 column=info:age, timestamp=1612620095417, value=24
 2                                 column=info:gender, timestamp=1612620086286, value=M
 2                                 column=info:name, timestamp=1612620076564, value=Weiliang
 3                                 column=info:name, timestamp=1612881109880, value=Rongcheng
 4                                 column=info:name, timestamp=1612881109879, value=Guanhua
4 row(s) in 0.1310 seconds

Note: the write raises pyspark.sql.utils.IllegalArgumentException: u'Can not create a Path from a null string', but the data is still written successfully!
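
Each string handed to StringListToPutConverter describes one cell in the form 'row,columnFamily,qualifier,value', so writing several columns for one row simply means emitting several such strings. A minimal sketch reusing conf_1, k1_conv and v1_conv from step 4 (row key 5 and the values are illustrative):

# Write name, gender and age for a new row '5'; each element becomes one Put cell.
new_cells = [
    '5,info,name,Rongxin',
    '5,info,gender,F',
    '5,info,age,22',
]
sc.parallelize(new_cells) \
  .map(lambda x: (x.split(',')[0], x.split(','))) \
  .saveAsNewAPIHadoopDataset(conf=conf_1, keyConverter=k1_conv, valueConverter=v1_conv)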

