View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.infra.executor.sql.process;
19  
20  import lombok.Getter;
21  import lombok.RequiredArgsConstructor;
22  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroup;
23  import org.apache.shardingsphere.infra.executor.kernel.model.ExecutionGroupContext;
24  import org.apache.shardingsphere.infra.executor.sql.context.ExecutionUnit;
25  import org.apache.shardingsphere.infra.executor.sql.execute.engine.SQLExecutionUnit;
26  import org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutionUnit;
27  import org.apache.shardingsphere.infra.metadata.user.Grantee;
28  
29  import java.sql.Statement;
30  import java.util.LinkedHashMap;
31  import java.util.Map;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.atomic.AtomicBoolean;
34  import java.util.concurrent.atomic.AtomicInteger;
35  
36  /**
37   * Process.
38   */
39  @RequiredArgsConstructor
40  @Getter
41  public final class Process {
42      
43      private final Map<Integer, Statement> processStatements = new ConcurrentHashMap<>();
44      
45      private final String id;
46      
47      private final long startMillis;
48      
49      private final String sql;
50      
51      private final String databaseName;
52      
53      private final String username;
54      
55      private final String hostname;
56      
57      private final AtomicInteger totalUnitCount;
58      
59      private final AtomicInteger completedUnitCount;
60      
61      private final AtomicBoolean idle;
62      
63      private final AtomicBoolean interrupted;
64      
65      public Process(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
66          this("", executionGroupContext, true);
67      }
68      
69      public Process(final String sql, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
70          this(sql, executionGroupContext, false);
71      }
72      
73      private Process(final String sql, final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext, final boolean idle) {
74          id = executionGroupContext.getReportContext().getProcessId();
75          startMillis = System.currentTimeMillis();
76          this.sql = sql;
77          databaseName = executionGroupContext.getReportContext().getDatabaseName();
78          Grantee grantee = executionGroupContext.getReportContext().getGrantee();
79          username = null == grantee ? "" : grantee.getUsername();
80          hostname = null == grantee ? "" : grantee.getHostname();
81          totalUnitCount = new AtomicInteger(getTotalUnitCount(executionGroupContext));
82          processStatements.putAll(createProcessStatements(executionGroupContext));
83          completedUnitCount = new AtomicInteger(0);
84          this.idle = new AtomicBoolean(idle);
85          interrupted = new AtomicBoolean();
86      }
87      
88      private int getTotalUnitCount(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
89          int result = 0;
90          for (ExecutionGroup<? extends SQLExecutionUnit> each : executionGroupContext.getInputGroups()) {
91              result += each.getInputs().size();
92          }
93          return result;
94      }
95      
96      private Map<Integer, Statement> createProcessStatements(final ExecutionGroupContext<? extends SQLExecutionUnit> executionGroupContext) {
97          Map<Integer, Statement> result = new LinkedHashMap<>();
98          for (ExecutionGroup<? extends SQLExecutionUnit> each : executionGroupContext.getInputGroups()) {
99              for (SQLExecutionUnit executionUnit : each.getInputs()) {
100                 if (executionUnit instanceof JDBCExecutionUnit) {
101                     JDBCExecutionUnit jdbcExecutionUnit = (JDBCExecutionUnit) executionUnit;
102                     result.put(System.identityHashCode(jdbcExecutionUnit.getExecutionUnit()), jdbcExecutionUnit.getStorageResource());
103                 }
104             }
105         }
106         return result;
107     }
108     
109     /**
110      * Complete execution unit.
111      */
112     public void completeExecutionUnit() {
113         completedUnitCount.incrementAndGet();
114     }
115     
116     /**
117      * Is interrupted.
118      *
119      * @return interrupted
120      */
121     public boolean isInterrupted() {
122         return interrupted.get();
123     }
124     
125     /**
126      * Set interrupted.
127      *
128      * @param interrupted interrupted
129      */
130     public void setInterrupted(final boolean interrupted) {
131         this.interrupted.set(interrupted);
132     }
133     
134     /**
135      * Is idle.
136      *
137      * @return idle
138      */
139     public boolean isIdle() {
140         return idle.get();
141     }
142     
143     /**
144      * Put process statement.
145      *
146      * @param executionUnit execution unit
147      * @param statement statement
148      */
149     public void putProcessStatement(final ExecutionUnit executionUnit, final Statement statement) {
150         putProcessStatement(System.identityHashCode(executionUnit), statement);
151     }
152     
153     /**
154      * Put process statement.
155      *
156      * @param executionUnitIdentityHashCode execution unit identity hash code
157      * @param statement statement
158      */
159     public void putProcessStatement(final Integer executionUnitIdentityHashCode, final Statement statement) {
160         processStatements.put(executionUnitIdentityHashCode, statement);
161     }
162     
163     /**
164      * Remove process statement.
165      *
166      * @param executionUnit execution unit
167      */
168     public void removeProcessStatement(final ExecutionUnit executionUnit) {
169         processStatements.remove(System.identityHashCode(executionUnit));
170     }
171 }