Kafka is a high-performance distributed stream processing platform for handling real-time data streams. It is widely used to build real-time data pipelines, stream processing applications, and real-time analytics systems.
Currently, Debezium does not provide an official connector for OceanBase. However, since OceanBase Database is MySQL-compatible, this document uses Debezium MySQL Connector and Confluent JDBC Sink Connector for data integration between OceanBase Database and Kafka.
Background information
Debezium is a set of source connectors for Kafka Connect based on change data capture (CDC). As a distributed service, Debezium records row-level changes for each database table and delivers them to applications for processing via Kafka topics. It can first send all existing data to Kafka topics and then capture new changes.
Confluent JDBC Sink Connector is a Kafka connector used to write Kafka data to relational databases. It allows you to stream messages from Kafka topics to JDBC-compatible databases, enabling real-time data synchronization and persistent storage.
Kafka Connect workflow
- Configure the source connector, Debezium MySQL Connector in this case, to read data from the OceanBase database.
- Write the read data to a specific topic in Kafka.
- Configure the sink connector, Confluent JDBC Sink Connector in this case, to read data from the specific Kafka topic and write it into OceanBase Database.
Demonstration environment
Before you start the integration between OceanBase and Kafka, verify the following:
- Your Debezium version is V1.5.4.Final.
- Your Kafka version is V2.12-2.5.0.
- Your Zookeeper version is V3.6.
- Your OceanBase database version is V4.2.3, and Binlog is enabled.
- Your Confluent JDBC Sink Connector version is V10.7.6.
Note
The versions used in this demonstration are for reference purposes only. You can use other versions as long as they are compatible.
Configure OceanBase Database
Follow these steps to configure OceanBase Database:
Activate the Binlog service in OceanBase Cloud. In the left-side navigation pane of the OceanBase Cloud console, choose Instances > Tenant Management and activate the Binlog service. For more information, see Activate the Binlog service.
Connect to OceanBase Database and create a dedicated account for the Debezium Connector.
```sql
CREATE USER 'debezium_user'@'localhost' IDENTIFIED BY 'debezium_password';
```

If you are using OceanBase Cloud, log in to the OceanBase Cloud console to create the user and grant privileges. For more information, see Create Account (Database User).
Grant the necessary privileges to the `debezium_user` account.

```sql
GRANT SELECT, CREATE, RELOAD, SHOW DATABASES ON *.* TO 'debezium_user' IDENTIFIED BY 'debezium_password';
```

Enable the Binlog service for OceanBase Database.
If you are using OceanBase Cloud, enable Binlog by choosing Instance List > Tenant Management > Binlog Service, and then clicking to enable it. For more information, see Enable Binlog Service.
Run this command to make sure that Binlog is enabled:
```sql
SHOW MASTER STATUS;
```

Expected result:

```
+------------------+----------+--------------+------------------+------------------------------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                        |
+------------------+----------+--------------+------------------+------------------------------------------+
| mysql-bin.000001 |     2567 |              |                  | a2750d9c-11da-11ef-81aa-0242ac110008:1-9 |
+------------------+----------+--------------+------------------+------------------------------------------+
1 row in set (0.125 sec)
```

Check the values of the `interactive_timeout` and `wait_timeout` parameters.

```sql
SHOW VARIABLES LIKE 'interactive_timeout';
SHOW VARIABLES LIKE 'wait_timeout';
```

The default value for both parameters is 28800 seconds, which is 8 hours. The initialization snapshot might take a long time, and the connection might time out. You can decide whether to adjust these parameters based on the actual situation.
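If the initial snapshot is likely to run longer than the default timeout, you can raise both values. A minimal sketch, assuming your tenant accepts the MySQL-compatible `SET GLOBAL` syntax and using an illustrative 24-hour value:

```sql
-- Illustrative values only: raise both timeouts to 24 hours for a long snapshot
SET GLOBAL interactive_timeout = 86400;
SET GLOBAL wait_timeout = 86400;
```

Note that global changes apply to new connections, so reconnect before starting the snapshot.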
Configure Debezium MySQL Connector
Follow these steps to configure Debezium MySQL Connector:
Download Debezium MySQL Connector plug-in version 1.5.4.Final to the machine where Kafka Connect is installed.
For more information, refer to Download Debezium MySQL connector.
Extract Debezium MySQL Connector.
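For reference, a minimal sketch of the download and extraction steps, assuming the plugin archive follows the Maven Central layout and using the `/opt/debezium/plugin` directory referenced later in this document:

```bash
# Download the 1.5.4.Final plugin archive (URL assumes the Maven Central layout)
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.5.4.Final/debezium-connector-mysql-1.5.4.Final-plugin.tar.gz

# Extract into the plugin directory; this creates /opt/debezium/plugin/debezium-connector-mysql
mkdir -p /opt/debezium/plugin
tar -xzf debezium-connector-mysql-1.5.4.Final-plugin.tar.gz -C /opt/debezium/plugin
```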
Send Data from OceanBase Database to Kafka
This section describes how to configure Kafka and Debezium to synchronize data from OceanBase Database.
Start Debezium MySQL Connector
Follow these steps to start a distributed Debezium MySQL Connector:
Configure Kafka Connect
Go to the `config` folder in the Kafka installation directory. In this example, the directory is `/opt/kafka/config`. Modify the `connect-distributed.properties` file and add the path of the Debezium MySQL connector plugin at the end of the file.

```bash
# Open the configuration file
vi /opt/kafka/config/connect-distributed.properties

# Add this content at the end of the file:
plugin.path=/opt/debezium/plugin
```

Notice

Do not include the `debezium-connector-mysql` subdirectory in `plugin.path`; point to its parent directory.

Check Kafka topics status
Before you start Kafka Connect, check the current topic status.
```bash
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```

The list is currently empty, with no topics.
Start Kafka Connect.
Use this command to start Kafka Connect:
```bash
bin/connect-distributed.sh -daemon config/connect-distributed.properties
```

Configure the synchronization settings for OceanBase Database.
Create a `register-oceanbase-debezium.json` file in any directory, and add the necessary synchronization information for OceanBase Database.

```bash
# Create the register-oceanbase-debezium.json file
vi register-oceanbase-debezium.json
```

Add this content:

```json
{
  "name": "oceanbase-inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "xxx.xx.x.x",
    "database.port": "xxxx",
    "database.user": "debezium_user",
    "database.password": "xxxxxxxx",
    "database.server.id": "1",
    "database.server.name": "zhang-oceanbase",
    "database.whitelist": "test",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "oceanbase-schema-changes-inventory",
    "snapshot.locking.mode": "none"
  }
}
```

For more information, see the Debezium MySQL Connector documentation.
Notice

- Modify the configuration items to match your database information.
- Setting `snapshot.locking.mode` to `none` during the initial snapshot phase is only safe if you are certain that no other clients will modify the table schema (see the alternative below).
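If you cannot rule out concurrent schema changes during the snapshot, Debezium's MySQL connector also offers the `minimal` and `extended` locking modes, which hold locks for part or all of the snapshot. For example, replace the last line of the `config` block with:

```json
"snapshot.locking.mode": "minimal"
```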
Register the Debezium MySQL Connector via REST API.
Use this command to register the Debezium MySQL Connector configuration:
```bash
curl -X POST -H 'Content-Type: application/json' --data @register-oceanbase-debezium.json http://localhost:8083/connectors/
```

Notice

You must execute this command in the directory containing the `register-oceanbase-debezium.json` file.

Verify that the Connector has been added successfully.
```bash
curl http://localhost:8083/connectors
```

If successful, a list of Connectors is displayed.
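At this stage only the Debezium source connector has been registered, so the response should look like this (connector name taken from the example configuration above):

```json
["oceanbase-inventory-connector"]
```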
Check the Kafka Topics.
```bash
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```

At this point, you should see a list of the relevant Topics.
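The exact names depend on your worker configuration, but the list typically includes Kafka Connect's internal topics, the schema history topic, the server-level schema change topic, and one topic per captured table. An illustrative listing, assuming the default internal topic names from `connect-distributed.properties`:

```
connect-configs
connect-offsets
connect-status
oceanbase-schema-changes-inventory
zhang-oceanbase
zhang-oceanbase.test.tb1
```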
Check the data messages in the Kafka Topic.
Run this command to make sure that the data has been sent to the Kafka Topic:
```bash
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic zhang-oceanbase.test.tb1 --from-beginning
```

In this example, the `test` database in OceanBase includes a table named `tb1` with 14 data entries, the highest `id` being 14. The steps above demonstrate how to synchronize data to the `zhang-oceanbase.test.tb1` Topic.
Adding new data
Insert a new row to see whether it is delivered to Kafka's Topic `zhang-oceanbase.test.tb1`.
```sql
INSERT INTO tb1 (id, name, addtime) VALUES (15, 'xiaoli', '2024-06-18 17:20:00');
```
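Each record on the topic is a Debezium change-event envelope. Abridged and illustrative only (the `schema` portion, `source` metadata, and `ts_ms` are omitted, and temporal encodings depend on your converter settings), the event for this insert looks roughly like:

```json
{
  "payload": {
    "before": null,
    "after": { "id": 15, "name": "xiaoli", "addtime": 1718731200000 },
    "op": "c"
  }
}
```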
Update and delete Connectors
Update Connector configuration.
The format of the JSON file for updates is different from that used for additions. When you update, the `name` and `config` keys are not present. The file format for updates is as follows:

```json
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "tasks.max": "1",
  "database.hostname": "xxx.xx.x.x",
  "database.port": "xxxxx",
  "database.user": "debezium_user@zhang",
  "database.server.id": "1",
  "database.server.name": "zhang-oceanbase",
  "database.whitelist": "test",
  "database.history.kafka.bootstrap.servers": "localhost:9092",
  "database.history.kafka.topic": "oceanbase-schema-changes-inventory",
  "snapshot.locking.mode": "none"
}
```

Update by using the REST API with the `PUT` method:

```bash
curl -X PUT -H 'Content-Type: application/json' --data @register-oceanbase-update.json http://localhost:8083/connectors/oceanbase-inventory-connector/config
```
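To confirm that the update has taken effect, you can read back the configuration and status through the standard Kafka Connect REST endpoints:

```bash
# Read back the connector's current configuration
curl http://localhost:8083/connectors/oceanbase-inventory-connector/config

# Check the connector and task status
curl http://localhost:8083/connectors/oceanbase-inventory-connector/status
```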
Delete Connector.

```bash
curl -X DELETE http://localhost:8083/connectors/oceanbase-inventory-connector
```

Deleting a Connector does not delete the Kafka Topic.
Recreate the same Connector.
After deletion, you can recreate the same Connector. New data will continue to be added incrementally to the previously created Topic.
Write messages from Kafka to OceanBase Database
Follow these steps to consume data from the Kafka Topic using the JDBC Sink Connector:
Prepare Confluent Kafka Connect JDBC.
Download Confluent Kafka Connect JDBC. For more information, see JDBC Connector (Source and Sink). Choose the `Self-Hosted` deployment method.

Extract the downloaded archive, and place the extracted folder into the path specified by the `plugin.path` configuration in the `connect-distributed.properties` file.

Since the `lib` directory does not contain the MySQL driver, place the MySQL driver into the `lib` directory. Execute this command to copy the driver from `/opt/debezium/plugin/debezium-connector-mysql`:

```bash
cp /opt/debezium/plugin/debezium-connector-mysql/mysql-connector-java-8.0.21.jar /opt/debezium/plugin/confluentinc-kafka-connect-jdbc-10.7.6/lib/
```
Restart Kafka Connect.
Use the `jps` command to find the `ConnectDistributed` process, execute `kill <pid>`, and then restart Kafka Connect.

```bash
bin/connect-distributed.sh -daemon ./config/connect-distributed.properties
```

After you restart Kafka Connect, check whether the JDBC Sink Connector has been added successfully:

```bash
curl http://localhost:8083/connector-plugins
```

Expected return results:

```json
{"class":"io.confluent.connect.jdbc.JdbcSinkConnector","type":"sink","version":"10.7.6"}
```
Create the Kafka Connect JDBC Sink configuration file.
```bash
vi register-oceanbase-sink.json
```

Add this content to the `register-oceanbase-sink.json` file:

```json
{
  "name": "connect-oceanbase-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://xxx.xx.x.x:xxxx/test_sink?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
    "connection.user": "xxxxx",
    "connection.password": "xxxxxx",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "auto.create": "true",
    "auto.evolve": "true",
    "topics": "zhang-oceanbase.test.tb1",
    "table.name.format": "tb2",
    "transforms": "ExtractField",
    "transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractField.field": "after"
  }
}
```

Notice
- The example does not create a separate user for the Sink; it uses the `root` user instead. In actual use, choose an account based on your circumstances.
- The source and target databases are located in different tenants. If they are in the same tenant, it might cause issues with Binlog entries.
- `test_sink` is the target database, created in advance.
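The `ExtractField$Value` transform in the configuration above unwraps the Debezium envelope so that the sink receives a flat row. Illustratively (values taken from the earlier insert, encodings abridged), the record value before the transform:

```json
{ "before": null, "after": { "id": 15, "name": "xiaoli", "addtime": 1718731200000 }, "op": "c" }
```

and after the transform, which is what the JDBC Sink writes to `tb2`:

```json
{ "id": 15, "name": "xiaoli", "addtime": 1718731200000 }
```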
Add JDBC Sink Connector to Kafka Connect.
```bash
curl -X POST -H "Content-Type: application/json" --data @register-oceanbase-sink.json http://localhost:8083/connectors
```

Check whether the JDBC Sink Connector has been added successfully:

```bash
curl http://localhost:8083/connectors
```

Expected return results:

```json
["connect-oceanbase-sink","oceanbase-inventory-connector"]
```

Synchronize full data
Access the `test_sink` database to view the data:

```sql
SELECT * FROM tb2;
```

Expected return results:

```
+----+-----------+----------------------+
| ID | name      | addtime              |
+----+-----------+----------------------+
|  1 | xiaozhao  | 2016-12-09T16:04:33Z |
|  2 | xiaozhang | 2016-12-09T16:04:33Z |
|  3 | xiaozhang | 2016-12-09T16:04:33Z |
|  4 | xiaozhang | 2017-12-09T16:04:33Z |
|  5 | xiaozhang | 2017-12-09T16:04:33Z |
|  6 | xiaozhang | 2017-12-09T16:04:33Z |
|  7 | xiaozhang | 2017-12-09T16:04:33Z |
|  8 | xiaozhang | 2017-12-09T16:04:33Z |
|  9 | xiaozhang | 2017-12-09T16:04:33Z |
| 10 | xiaozhang | 2017-12-09T16:04:33Z |
| 11 | xiaozhang | 2017-12-09T16:04:33Z |
| 12 | xiaozhang | 2017-12-09T16:04:33Z |
| 13 | xiaozhang | 2017-12-09T16:04:33Z |
| 14 | xiaozhang | 2017-12-09T16:04:33Z |
| 15 | xiaoli    | 2024-06-18T09:20:00Z |
| 16 | xiaoli    | 2024-06-18T09:20:00Z |
| 17 | xiaoli    | 2024-06-18T09:20:00Z |
| 18 | xiaoli    | 2024-06-18T09:20:00Z |
| 19 | xiaoli    | 2024-06-18T09:20:00Z |
| 20 | xiaoli    | 2024-06-18T09:20:00Z |
| 21 | xiaoli    | 2024-06-18T09:20:00Z |
| 22 | xiaoli    | 2024-06-18T09:20:00Z |
| 23 | xiaoli    | 2024-06-18T09:20:00Z |
| 24 | xiaozhang | 2024-06-24T09:20:00Z |
+----+-----------+----------------------+
24 rows in set (0.013 sec)
```
Synchronize incremental data
Add a new record with ID 25 to the source database `test`:

```sql
INSERT INTO tb1 (id, name, addtime) VALUES (25, 'xiaozhang', '2024-06-25 09:20:00');
```

Observe the incremental data with ID 25 in the target database `test_sink`:

```sql
SELECT * FROM tb2 WHERE id = 25;
```
Modify table schema
Add an `age` column to the source table:

```sql
ALTER TABLE tb1 ADD COLUMN age INT;
```

At this point, the source table has an additional column, while the target table remains unchanged, because no new data has flowed to the target table. The structure of the target table changes only when new data is added.
Add another record with ID 26 to the source table:
```sql
INSERT INTO tb1 (id, name, addtime, age) VALUES (26, 'xiaozhang', '2024-06-25 10:20:00', 20);
```

You can see that the schema of the target table has changed, and the row with ID 26 has been synchronized to the target table.
Update data
Update the `age` column for the row with ID 26 in the source table to 21:

```sql
UPDATE tb1 SET age = 21 WHERE id = 26;
```

The `age` column for the corresponding row in the target table is updated to 21.
Delete data
Delete the data with ID equal to 20 from the source table:
```sql
DELETE FROM tb1 WHERE id = 20;
```

Verify that the corresponding data has been deleted from the target table:

```sql
SELECT * FROM tb2 WHERE id = 20;
```
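Delete propagation works because Debezium publishes a delete event followed by a tombstone (a record with the row's key and a null value), and the JDBC Sink, with `delete.enabled` set to `true` and `pk.mode` set to `record_key`, turns them into a SQL `DELETE`. To inspect these events on the topic, print record keys alongside values:

```bash
# Keys identify the deleted row; the tombstone appears as a null value
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic zhang-oceanbase.test.tb1 --from-beginning \
  --property print.key=true
```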