canal DirectToKafka(采集数据到kafka、json解析、写入kafka)

news/2024/11/22 7:55:54/

canal DirectToKafka
环境的配置:
在mysql的配置文件中:/etc/my.cnf

linux>/etc/my.cnf 
[mysqld]
server-id=1
log-bin=mysql-bin	//binlog
binlog_format=row	//binlog形式row 一行数据
binlog-do-db=testdb //binlog 数据库testdb

在canal中中配置:
在canal.properties中的配置:

linux>vi conf/canal.properties 
canal.serverMode = kafka
canal.instance.parser.parallel = false
canal.destinations = example
canal.mq.servers = 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092

在example中的配置instance.properties

linux>vi conf/example/instance.properties
canal.instance.mysql.slaveId=21
#mysql的地址
canal.instance.master.address=192.168.58.203:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

1.启动canal

linux>bin/startup.sh
注意环境应是java8

2.启动一个消费者

linux>bin/kafka-console-consumer.sh --topic example --from-beginning --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092

在mysql中使用insert 语句
如果有数据,说明环境搭建好了(kafka内)

编写spark streaming 程序:
1.使用direct模式对接kafka

 val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getName)val ssc = new StreamingContext(conf, Seconds(5))ssc.sparkContext.setLogLevel("error")val kafkaParams = Map[String, Object]("bootstrap.servers" -> "192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "canal-demo","auto.offset.reset" -> "latest"//      "enable.auto.commit" -> (t: java.lang.Boolean))val topics = Array("example")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))
  1. 使用 fastJson 解析,偏平化
 stream.map(_.value()).map(line=>{val jsonObject=JSON.parseObject(line)val table=jsonObject.getString("table")val types=jsonObject.getString("type")
//        val data=jsonObject.getJSONArray("data")val data = jsonObject.getJSONArray("data")
//        for( i <- 0 until jSONArray.size()){          val id=jSONArray.getJSONObject(i).getString("id")          val name=(jSONArray.getJSONObject(i).getString("name"))        }(table,types,data)}).print()
  1. transform 算子 关联 维度数据 (黑名单的程序)
 filter.transform(rdd=>{//建立数据库的链接rdd.map(line=>{})//执行查询语句,关联维度数据()})

json解析测试

package SparkTest.canal
import com.alibaba.fastjson.JSON
object JsonTest {//解析本地json 格式 日志分析//和TransformationDemo 关连def main(args: Array[String]): Unit = {// val nums = List(1, 2, 3, 4)val value = "{\"data\":[{\"id\":\"1\",\"name\":\"lisi\",\"score\":\"100\"},{\"id\":\"2\",\"name\":\"lisi\",\"score\":\"100\"},{\"id\":\"3\",\"name\":\"lisi\",\"score\":\"100\"}],\"database\":\"scott\",\"es\":1644282564000,\"id\":8,\"isDdl\":false,\"mysqlType\":{\"id\":\"int(11)\",\"name\":\"varchar(30)\",\"score\":\"int(11)\"},\"old\":[{\"name\":\"zhaoliu\"},{\"name\":\"zhaoliu\"},{\"name\":\"zhaoliu\"}],\"sql\":\"\",\"sqlType\":{\"id\":4,\"name\":12,\"score\":4},\"table\":\"student\",\"ts\":1644282564778,\"type\":\"UPDATE\"}"try {val jsonObject = JSON.parseObject(value)println(jsonObject)println(jsonObject.getString("table"))println(jsonObject.getString("type"))println(jsonObject.getJSONArray("data"))val jSONArray = jsonObject.getJSONArray("data")for (i <- 0 until jSONArray.size()) {println(jSONArray.getJSONObject(i).getString("id"))println(jSONArray.getJSONObject(i).getString("name"))}//      val students: = jSONArray.toJavaList[Student](Student.//      class)// println(students)} catch {case e: Exception => {e.printStackTrace()}}}
}

解析服务器上的 kafka日志分析,json 格式

package SparkTest.canal
import com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
//解析服务器上的 kafka日志分析,json 格式
object TransformationDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getName)val ssc = new StreamingContext(conf, Seconds(5))ssc.sparkContext.setLogLevel("error")val kafkaParams = Map[String, Object]("bootstrap.servers" -> "192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "canal-demo","auto.offset.reset" -> "latest"//      "enable.auto.commit" -> (t: java.lang.Boolean)  //自动提交)val topics = Array("example")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))/*** 从topic中取出的数是key,value格式,一般情况只处理value*/val filter = stream.map(_.value())stream.map(_.value()).map(line => {val jsonObject = JSON.parseObject(line)val table = jsonObject.getString("table")val types = jsonObject.getString("type")//        val data=jsonObject.getJSONArray("data")val data = jsonObject.getJSONArray("data")/*              for( i <- 0 until jSONArray.size()){//          val id=jSONArray.getJSONObject(i).getString("id")//          val name=(jSONArray.getJSONObject(i).getString("name"))//        }*/(table, types, data) //取值 json扁平化}).print()filter.transform(rdd=>{//建立数据库的链接rdd.map(line=>{})//执行查询语句,关联维度数据})ssc.start()ssc.awaitTermination()}
}

将数据写入kafka
1,收集到Driver端,构造一个producer 发送数据
2,在每个executor端构造一个producer 发送数据
需要在driver发送到executor 需要序列化 lazy val producer

1.定义类KafkaSink:
2.在driver端:构造一个广播变量,广播到每个executor

 val kafkaProducer: Broadcast[KafkaSink[String, String]] = {val kafkaProducerConfig = {val p = new Properties()p.setProperty("bootstrap.servers", "192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092")p.setProperty("key.serializer", classOf[StringSerializer].getName)p.setProperty("value.serializer", classOf[StringSerializer].getName)p}
//      log.warn("kafka producer init done!")ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))}3.在executor端使用:stream.foreachRDD { rdd =>if (!rdd.isEmpty) {rdd.foreach(record => {kafkaProducer.value.send("example", record.key(),record.value())// do something else})}

1.启动一个生产者,生产数据 wordcount

linux>bin/kafka-console-producer.sh  --broker-list 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092 --topic wordcount

运行SparkStreamingKafkaDemo后 随便输入数据
2.启动一个消费者,example

linux>bin/kafka-console-consumer.sh  --topic example --from-beginning --bootstrap-server 192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092

DirectToKafka

package SparkTest.canal
import java.util.Properties
//6-4 最终
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistentobject SparkStreamingKafkaDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setMaster("local[*]")conf.setAppName("submitoffset")val ssc = new StreamingContext(conf, Durations.seconds(5))//设置日志级别ssc.sparkContext.setLogLevel("Error")val kafkaProducer: Broadcast[KafkaSink[String, String]] = { //定义的广播变量val kafkaProducerConfig = {val p = new Properties()p.setProperty("bootstrap.servers", "192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092")p.setProperty("key.serializer", classOf[StringSerializer].getName)p.setProperty("value.serializer", classOf[StringSerializer].getName)p}//      log.warn("kafka producer init done!")ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))  //调用方法做广播变量(List)}val kafkaParams = Map[String, Object]("bootstrap.servers" -> "192.168.58.201:9092,192.168.58.202:9092,192.168.58.203:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "test", ///*** 当没有初始的offset,或者当前的offset不存在,如何处理数据* earliest :自动重置偏移量为最小偏移量* latest:自动重置偏移量为最大偏移量【默认】* none:没有找到以前的offset,抛出异常*/"auto.offset.reset" -> "earliest",/*** 当设置 enable.auto.commit为false时,不会自动向kafka中保存消费者offset.需要异步的处理完数据之后手动提交*/"enable.auto.commit" -> (true: java.lang.Boolean) //默认是true 每5s提交一次)//    从wordcount读出数据,写入example://      1.启动一个生产者,生产数据 wordcount//      2.启动一个消费者,exampleval topics = Array("wordcount")// 原始的stream 中包含了 offset的信息val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))// 获取 record 中 value//    val valueStream: DStream[String] = stream.map(record => {////      println("receive message key = " + record.key)//      println("receive message value = " + record.value)//      record.value//    })//    val words: DStream[String] = valueStream.flatMap(line => {//      line.split(" ")//    })//    val result: DStream[(String, Int)] = words.map((_, 1)).reduceByKey(_ + _)//    result.print()/*** 以上业务处理完成之后,异步的提交消费者offset,这里将 enable.auto.commit 设置成false,就是使用kafka 自己来管理消费者offset* 注意这里,获取 offsetRanges: Array[OffsetRange] 每一批次topic 中的offset时,必须从 源头读取过来的 stream中获取,不能从经过stream转换之后的DStream中获取。*///    stream.map(_.value()).transform(rdd=>{//      rdd//    })stream.foreachRDD { rdd =>if (!rdd.isEmpty) {rdd.foreach(record => {//    从wordcount读出数据,写入example://    1.启动一个生产者,生产数据 wordcount//    2.启动一个消费者,examplekafkaProducer.value.send("example", record.key(),record.value())// do something else})}//      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges//      // some time later, after outputs have completed//      // 业务处理完,收动提交offset//      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)}ssc.start()ssc.awaitTermination()ssc.stop()}
}

http://www.ppmy.cn/news/91223.html

相关文章

【JavaSE】Java基础语法(十四):Static

文章目录 概述特点与应用注意事项为什么一个静态方法中只能访问用static修饰的成员? 概述 Java中的static是一个修饰符&#xff08;也可称关键字&#xff09;&#xff0c;可以用于修饰变量、方法和代码块。 特点与应用 static修饰的成员具有以下特点&#xff1a; 被类的所有对…

华为nova11系列:一个月的深度体验感受,告诉你值不值得入手

作为一个追求时尚风格的年轻人&#xff0c; nova系列手机一直是我的关注重点。nova 11 Pro发布之后&#xff0c;独特少见的11号色一下子就戳中了我&#xff0c;于是第一时间我给我自己和我老婆分别下单了一台nova 11和nova 11 Pro。 作为主力机深度使用一个月后&#xff0c;可以…

MapReduce实现KNN算法分类推测鸢尾花种类

文章目录 代码地址一、KNN算法简介二、KNN算法示例&#xff1a;推测鸢尾花种类三、MapReduceHadoop实现KNN鸢尾花分类&#xff1a;1. 实现环境2.pom.xml 3.设计思路及代码1. KNN_Driver类2. MyData类3. KNN_Mapper类 4. KNN_Reducer类 代码地址 https://gitcode.net/m0_567453…

燕千云ChatGPT应用,用过的都说香

本期受访人物&#xff1a;张礼军 甄知科技联合创始人&#xff0c;CTO 首席产品官 2022年底&#xff0c;基于人工智能技术驱动的自然语言工具横空出世&#xff0c;一经推出&#xff0c;ChatGPT迅速火遍全球&#xff0c;几乎各行各业都在探索ChatGPT具体业务场景的应用&#xf…

Revit幕墙:用幕墙巧做屋面瓦及如何快速幕墙?

一、Revit中用幕墙巧做屋面瓦 屋面瓦重复性很高&#xff0c;我们如何快速的创建呢?下面我们来学会快速用幕墙来创建屋面瓦的技巧。 1.新建“公制轮廓-竖挺”族&#xff0c;以此来创建瓦的族(以便于载入项目中使用) 2.在轮廓族中绘制瓦的轮廓(轮廓需要闭合)&#xff0c;将族名称…

零基础web安全入门学习路线

相信很多新手都会遇到以下几个问题 1.零基础想学渗透怎么入手&#xff1f; 2.学习web渗透需要从哪里开始&#xff1f; 这让很多同学都处于迷茫状态而迟迟不下手&#xff0c;小编就在此贴给大家说一下web渗透的学习路线&#xff0c;希望对大家有帮助 同时本博客也会按照学习路…

【数据库】无效数据:软件测试对无效数据的处理

目录 一、无效数据的常见场景 &#xff08;1&#xff09;测试阶段 &#xff08;2&#xff09;测试方法 二、无效数据的概念 三、无效数据的影响 四、无效数据的识别 五、无效数据的处理方法 &#xff08;1&#xff09;拒绝无效数据 ① 拒绝无效数据的概念 ② 拒绝…

递归的学习

递归是一种解决计算问题的方法&#xff0c;其中解决方案取决于同一类问题的更小子集 说明&#xff1a; 1.自己调用自己&#xff0c;说过说每个函数对应着一种解决方案&#xff0c;自己调用自己意味着解决方案都是一样的 2.每次调用&#xff0c;函数处理的数据会较上次递减&a…