Apache Pulsar is a cloud-native distributed messaging and stream processing platform whose decoupled architecture lets compute and storage scale independently and elastically. This topic describes how to use the Debezium MySQL connector in Pulsar to capture data changes (CDC) from OceanBase Database and synchronize data in real time. By configuring the Debezium source connector, you can push change events from OceanBase tables to Pulsar topics in real time for downstream systems to consume and process.
Compatibility
- OceanBase Database: ≥ V4.4.1
- Apache Pulsar: ≥ 4.2.0
- Debezium MySQL Connector: pulsar-io-debezium-mysql-4.2.0.nar
Prerequisites
Before you use Apache Pulsar to capture data changes in OceanBase Database, make sure that the following conditions are met:
- You have deployed OceanBase Database and created a MySQL-compatible user tenant. For more information, see Create a tenant.
- You have enabled the Binlog service for the MySQL-compatible user tenant. For more information, see OceanBase Binlog service.
- You have installed the Docker environment.
Procedure
Step 1: Obtain the connection string of OceanBase Database
Contact the OceanBase Database deployment personnel to obtain the connection string, for example:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
Parameter description:
- $host: the IP address for connection. For connection through ODP, use the ODP address. For direct connection, use the OBServer IP address.
- $port: the connection port. For connection through ODP, the default value is 2883. For direct connection, the default value is 2881.
- $database_name: the database name.
  Notice
  The user used to connect to the tenant must have the CREATE, INSERT, DROP, and SELECT privileges on the database. For more information about user privileges, see Privilege types in MySQL mode.
- $user_name: the connection account. For connection through ODP, the format is user@tenant#cluster or cluster:tenant:user. For direct connection, the format is user@tenant.
- $password: the account password.
For more information about the connection string, see Connect to an OceanBase tenant by using OBClient.
Here is an example:
obclient -hxxx.xxx.xxx.xxx -P2881 -utest_user001@mysql001 -p****** -Dtest
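The difference between the two connection modes can be sketched as a small shell helper. This is only an illustration: the function name ob_conn_args and its argument order are hypothetical, and the ports are the default values listed above, which your deployment may override.

```shell
#!/usr/bin/env bash
# ob_conn_args is a hypothetical helper, not part of OBClient.
# Usage: ob_conn_args <odp|direct> <host> <user> <tenant> [cluster]
# Prints the -h, -P, and -u arguments for obclient.
ob_conn_args() {
  local mode="$1" host="$2" user="$3" tenant="$4" cluster="${5:-}"
  case "$mode" in
    odp)
      # ODP: default port 2883, account format user@tenant#cluster
      if [ -z "$cluster" ]; then
        echo "cluster name is required for ODP connections" >&2
        return 1
      fi
      printf '%s\n' "-h${host} -P2883 -u${user}@${tenant}#${cluster}"
      ;;
    direct)
      # Direct connection: default port 2881, account format user@tenant
      printf '%s\n' "-h${host} -P2881 -u${user}@${tenant}"
      ;;
    *)
      echo "unknown mode: ${mode} (expected odp or direct)" >&2
      return 1
      ;;
  esac
}
```

The printed arguments can be passed to obclient together with the -p and -D options shown in the example above.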
Step 2: Deploy the Apache Pulsar environment
The following example uses Docker to deploy the Apache Pulsar 4.2.0 standalone environment.
Run the following command to start the Pulsar container:
docker run -it -d \
--name pulsar \
--user root \
-p 6650:6650 \
-p 8080:8080 \
-p 2883:2883 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
--mount source=pulsarconnectors,target=/pulsar/connectors \
apachepulsar/pulsar:4.2.0 \
bin/pulsar standalone
After the container is started, run the following command to download the Debezium MySQL connector:
docker exec -it pulsar bash -c "wget -O /pulsar/connectors/pulsar-io-debezium-mysql-4.2.0.nar https://www.apache.org/dyn/closer.lua/pulsar/pulsar-4.2.0/connectors/pulsar-io-debezium-mysql-4.2.0.nar?action=download"
Step 3: Configure the Debezium OceanBase source
Go to the Pulsar container:
docker exec -it pulsar bash
Create a Pulsar configuration file named debezium-oceanbase-source-config.yaml in the /pulsar/conf directory:
tenant: "public"
namespace: "default"
name: "debezium-oceanbase-source"
topicName: "debezium-oceanbase-topic"
archive: "connectors/pulsar-io-debezium-mysql-4.2.0.nar"
parallelism: 1
configs:
  database.hostname: "$host"
  database.port: "$port"
  database.user: "$user_name"
  database.password: "$password"
  database.server.id: "$server_id" # Ensure that this value is unique.
  database.server.name: "$server_name"
  database.include.list: "$database_name"
  table.include.list: "$database_name.$table_name"
  topic.prefix: "$topic_prefix"
  database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"
  database.history.pulsar.topic: "debezium-oceanbase-history"
  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
  key.converter: "org.apache.kafka.connect.json.JsonConverter"
  key.converter.schemas.enable: "false"
  value.converter: "org.apache.kafka.connect.json.JsonConverter"
  value.converter.schemas.enable: "false"
  pulsar.service.url: "pulsar://127.0.0.1:6650"
  offset.storage.topic: "debezium-oceanbase-offsets"
Parameter description:
- $host: the IP address of the OceanBase Database.
- $port: the port of the OceanBase Database.
- $user_name: the username of the OceanBase Database.
- $password: the password of the OceanBase Database.
- $server_id: the ID of the database server. Ensure that this value is unique.
- $server_name: the name of the database server.
- $database_name: the name of the database to be captured.
- $table_name: the name of the table to be captured.
- $topic_prefix: the prefix of the Pulsar topic.
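If you keep the configuration above as a template with the $ placeholders left in place, the values can be substituted with a short shell function. The following is a minimal sketch, not an official tool: the function name fill_placeholders and the OB_* environment variable names are hypothetical, and the plain sed substitution assumes that no value contains the characters |, &, or \.

```shell
#!/usr/bin/env bash
# fill_placeholders is a hypothetical helper. It reads a template on stdin and
# replaces each $placeholder with the value of an OB_* environment variable.
# The OB_* names are assumptions made for this sketch.
# Limitation: values must not contain |, &, or \ (they are special to sed).
fill_placeholders() {
  sed \
    -e 's|\$host|'"${OB_HOST:?}"'|g' \
    -e 's|\$port|'"${OB_PORT:?}"'|g' \
    -e 's|\$user_name|'"${OB_USER:?}"'|g' \
    -e 's|\$password|'"${OB_PASSWORD:?}"'|g' \
    -e 's|\$server_id|'"${OB_SERVER_ID:?}"'|g' \
    -e 's|\$server_name|'"${OB_SERVER_NAME:?}"'|g' \
    -e 's|\$database_name|'"${OB_DATABASE:?}"'|g' \
    -e 's|\$table_name|'"${OB_TABLE:?}"'|g' \
    -e 's|\$topic_prefix|'"${OB_TOPIC_PREFIX:?}"'|g'
}
```

A possible invocation, assuming the template is saved under a name of your choice, is to redirect the template into the function and write the result to /pulsar/conf/debezium-oceanbase-source-config.yaml. The function stops with an error if any of the variables is unset, which avoids writing a configuration with empty values.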
Step 4: Start the CDC source and verify the data capture
Run the following command in the Pulsar container to start the source:
./bin/pulsar-admin --admin-url http://127.0.0.1:8080 source localrun \
--source-config-file ./conf/debezium-oceanbase-source-config.yaml
You can run the following command to view the internal statistics of the topic, including the number of data records written to it:
./bin/pulsar-admin topics stats-internal persistent://public/default/$topic_prefix.$database_name.$table_name
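The topic name used in the command above follows the pattern persistent://public/default/$topic_prefix.$database_name.$table_name. Because the same name is needed again when consuming, it can help to build it in one place. The helper below is a sketch with a hypothetical name, cdc_topic, and it assumes the public tenant and default namespace used in this topic.

```shell
#!/usr/bin/env bash
# cdc_topic is a hypothetical helper that assembles the per-table topic name.
# Usage: cdc_topic <topic_prefix> <database_name> <table_name>
# Assumes the "public" tenant and "default" namespace used in this topic.
cdc_topic() {
  printf 'persistent://public/default/%s.%s.%s\n' "$1" "$2" "$3"
}
```

The result can then be passed to pulsar-admin in a command substitution, for example as "$(cdc_topic "$topic_prefix" "$database_name" "$table_name")".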
Step 5: Generate test change data
To verify that CDC is working, execute a test DML statement on the target table in OceanBase Database, for example:
INSERT INTO <table_name> VALUES (...);
You can also execute UPDATE or DELETE statements to verify whether different types of change events can be captured.
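To exercise all three event types in one pass, you can generate a short sequence of statements and pipe it to obclient. The sketch below is illustrative: the function name gen_test_dml is hypothetical, and the integer columns a and b are an assumption taken from the sample output in Step 6, so adjust them to match your table.

```shell
#!/usr/bin/env bash
# gen_test_dml is a hypothetical helper that prints one INSERT, one UPDATE,
# and one DELETE for the given table.
# Assumption: the table has integer columns a and b.
# Usage: gen_test_dml <table_name>
gen_test_dml() {
  local table="$1"
  cat <<SQL
INSERT INTO ${table} (a, b) VALUES (11, 1);
UPDATE ${table} SET b = 2 WHERE a = 11;
DELETE FROM ${table} WHERE a = 11;
SQL
}
```

The output can be piped to the obclient command from Step 1. Each statement should produce its own change event in the topic.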
Step 6: Consume and verify the CDC data
Run the following command in the Pulsar container to consume the change data in the topic:
./bin/pulsar-client consume persistent://public/default/$topic_prefix.$database_name.$table_name -s sub1 -n 0 -p Earliest
When data is being consumed, output similar to the following is displayed:
----- got message -----
publishTime:[1775797562431], eventTime:[0], key:[null], properties:[__debezium.context.connectorName=mysql, __debezium.context.connectorLogicalName=$server_name, __debezium.context.taskId=0, __debezium.context.runId=019d75be-a5ee-776c-986c-bdbb1de4bf89], content:{"before":null,"after":{"a":11,"b":1},"source":{"version":"3.4.2.Final","connector":"mysql","name":"$server_name","ts_ms":1775797562000,"snapshot":"false","db":"$database_name","sequence":null,"ts_us":1775797562000000,"ts_ns":1775797562000000000,"table":"$table_name","server_id":3249252,"gtid":"e229cf46-2444-11f1-9465-0242ac110003:4570","file":"mysql-bin.000001","pos":1686884,"row":0,"thread":1000,"query":null},"transaction":null,"op":"c","ts_ms":1775797562242,"ts_us":1775797562242747,"ts_ns":1775797562242747919}
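In the event payload, the op field identifies the type of change: Debezium uses c for an insert, u for an update, d for a delete, and r for a row read during a snapshot. The following sketch summarizes the consumer output by operation type. The function names are hypothetical, and the plain-text match on "op" is a simplification that assumes the single-line JSON layout shown above. A JSON parser would be more robust.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for reading pulsar-client consumer output.

# Map a Debezium "op" code to a readable name.
op_name() {
  case "$1" in
    c) echo "insert" ;;
    u) echo "update" ;;
    d) echo "delete" ;;
    r) echo "snapshot read" ;;
    *) echo "unknown" ;;
  esac
}

# Read consumer output on stdin and print the op code of every line that
# contains one. Lines without an "op" field are skipped.
extract_op() {
  sed -n 's/.*"op":"\([a-z]\)".*/\1/p'
}

# Read consumer output on stdin and print one readable name per event.
describe_ops() {
  extract_op | while read -r code; do
    op_name "$code"
  done
}
```

For the sample message above, piping the output through describe_ops prints insert.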
