From 175ce653270a89962bdfd819913961c1b0f59ce8 Mon Sep 17 00:00:00 2001 From: Shane Grigsby Date: Fri, 5 Jun 2026 10:51:12 -0700 Subject: [PATCH 1/6] prototype for negative caching, i.e., get misses --- changes/4040.feature.md | 1 + src/zarr/experimental/cache_store.py | 118 +++++++++++++++- tests/test_experimental/test_cache_store.py | 141 ++++++++++++++++++++ 3 files changed, 258 insertions(+), 2 deletions(-) create mode 100644 changes/4040.feature.md diff --git a/changes/4040.feature.md b/changes/4040.feature.md new file mode 100644 index 0000000000..4d9f2fb9ee --- /dev/null +++ b/changes/4040.feature.md @@ -0,0 +1 @@ +`zarr.experimental.cache_store.CacheStore` gained opt-in negative caching via `cache_missing=True`. When enabled, a full-key read that finds the key absent in the source store is remembered, so repeat reads of that absent key return immediately without a source round-trip — useful for sparse arrays where most chunks resolve to the fill value. Negative entries respect `max_age_seconds`, are bounded by `max_missing_keys` (default 100,000, least-recently-used eviction), and are evicted when the key is written via `set`/`set_if_not_exists`. The default is `False` (no behavior change); negative-cache activity is reported as `negative_hits` in `cache_stats()` and `missing_keys` in `cache_info()`. Only full-key reads are affected (not byte-range reads or `exists`). diff --git a/src/zarr/experimental/cache_store.py b/src/zarr/experimental/cache_store.py index 1535b42f67..380b5e81d2 100644 --- a/src/zarr/experimental/cache_store.py +++ b/src/zarr/experimental/cache_store.py @@ -30,8 +30,14 @@ class _CacheState: hits: int = 0 misses: int = 0 evictions: int = 0 + negative_hits: int = 0 key_insert_times: dict[_CacheEntryKey, float] = field(default_factory=dict) range_cache: dict[str, dict[ByteRequest, Buffer]] = field(default_factory=dict) + # Negative cache: full keys known to be absent in the source store, mapped to + # their (monotonic) insertion time for freshness. OrderedDict gives O(1) LRU + # eviction via popitem(last=False). Kept separate from the byte-size accounting + # above (negative entries carry no data) and bounded by ``max_missing_keys``. + missing_keys: OrderedDict[str, float] = field(default_factory=OrderedDict) class CacheStore(WrapperStore[Store]): @@ -62,6 +68,23 @@ class CacheStore(WrapperStore[Store]): Note: Individual values larger than max_size will not be cached. cache_set_data : bool, optional Whether to cache data when it's written to the store. Default is True. + cache_missing : bool, optional + Whether to remember full-key misses (negative caching). When True, a full-key + ``get`` that finds the key absent in the source store records that absence, so + subsequent ``get``s for the same key return ``None`` without a source round-trip. + This benefits repeated reads of sparse arrays (most chunks absent). Negative + entries respect ``max_age_seconds`` and are evicted when the key is written + (``set``/``set_if_not_exists``). Only full-key reads are affected (not byte-range + reads or ``exists``). Default is False. + + Note: with ``max_age_seconds="infinity"`` a remembered miss never expires, so a + key written to the source by another process would stay invisible through this + cache. Pair ``cache_missing=True`` with a finite ``max_age_seconds`` if the source + may be written concurrently. + max_missing_keys : int, optional + Maximum number of negative (missing-key) entries to retain when + ``cache_missing`` is True. When exceeded, the least recently used missing keys + are evicted. Bounds memory for large sparse scans. Default is 100,000. Examples -------- @@ -91,6 +114,8 @@ class CacheStore(WrapperStore[Store]): max_age_seconds: int | Literal["infinity"] max_size: int | None cache_set_data: bool + cache_missing: bool + max_missing_keys: int _state: _CacheState def __init__( @@ -101,6 +126,8 @@ def __init__( max_age_seconds: int | str = "infinity", max_size: int | None = None, cache_set_data: bool = True, + cache_missing: bool = False, + max_missing_keys: int = 100_000, ) -> None: super().__init__(store) @@ -111,6 +138,9 @@ def __init__( ) raise ValueError(msg) + if max_missing_keys < 1: + raise ValueError("max_missing_keys must be a positive integer") + self._cache = cache_store # Validate and set max_age_seconds if isinstance(max_age_seconds, str): @@ -121,6 +151,8 @@ def __init__( self.max_age_seconds = max_age_seconds self.max_size = max_size self.cache_set_data = cache_set_data + self.cache_missing = cache_missing + self.max_missing_keys = max_missing_keys self._state = _CacheState() def _with_store(self, store: Store) -> Self: @@ -136,6 +168,8 @@ def with_read_only(self, read_only: bool = False) -> Self: max_age_seconds=self.max_age_seconds, max_size=self.max_size, cache_set_data=self.cache_set_data, + cache_missing=self.cache_missing, + max_missing_keys=self.max_missing_keys, ) store._state = self._state return store @@ -151,6 +185,34 @@ def _is_key_fresh(self, entry_key: _CacheEntryKey) -> bool: elapsed = now - self._state.key_insert_times.get(entry_key, 0) return elapsed < self.max_age_seconds + def _is_missing_fresh(self, key: str) -> bool: + """Check if a negative (missing-key) entry is still fresh. + + Mirrors ``_is_key_fresh`` but reads the negative-cache insertion time. + """ + if self.max_age_seconds == "infinity": + return True + elapsed = time.monotonic() - self._state.missing_keys.get(key, 0.0) + return elapsed < self.max_age_seconds + + def _record_missing(self, key: str) -> None: + """Record *key* as known-missing, evicting the oldest entries past the cap. + + Must be called while holding ``self._state.lock``. + """ + self._state.missing_keys[key] = time.monotonic() + self._state.missing_keys.move_to_end(key) + while len(self._state.missing_keys) > self.max_missing_keys: + self._state.missing_keys.popitem(last=False) + self._state.evictions += 1 + + def _evict_missing(self, key: str) -> None: + """Drop any negative entry for *key* (it is now present or being written). + + Must be called while holding ``self._state.lock``. + """ + self._state.missing_keys.pop(key, None) + async def _accommodate_value(self, value_size: int) -> None: """Ensure there is enough space in the cache for a new value. @@ -266,6 +328,10 @@ async def _cache_miss( await self._cache.delete(key) async with self._state.lock: self._remove_from_tracking(key) + # The key is absent in the source: remember the miss so a repeat + # read can short-circuit without a source round-trip. + if self.cache_missing: + self._record_missing(key) else: entry_key: _CacheEntryKey = (key, byte_range) async with self._state.lock: @@ -279,6 +345,10 @@ async def _cache_miss( if byte_range is None: await self._cache.set(key, result) await self._track_entry(key, result) + # A value now exists for this key: drop any stale negative entry. + if self.cache_missing: + async with self._state.lock: + self._evict_missing(key) else: entry_key = (key, byte_range) self._state.range_cache.setdefault(key, {})[byte_range] = result @@ -351,6 +421,17 @@ async def get( Buffer | None The retrieved data, or None if not found """ + # Negative cache fast-path (full-key reads only): a fresh "known absent" record + # short-circuits to None without consulting the positive cache or the source. + # Checked here, before the positive-entry freshness gate, because a negative-only + # key has no positive entry and would otherwise be routed straight to the source. + if self.cache_missing and byte_range is None: + async with self._state.lock: + if key in self._state.missing_keys and self._is_missing_fresh(key): + self._state.negative_hits += 1 + self._state.missing_keys.move_to_end(key) + return None + entry_key: _CacheEntryKey = (key, byte_range) if byte_range is not None else key if not self._is_key_fresh(entry_key): return await self._get_no_cache(key, prototype, byte_range) @@ -369,9 +450,12 @@ async def set(self, key: str, value: Buffer) -> None: The data to store """ await super().set(key, value) - # Invalidate all cached byte-range entries (source data changed) + # Invalidate all cached byte-range entries (source data changed) and drop any + # negative entry — the key now has a value. async with self._state.lock: self._invalidate_range_entries(key) + if self.cache_missing: + self._evict_missing(key) if self.cache_set_data: await self._cache.set(key, value) await self._track_entry(key, value) @@ -380,6 +464,26 @@ async def set(self, key: str, value: Buffer) -> None: async with self._state.lock: self._remove_from_tracking(key) + async def set_if_not_exists(self, key: str, value: Buffer) -> None: + """ + Store data only if the key does not already exist in the source store. + + Parameters + ---------- + key : str + The key to store under + value : Buffer + The data to store + """ + await super().set_if_not_exists(key, value) + # Whether or not the write happened, any negative entry is now unsafe: either + # we just wrote the key, or it already existed (so the record was already + # wrong). Evicting unconditionally is always safe. We do not populate the + # positive cache here — there is no guaranteed-fresh value to store. + if self.cache_missing: + async with self._state.lock: + self._evict_missing(key) + async def delete(self, key: str) -> None: """ Delete data from both the underlying store and cache. @@ -407,18 +511,26 @@ def cache_info(self) -> dict[str, Any]: "max_size": self.max_size, "current_size": self._state.current_size, "cache_set_data": self.cache_set_data, + "cache_missing": self.cache_missing, "tracked_keys": len(self._state.key_insert_times), "cached_keys": len(self._state.cache_order), + "missing_keys": len(self._state.missing_keys), } def cache_stats(self) -> dict[str, Any]: - """Return cache performance statistics.""" + """Return cache performance statistics. + + ``hit_rate`` reflects positive-cache hits over positive lookups only; a + negative-cache hit (an absent key served from the negative cache) is reported + separately as ``negative_hits`` and is counted as neither a hit nor a miss. + """ total_requests = self._state.hits + self._state.misses hit_rate = self._state.hits / total_requests if total_requests > 0 else 0.0 return { "hits": self._state.hits, "misses": self._state.misses, "evictions": self._state.evictions, + "negative_hits": self._state.negative_hits, "total_requests": total_requests, "hit_rate": hit_rate, } @@ -435,7 +547,9 @@ async def clear_cache(self) -> None: self._state.cache_order.clear() self._state.key_sizes.clear() self._state.range_cache.clear() + self._state.missing_keys.clear() self._state.current_size = 0 + self._state.negative_hits = 0 def __repr__(self) -> str: """Return string representation of the cache store.""" diff --git a/tests/test_experimental/test_cache_store.py b/tests/test_experimental/test_cache_store.py index fc17ccd5e1..8181a3044a 100644 --- a/tests/test_experimental/test_cache_store.py +++ b/tests/test_experimental/test_cache_store.py @@ -298,8 +298,10 @@ async def test_cache_info(self, cached_store: CacheStore) -> None: "max_size", "current_size", "cache_set_data", + "cache_missing", "tracked_keys", "cached_keys", + "missing_keys", } assert set(info.keys()) == expected_keys @@ -1047,3 +1049,142 @@ async def test_delete_invalidates_cached_byte_ranges(self) -> None: # Key is gone from source result = await cached_store.get("key", proto) assert result is None + + +class TestCacheStoreNegativeCaching: + """Tests for opt-in negative (missing-key) caching (``cache_missing=True``).""" + + async def test_basic(self, monkeypatch: pytest.MonkeyPatch) -> None: + """A second get of an absent key is served from the negative cache without a + source round-trip.""" + source = MemoryStore() + cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True) + proto = default_buffer_prototype() + + calls = {"n": 0} + orig_get = source.get + + async def counting_get(*args: object, **kwargs: object) -> object: + calls["n"] += 1 + return await orig_get(*args, **kwargs) # type: ignore[arg-type] + + monkeypatch.setattr(source, "get", counting_get) + + assert await cs.get("c/0", proto) is None + assert cs.cache_info()["missing_keys"] == 1 + after_first = calls["n"] + + assert await cs.get("c/0", proto) is None + assert calls["n"] == after_first # no further source access + assert cs.cache_stats()["negative_hits"] == 1 + + async def test_disabled_by_default(self) -> None: + """With the default ``cache_missing=False`` nothing is remembered.""" + cs = CacheStore(MemoryStore(), cache_store=MemoryStore()) + proto = default_buffer_prototype() + assert await cs.get("c/0", proto) is None + assert await cs.get("c/0", proto) is None + assert cs.cache_info()["missing_keys"] == 0 + assert cs.cache_stats()["negative_hits"] == 0 + + async def test_evicted_on_set(self) -> None: + source = MemoryStore() + cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True) + proto = default_buffer_prototype() + assert await cs.get("c/0", proto) is None + assert cs.cache_info()["missing_keys"] == 1 + + await cs.set("c/0", CPUBuffer.from_bytes(b"value")) + assert cs.cache_info()["missing_keys"] == 0 + result = await cs.get("c/0", proto) + assert result is not None + assert result.to_bytes() == b"value" + + async def test_evicted_on_set_if_not_exists(self) -> None: + source = MemoryStore() + cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True) + proto = default_buffer_prototype() + assert await cs.get("c/0", proto) is None + assert cs.cache_info()["missing_keys"] == 1 + + await cs.set_if_not_exists("c/0", CPUBuffer.from_bytes(b"value")) + assert cs.cache_info()["missing_keys"] == 0 + result = await cs.get("c/0", proto) + assert result is not None + assert result.to_bytes() == b"value" + + async def test_respects_ttl(self) -> None: + """A negative entry expires after ``max_age_seconds`` so a key written to the + source out-of-band becomes visible again.""" + source = MemoryStore() + cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True, max_age_seconds=1) + proto = default_buffer_prototype() + assert await cs.get("c/0", proto) is None + + # an external writer adds the key directly to the source store + await source.set("c/0", CPUBuffer.from_bytes(b"late")) + + # before TTL: still reported missing from the negative cache + assert await cs.get("c/0", proto) is None + await asyncio.sleep(1.1) + + # after TTL: the stale negative entry is bypassed, source is consulted + result = await cs.get("c/0", proto) + assert result is not None + assert result.to_bytes() == b"late" + assert cs.cache_info()["missing_keys"] == 0 + + async def test_bounded(self) -> None: + """``max_missing_keys`` bounds the negative cache, evicting LRU entries.""" + cs = CacheStore( + MemoryStore(), cache_store=MemoryStore(), cache_missing=True, max_missing_keys=10 + ) + proto = default_buffer_prototype() + for i in range(25): + assert await cs.get(f"c/{i}", proto) is None + + assert cs.cache_info()["missing_keys"] == 10 + assert cs.cache_stats()["evictions"] >= 15 + # the 10 most-recently-seen keys are retained (LRU) + for i in range(15, 25): + assert f"c/{i}" in cs._state.missing_keys + for i in range(15): + assert f"c/{i}" not in cs._state.missing_keys + + async def test_byte_range_unaffected(self) -> None: + """Byte-range misses do not populate the negative cache.""" + cs = CacheStore(MemoryStore(), cache_store=MemoryStore(), cache_missing=True) + proto = default_buffer_prototype() + assert await cs.get("c/0", proto, byte_range=RangeByteRequest(0, 4)) is None + assert cs.cache_info()["missing_keys"] == 0 + + async def test_stats_and_info(self) -> None: + """``negative_hits``/``missing_keys``/``cache_missing`` are surfaced and the + positive ``hit_rate`` is unaffected by negative hits.""" + source = MemoryStore() + cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True) + proto = default_buffer_prototype() + + await cs.set("present", CPUBuffer.from_bytes(b"x")) + assert (await cs.get("present", proto)) is not None # positive hit + assert await cs.get("absent", proto) is None # records miss + assert await cs.get("absent", proto) is None # negative hit + + info = cs.cache_info() + stats = cs.cache_stats() + assert info["cache_missing"] is True + assert info["missing_keys"] == 1 + assert stats["negative_hits"] == 1 + assert stats["hits"] == 1 + assert stats["misses"] == 1 # negative hit counts as neither hit nor miss + assert stats["hit_rate"] == 0.5 + + async def test_delete_does_not_record(self) -> None: + """Deleting a key does not create a negative entry (deletion != checked-absent).""" + cs = CacheStore(MemoryStore(), cache_store=MemoryStore(), cache_missing=True) + await cs.delete("c/0") + assert cs.cache_info()["missing_keys"] == 0 + + async def test_max_missing_keys_validated(self) -> None: + with pytest.raises(ValueError, match="max_missing_keys"): + CacheStore(MemoryStore(), cache_store=MemoryStore(), max_missing_keys=0) From 5ff0af0bdc1f2669fa72d078ed8707afd9d727b0 Mon Sep 17 00:00:00 2001 From: Shane Grigsby Date: Fri, 5 Jun 2026 11:32:59 -0700 Subject: [PATCH 2/6] matching current api for positive cached values --- changes/4040.feature.md | 2 +- src/zarr/experimental/cache_store.py | 50 +++++++++------------ tests/test_experimental/test_cache_store.py | 35 +++++---------- 3 files changed, 34 insertions(+), 53 deletions(-) diff --git a/changes/4040.feature.md b/changes/4040.feature.md index 4d9f2fb9ee..7e2813ed5d 100644 --- a/changes/4040.feature.md +++ b/changes/4040.feature.md @@ -1 +1 @@ -`zarr.experimental.cache_store.CacheStore` gained opt-in negative caching via `cache_missing=True`. When enabled, a full-key read that finds the key absent in the source store is remembered, so repeat reads of that absent key return immediately without a source round-trip — useful for sparse arrays where most chunks resolve to the fill value. Negative entries respect `max_age_seconds`, are bounded by `max_missing_keys` (default 100,000, least-recently-used eviction), and are evicted when the key is written via `set`/`set_if_not_exists`. The default is `False` (no behavior change); negative-cache activity is reported as `negative_hits` in `cache_stats()` and `missing_keys` in `cache_info()`. Only full-key reads are affected (not byte-range reads or `exists`). +`zarr.experimental.cache_store.CacheStore` now performs negative caching by default (`cache_missing=True`, opt-out). A full-key read that finds the key absent in the source store is remembered, so repeat reads of that absent key return immediately without a source round-trip — useful for sparse arrays where most chunks resolve to the fill value. Remembered misses respect `max_age_seconds` and are evicted when the key is written via `set`/`set_if_not_exists`. Negative-cache activity is reported as `negative_hits` in `cache_stats()` and `missing_keys` in `cache_info()`. Only full-key reads are affected (not byte-range reads or `exists`). Pass `cache_missing=False` to restore the previous behavior. Like the positive cache (unbounded when `max_size is None`), the negative cache is bounded only by `max_age_seconds`; set a finite TTL for scans over very large sparse key spaces. diff --git a/src/zarr/experimental/cache_store.py b/src/zarr/experimental/cache_store.py index 380b5e81d2..4205686fb6 100644 --- a/src/zarr/experimental/cache_store.py +++ b/src/zarr/experimental/cache_store.py @@ -33,11 +33,11 @@ class _CacheState: negative_hits: int = 0 key_insert_times: dict[_CacheEntryKey, float] = field(default_factory=dict) range_cache: dict[str, dict[ByteRequest, Buffer]] = field(default_factory=dict) - # Negative cache: full keys known to be absent in the source store, mapped to - # their (monotonic) insertion time for freshness. OrderedDict gives O(1) LRU - # eviction via popitem(last=False). Kept separate from the byte-size accounting - # above (negative entries carry no data) and bounded by ``max_missing_keys``. - missing_keys: OrderedDict[str, float] = field(default_factory=OrderedDict) + # Negative cache: full keys known to be absent in the source store, mapped to their + # (monotonic) insertion time. Used to short-circuit repeat reads of absent keys. + # Entries carry no data, so they are kept out of the byte-size accounting above; + # staleness is bounded by ``max_age_seconds``. + missing_keys: dict[str, float] = field(default_factory=dict) class CacheStore(WrapperStore[Store]): @@ -75,16 +75,19 @@ class CacheStore(WrapperStore[Store]): This benefits repeated reads of sparse arrays (most chunks absent). Negative entries respect ``max_age_seconds`` and are evicted when the key is written (``set``/``set_if_not_exists``). Only full-key reads are affected (not byte-range - reads or ``exists``). Default is False. + reads or ``exists``). Default is True. - Note: with ``max_age_seconds="infinity"`` a remembered miss never expires, so a - key written to the source by another process would stay invisible through this - cache. Pair ``cache_missing=True`` with a finite ``max_age_seconds`` if the source - may be written concurrently. - max_missing_keys : int, optional - Maximum number of negative (missing-key) entries to retain when - ``cache_missing`` is True. When exceeded, the least recently used missing keys - are evicted. Bounds memory for large sparse scans. Default is 100,000. + Notes: + + - With ``max_age_seconds="infinity"`` (the default) a remembered miss never + expires, so a key written to the source by another process stays invisible + through this cache. Pair ``cache_missing=True`` with a finite + ``max_age_seconds`` if the source may be written concurrently. + - Like the positive cache (which is unbounded when ``max_size is None``), the + negative cache is bounded only by ``max_age_seconds``. With an infinite TTL, + a scan over a very large sparse key space will accumulate one small entry per + absent key. Set a finite ``max_age_seconds`` (or ``cache_missing=False``) for + such workloads. Examples -------- @@ -115,7 +118,6 @@ class CacheStore(WrapperStore[Store]): max_size: int | None cache_set_data: bool cache_missing: bool - max_missing_keys: int _state: _CacheState def __init__( @@ -126,8 +128,7 @@ def __init__( max_age_seconds: int | str = "infinity", max_size: int | None = None, cache_set_data: bool = True, - cache_missing: bool = False, - max_missing_keys: int = 100_000, + cache_missing: bool = True, ) -> None: super().__init__(store) @@ -138,9 +139,6 @@ def __init__( ) raise ValueError(msg) - if max_missing_keys < 1: - raise ValueError("max_missing_keys must be a positive integer") - self._cache = cache_store # Validate and set max_age_seconds if isinstance(max_age_seconds, str): @@ -152,7 +150,6 @@ def __init__( self.max_size = max_size self.cache_set_data = cache_set_data self.cache_missing = cache_missing - self.max_missing_keys = max_missing_keys self._state = _CacheState() def _with_store(self, store: Store) -> Self: @@ -169,7 +166,6 @@ def with_read_only(self, read_only: bool = False) -> Self: max_size=self.max_size, cache_set_data=self.cache_set_data, cache_missing=self.cache_missing, - max_missing_keys=self.max_missing_keys, ) store._state = self._state return store @@ -196,15 +192,12 @@ def _is_missing_fresh(self, key: str) -> bool: return elapsed < self.max_age_seconds def _record_missing(self, key: str) -> None: - """Record *key* as known-missing, evicting the oldest entries past the cap. + """Record *key* as known-missing (absent in the source store). - Must be called while holding ``self._state.lock``. + Must be called while holding ``self._state.lock``. Staleness is bounded by + ``max_age_seconds`` via ``_is_missing_fresh``. """ self._state.missing_keys[key] = time.monotonic() - self._state.missing_keys.move_to_end(key) - while len(self._state.missing_keys) > self.max_missing_keys: - self._state.missing_keys.popitem(last=False) - self._state.evictions += 1 def _evict_missing(self, key: str) -> None: """Drop any negative entry for *key* (it is now present or being written). @@ -429,7 +422,6 @@ async def get( async with self._state.lock: if key in self._state.missing_keys and self._is_missing_fresh(key): self._state.negative_hits += 1 - self._state.missing_keys.move_to_end(key) return None entry_key: _CacheEntryKey = (key, byte_range) if byte_range is not None else key diff --git a/tests/test_experimental/test_cache_store.py b/tests/test_experimental/test_cache_store.py index 8181a3044a..17ee32c5c4 100644 --- a/tests/test_experimental/test_cache_store.py +++ b/tests/test_experimental/test_cache_store.py @@ -1078,10 +1078,20 @@ async def counting_get(*args: object, **kwargs: object) -> object: assert calls["n"] == after_first # no further source access assert cs.cache_stats()["negative_hits"] == 1 - async def test_disabled_by_default(self) -> None: - """With the default ``cache_missing=False`` nothing is remembered.""" + async def test_enabled_by_default(self) -> None: + """Negative caching is on by default (opt-out).""" cs = CacheStore(MemoryStore(), cache_store=MemoryStore()) proto = default_buffer_prototype() + assert cs.cache_missing is True + assert await cs.get("c/0", proto) is None + assert await cs.get("c/0", proto) is None + assert cs.cache_info()["missing_keys"] == 1 + assert cs.cache_stats()["negative_hits"] == 1 + + async def test_can_be_disabled(self) -> None: + """With ``cache_missing=False`` nothing is remembered.""" + cs = CacheStore(MemoryStore(), cache_store=MemoryStore(), cache_missing=False) + proto = default_buffer_prototype() assert await cs.get("c/0", proto) is None assert await cs.get("c/0", proto) is None assert cs.cache_info()["missing_keys"] == 0 @@ -1134,23 +1144,6 @@ async def test_respects_ttl(self) -> None: assert result.to_bytes() == b"late" assert cs.cache_info()["missing_keys"] == 0 - async def test_bounded(self) -> None: - """``max_missing_keys`` bounds the negative cache, evicting LRU entries.""" - cs = CacheStore( - MemoryStore(), cache_store=MemoryStore(), cache_missing=True, max_missing_keys=10 - ) - proto = default_buffer_prototype() - for i in range(25): - assert await cs.get(f"c/{i}", proto) is None - - assert cs.cache_info()["missing_keys"] == 10 - assert cs.cache_stats()["evictions"] >= 15 - # the 10 most-recently-seen keys are retained (LRU) - for i in range(15, 25): - assert f"c/{i}" in cs._state.missing_keys - for i in range(15): - assert f"c/{i}" not in cs._state.missing_keys - async def test_byte_range_unaffected(self) -> None: """Byte-range misses do not populate the negative cache.""" cs = CacheStore(MemoryStore(), cache_store=MemoryStore(), cache_missing=True) @@ -1184,7 +1177,3 @@ async def test_delete_does_not_record(self) -> None: cs = CacheStore(MemoryStore(), cache_store=MemoryStore(), cache_missing=True) await cs.delete("c/0") assert cs.cache_info()["missing_keys"] == 0 - - async def test_max_missing_keys_validated(self) -> None: - with pytest.raises(ValueError, match="max_missing_keys"): - CacheStore(MemoryStore(), cache_store=MemoryStore(), max_missing_keys=0) From 497d6981cb2413313a5f7c9e10e5509fb9997145 Mon Sep 17 00:00:00 2001 From: Shane Grigsby Date: Thu, 25 Jun 2026 16:04:26 -0700 Subject: [PATCH 3/6] unified slot-based cache for both positive and negative entries (shares cache budget) --- src/zarr/experimental/cache_store.py | 215 +++++++++++++------- tests/test_experimental/test_cache_store.py | 139 +++++++++---- 2 files changed, 236 insertions(+), 118 deletions(-) diff --git a/src/zarr/experimental/cache_store.py b/src/zarr/experimental/cache_store.py index 4205686fb6..3289f780e1 100644 --- a/src/zarr/experimental/cache_store.py +++ b/src/zarr/experimental/cache_store.py @@ -20,24 +20,52 @@ # live in the in-memory range cache. _CacheEntryKey = str | tuple[str, ByteRequest] +# Nominal byte cost charged to ``max_size`` for a negative (known-absent) entry. +# Such entries carry no data, but each one occupies an index slot (the key plus a +# small ``_Entry`` record), so it is charged a flat overhead. This lets a single +# ``max_size`` budget bound *total* cache memory — cached values and miss-markers +# together — rather than letting negative entries grow without limit. +_NEGATIVE_ENTRY_SIZE = 128 + + +@dataclass(slots=True) +class _Entry: + """A single cache slot, tracked in :attr:`_CacheState.entries`. + + ``present=True`` (the default): a value is cached for this key — in the + Store-backed cache for full keys, or the in-memory range cache for + byte-range keys — occupying ``size`` bytes. + + ``present=False``: the full key is known-*absent* in the source store (a + negative-cache entry). It carries no data, but is charged a flat + ``_NEGATIVE_ENTRY_SIZE`` against ``max_size`` for the index slot it occupies, + so cached values and miss-markers share one memory budget. Its staleness is + bounded by ``max_age_seconds``. + + Because every key maps to exactly one ``_Entry``, "present" and "absent" are + mutually exclusive by construction: a key cannot simultaneously be cached and + marked missing. + """ + + insert_time: float + size: int = 0 + present: bool = True + @dataclass(slots=True) class _CacheState: - cache_order: OrderedDict[_CacheEntryKey, None] = field(default_factory=OrderedDict) + # Single source of truth for every tracked key (full-key and byte-range, + # present and absent). Ordered for LRU eviction; ``move_to_end`` marks a key + # most-recently-used. Replaces the former parallel cache_order / key_sizes / + # key_insert_times / missing_keys structures so a key has one unambiguous state. + entries: OrderedDict[_CacheEntryKey, _Entry] = field(default_factory=OrderedDict) current_size: int = 0 - key_sizes: dict[_CacheEntryKey, int] = field(default_factory=dict) lock: asyncio.Lock = field(default_factory=asyncio.Lock) hits: int = 0 misses: int = 0 evictions: int = 0 negative_hits: int = 0 - key_insert_times: dict[_CacheEntryKey, float] = field(default_factory=dict) range_cache: dict[str, dict[ByteRequest, Buffer]] = field(default_factory=dict) - # Negative cache: full keys known to be absent in the source store, mapped to their - # (monotonic) insertion time. Used to short-circuit repeat reads of absent keys. - # Entries carry no data, so they are kept out of the byte-size accounting above; - # staleness is bounded by ``max_age_seconds``. - missing_keys: dict[str, float] = field(default_factory=dict) class CacheStore(WrapperStore[Store]): @@ -83,11 +111,13 @@ class CacheStore(WrapperStore[Store]): expires, so a key written to the source by another process stays invisible through this cache. Pair ``cache_missing=True`` with a finite ``max_age_seconds`` if the source may be written concurrently. - - Like the positive cache (which is unbounded when ``max_size is None``), the - negative cache is bounded only by ``max_age_seconds``. With an infinite TTL, - a scan over a very large sparse key space will accumulate one small entry per - absent key. Set a finite ``max_age_seconds`` (or ``cache_missing=False``) for - such workloads. + - Negative entries share the ``max_size`` budget with cached values: each is + charged a small flat overhead, and under memory pressure miss-markers are + evicted (least-recently-used first) before any cached value. A single + ``max_size`` therefore bounds *total* cache memory. When ``max_size is None`` + both caches are unbounded, so a scan over a very large sparse key space will + accumulate one small entry per absent key; set ``max_size`` (and/or a finite + ``max_age_seconds``, or ``cache_missing=False``) for such workloads. Examples -------- @@ -170,56 +200,84 @@ def with_read_only(self, read_only: bool = False) -> Self: store._state = self._state return store - def _is_key_fresh(self, entry_key: _CacheEntryKey) -> bool: - """Check if a cached entry is still fresh based on max_age_seconds. + def _is_fresh(self, entry_key: _CacheEntryKey) -> bool: + """Check if a tracked entry (present or absent) is still fresh. - Uses monotonic time for accurate elapsed time measurement. + Uses monotonic time for accurate elapsed time measurement. A key with no + entry is treated as not fresh (except under an infinite TTL, matching the + previous behaviour of routing unseen keys through the cache path). """ if self.max_age_seconds == "infinity": return True - now = time.monotonic() - elapsed = now - self._state.key_insert_times.get(entry_key, 0) - return elapsed < self.max_age_seconds - - def _is_missing_fresh(self, key: str) -> bool: - """Check if a negative (missing-key) entry is still fresh. - - Mirrors ``_is_key_fresh`` but reads the negative-cache insertion time. - """ - if self.max_age_seconds == "infinity": - return True - elapsed = time.monotonic() - self._state.missing_keys.get(key, 0.0) + entry = self._state.entries.get(entry_key) + if entry is None: + return False + elapsed = time.monotonic() - entry.insert_time return elapsed < self.max_age_seconds - def _record_missing(self, key: str) -> None: + async def _record_missing(self, key: str) -> None: """Record *key* as known-missing (absent in the source store). - Must be called while holding ``self._state.lock``. Staleness is bounded by - ``max_age_seconds`` via ``_is_missing_fresh``. + Overwrites any existing slot for *key*, so a key cannot be both cached and + marked missing. The marker is charged ``_NEGATIVE_ENTRY_SIZE`` against the + shared ``max_size`` budget, then the budget is re-enforced (evicting + absent entries first). Must be called while holding ``self._state.lock``. + Staleness is bounded by ``max_age_seconds`` via ``_is_fresh``. """ - self._state.missing_keys[key] = time.monotonic() + old = self._state.entries.get(key) + if old is not None: + self._state.current_size = max(0, self._state.current_size - old.size) + self._state.entries[key] = _Entry( + insert_time=time.monotonic(), size=_NEGATIVE_ENTRY_SIZE, present=False + ) + self._state.entries.move_to_end(key) + self._state.current_size += _NEGATIVE_ENTRY_SIZE + # Re-enforce the shared budget (no further incoming bytes to reserve). + await self._accommodate_value(0) def _evict_missing(self, key: str) -> None: """Drop any negative entry for *key* (it is now present or being written). - Must be called while holding ``self._state.lock``. + Only removes an *absent* slot — a present (cached) value for the same key is + left untouched. Must be called while holding ``self._state.lock``. """ - self._state.missing_keys.pop(key, None) + entry = self._state.entries.get(key) + if entry is not None and not entry.present: + del self._state.entries[key] async def _accommodate_value(self, value_size: int) -> None: - """Ensure there is enough space in the cache for a new value. + """Evict until ``value_size`` more bytes fit within ``max_size``. - Must be called while holding self._state.lock. + Eviction is *absent-first*: least-recently-used negative markers are + dropped before any cached value, because miss-markers are cheap to + regenerate (just re-probe the source) while cached data is not. A cached + value is only evicted once no negative markers remain. Must be called + while holding self._state.lock. """ if self.max_size is None: return - # Remove least recently used items until we have enough space - while self._state.current_size + value_size > self.max_size and self._state.cache_order: - # Get the least recently used key (first in OrderedDict) - lru_key = next(iter(self._state.cache_order)) + while self._state.current_size + value_size > self.max_size: + lru_key = self._next_eviction_candidate() + if lru_key is None: + break await self._evict_key(lru_key) + def _next_eviction_candidate(self) -> _CacheEntryKey | None: + """Return the next entry to evict, preferring absent markers (LRU-first). + + Walks entries in LRU order: the first absent entry found is returned; if + none are absent, the least-recently-used present entry is returned. Must + be called while holding self._state.lock. + """ + lru_present: _CacheEntryKey | None = None + for entry_key, entry in self._state.entries.items(): + if not entry.present: + return entry_key + if lru_present is None: + lru_present = entry_key + return lru_present + async def _evict_key(self, entry_key: _CacheEntryKey) -> None: """Evict a cache entry. @@ -229,10 +287,13 @@ async def _evict_key(self, entry_key: _CacheEntryKey) -> None: For ``(str, ByteRequest)`` keys the entry is removed from the in-memory range cache. """ - key_size = self._state.key_sizes.get(entry_key, 0) + entry = self._state.entries.pop(entry_key, None) + key_size = entry.size if entry is not None else 0 if isinstance(entry_key, str): - await self._cache.delete(entry_key) + # Absent markers store no value in the backing cache — skip the delete. + if entry is None or entry.present: + await self._cache.delete(entry_key) else: base_key, byte_range = entry_key per_key = self._state.range_cache.get(base_key) @@ -241,9 +302,6 @@ async def _evict_key(self, entry_key: _CacheEntryKey) -> None: if not per_key: del self._state.range_cache[base_key] - self._state.cache_order.pop(entry_key, None) - self._state.key_insert_times.pop(entry_key, None) - self._state.key_sizes.pop(entry_key, None) self._state.current_size = max(0, self._state.current_size - key_size) self._state.evictions += 1 @@ -263,36 +321,38 @@ async def _track_entry(self, entry_key: _CacheEntryKey, value: Buffer) -> bool: return False async with self._state.lock: - # If key already exists, subtract old size first - if entry_key in self._state.key_sizes: - old_size = self._state.key_sizes[entry_key] - self._state.current_size -= old_size + # If key already exists, subtract old size first (an absent slot has + # size 0, so this also cleanly upgrades a negative entry to present). + old = self._state.entries.get(entry_key) + if old is not None: + self._state.current_size -= old.size # Make room for the new value await self._accommodate_value(value_size) - # Update tracking atomically - self._state.cache_order[entry_key] = None + # Update tracking atomically. Assigning to an existing key preserves + # its LRU position, matching the previous behaviour. + self._state.entries[entry_key] = _Entry( + insert_time=time.monotonic(), size=value_size, present=True + ) self._state.current_size += value_size - self._state.key_sizes[entry_key] = value_size - self._state.key_insert_times[entry_key] = time.monotonic() return True async def _update_access_order(self, entry_key: _CacheEntryKey) -> None: """Update the access order for LRU tracking.""" - if entry_key in self._state.cache_order: + if entry_key in self._state.entries: async with self._state.lock: - self._state.cache_order.move_to_end(entry_key) + self._state.entries.move_to_end(entry_key) def _remove_from_tracking(self, entry_key: _CacheEntryKey) -> None: - """Remove an entry from all tracking structures. + """Remove an entry from tracking, reclaiming any bytes it accounted for. Must be called while holding self._state.lock. """ - self._state.cache_order.pop(entry_key, None) - self._state.key_insert_times.pop(entry_key, None) - self._state.key_sizes.pop(entry_key, None) + entry = self._state.entries.pop(entry_key, None) + if entry is not None: + self._state.current_size = max(0, self._state.current_size - entry.size) def _invalidate_range_entries(self, key: str) -> None: """Remove all byte-range entries for *key* from the range cache and tracking. @@ -303,10 +363,9 @@ def _invalidate_range_entries(self, key: str) -> None: if per_key is not None: for byte_range in per_key: entry_key: _CacheEntryKey = (key, byte_range) - entry_size = self._state.key_sizes.pop(entry_key, 0) - self._state.cache_order.pop(entry_key, None) - self._state.key_insert_times.pop(entry_key, None) - self._state.current_size = max(0, self._state.current_size - entry_size) + entry = self._state.entries.pop(entry_key, None) + if entry is not None: + self._state.current_size = max(0, self._state.current_size - entry.size) # ------------------------------------------------------------------ # get helpers @@ -324,7 +383,7 @@ async def _cache_miss( # The key is absent in the source: remember the miss so a repeat # read can short-circuit without a source round-trip. if self.cache_missing: - self._record_missing(key) + await self._record_missing(key) else: entry_key: _CacheEntryKey = (key, byte_range) async with self._state.lock: @@ -337,11 +396,10 @@ async def _cache_miss( else: if byte_range is None: await self._cache.set(key, result) + # ``_track_entry`` overwrites the key's single slot with a present + # entry, so any prior negative marker is structurally replaced — + # no separate negative-cache eviction is needed here. await self._track_entry(key, result) - # A value now exists for this key: drop any stale negative entry. - if self.cache_missing: - async with self._state.lock: - self._evict_missing(key) else: entry_key = (key, byte_range) self._state.range_cache.setdefault(key, {})[byte_range] = result @@ -420,12 +478,13 @@ async def get( # key has no positive entry and would otherwise be routed straight to the source. if self.cache_missing and byte_range is None: async with self._state.lock: - if key in self._state.missing_keys and self._is_missing_fresh(key): + entry = self._state.entries.get(key) + if entry is not None and not entry.present and self._is_fresh(key): self._state.negative_hits += 1 return None entry_key: _CacheEntryKey = (key, byte_range) if byte_range is not None else key - if not self._is_key_fresh(entry_key): + if not self._is_fresh(entry_key): return await self._get_no_cache(key, prototype, byte_range) else: return await self._get_try_cache(key, prototype, byte_range) @@ -495,6 +554,8 @@ async def delete(self, key: str) -> None: def cache_info(self) -> dict[str, Any]: """Return information about the cache state.""" + present = sum(1 for entry in self._state.entries.values() if entry.present) + missing = len(self._state.entries) - present return { "cache_store_type": type(self._cache).__name__, "max_age_seconds": "infinity" @@ -504,9 +565,9 @@ def cache_info(self) -> dict[str, Any]: "current_size": self._state.current_size, "cache_set_data": self.cache_set_data, "cache_missing": self.cache_missing, - "tracked_keys": len(self._state.key_insert_times), - "cached_keys": len(self._state.cache_order), - "missing_keys": len(self._state.missing_keys), + "tracked_keys": len(self._state.entries), + "cached_keys": present, + "missing_keys": missing, } def cache_stats(self) -> dict[str, Any]: @@ -535,16 +596,14 @@ async def clear_cache(self) -> None: # Reset tracking async with self._state.lock: - self._state.key_insert_times.clear() - self._state.cache_order.clear() - self._state.key_sizes.clear() + self._state.entries.clear() self._state.range_cache.clear() - self._state.missing_keys.clear() self._state.current_size = 0 self._state.negative_hits = 0 def __repr__(self) -> str: """Return string representation of the cache store.""" + cached_keys = sum(1 for entry in self._state.entries.values() if entry.present) return ( f"{self.__class__.__name__}(" f"store={self._store!r}, " @@ -552,5 +611,5 @@ def __repr__(self) -> str: f"max_age_seconds={self.max_age_seconds}, " f"max_size={self.max_size}, " f"current_size={self._state.current_size}, " - f"cached_keys={len(self._state.cache_order)})" + f"cached_keys={cached_keys})" ) diff --git a/tests/test_experimental/test_cache_store.py b/tests/test_experimental/test_cache_store.py index 8c5fe2c281..c8b12536be 100644 --- a/tests/test_experimental/test_cache_store.py +++ b/tests/test_experimental/test_cache_store.py @@ -10,7 +10,7 @@ from zarr.abc.store import RangeByteRequest, Store, SuffixByteRequest from zarr.core.buffer.core import default_buffer_prototype from zarr.core.buffer.cpu import Buffer as CPUBuffer -from zarr.experimental.cache_store import CacheStore +from zarr.experimental.cache_store import CacheStore, _Entry from zarr.storage import MemoryStore @@ -62,7 +62,7 @@ async def test_with_read_only_round_trip(self) -> None: # Cache configuration and state are shared assert writer._cache is cached_ro._cache assert writer._state is cached_ro._state - assert writer._state.key_insert_times is cached_ro._state.key_insert_times + assert writer._state.entries is cached_ro._state.entries # Writes via the writable cache store succeed and are cached await writer.set("foo", buf) @@ -132,13 +132,13 @@ async def test_cache_expiration(self) -> None: await cached_store.set("expire_key", test_data) # Should be fresh initially - assert cached_store._is_key_fresh("expire_key") + assert cached_store._is_fresh("expire_key") # Wait for expiration await asyncio.sleep(1.1) # Should now be stale - assert not cached_store._is_key_fresh("expire_key") + assert not cached_store._is_fresh("expire_key") async def test_cache_set_data_false(self, source_store: Store, cache_store: Store) -> None: """Test behavior when cache_set_data=False.""" @@ -222,11 +222,11 @@ async def test_infinity_max_age(self, cached_store: CacheStore) -> None: await cached_store.set("eternal_key", test_data) # Should always be fresh - assert cached_store._is_key_fresh("eternal_key") + assert cached_store._is_fresh("eternal_key") # Even after time passes await asyncio.sleep(0.1) - assert cached_store._is_key_fresh("eternal_key") + assert cached_store._is_fresh("eternal_key") async def test_cache_returns_cached_data_for_performance( self, cached_store: CacheStore, source_store: Store @@ -235,7 +235,9 @@ async def test_cache_returns_cached_data_for_performance( # Put data in cache but not source (simulates orphaned cache entry) test_data = CPUBuffer.from_bytes(b"orphaned data") await cached_store._cache.set("orphan_key", test_data) - cached_store._state.key_insert_times["orphan_key"] = time.monotonic() + cached_store._state.entries["orphan_key"] = _Entry( + insert_time=time.monotonic(), size=len(test_data), present=True + ) # Cache should return data for performance (no source verification) result = await cached_store.get("orphan_key", default_buffer_prototype()) @@ -244,7 +246,7 @@ async def test_cache_returns_cached_data_for_performance( # Cache entry should remain (performance optimization) assert await cached_store._cache.exists("orphan_key") - assert "orphan_key" in cached_store._state.key_insert_times + assert "orphan_key" in cached_store._state.entries async def test_cache_coherency_through_expiration(self) -> None: """Test that cache coherency is managed through cache expiration, not source verification.""" @@ -380,7 +382,7 @@ async def test_max_age_infinity(self) -> None: await cached_store.set("test_key", test_data) # Even after time passes, key should be fresh - assert cached_store._is_key_fresh("test_key") + assert cached_store._is_fresh("test_key") async def test_max_age_numeric(self) -> None: """Test cache with numeric max age.""" @@ -397,13 +399,13 @@ async def test_max_age_numeric(self) -> None: await cached_store.set("test_key", test_data) # Key should be fresh initially - assert cached_store._is_key_fresh("test_key") + assert cached_store._is_fresh("test_key") # Manually set old timestamp to test expiration - cached_store._state.key_insert_times["test_key"] = time.monotonic() - 2 # 2 seconds ago + cached_store._state.entries["test_key"].insert_time = time.monotonic() - 2 # 2 seconds ago # Key should now be stale - assert not cached_store._is_key_fresh("test_key") + assert not cached_store._is_fresh("test_key") async def test_cache_set_data_disabled(self) -> None: """Test cache behavior when cache_set_data is False.""" @@ -553,8 +555,8 @@ async def test_evict_key_exception_handling(self) -> None: await cached_store.set("test_key", test_data) # Manually corrupt the tracking to trigger exception - # Remove from one structure but not others to create inconsistency - del cached_store._state.cache_order["test_key"] + # Remove the tracked entry while leaving the cached value behind + del cached_store._state.entries["test_key"] # Try to evict - should handle the KeyError gracefully await cached_store._evict_key("test_key") @@ -575,16 +577,16 @@ async def test_get_no_cache_delete_tracking(self) -> None: await cached_store._track_entry("phantom_key", test_data) # Verify it's in tracking - assert "phantom_key" in cached_store._state.cache_order - assert "phantom_key" in cached_store._state.key_insert_times + assert "phantom_key" in cached_store._state.entries # Now try to get it - since it's not in source, should clean up tracking result = await cached_store._get_no_cache("phantom_key", default_buffer_prototype()) assert result is None - # Should have cleaned up tracking - assert "phantom_key" not in cached_store._state.cache_order - assert "phantom_key" not in cached_store._state.key_insert_times + # Should have cleaned up tracking (the positive entry is gone). With + # cache_missing on by default, a negative marker replaces it. + entry = cached_store._state.entries.get("phantom_key") + assert entry is None or not entry.present async def test_accommodate_value_no_max_size(self) -> None: """Test _accommodate_value early return when max_size is None.""" @@ -644,9 +646,7 @@ async def set_large(key: str) -> None: # Size should be consistent with tracked keys assert info["current_size"] <= 200 # Might pass # But verify actual cache store size matches tracking - total_size = sum( - cached_store._state.key_sizes.get(k, 0) for k in cached_store._state.cache_order - ) + total_size = sum(entry.size for entry in cached_store._state.entries.values()) assert total_size == info["current_size"] # WOULD FAIL async def test_concurrent_get_and_evict(self) -> None: @@ -675,7 +675,10 @@ async def write_key() -> None: # Verify consistency info = cached_store.cache_info() assert info["current_size"] <= 100 - assert len(cached_store._state.cache_order) == len(cached_store._state.key_sizes) + # Tracked size accounting stays consistent with all entries (present + # values plus any negative markers, which each carry a flat overhead). + total_size = sum(entry.size for entry in cached_store._state.entries.values()) + assert total_size == info["current_size"] async def test_eviction_actually_deletes_from_cache_store(self) -> None: """Test that eviction removes keys from cache_store, not just tracking.""" @@ -696,8 +699,7 @@ async def test_eviction_actually_deletes_from_cache_store(self) -> None: await cached_store.set("key2", data2) # Check tracking - key1 should be removed - assert "key1" not in cached_store._state.cache_order - assert "key1" not in cached_store._state.key_sizes + assert "key1" not in cached_store._state.entries # CRITICAL: key1 should also be removed from cache_store assert not await cache_store.exists("key1"), ( @@ -769,21 +771,15 @@ async def test_all_tracked_keys_exist_in_cache_store(self) -> None: data = CPUBuffer.from_bytes(b"x" * 50) await cached_store.set(f"key_{i}", data) - # Every str key in tracking should exist in cache_store - # (tuple keys are byte-range entries stored in-memory, not in the Store) - for entry_key in cached_store._state.cache_order: - if isinstance(entry_key, str): + # Every present str key in tracking should exist in cache_store. + # (tuple keys are byte-range entries stored in-memory, not in the Store; + # absent entries are negative markers with no stored value.) + for entry_key, entry in cached_store._state.entries.items(): + if isinstance(entry_key, str) and entry.present: assert await cache_store.exists(entry_key), ( f"Key '{entry_key}' is tracked but doesn't exist in cache_store" ) - # Every str key in _key_sizes should exist in cache_store - for entry_key in cached_store._state.key_sizes: - if isinstance(entry_key, str): - assert await cache_store.exists(entry_key), ( - f"Key '{entry_key}' has size tracked but doesn't exist in cache_store" - ) - # Additional coverage tests for 100% coverage async def test_cache_store_requires_delete_support(self) -> None: @@ -999,13 +995,13 @@ async def test_set_invalidates_cached_byte_ranges(self) -> None: assert r1.to_bytes() == b"old" # Byte-range entry should be in range_cache - assert ("key", RangeByteRequest(0, 3)) in cached_store._state.cache_order + assert ("key", RangeByteRequest(0, 3)) in cached_store._state.entries # Overwrite via set() — range entries must be invalidated await cached_store.set("key", CPUBuffer.from_bytes(b"NEW DATA!!")) # The old range entry should be gone from tracking and range_cache - assert ("key", RangeByteRequest(0, 3)) not in cached_store._state.cache_order + assert ("key", RangeByteRequest(0, 3)) not in cached_store._state.entries assert "key" not in cached_store._state.range_cache # A fresh byte-range read should return the new data @@ -1027,12 +1023,12 @@ async def test_delete_invalidates_cached_byte_ranges(self) -> None: assert r is not None assert r.to_bytes() == b"hello" - assert ("key", RangeByteRequest(0, 5)) in cached_store._state.cache_order + assert ("key", RangeByteRequest(0, 5)) in cached_store._state.entries # Delete the key — range entries must be cleaned up await cached_store.delete("key") - assert ("key", RangeByteRequest(0, 5)) not in cached_store._state.cache_order + assert ("key", RangeByteRequest(0, 5)) not in cached_store._state.entries assert "key" not in cached_store._state.range_cache # Key is gone from source @@ -1166,3 +1162,66 @@ async def test_delete_does_not_record(self) -> None: cs = CacheStore(MemoryStore(), cache_store=MemoryStore(), cache_missing=True) await cs.delete("c/0") assert cs.cache_info()["missing_keys"] == 0 + + async def test_negative_entry_counts_against_max_size(self) -> None: + """A negative marker is charged against the shared ``max_size`` budget.""" + from zarr.experimental.cache_store import _NEGATIVE_ENTRY_SIZE + + cs = CacheStore( + MemoryStore(), cache_store=MemoryStore(), cache_missing=True, max_size=10_000 + ) + proto = default_buffer_prototype() + assert cs.cache_info()["current_size"] == 0 + assert await cs.get("absent", proto) is None + assert cs.cache_info()["current_size"] == _NEGATIVE_ENTRY_SIZE + + async def test_shared_budget_bounds_negative_entries(self) -> None: + """Many misses cannot grow the cache past ``max_size`` — old negative + markers are evicted (LRU) to stay within the shared budget.""" + from zarr.experimental.cache_store import _NEGATIVE_ENTRY_SIZE + + cap = 5 + cs = CacheStore( + MemoryStore(), + cache_store=MemoryStore(), + cache_missing=True, + max_size=cap * _NEGATIVE_ENTRY_SIZE, + ) + proto = default_buffer_prototype() + for i in range(25): + assert await cs.get(f"absent/{i}", proto) is None + + info = cs.cache_info() + assert info["missing_keys"] == cap + assert info["current_size"] <= cap * _NEGATIVE_ENTRY_SIZE + # The most-recent misses are retained (LRU eviction of the oldest). + assert await cs.get("absent/24", proto) is None + assert cs.cache_stats()["negative_hits"] >= 1 + + async def test_absent_evicted_before_present(self) -> None: + """Under memory pressure, miss-markers are evicted before cached values.""" + from zarr.experimental.cache_store import _NEGATIVE_ENTRY_SIZE + + source = MemoryStore() + # Budget for one value plus a couple of negative markers. + value = CPUBuffer.from_bytes(b"v" * 64) + cs = CacheStore( + source, + cache_store=MemoryStore(), + cache_missing=True, + max_size=len(value) + 2 * _NEGATIVE_ENTRY_SIZE, + ) + proto = default_buffer_prototype() + + # Cache a present value, then record several misses that exceed the budget. + await source.set("present", value) + assert (await cs.get("present", proto)) is not None + for i in range(5): + assert await cs.get(f"absent/{i}", proto) is None + + # The present value survives; negative markers were evicted to make room. + info = cs.cache_info() + assert info["cached_keys"] == 1 + assert "present" in cs._state.entries + assert cs._state.entries["present"].present + assert info["current_size"] <= cs.max_size From 7829bfbd2d0fd3ad4a571807e20aeee82925ace5 Mon Sep 17 00:00:00 2001 From: Shane Grigsby Date: Thu, 25 Jun 2026 16:27:24 -0700 Subject: [PATCH 4/6] fixing eviction bug where we weren't reclaiming bytes on evicted negative entries --- src/zarr/experimental/cache_store.py | 45 ++++++++++++++++----- tests/test_experimental/test_cache_store.py | 44 +++++++++++++++++++- 2 files changed, 77 insertions(+), 12 deletions(-) diff --git a/src/zarr/experimental/cache_store.py b/src/zarr/experimental/cache_store.py index 3289f780e1..f758c0ec89 100644 --- a/src/zarr/experimental/cache_store.py +++ b/src/zarr/experimental/cache_store.py @@ -118,6 +118,11 @@ class CacheStore(WrapperStore[Store]): both caches are unbounded, so a scan over a very large sparse key space will accumulate one small entry per absent key; set ``max_size`` (and/or a finite ``max_age_seconds``, or ``cache_missing=False``) for such workloads. + - This is store-level, per-key negative caching aimed at the stock ``arr[:]`` + path, which probes every chunk. For very large sparse arrays, prefer the + array-level sparse-read primitives ``zarr.shards_initialized`` and + ``zarr.read_regions`` (PR #4028), which touch only populated chunks and so + never issue the empty-chunk reads this cache would otherwise remember. Examples -------- @@ -218,31 +223,47 @@ def _is_fresh(self, entry_key: _CacheEntryKey) -> bool: async def _record_missing(self, key: str) -> None: """Record *key* as known-missing (absent in the source store). - Overwrites any existing slot for *key*, so a key cannot be both cached and - marked missing. The marker is charged ``_NEGATIVE_ENTRY_SIZE`` against the - shared ``max_size`` budget, then the budget is re-enforced (evicting - absent entries first). Must be called while holding ``self._state.lock``. - Staleness is bounded by ``max_age_seconds`` via ``_is_fresh``. + Charges a flat ``_NEGATIVE_ENTRY_SIZE`` against the shared ``max_size`` + budget. A negative marker is strictly lower priority than cached data: it + may only displace *other* (older) absent markers to fit, never a cached + value, and is skipped entirely if the budget is full of cached values. + + The caller (``_cache_miss``) has already removed any backing-store value and + tracking slot for *key*, so this records a fresh marker. Must be called + while holding ``self._state.lock``. Staleness is bounded by + ``max_age_seconds`` via ``_is_fresh``. """ - old = self._state.entries.get(key) + # Drop any pre-existing slot for this key, reclaiming its bytes. + old = self._state.entries.pop(key, None) if old is not None: self._state.current_size = max(0, self._state.current_size - old.size) + + # Make room by evicting older absent markers only — never cached values. + if self.max_size is not None: + while self._state.current_size + _NEGATIVE_ENTRY_SIZE > self.max_size: + lru_absent = next( + (k for k, e in self._state.entries.items() if not e.present), None + ) + if lru_absent is None: + return # only cached values fill the budget — don't record the miss + await self._evict_key(lru_absent) + self._state.entries[key] = _Entry( insert_time=time.monotonic(), size=_NEGATIVE_ENTRY_SIZE, present=False ) self._state.entries.move_to_end(key) self._state.current_size += _NEGATIVE_ENTRY_SIZE - # Re-enforce the shared budget (no further incoming bytes to reserve). - await self._accommodate_value(0) def _evict_missing(self, key: str) -> None: """Drop any negative entry for *key* (it is now present or being written). Only removes an *absent* slot — a present (cached) value for the same key is - left untouched. Must be called while holding ``self._state.lock``. + left untouched — and reclaims the marker's charged bytes. Must be called + while holding ``self._state.lock``. """ entry = self._state.entries.get(key) if entry is not None and not entry.present: + self._state.current_size = max(0, self._state.current_size - entry.size) del self._state.entries[key] async def _accommodate_value(self, value_size: int) -> None: @@ -341,8 +362,10 @@ async def _track_entry(self, entry_key: _CacheEntryKey, value: Buffer) -> bool: async def _update_access_order(self, entry_key: _CacheEntryKey) -> None: """Update the access order for LRU tracking.""" - if entry_key in self._state.entries: - async with self._state.lock: + async with self._state.lock: + # Re-check membership under the lock: the entry may have been evicted + # by a concurrent operation between the call and acquiring the lock. + if entry_key in self._state.entries: self._state.entries.move_to_end(entry_key) def _remove_from_tracking(self, entry_key: _CacheEntryKey) -> None: diff --git a/tests/test_experimental/test_cache_store.py b/tests/test_experimental/test_cache_store.py index c8b12536be..353e73af82 100644 --- a/tests/test_experimental/test_cache_store.py +++ b/tests/test_experimental/test_cache_store.py @@ -1219,9 +1219,51 @@ async def test_absent_evicted_before_present(self) -> None: for i in range(5): assert await cs.get(f"absent/{i}", proto) is None - # The present value survives; negative markers were evicted to make room. + # The present value survives; markers are bounded and never evict it. info = cs.cache_info() assert info["cached_keys"] == 1 assert "present" in cs._state.entries assert cs._state.entries["present"].present + # Markers fill only the room left over by the cached value (2 here), proving + # both that misses were actually recorded and that they were bounded. + assert info["missing_keys"] == 2 assert info["current_size"] <= cs.max_size + + async def test_no_size_leak_on_miss_then_write(self) -> None: + """Recording a miss then writing the key must not leak the marker's charge + against ``current_size`` (regression for negative-entry accounting).""" + from zarr.experimental.cache_store import _NEGATIVE_ENTRY_SIZE + + source = MemoryStore() + value = CPUBuffer.from_bytes(b"v" * 50) + cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True, max_size=10_000) + proto = default_buffer_prototype() + + # Miss → marker charged; then write the same key → marker must be reclaimed. + assert await cs.get("k", proto) is None + assert cs.cache_info()["current_size"] == _NEGATIVE_ENTRY_SIZE + await cs.set("k", value) + + info = cs.cache_info() + assert info["missing_keys"] == 0 + assert info["cached_keys"] == 1 + # Only the value's bytes remain — no leftover marker overhead. + assert info["current_size"] == len(value) + # And the invariant holds: current_size == sum of all tracked entry sizes. + total = sum(entry.size for entry in cs._state.entries.values()) + assert total == info["current_size"] + + async def test_no_size_leak_on_miss_then_set_if_not_exists(self) -> None: + """``set_if_not_exists`` after a miss reclaims the marker's charge too.""" + source = MemoryStore() + value = CPUBuffer.from_bytes(b"v" * 50) + cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True, max_size=10_000) + proto = default_buffer_prototype() + + assert await cs.get("k", proto) is None + await cs.set_if_not_exists("k", value) + + info = cs.cache_info() + assert info["missing_keys"] == 0 + total = sum(entry.size for entry in cs._state.entries.values()) + assert total == info["current_size"] From 2ca2376d1810458aa0c2d3dd02b39f89b671002d Mon Sep 17 00:00:00 2001 From: Shane Grigsby Date: Thu, 25 Jun 2026 16:43:02 -0700 Subject: [PATCH 5/6] updating narrative user docs --- docs/user-guide/experimental.md | 56 +++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/docs/user-guide/experimental.md b/docs/user-guide/experimental.md index 1c6d952c7c..7f9beea381 100644 --- a/docs/user-guide/experimental.md +++ b/docs/user-guide/experimental.md @@ -137,6 +137,51 @@ cache = CacheStore( ) ``` +**cache_missing**: Controls *negative caching* — remembering keys that are absent in the +source store (on by default). Without it, the positive cache cannot help with an absent +key: there is no value to store, so every read re-pays a source round-trip. This is the +dominant cost when reading sparse arrays (mostly empty chunks) repeatedly through a cache. +With `cache_missing=True`, a full-key read that finds the key absent records that absence, +so subsequent reads of the same key return immediately without consulting the source. The +remembered miss is evicted when the key is written and respects `max_age_seconds`. + +```python exec="true" session="experimental" source="above" +import asyncio +from zarr.storage import MemoryStore +from zarr.core.buffer import default_buffer_prototype + +neg_cache = CacheStore( + store=MemoryStore(), + cache_store=MemoryStore(), + cache_missing=True, # default; pass False to disable + max_age_seconds=300, # recommended: bound staleness of remembered misses +) + +async def read_absent_twice(): + proto = default_buffer_prototype() + await neg_cache.get("c/0", proto) # first read: real miss, consults the source + await neg_cache.get("c/0", proto) # second read: served from the negative cache + +asyncio.run(read_absent_twice()) + +info = neg_cache.cache_info() +print(info['cache_missing']) # True +print(info['missing_keys']) # 1 — one remembered absent key +print(neg_cache.cache_stats()['negative_hits']) # 1 — one read served without a source round-trip +``` + +Negative markers share the `max_size` budget with cached values: each is charged a small +flat overhead, and under memory pressure markers are evicted (least-recently-used first) +before any cached value, so a flood of empty-chunk reads can never evict real cached data. +Only full-key reads are affected — byte-range reads and `exists()` are unchanged. + +> **Note:** With the default `max_age_seconds="infinity"`, a remembered miss never expires, +> so a key written to the source by another process stays invisible through the cache until +> it is written through the cache. Pair `cache_missing=True` with a finite `max_age_seconds` +> when the source may be written concurrently. For very large sparse arrays, prefer the +> array-level sparse-read primitives `zarr.shards_initialized` / `zarr.read_regions`, which +> read only populated chunks and avoid the empty-chunk reads entirely. + ## Cache Statistics The CacheStore provides statistics to monitor cache performance and state: @@ -155,9 +200,19 @@ print(info['current_size']) print(info['tracked_keys']) print(info['cached_keys']) print(info['cache_set_data']) +print(info['cache_missing']) # negative caching enabled? +print(info['missing_keys']) # number of remembered absent keys + +# cache_stats() reports hit/miss counts and negative-cache activity +stats = cached_store.cache_stats() +print(stats['hits']) +print(stats['misses']) +print(stats['negative_hits']) # absent-key reads served without a source round-trip ``` The `cache_info()` method returns a dictionary with detailed information about the cache state. +A negative hit (an absent key served from the negative cache) is reported separately as +`negative_hits` and counts as neither a hit nor a miss, so it does not affect `hit_rate`. ## Cache Management @@ -185,6 +240,7 @@ The `clear_cache()` method is an async method that clears both the cache store 4. **Monitor cache statistics**: Use `cache_info()` to tune cache size and access patterns 5. **Consider data locality**: Group related data accesses together to improve cache efficiency 6. **Set appropriate expiration**: Use `max_age_seconds` for time-sensitive data or "infinity" for static data +7. **Negative caching for sparse data**: Leave `cache_missing` on (the default) to skip repeated source round-trips for absent keys; pair it with a finite `max_age_seconds` if the source may be written by another process ## Working with Different Store Types From bb0a346eea74e0b563b464e60d3fd7740f33d94c Mon Sep 17 00:00:00 2001 From: Shane Grigsby Date: Thu, 25 Jun 2026 17:05:30 -0700 Subject: [PATCH 6/6] minor bug fix --- src/zarr/experimental/cache_store.py | 55 ++++---- tests/test_experimental/test_cache_store.py | 131 ++++++++++++++++++++ 2 files changed, 164 insertions(+), 22 deletions(-) diff --git a/src/zarr/experimental/cache_store.py b/src/zarr/experimental/cache_store.py index f758c0ec89..9dd224ed67 100644 --- a/src/zarr/experimental/cache_store.py +++ b/src/zarr/experimental/cache_store.py @@ -248,10 +248,10 @@ async def _record_missing(self, key: str) -> None: return # only cached values fill the budget — don't record the miss await self._evict_key(lru_absent) + # The key was popped above, so this assignment appends it as most-recent. self._state.entries[key] = _Entry( insert_time=time.monotonic(), size=_NEGATIVE_ENTRY_SIZE, present=False ) - self._state.entries.move_to_end(key) self._state.current_size += _NEGATIVE_ENTRY_SIZE def _evict_missing(self, key: str) -> None: @@ -281,7 +281,9 @@ async def _accommodate_value(self, value_size: int) -> None: while self._state.current_size + value_size > self.max_size: lru_key = self._next_eviction_candidate() if lru_key is None: - break + # Defensive: the sole caller (``_track_entry``) guarantees + # ``value_size <= max_size``, so an empty cache always has room. + break # pragma: no cover await self._evict_key(lru_key) def _next_eviction_candidate(self) -> _CacheEntryKey | None: @@ -342,17 +344,19 @@ async def _track_entry(self, entry_key: _CacheEntryKey, value: Buffer) -> bool: return False async with self._state.lock: - # If key already exists, subtract old size first (an absent slot has - # size 0, so this also cleanly upgrades a negative entry to present). - old = self._state.entries.get(entry_key) + # Pop any existing slot for this key first, reclaiming its bytes. Popping + # (rather than leaving it in place) is essential: it removes the key from + # the eviction candidates so ``_accommodate_value`` cannot select the very + # key being (re)tracked — which would double-subtract its size, stop the + # eviction loop early, and (for a present overwrite) delete the value the + # caller just wrote to the backing store. The caller has already written + # the new value, so we do not touch the backing store here. + old = self._state.entries.pop(entry_key, None) if old is not None: - self._state.current_size -= old.size + self._state.current_size = max(0, self._state.current_size - old.size) - # Make room for the new value + # Make room for the new value, then track it (appended as most-recent). await self._accommodate_value(value_size) - - # Update tracking atomically. Assigning to an existing key preserves - # its LRU position, matching the previous behaviour. self._state.entries[entry_key] = _Entry( insert_time=time.monotonic(), size=value_size, present=True ) @@ -402,11 +406,14 @@ async def _cache_miss( if byte_range is None: await self._cache.delete(key) async with self._state.lock: - self._remove_from_tracking(key) - # The key is absent in the source: remember the miss so a repeat - # read can short-circuit without a source round-trip. + # The key is absent in the source. Either remember the miss (so a + # repeat read short-circuits without a source round-trip) or just + # drop any stale tracking slot — ``_record_missing`` replaces the + # slot itself, reclaiming the bytes of any prior cached value. if self.cache_missing: await self._record_missing(key) + else: + self._remove_from_tracking(key) else: entry_key: _CacheEntryKey = (key, byte_range) async with self._state.lock: @@ -550,13 +557,17 @@ async def set_if_not_exists(self, key: str, value: Buffer) -> None: The data to store """ await super().set_if_not_exists(key, value) - # Whether or not the write happened, any negative entry is now unsafe: either - # we just wrote the key, or it already existed (so the record was already - # wrong). Evicting unconditionally is always safe. We do not populate the - # positive cache here — there is no guaranteed-fresh value to store. - if self.cache_missing: - async with self._state.lock: - self._evict_missing(key) + # Whether or not the write happened, any cached state for this key may now be + # stale (we may have just written a new value, or it already existed). Drop + # all of it — byte-range entries, any positive value, and any negative marker + # — so the next read reflects the source. Invalidating unconditionally is + # always safe. We do not populate the positive cache here: there is no + # guaranteed-fresh value to store (the write may have been a no-op). + async with self._state.lock: + self._invalidate_range_entries(key) + await self._cache.delete(key) + async with self._state.lock: + self._remove_from_tracking(key) async def delete(self, key: str) -> None: """ @@ -617,12 +628,12 @@ async def clear_cache(self) -> None: if hasattr(self._cache, "clear"): await self._cache.clear() - # Reset tracking + # Reset tracking. Cumulative performance counters (hits/misses/evictions/ + # negative_hits) are lifetime stats and intentionally survive a clear. async with self._state.lock: self._state.entries.clear() self._state.range_cache.clear() self._state.current_size = 0 - self._state.negative_hits = 0 def __repr__(self) -> str: """Return string representation of the cache store.""" diff --git a/tests/test_experimental/test_cache_store.py b/tests/test_experimental/test_cache_store.py index 353e73af82..42c84f7ca3 100644 --- a/tests/test_experimental/test_cache_store.py +++ b/tests/test_experimental/test_cache_store.py @@ -1267,3 +1267,134 @@ async def test_no_size_leak_on_miss_then_set_if_not_exists(self) -> None: assert info["missing_keys"] == 0 total = sum(entry.size for entry in cs._state.entries.values()) assert total == info["current_size"] + + async def test_stale_cached_value_becomes_negative_entry(self) -> None: + """A cached value whose source key later reads absent is replaced by a negative + marker, reclaiming the value's bytes (no leftover positive accounting).""" + from zarr.experimental.cache_store import _NEGATIVE_ENTRY_SIZE + + source = MemoryStore() + value = CPUBuffer.from_bytes(b"v" * 200) + cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True, max_age_seconds=1) + proto = default_buffer_prototype() + + # Cache a present value, then delete the key from the source out-of-band. + await cs.set("k", value) + assert cs.cache_info()["current_size"] == len(value) + await source.delete("k") + # Force the cached entry stale so the next read consults the (now empty) source. + cs._state.entries["k"].insert_time = time.monotonic() - 10 + + assert await cs.get("k", proto) is None # source absent -> records a miss + info = cs.cache_info() + assert info["cached_keys"] == 0 + assert info["missing_keys"] == 1 + # The 200-byte value was reclaimed; only the marker's overhead remains. + assert info["current_size"] == _NEGATIVE_ENTRY_SIZE + + async def test_miss_not_recorded_when_budget_full_of_values(self) -> None: + """When the budget is full of cached values and no markers exist to evict, a + miss is not recorded (a marker never displaces a cached value).""" + source = MemoryStore() + value = CPUBuffer.from_bytes(b"v" * 200) + # Budget fits exactly one value, with no room for a negative marker. + cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True, max_size=200) + proto = default_buffer_prototype() + + await cs.set("present", value) + assert cs.cache_info()["cached_keys"] == 1 + + assert await cs.get("absent", proto) is None + info = cs.cache_info() + assert info["missing_keys"] == 0 # no room -> miss not remembered + assert info["cached_keys"] == 1 # cached value untouched + assert info["current_size"] == len(value) + + async def test_caching_value_evicts_absent_markers(self) -> None: + """Caching a present value reclaims room by evicting negative markers first.""" + from zarr.experimental.cache_store import _NEGATIVE_ENTRY_SIZE + + source = MemoryStore() + cs = CacheStore( + source, + cache_store=MemoryStore(), + cache_missing=True, + max_size=3 * _NEGATIVE_ENTRY_SIZE, + ) + proto = default_buffer_prototype() + + # Fill the budget with three negative markers. + for i in range(3): + assert await cs.get(f"absent/{i}", proto) is None + assert cs.cache_info()["missing_keys"] == 3 + + # Caching a value must evict marker(s) to fit — markers go before any value. + await cs.set("v", CPUBuffer.from_bytes(b"v" * 100)) + info = cs.cache_info() + assert info["cached_keys"] == 1 + assert info["missing_keys"] < 3 # at least one marker evicted to make room + assert info["current_size"] <= cs.max_size + + async def test_upgrade_marker_to_value_under_pressure_evicts_other_entry(self) -> None: + """Upgrading a stale negative marker to a cached value under memory pressure must + evict a *different* entry, not self-evict (which would under-count current_size + and breach max_size).""" + from zarr.experimental.cache_store import _NEGATIVE_ENTRY_SIZE + + source = MemoryStore() + cs = CacheStore( + source, + cache_store=MemoryStore(), + cache_missing=True, + max_age_seconds=1000, + max_size=300, + ) + proto = default_buffer_prototype() + + # One cached value "a" (128 B) and one negative marker "k" (128 B) → 256 B used. + await cs.set("a", CPUBuffer.from_bytes(b"a" * (_NEGATIVE_ENTRY_SIZE))) + assert await cs.get("k", proto) is None + assert cs.cache_info()["current_size"] == 2 * _NEGATIVE_ENTRY_SIZE + + # "k" now exists in the source with a 200 B value; force the marker stale so the + # next read fetches it and upgrades the slot to a present value (needs eviction). + await source.set("k", CPUBuffer.from_bytes(b"k" * 200)) + cs._state.entries["k"].insert_time = time.monotonic() - 5000 + + result = await cs.get("k", proto) + assert result is not None + assert result.to_bytes() == b"k" * 200 + + info = cs.cache_info() + # Only "k" remains (the other value "a" was evicted to make room); the bound + # holds and the size accounting matches the actual tracked entries exactly. + assert info["cached_keys"] == 1 + assert "a" not in cs._state.entries + assert not await cs._cache.exists("a") + assert info["current_size"] == 200 + assert info["current_size"] <= cs.max_size + total = sum(entry.size for entry in cs._state.entries.values()) + assert total == info["current_size"] + + async def test_set_if_not_exists_invalidates_stale_byte_range(self) -> None: + """``set_if_not_exists`` must invalidate cached byte-range entries, not just the + negative marker, so a later byte-range read does not return stale bytes.""" + source = MemoryStore() + cs = CacheStore(source, cache_store=MemoryStore(), cache_missing=True) + proto = default_buffer_prototype() + + await source.set("k", CPUBuffer.from_bytes(b"old data!!")) + r1 = await cs.get("k", proto, byte_range=RangeByteRequest(0, 3)) + assert r1 is not None + assert r1.to_bytes() == b"old" + assert ("k", RangeByteRequest(0, 3)) in cs._state.entries + + # Source key removed out-of-band, then re-created via set_if_not_exists. + await source.delete("k") + await cs.set_if_not_exists("k", CPUBuffer.from_bytes(b"NEW data!!")) + + # The stale byte-range entry must be gone, and a fresh read returns new bytes. + assert ("k", RangeByteRequest(0, 3)) not in cs._state.entries + r2 = await cs.get("k", proto, byte_range=RangeByteRequest(0, 3)) + assert r2 is not None + assert r2.to_bytes() == b"NEW"