复古 护眼 海天 深邃 暗黑 默认

运行部署

背景信息

ShardingSphere CDC 分为两个部分,一个是 CDC Server,另一个是 CDC Client。 CDC Server 和 ShardingSphere-Proxy 目前是一同部署的。

用户可以在自己的项目中引入 CDC Client,实现数据的消费逻辑。

约束条件

  • 纯 JAVA 开发,JDK 建议 1.8 或以上版本。
  • CDC Server 要求 ShardingSphere-Proxy 使用集群模式,目前支持 ZooKeeper 作为注册中心。
  • CDC 只同步数据,不会同步表结构,目前也不支持 DDL 的语句同步。

CDC Server 部署步骤

这里以 openGauss 数据库为例,介绍 CDC Server 的部署步骤。

由于 CDC Server 内置于 ShardingSphere-Proxy,所以需要获取 ShardingSphere-Proxy。详情请参见 proxy 启动手册

CDC Server 使用手册

修改配置文件 conf/global.yaml,打开 CDC 功能。 目前 mode 必须是 Cluster,需要提前启动对应的注册中心。

配置示例:

  1. global.yaml 中开启 CDC 功能。
mode:
  type: Cluster
  repository:
    type: ZooKeeper
    props:
      namespace: cdc_demo
      server-lists: localhost:2181
      retryIntervalMilliseconds: 500
      timeToLiveSeconds: 60
      maxRetries: 3
      operationTimeoutMilliseconds: 500

authority:
  users:
    - user: root@%
      password: root
  privilege:
    type: ALL_PERMITTED

props:
  proxy-default-port: 3307 # Proxy default port
  cdc-server-port: 33071 # CDC Server 端口,必须配置
  proxy-frontend-database-protocol-type: openGauss # 和后端数据库的类型一致
  1. 引入 JDBC 驱动。

proxy 已包含 PostgreSQL JDBC 和 openGauss JDBC 驱动。

如果后端连接以下数据库,请下载相应 JDBC 驱动 jar 包,并将其放入 ${shardingsphere-proxy}/ext-lib 目录。

数据库 JDBC 驱动
MySQL mysql-connector-j-8.4.0.jar
  1. 启动 ShardingSphere-Proxy:
sh bin/start.sh
  1. 查看 proxy 日志 logs/stdout.log,看到日志中出现:
[INFO ] [main] o.a.s.p.frontend.ShardingSphereProxy - ShardingSphere-Proxy Cluster mode started successfully

确认启动成功。

  1. 按需配置 CDC 任务同步配置

完整 DistSQL 语法请参考 SHOW STREAMING RULEALTER STREAMING RULE

6.1. 查询配置。

SHOW STREAMING RULE;

默认配置如下:

+--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
| read                                                         | write                                | stream_channel                                        |
+--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
| {"workerThread":20,"batchSize":1000,"shardingSize":10000000} | {"workerThread":20,"batchSize":1000} | {"type":"MEMORY","props":{"block-queue-size":"2000"}} |
+--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+

6.2. 修改配置(可选)。

因 streaming rule 具有默认值,无需创建,仅提供 ALTER 语句。

完整配置 DistSQL 示例:

ALTER STREAMING RULE (
READ(
  WORKER_THREAD=20,
  BATCH_SIZE=1000,
  SHARDING_SIZE=10000000,
  RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
),
WRITE(
  WORKER_THREAD=20,
  BATCH_SIZE=1000,
  RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
),
STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
);

配置项说明:

ALTER STREAMING RULE (
READ( -- 数据读取配置。如果不配置则部分参数默认生效。
  WORKER_THREAD=20, -- 影响全量、增量任务,从源端摄取数据的线程池大小,不配置则使用默认值,需要确保该值不低于分库的数量。
  BATCH_SIZE=1000, -- 影响全量、增量任务,一次查询操作返回的最大记录数。如果一个事务中的数据量大于该值,增量情况下可能超过设定的值。
  SHARDING_SIZE=10000000, -- 影响全量任务,存量数据分片大小。如果不配置则使用默认值。
  RATE_LIMITER ( -- 影响全量、增量任务,限流算法。如果不配置则不限流。
  TYPE( -- 算法类型。可选项:QPS
  NAME='QPS',
  PROPERTIES( -- 算法属性
  'qps'='500'
  )))
),
WRITE( -- 数据写入配置。如果不配置则部分参数默认生效。
  WORKER_THREAD=20, -- 影响全量、增量任务,数据写入到目标端的线程池大小。如果不配置则使用默认值。
  BATCH_SIZE=1000, -- 影响全量、增量任务,存量任务一次批量写入操作的最大记录数。如果不配置则使用默认值。如果一个事务中的数据量大于该值,增量情况下可能超过设定的值。
  RATE_LIMITER ( -- 限流算法。如果不配置则不限流。
  TYPE( -- 算法类型。可选项:TPS
  NAME='TPS',
  PROPERTIES( -- 算法属性
  'tps'='2000'
  )))
),
STREAM_CHANNEL ( -- 数据通道,连接生产者和消费者,用于 read 和 write 环节。如果不配置则默认使用 MEMORY 类型。
TYPE( -- 算法类型。可选项:MEMORY
NAME='MEMORY',
PROPERTIES( -- 算法属性
'block-queue-size'='2000' -- 属性:阻塞队列大小
)))
);

CDC Client 手册

CDC Client 不需要额外部署,只需要通过 maven 引入 CDC Client 的依赖就可以在项目中使用。用户可以通过 CDC Client 和服务端进行交互。

如果有需要,用户也可以自行实现一个 CDC Client,进行数据的消费和 ACK。

<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>shardingsphere-data-pipeline-cdc-client</artifactId>
    <version>${version}</version>
</dependency>

CDC Client 介绍

org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient 是 CDC Client 的入口类,用户可以通过该类和 CDC Server 进行交互。主要的和新方法如下。

方法名 返回值 说明
connect(Consumer<List> dataConsumer, ExceptionHandler exceptionHandler, ServerErrorResultHandler errorResultHandler) void 和服务端进行连接,连接的时候需要指定
1. 数据的消费处理逻辑
2. 消费时候的异常处理逻辑
3. 服务端错误的异常处理逻辑
login(CDCLoginParameter parameter) void CDC登陆,参数
username:用户名
password:密码
startStreaming(StartStreamingParameter parameter) String 开启 CDC 订阅并返回 streaming ID
StartStreamingParameter 参数
database:逻辑库名称
schemaTables:订阅的表名
full:是否订阅全量数据
restartStreaming(String streamingId) void 重启订阅
stopStreaming(String streamingId) void 停止订阅
dropStreaming(String streamingId) void 删除订阅
await() void 阻塞 CDC 线程,等待 channel 关闭
close() void 关闭 channel,流程结束