View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.cdc.distsql.parser.core;
19  
20  import org.antlr.v4.runtime.tree.ParseTree;
21  import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.updatable.DropStreamingStatement;
22  import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingListStatement;
23  import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingRuleStatement;
24  import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingStatusStatement;
25  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
26  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlgorithmDefinitionContext;
27  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlterStreamingRuleContext;
28  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.BatchSizeContext;
29  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.DropStreamingContext;
30  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertiesDefinitionContext;
31  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertyContext;
32  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.RateLimiterContext;
33  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ReadDefinitionContext;
34  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShardingSizeContext;
35  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
36  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingRuleContext;
37  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
38  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.StreamChannelContext;
39  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.TransmissionRuleContext;
40  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WorkerThreadContext;
41  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WriteDefinitionContext;
42  import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
43  import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
44  import org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment;
45  import org.apache.shardingsphere.data.pipeline.distsql.statement.updatable.AlterTransmissionRuleStatement;
46  import org.apache.shardingsphere.sql.parser.api.ASTNode;
47  import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
48  import org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
49  
50  import java.util.Properties;
51  
52  /**
53   * SQL statement visitor for CDC DistSQL.
54   */
55  public final class CDCDistSQLStatementVisitor extends CDCDistSQLStatementBaseVisitor<ASTNode> implements SQLVisitor<ASTNode> {
56      
57      @Override
58      public ASTNode visitShowStreamingList(final ShowStreamingListContext ctx) {
59          return new ShowStreamingListStatement();
60      }
61      
62      @Override
63      public ASTNode visitShowStreamingStatus(final ShowStreamingStatusContext ctx) {
64          return new ShowStreamingStatusStatement(getIdentifierValue(ctx.jobId()));
65      }
66      
67      @Override
68      public ASTNode visitDropStreaming(final DropStreamingContext ctx) {
69          return new DropStreamingStatement(getIdentifierValue(ctx.jobId()));
70      }
71      
72      private String getIdentifierValue(final ParseTree ctx) {
73          return null == ctx ? null : new IdentifierValue(ctx.getText()).getValue();
74      }
75      
76      @Override
77      public ASTNode visitShowStreamingRule(final ShowStreamingRuleContext ctx) {
78          return new ShowStreamingRuleStatement();
79      }
80      
81      @Override
82      public ASTNode visitAlterStreamingRule(final AlterStreamingRuleContext ctx) {
83          return new AlterTransmissionRuleStatement("STREAMING", (TransmissionRuleSegment) visit(ctx.transmissionRule()));
84      }
85      
86      @Override
87      public ASTNode visitTransmissionRule(final TransmissionRuleContext ctx) {
88          TransmissionRuleSegment result = new TransmissionRuleSegment();
89          if (null != ctx.readDefinition()) {
90              result.setReadSegment((ReadOrWriteSegment) visit(ctx.readDefinition()));
91          }
92          if (null != ctx.writeDefinition()) {
93              result.setWriteSegment((ReadOrWriteSegment) visit(ctx.writeDefinition()));
94          }
95          if (null != ctx.streamChannel()) {
96              result.setStreamChannel((AlgorithmSegment) visit(ctx.streamChannel()));
97          }
98          return result;
99      }
100     
101     @Override
102     public ASTNode visitReadDefinition(final ReadDefinitionContext ctx) {
103         return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getShardingSize(ctx.shardingSize()), getAlgorithmSegment(ctx.rateLimiter()));
104     }
105     
106     private Integer getWorkerThread(final WorkerThreadContext ctx) {
107         return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
108     }
109     
110     private Integer getBatchSize(final BatchSizeContext ctx) {
111         return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
112     }
113     
114     private Integer getShardingSize(final ShardingSizeContext ctx) {
115         return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
116     }
117     
118     private AlgorithmSegment getAlgorithmSegment(final RateLimiterContext ctx) {
119         return null == ctx ? null : (AlgorithmSegment) visit(ctx);
120     }
121     
122     @Override
123     public ASTNode visitWriteDefinition(final WriteDefinitionContext ctx) {
124         return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getAlgorithmSegment(ctx.rateLimiter()));
125     }
126     
127     @Override
128     public ASTNode visitRateLimiter(final RateLimiterContext ctx) {
129         return visit(ctx.algorithmDefinition());
130     }
131     
132     @Override
133     public ASTNode visitStreamChannel(final StreamChannelContext ctx) {
134         return visit(ctx.algorithmDefinition());
135     }
136     
137     @Override
138     public ASTNode visitAlgorithmDefinition(final AlgorithmDefinitionContext ctx) {
139         return new AlgorithmSegment(getIdentifierValue(ctx.algorithmTypeName()), buildProperties(ctx.propertiesDefinition()));
140     }
141     
142     private Properties buildProperties(final PropertiesDefinitionContext ctx) {
143         Properties result = new Properties();
144         if (null == ctx) {
145             return result;
146         }
147         for (PropertyContext each : ctx.properties().property()) {
148             result.setProperty(IdentifierValue.getQuotedContent(each.key.getText()), IdentifierValue.getQuotedContent(each.value.getText()));
149         }
150         return result;
151     }
152 }