1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.datasource;
19
20 import lombok.Getter;
21 import lombok.RequiredArgsConstructor;
22 import lombok.SneakyThrows;
23 import lombok.extern.slf4j.Slf4j;
24 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
25 import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
26 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
27 import org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
28 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
29
30 import javax.sql.DataSource;
31 import java.io.PrintWriter;
32 import java.sql.Connection;
33 import java.sql.SQLException;
34 import java.sql.SQLFeatureNotSupportedException;
35 import java.util.concurrent.atomic.AtomicBoolean;
36 import java.util.logging.Logger;
37
38
39
40
41 @RequiredArgsConstructor
42 @Slf4j
43 public final class PipelineDataSourceWrapper implements DataSource, AutoCloseable {
44
45 private final DataSource dataSource;
46
47 @Getter
48 private final DatabaseType databaseType;
49
50 private final AtomicBoolean closed = new AtomicBoolean(false);
51
52 @SneakyThrows(SQLException.class)
53 public PipelineDataSourceWrapper(final PipelineDataSourceConfiguration pipelineDataSourceConfig) {
54 dataSource = TypedSPILoader.getService(PipelineDataSourceCreator.class, pipelineDataSourceConfig.getType()).create(pipelineDataSourceConfig.getDataSourceConfiguration());
55 databaseType = pipelineDataSourceConfig.getDatabaseType();
56 }
57
58
59
60
61
62
63 public boolean isClosed() {
64 return closed.get();
65 }
66
67 @Override
68 public Connection getConnection() throws SQLException {
69 return dataSource.getConnection();
70 }
71
72 @Override
73 public Connection getConnection(final String username, final String password) throws SQLException {
74 return dataSource.getConnection(username, password);
75 }
76
77 @Override
78 public <T> T unwrap(final Class<T> iface) throws SQLException {
79 return dataSource.unwrap(iface);
80 }
81
82 @Override
83 public boolean isWrapperFor(final Class<?> iface) throws SQLException {
84 return dataSource.isWrapperFor(iface);
85 }
86
87 @Override
88 public PrintWriter getLogWriter() throws SQLException {
89 return dataSource.getLogWriter();
90 }
91
92 @Override
93 public void setLogWriter(final PrintWriter out) throws SQLException {
94 dataSource.setLogWriter(out);
95 }
96
97 @Override
98 public void setLoginTimeout(final int seconds) throws SQLException {
99 dataSource.setLoginTimeout(seconds);
100 }
101
102 @Override
103 public int getLoginTimeout() throws SQLException {
104 return dataSource.getLoginTimeout();
105 }
106
107 @Override
108 public Logger getParentLogger() throws SQLFeatureNotSupportedException {
109 return dataSource.getParentLogger();
110 }
111
112 @Override
113 public void close() throws SQLException {
114 if (closed.get()) {
115 return;
116 }
117 if (!(dataSource instanceof AutoCloseable)) {
118 log.warn("Data source is not closed, it might cause connection leak, data source: {}", dataSource);
119 return;
120 }
121 try {
122 new DataSourcePoolDestroyer(dataSource).asyncDestroy();
123 closed.set(true);
124
125 } catch (final RuntimeException ex) {
126
127 throw new SQLException("Data source close failed.", ex);
128 }
129 }
130 }