Complex database architectures, especially those deployed to process massive amounts of data, tend to harbor hidden and growing risks, such as throughput bottlenecks, disconnects between service modules, and difficulty maintaining duplicated data.
One game company fell victim to exactly these issues. Starting from the challenges facing its real-time data warehouse, the head of the company's big data platform shares the story of replacing ClickHouse and Hive with a self-managed, integrated architecture for both storage and real-time analysis.
As a game company, we rely heavily on data analysis, so the performance of our data analysis system matters a great deal. We have long used data warehouses to analyze game data such as user behaviors (game downloads, account registrations, and account top-ups), advertising data, marketing data, player gaming status, and combat parameters.
Like most companies in this industry, we built our data warehouses on a typical Lambda architecture, as shown in Figure 1. The game data is processed by the following workflow:
1. The raw game data is collected and cached in the Kafka instance.
2. The cached data is consumed by scheduled tasks for:
‒ Offline processing by using a Hive data warehouse, and
‒ Real-time analysis by using a ClickHouse data warehouse
3. The processed data is then sent to application systems, such as the business intelligence (BI) system, user system, marketing system, and third-party systems.
Figure 1 Architecture of the old data warehouse system
During data pre-processing, the system sends alerts and takes action against defective data, such as records with incomplete fields or incorrect field types. When it comes to data processing, the game industry differs from other industries in that game companies perform attribution analysis on their data. Simply put, we analyze the data collection process in detail to determine the exact source of each piece of data, such as a specific ad position, and then adjust our advertising strategies based on the attribution results.
The architecture of our data warehouses consists of the Operational Data Store (ODS) layer, Data Warehouse Detail (DWD) layer, Data Warehouse Middle (DWM) layer, Data Warehouse Service (DWS) layer, and Data Mart (DM) layer.
After passing quality checks, the raw data is written to the ODS layer of both the Hive and ClickHouse data warehouses. The ODS-layer data is also available in the Kafka instance, so Kafka is technically part of the ODS layer. Data from the ODS layer is processed into the DWD layer by our self-developed task scheduling system, which manages the source data dictionary and data quality and supports operations such as task reruns, task priority adjustment, and data quality alerting. The data is then aggregated into metrics in the DWM and DWS layers before being delivered to the DM layer, which is built on PostgreSQL and Redis databases.
We were proud of this architecture, but it could not overcome some of our challenges.
Challenge 1: Real-time data.
While many companies were using the T+1 strategy for their data warehouses, we managed to optimize our Hive data warehouse so that analysis results were available 30 minutes after the data was generated. We loaded the data once every 30 minutes, wrote it to Hive, and ran INSERT OVERWRITE to insert it into the current day's partition, which reduced data fragmentation. In addition, the real-time ClickHouse data warehouse could present the analysis results about one minute after data generation. However, in some cases we needed to see results in milliseconds, which neither Hive nor ClickHouse could deliver.
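As a rough sketch of that pattern (the table, columns, and partition layout here are hypothetical, not our actual schema), each half-hourly batch overwrites the current day's partition in one shot instead of appending new files:

```sql
-- Hypothetical Hive tables: the 30-minute batch rewrites today's partition,
-- which keeps the partition compact instead of accumulating small files.
INSERT OVERWRITE TABLE dwd_game_log PARTITION (dt = '2022-08-01')
SELECT user_id, event_type, channel_id, event_time
FROM ods_game_log
WHERE dt = '2022-08-01';
```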
Challenge 2: Data consistency.
If you have worked with the Lambda architecture, you know that the ClickHouse and Hive data warehouses usually produce inconsistent results. Our system tolerated duplicated data and was capable of deduplication, but the two warehouses still disagreed even after deduplication. Therefore, we used the ClickHouse warehouse only for viewing real-time data and relied on the Hive warehouse for the final figures.
Challenge 3: System maintainability.
Although we built the real-time ClickHouse data warehouse and the offline Hive data warehouse into one architecture, their codebases were separate, which created a considerable maintenance workload.
Challenge 4: Ad hoc query efficiency.
Sometimes the offline Hive data warehouse took more than ten minutes to generate query results, while the real-time ClickHouse data warehouse took seconds to minutes. Ad hoc query performance was therefore fine in most cases but unacceptable in some highly demanding ones. Advertiser channel association analysis, for example, requires the system to join the order data with the user data and then with the advertiser information. We had to wait about 30 minutes for the advertising performance results, which we ideally wanted within one second.
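To make the requirement concrete, the analysis boils down to a three-way association query. The sketch below uses hypothetical table and column names rather than our production schema:

```sql
-- Hypothetical tables: orders, users, and ad attribution records.
-- The query ties each order back to the advertiser and ad position
-- that brought the user in.
SELECT a.advertiser_id,
       a.ad_position,
       COUNT(o.order_id) AS order_count,
       SUM(o.amount)     AS revenue
FROM orders o
JOIN users u          ON o.user_id = u.user_id
JOIN ad_attribution a ON u.user_id = a.user_id
GROUP BY a.advertiser_id, a.ad_position;
```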
That level of timeliness was impossible for a data warehouse built on the Lambda architecture to deliver, leaving us no choice but to look for a satisfactory alternative.
We first investigated the Hudi and Doris data warehouses. With Hudi, at least 60 seconds passed between writing the data and being able to run the JOIN query against it, and Doris returned results in 10 to 60 seconds.
Given that our ClickHouse data warehouse could already return results in about 66 seconds, as shown in Figure 2, neither alternative promised enough of a gain to change our situation.
Figure 2 ClickHouse presented query results in about 66 seconds
Then, we learned that OceanBase Database was capable of hybrid transactional and analytical processing (HTAP) and tested its speed in querying user IDs. At first, we ran the test with optimized indexes only: OceanBase Database executed a JOIN query that associated 120 million data rows with 3.4 billion data rows and returned the results in 0.23 seconds, as shown in Figure 3, a roughly 286-fold improvement over ClickHouse's 66 seconds (66/0.23). After warming up the data, we obtained the results in 0.01 seconds. That improvement was astonishing.
Figure 3 OceanBase presented the query results in milliseconds
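For context, the first run relied on index optimization alone, before any data warm-up. As a purely hypothetical illustration (our actual index design is not shown here), that kind of tuning typically amounts to secondary indexes on the join keys:

```sql
-- Hypothetical secondary indexes on the join keys, in MySQL-compatible
-- syntax, which OceanBase Database's MySQL mode accepts.
CREATE INDEX idx_orders_user_id ON orders (user_id);
CREATE INDEX idx_attr_user_id ON ad_attribution (user_id);
```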
Without any hesitation, we decided to build OceanBase Database's capabilities into our system to meet business needs such as searching for user IDs, looking up advertising information by user ID, and checking promotion performance in real-time.
After deciding to use OceanBase Database, we developed a plan to make sure that we could use it effectively.
First, we processed the historical data and real-time data separately:
‒ Historical data: We exported the historical data to a CSV file, and then imported it to OceanBase Database by using DataX.
‒ Real-time data: Considering our requirement for millisecond-level responses, we chose Flink SQL to extract data in real-time. The test showed that Flink SQL loaded data to OceanBase Database within one second after data generation.
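Below is a minimal sketch of that kind of Flink SQL job. The topic, table names, columns, and connection settings are hypothetical, and the sink goes through Flink's standard JDBC connector, which works with OceanBase Database's MySQL-compatible mode:

```sql
-- Hypothetical Kafka source: raw game events arriving as JSON.
CREATE TABLE ods_game_events (
  user_id    BIGINT,
  event_type STRING,
  channel_id STRING,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'game_events',
  'properties.bootstrap.servers' = 'kafka-host:9092',
  'properties.group.id' = 'ob_realtime_etl',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- Hypothetical OceanBase sink written through the JDBC connector;
-- port 2881 connects directly to the OceanBase server.
CREATE TABLE dwd_game_events (
  user_id    BIGINT,
  event_type STRING,
  channel_id STRING,
  event_time TIMESTAMP(3),
  PRIMARY KEY (user_id, event_time) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://oceanbase-host:2881/game_dw',
  'table-name' = 'dwd_game_events',
  'username' = 'etl_user',
  'password' = '******'
);

-- Continuous INSERT: each event is streamed into OceanBase as it arrives.
INSERT INTO dwd_game_events
SELECT user_id, event_type, channel_id, event_time
FROM ods_game_events;
```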
We had trouble importing the historical data because we were not familiar enough with OceanBase Database, but the problems were solved with the help of OceanBase technical experts. For example, when using DataX to move the historical CSV data into OceanBase Database, it is better to specify port 2881 in the configuration file for a direct connection to OceanBase Database. If you use port 2883, your requests are first sent to an OBProxy, which may forward them to another machine; if DataX is not deployed on that machine, the files cannot be found.
As for real-time data writes to OceanBase Database, it is worth mentioning that we also considered Apache Spark, an analytics engine that writes data in micro-batches and therefore cannot match Flink's timeliness. While the interval between two Spark micro-batches could be up to 300 ms, Flink wrote data to OceanBase Database in real-time. That is why we chose Flink SQL to write data to our OceanBase database.
The following figures show the real-time Extract-Transform-Load (ETL) process built with Flink. Figure 4 shows Flink extracting data in real-time from the Kafka data source.
Figure 4 Flink extracting data from the Kafka data source in real-time
Figure 5 shows the process of Flink SQL transforming and processing data in real-time.
Figure 5 Flink SQL transforming and processing data in real-time
Figure 6 shows the process of Flink SQL loading data to the destination OceanBase database in real-time.
We also extracted part of the code and turned it into a script for batch submission. The script allows a Flink source to synchronize data in real-time from sources such as Kafka, MySQL, Oracle, OceanBase, MongoDB, PostgreSQL, and SQL Server, and to process the data before loading it into the destination OceanBase database. In this way, we have built our core real-time data warehouse system on OceanBase Database.
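As an illustration of the multi-source side of that script, the fragment below adds a hypothetical MySQL table as another source through the community mysql-cdc connector; the table, columns, and connection settings are again assumptions rather than our production setup:

```sql
-- Hypothetical MySQL CDC source: account registrations captured from a
-- business MySQL database and synchronized in real-time.
CREATE TABLE mysql_account_registrations (
  user_id       BIGINT,
  channel_id    STRING,
  register_time TIMESTAMP(3),
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-host',
  'port' = '3306',
  'username' = 'cdc_user',
  'password' = '******',
  'database-name' = 'game_biz',
  'table-name' = 'account_registrations'
);

-- The destination table is defined with the JDBC connector in the same way
-- as the sink in the earlier sketch, then loaded continuously.
INSERT INTO ob_account_registrations
SELECT user_id, channel_id, register_time
FROM mysql_account_registrations;
```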
The code shown in Figures 4 to 6 has been deployed in our production environment for two purposes: finding user IDs and looking up data attributes by user ID, such as the specific advertising channels that directed users to our products.
Figure 7 shows the deployment of the OceanBase Database in our current business system.
Figure 7 The architecture of a data warehouse system integrated with OceanBase Database
The stable operation of the OceanBase Database in our business system has solved the four stubborn challenges that we previously mentioned.
1. Real-time data
Flink SQL extracts data from the Kafka instance and loads it into an OceanBase database in real-time. We hope that the Flink-SQL-connector-OceanBase-CDC tool will further improve support for reprocessing historical data without loss or redundancy, and for seamlessly stitching historical and real-time data together without omission.
As we keep upgrading to newer versions of OceanBase Change Data Capture (CDC), we are also looking forward to the flink-connector that the community is developing specifically for OceanBase. With a mature Flink-SQL-connector-OceanBase-CDC for data reads and flink-connector for data writes, we will be able to process the game data in the second- and third-layer data warehouses efficiently, without any loss or redundancy, extending the OceanBase ecosystem into big data and, as a bonus, achieving storage/computing separation in a big data environment.
2. Data consistency
OceanBase Database handles the historical and real-time data in our existing data warehouse system perfectly without any redundancy, omission, or loss.
3. Ad hoc query performance
In the test with index optimization only, OceanBase Database executed a JOIN query that associated 120 million data rows with 3.4 billion data rows and returned the results in 0.23 seconds, a roughly 286-fold improvement over ClickHouse's 66 seconds (66/0.23). After warming up the data, we obtained the results in 0.01 seconds.
4. System maintainability
Instead of maintaining two data warehouses, namely ClickHouse and Hive, we will gradually migrate all core business systems to OceanBase Database, which allows us to run our business on a simplified architecture that supports both TP and AP workloads at the same time.
Next, we will migrate the user system, advertising system, and data analysis system to OceanBase Database to support marketing and channel management and analysis, as shown in Figure 8. Code development and data synchronization are already underway. We hope to host all our business data in OceanBase Database for analysis and storage and handle all our tasks in an all-in-one system for closed-loop data processing.
OceanBase has impressed us. We hope that this article can inspire active communication among folks in this industry. If you are interested in any details of this article or have any questions, feel free to leave a comment.