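Introduction
I have recently been running experiments on spatiotemporal geographic big data. This post shows how to use the GeoWave framework to ingest vector data into NoSQL databases such as HBase or Accumulo.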
Software versions:
Hadoop: 2.10.2
ZooKeeper: 3.6.4
GeoWave: 1.2.0
Accumulo: 1.9.3
HBase: 1.4.0
Java: 1.8
Preparation
Download geowave-hbase-1.2.0-apache.jar from the GeoWave website and copy it into HBase's lib directory. (For Accumulo, add geowave-accumulo-1.2.0-apache-accumulo1.7.jar instead.)
Code
1. Add the dependencies
```xml
<dependency>
    <groupId>org.locationtech.geowave</groupId>
    <artifactId>geowave-datastore-hbase</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.locationtech.geowave</groupId>
    <artifactId>geowave-adapter-vector</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.locationtech.geowave</groupId>
    <artifactId>geowave-format-vector</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.locationtech.geowave</groupId>
    <artifactId>geowave-datastore-accumulo</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>4.6.10</version>
</dependency>
<dependency>
    <groupId>org.apache.accumulo</groupId>
    <artifactId>accumulo-core</artifactId>
    <version>1.9.3</version>
</dependency>
```
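2. Ingesting vector data into HBase
This post uses AIS (ship position) data stored as a JSON file with one JSON record per line.
First, write code that builds a SimpleFeatureTypeBuilder, which is used to create a SimpleFeatureType. The SimpleFeatureType is essentially the schema of your vector data: it defines the geometry type and which attribute fields each feature has.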
```java
import java.util.Date;

import org.geotools.feature.AttributeTypeBuilder;
import org.geotools.feature.simple.SimpleFeatureTypeBuilder;
import org.geotools.referencing.crs.DefaultGeographicCRS;
import org.locationtech.jts.geom.Point;

public static SimpleFeatureTypeBuilder getSimpleFeatureBuilder(String typeName) {
    // Create the SimpleFeatureTypeBuilder
    SimpleFeatureTypeBuilder featureTypeBuilder = new SimpleFeatureTypeBuilder();
    featureTypeBuilder.setName(typeName);
    featureTypeBuilder.setCRS(DefaultGeographicCRS.WGS84);

    // Geometry attribute: a non-nullable point; the CRS is set on the descriptor itself
    AttributeTypeBuilder attributeBuilder = new AttributeTypeBuilder();
    featureTypeBuilder.add(attributeBuilder.binding(Point.class)
            .nillable(false)
            .crs(DefaultGeographicCRS.WGS84)
            .buildDescriptor("the_geom"));

    // Attribute fields
    featureTypeBuilder.add("uuid", String.class);
    featureTypeBuilder.add("mmsi", Integer.class);
    featureTypeBuilder.add("timestamp", Date.class);
    featureTypeBuilder.add("system_timestamp", Date.class);
    featureTypeBuilder.add("nav_status", Integer.class);
    featureTypeBuilder.add("rot", Double.class);
    featureTypeBuilder.add("sog", Double.class);
    featureTypeBuilder.add("pos_acc", Double.class);
    featureTypeBuilder.add("longitude", Double.class);
    featureTypeBuilder.add("latitude", Double.class);
    featureTypeBuilder.add("cog", Double.class);
    featureTypeBuilder.add("true_head", Double.class);
    featureTypeBuilder.add("eta", String.class);
    featureTypeBuilder.add("destid", Integer.class);
    featureTypeBuilder.add("dest", String.class);
    featureTypeBuilder.add("srcid", Integer.class);
    featureTypeBuilder.add("distance", Double.class);
    featureTypeBuilder.add("speed", Double.class);
    featureTypeBuilder.add("draught", Double.class);
    featureTypeBuilder.add("ship_type", Integer.class);
    return featureTypeBuilder;
}
```
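Each attribute above has a declared Java binding, but the values read from JSON can arrive as strings or as other numeric types (the ingest loop, for instance, passes the MMSI as a String into the Integer field mmsi). As far as I know, GeoTools' SimpleFeatureBuilder converts each value to the attribute's binding when set is called. The sketch below is a simplified, stdlib-only stand-in for that conversion, assuming only Integer, Double and String bindings; SchemaCoercionSketch and coerce are hypothetical names, not GeoTools API.

```java
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaCoercionSketch {
    // Hypothetical subset of the AIS schema: attribute name -> declared binding
    static final Map<String, Class<?>> SCHEMA = new LinkedHashMap<>();
    static {
        SCHEMA.put("mmsi", Integer.class);
        SCHEMA.put("sog", Double.class);
        SCHEMA.put("dest", String.class);
    }

    // Simplified stand-in for a converter: coerce a raw JSON value to the declared binding
    static Object coerce(Object raw, Class<?> binding) {
        if (raw == null || binding.isInstance(raw)) {
            return raw; // nothing to convert
        }
        String text = raw.toString().trim();
        if (binding == Integer.class) {
            return Integer.valueOf(text);
        }
        if (binding == Double.class) {
            return Double.valueOf(text);
        }
        if (binding == String.class) {
            return text;
        }
        throw new IllegalArgumentException("No conversion to " + binding.getSimpleName());
    }

    public static void main(String[] args) {
        // mmsi arrives as a String, sog as a BigDecimal (hypothetical values)
        Object mmsi = coerce("412345678", SCHEMA.get("mmsi"));
        Object sog = coerce(new BigDecimal("12.5"), SCHEMA.get("sog"));
        System.out.println(mmsi.getClass().getSimpleName() + " " + mmsi); // Integer 412345678
        System.out.println(sog.getClass().getSimpleName() + " " + sog);   // Double 12.5
    }
}
```

Declaring the bindings once in the schema is what lets the ingest loop pass raw JSON values straight to the builder instead of parsing each field by hand.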
Next, ingest the data. Here the typeName parameter is the name of the vector data type, and indexName is the name of the index.
```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.ParseException;
import java.text.SimpleDateFormat;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.locationtech.geowave.adapter.vector.FeatureDataAdapter;
import org.locationtech.geowave.core.geotime.index.api.SpatialTemporalIndexBuilder;
import org.locationtech.geowave.core.geotime.index.dimension.TemporalBinningStrategy;
import org.locationtech.geowave.core.store.api.DataStore;
import org.locationtech.geowave.core.store.api.DataStoreFactory;
import org.locationtech.geowave.core.store.api.Index;
import org.locationtech.geowave.core.store.api.Writer;
// Assumed package for IndexPluginOptions; verify it against your GeoWave 1.2.0 jars
import org.locationtech.geowave.core.store.index.IndexPluginOptions;
import org.locationtech.geowave.datastore.hbase.config.HBaseRequiredOptions;
import org.locationtech.jts.geom.Coordinate;
import org.locationtech.jts.geom.GeometryFactory;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.simple.SimpleFeatureType;

public static void ingestData(String typeName, String indexName) throws IOException, ParseException {
    // Connect to HBase through ZooKeeper
    HBaseRequiredOptions hBaseRequiredOptions = new HBaseRequiredOptions();
    hBaseRequiredOptions.setZookeeper("localhost:2181");
    DataStore hbaseStore = DataStoreFactory.createDataStore(hBaseRequiredOptions);

    // Path of the JSON data file (one JSON record per line)
    String filePath = "D:\\轨迹数据\\5h.json";
    long startTimeStamp = System.currentTimeMillis();

    // Build the SimpleFeatureType and a SimpleFeatureBuilder for it
    SimpleFeatureType pointType = getSimpleFeatureBuilder(typeName).buildFeatureType();
    SimpleFeatureBuilder pointFeatureBuilder = new SimpleFeatureBuilder(pointType);
    // Create an adapter for the point type
    FeatureDataAdapter pointTypeAdapter = new FeatureDataAdapter(pointType);

    // Spatial-temporal index: daily time periods, 3 hash partitions, no duplicate limit
    Index spatialTemporalIndex = new SpatialTemporalIndexBuilder()
            .setMaxDuplicates(-1)
            .setNumPartitions(3)
            .setPeriodicity(TemporalBinningStrategy.Unit.DAY)
            .setPartitionStrategy(IndexPluginOptions.PartitionStrategy.HASH)
            .setName(indexName)
            .createIndex();

    // Register the type with its index; addType also stores the index
    hbaseStore.addType(pointTypeAdapter, spatialTemporalIndex);

    GeometryFactory factory = new GeometryFactory();
    // Parsed in the JVM's default time zone
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
    // Attributes copied as-is; the builder converts them to the declared types
    String[] plainFields = {"nav_status", "rot", "sog", "pos_acc", "cog", "true_head", "eta",
            "destid", "dest", "srcid", "distance", "speed", "draught", "ship_type"};

    Writer<SimpleFeature> writer = hbaseStore.createWriter(pointTypeAdapter.getTypeName());
    // Read the JSON file line by line (assumed to be UTF-8 encoded)
    try (BufferedReader bufferedReader = Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8)) {
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            // hutool, declared in the pom above, parses each record
            JSONObject jsonObject = JSONUtil.parseObj(line);
            String uuId = jsonObject.get("uuid").toString();
            String mmsi = jsonObject.get("mmsi").toString();
            double longitude = Double.parseDouble(jsonObject.get("longitude").toString());
            double latitude = Double.parseDouble(jsonObject.get("latitude").toString());

            pointFeatureBuilder.set("the_geom", factory.createPoint(new Coordinate(longitude, latitude)));
            pointFeatureBuilder.set("uuid", uuId);
            pointFeatureBuilder.set("mmsi", mmsi); // converted to Integer by the builder
            pointFeatureBuilder.set("timestamp", sdf.parse(jsonObject.get("timestamp").toString()));
            pointFeatureBuilder.set("system_timestamp", sdf.parse(jsonObject.get("system_timestamp").toString()));
            pointFeatureBuilder.set("longitude", longitude);
            pointFeatureBuilder.set("latitude", latitude);
            for (String field : plainFields) {
                pointFeatureBuilder.set(field, jsonObject.get(field));
            }

            // Feature ID: first 3 + last 3 characters of the UUID, "-", then the MMSI
            String id = uuId.substring(0, 3) + uuId.substring(uuId.length() - 3) + "-" + mmsi;
            writer.write(pointFeatureBuilder.buildFeature(id));
        }
    } finally {
        writer.flush();
        writer.close();
    }

    // Measured after the writer is closed so that buffered writes are included
    long endTimeStamp = System.currentTimeMillis();
    System.out.println("ingest finished!!!!");
    System.out.println("Ingest time: " + (endTimeStamp - startTimeStamp) + " ms");
}
```
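The ingest loop parses timestamp and system_timestamp with SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"), which interprets the text in the JVM's default time zone, so the stored instants depend on the machine running the import. DateFormat.parse(String) also ignores any text after the matched pattern, so a trailing Z or fractional seconds would be dropped silently. A minimal sketch that pins the zone, assuming the AIS times are UTC (an assumption, since the data sample is not shown); AisTimestampSketch and parseUtc are hypothetical names:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class AisTimestampSketch {
    // Parses "yyyy-MM-dd'T'HH:mm:ss" as UTC (assumption: the source times are UTC)
    static Date parseUtc(String text) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        sdf.setLenient(false); // reject out-of-range values such as month 13
        try {
            return sdf.parse(text);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Bad timestamp: " + text, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseUtc("2021-01-01T00:00:00").getTime()); // 1609459200000
        // Text after the pattern (here "Z") is not consumed and is silently ignored
        System.out.println(parseUtc("2021-01-01T00:00:00Z").getTime()); // 1609459200000
    }
}
```

If the strings do carry an offset, parsing them with a pattern that includes the offset (for example the X letter) is safer than relying on a fixed zone.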
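3. Ingesting vector data into Accumulo
The code is essentially the same as above: replace the HBase-specific classes with their Accumulo counterparts, i.e. HBaseDataStore with AccumuloDataStore and HBaseRequiredOptions with AccumuloRequiredOptions. Besides the ZooKeeper address, the Accumulo options also need the instance name, user name and password.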
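Whichever store is used, the feature ID in the ingest loop is built from the first three and last three characters of the UUID plus the MMSI. Because the middle of the UUID is discarded, two different records from the same vessel can end up with the same ID. The plain-Java sketch below reproduces that scheme and shows such a collision; keeping the whole UUID is one simple alternative, assuming the uuid field is unique per record. FeatureIdSketch, featureId and fullFeatureId are hypothetical names.

```java
public class FeatureIdSketch {
    // Mirrors the ingest code: first 3 + last 3 characters of the UUID, "-", then the MMSI
    static String featureId(String uuid, String mmsi) {
        return uuid.substring(0, 3) + uuid.substring(uuid.length() - 3) + "-" + mmsi;
    }

    // Alternative that keeps the full UUID (assumption: uuid is unique per record)
    static String fullFeatureId(String uuid, String mmsi) {
        return uuid + "-" + mmsi;
    }

    public static void main(String[] args) {
        // Two hypothetical UUIDs that differ only in their middle characters
        String a = "abc11111-1111-1111-1111-111111111xyz";
        String b = "abc22222-2222-2222-2222-222222222xyz";
        System.out.println(featureId(a, "412345678")); // abcxyz-412345678
        System.out.println(featureId(a, "412345678").equals(featureId(b, "412345678"))); // true
        System.out.println(fullFeatureId(a, "412345678").equals(fullFeatureId(b, "412345678"))); // false
    }
}
```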