Caffeine 手动策略缓存 put() 方法源码解析

devtools/2024/11/6 23:10:26/

BoundedLocalManualCache put() 方法源码解析

先看一下BoundedLocalManualCache的类图

BoundedLocalManualCache

com.github.benmanes.caffeine.cache.BoundedLocalCache中定义的BoundedLocalManualCache静态内部类。

static class BoundedLocalManualCache<K, V> implements LocalManualCache<K, V>, Serializable

实现了LocalManualCache接口,这个接口提供了Cache接口的骨架实现,以最简的方式去实现一个LocalCache

详细查看LocalManualCache接口里定义的内容,代码也不多,直接贴到内容里:

interface LocalManualCache<K, V> extends Cache<K, V> {/** Returns the backing {@link LocalCache} data store. */LocalCache<K, V> cache();@Overridedefault long estimatedSize() {return cache().estimatedSize();}@Overridedefault void cleanUp() {cache().cleanUp();}@Overridedefault @Nullable V getIfPresent(Object key) {return cache().getIfPresent(key, /* recordStats */ true);}@Overridedefault @Nullable V get(K key, Function<? super K, ? extends V> mappingFunction) {return cache().computeIfAbsent(key, mappingFunction);}@Overridedefault Map<K, V> getAllPresent(Iterable<?> keys) {return cache().getAllPresent(keys);}@Overridedefault Map<K, V> getAll(Iterable<? extends K> keys,Function<Iterable<? extends K>, Map<K, V>> mappingFunction) {requireNonNull(mappingFunction);Set<K> keysToLoad = new LinkedHashSet<>();Map<K, V> found = cache().getAllPresent(keys);Map<K, V> result = new LinkedHashMap<>(found.size());for (K key : keys) {V value = found.get(key);if (value == null) {keysToLoad.add(key);}result.put(key, value);}if (keysToLoad.isEmpty()) {return found;}bulkLoad(keysToLoad, result, mappingFunction);return Collections.unmodifiableMap(result);}/*** Performs a non-blocking bulk load of the missing keys. Any missing entry that materializes* during the load are replaced when the loaded entries are inserted into the cache.*/default void bulkLoad(Set<K> keysToLoad, Map<K, V> result,Function<Iterable<? extends @NonNull K>, @NonNull Map<K, V>> mappingFunction) {boolean success = false;long startTime = cache().statsTicker().read();try {Map<K, V> loaded = mappingFunction.apply(keysToLoad);loaded.forEach((key, value) ->cache().put(key, value, /* notifyWriter */ false));for (K key : keysToLoad) {V value = loaded.get(key);if (value == null) {result.remove(key);} else {result.put(key, value);}}success = !loaded.isEmpty();} catch (RuntimeException e) {throw e;} catch (Exception e) {throw new CompletionException(e);} finally {long loadTime = cache().statsTicker().read() - startTime;if (success) {cache().statsCounter().recordLoadSuccess(loadTime);} else {cache().statsCounter().recordLoadFailure(loadTime);}}}@Overridedefault void put(K key, V value) {cache().put(key, value);}@Overridedefault void putAll(Map<? extends K, ? extends V> map) {cache().putAll(map);}@Overridedefault void invalidate(Object key) {cache().remove(key);}@Overridedefault void invalidateAll(Iterable<?> keys) {cache().invalidateAll(keys);}@Overridedefault void invalidateAll() {cache().clear();}@Overridedefault CacheStats stats() {return cache().statsCounter().snapshot();}@Overridedefault ConcurrentMap<K, V> asMap() {return cache();}
}

可以看到,CacheLoader接口定义了loadloadAllputputAllinvalidateinvalidateAllstatsasMap等方法,做一个简单实现。这些方法提供了缓存的基本操作,如加载缓存、添加缓存、移除缓存、获取缓存统计信息等。

Manual Cache 源码

