JetCache的源码

1
$ git clone git@github.com:alibaba/jetcache.git

Cache

Cache接口定义和一些了的新增, 插入, 移除等操作方法, 除此之外还有tryLock方法的默认实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
default AutoReleaseLock tryLock(K key, long expire, TimeUnit timeUnit) {
if (key == null) {
return null;
}
final String uuid = UUID.randomUUID().toString();
final long expireTimestamp = System.currentTimeMillis() + timeUnit.toMillis(expire);
// 配置有些阈值要用其判断循环次数.
final CacheConfig config = config();
// 实现AutoReleaseLock的解锁操作, 是通过缓存来实现的锁
AutoReleaseLock lock = () -> {
int unlockCount = 0;
while (unlockCount++ < config.getTryLockUnlockCount()) {
if(System.currentTimeMillis() < expireTimestamp) {
// 锁未过期时, 尝试移除锁
CacheResult unlockResult = REMOVE(key);
if (unlockResult.getResultCode() == CacheResultCode.FAIL
|| unlockResult.getResultCode() == CacheResultCode.PART_SUCCESS) {
// 失败和半成功都需要重试
} else if (unlockResult.isSuccess()) {
// 锁移除成功
return;
} else {
// NOT_EXISTS, EXISTS, EXPIRED 等状态为异常状态, 兜底直接解
return;
}
} else {
// 超时了直接解锁
return;
}
}
};
// 加锁操作, 是通过缓存来实现的锁
int lockCount = 0;
Cache cache = this;
// 循环尝试加锁, 配置中包含阈值
while (lockCount++ < config.getTryLockLockCount()) {
CacheResult lockResult = cache.PUT_IF_ABSENT(key, uuid, expire, timeUnit);
if (lockResult.isSuccess()) {
// 成功向缓存插入记录, 即获取了锁
return lock;
} else if (lockResult.getResultCode() == CacheResultCode.FAIL || lockResult.getResultCode() == CacheResultCode.PART_SUCCESS) {
// 失败或者半成功
int inquiryCount = 0;
while (inquiryCount++ < config.getTryLockInquiryCount()) {
// 循环查询刚刚put的锁, 等待加锁完成
CacheGetResult inquiryResult = cache.GET(key);
if (inquiryResult.isSuccess()) {
if (uuid.equals(inquiryResult.getValue())) {
// 查询加锁成功并且uuid和当前线程生成的一毛一样, 表示是自己刚刚put的锁.
return lock;
} else {
// 查询加锁成功,但是内部的value不是自己生成的, 这个说不是自己刚刚put的.
return null;
}
} else {
// 配置的阈值范围内, 循环验证刚加的锁是否成功.
}
}
} else {
// 其他方控制了锁, 返回未获取到锁
return null;
}
return null;
}

AbstractCache

使用了模板方法模式, 实现了Cache接口, 支持了各种事件发布的功能(CacheEvent的实现), 并且对外暴露abstract方法, 供不同Cache实现自己的新增, 插入, 移除等操作

实现了同步加载的静态方法synchronizedLoad, 目的是为了支持computeIfAbsent的逻辑, computeIfAbsent会在查找缓存不到的时候, 自动执行load, 但是会有缓存穿透的风险, 大量请求可能导致一些问题(load操作很耗资源的情况下)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// 支持computeIfAbsent方法
static <K, V> V computeIfAbsentImpl(K key, Function<K, V> loader, boolean cacheNullWhenLoaderReturnNull,
long expireAfterWrite, TimeUnit timeUnit, Cache<K, V> cache) {
// 这是个强转型
AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache);
// 使用了函数式接口, 并使用了代理模式,创建了一个缓存load代理对象,用于记录load耗时
CacheLoader<K, V> newLoader = CacheUtil.createProxyLoader(cache, loader, abstractCache::notify);
CacheGetResult<V> r;
if (cache instanceof RefreshCache) {
// RefreshCache暂时不看
RefreshCache<K, V> refreshCache = ((RefreshCache<K, V>) cache);
r = refreshCache.GET(key);
refreshCache.addOrUpdateRefreshTask(key, newLoader);
} else {
// 获取一个CacheGetResult
r = cache.GET(key);
}
if (r.isSuccess()) {
// 成功直接返回
return r.getValue();
} else {
// 不成功需要load

// 定义插入缓存的Consumer
Consumer<V> cacheUpdater = (loadedValue) -> {
if(needUpdate(loadedValue, cacheNullWhenLoaderReturnNull, newLoader)) {
if (timeUnit != null) {
cache.PUT(key, loadedValue, expireAfterWrite, timeUnit).waitForResult();
} else {
cache.PUT(key, loadedValue).waitForResult();
}
}
};

V loadedValue;
// 根据配置判断是否需要防穿透
if (cache.config().isCachePenetrationProtect()) {
loadedValue = synchronizedLoad(cache.config(), abstractCache, key, newLoader, cacheUpdater);
} else {
// 如果不防, 先loader出数据, 然后更新缓存
loadedValue = newLoader.apply(key);
cacheUpdater.accept(loadedValue);
}

return loadedValue;
}
}
// 同步load
static <K, V> V synchronizedLoad(CacheConfig config, AbstractCache<K,V> abstractCache,
K key, Function<K, V> newLoader, Consumer<V> cacheUpdater) {
// 获取一个ConcurrentHashMap的单例对象
ConcurrentHashMap<Object, LoaderLock> loaderMap = abstractCache.initOrGetLoaderMap();
// key可用自定义keyConvertor
Object lockKey = buildLoaderLockKey(abstractCache, key);
while (true) {
// 数组标记位, 记录是否被创建
boolean create[] = new boolean[1];
// 往loaderMap尝试插入, 插入成功标记为为true
LoaderLock ll = loaderMap.computeIfAbsent(lockKey, (unusedKey) -> {
create[0] = true;
LoaderLock loaderLock = new LoaderLock();
// LoaderLock对象中有个CountDownLatch, 并记录了当前线程
loaderLock.signal = new CountDownLatch(1);
loaderLock.loaderThread = Thread.currentThread();
return loaderLock;
});
// 如果标记物为true, 或者LoaderLock对象存在, 创建者是本线程, 即支持重入.
if (create[0] || ll.loaderThread == Thread.currentThread()) {
// 执行load
try {
V loadedValue = newLoader.apply(key);
ll.success = true;
ll.value = loadedValue;
// 让传入的cacheUpdater更新缓存
cacheUpdater.accept(loadedValue);
return loadedValue;
} finally {
// 如果loaderMap的写入者是自己,释放资源
if (create[0]) {
ll.signal.countDown();
loaderMap.remove(lockKey);
}
}
} else {
// 如果创建者不是自己, 并且LoaderLock对象不做当前线程所创建的
try {
// 查配置,穿透保护的超时时间, 选择使用哪种await方法来等待
Duration timeout = config.getPenetrationProtectTimeout();
if (timeout == null) {
// 没有则永久等待
ll.signal.await();
} else {
// 有时间的等待
boolean ok = ll.signal.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
if(!ok) {
// 等待CountDownLatch计时结束, 如果时间过了, ll.signal.countDown();还没被调用, 证明是loader太久了, 当前线程就等不及了开始load
return newLoader.apply(key);
}
}
} catch (InterruptedException e) {
// 等待被中断也等不及, 当前线程马上load
return newLoader.apply(key);
}
// 查看ConcurrentHashMap中的LoaderLock中的标记是否成功, 成功表示load完成, 返回LoaderLock对象中的结果, 否则继续下一波循环.
if (ll.success) {
return (V) ll.value;
} else {
continue;
}

}
}
}

