1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.infra.merge;
19
20 import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
21 import org.apache.shardingsphere.infra.binder.context.statement.SQLStatementContext;
22 import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
23 import org.apache.shardingsphere.infra.executor.sql.execute.result.query.QueryResult;
24 import org.apache.shardingsphere.infra.merge.engine.ResultProcessEngine;
25 import org.apache.shardingsphere.infra.merge.engine.decorator.ResultDecorator;
26 import org.apache.shardingsphere.infra.merge.engine.decorator.ResultDecoratorEngine;
27 import org.apache.shardingsphere.infra.merge.engine.decorator.impl.TransparentResultDecorator;
28 import org.apache.shardingsphere.infra.merge.engine.merger.ResultMerger;
29 import org.apache.shardingsphere.infra.merge.engine.merger.ResultMergerEngine;
30 import org.apache.shardingsphere.infra.merge.result.MergedResult;
31 import org.apache.shardingsphere.infra.merge.result.impl.transparent.TransparentMergedResult;
32 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
33 import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
34 import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
35 import org.apache.shardingsphere.infra.session.connection.ConnectionContext;
36 import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
37
38 import java.sql.SQLException;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.Map.Entry;
42 import java.util.Optional;
43
44
45
46
47 @HighFrequencyInvocation
48 public final class MergeEngine {
49
50 private final RuleMetaData globalRuleMetaData;
51
52 private final ShardingSphereDatabase database;
53
54 private final ConfigurationProperties props;
55
56 @SuppressWarnings("rawtypes")
57 private final Map<ShardingSphereRule, ResultProcessEngine> engines;
58
59 private final ConnectionContext connectionContext;
60
61 public MergeEngine(final RuleMetaData globalRuleMetaData, final ShardingSphereDatabase database, final ConfigurationProperties props, final ConnectionContext connectionContext) {
62 this.globalRuleMetaData = globalRuleMetaData;
63 this.database = database;
64 this.props = props;
65 engines = OrderedSPILoader.getServices(ResultProcessEngine.class, database.getRuleMetaData().getRules());
66 this.connectionContext = connectionContext;
67 }
68
69
70
71
72
73
74
75
76
77 public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
78 Optional<MergedResult> mergedResult = executeMerge(queryResults, sqlStatementContext);
79 Optional<MergedResult> result = mergedResult.isPresent() ? Optional.of(decorate(mergedResult.get(), sqlStatementContext)) : decorate(queryResults.get(0), sqlStatementContext);
80 return result.orElseGet(() -> new TransparentMergedResult(queryResults.get(0)));
81 }
82
83 @SuppressWarnings({"unchecked", "rawtypes"})
84 private Optional<MergedResult> executeMerge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
85 for (Entry<ShardingSphereRule, ResultProcessEngine> entry : engines.entrySet()) {
86 if (entry.getValue() instanceof ResultMergerEngine) {
87 ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(database.getName(), database.getProtocolType(), entry.getKey(), props, sqlStatementContext);
88 return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, database, connectionContext));
89 }
90 }
91 return Optional.empty();
92 }
93
94 @SuppressWarnings({"unchecked", "rawtypes"})
95 private MergedResult decorate(final MergedResult mergedResult, final SQLStatementContext sqlStatementContext) throws SQLException {
96 MergedResult result = null;
97 for (Entry<ShardingSphereRule, ResultProcessEngine> entry : engines.entrySet()) {
98 if (entry.getValue() instanceof ResultDecoratorEngine) {
99 ResultDecorator resultDecorator = getResultDecorator(sqlStatementContext, entry);
100 result = null == result ? resultDecorator.decorate(mergedResult, sqlStatementContext, entry.getKey()) : resultDecorator.decorate(result, sqlStatementContext, entry.getKey());
101 }
102 }
103 return null == result ? mergedResult : result;
104 }
105
106 @SuppressWarnings({"unchecked", "rawtypes"})
107 private Optional<MergedResult> decorate(final QueryResult queryResult, final SQLStatementContext sqlStatementContext) throws SQLException {
108 MergedResult result = null;
109 for (Entry<ShardingSphereRule, ResultProcessEngine> entry : engines.entrySet()) {
110 if (entry.getValue() instanceof ResultDecoratorEngine) {
111 ResultDecorator resultDecorator = getResultDecorator(sqlStatementContext, entry);
112 result = null == result ? resultDecorator.decorate(queryResult, sqlStatementContext, entry.getKey()) : resultDecorator.decorate(result, sqlStatementContext, entry.getKey());
113 }
114 }
115 return Optional.ofNullable(result);
116 }
117
118 @SuppressWarnings({"unchecked", "rawtypes"})
119 private ResultDecorator getResultDecorator(final SQLStatementContext sqlStatementContext, final Entry<ShardingSphereRule, ResultProcessEngine> entry) {
120 return (ResultDecorator) ((ResultDecoratorEngine) entry.getValue()).newInstance(globalRuleMetaData, database, entry.getKey(), props, sqlStatementContext)
121 .orElseGet(TransparentResultDecorator::new);
122 }
123 }