Building Reporting Structures on S3 using Starburst Galaxy and Apache Iceberg

  • Tom Nats

    Director of Customer Solutions

    Starburst

AWS S3 has become one of the most widely used storage platforms in the world. Companies store a variety of data on S3, from application data to event-based and IoT data. Oftentimes, this data is used for analytics, both for regular BI reporting and for ad hoc reporting.

The relational database has been around for 30+ years, and SQL is the de facto industry standard for interacting with and asking questions of big data. Over the last few years, copying data into a cloud data warehouse and building reporting structures there has been the default behavior because, honestly, there weren’t many other choices.

Leaving this data on S3 without copying or moving it is the preferred method for any company hoping to eliminate an expensive and time-consuming data migration. In addition, the data lives in the customer’s account and isn’t tied to any technology or vendor, which allows different processing engines, such as Starburst, to query it. This provides the ultimate optionality and capitalizes on the separation of storage and compute.

What do you do after you move data to AWS S3?

In this blog post, we’ll cover how, once data has landed on S3, reporting structures can be built directly on S3 and queried with any SQL BI tool. Additionally, with our new Great Lakes connector, Starburst Galaxy handles the complexity of the file or open table format of your choice, whether it be Apache Iceberg tables, Delta Lake tables, or regular ORC or Parquet files.

In the diagram above, we have a very simple example of sales and customer data landing on S3 in raw JSON format from different source systems (it could be csv, txt, Parquet, ORC, etc. as well). From there, on a scheduled basis, that data is inserted into a partitioned table in what we’ll call our “structured” layer. Next, reporting tables are created and updated on a regular basis to power SQL BI reporting and ad hoc querying. Additionally, the base tables can be queried directly if needed.

Landing Raw Data Files

Data from one or more source systems is landed in S3 using a tool of your choice. This can be done via scripting or with an ingestion tool, of which there are many to choose from. Additionally, if the source system has a Starburst Galaxy connector available, data can be ingested using simple SQL as well.

Sales Data

For our example, let’s say sales data is being landed every hour into an S3 bucket. Since there is a lot of sales data coming in, the hourly files are dropped into one folder per day: s3://myzonebucket/salesdataraw/<YYYY-MM-DD>

Each file is JSON with a unique name and timestamp. Example: salesdataraw_2022-04-02-105717.json

The JSON data looks like this:

{ 
  "orderid": "29282", 
  "custkey": "372", 
  "product": "shoes", 
  "quantity": "7", 
  "total": "87", 
  "orderdate": "04/03/2022" 
}

To create a table that can read this data, we use the Great Lakes connector in Starburst Galaxy, which can handle json, txt, csv, Parquet, ORC, Apache Iceberg and Delta Lake formats. Since we’re on AWS, I’ve chosen to use AWS Glue, a fully managed ETL (Extract, Transform, Load) service provided by AWS, as the metastore.

create table s3lakehouse.blog.sales_land (
   orderid bigint,
   custkey bigint,
   product varchar,
   quantity integer,
   total integer,
   orderdate date
) with
(type='hive', format='json', partitioned_by=array['orderdate'],
external_location='s3://lakehouse-sb/blog/sales_land');

Next, we need to let Glue know about the partitions in this location. (This needs to be done each day when a new folder is created, and it applies only to partitioned tables of type Hive.)

call s3lakehouse.system.sync_partition_metadata('blog', 'sales_land', 'FULL');

Note: If you use the AWS Glue crawler, this command is not needed, as the crawler takes care of this process.

Now we can query this data using standard SQL:

select * from s3lakehouse.blog.sales_land;
 orderid | custkey | product | quantity | total | orderdate
---------+---------+---------+----------+-------+------------
   29282 |     372 | shoes   |        7 |    65 | 2022-02-04
   29283 |     371 | wrench  |        3 |    24 | 2022-02-05

As new data arrives, it will automatically be picked up in subsequent queries.

Note: As new data arrives each day and new directories are added, the sync_partition_metadata command needs to be executed on a daily basis. It is quicker to use the system.register_partition procedure instead, since it registers a single partition rather than scanning the whole table location. See the documentation for examples.
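
As a rough sketch of that alternative, the call below registers a single day's folder as a partition. It assumes the catalog is named s3lakehouse, that the register_partition procedure is enabled for that catalog, and that the folder path follows the landing pattern shown earlier; the date and path are illustrative values only.

```sql
-- Register one new daily folder as a partition (sketch; values are illustrative).
call s3lakehouse.system.register_partition(
   schema_name => 'blog',
   table_name => 'sales_land',
   partition_columns => array['orderdate'],
   partition_values => array['2022-04-02'],
   location => 's3://myzonebucket/salesdataraw/2022-04-02');

-- List the partitions the metastore now knows about.
select * from s3lakehouse.blog."sales_land$partitions";
```

Because the location is passed explicitly, this approach should also work when folder names are plain dates rather than Hive-style orderdate=<value> paths.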

Customer Data

For customer data, new and modified records are placed in a single folder with unique file names and a column named “last_update_dt” with the date the record was created or modified in the source system:

Example: customerraw_2022-04-02-105717.json

{ 
  "custkey": "372", 
  "name": "Bob Jones", 
  "state": "CO", 
  "zip": "98373", 
  "cust_since": "2009/04/23", 
  "last_update_dt": "2022/04/05"
}

To create a table based on this data, we use SQL similar to the above:

create table s3lakehouse.blog.customer_land (
  custkey bigint,
  name varchar,
  state varchar,
  zip integer,
  cust_since date,
  last_update_dt date
) with
(type='hive', format='json'
);

Just like before, we can now execute SQL against this table:

select * from s3lakehouse.blog.customer_land; 
 custkey |   name    | state |  zip  | cust_since | last_update_dt 
---------+-----------+-------+-------+------------+---------------- 
   29283 | Elon Musk | TX    | 82872 | 2012-07-14 | 2022-04-05 
   29282 | Bob Jones | CO    | 98373 | 2009-04-23 | 2022-04-05

Now we have both of our landing tables built out and we can query the data in these files.

Structured Tables

Since we have incoming data being landed into an S3 bucket, we want to insert and update that data into our “base” area. This can be used for direct querying from our end users as well as a source of data to build reporting structures.

Sales data – this data arrives in JSON files, which is not a good format for performant queries (even though the table is partitioned). An open, columnar file format such as Parquet or ORC provides much faster query performance for analytical workloads. In addition, a table format such as Delta Lake or Apache Iceberg brings further performance gains, especially on larger partitioned tables.

Steps to populate the sales base table:

  1. We first create a table that will hold the sales data using an Iceberg table that will be partitioned by orderdate:
create table s3lakehouse.blog.sales_base( 
 orderid bigint, 
 custkey bigint, 
 product varchar, 
 quantity integer, 
 price double, 
 orderdate date, 
 create_dt date, 
 last_update_dt date 
) with 
(type='iceberg', partitioning = array['orderdate']);
  2. Next, to load data into our sales_base table, we issue a simple insert statement:
-- daily incremental load
-- assumes the landing table's total column is what feeds price in the base table
insert into s3lakehouse.blog.sales_base
with sl as
(select
   orderid,
   custkey,
   product,
   quantity,
   cast(total as double) as price,
   orderdate,
   current_date as create_dt,
   current_date as last_update_dt
from
   s3lakehouse.blog.sales_land where orderdate = cast('2022-09-24' as date))
select * from sl where orderid not in (select orderid from
s3lakehouse.blog.sales_base where orderdate = cast('2022-09-24' as date));

We add a date filter so that we select only one day of data from the landing table, which reduces the amount of data we need to scan on S3. Each time we run this query, we adjust the date so that we scan only one day of data in both tables, which makes the insert very fast.
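
The same load can be written so the date does not have to be edited by hand. The sketch below assumes the job runs once a day and loads yesterday's partition, and that the landing table's total column is the value that belongs in price. It also swaps NOT IN for NOT EXISTS, which behaves more predictably if orderid is ever null.

```sql
-- daily incremental load for yesterday's partition (sketch)
insert into s3lakehouse.blog.sales_base
select
   sl.orderid,
   sl.custkey,
   sl.product,
   sl.quantity,
   cast(sl.total as double),
   sl.orderdate,
   current_date,
   current_date
from s3lakehouse.blog.sales_land sl
where sl.orderdate = current_date - interval '1' day
  and not exists (
     select 1
     from s3lakehouse.blog.sales_base sb
     where sb.orderdate = sl.orderdate
       and sb.orderid = sl.orderid);
```

Since the filter still names a single orderdate on both tables, only one day of data should be scanned on each side, as with the hard-coded version.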

Customer data

Next, we load any new or modified customer data into the customer_base table.

First, we’ll create the table and make it an Iceberg one so we can benefit from updates and performance:

create table s3lakehouse.blog.customer_base( 
  custkey bigint, 
  name varchar, 
  state varchar, 
  zip integer, 
  cust_since date, 
  last_update_dt date 
) with 
(type='iceberg');

Next, we will insert or update any customer data that has arrived in the landing table.

-- Update or insert using merge
MERGE INTO s3lakehouse.blog.customer_base AS b
USING s3lakehouse.blog.customer_land AS l
ON (b.custkey = l.custkey)
-- only rows whose name changed are updated; widen this condition to catch other changes
WHEN MATCHED AND b.name != l.name
THEN UPDATE
SET name = l.name,
    state = l.state,
    zip = l.zip,
    cust_since = l.cust_since,
    last_update_dt = l.last_update_dt
WHEN NOT MATCHED
THEN INSERT (custkey, name, state, zip, cust_since, last_update_dt)
     VALUES (l.custkey, l.name, l.state, l.zip, l.cust_since, l.last_update_dt);

This is known as a “Type 1” update, meaning we just insert any new rows and update existing ones. We could also do a Slowly Changing Dimension Type 2 (SCD2) update, which means we keep history in the table for every modified record. This is a very common technique brought over from the data warehouse world. There will be a separate blog post on this coming soon.
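
One practical wrinkle with MERGE: if the landing table holds more than one record for the same custkey, the statement can fail, because a target row may match only one source row. A minimal sketch of one way around this, assuming the most recent last_update_dt should win, is to reduce the landing data to one row per customer first:

```sql
-- Type 1 merge using only the latest landed record per customer (sketch)
merge into s3lakehouse.blog.customer_base as b
using (
   select custkey, name, state, zip, cust_since, last_update_dt
   from (
      select
         cl.*,
         row_number() over (
            partition by cl.custkey
            order by cl.last_update_dt desc) as rn
      from s3lakehouse.blog.customer_land cl) ranked
   where rn = 1
) as l
on (b.custkey = l.custkey)
when matched and l.last_update_dt > b.last_update_dt
   then update set
      name = l.name,
      state = l.state,
      zip = l.zip,
      cust_since = l.cust_since,
      last_update_dt = l.last_update_dt
when not matched
   then insert (custkey, name, state, zip, cust_since, last_update_dt)
        values (l.custkey, l.name, l.state, l.zip, l.cust_since, l.last_update_dt);
```

If two landed records for a customer share the same last_update_dt, row_number picks one of them arbitrarily, so a finer-grained timestamp would be a better tiebreaker where one is available.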

From here, we have built out our landing and base tables. The structured tables can be queried directly by anyone if needed. Since they use the Iceberg table format, they will be more performant for selective queries.

Reporting Tables

The structured tables will be used as the source for our reporting tables which is a pretty common methodology. We’ll continue to use the Iceberg format for the increased performance over regular Parquet or ORC file formats.

In this example, we’ll build three very simple reporting tables which are:

  1. Sales by Week
  2. Sales by Month
  3. Sales by Year

Create sales by week rollup table:

create table s3lakehouse.blog.weekly_sales_rollup with (type='iceberg') as
select sum(quantity+price) total,date_format(orderdate, '%v') week from s3lakehouse.blog.sales_base
group by date_format(orderdate, '%v');

Create sales by month rollup table:

create table s3lakehouse.blog.monthly_sales_rollup with (type='iceberg') as
select sum(quantity+price) total,date_format(orderdate, '%m') month from s3lakehouse.blog.sales_base
group by date_format(orderdate, '%m');

Create sales by year rollup table:

create table s3lakehouse.blog.yearly_sales_rollup with (type='iceberg') as
select sum(quantity+price) total,date_format(orderdate, '%Y') year from s3lakehouse.blog.sales_base
group by date_format(orderdate, '%Y');
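
One caveat with grouping on the week or month number alone is that the same number from different years lands in the same bucket once the base table spans more than one year. A possible variation, sketched here, groups by the first day of each week using date_trunc. The table name weekly_sales_by_start is hypothetical, and summing price assumes that column holds the order amount.

```sql
-- Weekly rollup keyed by week start date, so weeks from different years stay separate (sketch)
create table s3lakehouse.blog.weekly_sales_by_start with (type='iceberg') as
select
   date_trunc('week', orderdate) as week_start,
   sum(price) as total
from s3lakehouse.blog.sales_base
group by date_trunc('week', orderdate);
```

The same idea applies to the monthly rollup by using date_trunc('month', orderdate).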

Now we can query these tables using any BI or SQL tool:

select * from s3lakehouse.blog.yearly_sales_rollup order by 2;

These rollup or reporting tables can be added to or replaced each time new data arrives.
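
As one way to handle the "replaced" case, the sketch below rebuilds a single year in the yearly rollup by deleting that year's row and recomputing it from the base table. The year 2022 is only an example value, and the measure mirrors the rollup definition above.

```sql
-- Recompute one year of the yearly rollup (sketch; 2022 is an example value)
delete from s3lakehouse.blog.yearly_sales_rollup
where "year" = '2022';

insert into s3lakehouse.blog.yearly_sales_rollup
select
   sum(quantity + price) as total,
   date_format(orderdate, '%Y') as "year"
from s3lakehouse.blog.sales_base
where orderdate >= date '2022-01-01'
  and orderdate < date '2023-01-01'
group by date_format(orderdate, '%Y');
```

The two statements run as separate operations, so a reader querying between them would briefly see the year missing; scheduling the refresh outside reporting hours is one simple way to limit that.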

As you can see, using open-source Trino and Iceberg together provides an easy way to ingest data and provide analytics right on top of any object storage, such as S3, Azure ADLS and GCP’s cloud storage. With more and more Iceberg features being added to Trino, Iceberg is quickly becoming the table format of choice.

Note, all of the commands above were run on Starburst Galaxy which is a managed platform for Trino. Within a few clicks, you can have a cluster up and running to build your own data lakehouse on S3/ADLS/GCS. Additionally, all of these commands were run on our “free” cluster which is great for learning and experimenting with Trino and Iceberg.

See you in the Starburst Galaxy!