tl;dr
As part of an initial load of 30M rows (~8 GB uncompressed) into our Data Warehouse (ClickHouse) using the dlt library, we ran into a Memory Overcommit error. It turns out that the `merge` write_disposition (specifically the delete+insert setup) relies on a (necessary) window function to deduplicate entries, and that window function takes a significant amount of memory. To fix this, initial migrations of large datasets should use the `replace` write_disposition.
Symptom
The issue occurred when trying to extract and load a full table into the data warehouse. After a 15-minute extraction process, the load failed because ClickHouse ran into a “Memory Overcommit” error, so the entire data load appeared to have failed.
Cause
Initial thoughts
Initially I assumed that the problem stemmed from the load of the data itself. It turns out that dlt is very good at loading data in chunks and via native formats (in our case Parquet) into the database, so that was not it.
Next, I looked at the actual SQL queries that were run and saw that the issue lay with an `INSERT INTO ... SELECT * FROM` query that dlt runs to copy data from a staging table into the destination table. Looking at the staging table, all the data had actually been loaded successfully (!!!).
Actual Root Cause
Following a deeper analysis, the issue was not in fact the `INSERT INTO ... SELECT * FROM` per se, but rather the deduplication check done in the `SELECT` portion of that query. That check uses a window function (`ROW_NUMBER` with `PARTITION BY` on `Id`) over the entire 30M-row dataset, which led to the Memory Overcommit issue, because in ClickHouse a window `PARTITION BY` typically requires materializing the partitioned data in memory. This deduplication is part of dlt's standard logic when using the `merge` write_disposition, which deduplicates all entries in a staging table before loading them into the destination table.
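To make this concrete, here is a rough sketch of the shape of such a deduplicating copy query. The table names (`analytics.my_table`, `analytics_staging.my_table`) and the ordering column are made up for illustration; this is not dlt's exact generated SQL.

```python
# Approximate shape of the staging-to-destination copy with deduplication
# (illustrative only; not dlt's exact generated SQL). The inner SELECT has to
# partition and number all 30M staged rows by Id before the outer filter can
# keep one row per key -- that intermediate state is what exhausted memory.
MERGE_COPY_SQL = """
INSERT INTO analytics.my_table
SELECT * EXCEPT (row_no)
FROM (
    SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY Id ORDER BY _dlt_load_id DESC) AS row_no
    FROM analytics_staging.my_table
) AS ranked
WHERE row_no = 1
"""

if __name__ == "__main__":
    # Print the query, e.g. to inspect it with EXPLAIN in clickhouse-client.
    print(MERGE_COPY_SQL)
```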
Possible solutions
This issue can be resolved in multiple places:
- Use **replace** write_disposition on initial load (chosen solution) - In my specific case I would only load this data volume during the initial hydration; once the bulk of the data is in the data warehouse, we use incremental (delta) loads to pick up changes periodically/ on change (10k-100k rows per load max). The `replace` write_disposition loads the data directly into the destination tables, rather than into a staging table from which the data is copied afterwards. This is also the suggestion in the docs (which I saw too late); see the sketch after this list.
- Orchestrator/ dlt chunking - We are using Dagster as the orchestration framework of our data platform, which is able to separate (partition) extraction jobs based on a set of partitions, including temporal ones. This would mean that monthly or weekly partitions could be set up for large tables, splitting up their extraction and load. We explored this, but it would have been a nightmare to maintain, based on our requirements.
- Throw memory at it - you cannot overcommit if you have enough memory. This was a pre-production, self-hosted environment, so resources were somewhat constrained. This would be the easiest (though not the best) option if the workers/ DB can scale, as with ClickHouse Cloud. Due to the data we manage and the compliance requirements, we are unable to use this option.
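For reference, here is a minimal dlt sketch of the chosen approach (a sketch only: the resource name `my_table`, the sample rows, and the pipeline/dataset names are hypothetical; only the `write_disposition` values reflect the actual change). The initial hydration runs with `replace`, and subsequent delta loads switch to `merge`, whose deduplication then only has to handle the small incremental batches.

```python
import dlt


# Hypothetical resource standing in for the real 30M-row table.
@dlt.resource(name="my_table", primary_key="Id")
def my_table():
    # In reality this yields batches extracted from the source system.
    yield [{"Id": 1, "value": "a"}, {"Id": 2, "value": "b"}]


pipeline = dlt.pipeline(
    pipeline_name="initial_hydration",
    destination="clickhouse",
    dataset_name="analytics",
)

# Initial load: replace writes straight into the destination table, so no
# staging table and no window-function deduplication over 30M rows.
pipeline.run(my_table(), write_disposition="replace", loader_file_format="parquet")

# Subsequent incremental (delta) loads: switch to merge, which deduplicates
# the much smaller 10k-100k row batches via the staging table.
pipeline.run(my_table(), write_disposition="merge", loader_file_format="parquet")
```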
Hope this saves someone some time.