View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.driver.jdbc.adapter;
19  
20  import lombok.AccessLevel;
21  import lombok.Getter;
22  import org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
23  import org.apache.shardingsphere.driver.jdbc.core.connection.ShardingSphereConnection;
24  import org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
25  import org.apache.shardingsphere.infra.database.core.metadata.database.metadata.DialectDatabaseMetaData;
26  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
27  import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
28  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
29  import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
30  import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
31  import org.apache.shardingsphere.transaction.util.AutoCommitUtils;
32  
33  import java.sql.Connection;
34  import java.sql.SQLException;
35  import java.sql.SQLFeatureNotSupportedException;
36  import java.sql.SQLWarning;
37  import java.sql.Statement;
38  import java.util.Collection;
39  
40  /**
41   * Adapter for {@code Statement}.
42   */
43  @Getter
44  public abstract class AbstractStatementAdapter extends WrapperAdapter implements Statement {
45      
46      @Getter(AccessLevel.NONE)
47      private final ForceExecuteTemplate<Statement> forceExecuteTemplate = new ForceExecuteTemplate<>();
48      
49      private boolean poolable;
50      
51      private int fetchSize;
52      
53      private int fetchDirection;
54      
55      private boolean closeOnCompletion;
56      
57      private boolean closed;
58      
59      protected final void handleAutoCommitBeforeExecution(final SQLStatement sqlStatement, final ShardingSphereConnection connection) throws SQLException {
60          if (AutoCommitUtils.needOpenTransaction(sqlStatement)) {
61              connection.beginTransactionIfNeededWhenAutoCommitFalse();
62          }
63      }
64      
65      protected final void handleAutoCommitAfterExecution(final ShardingSphereConnection connection) throws SQLException {
66          if (connection.getAutoCommit()) {
67              connection.getDatabaseConnectionManager().clearCachedConnections();
68          }
69      }
70      
71      protected final void handleExceptionInTransaction(final ShardingSphereConnection connection, final ShardingSphereMetaData metaData) {
72          if (connection.getDatabaseConnectionManager().getConnectionContext().getTransactionContext().isInTransaction()) {
73              DatabaseType databaseType = metaData.getDatabase(connection.getCurrentDatabaseName()).getProtocolType();
74              DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData();
75              if (dialectDatabaseMetaData.getSchemaOption().getDefaultSchema().isPresent()) {
76                  connection.getDatabaseConnectionManager().getConnectionContext().getTransactionContext().setExceptionOccur(true);
77              }
78          }
79      }
80      
81      protected abstract boolean isAccumulate();
82      
83      protected abstract Collection<? extends Statement> getRoutedStatements();
84      
85      protected abstract StatementManager getStatementManager();
86      
87      @SuppressWarnings({"unchecked", "rawtypes"})
88      @Override
89      public final void setPoolable(final boolean poolable) throws SQLException {
90          this.poolable = poolable;
91          getMethodInvocationRecorder().record("setPoolable", statement -> statement.setPoolable(poolable));
92          forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setPoolable(poolable));
93      }
94      
95      @SuppressWarnings({"unchecked", "rawtypes"})
96      @Override
97      public final void setFetchSize(final int rows) throws SQLException {
98          fetchSize = rows;
99          getMethodInvocationRecorder().record("setFetchSize", statement -> statement.setFetchSize(rows));
100         forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setFetchSize(rows));
101     }
102     
103     @SuppressWarnings({"unchecked", "rawtypes"})
104     @Override
105     public final void setFetchDirection(final int direction) throws SQLException {
106         fetchDirection = direction;
107         getMethodInvocationRecorder().record("setFetchDirection", statement -> statement.setFetchDirection(direction));
108         forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setFetchDirection(direction));
109     }
110     
111     @Override
112     public final int getMaxFieldSize() throws SQLException {
113         return getRoutedStatements().isEmpty() ? 0 : getRoutedStatements().iterator().next().getMaxFieldSize();
114     }
115     
116     @SuppressWarnings({"unchecked", "rawtypes"})
117     @Override
118     public final void setMaxFieldSize(final int max) throws SQLException {
119         getMethodInvocationRecorder().record("setMaxFieldSize", statement -> statement.setMaxFieldSize(max));
120         forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setMaxFieldSize(max));
121     }
122     
123     // TODO Confirm MaxRows for multiple databases is need special handle. eg: 10 statements maybe MaxRows / 10
124     @Override
125     public final int getMaxRows() throws SQLException {
126         return getRoutedStatements().isEmpty() ? -1 : getRoutedStatements().iterator().next().getMaxRows();
127     }
128     
129     @SuppressWarnings({"unchecked", "rawtypes"})
130     @Override
131     public final void setMaxRows(final int max) throws SQLException {
132         getMethodInvocationRecorder().record("setMaxRows", statement -> statement.setMaxRows(max));
133         forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setMaxRows(max));
134     }
135     
136     @Override
137     public final int getQueryTimeout() throws SQLException {
138         return getRoutedStatements().isEmpty() ? 0 : getRoutedStatements().iterator().next().getQueryTimeout();
139     }
140     
141     @SuppressWarnings({"unchecked", "rawtypes"})
142     @Override
143     public final void setQueryTimeout(final int seconds) throws SQLException {
144         getMethodInvocationRecorder().record("setQueryTimeout", statement -> statement.setQueryTimeout(seconds));
145         forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setQueryTimeout(seconds));
146     }
147     
148     @SuppressWarnings({"unchecked", "rawtypes"})
149     @Override
150     public final void setEscapeProcessing(final boolean enable) throws SQLException {
151         getMethodInvocationRecorder().record("setEscapeProcessing", statement -> statement.setEscapeProcessing(enable));
152         forceExecuteTemplate.execute((Collection) getRoutedStatements(), statement -> statement.setEscapeProcessing(enable));
153     }
154     
155     @Override
156     public final int getUpdateCount() throws SQLException {
157         if (isAccumulate()) {
158             return accumulate();
159         }
160         Collection<? extends Statement> statements = getRoutedStatements();
161         if (statements.isEmpty()) {
162             return -1;
163         }
164         return getRoutedStatements().iterator().next().getUpdateCount();
165     }
166     
167     private int accumulate() throws SQLException {
168         long result = 0L;
169         boolean hasResult = false;
170         for (Statement each : getRoutedStatements()) {
171             int updateCount = each.getUpdateCount();
172             if (updateCount > -1) {
173                 hasResult = true;
174             }
175             result += updateCount;
176         }
177         if (result > Integer.MAX_VALUE) {
178             result = Integer.MAX_VALUE;
179         }
180         return hasResult ? (int) result : -1;
181     }
182     
183     @Override
184     public final boolean getMoreResults() throws SQLException {
185         boolean result = false;
186         for (Statement each : getRoutedStatements()) {
187             result = each.getMoreResults();
188         }
189         return result;
190     }
191     
192     @Override
193     public final boolean getMoreResults(final int current) {
194         return false;
195     }
196     
197     @Override
198     public final boolean isCloseOnCompletion() {
199         return closeOnCompletion;
200     }
201     
202     @Override
203     public final void closeOnCompletion() {
204         closeOnCompletion = true;
205     }
206     
207     @Override
208     public final void setCursorName(final String name) throws SQLException {
209         ShardingSpherePreconditions.checkState(1 == getRoutedStatements().size(), () -> new SQLFeatureNotSupportedException("setCursorName"));
210         getRoutedStatements().iterator().next().setCursorName(name);
211     }
212     
213     @SuppressWarnings({"unchecked", "rawtypes"})
214     @Override
215     public final void cancel() throws SQLException {
216         forceExecuteTemplate.execute((Collection) getRoutedStatements(), Statement::cancel);
217     }
218     
219     @Override
220     public final SQLWarning getWarnings() {
221         return null;
222     }
223     
224     @Override
225     public final void clearWarnings() {
226     }
227     
228     @SuppressWarnings({"unchecked", "rawtypes"})
229     @Override
230     public final void close() throws SQLException {
231         closed = true;
232         try {
233             forceExecuteTemplate.execute((Collection) getRoutedStatements(), Statement::close);
234             closeExecutor();
235             if (null != getStatementManager()) {
236                 getStatementManager().close();
237                 Connection connection = getConnection();
238                 if (connection instanceof ShardingSphereConnection) {
239                     ShardingSphereConnection logicalConnection = (ShardingSphereConnection) connection;
240                     logicalConnection.getStatementManagers().remove(getStatementManager());
241                     handleAutoCommitAfterExecution(logicalConnection);
242                 }
243             }
244         } finally {
245             getRoutedStatements().clear();
246         }
247     }
248     
249     protected abstract void closeExecutor() throws SQLException;
250 }