View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.infra.merge;
19  
20  import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
21  import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
22  import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
23  import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
24  import org.apache.shardingsphere.infra.merge.engine.ResultProcessEngine;
25  import org.apache.shardingsphere.infra.merge.engine.decorator.ResultDecorator;
26  import org.apache.shardingsphere.infra.merge.engine.decorator.ResultDecoratorEngine;
27  import org.apache.shardingsphere.infra.merge.engine.decorator.impl.TransparentResultDecorator;
28  import org.apache.shardingsphere.infra.merge.engine.merger.ResultMerger;
29  import org.apache.shardingsphere.infra.merge.engine.merger.ResultMergerEngine;
30  import org.apache.shardingsphere.infra.merge.result.MergedResult;
31  import org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult;
32  import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
33  import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
34  import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
35  import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
36  import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
37  
38  import java.sql.SQLException;
39  import java.util.List;
40  import java.util.Map;
41  import java.util.Map.Entry;
42  import java.util.Optional;
43  
44  /**
45   * Merge engine.
46   */
47  @HighFrequencyInvocation
48  public final class MergeEngine {
49      
50      private final RuleMetaData globalRuleMetaData;
51      
52      private final ShardingSphereDatabase database;
53      
54      private final ConfigurationProperties props;
55      
56      @SuppressWarnings("rawtypes")
57      private final Map<ShardingSphereRule, ResultProcessEngine> engines;
58      
59      private final ConnectionContext connectionContext;
60      
61      public MergeEngine(final RuleMetaData globalRuleMetaData, final ShardingSphereDatabase database, final ConfigurationProperties props, final ConnectionContext connectionContext) {
62          this.globalRuleMetaData = globalRuleMetaData;
63          this.database = database;
64          this.props = props;
65          engines = OrderedSPILoader.getServices(ResultProcessEngine.class, database.getRuleMetaData().getRules());
66          this.connectionContext = connectionContext;
67      }
68      
69      /**
70       * Merge.
71       *
72       * @param queryResults query results
73       * @param sqlStatementContext SQL statement context
74       * @return merged result
75       * @throws SQLException SQL exception
76       */
77      public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
78          Optional<MergedResult> mergedResult = executeMerge(queryResults, sqlStatementContext);
79          Optional<MergedResult> result = mergedResult.isPresent() ? Optional.of(decorate(mergedResult.get(), sqlStatementContext)) : decorate(queryResults.get(0), sqlStatementContext);
80          return result.orElseGet(() -> new TransparentMergedResult(queryResults.get(0)));
81      }
82      
83      @SuppressWarnings({"unchecked", "rawtypes"})
84      private Optional<MergedResult> executeMerge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
85          for (Entry<ShardingSphereRule, ResultProcessEngine> entry : engines.entrySet()) {
86              if (entry.getValue() instanceof ResultMergerEngine) {
87                  ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(database.getName(), database.getProtocolType(), entry.getKey(), props, sqlStatementContext);
88                  return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, database, connectionContext));
89              }
90          }
91          return Optional.empty();
92      }
93      
94      @SuppressWarnings({"unchecked", "rawtypes"})
95      private MergedResult decorate(final MergedResult mergedResult, final SQLStatementContext sqlStatementContext) throws SQLException {
96          MergedResult result = null;
97          for (Entry<ShardingSphereRule, ResultProcessEngine> entry : engines.entrySet()) {
98              if (entry.getValue() instanceof ResultDecoratorEngine) {
99                  ResultDecorator resultDecorator = getResultDecorator(sqlStatementContext, entry);
100                 result = null == result ? resultDecorator.decorate(mergedResult, sqlStatementContext, entry.getKey()) : resultDecorator.decorate(result, sqlStatementContext, entry.getKey());
101             }
102         }
103         return null == result ? mergedResult : result;
104     }
105     
106     @SuppressWarnings({"unchecked", "rawtypes"})
107     private Optional<MergedResult> decorate(final QueryResult queryResult, final SQLStatementContext sqlStatementContext) throws SQLException {
108         MergedResult result = null;
109         for (Entry<ShardingSphereRule, ResultProcessEngine> entry : engines.entrySet()) {
110             if (entry.getValue() instanceof ResultDecoratorEngine) {
111                 ResultDecorator resultDecorator = getResultDecorator(sqlStatementContext, entry);
112                 result = null == result ? resultDecorator.decorate(queryResult, sqlStatementContext, entry.getKey()) : resultDecorator.decorate(result, sqlStatementContext, entry.getKey());
113             }
114         }
115         return Optional.ofNullable(result);
116     }
117     
118     @SuppressWarnings({"unchecked", "rawtypes"})
119     private ResultDecorator getResultDecorator(final SQLStatementContext sqlStatementContext, final Entry<ShardingSphereRule, ResultProcessEngine> entry) {
120         return (ResultDecorator) ((ResultDecoratorEngine) entry.getValue()).newInstance(globalRuleMetaData, database, entry.getKey(), props, sqlStatementContext)
121                 .orElseGet(TransparentResultDecorator::new);
122     }
123 }