Kafka学习---2、kafka生产者、异步和同步发送API、分区、生产经验

news/2024/10/30 23:24:09/

1、kafka生产者

1.1 生产者消息发送流程

1.1.1 发送原理

在消息发生的过程中,设计到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
在这里插入图片描述

  • batch.size:只有数据积累到batch.size之后,sender才会发送数据。默认16k
  • linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值数0ms,表示没有延迟。

应答acks:

  • 0:生产者发生过来的数据,不需要等数据落盘应答。
  • 1:生产者发生过来的数据,Leader收到数据后应答
  • -1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。-1和all等价。
1.1.2 生产者重要参数列表
参数名称描述
bootstrap.servers生产者连接集群所需的Broker地址清单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置1个或多个,中间用逗号隔开。注意这里并非需要所有broker地址,因为生产者从给定的broker里查到其他broker信息
key.serializer和value.serializer指定发生信息的key和value的序列化类型。一定要写全类名
buffer.memoryRecordAccumulator缓冲区总大小,默认32MB
batch.size缓冲区一批数据最大值,默认16K。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加
linger.ms如果数据迟迟未到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小5-100ms之间
acks0:生产者发生过来的数据,不需要等数据落盘应答。1: 生产者发送过来的数据,Leader收到数据后应答。-1(all):生产者发给过来的数据,Leader和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的
max.in.flight.requests.per.connection允许最多没有返回ack的次数,默认为5,开启幂等性包保证该值是1-5的数字
retries当消息发给出现错误的时候,系统会重发消息。retries表示重试的次数。默认是int的最大值,2147483647.如果设置了重试,还想抱着消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了
retry.backoff.ms两次重试之间的时间间隔,默认是 100ms。
enable.idempotence是否开启幂等性,默认 true,开启幂等性。
compression.type生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。

1.2 异步发送API

1.2.1 普通异步发送

1、需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker
2、代码编写
(1)创建工程(KafkaDemo)
(2)导入依赖

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency>
</dependencies>

(3)创建包名org.zhm.producer
(4)编写不带回调函数的API代码

