OceanBase Database supports hash joins that join two tables based on equality match of certain fields. However, if the data volume of the tables, especially the probe table, involved in a hash join is large, the performance of the join is deteriorated. In this case, you can use runtime filters to improve the efficiency.
Principle
Runtime filters optimize the performance of a hash join by reducing the amount of data to be probed. For example, in scenarios where multiple dimension tables and fact tables connected in a star schema are joined, using runtime filters is an effective way to optimize the join operation.
Lightweight runtime filters are created based on the build process of a hash join and then broadcast to the probe table. The probe table can use multiple types of runtime filters to filter data at the storage layer in advance. This reduces the amount of data that actually participates in the hash join and network transmission, thus improving the query efficiency.
Runtime filters can be classified in three aspects. First, they can be classified into local runtime filters and global runtime filters based on whether cross-server transmission is required. Second, they can be classified into Bloom filters, In filters, and Range filters based on the data structure. Both local runtime filters and global runtime filters support all these data structures. Third, runtime filters can be classified into common runtime filters and part join filters based on the filtered entity. Common runtime filters filter join keys, while part join filters filter partitions. Both common runtime filters and part join filters can be local or global. A common runtime filter supports any of the three data structures, while a part join filter supports only the Bloom filter.
This topic provides examples of local runtime filters, global runtime filters, and part join filters.
Local runtime filters
A local runtime filter does not need to be transmitted over the network. The filter only needs to calculate the filter conditions on the local node. A local runtime filter is commonly used for scenarios where data in the probe table of a hash join is not shuffled.
The following example shows an execution plan that uses a local runtime filter.
obclient> CREATE TABLE tt1(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT, v2 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------+
| ============================================================== |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| -------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |7 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10000|1 |6 | |
| |2 | └─PX PARTITION ITERATOR | |1 |6 | |
| |3 | └─HASH JOIN | |1 |6 | |
| |4 | ├─JOIN FILTER CREATE|:RF0000 |1 |3 | |
| |5 | │ └─TABLE FULL SCAN |tt2 |1 |3 | |
| |6 | └─JOIN FILTER USE |:RF0000 |1 |4 | |
| |7 | └─TABLE FULL SCAN |tt1 |1 |4 | |
| ============================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt1.v2, tt2.v1, tt2.v2)]), filter(nil), rowset=256 |
| dop=4 |
| 2 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| partition wise, force partition granule |
| 3 - output([tt1.v1], [tt2.v1], [tt2.v2], [tt1.v2]), filter(nil), rowset=256 |
| equal_conds ([tt1.v1 = tt2.v1]), other_conds(nil) |
| 4 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[tt2.v1] |
| 5 - output([tt2.v1], [tt2.v2]), filter(nil), rowset=256 |
| access([tt2.v1], [tt2.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
| 6 - output([tt1.v1], [tt1.v2]), filter(nil), rowset=256 |
| 7 - output([tt1.v1], [tt1.v2]), filter([RF_IN_FILTER(tt1.v1)], [RF_RANGE_FILTER(tt1.v1)], [RF_BLOOM_FILTER(tt1.v1)]), rowset=256 |
| access([tt1.v1], [tt1.v2]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false], |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------+
32 rows in set
In the preceding example, Operator 4 JOIN FILTER CREATE creates a runtime filter denoted by RF0000 in the NAME field, and Operator 6 JOIN FILTER USE uses the runtime filter. Since the runtime filter is directly used on the local node without being transmitted over the network, it is a local runtime filter.
Global runtime filters
A global runtime filter needs to be broadcast to multiple execution nodes and can be pushed down to any position in the shape of the execution plan for data filtering. Compared with a local runtime filter, a global runtime filter consumes more overhead in projection and computing at the SQL layer, as well as in network transmission, and therefore usually delivers sound performance improvement.
The following example shows an execution plan that uses a global runtime filter.
obclient> CREATE TABLE tt1 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt2 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> CREATE TABLE tt3 (c1_rand INT, c2_rand INT, c3_rand INT, c4_rand INT, c5_rand INT) PARTITION BY HASH(c5_rand) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN BASIC SELECT /*+ LEADING(a (b c)) PARALLEL(3) USE_HASH(b) USE_HASH(c) PQ_DISTRIBUTE(c HASH HASH) PX_JOIN_FILTER(c a) PX_JOIN_FILTER(c b) */ COUNT(*) FROM tt1 a, tt2 b, tt3 c WHERE a.c1_rand=b.c1_rand AND a.c2_rand = c.c2_rand AND b.c3_rand = c.c3_rand;
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Query Plan |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| ======================================================== |
| |ID|OPERATOR |NAME | |
| -------------------------------------------------------- |
| |0 |SCALAR GROUP BY | | |
| |1 |└─PX COORDINATOR | | |
| |2 | └─EXCHANGE OUT DISTR |:EX10003| |
| |3 | └─MERGE GROUP BY | | |
| |4 | └─SHARED HASH JOIN | | |
| |5 | ├─JOIN FILTER CREATE |:RF0001 | |
| |6 | │ └─EXCHANGE IN DISTR | | |
| |7 | │ └─EXCHANGE OUT DISTR (BC2HOST)|:EX10000| |
| |8 | │ └─PX BLOCK ITERATOR | | |
| |9 | │ └─TABLE FULL SCAN |a | |
| |10| └─HASH JOIN | | |
| |11| ├─JOIN FILTER CREATE |:RF0000 | |
| |12| │ └─EXCHANGE IN DISTR | | |
| |13| │ └─EXCHANGE OUT DISTR (HASH) |:EX10001| |
| |14| │ └─PX BLOCK ITERATOR | | |
| |15| │ └─TABLE FULL SCAN |b | |
| |16| └─EXCHANGE IN DISTR | | |
| |17| └─EXCHANGE OUT DISTR (HASH) |:EX10002| |
| |18| └─JOIN FILTER USE |:RF0000 | |
| |19| └─JOIN FILTER USE |:RF0001 | |
| |20| └─PX BLOCK ITERATOR | | |
| |21| └─TABLE FULL SCAN |c | |
| ======================================================== |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT_SUM(T_FUN_COUNT(*))]) |
| 1 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| 2 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| dop=3 |
| 3 - output([T_FUN_COUNT(*)]), filter(nil), rowset=256 |
| group(nil), agg_func([T_FUN_COUNT(*)]) |
| 4 - output(nil), filter(nil), rowset=256 |
| equal_conds([a.c1_rand = b.c1_rand], [a.c2_rand = c.c2_rand]), other_conds(nil) |
| 5 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[a.c2_rand] |
| 6 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| 7 - output([a.c2_rand], [a.c1_rand]), filter(nil), rowset=256 |
| dop=3 |
| 8 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| 9 - output([a.c1_rand], [a.c2_rand]), filter(nil), rowset=256 |
| access([a.c1_rand], [a.c2_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([a.__pk_increment]), range(MIN ; MAX)always true |
| 10 - output([b.c1_rand], [c.c2_rand]), filter(nil), rowset=256 |
| equal_conds([b.c3_rand = c.c3_rand]), other_conds(nil) |
| 11 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| RF_TYPE(in, range, bloom), RF_EXPR[b.c3_rand] |
| 12 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| 13 - output([b.c3_rand], [b.c1_rand]), filter(nil), rowset=256 |
| (#keys=1, [b.c3_rand]), dop=3 |
| 14 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| 15 - output([b.c1_rand], [b.c3_rand]), filter(nil), rowset=256 |
| access([b.c1_rand], [b.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([b.__pk_increment]), range(MIN ; MAX)always true |
| 16 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 17 - output([c.c3_rand], [c.c2_rand), filter(nil), rowset=256 |
| (#keys=1, [c.c3_rand]), dop=3 |
| 18 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 19 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 20 - output([c.c3_rand], [c.c2_rand]), filter(nil), rowset=256 |
| 21 - output([c.c3_rand], [c.c2_rand]), filter([RF_IN_FILTER(c.c3_rand)], [RF_RANGE_FILTER(c.c3_rand)], [RF_BLOOM_FILTER(c.c3_rand)], [RF_IN_FILTER(c.c2_rand)], |
| [RF_RANGE_FILTER(c.c2_rand)], [RF_BLOOM_FILTER(c.c2_rand)]), rowset=256 |
| access([c.c2_rand], [c.c3_rand]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, filter_before_indexback[false,false,false,false,false,false], |
| range_key([c.__pk_increment]), range(MIN ; MAX)always true |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
70 rows in set
In the preceding example, Operator 5 creates a runtime filter denoted by RF0001 in the NAME field, and Operator 11 creates a runtime filter denoted by RF0000. The two runtime filters are pushed down and used at Operator 21 TABLE FULL SCAN. Since the runtime filters are transmitted over multiple data flow objects (DFOs), they are global runtime filters.
Part join filters
During a hash join, a hash table is created in blocking mode based on the left table, and data in the right table is matched row by row. All data in the left table is obtained for building the hash table. During the building process, if the system can determine which partitions in the right table contain data in the left table, it can filter those unnecessary partitions from the right table when scanning its data, thus improving the join performance. Part join filters are introduced to implement such optimization. To determine which partitions of the right table contain data in the left table, the join key must contain the partitioning key of the right table, which is the prerequisite for generating a part join filter.
The following example is an execution plan that uses a part join filter.
obclient> CREATE TABLE tt1(v1 INT);
Query OK, 0 rows affected
obclient> CREATE TABLE tt2(v1 INT) PARTITION BY HASH(v1) PARTITIONS 5;
Query OK, 0 rows affected
obclient> EXPLAIN SELECT /*+ PARALLEL(3) LEADING(tt1 tt2) PX_PART_JOIN_FILTER(tt2)*/ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
+-------------------------------------------------------------------------------+
| Query Plan |
+-------------------------------------------------------------------------------+
| ======================================================================= |
| |ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)| |
| ----------------------------------------------------------------------- |
| |0 |PX COORDINATOR | |1 |5 | |
| |1 |└─EXCHANGE OUT DISTR |:EX10001|1 |4 | |
| |2 | └─HASH JOIN | |1 |4 | |
| |3 | ├─PART JOIN FILTER CREATE |:RF0000 |1 |1 | |
| |4 | │ └─EXCHANGE IN DISTR | |1 |1 | |
| |5 | │ └─EXCHANGE OUT DISTR (PKEY)|:EX10000|1 |1 | |
| |6 | │ └─PX BLOCK ITERATOR | |1 |1 | |
| |7 | │ └─TABLE FULL SCAN |tt1 |1 |1 | |
| |8 | └─PX PARTITION HASH JOIN-FILTER|:RF0000 |1 |3 | |
| |9 | └─TABLE FULL SCAN |tt2 |1 |3 | |
| ======================================================================= |
| Outputs & filters: |
| ------------------------------------- |
| 0 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| 1 - output([INTERNAL_FUNCTION(tt1.v1, tt2.v1)]), filter(nil), rowset=256 |
| dop=3 |
| 2 - output([tt1.v1], [tt2.v1]), filter(nil), rowset=256 |
| equal_conds([tt1.v1 = tt2.v1]), other_conds(nil) |
| 3 - output([tt1.v1]), filter(nil), rowset=256 |
| RF_TYPE(bloom), RF_EXPR[t1.v1] |
| 4 - output([tt1.v1]), filter(nil), rowset=256 |
| 5 - output([tt1.v1]), filter(nil), rowset=256 |
| (#keys=1, [t1.v1]), dop=3 |
| 6 - output([tt1.v1]), filter(nil), rowset=256 |
| 7 - output([tt1.v1]), filter(nil), rowset=256 |
| access([tt1.v1]), partitions(p0) |
| is_index_back=false, is_global_index=false, |
| range_key([tt1.__pk_increment]), range(MIN ; MAX)always true |
| 8 - output([tt2.v1]), filter(nil), rowset=256 |
| affinitize |
| 9 - output([tt2.v1]), filter(nil), rowset=256 |
| access([tt2.v1]), partitions(p[0-4]) |
| is_index_back=false, is_global_index=false, |
| range_key([tt2.__pk_increment]), range(MIN ; MAX)always true |
+-------------------------------------------------------------------------------+
In the preceding example, Operator 3 PART JOIN FILTER CREATE creates a part join filter, and Operator 8 PX PARTITION HASH JOIN-FILTER uses the part join filter to filter out partitions that will not participate in the join from table tt2.
Manually enable and disable the runtime filter feature
Runtime filters can be used only for hash joins. The OceanBase Database optimizer will not assign any runtime filters if a join is not a hash join. In most cases, the optimizer automatically assigns runtime filters during hash joins. If the optimizer does not assign a runtime filter, you can manually assign one by using a hint.
You can use the PX_JOIN_FILTER or PX_PART_JOIN_FILTER hint to manually enable the runtime filter feature. The syntaxes of the hints are as follows:
/*+ PX_JOIN_FILTER(join_right_table_name)*/
/*+ PX_PART_JOIN_FILTER(join_right_table_name)*/
Here are some examples:
EXPLAIN SELECT /*+ PX_JOIN_FILTER(tt2) PARALLEL(4) */ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
EXPLAIN SELECT /*+ PARALLEL(3) LEADING(tt1 tt2) PX_PART_JOIN_FILTER(tt2)*/ * FROM tt1 JOIN tt2 ON tt1.v1=tt2.v1;
In scenarios where the degree of parallelism (DOP) is 1, no runtime filter is assigned. To use a runtime filter, you must set the DOP to at least 2. You can use the PARALLEL hint to set the DOP. Here is an example:
/*+ PARALLEL(2) */
Note that runtime filters may not speed up a query or even cause a slight performance degradation when data is not well filtered in a hash join. Therefore, when you choose to manually enable the runtime filter feature, you need to carefully evaluate the query scenario to determine whether runtime filters are applicable.
You can use the NO_PX_JOIN_FILTER or NO_PX_PART_JOIN_FILTER hint to manually disable the runtime filter feature. The syntaxes of the hints are as follows:
/*+ NO_PX_JOIN_FILTER(join_right_table_name)*/
/*+ NO_PX_PART_JOIN_FILTER(join_right_table_name)*/
Adjust the execution strategy of runtime filters
Runtime filters are adaptive. By default, when the selectivity of a join based on the join key meets requirements, runtime filters of three data structures, including In, Range, and Bloom filters, are created. An In filter uses the hash table for filtering. A Range filter uses the maximum or minimum value for filtering.
In terms of filtering priority, because the In filter has the most accurate filtering performance, when the In filter is enabled during execution, the other two filters will be adaptively disabled. During execution, the executor will determine whether to use the In filter based on the actual number of distinct values (NDV). In addition, in actual computing, each type of filters are adaptively enabled or disabled based on the real-time filterability.
OceanBase Database provides system variables to allow you to adjust the execution strategy of runtime filters, including runtime_filter_type, runtime_filter_max_in_num, runtime_filter_wait_time_ms, and runtime_bloom_filter_max_size.
You can use the runtime_filter_type variable to specify the type of runtime filters to be enabled. The default value of runtime_filter_type is 'BLOOM_FILTER,RANGE,IN', which indicates that three types of runtime filters are enabled at the same time. When the variable is set to runtime_filter_type='', no runtime filters are enabled. Here is an example:
ALTER SYSTEM SET runtime_filter_type = 'BLOOM_FILTER,RANGE,IN'
Generally, you can use the default value for the runtime_filter_type variable and do not need to specify a specific type. In that case, OceanBase Database selects the optimal type of runtime filters during the optimization and execution phases.
You can use the runtime_filter_max_in_num variable to specify the NDV for an In filter. The default value is 1024. The optimizer estimates the NDV of the build table. If NDV > runtime_filter_max_in_num, the optimizer does not assign an In filter. Because the NDV estimated by the optimizer may not be accurate, there may be cases where the actual NDV is large but the estimated NDV is small, resulting in the misassignment of an In filter. In this case, the executor will adaptively disable the In filter based on the actual NDV during execution. In general, we recommend that you do not set this value too large, because when the NDV of the build table is large, the optimization effect of In filters will be not as good as that of Bloom filters.
You can use the runtime_filter_wait_time_ms variable to set the maximum time that the probe side waits for a runtime filter to arrive. The default value is 10 ms. The probe side waits for a runtime filter to arrive before outputting data. If the runtime filter does not arrive within the period specified in runtime_filter_wait_time_ms, the probe side goes to the By Pass phase. Then, the probe side starts output unfiltered data. When the runtime filter arrives at a later time, it will still be enabled and used to filter data. In most cases, you do not need to adjust the value of this variable. If the data size of a Bloom filter is large and the filterability is good, you can increase the value.
You can use the runtime_bloom_filter_max_size variable to control the maximum length of a Bloom filter. The default value is 2048 MB. When the build table has massive data and exceeds the default maximum length of the Bloom filter, you need to increase the value of runtime_bloom_filter_max_size.