View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.cdc.distsql.parser.core;
19  
20  import org.antlr.v4.runtime.tree.ParseTree;
21  import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingListStatement;
22  import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingRuleStatement;
23  import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingStatusStatement;
24  import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.updatable.DropStreamingStatement;
25  import org.apache.shardingsphere.data.pipeline.distsql.statement.updatable.AlterTransmissionRuleStatement;
26  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
27  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlgorithmDefinitionContext;
28  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlterStreamingRuleContext;
29  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.BatchSizeContext;
30  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.DropStreamingContext;
31  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertiesDefinitionContext;
32  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertyContext;
33  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.RateLimiterContext;
34  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ReadDefinitionContext;
35  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShardingSizeContext;
36  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
37  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingRuleContext;
38  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
39  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.StreamChannelContext;
40  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.TransmissionRuleContext;
41  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WorkerThreadContext;
42  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WriteDefinitionContext;
43  import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
44  import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
45  import org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment;
46  import org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter;
47  import org.apache.shardingsphere.sql.parser.api.ASTNode;
48  import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
49  import org.apache.shardingsphere.sql.parser.statement.core.value.identifier.IdentifierValue;
50  
51  import java.util.Properties;
52  
53  /**
54   * SQL statement visitor for CDC DistSQL.
55   */
56  public final class CDCDistSQLStatementVisitor extends CDCDistSQLStatementBaseVisitor<ASTNode> implements SQLVisitor<ASTNode> {
57      
58      @Override
59      public ASTNode visitShowStreamingList(final ShowStreamingListContext ctx) {
60          return new ShowStreamingListStatement();
61      }
62      
63      @Override
64      public ASTNode visitShowStreamingStatus(final ShowStreamingStatusContext ctx) {
65          return new ShowStreamingStatusStatement(getIdentifierValue(ctx.jobId()));
66      }
67      
68      @Override
69      public ASTNode visitDropStreaming(final DropStreamingContext ctx) {
70          return new DropStreamingStatement(getIdentifierValue(ctx.jobId()));
71      }
72      
73      private String getIdentifierValue(final ParseTree ctx) {
74          return null == ctx ? null : new IdentifierValue(ctx.getText()).getValue();
75      }
76      
77      @Override
78      public ASTNode visitShowStreamingRule(final ShowStreamingRuleContext ctx) {
79          return new ShowStreamingRuleStatement();
80      }
81      
82      @Override
83      public ASTNode visitAlterStreamingRule(final AlterStreamingRuleContext ctx) {
84          return new AlterTransmissionRuleStatement("STREAMING", (TransmissionRuleSegment) visit(ctx.transmissionRule()));
85      }
86      
87      @Override
88      public ASTNode visitTransmissionRule(final TransmissionRuleContext ctx) {
89          TransmissionRuleSegment result = new TransmissionRuleSegment();
90          if (null != ctx.readDefinition()) {
91              result.setReadSegment((ReadOrWriteSegment) visit(ctx.readDefinition()));
92          }
93          if (null != ctx.writeDefinition()) {
94              result.setWriteSegment((ReadOrWriteSegment) visit(ctx.writeDefinition()));
95          }
96          if (null != ctx.streamChannel()) {
97              result.setStreamChannel((AlgorithmSegment) visit(ctx.streamChannel()));
98          }
99          return result;
100     }
101     
102     @Override
103     public ASTNode visitReadDefinition(final ReadDefinitionContext ctx) {
104         return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getShardingSize(ctx.shardingSize()), getAlgorithmSegment(ctx.rateLimiter()));
105     }
106     
107     private Integer getWorkerThread(final WorkerThreadContext ctx) {
108         return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
109     }
110     
111     private Integer getBatchSize(final BatchSizeContext ctx) {
112         return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
113     }
114     
115     private Integer getShardingSize(final ShardingSizeContext ctx) {
116         return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
117     }
118     
119     private AlgorithmSegment getAlgorithmSegment(final RateLimiterContext ctx) {
120         return null == ctx ? null : (AlgorithmSegment) visit(ctx);
121     }
122     
123     @Override
124     public ASTNode visitWriteDefinition(final WriteDefinitionContext ctx) {
125         return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getAlgorithmSegment(ctx.rateLimiter()));
126     }
127     
128     @Override
129     public ASTNode visitRateLimiter(final RateLimiterContext ctx) {
130         return visit(ctx.algorithmDefinition());
131     }
132     
133     @Override
134     public ASTNode visitStreamChannel(final StreamChannelContext ctx) {
135         return visit(ctx.algorithmDefinition());
136     }
137     
138     @Override
139     public ASTNode visitAlgorithmDefinition(final AlgorithmDefinitionContext ctx) {
140         return new AlgorithmSegment(getIdentifierValue(ctx.algorithmTypeName()), buildProperties(ctx.propertiesDefinition()));
141     }
142     
143     private Properties buildProperties(final PropertiesDefinitionContext ctx) {
144         Properties result = new Properties();
145         if (null == ctx) {
146             return result;
147         }
148         for (PropertyContext each : ctx.properties().property()) {
149             result.setProperty(QuoteCharacter.unwrapAndTrimText(each.key.getText()), QuoteCharacter.unwrapAndTrimText(each.value.getText()));
150         }
151         return result;
152     }
153 }