The EXCHANGE operator is used for data exchange between threads. It is typically used in distributed scenarios and appears in pairs, with an OUT operator at the source and an IN operator at the destination.
EXCHANGE operator types
The EXCHANGE operators supported by OceanBase Database are mainly classified based on their data redistribution strategies. Common types include:
| Operator type | Description | Core principle |
|---|---|---|
| EXCH-IN/OUT | A general-purpose data exchange operator that gathers or distributes data. | Serves as a basic data transmission channel that does not prescribe a particular distribution rule; the data flow is determined by the upper-layer operators. |
| EXCH-IN/OUT (REMOTE) | Pulls data from a remote node to the local node. | Reads data directly from a remote data source (such as the node where a single partition resides) and transmits it to the requesting node. |
| EXCH-IN/OUT (PKEY) | Redistributes data based on the partitioning key. | Recomputes the destination of each row from the partitioning rule of the target table and sends the row to the node that holds the corresponding partition. This is commonly used to keep related data on the same node. |
| EXCH-IN/OUT (HASH) | Redistributes data based on hash functions. | Applies a set of hash functions to the specified columns and sends rows with the same hash value to the same processing unit. This is the basis for parallel hash joins and aggregations. |
| EXCH-IN/OUT (BC2HOST) | Broadcasts data to every participating node (host). | Sends a complete copy of the input data to each node that participates in the query; the worker threads on that node can share the copy. |
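The following Python sketch makes the routing rules in the table concrete. It shows which consumers a single row would be sent to under each strategy. It is a conceptual model only. The consumer numbering, the host names, Python's `hash(...) % dop` as the hash function, and `col1 % num_partitions` as the partitioning rule are all assumptions for illustration. They are not OceanBase's internal implementation.

```python
# Conceptual sketch: which consumer(s) receive one row under each EXCHANGE
# strategy. Consumer IDs, host names, hash(...) % dop, and the partition rule
# value % num_partitions are illustrative assumptions, not OceanBase internals.
from typing import List, Tuple

Row = Tuple[int, int]  # (col1, col2)


def gather_route(row: Row) -> List[int]:
    # EXCH-IN/OUT: every row converges on one consumer (the coordinator, ID 0).
    return [0]


def remote_route(row: Row, requesting_node: str) -> List[str]:
    # REMOTE: rows go back to the node that requested them.
    return [requesting_node]


def pkey_route(row: Row, key: int, num_partitions: int) -> List[int]:
    # PKEY: follow the target table's (assumed) partitioning rule.
    return [row[key] % num_partitions]


def hash_route(row: Row, key: int, dop: int) -> List[int]:
    # HASH: equal key values always map to the same consumer.
    return [hash(row[key]) % dop]


def bc2host_route(row: Row, hosts: List[str]) -> List[str]:
    # BC2HOST: one full copy for every participating host.
    return list(hosts)


row = (7, 4)
print(gather_route(row), remote_route(row, "B"), pkey_route(row, 0, 5),
      hash_route(row, 1, 2), bc2host_route(row, ["A", "B"]))
# [0] ['B'] [2] [0] ['A', 'B']
```

For the sample row (7, 4), PKEY routing with five partitions picks partition 2. HASH routing on col2 with two consumers picks consumer 0. BC2HOST returns every host.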
EXCH-IN/OUT
EXCH-IN/OUT, also known as EXCHANGE IN/EXCHANGE OUT, gathers data from multiple partitions and sends it to the node that runs the query's coordinator.
EXCH-IN/OUT example
Create table tbl1.

```sql
obclient> CREATE TABLE tbl1(col1 INT, col2 INT) PARTITION BY HASH(col1) PARTITIONS 5;
```

Q1: Query all rows in tbl1 and view its execution plan.

```sql
obclient> EXPLAIN SELECT * FROM tbl1;
```

The result is as follows:
```
+---------------------------------------------------------------------------------+
| Query Plan                                                                      |
+---------------------------------------------------------------------------------+
| =============================================================                   |
| |ID|OPERATOR                 |NAME    |EST.ROWS|EST.TIME(us)|                   |
| -------------------------------------------------------------                   |
| |0 |PX COORDINATOR           |        |1       |13          |                   |
| |1 |└─EXCHANGE OUT DISTR     |:EX10000|1       |12          |                   |
| |2 |  └─PX PARTITION ITERATOR|        |1       |11          |                   |
| |3 |    └─TABLE FULL SCAN    |TBL1    |1       |11          |                   |
| =============================================================                   |
| Outputs & filters:                                                              |
| -------------------------------------                                           |
|   0 - output([INTERNAL_FUNCTION(TBL1.COL1, TBL1.COL2)]), filter(nil), rowset=16 |
|   1 - output([INTERNAL_FUNCTION(TBL1.COL1, TBL1.COL2)]), filter(nil), rowset=16 |
|       dop=1                                                                     |
|   2 - output([TBL1.COL1], [TBL1.COL2]), filter(nil), rowset=16                  |
|       force partition granule                                                   |
|   3 - output([TBL1.COL1], [TBL1.COL2]), filter(nil), rowset=16                  |
|       access([TBL1.COL1], [TBL1.COL2]), partitions(p[0-4])                      |
|       is_index_back=false, is_global_index=false,                               |
|       range_key([TBL1.__pk_increment]), range(MIN ; MAX)always true             |
+---------------------------------------------------------------------------------+
19 rows in set
```
In the preceding Q1 query, data from five partitions (p0, p1, p2, p3, and p4) was accessed. Operator 0 is represented as a PX COORDINATOR, which is a special type of EXCHANGE IN operator. In addition to pulling back remote data, it also schedules the execution of subplans. Operator 1 receives the output generated by Operator 2 and sends the data out; Operator 0 receives the outputs generated by Operator 1 across multiple partitions and aggregates them for output. In the Q1 query result, in addition to the PX COORDINATOR operator (Operator 0) and the EXCHANGE OUT DISTR operator, the following execution operators are also displayed:
- PX PARTITION ITERATOR: This operator belongs to the GI operators and iterates data at partition granularity. For more information, see GI.
- TABLE FULL SCAN: This operator belongs to the TABLE SCAN operators. The plan shows which index (or primary table) the optimizer chose for accessing the data. For more information, see TABLE SCAN.
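The Q1 data flow can be modeled as a small Python sketch. One worker thread per partition plays the role of TABLE FULL SCAN plus EXCHANGE OUT. The caller plays the PX COORDINATOR and drains a shared channel until every worker reports that it is finished. Several details are assumptions chosen for illustration: the threads, the `queue.Queue` channel, the end-of-stream marker, and the partition rule `col1 % 5`. They are not how OceanBase implements its worker threads or network channels.

```python
# Conceptual sketch of Q1: per-partition workers send rows through a shared
# channel (standing in for EXCHANGE OUT) and a coordinator gathers them
# (standing in for PX COORDINATOR / EXCHANGE IN). Threads and queue.Queue are
# illustrative stand-ins, not OceanBase's execution framework.
import queue
import threading
from typing import Dict, List, Tuple

Row = Tuple[int, int]
_DONE = object()  # end-of-stream marker sent by each worker


def worker(partition_rows: List[Row], channel: "queue.Queue") -> None:
    for row in partition_rows:  # scan one partition
        channel.put(row)        # EXCHANGE OUT: send the row to the coordinator
    channel.put(_DONE)          # signal that this worker has finished


def coordinator(partitions: Dict[int, List[Row]]) -> List[Row]:
    channel: "queue.Queue" = queue.Queue()
    threads = [threading.Thread(target=worker, args=(rows, channel))
               for rows in partitions.values()]
    for t in threads:
        t.start()
    result: List[Row] = []
    finished = 0
    while finished < len(threads):  # EXCHANGE IN: gather from all workers
        item = channel.get()
        if item is _DONE:
            finished += 1
        else:
            result.append(item)
    for t in threads:
        t.join()
    return result


# Five hash partitions, mirroring tbl1 (partition = col1 % 5 is assumed here).
rows = [(i, i * 10) for i in range(12)]
parts: Dict[int, List[Row]] = {p: [] for p in range(5)}
for r in rows:
    parts[r[0] % 5].append(r)

gathered = coordinator(parts)
print(sorted(gathered) == rows)  # True: every row arrives exactly once
```

Workers finish in a nondeterministic order, so the sketch compares the sorted result. A plain gathering EXCHANGE IN is not expected to preserve row order either.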
The Outputs & filters section in the execution plan display of Q1 lists the output information of the EXCH-IN/OUT operator in detail as follows:
| Information name | Meaning | Example |
|---|---|---|
| output | The list of columns or expressions output by the operator. | output([INTERNAL_FUNCTION(TBL1.COL1, TBL1.COL2)]) indicates that this operator (PX COORDINATOR) outputs the TBL1.COL1 and TBL1.COL2 columns in an internally encapsulated form. INTERNAL_FUNCTION is an internal encapsulation identifier used to transfer data between operators in the parallel execution framework. Here, it indicates that data is converging from the worker threads to the coordinator thread. |
| filter | The predicates that the operator applies to filter rows. | filter(nil) indicates that the operator does not filter any rows. |
| rowset | The vectorization batch size of the operator, that is, the number of rows it processes per batch. | rowset=16 indicates that the operator processes rows in batches of 16. |
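A rowset value describes batching rather than filtering. The following Python sketch assumes only that rows are handed over in batches of at most rowset rows; it assumes nothing else about OceanBase's executor. It shows how 40 rows would be split with rowset=16.

```python
# Sketch of what "rowset=16" means: rows are pulled and processed in batches
# of up to 16 instead of one at a time. This illustrates batching only; it is
# not OceanBase's vectorized executor.
from itertools import islice
from typing import Iterable, Iterator, List, Tuple

Row = Tuple[int, int]


def rowsets(rows: Iterable[Row], size: int = 16) -> Iterator[List[Row]]:
    it = iter(rows)
    while True:
        batch = list(islice(it, size))  # take up to `size` rows
        if not batch:
            return
        yield batch


batches = list(rowsets(((i, i) for i in range(40)), size=16))
print([len(b) for b in batches])  # [16, 16, 8]
```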
EXCH-IN/OUT (REMOTE)
The EXCH-IN/OUT (REMOTE) operator is used to pull remote data (data from a single partition) back to the local node.
EXCH-IN/OUT (REMOTE) example
Create a non-partitioned table tbl2 on server A.

```sql
obclient> CREATE TABLE tbl2(col1 INT, col2 INT);
```

Q2: Query all rows in tbl2 on server B and view its execution plan.

```sql
obclient> EXPLAIN SELECT * FROM tbl2;
```

The result is as follows:
```
+---------------------------------------------------------------------+
| Query Plan                                                          |
+---------------------------------------------------------------------+
| =====================================================               |
| |ID|OPERATOR             |NAME|EST.ROWS|EST.TIME(us)|               |
| -----------------------------------------------------               |
| |0 |EXCHANGE IN REMOTE   |    |1       |5           |               |
| |1 |└─EXCHANGE OUT REMOTE|    |1       |4           |               |
| |2 |  └─TABLE FULL SCAN  |TBL2|1       |3           |               |
| =====================================================               |
| Outputs & filters:                                                  |
| -------------------------------------                               |
|   0 - output([TBL2.COL1], [TBL2.COL2]), filter(nil)                 |
|   1 - output([TBL2.COL1], [TBL2.COL2]), filter(nil)                 |
|   2 - output([TBL2.COL1], [TBL2.COL2]), filter(nil), rowset=16      |
|       access([TBL2.COL1], [TBL2.COL2]), partitions(p0)              |
|       is_index_back=false, is_global_index=false,                   |
|       range_key([TBL2.__pk_increment]), range(MIN ; MAX)always true |
+---------------------------------------------------------------------+
```
In the preceding example, a non-partitioned table is created on server A, and a Q2 query is executed on server B to read data from this table. Since the data to be read is remote, the execution plan assigns Operator 0 and Operator 1 to fetch the remote data. Operator 1 executes on server A, reads data from the tbl2 table, and sends it out; Operator 0 executes on server B, receiving the output generated by Operator 1.
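The division of work between server A and server B in Q2 can be sketched in Python as two functions. The first runs "on" server A, scans tbl2, and serializes the rows. The second runs "on" server B and deserializes them. The in-memory SERVERS dictionary and the JSON encoding are hypothetical stand-ins for storage and the real RPC format.

```python
# Sketch of Q2 with two simulated servers. EXCHANGE OUT REMOTE runs on server A
# (where tbl2 is stored), scans and serializes the rows; EXCHANGE IN REMOTE runs
# on server B and deserializes them. The SERVERS dict and JSON encoding are
# hypothetical stand-ins for real storage and the real wire format.
import json
from typing import Dict, List, Tuple

Row = Tuple[int, int]

SERVERS: Dict[str, Dict[str, List[Row]]] = {
    "A": {"tbl2": [(1, 100), (2, 200), (3, 300)]},  # tbl2 lives on A only
    "B": {},
}


def exchange_out_remote(server: str, table: str) -> bytes:
    rows = SERVERS[server][table]            # TABLE FULL SCAN on server A
    return json.dumps(rows).encode("utf-8")  # serialize for transfer


def exchange_in_remote(payload: bytes) -> List[Row]:
    # Rebuild tuples on server B from the received payload.
    return [tuple(r) for r in json.loads(payload.decode("utf-8"))]


# The query is issued on server B; the plan pulls tbl2's rows from server A.
result = exchange_in_remote(exchange_out_remote("A", "tbl2"))
print(result)  # [(1, 100), (2, 200), (3, 300)]
```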
The Outputs & filters section in the execution plan display for Q2 lists the output information of the EXCH-IN/OUT (REMOTE) operator in detail. The meaning of the fields is the same as that of the EXCH-IN/OUT operator.
EXCH-IN/OUT (PKEY)
The EXCH-IN/OUT (PKEY) operator repartitions data. It typically appears below a binary operator, such as a join, where it repartitions the data of one child node according to the partitioning method of the other child node.
EXCH-IN/OUT (PKEY) example
Create table tbl3.

```sql
obclient> CREATE TABLE tbl3 (col1 INT, col2 INT) PARTITION BY HASH(col1) PARTITIONS 5;
```

Create table tbl4.

```sql
obclient> CREATE TABLE tbl4 (col1 INT PRIMARY KEY, col2 INT) PARTITION BY HASH(col1) PARTITIONS 4;
```

Q3: Query the rows in tables tbl3 and tbl4 where tbl4.col1 = tbl3.col1, and view the execution plan of this query.

```sql
obclient> EXPLAIN SELECT * FROM tbl4, tbl3 WHERE tbl4.col1 = tbl3.col1;
```

The result is as follows:
```
+-------------------------------------------------------------------------------------------------------+
| Query Plan                                                                                            |
+-------------------------------------------------------------------------------------------------------+
| =====================================================================                                 |
| |ID|OPERATOR                         |NAME    |EST.ROWS|EST.TIME(us)|                                 |
| ---------------------------------------------------------------------                                 |
| |0 |PX COORDINATOR                   |        |1       |38          |                                 |
| |1 |└─EXCHANGE OUT DISTR             |:EX10001|1       |37          |                                 |
| |2 |  └─HASH JOIN                    |        |1       |36          |                                 |
| |3 |    ├─EXCHANGE IN DISTR          |        |1       |17          |                                 |
| |4 |    │ └─EXCHANGE OUT DISTR (PKEY)|:EX10000|1       |16          |                                 |
| |5 |    │   └─PX PARTITION ITERATOR  |        |1       |16          |                                 |
| |6 |    │     └─TABLE FULL SCAN      |tbl4    |1       |16          |                                 |
| |7 |    └─PX PARTITION ITERATOR      |        |1       |19          |                                 |
| |8 |      └─TABLE FULL SCAN          |tbl3    |1       |19          |                                 |
| =====================================================================                                 |
| Outputs & filters:                                                                                    |
| -------------------------------------                                                                 |
|   0 - output([INTERNAL_FUNCTION(tbl4.col1, tbl4.col2, tbl3.col1, tbl3.col2)]), filter(nil), rowset=16 |
|   1 - output([INTERNAL_FUNCTION(tbl4.col1, tbl4.col2, tbl3.col1, tbl3.col2)]), filter(nil), rowset=16 |
|       dop=1                                                                                           |
|   2 - output([tbl4.col1], [tbl3.col1], [tbl4.col2], [tbl3.col2]), filter(nil), rowset=16              |
|       equal_conds([tbl4.col1 = tbl3.col1]), other_conds(nil)                                          |
|   3 - output([tbl4.col1], [tbl4.col2]), filter(nil), rowset=16                                        |
|   4 - output([tbl4.col1], [tbl4.col2]), filter(nil), rowset=16                                        |
|       (#keys=1, [tbl4.col1]), dop=1                                                                   |
|   5 - output([tbl4.col1], [tbl4.col2]), filter(nil), rowset=16                                        |
|       force partition granule                                                                         |
|   6 - output([tbl4.col1], [tbl4.col2]), filter(nil), rowset=16                                        |
|       access([tbl4.col1], [tbl4.col2]), partitions(p[0-3])                                            |
|       is_index_back=false, is_global_index=false,                                                     |
|       range_key([tbl4.col1]), range(MIN ; MAX)always true                                             |
|   7 - output([tbl3.col1], [tbl3.col2]), filter(nil), rowset=16                                        |
|       affinitize, force partition granule                                                             |
|   8 - output([tbl3.col1], [tbl3.col2]), filter(nil), rowset=16                                        |
|       access([tbl3.col1], [tbl3.col2]), partitions(p[0-4])                                            |
|       is_index_back=false, is_global_index=false,                                                     |
|       range_key([tbl3.__pk_increment]), range(MIN ; MAX)always true                                   |
+-------------------------------------------------------------------------------------------------------+
```
The preceding Q3 query joins the data of two partitioned tables. The execution plan repartitions the data of table tbl4 according to the partitioning method of table tbl3. The input of Operator 4 is the result of scanning table tbl4. For each tbl4 row, this operator uses the partitioning of table tbl3 and the join condition tbl4.col1 = tbl3.col1 to decide which tbl3 partition, and therefore which node, the row must be sent to. Table tbl3 itself is not moved. Operator 7 is marked affinitize, so each tbl3 partition is scanned where it resides and joined with the tbl4 rows routed to it.
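The following Python sketch illustrates why PKEY repartitioning makes a local join possible. tbl3 stays in its five partitions, each tbl4 row is routed by tbl3's partitioning rule, and each partition is then joined independently. The rule `col1 % 5` is an assumption standing in for OceanBase's HASH partition function, and the sample rows are made up.

```python
# Sketch of PKEY repartitioning for Q3: tbl3 stays in its 5 partitions, each
# tbl4 row is routed by tbl3's partitioning rule, and every partition joins
# locally. The rule col1 % 5 is an assumption, not OceanBase's real function.
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[int, int]  # (col1, col2)
TBL3_PARTS = 5


def tbl3_partition(col1: int) -> int:
    # Assumed partitioning rule for tbl3 (HASH(col1) PARTITIONS 5).
    return col1 % TBL3_PARTS


def pkey_exchange(rows: List[Row]) -> Dict[int, List[Row]]:
    """EXCHANGE OUT DISTR (PKEY): route each tbl4 row to tbl3's matching partition."""
    routed: Dict[int, List[Row]] = defaultdict(list)
    for row in rows:
        routed[tbl3_partition(row[0])].append(row)
    return routed


def local_hash_join(build: List[Row], probe: List[Row]) -> List[Tuple[Row, Row]]:
    """Join on col1 within a single partition."""
    table: Dict[int, List[Row]] = defaultdict(list)
    for b in build:
        table[b[0]].append(b)
    return [(b, p) for p in probe for b in table.get(p[0], [])]


tbl3 = [(i, i * 3) for i in range(10)]
tbl4 = [(i, i * 4) for i in range(0, 10, 2)]

# tbl3 is already stored by partition; it is scanned in place and not moved.
tbl3_parts: Dict[int, List[Row]] = defaultdict(list)
for row in tbl3:
    tbl3_parts[tbl3_partition(row[0])].append(row)

routed = pkey_exchange(tbl4)
joined = [pair for part in range(TBL3_PARTS)
          for pair in local_hash_join(routed.get(part, []), tbl3_parts.get(part, []))]

# The partition-wise result matches a join over the full tables.
expected = [(a, b) for a in tbl4 for b in tbl3 if a[0] == b[0]]
print(sorted(joined) == sorted(expected))  # True
```

This mirrors the plan: only tbl4 rows are moved, because Operator 4 repartitions tbl4 while tbl3 is scanned in place.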
In addition, Operator 0 is a PX COORDINATOR. This special type of EXCHANGE IN operator fetches remote data and also schedules the execution of subplans. Operator 3 is an EXCHANGE IN DISTR operator. It receives the tbl4 rows repartitioned by Operator 4 and passes them to the HASH JOIN (Operator 2). A hash join does not require sorted input, so Operator 3 does not merge-sort the data it receives. When a plan needs the received data in order, it uses the EXCHANGE IN MERGE SORT DISTR variant instead. The following execution operators are also displayed:
- PX PARTITION ITERATOR: This operator belongs to the GI operator family and iterates data at partition granularity. For more information, see GI.
- TABLE FULL SCAN: This operator belongs to the TABLE SCAN operator family. The plan shows which index (or primary table) the optimizer chose for accessing the data. For more information, see TABLE SCAN.
The Outputs & filters section in the execution plan display for Q3 lists the output information of the EXCHANGE OUT DISTR (PKEY) operator in detail as follows:
| Information name | Meaning | Example |
|---|---|---|
| output | The list of columns or expressions output by the operator. | output([tbl4.col1], [tbl4.col2]) indicates that the operator outputs the col1 and col2 columns of table tbl4. |
| filter | The predicates that the operator applies to filter rows. | filter(nil) indicates that the operator does not filter any rows. |
| rowset | The vectorization batch size of the operator, that is, the number of rows it processes per batch. | rowset=16 indicates that the operator processes rows in batches of 16. |
| pkey | The columns by which the data is repartitioned. | (#keys=1, [tbl4.col1]) indicates that the data is repartitioned by one column, tbl4.col1. |
EXCH-IN/OUT (HASH)
The EXCH-IN/OUT (HASH) operator is used to repartition data using a set of hash functions.
EXCH-IN/OUT (HASH) example
Create table tbl5.

```sql
obclient> CREATE TABLE tbl5 (col1 INT, col2 INT) PARTITION BY HASH(col1) PARTITIONS 4;
```

Create table tbl6.

```sql
obclient> CREATE TABLE tbl6 (col1 INT, col2 INT) PARTITION BY HASH(col1) PARTITIONS 4;
```

Q4: Query the rows in tbl5 and tbl6 where tbl5.col2 = tbl6.col2, and view the execution plan for this query.

```sql
obclient> EXPLAIN SELECT /*+PARALLEL(2) LEADING(tbl5 tbl6) USE_HASH(tbl6) PQ_DISTRIBUTE(tbl6 HASH HASH)*/ * FROM tbl5, tbl6 WHERE tbl5.col2 = tbl6.col2;
```

The result is as follows:
```
+-------------------------------------------------------------------------------------------------------+
| Query Plan                                                                                            |
+-------------------------------------------------------------------------------------------------------+
| =====================================================================                                 |
| |ID|OPERATOR                         |NAME    |EST.ROWS|EST.TIME(us)|                                 |
| ---------------------------------------------------------------------                                 |
| |0 |PX COORDINATOR                   |        |1       |18          |                                 |
| |1 |└─EXCHANGE OUT DISTR             |:EX10002|1       |17          |                                 |
| |2 |  └─HASH JOIN                    |        |1       |17          |                                 |
| |3 |    ├─EXCHANGE IN DISTR          |        |1       |9           |                                 |
| |4 |    │ └─EXCHANGE OUT DISTR (HASH)|:EX10000|1       |8           |                                 |
| |5 |    │   └─PX BLOCK ITERATOR      |        |1       |8           |                                 |
| |6 |    │     └─TABLE FULL SCAN      |tbl5    |1       |8           |                                 |
| |7 |    └─EXCHANGE IN DISTR          |        |1       |9           |                                 |
| |8 |      └─EXCHANGE OUT DISTR (HASH)|:EX10001|1       |8           |                                 |
| |9 |        └─PX BLOCK ITERATOR      |        |1       |8           |                                 |
| |10|          └─TABLE FULL SCAN      |tbl6    |1       |8           |                                 |
| =====================================================================                                 |
| Outputs & filters:                                                                                    |
| -------------------------------------                                                                 |
|   0 - output([INTERNAL_FUNCTION(tbl5.col1, tbl5.col2, tbl6.col1, tbl6.col2)]), filter(nil), rowset=16 |
|   1 - output([INTERNAL_FUNCTION(tbl5.col1, tbl5.col2, tbl6.col1, tbl6.col2)]), filter(nil), rowset=16 |
|       dop=2                                                                                           |
|   2 - output([tbl5.col2], [tbl6.col2], [tbl5.col1], [tbl6.col1]), filter(nil), rowset=16              |
|       equal_conds([tbl5.col2 = tbl6.col2]), other_conds(nil)                                          |
|   3 - output([tbl5.col2], [tbl5.col1]), filter(nil), rowset=16                                        |
|   4 - output([tbl5.col2], [tbl5.col1]), filter(nil), rowset=16                                        |
|       (#keys=1, [tbl5.col2]), dop=2                                                                   |
|   5 - output([tbl5.col1], [tbl5.col2]), filter(nil), rowset=16                                        |
|   6 - output([tbl5.col1], [tbl5.col2]), filter(nil), rowset=16                                        |
|       access([tbl5.col1], [tbl5.col2]), partitions(p[0-3])                                            |
|       is_index_back=false, is_global_index=false,                                                     |
|       range_key([tbl5.__pk_increment]), range(MIN ; MAX)always true                                   |
|   7 - output([tbl6.col2], [tbl6.col1]), filter(nil), rowset=16                                        |
|   8 - output([tbl6.col2], [tbl6.col1]), filter(nil), rowset=16                                        |
|       (#keys=1, [tbl6.col2]), dop=2                                                                   |
|   9 - output([tbl6.col1], [tbl6.col2]), filter(nil), rowset=16                                        |
|   10 - output([tbl6.col1], [tbl6.col2]), filter(nil), rowset=16                                       |
|       access([tbl6.col1], [tbl6.col2]), partitions(p[0-3])                                            |
|       is_index_back=false, is_global_index=false,                                                     |
|       range_key([tbl6.__pk_increment]), range(MIN ; MAX)always true                                   |
+-------------------------------------------------------------------------------------------------------+
```
In the preceding Q4 execution plan, operators 3-4 and 7-8 are two groups of EXCHANGE operators that use hash repartitioning. They redistribute the data of tables tbl5 and tbl6 across the worker threads by applying a new hash function to the join columns tbl5.col2 and tbl6.col2. This guarantees that rows with the same col2 value are sent to the same worker. On the repartitioned data, Operator 2 (HASH JOIN) joins each slice of data according to the condition tbl5.col2 = tbl6.col2. The query runs with a degree of parallelism (DOP) of 2, set by the PARALLEL(2) hint, so the plan shows dop=2.
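A minimal Python sketch of the HASH-HASH strategy in Q4 follows. Both inputs are re-hashed on col2 with the same function, so matching rows always meet on the same worker. As a result, the union of the per-worker joins equals the full join. Python's `hash(col2) % dop` is an assumed stand-in for OceanBase's internal hash function, and the sample rows are made up.

```python
# Sketch of Q4's HASH-HASH distribution with dop=2: both tables are re-hashed
# on col2 so equal col2 values land on the same worker, which then joins only
# its own slice. hash(col2) % dop stands in for OceanBase's hash function.
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[int, int]  # (col1, col2)
DOP = 2


def hash_exchange(rows: List[Row], dop: int) -> Dict[int, List[Row]]:
    """EXCHANGE OUT DISTR (HASH) with #keys=1 on col2: pick a worker per row."""
    workers: Dict[int, List[Row]] = defaultdict(list)
    for row in rows:
        workers[hash(row[1]) % dop].append(row)
    return workers


def hash_join_on_col2(build: List[Row], probe: List[Row]) -> List[Tuple[Row, Row]]:
    table: Dict[int, List[Row]] = defaultdict(list)
    for b in build:
        table[b[1]].append(b)
    return [(b, p) for p in probe for b in table.get(p[1], [])]


tbl5 = [(1, 10), (2, 11), (3, 12), (4, 10)]
tbl6 = [(5, 10), (6, 12), (7, 13)]

left = hash_exchange(tbl5, DOP)
right = hash_exchange(tbl6, DOP)
# Each worker joins only the slice it received; no cross-worker lookups are needed.
joined = [pair for w in range(DOP)
          for pair in hash_join_on_col2(left.get(w, []), right.get(w, []))]

expected = [(a, b) for a in tbl5 for b in tbl6 if a[1] == b[1]]
print(sorted(joined) == sorted(expected))  # True
```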
Operator 0, PX COORDINATOR, is a special type of EXCHANGE IN operator. In addition to pulling back data from remote nodes, it schedules the execution of subplans. The following execution operators are also shown:
- PX BLOCK ITERATOR: This operator belongs to the GI category and iterates data at block granularity, so the data of one partition can be split across several worker threads. For more information, see GI.
- TABLE FULL SCAN: This operator belongs to the TABLE SCAN category. The plan shows which index (or primary table) the optimizer chose for accessing the data. For more information, see TABLE SCAN.
The Outputs & filters section in the execution plan display for example Q4 lists the output information for the EXCHANGE OUT DISTR (HASH) operator in detail as follows:
| Information name | Meaning | Example |
|---|---|---|
| output | The list of columns or expressions output by the operator. | output([tbl6.col2], [tbl6.col1]) indicates that the operator outputs the col2 and col1 columns of table tbl6. |
| filter | The predicates that the operator applies to filter rows. | filter(nil) indicates that the operator does not filter any rows. |
| rowset | The vectorization batch size of the operator, that is, the number of rows it processes per batch. | rowset=16 indicates that the operator processes rows in batches of 16. |
| pkey | The columns by which the data is repartitioned. | (#keys=1, [tbl6.col2]) indicates that the data is hash-repartitioned on one column, tbl6.col2. |
EXCH-IN/OUT (BC2HOST)
The EXCH-IN/OUT (BC2HOST) operator repartitions its input data with the BC2HOST method, which broadcasts the data to every node (host) that participates in the query.
EXCH-IN/OUT (BC2HOST) example
Create table tbl7.

```sql
obclient> CREATE TABLE tbl7 (col1 INT, col2 INT) PARTITION BY HASH(col1) PARTITIONS 4;
```

Create table tbl8.

```sql
obclient> CREATE TABLE tbl8 (col1 INT, col2 INT) PARTITION BY HASH(col1) PARTITIONS 4;
```

Q5: Query the rows in tbl7 and tbl8 where tbl7.col2 = tbl8.col2, and view the execution plan for this query.

```sql
obclient> EXPLAIN SELECT /*+PARALLEL(2) */ * FROM tbl7, tbl8 WHERE tbl7.col2 = tbl8.col2;
```

The result is as follows:
```
+-------------------------------------------------------------------------------------------------------+
| Query Plan                                                                                            |
+-------------------------------------------------------------------------------------------------------+
| ========================================================================                              |
| |ID|OPERATOR                            |NAME    |EST.ROWS|EST.TIME(us)|                              |
| ------------------------------------------------------------------------                              |
| |0 |PX COORDINATOR                      |        |1       |18          |                              |
| |1 |└─EXCHANGE OUT DISTR                |:EX10001|1       |17          |                              |
| |2 |  └─SHARED HASH JOIN                |        |1       |17          |                              |
| |3 |    ├─EXCHANGE IN DISTR             |        |1       |9           |                              |
| |4 |    │ └─EXCHANGE OUT DISTR (BC2HOST)|:EX10000|1       |8           |                              |
| |5 |    │   └─PX BLOCK ITERATOR         |        |1       |8           |                              |
| |6 |    │     └─TABLE FULL SCAN         |tbl7    |1       |8           |                              |
| |7 |    └─PX BLOCK ITERATOR             |        |4       |8           |                              |
| |8 |      └─TABLE FULL SCAN             |tbl8    |4       |8           |                              |
| ========================================================================                              |
| Outputs & filters:                                                                                    |
| -------------------------------------                                                                 |
|   0 - output([INTERNAL_FUNCTION(tbl7.col1, tbl7.col2, tbl8.col1, tbl8.col2)]), filter(nil), rowset=16 |
|   1 - output([INTERNAL_FUNCTION(tbl7.col1, tbl7.col2, tbl8.col1, tbl8.col2)]), filter(nil), rowset=16 |
|       dop=2                                                                                           |
|   2 - output([tbl7.col2], [tbl8.col2], [tbl7.col1], [tbl8.col1]), filter(nil), rowset=16              |
|       equal_conds([tbl7.col2 = tbl8.col2]), other_conds(nil)                                          |
|   3 - output([tbl7.col2], [tbl7.col1]), filter(nil), rowset=16                                        |
|   4 - output([tbl7.col2], [tbl7.col1]), filter(nil), rowset=16                                        |
|       dop=2                                                                                           |
|   5 - output([tbl7.col1], [tbl7.col2]), filter(nil), rowset=16                                        |
|   6 - output([tbl7.col1], [tbl7.col2]), filter(nil), rowset=16                                        |
|       access([tbl7.col1], [tbl7.col2]), partitions(p[0-3])                                            |
|       is_index_back=false, is_global_index=false,                                                     |
|       range_key([tbl7.__pk_increment]), range(MIN ; MAX)always true                                   |
|   7 - output([tbl8.col1], [tbl8.col2]), filter(nil), rowset=16                                        |
|   8 - output([tbl8.col1], [tbl8.col2]), filter(nil), rowset=16                                        |
|       access([tbl8.col1], [tbl8.col2]), partitions(p[0-3])                                            |
|       is_index_back=false, is_global_index=false,                                                     |
|       range_key([tbl8.__pk_increment]), range(MIN ; MAX)always true                                   |
+-------------------------------------------------------------------------------------------------------+
```
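In the preceding Q5 execution plan, operators 3-4 are a group of EXCHANGE operators that use the BC2HOST repartitioning method. They broadcast the data of table tbl7 to each node that participates in the query. The worker threads on a node can share that copy when building the hash table, which is reflected by Operator 2 being a SHARED HASH JOIN. Each thread then joins the tbl8 data it scans against the broadcast tbl7 data, so the tbl8 data does not need to be moved.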
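The following Python sketch models the BC2HOST idea under stated assumptions:

- two hypothetical hosts with two worker threads each;
- one broadcast copy of tbl7 per host;
- one hash table per host, shared by that host's threads;
- tbl8 rows probed on the thread that scans them.

The host names, the thread layout, and the granule assignment are invented for illustration.

```python
# Sketch of Q5's BC2HOST distribution: tbl7 is sent once to each host, the
# threads on that host share one hash table built from it, and each thread
# probes with the tbl8 rows it scans. Hosts, threads, and the granule
# assignment below are hypothetical.
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[int, int]  # (col1, col2)

tbl7 = [(1, 10), (2, 20)]
tbl8 = [(3, 10), (4, 20), (5, 30), (6, 10)]

# Hypothetical layout: two hosts with two worker threads each, and one tbl8
# granule (here, one row) assigned to each thread.
threads_by_host: Dict[str, List[int]] = {"host_a": [0, 1], "host_b": [2, 3]}
tbl8_by_thread: Dict[int, List[Row]] = {t: [tbl8[t]] for t in range(4)}

builds = 0
joined: List[Tuple[Row, Row]] = []
for thread_ids in threads_by_host.values():
    shipped = list(tbl7)                 # BC2HOST: one full copy of tbl7 per host
    shared: Dict[int, List[Row]] = defaultdict(list)
    for row in shipped:                  # hash table built once per host ...
        shared[row[1]].append(row)
    builds += 1
    for t in thread_ids:                 # ... and probed by every thread on that host
        for probe in tbl8_by_thread[t]:
            joined.extend((b, probe) for b in shared.get(probe[1], []))

expected = [(a, b) for a in tbl7 for b in tbl8 if a[1] == b[1]]
print(builds, sorted(joined) == sorted(expected))  # 2 True
```

In this model, the hash table is built twice (once per host) rather than four times (once per thread), while the join result still matches a join over the full tables.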
The Outputs & filters section in the execution plan display for example Q5 lists the information for the EXCH-IN/OUT (BC2HOST) operator in detail. The meaning of the fields is the same as that of the EXCH-IN/OUT operator.
