View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.cdc.handler;
19  
20  import com.google.common.base.Strings;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelId;
23  import lombok.extern.slf4j.Slf4j;
24  import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
25  import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
26  import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
27  import org.apache.shardingsphere.data.pipeline.cdc.api.StreamDataParameter;
28  import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
29  import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
30  import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
31  import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
32  import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
33  import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterManager;
34  import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink;
35  import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
36  import org.apache.shardingsphere.data.pipeline.cdc.exception.StreamDatabaseNotFoundException;
37  import org.apache.shardingsphere.data.pipeline.cdc.exception.MissingRequiredStreamDataSourceException;
38  import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
39  import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
40  import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
41  import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
42  import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
43  import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase;
44  import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
45  import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataNodeUtils;
46  import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
47  import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
48  import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
49  import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
50  import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
51  import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
52  import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
53  import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
54  import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
55  import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
56  import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
57  import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
58  import org.apache.shardingsphere.infra.datanode.DataNode;
59  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
60  import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
61  import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
62  
63  import java.util.ArrayList;
64  import java.util.Collection;
65  import java.util.HashSet;
66  import java.util.List;
67  import java.util.Map;
68  import java.util.Properties;
69  import java.util.Set;
70  import java.util.stream.Collectors;
71  
72  /**
73   * CDC backend handler.
74   */
75  @Slf4j
76  public final class CDCBackendHandler {
77      
78      private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
79      
80      private final PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(new CDCJobType());
81      
82      /**
83       * Get database name by job ID.
84       *
85       * @param jobId job ID
86       * @return database
87       */
88      public String getDatabaseNameByJobId(final String jobId) {
89          return jobConfigManager.<CDCJobConfiguration>getJobConfiguration(jobId).getDatabaseName();
90      }
91      
92      /**
93       * Stream data.
94       *
95       * @param requestId request ID
96       * @param requestBody stream data request body
97       * @param connectionContext connection context
98       * @param channel channel
99       * @return CDC response
100      */
101     public CDCResponse streamData(final String requestId, final StreamDataRequestBody requestBody, final CDCConnectionContext connectionContext, final Channel channel) {
102         ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(requestBody.getDatabase());
103         ShardingSpherePreconditions.checkNotNull(database,
104                 () -> new CDCExceptionWrapper(requestId, new StreamDatabaseNotFoundException(String.format("%s database is not exists", requestBody.getDatabase()))));
105         Map<String, Set<String>> schemaTableNameMap;
106         Collection<String> tableNames;
107         Set<String> schemaTableNames = new HashSet<>();
108         DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(database.getProtocolType()).getDialectDatabaseMetaData();
109         if (dialectDatabaseMetaData.isSchemaAvailable()) {
110             schemaTableNameMap = CDCSchemaTableUtils.parseTableExpressionWithSchema(database, requestBody.getSourceSchemaTableList());
111             // TODO if different schema have same table names, table name may be overwritten, because the table name at sharding rule not contain schema.
112             tableNames = schemaTableNameMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
113             schemaTableNameMap.forEach((k, v) -> v.forEach(tableName -> schemaTableNames.add(k.isEmpty() ? tableName : String.join(".", k, tableName))));
114         } else {
115             schemaTableNames.addAll(CDCSchemaTableUtils.parseTableExpressionWithoutSchema(database, requestBody.getSourceSchemaTableList().stream().map(SchemaTable::getTable)
116                     .collect(Collectors.toList())));
117             tableNames = schemaTableNames;
118         }
119         ShardingSpherePreconditions.checkNotEmpty(tableNames, () -> new CDCExceptionWrapper(requestId, new MissingRequiredStreamDataSourceException()));
120         Map<String, List<DataNode>> actualDataNodesMap = CDCDataNodeUtils.buildDataNodesMap(database, tableNames);
121         ShardingSpherePreconditions.checkNotEmpty(actualDataNodesMap, () -> new PipelineInvalidParameterException(String.format("Not find table %s", tableNames)));
122         // TODO Add globalCSNSupported to isolate it with decodeWithTx flag, they're different. And also update CDCJobPreparer needSorting flag.
123         boolean decodeWithTx = DatabaseTypedSPILoader.getService(DialectDatabaseMetaData.class, database.getProtocolType()).isSupportGlobalCSN();
124         StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new ArrayList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
125         String jobId = jobAPI.create(parameter, CDCSinkType.SOCKET, new Properties());
126         connectionContext.setJobId(jobId);
127         startStreaming(jobId, connectionContext, channel);
128         return CDCResponseUtils.succeed(requestId, ResponseCase.STREAM_DATA_RESULT, StreamDataResult.newBuilder().setStreamingId(jobId).build());
129     }
130     
131     /**
132      * Start streaming.
133      *
134      * @param jobId job ID
135      * @param channel channel
136      * @param connectionContext connection context
137      */
138     public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
139         CDCJobConfiguration cdcJobConfig = jobConfigManager.getJobConfiguration(jobId);
140         ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
141         PipelineJobRegistry.stop(jobId);
142         ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
143         jobAPI.start(jobId, new PipelineCDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
144         connectionContext.setJobId(jobId);
145     }
146     
147     /**
148      * Stop streaming.
149      *
150      * @param jobId job ID
151      * @param channelId channel ID
152      */
153     public void stopStreaming(final String jobId, final ChannelId channelId) {
154         if (Strings.isNullOrEmpty(jobId)) {
155             log.warn("job id is null or empty, ignored");
156             return;
157         }
158         CDCJob job = (CDCJob) PipelineJobRegistry.get(jobId);
159         if (null == job) {
160             return;
161         }
162         if (((PipelineCDCSocketSink) job.getSink()).getChannel().id().equals(channelId)) {
163             log.info("close CDC job, channel id: {}", channelId);
164             PipelineJobRegistry.stop(jobId);
165             jobAPI.disable(jobId);
166         }
167     }
168     
169     /**
170      * Drop streaming.
171      *
172      * @param jobId job ID
173      */
174     public void dropStreaming(final String jobId) {
175         ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(),
176                 () -> new PipelineInternalException("Can't drop streaming job which is active"));
177         jobAPI.drop(jobId);
178     }
179     
180     /**
181      * Process ack.
182      *
183      * @param requestBody request body
184      */
185     public void processAck(final AckStreamingRequestBody requestBody) {
186         CDCAckId ackId = CDCAckId.unmarshal(requestBody.getAckId());
187         CDCImporter importer = CDCImporterManager.getImporter(ackId.getImporterId());
188         if (null == importer) {
189             log.warn("Could not find importer, ack id: {}", ackId.marshal());
190             return;
191         }
192         importer.ack(ackId.marshal());
193     }
194 }