View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.transaction.xa.narayana.manager;
19  
20  import com.arjuna.ats.jta.recovery.XAResourceRecoveryHelper;
21  import com.google.common.base.Preconditions;
22  import lombok.extern.slf4j.Slf4j;
23  
24  import javax.sql.XAConnection;
25  import javax.sql.XADataSource;
26  import javax.transaction.xa.XAException;
27  import javax.transaction.xa.XAResource;
28  import javax.transaction.xa.Xid;
29  import java.sql.SQLException;
30  
31  /**
32   * XAResourceRecoveryHelper implementation which gets XIDs, which needs to be recovered, from the database.
33   */
34  @Slf4j
35  public final class DataSourceXAResourceRecoveryHelper implements XAResourceRecoveryHelper, XAResource {
36      
37      private static final XAResource[] NO_XA_RESOURCES = {};
38      
39      private final XADataSource xaDataSource;
40      
41      private final String user;
42      
43      private final String password;
44      
45      private XAConnection xaConnection;
46      
47      private XAResource delegate;
48      
49      /**
50       * Create a new {@link DataSourceXAResourceRecoveryHelper} instance.
51       *
52       * @param xaDataSource the XA data source
53       */
54      public DataSourceXAResourceRecoveryHelper(final XADataSource xaDataSource) {
55          this(xaDataSource, null, null);
56      }
57      
58      /**
59       * Create a new {@link DataSourceXAResourceRecoveryHelper} instance.
60       *
61       * @param xaDataSource the XA data source
62       * @param user the database user or {@code null}
63       * @param password the database password or {@code null}
64       */
65      public DataSourceXAResourceRecoveryHelper(final XADataSource xaDataSource, final String user, final String password) {
66          this.xaDataSource = xaDataSource;
67          this.user = user;
68          this.password = password;
69      }
70      
71      @Override
72      public boolean initialise(final String props) {
73          return true;
74      }
75      
76      @Override
77      public XAResource[] getXAResources() {
78          if (connect()) {
79              return new XAResource[]{this};
80          }
81          return NO_XA_RESOURCES;
82      }
83      
84      private boolean connect() {
85          if (null == delegate) {
86              try {
87                  xaConnection = getXaConnection();
88                  delegate = xaConnection.getXAResource();
89              } catch (final SQLException ex) {
90                  log.warn("Failed to create connection", ex);
91                  return false;
92              }
93          }
94          return true;
95      }
96      
97      private XAConnection getXaConnection() throws SQLException {
98          return null == user && null == password ? xaDataSource.getXAConnection() : xaDataSource.getXAConnection(user, password);
99      }
100     
101     @Override
102     public Xid[] recover(final int flag) throws XAException {
103         try {
104             return getDelegate().recover(flag);
105         } finally {
106             if (flag == XAResource.TMENDRSCAN) {
107                 disconnect();
108             }
109         }
110     }
111     
112     private void disconnect() {
113         try {
114             xaConnection.close();
115         } catch (final SQLException ex) {
116             log.warn("Failed to close connection", ex);
117         } finally {
118             xaConnection = null;
119             delegate = null;
120         }
121     }
122     
123     @Override
124     public void start(final Xid xid, final int flags) throws XAException {
125         getDelegate().start(xid, flags);
126     }
127     
128     @Override
129     public void end(final Xid xid, final int flags) throws XAException {
130         getDelegate().end(xid, flags);
131     }
132     
133     @Override
134     public int prepare(final Xid xid) throws XAException {
135         return getDelegate().prepare(xid);
136     }
137     
138     @Override
139     public void commit(final Xid xid, final boolean onePhase) throws XAException {
140         getDelegate().commit(xid, onePhase);
141     }
142     
143     @Override
144     public void rollback(final Xid xid) throws XAException {
145         getDelegate().rollback(xid);
146     }
147     
148     @Override
149     public boolean isSameRM(final XAResource xaResource) throws XAException {
150         return getDelegate().isSameRM(xaResource);
151     }
152     
153     @Override
154     public void forget(final Xid xid) throws XAException {
155         getDelegate().forget(xid);
156     }
157     
158     @Override
159     public int getTransactionTimeout() throws XAException {
160         return getDelegate().getTransactionTimeout();
161     }
162     
163     @Override
164     public boolean setTransactionTimeout(final int seconds) throws XAException {
165         return getDelegate().setTransactionTimeout(seconds);
166     }
167     
168     private XAResource getDelegate() {
169         Preconditions.checkNotNull(delegate, "Connection has not been opened");
170         return delegate;
171     }
172 }