Automating the “Icehouse” – Fully-managed Open Lakehouse Platform on Starburst Galaxy

A fully managed, end-to-end lakehouse platform built on open-source Trino and Iceberg


Today at Data Universe 2024 we announced our fully managed Icehouse implementation in Starburst Galaxy, serving as the foundation of our open lakehouse platform.  While this blog is focused on Apache Iceberg as part of the Icehouse implementation, Starburst continues to offer a truly open data lakehouse with full support for Delta Lake and Apache Hudi as well.

As described in the Icehouse Manifesto, Trino + Iceberg has become the preferred architecture for building an open lakehouse. Organizations like Netflix, Apple, Shopify, and Stripe have already adopted the Icehouse architecture for analytics, AI/ML, and data application use cases at internet scale. However, as the manifesto points out, an Icehouse architecture is more than the combination of Trino and Iceberg: data ingestion, Iceberg file management, data governance and security, and Trino capacity management are also critical to operating an Icehouse in production.

In this blog post, we will dive into how we are solving for each of these in Starburst Galaxy.

Everything is an Iceberg table

It may be obvious that data in an Icehouse architecture would be stored in Iceberg format. However, there are many benefits to this beyond the advantages of the table format itself. By integrating the Trino engine more deeply with the optimization and governance of the underlying Iceberg tables, Starburst Galaxy achieves industry-leading price-performance and a simplicity that were previously available only in proprietary systems, such as data warehouse and database architectures, without the vendor lock-in.

Data ingestion

Let’s first take a look at how data lands in the lakehouse to begin with. Most users will populate their Iceberg tables from a variety of sources, such as Apache Kafka, files loaded directly into object storage (e.g. Amazon S3), SaaS applications like Salesforce, or Change Data Capture (CDC) events from an operational database. We will first take a look at one of the most common sources, Kafka, and describe how you can use Galaxy to reliably ingest Kafka data into Iceberg tables. 

In principle, ingesting data from Kafka is a simple matter of converting messages from a Kafka topic into rows in an Iceberg table. However, we’ve found from working with our customers that this can be more difficult than it initially seems. A typical pipeline reads Kafka events and writes them to object storage using a tool such as Kafka Connect or Flink. Once in object storage, raw event data is transformed into a queryable table format, such as Iceberg. Apache Spark is often used for this transformation, as well as to catalog the data in a metastore such as AWS Glue. Parts of these systems are non-transactional and usually provide only at-least-once processing guarantees, which can introduce duplicate data into the data lakehouse and compromise its reliability as an analytics platform.

But that’s not all! In order to accomplish those tasks, data engineering teams need to create and maintain many complex, custom pipeline processes, including building in orchestration logic or accounting for schema changes. All of these individual pieces of code are custom, brittle, and challenging to maintain at scale. And, as platform adoption grows, the infrastructure and overhead costs can easily grow out of control.

Working with our customers, we developed a managed ingestion solution for Kafka as part of our Icehouse implementation that greatly simplifies data ingestion. This integration takes just 3 simple steps:

  1. Configure Galaxy to connect to any Kafka-compliant event stream, such as open source Apache Kafka, Confluent Cloud or Platform, or Amazon Managed Streaming for Apache Kafka (MSK).
  2. Select a destination for the stream. Galaxy will connect to your Kafka topics, and each message read will be transformed into relational form (a single row with columns) and ingested into an Iceberg table stored in your own S3 bucket.
  3. Transform / map the Kafka message (e.g., JSON, Avro, or Protobuf format) to relational format (rows and columns with Trino data types). Here, Kafka messages are analyzed, and a mapping to relational form is automatically suggested. Once the transformation is in place, data starts flowing from Kafka to your Iceberg tables continuously. Tasks that would otherwise take hours of manual work now happen automatically in near real time!
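To make the mapping in step 3 concrete, here is a minimal Python sketch of turning one JSON message into a flat row. The column names, the message layout, and the `MAPPING` format are all hypothetical and chosen for illustration; this is not Galaxy's mapping syntax, only the general idea of pulling typed columns out of a nested message.

```python
import json
from datetime import datetime, timezone

# Hypothetical mapping: column name -> (path into the JSON message, converter).
# Neither the columns nor this format come from Galaxy; they are illustrative.
MAPPING = {
    "order_id": (("id",), int),
    "store_region": (("store", "region"), str),
    "amount": (("amount",), float),
    "sold_at": (("ts",), lambda s: datetime.fromtimestamp(s, tz=timezone.utc)),
}

def extract(message, path):
    """Walk a nested dict along `path`; raises KeyError if a key is missing."""
    value = message
    for key in path:
        value = value[key]
    return value

def to_row(raw_bytes, mapping=MAPPING):
    """Turn one JSON-encoded Kafka message value into a flat row (dict of columns)."""
    message = json.loads(raw_bytes)
    return {
        column: convert(extract(message, path))
        for column, (path, convert) in mapping.items()
    }
```

Calling `to_row` on a message value yields a dict with one entry per column, which is the "single row with columns" shape described in step 2.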

Exactly once processing

The ingestion and the pipelines for automated data preparation are only as good as your trust that the data has been consumed correctly. Beyond parsing the Kafka messages and writing them to Iceberg tables correctly, the ingestion cannot be lossy. Specifically, Starburst’s Icehouse implementation guarantees exactly-once delivery, ensuring that no duplicate messages are read and that no messages are missed. Because we atomically update the metastore (e.g., Galaxy’s built-in metastore or AWS Glue), you will never be left in a corrupt state where new data was written to the Iceberg table but the metastore did not get updated to point to the latest Iceberg manifest file.
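One common way to get exactly-once behavior on top of a source that may redeliver messages is to commit the consumed offset together with the data, and to skip anything at or below the committed offset on replay. The sketch below illustrates that general technique with an in-memory stand-in for a table; it assumes a single partition, and the class and function names are hypothetical. It is not a description of how Galaxy implements its guarantee.

```python
class TableState:
    """In-memory stand-in for a table snapshot plus its committed Kafka offset."""

    def __init__(self):
        self.rows = []
        self.committed_offset = -1  # offset of the last message already in the table

    def commit(self, new_rows, last_offset):
        # Data and offset advance together. In a real system this would be
        # a single atomic metastore update rather than a Python assignment.
        self.rows, self.committed_offset = self.rows + new_rows, last_offset


def ingest_batch(table, batch):
    """Apply a batch of (offset, row) pairs, ignoring redelivered messages.

    Returns the number of rows actually written.
    """
    fresh = [(offset, row) for offset, row in batch if offset > table.committed_offset]
    if not fresh:
        return 0
    table.commit([row for _, row in fresh], max(offset for offset, _ in fresh))
    return len(fresh)
```

If a batch is retried after a failure, the rows it already contributed are filtered out, so a replay neither duplicates nor drops messages.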

Data quality at ingestion

Unfortunately, incoming data doesn’t always match the expected format or it fails a data quality check. When this occurs, Starburst Galaxy places the data in a separate Iceberg table (a dead letter/poison message table).
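As a rough illustration of dead letter routing, the Python sketch below splits incoming records into a landed set and a dead letter set, keeping the raw record and the failure reason for each rejected one. The validation rules and field names are hypothetical examples of a data quality check, not Galaxy's.

```python
def validate(record):
    """Return None if the record is valid, else a short reason string.

    The rules here are hypothetical examples of a data quality check.
    """
    if not isinstance(record.get("order_id"), int):
        return "order_id missing or not an integer"
    amount = record.get("amount")
    if not isinstance(amount, (int, float)) or amount < 0:
        return "amount missing, non-numeric, or negative"
    return None


def route(records):
    """Split records into (landed, dead_letter).

    Dead letter entries keep the raw record and the reason it was rejected,
    so the failures can be inspected later like any other table.
    """
    landed, dead_letter = [], []
    for record in records:
        reason = validate(record)
        if reason is None:
            landed.append(record)
        else:
            dead_letter.append({"raw": record, "error": reason})
    return landed, dead_letter
```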

Placing both the landed data and any invalid data into Iceberg tables is a very important architectural choice of Starburst Galaxy. Ingesting data is often not enough, and further transformations are required for the data to become consumer-ready. It is equally important that the results of those transformations, including aggregates and joins performed in the Galaxy Icehouse implementation, as well as metadata information, are stored in Iceberg tables. With everything as an Iceberg table, troubleshooting data quality issues becomes substantially easier and more consistent, because every question asked is simply a SQL query against an Iceberg table! Furthermore, you can decide to keep or throw away as much of the transformation data as you want, since it is simply an Iceberg table in your lake.

Automated data preparation

Beyond ingestion, for data to be useful to an organization, it must meet correctness and performance requirements for live apps and dashboards as well as ad hoc querying. Using Starburst’s Icehouse implementation, data engineering teams can use SQL to further transform and prepare their data as part of the ingestion pipeline to make it ready for consumption. 

This is a very important concept built on the first principle that everything is a managed Iceberg table. The initial ingestion is just the first stop of potentially several downstream transformations needed to prepare the data for consumption. We meld data ingestion and preparation together as one in our Icehouse implementation. As data is ingested, this pipeline of transformations is evaluated in near real time on only the newly ingested data. The results of each transformation stage, defined by a SQL query, are stored in an Iceberg table. Each table in the transformation pipeline is queryable and can also be used to track lineage or debug data quality issues in the pipeline. As with ingestion, each stage in the pipelines you set up has a separate Iceberg table (a dead letter / poison message table) for the records that failed.

For example, you may want to create a downstream Iceberg table that filters on a store’s sales by geographical location and then create another downstream Iceberg table that pre-aggregates sales on yearly or monthly boundaries.
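The two-stage example above can be sketched as follows. SQLite (from the Python standard library) is used purely as a stand-in for Trino querying Iceberg tables, and the table names, columns, and the "EMEA" region value are hypothetical. The point of the sketch is that each stage processes only the newly ingested rows and stores its result in its own queryable table.

```python
import sqlite3

# SQLite is only a stand-in here for Trino querying Iceberg tables.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE sales (order_id INTEGER, region TEXT, sold_on TEXT, amount REAL);
    CREATE TABLE sales_emea (order_id INTEGER, sold_on TEXT, amount REAL);
    CREATE TABLE sales_emea_monthly (month TEXT PRIMARY KEY, total REAL);
""")


def ingest(rows):
    """Land new (order_id, region, sold_on, amount) rows, then run both
    downstream stages on only those rows."""
    conn.executemany("INSERT INTO sales VALUES (?, ?, ?, ?)", rows)

    # Stage 1: filter sales by geographical location.
    emea = [
        (order_id, sold_on, amount)
        for order_id, region, sold_on, amount in rows
        if region == "EMEA"
    ]
    conn.executemany("INSERT INTO sales_emea VALUES (?, ?, ?)", emea)

    # Stage 2: fold the new rows into the monthly pre-aggregate.
    for _, sold_on, amount in emea:
        month = sold_on[:7]  # 'YYYY-MM' prefix of an ISO date string
        updated = conn.execute(
            "UPDATE sales_emea_monthly SET total = total + ? WHERE month = ?",
            (amount, month),
        )
        if updated.rowcount == 0:
            conn.execute(
                "INSERT INTO sales_emea_monthly VALUES (?, ?)", (month, amount)
            )
    conn.commit()
```

Because every stage lands in a table, a question such as "which rows fed this monthly total?" is just another query against `sales_emea`.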

Schema changes

We’ve talked about handling errors in a separate Iceberg table, such as when the data doesn’t match the expected schema. As is often the case, schema changes can happen upstream, and it is important for the ingestion system to adapt to these changes. When schemas change, such as columns being added or removed, the ingestion system must adjust accordingly; otherwise, all new messages will fail to ingest and end up in the dead letter table. This would result in data downtime (delays in data being available for your production queries) while you determine the issue. Using Galaxy, you can easily identify and resolve schema changes, minimizing disruption and data downtime. 
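Identifying a schema change can be as simple as comparing the columns a table expects with the fields an incoming message actually carries. The helper below is a hypothetical Python sketch of that comparison for top-level JSON fields; it does not reflect how Galaxy detects or resolves schema changes.

```python
def diff_schema(expected_columns, message):
    """Compare expected column names with the top-level keys of one message.

    Returns (added, removed) as sorted lists of field names.
    """
    expected, seen = set(expected_columns), set(message)
    added = sorted(seen - expected)    # fields the table does not know yet
    removed = sorted(expected - seen)  # columns the producer stopped sending
    return added, removed
```

For instance, `diff_schema(["id", "amount"], {"id": 1, "currency": "EUR"})` reports `currency` as added and `amount` as removed, which is the kind of signal needed to alter the table before new messages pile up in the dead letter table.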

With the Hive-style tables of legacy data lakes, schema changes were difficult and risky. Iceberg’s design allows you to easily alter a table as you’re used to with a database or data warehouse. Galaxy takes care of handling the schema changes, but it’s the underlying power of the Iceberg + Trino design that makes this possible.

Automated data maintenance & optimization

For anyone building an Icehouse architecture, continuous data engineering efforts are required to maintain data quality, adhere to data retention policies, and uphold SLAs. These efforts include manually managing data files or creating and maintaining custom software that manages data lifecycles and optimizes data for query performance. Continuous data ingestion and preparation add to the burden, as they create significantly more small files to manage than a batch-oriented system that writes data periodically. Often, these operations are performed reactively when needed, or they require significant engineering investment to build something durable. This takes time, resources, and expertise from the data engineering team responsible for managing the lakehouse, which is typically already stretched.

With Galaxy’s Icehouse implementation, we automated data maintenance, relieving data teams of the burden of optimizing Iceberg tables and associated operations. This includes:

  • Compaction jobs for ingested tables are automatically run in the background, optimizing table storage and query performance without manual intervention.
  • Flexible data retention periods (e.g., 1 week, 1 month, 6 months, 1 year, or indefinitely), with automatic deletion of data older than the specified period. Retention policies can be set at the table or schema level. 
  • Error detection and notification mechanisms to alert users of any issues preventing data ingestion. This includes errors such as throttling, bad configurations, or parsing failures.

These operations are run in the background and happen automatically so you don’t have to worry about what maintenance tasks to run and when to run them.
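To give a feel for what such maintenance involves, here is a simplified Python sketch of two of the tasks: planning compaction by grouping small files into batches up to a target size, and finding partitions that have aged past a retention period. The greedy grouping strategy, the function names, and the inputs are assumptions made for illustration; they are not Galaxy's actual algorithms.

```python
from datetime import datetime, timedelta


def plan_compaction(file_sizes, target_bytes):
    """Greedily group small files into batches of at most `target_bytes`.

    `file_sizes` maps file name -> size in bytes. Files already at or above
    the target are left alone, and batches of a single file are dropped
    because rewriting one file on its own gains nothing.
    """
    batches, current, current_size = [], [], 0
    for name, size in sorted(file_sizes.items(), key=lambda item: item[1]):
        if size >= target_bytes:
            continue
        if current and current_size + size > target_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if current:
        batches.append(current)
    return [batch for batch in batches if len(batch) > 1]


def expired_partitions(partitions, now, retention):
    """Return names of partitions whose timestamp is older than `retention`.

    `partitions` maps partition name -> datetime of its newest data.
    """
    return [name for name, ts in partitions.items() if now - ts > retention]
```

A continuous ingestion pipeline would run planning like this repeatedly in the background, which is exactly the work the managed service takes off the data team.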

Querying data

We’ve described our most recent additions to Starburst’s Icehouse implementation, specifically how Starburst can help you easily ingest, prepare, and manage data stored in Iceberg tables. However, an Icehouse implementation is not complete without both Iceberg and Trino. Since Starburst is built on Trino, you can run SQL queries on the Iceberg tables using Starburst. To provide the best cost/performance, Starburst’s Icehouse implementation builds on the auto-tuning capabilities in Starburst Warp Speed to enable interactive querying without requiring costly expert tuning and code changes. 

In addition to querying, Starburst also provides data governance and manages Trino compute capacity to help operationalize an Icehouse. Gravity, Galaxy’s robust governance layer, helps complete an Icehouse architecture with built-in access controls, universal discovery, observability features, and intelligent classification methods.

Preview starting today

As of today, we’re launching our private preview program for our fully managed Icehouse implementation in Starburst Galaxy. If you’re interested in experiencing the power of Trino + Iceberg using Starburst’s Icehouse, you can inquire here and a member of our Product team will reach out to you.