SpringDataJPA多数据源及JPA+atomikos实现分布式事务
在之前的文章中我们已经介绍了SpringJDBC的多数据源实现(在一个项目中操作多个数据库),比较常见的多数据源的支持方式有两种:

把数据源作为参数传入到调用方法内部,需要我们手动传参。
不同的包下面的接口函数自动注入不同的数据源。
第一种方法比较麻烦,会增加额外的代码。所以我们使用第二种方式来实现JPA的多数据源支持。
首先来修改.yml文件,配置两个数据源:
server:
  port: 8888

spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
  datasource:
    # Keys are lowercase so they line up with the prefixes
    # "spring.datasource.family" / "spring.datasource.family2" used by the config classes.
    family:
      driver-class-name: com.mysql.cj.jdbc.Driver
      # A JDBC URL must start with the "jdbc:" scheme; without it the MySQL driver rejects the URL.
      jdbc-url: jdbc:mysql://localhost:3306/Family?useUnicode=true&characterEncoding=utf-8&useSSL=false
      username: root
      password: 123456
    family2:
      driver-class-name: com.mysql.cj.jdbc.Driver
      jdbc-url: jdbc:mysql://localhost:3306/Family2?useUnicode=true&characterEncoding=utf-8&useSSL=false
      username: root
      password: 123456
  jpa:
    # By default Hibernate creates tables with the MyISAM storage engine;
    # this database-platform switches table creation to InnoDB.
    database-platform: org.hibernate.dialect.MySQL5InnoDBDialect
    hibernate:
      # On every start Hibernate checks that the table structure matches the field
      # definitions of the model classes and throws an exception if it does not.
      ddl-auto: validate
      naming:
        physical-strategy: org.hibernate.boot.model.naming.PhysicalNamingStrategyStandardImpl
    database: mysql
    # Print the executed SQL statements to the log
    show-sql: true
在Dao文件下创建文件夹db和db2,将model中的Pets.java移动到db文件下并把Dao文件下原有的PetsRepository也移动到db文件夹下。在db2下新创建一个实体类Doctor和DoctorRepository接口:

// ===== Doctor.java =====
package com.javafamily.familydemo.dao.db2;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.*;

/**
 * JPA entity mapped to the {@code doctor} table of the second data source.
 *
 * <p>{@code @NoArgsConstructor} supplies the no-argument constructor that JPA needs to
 * instantiate entities. {@code @AllArgsConstructor} supplies the public
 * {@code Doctor(long, String, String)} constructor that {@code @Builder} relies on and that
 * callers can use directly, e.g. {@code new Doctor(3, "Family2", "doctor")}.
 */
@Data
@Entity
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Table(name = "doctor")
public class Doctor {

    /** Primary key, generated by the database identity column. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private long id;

    /** Doctor name; the column is declared NOT NULL. */
    @Column(nullable = false)
    private String name;

    /** Doctor title; the column is declared NOT NULL. */
    @Column(nullable = false)
    private String title;
}

// ===== DoctorRepository.java =====
package com.javafamily.familydemo.dao.db2;

import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Spring Data JPA repository for {@link Doctor} entities keyed by {@code Long}.
 *
 * <p>The former import of {@code com.javafamily.familydemo.model.Pets} was dropped: it was
 * unused here and the class has been moved to the {@code dao.db} package.
 */
public interface DoctorRepository extends JpaRepository<Doctor, Long> {
}
现在我们需要实现将Family数据源给db包下的PetsRepository使用,Family2数据源给db2包下的DoctorRepository使用。
接下来我们来实现配置:在config文件下创建JPAFamilyConfig.java和JPAFamily2Config.java
// ===== JPAFamilyConfig.java =====
package com.javafamily.familydemo.config;

import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.sql.DataSource;
import java.util.Map;

/**
 * JPA wiring for the primary ("Family") data source.
 *
 * <p>Repositories under {@code com.javafamily.familydemo.dao.db} are bound to the entity
 * manager factory and transaction manager declared in this class. Replace the package names
 * with the location of your own repositories and entities.
 */
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        basePackages = {"com.javafamily.familydemo.dao.db"},
        entityManagerFactoryRef = "entityManagerFactoryFamily",
        transactionManagerRef = "transactionManagerFamily")
