This topic describes how to run the client software of IntelliJ IDEA to consume data from a subscription channel by using an SDK.
Prerequisites
A data subscription task has been created and is running normally.
The endpoint for the data subscription task has been configured, and the machine where the consumer task is located can connect to the subscription address.
A consumer group has been created for the data subscription task.
Considerations
When consuming subscription data, you need to call the commit() method of the UserRecord class to submit the offset information. Otherwise, data may be consumed multiple times.
Consumer groups are independent of each other.
Procedure
Check the SDK version.
You can view the latest Maven dependency on the oms-subscribe-sdk page.
Open IntelliJ IDEA and create a Maven project, then add the SDK dependency to the
pom.xmlfile.<dependency> <groupId>com.oceanbase.oms</groupId> <artifactId>oms-subscribe-sdk</artifactId> <version>${oms.subscribe.sdk.version}</version> </dependency>Edit the consumer code. Here is an example. You need to modify the values of each parameter in
SdkConfig.public static void main(String[] args) { SdkConfig config = new SdkConfig.Builder() .brokerUrl("oms-cn-subsribe-****.com:18000") .topic("oms_***_topic") .groupId("oms****_groupId") .clientId("oms****_clientId") .username("oms_***_username") .password("oms_***_password") .subscribeMode(SubscribeMode.SUBSCRIBE) .startTimestampSeconds(1757576635) .build(); OmsSubscribeClient omsSubscribeClient = new OmsSubscribeClient(config); omsSubscribeClient.addRecordListeners(new TestListener()); omsSubscribeClient.init(); omsSubscribeClient.start(); omsSubscribeClient.stop(); } static class TestListener implements RecordListener { @Override public void consume(UserRecord record) { System.out.println(record.getOperationType()); record.commit("test_info"); } }Parameter Description Acquisition method Required Default value brokerUrl Subscription address in the data subscription task Go to the details page of the data subscription task, and click Subscription Details>Data Subscription tab to obtain the network address and port number in the Subscription Address area. Required N/A topic Subscription topic in the data subscription task Go to the details page of the data subscription task, and click Subscription Details>Data Subscription tab to obtain the subscription topic in the Subscription Topic area. Required N/A groupId Consumer group name in consumption management Go to the details page of the data subscription task, and click Subscription Details>Data Subscription tab to obtain the consumer group name and consumer group account in the Consumption Management area. Required N/A clientId ClientId in consumption management. Applied only when subscribeModeisSUBSCRIBE.N/A Optional N/A username Consumer account The consumer group account that you set up when creating the consumer group. Required N/A password Consumer password The consumer group account password that you set up when creating the consumer group. Required N/A startTimestampSeconds The consumption time point, which is the timestamp of the first piece of data consumed by the SDK client. The format is a Unix timestamp in seconds. For example, 1757582867. The consumption time point must be within the data range of the subscription instance and must be converted to a Unix timestamp.
You can check the data range in the Data Write Timestamp Range area on the data subscription task details page.Optional N/A startOffset The consumption offset, which is set based on the partition. The format is Map . The key is the partition ID and the value is the offset value.
For example, {0: 1000, 1: 2000} indicates that Partition 0 starts consumption from Offset 1000, and Partition 1 starts consumption from Offset 2000.The consumption offset must be within the data range of the subscription instance.
You can check the data range in the Data Write Timestamp Range area on the data subscription task details page.Optional N/A subscribeMode The consumption modes of the SDK client include ASSIGN (default) and SUBSCRIBE. N/A Optional ASSIGN checkpointNotExistScope The behavior if unable to find the consumption offset at startup. Valid values include BEGINNING, END, and EXCEPTION.
The reason for not finding the consumption offset may be the start timestamp is not provided or the provided start timestamp does not correspond to an existing offset.N/A Optional EXCEPTION properties Additional Kafka consumer configuration properties such as tuning parameters. The format is a Properties object, allowing you to set any configuration option for the Kafka consumer. For more information, see the consumer configuration items in the official Kafka documentation. Optional N/A pollTimeoutMs The timeout time of the Kafka consumer poll operation, in milliseconds. N/A Optional 500 checkpointCommitInterval The checkpoint submission interval, in milliseconds. The SDK will periodically submit consumption progress based on this interval.
Notice: The source of the submitted offset must rely on your call to the commit() method of UserRecord.N/A Optional 5000 userRegisteredStore The user-defined checkpoint storage implementation for customizing checkpoint persistence methods. This requires the SerializeCheckpointStoreinterface implemented.Implement the SerializeCheckpointStoreinterface to provide customized checkpoint storage logic.Optional N/A