首页 > 面试资料 博客日记

Atomikos 实现多数据源分布式事务

2026-03-31 12:30:02面试资料围观1

本篇文章分享Atomikos 实现多数据源分布式事务,对你有帮助的话记得收藏一下,看极客资料网收获更多编程知识

Atomikos 实现多数据源分布式事务


💡 作者:古渡蓝按

个人微信公众号:微信公众号(深入浅出谈java)
感觉本篇对你有帮助可以关注一下,会不定期更新知识和面试资料、技巧!!!


❗️本文是对上篇文章进行事务补充,可优先阅读上篇文章,使其更好理解:

Springboot 实现多数据源(PostgreSQL 和 SQL Server)连接 - 古渡蓝按 - 博客园


鉴于项目结构(基于 单体或多模块应用),推荐优先使用 Atomikos 方案,它配置简单且无需额外部署中间件;本文也是重点介绍使用 Atomikos 方案

为了配合使用 Atomikos 实现多数据源分布式事务

在 Spring Boot 多数据源场景下实现分布式事务(保证多个数据库操作的一致性),主要有两种主流方案:

1、轻量级方案:使用 Atomikos 或 Narayana 实现本地事务管理器(适用于大多数单体应用的多数据源场景)。
2、重量级/微服务方案:使用 Seata 分布式事务框架(适用于微服务架构或复杂业务场景)。



下图为使用Atomikos 事务的代码流程示意图:

result (1)


流程核心解析

1、配置类的作用 (PgDataSourceConfig & SqlServerDataSourceConfig)

  • XA 数据源构建:这两个配置类不再创建普通的 HikariDataSource,而是创建 AtomikosDataSourceBean。
  • 驱动指定:分别指定了 org.postgresql.xa.PGXADataSource 和 com.microsoft.sqlserver.jdbc.SQLServerXADataSource。这是支持两阶段提交的关键,普通驱动无法参与全局事务。
  • 唯一标识:通过 setUniqueResourceName 为每个数据源分配唯一 ID(如 pg_datasource, ss_datasource),供 JTA 管理器追踪。

2、全局事务管理器 (JtaConfig)

  • 统一指挥:JtaTransactionManager 是唯一的“指挥官”。当 Service 层加上 @Transactional 时,Spring 不再寻找局部的 DataSourceTransactionManager,而是使用这个全局管理器。
  • 事务上下文:它维护一个全局的事务上下文,将所有在该方法内获取的数据库连接都纳入同一个 XID (Transaction ID) 下。

3、两阶段提交 (2PC) 机制

  • Prepare 阶段:当代码执行完所有 SQL 但未返回时,Atomikos 不会立即让数据库 Commit,而是询问所有参与者(PG 和 SS):“你们准备好提交了吗?”。数据库会锁定资源并写入 Undo Log,然后回答"Yes"。
  • Commit 阶段:只有当所有参与者都回答"Yes",Atomikos 才会下达正式的 Commit 指令。
  • Rollback 阶段:只要有一个参与者失败(或 Java 代码抛出异常),Atomikos 会向所有已 Prepare 的参与者发送 Rollback 指令,利用 Undo Log 恢复数据。

引入依赖

引入依赖、配置事务管理器,并将数据源改为 XADataSource 模式。

在 pom.xml 中添加 spring-boot-starter-jta-atomikos:


<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<!-- 确保已有 sqlserver 和 postgresql 驱动 -->


修改配置文件 (application-stand.yml)

❗️注意:XA 模式下,属性名通常变为 user 而不是 username,且必须指定 data-source-class-name。

spring:
  datasource:
    # 主数据源 (PostgreSQL)
    pg:
      xa:
        data-source-class-name: org.postgresql.xa.PGXADataSource
        url: jdbc:postgresql://127.0.0.1:5432/test1
        user: test1
        password: 123456
      driver-class-name: org.postgresql.Driver
      
    # 次数据源 (SQL Server)
    sqlserver:
      xa:
        data-source-class-name: com.microsoft.sqlserver.jdbc.SQLServerXADataSource
        url: jdbc:sqlserver://127.0.0.1:1433;databaseName=test2;encrypt=true;trustServerCertificate=true
        user: test2
        password: 123456
      driver-class-name: com.microsoft.sqlserver.jdbc.SQLServerDriver
  
  jta:
    atomikos:
      datasource:
        default-isolation-level: READ_COMMITTED
        max-pool-size: 10
        min-pool-size: 1
        borrow-connection-timeout: 30
        reap-timeout: 60
        maintenance-interval: 60
        login-timeout: 30


核心变化点:

  1. 移除 DataSourceBuilder:Atomikos 需要特定的 XADataSource 实现类,不能通过通用的 DataSourceBuilder 创建。
  2. 移除 @ConfigurationProperties 直接绑定到 DataSource:改为手动读取配置并实例化具体的 XA 驱动类(如 PGXADataSource)。
  3. 移除 DataSourceTransactionManager:替换为全局的 JtaTransactionManager(通常在一个统一配置类中定义,或者在此处定义但类型不同)。