AbstractEmbeddedCache

继承自AbstractCache, 加了个InnerMap的成员变量, InnerMap属于适配器模式, 用来屏蔽不同本地缓存的差异性(jetcache目前提供了LinkedHashMapCache和CaffeineCache,如果用的不爽可以自定义一个缓存集合,用InnerMap来适配一下增删改查等方法)

并且AbstractEmbeddedCache引入了CacheValueHolder, 是对缓存中value的包装, 其中包含了过期时间.

CaffeineCache

基于内存的缓存,使用Caffeine

需要引入:

1
2
3
4
5
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.8.1</version>
</dependency>

Caffeine在算法上对本地缓存做了优化, 缓存性能接近理论最优, 采用了一种结合LRU、LFU优点的算法:W-TinyLFU

TinyLfu 回收策略,提供了一个近乎最佳的命中率。当数据的访问模式不随时间变化的时候,LFU的策略能够带来最佳的缓存命中率。然而LFU有两个缺点:首先,它需要给每个记录项维护频率信息,每次访问都需要更新,这是个巨大的开销;其次,如果数据访问模式随时间有变,LFU的频率信息无法随之变化,因此早先频繁访问的记录可能会占据缓存,而后期访问较多的记录则无法被命中。因此,大多数的缓存设计都是基于LRU或者其变种来进行的。相比之下,LRU并不需要维护昂贵的缓存记录元信息,同时也能够反应随时间变化的数据访问模式。然而,在许多负载之下,LRU依然需要更多的空间才能做到跟,FU一致的缓存命中率。因此,一个“现代”的缓存,应当能够综合两者的长处。TinyLFU维护了近期访问记录的频率信息,作为一个过滤器,当新记录来时,只有满足TinyLFU要求的记录才可以被插入缓存。

