View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.migration.distsql.parser.core;
19  
20  import com.google.common.base.Splitter;
21  import org.antlr.v4.runtime.tree.ParseTree;
22  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.CheckMigrationStatement;
23  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.CommitMigrationStatement;
24  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.DropMigrationCheckStatement;
25  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.MigrateTableStatement;
26  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.RegisterMigrationSourceStorageUnitStatement;
27  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.RollbackMigrationStatement;
28  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationCheckStatusStatement;
29  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationListStatement;
30  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationRuleStatement;
31  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationSourceStorageUnitsStatement;
32  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationStatusStatement;
33  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StartMigrationCheckStatement;
34  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StartMigrationStatement;
35  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StopMigrationCheckStatement;
36  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StopMigrationStatement;
37  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.UnregisterMigrationSourceStorageUnitStatement;
38  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry;
39  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementBaseVisitor;
40  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser;
41  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.AlgorithmDefinitionContext;
42  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.AlterMigrationRuleContext;
43  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.BatchSizeContext;
44  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.CheckMigrationContext;
45  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.CommitMigrationContext;
46  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.DropMigrationCheckContext;
47  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.MigrateTableContext;
48  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.PasswordContext;
49  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.PropertiesDefinitionContext;
50  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.PropertyContext;
51  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.RateLimiterContext;
52  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ReadDefinitionContext;
53  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.RegisterMigrationSourceStorageUnitContext;
54  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.RollbackMigrationContext;
55  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShardingSizeContext;
56  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationCheckAlgorithmsContext;
57  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationCheckStatusContext;
58  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationListContext;
59  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationRuleContext;
60  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationSourceStorageUnitsContext;
61  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationStatusContext;
62  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.SourceTableNameContext;
63  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationCheckContext;
64  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationContext;
65  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationCheckContext;
66  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationContext;
67  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StorageUnitDefinitionContext;
68  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StreamChannelContext;
69  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.TargetTableNameContext;
70  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.TransmissionRuleContext;
71  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.UnregisterMigrationSourceStorageUnitContext;
72  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.WorkerThreadContext;
73  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.WriteDefinitionContext;
74  import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
75  import org.apache.shardingsphere.distsql.segment.DataSourceSegment;
76  import org.apache.shardingsphere.distsql.segment.HostnameAndPortBasedDataSourceSegment;
77  import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
78  import org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment;
79  import org.apache.shardingsphere.distsql.segment.URLBasedDataSourceSegment;
80  import org.apache.shardingsphere.distsql.statement.ral.queryable.show.ShowPluginsStatement;
81  import org.apache.shardingsphere.data.pipeline.distsql.statement.updatable.AlterTransmissionRuleStatement;
82  import org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter;
83  import org.apache.shardingsphere.infra.datanode.DataNode;
84  import org.apache.shardingsphere.sql.parser.api.ASTNode;
85  import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
86  import org.apache.shardingsphere.sql.parser.statement.core.value.identifier.IdentifierValue;
87  
88  import java.util.Collection;
89  import java.util.Collections;
90  import java.util.LinkedList;
91  import java.util.List;
92  import java.util.Properties;
93  import java.util.stream.Collectors;
94  
95  /**
96   * SQL statement visitor for migration DistSQL.
97   */
98  public final class MigrationDistSQLStatementVisitor extends MigrationDistSQLStatementBaseVisitor<ASTNode> implements SQLVisitor<ASTNode> {
99      
100     @Override
101     public ASTNode visitShowMigrationRule(final ShowMigrationRuleContext ctx) {
102         return new ShowMigrationRuleStatement();
103     }
104     
105     @Override
106     public ASTNode visitAlterMigrationRule(final AlterMigrationRuleContext ctx) {
107         return new AlterTransmissionRuleStatement("MIGRATION", (TransmissionRuleSegment) visit(ctx.transmissionRule()));
108     }
109     
110     @Override
111     public ASTNode visitTransmissionRule(final TransmissionRuleContext ctx) {
112         TransmissionRuleSegment result = new TransmissionRuleSegment();
113         if (null != ctx.readDefinition()) {
114             result.setReadSegment((ReadOrWriteSegment) visit(ctx.readDefinition()));
115         }
116         if (null != ctx.writeDefinition()) {
117             result.setWriteSegment((ReadOrWriteSegment) visit(ctx.writeDefinition()));
118         }
119         if (null != ctx.streamChannel()) {
120             result.setStreamChannel((AlgorithmSegment) visit(ctx.streamChannel()));
121         }
122         return result;
123     }
124     
125     @Override
126     public ASTNode visitReadDefinition(final ReadDefinitionContext ctx) {
127         return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getShardingSize(ctx.shardingSize()), getAlgorithmSegment(ctx.rateLimiter()));
128     }
129     
130     @Override
131     public ASTNode visitWriteDefinition(final WriteDefinitionContext ctx) {
132         return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getAlgorithmSegment(ctx.rateLimiter()));
133     }
134     
135     private AlgorithmSegment getAlgorithmSegment(final RateLimiterContext ctx) {
136         return null == ctx ? null : (AlgorithmSegment) visit(ctx);
137     }
138     
139     @Override
140     public ASTNode visitRateLimiter(final RateLimiterContext ctx) {
141         return visit(ctx.algorithmDefinition());
142     }
143     
144     @Override
145     public ASTNode visitStreamChannel(final StreamChannelContext ctx) {
146         return visit(ctx.algorithmDefinition());
147     }
148     
149     private Integer getWorkerThread(final WorkerThreadContext ctx) {
150         return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
151     }
152     
153     private Integer getBatchSize(final BatchSizeContext ctx) {
154         return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
155     }
156     
157     private Integer getShardingSize(final ShardingSizeContext ctx) {
158         return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
159     }
160     
161     @Override
162     public ASTNode visitMigrateTable(final MigrateTableContext ctx) {
163         SourceTargetEntry sourceTargetEntry = buildSourceTargetEntry(ctx.sourceTableName(), ctx.targetTableName());
164         return new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), sourceTargetEntry.getTargetDatabaseName());
165     }
166     
167     private SourceTargetEntry buildSourceTargetEntry(final SourceTableNameContext sourceContext, final TargetTableNameContext targetContext) {
168         List<String> source = Splitter.on('.').splitToList(getRequiredIdentifierValue(sourceContext));
169         List<String> target = Splitter.on('.').splitToList(getRequiredIdentifierValue(targetContext));
170         String sourceResourceName = source.get(0);
171         String sourceSchemaName = 3 == source.size() ? source.get(1) : null;
172         String sourceTableName = source.get(source.size() - 1);
173         String targetDatabaseName = target.size() > 1 ? target.get(0) : null;
174         String targetTableName = target.get(target.size() - 1);
175         SourceTargetEntry result = new SourceTargetEntry(targetDatabaseName, new DataNode(sourceResourceName, sourceTableName), targetTableName);
176         result.getSource().setSchemaName(sourceSchemaName);
177         return result;
178     }
179     
180     private String getRequiredIdentifierValue(final ParseTree context) {
181         return new IdentifierValue(context.getText()).getValue();
182     }
183     
184     @Override
185     public ASTNode visitShowMigrationList(final ShowMigrationListContext ctx) {
186         return new ShowMigrationListStatement();
187     }
188     
189     @Override
190     public ASTNode visitShowMigrationStatus(final ShowMigrationStatusContext ctx) {
191         return new ShowMigrationStatusStatement(getRequiredIdentifierValue(ctx.jobId()));
192     }
193     
194     @Override
195     public ASTNode visitStartMigration(final StartMigrationContext ctx) {
196         return new StartMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
197     }
198     
199     @Override
200     public ASTNode visitStopMigration(final StopMigrationContext ctx) {
201         return new StopMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
202     }
203     
204     @Override
205     public ASTNode visitRollbackMigration(final RollbackMigrationContext ctx) {
206         return new RollbackMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
207     }
208     
209     @Override
210     public ASTNode visitCommitMigration(final CommitMigrationContext ctx) {
211         return new CommitMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
212     }
213     
214     @Override
215     public ASTNode visitCheckMigration(final CheckMigrationContext ctx) {
216         return new CheckMigrationStatement(getRequiredIdentifierValue(ctx.jobId()), null == ctx.algorithmDefinition() ? null : (AlgorithmSegment) visit(ctx.algorithmDefinition()));
217     }
218     
219     @Override
220     public ASTNode visitShowMigrationCheckAlgorithms(final ShowMigrationCheckAlgorithmsContext ctx) {
221         return new ShowPluginsStatement("MIGRATION_CHECK");
222     }
223     
224     @Override
225     public ASTNode visitAlgorithmDefinition(final AlgorithmDefinitionContext ctx) {
226         return new AlgorithmSegment(getIdentifierValue(ctx.algorithmTypeName()), getProperties(ctx.propertiesDefinition()));
227     }
228     
229     private String getIdentifierValue(final ParseTree context) {
230         return new IdentifierValue(context.getText()).getValue();
231     }
232     
233     @Override
234     public ASTNode visitStorageUnitDefinition(final MigrationDistSQLStatementParser.StorageUnitDefinitionContext ctx) {
235         String user = getIdentifierValue(ctx.user());
236         String password = null == ctx.password() ? "" : getPassword(ctx.password());
237         Properties props = getProperties(ctx.propertiesDefinition());
238         return null == ctx.urlSource()
239                 ? new HostnameAndPortBasedDataSourceSegment(getIdentifierValue(ctx.storageUnitName()),
240                         getIdentifierValue(ctx.simpleSource().hostname()), ctx.simpleSource().port().getText(), getIdentifierValue(ctx.simpleSource().dbName()), user, password, props)
241                 : new URLBasedDataSourceSegment(getIdentifierValue(ctx.storageUnitName()), getIdentifierValue(ctx.urlSource().url()), user, password, props);
242     }
243     
244     private String getPassword(final PasswordContext ctx) {
245         return getIdentifierValue(ctx);
246     }
247     
248     private Properties getProperties(final PropertiesDefinitionContext ctx) {
249         Properties result = new Properties();
250         if (null == ctx || null == ctx.properties()) {
251             return result;
252         }
253         for (PropertyContext each : ctx.properties().property()) {
254             result.setProperty(QuoteCharacter.unwrapAndTrimText(each.key.getText()), QuoteCharacter.unwrapAndTrimText(each.value.getText()));
255         }
256         return result;
257     }
258     
259     @Override
260     public ASTNode visitRegisterMigrationSourceStorageUnit(final RegisterMigrationSourceStorageUnitContext ctx) {
261         Collection<DataSourceSegment> dataSources = new LinkedList<>();
262         for (StorageUnitDefinitionContext each : ctx.storageUnitDefinition()) {
263             dataSources.add((DataSourceSegment) visit(each));
264         }
265         return new RegisterMigrationSourceStorageUnitStatement(dataSources);
266     }
267     
268     @Override
269     public ASTNode visitUnregisterMigrationSourceStorageUnit(final UnregisterMigrationSourceStorageUnitContext ctx) {
270         return new UnregisterMigrationSourceStorageUnitStatement(ctx.storageUnitName().stream().map(ParseTree::getText).map(each -> new IdentifierValue(each).getValue()).collect(Collectors.toList()));
271     }
272     
273     @Override
274     public ASTNode visitShowMigrationSourceStorageUnits(final ShowMigrationSourceStorageUnitsContext ctx) {
275         return new ShowMigrationSourceStorageUnitsStatement();
276     }
277     
278     @Override
279     public ASTNode visitShowMigrationCheckStatus(final ShowMigrationCheckStatusContext ctx) {
280         return new ShowMigrationCheckStatusStatement(getRequiredIdentifierValue(ctx.jobId()));
281     }
282     
283     @Override
284     public ASTNode visitStartMigrationCheck(final StartMigrationCheckContext ctx) {
285         return new StartMigrationCheckStatement(getRequiredIdentifierValue(ctx.jobId()));
286     }
287     
288     @Override
289     public ASTNode visitStopMigrationCheck(final StopMigrationCheckContext ctx) {
290         return new StopMigrationCheckStatement(getRequiredIdentifierValue(ctx.jobId()));
291     }
292     
293     @Override
294     public ASTNode visitDropMigrationCheck(final DropMigrationCheckContext ctx) {
295         return new DropMigrationCheckStatement(getRequiredIdentifierValue(ctx.jobId()));
296     }
297 }