ShardingSphere CDC is divided into two parts, one is the CDC Server, and the other is the CDC Client. The CDC Server and ShardingSphere-Proxy are currently deployed together.
Users can introduce the CDC Client into their own projects to implement data consumption logic.
Here, the openGauss database is used as an example to introduce the deployment steps of the CDC Server.
Since the CDC Server is built into ShardingSphere-Proxy, you need to get ShardingSphere-Proxy. For details, please refer to the proxy startup manual.
Modify the configuration file conf/global.yaml and turn on the CDC function. Currently, mode must be Cluster, and the corresponding registry center needs to be started in advance.
Configuration example:
global.yaml.mode:
type: Cluster
repository:
type: ZooKeeper
props:
namespace: cdc_demo
server-lists: localhost:2181
retryIntervalMilliseconds: 500
timeToLiveSeconds: 60
maxRetries: 3
operationTimeoutMilliseconds: 500
authority:
users:
- user: root@%
password: root
privilege:
type: ALL_PERMITTED
props:
proxy-default-port: 3307 # Proxy default port.
cdc-server-port: 33071 # CDC Server port, must be configured
proxy-frontend-database-protocol-type: openGauss # Consistent with the type of backend database
Proxy has included JDBC driver of PostgreSQL and openGauss.
If the backend is connected to the following databases, download the corresponding JDBC driver jar package and put it into the ${shardingsphere-proxy}/ext-lib directory.
| Database | JDBC Driver |
|---|---|
| MySQL | mysql-connector-j-8.4.0.jar |
sh bin/start.sh
logs/stdout.log. If you see the following statements:[INFO ] [main] o.a.s.p.frontend.ShardingSphereProxy - ShardingSphere-Proxy Cluster mode started successfully
The startup will have been successful.
For complete DistSQL syntax, refer to SHOW STREAMING RULE and ALTER STREAMING RULE.
6.1. Query configuration.
SHOW STREAMING RULE;
The default configuration is as follows:
+--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
| read | write | stream_channel |
+--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
| {"workerThread":20,"batchSize":1000,"shardingSize":10000000} | {"workerThread":20,"batchSize":1000} | {"type":"MEMORY","props":{"block-queue-size":"2000"}} |
+--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
6.2. Alter configuration (optional).
Since the streaming rule has default values, there is no need to create it, only the ALTER statement is provided.
A completely configured DistSQL is as follows.
ALTER STREAMING RULE (
READ(
WORKER_THREAD=20,
BATCH_SIZE=1000,
SHARDING_SIZE=10000000,
RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
),
WRITE(
WORKER_THREAD=20,
BATCH_SIZE=1000,
RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
),
STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
);
Configuration item description:
ALTER STREAMING RULE (
READ( -- Data reading configuration. If it is not configured, part of the parameters will take effect by default.
WORKER_THREAD=20, -- Affects full and incremental tasks, obtain the thread pool size of all the data from the source side. If it is not configured, the default value is used. It needs to ensure that this value is not lower than the number of database shards.
BATCH_SIZE=1000, -- Affects full and incremental tasks, the maximum number of records returned by a query operation. If it is not configured, the default value is used. If the amount of data in a transaction is greater than this value, the incremental situation may exceed the set value.
SHARDING_SIZE=10000000, -- Affects full tasks, sharding size of all the data. If it is not configured, the default value is used.
RATE_LIMITER ( -- Affects full and incremental tasks, traffic limit algorithm. If it is not configured, traffic is not limited.
TYPE( -- Algorithm type. Option: QPS
NAME='QPS',
PROPERTIES( -- Algorithm property
'qps'='500'
)))
),
WRITE( -- Data writing configuration. If it is not configured, part of the parameters will take effect by default.
WORKER_THREAD=20, -- Affects full and incremental tasks, the size of the thread pool on which data is written into the target side. If it is not configured, the default value is used.
BATCH_SIZE=1000, -- Affects full and incremental tasks, the maximum number of records for a batch write operation. If it is not configured, the default value is used. If the amount of data in a transaction is greater than this value, the incremental situation may exceed the set value.
RATE_LIMITER ( -- Traffic limit algorithm. If it is not configured, traffic is not limited.
TYPE( -- Algorithm type. Option: TPS
NAME='TPS',
PROPERTIES( -- Algorithm property.
'tps'='2000'
)))
),
STREAM_CHANNEL ( -- Data channel. It connects producers and consumers, used for reading and writing procedures. If it is not configured, the MEMORY type is used by default.
TYPE( -- Algorithm type. Option: MEMORY
NAME='MEMORY',
PROPERTIES( -- Algorithm property
'block-queue-size'='2000' -- Property: blocking queue size.
)))
);
The CDC Client does not need to be deployed separately, just need to introduce the dependency of the CDC Client through maven to use it in the project. Users can interact with the server through the CDC Client.
If necessary, users can also implement a CDC Client themselves to consume data and ACK.
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-data-pipeline-cdc-client</artifactId>
<version>${version}</version>
</dependency>
org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient is the entry class of the CDC Client. Users can interact with the CDC Server through this class. The main new methods are as follows.
| Method Name | Return Value | Description |
|---|---|---|
| connect(Consumer<List |
void | Connect with the server, when connecting, you need to specify 1. Data consumption processing function 2. Exception handling logic during consumption 3. Server error exception handling function |
| login(CDCLoginParameter parameter) | void | CDC login, parameters username: username password: password |
| startStreaming(StartStreamingParameter parameter) | String | Start CDC subscription and return the streaming ID StartStreamingParameter parameters database: logical database name schemaTables: subscribed table name full: whether to subscribe to full data |
| restartStreaming(String streamingId) | void | Restart subscription |
| stopStreaming(String streamingId) | void | Stop subscription |
| dropStreaming(String streamingId) | void | Delete subscription |
| await() | void | Block the CDC thread and wait for the channel to close |
| close() | void | Close the channel, the process ends |
