Racing for commits on Delta Lake tables using Starburst

  • Marius Grama, Engineering Manager, Starburst

  • Evan Smith, Technical Content Manager, Starburst Data

Delta Lake is great for data analytics and data engineering. One of the things that make it so successful is its support for Snapshot Isolation. This architectural feature allows read operations to occur on different versions of a dataset. With this approach, users can perform queries on a given table at a specific moment in time using the table’s version history. For example, Snapshot Isolation is what allows Delta Lake to deliver Time Travel functionality. Companies deploying Delta Lake can leverage this architecture in many different use cases, leading to direct business value.

However, Snapshot Isolation comes with one major challenge: when executing write operations with transaction guarantees, it can be complex to ensure the integrity of the table across concurrent writes.

This article will explore the problem of ensuring that ACID guarantees are met even when performing writes on Delta Lake tables concurrently. It will also look at how this problem can be solved using Starburst and how such an approach can help users transparently reconcile concurrent write operations when using Delta Lake.

 

How Delta Lake architecture works

To help unpack this topic, let’s examine how the Delta Lake architecture operates. After that, let’s focus specifically on the question of managing concurrent write operations.

Let’s start at the beginning. 

Delta Lake is an open-source table format. It acts as a storage layer, typically operating on top of object storage used by most data lakes. Along with other open table formats like Apache Iceberg and Apache Hudi, Delta Lake makes ACID transactions easy when using inexpensive object storage.

To make this possible, Delta Lake’s architecture operates using two key mechanisms:

  • Data Storage: Delta Lake stores data in Parquet files, a columnar file format optimized for efficient data analysis.
  • Transaction Log: Delta Lake records every transaction committed to the Delta Lake table and stores it in a transaction log as metadata. 

Delta Lake table directory structure

To help understand how this data architecture works, have a look at the following images. The first image shows how the Delta Lake table directory structure works.

In the example below, a _delta_log directory shows the creation of multiple versions of a dataset’s history. Each of these versions tracks the metadata that describes the changes made to the table. Importantly, this metadata is held in a JSON format and named in ascending numerical order. As more data is added, deleted, or updated, the Delta log records these changes as new JSON files in the transaction log. To increase the read throughput of the transaction log, operations are also collapsed into checkpoint files. These checkpoint files are recorded at predefined intervals as Parquet files and play an important role in Delta Lake’s ability to operate responsively.
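
As a rough illustration, the layout of a Delta Lake table with a handful of commits might look something like the sketch below. The file names are representative rather than real, and the checkpoint shown assumes the common default interval of ten commits:

my_table/
    part-00000-aaaa.snappy.parquet              <- data files
    part-00001-bbbb.snappy.parquet
    _delta_log/
        00000000000000000000.json               <- one JSON file per committed transaction
        00000000000000000001.json
        ...
        00000000000000000010.json
        00000000000000000010.checkpoint.parquet <- periodic checkpoint of the log
        _last_checkpoint                         <- pointer to the latest checkpoint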

Diagram showing a visual representation of how a Delta Lake table directory structure works.

Delta Lake transaction log

The second image shows how the Delta Lake transaction log operates. The Transaction Log works by recording changes in the state of the target dataset. 

In the example below, the transaction log is built from JSON files. First, two records are added and written to Parquet files; this change of state is captured in the first JSON file. Next, an additional record is added, creating a second JSON file. Finally, the second record is deleted and a fourth record is added, creating a third JSON file containing the corresponding metadata. Note that the last transaction uses a method known as copy-on-write.
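
A heavily simplified sketch of what these log entries could contain is shown below. Real Delta Lake commit files carry additional actions and fields (such as commitInfo, protocol, and metaData), and the paths here are purely illustrative:

00000000000000000000.json:  {"add": {"path": "part-0001.parquet", "dataChange": true, ...}}
00000000000000000001.json:  {"add": {"path": "part-0002.parquet", "dataChange": true, ...}}
00000000000000000002.json:  {"remove": {"path": "part-0001.parquet", "dataChange": true, ...}}
                            {"add": {"path": "part-0003.parquet", "dataChange": true, ...}}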

Diagram showing how the Delta Lake transaction log operates, including an example of data being added, deleted, and updated.

What do data lakehouses let you do differently than Hive?

These two mechanisms allow Delta Lake to solve a very particular set of problems introduced by idiosyncrasies of the Hive table format. Other data lakehouse table formats take a similar approach.

Overall, a data lakehouse architecture is able to achieve a number of things that a data warehouse or data lake architecture cannot, including: 

  • ACID Transactions using cloud object storage.
  • Schema Evolution, allowing columns to be added, modified, or dropped without breaking existing applications.
  • Data Versioning, allowing users to track changes to a table over time with the possibility of reverting it to a previous version.
  • Data Skipping, allowing users to skip over irrelevant data when filtering or selecting data from the table.
  • Time Travel, allowing the ability to query historical versions of a table and write queries against those versions (see the SQL sketch after this list).
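
As a quick illustration of the last point, Time Travel can be expressed directly in SQL. The sketch below assumes a Trino-based engine whose Delta Lake connector supports the FOR VERSION AS OF and FOR TIMESTAMP AS OF clauses, and a hypothetical table named orders:

SELECT * FROM orders FOR VERSION AS OF 42;
SELECT * FROM orders FOR TIMESTAMP AS OF TIMESTAMP '2024-10-01 00:00:00 UTC';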

What happens if two users operate on the same dataset at the same time? 

You might wonder how all of this functionality flows from one metadata table. And beyond that, what happens when two users try to operate on the same dataset at the same time?

The answer is that the Delta log allows for parallel data processing in a very particular way. The integrity of the metadata describing the data files is what provides the ACID guarantees. Delta Lake achieves this by ensuring that write operations never modify existing files. Instead, updates write new files and record the change as new metadata, ensuring that one user cannot negatively impact another. For example, if a user updates a file, F1.parquet, Delta Lake records this update as two operations (illustrated in the SQL sketch after this list):

  • ADD F2.parquet
  • DELETE F1.parquet
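
As a sketch of what could trigger this, consider a hypothetical single-row update like the statement below. Rather than modifying F1.parquet in place, the engine rewrites the affected data into a new file (F2.parquet in this example), adds that file to the log, and marks the old file as removed:

UPDATE customer_data SET data = 'redacted' WHERE customer_id = 1234;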

This approach reduces the risk of corruption when multiple users write to the same table, greatly improving team collaboration and allowing teams who use Delta Lake to generate additional business value from the underlying data architecture.

How conditional writes allow for concurrent writes on Delta Lake

How do you make use of this on Delta Lake? The solution comes from combining conditional writes and concurrent writes. By reconciling concurrent writes, Delta Lake allows multiple users to operate on a dataset at the same time while ensuring ACID compliance.

Specifically, the two work in the following way:

  • Conditional writes: Restrict how data is written during an update or delete operation by only proceeding with a write if certain conditions have been met, for example, if the target file does not already exist. Conditional writes occur in Delta Lake when writing the transaction log files, essentially performing a COMMIT operation for the write.
  • Concurrent writes: Concurrent writes occur when the conditions of each write have been satisfied. In practice, this allows different users to write to different partitions without restriction. Because conditions are checked at commit time, and because the Delta log keeps a record of all changes, users are free to write to the dataset knowing that the single source of truth is maintained. This optimistic concurrency is underwritten by the Delta log itself, which resolves any discrepancies using the reconciliation logic of the operations already committed. In essence, concurrent writes use this mechanism to let more than one user commit write operations simultaneously without negatively impacting the integrity of the dataset. A rough sketch of this optimistic protocol follows this list.
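
Here is a rough sketch of that optimistic protocol in action, using illustrative file names and an "only write if the file does not already exist" condition at the storage layer:

Writer A:  read table at version 21, stage new data files
Writer B:  read table at version 21, stage new data files
Writer A:  conditional write of _delta_log/00000000000000000022.json  -> succeeds, commit becomes version 22
Writer B:  conditional write of _delta_log/00000000000000000022.json  -> fails, the file already exists
Writer B:  re-read version 22, verify there are no conflicting changes, then
Writer B:  conditional write of _delta_log/00000000000000000023.json  -> succeeds, commit becomes version 23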

Why do conditional and concurrent writes matter?

Together, both conditional writes and concurrent writes allow something very powerful. Specifically, they let teams work independently on a single dataset safe in the knowledge that it will not harm the integrity of the data inside it. This fundamental shift makes working on data that changes rapidly far easier, and in turn, ensures that the value of that data is not disrupted by version clashes or discrepancies. The transaction log itself acts as the arbiter of truth while allowing for a more agile, flexible arrangement than what would have been possible previously using Hive or similar technologies. 

AWS S3 adds conditional write support

This is the part of the story where AWS comes in. 

The Azure ADLS and Google GCS cloud object storage providers have long offered a conditional write capability. However, until recently, Amazon S3 object storage lacked this capability. This limitation affected the ability to ensure that Delta Lake tables are not corrupted in usage scenarios involving concurrent writes from different query engines.

Let’s look at an example of the problem. 

Concurrent writes transaction log collision 

The image below shows what happens without conditional write support. Two operations try to add data concurrently to a table, resulting in a transaction log collision with the transaction log file 00000000000000000022.json. Without AWS S3 conditional write support, problems like this could potentially occur, corrupting the integrity of the table. This scenario risks losing track of the changes made by the first operation committed in the transaction log file 00000000000000000022.json.

This is the main problem solved by AWS supporting conditional writes. 

Diagram showing the problem that can occur when writing to delta lake simultaneously without concurrent write support, leading to a collision of data.

Starburst and conditional writes functionality on Amazon S3

How is Starburst helping to solve this problem? We’re happy to announce that both Starburst Galaxy and Starburst Enterprise (release 458-e) now support this new feature. This means that previous limitations no longer apply on Amazon S3 when performing concurrent writes from query engine clusters using the Amazon S3 API. This follows the official announcement from AWS several weeks ago. 

Are you interested in seeing how we did it? Starburst believes in transparency, and we’re immensely proud of the work that our Engineering team puts into new features like this one. For more information on the efforts to create this functionality, please see the following GitHub repo.

Concurrent writes reconciliation on disjoint partitions 

As you’ve seen, the Delta Lake table format provides ACID transaction guarantees between read and write operations made on a Delta Lake table. Importantly, this functionality has many implications. For example, one common scenario requested by Starburst customers who rely on Delta Lake workloads is the ability to perform concurrent write operations on disjoint partitions of a table. 

Supporting data partitions

For example, in a table partitioned by date, concurrent write operations might be used to insert into today’s partition while redacting all deactivated user information from partitions before today’s date to ensure GDPR compliance.

Failed operations can be very problematic when dealing with concurrent writes on a Delta Lake table. A write can fail simply because another write operation touching an unrelated partition of the table manages to commit first.

To fix this, we’ve added reconciliation logic for concurrent write operations in the Delta Lake connector. When an operation stumbles into a conditional write failure because the transaction log file it tried to create already exists, the connector transparently retries the commit with an incremented transaction log identifier (transaction log file names are monotonically ascending).

Let’s look at an example of how this problem is solved. 

Concurrent writes transaction log reconciliation

Let’s use the same example involving concurrent writes that we saw before, but this time, let’s add concurrent write support. Here, both operations try to add data concurrently to the table, resulting in the same collision when trying to write the transaction log file 00000000000000000022.json. 

However, with concurrent write support providing reconciliation, the failed commit does not result in a corrupted table. Instead, the operation retries the commit using the next transaction log file. In this example, the retry succeeds and the table remains uncorrupted.

The image below shows how this works in practice. 

Image showing how concurrent writes in Delta Lake can be reconciled using concurrent write support with Starburst.

How to invoke concurrent writes using SQL

This functionality is currently available for all DML statements, including the INSERT, UPDATE, MERGE, DELETE, and TRUNCATE operations.

For example, the SQL below uses this functionality. Specifically, it inserts new customer data into a table partitioned by day and deletes old data when certain logical conditions are true. Importantly, the code relies on concurrent write functionality, making conditional writes that are then reconciled. These queries would be invoked in parallel. Before this feature, this could have created a conflict. However, with the new write concurrency support, the integrity of the table is maintained under all conditions.

INSERT INTO customer_data(day, customer_id, data) VALUES (CURRENT_DATE, 4001, 'PII data');
DELETE FROM customer_data WHERE day < CURRENT_DATE AND customer_id = 1234;

Starburst and Delta Lake

At Starburst, we’ve built a state-of-the-art offering capable of handling critical Delta Lake business workloads for some of the largest organizations in the world.

Built on Trino, Starburst represents one of the very best ways to optimize your compute when dealing with Delta Lake. With these new changes to the concurrent write feature, we continue the trend started with our Delta Lake connector. Whether using SQL for ETL and data pipelines, BI tools, ad hoc queries, or machine learning (ML) and artificial intelligence (AI), Starburst helps you get the most out of Delta Lake and provides optionality.

Want to know more about your options with Starburst? Download a free Starburst Galaxy trial today. 
