OceanBase Database supports the hash join algorithm, which joins two tables on specified columns. However, when the tables involved in the join, especially the probe table, contain a large amount of data, hash join performance degrades significantly. In this case, hash joins can use runtime filters (RFs) to improve efficiency.
How runtime filters work
Runtime filter is a technique used to optimize the performance of hash joins. It improves query efficiency by reducing the amount of data that needs to be probed during hash joins. For example, in scenarios involving joining multiple dimensional tables and fact tables in a star schema, runtime filters are an effective optimization method.
A runtime filter is a lightweight filter built during the build phase of a hash join and then sent to the large tables participating in the probe. The probe table can apply multiple runtime filters at the storage layer to filter data in advance, reducing both the amount of data that actually participates in the hash join and the network transmission volume, which improves query efficiency.
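The core idea can be sketched outside the database. The toy Python below is illustrative only, not OceanBase internals; it uses an exact set of join-key values where the engine may use a Bloom filter, in filter, or range filter. It shows the probe side dropping rows that cannot match before they ever reach the hash join:

```python
# Toy sketch: the build side of a hash join produces a lightweight filter
# from its join keys; the probe side checks each row against the filter
# first, so rows that cannot match never reach the join (or the network).

def build_filter(build_rows, key):
    """Collect the set of join-key values seen on the build side."""
    return {row[key] for row in build_rows}

def probe_with_filter(probe_rows, key, rf):
    """Discard probe rows whose join key cannot appear in the hash table."""
    return [row for row in probe_rows if row[key] in rf]

build = [{"v1": 1}, {"v1": 3}]
probe = [{"v1": 1}, {"v1": 2}, {"v1": 3}, {"v1": 4}]
rf = build_filter(build, "v1")
survivors = probe_with_filter(probe, "v1", rf)
print(survivors)  # only rows with v1 in {1, 3} reach the hash join
```

In the real engine the filter is applied at the storage layer during the scan, which is what makes the reduction in projected and transmitted rows worthwhile.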
Runtime filters can be classified by three criteria: whether they require cross-machine transmission, the data structure they use, and the entity they filter. By cross-machine transmission, a runtime filter is either local or global. By data structure, it is a Bloom filter, an in filter, or a range filter; both local and global runtime filters can use any of the three. By the entity being filtered, a runtime filter either filters join keys (a regular runtime filter) or filters partitions (a part join filter); this classification is orthogonal to the local/global classification. Regular runtime filters support all three data structures, whereas part join filters currently support only Bloom filters.
This topic provides specific examples to explain local runtime filters, global runtime filters, and part join filters.
Local runtime filter
Local runtime filters do not require network transmission: the filter is computed and consumed on the same node. They are typically used when there is no shuffle on the probe side of a hash join.
The following is an example of an execution plan for a local runtime filter.
obclient> CREATE TABLE tt1(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------+
| ============================================================== |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| -------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |7 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10000|1 |6 | |
| |2 | └─PX PARTITION ITERATOR | |1 |6 | |
| |3 | └─HASH JOIN | |1 |6 | |
| |4 | ├─JOIN FILTER CREATE|:RF0000 |1 |3 | |
| |5 | │ └─TABLE FULL SCAN |tt2 |1 |3 | |
| |6 | └─JOIN FILTER USE |:RF0000 |1 |4 | |
| |7 | └─TABLE FULL SCAN |tt1 |1 |4 | |
| ============================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| dop=4 |
| 2 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| partition wise, force partition granule |
| 3 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| equal_conds ([tt1.v1 = tt2.v1]), other_conds(nil) |
| 4 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[tt2.v1] |
| 5 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| access([tt2.v1], [tt2.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
| 6 - output([tt1.v1], [tt1.v2]), filter(nil), rowset=256 |
| 7 - output([tt1.v1], [tt1.v2]), filter([RF_IN_FILTER(tt1.v1)], [RF_RANGE_FILTER(tt1.v1)], [RF_BLOOM_FILTER(tt1.v1)]), rowset=256 |
| access([tt1.v1], [tt1.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false], |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------+
32 rows in set
In the preceding example, operator 4 (JOIN FILTER CREATE) and operator 6 (JOIN FILTER USE), whose NAME field value is :RF0000, form a regular runtime filter. Because the filter is built and used on the same node without network transmission, it is a local runtime filter.
Global runtime filter
Global runtime filters must be broadcast to multiple execution nodes. Within the plan, a runtime filter can be pushed down to any position where filtering is useful. Compared with local runtime filters, global runtime filters not only reduce projection and computation overhead at the SQL layer but also save network transmission overhead, which often yields significant performance improvements.
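The distributed variant can be sketched the same way. The toy Python below shows assumed mechanics, not OceanBase code: each worker builds a partial Bloom filter over its slice of the build side, the partial filters are merged with bitwise OR, and the merged filter is what would then be broadcast to every node that scans the probe table:

```python
# Toy sketch of a global runtime filter: per-worker partial Bloom filters
# are merged into one filter before being shipped to the probe-side nodes.

NBITS = 64  # illustrative bit-array size, far smaller than a real filter

def partial_bloom(keys, nbits=NBITS):
    """Each worker sets one bit per key over its local slice of data."""
    bits = 0
    for k in keys:
        bits |= 1 << (hash(k) % nbits)
    return bits

def merge(filters):
    """Bitwise OR keeps every bit that any worker set."""
    out = 0
    for f in filters:
        out |= f
    return out

def may_match(bits, key, nbits=NBITS):
    return bool(bits & (1 << (hash(key) % nbits)))

worker_filters = [partial_bloom([1, 2]), partial_bloom([3, 4])]
merged = merge(worker_filters)
print(all(may_match(merged, k) for k in [1, 2, 3, 4]))  # → True
```

The OR merge is what makes Bloom filters convenient for this job: merging never loses a build-side key, so the merged filter still has no false negatives.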
The following is an example of an execution plan for a global runtime filter.
obclient> CREATE TABLE tt1 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt3 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN BASIC SELECT /*+ LEADING(a (b c)) PARALLEL(3) USE_HASH(b) USE_HASH(c) PQ_DISTRIBUTE(c HASH HASH) PX_JOIN_FILTER(c a) PX_JOIN_FILTER(c b) */ COUNT(*) FROM tt1 a, tt2 b, tt3 c WHERE a.c1_rand=b.c1_rand AND a.c2_rand = c.c2_rand AND b.c3_rand = c.c3_rand;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ======================================================== |
| |ID|OPERATOR |NAME | |
| -------------------------------------------------------- |
| |0 |SCALAR GROUP BY | | |
| |1 |└─PX COORDINATOR | | |
| |2 | └─EXCHANGE OUT DISTR |:EX10003| |
| |3 | └─MERGE GROUP BY | | |
| |4 | └─SHARED HASH JOIN | | |
| |5 | ├─JOIN FILTER CREATE |:RF0001 | |
| |6 | │ └─EXCHANGE IN DISTR | | |
| |7 | │ └─EXCHANGE OUT DISTR (BC2HOST)|:EX10000| |
| |8 | │ └─PX BLOCK ITERATOR | | |
| |9 | │ └─TABLE FULL SCAN |a | |
| |10| └─HASH JOIN | | |
| |11| ├─JOIN FILTER CREATE |:RF0000 | |
| |12| │ └─EXCHANGE IN DISTR | | |
| |13| │ └─EXCHANGE OUT DISTR (HASH) |:EX10001| |
| |14| │ └─PX BLOCK ITERATOR | | |
| |15| │ └─TABLE FULL SCAN |b | |
| |16| └─EXCHANGE IN DISTR | | |
| |17| └─EXCHANGE OUT DISTR (HASH) |:EX10002| |
| |18| └─JOIN FILTER USE |:RF0000 | |
| |19| └─JOIN FILTER USE |:RF0001 | |
| |20| └─PX BLOCK ITERATOR | | |
| |21| └─TABLE FULL SCAN |c | |
| ======================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]) |
| 1 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| 2 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| dop=3 |
| 3 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT(*)]) |
| 4 - output(nil), filter(nil), rowset=256 |
| equal_conds([a.c1_rand = b.c1_rand], [a.c2_rand = c.c2_rand]), other_conds(nil) |
| 5 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[a.c2_rand] |
| 6 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| 7 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| dop=3 |
| 8 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| 9 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| access([a.c1_rand], [a.c2_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([a.__pk_increment]), range(MIN ; MAX)always true |
| 10 - output([b.c1_rand], [c.c2_rand]), filter(nil), rowset=256 |
| equal_conds([b.c3_rand = c.c3_rand]), other_conds(nil) |
| 11 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[b.c3_rand] |
| 12 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| 13 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| (#keys=1, [b.c3_rand]), dop=3 |
| 14 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| 15 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| access([b.c1_rand], [b.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([b.__pk_increment]), range(MIN ; MAX)always true |
| 16 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 17 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| (#keys=1, [c.c3_rand]), dop=3 |
| 18 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 19 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 20 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 21 - output([c.c3_rand], [c.c2_rand]), filter([RF_IN_FILTER(c.c3_rand)], [RF_RANGE_FILTER(c.c3_rand)], [RF_BLOOM_FILTER(c.c3_rand)], [RF_IN_FILTER(c.c2_rand)], |
| [RF_RANGE_FILTER(c.c2_rand)], [RF_BLOOM_FILTER(c.c2_rand)]), rowset=256 |
| access([c.c2_rand], [c.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false,false,false,false], |
| range_key([c.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
70 rows in set
In the preceding example, Operator 5 creates a runtime filter denoted by RF0001 in the NAME field, and Operator 11 creates a runtime filter denoted by RF0000. The two runtime filters are pushed down and used at Operator 21 TABLE FULL SCAN. Since the runtime filters are transmitted over multiple data flow operators (DFOs), they are global runtime filters.
Part join filter
During a hash join, the left side first buffers all of its data to build a hash table, and the right side then matches against it row by row. Because the hash table is built from all left-side data, the system can use that data to derive which partitions of the right-side table can contain matching rows; partitions that cannot match are skipped when the right-side table is scanned, improving performance. Part join filters were introduced to optimize this scenario. To compute the relevant partitions of the right-side table from the left side, the join key must include the partitioning key of the right-side table. This is the prerequisite for generating a part join filter.
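The pruning idea can be sketched as follows. This toy Python is illustrative only; the real hash function and partition routing are engine-specific. It mirrors a table partitioned by HASH(v1) PARTITIONS 5 and computes which partitions the probe-side scan needs to visit, given the build-side join keys:

```python
# Toy sketch of partition pruning via a part join filter: because the join
# key includes the probe table's partitioning key, the build side can map
# every build key to a partition and the scan can skip the rest.

PARTITIONS = 5  # mirrors PARTITION BY HASH(v1) PARTITIONS 5

def hash_partition(key_value, n=PARTITIONS):
    # assumed hash-partition routing for illustration only
    return hash(key_value) % n

def matching_partitions(build_keys):
    """Partitions of the probe table that can contain matching rows."""
    return {hash_partition(k) for k in build_keys}

build_keys = [0, 5, 10]                  # all route to the same partition
print(matching_partitions(build_keys))   # → {0}: four partitions are skipped
```

When build-side keys cluster into few partitions, the probe scan avoids most of the table entirely, which is a much stronger reduction than filtering row by row.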
The following is an example of an execution plan for a part join filter.
obclient> CREATE TABLE tt1(v1 INT);
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PARALLEL(3) LEADING(tt1 tt2) PX_PART_JOIN_FILTER(tt2)*/ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+-------------------------------------------------------------------------------+
| Query Plan |
+-------------------------------------------------------------------------------+
| ======================================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ----------------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |5 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10001|1 |4 | |
| |2 | └─HASH JOIN | |1 |4 | |
| |3 | ├─PART JOIN FILTER CREATE |:RF0000 |1 |1 | |
| |4 | │ └─EXCHANGE IN DISTR | |1 |1 | |
| |5 | │ └─EXCHANGE OUT DISTR (PKEY)|:EX10000|1 |1 | |
| |6 | │ └─PX BLOCK ITERATOR | |1 |1 | |
| |7 | │ └─TABLE FULL SCAN |tt1 |1 |1 | |
| |8 | └─PX PARTITION HASH JOIN-FILTER|:RF0000 |1 |3 | |
| |9 | └─TABLE FULL SCAN |tt2 |1 |3 | |
| ======================================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| dop=3 |
| 2 - output([tt1.v1], [tt2.v1]), filter(nil), rowset=256 |
| equal_conds([tt1.v1 = tt2.v1]), other_conds(nil) |
| 3 - output([tt1.v1]), filter(nil), rowset=256 |
| RF_TYPE(bloom), RF_EXPR[tt1.v1] |
| 4 - output([tt1.v1]), filter(nil), rowset=256 |
| 5 - output([tt1.v1]), filter(nil), rowset=256 |
| (#keys=1, [tt1.v1]), dop=3 |
| 6 - output([tt1.v1]), filter(nil), rowset=256 |
| 7 - output([tt1.v1]), filter(nil), rowset=256 |
| access([tt1.v1]), partitions(p0) |
| is_index_back=false, is_global_index=false, |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
| 8 - output([tt2.v1]), filter(nil), rowset=256 |
| affinitize |
| 9 - output([tt2.v1]), filter(nil), rowset=256 |
| access([tt2.v1]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------------------+
In the preceding example, operator 3 (PART JOIN FILTER CREATE) and operator 8 (PX PARTITION HASH JOIN-FILTER) form a part join filter. Operator 3 builds the filter, and operator 8 uses it to filter the tt2 table at the partition level.
Manually enable and disable runtime filters
Runtime filters apply only to hash joins; for other join types, the optimizer does not allocate a runtime filter. Generally, when the optimizer plans a hash join, it allocates a runtime filter automatically. If it does not, you can allocate one manually by using a hint.
The PX_JOIN_FILTER and PX_PART_JOIN_FILTER hints are used to manually enable runtime filter. The SQL syntax is as follows:
/*+ PX_JOIN_FILTER(join_right_table_name)*/
/*+ PX_PART_JOIN_FILTER(join_right_table_name)*/
Here is an example:
obclient> EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(2) */ * FROM tt1 JOIN tt2 ON tt1.v1 = tt2.v1;
When the degree of parallelism is 1, no runtime filter is allocated. To use runtime filters, specify a degree of parallelism of at least 2 by using the PARALLEL hint. Here is an example:
/*+ PARALLEL(2) */
Note that if a runtime filter has poor filterability, that is, if it filters out few probe-side rows, it cannot speed up the hash join and may even cause a slight performance degradation. Therefore, before you manually enable a runtime filter, carefully evaluate the query scenario to determine whether a runtime filter applies.
The NO_PX_JOIN_FILTER and NO_PX_PART_JOIN_FILTER hints are used to manually disable runtime filter. The SQL syntax is as follows:
/*+ NO_PX_JOIN_FILTER(join_right_table_name)*/
/*+ NO_PX_PART_JOIN_FILTER(join_right_table_name)*/
Adjust the execution strategy for runtime filters
Runtime filters are adaptive. By default, when the selectivity of the join key meets the conditions, three types of runtime filters are created: the In Filter, the Range Filter, and the Bloom Filter. The In Filter filters by using a hash table, and the Range Filter filters by using the minimum and maximum values.
In terms of filtering priority, the In Filter has the most accurate filtering capability. When the In Filter is enabled during execution, the other two filters are automatically disabled. The executor determines whether to use the In Filter based on the actual NDV value. Additionally, each filter can be dynamically disabled and re-enabled based on real-time filtering performance.
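The three data structures can be modeled in a few lines. The toy Python below is illustrative only, not OceanBase's implementations. It shows why the In Filter is the most accurate: it keeps the exact value set, while the Range Filter keeps only the minimum and maximum, and the Bloom Filter answers "possibly present" with possible false positives but no false negatives:

```python
# Toy models of the three runtime filter data structures.

class InFilter:
    def __init__(self, values): self.values = set(values)
    def may_match(self, v): return v in self.values          # exact

class RangeFilter:
    def __init__(self, values): self.lo, self.hi = min(values), max(values)
    def may_match(self, v): return self.lo <= v <= self.hi   # coarse

class BloomFilter:
    def __init__(self, values, nbits=64):
        self.bits, self.nbits = 0, nbits
        for v in values:
            self.bits |= 1 << (hash(v) % nbits)
    def may_match(self, v):                                   # probabilistic
        return bool(self.bits & (1 << (hash(v) % self.nbits)))

keys = [10, 20, 30]
inf, rng, blm = InFilter(keys), RangeFilter(keys), BloomFilter(keys)
print(inf.may_match(15), rng.may_match(15))  # False True: the In Filter
                                             # rejects 15, the Range Filter
                                             # lets it through
```

This is why enabling the In Filter makes the other two redundant, and why it is only viable while the build-side value set stays small.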
OceanBase Database provides system variables to adjust the execution strategies for runtime filters, including runtime_filter_type, runtime_filter_max_in_num, runtime_filter_wait_time_ms, and runtime_bloom_filter_max_size.
The runtime_filter_type variable specifies the type of runtime filter to be enabled. The default value is 'BLOOM_FILTER,RANGE,IN', indicating that all three types of runtime filters are enabled. When runtime_filter_type='', no runtime filter is enabled. Example:
ALTER SYSTEM SET runtime_filter_type = 'BLOOM_FILTER,RANGE,IN';
In most cases, the default value is sufficient to specify the runtime filter type, and there is no need to use the runtime_filter_type variable. OceanBase Database selects the optimal runtime filter type for filtering during optimization and execution.
The runtime_filter_max_in_num variable specifies the maximum NDV (number of distinct values) allowed for the In Filter. The default value is 1,024. The optimizer estimates the NDV of the build table; if the estimated NDV exceeds runtime_filter_max_in_num, no In Filter is allocated. Because the optimizer's NDV estimate can be inaccurate, an In Filter may be allocated even though the actual NDV is high. In that case, the executor adaptively disables the In Filter during execution based on the actual NDV. Generally, do not set this value too high, because when the build table has a high NDV, the In Filter is less effective than the Bloom Filter.
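The two-stage NDV check described above can be sketched as follows (assumed logic for illustration; this is not the actual planner or executor code):

```python
# Sketch: the In Filter is planned only if the *estimated* NDV of the build
# side is within runtime_filter_max_in_num, and is kept at runtime only if
# the *actual* NDV also stays within the limit.

RUNTIME_FILTER_MAX_IN_NUM = 1024  # default value per the text

def plan_in_filter(estimated_ndv, max_in_num=RUNTIME_FILTER_MAX_IN_NUM):
    """Optimizer-side decision, based on an estimate."""
    return estimated_ndv <= max_in_num

def runtime_keep_in_filter(build_keys, max_in_num=RUNTIME_FILTER_MAX_IN_NUM):
    """Executor-side re-check against the actual distinct-value count."""
    return len(set(build_keys)) <= max_in_num

print(plan_in_filter(500))                  # → True: estimate is low enough
print(runtime_keep_in_filter(range(5000)))  # → False: actual NDV is too high,
                                            # so the In Filter is disabled
```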
The runtime_filter_wait_time_ms variable specifies the maximum time that the probe side waits for the runtime filter to arrive. The default value is 10 ms. The probe side waits for the runtime filter before sending data. If the filter has not arrived after runtime_filter_wait_time_ms milliseconds, the probe side enters the bypass stage and sends data without runtime filtering. If the runtime filter arrives later, filtering is enabled from that point on. Generally, this value does not need to be adjusted. If the Bloom Filter is large and its filtering performance is good, you can increase this value appropriately.
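The wait-and-bypass behavior can be sketched as a simple loop (assumed control flow for illustration; arrival is modeled in row ticks rather than milliseconds):

```python
# Sketch of the bypass stage: rows sent before the runtime filter arrives
# pass through unfiltered; once the filter arrives, later rows are filtered.

def probe_rows(rows, filter_arrival_tick, rf):
    """rf becomes available at filter_arrival_tick; earlier rows bypass it."""
    sent = []
    for tick, row in enumerate(rows):
        if tick >= filter_arrival_tick and row not in rf:
            continue          # filter has arrived: drop non-matching rows
        sent.append(row)      # bypass stage, or row survived the filter
    return sent

out = probe_rows([1, 2, 3, 4, 5, 6], filter_arrival_tick=3, rf={4, 6})
print(out)  # → [1, 2, 3, 4, 6]: rows 1-3 bypass the missing filter,
            # then only 4 and 6 survive
```

Raising runtime_filter_wait_time_ms trades a longer initial wait for fewer unfiltered rows in the bypass stage, which pays off when the filter is large but filters well.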
The runtime_bloom_filter_max_size variable limits the maximum size of the Bloom Filter. The default value is 2,048 MB. If the build table contains so much data that the default maximum size cannot accommodate it, increase the value of runtime_bloom_filter_max_size.