The optimizer of OceanBase Database generates a distributed execution plan in the following two stages:
Stage 1: The optimizer generates the optimal local execution plan for the SQL query, treating all tables as local and ignoring the physical distribution of data. It then checks whether the query accesses data in multiple partitions, or whether the query accesses a local single-partition table but the user has used hints to force parallel execution. If either is the case, the optimizer proceeds to stage 2.
Stage 2: The optimizer generates a distributed plan. It inserts EXCHANGE operators at the nodes that require data redistribution, which converts the local execution plan tree into a distributed execution plan.
Operators of a distributed execution plan
Generating a distributed plan is essentially a process of finding the best position in the original plan tree to insert the EXCHANGE operators. When the optimizer traverses the plan tree from the top down, it needs to analyze the data processing results of the corresponding operators and the data partitions of the input operators before inserting an EXCHANGE operator.
The following example shows the simplest case, a single-table scan:
obclient> CREATE TABLE t1 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT * FROM t1\G
*************************** 1. row ***************************
Query Plan:
======================================================
|ID|OPERATOR               |NAME    |EST. ROWS|COST  |
------------------------------------------------------
|0 |PX COORDINATOR         |        |500000   |545109|
|1 | EXCHANGE OUT DISTR    |:EX10000|500000   |320292|
|2 |  PX PARTITION ITERATOR|        |500000   |320292|
|3 |   TABLE SCAN          |T1      |500000   |320292|
======================================================
Outputs & filters:
-------------------------------------
  0 - output([T1.V1], [T1.V2]), filter(nil)
  1 - output([T1.V1], [T1.V2]), filter(nil), dop=1
  2 - output([T1.V1], [T1.V2]), filter(nil)
  3 - output([T1.V1], [T1.V2]), filter(nil),
      access([T1.V1], [T1.V2]), partitions(p[0-4])
When t1 is a partitioned table, a paired EXCHANGE operator can be inserted above TABLE SCAN, so that TABLE SCAN and EXCHANGE OUT are encapsulated into a job that can be executed in parallel.
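The wrapping step can be pictured with a toy plan tree. The following Python sketch is only an illustration: `PlanNode`, `touches_partitioned_table`, `to_distributed`, and `render` are hypothetical names, not OceanBase internals, and the sketch handles only the single-table case shown in the plan above.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    """One operator in a toy plan tree (hypothetical type, not an OceanBase structure)."""
    op: str
    name: str = ""
    partitioned: bool = False
    children: List["PlanNode"] = field(default_factory=list)


def touches_partitioned_table(node: PlanNode) -> bool:
    """True if this subtree scans at least one partitioned table."""
    return node.partitioned or any(touches_partitioned_table(c) for c in node.children)


def to_distributed(local_root: PlanNode) -> PlanNode:
    """Single-table case only: wrap the local plan the way the plan above is wrapped."""
    if not touches_partitioned_table(local_root):
        return local_root  # nothing to redistribute, keep the local plan
    iterator = PlanNode("PX PARTITION ITERATOR", children=[local_root])
    exchange_out = PlanNode("EXCHANGE OUT DISTR", name=":EX10000", children=[iterator])
    # The top node mirrors the PX COORDINATOR row of the EXPLAIN output above.
    return PlanNode("PX COORDINATOR", children=[exchange_out])


def render(node: PlanNode, depth: int = 0) -> List[str]:
    """Render the tree one operator per line, indented by depth."""
    label = node.op + (f" [{node.name}]" if node.name else "")
    lines = [" " * depth + label]
    for child in node.children:
        lines.extend(render(child, depth + 1))
    return lines


if __name__ == "__main__":
    scan = PlanNode("TABLE SCAN", name="T1", partitioned=True)
    print("\n".join(render(to_distributed(scan))))
```

The rendered tree has the same operator order as the EXPLAIN output: the scan sits at the bottom, and everything from EXCHANGE OUT downward forms the unit that runs in parallel.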
Single-input pushdown operators
Single-input pushdown operators mainly include the AGGREGATION, SORT, GROUP BY, and LIMIT operators. Each of these operators except LIMIT has an operation key. If the distribution of the input data is consistent with the operation key, a one-stage aggregation, also known as a partition-wise aggregation, is performed. Otherwise, a two-stage aggregation is required: the AGGREGATION operator is pushed down below the EXCHANGE operator, so that each partition is aggregated locally before the data is redistributed and aggregated again.
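The choice between the two strategies can be sketched as a small predicate. This is a simplified model, not the optimizer's actual rule: the function name `choose_aggregation` is hypothetical, and "consistent" is assumed here to mean that every partitioning key column is also a grouping column, so that all rows of one group are guaranteed to live in the same partition.

```python
from typing import Sequence


def choose_aggregation(partition_keys: Sequence[str], group_keys: Sequence[str]) -> str:
    """Pick an aggregation strategy for a GROUP BY over one partitioned table.

    Assumption of this sketch: the data distribution is "consistent" with the
    grouping keys when the partitioning key columns are a subset of them.
    """
    if partition_keys and set(partition_keys) <= set(group_keys):
        return "one-stage"   # each partition can finish its groups on its own
    return "two-stage"       # partial aggregation, redistribution, final aggregation


if __name__ == "__main__":
    # t2 is partitioned by HASH(v1).
    print(choose_aggregation(["v1"], ["v1"]))  # GROUP BY v1
    print(choose_aggregation(["v1"], ["v2"]))  # GROUP BY v2
```

With t2 partitioned by v1, grouping by v1 falls into the one-stage case and grouping by v2 into the two-stage case.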
Example of one-stage aggregation:
obclient> CREATE TABLE t2 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 4;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT SUM(v1) FROM t2 GROUP BY v1\G
*************************** 1. row ***************************
Query Plan:
======================================================
|ID|OPERATOR               |NAME    |EST. ROWS|COST  |
------------------------------------------------------
|0 |PX COORDINATOR         |        |101      |357302|
|1 | EXCHANGE OUT DISTR    |:EX10000|101      |357297|
|2 |  PX PARTITION ITERATOR|        |101      |357297|
|3 |   MERGE GROUP BY      |        |101      |357297|
|4 |    TABLE SCAN         |t2      |400000   |247403|
======================================================
Outputs & filters:
-------------------------------------
  0 - output([T_FUN_SUM(t2.v1)]), filter(nil)
  1 - output([T_FUN_SUM(t2.v1)]), filter(nil), dop=1
  2 - output([T_FUN_SUM(t2.v1)]), filter(nil)
  3 - output([T_FUN_SUM(t2.v1)]), filter(nil),
      group([t2.v1]), agg_func([T_FUN_SUM(t2.v1)])
  4 - output([t2.v1]), filter(nil),
      access([t2.v1]), partitions(p[0-3])
Example of two-stage aggregation, in which the grouping column v2 is not the partitioning key of t2:
============================================================
|ID|OPERATOR                     |NAME    |EST. ROWS|COST  |
------------------------------------------------------------
|0 |PX COORDINATOR               |        |101      |561383|
|1 | EXCHANGE OUT DISTR          |:EX10001|101      |561374|
|2 |  HASH GROUP BY              |        |101      |561374|
|3 |   EXCHANGE IN DISTR         |        |101      |408805|
|4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|101      |408795|
|5 |     HASH GROUP BY           |        |101      |408795|
|6 |      PX PARTITION ITERATOR  |        |400000   |256226|
|7 |       TABLE SCAN            |t2      |400000   |256226|
============================================================
Outputs & filters:
-------------------------------------
  0 - output([T_FUN_SUM(T_FUN_SUM(t2.v1))]), filter(nil)
  1 - output([T_FUN_SUM(T_FUN_SUM(t2.v1))]), filter(nil), dop=1
  2 - output([T_FUN_SUM(T_FUN_SUM(t2.v1))]), filter(nil),
      group([t2.v2]), agg_func([T_FUN_SUM(T_FUN_SUM(t2.v1))])
  3 - output([t2.v2], [T_FUN_SUM(t2.v1)]), filter(nil)
  4 - (#keys=1, [t2.v2]), output([t2.v2], [T_FUN_SUM(t2.v1)]), filter(nil), dop=1
  5 - output([t2.v2], [T_FUN_SUM(t2.v1)]), filter(nil),
      group([t2.v2]), agg_func([T_FUN_SUM(t2.v1)])
  6 - output([t2.v1], [t2.v2]), filter(nil)
  7 - output([t2.v1], [t2.v2]), filter(nil),
      access([t2.v1], [t2.v2]), partitions(p[0-3])
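The nested `T_FUN_SUM(T_FUN_SUM(t2.v1))` in the plan above reflects a sum of partial sums. The following Python sketch imitates that data flow on in-memory rows, as a rough illustration only: the function names, the sample rows, and the use of Python's built-in `hash` as the redistribution function are all assumptions of the sketch, not OceanBase behavior.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[int, int]  # (v1, v2)


def partial_sums(rows: List[Row]) -> Dict[int, int]:
    """Stage 1 (lower HASH GROUP BY): SUM(v1) grouped by v2 inside one partition."""
    acc: Dict[int, int] = defaultdict(int)
    for v1, v2 in rows:
        acc[v2] += v1
    return dict(acc)


def two_stage_sum(partitions: List[List[Row]], workers: int = 2) -> Dict[int, int]:
    """Partial aggregation per partition, hash redistribution on v2, final aggregation."""
    partials = [partial_sums(rows) for rows in partitions]

    # EXCHANGE (HASH): route each partial result to a worker chosen by hash(v2),
    # so that all partial sums of one group meet on the same worker.
    buckets: List[Dict[int, int]] = [defaultdict(int) for _ in range(workers)]
    for partial in partials:
        for v2, subtotal in partial.items():
            # Stage 2 (upper HASH GROUP BY): sum the partial sums.
            buckets[hash(v2) % workers][v2] += subtotal

    result: Dict[int, int] = {}
    for bucket in buckets:
        result.update(bucket)  # groups are disjoint across workers
    return result


if __name__ == "__main__":
    # Four partitions, as if t2 were split by HASH(v1); rows are (v1, v2).
    t2_partitions = [[(4, 10), (8, 20)], [(1, 10), (5, 20)], [(2, 10)], [(3, 30)]]
    print(two_stage_sum(t2_partitions))
```

Only the per-group subtotals cross the exchange, not the raw rows, which is why the estimated row count drops from 400000 to 101 between operators 6 and 5 in the plan.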
Binary input operators
Among the binary input operators, JOIN is the one that matters most here. For a JOIN operator, the optimizer generates the distributed execution plan and selects the data redistribution method based on rules. The following three join modes are supported:
Partition-wise join
Partition-wise join is used if both the left and right tables are partitioned in the same way, they have the same physical distribution, and their partitioning keys are the join conditions. For example:
obclient> CREATE TABLE t3 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 4;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT * FROM t2, t3 WHERE t2.v1 = t3.v1\G
*************************** 1. row ***************************
Query Plan:
===========================================================
|ID|OPERATOR               |NAME    |EST. ROWS |COST      |
-----------------------------------------------------------
|0 |PX COORDINATOR         |        |1568160000|1227554264|
|1 | EXCHANGE OUT DISTR    |:EX10000|1568160000|930670004 |
|2 |  PX PARTITION ITERATOR|        |1568160000|930670004 |
|3 |   MERGE JOIN          |        |1568160000|930670004 |
|4 |    TABLE SCAN         |t2      |400000    |256226    |
|5 |    TABLE SCAN         |t3      |400000    |256226    |
===========================================================
Outputs & filters:
-------------------------------------
  0 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil)
  1 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil), dop=1
  2 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil)
  3 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil),
      equal_conds([t2.v1 = t3.v1]), other_conds(nil)
  4 - output([t2.v1], [t2.v2]), filter(nil),
      access([t2.v1], [t2.v2]), partitions(p[0-3])
  5 - output([t3.v1], [t3.v2]), filter(nil),
      access([t3.v1], [t3.v2]), partitions(p[0-3])
Partial partition-wise join
Partial partition-wise join is used when only one of the left and right tables is partitioned, or when both are partitioned but the join key is the same as the partitioning key of only one of them. In this case, the data in the other table is redistributed according to the partition distribution of the table whose partitioning key matches the join key. For example:
obclient> CREATE TABLE t4 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 3;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT * FROM t4, t2 WHERE t2.v1 = t4.v1\G
*************************** 1. row ***************************
Query Plan:
===========================================================
|ID|OPERATOR                     |NAME    |EST. ROWS|COST |
-----------------------------------------------------------
|0 |PX COORDINATOR               |        |11880    |17658|
|1 | EXCHANGE OUT DISTR          |:EX10001|11880    |15409|
|2 |  NESTED-LOOP JOIN           |        |11880    |15409|
|3 |   EXCHANGE IN DISTR         |        |3        |37   |
|4 |    EXCHANGE OUT DISTR (PKEY)|:EX10000|3        |37   |
|5 |     PX PARTITION ITERATOR   |        |3        |37   |
|6 |      TABLE SCAN             |t4      |3        |37   |
|7 |   PX PARTITION ITERATOR     |        |3960     |2561 |
|8 |    TABLE SCAN               |t2      |3960     |2561 |
===========================================================
Outputs & filters:
-------------------------------------
  0 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil)
  1 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), dop=1
  2 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil),
      conds(nil), nl_params_([t4.v1])
  3 - output([t4.v1], [t4.v2]), filter(nil)
  4 - (#keys=1, [t4.v1]), output([t4.v1], [t4.v2]), filter(nil), dop=1
  5 - output([t4.v1], [t4.v2]), filter(nil)
  6 - output([t4.v1], [t4.v2]), filter(nil),
      access([t4.v1], [t4.v2]), partitions(p[0-2])
  7 - output([t2.v1], [t2.v2]), filter(nil)
  8 - output([t2.v1], [t2.v2]), filter(nil),
      access([t2.v1], [t2.v2]), partitions(p[0-3])
Data redistribution
If the join key is unrelated to the partitioning keys of both tables, either BROADCAST or HASH-HASH is chosen for data redistribution based on rules. Both methods are shown in the following example.
Notice
The two data redistribution methods in the following example are available only when the DOP is greater than 1.
obclient> EXPLAIN SELECT /*+ PARALLEL(2)*/* FROM t4, t2 WHERE t2.v2 = t4.v2\G
*************************** 1. row ***************************
Query Plan:
=================================================================
|ID|OPERATOR                          |NAME    |EST. ROWS|COST  |
-----------------------------------------------------------------
|0 |PX COORDINATOR                    |        |11880    |396863|
|1 | EXCHANGE OUT DISTR               |:EX10001|11880    |394614|
|2 |  HASH JOIN                       |        |11880    |394614|
|3 |   EXCHANGE IN DISTR              |        |3        |37    |
|4 |    EXCHANGE OUT DISTR (BROADCAST)|:EX10000|3        |37    |
|5 |     PX BLOCK ITERATOR            |        |3        |37    |
|6 |      TABLE SCAN                  |t4      |3        |37    |
|7 |   PX PARTITION ITERATOR          |        |400000   |256226|
|8 |    TABLE SCAN                    |t2      |400000   |256226|
=================================================================
Outputs & filters:
-------------------------------------
  0 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil)
  1 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), dop=2
  2 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil),
      equal_conds([t2.v2 = t4.v2]), other_conds(nil)
  3 - output([t4.v1], [t4.v2]), filter(nil)
  4 - output([t4.v1], [t4.v2]), filter(nil), dop=2
  5 - output([t4.v1], [t4.v2]), filter(nil)
  6 - output([t4.v1], [t4.v2]), filter(nil),
      access([t4.v1], [t4.v2]), partitions(p[0-2])
  7 - output([t2.v1], [t2.v2]), filter(nil)
  8 - output([t2.v1], [t2.v2]), filter(nil),
      access([t2.v1], [t2.v2]), partitions(p[0-3])

obclient> EXPLAIN SELECT /*+ PQ_DISTRIBUTE(t2 HASH HASH) PARALLEL(2)*/* FROM t4, t2 WHERE t2.v2 = t4.v2\G
*************************** 1. row ***************************
Query Plan:
============================================================
|ID|OPERATOR                     |NAME    |EST. ROWS|COST  |
------------------------------------------------------------
|0 |PX COORDINATOR               |        |11880    |434727|
|1 | EXCHANGE OUT DISTR          |:EX10002|11880    |432478|
|2 |  HASH JOIN                  |        |11880    |432478|
|3 |   EXCHANGE IN DISTR         |        |3        |37    |
|4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|3        |37    |
|5 |     PX BLOCK ITERATOR       |        |3        |37    |
|6 |      TABLE SCAN             |t4      |3        |37    |
|7 |   EXCHANGE IN DISTR         |        |400000   |294090|
|8 |    EXCHANGE OUT DISTR (HASH)|:EX10001|400000   |256226|
|9 |     PX PARTITION ITERATOR   |        |400000   |256226|
|10|      TABLE SCAN             |t2      |400000   |256226|
============================================================
Outputs & filters:
-------------------------------------
  0 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil)
  1 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), dop=2
  2 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil),
      equal_conds([t2.v2 = t4.v2]), other_conds(nil)
  3 - output([t4.v1], [t4.v2]), filter(nil)
  4 - (#keys=1, [t4.v2]), output([t4.v1], [t4.v2]), filter(nil), dop=2
  5 - output([t4.v1], [t4.v2]), filter(nil)
  6 - output([t4.v1], [t4.v2]), filter(nil),
      access([t4.v1], [t4.v2]), partitions(p[0-2])
  7 - output([t2.v1], [t2.v2]), filter(nil)
  8 - (#keys=1, [t2.v2]), output([t2.v1], [t2.v2]), filter(nil), dop=2
  9 - output([t2.v1], [t2.v2]), filter(nil)
  10 - output([t2.v1], [t2.v2]), filter(nil),
      access([t2.v1], [t2.v2]), partitions(p[0-3])
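The three join modes can be summarized as a decision over the partitioning metadata of the two tables. The Python sketch below is a simplified model under stated assumptions: `TableInfo` and `choose_join_mode` are hypothetical names, "same physical distribution" is approximated by an equal partition count, and the rules that pick between BROADCAST and HASH-HASH are not modeled because this section does not spell them out.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TableInfo:
    """Partitioning metadata for one table (hypothetical type for illustration)."""
    name: str
    partition_key: Optional[str]  # None means the table is not partitioned
    partitions: int = 1


def choose_join_mode(left: TableInfo, right: TableInfo,
                     left_join_col: str, right_join_col: str) -> str:
    """Classify an equi-join into one of the three modes described above."""
    left_match = left.partition_key is not None and left.partition_key == left_join_col
    right_match = right.partition_key is not None and right.partition_key == right_join_col

    # Assumption: an equal partition count stands in for "partitioned in the
    # same way with the same physical distribution".
    if left_match and right_match and left.partitions == right.partitions:
        return "partition-wise join"
    if left_match or right_match:
        # The other side is redistributed by the matching table's partitioning (PKEY).
        return "partial partition-wise join"
    # Neither side is partitioned on the join key.
    return "data redistribution (BROADCAST or HASH-HASH)"


if __name__ == "__main__":
    t2 = TableInfo("t2", "v1", 4)
    t3 = TableInfo("t3", "v1", 4)
    t4 = TableInfo("t4", "v1", 3)
    print(choose_join_mode(t2, t3, "v1", "v1"))
    print(choose_join_mode(t4, t2, "v1", "v1"))
    print(choose_join_mode(t4, t2, "v2", "v2"))
```

Applied to the tables in this section, the three calls correspond to the three EXPLAIN examples: t2 with t3 on v1, t4 with t2 on v1 (different partition counts, so t4 is redistributed by PKEY), and t4 with t2 on v2.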