1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.cdc.distsql.parser.core;
19
20 import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingListStatement;
21 import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingRuleStatement;
22 import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.queryable.ShowStreamingStatusStatement;
23 import org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.updatable.DropStreamingStatement;
24 import org.apache.shardingsphere.data.pipeline.distsql.statement.updatable.AlterTransmissionRuleStatement;
25 import org.apache.shardingsphere.database.connector.core.metadata.database.enums.QuoteCharacter;
26 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementBaseVisitor;
27 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlgorithmDefinitionContext;
28 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.AlterStreamingRuleContext;
29 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.BatchSizeContext;
30 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.DropStreamingContext;
31 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertiesDefinitionContext;
32 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.PropertyContext;
33 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.RateLimiterContext;
34 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ReadDefinitionContext;
35 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShardingSizeContext;
36 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingListContext;
37 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingRuleContext;
38 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.ShowStreamingStatusContext;
39 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.StreamChannelContext;
40 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.TransmissionRuleContext;
41 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WorkerThreadContext;
42 import org.apache.shardingsphere.distsql.parser.autogen.CDCDistSQLStatementParser.WriteDefinitionContext;
43 import org.apache.shardingsphere.distsql.segment.AlgorithmSegment;
44 import org.apache.shardingsphere.distsql.segment.ReadOrWriteSegment;
45 import org.apache.shardingsphere.distsql.segment.TransmissionRuleSegment;
46 import org.apache.shardingsphere.sql.parser.api.ASTNode;
47 import org.apache.shardingsphere.sql.parser.api.visitor.SQLVisitor;
48 import org.apache.shardingsphere.sql.parser.statement.core.value.identifier.IdentifierValue;
49
50 import java.util.Properties;
51
52
53
54
55 public final class CDCDistSQLStatementVisitor extends CDCDistSQLStatementBaseVisitor<ASTNode> implements SQLVisitor<ASTNode> {
56
57 @Override
58 public ASTNode visitShowStreamingList(final ShowStreamingListContext ctx) {
59 return new ShowStreamingListStatement();
60 }
61
62 @Override
63 public ASTNode visitShowStreamingStatus(final ShowStreamingStatusContext ctx) {
64 return new ShowStreamingStatusStatement(new IdentifierValue(ctx.jobId().getText()).getValue());
65 }
66
67 @Override
68 public ASTNode visitDropStreaming(final DropStreamingContext ctx) {
69 return new DropStreamingStatement(new IdentifierValue(ctx.jobId().getText()).getValue());
70 }
71
72 @Override
73 public ASTNode visitShowStreamingRule(final ShowStreamingRuleContext ctx) {
74 return new ShowStreamingRuleStatement();
75 }
76
77 @Override
78 public ASTNode visitAlterStreamingRule(final AlterStreamingRuleContext ctx) {
79 return new AlterTransmissionRuleStatement("STREAMING", (TransmissionRuleSegment) visit(ctx.transmissionRule()));
80 }
81
82 @Override
83 public ASTNode visitTransmissionRule(final TransmissionRuleContext ctx) {
84 TransmissionRuleSegment result = new TransmissionRuleSegment();
85 if (null != ctx.readDefinition()) {
86 result.setReadSegment((ReadOrWriteSegment) visit(ctx.readDefinition()));
87 }
88 if (null != ctx.writeDefinition()) {
89 result.setWriteSegment((ReadOrWriteSegment) visit(ctx.writeDefinition()));
90 }
91 if (null != ctx.streamChannel()) {
92 result.setStreamChannel((AlgorithmSegment) visit(ctx.streamChannel()));
93 }
94 return result;
95 }
96
97 @Override
98 public ASTNode visitReadDefinition(final ReadDefinitionContext ctx) {
99 return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getShardingSize(ctx.shardingSize()), getAlgorithmSegment(ctx.rateLimiter()));
100 }
101
102 private Integer getWorkerThread(final WorkerThreadContext ctx) {
103 return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
104 }
105
106 private Integer getBatchSize(final BatchSizeContext ctx) {
107 return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
108 }
109
110 private Integer getShardingSize(final ShardingSizeContext ctx) {
111 return null == ctx ? null : Integer.parseInt(ctx.intValue().getText());
112 }
113
114 private AlgorithmSegment getAlgorithmSegment(final RateLimiterContext ctx) {
115 return null == ctx ? null : (AlgorithmSegment) visit(ctx);
116 }
117
118 @Override
119 public ASTNode visitWriteDefinition(final WriteDefinitionContext ctx) {
120 return new ReadOrWriteSegment(getWorkerThread(ctx.workerThread()), getBatchSize(ctx.batchSize()), getAlgorithmSegment(ctx.rateLimiter()));
121 }
122
123 @Override
124 public ASTNode visitRateLimiter(final RateLimiterContext ctx) {
125 return visit(ctx.algorithmDefinition());
126 }
127
128 @Override
129 public ASTNode visitStreamChannel(final StreamChannelContext ctx) {
130 return visit(ctx.algorithmDefinition());
131 }
132
133 @Override
134 public ASTNode visitAlgorithmDefinition(final AlgorithmDefinitionContext ctx) {
135 return new AlgorithmSegment(new IdentifierValue(ctx.algorithmTypeName().getText()).getValue(), buildProperties(ctx.propertiesDefinition()));
136 }
137
138 private Properties buildProperties(final PropertiesDefinitionContext ctx) {
139 Properties result = new Properties();
140 if (null == ctx) {
141 return result;
142 }
143 for (PropertyContext each : ctx.properties().property()) {
144 result.setProperty(QuoteCharacter.unwrapAndTrimText(each.key.getText()), QuoteCharacter.unwrapAndTrimText(each.value.getText()));
145 }
146 return result;
147 }
148 }