Mybatis源码分析-Mybatis的缓存机制

一直都知道Mybatis有二级缓存,但这些知识点是停留在文档或者面试宝典中,很少在工作中感知到,更别说从源码角度分析了。所以,今天准备从源码角度分析学习下。

一级缓存

一级缓存是SqlSession对象持有的,我们知道Mybatis在每次查询操作的时候都会创建一个SqlSession,那这就意味着一级缓存是SqlSession级别的,不会有线程安全问题。

先看看一级缓存的真面目:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// org.apache.ibatis.executor.BaseExecutor
public abstract class BaseExecutor implements Executor {
protected Transaction transaction;
protected Executor wrapper;

protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;
// 一级缓存
protected PerpetualCache localCache;
protected PerpetualCache localOutputParameterCache;
protected Configuration configuration;

// ..........省略............
// 典序的查询操作
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
queryStack++;
// 先查询一级缓存
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
queryStack--;
}
if (queryStack == 0) {
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
// issue #601
deferredLoads.clear();
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
// issue #482
clearLocalCache();
}
}
return list;
}
}

看看缓存的类定义

1
2
3
4
5
6
7
8
9
// org.apache.ibatis.cache.impl.PerpetualCache
// 其实就是个hashMap
public class PerpetualCache implements Cache {

private final String id;

private final Map<Object, Object> cache = new HashMap<>();
// ...........省略.............
}

我们知道,从mapper接口一路走来最终是通过Executor的实现类组件来查询sql的,以Executor的一个实现类BaseExecutor为例子,其封装了localCache作为一级缓存,每次查询前都会先从一级缓存取,当一级缓存未命中才选择从数据库获取数据。

一级缓存命中

如何才能命中一级缓存?一级缓存是sqlSession级别的,也就是只有当使用的是同一个SqlSession,查询完全相同的sql时才可以命中。一种典型的触发方式如下:

1
2
3
4
5
SqlSession sqlSession = sqlSessionFactory.openSession();
BookMapper mapper = sqlSession.getMapper(BookMapper.class);
List<Book> query = mapper.query();
List<Book> query1 = mapper.query();

如果是在spring的场景下呢?此时就要求在同一个事务下才行,举个例子:

1
2
3
4
5
6
7
@Transactional
@Override
public List<SysPost> selectPostList(SysPost post)
{
postMapper.selectPostList(post);
return postMapper.selectPostList(post);
}

第二次查相同sql语句的时将直接从一级缓存取,不用查数据库。下面研究下其源码:

如何分析呢?首先需要明确既然要命中一级缓存那么最重要的是得使用相同的sqlSession,所以先定位获取SqlSession的地方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// org.mybatis.spring.SqlSessionTemplate.SqlSessionInterceptor
private class SqlSessionInterceptor implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 被拦截的目的是获取sqlSession,为了后面反射调用目标方法
SqlSession sqlSession = getSqlSession(SqlSessionTemplate.this.sqlSessionFactory,
SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator);
try {
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
// force commit even on non-dirty sessions because some databases require
// a commit/rollback before calling close()
sqlSession.commit(true);
}
return result;
}
//............省略..............
}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// org.mybatis.spring.SqlSessionUtils
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator) {

notNull(sessionFactory, NO_SQL_SESSION_FACTORY_SPECIFIED);
notNull(executorType, NO_EXECUTOR_TYPE_SPECIFIED);
// spring-tx中很关键的一个组件,其中的resource可用来存线程级的变量
SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
// 如果此线程之前放过sqlSession就可以取出来,用先前那个对象
SqlSession session = sessionHolder(executorType, holder);
if (session != null) {
return session;
}

LOGGER.debug(() -> "Creating a new SqlSession");
// 没有则创建一个sqlSession
session = sessionFactory.openSession(executorType);
// 注册到线程变量上
registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);

return session;
}

private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator, SqlSession session) {
SqlSessionHolder holder;
// 只有开启了事务,即@Transactional才为true
if (TransactionSynchronizationManager.isSynchronizationActive()) {
Environment environment = sessionFactory.getConfiguration().getEnvironment();

if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) {
LOGGER.debug(() -> "Registering transaction synchronization for SqlSession [" + session + "]");

holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
// 绑定到线程上
TransactionSynchronizationManager.bindResource(sessionFactory, holder);
TransactionSynchronizationManager
.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
holder.setSynchronizedWithTransaction(true);
holder.requested();
} else {
if (TransactionSynchronizationManager.getResource(environment.getDataSource()) == null) {
LOGGER.debug(() -> "SqlSession [" + session
+ "] was not registered for synchronization because DataSource is not transactional");
} else {
throw new TransientDataAccessResourceException(
"SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization");
}
}
} else {
LOGGER.debug(() -> "SqlSession [" + session
+ "] was not registered for synchronization because synchronization is not active");
}

}

