With the advent of modern table formats such as Apache Iceberg that provide data warehousing capabilities on object storage, there has been a huge groundswell around the concept of a modern data lake. However, the process of actually standing up a data lake can be quite daunting if you don’t know where to start.
In this blog post, we will walk through the steps needed to migrate a table that currently resides in Postgres into a modern data lake in AWS S3. Along the way, we will use Starburst Galaxy as our data lake analytics platform and dbt Cloud as our data transformation tool. This involves connecting to the underlying Postgres database and transforming the data into a manageable format that follows SCD Type 2, so that changes made by DML statements (inserts and updates) are captured as history.
Prerequisites
Before getting started, ensure you have the following:
- A GitHub account
- A Starburst Galaxy domain
- A running PostgreSQL instance (any RDBMS instance will work)
- Access to a dbt Cloud account
- Authentication credentials to Postgres and AWS S3
Setup Process
1. Configure two new catalogs within Starburst Galaxy
   a. PostgreSQL catalog pointing to your existing database
   b. AWS S3 catalog for your data lake table
2. Locate your connection variables in Starburst Galaxy’s Partner Connect pane
3. Navigate to dbt Cloud and configure a new project
   a. From account settings (using the gear menu in the top right corner), click “+ New Project”.
   b. Enter a project name and choose Starburst as your connection.
   c. Enter the settings for your new project (Host and Port) obtained in step 2.
   d. Enter the development credentials for your new project.
      - User and password (step 2)
      - Catalog: Name of the AWS S3 catalog you created in step 1
      - Schema: Name of the schema that you would like your data lake to reside in (e.g. “burst_bank”)
4. Configure integration between dbt Cloud and GitHub
   a. Install dbt Cloud in your GitHub account
   b. Clone the following repo within dbt Cloud using HTTPS: https://github.com/YCat33/rds_to_s3_dbt.git
Leveraging dbt Cloud for Data Transformation
Now that the setup process is out of the way, we can dig into what this repository contains. We will start with the “postgres_datalake_sources.yaml” file, which describes the original Postgres table.
Within this file, we specify exactly where our Postgres table resides, along with a few descriptive properties.
Be sure to update the following fields:
- database: Name of the Postgres catalog created in step 1(a) of the setup process
- schema: Schema that our source table resides in within Postgres
- name: Table name in Postgres
- description: Brief description of what information is contained within the table
As a point of reference, here is the DDL statement that was used to create the customer table in Postgres (I will be using this table as an example):
Now that all of that information is in place, let’s turn our attention to the postgres_burst_bank_customer_snapshot file, which creates a historical SCD Type 2 table that records changes made to the underlying Postgres table.
There is a lot going on in this file, so let’s break it down piece by piece. To start, I am denoting this SQL file as a snapshot, which tells dbt to track how the underlying data changes over time and is ideal for implementing Slowly Changing Dimensions (SCD).
On the first run, dbt will create the initial snapshot table — this will be the result set of your select statement, with additional columns including dbt_valid_from and dbt_valid_to. All records will have a dbt_valid_to = null.
On subsequent runs, dbt will check which records have changed or if any new records have been created. The dbt_valid_to column will be updated for any existing records that have changed. The updated record and any new records will be inserted into the snapshot table. These records will now have dbt_valid_to = null.
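To make the two runs described above concrete, here is a minimal Python sketch of the same bookkeeping. It is only an illustration of the SCD Type 2 idea, not dbt's actual implementation: the `snapshot` function, the run timestamps, and the two-column rows are hypothetical, and it ignores details such as hard deletes and the dbt_updated_at column.

```python
from datetime import datetime

def snapshot(history, source_rows, key, now):
    """Apply one snapshot run to `history` (a list of dicts) and return it."""
    # Index the currently valid version of each record (dbt_valid_to is None).
    current = {row[key]: row for row in history if row["dbt_valid_to"] is None}
    for src in source_rows:
        old = current.get(src[key])
        if old is not None:
            # Compare only the source columns, ignoring the snapshot metadata.
            if all(old[col] == value for col, value in src.items()):
                continue  # unchanged record: nothing to do
            # Close out the previous version of a changed record.
            old["dbt_valid_to"] = now
        # Insert the new or updated record as the current version.
        history.append({**src, "dbt_valid_from": now, "dbt_valid_to": None})
    return history

# Hypothetical run times; the key and city values mirror the example used later on.
first_run = datetime(2023, 1, 1)
second_run = datetime(2023, 1, 2)

history = []
snapshot(history, [{"custkey": "99999999", "city": "Candyland"}], "custkey", first_run)
snapshot(history, [{"custkey": "99999999", "city": "Starburst"}], "custkey", second_run)

for row in history:
    print(row["city"], row["dbt_valid_from"], row["dbt_valid_to"])
```

After the second run, the “Candyland” row has its dbt_valid_to set to the second run's timestamp, while the “Starburst” row is the only one with dbt_valid_to = null.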
Note: Before moving on to the next section, I want to call out the timestamp_fix macro, which automatically casts every current_timestamp value to timestamp(6), the precision required for Iceberg tables. This matters because dbt's snapshot mechanism adds new timestamp columns to the postgres_burst_bank_customer_snapshot table generated by this file.
From there, we simply select all columns from the underlying Postgres table, with one slight re-ordering: registration_date moves from the last column to the fourth.
Now that our snapshot table exists in S3, we can move on to the dl_customer_consume model, which populates the table that our end users will query. As with the snapshot, we specify a “unique_key” of our choosing, here paired with an incremental materialization, to update the table.
In the above example, I am leveraging the “custkey” as the unique_key and specifying the table type as “Iceberg” and file format as “Parquet”. Next, I am choosing to implement partitioning on the “registration_date” column on a monthly cadence and sort the data on the “custkey” field.
Note that I can partition on the registration_date column even though it is not the last column in the table (which Hive would require). Additionally, I can use “hidden partitioning” on the registration month, which Iceberg derives from registration_date, rather than partitioning on each distinct registration_date value. Starburst Galaxy can then use these partition and sort values to skip irrelevant data and reduce query times.
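As a rough illustration of what the month transform does, the Python sketch below maps a date to the number of months since January 1970, which is how the Iceberg specification defines the month partition value. The `month_partition` helper and the sample dates are hypothetical and are not taken from the repository.

```python
from datetime import date

def month_partition(d: date) -> int:
    """Months elapsed since 1970-01, the value Iceberg's month transform stores."""
    return (d.year - 1970) * 12 + (d.month - 1)

# Hypothetical registration dates: two in January 2001, one in February 2001.
registration_dates = [date(2001, 1, 1), date(2001, 1, 31), date(2001, 2, 1)]

# Group the dates by their derived partition value.
partitions = {}
for d in registration_dates:
    partitions.setdefault(month_partition(d), []).append(d)

for value, dates in sorted(partitions.items()):
    print(value, [str(d) for d in dates])
```

Both January dates land in the same partition, so a query that filters on a range of registration_date values only has to read the months that overlap that range, and no separate month column has to be maintained by hand.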
Now that everything is in place, we can go ahead and execute the following commands within dbt:
dbt snapshot
Triggers the postgres_burst_bank_customer_snapshot.sql file to run.
dbt run --select dl_customer_consume
Triggers the dl_customer_consume.sql file to run.
Using Starburst Galaxy to understand the table workflow
Now that the snapshot and the model have run, let’s go into Starburst Galaxy and look at the work done within the engine. You can use universal search in the top navigation bar to find the table and its metadata.
Once on the table page within the catalog, we can locate the “Lineage” tab to understand how this table was generated and where the data is coming from.
Here we are able to clearly see that the dl_customer_consume table is downstream from both the customer table in Postgres and the postgres_burst_bank_customer_snapshot table in S3 with a direct relationship between each table.
Inserting into the table
Let’s execute the following statement within Postgres and then re-trigger the workflow in dbt Cloud:
insert into onsite_galaxy_postgres.burst_bank.customer
values ('99999999', 'Commander', 'Bun Bun', '123 Street', 'Candyland', 'MA', '02108', 'US', '999-999-9999', '01-01-2001', 'M', 'N', '123-45-6789',
'N', 51299.55, 850, '1966-09-08'
);
dbt snapshot
dbt run --select dl_customer_consume
Looking at the lineage of the iceberg_customer_table table, we can now see a “tmp” table that dbt Cloud created because the model runs as an incremental update.
Updating records in the table
update burst_bank.customer set city = 'Starburst' where custkey = '99999999';
dbt snapshot
dbt run --select dl_customer_consume
Now that we’ve rerun the pipeline, let’s take a look at the output of the following query:
select custkey, city, dbt_updated_at, dbt_valid_from, dbt_valid_to
from demo_aws_s3.dbt_snapshots.postgres_burst_bank_customer_snapshot
where custkey = '99999999'
order by custkey;
Since this is a snapshot table, we can view the history of this record. The earlier row, where the city value was “Candyland”, now has its “dbt_valid_to” column populated (since it is no longer valid), and the “current” record has a city value of “Starburst”.
This is consistent with what we see when querying the dl_customer_consume table, because the dl_customer_consume model within dbt filters on the dbt_valid_from column.
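One way to picture how a filter on dbt_valid_from combines with the unique_key is the Python sketch below. It rests on an assumption: that the model only picks up snapshot rows newer than what it has already loaded and upserts them by custkey. The actual model in the repository may differ, and the `incremental_merge` function and timestamps here are hypothetical.

```python
from datetime import datetime

def incremental_merge(target, snapshot_rows, unique_key):
    """Upsert snapshot rows that are newer than anything already in `target`."""
    # High-water mark: the latest dbt_valid_from already loaded, if any.
    loaded = [row["dbt_valid_from"] for row in target.values()]
    high_water = max(loaded) if loaded else None
    # Process oldest first so the most recent version of a key wins.
    for row in sorted(snapshot_rows, key=lambda r: r["dbt_valid_from"]):
        if high_water is None or row["dbt_valid_from"] > high_water:
            # Merge semantics: a matching key is overwritten, a new key is inserted.
            target[row[unique_key]] = row
    return target

candyland = {"custkey": "99999999", "city": "Candyland",
             "dbt_valid_from": datetime(2023, 1, 1)}
starburst = {"custkey": "99999999", "city": "Starburst",
             "dbt_valid_from": datetime(2023, 1, 2)}

target = {}
incremental_merge(target, [candyland], "custkey")             # first run
incremental_merge(target, [candyland, starburst], "custkey")  # run after the update

print(target["99999999"]["city"])
```

Under that assumption, the consumer-facing table keeps a single row per custkey holding the latest values, while the full history stays in the snapshot table.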
Summary
With a little bit of work and the help of Starburst Galaxy and dbt Cloud, we have now been able to successfully implement a modern data lake with enhanced CDC (change data capture).
This affords us the ability to migrate our workloads from more traditional databases to object storage without needing to sacrifice ACID compliance or time-travel capabilities. In addition, we have the ability to conduct schema evolution, partition evolution and even dynamic partition functionality that in the old days of Hive would have been impossible. So try it out for yourself today to see how you can get the best of both worlds with a modern data lake using Starburst Galaxy and dbt Cloud.