- Author: HuiFer
- 源码阅读仓库: SourceHot-Spring
- 事务传播
publicenumPropagation { /** * 有事务则加入,没有则新建 */REQUIRED(TransactionDefinition.PROPAGATION_REQUIRED), /** * 有事务就用,如果没有就不开启(继承关系) * @see org.springframework.transaction.support.AbstractPlatformTransactionManager#setTransactionSynchronization */SUPPORTS(TransactionDefinition.PROPAGATION_SUPPORTS), /** * 必须在已有事务中 */MANDATORY(TransactionDefinition.PROPAGATION_MANDATORY), /** * 不管是否已有事务,都要开启新事务,老事务挂起 * @see org.springframework.transaction.jta.JtaTransactionManager#setTransactionManager */REQUIRES_NEW(TransactionDefinition.PROPAGATION_REQUIRES_NEW), /** * 不开启事务 * @see org.springframework.transaction.jta.JtaTransactionManager#setTransactionManager */NOT_SUPPORTED(TransactionDefinition.PROPAGATION_NOT_SUPPORTED), /** * 必须在没有事务的方法中调用,否则抛出异常 */NEVER(TransactionDefinition.PROPAGATION_NEVER), /** * 果已有事务,则嵌套执行,如果没有,就新建(和REQUIRED类似,和REQUIRES_NEW容易混淆) * @see org.springframework.jdbc.datasource.DataSourceTransactionManager */NESTED(TransactionDefinition.PROPAGATION_NESTED); privatefinalintvalue; Propagation(intvalue) { this.value = value; } publicintvalue() { returnthis.value; } }
- 事务隔离级别
publicenumIsolation { /** * @see java.sql.Connection */DEFAULT(TransactionDefinition.ISOLATION_DEFAULT), /** * 读未提交 * * @see java.sql.Connection#TRANSACTION_READ_UNCOMMITTED */READ_UNCOMMITTED(TransactionDefinition.ISOLATION_READ_UNCOMMITTED), /** * 读已提交 * * @see java.sql.Connection#TRANSACTION_READ_COMMITTED */READ_COMMITTED(TransactionDefinition.ISOLATION_READ_COMMITTED), /** * 可重复读 * * @see java.sql.Connection#TRANSACTION_REPEATABLE_READ */REPEATABLE_READ(TransactionDefinition.ISOLATION_REPEATABLE_READ), /** * 可串行化 * * @see java.sql.Connection#TRANSACTION_SERIALIZABLE */SERIALIZABLE(TransactionDefinition.ISOLATION_SERIALIZABLE); privatefinalintvalue; Isolation(intvalue) { this.value = value; } publicintvalue() { returnthis.value; } }
- 下面代码是一个注解方式的事务配置使用
EnableTransactionManagement
来开启事务支持
@ComponentScan(basePackages = "org.source.hot.spring.overview.ioc.tx.declarative") @EnableTransactionManagementpublicclassTxConfig { @Bean// 数据源publicDataSourcedataSource() { DruidDataSourcedataSource = newDruidDataSource(); dataSource.setUsername(""); dataSource.setPassword(""); dataSource.setUrl(""); dataSource.setDriverClassName(com.mysql.jdbc.Driver.class.getName()); returndataSource; } @BeanpublicJdbcTemplatejdbcTemplate(DataSourcedataSource) { returnnewJdbcTemplate(dataSource); } @Bean//事务管理器publicPlatformTransactionManagerplatformTransactionManager(DataSourcedataSource) { returnnewDataSourceTransactionManager(dataSource); } }
- 注解源码如下,关注于
@Import(TransactionManagementConfigurationSelector.class)
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented@Import(TransactionManagementConfigurationSelector.class) public @interface EnableTransactionManagement { booleanproxyTargetClass() defaultfalse; AdviceModemode() defaultAdviceMode.PROXY; intorder() defaultOrdered.LOWEST_PRECEDENCE; }
publicclassTransactionManagementConfigurationSelectorextendsAdviceModeImportSelector<EnableTransactionManagement> { @OverrideprotectedString[] selectImports(AdviceModeadviceMode) { // 根据切面类型进行初始化switch (adviceMode) { casePROXY: // 默认值returnnewString[] {AutoProxyRegistrar.class.getName(), ProxyTransactionManagementConfiguration.class.getName()}; caseASPECTJ: returnnewString[] {determineTransactionAspectClass()}; default: returnnull; } } privateStringdetermineTransactionAspectClass() { return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ? TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME : TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME); } }
@Configuration(proxyBeanMethods = false) publicclassProxyTransactionManagementConfigurationextendsAbstractTransactionManagementConfiguration { /** * 事务切面 * @param transactionAttributeSource * @param transactionInterceptor * @return */@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) publicBeanFactoryTransactionAttributeSourceAdvisortransactionAdvisor( TransactionAttributeSourcetransactionAttributeSource, TransactionInterceptortransactionInterceptor) { // 事务切面BeanFactoryTransactionAttributeSourceAdvisoradvisor = newBeanFactoryTransactionAttributeSourceAdvisor(); // 事务属性advisor.setTransactionAttributeSource(transactionAttributeSource); advisor.setAdvice(transactionInterceptor); if (this.enableTx != null) { // 执行顺序advisor.setOrder(this.enableTx.<Integer>getNumber("order")); } returnadvisor; } @Bean@Role(BeanDefinition.ROLE_INFRASTRUCTURE) publicTransactionAttributeSourcetransactionAttributeSource() { returnnewAnnotationTransactionAttributeSource(); } /*** * 事务拦截器 * @param transactionAttributeSource * @return */@Bean@Role(BeanDefinition.ROLE_INFRASTRUCTURE) publicTransactionInterceptortransactionInterceptor( TransactionAttributeSourcetransactionAttributeSource) { TransactionInterceptorinterceptor = newTransactionInterceptor(); interceptor.setTransactionAttributeSource(transactionAttributeSource); if (this.txManager != null) { // 事务管理器注入interceptor.setTransactionManager(this.txManager); } returninterceptor; } }
- 实现了
org.aopalliance.intercept.MethodInterceptor
接口的方法
@Override@NullablepublicObjectinvoke(MethodInvocationinvocation) throwsThrowable { // Work out the target class: may be {@code null}.// The TransactionAttributeSource should be passed the target class// as well as the method, which may be from an interface.Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction...returninvokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }
- 这段代码会在具有
Transactional
的注解方法上生效
@ServicepublicclassIssueServiceImpl { @AutowiredprivateJdbcTemplatejdbcTemplate; @Transactional() publicbooleaninsertIssue() throwsException { jdbcTemplate.execute("INSERT INTO `scrum`.`issue`() VALUES ()"); thrownewException("a"); } } publicclassDeclarativeTransactionTest { publicstaticvoidmain(String[] args) throwsException { AnnotationConfigApplicationContextapplicationContext = newAnnotationConfigApplicationContext( TxConfig.class); IssueServiceImplbean = applicationContext.getBean(IssueServiceImpl.class); bean.insertIssue(); System.out.println(); applicationContext.close(); } }
下面通过断点调试进行查阅。在断点处单步执行一步,会直接进入 cglib 代理对象
org.springframework.aop.framework.CglibAopProxy.DynamicAdvisedInterceptor#intercept
具体不展开,继续往下执行
走到invoke
方法了
入参对象查看
获取事务属性
@Override@NullablepublicTransactionAttributegetTransactionAttribute(Methodmethod, @NullableClass<?> targetClass) { if (method.getDeclaringClass() == Object.class) { returnnull; } // First, see if we have a cached value.// 尝试缓存中获取ObjectcacheKey = getCacheKey(method, targetClass); TransactionAttributecached = this.attributeCache.get(cacheKey); if (cached != null) { // Value will either be canonical value indicating there is no transaction attribute,// or an actual transaction attribute.if (cached == NULL_TRANSACTION_ATTRIBUTE) { returnnull; } else { returncached; } } else { // We need to work it out.// 自行构建一个事务属性TransactionAttributetxAttr = computeTransactionAttribute(method, targetClass); // Put it in the cache.if (txAttr == null) { this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE); } else { StringmethodIdentification = ClassUtils .getQualifiedMethodName(method, targetClass); if (txAttrinstanceofDefaultTransactionAttribute) { ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification); } if (logger.isTraceEnabled()) { logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr); } this.attributeCache.put(cacheKey, txAttr); } returntxAttr; } } protectedObjectgetCacheKey(Methodmethod, @NullableClass<?> targetClass) { returnnewMethodClassKey(method, targetClass); }
此处方法已经获取到了这个方法就是后面的一个切面
确定事务管理器
@NullableprotectedTransactionManagerdetermineTransactionManager( @NullableTransactionAttributetxAttr) { // Do not attempt to lookup tx manager if no tx attributes are set// 空判断返回一个事务管理器if (txAttr == null || this.beanFactory == null) { returngetTransactionManager(); } // 属性是否有别名Stringqualifier = txAttr.getQualifier(); // 如果有if (StringUtils.hasText(qualifier)) { // 从 ioc 容器中根据类型和名称获取事务管理器returndetermineQualifiedTransactionManager(this.beanFactory, qualifier); } elseif (StringUtils.hasText(this.transactionManagerBeanName)) { // 从 ioc 容器中根据类型和名称获取事务管理器returndetermineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName); } else { // 通过get方法获取TransactionManagerdefaultTransactionManager = getTransactionManager(); // 如果没有if (defaultTransactionManager == null) { // 尝试从缓存中获取defaultTransactionManager = this.transactionManagerCache .get(DEFAULT_TRANSACTION_MANAGER_KEY); // 缓存里面没有从 ioc 容器中获取并且设置缓存if (defaultTransactionManager == null) { defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class); this.transactionManagerCache.putIfAbsent( DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager); } } returndefaultTransactionManager; } }
类型转换
@NullableprivatePlatformTransactionManagerasPlatformTransactionManager( @NullableObjecttransactionManager) { if (transactionManager == null || transactionManagerinstanceofPlatformTransactionManager) { return (PlatformTransactionManager) transactionManager; } else { thrownewIllegalStateException( "Specified transaction manager is not a PlatformTransactionManager: " + transactionManager); } }
获取方法切面
privateStringmethodIdentification(Methodmethod, @NullableClass<?> targetClass, @NullableTransactionAttributetxAttr) { StringmethodIdentification = methodIdentification(method, targetClass); if (methodIdentification == null) { if (txAttrinstanceofDefaultTransactionAttribute) { // 直接就获取了.方法签名.methodIdentification = ((DefaultTransactionAttribute) txAttr).getDescriptor(); } if (methodIdentification == null) { methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass); } } returnmethodIdentification; }
创建一个新的事务根据事务传播性
@SuppressWarnings("serial") protectedTransactionInfocreateTransactionIfNecessary(@NullablePlatformTransactionManagertm, @NullableTransactionAttributetxAttr, finalStringjoinpointIdentification) { // If no name specified, apply method identification as transaction name.// 把切面的地址放进去if (txAttr != null && txAttr.getName() == null) { txAttr = newDelegatingTransactionAttribute(txAttr) { @OverridepublicStringgetName() { returnjoinpointIdentification; } }; } TransactionStatusstatus = null; if (txAttr != null) { if (tm != null) { // 事务状态// 获取事务status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } // 处理出一个 TransactionInforeturnprepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
tm.getTransaction
@OverridepublicfinalTransactionStatusgetTransaction(@NullableTransactionDefinitiondefinition) throwsTransactionException { // Use defaults if no transaction definition given.// 获取事务的定义TransactionDefinitiondef = (definition != null ? definition : TransactionDefinition.withDefaults()); // 获取事务Objecttransaction = doGetTransaction(); booleandebugEnabled = logger.isDebugEnabled(); // 是否存在事务if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave.// 存在事务后处理什么操作returnhandleExistingTransaction(def, transaction, debugEnabled); } // Check definition settings for new transaction.// 超时的校验. 小于默认值抛出异常if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { thrownewInvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); } // No existing transaction found -> check propagation behavior to find out how to proceed.// 没有事务抛出异常if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { thrownewIllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } elseif (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { SuspendedResourcesHoldersuspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { booleannewSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatusstatus = newTransactionStatus( def, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, def); prepareSynchronization(status, def); returnstatus; } catch (RuntimeException | Errorex) { resume(null, suspendedResources); throwex; } } else { // Create "empty" transaction: no actual transaction, but 
potentially synchronization.if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger .isWarnEnabled()) { logger.warn( "Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + def); } booleannewSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); returnprepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } }
org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction
@OverrideprotectedObjectdoGetTransaction() { DataSourceTransactionObjecttxObject = newDataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed()); // 数据库链接对象// 从事务管理器中获取数据库链接对象ConnectionHolderconHolder = (ConnectionHolder) TransactionSynchronizationManager .getResource(obtainDataSource()); txObject.setConnectionHolder(conHolder, false); returntxObject; }
org.springframework.transaction.support.AbstractPlatformTransactionManager#suspend
@NullableprotectedfinalSuspendedResourcesHoldersuspend(@NullableObjecttransaction) throwsTransactionException { if (TransactionSynchronizationManager.isSynchronizationActive()) { List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization(); try { ObjectsuspendedResources = null; if (transaction != null) { suspendedResources = doSuspend(transaction); } // 线程名称Stringname = TransactionSynchronizationManager.getCurrentTransactionName(); // 同步方法中设置TransactionSynchronizationManager.setCurrentTransactionName(null); // 只读设置booleanreadOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); // 同步方法中设置TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); // 隔离级别IntegerisolationLevel = TransactionSynchronizationManager .getCurrentTransactionIsolationLevel(); // 同步方法中设置TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); // 是否活跃booleanwasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); returnnewSuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException | Errorex) { // doSuspend failed - original transaction is still active...doResumeSynchronization(suspendedSynchronizations); throwex; } } elseif (transaction != null) { // Transaction active but no synchronization active.ObjectsuspendedResources = doSuspend(transaction); returnnewSuspendedResourcesHolder(suspendedResources); } else { // Neither transaction nor synchronization active.returnnull; } }
prepareTransactionInfo
简单的new
对象并且绑定线程protectedTransactionInfoprepareTransactionInfo(@NullablePlatformTransactionManagertm, @NullableTransactionAttributetxAttr, StringjoinpointIdentification, @NullableTransactionStatusstatus) { // 初始化TransactionInfotxInfo = newTransactionInfo(tm, txAttr, joinpointIdentification); if (txAttr != null) { // We need a transaction for this method...if (logger.isTraceEnabled()) { logger.trace( "Getting transaction for [" + txInfo.getJoinpointIdentification() + "]"); } // The transaction manager will flag an error if an incompatible tx already exists.txInfo.newTransactionStatus(status); } else { // The TransactionInfo.hasTransaction() method will return false. We created it only// to preserve the integrity of the ThreadLocal stack maintained in this class.if (logger.isTraceEnabled()) { logger.trace("No need to create transaction for [" + joinpointIdentification + "]: This method is not transactional."); } } // We always bind the TransactionInfo to the thread, even if we didn't create// a new transaction here. This guarantees that the TransactionInfo stack// will be managed correctly even if no transaction was created by this aspect.// 和线程绑定txInfo.bindToThread(); returntxInfo; }
retVal = invocation.proceedWithInvocation();
这里走的是 CGLIB 的方法,会直接执行目标方法并将结果返回。具体方法在
org.springframework.aop.framework.CglibAopProxy.CglibMethodInvocation#proceed
@Override@NullablepublicObjectproceed() throwsThrowable { try { returnsuper.proceed(); } catch (RuntimeExceptionex) { throwex; } catch (Exceptionex) { if (ReflectionUtils.declaresException(getMethod(), ex.getClass())) { throwex; } else { thrownewUndeclaredThrowableException(ex); } } }
如果没有异常就直接处理完成返回了
我们现在是有异常的
// Excerpt: invoke the callback, complete the transaction on failure, and always
// clean up the thread-bound TransactionInfo.
try {
    // This is an around advice: Invoke the next interceptor in the chain.
    // This will normally result in a target object being invoked.
    retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
    // target invocation exception: roll back or commit depending on the rollback rules
    completeTransactionAfterThrowing(txInfo, ex);
    throw ex;
}
finally {
    // Clean up the transaction info.
    cleanupTransactionInfo(txInfo);
}
completeTransactionAfterThrowing
回滚异常的处理方法protectedvoidcompleteTransactionAfterThrowing(@NullableTransactionInfotxInfo, Throwableex) { if (txInfo != null && txInfo.getTransactionStatus() != null) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "] after exception: " + ex); } if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { try { // 做回滚txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } catch (TransactionSystemExceptionex2) { logger.error("Application exception overridden by rollback exception", ex); ex2.initApplicationException(ex); throwex2; } catch (RuntimeException | Errorex2) { logger.error("Application exception overridden by rollback exception", ex); throwex2; } } else { // We don't roll back on this exception.// Will still roll back if TransactionStatus.isRollbackOnly() is true.try { // org.springframework.transaction.support.AbstractPlatformTransactionManager.commit 的方法txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } catch (TransactionSystemExceptionex2) { logger.error("Application exception overridden by commit exception", ex); ex2.initApplicationException(ex); throwex2; } catch (RuntimeException | Errorex2) { logger.error("Application exception overridden by commit exception", ex); throwex2; } } } }
整理一下这里的流程
异常满足回滚规则(rollbackOn 返回 true)时走回滚
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus())
异常不满足回滚规则时仍然提交
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus())
注意: 默认规则下,如果抛出的是受检异常(是 Exception 但不是 RuntimeException 或 Error),不会走回滚
判断是否需要回滚
txInfo.transactionAttribute.rollbackOn
链路
org.springframework.transaction.interceptor.DelegatingTransactionAttribute#rollbackOn
org.springframework.transaction.interceptor.RuleBasedTransactionAttribute#rollbackOn
@OverridepublicbooleanrollbackOn(Throwableex) { if (logger.isTraceEnabled()) { logger.trace( "Applying rules to determine whether transaction should rollback on " + ex); } RollbackRuleAttributewinner = null; intdeepest = Integer.MAX_VALUE; if (this.rollbackRules != null) { for (RollbackRuleAttributerule : this.rollbackRules) { intdepth = rule.getDepth(ex); if (depth >= 0 && depth < deepest) { deepest = depth; winner = rule; } } } if (logger.isTraceEnabled()) { logger.trace("Winning rollback rule is: " + winner); } // User superclass behavior (rollback on unchecked) if no rule matches.if (winner == null) { logger.trace("No relevant rollback rule found: applying default rules"); returnsuper.rollbackOn(ex); } return !(winnerinstanceofNoRollbackRuleAttribute); }
org.springframework.transaction.interceptor.DefaultTransactionAttribute#rollbackOn
@OverridepublicbooleanrollbackOn(Throwableex) { return (exinstanceofRuntimeException || exinstanceofError); }
- 这就是我们的异常判断是否需要回滚
cleanupTransactionInfo
数据清理
protectedvoidcleanupTransactionInfo(@NullableTransactionInfotxInfo) { if (txInfo != null) { txInfo.restoreThreadLocalStatus(); } }
privatevoidrestoreThreadLocalStatus() { // Use stack to restore old transaction TransactionInfo.// Will be null if none was set.transactionInfoHolder.set(this.oldTransactionInfo); }
- 默认的事务定义
- 常见属性
- timeout
- readOnly
- ....
- 常见方法
// 获取事务TransactionStatusgetTransaction(@NullableTransactionDefinitiondefinition)throwsTransactionException; // 提交事务voidcommit(TransactionStatusstatus) throwsTransactionException; // 回滚事务voidrollback(TransactionStatusstatus) throwsTransactionException;
- 贴出一部分
- AbstractPlatformTransactionManager 定义了一些基础属性 以及一些需要子类实现的方法
// 属性
defaultTimeout
nestedTransactionAllowed
validateExistingTransaction
globalRollbackOnParticipationFailure
failEarlyOnGlobalRollbackOnly
rollbackOnCommitFailure

// 方法
doGetTransaction
isExistingTransaction
useSavepointForNestedTransaction
doBegin
doSuspend
doResume
shouldCommitOnGlobalRollbackOnly
prepareForCommit
doCommit
doRollback
doSetRollbackOnly
registerAfterCompletionWithExistingTransaction
doCleanupAfterCompletion
- xml 配置如下
<!-- Data source; url/username/password are intentionally left blank in this sample. -->
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
    <property name="url" value=""/>
    <property name="username" value=""/>
    <property name="password" value=""/>
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
</bean>

<!-- JdbcTemplate built on the data source. -->
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <property name="dataSource" ref="dataSource"/>
</bean>

<!-- Transaction manager bound to the same data source. -->
<bean id="transactionManager"
      class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="dataSource"/>
</bean>
两个属性,通常我们会配置 datasource
@NullableprivateDataSourcedataSource; privatebooleanenforceReadOnly = false;
- bean 的属性注入就不具体描述了
InitializingBean
@OverridepublicvoidafterPropertiesSet() { if (getDataSource() == null) { thrownewIllegalArgumentException("Property 'dataSource' is required"); } }
- 如果
dataSource
为空会抛出异常
- 默认单例会注册到 ioc 容器中. 后续注册流程不具体描述
- 如果
方法注释
/** * 获取datasource */protectedDataSourceobtainDataSource() { DataSourcedataSource = getDataSource(); Assert.state(dataSource != null, "No DataSource set"); returndataSource; } /** * 创建事务 * * @return 事务对象 */@OverrideprotectedObjectdoGetTransaction() { DataSourceTransactionObjecttxObject = newDataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed()); // 数据库链接对象// 从事务管理器中获取数据库链接对象ConnectionHolderconHolder = (ConnectionHolder) TransactionSynchronizationManager .getResource(obtainDataSource()); txObject.setConnectionHolder(conHolder, false); returntxObject; } /** * 是否存在事务 * * @param transaction transaction object returned by doGetTransaction * @return */@OverrideprotectedbooleanisExistingTransaction(Objecttransaction) { DataSourceTransactionObjecttxObject = (DataSourceTransactionObject) transaction; return (txObject.hasConnectionHolder() && txObject.getConnectionHolder() .isTransactionActive()); } /** * This implementation sets the isolation level but ignores the timeout. 
事务的开始方法 */@OverrideprotectedvoiddoBegin(Objecttransaction, TransactionDefinitiondefinition) { // 拿出事务DataSourceTransactionObjecttxObject = (DataSourceTransactionObject) transaction; // 链接对象Connectioncon = null; try { if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { // 数据库链接对象ConnectionnewCon = obtainDataSource().getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } // 设置数据库连接txObject.setConnectionHolder(newConnectionHolder(newCon), true); } // 拿出链接对象并且设置同步事务txObject.getConnectionHolder().setSynchronizedWithTransaction(true); // 链接对象赋值con = txObject.getConnectionHolder().getConnection(); // 获取事务级别IntegerpreviousIsolationLevel = DataSourceUtils .prepareConnectionForTransaction(con, definition); // 设置事务隔离级别txObject.setPreviousIsolationLevel(previousIsolationLevel); // 设置只读txObject.setReadOnly(definition.isReadOnly()); // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,// so we don't want to do it unnecessarily (for example if we've explicitly// configured the connection pool to set it already).// 判断是否自动提交if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } con.setAutoCommit(false); } // 事务链接准备prepareTransactionalConnection(con, definition); // 事务激活txObject.getConnectionHolder().setTransactionActive(true); // 超时时间获取inttimeout = determineTimeout(definition); // 默认超时时间设置if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // Bind the connection holder to the thread.// 将链接和当前线程绑定if (txObject.isNewConnectionHolder()) { // k: datasource v: connectionHolderTransactionSynchronizationManager .bindResource(obtainDataSource(), txObject.getConnectionHolder()); } } catch (Throwableex) { if (txObject.isNewConnectionHolder()) { // 
释放链接DataSourceUtils.releaseConnection(con, obtainDataSource()); txObject.setConnectionHolder(null, false); } thrownewCannotCreateTransactionException( "Could not open JDBC Connection for transaction", ex); } } /** * 挂起事务 * * @param transaction transaction object returned by {@code doGetTransaction} * @return 移除的链接 */@OverrideprotectedObjectdoSuspend(Objecttransaction) { // 获取事务对象DataSourceTransactionObjecttxObject = (DataSourceTransactionObject) transaction; // 连接置空txObject.setConnectionHolder(null); // 解除资源绑定returnTransactionSynchronizationManager.unbindResource(obtainDataSource()); } /** * 恢复事务 * * @param transaction transaction object returned by {@code doGetTransaction} * @param suspendedResources the object that holds suspended resources, as returned by * doSuspend */@OverrideprotectedvoiddoResume(@NullableObjecttransaction, ObjectsuspendedResources) { // 资源绑定TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources); } /** * 做提交 * * @param status the status representation of the transaction */@OverrideprotectedvoiddoCommit(DefaultTransactionStatusstatus) { // 事务对象DataSourceTransactionObjecttxObject = (DataSourceTransactionObject) status .getTransaction(); // 获取链接Connectioncon = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Committing JDBC transaction on Connection [" + con + "]"); } try { // 链接提交con.commit(); } catch (SQLExceptionex) { thrownewTransactionSystemException("Could not commit JDBC transaction", ex); } } /** * 事务回滚 * * @param status the status representation of the transaction */@OverrideprotectedvoiddoRollback(DefaultTransactionStatusstatus) { // 事务对象DataSourceTransactionObjecttxObject = (DataSourceTransactionObject) status .getTransaction(); // 链接对象Connectioncon = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Rolling back JDBC transaction on Connection [" + con + "]"); } try { // 回滚方法con.rollback(); } catch (SQLExceptionex) { 
thrownewTransactionSystemException("Could not roll back JDBC transaction", ex); } } /** * 设置回滚 * * @param status the status representation of the transaction */@OverrideprotectedvoiddoSetRollbackOnly(DefaultTransactionStatusstatus) { DataSourceTransactionObjecttxObject = (DataSourceTransactionObject) status .getTransaction(); if (status.isDebug()) { logger.debug( "Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() + "] rollback-only"); } txObject.setRollbackOnly(); } /** * 清除 * * @param transaction transaction object returned by {@code doGetTransaction} */@OverrideprotectedvoiddoCleanupAfterCompletion(Objecttransaction) { DataSourceTransactionObjecttxObject = (DataSourceTransactionObject) transaction; // Remove the connection holder from the thread, if exposed.if (txObject.isNewConnectionHolder()) { // 释放datasource绑定的资源TransactionSynchronizationManager.unbindResource(obtainDataSource()); } // Reset connection.Connectioncon = txObject.getConnectionHolder().getConnection(); try { if (txObject.isMustRestoreAutoCommit()) { con.setAutoCommit(true); } // 重置链接DataSourceUtils.resetConnectionAfterTransaction( con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly()); } catch (Throwableex) { logger.debug("Could not reset JDBC Connection after transaction", ex); } if (txObject.isNewConnectionHolder()) { if (logger.isDebugEnabled()) { logger.debug("Releasing JDBC Connection [" + con + "] after transaction"); } DataSourceUtils.releaseConnection(con, this.dataSource); } txObject.getConnectionHolder().clear(); } /** * * 事务准备 */protectedvoidprepareTransactionalConnection(Connectioncon, TransactionDefinitiondefinition) throwsSQLException { if (isEnforceReadOnly() && definition.isReadOnly()) { try (Statementstmt = con.createStatement()) { // 执行sql 类似事务隔离级别stmt.executeUpdate("SET TRANSACTION READ ONLY"); } } }
privatestaticclassDataSourceTransactionObjectextendsJdbcTransactionObjectSupport { /** * 是否有新的链接 */privatebooleannewConnectionHolder; /** * 是否自动提交 */privatebooleanmustRestoreAutoCommit; }
- abstract 修饰具体定义的方法不具体展开。主要关注实现
org.springframework.transaction.PlatformTransactionManager
的几个方法
@Overridepublicfinalvoidcommit(TransactionStatusstatus) throwsTransactionException { if (status.isCompleted()) { thrownewIllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } // 事务状态DefaultTransactionStatusdefStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } // 处理回滚processRollback(defStatus, false); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug( "Global transaction is marked as rollback-only but transactional code requested commit"); } // 处理回滚processRollback(defStatus, true); return; } // 真正的处理提交processCommit(defStatus); }
privatevoidprocessCommit(DefaultTransactionStatusstatus) throwsTransactionException { try { booleanbeforeCompletionInvoked = false; try { booleanunexpectedRollback = false; //prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); // 前置任务是否已经执行beforeCompletionInvoked = true; // 嵌套事务. 是否有保存点if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } elseif (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } unexpectedRollback = status.isGlobalRollbackOnly(); doCommit(status); } elseif (isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = status.isGlobalRollbackOnly(); } // Throw UnexpectedRollbackException if we have a global rollback-only// marker but still didn't get a corresponding exception from commit.if (unexpectedRollback) { thrownewUnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackExceptionex) { // can only be caused by doCommit// 事务的同步状态: 回滚triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throwex; } catch (TransactionExceptionex) { // can only be caused by doCommit// 提交失败 做回滚if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { // 事务的同步状态: 未知triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throwex; } catch (RuntimeException | Errorex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throwex; } // Trigger afterCommit callbacks, with an exception thrown there// propagated to callers but the transaction still considered as committed.try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { // 完成后清理cleanupAfterCompletion(status); } }
@Overridepublicfinalvoidrollback(TransactionStatusstatus) throwsTransactionException { // 是否已完成if (status.isCompleted()) { thrownewIllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatusdefStatus = (DefaultTransactionStatus) status; // 执行回滚processRollback(defStatus, false); }
/**
 * Performs the actual rollback and fires the completion callbacks.
 * Cleanup via {@code cleanupAfterCompletion} always runs, whatever the outcome.
 *
 * @param status the status object of the transaction being rolled back
 * @param unexpected whether the caller regards this rollback as unexpected; if it
 *        still holds after processing, an {@code UnexpectedRollbackException} is thrown
 */
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
        boolean unexpectedRollback = unexpected;

        try {
            triggerBeforeCompletion(status);

            // Nested transaction
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Rolling back transaction to savepoint");
                }
                // Roll back to the held savepoint.
                status.rollbackToHeldSavepoint();
            } else if (status.isNewTransaction()) {
                // Independent (new) transaction
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                // Execute the rollback.
                doRollback(status);
            } else {
                // Participating in larger transaction
                if (status.hasTransaction()) {
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug(
                                    "Participating transaction failed - marking existing transaction as rollback-only");
                        }
                        // Mark the existing transaction as rollback-only.
                        doSetRollbackOnly(status);
                    } else {
                        if (status.isDebug()) {
                            logger.debug(
                                    "Participating transaction failed - letting transaction originator decide on rollback");
                        }
                    }
                } else {
                    logger.debug(
                            "Should roll back transaction but cannot - no transaction available");
                }
                // Unexpected rollback only matters here if we're asked to fail early
                if (!isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = false;
                }
            }
        } catch (RuntimeException | Error ex) {
            // The rollback itself failed: the outcome is reported as unknown.
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }

        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

        // Raise UnexpectedRollbackException if we had a global rollback-only marker
        if (unexpectedRollback) {
            throw new UnexpectedRollbackException(
                    "Transaction rolled back because it has been marked as rollback-only");
        }
    } finally {
        cleanupAfterCompletion(status);
    }
}
事务同步管理器
一些基本属性
/**
 * Per-thread map of bound resources.
 */
private static final ThreadLocal<Map<Object, Object>> resources =
        new NamedThreadLocal<>("Transactional resources");

/**
 * Per-thread set of registered transaction synchronizations.
 */
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
        new NamedThreadLocal<>("Transaction synchronizations");

/**
 * Per-thread name of the current transaction.
 */
private static final ThreadLocal<String> currentTransactionName =
        new NamedThreadLocal<>("Current transaction name");

/**
 * Per-thread read-only flag of the current transaction.
 */
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
        new NamedThreadLocal<>("Current transaction read-only status");

/**
 * Per-thread isolation level of the current transaction.
 */
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
        new NamedThreadLocal<>("Current transaction isolation level");

/**
 * Per-thread flag telling whether an actual transaction is active.
 */
private static final ThreadLocal<Boolean> actualTransactionActive =
        new NamedThreadLocal<>("Actual transaction active");
publicstaticMap<Object, Object> getResourceMap() { // 线程变量中获取Map<Object, Object> map = resources.get(); // 判空 如果为空给个空map如果有就返回return (map != null ? Collections.unmodifiableMap(map) : Collections.emptyMap()); }
publicstaticbooleanhasResource(Objectkey) { // 资源key获取// 通过 unwrapResourceIfNecessary 会走一次资源对象转换.// 1. InfrastructureProxy// 2. ScopedObjectObjectactualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Objectvalue = doGetResource(actualKey); return (value != null); }
unwrapResourceIfNecessary
方法会将资源具体化到接口,从接口中调用方法获取具体的资源staticObjectunwrapResourceIfNecessary(Objectresource) { Assert.notNull(resource, "Resource must not be null"); ObjectresourceRef = resource; // unwrap infrastructure proxyif (resourceRefinstanceofInfrastructureProxy) { resourceRef = ((InfrastructureProxy) resourceRef).getWrappedObject(); } if (aopAvailable) { // now unwrap scoped proxyresourceRef = ScopedProxyUnwrapper.unwrapIfNecessary(resourceRef); } returnresourceRef; } privatestaticclassScopedProxyUnwrapper { publicstaticObjectunwrapIfNecessary(Objectresource) { if (resourceinstanceofScopedObject) { return ((ScopedObject) resource).getTargetObject(); } else { returnresource; } } }
doGetResource
方法去获取资源@NullableprivatestaticObjectdoGetResource(ObjectactualKey) { Map<Object, Object> map = resources.get(); if (map == null) { returnnull; } Objectvalue = map.get(actualKey); // Transparently remove ResourceHolder that was marked as void...// 如果资源是下面两种的其中一个就删除这个资源if (valueinstanceofResourceHolder && ((ResourceHolder) value).isVoid()) { map.remove(actualKey); // Remove entire ThreadLocal if empty...if (map.isEmpty()) { resources.remove(); } value = null; } returnvalue; }
publicstaticvoidbindResource(Objectkey, Objectvalue) throwsIllegalStateException { // 将资源转换为正真的keyObjectactualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Assert.notNull(value, "Value must not be null"); Map<Object, Object> map = resources.get(); // set ThreadLocal Map if none found// 资源对象为空初始化if (map == null) { map = newHashMap<>(); resources.set(map); } // 原来的值ObjectoldValue = map.put(actualKey, value); // Transparently suppress a ResourceHolder that was marked as void...// 如果原来的值是下面的两种 抛出异常if (oldValueinstanceofResourceHolder && ((ResourceHolder) oldValue).isVoid()) { oldValue = null; } if (oldValue != null) { thrownewIllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } if (logger.isTraceEnabled()) { logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" + Thread.currentThread().getName() + "]"); } }
- debug 使用的是 druid 的数据源
unwrapResourceIfNecessary
方法
staticObjectunwrapResourceIfNecessary(Objectresource) { Assert.notNull(resource, "Resource must not be null"); ObjectresourceRef = resource; // unwrap infrastructure proxyif (resourceRefinstanceofInfrastructureProxy) { resourceRef = ((InfrastructureProxy) resourceRef).getWrappedObject(); } if (aopAvailable) { // now unwrap scoped proxyresourceRef = ScopedProxyUnwrapper.unwrapIfNecessary(resourceRef); } returnresourceRef; }
显然 `com.alibaba.druid.pool.DruidDataSource` 不是 `InfrastructureProxy`,因此第一步不会进行解包。
aopAvailable
/**
 * Whether {@code org.springframework.aop.scope.ScopedObject} can be loaded
 * through this class's class loader; evaluated once at class initialization.
 */
private static final boolean aopAvailable = ClassUtils.isPresent(
        "org.springframework.aop.scope.ScopedObject",
        TransactionSynchronizationUtils.class.getClassLoader());
publicstaticbooleanisPresent(StringclassName, @NullableClassLoaderclassLoader) { try { forName(className, classLoader); returntrue; } catch (IllegalAccessErrorerr) { thrownewIllegalStateException("Readability mismatch in inheritance hierarchy of class [" + className + "]: " + err.getMessage(), err); } catch (Throwableex) { // Typically ClassNotFoundException or NoClassDefFoundError...returnfalse; } }
判断指定的类能否被加载:加载成功返回 `true`,加载失败返回 `false`。
ScopedProxyUnwrapper.unwrapIfNecessary
privatestaticclassScopedProxyUnwrapper { publicstaticObjectunwrapIfNecessary(Objectresource) { if (resourceinstanceofScopedObject) { return ((ScopedObject) resource).getTargetObject(); } else { returnresource; } } }
`com.alibaba.druid.pool.DruidDataSource` 不是 `ScopedObject`,因此直接返回原对象。
后续就是一次 `map` 的 `put` 操作,不再具体展开。
publicstaticObjectunbindResource(Objectkey) throwsIllegalStateException { // 获取真正的资源对象ObjectactualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); // map 移除keyObjectvalue = doUnbindResource(actualKey); if (value == null) { thrownewIllegalStateException( "No value for key [" + actualKey + "] bound to thread [" + Thread .currentThread().getName() + "]"); } returnvalue; } @NullableprivatestaticObjectdoUnbindResource(ObjectactualKey) { Map<Object, Object> map = resources.get(); if (map == null) { returnnull; } Objectvalue = map.remove(actualKey); // Remove entire ThreadLocal if empty...if (map.isEmpty()) { resources.remove(); } // Transparently suppress a ResourceHolder that was marked as void...if (valueinstanceofResourceHolder && ((ResourceHolder) value).isVoid()) { value = null; } if (value != null && logger.isTraceEnabled()) { logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" + Thread.currentThread().getName() + "]"); } returnvalue; }
map 对象的 remove 操作
- 其他几个都是使用
ThreadLocal
进行数据设置操作即可.
属性
/**
 * The transaction manager this template delegates to; may be {@code null}
 * until it has been configured.
 */
@Nullable
private PlatformTransactionManager transactionManager;
前文说到 `DataSourceTransactionManager` 实现了 `PlatformTransactionManager`,因此配置的时候我们有如下片段:

<bean id="transactionTemplate" class="org.springframework.transaction.support.TransactionTemplate">
    <property name="transactionManager" ref="transactionManager"/>
</bean>
事务操作模板类图
org.springframework.beans.factory.InitializingBean
接口的实现@OverridepublicvoidafterPropertiesSet() { if (this.transactionManager == null) { thrownewIllegalArgumentException("Property 'transactionManager' is required"); } }
@Override@Nullablepublic <T> Texecute(TransactionCallback<T> action) throwsTransactionException { Assert.state(this.transactionManager != null, "No PlatformTransactionManager set"); // 事务管理是否是 xxx接口if (this.transactionManagerinstanceofCallbackPreferringPlatformTransactionManager) { // 强转执行return ((CallbackPreferringPlatformTransactionManager) this.transactionManager) .execute(this, action); } else { // 获取事务状态TransactionStatusstatus = this.transactionManager.getTransaction(this); // 返回结果Tresult; try { // 事务回调执行result = action.doInTransaction(status); } catch (RuntimeException | Errorex) { // Transactional code threw application exception -> rollback// 回滚异常rollbackOnException(status, ex); throwex; } catch (Throwableex) { // Transactional code threw unexpected exception -> rollback// 回滚异常rollbackOnException(status, ex); thrownewUndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception"); } // 提交this.transactionManager.commit(status); returnresult; } }