0%

mybatis之执行器Executor

执行器Executor

Executor定义了数据库操作的基本方法,SqlSession接口中的功能都是基于Executor接口实现的。它是真正执行Java与数据库交互的组件,负责维护一级缓存和二级缓存,并提供事务管理的相关操作,同时会将具体的数据库操作委托给StatementHandler完成

1
2
3
4
5
/** Names the executor strategy to use; each constant corresponds to one BaseExecutor subclass. */
public enum ExecutorType {
SIMPLE, // SimpleExecutor (the default): opens a new Statement for every update/select and closes it right after use
REUSE, // ReuseExecutor: looks up a Statement by its SQL text and reuses it if present; statements are kept in a map instead of being closed after use
BATCH // BatchExecutor: reuses statements and queues updates so they are executed as a batch
}

执行器的流程

  • 创建: Executor通过Configuration对象中newExecutor()方法中选择相应的执行器生成

    1
    Executor executor = configuration.newExecutor(tx, execType)
  • 执行过程: Executor会先调用StatementHandler的prepare()方法预编译SQL语句,同时设置一些基本的运行参数;然后调用StatementHandler的parameterize()方法(实际上是通过ParameterHandler)设置参数;最后由ResultSetHandler组装查询结果并返回给调用者,完成一次查询。简单总结起来就是:先预编译SQL语句,之后设置参数(跟JDBC的PreparedStatement使用过程类似),最后如果有查询结果就组装返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Contract for the component that runs mapped statements against the database.
 * Implementations visible in this file are BaseExecutor (and its subclasses) and CachingExecutor.
 */
public interface Executor {
// Constant meaning "no result handler supplied".
ResultHandler NO_RESULT_HANDLER = null;
// Executes an insert, update or delete statement.
int update(MappedStatement var1, Object var2) throws SQLException;
// Runs a query with row bounds, using a CacheKey and BoundSql prepared by the caller.
<E> List<E> query(MappedStatement var1, Object var2, RowBounds var3, ResultHandler var4, CacheKey var5, BoundSql var6) throws SQLException;
// Runs a query with row bounds; the implementation derives the CacheKey and BoundSql itself.
<E> List<E> query(MappedStatement var1, Object var2, RowBounds var3, ResultHandler var4) throws SQLException;
// Runs a query and returns the rows as a Cursor instead of a List.
<E> Cursor<E> queryCursor(MappedStatement var1, Object var2, RowBounds var3) throws SQLException;
// Flushes the statements queued for batch execution.
List<BatchResult> flushStatements() throws SQLException;
// Commits the transaction.
void commit(boolean var1) throws SQLException;
// Rolls the transaction back.
void rollback(boolean var1) throws SQLException;
// Builds the CacheKey used to look a query up in the cache.
CacheKey createCacheKey(MappedStatement var1, Object var2, RowBounds var3, BoundSql var4);
// Reports whether an entry exists in the cache for the given CacheKey.
boolean isCached(MappedStatement var1, CacheKey var2);
// Empties the first-level (local) cache.
void clearLocalCache();
// Defers loading of a property whose value comes from a first-level cache entry.
void deferLoad(MappedStatement var1, MetaObject var2, String var3, CacheKey var4, Class<?> var5);

// Returns the transaction this executor works on.
Transaction getTransaction();

// Closes the executor; the flag asks for a rollback before closing.
void close(boolean var1);

// Reports whether close() has already been called.
boolean isClosed();

// Registers the executor that wraps this one.
void setExecutorWrapper(Executor var1);
}

Executor有两个实现类BaseExecutor和CachingExecutor

BaseExecutor

BaseExecutor是一个实现了Executor接口的抽象类,实现了Executor接口的大部分方法,主要提供了缓存管理和事务管理的基本功能。继承BaseExecutor的子类只需要实现四个基本方法来完成数据库的相关操作:doUpdate()、doQuery()、doQueryCursor()、doFlushStatements()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
/**
 * Abstract base class for executors. It implements the cache handling (first-level cache,
 * deferred loads) and the transaction handling of {@code Executor}, and leaves the actual database
 * access to the four template methods {@code doUpdate}, {@code doQuery}, {@code doQueryCursor} and
 * {@code doFlushStatements}.
 */
public abstract class BaseExecutor implements Executor {
  private static final Log log = LogFactory.getLog(BaseExecutor.class);

  // Transaction this executor works on; set to null by close().
  protected Transaction transaction;
  // Executor wrapping this one; starts out as this executor itself.
  protected Executor wrapper;
  // Deferred loads waiting for the outermost query to finish.
  protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;
  // First-level cache holding the mapped result lists of queries.
  protected PerpetualCache localCache;
  // First-level cache holding the parameter objects of CALLABLE statements (output parameters).
  protected PerpetualCache localOutputParameterCache;
  protected Configuration configuration;
  // Current nesting depth of query() calls.
  protected int queryStack;
  private boolean closed;

