This topic describes how to integrate the Community Edition of obcdc into your data consumption pipeline.
Applicability
This topic applies only to the Community Edition of obcdc.
Develop your own OceanBase data consumption tool by using the obcdc dynamic library
obcdc is written in C++ and compiled into a dynamic library. To develop a downstream consumption program, your program must link against this dynamic library and include its header files (libobcdc.h and ob_errno.h).
Introduction
The Community Edition of obcdc depends on the oceanbase-ce-libs package. You can download and install this package from Software Center.
rpm -ivh oceanbase-ce-libs-****.rpm
After the installation, run the ldd ./libobcdc.so command to check whether any dependent dynamic library is missing locally. If a library is missing, make sure that all dependent libraries are available locally, and configure LD_LIBRARY_PATH for the program that uses libobcdc.so so that obcdc can be linked and loaded correctly. For example, oceanbase-ce-libs provides libmariadb.so.3.
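As a complement to ldd, you can check at run time whether the library and all of its dependencies can actually be loaded. The following sketch uses the POSIX dlopen function with RTLD_NOW, which resolves all symbols immediately and reports a failure through dlerror. The helper name try_load_library is hypothetical, and the sketch assumes a Linux system (on glibc versions earlier than 2.34, link with -ldl).

```cpp
#include <cassert>
#include <dlfcn.h>
#include <string>

// Try to load a shared library and resolve all of its symbols immediately.
// Returns true on success. On failure, stores the dlerror() message, which
// describes what could not be loaded or resolved.
bool try_load_library(const std::string &path, std::string &error)
{
  assert(!path.empty());
  dlerror();  // clear any stale error state
  void *handle = dlopen(path.c_str(), RTLD_NOW | RTLD_LOCAL);
  if (handle == nullptr) {
    const char *msg = dlerror();
    error = (msg != nullptr) ? msg : "unknown dlopen error";
    return false;
  }
  dlclose(handle);
  error.clear();
  return true;
}
```

For example, call try_load_library("libobcdc.so", error) from a process started with the same LD_LIBRARY_PATH as your consumer. A bare file name is searched for in the directories listed in LD_LIBRARY_PATH and the other standard locations, whereas a name containing a slash is opened as a path.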
Header files
For detailed descriptions of the interfaces, see libobcdc.h.
Create an obcdc instance
You can call the ObCDCFactory::construct_obcdc() method to create an obcdc instance.
Use obcdc
Initialization interface: You must call the init or init_with_start_tstamp_usec interface to provide obcdc with the configuration information and the start timestamp.
- Configuration information: You can specify either the path of a configuration file or a map of configuration items. init accepts both forms, and init_with_start_tstamp_usec accepts only a map.
- Start timestamp: The start timestamp specifies the time from which obcdc starts pulling logs. init takes it in seconds, and init_with_start_tstamp_usec takes it in microseconds.
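Because init takes the start timestamp in seconds and init_with_start_tstamp_usec takes it in microseconds, it helps to keep the unit conversion in one place. The following sketch builds a configuration map for the map-based overloads and computes start timestamps in microseconds. The helper names are hypothetical, and the configuration keys (cluster_url, cluster_user, cluster_password, tb_white_list) are assumptions for illustration; take the actual keys from your libobcdc.conf.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <map>
#include <string>

// init() takes the start timestamp in seconds; init_with_start_tstamp_usec()
// takes it in microseconds. This helper converts seconds to microseconds.
uint64_t seconds_to_usec(uint64_t sec)
{
  assert(sec <= UINT64_MAX / 1000000ULL);  // guard against overflow
  return sec * 1000000ULL;
}

// Current wall-clock time in microseconds since the Unix epoch, usable as a
// start timestamp of "now".
uint64_t now_usec()
{
  using namespace std::chrono;
  return static_cast<uint64_t>(
      duration_cast<microseconds>(system_clock::now().time_since_epoch()).count());
}

// Build a configuration map for the map-based init overloads.
// ASSUMPTION: these key names are illustrative; use the keys from your
// libobcdc.conf.
std::map<std::string, std::string> make_obcdc_config(const std::string &cluster_url,
                                                     const std::string &cluster_user,
                                                     const std::string &cluster_password,
                                                     const std::string &tb_white_list)
{
  std::map<std::string, std::string> configs;
  configs["cluster_url"] = cluster_url;
  configs["cluster_user"] = cluster_user;
  configs["cluster_password"] = cluster_password;
  configs["tb_white_list"] = tb_white_list;
  return configs;
}
```

With a real instance, a call might then look like instance->init_with_start_tstamp_usec(make_obcdc_config(...), now_usec()), which asks obcdc to start pulling logs from the current time.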
Startup interface: You can call the launch interface to instruct obcdc to start working.
Interface for obtaining LogRecord: You can call the next_record interface repeatedly to obtain incremental data of OceanBase Database from obcdc. This interface lets you specify a timeout (timeout_us, in microseconds), and one overload also returns the major version and tenant ID of each record. The data is encapsulated in the LogRecord format, and the LogRecord memory is allocated by obcdc.
Interface for returning LogRecord: You can call the release_record interface to return each consumed LogRecord to obcdc. obcdc recycles the memory asynchronously by using a background GC thread.
Interface for obtaining tenant IDs: You can call the get_tenant_ids interface to obtain the IDs of all tenants served by the current obcdc.
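The following sketch uses the four-argument next_record overload in a consumption loop that counts records per tenant and releases each record right after use. To stay self-contained, it uses hypothetical stand-ins (demo::StubInstance, demo::ICDCRecord, and a placeholder OB_TIMEOUT value) instead of libobcdc.h and ob_errno.h. The function is a template so that, after the stand-ins are replaced with the real headers, it can be instantiated with IObCDCInstance, whose overload has the same parameter list.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

namespace demo {

constexpr int OB_SUCCESS = 0;   // same value as in ob_errno.h
constexpr int OB_TIMEOUT = -1;  // PLACEHOLDER; use the value from ob_errno.h

struct ICDCRecord {};  // stand-in for the record type from libobcdc.h

// Hypothetical stand-in for IObCDCInstance: returns one record per entry in
// `tenants` and tracks how many records the caller still holds.
struct StubInstance {
  std::vector<uint64_t> tenants;
  std::size_t pos = 0;
  int outstanding = 0;
  ICDCRecord record;

  int next_record(ICDCRecord **out, int32_t &major_version, uint64_t &tenant_id,
                  const int64_t /*timeout_us*/) {
    if (pos >= tenants.size()) return OB_TIMEOUT;
    *out = &record;
    major_version = 0;  // not meaningful in the stub
    tenant_id = tenants[pos++];
    ++outstanding;
    return OB_SUCCESS;
  }
  void release_record(ICDCRecord *) {
    assert(outstanding > 0);  // catches double release
    --outstanding;
  }
};

// Pull records until next_record times out, count them per tenant, and
// release each record right after it has been counted.
template <typename Instance>
int count_records_per_tenant(Instance &instance, int64_t timeout_us,
                             std::map<uint64_t, std::size_t> &counts)
{
  int ret = OB_SUCCESS;
  while (OB_SUCCESS == ret) {
    ICDCRecord *record = nullptr;
    int32_t major_version = 0;
    uint64_t tenant_id = 0;
    ret = instance.next_record(&record, major_version, tenant_id, timeout_us);
    if (OB_SUCCESS == ret) {
      ++counts[tenant_id];
      instance.release_record(record);
    }
  }
  // A timeout only means that no more data is available right now.
  return OB_TIMEOUT == ret ? OB_SUCCESS : ret;
}

}  // namespace demo
```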
Destroy an obcdc instance
You must stop an obcdc instance before destroying it.
Stop obcdc:
- Call the stop interface to instruct obcdc to stop all of its modules.
- Call the destroy interface to destruct the modules of obcdc and release their resources.

Destroy the obcdc instance: Call the ObCDCFactory::deconstruct(IObCDCInstance *instance) method to destroy the obcdc instance. After this call, the instance pointer returned by ObCDCFactory::construct_obcdc() is no longer valid and must not be accessed.
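To make sure that the teardown steps always run in the documented order, you can wrap the instance in a scope guard. The following sketch is one way to do this and is not part of the obcdc API: the hypothetical demo::ScopedCdcInstance calls stop, destroy, and deconstruct when it goes out of scope, and skips teardown if construct_obcdc returned NULL. demo::Factory and demo::Instance are stand-ins that only record the call order; with the real library you would instantiate the template as ScopedCdcInstance<ObCDCFactory, IObCDCInstance>.

```cpp
#include <cassert>
#include <string>
#include <vector>

namespace demo {

// Hypothetical stand-ins for IObCDCInstance and ObCDCFactory, reduced to the
// calls involved in creation and teardown. They record the call order.
struct Instance {
  std::vector<std::string> *log;
  void stop() { log->push_back("stop"); }
  void destroy() { log->push_back("destroy"); }
};

struct Factory {
  std::vector<std::string> log;
  Instance instance{&log};
  Instance *construct_obcdc() { log.push_back("construct"); return &instance; }
  void deconstruct(Instance *p) { assert(p == &instance); log.push_back("deconstruct"); }
};

// Owns an instance for one scope and tears it down in the documented order:
// stop(), then destroy(), then deconstruct() on the factory.
template <typename FactoryT, typename InstanceT>
class ScopedCdcInstance {
public:
  explicit ScopedCdcInstance(FactoryT &factory)
      : factory_(factory), instance_(factory.construct_obcdc()) {}
  ~ScopedCdcInstance() {
    if (instance_ != nullptr) {
      instance_->stop();
      instance_->destroy();
      factory_.deconstruct(instance_);
      instance_ = nullptr;  // the pointer must not be used after deconstruct
    }
  }
  ScopedCdcInstance(const ScopedCdcInstance &) = delete;
  ScopedCdcInstance &operator=(const ScopedCdcInstance &) = delete;
  InstanceT *get() const { return instance_; }  // NULL if creation failed

private:
  FactoryT &factory_;
  InstanceT *instance_;
};

}  // namespace demo
```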
Considerations
All data retrieved from obcdc is stored in memory allocated by obcdc. Therefore, you must pair every next_record call with a release_record call to avoid memory leaks. The calls do not need to alternate: you can call next_record several times and then call release_record for each of the retrieved LogRecords.
You cannot call the launch or init interface after you call the stop interface.
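The following sketch shows how batched consumption can still pair every successful next_record call with exactly one release_record call, including when processing the batch fails. demo::StubInstance, demo::ICDCRecord, and the placeholder OB_TIMEOUT value are hypothetical stand-ins for the definitions in libobcdc.h and ob_errno.h, and consume_batch is an illustrative helper name.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

namespace demo {

constexpr int OB_SUCCESS = 0;   // same value as in ob_errno.h
constexpr int OB_TIMEOUT = -1;  // PLACEHOLDER; use the value from ob_errno.h

struct ICDCRecord {};

// Hypothetical stand-in for IObCDCInstance that hands out `available`
// records and tracks how many are still held by the caller.
struct StubInstance {
  int available = 0;
  int outstanding = 0;
  ICDCRecord record;

  int next_record(ICDCRecord **out, const int64_t /*timeout_us*/) {
    if (available == 0) return OB_TIMEOUT;
    --available;
    ++outstanding;
    *out = &record;
    return OB_SUCCESS;
  }
  void release_record(ICDCRecord *) {
    assert(outstanding > 0);  // catches double release
    --outstanding;
  }
};

// Fetch up to batch_size records, process them together, and then return
// every one of them with release_record, whether or not processing succeeded.
template <typename Instance, typename Handler>
int consume_batch(Instance &instance, std::size_t batch_size, int64_t timeout_us,
                  Handler handle_batch)
{
  std::vector<ICDCRecord *> batch;
  int ret = OB_SUCCESS;
  while (batch.size() < batch_size) {
    ICDCRecord *record = nullptr;
    ret = instance.next_record(&record, timeout_us);
    if (OB_SUCCESS != ret) break;
    batch.push_back(record);
  }
  int handle_ret = batch.empty() ? OB_SUCCESS : handle_batch(batch);
  for (ICDCRecord *record : batch) {
    instance.release_record(record);
  }
  if (OB_TIMEOUT == ret) ret = OB_SUCCESS;  // a timeout only ends the batch early
  return OB_SUCCESS != handle_ret ? handle_ret : ret;
}

}  // namespace demo
```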
You must handle the error codes returned by obcdc functions as follows:
- OB_SUCCESS: The data is successfully retrieved, and the record pointer is not NULL.
- OB_TIMEOUT: No data was pulled from obcdc within the timeout period. You can retry the pull operation. If the data timestamp is real-time, this error code indicates that there is an issue in obcdc that you need to investigate.
- OB_IN_STOP_STATE: obcdc is stopped, either because you called its stop or destroy interface or because obcdc encountered an exception that it cannot handle, in which case all obcdc modules are marked as stopped. When you receive this error code, log the necessary information, such as the safe point, and then exit the process.
- Other error codes are unexpected. When you receive one, log the necessary information, such as the safe point, and then exit the process.
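The rules above can be condensed into a small decision function that a consumption loop calls after each next_record. In this sketch, only the value of OB_SUCCESS (0) matches ob_errno.h; the values of OB_TIMEOUT and OB_IN_STOP_STATE are placeholders, so include ob_errno.h and use its constants in your program. The names NextAction and classify_next_record_ret are hypothetical.

```cpp
#include <cassert>

namespace demo {

// OB_SUCCESS is 0 in ob_errno.h. The other two values are PLACEHOLDERS;
// include ob_errno.h and use its constants in real code.
constexpr int OB_SUCCESS = 0;
constexpr int OB_TIMEOUT = -1;
constexpr int OB_IN_STOP_STATE = -2;

// What the consumer should do after a call to next_record.
enum class NextAction {
  kConsume,     // record returned: process it, then call release_record
  kRetry,       // no data within the timeout: call next_record again
  kStopped,     // obcdc is stopped: log the safe point and exit
  kUnexpected,  // any other outcome: log the safe point and exit
};

NextAction classify_next_record_ret(int ret, bool record_is_null)
{
  if (OB_SUCCESS == ret) {
    // On success the record pointer must not be NULL.
    return record_is_null ? NextAction::kUnexpected : NextAction::kConsume;
  }
  if (OB_TIMEOUT == ret) {
    return NextAction::kRetry;
  }
  if (OB_IN_STOP_STATE == ret) {
    return NextAction::kStopped;
  }
  return NextAction::kUnexpected;
}

}  // namespace demo
```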
Examples
The following code shows how to use obcdc. For more information, see the source code of obcdc_tailf.
Notice
This sample code can only provide development guidance. It cannot be compiled or run.
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*
* obcdc_demo
*/
#include <iostream>
#include "include/libobcdc/libobcdc.h"
#include "include/libobcdc/ob_errno.h"

using namespace std;
// Use the namespace declared in your version of libobcdc.h.
using namespace oceanbase::libobcdc;
using namespace oceanbase::common;

// next_record() and release_record() operate on ICDCRecord.
typedef ICDCRecord Record;

#define LOG(msg) \
  do { \
    std::cout << msg << std::endl; \
  } while (0)

int create_obcdc_instance(ObCDCFactory &cdc_factory, IObCDCInstance *&obcdc_instance)
{
  int ret = OB_SUCCESS;
  if (NULL == (obcdc_instance = cdc_factory.construct_obcdc())) {
    ret = OB_NOT_INIT;
    LOG("[ERROR] construct_obcdc failed");
  }
  return ret;
}

// Stop obcdc, destruct its modules, and then destroy the instance.
// The instance pointer must not be used after this call.
void destroy_obcdc_instance(ObCDCFactory &cdc_factory, IObCDCInstance *obcdc_instance)
{
  obcdc_instance->stop();
  obcdc_instance->destroy();
  cdc_factory.deconstruct(obcdc_instance);
}

int init_obcdc_instance(IObCDCInstance &obcdc_instance)
{
  int ret = OB_SUCCESS;
  const char *config_path = "conf/libobcdc.conf";
  const uint64_t start_timestamp_sec = 0;  // start timestamp, in seconds
  if (OB_SUCCESS != (ret = obcdc_instance.init(config_path, start_timestamp_sec))) {
    LOG("[ERROR] obcdc_instance init failed");
  } else if (OB_SUCCESS != (ret = obcdc_instance.launch())) {
    LOG("[ERROR] obcdc_instance launch failed");
  }
  return ret;
}

// record is passed by reference so that the caller receives the pointer
// returned by next_record().
int fetch_next_cdc_record(IObCDCInstance &obcdc_instance, Record *&record)
{
  int ret = OB_SUCCESS;
  const int64_t timeout = 10000; // usec
  if (OB_SUCCESS != (ret = obcdc_instance.next_record(&record, timeout))) {
    if (OB_TIMEOUT != ret) {
      LOG("[WARN] next_record failed");
    }
  } else if (NULL == record) {
    ret = OB_ERR_UNEXPECTED;
    LOG("[ERROR] invalid record");
  }
  return ret;
}

int release_cdc_record(IObCDCInstance &obcdc_instance, Record *record)
{
  int ret = OB_SUCCESS;
  obcdc_instance.release_record(record);
  return ret;
}

int handle_cdc_record(Record *record)
{
  int ret = OB_SUCCESS;
  // Consume the record here, for example by parsing it and writing it downstream.
  (void)record;
  return ret;
}

int main(int argc, char **argv)
{
  int ret = OB_SUCCESS;
  ObCDCFactory cdc_factory;
  IObCDCInstance *obcdc_instance = NULL;
  if (OB_SUCCESS != (ret = create_obcdc_instance(cdc_factory, obcdc_instance))) {
    LOG("[ERROR] create_obcdc_instance failed");
  } else if (NULL == obcdc_instance) {
    ret = OB_ERR_UNEXPECTED;
    LOG("[ERROR] obcdc_instance should not be null!");
  } else {
    if (OB_SUCCESS != (ret = init_obcdc_instance(*obcdc_instance))) {
      LOG("[ERROR] obcdc_instance init failed");
    } else {
      while (OB_SUCCESS == ret) {
        Record *record = NULL;
        if (OB_SUCCESS != (ret = fetch_next_cdc_record(*obcdc_instance, record))) {
          if (OB_TIMEOUT == ret) {
            // No data within the timeout: try again.
            ret = OB_SUCCESS;
          } else {
            // OB_IN_STOP_STATE or an unexpected error. A real consumer should
            // log necessary information such as the safe point before exiting.
            LOG("[ERROR] fetch_next_cdc_record failed, ret=" << ret);
          }
        } else {
          if (OB_SUCCESS != (ret = handle_cdc_record(record))) {
            LOG("[ERROR] handle_cdc_record failed");
          }
          // Always return the record to obcdc, even if handling failed.
          int release_ret = release_cdc_record(*obcdc_instance, record);
          if (OB_SUCCESS == ret && OB_SUCCESS != release_ret) {
            ret = release_ret;
            LOG("[ERROR] release_cdc_record failed");
          }
        }
      }
    }
    destroy_obcdc_instance(cdc_factory, obcdc_instance);
  }
  return OB_SUCCESS == ret ? 0 : 1;
}
Appendix
This chapter shows the code of some obcdc header files.
ObCDCFactory
ObCDCFactory is an obcdc instance factory that is responsible for creating or destroying obcdc instances.
Notice
Only one obcdc instance can be created in a process.
// libobcdc.h: ObCDCFactory
namespace oceanbase{
namespace liboblog{
class ObCDCFactory
{
public:
ObCDCFactory();
~ObCDCFactory();
public:
IObCDCInstance *construct_obcdc();
void deconstruct(IObCDCInstance *log);
};
}
}
IObCDCInstance
IObCDCInstance is the interface provided by obcdc for external applications. It supports operations such as initialization, startup, shutdown, destruction, data fetching, and data release. The following code shows some commonly used interface definitions.
// libobcdc.h: IObCDCInstance
namespace oceanbase{
namespace liboblog{
// IObCDCInstance is the interface provided by liboblog for external applications.
// Note: The parameters to be passed in for the following interfaces are ignored.
class IObCDCInstance
{
public:
virtual ~IObCDCInstance() {};
public:
/*
* Initialize libobcdc.
* @param config_file Name of the configuration file.
* @param start_timestamp Start timestamp (in seconds).
* @param err_cb Pointer to the error callback function.
*/
virtual int init(const char *config_file,
const uint64_t start_timestamp_sec,
ERROR_CALLBACK err_cb = NULL) = 0;
/*
* Initialize libobcdc.
* @param configs Configuration information specified by using a map.
* @param start_timestamp Start timestamp (in seconds).
* @param err_cb Pointer to the error callback function.
*/
virtual int init(const std::map<std::string, std::string> &configs,
const uint64_t start_timestamp_sec,
ERROR_CALLBACK err_cb = NULL) = 0;
/*
* Initialize libobcdc.
* @param configs Configuration information specified by using a map.
* @param start_timestamp Start timestamp (in microseconds).
* @param err_cb Pointer to the error callback function.
*/
virtual int init_with_start_tstamp_usec(const std::map<std::string, std::string> &configs,
const uint64_t start_timestamp_usec,
ERROR_CALLBACK err_cb = NULL) = 0;
virtual void destroy() = 0;
/*
* Fetch the next binlog record from the OceanBase cluster.
* @param record Binlog record. The memory is allocated by oblog. You can call release_record multiple times after calling next_record multiple times.
* @param OB_SUCCESS Success.
* @param OB_TIMEOUT Timeout.
* @param other errorcode Failure.
*/
virtual int next_record(ICDCRecord **record, const int64_t timeout_us) = 0;
/*
* Fetch the next binlog record from the OceanBase cluster.
* @param [out] record Binlog record. The memory is allocated by oblog. You can call release_record multiple times after calling next_record multiple times.
* @param [out] major_version Major version of ICDCRecord.
* @param [out] tenant_id Tenant ID of ICDCRecord.
*
* @param OB_SUCCESS Success.
* @param OB_TIMEOUT Timeout.
* @param other error code Failure.
*/
virtual int next_record(ICDCRecord **record,
int32_t &major_version,
uint64_t &tenant_id,
const int64_t timeout_us) = 0;
/*
* Release each ICDCRecord.
* @param record
*/
virtual void release_record(ICDCRecord *record) = 0;
/*
* Launch libobcdc.
* @retval OB_SUCCESS on success
* @retval ! OB_SUCCESS on fail
*/
virtual int launch() = 0;
/*
* Stop libobcdc.
*/
virtual void stop() = 0;
/// Get the list of all tenant IDs served by oblog after oblog is initialized.
///
/// @param [out] tenant_ids Tenant IDs served by oblog.
///
/// @retval OB_SUCCESS Success.
/// @retval other value Failure.
virtual int get_tenant_ids(std::vector<uint64_t> &tenant_ids) = 0;
};
}
}