1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.migration.distsql.parser.core;
19
20 import com.google.common.base.Splitter;
21 import org.antlr.v4.runtime.tree.ParseTree;
22 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.CheckMigrationStatement;
23 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.CommitMigrationStatement;
24 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.DropMigrationCheckStatement;
25 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.MigrateTableStatement;
26 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.RegisterMigrationSourceStorageUnitStatement;
27 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.RollbackMigrationStatement;
28 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationCheckStatusStatement;
29 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationListStatement;
30 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationRuleStatement;
31 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationSourceStorageUnitsStatement;
32 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationStatusStatement;
33 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StartMigrationCheckStatement;
34 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StartMigrationStatement;
35 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StopMigrationCheckStatement;
36 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StopMigrationStatement;
37 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.UnregisterMigrationSourceStorageUnitStatement;
38 import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry;
39 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementBaseVisitor;
40 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser;
41 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.AlgorithmDefinitionContext;
42 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.AlterMigrationRuleContext;
43 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.BatchSizeContext;
44 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.CheckMigrationContext;
45 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.CommitMigrationContext;
46 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.DropMigrationCheckContext;
47 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.MigrateTableContext;
48 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.PasswordContext;
49 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.PropertiesDefinitionContext;
50 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.PropertyContext;
51 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.RateLimiterContext;
52 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ReadDefinitionContext;
53 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.RegisterMigrationSourceStorageUnitContext;
54 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.RollbackMigrationContext;
55 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShardingSizeContext;
56 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationCheckAlgorithmsContext;
57 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationCheckStatusContext;
58 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationListContext;
59 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationRuleContext;
60 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationSourceStorageUnitsContext;
61 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationStatusContext;
62 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.SourceTableNameContext;
63 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationCheckContext;
64 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationContext;
65 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationCheckContext;
66 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationContext;
67 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StorageUnitDefinitionContext;
68 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StreamChannelContext;
69 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.TargetTableNameContext;
70 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.TransmissionRuleContext;
71 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.UnregisterMigrationSourceStorageUnitContext;
72 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.WorkerThreadContext;
73 import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.WriteDefinitionContext;
74 import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
75 import org.apache.shardingsphere.distsql.segment.DataSourceSegment;
76 import org.apache.shardingsphere.distsql.segment.HostnameAndPortBasedDataSourceSegment;
77 import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
78 import org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment;
79 import org.apache.shardingsphere.distsql.segment.URLBasedDataSourceSegment;
80 import org.apache.shardingsphere.distsql.statement.ral.queryable.show.ShowPluginsStatement;
81 import org.apache.shardingsphere.data.pipeline.distsql.statement.updatable.AlterTransmissionRuleStatement;
82 import org.apache.shardingsphere.infra.datanode.DataNode;
83 import org.apache.shardingsphere.sql.parser.api.ASTNode;
84 import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
85 import org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
86
87 import java.util.Collection;
88 import java.util.Collections;
89 import java.util.LinkedList;
90 import java.util.List;
91 import java.util.Properties;
92 import java.util.stream.Collectors;
93
94
95
96
97 public final class MigrationDistSQLStatementVisitor extends MigrationDistSQLStatementBaseVisitor<ASTNode> implements SQLVisitor<ASTNode> {
98
99 @Override
100 public ASTNode visitShowMigrationRule(final ShowMigrationRuleContext ctx) {
101 return new ShowMigrationRuleStatement();
102 }
103
104 @Override
105 public ASTNode visitAlterMigrationRule(final AlterMigrationRuleContext ctx) {
106 return new AlterTransmissionRuleStatement("MIGRATION", (TransmissionRuleSegment) visit(ctx.transmissionRule()));
107 }
108
109 @Override
110 public ASTNode visitTransmissionRule(final TransmissionRuleContext ctx) {
111 TransmissionRuleSegment result = new TransmissionRuleSegment();
112 if (null != ctx.readDefinition()) {
113 result.setReadSegment((ReadOrWriteSegment) visit(ctx.readDefinition()));
114 }
115 if (null != ctx.writeDefinition()) {
116 result.setWriteSegment((ReadOrWriteSegment) visit(ctx.writeDefinition()));
117 }
118 if (null != ctx.streamChannel()) {
119 result.setStreamChannel((AlgorithmSegment) visit(ctx.streamChannel()));
120 }
121 return result;
122 }
123
124 @Override
125 public ASTNode visitReadDefinition(final ReadDefinitionContext ctx) {
126 return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getShardingSize(ctx.shardingSize()), getAlgorithmSegment(ctx.rateLimiter()));
127 }
128
129 @Override
130 public ASTNode visitWriteDefinition(final WriteDefinitionContext ctx) {
131 return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getAlgorithmSegment(ctx.rateLimiter()));
132 }
133
134 private AlgorithmSegment getAlgorithmSegment(final RateLimiterContext ctx) {
135 return null == ctx ? null : (AlgorithmSegment) visit(ctx);
136 }
137
138 @Override
139 public ASTNode visitRateLimiter(final RateLimiterContext ctx) {
140 return visit(ctx.algorithmDefinition());
141 }
142
143 @Override
144 public ASTNode visitStreamChannel(final StreamChannelContext ctx) {
145 return visit(ctx.algorithmDefinition());
146 }
147
148 private Integer getWorkerThread(final WorkerThreadContext ctx) {
149 return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
150 }
151
152 private Integer getBatchSize(final BatchSizeContext ctx) {
153 return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
154 }
155
156 private Integer getShardingSize(final ShardingSizeContext ctx) {
157 return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
158 }
159
160 @Override
161 public ASTNode visitMigrateTable(final MigrateTableContext ctx) {
162 SourceTargetEntry sourceTargetEntry = buildSourceTargetEntry(ctx.sourceTableName(), ctx.targetTableName());
163 return new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), sourceTargetEntry.getTargetDatabaseName());
164 }
165
166 private SourceTargetEntry buildSourceTargetEntry(final SourceTableNameContext sourceContext, final TargetTableNameContext targetContext) {
167 List<String> source = Splitter.on('.').splitToList(getRequiredIdentifierValue(sourceContext));
168 List<String> target = Splitter.on('.').splitToList(getRequiredIdentifierValue(targetContext));
169 String sourceResourceName = source.get(0);
170 String sourceSchemaName = 3 == source.size() ? source.get(1) : null;
171 String sourceTableName = source.get(source.size() - 1);
172 String targetDatabaseName = target.size() > 1 ? target.get(0) : null;
173 String targetTableName = target.get(target.size() - 1);
174 SourceTargetEntry result = new SourceTargetEntry(targetDatabaseName, new DataNode(sourceResourceName, sourceTableName), targetTableName);
175 result.getSource().setSchemaName(sourceSchemaName);
176 return result;
177 }
178
179 private String getRequiredIdentifierValue(final ParseTree context) {
180 return new IdentifierValue(context.getText()).getValue();
181 }
182
183 @Override
184 public ASTNode visitShowMigrationList(final ShowMigrationListContext ctx) {
185 return new ShowMigrationListStatement();
186 }
187
188 @Override
189 public ASTNode visitShowMigrationStatus(final ShowMigrationStatusContext ctx) {
190 return new ShowMigrationStatusStatement(getRequiredIdentifierValue(ctx.jobId()));
191 }
192
193 @Override
194 public ASTNode visitStartMigration(final StartMigrationContext ctx) {
195 return new StartMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
196 }
197
198 @Override
199 public ASTNode visitStopMigration(final StopMigrationContext ctx) {
200 return new StopMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
201 }
202
203 @Override
204 public ASTNode visitRollbackMigration(final RollbackMigrationContext ctx) {
205 return new RollbackMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
206 }
207
208 @Override
209 public ASTNode visitCommitMigration(final CommitMigrationContext ctx) {
210 return new CommitMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
211 }
212
213 @Override
214 public ASTNode visitCheckMigration(final CheckMigrationContext ctx) {
215 return new CheckMigrationStatement(getRequiredIdentifierValue(ctx.jobId()), null == ctx.algorithmDefinition() ? null : (AlgorithmSegment) visit(ctx.algorithmDefinition()));
216 }
217
218 @Override
219 public ASTNode visitShowMigrationCheckAlgorithms(final ShowMigrationCheckAlgorithmsContext ctx) {
220 return new ShowPluginsStatement("MIGRATION_CHECK");
221 }
222
223 @Override
224 public ASTNode visitAlgorithmDefinition(final AlgorithmDefinitionContext ctx) {
225 return new AlgorithmSegment(getIdentifierValue(ctx.algorithmTypeName()), getProperties(ctx.propertiesDefinition()));
226 }
227
228 private String getIdentifierValue(final ParseTree context) {
229 return new IdentifierValue(context.getText()).getValue();
230 }
231
232 @Override
233 public ASTNode visitStorageUnitDefinition(final MigrationDistSQLStatementParser.StorageUnitDefinitionContext ctx) {
234 String user = getIdentifierValue(ctx.user());
235 String password = null == ctx.password() ? "" : getPassword(ctx.password());
236 Properties props = getProperties(ctx.propertiesDefinition());
237 return null == ctx.urlSource()
238 ? new HostnameAndPortBasedDataSourceSegment(getIdentifierValue(ctx.storageUnitName()),
239 getIdentifierValue(ctx.simpleSource().hostname()), ctx.simpleSource().port().getText(), getIdentifierValue(ctx.simpleSource().dbName()), user, password, props)
240 : new URLBasedDataSourceSegment(getIdentifierValue(ctx.storageUnitName()), getIdentifierValue(ctx.urlSource().url()), user, password, props);
241 }
242
243 private String getPassword(final PasswordContext ctx) {
244 return getIdentifierValue(ctx);
245 }
246
247 private Properties getProperties(final PropertiesDefinitionContext ctx) {
248 Properties result = new Properties();
249 if (null == ctx || null == ctx.properties()) {
250 return result;
251 }
252 for (PropertyContext each : ctx.properties().property()) {
253 result.setProperty(IdentifierValue.getQuotedContent(each.key.getText()), IdentifierValue.getQuotedContent(each.value.getText()));
254 }
255 return result;
256 }
257
258 @Override
259 public ASTNode visitRegisterMigrationSourceStorageUnit(final RegisterMigrationSourceStorageUnitContext ctx) {
260 Collection<DataSourceSegment> dataSources = new LinkedList<>();
261 for (StorageUnitDefinitionContext each : ctx.storageUnitDefinition()) {
262 dataSources.add((DataSourceSegment) visit(each));
263 }
264 return new RegisterMigrationSourceStorageUnitStatement(dataSources);
265 }
266
267 @Override
268 public ASTNode visitUnregisterMigrationSourceStorageUnit(final UnregisterMigrationSourceStorageUnitContext ctx) {
269 return new UnregisterMigrationSourceStorageUnitStatement(ctx.storageUnitName().stream().map(ParseTree::getText).map(each -> new IdentifierValue(each).getValue()).collect(Collectors.toList()));
270 }
271
272 @Override
273 public ASTNode visitShowMigrationSourceStorageUnits(final ShowMigrationSourceStorageUnitsContext ctx) {
274 return new ShowMigrationSourceStorageUnitsStatement();
275 }
276
277 @Override
278 public ASTNode visitShowMigrationCheckStatus(final ShowMigrationCheckStatusContext ctx) {
279 return new ShowMigrationCheckStatusStatement(getRequiredIdentifierValue(ctx.jobId()));
280 }
281
282 @Override
283 public ASTNode visitStartMigrationCheck(final StartMigrationCheckContext ctx) {
284 return new StartMigrationCheckStatement(getRequiredIdentifierValue(ctx.jobId()));
285 }
286
287 @Override
288 public ASTNode visitStopMigrationCheck(final StopMigrationCheckContext ctx) {
289 return new StopMigrationCheckStatement(getRequiredIdentifierValue(ctx.jobId()));
290 }
291
292 @Override
293 public ASTNode visitDropMigrationCheck(final DropMigrationCheckContext ctx) {
294 return new DropMigrationCheckStatement(getRequiredIdentifierValue(ctx.jobId()));
295 }
296 }