1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.transaction.xa.narayana.manager;
19
20 import com.arjuna.ats.jta.recovery.XAResourceRecoveryHelper;
21 import com.google.common.base.Preconditions;
22 import lombok.extern.slf4j.Slf4j;
23
24 import javax.sql.XAConnection;
25 import javax.sql.XADataSource;
26 import javax.transaction.xa.XAException;
27 import javax.transaction.xa.XAResource;
28 import javax.transaction.xa.Xid;
29 import java.sql.SQLException;
30
31
32
33
34 @Slf4j
35 public final class DataSourceXAResourceRecoveryHelper implements XAResourceRecoveryHelper, XAResource {
36
37 private static final XAResource[] NO_XA_RESOURCES = {};
38
39 private final XADataSource xaDataSource;
40
41 private final String user;
42
43 private final String password;
44
45 private XAConnection xaConnection;
46
47 private XAResource delegate;
48
49
50
51
52
53
54 public DataSourceXAResourceRecoveryHelper(final XADataSource xaDataSource) {
55 this(xaDataSource, null, null);
56 }
57
58
59
60
61
62
63
64
65 public DataSourceXAResourceRecoveryHelper(final XADataSource xaDataSource, final String user, final String password) {
66 this.xaDataSource = xaDataSource;
67 this.user = user;
68 this.password = password;
69 }
70
71 @Override
72 public boolean initialise(final String props) {
73 return true;
74 }
75
76 @Override
77 public XAResource[] getXAResources() {
78 if (connect()) {
79 return new XAResource[]{this};
80 }
81 return NO_XA_RESOURCES;
82 }
83
84 private boolean connect() {
85 if (null == delegate) {
86 try {
87 xaConnection = getXaConnection();
88 delegate = xaConnection.getXAResource();
89 } catch (final SQLException ex) {
90 log.warn("Failed to create connection", ex);
91 return false;
92 }
93 }
94 return true;
95 }
96
97 private XAConnection getXaConnection() throws SQLException {
98 return null == user && null == password ? xaDataSource.getXAConnection() : xaDataSource.getXAConnection(user, password);
99 }
100
101 @Override
102 public Xid[] recover(final int flag) throws XAException {
103 try {
104 return getDelegate().recover(flag);
105 } finally {
106 if (flag == XAResource.TMENDRSCAN) {
107 disconnect();
108 }
109 }
110 }
111
112 private void disconnect() {
113 try {
114 xaConnection.close();
115 } catch (final SQLException ex) {
116 log.warn("Failed to close connection", ex);
117 } finally {
118 xaConnection = null;
119 delegate = null;
120 }
121 }
122
123 @Override
124 public void start(final Xid xid, final int flags) throws XAException {
125 getDelegate().start(xid, flags);
126 }
127
128 @Override
129 public void end(final Xid xid, final int flags) throws XAException {
130 getDelegate().end(xid, flags);
131 }
132
133 @Override
134 public int prepare(final Xid xid) throws XAException {
135 return getDelegate().prepare(xid);
136 }
137
138 @Override
139 public void commit(final Xid xid, final boolean onePhase) throws XAException {
140 getDelegate().commit(xid, onePhase);
141 }
142
143 @Override
144 public void rollback(final Xid xid) throws XAException {
145 getDelegate().rollback(xid);
146 }
147
148 @Override
149 public boolean isSameRM(final XAResource xaResource) throws XAException {
150 return getDelegate().isSameRM(xaResource);
151 }
152
153 @Override
154 public void forget(final Xid xid) throws XAException {
155 getDelegate().forget(xid);
156 }
157
158 @Override
159 public int getTransactionTimeout() throws XAException {
160 return getDelegate().getTransactionTimeout();
161 }
162
163 @Override
164 public boolean setTransactionTimeout(final int seconds) throws XAException {
165 return getDelegate().setTransactionTimeout(seconds);
166 }
167
168 private XAResource getDelegate() {
169 Preconditions.checkNotNull(delegate, "Connection has not been opened");
170 return delegate;
171 }
172 }