OceanBase Database supports the hash join algorithm, which joins two tables on specified columns. However, when the tables involved in the join, especially the probe table, contain a large amount of data, hash join performance can degrade significantly. In such cases, the hash join can use a runtime filter (RF) to improve efficiency.
Introduction
A runtime filter is a technique for optimizing hash join performance. It improves query efficiency by reducing the amount of data that must be probed during the join. For example, when joining multiple dimension tables with a fact table in a star schema, runtime filters are an effective optimization.
A runtime filter is essentially a lightweight filter created during the build phase of a hash join. The filter is then sent to the large tables participating in the probe. The probe table can use multiple runtime filters to filter data in advance at the storage layer, reducing both the amount of data that actually participates in the hash join and the network traffic, which improves query efficiency.
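As a sketch of the star-schema scenario above, the following hypothetical query joins a large fact table with two small dimension tables; the table and column names (fact_sales, dim_date, dim_store) are illustrative only, not from the examples in this topic. With runtime filters, the lightweight filters built from the dimension tables can be pushed down to the scan of the fact table, so most non-matching fact rows are discarded at the storage layer before the join.

```sql
-- Hypothetical star-schema join; table and column names are illustrative.
-- Runtime filters built from dim_date and dim_store can be pushed down
-- to the storage-layer scan of the large fact_sales table.
SELECT /*+ PARALLEL(4) PX_JOIN_FILTER(f) */ COUNT(*)
FROM dim_date d
JOIN fact_sales f ON f.date_id = d.date_id
JOIN dim_store s ON f.store_id = s.store_id
WHERE d.year = 2024 AND s.region = 'EAST';
```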
Runtime filters can be classified by three criteria: whether they require cross-machine transmission, the data structure they use, and the entity they filter. By cross-machine transmission, they are either local or global. By data structure, they are Bloom filters, IN filters, or range filters; both local and global runtime filters can use any of these three structures. By the entity they filter, they are either filters on join keys (regular runtime filters) or filters on partitions (part join filters). This last classification is orthogonal to the local/global classification. Regular runtime filters support all three data structures, whereas part join filters currently support only Bloom filters.
This topic provides specific examples of local runtime filters, global runtime filters, and part join filters.
Local Runtime Filter
A local runtime filter does not require network transmission; it is computed and used only on the local node. Local runtime filters are typically used when there is no data shuffle on the probe side of a hash join.
The following is an example of an execution plan for a local runtime filter.
obclient> CREATE TABLE tt1(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------+
| ============================================================== |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| -------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |7 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10000|1 |6 | |
| |2 | └─PX PARTITION ITERATOR | |1 |6 | |
| |3 | └─HASH JOIN | |1 |6 | |
| |4 | ├─JOIN FILTER CREATE|:RF0000 |1 |3 | |
| |5 | │ └─TABLE FULL SCAN |tt2 |1 |3 | |
| |6 | └─JOIN FILTER USE |:RF0000 |1 |4 | |
| |7 | └─TABLE FULL SCAN |tt1 |1 |4 | |
| ============================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| dop=4 |
| 2 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| partition wise, force partition granule |
| 3 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| equal_conds ([tt1.v1 = tt2.v1]), other_conds(nil) |
| 4 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[tt2.v1] |
| 5 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| access([tt2.v1], [tt2.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
| 6 - output([tt1.v1], [tt1.v2]), filter(nil), rowset=256 |
| 7 - output([tt1.v1], [tt1.v2]), filter([RF_IN_FILTER(tt1.v1)], [RF_RANGE_FILTER(tt1.v1)], [RF_BLOOM_FILTER(tt1.v1)]), rowset=256 |
| access([tt1.v1], [tt1.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false], |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------+
32 rows in set
In this example, operator 4 (JOIN FILTER CREATE) and operator 6 (JOIN FILTER USE), both named :RF0000, form a regular runtime filter. In this plan they build a local runtime filter, which requires no network transmission and is used only on the local node.
Global Runtime Filter
A global runtime filter must be broadcast to multiple execution nodes. In the execution plan, the runtime filter can be pushed down to any suitable position. Compared with local runtime filters, global runtime filters reduce not only the projection and computation overhead at the SQL layer but also the network transmission overhead, which often yields significant performance improvements.
The following is an example of an execution plan for a global runtime filter.
obclient> CREATE TABLE tt1 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt3 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN BASIC SELECT /*+ LEADING(a (b c)) PARALLEL(3) USE_HASH(b) USE_HASH(c) PQ_DISTRIBUTE(c HASH HASH) PX_JOIN_FILTER(c a) PX_JOIN_FILTER(c b) */ COUNT(*) FROM tt1 a, tt2 b, tt3 c WHERE a.c1_rand=b.c1_rand AND a.c2_rand = c.c2_rand AND b.c3_rand = c.c3_rand;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ======================================================== |
| |ID|OPERATOR |NAME | |
| -------------------------------------------------------- |
| |0 |SCALAR GROUP BY | | |
| |1 |└─PX COORDINATOR | | |
| |2 | └─EXCHANGE OUT DISTR |:EX10003| |
| |3 | └─MERGE GROUP BY | | |
| |4 | └─SHARED HASH JOIN | | |
| |5 | ├─JOIN FILTER CREATE |:RF0001 | |
| |6 | │ └─EXCHANGE IN DISTR | | |
| |7 | │ └─EXCHANGE OUT DISTR (BC2HOST)|:EX10000| |
| |8 | │ └─PX BLOCK ITERATOR | | |
| |9 | │ └─TABLE FULL SCAN |a | |
| |10| └─HASH JOIN | | |
| |11| ├─JOIN FILTER CREATE |:RF0000 | |
| |12| │ └─EXCHANGE IN DISTR | | |
| |13| │ └─EXCHANGE OUT DISTR (HASH) |:EX10001| |
| |14| │ └─PX BLOCK ITERATOR | | |
| |15| │ └─TABLE FULL SCAN |b | |
| |16| └─EXCHANGE IN DISTR | | |
| |17| └─EXCHANGE OUT DISTR (HASH) |:EX10002| |
| |18| └─JOIN FILTER USE |:RF0000 | |
| |19| └─JOIN FILTER USE |:RF0001 | |
| |20| └─PX BLOCK ITERATOR | | |
| |21| └─TABLE FULL SCAN |c | |
| ======================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]) |
| 1 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| 2 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| dop=3 |
| 3 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT(*)]) |
| 4 - output(nil), filter(nil), rowset=256 |
| equal_conds([a.c1_rand = b.c1_rand], [a.c2_rand = c.c2_rand]), other_conds(nil) |
| 5 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[a.c2_rand] |
| 6 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| 7 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| dop=3 |
| 8 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| 9 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| access([a.c1_rand], [a.c2_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([a.__pk_increment]), range(MIN ; MAX)always true |
| 10 - output([b.c1_rand], [c.c2_rand]), filter(nil), rowset=256 |
| equal_conds([b.c3_rand = c.c3_rand]), other_conds(nil) |
| 11 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[b.c3_rand] |
| 12 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| 13 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| (#keys=1, [b.c3_rand]), dop=3 |
| 14 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| 15 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| access([b.c1_rand], [b.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([b.__pk_increment]), range(MIN ; MAX)always true |
| 16 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 17 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| (#keys=1, [c.c3_rand]), dop=3 |
| 18 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 19 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 20 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 21 - output([c.c3_rand], [c.c2_rand]), filter([RF_IN_FILTER(c.c3_rand)], [RF_RANGE_FILTER(c.c3_rand)], [RF_BLOOM_FILTER(c.c3_rand)], [RF_IN_FILTER(c.c2_rand)], |
| [RF_RANGE_FILTER(c.c2_rand)], [RF_BLOOM_FILTER(c.c2_rand)]), rowset=256 |
| access([c.c2_rand], [c.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false,false,false,false], |
| range_key([c.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
70 rows in set
In this example, operator 5 is named :RF0001 and operator 11 is named :RF0000. Both runtime filters are pushed down to operator 21 (TABLE FULL SCAN) and filter data across multiple DFOs, which is the hallmark of global runtime filters.
Part Join Filter
In a hash join, the left (build) side first reads all of its data and builds a hash table, and the right (probe) side then matches its rows one by one. Because the build side sees all of its data before the probe starts, the set of right-side partitions that can possibly contain matching rows can be computed in advance. When scanning the right-side table, partitions that cannot match are skipped, which improves performance. Part join filters were introduced to optimize this scenario. For the build side to compute the matching partitions of the right-side table, the join key must include the partitioning key of the right-side table; this is the prerequisite for generating a part join filter.
The following is an example of an execution plan for a part join filter.
obclient> CREATE TABLE tt1(v1 INT);
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PARALLEL(3) LEADING(tt1 tt2) PX_PART_JOIN_FILTER(tt2)*/ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+-------------------------------------------------------------------------------+
| Query Plan |
+-------------------------------------------------------------------------------+
| ======================================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ----------------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |5 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10001|1 |4 | |
| |2 | └─HASH JOIN | |1 |4 | |
| |3 | ├─PART JOIN FILTER CREATE |:RF0000 |1 |1 | |
| |4 | │ └─EXCHANGE IN DISTR | |1 |1 | |
| |5 | │ └─EXCHANGE OUT DISTR (PKEY)|:EX10000|1 |1 | |
| |6 | │ └─PX BLOCK ITERATOR | |1 |1 | |
| |7 | │ └─TABLE FULL SCAN |tt1 |1 |1 | |
| |8 | └─PX PARTITION HASH JOIN-FILTER|:RF0000 |1 |3 | |
| |9 | └─TABLE FULL SCAN |tt2 |1 |3 | |
| ======================================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| dop=3 |
| 2 - output([tt1.v1], [tt2.v1]), filter(nil), rowset=256 |
| equal_conds([tt1.v1 = tt2.v1]), other_conds(nil) |
| 3 - output([tt1.v1]), filter(nil), rowset=256 |
|       RF_TYPE(bloom), RF_EXPR[tt1.v1] |
| 4 - output([tt1.v1]), filter(nil), rowset=256 |
| 5 - output([tt1.v1]), filter(nil), rowset=256 |
|       (#keys=1, [tt1.v1]), dop=3 |
| 6 - output([tt1.v1]), filter(nil), rowset=256 |
| 7 - output([tt1.v1]), filter(nil), rowset=256 |
| access([tt1.v1]), partitions(p0) |
| is_index_back=false, is_global_index=false, |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
| 8 - output([tt2.v1]), filter(nil), rowset=256 |
| affinitize |
| 9 - output([tt2.v1]), filter(nil), rowset=256 |
| access([tt2.v1]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------------------+
In this example, operator 3 (PART JOIN FILTER CREATE) and operator 8 (PX PARTITION HASH JOIN-FILTER) together form the part join filter. The filter created by operator 3 is applied in operator 8 to table tt2 for partition-level pruning.
Manually Enable or Disable Runtime Filters
Runtime filters apply only to hash joins; for other join types, the optimizer does not allocate them. Generally, the optimizer automatically allocates runtime filters for a hash join. If it does not, you can specify them manually using hints.
The PX_JOIN_FILTER and PX_PART_JOIN_FILTER hints manually enable runtime filters. The SQL syntax is as follows:
/*+ PX_JOIN_FILTER(join_right_table_name)*/
/*+ PX_PART_JOIN_FILTER(join_right_table_name)*/
Here is an example:
EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
EXPLAIN SELECT /*+ PARALLEL(3) LEADING(tt1 tt2) PX_PART_JOIN_FILTER(tt2)*/ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
When the degree of parallelism (DOP) is 1, no runtime filter is allocated. In this case, you must set the DOP to at least 2, which you can do with the PARALLEL hint. Here is an example:
/*+ PARALLEL(2) */
Note that when the hash join itself filters out little data, a runtime filter may not help and can even cause a slight performance regression. Therefore, before manually enabling a runtime filter, evaluate the query scenario carefully to determine whether one is appropriate.
The NO_PX_JOIN_FILTER and NO_PX_PART_JOIN_FILTER hints manually disable runtime filters. The SQL syntax is as follows:
/*+ NO_PX_JOIN_FILTER(join_right_table_name)*/
/*+ NO_PX_PART_JOIN_FILTER(join_right_table_name)*/
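As a counterpart to the enable examples above, the following sketch (reusing the tt1 and tt2 tables created in the local runtime filter section) disables the runtime filter that the optimizer might otherwise allocate for tt2:

```sql
-- Reuses tt1/tt2 from the local runtime filter example. With this hint,
-- the optimizer should not allocate a runtime filter for tt2, so the plan
-- should contain no JOIN FILTER CREATE or JOIN FILTER USE operators.
EXPLAIN SELECT /*+ NO_PX_JOIN_FILTER(tt2) PARALLEL(4) */ *
FROM tt1 JOIN tt2 ON tt1.v1 = tt2.v1;
```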
Adjust the Execution Strategy for Runtime Filters
Runtime filters are adaptive. By default, when the selectivity of the join keys meets certain conditions, all three types of runtime filters (IN, RANGE, and BLOOM) are created. The IN filter filters with a hash table, and the RANGE filter filters with the minimum and maximum values.
In terms of priority, the IN filter is the most accurate. When the IN filter is active during execution, the other two filters are automatically disabled. The executor decides whether to use the IN filter based on the actual number of distinct values (NDV) observed at runtime. In addition, each filter can adaptively disable and re-enable itself based on its real-time filtering effectiveness.
OceanBase Database provides system variables to adjust the execution strategies for runtime filters, including runtime_filter_type, runtime_filter_max_in_num, runtime_filter_wait_time_ms, and runtime_bloom_filter_max_size.
The runtime_filter_type variable specifies which runtime filter types are enabled. The default value is 'BLOOM_FILTER,RANGE,IN', which enables all three types. If runtime_filter_type = '', no runtime filters are enabled. Because it is a system variable, it is set with a SET statement. Here is an example:
SET runtime_filter_type = 'BLOOM_FILTER,RANGE,IN';
Generally, you do not need to use the runtime_filter_type variable because OceanBase Database selects the optimal runtime filter type during optimization and execution.
The runtime_filter_max_in_num variable specifies the maximum number of distinct values allowed in the IN filter. The default value is 1024. The optimizer estimates the NDV of the build table; if the estimated NDV exceeds runtime_filter_max_in_num, the IN filter is not allocated. Because the optimizer's NDV estimate may be inaccurate, the actual NDV can be high while the estimate is low, causing an IN filter to be allocated incorrectly. In that case, the executor adaptively disables the IN filter based on the actual NDV at runtime. We generally recommend not setting this value too high, because the IN filter may underperform the Bloom filter when the build table's NDV is very high.
The runtime_filter_wait_time_ms variable specifies the maximum time the probe side waits for the runtime filter to arrive; the default value is 10 ms. The probe side waits for the runtime filter before sending data. If the filter has not arrived after runtime_filter_wait_time_ms milliseconds, the probe side enters bypass mode: it skips the runtime filter and starts sending data. If the runtime filter arrives later, it is still applied from that point on. This value usually does not need adjusting; if the Bloom filter is very large and filters effectively, you can increase the value appropriately.
The runtime_bloom_filter_max_size variable limits the maximum size of the Bloom filter; the default value is 2048 MB. If the build table contains so much data that the default maximum Bloom filter size cannot accommodate it, increase the value of runtime_bloom_filter_max_size.
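The variables above can be adjusted per session with SET statements, as in the following sketch. The values shown are illustrative, not recommendations, and the unit assumed for runtime_bloom_filter_max_size (bytes) should be verified against your deployment's documentation.

```sql
-- Example values only; tune to your workload before applying.
SET runtime_filter_type = 'BLOOM_FILTER,RANGE,IN'; -- enable all three filter types
SET runtime_filter_max_in_num = 1024;              -- cap on IN-filter distinct values
SET runtime_filter_wait_time_ms = 10;              -- probe-side wait before bypass mode
SET runtime_bloom_filter_max_size = 2147483648;    -- 2 GB; unit assumed to be bytes
```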