Terms
TPS
The number of messages retrieved per second at the source.
Latency
The latency of the current task, in seconds. The latency calculation does not include the safe point.
ReadQ
During asynchronous transmission, the intermediate framework obtains data from ReadQ and writes it to the destination. Data that the intermediate framework has not yet obtained in the current process is cached in ReadQ. The default maximum size of ReadQ is 4096. A small value indicates that the source may have no data or that data retrieval at the source is limited.
WriteConsume
The time taken to write {batch.size} records, in milliseconds. The lower the value of WriteConsume, the better the performance at the destination.
Query metrics information
Run the ./connector_utils.sh metrics command to query metrics information.
./connector_utils.sh metrics
2022-09-22 12:49:48.876
SOURCE: [RPS:0.0, IOPS:0.0M, delay:1489ms]
SINK: [RPS:0.0, TPS:0.0, IOPS:0.0M, delay:2986440ms]
SINK_TIME: [execute_time:0.0ms/record, commit_time:0.0ms/batch]
SINK_SLOW_ROUTES:
SINK_THREAD: 4/4
DISPATCHER: wait record:0, ready batch:0, shardTime:nullms/record
forward_slot0 batchAccumulate: 0, recordAccumulate: 0
queue_slot1 batchAccumulate: 0, recordAccumulate: 0
heap:620M/1945M, noHeap:52M/488M, threadCount:18, cpu:0.054, sysCpu:51.555
ParNew(count:0, cost:0) ConcurrentMarkSweep(count:0, cost:0)
The information is described as follows:
Source RPS, IOPS, and DELAY.
Sink RPS, TPS (RecordBatch/s), IOPS, and DELAY.
SINK_TIME: execute_time indicates the execution time of a record, and commit_time indicates the execution time of a recordBatch.
SINK_SLOW_ROUTES: the sink routes that are executing slowly, based on internal statistics. A sink route is a parallel writing unit, for example, a partition in Kafka, a shard in DataHub, or a queue in RocketMQ.
SINK_THREAD: the number of active sink threads/the maximum number of sink threads. A small number of active sink threads indicates that the sink end is idle and has not reached a bottleneck.
DISPATCHER indicates the status of the intermediate queue. wait record indicates the number of messages waiting to be allocated, and ready batch indicates the number of records waiting to be executed by sink threads.
If the number of wait records is large, it indicates that the number of messages is large and that garbage collection may be involved.
If the number of ready batches is large, it indicates that the sink end has a bottleneck. You can try to improve the sink write speed (for example, by increasing the number of threads).
{Framework-Queue-Name} batchAccumulate: {number of accumulated recordBatches}, recordAccumulate: {number of accumulated records}.
If batchAccumulate is empty in the first queue, no data has entered at the source end.
If batchAccumulate is full in the last queue, it indicates that a bottleneck exists in RecordDispatcher (conflict matrix/hashing).
Heap memory usage, maximum heap memory, non-heap memory usage, maximum non-heap memory, the number of threads, and the process CPU and system CPU usage.
{Time} {youngGcName}(count:{Cumulative times}, cost:{Cumulative duration}) {fullGcName}(count:{Cumulative times}, cost:{Cumulative duration}).
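Because each metrics call prints a single snapshot, it is often useful to sample it repeatedly and compare values over time. The following is a minimal sketch, assuming it is run from the component directory; the 10-second interval and the metrics_history.log file name are illustrative choices, not OMS defaults.
# Append a timestamped metrics snapshot every 10 seconds (stop with Ctrl+C).
while true; do
  date '+%Y-%m-%d %H:%M:%S' >> metrics_history.log
  ./connector_utils.sh metrics >> metrics_history.log
  sleep 10
done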
Diagnose Incr-Sync or Full-Import
Obtain the component ID of Incr-Sync or Full-Import.
Log in to the OMS Community Edition console.
In the left-side navigation pane, click Data Migration.
On the Migration Tasks page, click the name of the target data migration task to go to its details page.
In the upper-right corner of the page, click View Component Monitoring.
In the View Component Monitoring dialog box, view the Component ID of Incr-Sync Component or Full-Import Component.
Go to the directory of the Incr-Sync or Full-Import component.
Log in to the OMS Community Edition deployment server.
Go to the Docker container.
docker exec -it ${CONTAINER_NAME} bash
Run the following command to enter the directory of the component:
cd /home/ds/run/${Component ID}
Run the ./connector_utils.sh diagnose command in the Incr-Sync or Full-Import directory.
./connector_utils.sh diagnose -s 'YYYY-MM-DDTHH:mm:ss' -e 'YYYY-MM-DDTHH:mm:ss'
In this command, -s and -e are optional parameters. -s indicates the start time of log analysis and -e indicates the end time of log analysis. The time format is 'YYYY-MM-DDTHH:mm:ss' (for example, '2023-06-01T12:00:00'). By default, ./connector_utils.sh diagnose analyzes the most recent 10 minutes of logs (the default value of -e is the current time).
The return result is as follows:
[Metrics] TPS: [last:345,avg:277.28,p99:911.00] RPS: [last:106,avg:257.08,p99:968.00] IOPS: [last:2KB,avg:21.33KB] EXECUTE_TIME: [last:34ms,avg:220.44ms,p99:783.00ms] SINK_DELAY: [last:19ms,avg:260.31ms,p99:819.00ms] SOURCE_DELAY: [ source_subtopic2_source_delay: [last:702ms,avg:525.00ms,p99:986.00ms] source_subtopic1_source_delay: [last:14ms,avg:490.69ms,p99:973.00ms] ] QUEUE_BATCH_ACCUMULATE: [ frame_queue_slot_1.batchAccumulate: [last:420,avg:496.00,p99:975.00] frame_queue_slot_2.batchAccumulate: [last:310,avg:470.05,p99:975.00] ] JVM-MEM: heap:34.28M/3641M, noHeap:19.38M/0M] THREAD: [count:4, sink:14/16] CPU: [last:17,avg:27.95,p99:62.00]
[Pref] sink block: true youngGc: true
[Suggest] config[coordinator.shuffleMinBatchSize]:20 to 40 config[coordinator.shuffleMaxBatchSize]:40 to 80 jvm to: -Xmx4096m
The information is described as follows:
The Metrics information is the basis for the analysis.
The Pref information indicates the bottleneck points identified based on the metrics information.
The Suggest information indicates the optimization points. For example, you can update the shuffleMinBatchSize, shuffleMaxBatchSize, and connectorJvmParam parameters in the coordinator component of Incr-Sync or Full-Import.
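For example, to analyze a specific 10-minute window, the command might be invoked as follows; the timestamps are illustrative only.
./connector_utils.sh diagnose -s '2023-06-01T12:00:00' -e '2023-06-01T12:10:00'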
workerNum
The value of workerNum has reached the upper limit, and the executeTime (execution time) and commitTime (commit time) in the sink entries of the metrics logs are within the normal range.
Go to the View Component Monitoring dialog box.
Click Update next to the target component.
In the Update Configuration dialog box, hover the pointer over the sink > workerNum parameter and click the edit icon.
If the parameter does not exist, hover the pointer over the blank space next to the sink parameter and click the add icon.
Note
If you write data to the database by using the direct load mode, you can modify the serverParallel parameter to adjust the concurrency of the direct load server. The default value is 8.
Increase the value of workerNum based on the machine resources.
Enter the modified parameter in the text box and click the confirmation icon.
In the Update Configuration dialog box, click OK.
The value of workerNum has not reached the upper limit, and the garbage collection (GC) time between two consecutive metrics logs is very long.
Go to the View Component Monitoring dialog box.
Click Update next to the target component.
In the Update Configuration dialog box, hover the pointer over the source > splitThreshold parameter and click the edit icon.
If the parameter does not exist, hover the pointer over the blank space next to the source parameter and click the add icon.
The default value of the splitThreshold parameter is 128. Decrease the value.
Enter the modified parameter in the text box and click the confirmation icon.
In the Update Configuration dialog box, click OK.
The value of workerNum is only 1 or 2, and the conflictKey or deepSize keyword is printed in the connector.log file (a check command for this appears after the following steps).
Go to the View Component Monitoring dialog box.
Click Update next to the target component.
In the Update Configuration dialog box, hover the pointer over the blank space next to the coordinator parameter and click the add icon.
Enter hotKeyMerge as the key name and click the checkmark icon.
In the Update Configuration dialog box, find the new key name. The default value is NULL.
Hover the pointer over the new parameter, click the edit icon that appears, and change the parameter value to true. Click the confirmation icon.
In the Update Configuration dialog box, click OK.
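To perform the connector.log check mentioned above, you can search for the two keywords directly. This is a minimal sketch; it assumes the log is located in the logs directory of the component under /home/ds/run/${Component ID}, which may vary by deployment.
# Look for hot-key indicators in the component log.
grep -E 'conflictKey|deepSize' logs/connector.log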
GC time is too long
Note
"GC time is too long" means that the Young GC time exceeds 300 ms per second and that a Full GC occurs every second.
View GC
Run the following command in the task directory to view the details of GC per second:
/opt/alibaba/java/bin/jstat -gcutil `cat task.pid` 1s
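A minimal sketch of how to read that output against the definition in the note: the YGC/YGCT and FGC/FGCT columns are standard jstat -gcutil output (Young and Full GC counts and cumulative times, in seconds); the thresholds in the comments simply restate the note above.
# With a 1s interval, YGCT growing by more than 0.3 between samples (>300 ms of Young GC
# per second), or FGC increasing on every sample (a Full GC every second), matches the
# "GC time is too long" condition described above.
/opt/alibaba/java/bin/jstat -gcutil `cat task.pid` 1s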
Increase the JVM memory by setting the coordinator > connectorJvmParam parameter to -Xms12g -Xmx16g. This is only an example; you need to adjust the memory based on the current machine. If the -Xmn parameter already exists, you can remove it.
Reduce the coordinator > bridgeQueueSize parameter. The default value is 256, and it can be reduced to 32.
Synchronize all data as soon as it is written: set the sink > lingerMs parameter to 1.
Limit the memory by setting the coordinator > throttleMemoryBound parameter to the specified number of bytes. We recommend that you set this parameter to 1/4 of the maximum memory. For example, if the maximum heap memory is 16 GB, the value is 16 * 1024 * 1024 * 1024 * 1/4 = 4294967296.
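To double-check the value before entering it, you can compute one quarter of the maximum heap in bytes; the example below assumes a 16 GB heap.
# 1/4 of a 16 GB maximum heap, in bytes.
echo $((16 * 1024 * 1024 * 1024 / 4))   # prints 4294967296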
If the dispatcherClassName parameter is set to ShuffleRecordDispatcher in the conf/coordinator.json or conf_new/coordinator.json file (a command to check this setting appears after this list), you can modify the following parameters of coordinator:
maxRecordCapacity = 1000 specifies the total number of records in the dispatcher queue. By default, it is calculated as shuffleMinBatchSize * (shuffleBucketSize * 1.5) = 3840.
Set the shuffleBucketSize parameter to 32. This reduces the number of batches that can be accumulated.
Set the shuffleFlushIntervalMs parameter to 10 to accelerate data pushing to the Sink.
Increase the sink > workerNum parameter. The default value is 16, and the maximum value can be adjusted to 64.
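To confirm which dispatcher is configured before tuning the ShuffleRecordDispatcher parameters above, a minimal check, assuming the configuration files are located in the component directory:
# Print the configured record dispatcher, whichever configuration file is present.
grep dispatcherClassName conf/coordinator.json conf_new/coordinator.json 2>/dev/null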
Parameters related to batch accumulation
Notice
Parameters related to batch accumulation apply only to incremental synchronization links with non-database targets.
Based on the GC situation:
When GC is not severe, you can increase the batch accumulation capability within the acceptable range at the destination.
When GC is severe, reduce batch accumulation.
maxRecordCapacity: the maximum number of records in the batch accumulation queue. Default value: 16000.
shuffleMinBatchSize: the minimum number of records in a batch. Default value: 20.
shuffleMaxBatchSize: the maximum number of records in a batch. Default value: 64.
shuffleFlushIntervalMs: the flush interval. Default value: 100 ms.
shuffleBucketMaxInFlightBatchSize: the maximum number of batches that can be in flight for each concurrency. For incremental synchronization, the default value is 1. For full synchronization, there is no limit.
shuffleBucketSize: the maximum number of concurrent batch accumulations.
Data is pushed to the Sink only when the conditions of shuffleMaxBatchSize or shuffleFlushIntervalMs are met, provided that the write thread has write capability.
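After changing these parameters, you can observe how accumulation responds by using the metrics command described earlier; a minimal sketch, run from the component directory:
# Show only the accumulation lines (batchAccumulate/recordAccumulate) from the metrics snapshot.
./connector_utils.sh metrics | grep -i accumulate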
Information required for latency
Screenshots of the task latency, Store latency, and Incr-Sync latency.
As shown in the preceding figure, a data migration or data synchronization task has multiple latency concepts.
1. Task latency: The Incr-Sync latency is used as the task latency of a data migration or data synchronization task. If multiple Incr-Sync latencies exist, the maximum Incr-Sync latency is used. The task latency and component latency are calculated based on different scheduling storage, so they may differ. For example, a large number of tasks or a long scheduling time may cause the task latency to be greater than the component latency. This only indicates a difference in latency time.
2. Store latency: The time difference between the current time and the time when the Store component fetches change records. This value is calculated based on polling, which typically takes 10 to 30 seconds.
3. Incr-Sync latency: The time difference between the current time and the minimum change time of the records written to the destination. This value is calculated based on polling. The polling time is the value shown in parentheses.
The metrics logs of the Incr-Sync component. For more information, see the section about how to query the metrics information.
We recommend that you package the logs and conf directories of the Incr-Sync component and provide them.
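A minimal packaging sketch, assuming you run it from the component directory (/home/ds/run/${Component ID}); the archive name is an arbitrary choice.
# Bundle the logs and conf directories of the Incr-Sync component for troubleshooting.
tar -czf incr-sync-diagnosis.tar.gz logs conf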