/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.shardingsphere.data.pipeline.cdc.handler;

import com.google.common.base.Strings;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.api.StreamDataParameter;
import org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterManager;
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink;
import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
import org.apache.shardingsphere.data.pipeline.cdc.exception.MissingRequiredStreamDataSourceException;
import org.apache.shardingsphere.data.pipeline.cdc.exception.StreamDatabaseNotFoundException;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineDataNodeUtils;
import org.apache.shardingsphere.infra.database.core.metadata.database.metadata.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * CDC backend handler.
 */
@Slf4j
public final class CDCBackendHandler {
    
    private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
    
    private final PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(new CDCJobType());
    
    /**
     * Get database name by job ID.
     *
     * @param jobId job ID
     * @return database name
     */
    public String getDatabaseNameByJobId(final String jobId) {
        return jobConfigManager.<CDCJobConfiguration>getJobConfiguration(jobId).getDatabaseName();
    }
    
    /**
     * Stream data.
     *
     * @param requestId request ID
     * @param requestBody stream data request body
     * @param connectionContext connection context
     * @param channel channel
     * @return CDC response
     */
    public CDCResponse streamData(final String requestId, final StreamDataRequestBody requestBody, final CDCConnectionContext connectionContext, final Channel channel) {
        ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(requestBody.getDatabase());
        ShardingSpherePreconditions.checkNotNull(database,
                () -> new CDCExceptionWrapper(requestId, new StreamDatabaseNotFoundException(String.format("Database '%s' does not exist", requestBody.getDatabase()))));
        Map<String, Set<String>> schemaTableNameMap;
        Collection<String> tableNames;
        Set<String> schemaTableNames = new HashSet<>();
        DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(database.getProtocolType()).getDialectDatabaseMetaData();
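        // Schema-aware dialects (e.g. PostgreSQL, openGauss) resolve table expressions per schema; other dialects match on table names only.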
        if (dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()) {
            schemaTableNameMap = CDCSchemaTableUtils.parseTableExpressionWithSchema(database, requestBody.getSourceSchemaTableList());
            // TODO If different schemas contain tables with the same name, a table name may be overwritten, because the table name in the sharding rule does not contain the schema.
            tableNames = schemaTableNameMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
            schemaTableNameMap.forEach((key, value) -> value.forEach(tableName -> schemaTableNames.add(key.isEmpty() ? tableName : String.join(".", key, tableName))));
        } else {
            schemaTableNames.addAll(CDCSchemaTableUtils.parseTableExpressionWithoutSchema(database, requestBody.getSourceSchemaTableList().stream().map(SchemaTable::getTable)
                    .collect(Collectors.toList())));
            tableNames = schemaTableNames;
        }
        ShardingSpherePreconditions.checkNotEmpty(tableNames, () -> new CDCExceptionWrapper(requestId, new MissingRequiredStreamDataSourceException()));
        Map<String, List<DataNode>> tableAndDataNodesMap = PipelineDataNodeUtils.buildTableAndDataNodesMap(database, tableNames);
        // TODO Add a globalCSNSupported flag to isolate it from the isDecodeWithTransaction flag; they are different. Also update the CDCJobPreparer needSorting flag.
        boolean isDecodeWithTransaction = new DatabaseTypeRegistry(database.getProtocolType()).getDialectDatabaseMetaData().getTransactionOption().isSupportGlobalCSN();
        StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new ArrayList<>(schemaTableNames), requestBody.getFull(), tableAndDataNodesMap, isDecodeWithTransaction);
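        // Create the streaming job with a socket sink, then start pushing data back through the requesting channel.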
        String jobId = jobAPI.create(parameter, CDCSinkType.SOCKET, new Properties());
        connectionContext.setJobId(jobId);
        startStreaming(jobId, connectionContext, channel);
        return CDCResponseUtils.succeed(requestId, ResponseCase.STREAM_DATA_RESULT, StreamDataResult.newBuilder().setStreamingId(jobId).build());
    }
    
    /**
     * Start streaming.
     *
     * @param jobId job ID
     * @param connectionContext connection context
     * @param channel channel
     */
    public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
        CDCJobConfiguration cdcJobConfig = jobConfigManager.getJobConfiguration(jobId);
        ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
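        // Stop any running instance of the job before restarting it with a sink bound to the new channel.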
        PipelineJobRegistry.stop(jobId);
        ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
        jobAPI.start(jobId, new PipelineCDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
        connectionContext.setJobId(jobId);
    }
    
    /**
     * Stop streaming.
     *
     * @param jobId job ID
     * @param channelId channel ID
     */
    public void stopStreaming(final String jobId, final ChannelId channelId) {
        if (Strings.isNullOrEmpty(jobId)) {
            log.warn("job id is null or empty, ignored");
            return;
        }
        CDCJob job = (CDCJob) PipelineJobRegistry.get(jobId);
        if (null == job) {
            return;
        }
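        // Only the channel that owns the job's socket sink may stop the job; stop requests from other channels are ignored.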
        if (((PipelineCDCSocketSink) job.getSink()).getChannel().id().equals(channelId)) {
            log.info("close CDC job, channel id: {}", channelId);
            PipelineJobRegistry.stop(jobId);
            jobAPI.disable(jobId);
        }
    }
    
    /**
     * Drop streaming.
     *
     * @param jobId job ID
     */
    public void dropStreaming(final String jobId) {
        ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(),
                () -> new PipelineInternalException("Can't drop streaming job which is active"));
        jobAPI.drop(jobId);
    }
    
    /**
     * Process ack.
     *
     * @param requestBody request body
     */
    public void processAck(final AckStreamingRequestBody requestBody) {
        CDCAckId ackId = CDCAckId.unmarshal(requestBody.getAckId());
        CDCImporter importer = CDCImporterManager.getImporter(ackId.getImporterId());
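        // The importer may already be gone, e.g. if the job has stopped; in that case the ack is discarded with a warning.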
        if (null == importer) {
            log.warn("Could not find importer, ack id: {}", ackId.marshal());
            return;
        }
        importer.ack(ackId.marshal());
    }
}