Data plays a key role in how products are designed at Freshworks. In the first part of our series on how we build our data lake, Baikal, we looked at how we designed and built Baikal, and how we opened up our diverse data to make it accessible from one cohesive layer.
In this post, we dive into what we built over our data lake.
Data Ingestion – Incremental Strategy
When we started with data ingestion, we kept it simple. We built a pipeline using Sqoop, Hive, and scheduled it using Oozie. We did a lot of performance optimization by fine-tuning Sqoop (with mappers), managing the multi-line data in the column, and handling duplicates and international characters. As the usage of the platform increased, the demand for refreshing more frequently and thus, need for incremental data increased. We came up with the following strategy to address that.
Strategy for Incremental Updates
Hadoop wasn’t designed to handle updates and deletes but our focus was to achieve this using standard SQL within the Hive toolset. The following process shows how we went about it.
- Sqoop Phase: In this phase, we pulled only the data that had been changed by using the updated_at column from all shards. (Freshworks data is sharded owing to its massive scale)
- Hive Phase: In this phase, we moved the data to a staging table and did a deduplication check (a double check to avoid any duplicate data that could occur during migrations and rebalancing).
- Hive Reconciliation Phase:
- Updated Records:
- Staging Table (Staging A for example): This table contained new records and also the updated ones
- Existing Table (B for example): This contained the old data.
- Final (Refreshed) Table: Data from B is taken and added to A (excluding the ones present in A since they contain the same ID but old records).
- Deleted Records: Once a week, we did a complete refresh of the data (from RDS) in Baikal to ensure all security policies are adhered to
- Updated Records:
Our primary goal was to ensure that we complied with security policies. We implemented data discovery and classification to determine the IT security needs at an application level. We implemented encryption of data at rest and authentication using Active Directory (AD). We designed a fine-grained, role-based policy to follow.
Baikal is powered by Cloudera and it supports Kerberos for authentication and Apache Sentry for authorization.
Hadoop supports enterprise-grade authentication system with the help of Kerberos. Several components of the Hadoop ecosystem converge to use Kerberos authentication with the option to manage and store credentials in Active Directory.
To begin with, we set up Simple AD in AWS and integrated all EC2 instances with AD. We created groups in line with the org structure at Freshworks and finally we enabled Kerberos in Cloudera. This ensured that without Kerberos session, one would not be able to access any service or perform any action in the system.
Policies are enforced using Apache Sentry (for services such as Hive, Impala, and Search) and HDFS Access Control Lists. Based on the groups created within a Simple AD, these groups are automatically synced to EC2 instances as they are integrated with AD. Today we use Sentry not just for Hive, Impala and HDFS but also with Kafka (to restrict access to Topics and consumer groups) and Solr Collections.
One of the main reasons to set up the data lake was to ensure the ease of data exploration. It was for the same reason we made the data available in the form that people were most familiar with. We kept the schema the same as that in RDS and started evaluating editors that could make the self-service querying simple for end users.
When we started, we were only a five member team. The challenging task for us was to drive adoption, as well as accommodating every team’s needs. We wanted to implement a self-service portal on top of the data platform. We wanted to have a role-based, secure and authenticated access to data for our internal teams without creating dependencies on our team.
Hue (available out of the box in Cloudera) was the perfect web app to start with and we defined databases with fine-grained security access with Apache Sentry. In some cases, a few groups needed access to only specific tables in a database. Apache Sentry allowed us the possibility of restricting up to column level and security was the factor that tipped the scales in choosing Cloudera over Amazon S3 based data lake approach.
Hive editor was available for users to query data. Oozie workflow editors were opened up for users to define workflows and schedule jobs. We ran some targeted sessions and workshops to help onboard teams. User adoption increased with increased data flow.
We also had to make a lot of changes to the Hue code to make it complement our needs. It needed a few fixes in the authentication module to support Google Authorization Login and then linking the user record to work with AD user records, as they were the ones used for authorization.
Data Exploration using Impala
With respect to optimization for performance and ease of use for non-engineering teams, it was an overkill to explain how MapReduce operates and how different they are from running queries in MySQL. Impala was a much better alternative. It has a different execution engine from MapReduce and can cache similar queries resulting in a 600% increase in speed.
As a data scientist working with Hadoop, there is a need to explore data, run ad-hoc queries and build data pipelines. Initially, optimizing Hive queries focused mostly on techniques such as partitioning and bucketing.
We rolled out Impala after a lot of performance optimizations for the best BI and SQL analytics on Hadoop. We made changes to Hive table storage format to Parquet and compression type as Snappy from ORC-with-zlib. The new format significantly increased the size of the files and hence the number of containers required for the same query increased, but it reduced the CPU cycles required to decompress the data.
Baikal as a platform
Baikal was set up to be used as a service. True to its setup, the machine learning team members started writing their own Spark streams to process data in micro-batches to run their algorithms and build models on top of it.
Stream Processing on Kafka using Spark
Apache Spark, which comes as a bundle in Cloudera, helped us with large scale data processing and supported different programming languages and frameworks. Built on lightweight yet powerful APIs, Spark also supported in-memory processing, stream processing, graph processing and machine learning.
We set up a downstream Kafka (bundled in Cloudera) that consumed data from a centralized Kafka service. This centralized Kafka service handles data from all the Freshworks products and we wanted to enable our engineering teams to be able to process the data in real time. The streaming data available in downstream Kafka is ready to be processed by Spark and we also store this data in HBase for various analytical and reporting tools that we use.
We needed application logs in real time to debug and analyze flows. Log Ingestion was done using Flume, Morphline, and Solr (Cloudera Search). These were not just used by our DevOps team but also by our support teams. We needed to run server-log analysis for nearly 1TB of data on a daily basis. For this, we built a pipeline that includes Apache Flume, Solr, and Hue to arrive at the results.
Our analysis showed that only 30-day-old data was being used for real time needs, while the rest was largely used for audit trails. The data that is pushed by Apache Flume agents, installed across all application servers, are parsed, enriched (for geospatial needs) and indexed in Solr using Morphline. For a multi-tenant system, we had to tune a lot of parameters right from CPU usage to memory storage. Fortunately, Cloudera came with CGroups (Static Resources Allocation) that helped us manage the cluster resources better. We also fine-tuned Flume, Solr to handle close to 80,000 messages/second.
We use Oozie to define and schedule workflows for reporting. We used Hive Queries and inserted in an Oozie workflow after being tested in Beeswax Hive Editor. This helped us extract the data without touching the command line. The workflow contains an action (usually sending an email) which sends the user their queried data as a CSV file. Alternatively, the enriched/aggregated data (non-PII) can be stored in different file formats in HDFS, HBase, and S3. Some are even pushed to BI tools for further analytical processing.
Unstructured data from logs are parsed, processed, and enriched. They are then stored in the form of Hive Tables. Our DevOps team keeps a close tab on the performance as poor performances have a negative impact on the business and users. Apart from using this system to track events and debug issues, the DevOps team was one of the earliest adopters of this platform. They also generate and send out reports of the 95th and 90th percentile response times for various components, dubbed as delight metrics to the product teams. These reports are used to provide a comprehensive basis for automatic baselining, behavioral learning, and optimizing the application flow.
Sisense and Tableau are used extensively by the marketing and business functions. The dashboards are powered by data from our Data Lake. In order to enable Advanced Analytics, we helped teams build cubes (pre-summarized across dimensions) and generate dashboards of their choice. For example, in case of Sisense, we integrated the product using Hive JDBC connector protected with Kerberos. The permissions are in any case taken care by Sentry.
We also enabled teams to write and run their own Oozie jobs from Hue Editor. This way, teams having access to Natero APIs were able to push data using Oozie coordinators.
Our vision for the future
From the beginning, our engineering leadership team evangelized data-driven decision making and solutions in our workplace. We, the data lake team at Freshworks, aspire to make Baikal a platform that truly democratizes data access and helps every team to take swift decisions based on the insights and intelligence that data itself creates.