package org.zhm.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** @ClassName CustomProducer* @Description TODO* @Author Zouhuiming* @Date 2023/6/12 18:35* @Version 1.0*/
public class CustomProducer {public static void main(String[] args) {//1、创建kafka生产者的配置对象Properties properties=new Properties();//2、给kafka配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");//key,value序列化(必须):key.serializer,value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//3、创建kafka生产者对象KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);//4、调用send()方法,发生消息for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first","zhm"+i));}//5、关闭资源kafkaProducer.close();}
}

(5)测试
①在 hadoop102 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

在这里插入图片描述
②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。
在这里插入图片描述

1.2.2 带回调函数的异步发送

回调函数会在Producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息·(Exception),如果Exception为null,说明消息发生成功,如果Exception不为null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

package org.zhm.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @ClassName CustoProducerCallback* @Description TODO* @Author Zouhuiming* @Date 2023/6/12 18:44* @Version 1.0*/
public class CustoProducerCallback {public static void main(String[] args) throws InterruptedException {//1、创建kafka生产者的配置对象Properties properties=new Properties();//2、给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");//key、value序列化(必须)properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//3、创建kafka生产者对象KafkaProducer<String,String> producer=new KafkaProducer<>(properties);//4、调用send()方法 发送信息for (int i = 0; i < 6; i++) {//添加回调producer.send(new ProducerRecord<>("first", "zhm" + i), new Callback() {//该方法在Producer收到ack时调用,为异步调用@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e==null){//没有异常,输出信息到控制台System.out.println("主题:"+recordMetadata.topic()+"->"+"分区:"+recordMetadata.partition());}else {//出现异常打印e.printStackTrace();}}});//延迟一会会看到数据发往不同分区Thread.sleep(20);}//5、关闭资源producer.close();}
}

1、测试
①在 hadoop102 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。
在这里插入图片描述

1.3 同步发送API

只需在异步发送的基础上,再调用一下 get()方法即可。

package org.zhm.producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** @ClassName CustomProducerSync* @Description TODO* @Author Zouhuiming* @Date 2023/6/12 18:58* @Version 1.0*/
public class CustomProducerSync {public static void main(String[] args) throws ExecutionException, InterruptedException {//1、创建kafka生产者的配置对象Properties properties=new Properties();//2、给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");//key、value序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//3、创建kafka生产者对象KafkaProducer<String,String> producer=new KafkaProducer<>(properties);//4、调用send方法,发送信息for (int i = 0; i < 10; i++) {//异步发送 默认
//            producer.send(new ProducerRecord<>("first","zhm"+i));//同步发送producer.send(new ProducerRecord<>("first","zhmzhm"+i)).get();}//5、关闭资源producer.close();}
}

1、测试
①在 hadoop102 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

在这里插入图片描述

1.4 生产者分区

1.4.1 分区好处

1、便于合理使用储存资源,每个Partition在一个Broker上储存,可以把海量的数据按照分区切割成一块一块数据储存在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
2、提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。
在这里插入图片描述

1.4.2 生产者发生消息的分区

1、默认分区器DefaultPartitioner
(1)指明partition的情况下,直接将指明的值作为partition值;例如partition=0,所有数据写入分区0。
(2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。
(3)既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直
使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。
例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进
行使用(如果还是0会继续随机)。
2、案例一
将数据发往指定 partition 的情况

package org.zhm.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @ClassName CustomProducerCallbackPartitions* @Description TODO* @Author Zouhuiming* @Date 2023/6/12 19:10* @Version 1.0*/
public class CustomProducerCallbackPartitions {public static void main(String[] args) {//1、创建kafka生产者的配置对象Properties properties=new Properties();//2、给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");//键值序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//3、创建生产者对象KafkaProducer<String ,String> producer=new KafkaProducer<String, String>(properties);//4、调用send方法,发送信息for (int i = 0; i < 5; i++) {//指定数据发送到1号分区,key1为空producer.send(new ProducerRecord<>("first", 1, "", "zhm" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e==null){System.out.println("主题:"+recordMetadata.topic()+"->"+"分区:"+recordMetadata.partition());}else {e.printStackTrace();}}});}//5、关闭资源producer.close();}
}

(1)测试
①在 hadoop102 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。
在这里插入图片描述
3、案例二
没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。

package org.zhm.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @ClassName CustomProducerCallback1* @Description TODO* @Author Zouhuiming* @Date 2023/6/12 19:21* @Version 1.0*/
public class CustomProducerCallback1 {public static void main(String[] args) {Properties properties=new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String,String> kafkaProducer=new KafkaProducer(properties);for (int i = 0; i < 5; i++) {//依次指定key值为a、b、f,数据key的hash值与3分别发往1、2、0kafkaProducer.send(new ProducerRecord<>("first", "a", "zhm" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e==null){System.out.println("当key为a时:"+"主题:"+recordMetadata.topic()+"分区:"+recordMetadata.partition());}else {e.printStackTrace();}}});}for (int i = 0; i < 5; i++) {//依次指定key值为a、b、f,数据key的hash值与3分别发往1、2、0kafkaProducer.send(new ProducerRecord<>("first", "b", "zhm" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e==null){System.out.println("当key为b时:"+"主题:"+recordMetadata.topic()+"分区:"+recordMetadata.partition());}else {e.printStackTrace();}}});}for (int i = 0; i < 5; i++) {//依次指定key值为a、b、f,数据key的hash值与3分别发往1、2、0kafkaProducer.send(new ProducerRecord<>("first", "f", "zhm" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e==null){System.out.println("当key为f时:"+"主题:"+recordMetadata.topic()+"分区:"+recordMetadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}

(1)测试
①在 hadoop102 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。
在这里插入图片描述

1.4.3 自定义分区器

如果研发人员可以根据企业需求,自己重新实现分区器
1、例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区。
2、案例实现
(1)定义类实现 Partitioner 接口。
(2)重写 partition()方法。

package org.zhm.producer;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;/*** @ClassName Mypartitioner* @Description TODO* @Author Zouhuiming* @Date 2023/6/12 19:28* @Version 1.0*//**1、实现接口Partitioner2、实现三个方法:Partition、close、configure3、编写Partition方法,返回分区号*/
public class MyPartitioner implements Partitioner {/*** @description:返回信息对应的分区* @author: zouhuiming* @date: 2023/6/12 19:30* @param: [s, o, bytes, o1, bytes1, cluster]* [主题、消息的key、消息的key序列化后的字节数组、消息的value、消息的value序列哈后字节数组、集群元数据可以查看的分区信息]* @return: int**/@Overridepublic int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {//获取信息String msyValue = o1.toString();//创建partitionint partition;//判断信息是否包含zhmif (msyValue.contains("zhm")){partition=0;}else {partition=1;}//返回分区号return partition;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

(3)使用分区器的方法,在生产者的配置中添加分区器参数。

package org.zhm.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** @ClassName CustomProducerCallbackPartitionsMine* @Description TODO* @Author Zouhuiming* @Date 2023/6/12 19:35* @Version 1.0*/
public class CustomProducerCallbackPartitionsMine {public static void main(String[] args) {Properties properties=new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//添加自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"org.zhm.producer.MyPartitioner");KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "zhm" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e==null){System.out.println("主题:"+recordMetadata.topic()+"分区:"+recordMetadata.partition());}else {e.printStackTrace();}}});}for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<>("first", "hello" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e==null){System.out.println("主题:"+recordMetadata.topic()+"分区:"+recordMetadata.partition());}else {e.printStackTrace();}}});}kafkaProducer.close();}
}

(4)测试
①在 hadoop102 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

②在 IDEA 控制台观察回调信息。
在这里插入图片描述

1.5 生产经验——生产者如何提高吞吐量

  • batch.size:批次大小,默认16k
  • linger.ms:等待时间,修改为5-100ms
  • compression.type:压缩snappy
  • RecordAccumulator:缓存区大小,修改1为64MB

1.6 生产经验——数据可靠性

1、ack应答原理
在这里插入图片描述
在这里插入图片描述
可靠性总结:

  • acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
  • acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
  • acks=-1(all),,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;
    在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

数据重复分析
在这里插入图片描述

1.7 生产经验——数据去重

1.7.1 数据传递语义
  • 至少一次(At Least Once) =ACK级别设置为-1+分区副本数大于等于2+ISR里应答的最小副本数量大于等于2
  • 最多一次(At Most Once)=ACK级别设置为0
  • 总结
    • At Least Once可以保证数据不丢失,但是不能保证数据不重复;
    • At Most Once可以保证数据不重复,但是不能保证数据不丢失。
  • 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。
1.7.2 幂等性

幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。
精确一次(Exactly Once) = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。

重复数据的判断标准:具有<PID,Partition,SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID是Kafka每次重启都会分配一个新的;Partition表示分区号;Sequence Number是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复。
在这里插入图片描述
如何启用幂等性
开启参数 enable.idempotence 默认为 true,false 关闭

1.7.3 生产者事务

1、Kafka事务原理
注意:开启事务,必须开启幂等性
在这里插入图片描述
2、Kafka 的事务一共有如下 5 个 API

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws 
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

1.8 生产经验——数据有序

在这里插入图片描述

1.8 生产检验——数据乱序

1、kafka在1.x版本之前保证数据单分区有序,条件如下:
max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。
2、kafka在1.x及以后版本保证数据单分区有序,条件如下:
(1)未开启幂等性
max.in.flight.requests.per.connection需要设置为1。
(2)开启幂等性
max.in.flight.requests.per.connection需要设置小于等于5。
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。
在这里插入图片描述


http://www.ppmy.cn/news/362122.html

相关文章

【K8SRockyLinux】基于开源操作系统搭建K8S高可用集群(详细版)

文章目录 一、实验节点规划表&#x1f447;二、实验版本说明&#x1f4c3;三、实验拓扑&#x1f4ca;四、实验详细操作步骤&#x1f579;️1. 安装Rocky Linux开源企业操作系统2. 所有主机系统初始化3. 所有master节点部署keepalived4. 所有master节点部署haproxy5. 所有节点配…

【滤波】无迹卡尔曼滤波

本文主要翻译自rlabbe/Kalman-and-Bayesian-Filters-in-Python的第10章节10-Unscented-Kalman-Filter&#xff08;无迹卡尔曼滤波&#xff09;。 %matplotlib inline# format the book import book_format book_format.set_style()前文 在上一章中&#xff0c;我们讨论了非线…

机器学习实战 第16周|无监督学习与数据预处理|9:20~11:00

目录 一、无监督学习 无监督学习的类型 *1.1 降维动机 *1.2 降维弊端 无监督学习的挑战

特斯拉/奔驰/大众「押宝」中国,高阶智能驾驶迎来新增长周期

高阶智能驾驶&#xff0c;再次迎来风口。 本周&#xff0c;梅赛德斯奔驰正式对外宣布&#xff0c;满足L3级要求的自动辅助驾驶系统&#xff08;DRIVE PILOT&#xff09;已获得在美国加利福尼亚州公开道路正式运行的认证。这是该套系统在2022年5月于德国获批高速公路运行、以及…

语音消息实现(聊天向)

语音消息实现记录下&#xff1a; SpringBootVue3客服项目&#xff0c;网页录制发送语音消息 Vue中使用js-audio-recorder插件实现录音功能并实现上传Blob数据到SpringBoot后台接口 2fps/recorder github地址 Recorder的API 语音测试地址 [Web] 4分钟搭建一个简洁好看的 We…

ESXi 7.0 U3m Lenovo (联想) 定制版 OEM Custom Installer CD

VMware ESXi 7.0 Update 3m - 领先的裸机 Hypervisor (All OEM Customized Installer CDs) ESXi 7.0 U3m Standard (标准版) ESXi 7.0 U3m Dell (戴尔) 定制版 OEM Custom Installer CD ESXi 7.0 U3m HPE (慧与) 定制版 OEM Custom Installer CD ESXi 7.0 U3m Lenovo (联想) 定…

威固窗膜,给车主朋友带来不一样的驾车体验~

相信很多车主朋友将爱车买回来后&#xff0c;都会将其装饰一番&#xff0c;比如车窗贴膜、换个脚垫、坐垫等等&#xff0c;尤其汽车窗膜在近几年来可谓是异常火热。 因为从《中国消费者》杂志社与伊士曼&#xff08;中国&#xff09;投资管理公司联合发布的《2022年度汽车膜消费…

怎样区分汽车脚垫和汽车地胶?

汽车脚垫是汽车必备的汽车用品之一&#xff0c;但在汽车脚垫的分类中&#xff0c;汽车脚垫和汽车地胶是两种有相同点&#xff0c;但本质上又截然不同的产品。对于很多外行来说&#xff0c;一开始对汽车地胶和汽车脚垫的概念不太容易分清和混淆&#xff0c;以致在选购脚垫时&…