public class JPAFamilyConfig {

    @Resource
    private JpaProperties jpaProperties;

    /** Data source populated from the {@code spring.datasource.family} properties. */
    @Primary
    @Bean(name = "familyDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.family")
    public DataSource familyDataSource() {
        DataSourceBuilder<?> dataSourceBuilder = DataSourceBuilder.create();
        return dataSourceBuilder.build();
    }

    /** Entity manager created from the Family entity manager factory. */
    @Primary
    @Bean(name = "entityManagerFamily")
    public EntityManager entityManager(EntityManagerFactoryBuilder builder) {
        LocalContainerEntityManagerFactoryBean factoryBean = entityManagerFactoryFamily(builder);
        return factoryBean.getObject().createEntityManager();
    }

    /** Entity manager factory scanning the entities in {@code dao.db}. */
    @Primary
    @Bean(name = "entityManagerFactoryFamily")
    public LocalContainerEntityManagerFactoryBean entityManagerFactoryFamily(EntityManagerFactoryBuilder builder) {
        return builder
                .dataSource(familyDataSource())
                .properties(jpaProperties.getProperties())
                .persistenceUnit("familyPersistenceUnit")
                .packages("com.javafamily.familydemo.dao.db")
                .build();
    }

    /** Transaction manager bound to the Family entity manager factory. */
    @Primary
    @Bean(name = "transactionManagerFamily")
    public PlatformTransactionManager transactionManagerFamily(EntityManagerFactoryBuilder builder) {
        LocalContainerEntityManagerFactoryBean factoryBean = entityManagerFactoryFamily(builder);
        return new JpaTransactionManager(factoryBean.getObject());
    }
}

// ===== JPAFamily2Config.java =====
package com.javafamily.familydemo.config;

import org.springframework.boot.autoconfigure.orm.jpa.JpaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.boot.orm.jpa.EntityManagerFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.annotation.Resource;
import javax.persistence.EntityManager;
import javax.sql.DataSource;

/**
 * JPA wiring for the second ("Family2") data source.
 *
 * <p>Repositories under {@code com.javafamily.familydemo.dao.db2} are bound to the entity
 * manager factory and transaction manager declared in this class. None of these beans is
 * marked {@code @Primary}.
 */
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(
        basePackages = {"com.javafamily.familydemo.dao.db2"},
        entityManagerFactoryRef = "entityManagerFactoryFamily2",
        transactionManagerRef = "transactionManagerFamily2")
public class JPAFamily2Config {

    @Resource
    private JpaProperties jpaProperties;

    /** Data source populated from the {@code spring.datasource.family2} properties. */
    @Bean(name = "family2DataSource")
    @ConfigurationProperties(prefix = "spring.datasource.family2")
    public DataSource family2DataSource() {
        DataSourceBuilder<?> dataSourceBuilder = DataSourceBuilder.create();
        return dataSourceBuilder.build();
    }

    /** Entity manager created from the Family2 entity manager factory. */
    @Bean(name = "entityManagerFamily2")
    public EntityManager entityManager(EntityManagerFactoryBuilder builder) {
        LocalContainerEntityManagerFactoryBean factoryBean = entityManagerFactoryFamily2(builder);
        return factoryBean.getObject().createEntityManager();
    }

    /** Entity manager factory scanning the entities in {@code dao.db2}. */
    @Bean(name = "entityManagerFactoryFamily2")
    public LocalContainerEntityManagerFactoryBean entityManagerFactoryFamily2(EntityManagerFactoryBuilder builder) {
        return builder
                .dataSource(family2DataSource())
                .properties(jpaProperties.getProperties())
                .persistenceUnit("family2PersistenceUnit")
                .packages("com.javafamily.familydemo.dao.db2")
                .build();
    }

    /** Transaction manager bound to the Family2 entity manager factory. */
    @Bean(name = "transactionManagerFamily2")
    public PlatformTransactionManager transactionManagerFamily2(EntityManagerFactoryBuilder builder) {
        LocalContainerEntityManagerFactoryBean factoryBean = entityManagerFactoryFamily2(builder);
        return new JpaTransactionManager(factoryBean.getObject());
    }
}
完成上述配置后,再编写测试类对多数据源进行测试。
package com.javafamily.familydemo;

