View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.data.pipeline.cdc.distsql.parser.core;
19  
20  import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingListStatement;
21  import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingRuleStatement;
22  import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingStatusStatement;
23  import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.updatable.DropStreamingStatement;
24  import org.apache.shardingsphere.data.pipeline.distsql.statement.updatable.AlterTransmissionRuleStatement;
25  import org.apache.shardingsphere.database.connector.core.metadata.database.enums.QuoteCharacter;
26  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
27  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlgorithmDefinitionContext;
28  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlterStreamingRuleContext;
29  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.BatchSizeContext;
30  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.DropStreamingContext;
31  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertiesDefinitionContext;
32  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertyContext;
33  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.RateLimiterContext;
34  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ReadDefinitionContext;
35  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShardingSizeContext;
36  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
37  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingRuleContext;
38  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
39  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.StreamChannelContext;
40  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.TransmissionRuleContext;
41  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WorkerThreadContext;
42  import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WriteDefinitionContext;
43  import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
44  import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
45  import org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment;
46  import org.apache.shardingsphere.sql.parser.api.ASTNode;
47  import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
48  import org.apache.shardingsphere.sql.parser.statement.core.value.identifier.IdentifierValue;
49  
50  import java.util.Properties;
51  
52  /**
53   * SQL statement visitor for CDC DistSQL.
54   */
55  public final class CDCDistSQLStatementVisitor extends CDCDistSQLStatementBaseVisitor<ASTNode> implements SQLVisitor<ASTNode> {
56      
57      @Override
58      public ASTNode visitShowStreamingList(final ShowStreamingListContext ctx) {
59          return new ShowStreamingListStatement();
60      }
61      
62      @Override
63      public ASTNode visitShowStreamingStatus(final ShowStreamingStatusContext ctx) {
64          return new ShowStreamingStatusStatement(new IdentifierValue(ctx.jobId().getText()).getValue());
65      }
66      
67      @Override
68      public ASTNode visitDropStreaming(final DropStreamingContext ctx) {
69          return new DropStreamingStatement(new IdentifierValue(ctx.jobId().getText()).getValue());
70      }
71      
72      @Override
73      public ASTNode visitShowStreamingRule(final ShowStreamingRuleContext ctx) {
74          return new ShowStreamingRuleStatement();
75      }
76      
77      @Override
78      public ASTNode visitAlterStreamingRule(final AlterStreamingRuleContext ctx) {
79          return new AlterTransmissionRuleStatement("STREAMING", (TransmissionRuleSegment) visit(ctx.transmissionRule()));
80      }
81      
82      @Override
83      public ASTNode visitTransmissionRule(final TransmissionRuleContext ctx) {
84          TransmissionRuleSegment result = new TransmissionRuleSegment();
85          if (null != ctx.readDefinition()) {
86              result.setReadSegment((ReadOrWriteSegment) visit(ctx.readDefinition()));
87          }
88          if (null != ctx.writeDefinition()) {
89              result.setWriteSegment((ReadOrWriteSegment) visit(ctx.writeDefinition()));
90          }
91          if (null != ctx.streamChannel()) {
92              result.setStreamChannel((AlgorithmSegment) visit(ctx.streamChannel()));
93          }
94          return result;
95      }
96      
97      @Override
98      public ASTNode visitReadDefinition(final ReadDefinitionContext ctx) {
99          return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getShardingSize(ctx.shardingSize()), getAlgorithmSegment(ctx.rateLimiter()));
100     }
101     
102     private Integer getWorkerThread(final WorkerThreadContext ctx) {
103         return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
104     }
105     
106     private Integer getBatchSize(final BatchSizeContext ctx) {
107         return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
108     }
109     
110     private Integer getShardingSize(final ShardingSizeContext ctx) {
111         return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
112     }
113     
114     private AlgorithmSegment getAlgorithmSegment(final RateLimiterContext ctx) {
115         return null == ctx ? null : (AlgorithmSegment) visit(ctx);
116     }
117     
118     @Override
119     public ASTNode visitWriteDefinition(final WriteDefinitionContext ctx) {
120         return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getAlgorithmSegment(ctx.rateLimiter()));
121     }
122     
123     @Override
124     public ASTNode visitRateLimiter(final RateLimiterContext ctx) {
125         return visit(ctx.algorithmDefinition());
126     }
127     
128     @Override
129     public ASTNode visitStreamChannel(final StreamChannelContext ctx) {
130         return visit(ctx.algorithmDefinition());
131     }
132     
133     @Override
134     public ASTNode visitAlgorithmDefinition(final AlgorithmDefinitionContext ctx) {
135         return new AlgorithmSegment(new IdentifierValue(ctx.algorithmTypeName().getText()).getValue(), buildProperties(ctx.propertiesDefinition()));
136     }
137     
138     private Properties buildProperties(final PropertiesDefinitionContext ctx) {
139         Properties result = new Properties();
140         if (null == ctx) {
141             return result;
142         }
143         for (PropertyContext each : ctx.properties().property()) {
144             result.setProperty(QuoteCharacter.unwrapAndTrimText(each.key.getText()), QuoteCharacter.unwrapAndTrimText(each.value.getText()));
145         }
146         return result;
147     }
148 }