  protected BaseExecutor(Configuration configuration, Transaction transaction) {
    this.transaction = transaction;
    this.deferredLoads = new ConcurrentLinkedQueue<>();
    this.localCache = new PerpetualCache("LocalCache");
    this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");
    this.closed = false;
    this.configuration = configuration;
    this.wrapper = this;
  }

  /**
   * Clears the first-level cache and delegates the statement to {@link #doUpdate}.
   *
   * @throws ExecutorException if the executor has been closed
   */
  @Override
  public int update(MappedStatement ms, Object parameter) throws SQLException {
    ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    // Any write invalidates everything cached so far.
    clearLocalCache();
    return doUpdate(ms, parameter);
  }

  /** Resolves the BoundSql, builds the CacheKey and forwards to the six-argument query. */
  @Override
  public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    BoundSql boundSql = ms.getBoundSql(parameter);
    CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
    return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
  }

  /**
   * Answers the query from the first-level cache when possible, otherwise from the database.
   * The cache is only consulted when no {@code resultHandler} is given.
   *
   * @throws ExecutorException if the executor has been closed
   */
  @SuppressWarnings("unchecked")
  @Override
  public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    // Only the outermost query honours the statement's flushCache setting.
    if (queryStack == 0 && ms.isFlushCacheRequired()) {
      clearLocalCache();
    }

    List<E> list;
    try {
      queryStack++;
      list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
      if (list != null) {
        // Cache hit: for CALLABLE statements copy the cached output parameters into the caller's parameter object.
        handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
      } else {
        list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
      }
    } finally {
      queryStack--;
    }

    if (queryStack == 0) {
      // The outermost query is done, so every nested result is in the cache: resolve the deferred loads.
      for (DeferredLoad deferredLoad : deferredLoads) {
        deferredLoad.load();
      }
      deferredLoads.clear();
      // With STATEMENT scope the cache must not outlive a single statement.
      if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
        clearLocalCache();
      }
    }

    return list;
  }

  /** Resolves the BoundSql and delegates to {@link #doQueryCursor}. */
  @Override
  public <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException {
    BoundSql boundSql = ms.getBoundSql(parameter);
    return doQueryCursor(ms, parameter, rowBounds, boundSql);
  }

  /**
   * Sets {@code property} on {@code resultObject} from the cache entry stored under {@code key}:
   * immediately if that entry is fully loaded, otherwise once the outermost query has finished.
   *
   * @throws ExecutorException if the executor has been closed
   */
  @Override
  public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    DeferredLoad deferredLoad = new DeferredLoad(resultObject, property, key, localCache, configuration, targetType);
    if (deferredLoad.canLoad()) {
      deferredLoad.load();
    } else {
      // Queue the same instance; there is no need to build a second, identical one.
      deferredLoads.add(deferredLoad);
    }
  }

  /**
   * Builds the cache key from six parts: the statement id, the row offset, the row limit, the SQL
   * text (with its placeholders), the values of all non-OUT parameters and, when an Environment is
   * configured, its id.
   *
   * @throws ExecutorException if the executor has been closed
   */
  @Override
  public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    CacheKey cacheKey = new CacheKey();
    cacheKey.update(ms.getId());
    cacheKey.update(rowBounds.getOffset());
    cacheKey.update(rowBounds.getLimit());
    cacheKey.update(boundSql.getSql());
    List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
    TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();

    for (ParameterMapping parameterMapping : parameterMappings) {
      // Output-only parameters carry no input value, so they do not take part in the key.
      if (parameterMapping.getMode() != ParameterMode.OUT) {
        String propertyName = parameterMapping.getProperty();
        Object value;
        if (boundSql.hasAdditionalParameter(propertyName)) {
          value = boundSql.getAdditionalParameter(propertyName);
        } else if (parameterObject == null) {
          value = null;
        } else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
          value = parameterObject;
        } else {
          MetaObject metaObject = configuration.newMetaObject(parameterObject);
          value = metaObject.getValue(propertyName);
        }
        cacheKey.update(value);
      }
    }

    if (configuration.getEnvironment() != null) {
      cacheKey.update(configuration.getEnvironment().getId());
    }
    return cacheKey;
  }

  /** Reports whether the first-level cache holds an entry for {@code key}. */
  @Override
  public boolean isCached(MappedStatement ms, CacheKey key) {
    return localCache.getObject(key) != null;
  }

  /** Flushes the pending statements; equivalent to {@code flushStatements(false)}. */
  @Override
  public List<BatchResult> flushStatements() throws SQLException {
    return flushStatements(false);
  }

  /**
   * Hands the pending statements to {@link #doFlushStatements}.
   *
   * @param isRollBack passed through unchanged to {@code doFlushStatements}
   * @throws ExecutorException if the executor has been closed
   */
  public List<BatchResult> flushStatements(boolean isRollBack) throws SQLException {
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    return doFlushStatements(isRollBack);
  }

  /**
   * Clears the first-level cache, flushes pending statements and, if {@code required},
   * commits the transaction.
   *
   * @throws ExecutorException if the executor has been closed
   */
  @Override
  public void commit(boolean required) throws SQLException {
    if (closed) {
      throw new ExecutorException("Cannot commit, transaction is already closed");
    }
    clearLocalCache();
    flushStatements();
    if (required) {
      transaction.commit();
    }
  }

  /**
   * Clears the first-level cache, flushes with the rollback flag set and, if {@code required},
   * rolls the transaction back. Does nothing when the executor is already closed.
   */
  @Override
  public void rollback(boolean required) throws SQLException {
    if (!closed) {
      try {
        clearLocalCache();
        flushStatements(true);
      } finally {
        if (required) {
          transaction.rollback();
        }
      }
    }
  }

  /**
   * Returns the transaction this executor works on.
   *
   * @throws ExecutorException if the executor has been closed
   */
  @Override
  public Transaction getTransaction() {
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    return transaction;
  }

  /**
   * Rolls back (when {@code forceRollback} is set), closes the transaction and releases the caches.
   * A {@link SQLException} raised on the way is logged as a warning rather than propagated; the
   * executor is marked closed in every case.
   */
  @Override
  public void close(boolean forceRollback) {
    try {
      try {
        rollback(forceRollback);
      } finally {
        if (transaction != null) {
          transaction.close();
        }
      }
    } catch (SQLException e) {
      // Closing is best-effort, but the failure must not vanish without a trace.
      log.warn("Unexpected exception on closing transaction. Cause: " + e);
    } finally {
      transaction = null;
      deferredLoads = null;
      localCache = null;
      localOutputParameterCache = null;
      closed = true;
    }
  }

  @Override
  public boolean isClosed() {
    return closed;
  }

  /** Empties both first-level caches; a no-op once the executor is closed (the caches are null then). */
  @Override
  public void clearLocalCache() {
    if (!closed) {
      localCache.clear();
      localOutputParameterCache.clear();
    }
  }

  protected abstract int doUpdate(MappedStatement var1, Object var2) throws SQLException;

  protected abstract List<BatchResult> doFlushStatements(boolean var1) throws SQLException;

  protected abstract <E> List<E> doQuery(MappedStatement var1, Object var2, RowBounds var3, ResultHandler var4, BoundSql var5) throws SQLException;

  protected abstract <E> Cursor<E> doQueryCursor(MappedStatement var1, Object var2, RowBounds var3, BoundSql var4) throws SQLException;

  /** Closes {@code statement} if it is non-null, ignoring any {@link SQLException}. */
  protected void closeStatement(Statement statement) {
    if (statement != null) {
      try {
        statement.close();
      } catch (SQLException ignored) {
        // Deliberately ignored: a failed close must not mask the outcome of the statement itself.
      }
    }
  }

  /** Applies the transaction timeout to {@code statement} via StatementUtil. */
  protected void applyTransactionTimeout(Statement statement) throws SQLException {
    StatementUtil.applyTransactionTimeout(statement, statement.getQueryTimeout(), transaction.getTimeout());
  }

  /**
   * For CALLABLE statements, copies every non-IN parameter value from the cached parameter object
   * into {@code parameter}.
   */
  private void handleLocallyCachedOutputParameters(MappedStatement ms, CacheKey key, Object parameter, BoundSql boundSql) {
    if (ms.getStatementType() == StatementType.CALLABLE) {
      Object cachedParameter = localOutputParameterCache.getObject(key);
      if (cachedParameter != null && parameter != null) {
        MetaObject metaCachedParameter = configuration.newMetaObject(cachedParameter);
        MetaObject metaParameter = configuration.newMetaObject(parameter);
        for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {
          if (parameterMapping.getMode() != ParameterMode.IN) {
            String parameterName = parameterMapping.getProperty();
            Object cachedValue = metaCachedParameter.getValue(parameterName);
            metaParameter.setValue(parameterName, cachedValue);
          }
        }
      }
    }
  }

  /** Runs the query through {@link #doQuery} and stores the result in the first-level cache. */
  private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    // Mark the key as "being loaded" so DeferredLoad.canLoad() can tell the entry is incomplete.
    localCache.putObject(key, ExecutionPlaceholder.EXECUTION_PLACEHOLDER);

    List<E> list;
    try {
      list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
    } finally {
      // Drop the placeholder whether or not the query succeeded.
      localCache.removeObject(key);
    }
    localCache.putObject(key, list);
    if (ms.getStatementType() == StatementType.CALLABLE) {
      // Keep the parameter object so its output values can be replayed on a cache hit.
      localOutputParameterCache.putObject(key, parameter);
    }
    return list;
  }

  /** Returns the transaction's connection, wrapped in a logging proxy when debug logging is enabled. */
  protected Connection getConnection(Log statementLog) throws SQLException {
    Connection connection = transaction.getConnection();
    return statementLog.isDebugEnabled() ? ConnectionLogger.newInstance(connection, statementLog, queryStack) : connection;
  }

  @Override
  public void setExecutorWrapper(Executor wrapper) {
    this.wrapper = wrapper;
  }

  /** A pending assignment of a cached query result to one property of an outer result object. */
  private static class DeferredLoad {
    // MetaObject of the outer object that receives the value.
    private final MetaObject resultObject;
    // Name of the property to set.
    private final String property;
    // Type the cached list is converted to before it is assigned.
    private final Class<?> targetType;
    // Key of the first-level cache entry that holds the value.
    private final CacheKey key;
    // The cache passed in by BaseExecutor.deferLoad (its localCache).
    private final PerpetualCache localCache;
    private final ObjectFactory objectFactory;
    // Converts the cached list into targetType.
    private final ResultExtractor resultExtractor;

    public DeferredLoad(MetaObject resultObject, String property, CacheKey key, PerpetualCache localCache, Configuration configuration, Class<?> targetType) {
      this.resultObject = resultObject;
      this.property = property;
      this.key = key;
      this.localCache = localCache;
      this.objectFactory = configuration.getObjectFactory();
      this.resultExtractor = new ResultExtractor(configuration, this.objectFactory);
      this.targetType = targetType;
    }

    /** True when the cache holds a real entry for the key, i.e. neither nothing nor the execution placeholder. */
    public boolean canLoad() {
      Object cached = localCache.getObject(key);
      return cached != null && cached != ExecutionPlaceholder.EXECUTION_PLACEHOLDER;
    }

    /** Reads the cached list, converts it to the target type and sets it on the outer object. */
    @SuppressWarnings("unchecked")
    public void load() {
      List<Object> list = (List<Object>) localCache.getObject(key);
      Object value = resultExtractor.extractObjectFromList(list, targetType);
      resultObject.setValue(property, value);
    }
  }
}

