View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.backend.connector;
19  
20  import com.google.common.base.Preconditions;
21  import com.google.common.collect.LinkedHashMultimap;
22  import com.google.common.collect.Multimap;
23  import lombok.Getter;
24  import lombok.RequiredArgsConstructor;
25  import lombok.extern.slf4j.Slf4j;
26  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
27  import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
28  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DatabaseConnectionManager;
29  import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
30  import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
31  import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ConnectionPostProcessor;
32  import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ConnectionResourceLock;
33  import org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction.BackendTransactionManager;
34  import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
35  import org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
36  import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
37  import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
38  import org.apache.shardingsphere.proxy.backend.util.TransactionUtils;
39  import org.apache.shardingsphere.transaction.spi.TransactionHook;
40  
41  import java.sql.Connection;
42  import java.sql.SQLException;
43  import java.sql.Statement;
44  import java.util.ArrayList;
45  import java.util.Collection;
46  import java.util.Collections;
47  import java.util.LinkedList;
48  import java.util.List;
49  import java.util.Map;
50  import java.util.Map.Entry;
51  import java.util.concurrent.ConcurrentHashMap;
52  import java.util.concurrent.atomic.AtomicBoolean;
53  
54  /**
55   * Database connection manager of ShardingSphere-Proxy.
56   */
57  @Slf4j
58  @RequiredArgsConstructor
59  @Getter
60  public final class ProxyDatabaseConnectionManager implements DatabaseConnectionManager<Connection> {
61      
62      private final ConnectionSession connectionSession;
63      
64      private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
65      
66      private final Collection<ProxyBackendHandler> backendHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
67      
68      private final Collection<ProxyBackendHandler> inUseBackendHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
69      
70      private final Collection<ConnectionPostProcessor> connectionPostProcessors = new LinkedList<>();
71      
72      private final ConnectionResourceLock connectionResourceLock = new ConnectionResourceLock();
73      
74      private final AtomicBoolean closed = new AtomicBoolean(false);
75      
76      private final Object closeLock = new Object();
77      
78      @SuppressWarnings("rawtypes")
79      private final Map<ShardingSphereRule, TransactionHook> transactionHooks = OrderedSPILoader.getServices(
80              TransactionHook.class, ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules());
81      
82      @Override
83      public List<Connection> getConnections(final String databaseName, final String dataSourceName, final int connectionOffset, final int connectionSize,
84                                             final ConnectionMode connectionMode) throws SQLException {
85          Preconditions.checkNotNull(databaseName, "Current database name is null.");
86          Collection<Connection> connections;
87          String cacheKey = getKey(databaseName, dataSourceName);
88          synchronized (cachedConnections) {
89              connections = cachedConnections.get(cacheKey);
90          }
91          List<Connection> result;
92          int maxConnectionSize = connectionOffset + connectionSize;
93          if (connections.size() >= maxConnectionSize) {
94              result = new ArrayList<>(connections).subList(connectionOffset, maxConnectionSize);
95          } else if (connections.isEmpty()) {
96              Collection<Connection> newConnections = createNewConnections(databaseName, dataSourceName, maxConnectionSize, connectionMode);
97              result = new ArrayList<>(newConnections).subList(connectionOffset, maxConnectionSize);
98              synchronized (cachedConnections) {
99                  cachedConnections.putAll(cacheKey, newConnections);
100             }
101             executeTransactionHooksAfterCreateConnections(result);
102         } else {
103             List<Connection> allConnections = new ArrayList<>(maxConnectionSize);
104             allConnections.addAll(connections);
105             List<Connection> newConnections = createNewConnections(databaseName, dataSourceName, maxConnectionSize - connections.size(), connectionMode);
106             allConnections.addAll(newConnections);
107             result = allConnections.subList(connectionOffset, maxConnectionSize);
108             synchronized (cachedConnections) {
109                 cachedConnections.putAll(cacheKey, newConnections);
110             }
111         }
112         return result;
113     }
114     
115     private String getKey(final String databaseName, final String dataSourceName) {
116         return databaseName.toLowerCase() + "." + dataSourceName;
117     }
118     
119     @SuppressWarnings({"unchecked", "rawtypes"})
120     private void executeTransactionHooksAfterCreateConnections(final List<Connection> connections) throws SQLException {
121         if (connectionSession.getTransactionStatus().isInTransaction()) {
122             DatabaseType databaseType = ProxyContext.getInstance().getDatabaseType();
123             for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) {
124                 entry.getValue().afterCreateConnections(entry.getKey(), databaseType, connections, connectionSession.getConnectionContext().getTransactionContext());
125             }
126         }
127     }
128     
129     private List<Connection> createNewConnections(final String databaseName, final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
130         List<Connection> result = ProxyContext.getInstance().getBackendDataSource().getConnections(databaseName.toLowerCase(), dataSourceName, connectionSize, connectionMode);
131         setSessionVariablesIfNecessary(result);
132         for (Connection each : result) {
133             replayTransactionOption(each);
134         }
135         if (connectionSession.getTransactionStatus().isInTransaction()) {
136             for (Connection each : result) {
137                 replayMethodsInvocation(each);
138             }
139         }
140         return result;
141     }
142     
143     private void setSessionVariablesIfNecessary(final List<Connection> connections) throws SQLException {
144         if (connectionSession.getRequiredSessionVariableRecorder().isEmpty() || connections.isEmpty()) {
145             return;
146         }
147         String databaseType = connections.iterator().next().getMetaData().getDatabaseProductName();
148         List<String> setSQLs = connectionSession.getRequiredSessionVariableRecorder().toSetSQLs(databaseType);
149         try {
150             executeSetSessionVariables(connections, setSQLs);
151         } catch (final SQLException ex) {
152             releaseConnection(connections, ex);
153             throw ex;
154         }
155     }
156     
157     private void executeSetSessionVariables(final List<Connection> connections, final List<String> setSQLs) throws SQLException {
158         for (Connection each : connections) {
159             try (Statement statement = each.createStatement()) {
160                 for (String eachSetSQL : setSQLs) {
161                     statement.execute(eachSetSQL);
162                 }
163             }
164         }
165     }
166     
167     private void releaseConnection(final List<Connection> connections, final SQLException sqlException) {
168         for (Connection each : connections) {
169             try {
170                 each.close();
171             } catch (final SQLException ex) {
172                 sqlException.setNextException(ex);
173             }
174         }
175     }
176     
177     private void replayMethodsInvocation(final Connection target) throws SQLException {
178         for (ConnectionPostProcessor each : connectionPostProcessors) {
179             each.process(target);
180         }
181     }
182     
183     private void replayTransactionOption(final Connection connection) throws SQLException {
184         if (null == connection) {
185             return;
186         }
187         if (connectionSession.isReadOnly()) {
188             connection.setReadOnly(true);
189         }
190         if (connectionSession.getIsolationLevel().isPresent()) {
191             connection.setTransactionIsolation(TransactionUtils.getTransactionIsolationLevel(connectionSession.getIsolationLevel().get()));
192         }
193     }
194     
195     /**
196      * Get used data source names.
197      *
198      * @return used data source names
199      */
200     public Collection<String> getUsedDataSourceNames() {
201         Collection<String> result = new ArrayList<>(cachedConnections.size());
202         String databaseName = connectionSession.getUsedDatabaseName().toLowerCase();
203         for (String each : cachedConnections.keySet()) {
204             String[] split = each.split("\\.", 2);
205             String cachedDatabaseName = split[0];
206             String cachedDataSourceName = split[1];
207             if (databaseName.equals(cachedDatabaseName)) {
208                 result.add(cachedDataSourceName);
209             }
210         }
211         return result;
212     }
213     
214     /**
215      * Get connection size.
216      *
217      * @return connection size
218      */
219     public int getConnectionSize() {
220         return cachedConnections.values().size();
221     }
222     
223     /**
224      * Add handler.
225      *
226      * @param handler handler to be added
227      */
228     public void add(final ProxyBackendHandler handler) {
229         backendHandlers.add(handler);
230     }
231     
232     /**
233      * Mark a handler as in use.
234      *
235      * @param handler handler to be marked
236      */
237     public void markResourceInUse(final ProxyBackendHandler handler) {
238         inUseBackendHandlers.add(handler);
239     }
240     
241     /**
242      * Unmark a in use proxy backend handler.
243      *
244      * @param handler proxy backend handler to be added
245      */
246     public void unmarkResourceInUse(final ProxyBackendHandler handler) {
247         inUseBackendHandlers.remove(handler);
248     }
249     
250     /**
251      * Handle auto commit.
252      */
253     public void handleAutoCommit() {
254         if (!connectionSession.isAutoCommit() && !connectionSession.getTransactionStatus().isInTransaction()) {
255             BackendTransactionManager transactionManager = new BackendTransactionManager(this);
256             transactionManager.begin();
257         }
258     }
259     
260     /**
261      * Close resources used in execution.
262      *
263      * @throws BackendConnectionException backend connection exception
264      */
265     public void closeExecutionResources() throws BackendConnectionException {
266         synchronized (closeLock) {
267             Collection<Exception> result = new LinkedList<>(closeHandlers(false));
268             if (!connectionSession.getTransactionStatus().isInConnectionHeldTransaction(TransactionUtils.getTransactionType(connectionSession.getConnectionContext().getTransactionContext()))) {
269                 result.addAll(closeHandlers(true));
270                 result.addAll(closeConnections(false));
271             } else if (closed.get()) {
272                 result.addAll(closeHandlers(true));
273                 result.addAll(closeConnections(true));
274             }
275             if (result.isEmpty()) {
276                 return;
277             }
278             throw new BackendConnectionException(result);
279         }
280     }
281     
282     /**
283      * Close all resources.
284      *
285      * @return exceptions occurred during closing resources
286      */
287     public Collection<SQLException> closeAllResources() {
288         synchronized (closeLock) {
289             closed.set(true);
290             Collection<SQLException> result = new LinkedList<>();
291             result.addAll(closeHandlers(true));
292             result.addAll(closeConnections(true));
293             return result;
294         }
295     }
296     
297     /**
298      * Close handlers.
299      *
300      * @param includeInUse include handlers in use
301      * @return SQL exception when handler close
302      */
303     public Collection<SQLException> closeHandlers(final boolean includeInUse) {
304         Collection<SQLException> result = new LinkedList<>();
305         for (ProxyBackendHandler each : backendHandlers) {
306             if (!includeInUse && inUseBackendHandlers.contains(each)) {
307                 continue;
308             }
309             try {
310                 each.close();
311             } catch (final SQLException ex) {
312                 result.add(ex);
313             }
314         }
315         if (includeInUse) {
316             inUseBackendHandlers.clear();
317         }
318         backendHandlers.retainAll(inUseBackendHandlers);
319         return result;
320     }
321     
322     /**
323      * Close connections.
324      *
325      * @param forceRollback is force rollback
326      * @return SQL exception when connections close
327      */
328     public Collection<SQLException> closeConnections(final boolean forceRollback) {
329         Collection<SQLException> result = new LinkedList<>();
330         synchronized (cachedConnections) {
331             resetSessionVariablesIfNecessary(cachedConnections.values(), result);
332             for (Connection each : cachedConnections.values()) {
333                 try {
334                     if (forceRollback && connectionSession.getTransactionStatus().isInTransaction()) {
335                         each.rollback();
336                     }
337                 } catch (final SQLException ignored) {
338                 } finally {
339                     try {
340                         each.close();
341                     } catch (final SQLException ex) {
342                         if (!isClosed(each)) {
343                             log.warn("Close connection {} failed.", each, ex);
344                             result.add(ex);
345                         }
346                     }
347                 }
348             }
349             cachedConnections.clear();
350         }
351         if (!forceRollback) {
352             connectionPostProcessors.clear();
353         }
354         return result;
355     }
356     
357     private boolean isClosed(final Connection connection) {
358         try {
359             if (connection.isClosed()) {
360                 return true;
361             }
362         } catch (final SQLException ignored) {
363         }
364         return false;
365     }
366     
367     private void resetSessionVariablesIfNecessary(final Collection<Connection> values, final Collection<SQLException> exceptions) {
368         if (connectionSession.getRequiredSessionVariableRecorder().isEmpty() || values.isEmpty()) {
369             return;
370         }
371         String databaseType;
372         try {
373             databaseType = values.iterator().next().getMetaData().getDatabaseProductName();
374         } catch (final SQLException ex) {
375             exceptions.add(ex);
376             return;
377         }
378         List<String> resetSQLs = connectionSession.getRequiredSessionVariableRecorder().toResetSQLs(databaseType);
379         for (Connection each : values) {
380             try (Statement statement = each.createStatement()) {
381                 for (String eachResetSQL : resetSQLs) {
382                     statement.execute(eachResetSQL);
383                 }
384             } catch (final SQLException ex) {
385                 exceptions.add(ex);
386             }
387         }
388         connectionSession.getRequiredSessionVariableRecorder().removeVariablesWithDefaultValue();
389     }
390 }