View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.migration.distsql.parser.core;
19  
20  import com.google.common.base.Splitter;
21  import org.antlr.v4.runtime.tree.ParseTree;
22  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.CheckMigrationStatement;
23  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.CommitMigrationStatement;
24  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.DropMigrationCheckStatement;
25  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.MigrateTableStatement;
26  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.RegisterMigrationSourceStorageUnitStatement;
27  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.RollbackMigrationStatement;
28  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationCheckStatusStatement;
29  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationListStatement;
30  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationRuleStatement;
31  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationSourceStorageUnitsStatement;
32  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationStatusStatement;
33  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StartMigrationCheckStatement;
34  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StartMigrationStatement;
35  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StopMigrationCheckStatement;
36  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.StopMigrationStatement;
37  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.updatable.UnregisterMigrationSourceStorageUnitStatement;
38  import org.apache.shardingsphere.data.pipeline.migration.distsql.statement.pojo.SourceTargetEntry;
39  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementBaseVisitor;
40  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser;
41  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.AlgorithmDefinitionContext;
42  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.AlterMigrationRuleContext;
43  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.BatchSizeContext;
44  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.CheckMigrationContext;
45  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.CommitMigrationContext;
46  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.DropMigrationCheckContext;
47  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.MigrateTableContext;
48  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.PasswordContext;
49  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.PropertiesDefinitionContext;
50  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.PropertyContext;
51  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.RateLimiterContext;
52  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ReadDefinitionContext;
53  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.RegisterMigrationSourceStorageUnitContext;
54  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.RollbackMigrationContext;
55  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShardingSizeContext;
56  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationCheckAlgorithmsContext;
57  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationCheckStatusContext;
58  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationListContext;
59  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationRuleContext;
60  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationSourceStorageUnitsContext;
61  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.ShowMigrationStatusContext;
62  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.SourceTableNameContext;
63  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationCheckContext;
64  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StartMigrationContext;
65  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationCheckContext;
66  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StopMigrationContext;
67  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StorageUnitDefinitionContext;
68  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.StreamChannelContext;
69  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.TargetTableNameContext;
70  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.TransmissionRuleContext;
71  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.UnregisterMigrationSourceStorageUnitContext;
72  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.WorkerThreadContext;
73  import org.apache.shardingsphere.distsql.parser.autogen.MigrationDistSQLStatementParser.WriteDefinitionContext;
74  import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
75  import org.apache.shardingsphere.distsql.segment.DataSourceSegment;
76  import org.apache.shardingsphere.distsql.segment.HostnameAndPortBasedDataSourceSegment;
77  import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
78  import org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment;
79  import org.apache.shardingsphere.distsql.segment.URLBasedDataSourceSegment;
80  import org.apache.shardingsphere.distsql.statement.ral.queryable.show.ShowPluginsStatement;
81  import org.apache.shardingsphere.data.pipeline.distsql.statement.updatable.AlterTransmissionRuleStatement;
82  import org.apache.shardingsphere.infra.datanode.DataNode;
83  import org.apache.shardingsphere.sql.parser.api.ASTNode;
84  import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
85  import org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
86  
87  import java.util.Collection;
88  import java.util.Collections;
89  import java.util.LinkedList;
90  import java.util.List;
91  import java.util.Properties;
92  import java.util.stream.Collectors;
93  
94  /**
95   * SQL statement visitor for migration DistSQL.
96   */
97  public final class MigrationDistSQLStatementVisitor extends MigrationDistSQLStatementBaseVisitor<ASTNode> implements SQLVisitor<ASTNode> {
98      
99      @Override
100     public ASTNode visitShowMigrationRule(final ShowMigrationRuleContext ctx) {
101         return new ShowMigrationRuleStatement();
102     }
103     
104     @Override
105     public ASTNode visitAlterMigrationRule(final AlterMigrationRuleContext ctx) {
106         return new AlterTransmissionRuleStatement("MIGRATION", (TransmissionRuleSegment) visit(ctx.transmissionRule()));
107     }
108     
109     @Override
110     public ASTNode visitTransmissionRule(final TransmissionRuleContext ctx) {
111         TransmissionRuleSegment result = new TransmissionRuleSegment();
112         if (null != ctx.readDefinition()) {
113             result.setReadSegment((ReadOrWriteSegment) visit(ctx.readDefinition()));
114         }
115         if (null != ctx.writeDefinition()) {
116             result.setWriteSegment((ReadOrWriteSegment) visit(ctx.writeDefinition()));
117         }
118         if (null != ctx.streamChannel()) {
119             result.setStreamChannel((AlgorithmSegment) visit(ctx.streamChannel()));
120         }
121         return result;
122     }
123     
124     @Override
125     public ASTNode visitReadDefinition(final ReadDefinitionContext ctx) {
126         return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getShardingSize(ctx.shardingSize()), getAlgorithmSegment(ctx.rateLimiter()));
127     }
128     
129     @Override
130     public ASTNode visitWriteDefinition(final WriteDefinitionContext ctx) {
131         return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getAlgorithmSegment(ctx.rateLimiter()));
132     }
133     
134     private AlgorithmSegment getAlgorithmSegment(final RateLimiterContext ctx) {
135         return null == ctx ? null : (AlgorithmSegment) visit(ctx);
136     }
137     
138     @Override
139     public ASTNode visitRateLimiter(final RateLimiterContext ctx) {
140         return visit(ctx.algorithmDefinition());
141     }
142     
143     @Override
144     public ASTNode visitStreamChannel(final StreamChannelContext ctx) {
145         return visit(ctx.algorithmDefinition());
146     }
147     
148     private Integer getWorkerThread(final WorkerThreadContext ctx) {
149         return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
150     }
151     
152     private Integer getBatchSize(final BatchSizeContext ctx) {
153         return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
154     }
155     
156     private Integer getShardingSize(final ShardingSizeContext ctx) {
157         return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
158     }
159     
160     @Override
161     public ASTNode visitMigrateTable(final MigrateTableContext ctx) {
162         SourceTargetEntry sourceTargetEntry = buildSourceTargetEntry(ctx.sourceTableName(), ctx.targetTableName());
163         return new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), sourceTargetEntry.getTargetDatabaseName());
164     }
165     
166     private SourceTargetEntry buildSourceTargetEntry(final SourceTableNameContext sourceContext, final TargetTableNameContext targetContext) {
167         List<String> source = Splitter.on('.').splitToList(getRequiredIdentifierValue(sourceContext));
168         List<String> target = Splitter.on('.').splitToList(getRequiredIdentifierValue(targetContext));
169         String sourceResourceName = source.get(0);
170         String sourceSchemaName = 3 == source.size() ? source.get(1) : null;
171         String sourceTableName = source.get(source.size() - 1);
172         String targetDatabaseName = target.size() > 1 ? target.get(0) : null;
173         String targetTableName = target.get(target.size() - 1);
174         SourceTargetEntry result = new SourceTargetEntry(targetDatabaseName, new DataNode(sourceResourceName, sourceTableName), targetTableName);
175         result.getSource().setSchemaName(sourceSchemaName);
176         return result;
177     }
178     
179     private String getRequiredIdentifierValue(final ParseTree context) {
180         return new IdentifierValue(context.getText()).getValue();
181     }
182     
183     @Override
184     public ASTNode visitShowMigrationList(final ShowMigrationListContext ctx) {
185         return new ShowMigrationListStatement();
186     }
187     
188     @Override
189     public ASTNode visitShowMigrationStatus(final ShowMigrationStatusContext ctx) {
190         return new ShowMigrationStatusStatement(getRequiredIdentifierValue(ctx.jobId()));
191     }
192     
193     @Override
194     public ASTNode visitStartMigration(final StartMigrationContext ctx) {
195         return new StartMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
196     }
197     
198     @Override
199     public ASTNode visitStopMigration(final StopMigrationContext ctx) {
200         return new StopMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
201     }
202     
203     @Override
204     public ASTNode visitRollbackMigration(final RollbackMigrationContext ctx) {
205         return new RollbackMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
206     }
207     
208     @Override
209     public ASTNode visitCommitMigration(final CommitMigrationContext ctx) {
210         return new CommitMigrationStatement(getRequiredIdentifierValue(ctx.jobId()));
211     }
212     
213     @Override
214     public ASTNode visitCheckMigration(final CheckMigrationContext ctx) {
215         return new CheckMigrationStatement(getRequiredIdentifierValue(ctx.jobId()), null == ctx.algorithmDefinition() ? null : (AlgorithmSegment) visit(ctx.algorithmDefinition()));
216     }
217     
218     @Override
219     public ASTNode visitShowMigrationCheckAlgorithms(final ShowMigrationCheckAlgorithmsContext ctx) {
220         return new ShowPluginsStatement("MIGRATION_CHECK");
221     }
222     
223     @Override
224     public ASTNode visitAlgorithmDefinition(final AlgorithmDefinitionContext ctx) {
225         return new AlgorithmSegment(getIdentifierValue(ctx.algorithmTypeName()), getProperties(ctx.propertiesDefinition()));
226     }
227     
228     private String getIdentifierValue(final ParseTree context) {
229         return new IdentifierValue(context.getText()).getValue();
230     }
231     
232     @Override
233     public ASTNode visitStorageUnitDefinition(final MigrationDistSQLStatementParser.StorageUnitDefinitionContext ctx) {
234         String user = getIdentifierValue(ctx.user());
235         String password = null == ctx.password() ? "" : getPassword(ctx.password());
236         Properties props = getProperties(ctx.propertiesDefinition());
237         return null == ctx.urlSource()
238                 ? new HostnameAndPortBasedDataSourceSegment(getIdentifierValue(ctx.storageUnitName()),
239                         getIdentifierValue(ctx.simpleSource().hostname()), ctx.simpleSource().port().getText(), getIdentifierValue(ctx.simpleSource().dbName()), user, password, props)
240                 : new URLBasedDataSourceSegment(getIdentifierValue(ctx.storageUnitName()), getIdentifierValue(ctx.urlSource().url()), user, password, props);
241     }
242     
243     private String getPassword(final PasswordContext ctx) {
244         return getIdentifierValue(ctx);
245     }
246     
247     private Properties getProperties(final PropertiesDefinitionContext ctx) {
248         Properties result = new Properties();
249         if (null == ctx || null == ctx.properties()) {
250             return result;
251         }
252         for (PropertyContext each : ctx.properties().property()) {
253             result.setProperty(IdentifierValue.getQuotedContent(each.key.getText()), IdentifierValue.getQuotedContent(each.value.getText()));
254         }
255         return result;
256     }
257     
258     @Override
259     public ASTNode visitRegisterMigrationSourceStorageUnit(final RegisterMigrationSourceStorageUnitContext ctx) {
260         Collection<DataSourceSegment> dataSources = new LinkedList<>();
261         for (StorageUnitDefinitionContext each : ctx.storageUnitDefinition()) {
262             dataSources.add((DataSourceSegment) visit(each));
263         }
264         return new RegisterMigrationSourceStorageUnitStatement(dataSources);
265     }
266     
267     @Override
268     public ASTNode visitUnregisterMigrationSourceStorageUnit(final UnregisterMigrationSourceStorageUnitContext ctx) {
269         return new UnregisterMigrationSourceStorageUnitStatement(ctx.storageUnitName().stream().map(ParseTree::getText).map(each -> new IdentifierValue(each).getValue()).collect(Collectors.toList()));
270     }
271     
272     @Override
273     public ASTNode visitShowMigrationSourceStorageUnits(final ShowMigrationSourceStorageUnitsContext ctx) {
274         return new ShowMigrationSourceStorageUnitsStatement();
275     }
276     
277     @Override
278     public ASTNode visitShowMigrationCheckStatus(final ShowMigrationCheckStatusContext ctx) {
279         return new ShowMigrationCheckStatusStatement(getRequiredIdentifierValue(ctx.jobId()));
280     }
281     
282     @Override
283     public ASTNode visitStartMigrationCheck(final StartMigrationCheckContext ctx) {
284         return new StartMigrationCheckStatement(getRequiredIdentifierValue(ctx.jobId()));
285     }
286     
287     @Override
288     public ASTNode visitStopMigrationCheck(final StopMigrationCheckContext ctx) {
289         return new StopMigrationCheckStatement(getRequiredIdentifierValue(ctx.jobId()));
290     }
291     
292     @Override
293     public ASTNode visitDropMigrationCheck(final DropMigrationCheckContext ctx) {
294         return new DropMigrationCheckStatement(getRequiredIdentifierValue(ctx.jobId()));
295     }
296 }