canal DirectToKafka
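Environment setup:
In the MySQL configuration file /etc/my.cnf:
linux>vi /etc/my.cnf
[mysqld]
server-id=1
log-bin=mysql-bin # enable the binlog
binlog_format=row # row format: one binlog entry per changed row
binlog-do-db=testdb # only log changes made to the testdb database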
Canal configuration:
Settings in canal.properties:
linux>vi conf/canal.properties
canal.serverMode = kafka
canal.instance.parser.parallel = false
canal.destinations = example
canal.mq.servers = 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092
Settings in instance.properties of the example instance:
linux>vi conf/example/instance.properties
canal.instance.mysql.slaveId=21
# MySQL address
canal.instance.master.address=192.168.58.203:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
1. Start Canal
linux>bin/startup.sh
Note: the environment should be Java 8.
2. Start a consumer
linux>bin/kafka-console-consumer.sh --topic example --from-beginning --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092
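Run an INSERT statement in MySQL.
If the consumer prints data, the change has reached Kafka and the environment is set up correctly.
Writing the Spark Streaming program:
1. Connect to Kafka in direct mode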
val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getName)
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("error")
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "canal-demo",
  "auto.offset.reset" -> "latest"
  // "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("example")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
- Parse with fastjson and flatten
stream.map(_.value()).map(line => {
  val jsonObject = JSON.parseObject(line)
  val table = jsonObject.getString("table")
  val types = jsonObject.getString("type")
  val data = jsonObject.getJSONArray("data")
  // for (i <- 0 until data.size()) {
  //   val id = data.getJSONObject(i).getString("id")
  //   val name = data.getJSONObject(i).getString("name")
  // }
  (table, types, data)
}).print()
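The flattening step can be sketched without Spark or fastjson. The block below is a minimal illustration that assumes the message has already been parsed; CanalMessage, FlatRow and FlattenSketch are hypothetical names, not Canal or fastjson types. It emits one flat row per element of data, repeating the table name and event type on each row.

```scala
// Hypothetical model of an already-parsed Canal message (not a Canal or fastjson type).
final case class CanalMessage(table: String, eventType: String, data: Seq[Map[String, String]])

// One flat row per changed record; table and event type are repeated on every row.
final case class FlatRow(table: String, eventType: String, id: String, name: String)

object FlattenSketch {
  // Turns the nested "data" array into flat rows; missing columns become empty strings.
  def flatten(msg: CanalMessage): Seq[FlatRow] =
    msg.data.map { row =>
      FlatRow(msg.table, msg.eventType, row.getOrElse("id", ""), row.getOrElse("name", ""))
    }

  def main(args: Array[String]): Unit = {
    val msg = CanalMessage(
      "student",
      "UPDATE",
      Seq(Map("id" -> "1", "name" -> "lisi"), Map("id" -> "2", "name" -> "lisi"))
    )
    flatten(msg).foreach(println)
  }
}
```

In the streaming job the same idea would be a flatMap over the parsed messages, so that each changed row becomes its own record.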
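- Use the transform operator to join dimension data (the blacklist program)
// filter is the value stream: val filter = stream.map(_.value())
filter.transform(rdd => {
  // open the database connection
  rdd.map(line => {
    // run the query and join the dimension data
  })
})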
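The transform skeleton above leaves the join itself empty. As a rough sketch of the per-batch logic, the block below filters one batch against a blacklist using plain Scala collections instead of RDDs. Event, BlacklistSketch and keepAllowed are hypothetical names, and the in-memory Set stands in for the dimension data that a real job would query from the database.

```scala
object BlacklistSketch {
  // Hypothetical shape of a flattened change event.
  final case class Event(table: String, eventType: String, name: String)

  // Keeps only the events whose name is not on the blacklist.
  // In the Spark job this would be the body of transform, applied to each batch.
  def keepAllowed(batch: Seq[Event], blacklist: Set[String]): Seq[Event] =
    batch.filterNot(e => blacklist.contains(e.name))

  def main(args: Array[String]): Unit = {
    val blacklist = Set("zhaoliu") // stand-in for rows loaded from the database
    val batch = Seq(
      Event("student", "UPDATE", "lisi"),
      Event("student", "UPDATE", "zhaoliu")
    )
    keepAllowed(batch, blacklist).foreach(println)
  }
}
```

transform suits this job because its body runs once per batch, so the blacklist can be reloaded on every interval rather than fixed when the job starts.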
JSON parsing test
package SparkTest.canal
import com.alibaba.fastjson.JSON
object JsonTest {
  // Parses a local JSON-formatted log message
  // Related to TransformationDemo
  def main(args: Array[String]): Unit = {
    val value = "{\"data\":[{\"id\":\"1\",\"name\":\"lisi\",\"score\":\"100\"},{\"id\":\"2\",\"name\":\"lisi\",\"score\":\"100\"},{\"id\":\"3\",\"name\":\"lisi\",\"score\":\"100\"}],\"database\":\"scott\",\"es\":1644282564000,\"id\":8,\"isDdl\":false,\"mysqlType\":{\"id\":\"int(11)\",\"name\":\"varchar(30)\",\"score\":\"int(11)\"},\"old\":[{\"name\":\"zhaoliu\"},{\"name\":\"zhaoliu\"},{\"name\":\"zhaoliu\"}],\"sql\":\"\",\"sqlType\":{\"id\":4,\"name\":12,\"score\":4},\"table\":\"student\",\"ts\":1644282564778,\"type\":\"UPDATE\"}"
    try {
      val jsonObject = JSON.parseObject(value)
      println(jsonObject)
      println(jsonObject.getString("table"))
      println(jsonObject.getString("type"))
      println(jsonObject.getJSONArray("data"))
      val jSONArray = jsonObject.getJSONArray("data")
      // Print id and name of every changed row
      for (i <- 0 until jSONArray.size()) {
        println(jSONArray.getJSONObject(i).getString("id"))
        println(jSONArray.getJSONObject(i).getString("name"))
      }
      // val students = jSONArray.toJavaList(classOf[Student])
      // println(students)
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}
Parsing JSON-formatted log messages from Kafka on the server
package SparkTest.canal
import com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
// Parses JSON-formatted log messages from Kafka on the server
object TransformationDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getName)
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("error")
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "canal-demo",
      "auto.offset.reset" -> "latest"
      // "enable.auto.commit" -> (false: java.lang.Boolean) // automatic offset commit
    )
    val topics = Array("example")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    /**
     * Records read from the topic are key/value pairs; normally only the value is processed.
     */
    val filter = stream.map(_.value())
    stream.map(_.value()).map(line => {
      val jsonObject = JSON.parseObject(line)
      val table = jsonObject.getString("table")
      val types = jsonObject.getString("type")
      val data = jsonObject.getJSONArray("data")
      // for (i <- 0 until data.size()) {
      //   val id = data.getJSONObject(i).getString("id")
      //   val name = data.getJSONObject(i).getString("name")
      // }
      (table, types, data) // extract the fields: flatten the JSON
    }).print()
    // Skeleton only: without an output operation this transform is never executed
    filter.transform(rdd => {
      // open the database connection
      rdd.map(line => {
        // run the query and join the dimension data
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
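Writing data to Kafka
There are two options:
1. Collect the data to the driver and create a single producer there to send it
2. Create a producer on each executor and send the data from there
For the second option, whatever the driver ships to the executors must be serializable. A KafkaProducer is not, so the wrapper class declares it as a lazy val producer, which is only created on first use on the executor.
1. Define the class KafkaSink:
2. On the driver: build a broadcast variable and broadcast it to every executor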
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092")
    p.setProperty("key.serializer", classOf[StringSerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  // log.warn("kafka producer init done!")
  ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}
3. On the executors: use the broadcast sink
stream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    rdd.foreach(record => {
      kafkaProducer.value.send("example", record.key(), record.value())
      // do something else
    })
  }
}
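Step 1 names a KafkaSink class but does not show it. The block below is not that class. It is a dependency-free sketch of the idea behind it, under the assumption that the wrapper serializes only a factory function and builds the client lazily. LazySink, FakeProducer and roundTrip are hypothetical names, and FakeProducer stands in for a real Kafka producer, which would need the kafka-clients library.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a non-serializable client such as a Kafka producer.
class FakeProducer {
  val sent: ListBuffer[(String, String, String)] = ListBuffer.empty
  def send(topic: String, key: String, value: String): Unit = sent += ((topic, key, value))
}

// Serializable wrapper: only the factory function is serialized.
// The producer itself is transient and is built on first use.
class LazySink(createProducer: () => FakeProducer) extends Serializable {
  @transient lazy val producer: FakeProducer = createProducer()
  def send(topic: String, key: String, value: String): Unit = producer.send(topic, key, value)
}

object LazySinkSketch {
  // Round-trips an object through Java serialization, roughly what
  // shipping a broadcast value from the driver to an executor involves.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val sink = new LazySink(() => new FakeProducer)
    val copy = roundTrip(sink)     // succeeds although FakeProducer is not Serializable
    copy.send("example", "k", "v") // the producer is created here, on first use
    println(copy.producer.sent.toList)
  }
}
```

With this shape each executor JVM creates its own producer on the first send and reuses it for later records, instead of opening a connection per record.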
1. Start a producer that writes to the wordcount topic
linux>bin/kafka-console-producer.sh --broker-list 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic wordcount
After starting SparkStreamingKafkaDemo, type any input into the producer.
2. Start a consumer on the example topic
linux>bin/kafka-console-consumer.sh --topic example --from-beginning --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092
DirectToKafka
package SparkTest.canal
import java.util.Properties
// 6-4 final version
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object SparkStreamingKafkaDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("submitoffset")
    val ssc = new StreamingContext(conf, Durations.seconds(5))
    // Set the log level
    ssc.sparkContext.setLogLevel("Error")

    // Broadcast variable holding the Kafka producer wrapper
    val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", "192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092")
        p.setProperty("key.serializer", classOf[StringSerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p
      }
      // log.warn("kafka producer init done!")
      // Wrap the sink in a broadcast variable
      ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
    }

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test",
      /**
       * What to do when there is no initial offset, or the current offset no longer exists:
       * earliest: automatically reset to the smallest offset
       * latest: automatically reset to the largest offset (default)
       * none: throw an exception if no previous offset is found
       */
      "auto.offset.reset" -> "earliest",
      /**
       * When enable.auto.commit is false, consumer offsets are not saved to Kafka automatically.
       * They must be committed manually (asynchronously) after the data has been processed.
       */
      "enable.auto.commit" -> (true: java.lang.Boolean) // default is true: commit every 5s
    )

    // Read from wordcount and write to example:
    // 1. start a producer that writes to wordcount
    // 2. start a consumer on example
    val topics = Array("wordcount")
    // The original stream carries the offset information
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Extract the value from each record
    // val valueStream: DStream[String] = stream.map(record => {
    //   println("receive message key = " + record.key)
    //   println("receive message value = " + record.value)
    //   record.value
    // })
    // val words: DStream[String] = valueStream.flatMap(line => {
    //   line.split(" ")
    // })
    // val result: DStream[(String, Int)] = words.map((_, 1)).reduceByKey(_ + _)
    // result.print()

    /**
     * Once the business logic above has finished, commit the consumer offsets asynchronously.
     * This requires enable.auto.commit = false; the offsets are still stored in Kafka itself.
     * (The configuration above leaves it at true, so the manual commit below stays commented out.)
     * Note: offsetRanges (Array[OffsetRange], the offsets of each batch) must be taken from the
     * stream read directly from the source, not from a DStream derived from it by a transformation.
     */
    // stream.map(_.value()).transform(rdd => {
    //   rdd
    // })
    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        rdd.foreach(record => {
          // Forward each record read from wordcount to example
          kafkaProducer.value.send("example", record.key(), record.value())
          // do something else
        })
      }
      // val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // // some time later, after outputs have completed
      // // commit the offsets manually once processing is done
      // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
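The three auto.offset.reset policies described in the comments of SparkStreamingKafkaDemo can be modelled in a few lines. This is a simplified sketch of the decision, not Kafka's implementation. OffsetResetSketch and startOffset are hypothetical names, and the exceptions are plain JVM exceptions rather than the ones a Kafka consumer would raise.

```scala
object OffsetResetSketch {
  // Returns the offset a consumer would start from.
  // committed: the group's stored offset, if any
  // logStart / logEnd: smallest and largest offsets currently available in the partition
  def startOffset(committed: Option[Long], logStart: Long, logEnd: Long, policy: String): Long =
    committed match {
      // A committed offset that is still in range always wins; the policy is not consulted.
      case Some(o) if o >= logStart && o <= logEnd => o
      // No offset, or an offset that no longer exists: fall back to the policy.
      case _ =>
        policy match {
          case "earliest" => logStart
          case "latest"   => logEnd
          case "none"     => throw new IllegalStateException("no valid committed offset")
          case other      => throw new IllegalArgumentException(s"unknown policy: $other")
        }
    }

  def main(args: Array[String]): Unit = {
    println(startOffset(None, 3L, 10L, "earliest"))
    println(startOffset(None, 3L, 10L, "latest"))
    println(startOffset(Some(5L), 3L, 10L, "latest"))
  }
}
```

The policy only matters for a new consumer group or an expired offset. Once a group such as test has a valid committed offset, changing auto.offset.reset does not move its position.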