Hbase整合Mapreduce案例2 hbase数据下载至hdfs中——wordcount

embedded/2024/12/29 19:10:04/

目录

  • 整合结构
  • 准备
  • 数据下载
    • pom.xml
    • Main.java
    • Reduce.java
    • Map.java
    • 操作
  • 总结

整合结构

和案例1的结构差不多,Hbase移动到开头,后面跟随MR程序。
因此对于输入的K1 V1会进行一定的修改

准备

  1. 在HBASE中创建表,并写入数据
create "wunaiieq:sentence","colf"
  1. 系统文件上传

datain3.java

package org.wunaiieq.hbase2hdfs;import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.wunaiieq.HBaseConnection;
import org.wunaiieq.HbaseDML;import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;public class datain3 {public static Connection connection = HBaseConnection.connection;public static void main(String[] args) throws IOException {BufferedReader bufferedReader =new BufferedReader(new FileReader("/opt/module/jar/data.txt"));String line =null;Table table = connection.getTable(TableName.valueOf("wunaiieq", "sentence"));int rowkey = 1;while ((line=bufferedReader.readLine())!=null){Put put = new Put(Bytes.toBytes(rowkey));put.addColumn(Bytes.toBytes("colf"),Bytes.toBytes("line"),Bytes.toBytes(line));table.put(put);rowkey++;}bufferedReader.close();}
}

在这里插入图片描述

数据下载

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.hbase</groupId><artifactId>hbase2hdfs</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><hadoop.version>3.1.3</hadoop.version><hbase.version>2.2.3</hbase.version></properties><dependencies><!-- Hadoop Dependencies --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-api</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-streaming</artifactId><version>${hadoop.version}</version></dependency><!-- HBase Dependencies --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>${hbase.version}</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>${hbase.version}</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>${hbase.version}</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-mapreduce</artifactId><version>${hbase.version}</version></dependency><!-- Other Dependencies --><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.19.1</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version><scope>compile</scope></dependency></dependencies><build><plugins><plugin><!--声明--><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.3.0</version><!--具体配置--><configuration><archive><manifest><!--jar包的执行入口--><mainClass>org.wunaiieq.hbase2hdfs.Main</mainClass></manifest></archive><descriptorRefs><!--描述符,此处为预定义的,表示创建一个包含项目所有依赖的可执行 JAR 文件;允许自定义生成jar文件内容--><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><!--执行配置--><executions><execution><!--执行配置ID,可修改--><id>make-assembly</id><!--执行的生命周期--><phase>package</phase><goals><!--执行的目标,single表示创建一个分发包--><goal>single</goal></goals></execution></executions></plugin></plugins></build></project>

Main.java

