This topic describes how to connect obcdc to your data consumption link.
Use obcdc to develop your OceanBase data consumption tool
obcdc is written in C++. After you compile obcdc, you get some dynamic libraries. The development of downstream consumer applications depends on the dynamic libraries and header files such as libobcdc.h and ob_errno.h.
Check for missing dynamic libraries
obcdc depends on the oceanbase-ce-libs package, which can be downloaded from OceanBase Download Center for installation.
rpm -ivh oceanbase-ce-libs-****.rpm
After the installation is completed, run the ldd ./libobcdc.so command to check for missing dynamic libraries on the local server. Make sure that all dynamic libraries, such as libmariadb.so.3 in oceanbase-ce-libs, are on the local server, and configure LD_LIBRARY_PATH for programs that use libobcdc to ensure that obcdc can be linked.
Header files
libobcdc.h provides a detailed description of its API operations for your reference. Some frequently used API operations are described in the Appendix section.
Build an obcdc instance
You can use the ObCDCFactory::construct_obcdc() method to build obcdc.
Use obcdc
Initialize obcdc: Call the
init/init_with_start_tstamp_usecoperation to introduce the obcdc configuration and startup timestamp.- Configure information: You can pass in the path or map of the configuration file.
- Specify the startup time: You can specify the time to start log pulling, in seconds or milliseconds.
Start obcdc: Call the
launchoperation to start running obcdc.Retrieve LogRecords: You can call the
next_recordoperation to continuously retrieve incremental data of OceanBase Database from obcdc. This operation allows you to specify the timeout period and the tenant from which you want to retrieve the data. Data is encapsulated in the LogRecord format. The LogRecord memory is allocated by obcdc.Return LogRecords: You can call the
release_recordoperation to return the consumed LogRecords to obcdc. obcdc has a background garbage collection (GC) thread to reclaim the memory in asynchronous mode.Retrieve the IDs of all tenants that use the current obcdc service: Call the
get_tenant_idsoperation to retrieve the list of all tenants that use the current obcdc service.
Destroy an obcdc instance
To destroy an obcdc instance, you must first stop the obcdc instance.
Stop obcdc:
- You can call the
stopoperation to stop each module of the obcdc instance. - Then, you can call the
destroyoperation to destruct each module of obcdc and release the related resources.
- You can call the
Destroy the obcdc instance: Call the
ObCDCFactory::deconstruct(IObCDCInstance *instance)operation to destroy the obcdc instance. After that, you cannot access the pointer to the obcdc instance you created in the first step.
Considerations
All data retrieved from obcdc is stored in the memory allocated by the obcdc process. Therefore, make sure that you call the
next_recordandrelease_recordoperations in pairs. Otherwise, the memory cannot be released. You can call thenext_recordoperation multiple times and then call therelease_recordoperation to release the memory occupied by the corresponding LogRecords.After you call the
stoporlaunchoperation, you cannot call thelaunchorinitoperations.If error codes are returned by the obcdc interfaces, handle them as appropriate:
- If
OB_SUCCESSis returned, the data is retrieved successfully and the returned data pointer is not NULL. - If
OB_TIMEOUTis returned, no data is pulled from obcdc. You can retry retrieving the data after you receive this error code. Generally, the data timestamps are in real time. If the data timestamps are not in real time, obcdc may have encountered an internal issue. In this case, perform troubleshooting. - If
OB_IN_STOP_STATEis returned, it indicates that the obcdc instance is stopped. This may be caused because the caller has called thestopordestroyoperation in obcdc, or an internal exception has occurred and all modules have stopped working. When you receive this error code, you can record the necessary information, such as the security timestamp, and then exit the process. - Error codes of other types are unexpected. You can record the necessary information, such as the security timestamp, and then exit the process.
- If
Example
The following demo shows how to use obcdc. For more information, see obcdc_tailf and obcdc_tailf source code.
Notice
This demo is for development reference only. You cannot directly compile or run it.
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*
* obcdc_demo
*/
#include <iostream>
#include "include/libobcdc/libobcdc.h"
#include "include/libobcdc/ob_errno.h"
using namespace std;
using namespace oceanbase::libobcdc;
using namespace oceanbase::common;
typedef IBinlogRecord Record;
#define LOG(msg) \
do { \
std::cout << msg << std::endl; \
} while (0)
int create_obcdc_instance(ObCDCFactory &cdc_factory, IObCDCInstance *&obcdc_instance)
{
int ret = OB_SUCCESS;
if (NULL == (obcdc_instance = cdc_factory.construct_obcdc())) {
ret = OB_NOT_INIT;
LOG("[ERROR] construct_obcdc failed");
}
return ret;
}
void destroy_obcdc_instance(ObCDCFactory &cdc_factory, IObCDCInstance *obcdc_instance)
{
obcdc_instance->stop();
cdc_factory.deconstruct(obcdc_instance);
}
int init_obcdc_instance(IObCDCInstance &obcdc_instance)
{
int ret = OB_SUCCESS;
const char *config_path = "conf/libobcdc.conf";
if (OB_SUCCESS != (ret = obcdc_instance.init(config_path, 0))) {
LOG("obcdc_instance init failed");
} else if (OB_SUCCESS != (ret = obcdc_instance.launch())) {
LOG("obcdc_instance launch failed");
}
return ret;
}
int fetch_next_cdc_record(IObCDCInstance &obcdc_instance, Record *record)
{
int ret = OB_SUCCESS;
const int64_t timeout = 10000; // usec
if (OB_SUCCESS != (ret = obcdc_instance.next_record(&record, timeout))) {
if (OB_TIMEOUT != ret) {
LOG("[WARN] next_record failed");
}
} else if (NULL == record) {
ret = OB_ERR_UNEXPECTED;
LOG("invalid record");
}
return ret;
}
int release_cdc_record(IObCDCInstance &obcdc_instance, Record *record)
{
int ret = OB_SUCCESS;
obcdc_instance.release_record(record);
return ret;
}
int handle_cdc_record(Record *record)
{
int ret = OB_SUCCESS;
return ret;
}
int main(int argc, char **argv)
{
int ret = OB_SUCCESS;
ObCDCFactory cdc_factory;
IObCDCInstance *obcdc_instance = NULL;
if (OB_SUCCESS != create_obcdc_instance(cdc_factory, obcdc_instance)) {
LOG("[ERROR] construct_obcdc_instance failed");
} else if (NULL == obcdc_instance) {
ret = OB_ERR_UNEXPECTED;
LOG("[ERROR] obcdc_instance should not be null!");
} else {
if (OB_SUCCESS != init_obcdc_instance(*obcdc_instance)) {
LOG("[ERROR] obcdc_instance init failed");
} else {
while(OB_SUCCESS == ret) {
Record *record = NULL;
if (OB_SUCCESS != (ret = fetch_next_cdc_record(*obcdc_instance, record))) {
if (OB_TIMEOUT == ret) {
ret = OB_SUCCESS;
} else {
LOG("[ERROR] fetch_next_cdc_record failed");
}
} else if (OB_SUCCESS != (ret = handle_cdc_record(record))) {
LOG("[ERROR] handle_cdc_record failed");
} else if (OB_SUCCESS != (ret = release_cdc_record(*obcdc_instance, record))) {
LOG("[ERROR] release_cdc_record failed");
}
}
}
destroy_obcdc_instance(cdc_factory, obcdc_instance);
}
return 0;
}
Appendix
This section shows the code in some obcdc header files.
ObCDCFactory
ObCDCFactory is the instance factory of obcdc. It is used in creating and destroying obcdc instances.
Notice
Each process can create only one obcdc instance.
// libobcdc.h: ObCDCFactory
namespace oceanbase{
namespace liboblog{
class ObCDCFactory
{
public:
ObCDCFactory();
~ObCDCFactory();
public:
IObCDCInstance *construct_obcdc();
void deconstruct(IObCDCInstance *log);
};
}
}
IObCDCInstance
IObCDCInstance is an API provided by obcdc for external access. It provides the capabilities of initializing, starting, stopping, and destroying obcdc instances and retrieving and returning data. The following example describes some frequently used API definitions.
// libobcdc.h: IObCDCInstance
namespace oceanbase{
namespace liboblog{
// IObCDCInstance is an API provided by libobcdc for external access.
// Note that this example ignores the input parameters that you need to pass to the API.
class IObCDCInstance
{
public:
virtual ~IObCDCInstance() {};
public:
/*
* init libobcdc
* @param config_file config file name
* @param start_timestamp start timestamp (by second)
* @param err_cb error callback function pointer
*/
virtual int init(const char *config_file,
const uint64_t start_timestamp_sec,
ERROR_CALLBACK err_cb = NULL) = 0;
/*
* init libobcdc
* @param configs config by map
* @param start_timestamp start timestamp (by second)
* @param err_cb error callback function pointer
*/
virtual int init(const std::map<std::string, std::string> &configs,
const uint64_t start_timestamp_sec,
ERROR_CALLBACK err_cb = NULL) = 0;
/*
* init libobcdc
* @param configs config by map
* @param start_timestamp start timestamp by microsecond
* @param err_cb error callback function pointer
*/
virtual int init_with_start_tstamp_usec(const std::map<std::string, std::string> &configs,
const uint64_t start_timestamp_usec,
ERROR_CALLBACK err_cb = NULL) = 0;
virtual void destroy() = 0;
/*
* fetch next binlog record from OB cluster
* @param record binlog record, memory allocated by oblog, support release_record(corresponding times) after mutli next_record
* @param OB_SUCCESS success
* @param OB_TIMEOUT timeout
* @param other errorcode fail
*/
virtual int next_record(ICDCRecord **record, const int64_t timeout_us) = 0;
/*
* fetch next binlog record from OB cluster
* @param [out] record binlog record, memory allocated by oblog, support release_record(corresponding tiems) after mutli next_record
* @param [out] major_version major version of ICDCRecord
* @param [out] tenant_id tenant id of ICDCRecord
*
* @param OB_SUCCESS success
* @param OB_TIMEOUT timeout
* @param other error code fail
*/
virtual int next_record(ICDCRecord **record,
int32_t &major_version,
uint64_t &tenant_id,
const int64_t timeout_us) = 0;
/*
* release recorcd for EACH ICDCRecord
* @param record
*/
virtual void release_record(ICDCRecord *record) = 0;
/*
* Launch libobcdc
* @retval OB_SUCCESS on success
* @retval ! OB_SUCCESS on fail
*/
virtual int launch() = 0;
/*
* Stop libobcdc
*/
virtual void stop() = 0;
/// get all serving tenant id list after oblog inited
///
/// @param [out] tenant_ids tenant ids that oblog serving
///
/// @retval OB_SUCCESS success
/// @retval other value fail
virtual int get_tenant_ids(std::vector<uint64_t> &tenant_ids) = 0;
};
}
}