1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.transaction.rule;
19
20 import lombok.AccessLevel;
21 import lombok.Getter;
22 import lombok.extern.slf4j.Slf4j;
23 import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
24 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
25 import org.apache.shardingsphere.infra.rule.attribute.RuleAttributes;
26 import org.apache.shardingsphere.infra.rule.scope.GlobalRule;
27 import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
28 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.DMLStatement;
29 import org.apache.shardingsphere.sql.parser.statement.core.statement.type.dml.SelectStatement;
30 import org.apache.shardingsphere.transaction.ConnectionTransaction;
31 import org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
32 import org.apache.shardingsphere.transaction.api.TransactionType;
33 import org.apache.shardingsphere.transaction.config.TransactionRuleConfiguration;
34 import org.apache.shardingsphere.transaction.constant.TransactionOrder;
35
36 import javax.sql.DataSource;
37 import java.util.Collection;
38 import java.util.LinkedHashMap;
39 import java.util.Map;
40 import java.util.Properties;
41 import java.util.concurrent.atomic.AtomicReference;
42
43
44
45
46 @Getter
47 @Slf4j
48 public final class TransactionRule implements GlobalRule, AutoCloseable {
49
50 private final TransactionRuleConfiguration configuration;
51
52 private final TransactionType defaultType;
53
54 private final String providerType;
55
56 private final Properties props;
57
58 @Getter(AccessLevel.NONE)
59 private final AtomicReference<ShardingSphereTransactionManagerEngine> resource;
60
61 private final RuleAttributes attributes;
62
63 public TransactionRule(final TransactionRuleConfiguration ruleConfig, final Collection<ShardingSphereDatabase> databases) {
64 configuration = ruleConfig;
65 defaultType = TransactionType.valueOf(ruleConfig.getDefaultType().toUpperCase());
66 providerType = ruleConfig.getProviderType();
67 props = ruleConfig.getProps();
68 resource = new AtomicReference<>(createTransactionManagerEngine(databases));
69 attributes = new RuleAttributes();
70 }
71
72 private synchronized ShardingSphereTransactionManagerEngine createTransactionManagerEngine(final Collection<ShardingSphereDatabase> databases) {
73 ShardingSphereTransactionManagerEngine result = new ShardingSphereTransactionManagerEngine(defaultType);
74 if (databases.isEmpty()) {
75 return result;
76 }
77 Map<String, DatabaseType> databaseTypes = new LinkedHashMap<>(databases.size(), 1F);
78 Map<String, DataSource> dataSourceMap = new LinkedHashMap<>(databases.size(), 1F);
79 for (ShardingSphereDatabase each : databases) {
80 each.getResourceMetaData().getStorageUnits().forEach((key, value) -> {
81 databaseTypes.put(each.getName() + "." + key, value.getStorageType());
82 dataSourceMap.put(each.getName() + "." + key, value.getDataSource());
83 });
84 }
85 result.init(databaseTypes, dataSourceMap, providerType);
86 return result;
87 }
88
89
90
91
92
93
94 public ShardingSphereTransactionManagerEngine getResource() {
95 return resource.get();
96 }
97
98
99
100
101
102
103
104
105
106
107 public boolean isImplicitCommitTransaction(final SQLStatement sqlStatement, final boolean multiExecutionUnits, final ConnectionTransaction connectionTransaction, final boolean isAutoCommit) {
108 if (!isAutoCommit) {
109 return false;
110 }
111 if (!TransactionType.isDistributedTransaction(defaultType) || connectionTransaction.isInDistributedTransaction()) {
112 return false;
113 }
114 return isWriteDMLStatement(sqlStatement) && multiExecutionUnits;
115 }
116
117 private boolean isWriteDMLStatement(final SQLStatement sqlStatement) {
118 return sqlStatement instanceof DMLStatement && !(sqlStatement instanceof SelectStatement);
119 }
120
121 @Override
122 public void refresh(final Collection<ShardingSphereDatabase> databases, final GlobalRuleChangedType changedType) {
123 if (GlobalRuleChangedType.DATABASE_CHANGED != changedType) {
124 return;
125 }
126 ShardingSphereTransactionManagerEngine previousEngine = resource.get();
127 close(previousEngine);
128 resource.set(createTransactionManagerEngine(databases));
129 }
130
131 @Override
132 public void close() {
133
134 ShardingSphereTransactionManagerEngine engine = resource.get();
135 if (null != engine) {
136 resource.set(null);
137 close(engine);
138 }
139 }
140
141 private void close(final ShardingSphereTransactionManagerEngine engine) {
142 try {
143 engine.close();
144
145 } catch (final RuntimeException ex) {
146
147 log.error("Close transaction engine failed.", ex);
148 }
149 }
150
151 @Override
152 public int getOrder() {
153 return TransactionOrder.ORDER;
154 }
155 }