diff --git a/CHANGELOG.md b/CHANGELOG.md
index e7ab498..cd9d839 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,34 @@ breaking changes and the discipline is still being shaped.
## [Unreleased]
+**Added.**
+
+- *A `--preopen <dir>[:ro|:rw]` flag for the experimental `--wasi` mode
+ unblocks DYNAMIC (non-literal) `Fs` paths.* Until now a `Fs` path that
+ the compiler cannot prove is a string literal (one taken from a
+ parameter, `env.args()`, or any computed value) was REJECTED at compile
+ time under `--wasi`, because no static preopen ceiling could be derived
+ for it. `--preopen` lets the OPERATOR explicitly declare filesystem
+ authority over a single directory; the compiler then admits the dynamic
+ path and the guest resolves it AT RUNTIME relative to that directory
+ (the WASI `--dir` model, as in wasmtime). This is framed honestly as a
+ LEVEL-2 operator-DECLARED grant (analogous to `inherit_env`), NOT
+ program-proven authority: the compiler could not derive it, which is
+ precisely why the operator had to declare it. The grant is recorded in
+ the SBOM (manifest, CycloneDX, SPDX) under a dedicated
+ `operator_declared_grants` block, clearly labelled `operator-declared`
+ and kept DISTINCT from the compiler-derived capability surface so a
+ regulator never reads it as program-proven. Read / write / exists /
+ is_dir / mkdir / list_dir all work with a dynamic path under
+ `--preopen`, with byte-for-byte parity across the Python, `capa:host`
+ and WASI backends, and the guest-side fine attenuation (`restrict_to` /
+ `allows`) still gates the dynamic path lexically. WITHOUT `--preopen`,
+ a dynamic `Fs` path continues to be rejected at compile time exactly as
+ before (no regression); literal paths continue to resolve via the
+ compiler-derived ceiling. This increment supports a SINGLE `--preopen`
+ for dynamic-path resolution; passing more than one is rejected with a
+ clear message.
+
**Changed.**
- *In the experimental `--wasi` mode, a dynamic (non-literal) URL passed
@@ -30,6 +58,32 @@ breaking changes and the discipline is still being shaped.
stays at Level 2 `inherit_env` on a dynamic key and is intentionally not
aligned with this fail-closed rule).
+**Fixed.**
+
+- *In the experimental `--wasi` mode, the guest-side fine attenuation gate
+ (`restrict_to` / `allows`) now lexically normalises `.` and `..` path
+ segments before its containment check, closing a bypass on a dynamic
+ path.* Previously the gate did a PURELY lexical prefix comparison: a
+ dynamic path such as `sub/../secret.txt` (reachable since `--preopen`
+ began admitting dynamic `Fs` paths) starts lexically with the allowed
+ prefix `sub/`, so it PASSED the gate and read a sibling OUTSIDE the
+ `restrict_to("sub")` subtree, while the Python oracle (which
+ canonicalises with `os.path.realpath`) correctly DENIED it. The gate now
+ normalises `.`/`..` in both the path and the stored prefixes first
+ (`$__fs_normalize`, an `os.path.normpath`-style collapse that preserves
+ a leading `..` so an escape stays an escape), restoring byte-for-byte
+ three-backend parity (Python oracle == `capa:host` == WASI): `sub/ok.txt`
+ is admitted, `sub/../secret.txt` and `sub/../sub2/x.txt` are denied, and
+ `sub/../sub/ok.txt` (which normalises back inside) is admitted. SYMLINKS
+ are still not resolved by the lexical gate -- that remains the documented
+ Level-2 loss, now the ONLY divergence from the realpath oracle (`.`/`..`
+ are handled). The Level-1 preopen ceiling (enforced by wasmtime) is
+ unchanged and still confines an unrestricted `Fs` to the granted
+ directory regardless of `..`. A program that MIXES a literal `Fs` path
+ and a dynamic one under `--preopen` still fails closed (layer b1 does not
+ yet support mixing), now with a clear message that names the limitation
+ and the flag instead of an internal "no closed preopen ceiling" wording.
+
## [1.14.0], 2026-06-29
**Capa 1.14.0.** A MINOR release: an experimental, opt-in `--wasi` mode
diff --git a/capa/cli.py b/capa/cli.py
index e89ef2f..41bc7dd 100644
--- a/capa/cli.py
+++ b/capa/cli.py
@@ -24,6 +24,7 @@
from capa import __version__ as _CAPA_VERSION
from capa.manifest import (
build_manifest, build_cyclonedx, build_spdx,
+ build_operator_declared_grants,
build_vex_document, build_provenance,
resolve_build_timestamp, SourceDateEpochError,
)
@@ -1062,6 +1063,23 @@ def _main_dispatch() -> int:
"The default capa:host path is unaffected."
),
)
+ parser.add_argument(
+ "--preopen",
+ action="append",
+ default=None,
+ metavar="[:ro|:rw]",
+ help=(
+ "with --wasi, grant the component filesystem authority over "
+ " as an OPERATOR-DECLARED preopen (Level 2, the WASI "
+ "--dir model), unblocking DYNAMIC (non-literal) Fs paths that "
+ "the compiler cannot derive a preopen for. The path is "
+ "resolved at runtime relative to . Append ':ro' for "
+ "read-only or ':rw' for read-write (default: rw). Recorded in "
+ "the SBOM as a declared grant, distinct from the "
+ "compiler-derived capability surface. This increment (b1) "
+ "supports a SINGLE --preopen for dynamic paths."
+ ),
+ )
parser.add_argument(
"--wasm-memory-cap",
type=int,
@@ -1276,11 +1294,18 @@ def _main_dispatch() -> int:
else:
print(msg, file=sys.stderr)
return 1
+ # WASI Fs layer b1: the operator-declared grant block (--preopen),
+ # surfaced in the manifest / CycloneDX / SPDX as Level-2
+ # operator-declared authority, distinct from the derived surface.
+ _operator_grants = _operator_grants_from_args(
+ getattr(args, "preopen", None)
+ )
if args.manifest:
import json
manifest = build_manifest(
module, filename=filename,
expr_labels=result.expr_labels,
+ operator_declared_grants=_operator_grants,
)
emit_artifact(json.dumps(manifest, indent=2))
return 0
@@ -1308,6 +1333,7 @@ def _main_dispatch() -> int:
sources=linked.sources if linked is not None else None,
timestamp=build_ts,
expr_labels=result.expr_labels,
+ operator_declared_grants=_operator_grants,
)
emit_artifact(json.dumps(sbom, indent=2))
return 0
@@ -1318,6 +1344,7 @@ def _main_dispatch() -> int:
sources=linked.sources if linked is not None else None,
timestamp=build_ts,
expr_labels=result.expr_labels,
+ operator_declared_grants=_operator_grants,
)
emit_artifact(json.dumps(sbom, indent=2))
return 0
@@ -1415,6 +1442,31 @@ def _main_dispatch() -> int:
print(msg, file=sys.stderr)
return 1
+ # ``--preopen`` (layer b1) is meaningful in --wasi mode (the
+ # operator-declared filesystem grant that unblocks dynamic Fs paths)
+ # AND when emitting an SBOM / manifest (it records the same grant as
+ # operator-declared authority, distinct from the derived surface).
+ # Reject it on any OTHER invocation with an actionable message rather
+ # than silently ignore it.
+ _emitting_sbom = bool(
+ getattr(args, "manifest", False) or getattr(args, "cyclonedx", False)
+ or getattr(args, "spdx", False)
+ )
+ if (getattr(args, "preopen", None)
+ and not bool(getattr(args, "wasi", False))
+ and not _emitting_sbom):
+ msg = (
+ "capa: --preopen requires --wasi (or an SBOM / --manifest "
+ "command): it is the operator-declared filesystem grant for "
+ "the WASI mode, recorded in the SBOM; it has no effect on the "
+ "default execution backend"
+ )
+ if use_color:
+ print(f"{C.RED}{msg}{C.RESET}", file=sys.stderr)
+ else:
+ print(msg, file=sys.stderr)
+ return 1
+
if (
args.run and not args.wasm and prefer_wasm
and _wasm_tooling_available()
@@ -1463,6 +1515,29 @@ def _main_dispatch() -> int:
else:
print(msg, file=sys.stderr)
return 1
+ # WASI Fs layer b1: parse the operator ``--preopen``. b1 supports a
+ # SINGLE preopen for dynamic-path resolution; reject more than one
+ # with a clear message rather than silently picking one. The
+ # presence of a preopen is the signal (``wasi_dynamic_fs``) that
+ # suppresses the compiler's dynamic-Fs-path rejection, and the
+ # parsed ``(host_dir, read_write)`` is the host grant.
+ fs_operator_preopen = None
+ wasi_dynamic_fs = False
+ preopen_specs = getattr(args, "preopen", None) or []
+ if preopen_specs:
+ if len(preopen_specs) > 1:
+ msg = (
+ "capa: --preopen: this increment (b1) supports a "
+ "single --preopen for dynamic Fs paths; got "
+ f"{len(preopen_specs)}"
+ )
+ if use_color:
+ print(f"{C.RED}{msg}{C.RESET}", file=sys.stderr)
+ else:
+ print(msg, file=sys.stderr)
+ return 1
+ fs_operator_preopen = _parse_preopen_spec(preopen_specs[0])
+ wasi_dynamic_fs = True
if result is None:
result = analyze(module, source=source, filename=filename)
try:
@@ -1472,6 +1547,7 @@ def _main_dispatch() -> int:
memory_cap_pages=wasm_memory_cap,
filename=filename,
wasi=wasi_mode,
+ wasi_dynamic_fs=wasi_dynamic_fs,
)
print(wat)
return 0
@@ -1480,6 +1556,7 @@ def _main_dispatch() -> int:
memory_cap_pages=wasm_memory_cap,
filename=filename,
wasi=wasi_mode,
+ wasi_dynamic_fs=wasi_dynamic_fs,
)
except Exception as e:
msg = f"capa: --wasm: {e}"
@@ -1589,6 +1666,7 @@ def _main_dispatch() -> int:
wasi=wasi_mode,
env_ceiling=env_ceiling,
fs_ceiling=fs_ceiling,
+ fs_operator_preopen=fs_operator_preopen,
net_ceiling=net_ceiling,
)
host.run_main(component_blob)
@@ -1763,6 +1841,48 @@ def _main_dispatch() -> int:
return 0
+def _parse_preopen_spec(spec: str) -> tuple[str, bool]:
+ """Parse one ``--preopen`` value ``[:ro|:rw]`` into
+ ``(host_dir, read_write)``.
+
+ The default permission is READ_WRITE (``rw``), the WASI ``--dir``
+ default; an explicit ``:ro`` suffix makes it READ_ONLY and ``:rw`` is
+ READ_WRITE. Only a trailing ``:ro`` / ``:rw`` is treated as a
+ permission suffix, so a directory name that itself contains a colon
+ (or a Windows drive ``C:\\...``) is preserved -- the split is on the
+ LAST ``:`` and only when the tail is exactly ``ro`` / ``rw``."""
+ read_write = True
+ host_dir = spec
+ if ":" in spec:
+ head, _, tail = spec.rpartition(":")
+ if tail in ("ro", "rw") and head:
+ host_dir = head
+ read_write = tail == "rw"
+ return (host_dir, read_write)
+
+
+def _operator_grants_from_args(preopen_specs) -> dict | None:
+ """Build the SBOM ``operator_declared_grants`` block from the
+ ``--preopen`` specs, or None when none were declared.
+
+ Each spec ``[:ro|:rw]`` becomes a preopen entry; the block is
+ honestly labelled operator-declared (Level 2) by
+ :func:`capa.manifest.build_operator_declared_grants`, distinct from
+ the compiler-derived surface."""
+ specs = preopen_specs or []
+ if not specs:
+ return None
+ preopens = []
+ for spec in specs:
+ host_dir, read_write = _parse_preopen_spec(spec)
+ preopens.append({
+ "kind": "fs",
+ "host_dir": host_dir,
+ "permission": "rw" if read_write else "ro",
+ })
+ return build_operator_declared_grants(preopens)
+
+
def _wrap_as_component(
core_wasm: bytes, wit_text: str, *, wasi: bool = False,
) -> bytes:
diff --git a/capa/ir/__init__.py b/capa/ir/__init__.py
index adee5ea..15f6654 100644
--- a/capa/ir/__init__.py
+++ b/capa/ir/__init__.py
@@ -152,6 +152,7 @@ def emit_wat(
memory_cap_pages: int | None = ..., # type: ignore[assignment]
manifest_json: str | None = None,
wasi: bool = False,
+ wasi_dynamic_fs: bool = False,
) -> str:
"""Emit WebAssembly text format (WAT) from a CIR module.
@@ -178,6 +179,7 @@ def emit_wat(
memory_cap_pages=memory_cap_pages,
manifest_json=manifest_json,
wasi=wasi,
+ wasi_dynamic_fs=wasi_dynamic_fs,
).emit(ir_module)
@@ -189,6 +191,7 @@ def compile_wat(
filename: str = "",
embed_manifest: bool = True,
wasi: bool = False,
+ wasi_dynamic_fs: bool = False,
) -> str:
"""End-to-end AST -> CIR -> WAT convenience helper. Mirrors
:func:`compile` but targets the Wasm Component Model text form
@@ -244,6 +247,7 @@ def compile_wat(
memory_cap_pages=memory_cap_pages,
manifest_json=manifest_json,
wasi=wasi,
+ wasi_dynamic_fs=wasi_dynamic_fs,
)
@@ -329,6 +333,7 @@ def compile_wasm(
filename: str = "",
embed_manifest: bool = True,
wasi: bool = False,
+ wasi_dynamic_fs: bool = False,
) -> bytes:
"""End-to-end AST -> CIR -> WAT -> binary Wasm assembly.
@@ -350,6 +355,7 @@ def compile_wasm(
filename=filename,
embed_manifest=embed_manifest,
wasi=wasi,
+ wasi_dynamic_fs=wasi_dynamic_fs,
)
proc = subprocess.run(
[wasm_tools_path, "parse", "-"],
diff --git a/capa/ir/_emit_wasm/__init__.py b/capa/ir/_emit_wasm/__init__.py
index c61664b..ede3fc6 100644
--- a/capa/ir/_emit_wasm/__init__.py
+++ b/capa/ir/_emit_wasm/__init__.py
@@ -201,6 +201,7 @@ def __init__(
memory_cap_pages: Optional[int] = MEMORY_CAP_DEFAULT_PAGES,
manifest_json: Optional[str] = None,
wasi: bool = False,
+ wasi_dynamic_fs: bool = False,
):
# Experimental opt-in (2026-06-27): when True, Random.system_seed
# and Clock.now_secs / now_monotonic import canonical WASI
@@ -211,6 +212,29 @@ def __init__(
# untouched all-``capa:host`` behaviour. See
# ``docs/design/wasi_mode.md``.
self._wasi: bool = wasi
+ # WASI Fs layer b1 (operator preopen, 2026-06-30): True when the
+ # operator declared ``--preopen <dir>`` for this run, granting the
+ # component filesystem authority over that directory and so
+ # UNBLOCKING dynamic (non-literal) Fs paths under ``--wasi``. A
+ # dynamic path is resolved at RUNTIME relative to the single
+ # operator preopen (the WASI ``--dir`` model, wasmtime's
+ # convention), framed honestly as a LEVEL-2 operator-DECLARED
+ # grant (see ``docs/design/wasi-attenuation.md``), distinct from
+ # the COMPILER-DERIVED preopen ceiling. When False (the default),
+ # a dynamic Fs path is REJECTED at compile time exactly as before
+ # -- this flag is the ONLY thing that suppresses that rejection.
+ #
+ # b1 INDEX RULE (emitter <-> host agreement): the operator preopen
+ # is the LAST preopen the host registers, AFTER every
+ # compiler-derived ceiling preopen, so it never shifts an existing
+ # literal call site's index. In the dynamic case the derived
+ # ceiling is NOT closed and so contributes NO preopens, leaving
+ # the operator preopen at index 0; the dynamic call-site emitter
+ # therefore addresses it with the constant
+ # ``_wasi_operator_preopen_index`` (0 whenever the ceiling is open,
+ # i.e. exactly the dynamic case). The host computes the same index
+ # (len(derived preopens)) so the two never disagree.
+ self._wasi_dynamic_fs: bool = wasi_dynamic_fs
self._lines: List[str] = []
self._indent = 0
self._unit = indent_unit
@@ -311,6 +335,27 @@ def __init__(
# chain's result areas), 0 when Net.get is not used.
self._wasi_net_scratch_offset = 0
+ # ----- WASI operator-preopen (layer b1) ----------------------
+
+ def _wasi_operator_preopen_index(self) -> int:
+ """The preopen INDEX the operator ``--preopen`` directory occupies
+ on the host, for the dynamic-Fs-path call-site emitter to address.
+
+ b1 index rule: the host registers the operator preopen AFTER every
+ compiler-derived ceiling preopen, so its index is the number of
+ derived preopens. A dynamic Fs path (the only thing that reaches
+ the operator preopen) requires a NOT-CLOSED ceiling, which
+ contributes NO derived preopens, so this is 0 in the dynamic case.
+ For a fully-literal program (closed ceiling) the operator preopen
+ sits at ``len(ceiling.preopens)`` and is unused by the guest (no
+ dynamic call site), but still registered + recorded for honesty;
+ the constant returned here matches the host's registration order
+ either way."""
+ ceiling = self._fs_ceiling
+ if ceiling is None or not getattr(ceiling, "closed", False):
+ return 0
+ return len(ceiling.preopens)
+
# ----- public ------------------------------------------------
def emit(self, module: Module) -> str:
@@ -1111,6 +1156,7 @@ def emit(self, module: Module) -> str:
or self._wasi_env_uses_get_or_args()
or self._wasi_net_uses_attenuation()
or self._wasi_fs_uses_preopens
+ or self._wasi_fs_uses_attenuation()
or (self._wasi and ("Stdio", "read_line") in self._used_caps)
):
heap_start = _align_up(self._string_data_offset, 8)
diff --git a/capa/ir/_emit_wasm/_caps.py b/capa/ir/_emit_wasm/_caps.py
index 23d5e9c..e7e7cca 100644
--- a/capa/ir/_emit_wasm/_caps.py
+++ b/capa/ir/_emit_wasm/_caps.py
@@ -117,6 +117,37 @@
class _CapDispatchMixin:
+ def _wasi_fs_no_ceiling_error(self) -> "WasmEmissionError":
+ """The error raised when a LITERAL Fs path reaches a call site but
+ the static preopen ceiling is not closed.
+
+ With ``--preopen`` (``self._wasi_dynamic_fs``) this is reached ONLY
+ for the literal path of a program that ALSO passes a DYNAMIC path
+ to an Fs op: the dynamic path opens the ceiling, so the literal can
+ no longer be resolved to a derived preopen index. Layer b1 does not
+ yet support MIXING literal and dynamic Fs paths in one program;
+ fail closed with a message that names the limitation and the flag,
+ rather than the internal ceiling wording. Without ``--preopen`` the
+ generic no-ceiling message stands."""
+ if self._wasi_dynamic_fs:
+ return WasmEmissionError(
+ "WASI --preopen mode (Fs layer b1) does not yet support "
+ "MIXING a string-literal Fs path and a dynamic Fs path in "
+ "the same program: the dynamic path opens the static "
+ "preopen ceiling, so the literal path can no longer be "
+ "resolved to a derived preopen (fail-closed). Use only "
+ "dynamic paths (resolved against the single --preopen "
+ "directory), or only literal paths (drop --preopen), or "
+ "fall back to the default capa:host backend (drop --wasi)."
+ )
+ return WasmEmissionError(
+ "Fs in WASI mode requires a closed static preopen ceiling "
+ "(every filesystem path a string literal); this program has "
+ "no closed ceiling, so no preopen can be derived (fail-closed). "
+ "Grant a directory with --preopen to admit dynamic "
+ "paths, or use the default capa:host backend (drop --wasi)."
+ )
+
def _cap_method_wasm_sig(
self, cap: str, method: str,
) -> tuple[list[str], str]:
@@ -647,18 +678,22 @@ def _emit_wasi_fs_metadata_call(
)
arg = instr.args[0]
if arg.kind != "lit_str" or not isinstance(arg.literal, str):
- # Defensive: the ceiling fail-closed check should have
- # rejected this already, but never emit a wrapper call with
- # an unresolved path.
+ # WASI Fs layer b1 (operator preopen): a DYNAMIC path is
+ # admitted when the operator declared ``--preopen`` -- it is
+ # resolved at RUNTIME relative to the single operator preopen.
+ if self._wasi_dynamic_fs:
+ self._emit_wasi_fs_dynamic_metadata_call(instr, method)
+ return
+ # Defensive: without an operator preopen the ceiling
+ # fail-closed check should have rejected this already, but
+ # never emit a wrapper call with an unresolved path.
raise WasmEmissionError(
f"Fs.{method} in WASI mode requires a string-literal "
f"path (the preopen ceiling must be closed)"
)
ceiling = self._fs_ceiling
if ceiling is None or not ceiling.closed:
- raise WasmEmissionError(
- "Fs in WASI mode has no closed preopen ceiling"
- )
+ raise self._wasi_fs_no_ceiling_error()
idx, rel = resolve_fs_call(ceiling, arg.literal)
# The FULL original literal path is interned for the guest-side
# fail-closed attenuation gate (``$Fs_path_allowed``): the
@@ -753,15 +788,18 @@ def _emit_wasi_fs_read_call(self, instr: MethodCall) -> None:
)
arg = instr.args[0]
if arg.kind != "lit_str" or not isinstance(arg.literal, str):
+ # WASI Fs layer b1 (operator preopen): a DYNAMIC path resolves
+ # at runtime relative to the single operator ``--preopen`` dir.
+ if self._wasi_dynamic_fs:
+ self._emit_wasi_fs_dynamic_read_call(instr)
+ return
raise WasmEmissionError(
"Fs.read in WASI mode requires a string-literal path "
"(the preopen ceiling must be closed)"
)
ceiling = self._fs_ceiling
if ceiling is None or not ceiling.closed:
- raise WasmEmissionError(
- "Fs in WASI mode has no closed preopen ceiling"
- )
+ raise self._wasi_fs_no_ceiling_error()
idx, rel = resolve_fs_call(ceiling, arg.literal)
rel_off, rel_len = self._intern_string(rel)
# Full original literal + receiver handle for the guest-side
@@ -813,15 +851,18 @@ def _emit_wasi_fs_write_call(self, instr: MethodCall) -> None:
)
arg = instr.args[0]
if arg.kind != "lit_str" or not isinstance(arg.literal, str):
+ # WASI Fs layer b1 (operator preopen): a DYNAMIC path resolves
+ # at runtime relative to the single operator ``--preopen`` dir.
+ if self._wasi_dynamic_fs:
+ self._emit_wasi_fs_dynamic_write_call(instr)
+ return
raise WasmEmissionError(
"Fs.write in WASI mode requires a string-literal path "
"(the preopen ceiling must be closed)"
)
ceiling = self._fs_ceiling
if ceiling is None or not ceiling.closed:
- raise WasmEmissionError(
- "Fs in WASI mode has no closed preopen ceiling"
- )
+ raise self._wasi_fs_no_ceiling_error()
idx, rel = resolve_fs_call(ceiling, arg.literal)
rel_off, rel_len = self._intern_string(rel)
# Full original literal + receiver handle for the guest-side
@@ -871,15 +912,18 @@ def _emit_wasi_fs_list_dir_call(self, instr: MethodCall) -> None:
)
arg = instr.args[0]
if arg.kind != "lit_str" or not isinstance(arg.literal, str):
+ # WASI Fs layer b1 (operator preopen): a DYNAMIC path resolves
+ # at runtime relative to the single operator ``--preopen`` dir.
+ if self._wasi_dynamic_fs:
+ self._emit_wasi_fs_dynamic_list_dir_call(instr)
+ return
raise WasmEmissionError(
"Fs.list_dir in WASI mode requires a string-literal path "
"(the preopen ceiling must be closed)"
)
ceiling = self._fs_ceiling
if ceiling is None or not ceiling.closed:
- raise WasmEmissionError(
- "Fs in WASI mode has no closed preopen ceiling"
- )
+ raise self._wasi_fs_no_ceiling_error()
idx, rel = resolve_fs_call(ceiling, arg.literal)
rel_off, rel_len = self._intern_string(rel)
# Full original literal + receiver handle for the guest-side
@@ -900,6 +944,156 @@ def _emit_wasi_fs_list_dir_call(self, instr: MethodCall) -> None:
"result_list_string_io_error", instr.dst,
)
+ # ---- WASI Fs layer b1: DYNAMIC path call sites --------------
+ #
+ # A dynamic (non-literal) Fs path is admitted ONLY when the operator
+ # declared ``--preopen <dir>`` (``self._wasi_dynamic_fs``). The path
+ # is NOT resolvable at compile time, so the call site addresses the
+ # single operator preopen (``_wasi_operator_preopen_index()``) and
+ # hands the path's runtime ``(ptr, len)`` to the wrapper as BOTH the
+ # FULL path (for the guest-side fail-closed attenuation gate
+ # ``$Fs_path_allowed``, against which a restricted Fs's prefixes are
+ # compared) AND the RELATIVE path (wasmtime resolves it relative to
+ # the operator preopen descriptor). The wrappers are UNCHANGED -- only
+ # the operands differ (runtime ``(ptr, len)`` + ``idx`` const, the
+ # Fs.write content-arg push pattern), so there is ZERO new WAT here.
+
+ def _push_wasi_dynamic_fs_path(self, arg) -> None:
+ """Push a DYNAMIC Fs path argument's ``(ptr, len)`` for a layer-b1
+ call site. The path must be a String LOCAL or PARAM (the lowerer
+ flattens an Fs path argument to a local / param before the
+ MethodCall, so a side-effecting re-evaluation never occurs); a
+ bare ``(local.get _ptr; local.get _len)`` pair is emitted, safe to
+ repeat for the full + relative operands. Anything else is a shape
+ the b1 increment does not handle and is rejected loudly."""
+ is_string_local = (
+ arg.kind == "local" and self._is_string_local(arg.name)
+ )
+ is_string_param = (
+ arg.kind == "param" and self._param_is_string(arg.name)
+ )
+ if not (is_string_local or is_string_param):
+ raise WasmEmissionError(
+ "Fs dynamic path under --preopen must be a String local "
+ f"or param (b1), got {arg.kind!r}"
+ )
+ self._push_string_arg(arg)
+
+ def _emit_wasi_fs_dynamic_metadata_call(
+ self, instr: MethodCall, method: str,
+ ) -> None:
+ """Dynamic-path ``fs.exists / is_dir / mkdir`` under ``--preopen``.
+
+ Mirrors ``_emit_wasi_fs_metadata_call`` but with the operator
+ preopen index + the runtime path (ptr, len). ``mkdir`` keeps full
+ ``os.makedirs(exist_ok=True)`` parity: the dynamic relative path
+ may be multi-segment and its segments are not known at compile
+ time, so it routes to the runtime recursive sequencer
+ ``$Fs_mkdir_recursive`` (which walks the path's ``/`` boundaries
+ and calls the single-segment ``$Fs_mkdir`` per cumulative prefix,
+ short-circuiting on a genuine Err) -- byte-parity with the literal
+ path's compile-time prefix unrolling and with the oracle."""
+ if len(instr.args) != 1:
+ raise WasmEmissionError(
+ f"Fs.{method} expected 1 arg, got {len(instr.args)}"
+ )
+ arg = instr.args[0]
+ idx = self._wasi_operator_preopen_index()
+ if method == "mkdir":
+ self._write("i32.const 20")
+ self._write("call $alloc")
+ self._write("local.set $_ret_area")
+ self._push_fs_handle(instr.receiver)
+ self._push_wasi_dynamic_fs_path(arg) # full (ptr, len)
+ self._write(f"i32.const {idx}")
+ self._push_wasi_dynamic_fs_path(arg) # rel (ptr, len) == full
+ self._write("local.get $_ret_area")
+ self._write("call $Fs_mkdir_recursive")
+ self._emit_cap_indirect_materialise(
+ "result_unit_io_error", instr.dst,
+ )
+ return
+ self._push_fs_handle(instr.receiver)
+ self._push_wasi_dynamic_fs_path(arg) # full (ptr, len)
+ self._write(f"i32.const {idx}")
+ self._push_wasi_dynamic_fs_path(arg) # rel (ptr, len) == full
+ self._write(f"call $Fs_{method}")
+ if instr.dst is not None:
+ self._write(f"local.set ${instr.dst}")
+
+ def _emit_wasi_fs_dynamic_read_call(self, instr: MethodCall) -> None:
+ """Dynamic-path ``fs.read`` under ``--preopen``. Mirrors
+ ``_emit_wasi_fs_read_call`` with the operator preopen index + the
+ runtime path (ptr, len)."""
+ if len(instr.args) != 1:
+ raise WasmEmissionError(
+ f"Fs.read expected 1 arg, got {len(instr.args)}"
+ )
+ arg = instr.args[0]
+ idx = self._wasi_operator_preopen_index()
+ self._write("i32.const 20")
+ self._write("call $alloc")
+ self._write("local.set $_ret_area")
+ self._push_fs_handle(instr.receiver)
+ self._push_wasi_dynamic_fs_path(arg)
+ self._write(f"i32.const {idx}")
+ self._push_wasi_dynamic_fs_path(arg)
+ self._write("local.get $_ret_area")
+ self._write("call $Fs_read")
+ self._emit_cap_indirect_materialise(
+ "result_string_io_error", instr.dst,
+ )
+
+ def _emit_wasi_fs_dynamic_write_call(self, instr: MethodCall) -> None:
+ """Dynamic-path ``fs.write(path, content)`` under ``--preopen``.
+ The PATH (arg[0]) is the dynamic runtime (ptr, len); the CONTENT
+ (arg[1]) is any String pushed the usual way (its bytes already
+ live in linear memory). Mirrors ``_emit_wasi_fs_write_call``."""
+ if len(instr.args) != 2:
+ raise WasmEmissionError(
+ f"Fs.write expected 2 args (path, content), got "
+ f"{len(instr.args)}"
+ )
+ arg = instr.args[0]
+ idx = self._wasi_operator_preopen_index()
+ self._write("i32.const 20")
+ self._write("call $alloc")
+ self._write("local.set $_ret_area")
+ self._push_fs_handle(instr.receiver)
+ self._push_wasi_dynamic_fs_path(arg)
+ self._write(f"i32.const {idx}")
+ self._push_wasi_dynamic_fs_path(arg)
+ # content (ptr, len) - already in linear memory.
+ self._push_string_arg(instr.args[1])
+ self._write("local.get $_ret_area")
+ self._write("call $Fs_write")
+ self._emit_cap_indirect_materialise(
+ "result_unit_io_error", instr.dst,
+ )
+
+ def _emit_wasi_fs_dynamic_list_dir_call(self, instr: MethodCall) -> None:
+ """Dynamic-path ``fs.list_dir`` under ``--preopen``. Mirrors
+ ``_emit_wasi_fs_list_dir_call`` with the operator preopen index +
+ the runtime path (ptr, len)."""
+ if len(instr.args) != 1:
+ raise WasmEmissionError(
+ f"Fs.list_dir expected 1 arg, got {len(instr.args)}"
+ )
+ arg = instr.args[0]
+ idx = self._wasi_operator_preopen_index()
+ self._write("i32.const 20")
+ self._write("call $alloc")
+ self._write("local.set $_ret_area")
+ self._push_fs_handle(instr.receiver)
+ self._push_wasi_dynamic_fs_path(arg)
+ self._write(f"i32.const {idx}")
+ self._push_wasi_dynamic_fs_path(arg)
+ self._write("local.get $_ret_area")
+ self._write("call $Fs_list_dir")
+ self._emit_cap_indirect_materialise(
+ "result_list_string_io_error", instr.dst,
+ )
+
# ---- slice 25.3 Net handle-passing helpers -----------------
def _push_net_handle(self, recv) -> None:
diff --git a/capa/ir/_emit_wasm/_wasi.py b/capa/ir/_emit_wasm/_wasi.py
index 020471b..04c2906 100644
--- a/capa/ir/_emit_wasm/_wasi.py
+++ b/capa/ir/_emit_wasm/_wasi.py
@@ -188,8 +188,9 @@
("Fs", "list_dir"),
# Fs FINE ATTENUATION (2026-06-28): restrict_to / allows are
# implemented GUEST-SIDE (Level 2 of docs/design/wasi-attenuation.md),
- # analogous to Env's restrict_to_keys / allows but with LEXICAL path
- # containment in place of key equality. No capa:host/fs import: their
+ # analogous to Env's restrict_to_keys / allows but with path-prefix
+ # containment (with lexical ``.``/``..`` normalisation; symlinks
+ # unresolved) in place of key equality. No capa:host/fs import: their
# ``$Fs_restrict_to`` / ``$Fs_allows`` bindings are emitted as guest
# WAT wrappers by ``_emit_wasi_wrappers``. Listed here so the import
# loop does NOT try to emit a capa:host/fs import for them (the host
@@ -343,9 +344,10 @@ def _validate_wasi_caps(self) -> None:
read-directory enumeration -> guest-side sort) over
wasi:io/streams, AND the fine-grained attenuators ``restrict_to``
/ ``allows`` implemented GUEST-SIDE (Level 2 of
- ``docs/design/wasi-attenuation.md``), with LEXICAL path
- containment in place of the oracle's realpath (the honest TOCTOU
- / symlink loss documented there). No Fs method is rejected here
+ ``docs/design/wasi-attenuation.md``), with path-prefix containment
+ that lexically normalises ``.``/``..`` (oracle parity for those)
+ but does NOT resolve symlinks (the honest TOCTOU / symlink loss
+ documented there). No Fs method is rejected here
(``_WASI_FS_REJECTED`` is now empty); the fail-closed preopen
ceiling obligation below still applies to any op that touches the
filesystem.
@@ -401,10 +403,21 @@ def _validate_wasi_caps(self) -> None:
# preopen to address and the wrapper cannot run. Reject at
# compile time with a clear message rather than emit code that
# always denies at runtime.
+ #
+ # WASI Fs layer b1 (operator preopen, 2026-06-30): when the
+ # operator declared ``--preopen <dir>`` (``self._wasi_dynamic_fs``),
+ # the dynamic path is RESOLVED AT RUNTIME relative to that single
+ # operator preopen (the WASI ``--dir`` model). The rejection is
+ # SUPPRESSED -- the operator has explicitly granted the authority
+ # the compiler could not derive, a LEVEL-2 operator-DECLARED grant
+ # (recorded in the SBOM, distinct from the derived surface). Without
+ # ``--preopen`` the rejection stands exactly as before (the prior
+ # behaviour is intentionally preserved).
if any(
cap == "Fs" and method in (_WASI_FS_METADATA | _WASI_FS_STREAM)
for cap, method in self._used_caps
- ) and self._fs_ceiling is not None and not self._fs_ceiling.closed:
+ ) and self._fs_ceiling is not None and not self._fs_ceiling.closed \
+ and not self._wasi_dynamic_fs:
raise WasmEmissionError(
"Fs in WASI mode requires every filesystem path to be a "
"string literal (the static preopen ceiling must be "
@@ -1130,6 +1143,7 @@ def _emit_wasi_wrappers(self) -> None:
# migrated Fs op AND the ``restrict_to`` / ``allows`` wrappers;
# emit them once when any Fs op (or attenuator) is present.
if self._wasi_fs_uses_attenuation():
+ self._emit_wasi_fs_normalize_helper()
self._emit_wasi_fs_path_contained_helper()
self._emit_wasi_fs_path_allowed_helper()
if ("Fs", "restrict_to") in used:
@@ -1142,6 +1156,12 @@ def _emit_wasi_wrappers(self) -> None:
self._emit_wasi_fs_is_dir_wrapper()
if ("Fs", "mkdir") in used:
self._emit_wasi_fs_mkdir_wrapper()
+ # Layer b1: a DYNAMIC mkdir path cannot be unrolled into
+ # cumulative prefixes at compile time, so emit the runtime
+ # recursive sequencer (over the existing single-segment
+ # ``$Fs_mkdir``) when an operator preopen admits dynamic paths.
+ if self._wasi_dynamic_fs:
+ self._emit_wasi_fs_mkdir_recursive_helper()
if ("Fs", "read") in used:
self._emit_wasi_fs_read_wrapper()
if ("Fs", "write") in used:
@@ -2401,28 +2421,456 @@ def _emit_wasi_net_restrict_to_wrapper(self) -> None:
# ----- guest-side Fs attenuation (Level 2) -------------------
+ def _emit_wasi_fs_normalize_helper(self) -> None:
+ """``$__fs_normalize (src_ptr i32, src_len i32, dst_ptr i32) ->
+ i32`` -> writes the LEXICALLY normalised path into ``[dst_ptr,
+ dst_ptr+ret)`` and returns its length ``ret``.
+
+ Collapses ``.`` and ``..`` segments the way ``os.path.realpath``
+ does for the NO-SYMLINK case (the lexical part the guest can
+ reproduce without a kernel walk), so the containment gate matches
+ the Python oracle (``Fs.allows``, which canonicalises via
+ ``realpath``) for ``.`` / ``..``. Symlinks are still NOT resolved
+ -- that remains the documented Level-2 loss
+ (``docs/design/wasi_mode.md``).
+
+ Rules (validated byte-for-byte against ``os.path.normpath`` and a
+ 9331-input fuzz of the segment reference, scratchpad
+ ``wat_sim2.py``):
+ - split on ``/``; drop empty segments (``//``, trailing ``/``)
+ and ``.``;
+ - ``..`` POPS the previous emitted segment when one exists AND
+ it is not itself a (locked) leading ``..``; otherwise, for an
+ ABSOLUTE path it is dropped (cannot escape root), for a
+ RELATIVE path it is KEPT (a leading ``..`` escapes the prefix,
+ so containment must fail);
+ - an absolute path keeps ONE leading ``/`` (``//a`` -> ``/a``, unlike
+ ``normpath``); a relative path that normalises to empty becomes ``.``.
+ The output is never longer than the input, so the caller sizes the
+ destination buffer at ``max(src_len, 1)``.
+
+ WAT-local helpers are inlined: segment append (prepend ``/`` when
+ ``dst_len > 0``) and the ``..`` pop / last-segment-is-``..`` test
+ (scan back from ``dst_len`` to the previous ``/`` or to 0)."""
+ self._write(
+ "(func $__fs_normalize (param $src_ptr i32) "
+ "(param $src_len i32) (param $dst_ptr i32) (result i32)"
+ )
+ self._indent += 1
+ self._write("(local $is_abs i32)")
+ self._write("(local $i i32)")
+ self._write("(local $dst_len i32)")
+ self._write("(local $seg_start i32)")
+ self._write("(local $seg_len i32)")
+ self._write("(local $last_start i32)")
+ self._write("(local $j i32)")
+ # is_abs = src_len > 0 && src[0] == '/'.
+ self._write("local.get $src_len")
+ self._write("i32.const 0")
+ self._write("i32.gt_u")
+ self._write("if (result i32)")
+ self._indent += 1
+ self._write("local.get $src_ptr")
+ self._write("i32.load8_u")
+ self._write("i32.const 47")
+ self._write("i32.eq")
+ self._indent -= 1
+ self._write("else")
+ self._indent += 1
+ self._write("i32.const 0")
+ self._indent -= 1
+ self._write("end")
+ self._write("local.set $is_abs")
+ # If absolute, the leading '/' is emitted at the end; dst here
+ # holds only the RELATIVE remainder (so the pop / leading-'..'
+ # logic never crosses the root slash). dst_len starts at 0.
+ self._write("i32.const 0")
+ self._write("local.set $dst_len")
+ self._write("i32.const 0")
+ self._write("local.set $i")
+ self._write("(block $scan_done")
+ self._indent += 1
+ self._write("(loop $scan")
+ self._indent += 1
+ self._write("local.get $i")
+ self._write("local.get $src_len")
+ self._write("i32.ge_u")
+ self._write("br_if $scan_done")
+ # skip a '/' run.
+ self._write("local.get $src_ptr")
+ self._write("local.get $i")
+ self._write("i32.add")
+ self._write("i32.load8_u")
+ self._write("i32.const 47")
+ self._write("i32.eq")
+ self._write("if")
+ self._indent += 1
+ self._write("local.get $i")
+ self._write("i32.const 1")
+ self._write("i32.add")
+ self._write("local.set $i")
+ self._write("br $scan")
+ self._indent -= 1
+ self._write("end")
+ # segment = [seg_start, i) until next '/' or end.
+ self._write("local.get $i")
+ self._write("local.set $seg_start")
+ self._write("(block $seg_done")
+ self._indent += 1
+ self._write("(loop $seg")
+ self._indent += 1
+ self._write("local.get $i")
+ self._write("local.get $src_len")
+ self._write("i32.ge_u")
+ self._write("br_if $seg_done")
+ self._write("local.get $src_ptr")
+ self._write("local.get $i")
+ self._write("i32.add")
+ self._write("i32.load8_u")
+ self._write("i32.const 47")
+ self._write("i32.eq")
+ self._write("br_if $seg_done")
+ self._write("local.get $i")
+ self._write("i32.const 1")
+ self._write("i32.add")
+ self._write("local.set $i")
+ self._write("br $seg")
+ self._indent -= 1
+ self._write(")")
+ self._indent -= 1
+ self._write(")")
+ self._write("local.get $i")
+ self._write("local.get $seg_start")
+ self._write("i32.sub")
+ self._write("local.set $seg_len")
+ # '.' (len 1, byte '.') -> drop.
+ self._write("local.get $seg_len")
+ self._write("i32.const 1")
+ self._write("i32.eq")
+ self._write("local.get $src_ptr")
+ self._write("local.get $seg_start")
+ self._write("i32.add")
+ self._write("i32.load8_u")
+ self._write("i32.const 46")
+ self._write("i32.eq")
+ self._write("i32.and")
+ self._write("if")
+ self._indent += 1
+ self._write("br $scan")
+ self._indent -= 1
+ self._write("end")
+ # '..' (len 2, both bytes '.') -> pop / drop / keep.
+ self._write("local.get $seg_len")
+ self._write("i32.const 2")
+ self._write("i32.eq")
+ self._write("local.get $src_ptr")
+ self._write("local.get $seg_start")
+ self._write("i32.add")
+ self._write("i32.load8_u")
+ self._write("i32.const 46")
+ self._write("i32.eq")
+ self._write("i32.and")
+ self._write("local.get $src_ptr")
+ self._write("local.get $seg_start")
+ self._write("i32.const 1")
+ self._write("i32.add")
+ self._write("i32.add")
+ self._write("i32.load8_u")
+ self._write("i32.const 46")
+ self._write("i32.eq")
+ self._write("i32.and")
+ self._write("if")
+ self._indent += 1
+ # last_start = start of the last emitted segment in dst: scan back
+ # from dst_len for the previous '/'; 0 if none.
+ self._write("i32.const 0")
+ self._write("local.set $last_start")
+ self._write("local.get $dst_len")
+ self._write("local.set $j")
+ self._write("(block $back_done")
+ self._indent += 1
+ self._write("(loop $back")
+ self._indent += 1
+ self._write("local.get $j")
+ self._write("i32.eqz")
+ self._write("br_if $back_done")
+ self._write("local.get $dst_ptr")
+ self._write("local.get $j")
+ self._write("i32.const 1")
+ self._write("i32.sub")
+ self._write("i32.add")
+ self._write("i32.load8_u")
+ self._write("i32.const 47")
+ self._write("i32.eq")
+ self._write("if")
+ self._indent += 1
+ self._write("local.get $j")
+ self._write("local.set $last_start")
+ self._write("br $back_done")
+ self._indent -= 1
+ self._write("end")
+ self._write("local.get $j")
+ self._write("i32.const 1")
+ self._write("i32.sub")
+ self._write("local.set $j")
+ self._write("br $back")
+ self._indent -= 1
+ self._write(")")
+ self._indent -= 1
+ self._write(")")
+ # can_pop = dst_len > 0 AND last segment != '..'. The last segment
+ # is '..' iff (dst_len - last_start == 2) and both its bytes are
+ # '.'. Compute "last_is_dotdot".
+ # If dst_len == 0 -> not poppable.
+ self._write("local.get $dst_len")
+ self._write("i32.eqz")
+ self._write("if")
+ self._indent += 1
+ # empty dst: absolute drops, relative keeps '..'.
+ self._write("local.get $is_abs")
+ self._write("if")
+ self._indent += 1
+ self._write("br $scan")
+ self._indent -= 1
+ self._write("end")
+ # relative + empty: append '..' (no leading '/').
+ self._write("local.get $dst_ptr")
+ self._write("i32.const 46")
+ self._write("i32.store8")
+ self._write("local.get $dst_ptr")
+ self._write("i32.const 1")
+ self._write("i32.add")
+ self._write("i32.const 46")
+ self._write("i32.store8")
+ self._write("i32.const 2")
+ self._write("local.set $dst_len")
+ self._write("br $scan")
+ self._indent -= 1
+ self._write("end")
+ # dst_len > 0: is the last segment exactly '..'?
+ self._write("local.get $dst_len")
+ self._write("local.get $last_start")
+ self._write("i32.sub")
+ self._write("i32.const 2")
+ self._write("i32.eq")
+ self._write("local.get $dst_ptr")
+ self._write("local.get $last_start")
+ self._write("i32.add")
+ self._write("i32.load8_u")
+ self._write("i32.const 46")
+ self._write("i32.eq")
+ self._write("i32.and")
+ self._write("local.get $dst_ptr")
+ self._write("local.get $last_start")
+ self._write("i32.const 1")
+ self._write("i32.add")
+ self._write("i32.add")
+ self._write("i32.load8_u")
+ self._write("i32.const 46")
+ self._write("i32.eq")
+ self._write("i32.and")
+ self._write("if")
+ self._indent += 1
+ # last segment is a locked leading '..': absolute can't happen here
+ # (a leading '..' is only kept for relative), so keep another '..'.
+ self._write("local.get $is_abs")
+ self._write("if")
+ self._indent += 1
+ self._write("br $scan")
+ self._indent -= 1
+ self._write("end")
+ # append '/..' (dst_len > 0 so prepend a separator).
+ self._write("local.get $dst_ptr")
+ self._write("local.get $dst_len")
+ self._write("i32.add")
+ self._write("i32.const 47")
+ self._write("i32.store8")
+ self._write("local.get $dst_ptr")
+ self._write("local.get $dst_len")
+ self._write("i32.const 1")
+ self._write("i32.add")
+ self._write("i32.add")
+ self._write("i32.const 46")
+ self._write("i32.store8")
+ self._write("local.get $dst_ptr")
+ self._write("local.get $dst_len")
+ self._write("i32.const 2")
+ self._write("i32.add")
+ self._write("i32.add")
+ self._write("i32.const 46")
+ self._write("i32.store8")
+ self._write("local.get $dst_len")
+ self._write("i32.const 3")
+ self._write("i32.add")
+ self._write("local.set $dst_len")
+ self._write("br $scan")
+ self._indent -= 1
+ self._write("end")
+ # poppable: truncate dst to last_start (drop the '/segment').
+ # last_start is the byte AFTER the separator, so the new length is
+ # last_start - 1 when last_start > 0 (drop the separator too), or 0.
+ self._write("local.get $last_start")
+ self._write("i32.eqz")
+ self._write("if (result i32)")
+ self._indent += 1
+ self._write("i32.const 0")
+ self._indent -= 1
+ self._write("else")
+ self._indent += 1
+ self._write("local.get $last_start")
+ self._write("i32.const 1")
+ self._write("i32.sub")
+ self._indent -= 1
+ self._write("end")
+ self._write("local.set $dst_len")
+ self._write("br $scan")
+ self._indent -= 1
+ self._write("end")
+ # normal segment: append it (prepend '/' when dst_len > 0).
+ self._write("local.get $dst_len")
+ self._write("i32.const 0")
+ self._write("i32.gt_u")
+ self._write("if")
+ self._indent += 1
+ self._write("local.get $dst_ptr")
+ self._write("local.get $dst_len")
+ self._write("i32.add")
+ self._write("i32.const 47")
+ self._write("i32.store8")
+ self._write("local.get $dst_len")
+ self._write("i32.const 1")
+ self._write("i32.add")
+ self._write("local.set $dst_len")
+ self._indent -= 1
+ self._write("end")
+ # copy seg_len bytes src[seg_start..] -> dst[dst_len..].
+ self._write("i32.const 0")
+ self._write("local.set $j")
+ self._write("(block $copy_done")
+ self._indent += 1
+ self._write("(loop $copy")
+ self._indent += 1
+ self._write("local.get $j")
+ self._write("local.get $seg_len")
+ self._write("i32.ge_u")
+ self._write("br_if $copy_done")
+ self._write("local.get $dst_ptr")
+ self._write("local.get $dst_len")
+ self._write("i32.add")
+ self._write("local.get $src_ptr")
+ self._write("local.get $seg_start")
+ self._write("i32.add")
+ self._write("local.get $j")
+ self._write("i32.add")
+ self._write("i32.load8_u")
+ self._write("i32.store8")
+ self._write("local.get $dst_len")
+ self._write("i32.const 1")
+ self._write("i32.add")
+ self._write("local.set $dst_len")
+ self._write("local.get $j")
+ self._write("i32.const 1")
+ self._write("i32.add")
+ self._write("local.set $j")
+ self._write("br $copy")
+ self._indent -= 1
+ self._write(")")
+ self._indent -= 1
+ self._write(")")
+ self._write("br $scan")
+ self._indent -= 1
+ self._write(")")
+ self._indent -= 1
+ self._write(")")
+ # Post-loop: build the final layout.
+ # Absolute: shift the relative remainder one byte right and write a
+ # leading '/'. dst currently holds [0, dst_len) of the relative
+ # remainder; we move it up so index 0 is '/'.
+ self._write("local.get $is_abs")
+ self._write("if")
+ self._indent += 1
+ # shift bytes right by 1, from the top down (no overlap clobber).
+ self._write("local.get $dst_len")
+ self._write("local.set $j")
+ self._write("(block $shift_done")
+ self._indent += 1
+ self._write("(loop $shift")
+ self._indent += 1
+ self._write("local.get $j")
+ self._write("i32.eqz")
+ self._write("br_if $shift_done")
+ self._write("local.get $dst_ptr")
+ self._write("local.get $j")
+ self._write("i32.add")
+ self._write("local.get $dst_ptr")
+ self._write("local.get $j")
+ self._write("i32.const 1")
+ self._write("i32.sub")
+ self._write("i32.add")
+ self._write("i32.load8_u")
+ self._write("i32.store8")
+ self._write("local.get $j")
+ self._write("i32.const 1")
+ self._write("i32.sub")
+ self._write("local.set $j")
+ self._write("br $shift")
+ self._indent -= 1
+ self._write(")")
+ self._indent -= 1
+ self._write(")")
+ self._write("local.get $dst_ptr")
+ self._write("i32.const 47")
+ self._write("i32.store8")
+ self._write("local.get $dst_len")
+ self._write("i32.const 1")
+ self._write("i32.add")
+ self._write("return")
+ self._indent -= 1
+ self._write("end")
+ # Relative + empty result -> '.'.
+ self._write("local.get $dst_len")
+ self._write("i32.eqz")
+ self._write("if")
+ self._indent += 1
+ self._write("local.get $dst_ptr")
+ self._write("i32.const 46")
+ self._write("i32.store8")
+ self._write("i32.const 1")
+ self._write("return")
+ self._indent -= 1
+ self._write("end")
+ self._write("local.get $dst_len")
+ self._indent -= 1
+ self._write(")")
+
def _emit_wasi_fs_path_contained_helper(self) -> None:
"""``$Fs_path_contained (path_ptr i32, path_len i32,
pre_ptr i32, pre_len i32) -> i32`` -> 1 iff ``path`` is the
- directory/file ``prefix`` itself or lies under it, by LEXICAL
- path-segment containment.
+ directory/file ``prefix`` itself or lies under it, by path-segment
+ containment AFTER lexical ``.``/``..`` normalisation.
This is the guest-side analogue of the Python oracle's
``Path(os.path.realpath(path)).is_relative_to(
os.path.realpath(prefix))`` (``Fs.allows``,
``capa/runtime/_capabilities.py:173-183``). The guest cannot
- ``realpath`` (no kernel syscall), so the containment is LEXICAL:
- it compares the literal path strings. For CANONICAL paths (no
- ``.`` / ``..`` segments, no symlinks, no repeated slashes) the
- lexical result is BYTE-IDENTICAL to the oracle, because
- ``realpath`` prepends the SAME process CWD to a relative path and
- its relative prefix (so the CWD cancels in the containment) and
- leaves a canonical absolute path unchanged. For NON-CANONICAL
- paths the lexical check may diverge from ``realpath`` -- the
- honest, documented Level-2 loss (TOCTOU / symlink) in
- ``docs/design/wasi-attenuation.md``.
-
- Algorithm (matching the segment-aware ``is_relative_to``):
+ ``realpath`` (no kernel syscall), but it FIRST normalises ``.`` and
+ ``..`` in BOTH the path and the prefix lexically (``$__fs_normalize``,
+ the ``os.path.normpath``-style collapse), reproducing what
+ ``realpath`` does for those segments in the no-symlink case. So
+ ``sub/../secret.txt`` normalises to ``secret.txt`` (NOT contained
+ in ``sub`` -> denied, matching the oracle) and ``sub/../sub/ok.txt``
+ normalises to ``sub/ok.txt`` (contained -> allowed). For paths
+ whose ONLY non-canonical feature is ``.``/``..`` the result is now
+ BYTE-IDENTICAL to the oracle (``realpath`` also prepends the SAME
+ process CWD to a relative path and its relative prefix, so the CWD
+ cancels in the containment). SYMLINKS are still NOT resolved -- a
+ symlink inside the prefix that points outside it is admitted here
+ (caught only by the Level-1 preopen ceiling); that is the only
+ remaining Level-2 loss (TOCTOU / symlink) in
+ ``docs/design/wasi_mode.md``.
+
+ Algorithm (matching the segment-aware ``is_relative_to``), run on
+ the NORMALISED path / prefix:
1. strip trailing ``/`` from both path and prefix (keep a lone
``/`` as ``/``), so ``dir/`` and ``dir`` compare equal.
@@ -2443,6 +2891,54 @@ def _emit_wasi_fs_path_contained_helper(self) -> None:
self._write("(local $pl i32)")
self._write("(local $ql i32)")
self._write("(local $i i32)")
+ self._write("(local $npath_ptr i32)")
+ self._write("(local $npath_len i32)")
+ self._write("(local $npre_ptr i32)")
+ self._write("(local $npre_len i32)")
+ # LEXICAL normalisation of '.' / '..' FIRST, on BOTH path and
+ # prefix, so the containment matches the oracle (which canonicalises
+ # both via realpath). e.g. "sub/../secret.txt" normalises to
+ # "secret.txt" (NOT contained in "sub" -> denied), while
+ # "sub/../sub/ok.txt" normalises to "sub/ok.txt" (contained ->
+ # allowed). Each output is <= its input length; allocate
+ # max(len, 1) so an empty input still has a 1-byte buffer for the
+ # '.' result. Symlinks are NOT resolved (the documented Level-2
+ # loss); only '.' / '..' are collapsed.
+ self._write("local.get $path_len")
+ self._write("i32.const 1")
+ self._write("local.get $path_len")
+ self._write("i32.const 0")
+ self._write("i32.gt_u")
+ self._write("select")
+ self._write("call $alloc")
+ self._write("local.set $npath_ptr")
+ self._write("local.get $path_ptr")
+ self._write("local.get $path_len")
+ self._write("local.get $npath_ptr")
+ self._write("call $__fs_normalize")
+ self._write("local.set $npath_len")
+ self._write("local.get $pre_len")
+ self._write("i32.const 1")
+ self._write("local.get $pre_len")
+ self._write("i32.const 0")
+ self._write("i32.gt_u")
+ self._write("select")
+ self._write("call $alloc")
+ self._write("local.set $npre_ptr")
+ self._write("local.get $pre_ptr")
+ self._write("local.get $pre_len")
+ self._write("local.get $npre_ptr")
+ self._write("call $__fs_normalize")
+ self._write("local.set $npre_len")
+ # From here the compare runs on the NORMALISED buffers.
+ self._write("local.get $npath_ptr")
+ self._write("local.set $path_ptr")
+ self._write("local.get $npath_len")
+ self._write("local.set $path_len")
+ self._write("local.get $npre_ptr")
+ self._write("local.set $pre_ptr")
+ self._write("local.get $npre_len")
+ self._write("local.set $pre_len")
# pl = strip_trailing_slash_len(path); ql = ...(prefix). A
# trailing '/' is dropped unless the string is a lone '/'.
self._write("local.get $path_ptr")
@@ -3100,6 +3596,98 @@ def _emit_wasi_fs_unit_err(self, msg_off: int, msg_len: int) -> None:
self._write("i32.const 0")
self._write("i32.store offset=16")
+ def _emit_wasi_fs_mkdir_recursive_helper(self) -> None:
+ """``$Fs_mkdir_recursive (handle, full_ptr, full_len, idx,
+ rel_ptr, rel_len, ret_area)`` -> recursive ``mkdir`` over a
+ RUNTIME relative path (WASI Fs layer b1, dynamic ``--preopen``).
+
+ A dynamic ``fs.mkdir(path)`` path is not known at compile time, so
+ the literal call site's cumulative-prefix unrolling cannot run.
+ This helper replicates ``os.makedirs(exist_ok=True)`` AT RUNTIME:
+ it scans the relative path for ``/`` separators and calls the
+ existing single-segment ``$Fs_mkdir`` once per cumulative prefix
+ (``a`` then ``a/b`` then ``a/b/c``), in order, each idempotent
+ (``$Fs_mkdir`` maps ``exist`` to Ok). It SHORT-CIRCUITS the
+ moment a prefix writes a genuine ``Err`` (ret_area tag@0 != 0),
+ leaving that Err in ``ret_area`` for the materialiser -- exactly
+ the literal path's behaviour, so a multi-segment dynamic mkdir is
+ byte-parity with the oracle. The FULL path is passed unchanged to
+ every ``$Fs_mkdir`` call so the fine-attenuation gate sees the
+ same full path each time (a denied target denies the first
+ prefix). ``$Fs_mkdir`` is REUSED verbatim; this helper only
+ sequences the prefixes a runtime path cannot pre-enumerate."""
+ self._write(
+ "(func $Fs_mkdir_recursive (param $handle i32) "
+ "(param $full_ptr i32) (param $full_len i32) (param $idx i32) "
+ "(param $rel_ptr i32) (param $rel_len i32) (param $ret_area i32)"
+ )
+ self._indent += 1
+ self._write("(local $k i32)")
+ # Walk k = 1 .. rel_len; at each k that is either a '/' boundary
+ # (rel[k] == '/') or the end (k == rel_len), mkdir the prefix
+ # rel[0:k]. k starts at 1, so rel[0] is never tested as a boundary
+ # and the zero-length prefix rel[0:0] is never emitted: a leading
+ # '/' just stays inside the first emitted prefix (e.g. "/a").
+ self._write("i32.const 1")
+ self._write("local.set $k")
+ self._write("(block $done")
+ self._indent += 1
+ self._write("(loop $seg")
+ self._indent += 1
+ # if k > rel_len -> done.
+ self._write("local.get $k")
+ self._write("local.get $rel_len")
+ self._write("i32.gt_u")
+ self._write("br_if $done")
+ # boundary = (k == rel_len) OR (rel[k] == '/'). Guard the load
+ # behind the end check so k == rel_len never reads out of range.
+ self._write("local.get $k")
+ self._write("local.get $rel_len")
+ self._write("i32.eq")
+ self._write("if (result i32)")
+ self._indent += 1
+ self._write("i32.const 1")
+ self._indent -= 1
+ self._write("else")
+ self._indent += 1
+ self._write("local.get $rel_ptr")
+ self._write("local.get $k")
+ self._write("i32.add")
+ self._write("i32.load8_u offset=0")
+ self._write("i32.const 47") # '/'
+ self._write("i32.eq")
+ self._indent -= 1
+ self._write("end")
+ self._write("if")
+ self._indent += 1
+ # mkdir(prefix = rel[0:k]).
+ self._write("local.get $handle")
+ self._write("local.get $full_ptr")
+ self._write("local.get $full_len")
+ self._write("local.get $idx")
+ self._write("local.get $rel_ptr") # prefix ptr = rel_ptr
+ self._write("local.get $k") # prefix len = k
+ self._write("local.get $ret_area")
+ self._write("call $Fs_mkdir")
+ # Short-circuit on a genuine Err (tag@0 != 0).
+ self._write("local.get $ret_area")
+ self._write("i32.load8_u offset=0")
+ self._write("br_if $done")
+ self._indent -= 1
+ self._write("end")
+ # k += 1; continue.
+ self._write("local.get $k")
+ self._write("i32.const 1")
+ self._write("i32.add")
+ self._write("local.set $k")
+ self._write("br $seg")
+ self._indent -= 1
+ self._write(")")
+ self._indent -= 1
+ self._write(")")
+ self._indent -= 1
+ self._write(")")
+
# ----- Fs.read via wasi:filesystem + wasi:io/streams ---------
def _emit_wasi_fs_read_wrapper(self) -> None:
diff --git a/capa/manifest/__init__.py b/capa/manifest/__init__.py
index 7594af1..1b5e25e 100644
--- a/capa/manifest/__init__.py
+++ b/capa/manifest/__init__.py
@@ -38,7 +38,10 @@
from __future__ import annotations
from ._cyclonedx import CYCLONEDX_SPEC_VERSION, build_cyclonedx
-from ._funrec import SCHEMA_VERSION, build_manifest, display_filename
+from ._funrec import (
+ SCHEMA_VERSION, build_manifest, build_operator_declared_grants,
+ display_filename,
+)
from ._provenance import (
CAPA_BUILD_TYPE, CAPA_BUILDER_ID, SLSA_PREDICATE_TYPE,
build_provenance,
@@ -57,6 +60,7 @@
"CAPA_BUILDER_ID",
"SLSA_PREDICATE_TYPE",
"build_manifest",
+ "build_operator_declared_grants",
"build_cyclonedx",
"build_spdx",
"build_vex_document",
diff --git a/capa/manifest/_cyclonedx.py b/capa/manifest/_cyclonedx.py
index e76329c..8409724 100644
--- a/capa/manifest/_cyclonedx.py
+++ b/capa/manifest/_cyclonedx.py
@@ -72,6 +72,7 @@ def build_cyclonedx(
source: Optional[str] = None,
sources: Optional[dict[str, str]] = None,
expr_labels: Optional[dict[int, str]] = None,
+ operator_declared_grants: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
"""Build a CycloneDX 1.5 SBOM with embedded Capa capability metadata.
@@ -88,6 +89,7 @@ def build_cyclonedx(
inner = build_manifest(
module, filename=filename, capa_version=capa_version,
expr_labels=expr_labels,
+ operator_declared_grants=operator_declared_grants,
)
if timestamp is None:
@@ -123,6 +125,28 @@ def build_cyclonedx(
"value": str(inner["summary"]["functions_crossing_unsafe"])},
]
+ # WASI Fs layer b1: surface operator-DECLARED grants (e.g. --preopen)
+ # as top-level metadata properties, clearly namespaced + labelled as
+ # operator-declared (Level 2) so an SBOM consumer never mistakes them
+ # for the program-proven, compiler-derived capability surface. The
+ # trust_level property states the framing in-band; one
+ # ``capa:operator_declared_grant:preopen`` property per granted dir.
+ _grants = inner.get("operator_declared_grants") or {}
+ _preopens = _grants.get("preopens") or []
+ if _preopens:
+ metadata_properties.append({
+ "name": "capa:operator_declared_grants:trust_level",
+ "value": str(_grants.get("trust_level", "operator-declared")),
+ })
+ for _pre in _preopens:
+ metadata_properties.append({
+ "name": "capa:operator_declared_grant:preopen",
+ "value": (
+ f"{_pre.get('host_dir', '')}"
+ f" [{_pre.get('permission', 'rw')}]"
+ ),
+ })
+
metadata = {
"timestamp": timestamp,
"tools": {
diff --git a/capa/manifest/_funrec.py b/capa/manifest/_funrec.py
index b4c1205..3c6f7c7 100644
--- a/capa/manifest/_funrec.py
+++ b/capa/manifest/_funrec.py
@@ -230,12 +230,47 @@ def _demangle_type_text(s: str) -> str:
return _MANGLE_INLINE_RE.sub(r"\1", s)
+def build_operator_declared_grants(
+ preopens: Optional[list[dict[str, Any]]] = None,
+) -> dict[str, Any]:
+ """Build the ``operator_declared_grants`` manifest block (WASI Fs
+ layer b1, 2026-06-30).
+
+ This block records authority the OPERATOR declared at build / run
+ time (e.g. ``--preopen <dir>``), as DISTINCT from the
+ compiler-DERIVED capability surface that the rest of the manifest
+ proves. A regulator MUST read it as Level-2 operator-DECLARED
+ authority, NOT as program-proven: the compiler could not derive
+ these grants (that is precisely why the operator had to declare
+ them), so they are an explicit trust the operator placed in the
+ deployment, not a property the type system established.
+
+ ``preopens`` is a list of ``{"host_dir": str, "permission":
+ "ro"|"rw", "kind": "fs"}`` entries (or None / empty when no operator
+ grant was declared). The block is always present so a consumer can
+ rely on its shape; an empty ``preopens`` means "no operator grant
+ was declared"."""
+ return {
+ # The honest label a regulator-facing consumer keys on: this is
+ # NOT derived/proven authority.
+ "trust_level": "operator-declared",
+ "note": (
+ "Authority declared by the operator at build/run time "
+ "(e.g. --preopen). DISTINCT from the compiler-derived, "
+ "program-proven capability surface; the compiler could not "
+ "derive these grants."
+ ),
+ "preopens": list(preopens or []),
+ }
+
+
def build_manifest(
module: A.Module,
*,
filename: str = "",
capa_version: Optional[str] = None,
expr_labels: Optional[dict[int, str]] = None,
+ operator_declared_grants: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
"""Build a manifest dict from an analysed module.
@@ -249,6 +284,12 @@ def build_manifest(
@public`` bridges, dropping no-op declassifies of already-public
values. When omitted, every syntactic declassify is counted (the
historical, analysis-free behaviour).
+
+ ``operator_declared_grants`` (WASI Fs layer b1, 2026-06-30): the
+ block produced by :func:`build_operator_declared_grants` recording
+ operator-DECLARED authority (e.g. ``--preopen``), clearly distinct
+ from the derived surface. When None, an EMPTY grants block is
+ recorded so the field shape is stable for consumers.
"""
if capa_version is None:
from .. import __version__ as capa_version
@@ -393,6 +434,14 @@ def build_manifest(
"user_defined_capabilities": user_caps,
"typestates": protocol_states,
"functions": functions,
+ # WASI Fs layer b1: operator-declared authority (e.g. --preopen),
+ # honestly labelled Level-2 / operator-declared, distinct from the
+ # derived surface above. Always present (empty when none declared).
+ "operator_declared_grants": (
+ operator_declared_grants
+ if operator_declared_grants is not None
+ else build_operator_declared_grants()
+ ),
"summary": summary,
}
diff --git a/capa/manifest/_spdx.py b/capa/manifest/_spdx.py
index 18acebd..4141a6a 100644
--- a/capa/manifest/_spdx.py
+++ b/capa/manifest/_spdx.py
@@ -105,6 +105,7 @@ def build_spdx(
source: Optional[str] = None,
sources: Optional[dict[str, str]] = None,
expr_labels: Optional[dict[int, str]] = None,
+ operator_declared_grants: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
"""Build an SPDX 2.3 document with embedded Capa capability metadata.
@@ -121,6 +122,7 @@ def build_spdx(
inner = build_manifest(
module, filename=filename, capa_version=capa_version,
expr_labels=expr_labels,
+ operator_declared_grants=operator_declared_grants,
)
if timestamp is None:
@@ -157,6 +159,21 @@ def build_spdx(
_annot(timestamp, "summary:functions_crossing_unsafe",
str(inner["summary"]["functions_crossing_unsafe"])),
]
+ # WASI Fs layer b1: operator-DECLARED grants (e.g. --preopen) as
+ # program-package annotations, labelled operator-declared (Level 2)
+ # so an SPDX consumer does not read them as program-proven authority.
+ _grants = inner.get("operator_declared_grants") or {}
+ _preopens = _grants.get("preopens") or []
+ if _preopens:
+ program_annotations.append(_annot(
+ timestamp, "operator_declared_grants:trust_level",
+ str(_grants.get("trust_level", "operator-declared")),
+ ))
+ for _pre in _preopens:
+ program_annotations.append(_annot(
+ timestamp, "operator_declared_grant:preopen",
+ f"{_pre.get('host_dir', '')} [{_pre.get('permission', 'rw')}]",
+ ))
program_pkg = {
"SPDXID": program_id,
"name": bom_basename,
diff --git a/capa/runtime/_wasm_component_host.py b/capa/runtime/_wasm_component_host.py
index 4f94812..8fe878f 100644
--- a/capa/runtime/_wasm_component_host.py
+++ b/capa/runtime/_wasm_component_host.py
@@ -86,6 +86,7 @@ def __init__(
wasi: bool = False,
env_ceiling: Optional["object"] = None,
fs_ceiling: Optional["object"] = None,
+ fs_operator_preopen: Optional[tuple] = None,
net_ceiling: Optional["object"] = None,
stdin: Optional[bytes] = None,
):
@@ -118,6 +119,21 @@ def __init__(
# already rejected such a program in --wasi mode, so this is
# belt-and-braces. Only consulted in ``--wasi`` mode.
self._fs_ceiling = fs_ceiling
+ # WASI Fs layer b1 (operator preopen, 2026-06-30): an OPERATOR-
+ # DECLARED filesystem grant, ``(host_dir, read_write)`` or None.
+ # When the operator passes ``--preopen <dir>[:ro|:rw]`` the host
+ # registers that directory as a preopen AFTER every
+ # compiler-derived ceiling preopen, so its guest index is
+ # ``len(ceiling.preopens)``. In the dynamic-path case the derived
+ # ceiling is NOT closed (no derived preopens), so the operator
+ # preopen lands at index 0 -- the constant the dynamic-path
+ # call-site emitter addresses. This is the WASI ``--dir`` model
+ # (wasmtime's ``--dir``): authority DECLARED by the operator
+ # (Level 2), distinct from the COMPILER-DERIVED ceiling. It is the
+ # ONLY thing that lets a dynamic Fs path resolve at runtime; the
+ # compiler suppresses its dynamic-path rejection symmetrically
+ # (``--wasi-dynamic-fs``). Only consulted in ``--wasi`` mode.
+ self._fs_operator_preopen = fs_operator_preopen
# Records the preopens actually installed on the WasiConfig in
# WASI mode (a list of (host_path, "ro"|"rw") tuples), exposed
# for tests / diagnostics so the ceiling guarantee is
@@ -420,7 +436,11 @@ def _apply_fs_preopens(self, wasi_cfg) -> None:
through a READ_ONLY preopen, independent of guest behaviour."""
ceiling = self._fs_ceiling
if ceiling is None or not getattr(ceiling, "closed", False):
- self._wasi_fs_applied = []
+ # No derived preopens. In layer b1 an OPERATOR ``--preopen``
+ # may still grant authority for dynamic paths: register it
+ # alone (at index 0, matching the dynamic call-site emitter's
+ # ``_wasi_operator_preopen_index() == 0`` for an open ceiling).
+ self._wasi_fs_applied = self._apply_operator_preopen(wasi_cfg, 0)
return
# ``get-directories`` returns descriptors ONLY for the preopens
# actually registered, in registration order, so EVERY ceiling
@@ -469,8 +489,48 @@ def _apply_fs_preopens(self, wasi_cfg) -> None:
wasi_cfg.preopen_dir(
host_path, guest_path, dir_perms, file_perms,
)
+ # Layer b1: append the operator ``--preopen`` AFTER the derived
+ # preopens so it never shifts a derived index (index ==
+ # len(ceiling.preopens)). For an all-literal program this preopen
+ # is registered + recorded but unused by the guest (no dynamic
+ # call site); the grant stays honest in the SBOM regardless.
+ applied += self._apply_operator_preopen(
+ wasi_cfg, len(ceiling.preopens),
+ )
self._wasi_fs_applied = applied
+ def _apply_operator_preopen(self, wasi_cfg, index: int):
+ """Register the operator ``--preopen`` directory (layer b1) at the
+ given guest preopen ``index`` and return the list of applied
+ records (empty when no operator preopen was declared).
+
+ The operator preopen is an explicit Level-2 operator grant: the
+ directory is mounted READ_WRITE or READ_ONLY per its declared
+ permission. A non-existent host directory is skipped (no record),
+ so a dynamic path resolved against a missing preopen sees no
+ descriptor at ``index`` and fails closed (treated as absent), matching
+ the derived-ceiling convention."""
+ grant = self._fs_operator_preopen
+ if not grant:
+ return []
+ host_dir, read_write = grant[0], bool(grant[1])
+ if not os.path.isdir(host_dir):
+ return []
+ guest_path = f"/capa-preopen-{index}"
+ if read_write:
+ wasi_cfg.preopen_dir(
+ host_dir, guest_path,
+ wasmtime.DirPerms.READ_WRITE,
+ wasmtime.FilePerms.READ_WRITE,
+ )
+ return [(host_dir, "operator-rw")]
+ wasi_cfg.preopen_dir(
+ host_dir, guest_path,
+ wasmtime.DirPerms.READ_ONLY,
+ wasmtime.FilePerms.READ_ONLY,
+ )
+ return [(host_dir, "operator-ro")]
+
def _register_all(self) -> None:
root = self._linker.root()
self._register_stdio(root)
diff --git a/docs/design/wasi_mode.md b/docs/design/wasi_mode.md
index 653c961..daade87 100644
--- a/docs/design/wasi_mode.md
+++ b/docs/design/wasi_mode.md
@@ -34,7 +34,7 @@ see the WAT) rewrites the migrated touch-points:
| `Env.restrict_to_keys` | `capa:host/env.restrict-to-keys` (host handle table) | guest-side allow-list intersection (no host) |
| `Env.allows` | `capa:host/env.allows` (host handle table) | guest-side allow-list membership (no host) |
| `Fs.restrict_to` | `capa:host/fs.restrict-to` (host handle table) | guest-side prefix allow-list union (no host) |
-| `Fs.allows` | `capa:host/fs.allows` (host handle table) | guest-side lexical prefix containment (no host) |
+| `Fs.allows` | `capa:host/fs.allows` (host handle table) | guest-side prefix containment with lexical `.`/`..` normalisation (no host; symlinks unresolved) |
| `Net.get` | `capa:host/net.get` (host handle table) | `wasi:http/outgoing-handler.handle` + the wasi:http request/response chain + `wasi:io/streams` body read, gated guest-side by the static ceiling **and** the fine allow-list |
| `Net.post` | `capa:host/net.post` (host handle table) | the Net.get chain + `wasi:io/streams` flow-controlled outgoing-body **write** of the request body before the handle, same two guest-side gates |
| `Net.restrict_to` | `capa:host/net.restrict-to` (host handle table) | guest-side host allow-list intersection (no host) |
@@ -805,22 +805,33 @@ exactly like Env:
`write` / `mkdir` therefore leaves **nothing** on disk -- the gate
fires before the file is opened.
-**Path containment (LEXICAL, not realpath).** The oracle canonicalises
-both the prefix and the queried path with `os.path.realpath` (resolving
-`..` / `.` / symlinks) before the `is_relative_to` boundary check. The
-guest has **no realpath syscall**, so `$Fs_path_contained` does a
-**lexical** path-segment containment: strip trailing `/` from both, then
-the path is contained iff its first `len(prefix)` bytes equal the prefix
-AND the next byte is `/` or the path IS the prefix (the segment boundary
-that stops `data/ab` matching `data/a`). **For CANONICAL paths** (no `.`
-/ `..` segments, no symlinks, no repeated slashes) this is
-**byte-identical** to the oracle: `realpath` prepends the SAME process
-CWD to a relative path and its relative prefix (so the CWD cancels in the
-containment) and leaves a canonical absolute path unchanged. **For
-NON-CANONICAL paths or symlinks** the lexical check may **diverge** from
-the realpath oracle -- the honest, documented **TOCTOU / symlink loss**
-of Level 2. The migrated tests use canonical absolute literals, where
-parity holds byte-for-byte across all three backends.
+**Path containment (LEXICAL `.`/`..` normalisation, symlinks not
+resolved).** The oracle canonicalises both the prefix and the queried
+path with `os.path.realpath` (resolving `..` / `.` / symlinks) before the
+`is_relative_to` boundary check. The guest has **no realpath syscall**,
+so `$Fs_path_contained` **lexically normalises** the `.` and `..`
+segments of BOTH the path and the prefix FIRST (via `$__fs_normalize`,
+the `os.path.normpath`-style collapse that preserves a leading `..` so an
+escape stays an escape), and only then does the path-segment containment:
+strip trailing `/` from both, then the path is contained iff its first
+`len(prefix)` bytes equal the prefix AND the next byte is `/` or the path
+IS the prefix (the segment boundary that stops `data/ab` matching
+`data/a`). The `.`/`..` normalisation reproduces what `realpath` does for
+those segments in the no-symlink case, so a dynamic path such as
+`sub/../secret.txt` normalises to `secret.txt` (NOT contained in `sub` ->
+**denied**, matching the oracle), while `sub/../sub/ok.txt` normalises to
+`sub/ok.txt` (contained -> **allowed**). The `$__fs_normalize` rule is
+validated byte-for-byte against `os.path.normpath` and over a 9331-input
+segment fuzz (see the scratchpad reference behind
+`TestWasiFsDynamicPreopen.test_restricted_fs_dynamic_path_dotdot_normalized`).
+`realpath` also prepends the SAME process CWD to a relative path and its
+relative prefix (so the CWD cancels in the containment) and leaves a
+canonical absolute path unchanged, so for non-symlink paths the result is
+**byte-identical** to the oracle. **SYMLINKS are still NOT resolved** --
+the gate is lexical, so a symlink inside a prefix that points outside it
+is admitted by the guest (and caught only by the Level-1 preopen
+ceiling). That symlink (TOCTOU) loss is the **only** remaining Level-2
+divergence from the realpath oracle; `.` and `..` are now handled.
**Interaction Level 1 + Level 2.** The guest-side allow-list (fine,
Level 2) operates ON TOP OF the preopen (the Level-1 ceiling): the fine
diff --git a/tests/test_manifest.py b/tests/test_manifest.py
index e722960..2008cf7 100644
--- a/tests/test_manifest.py
+++ b/tests/test_manifest.py
@@ -263,6 +263,10 @@ class TestTopLevelShape(unittest.TestCase):
"user_defined_capabilities",
"typestates",
"functions",
+ # WASI Fs layer b1: operator-declared authority (e.g. --preopen),
+ # always present (empty when none declared), distinct from the
+ # compiler-derived surface.
+ "operator_declared_grants",
"summary",
}
diff --git a/tests/test_wasi_mode.py b/tests/test_wasi_mode.py
index 81b5618..4578758 100644
--- a/tests/test_wasi_mode.py
+++ b/tests/test_wasi_mode.py
@@ -526,6 +526,66 @@ def test_wasi_without_component_rejected(self):
self.assertIn("--wasi requires --component", err)
+class TestWasiPreopenFlagGuards(unittest.TestCase):
+ """``--preopen`` (layer b1) guards: it requires --wasi (or an SBOM
+ command) and b1 supports a single preopen for dynamic paths. These
+ guards fire before any Wasm toolchain is needed."""
+
+ def _run_cli(self, argv, src):
+ import tempfile
+ from pathlib import Path
+ from capa.cli import main
+ err = io.StringIO()
+ old_err, old_out, old_argv = sys.stderr, sys.stdout, sys.argv
+ sys.stderr = err
+ sys.stdout = io.StringIO()
+ with tempfile.TemporaryDirectory() as d:
+ f = Path(d) / "p.capa"
+ f.write_text(src, encoding="utf-8")
+ sys.argv = ["capa", *argv, str(f)]
+ try:
+ code = main()
+ finally:
+ sys.stderr, sys.stdout, sys.argv = old_err, old_out, old_argv
+ return code, err.getvalue()
+
+ _DYN = (
+ "fun main(fs: Fs, env: Env, stdio: Stdio)\n"
+ " let args = env.args()\n"
+ " match args.get(0)\n"
+ " Some(p) ->\n"
+ " match fs.read(p)\n"
+ " Ok(c) -> stdio.println(c)\n"
+ " Err(e) -> stdio.println(\"err\")\n"
+ " None -> stdio.println(\"none\")\n"
+ )
+
+ def test_preopen_without_wasi_rejected(self):
+ code, err = self._run_cli(
+ ["--wasm", "--component", "--run", "--preopen", "/tmp/x"],
+ self._DYN,
+ )
+ self.assertEqual(code, 1)
+ self.assertIn("--preopen requires --wasi", err)
+
+ def test_multiple_preopen_rejected(self):
+ code, err = self._run_cli(
+ ["--wasm", "--component", "--wasi", "--run",
+ "--preopen", "/tmp/a", "--preopen", "/tmp/b"],
+ self._DYN,
+ )
+ self.assertEqual(code, 1)
+ self.assertIn("single --preopen", err)
+
+ def test_preopen_allowed_with_manifest(self):
+ # --preopen is accepted alongside an SBOM/--manifest command (it
+ # records the operator grant). No --wasi needed there.
+ code, err = self._run_cli(
+ ["--manifest", "--preopen", "/data:ro"], self._DYN,
+ )
+ self.assertEqual(code, 0, err)
+
+
class TestWasiEnvCeilingAnalysis(unittest.TestCase):
"""Static Env authority-ceiling analysis (Level 1 pre-requisite).
@@ -1421,6 +1481,195 @@ def test_read_dynamic_path_fail_closed_rejected(self):
self.assertIn("literal", str(cm.exception))
+class TestWasiFsDynamicPreopenCompile(unittest.TestCase):
+ """WASI Fs layer b1: the operator ``--preopen`` flag UNBLOCKS a
+ DYNAMIC Fs path at compile time (suppressing the dynamic-path
+ rejection) and records the grant in the SBOM. Pure-Python checks (no
+ wasm-tools / wasmtime), so this class is not gated."""
+
+ _DYN_SRC = (
+ "fun main(fs: Fs, env: Env, stdio: Stdio)\n"
+ " let args = env.args()\n"
+ " match args.get(0)\n"
+ " Some(p) ->\n"
+ " match fs.read(p)\n"
+ " Ok(c) -> stdio.println(c)\n"
+ " Err(e) -> stdio.println(\"err\")\n"
+ " None -> stdio.println(\"none\")\n"
+ )
+
+ def _compile(self, src: str, *, dynamic_fs: bool):
+ from capa.ir import compile_wat
+ module, result = _parse_analyze(src)
+ return compile_wat(
+ module, types=result.types, wasi=True,
+ wasi_dynamic_fs=dynamic_fs,
+ )
+
+ def test_without_preopen_still_rejected(self):
+ # NO regression: without the operator preopen the dynamic path is
+ # still rejected at compile time exactly as before.
+ with self.assertRaises(Exception) as cm:
+ self._compile(self._DYN_SRC, dynamic_fs=False)
+ self.assertIn("WASI mode", str(cm.exception))
+ self.assertIn("literal", str(cm.exception))
+
+ def test_with_preopen_compiles(self):
+ # With the operator preopen the dynamic path compiles: the Fs.read
+ # wrapper + the preopen resolver are emitted, no capa:host/fs.
+ wat = self._compile(self._DYN_SRC, dynamic_fs=True)
+ self.assertIn("(func $Fs_read", wat)
+ self.assertIn("(func $__wasi_fs_preopen_desc", wat)
+ self.assertNotIn('"capa:host/fs"', wat)
+
+ def test_dynamic_metadata_and_streams_compile(self):
+ # exists / is_dir / mkdir / write / list_dir all admit a dynamic
+ # path under the operator preopen.
+ src = (
+ "fun main(fs: Fs, env: Env, stdio: Stdio)\n"
+ " let args = env.args()\n"
+ " match args.get(0)\n"
+ " Some(p) ->\n"
+ " stdio.println(\"${fs.exists(p)}\")\n"
+ " stdio.println(\"${fs.is_dir(p)}\")\n"
+ " let m = fs.mkdir(p)\n"
+ " let w = fs.write(p, \"x\")\n"
+ " let l = fs.list_dir(p)\n"
+ " None -> stdio.println(\"none\")\n"
+ )
+ wat = self._compile(src, dynamic_fs=True)
+ self.assertIn("(func $Fs_exists", wat)
+ self.assertIn("(func $Fs_is_dir", wat)
+ self.assertIn("(func $Fs_mkdir", wat)
+ self.assertIn("(func $Fs_write", wat)
+ self.assertIn("(func $Fs_list_dir", wat)
+
+ def test_mixed_literal_and_dynamic_path_clear_message(self):
+ # A program that mixes a LITERAL Fs path and a DYNAMIC one under
+ # --preopen fails closed (no index misalignment), but with a CLEAR
+ # message naming the b1 limitation and the flag, not the internal
+ # "no closed preopen ceiling" wording.
+ src = (
+ "fun main(fs: Fs, env: Env, stdio: Stdio)\n"
+ " let args = env.args()\n"
+ " match fs.read(\"fixed.txt\")\n"
+ " Ok(c) -> stdio.println(c)\n"
+ " Err(e) -> stdio.println(\"err\")\n"
+ " match args.get(0)\n"
+ " Some(p) ->\n"
+ " match fs.read(p)\n"
+ " Ok(c) -> stdio.println(c)\n"
+ " Err(e) -> stdio.println(\"err\")\n"
+ " None -> stdio.println(\"none\")\n"
+ )
+ with self.assertRaises(Exception) as cm:
+ self._compile(src, dynamic_fs=True)
+ msg = str(cm.exception)
+ self.assertIn("--preopen", msg)
+ self.assertIn("MIXING", msg)
+ self.assertNotIn("has no closed preopen ceiling", msg)
+
+ def test_operator_preopen_index_is_zero_when_ceiling_open(self):
+ # b1 index rule: with no derived preopens (dynamic ceiling) the
+ # operator preopen is index 0, the constant the dynamic call site
+ # addresses.
+ from capa.ir import compile_wat # noqa: F401
+ from capa.ir._emit_wasm import WasmEmitter
+ from capa.ir._lower import Lowerer
+ module, result = _parse_analyze(self._DYN_SRC)
+ cir = Lowerer(types=result.types or {}).lower_module(module)
+ em = WasmEmitter(wasi=True, wasi_dynamic_fs=True)
+ em.emit(cir)
+ self.assertEqual(em._wasi_operator_preopen_index(), 0)
+
+ def test_grant_recorded_in_manifest(self):
+ # The operator grant is surfaced in the manifest as a Level-2
+ # operator-DECLARED block, distinct from the derived surface.
+ from capa.manifest import (
+ build_manifest, build_operator_declared_grants,
+ )
+ module, result = _parse_analyze(self._DYN_SRC)
+ grants = build_operator_declared_grants([
+ {"kind": "fs", "host_dir": "/data", "permission": "rw"},
+ ])
+ man = build_manifest(
+ module, operator_declared_grants=grants,
+ )
+ block = man["operator_declared_grants"]
+ self.assertEqual(block["trust_level"], "operator-declared")
+ self.assertEqual(block["preopens"][0]["host_dir"], "/data")
+ self.assertEqual(block["preopens"][0]["permission"], "rw")
+
+ def test_grant_recorded_in_cyclonedx_and_spdx(self):
+ from capa.manifest import (
+ build_cyclonedx, build_spdx, build_operator_declared_grants,
+ )
+ module, result = _parse_analyze(self._DYN_SRC)
+ grants = build_operator_declared_grants([
+ {"kind": "fs", "host_dir": "/data", "permission": "ro"},
+ ])
+ cdx = build_cyclonedx(
+ module, timestamp="2026-06-30T00:00:00Z",
+ operator_declared_grants=grants,
+ )
+ props = {p["name"]: p["value"] for p in cdx["metadata"]["properties"]}
+ self.assertEqual(
+ props["capa:operator_declared_grants:trust_level"],
+ "operator-declared",
+ )
+ self.assertIn(
+ "capa:operator_declared_grant:preopen", props,
+ )
+ self.assertIn("/data", props["capa:operator_declared_grant:preopen"])
+ spdx = build_spdx(
+ module, timestamp="2026-06-30T00:00:00Z",
+ operator_declared_grants=grants,
+ )
+ comments = [
+ a["comment"] for a in spdx["packages"][0]["annotations"]
+ ]
+ self.assertTrue(any(
+ "operator_declared_grant:preopen" in c for c in comments
+ ))
+
+ def test_empty_grant_block_present_by_default(self):
+ # The block is always present (empty preopens) so consumers can
+ # rely on the shape even when no operator grant is declared.
+ from capa.manifest import build_manifest
+ module, result = _parse_analyze(
+ "fun main(stdio: Stdio)\n stdio.println(\"hi\")\n"
+ )
+ man = build_manifest(module)
+ self.assertEqual(man["operator_declared_grants"]["preopens"], [])
+
+
+class TestWasiPreopenSpecParse(unittest.TestCase):
+ """The CLI ``--preopen DIR[:ro|:rw]`` spec parser (pure Python)."""
+
+ def test_default_is_read_write(self):
+ from capa.cli import _parse_preopen_spec
+ self.assertEqual(_parse_preopen_spec("/data"), ("/data", True))
+
+ def test_ro_suffix(self):
+ from capa.cli import _parse_preopen_spec
+ self.assertEqual(_parse_preopen_spec("/data:ro"), ("/data", False))
+
+ def test_rw_suffix(self):
+ from capa.cli import _parse_preopen_spec
+ self.assertEqual(_parse_preopen_spec("/data:rw"), ("/data", True))
+
+ def test_colon_in_path_preserved(self):
+ from capa.cli import _parse_preopen_spec
+ # Only a trailing :ro / :rw is a permission suffix; a Windows
+ # drive colon (or any other colon) is preserved.
+ self.assertEqual(
+ _parse_preopen_spec("C:/data"), ("C:/data", True),
+ )
+ self.assertEqual(
+ _parse_preopen_spec("C:/data:ro"), ("C:/data", False),
+ )
+
+
class TestWasiNetDynamicUrlRejections(unittest.TestCase):
"""A DYNAMIC Net url (not a string literal) reaching get / post is
rejected at COMPILE time in --wasi (2026-06-29), SYMMETRIC with the Fs
@@ -2483,6 +2732,404 @@ def _run_wasi_fs(src: str, data_dir: str) -> str:
)
+def _build_wasi_dynamic_fs_component(src: str) -> bytes:
+ """Build a --wasi component with the operator-preopen flag set, so a
+ DYNAMIC Fs path is admitted (layer b1). The compiler suppresses its
+ dynamic-path rejection because an operator preopen is declared."""
+ from capa.ir import compile_wasm, compile_wit
+ from capa.cli import _wrap_as_component
+ module, result = _parse_analyze(src)
+ core = compile_wasm(
+ module, types=result.types, wasi=True, wasi_dynamic_fs=True,
+ )
+ wit = compile_wit(module, types=result.types, wasi=True)
+ return _wrap_as_component(core, wit, wasi=True)
+
+
+def _run_wasi_dynamic_fs(
+ src: str, preopen_dir: str, *, read_write: bool = True,
+ args: tuple = (),
+) -> str:
+ """Build + run a DYNAMIC-path Fs program in WASI mode under a single
+ operator ``--preopen`` directory; capture stdout. The dynamic path is
+ resolved at runtime relative to ``preopen_dir`` (the operator grant)."""
+ from capa.runtime._wasm_component_host import WasmComponentHost
+ comp = _build_wasi_dynamic_fs_component(src)
+ host = WasmComponentHost(
+ args=args, wasi=True,
+ fs_operator_preopen=(preopen_dir, read_write),
+ )
+ return _wasi_run_capture(host, comp)
+
+
+def _run_python_in_cwd(src: str, cwd: str, args: tuple = ()) -> str:
+ """Run a program on the Python oracle with ``cwd`` as the working
+ directory and ``args`` as ``sys.argv[1:]``, so ``env.args()`` yields
+ ``args`` and a relative Fs path resolves as it does under the WASI
+ operator preopen: relative to the granted directory."""
+ from capa import transpile
+ module, result = _parse_analyze(src)
+ code = transpile(module, types=result.types, bindings=result.bindings)
+ buf = io.StringIO()
+ saved_out, saved_argv, saved_cwd = sys.stdout, list(sys.argv), os.getcwd()
+ sys.stdout = buf
+ sys.argv = ["prog"] + list(args)
+ os.chdir(cwd)
+ try:
+ ns: dict = {"__name__": "__main__"}
+ exec(compile(code, "", "exec"), ns)
+ finally:
+ sys.stdout = saved_out
+ sys.argv = saved_argv
+ os.chdir(saved_cwd)
+ return buf.getvalue()
+
+
+def _run_capa_host_in_cwd(src: str, cwd: str, args: tuple = ()) -> str:
+ """Run a program on the default capa:host component backend with
+ ``cwd`` as the working directory and ``args`` as the program argv, so
+ a relative dynamic Fs path resolves identically to the WASI operator
+ preopen and the Python oracle."""
+ from capa.ir import compile_wasm, compile_wit
+ from capa.cli import _wrap_as_component
+ from capa.runtime._wasm_component_host import WasmComponentHost
+ module, result = _parse_analyze(src)
+ core = compile_wasm(module, types=result.types, wasi=False)
+ wit = compile_wit(module, types=result.types, wasi=False)
+ comp = _wrap_as_component(core, wit, wasi=False)
+ buf = io.StringIO()
+ saved_out, saved_cwd = sys.stdout, os.getcwd()
+ sys.stdout = buf
+ os.chdir(cwd)
+ try:
+ WasmComponentHost(args=args, wasi=False).run_main(comp)
+ finally:
+ sys.stdout = saved_out
+ os.chdir(saved_cwd)
+ return buf.getvalue()
+
+
+@unittest.skipUnless(
+ _has_wasm_tools() and _has_wasmtime_wasip2(),
+ "wasm-tools and/or wasmtime-py with WASI P2 not installed",
+)
+class TestWasiFsDynamicPreopen(unittest.TestCase):
+ """End-to-end WASI Fs layer b1: a genuine DYNAMIC Fs path (sourced
+ from ``env.args()``) compiles + runs under a single operator
+ ``--preopen`` directory, resolving at runtime relative to it, with
+ three-way byte-parity (Python oracle == capa:host backend == WASI
+ backend) on both output and filesystem effect. The dynamic path makes
+ the static ceiling NOT closed, so the operator preopen is the sole
+ preopen (index 0)."""
+
+ def setUp(self):
+ import tempfile
+ self._td = tempfile.mkdtemp(prefix="capa-wasi-dynfs-")
+
+ def tearDown(self):
+ import shutil
+ shutil.rmtree(self._td, ignore_errors=True)
+
+ def _fresh_dir(self, name):
+ import tempfile
+ d = tempfile.mkdtemp(prefix=f"capa-{name}-", dir=self._td)
+ return d
+
+ def test_read_dynamic_three_backend_parity(self):
+ src = (
+ "fun main(fs: Fs, env: Env, stdio: Stdio)\n"
+ " let args = env.args()\n"
+ " match args.get(0)\n"
+ " Some(p) ->\n"
+ " match fs.read(p)\n"
+ " Ok(c) -> stdio.println(c)\n"
+ " Err(e) -> stdio.println(\"ERR\")\n"
+ " None -> stdio.println(\"NOARG\")\n"
+ )
+ # One controlled directory PER backend so each reads its own copy.
+ outs = []
+ for be in ("wasi", "py", "host"):
+ d = self._fresh_dir(be)
+ with open(os.path.join(d, "hello.txt"), "w") as f:
+ f.write("DYNAMIC-READ-OK")
+ if be == "wasi":
+ outs.append(_run_wasi_dynamic_fs(
+ src, d, args=("hello.txt",),
+ ))
+ elif be == "py":
+ outs.append(_run_python_in_cwd(src, d, args=("hello.txt",)))
+ else:
+ outs.append(_run_capa_host_in_cwd(
+ src, d, args=("hello.txt",),
+ ))
+ self.assertEqual(outs[0], "DYNAMIC-READ-OK\n")
+ self.assertEqual(outs[0], outs[1])
+ self.assertEqual(outs[0], outs[2])
+
+ def test_write_dynamic_three_backend_parity_and_effect(self):
+ src = (
+ "fun main(fs: Fs, env: Env, stdio: Stdio)\n"
+ " let args = env.args()\n"
+ " match args.get(0)\n"
+ " Some(p) ->\n"
+ " match fs.write(p, \"CONTENT-XYZ\")\n"
+ " Ok(u) -> stdio.println(\"WROTE\")\n"
+ " Err(e) -> stdio.println(\"ERR\")\n"
+ " None -> stdio.println(\"NOARG\")\n"
+ )
+ results = {}
+ effects = {}
+ for be in ("wasi", "py", "host"):
+ d = self._fresh_dir(be)
+ if be == "wasi":
+ results[be] = _run_wasi_dynamic_fs(src, d, args=("o.txt",))
+ elif be == "py":
+ results[be] = _run_python_in_cwd(src, d, args=("o.txt",))
+ else:
+ results[be] = _run_capa_host_in_cwd(src, d, args=("o.txt",))
+ with open(os.path.join(d, "o.txt")) as f:
+ effects[be] = f.read()
+ self.assertEqual(results["wasi"], "WROTE\n")
+ self.assertEqual(results["wasi"], results["py"])
+ self.assertEqual(results["wasi"], results["host"])
+ self.assertEqual(effects["wasi"], "CONTENT-XYZ")
+ self.assertEqual(effects["wasi"], effects["py"])
+ self.assertEqual(effects["wasi"], effects["host"])
+
+ def test_exists_is_dir_dynamic_parity(self):
+ src = (
+ "fun main(fs: Fs, env: Env, stdio: Stdio)\n"
+ " let args = env.args()\n"
+ " match args.get(0)\n"
+ " Some(p) ->\n"
+ " stdio.println(\"e=${fs.exists(p)}\")\n"
+ " stdio.println(\"d=${fs.is_dir(p)}\")\n"
+ " None -> stdio.println(\"NOARG\")\n"
+ )
+ for arg, mk in (("there.txt", "file"), ("adir", "dir"),
+ ("nope", None)):
+ results = {}
+ for be in ("wasi", "py", "host"):
+ d = self._fresh_dir(f"{be}-{arg}")
+ with open(os.path.join(d, "there.txt"), "w") as f:
+ f.write("x")
+ os.makedirs(os.path.join(d, "adir"))
+ if be == "wasi":
+ results[be] = _run_wasi_dynamic_fs(src, d, args=(arg,))
+ elif be == "py":
+ results[be] = _run_python_in_cwd(src, d, args=(arg,))
+ else:
+ results[be] = _run_capa_host_in_cwd(src, d, args=(arg,))
+ self.assertEqual(results["wasi"], results["py"], arg)
+ self.assertEqual(results["wasi"], results["host"], arg)
+
+ def test_mkdir_dynamic_parity_and_effect(self):
+ src = (
+ "fun main(fs: Fs, env: Env, stdio: Stdio)\n"
+ " let args = env.args()\n"
+ " match args.get(0)\n"
+ " Some(p) ->\n"
+ " match fs.mkdir(p)\n"
+ " Ok(u) -> stdio.println(\"MK=ok\")\n"
+ " Err(e) -> stdio.println(\"MK=err\")\n"
+ " stdio.println(\"d=${fs.is_dir(p)}\")\n"
+ " None -> stdio.println(\"NOARG\")\n"
+ )
+ results = {}
+ for be in ("wasi", "py", "host"):
+ d = self._fresh_dir(be)
+ if be == "wasi":
+ results[be] = _run_wasi_dynamic_fs(src, d, args=("newdir",))
+ elif be == "py":
+ results[be] = _run_python_in_cwd(src, d, args=("newdir",))
+ else:
+ results[be] = _run_capa_host_in_cwd(src, d, args=("newdir",))
+ self.assertTrue(os.path.isdir(os.path.join(d, "newdir")), be)
+ self.assertEqual(results["wasi"], results["py"])
+ self.assertEqual(results["wasi"], results["host"])
+
+ def test_mkdir_dynamic_multi_segment_parity_and_effect(self):
+ # A MULTI-segment dynamic mkdir replicates os.makedirs(exist_ok)
+ # at runtime ($Fs_mkdir_recursive over $Fs_mkdir per prefix), so a
+ # missing-parent tree is created and the Result matches the oracle.
+ src = (
+ "fun main(fs: Fs, env: Env, stdio: Stdio)\n"
+ " let args = env.args()\n"
+ " match args.get(0)\n"
+ " Some(p) ->\n"
+ " match fs.mkdir(p)\n"
+ " Ok(u) -> stdio.println(\"MK=ok\")\n"
+ " Err(e) -> stdio.println(\"MK=err\")\n"
+ " stdio.println(\"d=${fs.is_dir(p)}\")\n"
+ " None -> stdio.println(\"NOARG\")\n"
+ )
+ results = {}
+ for be in ("wasi", "py", "host"):
+ d = self._fresh_dir(be)
+ if be == "wasi":
+ results[be] = _run_wasi_dynamic_fs(src, d, args=("a/b/c",))
+ elif be == "py":
+ results[be] = _run_python_in_cwd(src, d, args=("a/b/c",))
+ else:
+ results[be] = _run_capa_host_in_cwd(src, d, args=("a/b/c",))
+ self.assertTrue(os.path.isdir(os.path.join(d, "a", "b", "c")), be)
+ self.assertEqual(results["wasi"], "MK=ok\nd=true\n")
+ self.assertEqual(results["wasi"], results["py"])
+ self.assertEqual(results["wasi"], results["host"])
+
+ def test_list_dir_dynamic_parity(self):
+ src = (
+ "fun main(fs: Fs, env: Env, stdio: Stdio)\n"
+ " let args = env.args()\n"
+ " match args.get(0)\n"
+ " Some(p) ->\n"
+ " match fs.list_dir(p)\n"
+ " Ok(names) ->\n"
+ " for n in names\n"
+ " stdio.println(n)\n"
+ " Err(e) -> stdio.println(\"ERR\")\n"
+ " None -> stdio.println(\"NOARG\")\n"
+ )
+ results = {}
+ for be in ("wasi", "py", "host"):
+ d = self._fresh_dir(be)
+ sub = os.path.join(d, "ld")
+ os.makedirs(sub)
+ for nm in ("b.txt", "a.txt", "c.txt"):
+ with open(os.path.join(sub, nm), "w") as f:
+ f.write("")
+ if be == "wasi":
+ results[be] = _run_wasi_dynamic_fs(src, d, args=("ld",))
+ elif be == "py":
+ results[be] = _run_python_in_cwd(src, d, args=("ld",))
+ else:
+ results[be] = _run_capa_host_in_cwd(src, d, args=("ld",))
+ self.assertEqual(results["wasi"], "a.txt\nb.txt\nc.txt\n")
+ self.assertEqual(results["wasi"], results["py"])
+ self.assertEqual(results["wasi"], results["host"])
+
+ def test_restricted_fs_plus_dynamic_path_mitigation(self):
+ # The fine attenuation gate ($Fs_path_allowed) still works with a
+ # DYNAMIC path: a restrict_to'd Fs denies a runtime path outside
+ # the prefix and admits one inside, byte-parity with the oracle.
+ src = (
+ "fun main(fs: Fs, env: Env, stdio: Stdio)\n"
+ " let r = fs.restrict_to(\"allowed\")\n"
+ " let args = env.args()\n"
+ " match args.get(0)\n"
+ " Some(p) -> stdio.println(\"${r.exists(p)}\")\n"
+ " None -> stdio.println(\"NOARG\")\n"
+ )
+ for arg, expect in (("allowed/ok.txt", "true\n"),
+ ("secret.txt", "false\n")):
+ results = {}
+ for be in ("wasi", "py", "host"):
+ d = self._fresh_dir(f"{be}-r")
+ os.makedirs(os.path.join(d, "allowed"))
+ with open(os.path.join(d, "allowed", "ok.txt"), "w") as f:
+ f.write("INSIDE")
+ with open(os.path.join(d, "secret.txt"), "w") as f:
+ f.write("SECRET")
+ if be == "wasi":
+ results[be] = _run_wasi_dynamic_fs(src, d, args=(arg,))
+ elif be == "py":
+ results[be] = _run_python_in_cwd(src, d, args=(arg,))
+ else:
+ results[be] = _run_capa_host_in_cwd(src, d, args=(arg,))
+ self.assertEqual(results["wasi"], expect, arg)
+ self.assertEqual(results["wasi"], results["py"], arg)
+ self.assertEqual(results["wasi"], results["host"], arg)
+
+ def test_restricted_fs_dynamic_path_dotdot_normalized(self):
+ # CRITICAL parity: a DYNAMIC path with '.' / '..' must be LEXICALLY
+ # normalised before the fine-attenuation containment check, so a
+ # path that escapes the restrict_to subtree via '..' is DENIED
+ # (matching the realpath oracle), and one that stays inside after
+ # normalisation is ADMITTED. Without normalisation the lexical
+ # prefix "sub/" would match "sub/../secret.txt" and LEAK a sibling.
+ src = (
+ "fun main(fs: Fs, env: Env, stdio: Stdio)\n"
+ " let r = fs.restrict_to(\"sub\")\n"
+ " let args = env.args()\n"
+ " match args.get(0)\n"
+ " Some(p) ->\n"
+ " match r.read(p)\n"
+ " Ok(c) -> stdio.println(\"READ:${c}\")\n"
+ " Err(e) -> stdio.println(\"DENIED\")\n"
+ " None -> stdio.println(\"NOARG\")\n"
+ )
+ # (arg, expected). The oracle (os.path.realpath + is_relative_to)
+ # produces exactly this table; the WASI guest must match it.
+ table = [
+ ("sub/ok.txt", "READ:SUB-OK\n"), # inside -> read
+ ("sub/../secret.txt", "DENIED\n"), # escapes -> denied
+ ("sub/../sub2/x.txt", "DENIED\n"), # escapes -> denied
+ ("secret.txt", "DENIED\n"), # outside -> denied
+ ("sub/../sub/ok.txt", "READ:SUB-OK\n"), # normalises inside
+ ("sub/./ok.txt", "READ:SUB-OK\n"), # '.' inside
+ ]
+ for arg, expect in table:
+ results = {}
+ for be in ("wasi", "py", "host"):
+ d = self._fresh_dir(f"{be}-dd")
+ os.makedirs(os.path.join(d, "sub"))
+ os.makedirs(os.path.join(d, "sub2"))
+ with open(os.path.join(d, "sub", "ok.txt"), "w") as f:
+ f.write("SUB-OK")
+ with open(os.path.join(d, "secret.txt"), "w") as f:
+ f.write("TOP-SECRET")
+ with open(os.path.join(d, "sub2", "x.txt"), "w") as f:
+ f.write("SIBLING")
+ if be == "wasi":
+ results[be] = _run_wasi_dynamic_fs(src, d, args=(arg,))
+ elif be == "py":
+ results[be] = _run_python_in_cwd(src, d, args=(arg,))
+ else:
+ results[be] = _run_capa_host_in_cwd(src, d, args=(arg,))
+ self.assertEqual(results["wasi"], expect, arg)
+ self.assertEqual(results["wasi"], results["py"], arg)
+ self.assertEqual(results["wasi"], results["host"], arg)
+
+ def test_dynamic_dotdot_confined_to_preopen_when_unrestricted(self):
+ # LEVEL-1 confinement is NOT regressed: an UNRESTRICTED Fs (handle
+ # 0, no restrict_to) with a dynamic '..' path that tries to escape
+ # the operator preopen is denied by WASMTIME (the preopen ceiling),
+ # not by the guest gate. A decoy file sits OUTSIDE the preopen.
+ src = (
+ "fun main(fs: Fs, env: Env, stdio: Stdio)\n"
+ " let args = env.args()\n"
+ " match args.get(0)\n"
+ " Some(p) ->\n"
+ " match fs.read(p)\n"
+ " Ok(c) -> stdio.println(\"READ:${c}\")\n"
+ " Err(e) -> stdio.println(\"DENIED\")\n"
+ " None -> stdio.println(\"NOARG\")\n"
+ )
+ outer = self._fresh_dir("confine")
+ preopen = os.path.join(outer, "preopen")
+ os.makedirs(preopen)
+ with open(os.path.join(outer, "decoy.txt"), "w") as f:
+ f.write("OUTSIDE-DECOY")
+ with open(os.path.join(preopen, "in.txt"), "w") as f:
+ f.write("INSIDE-OK")
+ # An escape attempt -> wasmtime denies (Err), not the decoy leaked.
+ esc = _run_wasi_dynamic_fs(src, preopen, args=("../decoy.txt",))
+ self.assertEqual(esc, "DENIED\n")
+ # The in-preopen read still works.
+ ok = _run_wasi_dynamic_fs(src, preopen, args=("in.txt",))
+ self.assertEqual(ok, "READ:INSIDE-OK\n")
+
+ def test_operator_preopen_registered_at_index_zero(self):
+ # The host installs exactly the operator preopen (index 0) for a
+ # dynamic-path program (no derived ceiling).
+ from capa.runtime._wasm_component_host import WasmComponentHost
+ d = self._fresh_dir("idx")
+ host = WasmComponentHost(
+ wasi=True, fs_operator_preopen=(d, True),
+ )
+ self.assertEqual(host._wasi_fs_applied, [(d, "operator-rw")])
+
+
@unittest.skipUnless(
_has_wasm_tools() and _has_wasmtime_wasip2(),
"wasm-tools and/or wasmtime-py with WASI P2 not installed",