View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.mode.manager.cluster.coordinator.subscriber;
19  
20  import com.google.common.eventbus.Subscribe;
21  import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
22  import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
23  import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDataSource;
24  import org.apache.shardingsphere.infra.rule.attribute.datasource.StaticDataSourceRuleAttribute;
25  import org.apache.shardingsphere.infra.state.datasource.DataSourceState;
26  import org.apache.shardingsphere.infra.state.datasource.DataSourceStateManager;
27  import org.apache.shardingsphere.mode.manager.ContextManager;
28  import org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
29  import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterLockDeletedEvent;
30  import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStateEvent;
31  import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.cluster.event.ClusterStatusChangedEvent;
32  import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOfflineEvent;
33  import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.InstanceOnlineEvent;
34  import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsEvent;
35  import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
36  import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
37  import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
38  
39  /**
40   * State changed subscriber.
41   */
42  @SuppressWarnings("unused")
43  public final class StateChangedSubscriber {
44      
45      private final RegistryCenter registryCenter;
46      
47      private final ContextManager contextManager;
48      
49      public StateChangedSubscriber(final RegistryCenter registryCenter, final ContextManager contextManager) {
50          this.registryCenter = registryCenter;
51          this.contextManager = contextManager;
52          contextManager.getInstanceContext().getEventBusContext().register(this);
53      }
54      
55      /**
56       * Renew disabled data source names.
57       * 
58       * @param event Storage node changed event
59       */
60      @Subscribe
61      public synchronized void renew(final StorageNodeChangedEvent event) {
62          QualifiedDataSource qualifiedDataSource = event.getQualifiedDataSource();
63          if (!contextManager.getMetaDataContexts().getMetaData().containsDatabase(qualifiedDataSource.getDatabaseName())) {
64              return;
65          }
66          DataSourceStateManager.getInstance().updateState(qualifiedDataSource, DataSourceState.valueOf(event.getDataSource().getStatus().name()));
67          ShardingSphereDatabase database = contextManager.getMetaDataContexts().getMetaData().getDatabase(qualifiedDataSource.getDatabaseName());
68          for (StaticDataSourceRuleAttribute each : database.getRuleMetaData().getAttributes(StaticDataSourceRuleAttribute.class)) {
69              each.updateStatus(qualifiedDataSource, event.getDataSource().getStatus());
70          }
71      }
72      
73      /**
74       * Reset cluster state.
75       * 
76       * @param event cluster lock deleted event
77       */
78      @Subscribe
79      public synchronized void renew(final ClusterLockDeletedEvent event) {
80          contextManager.getInstanceContext().getEventBusContext().post(new ClusterStatusChangedEvent(event.getState()));
81      }
82      
83      /**
84       * Renew cluster state.
85       * 
86       * @param event cluster state event
87       */
88      @Subscribe
89      public synchronized void renew(final ClusterStateEvent event) {
90          contextManager.updateClusterState(event.getStatus());
91      }
92      
93      /**
94       * Renew instance status.
95       * 
96       * @param event state event
97       */
98      @Subscribe
99      public synchronized void renew(final StateEvent event) {
100         contextManager.getInstanceContext().updateInstanceStatus(event.getInstanceId(), event.getStatus());
101     }
102     
103     /**
104      * Renew instance worker id.
105      * 
106      * @param event worker id event
107      */
108     @Subscribe
109     public synchronized void renew(final WorkerIdEvent event) {
110         contextManager.getInstanceContext().updateWorkerId(event.getInstanceId(), event.getWorkerId());
111     }
112     
113     /**
114      * Renew instance labels.
115      * 
116      * @param event label event
117      */
118     @Subscribe
119     public synchronized void renew(final LabelsEvent event) {
120         // TODO labels may be empty
121         contextManager.getInstanceContext().updateLabel(event.getInstanceId(), event.getLabels());
122     }
123     
124     /**
125      * Renew instance list.
126      *
127      * @param event compute node online event
128      */
129     @Subscribe
130     public synchronized void renew(final InstanceOnlineEvent event) {
131         contextManager.getInstanceContext().addComputeNodeInstance(registryCenter.getComputeNodeStatusService().loadComputeNodeInstance(event.getInstanceMetaData()));
132     }
133     
134     /**
135      * Renew instance list.
136      *
137      * @param event compute node offline event
138      */
139     @Subscribe
140     public synchronized void renew(final InstanceOfflineEvent event) {
141         contextManager.getInstanceContext().deleteComputeNodeInstance(new ComputeNodeInstance(event.getInstanceMetaData()));
142     }
143 }