1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.datasource.creator;
19
20 import org.apache.shardingsphere.authority.yaml.config.YamlAuthorityRuleConfiguration;
21 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
22 import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
23 import org.apache.shardingsphere.driver.api.ShardingSphereDataSourceFactory;
24 import org.apache.shardingsphere.infra.algorithm.core.yaml.YamlAlgorithmConfiguration;
25 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
26 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
27 import org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
28 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
29 import org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
30 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
31 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
32 import org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlModeConfiguration;
33 import org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlPersistRepositoryConfiguration;
34 import org.apache.shardingsphere.infra.yaml.config.swapper.mode.YamlModeConfigurationSwapper;
35 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
36 import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
37 import org.apache.shardingsphere.mode.repository.standalone.jdbc.props.JDBCRepositoryPropertyKey;
38 import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
39 import org.apache.shardingsphere.sharding.yaml.swapper.ShardingRuleConfigurationConverter;
40
41 import javax.sql.DataSource;
42 import java.sql.SQLException;
43 import java.util.Arrays;
44 import java.util.Collection;
45 import java.util.Map;
46 import java.util.Optional;
47 import java.util.Properties;
48 import java.util.concurrent.atomic.AtomicInteger;
49
50
51
52
53 public final class ShardingSpherePipelineDataSourceCreator implements PipelineDataSourceCreator {
54
55 private static final AtomicInteger STANDALONE_DATABASE_ID = new AtomicInteger(1);
56
57 @Override
58 public DataSource create(final Object dataSourceConfig) throws SQLException {
59 YamlRootConfiguration yamlRootConfig = YamlEngine.unmarshal(YamlEngine.marshal(dataSourceConfig), YamlRootConfiguration.class);
60 removeAuthorityRuleConfiguration(yamlRootConfig);
61 updateConfigurationProperties(yamlRootConfig);
62 updateShardingRuleConfiguration(yamlRootConfig);
63 yamlRootConfig.setMode(createStandaloneModeConfiguration());
64 return createShardingSphereDataSource(yamlRootConfig);
65 }
66
67 private void removeAuthorityRuleConfiguration(final YamlRootConfiguration yamlRootConfig) {
68 yamlRootConfig.getRules().removeIf(YamlAuthorityRuleConfiguration.class::isInstance);
69 }
70
71 private void updateConfigurationProperties(final YamlRootConfiguration yamlRootConfig) {
72 Properties newProps = new Properties();
73 for (String each : Arrays.asList(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE.getKey(), ConfigurationPropertyKey.SQL_SHOW.getKey())) {
74 Object value = yamlRootConfig.getProps().get(each);
75 if (null != value) {
76 newProps.put(each, value);
77 }
78 }
79 newProps.put(TemporaryConfigurationPropertyKey.SYSTEM_SCHEMA_METADATA_ASSEMBLY_ENABLED.getKey(), String.valueOf(Boolean.FALSE));
80
81 newProps.put(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY.getKey(), 100000);
82 yamlRootConfig.setProps(newProps);
83 }
84
85 private void updateShardingRuleConfiguration(final YamlRootConfiguration yamlRootConfig) {
86 Optional<YamlShardingRuleConfiguration> yamlShardingRuleConfig = ShardingRuleConfigurationConverter.findYamlShardingRuleConfiguration(yamlRootConfig.getRules());
87 if (yamlShardingRuleConfig.isPresent()) {
88 enableRangeQueryForInline(yamlShardingRuleConfig.get());
89 removeAuditStrategy(yamlShardingRuleConfig.get());
90 }
91 }
92
93 private void enableRangeQueryForInline(final YamlShardingRuleConfiguration yamlShardingRuleConfig) {
94 for (YamlAlgorithmConfiguration each : yamlShardingRuleConfig.getShardingAlgorithms().values()) {
95 if ("INLINE".equalsIgnoreCase(each.getType())) {
96 each.getProps().put("allow-range-query-with-inline-sharding", Boolean.TRUE.toString());
97 }
98 }
99 }
100
101 private void removeAuditStrategy(final YamlShardingRuleConfiguration yamlShardingRuleConfig) {
102 yamlShardingRuleConfig.setDefaultAuditStrategy(null);
103 yamlShardingRuleConfig.setAuditors(null);
104 if (null != yamlShardingRuleConfig.getTables()) {
105 yamlShardingRuleConfig.getTables().forEach((key, value) -> value.setAuditStrategy(null));
106 }
107 if (null != yamlShardingRuleConfig.getAutoTables()) {
108 yamlShardingRuleConfig.getAutoTables().forEach((key, value) -> value.setAuditStrategy(null));
109 }
110 }
111
112 private YamlModeConfiguration createStandaloneModeConfiguration() {
113 YamlModeConfiguration result = new YamlModeConfiguration();
114 result.setType("Standalone");
115 YamlPersistRepositoryConfiguration yamlRepositoryConfig = new YamlPersistRepositoryConfiguration();
116 yamlRepositoryConfig.setType("JDBC");
117 yamlRepositoryConfig.getProps().setProperty(JDBCRepositoryPropertyKey.JDBC_URL.getKey(),
118 String.format("jdbc:h2:mem:pipeline_db_%d;DB_CLOSE_DELAY=0;DATABASE_TO_UPPER=false;MODE=MYSQL", STANDALONE_DATABASE_ID.getAndIncrement()));
119 result.setRepository(yamlRepositoryConfig);
120 return result;
121 }
122
123 private DataSource createShardingSphereDataSource(final YamlRootConfiguration yamlRootConfig) throws SQLException {
124 Map<String, DataSource> dataSourceMap = new YamlDataSourceConfigurationSwapper().swapToDataSources(yamlRootConfig.getDataSources(), false);
125 try {
126 return createShardingSphereDataSource(dataSourceMap, yamlRootConfig);
127
128 } catch (final SQLException | RuntimeException ex) {
129
130 dataSourceMap.values().stream().map(DataSourcePoolDestroyer::new).forEach(DataSourcePoolDestroyer::asyncDestroy);
131 throw ex;
132 }
133 }
134
135 private DataSource createShardingSphereDataSource(final Map<String, DataSource> dataSourceMap, final YamlRootConfiguration yamlRootConfig) throws SQLException {
136 ModeConfiguration modeConfig = new YamlModeConfigurationSwapper().swapToObject(yamlRootConfig.getMode());
137 Collection<RuleConfiguration> ruleConfigs = new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(yamlRootConfig.getRules());
138 return ShardingSphereDataSourceFactory.createDataSource(yamlRootConfig.getDatabaseName(), modeConfig, dataSourceMap, ruleConfigs, yamlRootConfig.getProps());
139 }
140
141 @Override
142 public String getType() {
143 return ShardingSpherePipelineDataSourceConfiguration.TYPE;
144 }
145 }