Parallel execution involves the division of a large task into multiple small tasks, which are processed in parallel by multiple threads or processes. Parallel execution uses more CPU and I/O resources to reduce the response time.
Parallel execution covers parallel queries, parallel DDL operations, and parallel DML operations.
You can run parallel queries in the following scenarios:
- You use the PARALLEL hint to specify the degree of parallelism (DOP) for a query and execute this query in parallel.
- The system automatically runs a parallel query on a partitioned table that has more than one partition.
Run a parallel query on a partitioned table
When a query needs to access more than one partition of a partitioned table, the system automatically runs it as a parallel query. By default, the dop value for such a query is 1.
In the following example, a partitioned table ptable is created, and a full table scan is performed on the entire table. You can execute the EXPLAIN statement to view the execution plan generated by the optimizer.
obclient> CREATE TABLE PTABLE(c1 INT , c2 INT) PARTITION BY HASH(c1) PARTITIONS 16;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT * FROM ptable;
Query Plan:
=======================================================
|ID|OPERATOR |NAME |EST. ROWS|COST |
-------------------------------------------------------
|0 |EXCHANGE IN DISTR | |1600000 |1246946|
|1 | EXCHANGE OUT DISTR |:EX10000|1600000 |1095490|
|2 | PX PARTITION ITERATOR| |1600000 |1095490|
|3 | TABLE SCAN |ptable |1600000 |1095490|
=======================================================
Outputs & filters:
-------------------------------------
0 - output([ptable.c1], [ptable.c2]), filter(nil)
1 - output([ptable.c1], [ptable.c2]), filter(nil), dop=1
2 - output([ptable.c1], [ptable.c2]), filter(nil)
3 - output([ptable.c1], [ptable.c2]), filter(nil),
access([ptable.c1], [ptable.c2]), partitions(p[0-15])
The execution plan shows that the default dop for a parallel query is 1 for a partitioned table. Assume that your OceanBase cluster has three OBServer nodes, and the table ptable has 16 partitions scattered on these three OBServer nodes. Each OBServer node will start a worker thread to scan data in these partitions, which means a total of three worker threads are started.
When you query a partitioned table, you can add the PARALLEL hint to the query to set the dop value explicitly. Then, you can execute the EXPLAIN statement to view the execution plan generated by the optimizer.
obclient> EXPLAIN SELECT /*+ PARALLEL(8) */ * FROM ptable;
Query Plan:
=======================================================
|ID|OPERATOR |NAME |EST. ROWS|COST |
-------------------------------------------------------
|0 |EXCHANGE IN DISTR | |1600000 |1246946|
|1 | EXCHANGE OUT DISTR |:EX10000|1600000 |1095490|
|2 | PX PARTITION ITERATOR| |1600000 |1095490|
|3 | TABLE SCAN |ptable |1600000 |1095490|
=======================================================
Outputs & filters:
-------------------------------------
0 - output([ptable.c1], [ptable.c2]), filter(nil)
1 - output([ptable.c1], [ptable.c2]), filter(nil), dop=8
2 - output([ptable.c1], [ptable.c2]), filter(nil)
3 - output([ptable.c1], [ptable.c2]), filter(nil),
access([ptable.c1], [ptable.c2]), partitions(p[0-15])
In this execution plan, the dop value for the parallel query is 8. If the number of OBServer nodes where the queried partitions are located is less than or equal to the dop value, the worker threads are assigned to these OBServer nodes based on the specified rule. The total number of worker threads is equal to the dop value. If the number of queried OBServer nodes is greater than the dop value, each OBServer node starts at least one worker thread. Therefore, the total number of worker threads will be greater than the dop value.
For example, if dop = 8 and 16 partitions are evenly distributed on four OBServer nodes, each OBServer node starts two worker threads to scan the corresponding partitions (a total of eight worker threads are started). If 16 partitions are evenly distributed on 16 OBServer nodes (one partition for a node), each OBServer node starts one worker thread to scan the corresponding partition (a total of 16 worker threads are started).
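The two cases above can be checked with simple arithmetic. As a rough model that matches these examples (an assumption for illustration, not the scheduler's actual implementation), the total number of worker threads is at least the larger of dop and the number of OBServer nodes that hold the queried partitions. The statement below uses GREATEST purely as a calculator and assumes a MySQL-mode tenant, where SELECT without a FROM clause is allowed:

```sql
-- Rough model (assumption): total worker threads = GREATEST(dop, node count).
SELECT GREATEST(8, 4)  AS workers_on_4_nodes,   -- dop = 8, partitions on 4 nodes
       GREATEST(8, 16) AS workers_on_16_nodes;  -- dop = 8, partitions on 16 nodes
```

With dop = 8, this gives 8 threads for four nodes and 16 threads for 16 nodes, in line with the example.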
If a query on a partitioned table needs to access zero or one partition, the system does not run a parallel query. The following example adds a filter condition c1 = 1 to the query on ptable.
obclient> EXPLAIN SELECT * FROM ptable WHERE c1 = 1;
Query Plan:
======================================
|ID|OPERATOR |NAME |EST. ROWS|COST |
--------------------------------------
|0 |TABLE SCAN|ptable|990 |85222|
======================================
Outputs & filters:
-------------------------------------
0 - output([ptable.c1], [ptable.c2]), filter([ptable.c1 = 1]),
access([ptable.c1], [ptable.c2]), partitions(p1)
The plan indicates that only one partition needs to be queried, so the system does not run a parallel query. To run a parallel query on a single partition, you must add the PARALLEL hint to the query. Then, you can execute the EXPLAIN statement to view the execution plan generated by the optimizer.
obclient> EXPLAIN SELECT /*+ PARALLEL(8) */ * FROM ptable WHERE c1 = 1;
Query Plan:
=================================================
|ID|OPERATOR |NAME |EST. ROWS|COST |
-------------------------------------------------
|0 |EXCHANGE IN DISTR | |990 |85316|
|1 | EXCHANGE OUT DISTR|:EX10000|990 |85222|
|2 | PX BLOCK ITERATOR| |990 |85222|
|3 | TABLE SCAN |ptable |990 |85222|
=================================================
Outputs & filters:
-------------------------------------
0 - output([ptable.c1], [ptable.c2]), filter(nil)
1 - output([ptable.c1], [ptable.c2]), filter(nil), dop=8
2 - output([ptable.c1], [ptable.c2]), filter(nil)
3 - output([ptable.c1], [ptable.c2]), filter([ptable.c1 = 1]),
access([ptable.c1], [ptable.c2]), partitions(p1)
Notice
- To run a query in parallel on a single partition by specifying the PARALLEL hint in the query, you must set the dop value of the query to a value greater than or equal to 2.
- If the dop value of a query is null or less than 2, the system does not run a parallel query.
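To see the effect of this threshold, you could compare the plans for a dop value of 1 and a dop value of 2 on the same single-partition query. This is only a sketch that reuses ptable from the earlier examples. The plans are not reproduced here, but given the requirement that dop be at least 2, the first statement would be expected to produce a plain TABLE SCAN and the second a plan with EXCHANGE operators.

```sql
-- dop below 2: expected to stay a non-parallel plan
EXPLAIN SELECT /*+ PARALLEL(1) */ * FROM ptable WHERE c1 = 1;

-- dop of 2: the smallest value expected to give a parallel plan
EXPLAIN SELECT /*+ PARALLEL(2) */ * FROM ptable WHERE c1 = 1;
```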
Run a parallel query on a non-partitioned table
A non-partitioned table is essentially a partitioned table with one partition. Therefore, you can run a parallel query on a non-partitioned table only by adding the PARALLEL hint to the query.
In the following example, a non-partitioned table stable is created, and a full table scan is performed on the stable table. You can execute the EXPLAIN statement to view the execution plan generated by the optimizer.
obclient> CREATE TABLE stable(c1 INT, c2 INT);
Query OK, 0 rows affected
obclient> EXPLAIN SELECT * FROM stable;
Query Plan:
======================================
|ID|OPERATOR |NAME |EST. ROWS|COST |
--------------------------------------
|0 |TABLE SCAN|stable|100000 |68478|
======================================
Outputs & filters:
-------------------------------------
0 - output([stable.c1], [stable.c2]), filter(nil),
access([stable.c1], [stable.c2]), partitions(p0)
The execution plan shows that the system does not run a parallel query on a non-partitioned table if no hint is added to the query.
You can add the PARALLEL hint to a query and set its dop value to a value greater than or equal to 2 to run a parallel query on the non-partitioned table. Then, you can execute the EXPLAIN statement to view the execution plan generated by the optimizer.
obclient> EXPLAIN SELECT /*+ PARALLEL(4)*/ * FROM stable;
Query Plan:
=================================================
|ID|OPERATOR |NAME |EST. ROWS|COST |
-------------------------------------------------
|0 |EXCHANGE IN DISTR | |100000 |77944|
|1 | EXCHANGE OUT DISTR|:EX10000|100000 |68478|
|2 | PX BLOCK ITERATOR| |100000 |68478|
|3 | TABLE SCAN |stable |100000 |68478|
=================================================
Outputs & filters:
-------------------------------------
0 - output([stable.c1], [stable.c2]), filter(nil)
1 - output([stable.c1], [stable.c2]), filter(nil), dop=4
2 - output([stable.c1], [stable.c2]), filter(nil)
3 - output([stable.c1], [stable.c2]), filter(nil),
access([stable.c1], [stable.c2]), partitions(p0)
Run a parallel query on multiple tables
Among queries on multiple tables, JOIN queries are the most common.
In the following example, partitioned tables p1table and p2table are created:
obclient> CREATE TABLE p1table(c1 INT ,c2 INT) PARTITION BY HASH(c1) PARTITIONS 2;
Query OK, 0 rows affected
obclient> CREATE TABLE p2table(c1 INT ,c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
Query OK, 0 rows affected
Join p1table and p2table on the condition p1table.c1=p2table.c2. The following example shows the execution plan for the query:
obclient> EXPLAIN SELECT * FROM p1table p1 JOIN p2table p2 ON p1.c1=p2.c2;
Query Plan:
====================================================================
|ID|OPERATOR |NAME |EST. ROWS|COST |
--------------------------------------------------------------------
|0 |EXCHANGE IN DISTR | |784080000|614282633|
|1 | EXCHANGE OUT DISTR |:EX10001|784080000|465840503|
|2 | HASH JOIN | |784080000|465840503|
|3 | EXCHANGE IN DISTR | |200000 |155887 |
|4 | EXCHANGE OUT DISTR (BROADCAST)|:EX10000|200000 |136955 |
|5 | PX PARTITION ITERATOR | |200000 |136955 |
|6 | TABLE SCAN |p1 |200000 |136955 |
|7 | PX PARTITION ITERATOR | |400000 |273873 |
|8 | TABLE SCAN |p2 |400000 |273873 |
====================================================================
Outputs & filters:
-------------------------------------
0 - output([p1.c1], [p1.c2], [p2.c1], [p2.c2]), filter(nil)
1 - output([p1.c1], [p1.c2], [p2.c1], [p2.c2]), filter(nil), dop=1
2 - output([p1.c1], [p1.c2], [p2.c1], [p2.c2]), filter(nil),
equal_conds([p1.c1 = p2.c2]), other_conds(nil)
3 - output([p1.c1], [p1.c2]), filter(nil)
4 - output([p1.c1], [p1.c2]), filter(nil), dop=1
5 - output([p1.c1], [p1.c2]), filter(nil)
6 - output([p1.c1], [p1.c2]), filter(nil),
access([p1.c1], [p1.c2]), partitions(p[0-1])
7 - output([p2.c1], [p2.c2]), filter(nil)
8 - output([p2.c1], [p2.c2]), filter(nil),
access([p2.c1], [p2.c2]), partitions(p[0-3])
By default, the system runs the query in parallel on both p1table and p2table because they both have more than one partition to be queried. The default value of dop is 1. Likewise, you can change the DOP of a query by adding the PARALLEL hint to it.
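As a sketch of that last point, the same join can be given an explicit DOP with the hint. The value 4 is an arbitrary choice for illustration. The resulting plan is not shown here, but the dop entries under Outputs & filters would be expected to reflect the hinted value, as in the single-table examples.

```sql
-- Same join as above, with an explicit DOP (4 is an arbitrary example value)
EXPLAIN SELECT /*+ PARALLEL(4) */ * FROM p1table p1 JOIN p2table p2 ON p1.c1=p2.c2;
```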
Next, the join condition is changed to p1table.c1=p2table.c2 and p2table.c1=1 so that only a single partition of p2table is selected. The following example shows the execution plan for the query:
obclient> EXPLAIN SELECT * FROM p1table p1 JOIN p2table p2 ON p1.c1=p2.c2 AND p2.c1=1;
Query Plan:
=============================================================
|ID|OPERATOR |NAME |EST. ROWS|COST |
-------------------------------------------------------------
|0 |EXCHANGE IN DISTR | |1940598 |1807515|
|1 | EXCHANGE OUT DISTR |:EX10001|1940598 |1440121|
|2 | HASH JOIN | |1940598 |1440121|
|3 | EXCHANGE IN DISTR | |990 |85316 |
|4 | EXCHANGE OUT DISTR (PKEY)|:EX10000|990 |85222 |
|5 | TABLE SCAN |p2 |990 |85222 |
|6 | PX PARTITION ITERATOR | |200000 |136955 |
|7 | TABLE SCAN |p1 |200000 |136955 |
=============================================================
Outputs & filters:
-------------------------------------
0 - output([p1.c1], [p1.c2], [p2.c1], [p2.c2]), filter(nil)
1 - output([p1.c1], [p1.c2], [p2.c1], [p2.c2]), filter(nil), dop=1
2 - output([p1.c1], [p1.c2], [p2.c1], [p2.c2]), filter(nil),
equal_conds([p1.c1 = p2.c2]), other_conds(nil)
3 - output([p2.c1], [p2.c2]), filter(nil)
4 - (#keys=1, [p2.c2]), output([p2.c1], [p2.c2]), filter(nil), dop=1
5 - output([p2.c1], [p2.c2]), filter([p2.c1 = 1]),
access([p2.c1], [p2.c2]), partitions(p1)
6 - output([p1.c1], [p1.c2]), filter(nil)
7 - output([p1.c1], [p1.c2]), filter(nil),
access([p1.c1], [p1.c2]), partitions(p[0-1])
In this plan, only one partition of p2table needs to be scanned, whereas p1table has two. By default, a parallel query is run on p1table but not on p2table. Likewise, you can change the DOP of a query by adding the PARALLEL hint to it. Then, you can run a query in parallel on a single partition of p2table.
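For instance, the filtered join could be hinted as follows. This is a sketch only: the dop value of 4 is an assumption chosen for illustration (any value of 2 or greater should behave similarly), and the plan that the optimizer actually produces may differ from the default plan in more than the dop entries.

```sql
-- Hinted version of the filtered join; intended to let the scan on the
-- single selected partition of p2table run in parallel as well.
EXPLAIN SELECT /*+ PARALLEL(4) */ * FROM p1table p1 JOIN p2table p2 ON p1.c1=p2.c2 AND p2.c1=1;
```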
System views for parallel execution
OceanBase Database provides the system view (G)V$OB_SQL_AUDIT, which shows the status and other information about parallel execution.
The (G)V$OB_SQL_AUDIT view has many fields, among which qc_id, dfo_id, sqc_id, and worker_id are related to parallel execution.
For more information, see (G)V$OB_SQL_AUDIT.
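A minimal sketch of how these fields might be inspected after running one of the hinted queries in this topic. It assumes a MySQL-mode tenant, where the view is typically reached through the oceanbase schema, and it uses the query_sql column to locate the statement. Both are assumptions that this topic does not state, so check the view reference for your version before relying on them.

```sql
-- Look up the parallel-execution fields for statements that used PARALLEL(8).
-- Assumptions: MySQL-mode tenant, oceanbase schema, query_sql column.
SELECT qc_id, dfo_id, sqc_id, worker_id, query_sql
FROM oceanbase.GV$OB_SQL_AUDIT
WHERE query_sql LIKE '%PARALLEL(8)%'
LIMIT 10;
```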