SpringBoot + MyBatis 实现号段模式的分布式ID

ops/2024/12/14 11:21:02/

号段模式是一种常见的 ID 生成策略,在高并发场景中广泛应用。其核心思想是,发号服务每次从数据库获取一批 ID,并将这些 ID 缓存到本地。业务系统每次请求 ID 时,首先会判断本地缓存是否有可用的 ID。如果有,则直接分配给请求方;如果没有,则重新从数据库中批量获取 ID。通过这种方式,减少了频繁访问数据库的压力。

1. 优缺点分析
1.1 优点
• 减少数据库访问压力:在号段模式下,发号服务不需要每次请求都访问数据库,这大大减少了数据库的负载,提高了系统的性能。

• 提高系统可用性与可靠性:由于发号服务能缓存 ID,当数据库出现短时故障时,系统仍能正常运行,增强了系统的容错性。

• 扩展性强:随着业务的拓展和系统的增长,号段模式便于实现分库分表,支持更多的 ID 生成需求。例如,在处理高并发订单号生成时,可以根据业务需求快速扩展生成规则。

• 易于调整:随着业务的变化,可以灵活地调整号段的大小或增加号段的数量,适应不同的需求变化。

1.2. 缺点
• 依赖数据库:号段模式依赖数据库来管理 ID 的分配,数据库本身的性能和稳定性会直接影响 ID 生成服务的可用性。如果数据库负载过重或发生故障,可能会导致 ID 生成服务的瓶颈。

• ID 无业务含义:虽然号段生成的 ID 高效且简洁,但其值通常是纯数字,无法携带任何业务信息,因此对于某些业务场景(如订单号、用户编号等),其可读性和业务关联性较差。

2. 应用场景
号段模式特别适用于中等并发量的场景,尤其是在不想引入额外中间件(如 Redis)时。它常用于生成订单号、支付流水号等需要高效生成且不频繁变更的 ID。例如,在订单管理系统中,当系统订单量大幅增长时,号段模式可以通过简单的横向扩展来满足业务需求,而无需大规模修改系统架构。

3. 代码实现示例
以下是基于号段模式生成 ID 的具体实现。该方案可以集成在微服务中,数据表和代码逻辑简单易懂,适合用于生成订单号等高并发需求场景。

3.1 数据库设计
首先,在数据库中创建一张 code_seq 表,用于存储每个号段的基本信息。

