View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.datasource;
19  
20  import lombok.Getter;
21  import lombok.RequiredArgsConstructor;
22  import lombok.SneakyThrows;
23  import lombok.extern.slf4j.Slf4j;
24  import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
25  import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
26  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
27  import org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
28  import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
29  
30  import javax.sql.DataSource;
31  import java.io.PrintWriter;
32  import java.sql.Connection;
33  import java.sql.SQLException;
34  import java.sql.SQLFeatureNotSupportedException;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.logging.Logger;
37  
38  /**
39   * Pipeline data source wrapper.
40   */
41  @RequiredArgsConstructor
42  @Slf4j
43  public final class PipelineDataSourceWrapper implements DataSource, AutoCloseable {
44      
45      private final DataSource dataSource;
46      
47      @Getter
48      private final DatabaseType databaseType;
49      
50      private final AtomicBoolean closed = new AtomicBoolean(false);
51      
52      @SneakyThrows(SQLException.class)
53      public PipelineDataSourceWrapper(final PipelineDataSourceConfiguration pipelineDataSourceConfig) {
54          dataSource = TypedSPILoader.getService(PipelineDataSourceCreator.class, pipelineDataSourceConfig.getType()).create(pipelineDataSourceConfig.getDataSourceConfiguration());
55          databaseType = pipelineDataSourceConfig.getDatabaseType();
56      }
57      
58      /**
59       * Whether underlying data source is closed or not.
60       *
61       * @return true if closed
62       */
63      public boolean isClosed() {
64          return closed.get();
65      }
66      
67      @Override
68      public Connection getConnection() throws SQLException {
69          return dataSource.getConnection();
70      }
71      
72      @Override
73      public Connection getConnection(final String username, final String password) throws SQLException {
74          return dataSource.getConnection(username, password);
75      }
76      
77      @Override
78      public <T> T unwrap(final Class<T> iface) throws SQLException {
79          return dataSource.unwrap(iface);
80      }
81      
82      @Override
83      public boolean isWrapperFor(final Class<?> iface) throws SQLException {
84          return dataSource.isWrapperFor(iface);
85      }
86      
87      @Override
88      public PrintWriter getLogWriter() throws SQLException {
89          return dataSource.getLogWriter();
90      }
91      
92      @Override
93      public void setLogWriter(final PrintWriter out) throws SQLException {
94          dataSource.setLogWriter(out);
95      }
96      
97      @Override
98      public void setLoginTimeout(final int seconds) throws SQLException {
99          dataSource.setLoginTimeout(seconds);
100     }
101     
102     @Override
103     public int getLoginTimeout() throws SQLException {
104         return dataSource.getLoginTimeout();
105     }
106     
107     @Override
108     public Logger getParentLogger() throws SQLFeatureNotSupportedException {
109         return dataSource.getParentLogger();
110     }
111     
112     @Override
113     public void close() throws SQLException {
114         if (closed.get()) {
115             return;
116         }
117         if (!(dataSource instanceof AutoCloseable)) {
118             log.warn("Data source is not closed, it might cause connection leak, data source: {}", dataSource);
119             return;
120         }
121         try {
122             new DataSourcePoolDestroyer(dataSource).asyncDestroy();
123             closed.set(true);
124             // CHECKSTYLE:OFF
125         } catch (final RuntimeException ex) {
126             // CHECKSTYLE:ON
127             throw new SQLException("Data source close failed.", ex);
128         }
129     }
130 }