View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.backend.connector;
19  
20  import com.google.common.base.Preconditions;
21  import com.google.common.collect.LinkedHashMultimap;
22  import com.google.common.collect.Multimap;
23  import lombok.Getter;
24  import lombok.RequiredArgsConstructor;
25  import lombok.extern.slf4j.Slf4j;
26  import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
27  import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
28  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DatabaseConnectionManager;
29  import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
30  import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
31  import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ConnectionPostProcessor;
32  import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ConnectionResourceLock;
33  import org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction.ProxyBackendTransactionManager;
34  import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
35  import org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
36  import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
37  import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
38  import org.apache.shardingsphere.proxy.backend.util.TransactionUtils;
39  import org.apache.shardingsphere.transaction.spi.TransactionHook;
40  
41  import java.sql.Connection;
42  import java.sql.SQLException;
43  import java.sql.Statement;
44  import java.util.ArrayList;
45  import java.util.Collection;
46  import java.util.Collections;
47  import java.util.LinkedList;
48  import java.util.List;
49  import java.util.Map;
50  import java.util.Map.Entry;
51  import java.util.concurrent.ConcurrentHashMap;
52  import java.util.concurrent.atomic.AtomicBoolean;
53  
54  /**
55   * Database connection manager of ShardingSphere-Proxy.
56   */
57  @RequiredArgsConstructor
58  @Getter
59  @Slf4j
60  public final class ProxyDatabaseConnectionManager implements DatabaseConnectionManager<Connection> {
61      
62      private final ConnectionSession connectionSession;
63      
64      private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
65      
66      private final Collection<ProxyBackendHandler> proxyBackendHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
67      
68      private final Collection<ProxyBackendHandler> inUseProxyBackendHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
69      
70      private final Collection<ConnectionPostProcessor> connectionPostProcessors = new LinkedList<>();
71      
72      private final ConnectionResourceLock connectionResourceLock = new ConnectionResourceLock();
73      
74      private final AtomicBoolean closed = new AtomicBoolean(false);
75      
76      private final Object closeLock = new Object();
77      
78      @SuppressWarnings("rawtypes")
79      private final Map<ShardingSphereRule, TransactionHook> transactionHooks = OrderedSPILoader.getServices(
80              TransactionHook.class, ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules());
81      
82      @Override
83      public List<Connection> getConnections(final String databaseName, final String dataSourceName, final int connectionOffset, final int connectionSize,
84                                             final ConnectionMode connectionMode) throws SQLException {
85          Preconditions.checkNotNull(databaseName, "Current database name is null.");
86          Collection<Connection> connections;
87          String cacheKey = getKey(databaseName, dataSourceName);
88          synchronized (cachedConnections) {
89              connections = cachedConnections.get(cacheKey);
90          }
91          List<Connection> result;
92          int maxConnectionSize = connectionOffset + connectionSize;
93          if (connections.size() >= maxConnectionSize) {
94              result = new ArrayList<>(connections).subList(connectionOffset, maxConnectionSize);
95          } else if (connections.isEmpty()) {
96              Collection<Connection> newConnections = createNewConnections(databaseName, dataSourceName, maxConnectionSize, connectionMode);
97              result = new ArrayList<>(newConnections).subList(connectionOffset, maxConnectionSize);
98              synchronized (cachedConnections) {
99                  cachedConnections.putAll(cacheKey, newConnections);
100             }
101             executeTransactionHooksAfterCreateConnections(result);
102         } else {
103             List<Connection> allConnections = new ArrayList<>(maxConnectionSize);
104             allConnections.addAll(connections);
105             List<Connection> newConnections = createNewConnections(databaseName, dataSourceName, maxConnectionSize - connections.size(), connectionMode);
106             allConnections.addAll(newConnections);
107             result = allConnections.subList(connectionOffset, maxConnectionSize);
108             synchronized (cachedConnections) {
109                 cachedConnections.putAll(cacheKey, newConnections);
110             }
111         }
112         return result;
113     }
114     
115     private String getKey(final String databaseName, final String dataSourceName) {
116         return databaseName.toLowerCase() + "." + dataSourceName;
117     }
118     
119     @SuppressWarnings({"unchecked", "rawtypes"})
120     private void executeTransactionHooksAfterCreateConnections(final List<Connection> connections) throws SQLException {
121         if (connectionSession.getTransactionStatus().isInTransaction()) {
122             DatabaseType databaseType = ProxyContext.getInstance().getContextManager().getDatabaseType();
123             for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) {
124                 entry.getValue().afterCreateConnections(entry.getKey(), databaseType, connections, connectionSession.getConnectionContext().getTransactionContext());
125             }
126         }
127     }
128     
129     private List<Connection> createNewConnections(final String databaseName, final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
130         List<Connection> result = ProxyContext.getInstance().getBackendDataSource().getConnections(databaseName.toLowerCase(), dataSourceName, connectionSize, connectionMode);
131         setSessionVariablesIfNecessary(result);
132         for (Connection each : result) {
133             replayTransactionOption(each);
134         }
135         if (connectionSession.getConnectionContext().getTransactionContext().isTransactionStarted()) {
136             for (Connection each : result) {
137                 replayMethodsInvocation(each);
138             }
139         }
140         return result;
141     }
142     
143     private void setSessionVariablesIfNecessary(final List<Connection> connections) throws SQLException {
144         if (connectionSession.getRequiredSessionVariableRecorder().isEmpty() || connections.isEmpty()) {
145             return;
146         }
147         String databaseType = connections.iterator().next().getMetaData().getDatabaseProductName();
148         List<String> setSQLs = connectionSession.getRequiredSessionVariableRecorder().toSetSQLs(databaseType);
149         try {
150             executeSetSessionVariables(connections, setSQLs);
151         } catch (final SQLException ex) {
152             releaseConnection(connections, ex);
153             throw ex;
154         }
155     }
156     
157     private void executeSetSessionVariables(final List<Connection> connections, final List<String> setSQLs) throws SQLException {
158         for (Connection each : connections) {
159             try (Statement statement = each.createStatement()) {
160                 for (String eachSetSQL : setSQLs) {
161                     statement.execute(eachSetSQL);
162                 }
163             }
164         }
165     }
166     
167     private void releaseConnection(final List<Connection> connections, final SQLException sqlException) {
168         for (Connection each : connections) {
169             try {
170                 each.close();
171             } catch (final SQLException ex) {
172                 sqlException.setNextException(ex);
173             }
174         }
175     }
176     
177     private void replayMethodsInvocation(final Connection target) throws SQLException {
178         for (ConnectionPostProcessor each : connectionPostProcessors) {
179             each.process(target);
180         }
181     }
182     
183     @SuppressWarnings("MagicConstant")
184     private void replayTransactionOption(final Connection connection) throws SQLException {
185         if (null == connection) {
186             return;
187         }
188         if (connectionSession.isReadOnly()) {
189             connection.setReadOnly(true);
190         }
191         if (connectionSession.getIsolationLevel().isPresent()) {
192             connection.setTransactionIsolation(TransactionUtils.getTransactionIsolationLevel(connectionSession.getIsolationLevel().get()));
193         }
194     }
195     
196     /**
197      * Get used data source names.
198      *
199      * @return used data source names
200      */
201     public Collection<String> getUsedDataSourceNames() {
202         Collection<String> result = new ArrayList<>(cachedConnections.size());
203         String databaseName = connectionSession.getUsedDatabaseName().toLowerCase();
204         for (String each : cachedConnections.keySet()) {
205             String[] split = each.split("\\.", 2);
206             String cachedDatabaseName = split[0];
207             String cachedDataSourceName = split[1];
208             if (databaseName.equals(cachedDatabaseName)) {
209                 result.add(cachedDataSourceName);
210             }
211         }
212         return result;
213     }
214     
215     /**
216      * Get connection size.
217      *
218      * @return connection size
219      */
220     public int getConnectionSize() {
221         return cachedConnections.values().size();
222     }
223     
224     /**
225      * Add handler.
226      *
227      * @param handler handler to be added
228      */
229     public void add(final ProxyBackendHandler handler) {
230         proxyBackendHandlers.add(handler);
231     }
232     
233     /**
234      * Mark a handler as in use.
235      *
236      * @param handler handler to be marked
237      */
238     public void markResourceInUse(final ProxyBackendHandler handler) {
239         inUseProxyBackendHandlers.add(handler);
240     }
241     
242     /**
243      * Unmark a in use proxy backend handler.
244      *
245      * @param handler proxy backend handler to be added
246      */
247     public void unmarkResourceInUse(final ProxyBackendHandler handler) {
248         inUseProxyBackendHandlers.remove(handler);
249     }
250     
251     /**
252      * Handle auto commit.
253      */
254     public void handleAutoCommit() {
255         if (!connectionSession.isAutoCommit() && !connectionSession.getTransactionStatus().isInTransaction()) {
256             ProxyBackendTransactionManager transactionManager = new ProxyBackendTransactionManager(this);
257             transactionManager.begin();
258         }
259     }
260     
261     /**
262      * Close resources used in execution.
263      *
264      * @throws BackendConnectionException backend connection exception
265      */
266     public void closeExecutionResources() throws BackendConnectionException {
267         synchronized (closeLock) {
268             Collection<Exception> result = new LinkedList<>(closeHandlers(false));
269             if (!connectionSession.getTransactionStatus().isInConnectionHeldTransaction(TransactionUtils.getTransactionType(connectionSession.getConnectionContext().getTransactionContext()))) {
270                 result.addAll(closeHandlers(true));
271                 result.addAll(closeConnections(false));
272             } else if (closed.get()) {
273                 result.addAll(closeHandlers(true));
274                 result.addAll(closeConnections(true));
275             }
276             if (result.isEmpty()) {
277                 return;
278             }
279             throw new BackendConnectionException(result);
280         }
281     }
282     
283     /**
284      * Close all resources.
285      *
286      * @return exceptions occurred during closing resources
287      */
288     public Collection<SQLException> closeAllResources() {
289         synchronized (closeLock) {
290             closed.set(true);
291             Collection<SQLException> result = new LinkedList<>();
292             result.addAll(closeHandlers(true));
293             result.addAll(closeConnections(true));
294             return result;
295         }
296     }
297     
298     /**
299      * Close handlers.
300      *
301      * @param includeInUse include handlers in use
302      * @return SQL exception when handler close
303      */
304     public Collection<SQLException> closeHandlers(final boolean includeInUse) {
305         Collection<SQLException> result = new LinkedList<>();
306         for (ProxyBackendHandler each : proxyBackendHandlers) {
307             if (!includeInUse && inUseProxyBackendHandlers.contains(each)) {
308                 continue;
309             }
310             try {
311                 each.close();
312             } catch (final SQLException ex) {
313                 result.add(ex);
314             }
315         }
316         if (includeInUse) {
317             inUseProxyBackendHandlers.clear();
318         }
319         proxyBackendHandlers.retainAll(inUseProxyBackendHandlers);
320         return result;
321     }
322     
323     /**
324      * Close connections.
325      *
326      * @param forceRollback is force rollback
327      * @return SQL exception when connections close
328      */
329     public Collection<SQLException> closeConnections(final boolean forceRollback) {
330         Collection<SQLException> result = new LinkedList<>();
331         synchronized (cachedConnections) {
332             resetSessionVariablesIfNecessary(cachedConnections.values(), result);
333             for (Connection each : cachedConnections.values()) {
334                 try {
335                     if (forceRollback && connectionSession.getTransactionStatus().isInTransaction()) {
336                         each.rollback();
337                     }
338                 } catch (final SQLException ignored) {
339                 } finally {
340                     try {
341                         each.close();
342                     } catch (final SQLException ex) {
343                         if (!isClosed(each)) {
344                             log.warn("Close connection {} failed.", each, ex);
345                             result.add(ex);
346                         }
347                     }
348                 }
349             }
350             cachedConnections.clear();
351         }
352         if (!forceRollback) {
353             connectionPostProcessors.clear();
354         }
355         return result;
356     }
357     
358     private boolean isClosed(final Connection connection) {
359         try {
360             if (connection.isClosed()) {
361                 return true;
362             }
363         } catch (final SQLException ignored) {
364         }
365         return false;
366     }
367     
368     private void resetSessionVariablesIfNecessary(final Collection<Connection> values, final Collection<SQLException> exceptions) {
369         if (connectionSession.getRequiredSessionVariableRecorder().isEmpty() || values.isEmpty()) {
370             return;
371         }
372         String databaseType;
373         try {
374             databaseType = values.iterator().next().getMetaData().getDatabaseProductName();
375         } catch (final SQLException ex) {
376             exceptions.add(ex);
377             return;
378         }
379         List<String> resetSQLs = connectionSession.getRequiredSessionVariableRecorder().toResetSQLs(databaseType);
380         for (Connection each : values) {
381             try (Statement statement = each.createStatement()) {
382                 for (String eachResetSQL : resetSQLs) {
383                     statement.execute(eachResetSQL);
384                 }
385             } catch (final SQLException ex) {
386                 exceptions.add(ex);
387             }
388         }
389         connectionSession.getRequiredSessionVariableRecorder().removeVariablesWithDefaultValue();
390     }
391 }