View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.backend.connector;
19  
20  import com.google.common.base.Preconditions;
21  import com.google.common.collect.LinkedHashMultimap;
22  import com.google.common.collect.Multimap;
23  import lombok.Getter;
24  import lombok.RequiredArgsConstructor;
25  import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
26  import org.apache.shardingsphere.infra.executor.sql.prepare.driver.OnlineDatabaseConnectionManager;
27  import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
28  import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ConnectionPostProcessor;
29  import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ResourceLock;
30  import org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction.BackendTransactionManager;
31  import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
32  import org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
33  import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
34  import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
35  import org.apache.shardingsphere.proxy.backend.util.TransactionUtils;
36  import org.apache.shardingsphere.transaction.spi.TransactionHook;
37  
38  import java.sql.Connection;
39  import java.sql.SQLException;
40  import java.sql.Statement;
41  import java.util.ArrayList;
42  import java.util.Collection;
43  import java.util.Collections;
44  import java.util.LinkedList;
45  import java.util.List;
46  import java.util.concurrent.ConcurrentHashMap;
47  import java.util.concurrent.atomic.AtomicBoolean;
48  
49  /**
50   * Database connection manager of ShardingSphere-Proxy.
51   */
52  @RequiredArgsConstructor
53  @Getter
54  public final class ProxyDatabaseConnectionManager implements OnlineDatabaseConnectionManager<Connection> {
55      
56      private final ConnectionSession connectionSession;
57      
58      private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
59      
60      private final Collection<ProxyBackendHandler> backendHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
61      
62      private final Collection<ProxyBackendHandler> inUseBackendHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
63      
64      private final Collection<ConnectionPostProcessor> connectionPostProcessors = new LinkedList<>();
65      
66      private final ResourceLock resourceLock = new ResourceLock();
67      
68      private final AtomicBoolean closed = new AtomicBoolean(false);
69      
70      private final Collection<TransactionHook> transactionHooks = ShardingSphereServiceLoader.getServiceInstances(TransactionHook.class);
71      
72      @Override
73      public List<Connection> getConnections(final String dataSourceName, final int connectionOffset, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
74          Preconditions.checkNotNull(connectionSession.getDatabaseName(), "Current database name is null.");
75          Collection<Connection> connections;
76          synchronized (cachedConnections) {
77              connections = cachedConnections.get(connectionSession.getDatabaseName().toLowerCase() + "." + dataSourceName);
78          }
79          List<Connection> result;
80          int maxConnectionSize = connectionOffset + connectionSize;
81          if (connections.size() >= maxConnectionSize) {
82              result = new ArrayList<>(connections).subList(connectionOffset, maxConnectionSize);
83          } else if (connections.isEmpty()) {
84              Collection<Connection> newConnections = createNewConnections(dataSourceName, maxConnectionSize, connectionMode);
85              result = new ArrayList<>(newConnections).subList(connectionOffset, maxConnectionSize);
86              synchronized (cachedConnections) {
87                  cachedConnections.putAll(connectionSession.getDatabaseName().toLowerCase() + "." + dataSourceName, newConnections);
88              }
89              executeTransactionHooksAfterCreateConnections(result);
90          } else {
91              List<Connection> allConnections = new ArrayList<>(maxConnectionSize);
92              allConnections.addAll(connections);
93              List<Connection> newConnections = createNewConnections(dataSourceName, maxConnectionSize - connections.size(), connectionMode);
94              allConnections.addAll(newConnections);
95              result = allConnections.subList(connectionOffset, maxConnectionSize);
96              synchronized (cachedConnections) {
97                  cachedConnections.putAll(connectionSession.getDatabaseName().toLowerCase() + "." + dataSourceName, newConnections);
98              }
99          }
100         return result;
101     }
102     
103     private void executeTransactionHooksAfterCreateConnections(final List<Connection> result) throws SQLException {
104         if (connectionSession.getTransactionStatus().isInTransaction()) {
105             for (TransactionHook each : transactionHooks) {
106                 each.afterCreateConnections(result, connectionSession.getConnectionContext().getTransactionContext());
107             }
108         }
109     }
110     
111     private List<Connection> createNewConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
112         List<Connection> result = ProxyContext.getInstance().getBackendDataSource().getConnections(connectionSession.getDatabaseName().toLowerCase(), dataSourceName, connectionSize, connectionMode);
113         setSessionVariablesIfNecessary(result);
114         for (Connection each : result) {
115             replayTransactionOption(each);
116         }
117         if (connectionSession.getTransactionStatus().isInTransaction()) {
118             for (Connection each : result) {
119                 replayMethodsInvocation(each);
120             }
121         }
122         return result;
123     }
124     
125     private void setSessionVariablesIfNecessary(final List<Connection> connections) throws SQLException {
126         if (connectionSession.getRequiredSessionVariableRecorder().isEmpty() || connections.isEmpty()) {
127             return;
128         }
129         String databaseType = connections.iterator().next().getMetaData().getDatabaseProductName();
130         List<String> setSQLs = connectionSession.getRequiredSessionVariableRecorder().toSetSQLs(databaseType);
131         try {
132             executeSetSessionVariables(connections, setSQLs);
133         } catch (final SQLException ex) {
134             releaseConnection(connections, ex);
135             throw ex;
136         }
137     }
138     
139     private void executeSetSessionVariables(final List<Connection> connections, final List<String> setSQLs) throws SQLException {
140         for (Connection each : connections) {
141             try (Statement statement = each.createStatement()) {
142                 for (String eachSetSQL : setSQLs) {
143                     statement.execute(eachSetSQL);
144                 }
145             }
146         }
147     }
148     
149     private void releaseConnection(final List<Connection> connections, final SQLException sqlException) {
150         for (Connection each : connections) {
151             try {
152                 each.close();
153             } catch (final SQLException ex) {
154                 sqlException.setNextException(ex);
155             }
156         }
157     }
158     
159     private void replayMethodsInvocation(final Connection target) throws SQLException {
160         for (ConnectionPostProcessor each : connectionPostProcessors) {
161             each.process(target);
162         }
163     }
164     
165     private void replayTransactionOption(final Connection connection) throws SQLException {
166         if (null == connection) {
167             return;
168         }
169         if (connectionSession.isReadOnly()) {
170             connection.setReadOnly(true);
171         }
172         if (null != connectionSession.getIsolationLevel()) {
173             connection.setTransactionIsolation(TransactionUtils.getTransactionIsolationLevel(connectionSession.getIsolationLevel()));
174         }
175     }
176     
177     /**
178      * Get used data source names.
179      * 
180      * @return used data source names
181      */
182     public Collection<String> getUsedDataSourceNames() {
183         Collection<String> result = new ArrayList<>(cachedConnections.size());
184         String databaseName = connectionSession.getDatabaseName().toLowerCase();
185         for (String each : cachedConnections.keySet()) {
186             String[] split = each.split("\\.", 2);
187             String cachedDatabaseName = split[0];
188             String cachedDataSourceName = split[1];
189             if (databaseName.equals(cachedDatabaseName)) {
190                 result.add(cachedDataSourceName);
191             }
192         }
193         return result;
194     }
195     
196     /**
197      * Get connection size.
198      *
199      * @return connection size
200      */
201     public int getConnectionSize() {
202         return cachedConnections.values().size();
203     }
204     
205     /**
206      * Add handler.
207      *
208      * @param handler handler to be added
209      */
210     public void add(final ProxyBackendHandler handler) {
211         backendHandlers.add(handler);
212     }
213     
214     /**
215      * Mark a handler as in use.
216      *
217      * @param handler handler to be marked
218      */
219     public void markResourceInUse(final ProxyBackendHandler handler) {
220         inUseBackendHandlers.add(handler);
221     }
222     
223     /**
224      * Unmark a in use proxy backend handler.
225      *
226      * @param handler proxy backend handler to be added
227      */
228     public void unmarkResourceInUse(final ProxyBackendHandler handler) {
229         inUseBackendHandlers.remove(handler);
230     }
231     
232     /**
233      * Handle auto commit.
234      */
235     public void handleAutoCommit() {
236         if (!connectionSession.isAutoCommit() && !connectionSession.getTransactionStatus().isInTransaction()) {
237             BackendTransactionManager transactionManager = new BackendTransactionManager(this);
238             transactionManager.begin();
239         }
240     }
241     
242     /**
243      * Close resources used in execution.
244      *
245      * @throws BackendConnectionException backend connection exception
246      */
247     public void closeExecutionResources() throws BackendConnectionException {
248         synchronized (this) {
249             Collection<Exception> result = new LinkedList<>(closeHandlers(false));
250             if (!connectionSession.getTransactionStatus().isInConnectionHeldTransaction(TransactionUtils.getTransactionType(connectionSession.getConnectionContext().getTransactionContext()))) {
251                 result.addAll(closeHandlers(true));
252                 result.addAll(closeConnections(false));
253             } else if (closed.get()) {
254                 result.addAll(closeHandlers(true));
255                 result.addAll(closeConnections(true));
256             }
257             if (result.isEmpty()) {
258                 return;
259             }
260             throw new BackendConnectionException(result);
261         }
262     }
263     
264     /**
265      * Close all resources.
266      */
267     public void closeAllResources() {
268         synchronized (this) {
269             closed.set(true);
270             closeHandlers(true);
271             closeConnections(true);
272         }
273     }
274     
275     /**
276      * Close handlers.
277      *
278      * @param includeInUse include handlers in use
279      * @return SQL exception when handler close
280      */
281     public Collection<SQLException> closeHandlers(final boolean includeInUse) {
282         Collection<SQLException> result = new LinkedList<>();
283         for (ProxyBackendHandler each : backendHandlers) {
284             if (!includeInUse && inUseBackendHandlers.contains(each)) {
285                 continue;
286             }
287             try {
288                 each.close();
289             } catch (final SQLException ex) {
290                 result.add(ex);
291             }
292         }
293         if (includeInUse) {
294             inUseBackendHandlers.clear();
295         }
296         backendHandlers.retainAll(inUseBackendHandlers);
297         return result;
298     }
299     
300     /**
301      * Close connections.
302      * 
303      * @param forceRollback is force rollback
304      * @return SQL exception when connections close
305      */
306     public Collection<SQLException> closeConnections(final boolean forceRollback) {
307         Collection<SQLException> result = new LinkedList<>();
308         synchronized (cachedConnections) {
309             resetSessionVariablesIfNecessary(cachedConnections.values(), result);
310             for (Connection each : cachedConnections.values()) {
311                 try {
312                     if (forceRollback && connectionSession.getTransactionStatus().isInTransaction()) {
313                         each.rollback();
314                     }
315                     each.close();
316                 } catch (final SQLException ex) {
317                     result.add(ex);
318                 }
319             }
320             cachedConnections.clear();
321         }
322         if (!forceRollback) {
323             connectionPostProcessors.clear();
324         }
325         return result;
326     }
327     
328     private void resetSessionVariablesIfNecessary(final Collection<Connection> values, final Collection<SQLException> exceptions) {
329         if (connectionSession.getRequiredSessionVariableRecorder().isEmpty() || values.isEmpty()) {
330             return;
331         }
332         String databaseType;
333         try {
334             databaseType = values.iterator().next().getMetaData().getDatabaseProductName();
335         } catch (final SQLException ex) {
336             exceptions.add(ex);
337             return;
338         }
339         List<String> resetSQLs = connectionSession.getRequiredSessionVariableRecorder().toResetSQLs(databaseType);
340         for (Connection each : values) {
341             try (Statement statement = each.createStatement()) {
342                 for (String eachResetSQL : resetSQLs) {
343                     statement.execute(eachResetSQL);
344                 }
345             } catch (final SQLException ex) {
346                 exceptions.add(ex);
347             }
348         }
349         connectionSession.getRequiredSessionVariableRecorder().removeVariablesWithDefaultValue();
350     }
351 }