Flume is a distributed, highly available, and highly reliable system for collecting, aggregating, and transporting log data. Its core function is to collect data from data sources and deliver it to a specified destination. This topic describes how to use a custom OceanBaseSink to write log data collected by Flume to OceanBase Database for real-time collection and storage. The custom sink plugs into Flume's Source-Channel-Sink architecture and writes to OceanBase Database through the OceanBase JDBC driver. Both single-node and multi-node deployments are supported. Typical scenarios include log monitoring and data analysis.
Version compatibility
- OceanBase Database version: ≥ V4.3.5 BP2
Prerequisites
Before you use Flume, make sure that:
- Flume is installed. For more information about how to install Flume, see Install Flume.
- OceanBase Database is deployed and a MySQL user tenant is created. For more information about how to create a user tenant, see Create a tenant.
- A Java environment (JDK 1.8 or later) is installed.
- Maven is installed to build the project.
Procedure
Step 1: Obtain the connection string of OceanBase Database
Contact the OceanBase Database deployment personnel to obtain the connection string, for example:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
Parameter description:
- $host: the IP address for connection. For ODP connection, use the ODP IP address. For direct connection, use the OBServer IP address.
- $port: the connection port. For ODP connection, the default value is 2883. For direct connection, the default value is 2881.
- $database_name: the database name.
  Notice: The user for connecting to the tenant must have the CREATE, INSERT, DROP, and SELECT privileges on the database. For more information about user privileges, see Privilege types in MySQL mode.
- $user_name: the connection account. For ODP connection, the format is User@Tenant#Cluster or Cluster:Tenant:User. For direct connection, the format is User@Tenant.
- $password: the account password.
For more information about the connection string, see Connect to an OceanBase tenant by using OBClient.
Here is an example:
obclient -hxxx.xxx.xxx.xxx -P2881 -utest_user001@mysql001 -p****** -Dtest
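The same connection parameters are later combined into a JDBC URL by the custom sink. As a rough sketch of how the pieces fit together, the helper below builds a URL in the `jdbc:oceanbase://host:port/database` form used later in this topic, and the two account formats described above. The class `ObJdbcUrl` and its method names are hypothetical, and the host `127.0.0.1` is only a placeholder.

```java
import java.util.Objects;

/** Hypothetical helper for illustration; not part of OceanBase or Flume. */
final class ObJdbcUrl {
    private ObJdbcUrl() {}

    /** Builds a URL of the form jdbc:oceanbase://host:port/database. */
    static String jdbcUrl(String host, int port, String database) {
        Objects.requireNonNull(host, "host");
        Objects.requireNonNull(database, "database");
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return "jdbc:oceanbase://" + host + ":" + port + "/" + database;
    }

    /** Account format for direct connection: User@Tenant. */
    static String directUser(String user, String tenant) {
        return user + "@" + tenant;
    }

    /** Account format for ODP connection: User@Tenant#Cluster. */
    static String odpUser(String user, String tenant, String cluster) {
        return user + "@" + tenant + "#" + cluster;
    }

    public static void main(String[] args) {
        // Placeholder values; replace them with your own connection details.
        System.out.println(jdbcUrl("127.0.0.1", 2881, "test"));
        System.out.println(directUser("test_user001", "mysql001"));
    }
}
```

Keeping the port as an `int` lets the helper reject an out-of-range value before any connection attempt is made.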
Step 2: Create a database table
After you log in to OceanBase Database, execute the following statement to create a target table for storing log data:
CREATE TABLE `flume_data` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`message` varchar(255) DEFAULT NULL,
`createTime` datetime NOT NULL,
PRIMARY KEY (`id`)
) WITH COLUMN GROUP(each column);
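Because `message` is declared as `varchar(255)`, a log line longer than 255 characters does not fit the column. Whether such a value is rejected or truncated by the database depends on the tenant's SQL mode, which this topic does not specify. One possible safeguard, shown here only as a sketch, is to shorten the text before it is bound to the insert statement. The class `MessageFit` and the method `fit` are hypothetical names.

```java
/** Hypothetical helper that keeps a log line within the varchar(255) column. */
final class MessageFit {
    /** Matches the length of the `message` column in flume_data. */
    static final int MAX_MESSAGE_CHARS = 255;

    private MessageFit() {}

    /** Returns the line unchanged if it fits, otherwise its first 255 characters. */
    static String fit(String line) {
        if (line == null) {
            return null;
        }
        // Count code points so that a surrogate pair is never split in half.
        if (line.codePointCount(0, line.length()) <= MAX_MESSAGE_CHARS) {
            return line;
        }
        int end = line.offsetByCodePoints(0, MAX_MESSAGE_CHARS);
        return line.substring(0, end);
    }

    public static void main(String[] args) {
        String longLine = new String(new char[300]).replace('\0', 'a');
        System.out.println(fit(longLine).length());
        System.out.println(fit("GET /index.html 200"));
    }
}
```

If truncated log lines are not acceptable, widening the column is the alternative. Either choice should be made before the sink goes into production.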
Step 3: Develop and package a custom OceanBaseSink
This example uses Flume V1.11.0 and OceanBase JDBC V2.4.3. Perform the following steps:
Create a Maven project and add the following dependencies to the pom.xml file:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-configuration</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>com.oceanbase</groupId>
        <artifactId>oceanbase-client</artifactId>
        <version>2.4.3</version>
    </dependency>
</dependencies>
Create a Message class to encapsulate the log content and time information:
package org.example;

public class Message {
    private String message;
    private String createTime;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getCreateTime() {
        return createTime;
    }

    public void setCreateTime(String createTime) {
        this.createTime = createTime;
    }
}
Create an OceanBaseSink class and implement the log writing logic:
package org.example;

import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

public class OceanBaseSink extends AbstractSink implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(OceanBaseSink.class);

    private String hostname;
    private String port;
    private String databaseName;
    private String tableName;
    private String user;
    private String password;
    private PreparedStatement preparedStatement;
    private Connection conn;
    private int batchSize;

    public OceanBaseSink() {
        LOG.info("OceanBaseSink start...");
    }

    // Read the sink parameters from the agent configuration file.
    @Override
    public void configure(Context context) {
        hostname = context.getString("hostname");
        Preconditions.checkNotNull(hostname, "hostname must be set!!");
        port = context.getString("port");
        Preconditions.checkNotNull(port, "port must be set!!");
        databaseName = context.getString("databaseName");
        Preconditions.checkNotNull(databaseName, "databaseName must be set!!");
        tableName = context.getString("tableName");
        Preconditions.checkNotNull(tableName, "tableName must be set!!");
        user = context.getString("user");
        Preconditions.checkNotNull(user, "user must be set!!");
        password = context.getString("password");
        Preconditions.checkNotNull(password, "password must be set!!");
        batchSize = context.getInteger("batchSize", 100);
        // checkArgument validates the condition; checkNotNull would accept any boolean.
        Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive number!!");
    }

    // Open the JDBC connection and prepare the insert statement.
    @Override
    public void start() {
        super.start();
        try {
            Class.forName("com.oceanbase.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            LOG.error("OceanBase JDBC driver not found on the classpath.", e);
        }
        String url = "jdbc:oceanbase://" + hostname + ":" + port + "/" + databaseName;
        try {
            conn = DriverManager.getConnection(url, user, password);
            conn.setAutoCommit(false);
            preparedStatement = conn.prepareStatement(
                    "insert into " + tableName + " (message,createTime) values (?,?)");
        } catch (SQLException e) {
            LOG.error("Failed to connect to OceanBase Database.", e);
            System.exit(1);
        }
    }

    // Release the JDBC resources.
    @Override
    public void stop() {
        super.stop();
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                LOG.warn("Failed to close the prepared statement.", e);
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                LOG.warn("Failed to close the connection.", e);
            }
        }
    }

    // Take up to batchSize events from the channel and write them in one JDBC batch.
    @Override
    public Status process() throws EventDeliveryException {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Event event;
        String content;
        List<Message> messages = Lists.newArrayList();
        transaction.begin();
        try {
            for (int i = 0; i < batchSize; i++) {
                event = channel.take();
                if (event != null) {
                    // Decode with an explicit charset instead of the platform default.
                    content = new String(event.getBody(), StandardCharsets.UTF_8);
                    Message message = new Message();
                    message.setMessage(content);
                    message.setCreateTime(df.format(new Date()));
                    messages.add(message);
                } else {
                    // The channel is empty: ask the sink runner to back off.
                    result = Status.BACKOFF;
                    break;
                }
            }
            if (messages.size() > 0) {
                preparedStatement.clearBatch();
                for (Message temp : messages) {
                    preparedStatement.setString(1, temp.getMessage());
                    preparedStatement.setString(2, temp.getCreateTime());
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
                conn.commit();
            }
            transaction.commit();
        } catch (Exception e) {
            // Discard uncommitted rows so that a later commit does not persist a partial batch.
            try {
                conn.rollback();
            } catch (SQLException e1) {
                LOG.error("Exception in JDBC rollback.", e1);
            }
            try {
                transaction.rollback();
            } catch (Exception e2) {
                LOG.error("Exception in rollback. Rollback might not have been successful.", e2);
            }
            LOG.error("Failed to commit transaction. Transaction rolled back.", e);
            throw new EventDeliveryException("Failed to write events to OceanBase Database.", e);
        } finally {
            transaction.close();
        }
        return result;
    }
}
Run the Maven packaging command to generate the project Jar package:
mvn clean package -DskipTests
Obtain oceanbase-client.jar:
wget https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar
The project Jar package and oceanbase-client.jar must be placed in the lib directory of the Flume installation directory.
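The batching behavior of the sink's process() method can be tried out without a Flume installation. The sketch below mimics only the take loop: it drains up to `batchSize` items from an in-memory queue and reports BACKOFF when the queue runs dry. The queue stands in for the Flume channel, and `BatchDrainSketch`, `Result`, and `drain` are hypothetical names; no Flume classes are used.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-alone model of the take loop in process(). */
final class BatchDrainSketch {
    enum Status { READY, BACKOFF }

    /** The events taken in one pass and the status that would be returned. */
    static final class Result {
        final List<String> batch;
        final Status status;

        Result(List<String> batch, Status status) {
            this.batch = batch;
            this.status = status;
        }
    }

    private BatchDrainSketch() {}

    static Result drain(Queue<String> channel, int batchSize) {
        List<String> batch = new ArrayList<>();
        Status status = Status.READY;
        for (int i = 0; i < batchSize; i++) {
            // poll() returns null on an empty queue, as take() does on an empty channel.
            String event = channel.poll();
            if (event == null) {
                status = Status.BACKOFF;
                break;
            }
            batch.add(event);
        }
        return new Result(batch, status);
    }

    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<>();
        for (int i = 1; i <= 7; i++) {
            channel.add("log line " + i);
        }
        Result first = drain(channel, 5);
        Result second = drain(channel, 5);
        System.out.println(first.batch.size() + " " + first.status);   // 5 READY
        System.out.println(second.batch.size() + " " + second.status); // 2 BACKOFF
    }
}
```

A partially filled batch is still written. BACKOFF only tells the caller that the source of events is temporarily exhausted.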
Step 4: Configure the Flume agent
Depending on the deployment mode, you can configure the Flume agent in single-node mode or multi-node mode.
Single-node mode
Create a configuration file named oceanbase.conf and write the following content to it:
agent1.sources=source1
agent1.channels=channel1
agent1.sinks=oceanbaseSink
# describe/configure source1
agent1.sources.source1.type=exec
agent1.sources.source1.command=tail -F /var/log/tomcat/localhost_access_log.******.txt
agent1.sources.source1.channels=channel1
# describe/configure channel1
agent1.channels.channel1.type=memory
agent1.channels.channel1.capacity=5000
agent1.channels.channel1.transactionCapacity=1000
# describe the sink: OceanBaseSink is the custom sink class packaged in the project Jar
# Make sure that the value matches the package and class name of your sink.
agent1.sinks.oceanbaseSink.type=org.example.OceanBaseSink
agent1.sinks.oceanbaseSink.hostname=$host
agent1.sinks.oceanbaseSink.port=$port
agent1.sinks.oceanbaseSink.databaseName=$database_name
agent1.sinks.oceanbaseSink.tableName=flume_data
agent1.sinks.oceanbaseSink.user=$user_name
agent1.sinks.oceanbaseSink.password=$password
agent1.sinks.oceanbaseSink.channel=channel1
agent1.sinks.oceanbaseSink.batchSize=5
Parameter description:
- $host: the IP address of the OceanBase database.
- $port: the port number of the OceanBase database.
- $database_name: the name of the database.
- $user_name: the username of the database user.
- $password: the password of the database user.
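One detail is worth checking in any agent file. Assuming the file is parsed as a standard Java properties file, a `#` starts a comment only at the beginning of a line, so a note appended after a value becomes part of that value. The sketch below shows the difference with `java.util.Properties`. The class `InlineCommentCheck` is a hypothetical name, and the assumption about how Flume parses the file should be confirmed for your Flume version.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical check of how a trailing "#..." is read from a properties file. */
final class InlineCommentCheck {
    private static final String KEY = "agent1.sinks.oceanbaseSink.type";

    private InlineCommentCheck() {}

    /** Parses the given text as a properties file and returns the sink type value. */
    static String sinkType(String conf) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(conf));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(KEY);
    }

    public static void main(String[] args) {
        String inline = KEY + "=org.example.OceanBaseSink #note\n";
        String ownLine = "#note\n" + KEY + "=org.example.OceanBaseSink\n";
        // The brackets make trailing characters visible.
        System.out.println("[" + sinkType(inline) + "]");  // [org.example.OceanBaseSink #note]
        System.out.println("[" + sinkType(ownLine) + "]"); // [org.example.OceanBaseSink]
    }
}
```

With the first form, the class name passed to the agent would contain the note text, so comments are best kept on their own lines.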
Multi-node mode - Node A: Collector and data ingestion node
Create a configuration file named oceanbase.conf and write the following content to it:
agent2.sources = avroSource
agent2.channels = channel1
agent2.sinks = oceanbaseSink
# Source (Avro, receives data from Agent1)
agent2.sources.avroSource.type = avro
agent2.sources.avroSource.bind = 0.0.0.0
agent2.sources.avroSource.port = 4141
agent2.sources.avroSource.channels = channel1
# Channel
agent2.channels.channel1.type = memory
agent2.channels.channel1.capacity = 5000
agent2.channels.channel1.transactionCapacity = 1000
# Sink (OceanBaseSink, writes data to OceanBase)
# Make sure that the value matches the package and class name of your sink.
agent2.sinks.oceanbaseSink.type=org.example.OceanBaseSink
agent2.sinks.oceanbaseSink.hostname=$host
agent2.sinks.oceanbaseSink.port=$port
agent2.sinks.oceanbaseSink.databaseName=$database_name
agent2.sinks.oceanbaseSink.tableName=flume_data
agent2.sinks.oceanbaseSink.user=$user_name
agent2.sinks.oceanbaseSink.password=$password
agent2.sinks.oceanbaseSink.channel=channel1
agent2.sinks.oceanbaseSink.batchSize=5
Multi-node mode - Node B: Log collection node
Create a configuration file named oceanbase2.conf and write the following content to it:
agent1.sources = source1
agent1.channels = channel1
agent1.sinks = avroSink
# Source
agent1.sources.source1.type = exec
agent1.sources.source1.command=tail -F /var/log/tomcat/localhost_access_log.******.txt
agent1.sources.source1.channels = channel1
# Channel
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 5000
agent1.channels.channel1.transactionCapacity = 1000
# Sink (Avro, sends data to the collector agent on Node A)
agent1.sinks.avroSink.type = avro
agent1.sinks.avroSink.hostname = $collector_host
agent1.sinks.avroSink.port = 4141
agent1.sinks.avroSink.channel = channel1
Parameter description:
- $host: the IP address of the OceanBase database.
- $port: the port number of the OceanBase database.
- $database_name: the name of the database.
- $user_name: the username of the database user.
- $password: the password of the database user.
- $collector_host: the IP address of the collector node.
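The numeric settings in these files depend on each other. A memory channel's transactionCapacity cannot exceed its capacity, and a sink that takes more events per transaction than transactionCapacity allows will fail at run time. The sketch below checks the three values used in this topic (5000, 1000, and 5). `CapacityCheck` and `problems` are hypothetical names, and the check is an illustration rather than Flume's own validation code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sanity check for channel and sink sizing. */
final class CapacityCheck {
    private CapacityCheck() {}

    /** Returns a list of human-readable problems; an empty list means the values are consistent. */
    static List<String> problems(int capacity, int transactionCapacity, int batchSize) {
        List<String> found = new ArrayList<>();
        if (capacity <= 0 || transactionCapacity <= 0 || batchSize <= 0) {
            found.add("all values must be positive");
            return found;
        }
        if (transactionCapacity > capacity) {
            found.add("transactionCapacity " + transactionCapacity
                    + " exceeds capacity " + capacity);
        }
        if (batchSize > transactionCapacity) {
            found.add("batchSize " + batchSize
                    + " exceeds transactionCapacity " + transactionCapacity);
        }
        return found;
    }

    public static void main(String[] args) {
        // Values from the configuration files in this topic.
        System.out.println(problems(5000, 1000, 5));
        // A batch larger than one channel transaction can hold.
        System.out.println(problems(5000, 1000, 2000));
    }
}
```

Running such a check before you raise batchSize helps keep the sink within what one channel transaction can deliver.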
Step 5: Start the Flume Agent
Start the Flume Agent in the deployment mode that you have selected.
Install Flume and Tomcat
# Deploy the Tomcat container (to generate logs)
sudo docker run -d \
--name=tomcat \
-p 8080:8080 \
-v /var/log/tomcat:/usr/local/tomcat/logs/ \
tomcat
# Download the Flume package
cd $HOME
wget https://archive.apache.org/dist/flume/1.11.0/apache-flume-1.11.0-bin.tar.gz
# Extract and install Flume
tar -zxvf apache-flume-1.11.0-bin.tar.gz -C $HOME
mv $HOME/apache-flume-1.11.0-bin $HOME/flume
# Configure environment variables
echo 'export JAVA_HOME=JAVA_HOME_PATH
export FLUME_HOME=$HOME/flume
export FLUME_CONF_DIR=$FLUME_HOME/conf
export PATH=$PATH:$FLUME_HOME/bin' | sudo tee -a /etc/profile
source /etc/profile
The preceding installation steps are for reference only. You need to adjust them based on your environment. For more information, see Install Flume in the official Apache Flume documentation.
Start the Agent on a Single Node
Run the following command to start the Agent:
./flume/bin/flume-ng agent --conf conf --conf-file ./flume/conf/oceanbase.conf --name agent1 -Dflume.root.logger=INFO,console
Start the Agent on Multiple Nodes
Start the collector and data ingestion agent (agent2) on Node A:
./flume/bin/flume-ng agent --conf conf --conf-file ./flume/conf/oceanbase.conf --name agent2 -Dflume.root.logger=INFO,console
Start the log collection agent (agent1) on Node B:
./flume/bin/flume-ng agent --conf conf --conf-file ./flume/conf/oceanbase2.conf --name agent1 -Dflume.root.logger=INFO,console
The value of --name must match the agent name used as the prefix in the corresponding configuration file.
If the Agent is started successfully, the console displays the initialization and running logs of the Agent.
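In multi-node mode, the log collection node can deliver events only if it can open a TCP connection to the Avro port of the collector node (4141 in the configuration above). As a quick check that needs no Flume code, the sketch below attempts a plain socket connection with a timeout. `PortProbe` and `canConnect` are hypothetical names, and the host passed in `main` is a placeholder.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical reachability probe for the collector's Avro port. */
final class PortProbe {
    private PortProbe() {}

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolved host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder host; pass the collector address as the first argument.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        System.out.println("collector reachable: " + canConnect(host, 4141, 2000));
    }
}
```

A successful connection shows only that something is listening on the port. It does not confirm that the listener is the Avro source of the collector agent.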
Verify the results
After you start the Flume agent, you can verify whether the log data has been successfully written to OceanBase Database. Perform the following steps:
- Check the Flume console output: After you start the Flume agent, the console displays normal startup logs and data processing logs.
- Query OceanBase Database: Log in to OceanBase Database and check whether data has been inserted into the flume_data table:
  SELECT * FROM flume_data ORDER BY createTime DESC LIMIT 10;
- Verify data integrity: Check whether the collected log content, including the message content and timestamp, is stored in the database.
- Monitor the Flume status: Use the Flume monitoring interface or logs to view the status of data collection and writing.
FAQs
Q1: What should I do if the following error is returned when I start Flume: "ClassNotFoundException: com.oceanbase.jdbc.Driver"?
Cause: The OceanBase JDBC driver is not properly placed in the lib directory of Flume.
Solution:
- Download the OceanBase JDBC driver (version 2.4.3 or a compatible version).
- Copy oceanbase-client.jar to the lib directory of Flume.
- Restart the Flume agent.
Q2: Why does the data fail to be written to OceanBase Database?
Cause: The database connection parameters are configured incorrectly or the user does not have sufficient privileges.
Solution:
- Check whether the connection parameters (hostname, port, databaseName, user, and password) in the configuration file are correctly configured.
- Confirm that the database user has the INSERT privilege on the flume_data table.
- Check whether the network connection is normal and ensure that the Flume server can access OceanBase Database.
Q3: Why is the data collection performance of Flume slow?
Cause: The batch size is not properly configured or the network latency is high.
Solution:
- Adjust the batchSize parameter and increase the batch size as needed based on the data volume.
- Check the network status and optimize the network configuration.
- Consider using a multi-node deployment mode to distribute the collection pressure.
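The effect of batchSize can be estimated with simple arithmetic: the sink commits once per batch, so the number of database commits for a given number of events is the event count divided by the batch size, rounded up. The sketch below compares the example value of 5 with the sink's default of 100. `BatchMath` and `commits` are hypothetical names, and the figure ignores partially filled batches taken when the channel runs empty.

```java
/** Hypothetical estimate of commit counts for a given batch size. */
final class BatchMath {
    private BatchMath() {}

    /** Returns ceil(events / batchSize), the minimum number of commits needed. */
    static long commits(long events, int batchSize) {
        if (events < 0 || batchSize <= 0) {
            throw new IllegalArgumentException("events must be >= 0 and batchSize > 0");
        }
        return (events + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        System.out.println(commits(1000, 5));   // 200
        System.out.println(commits(1000, 100)); // 10
    }
}
```

For 1,000 events, raising batchSize from 5 to 100 reduces the minimum number of commits from 200 to 10, which is why a very small batch size tends to limit throughput.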
