Debezium is an open-source distributed platform for monitoring database changes, capturing data change events, and exporting them as event streams to various consumers. It is based on Apache Kafka and supports multiple databases.
Prerequisites
Before proceeding, please make sure that:
You have enabled the Binlog service in OceanBase Cloud.
To enable the Binlog service, go to Instances -> Tenants -> Binlog Service and click to activate it. For more information, refer to Enable Binlog Service.
Notice
- The initial snapshot feature is not supported; only the default incremental snapshot feature is supported.
- We recommend that you set snapshot.locking.mode to none and read the risk warnings in the official documentation.
Example
This article shows a simple way to use Debezium with Docker to connect to OceanBase and retrieve data. This deployment method is not suitable for production environments. Follow the steps below:
Start Zookeeper
docker run -it -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:2.4
Start Kafka
docker run -it -d --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:2.4
Start Kafka Connect Service. The service exposes a REST API for managing the Debezium MySQL connector.
docker run -it -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka quay.io/debezium/connect:2.4
Create a new payload.json file and write in the following example configuration.
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "xxxx.cn-hangzhou.oceanbase.aliyuncs.com",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "xxxx",
    "database.server.id": "1",
    "topic.prefix": "observer1",
    "database.include.list": "debe",
    "table.include.list": "debe.earthquake",
    "schema.history.internal.store.only.captured.tables.ddl": true,
    "schema.history.internal.skip.unparseable.ddl": true,
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "snapshot.locking.mode": "none"
  }
}
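Before submitting the file, it can help to confirm that no property was dropped while editing. The following is a minimal sketch, not part of Debezium: check_payload is a hypothetical helper, and the property names it looks for are simply copied from the example configuration above rather than taken from an authoritative list of required settings. It only checks that each name appears in the file as a quoted string; it does not validate the JSON itself.

```shell
# Hypothetical helper (an assumption for illustration, not a Debezium tool):
# print every listed property name that does not appear in the payload file.
check_payload() {
  file="$1"
  rc=0
  for key in \
    "connector.class" \
    "database.hostname" \
    "database.port" \
    "database.user" \
    "database.password" \
    "database.server.id" \
    "topic.prefix" \
    "schema.history.internal.kafka.bootstrap.servers" \
    "schema.history.internal.kafka.topic"
  do
    # -F makes grep treat the dotted name as a literal string, not a regex.
    if ! grep -qF "\"$key\"" "$file"; then
      echo "missing: $key"
      rc=1
    fi
  done
  # Return 0 only when every name was found.
  return "$rc"
}
```

For example, running check_payload payload.json prints nothing and returns 0 when every listed name is present, and prints one "missing: ..." line per absent name otherwise.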
Notice
- The MySQL connector cannot recognize OceanBase's PURGE TABLE syntax. This can be resolved by configuring the schema.history.internal.skip.unparseable.ddl option.
- If you do not want to synchronize OceanBase's virtual tables and views, use the table.include.list or table.exclude.list option.
Use the curl command to submit a POST request to the /connectors resource of the Kafka Connect service's API, along with the JSON document for the new connector.
cat payload.json | tr -d "\n" | curl -X POST -H "Content-Type: application/json" -d @- localhost:8083/connectors/
After registering a connector, Debezium will produce a considerable amount of log output in the Kafka Connect container. From this output, you can observe the process the connector goes through from creation to starting to read OceanBase's Binlog.
Start a watcher to view the change events on the observer1.debe.earthquake topic.
docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.4 watch-topic -a -k observer1.debe.earthquake
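Besides reading the container logs, the Kafka Connect REST API can report whether a registered connector is running. The sketch below assumes the API is reachable on localhost:8083, as published by the connect container in this example, and that the connector was registered as inventory-connector. CONNECT_URL, connector_status_url, and connector_status are names introduced here for illustration only.

```shell
# Base URL of the Kafka Connect REST API. The default is an assumption that
# matches the port published by the "connect" container in this example.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Build the status URL for a connector name (illustrative helper).
connector_status_url() {
  printf '%s/connectors/%s/status\n' "$CONNECT_URL" "$1"
}

# Fetch the connector status as JSON. This needs the connect container
# to be running, so it is only defined here, not called.
connector_status() {
  curl -s "$(connector_status_url "$1")"
}
```

Calling connector_status inventory-connector should return a JSON document describing the state of the connector and its tasks. The log output mentioned above can be followed with docker logs -f connect.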