Modernizing analytics data ingestion pipeline from legacy engine to distributed processing engine

Analytics is a key part of all Freshworks products. It gathers events from these products and brings them together in one central analytics platform. The data is then transformed based on preset rules, ensuring it is organized by product and customer. Once the data is ready, we can use it for visualization and other tasks. This whole process runs continuously, and we promise that any event will be available in our analytics system within 15 minutes of when it occurs.

To make this happen, we have an always-on pipeline. This pipeline has different jobs: it pulls data, transforms it, groups it together, and loads it into a target data warehouse at regular intervals.

As Freshworks’ products keep growing, we handle more and more data in Analytics. We process millions of messages per minute.

  • At the busiest times, we get about 800,000 messages per minute. After we transform and multiply them, they become about 1.3 million messages per minute.
  • When we temporarily move data from our products to Analytics, like for special projects, it can go up to 3 million messages per minute. This needs extra infrastructure to handle.

Managing a large volume of data in a short time is demanding. It poses challenges in terms of expenses, scale, and speed. This article narrates the journey of transitioning our analytics data platform from traditional high-scale data handling methods to a modern distributed and auto-scalable system.

Legacy system

The legacy system used traditional Python consumers and an API layer to ingest data into a time series database in real time. It was built about eight years ago and went through many scale and performance enhancements while preserving the core API. The system handled the scale efficiently with horizontal scaling (adding more infrastructure) until its deprecation, but the downside was the maintenance effort and cost required to achieve that scale.

Here’s how the legacy system operated:

  • A group of horizontally distributed Python consumers
  • Followed by a separate system built with Ruby on Rails that received data through an API in real time and batched it into CSVs
  • A scheduler written in Apache Airflow handled the loading of CSV files into the target warehouse

Here are the key functions of the system:

  • Continuously receive messages from the Central Kafka
  • Identify and remove duplicate messages while transforming them in real time
  • Validate and enforce a specific schema and structure for the transformed messages in real time
  • Generate CSV files and upload them to designated S3 buckets at regular intervals
  • Load the CSV files into a central storage location (target warehouse) at scheduled intervals

Architecture of legacy pipeline

Components and their roles

Central Kafka system

The centralized Freshworks Apache Kafka service is called Central. All the products push their events to Central, from which the downstream systems consume (Analytics is one of these consumers).

Transformation and push consumers

This is the entry point of the analytics ingestion pipeline: a set of Python consumers that read messages from Kafka (Central) and apply the transformation rule configured for each payload type. The transformed messages are pushed to a success topic in Kafka, which holds all the messages emitted by these transformation consumers (referred to as C1). A transformed message carries the necessary metadata, such as the target schema and table name, along with the data to be inserted into it.

Push consumers read the messages from the success topic and push (via API call) each message to the reports platform endpoint. The transformed message is self-sufficient, so the API layer can understand it and continue the ingestion. A minimal sketch of this consume-transform-produce loop is shown below.
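The sketch below assumes the kafka-python library; the topic names, rule lookup, and payload fields are illustrative rather than the actual internals.

# Minimal sketch of a C1-style transformation consumer (illustrative, using kafka-python).
import json
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "central-events",                            # hypothetical Central topic name
    bootstrap_servers="central-kafka:9092",
    group_id="analytics-c1-consumers",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="central-kafka:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def transform(event):
    # Apply the transformation rule for this payload type (illustrative fields).
    return {
        "target_product": event["product"],      # e.g., Freshdesk, Freshservice, CRM
        "target_table": event["entity"],         # e.g., tickets
        "data": event["payload"],
    }

for message in consumer:
    transformed = transform(message.value)
    # The transformed message carries the target schema/table metadata downstream.
    producer.send("success-topic", value=transformed)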

Data ingestion and inlet layer

The data ingestion layer is the API gateway to which the push consumers push the events. Received events are validated against the metadata and checked for quality.

In the initial design, the API ingested data directly into the time series database. But whenever traffic surged, the MySQL time series database took most of the impact: it faced more writes than usual, which degraded overall write time. Since the ingestion API had to wait until each record was ingested into the time series database, its response time was impacted as well.

To avoid this scenario, a Sidekiq queue was introduced to decouple the API from the time series database and to perform controlled insertion into it. The API layer could then scale independently with no impact on the subsequent layers.

The Inlet layer dequeued from the Sidekiq queue and inserted events into the time series database.

Time series database

  • The time series database was built in MySQL and stored the incremental data every day.
  • The entire event is stored in a single column as JSON along with columns such as tableID, AccountID, etc.
  • This was a daily table and we always stored up to five days of data for fallback and recovery in case of disaster.

Batching layer

The batching layer pulls batches of data from the time series database and creates the CSV files; a minimal sketch of one batching cycle follows the list below.

  • Every row in the time series database has an associated row_id. The audit tables maintain the last row ID for every table that was batched and pushed as CSV files into S3.
  • This layer reads the last_row_id from the audit tables and pulls the newer rows.
  • It creates CSVs with the required set of data.
  • The CSV files are then uploaded to S3.
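The sketch below assumes PyMySQL and boto3; the table names, audit columns, and bucket path are illustrative.

# Minimal sketch of one batching cycle (illustrative, using PyMySQL and boto3).
import csv
import boto3
import pymysql

conn = pymysql.connect(host="timeseries-db", user="etl", password="***", database="analytics")
s3 = boto3.client("s3")

with conn.cursor() as cur:
    # 1. Read the last batched row ID from the audit table.
    cur.execute("SELECT last_row_id FROM batch_audit WHERE table_name = %s", ("tickets",))
    last_row_id = cur.fetchone()[0]

    # 2. Pull the rows newer than that ID from the daily time series table.
    cur.execute(
        "SELECT row_id, account_id, payload FROM events_daily WHERE row_id > %s ORDER BY row_id",
        (last_row_id,),
    )
    rows = cur.fetchall()

if rows:
    # 3. Write the batch to a CSV file and upload it to S3.
    with open("/tmp/tickets_batch.csv", "w", newline="") as f:
        csv.writer(f).writerows(rows)
    s3.upload_file("/tmp/tickets_batch.csv", "analytics-load-bucket", "tickets/tickets_batch.csv")

    # 4. Advance the audit pointer to the last row that was batched.
    with conn.cursor() as cur:
        cur.execute(
            "UPDATE batch_audit SET last_row_id = %s WHERE table_name = %s",
            (rows[-1][0], "tickets"),
        )
    conn.commit()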

Airflow and Snowflake

The Airflow layer is a typical scheduler that pulls the CSV files from S3 and loads them into the corresponding tables in Snowflake (an upsert in most cases). Snowflake is our reporting database, which stores the transformed data for all the tenants.
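A minimal sketch of such a scheduled load, assuming Apache Airflow 2.x and the snowflake-connector-python package; the DAG ID, schedule, stage, and table names are illustrative, and the plain COPY here stands in for the upsert flow described above.

# Minimal sketch of the scheduled load (illustrative, using Airflow and snowflake-connector-python).
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import snowflake.connector

def load_csvs_to_snowflake():
    conn = snowflake.connector.connect(user="etl", password="***", account="acme-analytics")
    try:
        # Load the staged CSV files into the target table.
        conn.cursor().execute(
            "COPY INTO analytics.freshdesk.tickets "
            "FROM @analytics_s3_stage/tickets/ FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)"
        )
    finally:
        conn.close()

with DAG(
    dag_id="load_csv_to_snowflake",
    schedule_interval="*/15 * * * *",   # illustrative schedule
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    PythonOperator(task_id="load_tickets", python_callable=load_csvs_to_snowflake)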

Challenges in the legacy pipeline

We were experiencing a continuous surge in traffic from our products to Analytics over time. This heightened influx imposed additional strain on the system. Also, our pipeline had to handle both the usual traffic and migration simultaneously whenever needed.

Therefore, in cases where we encountered a substantial migration with a throughput similar to that of our product’s regular traffic, we adopted a more gradual approach, utilizing limited resources. We did synchronize the data, but with a further delay. The delay ranged from four hours to 24 hours based on the volume and criticality of the migration. But for a critical migration with high throughput of approximately 2x, we had to double the infrastructure. And when the migration was complete, we had to decommission the new resources.

This process of scaling in and out came with challenges:

Scalability

  • The scaling process was carried out manually, often necessitating the ETL engineer’s input to gauge and adjust the scaling levels.
  • Kafka assigns each partition to at most one consumer within a consumer group, so the highest achievable level of parallelism for a consumer group was determined by the number of partitions in a given topic.
  • When we introduced additional consumers, it was crucial to correspondingly increase the number of Inlet machines. This was done to prevent a rapid accumulation of data in Redis.
  • However, having more Inlet machines translated to an augmented number of connections on the RDS.
  • Furthermore, the storage capacity of the RDS system depleted more quickly than on a typical day whenever scaling adjustments were made.
  • Both Redis and Sidekiq limited scaling. Additionally, the time series database presented a bottleneck in terms of scalability, and implementing sharding introduced additional overhead.

Maintenance

  • Managing multiple intermediary systems between Kafka and the target warehouse posed operational challenges.
  • Any service downtime within the entire pipeline resulted in overall system downtime, requiring significant on-call effort for maintenance.
  • The deployment process was complex due to the presence of multiple components.

Cost

  • The integration of numerous systems led to increased operational costs. Over the years, the overall system costs grew steadily, driven by the rising traffic from various products.
  • Any scale-out in the consumer system necessitated a corresponding scale-out in subsequent systems, resulting in a multiplication of the overall pipeline costs.

Solution

The solution is to replace the entire batching layer (highlighted in pink in the legacy architecture diagram above) with a single distributed streaming pipeline in Apache Spark. The internal details are covered in the next section.

New system

Our solution was to build a streaming pipeline in Apache Spark, replacing the legacy pipeline.

The streaming pipeline reads messages from the Success Topic and does all the validation, schema enforcement, schema evolution, segregation, and batching. Finally, it creates CSV files and loads them into S3. The output files from the streaming pipeline are similar to those from the legacy pipeline.

The Spark pipeline covers all the functional and non-functional requirements in parity with the legacy pipeline. It also has an advantage when it comes to costs, ease of maintenance, performance, and scale.

Workflow steps

Consumption as DataFrame

The Spark application, triggered at predefined intervals, initiates the journey by reading data from Kafka in the form of DataFrames. The DataFrame at this point looks like this:

TopicName | Offset   | Partition | Value
Topic 1   | Offset 1 | 1         | abc
Topic 2   | Offset 2 | 10        | xyz1

The Value column holds the entire message. Since the Spark pipeline consumes from the Success Topic, the value column holds events published by the transformation consumers (C1 consumers). This transformed message holds all the necessary details, such as:

  • Target product (Freshdesk, Freshservice, CRM, etc.)
  • Target TableName

Thus, the transformed message has sufficient information for the Spark pipeline to proceed.

The Spark application connects to the meta database and pulls the schema (table, columns, mandatory keys, primary keys) of all the tables for the product.
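A minimal sketch of this consumption step, assuming PySpark Structured Streaming; the broker address and topic name are illustrative.

# Minimal sketch of the consumption step (illustrative, using PySpark Structured Streaming).
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("analytics-ingestion").getOrCreate()

# Read the transformed (C1) events from the Success Topic as a streaming DataFrame
# that exposes topic, offset, partition, and value columns.
raw_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "central-kafka:9092")
    .option("subscribe", "success-topic")
    .option("startingOffsets", "latest")
    .load()
)

# The binary value column holds the full transformed message (target product,
# target table, and the row data), so cast it to a string for downstream parsing.
events_df = raw_df.select(
    col("topic"), col("offset"), col("partition"),
    col("value").cast("string").alias("value"),
)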

Filtering DataFrame for each table and applying schema

The DataFrames undergo filtering to create filtered DataFrames tailored to specific tables; e.g., if we have 100 tables to process in a job, there will be 100 filtered DataFrames.

The next step is to assign a schema to each of the filtered DataFrames based on its target product and table name.

The resultant DataFrame after this step looks identical to the target table. For example, the filtered DataFrame of the Tickets Table of Freshdesk looks like this:

AccountID | TicketID | Col1 | Col2
123456    | 8439     | tkt  | it
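A minimal sketch of the per-table filtering and schema enforcement, assuming the transformed message is JSON with target_table and data fields (illustrative names) and that events_df is the streaming DataFrame from the previous sketch.

# Minimal sketch of per-table filtering and schema enforcement (illustrative field names).
from pyspark.sql.functions import col, from_json, get_json_object
from pyspark.sql.types import StructType, StructField, LongType, StringType

# Schema pulled from the meta database for the Freshdesk tickets table (illustrative).
tickets_schema = StructType([
    StructField("AccountID", LongType()),
    StructField("TicketID", LongType()),
    StructField("Col1", StringType()),
    StructField("Col2", StringType()),
])

# Keep only the events destined for this table, then project the payload
# into columns that mirror the target table.
tickets_df = (
    events_df
    .filter(get_json_object(col("value"), "$.target_table") == "tickets")
    .select(from_json(get_json_object(col("value"), "$.data"), tickets_schema).alias("row"))
    .select("row.*")
)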

File creation

  • The filtered DataFrame can be written as a file into the load bucket.
  • The output is neatly consolidated CSV/Parquet files for each table.
  • These files, in turn, serve as the direct input for loading data into their corresponding target tables. A minimal sketch of the write step follows this list.
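Assuming the per-table DataFrames from the previous sketch, the write step could look like this inside a foreachBatch handler; the bucket, prefix, checkpoint location, and trigger interval are illustrative.

# Minimal sketch of the file-creation step (illustrative paths and trigger interval).
def write_batch(batch_df, batch_id):
    # One consolidated file per table per micro-batch keeps the downstream load simple.
    (
        batch_df.coalesce(1)
        .write
        .mode("append")
        .option("header", True)
        .csv("s3://analytics-load-bucket/freshdesk/tickets/")
    )

query = (
    tickets_df.writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "s3://analytics-checkpoints/freshdesk-tickets/")
    .trigger(processingTime="5 minutes")
    .start()
)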

Upsert using staging table

  • Since the incremental batch file has to be merged into the target table for every batch, we use an intermediate staging table for this purpose.
  • The staging table acts as a temporary holding ground for new data, enabling efficient upsert operations against the final target table, as sketched below.
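A minimal sketch of the upsert, assuming snowflake-connector-python; the stage, staging table, target table, and key columns are illustrative.

# Minimal sketch of the staging-table upsert (illustrative names, using snowflake-connector-python).
import snowflake.connector

conn = snowflake.connector.connect(user="etl", password="***", account="acme-analytics")
cur = conn.cursor()

# 1. Land the incremental batch file into the staging table.
cur.execute(
    "COPY INTO staging.tickets FROM @analytics_s3_stage/freshdesk/tickets/ "
    "FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)"
)

# 2. Merge the staged rows into the target table on the primary keys.
cur.execute("""
    MERGE INTO analytics.freshdesk.tickets AS t
    USING staging.tickets AS s
      ON t.AccountID = s.AccountID AND t.TicketID = s.TicketID
    WHEN MATCHED THEN UPDATE SET t.Col1 = s.Col1, t.Col2 = s.Col2
    WHEN NOT MATCHED THEN INSERT (AccountID, TicketID, Col1, Col2)
                          VALUES (s.AccountID, s.TicketID, s.Col1, s.Col2)
""")

# 3. Clear the staging table for the next batch.
cur.execute("TRUNCATE TABLE staging.tickets")
conn.close()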

In addition to incorporating all the necessary features into the existing pipeline for parity, the new pipeline boasts several technical advantages:

  • The entire pipeline is driven by configuration settings. The addition, removal, or modification of an ingestion pipeline can be accomplished through configuration alone (a hypothetical example follows this list).
  • Making changes to a table—whether adding, removing, or modifying—will not impact the pipeline and requires no alterations to the code. The pipeline seamlessly handles schema evolution for any modified or added tables.
  • Spark’s auto-scaling feature proves to be advantageous. Parameters for controlling auto-scaling can be selected and configured based on our specific requirements.
  • Manually scaling up or down a cluster is also straightforward and, once again, driven by configurations.
  • The need for Redis is eliminated, given that Spark functions as an in-memory compute engine.
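To give a flavor of this, a purely hypothetical configuration entry might look like the following; the real keys and values are internal to the platform.

# Purely illustrative configuration entry; the real configuration keys and values are internal.
PIPELINE_CONFIG = {
    "freshdesk": {
        "source_topic": "success-topic",
        "target_database": "analytics.freshdesk",
        "trigger_interval": "5 minutes",
        "output_format": "csv",
        "load_bucket": "s3://analytics-load-bucket/freshdesk/",
        "auto_scaling": {"min_executors": 2, "max_executors": 10},
    },
    # Adding or removing a product's ingestion pipeline is just another entry here.
}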

Key features and implementation details

Schema evolution

Schema changes in the source systems typically take three forms:

  1. New features often require the addition of tables or columns.
  2. When a column is removed at the source, a corresponding removal from the analytics schema is essential to maintain coherence.
  3. A column's data type changes from one form to another in the source system.

To address this, we’ve implemented a schema evolution flow:

  1. Schema revision job: This job creates a Redis key, timestamped with the current time (time.now), signifying that a schema revision has occurred.
  2. Spark integration: Before processing each microbatch, the Spark component checks the Redis key against a variable storing the previous timestamp. If a change is detected, it retrieves the updated metadata from the database, as sketched below.
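A minimal sketch of that per-micro-batch check, assuming redis-py; the key name is illustrative and load_metadata_from_db is a hypothetical stand-in for whatever helper reads the meta database.

# Minimal sketch of the per-micro-batch schema check (illustrative key name, using redis-py).
import redis

r = redis.Redis(host="schema-redis", port=6379)
last_seen_revision = None
table_metadata = {}

def load_metadata_from_db():
    # Hypothetical helper: would query the meta database for table/column definitions.
    return {}

def refresh_schema_if_revised():
    # Reload table metadata only when the schema revision key has changed.
    global last_seen_revision, table_metadata
    revision = r.get("schema_revision_ts")          # written by the schema revision job
    if revision != last_seen_revision:
        table_metadata = load_metadata_from_db()
        last_seen_revision = revision

# refresh_schema_if_revised() is called at the start of every micro-batch,
# before filtering and schema enforcement.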

Observability

  • Analytics is a P0 system and observability is crucial to identify any issue or incident on time
  • Mean time to detect and mean time to resolve have to align with the commitment of the platform to the customer

Operational metrics

For observing the health, stability, and performance of the application, some of the key metrics are published to Trigmetry (central service for observability in Freshworks).

All the operational metrics are published to Trigmetry from the executor machines of the cluster at regular intervals to ensure continuous observability and to alert the engineers in case of anomalies. They include:

  • Number of incoming events in a micro-batch
  • Number of messages processed
  • Number of rejected messages in transformation
  • Batch duration

Consumer lag metrics

Unlike a regular Python consumer, the Spark streaming application does not start with a static consumer group, so it does not commit offsets to the broker. Hence, the traditional way of calculating consumer lag from the consumer group is not possible.

Instead, it stores the offsets in the checkpoint file. So we have written a standalone, scheduled Python consumer to poll the latest offsets for the topic and compare them against the offsets in the checkpoint file to calculate the lag.
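A minimal sketch of that lag calculation, assuming kafka-python; the checkpoint offsets are shown as a literal dict for brevity (in practice they are parsed from the latest file under the checkpoint's offsets directory), and the topic name is illustrative.

# Minimal sketch of the lag calculation (illustrative, using kafka-python).
from kafka import KafkaConsumer, TopicPartition

TOPIC = "success-topic"

# Offsets recorded in the most recent Spark checkpoint file, keyed by partition (illustrative values).
checkpoint_offsets = {0: 1200345, 1: 1199872}

consumer = KafkaConsumer(bootstrap_servers="central-kafka:9092")
partitions = [TopicPartition(TOPIC, p) for p in checkpoint_offsets]

# Latest offsets on the broker for each partition.
end_offsets = consumer.end_offsets(partitions)

# Lag = broker's latest offset minus the offset the streaming job has processed.
lag = sum(end_offsets[tp] - checkpoint_offsets[tp.partition] for tp in partitions)
print(f"consumer lag for {TOPIC}: {lag}")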

Key metrics captured

  • Input records per batch: When this drops to zero continuously, alert as ‘Spark streaming application failure’
  • Processed records per batch: When this drops to zero continuously, alert as ‘Spark streaming application failure’
  • Rejected records per batch: When messages are rejected, alert as ‘Spark streaming records rejected’
  • Batch duration: When the batch duration crosses the trigger interval but input and processed records are more than zero, alert as ‘Spark streaming batch spillover’
  • Consumer lag: When the consumer lags beyond the threshold value, alert as ‘Spark streaming lag above threshold’

These alerts indicate the following scenarios:

  1. Surge in incoming traffic
  2. Consumer is not running for a period of time
  3. The processing rate is behind the incoming rate

Summary of improvements

Scalability

  • Ad-hoc pipelines can be scaled independently based on the input traffic. Because the new pipeline is a single standalone application, it has no inherent scaling limitation or bottleneck.
  • Scaling operations are more efficient. Addressing lag issues is considerably swifter and simpler. Dynamic allocation and auto-scaling features allow us to increase infrastructure and throughput within five minutes of hitting lag thresholds.
  • The previous limitations tied to Redis and MySQL, which hindered scalability, have been eliminated.
  • If an incident is identified and requires a high-volume re-sync of data, the streaming pipeline can complete it within a few hours, versus days in the old pipeline. This again is because its scaling is independent and has no dependencies on the live pipeline.

Maintenance

  • Deployment has become more straightforward in Jenkins. Adding a new product is a matter of including a new entry in the configuration file and deploying the application. This action initiates the cluster, deploys the app, and handles the distribution of CSV files, logs, and metrics.
  • The overall maintenance effort has reduced drastically after the migration. We have a single application and cluster setup in place of the five components we had before. The number of alerts has reduced by 75%.

Availability and fault tolerance

  • The system boasts high availability with the supervisor in place. Any failure or machine crash triggers an immediate automatic restart by the supervisor.
  • The pipeline, even in the event of a failure, resumes from the point of interruption. The checkpoint is managed locally, independent of Kafka. We can also revisit any previous point in time to reprocess data, which proves invaluable for message reprocessing.

Extensibility

  • The groundwork laid in the platform simplifies the process of modernizing C1.
  • For near-real-time (NRT) use cases, achieving lower trigger intervals is now more feasible, provided the target supports NRT ingestion.

Accuracy

  • The new pipeline accommodates a broader range of emojis and characters that were previously unsupported. This enhancement has notably improved accuracy in VARCHAR columns.

Cost savings

  • Over half of the infrastructure expenses have been saved through optimizations in our AWS infrastructure.
  • Streamlined logging processes have contributed to cost savings on Haystack. Additionally, Redis usage has been reduced.

Cost savings across regions

Region | Savings percentage (legacy vs. stream) | Daily savings ($) | Yearly savings ($)
AU     | 48%                                    | 63                | 23K
IND    | 80%                                    | 623               | 227K
EUC    | 66%                                    | 261               | 95K
US     | 82%                                    | 1361              | 496K

Total annual savings: $841K + Haystack & staging savings ($260K) ≈ $1.1 million

Effective use of EMR

The entire Spark application is hosted on EMR clusters. Amazon EMR is the AWS service on which we can deploy and run our Apache Spark applications on demand.

Since Apache Spark is a distributed engine, the best performance is achieved only with the right strategy for distributing the workload. So the best results come from choosing optimal resource-allocation values for our use case, along with efficient memory and disk optimization in the EMR clusters used.

Optimizations on EMR

Resource allocation

  • Two core containers for small applications (XL machines)
  • Four core containers for big or medium-sized applications (2XL machines)
  • The driver is the same size as an executor
  • The master node is generally a 2XL since it has to coordinate multiple applications

Memory optimization

  • Used persist and unpersist effectively. Since Spark consumes a batch of messages in every trigger, the input DataFrame is cached after initial validation so that the subsequent filtering for each table is done on the cached DataFrame. This eliminates redundant operations starting from the source. At the end of the micro-batch, the DataFrame is unpersisted to free up the storage memory for the next micro-batch (a minimal sketch follows this list).
  • The entire DAG involves no shuffling since there is no join or repartition involved.
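A minimal sketch of this caching pattern inside a foreachBatch handler; the storage level, column name, table list, and paths are illustrative.

# Minimal sketch of the persist/unpersist pattern (illustrative tables and paths).
from pyspark.storagelevel import StorageLevel

def process_batch(batch_df, batch_id):
    # Cache the validated input once; every per-table filter then reads from memory
    # instead of recomputing the lineage from the Kafka source.
    batch_df.persist(StorageLevel.MEMORY_ONLY)
    try:
        for table in ["tickets", "contacts", "conversations"]:
            filtered = batch_df.filter(batch_df.target_table == table)
            filtered.write.mode("append").csv(f"s3://analytics-load-bucket/{table}/")
    finally:
        # Free the storage memory before the next micro-batch arrives.
        batch_df.unpersist()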

Disk optimization

  • The cache is done in memory and not spilled onto disk. Since the trigger interval is usually one to five minutes, the volume of messages in a batch will be a few hundred megabytes and will fit into memory; there will be no IO to disk.
  • We faced application failures due to the disk filling up, since Spark event logs are pushed to HDFS by default.
    • Changed the configuration to route Spark event logs to S3 instead of HDFS.
    • Enabled rolling logs and tuned the logging configurations:
      • --conf spark.eventLog.dir=s3://{event_log_bucket}/{event_log_directory}/
      • --conf spark.history.fs.logDirectory=s3://{event_log_bucket}/{event_log_directory}/
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.history.fs.cleaner.enabled": "true",
      "spark.history.fs.cleaner.maxAge": "2h",
      "spark.history.fs.cleaner.interval": "1h",
      "spark.eventLog.rolling.enabled": "true",
      "spark.eventLog.rolling.maxFileSize": "10m",
      "spark.history.fs.eventLog.rolling.maxFilesToRetain": "5",
      "spark.eventLog.rotation.interval": "300"
    }
  },
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.log-aggregation.retain-seconds": "14400",
      "yarn.log-aggregation-enable": "true"
    }
  }
]

Conclusion

The challenges faced in the legacy pipeline, namely scalability, cost, maintenance, and high availability, are resolved in the new pipeline. The new pipeline operates at a much higher scale with a much lower cost, and it requires less effort for maintenance and keeping the lights on.

Modern techniques and ongoing advances in distributed computing set the path forward for building an even more efficient data ingestion system.

Upcoming projects in data platform

This modernization is the first project in this space, and there are many projects to follow:

  • Modernization of the C1 consumers and integration of C1 into the application: one application from Kafka to batch files
  • NRT pipelines from source to sink
  • Building a data lake solution in addition to the data warehouse to support machine learning workloads

Upcoming articles in this series

  • Deep dive into choosing the right infrastructure in EMR for our use case
  • Deep dive into observability and building a consumer lag dashboard for Apache Spark streaming
  • Deep dive into effective use of cache and persist in a Spark application

References

  • https://kafka.apache.org/intro
  • https://spark.apache.org/
  • https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-what-is-emr.html
  • https://spark.apache.org/docs/latest/sql-programming-guide.html