View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.datasource.creator;
19  
20  import org.apache.shardingsphere.authority.yaml.config.YamlAuthorityRuleConfiguration;
21  import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
22  import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
23  import org.apache.shardingsphere.driver.api.ShardingSphereDataSourceFactory;
24  import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
25  import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
26  import org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
27  import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
28  import org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
29  import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
30  import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
31  import org.apache.shardingsphere.infra.algorithm.core.yaml.YamlAlgorithmConfiguration;
32  import org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlModeConfiguration;
33  import org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlPersistRepositoryConfiguration;
34  import org.apache.shardingsphere.infra.yaml.config.swapper.mode.YamlModeConfigurationSwapper;
35  import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
36  import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
37  import org.apache.shardingsphere.mode.repository.standalone.jdbc.props.JDBCRepositoryPropertyKey;
38  import org.apache.shardingsphere.sharding.yaml.config.YamlShardingRuleConfiguration;
39  import org.apache.shardingsphere.sharding.yaml.swapper.ShardingRuleConfigurationConverter;
40  import org.apache.shardingsphere.single.api.constant.SingleTableConstants;
41  import org.apache.shardingsphere.single.yaml.config.pojo.YamlSingleRuleConfiguration;
42  
43  import javax.sql.DataSource;
44  import java.sql.SQLException;
45  import java.util.Collection;
46  import java.util.Collections;
47  import java.util.Map;
48  import java.util.Optional;
49  import java.util.Properties;
50  import java.util.concurrent.atomic.AtomicInteger;
51  
52  /**
53   * ShardingSphere pipeline data source creator.
54   */
55  public final class ShardingSpherePipelineDataSourceCreator implements PipelineDataSourceCreator {
56      
57      private static final AtomicInteger STANDALONE_DATABASE_ID = new AtomicInteger(1);
58      
59      @Override
60      public DataSource create(final Object dataSourceConfig) throws SQLException {
61          YamlRootConfiguration rootConfig = YamlEngine.unmarshal(YamlEngine.marshal(dataSourceConfig), YamlRootConfiguration.class);
62          removeAuthorityRule(rootConfig);
63          updateSingleRuleConfiguration(rootConfig);
64          disableSystemSchemaMetadata(rootConfig);
65          enableStreamingQuery(rootConfig);
66          Optional<YamlShardingRuleConfiguration> yamlShardingRuleConfig = ShardingRuleConfigurationConverter.findYamlShardingRuleConfiguration(rootConfig.getRules());
67          if (yamlShardingRuleConfig.isPresent()) {
68              enableRangeQueryForInline(yamlShardingRuleConfig.get());
69              removeAuditStrategy(yamlShardingRuleConfig.get());
70          }
71          rootConfig.setMode(createStandaloneModeConfiguration());
72          return createDataSourceWithoutCache(rootConfig);
73      }
74      
75      private void removeAuthorityRule(final YamlRootConfiguration rootConfig) {
76          rootConfig.getRules().stream().filter(YamlAuthorityRuleConfiguration.class::isInstance).findFirst().map(each -> rootConfig.getRules().remove(each));
77      }
78      
79      private void updateSingleRuleConfiguration(final YamlRootConfiguration rootConfig) {
80          rootConfig.getRules().removeIf(YamlSingleRuleConfiguration.class::isInstance);
81          YamlSingleRuleConfiguration singleRuleConfig = new YamlSingleRuleConfiguration();
82          singleRuleConfig.setTables(Collections.singletonList(SingleTableConstants.ALL_TABLES));
83          rootConfig.getRules().add(singleRuleConfig);
84      }
85      
86      private void disableSystemSchemaMetadata(final YamlRootConfiguration rootConfig) {
87          rootConfig.getProps().put(TemporaryConfigurationPropertyKey.SYSTEM_SCHEMA_METADATA_ENABLED.getKey(), String.valueOf(Boolean.FALSE));
88      }
89      
90      // TODO Another way is improving ExecuteQueryCallback.executeSQL to enable streaming query, then remove it
91      private void enableStreamingQuery(final YamlRootConfiguration rootConfig) {
92          // Set a large enough value to enable ConnectionMode.MEMORY_STRICTLY, make sure streaming query work.
93          rootConfig.getProps().put(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY.getKey(), 100000);
94      }
95      
96      private void enableRangeQueryForInline(final YamlShardingRuleConfiguration yamlShardingRuleConfig) {
97          for (YamlAlgorithmConfiguration each : yamlShardingRuleConfig.getShardingAlgorithms().values()) {
98              if ("INLINE".equalsIgnoreCase(each.getType())) {
99                  each.getProps().put("allow-range-query-with-inline-sharding", Boolean.TRUE.toString());
100             }
101         }
102     }
103     
104     private void removeAuditStrategy(final YamlShardingRuleConfiguration yamlShardingRuleConfig) {
105         yamlShardingRuleConfig.setDefaultAuditStrategy(null);
106         yamlShardingRuleConfig.setAuditors(null);
107         if (null != yamlShardingRuleConfig.getTables()) {
108             yamlShardingRuleConfig.getTables().forEach((key, value) -> value.setAuditStrategy(null));
109         }
110         if (null != yamlShardingRuleConfig.getAutoTables()) {
111             yamlShardingRuleConfig.getAutoTables().forEach((key, value) -> value.setAuditStrategy(null));
112         }
113     }
114     
115     private YamlModeConfiguration createStandaloneModeConfiguration() {
116         YamlModeConfiguration result = new YamlModeConfiguration();
117         result.setType("Standalone");
118         YamlPersistRepositoryConfiguration repository = new YamlPersistRepositoryConfiguration();
119         result.setRepository(repository);
120         repository.setType("JDBC");
121         Properties props = new Properties();
122         repository.setProps(props);
123         props.setProperty(JDBCRepositoryPropertyKey.JDBC_URL.getKey(),
124                 String.format("jdbc:h2:mem:pipeline_db_%d;DB_CLOSE_DELAY=0;DATABASE_TO_UPPER=false;MODE=MYSQL", STANDALONE_DATABASE_ID.getAndIncrement()));
125         return result;
126     }
127     
128     private DataSource createDataSourceWithoutCache(final YamlRootConfiguration rootConfig) throws SQLException {
129         Map<String, DataSource> dataSourceMap = new YamlDataSourceConfigurationSwapper().swapToDataSources(rootConfig.getDataSources(), false);
130         try {
131             return createDataSource(dataSourceMap, rootConfig);
132             // CHECKSTYLE:OFF
133         } catch (final SQLException | RuntimeException ex) {
134             // CHECKSTYLE:ON
135             dataSourceMap.values().stream().map(DataSourcePoolDestroyer::new).forEach(DataSourcePoolDestroyer::asyncDestroy);
136             throw ex;
137         }
138     }
139     
140     private DataSource createDataSource(final Map<String, DataSource> dataSourceMap, final YamlRootConfiguration rootConfig) throws SQLException {
141         ModeConfiguration modeConfig = null == rootConfig.getMode() ? null : new YamlModeConfigurationSwapper().swapToObject(rootConfig.getMode());
142         Collection<RuleConfiguration> ruleConfigs = new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(rootConfig.getRules());
143         return ShardingSphereDataSourceFactory.createDataSource(rootConfig.getDatabaseName(), modeConfig, dataSourceMap, ruleConfigs, rootConfig.getProps());
144     }
145     
146     @Override
147     public String getType() {
148         return ShardingSpherePipelineDataSourceConfiguration.TYPE;
149     }
150 }