经过上述代码可以得出结论:标注了@Transctional开启事务后,第一次使用mybatis查库从线程中获取不到SqlSession,会选择openSession,然后将其放到线程中。那么,后续事务中还需查库的时候,由于使用的是相同的线程,则可取出先前存入的那个SqlSession对象使用,即前后两次查库使用的SqlSession是完全一样的。此时如果查询的sql再完全一样,就符合命中一级缓存的规则。

二级缓存

二级缓存是全局的,是mappedStatement级别的,存在线程安全问题。

首先看看查询二级缓存的时机:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// org.apache.ibatis.executor.CachingExecutor
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
throws SQLException {
// 先查二级缓存,来自ms,即mapper定义的xml文件中是否标注<cache/>
// 这个cache一般是SynchronizedCache,在增删改查上加了synchronized关键字,避免线程安全问题
Cache cache = ms.getCache();
if (cache != null) {
flushCacheIfRequired(ms);
if (ms.isUseCache() && resultHandler == null) {
ensureNoOutParams(ms, boundSql);
@SuppressWarnings("unchecked")
// 从TransactionalCacheManager组件中查缓存
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
tcm.putObject(cache, key, list); // issue #578 and #116
}
return list;
}
}
return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}

看看TransactionalCacheManager的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TransactionalCacheManager {
// 存缓存
private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>();

public void clear(Cache cache) {
getTransactionalCache(cache).clear();
}

public Object getObject(Cache cache, CacheKey key) {
return getTransactionalCache(cache).getObject(key);
}
//...........省略..............
}

更关键的是TransactionalCache的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class TransactionalCache implements Cache {

private static final Log log = LogFactory.getLog(TransactionalCache.class);
//装饰的对象,一般是SynchronizedCache 取个别名 全局缓存
private final Cache delegate;
private boolean clearOnCommit;
// 在未commit前把对象放这 取个别名事务缓存
private final Map<Object, Object> entriesToAddOnCommit;
private final Set<Object> entriesMissedInCache;

public TransactionalCache(Cache delegate) {
this.delegate = delegate;
this.clearOnCommit = false;
this.entriesToAddOnCommit = new HashMap<>();
this.entriesMissedInCache = new HashSet<>();
}

@Override
public String getId() {
return delegate.getId();
}


// 查缓存是直接查全局缓存
@Override
public Object getObject(Object key) {
// issue #116
Object object = delegate.getObject(key);
if (object == null) {
entriesMissedInCache.add(key);
}
// issue #146
if (clearOnCommit) {
return null;
} else {
return object;
}
}
// 放缓存先放 事务缓存
@Override
public void putObject(Object key, Object object) {
entriesToAddOnCommit.put(key, object);
}

// commit的时候把事务缓存的数据刷新到全局缓存中
public void commit() {
if (clearOnCommit) {
delegate.clear();
}
flushPendingEntries();
reset();
}

private void flushPendingEntries() {
for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) {
delegate.putObject(entry.getKey(), entry.getValue());
}
for (Object entry : entriesMissedInCache) {
if (!entriesToAddOnCommit.containsKey(entry)) {
delegate.putObject(entry, null);
}
}
}
//...................省略.....................

}

上面分析了二级缓存的结构,以及查二级缓存的时机,可以看到二级缓存存在一些问题,比如线程安全及事务隔离级别的问题。

其中线程安全问题可通过SynchronizedCache解决。

关于事务隔离的问题,Mybaits框架是有考虑的:二级缓存的结构设计就是针对这点,专门设计了entriesToAddOnCommit这个事务级缓存对象。二级缓存支持的事务级别为读已提交,不支持可重复读。只有commit操作后数据才会更新到全局缓存中,这就避免了其他线程脏读的问题。但是无法解决重复读的问题,即在同一个事务中多次读取相同数据是无法保证一致的。

二级缓存问题多

目前工作中还未遇到使用Mybatis二级缓存的例子,一方面是有更好的缓存方案,另一方面是问题很多,最大的问题就是脏读。

因为二级缓存是和mappedStatement绑定,一个namespace一个ms,或者说一个mapper接口一个ms。如何保证对一张表的增删改查操作严格限制在一个mapper接口呢?实际上很难做到,一方面是开发人员写代码的不确定性,另一方面是复杂业务不可避免需要多表关联操作,这些都有产生脏读的可能性:因为某个接口改了数据后二级缓存无法及时更新。所以,Mybatis的二级缓存不建议使用。

最后

学习了mybatis缓存机制后收获很大,一方面见识了优秀框架是如何设计缓存的,另一方面对mybatis的理解也更深刻,甚至也顺带加深了对spring对事务的解决原理的理解。当然,本文的源码分析也只是本人粗浅的理解,肯定有很多不到位甚至错误的地方,希望以后经验丰富后不断改正。

参考文章:

MyBatis 源码分析系列文章合集