View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.driver.jdbc.adapter;
19  
20  import lombok.AccessLevel;
21  import lombok.Getter;
22  import org.apache.shardingsphere.driver.executor.DriverExecutor;
23  import org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
24  import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
25  import org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
26  import org.apache.shardingsphere.driver.jdbc.unsupported.AbstractUnsupportedOperationStatement;
27  import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
28  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
29  import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
30  import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
31  import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
32  import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.DMLStatement;
33  import org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
34  import org.apache.shardingsphere.transaction.api.TransactionType;
35  import org.apache.shardingsphere.transaction.rule.TransactionRule;
36  
37  import java.sql.SQLException;
38  import java.sql.SQLWarning;
39  import java.sql.Statement;
40  import java.util.Collection;
41  
42  /**
43   * Adapter for {@code Statement}.
44   */
45  @Getter
46  public abstract class AbstractStatementAdapter extends AbstractUnsupportedOperationStatement {
47      
48      @Getter(AccessLevel.NONE)
49      private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new ForceExecuteTemplate<>();
50      
51      private boolean poolable;
52      
53      private int fetchSize;
54      
55      private int fetchDirection;
56      
57      private boolean closed;
58      
59      protected final boolean isNeedImplicitCommitTransaction(final ShardingSphereConnection connection, final SQLStatement sqlStatement, final boolean multiExecutionUnits) {
60          if (!connection.getAutoCommit()) {
61              return false;
62          }
63          TransactionType transactionType = connection.getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType();
64          boolean isInTransaction = connection.getDatabaseConnectionManager().getConnectionContext().getTransactionContext().isInTransaction();
65          if (!TransactionType.isDistributedTransaction(transactionType) || isInTransaction) {
66              return false;
67          }
68          return isWriteDMLStatement(sqlStatement) && multiExecutionUnits;
69      }
70      
71      private boolean isWriteDMLStatement(final SQLStatement sqlStatement) {
72          return sqlStatement instanceof DMLStatement && !(sqlStatement instanceof SelectStatement);
73      }
74      
75      protected final void handleExceptionInTransaction(final ShardingSphereConnection connection, final MetaDataContexts metaDataContexts) {
76          if (connection.getDatabaseConnectionManager().getConnectionTransaction().isInTransaction()) {
77              DatabaseType databaseType = metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType();
78              DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData();
79              if (dialectDatabaseMetaData.getDefaultSchema().isPresent()) {
80                  connection.getDatabaseConnectionManager().getConnectionContext().getTransactionContext().setExceptionOccur(true);
81              }
82          }
83      }
84      
85      protected abstract boolean isAccumulate();
86      
87      protected abstract Collection<? extends Statement> getRoutedStatements();
88      
89      protected abstract DriverExecutor getExecutor();
90      
91      protected abstract StatementManager getStatementManager();
92      
93      @SuppressWarnings({"unchecked", "rawtypes"})
94      @Override
95      public final void setPoolable(final boolean poolable) throws SQLException {
96          this.poolable = poolable;
97          getMethodInvocationRecorder().record("setPoolable", statement -> statement.setPoolable(poolable));
98          forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setPoolable(poolable));
99      }
100     
101     @SuppressWarnings({"unchecked", "rawtypes"})
102     @Override
103     public final void setFetchSize(final int rows) throws SQLException {
104         fetchSize = rows;
105         getMethodInvocationRecorder().record("setFetchSize", statement -> statement.setFetchSize(rows));
106         forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setFetchSize(rows));
107     }
108     
109     @SuppressWarnings({"unchecked", "rawtypes"})
110     @Override
111     public final void setFetchDirection(final int direction) throws SQLException {
112         fetchDirection = direction;
113         getMethodInvocationRecorder().record("setFetchDirection", statement -> statement.setFetchDirection(direction));
114         forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setFetchDirection(direction));
115     }
116     
117     @Override
118     public final int getMaxFieldSize() throws SQLException {
119         return getRoutedStatements().isEmpty() ? 0 : getRoutedStatements().iterator().next().getMaxFieldSize();
120     }
121     
122     @SuppressWarnings({"unchecked", "rawtypes"})
123     @Override
124     public final void setMaxFieldSize(final int max) throws SQLException {
125         getMethodInvocationRecorder().record("setMaxFieldSize", statement -> statement.setMaxFieldSize(max));
126         forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setMaxFieldSize(max));
127     }
128     
129     // TODO Confirm MaxRows for multiple databases is need special handle. eg: 10 statements maybe MaxRows / 10
130     @Override
131     public final int getMaxRows() throws SQLException {
132         return getRoutedStatements().isEmpty() ? -1 : getRoutedStatements().iterator().next().getMaxRows();
133     }
134     
135     @SuppressWarnings({"unchecked", "rawtypes"})
136     @Override
137     public final void setMaxRows(final int max) throws SQLException {
138         getMethodInvocationRecorder().record("setMaxRows", statement -> statement.setMaxRows(max));
139         forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setMaxRows(max));
140     }
141     
142     @Override
143     public final int getQueryTimeout() throws SQLException {
144         return getRoutedStatements().isEmpty() ? 0 : getRoutedStatements().iterator().next().getQueryTimeout();
145     }
146     
147     @SuppressWarnings({"unchecked", "rawtypes"})
148     @Override
149     public final void setQueryTimeout(final int seconds) throws SQLException {
150         getMethodInvocationRecorder().record("setQueryTimeout", statement -> statement.setQueryTimeout(seconds));
151         forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setQueryTimeout(seconds));
152     }
153     
154     @SuppressWarnings({"unchecked", "rawtypes"})
155     @Override
156     public final void setEscapeProcessing(final boolean enable) throws SQLException {
157         getMethodInvocationRecorder().record("setEscapeProcessing", statement -> statement.setEscapeProcessing(enable));
158         forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setEscapeProcessing(enable));
159     }
160     
161     @Override
162     public final int getUpdateCount() throws SQLException {
163         if (isAccumulate()) {
164             return accumulate();
165         }
166         Collection<? extends Statement> statements = getRoutedStatements();
167         if (statements.isEmpty()) {
168             return -1;
169         }
170         return getRoutedStatements().iterator().next().getUpdateCount();
171     }
172     
173     private int accumulate() throws SQLException {
174         long result = 0;
175         boolean hasResult = false;
176         for (Statement each : getRoutedStatements()) {
177             int updateCount = each.getUpdateCount();
178             if (updateCount > -1) {
179                 hasResult = true;
180             }
181             result += updateCount;
182         }
183         if (result > Integer.MAX_VALUE) {
184             result = Integer.MAX_VALUE;
185         }
186         return hasResult ? (int) result : -1;
187     }
188     
189     @Override
190     public final boolean getMoreResults() throws SQLException {
191         boolean result = false;
192         for (Statement each : getRoutedStatements()) {
193             result = each.getMoreResults();
194         }
195         return result;
196     }
197     
198     @Override
199     public final boolean getMoreResults(final int current) {
200         return false;
201     }
202     
203     @Override
204     public final SQLWarning getWarnings() {
205         return null;
206     }
207     
208     @Override
209     public final void clearWarnings() {
210     }
211     
212     @SuppressWarnings({"unchecked", "rawtypes"})
213     @Override
214     public final void cancel() throws SQLException {
215         forceExecuteTemplate.execute((Collection) getRoutedStatements(), Statement::cancel);
216     }
217     
218     @SuppressWarnings({"unchecked", "rawtypes"})
219     @Override
220     public final void close() throws SQLException {
221         closed = true;
222         try {
223             forceExecuteTemplate.execute((Collection) getRoutedStatements(), Statement::close);
224             if (null != getExecutor()) {
225                 getExecutor().close();
226             }
227             if (null != getStatementManager()) {
228                 getStatementManager().close();
229             }
230         } finally {
231             getRoutedStatements().clear();
232         }
233     }
234 }