1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.subscriber;
19
20 import com.google.common.eventbus.Subscribe;
21 import org.apache.shardingsphere.infra.executor.sql.process.Process;
22 import org.apache.shardingsphere.infra.executor.sql.process.ProcessRegistry;
23 import org.apache.shardingsphere.infra.executor.sql.process.lock.ProcessOperationLockRegistry;
24 import org.apache.shardingsphere.infra.executor.sql.process.yaml.swapper.YamlProcessListSwapper;
25 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
26 import org.apache.shardingsphere.metadata.persist.node.ComputeNode;
27 import org.apache.shardingsphere.metadata.persist.node.ProcessNode;
28 import org.apache.shardingsphere.mode.manager.ContextManager;
29 import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
30 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessCompletedEvent;
31 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.KillLocalProcessEvent;
32 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesCompletedEvent;
33 import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ReportLocalProcessesEvent;
34
35 import java.sql.SQLException;
36 import java.sql.Statement;
37 import java.util.Collection;
38
39
40
41
42
43 public final class ProcessListChangedSubscriber {
44
45 private final RegistryCenter registryCenter;
46
47 private final ContextManager contextManager;
48
49 private final YamlProcessListSwapper swapper = new YamlProcessListSwapper();
50
51 public ProcessListChangedSubscriber(final RegistryCenter registryCenter, final ContextManager contextManager) {
52 this.registryCenter = registryCenter;
53 this.contextManager = contextManager;
54 contextManager.getInstanceContext().getEventBusContext().register(this);
55 }
56
57
58
59
60
61
62 @Subscribe
63 public void reportLocalProcesses(final ReportLocalProcessesEvent event) {
64 if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
65 return;
66 }
67 Collection<Process> processes = ProcessRegistry.getInstance().listAll();
68 if (!processes.isEmpty()) {
69 registryCenter.getRepository().persist(
70 ProcessNode.getProcessListInstancePath(event.getTaskId(), event.getInstanceId()), YamlEngine.marshal(swapper.swapToYamlConfiguration(processes)));
71 }
72 registryCenter.getRepository().delete(ComputeNode.getProcessTriggerInstanceNodePath(event.getInstanceId(), event.getTaskId()));
73 }
74
75
76
77
78
79
80 @Subscribe
81 public synchronized void completeToReportLocalProcesses(final ReportLocalProcessesCompletedEvent event) {
82 ProcessOperationLockRegistry.getInstance().notify(event.getTaskId());
83 }
84
85
86
87
88
89
90
91 @Subscribe
92 public synchronized void killLocalProcess(final KillLocalProcessEvent event) throws SQLException {
93 if (!event.getInstanceId().equals(contextManager.getInstanceContext().getInstance().getMetaData().getId())) {
94 return;
95 }
96 Process process = ProcessRegistry.getInstance().get(event.getProcessId());
97 if (null != process) {
98 process.setInterrupted(true);
99 for (Statement each : process.getProcessStatements().values()) {
100 each.cancel();
101 }
102 }
103 registryCenter.getRepository().delete(ComputeNode.getProcessKillInstanceIdNodePath(event.getInstanceId(), event.getProcessId()));
104 }
105
106
107
108
109
110
111 @Subscribe
112 public synchronized void completeToKillLocalProcess(final KillLocalProcessCompletedEvent event) {
113 ProcessOperationLockRegistry.getInstance().notify(event.getProcessId());
114 }
115 }