0%

事务源码分析

事务源码分析

Spring的事务处理模块通过AOP来实现声明式事务处理:TransactionProxyFactoryBean负责生成Proxy代理对象,在该代理对象中由TransactionInterceptor完成对被代理方法的拦截。

spring事务处理

TransactionProxyFactoryBean

TransactionProxyFactoryBean继承关系

通过继承关系可以看到,该类(经由父类AbstractSingletonProxyFactoryBean)实现了InitializingBean接口,因此IoC容器在初始化该bean时会回调afterPropertiesSet方法,该方法也是Spring事务处理完成AOP配置的地方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * Proxy factory bean for declarative transaction handling.
 * <p>All transaction-related settings are forwarded to an internal
 * {@link TransactionInterceptor}; {@link #createMainInterceptor()} wraps that
 * interceptor in the advisor that is added to the proxy.
 */
public class TransactionProxyFactoryBean extends AbstractSingletonProxyFactoryBean
        implements BeanFactoryAware {

    /** Interceptor through which the transaction handling is carried out. */
    private final TransactionInterceptor transactionInterceptor = new TransactionInterceptor();

    /** Optional explicit pointcut; {@code null} selects the default advisor. */
    private Pointcut pointcut;


    /**
     * Set the transaction manager, handing it to the internal interceptor.
     * @param transactionManager the transaction manager to use
     */
    public void setTransactionManager(PlatformTransactionManager transactionManager) {
        this.transactionInterceptor.setTransactionManager(transactionManager);
    }

    /**
     * Set the transaction attributes as properties, handing them to the internal interceptor.
     * @param transactionAttributes the transaction attribute properties
     */
    public void setTransactionAttributes(Properties transactionAttributes) {
        this.transactionInterceptor.setTransactionAttributes(transactionAttributes);
    }

    /**
     * Set the source used to look up transaction attributes, handing it to the
     * internal interceptor.
     * @param transactionAttributeSource the attribute source to use
     */
    public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
        this.transactionInterceptor.setTransactionAttributeSource(transactionAttributeSource);
    }

    /**
     * Set an explicit pointcut that decides where the transaction interceptor applies.
     * @param pointcut the pointcut to use, or {@code null} for the default advisor
     */
    public void setPointcut(Pointcut pointcut) {
        this.pointcut = pointcut;
    }

    /**
     * Pass the owning bean factory on to the internal interceptor.
     * @param beanFactory the owning bean factory
     */
    @Override
    public void setBeanFactory(BeanFactory beanFactory) {
        this.transactionInterceptor.setBeanFactory(beanFactory);
    }

    /**
     * Create the advisor that applies transaction handling to the proxy.
     * @return a {@code DefaultPointcutAdvisor} when a pointcut was configured,
     * otherwise a {@code TransactionAttributeSourceAdvisor}; both carry the
     * internal transaction interceptor
     */
    @Override
    protected Object createMainInterceptor() {
        // Let the interceptor validate its own configuration first.
        this.transactionInterceptor.afterPropertiesSet();

        if (this.pointcut == null) {
            // No pointcut configured: rely on the default, attribute-source based advisor.
            return new TransactionAttributeSourceAdvisor(this.transactionInterceptor);
        }
        // A pointcut was configured: pair it with the interceptor in a plain advisor.
        return new DefaultPointcutAdvisor(this.pointcut, this.transactionInterceptor);
    }

    /**
     * Add the {@code TransactionalProxy} interface to the proxy being built.
     * @param proxyFactory the factory the proxy will be obtained from
     */
    @Override
    protected void postProcessProxyFactory(ProxyFactory proxyFactory) {
        proxyFactory.addInterface(TransactionalProxy.class);
    }

}

afterPropertiesSet方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * Build the singleton proxy once all bean properties have been set.
 * <p>Validates {@code target}, assembles the advisor chain (pre-interceptors,
 * the main interceptor from {@code createMainInterceptor()}, post-interceptors),
 * decides which interfaces to proxy and stores the resulting proxy.
 * @throws IllegalArgumentException if {@code target} is missing, or was given as a
 * {@code String} (a bean name) rather than a bean reference
 */
