美团Leaf分布式ID生成算法深度解析与源码实现
前言
在分布式系统中,全局唯一ID的生成是核心基础服务。美团点评(现美团)针对Snowflake算法在运维场景中的痛点,研发了Leaf分布式ID生成系统。本文将从设计原理、源码实现、优化策略等角度深入剖析Leaf算法。
一、分布式ID生成方案对比
常见方案对比
方案 | 优点 | 缺点 |
---|---|---|
UUID | 简单 | 无序、字符串存储效率低 |
数据库自增ID | 简单可靠 | 性能瓶颈、扩展困难 |
Redis生成ID | 性能较好 | 依赖Redis、数据持久化问题 |
Snowflake | 趋势递增、高性能 | 时钟回拨问题、机器位限制 |
Leaf核心优势
- 高可用性(双号段缓存 + Zookeeper容错)
- 解决时钟回拨问题
- 支持HTTP/API多协议接入
- 可视化监控支持
二、Leaf算法核心原理
Leaf包含两种模式,可独立部署也可混合使用:
2.1 Leaf-Segment(号段模式)
实现原理:
- 使用数据库代理(号段表)分配ID区间
- 双Buffer异步加载机制
- 号段耗尽时自动切换Buffer
采用双buffer的方式,Leaf服务内部有两个号段缓存区segment。当前号段已下发10%时,如果下一个号段未更新,则另启一个更新线程去更新下一个号段。当前号段全部下发完后,如果下个号段准备好了则切换到下个号段为当前segment接着下发,循环往复。
- 每个biz-tag都有消费速度监控,通常推荐segment长度设置为服务高峰期发号QPS的600倍(10分钟),这样即使DB宕机,Leaf仍能持续发号10-20分钟不受影响。
- 每次请求来临时都会判断下个号段的状态,从而更新此号段,所以偶尔的网络抖动不会影响下个号段的更新。
源码深度解析
public class SegmentIDGenImpl implements IDGen {private static final Logger logger = LoggerFactory.getLogger(SegmentIDGenImpl.class);/*** IDCache未初始化成功时的异常码*/private static final long EXCEPTION_ID_IDCACHE_INIT_FALSE = -1;/*** key不存在时的异常码*/private static final long EXCEPTION_ID_KEY_NOT_EXISTS = -2;/*** SegmentBuffer中的两个Segment均未从DB中装载时的异常码*/private static final long EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL = -3;/*** 最大步长不超过100,0000*/private static final int MAX_STEP = 1000000;/*** 一个Segment维持时间为15分钟*/private static final long SEGMENT_DURATION = 15 * 60 * 1000L;// 最多5个线程,也就是最多5个任务同时执行(因为可能有多个tag,如果tag 只有1、2个,那么没必要5个线程);idle时间是60s;// SynchronousQueue意思是,只能有一个线程执行一个tag的任务,立即执行,执行完立即获取;其他的都只能阻塞式等待private ExecutorService service = new ThreadPoolExecutor(5, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new UpdateThreadFactory());private volatile boolean initOK = false;// 注意它包含了所有的SegmentBuffer,k是业务类型,v是对应的// 全局缓存private Map<String, SegmentBuffer> cache = new ConcurrentHashMap<String, SegmentBuffer>();private IDAllocDao dao;public static class UpdateThreadFactory implements ThreadFactory {private static int threadInitNumber = 0;private static synchronized int nextThreadNum() {return threadInitNumber++;}@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "Thread-Segment-Update-" + nextThreadNum());}}@Overridepublic boolean init() {logger.info("Init ...");// 确保加载到kv后才初始化成功updateCacheFromDb();initOK = true;updateCacheFromDbAtEveryMinute();return initOK;}private void updateCacheFromDbAtEveryMinute() {// 顾名思义, 每分钟执行一次。执行什么? 执行 updateCacheFromDb方法ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("check-idCache-thread");t.setDaemon(true);return t;}});service.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {updateCacheFromDb();}}, 60, 60, TimeUnit.SECONDS);}// 通过数据库 来更新cache缓存, 也就是Map<String, SegmentBuffer> cache。注意它包含了所有的SegmentBuffer
// 总结就是, 把db新增的tag, 初始化并添加到cache,把db删除的tag,从cache中删除;db中更新的呢?这里不管,后面由更新线程去维护到cache
// init的时候执行一次,后面每分钟执行一次private void updateCacheFromDb() {logger.info("update cache from db");StopWatch sw = new Slf4JStopWatch();try {List<String> dbTags = dao.getAllTags();// 可能有新加的tags, 这里仅仅加载 tag,不包括valueif (dbTags == null || dbTags.isEmpty()) {return;}List<String> cacheTags = new ArrayList<String>(cache.keySet());Set<String> insertTagsSet = new HashSet<>(dbTags);Set<String> removeTagsSet = new HashSet<>(cacheTags);//db中新加的tags灌进cachefor(int i = 0; i < cacheTags.size(); i++){String tmp = cacheTags.get(i);if(insertTagsSet.contains(tmp)){insertTagsSet.remove(tmp); // 数据库中已经存在于cache中的,是旧的,先删除}}for (String tag : insertTagsSet) {// 现在insertTagsSet的部分都是全新的tagsSegmentBuffer buffer = new SegmentBuffer();// 默认 SegmentBuffer的init 是falsebuffer.setKey(tag);Segment segment = buffer.getCurrent();// currentPos 永远只会在 0/1之间切换segment.setValue(new AtomicLong(0));// value表示 实际的分布式的唯一id值segment.setMax(0); // max 是 当前号段的 最大值; step 是步长,它会根据消耗的速度自动调整!segment.setStep(0);// 默认 value、max、 step 都是0 —— 也就是, 全部重置!cache.put(tag, buffer);// 添加到全局的 cachelogger.info("Add tag {} from db to IdCache, SegmentBuffer {}", tag, buffer);}//cache中已失效的tags从cache删除for(int i = 0; i < dbTags.size(); i++){String tmp = dbTags.get(i);if(removeTagsSet.contains(tmp)){ // 两个contains 都是在寻找重叠部分removeTagsSet.remove(tmp);//cache中存在于数据库中的,先从缓存中删除}}for (String tag : removeTagsSet) { // 现在removeTagsSet的部分都是是旧的、已经失效的tagscache.remove(tag); // 又从全局的 cache中删除。logger.info("Remove tag {} from IdCache", tag);}} catch (Exception e) {logger.warn("update cache from db exception", e);} finally {sw.stop("updateCacheFromDb");}}@Overridepublic Result get(final String key) {if (!initOK) { // 全局cache 是否初始化完成? 一般是不会说没有完成初始化return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);}if (cache.containsKey(key)) {SegmentBuffer buffer = cache.get(key);if (!buffer.isInitOk()) {// 对应key的buffer 是否初始化完成? 第一次调用get方法肯定是falsesynchronized (buffer) { // 没完成则加锁, 因为可能有多线程调用此get 方法if (!buffer.isInitOk()) {// 再次判断, 防止 xxx问题try {updateSegmentFromDb(key, buffer.getCurrent());// 通过数据库 初始化、更新buffer中 的当前号段logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());buffer.setInitOk(true);// 必须到这里 才算完成buffer的 初始化; 也是全局唯一被调用的地方!} catch (Exception e) {logger.warn("Init buffer {} exception", buffer.getCurrent(), e);}}}}// 到这里,肯定已经完成对应key的buffer的初始化, 所以直接从号段缓存中 获取idreturn getIdFromSegmentBuffer(cache.get(key));// 直接从号段缓存中 获取id}//可能数据库中不存在对应的key, 那就暂时只能异常返回return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);}// 通过数据库 更新buffer中 的号段
// 注意 initOK为true,号段 缓存可能还没有初始化; 也就是init() 方法实际并没有对缓存做 初始化。
// 全局被调用的地方, 就两处public void updateSegmentFromDb(String key, Segment segment) {StopWatch sw = new Slf4JStopWatch();SegmentBuffer buffer = segment.getBuffer();LeafAlloc leafAlloc;if (!buffer.isInitOk()) {// 再次判断,第一次调用get方法肯定init=false,于是进入这个ifleafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);// 直接查询数据库buffer.setStep(leafAlloc.getStep());buffer.setMinStep(leafAlloc.getStep());//leafAlloc中的step为DB中的step; 最小步长,表明真正的步长是可能大于minStep的} else if (buffer.getUpdateTimestamp() == 0) {// 如果buffer已经init过,第二次会进这个if, 因为setUpdateTimestamp全局被调用两次,这个if和下一个elseleafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);//buffer.setUpdateTimestamp(System.currentTimeMillis());// 唯二的全局被调用;为什么有这个if, 为什么特地需要特地调用这个方法buffer.setStep(leafAlloc.getStep());// 和上一个if 是不是重复了? 代码是不是有些啰嗦?buffer.setMinStep(leafAlloc.getStep());// minStep的作用仅仅是为了 动态调整, 实际使用的还是 step !!} else {// 如果buffer已经init过,第三次会进这个else; 这里其实是根据duration 动态调整步长!long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();int nextStep = buffer.getStep();if (duration < SEGMENT_DURATION) {// 如果duration 太短了, 那么说明步长有点小,id 消耗快,那么进入调整。if (nextStep * 2 > MAX_STEP) { // 如果*2 会超出最大步长,那就不调整了,否则就*2//do nothing} else {nextStep = nextStep * 2;}} else if (duration < SEGMENT_DURATION * 2) { 如果 SEGMENT_DURATION < duration < SEGMENT_DURATION * 2, 那么不管//do nothing with nextStep} else {// 否则就是 > SEGMENT_DURATION * 2; 那么 步长除以2,但是不能小于minnextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;}logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);LeafAlloc temp = new LeafAlloc();temp.setKey(key);temp.setStep(nextStep);leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);// 动态调整记录到数据库中~buffer.setUpdateTimestamp(System.currentTimeMillis());// 唯二的全局被调用buffer.setStep(nextStep);buffer.setMinStep(leafAlloc.getStep());//leafAlloc的step为DB中的step}// must set value before set maxlong value = leafAlloc.getMaxId() - buffer.getStep();//leafAlloc.getMaxId()是后端新的最大值,buffer.getStep()是新的步长segment.getValue().set(value);// value 和max 构成了 号段的起止值; value 其实命名并不好,容易让人误解!segment.setMax(leafAlloc.getMaxId()); // value 、max、step 都设置好了,表示 号段初始化、更新完成segment.setStep(buffer.getStep());sw.stop("updateSegmentFromDb", key + " " + segment);}// 直接从号段缓存中 获取id
// 获取的时候, 需要检查是否超过了 10%, 超过则 另外启动任务去异步 加载新的 号段!public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {while (true) {buffer.rLock().lock();try {final Segment segment = buffer.getCurrent();// 每次get 都会进入这里,但是其实 isNextReady条件 只有设置一次if (!buffer.isNextReady() && (segment.getIdle() < 0.9 * segment.getStep()) && buffer.getThreadRunning().compareAndSet(false, true)) {service.execute(new Runnable() {// 异步执行@Overridepublic void run() { // 进入了这个方法,buffer.getThreadRunning 已经被cas设置为了true; 保证了 这个方法不会有并发Segment next = buffer.getSegments()[buffer.nextPos()];boolean updateOk = false;try {updateSegmentFromDb(buffer.getKey(), next);// next 是指使用 下一个号段;这里也就是更新下一个号段updateOk = true;logger.info("update segment {} from db {}", buffer.getKey(), next);} catch (Exception e) {logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);} finally {if (updateOk) {buffer.wLock().lock();// 这里使用了 写锁, 是因为有写 的并发竞争buffer.setNextReady(true);// 表示下一个号段已经准备好!buffer.getThreadRunning().set(false);// 操作完毕,又把buffer线程状态设置为false,因为task 已经执行完buffer.wLock().unlock();} else {buffer.getThreadRunning().set(false);// 不管怎么样,需要把buffer线程状态设置为false,因为task 已经执行完}}}});}long value = segment.getValue().getAndIncrement();// 这里可能有读的并发,if (value < segment.getMax()) { // 未达到号段的最大值,也就是最右端return new Result(value, Status.SUCCESS);// 到这里, 就成功返回; 一般情况 就是进入这个if 然后返回!}} finally {buffer.rLock().unlock(); // 不管怎么样,这里释放读锁}//什么情况会需要wait? 上面的方法没有进入上一个if已经达到号段的最大值! 或者出现异常,当然一般不会有异常;//达到号段的最大值 意味着需要使用下一个号段,// waitAndSleep其实是等待 正在执行的线程把任务执行完成;具体是 判断并自旋10000,然后超过10000,那就每次sleep 10毫秒,然后退出..waitAndSleep(buffer);// 总之,waitAndSleep保证没有正在执行的更新线程; 但也不是100% 保证!buffer.wLock().lock();// 执行写锁, 排斥任何其他的线程!try {final Segment segment = buffer.getCurrent();// 能继续使用同一个号段吗?long value = segment.getValue().getAndIncrement();if (value < segment.getMax()) {// 为什么需要再次判断? max会在已经确定的情况下变化? 也不会,大概是保险起见?return new Result(value, Status.SUCCESS);}if (buffer.isNextReady()) {// 已经达到号段的最大值,此时前面必然已经完成了新号段的获取, 肯定进入此判断buffer.switchPos(); // 全局唯一的 切换! 虽然完成了切换, 但是不立即获取value,而是等待下一次的循环!buffer.setNextReady(false);} else {// 这种情况, 不太可能发生吧!logger.error("Both two segments in {} are not ready!", buffer);return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);}} finally {buffer.wLock().unlock();}}}private void waitAndSleep(SegmentBuffer buffer) {int roll = 0;while (buffer.getThreadRunning().get()) {roll += 1;if(roll > 10000) {try {TimeUnit.MILLISECONDS.sleep(10);break;// 不管怎么样,需要退出, 不能在这个方法等太久。 就是说, 极端情况下, 此方法不能保证 等待正在执行的任务完成。} catch (InterruptedException e) {logger.warn("Thread {} Interrupted",Thread.currentThread().getName());break;}}}}public List<LeafAlloc> getAllLeafAllocs() {return dao.getAllLeafAllocs();}public Map<String, SegmentBuffer> getCache() {return cache;}public IDAllocDao getDao() {return dao;}public void setDao(IDAllocDao dao) {this.dao = dao;}
}
表结构设计:
CREATE TABLE `leaf_alloc` (`biz_tag` varchar(128) NOT NULL,`max_id` bigint(20) NOT NULL,`step` int(11) NOT NULL,`update_time` datetime NOT NULL,PRIMARY KEY (`biz_tag`)
)
2.2 Leaf-Snowflake(雪花算法改进版)
2.2.1 Snowflake
雪花算法简单来说是这样一个长整形数值。它64位,8个字节,刚好一个long。(为什么雪花算法ID是64位? 大概也是这个原因吧。理论上当然可以使用更多位,但是其实不是很有必要)
雪花算法,在单个节点上是有序的,如同 号段模式,但它也不是 全局严格有序,而是单个节点严格递增。
雪花算法的问题
1 因为雪花算法 依赖于本地时钟。所以存在时钟回拨问题。那么,如何避免时钟回拨?
2 雪花算法实现起来不复杂。但是问题是分布式场景之下,当需要启动的leaf服务越来越多时,对其分配workerId是一件非常令人头疼的事情。我们要做的是,尽量让一件事情简单化,让用户无感知。百度的UID做到了(文末有相关阅读链接),leaf也做到了!
参见上图整个启动流程图,服务启动时首先检查自己是否写过ZooKeeper leaf_forever节点:
- 若写过,则用自身系统时间与leaf_forever/ s e l f 节点记录时间做比较,若小于 l e a f f o r e v e r / {self}节点记录时间做比较,若小于leaf_forever/ self节点记录时间做比较,若小于leafforever/{self}时间则认为机器时间发生了大步长回拨,服务启动失败并报警。
- 若未写过,证明是新服务节点,直接创建持久节点leaf_forever/${self}并写入自身系统时间,接下来综合对比其余Leaf节点的系统时间来判断自身系统时间是否准确,具体做法是取leaf_temporary下的所有临时节点(所有运行中的Leaf-snowflake节点)的服务IP:Port,然后通过RPC请求得到所有节点的系统时间,计算sum(time)/nodeSize。
- 若abs( 系统时间-sum(time)/nodeSize ) < 阈值,认为当前系统时间准确,正常启动服务,同时写临时节点leaf_temporary/${self} 维持租约。
- 否则认为本机系统时间发生大步长偏移,启动失败并报警。
- 每隔一段时间(3s)上报自身系统时间写入leaf_forever/${self}。
由于强依赖时钟,对时间的要求比较敏感,在机器工作时NTP同步也会造成秒级别的回退,建议可以直接关闭NTP同步。要么在时钟回拨的时候直接不提供服务直接返回ERROR_CODE,等时钟追上即可。或者做一层重试,然后上报报警系统,更或者是发现有时钟回拨之后自动摘除本身节点并报警,如下:
//发生了回拨,此刻时间小于上次发号时间if (timestamp < lastTimestamp) {long offset = lastTimestamp - timestamp;if (offset <= 5) {try {//时间偏差大小小于5ms,则等待两倍时间wait(offset << 1);//waittimestamp = timeGen();if (timestamp < lastTimestamp) {//还是小于,抛异常并上报throwClockBackwardsEx(timestamp);} } catch (InterruptedException e) { throw e;}} else {//throwthrowClockBackwardsEx(timestamp);}}//分配ID
改进点:
- 使用Zookeeper持久顺序节点分配WorkerID
- 时钟回拨解决方案:
- 轻度回拨:等待时钟同步
- 严重回拨:拒绝服务并报警
- 历史最大时间追踪机制
3.2 Leaf-Snowflake关键实现
WorkerID分配流程:
public class SnowflakeIDGenImpl implements IDGen {// Zookeeper节点路径private static final String ZK_PATH = "/leaf/snowflake/" + serviceName;private void initWorkerID() {// 检查本地缓存if (workerID == null) {// 从ZK获取或创建持久节点String path = zkClient.createPersistentSequential(ZK_PATH + "/worker", true);workerID = parseWorkerID(path);}}
}
时钟回拨处理:
public synchronized Result get(String key) {long timestamp = timeGen();// 时钟回拨检测if (timestamp < lastTimestamp) {long offset = lastTimestamp - timestamp;if (offset <= 5) {wait(offset << 1); // 等待两倍偏移时间timestamp = timeGen();} else {throw new RuntimeException("Clock moved backwards");}}//...生成ID逻辑
}
四、性能优化策略
4.1 分段缓存优化
- 动态调整step值(根据TPS自动调整号段长度)
- 预加载机制(号段使用达阈值时触发异步加载)
4.2 异常处理机制
- 数据库故障降级策略
- Zookeeper连接失败本地缓存
- 监控报警系统集成
五、Leaf vs 原生Snowflake
对比维度 | Leaf-Snowflake | 原生Snowflake |
---|---|---|
WorkerID分配 | Zookeeper动态分配 | 手动配置 |
时钟回拨处理 | 多级处理策略 | 未处理 |
扩展性 | 支持HTTP/RPC多协议 | 需自行封装 |
运维支持 | 提供监控控制台 | 无 |
六、生产环境实践建议
-
部署方案选择:
- 对TPS要求高:优先使用Leaf-Segment
- 需要严格递增:Leaf-Snowflake
- 混合部署时需注意业务标签隔离
-
参数调优建议:
# 号段模式配置 leaf.segment.step=2000 # 初始步长 leaf.segment.threshold=0.1 # 加载阈值(10%)# Snowflake配置 leaf.snowflake.zk.address=127.0.0.1:2181 leaf.snowflake.port=8080
七、面试要点
核心面试问题:
- Leaf如何解决Snowflake的时钟回拨问题?
- 双Buffer机制如何保证服务可用性?
- 号段模式相比直接DB查询的优势是什么?
- Zookeeper在Leaf中起什么作用?
设计思想启示:
- 分布式系统设计中的可用性与一致性权衡
- 无状态服务设计的重要性
- 监控系统对中间件运维的关键作用
先使用Leaf-Segment
- 需要严格递增:Leaf-Snowflake
- 混合部署时需注意业务标签隔离
-
参数调优建议:
# 号段模式配置 leaf.segment.step=2000 # 初始步长 leaf.segment.threshold=0.1 # 加载阈值(10%)# Snowflake配置 leaf.snowflake.zk.address=127.0.0.1:2181 leaf.snowflake.port=8080
七、面试要点
核心面试问题:
- Leaf如何解决Snowflake的时钟回拨问题?
- 双Buffer机制如何保证服务可用性?
- 号段模式相比直接DB查询的优势是什么?
- Zookeeper在Leaf中起什么作用?
设计思想启示:
- 分布式系统设计中的可用性与一致性权衡
- 无状态服务设计的重要性
- 监控系统对中间件运维的关键作用
GitHub源码地址:https://github.com/Meituan-Dianping/Leaf