文章目录
- day11_实时类标签和ElasticSearch高级
- 一、实时代码重构(掌握)
- 1、重构思路
- 2、基类
- 2.1 ETL基类
- 2.2 指标统计基类
- 2.3 标签计算基类
- 2.4 继续往上抽取爷爷类(扩展)
- 3、重构后的代码
- 3.1 Nginx日志
- 3.2 用户行为日志
- 3.3 转换率标签
- 二、业务库数据统计(了解)
- 1、数据实时采集(掌握)
- 1.1 采集方案
- 1.2 MySQLCDC介绍
- 1.3 SeaTunnel配置文件
- 2、窗口和Watermark(了解)
- 3、业务数据介绍(了解)
- 4、指标统计(了解)
- 三、近期消费活跃度实时标签开发
- 1、需求分析
- 2、标签开发
- 四、ElasticSearch高级(掌握)
- 1、ElasticSearch整体架构
- 2、倒排索引(ms)
- 3、ElasticSearch写入数据的流程
- 4、ElasticSearch读取数据的流程
day11_实时类标签和ElasticSearch高级
一、实时代码重构(掌握)
1、重构思路
ETL的过程可以分成4个步骤:
(1)创建SparkSession(基本相同,app_name名称不同)
(2)读取kafka源表(基本相同,消费的数据源Topic不同)
(3)处理日志(完全不同)
(4)写出到kafka(基本相同,写出的Topic不同)
(5)写出到hdfs(基本相同,写出的HDFS路径不同)
指标计算的过程可以分成4个步骤:
(1)创建SparkSession(基本相同,app_name名称不同)
(2)读取kafka源表(基本相同,消费的数据源Topic不同)
(3)指标计算(完全不同)
(4)写出到doris(基本相同,写出的表名称不同)
基类抽取的思路,不一样的部分交给子类实现, 基类中只是声明一下,一样的部分在基类中实现,子类只要继承。
具体方法:
(1)可以把一些通用的方法定义成静态/类/实例方法,这样可以方便其他代码的调用
(2)计算逻辑偶尔发生变化的代码要抽取成实例方法,以参数传递的形式去修改不同的地方,方便后续进行复写
(3)计算逻辑完全不同,需要子类必须进行实现的方法可以定义成抽象方法
2、基类
2.1 ETL基类
from pyspark.sql import SparkSession, DataFrame
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
import abcos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'class AbstractStreamingETL(metaclass=abc.ABCMeta):# 创建SparkSession对象def create_spark(self,app_name,partitions=2):spark = SparkSession.builder \.appName(app_name) \.master("local[*]") \.config("spark.sql.shuffle.partitions", partitions) \.getOrCreate()# 配置checkpointLocation路径,推荐使用HDFS路径spark.conf.set("spark.sql.streaming.checkpointLocation", "hdfs://192.168.88.166:8020/xtzg/chk")return spark# 数据输入:读取Kafka中的数据def read_kafka_data(self,spark,read_topic):init_df = spark.readStream.format("kafka") \.option("kafka.bootstrap.servers", "192.168.88.166:9092") \.option("subscribe", read_topic) \.option("startingOffsets", "earliest") \.load()return init_df# value字段解码的操作def cast_get_value(self,init_df):return init_df.select(init_df.value.cast(StringType()).alias("value"))@abc.abstractmethoddef etl(self,pre_etl_df:DataFrame):"""ETL核心方法:param pre_etl_df: ETL之前的原始数据:return:"""passdef write_2_hdfs(self,etl_df, partition_field, hdfs_path):"""输出到HDFS:param etl_df: ETL之后的数据:param partition_field: 获取分区字段的数据字段名称:param hdfs_path: 输出到HDFS的什么地方,只需要传相对路径即可:return:"""# 新增一个分区字段dt_df = etl_df.withColumn("dt", F.split(partition_field, " ")[0])# partitionBy表示按照哪个字段进行分区dt_df.writeStream.format("orc").partitionBy("dt") \.option("path", "hdfs://192.168.88.166:8020/xtzg/etl/"+hdfs_path) \.start()def write_2_kafka(self,etl_df:DataFrame,write_topic):# 从DataFrame中获取所有的字段信息fields = etl_df.columnskafka_df = etl_df.select(F.to_json(F.struct(*fields)).alias("value"))kafka_df.writeStream.format("kafka") \.option("kafka.bootstrap.servers", "192.168.88.166:9092") \.option("topic", write_topic) \.start().awaitTermination()def execute(self, app_name, partitions, read_topic, partition_field, hdfs_path, write_topic):# 1- 创建SparkSession对象spark = self.create_spark(app_name,partitions)# 2- 数据输入:读取Kafka中的数据init_df = self.read_kafka_data(spark,read_topic)# 3- 数据ETL处理# 3.1- value字段解码的操作type_cast_df = self.cast_get_value(init_df)# 3.2- 通过正则表达式提取Nginx的字段etl_df = self.etl(type_cast_df)# 4- 数据输出,启动流式任务# 注意:ETL中的结果数据是要同时输出到HDFS和Kafka一份,因此可以直接将awaitTermination()加载最后一个start()的后面# 4.1- 输出到HDFSself.write_2_hdfs(etl_df,partition_field, hdfs_path)# 4.2- 输出到Kafkaself.write_2_kafka(etl_df,write_topic)
2.2 指标统计基类
from pyspark.sql import SparkSession, DataFrame
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
import abcos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'class AbstractStreamingAnalysis(metaclass=abc.ABCMeta):# 创建SparkSession对象def create_spark(self,app_name,partitions=2):spark = SparkSession.builder \.appName(app_name) \.master("local[*]") \.config("spark.sql.shuffle.partitions", partitions) \.getOrCreate()# 配置checkpointLocation路径,推荐使用HDFS路径spark.conf.set("spark.sql.streaming.checkpointLocation", "hdfs://192.168.88.166:8020/xtzg/chk")return spark# 数据输入:读取Kafka中的数据def read_kafka_data(self, spark, read_topic):init_df = spark.readStream.format("kafka") \.option("kafka.bootstrap.servers", "192.168.88.166:9092") \.option("subscribe", read_topic) \.option("startingOffsets", "earliest") \.load()return init_dfdef parse_value(self, init_df, fields):# 3.1- value字段类型转换type_cast_df = init_df.select(init_df.value.cast(StringType()).alias("value"))# 3.2- 从JSON中提取一个个字段parse_json_df = type_cast_df.select(F.json_tuple("value", *fields).alias(*fields))return parse_json_df@abc.abstractmethoddef analysis(self, parse_json_df: DataFrame):"""指标统计核心部分:param parse_json_df: 从Kafka中读取进来的数据:return: 指标计算完成以后的数据"""passdef write_doris(self, result_df, doris_table):# 输出到Dorisdef write_2_doris(batch_df: DataFrame, batch_id):batch_df.write.jdbc(url="jdbc:mysql://192.168.88.166:9030/log_analysis_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",table=doris_table,mode="append",properties={'user': 'root', 'password': '123456'})doris_writer = result_df.writeStream.foreachBatch(write_2_doris).outputMode("update").start()return doris_writerdef write_kafka(self, result_df:DataFrame, write_topic):# 输出到Kafka中# 将DataFrame中每条数据转成JSON格式fields = result_df.columnskafka_df = result_df.select(F.to_json(F.struct(*fields)).alias("value"))# 输出kafka_writer = kafka_df.writeStream.format("kafka").outputMode("update") \.option("kafka.bootstrap.servers", "192.168.88.166:9092") \.option("topic", write_topic) \.start()return kafka_writerdef execute(self,app_name,partitions,read_topic,fields,doris_table=None,write_topic=None):# 1- 创建SparkSession对象spark = self.create_spark(app_name,partitions)# 2- 数据输入:读取Kafka中的数据init_df = self.read_kafka_data(spark,read_topic)# 3- 数据处理parse_json_df = self.parse_value(init_df,fields)# 3.3- 指标统计result_df = self.analysis(parse_json_df)# 4- 数据输出"""| 输出到Kafka | 输出到Doris || 输出 | 输出 || 输出 | 不输出 || 不输出 | 输出 || 不输出 | 不输出 | """# 输出到Dorisdoris_writer = Noneif doris_table is not None:doris_writer = self.write_doris(result_df, doris_table)# 输出到Kafkakafka_writer = Noneif write_topic is not None:kafka_writer = self.write_kafka(result_df, write_topic)if doris_writer is not None:doris_writer.awaitTermination()elif kafka_writer is not None:kafka_writer.awaitTermination()# 既不输出到Kafka,也不输出到Dorisif write_topic is None and doris_table is None:result_df.writeStream.format("console").outputMode("update").start().awaitTermination()
2.3 标签计算基类
from pyspark.sql import SparkSession, DataFrame
import os
from tags.utils.rule_parse_util import RuleParse
import abcos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'# metaclass=abc.ABCMeta这种写法不是继承的意思,而是表示该类是一个抽象类
class AbstractStreamingTag(metaclass=abc.ABCMeta):# 1- 创建SparkSession对象def create_spark(self, app_name, partitions=2):spark = SparkSession.builder \.appName(app_name) \.master("local[*]") \.config("spark.sql.shuffle.partitions", partitions) \.getOrCreate()# 配置checkpointLocation路径,推荐使用HDFS路径spark.conf.set("spark.sql.streaming.checkpointLocation", "hdfs://192.168.88.166:8020/xtzg/chk")return spark# 3- 读取标签配置表数据def read_all_tag(self, spark):all_tag_df = spark.read.jdbc(url="jdbc:mysql://192.168.88.166:3306/tags_info?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",table="tbl_basic_tag",properties={'user': 'root', 'password': '123456'})return all_tag_df# 4- 读取对应的性别四级标签配置中的rule规则内容# 5- 解析rule规则,得到实例对象def read_and_parse_rule(self, all_tag_df, four_tag_id):rule_str = all_tag_df.where(f"id={four_tag_id}").first().rulerule_obj = RuleParse.parse_rule(rule_str)return rule_obj# 6- 根据解析好的rule规则,读取对应的业务数据def read_kafka_data(self, rule_obj, spark):business_df = spark.readStream.format("kafka") \.option("kafka.bootstrap.servers", rule_obj.nodes) \.option("subscribe", rule_obj.table) \.option("startingOffsets", rule_obj.range) \.load()return business_df# 7- 根据四级标签ID,得到五级标签配置数据def read_five_tag(self, all_tag_df, four_tag_id):five_tag_df = all_tag_df.where(f"pid={four_tag_id}").select("id", "rule")return five_tag_df# 8- 打标签的抽象方法,由子类继承以后进行具体实现"""@abc.abstractmethod标记一个方法为抽象方法。能够实现强制要求子类进行方法的具体实现"""@abc.abstractmethoddef mark_tag(self,business_df:DataFrame, five_tag_df:DataFrame, rule_obj):pass# 9- 将结果数据输出到ElasticSearch中def write_2_es(self, result_df):result_df.writeStream.format("es").outputMode("update") \.option("es.nodes", "192.168.88.166:9200") \.option("es.resource", "user_profile_tags") \.option("es.mapping.id", "user_id") \.option("es.write.operation", "upsert") \.start().awaitTermination()def execute(self,app_name,partitions,four_tag_id):# 1- 创建SparkSession对象spark = self.create_spark(app_name, partitions)# 3- 读取标签配置表数据all_tag_df = self.read_all_tag(spark)# 4- 读取对应的性别四级标签配置中的rule规则内容# 5- 解析rule规则,得到实例对象rule_obj = self.read_and_parse_rule(all_tag_df, four_tag_id)# 6- 根据解析好的rule规则,读取对应的业务数据business_df = self.read_kafka_data(rule_obj, spark)# 7- 根据四级标签ID,得到五级标签配置数据five_tag_df = self.read_five_tag(all_tag_df, four_tag_id)# 8- 将业务数据与五级标签配置数据进行关联,给用户打上五级标签result_df = self.mark_tag(business_df,five_tag_df,rule_obj)# 9- 将结果数据输出到ElasticSearch中self.write_2_es(result_df)# 10- 释放资源spark.stop()
可能遇到的错误:
原因: 结构化流中目前不支持无界的DataFrame与有界的DataFrame进行full join
解决: 目前暂时无法解决
https://archive.apache.org/dist/spark/docs/3.1.2/structured-streaming-programming-guide.html#support-matrix-for-joins-in-streaming-queries
2.4 继续往上抽取爷爷类(扩展)
-
代码所在位置
-
爷爷类
from pyspark.sql import SparkSession, DataFrame
import os
import pyspark.sql.functions as F
import abcos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'class AbstractStreamingCommonBase(metaclass=abc.ABCMeta):# 创建SparkSession对象def create_spark(self, app_name, partitions=2):spark = SparkSession.builder \.appName(app_name) \.master("local[*]") \.config("spark.sql.shuffle.partitions", partitions) \.getOrCreate()# 配置checkpointLocation路径,推荐使用HDFS路径spark.conf.set("spark.sql.streaming.checkpointLocation", "hdfs://192.168.88.166:8020/xtzg/chk")return spark# 数据输入:读取Kafka中的数据def read_kafka_data(self, spark, read_topic):init_df = spark.readStream.format("kafka") \.option("kafka.bootstrap.servers", "192.168.88.166:9092") \.option("subscribe", read_topic) \.option("startingOffsets", "earliest") \.load()return init_dfdef write_kafka(self, result_df:DataFrame, write_topic):# 输出到Kafka中# 将DataFrame中每条数据转成JSON格式fields = result_df.columnskafka_df = result_df.select(F.to_json(F.struct(*fields)).alias("value"))# 输出kafka_writer = kafka_df.writeStream.format("kafka").outputMode("update") \.option("kafka.bootstrap.servers", "192.168.88.166:9092") \.option("topic", write_topic) \.start()return kafka_writerdef write_hdfs(self,etl_df, partition_field, hdfs_path):"""输出到HDFS:param etl_df: ETL之后的数据:param partition_field: 获取分区字段的数据字段名称:param hdfs_path: 输出到HDFS的什么地方,只需要传相对路径即可:return:"""# 新增一个分区字段dt_df = etl_df.withColumn("dt", F.split(partition_field, " ")[0])# partitionBy表示按照哪个字段进行分区hdfs_writer = dt_df.writeStream.format("orc").partitionBy("dt") \.option("path", "hdfs://192.168.88.166:8020/xtzg/etl/"+hdfs_path) \.start()return hdfs_writerdef write_doris(self, result_df, doris_table):# 输出到Dorisdef write_2_doris(batch_df: DataFrame, batch_id):batch_df.write.jdbc(url="jdbc:mysql://192.168.88.166:9030/log_analysis_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",table=doris_table,mode="append",properties={'user': 'root', 'password': '123456'})doris_writer = result_df.writeStream.foreachBatch(write_2_doris).outputMode("update").start()return doris_writer@abc.abstractmethoddef execute(self, app_name, partitions, read_topic, fields=None,partition_field=None, hdfs_path=None, doris_table=None, write_topic=None):pass
- 爸爸类_1
from pyspark.sql import DataFrame
import os
from pyspark.sql.types import StringType
import abc
from abstract_streaming_common_base import AbstractStreamingCommonBaseos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'class AbstractStreamingETL(AbstractStreamingCommonBase,metaclass=abc.ABCMeta):# value字段解码的操作def cast_get_value(self,init_df):return init_df.select(init_df.value.cast(StringType()).alias("value"))@abc.abstractmethoddef etl(self,pre_etl_df:DataFrame):"""ETL核心方法:param pre_etl_df: ETL之前的原始数据:return:"""passdef execute(self, app_name, partitions, read_topic, fields=None,partition_field=None, hdfs_path=None, doris_table=None, write_topic=None):# 1- 创建SparkSession对象spark = self.create_spark(app_name,partitions)# 2- 数据输入:读取Kafka中的数据init_df = self.read_kafka_data(spark,read_topic)# 3- 数据ETL处理# 3.1- value字段解码的操作type_cast_df = self.cast_get_value(init_df)# 3.2- 通过正则表达式提取Nginx的字段etl_df = self.etl(type_cast_df)# 4- 数据输出,启动流式任务# 注意:ETL中的结果数据是要同时输出到HDFS和Kafka一份,因此可以直接将awaitTermination()加载最后一个start()的后面# 4.1- 输出到HDFSself.write_hdfs(etl_df,partition_field, hdfs_path)# 4.2- 输出到Kafkaself.write_kafka(etl_df,write_topic).awaitTermination()
- 爸爸类_2
from pyspark.sql import DataFrame
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
import abc
from abstract_streaming_common_base import AbstractStreamingCommonBaseos.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'class AbstractStreamingAnalysis(AbstractStreamingCommonBase,metaclass=abc.ABCMeta):def parse_value(self, init_df, fields):# 3.1- value字段类型转换type_cast_df = init_df.select(init_df.value.cast(StringType()).alias("value"))# 如果不需要对JSON进行解析,那么直接返回value数据if fields is None:return type_cast_df# 3.2- 从JSON中提取一个个字段parse_json_df = type_cast_df.select(F.json_tuple("value", *fields).alias(*fields))return parse_json_df@abc.abstractmethoddef analysis(self, parse_json_df: DataFrame):"""指标统计核心部分:param parse_json_df: 从Kafka中读取进来的数据:return: 指标计算完成以后的数据"""passdef execute(self, app_name, partitions, read_topic, fields=None,partition_field=None, hdfs_path=None, doris_table=None, write_topic=None):# 1- 创建SparkSession对象spark = self.create_spark(app_name,partitions)# 2- 数据输入:读取Kafka中的数据init_df = self.read_kafka_data(spark,read_topic)# 3- 数据处理parse_json_df = self.parse_value(init_df,fields)# 3.3- 指标统计result_df = self.analysis(parse_json_df)# 4- 数据输出"""| 输出到Kafka | 输出到Doris || 输出 | 输出 || 输出 | 不输出 || 不输出 | 输出 || 不输出 | 不输出 | """# 输出到Dorisdoris_writer = Noneif doris_table is not None:doris_writer = self.write_doris(result_df, doris_table)# 输出到Kafkakafka_writer = Noneif write_topic is not None:kafka_writer = self.write_kafka(result_df, write_topic)# 不管代码中有多少个start(),但是一定是要在最后面来调用awaitTermination()触发结构化流程序的运行if doris_writer is not None:doris_writer.awaitTermination()elif kafka_writer is not None:kafka_writer.awaitTermination()# 既不输出到Kafka,也不输出到Dorisif write_topic is None and doris_table is None:result_df.writeStream.format("console").outputMode("update").start().awaitTermination()
3、重构后的代码
3.1 Nginx日志
- ETL
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType, MapType
import requests
from user_agents import parse
from tags.base.abstract_streaming_etl import AbstractStreamingETLclass NginxETL(AbstractStreamingETL):def etl(self,pre_etl_df:DataFrame):# 通过正则表达式提取Nginx的字段pattern = '(?<ip>\d+\.\d+\.\d+\.\d+) (- - \[)(?<datetime>[\s\S]+)(?<t1>\][\s"]+)(?<request>[A-Z]+) (?<url>[\S]*) (?<protocol>[\S]+)["] (?<code>\d+) (?<sendbytes>\d+) ["](?<refferer>[\S]*) ["](?<useragent>[\S\s]+)["] ["](?<proxyaddr>[\S\s]+)["]'regexp_df = pre_etl_df.select(F.regexp_extract("value", pattern, 1).alias("ip"),F.regexp_extract("value", pattern, 3).alias("datetime"),F.regexp_extract("value", pattern, 4).alias("t1"),F.regexp_extract("value", pattern, 5).alias("request"),F.regexp_extract("value", pattern, 6).alias("url"),F.regexp_extract("value", pattern, 7).alias("protocol"),F.regexp_extract("value", pattern, 8).alias("code"),F.regexp_extract("value", pattern, 9).alias("sendbytes"),F.regexp_extract("value", pattern, 10).alias("refferer"),F.regexp_extract("value", pattern, 11).alias("useragent"),F.regexp_extract("value", pattern, 12).alias("proxyaddr"))# 日期时间格式转换datetime_df = regexp_df.withColumn("datetime",F.from_unixtime(F.unix_timestamp("datetime", "dd/MMM/yyyy:HH:mm:ss Z"),"yyyy-MM-dd HH:mm:ss"))# IP地理位置解析@F.udf(returnType=StringType())def parse_ip(ip_str):params = {"query": ip_str,"co": "","resource_id": "6006","oe": "utf8",}# 发送请求response = requests.get(url="https://opendata.baidu.com/api.php", params=params)# 解析响应内容result = response.json()status = result['status']if status == '0':# 正常try:return result['data'][0]['location'].split(" ")[0]except:return "未知区域"else:return "未知区域"area_df = datetime_df.withColumn("area", parse_ip("ip"))# UA解析@F.udf(returnType=MapType(keyType=StringType(), valueType=StringType()))def parse_ua(ua_str):result = parse(ua_str)os = result.os.familybrowser = result.browser.familydevice = result.device.modelreturn {"os": os, "browser": browser, "device": device}ua_df = area_df.withColumn("os", parse_ua("useragent")['os']) \.withColumn("browser", parse_ua("useragent")['browser']) \.withColumn("device", parse_ua("useragent")['device'])return ua_dfif __name__ == '__main__':etl_obj = NginxETL()etl_obj.execute(app_name="nginx_etl",partitions=2,read_topic="xtzg_nginx_log",partition_field="datetime",hdfs_path="dwd_nginx_etl_result",write_topic="dwd_nginx_etl_result")
- 指标统计
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from tags.base.abstract_streaming_analysis import AbstractStreamingAnalysisclass NginxAnalysis(AbstractStreamingAnalysis):def analysis(self, parse_json_df: DataFrame):result_df = parse_json_df.groupBy("ip").agg(F.count("ip").alias("pv"),F.lit(1).alias("uv"),F.first("area").alias("area"),F.first("code").alias("status_code"),F.first("os").alias("device_os"),F.first("device").alias("device_brand"),F.first("browser").alias("browser_name"),F.min("datetime").alias("first_access_time"),F.max("datetime").alias("last_access_time"))return result_dfif __name__ == '__main__':analysis_obj = NginxAnalysis()analysis_obj.execute(app_name="nginx_analysis",partitions=2,read_topic="dwd_nginx_etl_result",fields=["ip","datetime","code","area","os","browser","device"],doris_table="nginx_log_result")
3.2 用户行为日志
- ETL
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from tags.base.abstract_streaming_etl import AbstractStreamingETLclass UserEventETL(AbstractStreamingETL):def etl(self,pre_etl_df:DataFrame):# 解析JSON得到一个个字段parse_json_df = pre_etl_df.select(F.get_json_object("value", "$.phone_num").alias("phone_num"),F.get_json_object("value", "$.system_id").alias("system_id"),F.get_json_object("value", "$.area.province").alias("province"),F.get_json_object("value", "$.area.city").alias("city"),F.get_json_object("value", "$.area.sp").alias("sp"),F.get_json_object("value", "$.user_name").alias("user_name"),F.get_json_object("value", "$.user_id").alias("user_id"),F.get_json_object("value", "$.visit_time").alias("visit_time"),F.get_json_object("value", "$.goods_type").alias("goods_type"),F.get_json_object("value", "$.minimum_price").alias("minimum_price"),F.get_json_object("value", "$.user_behavior.is_browse").alias("is_browse"),F.get_json_object("value", "$.user_behavior.is_order").alias("is_order"),F.get_json_object("value", "$.user_behavior.is_buy").alias("is_buy"),F.get_json_object("value", "$.user_behavior.is_back_order").alias("is_back_order"),F.get_json_object("value", "$.user_behavior.is_cart").alias("is_cart"),F.get_json_object("value", "$.goods_detail.goods_name").alias("goods_name"),F.get_json_object("value", "$.goods_detail.browse_page").alias("browse_page"),F.get_json_object("value", "$.goods_detail.browse_time").alias("browse_time"),F.get_json_object("value", "$.goods_detail.to_page").alias("to_page"),F.get_json_object("value", "$.goods_detail.to_time").alias("to_time"),F.get_json_object("value", "$.goods_detail.page_keywords").alias("page_keywords"))return parse_json_dfif __name__ == '__main__':etl_obj = UserEventETL()etl_obj.execute(app_name="userevent_etl",partitions=2,read_topic="xtzg_user_event",partition_field="visit_time",hdfs_path="dwd_user_event_etl_result",write_topic="dwd_user_event_etl_result")
- 指标统计
from pyspark.sql import SparkSession, DataFrame
import os
import pyspark.sql.functions as F
from tags.base.abstract_streaming_analysis import AbstractStreamingAnalysisclass UserEventAnalysis(AbstractStreamingAnalysis):def analysis(self, parse_json_df: DataFrame):result_df = parse_json_df.groupBy("user_id").agg(F.count("city").alias("area_num"), # 区域(城市)数量F.sum("is_browse").alias("browse_num"), # 浏览行为数F.sum("is_cart").alias("cart_num"), # 加购行为数F.sum("is_order").alias("order_num"), # 下单行为数F.sum("is_buy").alias("buy_num"), # 付款行为数F.sum("is_back_order").alias("back_order_num"), # 退货行为数F.count("goods_name").alias("goods_num"), # 浏览商品数F.count("browse_page").alias("browse_page_num"), # 浏览页面数F.sum((F.unix_timestamp("to_time") - F.unix_timestamp("browse_time")) / 60).alias("stay_duration"),# 停留时间(分钟)= to_time(离开当前页面的时间)-browse_time (开始浏览的时间)F.avg("minimum_price").alias("avg_price"), # 平均商品价格F.count("page_keywords").alias("keywords_num"), # 浏览的关键词数量F.min("visit_time").alias("first_visit_time"), # 首次访问时间F.max("visit_time").alias("last_visit_time") # 末次访问时间)return result_dfif __name__ == '__main__':analysis_obj = UserEventAnalysis()analysis_obj.execute(app_name="userevent_analysis",partitions=2,read_topic="dwd_user_event_etl_result",fields=["phone_num","system_id","province","city","sp","user_name","user_id","visit_time","goods_type","minimum_price","is_browse","is_order","is_buy","is_back_order","is_cart","goods_name","browse_page","browse_time","to_page","to_time","page_keywords"],doris_table="user_event_result",write_topic="dws_user_event_analysis")
3.3 转换率标签
- 代码
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType
from tags.base.abstract_streaming_tag import AbstractStreamingTagclass ConversionPercent(AbstractStreamingTag):def mark_tag(self,business_df:DataFrame, five_tag_df:DataFrame, rule_obj):# value字段类型转换type_cast_df = business_df.select(business_df.value.cast(StringType()).alias("value"))# JSON中提取字段:selectFields过滤我们打标签需要的业务数据字段fields = rule_obj.selectFields.split(",")parse_json_df = type_cast_df.select(F.json_tuple("value", *fields).alias(*fields))# 计算转化率new_business_df_tmp = parse_json_df.groupBy("user_id").agg((F.sum("buy_num") / F.sum("browse_num")).alias("rate"))# 在实际的生成/真实环境中,不允许有此类打印输出到控制台的代码。一般会删除new_business_df = new_business_df_tmp.select("user_id",F.when(F.expr("rate is null"), 0).otherwise(new_business_df_tmp.rate).alias("rate"),)# 对五级标签的rule规则按照中横杠进行切分new_five_tag_df = five_tag_df.select("id",F.split("rule", "-")[0].alias("start"),F.split("rule", "-")[1].alias("end"))# 将处理后的业务数据与五级标签关联result_df = new_business_df.join(new_five_tag_df).where("rate>=start and rate<end").select(new_business_df.user_id.alias("user_id"),new_five_tag_df.id.alias("tags_id_streaming"))return result_dfif __name__ == '__main__':tag_obj = ConversionPercent()tag_obj.execute(app_name="conversion_percent",partitions=2,four_tag_id=123)
二、业务库数据统计(了解)
1、数据实时采集(掌握)
1.1 采集方案
将业务数据从MySQL数据库采集到Kafka属于CDC(Change Data Capture)操作,实现CDC有两种方式
- 基于主动查询 CDC:用户通常会在数据源表的某个字段中,保存上次更新的时间戳或版本号等信息,然后下游通过不断的查询和与上次的记录做对比,来确定数据是否有变动,是否需要同步。
- 基于事件接收CDC:当数据源表发生变动时,会通过附加在表上的触发器或者 binlog (类似edits)等途径,将操作记录下来。下游可以通过数据库底层的协议,订阅并消费这些事件,然后对数据库变动记录做重放,从而实现同步。
基于主动查询的CDC | 基于Binlog的CDC | |
---|---|---|
开源产品 | Sqoop、Kafka JDBC Source | Canal、Maxwell、Debezium |
执行模式 | Batch | Streaming |
是否可以捕获所有数据变化 | 否 | 是 |
延迟性 | 高延迟 | 低延迟 |
是否增加数据库压力 | 是 | 否 |
1.2 MySQLCDC介绍
MySQL CDC 连接器允许读取 MySQL 数据库的快照数据和增量数据。
特性:
-
支持多种数据库:如MySQL、Oracle等。
-
零编码:自动建表和动态增删表,无需编写代码。
-
高效读取:先进行数据快照,再跟踪binlog变化。
-
确保一致性:实现exactly-once语义,即使在中断恢复情况下也不会出现数据重复。
原理介绍:https://www.cnblogs.com/yeyuzhuanjia/p/17462461.html
1.3 SeaTunnel配置文件
- 环境配置
将资料/jar包/SeaTunnel中的mysql-connector-java-8.0.28.jar 放到**/export/server/apache-seatunnel-2.3.5/lib**
确保打开了MySQL Binlog。在mysql中运行如下语句:
show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
应显示与下方相同:
如果显示不同,则需要修改 /etc/my.cnf
# 在[mysqld]下面添加如下代码:
server_id=1
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30
binlog_row_image = FULL
gtid_mode = on
enforce_gtid_consistency = on# 重启mysql服务
systemctl restart mysqld
然后在mysql中重新运行查询语句。
- SeaTunnel配置文件
在/export/server/apache-seatunnel-2.3.5/config/job/目录下创建test_mysql_cdc.config文件。
cd /export/server/apache-seatunnel-2.3.5/config/job/
vim test_mysql_cdc.config内容如下:
env {parallelism = 1job.mode = "STREAMING"checkpoint.interval = 10000
}source {MySQL-CDC {base-url = "jdbc:mysql://up01:3306/hive_data"username = "root"password = "123456"start.mode = "initial"server-time-zone = "Asia/Shanghai"table-names = ["hive_data.shop_order"]table-names-config = [{table = "hive_data.shop_order"primaryKeys = ["id"]}]format = compatible_debezium_jsondebezium = {# include schema into kafka messagekey.converter.schemas.enable = falsevalue.converter.schemas.enable = false# include ddlinclude.schema.changes = true# topic prefixdatabase.server.name = "mysql_cdc"}}
}sink {kafka {topic = "test_topic"bootstrap.servers = "up01:9092"format = compatible_debezium_jsontransaction_prefix = "sot"kafka.config = {acks = "all"request.timeout.ms = 60000buffer.memory = 33554432}}
}
- 创建Kafka的Topic
cd /export/server/kafka/bin
./kafka-topics.sh --create --topic test_topic --bootstrap-server up01:9092./kafka-topics.sh --list --bootstrap-server up01:9092
- 启动SeaTunnel
cd /export/server/apache-seatunnel-2.3.5/
./bin/seatunnel.sh --config ./config/job/test_mysql_cdc.config -e local
- 启动Kafka的消费者
cd /export/server/kafka/bin./kafka-console-consumer.sh --bootstrap-server up01:9092 --topic test_topic --from-beginning./kafka-console-consumer.sh --bootstrap-server up01:9092 --topic mysql_cdc.hive_data.shop_order --from-beginning注意: MySQLCDC采集后输出到Kafka中的Topic实际上是【mysql_cdc.hive_data.shop_order】
- 在MySQL中执行如下语句模拟表数据内容发生改变:注意下面是一条完整的SQL
insert into hive_data.shop_order
select id,parent_order_no,order_id,is_split,platform_id,tid,source_type,source_name,store_no,city_id,city_name,region_code,order_status,order_status_desc,pay_type,trade_type,is_deleted ,timestampadd(SECOND, timestampdiff(SECOND, last_update_time, now()), order_create_time) as order_create_time,timestampadd(SECOND, timestampdiff(SECOND, last_update_time, now()), order_pay_time) as order_pay_time,timestampadd(SECOND, timestampdiff(SECOND, last_update_time, now()), create_time) as create_time,print_status,timestampadd(SECOND, timestampdiff(SECOND, last_update_time, now()), print_time) as print_time,stock_up_status,timestampadd(SECOND, timestampdiff(SECOND, last_update_time, now()), stock_up_time) as stock_up_time,order_type,express_type,timestampadd(SECOND, timestampdiff(SECOND, last_update_time, now()), receive_time) as receive_time,express_code,delivery_status,timestampadd(SECOND, timestampdiff(SECOND, last_update_time, now()), delivery_time) as delivery_time,pick_up_status,qr_code,timestampadd(SECOND, timestampdiff(SECOND, last_update_time, now()), pick_up_time) as pick_up_time ,timestampadd(SECOND, timestampdiff(SECOND, last_update_time, now()), complete_time) as complete_time,is_cancel,timestampadd(SECOND, timestampdiff(SECOND, last_update_time, now()), cancel_time) as cancel_time,cancel_reason,refund_status,timestampadd(SECOND, timestampdiff(SECOND, last_update_time, now()), refund_time) as refund_time,timestampadd(SECOND, timestampdiff(SECOND, last_update_time, now()), last_update_time) as last_update_time ,order_total_amount,product_total_amount,pack_amount,delivery_amount,discount_amount,seller_discount_amount,platform_allowance_amount,real_paid_amount,product_discount,real_product_amount,buyer_id,buyer_phone,buyer_remark,r_name,r_tel,r_province,r_city,r_district,r_address,r_zipcode,is_tuan_head,store_leader_id,order_group_no,commision_amount,settle_amount,points_amount,pay_point,balance_amount,pay_channel_amount,point_amount,sync_erp_status,sync_erp_msg
from hive_data.shop_order_bak
where id=462
2、窗口和Watermark(了解)
什么是窗口?
将流式计算转成批量计算。滚动窗口大小对应数据展示粒度。
什么是Watermark?
也称之为水印,或者叫水位线。作用是处理延迟/乱序到来的数据。
举例:在每5分钟计算时,他会计算当前时刻所在的窗口有哪些,然后再分别计算当前窗口内数据的聚合的值。
比如在12:10时,它分别位于12:00-12:10的窗口和12:05-12:15两个窗口,
然后会取到位于12:00-12:10的窗口里的数据,进行聚合计算。同时会取到位于12:05-12:15的窗口里的数据,进行聚合计算.
窗口函数的指定的方式。通过F.window()方法来指定
def window(timeColumn: "ColumnOrName",windowDuration: str,slideDuration: Optional[str] = None,startTime: Optional[str] = None,
) -> Column:
参数:
- timeColumn:用于做开窗的类型为时间戳的字段,类型必须是
pyspark.sql.types.TimestampType
- windowDuration:窗口的大小
- slideDuration:滑动的间隔,也就是隔多久计算一次
- 必须小于或等于windowDuration。不能大于是因为会出现数据丢失。
- 如果这个参数不设置,窗口就会变成滚动窗口(也就是滑动时间和窗口大小相同)
- startTime:开始窗口的偏移时间。比如:为了有每小时从15分钟开始的滚动窗口,例如12:15-13:15,13:15-14:15…,将startTime设置为15 minutes。
窗口函数的使用方式:把它作为分组的一个字段,写在groupBy()中即可。
windowedCounts = words.groupBy(F.window(words.timestamp, "10 minutes", "5 minutes"),words.word
).count()
3、业务数据介绍(了解)
- before:更新前的数据的情况。当op为c和r的时候,为null。
- after:更新后的数据的情况。当op为d的时候,为null。
- source:具体的元数据信息。
- op:具体的操作类型。
- c-create:对应insert数据插入
- u-update:对应update数据更新
- r-read:对应读取原始数据
- d-delete:对应delete数据删除
- ts_ms:操作的时间。
- transaction:事务相关的信息。
4、指标统计(了解)
因为离线标签是每天计算一次,无法捕获当天的购买情况。所以,需要统计一下每个用户12小时购买的单量、总金额、优惠金额和实付金额。
- Doris结果表建表语句
CREATE DATABASE IF NOT EXISTS log_analysis_db;
CREATE TABLE IF NOT EXISTS log_analysis_db.user_12hours_analysis
(user_id bigint comment '用户id',window_time varchar(50) comment '窗口统计时间',order_num int comment '订单数',order_total_amount decimal(27, 2) comment '订单总金额',discount_amount decimal(27, 2) comment '优惠金额',real_paid_amount decimal(27, 2) comment '实付金额'
)
UNIQUE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES("replication_num" = "1");
- 创建Topic
cd /export/server/kafka/bin
./kafka-topics.sh --create --topic dws_shop_order_analysis --bootstrap-server up01:9092
- 代码
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, TimestampType
from tags.base.abstract_streaming_analysis import AbstractStreamingAnalysisclass User12HoursAnalysis(AbstractStreamingAnalysis):def analysis(self, parse_json_df: DataFrame):# 1- 解析JSON获取字段fields_df = parse_json_df.select(F.get_json_object("value","$.before.id").alias("before_id"),F.get_json_object("value","$.before.buyer_id").alias("before_buyer_id"),F.get_json_object("value","$.before.create_time").alias("before_create_time"),F.get_json_object("value","$.before.parent_order_no").alias("before_parent_order_no"),F.get_json_object("value","$.before.order_id").alias("before_order_id"),F.get_json_object("value","$.before.order_total_amount").alias("before_order_total_amount"),F.get_json_object("value","$.before.discount_amount").alias("before_discount_amount"),F.get_json_object("value","$.before.real_paid_amount").alias("before_real_paid_amount"),F.get_json_object("value", "$.after.id").alias("after_id"),F.get_json_object("value", "$.after.buyer_id").alias("after_buyer_id"),F.get_json_object("value", "$.after.create_time").alias("after_create_time"),F.get_json_object("value", "$.after.parent_order_no").alias("after_parent_order_no"),F.get_json_object("value", "$.after.order_id").alias("after_order_id"),F.get_json_object("value", "$.after.order_total_amount").alias("after_order_total_amount"),F.get_json_object("value", "$.after.discount_amount").alias("after_discount_amount"),F.get_json_object("value", "$.after.real_paid_amount").alias("after_real_paid_amount"),F.get_json_object("value", "$.op").alias("op"))# 2- 根据不同操作情况,获得字段json_df = fields_df.select(F.when(F.col('op') == 'd', F.col('before_id')).otherwise(F.col('after_id')).alias('id'),F.when(F.col('op') == 'd', F.col('before_parent_order_no')).otherwise(F.col('after_parent_order_no')).alias('parent_order_no'),F.when(F.col('op') == 'd', F.col('before_order_id')).otherwise(F.col('after_order_id')).alias('order_id'),F.when(F.col('op') == 'd', F.col('before_create_time')).otherwise(F.col('after_create_time')).cast(TimestampType()).alias('create_time'),(F.when(F.isnull('after_order_total_amount'), 0).otherwise(F.col('after_order_total_amount')) - F.when(F.isnull('before_order_total_amount'), 0).otherwise(F.col('before_order_total_amount'))).alias('order_total_amount'),(F.when(F.isnull('after_discount_amount'), 0).otherwise(F.col('after_discount_amount')) - F.when(F.isnull('before_discount_amount'), 0).otherwise(F.col('before_discount_amount'))).alias('discount_amount'),(F.when(F.isnull('after_real_paid_amount'), 0).otherwise(F.col('after_real_paid_amount')) - F.when(F.isnull('before_real_paid_amount'), 0).otherwise(F.col('before_real_paid_amount'))).alias('real_paid_amount'),F.when(F.col('op') == 'd', F.col('before_buyer_id')).otherwise(F.col('after_buyer_id')).alias('user_id'))# json_df.writeStream.format("console").outputMode("update").start().awaitTermination()"""京东订单(母单)子订单_1:一件商品 iphone -> 京东快递子订单_2:另一件商品 手机壳,第三方商家的商品 -> 中通/韵达。。。"""json_df = json_df.withColumn('order_sn', F.when(F.isnull('parent_order_no') | (F.col('parent_order_no') == ''),F.col('order_id')).otherwise(F.col('parent_order_no')))# 3- 指标计算# 进行watermark定义,并使用滑动窗口统计# 定义窗口时,需要将窗口转换成String类型# 每分钟对前面12小时内的数据统计一次 12:01 -> [00:00, 12:00);12:02 -> [00:01, 12:01)result_df = json_df.withWatermark('create_time', '10 minutes'). \groupBy('user_id',F.window('create_time', '12 hours', '1 minutes').cast(StringType()).alias('window_time')). \agg(F.approx_count_distinct('order_sn').alias('order_num'),F.sum('order_total_amount').alias('order_total_amount'),F.sum('discount_amount').alias('discount_amount'),F.sum('real_paid_amount').alias('real_paid_amount'))return result_dfif __name__ == '__main__':obj = User12HoursAnalysis()obj.execute(app_name="User12HoursAnalysis",partitions=2,read_topic="mysql_cdc.hive_data.shop_order",fields=None,doris_table="user_12hours_analysis",write_topic="dws_shop_order_analysis")
三、近期消费活跃度实时标签开发
获取到12小时内用户的消费数据后,可以根据客户的消费订单数、消费总金额数以及优惠金额和实付金额获取很多标签数据。如近12小时的消费活跃度(根据消费单量划分)、近12小时的动机偏好(根据优惠占比划分)、近12小时的消费水平(根据消费金额划分)。当然,也可以获取更多数据,进而计算更多的标签,如近12小时的品牌偏好、近12小时的商品偏好等等。
这里计算一个近期消费活动度标签。
根据用户12小时内的消费单量,如果有一单消费则为活跃,有2-3单为中活跃,大于等于4单则为高活跃。
1、需求分析
需要手动调整标签配置表:
inType=Kafka##nodes=up01:9092##table=dws_shop_order_analysis##selectFields=user_id,order_num##range=earliest
2、标签开发
import os
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql.types import StringTypefrom tags.base.abstract_streaming_tag import AbstractStreamingTag# 绑定Python解释器和Spark安装路径
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'class ActiveTag(AbstractStreamingTag):def mark_tag(self, business_df:DataFrame, five_tag_df:DataFrame, rule_obj):# value字段类型转换type_cast_df = business_df.select(business_df.value.cast(StringType()).alias("value"))# JSON中提取字段:selectFields过滤我们打标签需要的业务数据字段fields = rule_obj.selectFields.split(",")parse_json_df = type_cast_df.select(F.json_tuple("value", *fields).alias(*fields))# 处理5级标签配置数据,按照横杠切分得到start和endnew_five_tag_df = five_tag_df.select("id",F.split("rule","-")[0].alias("start"),F.split("rule","-")[1].alias("end"))# 业务数据与5级标签配置数据关联result_df = parse_json_df.join(new_five_tag_df).where("order_num between start and end").select(parse_json_df["user_id"],new_five_tag_df["id"].alias("tags_id_streaming"))return result_dfif __name__ == '__main__':obj = ActiveTag()obj.execute(app_name="active_tag",partitions=2,four_tag_id=127)
四、ElasticSearch高级(掌握)
1、ElasticSearch整体架构
Kafka-> Topic -> 分区 -> 副本
HDFS -> 文件 -> block -> 副本
ES -> Index -> shard -> 副本
Master: 主节点
作用:1- 负责管理众多的从节点(datanode)。负责从节点的状态监控2- 维护集群的元数据信息: 索引、索引类型、属性、属性类型、分词器、分片(shards)、副本(replicas)等注意:1- ES可以分为集群和单机版。单机版的时候,该节点既是主节点也是从节点。也就是该节点既要负责管理集群元数据信息,又要负责数据的存储和查询操作2- 当是集群模式的时候,主节点是一个轻量级的节点,主要是负责集群元数据的管理工作。---------------------------------------------------------------------------------------DataNode: 从节点
作用:1- 负责处理客户端具体的请求操作。CRUD(增删改查的操作)2- 当数据有分片和副本的时候,写入数据到从节点的主副本的时候,负责将数据同步到其他从副本上注意: 从节点上面,存放的副本,可能是主副本,也可能是从副本。
ES中相关的专有名词:
index : 索引 表示索引库, 一个es下可以有多个索引库
settings : 主要对索引库分片 和副本的设置 , 默认 有 5分片 2副本
type : 在一个索引库下, 可以有多个type类型, 类似于在一个数据库中可以有多个表注意: 目前新的版本中, 已经只允许创建一个type(_doc)
filed: 字段, 从一个type下可以有个字段, 类似于 在一个表中可以有一个列
mapping: 映射关系, 主要是对字段进行相关的设置的比如: 字段类型 字段是否需要进行分词 字段的原始数据是否保存...
document: 文档, 表示的每一行的数据
cluster: 集群, es的集群
node: es的节点
shards : 分片 默认 一个索引库有 5个
replicas: 副本 默认 每一个分片 都有一个副本加上本身 为 2个
2、倒排索引(ms)
倒排索引:是一种数据结构,广泛用于搜索引擎和信息检索系统,以快速查找文档中包含特定关键词的记录。它的基本思想是将文档中的词语与包含这些词语的文档建立一种映射关系。
倒排:通过关键词找对应的文档
正排:一个文档中包含了哪些关键词
例如,假设我们有三个文档如下:
- 文档1:“The cat is on the mat”
- 文档2:“The dog is in the fog”
- 文档3:“The cat and the dog play”
步骤一:对文档进行分词处理,得到关键词,最终生成的倒排索引如下:
词项 | 倒排列表(文档ID) |
---|---|
the | 1, 2, 3 |
cat | 1, 3 |
is | 1, 2 |
on | 1 |
mat | 1 |
dog | 2, 3 |
in | 2 |
fog | 2 |
and | 3 |
play | 3 |
步骤二:假设我现在在百度搜索框中搜索了 the dog play cat
步骤三:对你的搜索内容分词处理得到关键词,关键词有:the、dog、play、cat
步骤四:根据关键词去倒排索引中找到关键词出现的文档有哪些,匹配结果如下
关键词 | 文档ID |
---|---|
the | 1, 2, 3 |
dog | 2, 3 |
play | 3 |
cat | 1, 3 |
步骤五:汇总匹配的结果,按照得分(可以粗略的理解为按匹配次数)进行降序,所以最终呈现给用户的网页结果是:3、2、1
3、ElasticSearch写入数据的流程
1- 客户端(restful API/Python API)进行数据写入操作,随机连接一台ES服务器。连接上谁,那么谁就成为了coordinating node(协调节点),并且也是该次请求的管理者。2- 协调节点计算当前写入的数据应该存储在哪个分片的主副本(因为只有主副本才负责数据的写入操作)上,底层是基于文档ID的Hash取模方案3- 判断成功以后,找到对应分片的主副本。如果该主副本就在当前的协调节点上,直接写入即可;如果不在当前的协调节点上,需要将请求转发给到对应的分片主副本所在的机器,然后写入数据4- 对应的主副本节点接收到数据写入请求以后,执行数据写入操作。写入成功以后,并且将数据同步给到其他从副本中5- 当主副本和从副本中对数据同步完成以后,最终将写入成功的响应通过协调节点返回给到客户端
4、ElasticSearch读取数据的流程
1- 客户端(restful API/Python API)进行数据查询操作,随机连接一台ES服务器。连接上谁,那么谁就成为了coordinating node(协调节点),并且也是该次请求的管理者。2- 根据查询的方案:2.1- 精确查询: 如果是基于文档ID进行查询,此时会计算当前这个ID对应的数据存放在哪个分片上。接着将请求转发给到对应分片副本所在的机器2.2- 模糊查询: 如果不是基于文档ID进行查询,例如通过文本内容关键字查询。此时协调节点会将该数据查询请求广播给到其他的所有节点。由各个节点查询自己服务器上的数据,并且将查询到的结果数据汇聚到协调节点3- 由协调节点负责汇总数据,并且对数据进行全局排序,是全局的降序排序,也就是按照得分进行降序4- 最后由协调节点将查询结果返回给到客户端