1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.driver.jdbc.core.connection;
19
20 import lombok.Getter;
21 import org.apache.shardingsphere.driver.exception.ConnectionClosedException;
22 import org.apache.shardingsphere.driver.jdbc.adapter.AbstractConnectionAdapter;
23 import org.apache.shardingsphere.driver.jdbc.adapter.executor.ForceExecuteTemplate;
24 import org.apache.shardingsphere.driver.jdbc.core.datasource.metadata.ShardingSphereDatabaseMetaData;
25 import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement;
26 import org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSphereStatement;
27 import org.apache.shardingsphere.driver.jdbc.core.statement.StatementManager;
28 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
29 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
30 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
31 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
32 import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
33 import org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext;
34 import org.apache.shardingsphere.mode.manager.ContextManager;
35 import org.apache.shardingsphere.transaction.ConnectionTransaction.DistributedTransactionOperationType;
36 import org.apache.shardingsphere.transaction.rule.TransactionRule;
37
38 import java.sql.DatabaseMetaData;
39 import java.sql.PreparedStatement;
40 import java.sql.SQLException;
41 import java.sql.SQLFeatureNotSupportedException;
42 import java.sql.Savepoint;
43 import java.sql.Statement;
44 import java.util.Collection;
45 import java.util.Optional;
46 import java.util.concurrent.ConcurrentLinkedQueue;
47 import java.util.concurrent.Executor;
48
49
50
51
52 @HighFrequencyInvocation
53 public final class ShardingSphereConnection extends AbstractConnectionAdapter {
54
55 private final ProcessEngine processEngine = new ProcessEngine();
56
57 private final ForceExecuteTemplate<StatementManager> forceExecuteTemplate = new ForceExecuteTemplate<>();
58
59 @Getter
60 private final String currentDatabaseName;
61
62 @Getter
63 private final ContextManager contextManager;
64
65 @Getter
66 private final DriverDatabaseConnectionManager databaseConnectionManager;
67
68 @Getter
69 private final Collection<StatementManager> statementManagers = new ConcurrentLinkedQueue<>();
70
71 @Getter
72 private final String processId;
73
74 private boolean autoCommit = true;
75
76 private int transactionIsolation = TRANSACTION_READ_UNCOMMITTED;
77
78 private boolean readOnly;
79
80 private volatile boolean closed;
81
82 public ShardingSphereConnection(final String currentDatabaseName, final ContextManager contextManager) {
83 this.currentDatabaseName = currentDatabaseName;
84 this.contextManager = contextManager;
85 databaseConnectionManager = new DriverDatabaseConnectionManager(currentDatabaseName, contextManager);
86 processId = processEngine.connect(currentDatabaseName);
87 }
88
89
90
91
92
93
94 public void beginTransactionIfNeededWhenAutoCommitFalse() throws SQLException {
95 if (autoCommit || databaseConnectionManager.getConnectionContext().getTransactionContext().isInTransaction()) {
96 return;
97 }
98 databaseConnectionManager.begin();
99 }
100
101 @Override
102 public DatabaseMetaData getMetaData() throws SQLException {
103 return new ShardingSphereDatabaseMetaData(this);
104 }
105
106 @Override
107 public PreparedStatement prepareStatement(final String sql) throws SQLException {
108 return new ShardingSpherePreparedStatement(this, sql);
109 }
110
111 @Override
112 public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
113 return new ShardingSpherePreparedStatement(this, sql, resultSetType, resultSetConcurrency);
114 }
115
116 @Override
117 public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
118 return new ShardingSpherePreparedStatement(this, sql, resultSetType, resultSetConcurrency, resultSetHoldability);
119 }
120
121 @Override
122 public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
123 return new ShardingSpherePreparedStatement(this, sql, autoGeneratedKeys);
124 }
125
126 @Override
127 public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException {
128 return new ShardingSpherePreparedStatement(this, sql, Statement.RETURN_GENERATED_KEYS);
129 }
130
131 @Override
132 public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException {
133 return new ShardingSpherePreparedStatement(this, sql, columnNames);
134 }
135
136 @Override
137 public Statement createStatement() {
138 return new ShardingSphereStatement(this);
139 }
140
141 @Override
142 public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {
143 return new ShardingSphereStatement(this, resultSetType, resultSetConcurrency);
144 }
145
146 @Override
147 public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
148 return new ShardingSphereStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability);
149 }
150
151 @Override
152 public boolean getAutoCommit() {
153 return autoCommit;
154 }
155
156 @Override
157 public void setAutoCommit(final boolean autoCommit) throws SQLException {
158 this.autoCommit = autoCommit;
159 if (databaseConnectionManager.getConnectionTransaction().isLocalTransaction()) {
160 processLocalTransaction();
161 } else {
162 processDistributedTransaction();
163 }
164 }
165
166 private void processLocalTransaction() throws SQLException {
167 databaseConnectionManager.setAutoCommit(autoCommit);
168 TransactionConnectionContext transactionContext = databaseConnectionManager.getConnectionContext().getTransactionContext();
169 if (autoCommit && transactionContext.isInTransaction()) {
170 transactionContext.close();
171 return;
172 }
173 if (!autoCommit && !transactionContext.isInTransaction()) {
174 transactionContext.beginTransaction(contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(TransactionRule.class).getDefaultType().name(),
175 databaseConnectionManager.getConnectionTransaction().getDistributedTransactionManager());
176 }
177 }
178
179 private void processDistributedTransaction() throws SQLException {
180 Optional<DistributedTransactionOperationType> operationType = databaseConnectionManager.getConnectionTransaction().getDistributedTransactionOperationType(autoCommit);
181 if (!operationType.isPresent()) {
182 return;
183 }
184 if (DistributedTransactionOperationType.BEGIN == operationType.get()) {
185 databaseConnectionManager.begin();
186 } else {
187 databaseConnectionManager.commit();
188 }
189 }
190
191 @Override
192 public void commit() throws SQLException {
193 databaseConnectionManager.commit();
194 }
195
196 @Override
197 public void rollback() throws SQLException {
198 databaseConnectionManager.rollback();
199 }
200
201 @Override
202 public void rollback(final Savepoint savepoint) throws SQLException {
203 checkClose();
204 ShardingSpherePreconditions.checkState(databaseConnectionManager.getConnectionTransaction().isHoldTransaction(autoCommit) || !isSchemaSupportedDatabaseType(),
205 () -> new SQLFeatureNotSupportedException("ROLLBACK TO SAVEPOINT can only be used in transaction blocks"));
206 databaseConnectionManager.rollback(savepoint);
207 }
208
209 @Override
210 public Savepoint setSavepoint(final String name) throws SQLException {
211 checkClose();
212 ShardingSpherePreconditions.checkState(databaseConnectionManager.getConnectionTransaction().isHoldTransaction(autoCommit) || !isSchemaSupportedDatabaseType(),
213 () -> new SQLFeatureNotSupportedException("Savepoint can only be used in transaction blocks"));
214 return databaseConnectionManager.setSavepoint(name);
215 }
216
217 @Override
218 public Savepoint setSavepoint() throws SQLException {
219 checkClose();
220 ShardingSpherePreconditions.checkState(databaseConnectionManager.getConnectionTransaction().isHoldTransaction(autoCommit) || !isSchemaSupportedDatabaseType(),
221 () -> new SQLFeatureNotSupportedException("Savepoint can only be used in transaction blocks"));
222 return databaseConnectionManager.setSavepoint();
223 }
224
225 @Override
226 public void releaseSavepoint(final Savepoint savepoint) throws SQLException {
227 checkClose();
228 ShardingSpherePreconditions.checkState(databaseConnectionManager.getConnectionTransaction().isHoldTransaction(autoCommit) || !isSchemaSupportedDatabaseType(),
229 () -> new SQLFeatureNotSupportedException("RELEASE SAVEPOINT can only be used in transaction blocks"));
230 if (databaseConnectionManager.getConnectionTransaction().isHoldTransaction(autoCommit)) {
231 databaseConnectionManager.releaseSavepoint(savepoint);
232 }
233 }
234
235 private void checkClose() throws SQLException {
236 ShardingSpherePreconditions.checkState(!isClosed(), () -> new ConnectionClosedException().toSQLException());
237 }
238
239 private boolean isSchemaSupportedDatabaseType() {
240 DatabaseType databaseType = contextManager.getMetaDataContexts().getMetaData().getDatabase(currentDatabaseName).getProtocolType();
241 return new DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().getSchemaOption().getDefaultSchema().isPresent();
242 }
243
244 @SuppressWarnings("MagicConstant")
245 @Override
246 public int getTransactionIsolation() throws SQLException {
247 return databaseConnectionManager.getTransactionIsolation().orElseGet(() -> transactionIsolation);
248 }
249
250 @Override
251 public void setTransactionIsolation(final int level) throws SQLException {
252 transactionIsolation = level;
253 databaseConnectionManager.setTransactionIsolation(level);
254 }
255
256 @Override
257 public boolean isReadOnly() {
258 return readOnly;
259 }
260
261 @Override
262 public void setReadOnly(final boolean readOnly) throws SQLException {
263 this.readOnly = readOnly;
264 databaseConnectionManager.setReadOnly(readOnly);
265 }
266
267
268
269
270
271 @Override
272 public int getNetworkTimeout() {
273 return 0;
274 }
275
276
277
278
279
280 @Override
281 public void setNetworkTimeout(final Executor executor, final int milliseconds) throws SQLException {
282 ShardingSpherePreconditions.checkState(0 <= milliseconds, () -> new SQLException("Network timeout must be a value greater than or equal to 0."));
283 }
284
285 @Override
286 public boolean isValid(final int timeout) throws SQLException {
287 return databaseConnectionManager.isValid(timeout);
288 }
289
290 @Override
291 public String getSchema() {
292 return currentDatabaseName;
293 }
294
295 @Override
296 public boolean isClosed() {
297 return closed;
298 }
299
300 @Override
301 public void close() throws SQLException {
302 if (databaseConnectionManager.getConnectionTransaction().isInDistributedTransaction(databaseConnectionManager.getConnectionContext().getTransactionContext())) {
303 databaseConnectionManager.getConnectionTransaction().rollback();
304 }
305 closed = true;
306 processEngine.disconnect(processId);
307 try {
308 forceExecuteTemplate.execute(statementManagers, StatementManager::close);
309 } finally {
310 statementManagers.clear();
311 databaseConnectionManager.close();
312 }
313 }
314 }