SpringBoot整合分布式事务,JTA+Atomikos实现多数据源
在整合MySQL,JDBCTemplate这篇中的结尾,我们成功的向两个数据库中分别添加了一条数据。但是我们思考一下,如果PetsServiceImpl.java中的savePets()中如果发生异常了会怎么样呢?
@Override@Transactionalpublic void savePets(Pets pets) { petsDAO.save(pets, familyJdbcTemplate); petsDAO.save(pets, family2JdbcTemplate); // 异常,分母不能为零 int num = 1/0; }123456789复制代码类型:[java]
我们添加了@Transactional注解,按照正常的思路来说两个数据库中都不应该有数据加入:
数据库Family确实没有数据加入:
但是Family2却有数据加入:
这是不符合逻辑的,因为数据库事物不能跨链接,数据源更不能跨库。如果出现了上述操作那这个事务就变成了分布式事务,需要一个统一协调的管理器。所以我们要解决这个问题。
JTA实现跨库分布式事务
XA规范:是一个两阶段提交协议(同时对数据进行处理),被很多数据库和中间件支持。
两阶段提交协议(2PC):将整个事务分成准备阶段和提交阶段两个阶段。2指是两个阶段,P(Prepare),C(Commit)。
正常情况下:
准备阶段:事务管理器(TM)给Family和Family2发送Prepare消息,Family和Family2在本地执行事务,写本地的Undo/Redo日志,但是此时事务没有提交。
提交阶段:事务管理器(TM)收到Family和Family2超时或执行失败的消息时,会对Family和Family2发送回滚消息。如果事务管理器没有收到关于失败的消息,就会发送提交消息。完成回滚或是提交之后需要释放事务处理过程中使用的锁资源。
成功情况:
失败情况:
两阶段提交是在数据库层面实现的,下面我们来结合Family和Family2出现的问题进行解析。
现在我们的应用程序持有Family和Family2两个数据库,应用程序通过事务管理器同时通知Family,Family2添加数据。事务管理器收到执行的回复,两个数据库有一方失败,就向另一方发起回滚事务。回滚完毕,资源锁释放。如果都是成功,此时向所有数据库发起提交事务,提交完毕,资源锁释放。
XA规范缺点:
同步阻塞,数据库锁定资源时间太长,全局锁,并发低,不适合长事务场景,还需要本地数据库支持XA规范。
JTA规范:可以理解为XA规范的Java版本。
Atomikos:分布式事务管理器,JTA/XA的具体实现。(支持分布式事务,单独引入了事物管理器导致性能开销大,不适合用于高并发场景)
分布式事务就是跨数据库对数据进行操作的事务,要达到的目的是要么都成功,要么都失败。
首先我们在pom.xml文件中引入atomikos依赖包:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency>1234复制代码类型:[java]
之后我们需要对application.yml进行修改:
server: port: 8888spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 datasource: familydb: uniqueResourceName: family xaDataSourceClassName: com.mysql.cj.jdbc.MysqlXADataSource xaProperties: url: jdbc:mysql://localhost:3306/Family?serverTimezone=GMT%2b8&characterEncoding=utf-8 username: root password: 123456 exclusiveConnectionMode: true minPoolSize: 5 maxPoolSize: 15 testQuery: SELECT 1 from dual family2db: uniqueResourceName: family2 xaDataSourceClassName: com.mysql.cj.jdbc.MysqlXADataSource xaProperties: url: jdbc:mysql://localhost:3306/Family2?serverTimezone=GMT%2b8&characterEncoding=utf-8 username: root password: 123456 exclusiveConnectionMode: true minPoolSize: 5 maxPoolSize: 15 testQuery: SELECT 1 from dual12345678910111213141516171819202122232425262728293031复制代码类型:[java]
现在再来改写一下DataSourceConfig.java:
@Configurationpublic class DataSourceConfig { // JTA数据源family @Bean @Primary @ConfigurationProperties(prefix = "familydb") public DataSource familyDataSource() { // 返回AtomikosDataSourceBean,配置属性也都是注入到这个类里面 return new AtomikosDataSourceBean(); } // JTA数据源family2 @Bean @ConfigurationProperties(prefix = "family2db") public DataSource family2DataSource() { return new AtomikosDataSourceBean(); } // familyJdbcTemplate使用familyDataSource数据源 @Bean public JdbcTemplate familyJdbcTemplate( @Qualifier("familyDataSource") DataSource familyDataSource) { return new JdbcTemplate(familyDataSource); } // family2JdbcTemplate使用family2DataSource数据源 @Bean public JdbcTemplate family2JdbcTemplate( @Qualifier("family2DataSource") DataSource family2DataSource) { return new JdbcTemplate(family2DataSource); } }1234567891011121314151617181920212223242526272829303132复制代码类型:[java]
将上述代码改写完成后,在config文件下创建TransactionManagerConfig.java:
package com.javafamily.familydemo.config;import com.atomikos.icatch.jta.UserTransactionImp;import com.atomikos.icatch.jta.UserTransactionManager;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.DependsOn;import org.springframework.transaction.PlatformTransactionManager;import org.springframework.transaction.jta.JtaTransactionManager;import javax.transaction.SystemException;import javax.transaction.TransactionManager;import javax.transaction.UserTransaction;@Configurationpublic class TransactionManagerConfig { @Bean public UserTransaction userTransaction() throws SystemException { UserTransactionImp userTransactionImp = new UserTransactionImp(); userTransactionImp.setTransactionTimeout(10000); return userTransactionImp; } @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close") public TransactionManager atomikosTransactionManager() throws Throwable { UserTransactionManager userTransactionManager = new UserTransactionManager(); userTransactionManager.setForceShutdown(false); return userTransactionManager; } @Bean(name = "transactionManager") @DependsOn({"userTransaction", "atomikosTransactionManager"}) public PlatformTransactionManager transactionManager() throws Throwable { UserTransaction userTransaction = userTransaction(); JtaTransactionManager manager = new JtaTransactionManager(userTransaction, atomikosTransactionManager()); return manager; } }1234567891011121314151617181920212223242526272829303132333435363738394041复制代码类型:[java]
以上的代码属于事务管理器配置,事务管理器负责协调多个JTA数据源实现事务机制,这是固定的写法,不需要纠结。
完成了以上的配置之后,我们再次验证分母为零的异常问题:
报出异常后,查看分别查看两个数据库:
两个数据库由于异常都没有数据被插入,问题解决!