1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.core.datasource.creator;
19
20 import org.apache.shardingsphere.authority.yaml.config.YamlAuthorityRuleConfiguration;
21 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
22 import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
23 import org.apache.shardingsphere.driver.api.ShardingSphereDataSourceFactory;
24 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
25 import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
26 import org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
27 import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
28 import org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
29 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
30 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
31 import org.apache.shardingsphere.infra.algorithm.core.yaml.YamlAlgorithmConfiguration;
32 import org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlModeConfiguration;
33 import org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlPersistRepositoryConfiguration;
34 import org.apache.shardingsphere.infra.yaml.config.swapper.mode.YamlModeConfigurationSwapper;
35 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
36 import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
37 import org.apache.shardingsphere.mode.repository.standalone.jdbc.props.JDBCRepositoryPropertyKey;
38 import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
39 import org.apache.shardingsphere.sharding.yaml.swapper.ShardingRuleConfigurationConverter;
40 import org.apache.shardingsphere.single.api.constant.SingleTableConstants;
41 import org.apache.shardingsphere.single.yaml.config.pojo.YamlSingleRuleConfiguration;
42
43 import javax.sql.DataSource;
44 import java.sql.SQLException;
45 import java.util.Collection;
46 import java.util.Collections;
47 import java.util.Map;
48 import java.util.Optional;
49 import java.util.Properties;
50 import java.util.concurrent.atomic.AtomicInteger;
51
52
53
54
55 public final class ShardingSpherePipelineDataSourceCreator implements PipelineDataSourceCreator {
56
57 private static final AtomicInteger STANDALONE_DATABASE_ID = new AtomicInteger(1);
58
59 @Override
60 public DataSource create(final Object dataSourceConfig) throws SQLException {
61 YamlRootConfiguration rootConfig = YamlEngine.unmarshal(YamlEngine.marshal(dataSourceConfig), YamlRootConfiguration.class);
62 removeAuthorityRule(rootConfig);
63 updateSingleRuleConfiguration(rootConfig);
64 disableSystemSchemaMetadata(rootConfig);
65 enableStreamingQuery(rootConfig);
66 Optional<YamlShardingRuleConfiguration> yamlShardingRuleConfig = ShardingRuleConfigurationConverter.findYamlShardingRuleConfiguration(rootConfig.getRules());
67 if (yamlShardingRuleConfig.isPresent()) {
68 enableRangeQueryForInline(yamlShardingRuleConfig.get());
69 removeAuditStrategy(yamlShardingRuleConfig.get());
70 }
71 rootConfig.setMode(createStandaloneModeConfiguration());
72 return createDataSourceWithoutCache(rootConfig);
73 }
74
75 private void removeAuthorityRule(final YamlRootConfiguration rootConfig) {
76 rootConfig.getRules().stream().filter(YamlAuthorityRuleConfiguration.class::isInstance).findFirst().map(each -> rootConfig.getRules().remove(each));
77 }
78
79 private void updateSingleRuleConfiguration(final YamlRootConfiguration rootConfig) {
80 rootConfig.getRules().removeIf(YamlSingleRuleConfiguration.class::isInstance);
81 YamlSingleRuleConfiguration singleRuleConfig = new YamlSingleRuleConfiguration();
82 singleRuleConfig.setTables(Collections.singletonList(SingleTableConstants.ALL_TABLES));
83 rootConfig.getRules().add(singleRuleConfig);
84 }
85
86 private void disableSystemSchemaMetadata(final YamlRootConfiguration rootConfig) {
87 rootConfig.getProps().put(TemporaryConfigurationPropertyKey.SYSTEM_SCHEMA_METADATA_ENABLED.getKey(), String.valueOf(Boolean.FALSE));
88 }
89
90
91 private void enableStreamingQuery(final YamlRootConfiguration rootConfig) {
92
93 rootConfig.getProps().put(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY.getKey(), 100000);
94 }
95
96 private void enableRangeQueryForInline(final YamlShardingRuleConfiguration yamlShardingRuleConfig) {
97 for (YamlAlgorithmConfiguration each : yamlShardingRuleConfig.getShardingAlgorithms().values()) {
98 if ("INLINE".equalsIgnoreCase(each.getType())) {
99 each.getProps().put("allow-range-query-with-inline-sharding", Boolean.TRUE.toString());
100 }
101 }
102 }
103
104 private void removeAuditStrategy(final YamlShardingRuleConfiguration yamlShardingRuleConfig) {
105 yamlShardingRuleConfig.setDefaultAuditStrategy(null);
106 yamlShardingRuleConfig.setAuditors(null);
107 if (null != yamlShardingRuleConfig.getTables()) {
108 yamlShardingRuleConfig.getTables().forEach((key, value) -> value.setAuditStrategy(null));
109 }
110 if (null != yamlShardingRuleConfig.getAutoTables()) {
111 yamlShardingRuleConfig.getAutoTables().forEach((key, value) -> value.setAuditStrategy(null));
112 }
113 }
114
115 private YamlModeConfiguration createStandaloneModeConfiguration() {
116 YamlModeConfiguration result = new YamlModeConfiguration();
117 result.setType("Standalone");
118 YamlPersistRepositoryConfiguration repository = new YamlPersistRepositoryConfiguration();
119 result.setRepository(repository);
120 repository.setType("JDBC");
121 Properties props = new Properties();
122 repository.setProps(props);
123 props.setProperty(JDBCRepositoryPropertyKey.JDBC_URL.getKey(),
124 String.format("jdbc:h2:mem:pipeline_db_%d;DB_CLOSE_DELAY=0;DATABASE_TO_UPPER=false;MODE=MYSQL", STANDALONE_DATABASE_ID.getAndIncrement()));
125 return result;
126 }
127
128 private DataSource createDataSourceWithoutCache(final YamlRootConfiguration rootConfig) throws SQLException {
129 Map<String, DataSource> dataSourceMap = new YamlDataSourceConfigurationSwapper().swapToDataSources(rootConfig.getDataSources(), false);
130 try {
131 return createDataSource(dataSourceMap, rootConfig);
132
133 } catch (final SQLException | RuntimeException ex) {
134
135 dataSourceMap.values().stream().map(DataSourcePoolDestroyer::new).forEach(DataSourcePoolDestroyer::asyncDestroy);
136 throw ex;
137 }
138 }
139
140 private DataSource createDataSource(final Map<String, DataSource> dataSourceMap, final YamlRootConfiguration rootConfig) throws SQLException {
141 ModeConfiguration modeConfig = null == rootConfig.getMode() ? null : new YamlModeConfigurationSwapper().swapToObject(rootConfig.getMode());
142 Collection<RuleConfiguration> ruleConfigs = new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(rootConfig.getRules());
143 return ShardingSphereDataSourceFactory.createDataSource(rootConfig.getDatabaseName(), modeConfig, dataSourceMap, ruleConfigs, rootConfig.getProps());
144 }
145
146 @Override
147 public String getType() {
148 return ShardingSpherePipelineDataSourceConfiguration.TYPE;
149 }
150 }