一级缓存(默认开启)

一级缓存是会话级别的缓存,在mybatis中每创建一个SqlSession对象,就表示开启了一次数据库会话。在一次会话中,应用程序可能会在短时间内,如一次事务内,反复执行完全相同的查询语句,如果不对数据进行缓存,那么每次查询都会执行一次数据库查询操作,而多次完全相同的、时间间隔较短的查询语句得到的结果集极有可能完全相同,会造成数据库资源的浪费。

mybatis在Executor中会建立一个缓存,将每次查询的结果对象缓存起来。在执行查询操作时,会先查询一级缓存,如果其中存在完全一样的查询语句,则直接从一级缓存中取出相应的结果对象返给用户,减少数据库的压力。

一级缓存的生命周期与SqlSession相同(其实是与SqlSession中封装的Executor对象的生命周期相同),当调用Executor.close方法时,Executor所对应的一级缓存就会不可用。在调用Executor.update方法时,会清空一级缓存。

SimpleExecutor

SimpleExecutor继承了BaseExecutor抽象类,如果不进行配置的话,该执行器为默认执行器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
 * Executor that prepares a fresh Statement for every call. Updates and list queries close the
 * statement as soon as the call returns; cursor queries ask the driver to close it on completion.
 */
public class SimpleExecutor extends BaseExecutor {
  public SimpleExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }

  /** Prepares a statement, runs the update and always closes the statement afterwards. */
  public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
    Statement statement = null;
    try {
      StatementHandler statementHandler =
          ms.getConfiguration().newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
      statement = prepareStatement(statementHandler, ms.getStatementLog());
      return statementHandler.update(statement);
    } finally {
      closeStatement(statement);
    }
  }

  /** Prepares a statement, runs the query through the StatementHandler and always closes the statement. */
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement statement = null;
    try {
      // The handler is created against the wrapper so nested work goes through the outermost executor.
      StatementHandler statementHandler =
          ms.getConfiguration().newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
      statement = prepareStatement(statementHandler, ms.getStatementLog());
      return statementHandler.query(statement, resultHandler);
    } finally {
      closeStatement(statement);
    }
  }

  /** Opens a cursor over the query; the statement is set to close when the cursor's results are consumed. */
  protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
    StatementHandler statementHandler =
        ms.getConfiguration().newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
    Statement statement = prepareStatement(statementHandler, ms.getStatementLog());
    Cursor<E> result = statementHandler.queryCursor(statement);
    statement.closeOnCompletion();
    return result;
  }

  /** Nothing is ever queued by this executor, so there is nothing to flush. */
  public List<BatchResult> doFlushStatements(boolean isRollback) {
    return Collections.emptyList();
  }

  /** Obtains a connection, lets the handler prepare the statement, then binds the parameters. */
  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Connection conn = getConnection(statementLog);
    Statement prepared = handler.prepare(conn, transaction.getTimeout());
    handler.parameterize(prepared);
    return prepared;
  }
}