static class BoundedLocalManualCache<K, V> implements LocalManualCache<K, V>, Serializable {private static final long serialVersionUID = 1;final BoundedLocalCache<K, V> cache;final boolean isWeighted;@Nullable Policy<K, V> policy;BoundedLocalManualCache(Caffeine<K, V> builder) {this(builder, null);}BoundedLocalManualCache(Caffeine<K, V> builder, @Nullable CacheLoader<? super K, V> loader) {cache = LocalCacheFactory.newBoundedLocalCache(builder, loader, /* async */ false);isWeighted = builder.isWeighted();}@Overridepublic BoundedLocalCache<K, V> cache() {return cache;}@Overridepublic Policy<K, V> policy() {return (policy == null)? (policy = new BoundedPolicy<>(cache, Function.identity(), isWeighted)): policy;}@SuppressWarnings("UnusedVariable")private void readObject(ObjectInputStream stream) throws InvalidObjectException {throw new InvalidObjectException("Proxy required");}Object writeReplace() {return makeSerializationProxy(cache, isWeighted);}}

定义了一个BoundedLocalCache属性,还有权重的标志位isWeighted,以及一个Policy属性。BoundedLocalManualCache的构造方法中,调用了LocalCacheFactory.newBoundedLocalCache方法,创建了一个BoundedLocalCache对象,并赋值给cache属性。policy属性则是在policy()方法中创建的。policy 是一个BoundedPolicy对象,它实现了Policy接口,用于管理缓存策略。BoundedPolicy源码紧接着就在BoundedLocalManualCache下面,这里就不贴出来了。

static final class BoundedPolicy<K, V> implements Policy<K, V>,里具体定义了了BoundedLocalCache缓存策略,比如缓存大小,缓存权重,缓存过期时间等。

接下来我们看BoundedLocalCacheput方法

手动使用调用cache.put(k, v);会调用put(key, value, expiry(), /* notifyWriter */ true, /* onlyIfAbsent */ false);
具体的参数解释如下:

  • key:要放入缓存的键。
  • value:要放入缓存的值。
  • expiry:缓存的过期时间,默认为Duration.ZERO,表示永不过期。
  • notifyWriter:是否通知写入者,默认为true
  • onlyIfAbsent:是否只在缓存中不存在该键时才放入,默认为false

put 方法源码如下:

