Caffeine 手动策略缓存 put() 方法源码解析

news/2024/11/7 3:49:20/

BoundedLocalManualCache put() 方法源码解析

先看一下BoundedLocalManualCache的类图

BoundedLocalManualCache

com.github.benmanes.caffeine.cache.BoundedLocalCache中定义的BoundedLocalManualCache静态内部类。

static class BoundedLocalManualCache<K, V> implements LocalManualCache<K, V>, Serializable

实现了LocalManualCache接口,这个接口提供了Cache接口的骨架实现,以最简的方式去实现一个LocalCache

详细查看LocalManualCache接口里定义的内容,代码也不多,直接贴到内容里:

interface LocalManualCache<K, V> extends Cache<K, V> {/** Returns the backing {@link LocalCache} data store. */LocalCache<K, V> cache();@Overridedefault long estimatedSize() {return cache().estimatedSize();}@Overridedefault void cleanUp() {cache().cleanUp();}@Overridedefault @Nullable V getIfPresent(Object key) {return cache().getIfPresent(key, /* recordStats */ true);}@Overridedefault @Nullable V get(K key, Function<? super K, ? extends V> mappingFunction) {return cache().computeIfAbsent(key, mappingFunction);}@Overridedefault Map<K, V> getAllPresent(Iterable<?> keys) {return cache().getAllPresent(keys);}@Overridedefault Map<K, V> getAll(Iterable<? extends K> keys,Function<Iterable<? extends K>, Map<K, V>> mappingFunction) {requireNonNull(mappingFunction);Set<K> keysToLoad = new LinkedHashSet<>();Map<K, V> found = cache().getAllPresent(keys);Map<K, V> result = new LinkedHashMap<>(found.size());for (K key : keys) {V value = found.get(key);if (value == null) {keysToLoad.add(key);}result.put(key, value);}if (keysToLoad.isEmpty()) {return found;}bulkLoad(keysToLoad, result, mappingFunction);return Collections.unmodifiableMap(result);}/*** Performs a non-blocking bulk load of the missing keys. Any missing entry that materializes* during the load are replaced when the loaded entries are inserted into the cache.*/default void bulkLoad(Set<K> keysToLoad, Map<K, V> result,Function<Iterable<? extends @NonNull K>, @NonNull Map<K, V>> mappingFunction) {boolean success = false;long startTime = cache().statsTicker().read();try {Map<K, V> loaded = mappingFunction.apply(keysToLoad);loaded.forEach((key, value) ->cache().put(key, value, /* notifyWriter */ false));for (K key : keysToLoad) {V value = loaded.get(key);if (value == null) {result.remove(key);} else {result.put(key, value);}}success = !loaded.isEmpty();} catch (RuntimeException e) {throw e;} catch (Exception e) {throw new CompletionException(e);} finally {long loadTime = cache().statsTicker().read() - startTime;if (success) {cache().statsCounter().recordLoadSuccess(loadTime);} else {cache().statsCounter().recordLoadFailure(loadTime);}}}@Overridedefault void put(K key, V value) {cache().put(key, value);}@Overridedefault void putAll(Map<? extends K, ? extends V> map) {cache().putAll(map);}@Overridedefault void invalidate(Object key) {cache().remove(key);}@Overridedefault void invalidateAll(Iterable<?> keys) {cache().invalidateAll(keys);}@Overridedefault void invalidateAll() {cache().clear();}@Overridedefault CacheStats stats() {return cache().statsCounter().snapshot();}@Overridedefault ConcurrentMap<K, V> asMap() {return cache();}
}

可以看到,CacheLoader接口定义了loadloadAllputputAllinvalidateinvalidateAllstatsasMap等方法,做一个简单实现。这些方法提供了缓存的基本操作,如加载缓存、添加缓存、移除缓存、获取缓存统计信息等。

Manual Cache 源码

