前言

目前大部分互联网架构 Cache 已经成为了必可不少的一环。常用的方案有大家熟知的 NoSQL 数据库(Redis、Memcached),也有大量的进程内缓存比如 EhCache 、Guava Cache、Caffeine 等。

本系列文章会选取本地缓存和分布式缓存(NoSQL)的优秀框架比较他们各自的优缺点、应用场景、项目中的最佳实践以及原理分析。本文主要针对本地 Cache 的老大哥 Guava Cache 进行介绍和分析。

基本用法

Guava Cache 通过简单好用的 Client 可以快速构造出符合需求的 Cache 对象,不需要过多复杂的配置,大多数情况就像构造一个 POJO 一样的简单。这里介绍两种构造 Cache 对象的方式:CacheLoaderCallable

CacheLoader

构造 LoadingCache 的关键在于实现 load 方法,也就是在需要 访问的缓存项不存在的时候 Cache 会自动调用 load 方法将数据加载到 Cache 中。这里你肯定会想假如有多个线程过来访问这个不存在的缓存项怎么办,也就是缓存的并发问题如何怎么处理是否需要人工介入,这些在下文中也会介绍到。

除了实现 load 方法之外还可以配置缓存相关的一些性质,比如过期加载策略、刷新策略 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static final LoadingCache<String, String> CACHE = CacheBuilder
.newBuilder()
// 最大容量为 100 超过容量有对应的淘汰机制,下文详述
.maximumSize(100)
// 缓存项写入后多久过期,下文详述
.expireAfterWrite(60 * 5, TimeUnit.SECONDS)
// 缓存写入后多久自动刷新一次,下文详述
.refreshAfterWrite(60, TimeUnit.SECONDS)
// 创建一个 CacheLoader,load 表示缓存不存在的时候加载到缓存并返回
.build(new CacheLoader<String, String>() {
// 加载缓存数据的方法
@Override
public String load(String key) {
return "cache [" + key + "]";
}
});

public void getTest() throws Exception {
CACHE.get("KEY_25487");
}

Callable


除了在构造 Cache 对象的时候指定 load 方法来加载缓存外,我们亦可以在获取缓存项时指定载入缓存的方法,并且可以根据使用场景在不同的位置采用不同的加载方式。

比如在某些位置可以通过二级缓存加载不存在的缓存项,而有些位置则可以直接从 DB 加载缓存项。

1
2
3
4
5
6
7
8
9
10
// 注意返回值是 Cache
private static final Cache<String, String> SIMPLE_CACHE = CacheBuilder
.newBuilder()
.build();

public void getTest1() throws Exception {
String key = "KEY_25487";
// get 缓存项的时候指定 callable 加载缓存项
SIMPLE_CACHE.get(key, () -> "cache [" + key + "]");
}

缓存项加载机制


如果某个缓存过期了或者缓存项不存在于缓存中,而恰巧此此时有大量请求过来请求这个缓存项,如果没有保护机制就会导致大量的线程同时请求数据源加载数据并生成缓存项,这就是所谓的 “缓存击穿”

举个简单的例子,某个时刻有 100 个请求同时请求 KEY_25487 这个缓存项,而不巧这个缓存项刚好失效了,那么这 100 个线程(如果有这么多机器和流量的话)就会同时从 DB 加载这个数据,很可怕的点在于就算某一个线程率先获取到数据生成了缓存项,其他的线程还是继续请求 DB 而不会走到缓存

【缓存击穿图例】


看到上面这个图或许你已经有方法解这个问题了,如果多个线程过来如果我们 只让一个线程去加载数据生成缓存项,其他线程等待然后读取生成好的缓存项 岂不是就完美解决。那么恭喜你在这个问题上,和 Google 工程师的思路是一致的。不过采用这个方案,问题是解了但没有完全解,后面会说到它的缺陷。

其实 Guava Cache 在 load 的时候做了并发控制,在多个线程请求一个不存在或者过期的缓存项时保证只有一个线程进入 load 方法,其他线程等待直到缓存项被生成,这样就避免了大量的线程击穿缓存直达 DB 。不过试想下如果有上万 QPS 同时过来会有大量的线程阻塞导致线程无法释放,甚至会出现线程池满的尴尬场景,这也是说为什么这个方案解了 “缓存击穿” 问题但又没完全解。

上述机制其实就是 expireAfterWrite/expireAfterAccess 来控制的,如果你配置了过期策略对应的缓存项在过期后被访问就会走上述流程来加载缓存项。

缓存项刷新机制


缓存项的刷新和加载看起来是相似的,都是让缓存数据处于最新的状态。区别在于:

  • 缓存项加载是一个被动 的过程,而 缓存刷新是一个主动触发 动作。如果缓存项不存在或者过期只有下次 get 的时候才会触发新值加载。而缓存刷新则更加主动替换缓存中的老值。
  • 另外一个很重要点的在于,缓存刷新的项目一定是存在缓存中 的,他是对老值的替换而非是对 NULL 值的替换。

由于缓存项刷新的前提是该缓存项存在于缓存中,那么缓存的刷新就不用像缓存加载的流程一样让其他线程等待而是允许一个线程去数据源获取数据,其他线程都先返回老值直到异步线程生成了新缓存项

这个方案完美解决了上述遇到的 “缓存击穿” 问题,不过 他的前提是已经生成缓存项了 。在实际生产情况下我们可以做 缓存预热 ,提前生成缓存项,避免流量洪峰造成的线程堆积。

这套机制在 Guava Cache 中是通过 refreshAfterWrite 实现的,在配置刷新策略后,对应的缓存项会按照设定的时间定时刷新,避免线程阻塞的同时保证缓存项处于最新状态。

但他也不是完美的,比如他的限制是缓存项已经生成,并且 如果恰巧你运气不好,大量的缓存项同时需要刷新或者过期, 就会有大量的线程请求 DB,这就是常说的 “缓存血崩”

缓存项异步刷新机制


上面说到缓存项大面积失效或者刷新会导致雪崩,那么就只能限制访问 DB 的数量了,位置有三个地方:

  1. 源头:因为加载缓存的线程就是前台请求线程,所以如果 控制请求线程数量 的确是减少大面积失效对 DB 的请求,那这样一来就不存在高并发请求,就算不用缓存都可以。

  2. 中间层缓冲:因为请求线程和访问 DB 的线程是同一个,假如在 中间加一层缓冲,通过一个后台线程池去异步刷新缓存 所有请求线程直接返回老值,这样对于 DB 的访问的流量就可以被后台线程池的池大小控住。

  3. 底层:直接 控 DB 连接池的池大小,这样访问 DB 的连接数自然就少了,但是如果大量请求到连接池发现获取不到连接程序一样会出现连接池满的问题,会有大量连接被拒绝的异常。

所以比较合适的方式是通过添加一个异步线程池异步刷新数据,在 Guava Cache 中实现方案是重写 CacheLoader 的 reload 方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static final LoadingCache<String, String> ASYNC_CACHE = CacheBuilder.newBuilder()
.build(
CacheLoader.asyncReloading(new CacheLoader<String, String>() {
@Override
public String load(String key) {
return key;
}

@Override
public ListenableFuture<String> reload(String key, String oldValue) throws Exception {
return super.reload(key, oldValue);
}
}, new ThreadPoolExecutor(5, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>()))
);

LocalCache 源码分析


先整体看下 Cache 的类结构,下面的这些子类表示了不同的创建方式本质还都是 LocalCache

【Cache 类图】

核心代码都在 LocalCache 这个文件中,并且通过这个继承关系可以看出 Guava Cache 的本质就是 ConcurrentMap。

【LocalCache 继承与实现】

在看源码之前先理一下流程,先理清思路。如果想直接看源码理解流程可以先跳过这张图 ~

【 get 缓存数据流程图】

这里核心理一下 Get 的流程,put 阶段比较简单就不做分析了。

LocalCache#get


1
2
3
4
5
V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
int hash = hash(checkNotNull(key));
// 根据 hash 获取对应的 segment 然后从 segment 获取具体值
return segmentFor(hash).get(key, hash, loader);
}

