1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.backend.connector;
19
20 import com.google.common.base.Preconditions;
21 import com.google.common.collect.LinkedHashMultimap;
22 import com.google.common.collect.Multimap;
23 import lombok.Getter;
24 import lombok.RequiredArgsConstructor;
25 import lombok.extern.slf4j.Slf4j;
26 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
27 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
28 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.DatabaseConnectionManager;
29 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
30 import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
31 import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ConnectionPostProcessor;
32 import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ConnectionResourceLock;
33 import org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction.BackendTransactionManager;
34 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
35 import org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
36 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
37 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
38 import org.apache.shardingsphere.proxy.backend.util.TransactionUtils;
39 import org.apache.shardingsphere.transaction.spi.TransactionHook;
40
41 import java.sql.Connection;
42 import java.sql.SQLException;
43 import java.sql.Statement;
44 import java.util.ArrayList;
45 import java.util.Collection;
46 import java.util.Collections;
47 import java.util.LinkedList;
48 import java.util.List;
49 import java.util.Map;
50 import java.util.Map.Entry;
51 import java.util.concurrent.ConcurrentHashMap;
52 import java.util.concurrent.atomic.AtomicBoolean;
53
54
55
56
57 @Slf4j
58 @RequiredArgsConstructor
59 @Getter
60 public final class ProxyDatabaseConnectionManager implements DatabaseConnectionManager<Connection> {
61
62 private final ConnectionSession connectionSession;
63
64 private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
65
66 private final Collection<ProxyBackendHandler> backendHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
67
68 private final Collection<ProxyBackendHandler> inUseBackendHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
69
70 private final Collection<ConnectionPostProcessor> connectionPostProcessors = new LinkedList<>();
71
72 private final ConnectionResourceLock connectionResourceLock = new ConnectionResourceLock();
73
74 private final AtomicBoolean closed = new AtomicBoolean(false);
75
76 private final Object closeLock = new Object();
77
78 @SuppressWarnings("rawtypes")
79 private final Map<ShardingSphereRule, TransactionHook> transactionHooks = OrderedSPILoader.getServices(
80 TransactionHook.class, ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getRules());
81
82 @Override
83 public List<Connection> getConnections(final String databaseName, final String dataSourceName, final int connectionOffset, final int connectionSize,
84 final ConnectionMode connectionMode) throws SQLException {
85 Preconditions.checkNotNull(databaseName, "Current database name is null.");
86 Collection<Connection> connections;
87 String cacheKey = getKey(databaseName, dataSourceName);
88 synchronized (cachedConnections) {
89 connections = cachedConnections.get(cacheKey);
90 }
91 List<Connection> result;
92 int maxConnectionSize = connectionOffset + connectionSize;
93 if (connections.size() >= maxConnectionSize) {
94 result = new ArrayList<>(connections).subList(connectionOffset, maxConnectionSize);
95 } else if (connections.isEmpty()) {
96 Collection<Connection> newConnections = createNewConnections(databaseName, dataSourceName, maxConnectionSize, connectionMode);
97 result = new ArrayList<>(newConnections).subList(connectionOffset, maxConnectionSize);
98 synchronized (cachedConnections) {
99 cachedConnections.putAll(cacheKey, newConnections);
100 }
101 executeTransactionHooksAfterCreateConnections(result);
102 } else {
103 List<Connection> allConnections = new ArrayList<>(maxConnectionSize);
104 allConnections.addAll(connections);
105 List<Connection> newConnections = createNewConnections(databaseName, dataSourceName, maxConnectionSize - connections.size(), connectionMode);
106 allConnections.addAll(newConnections);
107 result = allConnections.subList(connectionOffset, maxConnectionSize);
108 synchronized (cachedConnections) {
109 cachedConnections.putAll(cacheKey, newConnections);
110 }
111 }
112 return result;
113 }
114
115 private String getKey(final String databaseName, final String dataSourceName) {
116 return databaseName.toLowerCase() + "." + dataSourceName;
117 }
118
119 @SuppressWarnings({"unchecked", "rawtypes"})
120 private void executeTransactionHooksAfterCreateConnections(final List<Connection> connections) throws SQLException {
121 if (connectionSession.getTransactionStatus().isInTransaction()) {
122 DatabaseType databaseType = ProxyContext.getInstance().getDatabaseType();
123 for (Entry<ShardingSphereRule, TransactionHook> entry : transactionHooks.entrySet()) {
124 entry.getValue().afterCreateConnections(entry.getKey(), databaseType, connections, connectionSession.getConnectionContext().getTransactionContext());
125 }
126 }
127 }
128
129 private List<Connection> createNewConnections(final String databaseName, final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
130 List<Connection> result = ProxyContext.getInstance().getBackendDataSource().getConnections(databaseName.toLowerCase(), dataSourceName, connectionSize, connectionMode);
131 setSessionVariablesIfNecessary(result);
132 for (Connection each : result) {
133 replayTransactionOption(each);
134 }
135 if (connectionSession.getTransactionStatus().isInTransaction()) {
136 for (Connection each : result) {
137 replayMethodsInvocation(each);
138 }
139 }
140 return result;
141 }
142
143 private void setSessionVariablesIfNecessary(final List<Connection> connections) throws SQLException {
144 if (connectionSession.getRequiredSessionVariableRecorder().isEmpty() || connections.isEmpty()) {
145 return;
146 }
147 String databaseType = connections.iterator().next().getMetaData().getDatabaseProductName();
148 List<String> setSQLs = connectionSession.getRequiredSessionVariableRecorder().toSetSQLs(databaseType);
149 try {
150 executeSetSessionVariables(connections, setSQLs);
151 } catch (final SQLException ex) {
152 releaseConnection(connections, ex);
153 throw ex;
154 }
155 }
156
157 private void executeSetSessionVariables(final List<Connection> connections, final List<String> setSQLs) throws SQLException {
158 for (Connection each : connections) {
159 try (Statement statement = each.createStatement()) {
160 for (String eachSetSQL : setSQLs) {
161 statement.execute(eachSetSQL);
162 }
163 }
164 }
165 }
166
167 private void releaseConnection(final List<Connection> connections, final SQLException sqlException) {
168 for (Connection each : connections) {
169 try {
170 each.close();
171 } catch (final SQLException ex) {
172 sqlException.setNextException(ex);
173 }
174 }
175 }
176
177 private void replayMethodsInvocation(final Connection target) throws SQLException {
178 for (ConnectionPostProcessor each : connectionPostProcessors) {
179 each.process(target);
180 }
181 }
182
183 private void replayTransactionOption(final Connection connection) throws SQLException {
184 if (null == connection) {
185 return;
186 }
187 if (connectionSession.isReadOnly()) {
188 connection.setReadOnly(true);
189 }
190 if (connectionSession.getIsolationLevel().isPresent()) {
191 connection.setTransactionIsolation(TransactionUtils.getTransactionIsolationLevel(connectionSession.getIsolationLevel().get()));
192 }
193 }
194
195
196
197
198
199
200 public Collection<String> getUsedDataSourceNames() {
201 Collection<String> result = new ArrayList<>(cachedConnections.size());
202 String databaseName = connectionSession.getUsedDatabaseName().toLowerCase();
203 for (String each : cachedConnections.keySet()) {
204 String[] split = each.split("\\.", 2);
205 String cachedDatabaseName = split[0];
206 String cachedDataSourceName = split[1];
207 if (databaseName.equals(cachedDatabaseName)) {
208 result.add(cachedDataSourceName);
209 }
210 }
211 return result;
212 }
213
214
215
216
217
218
219 public int getConnectionSize() {
220 return cachedConnections.values().size();
221 }
222
223
224
225
226
227
228 public void add(final ProxyBackendHandler handler) {
229 backendHandlers.add(handler);
230 }
231
232
233
234
235
236
237 public void markResourceInUse(final ProxyBackendHandler handler) {
238 inUseBackendHandlers.add(handler);
239 }
240
241
242
243
244
245
246 public void unmarkResourceInUse(final ProxyBackendHandler handler) {
247 inUseBackendHandlers.remove(handler);
248 }
249
250
251
252
253 public void handleAutoCommit() {
254 if (!connectionSession.isAutoCommit() && !connectionSession.getTransactionStatus().isInTransaction()) {
255 BackendTransactionManager transactionManager = new BackendTransactionManager(this);
256 transactionManager.begin();
257 }
258 }
259
260
261
262
263
264
265 public void closeExecutionResources() throws BackendConnectionException {
266 synchronized (closeLock) {
267 Collection<Exception> result = new LinkedList<>(closeHandlers(false));
268 if (!connectionSession.getTransactionStatus().isInConnectionHeldTransaction(TransactionUtils.getTransactionType(connectionSession.getConnectionContext().getTransactionContext()))) {
269 result.addAll(closeHandlers(true));
270 result.addAll(closeConnections(false));
271 } else if (closed.get()) {
272 result.addAll(closeHandlers(true));
273 result.addAll(closeConnections(true));
274 }
275 if (result.isEmpty()) {
276 return;
277 }
278 throw new BackendConnectionException(result);
279 }
280 }
281
282
283
284
285
286
287 public Collection<SQLException> closeAllResources() {
288 synchronized (closeLock) {
289 closed.set(true);
290 Collection<SQLException> result = new LinkedList<>();
291 result.addAll(closeHandlers(true));
292 result.addAll(closeConnections(true));
293 return result;
294 }
295 }
296
297
298
299
300
301
302
303 public Collection<SQLException> closeHandlers(final boolean includeInUse) {
304 Collection<SQLException> result = new LinkedList<>();
305 for (ProxyBackendHandler each : backendHandlers) {
306 if (!includeInUse && inUseBackendHandlers.contains(each)) {
307 continue;
308 }
309 try {
310 each.close();
311 } catch (final SQLException ex) {
312 result.add(ex);
313 }
314 }
315 if (includeInUse) {
316 inUseBackendHandlers.clear();
317 }
318 backendHandlers.retainAll(inUseBackendHandlers);
319 return result;
320 }
321
322
323
324
325
326
327
328 public Collection<SQLException> closeConnections(final boolean forceRollback) {
329 Collection<SQLException> result = new LinkedList<>();
330 synchronized (cachedConnections) {
331 resetSessionVariablesIfNecessary(cachedConnections.values(), result);
332 for (Connection each : cachedConnections.values()) {
333 try {
334 if (forceRollback && connectionSession.getTransactionStatus().isInTransaction()) {
335 each.rollback();
336 }
337 } catch (final SQLException ignored) {
338 } finally {
339 try {
340 each.close();
341 } catch (final SQLException ex) {
342 if (!isClosed(each)) {
343 log.warn("Close connection {} failed.", each, ex);
344 result.add(ex);
345 }
346 }
347 }
348 }
349 cachedConnections.clear();
350 }
351 if (!forceRollback) {
352 connectionPostProcessors.clear();
353 }
354 return result;
355 }
356
357 private boolean isClosed(final Connection connection) {
358 try {
359 if (connection.isClosed()) {
360 return true;
361 }
362 } catch (final SQLException ignored) {
363 }
364 return false;
365 }
366
367 private void resetSessionVariablesIfNecessary(final Collection<Connection> values, final Collection<SQLException> exceptions) {
368 if (connectionSession.getRequiredSessionVariableRecorder().isEmpty() || values.isEmpty()) {
369 return;
370 }
371 String databaseType;
372 try {
373 databaseType = values.iterator().next().getMetaData().getDatabaseProductName();
374 } catch (final SQLException ex) {
375 exceptions.add(ex);
376 return;
377 }
378 List<String> resetSQLs = connectionSession.getRequiredSessionVariableRecorder().toResetSQLs(databaseType);
379 for (Connection each : values) {
380 try (Statement statement = each.createStatement()) {
381 for (String eachResetSQL : resetSQLs) {
382 statement.execute(eachResetSQL);
383 }
384 } catch (final SQLException ex) {
385 exceptions.add(ex);
386 }
387 }
388 connectionSession.getRequiredSessionVariableRecorder().removeVariablesWithDefaultValue();
389 }
390 }