This topic provides examples of how to enable parallel query for partitioned tables, non-partitioned tables, and multi-table queries, and how to disable parallel query. It also introduces the system views related to parallel execution in OceanBase Database.
Enable parallel query for partitioned tables
When a query on a partitioned table targets more than one partition, the system automatically enables parallel query, and the degree of parallelism (DOP) defaults to 1.
The following example creates a partitioned table named ptable, performs a full-table scan on it, and uses the EXPLAIN command to view the generated execution plan. The plan shows that the default DOP for parallel query on a partitioned table is 1. If the OceanBase cluster has three OBServer nodes and the 16 partitions of ptable are evenly distributed across them, each OBServer node starts one worker thread to scan the data in its partitions, so three worker threads are started in total.
obclient> CREATE TABLE PTABLE(c1 INT , c2 INT) PARTITION BY HASH(c1) PARTITIONS 16;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT * FROM ptable;
+---------------------------------------------------------------------------------+
| Query Plan |
+---------------------------------------------------------------------------------+
| ============================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |62 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10000|1 |62 | |
| |2 | └─PX PARTITION ITERATOR| |1 |61 | |
| |3 | └─TABLE FULL SCAN |ptable |1 |61 | |
| ============================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(ptable.c1, ptable.c2)]), filter(nil), rowset=16 |
| 1 - output([INTERNAL_FUNCTION(ptable.c1, ptable.c2)]), filter(nil), rowset=16 |
| dop=1 |
| 2 - output([ptable.c1], [ptable.c2]), filter(nil), rowset=16 |
| force partition granule |
| 3 - output([ptable.c1], [ptable.c2]), filter(nil), rowset=16 |
| access([ptable.c1], [ptable.c2]), partitions(p[0-15]) |
| is_index_back=false, is_global_index=false, |
| range_key([ptable.__pk_increment]), range(MIN ; MAX)always true |
+---------------------------------------------------------------------------------+
19 rows in set
For a partitioned table, you can also set the degree of parallelism explicitly by adding the PARALLEL hint and specifying the DOP value. The EXPLAIN command is used to view the generated execution plan.
obclient> EXPLAIN SELECT /*+ PARALLEL(8) */ * FROM ptable;
Query Plan: ==================================================
|ID|OPERATOR |NAME |EST. ROWS|COST |
--------------------------------------------------
|0 |PX COORDINATOR | |1600000 |737992|
|1 | EXCHANGE OUT DISTR|:EX10000|1600000 |618888|
|2 | PX BLOCK ITERATOR| |1600000 |618888|
|3 | TABLE SCAN |ptable |1600000 |618888|
==================================================
Outputs & filters:
-------------------------------------
0 - output([INTERNAL_FUNCTION(ptable.c1, ptable.c2)]), filter(nil)
1 - output([INTERNAL_FUNCTION(ptable.c1, ptable.c2)]), filter(nil), dop=8
2 - output([ptable.c1], [ptable.c2]), filter(nil)
3 - output([ptable.c1], [ptable.c2]), filter(nil),
access([ptable.c1], [ptable.c2]), partitions(p[0-15])
The execution plan shows that the DOP value for parallel query is 8. If the number of OBServer nodes containing the query partitions is less than or equal to the DOP value, the worker threads (with a total number equal to the DOP value) will be distributed to the involved OBServer nodes based on a certain strategy. If the number of OBServer nodes containing the query partitions exceeds the DOP value, each OBServer node will start at least one worker thread. The total number of worker threads to be started will exceed the DOP value.
For example, when the DOP value is 8, if 16 partitions are evenly distributed across four OBServer nodes, each OBServer node will start two worker threads to scan its corresponding partitions (a total of eight worker threads). If the 16 partitions are distributed across 16 OBServer nodes (with each node hosting one partition), each OBServer node will start one worker thread to scan its corresponding partition (a total of 16 worker threads).
If the number of query partitions for a partitioned table is less than or equal to 1, parallel query is not enabled. As shown in the example below, a filter condition c1=1 is added to the query on the ptable table.
obclient> EXPLAIN SELECT * FROM ptable WHERE c1 = 1;
+------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------+
| ======================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ------------------------------------------------------- |
| |0 |EXCHANGE IN REMOTE | |1 |5 | |
| |1 |└─EXCHANGE OUT REMOTE| |1 |5 | |
| |2 | └─TABLE FULL SCAN |ptable|1 |4 | |
| ======================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([ptable.c1], [ptable.c2]), filter(nil) |
| 1 - output([ptable.c1], [ptable.c2]), filter(nil) |
| 2 - output([ptable.c1], [ptable.c2]), filter([ptable.c1 = 1]), rowset=16 |
| access([ptable.c1], [ptable.c2]), partitions(p1) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false], |
| range_key([ptable.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------+
15 rows in set
The execution plan shows that the number of target partitions is 1 and that parallel query is not enabled. To enable parallel query for a query on a single partition, you must add the PARALLEL hint, which enables parallel query within the partition. Use the EXPLAIN command to view the generated execution plan.
obclient> EXPLAIN SELECT /*+ PARALLEL(8) */ * FROM ptable WHERE c1 = 1;
+------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------+
| ========================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| --------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |1 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10000|1 |1 | |
| |2 | └─PX BLOCK ITERATOR| |1 |1 | |
| |3 | └─TABLE FULL SCAN|ptable |1 |1 | |
| ========================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(ptable.c1, ptable.c2)]), filter(nil), rowset=16 |
| 1 - output([INTERNAL_FUNCTION(ptable.c1, ptable.c2)]), filter(nil), rowset=16 |
| dop=8 |
| 2 - output([ptable.c1], [ptable.c2]), filter(nil), rowset=16 |
| 3 - output([ptable.c1], [ptable.c2]), filter([ptable.c1 = 1]), rowset=16 |
| access([ptable.c1], [ptable.c2]), partitions(p1) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false], |
| range_key([ptable.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------+
18 rows in set
Note
To enable parallel query within a partition when the number of query partitions is 1, the DOP value must be greater than or equal to 2. If the DOP value is not specified or is less than 2, parallel query is not enabled.
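The DOP threshold described in the note can be checked with a pair of EXPLAIN statements on the single-partition query. This is a minimal sketch that reuses ptable from the examples above. No plan output is shown because it depends on the version and on table statistics. Based on the note, the first statement is expected to produce a plan without a PX COORDINATOR operator, and the second a plan that reports dop=2.

```sql
-- A DOP of 1 is below the threshold stated in the note, so parallel query
-- within the partition is not expected to be enabled.
EXPLAIN SELECT /*+ PARALLEL(1) */ * FROM ptable WHERE c1 = 1;

-- A DOP of 2 is the smallest value that meets the threshold.
EXPLAIN SELECT /*+ PARALLEL(2) */ * FROM ptable WHERE c1 = 1;
```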
Enable parallel query for non-partitioned tables
A non-partitioned table is essentially a partitioned table with only one partition. Therefore, parallel query on a non-partitioned table can be enabled only within the partition, by adding the PARALLEL hint. Without the hint, parallel query is not enabled.
As shown in the example below, a non-partitioned table named stable is created, and a full-table scan operation is performed on stable. The EXPLAIN command is used to view the generated execution plan.
obclient> CREATE TABLE stable(c1 INT, c2 INT);
Query OK, 0 rows affected
obclient> EXPLAIN SELECT * FROM stable;
+-----------------------------------------------------------------------+
| Query Plan |
+-----------------------------------------------------------------------+
| ======================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ------------------------------------------------------- |
| |0 |EXCHANGE IN REMOTE | |1 |5 | |
| |1 |└─EXCHANGE OUT REMOTE| |1 |5 | |
| |2 | └─TABLE FULL SCAN |stable|1 |4 | |
| ======================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([stable.c1], [stable.c2]), filter(nil) |
| 1 - output([stable.c1], [stable.c2]), filter(nil) |
| 2 - output([stable.c1], [stable.c2]), filter(nil), rowset=16 |
| access([stable.c1], [stable.c2]), partitions(p0) |
| is_index_back=false, is_global_index=false, |
| range_key([stable.__pk_increment]), range(MIN ; MAX)always true |
+-----------------------------------------------------------------------+
15 rows in set
The execution plan shows that parallel query is not enabled for a non-partitioned table when the PARALLEL hint is not added.
For a non-partitioned table, parallel query can be enabled by adding the PARALLEL hint and specifying a DOP value greater than or equal to 2. The EXPLAIN command is used to view the generated execution plan.
obclient> EXPLAIN SELECT /*+ PARALLEL(4)*/ * FROM stable;
+---------------------------------------------------------------------------------+
| Query Plan |
+---------------------------------------------------------------------------------+
| ========================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| --------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |2 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10000|1 |2 | |
| |2 | └─PX BLOCK ITERATOR| |1 |1 | |
| |3 | └─TABLE FULL SCAN|stable |1 |1 | |
| ========================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(stable.c1, stable.c2)]), filter(nil), rowset=16 |
| 1 - output([INTERNAL_FUNCTION(stable.c1, stable.c2)]), filter(nil), rowset=16 |
| dop=4 |
| 2 - output([stable.c1], [stable.c2]), filter(nil), rowset=16 |
| 3 - output([stable.c1], [stable.c2]), filter(nil), rowset=16 |
| access([stable.c1], [stable.c2]), partitions(p0) |
| is_index_back=false, is_global_index=false, |
| range_key([stable.__pk_increment]), range(MIN ; MAX)always true |
+---------------------------------------------------------------------------------+
18 rows in set
Enable multi-table parallel query
A multi-table JOIN query is the most common type of multi-table query. In a multi-table query, if the number of partitions to be queried for each table is greater than 1, parallel query is enabled for each table.
The following example shows how to create two partitioned tables named p1table and p2table.
obclient> CREATE TABLE p1table(c1 INT ,c2 INT) PARTITION BY HASH(c1) PARTITIONS 2;
Query OK, 0 rows affected
obclient> CREATE TABLE p2table(c1 INT ,c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
Query OK, 0 rows affected
The following example shows how to query the JOIN result of p1table and p2table, where the JOIN condition is p1table.c1 = p2table.c2. The execution plan is as follows:
obclient> EXPLAIN SELECT p1table.c1, p2table.c1 AS p2_c1 FROM p1table JOIN p2table ON p1table.c1 = p2table.c2;
+-----------------------------------------------------------------------------------+
| Query Plan |
+-----------------------------------------------------------------------------------+
| ===================================================================== |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| --------------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |25 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10001|1 |25 | |
| |2 | └─HASH JOIN | |1 |25 | |
| |3 | ├─PX PARTITION ITERATOR | |1 |8 | |
| |4 | │ └─TABLE FULL SCAN |p1table |1 |8 | |
| |5 | └─EXCHANGE IN DISTR | |1 |17 | |
| |6 | └─EXCHANGE OUT DISTR (PKEY)|:EX10000|1 |16 | |
| |7 | └─PX PARTITION ITERATOR | |1 |16 | |
| |8 | └─TABLE FULL SCAN |p2table |1 |16 | |
| ===================================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(p1table.c1, p2table.c1)]), filter(nil), rowset=16 |
| 1 - output([INTERNAL_FUNCTION(p1table.c1, p2table.c1)]), filter(nil), rowset=16 |
| dop=1 |
| 2 - output([p1table.c1], [p2table.c1]), filter(nil), rowset=16 |
| equal_conds([p1table.c1 = p2table.c2]), other_conds(nil) |
| 3 - output([p1table.c1]), filter(nil), rowset=16 |
| affinitize, force partition granule |
| 4 - output([p1table.c1]), filter(nil), rowset=16 |
| access([p1table.c1]), partitions(p[0-1]) |
| is_index_back=false, is_global_index=false, |
| range_key([p1table.__pk_increment]), range(MIN ; MAX)always true |
| 5 - output([p2table.c2], [p2table.c1]), filter(nil), rowset=16 |
| 6 - output([p2table.c2], [p2table.c1]), filter(nil), rowset=16 |
| (#keys=1, [p2table.c2]), dop=1 |
| 7 - output([p2table.c1], [p2table.c2]), filter(nil), rowset=16 |
| force partition granule |
| 8 - output([p2table.c1], [p2table.c2]), filter(nil), rowset=16 |
| access([p2table.c1], [p2table.c2]), partitions(p[0-3]) |
| is_index_back=false, is_global_index=false, |
| range_key([p2table.__pk_increment]), range(MIN ; MAX)always true |
+-----------------------------------------------------------------------------------+
35 rows in set
By default, parallel query is enabled for both p1table and p2table because more than one partition is queried for each table. The default DOP value is 1. You can also change the degree of parallelism by using the PARALLEL hint.
The following example changes the JOIN condition to p1table.c1 = p2table.c2 AND p2table.c1 = 1. In this case, only one partition of p2table is selected. The execution plan is as follows:
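To illustrate changing the degree of parallelism for the join above, the same statement can be run with a PARALLEL hint. This is a minimal sketch: the DOP value of 4 is an arbitrary choice for illustration, and the resulting plan is not shown because it depends on the version and on table statistics.

```sql
-- Same join as above, with the DOP raised from the default 1 to 4.
-- The value 4 is illustrative only; choose a DOP that fits the available resources.
EXPLAIN SELECT /*+ PARALLEL(4) */ p1table.c1, p2table.c1 AS p2_c1
FROM p1table JOIN p2table ON p1table.c1 = p2table.c2;
```

In the resulting plan, the dop value reported on the EXCHANGE OUT DISTR operators would be expected to reflect the hinted value rather than 1.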
obclient> EXPLAIN SELECT * FROM p1table p1 JOIN p2table p2 ON p1.c1=p2.c2 AND p2.c1=1;
+---------------------------------------------------------------------------------------+
| Query Plan |
+---------------------------------------------------------------------------------------+
| ===================================================================== |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| --------------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |15 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10001|1 |15 | |
| |2 | └─HASH JOIN | |1 |13 | |
| |3 | ├─PX PARTITION ITERATOR | |1 |8 | |
| |4 | │ └─TABLE FULL SCAN |p1 |1 |8 | |
| |5 | └─EXCHANGE IN DISTR | |1 |5 | |
| |6 | └─EXCHANGE OUT DISTR (PKEY)|:EX10000|1 |5 | |
| |7 | └─TABLE FULL SCAN |p2 |1 |4 | |
| ===================================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(p1.c1, p1.c2, p2.c1, p2.c2)]), filter(nil), rowset=16 |
| 1 - output([INTERNAL_FUNCTION(p1.c1, p1.c2, p2.c1, p2.c2)]), filter(nil), rowset=16 |
| dop=1 |
| 2 - output([p1.c1], [p2.c2], [p1.c2], [p2.c1]), filter(nil), rowset=16 |
| equal_conds([p1.c1 = p2.c2]), other_conds(nil) |
| 3 - output([p1.c1], [p1.c2]), filter(nil), rowset=16 |
| affinitize, force partition granule |
| 4 - output([p1.c1], [p1.c2]), filter(nil), rowset=16 |
| access([p1.c1], [p1.c2]), partitions(p[0-1]) |
| is_index_back=false, is_global_index=false, |
| range_key([p1.__pk_increment]), range(MIN ; MAX)always true |
| 5 - output([p2.c2], [p2.c1]), filter(nil), rowset=16 |
| 6 - output([p2.c2], [p2.c1]), filter(nil), rowset=16 |
| (#keys=1, [p2.c2]), is_single, dop=1 |
| 7 - output([p2.c1], [p2.c2]), filter([p2.c1 = 1]), rowset=16 |
| access([p2.c1], [p2.c2]), partitions(p1) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false], |
| range_key([p2.__pk_increment]), range(MIN ; MAX)always true |
+---------------------------------------------------------------------------------------+
32 rows in set
The plan shows that only one partition of p2table is scanned, so parallel query is not enabled for p2table by default. Two partitions of p1table are scanned, so parallel query is enabled for p1table by default. You can also use the PARALLEL hint to change the degree of parallelism, which enables parallel query within the partition for p2table.
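As a sketch of the hinted variant of this query, the statement below adds a PARALLEL hint with a DOP of 2, the minimum that the earlier note requires for parallel query within a single partition. The DOP value is an assumption chosen for illustration, and no plan output is shown.

```sql
-- Same join as above; with a DOP of at least 2, the single selected partition
-- of p2table also becomes eligible for parallel query within the partition.
EXPLAIN SELECT /*+ PARALLEL(2) */ *
FROM p1table p1 JOIN p2table p2 ON p1.c1 = p2.c2 AND p2.c1 = 1;
```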
System views related to parallel execution
OceanBase Database provides the system views GV$OB_SQL_AUDIT and V$OB_SQL_AUDIT for viewing the running status of parallel execution and related statistics.
These views contain many fields. The fields related to parallel execution are qc_id, dfo_id, sqc_id, and worker_id.
For more information, see (G)V$OB_SQL_AUDIT.
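The following query is a minimal sketch of how the parallel-execution fields might be inspected after running one of the hinted statements above. It assumes a MySQL-mode tenant, where the view is accessed through the oceanbase schema, and it assumes that SQL audit is enabled. The filter on ptable, the extra columns (svr_ip, trace_id, elapsed_time, request_time, query_sql), and the row limit are illustrative choices, not part of the original topic.

```sql
-- List recent audit records for statements that reference ptable,
-- together with the fields related to parallel execution.
SELECT svr_ip,
       trace_id,
       qc_id,
       dfo_id,
       sqc_id,
       worker_id,
       elapsed_time,
       query_sql
FROM oceanbase.GV$OB_SQL_AUDIT
WHERE query_sql LIKE '%ptable%'
ORDER BY request_time DESC
LIMIT 20;
```

Grouping the returned rows by trace_id is one way to see which records belong to the same statement execution.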
