View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.infra.executor.sql.process;
19  
20  import lombok.Getter;
21  import lombok.RequiredArgsConstructor;
22  import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation;
23  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
24  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
25  import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
26  import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
27  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
28  import org.apache.shardingsphere.infra.metadata.user.Grantee;
29  
30  import java.sql.SQLException;
31  import java.sql.Statement;
32  import java.util.LinkedHashMap;
33  import java.util.Map;
34  import java.util.Optional;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  /**
40   * Process.
41   */
42  @HighFrequencyInvocation
43  @RequiredArgsConstructor
44  @Getter
45  public final class Process {
46      
47      private final Map<Integer, Statement> processStatements = new ConcurrentHashMap<>();
48      
49      private final String id;
50      
51      private final long startMillis;
52      
53      private final String sql;
54      
55      private final String databaseName;
56      
57      private final String username;
58      
59      private final String hostname;
60      
61      private final AtomicInteger totalUnitCount;
62      
63      private final AtomicInteger completedUnitCount;
64      
65      private final AtomicBoolean idle;
66      
67      private final AtomicBoolean interrupted;
68      
69      public Process(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
70          this("", executionGroupContext, true);
71      }
72      
73      public Process(final String sql, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
74          this(sql, executionGroupContext, false);
75      }
76      
77      private Process(final String sql, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final boolean idle) {
78          id = executionGroupContext.getReportContext().getProcessId();
79          startMillis = System.currentTimeMillis();
80          this.sql = sql;
81          databaseName = executionGroupContext.getReportContext().getDatabaseName();
82          Optional<Grantee> grantee = executionGroupContext.getReportContext().getGrantee();
83          username = grantee.map(Grantee::getUsername).orElse("");
84          hostname = grantee.map(Grantee::getHostname).orElse("");
85          totalUnitCount = new AtomicInteger(getTotalUnitCount(executionGroupContext));
86          processStatements.putAll(createProcessStatements(executionGroupContext));
87          completedUnitCount = new AtomicInteger(0);
88          this.idle = new AtomicBoolean(idle);
89          interrupted = new AtomicBoolean();
90      }
91      
92      private int getTotalUnitCount(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
93          int result = 0;
94          for (ExecutionGroup<? extends SQLExecutionUnit> each : executionGroupContext.getInputGroups()) {
95              result += each.getInputs().size();
96          }
97          return result;
98      }
99      
100     private Map<Integer, Statement> createProcessStatements(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
101         Map<Integer, Statement> result = new LinkedHashMap<>();
102         for (ExecutionGroup<? extends SQLExecutionUnit> each : executionGroupContext.getInputGroups()) {
103             for (SQLExecutionUnit executionUnit : each.getInputs()) {
104                 if (executionUnit instanceof JDBCExecutionUnit) {
105                     JDBCExecutionUnit jdbcExecutionUnit = (JDBCExecutionUnit) executionUnit;
106                     result.put(System.identityHashCode(jdbcExecutionUnit.getExecutionUnit()), jdbcExecutionUnit.getStorageResource());
107                 }
108             }
109         }
110         return result;
111     }
112     
113     /**
114      * Complete execution unit.
115      */
116     public void completeExecutionUnit() {
117         completedUnitCount.incrementAndGet();
118     }
119     
120     /**
121      * Is interrupted.
122      *
123      * @return interrupted
124      */
125     public boolean isInterrupted() {
126         return interrupted.get();
127     }
128     
129     /**
130      * Set interrupted.
131      *
132      * @param interrupted interrupted
133      */
134     public void setInterrupted(final boolean interrupted) {
135         this.interrupted.set(interrupted);
136     }
137     
138     /**
139      * Is idle.
140      *
141      * @return idle
142      */
143     public boolean isIdle() {
144         return idle.get();
145     }
146     
147     /**
148      * Remove process statement.
149      *
150      * @param executionUnit execution unit
151      */
152     public void removeProcessStatement(final ExecutionUnit executionUnit) {
153         processStatements.remove(System.identityHashCode(executionUnit));
154     }
155     
156     /**
157      * Kill process.
158      *
159      * @throws SQLException SQL exception
160      */
161     public void kill() throws SQLException {
162         setInterrupted(true);
163         for (Statement each : processStatements.values()) {
164             each.cancel();
165         }
166     }
167 }