Parallel execution is a strategy to optimize SQL query tasks. It splits a query task into multiple subtasks and allows them to run on multiple processors in parallel to improve the execution efficiency of the whole query task.
In current computer systems, multi-core processors, multithreading, and high-speed network connections are widely used, which makes parallel execution an efficient query technology. This technology significantly reduces the response time of compute-intensive large queries, and comes in handy in fields such as batch data import/export and quick index table creation. It is widely applied in business scenarios such as offline data warehouses, real-time reports, and online big data analytics.
Parallel execution significantly improves the query performance in the following scenarios:
- Scans of or joins on large tables, and sorting or aggregation of large amounts of data
- DDL operations on large tables, such as changing the primary key or column type and creating indexes
- Table creation based on existing data, such as creating a table by using the CREATE TABLE AS SELECT statement (see the sketch after this list)
- Batch data insertion, deletion, and updates
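The following is a minimal sketch of the last two scenarios, assuming a hypothetical large orders table with an order_year column. The PARALLEL hint also appears in the example later in this article; ENABLE_PARALLEL_DML is the OceanBase Database hint for enabling parallel DML, and exact hint support can vary by version.
-- Table creation based on existing data, executed in parallel (hypothetical orders table).
CREATE TABLE orders_2023 AS
SELECT /*+ PARALLEL(8) */ * FROM orders WHERE order_year = 2023;
-- Batch deletion with parallel DML enabled.
DELETE /*+ ENABLE_PARALLEL_DML PARALLEL(8) */ FROM orders WHERE order_year < 2020;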
Application scenarios
In addition to analytical systems such as offline data warehouses, real-time reporting, and online big data analytics, parallel execution can also be used to accelerate DDL operations and batch data processing in the online transaction processing (OLTP) field.
Parallel execution makes full use of multiple CPU cores and I/O resources to reduce the SQL execution time. Parallel execution is superior to serial execution in the following circumstances:
- Large amount of data to access
- Low SQL query concurrency
- A need for low latency on large queries
- Sufficient hardware resources
Parallel execution uses multiple processors to concurrently handle the same task. This can improve the performance in the following system circumstances:
- Symmetric multiprocessing (SMP) systems and clusters
- Sufficient I/O bandwidth
- Spare memory that can be used for memory-intensive operations such as sorting and hash table creation
- A moderate system load, or a load with peak-and-valley characteristics, such as a system load that generally remains below 30%
If your system does not have the preceding characteristics, parallel execution cannot significantly improve the performance. It can result in poor performance in a system with a high load, small memory size, or insufficient I/O bandwidth.
Parallel execution does not have special requirements on the hardware. However, the number of CPU cores, memory size, storage I/O performance, and network bandwidth can affect the parallel execution performance. If any of the factors becomes a bottleneck, the overall performance can be affected.
Technical mechanism
Parallel execution splits an SQL query task into multiple subtasks and schedules these subtasks on multiple processors to improve the execution efficiency.
When a parallel execution plan is generated for an SQL query, the query is executed in the following steps:
- The main thread, which is responsible for receiving and parsing SQL queries, allocates the worker threads required for parallel execution in advance. These worker threads may be distributed across multiple servers in the cluster.
- The main thread enables the parallel execution (PX) coordinator.
- The PX coordinator parses the execution plan into multiple steps and schedules the steps from the bottom up, so that each operation can be executed in parallel whenever possible.
- After all operations are executed in parallel, the PX coordinator receives the calculation results and transfers the results to the upper-layer operator (such as the Aggregate operator) for serial execution of operations ineligible for parallel execution, such as the final SUM operation.
Granules of parallel execution
The basic working unit for parallel data scan is called a granule.
OceanBase Database divides a table scan task into multiple granules, each of which describes a part of the scan task. A granule never spans partitions: all the data covered by one granule belongs to a single partition.
Two types of granules are supported:
Partition granule
A partition granule describes a whole partition. Therefore, the number of partition granules of a scan task is equal to the number of partitions involved in the scan task. Here, a partition can be one in a primary table or an index table. Partition granules are commonly used in partition-wise joins to ensure that the corresponding partitions of the two tables are processed based on partition granules.
Block granule
A block granule describes a segment of continuous data in a partition. Generally, block granules are used to divide data in a data scan scenario. Each partition is divided into multiple blocks. The blocks are concatenated based on specific rules to form a task queue, which will be consumed by PX worker threads.
The following figure shows the technical mechanism of block granules.

Given a degree of parallelism (DOP), the optimizer automatically chooses between partition granules and block granules to balance the scan tasks. If the optimizer chooses block granules, the parallel execution framework dynamically divides the data into blocks at runtime and ensures that each block is of an appropriate size, neither too large nor too small. An excessively large block size can lead to data skew, where some threads cannot be fully utilized. An excessively small block size can lead to frequent task switching, which increases overhead.
After the granules are determined, each granule corresponds to a scan task, and the TABLE SCAN operator processes these scan tasks one by one.
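As a quick way to check which granule type was chosen, look at the iterator operator above the TABLE SCAN in the execution plan. The excerpt below is taken from the plan shown later in this article; a PX BLOCK ITERATOR indicates block granules, while a PX PARTITION ITERATOR operator in its place would indicate partition granules.
|6 |  PX BLOCK ITERATOR |     |   -- block granules: runtime block-based division
|7 |   TABLE SCAN       |game |   -- a PX PARTITION ITERATOR here would indicate partition granules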
Parallel execution model
Producer-consumer pipeline model
The producer-consumer model is used for pipelined execution.
The PX coordinator parses the execution plan into multiple steps. Each step is called a data flow operation (DFO).
Generally, the PX coordinator starts two DFOs at the same time. The two DFOs are connected in producer-consumer mode for parallel execution. This is called inter-DFO parallel execution. Each DFO is executed by a group of threads. This is called intra-DFO parallel execution. The number of threads used for a DFO is called DOP.
A consumer DFO in one phase becomes a producer DFO in the next phase. Under the coordination of the PX coordinator, the consumer DFO and producer DFO are started at the same time. The following figure shows the process in the producer-consumer model.
- The data generated by DFO A is transmitted in real time to DFO B for calculation.
- After calculation, DFO B stores the data in the current thread and waits for the upper-layer DFO C to start.
- When DFO B is notified that DFO C has been started, it switches to the producer role and starts to transmit data to DFO C. After DFO C receives the data, it starts calculation.
In the following example, the execution plan of the SELECT statement first performs a full-table scan on the game table, pre-aggregates the scores by team in each scan thread, and finally aggregates the partial sums to produce the total score of each team.
CREATE TABLE game (round INT PRIMARY KEY, team VARCHAR(10), score INT)
PARTITION BY HASH(round) PARTITIONS 3;
INSERT INTO game VALUES (1, "CN", 4), (2, "CN", 5), (3, "JP", 3);
INSERT INTO game VALUES (4, "CN", 4), (5, "US", 4), (6, "JP", 4);
SELECT /*+ PARALLEL(3) */ team, SUM(score) TOTAL FROM game GROUP BY team;
obclient> EXPLAIN SELECT /*+ PARALLEL(3) */ team, SUM(score) TOTAL FROM game GROUP BY team;
+---------------------------------------------------------------------------------------------------------+
| Query Plan |
+---------------------------------------------------------------------------------------------------------+
| ================================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ----------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |4 | |
| |1 | EXCHANGE OUT DISTR |:EX10001|1 |4 | |
| |2 | HASH GROUP BY | |1 |4 | |
| |3 | EXCHANGE IN DISTR | |3 |3 | |
| |4 | EXCHANGE OUT DISTR (HASH)|:EX10000|3 |3 | |
| |5 | HASH GROUP BY | |3 |2 | |
| |6 | PX BLOCK ITERATOR | |1 |2 | |
| |7 | TABLE SCAN |game |1 |2 | |
| ================================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(game.team, T_FUN_SUM(T_FUN_SUM(game.score)))]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(game.team, T_FUN_SUM(T_FUN_SUM(game.score)))]), filter(nil), rowset=256 |
| dop=3 |
| 2 - output([game.team], [T_FUN_SUM(T_FUN_SUM(game.score))]), filter(nil), rowset=256 |
| group([game.team]), agg_func([T_FUN_SUM(T_FUN_SUM(game.score))]) |
| 3 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256 |
| 4 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256 |
| (#keys=1, [game.team]), dop=3 |
| 5 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256 |
| group([game.team]), agg_func([T_FUN_SUM(game.score)]) |
| 6 - output([game.team], [game.score]), filter(nil), rowset=256 |
| 7 - output([game.team], [game.score]), filter(nil), rowset=256 |
| access([game.team], [game.score]), partitions(p[0-2]) |
| is_index_back=false, is_global_index=false, |
| range_key([game.round]), range(MIN ; MAX)always true |
+---------------------------------------------------------------------------------------------------------+
29 rows in set
The following figure shows the execution process of the previous example.
The figure shows that six threads are used for the query. The execution process is described as follows:
- Step 1: The first three threads are responsible for scanning the game table. They separately pre-aggregate the data by game.team.
- Step 2: The other three threads are responsible for the final aggregation of the data pre-aggregated by the first three threads.
- Step 3: The PX coordinator returns the final aggregation results to the client.
The data sent from Step 1 to Step 2 is hashed by using the game.team field to determine the thread to which the aggregated data is to be sent.
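Conceptually, with a consumer DOP of 3, the target thread for an aggregated row is hash(game.team) mod 3. All rows with team = 'CN', for example, hash to the same value and are therefore finally aggregated by the same thread, regardless of which scan thread produced them.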
Data distribution methods between the producer and the consumer
A data distribution method is the method used by a group of PX worker threads (producers) to send data to another group of worker threads (consumers). The optimizer selects the optimal data redistribution method based on a series of optimization strategies to achieve the optimal performance.
General data distribution methods in parallel execution are described as follows:
Hash distribution
In the hash distribution method, the producer hashes the data based on the distribution key and obtains the modulus to determine the consumer worker thread to which the data is to be sent. In hash distribution, data can be evenly distributed to multiple consumer worker threads.
Pkey distribution
In pkey distribution, the producer determines through calculation the partition in the target table to which a data row belongs and sends the row data to a consumer thread responsible for this partition. The pkey distribution method is commonly used in partial partition-wise joins. The data at the consumer end can be directly used for partition-wise joins with that at the producer end without redistribution, thereby reducing network communication and improving the performance.
Pkey hash distribution
In pkey hash distribution, the producer first calculates the partition in the target table to which a data row belongs and hashes the data based on the distribution key to determine the consumer thread to which the data is to be sent.
Pkey hash distribution is commonly used in parallel DML scenarios where a partition can be concurrently updated by multiple threads. Pkey hash distribution can ensure that rows with identical values are processed by the same thread and that rows with different values are evenly distributed to multiple threads.
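For illustration, the following is a hedged sketch of parallel DML against the game table defined earlier in this article; ENABLE_PARALLEL_DML is the OceanBase Database hint that enables parallel DML, though availability can vary by version. With a DOP of 4 writing into 3 partitions, more than one thread can write to the same partition, which is exactly the case that pkey hash distribution handles.
INSERT /*+ ENABLE_PARALLEL_DML PARALLEL(4) */ INTO game
SELECT round + 100, team, score FROM game;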
Broadcast distribution
In broadcast distribution, the producer sends all data rows to each consumer thread so that each consumer thread has the full data of the producer. In broadcast distribution, data is copied from small tables to all nodes involved in a join. Then, joins are executed locally to reduce network communication.
Broadcast to host distribution
In broadcast to host distribution, the producer sends all rows to each consumer node so that each consumer node has the full data of the producer. Then, the consumer threads on each node process the data in a collaborative manner.
Broadcast to host distribution is commonly used in NESTED LOOP JOIN and SHARED HASH JOIN scenarios. In a NESTED LOOP JOIN scenario, each consumer thread obtains a part of the shared data as the driver data for the join operation on the target table. In a SHARED HASH JOIN scenario, the consumer threads jointly build a hash table based on the shared data. This avoids the situation where each consumer thread independently builds a hash table identical to that of the others, thereby reducing the overhead.
Range distribution
In range distribution, the producer divides data into ranges and different consumer threads process data of different ranges. Range distribution is commonly used in sorting scenarios. Each consumer thread only needs to sort the data allocated to it. This ensures that the data is globally ordered.
Random distribution
In random distribution, the producer randomly scatters the data and sends it to the consumer threads so that each consumer thread processes an almost equal amount of data, thereby achieving load balancing. Random distribution is commonly used in multithreaded parallel UNION ALL scenarios, where data is scattered only for load balancing and the scattered data is not associated.
Hybrid hash distribution
Hybrid hash distribution is an adaptive distribution method used in join operations. Based on collected statistics, OceanBase Database provides a group of parameters to define regular values and frequent values. In hybrid hash distribution, hash distribution is used for regular values on both sides of a join, broadcast distribution is used for frequent values on the left side, and random distribution is used for frequent values on the right side.
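The optimizer normally chooses the distribution method automatically. OceanBase Database also provides the PQ_DISTRIBUTE hint to override the choice for a join. The sketch below assumes hypothetical tables t1 and t2 joined on id; exact hint syntax and semantics can vary by version.
-- Broadcast the right-side table t2 to all consumer threads; t1 stays local.
SELECT /*+ PARALLEL(4) PQ_DISTRIBUTE(t2 BROADCAST NONE) */ *
FROM t1 JOIN t2 ON t1.id = t2.id;
-- Hash-redistribute both sides on the join key.
SELECT /*+ PARALLEL(4) PQ_DISTRIBUTE(t2 HASH HASH) */ *
FROM t1 JOIN t2 ON t1.id = t2.id;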
Data transmission mechanism between the producer and the consumer
The two DFOs concurrently started by the PX coordinator are connected in producer-consumer mode for parallel execution. A transmission network is required for transmitting data between the producer and the consumer.
For example, if the producer DFO uses two threads (DOP = 2) for data scan and the consumer DFO uses three threads (DOP = 3) for data aggregation, each producer thread creates three virtual links to the consumer threads, so a total of six virtual links are created. The following figure shows the transmission mechanism.

The created virtual transmission network is called the data transfer layer (DTL). In the parallel execution framework of OceanBase Database, all control messages and data rows are transmitted over the DTL. Each worker thread can establish thousands of virtual links, providing high scalability. The DTL also provides features such as data buffering, batch data sending, and automatic throttling.
If the two ends of the DTL are on the same node, the DTL transfers messages through memory copy. If the two ends of the DTL are on different nodes, the DTL transfers messages through network communication.
Worker threads
Two types of threads are used in parallel queries: one main thread and multiple PX worker threads. The main thread uses the same thread pool as normal transaction processing (TP) queries. PX worker threads come from a dedicated thread pool.
OceanBase Database uses a dedicated thread pool model to allocate PX worker threads. Each tenant has a dedicated PX thread pool on each of its nodes. All PX worker threads are allocated from this thread pool.
Before the PX coordinator schedules each DFO, it requests threads from the thread pool. After a DFO is executed, the threads for the DFO are immediately released.
The initial size of the thread pool is 0. The pool can be dynamically scaled out without an upper limit. To avoid excessive idle threads, the thread pool uses an automatic reclamation mechanism. For any thread:
- If the thread is left idle for more than 10 minutes and the number of remaining threads in the thread pool exceeds 8, the thread will be reclaimed and destroyed.
- If the thread is left idle for more than 60 minutes, it will be destroyed unconditionally.
Theoretically, the thread pool has no upper limit in size. However, the following mechanisms actually contribute to an upper limit:
- Threads must be requested from the Admission module before parallel execution. Parallel execution can start only after threads are successfully requested. This mechanism can limit the number of parallel queries. For more information about the Admission module, see Concurrency control and queuing.
- At most N threads can be requested from the resource pool at a time, where N = value of MIN_CPU in the unit config of the tenant's resource units × value of px_workers_per_cpu_quota. At most N threads are allocated even if more than N threads are requested. px_workers_per_cpu_quota is a tenant-level parameter that defaults to 10. For example, assume that the DOP of a DFO is 100 and the DFO needs to request 30 threads from node A and 70 threads from node B. If MIN_CPU in the unit config is 4 and px_workers_per_cpu_quota is 10, then N = 4 × 10 = 40. The DFO can actually request 30 threads from node A and 40 threads from node B, so its actual DOP is 70. You can check the parameter value as shown after this list.
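A minimal sketch of that check, assuming a MySQL-mode connection (where the SHOW PARAMETERS statement is available):
obclient> SHOW PARAMETERS LIKE 'px_workers_per_cpu_quota';
-- With MIN_CPU = 4 in the tenant's unit config and the default value 10:
-- N = MIN_CPU × px_workers_per_cpu_quota = 4 × 10 = 40 threads per request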
Performance optimization through load balancing
To achieve the optimal performance, allocate the same number of tasks to each worker thread as far as possible.
If data is divided based on block granules, the tasks are dynamically allocated to worker threads. This can minimize the imbalance in workloads. In other words, the workload of each worker thread does not obviously exceed those of others.
If data is divided based on partition granules, you can optimize the performance by ensuring that the number of tasks is an integral multiple of the number of worker threads. This is very useful for partition-wise joins and parallel DML.
Assume that a table has 16 partitions and that the amount of data in each partition is almost the same. You can use 16 worker threads (DOP = 16) to finish the job in 1/16 of the time otherwise required, eight worker threads to finish the job in 1/8 of the time, or two threads to finish the job in half the time. However, if you use 15 worker threads to process the data of 16 partitions, the first thread to finish will start to process the data of the 16th partition, while the other threads become idle after they finish processing their respective partitions. If the amount of data in each partition is similar, this configuration results in poor performance. If the amount of data in each partition varies, the actual performance depends on the situation.
Similarly, assume that you use six threads to process the data of 16 partitions and that the partitions contain similar amounts of data.
Each thread will start to process the data of a second partition after it finishes processing the data of the first partition. However, only four threads will process the data of a third partition while the other two threads will become idle.
Given N partitions and P worker threads, you cannot simply calculate the time required for parallel execution by dividing N by P. You need to consider the situation where some threads may need to wait for other threads to complete data processing for the last partition. You can specify an appropriate DOP to minimize the imbalance in workloads and optimize the performance.
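As a rough model under these assumptions (N partitions of similar size, P worker threads, partition granules), the elapsed time is proportional to ceil(N / P) rounds of partition processing rather than N / P. For N = 16: P = 16 gives 1 round, P = 8 gives 2 rounds, P = 15 also gives 2 rounds, and P = 6 gives 3 rounds. This is why 15 threads perform no better than 8 in this example.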
Inapplicable scenarios
Parallel execution is inapplicable in the following scenarios:
Typical SQL queries in the system are executed within milliseconds.
A parallel query has a millisecond-level scheduling overhead. For a short query, the benefit of parallel execution may be neutralized by the scheduling overhead.
The system load is high.
Parallel execution is designed to make full use of idle system resources. For a system with a high load, parallel execution may fail to bring extra benefits but compromise the overall system performance.
In serial execution, a single thread is used to execute database operations. Serial execution is preferred to parallel execution in the following circumstances:
- A small amount of data to access
- High concurrency
- A query execution time less than 100 ms
Parallel execution is partially inapplicable in the following circumstances:
- The top-layer DFO does not require parallel execution. It interacts with the client and executes top-layer operations that do not require parallel execution, such as LIMIT and PX COORDINATOR operations.
- A DFO that contains a TABLE user-defined function (UDF) can only be executed serially. Other DFOs can still be executed in parallel.
- Parallel execution is inapplicable to ordinary SELECT and DML statements in an OLTP system.