import com.javafamily.familydemo.dao.db.Pets;
import com.javafamily.familydemo.dao.db.PetsRepository;
import com.javafamily.familydemo.dao.db2.Doctor;
import com.javafamily.familydemo.dao.db2.DoctorRepository;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringExtension;

import javax.annotation.Resource;
import java.util.Date;

/**
 * Integration test that exercises both repositories, each backed by its own data source.
 */
@ExtendWith(SpringExtension.class)
@SpringBootTest
public class KeyWordsTest {

    @Resource
    private PetsRepository petsRepository;

    @Resource
    private DoctorRepository doctorRepository;

    /** Looks up the pet named "fish" and prints the result. */
    @Test
    public void Test() {
        Pets found = petsRepository.findPetsByName("fish");
        System.out.println(found);
    }

    /** Saves one Pets row through the db repository and one Doctor row through the db2 repository. */
    @Test
    public void jpaTest() {
        Pets pet = Pets.builder()
                .id(2L)
                .name("JavaFamily")
                .varieties("chai")
                .createTime(new Date())
                .build();

        Doctor vet = Doctor.builder()
                .name("petsDoctor")
                .title("director")
                .build();

        // The Pets object is persisted through the repository in the db package.
        petsRepository.save(pet);
        // The Doctor object is persisted through the repository in the db2 package.
        doctorRepository.save(vet);
    }
}
执行代码,查看数据库会发现两张表的数据都已经插入成功。


当有异常发生时按照正常的逻辑,两条数据都应该插入失败。
我们先将数据库中之前的数据全部清除,之后改写PetsServiceImpl.java的save方法,并且添加分母为0的异常。
@Resource
private PetsRepository petsRepository; @Resource
private DoctorRepository doctorRepository; @Resource
private Mapper dozerMapper; @Transactional
public void savePets(PetsVO pets) {
Pets petsPO = dozerMapper.map(pets, Pets.class); // 通过insert,保存一个对象
petsRepository.save(petsPO);
doctorRepository.save(new Doctor(3,"Family2","doctor")); int num = 1/0;
}1234567891011121314151617复制代码类型:[java]
执行代码,在postman中添加请求:

得到报错后,查看数据库。

第一个数据库没有数据传入。

第二个数据库有数据传入。
这是因为数据库事务不能跨连接,更不能跨数据源、跨库。如果出现了上述操作,那这个事务就变成了分布式事务,需要一个统一协调的事务管理器。下面让我们通过JPA+atomikos来实现分布式事务。
首先引入maven依赖:
<!-- Spring Boot starter for JTA transactions backed by Atomikos -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
之后改写.yml文件:
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
  datasource:
    family:
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:3306/Family?useUnicode=true&characterEncoding=utf-8&useSSL=false
      username: root
      password: 123456
    family2:
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:3306/Family2?useUnicode=true&characterEncoding=utf-8&useSSL=false
      username: root
      password: 123456
  # jta: distributed transaction settings
  jta:
    atomikos:
      datasource:
        # Maximum number of connections in the pool
        max-pool-size: 30
        # An exception is thrown when borrowing a connection takes longer than this
        borrow-connection-timeout: 100
      connectionfactory:
        max-pool-size: 30
        borrow-connection-timeout: 100
