1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.cdc.handler;
19
20 import com.google.common.base.Strings;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelId;
23 import lombok.extern.slf4j.Slf4j;
24 import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
25 import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
26 import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
27 import org.apache.shardingsphere.data.pipeline.cdc.api.StreamDataParameter;
28 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
29 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
30 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
31 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
32 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
33 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterManager;
34 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink;
35 import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
36 import org.apache.shardingsphere.data.pipeline.cdc.exception.StreamDatabaseNotFoundException;
37 import org.apache.shardingsphere.data.pipeline.cdc.exception.MissingRequiredStreamDataSourceException;
38 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
39 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
40 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
41 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
42 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
43 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase;
44 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
45 import org.apache.shardingsphere.data.pipeline.cdc.util.CDCDataNodeUtils;
46 import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
47 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
48 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
49 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
50 import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
51 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
52 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
53 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
54 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
55 import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
56 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
57 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
58 import org.apache.shardingsphere.infra.datanode.DataNode;
59 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
60 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
61 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
62
63 import java.util.ArrayList;
64 import java.util.Collection;
65 import java.util.HashSet;
66 import java.util.List;
67 import java.util.Map;
68 import java.util.Properties;
69 import java.util.Set;
70 import java.util.stream.Collectors;
71
72
73
74
75 @Slf4j
76 public final class CDCBackendHandler {
77
78 private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
79
80 private final PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(new CDCJobType());
81
82
83
84
85
86
87
88 public String getDatabaseNameByJobId(final String jobId) {
89 return jobConfigManager.<CDCJobConfiguration>getJobConfiguration(jobId).getDatabaseName();
90 }
91
92
93
94
95
96
97
98
99
100
101 public CDCResponse streamData(final String requestId, final StreamDataRequestBody requestBody, final CDCConnectionContext connectionContext, final Channel channel) {
102 ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(requestBody.getDatabase());
103 ShardingSpherePreconditions.checkNotNull(database,
104 () -> new CDCExceptionWrapper(requestId, new StreamDatabaseNotFoundException(String.format("%s database is not exists", requestBody.getDatabase()))));
105 Map<String, Set<String>> schemaTableNameMap;
106 Collection<String> tableNames;
107 Set<String> schemaTableNames = new HashSet<>();
108 DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(database.getProtocolType()).getDialectDatabaseMetaData();
109 if (dialectDatabaseMetaData.isSchemaAvailable()) {
110 schemaTableNameMap = CDCSchemaTableUtils.parseTableExpressionWithSchema(database, requestBody.getSourceSchemaTableList());
111
112 tableNames = schemaTableNameMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
113 schemaTableNameMap.forEach((k, v) -> v.forEach(tableName -> schemaTableNames.add(k.isEmpty() ? tableName : String.join(".", k, tableName))));
114 } else {
115 schemaTableNames.addAll(CDCSchemaTableUtils.parseTableExpressionWithoutSchema(database, requestBody.getSourceSchemaTableList().stream().map(SchemaTable::getTable)
116 .collect(Collectors.toList())));
117 tableNames = schemaTableNames;
118 }
119 ShardingSpherePreconditions.checkNotEmpty(tableNames, () -> new CDCExceptionWrapper(requestId, new MissingRequiredStreamDataSourceException()));
120 Map<String, List<DataNode>> actualDataNodesMap = CDCDataNodeUtils.buildDataNodesMap(database, tableNames);
121 ShardingSpherePreconditions.checkNotEmpty(actualDataNodesMap, () -> new PipelineInvalidParameterException(String.format("Not find table %s", tableNames)));
122
123 boolean decodeWithTx = DatabaseTypedSPILoader.getService(DialectDatabaseMetaData.class, database.getProtocolType()).isSupportGlobalCSN();
124 StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new ArrayList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, decodeWithTx);
125 String jobId = jobAPI.create(parameter, CDCSinkType.SOCKET, new Properties());
126 connectionContext.setJobId(jobId);
127 startStreaming(jobId, connectionContext, channel);
128 return CDCResponseUtils.succeed(requestId, ResponseCase.STREAM_DATA_RESULT, StreamDataResult.newBuilder().setStreamingId(jobId).build());
129 }
130
131
132
133
134
135
136
137
138 public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
139 CDCJobConfiguration cdcJobConfig = jobConfigManager.getJobConfiguration(jobId);
140 ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
141 PipelineJobRegistry.stop(jobId);
142 ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
143 jobAPI.start(jobId, new PipelineCDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
144 connectionContext.setJobId(jobId);
145 }
146
147
148
149
150
151
152
153 public void stopStreaming(final String jobId, final ChannelId channelId) {
154 if (Strings.isNullOrEmpty(jobId)) {
155 log.warn("job id is null or empty, ignored");
156 return;
157 }
158 CDCJob job = (CDCJob) PipelineJobRegistry.get(jobId);
159 if (null == job) {
160 return;
161 }
162 if (((PipelineCDCSocketSink) job.getSink()).getChannel().id().equals(channelId)) {
163 log.info("close CDC job, channel id: {}", channelId);
164 PipelineJobRegistry.stop(jobId);
165 jobAPI.disable(jobId);
166 }
167 }
168
169
170
171
172
173
174 public void dropStreaming(final String jobId) {
175 ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(),
176 () -> new PipelineInternalException("Can't drop streaming job which is active"));
177 jobAPI.drop(jobId);
178 }
179
180
181
182
183
184
185 public void processAck(final AckStreamingRequestBody requestBody) {
186 CDCAckId ackId = CDCAckId.unmarshal(requestBody.getAckId());
187 CDCImporter importer = CDCImporterManager.getImporter(ackId.getImporterId());
188 if (null == importer) {
189 log.warn("Could not find importer, ack id: {}", ackId.marshal());
190 return;
191 }
192 importer.ack(ackId.marshal());
193 }
194 }