OceanBase Database supports the hash join algorithm, which can connect two tables based on specific fields. However, when the tables involved in the join, especially the probe table, contain a large amount of data, the performance of hash join can be significantly affected. In such cases, hash join can leverage runtime filters (RF) to enhance efficiency.
Introduction
Runtime filter is a technique used to optimize the performance of hash joins by reducing the amount of data that needs to be probed during the hash join process. For example, in scenarios involving multiple dimensional tables and fact tables in a star schema, runtime filters can be an effective optimization strategy.
A runtime filter is essentially a filter that leverages the build process of a hash join to create a lightweight filter. This filter is then broadcast to the large tables participating in the probe. The probe table can use multiple runtime filters to filter data at the storage layer in advance, reducing the amount of data that actually participates in the hash join and the network transmission, thereby improving query efficiency.
Runtime filters can be classified based on three levels: whether they require cross-machine transmission, based on their data structures, and based on the entities being filtered. Specifically, based on the need for cross-machine transmission, runtime filters can be either local or global. Based on their data structures, they can be Bloom filters, In filters, or range filters. Both local and global runtime filters can utilize these three data structures. Based on the entities being filtered, runtime filters can be categorized into filter join keys and filter partitions. The first category is the standard runtime filter, and the second is the part join filter. The classification based on the entities being filtered is orthogonal to the classification based on cross-machine transmission requirements. Standard runtime filters support all three data structures, while part join filters currently only support Bloom filters.
This topic provides specific examples to introduce local runtime filters, global runtime filters, and part join filters.
Local Runtime Filter
Runtime filters of Local Runtime Filter do not require network transmission, and the filters built only need to be computed on the local node. Local Runtime Filter is typically used in scenarios where there is no shuffle on the probe side of a hash join.
The following is an example of an execution plan for a Local Runtime Filter.
obclient> CREATE TABLE tt1(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------+
| ============================================================== |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| -------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |7 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10000|1 |6 | |
| |2 | └─PX PARTITION ITERATOR | |1 |6 | |
| |3 | └─HASH JOIN | |1 |6 | |
| |4 | ├─JOIN FILTER CREATE|:RF0000 |1 |3 | |
| |5 | │ └─TABLE FULL SCAN |tt2 |1 |3 | |
| |6 | └─JOIN FILTER USE |:RF0000 |1 |4 | |
| |7 | └─TABLE FULL SCAN |tt1 |1 |4 | |
| ============================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| dop=4 |
| 2 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| partition wise, force partition granule |
| 3 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| equal_conds ([tt1.v1 = tt2.v1]), other_conds(nil) |
| 4 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[tt2.v1] |
| 5 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| access([tt2.v1], [tt2.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
| 6 - output([tt1.v1], [tt1.v2]), filter(nil), rowset=256 |
| 7 - output([tt1.v1], [tt1.v2]), filter([RF_IN_FILTER(tt1.v1)], [RF_RANGE_FILTER(tt1.v1)], [RF_BLOOM_FILTER(tt1.v1)]), rowset=256 |
| access([tt1.v1], [tt1.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false], |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------+
32 rows in set
In the preceding example, the 4th JOIN FILTER CREATE and 6th JOIN FILTER USE operators, whose NAME field values are RF0000, are regular runtime filters. They build a local runtime filter in the plan, which does not require network transmission and is used only on the local node.
Global Runtime Filter
Runtime filters of Global Runtime Filter require broadcasting to multiple execution nodes. In the plan, runtime filters can be pushed down to any position to perform filtering as needed. Compared with Local Runtime Filter, Global Runtime Filter not only reduces the projection and calculation overhead at the SQL layer but also saves network transmission costs, often providing significant performance improvements.
The following is an example of an execution plan for a Global Runtime Filter.
obclient> CREATE TABLE tt1 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt3 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN BASIC SELECT /*+ LEADING(a (b c)) PARALLEL(3) USE_HASH(b) USE_HASH(c) PQ_DISTRIBUTE(c HASH HASH) PX_JOIN_FILTER(c a) PX_JOIN_FILTER(c b) */ COUNT(*) FROM tt1 a, tt2 b, tt3 c WHERE a.c1_rand=b.c1_rand AND a.c2_rand = c.c2_rand AND b.c3_rand = c.c3_rand;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ======================================================== |
| |ID|OPERATOR |NAME | |
| -------------------------------------------------------- |
| |0 |SCALAR GROUP BY | | |
| |1 |└─PX COORDINATOR | | |
| |2 | └─EXCHANGE OUT DISTR |:EX10003| |
| |3 | └─MERGE GROUP BY | | |
| |4 | └─SHARED HASH JOIN | | |
| |5 | ├─JOIN FILTER CREATE |:RF0001 | |
| |6 | │ └─EXCHANGE IN DISTR | | |
| |7 | │ └─EXCHANGE OUT DISTR (BC2HOST)|:EX10000| |
| |8 | │ └─PX BLOCK ITERATOR | | |
| |9 | │ └─TABLE FULL SCAN |a | |
| |10| └─HASH JOIN | | |
| |11| ├─JOIN FILTER CREATE |:RF0000 | |
| |12| │ └─EXCHANGE IN DISTR | | |
| |13| │ └─EXCHANGE OUT DISTR (HASH) |:EX10001| |
| |14| │ └─PX BLOCK ITERATOR | | |
| |15| │ └─TABLE FULL SCAN |b | |
| |16| └─EXCHANGE IN DISTR | | |
| |17| └─EXCHANGE OUT DISTR (HASH) |:EX10002| |
| |18| └─JOIN FILTER USE |:RF0000 | |
| |19| └─JOIN FILTER USE |:RF0001 | |
| |20| └─PX BLOCK ITERATOR | | |
| |21| └─TABLE FULL SCAN |c | |
| ======================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]) |
| 1 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| 2 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| dop=3 |
| 3 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT(*)]) |
| 4 - output(nil), filter(nil), rowset=256 |
| equal_conds([a.c1_rand = b.c1_rand], [a.c2_rand = c.c2_rand]), other_conds(nil) |
| 5 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[a.c2_rand] |
| 6 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| 7 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| dop=3 |
| 8 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| 9 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| access([a.c1_rand], [a.c2_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([a.__pk_increment]), range(MIN ; MAX)always true |
| 10 - output([b.c1_rand], [c.c2_rand]), filter(nil), rowset=256 |
| equal_conds([b.c3_rand = c.c3_rand]), other_conds(nil) |
| 11 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[b.c3_rand] |
| 12 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| 13 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| (#keys=1, [b.c3_rand]), dop=3 |
| 14 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| 15 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| access([b.c1_rand], [b.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([b.__pk_increment]), range(MIN ; MAX)always true |
| 16 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 17 - output([c.c3_rand], [c.c2_rand), filter(nil), rowset=256 |
| (#keys=1, [c.c3_rand]), dop=3 |
| 18 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 19 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 20 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 21 - output([c.c3_rand], [c.c2_rand]), filter([RF_IN_FILTER(c.c3_rand)], [RF_RANGE_FILTER(c.c3_rand)], [RF_BLOOM_FILTER(c.c3_rand)], [RF_IN_FILTER(c.c2_rand)], |
| [RF_RANGE_FILTER(c.c2_rand)], [RF_BLOOM_FILTER(c.c2_rand)]), rowset=256 |
| access([c.c2_rand], [c.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false,false,false,false], |
| range_key([c.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
70 rows in set
In the preceding example, the 5th operator has a NAME field value of RF0001, and the 11th operator has a NAME field value of RF000. The runtime filter is pushed down to the 21st TABLE FULL SCAN operator, which filters data across multiple DFOs, demonstrating the characteristics of a Global Runtime Filter.
Part Join Filter
In a hash join, the left side blocks data and builds a hash table, while the right side matches data row by row. The left side builds a hash table using all its data. If the left side can obtain the partition distribution characteristics of the right side's table, unnecessary partitions can be filtered out during the right side's table scan, improving performance. Part Join Filter is introduced to optimize this scenario. To generate a Part Join Filter, the join key must include the partitioning key of the right-side table, as this is a prerequisite.
The following is an example of an execution plan for a Part Join Filter.
obclient> CREATE TABLE tt1(v1 INT);
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PARALLEL(3) LEADING(tt1 tt2) PX_PART_JOIN_FILTER(tt2)*/ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+-------------------------------------------------------------------------------+
| Query Plan |
+-------------------------------------------------------------------------------+
| ======================================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ----------------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |5 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10001|1 |4 | |
| |2 | └─HASH JOIN | |1 |4 | |
| |3 | ├─PART JOIN FILTER CREATE |:RF0000 |1 |1 | |
| |4 | │ └─EXCHANGE IN DISTR | |1 |1 | |
| |5 | │ └─EXCHANGE OUT DISTR (PKEY)|:EX10000|1 |1 | |
| |6 | │ └─PX BLOCK ITERATOR | |1 |1 | |
| |7 | │ └─TABLE FULL SCAN |tt1 |1 |1 | |
| |8 | └─PX PARTITION HASH JOIN-FILTER|:RF0000 |1 |3 | |
| |9 | └─TABLE FULL SCAN |tt2 |1 |3 | |
| ======================================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| dop=3 |
| 2 - output([tt1.v1], [tt2.v1]), filter(nil), rowset=256 |
| equal_conds([tt1.v1 = tt2.v1]), other_conds(nil) |
| 3 - output([tt1.v1]), filter(nil), rowset=256 |
| RF_TYPE(bloom), RF_EXPR[t1.v1] |
| 4 - output([tt1.v1]), filter(nil), rowset=256 |
| 5 - output([tt1.v1]), filter(nil), rowset=256 |
| (#keys=1, [t1.v1]), dop=3 |
| 6 - output([tt1.v1]), filter(nil), rowset=256 |
| 7 - output([tt1.v1]), filter(nil), rowset=256 |
| access([tt1.v1]), partitions(p0) |
| is_index_back=false, is_global_index=false, |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
| 8 - output([tt2.v1]), filter(nil), rowset=256 |
| affinitize |
| 9 - output([tt2.v1]), filter(nil), rowset=256 |
| access([tt2.v1]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------------------+
In the preceding example, the 3rd PART JOIN FILTER CREATE operator and the 8th PX PARTITION HASH JOIN-FILTER operator are Part Join Filters. The 3rd operator, which corresponds to the Part Join Filter, filters the tt2 table at the partition level and is applied to the 8th operator.
Enable and disable runtime filter manually
Runtime filter is applicable only to hash join. If the join type is not hash join, the optimizer will not allocate a runtime filter. Generally, when performing a hash join, the optimizer automatically allocates a runtime filter. If the optimizer does not allocate a runtime filter, you can manually specify a runtime filter by using a hint.
The PX_JOIN_FILTER and PX_PART_JOIN_FILTER hints are used to manually enable runtime filter. The SQL syntax is as follows:
/*+ PX_JOIN_FILTER(join_right_table_name)*/
/*+ PX_PART_JOIN_FILTER(join_right_table_name)*/
Here is an example:
EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
EXPLAIN SELECT /*+ PARALLEL(3) LEADING(tt1 tt2) PX_PART_JOIN_FILTER(tt2)*/ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
If the degree of parallelism is 1, a runtime filter is not allocated. In this case, you must specify a degree of parallelism of at least 2. You can use the PARALLEL hint to specify the degree of parallelism. Here is an example:
/*+ PARALLEL(2) */
Note that if the hash join is not effective, using a runtime filter cannot solve the problem and may even cause a slight performance degradation. Therefore, before you manually enable runtime filter, you must evaluate the query scenario to determine whether runtime filter is applicable.
The NO_PX_JOIN_FILTER and NO_PX_PART_JOIN_FILTER hints are used to manually disable runtime filter. The SQL syntax is as follows:
/*+ NO_PX_JOIN_FILTER(join_right_table_name)*/
/*+ NO_PX_PART_JOIN_FILTER(join_right_table_name)*/
Adjust the execution strategy of runtime filters
Runtime filters have adaptability. By default, when the selectivity of the join key meets the conditions, three types of runtime filters, namely IN filter, RANGE filter, and Bloom filter, are created. The IN filter uses a hash table for filtering, and the RANGE filter uses the maximum or minimum value for filtering.
In terms of filter priority, the IN filter has the most accurate filtering capability. When the IN filter is enabled during execution, the other two filters are automatically disabled. The executor determines whether to use the IN filter based on the actual NDV value during execution. Additionally, each filter can adaptively disable and re-enable itself based on real-time filtering performance.
OceanBase Database provides system variables to adjust the execution strategy of runtime filters, including runtime_filter_type, runtime_filter_max_in_num, runtime_filter_wait_time_ms, and runtime_bloom_filter_max_size.
The runtime_filter_type variable specifies the type of the enabled runtime filter. The default value is 'BLOOM_FILTER,RANGE,IN', indicating that all three types of runtime filters are enabled. When runtime_filter_type='', no runtime filter is enabled. Here is an example:
ALTER SYSTEM SET runtime_filter_type = 'BLOOM_FILTER,RANGE,IN'
Generally, you can use the default value to specify the runtime filter type, without the need to use the runtime_filter_type variable. OceanBase Database selects the optimal runtime filter type for filtering during optimization and execution.
The runtime_filter_max_in_num variable specifies the NDV of the IN filter. The default value is 1,024. The optimizer estimates the NDV of the build table. If NDV > runtime_filter_max_in_num, the IN filter is not allocated. However, the estimated NDV may not be accurate, leading to potential misallocation of the IN filter when the actual NDV is high but the estimated NDV is low. In such cases, the executor will adaptively disable the IN filter during execution based on the actual NDV. Generally, we recommend that you do not set this value excessively high, as the IN filter's optimization effect is inferior to the Bloom filter when the build table has a high NDV.
The runtime_filter_wait_time_ms variable specifies the maximum wait time for the probe side to wait for the runtime filter to arrive. The default value is 10 ms. The probe side waits for the runtime filter to arrive before sending data. If the runtime filter has not arrived within the time specified by runtime_filter_wait_time_ms, it enters the bypass mode, where it directly sends data without filtering through the runtime filter. When the runtime filter arrives later, it is still enabled and executed for filtering. Generally, this value does not need to be adjusted. If the actual Bloom filter data is large and the filtering performance is good, you can appropriately increase this value.
The runtime_bloom_filter_max_size variable limits the maximum size of the Bloom filter. The default value is 2,048 MB. If the build table contains an excessive amount of data, and the default maximum size of the Bloom filter cannot accommodate all the data, you need to increase the value of runtime_bloom_filter_max_size.