View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.datasource.creator;
19  
20  import org.apache.shardingsphere.authority.yaml.config.YamlAuthorityRuleConfiguration;
21  import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
22  import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
23  import org.apache.shardingsphere.driver.api.ShardingSphereDataSourceFactory;
24  import org.apache.shardingsphere.infra.algorithm.core.yaml.YamlAlgorithmConfiguration;
25  import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
26  import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
27  import org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
28  import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
29  import org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
30  import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
31  import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
32  import org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlModeConfiguration;
33  import org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlPersistRepositoryConfiguration;
34  import org.apache.shardingsphere.infra.yaml.config.swapper.mode.YamlModeConfigurationSwapper;
35  import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
36  import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
37  import org.apache.shardingsphere.mode.repository.standalone.jdbc.props.JDBCRepositoryPropertyKey;
38  import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
39  import org.apache.shardingsphere.sharding.yaml.swapper.ShardingRuleConfigurationConverter;
40  
41  import javax.sql.DataSource;
42  import java.sql.SQLException;
43  import java.util.Arrays;
44  import java.util.Collection;
45  import java.util.Map;
46  import java.util.Optional;
47  import java.util.Properties;
48  import java.util.concurrent.atomic.AtomicInteger;
49  
50  /**
51   * ShardingSphere pipeline data source creator.
52   */
53  public final class ShardingSpherePipelineDataSourceCreator implements PipelineDataSourceCreator {
54      
55      private static final AtomicInteger STANDALONE_DATABASE_ID = new AtomicInteger(1);
56      
57      @Override
58      public DataSource create(final Object dataSourceConfig) throws SQLException {
59          YamlRootConfiguration yamlRootConfig = YamlEngine.unmarshal(YamlEngine.marshal(dataSourceConfig), YamlRootConfiguration.class);
60          removeAuthorityRuleConfiguration(yamlRootConfig);
61          updateConfigurationProperties(yamlRootConfig);
62          updateShardingRuleConfiguration(yamlRootConfig);
63          yamlRootConfig.setMode(createStandaloneModeConfiguration());
64          return createShardingSphereDataSource(yamlRootConfig);
65      }
66      
67      private void removeAuthorityRuleConfiguration(final YamlRootConfiguration yamlRootConfig) {
68          yamlRootConfig.getRules().removeIf(YamlAuthorityRuleConfiguration.class::isInstance);
69      }
70      
71      private void updateConfigurationProperties(final YamlRootConfiguration yamlRootConfig) {
72          Properties newProps = new Properties();
73          for (String each : Arrays.asList(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE.getKey(), ConfigurationPropertyKey.SQL_SHOW.getKey())) {
74              Object value = yamlRootConfig.getProps().get(each);
75              if (null != value) {
76                  newProps.put(each, value);
77              }
78          }
79          newProps.put(TemporaryConfigurationPropertyKey.SYSTEM_SCHEMA_METADATA_ASSEMBLY_ENABLED.getKey(), String.valueOf(Boolean.FALSE));
80          // Set a large enough value to enable ConnectionMode.MEMORY_STRICTLY, make sure streaming query work.
81          newProps.put(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY.getKey(), 100000);
82          yamlRootConfig.setProps(newProps);
83      }
84      
85      private void updateShardingRuleConfiguration(final YamlRootConfiguration yamlRootConfig) {
86          Optional<YamlShardingRuleConfiguration> yamlShardingRuleConfig = ShardingRuleConfigurationConverter.findYamlShardingRuleConfiguration(yamlRootConfig.getRules());
87          if (yamlShardingRuleConfig.isPresent()) {
88              enableRangeQueryForInline(yamlShardingRuleConfig.get());
89              removeAuditStrategy(yamlShardingRuleConfig.get());
90          }
91      }
92      
93      private void enableRangeQueryForInline(final YamlShardingRuleConfiguration yamlShardingRuleConfig) {
94          for (YamlAlgorithmConfiguration each : yamlShardingRuleConfig.getShardingAlgorithms().values()) {
95              if ("INLINE".equalsIgnoreCase(each.getType())) {
96                  each.getProps().put("allow-range-query-with-inline-sharding", Boolean.TRUE.toString());
97              }
98          }
99      }
100     
101     private void removeAuditStrategy(final YamlShardingRuleConfiguration yamlShardingRuleConfig) {
102         yamlShardingRuleConfig.setDefaultAuditStrategy(null);
103         yamlShardingRuleConfig.setAuditors(null);
104         if (null != yamlShardingRuleConfig.getTables()) {
105             yamlShardingRuleConfig.getTables().forEach((key, value) -> value.setAuditStrategy(null));
106         }
107         if (null != yamlShardingRuleConfig.getAutoTables()) {
108             yamlShardingRuleConfig.getAutoTables().forEach((key, value) -> value.setAuditStrategy(null));
109         }
110     }
111     
112     private YamlModeConfiguration createStandaloneModeConfiguration() {
113         YamlModeConfiguration result = new YamlModeConfiguration();
114         result.setType("Standalone");
115         YamlPersistRepositoryConfiguration yamlRepositoryConfig = new YamlPersistRepositoryConfiguration();
116         yamlRepositoryConfig.setType("JDBC");
117         yamlRepositoryConfig.getProps().setProperty(JDBCRepositoryPropertyKey.JDBC_URL.getKey(),
118                 String.format("jdbc:h2:mem:pipeline_db_%d;DB_CLOSE_DELAY=0;DATABASE_TO_UPPER=false;MODE=MYSQL", STANDALONE_DATABASE_ID.getAndIncrement()));
119         result.setRepository(yamlRepositoryConfig);
120         return result;
121     }
122     
123     private DataSource createShardingSphereDataSource(final YamlRootConfiguration yamlRootConfig) throws SQLException {
124         Map<String, DataSource> dataSourceMap = new YamlDataSourceConfigurationSwapper().swapToDataSources(yamlRootConfig.getDataSources(), false);
125         try {
126             return createShardingSphereDataSource(dataSourceMap, yamlRootConfig);
127             // CHECKSTYLE:OFF
128         } catch (final SQLException | RuntimeException ex) {
129             // CHECKSTYLE:ON
130             dataSourceMap.values().stream().map(DataSourcePoolDestroyer::new).forEach(DataSourcePoolDestroyer::asyncDestroy);
131             throw ex;
132         }
133     }
134     
135     private DataSource createShardingSphereDataSource(final Map<String, DataSource> dataSourceMap, final YamlRootConfiguration yamlRootConfig) throws SQLException {
136         ModeConfiguration modeConfig = new YamlModeConfigurationSwapper().swapToObject(yamlRootConfig.getMode());
137         Collection<RuleConfiguration> ruleConfigs = new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(yamlRootConfig.getRules());
138         return ShardingSphereDataSourceFactory.createDataSource(yamlRootConfig.getDatabaseName(), modeConfig, dataSourceMap, ruleConfigs, yamlRootConfig.getProps());
139     }
140     
141     @Override
142     public String getType() {
143         return ShardingSpherePipelineDataSourceConfiguration.TYPE;
144     }
145 }