OceanBase logo

OceanBase

A unified distributed database ready for your transactional, analytical, and AI workloads.

DEPLOY YOUR WAY

OceanBase Cloud

The best way to deploy and scale OceanBase

OceanBase Enterprise

Run and manage OceanBase on your infra

TRY OPEN SOURCE

OceanBase Community Edition

The free, open-source distributed database

OceanBase seekdb

Open source AI native search database

Customer Stories

Real-world success stories from enterprises across diverse industries.

View All
BY USE CASES

Mission-Critical Transactions

Global & Multicloud Application

Elastic Scaling for Peak Traffic

Real-time Analytics

Active Geo-redundancy

Database Consolidation

Resources

Comprehensive knowledge hub for OceanBase.

Blog

Live Demos

Training & Certification

Documentation

Official technical guides, tutorials, API references, and manuals for all OceanBase products.

View All
PRODUCTS

OceanBase Cloud

OceanBase Database

Tools

Connectors and Middleware

QUICK START

OceanBase Cloud

OceanBase Database

BEST PRACTICES

Practical guides for utilizing OceanBase more effectively and conveniently

Company

Learn more about OceanBase – our company, partnerships, and trust and security initiatives.

About OceanBase

Partner

Trust Center

Contact Us

International - English
中国站 - 简体中文
日本 - 日本語
Sign In
Start on Cloud

A unified distributed database ready for your transactional, analytical, and AI workloads.

DEPLOY YOUR WAY

OceanBase Cloud

The best way to deploy and scale OceanBase

OceanBase Enterprise

Run and manage OceanBase on your infra

TRY OPEN SOURCE

OceanBase Community Edition

The free, open-source distributed database

OceanBase seekdb

Open source AI native search database

Customer Stories

Real-world success stories from enterprises across diverse industries.

View All
BY USE CASES

Mission-Critical Transactions

Global & Multicloud Application

Elastic Scaling for Peak Traffic

Real-time Analytics

Active Geo-redundancy

Database Consolidation

Comprehensive knowledge hub for OceanBase.

Blog

Live Demos

Training & Certification

Documentation

Official technical guides, tutorials, API references, and manuals for all OceanBase products.

View All
PRODUCTS
OceanBase CloudOceanBase Database
ToolsConnectors and Middleware
QUICK START
OceanBase CloudOceanBase Database
BEST PRACTICES

Practical guides for utilizing OceanBase more effectively and conveniently

Learn more about OceanBase – our company, partnerships, and trust and security initiatives.

About OceanBase

Partner

Trust Center

Contact Us

Start on Cloud
编组
All Products
    • Databases
    • iconOceanBase Database
    • iconOceanBase Cloud
    • iconOceanBase Tugraph
    • iconInteractive Tutorials
    • iconOceanBase Best Practices
    • Tools
    • iconOceanBase Cloud Platform
    • iconOceanBase Migration Service
    • iconOceanBase Developer Center
    • iconOceanBase Migration Assessment
    • iconOceanBase Admin Tool
    • iconOceanBase Loader and Dumper
    • iconOceanBase Deployer
    • iconKubernetes operator for OceanBase
    • iconOceanBase Diagnostic Tool
    • iconOceanBase Binlog Service
    • Connectors and Middleware
    • iconOceanBase Database Proxy
    • iconEmbedded SQL in C for OceanBase
    • iconOceanBase Call Interface
    • iconOceanBase Connector/C
    • iconOceanBase Connector/J
    • iconOceanBase Connector/ODBC
    • iconOceanBase Connector/NET
icon

OceanBase Database

