1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.cdc.distsql.parser.core;
19
20 import org.antlr.v4.runtime.tree.ParseTree;
21 import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingListStatement;
22 import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingRuleStatement;
23 import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingStatusStatement;
24 import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.updatable.DropStreamingStatement;
25 import org.apache.shardingsphere.data.pipeline.distsql.statement.updatable.AlterTransmissionRuleStatement;
26 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
27 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlgorithmDefinitionContext;
28 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlterStreamingRuleContext;
29 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.BatchSizeContext;
30 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.DropStreamingContext;
31 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertiesDefinitionContext;
32 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertyContext;
33 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.RateLimiterContext;
34 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ReadDefinitionContext;
35 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShardingSizeContext;
36 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
37 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingRuleContext;
38 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
39 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.StreamChannelContext;
40 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.TransmissionRuleContext;
41 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WorkerThreadContext;
42 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WriteDefinitionContext;
43 import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
44 import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
45 import org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment;
46 import org.apache.shardingsphere.infra.database.core.metadata.database.enums.QuoteCharacter;
47 import org.apache.shardingsphere.sql.parser.api.ASTNode;
48 import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
49 import org.apache.shardingsphere.sql.parser.statement.core.value.identifier.IdentifierValue;
50
51 import java.util.Properties;
52
53
54
55
56 public final class CDCDistSQLStatementVisitor extends CDCDistSQLStatementBaseVisitor<ASTNode> implements SQLVisitor<ASTNode> {
57
58 @Override
59 public ASTNode visitShowStreamingList(final ShowStreamingListContext ctx) {
60 return new ShowStreamingListStatement();
61 }
62
63 @Override
64 public ASTNode visitShowStreamingStatus(final ShowStreamingStatusContext ctx) {
65 return new ShowStreamingStatusStatement(getIdentifierValue(ctx.jobId()));
66 }
67
68 @Override
69 public ASTNode visitDropStreaming(final DropStreamingContext ctx) {
70 return new DropStreamingStatement(getIdentifierValue(ctx.jobId()));
71 }
72
73 private String getIdentifierValue(final ParseTree ctx) {
74 return null == ctx ? null : new IdentifierValue(ctx.getText()).getValue();
75 }
76
77 @Override
78 public ASTNode visitShowStreamingRule(final ShowStreamingRuleContext ctx) {
79 return new ShowStreamingRuleStatement();
80 }
81
82 @Override
83 public ASTNode visitAlterStreamingRule(final AlterStreamingRuleContext ctx) {
84 return new AlterTransmissionRuleStatement("STREAMING", (TransmissionRuleSegment) visit(ctx.transmissionRule()));
85 }
86
87 @Override
88 public ASTNode visitTransmissionRule(final TransmissionRuleContext ctx) {
89 TransmissionRuleSegment result = new TransmissionRuleSegment();
90 if (null != ctx.readDefinition()) {
91 result.setReadSegment((ReadOrWriteSegment) visit(ctx.readDefinition()));
92 }
93 if (null != ctx.writeDefinition()) {
94 result.setWriteSegment((ReadOrWriteSegment) visit(ctx.writeDefinition()));
95 }
96 if (null != ctx.streamChannel()) {
97 result.setStreamChannel((AlgorithmSegment) visit(ctx.streamChannel()));
98 }
99 return result;
100 }
101
102 @Override
103 public ASTNode visitReadDefinition(final ReadDefinitionContext ctx) {
104 return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getShardingSize(ctx.shardingSize()), getAlgorithmSegment(ctx.rateLimiter()));
105 }
106
107 private Integer getWorkerThread(final WorkerThreadContext ctx) {
108 return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
109 }
110
111 private Integer getBatchSize(final BatchSizeContext ctx) {
112 return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
113 }
114
115 private Integer getShardingSize(final ShardingSizeContext ctx) {
116 return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
117 }
118
119 private AlgorithmSegment getAlgorithmSegment(final RateLimiterContext ctx) {
120 return null == ctx ? null : (AlgorithmSegment) visit(ctx);
121 }
122
123 @Override
124 public ASTNode visitWriteDefinition(final WriteDefinitionContext ctx) {
125 return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getAlgorithmSegment(ctx.rateLimiter()));
126 }
127
128 @Override
129 public ASTNode visitRateLimiter(final RateLimiterContext ctx) {
130 return visit(ctx.algorithmDefinition());
131 }
132
133 @Override
134 public ASTNode visitStreamChannel(final StreamChannelContext ctx) {
135 return visit(ctx.algorithmDefinition());
136 }
137
138 @Override
139 public ASTNode visitAlgorithmDefinition(final AlgorithmDefinitionContext ctx) {
140 return new AlgorithmSegment(getIdentifierValue(ctx.algorithmTypeName()), buildProperties(ctx.propertiesDefinition()));
141 }
142
143 private Properties buildProperties(final PropertiesDefinitionContext ctx) {
144 Properties result = new Properties();
145 if (null == ctx) {
146 return result;
147 }
148 for (PropertyContext each : ctx.properties().property()) {
149 result.setProperty(QuoteCharacter.unwrapAndTrimText(each.key.getText()), QuoteCharacter.unwrapAndTrimText(each.value.getText()));
150 }
151 return result;
152 }
153 }