之前用 Spring Boot + Spark 整合做了一个通过 HTTP 请求驱动 Spark 计算的 demo 工程。最近看了下 GeoSpark、GeoTrellis、GeoMesa 等 GIS 方向的大数据技术，于是写了一个 Demo。所用版本：Spark 2.4.0、Scala 2.11、GeoTrellis 2.3.1、GeoTools 20.0、Guava 14.0.1。
相关pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.i-tudou.bd</groupId>
  <artifactId>spring-spark-demo</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>spring-spark-demo</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <!-- Scala binary version, used as the _2.11 artifact suffix. -->
    <scala.version>2.11</scala.version>
    <!-- Full Scala version for the Scala compiler plugin (Spark 2.4.0 is built against 2.11.12). -->
    <scala.full.version>2.11.12</scala.full.version>
    <spark.version>2.4.0</spark.version>
    <guava.version>14.0.1</guava.version>
    <geotrellis.version>2.3.1</geotrellis.version>
    <geotools.version>20.0</geotools.version>
  </properties>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.3.RELEASE</version>
    <relativePath /> <!-- lookup parent from repository -->
  </parent>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
      <groupId>com.github.ben-manes.caffeine</groupId>
      <artifactId>caffeine</artifactId>
      <version>2.8.0</version>
    </dependency>

    <!-- GeoTrellis -->
    <dependency>
      <groupId>org.locationtech.geotrellis</groupId>
      <artifactId>geotrellis-spark_${scala.version}</artifactId>
      <version>${geotrellis.version}</version>
    </dependency>
    <dependency>
      <groupId>org.locationtech.geotrellis</groupId>
      <artifactId>geotrellis-raster_${scala.version}</artifactId>
      <version>${geotrellis.version}</version>
    </dependency>
    <dependency>
      <groupId>org.locationtech.geotrellis</groupId>
      <artifactId>geotrellis-vector_${scala.version}</artifactId>
      <version>${geotrellis.version}</version>
    </dependency>

    <!-- GeoTools, pinned to one version; the GeoTrellis modules below exclude their own copies. -->
    <dependency>
      <groupId>org.geotools</groupId>
      <artifactId>gt-coverage</artifactId>
      <version>${geotools.version}</version>
    </dependency>
    <dependency>
      <groupId>org.geotools</groupId>
      <artifactId>gt-referencing</artifactId>
      <version>${geotools.version}</version>
    </dependency>
    <dependency>
      <groupId>org.geotools</groupId>
      <artifactId>gt-main</artifactId>
      <version>${geotools.version}</version>
    </dependency>
    <dependency>
      <groupId>org.geotools</groupId>
      <artifactId>gt-shapefile</artifactId>
      <version>${geotools.version}</version>
    </dependency>

    <dependency>
      <groupId>org.locationtech.geotrellis</groupId>
      <artifactId>geotrellis-geotools_${scala.version}</artifactId>
      <version>${geotrellis.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.geotools</groupId>
          <artifactId>gt-coverage</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.geotools</groupId>
          <artifactId>gt-epsg-hsql</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.geotools</groupId>
          <artifactId>gt-main</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.geotools</groupId>
          <artifactId>gt-referencing</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.locationtech.geotrellis</groupId>
      <artifactId>geotrellis-util_${scala.version}</artifactId>
      <version>${geotrellis.version}</version>
    </dependency>
    <dependency>
      <groupId>org.locationtech.geotrellis</groupId>
      <artifactId>geotrellis-proj4_${scala.version}</artifactId>
      <version>${geotrellis.version}</version>
    </dependency>
    <dependency>
      <groupId>org.locationtech.geotrellis</groupId>
      <artifactId>geotrellis-spark-pipeline_${scala.version}</artifactId>
      <version>${geotrellis.version}</version>
    </dependency>
    <dependency>
      <groupId>org.locationtech.geotrellis</groupId>
      <artifactId>geotrellis-shapefile_${scala.version}</artifactId>
      <version>${geotrellis.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.geotools</groupId>
          <artifactId>gt-shapefile</artifactId>
        </exclusion>
        <exclusion>
          <groupId>javax.media</groupId>
          <artifactId>jai_core</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.locationtech.geotrellis</groupId>
      <artifactId>geotrellis-spark-etl_${scala.version}</artifactId>
      <version>${geotrellis.version}</version>
    </dependency>
    <!--<dependency><groupId>org.locationtech.geotrellis</groupId><artifactId>geotrellis-spark-etl_2.12</artifactId><version>${geotrellis.version}</version></dependency>-->

    <!-- Spring Web served by Undertow instead of the default Tomcat. -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      <exclusions>
        <!-- groupId was misspelled ("springrfamework"), so Tomcat was never actually excluded. -->
        <exclusion>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-tomcat</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
        </exclusion>
        <!--<exclusion><groupId>org.hibernate</groupId><artifactId> hibernate-validator</artifactId></exclusion>-->
      </exclusions>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>${guava.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-undertow</artifactId>
    </dependency>
    <dependency>
      <groupId>io.springfox</groupId>
      <artifactId>springfox-swagger2</artifactId>
      <version>2.8.0</version>
    </dependency>
    <dependency>
      <groupId>io.springfox</groupId>
      <artifactId>springfox-swagger-ui</artifactId>
      <version>2.8.0</version>
    </dependency>

    <!-- Spark -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.version}</artifactId>
      <version>${spark.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <dependency>
      <groupId>org.codehaus.janino</groupId>
      <artifactId>janino</artifactId>
      <version>3.0.8</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-configuration-processor</artifactId>
      <optional>true</optional>
    </dependency>
  </dependencies>

  <repositories>
    <!-- GeoTools artifacts. The former http://download.osgeo.org/webdav/geotools/ location has been
         superseded by repo.osgeo.org, and Maven 3.8.1+ blocks plain-http repositories by default. -->
    <repository>
      <id>osgeo</id>
      <name>Open Source Geospatial Foundation Repository</name>
      <url>https://repo.osgeo.org/repository/release/</url>
    </repository>
    <!-- These two repositories appear to be offline (and are plain http); kept for reference.
    <repository>
      <id>maven2-repository.dev.java.net</id>
      <name>Java.net repository</name>
      <url>http://download.java.net/maven/2</url>
    </repository>
    <repository>
      <snapshots><enabled>true</enabled></snapshots>
      <id>boundless</id>
      <name>Boundless Maven Repository</name>
      <url>http://repo.boundlessgeo.com/main</url>
    </repository>
    -->
  </repositories>

  <build>
    <plugins>
      <plugin>
        <inherited>true</inherited>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <!-- maven-compiler-plugin only compiles Java; this plugin compiles the Scala sources
           (src/main/scala and src/test/scala by default). -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <configuration>
          <scalaVersion>${scala.full.version}</scalaVersion>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
app 启动类实现
/**
 * Root Spring configuration class.
 *
 * `@SpringBootApplication` already combines `@Configuration`, `@EnableAutoConfiguration` and
 * `@ComponentScan` (with Boot's default exclude filters), so a separate bare `@ComponentScan`
 * is not needed.
 */
@SpringBootApplication
class Config

/**
 * Application entry point. It uses an explicit `main` rather than the `App` trait, which relies on
 * delayed initialization.
 */
object springsparkdemoApplication {
  def main(args: Array[String]): Unit = {
    // Forward the command-line arguments so Spring Boot can apply overrides such as --server.port=...
    SpringApplication.run(classOf[Config], args: _*)
  }
}
Spring Boot 启动时通过 @Configuration 配置类创建 Spark 实例
/**
 * Spring configuration that exposes a local Spark setup as beans.
 */
@Configuration
class Sparkconfig {
  private val appName = "sparkTest"
  private val master = "local[*]"

  /** Spark configuration with app name "sparkTest" and master "local[*]". */
  @Bean
  def SparkConf: SparkConf = new SparkConf().setAppName(appName).setMaster(master)

  /**
   * The application's SparkContext, built from the SparkConf bean.
   *
   * Spring's inferred destroy method only looks for `close`/`shutdown`, and SparkContext has
   * neither, so `stop` is named explicitly. The context is therefore released when the Spring
   * context closes.
   */
  @Bean(destroyMethod = "stop")
  def SparkContext: SparkContext = new SparkContext(SparkConf)
}
Swagger配置
/**
 * Springfox Swagger 2 setup. It is registered only when the property `swagger.enable` equals "true".
 */
@Configuration
@EnableSwagger2
@ConditionalOnProperty(name = Array("swagger.enable"), havingValue = "true")
class SwaggerConfiguration {

  /** Docket that documents every path of the handlers found in the controller package. */
  @Bean
  def createRestApi: Docket = {
    val controllerPackage = "com.itudou.bd.Controller"
    val selection = new Docket(DocumentationType.SWAGGER_2)
      .apiInfo(apiInfo)
      .select
    selection
      .apis(RequestHandlerSelectors.basePackage(controllerPackage))
      .paths(PathSelectors.any)
      .build
  }

  /** Static API metadata shown on the Swagger UI page. */
  private def apiInfo = {
    val builder = new ApiInfoBuilder()
      .title("smartdata-uc")
      .description("")
      .termsOfServiceUrl("")
    builder
      .contact("")
      .version("1.0")
      .build
  }
}
REST 接口调用 GeoTrellis
/**
 * REST endpoints that trigger the GeoTrellis demos in DemoHandle.
 */
@Api(value = "GeoTrellisController", description = "GeoTrellisController")
@RestController
@RequestMapping(value = Array("GeoTrellisController/data/"))
@CrossOrigin
class GeoTrellisController {

  /** Vector-to-raster demo endpoint. The call is currently disabled, so this does nothing. */
  @ApiOperation(value = "vecoterToraster")
  @GetMapping(value = Array("vecoterToraster"))
  def vecoterToraster: Unit = {
    // To re-enable: `Sparkconfig.SparkContext` is an instance @Bean method, not a static member,
    // so the SparkContext bean has to be injected into this controller (e.g. via @Autowired).
    /* DemoHandle.vactorToRaster("D:\\log\\61011_geo\\610111.shp","D:\\log\\61011_geo\\610111.tif",Sparkconfig.SparkContext);*/
  }

  /** Runs DemoHandle.etl with hard-coded local input/output/backend-profile JSON paths. */
  @ApiOperation(value = "etl")
  @GetMapping(value = Array("etl"))
  def etl: Unit = {
    val args = Array[String](
      "--input", "D:\\log\\61011_geo\\input.json",
      "--output", "D:\\log\\61011_geo\\output.json",
      "--backend-profiles", "D:\\log\\61011_geo\\backend-profiles.json")
    DemoHandle.etl(args)
  }
}
网上关于 GeoTrellis 的两个例子（矢量转栅格、ETL）
/**
 * GeoTrellis demos adapted from examples found online:
 *  - `vactorToRaster`: reads a shapefile, rasterizes its geometries with Spark and renders the tiles to PNG;
 *  - `etl`: a stub for a GeoTrellis ETL ingest (the ingest call itself is commented out).
 */
object DemoHandle {

  /** Colour map used by `vactorToRaster`: cell value 0 -> black, 1 -> white. */
  val colorMap1 = ColorMap(Map(0 -> RGB(0, 0, 0), 1 -> RGB(255, 255, 255)))

  // Bounding box of the features from the most recent `vactorToRaster` call. These are kept public
  // only for backward compatibility. `vactorToRaster` now computes its extent in locals and
  // publishes it here at the end, so concurrent requests no longer corrupt each other's computation.
  // The four writes are not synchronized.
  var minX, minY, maxX, maxY = -180.0

  /**
   * Reads every feature of a shapefile and parses attribute 0 of each feature as WKT.
   *
   * @param path    filesystem path of the `.shp` file
   * @param charset charset used to decode the shapefile's attribute data (default "UTF-8")
   * @return the parsed geometries, in the order the feature iterator yields them
   */
  def getFeatures(path: String, charset: String = "UTF-8"): mutable.ListBuffer[Geometry] = {
    val features = mutable.ListBuffer[Geometry]()
    val shpDataStore = new ShapefileDataStore(new File(path).toURI().toURL())
    try {
      shpDataStore.setCharset(Charset.forName(charset))
      val typeName = shpDataStore.getTypeNames()(0)
      val iterator = shpDataStore.getFeatureSource(typeName).getFeatures().features()
      // Close the iterator and dispose the store even if reading or WKT parsing throws.
      try {
        while (iterator.hasNext()) {
          val sf = iterator.next()
          // NOTE(review): assumes attribute 0 is the geometry column and its toString is WKT; confirm.
          features += WKT.read(sf.getAttribute(0).toString)
        }
      } finally {
        iterator.close()
      }
    } finally {
      shpDataStore.dispose()
    }
    features
  }

  /**
   * Rasterizes all geometries of a shapefile and writes the resulting tiles as PNG images.
   *
   * The layout spans the union of the feature envelopes with a 100 x 72 grid of 256 x 256 tiles.
   * Every geometry is burned into an `IntCellType` layer with the value 36.
   *
   * @param orgpath path of the input `.shp` file
   * @param armpath output path. If exactly one tile is produced, it is written here. Otherwise each
   *                tile is written next to it as `<name>_<col>_<row><ext>`. The bytes are PNG
   *                regardless of the extension.
   * @param sc      SparkContext used to parallelize the geometries
   * @throws IllegalArgumentException if the shapefile contains no features
   */
  def vactorToRaster(orgpath: String, armpath: String, sc: SparkContext): Unit = {
    val features = getFeatures(orgpath)
    require(features.nonEmpty, s"no features found in $orgpath")

    // Union of all feature envelopes. It is computed in locals rather than the shared vars.
    val envelopes = features.map(_.jtsGeom.getEnvelopeInternal)
    val extent: Extent = Extent(
      envelopes.map(_.getMinX).min,
      envelopes.map(_.getMinY).min,
      envelopes.map(_.getMaxX).max,
      envelopes.map(_.getMaxY).max)
    minX = extent.xmin
    minY = extent.ymin
    maxX = extent.xmax
    maxY = extent.ymax

    val geoms: RDD[Geometry] = sc.parallelize(features)
    val layout = LayoutDefinition(extent, TileLayout(100, 72, 256, 256))
    val celltype: CellType = IntCellType
    // NOTE(review): the burned value is 36, but colorMap1 only defines entries for 0 and 1. With
    // GeoTrellis' default ColorMap options (LessThanOrEqualTo boundaries, transparent fallback
    // colour), 36 probably renders fully transparent. Confirm the intended value and colours.
    val layer: RDD[(SpatialKey, Tile)] with Metadata[LayoutDefinition] =
      geoms.rasterize(36, celltype, layout)

    val tiles = layer.collect()
    // Previously every tile was written to `armpath`, so only the last collected tile survived.
    // When there are several tiles, each one now gets its own file.
    tiles.foreach { case (key, tile) =>
      val target = if (tiles.length == 1) armpath else tilePath(armpath, key)
      tile.renderPng(colorMap1).write(target)
    }
  }

  /** Inserts `_<col>_<row>` before the file extension of `base` (or appends it if there is none). */
  private def tilePath(base: String, key: SpatialKey): String = {
    val file = new File(base)
    val name = file.getName
    val dot = name.lastIndexOf('.')
    val (stem, ext) = if (dot > 0) (name.substring(0, dot), name.substring(dot)) else (name, "")
    new File(file.getParentFile, s"${stem}_${key.col}_${key.row}$ext").getPath
  }

  /**
   * Stub for a GeoTrellis ETL ingest driven by command-line-style arguments.
   *
   * It creates a dedicated local SparkContext and always stops it afterwards. The ingest call
   * itself is currently commented out, so `args` is unused.
   */
  def etl(args: Array[String]): Unit = {
    // NOTE(review): Sparkconfig also creates a SparkContext bean at startup, and by default Spark
    // allows only one active SparkContext per JVM. Creating another one here will therefore likely
    // fail when called from the running Spring application. Consider reusing the injected context.
    implicit val sc = SparkUtils.createSparkContext("ETL", new SparkConf(true).setMaster("local[*]"))
    try {
      //Etl.ingest[ProjectedExtent, SpatialKey, Tile](args)
    } finally {
      sc.stop()
    }
  }
}