Segment#get


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
checkNotNull(key);
checkNotNull(loader);
try {
// count 表示在这个 segment 中存活的项目个数
if (count != 0) {
// 获取 segment 中的元素 (ReferenceEntry) 包含正在 load 的数据
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e != null) {
long now = map.ticker.read();
// 获取缓存值,如果是 load,invalid,expired 返回 null,同时检查是否过期了,过期移除并返回 null
V value = getLiveValue(e, now);
if (value != null) {
// 记录访问时间
recordRead(e, now);
// 记录缓存命中一次
statsCounter.recordHits(1);
// 刷新缓存并返回缓存值 ,后面展开
return scheduleRefresh(e, key, hash, value, now, loader);
}
ValueReference<K, V> valueReference = e.getValueReference();
// 如果在 loading 等着 ,后面展开
if (valueReference.isLoading()) {
return waitForLoadingValue(e, key, valueReference);
}
}
}

// 走到这说明从来没写入过值 或者 值为 null 或者 过期(数据还没做清理),后面展开
return lockedGetOrLoad(key, hash, loader);
} catch (ExecutionException ee) {
Throwable cause = ee.getCause();
if (cause instanceof Error) {
throw new ExecutionError((Error) cause);
} else if (cause instanceof RuntimeException) {
throw new UncheckedExecutionException(cause);
}
throw ee;
} finally {
postReadCleanup();
}
}

Segment#scheduleRefresh


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// com.google.common.cache.LocalCache.Segment#scheduleRefresh

V scheduleRefresh(
ReferenceEntry<K, V> entry,
K key,
int hash,
V oldValue,
long now,
CacheLoader<? super K, V> loader) {

if (
// 配置了刷新策略 refreshAfterWrite
map.refreshes()
// 到刷新时间了
&& (now - entry.getWriteTime() > map.refreshNanos)
// 没在 loading
&& !entry.getValueReference().isLoading()) {
// 开始刷新,下面展开
V newValue = refresh(key, hash, loader, true);
if (newValue != null) {
return newValue;
}
}
return oldValue;
}


// com.google.common.cache.LocalCache.Segment#refresh

V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {
// 插入 loading 节点
final LoadingValueReference<K, V> loadingValueReference =
insertLoadingValueReference(key, hash, checkTime);

if (loadingValueReference == null) {
return null;
}

// 异步刷新,下面展开
ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);
if (result.isDone()) {
try {
return Uninterruptibles.getUninterruptibly(result);
} catch (Throwable t) {
// don't let refresh exceptions propagate; error was already logged
}
}
return null;
}

// com.google.common.cache.LocalCache.Segment#loadAsync

ListenableFuture<V> loadAsync(
final K key,
final int hash,
final LoadingValueReference<K, V> loadingValueReference,
CacheLoader<? super K, V> loader) {
// 通过 loader 异步加载数据,下面展开
final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
loadingFuture.addListener(
new Runnable() {
@Override
public void run() {
try {
getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown during refresh", t);
loadingValueReference.setException(t);
}
}
},
directExecutor());
return loadingFuture;
}

// com.google.common.cache.LocalCache.LoadingValueReference#loadFuture

public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
try {
stopwatch.start();
// oldValue 指在写入 loading 节点前这个位置的值,如果这个位置之前没有值 oldValue 会被赋值为 UNSET
// UNSET.get() 值为 null ,所以这个缓存项从来没有进入缓存需要同步 load 具体原因前面提到了,如果通过
// 异步 reload ,由于没有老值会导致其他线程返回的都是 null
V previousValue = oldValue.get();
if (previousValue == null) {
V newValue = loader.load(key);
return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
}
// 异步 load
ListenableFuture<V> newValue = loader.reload(key, previousValue);
if (newValue == null) {
return Futures.immediateFuture(null);
}
// To avoid a race, make sure the refreshed value is set into loadingValueReference
// *before* returning newValue from the cache query.
return transform(
newValue,
new com.google.common.base.Function<V, V>() {
@Override
public V apply(V newValue) {
LoadingValueReference.this.set(newValue);
return newValue;
}
},
directExecutor());
} catch (Throwable t) {
ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t);
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
return result;
}
}

Segment#waitForLoadingValue


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
V waitForLoadingValue(ReferenceEntry<K, V> e, K key, ValueReference<K, V> valueReference)
throws ExecutionException {
// 首先你要是一个 loading 节点
if (!valueReference.isLoading()) {
throw new AssertionError();
}

checkState(!Thread.holdsLock(e), "Recursive load of: %s", key);
// don't consider expiration as we're concurrent with loading
try {
V value = valueReference.waitForValue();
if (value == null) {
throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");
}
// re-read ticker now that loading has completed
long now = map.ticker.read();
recordRead(e, now);
return value;
} finally {
statsCounter.recordMisses(1);
}
}

// com.google.common.cache.LocalCache.LoadingValueReference#waitForValue

public V waitForValue() throws ExecutionException {
return getUninterruptibly(futureValue);
}

// com.google.common.util.concurrent.Uninterruptibles#getUninterruptibly

public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
boolean interrupted = false;
try {
while (true) {
try {
// hang 住,如果该线程被打断了继续回去 hang 住等结果,直到有结果返回
return future.get();
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

Segment#lockedGetOrLoad


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
ReferenceEntry<K, V> e;
ValueReference<K, V> valueReference = null;
LoadingValueReference<K, V> loadingValueReference = null;
boolean createNewEntry = true;

// 要对 segment 写操作 ,先加锁
lock();
try {
// re-read ticker once inside the lock
long now = map.ticker.read();
preWriteCleanup(now);

// 这里基本就是 HashMap 的代码,如果没有 segment 的数组下标冲突了就拉一个链表
int newCount = this.count - 1;
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);

for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash
&& entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
valueReference = e.getValueReference();

// 如果在加载中 不做任何处理
if (valueReference.isLoading()) {
createNewEntry = false;
} else {
V value = valueReference.get();
// 如果缓存项为 null 数据已经被删除,通知对应的 queue
if (value == null) {
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
// 这个是 double check 如果缓存项过期 数据没被删除,通知对应的 queue
} else if (map.isExpired(e, now)) {
// This is a duplicate check, as preWriteCleanup already purged expired
// entries, but let's accommodate an incorrect expiration queue.
enqueueNotification(
entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
// 再次看到的时候这个位置有值了直接返回
} else {
recordLockedRead(e, now);
statsCounter.recordHits(1);
return value;
}

// immediately reuse invalid entries
writeQueue.remove(e);
accessQueue.remove(e);
this.count = newCount; // write-volatile
}
break;
}
}

// 没有 loading ,创建一个 loading 节点
if (createNewEntry) {
loadingValueReference = new LoadingValueReference<>();

if (e == null) {
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
} else {
e.setValueReference(loadingValueReference);
}
}
} finally {
unlock();
postWriteCleanup();
}

if (createNewEntry) {
try {
// Synchronizes on the entry to allow failing fast when a recursive load is
// detected. This may be circumvented when an entry is copied, but will fail fast most
// of the time.
synchronized (e) {
return loadSync(key, hash, loadingValueReference, loader);
}
} finally {
statsCounter.recordMisses(1);
}
} else {
// The entry already exists. Wait for loading.
return waitForLoadingValue(e, key, valueReference);
}
}

总结


结合上面图以及源码我们发现在整个流程中 GuavaCache 是没有额外的线程去做数据清理和刷新的,基本都是通过 Get 方法来触发这些动作 ,减少了设计的复杂性和降低了系统开销。

简单回顾下 Get 的流程以及在每个阶段做的事情,返回的值。首先判断缓存是否过期然后判断是否需要刷新,如果过期了就调用 loading 去同步加载数据(其他线程阻塞),如果是仅仅需要刷新调用 reloading 异步加载(其他线程返回老值)。

所以如果 refreshTime > expireTime 意味着永远走不到缓存刷新逻辑,缓存刷新是为了在缓存有效期内尽量保证缓存数据一致性所以在配置刷新策略和过期策略时一定保证 refreshTime < expireTime 。

最后关于 Guava Cache 的使用建议 (最佳实践) :

  1. 如果刷新时间配置的较短一定要重载 reload 异步加载数据的方法,传入一个自定义线程池保护 DB
  2. 失效时间一定要大于刷新时间
  3. 如果是常驻内存的一些少量数据失效时间可以配置的较长刷新时间配置短一点 (根据业务对缓存失效容忍度)

SpringFu DSL 设计思想

让我们再回头看看这个启动类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Application {

public static JafuApplication app = Jafu.webApplication(ioc ->
ioc.beans(beanDefinition ->
beanDefinition.bean(SampleHandler.class)
.bean(SampleService.class)
.bean(Sample.class)
).enable(WebMvcServerDsl.webMvc(dsl ->
dsl.port(dsl.profiles().contains("test") ? 8181 : 8080)
.router(router -> {
SampleHandler handler = dsl.ref(SampleHandler.class);
router.GET("/sayHello", handler::hello);
})
.converters(c ->
c.string().jackson()
)
))
);

public static void main(String[] args) {
ConfigurableApplicationContext context = app.run(args);
}
}