注:将jdbc-url还原成url
在config文件下创建AtomikosJtaPlatform.java,这部分的代码为固定代码。
package com.javafamily.familydemo.config;import org.hibernate.engine.transaction.jta.platform.internal.AbstractJtaPlatform;import javax.transaction.TransactionManager;import javax.transaction.UserTransaction;public class AtomikosJtaPlatform extends AbstractJtaPlatform { private static final long serialVersionUID = 1L; static TransactionManager transactionManager; static UserTransaction transaction; @Override
protected TransactionManager locateTransactionManager() { return transactionManager;
} @Override
protected UserTransaction locateUserTransaction() { return transaction;
}
}12345678910111213141516171819202122232425复制代码类型:[java]
再来进行事务管理器配置。在config文件夹下创建JPAAtomikosTransactionConfig.java:
package com.javafamily.familydemo.config;import com.atomikos.icatch.jta.UserTransactionImp;import com.atomikos.icatch.jta.UserTransactionManager;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.ComponentScan;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.DependsOn;import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;import org.springframework.orm.jpa.JpaVendorAdapter;import org.springframework.orm.jpa.vendor.Database;import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;import org.springframework.transaction.PlatformTransactionManager;import org.springframework.transaction.annotation.EnableTransactionManagement;import org.springframework.transaction.jta.JtaTransactionManager;import javax.transaction.TransactionManager;import javax.transaction.UserTransaction;@Configuration@ComponentScan@EnableTransactionManagementpublic class JPAAtomikosTransactionConfig { @Bean
public PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() { return new PropertySourcesPlaceholderConfigurer();
} // JPA特性
@Bean
public JpaVendorAdapter jpaVendorAdapter() {
HibernateJpaVendorAdapter hibernateJpaVendorAdapter = new HibernateJpaVendorAdapter();
hibernateJpaVendorAdapter.setShowSql(true);
hibernateJpaVendorAdapter.setGenerateDdl(true);
hibernateJpaVendorAdapter.setDatabase(Database.MYSQL); return hibernateJpaVendorAdapter;
} @Bean(name = "userTransaction")
public UserTransaction userTransaction() throws Throwable {
UserTransactionImp userTransactionImp = new UserTransactionImp();
userTransactionImp.setTransactionTimeout(5000); return userTransactionImp;
} @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
public TransactionManager atomikosTransactionManager() throws Throwable {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
AtomikosJtaPlatform.transactionManager = userTransactionManager; return userTransactionManager;
} @Bean(name = "transactionManager")
@DependsOn({"userTransaction", "atomikosTransactionManager"})
public PlatformTransactionManager transactionManager() throws Throwable {
UserTransaction userTransaction = userTransaction();
AtomikosJtaPlatform.transaction = userTransaction;
TransactionManager atomikosTransactionManager = atomikosTransactionManager(); return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
}
}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364复制代码类型:[java]
设置第一个数据库Family的数据源和实体扫描管理:
package com.javafamily.familydemo.config;

import com.mysql.cj.jdbc.MysqlXADataSource;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.HashMap;

/**
 * XA data source and entity manager factory for the primary ("Family") database.
 *
 * <p>Repositories under {@code com.javafamily.familydemo.dao.db} use the
 * {@code familyEntityManager} factory and the shared {@code transactionManager} bean.
 */
@Configuration
@DependsOn("transactionManager")
@EnableJpaRepositories(
        basePackages = "com.javafamily.familydemo.dao.db",
        entityManagerFactoryRef = "familyEntityManager",
        transactionManagerRef = "transactionManager")
public class JPAFamilyConfig {

    @Resource
    private JpaVendorAdapter jpaVendorAdapter;

    /** Connection settings bound from {@code spring.datasource.family}. */
    @Primary
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.family")
    public DataSourceProperties familyDataSourceProperties() {
        return new DataSourceProperties();
    }

    /**
     * Atomikos-managed XA data source wrapping a MySQL XA data source, registered under the
     * unique resource name "family".
     */
    @Primary
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.family")
    public DataSource familyDataSource() throws SQLException {
        DataSourceProperties settings = familyDataSourceProperties();

        MysqlXADataSource mysqlXa = new MysqlXADataSource();
        mysqlXa.setUrl(settings.getUrl());
        mysqlXa.setPinGlobalTxToPhysicalConnection(true);
        mysqlXa.setPassword(settings.getPassword());
        mysqlXa.setUser(settings.getUsername());

        AtomikosDataSourceBean atomikosDataSource = new AtomikosDataSourceBean();
        atomikosDataSource.setXaDataSource(mysqlXa);
        atomikosDataSource.setUniqueResourceName("family");
        atomikosDataSource.setBorrowConnectionTimeout(60);
        atomikosDataSource.setMaxPoolSize(20);
        return atomikosDataSource;
    }

