OceanBase Database supports the hash join algorithm, which joins two tables on specified columns. However, when the tables involved, especially the probe-side table, contain a large amount of data, hash join performance degrades significantly. In such cases, hash join can use runtime filters (RF) to improve efficiency.
Overview
A runtime filter is a technique for optimizing the performance of hash joins. It improves query efficiency by reducing the amount of data that needs to be probed during a hash join. For example, in star-schema scenarios where multiple dimension tables are joined with fact tables, runtime filters are an effective optimization.
A runtime filter is a lightweight filter created during the build phase of a hash join and then sent to the large tables participating in the probe. The probe tables can apply multiple runtime filters to filter data in advance at the storage layer, which reduces both the amount of data that actually participates in the hash join and the volume of network transmission, thereby improving query efficiency.
Runtime filters can be classified by three criteria: whether cross-machine transmission is required, the data structure used, and the entity being filtered. By transmission, they are divided into local and global runtime filters. By data structure, they are divided into Bloom filters, In filters, and Range filters; both local and global runtime filters can use all three data structures. By the entity being filtered, they are divided into ordinary runtime filters, which filter join keys, and part join filters, which filter partitions. This last classification is orthogonal to the transmission-based one. Ordinary runtime filters support all three data structures, whereas part join filters currently support only Bloom filters.
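To illustrate the three data structures, here is a simplified Python sketch (not OceanBase code; the Bloom filter parameters and hashing are illustrative) of how an In filter, a Range filter, and a Bloom filter built from the build-side join keys can reject probe-side rows early:

```python
import hashlib

def build_filters(build_keys, max_in_num=1024):
    """Build the three runtime-filter structures from the build-side join keys."""
    distinct = set(build_keys)
    in_filter = distinct if len(distinct) <= max_in_num else None  # exact set
    range_filter = (min(build_keys), max(build_keys))              # min/max only
    # Toy Bloom filter: k hash functions over an m-bit array stored in an int.
    bits, k, m = 0, 3, 1 << 16
    for key in distinct:
        for i in range(k):
            h = int(hashlib.md5(f"{i}:{key}".encode()).hexdigest(), 16) % m
            bits |= 1 << h
    return in_filter, range_filter, (bits, k, m)

def maybe_match(key, in_filter, range_filter, bloom):
    """Return False only when the key certainly has no build-side match."""
    if in_filter is not None:                 # exact: no false positives
        return key in in_filter
    lo, hi = range_filter
    if not (lo <= key <= hi):                 # outside the build-side min/max
        return False
    bits, k, m = bloom                        # Bloom: may report false positives
    return all((bits >> (int(hashlib.md5(f"{i}:{key}".encode()).hexdigest(), 16) % m)) & 1
               for i in range(k))

in_f, rng, bloom = build_filters([10, 20, 30])
print(maybe_match(20, in_f, rng, bloom))  # True: 20 is a build-side key
print(maybe_match(25, in_f, rng, bloom))  # False: rejected by the In filter
```

The trade-off this sketch shows: the In filter is exact but only viable for a bounded number of distinct keys, the Range filter is the cheapest but coarsest, and the Bloom filter scales to large key sets at the cost of occasional false positives.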
This topic uses examples to describe local runtime filters, global runtime filters, and part join filters.
Local runtime filter
A local runtime filter does not require network transmission. It is built and used only on the local node. Local runtime filters are typically used when no data shuffle occurs on the probe side of a hash join.
The following is an example of an execution plan for a local runtime filter.
obclient> CREATE TABLE tt1(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------+
| ============================================================== |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| -------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |7 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10000|1 |6 | |
| |2 | └─PX PARTITION ITERATOR | |1 |6 | |
| |3 | └─HASH JOIN | |1 |6 | |
| |4 | ├─JOIN FILTER CREATE|:RF0000 |1 |3 | |
| |5 | │ └─TABLE FULL SCAN |tt2 |1 |3 | |
| |6 | └─JOIN FILTER USE |:RF0000 |1 |4 | |
| |7 | └─TABLE FULL SCAN |tt1 |1 |4 | |
| ============================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| dop=4 |
| 2 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| partition wise, force partition granule |
| 3 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| equal_conds ([tt1.v1 = tt2.v1]), other_conds(nil) |
| 4 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[tt2.v1] |
| 5 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| access([tt2.v1], [tt2.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
| 6 - output([tt1.v1], [tt1.v2]), filter(nil), rowset=256 |
| 7 - output([tt1.v1], [tt1.v2]), filter([RF_IN_FILTER(tt1.v1)], [RF_RANGE_FILTER(tt1.v1)], [RF_BLOOM_FILTER(tt1.v1)]), rowset=256 |
| access([tt1.v1], [tt1.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false], |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------+
32 rows in set
In the preceding example, operator 4 (JOIN FILTER CREATE) and operator 6 (JOIN FILTER USE), both with :RF0000 in the NAME column, form an ordinary runtime filter. The filter they build is local: it requires no network transmission and is used only on the local node.
Global runtime filter
A global runtime filter must be broadcast to multiple execution nodes. In the plan, a runtime filter can be pushed down to any suitable position for filtering. Compared with local runtime filters, global runtime filters not only reduce SQL-layer projection and computation overhead but also save network transmission costs, and therefore often deliver significant performance improvements.
The following is an example of an execution plan for a global runtime filter.
obclient> CREATE TABLE tt1 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt3 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN BASIC SELECT /*+ LEADING(a (b c)) PARALLEL(3) USE_HASH(b) USE_HASH(c) PQ_DISTRIBUTE(c HASH HASH) PX_JOIN_FILTER(c a) PX_JOIN_FILTER(c b) */ COUNT(*) FROM tt1 a, tt2 b, tt3 c WHERE a.c1_rand=b.c1_rand AND a.c2_rand = c.c2_rand AND b.c3_rand = c.c3_rand;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ======================================================== |
| |ID|OPERATOR |NAME | |
| -------------------------------------------------------- |
| |0 |SCALAR GROUP BY | | |
| |1 |└─PX COORDINATOR | | |
| |2 | └─EXCHANGE OUT DISTR |:EX10003| |
| |3 | └─MERGE GROUP BY | | |
| |4 | └─SHARED HASH JOIN | | |
| |5 | ├─JOIN FILTER CREATE |:RF0001 | |
| |6 | │ └─EXCHANGE IN DISTR | | |
| |7 | │ └─EXCHANGE OUT DISTR (BC2HOST)|:EX10000| |
| |8 | │ └─PX BLOCK ITERATOR | | |
| |9 | │ └─TABLE FULL SCAN |a | |
| |10| └─HASH JOIN | | |
| |11| ├─JOIN FILTER CREATE |:RF0000 | |
| |12| │ └─EXCHANGE IN DISTR | | |
| |13| │ └─EXCHANGE OUT DISTR (HASH) |:EX10001| |
| |14| │ └─PX BLOCK ITERATOR | | |
| |15| │ └─TABLE FULL SCAN |b | |
| |16| └─EXCHANGE IN DISTR | | |
| |17| └─EXCHANGE OUT DISTR (HASH) |:EX10002| |
| |18| └─JOIN FILTER USE |:RF0000 | |
| |19| └─JOIN FILTER USE |:RF0001 | |
| |20| └─PX BLOCK ITERATOR | | |
| |21| └─TABLE FULL SCAN |c | |
| ======================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]) |
| 1 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| 2 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| dop=3 |
| 3 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT(*)]) |
| 4 - output(nil), filter(nil), rowset=256 |
| equal_conds([a.c1_rand = b.c1_rand], [a.c2_rand = c.c2_rand]), other_conds(nil) |
| 5 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[a.c2_rand] |
| 6 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| 7 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| dop=3 |
| 8 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| 9 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| access([a.c1_rand], [a.c2_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([a.__pk_increment]), range(MIN ; MAX)always true |
| 10 - output([b.c1_rand], [c.c2_rand]), filter(nil), rowset=256 |
| equal_conds([b.c3_rand = c.c3_rand]), other_conds(nil) |
| 11 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[b.c3_rand] |
| 12 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| 13 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| (#keys=1, [b.c3_rand]), dop=3 |
| 14 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| 15 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| access([b.c1_rand], [b.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([b.__pk_increment]), range(MIN ; MAX)always true |
| 16 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
|      17 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256                                                                                             |
| (#keys=1, [c.c3_rand]), dop=3 |
| 18 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 19 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 20 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 21 - output([c.c3_rand], [c.c2_rand]), filter([RF_IN_FILTER(c.c3_rand)], [RF_RANGE_FILTER(c.c3_rand)], [RF_BLOOM_FILTER(c.c3_rand)], [RF_IN_FILTER(c.c2_rand)], |
| [RF_RANGE_FILTER(c.c2_rand)], [RF_BLOOM_FILTER(c.c2_rand)]), rowset=256 |
| access([c.c2_rand], [c.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false,false,false,false], |
| range_key([c.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
70 rows in set
In the preceding example, operator 5 creates the runtime filter :RF0001 and operator 11 creates :RF0000. Both filters are pushed down to operator 21 (TABLE FULL SCAN) and filter data across multiple DFOs, which is the defining characteristic of global runtime filters.
Part join filter
In a hash join, the left side first reads all of its data to build a hash table, and the right side then matches rows one by one. Because all build-side data is available before the probe starts, the distribution of right-table partitions that can possibly contain matches can be computed. When the right table is scanned, partitions that cannot produce matches are skipped based on this distribution, improving performance. Part join filters were introduced to optimize this scenario. For the build side to compute the relevant partitions of the right table, the join key must include the partitioning key of the right table; this is the prerequisite for generating a part join filter.
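Assuming the right table is hash-partitioned on the join key, the idea can be sketched in Python (illustrative only: it uses Python's built-in hash in place of OceanBase's partitioning function, and an exact set where the actual part join filter uses a Bloom filter):

```python
def build_part_join_filter(build_keys, partition_count):
    """Record which hash partitions of the probe-side table can possibly
    contain rows matching the build-side join keys."""
    return {hash(k) % partition_count for k in build_keys}

def partitions_to_scan(part_filter, partition_count):
    """Partitions absent from the filter cannot produce matches; skip them."""
    return [p for p in range(partition_count) if p in part_filter]

# The probe table has 5 hash partitions; only partitions hit by a
# build-side key need to be scanned.
wanted = build_part_join_filter([1, 2, 3], partition_count=5)
print(partitions_to_scan(wanted, partition_count=5))  # [1, 2, 3] in CPython
```

Because entire partitions are skipped before scanning, the saving applies at the storage layer, before any per-row filtering takes place.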
The following is an example of an execution plan for a part join filter.
obclient> CREATE TABLE tt1(v1 INT);
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PARALLEL(3) LEADING(tt1 tt2) PX_PART_JOIN_FILTER(tt2)*/ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+-------------------------------------------------------------------------------+
| Query Plan |
+-------------------------------------------------------------------------------+
| ======================================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ----------------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |5 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10001|1 |4 | |
| |2 | └─HASH JOIN | |1 |4 | |
| |3 | ├─PART JOIN FILTER CREATE |:RF0000 |1 |1 | |
| |4 | │ └─EXCHANGE IN DISTR | |1 |1 | |
| |5 | │ └─EXCHANGE OUT DISTR (PKEY)|:EX10000|1 |1 | |
| |6 | │ └─PX BLOCK ITERATOR | |1 |1 | |
| |7 | │ └─TABLE FULL SCAN |tt1 |1 |1 | |
| |8 | └─PX PARTITION HASH JOIN-FILTER|:RF0000 |1 |3 | |
| |9 | └─TABLE FULL SCAN |tt2 |1 |3 | |
| ======================================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| dop=3 |
| 2 - output([tt1.v1], [tt2.v1]), filter(nil), rowset=256 |
| equal_conds([tt1.v1 = tt2.v1]), other_conds(nil) |
| 3 - output([tt1.v1]), filter(nil), rowset=256 |
|      RF_TYPE(bloom), RF_EXPR[tt1.v1]                                         |
| 4 - output([tt1.v1]), filter(nil), rowset=256 |
| 5 - output([tt1.v1]), filter(nil), rowset=256 |
|      (#keys=1, [tt1.v1]), dop=3                                              |
| 6 - output([tt1.v1]), filter(nil), rowset=256 |
| 7 - output([tt1.v1]), filter(nil), rowset=256 |
| access([tt1.v1]), partitions(p0) |
| is_index_back=false, is_global_index=false, |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
| 8 - output([tt2.v1]), filter(nil), rowset=256 |
| affinitize |
| 9 - output([tt2.v1]), filter(nil), rowset=256 |
| access([tt2.v1]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------------------+
In the preceding example, operator 3 (PART JOIN FILTER CREATE) and operator 8 (PX PARTITION HASH JOIN-FILTER) implement the part join filter. Operator 3 builds the part join filter :RF0000, which operator 8 uses to filter the partitions of table tt2.
Manually enable or disable runtime filters
Runtime filters apply only to hash joins; for other join types, the optimizer does not allocate runtime filters. Generally, the optimizer automatically allocates runtime filters for hash joins. If it does not, you can allocate them manually by using hints.
The PX_JOIN_FILTER and PX_PART_JOIN_FILTER hints are used to manually enable runtime filter. The SQL syntax is as follows:
/*+ PX_JOIN_FILTER(join_right_table_name)*/
/*+ PX_PART_JOIN_FILTER(join_right_table_name)*/
Here is an example:
EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
EXPLAIN SELECT /*+ PARALLEL(3) LEADING(tt1 tt2) PX_PART_JOIN_FILTER(tt2)*/ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
When the degree of parallelism is 1, no runtime filter is allocated. In this case, you must set the degree of parallelism to at least 2. You can use the PARALLEL hint to set the degree of parallelism. Here is an example:
/*+ PARALLEL(2) */
Note that a runtime filter cannot fix a hash join that is itself inefficient, and a filter with poor selectivity may even cause a slight performance drop. Therefore, before you manually enable a runtime filter, carefully evaluate the query scenario to determine whether a runtime filter applies.
The NO_PX_JOIN_FILTER and NO_PX_PART_JOIN_FILTER hints are used to manually disable runtime filter. The SQL syntax is as follows:
/*+ NO_PX_JOIN_FILTER(join_right_table_name)*/
/*+ NO_PX_PART_JOIN_FILTER(join_right_table_name)*/
Adjust the execution strategy for runtime filters
Runtime filters are adaptive. By default, when the selectivity of the join keys meets certain conditions, three types of runtime filters (IN, RANGE, and BLOOM_FILTER) are created. The IN filter uses a hash table for filtering, and the RANGE filter uses the maximum and minimum values of the build-side keys.
The IN filter has the highest filtering accuracy. When the IN filter is enabled during execution, the other two filters are automatically disabled. The executor decides whether to use the IN filter based on the actual number of distinct values (NDV) observed during execution. In addition, each filter can be dynamically disabled and re-enabled based on its real-time filtering performance.
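The adaptive behavior described above can be summarized with a small Python sketch (the threshold and function names are illustrative, not OceanBase internals):

```python
def choose_filters(actual_ndv, max_in_num=1024):
    """If the build side's actual distinct-key count fits within
    runtime_filter_max_in_num, the exact IN filter is used and the other
    two filters are disabled; otherwise RANGE and BLOOM_FILTER remain."""
    if actual_ndv <= max_in_num:
        return ["IN"]
    return ["RANGE", "BLOOM_FILTER"]

def adaptive_state(rows_seen, rows_filtered, min_filter_rate=0.05):
    """A filter whose observed filter rate falls below a threshold can be
    temporarily bypassed and re-evaluated later (the threshold here is
    made up for illustration)."""
    if rows_seen == 0 or rows_filtered / rows_seen >= min_filter_rate:
        return "active"
    return "bypassed"

print(choose_filters(actual_ndv=500))      # ['IN']
print(choose_filters(actual_ndv=50_000))   # ['RANGE', 'BLOOM_FILTER']
print(adaptive_state(rows_seen=10_000, rows_filtered=20))  # 'bypassed'
```

The key point is that both decisions happen at execution time, using observed values rather than the optimizer's estimates.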
OceanBase Database provides system variables to adjust the execution strategy for runtime filters, including runtime_filter_type, runtime_filter_max_in_num, runtime_filter_wait_time_ms, and runtime_bloom_filter_max_size.
The runtime_filter_type variable specifies the types of runtime filters to enable. The default value is 'BLOOM_FILTER,RANGE,IN', indicating that all three types of runtime filters are enabled. When runtime_filter_type = '', no runtime filter is enabled. Example:
SET runtime_filter_type = 'BLOOM_FILTER,RANGE,IN';
Usually, you can use the default value to specify the runtime filter type, and there is no need to use the runtime_filter_type variable. OceanBase Database selects the optimal runtime filter type for filtering during optimization and execution.
The runtime_filter_max_in_num variable specifies the maximum NDV allowed for the IN filter. The default value is 1024. The optimizer estimates the NDV of the build table; if the estimated NDV is greater than runtime_filter_max_in_num, the IN filter is not allocated. Because the optimizer's NDV estimation can be inaccurate, the actual NDV may be large while the estimated NDV is small, causing the IN filter to be allocated by mistake. In such cases, the executor dynamically disables the IN filter based on the actual NDV during execution. Generally, we recommend that you do not set this value too high, because the IN filter may perform worse than the Bloom filter when the build table has a very high NDV.
The runtime_filter_wait_time_ms variable specifies the maximum time that the probe side waits for the runtime filter to arrive. The default value is 10 ms. The probe side waits for the runtime filter before sending data. If the runtime filter has not arrived after runtime_filter_wait_time_ms milliseconds, the probe side enters bypass mode and sends data without runtime filtering. If the runtime filter arrives later, it is still enabled and applied from that point on. Generally, this value does not need to be adjusted. If the Bloom filter is large and its filtering effect is good, you can increase this value appropriately.
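The wait-then-bypass behavior can be sketched in Python (illustrative; real execution processes a stream of row batches and checks filter arrival against a timer rather than a batch index):

```python
def probe_batches(total_batches, filter_arrives_at):
    """Batches processed before the runtime filter arrives are sent
    unfiltered (bypass mode); once the filter arrives, later batches
    are filtered normally."""
    return ["filtered" if b >= filter_arrives_at else "bypassed"
            for b in range(total_batches)]

# The filter arrives while batch 2 is being processed: the first two
# batches bypass the filter, the rest are filtered.
print(probe_batches(5, filter_arrives_at=2))
# ['bypassed', 'bypassed', 'filtered', 'filtered', 'filtered']
```

This is why a late-arriving filter still helps: only the rows sent during the wait window go unfiltered.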
The runtime_bloom_filter_max_size variable limits the maximum size of the Bloom filter, with a default value of 2,048 MB. If the build table contains an excessive amount of data, and the default maximum size of the Bloom filter cannot accommodate the data, you need to increase the value of runtime_bloom_filter_max_size.