View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc;
19  
20  import lombok.RequiredArgsConstructor;
21  import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
22  import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
23  import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
24  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorCallback;
25  import org.apache.shardingsphere.infra.executor.sql.context.SQLUnit;
26  import org.apache.shardingsphere.infra.executor.sql.execute.engine.ConnectionMode;
27  import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutorExceptionHandler;
28  import org.apache.shardingsphere.infra.executor.sql.hook.SPISQLExecutionHook;
29  import org.apache.shardingsphere.infra.executor.sql.hook.SQLExecutionHook;
30  import org.apache.shardingsphere.infra.executor.sql.process.ProcessEngine;
31  import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
32  import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
33  import org.apache.shardingsphere.sql.parser.statement.core.statement.SQLStatement;
34  
35  import java.sql.SQLException;
36  import java.sql.Statement;
37  import java.util.Collection;
38  import java.util.LinkedList;
39  import java.util.Optional;
40  
41  /**
42   * JDBC executor callback.
43   *
44   * @param <T> class type of return value
45   */
46  @HighFrequencyInvocation
47  @RequiredArgsConstructor
48  public abstract class JDBCExecutorCallback<T> implements ExecutorCallback<JDBCExecutionUnit, T> {
49      
50      private final DatabaseType protocolType;
51      
52      private final ResourceMetaData resourceMetaData;
53      
54      private final SQLStatement sqlStatement;
55      
56      private final boolean isExceptionThrown;
57      
58      private final ProcessEngine processEngine = new ProcessEngine();
59      
60      @Override
61      public final Collection<T> execute(final Collection<JDBCExecutionUnit> executionUnits, final boolean isTrunkThread, final String processId) throws SQLException {
62          // TODO It is better to judge whether need sane result before execute, can avoid exception thrown
63          Collection<T> result = new LinkedList<>();
64          for (JDBCExecutionUnit each : executionUnits) {
65              T executeResult = execute(each, isTrunkThread, processId);
66              if (null != executeResult) {
67                  result.add(executeResult);
68              }
69          }
70          return result;
71      }
72      
73      /*
74       * To make sure SkyWalking will be available at the next release of ShardingSphere, a new plugin should be provided to SkyWalking project if this API changed.
75       *
76       * @see <a href="https://github.com/apache/skywalking/blob/master/docs/en/guides/Java-Plugin-Development-Guide.md#user-content-plugin-development-guide">Plugin Development Guide</a>
77       */
78      private T execute(final JDBCExecutionUnit jdbcExecutionUnit, final boolean isTrunkThread, final String processId) throws SQLException {
79          SQLExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
80          String dataSourceName = jdbcExecutionUnit.getExecutionUnit().getDataSourceName();
81          // TODO use metadata to replace storageUnits to support multiple logic databases
82          StorageUnit storageUnit = resourceMetaData.getStorageUnits().containsKey(dataSourceName)
83                  ? resourceMetaData.getStorageUnits().get(dataSourceName)
84                  : resourceMetaData.getStorageUnits().values().iterator().next();
85          DatabaseType storageType = storageUnit.getStorageType();
86          ConnectionProperties connectionProps = storageUnit.getConnectionProperties();
87          SQLExecutionHook sqlExecutionHook = new SPISQLExecutionHook();
88          try {
89              SQLUnit sqlUnit = jdbcExecutionUnit.getExecutionUnit().getSqlUnit();
90              sqlExecutionHook.start(dataSourceName, sqlUnit.getSql(), sqlUnit.getParameters(), connectionProps, isTrunkThread);
91              T result = executeSQL(sqlUnit.getSql(), jdbcExecutionUnit.getStorageResource(), jdbcExecutionUnit.getConnectionMode(), storageType);
92              sqlExecutionHook.finishSuccess();
93              processEngine.completeSQLUnitExecution(jdbcExecutionUnit, processId);
94              return result;
95          } catch (final SQLException ex) {
96              if (!storageType.equals(protocolType)) {
97                  Optional<T> saneResult = getSaneResult(sqlStatement, ex);
98                  if (saneResult.isPresent()) {
99                      return isTrunkThread ? saneResult.get() : null;
100                 }
101             }
102             sqlExecutionHook.finishFailure(ex);
103             SQLExecutorExceptionHandler.handleException(ex);
104             return null;
105         }
106     }
107     
108     protected abstract T executeSQL(String sql, Statement statement, ConnectionMode connectionMode, DatabaseType storageType) throws SQLException;
109     
110     protected abstract Optional<T> getSaneResult(SQLStatement sqlStatement, SQLException ex);
111 }