大数据技术之HBase API(3)

embedded/2024/9/23 9:50:19/

目录

 HBase API

 环境准备

 创建连接

 单线程创建连接

 多线程创建连接

 DDL

 DML


 HBase API

 环境准备

新建项目后,在 pom.xml 中添加如下依赖:

<dependencies><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>2.4.11</version><exclusions><exclusion><groupId>org.glassfish</groupId><artifactId>javax.el</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.glassfish</groupId><artifactId>javax.el</artifactId><version>3.0.1-b06</version></dependency>
</dependencies>

注意:javax.el 包虽然会报错不存在,但这仅是一个测试用的依赖,不会影响实际使用。

 创建连接

根据官方API介绍,HBase的客户端连接由 ConnectionFactory 类来创建,使用完成后需手动关闭连接。由于连接是重量级的,建议每个进程只使用一个连接,通过连接中的 AdminTable 属性执行HBase命令。

 单线程创建连接

package com.lzl.hbase;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.CompletableFuture;public class HBaseConnect {public static void main(String[] args) throws IOException {// 1. 创建配置对象Configuration conf = new Configuration();// 2. 添加配置参数conf.set("hbase.zookeeper.quorum", "hadoop12,hadoop13,hadoop14");// 3. 创建 hbase 的连接// 默认使用同步连接Connection connection = ConnectionFactory.createConnection(conf);// 可以使用异步连接// 主要影响后续的 DML 操作CompletableFuture<AsyncConnection> asyncConnection = ConnectionFactory.createAsyncConnection(conf);// 4. 使用连接System.out.println(connection);// 5. 关闭连接connection.close();}
}

 多线程创建连接

使用单例模式确保一个连接被多个线程共享。

package com.lzl;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.CompletableFuture;public class HBaseConnect {// 设置静态属性 hbase 连接public static Connection connection = null;static {// 创建 hbase 的连接try {// 使用配置文件的方法connection = ConnectionFactory.createConnection();} catch (IOException e) {System.out.println("连接获取失败");e.printStackTrace();}}/*** 连接关闭方法,用于进程关闭时调用* @throws IOException*/public static void closeConnection() throws IOException {if (connection != null) {connection.close();}}
}

resources 文件夹中创建配置文件 hbase-site.xml,内容如下:

1<?xml version="1.0"?>
2<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
3<configuration>
4    <property>
5        <name>hbase.zookeeper.quorum</name>
6        <value>hadoop12,hadoop13,hadoop14</value>
7    </property>
8</configuration>

 DDL

创建 HBaseDDL 类,添加静态方法即可作为工具类

public class HBaseDDL {// 添加静态属性 connection 指向单例连接public static Connection connection = HBaseConnect.connection;/*** 创建命名空间* @param namespace 命名空间名称*/public static void createNamespace(String namespace) throws IOException {// 1. 获取 adminAdmin admin = connection.getAdmin();// 2. 调用方法创建命名空间// 2.1 创建命令空间描述建造者 => 设计师NamespaceDescriptor.Builder builder = NamespaceDescriptor.create(namespace);// 2.2 给命令空间添加需求builder.addConfiguration("user", "lzl");// 2.3 使用 builder 构造出对应的添加完参数的对象 完成创建try {admin.createNamespace(builder.build());} catch (IOException e) {System.out.println("命令空间已经存在");e.printStackTrace();}// 3. 关闭 adminadmin.close();}/*** 判断表格是否存在* @param namespace 命名空间名称* @param tableName 表格名称* @return true 表示存在*/public static boolean isTableExists(String namespace, String tableName) throws IOException {// 1. 获取 adminAdmin admin = connection.getAdmin();// 2. 使用方法判断表格是否存在boolean b = false;try {b = admin.tableExists(TableName.valueOf(namespace, tableName));} catch (IOException e) {e.printStackTrace();}// 3. 关闭 adminadmin.close();// 3. 返回结果return b;}/*** 创建表格* @param namespace 命名空间名称* @param tableName 表格名称* @param columnFamilies 列族名称 可以有多个*/public static void createTable(String namespace, String tableName, String... columnFamilies) throws IOException {// 判断是否有至少一个列族if (columnFamilies.length == 0) {System.out.println("创建表格至少有一个列族");return;}// 判断表格是否存在if (isTableExists(namespace, tableName)) {System.out.println("表格已经存在");return;}// 1. 获取 adminAdmin admin = connection.getAdmin();// 2. 调用方法创建表格// 2.1 创建表格描述的建造者TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(namespace, tableName));// 2.2 添加参数for (String columnFamily : columnFamilies) {// 2.3 创建列族描述的建造者ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));// 2.4 对应当前的列族添加参数// 添加版本参数columnFamilyDescriptorBuilder.setMaxVersions(5);// 2.5 创建添加完参数的列族描述tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());}// 2.6 创建对应的表格描述try {admin.createTable(tableDescriptorBuilder.build());} catch (IOException e) {e.printStackTrace();}// 3. 关闭 adminadmin.close();}/*** 修改表格中一个列族的版本* @param namespace 命名空间名称* @param tableName 表格名称* @param columnFamily 列族名称* @param version 版本*/public static void modifyTable(String namespace, String tableName, String columnFamily, int version) throws IOException {// 判断表格是否存在if (!isTableExists(namespace, tableName)) {System.out.println("表格不存在无法修改");return;}// 1. 获取 adminAdmin admin = connection.getAdmin();try {// 2. 调用方法修改表格// 2.0 获取之前的表格描述TableDescriptor descriptor = admin.getDescriptor(TableName.valueOf(namespace, tableName));// 2.1 创建一个表格描述建造者// 如果使用填写 tableName 的方法 相当于创建了一个新的表格描述建造者 没有之前的信息// 如果想要修改之前的信息 必须调用方法填写一个旧的表格描述TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(descriptor);// 2.2 对应建造者进行表格数据的修改ColumnFamilyDescriptor columnFamily1 = descriptor.getColumnFamily(Bytes.toBytes(columnFamily));// 创建列族描述建造者// 需要填写旧的列族描述ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(columnFamily1);// 修改对应的版本columnFamilyDescriptorBuilder.setMaxVersions(version);// 此处修改的时候 如果填写的新创建 那么别的参数会初始化tableDescriptorBuilder.modifyColumnFamily(columnFamilyDescriptorBuilder.build());admin.modifyTable(tableDescriptorBuilder.build());} catch (IOException e) {e.printStackTrace();}// 3. 关闭 adminadmin.close();}/*** 删除表格* @param namespace 命名空间名称* @param tableName 表格名称* @return true 表示删除成功*/public static boolean deleteTable(String namespace, String tableName) throws IOException {// 1. 判断表格是否存在if (!isTableExists(namespace, tableName)) {System.out.println("表格不存在 无法删除");return false;}// 2. 获取 adminAdmin admin = connection.getAdmin();// 3. 调用相关的方法删除表格try {// HBase 删除表格之前 一定要先标记表格为不可以TableName tableName1 = TableName.valueOf(namespace, tableName);admin.disableTable(tableName1);admin.deleteTable(tableName1);} catch (IOException e) {e.printStackTrace();}// 4. 关闭 adminadmin.close();return true;}
}

 DML

创建类 HBaseDML
public class HBaseDML {// 添加静态属性 connection 指向单例连接public static Connection connection = HBaseConnect.connection;/*** 插入数据* @param namespace 命名空间名称* @param tableName 表格名称* @param rowKey 主键* @param columnFamily 列族名称* @param columnName 列名* @param value 值*/public static void putCell(String namespace, String tableName, String rowKey, String columnFamily, String columnName, String value) throws IOException {// 1. 获取 tableTable table = connection.getTable(TableName.valueOf(namespace, tableName));// 2. 调用相关方法插入数据// 2.1 创建 put 对象Put put = new Put(Bytes.toBytes(rowKey));// 2.2 给 put 对象添加数据put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(value));// 2.3 将对象写入对应的方法try {table.put(put);} catch (IOException e) {e.printStackTrace();}// 3. 关闭 tabletable.close();}/*** 读取数据 读取对应的一行中的某一列* @param namespace 命名空间名称* @param tableName 表格名称* @param rowKey 主键* @param columnFamily 列族名称* @param columnName 列名*/public static void getCells(String namespace, String tableName, String rowKey, String columnFamily, String columnName) throws IOException {// 1. 获取 tableTable table = connection.getTable(TableName.valueOf(namespace, tableName));// 2. 创建 get 对象Get get = new Get(Bytes.toBytes(rowKey));// 如果直接调用 get 方法读取数据 此时读一整行数据// 如果想读取某一列的数据 需要添加对应的参数get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));// 设置读取数据的版本get.readAllVersions();try {// 读取数据 得到 result 对象Result result = table.get(get);// 处理数据Cell[] cells = result.rawCells();// 测试方法: 直接把读取的数据打印到控制台// 如果是实际开发 需要再额外写方法 对应处理数据for (Cell cell : cells) {String value = new String(CellUtil.cloneValue(cell));System.out.println(value);}} catch (IOException e) {e.printStackTrace();}// 关闭 tabletable.close();}/*** 扫描数据* @param namespace 命名空间* @param tableName 表格名称* @param startRow 开始的 row 包含的* @param stopRow 结束的 row 不包含*/public static void scanRows(String namespace, String tableName, String startRow, String stopRow) throws IOException {// 1. 获取 tableTable table = connection.getTable(TableName.valueOf(namespace, tableName));// 2. 创建 scan 对象Scan scan = new Scan();// 如果此时直接调用 会直接扫描整张表// 添加参数 来控制扫描的数据// 默认包含scan.withStartRow(Bytes.toBytes(startRow));// 默认不包含scan.withStopRow(Bytes.toBytes(stopRow));try {// 读取多行数据 获得 scannerResultScanner scanner = table.getScanner(scan);// result 来记录一行数据 cell 数组// ResultScanner 来记录多行数据 result 的数组for (Result result : scanner) {Cell[] cells = result.rawCells();for (Cell cell : cells) {System.out.print(new String(CellUtil.cloneRow(cell)) + "-" + new String(CellUtil.cloneFamily(cell)) + "-" + new String(CellUtil.cloneQualifier(cell)) + "-" + new String(CellUtil.cloneValue(cell)) + "\t");}System.out.println();}} catch (IOException e) {e.printStackTrace();}// 3. 关闭 tabletable.close();}/*** 带过滤的扫描* @param namespace 命名空间* @param tableName 表格名称* @param startRow 开始 row* @param stopRow 结束 row* @param columnFamily 列族名称* @param columnName 列名* @param value value 值* @throws IOException*/public static void filterScan(String namespace, String tableName, String startRow, String stopRow, String columnFamily, String columnName, String value) throws IOException {// 1. 获取 tableTable table = connection.getTable(TableName.valueOf(namespace, tableName));// 2. 创建 scan 对象Scan scan = new Scan();// 如果此时直接调用 会直接扫描整张表// 添加参数 来控制扫描的数据// 默认包含scan.withStartRow(Bytes.toBytes(startRow));// 默认不包含scan.withStopRow(Bytes.toBytes(stopRow));// 可以添加多个过滤FilterList filterList = new FilterList();// 创建过滤器// (1) 结果只保留当前列的数据ColumnValueFilter columnValueFilter = new ColumnValueFilter(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),CompareOperator.EQUAL,Bytes.toBytes(value));// (2) 结果保留整行数据// 结果同时会保留没有当前列的数据SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),CompareOperator.EQUAL,Bytes.toBytes(value));// 本身可以添加多个过滤器filterList.addFilter(singleColumnValueFilter);// 添加过滤scan.setFilter(filterList);try {// 读取多行数据 获得 scannerResultScanner scanner = table.getScanner(scan);// result 来记录一行数据 cell 数组// ResultScanner 来记录多行数据 result 的数组for (Result result : scanner) {Cell[] cells = result.rawCells();for (Cell cell : cells) {System.out.print(new String(CellUtil.cloneRow(cell)) + "-" + new String(CellUtil.cloneFamily(cell)) + "-" + new String(CellUtil.cloneQualifier(cell)) + "-" + new String(CellUtil.cloneValue(cell)) + "\t");}System.out.println();}} catch (IOException e) {e.printStackTrace();}// 3. 关闭 tabletable.close();}/*** 删除 column 数据* @param nameSpace 命名空间* @param tableName 表格名称* @param rowKey 主键* @param family 列族* @param column 列名* @throws IOException*/public static void deleteColumn(String nameSpace, String tableName, String rowKey, String family, String column) throws IOException {// 1. 获取 tableTable table = connection.getTable(TableName.valueOf(nameSpace, tableName));// 2. 创建 Delete 对象Delete delete = new Delete(Bytes.toBytes(rowKey));// 3. 添加删除信息// 3.1 删除单个版本delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));// 3.2 删除所有版本// delete.addColumns(Bytes.toBytes(family), Bytes.toBytes(column));// 3.3 删除列族// delete.addFamily(Bytes.toBytes(family));// 3. 删除数据table.delete(delete);// 5. 关闭资源table.close();}public static void main(String[] args) throws IOException {// putCell("bigdata", "student", "1002", "info", "name", "lisi");// String cell = getCell("bigdata", "student", "1001", "info", "name");// System.out.println(cell);// List<String> strings = scanRows("bigdata", "student", "1001", "2000");// for (String string : strings) {//     System.out.println(string);// }deleteColumn("bigdata", "student", "1001", "info", "name");}
}

 


http://www.ppmy.cn/embedded/104188.html

相关文章

创建 AD9361 的 vivado 工程,纯FPGA配置,不使用ARM程序

前言 AD9361 的配置程序&#xff0c;如果使用官方的&#xff0c;就必须用ps进行配置&#xff0c;复杂不好使&#xff0c;如果直接使用FPGA配置&#xff0c;将会特别的简单。 配置软件 创建一份完整的寄存器配置表 //*******************************************************…

Python编程基础知识,让编程基础更加扎实(输出个人简介)

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;开发者-曼亿点 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 曼亿点 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a…

Qt中的各种“q+基本数据类型“

前言 虽说Qt支持C的数据类型&#xff0c;但是还是用Qt自己又封装的数据类型比较好。你在支持能有我原生的支持&#xff1f; 正文 先看qint系列 有qint8,quint8,qint16,quint16,qint32,quint32,qint64,quint64 源码如下 解读 1. typedef signed char qint8; 说明: 定义…

源代码编译,Apache DolphinScheduler前后端分离部署解决方案

转载自神龙大侠 生产环境部署方案 在企业线上生产环境中&#xff0c;普遍的做法是至少实施两套环境。 测试环境线上环境 测试环境用于验证代码的正确性&#xff0c;当测试环境验证ok后才会部署线上环境。 鉴于CI/CD应用的普遍性&#xff0c;源代码一键部署是必要的。 本文…

Spring Boot如何压缩Json并写入redis?

1.为什么需要压缩json&#xff1f; 由于业务需要&#xff0c;存入redis中的缓存数据过大&#xff0c;占用了10G的内存&#xff0c;内存作为重要资源&#xff0c;需要优化一下大对象缓存&#xff0c;采用gzip压缩存储&#xff0c;可以将 redis 的 kv 对大小缩小大约 7-8 倍&…

高效达人必备!Simple Sticky Notes让灵感与任务不再遗漏!

前言 阿尔伯特爱因斯坦所言&#xff1a;“我们不能用制造问题时的同一水平思维来解决它。”这句话深刻地揭示了创新与突破的必要性。正是基于这样的理念&#xff0c;Simple Sticky Notes这款桌面便签软件以其独特的创新视角和实用性&#xff0c;在众多同类软件中脱颖而出。 它…

Ubuntu/Linux 配置 locale

文章目录 Ubuntu/Linux 配置 locale1 概述2 locale2.1 locale 规则命令规则环境变量优先级 2.2 查看当前 locale 设置2.3 查看当前系统所有可用的 locale2.4 安装中文 locale 语言环境/字符集2.5 安装 locales 包2.6 使用 locale-gen 命令生成语言支持2.7 设置当前默认字符集 3…

恺英网络:有业绩,无“游戏”

2024年上半年&#xff0c;恺英网络的业绩依然很好&#xff0c;但有些不讲逻辑了。 8月22日晚&#xff0c;恺英网络发布了2024年半年度财报。 报告显示&#xff0c;上半年公司实现营业收入25.55亿元&#xff0c;同比增长29.28%&#xff1b;归母净利润和扣非净利润分别为8.09亿…