ReuseExecutor

重用statement对象是一种常用的优化手段,可以减少SQL预编译的开销以及创建和销毁Statement对象的开销,提高性能。

ReuseExecutor提供了Statement对象重用的功能,通过statementMap字段缓存使用过的Statement对象,key是SQL语句,value是sql所对应的Statement对象。

ReuseExecutor与SimpleExecutor的区别在于prepareStatement方法,SimpleExecutor每次都会通过connection创建新的Statement对象,而ReuseExecutor会先尝试从statementMap中获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
 * Executor that keeps prepared statements in a map keyed by SQL text and reuses them for later
 * calls with the same SQL. The statements are closed when the executor is flushed.
 */
public class ReuseExecutor extends BaseExecutor {
  // SQL text -> statement that was prepared for it.
  private final Map<String, Statement> statementMap = new HashMap<>();

  public ReuseExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }

  /** Runs the update on a reused or newly prepared statement; the statement stays open. */
  public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
    StatementHandler statementHandler =
        ms.getConfiguration().newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
    Statement statement = prepareStatement(statementHandler, ms.getStatementLog());
    return statementHandler.update(statement);
  }

  /** Runs the query on a reused or newly prepared statement; the statement stays open. */
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    StatementHandler statementHandler =
        ms.getConfiguration().newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
    Statement statement = prepareStatement(statementHandler, ms.getStatementLog());
    return statementHandler.query(statement, resultHandler);
  }

  /** Opens a cursor on a reused or newly prepared statement. */
  protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
    StatementHandler statementHandler =
        ms.getConfiguration().newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
    Statement statement = prepareStatement(statementHandler, ms.getStatementLog());
    return statementHandler.queryCursor(statement);
  }

  /** Closes every cached statement and empties the map; there are never batch results to report. */
  public List<BatchResult> doFlushStatements(boolean isRollback) {
    for (Statement cached : statementMap.values()) {
      closeStatement(cached);
    }
    statementMap.clear();
    return Collections.emptyList();
  }

  /**
   * Returns a statement for the handler's SQL with the parameters bound: the cached one when it is
   * still usable (after refreshing its timeout), otherwise a newly prepared one that is then cached.
   */
  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    String sql = handler.getBoundSql().getSql();
    Statement statement;
    if (hasStatementFor(sql)) {
      statement = statementMap.get(sql);
      applyTransactionTimeout(statement);
    } else {
      Connection conn = getConnection(statementLog);
      statement = handler.prepare(conn, transaction.getTimeout());
      statementMap.put(sql, statement);
    }
    handler.parameterize(statement);
    return statement;
  }

  /** True when a statement is cached for {@code sql} and its connection is still open. */
  private boolean hasStatementFor(String sql) {
    Statement cached = statementMap.get(sql);
    if (cached == null) {
      return false;
    }
    try {
      return !cached.getConnection().isClosed();
    } catch (SQLException e) {
      // A statement whose connection cannot be inspected is treated as unusable.
      return false;
    }
  }
}