从上面的代码中可以看到,不论是 Jafu.webApplication 还是 ConfigurationDsl.enable() 或者 WebMvcServerDsl.webMvc() 他们都有一个共同的特点就是通过 DSL 来描述设置、加载配置、定义 Bean ,可以说 DSL 就是整个 SpringFu 的驱动器和灵魂。

那么对于这些 DSL 又究竟是什么,里面都有哪些神奇的小组件,DSL 之间又有哪些使用小技巧呢?带着这些问题可以一起来扒一扒 SpringFu 的源码。

image-20210710194145037

看到上面这个图一切就明朗起来了,如果说 SpringBoot 的自动配置是通过 xxxAutoConfiguration 那么SpringFu 的核心就在于 xxxDsl 。是的他将所有的组件都通过一份 DSL代码 来做配置和组合。而 DSL 的本质其实是 ApplicationContextInitializer 如果你恰好也熟悉 SpringBoot 的自动配置原理的话,你会顿时明白他们本质是同一个东西,都是通过 ApplicationContextInitializer 这个钩子函数来魔改 Spring 定义框架行为的。

如果你还不太熟悉 ApplicationContextInitializer 可以参看我写的这篇文章:xxx ,浅显易懂的介绍了他是什么,怎么用,为什么这么用。

顶级 Dsl

​ 这里说的顶级 Dsl 并不是指他在类层级上属于顶层,而是说他是用于配置一个 SpringBoot 应用的 Dsl ,要启动一个 SpringBoot 应用首先要定义这个 Dsl。

​ 这个类的代码也是非常简单,核心功能点在于执行用户自定义的 DslFunction。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ApplicationDsl extends ConfigurationDsl {

private final Consumer<ApplicationDsl> dsl;

ApplicationDsl(Consumer<ApplicationDsl> dsl) {
super(configurationDsl -> {});
this.dsl = dsl;
}

@Override
public void initialize(GenericApplicationContext context) {
super.initialize(context);
this.dsl.accept(this); // 执行自定义 DSL
new MessageSourceInitializer().initialize(context); // 注册国际化的 AutoConfigurationBean
}

}

追溯下 ApplicationDsl 的父类 ConfigurationDsl 看起来方法不少,但要注意一个点我们可以把所有的 Dsl 类都分为两部分:配置执行 。配置即组装当前 Dsl 的上下文,执行则是根据前面组装的上下文执行 initialize 方法。

image-20210710200135821

​ 上图两个红框的部分,分别是 提供给用户的配置方法系统执行的初始化方法 。核心方法如下:

logging

​ 配置日志等级

1
2
3
4
public ConfigurationDsl logging(Consumer<LoggingDsl> dsl) {
new LoggingDsl(dsl);
return this;
}

configurationProperties

使用配置类

1
2
3
4
public <T> ConfigurationDsl configurationProperties(Class<T> clazz, String prefix) {
context.registerBean(clazz.getSimpleName() + "ConfigurationProperties", clazz, () -> new FunctionalConfigurationPropertiesBinder(context).bind(prefix, Bindable.of(clazz)).get());
return this;
}

beans

注册 bean

1
2
3
4
public ConfigurationDsl beans(Consumer<BeanDefinitionDsl> dsl) {
new BeanDefinitionDsl(dsl).initialize(context);
return this;
}

enable

​ 立即执行某个具体的 Dsl

1
2
3
4
5
6
7
8
9

public ConfigurationDsl enable(ApplicationContextInitializer<GenericApplicationContext> configuration) {
return (ConfigurationDsl) super.enable(configuration);
}

protected AbstractDsl enable(ApplicationContextInitializer<GenericApplicationContext> dsl) {
dsl.initialize(context);
return this;
}

同时他还有一个重载的方法,立即执行 ConfigurationDsl

1
2
3
4
public ConfigurationDsl enable(Consumer<ConfigurationDsl> configuration) {
new ConfigurationDsl(configuration).initialize(context);
return this;
}

listener

添加监听器

1
2
3
4
5
6
7
8
9
public <E extends ApplicationEvent> ConfigurationDsl listener(Class<E> clazz, ApplicationListener<E> listener) {
context.addApplicationListener(e -> {
// TODO Leverage SPR-16872 when it will be fixed
if (clazz.isAssignableFrom(e.getClass())) {
listener.onApplicationEvent((E)e);
}
});
return this;
}

在追溯 ConfigurationDsl 的父类 AbstractDsl 可以看到他提供了几个获取 Bean 以及获取环境变量的方法提供给所有的 Dsl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public abstract class AbstractDsl implements ApplicationContextInitializer<GenericApplicationContext> {

protected GenericApplicationContext context;

public <T> T ref(Class<T> beanClass) {
return this.context.getBean(beanClass);
}

public <T> T ref(Class<T> beanClass, String name) {
return this.context.getBean(name, beanClass);
}

public Environment env() {
return context.getEnvironment();
}

public List<String> profiles() {
return Arrays.asList(context.getEnvironment().getActiveProfiles());
}

protected AbstractDsl enable(ApplicationContextInitializer<GenericApplicationContext> dsl) {
dsl.initialize(context);
return this;
}

@Override
public void initialize(GenericApplicationContext context) {
this.context = context;
}

}

DSL 设计思路

​ 看了一个顶级 Dsl 的源码后我们大致能够窥探到整个 SpringFu 的 Dsl 的设计思路了:执行 xx 操作就返回 xxDsl,并且在执行 xx 操作时传递 xxDsl 中预置方法的组合,来定义这个 Dsl 的行为。 看文字还是比较抽象的,结合一个例子和图就会很明白了。

1
2
3
4
5
6
7
8
9
10
WebMvcServerDsl webDsl = WebMvcServerDsl.webMvc(dsl ->
dsl.port(dsl.profiles().contains("test") ? 8181 : 8080)
.router(router -> {
SampleHandler handler = dsl.ref(SampleHandler.class);
router.GET("/sayHello", handler::hello);
})
.converters(c ->
c.string().jackson()
)
);

上面这段代码就是用于构造一个:

  • 端口号为 8181 (测试环境) 8080 (非测试环境)
  • 路由为 /sayHello 访问方式为 Get
  • 返回格式为 json

的 WebDsl 整体流程我们可以理解如下的流图

WebDsl 执行流程图
[WebDsl 执行流程图]

SpringFu 启动流程

整个项目的启动是通过 Jafu.webApplication().run() 完成的,与 SpringBoot 的 SpringApplication.run(SpringfuDemoApplication.class, args) 有异曲同工之妙。那么核心点就是 webApplication() run() 方法。

通过 JaFu 的成员结构和注释我们能看出来,他其实是一个客户端类或者说工厂类。

image-20210711164338078
[JaFu 成员结构]

其实这个写法在日常代码中也是有大量应用场景的,比如我们封装一个二方包或者客户端给其他人用,他们使用起来可能需要 new 好多类,配置很多参数之后再做组装。这样一系列操作对于不熟悉这个二方包的人是非常不友好的,接入成本太高。为了防止被祭天,我们尽量采用这种工厂类或者富客户端的方式减少用户负担,屏蔽细节。

image-20210711165132287
[反面教材]

配置 Web APp

1
2
3
4
5
6
7
8
public static JafuApplication webApplication(Consumer<ApplicationDsl> dsl) {
return new JafuApplication(new ApplicationDsl(dsl)) {
@Override
protected ConfigurableApplicationContext createContext() {
return new ServletWebServerApplicationContext();
}
};
}

核心就是 new 了一个 JafuApplication ,并且指明了当前的运行环境为 Web 所以创建了 ServletWebServerApplicationContext

用函数写 Bean 你见过么

​ 函数是很多编程语言中的一等公民,并且在近些年提的很火的 函数式变成 以及 Serverless 都是通过函数来表达,每个函数提供一个单一功能的服务。他们相互作用,相互引用组装出复杂的功能。当云计算重构整个 IT 产业的同时, 也赋予了企业崭新的增长机遇。正如集装箱的出现加速了贸易全球化进程,以容器为代表的云原生技术作为云计算的服务新界面加速云计算也推动着软件架构向云原生演进。在当下处处都在呼吁 云原生反应式架构的大背景下, Java 以及他的好搭档 Spring 又在做出怎么样的改变去紧跟这个新浪潮呢 ?

