Debezium is an open-source distributed platform that monitors and captures database changes and turns them into event streams. It is built on Apache Kafka and supports multiple types of database systems.
Prerequisites
You have installed oblogproxy.
Examples
In this topic, Docker is used to demonstrate how to connect to OceanBase Database by using Debezium and obtain data. This deployment mode is for demonstration only and cannot be directly used in a production environment. Perform the following steps:
Start ZooKeeper.

```shell
docker run -it -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:2.4
```

Start Kafka.

```shell
docker run -it -d --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:2.4
```

Start Kafka Connect, which exposes a REST API to manage the Debezium connector for MySQL.

```shell
docker run -it -d --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka quay.io/debezium/connect:2.4
```

Create a file named `payload.json` and write the following sample code to the file:

```json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "xxxx.cn-hangzhou.oceanbase.aliyuncs.com",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "xxxx",
    "database.server.id": "1",
    "topic.prefix": "observer1",
    "database.include.list": "debe",
    "table.include.list": "debe.earthquake",
    "schema.history.internal.store.only.captured.tables.ddl": true,
    "schema.history.internal.skip.unparseable.ddl": true,
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "snapshot.locking.mode": "none"
  }
}
```
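The registration can also be done programmatically instead of with curl. Below is a minimal Python sketch (standard library only) that builds the POST request for the Kafka Connect REST API; the endpoint URL assumes the port mapping from the `docker run` command above, and `build_register_request` is a hypothetical helper name, not part of any library.

```python
import json
import urllib.request

# Kafka Connect REST endpoint; assumes the -p 8083:8083 mapping used above.
CONNECT_URL = "http://localhost:8083/connectors"

def build_register_request(payload: dict) -> urllib.request.Request:
    """Build the POST request that registers a connector with Kafka Connect."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        CONNECT_URL,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Example usage with the configuration from payload.json:
#   with open("payload.json") as f:
#       req = build_register_request(json.load(f))
#   urllib.request.urlopen(req)  # submits the registration to Kafka Connect
```

Keeping the request construction separate from the network call makes the sketch easy to test without a running Kafka Connect instance.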
Notice

- If the Debezium connector for MySQL does not recognize OceanBase Database syntax such as `PURGE TABLE`, you can set `schema.history.internal.skip.unparseable.ddl` to `true` to skip these statements.
- If you do not want to synchronize the unique virtual tables and views of OceanBase Database, you can specify `table.include.list` or `table.exclude.list` to filter out these tables and views.

Use a curl command to submit a POST request for the `/connectors` resource to the API of Kafka Connect, with the aforementioned JSON file for the new connector included in the request. Here is an example:

```shell
cat payload.json | tr -d "\n" | curl -X POST -H "Content-Type: application/json" -d @- localhost:8083/connectors/
```

A registered connector generates a large number of logs in the Kafka Connect container. From these logs, you can observe how the connector is created and how it reads the binlogs of OceanBase Database. Here is an example of watching the topic that the connector writes to:

```shell
docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.4 watch-topic -a -k observer1.debe.earthquake
```
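Each message printed by the watcher is a Debezium change-event envelope, whose payload carries `op`, `before`, `after`, and `source` fields. Below is a minimal Python sketch of summarizing one such event; the sample event is made up for illustration (not captured output), and `describe_change` is a hypothetical helper, not part of Debezium.

```python
import json

# Map Debezium operation codes to readable names.
OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "snapshot read"}

def describe_change(event_json: str) -> str:
    """Summarize a Debezium change event: operation, table, and row state."""
    payload = json.loads(event_json).get("payload", {})
    op = OP_NAMES.get(payload.get("op"), "unknown")
    source = payload.get("source", {})
    table = f'{source.get("db")}.{source.get("table")}'
    # "after" holds the new row state for creates/updates; "before" for deletes.
    row = payload.get("after") or payload.get("before")
    return f"{op} on {table}: {row}"

# Illustrative event shaped like Debezium's envelope; values are made up.
sample = json.dumps({
    "payload": {
        "op": "c",
        "source": {"db": "debe", "table": "earthquake"},
        "before": None,
        "after": {"id": 1, "magnitude": 5.4},
    }
})
print(describe_change(sample))  # → create on debe.earthquake: {'id': 1, 'magnitude': 5.4}
```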