BatchExecutor

在执行一条SQL语句时,会将SQL语句以及相关参数通过网络发送到数据库系统。对于频繁操作数据库的应用来说,如果执行一条SQL就向数据库发送一次请求,就会在网络通信上浪费很多时间。使用批处理的优化方式可以在客户端缓存多条SQL语句,并将多条语句打包发送给数据库执行,从而减少网络开销,提高性能。

在批量执行多条SQL语句时,每次向数据库发送的SQL语句条数是有上限的,如果超过上限,数据库会拒绝执行这些SQL语句并抛出异常。

JDBC的批处理只支持insert、update和delete,不支持select

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/**
 * Executor that queues updates with {@code Statement.addBatch} and only sends them to the database
 * when the executor is flushed. Queries flush the queue first and then run on a statement of their own.
 */
public class BatchExecutor extends BaseExecutor {
  /** Value returned by {@link #doUpdate} for every queued statement (equals -2147482646). */
  public static final int BATCH_UPDATE_RETURN_VALUE = Integer.MIN_VALUE + 1002;

  // Statements with queued batches; entry i belongs to batchResultList entry i.
  private final List<Statement> statementList = new ArrayList<>();
  // One BatchResult per statement; receives the update counts when the batch is executed.
  private final List<BatchResult> batchResultList = new ArrayList<>();
  // SQL text of the most recently queued update.
  private String currentSql;
  // MappedStatement of the most recently queued update.
  private MappedStatement currentStatement;