LinkedHashMapCache

自制的简易内存缓存,没有任何依赖

使用LinkedHashMap实现了一个LRUMap, 缓存过期依赖于Cleaner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Cleaner {
// 内部一个LinkedList,保存每一个LinkedHashMapCache的弱引用
static LinkedList<WeakReference<LinkedHashMapCache>> linkedHashMapCaches = new LinkedList<>();

static {
ScheduledExecutorService executorService = JetCacheExecutor.defaultExecutor();
executorService.scheduleWithFixedDelay(() -> run(), 60, 60, TimeUnit.SECONDS);
}

static void add(LinkedHashMapCache cache) {
synchronized (linkedHashMapCaches) {
linkedHashMapCaches.add(new WeakReference<>(cache));
}
}

static void run() {
synchronized (linkedHashMapCaches) {
// 遍历每个Cache, 执行Cache的cleanExpiredEntry来做定时过期
Iterator<WeakReference<LinkedHashMapCache>> it = linkedHashMapCaches.iterator();
while (it.hasNext()) {
WeakReference<LinkedHashMapCache> ref = it.next();
LinkedHashMapCache c = ref.get();
if (c == null) {
it.remove();
} else {
c.cleanExpiredEntry();
}
}
}
}

}

AbstractExternalCache

外部存储的抽象, 由于是外部的存储, 所以涉及到Value的序列化和反序列化

RedisCache

redis实现,使用jedis客户端

RedisCache可以做读写分离, 并且可以设置每个slave节点的权重, 相关代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// RedisCache的初始化
public RedisCache(RedisCacheConfig<K, V> config) {
super(config);
this.config = config;
this.valueEncoder = config.getValueEncoder();
this.valueDecoder = config.getValueDecoder();

if (config.getJedisPool() == null) {
throw new CacheConfigException("no pool");
}
// 注意下面config中的配置对其影响
if (config.isReadFromSlave()) {
if (config.getJedisSlavePools() == null || config.getJedisSlavePools().length == 0) {
throw new CacheConfigException("slaves not config");
}
if (config.getSlaveReadWeights() == null) {
initDefaultWeights(config);
} else if (config.getSlaveReadWeights().length != config.getJedisSlavePools().length) {
initDefaultWeights(config);
}
}
if (config.isExpireAfterAccess()) {
throw new CacheConfigException("expireAfterAccess is not supported");
}
}
// 初始化默认权重
private void initDefaultWeights(RedisCacheConfig<K, V> config) {
int len = config.getJedisSlavePools().length;
int[] weights = new int[len];
Arrays.fill(weights, 100);
config.setSlaveReadWeights(weights);
}
// 获取读节点
Pool<Jedis> getReadPool() {
if (!config.isReadFromSlave()) {
return config.getJedisPool();
}
int[] weights = config.getSlaveReadWeights();
int index = randomIndex(weights);
return config.getJedisSlavePools()[index];
}
// 根据权重来
static int randomIndex(int[] weights) {
int sumOfWeights = 0;
for (int w : weights) {
sumOfWeights += w;
}
int r = random.nextInt(sumOfWeights);
int x = 0;
for (int i = 0; i < weights.length; i++) {
x += weights[i];
if(r < x){
return i;
}
}
throw new CacheException("assert false");
}
// 批量插入操作使用了redis的Pipeline
protected CacheResult do_PUT_ALL(Map<? extends K, ? extends V> map, long expireAfterWrite, TimeUnit timeUnit) {
try (Jedis jedis = config.getJedisPool().getResource()) {
int failCount = 0;
List<Response<String>> responses = new ArrayList<>();
// Pipeline
Pipeline p = jedis.pipelined();
for (Map.Entry<? extends K, ? extends V> en : map.entrySet()) {
CacheValueHolder<V> holder = new CacheValueHolder(en.getValue(), timeUnit.toMillis(expireAfterWrite));
Response<String> resp = p.psetex(buildKey(en.getKey()), timeUnit.toMillis(expireAfterWrite), valueEncoder.apply(holder));
responses.add(resp);
}
// 同步
p.sync();
for (Response<String> resp : responses) {
if(!"OK".equals(resp.get())){
failCount++;
}
}
return failCount == 0 ? CacheResult.SUCCESS_WITHOUT_MSG :
failCount == map.size() ? CacheResult.FAIL_WITHOUT_MSG : CacheResult.PART_SUCCESS_WITHOUT_MSG;
} catch (Exception ex) {
return new CacheResult(ex);
}
}
RedisLettuceCache

