1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.transaction.rule;
19
20 import lombok.Getter;
21 import lombok.extern.slf4j.Slf4j;
22 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
23 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
24 import org.apache.shardingsphere.infra.rule.attribute.RuleAttributes;
25 import org.apache.shardingsphere.infra.rule.scope.GlobalRule;
26 import org.apache.shardingsphere.transaction.ShardingSphereTransactionManagerEngine;
27 import org.apache.shardingsphere.transaction.api.TransactionType;
28 import org.apache.shardingsphere.transaction.config.TransactionRuleConfiguration;
29
30 import javax.sql.DataSource;
31 import java.util.LinkedHashMap;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.Properties;
35 import java.util.concurrent.atomic.AtomicReference;
36
37
38
39
40 @Getter
41 @Slf4j
42 public final class TransactionRule implements GlobalRule, AutoCloseable {
43
44 private final TransactionRuleConfiguration configuration;
45
46 private final TransactionType defaultType;
47
48 private final String providerType;
49
50 private final Properties props;
51
52 private final AtomicReference<ShardingSphereTransactionManagerEngine> resource;
53
54 private final RuleAttributes attributes;
55
56 public TransactionRule(final TransactionRuleConfiguration ruleConfig, final Map<String, ShardingSphereDatabase> databases) {
57 configuration = ruleConfig;
58 defaultType = TransactionType.valueOf(ruleConfig.getDefaultType().toUpperCase());
59 providerType = ruleConfig.getProviderType();
60 props = ruleConfig.getProps();
61 resource = new AtomicReference<>(createTransactionManagerEngine(databases));
62 attributes = new RuleAttributes();
63 }
64
65 private synchronized ShardingSphereTransactionManagerEngine createTransactionManagerEngine(final Map<String, ShardingSphereDatabase> databases) {
66 if (databases.isEmpty()) {
67 return new ShardingSphereTransactionManagerEngine(defaultType);
68 }
69 Map<String, DatabaseType> databaseTypes = new LinkedHashMap<>(databases.size(), 1F);
70 Map<String, DataSource> dataSourceMap = new LinkedHashMap<>(databases.size(), 1F);
71 for (Entry<String, ShardingSphereDatabase> entry : databases.entrySet()) {
72 ShardingSphereDatabase database = entry.getValue();
73 database.getResourceMetaData().getStorageUnits().forEach((key, value) -> {
74 databaseTypes.put(database.getName() + "." + key, value.getStorageType());
75 dataSourceMap.put(database.getName() + "." + key, value.getDataSource());
76 });
77 }
78 ShardingSphereTransactionManagerEngine result = new ShardingSphereTransactionManagerEngine(defaultType);
79 result.init(databaseTypes, dataSourceMap, providerType);
80 return result;
81 }
82
83
84
85
86
87
88 public ShardingSphereTransactionManagerEngine getResource() {
89 return resource.get();
90 }
91
92 @Override
93 public void refresh(final Map<String, ShardingSphereDatabase> databases, final GlobalRuleChangedType changedType) {
94 if (GlobalRuleChangedType.DATABASE_CHANGED != changedType) {
95 return;
96 }
97 ShardingSphereTransactionManagerEngine previousEngine = resource.get();
98 if (null != previousEngine) {
99 close(previousEngine);
100 }
101 resource.set(createTransactionManagerEngine(databases));
102 }
103
104 @Override
105 public void close() {
106
107 ShardingSphereTransactionManagerEngine engine = resource.get();
108 if (null != engine) {
109 close(engine);
110 resource.set(new ShardingSphereTransactionManagerEngine(defaultType));
111 }
112 }
113
114 private void close(final ShardingSphereTransactionManagerEngine engine) {
115 try {
116 engine.close();
117
118 } catch (final RuntimeException ex) {
119
120 log.error("Close transaction engine failed.", ex);
121 }
122 }
123 }