public void afterPropertiesSet() {
    // 'target' is mandatory and must be a bean reference.
    if (this.target == null) {
        throw new IllegalArgumentException("Property 'target' is required");
    }
    if (this.target instanceof String) {
        throw new IllegalArgumentException("'target' needs to be a bean reference, not a bean name as value");
    }
    if (this.proxyClassLoader == null) {
        this.proxyClassLoader = ClassUtils.getDefaultClassLoader();
    }

    // The ProxyFactory is what ultimately produces the proxy object.
    ProxyFactory proxyFactory = new ProxyFactory();

    if (this.preInterceptors != null) {
        for (Object interceptor : this.preInterceptors) {
            proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(interceptor));
        }
    }

    // Add the main interceptor (typically an Advisor); subclasses supply it
    // through createMainInterceptor().
    proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(createMainInterceptor()));

    if (this.postInterceptors != null) {
        for (Object interceptor : this.postInterceptors) {
            proxyFactory.addAdvisor(this.advisorAdapterRegistry.wrap(interceptor));
        }
    }

    proxyFactory.copyFrom(this);

    // Create the AOP target source for the configured target.
    TargetSource targetSource = createTargetSource(this.target);
    proxyFactory.setTargetSource(targetSource);

    if (this.proxyInterfaces != null) {
        proxyFactory.setInterfaces(this.proxyInterfaces);
    }
    else if (!isProxyTargetClass()) {
        // Rely on AOP infrastructure to tell us what interfaces to proxy.
        // A TargetSource may not know its target class; in that case skip interface
        // detection instead of passing null to ClassUtils.getAllInterfacesForClass.
        Class<?> targetClass = targetSource.getTargetClass();
        if (targetClass != null) {
            proxyFactory.setInterfaces(
                    ClassUtils.getAllInterfacesForClass(targetClass, this.proxyClassLoader));
        }
    }

    postProcessProxyFactory(proxyFactory);

    this.proxy = proxyFactory.getProxy(this.proxyClassLoader);
}

TransactionInterceptor#invoke方法

对事务方法的调用最终会进入TransactionInterceptor#invoke方法,该方法委托给invokeWithinTransaction,根据事务管理器的类型以及事务属性的配置情况来执行相应的事务操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/**
 * Intercept a method invocation and run it through
 * {@code invokeWithinTransaction}.
 * @param invocation the method invocation being intercepted
 * @return whatever {@code invokeWithinTransaction} returns for this invocation
 * @throws Throwable propagated from {@code invokeWithinTransaction}
 */
public Object invoke(final MethodInvocation invocation) throws Throwable {
    // Work out the target class; it stays null when the invocation has no target object.
    Class<?> targetClass = null;
    if (invocation.getThis() != null) {
        targetClass = AopUtils.getTargetClass(invocation.getThis());
    }

    // Adapter that lets invokeWithinTransaction continue the interceptor chain.
    InvocationCallback proceedCallback = new InvocationCallback() {
        @Override
        public Object proceedWithInvocation() throws Throwable {
            return invocation.proceed();
        }
    };

    return invokeWithinTransaction(invocation.getMethod(), targetClass, proceedCallback);
}



