I previously built a framework that drove Spark computations with Jetty and a RESTful interface; recently I wanted to rebuild it with Spring Boot + Scala + Spark. Below are the pom.xml and the code.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.i-tudou.bd</groupId>
    <artifactId>spring-spark-demo</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>A Camel Scala Route</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <scala.version>2.11</scala.version>
        <spark.version>2.4.0</spark.version>
    </properties>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
            <version>3.0.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
</project>
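Note: the pom above has no <build> section. To compile the Scala sources with Maven you will typically also need the scala-maven-plugin, plus the spring-boot-maven-plugin if you want mvn spring-boot:run or an executable jar; the exact plugin configuration depends on your environment.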
application.scala
package com.itudou.bd

import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.{EnableAutoConfiguration, SpringBootApplication}
import org.springframework.context.annotation.{ComponentScan, Configuration}

// @SpringBootApplication alone would be enough: it already combines
// @Configuration, @EnableAutoConfiguration and @ComponentScan.
@Configuration
@EnableAutoConfiguration
@ComponentScan
@SpringBootApplication
class Config

object springsparkdemoApplication extends App {
  SpringApplication.run(classOf[Config])
}
sparkconfig.scala
package com.itudou.bd.config

import org.apache.spark.{SparkConf, SparkContext}
import org.springframework.context.annotation.{Bean, Configuration}

@Configuration
class Sparkconfig {

  private val sparkHome = "."
  private val appName = "sparkTest"
  private val master = "local"

  // Spark configuration: application name and master URL
  // ("local" runs Spark inside the same JVM as the Spring Boot app).
  @Bean
  def sparkConf: SparkConf = new SparkConf().setAppName(appName).setMaster(master)

  // A single SparkContext bean, shared by the controllers via @Autowired.
  @Bean
  def sparkContext: SparkContext = new SparkContext(sparkConf)
}
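The DataController below builds a SQLContext from the injected SparkContext on each request. An alternative (not part of the original code) is to expose a SparkSession bean instead; the sketch below assumes the Spark 2.4 API and uses a hypothetical class name SparkSessionConfig. It would replace the Sparkconfig above, since only one SparkContext may exist per JVM.

package com.itudou.bd.config

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.springframework.context.annotation.{Bean, Configuration}

// Hypothetical alternative to Sparkconfig: one shared SparkSession bean.
@Configuration
class SparkSessionConfig {

  @Bean
  def sparkSession: SparkSession =
    SparkSession.builder()
      .appName("sparkTest")
      .master("local")
      .getOrCreate()

  // The SparkContext can still be exposed for code that only needs it.
  @Bean
  def sparkContext(spark: SparkSession): SparkContext = spark.sparkContext
}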
DataController.scala
package com.itudou.bd.Controller

import java.util.Properties

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.web.bind.annotation._

import scala.util.parsing.json.JSON

@RestController
@RequestMapping(value = Array("DataController/data/"))
@CrossOrigin
class DataController {

  @Autowired
  var sc: SparkContext = _

  @GetMapping(value = Array("test"))
  def test = {
    val url = "jdbc:mysql://10.1.3.49:3309/tdgistaskDB?useUnicode=true&characterEncoding=UTF-8&user=root&password=123"
    val prop = new Properties()
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.jdbc(url, "t_task", prop)
    df.createOrReplaceTempView("t_task")
    // query with a SQL statement
    val df1 = sqlContext.sql("select * from t_task where parent_id = 0")
    df1.show() // show() prints the first rows to stdout and returns Unit
    // println("1.------------->" + df1.rdd.partitions.size)
    JSON.parseFull("""{"lige": 1}""") // keys must be quoted, otherwise parseFull returns None
  }
}
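With the Spring Boot defaults (embedded Tomcat, port 8080, no server.port override) this endpoint answers at http://localhost:8080/DataController/data/test. Below is a possible variant of the handler, meant to sit inside the same DataController class; the tasks name, its path, and moving the credentials into the Properties object are illustrative choices, not part of the original code.

  // Hypothetical variant: same table, filter pushed through the DataFrame API,
  // rows returned as JSON strings so Spring serializes them as a JSON array.
  @GetMapping(value = Array("tasks"))
  def tasks: Array[String] = {
    val url = "jdbc:mysql://10.1.3.49:3309/tdgistaskDB?useUnicode=true&characterEncoding=UTF-8"
    val prop = new Properties()
    prop.setProperty("user", "root")
    prop.setProperty("password", "123")

    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.jdbc(url, "t_task", prop)
    df.filter("parent_id = 0").toJSON.collect()
  }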