The optimizer of OceanBase Database generates a distributed execution plan in two stages:
Stage 1: Without considering the physical distribution of data, generate the optimal execution plan based on local relational optimization. After the local plan is generated, the optimizer checks whether the query accesses multiple partitions, or whether it accesses a single local partition while a hint forces parallel execution.
Stage 2: Generate the distributed plan. Insert EXCHANGE operators at positions that require data redistribution based on the execution plan tree, thereby converting the original local plan tree into a distributed execution plan.
Generating a distributed plan is the process of finding the appropriate positions in the original plan tree to insert EXCHANGE operators. When traversing the plan tree from top to bottom, the optimizer decides whether to insert an EXCHANGE operator based on how the current operator processes data and how the output of its input operator is partitioned.
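The traversal described above can be sketched in Python as follows. This is a minimal illustration under stated assumptions, not the optimizer's actual implementation: the Op class, its required_key and output_key fields, and the insert_exchanges and render functions are hypothetical names introduced here. The sketch inserts an EXCHANGE pair whenever an operator needs its input distributed on a key that differs from the key its input is partitioned on, and it ignores costs, DOP, and the pushdown of partial aggregation.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Op:
    name: str
    # Hypothetical fields for this sketch, not optimizer internals:
    required_key: Optional[str] = None  # key the operator needs its input distributed on
    output_key: Optional[str] = None    # key the operator's output is distributed on
    children: List["Op"] = field(default_factory=list)


def insert_exchanges(op: Op) -> Op:
    """Visit the tree top-down and wrap mismatched inputs in an EXCHANGE pair."""
    rewritten = []
    for child in op.children:
        mismatch = op.required_key is not None and child.output_key != op.required_key
        child = insert_exchanges(child)  # continue toward the leaves
        if mismatch:
            out = Op("EXCHANGE OUT DISTR (HASH)", output_key=op.required_key, children=[child])
            child = Op("EXCHANGE IN DISTR", output_key=op.required_key, children=[out])
        rewritten.append(child)
    op.children = rewritten
    return op


def render(op: Op, depth: int = 0) -> List[str]:
    """Return the plan as indented lines, one space per tree level."""
    lines = [" " * depth + op.name]
    for child in op.children:
        lines.extend(render(child, depth + 1))
    return lines


# GROUP BY v2 over a table partitioned on v1: the keys differ, so an EXCHANGE pair is inserted.
plan = Op("HASH GROUP BY", required_key="v2", output_key="v2",
          children=[Op("TABLE SCAN t2", output_key="v1")])
print("\n".join(render(insert_exchanges(plan))))
```

When the required key equals the partitioning key of the input, the same function leaves the tree unchanged, which corresponds to a plan that needs no redistribution at that position.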
The following example shows the simplest single-table scan. When table t1 is a partitioned table, the optimizer inserts a paired EXCHANGE operator above TABLE SCAN and encapsulates TABLE SCAN and EXCHANGE OUT into a job for parallel execution.
obclient> CREATE TABLE t1 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT * FROM t1\G
*************************** 1. row ***************************
Query Plan:
======================================================
|ID|OPERATOR               |NAME    |EST. ROWS|COST  |
------------------------------------------------------
|0 |PX COORDINATOR         |        |500000   |545109|
|1 | EXCHANGE OUT DISTR    |:EX10000|500000   |320292|
|2 |  PX PARTITION ITERATOR|        |500000   |320292|
|3 |   TABLE SCAN          |T1      |500000   |320292|
======================================================
Outputs & filters:
-------------------------------------
  0 - output([T1.V1], [T1.V2]), filter(nil)
  1 - output([T1.V1], [T1.V2]), filter(nil), dop=1
  2 - output([T1.V1], [T1.V2]), filter(nil)
  3 - output([T1.V1], [T1.V2]), filter(nil),
      access([T1.V1], [T1.V2]), partitions(p[0-4])
Single-input pushdown operators
Single-input pushdown operators mainly include the AGGREGATION, SORT, GROUP BY, and LIMIT operators. All listed operators, except for the LIMIT operator, have a matching operation key. If the distribution of the input data is consistent with the operation keys, a one-stage aggregation, also known as a partition-wise aggregation, is performed. Otherwise, a two-stage aggregation is required, and the AGGREGATION operator is pushed down.
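The choice between one-stage and two-stage aggregation can be written as a small Python helper. This is only a sketch: the function name aggregation_stages is hypothetical, and it assumes that "consistent with the operation keys" means every partitioning key is also a grouping key, so that all rows of a group are stored in the same partition.

```python
from typing import Iterable


def aggregation_stages(partition_keys: Iterable[str], group_keys: Iterable[str]) -> int:
    """Return 1 for partition-wise aggregation, 2 when partial results must be merged.

    Assumption of this sketch: the input distribution matches the operation keys
    when every partitioning key is also a grouping key.
    """
    return 1 if set(partition_keys) <= set(group_keys) else 2


# t2 is partitioned by HASH(v1).
print(aggregation_stages(["v1"], ["v1"]))  # GROUP BY v1
print(aggregation_stages(["v1"], ["v2"]))  # GROUP BY v2
```

Under that assumption, grouping t2 by v1 needs one stage, while grouping it by v2 needs two.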
Here is an example of one-stage aggregation:
obclient> CREATE TABLE t2 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 4;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT SUM(v1) FROM t2 GROUP BY v1\G
*************************** 1. row ***************************
Query Plan:
======================================================
|ID|OPERATOR               |NAME    |EST. ROWS|COST  |
------------------------------------------------------
|0 |PX COORDINATOR         |        |101      |357302|
|1 | EXCHANGE OUT DISTR    |:EX10000|101      |357297|
|2 |  PX PARTITION ITERATOR|        |101      |357297|
|3 |   MERGE GROUP BY      |        |101      |357297|
|4 |    TABLE SCAN         |t2      |400000   |247403|
======================================================
Outputs & filters:
-------------------------------------
  0 - output([T_FUN_SUM(t2.v1)]), filter(nil)
  1 - output([T_FUN_SUM(t2.v1)]), filter(nil), dop=1
  2 - output([T_FUN_SUM(t2.v1)]), filter(nil)
  3 - output([T_FUN_SUM(t2.v1)]), filter(nil),
      group([t2.v1]), agg_func([T_FUN_SUM(t2.v1)])
  4 - output([t2.v1]), filter(nil),
      access([t2.v1]), partitions(p[0-3])
Here is an example of two-stage aggregation. As the plan shows, the query sums v1 and groups t2 by v2, which is not the partitioning key:
============================================================
|ID|OPERATOR                     |NAME    |EST. ROWS|COST  |
------------------------------------------------------------
|0 |PX COORDINATOR               |        |101      |561383|
|1 | EXCHANGE OUT DISTR          |:EX10001|101      |561374|
|2 |  HASH GROUP BY              |        |101      |561374|
|3 |   EXCHANGE IN DISTR         |        |101      |408805|
|4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|101      |408795|
|5 |     HASH GROUP BY           |        |101      |408795|
|6 |      PX PARTITION ITERATOR  |        |400000   |256226|
|7 |       TABLE SCAN            |t2      |400000   |256226|
============================================================
Outputs & filters:
-------------------------------------
  0 - output([T_FUN_SUM(T_FUN_SUM(t2.v1))]), filter(nil)
  1 - output([T_FUN_SUM(T_FUN_SUM(t2.v1))]), filter(nil), dop=1
  2 - output([T_FUN_SUM(T_FUN_SUM(t2.v1))]), filter(nil),
      group([t2.v2]), agg_func([T_FUN_SUM(T_FUN_SUM(t2.v1))])
  3 - output([t2.v2], [T_FUN_SUM(t2.v1)]), filter(nil)
  4 - (#keys=1, [t2.v2]), output([t2.v2], [T_FUN_SUM(t2.v1)]), filter(nil), dop=1
  5 - output([t2.v2], [T_FUN_SUM(t2.v1)]), filter(nil),
      group([t2.v2]), agg_func([T_FUN_SUM(t2.v1)])
  6 - output([t2.v1], [t2.v2]), filter(nil)
  7 - output([t2.v1], [t2.v2]), filter(nil),
      access([t2.v1], [t2.v2]), partitions(p[0-3])
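The data flow of the two-stage plan can be imitated in a few lines of Python. This is a rough sketch rather than a description of the executor: the function names partial_sums and two_stage_sum, the workers parameter, and the sample rows are all made up for illustration. Each partition first computes partial sums per v2 (the lower GROUP BY), the partial results are routed by a hash of v2 (the EXCHANGE with HASH distribution), and each receiver sums the partial sums (the upper GROUP BY, shown in the plan as T_FUN_SUM(T_FUN_SUM(t2.v1))).

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[int, int]  # (v1, v2)


def partial_sums(partition_rows: List[Row]) -> Dict[int, int]:
    """Stage 1: SUM(v1) grouped by v2 inside a single partition."""
    sums: Dict[int, int] = defaultdict(int)
    for v1, v2 in partition_rows:
        sums[v2] += v1
    return dict(sums)


def two_stage_sum(partitions: List[List[Row]], workers: int = 2) -> Dict[int, int]:
    """Stage 1 per partition, hash redistribution on v2, then stage 2 per worker."""
    # Redistribution: every partial sum for a given v2 goes to the same worker.
    buckets: List[Dict[int, List[int]]] = [defaultdict(list) for _ in range(workers)]
    for part in partitions:
        for v2, subtotal in partial_sums(part).items():
            buckets[hash(v2) % workers][v2].append(subtotal)
    # Stage 2: each worker adds up the partial sums of the groups it received.
    result: Dict[int, int] = {}
    for bucket in buckets:
        for v2, subtotals in bucket.items():
            result[v2] = sum(subtotals)
    return result


# Sample rows spread over 4 partitions by v1, mimicking PARTITION BY HASH(v1) PARTITIONS 4.
rows = [(1, 10), (2, 10), (3, 20), (4, 20), (5, 10)]
partitions = [[r for r in rows if r[0] % 4 == p] for p in range(4)]
print(two_stage_sum(partitions))
```

Because only one partial row per group leaves each partition, the amount of data sent through the exchange is bounded by the number of groups per partition instead of the number of rows, which is the reason for pushing the aggregation down.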
Binary input operators
Among the binary input operators, the JOIN operator is the one that matters most. For a JOIN operator, the optimizer generates the distributed execution plan and selects the data redistribution method based on rules. The following three JOIN modes are supported:
Partition-wise join
Partition-wise join is used if both the left and right tables are partitioned in the same way, they have the same physical distribution, and their partitioning keys are the join conditions. Here is an example:
obclient> CREATE TABLE t3 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 4;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT * FROM t2, t3 WHERE t2.v1 = t3.v1\G
*************************** 1. row ***************************
Query Plan:
===========================================================
|ID|OPERATOR               |NAME    |EST. ROWS |COST      |
-----------------------------------------------------------
|0 |PX COORDINATOR         |        |1568160000|1227554264|
|1 | EXCHANGE OUT DISTR    |:EX10000|1568160000|930670004 |
|2 |  PX PARTITION ITERATOR|        |1568160000|930670004 |
|3 |   MERGE JOIN          |        |1568160000|930670004 |
|4 |    TABLE SCAN         |t2      |400000    |256226    |
|5 |    TABLE SCAN         |t3      |400000    |256226    |
===========================================================
Outputs & filters:
-------------------------------------
  0 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil)
  1 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil), dop=1
  2 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil)
  3 - output([t2.v1], [t2.v2], [t3.v1], [t3.v2]), filter(nil),
      equal_conds([t2.v1 = t3.v1]), other_conds(nil)
  4 - output([t2.v1], [t2.v2]), filter(nil),
      access([t2.v1], [t2.v2]), partitions(p[0-3])
  5 - output([t3.v1], [t3.v2]), filter(nil),
      access([t3.v1], [t3.v2]), partitions(p[0-3])
Partial partition-wise join
Partial partition-wise join is used when only one of the left and right tables is partitioned, or when both are partitioned but the join key is the same as the partitioning key of one of the two tables. The table whose partitioning key matches the join key keeps its partition distribution, and the data of the other table is redistributed according to that distribution. Here is an example:
obclient> CREATE TABLE t4 (v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 3;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT * FROM t4, t2 WHERE t2.v1 = t4.v1\G
*************************** 1. row ***************************
Query Plan:
===========================================================
|ID|OPERATOR                     |NAME    |EST. ROWS|COST |
-----------------------------------------------------------
|0 |PX COORDINATOR               |        |11880    |17658|
|1 | EXCHANGE OUT DISTR          |:EX10001|11880    |15409|
|2 |  NESTED-LOOP JOIN           |        |11880    |15409|
|3 |   EXCHANGE IN DISTR         |        |3        |37   |
|4 |    EXCHANGE OUT DISTR (PKEY)|:EX10000|3        |37   |
|5 |     PX PARTITION ITERATOR   |        |3        |37   |
|6 |      TABLE SCAN             |t4      |3        |37   |
|7 |   PX PARTITION ITERATOR     |        |3960     |2561 |
|8 |    TABLE SCAN               |t2      |3960     |2561 |
===========================================================
Outputs & filters:
-------------------------------------
  0 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil)
  1 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), dop=1
  2 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil),
      conds(nil), nl_params_([t4.v1])
  3 - output([t4.v1], [t4.v2]), filter(nil)
  4 - (#keys=1, [t4.v1]), output([t4.v1], [t4.v2]), filter(nil), dop=1
  5 - output([t4.v1], [t4.v2]), filter(nil)
  6 - output([t4.v1], [t4.v2]), filter(nil),
      access([t4.v1], [t4.v2]), partitions(p[0-2])
  7 - output([t2.v1], [t2.v2]), filter(nil)
  8 - output([t2.v1], [t2.v2]), filter(nil),
      access([t2.v1], [t2.v2]), partitions(p[0-3])
Data redistribution
When the join key is unrelated to the partitioning keys of both tables, either BROADCAST or HASH HASH is selected for data redistribution based on rules. See the following example:
Notice
The two data redistribution methods in the following example are available only when the DOP is greater than 1.
obclient> EXPLAIN SELECT /*+ PARALLEL(2)*/* FROM t4, t2 WHERE t2.v2 = t4.v2\G
*************************** 1. row ***************************
Query Plan:
=================================================================
|ID|OPERATOR                          |NAME    |EST. ROWS|COST  |
-----------------------------------------------------------------
|0 |PX COORDINATOR                    |        |11880    |396863|
|1 | EXCHANGE OUT DISTR               |:EX10001|11880    |394614|
|2 |  HASH JOIN                       |        |11880    |394614|
|3 |   EXCHANGE IN DISTR              |        |3        |37    |
|4 |    EXCHANGE OUT DISTR (BROADCAST)|:EX10000|3        |37    |
|5 |     PX BLOCK ITERATOR            |        |3        |37    |
|6 |      TABLE SCAN                  |t4      |3        |37    |
|7 |   PX PARTITION ITERATOR          |        |400000   |256226|
|8 |    TABLE SCAN                    |t2      |400000   |256226|
=================================================================
Outputs & filters:
-------------------------------------
  0 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil)
  1 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), dop=2
  2 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil),
      equal_conds([t2.v2 = t4.v2]), other_conds(nil)
  3 - output([t4.v1], [t4.v2]), filter(nil)
  4 - output([t4.v1], [t4.v2]), filter(nil), dop=2
  5 - output([t4.v1], [t4.v2]), filter(nil)
  6 - output([t4.v1], [t4.v2]), filter(nil),
      access([t4.v1], [t4.v2]), partitions(p[0-2])
  7 - output([t2.v1], [t2.v2]), filter(nil)
  8 - output([t2.v1], [t2.v2]), filter(nil),
      access([t2.v1], [t2.v2]), partitions(p[0-3])

obclient> EXPLAIN SELECT /*+ PQ_DISTRIBUTE(t2 HASH HASH) PARALLEL(2)*/* FROM t4, t2 WHERE t2.v2 = t4.v2\G
*************************** 1. row ***************************
Query Plan:
============================================================
|ID|OPERATOR                     |NAME    |EST. ROWS|COST  |
------------------------------------------------------------
|0 |PX COORDINATOR               |        |11880    |434727|
|1 | EXCHANGE OUT DISTR          |:EX10002|11880    |432478|
|2 |  HASH JOIN                  |        |11880    |432478|
|3 |   EXCHANGE IN DISTR         |        |3        |37    |
|4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|3        |37    |
|5 |     PX BLOCK ITERATOR       |        |3        |37    |
|6 |      TABLE SCAN             |t4      |3        |37    |
|7 |   EXCHANGE IN DISTR         |        |400000   |294090|
|8 |    EXCHANGE OUT DISTR (HASH)|:EX10001|400000   |256226|
|9 |     PX PARTITION ITERATOR   |        |400000   |256226|
|10|      TABLE SCAN             |t2      |400000   |256226|
============================================================
Outputs & filters:
-------------------------------------
  0 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil)
  1 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil), dop=2
  2 - output([t4.v1], [t4.v2], [t2.v1], [t2.v2]), filter(nil),
      equal_conds([t2.v2 = t4.v2]), other_conds(nil)
  3 - output([t4.v1], [t4.v2]), filter(nil)
  4 - (#keys=1, [t4.v2]), output([t4.v1], [t4.v2]), filter(nil), dop=2
  5 - output([t4.v1], [t4.v2]), filter(nil)
  6 - output([t4.v1], [t4.v2]), filter(nil),
      access([t4.v1], [t4.v2]), partitions(p[0-2])
  7 - output([t2.v1], [t2.v2]), filter(nil)
  8 - (#keys=1, [t2.v2]), output([t2.v1], [t2.v2]), filter(nil), dop=2
  9 - output([t2.v1], [t2.v2]), filter(nil)
  10 - output([t2.v1], [t2.v2]), filter(nil),
      access([t2.v1], [t2.v2]), partitions(p[0-3])
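The three JOIN modes described in this section can be summarized as a decision function. The Python sketch below is an approximation for illustration only: TableDist and choose_join_method are hypothetical names, an equal partition count stands in for "partitioned in the same way with the same physical distribution", and the sketch does not model how the optimizer picks between BROADCAST and HASH HASH or what it does when the DOP is 1.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TableDist:
    part_key: Optional[str]  # partitioning column, None if the table is not partitioned
    partitions: int = 1


def choose_join_method(left: TableDist, right: TableDist,
                       left_join_col: str, right_join_col: str,
                       dop: int = 1) -> Optional[str]:
    """Pick a JOIN mode from the partitioning of both inputs (simplified)."""
    left_match = left.part_key is not None and left.part_key == left_join_col
    right_match = right.part_key is not None and right.part_key == right_join_col
    # Simplification: same partition count is treated as the same physical distribution.
    if left_match and right_match and left.partitions == right.partitions:
        return "PARTITION-WISE"
    if left_match or right_match:
        return "PARTIAL PARTITION-WISE (PKEY)"
    if dop > 1:
        return "BROADCAST or HASH HASH"
    return None  # not covered by this sketch


t2 = TableDist("v1", 4)
t3 = TableDist("v1", 4)
t4 = TableDist("v1", 3)
print(choose_join_method(t2, t3, "v1", "v1"))
print(choose_join_method(t4, t2, "v1", "v1"))
print(choose_join_method(t4, t2, "v2", "v2", dop=2))
```

With the tables used in this section, the three calls correspond to the partition-wise join of t2 and t3, the PKEY redistribution of t4 toward t2, and the redistribution on v2 under PARALLEL(2).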