Parallel execution is an optimization strategy for SQL query tasks. It breaks down a query task into multiple subtasks that run simultaneously on multiple processor cores, thereby accelerating the entire query task.
Modern computer systems widely use multi-core processors, multi-threading, and high-speed network connections, making parallel execution an efficient query technique. This technique significantly reduces the response time of complex queries and applies to business scenarios such as offline data warehouses, real-time reporting, and online big data analysis. It also accelerates tasks such as batch data transfer and rapid index creation.
Parallel execution significantly enhances the performance of the following SQL query scenarios:
- Large table scans, large table joins, large-scale sorting, and aggregation
- DDL operations, such as modifying primary keys, changing column types, and creating indexes, on large tables
- Table creation from a large amount of data by using CREATE TABLE AS SELECT (see the sketch after this list)
- Batch data insertion, deletion, and updating
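As a hedged sketch of the CREATE TABLE AS SELECT case, the PARALLEL hint (the same hint used in the example later in this topic) can request a degree of parallelism for the populating query. All table and column names here are hypothetical:

-- Hypothetical source table orders; build a summary table with a DOP of 8.
CREATE TABLE order_stats AS
SELECT /*+ PARALLEL(8) */ customer_id, SUM(amount) AS total_amount
FROM orders
GROUP BY customer_id;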
Scenarios
Parallel execution applies to not only analytical systems such as offline data warehouses, real-time reporting, and online big data analytics but also OLTP scenarios to speed up DDL operations and batch data processing.
Parallel execution aims to reduce SQL execution time by making full use of multiple CPU and I/O resources. When the following conditions are met, parallel execution outperforms serial execution:
- A large amount of data is accessed.
- The concurrency of SQL queries is low.
- The system has sufficient hardware resources.
- The requirement for latency is not high.
In a parallel execution scenario, multiple CPU cores in a system or cluster concurrently process the same task. This can improve performance if the following system conditions are met:
- The system is a symmetric multiprocessing (SMP) system or a cluster.
- The I/O bandwidth is sufficient.
- The system has abundant memory for memory-intensive operations, such as sorting and hash table construction.
- The system load is moderate or fluctuates within a small range (for example, the system load is usually below 30%).
If the system does not meet the preceding conditions, parallel execution might not significantly improve performance. In highly loaded systems with limited memory or I/O capabilities, parallel execution might even perform worse.
Parallel execution has no special hardware requirements. However, the CPU core count, memory size, storage I/O performance, and network bandwidth can affect parallel execution performance. If any of these becomes a bottleneck, the overall performance will be compromised.
Principle
Parallel execution decomposes an SQL query task into multiple subtasks and schedules them onto multiple processors for efficient concurrent execution.
Once an SQL query is parsed into a parallel execution plan, the execution process follows these steps:
- The SQL main thread (the thread responsible for receiving and parsing the SQL statement) pre-allocates the thread resources required for parallel execution according to the shape of the plan. These thread resources may span multiple machines in the cluster.
- The SQL main thread enables the parallel scheduling operator (PX Coordinator).
- The parallel scheduling operator parses the plan, breaks it down into multiple operation steps, and schedules these steps in a bottom-up order, so that each step is executed with as much parallelism as possible.
- After all operations are executed in parallel, the parallel scheduling operator receives the computation results and passes them to the upper-level operator (e.g., the Aggregate operator) to serially complete the remaining non-parallelizable computations (e.g., the final SUM calculation).
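The steps above can be observed with EXPLAIN: the PX COORDINATOR operator at the root of a parallel plan is the parallel scheduling operator described here. A minimal sketch, assuming a hypothetical table t1:

EXPLAIN SELECT /*+ PARALLEL(2) */ COUNT(*) FROM t1;

The full plan of a similar query is analyzed in the example later in this topic.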
Granule
In parallel data scanning, the basic unit of work is called a Granule.
OceanBase Database divides a table scan task into multiple granules, with each granule defining the scope of the scan task. Since each granule covers the data of only one partition, the data in each partition corresponds to one independent scan task. In other words, each granule corresponds to a scan task within a partition.
Based on granularity, granules fall into the following two categories:
Partition Granule
A partition granule describes a complete partition. If a scan task involves m partitions, it is divided into m partition granules, regardless of whether these partitions belong to a data table or an index table. Partition granules are most commonly used in partition-wise joins to ensure that the corresponding partitions of the two joined tables are processed together.
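As a sketch, the following pair of tables is partitioned in the same way on the join key, so the optimizer may perform a partition-wise join in which each partition granule of one table is joined with the corresponding partition granule of the other. All names are hypothetical, and whether a partition-wise join is actually chosen depends on data location and statistics:

CREATE TABLE orders (order_id INT PRIMARY KEY, amount INT)
    PARTITION BY HASH(order_id) PARTITIONS 4;
CREATE TABLE order_items (item_id INT, order_id INT, price INT,
    PRIMARY KEY (item_id, order_id))
    PARTITION BY HASH(order_id) PARTITIONS 4;

SELECT /*+ PARALLEL(4) */ o.order_id, SUM(i.price)
FROM orders o JOIN order_items i ON o.order_id = i.order_id
GROUP BY o.order_id;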
Block Granule
A block granule describes a continuous range of data in a partition. In a data scan scenario, block granules are usually used to divide data. A partition is divided into several blocks, and these blocks are connected in a queue based on a specific rule. Parallel execution threads consume data in the queue.
The following figure illustrates the concept of block granules.

The optimizer automatically chooses whether to divide data into partition granules or block granules based on the specified degree of parallelism. If block granules are chosen, the runtime system will dynamically divide a block into sub-blocks to ensure that the sub-blocks are neither too large nor too small. A sub-block that is too large can cause data skew and lead to insufficient work for some threads. A sub-block that is too small results in frequent switchovers for scans, causing high overheads.
Once the granules (partition granules or block granules) are determined, a scan task is generated for each granule. The table scan operator processes these scan tasks sequentially until all of them are completed.
Parallel execution model
Producer-consumer pipeline model
Parallel execution is implemented by using the producer-consumer pipeline model.
After a parallel scheduling operator parses an execution plan, the plan is divided into multiple operation steps. Each operation step is called a DFO (Data Flow Operation).
Usually, the parallel scheduling operator starts two DFOs simultaneously. These two DFOs are connected in a producer-consumer manner to enable parallel execution between them. Each DFO uses a group of threads to execute its tasks, which is called intra-DFO parallel execution. The number of threads used by a DFO is called the degree of parallelism (DOP).
In the next stage, the consumer DFO becomes the producer DFO. Under the coordination of the parallel scheduling operator, the consumer DFO and the producer DFO are started simultaneously. The following figure shows the process of the DFOs in the producer-consumer model.
- The data generated by DFO A is transmitted to DFO B for processing in real time.
- After DFO B completes the processing, the data is stored in the current thread and waits for the upstream DFO C to start.
- After receiving the start notification from DFO C, DFO B changes its role to a producer and starts transmitting data to DFO C. After receiving the data, DFO C starts processing it.

In the following query example, the execution plan of the SELECT statement first scans the entire game table, then groups the data by the team column and sums the score values, and finally returns the total score of each team.
CREATE TABLE game (round INT PRIMARY KEY, team VARCHAR(10), score INT)
PARTITION BY HASH(round) PARTITIONS 3;
INSERT INTO game VALUES (1, "CN", 4), (2, "CN", 5), (3, "JP", 3);
INSERT INTO game VALUES (4, "CN", 4), (5, "US", 4), (6, "JP", 4);
SELECT /*+ PARALLEL(3) */ team, SUM(score) TOTAL FROM game GROUP BY team;
obclient> EXPLAIN SELECT /*+ PARALLEL(3) */ team, SUM(score) TOTAL FROM game GROUP BY team;
+---------------------------------------------------------------------------------------------------------+
| Query Plan |
+---------------------------------------------------------------------------------------------------------+
| ================================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ----------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |4 | |
| |1 | EXCHANGE OUT DISTR |:EX10001|1 |4 | |
| |2 | HASH GROUP BY | |1 |4 | |
| |3 | EXCHANGE IN DISTR | |3 |3 | |
| |4 | EXCHANGE OUT DISTR (HASH)|:EX10000|3 |3 | |
| |5 | HASH GROUP BY | |3 |2 | |
| |6 | PX BLOCK ITERATOR | |1 |2 | |
| |7 | TABLE SCAN |game |1 |2 | |
| ================================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(game.team, T_FUN_SUM(T_FUN_SUM(game.score)))]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(game.team, T_FUN_SUM(T_FUN_SUM(game.score)))]), filter(nil), rowset=256 |
| dop=3 |
| 2 - output([game.team], [T_FUN_SUM(T_FUN_SUM(game.score))]), filter(nil), rowset=256 |
| group([game.team]), agg_func([T_FUN_SUM(T_FUN_SUM(game.score))]) |
| 3 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256 |
| 4 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256 |
| (#keys=1, [game.team]), dop=3 |
| 5 - output([game.team], [T_FUN_SUM(game.score)]), filter(nil), rowset=256 |
| group([game.team]), agg_func([T_FUN_SUM(game.score)]) |
| 6 - output([game.team], [game.score]), filter(nil), rowset=256 |
| 7 - output([game.team], [game.score]), filter(nil), rowset=256 |
| access([game.team], [game.score]), partitions(p[0-2]) |
| is_index_back=false, is_global_index=false, |
| range_key([game.round]), range(MIN ; MAX)always true |
+---------------------------------------------------------------------------------------------------------+
29 rows in set
The following diagram shows the execution plan of the preceding query example:

As shown in the figure, the query actually uses six threads. The execution process and task allocation are as follows:
- Step 1: The first three threads scan the game table and pre-aggregate the data by game.team in each thread.
- Step 2: The last three threads aggregate the pre-aggregated data.
- Step 3: The aggregated result is returned to the client by the parallel scheduler.
When the data from Step 1 is transmitted to Step 2, it is hashed by using the game.team field to determine the threads that receive the pre-aggregated data.
Data distribution between producers and consumers
Data distribution refers to the method used to send data from a group of worker threads (producers) that execute in parallel to another group of worker threads (consumers). The optimizer selects the optimal data distribution method to achieve the best performance by using a series of optimization strategies.
Common data distribution methods in parallel execution include:
HASH DISTRIBUTION
In HASH DISTRIBUTION, the producer threads hash and modulo the data rows based on the distribution key to determine the consumer threads to send the data rows to. Generally, HASH DISTRIBUTION can distribute data relatively evenly to multiple consumer threads.
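The following sketch only illustrates the idea of hashing plus modulo on the distribution key, using the game table from the earlier example; CRC32 is a stand-in for the internal hash function, not the actual implementation:

-- Conceptually map each distribution key value to one of 3 consumer threads (DOP = 3).
SELECT team, MOD(CRC32(team), 3) AS consumer_thread
FROM game
GROUP BY team;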
PKEY DISTRIBUTION
In PKEY DISTRIBUTION, the producer threads calculate the partitions of the target tables for the data rows and send the data rows to the consumer threads that handle the partitions. PKEY DISTRIBUTION is commonly used in partial partition-wise join scenarios. In this scenario, the data does not need to be redistributed to the consumer side but can be partition-wise joined with the data from the producer side. This reduces the amount of network communication and improves the performance.
PKEY HASH DISTRIBUTION
In PKEY HASH DISTRIBUTION, the producer threads calculate the partitions of the target tables for the data rows and then hash the data rows based on the distribution key to determine the consumer threads to send the data rows to.
PKEY HASH DISTRIBUTION is commonly used in parallel DML scenarios. In such scenarios, multiple threads can concurrently update a partition. Therefore, the PKEY HASH DISTRIBUTION method is used to ensure that data rows with the same value are processed by the same thread, and those with different values are distributed to different threads as evenly as possible.
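A hedged sketch of a parallel DML statement that can trigger this kind of distribution, assuming the ENABLE_PARALLEL_DML hint is available in your version and a hypothetical game_archive table with the same schema and partitioning as game:

-- Batch-insert into a partitioned table with a DOP of 3.
INSERT /*+ ENABLE_PARALLEL_DML PARALLEL(3) */ INTO game_archive
SELECT round, team, score FROM game;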
BROADCAST DISTRIBUTION
In BROADCAST DISTRIBUTION, the producer threads send each data row to all consumer threads, ensuring that each consumer thread has all the data from the producer side. BROADCAST DISTRIBUTION is commonly used to copy data from a small table to all nodes that will perform the join operation and then execute the join operation locally on each node to reduce the amount of network communication.
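A typical candidate is a join between a large fact table and a small dimension table, where the small side may be broadcast to every node that performs the join. The names below are hypothetical; the distribution method actually chosen by the optimizer is visible in the EXCHANGE OUT DISTR operators of the plan, as in the earlier example:

EXPLAIN SELECT /*+ PARALLEL(4) */ f.item_id, d.item_name, SUM(f.amount)
FROM fact_sales f JOIN dim_item d ON f.item_id = d.item_id
GROUP BY f.item_id, d.item_name;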
BC2HOST DISTRIBUTION
In BC2HOST DISTRIBUTION, the producer threads send each data row to all consumer nodes, ensuring that each consumer node has all the data from the producer side. Then, the consumer threads in the nodes work together to process the data.
BC2HOST DISTRIBUTION is commonly used in NESTED LOOP JOIN and SHARED HASH JOIN scenarios. In the NESTED LOOP JOIN scenario, each consumer thread obtains a part of the shared data as the driving data and uses it to perform the join operation on the target table. In the SHARED HASH JOIN scenario, each consumer thread builds a hash table based on the shared data. This way, multiple threads can build one hash table together to avoid the unnecessary overhead of building the same hash table for each thread.
RANGE DISTRIBUTION
In RANGE DISTRIBUTION, the producer threads divide the data into segments based on ranges, and each consumer thread processes the data in one range. RANGE DISTRIBUTION is commonly used in sorting scenarios: each consumer thread sorts only the data allocated to it, and the sorted ranges together remain globally ordered.
RANDOM DISTRIBUTION
In RANDOM DISTRIBUTION, the producer threads randomly scatter the data and send it to the consumer threads to ensure that each consumer thread handles nearly the same amount of data. This achieves load balancing. RANDOM DISTRIBUTION is commonly used in multi-threaded parallel UNION ALL scenarios. In such scenarios, it is necessary to scatter the data and achieve load balancing, but the data rows do not need to be ordered or associated.
HYBRID HASH DISTRIBUTION
HYBRID HASH DISTRIBUTION is used in adaptive join algorithms. Based on collected statistics, OceanBase Database provides a set of configuration parameters that define regular values and frequent (skewed) values. In this method, regular values on both sides of the join are hash-distributed, frequent values on the left side are broadcast, and frequent values on the right side are randomly distributed.

Data transmission between producers and consumers
At any given time, the DFOs scheduled simultaneously by the same parallel scheduling operator are connected in a producer-consumer manner for parallel execution. To transmit data between producers and consumers, a data transmission network needs to be established.
For example, suppose a data scan DFO uses DOP = 2 and a data aggregation DFO uses DOP = 3. In this case, each of the two producer threads creates three virtual links to connect to the consumer threads, resulting in a total of six virtual links. The following figure illustrates this scenario.

The virtual transmission network created is called the data transfer layer (DTL). In the parallel execution framework of OceanBase Database, all control messages and row data are transmitted through the DTL. Each worker thread can establish thousands of virtual links, demonstrating the high scalability of the DTL. Additionally, the DTL supports features such as data buffering, batch data transmission, and automatic flow control.
When the two ends of the DTL are on the same node, the DTL transmits messages through memory copying. If the two ends of the DTL are on different nodes, the DTL transmits messages through network communication.
Worker threads
In a parallel query, there are two types of threads: a main thread and multiple worker threads. The main thread runs in the thread pool that is also used for ordinary TP queries, whereas the worker threads run in a dedicated thread pool.
OceanBase Database uses a dedicated thread pool to allocate worker threads for parallel queries. Each tenant has a dedicated parallel execution thread pool on each node it belongs to. All worker threads for parallel queries are allocated from these thread pools.
Before a parallel scheduling operator schedules each DFO, it requests thread resources from the thread pool. After the DFO executes, the thread resources are immediately released.
The initial size of the thread pool is 0 and it can grow dynamically without an upper limit. To avoid excessive idle threads, the thread pool has an automatic thread recycling mechanism. For any thread:
- If it has been idle for more than 10 minutes and the thread pool still has more than 8 threads, the thread is recycled and destroyed.
- If it has been idle for more than 60 minutes, the thread is unconditionally destroyed.
Although the thread pool size has no hard upper limit, the following two mechanisms usually cap its actual size:
- Before parallel execution starts, you must use the Admission module to reserve thread resources. You can only start execution after the reservation succeeds. This mechanism limits the number of concurrent queries. For more information, see Concurrency control and queuing.
- Each time threads are requested from the thread pool, at most N threads can be allocated for a single request, where N is the MIN_CPU of the unit multiplied by the px_workers_per_cpu_quota tenant parameter (default value: 10). If more than N threads are requested, only N are allocated. For example, assume that a DFO with a DOP of 100 needs to request 30 threads from node A and 70 threads from node B. If the MIN_CPU of the unit is 4 and px_workers_per_cpu_quota is 10, then N = 4 × 10 = 40. The DFO therefore obtains 30 threads from node A but only 40 threads from node B, and its actual DOP is 70. A sketch of how to check and adjust this parameter follows this list.
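As a hedged sketch, the tenant parameter can be inspected and adjusted with statements like the following; 10 is the default mentioned above, and the value should be changed only after evaluating the tenant's CPU headroom:

SHOW PARAMETERS LIKE 'px_workers_per_cpu_quota';
ALTER SYSTEM SET px_workers_per_cpu_quota = 10;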
Optimize performance by load balancing
To achieve the best performance, the tasks assigned to all worker threads should be as equal as possible.
When SQL statements use block granularity to divide tasks, the system dynamically allocates tasks to worker threads. This minimizes workload imbalance, ensuring that no worker thread is significantly overloaded.
When SQL statements use partition granularity to divide tasks, you can optimize performance by setting the number of tasks to an integer multiple of the number of worker threads. This approach is useful in partition-wise join and parallel DML scenarios.
Assume that a table has 16 partitions with approximately equal amounts of data. You can use 16 worker threads (a DOP of 16) to complete the work in about 1/16 of the time, four worker threads (a DOP of 4) in about 1/4 of the time, or two worker threads (a DOP of 2) in about half the time. However, if you use 15 worker threads to process the 16 partitions, the first thread that finishes its partition moves on to the 16th partition, while the other threads become idle after completing their work. If the amount of data in each partition is similar, this configuration results in poor performance; if it varies, the actual performance varies accordingly.
Similarly, assume that you use six worker threads to process 16 partitions with similar amounts of data in each partition. After each thread finishes its first partition, it moves on to a second one. In the third round, however, only four threads have partitions left to process, and the other two threads remain idle.
Generally, you cannot assume that processing N partitions in parallel with P worker threads takes 1/P of the serial time, because some threads may have to wait while others finish the last partitions. However, by selecting an appropriate DOP, you can minimize workload imbalance and optimize performance.
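The intuition behind choosing the DOP can be checked with a small worked calculation: with partition granules, the work proceeds in roughly CEIL(N/P) waves, and MOD(N, P) tells how many partitions remain for the last wave (0 means the partitions divide evenly and no thread idles). A minimal sketch for the 16-partition example above, ignoring skew between partitions:

SELECT dop,
       CEIL(16 / dop) AS waves,
       MOD(16, dop)   AS partitions_in_last_wave  -- 0 means every thread stays busy
FROM (SELECT 16 AS dop UNION ALL SELECT 15 UNION ALL SELECT 8
      UNION ALL SELECT 6 UNION ALL SELECT 4 UNION ALL SELECT 2) t;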
Scenarios where parallel execution is not recommended
Parallel execution is generally not recommended in the following scenarios:
Typical SQL queries take milliseconds to execute in your system.
Parallel queries have scheduling overheads measured in milliseconds. For short queries, the scheduling overheads can offset the benefits of parallel execution.
Your system is under high load.
The design goal of parallel execution is to make full use of idle system resources. If the system itself has no idle resources, parallel execution is unlikely to improve performance and may even degrade overall system performance.
Serial execution uses a single thread to perform database operations. It outperforms parallel execution in the following scenarios:
- The query accesses a small amount of data.
- High concurrency is required.
- The query execution time is less than 100 ms.
Parallel execution cannot be enabled for the following scenarios:
- The top-level DFO does not require parallel execution. It is responsible for interactions with the client and performs operations that do not require parallel execution at the top level, such as LIMIT and PX COORDINATOR.
- A DFO containing a TABLE user-defined function (UDF) must be executed serially, but other DFOs can still be executed in parallel.
- Parallel execution is not supported for ordinary SELECT and DML statements in OLTP systems.