View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.frontend.netty;
19  
20  import com.google.common.hash.Hashing;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInboundHandlerAdapter;
25  import io.netty.util.AttributeKey;
26  import lombok.extern.slf4j.Slf4j;
27  import org.apache.shardingsphere.authority.model.ShardingSpherePrivileges;
28  import org.apache.shardingsphere.authority.rule.AuthorityRule;
29  import org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
30  import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCExceptionWrapper;
31  import org.apache.shardingsphere.data.pipeline.cdc.exception.CDCLoginFailedException;
32  import org.apache.shardingsphere.data.pipeline.cdc.exception.EmptyCDCLoginRequestBodyException;
33  import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
34  import org.apache.shardingsphere.data.pipeline.cdc.handler.CDCBackendHandler;
35  import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.AckStreamingRequestBody;
36  import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
37  import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.DropStreamingRequestBody;
38  import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.LoginRequestBody.BasicBody;
39  import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StartStreamingRequestBody;
40  import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StopStreamingRequestBody;
41  import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
42  import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
43  import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
44  import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.ServerGreetingResult;
45  import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
46  import org.apache.shardingsphere.infra.autogen.version.ShardingSphereVersion;
47  import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
48  import org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
49  import org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
50  import org.apache.shardingsphere.infra.exception.dialect.SQLExceptionTransformEngine;
51  import org.apache.shardingsphere.infra.exception.dialect.exception.syntax.database.UnknownDatabaseException;
52  import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.MissingRequiredRuleException;
53  import org.apache.shardingsphere.infra.exception.mysql.exception.AccessDeniedException;
54  import org.apache.shardingsphere.infra.metadata.user.Grantee;
55  import org.apache.shardingsphere.infra.metadata.user.ShardingSphereUser;
56  import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
57  import org.apache.shardingsphere.proxy.frontend.protocol.FrontDatabaseProtocolTypeFactory;
58  
59  import java.net.InetSocketAddress;
60  import java.net.SocketAddress;
61  import java.sql.SQLException;
62  import java.util.Objects;
63  import java.util.Optional;
64  
65  /**
66   * CDC channel inbound handler.
67   */
68  @Slf4j
69  public final class CDCChannelInboundHandler extends ChannelInboundHandlerAdapter {
70      
71      private static final AttributeKey<CDCConnectionContext> CONNECTION_CONTEXT_KEY = AttributeKey.valueOf("connection.context");
72      
73      private final CDCBackendHandler backendHandler = new CDCBackendHandler();
74      
75      @Override
76      public void channelActive(final ChannelHandlerContext ctx) {
77          CDCResponse response = CDCResponse.newBuilder().setServerGreetingResult(ServerGreetingResult.newBuilder().setServerVersion(ShardingSphereVersion.VERSION).setProtocolVersion("1").build())
78                  .setStatus(Status.SUCCEED).build();
79          ctx.writeAndFlush(response);
80      }
81      
82      @Override
83      public void channelInactive(final ChannelHandlerContext ctx) {
84          CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
85          if (null != connectionContext && null != connectionContext.getJobId()) {
86              backendHandler.stopStreaming(connectionContext.getJobId(), ctx.channel().id());
87          }
88          ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(null);
89      }
90      
91      @Override
92      public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
93          log.error("caught CDC resolution error", cause);
94          // TODO add CDC exception to wrapper this exception, and add the parameters requestId and whether to close connect
95          ChannelFuture channelFuture;
96          if (cause instanceof CDCExceptionWrapper) {
97              CDCExceptionWrapper wrapper = (CDCExceptionWrapper) cause;
98              SQLException sqlException = SQLExceptionTransformEngine.toSQLException(wrapper.getCause(), FrontDatabaseProtocolTypeFactory.getDatabaseType());
99              channelFuture = ctx.writeAndFlush(CDCResponseUtils.failed(wrapper.getRequestId(), sqlException.getSQLState(), sqlException.getMessage()));
100         } else {
101             channelFuture = ctx.writeAndFlush(CDCResponseUtils.failed("", XOpenSQLState.GENERAL_ERROR.getValue(), String.valueOf(cause.getMessage())));
102         }
103         CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
104         if (null == connectionContext) {
105             channelFuture.addListener(ChannelFutureListener.CLOSE);
106         }
107     }
108     
109     @Override
110     public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
111         CDCConnectionContext connectionContext = ctx.channel().attr(CONNECTION_CONTEXT_KEY).get();
112         CDCRequest request = (CDCRequest) msg;
113         if (null == connectionContext || request.hasLoginRequestBody()) {
114             processLogin(ctx, request);
115             return;
116         }
117         switch (request.getType()) {
118             case STREAM_DATA:
119                 processStreamDataRequest(ctx, request, connectionContext);
120                 break;
121             case ACK_STREAMING:
122                 processAckStreamingRequest(request);
123                 break;
124             case STOP_STREAMING:
125                 processStopStreamingRequest(ctx, request, connectionContext);
126                 break;
127             case START_STREAMING:
128                 processStartStreamingRequest(ctx, request, connectionContext);
129                 break;
130             case DROP_STREAMING:
131                 processDropStreamingRequest(ctx, request, connectionContext);
132                 break;
133             default:
134                 log.warn("can't handle this type of request {}", request);
135                 break;
136         }
137     }
138     
139     private void processLogin(final ChannelHandlerContext ctx, final CDCRequest request) {
140         ShardingSpherePreconditions.checkState(request.hasLoginRequestBody() && request.getLoginRequestBody().hasBasicBody(),
141                 () -> new CDCExceptionWrapper(request.getRequestId(), new EmptyCDCLoginRequestBodyException()));
142         BasicBody body = request.getLoginRequestBody().getBasicBody();
143         AuthorityRule authorityRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().getSingleRule(AuthorityRule.class);
144         Optional<ShardingSphereUser> user = authorityRule.findUser(new Grantee(body.getUsername(), getHostAddress(ctx)));
145         if (user.isPresent() && Objects.equals(Hashing.sha256().hashBytes(user.get().getPassword().getBytes()).toString().toUpperCase(), body.getPassword())) {
146             ctx.channel().attr(CONNECTION_CONTEXT_KEY).set(new CDCConnectionContext(user.get()));
147             ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
148         } else {
149             throw new CDCExceptionWrapper(request.getRequestId(), new CDCLoginFailedException());
150         }
151     }
152     
153     private void checkPrivileges(final String requestId, final Grantee grantee, final String currentDatabase) {
154         AuthorityRule authorityRule = ProxyContext.getInstance().getContextManager().getMetaDataContexts().getMetaData().getGlobalRuleMetaData().findSingleRule(AuthorityRule.class)
155                 .orElseThrow(() -> new CDCExceptionWrapper(requestId, new MissingRequiredRuleException("authority")));
156         ShardingSpherePrivileges privileges = authorityRule.findPrivileges(grantee)
157                 .orElseThrow(() -> new CDCExceptionWrapper(requestId, new AccessDeniedException(grantee.getUsername(), grantee.getHostname(), false)));
158         ShardingSpherePreconditions.checkState(privileges.hasPrivileges(currentDatabase),
159                 () -> new CDCExceptionWrapper(requestId, new UnknownDatabaseException(currentDatabase)));
160     }
161     
162     private String getHostAddress(final ChannelHandlerContext context) {
163         SocketAddress socketAddress = context.channel().remoteAddress();
164         return socketAddress instanceof InetSocketAddress ? ((InetSocketAddress) socketAddress).getAddress().getHostAddress() : socketAddress.toString();
165     }
166     
167     private void processStreamDataRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
168         if (!request.hasStreamDataRequestBody()) {
169             throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Stream data request body is empty"));
170         }
171         StreamDataRequestBody requestBody = request.getStreamDataRequestBody();
172         if (requestBody.getDatabase().isEmpty()) {
173             throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Database is empty"));
174         }
175         if (requestBody.getSourceSchemaTableList().isEmpty()) {
176             throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Source schema table is empty"));
177         }
178         checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), requestBody.getDatabase());
179         try {
180             CDCResponse response = backendHandler.streamData(request.getRequestId(), requestBody, connectionContext, ctx.channel());
181             ctx.writeAndFlush(response);
182         } catch (final PipelineSQLException ex) {
183             throw new CDCExceptionWrapper(request.getRequestId(), ex);
184         }
185     }
186     
187     private void processAckStreamingRequest(final CDCRequest request) {
188         if (!request.hasAckStreamingRequestBody()) {
189             throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Ack request body is empty"));
190         }
191         AckStreamingRequestBody requestBody = request.getAckStreamingRequestBody();
192         if (requestBody.getAckId().isEmpty()) {
193             throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Ack request is empty"));
194         }
195         backendHandler.processAck(requestBody);
196     }
197     
198     private void processStartStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
199         if (!request.hasStartStreamingRequestBody()) {
200             throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Start streaming request body is empty"));
201         }
202         StartStreamingRequestBody requestBody = request.getStartStreamingRequestBody();
203         if (requestBody.getStreamingId().isEmpty()) {
204             throw new CDCExceptionWrapper(request.getRequestId(), new PipelineInvalidParameterException("Streaming id is empty"));
205         }
206         String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
207         checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
208         backendHandler.startStreaming(requestBody.getStreamingId(), connectionContext, ctx.channel());
209         ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
210     }
211     
212     private void processStopStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
213         StopStreamingRequestBody requestBody = request.getStopStreamingRequestBody();
214         String database = backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId());
215         checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), database);
216         backendHandler.stopStreaming(requestBody.getStreamingId(), ctx.channel().id());
217         connectionContext.setJobId(null);
218         ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
219     }
220     
221     private void processDropStreamingRequest(final ChannelHandlerContext ctx, final CDCRequest request, final CDCConnectionContext connectionContext) {
222         DropStreamingRequestBody requestBody = request.getDropStreamingRequestBody();
223         checkPrivileges(request.getRequestId(), connectionContext.getCurrentUser().getGrantee(), backendHandler.getDatabaseNameByJobId(requestBody.getStreamingId()));
224         backendHandler.dropStreaming(requestBody.getStreamingId());
225         connectionContext.setJobId(null);
226         ctx.writeAndFlush(CDCResponseUtils.succeed(request.getRequestId()));
227     }
228 }