Overview
RisingWave is a cloud-native distributed SQL streaming database designed to reduce the cost of building real-time applications.
This topic describes how to connect to OceanBase using RisingWave’s MySQL CDC (Change Data Capture) feature. With CDC, RisingWave can capture data changes from the OceanBase MySQL database in real time and process and analyze them as streams.
Version compatibility
- OceanBase Database version: V4.2.1 BP10 or later, V4.2.5 BP1 or later.
Limitations
When using RisingWave CDC to connect to OceanBase Database, please note the following limitations:
- Permissions: The user connecting to OceanBase Database must have the
SELECT,REPLICATION SLAVE, andREPLICATION CLIENTpermissions. - server.id uniqueness: The
server.idparameter must be unique in the database. Generally, a random number greater than 1000 is specified. - Data type mapping: OceanBase Database data types are mapped to RisingWave data types. Some types may need to be adjusted.
Prerequisites
Before using RisingWave, make sure the following conditions are met:
- You have deployed OceanBase Database and created a MySQL-compatible user tenant. For more information about how to create a user tenant, see Create a tenant.
- You have enabled the Binlog service in the created MySQL-compatible user tenant. For more information, see OceanBase Binlog service.
- You have installed the PostgreSQL command-line client tool psql, and its version is compatible with the target database. You can run the
psql --versioncommand in the terminal to check the installation status. - You have completed the Docker deployment, and the Docker service is running. The current user has the permission to execute Docker commands (you can run the
docker infocommand to verify this).
Procedure
Step 1: Obtain the database connection string
Contact the OceanBase Database deployment personnel to obtain the connection string. For example:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
Parameter description:
$host: the IP address for connection. For ODP connection, use the ODP address. For direct connection, use the OBServer IP address.$port: the port for connection. For ODP connection, the default value is2883. For direct connection, the default value is2881.$database_name: the name of the database.Notice
The user used to connect to the tenant must have the
CREATE,INSERT,DROP,REPLICATION SLAVE,REPLICATION CLIENT, andSELECTprivileges on the database. For more information about user privileges, see Privilege types in MySQL mode.$user_name: the account for connection. For ODP connection, the format isUser@Tenant#ClusterorCluster:Tenant:User. For direct connection, the format isUser@Tenant.$password: the password of the account.
For more information about the connection string, see Connect to an OceanBase tenant by using OBClient.
Here is an example:
obclient -hxxx.xxx.xxx.xxx -P2881 -utest_user001@mysql001 -p****** -Dtest
Step 2: Deploy RisingWave
Run the following command to deploy RisingWave:
sudo docker run -d --pull=always -p 4566:4566 -p 5691:5691 risingwavelabs/risingwave:latest single_node
# Check whether RisingWave has started.
sudo netstat -ntlp | grep 4566
This command starts a single-node instance of RisingWave, which listens on port 4566 (RisingWave service) and port 5691 (monitoring interface).
Step 3: Prepare data in OceanBase Database
Create a user and a test table in OceanBase Database for CDC:
CREATE USER '$rs_name'@'%' IDENTIFIED BY '$rs_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '$rs_name'@'%';
FLUSH PRIVILEGES;
create table tbl_t1 (
c01 int,
c02 char,
c03 varchar(10),
c04 text,
c05 bigint,
c06 long,
c07 numeric(10, 2),
c08 decimal(20, 8),
c09 date,
c10 datetime,
c11 datetime(3),
c12 timestamp(6),
primary key (c01, c02)
);
insert into tbl_t1 (c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12)
values (1, 'a', 'abc', 'abcdefg', 123456789000, 99999999999999, 123.456, 12345.6789,
current_date,
current_timestamp,
'2024-01-01 18:18:18.888',
'2024-01-01 06:06:06.666666');
select * from tbl_t1;
Step 4: Connect RisingWave to OceanBase Database
Run the following command to connect to RisingWave by using psql:
psql -h 127.0.0.1 -p 4566 -d dev -U root
Run the following statements in RisingWave to create a schema and a MySQL-CDC source:
-- Create a schema.
create schema rwtest;
-- Create a MySQL-CDC source.
drop source if exists rwtest.mysql_cdc cascade;
create source rwtest.mysql_cdc
with (
connector = 'mysql-cdc',
hostname = '$host',
port = '$port',
username = '$rs_name',
password = '$rs_password',
database.name = '$database_name',
server.id = 1001, -- Generally, specify a random number greater than 1000. Make sure that the server.id is unique in the database.
-- You can specify Debezium parameters, such as skip unknown DDL statements.
debezium.schema.history.internal.store.only.captured.tables.ddl = 'true'
);
show sources from rwtest;
-- Create a CDC table to map the OceanBase table.
DROP TABLE IF EXISTS rwtest.test_mysql_cdc;
CREATE TABLE rwtest.test_mysql_cdc (
c01 int,
c02 string, -- No need to define the precision for the string type.
c03 string,
c04 string,
c05 bigint,
c06 string,
c07 numeric, -- No need to define the precision for the numeric type.
c08 numeric,
c09 date,
c10 timestamp, -- No need to define the precision for the timestamp type.
c11 timestamp,
c12 timestamptz,
PRIMARY KEY (c01, c02)
) FROM rwtest.mysql_cdc TABLE '$database_name.tbl_t1';
select * from rwtest.test_mysql_cdc;
Verify the result
Run the following statement to query the CDC table and verify the data synchronization result:
dev=> select * from rwtest.test_mysql_cdc;
c01 | c02 | c03 | c04 | c05 | c06 | c07 | c08 | c09 | c10 | c11 |
c12
-----+-----+-----+---------+--------------+----------------+--------+----------------+------------+---------------------+-------------------------+-------
---------------------------
1 | a | abc | abcdefg | 123456789000 | 99999999999999 | 123.46 | 12345.67890000 | 2026-02-05 | 2026-02-05 14:17:17 | 2024-01-01 18:18:18.888 | 2023-1
2-31 22:06:06.666666+00:00
(1 row)
References
For more information, see RisingWave official documentation.