Cloud native思维导图
[云原生概念导图]

​ 这里我们就要请出我们的主角 SpringFu , 据官方声称使用 SpringFu 搭建 SpringBoot 应用比普通自动配置的 SpringMVC 项目要快上 40% ,并且得益于很少使用反射特性能够非常好的适应 GraalVM native 同时还有内存占用量低等等优势。

GayHub

​ 其实 SpringFu 项目分为 JaFu (Java DSL) and KoFu (Kotlin DSL) ,也就是他既做了 Java 版本也做了 Kotlin 版本,作者也考虑了极为先进的卡特琳了,虽然这个语言我学过几天后再也没使用过,在平时的工作中我使用 Groovy 的频率都比 Kotlin 高。

image-20210710152019688

先创建一个简单的 SpringBoot 工程,可以使用 https://start.spring.io/ 这个 Spring 官方的脚手架搭建工具,当然如果你是跟我一样的 Idea 的重度用户也可以使用 Idea 的 SpringInitializr

image-20210710153046306

加载依赖的小插曲

再到主 POM 里加下下面这这段朴实无华的代码:

1
2
3
4
5
<dependency>
<groupId>org.springframework.fu</groupId>
<artifactId>spring-fu-jafu</artifactId>
<version>0.4.3</version>
</dependency>

但是当你更新 Maven 依赖的时候发现这个包并不会被拉下来,然后我果断换掉了 阿里云 的 Maven 镜像源,变成了 http://maven.net.cn/content/groups/public/ 然后赤裸裸的 Error 又把我打入地狱,什么玩意中央仓库都没有这个包?于是我拿着这个 artifactId 去 https://mvnrepository.com/ 全网 Maven 仓库查询了下,然后我注意到他有这么个 Tag ,呵 !!!给我来这套

image-20210710160236238

于是乎我经历了第三次变更 Maven 镜像源,我也懒得配置 mirrorOf 了,直接把这个镜像源提到第一个按照优先级先到这个镜像仓库取拉包,这次果然没有问题了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<mirrors>  
<!-- mirror | Specifies a repository mirror site to use instead of a given
repository. The repository that | this mirror serves has an ID that matches
the mirrorOf element of this mirror. IDs are used | for inheritance and direct
lookup purposes, and must be unique across the set of mirrors. | -->
<mirror>
<id>nexus-sp</id>
<mirrorOf>*</mirrorOf>
<name>Nexus sp</name>
<url>https://repo.spring.io/milestone/</url>
</mirror>
<mirror>
<id>nexus-163</id>
<mirrorOf>*</mirrorOf>
<name>Nexus 163</name>
<url>http://mirrors.163.com/maven/repository/maven-public/</url>
</mirror>
<mirror>
<id>nexus-aliyun</id>
<mirrorOf>central</mirrorOf>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</mirror>
<mirror>
<id>net-cn</id>
<mirrorOf>central</mirrorOf>
<name>Nexus net</name>
<url>http://maven.net.cn/content/groups/public/</url>
</mirror>
</mirrors>

​ 因为我勾选了 SpringData ,所以我必须配置一个数据源,这里我就用本地的 MySQL 当做测试数据源,让项目先跑起来。

image-20210710161001751

​ 不出意外,MacBook Pro 运行这种小程序就是快!

改造主程序

​ 我们都清楚,SpringBoot 的加载是通过主类来完成的,想让 SpringFu 来作为整个项目的启动器我们就需要对 SpringfuDemoApplication 做做改造了。注意如下代码请不要抄官方文档,你不可能跑起来的,因为他们的 demo 使用了一个 protected 的方法,如果你不是通过源码依赖是不可能调用到这个方法的,所以这里我换了一个写法,加载完 ioc 容器后在外部手动 enable 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Application {

public static JafuApplication app = webApplication(ioc ->
ioc.beans(beanDefinition ->
beanDefinition.bean(SampleHandler.class).bean(SampleService.class)
).enable(webMvc(dsl ->
dsl.port(dsl.profiles().contains("test") ? 8181 : 8080)
.router(router -> {
SampleHandler handler = dsl.ref(SampleHandler.class);
router.GET("/sayHello", handler::hello);
})
.converters(c ->
c.string().jackson()
)
))
);

public static void main(String[] args) {
app.run(args);
}
}

其中依赖了 SampleHandler 这个处理器 代码如下:

1
2
3
4
5
6
7
public class SampleHandler {

public ServerResponse hello(ServerRequest request) {
return ok().body("hello");
}

}

​ 启动后访问,大功告成。

image-20210710175442941

新旧比较

加载 bean

在传统的 Spring 加载 Bean 一般是通过 xml 或者 JavaConfig 的方式,这里演示下相对比较简单的 JavaConfig 声明 Bean 的方式。通过一个 @Configuration 注解告诉 Spring 他是一个配置类,其中的每个 @Bean 声明的方法都是容器中的一个 Bean

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
public class TestConfig {

@Bean
public User user() {
return new User();
}

@Bean
public Student student() {
return new Student();
}
}

而在 SpringFu 中使用就变得简单起来了,通过 ConfigurationDsl 的 beans 方法注册 bean,方法接受了一个 Lambda 表达式来加载 bean 。所以说在 SpringMVC 或者 SpringBoot 中需要使用 @Component 的地方最终都会使用 ConfigurationDsl#beans 来代替

1
2
3
4
5
ioc.beans(beanDefinition ->
beanDefinition.bean(SampleHandler.class)
.bean(SampleService.class)
.bean(Sample.class)
)

Web 路由

在 SpringMVC 中是通过 @RequestMapping("/api/demo") 这个注解来表名当前哪个路径会路由到当前类或者方法,而在 SpringFu 中则有专门的 DSL 来描述这个映射关系,比如我们上面用到的 WebMvcServerDsl 并且他定义了一系列的方法来配置 web 相关信息,比如用到的 portrouter

1
2
3
4
5
6
7
8
9
@RestController
@RequestMapping("/hello")
public class TestController {

@RequestMapping("/echo")
public String echo() {
return "hello";
}
}

在 SpringFu 中我们把 Controller 叫做 Handler , 路由配置方式也与之前有些差异:

1
2
3
4
webMvcDsl.router(router -> {
SampleHandler handler = dsl.ref(SampleHandler.class);
router.GET("/sayHello", handler::hello);
})

整体来看,从 SpringMVC 到 SpringFu 改动的不仅仅是表达方式,而是一种编程思维,通过 DSL 抽象组件再通过 Function 的组合以及注册完成框架配置和业务展开。其实 SpringFu 的定位并不是替代 SpringMVC 而是在云原生以及在Serverless等以函数为主体的轻应用场景中发挥巨大作用。

本篇主要介绍 SpringFu 项目搭建,体验了一把总体感觉还是不错的除了有不少坑之外,下篇会看看关于 SpringFu 所用到的设计思想以及类关系简单的源码分析。

揭秘 Tomcat - 启动

前言

tomcat 对于每一个 Java 工程师都是一个既熟悉又神秘的黑盒,什么 web 容器,什么三大组件,什么web.xml ….. 这些令人迷惑的问题都会在 《揭秘 Tomcat》系列文章中掰扯的清清楚楚。

image.png

从启动说起

启动 tomcat 非常简单 , ./startup.sh 一行命令,起飞 ~ 打开这个 shell 脚本我们发现最终启动了一个 Java 的 main 方法,也就是整个tomcat 的入口 org.apache.catalina.startup.Boostrap 瞅一眼简化后的代码 ~

image.png

简要的做一下分析启动参数,main 方法可以传入 startd、stopd、start、stop,startd 作为参数。其中 start 和 startd 区别就是将 await 设置为 true ,即程序是需要后台运行,还是在前台等待用户操作。

启动主要做了以下三件事情:

    1. 实例化 Boostrap ,并初始化
    2. 调用 load 方法
    3. 调用 start 方法

接下来就从以上三个步骤,来揭秘 tomcat 启动流程。

初始化启动器【init】

直接上代码看起来会比较蛋疼,我们先整理下思路,调用 Boostrap#init 方法主要完成三件事:

  1. 初始化类加载器:初始化 Tomcat 核心加载器:commonLoadercatalinaLoadersharedLoader ,后续会详细介绍这三个类加载器的作用以及使用场景。
  1. 设置主线程 classLoader 为catalinaLoader,安全管理的 classLoad 为catalineLoader。
  1. 通过反射生成Catalina对象,设置 Catalina 父加载器为 sharedLoader