redis实现,使用lettuce客户端

redis操作的lettuce客户端版本, 本文没啥好说的, 有时间写篇介绍lettuce.

Jedis是直连模式,在多个线程间共享一个Jedis实例时是线程不安全的,可以通过创建多个Jedis实例来解决,但当连接数量增多时,物理连接成本就较高同时会影响性能,因此较好的解决方法是使用JedisPool。
Lettuce的连接是基于Netty的,连接实例可以在多个线程间共享,Netty可以使多线程的应用使用同一个连接实例,而不用担心并发线程的数量。通过异步的方式可以让我们更好地利用系统资源。

MultiLevelCache

多级缓存,注解方式配置只支持了两级,实际上这个类支持N级

内部有个Cache数组, 从这里可以知道其是对Cache的包装, 并支持多个Cache, 子Cache不允许配置load

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public MultiLevelCache(MultiLevelCacheConfig<K, V> cacheConfig) throws CacheConfigException {
this.config = cacheConfig;
this.caches = cacheConfig.getCaches().toArray(new Cache[]{});
checkCaches();
}
private void checkCaches() {
if (caches == null || caches.length == 0) {
throw new IllegalArgumentException();
}
for (Cache c : caches) {
// 配置了会报错
if (c.config().getLoader() != null) {
throw new CacheConfigException("Loader on sub cache is not allowed, set the loader into MultiLevelCache.");
}
}
}

取数据时, 按照顺序从Cache数组中的各个元素中取, 如果上级没有命中, 下级命中了, 会有一个往上级同步的操作.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Override
protected CacheGetResult<V> do_GET(K key) {
for (int i = 0; i < caches.length; i++) {
Cache cache = caches[i];
CacheGetResult result = cache.GET(key);
if (result.isSuccess()) {
CacheValueHolder<V> holder = unwrapHolder(result.getHolder());
checkResultAndFillUpperCache(key, i, holder);
return new CacheGetResult(CacheResultCode.SUCCESS, null, holder);
}
}
return CacheGetResult.NOT_EXISTS_WITHOUT_MSG;
}
// i是索引, PUT_caches会插入i的所有上级Cache
private void checkResultAndFillUpperCache(K key, int i, CacheValueHolder<V> h) {
Objects.requireNonNull(h);
long currentExpire = h.getExpireTime();
long now = System.currentTimeMillis();
if (now <= currentExpire) {
if(config.isUseExpireOfSubCache()){
PUT_caches(i, key, h.getValue(), 0, null);
} else {
long restTtl = currentExpire - now;
if (restTtl > 0) {
PUT_caches(i, key, h.getValue(), restTtl, TimeUnit.MILLISECONDS);
}
}
}
}
// 多个cache的put可能会某些成功,某些失败, 这里用到CompletableFuture的thenCombine进行结合, 如果发现结果码不同则返回半成功状态的ResultData
// lastIndex表示 更新lastIndex之前的所有缓存
private CacheResult PUT_caches(int lastIndex, K key, V value, long expire, TimeUnit timeUnit) {
CompletableFuture<ResultData> future = CompletableFuture.completedFuture(null);
for (int i = 0; i < lastIndex; i++) {
Cache cache = caches[i];
CacheResult r;
if (timeUnit == null) {
r = cache.PUT(key, value);
} else {
r = cache.PUT(key, value, expire, timeUnit);
}
future = combine(future, r);
}
return new CacheResult(future);
}
private CompletableFuture<ResultData> combine(CompletableFuture<ResultData> future, CacheResult result) {
return future.thenCombine(result.future(), (d1, d2) -> {
if (d1 == null) {
return d2;
}
if (d1.getResultCode() != d2.getResultCode()) {
return new ResultData(CacheResultCode.PART_SUCCESS, null, null);
}
return d1;
});
}

CacheResultCode.PART_SUCCESS 在 MultiLevelCache 下会出现, 表示操作多级缓存时, 某一部分没有操作成功

MultiLevelCache的tryLock仅仅使用最后一级Cache的tryLock

1
2
3
4
5
6
public AutoReleaseLock tryLock(K key, long expire, TimeUnit timeUnit) {
if (key == null) {
return null;
}
return caches[caches.length - 1].tryLock(key, expire, timeUnit);
}

