publicvoidinit(){ registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser()); registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser()); registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser()); }
/** * Set the transaction attribute source which is used to find transaction * attributes. This should usually be identical to the source reference * set on the transaction interceptor itself. * @see TransactionInterceptor#setTransactionAttributeSource */ publicvoidsetTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource){ this.transactionAttributeSource = transactionAttributeSource; }
/** * Set the {@link ClassFilter} to use for this pointcut. * Default is {@link ClassFilter#TRUE}. */ publicvoidsetClassFilter(ClassFilter classFilter){ this.pointcut.setClassFilter(classFilter); }
@Override public Pointcut getPointcut(){ returnthis.pointcut; }
// AbstractFallbackTransactionAttributeSource // 获取事务属性 public TransactionAttribute getTransactionAttribute(Method method, Class<?> targetClass){ if (method.getDeclaringClass() == Object.class) { returnnull; }
// First, see if we have a cached value. Object cacheKey = getCacheKey(method, targetClass); TransactionAttribute cached = this.attributeCache.get(cacheKey); if (cached != null) { // Value will either be canonical value indicating there is no transaction attribute, // or an actual transaction attribute. if (cached == NULL_TRANSACTION_ATTRIBUTE) { returnnull; } else { return cached; } } else { // We need to work it out. // 获取事务属性 TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass); // Put it in the cache. if (txAttr == null) { this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE); } else { String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass); if (txAttr instanceof DefaultTransactionAttribute) { ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification); } this.attributeCache.put(cacheKey, txAttr); } return txAttr; } }
// Ignore CGLIB subclasses - introspect the actual user class. Class<?> userClass = ClassUtils.getUserClass(targetClass); // The method may be on an interface, but we need attributes from the target class. // If the target class is null, the method will be unchanged. Method specificMethod = ClassUtils.getMostSpecificMethod(method, userClass); // If we are dealing with method with generic parameters, find the original method. specificMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
// First try is the method in the target class. // 先尝试解析方法上有没有@Transactional注解 TransactionAttribute txAttr = findTransactionAttribute(specificMethod); if (txAttr != null) { return txAttr; }
// Second try is the transaction attribute on the target class. // 如果方法上没有,则解析类上的 txAttr = findTransactionAttribute(specificMethod.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; }
if (specificMethod != method) { // Fallback is to look at the original method. txAttr = findTransactionAttribute(method); if (txAttr != null) { return txAttr; } // Last fallback is the class of the original method. txAttr = findTransactionAttribute(method.getDeclaringClass()); if (txAttr != null && ClassUtils.isUserLevelMethod(method)) { return txAttr; } }
// If the transaction attribute is null, the method is non-transactional. // 获取事务属性 final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); // 事务管理器 final PlatformTransactionManager tm = determineTransactionManager(txAttr); // 方法的唯一标识 final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); // 区分不同的事务管理器 // 对于CallbackPreferringPlatformTransactionManager类型的事务管理器,需要回调函数来实现事务的创建和提交,即声明式事务 // 对于非CallbackPreferringPlatformTransactionManager类型的,不需要通过回调函数来实现事务的创建和提交,即编程式事务 if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. // 创建事务,同时把创建事务过程中得到的信息放到TransactionInfo中,保存当前事务状态 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. // 沿着处理器链进行调用,执行被增强方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception // 事务处理方法调用中出现异常,事务处理需要根据具体情况进行回滚或者提交 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { // 重置掉ThreadLocal中的信息transactionInfoHolder.set(this.oldTransactionInfo) cleanupTransactionInfo(txInfo); } // 事务提交 commitTransactionAfterReturning(txInfo); return retVal; }
else { // 需要通过回调方法操作事务 Object result; final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. try { result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, new TransactionCallback<Object>() { @Override public Object doInTransaction(TransactionStatus status){ TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { thrownew ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. throwableHolder.throwable = ex; returnnull; } } finally { cleanupTransactionInfo(txInfo); } } }); } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); ex2.initApplicationException(throwableHolder.throwable); } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw ex2; }
// Check result state: It might indicate a Throwable to rethrow. if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } return result; } }
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification); if (txAttr != null) { // We need a transaction for this method... // The transaction manager will flag an error if an incompatible tx already exists. txInfo.newTransactionStatus(status); } else { // The TransactionInfo.hasTransaction() method will return false. We created it only // to preserve the integrity of the ThreadLocal stack maintained in this class. }
// We always bind the TransactionInfo to the thread, even if we didn't create // a new transaction here. This guarantees that the TransactionInfo stack // will be managed correctly even if no transaction was created by this aspect. // 将当前的TransactionInfo和线程进行绑定,同时TransactionInfo中由一个变量来保存以前的TransactionInfo,在请求事务时会创建TransactionInfo txInfo.bindToThread(); return txInfo; }
// Cache debug flag to avoid repeated checks. boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } // 判断当前线程是否存在事务 if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. // 如果已经存在事务 return handleExistingTransaction(definition, transaction, debugEnabled); }
// Check definition settings for new transaction. // 事务超时 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { thrownew InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); }
// No existing transaction found -> check propagation behavior to find out how to proceed. // 当前线程不存在事务,但是TransactionDefinition被声明为PROPAGATION_MANDATORY,则抛出异常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { thrownew IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } // 当前线程不存在事务,但是TransactionDefinition被声明为PROPAGATION_REQUIRED或者PROPAGATION_REQUIRES_NEW或者PROPAGATION_NESTED,则新建事务 elseif (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 空挂起 SuspendedResourcesHolder suspendedResources = suspend(null); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 开始创建事务,包括设置ConnectionHolder、隔离级别、timeout,如果是新连接,则绑定到当前线程 doBegin(transaction, definition); // 同步事务的设置 prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). // 需要把自动提交事务关闭,由spring控制提交 if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); con.setAutoCommit(false); }
int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); }
// Bind the connection holder to the thread. // 如果是新开的连接,需要把Connection和线程绑定 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); } }
catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, this.dataSource); txObject.setConnectionHolder(null, false); } thrownew CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }
// Remove the connection holder from the thread, if exposed. if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.unbindResource(this.dataSource); }
if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, this.dataSource); }
txObject.getConnectionHolder().clear(); }
/** * Prepare the transactional {@code Connection} right after transaction begin. * <p>The default implementation executes a "SET TRANSACTION READ ONLY" statement * if the {@link #setEnforceReadOnly "enforceReadOnly"} flag is set to {@code true} * and the transaction definition indicates a read-only transaction. * <p>The "SET TRANSACTION READ ONLY" is understood by Oracle, MySQL and Postgres * and may work with other databases as well. If you'd like to adapt this treatment, * override this method accordingly. * @param con the transactional JDBC Connection * @param definition the current transaction definition * @throws SQLException if thrown by JDBC API * @since 4.3.7 * @see #setEnforceReadOnly */ protectedvoidprepareTransactionalConnection(Connection con, TransactionDefinition definition) throws SQLException {
/** * DataSource transaction object, representing a ConnectionHolder. * Used as transaction object by DataSourceTransactionManager. */ privatestaticclassDataSourceTransactionObjectextendsJdbcTransactionObjectSupport{