SQL - V4.2.5

    Download PDF

    OceanBase logo

    The Unified Distributed Database for the AI Era.

    Follow Us
    Products
    OceanBase CloudOceanBase EnterpriseOceanBase Community EditionOceanBase seekdb
    Resources
    DocsBlogLive DemosTraining & Certification
    Company
    About OceanBaseTrust CenterLegalPartnerContact Us
    Follow Us

    © OceanBase 2026. All rights reserved

    Cloud Service AgreementPrivacy PolicySecurity
    Contact Us
    Document Feedback
    1. Documentation Center
    2. OceanBase Database
    3. SQL
    4. V4.2.5
    iconOceanBase Database
    SQL - V 4.2.5
    SQL
    KV
    • V 4.4.2
    • V 4.3.5
    • V 4.3.3
    • V 4.3.1
    • V 4.3.0
    • V 4.2.5
    • V 4.2.2
    • V 4.2.1
    • V 4.2.0
    • V 4.1.0
    • V 4.0.0
    • V 3.1.4 and earlier

    obcdc development instructions

    Last Updated:2025-01-02 01:58:41  Updated
    share
    What is on this page
    Use obcdc to develop your OceanBase data consumption tool
    Check for missing dynamic libraries
    Header files
    Build an obcdc instance
    Use obcdc
    Destroy an obcdc instance
    Considerations
    Examples
    Appendix
    ObCDCFactory
    IObCDCInstance

    folded

    share

    This topic describes how to connect obcdc to your data consumption link.

    Use obcdc to develop your OceanBase data consumption tool

    obcdc is written in C++. After you compile obcdc, you get some dynamic libraries. The development of downstream consumer applications depends on the dynamic libraries and header files such as libobcdc.h and ob_errno.h.

    Check for missing dynamic libraries

    obcdc depends on the oceanbase-ce-libs package, which can be downloaded from OceanBase Download Center for installation.

    rpm -ivh oceanbase-ce-libs-****.rpm
    

    After the installation is completed, run the ldd ./libobcdc.so command to check for missing dynamic libraries on the local server. Make sure that all dynamic libraries, such as libmariadb.so.3 in oceanbase-ce-libs, are on the local server, and configure LD_LIBRARY_PATH for programs that use libobcdc to ensure that obcdc can be linked.

    Header files

    libobcdc.h provides a detailed description of its APIs for your reference. Some frequently used APIs are described in the Appendix section.

    Build an obcdc instance

    You can use the ObCDCFactory::construct_obcdc() method to build obcdc.

    Use obcdc

    • Initialize obcdc: Call the init/init_with_start_tstamp_usec operation to introduce the obcdc configuration and startup timestamp.

      • Configure information: You can pass in the path or map of the configuration file.
      • Specify the startup time: You can specify the time to start log pulling, in seconds or milliseconds.
    • Start obcdc: Call the launch operation to start running obcdc.

    • Retrieve LogRecords: You can call the next_record operation to continuously retrieve incremental data of OceanBase Database from obcdc. This operation allows you to specify the timeout period and the tenant from which you want to retrieve the data. Data is encapsulated in the LogRecord format. The LogRecord memory is allocated by obcdc.

    • Return LogRecords: You can call the release_record operation to return the consumed LogRecords to obcdc. obcdc has a background garbage collection (GC) thread to reclaim the memory in asynchronous mode.

    • Retrieve the IDs of all tenants that use the current obcdc service: Call the get_tenant_ids operation to retrieve the list of all tenants that use the current obcdc service.

    Destroy an obcdc instance

    To destroy an obcdc instance, you must first stop the obcdc instance.

    • Stop obcdc:

      • You can call the stop operation to stop each module of the obcdc instance.
      • Then, you can call the destroy operation to destruct each module of obcdc and release the related resources.
    • Destroy the obcdc instance: Call the ObCDCFactory::deconstruct(IObCDCInstance *instance) operation to destroy the obcdc instance. After that, you cannot access the pointer to the obcdc instance you created in the first step.

    Considerations

    • All data retrieved from obcdc is stored in the memory allocated by the obcdc process. Therefore, make sure that you call the next_record and release_record operations in pairs. Otherwise, the memory cannot be released. You can call the next_record operation multiple times and then call the release_record operation to release the memory occupied by the corresponding LogRecords.

    • After you call the stop or launch operation, you cannot call the launch or init operations.

    • If error codes are returned by the obcdc interfaces, handle them as appropriate:

      • If OB_SUCCESS is returned, the data is retrieved successfully and the returned data pointer is not NULL.
      • If OB_TIMEOUT is returned, no data is pulled from obcdc. You can retry retrieving the data after you receive this error code. Generally, the data timestamps are in real time. If the data timestamps are not in real time, obcdc may have encountered an internal issue. In this case, perform troubleshooting.
      • If OB_IN_STOP_STATE is returned, it indicates that the obcdc instance is stopped. This may be caused because the caller has called the stop or destroy operation in obcdc, or an internal exception has occurred and all modules have stopped working. When you receive this error code, you can record the necessary information, such as the security timestamp, and then exit the process.
      • Error codes of other types are unexpected. You can record the necessary information, such as the security timestamp, and then exit the process.

    Examples

    The following demo shows how to use obcdc. For more information, see obcdc_tailf and obcdc_tailf source code.

    Notice

    This demo is for development reference only. You cannot directly compile or run it.

    /**
     * Copyright (c) 2021 OceanBase
     * OceanBase CE is licensed under Mulan PubL v2.
     * You can use this software according to the terms and conditions of the Mulan PubL v2.
     * You may obtain a copy of Mulan PubL v2 at:
     *          http://license.coscl.org.cn/MulanPubL-2.0
     * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
     * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
     * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
     * See the Mulan PubL v2 for more details.
     *
     * obcdc_demo
     */
    
    #include <iostream>
    #include "include/libobcdc/libobcdc.h"
    #include "include/libobcdc/ob_errno.h"
    
    using namespace std;
    using namespace oceanbase::libobcdc;
    using namespace oceanbase::common;
    
    typedef IBinlogRecord Record;
    
    #define LOG(msg) \
        do { \
          std::cout << msg << std::endl; \
        } while (0)
    
    int create_obcdc_instance(ObCDCFactory &cdc_factory, IObCDCInstance *&obcdc_instance)
    {
      int ret = OB_SUCCESS;
    
      if (NULL == (obcdc_instance = cdc_factory.construct_obcdc())) {
        ret = OB_NOT_INIT;
        LOG("[ERROR] construct_obcdc failed");
      }
    
      return ret;
    }
    
    void destroy_obcdc_instance(ObCDCFactory &cdc_factory, IObCDCInstance *obcdc_instance)
    {
      obcdc_instance->stop();
      cdc_factory.deconstruct(obcdc_instance);
    }
    
    int init_obcdc_instance(IObCDCInstance &obcdc_instance)
    {
      int ret = OB_SUCCESS;
      const char *config_path = "conf/libobcdc.conf";
    
      if (OB_SUCCESS != (ret = obcdc_instance.init(config_path, 0))) {
        LOG("obcdc_instance init failed");
      } else if (OB_SUCCESS != (ret = obcdc_instance.launch())) {
        LOG("obcdc_instance launch failed");
      }
    
      return ret;
    }
    
    int fetch_next_cdc_record(IObCDCInstance &obcdc_instance, Record *record)
    {
      int ret = OB_SUCCESS;
      const int64_t timeout = 10000; // usec
    
      if (OB_SUCCESS != (ret = obcdc_instance.next_record(&record, timeout))) {
        if (OB_TIMEOUT != ret) {
          LOG("[WARN] next_record failed");
        }
      } else if (NULL == record) {
        ret = OB_ERR_UNEXPECTED;
        LOG("invalid record");
      }
    
      return ret;
    }
    
    int release_cdc_record(IObCDCInstance &obcdc_instance, Record *record)
    {
      int ret = OB_SUCCESS;
    
      obcdc_instance.release_record(record);
    
      return ret;
    }
    int handle_cdc_record(Record *record)
    {
      int ret = OB_SUCCESS;
      return ret;
    }
    
    int main(int argc, char **argv)
    {
      int ret = OB_SUCCESS;
      ObCDCFactory cdc_factory;
      IObCDCInstance *obcdc_instance = NULL;
    
      if (OB_SUCCESS != create_obcdc_instance(cdc_factory, obcdc_instance)) {
        LOG("[ERROR] construct_obcdc_instance failed");
      } else if (NULL == obcdc_instance) {
        ret = OB_ERR_UNEXPECTED;
        LOG("[ERROR] obcdc_instance should not be null!");
      } else {
        if (OB_SUCCESS != init_obcdc_instance(*obcdc_instance)) {
          LOG("[ERROR] obcdc_instance init failed");
        } else {
          while(OB_SUCCESS == ret) {
            Record *record = NULL;
            if (OB_SUCCESS != (ret = fetch_next_cdc_record(*obcdc_instance, record))) {
              if (OB_TIMEOUT == ret) {
                ret = OB_SUCCESS;
              } else {
                LOG("[ERROR] fetch_next_cdc_record failed");
              }
            } else if (OB_SUCCESS != (ret = handle_cdc_record(record))) {
              LOG("[ERROR] handle_cdc_record failed");
            } else if (OB_SUCCESS != (ret = release_cdc_record(*obcdc_instance, record))) {
              LOG("[ERROR] release_cdc_record failed");
            }
          }
        }
    
        destroy_obcdc_instance(cdc_factory, obcdc_instance);
      }
    
      return 0;
    }
    

    Appendix

    This section shows the code in some obcdc header files.

    ObCDCFactory

    ObCDCFactory is the instance factory of obcdc. It is used in creating and destroying obcdc instances.

    Notice

    Each process can create only one obcdc instance.

    // libobcdc.h: ObCDCFactory
    namespace oceanbase{
    namespace liboblog{
    
    class ObCDCFactory
    {
    public:
      ObCDCFactory();
      ~ObCDCFactory();
    public:
      IObCDCInstance *construct_obcdc();
      void deconstruct(IObCDCInstance *log);
    };
    
    }
    }
    

    IObCDCInstance

    IObCDCInstance is an API provided by obcdc for external access. It provides the capabilities of initializing, starting, stopping, and destroying obcdc instances and retrieving and returning data. The following example describes some frequently used API definitions.

    // libobcdc.h: IObCDCInstance
    namespace oceanbase{
    namespace liboblog{
    
    // IObCDCInstance is an API provided by libobcdc for external access.
    // Note that this example ignores the input parameters that you need to pass to the API.
    
    class IObCDCInstance
    {
    public:
      virtual ~IObCDCInstance() {};
    public:
      /*
       * init libobcdc
       * @param config_file       config file name
       * @param start_timestamp   start timestamp (by second)
       * @param err_cb            error callback function pointer
       */
      virtual int init(const char *config_file,
          const uint64_t start_timestamp_sec,
          ERROR_CALLBACK err_cb = NULL) = 0;
    
      /*
       * init libobcdc
       * @param configs         config by map
       * @param start_timestamp start timestamp (by second)
       * @param err_cb          error callback function pointer
       */
      virtual int init(const std::map<std::string, std::string> &configs,
          const uint64_t start_timestamp_sec,
          ERROR_CALLBACK err_cb = NULL) = 0;
    
      /*
       * init libobcdc
       * @param configs         config by map
       * @param start_timestamp start timestamp by microsecond
       * @param err_cb          error callback function pointer
       */
      virtual int init_with_start_tstamp_usec(const std::map<std::string, std::string> &configs,
          const uint64_t start_timestamp_usec,
          ERROR_CALLBACK err_cb = NULL) = 0;
    
      virtual void destroy() = 0;
    
      /*
       * fetch next binlog record from OB cluster
       * @param record           binlog record, memory allocated by oblog, support release_record(corresponding times) after mutli next_record
       * @param OB_SUCCESS       success
       * @param OB_TIMEOUT       timeout
       * @param other errorcode  fail
       */
      virtual int next_record(ICDCRecord **record, const int64_t timeout_us) = 0;
    
      /*
       * fetch next binlog record from OB cluster
       * @param [out] record        binlog record, memory allocated by oblog, support release_record(corresponding tiems) after mutli next_record
       * @param [out] major_version major version of ICDCRecord
       * @param [out] tenant_id     tenant id of ICDCRecord
       *
       * @param OB_SUCCESS          success
       * @param OB_TIMEOUT          timeout
       * @param other error code    fail
       */
    
      virtual int next_record(ICDCRecord **record,
          int32_t &major_version,
          uint64_t &tenant_id,
          const int64_t timeout_us) = 0;
    
      /*
       * release recorcd for EACH ICDCRecord
       * @param record
       */
      virtual void release_record(ICDCRecord *record) = 0;
    
      /*
       * Launch libobcdc
       * @retval OB_SUCCESS on success
       * @retval ! OB_SUCCESS on fail
       */
      virtual int launch() = 0;
    
      /*
       * Stop libobcdc
       */
    
      virtual void stop() = 0;
    
      /// get all serving tenant id list after oblog inited
      ///
      /// @param [out]            tenant_ids tenant ids that oblog serving
      ///
      /// @retval OB_SUCCESS      success
      /// @retval other value     fail
    
      virtual int get_tenant_ids(std::vector<uint64_t> &tenant_ids) = 0;
    };
    
    }
    }
    

    Previous topic

    Install and deploy OBCDC
    Last

    Next topic

    obcdc_tailf
    Next
    What is on this page
    Use obcdc to develop your OceanBase data consumption tool
    Check for missing dynamic libraries
    Header files
    Build an obcdc instance
    Use obcdc
    Destroy an obcdc instance
    Considerations
    Examples
    Appendix
    ObCDCFactory
    IObCDCInstance