1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.proxy.backend.connector;
19
20 import com.google.common.base.Preconditions;
21 import com.google.common.collect.LinkedHashMultimap;
22 import com.google.common.collect.Multimap;
23 import lombok.Getter;
24 import lombok.RequiredArgsConstructor;
25 import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
26 import org.apache.shardingsphere.infra.executor.sql.prepare.driver.OnlineDatabaseConnectionManager;
27 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
28 import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ConnectionPostProcessor;
29 import org.apache.shardingsphere.proxy.backend.connector.jdbc.connection.ResourceLock;
30 import org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction.BackendTransactionManager;
31 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
32 import org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
33 import org.apache.shardingsphere.proxy.backend.handler.ProxyBackendHandler;
34 import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
35 import org.apache.shardingsphere.proxy.backend.util.TransactionUtils;
36 import org.apache.shardingsphere.transaction.spi.TransactionHook;
37
38 import java.sql.Connection;
39 import java.sql.SQLException;
40 import java.sql.Statement;
41 import java.util.ArrayList;
42 import java.util.Collection;
43 import java.util.Collections;
44 import java.util.LinkedList;
45 import java.util.List;
46 import java.util.concurrent.ConcurrentHashMap;
47 import java.util.concurrent.atomic.AtomicBoolean;
48
49
50
51
52 @RequiredArgsConstructor
53 @Getter
54 public final class ProxyDatabaseConnectionManager implements OnlineDatabaseConnectionManager<Connection> {
55
56 private final ConnectionSession connectionSession;
57
58 private final Multimap<String, Connection> cachedConnections = LinkedHashMultimap.create();
59
60 private final Collection<ProxyBackendHandler> backendHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
61
62 private final Collection<ProxyBackendHandler> inUseBackendHandlers = Collections.newSetFromMap(new ConcurrentHashMap<>(64));
63
64 private final Collection<ConnectionPostProcessor> connectionPostProcessors = new LinkedList<>();
65
66 private final ResourceLock resourceLock = new ResourceLock();
67
68 private final AtomicBoolean closed = new AtomicBoolean(false);
69
70 private final Collection<TransactionHook> transactionHooks = ShardingSphereServiceLoader.getServiceInstances(TransactionHook.class);
71
72 @Override
73 public List<Connection> getConnections(final String dataSourceName, final int connectionOffset, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
74 Preconditions.checkNotNull(connectionSession.getDatabaseName(), "Current database name is null.");
75 Collection<Connection> connections;
76 synchronized (cachedConnections) {
77 connections = cachedConnections.get(connectionSession.getDatabaseName().toLowerCase() + "." + dataSourceName);
78 }
79 List<Connection> result;
80 int maxConnectionSize = connectionOffset + connectionSize;
81 if (connections.size() >= maxConnectionSize) {
82 result = new ArrayList<>(connections).subList(connectionOffset, maxConnectionSize);
83 } else if (connections.isEmpty()) {
84 Collection<Connection> newConnections = createNewConnections(dataSourceName, maxConnectionSize, connectionMode);
85 result = new ArrayList<>(newConnections).subList(connectionOffset, maxConnectionSize);
86 synchronized (cachedConnections) {
87 cachedConnections.putAll(connectionSession.getDatabaseName().toLowerCase() + "." + dataSourceName, newConnections);
88 }
89 executeTransactionHooksAfterCreateConnections(result);
90 } else {
91 List<Connection> allConnections = new ArrayList<>(maxConnectionSize);
92 allConnections.addAll(connections);
93 List<Connection> newConnections = createNewConnections(dataSourceName, maxConnectionSize - connections.size(), connectionMode);
94 allConnections.addAll(newConnections);
95 result = allConnections.subList(connectionOffset, maxConnectionSize);
96 synchronized (cachedConnections) {
97 cachedConnections.putAll(connectionSession.getDatabaseName().toLowerCase() + "." + dataSourceName, newConnections);
98 }
99 }
100 return result;
101 }
102
103 private void executeTransactionHooksAfterCreateConnections(final List<Connection> result) throws SQLException {
104 if (connectionSession.getTransactionStatus().isInTransaction()) {
105 for (TransactionHook each : transactionHooks) {
106 each.afterCreateConnections(result, connectionSession.getConnectionContext().getTransactionContext());
107 }
108 }
109 }
110
111 private List<Connection> createNewConnections(final String dataSourceName, final int connectionSize, final ConnectionMode connectionMode) throws SQLException {
112 List<Connection> result = ProxyContext.getInstance().getBackendDataSource().getConnections(connectionSession.getDatabaseName().toLowerCase(), dataSourceName, connectionSize, connectionMode);
113 setSessionVariablesIfNecessary(result);
114 for (Connection each : result) {
115 replayTransactionOption(each);
116 }
117 if (connectionSession.getTransactionStatus().isInTransaction()) {
118 for (Connection each : result) {
119 replayMethodsInvocation(each);
120 }
121 }
122 return result;
123 }
124
125 private void setSessionVariablesIfNecessary(final List<Connection> connections) throws SQLException {
126 if (connectionSession.getRequiredSessionVariableRecorder().isEmpty() || connections.isEmpty()) {
127 return;
128 }
129 String databaseType = connections.iterator().next().getMetaData().getDatabaseProductName();
130 List<String> setSQLs = connectionSession.getRequiredSessionVariableRecorder().toSetSQLs(databaseType);
131 try {
132 executeSetSessionVariables(connections, setSQLs);
133 } catch (final SQLException ex) {
134 releaseConnection(connections, ex);
135 throw ex;
136 }
137 }
138
139 private void executeSetSessionVariables(final List<Connection> connections, final List<String> setSQLs) throws SQLException {
140 for (Connection each : connections) {
141 try (Statement statement = each.createStatement()) {
142 for (String eachSetSQL : setSQLs) {
143 statement.execute(eachSetSQL);
144 }
145 }
146 }
147 }
148
149 private void releaseConnection(final List<Connection> connections, final SQLException sqlException) {
150 for (Connection each : connections) {
151 try {
152 each.close();
153 } catch (final SQLException ex) {
154 sqlException.setNextException(ex);
155 }
156 }
157 }
158
159 private void replayMethodsInvocation(final Connection target) throws SQLException {
160 for (ConnectionPostProcessor each : connectionPostProcessors) {
161 each.process(target);
162 }
163 }
164
165 private void replayTransactionOption(final Connection connection) throws SQLException {
166 if (null == connection) {
167 return;
168 }
169 if (connectionSession.isReadOnly()) {
170 connection.setReadOnly(true);
171 }
172 if (null != connectionSession.getIsolationLevel()) {
173 connection.setTransactionIsolation(TransactionUtils.getTransactionIsolationLevel(connectionSession.getIsolationLevel()));
174 }
175 }
176
177
178
179
180
181
182 public Collection<String> getUsedDataSourceNames() {
183 Collection<String> result = new ArrayList<>(cachedConnections.size());
184 String databaseName = connectionSession.getDatabaseName().toLowerCase();
185 for (String each : cachedConnections.keySet()) {
186 String[] split = each.split("\\.", 2);
187 String cachedDatabaseName = split[0];
188 String cachedDataSourceName = split[1];
189 if (databaseName.equals(cachedDatabaseName)) {
190 result.add(cachedDataSourceName);
191 }
192 }
193 return result;
194 }
195
196
197
198
199
200
201 public int getConnectionSize() {
202 return cachedConnections.values().size();
203 }
204
205
206
207
208
209
210 public void add(final ProxyBackendHandler handler) {
211 backendHandlers.add(handler);
212 }
213
214
215
216
217
218
219 public void markResourceInUse(final ProxyBackendHandler handler) {
220 inUseBackendHandlers.add(handler);
221 }
222
223
224
225
226
227
228 public void unmarkResourceInUse(final ProxyBackendHandler handler) {
229 inUseBackendHandlers.remove(handler);
230 }
231
232
233
234
235 public void handleAutoCommit() {
236 if (!connectionSession.isAutoCommit() && !connectionSession.getTransactionStatus().isInTransaction()) {
237 BackendTransactionManager transactionManager = new BackendTransactionManager(this);
238 transactionManager.begin();
239 }
240 }
241
242
243
244
245
246
247 public void closeExecutionResources() throws BackendConnectionException {
248 synchronized (this) {
249 Collection<Exception> result = new LinkedList<>(closeHandlers(false));
250 if (!connectionSession.getTransactionStatus().isInConnectionHeldTransaction(TransactionUtils.getTransactionType(connectionSession.getConnectionContext().getTransactionContext()))) {
251 result.addAll(closeHandlers(true));
252 result.addAll(closeConnections(false));
253 } else if (closed.get()) {
254 result.addAll(closeHandlers(true));
255 result.addAll(closeConnections(true));
256 }
257 if (result.isEmpty()) {
258 return;
259 }
260 throw new BackendConnectionException(result);
261 }
262 }
263
264
265
266
267 public void closeAllResources() {
268 synchronized (this) {
269 closed.set(true);
270 closeHandlers(true);
271 closeConnections(true);
272 }
273 }
274
275
276
277
278
279
280
281 public Collection<SQLException> closeHandlers(final boolean includeInUse) {
282 Collection<SQLException> result = new LinkedList<>();
283 for (ProxyBackendHandler each : backendHandlers) {
284 if (!includeInUse && inUseBackendHandlers.contains(each)) {
285 continue;
286 }
287 try {
288 each.close();
289 } catch (final SQLException ex) {
290 result.add(ex);
291 }
292 }
293 if (includeInUse) {
294 inUseBackendHandlers.clear();
295 }
296 backendHandlers.retainAll(inUseBackendHandlers);
297 return result;
298 }
299
300
301
302
303
304
305
306 public Collection<SQLException> closeConnections(final boolean forceRollback) {
307 Collection<SQLException> result = new LinkedList<>();
308 synchronized (cachedConnections) {
309 resetSessionVariablesIfNecessary(cachedConnections.values(), result);
310 for (Connection each : cachedConnections.values()) {
311 try {
312 if (forceRollback && connectionSession.getTransactionStatus().isInTransaction()) {
313 each.rollback();
314 }
315 each.close();
316 } catch (final SQLException ex) {
317 result.add(ex);
318 }
319 }
320 cachedConnections.clear();
321 }
322 if (!forceRollback) {
323 connectionPostProcessors.clear();
324 }
325 return result;
326 }
327
328 private void resetSessionVariablesIfNecessary(final Collection<Connection> values, final Collection<SQLException> exceptions) {
329 if (connectionSession.getRequiredSessionVariableRecorder().isEmpty() || values.isEmpty()) {
330 return;
331 }
332 String databaseType;
333 try {
334 databaseType = values.iterator().next().getMetaData().getDatabaseProductName();
335 } catch (final SQLException ex) {
336 exceptions.add(ex);
337 return;
338 }
339 List<String> resetSQLs = connectionSession.getRequiredSessionVariableRecorder().toResetSQLs(databaseType);
340 for (Connection each : values) {
341 try (Statement statement = each.createStatement()) {
342 for (String eachResetSQL : resetSQLs) {
343 statement.execute(eachResetSQL);
344 }
345 } catch (final SQLException ex) {
346 exceptions.add(ex);
347 }
348 }
349 connectionSession.getRequiredSessionVariableRecorder().removeVariablesWithDefaultValue();
350 }
351 }