Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/828.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`ZipStore` now supports deletes (`supports_deletes` is `True`). Because ZIP files cannot delete entries in place, `ZipStore.delete` and `ZipStore.delete_dir` remove keys by rewriting the archive without the deleted members; overwritten (duplicate) members are compacted as a side effect.
63 changes: 54 additions & 9 deletions src/zarr/storage/_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import os
import shutil
import tempfile
import threading
import time
import zipfile
Expand All @@ -18,7 +19,7 @@
from zarr.core.buffer import Buffer, BufferPrototype

if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterable
from collections.abc import AsyncIterator, Callable, Iterable

ZipStoreAccessModeLiteral = Literal["r", "w", "a"]

Expand Down Expand Up @@ -55,7 +56,7 @@ class ZipStore(Store):
"""

supports_writes: bool = True
supports_deletes: bool = False
supports_deletes: bool = True
supports_listing: bool = True

path: Path
Expand Down Expand Up @@ -229,21 +230,65 @@ async def set_if_not_exists(self, key: str, value: Buffer) -> None:
if key not in members:
self._set(key, value)

def _rewrite_without(self, should_delete: Callable[[str], bool]) -> None:
    """Rewrite the archive, dropping every member selected by ``should_delete``.

    ZIP files do not support in-place deletion, so the only way to remove an
    entry is to copy the surviving entries into a fresh archive and swap it
    in for the original (see issue #828). Duplicate members (created when a
    chunk is overwritten via ``writestr``) are compacted to their most recent
    value as a side effect, since that is what reads already return.

    This must be called while holding ``self._lock``.

    Parameters
    ----------
    should_delete : Callable[[str], bool]
        Predicate evaluated once per distinct member name; members for which
        it returns True are omitted from the rewritten archive. If it matches
        nothing, the archive is left untouched.

    Raises
    ------
    BaseException
        Anything raised while building or installing the replacement archive
        is re-raised after the temporary file has been removed.
    """
    import copy

    # Later entries overwrite earlier ones, so each name maps to its most
    # recent ZipInfo while keeping first-seen ordering.
    members: dict[str, zipfile.ZipInfo] = {
        info.filename: info for info in self._zf.infolist()
    }

    doomed = {name for name in members if should_delete(name)}
    if not doomed:
        # nothing matched; leave the archive untouched
        return

    # Create the replacement next to the original so os.replace stays on the
    # same filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=self.path.parent)
    os.close(fd)
    try:
        with zipfile.ZipFile(
            tmp_path, mode="w", compression=self.compression, allowZip64=self.allowZip64
        ) as new_zf:
            for name, info in members.items():
                if name in doomed:
                    continue
                # writestr() mutates the ZipInfo it is given (header offset,
                # CRC, sizes, flags). Write a copy so the source archive's
                # own records stay valid: closing it below may rewrite its
                # central directory from those records.
                new_zf.writestr(copy.copy(info), self._zf.read(name))

        # mkstemp creates the file owner-only (0600); carry over the original
        # archive's permission bits. Best-effort: some filesystems reject
        # chmod, and that should not make the delete itself fail.
        try:
            shutil.copymode(self.path, tmp_path)
        except OSError:
            pass

        try:
            # The handle must be closed before the swap (required on Windows).
            self._zf.close()
            os.replace(tmp_path, self.path)
        finally:
            # Always leave the store with a usable handle, even when the swap
            # failed and the original archive is still in place. Reopen in
            # append mode so subsequent writes preserve the archive (the
            # original mode may be "w"/"x", which would truncate or fail).
            self._zf = zipfile.ZipFile(
                self.path, mode="a", compression=self.compression, allowZip64=self.allowZip64
            )
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

async def delete_dir(self, prefix: str) -> None:
# only raise NotImplementedError if any keys are found
# docstring inherited
self._check_writable()
if not self._is_open:
self._sync_open()
if prefix != "" and not prefix.endswith("/"):
prefix += "/"
async for _ in self.list_prefix(prefix):
raise NotImplementedError
with self._lock:
self._rewrite_without(lambda name: name.startswith(prefix))

async def delete(self, key: str) -> None:
# docstring inherited
# we choose to only raise NotImplementedError here if the key exists
# this allows the array/group APIs to avoid the overhead of existence checks
# deleting a missing key is a no-op, matching the other stores
self._check_writable()
if await self.exists(key):
raise NotImplementedError
if not self._is_open:
self._sync_open()
with self._lock:
self._rewrite_without(lambda name: name == key)

async def exists(self, key: str) -> bool:
# docstring inherited
Expand Down
3 changes: 3 additions & 0 deletions tests/test_codecs/test_sharding.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,9 @@ def test_write_partial_sharded_chunks(store: Store) -> None:
assert np.array_equal(a[0:16, 0:16], data)


# ZipStore overwrites shards by appending duplicate archive members (reads return
# the most recent), which zipfile reports via a "Duplicate name" UserWarning.
@pytest.mark.filterwarnings("ignore:Duplicate name:UserWarning")
@pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=["store"])
async def test_delete_empty_shards(store: Store) -> None:
if not store.supports_deletes:
Expand Down
124 changes: 117 additions & 7 deletions tests/test_store/test_zip.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,114 @@ def test_store_supports_writes(self, store: ZipStore) -> None:
def test_store_supports_listing(self, store: ZipStore) -> None:
assert store.supports_listing

def test_store_supports_deletes(self, store: ZipStore) -> None:
    # ZipStore advertises delete support via the ``supports_deletes`` flag.
    supported = store.supports_deletes
    assert supported

async def test_delete_compacts_duplicates(self, store: ZipStore) -> None:
    # Issue #828: writing the same key twice appends a second archive member.
    # A delete of some *other* key rewrites the archive, and that rewrite is
    # expected to collapse the duplicates down to the newest value.
    old_value = cpu.Buffer.from_bytes(b"v1")
    new_value = cpu.Buffer.from_bytes(b"v2")
    unrelated = cpu.Buffer.from_bytes(b"bar")

    await store.set("foo", old_value)
    with pytest.warns(UserWarning, match="Duplicate name: 'foo'"):
        await store.set("foo", new_value)
    await store.set("bar", unrelated)

    await store.delete("bar")

    # the deleted key is gone ...
    assert not await store.exists("bar")
    # ... and the surviving key has exactly one member holding the latest bytes
    foo_entries = store._zf.namelist().count("foo")
    assert foo_entries == 1
    surviving = await self.get(store, "foo")
    assert surviving.to_bytes() == b"v2"

async def test_delete_then_set(self, store: ZipStore) -> None:
    # A delete reopens the underlying archive; the store must remain writable
    # and readable afterwards.
    await store.set("foo", cpu.Buffer.from_bytes(b"foo"))
    await store.delete("foo")
    foo_still_there = await store.exists("foo")
    assert not foo_still_there

    await store.set("baz", cpu.Buffer.from_bytes(b"baz"))
    written = await self.get(store, "baz")
    assert written.to_bytes() == b"baz"

async def test_delete_and_delete_dir_auto_open(self, tmp_path: Path) -> None:
    # Neither delete() nor delete_dir() may assume the caller opened the
    # store first; like _get/_set they should open the archive on demand.

    # delete(): a missing key is enough to hit the auto-open branch
    key_store = ZipStore(tmp_path / "del.zip", mode="w", read_only=False)
    assert not key_store._is_open
    await key_store.delete("missing")
    assert key_store._is_open

    # delete_dir(): same expectation for a prefix that matches nothing
    prefix_store = ZipStore(tmp_path / "deldir.zip", mode="w", read_only=False)
    assert not prefix_store._is_open
    await prefix_store.delete_dir("missing")
    assert prefix_store._is_open

async def test_delete_dir_prefix_already_normalized(self, store: ZipStore) -> None:
    # When the prefix already carries its trailing "/", delete_dir must not
    # append another one; only keys under that prefix may disappear.
    await store.set("foo/zarr.json", cpu.Buffer.from_bytes(b"a"))
    await store.set("foo/c/0", cpu.Buffer.from_bytes(b"b"))
    await store.set("bar/zarr.json", cpu.Buffer.from_bytes(b"c"))

    await store.delete_dir("foo/")

    for removed_key in ("foo/zarr.json", "foo/c/0"):
        assert not await store.exists(removed_key)
    # the sibling hierarchy is untouched
    assert await store.exists("bar/zarr.json")

async def test_delete_dir_empty_prefix_removes_all(self, store: ZipStore) -> None:
    # The empty prefix is never normalized and matches every key, so the
    # archive should end up with no members at all.
    await store.set("a", cpu.Buffer.from_bytes(b"a"))
    await store.set("b/c", cpu.Buffer.from_bytes(b"b"))

    await store.delete_dir("")

    for removed_key in ("a", "b/c"):
        assert not await store.exists(removed_key)
    remaining_members = store._zf.namelist()
    assert remaining_members == []

async def test_delete_cleans_up_temp_on_failure(
    self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    # If the rewrite fails (e.g. os.replace raises), the temporary archive
    # must be removed and the original archive must still be intact on disk.
    import zipfile

    import zarr.storage._zip as zip_module

    store = ZipStore(tmp_path / "fail.zip", mode="w", read_only=False)
    await store.set("foo", cpu.Buffer.from_bytes(b"v"))

    def boom(*args: Any, **kwargs: Any) -> None:
        raise OSError("replace failed")

    # make the final swap step of the rewrite fail
    monkeypatch.setattr(zip_module.os, "replace", boom)

    with pytest.raises(OSError, match="replace failed"):
        await store.delete("foo")

    # no leftover temp file: only the original archive remains
    assert set(os.listdir(tmp_path)) == {"fail.zip"}

    # the original archive is still a readable zip holding the entry whose
    # deletion failed (verified with an independent reader, not the store)
    with zipfile.ZipFile(tmp_path / "fail.zip") as on_disk:
        assert on_disk.namelist() == ["foo"]
        assert on_disk.read("foo") == b"v"

async def test_delete_failure_when_temp_already_removed(
    self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    # Covers the defensive branch of the cleanup code: when the temporary
    # archive has already vanished by the time the rewrite fails, there is
    # nothing to remove and the original error must still reach the caller.
    import zarr.storage._zip as zip_module

    archive_path = tmp_path / "fail2.zip"
    store = ZipStore(archive_path, mode="w", read_only=False)
    await store.set("foo", cpu.Buffer.from_bytes(b"v"))

    # capture the real os.remove so the fake replace can delete the temp file
    original_remove = zip_module.os.remove

    def replace_then_vanish(src: str, dst: str) -> None:
        # the temp archive disappears before the except block gets to run
        original_remove(src)
        raise OSError("replace failed")

    monkeypatch.setattr(zip_module.os, "replace", replace_then_vanish)

    with pytest.raises(OSError, match="replace failed"):
        await store.delete("foo")

    leftover = set(os.listdir(tmp_path))
    assert leftover == {"fail2.zip"}

# TODO: fix this warning
@pytest.mark.filterwarnings("ignore:Unclosed client session:ResourceWarning")
def test_api_integration(self, store: ZipStore) -> None:
Expand All @@ -101,17 +209,19 @@ def test_api_integration(self, store: ZipStore) -> None:
with pytest.warns(UserWarning, match="Duplicate name: 'foo/c/0/0'"):
z[0, 0] = 100

# TODO: assigning an entire chunk to fill value ends up deleting the chunk which is not supported
# a work around will be needed here.
with pytest.raises(NotImplementedError):
z[0:10, 0:10] = 99
# assigning an entire chunk to the fill value deletes the chunk;
# ZipStore now supports deletes by rewriting the archive (issue #828)
z[0:10, 0:10] = 99
expected = data.copy()
expected[0:10, 0:10] = 99
assert np.array_equal(expected, z[:])

bar = root.create_group("bar", attributes={"hello": "world"})
assert "hello" in dict(bar.attrs)

# keys cannot be deleted
with pytest.raises(NotImplementedError):
del root["bar"]
# keys can now be deleted
del root["bar"]
assert "bar" not in root

store.close()

Expand Down
Loading