Incr-Sync or Full-Import tuning

2023-12-08 06:52:31  Updated

Concepts

  • TPS

    The number of messages pulled by the source per second.

  • Latency

    The latency of the current task, in seconds. Security checkpoints are not considered when the latency is calculated.

  • ReadQ

    In asynchronous transmission, the intermediate framework obtains data from ReadQ and writes the data to the destination. Data of the current process that the intermediate framework has not yet obtained is cached in ReadQ. The default maximum value of ReadQ is 4096. If the value of ReadQ is small, either the source has no data or pulling data from the source is the bottleneck.

  • WriteConsume

    The time required to write one batch ({batch.size}), in milliseconds. A smaller WriteConsume value indicates better write performance at the destination.

Query metrics

Run the ./connector_utils.sh metrics command to query metrics.

./connector_utils.sh metrics

2022-09-22 12:49:48.876
SOURCE: [RPS:0.0, IOPS:0.0M, delay:1489ms]
SINK: [RPS:0.0, TPS:0.0, IOPS:0.0M, delay:2986440ms]
SINK_TIME: [execute_time:0.0ms/record, commit_time:0.0ms/batch]
SINK_SLOW_ROUTES:
SINK_THREAD: 4/4
DISPATCHER: wait record:0, ready batch:0, shardTime:nullms/record
forward_slot0 batchAccumulate: 0, recordAccumulate: 0
queue_slot1 batchAccumulate: 0, recordAccumulate: 0
heap:620M/1945M, noHeap:52M/488M, threadCount:18, cpu:0.054, sysCpu:51.555
ParNew(count:0, cost:0) ConcurrentMarkSweep(count:0, cost:0)

Notes:

  1. SOURCE: the RPS, IOPS, and delay of the source.

  2. SINK: the RPS, TPS (RecordBatch/s), IOPS, and delay of the destination.

  3. SINK_TIME: execute_time indicates the execution time of each record, and commit_time indicates the commit time of each batch of records.

  4. SINK_SLOW_ROUTES: the information about SINK_ROUTES with a long execution time in internal statistics. SINK_ROUTES indicates units that support parallel writes. Example: partitions in a Kafka instance, shards in a DataHub instance, and queues in a RocketMQ instance.

  5. SINK_THREAD: the number of active sinkThreads/the maximum number of sinkThreads. If only a few sinkThreads are active, the destination is idle and is not the bottleneck.

  6. DISPATCHER: the state of the intermediate queue. wait record indicates the number of messages waiting to be allocated, and ready batch indicates the number of batches waiting to be executed by sinkThreads.

    If wait record is large, many messages are waiting to be allocated, which may be caused by garbage collection (GC).

    If ready batch is large, the destination is the bottleneck. In this case, try to increase the write speed of the destination, for example, by increasing the number of threads.

  7. {Name of the framework queue} batchAccumulate: {Number of accumulated records for batch execution}, recordAccumulate: {Number of accumulated records}

    1. If batchAccumulate of the first queue is empty, the source encounters a bottleneck and no messages can be received by the source.

    2. If batchAccumulate of the last queue is full, the record dispatcher (conflict matrix/hash) encounters a bottleneck. In this case, you must view the metrics of the dispatcher.

  8. Heap memory usage/maximum heap memory size, non-heap memory usage/maximum non-heap memory size, number of threads, process CPU usage, and system CPU usage.

  9. {Time} {youngGcName}(count:{Accumulated number of times}, cost:{Accumulated consumed time}) {fullGcName}(count:{Accumulated number of times}, cost:{Accumulated consumed time})
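
The SINK_THREAD and DISPATCHER notes above can be turned into a quick check. The following Python sketch is illustrative only: the function names, the backlog threshold of 1000, and the "fewer than half of the threads are active" rule are assumptions made for this example, not values defined by the connector.

```python
import re

def parse_snapshot(text):
    """Extract the sink-thread and dispatcher counters from one metrics snapshot."""
    active, maximum = map(int, re.search(r"SINK_THREAD:\s*(\d+)/(\d+)", text).groups())
    wait, ready = map(int, re.search(
        r"wait record:\s*(\d+),\s*ready batch:\s*(\d+)", text).groups())
    return {"active": active, "max": maximum, "wait_record": wait, "ready_batch": ready}

def hints(snapshot, backlog=1000):
    """Apply the rules from the notes. `backlog` is a hypothetical threshold."""
    found = []
    if snapshot["active"] * 2 < snapshot["max"]:
        # Assumed reading of "only a few sinkThreads are active".
        found.append("destination is idle and is not the bottleneck")
    if snapshot["ready_batch"] >= backlog:
        found.append("destination is the bottleneck: try more threads")
    if snapshot["wait_record"] >= backlog:
        found.append("many messages waiting to be allocated: check GC")
    return found

# Two lines copied from the sample output above.
sample = """SINK_THREAD: 4/4
DISPATCHER: wait record:0, ready batch:0, shardTime:nullms/record"""

snapshot = parse_snapshot(sample)
print(snapshot, hints(snapshot))
```

For the sample output, no hint is reported because all counters are zero and every sink thread is active.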

Diagnose Incr-Sync or Full-Import

Run the ./connector_utils.sh diagnose command in the directory of Incr-Sync or Full-Import to diagnose Incr-Sync or Full-Import.

./connector_utils.sh diagnose -s 'YYYY-MM-DDTHH:mm:ss' -e 'YYYY-MM-DDTHH:mm:ss'

In the preceding command, the -s and -e options are optional. -s specifies the start time of log analysis and -e specifies the end time. Specify the timestamp in the 'YYYY-MM-DDTHH:mm:ss' format, such as '2023-06-01T12:00:00'.

By default, the analysis period of the ./connector_utils.sh diagnose command is 10 minutes. The default value of the -e option is the current time.
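
If you need an analysis window other than the default, the -s and -e values can be generated instead of typed by hand. The following Python sketch builds the command line in the 'YYYY-MM-DDTHH:mm:ss' format described above. The helper name diagnose_command and the 30-minute window are assumptions chosen for illustration.

```python
import shlex
from datetime import datetime, timedelta

# strftime pattern equivalent to 'YYYY-MM-DDTHH:mm:ss'.
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"

def diagnose_command(end, minutes=10):
    """Build the diagnose command for the `minutes` that precede `end`."""
    start = end - timedelta(minutes=minutes)
    return [
        "./connector_utils.sh", "diagnose",
        "-s", start.strftime(TIME_FORMAT),
        "-e", end.strftime(TIME_FORMAT),
    ]

cmd = diagnose_command(datetime(2023, 6, 1, 12, 0, 0), minutes=30)
print(shlex.join(cmd))
```

The returned list can be passed to subprocess.run with cwd set to the Incr-Sync or Full-Import directory.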

Command output:

[Metrics]
TPS: [last:345,avg:277.28,p99:911.00]
RPS: [last:106,avg:257.08,p99:968.00]
IOPS: [last:2KB,avg:21.33KB]
EXECUTE_TIME: [last:34ms,avg:220.44ms,p99:783.00ms]
SINK_DELAY: [last:19ms,avg:260.31ms,p99:819.00ms]
SOURCE_DELAY: [
 source_subtopic2_source_delay: [last:702ms,avg:525.00ms,p99:986.00ms]
 source_subtopic1_source_delay: [last:14ms,avg:490.69ms,p99:973.00ms]
]
QUEUE_BATCH_ACCUMULATE: [
 frame_queue_slot_1.batchAccumulate: [last:420,avg:496.00,p99:975.00]
 frame_queue_slot_2.batchAccumulate: [last:310,avg:470.05,p99:975.00]
]
JVM-MEM: heap:34.28M/3641M, noHeap:19.38M/0M]
THREAD: [count:4, sink:14/16]
CPU: [last:17,avg:27.95,p99:62.00]
[Pref]
sink block: true
youngGc: true
[Suggest]
config[coordinator.shuffleMinBatchSize]:20 to 40
config[coordinator.shuffleMaxBatchSize]:40 to 80
jvm to: -Xmx4096m

Notes:

  1. The [Metrics] section lists the metrics on which the diagnosis is based.

  2. The [Pref] section lists the bottlenecks identified from the metrics.

  3. The [Suggest] section lists suggested changes. In the preceding example, it suggests changing coordinator.shuffleMinBatchSize from 20 to 40 and coordinator.shuffleMaxBatchSize from 40 to 80, and setting the JVM option to -Xmx4096m.
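
To apply the suggestions in a script, the [Suggest] section can be extracted from the command output. The following Python sketch assumes that the output always follows the layout of the preceding example. The function name parse_suggestions is hypothetical.

```python
import re

def parse_suggestions(output):
    """Return (name, current, suggested) tuples from the [Suggest] section."""
    suggestions = []
    in_suggest = False
    for raw in output.splitlines():
        line = raw.strip()
        header = re.fullmatch(r"\[(\w+)\]", line)
        if header:
            # Section headers look like [Metrics], [Pref], or [Suggest].
            in_suggest = header.group(1) == "Suggest"
            continue
        if not in_suggest:
            continue
        config = re.fullmatch(r"config\[(.+?)\]:(\S+) to (\S+)", line)
        if config:
            suggestions.append(config.groups())
        elif line.startswith("jvm to:"):
            # No current value is printed for the JVM option.
            suggestions.append(("jvm", None, line.split(":", 1)[1].strip()))
    return suggestions

# Tail of the sample output above.
sample = """[Pref]
sink block: true
youngGc: true
[Suggest]
config[coordinator.shuffleMinBatchSize]:20 to 40
config[coordinator.shuffleMaxBatchSize]:40 to 80
jvm to: -Xmx4096m"""

for name, current, suggested in parse_suggestions(sample):
    print(name, current, suggested)
```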

workerNum

  • workerNum is fully used (all sink threads are active), and the executeTime and commitTime values of the destination in the metrics logs are acceptable.

    • Migration: Modify JDBCWriter.worker_num to increase the value of workerNum based on the resources on the server.

    • Synchronization: Modify sink.workerNum to increase the value of workerNum based on the resources on the server. To change the value, go to the details page of the project and choose Component Monitoring > Connector Update. Add the workerNum parameter at the destination, enter a value, and then click Update.

  • workerNum is not fully used, and the metrics logs show a very long GC time between two operations.

    Migration: The default value of JDBCWriter.tx_records_limit is 128. Decrease this parameter to a small value, such as 4.

  • When the value of workerNum is 1 or 2, the conflictKey or deepSize keyword is recorded in the connector.log file.

    Migration: Add JDBCWriter.coordinatorFile.hotKeyMerge = true.

Excessively long GC time

Description

If young GC (YGC) takes more than 300 ms per second and a full GC (FGC) occurs every second, the GC time is considered excessively long.

View GC details

In the task directory, run the following command to view the GC details of each second:

/opt/alibaba/java/bin/jstat -gcutil `cat task.pid` 1s 
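
The per-second samples printed by jstat -gcutil can be compared with the thresholds in the description above. The following Python sketch assumes the standard -gcutil columns, in which YGCT is the accumulated young GC time in seconds and FGC is the accumulated number of full GCs. The sample values are made up for illustration, and the function names are hypothetical.

```python
def parse_gcutil(text):
    """Return (YGCT seconds, FGC count) for each sample row of jstat -gcutil."""
    lines = [line.split() for line in text.strip().splitlines()]
    header, rows = lines[0], lines[1:]
    ygct, fgc = header.index("YGCT"), header.index("FGC")
    return [(float(row[ygct]), int(row[fgc])) for row in rows]

def gc_pressure(samples, ygc_limit_ms=300):
    """Compare consecutive 1s samples with the thresholds in the description."""
    report = []
    for (y0, f0), (y1, f1) in zip(samples, samples[1:]):
        ygc_ms = (y1 - y0) * 1000   # young GC time spent in this second
        fgc = f1 - f0               # full GCs that happened in this second
        report.append({
            "ygc_ms": round(ygc_ms),
            "fgc": fgc,
            "too_long": ygc_ms > ygc_limit_ms and fgc >= 1,
        })
    return report

# Illustrative values only, not real measurements.
sample = """
  S0     S1     E      O      M     CCS    YGC     YGCT    FGC    FGCT     GCT
  0.00  45.10  12.30  80.20  95.10  90.00   100    5.000    10    8.000   13.000
 40.00   0.00  60.00  85.00  95.10  90.00   104    5.350    11    8.900   14.250
  0.00  42.00  10.00  70.00  95.10  90.00   105    5.400    11    8.900   14.300
"""

for second in gc_pressure(parse_gcutil(sample)):
    print(second)
```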

To reduce the GC time, try the following measures:

  • Increase the JVM memory. Change the value of coordinator.connectorJvmParam, for example, to -Xms12g -Xmx16g.

    The -Xms12g and -Xmx16g values are for reference only. You must adjust the values based on the memory of the current server. If -Xmn is already specified in the parameter, you can remove it.

  • Synchronization and full migration: Decrease the value of coordinator.bridgeQueueSize. The default value is 256. You can decrease the value to 32.

  • Kafka synchronization: Set sink.lingerMs to 1.

  • Limit the memory by specifying coordinator.throttleMemoryBound={bytes}. We recommend that you set this parameter to 1/4 of the maximum memory size.

    For example, if the maximum heap memory is 16 GB, we recommend that you set this parameter to 16 × 1024 × 1024 × 1024 × 1/4 = 4294967296.

  • If dispatcherClassName in conf/coordinator.json or conf_new/coordinator.json is ShuffleRecordDispatcher, you can modify the following parameters of the coordinator:

    • maxRecordCapacity = 1000 specifies the total number of dispatcher queues. By default, the value is shuffleMinBatchSize × (shuffleBucketSize × 1.5) = 20 × (128 × 1.5) = 3840.

    • shuffleBucketSize = 32 decreases the number of batch buckets. The default value is 128.

    • shuffleFlushIntervalMs = 10 pushes data to the destination more quickly.

  • Increase the value of sink.workerNum. The default value is 16. You can increase the value to 64.
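
The two calculations in the list above (the recommended coordinator.throttleMemoryBound value and the default maxRecordCapacity value) can be reproduced as follows. This is a minimal Python sketch. The function names are hypothetical, and the default arguments are the values stated in this topic.

```python
GIB = 1024 ** 3  # bytes in 1 GB as used in this topic (1024 × 1024 × 1024)

def throttle_memory_bound(max_heap_gb, fraction=0.25):
    """Recommended coordinator.throttleMemoryBound in bytes (1/4 of the heap)."""
    return int(max_heap_gb * GIB * fraction)

def default_max_record_capacity(shuffle_min_batch_size=20, shuffle_bucket_size=128):
    """Default maxRecordCapacity: shuffleMinBatchSize × (shuffleBucketSize × 1.5)."""
    return int(shuffle_min_batch_size * (shuffle_bucket_size * 1.5))

print(throttle_memory_bound(16))       # 4294967296
print(default_max_record_capacity())   # 3840
```

With shuffleBucketSize decreased to 32 and shuffleMinBatchSize left at 20, the same formula gives 960.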

Notice

Batch-related parameters apply only to the incremental synchronization link of a non-database destination.

GC information:

  1. If GC has no serious impact, increase the number of batches to the extent supported by the destination data source.

  2. If GC problems are confirmed, decrease the number of batches.

    • maxRecordCapacity: the maximum number of batch queues. Default value: 16000.

    • shuffleMinBatchSize: the minimum number of batches. Default value: 20.

    • shuffleMaxBatchSize: the maximum number of batches. Default value: 64.

    • shuffleFlushIntervalMs: the flush interval. Default value: 100 ms.

    • shuffleBucketMaxInFlightBatchSize: the maximum number of ongoing batches allowed in each concurrent operation. The default value is 1 for incremental migration and is not limited for full migration.

    • shuffleBucketSize: the maximum number of concurrent batches.

Data is delivered when a batch reaches shuffleMaxBatchSize or shuffleFlushIntervalMs has elapsed, provided that the write threads are capable of writing.
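
The delivery rule above can be expressed as a small predicate. The following Python sketch is not the connector's implementation. The function name and arguments are hypothetical, the defaults are the values listed in this section, and the rule that an empty batch is never delivered is an added assumption.

```python
def should_deliver(batch_len, waited_ms, idle_writers,
                   shuffle_max_batch_size=64, shuffle_flush_interval_ms=100):
    """Return True when a pending batch may be handed to a write thread."""
    if batch_len == 0:
        return False  # assumption: nothing to deliver
    # Either the size condition or the time condition is enough ...
    batch_ready = (batch_len >= shuffle_max_batch_size
                   or waited_ms >= shuffle_flush_interval_ms)
    # ... but a write thread must also be able to take the batch.
    return batch_ready and idle_writers > 0

print(should_deliver(64, 0, 1))    # full batch, writer available
print(should_deliver(10, 100, 1))  # interval elapsed, writer available
print(should_deliver(10, 50, 1))   # neither condition met
print(should_deliver(64, 0, 0))    # full batch, but no writer available
```

A smaller shuffleFlushIntervalMs makes the time condition fire sooner, which is why decreasing it pushes data to the destination more quickly.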

Latency information required

If the OMS version is V3.4.0 or later, the output of the ./connector_utils.sh metrics command run in the task directory is required.

If the OMS version is V3.3.1 or earlier, the following information is required:

  1. Link type: full migration or incremental migration

  2. Destination type

  3. Whether the store has a latency

  4. Currently configured parameters

  5. Current RPS: View the sink.rps and sink_execute_time information in the metrics.log file.

  6. Current number of intermediate queues: View batchAccumulate, recordAccumulate, ready_execute_batch_size, and wait_dispatch_record_size in the metrics.log file.

  7. GC information: Run the /opt/alibaba/java/bin/jstat -gcutil `cat task.pid` 1s command in the task directory to view the GC information of each second.
