1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.cdc.handler;
19
20 import com.google.common.base.Strings;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelId;
23 import lombok.extern.slf4j.Slf4j;
24 import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
25 import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
26 import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
27 import org.apache.shardingsphere.data.pipeline.cdc.api.StreamDataParameter;
28 import org.apache.shardingsphere.data.pipeline.cdc.config.CDCJobConfiguration;
29 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
30 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
31 import org.apache.shardingsphere.data.pipeline.cdc.core.ack.CDCAckId;
32 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
33 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporterManager;
34 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.PipelineCDCSocketSink;
35 import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
36 import org.apache.shardingsphere.data.pipeline.cdc.exception.MissingRequiredStreamDataSourceException;
37 import org.apache.shardingsphere.data.pipeline.cdc.exception.StreamDatabaseNotFoundException;
38 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
39 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
40 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
41 import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
42 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
43 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.ResponseCase;
44 import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
45 import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtils;
46 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
47 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
48 import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
49 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
50 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
51 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
52 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
53 import org.apache.shardingsphere.data.pipeline.core.util.PipelineDataNodeUtils;
54 import org.apache.shardingsphere.infra.database.core.metadata.database.metadata.DialectDatabaseMetaData;
55 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
56 import org.apache.shardingsphere.infra.datanode.DataNode;
57 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
58 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
59 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
60
61 import java.util.ArrayList;
62 import java.util.Collection;
63 import java.util.HashSet;
64 import java.util.List;
65 import java.util.Map;
66 import java.util.Properties;
67 import java.util.Set;
68 import java.util.stream.Collectors;
69
70
71
72
73 @Slf4j
74 public final class CDCBackendHandler {
75
76 private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
77
78 private final PipelineJobConfigurationManager jobConfigManager = new PipelineJobConfigurationManager(new CDCJobType());
79
80
81
82
83
84
85
86 public String getDatabaseNameByJobId(final String jobId) {
87 return jobConfigManager.<CDCJobConfiguration>getJobConfiguration(jobId).getDatabaseName();
88 }
89
90
91
92
93
94
95
96
97
98
99 public CDCResponse streamData(final String requestId, final StreamDataRequestBody requestBody, final CDCConnectionContext connectionContext, final Channel channel) {
100 ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(requestBody.getDatabase());
101 ShardingSpherePreconditions.checkNotNull(database,
102 () -> new CDCExceptionWrapper(requestId, new StreamDatabaseNotFoundException(String.format("%s database is not exists", requestBody.getDatabase()))));
103 Map<String, Set<String>> schemaTableNameMap;
104 Collection<String> tableNames;
105 Set<String> schemaTableNames = new HashSet<>();
106 DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(database.getProtocolType()).getDialectDatabaseMetaData();
107 if (dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()) {
108 schemaTableNameMap = CDCSchemaTableUtils.parseTableExpressionWithSchema(database, requestBody.getSourceSchemaTableList());
109
110 tableNames = schemaTableNameMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
111 schemaTableNameMap.forEach((key, value) -> value.forEach(tableName -> schemaTableNames.add(key.isEmpty() ? tableName : String.join(".", key, tableName))));
112 } else {
113 schemaTableNames.addAll(CDCSchemaTableUtils.parseTableExpressionWithoutSchema(database, requestBody.getSourceSchemaTableList().stream().map(SchemaTable::getTable)
114 .collect(Collectors.toList())));
115 tableNames = schemaTableNames;
116 }
117 ShardingSpherePreconditions.checkNotEmpty(tableNames, () -> new CDCExceptionWrapper(requestId, new MissingRequiredStreamDataSourceException()));
118 Map<String, List<DataNode>> tableAndDataNodesMap = PipelineDataNodeUtils.buildTableAndDataNodesMap(database, tableNames);
119
120 boolean isDecodeWithTransaction = new DatabaseTypeRegistry(database.getProtocolType()).getDialectDatabaseMetaData().getTransactionOption().isSupportGlobalCSN();
121 StreamDataParameter parameter = new StreamDataParameter(requestBody.getDatabase(), new ArrayList<>(schemaTableNames), requestBody.getFull(), tableAndDataNodesMap, isDecodeWithTransaction);
122 String jobId = jobAPI.create(parameter, CDCSinkType.SOCKET, new Properties());
123 connectionContext.setJobId(jobId);
124 startStreaming(jobId, connectionContext, channel);
125 return CDCResponseUtils.succeed(requestId, ResponseCase.STREAM_DATA_RESULT, StreamDataResult.newBuilder().setStreamingId(jobId).build());
126 }
127
128
129
130
131
132
133
134
135 public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
136 CDCJobConfiguration cdcJobConfig = jobConfigManager.getJobConfiguration(jobId);
137 ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
138 PipelineJobRegistry.stop(jobId);
139 ShardingSphereDatabase database = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(cdcJobConfig.getDatabaseName());
140 jobAPI.start(jobId, new PipelineCDCSocketSink(channel, database, cdcJobConfig.getSchemaTableNames()));
141 connectionContext.setJobId(jobId);
142 }
143
144
145
146
147
148
149
150 public void stopStreaming(final String jobId, final ChannelId channelId) {
151 if (Strings.isNullOrEmpty(jobId)) {
152 log.warn("job id is null or empty, ignored");
153 return;
154 }
155 CDCJob job = (CDCJob) PipelineJobRegistry.get(jobId);
156 if (null == job) {
157 return;
158 }
159 if (((PipelineCDCSocketSink) job.getSink()).getChannel().id().equals(channelId)) {
160 log.info("close CDC job, channel id: {}", channelId);
161 PipelineJobRegistry.stop(jobId);
162 jobAPI.disable(jobId);
163 }
164 }
165
166
167
168
169
170
171 public void dropStreaming(final String jobId) {
172 ShardingSpherePreconditions.checkState(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).isDisabled(),
173 () -> new PipelineInternalException("Can't drop streaming job which is active"));
174 jobAPI.drop(jobId);
175 }
176
177
178
179
180
181
182 public void processAck(final AckStreamingRequestBody requestBody) {
183 CDCAckId ackId = CDCAckId.unmarshal(requestBody.getAckId());
184 CDCImporter importer = CDCImporterManager.getImporter(ackId.getImporterId());
185 if (null == importer) {
186 log.warn("Could not find importer, ack id: {}", ackId.marshal());
187 return;
188 }
189 importer.ack(ackId.marshal());
190 }
191 }