MultiLevelCache不支持putIfAbsent和do_PUT_IF_ABSENT方法, 会报错.

1
2
3
4
5
6
7
8
@Override
public boolean putIfAbsent(K key, V value) {
throw new UnsupportedOperationException("putIfAbsent is not supported by MultiLevelCache");
}
@Override
protected CacheResult do_PUT_IF_ABSENT(K key, V value, long expireAfterWrite, TimeUnit timeUnit) {
throw new UnsupportedOperationException("PUT_IF_ABSENT is not supported by MultiLevelCache");
}

ProxyCache

ProxyCache接口继承了Cache接口, 作用是代理Cache实现一些牛批的功能, 例如下面

LoadingCache

基于Decorator模式,提供自动加载功能

主要是对get和getAll两个方法的处理, 如果配置了load, 这两方法将会在娶不到数据的时候自动load

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@Override
public V get(K key) throws CacheInvokeException {
CacheLoader<K, V> loader = config.getLoader();
if (loader != null) {
return AbstractCache.computeIfAbsentImpl(key, loader,
config.isCacheNullValue() ,0, null, this);
} else {
// 没配置load不会报错, 而是走被包装cache的get
return cache.get(key);
}
}
@Override
public Map<K, V> getAll(Set<? extends K> keys) throws CacheInvokeException {
CacheLoader<K, V> loader = config.getLoader();
if (loader != null) {
// 先取批量的结果
MultiGetResult<K, V> r = GET_ALL(keys);
Map<K, V> kvMap;
if (r.isSuccess() || r.getResultCode() == CacheResultCode.PART_SUCCESS) {
// 成功或者半成功取出 所有成功了的结果
kvMap = r.unwrapValues();
} else {
kvMap = new HashMap<>();
}
// 找出没有成功的key的集合, 这些需要被load
Set<K> keysNeedLoad = new LinkedHashSet<>();
keys.forEach((k) -> {
if (!kvMap.containsKey(k)) {
keysNeedLoad.add(k);
}
});
// 校验穿透保护开关
if (!config.isCachePenetrationProtect()) {
// 直接找出需要update的Values, 一个PUT_ALL方法load进去
if (eventConsumer != null) {
loader = CacheUtil.createProxyLoader(cache, loader, eventConsumer);
}
Map<K, V> loadResult;
try {
loadResult = loader.loadAll(keysNeedLoad);

CacheLoader<K, V> theLoader = loader;
Map<K, V> updateValues = loadResult.entrySet().stream()
.filter(kvEntry -> needUpdate(kvEntry.getValue(), theLoader))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

// batch put
if (!updateValues.isEmpty()) {
PUT_ALL(updateValues);
}
} catch (Throwable e) {
throw new CacheInvokeException(e);
}
kvMap.putAll(loadResult);
} else {
// 循环keysNeedLoad, 依次调AbstractCache.synchronizedLoad方法来做同步load
AbstractCache<K, V> abstractCache = CacheUtil.getAbstractCache(cache);
loader = CacheUtil.createProxyLoader(cache, loader, eventConsumer);
for(K key : keysNeedLoad) {
Consumer<V> cacheUpdater = (v) -> {
if(needUpdate(v, config.getLoader())) {
PUT(key, v);
}
};
V v = AbstractCache.synchronizedLoad(config, abstractCache, key, loader, cacheUpdater);
kvMap.put(key, v);
}
}
return kvMap;
} else {
return cache.getAll(keys);
}

}

RefreshCache

基于Decorator模式,提供自动刷新功能

继承自LoadingCache, 即他的get和getAll能够自动load, 除此之外, 他自己做了这些工作:

使用ConcurrentHashMap维护了一个刷新任务集合, 为了方便批量操作,例如停止刷新, 并且用boolean字段标识代理的Cache是不是一个MultiLevelCache(多级缓存和单级的处理逻辑不一样,下面关注下)

RefreshCache同样也是对get和getAll做了增强, 添加了addOrUpdateRefreshTask的操作.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public V get(K key) throws CacheInvokeException {
if (config.getRefreshPolicy() != null && hasLoader()) {
addOrUpdateRefreshTask(key, null);
}
return super.get(key);
}

@Override
public Map<K, V> getAll(Set<? extends K> keys) throws CacheInvokeException {
if (config.getRefreshPolicy() != null && hasLoader()) {
for (K key : keys) {
addOrUpdateRefreshTask(key, null);
}
}
return super.getAll(keys);
}

