Hudi Programming in IDEA



1. Hudi + Spark + Kafka (Scala)

For environment setup, see 【1. Scala setup】 in the configuration appendix.

For Maven dependencies, see 【1. Hudi + Spark + Kafka dependencies】 in the dependency appendix.

1-1 Building the SparkSession object

def main(args: Array[String]): Unit = {
  // 1. Build the SparkSession object
  val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)
  // 2. Consume data from Kafka in real time
  val kafkaStreamDF: DataFrame = readFromKafka(spark, "order-topic")
  // 3. Extract the data and convert the data types
  val streamDF: DataFrame = process(kafkaStreamDF)
  // 4. Save the data into the Hudi table: MOR (Merge On Read)
  saveToHudi(streamDF)
  // 5. After the streaming application has started, wait for termination
  spark.streams.active.foreach(query => println(s"Query: ${query.name} is Running ............."))
  spark.streams.awaitAnyTermination()
}
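The helper SparkUtils.createSparkSession is not listed in this note. Below is a minimal sketch of what it might look like, assuming local-mode execution; the master, serializer, and shuffle settings are assumptions of this sketch, not the original utility.

// Hypothetical sketch of the SparkUtils helper used above (not the original implementation).
import org.apache.spark.sql.SparkSession

object SparkUtils {

  /**
   * Build a SparkSession for the given application class.
   * Local mode and the Kryo serializer are assumptions of this sketch.
   */
  def createSparkSession(clazz: Class[_]): SparkSession = {
    SparkSession.builder()
      .appName(clazz.getSimpleName.stripSuffix("$"))
      .master("local[4]")
      // Kryo serialization is commonly configured when writing to Hudi with Spark
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
  }
}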

1-2 Reading data from Kafka / CSV files

/**
 * Consume data in real time from the specified Kafka topic
 *
 * @param spark
 * @param topicName
 * @return
 */
def readFromKafka(spark: SparkSession, topicName: String): DataFrame = {
  spark.readStream
    .format("kafka") // Kafka source
    .option("kafka.bootstrap.servers", "node1.itcast.cn:9099") // Kafka broker address and port
    .option("subscribe", topicName) // Kafka topic to subscribe to
    .option("startingOffsets", "latest") // start consuming from the latest offsets
    .option("maxOffsetsPerTrigger", 100000) // process at most 100,000 records per trigger
    .option("failOnDataLoss", value = false) // whether to fail the query when data is lost
    .load()
}

/**
 * Read CSV-format text file data into a DataFrame
 */
def readCsvFile(spark: SparkSession, path: String): DataFrame = {
  spark.read
    // set the separator to \t
    .option("sep", "\\t")
    // the first line of the file contains the column names
    .option("header", "true")
    // infer the data types automatically from the values
    .option("inferSchema", "true")
    // specify the file path
    .csv(path)
}
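For local testing it can help to push a sample message into order-topic. The sketch below is a hypothetical producer, not part of the original project; the JSON field names follow what process() extracts, and the field values are made up.

// Hypothetical test producer for order-topic (for local testing only; not part of the original project).
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object OrderProducerTest {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "node1.itcast.cn:9099")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    // The key becomes order_id; the field names match what process() extracts. Values are made up.
    val orderId = "20211122103434136000001"
    val message =
      """{"userId":"u-1001","orderTime":"2021-11-22 10:34:34.136","ip":"36.23.1.10","orderMoney":100.5,"orderStatus":0}"""
    producer.send(new ProducerRecord[String, String]("order-topic", orderId, message))
    producer.flush()
    producer.close()
  }
}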

1-3 ETL transformation and writing to the Hudi table

/**
 * Transform the data consumed from Kafka: extract all field values and convert them to String
 * so that they can be saved into the Hudi table
 * @param streamDF
 * @return
 */
def process(streamDF: DataFrame): DataFrame = {
  streamDF
    // select fields
    .selectExpr(
      "CAST(key AS STRING) order_id",
      "CAST(value AS STRING) AS message",
      "topic", "partition", "offset", "timestamp"
    )
    // parse the message data and extract the field values
    .withColumn("user_id", get_json_object(col("message"), "$.userId"))
    .withColumn("order_time", get_json_object(col("message"), "$.orderTime"))
    .withColumn("ip", get_json_object(col("message"), "$.ip"))
    .withColumn("order_money", get_json_object(col("message"), "$.orderMoney"))
    .withColumn("order_status", get_json_object(col("message"), "$.orderStatus"))
    // drop the message field
    .drop(col("message"))
    // convert the order datetime to a timestamp, used as the pre-combine field of the Hudi table
    .withColumn("ts", to_timestamp(col("order_time"), "yyyy-MM-dd HH:mm:ss.SSS"))
    // extract the partition date from the order datetime: yyyy-MM-dd
    .withColumn("day", substring(col("order_time"), 0, 10))
}

/**
 * Save the streaming DataFrame into a Hudi table; the table type can be COW or MOR
 */
def saveToHudi(streamDF: DataFrame): Unit = {
  streamDF.writeStream
    .outputMode(OutputMode.Append())
    .queryName("query-hudi-streaming")
    // save the data of each micro-batch
    .foreachBatch((batchDF: Dataset[Row], batchId: Long) => {
      println(s"============== BatchId: ${batchId} start ==============")
      writeHudiMor(batchDF) // TODO: table type MOR
    })
    .option("checkpointLocation", "/datas/hudi-spark/struct-ckpt-1001")
    .start()
}

/**
 * Save the DataFrame into a Hudi table of type MOR (Merge On Read)
 */
def writeHudiMor(dataframe: DataFrame): Unit = {
  import org.apache.hudi.DataSourceWriteOptions._
  import org.apache.hudi.config.HoodieWriteConfig._
  import org.apache.hudi.keygen.constant.KeyGeneratorOptions._

  dataframe.write
    .format("hudi")
    .mode(SaveMode.Append)
    // table name
    .option(TBL_NAME.key, "tbl_hudi_order")
    // table type
    .option(TABLE_TYPE.key(), "MERGE_ON_READ")
    // record key field of each row
    .option(RECORDKEY_FIELD_NAME.key(), "order_id")
    // field used to pre-combine (merge) records, based on this timestamp
    .option(PRECOMBINE_FIELD_NAME.key(), "ts")
    // partition field name
    .option(PARTITIONPATH_FIELD_NAME.key(), "day")
    // whether the partition directory format follows the Hive partitioning style
    .option(HIVE_STYLE_PARTITIONING_ENABLE.key(), "true")
    // number of partitions used when inserts/upserts trigger a shuffle
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    // storage path of the table data
    .save("/hudi-warehouse/tbl_hudi_order")
}
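saveToHudi notes that the table type can be COW or MOR, but only the MOR writer is shown. A sketch of a COPY_ON_WRITE variant is given below; it assumes the same options as writeHudiMor, and the table name and storage path are placeholders of this sketch.

// Sketch of a COPY_ON_WRITE variant of the writer (options assumed to mirror writeHudiMor).
def writeHudiCow(dataframe: DataFrame): Unit = {
  import org.apache.hudi.DataSourceWriteOptions._
  import org.apache.hudi.config.HoodieWriteConfig._
  import org.apache.hudi.keygen.constant.KeyGeneratorOptions._

  dataframe.write
    .format("hudi")
    .mode(SaveMode.Append)
    .option(TBL_NAME.key, "tbl_hudi_order_cow")        // assumed table name
    .option(TABLE_TYPE.key(), "COPY_ON_WRITE")          // COW instead of MOR
    .option(RECORDKEY_FIELD_NAME.key(), "order_id")
    .option(PRECOMBINE_FIELD_NAME.key(), "ts")
    .option(PARTITIONPATH_FIELD_NAME.key(), "day")
    .option(HIVE_STYLE_PARTITIONING_ENABLE.key(), "true")
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .save("/hudi-warehouse/tbl_hudi_order_cow")         // assumed storage path
}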

1-4 Loading and analyzing Hudi table data with Spark SQL

/**
 * Load data from a Hudi table at the given storage path
 */
def readFromHudi(spark: SparkSession, path: String): DataFrame = {
  // a. load the data from the path into a DataFrame
  val didiDF: DataFrame = spark.read.format("hudi").load(path)
  // b. select fields
  didiDF
    .select("order_id", "product_id", "type", "traffic_type",
      "pre_total_fee", "start_dest_distance", "departure_time")
}

/**
 * Order type statistics, based on the field: product_id
 */
def reportProduct(dataframe: DataFrame): Unit = {
  val reportDF: DataFrame = dataframe.groupBy("product_id").count()

  // map product_id to the Didi product name
  val to_name = udf((product_id: Int) => {
    product_id match {
      case 1 => "滴滴专车"       // Didi Premier
      case 2 => "滴滴企业专车"   // Didi corporate Premier
      case 3 => "滴滴快车"       // Didi Express
      case 4 => "滴滴企业快车"   // Didi corporate Express
    }
  })

  val resultDF: DataFrame = reportDF.select(
    to_name(col("product_id")).as("order_type"),
    col("count").as("total")
  )
  resultDF.printSchema()
  resultDF.show(10, truncate = false)
}
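A possible driver that wires these two functions together might look like the sketch below; the Hudi table path is a placeholder, not the path used in the original project.

// Hypothetical driver for the analysis above; the table path is a placeholder.
def main(args: Array[String]): Unit = {
  val spark: SparkSession = SparkUtils.createSparkSession(this.getClass)
  // load the ride data previously written to the Hudi table
  val didiDF: DataFrame = readFromHudi(spark, "/hudi-warehouse/tbl_didi_order")
  didiDF.cache()
  // order type statistics by product_id
  reportProduct(didiDF)
  didiDF.unpersist()
  spark.stop()
}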

2. Hudi + Flink + Kafka (Java)

For Maven dependencies, see 【2. Hudi + Flink + Kafka dependencies】 in the dependency appendix.

2-1 Consuming data from Kafka

Step 1, obtaining the table execution environment, needs no further explanation.

Step 2 creates the input table: it specifies the Kafka broker address and port, the topic, and the other connector options; the data is read from there.

Step 3 transforms the data into the format required by the Hudi table (adding the two required fields: the pre-combine field ts and the partition field partition_day).

package cn.itcast.hudi;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkSQLKafkaDemo {

    public static void main(String[] args) {
        // 1. Get the table execution environment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode() // streaming mode
                .build();
        TableEnvironment tableEnvironment = TableEnvironment.create(settings);

        // 2. Create the input table: consume data from Kafka
        tableEnvironment.executeSql(
                "CREATE TABLE order_kafka_source (\n" +
                "  orderId STRING,\n" +
                "  userId STRING,\n" +
                "  orderTime STRING,\n" +
                "  ip STRING,\n" +
                "  orderMoney DOUBLE,\n" +
                "  orderStatus INT\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'order-topic',\n" +
                "  'properties.bootstrap.servers' = 'node1.itcast.cn:9099',\n" +
                "  'properties.group.id' = 'gid-1001',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json',\n" +
                "  'json.fail-on-missing-field' = 'false',\n" +
                "  'json.ignore-parse-errors' = 'true'\n" +
                ")"
        );

        // 3. Transform the data: either SQL or the Table API can be used
        Table table = tableEnvironment
                .from("order_kafka_source")
                // add field: the Hudi pre-combine field, "orderId":"20211122103434136000001" -> 20211122103434136
                .addColumns($("orderId").substring(0, 17).as("ts"))
                // add field: the Hudi partition field, "orderTime":"2021-11-22 10:34:34.136" -> 2021-11-22
                .addColumns($("orderTime").substring(0, 10).as("partition_day"));
        tableEnvironment.createTemporaryView("view_order", table);

        // 4. Output the result data (printed to the console here)
        tableEnvironment.executeSql("select * from view_order").print();
    }
}

2-2 Writing data into the Hudi table

Step 4 creates the output table: it specifies the output Hudi table path (a local path, HDFS, etc.), the table type, the record key, the pre-combine field, the partition field, and so on; the data is written there.

Step 5 inserts the data into the output Hudi table.

package cn.itcast.hudi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Based on the Flink SQL connector: consume data from the Kafka topic in real time,
 * transform it, and write it to the Hudi table in real time.
 */
public class FlinkSQLHudiDemo {

    public static void main(String[] args) {
        // 1. Get the table execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Flink checkpointing must be enabled because data is written to the Hudi table incrementally
        env.enableCheckpointing(5000);
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode() // streaming mode
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        // 2. Create the input table: consume data from Kafka
        tableEnvironment.executeSql(
                "CREATE TABLE order_kafka_source (\n" +
                "  orderId STRING,\n" +
                "  userId STRING,\n" +
                "  orderTime STRING,\n" +
                "  ip STRING,\n" +
                "  orderMoney DOUBLE,\n" +
                "  orderStatus INT\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'order-topic',\n" +
                "  'properties.bootstrap.servers' = 'node1.itcast.cn:9099',\n" +
                "  'properties.group.id' = 'gid-1001',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json',\n" +
                "  'json.fail-on-missing-field' = 'false',\n" +
                "  'json.ignore-parse-errors' = 'true'\n" +
                ")"
        );

        // 3. Transform the data: either SQL or the Table API can be used
        Table table = tableEnvironment
                .from("order_kafka_source")
                // add field: the Hudi pre-combine field, "orderId":"20211122103434136000001" -> 20211122103434136
                .addColumns($("orderId").substring(0, 17).as("ts"))
                // add field: the Hudi partition field, "orderTime":"2021-11-22 10:34:34.136" -> 2021-11-22
                .addColumns($("orderTime").substring(0, 10).as("partition_day"));
        tableEnvironment.createTemporaryView("view_order", table);

        // 4. Create the output table: write the data into the Hudi table
        tableEnvironment.executeSql(
                "CREATE TABLE order_hudi_sink (\n" +
                "  orderId STRING PRIMARY KEY NOT ENFORCED,\n" +
                "  userId STRING,\n" +
                "  orderTime STRING,\n" +
                "  ip STRING,\n" +
                "  orderMoney DOUBLE,\n" +
                "  orderStatus INT,\n" +
                "  ts STRING,\n" +
                "  partition_day STRING\n" +
                ")\n" +
                "PARTITIONED BY (partition_day) \n" +
                "WITH (\n" +
                "  'connector' = 'hudi',\n" +
                "  'path' = 'file:///D:/flink_hudi_order',\n" +
                "  'table.type' = 'MERGE_ON_READ',\n" +
                "  'write.operation' = 'upsert',\n" +
                "  'hoodie.datasource.write.recordkey.field' = 'orderId',\n" +
                "  'write.precombine.field' = 'ts',\n" +
                "  'write.tasks'= '1'\n" +
                ")"
        );

        // 5. Insert the data into the output table via a subquery (note: the field order must match)
        tableEnvironment.executeSql(
                "INSERT INTO order_hudi_sink\n" +
                "SELECT\n" +
                "  orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day\n" +
                "FROM view_order"
        );
    }
}

2-3 Loading data from the Hudi table

Simply create an input table that loads the Hudi table, then query the data.

package cn.itcast.hudi;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/**
 * Based on the Flink SQL connector: load data from the Hudi table and query it with SQL.
 */
public class FlinkSQLReadDemo {

    public static void main(String[] args) {
        // 1. Get the table execution environment
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnvironment = TableEnvironment.create(settings);

        // 2. Create the input table: load and query the Hudi table data
        tableEnvironment.executeSql(
                "CREATE TABLE order_hudi(\n" +
                "  orderId STRING PRIMARY KEY NOT ENFORCED,\n" +
                "  userId STRING,\n" +
                "  orderTime STRING,\n" +
                "  ip STRING,\n" +
                "  orderMoney DOUBLE,\n" +
                "  orderStatus INT,\n" +
                "  ts STRING,\n" +
                "  partition_day STRING\n" +
                ")\n" +
                "PARTITIONED BY (partition_day)\n" +
                "WITH (\n" +
                "  'connector' = 'hudi',\n" +
                "  'path' = 'file:///D:/flink_hudi_order',\n" +
                "  'table.type' = 'MERGE_ON_READ',\n" +
                "  'read.streaming.enabled' = 'true',\n" +
                "  'read.streaming.check-interval' = '4'\n" +
                ")"
        );

        // 3. Run the query and stream-read the Hudi data
        tableEnvironment.executeSql(
                "SELECT orderId, userId, orderTime, ip, orderMoney, orderStatus, ts, partition_day FROM order_hudi"
        ).print();
    }
}

Appendix: Dependencies

1. Hudi + Spark + Kafka dependencies

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>http://repository.jboss.com/nexus/content/groups/public</url>
    </repository>
</repositories>

<properties>
    <scala.version>2.12.10</scala.version>
    <scala.binary.version>2.12</scala.binary.version>
    <spark.version>3.0.0</spark.version>
    <hadoop.version>2.7.3</hadoop.version>
    <hudi.version>0.9.0</hudi.version>
</properties>

<dependencies>
    <!-- Scala language -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark Core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Structured Streaming + Kafka -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Hadoop Client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- hudi-spark3 -->
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-spark3-bundle_2.12</artifactId>
        <version>${hudi.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark SQL integration with Hive -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.4.13</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.12</version>
    </dependency>
</dependencies>

<build>
    <outputDirectory>target/classes</outputDirectory>
    <testOutputDirectory>target/test-classes</testOutputDirectory>
    <resources>
        <resource>
            <directory>${project.basedir}/src/main/resources</directory>
        </resource>
    </resources>
    <!-- Maven compile plugins -->
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

2. Hudi + Flink + Kafka dependencies

<repositories>
    <repository>
        <id>nexus-aliyun</id>
        <name>Nexus aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </repository>
    <repository>
        <id>central_maven</id>
        <name>central maven</name>
        <url>https://repo1.maven.org/maven2</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <flink.version>1.12.2</flink.version>
    <hadoop.version>2.7.3</hadoop.version>
    <mysql.version>8.0.16</mysql.version>
</properties>

<dependencies>
    <!-- Flink Client -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Table API & SQL -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
        <version>0.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop-2-uber</artifactId>
        <version>2.7.5-10.0</version>
    </dependency>
    <!-- MySQL / FastJson / lombok -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
    </dependency>
    <!-- slf4j and log4j -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/test/java</testSourceDirectory>
    <plugins>
        <!-- compile plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <!--<encoding>${project.build.sourceEncoding}</encoding>-->
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
        <!-- jar packaging plugin (bundles all dependencies) -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- <mainClass>com.itcast.flink.batch.FlinkBatchWordCount</mainClass> -->
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Appendix: Errors

1. Runtime error

【Error message】

Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

【Cause】

When running on Windows, the Windows support binaries for Hadoop (winutils, e.g. the hadoop-2.7 common bin package) must be installed.

Download: https://gitcode.net/mirrors/cdarlint/winutils?utm_source=csdn_github_accelerator

Download the package matching your Hadoop version, set the HADOOP_HOME environment variable and add it to Path, then restart IDEA and run again; the error no longer occurs. (A programmatic alternative is sketched after the commands below.)

cd hudi/server/hadoop

./bin/hadoop checknative
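Alternatively, hadoop.home.dir can be set in code before the SparkSession is created; a minimal sketch, assuming winutils.exe was unpacked under a local Hadoop directory (the path is a placeholder):

// Workaround sketch: point hadoop.home.dir at the directory that contains bin\winutils.exe.
// The path below is a placeholder; set it to wherever the winutils package was unpacked.
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3")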

2. Runtime error

【Error】

NoSuchFieldError: INSTANCE

【Cause】

The httpclient and httpcore versions used by the code are newer, while the versions bundled with Hadoop are too old (< 4.3).

【Fix】

Replace the httpclient and httpcore jars under $HADOOP_HOME/share/hadoop/common/lib and $HADOOP_HOME/share/hadoop/tools/lib with newer versions (> 4.3).

cd /home/zhangheng/hudi/server/hadoop/share/hadoop/common/lib
rm httpclient-4.2.5.jar
rm httpcore-4.2.5.jar
cd /home/zhangheng/hudi/server/hadoop/share/hadoop/tools/lib
rm httpclient-4.2.5.jar
rm httpcore-4.2.5.jar
scp -r D:\Users\zh\Desktop\Hudi\compressedPackage\httpclient-4.4.jar zhangheng@10.8.4.212:/home/zhangheng/hudi/server/hadoop/share/hadoop/common/lib
scp -r D:\Users\zh\Desktop\Hudi\compressedPackage\httpcore-4.4.jar zhangheng@10.8.4.212:/home/zhangheng/hudi/server/hadoop/share/hadoop/common/lib
scp -r D:\Users\zh\Desktop\Hudi\compressedPackage\httpclient-4.4.jar zhangheng@10.8.4.212:/home/zhangheng/hudi/server/hadoop/share/hadoop/tools/lib
scp -r D:\Users\zh\Desktop\Hudi\compressedPackage\httpcore-4.4.jar zhangheng@10.8.4.212:/home/zhangheng/hudi/server/hadoop/share/hadoop/tools/lib

3. Runtime warning

【Warning】

WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped

【Cause】

The Spark version was too new: Spark v3.0.0 was chosen initially, but it did not work well; switching to v2.4.6 resolved the warning.

【Fix】

Official download: https://archive.apache.org/dist/spark/spark-2.4.6/
Download spark-2.4.6-bin-hadoop2.7.tgz, install it, and configure the environment variables.

Appendix: Configuration

1. Scala setup

1. Install Scala on Windows: https://www.scala-lang.org/
After installation, configure the SCALA_HOME environment variable and add it to Path.
Run scala -version to check that the installation succeeded.
2. Install the Scala plugin in IDEA: search for "scala" under Plugins and install it directly.
After restarting, go to File -> Project Structure, find Global Libraries in the lower left, click the + button in the middle, choose the last entry, Scala SDK, select the installed Scala version, and click OK.

2. Remote host (VM) setup in IDEA

Tools -> Deployment -> Browse Remote Host
Configure the SSH configuration, Root path, and Web server URL for your own virtual machine.

(Screenshots of the Deployment configuration dialogs omitted; the original external images failed to load.)