具体修改方案

保留 SqlSessionFactory 和 SqlSessionTemplate 的配置逻辑,但依赖的 DataSource Bean 名称和类型会变。
以下是具体的修改方案:


1.修改 PgDataSourceConfig.java

你需要手动实例化 org.postgresql.xa.PGXADataSource 并包裹在 Atomikos 的容器中(或者直接返回配置好的 XA DataSource,Spring Boot Atomikos Starter 会自动拦截,但显式配置更稳妥)。

package com.example.bomcompare.config;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.jdbc.AtomikosDataSourceBean; // 引入 Atomikos 包装类
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.sql.DataSource;
import java.util.Properties;

@Configuration
@MapperScan(basePackages = "com.example.bomcompare.mapper.pg", sqlSessionFactoryRef = "pgSqlSessionFactory")
public class PgDataSourceConfig {

    // 从配置文件读取属性
    @Value("${spring.datasource.pg.xa.url}")
    private String pgUrl;
    @Value("${spring.datasource.pg.xa.user}")
    private String pgUser;
    @Value("${spring.datasource.pg.xa.password}")
    private String pgPassword;
    @Value("${spring.datasource.pg.xa.data-source-class-name:org.postgresql.xa.PGXADataSource}")
    private String pgXaClassName;

    @Primary
    @Bean(name = "pgDataSource")
    public DataSource pgDataSource() {
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSourceClassName(pgXaClassName);
        
        // 配置 XA 属性
        Properties xaProperties = new Properties();
        xaProperties.setProperty("url", pgUrl);
        xaProperties.setProperty("user", pgUser);
        xaProperties.setProperty("password", pgPassword);
        // PostgreSQL XA 驱动可能需要设置其他属性,视具体驱动版本而定
        xaDataSource.setXaProperties(xaProperties);
        
        xaDataSource.setUniqueResourceName("pg_datasource"); // 必须唯一
        xaDataSource.setPoolSize(10);
        
        return xaDataSource;
    }

    @Primary
    @Bean(name = "pgSqlSessionFactory")
    public SqlSessionFactory pgSqlSessionFactory(@Qualifier("pgDataSource") DataSource dataSource) throws Exception {
        MybatisSqlSessionFactoryBean factoryBean = new MybatisSqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        factoryBean.setTypeAliasesPackage("com.example.entity.pg");
        
        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
        interceptor.addInnerInterceptor(new PaginationInnerInterceptor());
        factoryBean.setPlugins(interceptor);
        
        return factoryBean.getObject();
    }