初始化的核心代码如下:

image-20210627194240506

总结下来就是初始化了类加载器,即关键的一行就是 initClassLoaders(),它加载了/conf/catalina.properties 中的 common.loader、shared.loader、server.loader 对应的jar包和 properties 等文件,最后初始化了Catalina类,这个类将是加载我们 web 的 classloader。所以我们的自定义的文件或者 jar 包 都可以通过配置common.loader来加载 ,比如公共配置文件,日志文件等等。

看看 initClassLoaders() 方法代码,我们发现这几个类加载器默认情况下都是指向的 commonLoader,从源码可以看到,tomcat commonLoader 的父加载器为null,违背了java自己的类加载机制。那么问题来了 tomcat 为什么要定义自己的 classLoader ?

image.png

​ 我们知道 Java 默认是遵循双亲委派模型的,听起来高大上的名词,用一句话来解释就是所有类加载器优先使用父加载器进行类的加载。

image.png

Java 体系中默认的类加载器遵循上图的父子关系,也就是说大部分的类都是最顶层的 BootStrap 这个类加载器进行加载的。这样做的好处就是保证 JDK 中的基础类不会被改写,另外防止一个类被不同的类加载器重复加载多次。

但是为什么 tomcat 没有使用默认的类加载机制,而是自己另起一套呢?看下下面几个场景:

    • tomcat 中可以部署多个项目,但是这多个项目可能依赖同一个类库的不同版本假如都使用 bootstrap 去加载那么就会出现类冲突谁知道要加载哪个版本; -> 衍生出 WebAppClassLoader
    • tomcat 内部依赖的 jar 和类库如果不和应用程序隔离的话应用程序中的类可以轻易覆盖 tomcat 中的核心类库造成安全问题; -> 衍生出 catalinaLoader
    • 应用程序和tomcat 都需要的类库大家可以共用的就不需要重复加载;-> 衍生出 Shared ClassLoader

简单看下 tomcat 的类加载体系:

image.png

他们的功能从名字就能看出来时干嘛的,简单列举下功能:

    • commonLoader:Tomcat最基本的类加载器,加载路径中的class可以被Tomcat容器本身以及各个Webapp访问;
    • catalinaLoader:Tomcat容器私有的类加载器,加载路径中的class对于Webapp不可见;
    • sharedLoader:各个Webapp共享的类加载器,加载路径中的class对于所有Webapp可见,但是对于Tomcat容器不可见;
    • WebappClassLoader:各个Webapp私有的类加载器,加载路径中的class只对当前Webapp可见;

加载容器组件 【load】

紧接着调用了 Bootstrap 的 load 方法,看下简化后的代码:

image.png

直接调用到了 Catalina#load 方法

image.png

简单介绍下上面代码的作用,configFile() 方法返回了 server.xml 文件, Digester 解析该文件,得到容器的配置,并创建容器组件和容器。这个地方的解析 xml 生成一系列的对象的逻辑很牛逼,也很复杂后面会专门有一篇文章来解析这部分逻辑!

依次创建的组件为 :StandardServer、 StandardService、StandardEngine、StandardHost。然后拿到 StandardServer 实例调用 init() 方法初始化 Tomcat 容器的一系列组件。一些容器初始化的时候,都会调用其子容器的 init() 方法,初始化它的子容器,相当于每个容器都在初始化自身相关设置的同时,将子容器初始化。所以着重看下 init 方法

init 方法并不是 Server 类中的,而是他从 Lifecycle 接口的子类中继承的。就拿 StandardServer 举例, 他继承了 LifecycleMBeanBase ,可以看到他的初始化逻辑(下图的 init 方法) 并没有直接写出来,而是通过一个抽象方法让子类实现,而在本类中实现通用的逻辑,比如状态机的设定,异常处理。这就是大名鼎鼎的 模板设计模式 ,读 tomcat 还能学学设计模式,血赚有没有~

image.png

而这个类里面的模板占位符 initInternal 被 20 多个类进行了覆盖,都是我们的 tomcat 组件

image.png

同理 destroy -> destroyInternal ,start -> startInternal ,stop -> stopInternal 都是使用了相同的手法。

启动容器 【start】

最后一步就是 start ,调用后就能成功看到 Server startup in 1222 ms 这样一行标志性的日志了

image.png

同样的,他还是调用了 org.apache.catalina.startup.Catalina#start 代码判断异常的逻辑较多,可以简化为下面的形式

image.png

如果没有初始化 server 那么重新初始化一下,这是一个好习惯,做一个兜底防止程序马上就挂了;之后调用了 server 组件的 start 方法,前面我们知道 server 组件初始化会调用内部的 initInternal ,start 也是同理的会调用内部的 start 他们都是生命周期接口中的方法。主要的事情就是启动子组件;

接着就注册了一个钩子函数,这是 jdk 中自带的线程停止后会执行另外一个线程帮他清理数据;

最后就是调用 await () 进行 web 端口的监听,启动后台线程池接受 http 请求,这样整个 tomcat 就启动完毕了,在 await 过程中方法是不会返回的所以 stop 不会被执行,而程序接收到 stop 的命令后会返回执行 stop 方法清理数据

总结

这里和大家主要从宏观的角度了解 tomcat 的启动流程,主要分为 init,load,start 三部分,而且很多中间件都是这三个部分组成的,不论是后面会讲到的 spring 还是 dubbo 大致流程都是如此,对我们后期看其他中间件源码会有一个大致的思路。

而且对于整个启动过程可以总结为三句话:

  • 初始化 toncat 类加载器系统

  • 解析 server.xml 初始化 tomcat 系统组件

  • tomcat 组件启动,钩子注册,端口监听

后续的文章会对这三部分进行详细的解读,里面使用的设计模式 , 优秀的写法, 良好的风格我都会和大家一起探讨进步。关注我,在技术的道路上十年如一日的进步 !

另外:大家也可以关注下我的微信公众号哦~ 技术分享和个人思考都会第一时间同步!

image.png

SpringBoot 启动加速

前言

在 2021 年这个小学作文中的未来年份,没有想象中的汽车满天飞,也没有实现机器人满地跑。但牛逼的是我们都有一个共识: 知乎达到了人均 “谢邀~ 人在美国刚下飞机”的生活水平,虎扑的人均收入也在 30W+ ,还有就是程序员都人均精通 SpringBoot ,哪怕和算法聊技术一言不合就满嘴 SpringCould 分布式、微服务,然而实际操作可能是 分步试伪服务 … 你一个小小系统开这么多应用启动不难受?(不难受因为可以装 13) SpringBoot 这启动速度也确实令人捉急,每个应用 5 分钟改个小功能,编码五分钟部署俩小时。 so 如何更加优雅快速的 启动 SpringBoot 应用 (满足装 13 的欲望) 呢?

IMG_3485.jpg

其实我们都比较清楚大部分的启动时间是由于 Spring 需要加载各种 Bean 导致启动速度下降的,那么对于那些不是特别重要的 Bean 我们是不是可以让他再起一个线程做 Bean 初始化,不阻塞主线程启动,这样启动速度不就起来了么,说干就干!

那你肯定会问,我们咋判断一个 Bean 加载时间长短,找出那些 拉跨的Bean 呢?内心一阵嘲讽,嘴角泛出一阵冷笑后 不慌不忙的道来:“这对于精通 Spring 框架的我来说不是小菜一碟么 ”。我掐指一算,就知道是哪几个 Bean 拉跨。

image.png

统计 Bean 初始化时间

简单做法就是通过 BeanPostProcessor ,祭出这个神器就能干倒一半的 LSP,很多使用 Spring 的 LSP 其实都不太知道这个玩意,那他到底是个什么玩意呢?简单来讲就是 Spring 初始化 Bean 的钩子函数。 Bean 是否加载,加载的是哪个 Bean 以及修改 Bean 的相关属性都可以通过这个钩子函数搞定。这就是一个活生生的 Spring 后门,学会了他 Spring 框架任你摆布。

后面我会专门开一篇文章去讲解 BeanPostProcessor 如何帮你加载 Bean,以及如何利用这个后门搞事情。

unnamed.jpg

所以我们现在讲讲如何用 BeanPostProcessor 统计 Bean 的初始化时间。不多 BB 直接上代码:

image.png

