View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.driver.jdbc.core.connection;
19  
20  import lombok.Getter;
21  import org.apache.shardingsphere.driver.exception.ConnectionClosedException;
22  import org.apache.shardingsphere.driver.jdbc.adapter.AbstractConnectionAdapter;
23  import org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
24  import org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSphereDatabaseMetaData;
25  import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement;
26  import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
27  import org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
28  import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
29  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
30  import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
31  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
32  import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
33  import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext;
34  import org.apache.shardingsphere.mode.manager.ContextManager;
35  import org.apache.shardingsphere.transaction.ConnectionTransaction.DistributedTransactionOperationType;
36  import org.apache.shardingsphere.transaction.rule.TransactionRule;
37  
38  import java.sql.DatabaseMetaData;
39  import java.sql.PreparedStatement;
40  import java.sql.SQLException;
41  import java.sql.SQLFeatureNotSupportedException;
42  import java.sql.Savepoint;
43  import java.sql.Statement;
44  import java.util.Collection;
45  import java.util.Optional;
46  import java.util.concurrent.ConcurrentLinkedQueue;
47  import java.util.concurrent.Executor;
48  
49  /**
50   * ShardingSphere connection.
51   */
52  @HighFrequencyInvocation
53  public final class ShardingSphereConnection extends AbstractConnectionAdapter {
54      
55      private final ProcessEngine processEngine = new ProcessEngine();
56      
57      private final ForceExecuteTemplate<StatementManager> forceExecuteTemplate = new ForceExecuteTemplate<>();
58      
59      @Getter
60      private final String currentDatabaseName;
61      
62      @Getter
63      private final ContextManager contextManager;
64      
65      @Getter
66      private final DriverDatabaseConnectionManager databaseConnectionManager;
67      
68      @Getter
69      private final Collection<StatementManager> statementManagers = new ConcurrentLinkedQueue<>();
70      
71      @Getter
72      private final String processId;
73      
74      private boolean autoCommit = true;
75      
76      private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
77      
78      private boolean readOnly;
79      
80      private volatile boolean closed;
81      
82      public ShardingSphereConnection(final String currentDatabaseName, final ContextManager contextManager) {
83          this.currentDatabaseName = currentDatabaseName;
84          this.contextManager = contextManager;
85          databaseConnectionManager = new DriverDatabaseConnectionManager(currentDatabaseName, contextManager);
86          processId = processEngine.connect(currentDatabaseName);
87      }
88      
89      /**
90       * Begin transaction if needed when auto commit is false.
91       *
92       * @throws SQLException SQL exception
93       */
94      public void beginTransactionIfNeededWhenAutoCommitFalse() throws SQLException {
95          if (autoCommit || databaseConnectionManager.getConnectionContext().getTransactionContext().isInTransaction()) {
96              return;
97          }
98          databaseConnectionManager.begin();
99      }
100     
101     @Override
102     public DatabaseMetaData getMetaData() throws SQLException {
103         return new ShardingSphereDatabaseMetaData(this);
104     }
105     
106     @Override
107     public PreparedStatement prepareStatement(final String sql) throws SQLException {
108         return new ShardingSpherePreparedStatement(this, sql);
109     }
110     
111     @Override
112     public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
113         return new ShardingSpherePreparedStatement(this, sql, resultSetType, resultSetConcurrency);
114     }
115     
116     @Override
117     public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
118         return new ShardingSpherePreparedStatement(this, sql, resultSetType, resultSetConcurrency, resultSetHoldability);
119     }
120     
121     @Override
122     public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
123         return new ShardingSpherePreparedStatement(this, sql, autoGeneratedKeys);
124     }
125     
126     @Override
127     public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException {
128         return new ShardingSpherePreparedStatement(this, sql, Statement.RETURN_GENERATED_KEYS);
129     }
130     
131     @Override
132     public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException {
133         return new ShardingSpherePreparedStatement(this, sql, columnNames);
134     }
135     
136     @Override
137     public Statement createStatement() {
138         return new ShardingSphereStatement(this);
139     }
140     
141     @Override
142     public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {
143         return new ShardingSphereStatement(this, resultSetType, resultSetConcurrency);
144     }
145     
146     @Override
147     public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
148         return new ShardingSphereStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability);
149     }
150     
151     @Override
152     public boolean getAutoCommit() {
153         return autoCommit;
154     }
155     
156     @Override
157     public void setAutoCommit(final boolean autoCommit) throws SQLException {
158         this.autoCommit = autoCommit;
159         if (databaseConnectionManager.getConnectionTransaction().isLocalTransaction()) {
160             processLocalTransaction();
161         } else {
162             processDistributedTransaction();
163         }
164     }
165     
166     private void processLocalTransaction() throws SQLException {
167         databaseConnectionManager.setAutoCommit(autoCommit);
168         TransactionConnectionContext transactionContext = databaseConnectionManager.getConnectionContext().getTransactionContext();
169         if (autoCommit && transactionContext.isInTransaction()) {
170             transactionContext.close();
171             return;
172         }
173         if (!autoCommit && !transactionContext.isInTransaction()) {
174             transactionContext.beginTransaction(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType().name(),
175                     databaseConnectionManager.getConnectionTransaction().getDistributedTransactionManager());
176         }
177     }
178     
179     private void processDistributedTransaction() throws SQLException {
180         Optional<DistributedTransactionOperationType> operationType = databaseConnectionManager.getConnectionTransaction().getDistributedTransactionOperationType(autoCommit);
181         if (!operationType.isPresent()) {
182             return;
183         }
184         if (DistributedTransactionOperationType.BEGIN == operationType.get()) {
185             databaseConnectionManager.begin();
186         } else {
187             databaseConnectionManager.commit();
188         }
189     }
190     
191     @Override
192     public void commit() throws SQLException {
193         databaseConnectionManager.commit();
194     }
195     
196     @Override
197     public void rollback() throws SQLException {
198         databaseConnectionManager.rollback();
199     }
200     
201     @Override
202     public void rollback(final Savepoint savepoint) throws SQLException {
203         checkClose();
204         ShardingSpherePreconditions.checkState(databaseConnectionManager.getConnectionTransaction().isHoldTransaction(autoCommit) || !isSchemaSupportedDatabaseType(),
205                 () -> new SQLFeatureNotSupportedException("ROLLBACK TO SAVEPOINT can only be used in transaction blocks"));
206         databaseConnectionManager.rollback(savepoint);
207     }
208     
209     @Override
210     public Savepoint setSavepoint(final String name) throws SQLException {
211         checkClose();
212         ShardingSpherePreconditions.checkState(databaseConnectionManager.getConnectionTransaction().isHoldTransaction(autoCommit) || !isSchemaSupportedDatabaseType(),
213                 () -> new SQLFeatureNotSupportedException("Savepoint can only be used in transaction blocks"));
214         return databaseConnectionManager.setSavepoint(name);
215     }
216     
217     @Override
218     public Savepoint setSavepoint() throws SQLException {
219         checkClose();
220         ShardingSpherePreconditions.checkState(databaseConnectionManager.getConnectionTransaction().isHoldTransaction(autoCommit) || !isSchemaSupportedDatabaseType(),
221                 () -> new SQLFeatureNotSupportedException("Savepoint can only be used in transaction blocks"));
222         return databaseConnectionManager.setSavepoint();
223     }
224     
225     @Override
226     public void releaseSavepoint(final Savepoint savepoint) throws SQLException {
227         checkClose();
228         ShardingSpherePreconditions.checkState(databaseConnectionManager.getConnectionTransaction().isHoldTransaction(autoCommit) || !isSchemaSupportedDatabaseType(),
229                 () -> new SQLFeatureNotSupportedException("RELEASE SAVEPOINT can only be used in transaction blocks"));
230         if (databaseConnectionManager.getConnectionTransaction().isHoldTransaction(autoCommit)) {
231             databaseConnectionManager.releaseSavepoint(savepoint);
232         }
233     }
234     
235     private void checkClose() throws SQLException {
236         ShardingSpherePreconditions.checkState(!isClosed(), () -> new ConnectionClosedException().toSQLException());
237     }
238     
239     private boolean isSchemaSupportedDatabaseType() {
240         DatabaseType databaseType = contextManager.getMetaDataContexts().getMetaData().getDatabase(currentDatabaseName).getProtocolType();
241         return new DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().getSchemaOption().getDefaultSchema().isPresent();
242     }
243     
244     @SuppressWarnings("MagicConstant")
245     @Override
246     public int getTransactionIsolation() throws SQLException {
247         return databaseConnectionManager.getTransactionIsolation().orElseGet(() -> transactionIsolation);
248     }
249     
250     @Override
251     public void setTransactionIsolation(final int level) throws SQLException {
252         transactionIsolation = level;
253         databaseConnectionManager.setTransactionIsolation(level);
254     }
255     
256     @Override
257     public boolean isReadOnly() {
258         return readOnly;
259     }
260     
261     @Override
262     public void setReadOnly(final boolean readOnly) throws SQLException {
263         this.readOnly = readOnly;
264         databaseConnectionManager.setReadOnly(readOnly);
265     }
266     
267     /*
268      * This is just to avoid the Warning in <a href="https://github.com/brettwooldridge/HikariCP/issues/2196">brettwooldridge/HikariCP#2196</a>. ShardingSphere does not propagate this property to the
269      * real JDBC Driver. `0` is actually the default value of {@link java.net.Socket#getSoTimeout()}.
270      */
271     @Override
272     public int getNetworkTimeout() {
273         return 0;
274     }
275     
276     /*
277      * This is just to avoid the Warning in <a href="https://github.com/brettwooldridge/HikariCP/issues/2196">brettwooldridge/HikariCP#2196</a>. ShardingSphere does not propagate this property to the
278      * real JDBC Driver.
279      */
280     @Override
281     public void setNetworkTimeout(final Executor executor, final int milliseconds) throws SQLException {
282         ShardingSpherePreconditions.checkState(0 <= milliseconds, () -> new SQLException("Network timeout must be a value greater than or equal to 0."));
283     }
284     
285     @Override
286     public boolean isValid(final int timeout) throws SQLException {
287         return databaseConnectionManager.isValid(timeout);
288     }
289     
290     @Override
291     public String getSchema() {
292         return currentDatabaseName;
293     }
294     
295     @Override
296     public boolean isClosed() {
297         return closed;
298     }
299     
300     @Override
301     public void close() throws SQLException {
302         if (databaseConnectionManager.getConnectionTransaction().isInDistributedTransaction(databaseConnectionManager.getConnectionContext().getTransactionContext())) {
303             databaseConnectionManager.getConnectionTransaction().rollback();
304         }
305         closed = true;
306         processEngine.disconnect(processId);
307         try {
308             forceExecuteTemplate.execute(statementManagers, StatementManager::close);
309         } finally {
310             statementManagers.clear();
311             databaseConnectionManager.close();
312         }
313     }
314 }