/**
 * Run the given invocation with the transaction handling that applies to the method.
 * <p>Two paths exist: standard demarcation (create the transaction, proceed, then
 * commit or complete after an exception), and the callback-driven path used when
 * the transaction manager is a {@code CallbackPreferringPlatformTransactionManager}.
 * @param method the method being invoked
 * @param targetClass the target class, may be {@code null}
 * @param invocation callback that proceeds with the actual invocation
 * @return the value returned by the invocation
 * @throws Throwable anything thrown by the invocation or by transaction handling
 */
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
        throws Throwable {

    // If the transaction attribute is null, the method is non-transactional.
    final TransactionAttribute attribute =
            getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    final PlatformTransactionManager manager = determineTransactionManager(attribute);
    final String joinpointId = methodIdentification(method, targetClass, attribute);

    // Only a transactional method whose manager prefers callbacks takes the
    // callback-driven path; everything else uses standard demarcation.
    final boolean callbackDriven =
            (attribute != null && manager instanceof CallbackPreferringPlatformTransactionManager);

    if (!callbackDriven) {
        // Standard demarcation: create the transaction if necessary; the resulting
        // TransactionInfo carries what was gathered while doing so.
        TransactionInfo txInfo = createTransactionIfNecessary(manager, attribute, joinpointId);

        Object returnValue;
        try {
            // Around advice: continue down the interceptor chain, which normally
            // ends in the target object being invoked.
            returnValue = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // The target invocation failed: complete the transaction for this
            // exception, then let the exception propagate.
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            // Per the original author's note, this resets the thread-bound holder
            // back to the previous TransactionInfo.
            cleanupTransactionInfo(txInfo);
        }
        // Normal return: commit.
        commitTransactionAfterReturning(txInfo);
        return returnValue;
    }

    // Callback-driven path: hand a TransactionCallback to the manager.
    final ThrowableHolder holder = new ThrowableHolder();
    Object callbackResult;
    try {
        callbackResult = ((CallbackPreferringPlatformTransactionManager) manager).execute(attribute,
                new TransactionCallback<Object>() {
                    @Override
                    public Object doInTransaction(TransactionStatus status) {
                        TransactionInfo txInfo = prepareTransactionInfo(manager, attribute, joinpointId, status);
                        try {
                            return invocation.proceedWithInvocation();
                        }
                        catch (Throwable ex) {
                            if (!attribute.rollbackOn(ex)) {
                                // A normal return value: will lead to a commit.
                                // The exception is parked and rethrown afterwards.
                                holder.throwable = ex;
                                return null;
                            }
                            // Rollback case: a RuntimeException is rethrown directly,
                            // anything else travels wrapped in a ThrowableHolderException.
                            if (ex instanceof RuntimeException) {
                                throw (RuntimeException) ex;
                            }
                            throw new ThrowableHolderException(ex);
                        }
                        finally {
                            cleanupTransactionInfo(txInfo);
                        }
                    }
                });
    }
    catch (ThrowableHolderException wrapped) {
        // Unwrap the application exception carried out of the callback.
        throw wrapped.getCause();
    }
    catch (TransactionSystemException systemEx) {
        if (holder.throwable != null) {
            logger.error("Application exception overridden by commit exception", holder.throwable);
            systemEx.initApplicationException(holder.throwable);
        }
        throw systemEx;
    }
    catch (Throwable other) {
        if (holder.throwable != null) {
            logger.error("Application exception overridden by commit exception", holder.throwable);
        }
        throw other;
    }

    // Check result state: it might indicate a Throwable to rethrow.
    if (holder.throwable != null) {
        throw holder.throwable;
    }
    return callbackResult;
}
事务创建方法createTransactionIfNecessary
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
 * Obtain a transaction for the given attribute, if there is one, and wrap the
 * outcome in a {@code TransactionInfo}.
 * @param tm the transaction manager, may be {@code null}
 * @param txAttr the transaction attribute, {@code null} for a non-transactional method
 * @param joinpointIdentification identification of the method, also used as the
 * transaction name when the attribute does not specify one
 * @return the {@code TransactionInfo} produced by {@code prepareTransactionInfo}
 */
protected TransactionInfo createTransactionIfNecessary(
        PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

    // If no name is specified, apply the method identification as transaction name.
    TransactionAttribute effectiveAttr = txAttr;
    if (effectiveAttr != null && effectiveAttr.getName() == null) {
        effectiveAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (effectiveAttr != null && tm != null) {
        status = tm.getTransaction(effectiveAttr);
    }
    else if (effectiveAttr != null && logger.isDebugEnabled()) {
        // Transactional method, but there is no manager to ask for a transaction.
        logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                "] because no transaction manager has been configured");
    }

    // Package the transaction information into a TransactionInfo.
    return prepareTransactionInfo(tm, effectiveAttr, joinpointIdentification, status);
}


/**
 * Create a {@code TransactionInfo} for the given attribute and status and bind it
 * to the current thread.
 * @param tm the transaction manager, may be {@code null}
 * @param txAttr the transaction attribute, {@code null} for a non-transactional method
 * @param joinpointIdentification identification of the method being invoked
 * @param status the transaction status to record when {@code txAttr} is not {@code null}
 * @return the thread-bound {@code TransactionInfo}
 */
protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
        TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) {

    TransactionInfo info = new TransactionInfo(tm, txAttr, joinpointIdentification);

    if (txAttr == null) {
        // The TransactionInfo.hasTransaction() method will return false. The info is
        // created only to preserve the integrity of the ThreadLocal stack maintained
        // in this class.
        if (logger.isTraceEnabled()) {
            logger.trace("No need to create transaction for [" + joinpointIdentification +
                    "]: This method is not transactional.");
        }
    }
    else {
        // We need a transaction for this method...
        if (logger.isTraceEnabled()) {
            logger.trace("Getting transaction for [" + info.getJoinpointIdentification() + "]");
        }
        // The transaction manager will flag an error if an incompatible tx already exists.
        info.newTransactionStatus(status);
    }

    // Always bind the TransactionInfo to the thread, even when no new transaction was
    // created here, so the TransactionInfo stack stays consistent. Per the original
    // author's note, the info keeps a reference to the previously bound one.
    info.bindToThread();
    return info;
}

事务处理器

以DataSourceTransactionManager为例

事务管理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
/**
 * Transaction manager for a single JDBC {@link DataSource}.
 * <p>The transaction work is done on a JDBC {@link Connection}: the connection is
 * kept in a {@code ConnectionHolder}, which is bound through
 * {@code TransactionSynchronizationManager} under the DataSource as key.
 */
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
        implements ResourceTransactionManager, InitializingBean {

    /** The injected DataSource. */
    private DataSource dataSource;

    /** Whether read-only transactions are enforced with an explicit statement. */
    private boolean enforceReadOnly = false;


    /**
     * Create an instance without a DataSource; nested transactions are allowed.
     */
    public DataSourceTransactionManager() {
        setNestedTransactionAllowed(true);
    }

    /**
     * Create an instance for the given DataSource and validate it immediately.
     * @param dataSource the JDBC DataSource to manage transactions for
     */
    public DataSourceTransactionManager(DataSource dataSource) {
        this();
        setDataSource(dataSource);
        afterPropertiesSet();
    }


    /**
     * Set the JDBC DataSource to manage transactions for.
     * <p>A {@code TransactionAwareDataSourceProxy} is unwrapped, so the manager
     * works on the proxy's target DataSource.
     * @param dataSource the DataSource, possibly a transaction-aware proxy
     */
    public void setDataSource(DataSource dataSource) {
        boolean isProxy = (dataSource instanceof TransactionAwareDataSourceProxy);
        this.dataSource = (isProxy ?
                ((TransactionAwareDataSourceProxy) dataSource).getTargetDataSource() : dataSource);
    }

    /**
     * Return the JDBC DataSource that this instance manages transactions for.
     */
    public DataSource getDataSource() {
        return this.dataSource;
    }

    /**
     * Set whether read-only transactions should be enforced through an explicit
     * statement on the transactional connection.
     * @param enforceReadOnly the new flag value
     * @see #prepareTransactionalConnection
     */
    public void setEnforceReadOnly(boolean enforceReadOnly) {
        this.enforceReadOnly = enforceReadOnly;
    }

    /**
     * Return whether read-only transactions are enforced through an explicit statement.
     */
    public boolean isEnforceReadOnly() {
        return this.enforceReadOnly;
    }

    /**
     * Verify that a DataSource has been configured.
     * @throws IllegalArgumentException if no DataSource is set
     */
    @Override
    public void afterPropertiesSet() {
        if (getDataSource() == null) {
            throw new IllegalArgumentException("Property 'dataSource' is required");
        }
    }


    /**
     * Return the DataSource as the resource factory of this manager.
     */
    @Override
    public Object getResourceFactory() {
        return getDataSource();
    }

    /**
     * Build the transaction object for the current state.
     * <p>Looks up the {@code ConnectionHolder} currently registered for the
     * DataSource (if any) and wraps it in a {@code DataSourceTransactionObject},
     * marked as not newly created.
     */
    @Override
    protected Object doGetTransaction() {
        DataSourceTransactionObject jdbcTx = new DataSourceTransactionObject();
        jdbcTx.setSavepointAllowed(isNestedTransactionAllowed());
        ConnectionHolder boundHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
        jdbcTx.setConnectionHolder(boundHolder, false);
        return jdbcTx;
    }

    /**
     * Decide whether a transaction already exists: there must be a
     * {@code ConnectionHolder} and it must report an active transaction.
     */
    @Override
    protected boolean isExistingTransaction(Object transaction) {
        DataSourceTransactionObject jdbcTx = (DataSourceTransactionObject) transaction;
        if (!jdbcTx.hasConnectionHolder()) {
            return false;
        }
        return jdbcTx.getConnectionHolder().isTransactionActive();
    }

    /**
     * Begin a transaction on the JDBC connection.
     * @throws CannotCreateTransactionException if anything fails while preparing
     * the connection; a newly acquired connection is released first
     */
    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject jdbcTx = (DataSourceTransactionObject) transaction;
        Connection connection = null;

        try {
            // A fresh connection is needed when there is no holder yet, or the
            // existing holder is already synchronized with a transaction.
            boolean needsFreshConnection = !jdbcTx.hasConnectionHolder() ||
                    jdbcTx.getConnectionHolder().isSynchronizedWithTransaction();
            if (needsFreshConnection) {
                Connection acquired = this.dataSource.getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + acquired + "] for JDBC transaction");
                }
                jdbcTx.setConnectionHolder(new ConnectionHolder(acquired), true);
            }

            jdbcTx.getConnectionHolder().setSynchronizedWithTransaction(true);
            connection = jdbcTx.getConnectionHolder().getConnection();

            // Remember the previous isolation level so it can be restored on cleanup.
            Integer isolationBefore = DataSourceUtils.prepareConnectionForTransaction(connection, definition);
            jdbcTx.setPreviousIsolationLevel(isolationBefore);

            // Switch to manual commit if necessary. This is very expensive in some JDBC
            // drivers, so we don't want to do it unnecessarily (for example if we've
            // explicitly configured the connection pool to set it already).
            if (connection.getAutoCommit()) {
                jdbcTx.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + connection + "] to manual commit");
                }
                connection.setAutoCommit(false);
            }

            prepareTransactionalConnection(connection, definition);
            jdbcTx.getConnectionHolder().setTransactionActive(true);

            int timeoutSeconds = determineTimeout(definition);
            if (timeoutSeconds != TransactionDefinition.TIMEOUT_DEFAULT) {
                jdbcTx.getConnectionHolder().setTimeoutInSeconds(timeoutSeconds);
            }

            // Bind the connection holder to the thread, but only for a holder that
            // was newly created here.
            if (jdbcTx.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(getDataSource(), jdbcTx.getConnectionHolder());
            }
        }

        catch (Throwable ex) {
            // Give back a connection acquired above before reporting the failure.
            if (jdbcTx.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(connection, this.dataSource);
                jdbcTx.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

    /**
     * Suspend the transaction: detach the holder from the transaction object and
     * unbind the resource registered for the DataSource.
     * @return the unbound resource, to be handed back to {@link #doResume}
     */
    @Override
    protected Object doSuspend(Object transaction) {
        DataSourceTransactionObject jdbcTx = (DataSourceTransactionObject) transaction;
        jdbcTx.setConnectionHolder(null);
        return TransactionSynchronizationManager.unbindResource(this.dataSource);
    }

    /**
     * Resume a suspended transaction by re-binding the suspended resources for the DataSource.
     */
    @Override
    protected void doResume(Object transaction, Object suspendedResources) {
        TransactionSynchronizationManager.bindResource(this.dataSource, suspendedResources);
    }

    /**
     * Commit the transaction on its JDBC connection.
     * @throws TransactionSystemException if the JDBC commit fails
     */
    @Override
    protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject jdbcTx = (DataSourceTransactionObject) status.getTransaction();
        Connection connection = jdbcTx.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Committing JDBC transaction on Connection [" + connection + "]");
        }
        try {
            connection.commit();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not commit JDBC transaction", ex);
        }
    }

    /**
     * Roll back the transaction on its JDBC connection.
     * @throws TransactionSystemException if the JDBC rollback fails
     */
    @Override
    protected void doRollback(DefaultTransactionStatus status) {
        DataSourceTransactionObject jdbcTx = (DataSourceTransactionObject) status.getTransaction();
        Connection connection = jdbcTx.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Rolling back JDBC transaction on Connection [" + connection + "]");
        }
        try {
            connection.rollback();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
        }
    }

    /**
     * Mark the transaction as rollback-only.
     */
    @Override
    protected void doSetRollbackOnly(DefaultTransactionStatus status) {
        DataSourceTransactionObject jdbcTx = (DataSourceTransactionObject) status.getTransaction();
        if (status.isDebug()) {
            logger.debug("Setting JDBC transaction [" + jdbcTx.getConnectionHolder().getConnection() +
                    "] rollback-only");
        }
        jdbcTx.setRollbackOnly();
    }

    /**
     * Clean up after the transaction has completed: unbind a newly created holder,
     * restore the connection settings changed at begin, release a newly acquired
     * connection and clear the holder.
     */
    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        DataSourceTransactionObject jdbcTx = (DataSourceTransactionObject) transaction;

        // Remove the connection holder from the thread, if exposed.
        if (jdbcTx.isNewConnectionHolder()) {
            TransactionSynchronizationManager.unbindResource(this.dataSource);
        }

        // Reset connection. A failure here is only logged.
        Connection connection = jdbcTx.getConnectionHolder().getConnection();
        try {
            if (jdbcTx.isMustRestoreAutoCommit()) {
                connection.setAutoCommit(true);
            }
            DataSourceUtils.resetConnectionAfterTransaction(connection, jdbcTx.getPreviousIsolationLevel());
        }
        catch (Throwable ex) {
            logger.debug("Could not reset JDBC Connection after transaction", ex);
        }

        if (jdbcTx.isNewConnectionHolder()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Releasing JDBC Connection [" + connection + "] after transaction");
            }
            DataSourceUtils.releaseConnection(connection, this.dataSource);
        }

        jdbcTx.getConnectionHolder().clear();
    }


    /**
     * Prepare the transactional {@code Connection} right after transaction begin.
     * <p>The default implementation executes a "SET TRANSACTION READ ONLY" statement
     * if the {@link #setEnforceReadOnly "enforceReadOnly"} flag is set to {@code true}
     * and the transaction definition indicates a read-only transaction.
     * <p>The "SET TRANSACTION READ ONLY" is understood by Oracle, MySQL and Postgres
     * and may work with other databases as well. If you'd like to adapt this treatment,
     * override this method accordingly.
     * @param con the transactional JDBC Connection
     * @param definition the current transaction definition
     * @throws SQLException if thrown by JDBC API
     * @since 4.3.7
     * @see #setEnforceReadOnly
     */
    protected void prepareTransactionalConnection(Connection con, TransactionDefinition definition)
            throws SQLException {

        // Nothing to do unless enforcement is on and the transaction is read-only.
        if (!isEnforceReadOnly() || !definition.isReadOnly()) {
            return;
        }
        Statement readOnlyStatement = con.createStatement();
        try {
            readOnlyStatement.executeUpdate("SET TRANSACTION READ ONLY");
        }
        finally {
            readOnlyStatement.close();
        }
    }


    /**
     * DataSource transaction object, representing a ConnectionHolder.
     * Used as transaction object by DataSourceTransactionManager.
     */
    private static class DataSourceTransactionObject extends JdbcTransactionObjectSupport {

        /** Whether the current holder was created by this transaction. */
        private boolean newConnectionHolder;

        /** Whether auto-commit has to be switched back on during cleanup. */
        private boolean mustRestoreAutoCommit;

        /**
         * Set the holder together with the flag telling whether it is newly created.
         */
        public void setConnectionHolder(ConnectionHolder connectionHolder, boolean newConnectionHolder) {
            super.setConnectionHolder(connectionHolder);
            this.newConnectionHolder = newConnectionHolder;
        }

        public boolean isNewConnectionHolder() {
            return this.newConnectionHolder;
        }

        public void setMustRestoreAutoCommit(boolean mustRestoreAutoCommit) {
            this.mustRestoreAutoCommit = mustRestoreAutoCommit;
        }

        public boolean isMustRestoreAutoCommit() {
            return this.mustRestoreAutoCommit;
        }

        /** Mark the underlying holder as rollback-only. */
        public void setRollbackOnly() {
            getConnectionHolder().setRollbackOnly();
        }

        /** Report the rollback-only state of the underlying holder. */
        @Override
        public boolean isRollbackOnly() {
            return getConnectionHolder().isRollbackOnly();
        }

        /** Trigger a flush, but only while transaction synchronization is active. */
        @Override
        public void flush() {
            if (TransactionSynchronizationManager.isSynchronizationActive()) {
                TransactionSynchronizationUtils.triggerFlush();
            }
        }
    }

}