A Quick Explanation of the ShardingSphere Transaction Module
Work at JD Finance
Senior Java Development Engineer
- Has years of internet development experience, loves open source technology, and is deeply interested in distributed storage. Familiar with online and real-time data processing with ElasticSearch, HBase, Presto, Storm, etc.
- Responsible for the distributed transaction implementation in the Sharding-Sphere team.
Using Scenarios of Distributed Transaction
Everything starts with ACID (atomicity, consistency, isolation, durability). These are the properties of a local database transaction:
Atomicity guarantees that each transaction is treated as a single “unit”, which either succeeds completely, or fails completely.
Consistency ensures that a transaction can only bring the database from one valid state to another.
Isolation ensures that transactions, which are often executed concurrently, do not interfere with each other: the effects of an incomplete transaction might not even be visible to other transactions.
Durability guarantees that once a transaction has been committed, it will remain committed even in the case of a system failure (e.g., power outage or crash).
A local transaction in a relational database guarantees ACID perfectly. But in distributed scenarios, its performance becomes a bottleneck. This post explains how to make the ACID properties available in a distributed database, and which alternatives exist.
CAP and BASE Theorem
For internet applications, the traditional all-in-one architecture can no longer satisfy business requirements as traffic and data volume gradually grow. Developers need to split the big application into several small, independent applications, and the data held in one database has to be reorganized into multiple databases and tables.
But keeping ACID across multiple databases is a hard technical problem. CAP and BASE are the basic theories for discussing this problem in distributed systems.
The CAP theorem states that it is impossible for a distributed data system to simultaneously provide more than two out of the following three guarantees.
- Consistency
- Availability
- Partition tolerance
No distributed system is safe from network failures, thus network partitioning generally has to be tolerated. In the presence of a network partition, one has to choose between consistency and availability.
BASE (basically available, soft state, eventually consistent) is the result of trading off consistency against availability.
For most distributed applications, it is enough for data to reach an eventually consistent state within a reasonable time. If we call transactions that satisfy ACID hard transactions, then transactions based on BASE are called BASE transactions.
Pursuing strong consistency without regard to system requirements and performance is not the best solution. A distributed application can benefit from both hard and BASE transactions at the same time: local services adopt strongly consistent transactions, while services that span multiple systems adopt eventually consistent transactions. How to trade off performance against consistency is a key challenge for architects and developers.
Distributed Transaction Method
There are two main specifications for implementing distributed transactions: XA (using 2PC) and BASE transactions.
The XA standard, defined in the X/Open CAE Specification for Distributed Transaction Processing, describes the interface between the transaction manager (TM) and the resource manager (RM).
The XAResource interface is a Java mapping of the industry standard XA interface, and its implementation varies among database vendors.
The MySQL implementation can be found in the com.mysql.jdbc.jdbc2.optional.MysqlXAConnection class of the mysql-connector-java v5.1.30 package.
In the XA specification, the TM (the application) controls a transaction that involves multiple RMs (the databases). The TM needs a global transaction id (xid) to call the XAResource interfaces and coordinate multiple local transactions into one globally unique distributed transaction.
One Phase Commit: weak XA
Compared with 2PC, weak XA has no prepare phase, which reduces resource blocking and improves concurrency. The implementation only needs to iterate over the data source connections and call commit/rollback on each one in the business thread. Weak XA performs almost as well as a local transaction. However, when a network failure, database crash or similar problem occurs, weak XA leaves the data inconsistent and the transaction cannot be rolled back. Because weak XA requires no extra implementation effort, Sharding-Sphere supports it by default.
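The commit loop described above can be sketched as follows. This is a minimal illustration, not Sharding-Sphere's code: `LocalTransaction`, `WeakXaCommitter` and `FakeBranch` are hypothetical names, and a failed commit is assumed to surface as a `RuntimeException`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the local transaction held by one data source connection. */
interface LocalTransaction {
    void commit();   // assumed to throw a RuntimeException when the database is unreachable
    void rollback();
}

/** Weak XA: no prepare phase, every branch is committed in turn on the business thread. */
final class WeakXaCommitter {
    /**
     * Commits each branch in order and returns the indexes of the branches whose commit failed.
     * Branches committed before a failure stay committed, which is where inconsistency comes from.
     */
    static List<Integer> commitAll(List<? extends LocalTransaction> branches) {
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < branches.size(); i++) {
            try {
                branches.get(i).commit();
            } catch (RuntimeException ex) {
                failed.add(i);
            }
        }
        return failed;
    }
}

/** In-memory branch that exists only to make the sketch executable. */
final class FakeBranch implements LocalTransaction {
    private final boolean failOnCommit;
    boolean committed;
    boolean rolledBack;

    FakeBranch(boolean failOnCommit) { this.failOnCommit = failOnCommit; }

    @Override public void commit() {
        if (failOnCommit) throw new IllegalStateException("connection lost");
        committed = true;
    }

    @Override public void rollback() { rolledBack = true; }
}
```

If the second of two branches fails, `commitAll` reports index 1 while the first branch remains committed, which is exactly the inconsistent state that weak XA cannot undo.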
Two Phase Commit: 2PC
Two phase commit is a standard implementation of XA. The distributed transaction commit consists of two phases: prepare and commit/rollback.
When an XA global transaction is enabled, all local transactions use the default isolation level to lock resources and record redo and undo logs. In the prepare stage, the TM sends a prepare request to each RM and waits for the responses. It then executes a commit if all RMs vote yes, or a rollback if any RM votes no. If all RMs vote yes but some nodes crash during the commit stage, XA recovery is used to complete the commit once the node recovers, which keeps the data consistent.
In the 2PC model, the prepare stage has to wait for responses from all involved RMs, which may keep resources locked for a long time. It is therefore not suitable for scenarios with high concurrency or time-consuming sub-transactions.
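The two phases can be illustrated with a small coordinator. This is a sketch under simplifying assumptions: `Participant` is a hypothetical interface rather than the real XA API, votes are plain booleans, and crash recovery is not modelled.

```java
import java.util.List;

/** Hypothetical view of a resource manager (RM); this is not the real XA API. */
interface Participant {
    boolean prepare();  // vote: true means yes, false means no
    void commit();
    void rollback();
}

/** Transaction manager (TM) side of two-phase commit. */
final class TwoPhaseCoordinator {
    /** Returns true if the global transaction was committed, false if it was rolled back. */
    static boolean execute(List<? extends Participant> participants) {
        // Phase 1: collect votes; a single "no" (or an error) aborts the whole transaction.
        boolean allYes = true;
        for (Participant p : participants) {
            boolean vote;
            try {
                vote = p.prepare();
            } catch (RuntimeException ex) {
                vote = false;
            }
            if (!vote) {
                allYes = false;
                break;
            }
        }
        // Phase 2: apply the same decision to every RM.
        for (Participant p : participants) {
            if (allYes) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allYes;
    }
}

/** In-memory RM that records its final state, used only to exercise the sketch. */
final class RecordingParticipant implements Participant {
    private final boolean vote;
    String state = "ACTIVE";

    RecordingParticipant(boolean vote) { this.vote = vote; }

    @Override public boolean prepare() {
        if (vote) state = "PREPARED";
        return vote;
    }

    @Override public void commit() { state = "COMMITTED"; }

    @Override public void rollback() { state = "ROLLED_BACK"; }
}
```

Every participant stays in the prepared state until the last vote arrives, which is the long resource lock that makes 2PC a poor fit for high concurrency.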
Sharding-Sphere supports an XA-based, strongly consistent transaction solution. Third-party XA implementations, such as Atomikos and Narayana, can be injected into Sharding-Sphere through SPI.
A BASE transaction is a compromise relative to XA: it no longer requires data to be strongly consistent, which reduces how long resources stay locked. There are a variety of BASE transaction implementations, and different strategies can be chosen depending on the trade-off.
One Phase Commit with Compensation: Best Effort Delivery (BED)
BED is a compensation strategy for weak XA. A transaction table records the SQL of every transactional operation; a record is deleted when its sub-transaction commits successfully, or after it has been retried up to the configured maximum number of retries. It tries its best to make the data consistent, which is why it is called BED. Depending on the business scenario, the retry can be synchronous or asynchronous to balance consistency and availability.
The advantage of this strategy is that it holds no resource locks and costs little performance. But the transaction cannot be rolled back if it still fails after the maximum number of retries, so it is only useful for transactions that are certain to succeed. BED sacrifices the rollback capability of the transaction for the sake of performance.
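A synchronous variant of this retry loop might look like the sketch below. The class name `BestEffortDelivery`, the in-memory list standing in for the transaction table, and the `Predicate` used to execute SQL are all assumptions made for illustration; `maxAttempts` counts every attempt, including the first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Sketch of Best Effort Delivery with synchronous retries and an in-memory transaction log. */
final class BestEffortDelivery {
    /** One row of the hypothetical transaction table. */
    static final class LogEntry {
        final String sql;
        int attempts;
        LogEntry(String sql) { this.sql = sql; }
    }

    private final List<LogEntry> log = new ArrayList<>();
    private final int maxAttempts;

    BestEffortDelivery(int maxAttempts) { this.maxAttempts = maxAttempts; }

    /** Records the SQL of a sub-transaction before it is delivered. */
    void record(String sql) { log.add(new LogEntry(sql)); }

    int pending() { return log.size(); }

    /**
     * Executes every logged statement, retrying until it succeeds or maxAttempts is reached.
     * The executor returns true when the statement was committed.
     * Records are removed in both cases; the returned list holds the statements that never succeeded.
     */
    List<String> deliver(Predicate<String> executor) {
        List<String> undelivered = new ArrayList<>();
        for (LogEntry entry : log) {
            boolean done = false;
            while (!done && entry.attempts < maxAttempts) {
                entry.attempts++;
                done = executor.test(entry.sql);
            }
            if (!done) {
                undelivered.add(entry.sql);  // cannot be rolled back, needs manual handling
            }
        }
        log.clear();
        return undelivered;
    }
}
```

Statements returned by `deliver` are the cases the text warns about: they exhausted their retries and there is no rollback path for them.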
The TCC (Try-Confirm-Cancel) model hands control of locking to the business level: the business logic of every sub-transaction must implement the Try, Confirm and Cancel interfaces.
- Try: attempt to execute the business
  - Finish all business checks (Consistency);
  - Reserve the resources required by the business (Isolation);
- Confirm: execute the business
  - Execute the business logic without any business check;
  - Only use the resources reserved in the Try phase;
  - The Confirm operation is idempotent;
- Cancel: cancel the business execution
  - Release the resources reserved in the Try phase;
  - The Cancel operation is idempotent;
Unlike the XA prepare stage, each of the three stages executes as a local transaction. TCC does not need to lock all resources during voting, which improves system concurrency.
As an example of the TCC model, consider account A transferring 100 dollars to account B. The following figure shows how the business is transformed to support TCC, and the business changes are analyzed in detail below:
The remittance and collection services need to implement the Try-Confirm-Cancel interface and inject it into the TCC transaction manager during business initialization.
- Remittance service (account A)
  - Try:
    - Check the validity of account A, namely whether its status is “Transferring” or “Frozen”;
    - Check whether account A has enough money;
    - Deduct 100 dollars from account A and update its status to “Transferring”;
    - Reserve the deduction resource: store the event that 100 dollars is transferred from account A to B in a message or log.
  - Cancel:
    - Add 100 dollars back to account A;
    - Release the deduction resource from the messages or logs;
- Collection service (account B)
  - Try:
    - Check the validity of account B;
  - Confirm:
    - Read the log or message and add 100 dollars to account B;
    - Release the deduction resource from the messages or logs;
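The transfer can be sketched in code as follows. Everything here is illustrative: the `TccAction` interface, the in-memory `Account`, and the shared `transferLog` map standing in for the message or log are hypothetical, and resetting the “Transferring” status in the remittance Confirm is an assumption the text does not spell out.

```java
import java.util.List;
import java.util.Map;

/** Contract that every TCC participant implements. */
interface TccAction {
    boolean tryReserve();  // Try: business checks plus resource reservation
    void confirm();        // Confirm: must be idempotent
    void cancel();         // Cancel: must be idempotent
}

final class Account {
    long balance;
    boolean transferring;
    boolean frozen;
    Account(long balance) { this.balance = balance; }
}

/** Remittance side (account A): deducts in Try and gives the money back in Cancel. */
final class RemittanceService implements TccAction {
    private final Account from;
    private final long amount;
    private final String txId;
    private final Map<String, Long> transferLog;

    RemittanceService(Account from, long amount, String txId, Map<String, Long> transferLog) {
        this.from = from; this.amount = amount; this.txId = txId; this.transferLog = transferLog;
    }

    @Override public boolean tryReserve() {
        if (from.frozen || from.transferring || from.balance < amount) return false;
        from.balance -= amount;
        from.transferring = true;
        transferLog.put(txId, amount);  // reserve the deduction
        return true;
    }

    @Override public void confirm() { from.transferring = false; }  // assumption: only the status is reset

    @Override public void cancel() {
        Long reserved = transferLog.remove(txId);
        if (reserved != null) {         // a repeated cancel finds nothing and does nothing
            from.balance += reserved;
            from.transferring = false;
        }
    }
}

/** Collection side (account B): credits the reserved amount in Confirm. */
final class CollectionService implements TccAction {
    private final Account to;
    private final String txId;
    private final Map<String, Long> transferLog;

    CollectionService(Account to, String txId, Map<String, Long> transferLog) {
        this.to = to; this.txId = txId; this.transferLog = transferLog;
    }

    @Override public boolean tryReserve() { return !to.frozen; }

    @Override public void confirm() {
        Long reserved = transferLog.remove(txId);
        if (reserved != null) to.balance += reserved;  // a repeated confirm credits nothing
    }

    @Override public void cancel() { }  // nothing was reserved on account B
}

final class TccCoordinator {
    /** Runs Try on every action, then Confirm on all of them or Cancel on all of them. */
    static boolean run(List<? extends TccAction> actions) {
        boolean allReserved = true;
        for (TccAction action : actions) {
            if (!action.tryReserve()) { allReserved = false; break; }
        }
        for (TccAction action : actions) {
            if (allReserved) action.confirm(); else action.cancel();
        }
        return allReserved;
    }
}
```

Removing the log entry before touching the balance is what makes Confirm and Cancel safe to repeat in this sketch.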
As can be seen, the TCC model is intrusive to the business, and transforming the business to fit it is hard.
Message-Driven Transaction Model
The message-based consistency solution depends on message middleware to keep the data of upstream and downstream applications consistent. The basic idea is to put the local operation and the sending of the message into one local transaction; the downstream application then consumes the message and executes the corresponding operation. It essentially relies on the retry mechanism of the message middleware to achieve eventual consistency. The disadvantage of the message-driven model is tight coupling with the message queue, which may increase the complexity of the business system.
Saga derives from a paper published in 1987 by Hector Garcia-Molina and Kenneth Salem.
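The idea can be sketched without any real message middleware. In this illustration an in-memory queue plays the role of the message store, a `synchronized` method stands in for the local transaction, and all class names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

/** Message carrying one transfer; the id lets the consumer detect duplicates. */
final class TransferMessage {
    final String id;
    final long amount;
    TransferMessage(String id, long amount) { this.id = id; this.amount = amount; }
}

/** Upstream application: the balance change and the outgoing message form one local transaction. */
final class UpstreamService {
    long balance;
    final Queue<TransferMessage> outbox = new ArrayDeque<>();

    UpstreamService(long balance) { this.balance = balance; }

    /** Simulates a local transaction: either both writes happen or neither does. */
    synchronized boolean debit(String messageId, long amount) {
        if (balance < amount) return false;
        balance -= amount;
        outbox.add(new TransferMessage(messageId, amount));
        return true;
    }
}

/** Downstream application: consumes messages idempotently so that retries are harmless. */
final class DownstreamService {
    long balance;
    boolean available = true;
    private final Set<String> processed = new HashSet<>();

    /** Returns true when the message is acknowledged; duplicates are acknowledged without effect. */
    boolean consume(TransferMessage message) {
        if (!available) return false;
        if (processed.add(message.id)) balance += message.amount;
        return true;
    }
}

/** Stand-in for the middleware's retry mechanism. */
final class MessageRelay {
    /** One delivery round; unacknowledged messages stay in the outbox for the next round. */
    static int deliverOnce(UpstreamService upstream, DownstreamService downstream) {
        int acknowledged = 0;
        while (!upstream.outbox.isEmpty()) {
            if (!downstream.consume(upstream.outbox.peek())) break;
            upstream.outbox.poll();
            acknowledged++;
        }
        return acknowledged;
    }
}
```

Calling `deliverOnce` repeatedly until the outbox is empty is what brings the two sides to an eventually consistent state; the duplicate check on the consumer side is what makes that retry safe.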
Saga work principle
The saga model splits a distributed transaction into multiple local transactions (similar to Confirm and Cancel in TCC). Each local transaction has its own execution module and compensation module. When any local transaction fails, the corresponding compensation methods are called to undo the transactions that have already executed, which achieves eventual consistency.
Once compensating transactions (C1, C2, …, Cn-1) are defined for the saga (T1, T2, …, Tn), the saga system can make the following guarantee: either the sequence T1, T2, …, Tn (the preferable one) or the sequence T1, T2, …, Tx, Cx, …, C2, C1 will be executed.
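The guarantee can be illustrated with a small executor that performs backward recovery. This is a sketch with hypothetical names (`SagaStep`, `SagaExecutor`); it assumes a failing step throws a `RuntimeException` and that its own local transaction has already undone its partial work.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** One saga step: a local transaction T_i and its compensating transaction C_i. */
final class SagaStep {
    final Runnable action;
    final Runnable compensation;

    SagaStep(Runnable action, Runnable compensation) {
        this.action = action;
        this.compensation = compensation;
    }
}

final class SagaExecutor {
    /**
     * Runs T1..Tn in order. If a step fails, the compensations of the steps that
     * completed are run in reverse order (Cx, ..., C2, C1).
     * Returns true when every step completed, false when the saga was compensated.
     */
    static boolean run(List<SagaStep> steps) {
        Deque<SagaStep> completed = new ArrayDeque<>();
        for (SagaStep step : steps) {
            try {
                step.action.run();
                completed.push(step);
            } catch (RuntimeException ex) {
                while (!completed.isEmpty()) {
                    completed.pop().compensation.run();
                }
                return false;
            }
        }
        return true;
    }
}
```

With three steps where the third one throws, the executor runs T1, T2, then C2, C1, matching the second sequence in the guarantee with x = 2.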
Because the saga model has no prepare stage, transactions cannot be isolated from each other. Problems such as lost updates and dirty reads can therefore occur when multiple transactions operate on the same resource concurrently. These problems can be handled with concurrency control at the business level, such as locking or pre-allocating resources.
The saga model supports both forward recovery and backward recovery. Backward recovery compensates all completed transactions when any transaction fails. In contrast, forward recovery retries the transaction that is currently failing, based on the assumption that all transactions will eventually succeed.
Obviously, forward recovery does not need compensation. If the transactions in the business will eventually succeed, or if a backward compensating transaction is hard or impossible to define, forward recovery is the more suitable choice. However, server crashes, network failures and even datacenter power outages do happen in the real world, so a recovery mechanism for such failures, such as manual intervention, is also needed.
In a word, TCC and MQ both depend on business transformation, while XA, BED and SAGA only involve the database. The latter group is therefore preferable: it is less invasive to the business and has a lower transformation cost.
Distributed Transaction of Sharding-Sphere
ShardingSphere is an open-source ecosystem consisting of a set of distributed database middleware solutions, including 3 independent products: JDBC, Proxy and Sidecar. They all provide data scale-out, distributed transactions and distributed governance, and are applicable in a variety of situations such as homogeneous Java environments, heterogeneous languages, containers and cloud native.
Sharding-Sphere supports both XA and BASE transactions, and allows a different transaction type to be selected per request. Distributed transactions are completely transparent to business operations, which greatly reduces the cost of introducing them.
Sharding-Sphere TM integrated XA and BASE transaction model:
- For XA transactions, SPI is used so that weak XA, Atomikos and Narayana are mutually exclusive.
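Selecting a transaction type per request while keeping one implementation per type can be sketched as below. The names `TxType`, `TxTypeContext` and `TxManagerRegistry` are hypothetical and are not Sharding-Sphere's API; the sketch assumes the selection is held per thread and that managers are identified by a plain name.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical transaction types a request can choose from. */
enum TxType { LOCAL, XA, BASE }

/** Holds the transaction type selected for the current thread, that is, the current request. */
final class TxTypeContext {
    private static final ThreadLocal<TxType> CURRENT = ThreadLocal.withInitial(() -> TxType.LOCAL);

    static void set(TxType type) { CURRENT.set(type); }
    static TxType get() { return CURRENT.get(); }
    static void clear() { CURRENT.remove(); }
}

/** Keeps exactly one manager per type, so registering a second XA manager replaces the first. */
final class TxManagerRegistry {
    private final Map<TxType, String> managers = new EnumMap<>(TxType.class);

    void register(TxType type, String managerName) { managers.put(type, managerName); }

    /** Picks the manager for the type selected on the current thread. */
    String current() {
        TxType type = TxTypeContext.get();
        String name = managers.get(type);
        if (name == null) {
            throw new IllegalStateException("no manager registered for " + type);
        }
        return name;
    }
}
```

A request that needs strong consistency would call `TxTypeContext.set(TxType.XA)` before running its SQL and `TxTypeContext.clear()` afterwards; other requests keep the default.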
The following section explains how transactions are decoupled from the main process in an event-driven way:
As the figure above shows, Sharding-core produces and dispatches various events according to the SQL, and when an event arrives, the transaction listener thread invokes the corresponding transaction processor to handle it.
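The decoupling can be pictured with a tiny event bus. This is only a sketch: `TxEvent`, `TxEventType` and `TxEventBus` are hypothetical names, and dispatch here is synchronous on the caller's thread, whereas the text describes a separate listener thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical kinds of transaction events produced by the SQL engine. */
enum TxEventType { BEGIN, COMMIT, ROLLBACK }

final class TxEvent {
    final TxEventType type;
    final String transactionId;

    TxEvent(TxEventType type, String transactionId) {
        this.type = type;
        this.transactionId = transactionId;
    }
}

/** The core only posts events; transaction processors subscribe and never call back into the core. */
final class TxEventBus {
    private final Map<TxEventType, List<Consumer<TxEvent>>> listeners = new EnumMap<>(TxEventType.class);

    void register(TxEventType type, Consumer<TxEvent> listener) {
        listeners.computeIfAbsent(type, key -> new ArrayList<>()).add(listener);
    }

    /** Dispatches the event to every listener registered for its type. */
    void post(TxEvent event) {
        List<Consumer<TxEvent>> targets = listeners.getOrDefault(event.type, Collections.emptyList());
        for (Consumer<TxEvent> listener : targets) {
            listener.accept(event);
        }
    }
}
```

Because the producer knows only the bus, a different transaction processor can be plugged in by registering another listener, without touching the code that posts the events.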
Transaction Implementation of Sharding-Sphere
Sharding-Proxy is a Netty-based database middle-layer proxy. It implements the MySQL protocol and can be regarded as a database with built-in data sharding ability. Sharding-Proxy has implemented XA transactions based on Atomikos. To ensure that all sub-transactions run in the same thread, the proxy thread model was changed as shown in the following figure.
When transactions are enabled, the SQL engine uses the Channel-Thread pattern on the Proxy backend, and the lifecycle of a channel is the same as that of its transaction thread. Transaction processing is completely decoupled from Proxy: the transaction events produced by Proxy are consumed by the Sharding-Sphere TM.
Pressure test results show that the insert and update performance of XA transactions is linear in the number of databases involved, while query performance does not change noticeably. XA can therefore be used in scenarios with low concurrency where a transaction involves no more than 10 databases.
Principle Analysis of Atomikos Transaction Manager
The Atomikos transaction manager can be embedded into the business process. When the application calls the TransactionManager.begin method, an XA transaction is created and bound to the current thread. Connections from the data source are proxied by Atomikos, so the JDBC operations and transaction information of each connection are intercepted by Atomikos.
The XAResource.start method is executed on createStatement; the close method calls XAResource.end, which puts the XA transaction into the idle state; on commit or rollback, prepare and commit are called in turn to perform the two-phase commit.
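The call order described above can be traced with the standard `javax.transaction.xa` interfaces and a fake resource that only records calls. This is not Atomikos code: `SimpleXid`, `RecordingXaResource` and `XaFlow` are hypothetical, no database is involved, and any vote other than `XA_OK` is treated as a "no" for brevity (a real TM also handles `XA_RDONLY`).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

/** Minimal Xid: one global transaction id shared by all branches plus a branch qualifier. */
final class SimpleXid implements Xid {
    private final byte[] globalId;
    private final byte[] branchId;

    SimpleXid(String globalId, String branchId) {
        this.globalId = globalId.getBytes(StandardCharsets.UTF_8);
        this.branchId = branchId.getBytes(StandardCharsets.UTF_8);
    }

    @Override public int getFormatId() { return 1; }
    @Override public byte[] getGlobalTransactionId() { return globalId.clone(); }
    @Override public byte[] getBranchQualifier() { return branchId.clone(); }
}

/** Fake RM that only records which XAResource methods were called, in order. */
final class RecordingXaResource implements XAResource {
    final List<String> calls = new ArrayList<>();

    @Override public void start(Xid xid, int flags) { calls.add("start"); }
    @Override public void end(Xid xid, int flags) { calls.add("end"); }
    @Override public int prepare(Xid xid) { calls.add("prepare"); return XA_OK; }
    @Override public void commit(Xid xid, boolean onePhase) { calls.add("commit"); }
    @Override public void rollback(Xid xid) { calls.add("rollback"); }
    @Override public void forget(Xid xid) { }
    @Override public Xid[] recover(int flag) { return new Xid[0]; }
    @Override public boolean isSameRM(XAResource other) { return other == this; }
    @Override public int getTransactionTimeout() { return 0; }
    @Override public boolean setTransactionTimeout(int seconds) { return false; }
}

final class XaFlow {
    /** Runs one global transaction over all resources; returns true when it was committed. */
    static boolean run(String globalId, List<? extends XAResource> resources) {
        try {
            List<Xid> xids = new ArrayList<>();
            for (int i = 0; i < resources.size(); i++) {
                Xid xid = new SimpleXid(globalId, "branch-" + i);
                xids.add(xid);
                resources.get(i).start(xid, XAResource.TMNOFLAGS);  // issued on createStatement
                // ... the SQL of this branch would run here ...
                resources.get(i).end(xid, XAResource.TMSUCCESS);    // issued on close
            }
            boolean allOk = true;
            for (int i = 0; i < resources.size(); i++) {
                if (resources.get(i).prepare(xids.get(i)) != XAResource.XA_OK) allOk = false;
            }
            for (int i = 0; i < resources.size(); i++) {
                if (allOk) {
                    resources.get(i).commit(xids.get(i), false);    // false = two-phase commit
                } else {
                    resources.get(i).rollback(xids.get(i));
                }
            }
            return allOk;
        } catch (XAException ex) {
            throw new IllegalStateException("XA call failed", ex);
        }
    }
}
```

Each fake resource ends up with the call sequence start, end, prepare, commit, which mirrors the order described for a committed transaction.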
Saga Transaction Implementation of Sharding-Sphere
In cooperation with Apache Service Comb, Sharding-Sphere will use the Saga engine of Service Comb to implement distributed transactions.
Apache Service Comb is an open source microservice framework initiated by Huawei. Its distributed transaction framework for microservices can be divided into centralized and distributed coordinators. Sharding-Sphere will integrate the centralized Saga coordinator, and will support distributed transactions for services in the same thread in the future.
Centralized Transaction Coordinator of Service Comb
The centralized transaction coordinator covers receiving, analyzing and executing Saga requests, and querying their results. The task agent module needs to know the predefined invocation graph of the Saga; the execution module generates invocation tasks according to that graph and invokes the microservice interfaces. When a service fails, the compensation method is invoked.
The Saga execution module builds an invocation graph by analyzing the JSON data of the request. In Sharding-Sphere, JSON describes whether the sub-transactions of a Saga are invoked sequentially or concurrently. The execution module turns the invocation graph into multiple execution tasks, and the execution task consumer generates the corresponding invocations; both sequential and concurrent invocation are supported. According to the execution state, the Saga task records key transaction events in logs, which can be viewed through the event query engine.
Embedded Saga Transaction Manager of Sharding-Sphere
Saga provides distributed transaction service governance in form of jar.
For Sharding-Sphere, the confirm and cancel procedures correspond to the normal SQL and the reverse SQL of a sub-transaction. Automatic generation of the reverse SQL is planned. After the Saga BASE transaction is enabled, the routed physical data sources turn on auto commit, so every confirm and cancel is committed directly.
Inside Sharding-Sphere, multiple Saga transaction events are generated when the SQL engine is triggered, and the confirm and cancel of the current sub-transaction are registered in the queue of the Saga transaction manager. When a commit or rollback is triggered, the Saga transaction manager executes the confirm or cancel procedure according to the execution result of each sub-transaction.
Plans in the future
The Sharding-Sphere-TM framework will evolve along the lines introduced above. The main parts are as follows:
- Weak XA transaction (released)
- Atomikos based XA transaction(to be released)
- BED BASE transaction(released)
- SAGA(working on)
- TCC(planned)
If the discussion above is too long, the following table gathers it all in one place.
In the future, existing features will be optimized continuously, and new features that everyone cares about, such as BASE transactions and data governance, will be launched one after another. Any thoughts, advice, suggestions and comments are appreciated, and you are welcome to join the open source community of Sharding-Sphere.
Q1: Can XA-based transactions be used in a microservice architecture?
A1: Currently the transaction manager is embedded in the JVM process, so it is fine for low-concurrency, short transactions.
Q2: Why are the transaction frameworks being developed in this order?
A2: The order is based on the degree of difficulty, which is why TCC comes last.
Q3: Does Sharding-Sphere support multiple languages, such as Golang?
A3: For non-Java applications, Sharding-Proxy is available.
Q4: Is it Proxy that implements distributed transactions this time? I remember that Sharding-JDBC had already implemented them.
A4: The transaction implementation of SS covers both Sharding-JDBC and Proxy. SJ has implemented weak XA and BED; SAGA and TCC will be introduced in the future.
Q5: If I only want to use the transaction module of SS, is that possible?
A5: The architecture of SS is event-driven, and in the future the transaction module will be responsible only for transactions.
Q6: SAGA doesn’t support the I of ACID. What do you think of this?
A6: Isolation is not supported for now, and we plan to support it. In fact, none of the BASE transaction models support isolation, although the Try stage of TCC can be regarded as a form of isolation. Concurrency control at the business level can avoid dirty reads.
Q7: In version 3.0, does the transaction module work independently?
A7: In version 3.0, the transaction module depends on the Sharding-JDBC module and executes transactions when it receives events from Sharding-JDBC and Proxy. If you want to use the transaction module independently, you need to handle the events according to the definitions in the core module.