1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.shardingsphere.data.pipeline.mysql.ingest;
19
20 import com.google.common.base.Preconditions;
21 import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
22 import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
23
24 import javax.sql.DataSource;
25 import java.sql.Connection;
26 import java.sql.PreparedStatement;
27 import java.sql.ResultSet;
28 import java.sql.SQLException;
29
30
31
32
33 public final class MySQLIngestPositionManager implements DialectIngestPositionManager {
34
35 @Override
36 public BinlogPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
37 try (Connection connection = dataSource.getConnection()) {
38 return getBinlogPosition(connection);
39 }
40 }
41
42 @Override
43 public BinlogPosition init(final String data) {
44 String[] array = data.split("#");
45 Preconditions.checkArgument(2 == array.length, "Unknown binlog position: %s", data);
46 return new BinlogPosition(array[0], Long.parseLong(array[1]), 0L);
47 }
48
49 private BinlogPosition getBinlogPosition(final Connection connection) throws SQLException {
50 String filename;
51 long position;
52 long serverId;
53 try (
54 PreparedStatement preparedStatement = connection.prepareStatement("SHOW MASTER STATUS");
55 ResultSet resultSet = preparedStatement.executeQuery()) {
56 resultSet.next();
57 filename = resultSet.getString(1);
58 position = resultSet.getLong(2);
59 }
60 try (
61 PreparedStatement preparedStatement = connection.prepareStatement("SHOW VARIABLES LIKE 'server_id'");
62 ResultSet resultSet = preparedStatement.executeQuery()) {
63 resultSet.next();
64 serverId = resultSet.getLong(2);
65 }
66 return new BinlogPosition(filename, position, serverId);
67 }
68
69 @Override
70 public String getDatabaseType() {
71 return "MySQL";
72 }
73 }