addOrUpdateRefreshTask 主要做了针对一个缓存的key, 创建一个RefreshTask任务, RefreshTask继承了Runnable, 它是个可被执行的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected void addOrUpdateRefreshTask(K key, CacheLoader<K,V> loader) {
// 看看有没有配置刷新策略, 没有策略不会刷新, 这点注意别忘记配置了
RefreshPolicy refreshPolicy = config.getRefreshPolicy();
if (refreshPolicy == null) {
return;
}
long refreshMillis = refreshPolicy.getRefreshMillis();
// 刷新时间不能小于等于0, 否则也不会刷新
if (refreshMillis > 0) {
Object taskId = getTaskId(key);
RefreshTask refreshTask = taskMap.computeIfAbsent(taskId, tid -> {
logger.debug("add refresh task. interval={}, key={}", refreshMillis , key);
RefreshTask task = new RefreshTask(taskId, key, loader);
task.lastAccessTime = System.currentTimeMillis();
// 有一个单例的线程池heavyIOExecutor, 定时处理task
ScheduledFuture<?> future = JetCacheExecutor.heavyIOExecutor().scheduleWithFixedDelay(
task, refreshMillis, refreshMillis, TimeUnit.MILLISECONDS);
task.future = future;
return task;
});
// 更新最后请求时间
refreshTask.lastAccessTime = System.currentTimeMillis();
}
}

从上可知道, 一个支持刷新的缓存, 只有get或者getAll方法调用后才会去启动刷新任务, 这样做避免了无谓的资源浪费, 即一个可刷新缓存被put后, 一直没有被获取, 那么他不会自动刷新.

具体的刷新逻辑

