Optimizing the Distributed Execution Engine to Improve Database Query Performance



This article was written by Guoping Wang, tech leader of the SQL engine team at OceanBase. Guoping Wang received his Ph.D. in Computer Science from the National University of Singapore.

As OceanBase Database is used by a large number of customers across industries, a few performance issues have surfaced in certain business scenarios.

For example, OceanBase Database cannot generate the optimal execution plan in certain distributed scenarios, the execution engine is intolerant of non-optimal execution plans, and query execution cannot always be accelerated at maximum parallelism.

To solve these issues, we have been thinking about how to enhance the SQL engine and improve its performance in processing distributed queries.

Essentially, the performance of the SQL engine in processing distributed queries relies on the optimization of distributed queries and the distributed execution engine. In the last webinar, we discussed the challenges and methods of optimizing distributed queries. In this article, we will talk about how to improve the performance of the distributed execution engine.

So far, we have done many things to enhance the execution engine:

‒ Implemented new distributed and standalone algorithms, such as Null-Aware Hash Anti-Join, Shared Broadcast Hash Join, Hash-based window functions, and partition bloom filters.

‒ Improved the implementation of the vectorized engine.

‒ Developed superb parallel pushdown techniques.

‒ Started the development of the adaptive technique.

These efforts have greatly improved the performance of both distributed and standalone queries.

For more information about how to implement the vectorized engine, read this article. Here, we will focus on the adaptive technique and parallel pushdown techniques for the distributed execution engine.

Develop an adaptive execution engine

Some OceanBase customers reported that the execution engine was intolerant of non-optimal execution plans generated by the optimizer. In other words, if the optimizer generated a non-optimal execution plan, the execution engine could not adjust the plan to improve the execution performance. Although the optimizer is designed to choose the optimal execution plan for each query, the optimizer itself is not perfect. For example, it cannot always estimate row counts accurately. So, the optimizer may pick a less optimal execution plan, or even a lousy one.

To solve this problem, we introduced adaptivity to the execution engine. An adaptive execution engine identifies some non-optimal execution plans based on the real-time execution state and adjusts them accordingly to improve the execution performance. Although the adaptive technique is not a universal solution for all non-optimal execution plans generated by the optimizer, it has significantly improved the execution engine.

For example, the adaptive GROUP BY/DISTINCT parallel pushdown technique can prevent performance downgrade caused by regular GROUP BY/DISTINCT parallel pushdown of a non-optimal plan.

Before we dive into the adaptive technique, let me briefly introduce the GROUP BY/DISTINCT parallel pushdown technique.

As a general technique in distributed execution, GROUP BY/DISTINCT parallel pushdown is often used to push down the GROUP BY operator in advance to pre-aggregate some data. This reduces the workload of network transmission, thus improving performance.

As an example, the following execution plan pushes down the №5 operator, a GROUP BY operator, for data pre-aggregation, so that the network transmission workload of the №4 operator is reduced to achieve higher performance. However, note that GROUP BY pushdown does not necessarily improve the performance. It backfires sometimes, mainly because it consumes extra computing resources. GROUP BY parallel pushdown brings benefits only when the performance improvement in network transmission surpasses the extra computing cost.

create table R1(a int primary key, b int, c int) partition by hash(a) partitions 4;

explain select b, sum(c) from R1 group by b;

==========================================================
|ID|OPERATOR                     |NAME    |EST. ROWS|COST|
----------------------------------------------------------
|0 |PX COORDINATOR               |        |1        |10  |
|1 | EXCHANGE OUT DISTR          |:EX10001|1        |10  |
|2 |  HASH GROUP BY              |        |1        |9   |
|3 |   EXCHANGE IN DISTR         |        |1        |9   |
|4 |    EXCHANGE OUT DISTR (HASH)|:EX10000|1        |8   |
|5 |     HASH GROUP BY           |        |1        |8   |
|6 |      PX PARTITION ITERATOR  |        |1        |7   |
|7 |       TABLE SCAN            |r1      |1        |7   |
==========================================================
Outputs & filters:
-------------------------------------
  0 - output([INTERNAL_FUNCTION(r1.b, T_FUN_SUM(T_FUN_SUM(r1.c)))]), filter(nil), rowset=256
  1 - output([INTERNAL_FUNCTION(r1.b, T_FUN_SUM(T_FUN_SUM(r1.c)))]), filter(nil), rowset=256, dop=1
  2 - output([r1.b], [T_FUN_SUM(T_FUN_SUM(r1.c))]), filter(nil), rowset=256,
      group([r1.b]), agg_func([T_FUN_SUM(T_FUN_SUM(r1.c))])
  3 - output([r1.b], [T_FUN_SUM(r1.c)]), filter(nil), rowset=256
  4 - (#keys=1, [r1.b]), output([r1.b], [T_FUN_SUM(r1.c)]), filter(nil), rowset=256, dop=1
  5 - output([r1.b], [T_FUN_SUM(r1.c)]), filter(nil), rowset=256,
      group([r1.b]), agg_func([T_FUN_SUM(r1.c)])
  6 - output([r1.b], [r1.c]), filter(nil), rowset=256
  7 - output([r1.b], [r1.c]), filter(nil), rowset=256,
      access([r1.b], [r1.c]), partitions(p[0-3])

In the past, the OceanBase execution engine decided whether to push down the GROUP BY operator based on the cost calculated by the optimizer. However, this decision could go wrong because the optimizer might misestimate the number of rows. To solve this issue, we introduced the adaptive GROUP BY/DISTINCT parallel pushdown technique: the optimizer always pushes down the GROUP BY or DISTINCT operator, and the execution engine decides whether to skip the pushed-down operator by sampling part of its data during execution.

The challenge of this technique lies in making the pushed-down operator achieve satisfactory pre-aggregation performance at a low cost. The OceanBase solution is to control the cost of the hash table of the pushed-down operator by keeping it within the CPU cache, and to perform multiple rounds of sampling to prevent misjudgment when data happens not to aggregate for a while. The key points of the solution are described as follows; a simplified sketch follows the list:

‒ The execution engine first limits the hash table to the L2 cache (1 MB). If the pre-aggregation performance is unsatisfactory, it marks the hash table as discarded. If the performance is good, it expands the hash table to the L3 cache (10 MB) and marks the table as discarded only if even more memory is needed during execution.

‒ If the hash table is discarded, the execution engine returns and releases all of its rows, and then rebuilds the hash table to start the next round of sampling.

‒ If pre-aggregation fails to achieve satisfactory performance in five consecutive rounds of sampling, the execution engine skips the pushed GROUP BY operator.
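To make this concrete, here is a minimal C++ sketch of how the per-round decision could look. It is only an illustration under simplifying assumptions (a fixed per-entry memory estimate and a single reduction-ratio threshold); the class, constant, and function names are hypothetical, not OceanBase internals.

// A minimal sketch of the per-round adaptive decision described above.
// All names (PartialGroupBy, kL2Budget, ...) are illustrative only.
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

struct Row { int64_t group_key; int64_t value; };

class PartialGroupBy {
  static constexpr std::size_t kL2Budget = 1u << 20;          // ~1 MB initial hash table budget
  static constexpr std::size_t kL3Budget = 10u * (1u << 20);  // ~10 MB expanded budget
  static constexpr int kMaxBadRounds = 5;                     // give up after 5 consecutive bad rounds
  static constexpr double kMinReduction = 2.0;                // want >= 2 input rows per group

  std::unordered_map<int64_t, int64_t> ht_;                   // partial SUM per group key
  std::size_t budget_ = kL2Budget;
  std::size_t rows_in_round_ = 0;
  int bad_rounds_ = 0;
  bool bypass_ = false;

  std::size_t memory_used() const { return ht_.size() * 32; } // rough per-entry estimate

public:
  // Feeds one batch; returns rows that must be emitted upstream right away
  // (flushed partial results, or the batch itself once in bypass mode).
  std::vector<Row> add_batch(const std::vector<Row>& batch) {
    if (bypass_) return batch;                      // pushed-down GROUP BY is skipped

    for (const Row& r : batch) ht_[r.group_key] += r.value;
    rows_in_round_ += batch.size();
    if (memory_used() <= budget_) return {};        // keep pre-aggregating within the budget

    const double reduction =
        ht_.empty() ? 1.0 : double(rows_in_round_) / double(ht_.size());
    if (budget_ == kL2Budget && reduction >= kMinReduction) {
      budget_ = kL3Budget;                          // aggregating well: grow into the L3 budget
      return {};
    }

    // The hash table is discarded: judge the round, flush partial results, start over.
    if (reduction < kMinReduction) {
      if (++bad_rounds_ >= kMaxBadRounds) bypass_ = true;  // 5 bad rounds in a row: give up
    } else {
      bad_rounds_ = 0;                              // a good round resets the streak
    }
    std::vector<Row> flushed;
    flushed.reserve(ht_.size());
    for (const auto& [k, v] : ht_) flushed.push_back({k, v});
    ht_.clear();
    rows_in_round_ = 0;
    budget_ = kL2Budget;
    return flushed;
  }
};

In this sketch, a sampling round ends whenever the hash table outgrows its cache budget; the engine then judges the round by how much the input shrank and either keeps pre-aggregating or bypasses the pushed-down operator for good.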

Compared to execution without operator pushdown, adaptive GROUP BY/DISTINCT parallel pushdown involves extra sampling and computing overhead, which is needed to decide at runtime whether to skip the pushed-down operator. However, our tests across various data distributions show that this overhead stays within 10%, far below the potential performance gain.

We are also working on more adaptive techniques, such as the adaptive creation and detection of bloom filters, adaptive tuning of Nested Loop Join and Hash Join, and adaptive tuning of Distributed Broadcast Join and Distributed Hash-Hash Repartition Join.

Use parallel pushdown techniques to further enhance distributed query performance

Parallel pushdown is a technique for distributed query execution that pushes down part of an operator's computation to improve performance. It generally does so by executing at maximum parallelism or by reducing network transmission, and in many cases it improves distributed query performance by orders of magnitude. The GROUP BY/DISTINCT parallel pushdown technique described in the previous section is a typical example. OceanBase provides well-developed parallel pushdown techniques that cover almost all operators used in analytical processing, such as GROUP BY, ROLLUP, DISTINCT, and window functions.

The implementation of parallel pushdown techniques varies based on the operators. Considering the complexity of parallel execution, the parallel pushdown of each operator faces different challenges. Now, let’s talk about the benefits of parallel pushdown techniques by taking the three-stage DISTINCT parallel pushdown of aggregate functions as an example.

In the following example, query Q1 contains two aggregate functions, each with a DISTINCT keyword. Without parallel pushdown, both deduplication and aggregation are performed by the №0 operator, which does not support parallel execution, resulting in poor overall execution performance.

create table R1(a int, b int, c int, d int, primary key(a,b)) partition by hash(b) partitions 4;

Q1: select sum(distinct c), sum(distinct d) from R1 where a = 5;

=====================================================
|ID|OPERATOR                |NAME    |EST. ROWS|COST|
-----------------------------------------------------
|0 |SCALAR GROUP BY         |        |1        |2365|
|1 | PX COORDINATOR         |        |3960     |2122|
|2 |  EXCHANGE OUT DISTR    |:EX10000|3960     |1532|
|3 |   PX PARTITION ITERATOR|        |3960     |1532|
|4 |    TABLE SCAN          |r1      |3960     |1532|
=====================================================
Outputs & filters:
-------------------------------------
  0 - output([T_FUN_SUM(distinct r1.c)], [T_FUN_SUM(distinct r1.d)]), filter(nil),
      group(nil), agg_func([T_FUN_SUM(distinct r1.c)], [T_FUN_SUM(distinct r1.d)])
  1 - output([r1.c], [r1.d]), filter(nil)
  2 - output([r1.c], [r1.d]), filter(nil), dop=1
  3 - output([r1.c], [r1.d]), filter(nil)
  4 - output([r1.c], [r1.d]), filter(nil),
      access([r1.c], [r1.d]), partitions(p[0-3])

To address the issue, we introduced the three-stage parallel pushdown technique.

Stage 1: Push down the DISTINCT keyword for partial data deduplication, which is completed by the №6 operator in the following example.

Stage 2: Repartition data based on the deduplicated column, and then perform complete deduplication and partial pre-aggregation, which are completed by the №3, №4, and №5 operators.

Stage 3: Aggregate the results of Stage 2, which is completed by the №0, №1, and №2 operators.

Compared to the execution without operator pushdown, the three-stage parallel pushdown technique has two performance benefits. First, it allows data deduplication and pre-aggregation at the maximum parallelism. Second, data deduplication by using the DISTINCT pushdown technique reduces the workload of network transmission.

create table R1(a int, b int, c int, d int, primary key(a,b)) partition by hash(b) partitions 4;

select sum(distinct c) from R1 where a = 5;

===========================================================
|ID|OPERATOR                      |NAME    |EST. ROWS|COST|
-----------------------------------------------------------
|0 |SCALAR GROUP BY               |        |1        |1986|
|1 | PX COORDINATOR               |        |1        |1835|
|2 |  EXCHANGE OUT DISTR          |:EX10001|1        |1835|
|3 |   MERGE GROUP BY             |        |1        |1835|
|4 |    EXCHANGE IN DISTR         |        |1        |1683|
|5 |     EXCHANGE OUT DISTR (HASH)|:EX10000|1        |1683|
|6 |      HASH GROUP BY           |        |1        |1683|
|7 |       PX PARTITION ITERATOR  |        |3960     |1532|
|8 |        TABLE SCAN            |r1      |3960     |1532|
===========================================================
Outputs & filters:
-------------------------------------
  0 - output([T_FUN_SUM(T_FUN_SUM(distinct r1.c))]), filter(nil),
      group(nil), agg_func([T_FUN_SUM(T_FUN_SUM(distinct r1.c))])
  1 - output([T_FUN_SUM(distinct r1.c)]), filter(nil)
  2 - output([T_FUN_SUM(distinct r1.c)]), filter(nil), dop=1
  3 - output([T_FUN_SUM(distinct r1.c)]), filter(nil),
      group(nil), agg_func([T_FUN_SUM(distinct r1.c)])
  4 - output([r1.c]), filter(nil)
  5 - (#keys=1, [r1.c]), output([r1.c]), filter(nil), dop=1
  6 - output([r1.c]), filter(nil),
      group([r1.c]), agg_func(nil)
  7 - output([r1.c]), filter(nil)
  8 - output([r1.c]), filter(nil),
      access([r1.c]), partitions(p[0-3])

The preceding example shows how the three-stage parallel pushdown technique works for an aggregate function with only one DISTINCT keyword. The question is, is it still effective for aggregate functions with more DISTINCT keywords?

The answer is yes. The trick is that in Stage 1, we create a tagged replica of the data set for each DISTINCT aggregate, so a query with N DISTINCT aggregates works on N replicas, and each tag indicates which aggregate function the replica belongs to. Similar operations are performed in Stages 2 and 3, with some minor implementation differences. The following example shows the three-stage pushdown of two DISTINCT aggregate functions in OceanBase Database. Each AGGR_CODE value is the tag of the data replica for one DISTINCT aggregate.

create table R1(a int, b int, c int, d int, primary key(a,b)) partition by hash(b) partitions 4;

select sum(distinct c), sum(distinct d) from R1 where a = 5;

===========================================================
|ID|OPERATOR                      |NAME    |EST. ROWS|COST|
-----------------------------------------------------------
|0 |SCALAR GROUP BY               |        |1        |13  |
|1 | PX COORDINATOR               |        |2        |13  |
|2 |  EXCHANGE OUT DISTR          |:EX10001|2        |12  |
|3 |   HASH GROUP BY              |        |2        |11  |
|4 |    EXCHANGE IN DISTR         |        |2        |10  |
|5 |     EXCHANGE OUT DISTR (HASH)|:EX10000|2        |9   |
|6 |      HASH GROUP BY           |        |2        |8   |
|7 |       PX PARTITION ITERATOR  |        |1        |7   |
|8 |        TABLE SCAN            |r1      |1        |7   |
===========================================================
Outputs & filters:
-------------------------------------
  0 - output([T_FUN_SUM(T_FUN_SUM(dup(r1.c)))], [T_FUN_SUM(T_FUN_SUM(dup(r1.d)))]), filter(nil), rowset=256,
      group(nil), agg_func([T_FUN_SUM(T_FUN_SUM(dup(r1.c)))], [T_FUN_SUM(T_FUN_SUM(dup(r1.d)))])
  1 - output([AGGR_CODE], [T_FUN_SUM(dup(r1.c))], [T_FUN_SUM(dup(r1.d))]), filter(nil), rowset=256
  2 - output([AGGR_CODE], [T_FUN_SUM(dup(r1.c))], [T_FUN_SUM(dup(r1.d))]), filter(nil), rowset=256, dop=1
  3 - output([AGGR_CODE], [T_FUN_SUM(dup(r1.c))], [T_FUN_SUM(dup(r1.d))]), filter(nil), rowset=256,
      group([AGGR_CODE]), agg_func([T_FUN_SUM(dup(r1.c))], [T_FUN_SUM(dup(r1.d))])
  4 - output([AGGR_CODE], [dup(r1.c)], [dup(r1.d)]), filter(nil), rowset=256
  5 - (#keys=3, [AGGR_CODE], [dup(r1.c)], [dup(r1.d)]), output([AGGR_CODE], [dup(r1.c)], [dup(r1.d)]), filter(nil), rowset=256, dop=1
  6 - output([AGGR_CODE], [dup(r1.c)], [dup(r1.d)]), filter(nil), rowset=256,
      group([AGGR_CODE], [dup(r1.c)], [dup(r1.d)]), agg_func(nil)
  7 - output([r1.c], [r1.d]), filter(nil), rowset=256
  8 - output([r1.c], [r1.d]), filter(nil), rowset=256,
      access([r1.c], [r1.d]), partitions(p[0-3])
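To make the AGGR_CODE tagging in the plan above more concrete, here is a small hypothetical C++ sketch of the Stage 1 duplication: every input row is expanded into one tagged copy per DISTINCT aggregate, so each copy can be deduplicated and repartitioned independently. The struct and function names are illustrative only, not OceanBase internals.

// Hypothetical sketch of Stage 1 duplication for sum(distinct c), sum(distinct d).
#include <cstdint>
#include <vector>

struct InputRow  { int64_t c; int64_t d; };
struct TaggedRow { int32_t aggr_code; int64_t c; int64_t d; };

// Expand each input row into two tagged copies: AGGR_CODE 0 for sum(distinct c)
// and AGGR_CODE 1 for sum(distinct d). Stages 1 and 2 then deduplicate and
// repartition on (aggr_code, c, d), and Stage 3 routes each partial result to
// the right aggregate by looking at aggr_code.
std::vector<TaggedRow> expand_for_distinct(const std::vector<InputRow>& rows) {
  std::vector<TaggedRow> tagged;
  tagged.reserve(rows.size() * 2);
  for (const InputRow& r : rows) {
    tagged.push_back({0, r.c, r.d});   // replica owned by sum(distinct c)
    tagged.push_back({1, r.c, r.d});   // replica owned by sum(distinct d)
  }
  return tagged;
}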

Distributed parallel pushdown is often used by our customers. In the past, many customers reported issues with distributed queries caused by the poor performance of the old parallel pushdown implementation and hoped that we could help resolve them.

We tested the techniques by running the TPC-DS benchmark with a scale of 100 GB. The test results show that the new technique significantly improves the distributed query performance. The total execution duration of 99 queries decreases from 918s to 270s, as shown in the following figure.

[Figure: total execution time of the 99 TPC-DS queries before and after the optimization]

If you have any questions, feel free to leave a comment below!

