The file explosion problem in Apache Iceberg and what to do when it happens to you

Daniel Abadi, Computer Science Professor, University of Maryland, College Park


Apache Iceberg is one of the leading tools that can be used to manage metadata associated with raw data stored in Parquet, ORC, or Avro open file formats. This metadata is important in helping users read, understand, write, and update a dataset, and includes information about: 

  • Table schemas (e.g., column attributes, their data types, and integrity constraints)
  • How data changes over time
  • How different pieces of information relate to each other
  • High-level statistics about datasets

Iceberg does more than simply catalog all the metadata in a table. It also provides an interface for multiple data access tools to read, write, and update data concurrently. These data access tools, including Trino, Starburst, and Spark, take advantage of this interface to read and write data while preventing data corruption and other correctness anomalies that can arise from insufficient isolation when multiple applications concurrently try to update the same dataset. Iceberg also tracks the entire history of updates to a dataset as it is modified over time, which allows data access tools to “time travel” at will, asking for data as of any desired snapshot in time, and supports analysis of differences across multiple versions. It also supports schema evolution, so that different snapshots may organize data using entirely different structures, as well as CDC (change data capture) style usage, in which processes run on newly added data as soon as it has been inserted into the dataset.
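For example, in Trino these snapshots can be queried directly using time-travel syntax. The sketch below assumes a table named foo managed by the Iceberg connector; the timestamp and snapshot id are placeholders:

-- Read the table as it existed at a point in time (timestamp is illustrative)
SELECT * FROM foo FOR TIMESTAMP AS OF TIMESTAMP '2025-01-01 00:00:00 UTC'

-- Or read a specific snapshot by its id (the id below is a placeholder)
SELECT * FROM foo FOR VERSION AS OF 8954597067493422955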

 

The potential cost of the Iceberg architectural approach

In short, Iceberg supports many critical high-level features that facilitate the management of complex datasets stored in open data formats that may change over time.

However, there is, of course, a cost to all of these features. In order for Iceberg to provide capabilities such as time travel and cross-version analysis, old versions of data must be kept around. This may result in the following side effects:

  • Increased data storage requirements
  • Proliferation of small files (see below)
  • Increased memory requirements (so that frequently accessed metadata can be stored in memory)
  • Older versions may slow down searches through metadata at runtime

Why small files might proliferate when using Apache Iceberg

In order to provide capabilities such as deleting and updating data, Iceberg does not overwrite the data in raw files. In fact, overwriting is generally impossible or very slow in the open file formats Iceberg is designed for. Instead, it stores delta files that are designed to be read alongside the raw data and merged on the fly when the raw data is read. For example, a “position deletes” delta file may indicate that the first, third, and tenth rows of a raw data file have been deleted. When a query is received to read that raw data file, the delta file is also read and merged with the raw data file on the fly. Using this approach, the first, third, and tenth rows are removed during the read operation. 
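As a concrete illustration, a row-level delete issued through an engine such as Trino does not rewrite the underlying Parquet file. Assuming a v2 Iceberg table named foo that uses merge-on-read deletes (the table and predicate below are hypothetical), a statement like the following typically results in a new delete file being written rather than the data file being rewritten:

-- No Parquet data file is overwritten; the deleted row positions are
-- recorded in a separate delete file that readers merge on the fly.
DELETE FROM foo WHERE id = 42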

In general, every time a process takes an action to modify a dataset managed by Iceberg, new metadata files are produced. For example, new files are created when inserting new data, removing data, or updating data. These files are necessary for several reasons, including:

  • To record how raw data files have been updated
  • To ensure atomicity of these updates so that changes happen together as a transactional unit
  • To provide snapshots as of arbitrary points in history, each of which includes the set of all files that existed at that point in time

In short, the more times a table is modified, the more files will be generated.
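One way to observe this growth is to inspect the table's snapshot history. In Trino, for instance, each committed modification to a hypothetical table foo shows up as a row in the $snapshots metadata table, and each of those snapshots is backed by additional metadata and manifest files:

-- Each insert, delete, or update commit appears as its own snapshot
SELECT committed_at, operation, snapshot_id
FROM "foo$snapshots"
ORDER BY committed_at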

To limit the number of small files kept around, Iceberg provides parameters (such as write.metadata.delete-after-commit.enabled=true) that automatically remove old metadata files, keeping only the number of previous versions specified by the parameter write.metadata.previous-versions-max. However, this is not enabled by default, since removing old metadata files limits the utility of many of the great features that Iceberg is known for (discussed above).

 

Examples of Iceberg usage that causes major small file proliferation

In practice, many Iceberg deployments see the number of metadata files grow rapidly over time. For example, “trickle load” applications that upload new data to a dataset managed by Iceberg in small batches at short intervals will cause a new metadata file to be generated per batch. The more frequently the batches arrive, the more metadata files are generated. Very quickly, the number of files can grow to be extremely large.
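For instance, a trickle-load job that commits a small batch every minute might look like the hypothetical statement below (both table names are made up); each execution is a separate commit that produces a new snapshot with its own metadata and manifest files, no matter how small the batch is:

-- Runs once per micro-batch; every execution is a separate commit,
-- and therefore generates new metadata files.
INSERT INTO events
SELECT * FROM staging_events_latest_batch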

Derived datasets are another very common source of small file proliferation. For example, Trino, Starburst, or other data processing engines may be run on a raw dataset and produce a new dataset. In many cases, the client chooses to write this new dataset to storage in order to keep it for future processing tasks. Users who create these derived datasets are typically not thinking about the potential performance side effects of writing many small files.

 

What’s wrong with having lots of small files?

There are two major problems with allowing the number of files to grow very large. First, most distributed file systems, such as HDFS, have a single primary node that stores “inode information” (file name, location, and permissions) for every file in the file system in memory. Such systems typically start to run out of memory when the file count gets into the billions, and require federation or other ugly fixes at this scale. Although only the largest-scale Iceberg deployments will run into this scalability problem, it is still important to be aware of the potential issue.

The larger and more prevalent problem is that every access to data managed by Iceberg requires access to metadata to ensure that the correct version of data is being read. This means that all files containing metadata potentially relevant to a particular request need to be opened and merged on the fly. If that metadata is spread across many small files, the result is many requests to the file system. Each request of this nature has a fixed cost, and these file system I/O operations can become a bottleneck, since a single data access request may explode into thousands of such operations.

For example, my PhD student Pooja Nilangekar ran an experiment on TPC-DS data (a well-known performance benchmark in the data systems community) in which the tables were stored in Iceberg and 3% of the data was modified through delete and insert operations. The TPC-DS queries slowed down by a factor of 1.5 as a result of the many small files generated by these modifications.

Indeed, performance slowdowns arising from the proliferation of small files are an extremely common reason why accesses to Iceberg datasets slow down over time. Once the problem is identified, several possible solutions exist to fix or alleviate it (discussed below). 

Using the $files metadata table

Obviously, solutions to the “many small files problem” can only be applied after the problem has been identified. One approach to identifying it is to use the $files metadata table that is maintained alongside each table in Iceberg. This table exposes file sizes and other important statistics for a dataset. For example, if a table called foo is managed by Iceberg, the following SQL query will list the type and size of each data and delete file associated with the foo table:

SELECT content, file_size_in_bytes FROM "foo$files"

Although this query only returns information about the data files and delete files stored as part of foo (and not other types of metadata files), the two counts are directly related: if this query returns many small files, there is also a large number of small metadata files associated with them. Best practice is to have file sizes on the order of hundreds of megabytes; if there are thousands of files well below this order of magnitude, it is time to start thinking about corrective action.
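A rough way to check this is to count how many data and delete files fall below the rule-of-thumb size. The query below is a sketch against the hypothetical foo table, using an illustrative 100MB cutoff:

-- Number of files smaller than roughly 100MB
SELECT count(*) AS small_files
FROM "foo$files"
WHERE file_size_in_bytes < 100 * 1000 * 1000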

The content column (the first one returned in the above query) indicates the type of each file: a value of 0 marks a raw data file, while values of 1 or 2 mark delete files (depending on the type of delete). Having many delete files increases the cost of querying the data, since these deletes need to be merged with the raw data files when they are read. Therefore, having many delete files, even if they are large, is another reason to take corrective action.
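A simple aggregation over the same metadata table, again sketched against the hypothetical foo table, summarizes how many files of each type exist and how much space they occupy:

-- content = 0: data files; content = 1 or 2: delete files
SELECT content, count(*) AS num_files, sum(file_size_in_bytes) AS total_bytes
FROM "foo$files"
GROUP BY content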

Why deleting old snapshots doesn’t fully solve the problem

The simplest way to remove some of these Iceberg files is to delete old snapshots. Many data files (along with their associated metadata) exist to enable time travel. If you know that you have no need to run a query as of a particular time in the past, you don’t need these old snapshots, and deleting them is the obvious course of action. For example, running the following command in Trino:

ALTER TABLE foo EXECUTE expire_snapshots(retention_threshold => '5d')

removes all data files along with their associated metadata for snapshots that are older than 5 days. 

As mentioned above, Iceberg defaults the write.metadata.delete-after-commit.enabled parameter to false. Changing it to true will automatically remove old metadata files, keeping only the number of previous versions specified by the parameter write.metadata.previous-versions-max.
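These are Iceberg table properties rather than Trino settings, so they are typically set through an engine that writes Iceberg table properties directly, such as Spark. A sketch in Spark SQL might look like the following; the table name and the retention count of 10 previous metadata versions are illustrative:

-- Enable automatic cleanup of old metadata files for a hypothetical table
ALTER TABLE foo SET TBLPROPERTIES (
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '10'
)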

Unfortunately, deleting old snapshots usually does not help with query performance problems, because Iceberg generally does not need to read metadata associated with old snapshots at query time. Furthermore, the $files table described above only lists files in the current snapshot. This technique is, therefore, mostly useful for saving storage space and avoiding the “billions of files” scalability bottleneck in distributed file systems.

 

File compaction is the more important tool to solve performance problems

The main solution to the performance issues caused by the file explosion problem is to compact data files. As mentioned above, the main performance problem caused by the proliferation of small files is that each file has a fixed cost to open, and the process of opening many small files to respond to a query can become a bottleneck. By merging small files together, one ends up with fewer, larger files that store exactly the same information. Importantly, file compaction does not remove any data, so it does not remove the ability to time travel. It merely rewrites the data in storage in a more organized way so that it can be accessed more efficiently.

File compaction need not be an alternative to the approach of deleting old snapshots described in the previous section; both are useful tools for decreasing the number of Iceberg files stored in the file system.

The easiest way to perform file compaction is to decide on a good target file size and then issue a command to compact data files toward that target. A good rule of thumb is to target 100MB file sizes. The syntax of the command varies depending on the system you are using to access Iceberg. In Trino, the command looks like:

ALTER TABLE foo EXECUTE optimize(file_size_threshold => '100MB')

Potential drawbacks and tradeoffs when using file compaction 

The main downside of this file compaction process is that it consumes processing resources, which can potentially slow down concurrent jobs. Furthermore, it may cause commit conflicts if a client attempts to update files currently being compacted, thereby causing query retries, increasing latency and further wasting processing resources.

Therefore, it is important not to run compaction jobs unnecessarily. Furthermore, it is important to prioritize compaction jobs that will yield the most benefits, thereby consuming processing resources wisely. 

In general, the optimize command shown above will not compact files for which compaction yields zero benefit. For example, if a file has no recently deleted data and is already at the target file size threshold, the optimize command will not waste resources compacting it. However, as long as there is a non-zero benefit, the optimize command will spend processing resources compacting data files even if the benefit is tiny. It does not consider the overall efficiency of the compaction and does not make judgment calls about whether the cost of compaction is worth it.

For example, if the file sizes in a table are already close to the target size, the effort of copying and rewriting the files will still be incurred, yet the final state after all this effort will hardly differ from the original state.

Therefore, we strongly recommend that manual or automatic processes be put in place to analyze the benefit of running file compaction jobs in advance, and to strategically allocate resources to the compaction jobs that are likely to yield the largest benefit, either because those tables are (or are anticipated to be) queried frequently or because they currently consist of many small files. As stated above, the file_size_in_bytes query on the $files table reports the current file sizes in a table; its results can be used by these manual or automatic processes to drive strategic compaction decisions.
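As a sketch, such a process could compute a few summary numbers per table and only schedule compaction when, say, the small-file count is high or the average file size is far below the target. The table name, thresholds, and column aliases below are illustrative:

-- Summary statistics for the data files of a hypothetical table foo
SELECT count(*) AS data_files,
       count_if(file_size_in_bytes < 100 * 1000 * 1000) AS small_files,
       avg(file_size_in_bytes) / 1e6 AS avg_file_size_mb
FROM "foo$files"
WHERE content = 0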

Using filters to limit the scope of file compaction 

In many cases, a table that has been compacted will subsequently have additional data written to it. In such cases, compaction resources can be used more efficiently by directing the compaction effort at the new data instead of the entire table. This is done by adding a filter to the optimize command:

ALTER TABLE foo EXECUTE optimize

     WHERE CAST(timestamp_tz AS DATE) > DATE '2025-01-01'

The above command only compacts files containing data more recent than January 1, 2025, which avoids recompacting older data that has already been compacted. Currently, for this filter to be accepted, the attributes referred to in the filter condition must be part of the table’s partitioning attributes. However, this requirement may be dropped in the future.

Another benefit of using filters such as in the example above is that they help reduce the probability of conflicts during the compaction process. Processes can write to other parts of the table while only the parts of the table covered by these filters are being compacted. 

Other best practices when performing file compaction 

Many Iceberg users write code that runs on a periodic basis to check the file sizes of the current set of tables in a database (e.g., via the file_size_in_bytes SQL query) and automatically runs file compaction on tables that contain large numbers of small files. Although this is often a good idea, if the computing resources that can be devoted to file compaction are limited, it is more important to prioritize tables that are frequently queried (even if they contain fewer small files) over recently added data with many small files that may never get queried.

It is also considered best practice to record the filters used when compacting tables. This helps manual and automatic compaction processes generate new filters for subsequent compaction operations on the same table that do not overlap (even slightly) with the previous filters. Following this approach allows future compaction operations to be more precise about the data they target, thereby using processing resources more efficiently.

Even with precise filters and careful choices about which files to compact, compacting files still consumes many resources, since the files being compacted need to be read, merged, and rewritten to storage. In order to avoid drawing resources away from an ongoing query processing workload, it is often best to perform the compaction on a separate cluster from the one that performs query processing. For example, a large Icehouse deployment may have a dozen machines running Trino or Starburst that query data stored in Iceberg, and a separate cluster of 3-4 machines that performs background compaction tasks.

 

Conclusion

In summary, file compaction is an extremely important maintenance task for Iceberg deployments. Unless datasets are static, inserts, deletes, and updates all generate metadata files that can slow down query processing over time, and, as discussed above, simply deleting old snapshots is not sufficient to get around this performance problem. Instead, organizations need to devote some resources (and perhaps even a small separate computing cluster) to continuously compacting Iceberg data files, targeting the highest-priority tables: those that are read frequently or that contain many small files. These compaction operations can be triggered manually or automatically, but if done manually it is important that this maintenance not be neglected.

Acknowledgments: I’d like to thank Carl Steinbach and Raunaq Morarka for their feedback and suggestions for this blog post.