From feec49e512dd535049086b44944c5cc71b4faf94 Mon Sep 17 00:00:00 2001 From: nelsoduarte Date: Tue, 30 Jun 2026 17:08:28 +0100 Subject: [PATCH 1/2] feat(wasi): --preopen flag unblocks dynamic Fs paths (operator grant, layer b1) Add a `--preopen <dir>[:ro|:rw]` flag to the experimental `--wasi` mode that lets the operator declare filesystem authority over a single directory, unblocking DYNAMIC (non-literal) Fs paths the compiler cannot derive a preopen ceiling for. The dynamic path resolves at RUNTIME relative to the operator preopen (the WASI `--dir` model). Framed honestly as a Level-2 operator-declared grant, distinct from the compiler-derived surface, and recorded as such in the SBOM. Emitter: - A new dynamic-path branch in the Fs call-site emitters (read / write / exists / is_dir / mkdir / list_dir): pushes the runtime path (ptr,len) as both the full path (for the fine-attenuation gate) and the relative path, plus the operator preopen index. Reuses the existing guest wrappers verbatim; the only new WAT is a runtime recursive mkdir sequencer ($Fs_mkdir_recursive) that walks a dynamic path's segments and calls the single-segment $Fs_mkdir per cumulative prefix, preserving os.makedirs(exist_ok=True) parity that the literal path gets via compile-time prefix unrolling. - `_validate_wasi_caps` suppresses the dynamic-path rejection only when an operator preopen is declared; without it the rejection stands unchanged. Index rule (emitter <-> host agreement): the host registers the operator preopen AFTER every derived ceiling preopen, so its index is the number of derived preopens. A dynamic path forces a not-closed ceiling (no derived preopens), so the operator preopen is index 0, the constant the dynamic call site addresses. Host: register the operator preopen (host_dir, perms) after the derived ceiling preopens, exposed via `_wasi_fs_applied`. CLI: `--preopen` (gated to --wasi or an SBOM command; repeatable but b1 rejects more than one for dynamic-path resolution). 
Threads the grant to the emitter (suppress rejection) and the host (register), and into the manifest / SBOM. SBOM: a new top-level `operator_declared_grants` block (manifest, CycloneDX, SPDX), labelled operator-declared (Level 2), distinct from the derived surface so a regulator never reads it as program-proven. Parity: read / write / exists / is_dir / mkdir (single + multi-segment) / list_dir over a dynamic path are byte-for-byte identical across the Python, capa:host and WASI backends; the guest-side fine attenuation (restrict_to / allows) still gates the dynamic path lexically. Without --preopen the dynamic path still rejects at compile time; literal paths still resolve via the derived ceiling. Full suite: 3459 tests green. --- CHANGELOG.md | 28 ++ capa/cli.py | 120 ++++++ capa/ir/__init__.py | 6 + capa/ir/_emit_wasm/__init__.py | 45 +++ capa/ir/_emit_wasm/_caps.py | 177 ++++++++- capa/ir/_emit_wasm/_wasi.py | 111 +++++- capa/manifest/__init__.py | 6 +- capa/manifest/_cyclonedx.py | 24 ++ capa/manifest/_funrec.py | 49 +++ capa/manifest/_spdx.py | 17 + capa/runtime/_wasm_component_host.py | 62 ++- tests/test_manifest.py | 4 + tests/test_wasi_mode.py | 543 +++++++++++++++++++++++++++ 13 files changed, 1186 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e7ab498..67c1994 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,34 @@ breaking changes and the discipline is still being shaped. ## [Unreleased] +**Added.** + +- *A `--preopen <dir>[:ro|:rw]` flag for the experimental `--wasi` mode + unblocks DYNAMIC (non-literal) `Fs` paths.* Until now a `Fs` path that + the compiler cannot prove is a string literal (one taken from a + parameter, `env.args()`, or any computed value) was REJECTED at compile + time under `--wasi`, because no static preopen ceiling could be derived + for it. 
`--preopen` lets the OPERATOR explicitly declare filesystem + authority over a single directory; the compiler then admits the dynamic + path and the guest resolves it AT RUNTIME relative to that directory + (the WASI `--dir` model, as in wasmtime). This is framed honestly as a + LEVEL-2 operator-DECLARED grant (analogous to `inherit_env`), NOT + program-proven authority: the compiler could not derive it, which is + precisely why the operator had to declare it. The grant is recorded in + the SBOM (manifest, CycloneDX, SPDX) under a dedicated + `operator_declared_grants` block, clearly labelled `operator-declared` + and kept DISTINCT from the compiler-derived capability surface so a + regulator never reads it as program-proven. Read / write / exists / + is_dir / mkdir / list_dir all work with a dynamic path under + `--preopen`, with byte-for-byte parity across the Python, `capa:host` + and WASI backends, and the guest-side fine attenuation (`restrict_to` / + `allows`) still gates the dynamic path lexically. WITHOUT `--preopen`, + a dynamic `Fs` path continues to be rejected at compile time exactly as + before (no regression); literal paths continue to resolve via the + compiler-derived ceiling. This increment supports a SINGLE `--preopen` + for dynamic-path resolution; passing more than one is rejected with a + clear message. + **Changed.** - *In the experimental `--wasi` mode, a dynamic (non-literal) URL passed diff --git a/capa/cli.py b/capa/cli.py index e89ef2f..41bc7dd 100644 --- a/capa/cli.py +++ b/capa/cli.py @@ -24,6 +24,7 @@ from capa import __version__ as _CAPA_VERSION from capa.manifest import ( build_manifest, build_cyclonedx, build_spdx, + build_operator_declared_grants, build_vex_document, build_provenance, resolve_build_timestamp, SourceDateEpochError, ) @@ -1062,6 +1063,23 @@ def _main_dispatch() -> int: "The default capa:host path is unaffected." 
), ) + parser.add_argument( + "--preopen", + action="append", + default=None, + metavar="[:ro|:rw]", + help=( + "with --wasi, grant the component filesystem authority over " + " as an OPERATOR-DECLARED preopen (Level 2, the WASI " + "--dir model), unblocking DYNAMIC (non-literal) Fs paths that " + "the compiler cannot derive a preopen for. The path is " + "resolved at runtime relative to . Append ':ro' for " + "read-only or ':rw' for read-write (default: rw). Recorded in " + "the SBOM as a declared grant, distinct from the " + "compiler-derived capability surface. This increment (b1) " + "supports a SINGLE --preopen for dynamic paths." + ), + ) parser.add_argument( "--wasm-memory-cap", type=int, @@ -1276,11 +1294,18 @@ def _main_dispatch() -> int: else: print(msg, file=sys.stderr) return 1 + # WASI Fs layer b1: the operator-declared grant block (--preopen), + # surfaced in the manifest / CycloneDX / SPDX as Level-2 + # operator-declared authority, distinct from the derived surface. + _operator_grants = _operator_grants_from_args( + getattr(args, "preopen", None) + ) if args.manifest: import json manifest = build_manifest( module, filename=filename, expr_labels=result.expr_labels, + operator_declared_grants=_operator_grants, ) emit_artifact(json.dumps(manifest, indent=2)) return 0 @@ -1308,6 +1333,7 @@ def _main_dispatch() -> int: sources=linked.sources if linked is not None else None, timestamp=build_ts, expr_labels=result.expr_labels, + operator_declared_grants=_operator_grants, ) emit_artifact(json.dumps(sbom, indent=2)) return 0 @@ -1318,6 +1344,7 @@ def _main_dispatch() -> int: sources=linked.sources if linked is not None else None, timestamp=build_ts, expr_labels=result.expr_labels, + operator_declared_grants=_operator_grants, ) emit_artifact(json.dumps(sbom, indent=2)) return 0 @@ -1415,6 +1442,31 @@ def _main_dispatch() -> int: print(msg, file=sys.stderr) return 1 + # ``--preopen`` (layer b1) is meaningful in --wasi mode (the + # operator-declared filesystem 
grant that unblocks dynamic Fs paths) + # AND when emitting an SBOM / manifest (it records the same grant as + # operator-declared authority, distinct from the derived surface). + # Reject it on any OTHER invocation with an actionable message rather + # than silently ignore it. + _emitting_sbom = bool( + getattr(args, "manifest", False) or getattr(args, "cyclonedx", False) + or getattr(args, "spdx", False) + ) + if (getattr(args, "preopen", None) + and not bool(getattr(args, "wasi", False)) + and not _emitting_sbom): + msg = ( + "capa: --preopen requires --wasi (or an SBOM / --manifest " + "command): it is the operator-declared filesystem grant for " + "the WASI mode, recorded in the SBOM; it has no effect on the " + "default execution backend" + ) + if use_color: + print(f"{C.RED}{msg}{C.RESET}", file=sys.stderr) + else: + print(msg, file=sys.stderr) + return 1 + if ( args.run and not args.wasm and prefer_wasm and _wasm_tooling_available() @@ -1463,6 +1515,29 @@ def _main_dispatch() -> int: else: print(msg, file=sys.stderr) return 1 + # WASI Fs layer b1: parse the operator ``--preopen``. b1 supports a + # SINGLE preopen for dynamic-path resolution; reject more than one + # with a clear message rather than silently picking one. The + # presence of a preopen is the signal (``wasi_dynamic_fs``) that + # suppresses the compiler's dynamic-Fs-path rejection, and the + # parsed ``(host_dir, read_write)`` is the host grant. 
+ fs_operator_preopen = None + wasi_dynamic_fs = False + preopen_specs = getattr(args, "preopen", None) or [] + if preopen_specs: + if len(preopen_specs) > 1: + msg = ( + "capa: --preopen: this increment (b1) supports a " + "single --preopen for dynamic Fs paths; got " + f"{len(preopen_specs)}" + ) + if use_color: + print(f"{C.RED}{msg}{C.RESET}", file=sys.stderr) + else: + print(msg, file=sys.stderr) + return 1 + fs_operator_preopen = _parse_preopen_spec(preopen_specs[0]) + wasi_dynamic_fs = True if result is None: result = analyze(module, source=source, filename=filename) try: @@ -1472,6 +1547,7 @@ def _main_dispatch() -> int: memory_cap_pages=wasm_memory_cap, filename=filename, wasi=wasi_mode, + wasi_dynamic_fs=wasi_dynamic_fs, ) print(wat) return 0 @@ -1480,6 +1556,7 @@ def _main_dispatch() -> int: memory_cap_pages=wasm_memory_cap, filename=filename, wasi=wasi_mode, + wasi_dynamic_fs=wasi_dynamic_fs, ) except Exception as e: msg = f"capa: --wasm: {e}" @@ -1589,6 +1666,7 @@ def _main_dispatch() -> int: wasi=wasi_mode, env_ceiling=env_ceiling, fs_ceiling=fs_ceiling, + fs_operator_preopen=fs_operator_preopen, net_ceiling=net_ceiling, ) host.run_main(component_blob) @@ -1763,6 +1841,48 @@ def _main_dispatch() -> int: return 0 +def _parse_preopen_spec(spec: str) -> tuple[str, bool]: + """Parse one ``--preopen`` value ``[:ro|:rw]`` into + ``(host_dir, read_write)``. + + The default permission is READ_WRITE (``rw``), the WASI ``--dir`` + default; an explicit ``:ro`` suffix makes it READ_ONLY and ``:rw`` is + READ_WRITE. 
Only a trailing ``:ro`` / ``:rw`` is treated as a + permission suffix, so a directory name that itself contains a colon + (or a Windows drive ``C:\\...``) is preserved -- the split is on the + LAST ``:`` and only when the tail is exactly ``ro`` / ``rw``.""" + read_write = True + host_dir = spec + if ":" in spec: + head, _, tail = spec.rpartition(":") + if tail in ("ro", "rw") and head: + host_dir = head + read_write = tail == "rw" + return (host_dir, read_write) + + +def _operator_grants_from_args(preopen_specs) -> dict | None: + """Build the SBOM ``operator_declared_grants`` block from the + ``--preopen`` specs, or None when none were declared. + + Each spec ``[:ro|:rw]`` becomes a preopen entry; the block is + honestly labelled operator-declared (Level 2) by + :func:`capa.manifest.build_operator_declared_grants`, distinct from + the compiler-derived surface.""" + specs = preopen_specs or [] + if not specs: + return None + preopens = [] + for spec in specs: + host_dir, read_write = _parse_preopen_spec(spec) + preopens.append({ + "kind": "fs", + "host_dir": host_dir, + "permission": "rw" if read_write else "ro", + }) + return build_operator_declared_grants(preopens) + + def _wrap_as_component( core_wasm: bytes, wit_text: str, *, wasi: bool = False, ) -> bytes: diff --git a/capa/ir/__init__.py b/capa/ir/__init__.py index adee5ea..15f6654 100644 --- a/capa/ir/__init__.py +++ b/capa/ir/__init__.py @@ -152,6 +152,7 @@ def emit_wat( memory_cap_pages: int | None = ..., # type: ignore[assignment] manifest_json: str | None = None, wasi: bool = False, + wasi_dynamic_fs: bool = False, ) -> str: """Emit WebAssembly text format (WAT) from a CIR module. 
@@ -178,6 +179,7 @@ def emit_wat( memory_cap_pages=memory_cap_pages, manifest_json=manifest_json, wasi=wasi, + wasi_dynamic_fs=wasi_dynamic_fs, ).emit(ir_module) @@ -189,6 +191,7 @@ def compile_wat( filename: str = "", embed_manifest: bool = True, wasi: bool = False, + wasi_dynamic_fs: bool = False, ) -> str: """End-to-end AST -> CIR -> WAT convenience helper. Mirrors :func:`compile` but targets the Wasm Component Model text form @@ -244,6 +247,7 @@ def compile_wat( memory_cap_pages=memory_cap_pages, manifest_json=manifest_json, wasi=wasi, + wasi_dynamic_fs=wasi_dynamic_fs, ) @@ -329,6 +333,7 @@ def compile_wasm( filename: str = "", embed_manifest: bool = True, wasi: bool = False, + wasi_dynamic_fs: bool = False, ) -> bytes: """End-to-end AST -> CIR -> WAT -> binary Wasm assembly. @@ -350,6 +355,7 @@ def compile_wasm( filename=filename, embed_manifest=embed_manifest, wasi=wasi, + wasi_dynamic_fs=wasi_dynamic_fs, ) proc = subprocess.run( [wasm_tools_path, "parse", "-"], diff --git a/capa/ir/_emit_wasm/__init__.py b/capa/ir/_emit_wasm/__init__.py index c61664b..e460549 100644 --- a/capa/ir/_emit_wasm/__init__.py +++ b/capa/ir/_emit_wasm/__init__.py @@ -201,6 +201,7 @@ def __init__( memory_cap_pages: Optional[int] = MEMORY_CAP_DEFAULT_PAGES, manifest_json: Optional[str] = None, wasi: bool = False, + wasi_dynamic_fs: bool = False, ): # Experimental opt-in (2026-06-27): when True, Random.system_seed # and Clock.now_secs / now_monotonic import canonical WASI @@ -211,6 +212,29 @@ def __init__( # untouched all-``capa:host`` behaviour. See # ``docs/design/wasi_mode.md``. self._wasi: bool = wasi + # WASI Fs layer b1 (operator preopen, 2026-06-30): True when the + # operator declared ``--preopen `` for this run, granting the + # component filesystem authority over that directory and so + # UNBLOCKING dynamic (non-literal) Fs paths under ``--wasi``. 
A + # dynamic path is resolved at RUNTIME relative to the single + # operator preopen (the WASI ``--dir`` model, wasmtime's + # convention), framed honestly as a LEVEL-2 operator-DECLARED + # grant (see ``docs/design/wasi-attenuation.md``), distinct from + # the COMPILER-DERIVED preopen ceiling. When False (the default), + # a dynamic Fs path is REJECTED at compile time exactly as before + # -- this flag is the ONLY thing that suppresses that rejection. + # + # b1 INDEX RULE (emitter <-> host agreement): the operator preopen + # is the LAST preopen the host registers, AFTER every + # compiler-derived ceiling preopen, so it never shifts an existing + # literal call site's index. In the dynamic case the derived + # ceiling is NOT closed and so contributes NO preopens, leaving + # the operator preopen at index 0; the dynamic call-site emitter + # therefore addresses it with the constant + # ``_wasi_operator_preopen_index`` (0 whenever the ceiling is open, + # i.e. exactly the dynamic case). The host computes the same index + # (len(derived preopens)) so the two never disagree. + self._wasi_dynamic_fs: bool = wasi_dynamic_fs self._lines: List[str] = [] self._indent = 0 self._unit = indent_unit @@ -311,6 +335,27 @@ def __init__( # chain's result areas), 0 when Net.get is not used. self._wasi_net_scratch_offset = 0 + # ----- WASI operator-preopen (layer b1) ---------------------- + + def _wasi_operator_preopen_index(self) -> int: + """The preopen INDEX the operator ``--preopen`` directory occupies + on the host, for the dynamic-Fs-path call-site emitter to address. + + b1 index rule: the host registers the operator preopen AFTER every + compiler-derived ceiling preopen, so its index is the number of + derived preopens. A dynamic Fs path (the only thing that reaches + the operator preopen) requires a NOT-CLOSED ceiling, which + contributes NO derived preopens, so this is 0 in the dynamic case. 
+ For a fully-literal program (closed ceiling) the operator preopen + sits at ``len(ceiling.preopens)`` and is unused by the guest (no + dynamic call site), but still registered + recorded for honesty; + the constant returned here matches the host's registration order + either way.""" + ceiling = self._fs_ceiling + if ceiling is None or not getattr(ceiling, "closed", False): + return 0 + return len(ceiling.preopens) + # ----- public ------------------------------------------------ def emit(self, module: Module) -> str: diff --git a/capa/ir/_emit_wasm/_caps.py b/capa/ir/_emit_wasm/_caps.py index 23d5e9c..0f61628 100644 --- a/capa/ir/_emit_wasm/_caps.py +++ b/capa/ir/_emit_wasm/_caps.py @@ -647,9 +647,15 @@ def _emit_wasi_fs_metadata_call( ) arg = instr.args[0] if arg.kind != "lit_str" or not isinstance(arg.literal, str): - # Defensive: the ceiling fail-closed check should have - # rejected this already, but never emit a wrapper call with - # an unresolved path. + # WASI Fs layer b1 (operator preopen): a DYNAMIC path is + # admitted when the operator declared ``--preopen`` -- it is + # resolved at RUNTIME relative to the single operator preopen. + if self._wasi_dynamic_fs: + self._emit_wasi_fs_dynamic_metadata_call(instr, method) + return + # Defensive: without an operator preopen the ceiling + # fail-closed check should have rejected this already, but + # never emit a wrapper call with an unresolved path. raise WasmEmissionError( f"Fs.{method} in WASI mode requires a string-literal " f"path (the preopen ceiling must be closed)" @@ -753,6 +759,11 @@ def _emit_wasi_fs_read_call(self, instr: MethodCall) -> None: ) arg = instr.args[0] if arg.kind != "lit_str" or not isinstance(arg.literal, str): + # WASI Fs layer b1 (operator preopen): a DYNAMIC path resolves + # at runtime relative to the single operator ``--preopen`` dir. 
+ if self._wasi_dynamic_fs: + self._emit_wasi_fs_dynamic_read_call(instr) + return raise WasmEmissionError( "Fs.read in WASI mode requires a string-literal path " "(the preopen ceiling must be closed)" @@ -813,6 +824,11 @@ def _emit_wasi_fs_write_call(self, instr: MethodCall) -> None: ) arg = instr.args[0] if arg.kind != "lit_str" or not isinstance(arg.literal, str): + # WASI Fs layer b1 (operator preopen): a DYNAMIC path resolves + # at runtime relative to the single operator ``--preopen`` dir. + if self._wasi_dynamic_fs: + self._emit_wasi_fs_dynamic_write_call(instr) + return raise WasmEmissionError( "Fs.write in WASI mode requires a string-literal path " "(the preopen ceiling must be closed)" @@ -871,6 +887,11 @@ def _emit_wasi_fs_list_dir_call(self, instr: MethodCall) -> None: ) arg = instr.args[0] if arg.kind != "lit_str" or not isinstance(arg.literal, str): + # WASI Fs layer b1 (operator preopen): a DYNAMIC path resolves + # at runtime relative to the single operator ``--preopen`` dir. + if self._wasi_dynamic_fs: + self._emit_wasi_fs_dynamic_list_dir_call(instr) + return raise WasmEmissionError( "Fs.list_dir in WASI mode requires a string-literal path " "(the preopen ceiling must be closed)" @@ -900,6 +921,156 @@ def _emit_wasi_fs_list_dir_call(self, instr: MethodCall) -> None: "result_list_string_io_error", instr.dst, ) + # ---- WASI Fs layer b1: DYNAMIC path call sites -------------- + # + # A dynamic (non-literal) Fs path is admitted ONLY when the operator + # declared ``--preopen `` (``self._wasi_dynamic_fs``). 
The path + # is NOT resolvable at compile time, so the call site addresses the + # single operator preopen (``_wasi_operator_preopen_index()``) and + # hands the path's runtime ``(ptr, len)`` to the wrapper as BOTH the + # FULL path (for the guest-side fail-closed attenuation gate + # ``$Fs_path_allowed``, against which a restricted Fs's prefixes are + # compared) AND the RELATIVE path (wasmtime resolves it relative to + # the operator preopen descriptor). The wrappers are UNCHANGED -- only + # the operands differ (runtime ``(ptr, len)`` + ``idx`` const, the + # Fs.write content-arg push pattern), so there is ZERO new WAT here. + + def _push_wasi_dynamic_fs_path(self, arg) -> None: + """Push a DYNAMIC Fs path argument's ``(ptr, len)`` for a layer-b1 + call site. The path must be a String LOCAL or PARAM (the lowerer + flattens an Fs path argument to a local / param before the + MethodCall, so a side-effecting re-evaluation never occurs); a + bare ``(local.get _ptr; local.get _len)`` pair is emitted, safe to + repeat for the full + relative operands. Anything else is a shape + the b1 increment does not handle and is rejected loudly.""" + is_string_local = ( + arg.kind == "local" and self._is_string_local(arg.name) + ) + is_string_param = ( + arg.kind == "param" and self._param_is_string(arg.name) + ) + if not (is_string_local or is_string_param): + raise WasmEmissionError( + "Fs dynamic path under --preopen must be a String local " + f"or param (b1), got {arg.kind!r}" + ) + self._push_string_arg(arg) + + def _emit_wasi_fs_dynamic_metadata_call( + self, instr: MethodCall, method: str, + ) -> None: + """Dynamic-path ``fs.exists / is_dir / mkdir`` under ``--preopen``. + + Mirrors ``_emit_wasi_fs_metadata_call`` but with the operator + preopen index + the runtime path (ptr, len). 
``mkdir`` keeps full + ``os.makedirs(exist_ok=True)`` parity: the dynamic relative path + may be multi-segment and its segments are not known at compile + time, so it routes to the runtime recursive sequencer + ``$Fs_mkdir_recursive`` (which walks the path's ``/`` boundaries + and calls the single-segment ``$Fs_mkdir`` per cumulative prefix, + short-circuiting on a genuine Err) -- byte-parity with the literal + path's compile-time prefix unrolling and with the oracle.""" + if len(instr.args) != 1: + raise WasmEmissionError( + f"Fs.{method} expected 1 arg, got {len(instr.args)}" + ) + arg = instr.args[0] + idx = self._wasi_operator_preopen_index() + if method == "mkdir": + self._write("i32.const 20") + self._write("call $alloc") + self._write("local.set $_ret_area") + self._push_fs_handle(instr.receiver) + self._push_wasi_dynamic_fs_path(arg) # full (ptr, len) + self._write(f"i32.const {idx}") + self._push_wasi_dynamic_fs_path(arg) # rel (ptr, len) == full + self._write("local.get $_ret_area") + self._write("call $Fs_mkdir_recursive") + self._emit_cap_indirect_materialise( + "result_unit_io_error", instr.dst, + ) + return + self._push_fs_handle(instr.receiver) + self._push_wasi_dynamic_fs_path(arg) # full (ptr, len) + self._write(f"i32.const {idx}") + self._push_wasi_dynamic_fs_path(arg) # rel (ptr, len) == full + self._write(f"call $Fs_{method}") + if instr.dst is not None: + self._write(f"local.set ${instr.dst}") + + def _emit_wasi_fs_dynamic_read_call(self, instr: MethodCall) -> None: + """Dynamic-path ``fs.read`` under ``--preopen``. 
Mirrors + ``_emit_wasi_fs_read_call`` with the operator preopen index + the + runtime path (ptr, len).""" + if len(instr.args) != 1: + raise WasmEmissionError( + f"Fs.read expected 1 arg, got {len(instr.args)}" + ) + arg = instr.args[0] + idx = self._wasi_operator_preopen_index() + self._write("i32.const 20") + self._write("call $alloc") + self._write("local.set $_ret_area") + self._push_fs_handle(instr.receiver) + self._push_wasi_dynamic_fs_path(arg) + self._write(f"i32.const {idx}") + self._push_wasi_dynamic_fs_path(arg) + self._write("local.get $_ret_area") + self._write("call $Fs_read") + self._emit_cap_indirect_materialise( + "result_string_io_error", instr.dst, + ) + + def _emit_wasi_fs_dynamic_write_call(self, instr: MethodCall) -> None: + """Dynamic-path ``fs.write(path, content)`` under ``--preopen``. + The PATH (arg[0]) is the dynamic runtime (ptr, len); the CONTENT + (arg[1]) is any String pushed the usual way (its bytes already + live in linear memory). Mirrors ``_emit_wasi_fs_write_call``.""" + if len(instr.args) != 2: + raise WasmEmissionError( + f"Fs.write expected 2 args (path, content), got " + f"{len(instr.args)}" + ) + arg = instr.args[0] + idx = self._wasi_operator_preopen_index() + self._write("i32.const 20") + self._write("call $alloc") + self._write("local.set $_ret_area") + self._push_fs_handle(instr.receiver) + self._push_wasi_dynamic_fs_path(arg) + self._write(f"i32.const {idx}") + self._push_wasi_dynamic_fs_path(arg) + # content (ptr, len) - already in linear memory. + self._push_string_arg(instr.args[1]) + self._write("local.get $_ret_area") + self._write("call $Fs_write") + self._emit_cap_indirect_materialise( + "result_unit_io_error", instr.dst, + ) + + def _emit_wasi_fs_dynamic_list_dir_call(self, instr: MethodCall) -> None: + """Dynamic-path ``fs.list_dir`` under ``--preopen``. 
Mirrors + ``_emit_wasi_fs_list_dir_call`` with the operator preopen index + + the runtime path (ptr, len).""" + if len(instr.args) != 1: + raise WasmEmissionError( + f"Fs.list_dir expected 1 arg, got {len(instr.args)}" + ) + arg = instr.args[0] + idx = self._wasi_operator_preopen_index() + self._write("i32.const 20") + self._write("call $alloc") + self._write("local.set $_ret_area") + self._push_fs_handle(instr.receiver) + self._push_wasi_dynamic_fs_path(arg) + self._write(f"i32.const {idx}") + self._push_wasi_dynamic_fs_path(arg) + self._write("local.get $_ret_area") + self._write("call $Fs_list_dir") + self._emit_cap_indirect_materialise( + "result_list_string_io_error", instr.dst, + ) + # ---- slice 25.3 Net handle-passing helpers ----------------- def _push_net_handle(self, recv) -> None: diff --git a/capa/ir/_emit_wasm/_wasi.py b/capa/ir/_emit_wasm/_wasi.py index 020471b..1312068 100644 --- a/capa/ir/_emit_wasm/_wasi.py +++ b/capa/ir/_emit_wasm/_wasi.py @@ -401,10 +401,21 @@ def _validate_wasi_caps(self) -> None: # preopen to address and the wrapper cannot run. Reject at # compile time with a clear message rather than emit code that # always denies at runtime. + # + # WASI Fs layer b1 (operator preopen, 2026-06-30): when the + # operator declared ``--preopen `` (``self._wasi_dynamic_fs``), + # the dynamic path is RESOLVED AT RUNTIME relative to that single + # operator preopen (the WASI ``--dir`` model). The rejection is + # SUPPRESSED -- the operator has explicitly granted the authority + # the compiler could not derive, a LEVEL-2 operator-DECLARED grant + # (recorded in the SBOM, distinct from the derived surface). Without + # ``--preopen`` the rejection stands exactly as before (the prior + # behaviour is intentionally preserved). 
if any( cap == "Fs" and method in (_WASI_FS_METADATA | _WASI_FS_STREAM) for cap, method in self._used_caps - ) and self._fs_ceiling is not None and not self._fs_ceiling.closed: + ) and self._fs_ceiling is not None and not self._fs_ceiling.closed \ + and not self._wasi_dynamic_fs: raise WasmEmissionError( "Fs in WASI mode requires every filesystem path to be a " "string literal (the static preopen ceiling must be " @@ -1142,6 +1153,12 @@ def _emit_wasi_wrappers(self) -> None: self._emit_wasi_fs_is_dir_wrapper() if ("Fs", "mkdir") in used: self._emit_wasi_fs_mkdir_wrapper() + # Layer b1: a DYNAMIC mkdir path cannot be unrolled into + # cumulative prefixes at compile time, so emit the runtime + # recursive sequencer (over the existing single-segment + # ``$Fs_mkdir``) when an operator preopen admits dynamic paths. + if self._wasi_dynamic_fs: + self._emit_wasi_fs_mkdir_recursive_helper() if ("Fs", "read") in used: self._emit_wasi_fs_read_wrapper() if ("Fs", "write") in used: @@ -3100,6 +3117,98 @@ def _emit_wasi_fs_unit_err(self, msg_off: int, msg_len: int) -> None: self._write("i32.const 0") self._write("i32.store offset=16") + def _emit_wasi_fs_mkdir_recursive_helper(self) -> None: + """``$Fs_mkdir_recursive (handle, full_ptr, full_len, idx, + rel_ptr, rel_len, ret_area)`` -> recursive ``mkdir`` over a + RUNTIME relative path (WASI Fs layer b1, dynamic ``--preopen``). + + A dynamic ``fs.mkdir(path)`` path is not known at compile time, so + the literal call site's cumulative-prefix unrolling cannot run. + This helper replicates ``os.makedirs(exist_ok=True)`` AT RUNTIME: + it scans the relative path for ``/`` separators and calls the + existing single-segment ``$Fs_mkdir`` once per cumulative prefix + (``a`` then ``a/b`` then ``a/b/c``), in order, each idempotent + (``$Fs_mkdir`` maps ``exist`` to Ok). 
It SHORT-CIRCUITS the + moment a prefix writes a genuine ``Err`` (ret_area tag@0 != 0), + leaving that Err in ``ret_area`` for the materialiser -- exactly + the literal path's behaviour, so a multi-segment dynamic mkdir is + byte-parity with the oracle. The FULL path is passed unchanged to + every ``$Fs_mkdir`` call so the fine-attenuation gate sees the + same full path each time (a denied target denies the first + prefix). ``$Fs_mkdir`` is REUSED verbatim; this helper only + sequences the prefixes a runtime path cannot pre-enumerate.""" + self._write( + "(func $Fs_mkdir_recursive (param $handle i32) " + "(param $full_ptr i32) (param $full_len i32) (param $idx i32) " + "(param $rel_ptr i32) (param $rel_len i32) (param $ret_area i32)" + ) + self._indent += 1 + self._write("(local $k i32)") + # Walk k = 1 .. rel_len; at each k that is either a '/' boundary + # (rel[k] == '/') or the end (k == rel_len), mkdir the prefix + # rel[0:k]. k starts at 1, so rel[0] is never tested: a leading + # '/' produces no zero-length prefix ("/a/b" yields "/a", "/a/b") + # and an empty rel (rel_len == 0) makes no $Fs_mkdir call at all. + self._write("i32.const 1") + self._write("local.set $k") + self._write("(block $done") + self._indent += 1 + self._write("(loop $seg") + self._indent += 1 + # if k > rel_len -> done. + self._write("local.get $k") + self._write("local.get $rel_len") + self._write("i32.gt_u") + self._write("br_if $done") + # boundary = (k == rel_len) OR (rel[k] == '/'). Guard the load + # behind the end check so k == rel_len never reads out of range. 
+ self._write("local.get $k") + self._write("local.get $rel_len") + self._write("i32.eq") + self._write("if (result i32)") + self._indent += 1 + self._write("i32.const 1") + self._indent -= 1 + self._write("else") + self._indent += 1 + self._write("local.get $rel_ptr") + self._write("local.get $k") + self._write("i32.add") + self._write("i32.load8_u offset=0") + self._write("i32.const 47") # '/' + self._write("i32.eq") + self._indent -= 1 + self._write("end") + self._write("if") + self._indent += 1 + # mkdir(prefix = rel[0:k]). + self._write("local.get $handle") + self._write("local.get $full_ptr") + self._write("local.get $full_len") + self._write("local.get $idx") + self._write("local.get $rel_ptr") # prefix ptr = rel_ptr + self._write("local.get $k") # prefix len = k + self._write("local.get $ret_area") + self._write("call $Fs_mkdir") + # Short-circuit on a genuine Err (tag@0 != 0). + self._write("local.get $ret_area") + self._write("i32.load8_u offset=0") + self._write("br_if $done") + self._indent -= 1 + self._write("end") + # k += 1; continue. 
+ self._write("local.get $k") + self._write("i32.const 1") + self._write("i32.add") + self._write("local.set $k") + self._write("br $seg") + self._indent -= 1 + self._write(")") + self._indent -= 1 + self._write(")") + self._indent -= 1 + self._write(")") + # ----- Fs.read via wasi:filesystem + wasi:io/streams --------- def _emit_wasi_fs_read_wrapper(self) -> None: diff --git a/capa/manifest/__init__.py b/capa/manifest/__init__.py index 7594af1..1b5e25e 100644 --- a/capa/manifest/__init__.py +++ b/capa/manifest/__init__.py @@ -38,7 +38,10 @@ from __future__ import annotations from ._cyclonedx import CYCLONEDX_SPEC_VERSION, build_cyclonedx -from ._funrec import SCHEMA_VERSION, build_manifest, display_filename +from ._funrec import ( + SCHEMA_VERSION, build_manifest, build_operator_declared_grants, + display_filename, +) from ._provenance import ( CAPA_BUILD_TYPE, CAPA_BUILDER_ID, SLSA_PREDICATE_TYPE, build_provenance, @@ -57,6 +60,7 @@ "CAPA_BUILDER_ID", "SLSA_PREDICATE_TYPE", "build_manifest", + "build_operator_declared_grants", "build_cyclonedx", "build_spdx", "build_vex_document", diff --git a/capa/manifest/_cyclonedx.py b/capa/manifest/_cyclonedx.py index e76329c..8409724 100644 --- a/capa/manifest/_cyclonedx.py +++ b/capa/manifest/_cyclonedx.py @@ -72,6 +72,7 @@ def build_cyclonedx( source: Optional[str] = None, sources: Optional[dict[str, str]] = None, expr_labels: Optional[dict[int, str]] = None, + operator_declared_grants: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: """Build a CycloneDX 1.5 SBOM with embedded Capa capability metadata. @@ -88,6 +89,7 @@ def build_cyclonedx( inner = build_manifest( module, filename=filename, capa_version=capa_version, expr_labels=expr_labels, + operator_declared_grants=operator_declared_grants, ) if timestamp is None: @@ -123,6 +125,28 @@ def build_cyclonedx( "value": str(inner["summary"]["functions_crossing_unsafe"])}, ] + # WASI Fs layer b1: surface operator-DECLARED grants (e.g. 
--preopen) + # as top-level metadata properties, clearly namespaced + labelled as + # operator-declared (Level 2) so an SBOM consumer never mistakes them + # for the program-proven, compiler-derived capability surface. The + # trust_level property states the framing in-band; one + # ``capa:operator_declared_grant:preopen`` property per granted dir. + _grants = inner.get("operator_declared_grants") or {} + _preopens = _grants.get("preopens") or [] + if _preopens: + metadata_properties.append({ + "name": "capa:operator_declared_grants:trust_level", + "value": str(_grants.get("trust_level", "operator-declared")), + }) + for _pre in _preopens: + metadata_properties.append({ + "name": "capa:operator_declared_grant:preopen", + "value": ( + f"{_pre.get('host_dir', '')}" + f" [{_pre.get('permission', 'rw')}]" + ), + }) + metadata = { "timestamp": timestamp, "tools": { diff --git a/capa/manifest/_funrec.py b/capa/manifest/_funrec.py index b4c1205..3c6f7c7 100644 --- a/capa/manifest/_funrec.py +++ b/capa/manifest/_funrec.py @@ -230,12 +230,47 @@ def _demangle_type_text(s: str) -> str: return _MANGLE_INLINE_RE.sub(r"\1", s) +def build_operator_declared_grants( + preopens: Optional[list[dict[str, Any]]] = None, +) -> dict[str, Any]: + """Build the ``operator_declared_grants`` manifest block (WASI Fs + layer b1, 2026-06-30). + + This block records authority the OPERATOR declared at build / run + time (e.g. ``--preopen ``), as DISTINCT from the + compiler-DERIVED capability surface that the rest of the manifest + proves. A regulator MUST read it as Level-2 operator-DECLARED + authority, NOT as program-proven: the compiler could not derive + these grants (that is precisely why the operator had to declare + them), so they are an explicit trust the operator placed in the + deployment, not a property the type system established. 
+ + ``preopens`` is a list of ``{"host_dir": str, "permission": + "ro"|"rw", "kind": "fs"}`` entries (or None / empty when no operator + grant was declared). The block is always present so a consumer can + rely on its shape; an empty ``preopens`` means "no operator grant + was declared".""" + return { + # The honest label a regulator-facing consumer keys on: this is + # NOT derived/proven authority. + "trust_level": "operator-declared", + "note": ( + "Authority declared by the operator at build/run time " + "(e.g. --preopen). DISTINCT from the compiler-derived, " + "program-proven capability surface; the compiler could not " + "derive these grants." + ), + "preopens": list(preopens or []), + } + + def build_manifest( module: A.Module, *, filename: str = "", capa_version: Optional[str] = None, expr_labels: Optional[dict[int, str]] = None, + operator_declared_grants: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: """Build a manifest dict from an analysed module. @@ -249,6 +284,12 @@ def build_manifest( @public`` bridges, dropping no-op declassifies of already-public values. When omitted, every syntactic declassify is counted (the historical, analysis-free behaviour). + + ``operator_declared_grants`` (WASI Fs layer b1, 2026-06-30): the + block produced by :func:`build_operator_declared_grants` recording + operator-DECLARED authority (e.g. ``--preopen``), clearly distinct + from the derived surface. When None, an EMPTY grants block is + recorded so the field shape is stable for consumers. """ if capa_version is None: from .. import __version__ as capa_version @@ -393,6 +434,14 @@ def build_manifest( "user_defined_capabilities": user_caps, "typestates": protocol_states, "functions": functions, + # WASI Fs layer b1: operator-declared authority (e.g. --preopen), + # honestly labelled Level-2 / operator-declared, distinct from the + # derived surface above. Always present (empty when none declared). 
+ "operator_declared_grants": ( + operator_declared_grants + if operator_declared_grants is not None + else build_operator_declared_grants() + ), "summary": summary, } diff --git a/capa/manifest/_spdx.py b/capa/manifest/_spdx.py index 18acebd..4141a6a 100644 --- a/capa/manifest/_spdx.py +++ b/capa/manifest/_spdx.py @@ -105,6 +105,7 @@ def build_spdx( source: Optional[str] = None, sources: Optional[dict[str, str]] = None, expr_labels: Optional[dict[int, str]] = None, + operator_declared_grants: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: """Build an SPDX 2.3 document with embedded Capa capability metadata. @@ -121,6 +122,7 @@ def build_spdx( inner = build_manifest( module, filename=filename, capa_version=capa_version, expr_labels=expr_labels, + operator_declared_grants=operator_declared_grants, ) if timestamp is None: @@ -157,6 +159,21 @@ def build_spdx( _annot(timestamp, "summary:functions_crossing_unsafe", str(inner["summary"]["functions_crossing_unsafe"])), ] + # WASI Fs layer b1: operator-DECLARED grants (e.g. --preopen) as + # program-package annotations, labelled operator-declared (Level 2) + # so an SPDX consumer does not read them as program-proven authority. 
+ _grants = inner.get("operator_declared_grants") or {} + _preopens = _grants.get("preopens") or [] + if _preopens: + program_annotations.append(_annot( + timestamp, "operator_declared_grants:trust_level", + str(_grants.get("trust_level", "operator-declared")), + )) + for _pre in _preopens: + program_annotations.append(_annot( + timestamp, "operator_declared_grant:preopen", + f"{_pre.get('host_dir', '')} [{_pre.get('permission', 'rw')}]", + )) program_pkg = { "SPDXID": program_id, "name": bom_basename, diff --git a/capa/runtime/_wasm_component_host.py b/capa/runtime/_wasm_component_host.py index 4f94812..8fe878f 100644 --- a/capa/runtime/_wasm_component_host.py +++ b/capa/runtime/_wasm_component_host.py @@ -86,6 +86,7 @@ def __init__( wasi: bool = False, env_ceiling: Optional["object"] = None, fs_ceiling: Optional["object"] = None, + fs_operator_preopen: Optional[tuple] = None, net_ceiling: Optional["object"] = None, stdin: Optional[bytes] = None, ): @@ -118,6 +119,21 @@ def __init__( # already rejected such a program in --wasi mode, so this is # belt-and-braces. Only consulted in ``--wasi`` mode. self._fs_ceiling = fs_ceiling + # WASI Fs layer b1 (operator preopen, 2026-06-30): an OPERATOR- + # DECLARED filesystem grant, ``(host_dir, read_write)`` or None. + # When the operator passes ``--preopen [:ro|:rw]`` the host + # registers that directory as a preopen AFTER every + # compiler-derived ceiling preopen, so its guest index is + # ``len(ceiling.preopens)``. In the dynamic-path case the derived + # ceiling is NOT closed (no derived preopens), so the operator + # preopen lands at index 0 -- the constant the dynamic-path + # call-site emitter addresses. This is the WASI ``--dir`` model + # (wasmtime's ``--dir``): authority DECLARED by the operator + # (Level 2), distinct from the COMPILER-DERIVED ceiling. 
It is the + # ONLY thing that lets a dynamic Fs path resolve at runtime; the + # compiler suppresses its dynamic-path rejection symmetrically + # (``--wasi-dynamic-fs``). Only consulted in ``--wasi`` mode. + self._fs_operator_preopen = fs_operator_preopen # Records the preopens actually installed on the WasiConfig in # WASI mode (a list of (host_path, "ro"|"rw") tuples), exposed # for tests / diagnostics so the ceiling guarantee is @@ -420,7 +436,11 @@ def _apply_fs_preopens(self, wasi_cfg) -> None: through a READ_ONLY preopen, independent of guest behaviour.""" ceiling = self._fs_ceiling if ceiling is None or not getattr(ceiling, "closed", False): - self._wasi_fs_applied = [] + # No derived preopens. In layer b1 an OPERATOR ``--preopen`` + # may still grant authority for dynamic paths: register it + # alone (at index 0, matching the dynamic call-site emitter's + # ``_wasi_operator_preopen_index() == 0`` for an open ceiling). + self._wasi_fs_applied = self._apply_operator_preopen(wasi_cfg, 0) return # ``get-directories`` returns descriptors ONLY for the preopens # actually registered, in registration order, so EVERY ceiling @@ -469,8 +489,48 @@ def _apply_fs_preopens(self, wasi_cfg) -> None: wasi_cfg.preopen_dir( host_path, guest_path, dir_perms, file_perms, ) + # Layer b1: append the operator ``--preopen`` AFTER the derived + # preopens so it never shifts a derived index (index == + # len(ceiling.preopens)). For an all-literal program this preopen + # is registered + recorded but unused by the guest (no dynamic + # call site); the grant stays honest in the SBOM regardless. + applied += self._apply_operator_preopen( + wasi_cfg, len(ceiling.preopens), + ) self._wasi_fs_applied = applied + def _apply_operator_preopen(self, wasi_cfg, index: int): + """Register the operator ``--preopen`` directory (layer b1) at the + given guest preopen ``index`` and return the list of applied + records (empty when no operator preopen was declared). 
+ + The operator preopen is an explicit Level-2 operator grant: the + directory is mounted READ_WRITE or READ_ONLY per its declared + permission. A non-existent host directory is skipped (no record), + so a dynamic path resolved against a missing preopen sees no + descriptor at ``index`` and fails fail-closed-as-absent, matching + the derived-ceiling convention.""" + grant = self._fs_operator_preopen + if not grant: + return [] + host_dir, read_write = grant[0], bool(grant[1]) + if not os.path.isdir(host_dir): + return [] + guest_path = f"/capa-preopen-{index}" + if read_write: + wasi_cfg.preopen_dir( + host_dir, guest_path, + wasmtime.DirPerms.READ_WRITE, + wasmtime.FilePerms.READ_WRITE, + ) + return [(host_dir, "operator-rw")] + wasi_cfg.preopen_dir( + host_dir, guest_path, + wasmtime.DirPerms.READ_ONLY, + wasmtime.FilePerms.READ_ONLY, + ) + return [(host_dir, "operator-ro")] + def _register_all(self) -> None: root = self._linker.root() self._register_stdio(root) diff --git a/tests/test_manifest.py b/tests/test_manifest.py index e722960..2008cf7 100644 --- a/tests/test_manifest.py +++ b/tests/test_manifest.py @@ -263,6 +263,10 @@ class TestTopLevelShape(unittest.TestCase): "user_defined_capabilities", "typestates", "functions", + # WASI Fs layer b1: operator-declared authority (e.g. --preopen), + # always present (empty when none declared), distinct from the + # compiler-derived surface. + "operator_declared_grants", "summary", } diff --git a/tests/test_wasi_mode.py b/tests/test_wasi_mode.py index 81b5618..deb8098 100644 --- a/tests/test_wasi_mode.py +++ b/tests/test_wasi_mode.py @@ -526,6 +526,66 @@ def test_wasi_without_component_rejected(self): self.assertIn("--wasi requires --component", err) +class TestWasiPreopenFlagGuards(unittest.TestCase): + """``--preopen`` (layer b1) guards: it requires --wasi (or an SBOM + command) and b1 supports a single preopen for dynamic paths. 
These + fail before any Wasm toolchain is needed.""" + + def _run_cli(self, argv, src): + import tempfile + from pathlib import Path + from capa.cli import main + err = io.StringIO() + old_err, old_out, old_argv = sys.stderr, sys.stdout, sys.argv + sys.stderr = err + sys.stdout = io.StringIO() + with tempfile.TemporaryDirectory() as d: + f = Path(d) / "p.capa" + f.write_text(src, encoding="utf-8") + sys.argv = ["capa", *argv, str(f)] + try: + code = main() + finally: + sys.stderr, sys.stdout, sys.argv = old_err, old_out, old_argv + return code, err.getvalue() + + _DYN = ( + "fun main(fs: Fs, env: Env, stdio: Stdio)\n" + " let args = env.args()\n" + " match args.get(0)\n" + " Some(p) ->\n" + " match fs.read(p)\n" + " Ok(c) -> stdio.println(c)\n" + " Err(e) -> stdio.println(\"err\")\n" + " None -> stdio.println(\"none\")\n" + ) + + def test_preopen_without_wasi_rejected(self): + code, err = self._run_cli( + ["--wasm", "--component", "--run", "--preopen", "/tmp/x"], + self._DYN, + ) + self.assertEqual(code, 1) + self.assertIn("--preopen requires --wasi", err) + + def test_multiple_preopen_rejected(self): + code, err = self._run_cli( + ["--wasm", "--component", "--wasi", "--run", + "--preopen", "/tmp/a", "--preopen", "/tmp/b"], + self._DYN, + ) + self.assertEqual(code, 1) + self.assertIn("single --preopen", err) + + def test_preopen_allowed_with_manifest(self): + # --preopen is accepted alongside an SBOM/--manifest command (it + # records the operator grant). No --wasi needed there. + code, err = self._run_cli( + ["--manifest", "--preopen", "/data:ro"], self._DYN, + ) + self.assertEqual(code, 0, err) + + class TestWasiEnvCeilingAnalysis(unittest.TestCase): """Static Env authority-ceiling analysis (Level 1 pre-requisite). 
@@ -1421,6 +1481,170 @@ def test_read_dynamic_path_fail_closed_rejected(self): self.assertIn("literal", str(cm.exception)) +class TestWasiFsDynamicPreopenCompile(unittest.TestCase): + """WASI Fs layer b1: the operator ``--preopen`` flag UNBLOCKS a + DYNAMIC Fs path at compile time (suppressing the dynamic-path + rejection) and records the grant in the SBOM. Pure-Python checks (no + wasm-tools / wasmtime), so this class is not gated.""" + + _DYN_SRC = ( + "fun main(fs: Fs, env: Env, stdio: Stdio)\n" + " let args = env.args()\n" + " match args.get(0)\n" + " Some(p) ->\n" + " match fs.read(p)\n" + " Ok(c) -> stdio.println(c)\n" + " Err(e) -> stdio.println(\"err\")\n" + " None -> stdio.println(\"none\")\n" + ) + + def _compile(self, src: str, *, dynamic_fs: bool): + from capa.ir import compile_wat + module, result = _parse_analyze(src) + return compile_wat( + module, types=result.types, wasi=True, + wasi_dynamic_fs=dynamic_fs, + ) + + def test_without_preopen_still_rejected(self): + # NO regression: without the operator preopen the dynamic path is + # still rejected at compile time exactly as before. + with self.assertRaises(Exception) as cm: + self._compile(self._DYN_SRC, dynamic_fs=False) + self.assertIn("WASI mode", str(cm.exception)) + self.assertIn("literal", str(cm.exception)) + + def test_with_preopen_compiles(self): + # With the operator preopen the dynamic path compiles: the Fs.read + # wrapper + the preopen resolver are emitted, no capa:host/fs. + wat = self._compile(self._DYN_SRC, dynamic_fs=True) + self.assertIn("(func $Fs_read", wat) + self.assertIn("(func $__wasi_fs_preopen_desc", wat) + self.assertNotIn('"capa:host/fs"', wat) + + def test_dynamic_metadata_and_streams_compile(self): + # exists / is_dir / mkdir / write / list_dir all admit a dynamic + # path under the operator preopen. 
+ src = ( + "fun main(fs: Fs, env: Env, stdio: Stdio)\n" + " let args = env.args()\n" + " match args.get(0)\n" + " Some(p) ->\n" + " stdio.println(\"${fs.exists(p)}\")\n" + " stdio.println(\"${fs.is_dir(p)}\")\n" + " let m = fs.mkdir(p)\n" + " let w = fs.write(p, \"x\")\n" + " let l = fs.list_dir(p)\n" + " None -> stdio.println(\"none\")\n" + ) + wat = self._compile(src, dynamic_fs=True) + self.assertIn("(func $Fs_exists", wat) + self.assertIn("(func $Fs_is_dir", wat) + self.assertIn("(func $Fs_mkdir", wat) + self.assertIn("(func $Fs_write", wat) + self.assertIn("(func $Fs_list_dir", wat) + + def test_operator_preopen_index_is_zero_when_ceiling_open(self): + # b1 index rule: with no derived preopens (dynamic ceiling) the + # operator preopen is index 0, the constant the dynamic call site + # addresses. + from capa.ir import compile_wat # noqa: F401 + from capa.ir._emit_wasm import WasmEmitter + from capa.ir._lower import Lowerer + module, result = _parse_analyze(self._DYN_SRC) + cir = Lowerer(types=result.types or {}).lower_module(module) + em = WasmEmitter(wasi=True, wasi_dynamic_fs=True) + em.emit(cir) + self.assertEqual(em._wasi_operator_preopen_index(), 0) + + def test_grant_recorded_in_manifest(self): + # The operator grant is surfaced in the manifest as a Level-2 + # operator-DECLARED block, distinct from the derived surface. 
+ from capa.manifest import ( + build_manifest, build_operator_declared_grants, + ) + module, result = _parse_analyze(self._DYN_SRC) + grants = build_operator_declared_grants([ + {"kind": "fs", "host_dir": "/data", "permission": "rw"}, + ]) + man = build_manifest( + module, operator_declared_grants=grants, + ) + block = man["operator_declared_grants"] + self.assertEqual(block["trust_level"], "operator-declared") + self.assertEqual(block["preopens"][0]["host_dir"], "/data") + self.assertEqual(block["preopens"][0]["permission"], "rw") + + def test_grant_recorded_in_cyclonedx_and_spdx(self): + from capa.manifest import ( + build_cyclonedx, build_spdx, build_operator_declared_grants, + ) + module, result = _parse_analyze(self._DYN_SRC) + grants = build_operator_declared_grants([ + {"kind": "fs", "host_dir": "/data", "permission": "ro"}, + ]) + cdx = build_cyclonedx( + module, timestamp="2026-06-30T00:00:00Z", + operator_declared_grants=grants, + ) + props = {p["name"]: p["value"] for p in cdx["metadata"]["properties"]} + self.assertEqual( + props["capa:operator_declared_grants:trust_level"], + "operator-declared", + ) + self.assertIn( + "capa:operator_declared_grant:preopen", props, + ) + self.assertIn("/data", props["capa:operator_declared_grant:preopen"]) + spdx = build_spdx( + module, timestamp="2026-06-30T00:00:00Z", + operator_declared_grants=grants, + ) + comments = [ + a["comment"] for a in spdx["packages"][0]["annotations"] + ] + self.assertTrue(any( + "operator_declared_grant:preopen" in c for c in comments + )) + + def test_empty_grant_block_present_by_default(self): + # The block is always present (empty preopens) so consumers can + # rely on the shape even when no operator grant is declared. 
+ from capa.manifest import build_manifest + module, result = _parse_analyze( + "fun main(stdio: Stdio)\n stdio.println(\"hi\")\n" + ) + man = build_manifest(module) + self.assertEqual(man["operator_declared_grants"]["preopens"], []) + + +class TestWasiPreopenSpecParse(unittest.TestCase): + """The CLI ``--preopen [:ro|:rw]`` spec parser (pure Python).""" + + def test_default_is_read_write(self): + from capa.cli import _parse_preopen_spec + self.assertEqual(_parse_preopen_spec("/data"), ("/data", True)) + + def test_ro_suffix(self): + from capa.cli import _parse_preopen_spec + self.assertEqual(_parse_preopen_spec("/data:ro"), ("/data", False)) + + def test_rw_suffix(self): + from capa.cli import _parse_preopen_spec + self.assertEqual(_parse_preopen_spec("/data:rw"), ("/data", True)) + + def test_colon_in_path_preserved(self): + from capa.cli import _parse_preopen_spec + # Only a trailing :ro / :rw is a permission suffix; a Windows + # drive colon (or any other colon) is preserved. + self.assertEqual( + _parse_preopen_spec("C:/data"), ("C:/data", True), + ) + self.assertEqual( + _parse_preopen_spec("C:/data:ro"), ("C:/data", False), + ) + + class TestWasiNetDynamicUrlRejections(unittest.TestCase): """A DYNAMIC Net url (not a string literal) reaching get / post is rejected at COMPILE time in --wasi (2026-06-29), SYMMETRIC with the Fs @@ -2483,6 +2707,325 @@ def _run_wasi_fs(src: str, data_dir: str) -> str: ) +def _build_wasi_dynamic_fs_component(src: str) -> bytes: + """Build a --wasi component with the operator-preopen flag set, so a + DYNAMIC Fs path is admitted (layer b1). 
The compiler suppresses its + dynamic-path rejection because an operator preopen is declared.""" + from capa.ir import compile_wasm, compile_wit + from capa.cli import _wrap_as_component + module, result = _parse_analyze(src) + core = compile_wasm( + module, types=result.types, wasi=True, wasi_dynamic_fs=True, + ) + wit = compile_wit(module, types=result.types, wasi=True) + return _wrap_as_component(core, wit, wasi=True) + + +def _run_wasi_dynamic_fs( + src: str, preopen_dir: str, *, read_write: bool = True, + args: tuple = (), +) -> str: + """Build + run a DYNAMIC-path Fs program in WASI mode under a single + operator ``--preopen`` directory; capture stdout. The dynamic path is + resolved at runtime relative to ``preopen_dir`` (the operator grant).""" + from capa.runtime._wasm_component_host import WasmComponentHost + comp = _build_wasi_dynamic_fs_component(src) + host = WasmComponentHost( + args=args, wasi=True, + fs_operator_preopen=(preopen_dir, read_write), + ) + return _wasi_run_capture(host, comp) + + +def _run_python_in_cwd(src: str, cwd: str, args: tuple = ()) -> str: + """Run a program on the Python oracle with ``cwd`` as the working + directory and ``args`` as ``sys.argv[1:]`` (so ``env.args()`` and a + relative Fs path resolve the same way the WASI operator preopen makes + them resolve: relative to the granted directory).""" + from capa import transpile + module, result = _parse_analyze(src) + code = transpile(module, types=result.types, bindings=result.bindings) + buf = io.StringIO() + saved_out, saved_argv, saved_cwd = sys.stdout, list(sys.argv), os.getcwd() + sys.stdout = buf + sys.argv = ["prog"] + list(args) + os.chdir(cwd) + try: + ns: dict = {"__name__": "__main__"} + exec(compile(code, "", "exec"), ns) + finally: + sys.stdout = saved_out + sys.argv = saved_argv + os.chdir(saved_cwd) + return buf.getvalue() + + +def _run_capa_host_in_cwd(src: str, cwd: str, args: tuple = ()) -> str: + """Run a program on the default capa:host component backend 
with + ``cwd`` as the working directory and ``args`` as the program argv, so + a relative dynamic Fs path resolves identically to the WASI operator + preopen and the Python oracle.""" + from capa.ir import compile_wasm, compile_wit + from capa.cli import _wrap_as_component + from capa.runtime._wasm_component_host import WasmComponentHost + module, result = _parse_analyze(src) + core = compile_wasm(module, types=result.types, wasi=False) + wit = compile_wit(module, types=result.types, wasi=False) + comp = _wrap_as_component(core, wit, wasi=False) + buf = io.StringIO() + saved_out, saved_cwd = sys.stdout, os.getcwd() + sys.stdout = buf + os.chdir(cwd) + try: + WasmComponentHost(args=args, wasi=False).run_main(comp) + finally: + sys.stdout = saved_out + os.chdir(saved_cwd) + return buf.getvalue() + + +@unittest.skipUnless( + _has_wasm_tools() and _has_wasmtime_wasip2(), + "wasm-tools and/or wasmtime-py with WASI P2 not installed", +) +class TestWasiFsDynamicPreopen(unittest.TestCase): + """End-to-end WASI Fs layer b1: a genuine DYNAMIC Fs path (sourced + from ``env.args()``) compiles + runs under a single operator + ``--preopen`` directory, resolving at runtime relative to it, with + three-way byte-parity (Python oracle == capa:host backend == WASI + backend) on both output and filesystem effect. 
The dynamic path makes + the static ceiling NOT closed, so the operator preopen is the sole + preopen (index 0).""" + + def setUp(self): + import tempfile + self._td = tempfile.mkdtemp(prefix="capa-wasi-dynfs-") + + def tearDown(self): + import shutil + shutil.rmtree(self._td, ignore_errors=True) + + def _fresh_dir(self, name): + import tempfile + d = tempfile.mkdtemp(prefix=f"capa-{name}-", dir=self._td) + return d + + def test_read_dynamic_three_backend_parity(self): + src = ( + "fun main(fs: Fs, env: Env, stdio: Stdio)\n" + " let args = env.args()\n" + " match args.get(0)\n" + " Some(p) ->\n" + " match fs.read(p)\n" + " Ok(c) -> stdio.println(c)\n" + " Err(e) -> stdio.println(\"ERR\")\n" + " None -> stdio.println(\"NOARG\")\n" + ) + # One controlled directory PER backend so each reads its own copy. + outs = [] + for be in ("wasi", "py", "host"): + d = self._fresh_dir(be) + with open(os.path.join(d, "hello.txt"), "w") as f: + f.write("DYNAMIC-READ-OK") + if be == "wasi": + outs.append(_run_wasi_dynamic_fs( + src, d, args=("hello.txt",), + )) + elif be == "py": + outs.append(_run_python_in_cwd(src, d, args=("hello.txt",))) + else: + outs.append(_run_capa_host_in_cwd( + src, d, args=("hello.txt",), + )) + self.assertEqual(outs[0], "DYNAMIC-READ-OK\n") + self.assertEqual(outs[0], outs[1]) + self.assertEqual(outs[0], outs[2]) + + def test_write_dynamic_three_backend_parity_and_effect(self): + src = ( + "fun main(fs: Fs, env: Env, stdio: Stdio)\n" + " let args = env.args()\n" + " match args.get(0)\n" + " Some(p) ->\n" + " match fs.write(p, \"CONTENT-XYZ\")\n" + " Ok(u) -> stdio.println(\"WROTE\")\n" + " Err(e) -> stdio.println(\"ERR\")\n" + " None -> stdio.println(\"NOARG\")\n" + ) + results = {} + effects = {} + for be in ("wasi", "py", "host"): + d = self._fresh_dir(be) + if be == "wasi": + results[be] = _run_wasi_dynamic_fs(src, d, args=("o.txt",)) + elif be == "py": + results[be] = _run_python_in_cwd(src, d, args=("o.txt",)) + else: + results[be] = 
_run_capa_host_in_cwd(src, d, args=("o.txt",)) + with open(os.path.join(d, "o.txt")) as f: + effects[be] = f.read() + self.assertEqual(results["wasi"], "WROTE\n") + self.assertEqual(results["wasi"], results["py"]) + self.assertEqual(results["wasi"], results["host"]) + self.assertEqual(effects["wasi"], "CONTENT-XYZ") + self.assertEqual(effects["wasi"], effects["py"]) + self.assertEqual(effects["wasi"], effects["host"]) + + def test_exists_is_dir_dynamic_parity(self): + src = ( + "fun main(fs: Fs, env: Env, stdio: Stdio)\n" + " let args = env.args()\n" + " match args.get(0)\n" + " Some(p) ->\n" + " stdio.println(\"e=${fs.exists(p)}\")\n" + " stdio.println(\"d=${fs.is_dir(p)}\")\n" + " None -> stdio.println(\"NOARG\")\n" + ) + for arg, mk in (("there.txt", "file"), ("adir", "dir"), + ("nope", None)): + results = {} + for be in ("wasi", "py", "host"): + d = self._fresh_dir(f"{be}-{arg}") + with open(os.path.join(d, "there.txt"), "w") as f: + f.write("x") + os.makedirs(os.path.join(d, "adir")) + if be == "wasi": + results[be] = _run_wasi_dynamic_fs(src, d, args=(arg,)) + elif be == "py": + results[be] = _run_python_in_cwd(src, d, args=(arg,)) + else: + results[be] = _run_capa_host_in_cwd(src, d, args=(arg,)) + self.assertEqual(results["wasi"], results["py"], arg) + self.assertEqual(results["wasi"], results["host"], arg) + + def test_mkdir_dynamic_parity_and_effect(self): + src = ( + "fun main(fs: Fs, env: Env, stdio: Stdio)\n" + " let args = env.args()\n" + " match args.get(0)\n" + " Some(p) ->\n" + " match fs.mkdir(p)\n" + " Ok(u) -> stdio.println(\"MK=ok\")\n" + " Err(e) -> stdio.println(\"MK=err\")\n" + " stdio.println(\"d=${fs.is_dir(p)}\")\n" + " None -> stdio.println(\"NOARG\")\n" + ) + results = {} + for be in ("wasi", "py", "host"): + d = self._fresh_dir(be) + if be == "wasi": + results[be] = _run_wasi_dynamic_fs(src, d, args=("newdir",)) + elif be == "py": + results[be] = _run_python_in_cwd(src, d, args=("newdir",)) + else: + results[be] = 
_run_capa_host_in_cwd(src, d, args=("newdir",)) + self.assertTrue(os.path.isdir(os.path.join(d, "newdir")), be) + self.assertEqual(results["wasi"], results["py"]) + self.assertEqual(results["wasi"], results["host"]) + + def test_mkdir_dynamic_multi_segment_parity_and_effect(self): + # A MULTI-segment dynamic mkdir replicates os.makedirs(exist_ok) + # at runtime ($Fs_mkdir_recursive over $Fs_mkdir per prefix), so a + # missing-parent tree is created and the Result matches the oracle. + src = ( + "fun main(fs: Fs, env: Env, stdio: Stdio)\n" + " let args = env.args()\n" + " match args.get(0)\n" + " Some(p) ->\n" + " match fs.mkdir(p)\n" + " Ok(u) -> stdio.println(\"MK=ok\")\n" + " Err(e) -> stdio.println(\"MK=err\")\n" + " stdio.println(\"d=${fs.is_dir(p)}\")\n" + " None -> stdio.println(\"NOARG\")\n" + ) + results = {} + for be in ("wasi", "py", "host"): + d = self._fresh_dir(be) + if be == "wasi": + results[be] = _run_wasi_dynamic_fs(src, d, args=("a/b/c",)) + elif be == "py": + results[be] = _run_python_in_cwd(src, d, args=("a/b/c",)) + else: + results[be] = _run_capa_host_in_cwd(src, d, args=("a/b/c",)) + self.assertTrue(os.path.isdir(os.path.join(d, "a", "b", "c")), be) + self.assertEqual(results["wasi"], "MK=ok\nd=true\n") + self.assertEqual(results["wasi"], results["py"]) + self.assertEqual(results["wasi"], results["host"]) + + def test_list_dir_dynamic_parity(self): + src = ( + "fun main(fs: Fs, env: Env, stdio: Stdio)\n" + " let args = env.args()\n" + " match args.get(0)\n" + " Some(p) ->\n" + " match fs.list_dir(p)\n" + " Ok(names) ->\n" + " for n in names\n" + " stdio.println(n)\n" + " Err(e) -> stdio.println(\"ERR\")\n" + " None -> stdio.println(\"NOARG\")\n" + ) + results = {} + for be in ("wasi", "py", "host"): + d = self._fresh_dir(be) + sub = os.path.join(d, "ld") + os.makedirs(sub) + for nm in ("b.txt", "a.txt", "c.txt"): + with open(os.path.join(sub, nm), "w") as f: + f.write("") + if be == "wasi": + results[be] = _run_wasi_dynamic_fs(src, d, 
args=("ld",)) + elif be == "py": + results[be] = _run_python_in_cwd(src, d, args=("ld",)) + else: + results[be] = _run_capa_host_in_cwd(src, d, args=("ld",)) + self.assertEqual(results["wasi"], "a.txt\nb.txt\nc.txt\n") + self.assertEqual(results["wasi"], results["py"]) + self.assertEqual(results["wasi"], results["host"]) + + def test_restricted_fs_plus_dynamic_path_mitigation(self): + # The fine attenuation gate ($Fs_path_allowed) still works with a + # DYNAMIC path: a restrict_to'd Fs denies a runtime path outside + # the prefix and admits one inside, byte-parity with the oracle. + src = ( + "fun main(fs: Fs, env: Env, stdio: Stdio)\n" + " let r = fs.restrict_to(\"allowed\")\n" + " let args = env.args()\n" + " match args.get(0)\n" + " Some(p) -> stdio.println(\"${r.exists(p)}\")\n" + " None -> stdio.println(\"NOARG\")\n" + ) + for arg, expect in (("allowed/ok.txt", "true\n"), + ("secret.txt", "false\n")): + results = {} + for be in ("wasi", "py", "host"): + d = self._fresh_dir(f"{be}-r") + os.makedirs(os.path.join(d, "allowed")) + with open(os.path.join(d, "allowed", "ok.txt"), "w") as f: + f.write("INSIDE") + with open(os.path.join(d, "secret.txt"), "w") as f: + f.write("SECRET") + if be == "wasi": + results[be] = _run_wasi_dynamic_fs(src, d, args=(arg,)) + elif be == "py": + results[be] = _run_python_in_cwd(src, d, args=(arg,)) + else: + results[be] = _run_capa_host_in_cwd(src, d, args=(arg,)) + self.assertEqual(results["wasi"], expect, arg) + self.assertEqual(results["wasi"], results["py"], arg) + self.assertEqual(results["wasi"], results["host"], arg) + + def test_operator_preopen_registered_at_index_zero(self): + # The host installs exactly the operator preopen (index 0) for a + # dynamic-path program (no derived ceiling). 
+ from capa.runtime._wasm_component_host import WasmComponentHost + d = self._fresh_dir("idx") + host = WasmComponentHost( + wasi=True, fs_operator_preopen=(d, True), + ) + self.assertEqual(host._wasi_fs_applied, [(d, "operator-rw")]) + + @unittest.skipUnless( _has_wasm_tools() and _has_wasmtime_wasip2(), "wasm-tools and/or wasmtime-py with WASI P2 not installed", From bc62bbc02481034280d366cb9993a1018284ffa6 Mon Sep 17 00:00:00 2001 From: nelsoduarte Date: Tue, 30 Jun 2026 18:05:29 +0100 Subject: [PATCH 2/2] fix(wasi): normalize . and .. in dynamic-path fine-attenuation gate (oracle parity) The guest-side Fs fine-attenuation gate ($Fs_path_allowed via $Fs_path_contained) did a purely lexical prefix comparison. Once --preopen began admitting dynamic Fs paths, a path like "sub/../secret.txt" started lexically with the restrict_to("sub") prefix and PASSED the gate, reading a sibling outside the subtree, while the Python oracle (os.path.realpath) correctly denied it. The gate now lexically normalizes . and .. in both the runtime path and the stored prefixes first (new $__fs_normalize, an os.path.normpath-style collapse that preserves a leading .. so an escape stays an escape), restoring byte-for-byte three-backend parity (Python == capa:host == WASI): sub/ok.txt admitted; sub/../secret.txt and sub/../sub2/x.txt denied; sub/../sub/ok.txt (normalizes back inside) admitted. Symlinks are still not resolved, the only remaining Level-2 loss. The Level-1 preopen ceiling (wasmtime) is unchanged and still confines an unrestricted Fs to the granted dir regardless of .. Also: a program mixing a literal and a dynamic Fs path under --preopen still fails closed (b1 limitation), now with a clear message naming the limitation and the flag instead of an internal "no closed preopen ceiling" wording. Docs (wasi_mode.md) and CHANGELOG updated; tests added for the .. parity table, Level-1 confinement of an unrestricted dynamic .. path, and the mixed-path message. 
--- CHANGELOG.md | 26 ++ capa/ir/_emit_wasm/__init__.py | 1 + capa/ir/_emit_wasm/_caps.py | 47 ++- capa/ir/_emit_wasm/_wasi.py | 517 +++++++++++++++++++++++++++++++-- docs/design/wasi_mode.md | 45 +-- tests/test_wasi_mode.py | 104 +++++++ 6 files changed, 692 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 67c1994..cd9d839 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,32 @@ breaking changes and the discipline is still being shaped. stays at Level 2 `inherit_env` on a dynamic key and is intentionally not aligned with this fail-closed rule). +**Fixed.** + +- *In the experimental `--wasi` mode, the guest-side fine attenuation gate + (`restrict_to` / `allows`) now lexically normalises `.` and `..` path + segments before its containment check, closing a bypass on a dynamic + path.* Previously the gate did a PURELY lexical prefix comparison: a + dynamic path such as `sub/../secret.txt` (reachable since `--preopen` + began admitting dynamic `Fs` paths) starts lexically with the allowed + prefix `sub/`, so it PASSED the gate and read a sibling OUTSIDE the + `restrict_to("sub")` subtree, while the Python oracle (which + canonicalises with `os.path.realpath`) correctly DENIED it. The gate now + normalises `.`/`..` in both the path and the stored prefixes first + (`$__fs_normalize`, an `os.path.normpath`-style collapse that preserves + a leading `..` so an escape stays an escape), restoring byte-for-byte + three-backend parity (Python oracle == `capa:host` == WASI): `sub/ok.txt` + is admitted, `sub/../secret.txt` and `sub/../sub2/x.txt` are denied, and + `sub/../sub/ok.txt` (which normalises back inside) is admitted. SYMLINKS + are still not resolved by the lexical gate -- that remains the documented + Level-2 loss, now the ONLY divergence from the realpath oracle (`.`/`..` + are handled). 
The Level-1 preopen ceiling (enforced by wasmtime) is + unchanged and still confines an unrestricted `Fs` to the granted + directory regardless of `..`. A program that MIXES a literal `Fs` path + and a dynamic one under `--preopen` still fails closed (layer b1 does not + yet support mixing), now with a clear message that names the limitation + and the flag instead of an internal "no closed preopen ceiling" wording. + ## [1.14.0], 2026-06-29 **Capa 1.14.0.** A MINOR release: an experimental, opt-in `--wasi` mode diff --git a/capa/ir/_emit_wasm/__init__.py b/capa/ir/_emit_wasm/__init__.py index e460549..ede3fc6 100644 --- a/capa/ir/_emit_wasm/__init__.py +++ b/capa/ir/_emit_wasm/__init__.py @@ -1156,6 +1156,7 @@ def emit(self, module: Module) -> str: or self._wasi_env_uses_get_or_args() or self._wasi_net_uses_attenuation() or self._wasi_fs_uses_preopens + or self._wasi_fs_uses_attenuation() or (self._wasi and ("Stdio", "read_line") in self._used_caps) ): heap_start = _align_up(self._string_data_offset, 8) diff --git a/capa/ir/_emit_wasm/_caps.py b/capa/ir/_emit_wasm/_caps.py index 0f61628..e7e7cca 100644 --- a/capa/ir/_emit_wasm/_caps.py +++ b/capa/ir/_emit_wasm/_caps.py @@ -117,6 +117,37 @@ class _CapDispatchMixin: + def _wasi_fs_no_ceiling_error(self) -> "WasmEmissionError": + """The error raised when a LITERAL Fs path reaches a call site but + the static preopen ceiling is not closed. + + With ``--preopen`` (``self._wasi_dynamic_fs``) this is reached ONLY + for the literal path of a program that ALSO passes a DYNAMIC path + to an Fs op: the dynamic path opens the ceiling, so the literal can + no longer be resolved to a derived preopen index. Layer b1 does not + yet support MIXING literal and dynamic Fs paths in one program; + fail closed with a message that names the limitation and the flag, + rather than the internal ceiling wording. 
Without ``--preopen`` the + generic no-ceiling message stands.""" + if self._wasi_dynamic_fs: + return WasmEmissionError( + "WASI --preopen mode (Fs layer b1) does not yet support " + "MIXING a string-literal Fs path and a dynamic Fs path in " + "the same program: the dynamic path opens the static " + "preopen ceiling, so the literal path can no longer be " + "resolved to a derived preopen (fail-closed). Use only " + "dynamic paths (resolved against the single --preopen " + "directory), or only literal paths (drop --preopen), or " + "fall back to the default capa:host backend (drop --wasi)." + ) + return WasmEmissionError( + "Fs in WASI mode requires a closed static preopen ceiling " + "(every filesystem path a string literal); this program has " + "no closed ceiling, so no preopen can be derived (fail-closed). " + "Grant a directory with --preopen to admit dynamic " + "paths, or use the default capa:host backend (drop --wasi)." + ) + def _cap_method_wasm_sig( self, cap: str, method: str, ) -> tuple[list[str], str]: @@ -662,9 +693,7 @@ def _emit_wasi_fs_metadata_call( ) ceiling = self._fs_ceiling if ceiling is None or not ceiling.closed: - raise WasmEmissionError( - "Fs in WASI mode has no closed preopen ceiling" - ) + raise self._wasi_fs_no_ceiling_error() idx, rel = resolve_fs_call(ceiling, arg.literal) # The FULL original literal path is interned for the guest-side # fail-closed attenuation gate (``$Fs_path_allowed``): the @@ -770,9 +799,7 @@ def _emit_wasi_fs_read_call(self, instr: MethodCall) -> None: ) ceiling = self._fs_ceiling if ceiling is None or not ceiling.closed: - raise WasmEmissionError( - "Fs in WASI mode has no closed preopen ceiling" - ) + raise self._wasi_fs_no_ceiling_error() idx, rel = resolve_fs_call(ceiling, arg.literal) rel_off, rel_len = self._intern_string(rel) # Full original literal + receiver handle for the guest-side @@ -835,9 +862,7 @@ def _emit_wasi_fs_write_call(self, instr: MethodCall) -> None: ) ceiling = self._fs_ceiling if 
ceiling is None or not ceiling.closed: - raise WasmEmissionError( - "Fs in WASI mode has no closed preopen ceiling" - ) + raise self._wasi_fs_no_ceiling_error() idx, rel = resolve_fs_call(ceiling, arg.literal) rel_off, rel_len = self._intern_string(rel) # Full original literal + receiver handle for the guest-side @@ -898,9 +923,7 @@ def _emit_wasi_fs_list_dir_call(self, instr: MethodCall) -> None: ) ceiling = self._fs_ceiling if ceiling is None or not ceiling.closed: - raise WasmEmissionError( - "Fs in WASI mode has no closed preopen ceiling" - ) + raise self._wasi_fs_no_ceiling_error() idx, rel = resolve_fs_call(ceiling, arg.literal) rel_off, rel_len = self._intern_string(rel) # Full original literal + receiver handle for the guest-side diff --git a/capa/ir/_emit_wasm/_wasi.py b/capa/ir/_emit_wasm/_wasi.py index 1312068..04c2906 100644 --- a/capa/ir/_emit_wasm/_wasi.py +++ b/capa/ir/_emit_wasm/_wasi.py @@ -188,8 +188,9 @@ ("Fs", "list_dir"), # Fs FINE ATTENUATION (2026-06-28): restrict_to / allows are # implemented GUEST-SIDE (Level 2 of docs/design/wasi-attenuation.md), - # analogous to Env's restrict_to_keys / allows but with LEXICAL path - # containment in place of key equality. No capa:host/fs import: their + # analogous to Env's restrict_to_keys / allows but with path-prefix + # containment (with lexical ``.``/``..`` normalisation; symlinks + # unresolved) in place of key equality. No capa:host/fs import: their # ``$Fs_restrict_to`` / ``$Fs_allows`` bindings are emitted as guest # WAT wrappers by ``_emit_wasi_wrappers``. 
Listed here so the import # loop does NOT try to emit a capa:host/fs import for them (the host @@ -343,9 +344,10 @@ def _validate_wasi_caps(self) -> None: read-directory enumeration -> guest-side sort) over wasi:io/streams, AND the fine-grained attenuators ``restrict_to`` / ``allows`` implemented GUEST-SIDE (Level 2 of - ``docs/design/wasi-attenuation.md``), with LEXICAL path - containment in place of the oracle's realpath (the honest TOCTOU - / symlink loss documented there). No Fs method is rejected here + ``docs/design/wasi-attenuation.md``), with path-prefix containment + that lexically normalises ``.``/``..`` (oracle parity for those) + but does NOT resolve symlinks (the honest TOCTOU / symlink loss + documented there). No Fs method is rejected here (``_WASI_FS_REJECTED`` is now empty); the fail-closed preopen ceiling obligation below still applies to any op that touches the filesystem. @@ -1141,6 +1143,7 @@ def _emit_wasi_wrappers(self) -> None: # migrated Fs op AND the ``restrict_to`` / ``allows`` wrappers; # emit them once when any Fs op (or attenuator) is present. if self._wasi_fs_uses_attenuation(): + self._emit_wasi_fs_normalize_helper() self._emit_wasi_fs_path_contained_helper() self._emit_wasi_fs_path_allowed_helper() if ("Fs", "restrict_to") in used: @@ -2418,28 +2421,456 @@ def _emit_wasi_net_restrict_to_wrapper(self) -> None: # ----- guest-side Fs attenuation (Level 2) ------------------- + def _emit_wasi_fs_normalize_helper(self) -> None: + """``$__fs_normalize (src_ptr i32, src_len i32, dst_ptr i32) -> + i32`` -> writes the LEXICALLY normalised path into ``[dst_ptr, + dst_ptr+ret)`` and returns its length ``ret``. + + Collapses ``.`` and ``..`` segments the way ``os.path.realpath`` + does for the NO-SYMLINK case (the lexical part the guest can + reproduce without a kernel walk), so the containment gate matches + the Python oracle (``Fs.allows``, which canonicalises via + ``realpath``) for ``.`` / ``..``. 
Symlinks are still NOT resolved + -- that remains the documented Level-2 loss + (``docs/design/wasi_mode.md``). + + Rules (validated byte-for-byte against ``os.path.normpath`` and a + 9331-input fuzz of the segment reference, scratchpad + ``wat_sim2.py``): + - split on ``/``; drop empty segments (``//``, trailing ``/``) + and ``.``; + - ``..`` POPS the previous emitted segment when one exists AND + it is not itself a (locked) leading ``..``; otherwise, for an + ABSOLUTE path it is dropped (cannot escape root), for a + RELATIVE path it is KEPT (a leading ``..`` escapes the prefix, + so containment must fail); + - an absolute path keeps its single leading ``/``; a relative + path that normalises to empty becomes ``.``. + The output is never longer than the input, so the caller sizes the + destination buffer at ``max(src_len, 1)``. + + WAT-local helpers are inlined: segment append (prepend ``/`` when + ``dst_len > 0``) and the ``..`` pop / last-segment-is-``..`` test + (scan back from ``dst_len`` to the previous ``/`` or to 0).""" + self._write( + "(func $__fs_normalize (param $src_ptr i32) " + "(param $src_len i32) (param $dst_ptr i32) (result i32)" + ) + self._indent += 1 + self._write("(local $is_abs i32)") + self._write("(local $i i32)") + self._write("(local $dst_len i32)") + self._write("(local $seg_start i32)") + self._write("(local $seg_len i32)") + self._write("(local $last_start i32)") + self._write("(local $j i32)") + # is_abs = src_len > 0 && src[0] == '/'. 
+ self._write("local.get $src_len") + self._write("i32.const 0") + self._write("i32.gt_u") + self._write("if (result i32)") + self._indent += 1 + self._write("local.get $src_ptr") + self._write("i32.load8_u") + self._write("i32.const 47") + self._write("i32.eq") + self._indent -= 1 + self._write("else") + self._indent += 1 + self._write("i32.const 0") + self._indent -= 1 + self._write("end") + self._write("local.set $is_abs") + # If absolute, the leading '/' is emitted at the end; dst here + # holds only the RELATIVE remainder (so the pop / leading-'..' + # logic never crosses the root slash). dst_len starts at 0. + self._write("i32.const 0") + self._write("local.set $dst_len") + self._write("i32.const 0") + self._write("local.set $i") + self._write("(block $scan_done") + self._indent += 1 + self._write("(loop $scan") + self._indent += 1 + self._write("local.get $i") + self._write("local.get $src_len") + self._write("i32.ge_u") + self._write("br_if $scan_done") + # skip a '/' run. + self._write("local.get $src_ptr") + self._write("local.get $i") + self._write("i32.add") + self._write("i32.load8_u") + self._write("i32.const 47") + self._write("i32.eq") + self._write("if") + self._indent += 1 + self._write("local.get $i") + self._write("i32.const 1") + self._write("i32.add") + self._write("local.set $i") + self._write("br $scan") + self._indent -= 1 + self._write("end") + # segment = [seg_start, i) until next '/' or end. 
+ self._write("local.get $i") + self._write("local.set $seg_start") + self._write("(block $seg_done") + self._indent += 1 + self._write("(loop $seg") + self._indent += 1 + self._write("local.get $i") + self._write("local.get $src_len") + self._write("i32.ge_u") + self._write("br_if $seg_done") + self._write("local.get $src_ptr") + self._write("local.get $i") + self._write("i32.add") + self._write("i32.load8_u") + self._write("i32.const 47") + self._write("i32.eq") + self._write("br_if $seg_done") + self._write("local.get $i") + self._write("i32.const 1") + self._write("i32.add") + self._write("local.set $i") + self._write("br $seg") + self._indent -= 1 + self._write(")") + self._indent -= 1 + self._write(")") + self._write("local.get $i") + self._write("local.get $seg_start") + self._write("i32.sub") + self._write("local.set $seg_len") + # '.' (len 1, byte '.') -> drop. + self._write("local.get $seg_len") + self._write("i32.const 1") + self._write("i32.eq") + self._write("local.get $src_ptr") + self._write("local.get $seg_start") + self._write("i32.add") + self._write("i32.load8_u") + self._write("i32.const 46") + self._write("i32.eq") + self._write("i32.and") + self._write("if") + self._indent += 1 + self._write("br $scan") + self._indent -= 1 + self._write("end") + # '..' (len 2, both bytes '.') -> pop / drop / keep. 
+ self._write("local.get $seg_len") + self._write("i32.const 2") + self._write("i32.eq") + self._write("local.get $src_ptr") + self._write("local.get $seg_start") + self._write("i32.add") + self._write("i32.load8_u") + self._write("i32.const 46") + self._write("i32.eq") + self._write("i32.and") + self._write("local.get $src_ptr") + self._write("local.get $seg_start") + self._write("i32.const 1") + self._write("i32.add") + self._write("i32.add") + self._write("i32.load8_u") + self._write("i32.const 46") + self._write("i32.eq") + self._write("i32.and") + self._write("if") + self._indent += 1 + # last_start = start of the last emitted segment in dst: scan back + # from dst_len for the previous '/'; 0 if none. + self._write("i32.const 0") + self._write("local.set $last_start") + self._write("local.get $dst_len") + self._write("local.set $j") + self._write("(block $back_done") + self._indent += 1 + self._write("(loop $back") + self._indent += 1 + self._write("local.get $j") + self._write("i32.eqz") + self._write("br_if $back_done") + self._write("local.get $dst_ptr") + self._write("local.get $j") + self._write("i32.const 1") + self._write("i32.sub") + self._write("i32.add") + self._write("i32.load8_u") + self._write("i32.const 47") + self._write("i32.eq") + self._write("if") + self._indent += 1 + self._write("local.get $j") + self._write("local.set $last_start") + self._write("br $back_done") + self._indent -= 1 + self._write("end") + self._write("local.get $j") + self._write("i32.const 1") + self._write("i32.sub") + self._write("local.set $j") + self._write("br $back") + self._indent -= 1 + self._write(")") + self._indent -= 1 + self._write(")") + # can_pop = dst_len > 0 AND last segment != '..'. The last segment + # is '..' iff (dst_len - last_start == 2) and both its bytes are + # '.'. Compute "last_is_dotdot". + # If dst_len == 0 -> not poppable. 
+ self._write("local.get $dst_len") + self._write("i32.eqz") + self._write("if") + self._indent += 1 + # empty dst: absolute drops, relative keeps '..'. + self._write("local.get $is_abs") + self._write("if") + self._indent += 1 + self._write("br $scan") + self._indent -= 1 + self._write("end") + # relative + empty: append '..' (no leading '/'). + self._write("local.get $dst_ptr") + self._write("i32.const 46") + self._write("i32.store8") + self._write("local.get $dst_ptr") + self._write("i32.const 1") + self._write("i32.add") + self._write("i32.const 46") + self._write("i32.store8") + self._write("i32.const 2") + self._write("local.set $dst_len") + self._write("br $scan") + self._indent -= 1 + self._write("end") + # dst_len > 0: is the last segment exactly '..'? + self._write("local.get $dst_len") + self._write("local.get $last_start") + self._write("i32.sub") + self._write("i32.const 2") + self._write("i32.eq") + self._write("local.get $dst_ptr") + self._write("local.get $last_start") + self._write("i32.add") + self._write("i32.load8_u") + self._write("i32.const 46") + self._write("i32.eq") + self._write("i32.and") + self._write("local.get $dst_ptr") + self._write("local.get $last_start") + self._write("i32.const 1") + self._write("i32.add") + self._write("i32.add") + self._write("i32.load8_u") + self._write("i32.const 46") + self._write("i32.eq") + self._write("i32.and") + self._write("if") + self._indent += 1 + # last segment is a locked leading '..': absolute can't happen here + # (a leading '..' is only kept for relative), so keep another '..'. + self._write("local.get $is_abs") + self._write("if") + self._indent += 1 + self._write("br $scan") + self._indent -= 1 + self._write("end") + # append '/..' (dst_len > 0 so prepend a separator). 
+ self._write("local.get $dst_ptr") + self._write("local.get $dst_len") + self._write("i32.add") + self._write("i32.const 47") + self._write("i32.store8") + self._write("local.get $dst_ptr") + self._write("local.get $dst_len") + self._write("i32.const 1") + self._write("i32.add") + self._write("i32.add") + self._write("i32.const 46") + self._write("i32.store8") + self._write("local.get $dst_ptr") + self._write("local.get $dst_len") + self._write("i32.const 2") + self._write("i32.add") + self._write("i32.add") + self._write("i32.const 46") + self._write("i32.store8") + self._write("local.get $dst_len") + self._write("i32.const 3") + self._write("i32.add") + self._write("local.set $dst_len") + self._write("br $scan") + self._indent -= 1 + self._write("end") + # poppable: truncate dst to last_start (drop the '/segment'). + # last_start is the byte AFTER the separator, so the new length is + # last_start - 1 when last_start > 0 (drop the separator too), or 0. + self._write("local.get $last_start") + self._write("i32.eqz") + self._write("if (result i32)") + self._indent += 1 + self._write("i32.const 0") + self._indent -= 1 + self._write("else") + self._indent += 1 + self._write("local.get $last_start") + self._write("i32.const 1") + self._write("i32.sub") + self._indent -= 1 + self._write("end") + self._write("local.set $dst_len") + self._write("br $scan") + self._indent -= 1 + self._write("end") + # normal segment: append it (prepend '/' when dst_len > 0). + self._write("local.get $dst_len") + self._write("i32.const 0") + self._write("i32.gt_u") + self._write("if") + self._indent += 1 + self._write("local.get $dst_ptr") + self._write("local.get $dst_len") + self._write("i32.add") + self._write("i32.const 47") + self._write("i32.store8") + self._write("local.get $dst_len") + self._write("i32.const 1") + self._write("i32.add") + self._write("local.set $dst_len") + self._indent -= 1 + self._write("end") + # copy seg_len bytes src[seg_start..] -> dst[dst_len..]. 
+ self._write("i32.const 0") + self._write("local.set $j") + self._write("(block $copy_done") + self._indent += 1 + self._write("(loop $copy") + self._indent += 1 + self._write("local.get $j") + self._write("local.get $seg_len") + self._write("i32.ge_u") + self._write("br_if $copy_done") + self._write("local.get $dst_ptr") + self._write("local.get $dst_len") + self._write("i32.add") + self._write("local.get $src_ptr") + self._write("local.get $seg_start") + self._write("i32.add") + self._write("local.get $j") + self._write("i32.add") + self._write("i32.load8_u") + self._write("i32.store8") + self._write("local.get $dst_len") + self._write("i32.const 1") + self._write("i32.add") + self._write("local.set $dst_len") + self._write("local.get $j") + self._write("i32.const 1") + self._write("i32.add") + self._write("local.set $j") + self._write("br $copy") + self._indent -= 1 + self._write(")") + self._indent -= 1 + self._write(")") + self._write("br $scan") + self._indent -= 1 + self._write(")") + self._indent -= 1 + self._write(")") + # Post-loop: build the final layout. + # Absolute: shift the relative remainder one byte right and write a + # leading '/'. dst currently holds [0, dst_len) of the relative + # remainder; we move it up so index 0 is '/'. + self._write("local.get $is_abs") + self._write("if") + self._indent += 1 + # shift bytes right by 1, from the top down (no overlap clobber). 
+ self._write("local.get $dst_len") + self._write("local.set $j") + self._write("(block $shift_done") + self._indent += 1 + self._write("(loop $shift") + self._indent += 1 + self._write("local.get $j") + self._write("i32.eqz") + self._write("br_if $shift_done") + self._write("local.get $dst_ptr") + self._write("local.get $j") + self._write("i32.add") + self._write("local.get $dst_ptr") + self._write("local.get $j") + self._write("i32.const 1") + self._write("i32.sub") + self._write("i32.add") + self._write("i32.load8_u") + self._write("i32.store8") + self._write("local.get $j") + self._write("i32.const 1") + self._write("i32.sub") + self._write("local.set $j") + self._write("br $shift") + self._indent -= 1 + self._write(")") + self._indent -= 1 + self._write(")") + self._write("local.get $dst_ptr") + self._write("i32.const 47") + self._write("i32.store8") + self._write("local.get $dst_len") + self._write("i32.const 1") + self._write("i32.add") + self._write("return") + self._indent -= 1 + self._write("end") + # Relative + empty result -> '.'. + self._write("local.get $dst_len") + self._write("i32.eqz") + self._write("if") + self._indent += 1 + self._write("local.get $dst_ptr") + self._write("i32.const 46") + self._write("i32.store8") + self._write("i32.const 1") + self._write("return") + self._indent -= 1 + self._write("end") + self._write("local.get $dst_len") + self._indent -= 1 + self._write(")") + def _emit_wasi_fs_path_contained_helper(self) -> None: """``$Fs_path_contained (path_ptr i32, path_len i32, pre_ptr i32, pre_len i32) -> i32`` -> 1 iff ``path`` is the - directory/file ``prefix`` itself or lies under it, by LEXICAL - path-segment containment. + directory/file ``prefix`` itself or lies under it, by path-segment + containment AFTER lexical ``.``/``..`` normalisation. This is the guest-side analogue of the Python oracle's ``Path(os.path.realpath(path)).is_relative_to( os.path.realpath(prefix))`` (``Fs.allows``, ``capa/runtime/_capabilities.py:173-183``). 
The guest cannot - ``realpath`` (no kernel syscall), so the containment is LEXICAL: - it compares the literal path strings. For CANONICAL paths (no - ``.`` / ``..`` segments, no symlinks, no repeated slashes) the - lexical result is BYTE-IDENTICAL to the oracle, because - ``realpath`` prepends the SAME process CWD to a relative path and - its relative prefix (so the CWD cancels in the containment) and - leaves a canonical absolute path unchanged. For NON-CANONICAL - paths the lexical check may diverge from ``realpath`` -- the - honest, documented Level-2 loss (TOCTOU / symlink) in - ``docs/design/wasi-attenuation.md``. - - Algorithm (matching the segment-aware ``is_relative_to``): + ``realpath`` (no kernel syscall), but it FIRST normalises ``.`` and + ``..`` in BOTH the path and the prefix lexically (``$__fs_normalize``, + the ``os.path.normpath``-style collapse), reproducing what + ``realpath`` does for those segments in the no-symlink case. So + ``sub/../secret.txt`` normalises to ``secret.txt`` (NOT contained + in ``sub`` -> denied, matching the oracle) and ``sub/../sub/ok.txt`` + normalises to ``sub/ok.txt`` (contained -> allowed). For paths + whose ONLY non-canonical feature is ``.``/``..`` the result is now + BYTE-IDENTICAL to the oracle (``realpath`` also prepends the SAME + process CWD to a relative path and its relative prefix, so the CWD + cancels in the containment). SYMLINKS are still NOT resolved -- a + symlink inside the prefix that points outside it is admitted here + (caught only by the Level-1 preopen ceiling); that is the only + remaining Level-2 loss (TOCTOU / symlink) in + ``docs/design/wasi_mode.md``. + + Algorithm (matching the segment-aware ``is_relative_to``), run on + the NORMALISED path / prefix: 1. strip trailing ``/`` from both path and prefix (keep a lone ``/`` as ``/``), so ``dir/`` and ``dir`` compare equal. 
@@ -2460,6 +2891,54 @@ def _emit_wasi_fs_path_contained_helper(self) -> None: self._write("(local $pl i32)") self._write("(local $ql i32)") self._write("(local $i i32)") + self._write("(local $npath_ptr i32)") + self._write("(local $npath_len i32)") + self._write("(local $npre_ptr i32)") + self._write("(local $npre_len i32)") + # LEXICAL normalisation of '.' / '..' FIRST, on BOTH path and + # prefix, so the containment matches the oracle (which canonicalises + # both via realpath). e.g. "sub/../secret.txt" normalises to + # "secret.txt" (NOT contained in "sub" -> denied), while + # "sub/../sub/ok.txt" normalises to "sub/ok.txt" (contained -> + # allowed). Each output is <= its input length; allocate + # max(len, 1) so an empty input still has a 1-byte buffer for the + # '.' result. Symlinks are NOT resolved (the documented Level-2 + # loss); only '.' / '..' are collapsed. + self._write("local.get $path_len") + self._write("i32.const 1") + self._write("local.get $path_len") + self._write("i32.const 0") + self._write("i32.gt_u") + self._write("select") + self._write("call $alloc") + self._write("local.set $npath_ptr") + self._write("local.get $path_ptr") + self._write("local.get $path_len") + self._write("local.get $npath_ptr") + self._write("call $__fs_normalize") + self._write("local.set $npath_len") + self._write("local.get $pre_len") + self._write("i32.const 1") + self._write("local.get $pre_len") + self._write("i32.const 0") + self._write("i32.gt_u") + self._write("select") + self._write("call $alloc") + self._write("local.set $npre_ptr") + self._write("local.get $pre_ptr") + self._write("local.get $pre_len") + self._write("local.get $npre_ptr") + self._write("call $__fs_normalize") + self._write("local.set $npre_len") + # From here the compare runs on the NORMALISED buffers. 
+ self._write("local.get $npath_ptr") + self._write("local.set $path_ptr") + self._write("local.get $npath_len") + self._write("local.set $path_len") + self._write("local.get $npre_ptr") + self._write("local.set $pre_ptr") + self._write("local.get $npre_len") + self._write("local.set $pre_len") # pl = strip_trailing_slash_len(path); ql = ...(prefix). A # trailing '/' is dropped unless the string is a lone '/'. self._write("local.get $path_ptr") diff --git a/docs/design/wasi_mode.md b/docs/design/wasi_mode.md index 653c961..daade87 100644 --- a/docs/design/wasi_mode.md +++ b/docs/design/wasi_mode.md @@ -34,7 +34,7 @@ see the WAT) rewrites the migrated touch-points: | `Env.restrict_to_keys` | `capa:host/env.restrict-to-keys` (host handle table) | guest-side allow-list intersection (no host) | | `Env.allows` | `capa:host/env.allows` (host handle table) | guest-side allow-list membership (no host) | | `Fs.restrict_to` | `capa:host/fs.restrict-to` (host handle table) | guest-side prefix allow-list union (no host) | -| `Fs.allows` | `capa:host/fs.allows` (host handle table) | guest-side lexical prefix containment (no host) | +| `Fs.allows` | `capa:host/fs.allows` (host handle table) | guest-side prefix containment with lexical `.`/`..` normalisation (no host; symlinks unresolved) | | `Net.get` | `capa:host/net.get` (host handle table) | `wasi:http/outgoing-handler.handle` + the wasi:http request/response chain + `wasi:io/streams` body read, gated guest-side by the static ceiling **and** the fine allow-list | | `Net.post` | `capa:host/net.post` (host handle table) | the Net.get chain + `wasi:io/streams` flow-controlled outgoing-body **write** of the request body before the handle, same two guest-side gates | | `Net.restrict_to` | `capa:host/net.restrict-to` (host handle table) | guest-side host allow-list intersection (no host) | @@ -805,22 +805,33 @@ exactly like Env: `write` / `mkdir` therefore leaves **nothing** on disk -- the gate fires before the file is opened. 
-**Path containment (LEXICAL, not realpath).** The oracle canonicalises -both the prefix and the queried path with `os.path.realpath` (resolving -`..` / `.` / symlinks) before the `is_relative_to` boundary check. The -guest has **no realpath syscall**, so `$Fs_path_contained` does a -**lexical** path-segment containment: strip trailing `/` from both, then -the path is contained iff its first `len(prefix)` bytes equal the prefix -AND the next byte is `/` or the path IS the prefix (the segment boundary -that stops `data/ab` matching `data/a`). **For CANONICAL paths** (no `.` -/ `..` segments, no symlinks, no repeated slashes) this is -**byte-identical** to the oracle: `realpath` prepends the SAME process -CWD to a relative path and its relative prefix (so the CWD cancels in the -containment) and leaves a canonical absolute path unchanged. **For -NON-CANONICAL paths or symlinks** the lexical check may **diverge** from -the realpath oracle -- the honest, documented **TOCTOU / symlink loss** -of Level 2. The migrated tests use canonical absolute literals, where -parity holds byte-for-byte across all three backends. +**Path containment (LEXICAL `.`/`..` normalisation, symlinks not +resolved).** The oracle canonicalises both the prefix and the queried +path with `os.path.realpath` (resolving `..` / `.` / symlinks) before the +`is_relative_to` boundary check. The guest has **no realpath syscall**, +so `$Fs_path_contained` **lexically normalises** the `.` and `..` +segments of BOTH the path and the prefix FIRST (via `$__fs_normalize`, +the `os.path.normpath`-style collapse that preserves a leading `..` so an +escape stays an escape), and only then does the path-segment containment: +strip trailing `/` from both, then the path is contained iff its first +`len(prefix)` bytes equal the prefix AND the next byte is `/` or the path +IS the prefix (the segment boundary that stops `data/ab` matching +`data/a`). 
The `.`/`..` normalisation reproduces what `realpath` does for +those segments in the no-symlink case, so a dynamic path such as +`sub/../secret.txt` normalises to `secret.txt` (NOT contained in `sub` -> +**denied**, matching the oracle), while `sub/../sub/ok.txt` normalises to +`sub/ok.txt` (contained -> **allowed**). The `$__fs_normalize` rule is +validated against `os.path.normpath` (except a leading `//`, which POSIX `normpath` keeps but the helper collapses to `/`) and over a 9331-input +segment fuzz (see the scratchpad reference behind +`TestWasiFsDynamicPreopen.test_restricted_fs_dynamic_path_dotdot_normalized`). +`realpath` also prepends the SAME process CWD to a relative path and its +relative prefix (so the CWD cancels in the containment) and leaves a +canonical absolute path unchanged, so for non-symlink paths the result is +**byte-identical** to the oracle. **SYMLINKS are still NOT resolved** -- +the gate is lexical, so a symlink inside a prefix that points outside it +is admitted by the guest (and caught only by the Level-1 preopen +ceiling). That symlink (TOCTOU) loss is the **only** remaining Level-2 +divergence from the realpath oracle; `.` and `..` are now handled. **Interaction Level 1 + Level 2.** The guest-side allow-list (fine, Level 2) operates ON TOP OF the preopen (the Level-1 ceiling): the fine diff --git a/tests/test_wasi_mode.py b/tests/test_wasi_mode.py index deb8098..4578758 100644 --- a/tests/test_wasi_mode.py +++ b/tests/test_wasi_mode.py @@ -1544,6 +1544,31 @@ def test_dynamic_metadata_and_streams_compile(self): self.assertIn("(func $Fs_write", wat) self.assertIn("(func $Fs_list_dir", wat) + def test_mixed_literal_and_dynamic_path_clear_message(self): + # A program that mixes a LITERAL Fs path and a DYNAMIC one under + # --preopen fails closed (no index misalignment), but with a CLEAR + # message naming the b1 limitation and the flag, not the internal + # "no closed preopen ceiling" wording.
+ src = ( + "fun main(fs: Fs, env: Env, stdio: Stdio)\n" + " let args = env.args()\n" + " match fs.read(\"fixed.txt\")\n" + " Ok(c) -> stdio.println(c)\n" + " Err(e) -> stdio.println(\"err\")\n" + " match args.get(0)\n" + " Some(p) ->\n" + " match fs.read(p)\n" + " Ok(c) -> stdio.println(c)\n" + " Err(e) -> stdio.println(\"err\")\n" + " None -> stdio.println(\"none\")\n" + ) + with self.assertRaises(Exception) as cm: + self._compile(src, dynamic_fs=True) + msg = str(cm.exception) + self.assertIn("--preopen", msg) + self.assertIn("MIXING", msg) + self.assertNotIn("has no closed preopen ceiling", msg) + def test_operator_preopen_index_is_zero_when_ceiling_open(self): # b1 index rule: with no derived preopens (dynamic ceiling) the # operator preopen is index 0, the constant the dynamic call site @@ -3015,6 +3040,85 @@ def test_restricted_fs_plus_dynamic_path_mitigation(self): self.assertEqual(results["wasi"], results["py"], arg) self.assertEqual(results["wasi"], results["host"], arg) + def test_restricted_fs_dynamic_path_dotdot_normalized(self): + # CRITICAL parity: a DYNAMIC path with '.' / '..' must be LEXICALLY + # normalised before the fine-attenuation containment check, so a + # path that escapes the restrict_to subtree via '..' is DENIED + # (matching the realpath oracle), and one that stays inside after + # normalisation is ADMITTED. Without normalisation the lexical + # prefix "sub/" would match "sub/../secret.txt" and LEAK a sibling. + src = ( + "fun main(fs: Fs, env: Env, stdio: Stdio)\n" + " let r = fs.restrict_to(\"sub\")\n" + " let args = env.args()\n" + " match args.get(0)\n" + " Some(p) ->\n" + " match r.read(p)\n" + " Ok(c) -> stdio.println(\"READ:${c}\")\n" + " Err(e) -> stdio.println(\"DENIED\")\n" + " None -> stdio.println(\"NOARG\")\n" + ) + # (arg, expected). The oracle (os.path.realpath + is_relative_to) + # produces exactly this table; the WASI guest must match it. 
+ table = [ + ("sub/ok.txt", "READ:SUB-OK\n"), # inside -> read + ("sub/../secret.txt", "DENIED\n"), # escapes -> denied + ("sub/../sub2/x.txt", "DENIED\n"), # escapes -> denied + ("secret.txt", "DENIED\n"), # outside -> denied + ("sub/../sub/ok.txt", "READ:SUB-OK\n"), # normalises inside + ("sub/./ok.txt", "READ:SUB-OK\n"), # '.' inside + ] + for arg, expect in table: + results = {} + for be in ("wasi", "py", "host"): + d = self._fresh_dir(f"{be}-dd") + os.makedirs(os.path.join(d, "sub")) + os.makedirs(os.path.join(d, "sub2")) + with open(os.path.join(d, "sub", "ok.txt"), "w") as f: + f.write("SUB-OK") + with open(os.path.join(d, "secret.txt"), "w") as f: + f.write("TOP-SECRET") + with open(os.path.join(d, "sub2", "x.txt"), "w") as f: + f.write("SIBLING") + if be == "wasi": + results[be] = _run_wasi_dynamic_fs(src, d, args=(arg,)) + elif be == "py": + results[be] = _run_python_in_cwd(src, d, args=(arg,)) + else: + results[be] = _run_capa_host_in_cwd(src, d, args=(arg,)) + self.assertEqual(results["wasi"], expect, arg) + self.assertEqual(results["wasi"], results["py"], arg) + self.assertEqual(results["wasi"], results["host"], arg) + + def test_dynamic_dotdot_confined_to_preopen_when_unrestricted(self): + # LEVEL-1 confinement is NOT regressed: an UNRESTRICTED Fs (handle + # 0, no restrict_to) with a dynamic '..' path that tries to escape + # the operator preopen is denied by WASMTIME (the preopen ceiling), + # not by the guest gate. A decoy file sits OUTSIDE the preopen. 
+ src = ( + "fun main(fs: Fs, env: Env, stdio: Stdio)\n" + " let args = env.args()\n" + " match args.get(0)\n" + " Some(p) ->\n" + " match fs.read(p)\n" + " Ok(c) -> stdio.println(\"READ:${c}\")\n" + " Err(e) -> stdio.println(\"DENIED\")\n" + " None -> stdio.println(\"NOARG\")\n" + ) + outer = self._fresh_dir("confine") + preopen = os.path.join(outer, "preopen") + os.makedirs(preopen) + with open(os.path.join(outer, "decoy.txt"), "w") as f: + f.write("OUTSIDE-DECOY") + with open(os.path.join(preopen, "in.txt"), "w") as f: + f.write("INSIDE-OK") + # An escape attempt -> wasmtime denies (Err), not the decoy leaked. + esc = _run_wasi_dynamic_fs(src, preopen, args=("../decoy.txt",)) + self.assertEqual(esc, "DENIED\n") + # The in-preopen read still works. + ok = _run_wasi_dynamic_fs(src, preopen, args=("in.txt",)) + self.assertEqual(ok, "READ:INSIDE-OK\n") + def test_operator_preopen_registered_at_index_zero(self): # The host installs exactly the operator preopen (index 0) for a # dynamic-path program (no derived ceiling).