View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * The ASF licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *     http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  
18  package org.apache.shardingsphere.proxy.frontend;
19  
20  import io.netty.bootstrap.ServerBootstrap;
21  import io.netty.buffer.PooledByteBufAllocator;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelOption;
24  import io.netty.channel.EventLoopGroup;
25  import io.netty.channel.WriteBufferWaterMark;
26  import io.netty.channel.epoll.Epoll;
27  import io.netty.channel.epoll.EpollEventLoopGroup;
28  import io.netty.channel.epoll.EpollServerSocketChannel;
29  import io.netty.channel.nio.NioEventLoopGroup;
30  import io.netty.channel.socket.nio.NioServerSocketChannel;
31  import io.netty.handler.logging.LogLevel;
32  import io.netty.handler.logging.LoggingHandler;
33  import lombok.RequiredArgsConstructor;
34  import org.apache.shardingsphere.proxy.frontend.netty.CDCServerHandlerInitializer;
35  
36  import java.util.ArrayList;
37  import java.util.List;
38  
39  /**
40   * CDC server.
41   */
42  @RequiredArgsConstructor
43  public final class CDCServer extends Thread {
44      
45      private final List<String> addressed;
46      
47      private final int port;
48      
49      private EventLoopGroup bossGroup;
50      
51      private EventLoopGroup workerGroup;
52      
53      @Override
54      public void run() {
55          try {
56              List<ChannelFuture> futures = startInternal(addressed, port);
57              for (ChannelFuture each : futures) {
58                  each.channel().closeFuture().sync();
59              }
60          } catch (final InterruptedException ignored) {
61              Thread.currentThread().interrupt();
62          } finally {
63              close();
64          }
65      }
66      
67      private List<ChannelFuture> startInternal(final List<String> addresses, final int port) throws InterruptedException {
68          createEventLoopGroup();
69          ServerBootstrap bootstrap = new ServerBootstrap();
70          bootstrap.channel(Epoll.isAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
71                  .group(bossGroup, workerGroup)
72                  .option(ChannelOption.SO_REUSEADDR, true)
73                  .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
74                  .childOption(ChannelOption.SO_KEEPALIVE, true)
75                  .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
76                  .childOption(ChannelOption.TCP_NODELAY, true)
77                  .handler(new LoggingHandler(LogLevel.INFO))
78                  .childHandler(new CDCServerHandlerInitializer());
79          List<ChannelFuture> result = new ArrayList<>();
80          for (String each : addresses) {
81              result.add(bootstrap.bind(each, port).sync());
82          }
83          return result;
84      }
85      
86      private void createEventLoopGroup() {
87          bossGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1);
88          workerGroup = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
89      }
90      
91      private void close() {
92          if (null != bossGroup) {
93              bossGroup.shutdownGracefully();
94          }
95          if (null != workerGroup) {
96              workerGroup.shutdownGracefully();
97          }
98      }
99  }