查看原文
其他

【技术干货】Spring事务原理一探

网易云信 网易云信 2022-10-28

本篇文章是网易云信研发工程师对Spring事务实现原理及实现的研究和总结,分享给大家,希望和大家共同探讨。

事务是一个由有限操作集合组成的逻辑单元。事务操作包含两个目的,数据一致以及操作隔离。数据一致是指事务提交时保证事务内的所有操作都成功完成,并且更改永久生效;事务回滚时,保证能够恢复到事务执行之前的状态。操作隔离则是指多个同时执行的事务之间应该相互独立,互不影响。

事务是一个比较广泛的概念,事务管理资源除了我们熟知的数据库外,还可以包含消息队列、文件系统等。当然,一般来说,我们说的事务单指“数据库事务”。接下来我们会以MySQL数据库、Spring声明式事务为主要研究对象,但是很多事务概念、接口抽象和实现方式同时适用于其他情况。

事务属性和行为

ACID属性

提到事务,不可避免需要涉及到事务的ACID属性:

  • 原子性(Atomicity):事务作为一个整体被执行,包含在其中的对数据库的操作要么全部被执行,要么都不执行。

  • 一致性(Consistency):事务应确保数据库的状态从一个一致状态转变为另一个一致状态。一致状态的含义是数据库中的数据应满足完整性约束。

  • 隔离性(Isolation):多个事务并发执行时,一个事务的执行不应影响其他事务的执行。

  • 持久性(Durability):已被提交的事务对数据库的修改应该永久保存在数据库中。

