1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.transaction.rule;
19
20 import lombok.AccessLevel;
21 import lombok.Getter;
22 import lombok.extern.slf4j.Slf4j;
23 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
24 import org.apache.shardingsphere.infra.executor.sql.context.ExecutionContext;
25 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
26 import org.apache.shardingsphere.infra.rule.attribute.RuleAttributes;
27 import org.apache.shardingsphere.infra.rule.scope.GlobalRule;
28 import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
29 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.DMLStatement;
30 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.SelectStatement;
31 import org.apache.shardingsphere.transaction.ConnectionTransaction;
32 import org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
33 import org.apache.shardingsphere.transaction.api.TransactionType;
34 import org.apache.shardingsphere.transaction.config.TransactionRuleConfiguration;
35 import org.apache.shardingsphere.transaction.constant.TransactionOrder;
36
37 import javax.sql.DataSource;
38 import java.util.Collection;
39 import java.util.LinkedHashMap;
40 import java.util.Map;
41 import java.util.Properties;
42 import java.util.concurrent.atomic.AtomicReference;
43
44
45
46
47 @Getter
48 @Slf4j
49 public final class TransactionRule implements GlobalRule, AutoCloseable {
50
51 private final TransactionRuleConfiguration configuration;
52
53 private final TransactionType defaultType;
54
55 private final String providerType;
56
57 private final Properties props;
58
59 @Getter(AccessLevel.NONE)
60 private final AtomicReference<ShardingSphereTransactionManagerEngine> resource;
61
62 private final RuleAttributes attributes;
63
64 public TransactionRule(final TransactionRuleConfiguration ruleConfig, final Collection<ShardingSphereDatabase> databases) {
65 configuration = ruleConfig;
66 defaultType = TransactionType.valueOf(ruleConfig.getDefaultType().toUpperCase());
67 providerType = ruleConfig.getProviderType();
68 props = ruleConfig.getProps();
69 resource = new AtomicReference<>(createTransactionManagerEngine(databases));
70 attributes = new RuleAttributes();
71 }
72
73 private synchronized ShardingSphereTransactionManagerEngine createTransactionManagerEngine(final Collection<ShardingSphereDatabase> databases) {
74 ShardingSphereTransactionManagerEngine result = new ShardingSphereTransactionManagerEngine(defaultType);
75 if (databases.isEmpty()) {
76 return result;
77 }
78 Map<String, DatabaseType> databaseTypes = new LinkedHashMap<>(databases.size(), 1F);
79 Map<String, DataSource> dataSourceMap = new LinkedHashMap<>(databases.size(), 1F);
80 for (ShardingSphereDatabase each : databases) {
81 each.getResourceMetaData().getStorageUnits().forEach((key, value) -> {
82 databaseTypes.put(each.getName() + "." + key, value.getStorageType());
83 dataSourceMap.put(each.getName() + "." + key, value.getDataSource());
84 });
85 }
86 result.init(databaseTypes, dataSourceMap, providerType);
87 return result;
88 }
89
90
91
92
93
94
95 public ShardingSphereTransactionManagerEngine getResource() {
96 return resource.get();
97 }
98
99
100
101
102
103
104
105
106
107 public boolean isImplicitCommitTransaction(final ExecutionContext executionContext, final ConnectionTransaction connectionTransaction, final boolean isAutoCommit) {
108 if (!isAutoCommit) {
109 return false;
110 }
111 if (!TransactionType.isDistributedTransaction(defaultType) || connectionTransaction.isInDistributedTransaction()) {
112 return false;
113 }
114 return isWriteDMLStatement(executionContext.getSqlStatementContext().getSqlStatement()) && executionContext.getExecutionUnits().size() > 1;
115 }
116
117 private boolean isWriteDMLStatement(final SQLStatement sqlStatement) {
118 return sqlStatement instanceof DMLStatement && !(sqlStatement instanceof SelectStatement);
119 }
120
121 @Override
122 public void refresh(final Collection<ShardingSphereDatabase> databases, final GlobalRuleChangedType changedType) {
123 if (GlobalRuleChangedType.DATABASE_CHANGED != changedType) {
124 return;
125 }
126 ShardingSphereTransactionManagerEngine previousEngine = resource.get();
127 close(previousEngine);
128 resource.set(createTransactionManagerEngine(databases));
129 }
130
131 @Override
132 public void close() {
133
134 ShardingSphereTransactionManagerEngine engine = resource.get();
135 if (null != engine) {
136 close(engine);
137 resource.set(new ShardingSphereTransactionManagerEngine(defaultType));
138 }
139 }
140
141 private void close(final ShardingSphereTransactionManagerEngine engine) {
142 try {
143 engine.close();
144
145 } catch (final RuntimeException ex) {
146
147 log.error("Close transaction engine failed.", ex);
148 }
149 }
150
151 @Override
152 public int getOrder() {
153 return TransactionOrder.ORDER;
154 }
155 }