我们编写了 BeanInitCostTimeBeanPostProcessor 继承了 BeanPostProcessor 实现了 postProcessBeforeInitialization 和 postProcessAfterInitialization 分别代表了 Bean 初始化之前和之后,简单记录了 Bean 初始化开始时间,然后用结束时间减去开始时间就得到了具体的初始化时间。

我专门在 Spring 容器中放了一个加载时间很长的测试 Bean ,代码如下

image.png

效果如下:

image.png

异步 Bean 实操

异步 Bean 异步 Bean ,首先这个 Bean 需要是个异步的,那如何将一个 Bean 包装成异步的呢?首先异步 Bean 肯定需要继承原来的 Bean 需要保证是同一种类型的,然后重写他的 init 方法。代码操作如下:

image.png

但是这么写并不是很优雅,假如我们还需要在初始化 Bean 的时候做一些其他操作,直接继承是不方便的。所以必须祭出 Spring 中的另外一个组件了,工厂 Bean,学名 FactoryBean 这个要和另外一个叫做 BeanFactory 的大佬区分开。 工厂 Bean 顾名思义就是这个玩意也是一个 Bean 只是他的主要职责是作为工厂,通常用于生成特定类型的 Bean ,那么我们就有一个骚操作了,让这个工厂生产的 Bean 就是它自己,而且中间还能定义很多额外的逻辑,不多 BB 上代码

image.png

然后通过异步 Bean 去加载我们看看前后的差别

image.png

优化之前:

image.png

优化之后:

image.png

可以看到优化后 1.4s 应用就启动完成了,而且耗时的 Bean 可以让他慢慢加载,在应用启动完成后这个 Bean 才完全被加载。

唠叨

最近有点忙成狗,从四月就断更到现在了,今天抽个周末的时候终于写完了。

不过最近看了好几本书,也准备抽点时间写写读后感了,非常推荐大家看看 《零秒思考》 ,同时写点个人的小思考: 一旦适应了用语言表达脑海中的意向和感觉,渐渐地就不会为如何表达自己的心情和想法而苦恼,流畅清晰的表达能够更快的与对方沟通让对方心领神会。表达不善往往便会产生一种想也是白想的放弃心理,由此甚至连思考本身也放弃了,不思考人就不会成长,不经历思考解决手头的事情人就会失去干劲变得无聊。

另外:大家也可以关注下我的微信公众号哦~ 技术分享和个人思考都会第一时间同步!

image.png

Kafka 探险 - 源码环境搭建

这个 Kafka 的专题,我会从系统整体架构,设计到代码落地。和大家一起杠源码,学技巧,涨知识。希望大家持续关注一起见证成长!

我相信:技术的道路,十年如一日!十年磨一剑!

前言

在阅读源码之前,首先要做的就是搭建一套源码调试环境,这是最基本的一步,不要觉得麻烦或者简单就不去做,也许你会像我一样搭源码的过程中得到一些教训和经验。同时在后面阅读源码的过程中,很多看不懂的地方 debug 一下也许就明朗了。

记录了搭建 Kafka 源码环境的简单过程,为大家提供一个步骤参考,同时记录搭建环境中可能会遇到的问题及解决方案。

这个环境搭建过程也会提到一个非常实用,并且很多人都不知道的源码 debug 技巧,对阅读源码和 debug 系统很有帮助哦!

源码下载

笔者下载的 Kafka 版本是 0.11.0.1 ,源码下载地址是 :https://kafka.apache.org/downloads

下载时选择,源码下载:

image.png

解压工程&安装插件

解压下载好的源码包,直接使用 Idea 打开项目即可。另外由于 Kafka 代码是 Scala 写的,所以需要安装一个 Scala 插件。

到 Idea 的插件市场下载 Scala 插件,这个插件不仅仅有语法提示而且可以帮你下载 Scala SDK,切换 SDK 非常方便,必装!

image.png

仓库初始化

养成一个好习惯,对于这种直接下载的源码包,先用 git 进行初始化,后续有什么改动也能够进行回溯,防止直接把源码改瓢了,之前做的注释也很难再拷贝出来。

1
git add . && git commit -m 'init'

构建项目

修改项目根目录下的 build.gradle ,将所有的 mavenCentral() 替换成 maven{ url 'http://maven.aliyun.com/nexus/content/groups/public/'} 加快 gradle 导入包的速度。

完事以后开始进行 Gradle 构建

image.png

构建完成后,所有的 Kafka 些模块会被自动导入,如下图是导入完成时的工程模块结构

image.png

启动

找到 kafka.Kafka 这个类,然后运行 Main 方法,添加启动参数

1
2
3
vmOptions ->  -Dkafka.logs.dir=/Users/lwen/logs/kafka   # 这个目录需要修改一下,是 kafka 消息文件目录

program arguments -> config/server.properties # kafka 的配置文件路径

下图展示配置完毕时的参数

image.png

我遇到了很多编译警告⚠️,不过只要还能继续编译就不用 care。

令人悲伤的是程序启动不起来,main 方法直接退出了,没有任何的提示。

image.png

排查问题

遇到上面那个问题后,找不到任何的日志看出是因为什么导致的,当时看网上的教程是把 log4j 配置文件拷贝到 kafka 目录,日志就能生效,但是我尝试过了也不 OK。

所以我就开始 debug,找出为什么这个地方会出现 exit with 1 ,这里介绍一个调试源码的技巧:我们看到代码是遇到了异常才退出的,但是我们没有异常堆栈和错误提示,可以肯定的是程序肯定遇到异常了。

所以我们在 Idea 中,断点所有会发生异常的位置具体操作:

cmd+shift+f8 打开断点窗口

image.png

勾选上 Any Exception ,并在 Catch Class Filter 中去掉 ClassNotFoundException 因为在程序运行的时候会有双亲委派的类加载过程,肯定会触发 ClassNotFoundException 。这样配置以后,程序抛出任何非 ClassNotFoundException 的位置都会停下来

image.png

以 debug 的方式启动程序,最后我发现程序在 initZk() 的地方异常了,那就很清晰了,zk 配置问题

image.png

这个有点坑!主要是因为没有开启日志,所以一行日志没有直接抛出异常结束进程了,后来我也找到打印日志的方法,按照我上面的启动参数配置就可以。

所以原因是没有启动 zk,那么下一步就是安装 zk。

安装 ZK

1
brew install  zookeeper

安装完了以后启动 zk ,我采用的是 后台运行的方式:

1
brew services start zookeeper

​ 当然也可以直接前台启动,看到日志输出:

1
zkServer start

再次启动

image.png

唠叨

本来以为搭建源码挺简单的,但是还是自己把自己坑了一把。日志没配,zk 没配。不过好在这个过程中,就算没有任何日志和堆栈也能分析到问题的原因,也是调试的一个小技巧,相当实用。

下篇文章要开始分析 Producer 的架构啦,首先我们会尝试自己实现一个 Producer ,然后再和官方的对比,看看优秀的代码在设计中更关注的点以及是如何实现的。

另外:大家也可以关注下我的微信公众号哦~ 技术分享和个人思考都会第一时间同步!

image.png

Mybatis 常用标签

trim

这个标签的作用就是帮你给标签的内容的头部或者尾部 删除 或者 添加 特定字符。

举个例子:

我们的 where 下面经常会有各种条件,假如这些条件我们一个都不传那么我们就应该删除这个 where ,也或者说如果标签里面的内容不为空的话那么就给我们加上 where

1
2
3
<trim prefix="WHERE" prefixOverrides="AND |OR ">
...
</trim>

这段的意思就是:

  1. 如果 trim 标签里面的内容为空,那么不要加 where ,否则加上
  2. 如果标签里面的第一个元素是 and 或者是 or 那么删除这个 and 或者是 or

Where

这个标签如果看懂了上面的话,那么他的意思就是上面的那段 tirm

Kafka 探险 - 架构简介

这个 Kafka 的专题,我会从系统整体架构,设计到代码落地。和大家一起杠源码,学技巧,涨知识。希望大家持续关注一起见证成长!

我相信:技术的道路,十年如一日!十年磨一剑!

简介

Kafka 是一种分布式的,基于发布 / 订阅的消息系统。最初被 LinkedIn 开发,并在 2011 年初开源,2012 年 10 月从 Apache 孵化器破壳而出,成为 Apache 的顶级项目。

