OceanBase Database supports the hash join algorithm, which joins two tables on specified columns. However, when the tables involved in the join, especially the probe table, contain a large amount of data, hash join performance can degrade significantly. In such cases, hash join can leverage runtime filters (RF) to improve efficiency.
Introduction
A runtime filter is a technique for optimizing hash join performance. It improves query efficiency by reducing the amount of data that must be probed during a hash join. It is particularly effective in star-schema scenarios where multiple dimension tables are joined with a fact table.
In essence, a runtime filter is a lightweight filter created during the build phase of a hash join and then broadcast to the large tables on the probe side. The probe table can use multiple runtime filters to filter data in advance at the storage layer. This reduces both the amount of data that actually participates in the hash join and the volume of network transmission, thereby improving query efficiency.
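For instance, in a hypothetical star-schema query such as the one below (table and column names are illustrative), a runtime filter built from the store_id values of the small dimension table during the build phase can be pushed down to the storage-layer scan of the large fact table, so that non-matching rows are discarded before they ever reach the join:
SELECT /*+ PARALLEL(4) */ COUNT(*)
FROM dim_store d JOIN fact_sales f ON d.store_id = f.store_id
WHERE d.region = 'EU';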
Runtime filters can be classified by three criteria: whether they require cross-machine transmission, the data structure they use, and the entities they filter. By transmission requirement, runtime filters are either local or global. By data structure, they are Bloom filters, In filters, or Range filters; both local and global runtime filters can use any of these three structures. By the entities they filter, runtime filters either filter join keys (regular runtime filters) or filter partitions (part join filters); this criterion is orthogonal to the transmission-based classification. While regular runtime filters support all three data structures, part join filters currently support only Bloom filters.
This section introduces local runtime filters, global runtime filters, and part join filters in detail through specific examples.
Local Runtime Filter
A local runtime filter does not require network transmission; it is built and consumed on the same node. Local runtime filters are typically used when there is no data shuffle on the probe side of the hash join.
The following is an example of an execution plan for a local runtime filter.
obclient> CREATE TABLE tt1(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------+
| ============================================================== |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| -------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |7 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10000|1 |6 | |
| |2 | └─PX PARTITION ITERATOR | |1 |6 | |
| |3 | └─HASH JOIN | |1 |6 | |
| |4 | ├─JOIN FILTER CREATE|:RF0000 |1 |3 | |
| |5 | │ └─TABLE FULL SCAN |tt2 |1 |3 | |
| |6 | └─JOIN FILTER USE |:RF0000 |1 |4 | |
| |7 | └─TABLE FULL SCAN |tt1 |1 |4 | |
| ============================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| dop=4 |
| 2 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| partition wise, force partition granule |
| 3 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| equal_conds ([tt1.v1 = tt2.v1]), other_conds(nil) |
| 4 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[tt2.v1] |
| 5 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| access([tt2.v1], [tt2.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
| 6 - output([tt1.v1], [tt1.v2]), filter(nil), rowset=256 |
| 7 - output([tt1.v1], [tt1.v2]), filter([RF_IN_FILTER(tt1.v1)], [RF_RANGE_FILTER(tt1.v1)], [RF_BLOOM_FILTER(tt1.v1)]), rowset=256 |
| access([tt1.v1], [tt1.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false], |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------+
32 rows in set
In this example, the JOIN FILTER CREATE operator (ID 4) and the JOIN FILTER USE operator (ID 6), both with the NAME field value :RF0000, form a regular runtime filter. Because both operators run on the same node, the filter requires no network transmission and is used only locally, making it a local runtime filter.
Global Runtime Filter
A global runtime filter must be broadcast to multiple execution nodes. In the execution plan, the filter can be pushed down to any position as needed. Compared with local runtime filters, global runtime filters not only reduce projection and computation overhead at the SQL layer but also save network transmission, which can bring significant performance improvements.
The following is an example of an execution plan for a global runtime filter.
obclient> CREATE TABLE tt1 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt3 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN BASIC SELECT /*+ LEADING(a (b c)) PARALLEL(3) USE_HASH(b) USE_HASH(c) PQ_DISTRIBUTE(c HASH HASH) PX_JOIN_FILTER(c a) PX_JOIN_FILTER(c b) */ COUNT(*) FROM tt1 a, tt2 b, tt3 c WHERE a.c1_rand=b.c1_rand AND a.c2_rand = c.c2_rand AND b.c3_rand = c.c3_rand;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ======================================================== |
| |ID|OPERATOR |NAME | |
| -------------------------------------------------------- |
| |0 |SCALAR GROUP BY | | |
| |1 |└─PX COORDINATOR | | |
| |2 | └─EXCHANGE OUT DISTR |:EX10003| |
| |3 | └─MERGE GROUP BY | | |
| |4 | └─SHARED HASH JOIN | | |
| |5 | ├─JOIN FILTER CREATE |:RF0001 | |
| |6 | │ └─EXCHANGE IN DISTR | | |
| |7 | │ └─EXCHANGE OUT DISTR (BC2HOST)|:EX10000| |
| |8 | │ └─PX BLOCK ITERATOR | | |
| |9 | │ └─TABLE FULL SCAN |a | |
| |10| └─HASH JOIN | | |
| |11| ├─JOIN FILTER CREATE |:RF0000 | |
| |12| │ └─EXCHANGE IN DISTR | | |
| |13| │ └─EXCHANGE OUT DISTR (HASH) |:EX10001| |
| |14| │ └─PX BLOCK ITERATOR | | |
| |15| │ └─TABLE FULL SCAN |b | |
| |16| └─EXCHANGE IN DISTR | | |
| |17| └─EXCHANGE OUT DISTR (HASH) |:EX10002| |
| |18| └─JOIN FILTER USE |:RF0000 | |
| |19| └─JOIN FILTER USE |:RF0001 | |
| |20| └─PX BLOCK ITERATOR | | |
| |21| └─TABLE FULL SCAN |c | |
| ======================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]) |
| 1 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| 2 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| dop=3 |
| 3 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT(*)]) |
| 4 - output(nil), filter(nil), rowset=256 |
| equal_conds([a.c1_rand = b.c1_rand], [a.c2_rand = c.c2_rand]), other_conds(nil) |
| 5 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[a.c2_rand] |
| 6 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| 7 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| dop=3 |
| 8 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| 9 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| access([a.c1_rand], [a.c2_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([a.__pk_increment]), range(MIN ; MAX)always true |
| 10 - output([b.c1_rand], [c.c2_rand]), filter(nil), rowset=256 |
| equal_conds([b.c3_rand = c.c3_rand]), other_conds(nil) |
| 11 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[b.c3_rand] |
| 12 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| 13 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| (#keys=1, [b.c3_rand]), dop=3 |
| 14 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| 15 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| access([b.c1_rand], [b.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([b.__pk_increment]), range(MIN ; MAX)always true |
| 16 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 17 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| (#keys=1, [c.c3_rand]), dop=3 |
| 18 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 19 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 20 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 21 - output([c.c3_rand], [c.c2_rand]), filter([RF_IN_FILTER(c.c3_rand)], [RF_RANGE_FILTER(c.c3_rand)], [RF_BLOOM_FILTER(c.c3_rand)], [RF_IN_FILTER(c.c2_rand)], |
| [RF_RANGE_FILTER(c.c2_rand)], [RF_BLOOM_FILTER(c.c2_rand)]), rowset=256 |
| access([c.c2_rand], [c.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false,false,false,false], |
| range_key([c.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
70 rows in set
In this example, the JOIN FILTER CREATE operators at IDs 5 and 11 have the NAME field values :RF0001 and :RF0000, respectively. Both filters are pushed down to the TABLE FULL SCAN operator at ID 21 and filter data across multiple DFOs, which is the defining characteristic of global runtime filters.
Part Join Filter
In a hash join, the left (build) side buffers all of its data and builds a hash table, while the right (probe) side matches rows one by one. Because the build side eventually holds all the left-side data, the distribution of matching partitions in the right-side table can be derived from it. During the scan of the right-side table, partitions that cannot contain matching rows can then be filtered out based on this partition distribution, improving performance. Part join filters were introduced to optimize this scenario. To compute which partitions of the right-side table are relevant to the hash join, the join key must include the partitioning key of the right-side table; this is the prerequisite for generating a part join filter.
The following is an example of an execution plan for a part join filter.
obclient> CREATE TABLE tt1(v1 INT);
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PARALLEL(3) LEADING(tt1 tt2) PX_PART_JOIN_FILTER(tt2)*/ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+-------------------------------------------------------------------------------+
| Query Plan |
+-------------------------------------------------------------------------------+
| ======================================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ----------------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |5 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10001|1 |4 | |
| |2 | └─HASH JOIN | |1 |4 | |
| |3 | ├─PART JOIN FILTER CREATE |:RF0000 |1 |1 | |
| |4 | │ └─EXCHANGE IN DISTR | |1 |1 | |
| |5 | │ └─EXCHANGE OUT DISTR (PKEY)|:EX10000|1 |1 | |
| |6 | │ └─PX BLOCK ITERATOR | |1 |1 | |
| |7 | │ └─TABLE FULL SCAN |tt1 |1 |1 | |
| |8 | └─PX PARTITION HASH JOIN-FILTER|:RF0000 |1 |3 | |
| |9 | └─TABLE FULL SCAN |tt2 |1 |3 | |
| ======================================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| dop=3 |
| 2 - output([tt1.v1], [tt2.v1]), filter(nil), rowset=256 |
| equal_conds([tt1.v1 = tt2.v1]), other_conds(nil) |
| 3 - output([tt1.v1]), filter(nil), rowset=256 |
| RF_TYPE(bloom), RF_EXPR[tt1.v1] |
| 4 - output([tt1.v1]), filter(nil), rowset=256 |
| 5 - output([tt1.v1]), filter(nil), rowset=256 |
| (#keys=1, [tt1.v1]), dop=3 |
| 6 - output([tt1.v1]), filter(nil), rowset=256 |
| 7 - output([tt1.v1]), filter(nil), rowset=256 |
| access([tt1.v1]), partitions(p0) |
| is_index_back=false, is_global_index=false, |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
| 8 - output([tt2.v1]), filter(nil), rowset=256 |
| affinitize |
| 9 - output([tt2.v1]), filter(nil), rowset=256 |
| access([tt2.v1]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------------------+
In this example, the PART JOIN FILTER CREATE operator at ID 3 and the PX PARTITION HASH JOIN-FILTER operator at ID 8 form a part join filter: operator 3 creates the filter, and operator 8 uses it to filter table tt2 at the partition level, so only partitions that may contain matching rows are scanned.
Manually enable and disable runtime filter
Runtime filters apply only to hash joins; for other join types, the optimizer does not allocate a runtime filter. Generally, when the optimizer chooses a hash join, it allocates a runtime filter automatically. If it does not, you can specify a hint to allocate one manually.
The PX_JOIN_FILTER and PX_PART_JOIN_FILTER hints are used to manually enable runtime filter. The SQL syntax is as follows:
/*+ PX_JOIN_FILTER(join_right_table_name)*/
/*+ PX_PART_JOIN_FILTER(join_right_table_name)*/
Here is an example:
EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
EXPLAIN SELECT /*+ PARALLEL(3) LEADING(tt1 tt2) PX_PART_JOIN_FILTER(tt2)*/ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
If the degree of parallelism is 1, a runtime filter is not allocated. In this case, you must set the degree of parallelism to at least 2. You can use the PARALLEL hint to specify the degree of parallelism. Here is an example:
/*+ PARALLEL(2) */
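For instance, combining the PARALLEL hint with a runtime filter hint on the tables from the earlier examples:
EXPLAIN SELECT /*+ PARALLEL(2) PX_JOIN_FILTER(tt2) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;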
Note that if the join condition itself filters out little data, a runtime filter cannot improve performance and may even cause a slight regression. Therefore, before manually enabling a runtime filter, evaluate the query scenario to determine whether a runtime filter applies.
The NO_PX_JOIN_FILTER and NO_PX_PART_JOIN_FILTER hints are used to manually disable runtime filter. The SQL syntax is as follows:
/*+ NO_PX_JOIN_FILTER(join_right_table_name)*/
/*+ NO_PX_PART_JOIN_FILTER(join_right_table_name)*/
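For example, to generate a plan for the earlier local-runtime-filter query without its runtime filter:
EXPLAIN SELECT /*+ NO_PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;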
Adjust the execution strategy for runtime filters
Runtime filters are adaptive. By default, when the selectivity of the join keys meets the required conditions, three types of runtime filters are created: the IN filter, the RANGE filter, and the Bloom filter. The IN filter uses a hash table for filtering, and the RANGE filter filters by the minimum and maximum values of the build-side keys.
Among the three, the IN filter is the most accurate. When the IN filter is enabled during execution, the other two filters are automatically disabled. The executor decides whether to use the IN filter based on the actual NDV (number of distinct values) observed at run time. In addition, each filter adaptively disables or re-enables itself based on its real-time filtering effectiveness.
OceanBase Database provides system variables to adjust the execution strategies for runtime filters, including runtime_filter_type, runtime_filter_max_in_num, runtime_filter_wait_time_ms, and runtime_bloom_filter_max_size.
The runtime_filter_type variable specifies the type of runtime filter to be enabled. By default, it is set to 'BLOOM_FILTER,RANGE,IN', indicating that all three types of runtime filters are enabled. If runtime_filter_type='', no runtime filters are enabled. Here is an example:
ALTER SYSTEM SET runtime_filter_type = 'BLOOM_FILTER,RANGE,IN';
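To disable all runtime filters, set the variable to an empty string:
ALTER SYSTEM SET runtime_filter_type = '';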
Generally, the default value is sufficient, and you do not need to use the runtime_filter_type variable. OceanBase Database selects the optimal runtime filter type for filtering during optimization and execution.
The runtime_filter_max_in_num variable specifies the maximum NDV allowed for the IN filter, with a default value of 1,024. The optimizer estimates the NDV of the build table; if the estimated NDV exceeds runtime_filter_max_in_num, the IN filter is not allocated. Because NDV estimates can be inaccurate, the actual NDV may be large even though the estimate is small, causing an IN filter to be allocated by mistake. In such cases, the executor adaptively disables the IN filter during execution based on the actual NDV. It is generally not recommended to set this value too high, because when the build table has a very high NDV, the IN filter is less effective than the Bloom filter.
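For example, following the same syntax shown above, you could raise the threshold to an illustrative value of 2,048:
ALTER SYSTEM SET runtime_filter_max_in_num = 2048;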
The runtime_filter_wait_time_ms variable specifies the maximum time the probe side waits for a runtime filter to arrive, with a default value of 10 ms. The probe side waits for the runtime filter before sending data. If the filter has not arrived within runtime_filter_wait_time_ms milliseconds, the probe side enters the bypass stage and sends data directly without runtime filtering; when the filter arrives later, it is still enabled and applied. Generally, this value does not need to be adjusted. If the Bloom filter processes a large amount of data and filters effectively, you can increase this value appropriately.
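For example, to allow the probe side to wait up to an illustrative 50 ms:
ALTER SYSTEM SET runtime_filter_wait_time_ms = 50;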
The runtime_bloom_filter_max_size variable limits the maximum size of the Bloom filter, with a default value of 2,048 MB. If the build table contains so much data that the default maximum size cannot accommodate it all, increase the value of runtime_bloom_filter_max_size.
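For example, assuming the value is specified in megabytes as the default suggests, an illustrative increase would be:
ALTER SYSTEM SET runtime_bloom_filter_max_size = 4096;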