RefreshTask的run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public void run() {
try {
// 没有刷新策略或没有配置load不会跑任何load逻辑
if (config.getRefreshPolicy() == null || (loader == null && !hasLoader())) {
cancel();
return;
}
long now = System.currentTimeMillis();
long stopRefreshAfterLastAccessMillis = config.getRefreshPolicy().getStopRefreshAfterLastAccessMillis();
// 通过这个配置,如果一个key很久没有被请求了,超过阈值,停止刷新任务, 以免浪费资源
if (stopRefreshAfterLastAccessMillis > 0) {
if (lastAccessTime + stopRefreshAfterLastAccessMillis < now) {
cancel();
return;
}
}
// 如果是多级缓存, 会取最后一级缓存来更新, 最后一级缓存会驱动所有上级缓存更新.
Cache concreteCache = concreteCache();
if (concreteCache instanceof AbstractExternalCache) {
// 如果是三方
externalLoad(concreteCache, now);
} else {
load();
}
} catch (Throwable e) {
logger.error("refresh error: key=" + key, e);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private void externalLoad(final Cache concreteCache, final long currentTime) throws Throwable {
// 需要更新的缓存的key, 因为是三方的缓存, 所以要用buildKey生成
byte[] newKey = ((AbstractExternalCache) concreteCache).buildKey(key);
// 构建锁key
byte[] lockKey = combine(newKey, "_#RL#".getBytes());
long loadTimeOut = RefreshCache.this.config.getRefreshPolicy().getRefreshLockTimeoutMillis();
long refreshMillis = config.getRefreshPolicy().getRefreshMillis();
// 构建时间key
byte[] timestampKey = combine(newKey, "_#TS#".getBytes());

// AbstractExternalCache buildKey method will not convert byte[]
CacheGetResult refreshTimeResult = concreteCache.GET(timestampKey);
boolean shouldLoad = false;
if (refreshTimeResult.isSuccess()) {
// 当前时间 大于 刷新时间戳加刷新时间 表示很久没刷新了, 需要load一下, 否则不需要
shouldLoad = currentTime >= Long.parseLong(refreshTimeResult.getValue().toString()) + refreshMillis;
} else if (refreshTimeResult.getResultCode() == CacheResultCode.NOT_EXISTS) {
// 第一次刷新, get不到refreshTimeResult 也需要load
shouldLoad = true;
}
// 不需要load
if (!shouldLoad) {
if (multiLevelCache) {
// 如果是多级缓存,需要刷新上级所有缓存
refreshUpperCaches(key);
}
return;
}
// 定义一个runable对象, 内部执行load, 并且load后刷新最后一次load时间到timestampKey
Runnable r = () -> {
try {
load();
// AbstractExternalCache buildKey method will not convert byte[]
concreteCache.put(timestampKey, String.valueOf(System.currentTimeMillis()));
} catch (Throwable e) {
throw new CacheException("refresh error", e);
}
};

// AbstractExternalCache buildKey method will not convert byte[]
// 尝试获取锁, 并且执行加载操作
boolean lockSuccess = concreteCache.tryLockAndRun(lockKey, loadTimeOut, TimeUnit.MILLISECONDS, r);
if(!lockSuccess && multiLevelCache) {
// 多级缓存如果没有load成功, 延迟一会会, 执行refreshUpperCaches刷新上级的缓存
// 因为获取lockKey失败表示其他的实例可能在做refresh, 可能导致本地缓存错误, 延迟的目的是为了等待其他实例完成, 以免更新旧的值,
// 如果refreshMillis配置有问题, 可能导致某些服务实例上的缓存有误差.
JetCacheExecutor.heavyIOExecutor().schedule(
() -> refreshUpperCaches(key), (long)(0.2 * refreshMillis), TimeUnit.MILLISECONDS);
}
}

private void load() throws Throwable {
// 通过load来获取最新值, 然后PUT到对应的key中
CacheLoader<K,V> l = loader == null? config.getLoader(): loader;
if (l != null) {
l = CacheUtil.createProxyLoader(cache, l, eventConsumer);
V v = l.load(key);
if (needUpdate(v, l)) {
cache.PUT(key, v);
}
}
}

CacheMonitor与CacheStat

CacheMonitor主要的作用是监听CacheEvent, jetCache实现了一个默认的CacheMonitor—–DefaultCacheMonitor;

DefaultCacheMonitor 里面包含了CacheStat的成员变量, CacheStat用来描述jetCache管理的缓存属性信息

一个Cache对应一个DefaultCacheMonitor, 一个DefaultCacheMonitor对应一个CacheStat, 每有个CacheEvent的发布, 都会通知到CacheMonitor, 并更新CacheStat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class CacheStat implements Serializable, Cloneable {

private static final long serialVersionUID = -8802969946750554026L;

protected String cacheName;
protected long statStartTime;
protected long statEndTime;

protected long getCount;
protected long getHitCount;
protected long getMissCount;
protected long getFailCount;
protected long getExpireCount;
protected long getTimeSum;
protected long minGetTime = Long.MAX_VALUE;
protected long maxGetTime = 0;

protected long putCount;
protected long putSuccessCount;
protected long putFailCount;
protected long putTimeSum;
protected long minPutTime = Long.MAX_VALUE;
protected long maxPutTime = 0;

protected long removeCount;
protected long removeSuccessCount;
protected long removeFailCount;
protected long removeTimeSum;
protected long minRemoveTime = Long.MAX_VALUE;
protected long maxRemoveTime = 0;

protected long loadCount;
protected long loadSuccessCount;
protected long loadFailCount;
protected long loadTimeSum;
protected long minLoadTime = Long.MAX_VALUE;
protected long maxLoadTime = 0;
// 其他的略
}

CacheMonitorManager

CacheMonitor的管理者, 使用者可以通过其添加自定义的Monitor, jetcache有个默认的实现DefaultCacheMonitorManager

对于多级缓存, DefaultCacheMonitorManager会新增两种Monitor

针对多级缓存, DefaultCacheMonitorManager仅仅对前两个缓存做了监控.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@Override
public void addMonitors(String area, String cacheName, Cache cache) {
// MetricsMonitor
addMetricsMonitor(area, cacheName, cache);
// CacheUpdateMonitor
addCacheUpdateMonitor(area, cacheName, cache);
}

protected void addMetricsMonitor(String area, String cacheName, Cache cache) {
if (defaultMetricsManager != null) {
cache = CacheUtil.getAbstractCache(cache);
if (cache instanceof MultiLevelCache) {
MultiLevelCache mc = (MultiLevelCache) cache;
if (mc.caches().length == 2) {
// 仅仅监控前面俩, 如果三级或更多, 不会有CacheMonitor
Cache local = mc.caches()[0];
Cache remote = mc.caches()[1];
DefaultCacheMonitor localMonitor = new DefaultCacheMonitor(cacheName + "_local");
local.config().getMonitors().add(localMonitor);
DefaultCacheMonitor remoteMonitor = new DefaultCacheMonitor(cacheName + "_remote");
remote.config().getMonitors().add(remoteMonitor);
defaultMetricsManager.add(localMonitor, remoteMonitor);
}
}
// 有个默认的CacheMonitor
DefaultCacheMonitor monitor = new DefaultCacheMonitor(cacheName);
cache.config().getMonitors().add(monitor);
defaultMetricsManager.add(monitor);
}
}
// 可以自己实现 cacheMessagePublisher 把缓存的事件发布到其他地方去
protected void addCacheUpdateMonitor(String area, String cacheName, Cache cache) {
if (cacheMessagePublisher != null) {
CacheMonitor monitor = event -> {
if (event instanceof CachePutEvent) {
CacheMessage m = new CacheMessage();
CachePutEvent e = (CachePutEvent) event;
m.setType(CacheMessage.TYPE_PUT);
m.setKeys(new Object[]{e.getKey()});
cacheMessagePublisher.publish(area, cacheName, m);
} else if (event instanceof CacheRemoveEvent) {
CacheMessage m = new CacheMessage();
CacheRemoveEvent e = (CacheRemoveEvent) event;
m.setType(CacheMessage.TYPE_REMOVE);
m.setKeys(new Object[]{e.getKey()});
cacheMessagePublisher.publish(area, cacheName, m);
} else if (event instanceof CachePutAllEvent) {
CacheMessage m = new CacheMessage();
CachePutAllEvent e = (CachePutAllEvent) event;
m.setType(CacheMessage.TYPE_PUT_ALL);
if (e.getMap() != null) {
m.setKeys(e.getMap().keySet().toArray());
}
cacheMessagePublisher.publish(area, cacheName, m);
} else if (event instanceof CacheRemoveAllEvent) {
CacheMessage m = new CacheMessage();
CacheRemoveAllEvent e = (CacheRemoveAllEvent) event;
m.setType(CacheMessage.TYPE_REMOVE_ALL);
if (e.getKeys() != null) {
m.setKeys(e.getKeys().toArray());
}
cacheMessagePublisher.publish(area, cacheName, m);
}
};
cache.config().getMonitors().add(monitor);
}
}

DefaultMetricsManager

默认的指标监控器

内部有个CopyOnWriteArrayList来保存需要监控的DefaultCacheMonitor, 有个ScheduledFuture用来定时扫描DefaultCacheMonitor, 可以通过参数resetTime和resetTimeUnit调整ScheduledFuture的执行间隔. 并且有个Consumer,用来定义如何处理每个缓存中的CacheStat信息, 默认的处理方式是StatInfoLogger,即日志打印出来. jetCache可以定时打印日志, 公布缓存使用情况.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class DefaultMetricsManager {
private static final Logger logger = LoggerFactory.getLogger(DefaultMetricsManager.class);

protected CopyOnWriteArrayList< DefaultCacheMonitor> monitorList = new CopyOnWriteArrayList();

private ScheduledFuture<?> future;

private int resetTime;
private TimeUnit resetTimeUnit;
private Consumer<StatInfo> metricsCallback;
// ... 省略某些方法

// 构造一个"根据monitorList来构造一个StatInfo, 最终Consumer来处理"的run流程
Runnable cmd = new Runnable() {
private long time = System.currentTimeMillis();

@Override
public void run() {
try {
List<CacheStat> stats = monitorList.stream().map((m) -> {
CacheStat stat = m.getCacheStat();
m.resetStat();
return stat;
}).collect(Collectors.toList());

long endTime = System.currentTimeMillis();
StatInfo statInfo = new StatInfo();
statInfo.setStartTime(time);
statInfo.setEndTime(endTime);
statInfo.setStats(stats);
time = endTime;

metricsCallback.accept(statInfo);
} catch (Exception e) {
logger.error("jetcache DefaultMetricsManager error", e);
}
}
};
// 开始执行周期性定时任务
@PostConstruct
public synchronized void start() {
if (future != null) {
return;
}
long delay = firstDelay(resetTime, resetTimeUnit);
future = JetCacheExecutor.defaultExecutor().scheduleAtFixedRate(
cmd, delay, resetTimeUnit.toMillis(resetTime), TimeUnit.MILLISECONDS);
logger.info("cache stat period at " + resetTime + " " + resetTimeUnit);
}
// ... 省略某些方法
}

与Spring的对接

jetcache继承了AbstractLifecycle并实现了Spring的ApplicationContextAware, 启动时将会自动执行onInit()方法, onInit()方法将会执行initDefaultCacheMonitorInstaller传入默认的metricsCallback(就是个StatInfoLogger), 并执行init方法, DefaultCacheMonitorManager中的DefaultMetricsManager将会被start(), start()内会启动一个周期性定时线程池来扫描DefaultCacheMonitor中的CacheStat, 最后转化成StatInfo, 交给metricsCallback去处理.

上图:

screenshot-20210410-224344.png

screenshot-20210410-224436.png

screenshot-20210410-224459.png

sscreenshot-20210410-224543.png

sscreenshot-20210410-224626.png