Kafka 最初被设计的目的是 LinkedIn 流量和运维数据分析。流量数据包含 PV (Page View) , UV (Unique Visitor) ,搜索数据,详情页数据等。在高并发场景对于这些数据的统计并非实时的,不是简单的对于数据库的某个字段数据量 +1 这么简单,超大的流量洪峰下并不能因为统计数据将业务主流程阻塞。所以通常会将这些数据记录在文件或大数据存储引擎中,然后周期性的进行统计分析。

Kafka 被越来越多的公司青睐主要和他的特性优势有关:

  • 以 O(1) 时间复杂度消息持久化,对于 TB 级别的数据也能够保证 O(1) 的访问效率
  • 支持批量 数据,并且对于数据进行压缩保证高吞吐
  • 支持消息分区,分布式发送,分布式消费,便于水平扩展 (Scale out),具有很高的并发能力

应用场景

那为何需要使用消息队列,或者说在什么场景下 Kafka 更加合适

解耦

在大数据,高并发的场景下为了突破性能瓶颈会对系统进行水平扩展和垂直拆分,将一个复杂的系统拆分多个独立,纯净的子系统。数据在各个系统之间流转,但是如果某一个服务处理速度过慢,就会拖累整个链路的性能,形成瓶颈降低整个系统的性能,造成“旱的旱死涝的涝死”的局面。

举个简单例子:在淘宝下单时,交易系统完成扣款,后续会有很多动作:提醒卖家发货,生成卖家工作流,核销优惠券,增加购物积分等等,如果这一步全部写到交易系统的扣款代码之后,很有可能交易系统就会被拖死,下游任何一个环节失败也会导致扣款回滚,并且如果需要添加一个新的动作需要交易去做大量修改,设计肯定是不合理的。实际上交易系统在处理完扣款后会发送一个扣款完成消息,下游接这个消息即可,下游失败不会影响核心流程失败,并且各个系统的边界更加清楚,分层更更加合理。

数据持久化

如今的应用程序基本都会涉及到多个系统之间的对接,数据在系统之间通过 RPC 进行传递,处理数据的过程失败就会导致数据丢失,除非数据被持久化到磁盘上。而 Kafka 将所有需要流转的数据都 持久化到磁盘上 ,保证数据不会丢失。另外还有一个很重要的能力就是保留现场便于后续问题排查跟踪,经历过系统失败但是无法复现的人才会体会到的痛!

为了保证磁盘上的数据不会爆炸式疯涨,Kafka 提供了数据清理,数据压缩等功能,清除处理完成的历史数据。

扩展性

在应用的访问量剧增的情况下,代码优化往往没有直接进行水平扩展来的那么及时。诊断,分析,方案,优化,验证 一系列复杂流程让代码优化看起来只能是一个从长计议的方案。这时止血的方案只能是降级,限流,扩机器 三板斧。Kafka 的扩展性主要就体现在能热扩容,不需要修改参数,不需要修改代码,上机器 -> 注册服务 就完成了扩容。并非所有系统都具备这个像 调节音量旋钮一样简单的提高系统性能 的能力 ,这里会涉及到扩容之前的数据是否会有热点,新节点对集群的同步,流量重分配等等一系列复杂流程。

容灾

系统的部分组件失败不会影响这个系统的运行,消息队列降低了进程间的耦合度,上游或者下游服务挂掉后不会影响其他系统的运行,在服务重新在线后能够继续处理之前未处理的数据,只是会存在一定的延时但是能够保证 最终业务正确性

保序

强哥:你这瓜保熟吗?哦不,你这队列保序吗?

在大多数场景下,数据处理顺序是至关重要的,顺序错乱很可能导致数据结果错误。除非这个处理过程是无状态的,此时消息只是起到事件触发的作用,触发下游进行计算。Kafka 可以保证分区内部有序而不能保证全局有序。

核心概念

架构图

image.png

上图是一个典型的 Kafka 架构图,左边为消息生产者(Producer) ,发送消息到一个特定的主题(Topic),由于 Kafka 的分布式设计每个 Topic 被分成多个分区,因此发送到每个 Topic 的消息会被存储到对应的分区。另外如果 Topic 设置了副本,则每个分区都会有对应的副本。这些 Topic 被不同的消费者(Consumer)订阅,如果两个消费者在同一个消费者组,那么里面的消费者只能订阅一个固定的分区。

用上图的 Topic A 举例, Producer 1 发送消息到 Topic-A ,消息会在存放在 Broker-2 和 Broker-3 的两个分区上,并且由于 Topic-A 开启了分区备份,所以每个分区都会由另外一个节点 Topic-A’ 备份分区数据 。发送到 Broker 的数据会被消费者订阅,由于 Consumer-1 和 Consumer-2 在同一个消费者组中,他们只能消费一个固定分区的消息, Consumer-1 只会接收到 Topic-A Partition-1 的消息,Consumer-2 只会接收到 Topic-A Partition-0 的消息。

Broker

在 Kafka 集群中的一个 Kafka Server 就是一个 Broker ,生产者将消息投递到 Broker ,Broker 保证消息的 持久化,容灾,准确性等。同时接受消费者的消息订阅,向消费者分发消息。一般来说在生产环境一台 Kafka 服务器就是一个 Broker。

Topic & Partition & Log

Topic 可以认为是用来存储消息的逻辑概念,可简单认为他是一个 信箱 。每条消息发送的时候都需要指定需要发送到哪个 Topic ,消息被消费的时候也需要指定消费哪个 Topic 中的消息。

Kafka 为了提高可扩展性以及吞吐量,Topic 被分成多个分区 (Partition) ,每个 Partition 对应一个 Log,Log 是一个逻辑概念, 它会对应服务器上一个文件夹,这个文件夹下存放的是这个 Partition 下所有的消息数据和消息索引 。在面对海量数据的时候,为了避免出现巨大文件出现 I/O 瓶颈,Kafka 又将 Log 分为多个 Segment 。每个 Segment 包含 log 文件index 文件 文件命名是以该 Segment 第一条消息的 offset 命名。这样说下来其实还是很绕的直接看下面的架构图,可以仔细留意一下各个部分的标识和数字再结合这段文字,理解起来应该就很轻松了。

另外因为 Kafka 采用顺序 I/O,顺序 I/O 效率非常高,甚至比随机写内存效率更高,这也是 Kafka 高性能的原因之一。image.png

Replication

在生产环境中,我们一般会开启 Kafka 消息冗余特性,每个 Partition 都有 1 个或多个副本,我们称之为 Replication。当分区只有一个副本的时候,该分区数据只保留了一份。每个分区副本都会选出一个 Leader , Leader 是所有读写请求的 “接口人” ,其余副本均为 Follower 。Follower 作用有两个:拉取 Leader 的 Log 数据做 备份 ,在 Leader 失败后作为候选人 参与 Leader 选举

Producer

消息产出的源头,通过一定的策略推送到 Topic 的各个分区 。这里所说的推送策略就是消息路由机制,Kafka 内置多种策略可选例如:按照消息 Key ,轮训等等,甚至用户可以写扩展代码来自定义路由策略。

Consumer & Consumer Group

消费者(Consumer) 主要工作是从 Broker 拉取消息,进行消费处理。每个消费者维护自己的消费进度,这样的设计有诸多好处,比如:每个消费者进度能够轻松的进行区分,并且可以修改单个消费者的消费位点跳过或者重新消费某些消息,避免了位点信息的集中化管理的单点故障问题。

现在的应用程序大部分为分布式的系统,一个应用有几十台上百台服务器,这些服务器上运行着相同的代码,那么一个消息过来,每台服务器都执行一次消费逻辑,岂不是会造成巨大的问题。

所以 Kafka 引入了一个新的概念: 消费者组(Consumer Group) 。我们可以将这些应用的服务器都放到同一个消费者组中,而 Kafka 规定一条消息只能被同一个消费者组中的一个消费者消费,这样就能完美避免分布式情况下的重复消费问题了。上面所说的情况简单来说是希望实现消息被某台服务器独占,也就是 单播 问题。假如我们希望这条消息被广播出去,每台收到这个消息的服务器都做处理,例如发消息做日志清理,这种情况称为 广播 , 那我们只需要将每个消费者放到不同的消费者组即可。

Kafka 引入消费者组的概念巧妙解决了单播和广播问题,而没有区分订阅类型,通过一种逻辑概念来屏蔽掉多种订阅实现。

另外在同一个消费者组中的消费者订阅的分区是确定的,只有在消费者组中的消费者有变化的时候才会进行重分配。例如我们有四个分区,三个消费者,就会出现一个消费者订阅两个分区的情况。而三个分区四个消费者就会出现有消费者处于空闲状态,造成浪费,所以一般消费者的数量尽量不要大于 Topic 的分区数。image.png

