This topic describes how to integrate Community Edition obcdc into your data consumption pipeline.
Applicability
This topic applies only to Community Edition obcdc.
Develop your own OceanBase data consumption tool by using the obcdc library
obcdc is a C++ library that is built as a dynamic library (libobcdc.so). To develop a downstream consumption program, link against the dynamic library and include its header files (libobcdc.h and ob_errno.h).
Introduction
The Community Edition of obcdc depends on the oceanbase-ce-libs package. You can download and install this package from Software Center.
rpm -ivh oceanbase-ce-libs-****.rpm
After the installation is completed, run the ldd ./libobcdc.so command to check whether any dependent dynamic libraries are missing locally, for example, libmariadb.so.3 in oceanbase-ce-libs. If any are missing, make sure that all dependent libraries are available locally and add their directories to LD_LIBRARY_PATH for the program that uses libobcdc.so.
Header files
For more information about the interfaces of libobcdc.h, see the comments in the code.
Create an obcdc instance
You can call the ObCDCFactory::construct_obcdc() method to create an obcdc instance.
Use obcdc
Initialize the interface: You must call the init or init_with_start_tstamp_usec interface to provide obcdc with the configuration information and the start timestamp.
- Configuration information: You can specify the path of the configuration file or pass the configuration items as a map.
- Start time: The point in time from which obcdc begins to pull logs. Specify it in seconds (init) or in microseconds (init_with_start_tstamp_usec).
Start the interface: Call the launch interface to instruct obcdc to start working.
LogRecord retrieval interface: Call the next_record interface to continuously obtain incremental data of OceanBase Database from obcdc. This interface takes a timeout period, and one of its overloads also returns the tenant ID of the record. The data is encapsulated in the LogRecord format. The LogRecord memory is allocated by obcdc.
LogRecord return interface: Call the release_record interface to return the LogRecord that has been consumed to obcdc. obcdc recycles the memory asynchronously by using a background GC thread.
Obtain the tenant IDs of all tenants served by the current obcdc instance: Call the get_tenant_ids interface to obtain the list of all tenants served by the current obcdc instance.
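The call sequence above can be sketched as follows. So that the sketch compiles without libobcdc, it uses a stand-in class, FakeCdc, and a stand-in record type, FakeRecord. Both are hypothetical and only mirror the interface names described above. The return codes kOk and kTimeout and the configuration key are placeholders as well: the real codes are defined in ob_errno.h and the real configuration items are specific to obcdc.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <map>
#include <string>

// Stand-in return codes. The real constants and values come from ob_errno.h.
constexpr int kOk = 0;
constexpr int kTimeout = -1;

// Hypothetical stand-in for a record handed out by obcdc.
struct FakeRecord { int64_t commit_usec; };

// Hypothetical stand-in that mirrors the call sequence of an obcdc instance.
class FakeCdc {
public:
  ~FakeCdc() { for (FakeRecord *r : queue_) delete r; }
  int init_with_start_tstamp_usec(const std::map<std::string, std::string> &configs,
                                  uint64_t start_usec) {
    if (configs.empty()) return -2;  // this stand-in refuses an empty configuration
    start_usec_ = start_usec;
    inited_ = true;
    return kOk;
  }
  int launch() {
    if (!inited_) return -2;
    // Pretend that three changes were committed after the start point.
    for (int i = 1; i <= 3; ++i) {
      queue_.push_back(new FakeRecord{static_cast<int64_t>(start_usec_) + i});
    }
    return kOk;
  }
  int next_record(FakeRecord **record, int64_t /*timeout_us*/) {
    if (queue_.empty()) return kTimeout;
    *record = queue_.front();
    queue_.pop_front();
    ++outstanding_;
    return kOk;
  }
  void release_record(FakeRecord *record) { delete record; --outstanding_; }
  int outstanding() const { return outstanding_; }
private:
  bool inited_ = false;
  uint64_t start_usec_ = 0;
  int outstanding_ = 0;
  std::deque<FakeRecord *> queue_;
};

// Converts seconds to microseconds, the unit used by init_with_start_tstamp_usec.
constexpr uint64_t sec_to_usec(uint64_t sec) { return sec * 1000000ULL; }

// Runs init -> launch -> next_record/release_record until the first timeout.
// Returns the number of records consumed, or -1 on failure.
int consume_until_idle(FakeCdc &cdc, uint64_t start_sec) {
  std::map<std::string, std::string> configs;
  configs["example_key"] = "example_value";  // placeholder, not a real obcdc option
  if (cdc.init_with_start_tstamp_usec(configs, sec_to_usec(start_sec)) != kOk) return -1;
  if (cdc.launch() != kOk) return -1;
  int consumed = 0;
  for (;;) {
    FakeRecord *record = nullptr;
    const int ret = cdc.next_record(&record, 10000);
    if (ret == kTimeout) break;  // nothing to pull right now
    if (ret != kOk || record == nullptr) return -1;
    ++consumed;                  // a real consumer would process the record here
    cdc.release_record(record);  // hand the memory back
  }
  return consumed;
}
```

A real consumer would keep polling after a timeout instead of returning. The sketch stops at the first timeout only so that it terminates.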
Destroy an obcdc instance
You must stop an obcdc instance before destroying it.
Stop obcdc:
- Call the stop interface to instruct obcdc to stop the operation of each module.
- Call the destroy interface to destruct the modules of obcdc and release the related resources.
Destroy an obcdc instance: Call the ObCDCFactory::deconstruct(IObCDCInstance *instance) method to destroy the obcdc instance. After this call, the pointer to the obcdc instance that was obtained when the instance was created can no longer be accessed.
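One way to keep the shutdown order from being forgotten is a small scope guard. The following is a minimal sketch that assumes the factory and instance types expose stop, destroy, and deconstruct as described above. CdcGuard, FakeFactory, and FakeInstance are hypothetical names introduced for illustration and are not part of obcdc. The stand-ins only record the order of the calls so that the sketch compiles without libobcdc.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-ins that only record the order in which they are called.
struct FakeInstance {
  std::vector<std::string> *log;
  void stop() { log->push_back("stop"); }
  void destroy() { log->push_back("destroy"); }
};

struct FakeFactory {
  std::vector<std::string> *log;
  void deconstruct(FakeInstance *instance) {
    log->push_back("deconstruct");
    delete instance;
  }
};

// Scope guard: stop -> destroy -> deconstruct, in that order, at most once.
template <typename Factory, typename Instance>
class CdcGuard {
public:
  CdcGuard(Factory &factory, Instance *instance)
      : factory_(factory), instance_(instance) {}
  ~CdcGuard() { shutdown(); }
  CdcGuard(const CdcGuard &) = delete;
  CdcGuard &operator=(const CdcGuard &) = delete;

  void shutdown() {
    if (instance_ == nullptr) return;  // already shut down
    instance_->stop();
    instance_->destroy();
    factory_.deconstruct(instance_);
    instance_ = nullptr;  // the pointer must not be used after deconstruct
  }
  Instance *get() const { return instance_; }

private:
  Factory &factory_;
  Instance *instance_;
};

// Runs the guard over the stand-ins and returns the recorded call order.
std::vector<std::string> shutdown_order() {
  std::vector<std::string> log;
  FakeFactory factory{&log};
  {
    CdcGuard<FakeFactory, FakeInstance> guard(factory, new FakeInstance{&log});
    guard.shutdown();  // explicit shutdown; the destructor then does nothing
  }
  return log;
}
```

Because the guard is a template, the same pattern could in principle wrap the real factory and instance types, provided that their interfaces match the ones described in this topic.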
Considerations
All data obtained from obcdc is stored in memory that is allocated by obcdc. Therefore, every LogRecord obtained by calling the next_record interface must eventually be returned by calling the release_record interface to avoid memory leaks. The calls do not have to alternate: you can call the next_record interface multiple times and then call the release_record interface for each LogRecord that you obtained.
You cannot call the launch or init interface after you call the stop interface.
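The pairing rule for next_record and release_record can be sketched as a batch loop: fetch several records, process them together, and then release each one. BatchCdc and FakeRecord are hypothetical stand-ins that mirror the two interfaces so that the sketch compiles without libobcdc, and the return codes are placeholders for the constants in ob_errno.h.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

constexpr int kOk = 0;        // stand-in for OB_SUCCESS
constexpr int kTimeout = -1;  // stand-in for OB_TIMEOUT; the real value is in ob_errno.h

struct FakeRecord { int id; };

// Hypothetical stand-in that tracks how many records the caller still holds.
class BatchCdc {
public:
  explicit BatchCdc(int total) : remaining_(total), outstanding_(0), next_id_(0) {}
  int next_record(FakeRecord **record, int64_t /*timeout_us*/) {
    if (remaining_ == 0) return kTimeout;
    --remaining_;
    ++outstanding_;
    *record = new FakeRecord{next_id_++};
    return kOk;
  }
  void release_record(FakeRecord *record) {
    delete record;
    --outstanding_;
  }
  int outstanding() const { return outstanding_; }

private:
  int remaining_;
  int outstanding_;
  int next_id_;
};

// Pulls up to max_batch records, then releases every one of them.
// Returns the number of records in the batch.
std::size_t drain_one_batch(BatchCdc &cdc, std::size_t max_batch) {
  std::vector<FakeRecord *> batch;
  while (batch.size() < max_batch) {
    FakeRecord *record = nullptr;
    if (cdc.next_record(&record, 10000) != kOk || record == nullptr) break;
    batch.push_back(record);
  }
  // A real consumer would process the whole batch here.
  const std::size_t count = batch.size();
  for (FakeRecord *record : batch) {
    cdc.release_record(record);  // exactly one release per fetched record
  }
  return count;
}
```

Holding a batch keeps the memory of all its records allocated until they are released, so the batch size bounds how much record memory the consumer pins at once.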
You must handle all error codes returned by obcdc as needed:
- The error code OB_SUCCESS indicates that data is obtained successfully. In this case, the returned data pointer is not NULL.
- The error code OB_TIMEOUT indicates that no data can be pulled from obcdc at present. You can retry the operation to pull data. If the data timestamp lags behind real time, there may be an issue in obcdc that you must investigate further.
- The error code OB_IN_STOP_STATE indicates that obcdc is stopped. It may have been stopped by your call to the stop or destroy interface, or by an exception that obcdc cannot handle and that caused its modules to stop working. You can obtain necessary information such as the safe point and then exit the process.
- An unexpected error code indicates that you must obtain necessary information such as the safe point and then exit the process.
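The rules above amount to a three-way decision after each next_record call. The following minimal sketch expresses that decision as a function. The enum values are stand-ins: the real constants OB_SUCCESS, OB_TIMEOUT, and OB_IN_STOP_STATE and their numeric values are defined in ob_errno.h. The names SketchCode, Action, and classify are hypothetical.

```cpp
#include <cassert>

// Stand-in codes. The real constants and their values are defined in ob_errno.h.
enum SketchCode { SK_SUCCESS = 0, SK_TIMEOUT = -1, SK_IN_STOP_STATE = -2 };

// What the consumer should do next.
enum class Action { Process, Retry, SaveAndExit };

// Maps a next_record return code and the returned pointer to the next step.
Action classify(int ret, const void *record) {
  if (ret == SK_SUCCESS) {
    // Success with a NULL record contradicts the contract, so treat it as unexpected.
    return record != nullptr ? Action::Process : Action::SaveAndExit;
  }
  if (ret == SK_TIMEOUT) {
    return Action::Retry;  // no data at present; pull again
  }
  // SK_IN_STOP_STATE and every unexpected code: save the safe point, then exit.
  return Action::SaveAndExit;
}
```

Keeping the decision in one function makes it easy to check that no return code falls through unhandled.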
Examples
A simple demo is provided to show you how to use obcdc. For more information, see obcdc_tailf source code.
Notice
This demo is provided for your reference only. You cannot compile or run it.
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*
* obcdc_demo
*/
#include <iostream>
#include "include/libobcdc/libobcdc.h"
#include "include/libobcdc/ob_errno.h"
using namespace std;
using namespace oceanbase::libobcdc;
using namespace oceanbase::common;
typedef IBinlogRecord Record;

#define LOG(msg) \
  do { \
    std::cout << msg << std::endl; \
  } while (0)

int create_obcdc_instance(ObCDCFactory &cdc_factory, IObCDCInstance *&obcdc_instance)
{
  int ret = OB_SUCCESS;
  if (NULL == (obcdc_instance = cdc_factory.construct_obcdc())) {
    ret = OB_NOT_INIT;
    LOG("[ERROR] construct_obcdc failed");
  }
  return ret;
}
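
// Stop all modules, release their resources, and then return the instance to the factory.
void destroy_obcdc_instance(ObCDCFactory &cdc_factory, IObCDCInstance *obcdc_instance)
{
  obcdc_instance->stop();
  obcdc_instance->destroy();
  cdc_factory.deconstruct(obcdc_instance);
}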

int init_obcdc_instance(IObCDCInstance &obcdc_instance)
{
  int ret = OB_SUCCESS;
  const char *config_path = "conf/libobcdc.conf";
  if (OB_SUCCESS != (ret = obcdc_instance.init(config_path, 0))) {
    LOG("obcdc_instance init failed");
  } else if (OB_SUCCESS != (ret = obcdc_instance.launch())) {
    LOG("obcdc_instance launch failed");
  }
  return ret;
}

// The record pointer is passed by reference so that the caller receives the fetched record.
int fetch_next_cdc_record(IObCDCInstance &obcdc_instance, Record *&record)
{
  int ret = OB_SUCCESS;
  const int64_t timeout = 10000; // usec
  if (OB_SUCCESS != (ret = obcdc_instance.next_record(&record, timeout))) {
    if (OB_TIMEOUT != ret) {
      LOG("[WARN] next_record failed");
    }
  } else if (NULL == record) {
    ret = OB_ERR_UNEXPECTED;
    LOG("invalid record");
  }
  return ret;
}

int release_cdc_record(IObCDCInstance &obcdc_instance, Record *record)
{
  int ret = OB_SUCCESS;
  obcdc_instance.release_record(record);
  return ret;
}

int handle_cdc_record(Record *record)
{
  // Placeholder: consume the record here.
  int ret = OB_SUCCESS;
  return ret;
}

int main(int argc, char **argv)
{
  int ret = OB_SUCCESS;
  ObCDCFactory cdc_factory;
  IObCDCInstance *obcdc_instance = NULL;

  if (OB_SUCCESS != (ret = create_obcdc_instance(cdc_factory, obcdc_instance))) {
    LOG("[ERROR] construct_obcdc_instance failed");
  } else if (NULL == obcdc_instance) {
    ret = OB_ERR_UNEXPECTED;
    LOG("[ERROR] obcdc_instance should not be null!");
  } else {
    if (OB_SUCCESS != (ret = init_obcdc_instance(*obcdc_instance))) {
      LOG("[ERROR] obcdc_instance init failed");
    } else {
      while (OB_SUCCESS == ret) {
        Record *record = NULL;
        if (OB_SUCCESS != (ret = fetch_next_cdc_record(*obcdc_instance, record))) {
          if (OB_TIMEOUT == ret) {
            // No data is available yet. Keep polling.
            ret = OB_SUCCESS;
          } else {
            LOG("[ERROR] fetch_next_cdc_record failed");
          }
        } else if (OB_SUCCESS != (ret = handle_cdc_record(record))) {
          LOG("[ERROR] handle_cdc_record failed");
        } else if (OB_SUCCESS != (ret = release_cdc_record(*obcdc_instance, record))) {
          LOG("[ERROR] release_cdc_record failed");
        }
      }
    }
    destroy_obcdc_instance(cdc_factory, obcdc_instance);
  }
  // Report failure to the caller instead of always returning 0.
  return OB_SUCCESS == ret ? 0 : 1;
}
Appendix
This chapter shows the code of some obcdc header files.
ObCDCFactory
ObCDCFactory is an obcdc instance factory that is used to create or destroy obcdc instances.
Notice
Only one obcdc instance can be created in a process.
// libobcdc.h: ObCDCFactory
namespace oceanbase{
namespace liboblog{
class ObCDCFactory
{
public:
  ObCDCFactory();
  ~ObCDCFactory();
public:
  IObCDCInstance *construct_obcdc();
  void deconstruct(IObCDCInstance *log);
};
}
}
IObCDCInstance
IObCDCInstance is the external interface of obcdc, providing capabilities such as initialization, startup, shutdown, destruction, data retrieval, and data release. The following code shows some commonly used interface definitions.
// libobcdc.h: IObCDCInstance
namespace oceanbase{
namespace liboblog{
// IObCDCInstance is the interface provided by liboblog to the outside world.
// Note: Parameters to be passed in for the following interfaces are ignored.
class IObCDCInstance
{
public:
  virtual ~IObCDCInstance() {};
public:
  /*
   * Initialize libobcdc.
   * @param config_file          Name of the configuration file.
   * @param start_timestamp_sec  Start timestamp (in seconds).
   * @param err_cb               Pointer to the error callback function.
   */
  virtual int init(const char *config_file,
                   const uint64_t start_timestamp_sec,
                   ERROR_CALLBACK err_cb = NULL) = 0;
  /*
   * Initialize libobcdc.
   * @param configs              Configuration information specified by using a map.
   * @param start_timestamp_sec  Start timestamp (in seconds).
   * @param err_cb               Pointer to the error callback function.
   */
  virtual int init(const std::map<std::string, std::string> &configs,
                   const uint64_t start_timestamp_sec,
                   ERROR_CALLBACK err_cb = NULL) = 0;
  /*
   * Initialize libobcdc.
   * @param configs               Configuration information specified by using a map.
   * @param start_timestamp_usec  Start timestamp (in microseconds).
   * @param err_cb                Pointer to the error callback function.
   */
  virtual int init_with_start_tstamp_usec(const std::map<std::string, std::string> &configs,
                                          const uint64_t start_timestamp_usec,
                                          ERROR_CALLBACK err_cb = NULL) = 0;
  virtual void destroy() = 0;
  /*
   * Fetch the next binlog record from the OB cluster.
   * @param [out] record  Binlog record. The memory is allocated by oblog. You can call release_record multiple times after multiple calls to next_record.
   * @param timeout_us    Timeout period (in microseconds).
   *
   * @retval OB_SUCCESS        Success.
   * @retval OB_TIMEOUT        Timeout.
   * @retval other error code  Failure.
   */
  virtual int next_record(ICDCRecord **record, const int64_t timeout_us) = 0;
  /*
   * Fetch the next binlog record from the OB cluster.
   * @param [out] record         Binlog record. The memory is allocated by oblog. You can call release_record multiple times after multiple calls to next_record.
   * @param [out] major_version  Major version of ICDCRecord.
   * @param [out] tenant_id      Tenant ID of ICDCRecord.
   * @param timeout_us           Timeout period (in microseconds).
   *
   * @retval OB_SUCCESS        Success.
   * @retval OB_TIMEOUT        Timeout.
   * @retval other error code  Failure.
   */
  virtual int next_record(ICDCRecord **record,
                          int32_t &major_version,
                          uint64_t &tenant_id,
                          const int64_t timeout_us) = 0;
  /*
   * Release each ICDCRecord.
   * @param record  Record obtained from next_record.
   */
  virtual void release_record(ICDCRecord *record) = 0;
  /*
   * Launch libobcdc.
   * @retval OB_SUCCESS    on success
   * @retval ! OB_SUCCESS  on fail
   */
  virtual int launch() = 0;
  /*
   * Stop libobcdc.
   */
  virtual void stop() = 0;
  /// Get the list of all tenant IDs served by oblog after oblog is initialized.
  ///
  /// @param [out] tenant_ids  Tenant IDs served by oblog.
  ///
  /// @retval OB_SUCCESS   Success.
  /// @retval other value  Failure.
  virtual int get_tenant_ids(std::vector<uint64_t> &tenant_ids) = 0;
};
}
}
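As an illustration of how the tenant-aware next_record overload and get_tenant_ids might be combined, the following minimal sketch counts records per tenant. TenantCdc and FakeRecord are hypothetical stand-ins whose method signatures follow the declarations above, so the sketch compiles without libobcdc. The return codes and the tenant IDs in the usage are placeholders.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

constexpr int kOk = 0;        // stand-in for OB_SUCCESS
constexpr int kTimeout = -1;  // stand-in for OB_TIMEOUT; the real value is in ob_errno.h

struct FakeRecord {};

// Hypothetical stand-in that replays a fixed sequence of tenant IDs.
class TenantCdc {
public:
  explicit TenantCdc(const std::vector<uint64_t> &stream) : stream_(stream), pos_(0) {}

  // Reports each distinct tenant ID that appears in the replayed sequence.
  int get_tenant_ids(std::vector<uint64_t> &tenant_ids) {
    tenant_ids.clear();
    for (uint64_t id : stream_) {
      bool seen = false;
      for (uint64_t known : tenant_ids) seen = seen || (known == id);
      if (!seen) tenant_ids.push_back(id);
    }
    return kOk;
  }

  int next_record(FakeRecord **record, int32_t &major_version,
                  uint64_t &tenant_id, int64_t /*timeout_us*/) {
    if (pos_ >= stream_.size()) return kTimeout;
    *record = &slot_;
    major_version = 0;
    tenant_id = stream_[pos_++];
    return kOk;
  }

  void release_record(FakeRecord * /*record*/) {}  // nothing to free in this stand-in

private:
  std::vector<uint64_t> stream_;
  std::size_t pos_;
  FakeRecord slot_;
};

// Counts records per tenant until the first timeout.
std::map<uint64_t, int> count_by_tenant(TenantCdc &cdc) {
  std::map<uint64_t, int> counts;
  std::vector<uint64_t> tenant_ids;
  if (cdc.get_tenant_ids(tenant_ids) != kOk) return counts;
  for (uint64_t id : tenant_ids) counts[id] = 0;  // start every served tenant at zero
  for (;;) {
    FakeRecord *record = nullptr;
    int32_t major_version = 0;
    uint64_t tenant_id = 0;
    if (cdc.next_record(&record, major_version, tenant_id, 10000) != kOk) break;
    ++counts[tenant_id];
    cdc.release_record(record);
  }
  return counts;
}
```

Seeding the map from get_tenant_ids means that tenants without any changes still appear in the result with a count of zero.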