    @Primary
    @Bean(name = "pgSqlSessionTemplate")
    public SqlSessionTemplate pgSqlSessionTemplate(@Qualifier("pgSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
    
    // 注意:TransactionManager 建议统一在一个配置类中定义,见下文说明
}


2. 创建/修改 SQL Server 的配置类 (SqlServerDataSourceConfig.java)


// 简略示例,结构同上
@Configuration
@MapperScan(basePackages = "com.example.bomcompare.mapper.sqlserver", sqlSessionFactoryRef = "sqlserverSqlSessionFactory")
public class SqlServerDataSourceConfig {
    
    @Value("${spring.datasource.sqlserver.xa.url}")
    private String ssUrl;
    // ... 其他属性
    
    @Bean(name = "sqlserverDataSource")
    public DataSource sqlserverDataSource() {
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSourceClassName("com.microsoft.sqlserver.jdbc.SQLServerXADataSource");
        // ... 设置属性和 uniqueResourceName ("sqlserver_datasource")
        return xaDataSource;
    }
    
    // ... 配置 SqlSessionFactory 和 SqlSessionTemplate
}


3.配置全局事务管理器 (JtaConfig.java)

关键步骤:你需要一个统一的配置类来定义 JtaTransactionManager,并移除原来各个配置类中的 DataSourceTransactionManager。


package com.example.bomcompare.config;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.config.UserTransactionServiceImp;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

import javax.transaction.UserTransaction;

@Configuration
public class JtaConfig {

    @Bean(name = "transactionManager")
    public PlatformTransactionManager transactionManager() throws Exception {
        // 初始化 Atomikos UserTransaction
        UserTransaction userTransaction = new UserTransactionImp();
        ((UserTransactionImp) userTransaction).setTransactionTimeout(300);
        
        // 创建 JtaTransactionManager
        JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
        jtaTransactionManager.setUserTransaction(userTransaction);
        // 如果需要显式指定 TransactionManager,也可以注入,但通常 UserTransaction 足够
        
        return jtaTransactionManager;
    }
    
    // 可选:配置 Atomikos 的一些全局参数
    @Bean(initMethod = "init", destroyMethod = "close")
    public UserTransactionServiceImp userTransactionService() {
        return new UserTransactionServiceImp();
    }
}


业务代码使用

在 Service 层添加 @Transactional 注解。由于启用了 JTA,Spring 会自动协调两个数据源的事务。

@Service
public class MaterialService {

    @Autowired
    private WkTditemMapper wkTditemMapper; // PG 库
    
    @Autowired
    private MaterialMapper materialMapper; // SQL Server 库

    // 开启分布式事务
    @Transactional(rollbackFor = Exception.class) 
    @Scheduled(fixedRate = 10000)
    public void updateMaterialAdmit() {
        try {
            // 操作 PG 库
            List<WkTditem> list = wkTditemMapper.selectList(...);
            
            // 操作 SQL Server 库
            List<MaterialVO> voList = materialMapper.selectByMaterialNo(...);
            
            // 如果这里抛出异常,两个库的操作都会回滚
            if (voList.isEmpty()) {
                throw new RuntimeException("模拟异常,触发回滚");
            }
            
            // 执行更新操作...
        } catch (Exception e) {
            log.error("事务执行失败", e);
            throw e; // 必须抛出异常才能触发回滚
        }
    }
}



最终检查清单

  • 删除旧代码:

在 PgDataSourceConfig.java 和 SQL Server 配置类中,删除 public DataSourceTransactionManager xxxTransactionManager(...) 方法。分布式事务下不再使用局部的 DataSourceTransactionManager。

  • YAML 对齐:

确保 application-stand.yml 中的配置键名与 Java 代码中的 @Value 一致(例如 spring.datasource.pg.xa.url)。

  • 业务调用:

在 Service 层使用 @Transactional 时,默认会使用上面配置的 jtaTransactionManager(因为它是主事务管理器,或者你可以在 @Transactional(transactionManager = "transactionManager") 中显式指定)。
依赖确认:再次确认 pom.xml 中包含 spring-boot-starter-jta-atomikos。



补充方案:使用 Seata (适合微服务或复杂场景)



两者差异

避免过于技术细节,但要点突出差异。 Atomikos vs Seata:分布式事务方案对比

维度 Atomikos Seata
定位 Java 事务 API(JTA)实现,提供 XA 两阶段提交(2PC)支持 分布式事务框架,支持多种模式(AT、TCC、SAGA、XA),面向微服务
核心原理 基于 XA 协议(2PC),通过 XAResource 协调多个资源(数据库、消息队列等) 以“全局事务”为中心,通过事务协调器(TC)管理分支事务,不同模式采用不同协议
事务模式 仅 XA(2PC) AT(自动补偿)、TCC(手动补偿)、SAGA(长事务)、XA(兼容 2PC)
资源管理 每个数据库连接需配置 XA 驱动,事务管理器统一提交/回滚 AT 模式基于 SQL 逆向生成回滚日志;TCC/SAGA 由业务定义补偿操作
一致性 强一致性(XA 保证 ACID) AT/TCC/SAGA 最终一致性,XA 模式强一致性
性能 2PC 阻塞,性能较低,事务时间较长,锁资源 AT 模式无业务侵入,性能较高(但存在全局锁);TCC 需业务实现,性能可控
适用场景 单体应用内跨多个数据库的分布式事务(JTA 场景) 微服务架构下跨多个服务/数据库的分布式事务,尤其适合高并发、最终一致性场景
部署复杂度 应用内嵌入,需配置 XA 数据源,启动参数添加 Java 代理 需独立部署 TC(事务协调器)服务,客户端引入 SDK 即可
典型技术栈 Spring + Hibernate/JPA + XA 数据源(如 Atomikos、Bitronix) Spring Cloud / Dubbo 微服务框架,Seata 集成方便
社区活跃度 相对老旧,更新较慢 阿里巴巴开源,活跃社区,持续迭代

总结

  • Atomikos:适合单体应用内,需要强一致性且跨多个数据库的场景。配置简单,但性能瓶颈明显,不适合高并发分布式系统。
  • Seata:专为微服务架构设计,支持多种事务模式,可在强一致性与最终一致性之间灵活选择。AT 模式无业务侵入,性能优于 XA,但需要独立部署 TC 服务。

选型建议

  • 传统单体应用需要强一致、跨库事务 → Atomikos
  • 微服务架构,要求高性能、高可用,且可接受最终一致性 → Seata AT
  • 业务对一致性要求极高(如金融)且性能可接受 → Seata XA 或结合 TCC 补偿。

大体实现逻辑

如果你的系统已经接入或计划接入微服务架构,或者 Atomikos 性能无法满足要求,可以使用阿里开源的 Seata。

  • 部署 Seata Server:需要单独部署一个 Seata 服务端(TC)。
  • 引入依赖:seata-spring-boot-starter。
  • 配置:在 application.yml 中配置 seata.tx-service-group 和注册中心地址。
  • 注解:在方法上添加 @GlobalTransactional 代替 @Transactional。
@GlobalTransactional(rollbackFor = Exception.class)
public void updateMaterialAdmit() {
    // 业务逻辑同上
}



文章来源:https://www.cnblogs.com/blbl-blog/p/19794767
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐

标签云