Fix/iceberg drop tables#16
Conversation
dlt core gates refresh="drop_resources" / refresh="drop_sources" on hasattr(job_client, "drop_tables") (dlt/load/utils.py). Without that method the load layer warns and silently skips the drops, which is what triggered the "Client for iceberg_rest does not implement drop table" message in the field. Tests cover: - method existence (the hasattr gate) - selective drop of named tables only - idempotent behavior for missing tables - delete_schema=True clears _dlt_version rows for the current schema - refresh="drop_resources" end-to-end
IcebergRestClient only exposed drop_storage() (full namespace wipe). dlt core's refresh="drop_resources" / refresh="drop_sources" path calls job_client.drop_tables(*names, delete_schema=True); without it, drops are warned-and-skipped, leaving stale tables in the destination. Drops each named table via the PyIceberg catalog (swallowing NoSuchTableError for idempotence). When delete_schema=True, wipes _dlt_version rows for self.schema.name via table.delete(EqualTo(...)), matching the SqlJobClientBase.drop_tables contract. Inherited _delete_schema_in_storage isn't used because IcebergRestClient extends JobClientBase directly, not SqlJobClientBase, and would need a real SQL client to run the DELETE statement.
|
@codex review |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3a4cb8f6d7
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| for name in table_names: | ||
| identifier = f"{self.config.namespace}.{name}" | ||
| try: | ||
| catalog.drop_table(identifier) |
There was a problem hiding this comment.
Purge Iceberg data when dropping resources
When refresh="drop_resources" or a direct drop_tables call removes a table, this only calls drop_table, which unregisters the table from the catalog but leaves the table's data and metadata files behind. For Iceberg REST catalogs PyIceberg exposes purge_table/purgeRequested specifically to delete those files, so refreshes that are meant to discard a resource can leave stale data in object storage indefinitely, which is surprising for destructive refreshes and can violate retention expectations.
Useful? React with 👍 / 👎.
Hi Nico, another small one that fell out of running
pipeline.extract(source, refresh="drop_resources")afterpipeline.sync_destination()against a Nessie deployment.The load layer emitted:
…and silently skipped the drops, so stale tables stuck around across refreshes.
Problem
dlt core gates the per-table drop path on
hasattr(job_client, "drop_tables")(dlt/load/utils.py:170).IcebergRestClientonly exposeddrop_storage()(a full namespace wipe) -- nodrop_tables(*names, delete_schema=True)-- so the load layer fell back to the warn-and-skip branch. Net effect:refresh="drop_resources"/refresh="drop_sources"were effectively no-ops on this destination.pipeline.sql_client().drop_dataset()had no coherent per-table partner (dataset-level works via the base-classDROP SCHEMA CASCADE).pyicebergdirectly for destructive ops.Solution
Implement the
JobClient.drop_tablescontract onIcebergRestClient:NoSuchTableErrorso the call is idempotent (dlt may pass tables that were never physically created).delete_schema=True, remove all_dlt_versionrows whereschema_name = self.schema.nameviatable.delete(EqualTo(...)), matching theSqlJobClientBase.drop_tablescontract.One deviation worth calling out: the obvious move would be
self._delete_schema_in_storage(self.schema), but that method lives onSqlJobClientBase(notJobClientBase) and usesself.sql_client.execute_sql(...).IcebergRestClientextendsJobClientBasedirectly, and itssql_clientis a DuckDB view provider rather than a real DDL-capable client -- so theDELETEis issued via PyIceberg's row-delete API instead, reusing the pattern already atdestination_client.py:1151-1153.Changes
destination_client.pyIcebergRestClient.drop_tables(new)catalog.drop_table(...);NoSuchTableErroris swallowed. Whendelete_schema=True, deletes_dlt_versionrows forself.schema.nameviaversion_table.delete(EqualTo("schema_name", ...)).No changes to
sql_client.pyorschema_evolution.py.Tests
test_drop_tables.pycovering:hasattrgate (method is actually exposed on the class)delete_schema=Trueclears_dlt_versionrows for the current schemapipeline.run(..., refresh="drop_resources")end-to-end (the originally reported symptom)