View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.backend.handler.transaction;
19  
20  import lombok.RequiredArgsConstructor;
21  import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
22  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
23  import org.apache.shardingsphere.infra.hint.HintValueContext;
24  import org.apache.shardingsphere.infra.session.query.QueryContext;
25  import org.apache.shardingsphere.proxy.backend.connector.DatabaseConnector;
26  import org.apache.shardingsphere.proxy.backend.connector.DatabaseConnectorFactory;
27  import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
28  import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
29  import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
30  import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
31  import org.apache.shardingsphere.proxy.backend.util.TransactionUtils;
32  import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.xa.XABeginStatement;
33  import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.xa.XACommitStatement;
34  import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.xa.XARecoveryStatement;
35  import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.xa.XARollbackStatement;
36  import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.xa.XAStatement;
37  import org.apache.shardingsphere.transaction.api.TransactionType;
38  import org.apache.shardingsphere.transaction.xa.jta.exception.XATransactionNestedBeginException;
39  
40  import java.sql.SQLException;
41  import java.util.Collections;
42  
43  /**
44   * XA transaction handler.
45   */
46  // TODO Currently XA transaction started with `XA START` doesn't support for database with multiple datasource, a flag should be added for this both in init progress and add datasource from DistSQL.
47  @RequiredArgsConstructor
48  public final class TransactionXAHandler implements ProxyBackendHandler {
49      
50      private final XAStatement xaStatement;
51      
52      private final ConnectionSession connectionSession;
53      
54      private final DatabaseConnector backendHandler;
55      
56      public TransactionXAHandler(final SQLStatementContext sqlStatementContext, final String sql, final ConnectionSession connectionSession) {
57          xaStatement = (XAStatement) sqlStatementContext.getSqlStatement();
58          this.connectionSession = connectionSession;
59          backendHandler = DatabaseConnectorFactory.getInstance().newInstance(
60                  new QueryContext(sqlStatementContext, sql, Collections.emptyList(), new HintValueContext()), connectionSession.getDatabaseConnectionManager(), false);
61      }
62      
63      @Override
64      public boolean next() throws SQLException {
65          return xaStatement instanceof XARecoveryStatement && backendHandler.next();
66      }
67      
68      @Override
69      public QueryResponseRow getRowData() throws SQLException {
70          return xaStatement instanceof XARecoveryStatement ? backendHandler.getRowData() : new QueryResponseRow(Collections.emptyList());
71      }
72      
73      @Override
74      public ResponseHeader execute() throws SQLException {
75          if (xaStatement instanceof XABeginStatement) {
76              return begin();
77          }
78          if (xaStatement instanceof XACommitStatement || xaStatement instanceof XARollbackStatement) {
79              return finish();
80          }
81          return backendHandler.execute();
82      }
83      
84      /*
85       * We have to let session occupy the thread when doing xa transaction. According to https://dev.mysql.com/doc/refman/5.7/en/xa-states.html XA and local transactions are mutually exclusive.
86       */
87      private ResponseHeader begin() throws SQLException {
88          ShardingSpherePreconditions.checkState(!connectionSession.getTransactionStatus().isInTransaction(), XATransactionNestedBeginException::new);
89          ResponseHeader result = backendHandler.execute();
90          TransactionType transactionType = TransactionUtils.getTransactionType(connectionSession.getConnectionContext().getTransactionContext());
91          connectionSession.getConnectionContext().getTransactionContext().beginTransaction(String.valueOf(transactionType));
92          return result;
93      }
94      
95      private ResponseHeader finish() throws SQLException {
96          try {
97              return backendHandler.execute();
98          } finally {
99              connectionSession.getConnectionContext().clearTransactionContext();
100             connectionSession.getConnectionContext().clearCursorContext();
101         }
102     }
103 }