SpringBoot整合分布式事务,JTA+Atomikos实现多数据源

在整合MySQL,JDBCTemplate这篇中的结尾,我们成功的向两个数据库中分别添加了一条数据。但是我们思考一下,如果PetsServiceImpl.java中的savePets()中如果发生异常了会怎么样呢?

@Override@Transactionalpublic void savePets(Pets pets) {
 petsDAO.save(pets, familyJdbcTemplate);
 petsDAO.save(pets, family2JdbcTemplate); // 异常,分母不能为零
 int num = 1/0;
}123456789复制代码类型:[java]

我们添加了@Transactional注解,按照正常的思路来说两个数据库中都不应该有数据加入:

数据库Family确实没有数据加入:

但是Family2却有数据加入:

这是不符合逻辑的,因为数据库事物不能跨链接,数据源更不能跨库。如果出现了上述操作那这个事务就变成了分布式事务,需要一个统一协调的管理器。所以我们要解决这个问题。

JTA实现跨库分布式事务

XA规范:是一个两阶段提交协议(同时对数据进行处理),被很多数据库和中间件支持。

两阶段提交协议(2PC):将整个事务分成准备阶段和提交阶段两个阶段。2指是两个阶段,P(Prepare),C(Commit)。

正常情况下:

准备阶段:事务管理器(TM)给Family和Family2发送Prepare消息,Family和Family2在本地执行事务,写本地的Undo/Redo日志,但是此时事务没有提交。

提交阶段:事务管理器(TM)收到Family和Family2超时或执行失败的消息时,会对Family和Family2发送回滚消息。如果事务管理器没有收到关于失败的消息,就会发送提交消息。完成回滚或是提交之后需要释放事务处理过程中使用的锁资源。

成功情况:

失败情况:

两阶段提交是在数据库层面实现的,下面我们来结合Family和Family2出现的问题进行解析。

现在我们的应用程序持有Family和Family2两个数据库,应用程序通过事务管理器同时通知Family,Family2添加数据。事务管理器收到执行的回复,两个数据库有一方失败,就向另一方发起回滚事务。回滚完毕,资源锁释放。如果都是成功,此时向所有数据库发起提交事务,提交完毕,资源锁释放。

XA规范缺点:

同步阻塞,数据库锁定资源时间太长,全局锁,并发低,不适合长事务场景,还需要本地数据库支持XA规范。

JTA规范:可以理解为XA规范的Java版本。

Atomikos:分布式事务管理器,JTA/XA的具体实现。(支持分布式事务,单独引入了事物管理器导致性能开销大,不适合用于高并发场景)

分布式事务就是跨数据库对数据进行操作的事务,要达到的目的是要么都成功,要么都失败。

首先我们在pom.xml文件中引入atomikos依赖包:

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>1234复制代码类型:[java]

之后我们需要对application.yml进行修改:

server:
  port: 8888spring:
  jackson:
 date-format: yyyy-MM-dd HH:mm:ss
 time-zone: GMT+8

  datasource:
 familydb:
   uniqueResourceName: family
   xaDataSourceClassName: com.mysql.cj.jdbc.MysqlXADataSource
   xaProperties:
  url: jdbc:mysql://localhost:3306/Family?serverTimezone=GMT%2b8&characterEncoding=utf-8
  username: root
  password: 123456
   exclusiveConnectionMode: true
   minPoolSize: 5
   maxPoolSize: 15
   testQuery: SELECT 1 from dual
 family2db:
   uniqueResourceName: family2
   xaDataSourceClassName: com.mysql.cj.jdbc.MysqlXADataSource
   xaProperties:
  url: jdbc:mysql://localhost:3306/Family2?serverTimezone=GMT%2b8&characterEncoding=utf-8
  username: root
  password: 123456
   exclusiveConnectionMode: true
   minPoolSize: 5
   maxPoolSize: 15
   testQuery: SELECT 1 from dual12345678910111213141516171819202122232425262728293031复制代码类型:[java]

现在再来改写一下DataSourceConfig.java:

@Configurationpublic class DataSourceConfig { // JTA数据源family
 @Bean
 @Primary
 @ConfigurationProperties(prefix = "familydb")
 public DataSource familyDataSource() {  // 返回AtomikosDataSourceBean,配置属性也都是注入到这个类里面
  return new AtomikosDataSourceBean();
 } // JTA数据源family2
 @Bean
 @ConfigurationProperties(prefix = "family2db")
 public DataSource family2DataSource()  {  return new AtomikosDataSourceBean();
 } // familyJdbcTemplate使用familyDataSource数据源
 @Bean
 public JdbcTemplate familyJdbcTemplate(   @Qualifier("familyDataSource") DataSource familyDataSource) {  return new JdbcTemplate(familyDataSource);
 } // family2JdbcTemplate使用family2DataSource数据源
 @Bean
 public JdbcTemplate family2JdbcTemplate(   @Qualifier("family2DataSource") DataSource family2DataSource) {  return new JdbcTemplate(family2DataSource);
 }
}1234567891011121314151617181920212223242526272829303132复制代码类型:[java]

将上述代码改写完成后,在config文件下创建TransactionManagerConfig.java:

package com.javafamily.familydemo.config;import com.atomikos.icatch.jta.UserTransactionImp;import com.atomikos.icatch.jta.UserTransactionManager;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.DependsOn;import org.springframework.transaction.PlatformTransactionManager;import org.springframework.transaction.jta.JtaTransactionManager;import javax.transaction.SystemException;import javax.transaction.TransactionManager;import javax.transaction.UserTransaction;@Configurationpublic class TransactionManagerConfig { @Bean
 public UserTransaction userTransaction() throws SystemException {
  UserTransactionImp userTransactionImp = new UserTransactionImp();
  userTransactionImp.setTransactionTimeout(10000);  return userTransactionImp;
 } @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
 public TransactionManager atomikosTransactionManager() throws Throwable {
  UserTransactionManager userTransactionManager = new UserTransactionManager();
  userTransactionManager.setForceShutdown(false);  return userTransactionManager;
 } @Bean(name = "transactionManager")
 @DependsOn({"userTransaction", "atomikosTransactionManager"})
 public PlatformTransactionManager transactionManager() throws Throwable {
  UserTransaction userTransaction = userTransaction();

  JtaTransactionManager manager = new JtaTransactionManager(userTransaction, atomikosTransactionManager());  return manager;
 }
}1234567891011121314151617181920212223242526272829303132333435363738394041复制代码类型:[java]

以上的代码属于事务管理器配置,事务管理器负责协调多个JTA数据源实现事务机制,这是固定的写法,不需要纠结。

完成了以上的配置之后,我们再次验证分母为零的异常问题:

报出异常后,查看分别查看两个数据库:

两个数据库由于异常都没有数据被插入,问题解决!

(0)

相关推荐