View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.core.datasource.creator;
19  
20  import org.apache.shardingsphere.authority.yaml.config.YamlAuthorityRuleConfiguration;
21  import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
22  import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
23  import org.apache.shardingsphere.data.pipeline.core.datasource.yaml.PipelineYamlRuleConfigurationReviser;
24  import org.apache.shardingsphere.data.pipeline.spi.PipelineDataSourceCreator;
25  import org.apache.shardingsphere.driver.api.ShardingSphereDataSourceFactory;
26  import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
27  import org.apache.shardingsphere.infra.config.props.ConfigurationPropertyKey;
28  import org.apache.shardingsphere.infra.config.props.temporary.TemporaryConfigurationPropertyKey;
29  import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
30  import org.apache.shardingsphere.infra.datasource.pool.destroyer.DataSourcePoolDestroyer;
31  import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
32  import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
33  import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
34  import org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlModeConfiguration;
35  import org.apache.shardingsphere.infra.yaml.config.pojo.mode.YamlPersistRepositoryConfiguration;
36  import org.apache.shardingsphere.infra.yaml.config.swapper.mode.YamlModeConfigurationSwapper;
37  import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
38  import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
39  import org.apache.shardingsphere.mode.manager.ContextManager;
40  
41  import javax.sql.DataSource;
42  import java.sql.SQLException;
43  import java.util.Collection;
44  import java.util.LinkedList;
45  import java.util.List;
46  import java.util.Map;
47  import java.util.Properties;
48  
49  /**
50   * ShardingSphere pipeline data source creator.
51   */
52  public final class ShardingSpherePipelineDataSourceCreator implements PipelineDataSourceCreator {
53      
54      @Override
55      public DataSource create(final Object dataSourceConfig) throws SQLException {
56          YamlRootConfiguration yamlRootConfig = YamlEngine.unmarshal(YamlEngine.marshal(dataSourceConfig), YamlRootConfiguration.class);
57          removeAuthorityRuleConfiguration(yamlRootConfig);
58          yamlRootConfig.setProps(createConfigurationProperties());
59          reviseYamlRuleConfiguration(yamlRootConfig);
60          yamlRootConfig.setMode(createStandaloneModeConfiguration());
61          return createShardingSphereDataSource(yamlRootConfig);
62      }
63      
64      private void removeAuthorityRuleConfiguration(final YamlRootConfiguration yamlRootConfig) {
65          yamlRootConfig.getRules().removeIf(YamlAuthorityRuleConfiguration.class::isInstance);
66      }
67      
68      private Properties createConfigurationProperties() {
69          Properties realtimeProps = getRealtimeProperties();
70          Properties result = new Properties();
71          for (String each : getConfigurationPropertyKeys()) {
72              Object value = realtimeProps.get(each);
73              if (null != value) {
74                  result.put(each, value);
75              }
76          }
77          result.put(TemporaryConfigurationPropertyKey.SYSTEM_SCHEMA_METADATA_ASSEMBLY_ENABLED.getKey(), String.valueOf(Boolean.FALSE));
78          // Set a large enough value to enable ConnectionMode.MEMORY_STRICTLY, make sure streaming query work.
79          result.put(ConfigurationPropertyKey.MAX_CONNECTIONS_SIZE_PER_QUERY.getKey(), 100000);
80          return result;
81      }
82      
83      private Properties getRealtimeProperties() {
84          ContextManager contextManager = PipelineContextManager.getProxyContext();
85          if (null == contextManager) {
86              return new Properties();
87          }
88          return contextManager.getMetaDataContexts().getMetaData().getProps().getProps();
89      }
90      
91      private List<String> getConfigurationPropertyKeys() {
92          List<String> result = new LinkedList<>();
93          result.add(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE.getKey());
94          result.add(ConfigurationPropertyKey.SQL_SHOW.getKey());
95          return result;
96      }
97      
98      @SuppressWarnings("unchecked")
99      private void reviseYamlRuleConfiguration(final YamlRootConfiguration yamlRootConfig) {
100         OrderedSPILoader.getServices(PipelineYamlRuleConfigurationReviser.class, yamlRootConfig.getRules()).forEach((key, value) -> value.revise(key));
101     }
102     
103     private YamlModeConfiguration createStandaloneModeConfiguration() {
104         YamlModeConfiguration result = new YamlModeConfiguration();
105         result.setType("Standalone");
106         YamlPersistRepositoryConfiguration yamlRepositoryConfig = new YamlPersistRepositoryConfiguration();
107         yamlRepositoryConfig.setType("Memory");
108         result.setRepository(yamlRepositoryConfig);
109         return result;
110     }
111     
112     private DataSource createShardingSphereDataSource(final YamlRootConfiguration yamlRootConfig) throws SQLException {
113         Map<String, DataSource> dataSourceMap = new YamlDataSourceConfigurationSwapper().swapToDataSources(yamlRootConfig.getDataSources(), false);
114         try {
115             return createShardingSphereDataSource(dataSourceMap, yamlRootConfig);
116             // CHECKSTYLE:OFF
117         } catch (final SQLException | RuntimeException ex) {
118             // CHECKSTYLE:ON
119             dataSourceMap.values().stream().map(DataSourcePoolDestroyer::new).forEach(DataSourcePoolDestroyer::asyncDestroy);
120             throw ex;
121         }
122     }
123     
124     private DataSource createShardingSphereDataSource(final Map<String, DataSource> dataSourceMap, final YamlRootConfiguration yamlRootConfig) throws SQLException {
125         ModeConfiguration modeConfig = new YamlModeConfigurationSwapper().swapToObject(yamlRootConfig.getMode());
126         Collection<RuleConfiguration> ruleConfigs = new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(yamlRootConfig.getRules());
127         return ShardingSphereDataSourceFactory.createDataSource(yamlRootConfig.getDatabaseName(), modeConfig, dataSourceMap, ruleConfigs, yamlRootConfig.getProps());
128     }
129     
130     @Override
131     public String getType() {
132         return ShardingSpherePipelineDataSourceConfiguration.TYPE;
133     }
134 }