drop TABLE IF EXISTS `code_seq`;create TABLE `code_seq` (`env` VARCHAR(10) NOT NULL COMMENT '环境编号',`prefix` INT(10) NOT NULL COMMENT 'code前缀',`seq` INT(10) NOT NULL COMMENT '当前序列',`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY(`prefix`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT 'CODE编号表';

通过向 code_seq 表中插入多条数据,我们可以为每个环境(如 T1)定义多个不同的 prefix,这些 prefix 用于生成不同的 ID。

INSERT INTO code_seq (env, prefix, seq)
VALUES('T1', 1001, 0),('T1', 1002, 0),('T1', 1003, 0),('T1', 1004, 0),('T1', 1005, 0),('T1', 1006, 0),('T1', 1007, 0),('T1', 1008, 0),('T1', 1009, 0),('T1', 1010, 0);

3.2 代码设计
整体UML类图设计如下
在这里插入图片描述

• CodeHandler:主类,提供初始化和生成Code方法。主要采用并发原子类减少并发依赖,从数据库可以捞取一批codePrefixSet,在code缓存不到UPDATE_THRESHOLD比例时,会预加载下一批code到缓存中,其代码如下:

public class CodeHandler {private static final double UPDATE_THRESHOLD = 0.9f;private final String name;private final Set<Integer> codePrefixSet = new ConcurrentSkipListSet<>();private final BlockingQueue<CodeSegment> codeSegmentBlockingQueue = new LinkedBlockingQueue<>(1);private final AtomicBoolean needLoading = new AtomicBoolean(false);private final AtomicBoolean statusHealthy = new AtomicBoolean(false);private final String idc = "T1";private CodeSeqService codeSeqService;private CodeTpsMonitor codeTpsMonitor;private Executor loadingExecutor;public CodeHandler(String name) {this.name = name;}public void setCodeSeqService(CodeSeqService codeSeqService) {this.codeSeqService = codeSeqService;}public void init() {this.loadingExecutor = Executors.newSingleThreadExecutor(new BasicThreadFactory.Builder().namingPattern("order-handle-" + name).daemon(true).priority(Thread.MAX_PRIORITY).build());this.codeTpsMonitor = new CodeTpsMonitor(name);this.codeTpsMonitor.start();initQueue();}private void initQueue() {CodeSeqRet codeSeqRet = loadCodeAlloc();try {codeSegmentBlockingQueue.put(new CodeSegment(codeSeqRet.getPrefix(), codeSeqRet.getEnd(), codeSeqRet.getStart()));} catch (InterruptedException e) {log.error("initQueue put error", e);}}private CodeSeqRet loadCodeAlloc() {CodeSeqRet result = null;try {loadCodePrefix();int retryCount = 0;while (Objects.isNull(result) && !codePrefixSet.isEmpty() && retryCount <= 2) {int index = SecureRandomUtil.getInstance().nextInt(codePrefixSet.size());Integer prefix = (codePrefixSet.toArray(new Integer[codePrefixSet.size()]))[index];CodeSeqRet codeSeqRet = codeSeqService.generateCodeByPrefix(idc, prefix, codeTpsMonitor.getStep().get());int retCode = Objects.isNull(codeSeqRet) ? 1 : codeSeqRet.getResult();if (retCode == 0) {result = codeSeqRet;} else if (retCode == 1) {this.codePrefixSet.remove(prefix);if (this.codePrefixSet.isEmpty()) {loadCodePrefix();}} else {retryCount++;}}} finally {this.statusHealthy.set(result != null);}if (Objects.isNull(result)) {throw new IllegalStateException("load code from db error!");}return result;}private void loadCodePrefix() {if (!codePrefixSet.isEmpty()) {return;}codePrefixSet.addAll(codeSeqService.selectPrefixByEnv(idc));if (codePrefixSet.isEmpty()) {throw new IllegalStateException("no code prefix,plz check db config.");}}public String getCode() {this.codeTpsMonitor.increase();String code = null;int retryNum = 0;while (Objects.isNull(code)) {CodeSegment curSegment = this.codeSegmentBlockingQueue.peek();if (Objects.nonNull(curSegment)) {if (curSegment.getIdle() <= UPDATE_THRESHOLD * codeTpsMonitor.getStep().get() && this.needLoading.compareAndSet(false, true)) {this.loadingExecutor.execute(new CodeLoader());}code = curSegment.getCode();if (Objects.isNull(code)) {this.codeSegmentBlockingQueue.poll();}} else {if (!this.statusHealthy.get() || retryNum > 2) {throw new IllegalStateException("create code failed,no available codes.");}}retryNum++;if (Objects.isNull(code)) {LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));}}return code;}private class CodeLoader implements Runnable {@Overridepublic void run() {CodeSeqRet codeSeqRet = null;while (Objects.isNull(codeSeqRet)) {try {codeSeqRet = loadCodeAlloc();} catch (Exception e) {log.error("load code error.", e);LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));}}try {CodeHandler.this.codeSegmentBlockingQueue.put(new CodeSegment(codeSeqRet.getPrefix(), codeSeqRet.getEnd(), codeSeqRet.getStart()));} catch (InterruptedException e) {log.error("CodeLoader put queue error", e);}CodeHandler.this.needLoading.set(false);}}
}

• CodeSegment:封装了当前号段的信息,提供了获取下一个 ID 的功能。

public class CodeSegment {private final int prefix;private final int maxSeq;private AtomicLong curSequence;public CodeSegment(int prefix,int maxSeq,long start) {this.prefix = prefix;this.maxSeq = maxSeq;curSequence = new AtomicLong(start);}public String getCode() {long value = curSequence.getAndIncrement();if (value <= maxSeq) {return prefix + String.format(Locale.ENGLISH,"%07d",value);} else {return null;}}public long getIdle() {return maxSeq - curSequence.get() + 1;}
}

• CodeTpsMonitor:监控当前的 TPS,可以根据CODE生成值动态调整预获取CODE的批量值

public class CodeTpsMonitor implements Runnable {public static final int INITIAL_BATCH_COUNT = 100;private static final int MAX_BATCH_COUNT = 1000;private final AtomicInteger step = new AtomicInteger(INITIAL_BATCH_COUNT);private final String name;private AtomicInteger count = new AtomicInteger(0);private long startTime;private ScheduledExecutorService scheduledExecutorService;public CodeTpsMonitor(String name) {this.name = name;}public void start() {scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("check-" + name + "-order-thread").daemon(true).build());scheduledExecutorService.scheduleWithFixedDelay(this, 1, 5, TimeUnit.SECONDS);}public void increase() {count.incrementAndGet();}public AtomicInteger getStep() {return step;}@Overridepublic void run() {//重置count和时间long start = startTime;int reqNum = count.getAndSet(0);startTime = System.currentTimeMillis();long timeCost = startTime - start;final long tps = reqNum * 1000 / timeCost;int newBatchCount;if (tps < INITIAL_BATCH_COUNT) {newBatchCount = INITIAL_BATCH_COUNT;} else if (tps > MAX_BATCH_COUNT) {newBatchCount = MAX_BATCH_COUNT;} else {newBatchCount = (int) tps;}step.set(newBatchCount);}
}

• CodeSeqService:与数据库交互,负责从数据库中获取和更新 ID。

public class CodeSeqService {private static final int MAX_SEQ = 999999999;@Resourceprivate CodeSeqMapper codeSeqMapper;public List<Integer> selectPrefixByEnv(String env) {return codeSeqMapper.selectPrefixByEnv(env, MAX_SEQ);}@Transactional(timeout = 300, isolation = Isolation.REPEATABLE_READ, rollbackFor = Throwable.class)public CodeSeqRet generateCodeByPrefix(String env, Integer prefix, Integer step) {log.info("generateCodeByPrefix begin env={},prefix={},step={}", env, prefix, step);//加上行锁CodeSeq codeSeq = codeSeqMapper.selectCodeSeqByPrefix(env, prefix);CodeSeqRet codeSeqRet = new CodeSeqRet();codeSeqRet.setResult(0);codeSeqRet.setPrefix(prefix);if (Objects.isNull(codeSeq) || Objects.isNull(codeSeq.getSequence())) {codeSeqRet.setResult(1);return codeSeqRet;}if (codeSeq.getSequence() > MAX_SEQ) {codeSeqRet.setResult(1);return codeSeqRet;}if (MAX_SEQ - codeSeq.getSequence() + 1 < step) {codeSeqRet.setStart(codeSeq.getSequence());codeSeqRet.setEnd(MAX_SEQ);} else {codeSeqRet.setStart(codeSeq.getSequence());codeSeqRet.setEnd(codeSeq.getSequence() + step - 1);}codeSeq.setSequence(codeSeqRet.getEnd() + 1);int ret = codeSeqMapper.updateCodeSeqByPrefix(codeSeq);if (ret <= 0) {log.info("update error,ret={},codeSeq={}", ret, codeSeq);codeSeqRet.setResult(2);return codeSeqRet;}return codeSeqRet;}
}    

• CodeSeqRet : 实体

public class CodeSeqRet {private int prefix;/*** 0:正常 1:无可用 2:失败*/private Integer result;private Integer start;private Integer end;
}

• CodeSeqMapper : Mybaits接口

public interface CodeSeqMapper {List<Integer> selectPrefixByEnv(@Param("env") String env,@Param("maxSeq") Integer maxSeq);CodeSeq selectCodeSeqByPrefix(@Param("env") String env, @Param("prefix") Integer prefix);int updateCodeSeqByPrefix(CodeSeq codeSeq);
}

• CodeSeqMapper:Mybatis XML文件

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapperPUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.toby.dynamic.data.source.db.dao.config.CodeSeqMapper"><resultMap id="paramConfig" type="com.toby.dynamic.data.source.db.model.CodeSeq"><result column="env" property="env" jdbcType="VARCHAR"/><result column="prefix" property="prefix" jdbcType="INTEGER"/><result column="seq" property="sequence" jdbcType="INTEGER"/></resultMap><select id="selectPrefixByEnv" resultType="java.lang.Integer">select`prefix`from code_seq where `env`=#{env} and `seq` &lt;= #{maxSeq};</select><select id="selectCodeSeqByPrefix" resultMap="paramConfig">select `env`,`prefix`,`seq` from code_seq where `env` = #{env} and `prefix` = #{prefix} for update;</select><update id="updateCodeSeqByPrefix" parameterType="com.toby.dynamic.data.source.db.model.CodeSeq" >update code_seq set `seq` = #{sequence} where `env` = #{env} and `prefix` = #{prefix};</update>
</mapper>

最终TEST用例调用如下:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
@Slf4j
public class CodeTest {@Autowiredprivate CodeSeqService codeSeqService;@Testpublic void createCodeTestCase01() {log.info("createCodeTestCase01 begin.");CodeHandler codeHandler = new CodeHandler("code-create");codeHandler.setCodeSeqService(codeSeqService);codeHandler.init();ExecutorService executorService = Executors.newFixedThreadPool(5, new ThreadFactory() {private volatile int index = 0;@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "t" + (index++));}});CountDownLatch countDownLatch = new CountDownLatch(5);executorService.submit(new CreateCodeJob(codeHandler, 200, countDownLatch));executorService.submit(new CreateCodeJob(codeHandler, 200, countDownLatch));executorService.submit(new CreateCodeJob(codeHandler, 200, countDownLatch));executorService.submit(new CreateCodeJob(codeHandler, 200, countDownLatch));executorService.submit(new CreateCodeJob(codeHandler, 200, countDownLatch));
//        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));try {countDownLatch.await();} catch (InterruptedException e) {log.error("countDownLatch.await() error", e);}executorService.shutdown();}private static class CreateCodeJob implements Runnable {private CodeHandler codeHandler;private int times;private CountDownLatch countDownLatch;public CreateCodeJob(CodeHandler codeHandler, int times, CountDownLatch countDownLatch) {this.codeHandler = codeHandler;this.times = times;this.countDownLatch = countDownLatch;}@Overridepublic void run() {while (times > 0) {String code = codeHandler.getCode();log.info("threadName:{},code={}", Thread.currentThread().getName(), code);times--;}countDownLatch.countDown();}}
}

http://www.ppmy.cn/ops/141801.html

相关文章

Not using native diff for overlay2, this may cause degraded performance……

问题现象 案例&#xff1a;Anolis 8.9&#xff08;4.19.91-26.an8.x86_64&#xff09; Overlay2存储驱动程序&#xff09; 当我们安装好Docker之后&#xff0c;通过systemctl status docker -l 会发现有一个告警信息&#xff1a;levelwarning msg"Not using native dif…

Maven、mybatis框架

一、Maven介绍 1.概念&#xff1a; Maven项目对象模型(POM)&#xff0c;可以通过一小段描述信息来管理项目的构建&#xff0c;报告和文档的项目管理工具软件。 2.为啥使用maven: 之前项目中需要引入大量的jar包。这些jar从网上下载&#xff0c;可能下载地址不同意。这些jar之间…

AI开源南京分享会回顾录

AI 开源南京分享会&#xff0c;已于2024年11月30日下午在国浩律师&#xff08;南京&#xff09;事务所5楼会议厅成功举办。此次活动由 KCC南京、PowerData、RISC-Verse 联合主办&#xff0c;国浩律师&#xff08;南京&#xff09;事务所协办。 活动以“开源视角的 AI 对话”为主…

TcpServer 服务器优化之后,加了多线程,对心跳包进行优化

TcpServer 服务器优化之后&#xff0c;加了多线程&#xff0c;对心跳包进行优化 TcpServer.h #ifndef TCPSERVER_H #define TCPSERVER_H#include <iostream> #include <winsock2.h> #include <ws2tcpip.h> #include <vector> #include <map> #…

C语言(指针基础练习)

删除数组中的元素 数组的元素在内存地址中是连续的&#xff0c;不能单独删除数组中的某个元素&#xff0c;只能覆盖。 #include <stdio.h> #include <stdbool.h>// 函数声明 int deleteElement(int arr[], int size, int element);int main() {int arr[] {1, 2, 3…

图形学笔记 - 5. 光线追踪 - RayTracing

Whitted-Style Ray tracing 为什么要光线追踪 光栅化不能很好地处理全局效果 软阴影尤其是当光线反射不止一次的时候 栅格化速度很快&#xff0c;但质量相对较低 光线追踪是准确的&#xff0c;但速度很慢 光栅化&#xff1a;实时&#xff0c;光线追踪&#xff1a;离线~10K …

ios上架构建版本没苹果电脑怎么上传

在app store上架的时候&#xff0c;遇到下图的问题&#xff1a; 点击蓝色加号的时候&#xff0c;并没有构建版本可以选择 从图中可以看出&#xff0c;它给我们推荐了很多上传工具&#xff0c;比如xcode、transporter或命令行工具之类的&#xff0c;但是这些工具都是只能在苹果…

Debedium如何忽略Oracle的purge命令

报错 截至目前3.0版本&#xff0c;Debezium的Oracle Connector并不支持purge table这个指令。 所以&#xff0c;在使用Debezium解析Oracle变更的时候&#xff0c;如果在源端执行了类似 purge table "$BIN… 的语句&#xff0c;就会导致Debezium罢工&#xff0c;日志里显…