Real-Time data processing architecture using Apache Spark, Apache Kafka, and ClickHouse

Saravanan A R
Whatfix Engineering Blog
8 min read · Jan 3, 2022


This blog walks through a real-time data processing architecture using Apache Spark, Apache Kafka, and ClickHouse.

What is Real-Time data processing architecture?

A real-time data processing architecture processes data in real time or near real time: it acts on data as soon as it is generated and provides insights immediately.

Let’s say we are collecting events (data) on how users interact with our web application. To keep it simple, say we collect events when a user clicks on content, hovers over content, and so on. Using these collected events, we want to provide a customized experience for each individual user. To do that, we need to process the events as soon as they are generated; any delay in processing degrades the user experience. Ideally, whenever a user interacts with our web application, we process the collected data and deliver a customized experience without any noticeable delay.

If you’re not familiar with any of these technologies (Apache Spark, Apache Kafka, ClickHouse, or Amazon S3), the short introductions below should bring you up to speed; their official tutorials are also worth a look.

Apache Spark:
Apache Spark is a distributed computing framework. In layman’s terms: say we have a very large dataset and we want to run some operation on it (for example, count the occurrences of a particular word), and we have a cluster of machines available. We could split the dataset into subsets and distribute those subsets across the machines so that each machine works on its own share, which uses the available machines efficiently. But maintaining subsets across machines is complex. What if there were a framework that handled splitting the dataset across machines, along with consistency and fault tolerance? Apache Spark is exactly such a framework.
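As a concrete illustration of that idea, here is a minimal PySpark word-count sketch. The file path and app name are placeholders, not part of the original post.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, split

spark = SparkSession.builder.appName("word-count-sketch").getOrCreate()

# Read a (hypothetical) large text file; Spark splits it into partitions
# and distributes the work across the cluster automatically.
lines = spark.read.text("path/to/large-text-file.txt")

word_counts = (
    lines
    .select(explode(split(col("value"), r"\s+")).alias("word"))
    .groupBy("word")
    .count()
)

word_counts.show()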

Apache Kafka:
Apache Kafka is a distributed event streaming platform. It provides a high-throughput, low-latency way to stream data between systems.

ClickHouse:
ClickHouse is an open-source column-oriented DBMS. It is well suited to generating analytics reports using SQL-based queries. ClickHouse also ships with a Kafka table engine, which lets us read data from and write data to Kafka topics.

Amazon S3:
Amazon S3 is an object storage service that is part of the AWS ecosystem. It offers highly available and scalable storage.

Prerequisites:
To implement this architecture, we have to install Apache Spark, Apache Kafka, and ClickHouse.

Goal:
In this blog, we use this architecture to demonstrate a simple analysis: what percentage of time each API call takes to resolve within a given time range.

Consider that we’re collecting the following data.

  1. API_ID — Uniquely identifies an API endpoint exposed by a service.
  2. Time_taken — how long the server took to resolve the API call.

Various services store this data in ClickHouse. Our goal is to find what percentage of time each API call accounts for within a time period. Using this insight, we can analyze how much load each API puts on the servers in a given time range.

The real-time data processing architecture:

(Architecture diagram: applications insert records into ClickHouse; the ClickHouse Kafka engine pushes them to Kafka (step A); Apache Spark streams the records from Kafka (step B); Spark writes the processed results to Amazon S3 (step C).)

Architecture Description:
In this architecture, ClickHouse is the single source of truth; other services and components store their data in ClickHouse.

1. Whenever the applications insert a record into a ClickHouse table, the inserted record is pushed into Kafka by the ClickHouse Kafka engine. (Refer to step A in the architecture diagram.)

In our demonstration example, the applications/services insert <api_id, time_taken> into a ClickHouse table, and the record is pushed to Kafka as a stream.

2. Apache Spark streams real-time data from Kafka using the Kafka-Spark connector. Using this stream of data, Spark can do real-time processing. (Refer to step B.)

In our example, the Apache Spark job receives a stream of <api_id, time_taken> from Kafka.

3. Once the processing completes, Spark stores the result in an S3 bucket. (Refer to step C.)

In our example, the Apache Spark job calculates the percentage of time each API took in a time period, and the calculated percentage is stored as an additional field (percentage_time) in our dataset, i.e. <api_id, time_taken, percentage_time>.

Implementation of this architecture involves five steps:

1. Kafka-Apache Spark Connector setup.
2. Apache Spark-Amazon S3 connector setup.
3. Configure ClickHouse — Kafka.
4. Listen to Kafka topic from Apache Spark.
5. Store the result into Amazon S3.

1. Kafka-Apache Spark connector setup:
To listen to Kafka topics, we need the following jars. These jars will provide an interface for Apache Spark to listen to Kafka topics.

Also, Apache Spark has the ability to listen to multiple Kafka topics.

  1. spark-sql-kafka-0-10_2.12-3.1.2.jar
  2. spark-token-provider-kafka-0-10_2.12-3.1.2.jar
  3. kafka-clients-3.0.0.jar
  4. commons-pool2-2.11.1.jar

Copy these jars into $SPARK_HOME/jars/ folder.

wget https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.1.2/spark-sql-kafka-0-10_2.12-3.1.2.jar -P $SPARK_HOME/jars/
wget https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.1.2/spark-token-provider-kafka-0-10_2.12-3.1.2.jar -P $SPARK_HOME/jars/
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.0.0/kafka-clients-3.0.0.jar -P $SPARK_HOME/jars/
wget https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar -P $SPARK_HOME/jars/

Tip: By default, an Apache Spark job loads jars from the $SPARK_HOME/jars/ folder. It's not mandatory to copy the jars into this folder; instead, you can load them while submitting a Spark job using the --conf (or --jars) argument, as in the sketch below. Refer to the Spark configuration documentation to learn more.
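As a rough sketch of that alternative, the Kafka connector can also be pulled in when the session starts via the spark.jars.packages configuration. The app name here is illustrative, and the package version should match your Spark and Scala builds.

from pyspark.sql import SparkSession

# The Maven coordinates are resolved and downloaded when the session starts.
spark = (
    SparkSession.builder
    .appName("kafka-streaming-example")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2")
    .getOrCreate()
)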

2. Apache Spark-Amazon S3 Bucket Connector:
To write data into the Amazon S3 bucket, we need the following jars. These jars provide an interface to write data into the Amazon S3 bucket from Apache Spark.

  1. aws-java-sdk-core-1.11.762.jar
  2. aws-java-sdk-1.11.762.jar
  3. aws-java-sdk-dynamodb-1.11.762.jar
  4. aws-java-sdk-s3-1.11.762.jar
  5. hadoop-aws-3.1.2.jar

Copy these jars into $SPARK_HOME/jars/ folder.

wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.762/aws-java-sdk-core-1.11.762.jar -P $SPARK_HOME/jars/
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.11.762/aws-java-sdk-1.11.762.jar -P $SPARK_HOME/jars/
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-dynamodb/1.11.762/aws-java-sdk-dynamodb-1.11.762.jar -P $SPARK_HOME/jars/
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.762/aws-java-sdk-s3-1.11.762.jar -P $SPARK_HOME/jars/
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.2/hadoop-aws-3.1.2.jar -P $SPARK_HOME/jars/

Tip: As with the Kafka jars above, you can skip copying these jars into $SPARK_HOME/jars/ and instead load them at submit time using the --conf (or --jars) argument.

3. Configure ClickHouse — Kafka:
With the help of the ClickHouse Kafka engine, we can push records from a ClickHouse table to Kafka, or pull them the other way.

In our demonstration, the requirement is: whenever we insert a record into the API table with service_id = 1, the inserted record has to be pushed into Kafka.

To do this, we need four things:
a. A ClickHouse source table — Create a source table API. Whenever we insert a record into this table, the data will be pushed to Kafka by the ClickHouse Kafka engine.

-- To create the API table
CREATE TABLE API (
    api_id Int32 Codec(DoubleDelta, LZ4),
    time DateTime Codec(DoubleDelta, LZ4),
    time_taken Int32 Codec(DoubleDelta, LZ4),
    service_id Int32
) ENGINE = MergeTree
PARTITION BY toYYYYMM(time)
ORDER BY (api_id, time);

b. Kafka topic (kafka_topic_1) — Create a target Kafka topic where the Kafka engine pushes ClickHouse records that satisfy the condition service_id = 1.

# To create the Kafka topic
kafka-topics \
  --bootstrap-server kafka:9092 \
  --topic kafka_topic_1 \
  --create \
  --partitions 6 \
  --replication-factor 2

c. Create a ClickHouse table (kafka_queue) using the Kafka table engine, pointing at the Kafka topic we just created.

-- To create the kafka_queue table using the Kafka engine
CREATE TABLE kafka_queue (
    api_id Int32,
    time_taken Int32
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'kafka_topic_1',
         kafka_format = 'CSV',
         kafka_max_block_size = 1048576;

d. Create a materialized view (kafka_queue_mv) — it transfers rows with service_id = 1 from the API table to the kafka_queue table.

-- To create the materialized view in ClickHouse
CREATE MATERIALIZED VIEW kafka_queue_mv TO kafka_queue AS
SELECT api_id, time_taken FROM API
WHERE service_id = 1;

Finally, we need to run a Kafka consumer to test.

kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic kafka_topic_1

Now, whenever you insert a record into the API table with service_id = 1, the record will be pushed to kafka_topic_1. You can verify this with a test insert like the one sketched below.
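For example, a quick way to trigger the pipeline from Python. This is a minimal sketch that assumes the clickhouse-driver package and a ClickHouse server reachable at localhost, neither of which is prescribed by the original setup.

from datetime import datetime
from clickhouse_driver import Client

# Connection details are placeholders; point them at your ClickHouse instance.
client = Client(host="localhost")

# Insert a test record with service_id = 1 so the materialized view
# forwards <api_id, time_taken> to kafka_topic_1.
client.execute(
    "INSERT INTO API (api_id, time, time_taken, service_id) VALUES",
    [(101, datetime.now(), 120, 1)],
)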

4. Listen to Kafka topic from Apache Spark:
To read the Kafka stream from the Apache Spark job, we need the following information.
a. Kafka server domain/IP address.
b. The port on which the Kafka server runs.
c. Kafka topic[s] to listen to.

Apache Spark has the ability to listen to multiple Kafka topics from multiple Kafka instances. For demonstration, the code snippet below listens to the topic kafka_topic_1 on the kafka:9092 instance.

dataframe = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "kafka_topic_1")
    .load()
)
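The Kafka source exposes each record's payload as a binary value column, so before computing anything we need to decode the CSV rows coming from ClickHouse into api_id and time_taken columns. A minimal sketch, assuming the CSV format configured in the kafka_queue table above:

from pyspark.sql.functions import col, split

# value arrives as bytes; cast it to a string and split the "api_id,time_taken"
# CSV pair into typed columns for the steps below.
dataframe = (
    dataframe
    .selectExpr("CAST(value AS STRING) AS csv")
    .select(
        split(col("csv"), ",").getItem(0).cast("int").alias("api_id"),
        split(col("csv"), ",").getItem(1).cast("int").alias("time_taken"),
    )
)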

Once the stream is loaded and parsed, the next step is to calculate the percentage of time taken by each API.

from pyspark.sql.functions import percent_rank
from pyspark.sql.window import Window

# windowSpec is not defined in the original post; this ordering is an assumed example.
windowSpec = Window.orderBy("time_taken")
dataframe = dataframe.withColumn("percentage_time", percent_rank().over(windowSpec))

The next step is to write the stream to the Amazon S3 bucket. (Note: ranking window functions such as percent_rank aren't supported directly on a streaming DataFrame, so in a streaming job this calculation is typically applied per micro-batch using foreachBatch; a sketch is included at the end of step 5.)

Note: Apache Spark can write the stream to many data stores, including HDFS, Cassandra, and others.

5. Store the result into Amazon S3 Bucket:

To store data from Apache Spark in an Amazon S3 bucket, we need an AWS access key ID and secret access key. Using these credentials, Apache Spark authenticates with AWS.

We have two options for setting the access key ID and secret key:

5-a) Store the access key ID and secret key as environment variables: not the preferred way, because every Spark job we submit will have access to the credentials, since they are set as environment variables.

export AWS_SECRET_ACCESS_KEY=XXXXX
export AWS_ACCESS_KEY_ID=XXXXX

5-b) Set the access key ID and secret key via the SparkContext: the other option is to set the credentials on the SparkContext's Hadoop configuration object.

spark.sparkContext._jsc.hadoopConfiguration() \
    .set("fs.s3a.access.key", "xxxxxxx")
spark.sparkContext._jsc.hadoopConfiguration() \
    .set("fs.s3a.secret.key", "xxxxxxxxxxxxxxxxxxx")

In the above code snippet, we are setting fs.s3a.access.key and fs.s3a.secret.key params.

Tip: For the s3:// scheme, use the fs.s3.access.key and fs.s3.secret.key params instead.

Once we set the access key and secret key, the next step is to write the code which actually stores the data into the S3 bucket. Writing a Spark data frame to an S3 bucket is similar to writing to HDFS or a local file system.

The code snippet for this,

dataframe \
    .write \
    .mode("overwrite") \
    .parquet("s3a://test.bucket/spark_output")

In the above code snippet, we write the data frame as Parquet files into the S3 bucket (s3a://test.bucket/spark_output). For a continuously running streaming job, the same write can be wrapped in foreachBatch, as sketched below.
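For completeness, here is a self-contained sketch (not from the original post) of how the same percentage calculation and S3 write can run continuously per micro-batch with foreachBatch. The checkpoint location, the window ordering, and the assumption that spark is an existing SparkSession are all illustrative.

from pyspark.sql.functions import col, split, percent_rank
from pyspark.sql.window import Window

def process_batch(batch_df, batch_id):
    # Each micro-batch is a plain DataFrame, so ranking window functions
    # and batch-style writes both work here.
    windowSpec = Window.orderBy("time_taken")  # assumed ordering
    (batch_df
        .withColumn("percentage_time", percent_rank().over(windowSpec))
        .write
        .mode("append")
        .parquet("s3a://test.bucket/spark_output"))

# The Kafka stream from step 4, parsed into api_id / time_taken columns,
# but without the window transformation applied at the streaming level.
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "kafka_topic_1")
    .load()
    .selectExpr("CAST(value AS STRING) AS csv")
    .select(
        split(col("csv"), ",").getItem(0).cast("int").alias("api_id"),
        split(col("csv"), ",").getItem(1).cast("int").alias("time_taken"),
    )
)

query = (
    raw_stream.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "s3a://test.bucket/spark_checkpoints")  # assumed path
    .start()
)
query.awaitTermination()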

Conclusion:

In this blog, we've discussed a real-time data processing architecture. At Whatfix, we are reinventing ourselves with cool new technologies to give our customers an amazing experience. We use Apache Spark as our data processing framework, ClickHouse as our analytics platform, and Kafka as our event streaming service.

In this real-time processing architecture, the load on the Spark job stays roughly constant. If we cap the offsets at 1K records in the Spark-Kafka connector, the Spark job reads at most 1K records at a time from Kafka, so at any given point the Spark job processes only about 1K records (see the sketch below). And the architecture delivers real-time/near-real-time results.
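The post doesn't name the exact setting for that 1K cap; one common way to limit the per-batch load in the Spark Kafka source is the maxOffsetsPerTrigger option, sketched here.

# Limit each micro-batch to roughly 1,000 Kafka offsets (records).
dataframe = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "kafka_topic_1")
    .option("maxOffsetsPerTrigger", 1000)
    .load()
)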
