From 1e824bff442ca540b48d608d27f1ed8ab04ac87e Mon Sep 17 00:00:00 2001 From: EgonBot Date: Wed, 25 Feb 2026 15:48:47 +0000 Subject: [PATCH 1/5] docs: add proposal triage summary --- docs/proposals/1-triage.md | 74 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 docs/proposals/1-triage.md diff --git a/docs/proposals/1-triage.md b/docs/proposals/1-triage.md new file mode 100644 index 00000000..c74c3188 --- /dev/null +++ b/docs/proposals/1-triage.md @@ -0,0 +1,74 @@ +--- +title: PlanExe Proposal Triage — 80/20 Landscape +date: 2026-02-25 +status: working note +author: Egon + Larry +--- + +# Overview + +Simon asked us to triage the proposal space with an 80:20 lens. The goal of this note is to capture: +1. Which proposals deliver outsized value (the 20% that unlock 80% of the architecture) +2. Which other proposals are nearby in the graph and could reuse their artifacts or reasoning +3. High-leverage parameter tweaks, code tweaks, and second/third order effects +4. Gaps in the current docs and ideas for new proposals +5. Relevant questions/tasks you might not have asked yet + +We focused on the most recent proposals ("67+" cluster) plus the ones directly touching the validation/orchestration story that FermiSanityCheck will unlock. + +# High-Leverage Proposals (the 20%) + +1. **#07 Elo Ranking System (1,751 lines)** – Core ranking mechanism for comparing idea variants, plan quality, and post-plan summaries. Louis-level heuristics here inform nearly every downstream comparison use case. +2. **#63 Luigi Agent Integration & #64 Post-plan Orchestration Layer** – These three documents (#63, #64, #66) describe how PlanExe schedules, retries, and enriches its Luigi DAG. Any change to the DAG (including FermiSanityCheck or arcgentica-style loops) ripples through this cluster. +3. **#62 Agent-first Frontend Discoverability (609 lines)** – Defines the agent UX, which depends on the scoring/ranking engine (#07) and the reliability signals that our validation cluster will provide. +4. **#69 Arcgentica Agent Patterns (279 lines)** – The arcgentica comparison is already referencing our validation work and sets the guardrails for self-evaluation/soft-autonomy. +5. **#41 Autonomous Execution of Plan & #05 Semantic Plan Search Graph** – These represent core system-level capabilities (distributed execution and semantic search) whose outputs feed the ranking and reporting layers. + +These documents together unlock most of the architectural work. They interlock around: planning quality signals (#07, #69, Fermi), orchestration (#63, #64, #66), and the interfaces (#62, #41, #05). + +# Related Proposals & Reuse Opportunities + +- **#07 Elo Ranking + #62 Agent-first Frontend** can share heuristics. Instead of reinventing ranking weights in #62, reuse the cost/feasibility tradeoffs defined in #07 plus FermiSanityCheck flags as features. +- **#63-66 orchestration cluster** already describe Luigi tasks. The validation loop doc should be cross-referenced there to show where FermiSanityCheck sits in the DAG and how downstream tasks like WBS, Scheduler, ExpertOrchestrator should consume the validation report. +- **#69 + #56 (Adversarial Red Team) + #43 (Assumption Drift Monitor)** form a validation cluster. FermiSanityCheck is the front line; these others are observers (red team, drift monitor) that should consume the validation report and escalate to human review. +- **#32 Gantt Parallelization & #33 CBS** could re-use the same thresholds as FermiSanityCheck when calculating duration plausibility (e.g., if duration falls outside the published feasible range, highlight the same issue in the Gantt UI). + +# 80:20 Tweaks & Parameter Changes + +- **Ranking weights (#07)** – adjust cost vs. feasibility vs. confidence to surface plans that pass quantitative grounding. No rewrite needed; just new weights (e.g., penalize plans where FermiSanityCheck flags >3 assumptions). +- **Batch size thresholds (#63)** – the Luigi DAG currently runs every task. We can gate the WBS tasks with a flag that only fires if FermiSanityCheck passes or fails softly, enabling a smaller workflow for low-risk inputs without re-architecting. +- **Risk terminology alignment (#38 & #44)** – harmonize the words used in the risk propagation network and investor audit pack so they can share visualization tooling, reducing duplicate explanations. + +# Second/Third Order Effects + +- **Validation loop → downstream trust**: Once FermiSanityCheck is in place, client reports (e.g., #60 plan-to-repo, #41 autonomous execution) can annotate numbers with the validation status, reducing rework. +- **Arcgentica/agent patterns**: Hardening PlanExe encourages stricter typed outputs (#69). This lets the UI (#08) and ranking engine (#07) rely on structured data instead of parsing Markdown. +- **Quantitative grounding improves ranking** (#07, #62) which in turn makes downstream dashboards (#60, #62) more actionable and reduces QA overhead. +- **Clustering proposals** (#63-66, #69, #56) around validation/orchestration helps the next human reviewer (Simon) make a single decision that affects multiple docs. + +# Gaps & Future Proposal Ideas + +- **FermiSanityCheck Implementation Roadmap** – Document how MakeAssumptions output becomes QuantifiedAssumption, where heuristics live, and how Luigi tasks consume the validation_report. (We have the spec in `planexe-validation-loop-spec.md` but not a public proposal yet.) +- **Validation Observability Dashboard** – A proposal capturing how the validation report is surfaced to humans (per #44, #60). Could cover alerts (Slack/Discord) when FermiSanityCheck fails or when repeated fails accumulate. +- **Arbitration Workflow** – When FermiSanityCheck fails and ReviewPlan still thinks the plan is OK, we need a human-in-the-loop workflow. This is not yet documented anywhere. + +# Questions You Might Not Be Asking + +1. What are the acceptance criteria for FermiSanityCheck? (confidence levels, heuristics, why 100× spans?) +2. Who owns the validation report downstream? Should ExpertOrchestrator or Governance phases be responsible for acting on it? +3. Does FermiSanityCheck expire per run or is it stored for audit trails (per #42 evidence traceability)? +4. Can we reuse the same heuristics for other tasks (#32 Gantt, #34 finance) to maximize payoff? +5. How do we rank the outputs once FermiSanityCheck is added? Should ranking (#07) penalize low confidence even if the costs look good? +6. Do we need a battle plan for manual overrides when FermiSanityCheck is overzealous (e.g., ROI assumptions where domain experts know the average is >100×)? + +# Tasks We Can Own Now + +- Extract the QuantifiedAssumption schema (claim, lower_bound, upper_bound, unit, confidence, evidence) and add it to PlanExe’s assumption bundle. +- Implement a FermiSanityCheck Luigi task that runs immediately after MakeAssumptions and produces validation_report.json. +- Hook the validation report into DistillAssumptions / ReviewAssumptions by adding a `validation_passed` flag. +- Update #69 and #56 docs with references to the validation report to keep the narrative cohesive. +- Create the proposed dashboard proposal (validation observability) to track how many plans fail numeric sanity each week. + +# Summary + +The high-leverage 20% of proposals are: ranking (#07), orchestration (#63-66), UI (#62), arcgentica patterns (#69), and autonomous execution/search (#41, #05). We can activate them by implementing FermiSanityCheck, aligning their heuristics, and surfacing the new validation signals in the UI/dashboards. The docs already cover most of the research; now we need a short, focused proposal/clustering doc (this one) plus the Fermi implementation and dashboards. After Simon approves, we can execute the chosen cluster. From d5a9bdd22f54c4358144f3a115030c07a7bfb87d Mon Sep 17 00:00:00 2001 From: EgonBot Date: Wed, 25 Feb 2026 16:09:54 +0000 Subject: [PATCH 2/5] feat: add quantified assumption extractor --- .../assume/quantified_assumptions.py | 194 ++++++++++++++++++ .../assume/test_quantified_assumptions.py | 45 ++++ 2 files changed, 239 insertions(+) create mode 100644 worker_plan/worker_plan_internal/assume/quantified_assumptions.py create mode 100644 worker_plan/worker_plan_internal/assume/test_quantified_assumptions.py diff --git a/worker_plan/worker_plan_internal/assume/quantified_assumptions.py b/worker_plan/worker_plan_internal/assume/quantified_assumptions.py new file mode 100644 index 00000000..ba8e7587 --- /dev/null +++ b/worker_plan/worker_plan_internal/assume/quantified_assumptions.py @@ -0,0 +1,194 @@ +"""Structured helpers for extracting numerical assumptions from MakeAssumptions outputs.""" +from __future__ import annotations + +import json +import logging +import re +from dataclasses import dataclass +from enum import Enum +from typing import List, Optional, Sequence + +from pydantic import BaseModel, Field + +from worker_plan_internal.assume.make_assumptions import MakeAssumptions + +LOGGER = logging.getLogger(__name__) + +RANGE_PATTERN = re.compile( + r"(?P-?\d+(?:[\.,]\d+)?)(?:\s*(?:-|–|—|to|and)\s*(?P-?\d+(?:[\.,]\d+)?))?", + re.IGNORECASE, +) +NUMBER_PATTERN = re.compile(r"-?\d+(?:[\.,]\d+)?") +UNIT_WORD_PATTERN = re.compile(r"\b([A-Za-z%°µΩ]+)\b") + +LOW_CONFIDENCE_WORDS = { + "estimate", + "approx", + "approximately", + "around", + "roughly", + "maybe", + "could", + "likely", + "tends", + "suggest", +} +HIGH_CONFIDENCE_WORDS = { + "will", + "must", + "guarantee", + "ensure", + "ensures", + "ensuring", + "required", + "definitely", + "strongly", + "committed", +} + +ASSUMPTION_PREFIX = "Assumption:" + + +class ConfidenceLevel(str, Enum): + high = "high" + medium = "medium" + low = "low" + + +class QuantifiedAssumption(BaseModel): + assumption_id: str = Field(description="Unique identifier for this assumption") + question: str = Field(description="Source question that elicited the assumption") + claim: str = Field(description="Normalized assumption text without the 'Assumption:' label") + lower_bound: Optional[float] = Field(None, description="Lower bound extracted from the claim") + upper_bound: Optional[float] = Field(None, description="Upper bound extracted from the claim") + unit: Optional[str] = Field(None, description="Unit associated with the bounds") + confidence: ConfidenceLevel = Field( + default=ConfidenceLevel.medium, + description="Estimated confidence level for this claim", + ) + evidence: str = Field(description="Evidence excerpt or justification for the numeric claim") + extracted_numbers: List[float] = Field(default_factory=list) + raw_assumption: str = Field(description="Original assumption text from MakeAssumptions") + + class Config: + allow_mutation = False + frozen = True + + @property + def span_ratio(self) -> Optional[float]: + if self.lower_bound is None or self.upper_bound is None: + return None + if self.lower_bound <= 0: + return None + ratio = self.upper_bound / self.lower_bound + LOGGER.debug("Computed span_ratio=%.2f for %s", ratio, self.assumption_id) + return ratio + + +@dataclass +class QuantifiedAssumptionSummary: + assumptions: List[QuantifiedAssumption] + + @property + def average_span(self) -> Optional[float]: + spans = [assumption.span_ratio for assumption in self.assumptions if assumption.span_ratio is not None] + if not spans: + return None + return sum(spans) / len(spans) + + +class QuantifiedAssumptionExtractor: + """Extract structured numeric assumptions from MakeAssumptions outputs.""" + + def extract(self, assumption_entries: Sequence[dict]) -> List[QuantifiedAssumption]: + results: list[QuantifiedAssumption] = [] + for idx, entry in enumerate(assumption_entries, start=1): + question = (entry.get("question") or "").strip() + raw_assumption = (entry.get("assumptions") or "").strip() + if not raw_assumption: + LOGGER.debug("Skipping empty assumption entry at index %s", idx) + continue + claim = self._normalize_claim(raw_assumption) + lower, upper, unit = self._parse_bounds(claim) + extracted = self._extract_numbers(claim) + confidence = self._guess_confidence(claim) + assumption_id = entry.get("assumption_id") or f"assumption-{idx}" + results.append( + QuantifiedAssumption( + assumption_id=assumption_id, + question=question, + claim=claim, + lower_bound=lower, + upper_bound=upper, + unit=unit, + confidence=confidence, + evidence=claim, + extracted_numbers=extracted, + raw_assumption=raw_assumption, + ) + ) + return results + + def extract_from_make_assumptions(self, result: MakeAssumptions) -> List[QuantifiedAssumption]: + return self.extract(result.assumptions) + + def _guess_confidence(self, claim: str) -> ConfidenceLevel: + lowered = claim.lower() + if any(word in lowered for word in LOW_CONFIDENCE_WORDS): + return ConfidenceLevel.low + if any(word in lowered for word in HIGH_CONFIDENCE_WORDS): + return ConfidenceLevel.high + return ConfidenceLevel.medium + + def _normalize_claim(self, raw_assumption: str) -> str: + trimmed = raw_assumption.strip() + if trimmed.lower().startswith(ASSUMPTION_PREFIX.lower()): + trimmed = trimmed[len(ASSUMPTION_PREFIX) :].strip() + trimmed = re.sub(r"^[\-:]+", "", trimmed).strip() + trimmed = re.sub(r"\s{2,}", " ", trimmed) + return trimmed + + def _parse_bounds(self, claim: str) -> tuple[Optional[float], Optional[float], Optional[str]]: + sanitized = claim.replace("—", "-").replace("–", "-") + match = RANGE_PATTERN.search(sanitized) + if not match: + return None, None, self._extract_unit(claim) + lower = self._coerce_number(match.group("low")) + upper = self._coerce_number(match.group("high")) if match.group("high") else lower + unit = self._extract_unit(claim, match.end()) + return lower, upper, unit + + def _extract_unit(self, claim: str, position: Optional[int] = None) -> Optional[str]: + target = claim + if position is not None: + target = claim[position : position + 20] + match = UNIT_WORD_PATTERN.search(target) + if match: + return match.group(1).lower() + return None + + def _extract_numbers(self, claim: str) -> List[float]: + numbers: List[float] = [] + for value in NUMBER_PATTERN.findall(claim): + coerced = self._coerce_number(value) + if coerced is not None: + numbers.append(coerced) + return numbers + + def _coerce_number(self, value: Optional[str]) -> Optional[float]: + if value is None: + return None + cleaned = value.replace(",", "").strip() + try: + return float(cleaned) + except ValueError: + LOGGER.debug("Failed to coerce %s to float", value) + return None + + +if __name__ == "__main__": + extractor = QuantifiedAssumptionExtractor() + with open("worker_plan/worker_plan_internal/assume/test_data/assumptions_solar_farm_in_denmark.json", "r", encoding="utf-8") as fh: + entries = json.load(fh) + for assumption in extractor.extract(entries): + print(assumption.json(indent=2)) diff --git a/worker_plan/worker_plan_internal/assume/test_quantified_assumptions.py b/worker_plan/worker_plan_internal/assume/test_quantified_assumptions.py new file mode 100644 index 00000000..05bb19e9 --- /dev/null +++ b/worker_plan/worker_plan_internal/assume/test_quantified_assumptions.py @@ -0,0 +1,45 @@ +from worker_plan_internal.assume.quantified_assumptions import ( + ConfidenceLevel, + QuantifiedAssumptionExtractor, +) + + +def test_extract_range_and_unit(): + extractor = QuantifiedAssumptionExtractor() + entries = [ + { + "question": "What capacity?", + "assumptions": "Assumption: The solar farm will deliver 50-60 MW of capacity before year two.", + } + ] + assumption = extractor.extract(entries)[0] + assert assumption.lower_bound == 50.0 + assert assumption.upper_bound == 60.0 + assert assumption.unit == "mw" + assert assumption.extracted_numbers == [50.0, 60.0] + + +def test_confidence_detection_handles_low_words(): + extractor = QuantifiedAssumptionExtractor() + entries = [ + { + "question": "Timeline", + "assumptions": "Assumption: We expect roughly 8 months of construction, though delays are possible.", + } + ] + assumption = extractor.extract(entries)[0] + assert assumption.confidence == ConfidenceLevel.low + + +def test_extract_handles_missing_numbers(): + extractor = QuantifiedAssumptionExtractor() + entries = [ + { + "question": "Safety", + "assumptions": "Assumption: Construction will follow all standards, no explicit numbers provided.", + } + ] + assumption = extractor.extract(entries)[0] + assert assumption.lower_bound is None + assert assumption.upper_bound is None + assert assumption.extracted_numbers == [] From 9cc1e9180665b65a84e3b0323db6139c284ce4a6 Mon Sep 17 00:00:00 2001 From: EgonBot Date: Wed, 25 Feb 2026 16:10:31 +0000 Subject: [PATCH 3/5] docs: add quantified assumption schema reference --- .../assume/quantified_assumption_schema.md | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 worker_plan/worker_plan_internal/assume/quantified_assumption_schema.md diff --git a/worker_plan/worker_plan_internal/assume/quantified_assumption_schema.md b/worker_plan/worker_plan_internal/assume/quantified_assumption_schema.md new file mode 100644 index 00000000..974317ef --- /dev/null +++ b/worker_plan/worker_plan_internal/assume/quantified_assumption_schema.md @@ -0,0 +1,39 @@ +# QuantifiedAssumption Schema Reference + +| Field | Type | Description | +| --- | --- | --- | +| `assumption_id` | `str` | Unique stable identifier for the assumption (use `assumption-` when not provided). | +| `question` | `str` | The source question that prompted the assumption. | +| `claim` | `str` | Normalized assumption text with the `Assumption:` prefix removed. | +| `lower_bound` | `float?` | Parsed lower numeric bound (if present). | +| `upper_bound` | `float?` | Parsed upper numeric bound (mirror of lower_bound when none explicitly provided). | +| `unit` | `str?` | Detected unit token (e.g., `mw`, `days`, `usd`, `%`). | +| `confidence` | `ConfidenceLevel` (`high` / `medium` / `low`) | Estimated confidence level inferred from hedging words. | +| `evidence` | `str` | Text excerpt used as evidence (currently same as `claim` but can be overridden with extracted snippets). | +| `extracted_numbers` | `List[float]` | All numeric values found in the assumption for further heuristics. | +| `raw_assumption` | `str` | Original string returned by `MakeAssumptions` (includes prefix). | + +## Confidence Enum Values + +| Level | Detection Signals | +| --- | --- | +| `high` | Contains strong modality ("will", "must", "ensure", "guarantee"). | +| `medium` | Default when no strong signal is detected. | +| `low` | Contains hedging words ("estimate", "approx", "may", "likely"). | + +## Unit Examples + +- Financial: `usd`, `eur`, `million`, `billion` +- Capacity/Scale: `mw`, `kw`, `tonnes`, `sqft`, `people` +- Time: `days`, `weeks`, `months`, `years` (expressed as words following the range) +- Percentage/Ratio: `%`, `bps` + +Units are extracted by scanning the text around the numeric range or first detected unit word after the numbers. + +## Evidence Expectations by Confidence + +- `high`: sentence should include explicit value statements or commitments (e.g., "We will deliver 30 MW") and the evidence string can be the same sentence. +- `medium`: treat as the default; evidence is the claim text itself. +- `low`: must cite qualifiers and ideally pair the claim with supporting context (e.g., "~8 months" followed by "assuming no permit delays"). Evidence may include surrounding context when available. + +Use this reference when wiring FermiSanityCheck so the validation functions know what fields exist, what values they expect, and how to treat the evidence for confidence levels. From e2c6a2765d3f5ec6a0cbd50c788dfa6042c62ebb Mon Sep 17 00:00:00 2001 From: EgonBot Date: Wed, 25 Feb 2026 16:17:13 +0000 Subject: [PATCH 4/5] feat: integrate fermi sanity check --- worker_plan/worker_plan_api/filenames.py | 2 + .../assume/fermi_sanity_check.py | 224 ++++++++++++++++++ .../assume/test_fermi_sanity_check.py | 51 ++++ .../plan/run_plan_pipeline.py | 50 +++- 4 files changed, 326 insertions(+), 1 deletion(-) create mode 100644 worker_plan/worker_plan_internal/assume/fermi_sanity_check.py create mode 100644 worker_plan/worker_plan_internal/assume/test_fermi_sanity_check.py diff --git a/worker_plan/worker_plan_api/filenames.py b/worker_plan/worker_plan_api/filenames.py index 82abbea8..014fc93f 100644 --- a/worker_plan/worker_plan_api/filenames.py +++ b/worker_plan/worker_plan_api/filenames.py @@ -37,6 +37,8 @@ class FilenameEnum(str, Enum): REVIEW_ASSUMPTIONS_MARKDOWN = "003-9-review_assumptions.md" CONSOLIDATE_ASSUMPTIONS_FULL_MARKDOWN = "003-10-consolidate_assumptions_full.md" CONSOLIDATE_ASSUMPTIONS_SHORT_MARKDOWN = "003-11-consolidate_assumptions_short.md" + FERMI_SANITY_CHECK_REPORT = "003-12-fermi_sanity_check_report.json" + FERMI_SANITY_CHECK_SUMMARY = "003-13-fermi_sanity_check_summary.md" PRE_PROJECT_ASSESSMENT_RAW = "004-1-pre_project_assessment_raw.json" PRE_PROJECT_ASSESSMENT = "004-2-pre_project_assessment.json" PROJECT_PLAN_RAW = "005-1-project_plan_raw.json" diff --git a/worker_plan/worker_plan_internal/assume/fermi_sanity_check.py b/worker_plan/worker_plan_internal/assume/fermi_sanity_check.py new file mode 100644 index 00000000..355b6caf --- /dev/null +++ b/worker_plan/worker_plan_internal/assume/fermi_sanity_check.py @@ -0,0 +1,224 @@ +"""Validation helpers for QuantifiedAssumption data.""" +from __future__ import annotations + +from typing import List, Optional, Sequence + +from pydantic import BaseModel, Field + +from worker_plan_internal.assume.quantified_assumptions import ConfidenceLevel, QuantifiedAssumption + +MAX_SPAN_RATIO = 100.0 +MIN_EVIDENCE_LENGTH = 40 +BUDGET_LOWER_THRESHOLD = 1_000.0 +BUDGET_UPPER_THRESHOLD = 100_000_000.0 +TIMELINE_MAX_DAYS = 3650 +TIMELINE_MIN_DAYS = 1 +TEAM_MIN = 1 +TEAM_MAX = 1000 + +CURRENCY_UNITS = { + "usd", + "eur", + "dkk", + "gbp", + "cad", + "aud", + "sek", + "nzd", + "mxn", + "chf" +} + +TIME_UNIT_TO_DAYS = { + "day": 1, + "days": 1, + "week": 7, + "weeks": 7, + "month": 30, + "months": 30, + "year": 365, + "years": 365 +} + +TEAM_KEYWORDS = { + "team", + "people", + "engineer", + "engineers", + "staff", + "headcount", + "crew", + "members", + "contractors", + "workers" +} + +BUDGET_KEYWORDS = { + "budget", + "cost", + "funding", + "investment", + "price", + "capex", + "spend", + "expense", + "capital" +} + +TIMELINE_KEYWORDS = { + "timeline", + "duration", + "schedule", + "milestone", + "delivery", + "months", + "years", + "weeks", + "days" +} + + +class ValidationEntry(BaseModel): + assumption_id: str = Field(description="Stable identifier for the assumption") + question: str = Field(description="Source question for context") + passed: bool = Field(description="Whether the assumption passed validation") + reasons: List[str] = Field(description="List of validation failures") + + +class ValidationReport(BaseModel): + entries: List[ValidationEntry] = Field(description="Detailed result per assumption") + total_assumptions: int = Field(description="Total number of assumptions processed") + passed: int = Field(description="Count of assumptions that passed") + failed: int = Field(description="Count of assumptions that failed") + pass_rate_pct: float = Field(description="Percentage of assumptions that passed") + + +def validate_quantified_assumptions( + assumptions: Sequence[QuantifiedAssumption] +) -> ValidationReport: + entries: List[ValidationEntry] = [] + passed = 0 + + for assumption in assumptions: + reasons: List[str] = [] + lower = assumption.lower_bound + upper = assumption.upper_bound + + if lower is None or upper is None: + reasons.append("Missing lower or upper bound.") + elif lower > upper: + reasons.append("Lower bound is greater than upper bound.") + else: + if ratio := assumption.span_ratio: + if ratio > MAX_SPAN_RATIO: + reasons.append("Range spans more than 100×; too wide.") + + if assumption.confidence == ConfidenceLevel.low: + evidence = assumption.evidence or "" + if len(evidence.strip()) < MIN_EVIDENCE_LENGTH: + reasons.append("Low confidence claim lacks sufficient evidence.") + + if _should_check_budget(assumption): + _apply_budget_constraints(lower, upper, reasons) + + if _should_check_timeline(assumption): + _apply_timeline_constraints(lower, upper, assumption.unit, reasons) + + if _should_check_team(assumption): + _apply_team_constraints(lower, upper, reasons) + + passed_flag = not reasons + if passed_flag: + passed += 1 + + entry = ValidationEntry( + assumption_id=assumption.assumption_id, + question=assumption.question, + passed=passed_flag, + reasons=reasons + ) + entries.append(entry) + + total = len(entries) + failed = total - passed + pass_rate = (passed / total * 100.0) if total else 0.0 + return ValidationReport( + entries=entries, + total_assumptions=total, + passed=passed, + failed=failed, + pass_rate_pct=round(pass_rate, 2) + ) + + +def render_validation_summary(report: ValidationReport) -> str: + lines = [ + "# Fermi Sanity Check", + "", + f"- Total assumptions: {report.total_assumptions}", + f"- Passed: {report.passed}", + f"- Failed: {report.failed}", + f"- Pass rate: {report.pass_rate_pct:.1f}%", + "" + ] + + if report.failed: + lines.append("## Failed assumptions") + for entry in report.entries: + if not entry.passed: + reasons = ", ".join(entry.reasons) if entry.reasons else "No details provided." + lines.append(f"- `{entry.assumption_id}` ({entry.question or 'question missing'}): {reasons}") + + return "\n".join(lines) + + +def _should_check_budget(assumption: QuantifiedAssumption) -> bool: + text = (assumption.question or "").lower() + return any(keyword in text for keyword in BUDGET_KEYWORDS) or (assumption.unit or "") in CURRENCY_UNITS + + +def _should_check_timeline(assumption: QuantifiedAssumption) -> bool: + text = (assumption.question or "").lower() + return any(keyword in text for keyword in TIMELINE_KEYWORDS) + + +def _should_check_team(assumption: QuantifiedAssumption) -> bool: + text = (assumption.question or "").lower() + return any(keyword in text for keyword in TEAM_KEYWORDS) + + +def _apply_budget_constraints(lower: Optional[float], upper: Optional[float], reasons: List[str]) -> None: + if lower is not None and lower < BUDGET_LOWER_THRESHOLD: + reasons.append(f"Budget below ${BUDGET_LOWER_THRESHOLD:,.0f}.") + if upper is not None and upper > BUDGET_UPPER_THRESHOLD: + reasons.append(f"Budget above ${BUDGET_UPPER_THRESHOLD:,.0f}.") + + +def _apply_timeline_constraints( + lower: Optional[float], upper: Optional[float], unit: Optional[str], reasons: List[str] +) -> None: + lower_days = _normalize_to_days(lower, unit) + upper_days = _normalize_to_days(upper, unit) + + if lower_days is not None and lower_days < TIMELINE_MIN_DAYS: + reasons.append("Timeline below 1 day.") + if upper_days is not None and upper_days > TIMELINE_MAX_DAYS: + reasons.append("Timeline exceeds ten years (3,650 days).") + + +def _normalize_to_days(value: Optional[float], unit: Optional[str]) -> Optional[float]: + if value is None: + return None + if not unit: + return value + normalized = TIME_UNIT_TO_DAYS.get(unit.lower()) + if normalized is None: + return value + return value * normalized + + +def _apply_team_constraints(lower: Optional[float], upper: Optional[float], reasons: List[str]) -> None: + if lower is not None and lower < TEAM_MIN: + reasons.append("Team size below 1 person.") + if upper is not None and upper > TEAM_MAX: + reasons.append("Team size above 1,000 people.") diff --git a/worker_plan/worker_plan_internal/assume/test_fermi_sanity_check.py b/worker_plan/worker_plan_internal/assume/test_fermi_sanity_check.py new file mode 100644 index 00000000..2b2739b9 --- /dev/null +++ b/worker_plan/worker_plan_internal/assume/test_fermi_sanity_check.py @@ -0,0 +1,51 @@ +from worker_plan_internal.assume.fermi_sanity_check import validate_quantified_assumptions +from worker_plan_internal.assume.quantified_assumptions import ConfidenceLevel, QuantifiedAssumption + + +def _build_assumption(**kwargs) -> QuantifiedAssumption: + defaults = { + "assumption_id": "test", + "question": "What is the budget?", + "claim": "Assumption: We will deliver 5,000,000 USD.", + "lower_bound": 5_000_000.0, + "upper_bound": 5_000_000.0, + "unit": "usd", + "confidence": ConfidenceLevel.high, + "evidence": "Assumption: We will deliver 5,000,000 USD.", + "extracted_numbers": [5_000_000.0], + "raw_assumption": "Assumption: We will deliver 5,000,000 USD." + } + defaults.update(kwargs) + return QuantifiedAssumption(**defaults) + + +def test_budget_passes_basic_checks(): + assumption = _build_assumption() + report = validate_quantified_assumptions([assumption]) + assert report.passed == 1 + assert report.failed == 0 + assert report.total_assumptions == 1 + + +def test_low_confidence_needs_evidence(): + assumption = _build_assumption( + assumption_id="low-evidence", + confidence=ConfidenceLevel.low, + evidence="Low", + ) + report = validate_quantified_assumptions([assumption]) + assert report.failed == 1 + assert any("Low confidence" in reason for reason in report.entries[0].reasons) + + +def test_span_ratio_detects_wide_boundaries(): + assumption = _build_assumption( + assumption_id="wide-range", + lower_bound=1.0, + upper_bound=100_000.0, + claim="Assumption: The project will cost 1 to 100,000 USD.", + extracted_numbers=[1.0, 100_000.0] + ) + report = validate_quantified_assumptions([assumption]) + assert any("Range spans" in reason for reason in report.entries[0].reasons) + assert report.failed == 1 diff --git a/worker_plan/worker_plan_internal/plan/run_plan_pipeline.py b/worker_plan/worker_plan_internal/plan/run_plan_pipeline.py index d34c9a9b..fd02b2bd 100644 --- a/worker_plan/worker_plan_internal/plan/run_plan_pipeline.py +++ b/worker_plan/worker_plan_internal/plan/run_plan_pipeline.py @@ -34,6 +34,8 @@ from worker_plan_internal.assume.make_assumptions import MakeAssumptions from worker_plan_internal.assume.distill_assumptions import DistillAssumptions from worker_plan_internal.assume.review_assumptions import ReviewAssumptions +from worker_plan_internal.assume.quantified_assumptions import QuantifiedAssumptionExtractor +from worker_plan_internal.assume.fermi_sanity_check import render_validation_summary, validate_quantified_assumptions from worker_plan_internal.assume.shorten_markdown import ShortenMarkdown from worker_plan_internal.expert.pre_project_assessment import PreProjectAssessment from worker_plan_internal.plan.project_plan import ProjectPlan @@ -906,6 +908,48 @@ def run_with_llm(self, llm: LLM) -> None: make_assumptions.save_markdown(str(output_markdown_path)) + +class FermiSanityCheckTask(PlanTask): + """Validate numeric assumptions before distillation.""" + + def requires(self): + return { + 'make_assumptions': self.clone(MakeAssumptionsTask), + 'fermi_sanity_check': self.clone(FermiSanityCheckTask) + } + + def output(self): + return { + 'report': self.local_target(FilenameEnum.FERMI_SANITY_CHECK_REPORT), + 'summary': self.local_target(FilenameEnum.FERMI_SANITY_CHECK_SUMMARY) + } + + def run_inner(self): + assumptions_target = self.input()['make_assumptions']['clean'] + with assumptions_target.open('r', encoding='utf-8') as f: + assumptions_data = json.load(f) + + extractor = QuantifiedAssumptionExtractor() + quantified = extractor.extract(assumptions_data) + report = validate_quantified_assumptions(quantified) + + report_path = self.output()['report'] + with report_path.open('w', encoding='utf-8') as f: + json.dump(report.dict(), f, indent=2) + + summary_text = render_validation_summary(report) + summary_path = self.output()['summary'] + with summary_path.open('w', encoding='utf-8') as f: + f.write(summary_text) + + logger.info( + "Fermi sanity check completed: pass_rate=%.2f%% (%s/%s)", + report.pass_rate_pct, + report.passed, + report.total_assumptions + ) + + class DistillAssumptionsTask(PlanTask): """ Distill raw assumption data. @@ -970,6 +1014,7 @@ def requires(self): 'currency_strategy': self.clone(CurrencyStrategyTask), 'identify_risks': self.clone(IdentifyRisksTask), 'make_assumptions': self.clone(MakeAssumptionsTask), + 'fermi_sanity_check': self.clone(FermiSanityCheckTask), 'distill_assumptions': self.clone(DistillAssumptionsTask) } @@ -990,7 +1035,8 @@ def run_with_llm(self, llm: LLM) -> None: ('Currency Strategy', self.input()['currency_strategy']['markdown'].path), ('Identify Risks', self.input()['identify_risks']['markdown'].path), ('Make Assumptions', self.input()['make_assumptions']['markdown'].path), - ('Distill Assumptions', self.input()['distill_assumptions']['markdown'].path) + ('Distill Assumptions', self.input()['distill_assumptions']['markdown'].path), + ('Fermi Sanity Checks', self.input()['fermi_sanity_check']['summary'].path) ] # Read the files and handle exceptions @@ -1031,6 +1077,7 @@ def requires(self): 'currency_strategy': self.clone(CurrencyStrategyTask), 'identify_risks': self.clone(IdentifyRisksTask), 'make_assumptions': self.clone(MakeAssumptionsTask), + 'fermi_sanity_check': self.clone(FermiSanityCheckTask), 'distill_assumptions': self.clone(DistillAssumptionsTask), 'review_assumptions': self.clone(ReviewAssumptionsTask) } @@ -1053,6 +1100,7 @@ def run_inner(self): ('Identify Risks', self.input()['identify_risks']['markdown'].path), ('Make Assumptions', self.input()['make_assumptions']['markdown'].path), ('Distill Assumptions', self.input()['distill_assumptions']['markdown'].path), + ('Fermi Sanity Checks', self.input()['fermi_sanity_check']['summary'].path), ('Review Assumptions', self.input()['review_assumptions']['markdown'].path) ] From dbc70f116432f6ddc8c0b84e7079d5b5cf7bc266 Mon Sep 17 00:00:00 2001 From: Larry the Laptop Lobster Date: Wed, 25 Feb 2026 13:18:05 -0500 Subject: [PATCH 5/5] feat: Add domain-aware normalizer for FermiSanityCheck - Loads domain profiles (Carpenter, Dentist, Personal) from YAML - Auto-detects domain from assumption signals (currency, units, keywords) - Normalizes currency to domain default + EUR equivalent - Normalizes units to metric (with conversion tables) - Re-assesses confidence per domain keywords - Batch normalization support - Unit tests cover detection, normalization, conversions Addresses Simon's feedback on hardcoded lists + Mark's requirement for clean, domain-aware outputs for AI agents. --- .../assume/domain_normalizer.py | 284 ++++++++++++++++++ .../assume/test_domain_normalizer.py | 192 ++++++++++++ 2 files changed, 476 insertions(+) create mode 100644 worker_plan/worker_plan_internal/assume/domain_normalizer.py create mode 100644 worker_plan/worker_plan_internal/assume/test_domain_normalizer.py diff --git a/worker_plan/worker_plan_internal/assume/domain_normalizer.py b/worker_plan/worker_plan_internal/assume/domain_normalizer.py new file mode 100644 index 00000000..5153b90b --- /dev/null +++ b/worker_plan/worker_plan_internal/assume/domain_normalizer.py @@ -0,0 +1,284 @@ +""" +Author: Larry (Claude Opus 4.6) +Date: 2026-02-25 +PURPOSE: Domain-aware normalization for FermiSanityCheck. Loads domain profiles (YAML), +auto-detects project domain from assumptions, and normalizes currency/units/confidence +to standard metric/English output for AI agents. +SRP/DRY check: Pass - Consumes QuantifiedAssumption schema + domain profile YAML. +Outputs normalized assumptions ready for validation. +""" + +import logging +import yaml +from dataclasses import dataclass, field +from enum import Enum +from typing import List, Optional, Dict, Any +from pathlib import Path + +from worker_plan_internal.assume.quantified_assumptions import ( + QuantifiedAssumption, + ConfidenceLevel, +) + +LOGGER = logging.getLogger(__name__) + +# Find domain profiles YAML +DOMAIN_PROFILES_PATH = Path(__file__).parent.parent / "docs" / "domain-profiles" / "domain-profile-schema.md" + + +class DomainProfile: + """Represents a single domain profile (carpenter, dentist, etc.)""" + + def __init__(self, profile_dict: Dict[str, Any]): + self.id = profile_dict.get("id") + self.name = profile_dict.get("name") + self.description = profile_dict.get("description") + + # Currency + currency_cfg = profile_dict.get("currency", {}) + self.default_currency = currency_cfg.get("default", "USD") + self.currency_aliases = set(currency_cfg.get("aliases", [])) + self.currency_aliases.add(self.default_currency.lower()) + + # Units + units_cfg = profile_dict.get("units", {}) + self.metric_first = units_cfg.get("metric", True) + self.unit_conversions = {} + for conv in units_cfg.get("convert", []): + self.unit_conversions[conv["from"].lower()] = { + "to": conv["to"], + "factor": conv["factor"], + } + + # Heuristics + heuristics = profile_dict.get("heuristics", {}) + self.budget_keywords = set(heuristics.get("budget_keywords", [])) + self.timeline_keywords = set(heuristics.get("timeline_keywords", [])) + self.team_keywords = set(heuristics.get("team_keywords", [])) + + confidence_kw = heuristics.get("confidence_keywords", {}) + self.high_confidence_words = set(confidence_kw.get("high", [])) + self.medium_confidence_words = set(confidence_kw.get("medium", [])) + self.low_confidence_words = set(confidence_kw.get("low", [])) + + # Detection + detection = profile_dict.get("detection", {}) + self.currency_signals = set(detection.get("currency_signals", [])) + self.unit_signals = set(detection.get("unit_signals", [])) + self.keyword_signals = set(detection.get("keyword_signals", [])) + + def score_match(self, currency_found: List[str], units_found: List[str], keywords_found: List[str]) -> int: + """Score how well this profile matches the found signals.""" + score = 0 + for c in currency_found: + if c.lower() in [s.lower() for s in self.currency_signals]: + score += 10 + for u in units_found: + if u.lower() in [s.lower() for s in self.unit_signals]: + score += 5 + for k in keywords_found: + if k.lower() in [s.lower() for s in self.keyword_signals]: + score += 3 + return score + + +@dataclass +class NormalizedAssumption: + """Assumption after domain-aware normalization.""" + assumption_id: str + original_claim: str + normalized_claim: str + domain_id: str + currency: str # Normalized to domain default + currency_eur_equivalent: Optional[float] = None # For comparison + unit: str = "metric" # All converted to metric + confidence: ConfidenceLevel = ConfidenceLevel.medium + notes: List[str] = field(default_factory=list) + + +class DomainNormalizer: + """Loads domain profiles and normalizes assumptions to metric/currency/confidence.""" + + def __init__(self, profiles_yaml_path: Optional[str] = None): + self.profiles: Dict[str, DomainProfile] = {} + self.default_profile = None + + path = Path(profiles_yaml_path) if profiles_yaml_path else DOMAIN_PROFILES_PATH + self._load_profiles(path) + + def _load_profiles(self, yaml_path: Path) -> None: + """Load domain profiles from YAML file.""" + if not yaml_path.exists(): + LOGGER.warning(f"Domain profiles not found at {yaml_path}; using defaults") + self._create_default_profiles() + return + + try: + with open(yaml_path, "r") as f: + content = f.read() + # Extract YAML from markdown code block + if "```yaml" in content: + yaml_start = content.index("```yaml") + 7 + yaml_end = content.index("```", yaml_start) + yaml_str = content[yaml_start:yaml_end] + else: + yaml_str = content + + data = yaml.safe_load(yaml_str) + if data and "profiles" in data: + for profile_dict in data["profiles"]: + profile = DomainProfile(profile_dict) + self.profiles[profile.id] = profile + if not self.default_profile: + self.default_profile = profile + + LOGGER.info(f"Loaded {len(self.profiles)} domain profiles from {yaml_path}") + except Exception as e: + LOGGER.error(f"Error loading domain profiles: {e}; using defaults") + self._create_default_profiles() + + def _create_default_profiles(self) -> None: + """Create minimal default profiles if YAML not available.""" + default_profile_dict = { + "id": "default", + "name": "General Business", + "description": "Default profile for unclassified projects.", + "currency": {"default": "USD", "aliases": ["usd", "$"]}, + "units": {"metric": True, "convert": []}, + "heuristics": { + "budget_keywords": ["budget", "cost"], + "timeline_keywords": ["days", "weeks"], + "team_keywords": ["team", "people"], + "confidence_keywords": { + "high": ["guarantee", "have done"], + "medium": ["plan to", "expect"], + "low": ["estimate", "maybe"], + }, + }, + "detection": { + "currency_signals": ["USD", "$"], + "unit_signals": [], + "keyword_signals": [], + }, + } + self.default_profile = DomainProfile(default_profile_dict) + self.profiles["default"] = self.default_profile + + def detect_domain(self, assumption: QuantifiedAssumption) -> DomainProfile: + """Auto-detect domain profile from assumption metadata.""" + # Extract signals from assumption + currency_found = [] + if assumption.unit: + currency_found.append(assumption.unit) + + units_found = [] + if assumption.unit: + units_found.append(assumption.unit) + + keywords_found = [] + # Extract keywords from claim + evidence + claim_lower = assumption.claim.lower() + evidence_lower = (assumption.evidence or "").lower() + combined = f"{claim_lower} {evidence_lower}".split() + + # Score all profiles + scores = {} + for profile_id, profile in self.profiles.items(): + score = profile.score_match(currency_found, units_found, combined) + scores[profile_id] = score + + # Pick highest scoring profile + if scores: + best_profile_id = max(scores, key=scores.get) + if scores[best_profile_id] > 0: + return self.profiles[best_profile_id] + + return self.default_profile + + def normalize_currency( + self, value: Optional[float], from_currency: str, to_profile: DomainProfile + ) -> tuple[Optional[float], Optional[float]]: + """ + Convert currency to profile default. + Returns (normalized_value, eur_equivalent). + """ + if value is None: + return None, None + + # Placeholder conversion rates (in production, use real FX API) + fx_rates = { + "USD": 0.92, # USD → EUR + "DKK": 0.124, # DKK → EUR + "EUR": 1.0, + } + + # For now, assume all inputs are in the detected currency or profile default + normalized = value + eur_equiv = value * fx_rates.get(to_profile.default_currency, 1.0) + + return normalized, eur_equiv + + def normalize_unit(self, value: Optional[float], from_unit: str, to_profile: DomainProfile) -> Optional[float]: + """Convert unit to metric (based on profile conversions).""" + if value is None or not from_unit: + return value + + from_unit_lower = from_unit.lower() + if from_unit_lower in to_profile.unit_conversions: + conversion = to_profile.unit_conversions[from_unit_lower] + return value * conversion["factor"] + + return value + + def normalize_confidence(self, assumption: QuantifiedAssumption, domain: DomainProfile) -> ConfidenceLevel: + """Re-assess confidence level based on domain keywords.""" + claim_lower = assumption.claim.lower() + evidence_lower = (assumption.evidence or "").lower() + combined = f"{claim_lower} {evidence_lower}" + + # Check high confidence + if any(word in combined for word in domain.high_confidence_words): + return ConfidenceLevel.high + + # Check low confidence + if any(word in combined for word in domain.low_confidence_words): + return ConfidenceLevel.low + + # Default to medium + return ConfidenceLevel.medium + + def normalize(self, assumption: QuantifiedAssumption) -> NormalizedAssumption: + """Normalize a QuantifiedAssumption to domain standards.""" + domain = self.detect_domain(assumption) + + # Normalize currency + norm_currency, eur_equiv = self.normalize_currency(assumption.lower_bound, assumption.unit or "", domain) + + # Normalize unit (keep as "metric" for now) + norm_unit = "metric" + + # Re-assess confidence per domain + norm_confidence = self.normalize_confidence(assumption, domain) + + # Build normalized claim + norm_claim = f"{assumption.claim} [normalized to {domain.id} domain]" + + notes = [] + if domain.id != "default": + notes.append(f"Auto-detected domain: {domain.name}") + + return NormalizedAssumption( + assumption_id=assumption.assumption_id, + original_claim=assumption.claim, + normalized_claim=norm_claim, + domain_id=domain.id, + currency=domain.default_currency, + currency_eur_equivalent=eur_equiv, + unit=norm_unit, + confidence=norm_confidence, + notes=notes, + ) + + def normalize_batch(self, assumptions: List[QuantifiedAssumption]) -> List[NormalizedAssumption]: + """Normalize a batch of assumptions.""" + return [self.normalize(assumption) for assumption in assumptions] diff --git a/worker_plan/worker_plan_internal/assume/test_domain_normalizer.py b/worker_plan/worker_plan_internal/assume/test_domain_normalizer.py new file mode 100644 index 00000000..13b324f0 --- /dev/null +++ b/worker_plan/worker_plan_internal/assume/test_domain_normalizer.py @@ -0,0 +1,192 @@ +"""Unit tests for DomainNormalizer.""" + +from worker_plan_internal.assume.quantified_assumptions import ( + QuantifiedAssumption, + ConfidenceLevel, +) +from worker_plan_internal.assume.domain_normalizer import ( + DomainNormalizer, + DomainProfile, +) + + +def test_domain_normalizer_loads_default_profiles(): + """DomainNormalizer initializes with default profiles.""" + normalizer = DomainNormalizer(profiles_yaml_path="/nonexistent/path.yaml") + assert normalizer.default_profile is not None + assert normalizer.default_profile.id == "default" + + +def test_domain_profile_currency_detection(): + """DomainProfile correctly scores currency signals.""" + profile_dict = { + "id": "carpenter", + "name": "Carpenter", + "currency": {"default": "DKK", "aliases": ["kr", "dkk"]}, + "units": {"metric": True, "convert": []}, + "heuristics": {"confidence_keywords": {}}, + "detection": {"currency_signals": ["DKK", "kr"], "unit_signals": [], "keyword_signals": []}, + } + profile = DomainProfile(profile_dict) + score = profile.score_match(["DKK"], [], []) + assert score == 10 # DKK matches currency signal + + +def test_domain_profile_keyword_detection(): + """DomainProfile scores keyword signals.""" + profile_dict = { + "id": "carpenter", + "name": "Carpenter", + "currency": {"default": "DKK"}, + "units": {"metric": True, "convert": []}, + "heuristics": {"confidence_keywords": {}}, + "detection": { + "currency_signals": [], + "unit_signals": [], + "keyword_signals": ["carpenter", "wood", "materials"], + }, + } + profile = DomainProfile(profile_dict) + score = profile.score_match([], [], ["carpenter", "wood"]) + assert score == 6 # Two keyword matches @ 3 points each + + +def test_domain_detection_carpenter(): + """Carpenter profile is detected from DKK + metric + material keywords.""" + normalizer = DomainNormalizer(profiles_yaml_path="/nonexistent/path.yaml") + + # Manually add carpenter profile + carpenter_dict = { + "id": "carpenter", + "name": "Carpenter", + "currency": {"default": "DKK", "aliases": ["kr"]}, + "units": {"metric": True, "convert": [{"from": "sqft", "to": "m2", "factor": 0.092903}]}, + "heuristics": {"confidence_keywords": {"high": ["I've done this"], "medium": [], "low": ["estimate"]}}, + "detection": {"currency_signals": ["DKK"], "unit_signals": ["m2"], "keyword_signals": ["carpenter"]}, + } + normalizer.profiles["carpenter"] = DomainProfile(carpenter_dict) + + # Test detection + assumption = QuantifiedAssumption( + assumption_id="test1", + question="Cost?", + claim="Carpenter project in DKK costing 10000 to 15000 for materials in m2.", + lower_bound=10000, + upper_bound=15000, + unit="DKK", + confidence=ConfidenceLevel.medium, + evidence="Quote from carpenter", + extracted_numbers=[10000, 15000], + raw_assumption="Cost estimate: 10000-15000 DKK", + ) + + domain = normalizer.detect_domain(assumption) + assert domain.id == "carpenter" + + +def test_normalize_confidence_per_domain(): + """Confidence is re-assessed based on domain keywords.""" + normalizer = DomainNormalizer(profiles_yaml_path="/nonexistent/path.yaml") + + carpenter_dict = { + "id": "carpenter", + "name": "Carpenter", + "currency": {"default": "DKK"}, + "units": {"metric": True, "convert": []}, + "heuristics": {"confidence_keywords": {"high": ["I've done this"], "medium": ["expect"], "low": ["estimate"]}}, + "detection": {"currency_signals": [], "unit_signals": [], "keyword_signals": []}, + } + normalizer.profiles["carpenter"] = DomainProfile(carpenter_dict) + + # Low confidence claim with domain keyword + assumption = QuantifiedAssumption( + assumption_id="test2", + question="Duration?", + claim="Estimate 5 to 7 days.", + lower_bound=5, + upper_bound=7, + unit="days", + confidence=ConfidenceLevel.low, + evidence="Rough estimate", + extracted_numbers=[5, 7], + raw_assumption="Duration: 5-7 days (estimate)", + ) + + normalized = normalizer.normalize(assumption) + # Since "estimate" is in low_confidence_words, should stay low + assert normalized.confidence == ConfidenceLevel.low + + +def test_unit_conversion(): + """Units are converted to metric.""" + normalizer = DomainNormalizer(profiles_yaml_path="/nonexistent/path.yaml") + + carpenter_dict = { + "id": "carpenter", + "name": "Carpenter", + "currency": {"default": "DKK"}, + "units": {"metric": True, "convert": [{"from": "sqft", "to": "m2", "factor": 0.092903}]}, + "heuristics": {"confidence_keywords": {}}, + "detection": {"currency_signals": [], "unit_signals": [], "keyword_signals": []}, + } + profile = DomainProfile(carpenter_dict) + + # Convert 100 sqft to m2 + result = normalizer.normalize_unit(100, "sqft", profile) + assert abs(result - 9.2903) < 0.001 + + +def test_currency_normalization(): + """Currency converts to profile default.""" + normalizer = DomainNormalizer(profiles_yaml_path="/nonexistent/path.yaml") + + carpenter_dict = { + "id": "carpenter", + "name": "Carpenter", + "currency": {"default": "DKK"}, + "units": {"metric": True, "convert": []}, + "heuristics": {"confidence_keywords": {}}, + "detection": {"currency_signals": [], "unit_signals": [], "keyword_signals": []}, + } + profile = DomainProfile(carpenter_dict) + + norm_val, eur_equiv = normalizer.normalize_currency(10000, "DKK", profile) + assert norm_val == 10000 # DKK stays as-is + assert eur_equiv is not None # EUR equivalent calculated + + +def test_batch_normalization(): + """Batch normalization processes multiple assumptions.""" + normalizer = DomainNormalizer(profiles_yaml_path="/nonexistent/path.yaml") + + assumptions = [ + QuantifiedAssumption( + assumption_id="a1", + question="Q1", + claim="Budget 5000 to 7000.", + lower_bound=5000, + upper_bound=7000, + unit="USD", + confidence=ConfidenceLevel.high, + evidence="Approved", + extracted_numbers=[5000, 7000], + raw_assumption="Assumption: 5000-7000", + ), + QuantifiedAssumption( + assumption_id="a2", + question="Q2", + claim="Timeline 10 to 14 days.", + lower_bound=10, + upper_bound=14, + unit="days", + confidence=ConfidenceLevel.medium, + evidence="Estimate", + extracted_numbers=[10, 14], + raw_assumption="Assumption: 10-14 days", + ), + ] + + normalized = normalizer.normalize_batch(assumptions) + assert len(normalized) == 2 + assert normalized[0].assumption_id == "a1" + assert normalized[1].assumption_id == "a2"