  public BatchExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }

  /**
   * Queues the update instead of executing it. Consecutive calls with the same SQL and the same
   * MappedStatement are added to the last statement; anything else starts a new statement.
   *
   * @return always {@link #BATCH_UPDATE_RETURN_VALUE}
   */
  public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
    Configuration configuration = ms.getConfiguration();
    StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
    BoundSql boundSql = handler.getBoundSql();
    String sql = boundSql.getSql();
    Statement stmt;
    if (sql.equals(currentSql) && ms.equals(currentStatement)) {
      // Same statement as the previous call: append to the last batch.
      int last = statementList.size() - 1;
      stmt = statementList.get(last);
      applyTransactionTimeout(stmt);
      handler.parameterize(stmt);
      BatchResult batchResult = batchResultList.get(last);
      batchResult.addParameterObject(parameterObject);
    } else {
      Connection connection = getConnection(ms.getStatementLog());
      stmt = handler.prepare(connection, transaction.getTimeout());
      handler.parameterize(stmt);
      currentSql = sql;
      currentStatement = ms;
      statementList.add(stmt);
      batchResultList.add(new BatchResult(ms, sql, parameterObject));
    }
    // Adds the bound parameters to the statement's batch.
    handler.batch(stmt);
    return BATCH_UPDATE_RETURN_VALUE;
  }

  /** Flushes the queued updates, then runs the query on its own statement and closes it. */
  public <E> List<E> doQuery(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement stmt = null;
    try {
      flushStatements();
      Configuration configuration = ms.getConfiguration();
      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameterObject, rowBounds, resultHandler, boundSql);
      Connection connection = getConnection(ms.getStatementLog());
      stmt = handler.prepare(connection, transaction.getTimeout());
      handler.parameterize(stmt);
      return handler.query(stmt, resultHandler);
    } finally {
      closeStatement(stmt);
    }
  }

  /** Flushes the queued updates, then opens a cursor; the statement closes on completion. */
  protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
    flushStatements();
    Configuration configuration = ms.getConfiguration();
    StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
    Connection connection = getConnection(ms.getStatementLog());
    Statement stmt = handler.prepare(connection, transaction.getTimeout());
    handler.parameterize(stmt);
    Cursor<E> cursor = handler.queryCursor(stmt);
    stmt.closeOnCompletion();
    return cursor;
  }

  /**
   * Executes every queued batch in order and returns one BatchResult per statement. When
   * {@code isRollback} is true nothing is executed and an empty list is returned. In every case,
   * including failure, all queued statements are closed and the queue is reset.
   *
   * @throws BatchExecutorException if a batch fails; it carries the results completed so far
   */
  public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    try {
      if (isRollback) {
        return Collections.emptyList();
      }
      List<BatchResult> results = new ArrayList<>();
      for (int i = 0, n = statementList.size(); i < n; i++) {
        Statement stmt = statementList.get(i);
        applyTransactionTimeout(stmt);
        BatchResult batchResult = batchResultList.get(i);
        try {
          // executeBatch returns one update count per queued parameter set.
          batchResult.setUpdateCounts(stmt.executeBatch());
          MappedStatement ms = batchResult.getMappedStatement();
          List<Object> parameterObjects = batchResult.getParameterObjects();
          KeyGenerator keyGenerator = ms.getKeyGenerator();
          if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
            Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
            jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
          } else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) {
            for (Object parameter : parameterObjects) {
              keyGenerator.processAfter(this, ms, stmt, parameter);
            }
          }
          closeStatement(stmt);
        } catch (BatchUpdateException e) {
          StringBuilder message = new StringBuilder();
          message.append(batchResult.getMappedStatement().getId()).append(" (batch index #").append(i + 1).append(")").append(" failed.");
          if (i > 0) {
            message.append(" ").append(i).append(" prior sub executor(s) completed successfully, but will be rolled back.");
          }
          throw new BatchExecutorException(message.toString(), e, results, batchResult);
        }
        results.add(batchResult);
      }
      return results;
    } finally {
      // Single cleanup path for success, rollback and failure alike.
      for (Statement stmt : statementList) {
        closeStatement(stmt);
      }
      currentSql = null;
      statementList.clear();
      batchResultList.clear();
    }
  }
}

CachingExecutor

CachingExecutor提供了二级缓存的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
public class CachingExecutor implements Executor {
// 底层的Executor
private final Executor delegate;
// 用于管理CachingExecutor使用的二级缓存对象 key为二级缓存对象 value为TransacionalCache对象
private final TransactionalCacheManager tcm = new TransactionalCacheManager();

public CachingExecutor(Executor delegate) {
this.delegate = delegate;
delegate.setExecutorWrapper(this);
}

public Transaction getTransaction() {
return this.delegate.getTransaction();
}

public void close(boolean forceRollback) {
try {
if (forceRollback) {
this.tcm.rollback();
} else {
this.tcm.commit();
}
} finally {
this.delegate.close(forceRollback);
}

}

public boolean isClosed() {
return this.delegate.isClosed();
}

public int update(MappedStatement ms, Object parameterObject) throws SQLException {
this.flushCacheIfRequired(ms);
return this.delegate.update(ms, parameterObject);
}

public <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException {
this.flushCacheIfRequired(ms);
return this.delegate.queryCursor(ms, parameter, rowBounds);
}

public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
// 获取BoundSql对象
BoundSql boundSql = ms.getBoundSql(parameterObject);
// 创建cacheKey对象
CacheKey key = this.createCacheKey(ms, parameterObject, rowBounds, boundSql);
return this.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}

public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
// 获取查询有所在命名空间对应的二级缓存
Cache cache = ms.getCache();
// 是否开启了二级缓存
if (cache != null) {
// 根据<select>节点的配置,决定是否需要清空二级缓存
this.flushCacheIfRequired(ms);
// 检测SQL节点的useCache配置以及是否使用了resultHandler配置
if (ms.isUseCache() && resultHandler == null) {
// 二级缓存不能保存输出类型的参数,如果查询操作调用了包含输出类型的存储过程,则报错
this.ensureNoOutParams(ms, boundSql);
// 查询二级缓存
List<E> list = (List)this.tcm.getObject(cache, key);
if (list == null) {
// 二级缓存不存在,调用封装的executor对象的query方法
list = this.delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
// 将查询结果存储到TransactionalCache.entriesToAddOnCommit中
this.tcm.putObject(cache, key, list);
}

return list;
}
}
// 如果没有启动二级缓存,则直接调用executor的query方法
return this.delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}

