1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.backend.connector;
19
20 import com.google.common.base.Preconditions;
21 import com.google.common.collect.LinkedHashMultimap;
22 import com.google.common.collect.Multimap;
23 import lombok.Getter;
24 import lombok.RequiredArgsConstructor;
25 import lombok.extern.slf4j.Slf4j;
26 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
27 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
28 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DatabaseConnectionManager;
29 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
30 import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
31 import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ConnectionPostProcessor;
32 import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ConnectionResourceLock;
33 import org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction.ProxyBackendTransactionManager;
34 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
35 import org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
36 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
37 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
38 import org.apache.shardingsphere.proxy.backend.util.TransactionUtils;
39 import org.apache.shardingsphere.transaction.spi.TransactionHook;
40
41 import java.sql.Connection;
42 import java.sql.SQLException;
43 import java.sql.Statement;
44 import java.util.ArrayList;
45 import java.util.Collection;
46 import java.util.Collections;
47 import java.util.LinkedList;
48 import java.util.List;
49 import java.util.Map;
50 import java.util.Map.Entry;
51 import java.util.concurrent.ConcurrentHashMap;
52 import java.util.concurrent.atomic.AtomicBoolean;
53
54
55
56
57 @RequiredArgsConstructor
58 @Getter
59 @Slf4j
60 public final class ProxyDatabaseConnectionManager implements DatabaseConnectionManager<Connection> {
61
62 private final ConnectionSession connectionSession;
63
64 private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
65
66 private final Collection<ProxyBackendHandler> proxyBackendHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
67
68 private final Collection<ProxyBackendHandler> inUseProxyBackendHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
69
70 private final Collection<ConnectionPostProcessor> connectionPostProcessors = new LinkedList<>();
71
72 private final ConnectionResourceLock connectionResourceLock = new ConnectionResourceLock();
73
74 private final AtomicBoolean closed = new AtomicBoolean(false);
75
76 private final Object closeLock = new Object();
77
78 @SuppressWarnings("rawtypes")
79 private final Map<ShardingSphereRule, TransactionHook> transactionHooks = OrderedSPILoader.getServices(
80 TransactionHook.class, ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules());
81
82 @Override
83 public List<Connection> getConnections(final String databaseName, final String dataSourceName, final int connectionOffset, final int connectionSize,
84 final ConnectionMode connectionMode) throws SQLException {
85 Preconditions.checkNotNull(databaseName, "Current database name is null.");
86 Collection<Connection> connections;
87 String cacheKey = getKey(databaseName, dataSourceName);
88 synchronized (cachedConnections) {
89 connections = cachedConnections.get(cacheKey);
90 }
91 List<Connection> result;
92 int maxConnectionSize = connectionOffset + connectionSize;
93 if (connections.size() >= maxConnectionSize) {
94 result = new ArrayList<>(connections).subList(connectionOffset, maxConnectionSize);
95 } else if (connections.isEmpty()) {
96 Collection<Connection> newConnections = createNewConnections(databaseName, dataSourceName, maxConnectionSize, connectionMode);
97 result = new ArrayList<>(newConnections).subList(connectionOffset, maxConnectionSize);
98 synchronized (cachedConnections) {
99 cachedConnections.putAll(cacheKey, newConnections);
100 }
101 executeTransactionHooksAfterCreateConnections(result);
102 } else {
103 List<Connection> allConnections = new ArrayList<>(maxConnectionSize);
104 allConnections.addAll(connections);
105 List<Connection> newConnections = createNewConnections(databaseName, dataSourceName, maxConnectionSize - connections.size(), connectionMode);
106 allConnections.addAll(newConnections);
107 result = allConnections.subList(connectionOffset, maxConnectionSize);
108 synchronized (cachedConnections) {
109 cachedConnections.putAll(cacheKey, newConnections);
110 }
111 }
112 return result;
113 }
114
115 private String getKey(final String databaseName, final String dataSourceName) {
116 return databaseName.toLowerCase() + "." + dataSourceName;
117 }
118
119 @SuppressWarnings({"unchecked", "rawtypes"})
120 private void executeTransactionHooksAfterCreateConnections(final List<Connection> connections) throws SQLException {
121 if (connectionSession.getTransactionStatus().isInTransaction()) {
122 DatabaseType databaseType = ProxyContext.getInstance().getContextManager().getDatabaseType();
123 for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) {
124 entry.getValue().afterCreateConnections(entry.getKey(), databaseType, connections, connectionSession.getConnectionContext().getTransactionContext());
125 }
126 }
127 }
128
129 private List<Connection> createNewConnections(final String databaseName, final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
130 List<Connection> result = ProxyContext.getInstance().getBackendDataSource().getConnections(databaseName.toLowerCase(), dataSourceName, connectionSize, connectionMode);
131 setSessionVariablesIfNecessary(result);
132 for (Connection each : result) {
133 replayTransactionOption(each);
134 }
135 if (connectionSession.getConnectionContext().getTransactionContext().isTransactionStarted()) {
136 for (Connection each : result) {
137 replayMethodsInvocation(each);
138 }
139 }
140 return result;
141 }
142
143 private void setSessionVariablesIfNecessary(final List<Connection> connections) throws SQLException {
144 if (connectionSession.getRequiredSessionVariableRecorder().isEmpty() || connections.isEmpty()) {
145 return;
146 }
147 String databaseType = connections.iterator().next().getMetaData().getDatabaseProductName();
148 List<String> setSQLs = connectionSession.getRequiredSessionVariableRecorder().toSetSQLs(databaseType);
149 try {
150 executeSetSessionVariables(connections, setSQLs);
151 } catch (final SQLException ex) {
152 releaseConnection(connections, ex);
153 throw ex;
154 }
155 }
156
157 private void executeSetSessionVariables(final List<Connection> connections, final List<String> setSQLs) throws SQLException {
158 for (Connection each : connections) {
159 try (Statement statement = each.createStatement()) {
160 for (String eachSetSQL : setSQLs) {
161 statement.execute(eachSetSQL);
162 }
163 }
164 }
165 }
166
167 private void releaseConnection(final List<Connection> connections, final SQLException sqlException) {
168 for (Connection each : connections) {
169 try {
170 each.close();
171 } catch (final SQLException ex) {
172 sqlException.setNextException(ex);
173 }
174 }
175 }
176
177 private void replayMethodsInvocation(final Connection target) throws SQLException {
178 for (ConnectionPostProcessor each : connectionPostProcessors) {
179 each.process(target);
180 }
181 }
182
183 @SuppressWarnings("MagicConstant")
184 private void replayTransactionOption(final Connection connection) throws SQLException {
185 if (null == connection) {
186 return;
187 }
188 if (connectionSession.isReadOnly()) {
189 connection.setReadOnly(true);
190 }
191 if (connectionSession.getIsolationLevel().isPresent()) {
192 connection.setTransactionIsolation(TransactionUtils.getTransactionIsolationLevel(connectionSession.getIsolationLevel().get()));
193 }
194 }
195
196
197
198
199
200
201 public Collection<String> getUsedDataSourceNames() {
202 Collection<String> result = new ArrayList<>(cachedConnections.size());
203 String databaseName = connectionSession.getUsedDatabaseName().toLowerCase();
204 for (String each : cachedConnections.keySet()) {
205 String[] split = each.split("\\.", 2);
206 String cachedDatabaseName = split[0];
207 String cachedDataSourceName = split[1];
208 if (databaseName.equals(cachedDatabaseName)) {
209 result.add(cachedDataSourceName);
210 }
211 }
212 return result;
213 }
214
215
216
217
218
219
220 public int getConnectionSize() {
221 return cachedConnections.values().size();
222 }
223
224
225
226
227
228
229 public void add(final ProxyBackendHandler handler) {
230 proxyBackendHandlers.add(handler);
231 }
232
233
234
235
236
237
238 public void markResourceInUse(final ProxyBackendHandler handler) {
239 inUseProxyBackendHandlers.add(handler);
240 }
241
242
243
244
245
246
247 public void unmarkResourceInUse(final ProxyBackendHandler handler) {
248 inUseProxyBackendHandlers.remove(handler);
249 }
250
251
252
253
254 public void handleAutoCommit() {
255 if (!connectionSession.isAutoCommit() && !connectionSession.getTransactionStatus().isInTransaction()) {
256 ProxyBackendTransactionManager transactionManager = new ProxyBackendTransactionManager(this);
257 transactionManager.begin();
258 }
259 }
260
261
262
263
264
265
266 public void closeExecutionResources() throws BackendConnectionException {
267 synchronized (closeLock) {
268 Collection<Exception> result = new LinkedList<>(closeHandlers(false));
269 if (!connectionSession.getTransactionStatus().isInConnectionHeldTransaction(TransactionUtils.getTransactionType(connectionSession.getConnectionContext().getTransactionContext()))) {
270 result.addAll(closeHandlers(true));
271 result.addAll(closeConnections(false));
272 } else if (closed.get()) {
273 result.addAll(closeHandlers(true));
274 result.addAll(closeConnections(true));
275 }
276 if (result.isEmpty()) {
277 return;
278 }
279 throw new BackendConnectionException(result);
280 }
281 }
282
283
284
285
286
287
288 public Collection<SQLException> closeAllResources() {
289 synchronized (closeLock) {
290 closed.set(true);
291 Collection<SQLException> result = new LinkedList<>();
292 result.addAll(closeHandlers(true));
293 result.addAll(closeConnections(true));
294 return result;
295 }
296 }
297
298
299
300
301
302
303
304 public Collection<SQLException> closeHandlers(final boolean includeInUse) {
305 Collection<SQLException> result = new LinkedList<>();
306 for (ProxyBackendHandler each : proxyBackendHandlers) {
307 if (!includeInUse && inUseProxyBackendHandlers.contains(each)) {
308 continue;
309 }
310 try {
311 each.close();
312 } catch (final SQLException ex) {
313 result.add(ex);
314 }
315 }
316 if (includeInUse) {
317 inUseProxyBackendHandlers.clear();
318 }
319 proxyBackendHandlers.retainAll(inUseProxyBackendHandlers);
320 return result;
321 }
322
323
324
325
326
327
328
329 public Collection<SQLException> closeConnections(final boolean forceRollback) {
330 Collection<SQLException> result = new LinkedList<>();
331 synchronized (cachedConnections) {
332 resetSessionVariablesIfNecessary(cachedConnections.values(), result);
333 for (Connection each : cachedConnections.values()) {
334 try {
335 if (forceRollback && connectionSession.getTransactionStatus().isInTransaction()) {
336 each.rollback();
337 }
338 } catch (final SQLException ignored) {
339 } finally {
340 try {
341 each.close();
342 } catch (final SQLException ex) {
343 if (!isClosed(each)) {
344 log.warn("Close connection {} failed.", each, ex);
345 result.add(ex);
346 }
347 }
348 }
349 }
350 cachedConnections.clear();
351 }
352 if (!forceRollback) {
353 connectionPostProcessors.clear();
354 }
355 return result;
356 }
357
358 private boolean isClosed(final Connection connection) {
359 try {
360 if (connection.isClosed()) {
361 return true;
362 }
363 } catch (final SQLException ignored) {
364 }
365 return false;
366 }
367
368 private void resetSessionVariablesIfNecessary(final Collection<Connection> values, final Collection<SQLException> exceptions) {
369 if (connectionSession.getRequiredSessionVariableRecorder().isEmpty() || values.isEmpty()) {
370 return;
371 }
372 String databaseType;
373 try {
374 databaseType = values.iterator().next().getMetaData().getDatabaseProductName();
375 } catch (final SQLException ex) {
376 exceptions.add(ex);
377 return;
378 }
379 List<String> resetSQLs = connectionSession.getRequiredSessionVariableRecorder().toResetSQLs(databaseType);
380 for (Connection each : values) {
381 try (Statement statement = each.createStatement()) {
382 for (String eachResetSQL : resetSQLs) {
383 statement.execute(eachResetSQL);
384 }
385 } catch (final SQLException ex) {
386 exceptions.add(ex);
387 }
388 }
389 connectionSession.getRequiredSessionVariableRecorder().removeVariablesWithDefaultValue();
390 }
391 }