1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.datasource;
19
20 import lombok.extern.slf4j.Slf4j;
21 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
22
23 import java.sql.SQLException;
24 import java.util.Map;
25 import java.util.concurrent.ConcurrentHashMap;
26
27
28
29
30 @Slf4j
31 public final class PipelineDataSourceManager implements AutoCloseable {
32
33 private final Map<PipelineDataSourceConfiguration, PipelineDataSourceWrapper> cachedDataSources = new ConcurrentHashMap<>();
34
35
36
37
38
39
40
41 public PipelineDataSourceWrapper getDataSource(final PipelineDataSourceConfiguration dataSourceConfig) {
42 PipelineDataSourceWrapper result = cachedDataSources.get(dataSourceConfig);
43 if (null != result) {
44 return result;
45 }
46 synchronized (cachedDataSources) {
47 result = cachedDataSources.get(dataSourceConfig);
48 if (null != result) {
49 if (!result.isClosed()) {
50 return result;
51 }
52 log.warn("{} is already closed, create again.", result);
53 }
54 result = new PipelineDataSourceWrapper(dataSourceConfig);
55 cachedDataSources.put(dataSourceConfig, result);
56 return result;
57 }
58 }
59
60 @Override
61 public void close() {
62 for (PipelineDataSourceWrapper each : cachedDataSources.values()) {
63 if (each.isClosed()) {
64 continue;
65 }
66 try {
67 each.close();
68 } catch (final SQLException ex) {
69 log.error("An exception occurred while closing the data source.", ex);
70 }
71 }
72 cachedDataSources.clear();
73 }
74 }