public List<BatchResult> flushStatements() throws SQLException {
return this.delegate.flushStatements();
}

public void commit(boolean required) throws SQLException {
this.delegate.commit(required);
this.tcm.commit();
}

public void rollback(boolean required) throws SQLException {
try {
this.delegate.rollback(required);
} finally {
if (required) {
this.tcm.rollback();
}

}

}

private void ensureNoOutParams(MappedStatement ms, BoundSql boundSql) {
if (ms.getStatementType() == StatementType.CALLABLE) {
Iterator var3 = boundSql.getParameterMappings().iterator();

while(var3.hasNext()) {
ParameterMapping parameterMapping = (ParameterMapping)var3.next();
if (parameterMapping.getMode() != ParameterMode.IN) {
throw new ExecutorException("Caching stored procedures with OUT params is not supported. Please configure useCache=false in " + ms.getId() + " statement.");
}
}
}

}

public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
return this.delegate.createCacheKey(ms, parameterObject, rowBounds, boundSql);
}

public boolean isCached(MappedStatement ms, CacheKey key) {
return this.delegate.isCached(ms, key);
}

public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
this.delegate.deferLoad(ms, resultObject, property, key, targetType);
}

public void clearLocalCache() {
this.delegate.clearLocalCache();
}

/**
 * Clears the statement's cache through the transactional cache manager, but only
 * when the statement has a cache and reports that a flush is required.
 *
 * @param ms the mapped statement whose cache and flush flag are consulted
 */
private void flushCacheIfRequired(MappedStatement ms) {
    final Cache statementCache = ms.getCache();
    // The null check comes first, so the flush flag is only read when a cache exists.
    if (statementCache == null || !ms.isFlushCacheRequired()) {
        return;
    }
    tcm.clear(statementCache);
}

/**
 * Unsupported operation: always throws.
 *
 * @param executor ignored
 * @throws UnsupportedOperationException on every call
 */
public void setExecutorWrapper(Executor executor) {
    final String message = "This method should not be called";
    throw new UnsupportedOperationException(message);
}
}
TransactionalCacheManager

TransactionalCacheManager 用于管理 CachingExecutor 使用的二级缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * Keeps one {@link TransactionalCache} per underlying {@link Cache} and routes
 * every cache operation through that wrapper.
 */
public class TransactionalCacheManager {
    // Key: the underlying cache object; value: the TransactionalCache wrapping it.
    // Wrappers are created lazily on first access (see getTransactionalCache).
    private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>();

    public TransactionalCacheManager() {
    }

    /**
     * Calls {@code clear()} on the transactional wrapper of the given cache.
     *
     * @param cache the underlying cache whose wrapper should be cleared
     */
    public void clear(Cache cache) {
        getTransactionalCache(cache).clear();
    }

    /**
     * Looks up a value through the transactional wrapper of the given cache.
     *
     * @param cache the underlying cache to query
     * @param key   the key to look up
     * @return whatever the wrapper's {@code getObject} returns for the key
     */
    public Object getObject(Cache cache, CacheKey key) {
        return getTransactionalCache(cache).getObject(key);
    }

    /**
     * Hands a key/value pair to the transactional wrapper of the given cache.
     *
     * @param cache the underlying cache the entry is destined for
     * @param key   the entry key
     * @param value the entry value
     */
    public void putObject(Cache cache, CacheKey key, Object value) {
        getTransactionalCache(cache).putObject(key, value);
    }

    /**
     * Calls {@code commit()} on every transactional wrapper created so far.
     */
    public void commit() {
        for (TransactionalCache txCache : transactionalCaches.values()) {
            txCache.commit();
        }
    }

    /**
     * Calls {@code rollback()} on every transactional wrapper created so far.
     */
    public void rollback() {
        for (TransactionalCache txCache : transactionalCaches.values()) {
            txCache.rollback();
        }
    }

    /**
     * Returns the wrapper for the given cache, creating and registering it on
     * first use. Every public operation of this class goes through this lookup.
     *
     * @param cache the underlying cache
     * @return the wrapper associated with {@code cache}
     */
    private TransactionalCache getTransactionalCache(Cache cache) {
        return transactionalCaches.computeIfAbsent(cache, TransactionalCache::new);
    }
}
TransactionalCache

TransactionalCache 实现了 Cache 接口,主要用于保存在某个 SqlSession 的某个事务中需要向某个二级缓存中添加的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/**
 * A {@link Cache} wrapper that buffers writes until {@link #commit()} instead of
 * writing them to the wrapped cache immediately.
 *
 * <p>Entries passed to {@link #putObject(Object, Object)} are held in a pending map
 * and copied to the wrapped cache on commit; {@link #rollback()} discards them.
 */
public class TransactionalCache implements Cache {
    private static final Log log = LogFactory.getLog(TransactionalCache.class);