尾声(唠叨)

这是我 2021 年的第一篇博客,年底做回顾的时候才知道去年我过的究竟有多么糟糕。既没有输入也没有输出,虽然工作进入一个新的阶段,会越来越忙,但忙不是拒绝成长的借口,必须保证每月一到两本书的输入,一到两周输出一篇优质文章。 最长的路属于一颗孤独的心,与君共勉

下期我会从整体梳理 Kafka 生产者,包括消息发送客户端,发送端数据缓存从源码角度看看其中的设计模式,代码组织技巧。

大家也可以关注下微信公众号哦~

image.png

Kafka 探险 - 生产者源码分析:核心组件

这个 Kafka 的专题,我会从系统整体架构,设计到代码落地。和大家一起杠源码,学技巧,涨知识。希望大家持续关注一起见证成长!

我相信:技术的道路,十年如一日!十年磨一剑!

往期文章

Kafka 探险 - 架构简介

Kafka 探险 - 源码环境搭建

前言

我们说 Kafka 是一个消息队列,其实更加确切的说:是 Broker 这个核心部件。为何这么说?你会发现我们可以通过控制台、 Java 代码、 C++ 代码、甚至是 Socket 向 Broker 写入消息,只要我们遵从了 Kafka 写入消息的协议,就可以将消息发送到 Kafka 队列中。

用专业一点的话术来说,Kafka 定义了一个应用层的网络协议,只要我们基于传输层构造出符合这个协议的数据,就是合法的 Kafka 消息。image.png

所以说我们写入 Kafka 消息的只是一个生产者的客户端,他的形式多种多样,有 Java ,Python,C++ 等多种实现,那么我们每次发消息难道还需要自己去实现这套发送消息的协议么?显然 Kafka 官方已经考虑到这个问题了,为了给我们提供 开箱即用 的消息队列,官方已经帮我们写好了各种语言的优质生产者实现,例如我们今天要讨论的 Java 版本的实现。

思考

前面提到 Kafka 帮我们实现了各个版本的生产者代码,其实他也可以完全不提供这份代码,因为核心的队列的功能已经实现了,这些客户端的代码也可以完全交由用户自己实现。

那么假如没有官方代码,我们又该实现一些什么功能,有哪些接口,哪些方法,以及如何组织这些代码呢。带着这样的问题我们一起来思考一下!一般对于这种带有数据流转的设计,我会从 由谁产生? 什么数据? 通往哪去? 如何保证通路可靠? 这几个方面来考虑。

消息自然是通过应用程序构造出来并提供给生产者,生产者首先要知道需要将消息发送到哪个 Broker 的哪个 Topic,以及 Topic 的具体 Partition 。那么必然需要配置客户端的 Broker集群地址 ,需要发送的 Topic 名称 ,以及 消息的分区策略 ,是指定到具体的分区还是通过某个 key hash 到不同的分区。

知道了消息要通往哪,还需要知道发送的是什么格式的消息,是字符串还是数字或是被序列化的二进制对象。 消息序列化 将需要消息序列化成字节数组才方便在网络上传输,所以要配置生产者的消息序列化策略,最好是可以通过传递枚举或者类名的方式自动构造序列化器,便于后续序列化过程的扩展。

从上面一篇文章 《Kafka 探险 - 架构简介》 了解到:消息队列常常用于多个系统之间的异步调用,那么这种调用关系就没有强实时依赖。由于发消息到 Kafka 会产生 网络 I/O ,相对来说比较耗时,那么消息发送这一动作除了同步调用, 是否也可以设置为异步,提高生产者的吞吐呢? 。并且大量消息发送场景, 我们可以设置一个窗口,窗口可以是时间维度也可以是消息数量维度,将消息积攒起来批次发送,减少网络 I/O 次数,提高吞吐量。

最后呢为了保证消息可以最大程度的成功发送到 Broker ,我们还需要一些 失败重试机制 ,例如失败后放到重试队列中,隔一段时间尝试再次发送。

理清思路

通过上面的分析,我们会有一个大致的认识,应该会有哪些方法,以及底层的大致的设计会分为哪几个部分。但是不够清楚,不够明晰。

首先总结一下实现客户端的几个要点在于:

  1. 配置 Broker 基础信息:集群地址、Topic、Partition
  2. 消息序列化,通过可扩展的序列化器实现
  3. 消息异步写入缓冲区,网络 I/O 线程实现消息发送
  4. 消息发送的失败重试机制

话不多说,用一张图画出各个核心模块以及他们之间的交互顺序:image.png

用户设定 Kafka 集群信息,生产者从 Kafka Broker 上拉取 可用 Kafka 节点、Topic 以及 Partition 对应关系。缓存到生产者成员变量中,如果 Broker 集群有扩容,或者有机器下线需要重新获取这些服务信息。

客户端根据用户设置的序列化器,对消息进行序列化,之后异步的将消息写入到客户端缓冲区。缓冲区内的消息到达一定的数量或者到达一个时间窗口后,网络 I/O 线程将消息从缓冲区取走,发送到 Broker 。

以上就是我对于一个 Kafka 生产者实现的思考,接下来看看官方的代码设计与我们的思路有何差别,他又是为什么这么设计。

官方设计

其实经过上面的思考和整理,我们的设计已经非常接近 Kafka 的官方设计了,官方的模块拆分的更加细致,功能更加独立。

核心组件

首先看一眼 KafkaProducer 类中有哪些成员变量,这些变量就是 Producer 的核心组件。

image.png

其中核心字段的解释如下:

clinetId :标识发送者Id

metric :统计指标

partitioner :分区器作用是决定消息发到哪个分区。有 key 则按照 key 的 hash ,否则使用 roundrobin

key/value Serializer :消息 key/value 序列化器

interceptors :发送之前/后对消息的统一处理

maxRequestSize :可以发送的最大消息,默认值是1M,即影响一个消息 Record 的大小,此值在服务端也是有限制的。

maxBlockTimeMs :buffer满了或者等待metadata信息的,超时的补偿机制

accumulator :累积缓冲器

networkClient :包装的网络层

sender :网络 I/O 线程

发送流程

发送一条消息的时候,数据又是怎样在这些组件之间进行流转的呢?

image.png

Producer调用 send 方法后,在从 Broker 获取的 Metadata 有效情况下,经过拦截器和序列化后,被分区器放到了一个缓冲区的特定位置,缓冲区由一个 ConcurrentHashMap 构成,key 为主题分区,value 是一个 deque 存放消息缓存块。从客户端角度来看如果无需关心发送结果,发送流程就已经结束了。

接下来是独立的Sender线程负责从缓冲中获取足量的数据调用 Network Client 封装层去真正发送数据,这里使用了 Java8 的 NIO 网络模型发送数据。

可以看到整个逻辑的关键点在于 RecordAccumulator 如何进行消息缓存,一般的成熟框架和中间件中都会有一套自己的内存管理机制,比如 Netty 也有一套复杂而又精妙的内存管理抽象层,这里的缓冲区也是一样的道理,主要需要去看看 Kafka 如何去做内存管理。

另外需要关注 Sender 从缓冲里以什么样的逻辑获取数据,来达到尽量少的网络交互发送尽量多的数据。还有网络失败又是如何保证数据的可靠性的。这个地方也是我们的设计和官方实现的差距,对于网络 I/O 的精心优化。

目前的篇幅已经比较长了,为了大家方便阅读理解,本篇主要从和大家一起思考如何设计一个 Kafka Producer 以及官方是如何实现的,我们之间的差距是什么,更需要关注的点是什么。通过自己的思考和对比更加能认识到不足学习到新的点!

尾声(唠叨)

这篇文章从周内就开始了,后面断断续续每天写了点,只是每天回去的确实有点晚,偶尔还给我整个失眠,精神状态不太好,周五六点多饭都没吃直接回家睡觉了,确实好困,希望下周能休息好。

这周的工作压力也很大,主要是需要推动很多上下游协同,还需要定方案。经常在想怎么交涉?怎么修改方案大家会认同?怎样说服他们? 是压力也是锻炼,说明这方面欠缺的较多,该补!

下篇文章主要会写 KafkaProducer 的缓存内存管理机制,Meta 信息更新机制,以及网络 I/O 模型的设计。敬请期待~

另外:大家也可以关注下我的微信公众号哦~ 技术分享和个人思考都会第一时间同步!

image.png