    /**
     * JTA-enabled entity manager factory for the entities in {@code dao.db}; Hibernate is
     * pointed at {@link AtomikosJtaPlatform} to find the transaction manager.
     */
    @Primary
    @Bean
    @DependsOn("transactionManager")
    public LocalContainerEntityManagerFactoryBean familyEntityManager() throws Throwable {
        HashMap<String, Object> jpaSettings = new HashMap<>();
        jpaSettings.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
        jpaSettings.put("javax.persistence.transactionType", "JTA");

        LocalContainerEntityManagerFactoryBean factoryBean = new LocalContainerEntityManagerFactoryBean();
        factoryBean.setJtaDataSource(familyDataSource());
        factoryBean.setJpaVendorAdapter(jpaVendorAdapter);
        // Change this to the package that holds the entities of this data source.
        factoryBean.setPackagesToScan("com.javafamily.familydemo.dao.db");
        factoryBean.setPersistenceUnitName("familyPersistenceUnit");
        factoryBean.setJpaPropertyMap(jpaSettings);
        return factoryBean;
    }
}
设置第二个数据库Family2的数据源和实体扫描管理:
package com.javafamily.familydemo.config;

import com.mysql.cj.jdbc.MysqlXADataSource;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;

import javax.annotation.Resource;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.HashMap;

/**
 * XA data source and entity manager factory for the second ("Family2") database.
 *
 * <p>Repositories under {@code com.javafamily.familydemo.dao.db2} use the
 * {@code family2EntityManager} factory and the shared {@code transactionManager} bean.
 */
@Configuration
@DependsOn("transactionManager")
@EnableJpaRepositories(
        basePackages = "com.javafamily.familydemo.dao.db2",
        entityManagerFactoryRef = "family2EntityManager",
        transactionManagerRef = "transactionManager")
public class JPAFamily2Config {

    @Resource
    private JpaVendorAdapter jpaVendorAdapter;

    /** Connection settings bound from {@code spring.datasource.family2} (note the prefix). */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.family2")
    public DataSourceProperties family2DataSourceProperties() {
        return new DataSourceProperties();
    }

    /**
     * Atomikos-managed XA data source wrapping a MySQL XA data source, registered under the
     * unique resource name "family2".
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.family2")
    public DataSource family2DataSource() throws SQLException {
        DataSourceProperties settings = family2DataSourceProperties();

        MysqlXADataSource mysqlXa = new MysqlXADataSource();
        mysqlXa.setUrl(settings.getUrl());
        mysqlXa.setPinGlobalTxToPhysicalConnection(true);
        mysqlXa.setPassword(settings.getPassword());
        mysqlXa.setUser(settings.getUsername());

        AtomikosDataSourceBean atomikosDataSource = new AtomikosDataSourceBean();
        atomikosDataSource.setXaDataSource(mysqlXa);
        atomikosDataSource.setUniqueResourceName("family2");
        atomikosDataSource.setBorrowConnectionTimeout(60);
        atomikosDataSource.setMaxPoolSize(20);
        return atomikosDataSource;
    }

    /**
     * JTA-enabled entity manager factory for the entities in {@code dao.db2}; Hibernate is
     * pointed at {@link AtomikosJtaPlatform} to find the transaction manager.
     */
    @Bean
    @DependsOn("transactionManager")
    public LocalContainerEntityManagerFactoryBean family2EntityManager() throws Throwable {
        HashMap<String, Object> jpaSettings = new HashMap<>();
        jpaSettings.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
        jpaSettings.put("javax.persistence.transactionType", "JTA");

        LocalContainerEntityManagerFactoryBean factoryBean = new LocalContainerEntityManagerFactoryBean();
        factoryBean.setJtaDataSource(family2DataSource());
        factoryBean.setJpaVendorAdapter(jpaVendorAdapter);
        // Change this to the package that holds the entities of this (second) data source.
        factoryBean.setPackagesToScan("com.javafamily.familydemo.dao.db2");
        factoryBean.setPersistenceUnitName("family2PersistenceUnit");
        factoryBean.setJpaPropertyMap(jpaSettings);
        return factoryBean;
    }
}
通过以上代码我们会发现,除了事务管理器只有一个以外,其他都是两个。不同的数据源通过同一个事务管理器实现了分布式事务。
这时我们再次执行代码,会发现报错之后两个数据库都没有数据插入。



当把错误代码去除后(int num = 1/0;)再次执行,两个数据库的数据均已插入。