static class BoundedLocalManualCache<K, V> implements LocalManualCache<K, V>, Serializable {private static final long serialVersionUID = 1;final BoundedLocalCache<K, V> cache;final boolean isWeighted;@Nullable Policy<K, V> policy;BoundedLocalManualCache(Caffeine<K, V> builder) {this(builder, null);}BoundedLocalManualCache(Caffeine<K, V> builder, @Nullable CacheLoader<? super K, V> loader) {cache = LocalCacheFactory.newBoundedLocalCache(builder, loader, /* async */ false);isWeighted = builder.isWeighted();}@Overridepublic BoundedLocalCache<K, V> cache() {return cache;}@Overridepublic Policy<K, V> policy() {return (policy == null)? (policy = new BoundedPolicy<>(cache, Function.identity(), isWeighted)): policy;}@SuppressWarnings("UnusedVariable")private void readObject(ObjectInputStream stream) throws InvalidObjectException {throw new InvalidObjectException("Proxy required");}Object writeReplace() {return makeSerializationProxy(cache, isWeighted);}}

定义了一个BoundedLocalCache属性,还有权重的标志位isWeighted,以及一个Policy属性。BoundedLocalManualCache的构造方法中,调用了LocalCacheFactory.newBoundedLocalCache方法,创建了一个BoundedLocalCache对象,并赋值给cache属性。policy属性则是在policy()方法中创建的。policy 是一个BoundedPolicy对象,它实现了Policy接口,用于管理缓存策略。BoundedPolicy源码紧接着就在BoundedLocalManualCache下面,这里就不贴出来了。

static final class BoundedPolicy<K, V> implements Policy<K, V>,里具体定义了了BoundedLocalCache缓存策略,比如缓存大小,缓存权重,缓存过期时间等。

接下来我们看BoundedLocalCacheput方法

手动使用调用cache.put(k, v);会调用put(key, value, expiry(), /* notifyWriter */ true, /* onlyIfAbsent */ false);
具体的参数解释如下:

  • key:要放入缓存的键。
  • value:要放入缓存的值。
  • expiry:缓存的过期时间,默认为Duration.ZERO,表示永不过期。
  • notifyWriter:是否通知写入者,默认为true
  • onlyIfAbsent:是否只在缓存中不存在该键时才放入,默认为false

put 方法源码如下:

