Revamping data consumption at Meesho for modern data needs

  • Raghwendra Singh

    Raghwendra Singh

    Software Development Engineer 4

    Meesho

Share

Our data was scaling at an increasing rate. We needed more advanced compute engines with the ability to scale, audit, and run existing and newer workloads with better control. 

Meesho’s first data lake consisted of a framework that ingested data from different pipelines into a managed data warehouse. We had issues scaling the managed data warehouse due to read after write locks, and storage coupled with compute, increasing cluster costs.

To overcome these scaling issues, we moved towards a data lake architecture where our data is written in an object store with partitioning that allows us to write data in append, upsert and overwrite modes. A Hive metastore (HMS) is used so that different compute engines like Starburst Enterprise Platform (SEP) & Spark can be employed for different use cases.

We use SEP as one of the engines because it provides similar or better query performance compared to a managed data warehouse for dashboard data sources, and ad hoc queries from Zeppelin and the long running queries. In addition, SEP provides the following benefits:

  • Delta lake catalogs read data directly from delta transaction logs.
  • The clusters are easy to maintain.
  • The clusters are scalable to support different workloads at different times of the day.

Our initial SEP cluster

Our team had not transitioned fully to Kubernetes-based deployments when we initially deployed SEP. Instead, we used SEP’s Cloudformation template (CFT) to create multiple clusters designed for different workloads:

  • ETL: Long-running queries, on the order of an hour, supporting dashboards and data science use cases.
  • Reporting: Long-running queries which create data to share via google drive reports or email.
  • Backend clients: Queries supporting Java clients and simple APIs, requiring a variety of query run times, to allow backend teams to query the data lake, abstracting the underlying query engine.
  • Dashboards: Fast queries, less than three minutes, supporting dashboard creation and consumption.
  • Notebooks: Medium runtime queries, less than 15 minutes, supporting web-based data notebooks allowing data exploration and query prototyping to be used by the ETL cluster.

The SEP cluster deployed using CFT on r5.12xlarge instances, providing access to Delta Lake and Hive catalogs. HMS and Ranger for RBAC provide supporting services.

Cluster design issues

Once we were up and running, we encountered the following cluster design issues:

  • Longer cluster update and creation times, as CFT template requires many resources to be created. 
  • Less flexibility to make changes to the infrastructure, as everything is launched by the CFT template in a rigid manner. 
  • Autoscaling groups downscale too slowly, and create a situation where the cluster runs with the maximum number of nodes even when consumption is greatly reduced.

To solve these problems, we ran a PoC on a new SEP cluster deployed on Kubernetes with Starburst’s SEP Helm charts.

Kubernetes proof of concept

We kicked off a proof-of-concept cluster in Kubernetes using the same instance type r5.12xlarge for the nodegroups to keep performance the same, and running 1 pod per worker utilizing ~80-85% of the memory from the instance. As the SEP images were in the us-east-1 repository, image pull times were higher, so we cloned the images in Meesho’s production ECR repository to reduce pull time, and therefore overall pod launch time.

Performance testing approach

To recreate the different query workloads for short running and long running queries, we designed a query replay system that allows you to select a desired query run length, and executes a matching query to check performance. With this system, we can execute queries on the same schedule as they are run on the production SEP cluster.

Additionally, we added a regression task in our Airflow DAG. We enable this task when we want to do performance testing during design iterations without affecting the production workflow performance.

Design iteration 1: Improving fault tolerant execution query performance

With the migration to Kubernetes, we also wanted to utilize spot instances for running queries in a fault tolerant manner. We enabled fault toleration execution (FTE) in SEP configured with an exchange manager and a task retry policy for short- (three minute) and medium-running (15 minutes ) queries. We found that the query run times increased three-fold using S3 as a buffer. Similarly, long-running queries taking less than 10 minutes increased the runtime by 1.5x to 2x or more using a task retry policy.

Given these results, we next used the FTE with a query retry policy without configuring the exchange manager. This uses memory to store the intermediate data from the tasks. We found that this produced similar performance as without FTE configured, but still executes query retries if a node is terminated due to spot instance preemption.

Design iteration 2: Reducing the SEP query execution time on EKS

After running SEP in an Kubernetes cluster with FTE query retry policies, it was clear that for shorter running queries the runtime was close to those observed in the CFT-based cluster. However, long-running queries were still slower by ~1.5x.

For the most part, the Kubernetes cluster used the same configuration as the CFS cluster, except using multiple availability zones (AZs) for the Kubernetes node group. While running the same queries on both clusters with the same resources at the same time, we also observed that the Kubernetes cluster processed half the data in bytes per second as the CFT-based cluster.

We then switched the Kubernetes cluster to run in a single availability zone, and got the same performance as observed on the CFT cluster.