package org.wunaiieq.hbase2hdfs;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class Main {public static void main(String[] args) throws Exception {//配置文件,写在resources目录下Job job =Job.getInstance(new Configuration());//入口类job.setJarByClass(Main.class);Scan scan = new Scan();TableMapReduceUtil.initTableMapperJob("wunaiieq:sentence",//表名scan,//表输入时,可以在此处进行部分设置,如选择查询的列簇,列,过滤行等等org.wunaiieq.hbase2hdfs.Map.class,//指定mapper类Text.class,//k2IntWritable.class,//v2job,false);job.setOutputKeyClass(Text.class);//K3job.setOutputValueClass(IntWritable.class);//V3job.setReducerClass(org.wunaiieq.hbase2hdfs.Reduce.class);//手动输入输出路径FileOutputFormat.setOutputPath(job,new Path(args[0]));job.waitForCompletion(true);}
}

Reduce.java

package org.wunaiieq.hbase2hdfs;import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;//                                        K3    V3     K4     V4
public class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{private IntWritable v4 =new IntWritable();private Text k4 =new Text();@Overrideprotected void reduce(Text k3, Iterable<IntWritable> v3,Context context) throws IOException, InterruptedException {int sum =0;for (IntWritable v30:v3){sum+=v30.get();}v4.set(sum);k4=k3;context.write(k4,v4);}
}

Map.java

package org.wunaiieq.hbase2hdfs;import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
//                                      K1   V1
public class Map extends TableMapper<Text,IntWritable> {private Text k2=new Text();private IntWritable v2 =new IntWritable(1);@Overrideprotected void map(ImmutableBytesWritable k1, Result v1,Context context) throws IOException, InterruptedException {System.out.println("k1:"+k1.toString());//读取当前行中的colf:line数据byte[] data =v1.getValue(Bytes.toBytes("colf"),Bytes.toBytes("line"));String line =Bytes.toString(data);String [] words =line.split(" ");for (String word :words){k2.set(word);context.write(k2,v2);}}
}

操作

打包上传至linux系统中

hadoop jar hbase2hdfs-1.0-SNAPSHOT-jar-with-dependencies.jar /output/test

检查文件

hdfs dfs -cat /output/test/part-r-00000

总结

没什么特殊点,记录下这两个案例即可,只需要在MR程序中替换掉对应的Mapper和Reducer即可


http://www.ppmy.cn/embedded/143177.html

相关文章

【计算机网络】实验8:聚合了不存在的网络导致的路由环路问题

实验 8&#xff1a;聚合了不存在的网络导致的路由环路问题 一、 实验目的 聚合了不存在的网络导致的路由环路问题。 网络故障导致的路由环路问题。 二、 实验环境 • Cisco Packet Tracer 模拟器 三、 实验内容 1、聚合了不存在的网络导致的路由环路问题 (1) 第一步&am…

duxapp 中 APP端支付宝支付组件

APP端支付宝支付组件 安装 yarn duxapp app add alipay安装之后添加到模块依赖中&#xff0c;然后重新编译之后就能使用 使用 导入 uiw/react-native-alipay 组件进行支付

pymysql模块详解

华子目录 简介安装pymysql连接对象常用方法游标对象常用方法数据库操作查改批量增加删 使用with语句总结 简介 pymysql是一个用于Python编程的第三方模块&#xff0c;用于连接和操作MySQL数据库。它提供了一个简单而强大的接口&#xff0c;使开发者能够轻松地在Python程序中执…

知乎大数据开发面试题及参考答案

Java 两个线程之间是怎么通信的,属于哪种机制? 在 Java 中,线程间通信主要有以下几种方式: 共享变量:线程可以通过访问共享变量来进行通信。例如,一个线程修改一个共享的成员变量,另一个线程读取这个变量的值。但是这种方式需要注意线程安全问题。如果多个线程同时访问和…

STM32F030单片机AD采集应用总结

最近在设计一款产品的AD时&#xff0c;采集到的电压老是比输入电压0.2V左右&#xff0c;电路如图所示 查阅资料得知&#xff0c;STM32f030 的输入阻抗应小于 50K。于是将电阻改为 39K/10K&#xff0c;但情况依旧。随后&#xff0c;干脆将电阻值改为 3.9K/1K&#xff0c;虽有一定…

linux 压缩命令,压缩a目录,但是不压缩a目录下的b目录,zip命令

在 Linux 中使用 zip 命令来压缩目录时&#xff0c;可以通过排除特定的目录或文件来实现你的需求。具体来说&#xff0c;你可以使用 -r 选项递归地压缩目录&#xff0c;并使用 -x 选项来排除特定的目录或文件。 假设你要压缩 a 目录&#xff0c;但不包括 a 目录下的 b 目录&am…

基于Matlab自适应滤波和特征提取的雷达信号分选与去噪方法研究

随着雷达技术的广泛应用&#xff0c;雷达信号在军事、航空航天、交通监控、气象探测等领域的应用变得越来越重要。然而&#xff0c;雷达信号在传输过程中常常受到多种噪声源的干扰&#xff0c;这些噪声干扰会严重影响信号的质量&#xff0c;从而对目标检测、跟踪、识别等后续处…

浅谈YashanDB三权分立

什么是三权分立&#xff1f; 三权分立&#xff0c;即是对DBA的职责进行划分&#xff0c;定义不同管理职位具备并行使不同角色&#xff0c;互相限制和监督&#xff0c;从机制上尽可能地防止因误操作删除或修改不属于职责范围内的数据或对象&#xff0c;保障系统整体安全&#x…