1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.datasource.creator;
19
20 import org.apache.shardingsphere.authority.yaml.config.YamlAuthorityRuleConfiguration;
21 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
22 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
23 import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.PipelineYamlRuleConfigurationReviser;
24 import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
25 import org.apache.shardingsphere.driver.api.ShardingSphereDataSourceFactory;
26 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
27 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
28 import org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
29 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
30 import org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
31 import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
32 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
33 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
34 import org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlModeConfiguration;
35 import org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlPersistRepositoryConfiguration;
36 import org.apache.shardingsphere.infra.yaml.config.swapper.mode.YamlModeConfigurationSwapper;
37 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
38 import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
39 import org.apache.shardingsphere.mode.manager.ContextManager;
40
41 import javax.sql.DataSource;
42 import java.sql.SQLException;
43 import java.util.Collection;
44 import java.util.LinkedList;
45 import java.util.List;
46 import java.util.Map;
47 import java.util.Properties;
48
49
50
51
52 public final class ShardingSpherePipelineDataSourceCreator implements PipelineDataSourceCreator {
53
54 @Override
55 public DataSource create(final Object dataSourceConfig) throws SQLException {
56 YamlRootConfiguration yamlRootConfig = YamlEngine.unmarshal(YamlEngine.marshal(dataSourceConfig), YamlRootConfiguration.class);
57 removeAuthorityRuleConfiguration(yamlRootConfig);
58 yamlRootConfig.setProps(createConfigurationProperties());
59 reviseYamlRuleConfiguration(yamlRootConfig);
60 yamlRootConfig.setMode(createStandaloneModeConfiguration());
61 return createShardingSphereDataSource(yamlRootConfig);
62 }
63
64 private void removeAuthorityRuleConfiguration(final YamlRootConfiguration yamlRootConfig) {
65 yamlRootConfig.getRules().removeIf(YamlAuthorityRuleConfiguration.class::isInstance);
66 }
67
68 private Properties createConfigurationProperties() {
69 Properties realtimeProps = getRealtimeProperties();
70 Properties result = new Properties();
71 for (String each : getConfigurationPropertyKeys()) {
72 Object value = realtimeProps.get(each);
73 if (null != value) {
74 result.put(each, value);
75 }
76 }
77 result.put(TemporaryConfigurationPropertyKey.SYSTEM_SCHEMA_METADATA_ASSEMBLY_ENABLED.getKey(), String.valueOf(Boolean.FALSE));
78
79 result.put(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY.getKey(), 100000);
80 return result;
81 }
82
83 private Properties getRealtimeProperties() {
84 ContextManager contextManager = PipelineContextManager.getProxyContext();
85 if (null == contextManager) {
86 return new Properties();
87 }
88 return contextManager.getMetaDataContexts().getMetaData().getProps().getProps();
89 }
90
91 private List<String> getConfigurationPropertyKeys() {
92 List<String> result = new LinkedList<>();
93 result.add(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE.getKey());
94 result.add(ConfigurationPropertyKey.SQL_SHOW.getKey());
95 return result;
96 }
97
98 @SuppressWarnings("unchecked")
99 private void reviseYamlRuleConfiguration(final YamlRootConfiguration yamlRootConfig) {
100 OrderedSPILoader.getServices(PipelineYamlRuleConfigurationReviser.class, yamlRootConfig.getRules()).forEach((key, value) -> value.revise(key));
101 }
102
103 private YamlModeConfiguration createStandaloneModeConfiguration() {
104 YamlModeConfiguration result = new YamlModeConfiguration();
105 result.setType("Standalone");
106 YamlPersistRepositoryConfiguration yamlRepositoryConfig = new YamlPersistRepositoryConfiguration();
107 yamlRepositoryConfig.setType("Memory");
108 result.setRepository(yamlRepositoryConfig);
109 return result;
110 }
111
112 private DataSource createShardingSphereDataSource(final YamlRootConfiguration yamlRootConfig) throws SQLException {
113 Map<String, DataSource> dataSourceMap = new YamlDataSourceConfigurationSwapper().swapToDataSources(yamlRootConfig.getDataSources(), false);
114 try {
115 return createShardingSphereDataSource(dataSourceMap, yamlRootConfig);
116
117 } catch (final SQLException | RuntimeException ex) {
118
119 dataSourceMap.values().stream().map(DataSourcePoolDestroyer::new).forEach(DataSourcePoolDestroyer::asyncDestroy);
120 throw ex;
121 }
122 }
123
124 private DataSource createShardingSphereDataSource(final Map<String, DataSource> dataSourceMap, final YamlRootConfiguration yamlRootConfig) throws SQLException {
125 ModeConfiguration modeConfig = new YamlModeConfigurationSwapper().swapToObject(yamlRootConfig.getMode());
126 Collection<RuleConfiguration> ruleConfigs = new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(yamlRootConfig.getRules());
127 return ShardingSphereDataSourceFactory.createDataSource(yamlRootConfig.getDatabaseName(), modeConfig, dataSourceMap, ruleConfigs, yamlRootConfig.getProps());
128 }
129
130 @Override
131 public String getType() {
132 return ShardingSpherePipelineDataSourceConfiguration.TYPE;
133 }
134 }