@Nullable V put(K key, V value, Expiry<K, V> expiry, boolean notifyWriter, boolean onlyIfAbsent) {requireNonNull(key);requireNonNull(value);Node<K, V> node = null;long now = expirationTicker().read();int newWeight = weigher.weigh(key, value);for (;;) {// 获取 prior 节点Node<K, V> prior = data.get(nodeFactory.newLookupKey(key));if (prior == null) {// 如果不存在 prior 节点,则创建新的节点if (node == null) {node = nodeFactory.newNode(key, keyReferenceQueue(),value, valueReferenceQueue(), newWeight, now);setVariableTime(node, expireAfterCreate(key, value, expiry, now));}// notifyWriter 为 true 且存在Writer时,通知Writerif (notifyWriter && hasWriter()) {Node<K, V> computed = node;prior = data.computeIfAbsent(node.getKeyReference(), k -> {writer.write(key, value);return computed;});//    如果存在 prior 节点,调用 afterWrite 方法if (prior == node) {afterWrite(new AddTask(node, newWeight));return null;// 如果onlyIfAbsent 为 true。代表只在缓存中不存在该键时才放入缓存} else if (onlyIfAbsent) {V currentValue = prior.getValue();if ((currentValue != null) && !hasExpired(prior, now)) {if (!isComputingAsync(prior)) {tryExpireAfterRead(prior, key, currentValue, expiry(), now);setAccessTime(prior, now);}afterRead(prior, now, /* recordHit */ false);return currentValue;}}// 如果 notifyWriter 为 false,直接放入缓存} else {prior = data.putIfAbsent(node.getKeyReference(), node);if (prior == null) {afterWrite(new AddTask(node, newWeight));return null;} else if (onlyIfAbsent) {// An optimistic fast path to avoid unnecessary lockingV currentValue = prior.getValue();if ((currentValue != null) && !hasExpired(prior, now)) {if (!isComputingAsync(prior)) {tryExpireAfterRead(prior, key, currentValue, expiry(), now);setAccessTime(prior, now);}afterRead(prior, now, /* recordHit */ false);return currentValue;}}}} else if (onlyIfAbsent) {// An optimistic fast path to avoid unnecessary lockingV currentValue = prior.getValue();if ((currentValue != null) && !hasExpired(prior, now)) {if (!isComputingAsync(prior)) {tryExpireAfterRead(prior, key, currentValue, expiry(), now);setAccessTime(prior, now);}afterRead(prior, now, /* recordHit */ false);return currentValue;}}// 如果 prior != null,则说明该节点已经存在,则尝试获取锁V oldValue;long varTime;int oldWeight;boolean expired = false;boolean mayUpdate = true;boolean exceedsTolerance = false;synchronized (prior) {if (!prior.isAlive()) {continue;}oldValue = prior.getValue();oldWeight = prior.getWeight();// 如果 oldValue == null,通过 expireAfterCreate 方法计算过期时间,并删除key对应的值if (oldValue == null) {varTime = expireAfterCreate(key, value, expiry, now);writer.delete(key, null, RemovalCause.COLLECTED);// 返回prior是否过期,true,则删除key对应的值} else if (hasExpired(prior, now)) {expired = true;varTime = expireAfterCreate(key, value, expiry, now);writer.delete(key, oldValue, RemovalCause.EXPIRED);// 如果 onlyIfAbsent 为 true,则不更新key对应的值,返回新的过期时间} else if (onlyIfAbsent) {mayUpdate = false;varTime = expireAfterRead(prior, key, value, expiry, now);} else {varTime = expireAfterUpdate(prior, key, value, expiry, now);}// notifyWriter 为true,如果过期或者更新了值,则通知Writerif (notifyWriter && (expired || (mayUpdate && (value != oldValue)))) {writer.write(key, value);}// 如果mayUpdate为true,计算过期时间是否超出容忍度if (mayUpdate) {exceedsTolerance =(expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE)|| (expiresVariable()&& Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE);setWriteTime(prior, now);prior.setWeight(newWeight);prior.setValue(value, valueReferenceQueue());}// 设置访问时间和过期时间setVariableTime(prior, varTime);setAccessTime(prior, now);}// 如果在创建缓存时设置了移除监听器,则通知移除监听器if (hasRemovalListener()) {if (expired) {notifyRemoval(key, oldValue, RemovalCause.EXPIRED);} else if (oldValue == null) {notifyRemoval(key, /* oldValue */ null, RemovalCause.COLLECTED);} else if (mayUpdate && (value != oldValue)) {notifyRemoval(key, oldValue, RemovalCause.REPLACED);}}// 更新权重,判断是不是第一写入,如果是,调用afterWrite方法int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0;if ((oldValue == null) || (weightedDifference != 0) || expired) {afterWrite(new UpdateTask(prior, weightedDifference));// 判断 onlyIfAbsent 是否为 true,以及是否超过容忍度,如果超过容忍度,调用afterWrite方法} else if (!onlyIfAbsent && exceedsTolerance) {afterWrite(new UpdateTask(prior, weightedDifference));} else {if (mayUpdate) {setWriteTime(prior, now);}//执行 afterRead 方法afterRead(prior, now, /* recordHit */ false);}return expired ? null : oldValue;}}

案例中通过 cache.put(k,v)调用方法,走到这个方法中,因为是第一次尝试储存key和value,所以代码中声明的 node = null,获取的prior = nullif (prior == null),创建新节点,设置创建后过期时间。notifyWriter=truehasWriter=false,执行else中方法

          prior = data.putIfAbsent(node.getKeyReference(), node);if (prior == null) {afterWrite(new AddTask(node, newWeight));return null;} else if (onlyIfAbsent) {// An optimistic fast path to avoid unnecessary lockingV currentValue = prior.getValue();if ((currentValue != null) && !hasExpired(prior, now)) {if (!isComputingAsync(prior)) {tryExpireAfterRead(prior, key, currentValue, expiry(), now);setAccessTime(prior, now);}afterRead(prior, now, /* recordHit */ false);return currentValue;}}

putIfAbsent 方法:由于data中不存在我们的key,value,返回 null,调用 afterWrite() 方法,将任务放入writeBuffer中,调用scheduleAfterWrite()方法

  void afterWrite(Runnable task) {for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {if (writeBuffer.offer(task)) {scheduleAfterWrite();return;}scheduleDrainBuffers();}

scheduleAfterWrite()方法:

  void scheduleAfterWrite() {for (;;) {switch (drainStatus()) {case IDLE:casDrainStatus(IDLE, REQUIRED);scheduleDrainBuffers();return;case REQUIRED:scheduleDrainBuffers();return;case PROCESSING_TO_IDLE:if (casDrainStatus(PROCESSING_TO_IDLE, PROCESSING_TO_REQUIRED)) {return;}continue;case PROCESSING_TO_REQUIRED:return;default:throw new IllegalStateException();}}}

看到这我其实是有点蒙了,因为笔者的异步编程基础薄弱,只看方法名字做一个不负责任的猜想,写入后安排异步任务,条件符合执行清理计划,会继续调用 scheduleDrainBuffers() 方法

scheduleDrainBuffers() 方法:

void scheduleDrainBuffers() {if (drainStatus() >= PROCESSING_TO_IDLE) {return;}if (evictionLock.tryLock()) {try {int drainStatus = drainStatus();if (drainStatus >= PROCESSING_TO_IDLE) {return;}lazySetDrainStatus(PROCESSING_TO_IDLE);executor.execute(drainBuffersTask);} catch (Throwable t) {logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);maintenance(/* ignored */ null);} finally {evictionLock.unlock();}}}

drainStatus() 就是返回这条件的值,如果大于等于 PROCESSING_TO_IDLE 就直接返回,否则执行 tryLock() 方法,如果成功,则执行 executor.execute(drainBuffersTask); 方法,否则执行 maintenance() 方法,这个方法就是执行清理任务的方法。

传进来的drainBuffersTask是一个PerformCleanupTask,这个类实现了Runnable接口,重写了run()方法,这个方法就是执行清理任务的方法。

    @Overridepublic void run() {BoundedLocalCache<?, ?> cache = reference.get();if (cache != null) {cache.performCleanUp(/* ignored */ null);}}

继续看performCleanUp()方法:

  void performCleanUp(@Nullable Runnable task) {evictionLock.lock();try {maintenance(task);} finally {evictionLock.unlock();}if ((drainStatus() == REQUIRED) && (executor == ForkJoinPool.commonPool())) {scheduleDrainBuffers();}}

可以看到,这里也是调用了maintenance()方法,然后判断drainStatus()是否等于REQUIRED,如果等于,则调用scheduleDrainBuffers()方法。

@GuardedBy("evictionLock")void maintenance(@Nullable Runnable task) {lazySetDrainStatus(PROCESSING_TO_IDLE);try {drainReadBuffer();drainWriteBuffer();if (task != null) {task.run();}drainKeyReferences();drainValueReferences();expireEntries();evictEntries();climb();} finally {if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {lazySetDrainStatus(REQUIRED);}}}

maintenance() 是实际的清理方法,它首先将drainStatus()设置为PROCESSING_TO_IDLE,然后调用drainReadBuffer()drainWriteBuffer()drainKeyReferences()drainValueReferences()expireEntries()evictEntries()climb()等方法,清理读写缓冲区、过期条目、驱逐条目等。

到这里,afterWrite()基本就执行完了,写入一次(key,value),都会去判断是否需要清理,如果需要清理,就异步调用maintenance()方法进行清理。

如果是给已经存在的key设置值,put方法执行到最后会调用 afterRead()方法

  void afterRead(Node<K, V> node, long now, boolean recordHit) {if (recordHit) {statsCounter().recordHits(1);}boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);if (shouldDrainBuffers(delayable)) {scheduleDrainBuffers();}refreshIfNeeded(node, now);}

afterRead()方法会记录命中次数,然后判断是否需要延迟写入缓冲区,如果需要延迟写入缓冲区,则将节点放入读取缓冲区,如果读取缓冲区已满,则调用scheduleDrainBuffers()方法异步清理缓冲区,最后调用refreshIfNeeded()方法异步刷新节点。

refreshIfNeeded()方法会根据节点的过期时间、访问时间、更新时间等判断是否需要刷新节点,如果需要刷新节点,则调用refresh()方法刷新节点。

本例中没有设置过期时间,直接返回。

总结

本文算是比较详细的把put()方法执行流程分析了一遍,通过分析put()方法,我们可以了解到Caffeine缓存的基本原理,以及如何使用Caffeine缓存,学习如何自己实现一个本地缓存的 put()方法,怎样执行一个异步的清理任务,怎样判断是否需要清理,怎样异步刷新节点等等。

笔者也是一个小菜鸟,刚开始看一些源码,可能有些地方理解的不对,欢迎指正,谢谢!

希望本文对你有所帮助,如果有任何问题,欢迎在评论区留言讨论。


http://www.ppmy.cn/news/1544947.html

相关文章

10万就能买增程车!买车一定要等它

文 | AUTO芯球 作者 | 雷慢 现在千万别买增程车&#xff0c; 不然过不了多久肯定得后悔&#xff0c; 现在的零跑杀疯了&#xff0c; C16、C11和C10已经抢了理想一大批的潜在用户&#xff0c; 而且更劲爆的来了&#xff0c;零跑要搞价格更低的B10、B11、B16系列了&#xf…

React面试基础题大全(all)

前端react面试基础知识&#xff08;II)-CSDN博客 前端react常见面试题目&#xff08;basic)-CSDN博客 为什么 React 中的 props 被认为是只读的&#xff1f; React 中的 props 被认为是只读的&#xff0c;这是为了保证 React 的单向数据流的设计模式&#xff0c;使状态更可预…

【力扣打卡系列】单调栈

坚持按题型打卡&刷&梳理力扣算法题系列&#xff0c;语言为go&#xff0c;Day20 单调栈 题目描述 解题思路 单调栈 后进先出 记录的数据加在最上面丢掉数据也先从最上面开始 单调性 记录t[i]之前会先把所有小于等于t[i]的数据丢掉&#xff0c;不可能出现上面大下面小的…

如何在社媒平台上使用代理IP来保护帐号安全

社媒平台如Facebook、Twitter、Instagram等&#xff0c;不仅是用户分享生活与信息的重要平台&#xff0c;也是各类网络攻击的目标。利用代理IP可以帮助使用者保护帐号安全&#xff0c;防止个人信息外泄和帐号被盗用的风险。 一、为什么需要使用代理IP保护社媒帐号&#xff1f;…

创建一个基于SSM(Spring, Spring MVC, MyBatis)的教学视频点播系统

开发指南&#xff1a;包括项目结构、数据库设计、配置文件、DAO层、Service层、Controller层和前端页面的示例。 1. 需求分析 明确系统的主要功能需求&#xff0c;例如&#xff1a; 用户注册与登录视频上传与管理视频播放与评论分类管理搜索功能用户权限管理 2. 技术选型 …

033_Structure_Static_In_Matlab求解结构静力学问题两套方法

结构静力学问题 静力学问现在是已经很简单的问题&#xff0c;在材料各向同性的情况下&#xff0c;对于弹性固体材料&#xff0c;很容易通过有限元求解。特别是线弹性问题&#xff0c;方程的矩阵形式可以很容易的写出&#xff08;准确得说是很容易通过有限元表达&#xff09;&a…

InsCode 桌面版 IDE 推荐及使用指南

小编的inscode部署项目&#xff1a;割绳子游戏。 引言 随着技术的发展&#xff0c;集成开发环境&#xff08;IDE&#xff09;已经成为现代软件开发中不可或缺的工具。InsCode 桌面版 IDE 作为一款由联想公司研发的强大开发工具&#xff0c;不仅提供了丰富的功能和高效的开发体…

推荐一款非常好用的视频编辑软件:Movavi Video Editor Plus

MovaviVideoEditorPlus(视频编辑软件)可以制作令人惊叹的视频&#xff0c;即使您没有任何视频编辑方面的经验! 该款视频编辑程序没有复杂的设置&#xff0c;只需进行直观的拖放控制。在您的电脑上免费使用MovaviVideoEditor亲身体验它的简单易用性与强大功能! 基本简介 您是否…