1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.cdc.distsql.parser.core;
19
20 import org.antlr.v4.runtime.tree.ParseTree;
21 import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.updatable.DropStreamingStatement;
22 import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingListStatement;
23 import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingRuleStatement;
24 import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingStatusStatement;
25 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
26 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlgorithmDefinitionContext;
27 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlterStreamingRuleContext;
28 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.BatchSizeContext;
29 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.DropStreamingContext;
30 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertiesDefinitionContext;
31 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertyContext;
32 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.RateLimiterContext;
33 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ReadDefinitionContext;
34 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShardingSizeContext;
35 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
36 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingRuleContext;
37 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
38 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.StreamChannelContext;
39 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.TransmissionRuleContext;
40 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WorkerThreadContext;
41 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WriteDefinitionContext;
42 import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
43 import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
44 import org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment;
45 import org.apache.shardingsphere.data.pipeline.distsql.statement.updatable.AlterTransmissionRuleStatement;
46 import org.apache.shardingsphere.sql.parser.api.ASTNode;
47 import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
48 import org.apache.shardingsphere.sql.parser.sql.common.value.identifier.IdentifierValue;
49
50 import java.util.Properties;
51
52
53
54
55 public final class CDCDistSQLStatementVisitor extends CDCDistSQLStatementBaseVisitor<ASTNode> implements SQLVisitor<ASTNode> {
56
57 @Override
58 public ASTNode visitShowStreamingList(final ShowStreamingListContext ctx) {
59 return new ShowStreamingListStatement();
60 }
61
62 @Override
63 public ASTNode visitShowStreamingStatus(final ShowStreamingStatusContext ctx) {
64 return new ShowStreamingStatusStatement(getIdentifierValue(ctx.jobId()));
65 }
66
67 @Override
68 public ASTNode visitDropStreaming(final DropStreamingContext ctx) {
69 return new DropStreamingStatement(getIdentifierValue(ctx.jobId()));
70 }
71
72 private String getIdentifierValue(final ParseTree ctx) {
73 return null == ctx ? null : new IdentifierValue(ctx.getText()).getValue();
74 }
75
76 @Override
77 public ASTNode visitShowStreamingRule(final ShowStreamingRuleContext ctx) {
78 return new ShowStreamingRuleStatement();
79 }
80
81 @Override
82 public ASTNode visitAlterStreamingRule(final AlterStreamingRuleContext ctx) {
83 return new AlterTransmissionRuleStatement("STREAMING", (TransmissionRuleSegment) visit(ctx.transmissionRule()));
84 }
85
86 @Override
87 public ASTNode visitTransmissionRule(final TransmissionRuleContext ctx) {
88 TransmissionRuleSegment result = new TransmissionRuleSegment();
89 if (null != ctx.readDefinition()) {
90 result.setReadSegment((ReadOrWriteSegment) visit(ctx.readDefinition()));
91 }
92 if (null != ctx.writeDefinition()) {
93 result.setWriteSegment((ReadOrWriteSegment) visit(ctx.writeDefinition()));
94 }
95 if (null != ctx.streamChannel()) {
96 result.setStreamChannel((AlgorithmSegment) visit(ctx.streamChannel()));
97 }
98 return result;
99 }
100
101 @Override
102 public ASTNode visitReadDefinition(final ReadDefinitionContext ctx) {
103 return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getShardingSize(ctx.shardingSize()), getAlgorithmSegment(ctx.rateLimiter()));
104 }
105
106 private Integer getWorkerThread(final WorkerThreadContext ctx) {
107 return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
108 }
109
110 private Integer getBatchSize(final BatchSizeContext ctx) {
111 return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
112 }
113
114 private Integer getShardingSize(final ShardingSizeContext ctx) {
115 return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
116 }
117
118 private AlgorithmSegment getAlgorithmSegment(final RateLimiterContext ctx) {
119 return null == ctx ? null : (AlgorithmSegment) visit(ctx);
120 }
121
122 @Override
123 public ASTNode visitWriteDefinition(final WriteDefinitionContext ctx) {
124 return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getAlgorithmSegment(ctx.rateLimiter()));
125 }
126
127 @Override
128 public ASTNode visitRateLimiter(final RateLimiterContext ctx) {
129 return visit(ctx.algorithmDefinition());
130 }
131
132 @Override
133 public ASTNode visitStreamChannel(final StreamChannelContext ctx) {
134 return visit(ctx.algorithmDefinition());
135 }
136
137 @Override
138 public ASTNode visitAlgorithmDefinition(final AlgorithmDefinitionContext ctx) {
139 return new AlgorithmSegment(getIdentifierValue(ctx.algorithmTypeName()), buildProperties(ctx.propertiesDefinition()));
140 }
141
142 private Properties buildProperties(final PropertiesDefinitionContext ctx) {
143 Properties result = new Properties();
144 if (null == ctx) {
145 return result;
146 }
147 for (PropertyContext each : ctx.properties().property()) {
148 result.setProperty(IdentifierValue.getQuotedContent(each.key.getText()), IdentifierValue.getQuotedContent(each.value.getText()));
149 }
150 return result;
151 }
152 }