复古 护眼 海天 深邃 暗黑 默认

Seata 事务

背景信息

Apache ShardingSphere 提供 BASE 事务,集成了 Seata 的实现。本文所指 Seata 集成均指向 Seata AT 模式。

前提条件

引入 Maven 依赖,并排除 io.seata:seata-all 中过时的 org.antlr:antlr4-runtime:4.8 的 Maven 依赖。

<project>
    <dependencies>
      <dependency>
         <groupId>org.apache.shardingsphere</groupId>
         <artifactId>shardingsphere-jdbc</artifactId>
         <version>${shardingsphere.version}</version>
      </dependency>
      <dependency>
         <groupId>org.apache.shardingsphere</groupId>
         <artifactId>shardingsphere-transaction-base-seata-at</artifactId>
         <version>${shardingsphere.version}</version>
      </dependency>
      <dependency>
         <groupId>io.seata</groupId>
         <artifactId>seata-all</artifactId>
         <version>2.0.0</version>
         <exclusions>
            <exclusion>
               <groupId>org.antlr</groupId>
               <artifactId>antlr4-runtime</artifactId>
            </exclusion>
         </exclusions>
      </dependency>
    </dependencies>
</project>

操作步骤

  1. 启动 Seata Server
  2. 创建日志表
  3. 添加 Seata 配置

配置示例

启动 Seata Server

按照如下任一链接的步骤,下载并启动 Seata 服务器。 合理的启动方式应通过 Docker Hub 中的 seataio/seata-server 的 Docker Image 来实例化 Seata 服务器。 对于 apache/incubator-seata:v2.0.0 及更早的 Seata 版本,应使用 Docker Hub 中的 seataio/seata-server。 否则应使用 Docker Hub 中的 apache/seata-server

创建 undo_log 表

在每一个 ShardingSphere 涉及的真实数据库实例中创建 undo_log 表。 SQL 的内容以 https://github.com/apache/incubator-seata/tree/v2.0.0/script/client/at/db 内对应的数据库为准。 以下内容以 MySQL 为例。

