ShardingSphere CDC consists of two parts: the CDC Server and the CDC Client. The CDC Server is currently deployed together with ShardingSphere-Proxy.
Users can introduce the CDC Client into their own projects to implement data consumption logic.
Here, the openGauss database is used as an example to introduce the deployment steps of the CDC Server.
Since the CDC Server is built into ShardingSphere-Proxy, you need to get ShardingSphere-Proxy. For details, please refer to the proxy startup manual.
The binary package released on the official website does not include the GLT module by default. If you are using the openGauss database with GLT functionality, you can additionally introduce the GLT module to ensure the integrity of XA transactions.
There are currently two ways to introduce the GLT module, and corresponding configurations need to be made in global.yaml.
1. Compile from source code
1.1. Prepare the code environment: download the ShardingSphere source code in advance, or use git clone to fetch it from GitHub.
1.2. Delete the <scope>provided</scope> tag of the shardingsphere-global-clock-tso-provider-redis dependency in kernel/global-clock/type/tso/core/pom.xml and the <scope>provided</scope> tag of jedis in kernel/global-clock/type/tso/provider/redis/pom.xml.
1.3. Compile ShardingSphere-Proxy. For specific compilation steps, please refer to the ShardingSphere Compilation Manual.
2. Introduce from the Maven repository
2.1. shardingsphere-global-clock-tso-provider-redis: download the same version as ShardingSphere-Proxy.
2.2. jedis-4.3.1
Modify the configuration file conf/global.yaml and turn on the CDC function. Currently, mode must be Cluster, and the corresponding registry center needs to be started in advance. If the GLT provider uses Redis, Redis also needs to be started in advance.
Configuration example:
global.yaml
mode:
  type: Cluster
  repository:
    type: ZooKeeper
    props:
      namespace: cdc_demo
      server-lists: localhost:2181
      retryIntervalMilliseconds: 500
      timeToLiveSeconds: 60
      maxRetries: 3
      operationTimeoutMilliseconds: 500

authority:
  users:
    - user: root@%
      password: root
  privilege:
    type: ALL_PERMITTED

# When using GLT, you also need to enable distributed transactions. GLT is currently supported only by the openGauss database.
#transaction:
#  defaultType: XA
#  providerType: Atomikos
#
#globalClock:
#  enabled: true
#  type: TSO
#  provider: redis
#  props:
#    host: 127.0.0.1
#    port: 6379

props:
  system-log-level: INFO
  proxy-default-port: 3307 # Proxy default port.
  cdc-server-port: 33071 # CDC Server port, must be configured
  proxy-frontend-database-protocol-type: openGauss # Consistent with the type of backend database
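Because the registry center (and Redis, when the GLT provider uses it) has to be running before the Proxy starts, a small pre-flight check can save a failed startup. The following is a minimal sketch using only the JDK; the class `PreflightCheck` and its method are hypothetical helpers, not part of ShardingSphere, and the addresses are simply the ones from the example configuration above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Illustrative pre-flight check; a hypothetical helper, not part of ShardingSphere. */
final class PreflightCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException ex) {
            // Refused, timed out, or unresolved host: treat all as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Address taken from server-lists in the example global.yaml; adjust as needed.
        System.out.println("ZooKeeper reachable: " + isReachable("localhost", 2181, 1000));
        // Only relevant when the commented-out globalClock section is enabled.
        System.out.println("Redis reachable: " + isReachable("127.0.0.1", 6379, 1000));
    }
}
```

A successful TCP connection only shows that something is listening on the port; it does not prove that the service behind it is healthy.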
ShardingSphere-Proxy already includes the JDBC drivers for PostgreSQL and openGauss.
If the backend connects to one of the following databases, download the corresponding JDBC driver jar package and put it into the ${shardingsphere-proxy}/ext-lib directory.
Database | JDBC Driver |
---|---|
MySQL | mysql-connector-j-8.3.0.jar |
Start the server:
sh bin/start.sh
Then check the proxy log logs/stdout.log. If you see the following statement, the startup was successful:
[INFO ] [main] o.a.s.p.frontend.ShardingSphereProxy - ShardingSphere-Proxy Cluster mode started successfully
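The log check can also be scripted, for example in a deployment pipeline. The sketch below assumes that the success message quoted above appears verbatim in logs/stdout.log; the class `StartupLogCheck` is a hypothetical helper and not something shipped with ShardingSphere.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

/** Illustrative log scan; a hypothetical helper, not part of ShardingSphere. */
final class StartupLogCheck {

    // Marker text copied from the success line quoted in this section.
    static final String SUCCESS_MARKER = "ShardingSphere-Proxy Cluster mode started successfully";

    /** Returns true if any log line contains the success marker. */
    static boolean hasStarted(List<String> logLines) {
        return logLines.stream().anyMatch(line -> line.contains(SUCCESS_MARKER));
    }

    public static void main(String[] args) throws IOException {
        // Default path is relative to the Proxy installation directory.
        Path log = Paths.get(args.length > 0 ? args[0] : "logs/stdout.log");
        if (!Files.exists(log)) {
            System.out.println("Log file not found: " + log);
            return;
        }
        System.out.println(hasStarted(Files.readAllLines(log))
                ? "Proxy started" : "Success line not found yet");
    }
}
```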
6.1. Query configuration.
SHOW STREAMING RULE;
The default configuration is as follows:
+--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
| read | write | stream_channel |
+--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
| {"workerThread":20,"batchSize":1000,"shardingSize":10000000} | {"workerThread":20,"batchSize":1000} | {"type":"MEMORY","props":{"block-queue-size":"2000"}} |
+--------------------------------------------------------------+--------------------------------------+-------------------------------------------------------+
6.2. Alter configuration (optional).
Since the streaming rule has default values, there is no need to create it; only the ALTER statement is provided.
A complete DistSQL configuration is as follows.
ALTER STREAMING RULE (
READ(
  WORKER_THREAD=20,
  BATCH_SIZE=1000,
  SHARDING_SIZE=10000000,
  RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
),
WRITE(
  WORKER_THREAD=20,
  BATCH_SIZE=1000,
  RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
),
STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
);
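When the rule has to be adjusted from application code, the statement can be assembled from parameters and sent over an ordinary JDBC connection to the Proxy. The following is a sketch under stated assumptions: `StreamingRuleSql` is a hypothetical helper, the rate limiters are left out (so traffic is not limited), and obtaining the `Connection` (driver, URL, credentials) is left to the caller.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Locale;

/** Illustrative DistSQL builder; a hypothetical helper, not part of ShardingSphere. */
final class StreamingRuleSql {

    /** Builds an ALTER STREAMING RULE statement without rate limiters. */
    static String alterStreamingRule(int workerThread, int batchSize, long shardingSize, int blockQueueSize) {
        // Locale.ROOT keeps the digits plain ASCII regardless of the JVM default locale.
        return String.format(Locale.ROOT,
                "ALTER STREAMING RULE ("
                + "READ(WORKER_THREAD=%d, BATCH_SIZE=%d, SHARDING_SIZE=%d), "
                + "WRITE(WORKER_THREAD=%d, BATCH_SIZE=%d), "
                + "STREAM_CHANNEL (TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='%d'))))",
                workerThread, batchSize, shardingSize, workerThread, batchSize, blockQueueSize);
    }

    /** Executes the statement on a connection that the caller opened against the Proxy. */
    static void apply(Connection proxyConnection, String distSql) throws SQLException {
        try (Statement statement = proxyConnection.createStatement()) {
            statement.execute(distSql);
        }
    }

    public static void main(String[] args) {
        // Values match the defaults shown by SHOW STREAMING RULE in this section.
        System.out.println(alterStreamingRule(20, 1000, 10000000L, 2000));
    }
}
```

The sketch uses the same worker thread count and batch size for READ and WRITE only to keep the signature short; they are independent settings.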
Configuration item description:
ALTER STREAMING RULE (
READ( -- Data reading configuration. If it is not configured, some parameters take their default values.
  WORKER_THREAD=20, -- Affects full and incremental tasks. Size of the thread pool that reads data from the source side. If it is not configured, the default value is used. This value must not be lower than the number of database shards.
  BATCH_SIZE=1000, -- Affects full and incremental tasks. Maximum number of records returned by one query operation. If it is not configured, the default value is used. If a single transaction contains more records than this value, an incremental batch may exceed it.
  SHARDING_SIZE=10000000, -- Affects full tasks. Sharding size of the full data. If it is not configured, the default value is used.
  RATE_LIMITER ( -- Affects full and incremental tasks. Traffic limit algorithm. If it is not configured, traffic is not limited.
  TYPE( -- Algorithm type. Option: QPS
  NAME='QPS',
  PROPERTIES( -- Algorithm property.
  'qps'='500'
  )))
),
WRITE( -- Data writing configuration. If it is not configured, some parameters take their default values.
  WORKER_THREAD=20, -- Affects full and incremental tasks. Size of the thread pool that writes data to the target side. If it is not configured, the default value is used.
  BATCH_SIZE=1000, -- Affects full and incremental tasks. Maximum number of records in one batch write operation. If it is not configured, the default value is used. If a single transaction contains more records than this value, an incremental batch may exceed it.
  RATE_LIMITER ( -- Traffic limit algorithm. If it is not configured, traffic is not limited.
  TYPE( -- Algorithm type. Option: TPS
  NAME='TPS',
  PROPERTIES( -- Algorithm property.
  'tps'='2000'
  )))
),
STREAM_CHANNEL ( -- Data channel. It connects producers and consumers and is used by the reading and writing procedures. If it is not configured, the MEMORY type is used by default.
TYPE( -- Algorithm type. Option: MEMORY
NAME='MEMORY',
PROPERTIES( -- Algorithm property.
'block-queue-size'='2000' -- Property: blocking queue size.
)))
);
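To make the role of block-queue-size more concrete, the sketch below models a memory channel as a bounded blocking queue between one reader and one writer. It illustrates the general producer/consumer technique with the JDK's `ArrayBlockingQueue`; it is not ShardingSphere's channel implementation, and `MemoryChannelSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Illustrative bounded channel between a reader and a writer; not ShardingSphere code. */
final class MemoryChannelSketch {

    /** Moves recordCount records through a queue of the given capacity and returns them in arrival order. */
    static List<String> transfer(int queueSize, int recordCount) throws InterruptedException {
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(queueSize);
        Thread reader = new Thread(() -> {
            try {
                for (int i = 1; i <= recordCount; i++) {
                    channel.put("record-" + i); // blocks while the queue is full
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();
        List<String> written = new ArrayList<>();
        for (int i = 0; i < recordCount; i++) {
            written.add(channel.take()); // blocks while the queue is empty
        }
        reader.join();
        return written;
    }

    public static void main(String[] args) throws InterruptedException {
        // A queue of 2 stands in for block-queue-size; the reader is throttled by the writer.
        System.out.println(transfer(2, 5));
    }
}
```

A small queue bounds memory use and makes a slow writer apply back-pressure to the reader, while a larger one absorbs bursts at the cost of more buffered records.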
The CDC Client does not need to be deployed separately. Add the CDC Client dependency to your project through Maven, and you can then use it to interact with the server.
If necessary, users can also implement their own CDC Client to consume data and send ACKs.
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>shardingsphere-data-pipeline-cdc-client</artifactId>
    <version>${version}</version>
</dependency>
org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient is the entry class of the CDC Client. Users can interact with the CDC Server through this class. Its main methods are as follows.
Method Name | Return Value | Description |
---|---|---|
connect(Consumer<List…) | void | Connect to the server. When connecting, you need to specify: 1. the data consumption function; 2. the exception handling logic during consumption; 3. the server error handling function |
login(CDCLoginParameter parameter) | void | CDC login. CDCLoginParameter fields: username, password |
startStreaming(StartStreamingParameter parameter) | streamingId | Start a CDC subscription. StartStreamingParameter fields: database (logical database name), schemaTables (subscribed table names), full (whether to subscribe to full data) |
restartStreaming(String streamingId) | void | Restart the subscription |
stopStreaming(String streamingId) | void | Stop the subscription |
dropStreaming(String streamingId) | void | Delete the subscription |
await() | void | Block the CDC thread and wait for the channel to close |
close() | void | Close the channel and end the process |
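The order in which these methods are typically called can be sketched as follows. Since this section does not show the constructor or the parameter classes of the real client, the sketch deliberately uses a stand-in: `StreamingClient` and `RecordingClient` are hypothetical types that only mirror the method names in the table, with simplified parameters. The database name `sharding_db`, the table `t_order`, and the ID `stream-1` are made up for illustration, and it is an assumption that a subscription is stopped before it is dropped.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in mirroring method names from the table; NOT the real CDCClient API. */
interface StreamingClient {
    void connect();
    void login(String username, String password);
    String startStreaming(String database, List<String> tables, boolean full);
    void stopStreaming(String streamingId);
    void dropStreaming(String streamingId);
    void close();
}

/** In-memory fake that records each call so the sequence can be inspected. */
final class RecordingClient implements StreamingClient {
    final List<String> calls = new ArrayList<>();
    public void connect() { calls.add("connect"); }
    public void login(String username, String password) { calls.add("login"); }
    public String startStreaming(String database, List<String> tables, boolean full) {
        calls.add("startStreaming");
        return "stream-1"; // made-up streaming ID
    }
    public void stopStreaming(String streamingId) { calls.add("stopStreaming"); }
    public void dropStreaming(String streamingId) { calls.add("dropStreaming"); }
    public void close() { calls.add("close"); }
}

final class LifecycleSketch {

    /** Runs one short-lived subscription and always closes the client. */
    static String subscribeOnce(StreamingClient client) {
        client.connect();
        try {
            client.login("root", "root"); // credentials from the example global.yaml
            String streamingId = client.startStreaming("sharding_db", Collections.singletonList("t_order"), true);
            // A long-running consumer would block here (await() in the table) instead of stopping.
            client.stopStreaming(streamingId);
            client.dropStreaming(streamingId);
            return streamingId;
        } finally {
            client.close();
        }
    }

    public static void main(String[] args) {
        RecordingClient client = new RecordingClient();
        subscribeOnce(client);
        System.out.println(client.calls);
    }
}
```

Closing the client in a finally block ensures that the channel is released even if login or subscription fails.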