    // The wrapped cache that reads go to and that commit() writes into.
    private final Cache delegate;
    // When true, getObject() returns null and commit() clears the wrapped cache first.
    private boolean clearOnCommit;
    // Entries buffered by putObject(); copied to the wrapped cache on commit().
    private final Map<Object, Object> entriesToAddOnCommit;
    // Keys for which the wrapped cache returned null from getObject().
    private final Set<Object> entriesMissedInCache;

    /**
     * @param delegate the cache to wrap
     */
    public TransactionalCache(Cache delegate) {
        this.delegate = delegate;
        this.clearOnCommit = false;
        this.entriesToAddOnCommit = new HashMap<>();
        this.entriesMissedInCache = new HashSet<>();
    }

    /** @return the id reported by the wrapped cache */
    public String getId() {
        return delegate.getId();
    }

    /** @return the size reported by the wrapped cache */
    public int getSize() {
        return delegate.getSize();
    }

    /**
     * Reads a value from the wrapped cache. Pending (uncommitted) entries are not consulted.
     *
     * @param key the key to look up
     * @return the wrapped cache's value, or {@code null} if it has none or if
     *         {@link #clear()} was called since the last commit/rollback
     */
    public Object getObject(Object key) {
        // The wrapped cache is always queried, even when the result will be hidden below.
        Object object = delegate.getObject(key);
        if (object == null) {
            // Remember the miss; commit() and rollback() both walk this set.
            entriesMissedInCache.add(key);
        }

        return clearOnCommit ? null : object;
    }

    /**
     * Buffers an entry in the pending map; the wrapped cache is not written until commit.
     *
     * @param key    the entry key
     * @param object the entry value
     */
    public void putObject(Object key, Object object) {
        entriesToAddOnCommit.put(key, object);
    }

    /**
     * Does nothing.
     *
     * @param key ignored
     * @return always {@code null}
     */
    public Object removeObject(Object key) {
        return null;
    }

    /**
     * Marks the wrapped cache to be cleared on the next commit and drops all pending
     * entries. The wrapped cache itself is not touched here.
     */
    public void clear() {
        clearOnCommit = true;
        entriesToAddOnCommit.clear();
    }

    /**
     * Clears the wrapped cache if {@link #clear()} was requested, writes the pending
     * entries into it, then resets this wrapper's transactional state.
     */
    public void commit() {
        if (clearOnCommit) {
            delegate.clear();
        }
        flushPendingEntries();
        reset();
    }

    /**
     * Calls {@code removeObject} on the wrapped cache for every recorded miss,
     * then discards all pending entries and state.
     */
    public void rollback() {
        unlockMissedEntries();
        reset();
    }

    /** Returns this wrapper to its initial state: flag off, both collections empty. */
    private void reset() {
        clearOnCommit = false;
        entriesToAddOnCommit.clear();
        entriesMissedInCache.clear();
    }

    /**
     * Copies every pending entry into the wrapped cache. For each recorded miss that
     * has no pending entry, a {@code null} value is put under that key.
     */
    private void flushPendingEntries() {
        for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) {
            delegate.putObject(entry.getKey(), entry.getValue());
        }

        // NOTE(review): presumably the null put lets a blocking cache implementation
        // release a per-key lock taken on the miss; confirm against the Cache adapters in use.
        for (Object missedKey : entriesMissedInCache) {
            if (!entriesToAddOnCommit.containsKey(missedKey)) {
                delegate.putObject(missedKey, null);
            }
        }
    }

    /**
     * Calls {@code removeObject} on the wrapped cache for every recorded miss.
     * A failure on one key is logged and does not stop processing of the rest.
     */
    private void unlockMissedEntries() {
        for (Object missedKey : entriesMissedInCache) {
            try {
                delegate.removeObject(missedKey);
            } catch (Exception e) {
                // Best effort: keep going, but do not hide the failure.
                log.warn("Unexpected exception while notifying a rollback to the cache adapter. "
                    + "Consider upgrading your cache adapter to the latest version. Cause: " + e);
            }
        }
    }
}

二级缓存

mybatis中的二级缓存是应用级别的缓存,生命周期与应用程序的生命周期相同。

二级缓存相关配置

  • mybatis-config.xml配置文件中的cacheEnabled,是二级缓存的总开关。只有配置为true时,二级缓存的配置才会生效,默认为true

    1
    2
    3
    <settings>
    <setting name="cacheEnabled" value="true"></setting>
    </settings>
  • mapper文件中配置<cache>节点或<cache-ref>节点,可以在命名空间粒度上管理二级缓存的开启和关闭。
    配置了<cache>节点,在解析时会为该配置文件所对应的命名空间创建相应的Cache对象作为二级缓存,默认为PerpetualCache,可以使用type属性指定自定义的Cache对象。
    配置<cache-ref>节点可以使得多个命名空间共享一个Cache对象。

  • <select>节点的useCache属性,表示查询产生的结果是否要保存到二级缓存中。默认为true

欢迎关注我的其它发布渠道