hbase 工具类

ops/2024/11/13 22:37:31/

hbase__0">hbase 工具类

pom.xml

<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.5.10-hadoop3</version>
</dependency>
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>33.2.1-jre</version>
</dependency>

RowKey注解

package cn.lhz.util.annotation;import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.annotation.ElementType;/*** @author 李昊哲* @version 1.0.0*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
public @interface RowKeyAnnotation {
}

hbase___38">hbase 自定义 过滤器

package cn.lhz.util.hbase;import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.Cell;/*** hbase 自定义 过滤器** @author 李昊哲* @version 1.0.0*/
public class CustomFilter extends FilterBase {private final String targetValue;public CustomFilter(String targetValue) {this.targetValue = targetValue;}@Overridepublic ReturnCode filterCell(Cell cell) {// 在这里实现过滤逻辑// 比如,如果单元格的值等于 targetValue,则允许它通过byte[] value = cell.getValueArray();String cellValue = new String(value, cell.getValueOffset(), cell.getValueLength());if (cellValue.equals(targetValue)) {// 允许通过return ReturnCode.INCLUDE;} else {// 跳过此单元格return ReturnCode.SKIP;}}
}

hbase__78">hbase 工具类

package cn.lhz.util.hbase;import cn.lhz.util.annotation.RowKeyAnnotation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;
import java.lang.reflect.Field;
import java.util.*;
import java.util.regex.Pattern;/*** @author 李昊哲* @version 1.0.0*/
public class HbaseUtil {// 创建 hbase 配置 对象private final static Configuration conf = HBaseConfiguration.create();/*** 获取 Hbase 连接** @return Hbase 连接* @throws IOException IOException*/public static Connection getConnection() throws IOException {return ConnectionFactory.createConnection(conf);}/*** 获取 org.apache.hadoop.hbase.client.Admin 对象** @return org.apache.hadoop.hbase.client.Admin 对象* @throws IOException IOException*/public static Admin getAdmin() throws IOException {return getConnection().getAdmin();}/*** 获取 org.apache.hadoop.hbase.client.Admin 对象** @param connection Connection* @return org.apache.hadoop.hbase.client.Admin 对象* @throws IOException IOException*/public static Admin getAdmin(Connection connection) throws IOException {return connection.getAdmin();}/*** 断开 hbase 连接** @param connection Connection* @throws IOException IOException*/public static void closeConnection(Connection connection) throws IOException {if (connection != null) {connection.close();}}/*** 释放 org.apache.hadoop.hbase.client.Admin 对象** @param admin org.apache.hadoop.hbase.client.Admin 对象* @throws IOException IOException*/public static void closeAdmin(Admin admin) throws IOException {if (admin != null) {admin.close();}}/*** 释放集群管理对象和连接** @param admin      集群管理对象* @param connection 客户端与集群建立的连接* @throws IOException IOException*/public static void closeAdminAndConnection(Admin admin, Connection connection) throws IOException {closeAdmin(admin);closeConnection(connection);}/*** 获取 数据表** @param tableName 数据表名称* @return 数据表* @throws IOException IOException*/public static Table getTable(String tableName) throws IOException {return getConnection().getTable(TableName.valueOf(tableName));}/*** 获取 数据表** @param connection 客户端与集群建立的连接* @param tableName  数据表名称* @return 数据表* @throws IOException IOException*/public static Table getTable(Connection connection, String tableName) throws IOException {return connection.getTable(TableName.valueOf(tableName));}/*** 创建数据表** @param admin             集群管理对象* @param name              数据表名称* @param columnFamilyNames 一个或多个ColumnFamilyName* @throws IOException IOException*/public static void createTable(Admin admin, String name, String... columnFamilyNames) throws IOException {// 4.1、定义表名称TableName tableName = TableName.valueOf(name);// 4.2、构建表描述构造器TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName);// 4.3、构建列簇描述构造器Collection<ColumnFamilyDescriptor> collection = new ArrayList<>();if (columnFamilyNames != null) {for (String columnFamilyName : columnFamilyNames) {// 4.4、构建列簇描述collection.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamilyName)).build());}}// 4.5、表结构构造器与列簇构造器建立关联tableDescriptorBuilder.setColumnFamilies(collection);// 4.6、定义描述对象TableDescriptor tableDescriptor = tableDescriptorBuilder.build();// 4.7、创建表admin.createTable(tableDescriptor);}/*** 删除数据表** @param admin 数据库管理对象* @param name  数据表名称* @return 是否删除* @throws IOException IOException*/public static boolean deleteTable(Admin admin, String name) throws IOException {// 定义表名称TableName tableName = TableName.valueOf(name);// 删除表 分两步 先禁用再删除if (admin.tableExists(tableName)) {// 禁用表admin.disableTable(tableName);// 删除表admin.deleteTable(tableName);}return true;}/*** 新增或更新数据** @param columnFamilyName columnFamilyName* @param e                数据表对应 javabean 对象* @param <E>              数据表对应 javabean 数据类型* @throws IOException            IOException* @throws IllegalAccessException IllegalAccessException*/public static <E> void upsert(Table table, String columnFamilyName, E e) throws IOException, IllegalAccessException {// 获取对象的 Class 对象Class<?> aClass = e.getClass();Put put = null;for (Field field : aClass.getDeclaredFields()) {// 设置允许访问私有属性field.setAccessible(true);// 获取属性 RowKeyAnnotation 注解RowKeyAnnotation annotation = field.getAnnotation(RowKeyAnnotation.class);if (annotation != null) {// 根据 RowKey 构建 Put 对象put = new Put(Bytes.toBytes(String.valueOf(field.get(e))));} else {if (put == null) {return;}// 添加 字段 与 字段对应的值put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(field.getName()), Bytes.toBytes(String.valueOf(field.get(e))));}}if (put != null) {// 插入数据table.put(put);}}/*** 新增或更新数据** @param connection       客户端与集群建立的连接* @param name             数据表名称* @param columnFamilyName columnFamilyName* @param e                数据表对应 javabean 对象* @param <E>              数据表对应 javabean 数据类型* @return upsert 是否成功* @throws IOException            IOException* @throws IllegalAccessException IllegalAccessException*/public static <E> boolean upsert(Connection connection, String name, String columnFamilyName, E e) throws IOException, IllegalAccessException {// 定义表名称TableName tableName = TableName.valueOf(name);// 连接 person 表Table table = connection.getTable(tableName);// 获取对象的 Class 对象Class<?> aClass = e.getClass();Put put = null;for (Field field : aClass.getDeclaredFields()) {// 设置允许访问私有属性field.setAccessible(true);// 获取属性 RowKeyAnnotation 注解RowKeyAnnotation annotation = field.getAnnotation(RowKeyAnnotation.class);if (annotation != null) {// 根据 RowKey 构建 Put 对象put = new Put(Bytes.toBytes(String.valueOf(field.get(e))));} else {if (put == null) {return false;}// 添加 字段 与 字段对应的值put.addColumn(Bytes.toBytes(columnFamilyName), Bytes.toBytes(field.getName()), Bytes.toBytes(String.valueOf(field.get(e))));}}if (put != null) {// 插入数据table.put(put);return true;}return false;}/*** 根据 RowKey 删除数据** @param connection 客户端与集群建立的连接* @param name       数据表名称* @param rowKey     数据表对应 RowKey* @return upsert 是否成功* @throws IOException IOException*/public static boolean delete(Connection connection, String name, String rowKey) throws IOException {// 定义表名称TableName tableName = TableName.valueOf(name);// 连接 数据表 表Table table = connection.getTable(tableName);// 根据 RowKey 构建 Delete 对象Delete delete = new Delete(Bytes.toBytes(rowKey));// 执行请求table.delete(delete);return true;}/*** 获取数据表中 ColumnFamily** @param admin 数据库管理对象* @param name  数据表名称* @return ColumnFamily 名称集合* @throws IOException IOException*/public static Set<String> columnFamilyNames(Admin admin, String name) throws IOException {// 指定表名TableName tableName = TableName.valueOf(name);// 获取表的描述对象TableDescriptor tableDescriptor = admin.getDescriptor(tableName);Set<String> columnFamilyNames = new HashSet<>();for (byte[] columnFamilyName : tableDescriptor.getColumnFamilyNames()) {columnFamilyNames.add(Bytes.toString(columnFamilyName));}return columnFamilyNames;}/*** 获取数据表中 column** @param connection 客户端与集群建立的连接* @param name       数据表名称* @return 字段名名称集合* @throws IOException IOException*/public static List<String> columnNames(Connection connection, String name) throws IOException {List<String> columnNames = new ArrayList<>();// 指定表名TableName tableName = TableName.valueOf(name);// 连接 数据表 表Table table = connection.getTable(tableName);// 获取表的描述对象Admin admin = connection.getAdmin();TableDescriptor tableDescriptor = admin.getDescriptor(tableName);// 获取列簇ColumnFamilyDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();for (ColumnFamilyDescriptor columnFamilyDescriptor : columnFamilies) {Result result = table.getScanner(columnFamilyDescriptor.getName()).next();for (Cell cell : result.rawCells()) {byte[] column = CellUtil.cloneQualifier(cell);columnNames.add(Bytes.toString(column));// String columnName = Bytes.toString(column);// String columnValue = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());}}return columnNames;}/*** 使用RowKey获取查询数据表中RowKey对应的记录** @param connection 客户端与集群建立的连接* @param name       数据表名称* @param rowKey     数据表RowKey* @param clazz      返回对象数据类型* @return 数据表模型类对象* @throws IOException IOException*/public static <T> Class<T> selectByRowKey(Connection connection, String name, String rowKey, Class<T> clazz) throws IOException, NoSuchFieldException, IllegalAccessException {Admin admin = connection.getAdmin();// 定义表名称TableName tableName = TableName.valueOf(name);// 连接 person 表Table table = connection.getTable(tableName);// 根据 RowKey 构建 Get 对象Get get = new Get(Bytes.toBytes(rowKey));// 执行请求 获取结果Result result = table.get(get);// 获取表的描述对象TableDescriptor tableDescriptor = admin.getDescriptor(tableName);// 获取列簇ColumnFamilyDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();for (ColumnFamilyDescriptor columnFamilyDescriptor : columnFamilies) {List<String> columnNames = new ArrayList<>();byte[] columnFamilyByteName = columnFamilyDescriptor.getName();Result rs = table.getScanner(columnFamilyDescriptor.getName()).next();for (Cell cell : rs.rawCells()) {byte[] column = CellUtil.cloneQualifier(cell);columnNames.add(Bytes.toString(column));}for (String columnName : columnNames) {// 解析结果String value = Bytes.toString(result.getValue(columnFamilyByteName, Bytes.toBytes(columnName)));Field field = clazz.getDeclaredField(nameInDb2nameInJava(columnName));field.setAccessible(true);field.set(clazz, value);}}return clazz;}/*** @param connection       客户端与集群建立的连接* @param name             数据表名称* @param columnFamilyName ColumnFamily 名称* @param columnName       字段名称* @param columnValue      字段匹配的值* @param clazz            数据表对应的模型类* @param cacheCount       缓存数量* @param <T>              返回值泛型* @return 查询结果* @throws IOException            IOException* @throws NoSuchFieldException   NoSuchFieldException* @throws IllegalAccessException IllegalAccessException*/public static <T> List<Class<T>> filter(Connection connection, String name, String columnFamilyName, String columnName, String columnValue, Class<T> clazz, int cacheCount) throws IOException, NoSuchFieldException, IllegalAccessException {List<Class<T>> list = new ArrayList<>();Table table = connection.getTable(TableName.valueOf(name));// 创建扫描对象Scan scan = new Scan();// 设置缓存scan.setCaching(cacheCount);// 创建自定义过滤器Filter customFilter = new CustomFilter(columnValue);scan.setFilter(customFilter);// 进行扫描try (ResultScanner scanner = table.getScanner(scan)) {for (Result result : scanner) {for (Field field : clazz.getDeclaredFields()) {field.setAccessible(true);field.set(clazz, null);}List<String> columnNames = new ArrayList<>();Result rs = table.getScanner(Bytes.toBytes(columnFamilyName)).next();for (Cell cell : rs.rawCells()) {byte[] column = CellUtil.cloneQualifier(cell);columnNames.add(Bytes.toString(column));}for (String columnTempName : columnNames) {// 解析结果String value = Bytes.toString(result.getValue(Bytes.toBytes(columnTempName), Bytes.toBytes(columnName)));Field field = clazz.getDeclaredField(nameInDb2nameInJava(columnName));field.setAccessible(true);field.set(clazz, value);}list.add(clazz);}}return list;}/*** 数据库里下划线命名规则转化为java里面驼峰式命名** @param filedName 字段名称* @return javabean属性名称*/public static String nameInDb2nameInJava(String filedName) {String coluname = filedName.toLowerCase();//正则if (Pattern.compile("^\\S+_+\\S+$").matcher(coluname).find()) {char[] ca = coluname.toCharArray();for (int j = 1; j < ca.length - 1; j++) {if (ca[j] == '_') {ca[j] = '\0';ca[j + 1] = Character.toUpperCase(ca[j + 1]);}}coluname = new String(ca);}return coluname.replaceAll("\0", "");}public static void main(String[] args) throws IOException {Admin admin = getAdmin();ServerName master = admin.getMaster();System.out.println(master.getHostname() + ":" + master.getPort());Collection<ServerName> regionServers = admin.getRegionServers();for (ServerName serverName : regionServers) {System.out.println(serverName.getHostname() + ":" + serverName.getPort());}}
}

http://www.ppmy.cn/ops/132948.html

相关文章

如何不封禁UDP协议同时防止UDP攻击

UDP&#xff08;User Datagram Protocol&#xff09;协议因其简单、高效的特点&#xff0c;广泛应用于各种网络服务中&#xff0c;如视频流、在线游戏和VoIP等。然而&#xff0c;UDP协议的无连接特性和缺乏内置的安全机制使其容易成为攻击者的靶标&#xff0c;常见的攻击类型包…

Redis 高并发分布式锁实战

目录 环境准备 一 . Redis 安装 二&#xff1a;Spring boot 项目准备 三&#xff1a;nginx 安装 四&#xff1a;Jmeter 下载和配置 案例实战 优化一&#xff1a;加 synchronized 锁 优化二&#xff1a;使用 redis 的 setnx 实现分布式锁 优化三&#xff1a;使用 Lua 脚本…

docker快速安装与配置mongoDB

docker快速安装与配置mongoDB 拉取 MongoDB Docker 映像 docker pull mongodb/mongodb-community-server:latest将映像作为 container 运行 docker run --restartalways --name mongodb -p 27017:27017 --privilegedtrue -e MONGO_INITDB_ROOT_USERNAMEroot -e MONGO_INITD…

科技改变生活:最新智能开关、调光器及插座产品亮相

根据QYResearch调研团队的最新力作《欧洲开关、调光器和插座市场报告2023-2029》显示&#xff0c;预计到2029年&#xff0c;欧洲开关、调光器和插座市场的规模将攀升至57.8亿美元&#xff0c;并且在接下来的几年里&#xff0c;将以4.2%的复合年增长率&#xff08;CAGR&#xff…

《手写Spring渐进式源码实践》实践笔记(第十七章 数据类型转换)

文章目录 第十七章 数据类型转换工厂设计实现背景技术背景Spring数据转换实现方式类型转换器&#xff08;Converter&#xff09;接口设计实现 业务背景 目标设计实现代码结构类图实现步骤 测试事先准备属性配置文件转换器工厂Bean测试用例测试结果&#xff1a; 总结 第十七章 数…

数据中心类DataCenter(三)

数据中心类DataCenter&#xff08;三&#xff09; 前言 在上一集&#xff0c;我们就完成了整个数据中心类的构造函数以及析构函数&#xff0c;我们讨论了我们的数据持久化的相关事宜&#xff0c;那么我们这一集就要来完成这一个内容。 需求分析 我们需要暂且就规定我们的路…

springboot快递物流管理系统-计算机设计毕业源码85178

目 录 摘要 1 绪论 1.1 选题背景与意义 1.2国内外研究现状 1.3论文结构与章节安排 2 快递物流管理系统分析 2.1 可行性分析 2.1.1 技术可行性分析 2.1.2 经济可行性分析 2.1.3 操作可行性分析 2.2 系统流程分析 2.2.1数据增加流程 2.2.2 数据修改流程 2.2.3 数据…

PCD文件(Point Cloud Data)详细解析

PCD文件&#xff08;Point Cloud Data&#xff09;详细解析 PCD文件格式是专为存储和处理三维点云数据而设计的&#xff0c;由开源项目Point Cloud Library&#xff08;PCL&#xff09;引入。该格式广泛应用于机器人视觉、计算机视觉以及三维建模等领域&#xff0c;支持多种数…