View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc;
19  
20  import lombok.RequiredArgsConstructor;
21  import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
22  import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
23  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
24  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
25  import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
26  import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
27  import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
28  import org.apache.shardingsphere.infra.executor.sql.hook.SPISQLExecutionHook;
29  import org.apache.shardingsphere.infra.executor.sql.hook.SQLExecutionHook;
30  import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
31  import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
32  import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
33  
34  import java.sql.SQLException;
35  import java.sql.Statement;
36  import java.util.Collection;
37  import java.util.LinkedList;
38  import java.util.Optional;
39  
40  /**
41   * JDBC executor callback.
42   *
43   * @param <T> class type of return value
44   */
45  @HighFrequencyInvocation
46  @RequiredArgsConstructor
47  public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCExecutionUnit, T> {
48      
49      private final DatabaseType protocolType;
50      
51      private final ResourceMetaData resourceMetaData;
52      
53      private final SQLStatement sqlStatement;
54      
55      private final boolean isExceptionThrown;
56      
57      private final ProcessEngine processEngine = new ProcessEngine();
58      
59      @Override
60      public final Collection<T> execute(final Collection<JDBCExecutionUnit> executionUnits, final boolean isTrunkThread, final String processId) throws SQLException {
61          // TODO It is better to judge whether need sane result before execute, can avoid exception thrown
62          Collection<T> result = new LinkedList<>();
63          for (JDBCExecutionUnit each : executionUnits) {
64              T executeResult = execute(each, isTrunkThread, processId);
65              if (null != executeResult) {
66                  result.add(executeResult);
67              }
68          }
69          return result;
70      }
71      
72      /*
73       * To make sure SkyWalking will be available at the next release of ShardingSphere, a new plugin should be provided to SkyWalking project if this API changed.
74       *
75       * @see <a href="https://github.com/apache/skywalking/blob/master/docs/en/guides/Java-Plugin-Development-Guide.md#user-content-plugin-development-guide">Plugin Development Guide</a>
76       */
77      private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean isTrunkThread, final String processId) throws SQLException {
78          SQLExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
79          DatabaseType storageType = resourceMetaData.getStorageUnits().get(jdbcExecutionUnit.getExecutionUnit().getDataSourceName()).getStorageType();
80          ConnectionProperties connectionProps = resourceMetaData.getStorageUnits().get(jdbcExecutionUnit.getExecutionUnit().getDataSourceName()).getConnectionProperties();
81          SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
82          try {
83              SQLUnit sqlUnit = jdbcExecutionUnit.getExecutionUnit().getSqlUnit();
84              sqlExecutionHook.start(jdbcExecutionUnit.getExecutionUnit().getDataSourceName(), sqlUnit.getSql(), sqlUnit.getParameters(), connectionProps, isTrunkThread);
85              T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), storageType);
86              sqlExecutionHook.finishSuccess();
87              processEngine.completeSQLUnitExecution(jdbcExecutionUnit, processId);
88              return result;
89          } catch (final SQLException ex) {
90              if (!storageType.equals(protocolType)) {
91                  Optional<T> saneResult = getSaneResult(sqlStatement, ex);
92                  if (saneResult.isPresent()) {
93                      return isTrunkThread ? saneResult.get() : null;
94                  }
95              }
96              sqlExecutionHook.finishFailure(ex);
97              SQLExecutorExceptionHandler.handleException(ex);
98              return null;
99          }
100     }
101     
102     protected abstract T executeSQL(String sql, Statement statement, ConnectionMode connectionMode, DatabaseType storageType) throws SQLException;
103     
104     protected abstract Optional<T> getSaneResult(SQLStatement sqlStatement, SQLException ex);
105 }