View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
19  
20  import com.google.common.eventbus.Subscribe;
21  import org.apache.shardingsphere.infra.executor.sql.process.Process;
22  import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
23  import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
24  import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
25  import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
26  import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
27  import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
28  import org.apache.shardingsphere.mode.manager.ContextManager;
29  import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
30  import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
31  import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
32  import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
33  import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesEvent;
34  
35  import java.sql.SQLException;
36  import java.sql.Statement;
37  import java.util.Collection;
38  
39  /**
40   * TODO replace the old ProcessListChangedSubscriber after meta data refactor completed
41   * New process list changed subscriber.
42   */
43  public final class ProcessListChangedSubscriber {
44      
45      private final RegistryCenter registryCenter;
46      
47      private final ContextManager contextManager;
48      
49      private final YamlProcessListSwapper swapper = new YamlProcessListSwapper();
50      
51      public ProcessListChangedSubscriber(final RegistryCenter registryCenter, final ContextManager contextManager) {
52          this.registryCenter = registryCenter;
53          this.contextManager = contextManager;
54          contextManager.getInstanceContext().getEventBusContext().register(this);
55      }
56      
57      /**
58       * Report local processes.
59       *
60       * @param event show process list trigger event
61       */
62      @Subscribe
63      public void reportLocalProcesses(final ReportLocalProcessesEvent event) {
64          if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
65              return;
66          }
67          Collection<Process> processes = ProcessRegistry.getInstance().listAll();
68          if (!processes.isEmpty()) {
69              registryCenter.getRepository().persist(
70                      ProcessNode.getProcessListInstancePath(event.getTaskId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
71          }
72          registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(), event.getTaskId()));
73      }
74      
75      /**
76       * Complete to report local processes.
77       *
78       * @param event report local processes completed event
79       */
80      @Subscribe
81      public synchronized void completeToReportLocalProcesses(final ReportLocalProcessesCompletedEvent event) {
82          ProcessOperationLockRegistry.getInstance().notify(event.getTaskId());
83      }
84      
85      /**
86       * Kill local process.
87       *
88       * @param event kill local process event
89       * @throws SQLException SQL exception
90       */
91      @Subscribe
92      public synchronized void killLocalProcess(final KillLocalProcessEvent event) throws SQLException {
93          if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
94              return;
95          }
96          Process process = ProcessRegistry.getInstance().get(event.getProcessId());
97          if (null != process) {
98              process.setInterrupted(true);
99              for (Statement each : process.getProcessStatements().values()) {
100                 each.cancel();
101             }
102         }
103         registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessId()));
104     }
105     
106     /**
107      * Complete to kill local process.
108      *
109      * @param event kill local process completed event
110      */
111     @Subscribe
112     public synchronized void completeToKillLocalProcess(final KillLocalProcessCompletedEvent event) {
113         ProcessOperationLockRegistry.getInstance().notify(event.getProcessId());
114     }
115 }