@Nullable V put(K key, V value, Expiry<K, V> expiry, boolean notifyWriter, boolean onlyIfAbsent) {requireNonNull(key);requireNonNull(value);Node<K, V> node = null;long now = expirationTicker().read();int newWeight = weigher.weigh(key, value);for (;;) {// 获取 prior 节点Node<K, V> prior = data.get(nodeFactory.newLookupKey(key));if (prior == null) {// 如果不存在 prior 节点,则创建新的节点if (node == null) {node = nodeFactory.newNode(key, keyReferenceQueue(),value, valueReferenceQueue(), newWeight, now);setVariableTime(node, expireAfterCreate(key, value, expiry, now));}// notifyWriter 为 true 且存在Writer时,通知Writerif (notifyWriter && hasWriter()) {Node<K, V> computed = node;prior = data.computeIfAbsent(node.getKeyReference(), k -> {writer.write(key, value);return computed;});//    如果存在 prior 节点,调用 afterWrite 方法if (prior == node) {afterWrite(new AddTask(node, newWeight));return null;// 如果onlyIfAbsent 为 true。代表只在缓存中不存在该键时才放入缓存} else if (onlyIfAbsent) {V currentValue = prior.getValue();if ((currentValue != null) && !hasExpired(prior, now)) {if (!isComputingAsync(prior)) {tryExpireAfterRead(prior, key, currentValue, expiry(), now);setAccessTime(prior, now);}afterRead(prior, now, /* recordHit */ false);return currentValue;}}// 如果 notifyWriter 为 false,直接放入缓存} else {prior = data.putIfAbsent(node.getKeyReference(), node);if (prior == null) {afterWrite(new AddTask(node, newWeight));return null;} else if (onlyIfAbsent) {// An optimistic fast path to avoid unnecessary lockingV currentValue = prior.getValue();if ((currentValue != null) && !hasExpired(prior, now)) {if (!isComputingAsync(prior)) {tryExpireAfterRead(prior, key, currentValue, expiry(), now);setAccessTime(prior, now);}afterRead(prior, now, /* recordHit */ false);return currentValue;}}}} else if (onlyIfAbsent) {// An optimistic fast path to avoid unnecessary lockingV currentValue = prior.getValue();if ((currentValue != null) && !hasExpired(prior, now)) {if (!isComputingAsync(prior)) {tryExpireAfterRead(prior, key, currentValue, expiry(), now);setAccessTime(prior, now);}afterRead(prior, now, /* recordHit */ false);return currentValue;}}// 如果 prior != null,则说明该节点已经存在,则尝试获取锁V oldValue;long varTime;int oldWeight;boolean expired = false;boolean mayUpdate = true;boolean exceedsTolerance = false;synchronized (prior) {if (!prior.isAlive()) {continue;}oldValue = prior.getValue();oldWeight = prior.getWeight();// 如果 oldValue == null,通过 expireAfterCreate 方法计算过期时间,并删除key对应的值if (oldValue == null) {varTime = expireAfterCreate(key, value, expiry, now);writer.delete(key, null, RemovalCause.COLLECTED);// 返回prior是否过期,true,则删除key对应的值} else if (hasExpired(prior, now)) {expired = true;varTime = expireAfterCreate(key, value, expiry, now);writer.delete(key, oldValue, RemovalCause.EXPIRED);// 如果 onlyIfAbsent 为 true,则不更新key对应的值,返回新的过期时间} else if (onlyIfAbsent) {mayUpdate = false;varTime = expireAfterRead(prior, key, value, expiry, now);} else {varTime = expireAfterUpdate(prior, key, value, expiry, now);}// notifyWriter 为true,如果过期或者更新了值,则通知Writerif (notifyWriter && (expired || (mayUpdate && (value != oldValue)))) {writer.write(key, value);}// 如果mayUpdate为true,计算过期时间是否超出容忍度if (mayUpdate) {exceedsTolerance =(expiresAfterWrite() && (now - prior.getWriteTime()) > EXPIRE_WRITE_TOLERANCE)|| (expiresVariable()&& Math.abs(varTime - prior.getVariableTime()) > EXPIRE_WRITE_TOLERANCE);setWriteTime(prior, now);prior.setWeight(newWeight);prior.setValue(value, valueReferenceQueue());}// 设置访问时间和过期时间setVariableTime(prior, varTime);setAccessTime(prior, now);}// 如果在创建缓存时设置了移除监听器,则通知移除监听器if (hasRemovalListener()) {if (expired) {notifyRemoval(key, oldValue, RemovalCause.EXPIRED);} else if (oldValue == null) {notifyRemoval(key, /* oldValue */ null, RemovalCause.COLLECTED);} else if (mayUpdate && (value != oldValue)) {notifyRemoval(key, oldValue, RemovalCause.REPLACED);}}// 更新权重,判断是不是第一写入,如果是,调用afterWrite方法int weightedDifference = mayUpdate ? (newWeight - oldWeight) : 0;if ((oldValue == null) || (weightedDifference != 0) || expired) {afterWrite(new UpdateTask(prior, weightedDifference));// 判断 onlyIfAbsent 是否为 true,以及是否超过容忍度,如果超过容忍度,调用afterWrite方法} else if (!onlyIfAbsent && exceedsTolerance) {afterWrite(new UpdateTask(prior, weightedDifference));} else {if (mayUpdate) {setWriteTime(prior, now);}//执行 afterRead 方法afterRead(prior, now, /* recordHit */ false);}return expired ? null : oldValue;}}

案例中通过 cache.put(k,v)调用方法,走到这个方法中,因为是第一次尝试储存key和value,所以代码中声明的 node = null,获取的prior = nullif (prior == null),创建新节点,设置创建后过期时间。notifyWriter=truehasWriter=false,执行else中方法

          prior = data.putIfAbsent(node.getKeyReference(), node);if (prior == null) {afterWrite(new AddTask(node, newWeight));return null;} else if (onlyIfAbsent) {// An optimistic fast path to avoid unnecessary lockingV currentValue = prior.getValue();if ((currentValue != null) && !hasExpired(prior, now)) {if (!isComputingAsync(prior)) {tryExpireAfterRead(prior, key, currentValue, expiry(), now);setAccessTime(prior, now);}afterRead(prior, now, /* recordHit */ false);return currentValue;}}

putIfAbsent 方法:由于data中不存在我们的key,value,返回 null,调用 afterWrite() 方法,将任务放入writeBuffer中,调用scheduleAfterWrite()方法

  void afterWrite(Runnable task) {for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {if (writeBuffer.offer(task)) {scheduleAfterWrite();return;}scheduleDrainBuffers();}

scheduleAfterWrite()方法:

  void scheduleAfterWrite() {for (;;) {switch (drainStatus()) {case IDLE:casDrainStatus(IDLE, REQUIRED);scheduleDrainBuffers();return;case REQUIRED:scheduleDrainBuffers();return;case PROCESSING_TO_IDLE:if (casDrainStatus(PROCESSING_TO_IDLE, PROCESSING_TO_REQUIRED)) {return;}continue;case PROCESSING_TO_REQUIRED:return;default:throw new IllegalStateException();}}}

看到这我其实是有点蒙了,因为笔者的异步编程基础薄弱,只看方法名字做一个不负责任的猜想,写入后安排异步任务,条件符合执行清理计划,会继续调用 scheduleDrainBuffers() 方法

scheduleDrainBuffers() 方法:

void scheduleDrainBuffers() {if (drainStatus() >= PROCESSING_TO_IDLE) {return;}if (evictionLock.tryLock()) {try {int drainStatus = drainStatus();if (drainStatus >= PROCESSING_TO_IDLE) {return;}lazySetDrainStatus(PROCESSING_TO_IDLE);executor.execute(drainBuffersTask);} catch (Throwable t) {logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);maintenance(/* ignored */ null);} finally {evictionLock.unlock();}}}

drainStatus() 就是返回这条件的值,如果大于等于 PROCESSING_TO_IDLE 就直接返回,否则执行 tryLock() 方法,如果成功,则执行 executor.execute(drainBuffersTask); 方法,否则执行 maintenance() 方法,这个方法就是执行清理任务的方法。

传进来的drainBuffersTask是一个PerformCleanupTask,这个类实现了Runnable接口,重写了run()方法,这个方法就是执行清理任务的方法。

    @Overridepublic void run() {BoundedLocalCache<?, ?> cache = reference.get();if (cache != null) {cache.performCleanUp(/* ignored */ null);}}

继续看performCleanUp()方法:

  void performCleanUp(@Nullable Runnable task) {evictionLock.lock();try {maintenance(task);} finally {evictionLock.unlock();}if ((drainStatus() == REQUIRED) && (executor == ForkJoinPool.commonPool())) {scheduleDrainBuffers();}}

可以看到,这里也是调用了maintenance()方法,然后判断drainStatus()是否等于REQUIRED,如果等于,则调用scheduleDrainBuffers()方法。

@GuardedBy("evictionLock")void maintenance(@Nullable Runnable task) {lazySetDrainStatus(PROCESSING_TO_IDLE);try {drainReadBuffer();drainWriteBuffer();if (task != null) {task.run();}drainKeyReferences();drainValueReferences();expireEntries();evictEntries();climb();} finally {if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {lazySetDrainStatus(REQUIRED);}}}

maintenance() 是实际的清理方法,它首先将drainStatus()设置为PROCESSING_TO_IDLE,然后调用drainReadBuffer()drainWriteBuffer()drainKeyReferences()drainValueReferences()expireEntries()evictEntries()climb()等方法,清理读写缓冲区、过期条目、驱逐条目等。

到这里,afterWrite()基本就执行完了,写入一次(key,value),都会去判断是否需要清理,如果需要清理,就异步调用maintenance()方法进行清理。

如果是给已经存在的key设置值,put方法执行到最后会调用 afterRead()方法

  void afterRead(Node<K, V> node, long now, boolean recordHit) {if (recordHit) {statsCounter().recordHits(1);}boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);if (shouldDrainBuffers(delayable)) {scheduleDrainBuffers();}refreshIfNeeded(node, now);}

afterRead()方法会记录命中次数,然后判断是否需要延迟写入缓冲区,如果需要延迟写入缓冲区,则将节点放入读取缓冲区,如果读取缓冲区已满,则调用scheduleDrainBuffers()方法异步清理缓冲区,最后调用refreshIfNeeded()方法异步刷新节点。

refreshIfNeeded()方法会根据节点的过期时间、访问时间、更新时间等判断是否需要刷新节点,如果需要刷新节点,则调用refresh()方法刷新节点。

本例中没有设置过期时间,直接返回。

总结

本文算是比较详细的把put()方法执行流程分析了一遍,通过分析put()方法,我们可以了解到Caffeine缓存的基本原理,以及如何使用Caffeine缓存,学习如何自己实现一个本地缓存的 put()方法,怎样执行一个异步的清理任务,怎样判断是否需要清理,怎样异步刷新节点等等。

笔者也是一个小菜鸟,刚开始看一些源码,可能有些地方理解的不对,欢迎指正,谢谢!

希望本文对你有所帮助,如果有任何问题,欢迎在评论区留言讨论。


http://www.ppmy.cn/devtools/131859.html

相关文章

WPF+MVVM案例实战(二十)- 制作一个雷达辐射效果的按钮

文章目录 1、案例效果2、文件创建与代码实现1、创建文件2、图标资源文件3、源代码获取1、案例效果 2、文件创建与代码实现 1、创建文件 打开 Wpf_Examples 项目,在 Views 文件夹下创建窗体界面 RadarEffactWindow.xaml 。代码功能分两个部分完成,一个是样式,一个是动画。页…

C语言:初识入门篇

专栏说明&#xff1a;本专栏用于C语言学习&#xff0c;文章中出现的代码由C语言实现&#xff0c;如对你学习有所帮助&#xff0c;可以点赞收藏关注&#xff0c;感谢三连。 博客主页&#xff1a;Duck Bro 博客主页系列专栏&#xff1a;C语言专栏关注博主&#xff0c;后期持续更新…

利用pythonstudio写的PDF、图片批量水印生成器,可同时为不同读者生成多组水印

现在很多场合需要将PDF或图片加水印&#xff0c;本程序利用pythonstudio编写。 第一步 界面 其中&#xff1a; LstMask:列表框 PopupMenu:PmnMark LstFiles:列表框 PopupMenu:PmnFiles OdFiles:文件选择器 Filter:PDF文件(.PDF)|.PDF|图像文件(.JPG)|.JPG|图像文件(.png…

uniapp使用echart

一 直线图 三中国地图 <template><view class"content"><l-echart ref"chartRef"></l-echart></view> </template><script setup> import { ref, onMounted } from "vue"; import geoJson from &quo…

利用游戏引擎的优势

大家好&#xff0c;我是小蜗牛。 在当今快速发展的游戏产业中&#xff0c;选择合适的游戏引擎对开发者来说至关重要。Cocos Creator作为一款功能强大且灵活的游戏引擎&#xff0c;为开发者提供了丰富的工具和资源&#xff0c;使他们能够高效地开发出优秀的游戏。本文将探讨如何…

24.11.6 PySimpleGUI库和pymsql 库以及人脸识别小项目

PySimpleGUI 库 PySimpleGUI 是一个用于简化 GUI 编程的 Python 包&#xff0c;它封装了多种底层 GUI 框架&#xff08;如 tkinter、Qt、WxPython 等&#xff09;&#xff0c;提供了简单易用的 API。PySimpleGUI 包含了大量的控件&#xff08;也称为小部件或组件&#xff09;&…

国家级汽车检测中心联合开源网安打造安全解决方案,提升行业安全检测水平

某汽车检测中心是我国交通运输行业智能商用车领域的国家级检测中心&#xff0c;以商用车智能化、网联化检测能力引领&#xff0c;拥有交通运输部及工业和信息化部认定的“智能网联汽车自动驾驶封闭测试场地测试基地”和“交通运输部自动驾驶交通技术研发中心”&#xff0c;主要…

GPT-SoVITS 部署方案

简介 当前主流的开源TTS框架&#xff0c;这里介绍部署该服务的主要流程和我在使用过程中出现的问题。 使用的技术 Docker、Jupyter、Python、C# 部署 docker的使用 拉取命令 docker pull jupyter/base-notebook:python-3.10.11jupyter的访问 docker运行以后可以直接使用…