Design iteration 3: Sizing the SEP coordinator & worker pods

In this iteration, we switched to running multiple pods on larger r5.16xlarge and r5.24xlarge instance types. This did not yield better performance compared to running one pod per instance of r5.12xlarge type, so we reverted to using one pod per r5.12xlarge instance.

Further enhancements

Once we were sure we had our basic Kubernetes cluster design correct, we added stability-related features.

High Availability 

As we wanted to roll out the queries from the CFT-based clusters to the new Kubernetes clusters gradually while adding high availability, we implemented Lyft’s Presto gateway.

With this architecture, we are able to distribute short-, medium- and long-running queries in round-robin fashion between the Kubernetes and CFT clusters running at 50% capacity with the same resources and configuration with high availability.

The gateway also allows us to set up clusters in multiple AZs, increasing our spot instance access. This enables us to do blue/green deployments, achieving zero downtime even during updates.

Graceful shutdown

To support safely downscaling without query failures, we configured different graceful shutdown periods on each cluster based on the maximum query runtime time of the workload each cluster was designated to handle.

We also added a set of scheduled, CPU-based auto scaling policies for the worker autoscaling groups (ASGs) in the CFT-based clusters, enabling us to scale the cluster according to the active users and query pattern on the different clusters.

To add similar functionality in the Kubernetes cluster worker ASGs, we added lifecycle hooks on termination to keep the terminating node in its terminating wait state for the graceful shutdown period. We also added a node termination handler, which captures node termination events and triggers a sigterm on the pod running on the instance marked for termination. This allows us to use the scheduled scaling policies in worker nodegroups with the Kubernetes HPA horizontal pod autoscaler (HPA).

Finally, we wanted to also terminate the instances where the pod was running when the pods were being terminated. To achieve this, we added a sidecar to run the aws-cli container in the worker pods, which allows us to terminate the instance running the terminating worker pods.

Productionalizing Kubernetes

We rolled Kubernetes clusters out to production by creating separate nodegroups for the coordinator and for the workers in each Kubernetes cluster. This allows us to have separate instance types for the coordinator and workers, as well as allowing us to specify different spot instance percentages for worker nodes.

Shifting workloads

Existing workloads were shifted until we had equivalent loads in CFT and Kubernetes. Once we had shifted a full 50% of queries from CFT-based clusters to their Kubernetes-based counterparts, we were able to compare the performance of the queries and the cost differences on both clusters and found the following:

  • Scaling in the Kubernetes SEP cluster with HPA was more aggressive than in ASGs without HPA with the same average cpu utilization.
  • Software costs in the Kubernetes-based clusters were reduced as compared to their CFT counterparts.
  • The p90 execution and queue times were similar on both cluster types, even with the Kubernetes-based clusters running with less workers on average than in the CFT-based clusters.

Increasing spot instance utilization

We wanted to better utilize spot instances on the different ad hoc clusters based on the average query runtimes of their workloads, and set the spot instance targets for peak periods as follows:

  • Primary short-running queries: 50%
  • Other short-running queries: 70%
  • Medium-running queries: 70%
  • Long running queries: The first n instances configured for an on-demand based on the minReplica. After the minReplica count is reached, add 10-20 % spot instances. 

For non-peak periods, we wanted to run with higher spot percentages, and even higher overnight when usage, to around 95%. To configure these different spot percentages, we created a cron-based system that schedules the on-demand percentages for the different worker autoscaling groups.

Creating a fallback

To ensure that we minimize launch failures stemming from the unavailability of the prescribed percentages of spot instances, we wanted to dynamically increase the on-demand percentage. To do this, we created a fallback framework where, on every spot instance of launch failure, an SQS topic notification is sent which triggers a lambda function that increases the worker nodegroup ASG on-demand percentage by a given percent.

With this new system, if a situation arises where there are 0 spot instances available over a duration that causes the on-demand percentage to increase to 100%, the cron-based system will eventually reset the percentages to the optimal values for the worker nodegroup ASG.

Combined, the cron system and fallback framework together help us reduce costs by approximately 30%.

Conclusion

TL;DR 

The migration of Trino clusters from AWS CFT to K8s allows us to:

  1. Perform upgrades and deploy Trino clusters 3-7 minutes faster. Prior to the migration, deployments and upgrades would take approximately 20-30 minutes.
  2. Run the Trino cluster on a mix of spot and on-demand instances. We can also  modify the percentage used by each instance type throughout the day. This was not possible with CFT deployments, which required us to use either 100% spot or 100% on-demand instances. 
  3. Reduce the infrastructure cost by 50-70%. The p90 execution time remains under 60 seconds, despite the growing data scan and queries increasing in number and complexity. 

Starburst Galaxy

Activate the data in and around your data lake

Start Free