1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.migration.distsql.parser.core;
19
20 import com.google.common.base.Splitter;
21 import org.antlr.v4.runtime.tree.ParseTree;
22 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.CheckMigrationStatement;
23 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.CommitMigrationStatement;
24 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.DropMigrationCheckStatement;
25 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.MigrateTableStatement;
26 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.RegisterMigrationSourceStorageUnitStatement;
27 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.RollbackMigrationStatement;
28 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationCheckStatusStatement;
29 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationListStatement;
30 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationRuleStatement;
31 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationSourceStorageUnitsStatement;
32 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationStatusStatement;
33 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StartMigrationCheckStatement;
34 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StartMigrationStatement;
35 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StopMigrationCheckStatement;
36 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StopMigrationStatement;
37 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.UnregisterMigrationSourceStorageUnitStatement;
38 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry;
39 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementBaseVisitor;
40 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser;
41 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.AlgorithmDefinitionContext;
42 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.AlterMigrationRuleContext;
43 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.BatchSizeContext;
44 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.CheckMigrationContext;
45 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.CommitMigrationContext;
46 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.DropMigrationCheckContext;
47 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.MigrateTableContext;
48 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.PasswordContext;
49 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.PropertiesDefinitionContext;
50 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.PropertyContext;
51 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.RateLimiterContext;
52 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ReadDefinitionContext;
53 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.RegisterMigrationSourceStorageUnitContext;
54 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.RollbackMigrationContext;
55 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShardingSizeContext;
56 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationCheckAlgorithmsContext;
57 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationCheckStatusContext;
58 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationListContext;
59 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationRuleContext;
60 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationSourceStorageUnitsContext;
61 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationStatusContext;
62 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.SourceTableNameContext;
63 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationCheckContext;
64 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationContext;
65 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationCheckContext;
66 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationContext;
67 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StorageUnitDefinitionContext;
68 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StreamChannelContext;
69 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.TargetTableNameContext;
70 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.TransmissionRuleContext;
71 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.UnregisterMigrationSourceStorageUnitContext;
72 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.WorkerThreadContext;
73 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.WriteDefinitionContext;
74 import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
75 import org.apache.shardingsphere.distsql.segment.DataSourceSegment;
76 import org.apache.shardingsphere.distsql.segment.HostnameAndPortBasedDataSourceSegment;
77 import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
78 import org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment;
79 import org.apache.shardingsphere.distsql.segment.URLBasedDataSourceSegment;
80 import org.apache.shardingsphere.distsql.statement.ral.queryable.show.ShowPluginsStatement;
81 import org.apache.shardingsphere.data.pipeline.distsql.statement.updatable.AlterTransmissionRuleStatement;
82 import org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter;
83 import org.apache.shardingsphere.infra.datanode.DataNode;
84 import org.apache.shardingsphere.sql.parser.api.ASTNode;
85 import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
86 import org.apache.shardingsphere.sql.parser.statement.core.value.identifier.IdentifierValue;
87
88 import java.util.Collection;
89 import java.util.Collections;
90 import java.util.LinkedList;
91 import java.util.List;
92 import java.util.Properties;
93 import java.util.stream.Collectors;
94
95
96
97
98 public final class MigrationDistSQLStatementVisitor extends MigrationDistSQLStatementBaseVisitor<ASTNode> implements SQLVisitor<ASTNode> {
99
100 @Override
101 public ASTNode visitShowMigrationRule(final ShowMigrationRuleContext ctx) {
102 return new ShowMigrationRuleStatement();
103 }
104
105 @Override
106 public ASTNode visitAlterMigrationRule(final AlterMigrationRuleContext ctx) {
107 return new AlterTransmissionRuleStatement("MIGRATION", (TransmissionRuleSegment) visit(ctx.transmissionRule()));
108 }
109
110 @Override
111 public ASTNode visitTransmissionRule(final TransmissionRuleContext ctx) {
112 TransmissionRuleSegment result = new TransmissionRuleSegment();
113 if (null != ctx.readDefinition()) {
114 result.setReadSegment((ReadOrWriteSegment) visit(ctx.readDefinition()));
115 }
116 if (null != ctx.writeDefinition()) {
117 result.setWriteSegment((ReadOrWriteSegment) visit(ctx.writeDefinition()));
118 }
119 if (null != ctx.streamChannel()) {
120 result.setStreamChannel((AlgorithmSegment) visit(ctx.streamChannel()));
121 }
122 return result;
123 }
124
125 @Override
126 public ASTNode visitReadDefinition(final ReadDefinitionContext ctx) {
127 return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getShardingSize(ctx.shardingSize()), getAlgorithmSegment(ctx.rateLimiter()));
128 }
129
130 @Override
131 public ASTNode visitWriteDefinition(final WriteDefinitionContext ctx) {
132 return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getAlgorithmSegment(ctx.rateLimiter()));
133 }
134
135 private AlgorithmSegment getAlgorithmSegment(final RateLimiterContext ctx) {
136 return null == ctx ? null : (AlgorithmSegment) visit(ctx);
137 }
138
139 @Override
140 public ASTNode visitRateLimiter(final RateLimiterContext ctx) {
141 return visit(ctx.algorithmDefinition());
142 }
143
144 @Override
145 public ASTNode visitStreamChannel(final StreamChannelContext ctx) {
146 return visit(ctx.algorithmDefinition());
147 }
148
149 private Integer getWorkerThread(final WorkerThreadContext ctx) {
150 return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
151 }
152
153 private Integer getBatchSize(final BatchSizeContext ctx) {
154 return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
155 }
156
157 private Integer getShardingSize(final ShardingSizeContext ctx) {
158 return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
159 }
160
161 @Override
162 public ASTNode visitMigrateTable(final MigrateTableContext ctx) {
163 SourceTargetEntry sourceTargetEntry = buildSourceTargetEntry(ctx.sourceTableName(), ctx.targetTableName());
164 return new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), sourceTargetEntry.getTargetDatabaseName());
165 }
166
167 private SourceTargetEntry buildSourceTargetEntry(final SourceTableNameContext sourceContext, final TargetTableNameContext targetContext) {
168 List<String> source = Splitter.on('.').splitToList(getRequiredIdentifierValue(sourceContext));
169 List<String> target = Splitter.on('.').splitToList(getRequiredIdentifierValue(targetContext));
170 String sourceResourceName = source.get(0);
171 String sourceSchemaName = 3 == source.size() ? source.get(1) : null;
172 String sourceTableName = source.get(source.size() - 1);
173 String targetDatabaseName = target.size() > 1 ? target.get(0) : null;
174 String targetTableName = target.get(target.size() - 1);
175 SourceTargetEntry result = new SourceTargetEntry(targetDatabaseName, new DataNode(sourceResourceName, sourceTableName), targetTableName);
176 result.getSource().setSchemaName(sourceSchemaName);
177 return result;
178 }
179
180 private String getRequiredIdentifierValue(final ParseTree context) {
181 return new IdentifierValue(context.getText()).getValue();
182 }
183
184 @Override
185 public ASTNode visitShowMigrationList(final ShowMigrationListContext ctx) {
186 return new ShowMigrationListStatement();
187 }
188
189 @Override
190 public ASTNode visitShowMigrationStatus(final ShowMigrationStatusContext ctx) {
191 return new ShowMigrationStatusStatement(getRequiredIdentifierValue(ctx.jobId()));
192 }
193
194 @Override
195 public ASTNode visitStartMigration(final StartMigrationContext ctx) {
196 return new StartMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
197 }
198
199 @Override
200 public ASTNode visitStopMigration(final StopMigrationContext ctx) {
201 return new StopMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
202 }
203
204 @Override
205 public ASTNode visitRollbackMigration(final RollbackMigrationContext ctx) {
206 return new RollbackMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
207 }
208
209 @Override
210 public ASTNode visitCommitMigration(final CommitMigrationContext ctx) {
211 return new CommitMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
212 }
213
214 @Override
215 public ASTNode visitCheckMigration(final CheckMigrationContext ctx) {
216 return new CheckMigrationStatement(getRequiredIdentifierValue(ctx.jobId()), null == ctx.algorithmDefinition() ? null : (AlgorithmSegment) visit(ctx.algorithmDefinition()));
217 }
218
219 @Override
220 public ASTNode visitShowMigrationCheckAlgorithms(final ShowMigrationCheckAlgorithmsContext ctx) {
221 return new ShowPluginsStatement("MIGRATION_CHECK");
222 }
223
224 @Override
225 public ASTNode visitAlgorithmDefinition(final AlgorithmDefinitionContext ctx) {
226 return new AlgorithmSegment(getIdentifierValue(ctx.algorithmTypeName()), getProperties(ctx.propertiesDefinition()));
227 }
228
229 private String getIdentifierValue(final ParseTree context) {
230 return new IdentifierValue(context.getText()).getValue();
231 }
232
233 @Override
234 public ASTNode visitStorageUnitDefinition(final MigrationDistSQLStatementParser.StorageUnitDefinitionContext ctx) {
235 String user = getIdentifierValue(ctx.user());
236 String password = null == ctx.password() ? "" : getPassword(ctx.password());
237 Properties props = getProperties(ctx.propertiesDefinition());
238 return null == ctx.urlSource()
239 ? new HostnameAndPortBasedDataSourceSegment(getIdentifierValue(ctx.storageUnitName()),
240 getIdentifierValue(ctx.simpleSource().hostname()), ctx.simpleSource().port().getText(), getIdentifierValue(ctx.simpleSource().dbName()), user, password, props)
241 : new URLBasedDataSourceSegment(getIdentifierValue(ctx.storageUnitName()), getIdentifierValue(ctx.urlSource().url()), user, password, props);
242 }
243
244 private String getPassword(final PasswordContext ctx) {
245 return getIdentifierValue(ctx);
246 }
247
248 private Properties getProperties(final PropertiesDefinitionContext ctx) {
249 Properties result = new Properties();
250 if (null == ctx || null == ctx.properties()) {
251 return result;
252 }
253 for (PropertyContext each : ctx.properties().property()) {
254 result.setProperty(QuoteCharacter.unwrapAndTrimText(each.key.getText()), QuoteCharacter.unwrapAndTrimText(each.value.getText()));
255 }
256 return result;
257 }
258
259 @Override
260 public ASTNode visitRegisterMigrationSourceStorageUnit(final RegisterMigrationSourceStorageUnitContext ctx) {
261 Collection<DataSourceSegment> dataSources = new LinkedList<>();
262 for (StorageUnitDefinitionContext each : ctx.storageUnitDefinition()) {
263 dataSources.add((DataSourceSegment) visit(each));
264 }
265 return new RegisterMigrationSourceStorageUnitStatement(dataSources);
266 }
267
268 @Override
269 public ASTNode visitUnregisterMigrationSourceStorageUnit(final UnregisterMigrationSourceStorageUnitContext ctx) {
270 return new UnregisterMigrationSourceStorageUnitStatement(ctx.storageUnitName().stream().map(ParseTree::getText).map(each -> new IdentifierValue(each).getValue()).collect(Collectors.toList()));
271 }
272
273 @Override
274 public ASTNode visitShowMigrationSourceStorageUnits(final ShowMigrationSourceStorageUnitsContext ctx) {
275 return new ShowMigrationSourceStorageUnitsStatement();
276 }
277
278 @Override
279 public ASTNode visitShowMigrationCheckStatus(final ShowMigrationCheckStatusContext ctx) {
280 return new ShowMigrationCheckStatusStatement(getRequiredIdentifierValue(ctx.jobId()));
281 }
282
283 @Override
284 public ASTNode visitStartMigrationCheck(final StartMigrationCheckContext ctx) {
285 return new StartMigrationCheckStatement(getRequiredIdentifierValue(ctx.jobId()));
286 }
287
288 @Override
289 public ASTNode visitStopMigrationCheck(final StopMigrationCheckContext ctx) {
290 return new StopMigrationCheckStatement(getRequiredIdentifierValue(ctx.jobId()));
291 }
292
293 @Override
294 public ASTNode visitDropMigrationCheck(final DropMigrationCheckContext ctx) {
295 return new DropMigrationCheckStatement(getRequiredIdentifierValue(ctx.jobId()));
296 }
297 }