Time-based batch processing architecture using Apache Spark and ClickHouse

Saravanan A R
WhatfixEngineeringBlog
5 min read · Mar 31, 2022


In the previous blog, we talked about Real-time processing architecture using Apache Spark, ClickHouse, and Apache Kafka.

This blog will talk about “Time-based batch processing architecture”.

Why do we need a batch processing architecture over the Real-time processing architecture?

  1. Business logic decides what to choose. If the business doesn’t need the data processed in real time, but only needs it processed periodically, and processing it in real time adds no business value, it’s better to choose the time-based batch processing architecture.
  2. Also, in the real-time processing architecture, a Spark job runs all the time, listening to a message streaming component like Kafka and processing newly generated data as it arrives. In the time-based processing architecture, the Spark job doesn’t run all the time; it is initiated only when needed. So we use the computing resources only when they are needed.

For example, we want to generate a feature adoption rate report every week.
Say, in our product, we released a feature called Autosuggestion: whenever the user types, the product provides a list of suggestions. We have to generate an adoption report for this feature every week.

Consider that the product has 5 million users and we have billions of analytics events for those users. With our architecture, we generate this report every week.

The report has three columns:
1. User ID
2. How many times the user used the “Autosuggest” feature in the last week.
3. The average number of times the user typed before getting the right suggestion.

The report will look like this:
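(An illustrative sample with made-up values, using the column names introduced later in this post.)

user_id | no_of_times_used | avg_typed_count
u_101   | 12               | 3.5
u_102   | 4                | 6.0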

To build a time-based data processing architecture to generate adoption reports, we need the following components:

  1. A cron to trigger the Apache Spark job at a configured time.
  2. Apache Spark — ClickHouse connector to import feature analytics data from ClickHouse to Apache Spark.
  3. Apache Spark — S3 connector to store the report in the AWS S3 bucket.
The time-based batch processing architecture

Unlike the real-time processing architecture, in this architecture, Apache Spark won’t listen to message streaming components like Kafka topics all the time.
Instead, when the cron hits the configured time (A), it’ll import the data directly from ClickHouse using the ClickHouse Connector (B).
Once the data is imported, Apache Spark will process the data and generate the report. The generated report will be stored in the AWS S3 bucket (C).
Let’s see how to implement each component in detail.

A cron to trigger the Apache Spark job:

With the help of cron expressions, we can specify when the Spark job has to be initiated.
Our goal is to generate the adoption report every week. For example, to trigger the job every Sunday at 12 AM, the cron expression is 0 0 * * 0
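As a sketch, assuming the report logic lives in a hypothetical generate_adoption_report.py script submitted with spark-submit (all paths here are illustrative), the crontab entry could look like:

# Every Sunday at 12 AM, submit the Spark job that builds the adoption report
0 0 * * 0 /opt/spark/bin/spark-submit /opt/jobs/generate_adoption_report.py >> /var/log/adoption_report.log 2>&1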

Apache Spark — ClickHouse connector:
Once the cron triggers the Apache Spark job, with the help of the ClickHouse connector, a Spark job will import the data from ClickHouse.
To set up this connector, copy the ClickHouse-Spark JDBC jar into the $SPARK_HOME/jars/ folder.

wget -P $SPARK_HOME/jars/ https://repo1.maven.org/maven2/com/github/housepower/clickhouse-native-jdbc/2.6.0/clickhouse-native-jdbc-2.6.0.jar

Tip: By default, an Apache Spark job loads jars from the $SPARK_HOME/jars/ folder. It’s not mandatory to copy the jars into this folder; instead, you can load these jars while submitting a Spark job using the --jars or --conf argument. Refer here to know more about this.
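For instance (jar and script paths are illustrative), either of these works:

spark-submit --jars /path/to/clickhouse-native-jdbc-2.6.0.jar generate_adoption_report.py
spark-submit --conf spark.jars=/path/to/clickhouse-native-jdbc-2.6.0.jar generate_adoption_report.py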

Now, we have copied the required jar into the Apache Spark classpath.
Next, to query data from ClickHouse, we need:
a. The ClickHouse IP and port.
b. The database name and table name.
c. The ClickHouse username and password.
d. The ClickHouse query.
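The snippets below also assume a SparkSession named spark already exists; a minimal sketch to create one (the app name is illustrative):

from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession used by the snippets below
spark = SparkSession.builder \
    .appName("autosuggest-adoption-report") \
    .getOrCreate()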

The code snippet to query ClickHouse data is,

clickhouse_url="jdbc:clickhouse://<ip><port>/<db_name>"properties = {
"password": <clickhouse_password>,
"user": <clickhouse_user>,
"driver":"com.github.housepower.jdbc.ClickHouseDriver"
}
clickhouse_query="select user_id, typed_count from autosuggest_analytics where timestamp <= currentTime and timestamp >= currentTime-sevenDays”autosuggest_events = spark.read.jdbc(url=clickhouse_url, table=clickhouse_query, properties=properties)

With the above code snippet, we have imported the last seven days’ analytics data specific to the AutoSuggest feature.
The “autosuggest_events” variable holds the pointer to the imported data distributed across the worker nodes.

Let’s display the first nine rows of the autosuggest_events DataFrame.
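For example, a quick way to preview them:

# Show the first nine rows of the imported data
autosuggest_events.show(9)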

Now, we want to generate two columns. no_of_times_used and avg_typed_count.
The column no_of_times_used represents how many times the user used the Autosuggest feature in the last week.
The column avg_typed_count represents the average number of times the user typed before clicking the right suggestion.

With the below code snippet, we can generate these two columns

from pyspark.sql.functions import count, avg

adoption_report = autosuggest_events.groupBy("user_id") \
    .agg(
        count("*").alias("no_of_times_used"),
        avg("typed_count").alias("avg_typed_count")
    )

Now, the adoption_report will have the needed report. The next step is to save the data in the S3 bucket.

Apache Spark — S3 connector:

To store the result in the S3 bucket, we need to add the following S3 connector jars:

1. aws-java-sdk-core-1.11.762.jar
2. aws-java-sdk-1.11.762.jar
3. aws-java-sdk-dynamodb-1.11.762.jar
4. aws-java-sdk-s3-1.11.762.jar
5. hadoop-aws-3.1.2.jar

Copy these jars into $SPARK_HOME/jars/ folder.
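For example (jar versions taken from the list above, URLs assumed to follow the usual Maven Central layout):

# Download the S3 connector jars into the Spark classpath
wget -P $SPARK_HOME/jars/ https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.2/hadoop-aws-3.1.2.jar
# ...repeat for the aws-java-sdk-* jars listed above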

Once we have copied these jars, we need to set the access key and secret key in the code. Refer to the code below for how to set them.

# Configure S3 credentials on the underlying Hadoop configuration
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", "xxxxxxx")
hadoop_conf.set("fs.s3a.secret.key", "xxxxxxxxxxxxxxxxxxx")

Now, we are ready with the setup. Next, we need to persist the adoption report to the S3 bucket.

Writing to the S3 bucket in Apache Spark is very simple and straightforward.

adoption_report \
    .write \
    .mode("overwrite") \
    .csv("s3a://test.bucket/spark_output")

With the above code, we can write the adoption_report DataFrame to the S3 bucket in the form of CSV files.

The mode("overwrite") option will overwrite any data already persisted at the given S3 bucket URI.
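As a side note, Spark writes one CSV part file per partition; if a single CSV file with a header row is preferred, one variation (a sketch) is:

adoption_report \
    .coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("s3a://test.bucket/spark_output")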

Conclusion:
In this blog, we’ve discussed the time-based batch processing architecture, where the load given to Apache Spark is not constant.
The main advantage of this architecture is that the Apache Spark job is triggered only when it’s needed. Once processing completes, the Spark job moves to the Finished state, so we are not using the computing resources all the time, unlike the real-time processing architecture.
The disadvantage of this architecture is that the load depends purely on how much data is generated in the specified time interval.
For example, if 1 million analytics events were generated in week ‘A’, Apache Spark will process those 1 million events.
The next week, we may have 10 million analytics events to process. Hence, identifying the resource limits (CPU, memory) for Apache Spark is tricky.
