diff --git a/pyproject.toml b/pyproject.toml index 95d794c9..1d928f9e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ requires = ["hatchling"] build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] -packages = ["src/agent", "src/llm", "src/observability"] +packages = ["src/agent", "src/evaluation", "src/llm", "src/observability"] [project] name = "assetopsbench-mcp" @@ -42,6 +42,7 @@ wo-mcp-server = "servers.wo.main:main" vibration-mcp-server = "servers.vibration.main:main" openai-agent = "agent.openai_agent.cli:main" deep-agent = "agent.deep_agent.cli:main" +evaluate = "evaluation.cli:main" [dependency-groups] diff --git a/src/evaluation/__init__.py b/src/evaluation/__init__.py index e69de29b..ca632ab1 100644 --- a/src/evaluation/__init__.py +++ b/src/evaluation/__init__.py @@ -0,0 +1,34 @@ +"""Offline evaluation harness for AssetOpsBench agent runs. + +Consumes saved trajectory files (written by +:func:`observability.persistence.persist_trajectory`) and scenario files +(under ``src/scenarios/``) and emits a structured JSON report combining +graded outcomes with operational metrics. + +The shape mirrors conventions from SWE-bench, HELM, and τ-bench: +``run`` (executes the agent — already exists) → ``evaluate`` (this +module) → ``report.json``. Re-grading from saved trajectories is +first-class. 
+""" + +from .models import ( + AggregateOps, + EvalReport, + GradeResult, + OpsMetrics, + PersistedTrajectory, + Scenario, + ScenarioResult, + TypeBreakdown, +) + +__all__ = [ + "AggregateOps", + "EvalReport", + "GradeResult", + "OpsMetrics", + "PersistedTrajectory", + "Scenario", + "ScenarioResult", + "TypeBreakdown", +] diff --git a/src/evaluation/cli.py b/src/evaluation/cli.py new file mode 100644 index 00000000..80645289 --- /dev/null +++ b/src/evaluation/cli.py @@ -0,0 +1,106 @@ +"""``uv run evaluate`` — offline grading + report generation.""" + +from __future__ import annotations + +import argparse +import logging +import sys +from pathlib import Path + +from . import graders as grader_registry +from .report import render_summary, write_report +from .runner import evaluate + + +def _build_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser( + prog="evaluate", + description=( + "Grade saved agent trajectories against scenario files and " + "emit a JSON report." + ), + ) + p.add_argument( + "--trajectories", + type=Path, + required=True, + help="Directory of {run_id}.json trajectory files (or a single file).", + ) + p.add_argument( + "--scenarios", + type=Path, + nargs="+", + required=True, + help="One or more scenario JSON / JSONL files.", + ) + p.add_argument( + "--output", + type=Path, + required=True, + help="Path to write the JSON report.", + ) + p.add_argument( + "--grader-default", + default="llm_judge", + help="Grader name when scenario.grading_method is unset. " + "Default: llm_judge.", + ) + p.add_argument( + "--judge-model", + default=None, + help="Model id for the LLM judge (e.g. " + "litellm_proxy/anthropic/claude-opus-4-5). 
" + "Required when any scenario routes to llm_judge.", + ) + p.add_argument( + "-v", + "--verbose", + action="store_true", + help="Enable INFO-level logging.", + ) + return p + + +def _maybe_install_judge(judge_model: str | None) -> None: + if not judge_model: + return + # Imported lazily so the CLI works for deterministic-only runs even + # if the LiteLLM dep happens to be flaky in the dev environment. + from llm import LiteLLMBackend # type: ignore[import-not-found] + + from .graders.llm_judge import install + + install(LiteLLMBackend(model=judge_model)) + + +def _validate_grader_default(name: str) -> None: + try: + grader_registry.get(name) + except KeyError as exc: + raise SystemExit(str(exc)) + + +def main(argv: list[str] | None = None) -> int: + args = _build_parser().parse_args(argv) + logging.basicConfig( + level=logging.INFO if args.verbose else logging.WARNING, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", + ) + + _maybe_install_judge(args.judge_model) + _validate_grader_default(args.grader_default) + + report = evaluate( + trajectories_path=args.trajectories, + scenarios_paths=list(args.scenarios), + default_grading_method=args.grader_default, + ) + + out = write_report(report, args.output) + print(render_summary(report)) + print(f"\nReport written: {out}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/evaluation/graders/__init__.py b/src/evaluation/graders/__init__.py new file mode 100644 index 00000000..f58a074e --- /dev/null +++ b/src/evaluation/graders/__init__.py @@ -0,0 +1,36 @@ +"""Pluggable grader registry. + +Each grader is a callable taking ``(scenario, answer, trajectory_text)`` +and returning a :class:`~evaluation.models.GradeResult`. Registration +happens via :func:`register`; the CLI looks up graders by name from +``scenario.grading_method`` (falling back to a CLI-supplied default). 
+""" + +from __future__ import annotations + +from typing import Callable + +from ..models import GradeResult, Scenario + +Grader = Callable[[Scenario, str, str], GradeResult] + +_REGISTRY: dict[str, Grader] = {} + + +def register(name: str, grader: Grader) -> None: + _REGISTRY[name] = grader + + +def get(name: str) -> Grader: + if name not in _REGISTRY: + raise KeyError( + f"unknown grader {name!r}; registered: {sorted(_REGISTRY)}" + ) + return _REGISTRY[name] + + +def names() -> list[str]: + return sorted(_REGISTRY) + + +from . import deterministic # noqa: E402,F401 — register-on-import diff --git a/src/evaluation/graders/deterministic.py b/src/evaluation/graders/deterministic.py new file mode 100644 index 00000000..35db1c29 --- /dev/null +++ b/src/evaluation/graders/deterministic.py @@ -0,0 +1,71 @@ +"""Pure deterministic graders — no LLM, no network.""" + +from __future__ import annotations + +import math + +from ..models import GradeResult, Scenario +from . import register + + +def exact_string_match( + scenario: Scenario, answer: str, trajectory_text: str +) -> GradeResult: + expected = scenario.expected_answer + if expected is None: + return GradeResult( + grading_method="exact_string_match", + passed=False, + score=0.0, + rationale="scenario has no expected_answer", + ) + + a = str(answer).strip().lower() + e = str(expected).strip().lower() + passed = a == e + return GradeResult( + grading_method="exact_string_match", + passed=passed, + score=1.0 if passed else 0.0, + rationale="" if passed else f"expected {expected!r}, got {answer!r}", + details={"expected": expected, "actual": answer}, + ) + + +def numeric_match( + scenario: Scenario, answer: str, trajectory_text: str +) -> GradeResult: + expected_raw = scenario.expected_answer + extra = scenario.model_extra or {} + tolerance = float(extra.get("tolerance", 1e-6)) + + if expected_raw is None: + return GradeResult( + grading_method="numeric_match", + passed=False, + rationale="scenario has no 
expected_answer", + ) + + try: + a = float(answer) + e = float(expected_raw) + except (TypeError, ValueError) as err: + return GradeResult( + grading_method="numeric_match", + passed=False, + rationale=f"could not parse numbers: {err}", + details={"expected": expected_raw, "actual": answer}, + ) + + passed = math.isclose(a, e, rel_tol=tolerance, abs_tol=tolerance) + return GradeResult( + grading_method="numeric_match", + passed=passed, + score=1.0 if passed else 0.0, + rationale="" if passed else f"|{a} - {e}| > tol={tolerance}", + details={"expected": e, "actual": a, "tolerance": tolerance}, + ) + + +register("exact_string_match", exact_string_match) +register("numeric_match", numeric_match) diff --git a/src/evaluation/graders/llm_judge.py b/src/evaluation/graders/llm_judge.py new file mode 100644 index 00000000..fb55bf73 --- /dev/null +++ b/src/evaluation/graders/llm_judge.py @@ -0,0 +1,144 @@ +"""LLM-judge grader. + +Free-form answers are scored against ``scenario.characteristic_form`` +using a six-criterion rubric (task completion, data retrieval accuracy, +result verification, agent sequence, clarity, hallucinations) — the +same shape as ``aobench/scenario-server/grading/graders.evaluation_agent`` +but built directly on :class:`~llm.LLMBackend` so the evaluation module +has no dependency on the scenario-server codebase. +""" + +from __future__ import annotations + +import json +import logging +import re + +from llm import LLMBackend + +from ..models import GradeResult, Scenario +from . import register + +_log = logging.getLogger(__name__) + +_RUBRIC_KEYS = ( + "task_completion", + "data_retrieval_accuracy", + "generalized_result_verification", + "agent_sequence_correct", + "clarity_and_justification", + "hallucinations", +) + +_PROMPT_TEMPLATE = """You are an evaluation judge for an industrial-asset-operations agent. + +Score the agent response against the expected characteristic answer using the six criteria below. Respond ONLY with a JSON object, no prose. 
+
+QUESTION:
+{question}
+
+EXPECTED CHARACTERISTIC:
+{characteristic}
+
+AGENT RESPONSE:
+{answer}
+
+AGENT TRAJECTORY (turns / tool calls / outputs):
+{trajectory}
+
+Return JSON with these boolean fields plus a one-sentence reason:
+
+{{
+  "task_completion": <true|false>,
+  "data_retrieval_accuracy": <true|false>,
+  "generalized_result_verification": <true|false>,
+  "agent_sequence_correct": <true|false>,
+  "clarity_and_justification": <true|false>,
+  "hallucinations": <true|false>,
+  "reason": "<one-sentence reason>"
+}}
+
+The agent passes overall iff the first five are true AND hallucinations is false."""
+
+
+class LLMJudgeGrader:
+    """Closure-style grader that holds an :class:`LLMBackend`."""
+
+    def __init__(self, llm: LLMBackend, name: str = "llm_judge") -> None:
+        self._llm = llm
+        self.name = name
+
+    def __call__(
+        self, scenario: Scenario, answer: str, trajectory_text: str
+    ) -> GradeResult:
+        characteristic = scenario.characteristic_form or scenario.expected_answer or ""
+        if not characteristic:
+            return GradeResult(
+                grading_method=self.name,
+                passed=False,
+                rationale="scenario has neither characteristic_form nor expected_answer",
+            )
+
+        prompt = _PROMPT_TEMPLATE.format(
+            question=scenario.text,
+            characteristic=characteristic,
+            answer=answer,
+            trajectory=trajectory_text[:8000],
+        )
+
+        try:
+            raw = self._llm.generate(prompt)
+        except Exception as exc:  # judge call failure is a grading failure, not a crash
+            _log.exception("llm_judge: backend error")
+            return GradeResult(
+                grading_method=self.name,
+                passed=False,
+                rationale=f"judge backend error: {exc}",
+            )
+
+        review = _parse_review(raw)
+        if review is None:
+            return GradeResult(
+                grading_method=self.name,
+                passed=False,
+                rationale="judge returned unparseable JSON",
+                details={"raw": raw[:2000]},
+            )
+
+        passed = (
+            review.get("task_completion") is True
+            and review.get("data_retrieval_accuracy") is True
+            and review.get("generalized_result_verification") is True
+            and review.get("agent_sequence_correct") is True
+            and review.get("clarity_and_justification") is True
+            and
review.get("hallucinations") is False + ) + score = sum(1 for k in _RUBRIC_KEYS[:5] if review.get(k) is True) / 5.0 + if review.get("hallucinations") is True: + score = max(0.0, score - 0.2) + + return GradeResult( + grading_method=self.name, + passed=passed, + score=round(score, 3), + rationale=str(review.get("reason", ""))[:500], + details=review, + ) + + +def _parse_review(raw: str) -> dict | None: + if not raw: + return None + # Tolerate leading prose / markdown fences by extracting the first {...} block. + match = re.search(r"\{.*\}", raw, re.DOTALL) + if not match: + return None + try: + return json.loads(match.group(0)) + except json.JSONDecodeError: + return None + + +def install(llm: LLMBackend, name: str = "llm_judge") -> None: + """Register an LLM-judge grader bound to ``llm`` under ``name``.""" + register(name, LLMJudgeGrader(llm, name=name)) diff --git a/src/evaluation/loader.py b/src/evaluation/loader.py new file mode 100644 index 00000000..31b9c761 --- /dev/null +++ b/src/evaluation/loader.py @@ -0,0 +1,93 @@ +"""Load trajectories and scenarios, then join them by ``scenario_id``.""" + +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Iterable, Iterator + +from .models import PersistedTrajectory, Scenario + +_log = logging.getLogger(__name__) + + +def load_trajectories(path: Path) -> list[PersistedTrajectory]: + """Load every ``*.json`` trajectory under ``path``. + + ``path`` may be a directory (the ``AGENT_TRAJECTORY_DIR`` layout) or + a single JSON file. Files that fail to parse are logged and + skipped — a partial directory should still yield a usable batch. 
+ """ + p = Path(path) + if p.is_file(): + return [_load_one(p)] if p.suffix == ".json" else [] + + out: list[PersistedTrajectory] = [] + for child in sorted(p.glob("*.json")): + try: + out.append(_load_one(child)) + except Exception: + _log.exception("loader: failed to parse %s", child) + return out + + +def _load_one(path: Path) -> PersistedTrajectory: + raw = json.loads(path.read_text(encoding="utf-8")) + return PersistedTrajectory.from_raw(raw) + + +def load_scenarios(paths: Iterable[Path] | Path) -> list[Scenario]: + """Load scenarios from one or more files. + + Each file may be a JSON list, a single JSON object, or JSONL. + Scenario IDs are coerced to strings to make the join key uniform + (CouchDB-style trajectories use string IDs; local JSON files use + ints). + """ + if isinstance(paths, (str, Path)): + paths = [Path(paths)] + + out: list[Scenario] = [] + for p in paths: + out.extend(_load_scenario_file(Path(p))) + return out + + +def _load_scenario_file(path: Path) -> list[Scenario]: + text = path.read_text(encoding="utf-8").strip() + if not text: + return [] + + if path.suffix == ".jsonl": + return [ + Scenario.from_raw(json.loads(line)) + for line in text.splitlines() + if line.strip() + ] + + raw = json.loads(text) + if isinstance(raw, list): + return [Scenario.from_raw(item) for item in raw] + if isinstance(raw, dict): + return [Scenario.from_raw(raw)] + raise ValueError(f"unexpected scenario JSON shape in {path}: {type(raw).__name__}") + + +def join_records( + scenarios: list[Scenario], + trajectories: list[PersistedTrajectory], +) -> Iterator[tuple[Scenario, PersistedTrajectory]]: + """Yield (scenario, trajectory) pairs joined on ``scenario_id``. + + Scenarios with no matching trajectory and trajectories with no + matching scenario are silently dropped — the caller can compute the + diff from the input lists if reporting is needed. 
+ """ + by_id: dict[str, Scenario] = {s.id: s for s in scenarios} + for traj in trajectories: + if traj.scenario_id is None: + continue + scenario = by_id.get(traj.scenario_id) + if scenario is not None: + yield scenario, traj diff --git a/src/evaluation/metrics.py b/src/evaluation/metrics.py new file mode 100644 index 00000000..325074a7 --- /dev/null +++ b/src/evaluation/metrics.py @@ -0,0 +1,125 @@ +"""Operational metric extraction and aggregation.""" + +from __future__ import annotations + +import statistics +from typing import Any + +from .models import AggregateOps, OpsMetrics, PersistedTrajectory, ScenarioResult + +# USD per 1M tokens, rough public list-prices. None when unknown. Used +# only for the optional ``est_cost_usd`` rollup; consumers should treat +# it as an estimate, not a billing source of truth. +_PRICE_PER_1M: dict[str, tuple[float, float]] = { + "claude-opus-4-5": (15.0, 75.0), + "claude-opus-4-1": (15.0, 75.0), + "claude-sonnet-4-6": (3.0, 15.0), + "claude-haiku-4-5": (1.0, 5.0), + "gpt-5": (10.0, 30.0), + "gpt-4.1": (3.0, 12.0), + "gpt-4o": (2.5, 10.0), + "llama-4-maverick": (0.27, 0.85), +} + + +def metrics_from_trajectory(record: PersistedTrajectory) -> OpsMetrics: + """Extract per-task ops metrics from a persisted trajectory record.""" + traj = record.trajectory + if traj is None: + return OpsMetrics() + + if isinstance(traj, dict) and "turns" in traj: + return _from_sdk_trajectory(traj, record.model) + if isinstance(traj, list): + return _from_plan_execute(traj, record.model) + return OpsMetrics() + + +def _from_sdk_trajectory(traj: dict, model: str) -> OpsMetrics: + turns = traj.get("turns", []) or [] + tokens_in = sum(int(t.get("input_tokens") or 0) for t in turns) + tokens_out = sum(int(t.get("output_tokens") or 0) for t in turns) + + durations_ms = [t.get("duration_ms") for t in turns if t.get("duration_ms") is not None] + duration_ms = sum(durations_ms) if durations_ms else None + + tool_names: list[str] = [] + for t in turns: + for 
tc in t.get("tool_calls") or []: + name = tc.get("name") + if name: + tool_names.append(name) + + return OpsMetrics( + turn_count=len(turns), + tool_call_count=len(tool_names), + unique_tools=sorted(set(tool_names)), + tokens_in=tokens_in, + tokens_out=tokens_out, + duration_ms=duration_ms, + est_cost_usd=_estimate_cost(model, tokens_in, tokens_out), + ) + + +def _from_plan_execute(steps: list[Any], model: str) -> OpsMetrics: + # plan-execute persists ``list[StepResult]``; the dataclass exposes + # ``server`` / ``tool`` / ``response`` fields but no per-step token + # counts, so we surface what is available and leave the rest at zero. + tool_names = [ + s.get("tool") + for s in steps + if isinstance(s, dict) and s.get("tool") + ] + return OpsMetrics( + turn_count=len(steps), + tool_call_count=len(tool_names), + unique_tools=sorted(set(tool_names)), + est_cost_usd=_estimate_cost(model, 0, 0), + ) + + +def _estimate_cost(model: str, tokens_in: int, tokens_out: int) -> float | None: + if not model or (tokens_in == 0 and tokens_out == 0): + return None + key = _normalize_model(model) + rate = _PRICE_PER_1M.get(key) + if rate is None: + return None + in_rate, out_rate = rate + return round((tokens_in * in_rate + tokens_out * out_rate) / 1_000_000, 6) + + +def _normalize_model(model: str) -> str: + # Strip provider prefixes like ``litellm_proxy/anthropic/`` and + # version suffixes like ``-20250101``. 
+ tail = model.rsplit("/", 1)[-1].lower() + parts = tail.split("-") + if parts and parts[-1].isdigit() and len(parts[-1]) >= 6: + parts = parts[:-1] + return "-".join(parts) + + +def aggregate_ops(results: list[ScenarioResult]) -> AggregateOps: + if not results: + return AggregateOps() + + durations = [r.ops.duration_ms for r in results if r.ops.duration_ms is not None] + costs = [r.ops.est_cost_usd for r in results if r.ops.est_cost_usd is not None] + + return AggregateOps( + tokens_in_total=sum(r.ops.tokens_in for r in results), + tokens_out_total=sum(r.ops.tokens_out for r in results), + duration_ms_p50=_percentile(durations, 50), + duration_ms_p95=_percentile(durations, 95), + tool_calls_total=sum(r.ops.tool_call_count for r in results), + est_cost_usd_total=round(sum(costs), 6) if costs else None, + ) + + +def _percentile(values: list[float], pct: float) -> float | None: + if not values: + return None + if len(values) == 1: + return float(values[0]) + quantiles = statistics.quantiles(values, n=100, method="inclusive") + return float(quantiles[int(pct) - 1]) diff --git a/src/evaluation/models.py b/src/evaluation/models.py new file mode 100644 index 00000000..846e9891 --- /dev/null +++ b/src/evaluation/models.py @@ -0,0 +1,110 @@ +"""Pydantic models for the offline evaluation pipeline.""" + +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel, ConfigDict, Field + + +class Scenario(BaseModel): + """One evaluation scenario. + + Mirrors the on-disk shape under ``src/scenarios/`` and is permissive + via ``extra='allow'`` so domain-specific fields (e.g. category, + characteristic_form) survive the round-trip. 
+ """ + + model_config = ConfigDict(extra="allow") + + id: str + text: str + type: str = "" + category: str = "" + characteristic_form: str | None = None + expected_answer: str | None = None + grading_method: str | None = None + + @classmethod + def from_raw(cls, raw: dict) -> "Scenario": + d = dict(raw) + if "id" in d: + d["id"] = str(d["id"]) + return cls.model_validate(d) + + +class PersistedTrajectory(BaseModel): + """Record written by ``observability.persistence.persist_trajectory``.""" + + model_config = ConfigDict(extra="allow") + + run_id: str + scenario_id: str | None = None + runner: str + model: str + question: str + answer: str + trajectory: Any = None + + @classmethod + def from_raw(cls, raw: dict) -> "PersistedTrajectory": + d = dict(raw) + if d.get("scenario_id") is not None: + d["scenario_id"] = str(d["scenario_id"]) + return cls.model_validate(d) + + +class OpsMetrics(BaseModel): + """Per-task operational metrics derived from a trajectory.""" + + turn_count: int = 0 + tool_call_count: int = 0 + unique_tools: list[str] = Field(default_factory=list) + tokens_in: int = 0 + tokens_out: int = 0 + duration_ms: float | None = None + est_cost_usd: float | None = None + + +class GradeResult(BaseModel): + grading_method: str + passed: bool + score: float = 0.0 + rationale: str = "" + details: dict[str, Any] = Field(default_factory=dict) + + +class ScenarioResult(BaseModel): + scenario_id: str + scenario_type: str = "" + runner: str + model: str + question: str + answer: str + grade: GradeResult + ops: OpsMetrics + + +class AggregateOps(BaseModel): + tokens_in_total: int = 0 + tokens_out_total: int = 0 + duration_ms_p50: float | None = None + duration_ms_p95: float | None = None + tool_calls_total: int = 0 + est_cost_usd_total: float | None = None + + +class TypeBreakdown(BaseModel): + total: int = 0 + passed: int = 0 + pass_rate: float = 0.0 + + +class EvalReport(BaseModel): + generated_at: str + runners: list[str] = Field(default_factory=list) + models: 
list[str] = Field(default_factory=list) + totals: dict[str, Any] = Field(default_factory=dict) + by_scenario_type: dict[str, TypeBreakdown] = Field(default_factory=dict) + ops: AggregateOps = Field(default_factory=AggregateOps) + results: list[ScenarioResult] = Field(default_factory=list) diff --git a/src/evaluation/report.py b/src/evaluation/report.py new file mode 100644 index 00000000..72ff9b0e --- /dev/null +++ b/src/evaluation/report.py @@ -0,0 +1,87 @@ +"""Build an :class:`EvalReport` from graded scenario results.""" + +from __future__ import annotations + +import datetime as _dt +import json +from collections import defaultdict +from pathlib import Path + +from .metrics import aggregate_ops +from .models import EvalReport, ScenarioResult, TypeBreakdown + + +def build_report(results: list[ScenarioResult]) -> EvalReport: + total = len(results) + passed = sum(1 for r in results if r.grade.passed) + + by_type: dict[str, list[ScenarioResult]] = defaultdict(list) + for r in results: + by_type[r.scenario_type or "unknown"].append(r) + + breakdown: dict[str, TypeBreakdown] = {} + for stype, items in by_type.items(): + n = len(items) + p = sum(1 for r in items if r.grade.passed) + breakdown[stype] = TypeBreakdown( + total=n, + passed=p, + pass_rate=round(p / n, 4) if n else 0.0, + ) + + return EvalReport( + generated_at=_dt.datetime.now(_dt.timezone.utc).isoformat(), + runners=sorted({r.runner for r in results}), + models=sorted({r.model for r in results}), + totals={ + "scenarios": total, + "graded": total, + "passed": passed, + "pass_rate": round(passed / total, 4) if total else 0.0, + }, + by_scenario_type=breakdown, + ops=aggregate_ops(results), + results=results, + ) + + +def write_report(report: EvalReport, output: Path) -> Path: + output = Path(output) + output.parent.mkdir(parents=True, exist_ok=True) + output.write_text(report.model_dump_json(indent=2), encoding="utf-8") + return output + + +def render_summary(report: EvalReport) -> str: + lines: list[str] = 
[] + t = report.totals + lines.append( + f"Scenarios: {t.get('scenarios', 0)} " + f"Passed: {t.get('passed', 0)} " + f"Pass rate: {t.get('pass_rate', 0):.1%}" + ) + if report.by_scenario_type: + lines.append("") + lines.append("By scenario type:") + for stype, b in sorted(report.by_scenario_type.items()): + lines.append( + f" {stype:<16} {b.passed:>4}/{b.total:<4} ({b.pass_rate:.1%})" + ) + o = report.ops + lines.append("") + lines.append("Operational metrics:") + lines.append(f" tokens_in_total: {o.tokens_in_total}") + lines.append(f" tokens_out_total: {o.tokens_out_total}") + lines.append(f" tool_calls_total: {o.tool_calls_total}") + if o.duration_ms_p50 is not None: + lines.append(f" duration_ms_p50: {o.duration_ms_p50:.1f}") + if o.duration_ms_p95 is not None: + lines.append(f" duration_ms_p95: {o.duration_ms_p95:.1f}") + if o.est_cost_usd_total is not None: + lines.append(f" est_cost_usd: ${o.est_cost_usd_total:.4f}") + return "\n".join(lines) + + +def report_to_json(report: EvalReport) -> str: + """Convenience JSON dump that round-trips through pydantic.""" + return json.dumps(json.loads(report.model_dump_json()), indent=2) diff --git a/src/evaluation/runner.py b/src/evaluation/runner.py new file mode 100644 index 00000000..f87da2fa --- /dev/null +++ b/src/evaluation/runner.py @@ -0,0 +1,68 @@ +"""Glue: load → grade → assemble report.""" + +from __future__ import annotations + +import json +import logging +from pathlib import Path + +from . import graders as grader_registry +from .loader import join_records, load_scenarios, load_trajectories +from .metrics import metrics_from_trajectory +from .models import EvalReport, PersistedTrajectory, Scenario, ScenarioResult +from .report import build_report + +_log = logging.getLogger(__name__) + + +def evaluate( + *, + trajectories_path: Path, + scenarios_paths: list[Path], + default_grading_method: str = "llm_judge", +) -> EvalReport: + """Load, grade, and aggregate. 
+ + Per-scenario grader is picked from ``scenario.grading_method`` when + set, falling back to ``default_grading_method``. + """ + scenarios = load_scenarios(scenarios_paths) + trajectories = load_trajectories(trajectories_path) + + results: list[ScenarioResult] = [] + for scenario, traj in join_records(scenarios, trajectories): + results.append(_grade_one(scenario, traj, default_grading_method)) + + return build_report(results) + + +def _grade_one( + scenario: Scenario, + traj: PersistedTrajectory, + default_grading_method: str, +) -> ScenarioResult: + method = scenario.grading_method or default_grading_method + grader = grader_registry.get(method) + trajectory_text = _trajectory_to_text(traj) + grade = grader(scenario, traj.answer, trajectory_text) + + return ScenarioResult( + scenario_id=scenario.id, + scenario_type=scenario.type, + runner=traj.runner, + model=traj.model, + question=traj.question, + answer=traj.answer, + grade=grade, + ops=metrics_from_trajectory(traj), + ) + + +def _trajectory_to_text(traj: PersistedTrajectory) -> str: + """Flatten a trajectory to a text blob for the LLM judge prompt.""" + if traj.trajectory is None: + return "" + try: + return json.dumps(traj.trajectory, indent=2, default=str) + except (TypeError, ValueError): + return str(traj.trajectory) diff --git a/src/evaluation/tests/__init__.py b/src/evaluation/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/evaluation/tests/conftest.py b/src/evaluation/tests/conftest.py new file mode 100644 index 00000000..65eedf7d --- /dev/null +++ b/src/evaluation/tests/conftest.py @@ -0,0 +1,72 @@ +"""Shared fixtures for evaluation unit tests.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from evaluation.models import Scenario + + +@pytest.fixture +def make_scenario(): + def _factory(**overrides) -> Scenario: + defaults = { + "id": "1", + "text": "What sensors are on Chiller 6?", + "type": "iot", + "category": 
"Knowledge Query", + "characteristic_form": "Should list temperature, pressure, vibration sensors.", + } + defaults.update(overrides) + return Scenario.from_raw(defaults) + + return _factory + + +@pytest.fixture +def make_persisted_record(): + def _factory(**overrides) -> dict: + defaults = { + "run_id": "run-1", + "scenario_id": "1", + "runner": "plan-execute", + "model": "watsonx/ibm/granite", + "question": "Q?", + "answer": "A.", + "trajectory": { + "turns": [ + { + "index": 0, + "text": "thinking", + "tool_calls": [{"name": "sites", "input": {}}], + "input_tokens": 10, + "output_tokens": 5, + "duration_ms": 100.0, + }, + { + "index": 1, + "text": "answer", + "tool_calls": [], + "input_tokens": 12, + "output_tokens": 7, + "duration_ms": 200.0, + }, + ], + "started_at": "2026-04-27T00:00:00Z", + }, + } + defaults.update(overrides) + return defaults + + return _factory + + +@pytest.fixture +def trajectory_dir(tmp_path: Path, make_persisted_record): + """A directory pre-populated with one trajectory JSON file.""" + rec = make_persisted_record() + (tmp_path / f"{rec['run_id']}.json").write_text(json.dumps(rec), encoding="utf-8") + return tmp_path diff --git a/src/evaluation/tests/test_graders.py b/src/evaluation/tests/test_graders.py new file mode 100644 index 00000000..9002e5f9 --- /dev/null +++ b/src/evaluation/tests/test_graders.py @@ -0,0 +1,120 @@ +"""Tests for deterministic + LLM-judge graders.""" + +from __future__ import annotations + +from evaluation import graders as registry +from evaluation.graders.deterministic import exact_string_match, numeric_match +from evaluation.graders.llm_judge import LLMJudgeGrader, install +from llm import LLMBackend + + +class _StubLLM(LLMBackend): + def __init__(self, response: str) -> None: + self._response = response + + def generate(self, prompt: str, temperature: float = 0.0) -> str: + return self._response + + +class TestExactStringMatch: + def test_match_case_insensitive(self, make_scenario): + s = 
make_scenario(expected_answer="Hello World") + r = exact_string_match(s, "hello world", "") + assert r.passed and r.score == 1.0 + + def test_mismatch(self, make_scenario): + s = make_scenario(expected_answer="foo") + r = exact_string_match(s, "bar", "") + assert not r.passed + assert r.details["expected"] == "foo" + + def test_missing_expected(self, make_scenario): + s = make_scenario(expected_answer=None) + r = exact_string_match(s, "anything", "") + assert not r.passed + assert "expected_answer" in r.rationale + + +class TestNumericMatch: + def test_within_tolerance(self, make_scenario): + s = make_scenario(expected_answer="3.14159") + r = numeric_match(s, "3.141591", "") + assert r.passed + + def test_unparseable(self, make_scenario): + s = make_scenario(expected_answer="3.14") + r = numeric_match(s, "not a number", "") + assert not r.passed + assert "could not parse" in r.rationale + + def test_custom_tolerance(self, make_scenario): + s = make_scenario(expected_answer="100", tolerance=0.05) + r = numeric_match(s, "104", "") + assert r.passed + + +class TestRegistry: + def test_deterministic_graders_registered(self): + assert "exact_string_match" in registry.names() + assert "numeric_match" in registry.names() + + def test_get_unknown_raises(self): + try: + registry.get("does_not_exist") + except KeyError as e: + assert "does_not_exist" in str(e) + else: + raise AssertionError("expected KeyError") + + +class TestLLMJudgeGrader: + def _all_pass_response(self) -> str: + return ( + '{"task_completion": true, "data_retrieval_accuracy": true, ' + '"generalized_result_verification": true, "agent_sequence_correct": true, ' + '"clarity_and_justification": true, "hallucinations": false, ' + '"reason": "Looks good."}' + ) + + def test_passes_when_all_criteria_true(self, make_scenario): + grader = LLMJudgeGrader(_StubLLM(self._all_pass_response())) + r = grader(make_scenario(), "answer", "trajectory") + assert r.passed + assert r.score == 1.0 + assert r.rationale == 
"Looks good." + + def test_fails_on_hallucination(self, make_scenario): + resp = self._all_pass_response().replace( + '"hallucinations": false', '"hallucinations": true' + ) + grader = LLMJudgeGrader(_StubLLM(resp)) + r = grader(make_scenario(), "answer", "trajectory") + assert not r.passed + # Score is penalized but not zeroed when 5/5 criteria pass. + assert r.score < 1.0 + + def test_handles_unparseable_response(self, make_scenario): + grader = LLMJudgeGrader(_StubLLM("not json at all")) + r = grader(make_scenario(), "a", "t") + assert not r.passed + assert "unparseable" in r.rationale + + def test_handles_markdown_fenced_response(self, make_scenario): + wrapped = "Here you go:\n```json\n" + self._all_pass_response() + "\n```" + grader = LLMJudgeGrader(_StubLLM(wrapped)) + r = grader(make_scenario(), "a", "t") + assert r.passed + + def test_missing_characteristic_short_circuits(self, make_scenario): + grader = LLMJudgeGrader(_StubLLM(self._all_pass_response())) + s = make_scenario(characteristic_form=None, expected_answer=None) + r = grader(s, "a", "t") + assert not r.passed + assert "characteristic_form" in r.rationale + + def test_install_registers_under_default_name(self, make_scenario): + install(_StubLLM(self._all_pass_response())) + assert "llm_judge" in registry.names() + grader = registry.get("llm_judge") + r = grader(make_scenario(), "a", "t") + assert r.passed diff --git a/src/evaluation/tests/test_loader.py b/src/evaluation/tests/test_loader.py new file mode 100644 index 00000000..24260b34 --- /dev/null +++ b/src/evaluation/tests/test_loader.py @@ -0,0 +1,72 @@ +"""Tests for the trajectory + scenario loader.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from evaluation.loader import ( + join_records, + load_scenarios, + load_trajectories, +) +from evaluation.models import Scenario + + +def test_load_trajectories_from_dir(trajectory_dir: Path): + records = load_trajectories(trajectory_dir) + assert len(records) == 
1 + assert records[0].run_id == "run-1" + assert records[0].scenario_id == "1" + + +def test_load_trajectories_skips_unparseable(tmp_path: Path, make_persisted_record): + (tmp_path / "good.json").write_text(json.dumps(make_persisted_record()), encoding="utf-8") + (tmp_path / "bad.json").write_text("{not json", encoding="utf-8") + records = load_trajectories(tmp_path) + assert len(records) == 1 + + +def test_load_scenarios_json_list(tmp_path: Path): + p = tmp_path / "s.json" + p.write_text( + json.dumps( + [{"id": 1, "text": "Q1"}, {"id": "2", "text": "Q2"}] + ), + encoding="utf-8", + ) + out = load_scenarios(p) + assert [s.id for s in out] == ["1", "2"] + + +def test_load_scenarios_jsonl(tmp_path: Path): + p = tmp_path / "s.jsonl" + p.write_text( + '{"id": 1, "text": "Q1"}\n{"id": 2, "text": "Q2"}\n', + encoding="utf-8", + ) + out = load_scenarios(p) + assert [s.id for s in out] == ["1", "2"] + + +def test_load_scenarios_single_object(tmp_path: Path): + p = tmp_path / "s.json" + p.write_text(json.dumps({"id": 7, "text": "Q"}), encoding="utf-8") + out = load_scenarios(p) + assert [s.id for s in out] == ["7"] + + +def test_join_drops_orphans(make_persisted_record): + from evaluation.models import PersistedTrajectory + + scenarios = [ + Scenario.from_raw({"id": 1, "text": "Q1"}), + Scenario.from_raw({"id": 2, "text": "Q2"}), + ] + trajs = [ + PersistedTrajectory.from_raw(make_persisted_record(scenario_id=1)), + PersistedTrajectory.from_raw(make_persisted_record(run_id="r2", scenario_id=99)), + ] + pairs = list(join_records(scenarios, trajs)) + assert len(pairs) == 1 + assert pairs[0][0].id == "1" diff --git a/src/evaluation/tests/test_metrics.py b/src/evaluation/tests/test_metrics.py new file mode 100644 index 00000000..80cdef62 --- /dev/null +++ b/src/evaluation/tests/test_metrics.py @@ -0,0 +1,101 @@ +"""Tests for ops metrics extraction and aggregation.""" + +from __future__ import annotations + +from evaluation.metrics import ( + _normalize_model, + aggregate_ops, 
+ metrics_from_trajectory, +) +from evaluation.models import ( + GradeResult, + OpsMetrics, + PersistedTrajectory, + ScenarioResult, +) + + +def _result(passed: bool = True, ops: OpsMetrics | None = None) -> ScenarioResult: + return ScenarioResult( + scenario_id="1", + scenario_type="iot", + runner="plan-execute", + model="watsonx/ibm/granite", + question="q", + answer="a", + grade=GradeResult(grading_method="exact_string_match", passed=passed), + ops=ops or OpsMetrics(), + ) + + +class TestMetricsFromTrajectory: + def test_sdk_trajectory_sums_per_turn(self, make_persisted_record): + rec = PersistedTrajectory.from_raw(make_persisted_record()) + m = metrics_from_trajectory(rec) + assert m.turn_count == 2 + assert m.tokens_in == 22 + assert m.tokens_out == 12 + assert m.tool_call_count == 1 + assert m.unique_tools == ["sites"] + assert m.duration_ms == 300.0 + + def test_handles_none_trajectory(self, make_persisted_record): + rec = PersistedTrajectory.from_raw(make_persisted_record(trajectory=None)) + assert metrics_from_trajectory(rec) == OpsMetrics() + + def test_plan_execute_list_trajectory(self, make_persisted_record): + rec = PersistedTrajectory.from_raw( + make_persisted_record( + trajectory=[ + {"step_number": 1, "task": "t", "server": "iot", "tool": "sites", "response": "ok"}, + {"step_number": 2, "task": "t2", "server": "iot", "tool": "assets", "response": "ok"}, + {"step_number": 3, "task": "t3", "server": "iot", "tool": "sites", "response": "ok"}, + ] + ) + ) + m = metrics_from_trajectory(rec) + assert m.turn_count == 3 + assert m.tool_call_count == 3 + assert m.unique_tools == ["assets", "sites"] + + +class TestAggregateOps: + def test_empty(self): + agg = aggregate_ops([]) + assert agg.tokens_in_total == 0 + assert agg.duration_ms_p50 is None + + def test_sums_and_percentiles(self): + results = [ + _result(ops=OpsMetrics(tokens_in=10, tokens_out=5, duration_ms=100.0, tool_call_count=1)), + _result(ops=OpsMetrics(tokens_in=20, tokens_out=10, 
duration_ms=300.0, tool_call_count=2)), + _result(ops=OpsMetrics(tokens_in=30, tokens_out=15, duration_ms=500.0, tool_call_count=3)), + ] + agg = aggregate_ops(results) + assert agg.tokens_in_total == 60 + assert agg.tokens_out_total == 30 + assert agg.tool_calls_total == 6 + assert agg.duration_ms_p50 is not None + assert agg.duration_ms_p95 is not None + assert agg.duration_ms_p50 <= agg.duration_ms_p95 + + def test_cost_only_when_some_present(self): + results = [ + _result(ops=OpsMetrics(est_cost_usd=0.01)), + _result(ops=OpsMetrics(est_cost_usd=0.02)), + ] + agg = aggregate_ops(results) + assert agg.est_cost_usd_total == 0.03 + + +class TestNormalizeModel: + def test_strips_provider_prefix(self): + assert _normalize_model("litellm_proxy/anthropic/claude-opus-4-5") == "claude-opus-4-5" + assert _normalize_model("watsonx/ibm/granite-13b") == "granite-13b" + + def test_strips_long_numeric_suffix(self): + assert _normalize_model("claude-opus-4-5-20250101") == "claude-opus-4-5" + + def test_keeps_short_numeric_suffix(self): + # "4-5" suffix is the model version, not a date — leave it intact. 
+ assert _normalize_model("claude-opus-4-5") == "claude-opus-4-5" diff --git a/src/evaluation/tests/test_models.py b/src/evaluation/tests/test_models.py new file mode 100644 index 00000000..4aca4d55 --- /dev/null +++ b/src/evaluation/tests/test_models.py @@ -0,0 +1,45 @@ +"""Tests for evaluation Pydantic models.""" + +from evaluation.models import PersistedTrajectory, Scenario + + +def test_scenario_from_raw_coerces_int_id_to_str(): + s = Scenario.from_raw({"id": 301, "text": "Q"}) + assert s.id == "301" + assert isinstance(s.id, str) + + +def test_scenario_preserves_extra_fields(): + s = Scenario.from_raw({"id": "1", "text": "Q", "characteristic_form": "X", "tolerance": 0.01}) + extra = s.model_extra or {} + assert extra.get("tolerance") == 0.01 + + +def test_persisted_trajectory_coerces_scenario_id(): + t = PersistedTrajectory.from_raw( + { + "run_id": "r", + "scenario_id": 42, + "runner": "plan-execute", + "model": "m", + "question": "q", + "answer": "a", + "trajectory": None, + } + ) + assert t.scenario_id == "42" + + +def test_persisted_trajectory_allows_none_scenario_id(): + t = PersistedTrajectory.from_raw( + { + "run_id": "r", + "scenario_id": None, + "runner": "plan-execute", + "model": "m", + "question": "q", + "answer": "a", + "trajectory": None, + } + ) + assert t.scenario_id is None diff --git a/src/evaluation/tests/test_report.py b/src/evaluation/tests/test_report.py new file mode 100644 index 00000000..14816832 --- /dev/null +++ b/src/evaluation/tests/test_report.py @@ -0,0 +1,74 @@ +"""Tests for EvalReport assembly and serialization.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from evaluation.models import ( + GradeResult, + OpsMetrics, + ScenarioResult, +) +from evaluation.report import build_report, render_summary, write_report + + +def _result(stype: str, passed: bool, **ops_kwargs) -> ScenarioResult: + return ScenarioResult( + scenario_id="x", + scenario_type=stype, + runner="plan-execute", + 
model="watsonx/ibm/granite", + question="q", + answer="a", + grade=GradeResult(grading_method="llm_judge", passed=passed, score=1.0 if passed else 0.0), + ops=OpsMetrics(**ops_kwargs), + ) + + +def test_build_report_totals_and_breakdown(): + results = [ + _result("iot", True, tokens_in=10, tokens_out=5), + _result("iot", False, tokens_in=8, tokens_out=4), + _result("tsfm", True, tokens_in=20, tokens_out=10), + ] + report = build_report(results) + + assert report.totals == { + "scenarios": 3, + "graded": 3, + "passed": 2, + "pass_rate": round(2 / 3, 4), + } + assert report.by_scenario_type["iot"].total == 2 + assert report.by_scenario_type["iot"].passed == 1 + assert report.by_scenario_type["tsfm"].pass_rate == 1.0 + assert report.ops.tokens_in_total == 38 + + +def test_build_report_handles_empty(): + report = build_report([]) + assert report.totals["scenarios"] == 0 + assert report.totals["pass_rate"] == 0.0 + assert report.by_scenario_type == {} + + +def test_write_report_round_trips(tmp_path: Path): + results = [_result("iot", True)] + report = build_report(results) + out = write_report(report, tmp_path / "nested" / "report.json") + assert out.exists() + data = json.loads(out.read_text(encoding="utf-8")) + assert data["totals"]["passed"] == 1 + assert data["by_scenario_type"]["iot"]["pass_rate"] == 1.0 + + +def test_render_summary_includes_headlines(): + results = [ + _result("iot", True, tokens_in=10, tokens_out=5, duration_ms=100.0, tool_call_count=1), + _result("iot", False, tokens_in=8, tokens_out=4, duration_ms=200.0), + ] + text = render_summary(build_report(results)) + assert "Pass rate" in text + assert "iot" in text + assert "tokens_in_total" in text diff --git a/src/evaluation/tests/test_runner.py b/src/evaluation/tests/test_runner.py new file mode 100644 index 00000000..ffab1688 --- /dev/null +++ b/src/evaluation/tests/test_runner.py @@ -0,0 +1,76 @@ +"""Smoke test for the end-to-end evaluation runner.""" + +from __future__ import annotations + +import 
json +from pathlib import Path + +from evaluation.models import GradeResult, Scenario +from evaluation.runner import evaluate +from evaluation import graders as registry + + +def _always_pass_grader(scenario: Scenario, answer: str, trajectory_text: str) -> GradeResult: + return GradeResult(grading_method="stub", passed=True, score=1.0) + + +def test_evaluate_end_to_end(tmp_path: Path, make_persisted_record): + # Two trajectories, both joinable to scenarios. + rec_a = make_persisted_record(run_id="run-a", scenario_id=1, answer="A") + rec_b = make_persisted_record(run_id="run-b", scenario_id=2, answer="B") + (tmp_path / "run-a.json").write_text(json.dumps(rec_a), encoding="utf-8") + (tmp_path / "run-b.json").write_text(json.dumps(rec_b), encoding="utf-8") + + scenarios_path = tmp_path / "scenarios.json" + scenarios_path.write_text( + json.dumps( + [ + {"id": 1, "text": "Q1", "type": "iot"}, + {"id": 2, "text": "Q2", "type": "tsfm"}, + ] + ), + encoding="utf-8", + ) + + registry.register("stub", _always_pass_grader) + + report = evaluate( + trajectories_path=tmp_path, + scenarios_paths=[scenarios_path], + default_grading_method="stub", + ) + + assert report.totals["scenarios"] == 2 + assert report.totals["passed"] == 2 + assert set(report.by_scenario_type.keys()) == {"iot", "tsfm"} + assert report.ops.tokens_in_total > 0 + + +def test_evaluate_uses_per_scenario_grading_method(tmp_path: Path, make_persisted_record): + rec = make_persisted_record(run_id="run-x", scenario_id=1) + (tmp_path / "run-x.json").write_text(json.dumps(rec), encoding="utf-8") + + scenarios_path = tmp_path / "scenarios.json" + scenarios_path.write_text( + json.dumps( + [ + { + "id": 1, + "text": "Q", + "type": "iot", + "expected_answer": "A.", + "grading_method": "exact_string_match", + } + ] + ), + encoding="utf-8", + ) + + report = evaluate( + trajectories_path=tmp_path, + scenarios_paths=[scenarios_path], + default_grading_method="numeric_match", # would fail; per-scenario override wins + ) + + 
assert report.totals["passed"] == 1 + assert report.results[0].grade.grading_method == "exact_string_match"