我们将严格遵循ACID属性的事务称为刚性事务。与之相对,期望最终一致性,在事务执行的中间状态允许暂时不遵循ACID属性的事务称为柔性事务,可参考《传统事务与柔性事务》(https://www.jianshu.com/p/ab1a1c6b08a1),柔性事务的使用涉及到分布式事务方案,可以后续扩展,这里我们先将注意集中在事务实现原理上。

隔离级别

根据SQL92标准,MySQL的InnoDB引擎提供四种隔离级别(即ACID中的I):读未提交(READ UNCOMMITTED)、读已提交(READ COMMITTED)、可重复读(REPEATABLE READ)和串行化(SERIALIZABLE),InnoDB默认的隔离级别是 REPEATABLE READ,其可避免脏读不可重复读,但不能避免幻读,需要指出的是,InnoDB引擎的多版本并发控制机制(MVCC)并没有完全避免幻读,关于该问题以及隔离级别说明,可参考《MySQL的InnoDB的幻读问题》(http://blog.sina.com.cn/s/blog_499740cb0100ugs7.html)。

传播机制

Spring针对方法嵌套调用时事务的创建行为定义了七种事务传播机制,分别是PROPAGATIONREQUIRED、PROPAGATIONSUPPORT、PROPAGATIONMANDATORY、PROPAGATIONREQUIRESNEW、PROPAGATIONNOTSUPPORTED、PROPAGATIONNEVER以及PROPAGATION_NESTED,基本上从字面意思就能知道每种传播机制各自的行为表现,Spring默认的事务传播机制是 PROPAGATION_REQUIRED,即如果当前存在事务,则使用当前事务,否则创建新的事务。详情可参考《Spring事务传播行为》(https://juejin.im/post/5ae9639af265da0b926564e7)。

事务行为

事务的行为包括事务开启、事务提交和事务回滚。InnoDB所有的用户SQL执行都在事务控制之内,在默认情况下,autocommit设置为 true,单条SQL执行成功后,MySQL会自动提交事务,或者如果SQL执行出错,则根据异常类型执行事务提交或者回滚。可以使用 START TRANSACTION(SQL标准)或者 BEGIN开启事务,使用 COMMITROLLBACK提交和回滚事务;也可以通过设置autocommit属性来控制事务行为,当设置autocommit为 false时,其后执行的多条SQL语句将在一个事务内,直到执行 COMMIT或者 ROLLBACK事务才会提交或者回滚。

AOP增强

Spring使用AOP(面向切面编程)来实现声明式事务,后续在讲Spring事务具体实现的时候会详细说明,关于AOP的概念可参考《Spring AOP概念理解》(通俗易懂)(https://juejin.im/post/5ae9639af265da0b926564e7),这里不再细说。说下动态代理AOP增强

动态代理是Spring实现AOP的默认方式,分为两种:JDK动态代理CGLIB动态代理。JDK动态代理面向接口,通过反射生成目标代理接口的匿名实现类;CGLIB动态代理则通过继承,使用字节码增强技术(或者 objenesis类库)为目标代理类生成代理子类。Spring默认对接口实现使用JDK动态代理,对具体类使用CGLIB,同时也支持配置全局使用CGLIB来生成代理对象。

我们在切面配置中会使用到 @Aspect注解,这里用到了Aspectj的切面表达式。Aspectj是java语言实现的一个AOP框架,使用静态代理模式,拥有完善的AOP功能,与Spring AOP互为补充。Spring采用了Aspectj强大的切面表达式定义方式,但是默认情况下仍然使用动态代理方式,并未使用Aspectj的编译器和织入器,当然也支持配置使用Aspectj静态代理替代动态代理方式。Aspectj功能更强大,比方说它支持对字段、POJO类进行增强,与之相对,Spring只支持对Bean方法级别进行增强。

Spring对方法的增强有五种方式:

  • 前置增强( org.springframework.aop.BeforeAdvice):在目标方法执行之前进行增强;

  • 后置增强( org.springframework.aop.AfterReturningAdvice):在目标方法执行之后进行增强;

  • 环绕增强( org.aopalliance.intercept.MethodInterceptor):在目标方法执行前后都执行增强;

  • 异常抛出增强( org.springframework.aop.ThrowsAdvice):在目标方法抛出异常后执行增强;

  • 引介增强( org.springframework.aop.IntroductionInterceptor):为目标类添加新的方法和属性。

声明式事务的实现就是通过环绕增强的方式,在目标方法执行之前开启事务,在目标方法执行之后提交或者回滚事务,事务拦截器的继承关系图可以体现这一点:

Spring事务抽象

统一一致的事务抽象是Spring框架的一大优势,无论是全局事务还是本地事务,JTA、JDBC、Hibernate还是JPA,Spring都使用统一的编程模型,使得应用程序可以很容易地在全局事务与本地事务,或者不同的事务框架之间进行切换。下图是Spring事务抽象的核心类图:

接口 PlatformTransactionManager定义了事务操作的行为,其依赖 TransactionDefinitionTransactionStatus接口,其实大部分的事务属性和行为我们以MySQL数据库为例已经有过了解,这里再对应介绍下。

  • PlatformTransactionManager:事务管理器

  • getTransaction方法:事务获取操作,根据事务属性定义,获取当前事务或者创建新事物;

  • commit方法:事务提交操作,注意这里所说的提交并非直接提交事务,而是根据当前事务状态执行提交或者回滚操作;

  • rollback方法:事务回滚操作,同样,也并非一定直接回滚事务,也有可能只是标记事务为只读,等待其他调用方执行回滚。

  • TransactionDefinition:事务属性定义

  • getPropagationBehavior方法:返回事务的传播属性,默认是 PROPAGATION_REQUIRED

  • getIsolationLevel方法:返回事务隔离级别,事务隔离级别只有在创建新事务时才有效,也就是说只对应传播属性 PROPAGATION_REQUIRED和 PROPAGATION_REQUIRES_NEW

  • getTimeout方法:返回事务超时时间,以秒为单位,同样只有在创建新事务时才有效;

  • isReadOnly方法:是否优化为只读事务,支持这项属性的事务管理器会将事务标记为只读,只读事务不允许有写操作,不支持只读属性的事务管理器需要忽略这项设置,这一点跟其他事务属性定义不同,针对其他不支持的属性设置,事务管理器应该抛出异常。

  • getName方法:返回事务名称,声明式事务中默认值为“类的完全限定名.方法名”。

  • TransactionStatus:当前事务状态

  • isNewTransaction方法:当前方法是否创建了新事务(区别于使用现有事务以及没有事务);

  • hasSavepoint方法:在嵌套事务场景中,判断当前事务是否包含保存点;

  • setRollbackOnly和 isRollbackOnly方法:只读属性设置(主要用于标记事务,等待回滚)和查询;

  • flush方法:刷新底层会话中的修改到数据库,一般用于刷新如Hibernate/JPA的会话,是否生效由具体事务资源实现决定;

  • isCompleted方法:判断当前事务是否已完成(已提交或者已回滚)。

部分Spring包含的对 PlatformTransactionManager的实现类如下图所示:

AbstractPlatformTransactionManager抽象类实现了Spring事务的标准流程,其子类 DataSourceTransactionManager是我们使用较多的JDBC单数据源事务管理器,而 JtaTransactionManager是JTA(Java Transaction API)规范的实现类,另外两个则分别是JavaEE容器WebLogicWebSphere的JTA事务管理器的具体实现。

Spring事务切面

之前提到,Spring采用AOP来实现声明式事务,那么事务的AOP切面是如何织入的呢?这一点涉及到AOP动态代理对象的生成过程。

代理对象生成的核心类是 AbstractAutoProxyCreator,实现了 BeanPostProcessor接口,会在Bean初始化完成之后,通过 postProcessAfterInitialization方法生成代理对象,关于 BeanPostProcessor在Bean生命周期中的作用,可参考《一些常用的Spring扩展接口》(https://www.cnblogs.com/xrq730/p/5721366.html)。

看一下 AbstractAutoProxyCreator类的核心代码,主要关注三个方法:postProcessAfterInitialization、wrapIfNecessary和createProxy,为了突出核心流程,以注释代替了部分代码的具体实现,后续的源码分析也采用相同的处理。

  1. // AbstractAutoProxyCreator.class

  2. @Override

  3. public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

  4.    if (bean != null) {

  5.        Object cacheKey = getCacheKey(bean.getClass(), beanName);

  6.        if (!this.earlyProxyReferences.contains(cacheKey)) {

  7.            // 创建代理对象

  8.            return wrapIfNecessary(bean, beanName, cacheKey);

  9.        }

  10.    }

  11.    return bean;

  12. }


  13. protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {

  14.    // 参数检查,跳过已经执行过代理对象生成,或者已知的不需要生成代理对象的Bean

  15.    ...


  16.    // Create proxy if we have advice.

  17.    // 查询当前Bean所有的AOP增强配置,最终是通过AOPUtils工具类实现

  18.    Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);

  19.    if (specificInterceptors != DO_NOT_PROXY) {

  20.        this.advisedBeans.put(cacheKey, Boolean.TRUE);

  21.        // 执行AOP织入,创建代理对象

  22.        Object proxy = createProxy(

  23.                bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));

  24.        this.proxyTypes.put(cacheKey, proxy.getClass());

  25.        return proxy;

  26.    }


  27.    this.advisedBeans.put(cacheKey, Boolean.FALSE);

  28.    return bean;

  29. }


  30. protected Object createProxy(Class<?> beanClass, String beanName, Object[] specificInterceptors, TargetSource targetSource) {


  31.    if (this.beanFactory instanceof ConfigurableListableBeanFactory) {

  32.        AutoProxyUtils.exposeTargetClass((ConfigurableListableBeanFactory) this.beanFactory, beanName, beanClass);

  33.    }


  34.    // 实例化代理工厂类

  35.    ProxyFactory proxyFactory = new ProxyFactory();

  36.    proxyFactory.copyFrom(this);


  37.    // 当全局使用动态代理时,设置是否需要对目标Bean强制使用CGLIB动态代理

  38.    ...


  39.    // 构建AOP增强顾问,包含框架公共增强和应用程序自定义增强

  40.    // 设置proxyFactory属性,如增强、目标类、是否允许变更等

  41.    ...


  42.    // 创建代理对象

  43.    return proxyFactory.getProxy(getProxyClassLoader());

  44. }

最后是通过调用 ProxyFactory#getProxy(java.lang.ClassLoader)方法来创建代理对象:

  1. // ProxyFactory.class

  2. public Object getProxy(ClassLoader classLoader) {

  3.    return createAopProxy().getProxy(classLoader);

  4. }


  5. // ProxyFactory父类ProxyCreatorSupport.class

  6. protected final synchronized AopProxy createAopProxy() {

  7.    if (!this.active) {

  8.        activate();

  9.    }

  10.    return getAopProxyFactory().createAopProxy(this);

  11. }


  12. public ProxyCreatorSupport() {

  13.    this.aopProxyFactory = new DefaultAopProxyFactory();

  14. }

ProxyFactory的父类构造器实例化了 DefaultAopProxyFactory类,从其源代码我们可以看到Spring动态代理方式选择策略的实现:如果目标类optimize,proxyTargetClass属性设置为 true或者未指定需要代理的接口,则使用CGLIB生成代理对象,否则使用JDK动态代理。

  1. public class DefaultAopProxyFactory implements AopProxyFactory, Serializable {

  2.    @Override

  3.    public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {

  4.        // 如果optimize,proxyTargetClass属性设置为true或者未指定代理接口,则使用CGLIB生成代理对象

  5.        if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {

  6.            Class<?> targetClass = config.getTargetClass();

  7.            // 参数检查,targetClass为空抛出异常

  8.            ...

  9.            // 目标类本身是接口或者代理对象,仍然使用JDK动态代理

  10.            if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {

  11.                return new JdkDynamicAopProxy(config);

  12.            }

  13.            // Objenesis是一个可以不通过构造器创建子类的java工具类库

  14.            // 作为Spring 4.0后CGLIB的默认实现

  15.            return new ObjenesisCglibAopProxy(config);

  16.        }

  17.        else {

  18.            // 否则使用JDK动态代理

  19.            return new JdkDynamicAopProxy(config);

  20.        }

  21.    }

  22.    ...

  23. }

Spring事务拦截

我们已经了解了AOP切面织入生成代理对象的过程,当Bean方法通过代理对象调用时,会触发对应的AOP增强拦截器,前面提到声明式事务是一种环绕增强,对应接口为 MethodInterceptor,事务增强对该接口的实现为 TransactionInterceptor,类图如下:

事务拦截器 TransactionInterceptorinvoke方法中,通过调用父类 TransactionAspectSupportinvokeWithinTransaction方法进行事务处理,该方法支持声明式事务和编程式事务。

  1. // TransactionInterceptor.class

  2. @Override

  3. public Object invoke(final MethodInvocation invocation) throws Throwable {

  4.    // 获取targetClass

  5.    ...


  6.    // Adapt to TransactionAspectSupport's invokeWithinTransaction...

  7.    return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {

  8.        @Override

  9.        public Object proceedWithInvocation() throws Throwable {

  10.            // 实际执行目标方法

  11.            return invocation.proceed();

  12.        }

  13.    });

  14. }


  15. // TransactionInterceptor父类TransactionAspectSupport.class

  16. protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)

  17.        throws Throwable {


  18.    // If the transaction attribute is null, the method is non-transactional.

  19.    // 查询目标方法事务属性、确定事务管理器、构造连接点标识(用于确认事务名称)

  20.    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);

  21.    final PlatformTransactionManager tm = determineTransactionManager(txAttr);

  22.    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);


  23.    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {

  24.        // 事务获取

  25.        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

  26.        Object retVal = null;

  27.        try {

  28.            // 通过回调执行目标方法

  29.            retVal = invocation.proceedWithInvocation();

  30.        }

  31.        catch (Throwable ex) {

  32.            // 目标方法执行抛出异常,根据异常类型执行事务提交或者回滚操作

  33.            completeTransactionAfterThrowing(txInfo, ex);

  34.            throw ex;

  35.        }

  36.        finally {

  37.            // 清理当前线程事务信息

  38.            cleanupTransactionInfo(txInfo);

  39.        }

  40.        // 目标方法执行成功,提交事务

  41.        commitTransactionAfterReturning(txInfo);

  42.        return retVal;

  43.    } else {

  44.        // 带回调的事务执行处理,一般用于编程式事务

  45.        ...

  46.    }

  47. }

