1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.api.type;
19
20 import com.google.common.base.Preconditions;
21 import lombok.EqualsAndHashCode;
22 import lombok.Getter;
23 import lombok.Setter;
24 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
25 import org.apache.shardingsphere.data.pipeline.spi.JdbcQueryPropertiesExtension;
26 import org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrlAppender;
27 import org.apache.shardingsphere.infra.database.core.connector.url.StandardJdbcUrlParser;
28 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
29 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
30 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
31 import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
32 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
33 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
34 import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
35
36 import java.util.Arrays;
37 import java.util.Collection;
38 import java.util.HashMap;
39 import java.util.LinkedList;
40 import java.util.Map;
41 import java.util.Optional;
42 import java.util.Properties;
43
44
45
46
47 @Getter
48 @EqualsAndHashCode(of = "parameter")
49 public final class ShardingSpherePipelineDataSourceConfiguration implements PipelineDataSourceConfiguration {
50
51 public static final String TYPE = "ShardingSphereJDBC";
52
53 private final String parameter;
54
55 private final YamlRootConfiguration rootConfig;
56
57 private final DatabaseType databaseType;
58
59 public ShardingSpherePipelineDataSourceConfiguration(final String param) {
60 rootConfig = YamlEngine.unmarshal(param, YamlRootConfiguration.class, true);
61
62 for (Map<String, Object> each : rootConfig.getDataSources().values()) {
63 each.remove("dataSourceProperties");
64 }
65 parameter = YamlEngine.marshal(rootConfig);
66 Map<String, Object> props = rootConfig.getDataSources().values().iterator().next();
67 databaseType = DatabaseTypeFactory.get(getJdbcUrl(props));
68 appendJdbcQueryProperties(databaseType);
69 adjustDataSourcePoolProperties(rootConfig.getDataSources());
70 }
71
72 public ShardingSpherePipelineDataSourceConfiguration(final YamlRootConfiguration rootConfig) {
73 this(YamlEngine.marshal(getYamlParameterConfiguration(rootConfig)));
74 }
75
76 private static YamlParameterConfiguration getYamlParameterConfiguration(final YamlRootConfiguration rootConfig) {
77 YamlParameterConfiguration result = new YamlParameterConfiguration();
78 result.setDatabaseName(rootConfig.getDatabaseName());
79 result.setDataSources(rootConfig.getDataSources());
80 result.setRules(rootConfig.getRules());
81 result.getProps().putAll(rootConfig.getProps());
82 return result;
83 }
84
85 private String getJdbcUrl(final Map<String, Object> props) {
86 Object result = props.getOrDefault("url", props.get("jdbcUrl"));
87 Preconditions.checkNotNull(result, "url or jdbcUrl is required.");
88 return result.toString();
89 }
90
91 private void appendJdbcQueryProperties(final DatabaseType databaseType) {
92 Optional<JdbcQueryPropertiesExtension> extension = DatabaseTypedSPILoader.findService(JdbcQueryPropertiesExtension.class, databaseType);
93 if (!extension.isPresent()) {
94 return;
95 }
96 StandardJdbcUrlParser standardJdbcUrlParser = new StandardJdbcUrlParser();
97 rootConfig.getDataSources().forEach((key, value) -> {
98 String jdbcUrlKey = value.containsKey("url") ? "url" : "jdbcUrl";
99 String jdbcUrl = value.get(jdbcUrlKey).toString();
100 Properties queryProps = standardJdbcUrlParser.parseQueryProperties(jdbcUrl.contains("?") ? jdbcUrl.substring(jdbcUrl.indexOf("?") + 1) : "");
101 extension.get().extendQueryProperties(queryProps);
102 value.replace(jdbcUrlKey, new JdbcUrlAppender().appendQueryProperties(jdbcUrl, queryProps));
103 });
104 }
105
106 private void adjustDataSourcePoolProperties(final Map<String, Map<String, Object>> dataSources) {
107 for (Map<String, Object> queryProps : dataSources.values()) {
108 for (String each : Arrays.asList("minPoolSize", "minimumIdle")) {
109 queryProps.put(each, "1");
110 }
111 }
112 }
113
114 @Override
115 public Object getDataSourceConfiguration() {
116 return rootConfig;
117 }
118
119 @Override
120 public String getType() {
121 return TYPE;
122 }
123
124
125
126
127
128
129
130 public StandardPipelineDataSourceConfiguration getActualDataSourceConfiguration(final String actualDataSourceName) {
131 Map<String, Object> yamlDataSourceConfig = rootConfig.getDataSources().get(actualDataSourceName);
132 Preconditions.checkNotNull(yamlDataSourceConfig, "actualDataSourceName '{}' does not exist", actualDataSourceName);
133 return new StandardPipelineDataSourceConfiguration(yamlDataSourceConfig);
134 }
135
136
137
138
139 @Getter
140 @Setter
141 private static class YamlParameterConfiguration implements YamlConfiguration {
142
143 private String databaseName;
144
145 private Map<String, Map<String, Object>> dataSources = new HashMap<>();
146
147 private Collection<YamlRuleConfiguration> rules = new LinkedList<>();
148
149 private Properties props = new Properties();
150 }
151 }