Terms
TPS
The number of messages pulled by the source per second.
Latency
The latency of the current task, in seconds. Security checkpoints are not considered when the latency is calculated.
ReadQ
In asynchronous transmission, the intermediate framework obtains data from ReadQ and writes the data to the destination. The data of the current process that is not obtained by the intermediate framework will be cached in ReadQ. The default maximum value of ReadQ is 4096. If the value of ReadQ is small, the source has no data or a bottleneck exists in pulling data from the source.
WriteConsume
The time required to write {batch.size}, in milliseconds. A smaller WriteConsume value indicates higher performance of the destination.
Query metrics
Run the ./connector_utils.sh metrics command to query metrics.
./connector_utils.sh metrics
2022-09-22 12:49:48.876
SOURCE: [RPS:0.0, IOPS:0.0M, delay:1489ms]
SINK: [RPS:0.0, TPS:0.0, IOPS:0.0M, delay:2986440ms]
SINK_TIME: [execute_time:0.0ms/record, commit_time:0.0ms/batch]
SINK_SLOW_ROUTES:
SINK_THREAD: 4/4
DISPATCHER: wait record:0, ready batch:0, shardTime:nullms/record
forward_slot0 batchAccumulate: 0, recordAccumulate: 0
queue_slot1 batchAccumulate: 0, recordAccumulate: 0
heap:620M/1945M, noHeap:52M/488M, threadCount:18, cpu:0.054, sysCpu:51.555
ParNew(count:0, cost:0) ConcurrentMarkSweep(count:0, cost:0)
Usage notes:
SOURCE: the RPS, IOPS, and delay of the source.
SINK: the RPS, TPS (RecordBatch/s), IOPS, and delay of the destination.
SINK_TIME: execute_time indicates the execution time of each record and commit_time indicates the batch execution time of multiple records.
SINK_SLOW_ROUTES: the information about SINK_ROUTES with a long execution time in internal statistics. SINK_ROUTES indicates units that support parallel writes. Example: partitions in a Kafka instance, shards in a DataHub instance, and queues in a RocketMQ instance.
SINK_THREAD: Number of active sinkThreads/Maximum number of sinkThreads. If only a few sinkThreads are active, the destination is idle and has not reached a bottleneck.
DISPATCHER: the status of the intermediate queue. wait record indicates the number of messages waiting to be allocated, and ready batch indicates the number of records to be executed by sinkThreads.
If the wait record value is large, many messages are waiting to be allocated, which may be related to garbage collection (GC).
If the ready batch value is large, the destination has encountered a bottleneck. In this case, you can try to increase the write speed of the destination, for example, by increasing the number of threads.
{Name of the framework queue} batchAccumulate: {Number of accumulated records for batch execution}, recordAccumulate: {Number of accumulated records}
If batchAccumulate of the first queue is empty, the source encounters a bottleneck and no messages can be received by the source.
If batchAccumulate of the last queue is full, the record dispatcher (conflict matrix/hash) encounters a bottleneck. In this case, you must view the metrics of the dispatcher.
Heap memory usage/maximum heap memory size, non-heap memory usage/maximum non-heap memory size, number of threads, process CPU, and system CPU
{Time} {youngGcName}(count:{Accumulated number of times}, cost:{Accumulated consumed time}) {fullGcName}(count:{Accumulated number of times}, cost:{Accumulated consumed time})
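The usage notes above can be applied mechanically to a metrics snapshot. The following is a minimal sketch, assuming the output keeps the layout of the sample shown earlier. The names parse_metrics and hints and the threshold many=1000 are hypothetical and are not part of connector_utils.sh.

```python
import re

# Abridged copy of the sample output shown earlier in this section.
SAMPLE = """\
SINK_THREAD: 4/4
DISPATCHER: wait record:0, ready batch:0, shardTime:nullms/record
forward_slot0 batchAccumulate: 0, recordAccumulate: 0
queue_slot1 batchAccumulate: 0, recordAccumulate: 0
"""

def parse_metrics(text):
    """Extract sink thread, dispatcher, and queue counters from a snapshot."""
    snapshot = {"queues": []}
    m = re.search(r"SINK_THREAD:\s*(\d+)/(\d+)", text)
    if m:
        snapshot["sink_active"] = int(m.group(1))
        snapshot["sink_max"] = int(m.group(2))
    m = re.search(r"wait record:\s*(\d+),\s*ready batch:\s*(\d+)", text)
    if m:
        snapshot["wait_record"] = int(m.group(1))
        snapshot["ready_batch"] = int(m.group(2))
    pattern = r"^(\S+)\s+batchAccumulate:\s*(\d+),\s*recordAccumulate:\s*(\d+)"
    for name, batch, record in re.findall(pattern, text, re.MULTILINE):
        snapshot["queues"].append((name, int(batch), int(record)))
    return snapshot

def hints(snapshot, many=1000):
    """Apply the usage notes; `many` is an assumed threshold, tune it per task."""
    found = []
    if snapshot.get("wait_record", 0) >= many:
        found.append("many wait records: check GC")
    if snapshot.get("ready_batch", 0) >= many:
        found.append("many ready batches: destination may be the bottleneck")
    queues = snapshot["queues"]
    if queues and queues[0][1] == 0:
        found.append("first queue is empty: source may be the bottleneck")
    return found

print(hints(parse_metrics(SAMPLE)))
```

For the idle sample above, only the hint about the empty first queue is returned, which matches the zero RPS reported for the source.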
Incr-Sync/Full-Import diagnostics
Run the ./connector_utils.sh diagnose command in the directory of Incr-Sync/Full-Import to diagnose Incr-Sync/Full-Import.
./connector_utils.sh diagnose
Parameters
-s: Optional. The start time of log analysis, which is 10 minutes ago by default.
-e: Optional. The end time of log analysis, which is the current time by default.
Command output:
[Metrics]
TPS: [last:345,avg:277.28,p99:911.00]
RPS: [last:106,avg:257.08,p99:968.00]
IOPS: [last:2KB,avg:21.33KB]
EXECUTE_TIME: [last:34ms,avg:220.44ms,p99:783.00ms]
SINK_DELAY: [last:19ms,avg:260.31ms,p99:819.00ms]
SOURCE_DELAY: [
source_subtopic2_source_delay: [last:702ms,avg:525.00ms,p99:986.00ms]
source_subtopic1_source_delay: [last:14ms,avg:490.69ms,p99:973.00ms]
]
QUEUE_BATCH_ACCUMULATE: [
frame_queue_slot_1.batchAccumulate: [last:420,avg:496.00,p99:975.00]
frame_queue_slot_2.batchAccumulate: [last:310,avg:470.05,p99:975.00]
]
JVM-MEM: [heap:34.28M/3641M, noHeap:19.38M/0M]
THREAD: [count:4, sink:14/16]
CPU: [last:17,avg:27.95,p99:62.00]
[Pref]
sink block: true
youngGc: true
[Suggest]
config[coordinator.shuffleMinBatchSize]:20 to 40
config[coordinator.shuffleMaxBatchSize]:40 to 80
jvm to: -Xmx4096m
Usage notes:
The Metrics information provides the data on which the diagnosis is based.
The Pref information indicates the bottlenecks identified based on the metrics.
The Suggest information indicates the points for improvement. In the preceding example, you can modify the coordinator.shuffleMinBatchSize and coordinator.shuffleMaxBatchSize parameters and the JVM memory setting (jvm to).
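To compare the last, avg, and p99 values in scripts, the single-line entries of the [Metrics] block can be parsed as follows. This is a minimal sketch, assuming the line layout of the preceding example. parse_stat_line is a hypothetical helper, and unit suffixes such as ms and KB are dropped.

```python
import re

def parse_stat_line(line):
    """Parse 'NAME: [last:345,avg:277.28,p99:911.00]' into (name, stats).

    Returns None for lines that are not a single-line 'NAME: [...]' entry,
    such as the opening line of a multi-line block ('SOURCE_DELAY: [').
    """
    m = re.match(r"\s*([\w.\-]+):\s*\[(.*)\]\s*$", line)
    if not m:
        return None
    name, body = m.groups()
    stats = {}
    for part in body.split(","):
        key, _, value = part.partition(":")
        # Keep only the leading number; suffixes such as 'ms' or 'KB' are ignored.
        num = re.match(r"-?\d+(?:\.\d+)?", value.strip())
        if num:
            stats[key.strip()] = float(num.group())
    return name, stats

name, stats = parse_stat_line("EXECUTE_TIME: [last:34ms,avg:220.44ms,p99:783.00ms]")
print(name, stats["p99"] / stats["avg"])
```

A p99 value that is several times the average, as in this line, suggests occasional slow writes rather than a uniformly slow destination.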
workerNum
If workerNum is set to the maximum value and the values of the executeTime and commitTime parameters of the destination in the metrics logs are acceptable:
Migration: Modify JDBCWriter.worker_num to increase the value of workerNum based on the resources on the server.
Synchronization: Modify sink.workerNum to increase the value of workerNum based on the resources on the server. To change the value of workerNum, choose Component Monitoring > Connector Update on the details page of the project, add the workerNum parameter at the destination, enter a value, and then click Update.
If workerNum is not set to the maximum value and, in the metrics logs, the GC time between two operations is very long:
Migration: The default value of JDBCWriter.tx_records_limit is 128. Decrease this parameter to a small value, such as 4.
If the value of workerNum is 1 or 2 and the conflictKey or deepSize keyword is recorded in the connector.log file:
Migration: Add JDBCWriter.coordinatorFile.hotKeyMerge = true.
Excessively long GC time
Note
The GC time is considered excessively long if Young GC (YGC) takes more than 300 ms per second and Full GC (FGC) occurs every second.
View GC details
In the task directory, run the following command to view the GC details of each second:
/opt/alibaba/java/bin/jstat -gcutil `cat task.pid` 1s
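The YGCT and FGC columns in the jstat -gcutil output are cumulative, so the per-second figures are the differences between consecutive rows. The following is a minimal sketch of that check, assuming one row per second. The function names are hypothetical and the sample rows are illustrative values, not output from a real task.

```python
# Illustrative jstat -gcutil rows (made-up values), sampled once per second.
SAMPLE = """\
  S0     S1     E      O      M     CCS    YGC     YGCT    FGC    FGCT     GCT
  0.00  50.00  10.00  80.00  95.00  90.00    100   10.000    20   30.000   40.000
  0.00  50.00  60.00  85.00  95.00  90.00    103   10.400    21   31.500   41.900
  0.00  50.00  20.00  90.00  95.00  90.00    106   10.750    22   33.000   43.750
"""

def gc_deltas(lines):
    """Return (YGC seconds, FGC count) spent in each interval between rows."""
    header = lines[0].split()
    # Locate columns by name so that extra columns in newer JDKs do not matter.
    ygct_i, fgc_i = header.index("YGCT"), header.index("FGC")
    rows = [ln.split() for ln in lines[1:] if ln.strip()]
    samples = [(float(r[ygct_i]), int(r[fgc_i])) for r in rows]
    return [(b[0] - a[0], b[1] - a[1]) for a, b in zip(samples, samples[1:])]

def gc_time_excessive(lines, ygc_limit_s=0.3):
    """True if every interval has more than 300 ms of YGC and at least one FGC."""
    deltas = gc_deltas(lines)
    return bool(deltas) and all(y > ygc_limit_s and f >= 1 for y, f in deltas)

print(gc_time_excessive(SAMPLE.splitlines()))
```

Requiring the condition in every interval is a design choice of this sketch; a looser check could flag the task when most intervals exceed the limit.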
Increase the JVM memory. Change the value of coordinator.connectorJvmParam to -Xms12g. Here, -Xmx16g is for reference only. You must adjust the value based on the memory of the current server. If the parameter is already used, you can remove -Xmn.
Synchronization and full migration: Decrease the value of coordinator.bridgeQueueSize. The default value is 256. You can decrease the value to 32.
Kafka synchronization: Set sink.lingerMs to 1.
Limit the memory by specifying coordinator.throttleMemoryBound={bytes}. We recommend that you set this parameter to 1/4 of the maximum memory size. For example, if the maximum heap memory is 16 GB, we recommend that you set this parameter to 16 × 1024 × 1024 × 1024 × 1/4 = 4294967296.
If dispatcherClassName in conf/coordinator.json or conf_new/coordinator.json is ShuffleRecordDispatcher, you can modify the following parameters of the coordinator:
maxRecordCapacity = 1000 specifies the total number of dispatcher queues. The default value is subject to shuffleMinBatchSize * (shuffleBucketSize * 1.5) = 3840.
shuffleBucketSize=32 decreases the number of batch buckets. The default value is 128.
shuffleFlushIntervalMs=10 pushes data to the destination more quickly.
Increase the value of sink.workerNum. The default value is 16. You can increase the value to 64.
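The two computed values in this section can be checked with a few lines of arithmetic. This is a minimal sketch with hypothetical function names: 16 GB expressed in bytes is 16 × 1024³, and the default dispatcher capacity follows the formula given above.

```python
def throttle_memory_bound(max_heap_gb, fraction=0.25):
    """Bytes for coordinator.throttleMemoryBound: a fraction of the maximum heap."""
    return int(max_heap_gb * 1024 ** 3 * fraction)

def default_max_record_capacity(shuffle_min_batch_size=20, shuffle_bucket_size=128):
    """Default capacity: shuffleMinBatchSize * (shuffleBucketSize * 1.5)."""
    return int(shuffle_min_batch_size * (shuffle_bucket_size * 1.5))

print(throttle_memory_bound(16))        # 4294967296
print(default_max_record_capacity())    # 3840
```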
Batch-related parameters
Notice
Batch-related parameters apply only to the incremental synchronization link of a non-database destination.
GC information:
When GC is not serious, increase the number of batches within the capacity of the destination.
Decrease the number of batches during GC verification.
maxRecordCapacity: the maximum number of batch queues. Default value: 16000.
shuffleMinBatchSize: the minimum number of batches. Default value: 20.
shuffleMaxBatchSize: the maximum number of batches. Default value: 64.
shuffleFlushIntervalMs: the refresh time. Default value: 100 ms.
shuffleBucketMaxInFlightBatchSize: the maximum number of ongoing batches allowed in each concurrent operation. The default value is 1 for incremental migration and is not limited for full migration.
shuffleBucketSize: the maximum number of concurrent batches.
Data is delivered when the shuffleMaxBatchSize || shuffleFlushIntervalMs condition is met and the write threads are capable of writing.
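The delivery rule can be read as a simple predicate. The following is a minimal sketch of one interpretation, assuming that the condition is met when the batch reaches shuffleMaxBatchSize or when shuffleFlushIntervalMs has elapsed since the last delivery. The function and argument names are hypothetical and do not reflect the connector's internal code.

```python
def should_deliver(batch_len, elapsed_ms, writer_available,
                   shuffle_max_batch_size=64, shuffle_flush_interval_ms=100):
    """Return True when a batch would be handed to a write thread.

    Assumed reading of the rule: (size reached OR interval elapsed)
    AND a write thread is able to accept the batch.
    """
    condition_met = (batch_len >= shuffle_max_batch_size
                     or elapsed_ms >= shuffle_flush_interval_ms)
    return condition_met and writer_available

print(should_deliver(64, 5, True))     # full batch, idle writer
print(should_deliver(10, 120, True))   # small batch, interval elapsed
print(should_deliver(64, 120, False))  # all write threads busy
```

Under this reading, lowering shuffleFlushIntervalMs shortens the wait for small batches, while a busy destination holds back delivery regardless of batch size.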
Latency information required
If the OMS version is V3.4.0 or later, the output of the ./connector_utils.sh metrics command executed for the task is required.
If the OMS version is V3.3.1 or earlier, the following information is required:
Link type: full migration or incremental migration
Destination type
Whether the store has a latency
Currently configured parameters
Current RPS: View the sink.rps and sink_execute_time information in the metrics.log file.
Current number of intermediate queues: View batchAccumulate, recordAccumulate, ready_execute_batch_size, and wait_dispatch_record_size in the metrics.log file.
GC information: Run the /opt/alibaba/java/bin/jstat -gcutil `cat task.pid` 1s command in the task directory to view the GC information of each second.