在讲Spring事务抽象时,有提到事务抽象的核心接口为 PlatformTransactionManager,它负责管理事务行为,包括事务的获取、提交和回滚。在 invokeWithinTransaction方法中,我们可以看到 createTransactionIfNecessarycommitTransactionAfterReturningcompleteTransactionAfterThrowing都是针对该接口编程,并不依赖于特定事务管理器,这里是对Spring事务抽象的实现。

  1. //TransactionAspectSupport.class

  2. protected TransactionInfo createTransactionIfNecessary(

  3.        PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

  4.    ...

  5.    TransactionStatus status = null;

  6.    if (txAttr != null) {

  7.        if (tm != null) {

  8.            // 获取事务

  9.            status = tm.getTransaction(txAttr);

  10.            ...

  11. }


  12. protected void commitTransactionAfterReturning(TransactionInfo txInfo) {

  13.    if (txInfo != null && txInfo.hasTransaction()) {

  14.        ...

  15.        // 提交事务

  16.        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());

  17.    }

  18. }


  19. protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {

  20.    if (txInfo != null && txInfo.hasTransaction()) {

  21.        ...

  22.        if (txInfo.transactionAttribute.rollbackOn(ex)) {

  23.            try {

  24.                // 异常类型为回滚异常,执行事务回滚

  25.                txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());

  26.            }

  27.            ...

  28.        } else {

  29.            try {

  30.                // 异常类型为非回滚异常,仍然执行事务提交

  31.                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());

  32.            }

  33.            ...

  34. }


  35. protected final class TransactionInfo {

  36.    private final PlatformTransactionManager transactionManager;

  37.    ...

另外,在获取事务时, AbstractPlatformTransactionManager#doBegin方法负责开启新事务,在 DataSourceTransactionManager有如下代码:

  1. @Override

  2. protected void doBegin(Object transaction, TransactionDefinition definition) {

  3.    // 获取数据库连接con

  4.    ...

  5.    if (con.getAutoCommit()) {

  6.        txObject.setMustRestoreAutoCommit(true);

  7.        if (logger.isDebugEnabled()) {

  8.            logger.debug("Switching JDBC Connection [" + con + "] to manual commit");

  9.        }

  10.        con.setAutoCommit(false);

  11.    }

  12.    ...

  13. }

这里才真正开启了数据库事务。

Spring事务同步

提到事务传播机制时,我们经常提到一个条件“如果当前已有事务”,那么Spring是如何知道当前是否已经开启了事务呢?在 AbstractPlatformTransactionManager中是这样做的:

  1. // AbstractPlatformTransactionManager.class

  2. @Override

  3. public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {

  4.    Object transaction = doGetTransaction();

  5.    // 参数为null时构造默认值

  6.    ...

  7.    if (isExistingTransaction(transaction)) {

  8.        // Existing transaction found -> check propagation behavior to find out how to behave.

  9.        return handleExistingTransaction(definition, transaction, debugEnabled);

  10.    }

  11.    ...


  12. // 获取当前事务对象

  13. protected abstract Object doGetTransaction() throws TransactionException;


  14. // 判断当前事务对象是否包含活跃事务

  15. protected boolean isExistingTransaction(Object transaction) throws TransactionException {

  16.    return false;

  17. }

注意 getTransaction方法是 final的,无法被子类覆盖,保证了获取事务流程的一致和稳定。抽象方法 doGetTransaction获取当前事务对象,方法 isExistingTransaction判断当前事务对象是否存在活跃事务,具体逻辑由特定事务管理器实现,看下我们使用最多的 DataSourceTransactionManager对应的实现:

  1. // DataSourceTransactionManager.class

  2. @Override

  3. protected Object doGetTransaction() {

  4.    DataSourceTransactionObject txObject = new DataSourceTransactionObject();

  5.    txObject.setSavepointAllowed(isNestedTransactionAllowed());

  6.    ConnectionHolder conHolder =

  7.            (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);

  8.    txObject.setConnectionHolder(conHolder, false);

  9.    return txObject;

  10. }


  11. @Override

  12. protected boolean isExistingTransaction(Object transaction) {

  13.    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

  14.    return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());

  15. }

可以看到,获取当前事务对象时,使用了 TransactionSynchronizationManager#getResource方法,类图如下:

TransactionSynchronizationManager通过 ThreadLocal对象在当前线程记录了 resourcessynchronizations属性。 resources是一个HashMap,用于记录当前参与事务的事务资源,方便进行事务同步,在 DataSourceTransactionManager的例子中就是以 dataSource作为key,保存了数据库连接,这样在同一个线程中,不同的方法调用就可以通过 dataSource获取相同的数据库连接,从而保证所有操作在一个事务中进行。 synchronizations属性是一个 TransactionSynchronization对象的集合, AbstractPlatformTransactionManager类中定义了事务操作各个阶段的调用流程,以事务提交为例:

  1. // AbstractPlatformTransactionManager.class

  2. private void processCommit(DefaultTransactionStatus status) throws TransactionException {

  3.    try {

  4.        boolean beforeCompletionInvoked = false;

  5.        try {

  6.            prepareForCommit(status);

  7.            triggerBeforeCommit(status);

  8.            triggerBeforeCompletion(status);

  9.            ....

  10.            else if (status.isNewTransaction()) {

  11.                // 记录日志

  12.                ...

  13.                doCommit(status);

  14.            }

  15.            ...

  16.        // 事务调用异常处理

  17.        ...

  18.        try {

  19.            triggerAfterCommit(status);

  20.        }

  21.        finally {

  22.            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);

  23.        }

  24.    }

  25. }

我们可以看到,有很多trigger前缀的方法,这些方法用于在事务操作的各个阶段触发回调,从而可以精确控制在事务执行的不同阶段所要执行的操作,这些回调实际上都通过 TransactionSynchronizationUtils来实现,它会遍历 TransactionSynchronizationManager#synchronizations集合中的 TransactionSynchronization对象,然后分别触发集合中各元素对应方法的调用。例如:

  1. TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

  2.    @Override

  3.    public void afterCommit() {

  4.        // do something after commit

  5.    }

  6. });

这段代码就在当前线程的事务 synchronizations属性中,添加了一个自定义同步类,如果当前存在事务,那么在事务管理器执行事务提交之后,就会触发 afterCommit方法,可以通过这种方式在事务执行的不同阶段自定义一些操作。

到这里,我们已经对Spring事务的实现原理和处理流程有了一定的了解。

更多技术干货分享,点击【阅读原文】。

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存