CREATE TABLE IF NOT EXISTS `undo_log`
(
    `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
    ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';

ALTER TABLE `undo_log` ADD INDEX `ix_log_created` (`log_created`);

修改配置

在自有项目的 ShardingSphere 的 YAML 配置文件写入如下内容,参考 分布式事务。 若初始化 ShardingSphere JDBC DataSource 时使用的是 Java API,参考 分布式事务

transaction:
   defaultType: BASE
   providerType: Seata

在 classpath 的根目录中增加 seata.conf 文件, 配置文件格式参考 io.seata.config.FileConfigurationJavaDoc

seata.conf 存在四个属性,

  1. shardingsphere.transaction.seata.at.enable,当此值为true时,开启 ShardingSphere 的 Seata AT 集成。存在默认值为 true
  2. shardingsphere.transaction.seata.tx.timeout,全局事务超时(秒)。存在默认值为 60
  3. client.application.id,应用唯一主键,用于设置 Seata Transaction Manager Client 和 Seata Resource Manager Client 的 applicationId
  4. client.transaction.service.group,所属事务组, 用于设置 Seata Transaction Manager Client 和 Seata Resource Manager Client 的 transactionServiceGroup。 存在默认值为 default

一个完全配置的 seata.conf 如下,

shardingsphere.transaction.seata.at.enable = true
shardingsphere.transaction.seata.tx.timeout = 60

client {
    application.id = example
    transaction.service.group = default_tx_group
}

一个最小配置的 seata.conf 如下。 由 ShardingSphere 管理的 seata.conf 中, client.transaction.service.group 的默认值设置为 default 是出于历史原因。 假设用户使用的 Seata Server 和 Seata Client 的 registry.conf 中,registry.typeconfig.type 均为 file, 则对于 registry.confconfig.file.name 配置的 .conf 文件中,事务分组名在 apache/incubator-seata:v1.5.1 之后默认值为 default_tx_group, 反之则为 my_test_tx_group

client.application.id = example

根据实际场景修改 Seata 的 registry.conf 文件。

使用限制

ShardingSphere 的 Seata 集成不支持隔离级别。

ShardingSphere 的 Seata 集成将获取到的 Seata 全局事务置入线程的局部变量。 而 org.apache.seata.spring.annotation.GlobalTransactionScanner 则是采用 Dynamic Proxy 的方式对方法进行增强。 这意味着用户在使用 ShardingSphere 的 Seata 集成时,用户应避免使用 io.seata:seata-all 的 Java API, 除非用户正在混合使用 ShardingSphere 的 Seata 集成与 Seata Client 的 TCC 模式特性。

针对 ShardingSphere 数据源,讨论 6 种情况,

  1. 手动获取从 ShardingSphere 数据源创建的 java.sql.Connection 实例,并手动调用 setAutoCommit(), commit()rollback() 方法, 这是被允许的。

  2. 在函数上使用 Jakarta EE 8 的 javax.transaction.Transactional 注解,这是被允许的。

  3. 在函数上使用 Jakarta EE 9/10 的 jakarta.transaction.Transactional 注解,这是被允许的。

  4. 在函数上使用 Spring Framework 的 org.springframework.transaction.annotation.Transactional 注解,这是被允许的。

  5. 在函数上使用 io.seata.spring.annotation.GlobalTransactional 注解,这是不被允许的

  6. 手动从 io.seata.tm.api.GlobalTransactionContext 创建 io.seata.tm.api.GlobalTransaction 实例, 调用 io.seata.tm.api.GlobalTransaction 实例的 begin(), commit()rollback() 方法,这是不被允许的

在使用 Spring Boot 的实际情景中, com.alibaba.cloud:spring-cloud-starter-alibaba-seataio.seata:seata-spring-boot-starter 常常被其他 Maven 依赖传递引入。 为了避开事务冲突,用户需要在 Spring Boot 的配置文件中将 seata.enable-auto-data-source-proxy 的属性置为 false。一个可能的依赖关系如下。

<project>
    <dependencies>
      <dependency>
         <groupId>org.apache.shardingsphere</groupId>
         <artifactId>shardingsphere-jdbc</artifactId>
         <version>${shardingsphere.version}</version>
      </dependency>
      <dependency>
         <groupId>org.apache.shardingsphere</groupId>
         <artifactId>shardingsphere-transaction-base-seata-at</artifactId>
         <version>${shardingsphere.version}</version>
      </dependency>
      <dependency>
         <groupId>io.seata</groupId>
         <artifactId>seata-spring-boot-starter</artifactId>
         <version>2.0.0</version>
         <exclusions>
            <exclusion>
               <groupId>org.antlr</groupId>
               <artifactId>antlr4-runtime</artifactId>
            </exclusion>
         </exclusions>
      </dependency>
    </dependencies>
</project>

classpath 下对应的 application.yml 需要包含以下配置。 在此情况下,在 Spring Boot 的 application.yaml 内定义 Seata 的 registry.conf 的等价配置依然有效。 当下游项目使用 org.apache.shardingsphere:shardingsphere-transaction-base-seata-at 的 Maven 模块时,总是被鼓励使用 registry.conf 配置 Seata Client。

seata:
  enable-auto-data-source-proxy: false

与 Seata TCC 模式特性混合使用

对于设置开启 ShardingSphere 的 Seata 集成的情况下, 在与 ShardingSphere JDBC DataSource 无关的业务函数中,如需在业务函数使用 Seata Client 的 Seata TCC 模式相关的特性, 可实例化一个未代理的普通 TCC 接口实现类, 然后使用 io.seata.integration.tx.api.util.ProxyUtil 创建一个代理的TCC接口类, 并调用 TCC 接口实现类 TryConfirmCancel 三个阶段对应的函数。

对于由 Seata TCC 模式而引入的 io.seata.spring.annotation.GlobalTransactional 注解或 Seata TCC 模式涉及的业务函数中需要与数据库实例交互, 此注解标记的业务函数内不应使用 ShardingSphere JDBC DataSource, 而是应该手动创建javax.sql.DataSource 实例,或从自定义的 Spring Bean 中获取 javax.sql.DataSource 实例。

跨服务调用的事务传播

跨服务调用场景下的事务传播,并不像单个微服务内的事务操作一样开箱即用。 对于 Seata Server,跨服务调用场景下的事务传播,要把 XID 通过服务调用传递到服务提供方,并绑定到 io.seata.core.context.RootContext 中去。 参考 https://seata.apache.org/docs/user/api/ 。这需要讨论两种情况,

  1. 在使用 ShardingSphere JDBC 的场景下,跨多个微服务的事务场景需要考虑在起点微服务的上下文使用 io.seata.core.context.RootContext.getXID() 获取 Seata XID 后, 通过 HTTP 或 RPC 等手段传递给终点微服务,并在终点微服务的 Filter 或 Spring WebMVC HandlerInterceptor 中处理。 Spring WebMVC HandlerInterceptor 仅适用于 Spring Boot 微服务,对 Quarkus,Micronaut Framework 和 Helidon 无效。

  2. 在使用 ShardingSphere Proxy 的场景下,多个微服务均对着 ShardingSphere Proxy 的逻辑数据源操作本地事务, 这将在 ShardingSphere Proxy 的服务端来转化为对分布式事务的操作,不需要考虑额外的 Seata XID。

引入简单场景来继续讨论在使用 ShardingSphere JDBC 的场景下,跨服务调用的事务传播。假设存在以下已知微服务和中间件的 Docker Image 实例。

  1. MySQL 数据库实例 a-mysql,所有 database 均已创建 UNDO_LOG 表和业务表。
  2. MySQL 数据库实例 b-mysql,所有 database 均已创建 UNDO_LOG 表和业务表。
  3. 使用 file 作为配置中心和注册中心的 Seata Server 实例 a-seata-server
  4. 微服务实例 a-service。此微服务创建仅配置数据库实例 a-mysql 的 ShardingSphere JDBC DataSource。 此 ShardingSphere JDBC DataSource 配置使用连接到 Seata Server 实例 a-seata-server 的 Seata AT 集成,其 Seata Application Id 为 service-a, 其 Seata 事务分组为 default_tx_group,其 Virtual Group Mapping 指向的 Seata Transaction Coordinator 集群分组为 default。 此微服务实例 a-service 暴露单个 Restful API 的 GET 端点为 /hello,此 Restful API 端点的业务函数 aMethod 使用了普通的本地事务注解。 若此微服务基于 Spring Boot 2,
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DemoController {
   @Transactional
   @GetMapping("/hello")
   public String aMethod() {
      // ... 对数据库实例 `a-mysql` 做 UPDATE 操作
      return "Hello World!";
   }
}
  1. 微服务实例 b-service。此微服务创建仅配置数据库实例 b-mysql 的 ShardingSphere JDBC DataSource。 此 ShardingSphere JDBC DataSource 配置使用连接到 Seata Server 实例 a-seata-server 的 Seata AT 集成,其 Seata Application Id 为 service-b, 其 Seata 事务分组为 default_tx_group,其 Virtual Group Mapping 指向的 Seata Transaction Coordinator 集群分组为 default。 此微服务实例 b-service 的业务函数 bMethod 使用普通的本地事务注解,并在 bMethod 通过 HTTP Client 调用微服务实例 a-service/hello Restful API 端点。 若此微服务基于 Spring Boot 2,
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;

@Service
public class DemoService {
   @Transactional
   public void bMethod() {
      RestTemplate restTemplate = new RestTemplateBuilder().build();
      restTemplate.getForEntity("http://a-service/hello", String.class);
      // ... 对数据库实例 `b-mysql` 做 UPDATE 操作
   }
}

对于此简单场景,此时存在单个 Seata Server Cluster,其包含单个 Virtual Groupdefault。 此 Virtual Group 包含单个 Seata Server 实例为 a-seata-server

讨论单服务调用的事务传播。当微服务实例 a-service 的业务函数 aMethod 抛出异常,在业务函数内对 MySQL 数据库实例 a-mysql 的更改将被正常回滚。

讨论跨服务调用的事务传播。当微服务实例 b-service 的业务函数 bMethod 抛出异常,在业务函数内对 MySQL 数据库实例 b-mysql 的更改将被正常回滚, 而微服务实例 a-serviceio.seata.core.context.RootContext 未绑定微服务实例 b-service 的业务函数 bMethod 的 Seata XID, 因此在业务函数内对 MySQL 数据库实例 a-mysql 的更改将不会被回滚。

为了实现当微服务实例 b-service 的业务函数 bMethod 抛出异常,在业务函数内对 MySQL 数据库实例 a-mysqlb-mysql 的更改均被正常回滚, 讨论不同场景下的常见处理方案。

  1. 微服务实例 a-serviceb-service 均为基于 Jakarta EE 8 的 Spring Boot 2 微服务。 用户可在微服务实例 b-service 的业务函数 bMethod 使用 org.springframework.web.client.RestTemplate 把 XID 通过服务调用传递到微服务实例 a-service。 可能的改造逻辑如下。
import io.seata.core.context.RootContext;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;

@Service
public class DemoService {
    @Transactional
    public void bMethod() {
        RestTemplate restTemplate = new RestTemplateBuilder().additionalInterceptors((request, body, execution) -> {
                    String xid = RootContext.getXID();
                    if (null != xid) {
                        request.getHeaders().add(RootContext.KEY_XID, xid);
                    }
                    return execution.execute(request, body);
                })
                .build();
        restTemplate.getForEntity("http://a-service/hello", String.class);
        // ... 对数据库实例 `b-mysql` 做 UPDATE 操作
    }
}

此时在微服务实例 a-serviceb-service 均需要添加自定义的 org.springframework.web.servlet.config.annotation.WebMvcConfigurer 实现。

import io.seata.integration.http.TransactionPropagationInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class CustomWebMvcConfigurer implements WebMvcConfigurer {

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new TransactionPropagationInterceptor());
    }
}

此时,当微服务实例 b-service 的业务函数 bMethod 抛出异常,在业务函数内对 MySQL 数据库实例 a-mysqlb-mysql 的更改均被正常回滚。

  1. 微服务实例 a-serviceb-service 均为基于 Jakarta EE 9/10 的 Spring Boot 3 微服务。 用户可在微服务实例 b-service 的业务函数 bMethod 使用 org.springframework.web.client.RestClient 把 XID 通过服务调用传递到微服务实例 a-service。 可能的改造逻辑如下。
import io.seata.core.context.RootContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestClient;

@Service
public class DemoService {
    @Transactional
    public void bMethod() {
        RestClient restClient = RestClient.builder().requestInterceptor((request, body, execution) -> {
                    String xid = RootContext.getXID();
                    if (null != xid) {
                        request.getHeaders().add(RootContext.KEY_XID, xid);
                    }
                    return execution.execute(request, body);
                })
                .build();
        restClient.get().uri("http://a-service/hello").retrieve().body(String.class);
        // ... 对数据库实例 `b-mysql` 做 UPDATE 操作
    }
}

此时在微服务实例 a-serviceb-service 均需要添加自定义的 org.springframework.web.servlet.config.annotation.WebMvcConfigurer 实现。

import io.seata.integration.http.JakartaTransactionPropagationInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class CustomWebMvcConfigurer implements WebMvcConfigurer {

   @Override
   public void addInterceptors(InterceptorRegistry registry) {
      registry.addInterceptor(new JakartaTransactionPropagationInterceptor());
   }
}

此时,当微服务实例 b-service 的业务函数 bMethod 抛出异常,在业务函数内对 MySQL 数据库实例 a-mysqlb-mysql 的更改均被正常回滚。

  1. 微服务实例 a-serviceb-service 均为 Spring Boot 微服务,但使用的 API 网关中间件阻断了所有包含 TX_XID 的 HTTP Header 的 HTTP 请求。 用户需要考虑更改把 XID 通过服务调用传递到微服务实例 a-service 使用的 HTTP Header,或使用 RPC 框架把 XID 通过服务调用传递到微服务实例 a-service。 参考 https://github.com/apache/incubator-seata/tree/v2.0.0/integration

  2. 微服务实例 a-serviceb-service 均为 Quarkus,Micronaut Framework 和 Helidon 等微服务。此情况下无法使用 Spring WebMVC HandlerInterceptor。 可参考如下 Spring Boot 3 的自定义 WebMvcConfigurer 实现,来实现 Filter。

import io.seata.common.util.StringUtils;
import io.seata.core.context.RootContext;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.NonNull;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class CustomWebMvcConfigurer implements WebMvcConfigurer {
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new HandlerInterceptor() {
            @Override
            public boolean preHandle(@NonNull HttpServletRequest request, @NonNull HttpServletResponse response, @NonNull Object handler) {
                String rpcXid = request.getHeader(RootContext.KEY_XID);
                String xid = RootContext.getXID();
                if (StringUtils.isBlank(xid) && StringUtils.isNotBlank(rpcXid)) {
                    RootContext.bind(rpcXid);
                }
                return true;
            }

            @Override
            public void afterCompletion(@NonNull HttpServletRequest request, @NonNull HttpServletResponse response, @NonNull Object handler, Exception ex) {
                if (RootContext.inGlobalTransaction()) {
                    String rpcXid = request.getHeader(RootContext.KEY_XID);
                    String xid = RootContext.getXID();
                    if (StringUtils.isNotBlank(xid)) {
                        String unbindXid = RootContext.unbind();
                        if (!StringUtils.equalsIgnoreCase(rpcXid, unbindXid)) {
                            if (StringUtils.isNotBlank(unbindXid)) {
                                RootContext.bind(unbindXid);
                            }
                        }
                    }
                }
            }
        });
    }
}
  1. 微服务实例 a-serviceb-service 均为 Spring Boot 微服务,但使用的组件是 Spring WebFlux 而非 Spring WebMVC。 在反应式编程 API 下 ShardingSphere JDBC 无法处理 R2DBC DataSource,仅可处理 JDBC DataSource。 在使用 WebFlux 组件的 Spring Boot 微服务中应避免创建 ShardingSphere JDBC DataSource。

  2. 微服务实例 a-serviceb-service 使用的 Seata Client 为 org.apache.seata:seata-all, 而非 io.seata:seata-all。 将所有对 io.seata package 的调用更改为 org.apache.seata package 即可。