Apache ShardingSphere 提供 BASE 事务,集成了 Seata 的实现。本文所指 Seata 集成均指向 Seata AT 模式。
ShardingSphere 的 Seata 集成仅在 apache/incubator-seata:v2.1.0
或更高版本可用。
对于 org.apache.seata:seata-all
Maven 模块对应的 Seata Client,此限制同时作用于 HotSpot VM 和 GraalVM Native Image。
引入 Maven 依赖,并排除 org.apache.seata:seata-all
中过时的 org.antlr:antlr4-runtime:4.8
的 Maven 依赖。
<project>
<dependencies>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc</artifactId>
<version>${shardingsphere.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-transaction-base-seata-at</artifactId>
<version>${shardingsphere.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-all</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
受 Calcite 的影响,ShardingSphere JDBC 使用的 commons-lang:commons-lang
和 org.apache.commons:commons-pool2
与 Seata Client 存在依赖冲突,
需用户根据实际情景考虑是否需要解决依赖冲突。
使用 ShardingSphere 的 Seata 集成模块时,ShardingSphere 连接的数据库实例应同时实现 ShardingSphere 的方言解析支持与 Seata AT 模式的方言解析支持。
这类数据库包括但不限于 mysql
,gvenzl/oracle-free
,gvenzl/oracle-xe
,postgres
,mcr.microsoft.com/mssql/server
等 Docker Image。
按照如下任一链接的步骤,下载并启动 Seata 服务器。
合理的启动方式应通过 Docker Hub 中的 apache/seata-server
的 Docker Image 来实例化 Seata 服务器。
在每一个 ShardingSphere 涉及的真实数据库实例中创建 undo_log
表。
SQL 的内容以 https://github.com/apache/incubator-seata/tree/v2.1.0/script/client/at/db 内对应的数据库为准。
以下内容以 MySQL 为例。
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
ALTER TABLE `undo_log` ADD INDEX `ix_log_created` (`log_created`);
在自有项目的 ShardingSphere 的 YAML 配置文件写入如下内容,参考 分布式事务。 若初始化 ShardingSphere JDBC DataSource 时使用的是 Java API,参考 分布式事务。
transaction:
defaultType: BASE
providerType: Seata
在 classpath 的根目录中增加 seata.conf
文件,
配置文件格式参考 org.apache.seata.config.FileConfiguration
的 JavaDoc。
seata.conf
存在四个属性,
shardingsphere.transaction.seata.at.enable
,当此值为true
时,开启 ShardingSphere 的 Seata AT 集成。存在默认值为 true
shardingsphere.transaction.seata.tx.timeout
,全局事务超时(秒)。存在默认值为 60
client.application.id
,应用唯一主键,用于设置 Seata Transaction Manager Client 和 Seata Resource Manager Client 的 applicationId
client.transaction.service.group
,所属事务组, 用于设置 Seata Transaction Manager Client 和 Seata Resource Manager Client 的 transactionServiceGroup
。
存在默认值为 default
一个完全配置的 seata.conf
如下,
shardingsphere.transaction.seata.at.enable = true
shardingsphere.transaction.seata.tx.timeout = 60
client {
application.id = example
transaction.service.group = default_tx_group
}
一个最小配置的 seata.conf
如下。
由 ShardingSphere 管理的 seata.conf
中, client.transaction.service.group
的默认值设置为 default
是出于历史原因。
假设用户使用的 Seata Server 和 Seata Client 的 registry.conf
中,registry.type
和 config.type
均为 file
,
则对于 registry.conf
的 config.file.name
配置的 .conf
文件中,事务分组名在 apache/incubator-seata:v1.5.1
及之后默认值为 default_tx_group
,
在 apache/incubator-seata:v1.5.1
之前则为 my_test_tx_group
。
client.application.id = example
根据实际场景修改 Seata 的 registry.conf
文件。
ShardingSphere 的 Seata 集成不支持隔离级别。
ShardingSphere 的 Seata 集成将获取到的 Seata 全局事务置入线程的局部变量。
而 org.apache.seata.spring.annotation.GlobalTransactionScanner
则是采用 Dynamic Proxy 的方式对方法进行增强。
这意味着用户在使用 ShardingSphere 的 Seata 集成时,用户应避免使用 org.apache.seata:seata-all
的 Java API,
除非用户正在混合使用 ShardingSphere 的 Seata 集成与 Seata Client 的 TCC 模式特性。
针对 ShardingSphere 数据源,讨论 6 种情况,
手动获取从 ShardingSphere 数据源创建的 java.sql.Connection
实例,并手动调用 setAutoCommit()
, commit()
和 rollback()
方法,
这是被允许的。
在函数上使用 Jakarta EE 8 的 javax.transaction.Transactional
注解,这是被允许的。
在函数上使用 Jakarta EE 9/10 的 jakarta.transaction.Transactional
注解,这是被允许的。
在函数上使用 Spring Framework 的 org.springframework.transaction.annotation.Transactional
注解,这是被允许的。
在函数上使用 org.apache.seata.spring.annotation.GlobalTransactional
注解,这是不被允许的。
手动从 org.apache.seata.tm.api.GlobalTransactionContext
创建 org.apache.seata.tm.api.GlobalTransaction
实例,
调用 org.apache.seata.tm.api.GlobalTransaction
实例的 begin()
, commit()
和 rollback()
方法,这是不被允许的。
在使用 Spring Boot 的实际情景中,
com.alibaba.cloud:spring-cloud-starter-alibaba-seata
和 org.apache.seata:seata-spring-boot-starter
常常被其他 Maven 依赖传递引入。
为了避开事务冲突,用户需要在 Spring Boot 的配置文件中将 seata.enable-auto-data-source-proxy
的属性置为 false
。一个可能的依赖关系如下。
<project>
<dependencies>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc</artifactId>
<version>${shardingsphere.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-transaction-base-seata-at</artifactId>
<version>${shardingsphere.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
classpath 下对应的 application.yml
需要包含以下配置。
在此情况下,在 Spring Boot 的 application.yaml
内定义 Seata 的 registry.conf
的等价配置依然有效。
当下游项目使用 org.apache.shardingsphere:shardingsphere-transaction-base-seata-at
的 Maven 模块时,总是被鼓励使用 registry.conf
配置 Seata Client。
seata:
enable-auto-data-source-proxy: false
对于设置开启 ShardingSphere 的 Seata 集成的情况下,
在与 ShardingSphere JDBC DataSource 无关的业务函数中,如需在业务函数使用 Seata Client 的 Seata TCC 模式相关的特性,
可实例化一个未代理的普通 TCC 接口实现类, 然后使用 org.apache.seata.integration.tx.api.util.ProxyUtil
创建一个代理的TCC接口类,
并调用 TCC 接口实现类 Try
,Confirm
,Cancel
三个阶段对应的函数。
对于由 Seata TCC 模式而引入的 org.apache.seata.spring.annotation.GlobalTransactional
注解或 Seata TCC 模式涉及的业务函数中需要与数据库实例交互,
此注解标记的业务函数内不应使用 ShardingSphere JDBC DataSource,
而是应该手动创建javax.sql.DataSource
实例,或从自定义的 Spring Bean 中获取 javax.sql.DataSource
实例。
跨服务调用场景下的事务传播,并不像单个微服务内的事务操作一样开箱即用。
对于 Seata Server,跨服务调用场景下的事务传播,要把 XID 通过服务调用传递到服务提供方,并绑定到 org.apache.seata.core.context.RootContext
中去。
参考 https://seata.apache.org/docs/user/api/ 。这需要讨论两种情况,
在使用 ShardingSphere JDBC 的场景下,跨多个微服务的事务场景需要考虑在起点微服务的上下文使用 org.apache.seata.core.context.RootContext.getXID()
获取 Seata XID 后,
通过 HTTP 或 RPC 等手段传递给终点微服务,并在终点微服务的 Filter 或 Spring WebMVC HandlerInterceptor 中处理。
Spring WebMVC HandlerInterceptor 仅适用于 Spring Boot 微服务,对 Quarkus,Micronaut Framework 和 Helidon 无效。
在使用 ShardingSphere Proxy 的场景下,多个微服务均对着 ShardingSphere Proxy 的逻辑数据源操作本地事务, 这将在 ShardingSphere Proxy 的服务端来转化为对分布式事务的操作,不需要考虑额外的 Seata XID。
引入简单场景来继续讨论在使用 ShardingSphere JDBC 的场景下,跨服务调用的事务传播。假设存在以下已知微服务和中间件的 Docker Image 实例。
a-mysql
,所有 database 均已创建 UNDO_LOG
表和业务表。b-mysql
,所有 database 均已创建 UNDO_LOG
表和业务表。file
作为配置中心和注册中心的 Seata Server 实例 a-seata-server
。a-service
。此微服务创建仅配置数据库实例 a-mysql
的 ShardingSphere JDBC DataSource。
此 ShardingSphere JDBC DataSource 配置使用连接到 Seata Server 实例 a-seata-server
的 Seata AT 集成,其 Seata Application Id 为 service-a
,
其 Seata 事务分组为 default_tx_group
,其 Virtual Group Mapping
指向的 Seata Transaction Coordinator 集群分组为 default
。
此微服务实例 a-service
暴露单个 Restful API 的 GET 端点为 /hello
,此 Restful API 端点的业务函数 aMethod
使用了普通的本地事务注解。
若此微服务基于 Spring Boot 2,import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class DemoController {
@Transactional
@GetMapping("/hello")
public String aMethod() {
// ... 对数据库实例 `a-mysql` 做 UPDATE 操作
return "Hello World!";
}
}
b-service
。此微服务创建仅配置数据库实例 b-mysql
的 ShardingSphere JDBC DataSource。
此 ShardingSphere JDBC DataSource 配置使用连接到 Seata Server 实例 a-seata-server
的 Seata AT 集成,其 Seata Application Id 为 service-b
,
其 Seata 事务分组为 default_tx_group
,其 Virtual Group Mapping
指向的 Seata Transaction Coordinator 集群分组为 default
。
此微服务实例 b-service
的业务函数 bMethod
使用普通的本地事务注解,并在 bMethod
通过 HTTP Client 调用微服务实例 a-service
的 /hello
Restful API 端点。
若此微服务基于 Spring Boot 2,import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;
@Service
public class DemoService {
@Transactional
public void bMethod() {
RestTemplate restTemplate = new RestTemplateBuilder().build();
restTemplate.getForEntity("http://a-service/hello", String.class);
// ... 对数据库实例 `b-mysql` 做 UPDATE 操作
}
}
对于此简单场景,此时存在单个 Seata Server Cluster,其包含单个 Virtual Group
为 default
。 此 Virtual Group
包含单个 Seata Server 实例为 a-seata-server
。
讨论单服务调用的事务传播。当微服务实例 a-service
的业务函数 aMethod
抛出异常,在业务函数内对 MySQL 数据库实例 a-mysql
的更改将被正常回滚。
讨论跨服务调用的事务传播。当微服务实例 b-service
的业务函数 bMethod
抛出异常,在业务函数内对 MySQL 数据库实例 b-mysql
的更改将被正常回滚,
而微服务实例 a-service
的 org.apache.seata.core.context.RootContext
未绑定微服务实例 b-service
的业务函数 bMethod
的 Seata XID,
因此在业务函数内对 MySQL 数据库实例 a-mysql
的更改将不会被回滚。
为了实现当微服务实例 b-service
的业务函数 bMethod
抛出异常,在业务函数内对 MySQL 数据库实例 a-mysql
和 b-mysql
的更改均被正常回滚,
讨论不同场景下的常见处理方案。
a-service
和 b-service
均为基于 Jakarta EE 8 的 Spring Boot 2 微服务。
用户可在微服务实例 b-service
的业务函数 bMethod
使用 org.springframework.web.client.RestTemplate
把 XID 通过服务调用传递到微服务实例 a-service
。
可能的改造逻辑如下。import org.apache.seata.core.context.RootContext;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestTemplate;
@Service
public class DemoService {
@Transactional
public void bMethod() {
RestTemplate restTemplate = new RestTemplateBuilder().additionalInterceptors((request, body, execution) -> {
String xid = RootContext.getXID();
if (null != xid) {
request.getHeaders().add(RootContext.KEY_XID, xid);
}
return execution.execute(request, body);
})
.build();
restTemplate.getForEntity("http://a-service/hello", String.class);
// ... 对数据库实例 `b-mysql` 做 UPDATE 操作
}
}
此时在微服务实例 a-service
和 b-service
均需要添加自定义的 org.springframework.web.servlet.config.annotation.WebMvcConfigurer
实现。
import org.apache.seata.integration.http.TransactionPropagationInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
public class CustomWebMvcConfigurer implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new TransactionPropagationInterceptor());
}
}
此时,当微服务实例 b-service
的业务函数 bMethod
抛出异常,在业务函数内对 MySQL 数据库实例 a-mysql
和 b-mysql
的更改均被正常回滚。
a-service
和 b-service
均为基于 Jakarta EE 9/10 的 Spring Boot 3 微服务。
用户可在微服务实例 b-service
的业务函数 bMethod
使用 org.springframework.web.client.RestClient
把 XID 通过服务调用传递到微服务实例 a-service
。
可能的改造逻辑如下。import org.apache.seata.core.context.RootContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.client.RestClient;
@Service
public class DemoService {
@Transactional
public void bMethod() {
RestClient restClient = RestClient.builder().requestInterceptor((request, body, execution) -> {
String xid = RootContext.getXID();
if (null != xid) {
request.getHeaders().add(RootContext.KEY_XID, xid);
}
return execution.execute(request, body);
})
.build();
restClient.get().uri("http://a-service/hello").retrieve().body(String.class);
// ... 对数据库实例 `b-mysql` 做 UPDATE 操作
}
}
此时在微服务实例 a-service
和 b-service
均需要添加自定义的 org.springframework.web.servlet.config.annotation.WebMvcConfigurer
实现。
import org.apache.seata.integration.http.JakartaTransactionPropagationInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
public class CustomWebMvcConfigurer implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new JakartaTransactionPropagationInterceptor());
}
}
此时,当微服务实例 b-service
的业务函数 bMethod
抛出异常,在业务函数内对 MySQL 数据库实例 a-mysql
和 b-mysql
的更改均被正常回滚。
微服务实例 a-service
和 b-service
均为 Spring Boot 微服务,但使用的 API 网关中间件阻断了所有包含 TX_XID
的 HTTP Header 的 HTTP 请求。
用户需要考虑更改把 XID 通过服务调用传递到微服务实例 a-service
使用的 HTTP Header,或使用 RPC 框架把 XID 通过服务调用传递到微服务实例 a-service
。
参考 https://github.com/apache/incubator-seata/tree/v2.1.0/integration 。
微服务实例 a-service
和 b-service
均为 Quarkus,Micronaut Framework 和 Helidon 等微服务。此情况下无法使用 Spring WebMVC HandlerInterceptor。
可参考如下 Spring Boot 3 的自定义 WebMvcConfigurer 实现,来实现 Filter。
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.context.RootContext;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.NonNull;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
public class CustomWebMvcConfigurer implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new HandlerInterceptor() {
@Override
public boolean preHandle(@NonNull HttpServletRequest request, @NonNull HttpServletResponse response, @NonNull Object handler) {
String rpcXid = request.getHeader(RootContext.KEY_XID);
String xid = RootContext.getXID();
if (StringUtils.isBlank(xid) && StringUtils.isNotBlank(rpcXid)) {
RootContext.bind(rpcXid);
}
return true;
}
@Override
public void afterCompletion(@NonNull HttpServletRequest request, @NonNull HttpServletResponse response, @NonNull Object handler, Exception ex) {
if (RootContext.inGlobalTransaction()) {
String rpcXid = request.getHeader(RootContext.KEY_XID);
String xid = RootContext.getXID();
if (StringUtils.isNotBlank(xid)) {
String unbindXid = RootContext.unbind();
if (!StringUtils.equalsIgnoreCase(rpcXid, unbindXid)) {
if (StringUtils.isNotBlank(unbindXid)) {
RootContext.bind(unbindXid);
}
}
}
}
}
});
}
}
a-service
和 b-service
均为 Spring Boot 微服务,但使用的组件是 Spring WebFlux 而非 Spring WebMVC。
在反应式编程 API 下 ShardingSphere JDBC 无法处理 R2DBC DataSource,仅可处理 JDBC DataSource。
在使用 WebFlux 组件的 Spring Boot 微服务中应避免创建 ShardingSphere JDBC DataSource。