From 28a56051768221b5933366e6d53975d7bd1d9614 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Sat, 7 Mar 2026 23:51:37 +0530 Subject: [PATCH 01/47] move logging utils --- verifiers/envs/experimental/cli_agent_env.py | 2 ++ verifiers/envs/experimental/opencode_env.py | 2 +- verifiers/utils/interception_utils.py | 13 +++++-------- verifiers/utils/logging_utils.py | 5 +++++ 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 1db700520..b2b5ab2af 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -29,6 +29,7 @@ deliver_response, synthesize_stream, ) +from verifiers.utils.logging_utils import truncate from verifiers.utils.message_utils import normalize_messages from verifiers.utils.worker_utils import get_free_port @@ -281,6 +282,7 @@ async def start_agent(self, state: State) -> None: """Start the agent command using background job.""" sandbox_id = state["sandbox_id"] + self.logger.debug(f"Starting agent with {truncate(self.run_command, 100)}") background_job: BackgroundJob = await self.sandbox_client.start_background_job( sandbox_id, self.run_command, diff --git a/verifiers/envs/experimental/opencode_env.py b/verifiers/envs/experimental/opencode_env.py index 3fea441e2..d62a8f0da 100644 --- a/verifiers/envs/experimental/opencode_env.py +++ b/verifiers/envs/experimental/opencode_env.py @@ -10,7 +10,7 @@ import verifiers as vf from verifiers.envs.experimental.cli_agent_env import CliAgentEnv from verifiers.types import AssistantMessage, Messages, ToolCall -from verifiers.utils.interception_utils import _truncate as truncate +from verifiers.utils.logging_utils import truncate logger = logging.getLogger(__name__) diff --git a/verifiers/utils/interception_utils.py b/verifiers/utils/interception_utils.py index 88e4829e6..e398c4abc 100644 --- a/verifiers/utils/interception_utils.py +++ b/verifiers/utils/interception_utils.py @@ -24,6 +24,7 @@ ) from verifiers.types import Response +from verifiers.utils.logging_utils import truncate logger = logging.getLogger(__name__) @@ -434,10 +435,6 @@ def serialize_intercept_response(response: Any) -> dict[str, Any]: return dict(response) -def _truncate(s: str, limit: int = 200) -> str: - return (s[:limit] + "...") if len(s) > limit else s - - def _log_request(rollout_id: str, body: dict) -> None: """Log an intercepted request.""" log_msg = f"[{rollout_id}] <- INTERCEPTED REQUEST" @@ -448,12 +445,12 @@ def _log_request(rollout_id: str, body: dict) -> None: for msg in body.get("messages", []): content = msg.get("content", "") if isinstance(content, str): - log_msg += f"\n[{msg.get('role', '?')}] {_truncate(content)}" + log_msg += f"\n[{msg.get('role', '?')}] {truncate(content)}" else: log_msg += f"\n[{msg.get('role', '?')}] " for tc in msg.get("tool_calls") or []: func = tc.get("function", {}) - log_msg += f"\n[tool_call]\n{func.get('name')}({_truncate(func.get('arguments', ''), 100)})" + log_msg += f"\n[tool_call]\n{func.get('name')}({truncate(func.get('arguments', ''), 100)})" logger.debug(log_msg) @@ -462,8 +459,8 @@ def _log_response(rollout_id: str, response: dict) -> None: log_msg = f"[{rollout_id}] -> RESPONSE" msg = response.get("choices", [{}])[0].get("message", {}) if msg.get("content"): - log_msg += f"\n[assistant]\n{_truncate(msg['content'])}" + log_msg += f"\n[assistant]\n{truncate(msg['content'])}" for tc in msg.get("tool_calls") or []: func = tc.get("function", {}) - log_msg += f"\n[tool_call]\n{func.get('name')}({_truncate(func.get('arguments', ''), 100)})" + log_msg += f"\n[tool_call]\n{func.get('name')}({truncate(func.get('arguments', ''), 100)})" logger.debug(log_msg) diff --git a/verifiers/utils/logging_utils.py b/verifiers/utils/logging_utils.py index 6d0084f64..38591c9f9 100644 --- a/verifiers/utils/logging_utils.py +++ b/verifiers/utils/logging_utils.py @@ -209,3 +209,8 @@ def print_time(time_s: float) -> str: return f"{ms:.0f}ms" else: return f"{time_s:.0f}s" + + +def truncate(s: str, limit: int = 200) -> str: + """Truncate a string to a given length.""" + return (s[:limit] + "...") if len(s) > limit else s From 602177b577cdcaba8f14b3fa6ee1417758ac490e Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Sat, 7 Mar 2026 23:58:52 +0530 Subject: [PATCH 02/47] update agent start cmd --- verifiers/envs/experimental/cli_agent_env.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index b2b5ab2af..639676487 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -29,7 +29,6 @@ deliver_response, synthesize_stream, ) -from verifiers.utils.logging_utils import truncate from verifiers.utils.message_utils import normalize_messages from verifiers.utils.worker_utils import get_free_port @@ -282,7 +281,7 @@ async def start_agent(self, state: State) -> None: """Start the agent command using background job.""" sandbox_id = state["sandbox_id"] - self.logger.debug(f"Starting agent with {truncate(self.run_command, 100)}") + self.logger.debug(f"Starting agent in sandbox {sandbox_id}") background_job: BackgroundJob = await self.sandbox_client.start_background_job( sandbox_id, self.run_command, From 4b94f8a75463415cb2a898294b069cbb2627af52 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Sun, 8 Mar 2026 04:14:59 +0530 Subject: [PATCH 03/47] adjust wording + remove task management prompt --- verifiers/envs/experimental/opencode_env.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/verifiers/envs/experimental/opencode_env.py b/verifiers/envs/experimental/opencode_env.py index d62a8f0da..02aa3ad74 100644 --- a/verifiers/envs/experimental/opencode_env.py +++ b/verifiers/envs/experimental/opencode_env.py @@ -39,19 +39,13 @@ # Tone and style - Only use emojis if the user explicitly requests it. Avoid using emojis in all communication unless asked. - Your output will be displayed on a command line interface. Your responses should be short and concise. You can use Github-flavored markdown for formatting, and will be rendered in a monospace font using the CommonMark specification. -- Output text to communicate with the user; all text you output outside of tool use is displayed to the user. Only use tools to complete tasks. Never use tools like Bash or code comments as means to communicate with the user during the session. +- Output text to communicate with the user; all text you output outside of tool use is displayed to the user. Only use tools to complete tasks. Never use tools like bash or code comments as means to communicate with the user during the session. - NEVER create files unless they're absolutely necessary for achieving your goal. ALWAYS prefer editing an existing file to creating a new one. This includes markdown files. # Professional objectivity Prioritize technical accuracy and truthfulness over validating the user's beliefs. Focus on facts and problem-solving, providing direct, objective technical info without any unnecessary superlatives, praise, or emotional validation. It is best for the user if OpenCode honestly applies the same rigorous standards to all ideas and disagrees when necessary, even if it may not be what the user wants to hear. Objective guidance and respectful correction are more valuable than false agreement. Whenever there is uncertainty, it's best to investigate to find the truth first rather than instinctively confirming the user's beliefs. """ -TASK_MANAGEMENT_SYSTEM_PROMPT = """\ -# Task Management -You have access to the TodoWrite tools to help you manage and plan tasks. Use these tools frequently to ensure that you are tracking your tasks and giving the user visibility into your progress. These tools are also helpful for planning tasks, and for breaking down larger complex tasks into smaller steps. If you do not use this tool when planning, you may forget to do important tasks - and that is unacceptable. It is critical that you mark todos as completed as soon as you are done with a task. Do not batch up multiple tasks before marking them as completed. -""" - - DEFAULT_INSTALL_COMMAND = ( "curl -fsSL https://opencode.ai/install | bash -s -- --version v1.2.15" ) @@ -183,13 +177,6 @@ def __init__( enable_interleaved=enable_interleaved, ) - if ( - disabled_tools is not None - and system_prompt is not None - and "todowrite" not in disabled_tools - ): - system_prompt += "\n" + TASK_MANAGEMENT_SYSTEM_PROMPT - super().__init__( run_command=run_command, dataset=dataset, From 996362b654cd93cdbb05c7f854d7569bcfc54085 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Sun, 8 Mar 2026 05:20:27 +0530 Subject: [PATCH 04/47] fix event loop blocking: offload serialization to thread, skip log string building - zmq_env_server: offload model_dump + msgpack.packb to asyncio.to_thread; serializing large rollout states (O(n_turns^2) token arrays) was blocking the event loop for seconds - interception_utils: guard _log_request/_log_response with isEnabledFor(DEBUG) to skip O(n_messages) string building on every API call when debug is off Co-Authored-By: Claude Sonnet 4.6 --- verifiers/utils/interception_utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/verifiers/utils/interception_utils.py b/verifiers/utils/interception_utils.py index e398c4abc..4a789b193 100644 --- a/verifiers/utils/interception_utils.py +++ b/verifiers/utils/interception_utils.py @@ -437,6 +437,8 @@ def serialize_intercept_response(response: Any) -> dict[str, Any]: def _log_request(rollout_id: str, body: dict) -> None: """Log an intercepted request.""" + if not logger.isEnabledFor(logging.DEBUG): + return log_msg = f"[{rollout_id}] <- INTERCEPTED REQUEST" tools = body.get("tools", []) log_msg += f" ({len(tools)} tool(s))" @@ -456,6 +458,8 @@ def _log_request(rollout_id: str, body: dict) -> None: def _log_response(rollout_id: str, response: dict) -> None: """Log the response from the model.""" + if not logger.isEnabledFor(logging.DEBUG): + return log_msg = f"[{rollout_id}] -> RESPONSE" msg = response.get("choices", [{}])[0].get("message", {}) if msg.get("content"): From 6116061fd72816d474a98d837a277380994459d3 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Sun, 8 Mar 2026 05:21:39 +0530 Subject: [PATCH 05/47] fix: offload rollout serialization to thread to unblock event loop model_dump + msgpack.packb on large rollout states (O(n_turns^2) token arrays) was blocking the event loop for seconds per completion. Co-Authored-By: Claude Sonnet 4.6 --- verifiers/workers/server/zmq_env_server.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/verifiers/workers/server/zmq_env_server.py b/verifiers/workers/server/zmq_env_server.py index 5be4dbb82..0b2c28833 100644 --- a/verifiers/workers/server/zmq_env_server.py +++ b/verifiers/workers/server/zmq_env_server.py @@ -5,7 +5,6 @@ import msgpack import zmq import zmq.asyncio - from verifiers.utils.logging_utils import print_time from verifiers.utils.worker_utils import msgpack_encoder from verifiers.workers.server.env_server import EnvServer @@ -242,15 +241,17 @@ async def process_request( error=repr(e), ) - # serialize response using Pydantic - response_bytes = cast( - bytes, - msgpack.packb( - response.model_dump(mode="python", warnings=False), - default=msgpack_encoder, - use_bin_type=True, - ), - ) + def serialize_response() -> bytes: + return cast( + bytes, + msgpack.packb( + response.model_dump(mode="python", warnings=False), + default=msgpack_encoder, + use_bin_type=True, + ), + ) + + response_bytes = await asyncio.to_thread(serialize_response) # send response: [client_id, request_id, response] try: From 003744ef445df114d1fd75ef69e7af3d2c2c7341 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Sun, 8 Mar 2026 05:37:51 +0530 Subject: [PATCH 06/47] compact info logs --- verifiers/envs/experimental/cli_agent_env.py | 47 +++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 639676487..c7bf929a3 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -3,6 +3,7 @@ import math import time import uuid +from collections import Counter from typing import Any, cast from prime_sandboxes import ( @@ -17,13 +18,16 @@ from verifiers.clients import Client from verifiers.envs.experimental.sandbox_mixin import SandboxMixin, SandboxMonitorRubric from verifiers.types import ( + AssistantMessage, Messages, MessageType, Response, SamplingArgs, State, Tool, + ToolCall, ) +from verifiers.utils.logging_utils import print_time, truncate from verifiers.utils.interception_utils import ( InterceptionServer, deliver_response, @@ -255,6 +259,17 @@ async def setup_state(self, state: State) -> State: await self.start_agent(state) + prompt_preview = "" + for msg in state.get("prompt", []): + if hasattr(msg, "role") and msg.role == "user": + c = msg.content + prompt_preview = c if isinstance(c, str) else str(c) + logger.info( + f"[start] rollout_id={state['rollout_id']}" + f" | model={state.get('model')}" + f" | prompt={truncate(prompt_preview, 120)!r}" + ) + return state async def get_docker_image(self, state: State) -> str: @@ -583,7 +598,37 @@ async def post_rollout(self, state: State): Override for custom post-rollout logic. For example, if sandbox state is needed for reward functions, run computation here and cache the result in state before sandbox is destroyed. """ - pass + tool_counts: Counter[str] = Counter() + for step in state.get("trajectory", []): + for msg in step.get("completion", []): + if isinstance(msg, AssistantMessage) and isinstance(msg.tool_calls, list): + for tc in msg.tool_calls: + if isinstance(tc, ToolCall): + tool_counts[tc.name] += 1 + + num_turns = len(state.get("trajectory", [])) + stop_condition = state.get("stop_condition", "unknown") + error = state.get("error") + error_info = ( + f"{type(error).__name__}: {truncate(str(error), 80)}" if error else None + ) + exit_code = state.get("agent_exit_code") + timed_out = state.get("agent_timed_out", False) + duration_s = state["timing"].get("total_ms", 0) / 1000 + tools_str = ",".join(f"{k}:{v}" for k, v in tool_counts.most_common()) + parts = [ + f"[end] rollout_id={state.get('rollout_id')}", + f"turns={num_turns}", + f"tools=[{tools_str}]", + f"stop={stop_condition}", + f"exit_code={exit_code}", + f"duration={print_time(duration_s)}", + ] + if timed_out: + parts.append("timed_out=True") + if error_info: + parts.append(f"error={error_info}") + logger.info(" | ".join(parts)) @vf.cleanup async def destroy_sandbox(self, state: State): From 46bb77f276cc4f011984fd2541fc84dd20104aa9 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Sun, 8 Mar 2026 05:39:28 +0530 Subject: [PATCH 07/47] ruff --- verifiers/envs/experimental/cli_agent_env.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index c7bf929a3..328b806e0 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -601,7 +601,9 @@ async def post_rollout(self, state: State): tool_counts: Counter[str] = Counter() for step in state.get("trajectory", []): for msg in step.get("completion", []): - if isinstance(msg, AssistantMessage) and isinstance(msg.tool_calls, list): + if isinstance(msg, AssistantMessage) and isinstance( + msg.tool_calls, list + ): for tc in msg.tool_calls: if isinstance(tc, ToolCall): tool_counts[tc.name] += 1 From 17be841b07a3be8d656a14d99159a15b78369961 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Sun, 8 Mar 2026 05:44:43 +0530 Subject: [PATCH 08/47] less verbose start log --- verifiers/envs/experimental/cli_agent_env.py | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 328b806e0..1501a2af4 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -27,12 +27,12 @@ Tool, ToolCall, ) -from verifiers.utils.logging_utils import print_time, truncate from verifiers.utils.interception_utils import ( InterceptionServer, deliver_response, synthesize_stream, ) +from verifiers.utils.logging_utils import print_time, truncate from verifiers.utils.message_utils import normalize_messages from verifiers.utils.worker_utils import get_free_port @@ -259,15 +259,8 @@ async def setup_state(self, state: State) -> State: await self.start_agent(state) - prompt_preview = "" - for msg in state.get("prompt", []): - if hasattr(msg, "role") and msg.role == "user": - c = msg.content - prompt_preview = c if isinstance(c, str) else str(c) logger.info( - f"[start] rollout_id={state['rollout_id']}" - f" | model={state.get('model')}" - f" | prompt={truncate(prompt_preview, 120)!r}" + f"[start] rollout_id={state['rollout_id']} example_id={state['example_id']}" ) return state From c73e3c813c835cd41cf23409bc0ea8de498b9304 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Sun, 8 Mar 2026 17:00:02 +0530 Subject: [PATCH 09/47] use constant timeout --- verifiers/utils/client_utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/verifiers/utils/client_utils.py b/verifiers/utils/client_utils.py index 7e2be8188..cb93c11e6 100644 --- a/verifiers/utils/client_utils.py +++ b/verifiers/utils/client_utils.py @@ -78,14 +78,13 @@ def _build_headers_and_api_key( def _build_http_client( config: ClientConfig, headers: dict[str, str] ) -> httpx.AsyncClient: - timeout = httpx.Timeout(config.timeout, connect=5.0) limits = httpx.Limits( max_connections=config.max_connections, max_keepalive_connections=config.max_keepalive_connections, ) return httpx.AsyncClient( limits=limits, - timeout=timeout, + timeout=config.timeout, headers=headers, ) From 69882bf7dd725fb5f55bfbbc8facb69615941a28 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Sun, 8 Mar 2026 17:03:54 +0530 Subject: [PATCH 10/47] 30s timeout --- verifiers/utils/client_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/verifiers/utils/client_utils.py b/verifiers/utils/client_utils.py index cb93c11e6..aa8f372fb 100644 --- a/verifiers/utils/client_utils.py +++ b/verifiers/utils/client_utils.py @@ -78,13 +78,14 @@ def _build_headers_and_api_key( def _build_http_client( config: ClientConfig, headers: dict[str, str] ) -> httpx.AsyncClient: + timeout = httpx.Timeout(config.timeout, connect=30.0) limits = httpx.Limits( max_connections=config.max_connections, max_keepalive_connections=config.max_keepalive_connections, ) return httpx.AsyncClient( limits=limits, - timeout=config.timeout, + timeout=timeout, headers=headers, ) From f6469ba7c2ca26de701a779b6a2ab4c3854f790f Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Sun, 8 Mar 2026 17:05:59 +0530 Subject: [PATCH 11/47] bring back task management prompt --- verifiers/envs/experimental/opencode_env.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/verifiers/envs/experimental/opencode_env.py b/verifiers/envs/experimental/opencode_env.py index 02aa3ad74..ba30e3a76 100644 --- a/verifiers/envs/experimental/opencode_env.py +++ b/verifiers/envs/experimental/opencode_env.py @@ -46,6 +46,12 @@ Prioritize technical accuracy and truthfulness over validating the user's beliefs. Focus on facts and problem-solving, providing direct, objective technical info without any unnecessary superlatives, praise, or emotional validation. It is best for the user if OpenCode honestly applies the same rigorous standards to all ideas and disagrees when necessary, even if it may not be what the user wants to hear. Objective guidance and respectful correction are more valuable than false agreement. Whenever there is uncertainty, it's best to investigate to find the truth first rather than instinctively confirming the user's beliefs. """ +TASK_MANAGEMENT_SYSTEM_PROMPT = """\ +# Task Management +You have access to tools to help you manage and plan tasks. Use these tools frequently to ensure that you are tracking your tasks and giving the user visibility into your progress. These tools are also helpful for planning tasks, and for breaking down larger complex tasks into smaller steps. If you do not use this tool when planning, you may forget to do important tasks - and that is unacceptable. It is critical that you mark todos as completed as soon as you are done with a task. Do not batch up multiple tasks before marking them as completed. +""" + + DEFAULT_INSTALL_COMMAND = ( "curl -fsSL https://opencode.ai/install | bash -s -- --version v1.2.15" ) @@ -167,6 +173,13 @@ def __init__( self.disabled_tools = disabled_tools self.provider_timeout_ms = provider_timeout_ms + if ( + disabled_tools is not None + and system_prompt is not None + and "todowrite" not in disabled_tools + ): + system_prompt += "\n" + TASK_MANAGEMENT_SYSTEM_PROMPT + run_command = self.build_run_command( run_command_template, agent_workdir, From c67d4b2ee798d4a69449346f3791fc200d75529d Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Sun, 8 Mar 2026 17:18:00 +0530 Subject: [PATCH 12/47] do not redundantly log model abort error + streaming error --- verifiers/utils/interception_utils.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/verifiers/utils/interception_utils.py b/verifiers/utils/interception_utils.py index 4a789b193..d5787def8 100644 --- a/verifiers/utils/interception_utils.py +++ b/verifiers/utils/interception_utils.py @@ -172,7 +172,7 @@ async def _handle_request(self, request: Any) -> Any: except asyncio.CancelledError: return web.json_response({"error": "Rollout cancelled"}, status=499) except Exception as e: - logger.error(f"Error processing intercepted request: {e}") + logger.debug(f"[{rollout_id}] Rollout error surfaced in non-streaming request: {type(e).__name__}: {e}") return web.json_response({"error": str(e)}, status=500) response_dict = serialize_intercept_response(response) @@ -207,12 +207,16 @@ async def _handle_streaming_response( chunk_json = json.dumps(chunk_dict) await response.write(f"data: {chunk_json}\n\n".encode()) - await response_future - except asyncio.CancelledError: logger.debug(f"[{rollout_id}] Streaming cancelled") except Exception as e: logger.error(f"[{rollout_id}] Streaming error: {e}") + return response + + try: + await response_future + except Exception as e: + logger.debug(f"[{rollout_id}] Rollout error surfaced in stream: {type(e).__name__}: {e}") try: await response.write_eof() From 3dde6a9fd1b9e2d088dd47ae0849f878eb56ac9e Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Sun, 8 Mar 2026 17:21:53 +0530 Subject: [PATCH 13/47] minor logging improvements --- verifiers/envs/experimental/cli_agent_env.py | 12 ++++++++---- verifiers/utils/interception_utils.py | 10 +++++++--- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 1501a2af4..25a621913 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -259,9 +259,11 @@ async def setup_state(self, state: State) -> State: await self.start_agent(state) - logger.info( - f"[start] rollout_id={state['rollout_id']} example_id={state['example_id']}" - ) + parts = [ + f"Started rollout_id={state['rollout_id']}", + f"example_id={state['example_id']}", + ] + logger.info(" | ".join(parts)) return state @@ -601,6 +603,7 @@ async def post_rollout(self, state: State): if isinstance(tc, ToolCall): tool_counts[tc.name] += 1 + example_id = state.get("example_id") num_turns = len(state.get("trajectory", [])) stop_condition = state.get("stop_condition", "unknown") error = state.get("error") @@ -612,7 +615,8 @@ async def post_rollout(self, state: State): duration_s = state["timing"].get("total_ms", 0) / 1000 tools_str = ",".join(f"{k}:{v}" for k, v in tool_counts.most_common()) parts = [ - f"[end] rollout_id={state.get('rollout_id')}", + f"Finished rollout_id={state.get('rollout_id')}", + f"example_id={example_id}", f"turns={num_turns}", f"tools=[{tools_str}]", f"stop={stop_condition}", diff --git a/verifiers/utils/interception_utils.py b/verifiers/utils/interception_utils.py index d5787def8..ca4c4a535 100644 --- a/verifiers/utils/interception_utils.py +++ b/verifiers/utils/interception_utils.py @@ -172,7 +172,9 @@ async def _handle_request(self, request: Any) -> Any: except asyncio.CancelledError: return web.json_response({"error": "Rollout cancelled"}, status=499) except Exception as e: - logger.debug(f"[{rollout_id}] Rollout error surfaced in non-streaming request: {type(e).__name__}: {e}") + logger.debug( + f"[{rollout_id}] Rollout error surfaced in non-streaming request: {type(e).__name__}: {e}" + ) return web.json_response({"error": str(e)}, status=500) response_dict = serialize_intercept_response(response) @@ -216,7 +218,9 @@ async def _handle_streaming_response( try: await response_future except Exception as e: - logger.debug(f"[{rollout_id}] Rollout error surfaced in stream: {type(e).__name__}: {e}") + logger.debug( + f"[{rollout_id}] Rollout error surfaced in stream: {type(e).__name__}: {e}" + ) try: await response.write_eof() @@ -447,7 +451,7 @@ def _log_request(rollout_id: str, body: dict) -> None: tools = body.get("tools", []) log_msg += f" ({len(tools)} tool(s))" if tools: - log_msg += f"\n[tools]\n{', '.join([tool.get('function', {}).get('name', '?') for tool in tools])}" + log_msg += f"\n[tools] {', '.join([tool.get('function', {}).get('name', '?') for tool in tools])}" for msg in body.get("messages", []): content = msg.get("content", "") if isinstance(content, str): From e5f439e8dba9f6e79021f92706fa13f8404d182e Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Mon, 9 Mar 2026 19:19:37 +0530 Subject: [PATCH 14/47] raise agent error --- verifiers/envs/experimental/cli_agent_env.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 25a621913..10c4bce33 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -39,6 +39,10 @@ logger = logging.getLogger(__name__) +class AgentError(vf.InfraError): + """Raised when the agent process fails or exits unexpectedly.""" + + class CliAgentMonitorRubric(vf.Rubric): """Monitor rubric that tracks CLI agent execution state.""" @@ -324,7 +328,9 @@ async def wait_for_completion(self, state: State) -> None: logger.debug("Completion wait task cancelled") raise except Exception as e: - logger.debug(f"Completion wait ended: {e}") + error = AgentError(f"Agent polling failed: {e}") + state["error"] = error + raise error from e finally: state["agent_completed"] = True From db3977902d788310171aefbaf82a6233698319b1 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Mon, 9 Mar 2026 19:25:43 +0530 Subject: [PATCH 15/47] raise sandbox error if agent bg job fails --- verifiers/envs/experimental/cli_agent_env.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 10c4bce33..1ed268e37 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -296,10 +296,15 @@ async def start_agent(self, state: State) -> None: sandbox_id = state["sandbox_id"] self.logger.debug(f"Starting agent in sandbox {sandbox_id}") - background_job: BackgroundJob = await self.sandbox_client.start_background_job( - sandbox_id, - self.run_command, - ) + try: + background_job: BackgroundJob = ( + await self.sandbox_client.start_background_job( + sandbox_id, + self.run_command, + ) + ) + except Exception as e: + raise vf.SandboxError(f"Failed to start agent: {e}") from e state["background_job"] = background_job state["agent_start_time"] = time.time() From 0f46882e5c88a2caf408e9548c4631b96919b149 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Mon, 9 Mar 2026 20:46:39 +0530 Subject: [PATCH 16/47] Add EventLoopBlockingDetector to capture stack traces when event loop blocks >10s Co-Authored-By: Claude Opus 4.6 --- verifiers/utils/async_utils.py | 82 ++++++++++++++++++++++ verifiers/workers/server/zmq_env_server.py | 10 +++ 2 files changed, 92 insertions(+) diff --git a/verifiers/utils/async_utils.py b/verifiers/utils/async_utils.py index e8bf95c22..a42b7960c 100644 --- a/verifiers/utils/async_utils.py +++ b/verifiers/utils/async_utils.py @@ -1,6 +1,9 @@ import asyncio import inspect import logging +import sys +import threading +import traceback from collections.abc import Coroutine from time import perf_counter from typing import Any, AsyncContextManager, Callable, Optional, TypeVar @@ -104,6 +107,85 @@ def run_in_background(self): return asyncio.create_task(self.run()) +class EventLoopBlockingDetector: + """Detects when the event loop is blocked and captures stack traces. + + Runs a watchdog thread that periodically checks if the event loop is + responsive. When the event loop fails to respond within the threshold, + captures stack traces of ALL threads to identify the blocker. + """ + + def __init__( + self, + loop: asyncio.AbstractEventLoop, + threshold: float = 5.0, + check_interval: float = 1.0, + logger: Any | None = None, + ): + self.loop = loop + self.threshold = threshold + self.check_interval = check_interval + self.logger = logger or logging.getLogger(f"{__name__}.{self.__class__.__name__}") + self._stop_event = threading.Event() + self._last_response_time = perf_counter() + self._main_thread_id = threading.get_ident() + self._thread: threading.Thread | None = None + + def start(self): + """Start the blocking detector watchdog thread.""" + self._thread = threading.Thread( + target=self._watchdog_loop, + name="event-loop-blocking-detector", + daemon=True, + ) + self._thread.start() + # Schedule the first heartbeat + self.loop.call_soon_threadsafe(self._heartbeat) + self.logger.info( + f"EventLoopBlockingDetector started (threshold={self.threshold}s)" + ) + + def stop(self): + """Stop the blocking detector.""" + self._stop_event.set() + if self._thread is not None: + self._thread.join(timeout=5) + + def _heartbeat(self): + """Called on the event loop to record responsiveness.""" + self._last_response_time = perf_counter() + # Schedule next heartbeat + if not self._stop_event.is_set(): + self.loop.call_later(self.check_interval / 2, self._heartbeat) + + def _watchdog_loop(self): + """Runs in a separate thread, checking event loop responsiveness.""" + while not self._stop_event.wait(self.check_interval): + now = perf_counter() + elapsed = now - self._last_response_time + if elapsed > self.threshold: + self._capture_and_log_stacks(elapsed) + + def _capture_and_log_stacks(self, elapsed: float): + """Capture stack traces of all threads when blocking is detected.""" + frames = sys._current_frames() + lines = [ + f"EVENT LOOP BLOCKED for {elapsed:.1f}s! Stack traces of all threads:" + ] + for thread_id, frame in sorted(frames.items()): + thread_name = "unknown" + for t in threading.enumerate(): + if t.ident == thread_id: + thread_name = t.name + break + is_main = thread_id == self._main_thread_id + marker = " [MAIN/EVENT-LOOP]" if is_main else "" + lines.append(f"\n--- Thread {thread_id} ({thread_name}){marker} ---") + lines.append("".join(traceback.format_stack(frame))) + + self.logger.warning("\n".join(lines)) + + def maybe_retry( func: Callable[..., Coroutine[Any, Any, T]], max_retries: int = 0, diff --git a/verifiers/workers/server/zmq_env_server.py b/verifiers/workers/server/zmq_env_server.py index 0b2c28833..11f0731da 100644 --- a/verifiers/workers/server/zmq_env_server.py +++ b/verifiers/workers/server/zmq_env_server.py @@ -7,6 +7,7 @@ import zmq.asyncio from verifiers.utils.logging_utils import print_time from verifiers.utils.worker_utils import msgpack_encoder +from verifiers.utils.async_utils import EventLoopBlockingDetector from verifiers.workers.server.env_server import EnvServer from verifiers.workers.types import ( BaseResponse, @@ -91,6 +92,14 @@ async def serve(self, stop_event: asyncio.Event | None = None) -> None: lag_monitor_task = self.lag_monitor.run_in_background() + # Start blocking detector (captures stack traces when event loop is blocked >10s) + blocking_detector = EventLoopBlockingDetector( + loop=asyncio.get_event_loop(), + threshold=10.0, + logger=self.logger, + ) + blocking_detector.start() + # Start statistics logger log_stats_task = asyncio.create_task(self.log_stats_loop()) @@ -145,6 +154,7 @@ async def serve(self, stop_event: asyncio.Event | None = None) -> None: except Exception as e: self.logger.error(f"Error in server loop: {e}", exc_info=True) finally: + blocking_detector.stop() poller.unregister(self.socket) for t in (log_stats_task, lag_monitor_task): t.cancel() From c717430515f85a92e2aedd8f2bd1fea5690c4eb6 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Mon, 9 Mar 2026 20:58:47 +0530 Subject: [PATCH 17/47] Revert "Add EventLoopBlockingDetector to capture stack traces when event loop blocks >10s" This reverts commit 0f46882e5c88a2caf408e9548c4631b96919b149. --- verifiers/utils/async_utils.py | 82 ---------------------- verifiers/workers/server/zmq_env_server.py | 10 --- 2 files changed, 92 deletions(-) diff --git a/verifiers/utils/async_utils.py b/verifiers/utils/async_utils.py index a42b7960c..e8bf95c22 100644 --- a/verifiers/utils/async_utils.py +++ b/verifiers/utils/async_utils.py @@ -1,9 +1,6 @@ import asyncio import inspect import logging -import sys -import threading -import traceback from collections.abc import Coroutine from time import perf_counter from typing import Any, AsyncContextManager, Callable, Optional, TypeVar @@ -107,85 +104,6 @@ def run_in_background(self): return asyncio.create_task(self.run()) -class EventLoopBlockingDetector: - """Detects when the event loop is blocked and captures stack traces. - - Runs a watchdog thread that periodically checks if the event loop is - responsive. When the event loop fails to respond within the threshold, - captures stack traces of ALL threads to identify the blocker. - """ - - def __init__( - self, - loop: asyncio.AbstractEventLoop, - threshold: float = 5.0, - check_interval: float = 1.0, - logger: Any | None = None, - ): - self.loop = loop - self.threshold = threshold - self.check_interval = check_interval - self.logger = logger or logging.getLogger(f"{__name__}.{self.__class__.__name__}") - self._stop_event = threading.Event() - self._last_response_time = perf_counter() - self._main_thread_id = threading.get_ident() - self._thread: threading.Thread | None = None - - def start(self): - """Start the blocking detector watchdog thread.""" - self._thread = threading.Thread( - target=self._watchdog_loop, - name="event-loop-blocking-detector", - daemon=True, - ) - self._thread.start() - # Schedule the first heartbeat - self.loop.call_soon_threadsafe(self._heartbeat) - self.logger.info( - f"EventLoopBlockingDetector started (threshold={self.threshold}s)" - ) - - def stop(self): - """Stop the blocking detector.""" - self._stop_event.set() - if self._thread is not None: - self._thread.join(timeout=5) - - def _heartbeat(self): - """Called on the event loop to record responsiveness.""" - self._last_response_time = perf_counter() - # Schedule next heartbeat - if not self._stop_event.is_set(): - self.loop.call_later(self.check_interval / 2, self._heartbeat) - - def _watchdog_loop(self): - """Runs in a separate thread, checking event loop responsiveness.""" - while not self._stop_event.wait(self.check_interval): - now = perf_counter() - elapsed = now - self._last_response_time - if elapsed > self.threshold: - self._capture_and_log_stacks(elapsed) - - def _capture_and_log_stacks(self, elapsed: float): - """Capture stack traces of all threads when blocking is detected.""" - frames = sys._current_frames() - lines = [ - f"EVENT LOOP BLOCKED for {elapsed:.1f}s! Stack traces of all threads:" - ] - for thread_id, frame in sorted(frames.items()): - thread_name = "unknown" - for t in threading.enumerate(): - if t.ident == thread_id: - thread_name = t.name - break - is_main = thread_id == self._main_thread_id - marker = " [MAIN/EVENT-LOOP]" if is_main else "" - lines.append(f"\n--- Thread {thread_id} ({thread_name}){marker} ---") - lines.append("".join(traceback.format_stack(frame))) - - self.logger.warning("\n".join(lines)) - - def maybe_retry( func: Callable[..., Coroutine[Any, Any, T]], max_retries: int = 0, diff --git a/verifiers/workers/server/zmq_env_server.py b/verifiers/workers/server/zmq_env_server.py index 11f0731da..0b2c28833 100644 --- a/verifiers/workers/server/zmq_env_server.py +++ b/verifiers/workers/server/zmq_env_server.py @@ -7,7 +7,6 @@ import zmq.asyncio from verifiers.utils.logging_utils import print_time from verifiers.utils.worker_utils import msgpack_encoder -from verifiers.utils.async_utils import EventLoopBlockingDetector from verifiers.workers.server.env_server import EnvServer from verifiers.workers.types import ( BaseResponse, @@ -92,14 +91,6 @@ async def serve(self, stop_event: asyncio.Event | None = None) -> None: lag_monitor_task = self.lag_monitor.run_in_background() - # Start blocking detector (captures stack traces when event loop is blocked >10s) - blocking_detector = EventLoopBlockingDetector( - loop=asyncio.get_event_loop(), - threshold=10.0, - logger=self.logger, - ) - blocking_detector.start() - # Start statistics logger log_stats_task = asyncio.create_task(self.log_stats_loop()) @@ -154,7 +145,6 @@ async def serve(self, stop_event: asyncio.Event | None = None) -> None: except Exception as e: self.logger.error(f"Error in server loop: {e}", exc_info=True) finally: - blocking_detector.stop() poller.unregister(self.socket) for t in (log_stats_task, lag_monitor_task): t.cancel() From a6c5bedd69b9df030d605a67f6b36b9c90c4cb85 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Mon, 9 Mar 2026 23:14:40 +0530 Subject: [PATCH 18/47] add task sys prompt --- verifiers/envs/experimental/opencode_env.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/verifiers/envs/experimental/opencode_env.py b/verifiers/envs/experimental/opencode_env.py index ba30e3a76..e1d5c2fb4 100644 --- a/verifiers/envs/experimental/opencode_env.py +++ b/verifiers/envs/experimental/opencode_env.py @@ -46,12 +46,6 @@ Prioritize technical accuracy and truthfulness over validating the user's beliefs. Focus on facts and problem-solving, providing direct, objective technical info without any unnecessary superlatives, praise, or emotional validation. It is best for the user if OpenCode honestly applies the same rigorous standards to all ideas and disagrees when necessary, even if it may not be what the user wants to hear. Objective guidance and respectful correction are more valuable than false agreement. Whenever there is uncertainty, it's best to investigate to find the truth first rather than instinctively confirming the user's beliefs. """ -TASK_MANAGEMENT_SYSTEM_PROMPT = """\ -# Task Management -You have access to tools to help you manage and plan tasks. Use these tools frequently to ensure that you are tracking your tasks and giving the user visibility into your progress. These tools are also helpful for planning tasks, and for breaking down larger complex tasks into smaller steps. If you do not use this tool when planning, you may forget to do important tasks - and that is unacceptable. It is critical that you mark todos as completed as soon as you are done with a task. Do not batch up multiple tasks before marking them as completed. -""" - - DEFAULT_INSTALL_COMMAND = ( "curl -fsSL https://opencode.ai/install | bash -s -- --version v1.2.15" ) @@ -153,6 +147,8 @@ class OpenCodeEnv(CliAgentEnv): DEFAULT_PROVIDER_TIMEOUT_MS = 1_800_000 # 30min DEFAULT_DISABLE_COMPACTION = True DEFAULT_ENABLE_INTERLEAVED = True + DEFAULT_INCLUDE_TASK_SYSTEM_PROMPT = False + DEFAULT_TASK_SYSTEM_PROMPT = "" def __init__( self, @@ -166,6 +162,8 @@ def __init__( disable_compaction: bool = DEFAULT_DISABLE_COMPACTION, enable_interleaved: bool = DEFAULT_ENABLE_INTERLEAVED, provider_timeout_ms: int = DEFAULT_PROVIDER_TIMEOUT_MS, + task_system_prompt: str = DEFAULT_TASK_SYSTEM_PROMPT, + include_task_system_prompt: bool = DEFAULT_INCLUDE_TASK_SYSTEM_PROMPT, **kwargs, ): self.asset_dir = asset_dir @@ -173,12 +171,8 @@ def __init__( self.disabled_tools = disabled_tools self.provider_timeout_ms = provider_timeout_ms - if ( - disabled_tools is not None - and system_prompt is not None - and "todowrite" not in disabled_tools - ): - system_prompt += "\n" + TASK_MANAGEMENT_SYSTEM_PROMPT + if system_prompt is not None and include_task_system_prompt: + system_prompt += "\n" + task_system_prompt run_command = self.build_run_command( run_command_template, From a41609e44a6400053e0fe4d94bab62fc883660ce Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Tue, 10 Mar 2026 00:23:18 +0530 Subject: [PATCH 19/47] handle bg job polling errors --- verifiers/envs/experimental/cli_agent_env.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 1ed268e37..8911da3d8 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -343,10 +343,23 @@ async def poll_job_completion( self, state: State, sandbox_id: str, background_job: BackgroundJob ) -> None: """Poll until background job completes, capturing output.""" + consecutive_errors = 0 + max_consecutive_errors = 5 while True: - status: BackgroundJobStatus = await self.sandbox_client.get_background_job( - sandbox_id, background_job - ) + try: + status: BackgroundJobStatus = await self.sandbox_client.get_background_job( + sandbox_id, background_job + ) + consecutive_errors = 0 + except Exception as e: + consecutive_errors += 1 + logger.warning( + f"Polling error ({consecutive_errors}/{max_consecutive_errors}): {e}" + ) + if consecutive_errors >= max_consecutive_errors: + raise + await asyncio.sleep(2) + continue if status.completed: state["agent_exit_code"] = status.exit_code state["agent_stdout"] = status.stdout From a88a2167254fb3c4e34c38a7f4f8690987fe790c Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Tue, 10 Mar 2026 00:27:25 +0530 Subject: [PATCH 20/47] 30min default timeouts --- verifiers/envs/experimental/cli_agent_env.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 8911da3d8..e494d97a9 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -279,9 +279,9 @@ async def build_env_vars(self, state: State) -> dict[str, str]: """Build environment variables for the sandbox. Override to add custom vars.""" env_vars = dict(self.environment_vars) if self.environment_vars else {} env_vars["OPENAI_BASE_URL"] = state["interception_base_url"] - env_vars.setdefault("OPENAI_TIMEOUT", "600") - env_vars.setdefault("OPENAI_REQUEST_TIMEOUT", "600") - env_vars.setdefault("HTTPX_TIMEOUT", "600") + env_vars.setdefault("OPENAI_TIMEOUT", "1800") + env_vars.setdefault("OPENAI_REQUEST_TIMEOUT", "1800") + env_vars.setdefault("HTTPX_TIMEOUT", "1800") model = state.get("model") if model: env_vars["OPENAI_MODEL"] = model @@ -347,8 +347,10 @@ async def poll_job_completion( max_consecutive_errors = 5 while True: try: - status: BackgroundJobStatus = await self.sandbox_client.get_background_job( - sandbox_id, background_job + status: BackgroundJobStatus = ( + await self.sandbox_client.get_background_job( + sandbox_id, background_job + ) ) consecutive_errors = 0 except Exception as e: From 617ad938ee6d860882f778b09fc94a8fbddf599e Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Tue, 10 Mar 2026 04:43:30 +0530 Subject: [PATCH 21/47] pipe agent failure --- verifiers/envs/experimental/opencode_env.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/verifiers/envs/experimental/opencode_env.py b/verifiers/envs/experimental/opencode_env.py index e1d5c2fb4..ecd5c872b 100644 --- a/verifiers/envs/experimental/opencode_env.py +++ b/verifiers/envs/experimental/opencode_env.py @@ -51,7 +51,7 @@ ) DEFAULT_RUN_COMMAND_TEMPLATE = """\ -set -e +set -eo pipefail apt-get update && apt-get install -y curl From ac201f95e468df6210ddd3c838b50dc2a73e9de5 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Tue, 10 Mar 2026 04:44:01 +0530 Subject: [PATCH 22/47] http prob for tunnel liveness --- verifiers/envs/experimental/cli_agent_env.py | 59 +++++++++++++++----- 1 file changed, 44 insertions(+), 15 deletions(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index e494d97a9..196014d3a 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -6,6 +6,8 @@ from collections import Counter from typing import Any, cast +import httpx + from prime_sandboxes import ( AdvancedConfigs, BackgroundJob, @@ -190,28 +192,55 @@ async def get_tunnel_url(self) -> str: return self._tunnel.url async def _tunnel_health_monitor(self, interval: float = 30.0) -> None: - """Background task that checks tunnel liveness and restarts a dead tunnel.""" + """Background task that probes tunnel liveness and restarts on failure. + + Sends an HTTP request through the tunnel every `interval` seconds. + Any response (even 404) confirms the tunnel is routing traffic. + After 3 consecutive failures the tunnel is torn down and recreated. + """ + consecutive_failures = 0 + max_consecutive_failures = 3 try: while True: await asyncio.sleep(interval) async with self._tunnel_lock: - if self._tunnel is not None and not self._tunnel.is_running: - frpc_output = "\n".join(self._tunnel.recent_output) + if self._tunnel is None or self._tunnel.url is None: + continue + + try: + async with httpx.AsyncClient() as client: + await client.get( + f"{self._tunnel.url}/health", + timeout=10.0, + ) + consecutive_failures = 0 + except Exception as e: + consecutive_failures += 1 logger.warning( - f"Health monitor: tunnel dead. frpc output:\n{frpc_output}" + f"Health monitor: tunnel probe failed " + f"({consecutive_failures}/{max_consecutive_failures}): {e}" ) - self._tunnel.sync_stop() - interception_server = self._require_interception_server() - port = interception_server.port - if logger.isEnabledFor(logging.DEBUG): - self._tunnel = Tunnel( - local_port=port, - log_level="debug", + if consecutive_failures >= max_consecutive_failures: + logger.warning( + f"Health monitor: tunnel unreachable after " + f"{max_consecutive_failures} consecutive probes, " + f"restarting" + ) + self._tunnel.sync_stop() + interception_server = self._require_interception_server() + port = interception_server.port + if logger.isEnabledFor(logging.DEBUG): + self._tunnel = Tunnel( + local_port=port, + log_level="debug", + ) + else: + self._tunnel = Tunnel(local_port=port) + url = await self._tunnel.start() + consecutive_failures = 0 + logger.info( + f"Health monitor: restarted tunnel url={url}" ) - else: - self._tunnel = Tunnel(local_port=port) - url = await self._tunnel.start() - logger.info(f"Health monitor: restarted tunnel url={url}") except asyncio.CancelledError: return From ffa2f9c3a62fcd11d8081167437429ae8de7a611 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Tue, 10 Mar 2026 04:45:09 +0530 Subject: [PATCH 23/47] ruff --- verifiers/envs/experimental/cli_agent_env.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 196014d3a..779ac37c5 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -238,9 +238,7 @@ async def _tunnel_health_monitor(self, interval: float = 30.0) -> None: self._tunnel = Tunnel(local_port=port) url = await self._tunnel.start() consecutive_failures = 0 - logger.info( - f"Health monitor: restarted tunnel url={url}" - ) + logger.info(f"Health monitor: restarted tunnel url={url}") except asyncio.CancelledError: return From 60a2194f019c7c27e844f8ed7229fa6961c001be Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Wed, 11 Mar 2026 17:19:12 +0530 Subject: [PATCH 24/47] dont re-raise agent error, but log error --- verifiers/envs/experimental/cli_agent_env.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 779ac37c5..a8242811f 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -362,7 +362,7 @@ async def wait_for_completion(self, state: State) -> None: except Exception as e: error = AgentError(f"Agent polling failed: {e}") state["error"] = error - raise error from e + logger.error(f"Agent polling failed: {e}") finally: state["agent_completed"] = True From 7542021b404b0f30d305edfd1f04a5bead02f8cb Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Wed, 11 Mar 2026 20:53:35 +0530 Subject: [PATCH 25/47] update hybrid math rubric constructor --- .../experimental/hybrid_math_rubric.py | 36 ++++++++++++------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/verifiers/rubrics/experimental/hybrid_math_rubric.py b/verifiers/rubrics/experimental/hybrid_math_rubric.py index eb5ebc822..1249b5a16 100644 --- a/verifiers/rubrics/experimental/hybrid_math_rubric.py +++ b/verifiers/rubrics/experimental/hybrid_math_rubric.py @@ -1,10 +1,10 @@ from math_verify import parse, verify from openai import AsyncOpenAI - -import verifiers as vf from verifiers.parsers.parser import Parser from verifiers.utils.data_utils import extract_boxed_answer +import verifiers as vf + # https://github.com/open-compass/CompassVerifier/blob/2d7cba6df0b21f9c6121786ac1e5770c68473598/src/prompts.py#L28 DEFAULT_JUDGE_PROMPT = """\ As a grading expert, your task is to determine whether the candidate's final answer matches the provided standard answer. Follow these evaluation guidelines precisely: @@ -84,16 +84,26 @@ class HybridMathRubric(vf.JudgeRubric): """Runs rule-based math verification first, with optional LLM judge fallback.""" + DEFAULT_JUDGE_PARSER = None + DEFAULT_JUDGE_MODEL = "openai/gpt-5-nano" + DEFAULT_JUDGE_CLIENT = None + DEFAULT_JUDGE_PROMPT = DEFAULT_JUDGE_PROMPT + DEFAULT_JUDGE_SAMPLING_ARGS = {} + DEFAULT_USE_JUDGE_FALLBACK = False + DEFAULT_MATH_VERIFY_TIMEOUT_SECONDS = 5 + def __init__( self, - judge_parser: Parser | None = None, - judge_model: str | None = None, - judge_client: AsyncOpenAI | None = None, - judge_sampling_args: dict = {}, + judge_parser: Parser | None = DEFAULT_JUDGE_PARSER, + use_judge_fallback: bool = DEFAULT_USE_JUDGE_FALLBACK, + judge_client: AsyncOpenAI | None = DEFAULT_JUDGE_CLIENT, + judge_model: str = DEFAULT_JUDGE_MODEL, judge_prompt: str = DEFAULT_JUDGE_PROMPT, - timeout_seconds: float = 5, + judge_sampling_args: dict | None = None, + math_verify_timeout_seconds: float = DEFAULT_MATH_VERIFY_TIMEOUT_SECONDS, **kwargs, ): + judge_sampling_args = judge_sampling_args or self.DEFAULT_JUDGE_SAMPLING_ARGS super().__init__( judge_client=judge_client, judge_sampling_args=judge_sampling_args, @@ -106,9 +116,9 @@ def __init__( self.add_reward_func(self.judge_score, weight=0) self.add_reward_func(self.correct_answer, weight=1) - self.timeout_seconds = timeout_seconds - self.judge_model = judge_model - self.class_objects["judge_model"] = judge_model + self.math_verify_timeout_seconds = math_verify_timeout_seconds + self.judge_model = judge_model if use_judge_fallback else None + self.class_objects["judge_model"] = self.judge_model async def math_verify_score( self, completion: vf.Messages, answer: str, state: vf.State, **kwargs @@ -124,13 +134,13 @@ async def math_verify_score( verify( parse( f"\\boxed{{{answer}}}", - parsing_timeout=int(self.timeout_seconds), + parsing_timeout=int(self.math_verify_timeout_seconds), ), parse( f"\\boxed{{{response}}}", - parsing_timeout=int(self.timeout_seconds), + parsing_timeout=int(self.math_verify_timeout_seconds), ), - timeout_seconds=int(self.timeout_seconds), + timeout_seconds=int(self.math_verify_timeout_seconds), ) ) except BaseException as e: From be2253bc5b2c968419158fea89d3b3cb06b38176 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Wed, 11 Mar 2026 21:02:24 +0530 Subject: [PATCH 26/47] allow offline difficulty filtering --- .../envs/experimental/opencode_qa_env.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/verifiers/envs/experimental/opencode_qa_env.py b/verifiers/envs/experimental/opencode_qa_env.py index bff428d09..976fb287d 100644 --- a/verifiers/envs/experimental/opencode_qa_env.py +++ b/verifiers/envs/experimental/opencode_qa_env.py @@ -13,6 +13,9 @@ class OpenCodeQAEnv(OpenCodeEnv): DEFAULT_ANSWER_KEY = "answer" DEFAULT_INSTRUCTION_PROMPT = "" DEFAULT_INSTRUCTION_PROMPT_POST = "" + DEFAULT_DIFFICULTY_KEY = None + DEFAULT_MIN_AVG_REWARD = 0.0 + DEFAULT_MAX_AVG_REWARD = 1.0 def __init__( self, @@ -24,6 +27,9 @@ def __init__( answer_key: str = DEFAULT_ANSWER_KEY, instruction_prompt: str = DEFAULT_INSTRUCTION_PROMPT, instruction_prompt_post: str = DEFAULT_INSTRUCTION_PROMPT_POST, + difficulty_key: str | None = DEFAULT_DIFFICULTY_KEY, + min_avg_reward: float = DEFAULT_MIN_AVG_REWARD, + max_avg_reward: float = DEFAULT_MAX_AVG_REWARD, **kwargs, ): dataset = self.construct_dataset( @@ -34,6 +40,9 @@ def __init__( answer_key, instruction_prompt, instruction_prompt_post, + difficulty_key, + min_avg_reward, + max_avg_reward, ) super().__init__(dataset=dataset, rubric=rubric, **kwargs) @@ -47,6 +56,9 @@ def construct_dataset( answer_key: str, instruction_prompt: str, instruction_prompt_post: str, + difficulty_key: str | None = None, + min_avg_reward: float = 0.0, + max_avg_reward: float = 1.0, ) -> Dataset: """Constructs a general QA dataset.""" @@ -57,6 +69,13 @@ def construct_dataset( ) dataset = dataset_obj + if difficulty_key is not None: + column_names = dataset.column_names or [] + if difficulty_key in column_names: + dataset = dataset.filter( + lambda x: min_avg_reward <= x[difficulty_key] <= max_avg_reward + ) + column_names = dataset.column_names if column_names is None: raise ValueError("Dataset has no columns.") From 00c1a0f28314042f4914fa61a94ec5c531238032 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Wed, 11 Mar 2026 21:05:47 +0530 Subject: [PATCH 27/47] fix default model --- verifiers/rubrics/experimental/hybrid_math_rubric.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/verifiers/rubrics/experimental/hybrid_math_rubric.py b/verifiers/rubrics/experimental/hybrid_math_rubric.py index 1249b5a16..45236e3c1 100644 --- a/verifiers/rubrics/experimental/hybrid_math_rubric.py +++ b/verifiers/rubrics/experimental/hybrid_math_rubric.py @@ -85,7 +85,7 @@ class HybridMathRubric(vf.JudgeRubric): """Runs rule-based math verification first, with optional LLM judge fallback.""" DEFAULT_JUDGE_PARSER = None - DEFAULT_JUDGE_MODEL = "openai/gpt-5-nano" + DEFAULT_JUDGE_MODEL = "gpt-5-nano" DEFAULT_JUDGE_CLIENT = None DEFAULT_JUDGE_PROMPT = DEFAULT_JUDGE_PROMPT DEFAULT_JUDGE_SAMPLING_ARGS = {} From c7857f42d992cc997a0a733be470efb21737510a Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Wed, 11 Mar 2026 21:31:04 +0530 Subject: [PATCH 28/47] collect agent logs --- verifiers/envs/experimental/opencode_env.py | 26 +++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/verifiers/envs/experimental/opencode_env.py b/verifiers/envs/experimental/opencode_env.py index ecd5c872b..fff9af6dc 100644 --- a/verifiers/envs/experimental/opencode_env.py +++ b/verifiers/envs/experimental/opencode_env.py @@ -302,6 +302,32 @@ def normalize_response(self, response: vf.Response) -> vf.Response: ) return response.model_copy(update={"message": normalized_message}) + async def post_rollout(self, state: vf.State) -> None: + """Collect agent logs from sandbox before teardown.""" + sandbox_id = state.get("sandbox_id") + if sandbox_id: + try: + result = await self.sandbox_client.execute_command( + sandbox_id, + f"cat {self.remote_logs_path} 2>/dev/null || echo ''", + working_dir=None, + ) + agent_logs = (result.stdout or "").strip() + state["agent_logs"] = agent_logs + + # Log agent output on error or empty trajectory for debugging + num_turns = len(state.get("trajectory", [])) + agent_error = state.get("agent_exit_code", 0) != 0 + if (agent_error or num_turns == 0) and agent_logs: + logger.debug( + f"Agent logs (example_id={state.get('example_id')}, " + f"exit_code={state.get('agent_exit_code')}, turns={num_turns}):\n{agent_logs}" + ) + except Exception as e: + logger.warning(f"Failed to collect agent logs: {e}") + + await super().post_rollout(state) + def build_prompt(self, state: vf.State) -> str: """Build the prompt to be uploaded to OpenCode.""" return state["prompt"][-1]["content"] From b8c1791617c13d1fa25146a310a32674c1ada155 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Wed, 11 Mar 2026 21:33:40 +0530 Subject: [PATCH 29/47] use warning log --- verifiers/envs/experimental/opencode_env.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/verifiers/envs/experimental/opencode_env.py b/verifiers/envs/experimental/opencode_env.py index fff9af6dc..5953697bf 100644 --- a/verifiers/envs/experimental/opencode_env.py +++ b/verifiers/envs/experimental/opencode_env.py @@ -319,7 +319,7 @@ async def post_rollout(self, state: vf.State) -> None: num_turns = len(state.get("trajectory", [])) agent_error = state.get("agent_exit_code", 0) != 0 if (agent_error or num_turns == 0) and agent_logs: - logger.debug( + logger.warning( f"Agent logs (example_id={state.get('example_id')}, " f"exit_code={state.get('agent_exit_code')}, turns={num_turns}):\n{agent_logs}" ) From 7c9b59b25b2ca00cb68744a74a51b0763f732be9 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Thu, 12 Mar 2026 19:17:00 +0530 Subject: [PATCH 30/47] agent per-request timeout 30min->1h --- verifiers/envs/experimental/opencode_env.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/verifiers/envs/experimental/opencode_env.py b/verifiers/envs/experimental/opencode_env.py index 5953697bf..014491b52 100644 --- a/verifiers/envs/experimental/opencode_env.py +++ b/verifiers/envs/experimental/opencode_env.py @@ -144,7 +144,7 @@ class OpenCodeEnv(CliAgentEnv): DEFAULT_INSTALL_COMMAND = DEFAULT_INSTALL_COMMAND DEFAULT_RUN_COMMAND_TEMPLATE = DEFAULT_RUN_COMMAND_TEMPLATE DEFAULT_SYSTEM_PROMPT = DEFAULT_SYSTEM_PROMPT - DEFAULT_PROVIDER_TIMEOUT_MS = 1_800_000 # 30min + DEFAULT_PROVIDER_TIMEOUT_MS = 3_600_000 # 1h DEFAULT_DISABLE_COMPACTION = True DEFAULT_ENABLE_INTERLEAVED = True DEFAULT_INCLUDE_TASK_SYSTEM_PROMPT = False From 8347df40e7a82a43a1122ddb4246823315e7a566 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Thu, 12 Mar 2026 19:32:14 +0530 Subject: [PATCH 31/47] remove http health probe again --- verifiers/envs/experimental/cli_agent_env.py | 72 -------------------- 1 file changed, 72 deletions(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index a8242811f..a509ad272 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -145,7 +145,6 @@ def init_interception( self.interception_url = interception_url self._tunnel: Tunnel | None = None self._tunnel_lock = asyncio.Lock() - self._tunnel_monitor_task: asyncio.Task | None = None self._interception_server = InterceptionServer(port=interception_port) def _require_interception_server(self) -> InterceptionServer: @@ -177,71 +176,11 @@ async def get_tunnel_url(self) -> str: url = await self._tunnel.start() logger.debug(f"Prime Tunnel started: {url}") - # Lazily start health monitor on first tunnel creation - if ( - self._tunnel_monitor_task is None - or self._tunnel_monitor_task.done() - ): - self._tunnel_monitor_task = asyncio.create_task( - self._tunnel_health_monitor() - ) - return url else: assert self._tunnel.url is not None, "Tunnel started but URL is None" return self._tunnel.url - async def _tunnel_health_monitor(self, interval: float = 30.0) -> None: - """Background task that probes tunnel liveness and restarts on failure. - - Sends an HTTP request through the tunnel every `interval` seconds. - Any response (even 404) confirms the tunnel is routing traffic. - After 3 consecutive failures the tunnel is torn down and recreated. - """ - consecutive_failures = 0 - max_consecutive_failures = 3 - try: - while True: - await asyncio.sleep(interval) - async with self._tunnel_lock: - if self._tunnel is None or self._tunnel.url is None: - continue - - try: - async with httpx.AsyncClient() as client: - await client.get( - f"{self._tunnel.url}/health", - timeout=10.0, - ) - consecutive_failures = 0 - except Exception as e: - consecutive_failures += 1 - logger.warning( - f"Health monitor: tunnel probe failed " - f"({consecutive_failures}/{max_consecutive_failures}): {e}" - ) - if consecutive_failures >= max_consecutive_failures: - logger.warning( - f"Health monitor: tunnel unreachable after " - f"{max_consecutive_failures} consecutive probes, " - f"restarting" - ) - self._tunnel.sync_stop() - interception_server = self._require_interception_server() - port = interception_server.port - if logger.isEnabledFor(logging.DEBUG): - self._tunnel = Tunnel( - local_port=port, - log_level="debug", - ) - else: - self._tunnel = Tunnel(local_port=port) - url = await self._tunnel.start() - consecutive_failures = 0 - logger.info(f"Health monitor: restarted tunnel url={url}") - except asyncio.CancelledError: - return - async def setup_state(self, state: State) -> State: """Setup sandbox + interception for this rollout""" state = await super().setup_state(state) @@ -589,17 +528,6 @@ async def add_model_response( @vf.teardown async def teardown_resources(self): """Stop Prime Tunnel and HTTP interception server.""" - if ( - self._tunnel_monitor_task is not None - and not self._tunnel_monitor_task.done() - ): - self._tunnel_monitor_task.cancel() - try: - await self._tunnel_monitor_task - except asyncio.CancelledError: - pass - self._tunnel_monitor_task = None - async with self._tunnel_lock: if self._tunnel is not None: try: From d729f1d0ce27f280cde5cb347c687bb9b3e4f382 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Thu, 12 Mar 2026 19:33:00 +0530 Subject: [PATCH 32/47] default timeouts to 1h --- verifiers/envs/experimental/cli_agent_env.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index a509ad272..325b67fde 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -245,9 +245,9 @@ async def build_env_vars(self, state: State) -> dict[str, str]: """Build environment variables for the sandbox. Override to add custom vars.""" env_vars = dict(self.environment_vars) if self.environment_vars else {} env_vars["OPENAI_BASE_URL"] = state["interception_base_url"] - env_vars.setdefault("OPENAI_TIMEOUT", "1800") - env_vars.setdefault("OPENAI_REQUEST_TIMEOUT", "1800") - env_vars.setdefault("HTTPX_TIMEOUT", "1800") + env_vars.setdefault("OPENAI_TIMEOUT", "3600") + env_vars.setdefault("OPENAI_REQUEST_TIMEOUT", "3600") + env_vars.setdefault("HTTPX_TIMEOUT", "3600") model = state.get("model") if model: env_vars["OPENAI_MODEL"] = model From 67ea4bf69c0f4dab5b2e4d89899bf8ffab508f4c Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Thu, 12 Mar 2026 19:35:47 +0530 Subject: [PATCH 33/47] remove tunnel lock (not needed bc no concurrent health monitor) --- verifiers/envs/experimental/cli_agent_env.py | 43 +++++++++----------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 325b67fde..3f7e101b3 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -144,7 +144,6 @@ def init_interception( self.interception_port = interception_port self.interception_url = interception_url self._tunnel: Tunnel | None = None - self._tunnel_lock = asyncio.Lock() self._interception_server = InterceptionServer(port=interception_port) def _require_interception_server(self) -> InterceptionServer: @@ -154,16 +153,15 @@ def _require_interception_server(self) -> InterceptionServer: async def get_tunnel_url(self) -> str: """Get tunnel URL, starting the tunnel if needed. Recreates dead tunnels.""" - async with self._tunnel_lock: - if self._tunnel is not None and not self._tunnel.is_running: - frpc_output = "\n".join(self._tunnel.recent_output) - logger.warning( - f"Tunnel process died, recreating. frpc output:\n{frpc_output}" - ) - self._tunnel.sync_stop() - self._tunnel = None + if self._tunnel is not None and not self._tunnel.is_running: + frpc_output = "\n".join(self._tunnel.recent_output) + logger.warning( + f"Tunnel process died, recreating. frpc output:\n{frpc_output}" + ) + self._tunnel.sync_stop() + self._tunnel = None - if self._tunnel is None: + if self._tunnel is None: interception_server = self._require_interception_server() port = interception_server.port if logger.isEnabledFor(logging.DEBUG): @@ -176,10 +174,10 @@ async def get_tunnel_url(self) -> str: url = await self._tunnel.start() logger.debug(f"Prime Tunnel started: {url}") - return url - else: - assert self._tunnel.url is not None, "Tunnel started but URL is None" - return self._tunnel.url + return url + else: + assert self._tunnel.url is not None, "Tunnel started but URL is None" + return self._tunnel.url async def setup_state(self, state: State) -> State: """Setup sandbox + interception for this rollout""" @@ -528,15 +526,14 @@ async def add_model_response( @vf.teardown async def teardown_resources(self): """Stop Prime Tunnel and HTTP interception server.""" - async with self._tunnel_lock: - if self._tunnel is not None: - try: - self._tunnel.sync_stop() - logger.debug("Prime Tunnel stopped") - except Exception as e: - logger.warning(f"Error stopping Prime Tunnel: {e}") - finally: - self._tunnel = None + if self._tunnel is not None: + try: + self._tunnel.sync_stop() + logger.debug("Prime Tunnel stopped") + except Exception as e: + logger.warning(f"Error stopping Prime Tunnel: {e}") + finally: + self._tunnel = None if self._interception_server is not None: await self._interception_server.stop() From 53c831b1c926d1ae3548e65dd0284cb9a0f762c3 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Thu, 12 Mar 2026 19:37:24 +0530 Subject: [PATCH 34/47] simplify poll_job --- verifiers/envs/experimental/cli_agent_env.py | 44 +++++++------------- 1 file changed, 14 insertions(+), 30 deletions(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 3f7e101b3..577b67c99 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -162,18 +162,17 @@ async def get_tunnel_url(self) -> str: self._tunnel = None if self._tunnel is None: - interception_server = self._require_interception_server() - port = interception_server.port - if logger.isEnabledFor(logging.DEBUG): - self._tunnel = Tunnel( - local_port=port, - log_level="debug", - ) - else: - self._tunnel = Tunnel(local_port=port) - url = await self._tunnel.start() - logger.debug(f"Prime Tunnel started: {url}") - + interception_server = self._require_interception_server() + port = interception_server.port + if logger.isEnabledFor(logging.DEBUG): + self._tunnel = Tunnel( + local_port=port, + log_level="debug", + ) + else: + self._tunnel = Tunnel(local_port=port) + url = await self._tunnel.start() + logger.debug(f"Prime Tunnel started: {url}") return url else: assert self._tunnel.url is not None, "Tunnel started but URL is None" @@ -307,25 +306,10 @@ async def poll_job_completion( self, state: State, sandbox_id: str, background_job: BackgroundJob ) -> None: """Poll until background job completes, capturing output.""" - consecutive_errors = 0 - max_consecutive_errors = 5 while True: - try: - status: BackgroundJobStatus = ( - await self.sandbox_client.get_background_job( - sandbox_id, background_job - ) - ) - consecutive_errors = 0 - except Exception as e: - consecutive_errors += 1 - logger.warning( - f"Polling error ({consecutive_errors}/{max_consecutive_errors}): {e}" - ) - if consecutive_errors >= max_consecutive_errors: - raise - await asyncio.sleep(2) - continue + status: BackgroundJobStatus = await self.sandbox_client.get_background_job( + sandbox_id, background_job + ) if status.completed: state["agent_exit_code"] = status.exit_code state["agent_stdout"] = status.stdout From b149c96d6cfd5c33d8ca8b25292f52ba2b77ca1a Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Thu, 12 Mar 2026 19:41:31 +0530 Subject: [PATCH 35/47] use cls logger --- verifiers/envs/experimental/cli_agent_env.py | 26 +++++++++----------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 577b67c99..667bc99cf 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -6,8 +6,6 @@ from collections import Counter from typing import Any, cast -import httpx - from prime_sandboxes import ( AdvancedConfigs, BackgroundJob, @@ -155,7 +153,7 @@ async def get_tunnel_url(self) -> str: """Get tunnel URL, starting the tunnel if needed. Recreates dead tunnels.""" if self._tunnel is not None and not self._tunnel.is_running: frpc_output = "\n".join(self._tunnel.recent_output) - logger.warning( + self.logger.warning( f"Tunnel process died, recreating. frpc output:\n{frpc_output}" ) self._tunnel.sync_stop() @@ -172,7 +170,7 @@ async def get_tunnel_url(self) -> str: else: self._tunnel = Tunnel(local_port=port) url = await self._tunnel.start() - logger.debug(f"Prime Tunnel started: {url}") + self.logger.debug(f"Prime Tunnel started: {url}") return url else: assert self._tunnel.url is not None, "Tunnel started but URL is None" @@ -213,7 +211,7 @@ async def setup_state(self, state: State) -> State: advanced_configs=self.advanced_configs, labels=self.labels if self.labels else [], ) - logger.debug( + self.logger.debug( f"Creating sandbox with OPENAI_BASE_URL={env_vars.get('OPENAI_BASE_URL')} " f"docker_image={docker_image}" ) @@ -230,7 +228,7 @@ async def setup_state(self, state: State) -> State: f"Started rollout_id={state['rollout_id']}", f"example_id={state['example_id']}", ] - logger.info(" | ".join(parts)) + self.logger.info(" | ".join(parts)) return state @@ -290,15 +288,15 @@ async def wait_for_completion(self, state: State) -> None: timeout=self.timeout_seconds, ) except asyncio.TimeoutError: - logger.warning(f"Agent timed out after {self.timeout_seconds}s") + self.logger.warning(f"Agent timed out after {self.timeout_seconds}s") state["agent_timed_out"] = True except asyncio.CancelledError: - logger.debug("Completion wait task cancelled") + self.logger.debug("Completion wait task cancelled") raise except Exception as e: error = AgentError(f"Agent polling failed: {e}") state["error"] = error - logger.error(f"Agent polling failed: {e}") + self.logger.error(f"Agent polling failed: {e}") finally: state["agent_completed"] = True @@ -315,11 +313,11 @@ async def poll_job_completion( state["agent_stdout"] = status.stdout state["agent_stderr"] = status.stderr if status.exit_code == 0: - logger.debug( + self.logger.debug( f"Agent completed successfully (exit_code={status.exit_code})" ) else: - logger.warning( + self.logger.warning( f"Agent failed (exit_code={status.exit_code}) stdout={status.stdout}, stderr={status.stderr}" ) return @@ -513,9 +511,9 @@ async def teardown_resources(self): if self._tunnel is not None: try: self._tunnel.sync_stop() - logger.debug("Prime Tunnel stopped") + self.logger.debug("Prime Tunnel stopped") except Exception as e: - logger.warning(f"Error stopping Prime Tunnel: {e}") + self.logger.warning(f"Error stopping Prime Tunnel: {e}") finally: self._tunnel = None if self._interception_server is not None: @@ -589,7 +587,7 @@ async def post_rollout(self, state: State): parts.append("timed_out=True") if error_info: parts.append(f"error={error_info}") - logger.info(" | ".join(parts)) + self.logger.info(" | ".join(parts)) @vf.cleanup async def destroy_sandbox(self, state: State): From 5dbf8a2ab2fc14bba25f83691c3b9cbeb8da01a5 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Thu, 12 Mar 2026 20:19:34 +0530 Subject: [PATCH 36/47] oops bring back tunnel lock --- verifiers/envs/experimental/cli_agent_env.py | 63 ++++++++++---------- 1 file changed, 33 insertions(+), 30 deletions(-) diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 667bc99cf..94a2929c5 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -142,6 +142,7 @@ def init_interception( self.interception_port = interception_port self.interception_url = interception_url self._tunnel: Tunnel | None = None + self._tunnel_lock = asyncio.Lock() self._interception_server = InterceptionServer(port=interception_port) def _require_interception_server(self) -> InterceptionServer: @@ -151,30 +152,31 @@ def _require_interception_server(self) -> InterceptionServer: async def get_tunnel_url(self) -> str: """Get tunnel URL, starting the tunnel if needed. Recreates dead tunnels.""" - if self._tunnel is not None and not self._tunnel.is_running: - frpc_output = "\n".join(self._tunnel.recent_output) - self.logger.warning( - f"Tunnel process died, recreating. frpc output:\n{frpc_output}" - ) - self._tunnel.sync_stop() - self._tunnel = None - - if self._tunnel is None: - interception_server = self._require_interception_server() - port = interception_server.port - if logger.isEnabledFor(logging.DEBUG): - self._tunnel = Tunnel( - local_port=port, - log_level="debug", + async with self._tunnel_lock: + if self._tunnel is not None and not self._tunnel.is_running: + frpc_output = "\n".join(self._tunnel.recent_output) + self.logger.warning( + f"Tunnel process died, recreating. frpc output:\n{frpc_output}" ) + self._tunnel.sync_stop() + self._tunnel = None + + if self._tunnel is None: + interception_server = self._require_interception_server() + port = interception_server.port + if logger.isEnabledFor(logging.DEBUG): + self._tunnel = Tunnel( + local_port=port, + log_level="debug", + ) + else: + self._tunnel = Tunnel(local_port=port) + url = await self._tunnel.start() + self.logger.debug(f"Prime Tunnel started: {url}") + return url else: - self._tunnel = Tunnel(local_port=port) - url = await self._tunnel.start() - self.logger.debug(f"Prime Tunnel started: {url}") - return url - else: - assert self._tunnel.url is not None, "Tunnel started but URL is None" - return self._tunnel.url + assert self._tunnel.url is not None, "Tunnel started but URL is None" + return self._tunnel.url async def setup_state(self, state: State) -> State: """Setup sandbox + interception for this rollout""" @@ -508,14 +510,15 @@ async def add_model_response( @vf.teardown async def teardown_resources(self): """Stop Prime Tunnel and HTTP interception server.""" - if self._tunnel is not None: - try: - self._tunnel.sync_stop() - self.logger.debug("Prime Tunnel stopped") - except Exception as e: - self.logger.warning(f"Error stopping Prime Tunnel: {e}") - finally: - self._tunnel = None + async with self._tunnel_lock: + if self._tunnel is not None: + try: + self._tunnel.sync_stop() + self.logger.debug("Prime Tunnel stopped") + except Exception as e: + self.logger.warning(f"Error stopping Prime Tunnel: {e}") + finally: + self._tunnel = None if self._interception_server is not None: await self._interception_server.stop() From cfb150edffa0b76c4f0401ae35142d5267b6761c Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Thu, 12 Mar 2026 21:49:59 +0530 Subject: [PATCH 37/47] add file upload helpers --- verifiers/envs/experimental/__init__.py | 4 +- verifiers/envs/experimental/sandbox_mixin.py | 160 ++++++++++++++++++ .../experimental/hybrid_math_rubric.py | 12 +- .../scripts/math_verify_scorer.py | 27 +++ 4 files changed, 200 insertions(+), 3 deletions(-) create mode 100644 verifiers/rubrics/experimental/scripts/math_verify_scorer.py diff --git a/verifiers/envs/experimental/__init__.py b/verifiers/envs/experimental/__init__.py index ff81549e0..53605f80e 100644 --- a/verifiers/envs/experimental/__init__.py +++ b/verifiers/envs/experimental/__init__.py @@ -1,4 +1,4 @@ from verifiers.envs.experimental.rollout_gateway_mixin import RolloutGatewayMixin -from verifiers.envs.experimental.sandbox_mixin import SandboxMixin +from verifiers.envs.experimental.sandbox_mixin import SandboxMixin, SandboxScorer -__all__ = ["RolloutGatewayMixin", "SandboxMixin"] +__all__ = ["RolloutGatewayMixin", "SandboxMixin", "SandboxScorer"] diff --git a/verifiers/envs/experimental/sandbox_mixin.py b/verifiers/envs/experimental/sandbox_mixin.py index cf177e2d5..9352c96a0 100644 --- a/verifiers/envs/experimental/sandbox_mixin.py +++ b/verifiers/envs/experimental/sandbox_mixin.py @@ -1,11 +1,17 @@ import asyncio +import io import logging import os +import tarfile +import tempfile +from dataclasses import dataclass, field +from pathlib import Path from typing import Any, Callable, cast import httpx import tenacity as tc from prime_sandboxes import ( + APIError, CommandTimeoutError, CreateSandboxRequest, SandboxClient, @@ -26,6 +32,32 @@ httpcore_logger.setLevel(getattr(logging, _httpx_log_level, logging.DEBUG)) +@dataclass +class SandboxScorer: + """A scorer that runs a script inside the sandbox after rollout. + + The scorer uploads ``script`` and any ``files`` to ``dest_dir`` via bundle, + then executes the script. The last line of stdout is parsed as a float and + stored in ``state[state_key]``. + + Attributes: + name: Human-readable name for logging. + script: Python source code to execute in the sandbox. + state_key: Key under which the score is stored in state. + dest_dir: Directory inside the sandbox to upload files to. + files_fn: Callable that receives ``state`` and returns a dict of + ``{filename: content}`` to upload alongside the script. + timeout: Timeout in seconds for the scorer script execution. + """ + + name: str + script: str + state_key: str + dest_dir: str = "/app" + files_fn: Callable[[dict], dict[str, str]] = field(default_factory=lambda: lambda state: {}) + timeout: int = 30 + + class SandboxCreationError(vf.SandboxError): ... @@ -58,6 +90,7 @@ class SandboxMixin: active_sandboxes: set[str] sandbox_client: ThreadedAsyncSandboxClient sandbox_wait_for_creation_max_attempts: int + sandbox_scorers: list[SandboxScorer] with_retry: Callable def init_sandbox_client( @@ -75,6 +108,8 @@ def init_sandbox_client( """Initialize sandbox client and retry wrapper. Call from subclass __init__.""" self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") self.active_sandboxes = set() + if not hasattr(self, "sandbox_scorers"): + self.sandbox_scorers = [] self.sandbox_wait_for_creation_max_attempts = ( sandbox_wait_for_creation_max_attempts ) @@ -211,6 +246,131 @@ async def run_background_job( sandbox_id=sandbox_id, command=command, timeout=timeout ) + async def upload_file( + self, + sandbox_id: str, + remote_path: str, + local_path: str, + ) -> None: + """Upload a local file to the sandbox.""" + try: + await self.sandbox_client.upload_file(sandbox_id, remote_path, local_path) + except SandboxOOMError as e: + raise vf.SandboxError( + f"Sandbox {sandbox_id} OOM during upload to {remote_path}" + ) from e + except SandboxTimeoutError as e: + raise vf.SandboxError( + f"Sandbox {sandbox_id} timeout during upload to {remote_path}" + ) from e + except APIError as e: + raise vf.SandboxError( + f"API error uploading to {remote_path} in {sandbox_id}: {e}" + ) from e + + async def upload_content( + self, + sandbox_id: str, + content: str, + remote_path: str, + ) -> None: + """Upload a string as a file to the sandbox.""" + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: + f.write(content) + local_path = f.name + try: + await self.upload_file(sandbox_id, remote_path, local_path) + finally: + Path(local_path).unlink(missing_ok=True) + + async def upload_bundle( + self, + sandbox_id: str, + file_map: dict[str, str], + dest_dir: str, + ) -> None: + """Upload a bundle of files to the sandbox. + + Builds a tar.gz archive from ``file_map`` (relative path → UTF-8 + content), uploads it, and extracts into ``dest_dir``. + """ + buf = io.BytesIO() + with tarfile.open(fileobj=buf, mode="w:gz") as tar: + for rel_path, content in file_map.items(): + data = content.encode("utf-8") + info = tarfile.TarInfo(name=rel_path) + info.size = len(data) + tar.addfile(info, io.BytesIO(data)) + bundle_bytes = buf.getvalue() + + archive_remote = f"{dest_dir}/_bundle.tar.gz" + with tempfile.NamedTemporaryFile(delete=False, suffix=".tar.gz") as f: + f.write(bundle_bytes) + tmp_path = f.name + try: + await self.upload_file(sandbox_id, archive_remote, tmp_path) + finally: + Path(tmp_path).unlink(missing_ok=True) + + extract_cmd = ( + f"mkdir -p {dest_dir} && " + f'python3 -c "import tarfile; ' + f"tarfile.open('{archive_remote}', 'r:gz').extractall('{dest_dir}')\" && " + f"rm -f {archive_remote}" + ) + result = await self.sandbox_client.execute_command( + sandbox_id, + extract_cmd, + timeout=60, + ) + if result.exit_code != 0: + raise vf.SandboxError( + f"Bundle extract failed in {sandbox_id} (exit={result.exit_code}): " + f"{(result.stderr or '')[:200]}" + ) + + def add_sandbox_scorer(self, scorer: SandboxScorer) -> None: + """Register a sandbox scorer to run during post_rollout.""" + self.sandbox_scorers.append(scorer) + + async def run_sandbox_scorers(self, state: dict[str, Any]) -> None: + """Run all registered sandbox scorers sequentially. + + Each scorer uploads its files + script to the sandbox via bundle, runs + the script, and stores the result in ``state[scorer.state_key]``. + Skipped when there is no sandbox or an error occurred. + """ + sandbox_id = state.get("sandbox_id") + if not sandbox_id: + return + if state.get("error") or state.get("sandbox_error"): + return + + for scorer in self.sandbox_scorers: + try: + files = scorer.files_fn(state) + files["score.py"] = scorer.script + await self.upload_bundle(sandbox_id, file_map=files, dest_dir=scorer.dest_dir) + result = await self.sandbox_client.execute_command( + sandbox_id, + f"python3 {scorer.dest_dir}/score.py", + working_dir=scorer.dest_dir, + timeout=scorer.timeout, + ) + if result.exit_code == 0 and result.stdout.strip(): + state[scorer.state_key] = float(result.stdout.strip().splitlines()[-1]) + else: + stderr = (result.stderr or "")[:200] + self.logger.warning( + f"Sandbox scorer '{scorer.name}' failed (exit={result.exit_code}): {stderr}" + ) + state[scorer.state_key] = 0.0 + except Exception as e: + self.logger.warning( + f"Sandbox scorer '{scorer.name}' error: {type(e).__name__}: {e}" + ) + state[scorer.state_key] = 0.0 + def teardown_sandboxes(self): """Delete all active sandboxes using sync client. diff --git a/verifiers/rubrics/experimental/hybrid_math_rubric.py b/verifiers/rubrics/experimental/hybrid_math_rubric.py index 45236e3c1..ad6d9313d 100644 --- a/verifiers/rubrics/experimental/hybrid_math_rubric.py +++ b/verifiers/rubrics/experimental/hybrid_math_rubric.py @@ -91,6 +91,7 @@ class HybridMathRubric(vf.JudgeRubric): DEFAULT_JUDGE_SAMPLING_ARGS = {} DEFAULT_USE_JUDGE_FALLBACK = False DEFAULT_MATH_VERIFY_TIMEOUT_SECONDS = 5 + DEFAULT_SCORE_REMOTELY = False def __init__( self, @@ -101,6 +102,7 @@ def __init__( judge_prompt: str = DEFAULT_JUDGE_PROMPT, judge_sampling_args: dict | None = None, math_verify_timeout_seconds: float = DEFAULT_MATH_VERIFY_TIMEOUT_SECONDS, + score_remotely: bool = DEFAULT_SCORE_REMOTELY, **kwargs, ): judge_sampling_args = judge_sampling_args or self.DEFAULT_JUDGE_SAMPLING_ARGS @@ -117,13 +119,21 @@ def __init__( self.add_reward_func(self.correct_answer, weight=1) self.math_verify_timeout_seconds = math_verify_timeout_seconds + self.score_remotely = score_remotely self.judge_model = judge_model if use_judge_fallback else None self.class_objects["judge_model"] = self.judge_model async def math_verify_score( self, completion: vf.Messages, answer: str, state: vf.State, **kwargs ) -> float: - """Basic rule-based math verification.""" + """Basic rule-based math verification. + + When ``score_remotely=True``, assumes the score was already computed by + a sandbox scorer and stored in ``state["math_verify_score"]``. + """ + if self.score_remotely: + return float(state.get("math_verify_score", 0.0)) + response = self.parser.parse_answer(completion) or "" if response == "": math_verify_score = 0.0 diff --git a/verifiers/rubrics/experimental/scripts/math_verify_scorer.py b/verifiers/rubrics/experimental/scripts/math_verify_scorer.py new file mode 100644 index 000000000..5e2b9df4f --- /dev/null +++ b/verifiers/rubrics/experimental/scripts/math_verify_scorer.py @@ -0,0 +1,27 @@ +"""Sandbox scoring script for math_verify. + +Reads ground truth from ground_truth.txt and the agent's answer from answer.txt. +Prints a single float (1.0 or 0.0) to stdout. +""" + +from pathlib import Path + +from math_verify import parse, verify + +ground_truth = Path("ground_truth.txt").read_text() +response = Path("answer.txt").read_text() + +if not response: + print(0.0) +else: + try: + score = float( + verify( + parse(ground_truth, parsing_timeout=5), + parse(response, parsing_timeout=5), + timeout_seconds=5, + ) + ) + print(score) + except BaseException: + print(0.0) From 556b164785df8adcbef9def00f35b35f485a4741 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Thu, 12 Mar 2026 23:51:03 +0530 Subject: [PATCH 38/47] allow rubric cleanup --- verifiers/envs/environment.py | 6 ++++++ verifiers/rubrics/rubric.py | 17 ++++++++++++++++- verifiers/rubrics/rubric_group.py | 5 +++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/verifiers/envs/environment.py b/verifiers/envs/environment.py index a8adfff2d..f880ab8b7 100644 --- a/verifiers/envs/environment.py +++ b/verifiers/envs/environment.py @@ -747,6 +747,8 @@ async def run_rollout_attempt() -> State: else: await self.rubric.dummy_score_rollout(state) + await self.rubric.cleanup(state) + return state state = await maybe_retry(run_rollout_attempt, max_retries=max_retries)() @@ -804,6 +806,10 @@ async def run_group_attempt() -> list[State]: await self.rubric.score_group(group_states) else: await self.rubric.dummy_score_group(group_states) + + for state in group_states: + await self.rubric.cleanup(state) + return group_states group_states = await maybe_retry(run_group_attempt, max_retries=max_retries)() diff --git a/verifiers/rubrics/rubric.py b/verifiers/rubrics/rubric.py index 83c5c15c7..57b98dc74 100644 --- a/verifiers/rubrics/rubric.py +++ b/verifiers/rubrics/rubric.py @@ -2,7 +2,7 @@ import inspect import logging import time -from typing import Any, cast +from typing import Any, Awaitable, Callable, cast import verifiers as vf from verifiers.types import ( @@ -53,6 +53,16 @@ def __init__( if self.parser: self.class_objects["parser"] = self.parser + # Discover @vf.cleanup-decorated methods + self._cleanup_handlers: list[Callable[[State], Awaitable[None]]] = [ + method + for _, method in inspect.getmembers(self, predicate=inspect.ismethod) + if hasattr(method, "cleanup") and callable(method) + ] + self._cleanup_handlers.sort( + key=lambda m: (-getattr(m, "cleanup_priority", 0), m.__name__) + ) + # public helpers def add_reward_func(self, func: RewardFunc, weight: float = 1.0): self.funcs.append(func) @@ -213,6 +223,11 @@ async def _call_group_reward_func( ans = [0.0] * len(states) return ans + async def cleanup(self, state: State): + """Run all @vf.cleanup-decorated methods on this rubric.""" + for handler in self._cleanup_handlers: + await handler(state) + async def dummy_score_rollout(self, state: State): """Score a single rollout with dummy rewards.""" state["reward"] = 0.0 diff --git a/verifiers/rubrics/rubric_group.py b/verifiers/rubrics/rubric_group.py index b0d8bbb71..248217ef0 100644 --- a/verifiers/rubrics/rubric_group.py +++ b/verifiers/rubrics/rubric_group.py @@ -77,6 +77,11 @@ async def score_rollout(self, state: State): state["reward"] = total_reward state["metrics"] = aggregated_metrics + async def cleanup(self, state: State): + """Run cleanup for all rubrics in the group.""" + for rubric in self.rubrics: + await rubric.cleanup(state) + async def score_group(self, states: list[State]): """ Evaluate all reward functions in-place for a group of rollouts. From bed4ed026702c244e5297b311ccac7e53e1adc0b Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Fri, 13 Mar 2026 00:01:38 +0530 Subject: [PATCH 39/47] sandbox scoring in hybrid math rubric --- verifiers/envs/experimental/__init__.py | 4 +- verifiers/envs/experimental/cli_agent_env.py | 15 +- verifiers/envs/experimental/sandbox_mixin.py | 72 --------- .../experimental/hybrid_math_rubric.py | 149 ++++++++++++++++-- 4 files changed, 148 insertions(+), 92 deletions(-) diff --git a/verifiers/envs/experimental/__init__.py b/verifiers/envs/experimental/__init__.py index 53605f80e..ff81549e0 100644 --- a/verifiers/envs/experimental/__init__.py +++ b/verifiers/envs/experimental/__init__.py @@ -1,4 +1,4 @@ from verifiers.envs.experimental.rollout_gateway_mixin import RolloutGatewayMixin -from verifiers.envs.experimental.sandbox_mixin import SandboxMixin, SandboxScorer +from verifiers.envs.experimental.sandbox_mixin import SandboxMixin -__all__ = ["RolloutGatewayMixin", "SandboxMixin", "SandboxScorer"] +__all__ = ["RolloutGatewayMixin", "SandboxMixin"] diff --git a/verifiers/envs/experimental/cli_agent_env.py b/verifiers/envs/experimental/cli_agent_env.py index 94a2929c5..ea9f8f9ad 100644 --- a/verifiers/envs/experimental/cli_agent_env.py +++ b/verifiers/envs/experimental/cli_agent_env.py @@ -98,6 +98,7 @@ def __init__( sandbox_client_max_connections: int = 100, sandbox_client_max_keepalive_connections: int = 50, sandbox_wait_for_creation_max_attempts: int = 120, + keep_sandbox_for_scoring: bool = False, **kwargs, ): super().__init__(max_turns=max_turns, message_type="chat", **kwargs) @@ -112,6 +113,7 @@ def __init__( sandbox_client_max_keepalive_connections=sandbox_client_max_keepalive_connections, sandbox_wait_for_creation_max_attempts=sandbox_wait_for_creation_max_attempts, ) + self.keep_sandbox_for_scoring = keep_sandbox_for_scoring self.run_command = run_command self.poll_interval = poll_interval self.timeout_seconds = timeout_seconds @@ -594,11 +596,16 @@ async def post_rollout(self, state: State): @vf.cleanup async def destroy_sandbox(self, state: State): - """Cleanup sandbox after rollout.""" + """Cleanup sandbox after rollout. + + When `keep_sandbox_for_scoring` is True, sandbox deletion is deferred + (e.g. when the rubric needs sandbox access during scoring). + """ await self.post_rollout(state) - sandbox_id = state.get("sandbox_id") - if sandbox_id: - await self.delete_sandbox(sandbox_id) + if not self.keep_sandbox_for_scoring: + sandbox_id = state.get("sandbox_id") + if sandbox_id: + await self.delete_sandbox(sandbox_id) async def env_response( self, messages: Messages, state: State, **kwargs diff --git a/verifiers/envs/experimental/sandbox_mixin.py b/verifiers/envs/experimental/sandbox_mixin.py index 9352c96a0..ba3d8bcef 100644 --- a/verifiers/envs/experimental/sandbox_mixin.py +++ b/verifiers/envs/experimental/sandbox_mixin.py @@ -4,7 +4,6 @@ import os import tarfile import tempfile -from dataclasses import dataclass, field from pathlib import Path from typing import Any, Callable, cast @@ -32,32 +31,6 @@ httpcore_logger.setLevel(getattr(logging, _httpx_log_level, logging.DEBUG)) -@dataclass -class SandboxScorer: - """A scorer that runs a script inside the sandbox after rollout. - - The scorer uploads ``script`` and any ``files`` to ``dest_dir`` via bundle, - then executes the script. The last line of stdout is parsed as a float and - stored in ``state[state_key]``. - - Attributes: - name: Human-readable name for logging. - script: Python source code to execute in the sandbox. - state_key: Key under which the score is stored in state. - dest_dir: Directory inside the sandbox to upload files to. - files_fn: Callable that receives ``state`` and returns a dict of - ``{filename: content}`` to upload alongside the script. - timeout: Timeout in seconds for the scorer script execution. - """ - - name: str - script: str - state_key: str - dest_dir: str = "/app" - files_fn: Callable[[dict], dict[str, str]] = field(default_factory=lambda: lambda state: {}) - timeout: int = 30 - - class SandboxCreationError(vf.SandboxError): ... @@ -90,7 +63,6 @@ class SandboxMixin: active_sandboxes: set[str] sandbox_client: ThreadedAsyncSandboxClient sandbox_wait_for_creation_max_attempts: int - sandbox_scorers: list[SandboxScorer] with_retry: Callable def init_sandbox_client( @@ -108,8 +80,6 @@ def init_sandbox_client( """Initialize sandbox client and retry wrapper. Call from subclass __init__.""" self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") self.active_sandboxes = set() - if not hasattr(self, "sandbox_scorers"): - self.sandbox_scorers = [] self.sandbox_wait_for_creation_max_attempts = ( sandbox_wait_for_creation_max_attempts ) @@ -329,48 +299,6 @@ async def upload_bundle( f"{(result.stderr or '')[:200]}" ) - def add_sandbox_scorer(self, scorer: SandboxScorer) -> None: - """Register a sandbox scorer to run during post_rollout.""" - self.sandbox_scorers.append(scorer) - - async def run_sandbox_scorers(self, state: dict[str, Any]) -> None: - """Run all registered sandbox scorers sequentially. - - Each scorer uploads its files + script to the sandbox via bundle, runs - the script, and stores the result in ``state[scorer.state_key]``. - Skipped when there is no sandbox or an error occurred. - """ - sandbox_id = state.get("sandbox_id") - if not sandbox_id: - return - if state.get("error") or state.get("sandbox_error"): - return - - for scorer in self.sandbox_scorers: - try: - files = scorer.files_fn(state) - files["score.py"] = scorer.script - await self.upload_bundle(sandbox_id, file_map=files, dest_dir=scorer.dest_dir) - result = await self.sandbox_client.execute_command( - sandbox_id, - f"python3 {scorer.dest_dir}/score.py", - working_dir=scorer.dest_dir, - timeout=scorer.timeout, - ) - if result.exit_code == 0 and result.stdout.strip(): - state[scorer.state_key] = float(result.stdout.strip().splitlines()[-1]) - else: - stderr = (result.stderr or "")[:200] - self.logger.warning( - f"Sandbox scorer '{scorer.name}' failed (exit={result.exit_code}): {stderr}" - ) - state[scorer.state_key] = 0.0 - except Exception as e: - self.logger.warning( - f"Sandbox scorer '{scorer.name}' error: {type(e).__name__}: {e}" - ) - state[scorer.state_key] = 0.0 - def teardown_sandboxes(self): """Delete all active sandboxes using sync client. diff --git a/verifiers/rubrics/experimental/hybrid_math_rubric.py b/verifiers/rubrics/experimental/hybrid_math_rubric.py index ad6d9313d..592eca076 100644 --- a/verifiers/rubrics/experimental/hybrid_math_rubric.py +++ b/verifiers/rubrics/experimental/hybrid_math_rubric.py @@ -1,5 +1,11 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + from math_verify import parse, verify from openai import AsyncOpenAI +from verifiers.envs.experimental.sandbox_mixin import SandboxMixin from verifiers.parsers.parser import Parser from verifiers.utils.data_utils import extract_boxed_answer @@ -80,9 +86,38 @@ Analysis step by step and Final Judgment: """ +MATH_VERIFY_SCORER_SCRIPT_TEMPLATE = """\ +from pathlib import Path +from math_verify import parse, verify + +solution = Path("{solution_path}").read_text() +answer = Path("{answer_path}").read_text() + +if not answer: + print(0.0) +else: + try: + score = float( + verify( + parse(solution, parsing_timeout=5), + parse(answer, parsing_timeout=5), + timeout_seconds=5, + ) + ) + print(score) + except BaseException: + print(0.0) +""" + + +class HybridMathRubric(SandboxMixin, vf.JudgeRubric): + """Runs rule-based math verification first, with optional LLM judge fallback. -class HybridMathRubric(vf.JudgeRubric): - """Runs rule-based math verification first, with optional LLM judge fallback.""" + When ``score_remotely=True``, math verification runs inside the sandbox + created by the environment. The env must set ``keep_sandbox_for_scoring=True`` + so the sandbox stays alive through scoring; this rubric deletes it in its + ``@vf.cleanup`` handler. + """ DEFAULT_JUDGE_PARSER = None DEFAULT_JUDGE_MODEL = "gpt-5-nano" @@ -92,6 +127,10 @@ class HybridMathRubric(vf.JudgeRubric): DEFAULT_USE_JUDGE_FALLBACK = False DEFAULT_MATH_VERIFY_TIMEOUT_SECONDS = 5 DEFAULT_SCORE_REMOTELY = False + DEFAULT_ANSWER_PATH = "/app/answer.txt" + DEFAULT_SOLUTION_PATH = "/app/solution.txt" + DEFAULT_SCORER_DEST_DIR = "/app" + DEFAULT_SCORER_TIMEOUT = 30 def __init__( self, @@ -103,6 +142,13 @@ def __init__( judge_sampling_args: dict | None = None, math_verify_timeout_seconds: float = DEFAULT_MATH_VERIFY_TIMEOUT_SECONDS, score_remotely: bool = DEFAULT_SCORE_REMOTELY, + answer_path: str = DEFAULT_ANSWER_PATH, + solution_path: str = DEFAULT_SOLUTION_PATH, + scorer_dest_dir: str = DEFAULT_SCORER_DEST_DIR, + scorer_timeout: int = DEFAULT_SCORER_TIMEOUT, + sandbox_client_max_workers: int = 10, + sandbox_client_max_connections: int = 100, + sandbox_client_max_keepalive_connections: int = 50, **kwargs, ): judge_sampling_args = judge_sampling_args or self.DEFAULT_JUDGE_SAMPLING_ARGS @@ -120,24 +166,75 @@ def __init__( self.math_verify_timeout_seconds = math_verify_timeout_seconds self.score_remotely = score_remotely + self.solution_filename = Path(solution_path).name + self.score_script = MATH_VERIFY_SCORER_SCRIPT_TEMPLATE.format( + answer_path=answer_path, + solution_path=solution_path, + ) + self.scorer_dest_dir = scorer_dest_dir + self.scorer_timeout = scorer_timeout self.judge_model = judge_model if use_judge_fallback else None self.class_objects["judge_model"] = self.judge_model - async def math_verify_score( - self, completion: vf.Messages, answer: str, state: vf.State, **kwargs + if self.score_remotely: + self.logger.warning( + "score_remotely=True: expects a sandbox to be kept alive for scoring " + f"(keep_sandbox_for_scoring=True) and the agent's solution written to {answer_path}" + ) + self.init_sandbox_client( + sandbox_client_max_workers=sandbox_client_max_workers, + sandbox_client_max_connections=sandbox_client_max_connections, + sandbox_client_max_keepalive_connections=sandbox_client_max_keepalive_connections, + ) + + async def remote_math_verify_score( + self, answer: str, state: dict[str, Any] ) -> float: - """Basic rule-based math verification. + """Run math_verify inside the sandbox and return the score. - When ``score_remotely=True``, assumes the score was already computed by - a sandbox scorer and stored in ``state["math_verify_score"]``. + Uploads ground trust answer and the scorer script, then compares with the + agent's answer which is expected to be in """ - if self.score_remotely: - return float(state.get("math_verify_score", 0.0)) + sandbox_id = state.get("sandbox_id") + if not sandbox_id: + return 0.0 + if state.get("error") or state.get("sandbox_error"): + return 0.0 + files = {self.solution_filename: answer, "score.py": self.score_script} + try: + await self.upload_bundle( + sandbox_id, file_map=files, dest_dir=self.scorer_dest_dir + ) + result = await self.sandbox_client.execute_command( + sandbox_id, + f"python3 {self.scorer_dest_dir}/score.py", + working_dir=self.scorer_dest_dir, + timeout=self.scorer_timeout, + ) + if result.exit_code == 0 and result.stdout.strip(): + score = float(result.stdout.strip().splitlines()[-1]) + self.logger.debug(f"Remote math_verify scorer scored {score=}") + return score + else: + stderr = (result.stderr or "")[:200] + self.logger.warning( + f"Remote math_verify scorer failed (exit={result.exit_code}): {stderr}" + ) + return 0.0 + except Exception as e: + self.logger.warning( + f"Remote math_verify scorer error: {type(e).__name__}: {e}" + ) + return 0.0 + + async def local_math_verify_score( + self, completion: vf.Messages, answer: str, state: vf.State, **kwargs + ) -> float: response = self.parser.parse_answer(completion) or "" if response == "": - math_verify_score = 0.0 self.logger.debug("Parsed response is empty.") + return 0.0 else: try: math_verify_score = float( @@ -154,10 +251,25 @@ async def math_verify_score( ) ) except BaseException as e: - self.logger.warning( - f"Math verification failed with {type(e).__name__}: {e!r}" - ) - math_verify_score = 0.0 + self.logger.warning(f"Math verification failed: {e!r}") + return 0.0 + state["math_verify_score"] = math_verify_score + return math_verify_score + + async def math_verify_score( + self, completion: vf.Messages, answer: str, state: vf.State, **kwargs + ) -> float: + """Basic rule-based math verification. + + When ``score_remotely=True``, runs the scorer script inside the + sandbox that the environment created. + """ + if self.score_remotely: + math_verify_score = await self.remote_math_verify_score(answer, state) + else: + math_verify_score = await self.local_math_verify_score( + completion, answer, state + ) state["math_verify_score"] = math_verify_score return math_verify_score @@ -190,3 +302,12 @@ async def correct_answer(self, state: vf.State, **kwargs) -> float: return float( state.get("math_verify_score", 0.0) or state.get("judge_score", 0.0) ) + + @vf.cleanup + async def cleanup_sandbox(self, state: vf.State) -> None: + """Delete the sandbox after scoring is complete.""" + if not self.score_remotely: + return + sandbox_id = state.get("sandbox_id") + if sandbox_id: + await self.delete_sandbox(sandbox_id) From ee1cc99b20c0fe8814c030926c6eb189cb6e4c80 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Fri, 13 Mar 2026 00:12:39 +0530 Subject: [PATCH 40/47] do not upload bundle --- .../experimental/hybrid_math_rubric.py | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/verifiers/rubrics/experimental/hybrid_math_rubric.py b/verifiers/rubrics/experimental/hybrid_math_rubric.py index 592eca076..5e91eaf54 100644 --- a/verifiers/rubrics/experimental/hybrid_math_rubric.py +++ b/verifiers/rubrics/experimental/hybrid_math_rubric.py @@ -1,6 +1,6 @@ from __future__ import annotations -from pathlib import Path +import asyncio from typing import Any from math_verify import parse, verify @@ -126,10 +126,12 @@ class HybridMathRubric(SandboxMixin, vf.JudgeRubric): DEFAULT_JUDGE_SAMPLING_ARGS = {} DEFAULT_USE_JUDGE_FALLBACK = False DEFAULT_MATH_VERIFY_TIMEOUT_SECONDS = 5 + + # Remote scoring DEFAULT_SCORE_REMOTELY = False DEFAULT_ANSWER_PATH = "/app/answer.txt" DEFAULT_SOLUTION_PATH = "/app/solution.txt" - DEFAULT_SCORER_DEST_DIR = "/app" + DEFAULT_SCORER_PATH = "/app/score.py" DEFAULT_SCORER_TIMEOUT = 30 def __init__( @@ -144,7 +146,7 @@ def __init__( score_remotely: bool = DEFAULT_SCORE_REMOTELY, answer_path: str = DEFAULT_ANSWER_PATH, solution_path: str = DEFAULT_SOLUTION_PATH, - scorer_dest_dir: str = DEFAULT_SCORER_DEST_DIR, + scorer_path: str = DEFAULT_SCORER_PATH, scorer_timeout: int = DEFAULT_SCORER_TIMEOUT, sandbox_client_max_workers: int = 10, sandbox_client_max_connections: int = 100, @@ -166,12 +168,12 @@ def __init__( self.math_verify_timeout_seconds = math_verify_timeout_seconds self.score_remotely = score_remotely - self.solution_filename = Path(solution_path).name + self.solution_path = solution_path + self.scorer_path = scorer_path self.score_script = MATH_VERIFY_SCORER_SCRIPT_TEMPLATE.format( answer_path=answer_path, solution_path=solution_path, ) - self.scorer_dest_dir = scorer_dest_dir self.scorer_timeout = scorer_timeout self.judge_model = judge_model if use_judge_fallback else None self.class_objects["judge_model"] = self.judge_model @@ -201,15 +203,14 @@ async def remote_math_verify_score( if state.get("error") or state.get("sandbox_error"): return 0.0 - files = {self.solution_filename: answer, "score.py": self.score_script} try: - await self.upload_bundle( - sandbox_id, file_map=files, dest_dir=self.scorer_dest_dir + await asyncio.gather( + self.upload_content(sandbox_id, answer, self.solution_path), + self.upload_content(sandbox_id, self.score_script, self.scorer_path), ) result = await self.sandbox_client.execute_command( sandbox_id, - f"python3 {self.scorer_dest_dir}/score.py", - working_dir=self.scorer_dest_dir, + f"python3 {self.scorer_path}", timeout=self.scorer_timeout, ) if result.exit_code == 0 and result.stdout.strip(): From bb4b86c13b2ee93cb6e306a6dc6043964366e68c Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Fri, 13 Mar 2026 00:17:22 +0530 Subject: [PATCH 41/47] align local and remote math verify --- .../experimental/hybrid_math_rubric.py | 49 +++++++------------ 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/verifiers/rubrics/experimental/hybrid_math_rubric.py b/verifiers/rubrics/experimental/hybrid_math_rubric.py index 5e91eaf54..9bf77cf99 100644 --- a/verifiers/rubrics/experimental/hybrid_math_rubric.py +++ b/verifiers/rubrics/experimental/hybrid_math_rubric.py @@ -99,8 +99,8 @@ try: score = float( verify( - parse(solution, parsing_timeout=5), - parse(answer, parsing_timeout=5), + parse("\\\\boxed{{" + solution + "}}", parsing_timeout=5), + parse("\\\\boxed{{" + answer + "}}", parsing_timeout=5), timeout_seconds=5, ) ) @@ -125,8 +125,7 @@ class HybridMathRubric(SandboxMixin, vf.JudgeRubric): DEFAULT_JUDGE_PROMPT = DEFAULT_JUDGE_PROMPT DEFAULT_JUDGE_SAMPLING_ARGS = {} DEFAULT_USE_JUDGE_FALLBACK = False - DEFAULT_MATH_VERIFY_TIMEOUT_SECONDS = 5 - + # Remote scoring DEFAULT_SCORE_REMOTELY = False DEFAULT_ANSWER_PATH = "/app/answer.txt" @@ -142,7 +141,6 @@ def __init__( judge_model: str = DEFAULT_JUDGE_MODEL, judge_prompt: str = DEFAULT_JUDGE_PROMPT, judge_sampling_args: dict | None = None, - math_verify_timeout_seconds: float = DEFAULT_MATH_VERIFY_TIMEOUT_SECONDS, score_remotely: bool = DEFAULT_SCORE_REMOTELY, answer_path: str = DEFAULT_ANSWER_PATH, solution_path: str = DEFAULT_SOLUTION_PATH, @@ -166,7 +164,6 @@ def __init__( self.add_reward_func(self.judge_score, weight=0) self.add_reward_func(self.correct_answer, weight=1) - self.math_verify_timeout_seconds = math_verify_timeout_seconds self.score_remotely = score_remotely self.solution_path = solution_path self.scorer_path = scorer_path @@ -227,35 +224,29 @@ async def remote_math_verify_score( self.logger.warning( f"Remote math_verify scorer error: {type(e).__name__}: {e}" ) - return 0.0 + return 0.0 async def local_math_verify_score( - self, completion: vf.Messages, answer: str, state: vf.State, **kwargs + self, completion: vf.Messages, answer: str, **kwargs ) -> float: response = self.parser.parse_answer(completion) or "" if response == "": self.logger.debug("Parsed response is empty.") return 0.0 - else: - try: - math_verify_score = float( - verify( - parse( - f"\\boxed{{{answer}}}", - parsing_timeout=int(self.math_verify_timeout_seconds), - ), - parse( - f"\\boxed{{{response}}}", - parsing_timeout=int(self.math_verify_timeout_seconds), - ), - timeout_seconds=int(self.math_verify_timeout_seconds), - ) + + try: + score = float( + verify( + parse(f"\\boxed{{{answer}}}", parsing_timeout=5), + parse(f"\\boxed{{{response}}}", parsing_timeout=5), + timeout_seconds=5, ) - except BaseException as e: - self.logger.warning(f"Math verification failed: {e!r}") - return 0.0 - state["math_verify_score"] = math_verify_score - return math_verify_score + ) + self.logger.debug(f"Local math_verify scored {score=}") + return score + except BaseException as e: + self.logger.warning(f"Math verification failed: {e!r}") + return 0.0 async def math_verify_score( self, completion: vf.Messages, answer: str, state: vf.State, **kwargs @@ -268,9 +259,7 @@ async def math_verify_score( if self.score_remotely: math_verify_score = await self.remote_math_verify_score(answer, state) else: - math_verify_score = await self.local_math_verify_score( - completion, answer, state - ) + math_verify_score = await self.local_math_verify_score(completion, answer) state["math_verify_score"] = math_verify_score return math_verify_score From e0a67131966eb133cf181a3f7efa9e462085e666 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Fri, 13 Mar 2026 01:17:04 +0530 Subject: [PATCH 42/47] add read_file --- verifiers/envs/experimental/sandbox_mixin.py | 22 ++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/verifiers/envs/experimental/sandbox_mixin.py b/verifiers/envs/experimental/sandbox_mixin.py index ba3d8bcef..3d715490d 100644 --- a/verifiers/envs/experimental/sandbox_mixin.py +++ b/verifiers/envs/experimental/sandbox_mixin.py @@ -253,6 +253,28 @@ async def upload_content( finally: Path(local_path).unlink(missing_ok=True) + async def read_file( + self, + sandbox_id: str, + remote_path: str, + timeout: int = 10, + ) -> str | None: + """Read a file from the sandbox, returning its contents or None on failure.""" + try: + result = await self.sandbox_client.execute_command( + sandbox_id, + f"cat {remote_path}", + timeout=timeout, + ) + if result.exit_code == 0: + return result.stdout or "" + return None + except Exception as e: + self.logger.warning( + f"Failed to read {remote_path} from {sandbox_id}: {type(e).__name__}: {e}" + ) + return None + async def upload_bundle( self, sandbox_id: str, From 7996d8ba3f72e45bc9344575a4e78210cea59d8c Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Fri, 13 Mar 2026 01:30:04 +0530 Subject: [PATCH 43/47] remote math rubric --- .../experimental/hybrid_math_rubric.py | 250 ++++++++++-------- 1 file changed, 140 insertions(+), 110 deletions(-) diff --git a/verifiers/rubrics/experimental/hybrid_math_rubric.py b/verifiers/rubrics/experimental/hybrid_math_rubric.py index 9bf77cf99..91e1c2b7e 100644 --- a/verifiers/rubrics/experimental/hybrid_math_rubric.py +++ b/verifiers/rubrics/experimental/hybrid_math_rubric.py @@ -1,13 +1,13 @@ from __future__ import annotations import asyncio -from typing import Any from math_verify import parse, verify from openai import AsyncOpenAI from verifiers.envs.experimental.sandbox_mixin import SandboxMixin from verifiers.parsers.parser import Parser from verifiers.utils.data_utils import extract_boxed_answer +from verifiers.utils.logging_utils import truncate import verifiers as vf @@ -86,6 +86,105 @@ Analysis step by step and Final Judgment: """ + +class HybridMathRubric(vf.JudgeRubric): + """Runs rule-based math verification first, with optional LLM judge fallback.""" + + DEFAULT_JUDGE_PARSER = None + DEFAULT_JUDGE_MODEL = "gpt-5-nano" + DEFAULT_JUDGE_CLIENT = None + DEFAULT_JUDGE_PROMPT = DEFAULT_JUDGE_PROMPT + DEFAULT_JUDGE_SAMPLING_ARGS = {} + DEFAULT_USE_JUDGE_FALLBACK = False + + def __init__( + self, + parser: Parser | None = DEFAULT_JUDGE_PARSER, + use_judge_fallback: bool = DEFAULT_USE_JUDGE_FALLBACK, + judge_client: AsyncOpenAI | None = DEFAULT_JUDGE_CLIENT, + judge_model: str = DEFAULT_JUDGE_MODEL, + judge_prompt: str = DEFAULT_JUDGE_PROMPT, + judge_sampling_args: dict | None = None, + **kwargs, + ): + judge_sampling_args = judge_sampling_args or self.DEFAULT_JUDGE_SAMPLING_ARGS + if judge_client is None and not use_judge_fallback: + judge_client = AsyncOpenAI(api_key="unused") + super().__init__( + judge_client=judge_client, + judge_sampling_args=judge_sampling_args, + judge_prompt=judge_prompt, + parser=parser, + **kwargs, + ) + # Reward functions + self.add_reward_func(self.math_verify_score, weight=0) + self.add_reward_func(self.judge_score, weight=0) + self.add_reward_func(self.correct_answer, weight=1) + + self.judge_model = judge_model if use_judge_fallback else None + self.class_objects["judge_model"] = self.judge_model + + async def math_verify_score( + self, completion: vf.Messages, answer: str, state: vf.State, **kwargs + ) -> float: + """Basic rule-based math verification.""" + response = self.parser.parse_answer(completion) or "" + if response == "": + self.logger.debug("Parsed response is empty.") + state["math_verify_score"] = 0.0 + return 0.0 + + try: + score = float( + verify( + parse(f"\\boxed{{{answer}}}", parsing_timeout=5), + parse(f"\\boxed{{{response}}}", parsing_timeout=5), + timeout_seconds=5, + ) + ) + answer_str = truncate(answer.strip(), 20) + response_str = truncate(response.strip(), 20) + self.logger.debug( + f"Local math_verify {score} (answer={answer_str}, response={response_str})" + ) + except BaseException as e: + self.logger.warning(f"Math verification failed: {e!r}") + score = 0.0 + state["math_verify_score"] = score + return score + + async def judge_score( + self, + prompt: vf.Messages, + completion: vf.Messages, + answer: str, + state: vf.State, + **kwargs, + ) -> float: + """Calls judge model if math verification did not pass and a judge model is set, else returns math verification score.""" + if state.get("math_verify_score", 0) == 1 or self.judge_model is None: + return state.get("math_verify_score", 0) + + judge_response = await self.judge(prompt, completion, answer, state) + judge_result = ( + extract_boxed_answer(judge_response) + if len(judge_response) != 1 + else judge_response + ) + judge_score = 1.0 if judge_result == "A" else 0.0 + self.logger.debug(f"{judge_score=} ({judge_result=})") + state["judge_result"] = judge_result + state["judge_score"] = judge_score + return judge_score + + async def correct_answer(self, state: vf.State, **kwargs) -> float: + """Whether either math verification or judge passed.""" + return float( + state.get("math_verify_score", 0.0) or state.get("judge_score", 0.0) + ) + + MATH_VERIFY_SCORER_SCRIPT_TEMPLATE = """\ from pathlib import Path from math_verify import parse, verify @@ -110,24 +209,16 @@ """ -class HybridMathRubric(SandboxMixin, vf.JudgeRubric): - """Runs rule-based math verification first, with optional LLM judge fallback. +class RemoteHybridMathRubric(SandboxMixin, HybridMathRubric): + """HybridMathRubric that scores inside the sandbox. - When ``score_remotely=True``, math verification runs inside the sandbox - created by the environment. The env must set ``keep_sandbox_for_scoring=True`` - so the sandbox stays alive through scoring; this rubric deletes it in its - ``@vf.cleanup`` handler. + Expects the environment to keep the sandbox alive for scoring + (``keep_sandbox_for_scoring=True``) and the agent's answer written to + ``answer_path``. Ground-truth is uploaded to ``solution_path`` and a + scorer script is uploaded and executed. The sandbox is deleted in the + ``@vf.cleanup`` handler after scoring. """ - DEFAULT_JUDGE_PARSER = None - DEFAULT_JUDGE_MODEL = "gpt-5-nano" - DEFAULT_JUDGE_CLIENT = None - DEFAULT_JUDGE_PROMPT = DEFAULT_JUDGE_PROMPT - DEFAULT_JUDGE_SAMPLING_ARGS = {} - DEFAULT_USE_JUDGE_FALLBACK = False - - # Remote scoring - DEFAULT_SCORE_REMOTELY = False DEFAULT_ANSWER_PATH = "/app/answer.txt" DEFAULT_SOLUTION_PATH = "/app/solution.txt" DEFAULT_SCORER_PATH = "/app/score.py" @@ -135,13 +226,6 @@ class HybridMathRubric(SandboxMixin, vf.JudgeRubric): def __init__( self, - judge_parser: Parser | None = DEFAULT_JUDGE_PARSER, - use_judge_fallback: bool = DEFAULT_USE_JUDGE_FALLBACK, - judge_client: AsyncOpenAI | None = DEFAULT_JUDGE_CLIENT, - judge_model: str = DEFAULT_JUDGE_MODEL, - judge_prompt: str = DEFAULT_JUDGE_PROMPT, - judge_sampling_args: dict | None = None, - score_remotely: bool = DEFAULT_SCORE_REMOTELY, answer_path: str = DEFAULT_ANSWER_PATH, solution_path: str = DEFAULT_SOLUTION_PATH, scorer_path: str = DEFAULT_SCORER_PATH, @@ -151,53 +235,36 @@ def __init__( sandbox_client_max_keepalive_connections: int = 50, **kwargs, ): - judge_sampling_args = judge_sampling_args or self.DEFAULT_JUDGE_SAMPLING_ARGS - super().__init__( - judge_client=judge_client, - judge_sampling_args=judge_sampling_args, - judge_prompt=judge_prompt, - parser=judge_parser, - **kwargs, - ) - # Reward functions - self.add_reward_func(self.math_verify_score, weight=0) - self.add_reward_func(self.judge_score, weight=0) - self.add_reward_func(self.correct_answer, weight=1) - - self.score_remotely = score_remotely + super().__init__(**kwargs) + self.answer_path = answer_path self.solution_path = solution_path self.scorer_path = scorer_path + self.scorer_timeout = scorer_timeout self.score_script = MATH_VERIFY_SCORER_SCRIPT_TEMPLATE.format( answer_path=answer_path, solution_path=solution_path, ) - self.scorer_timeout = scorer_timeout - self.judge_model = judge_model if use_judge_fallback else None - self.class_objects["judge_model"] = self.judge_model - if self.score_remotely: - self.logger.warning( - "score_remotely=True: expects a sandbox to be kept alive for scoring " - f"(keep_sandbox_for_scoring=True) and the agent's solution written to {answer_path}" - ) - self.init_sandbox_client( - sandbox_client_max_workers=sandbox_client_max_workers, - sandbox_client_max_connections=sandbox_client_max_connections, - sandbox_client_max_keepalive_connections=sandbox_client_max_keepalive_connections, - ) + self.logger.warning( + "RemoteHybridMathRubric expects a sandbox kept alive for scoring " + f"(keep_sandbox_for_scoring=True) and the agent's answer written to {answer_path}" + ) + self.init_sandbox_client( + sandbox_client_max_workers=sandbox_client_max_workers, + sandbox_client_max_connections=sandbox_client_max_connections, + sandbox_client_max_keepalive_connections=sandbox_client_max_keepalive_connections, + ) - async def remote_math_verify_score( - self, answer: str, state: dict[str, Any] + async def math_verify_score( + self, completion: vf.Messages, answer: str, state: vf.State, **kwargs ) -> float: - """Run math_verify inside the sandbox and return the score. - - Uploads ground trust answer and the scorer script, then compares with the - agent's answer which is expected to be in - """ + """Run math_verify inside the sandbox.""" sandbox_id = state.get("sandbox_id") if not sandbox_id: + state["math_verify_score"] = 0.0 return 0.0 if state.get("error") or state.get("sandbox_error"): + state["math_verify_score"] = 0.0 return 0.0 try: @@ -212,56 +279,19 @@ async def remote_math_verify_score( ) if result.exit_code == 0 and result.stdout.strip(): score = float(result.stdout.strip().splitlines()[-1]) - self.logger.debug(f"Remote math_verify scorer scored {score=}") - return score + self.logger.debug(f"Remote math_verify scored {score=}") else: stderr = (result.stderr or "")[:200] self.logger.warning( - f"Remote math_verify scorer failed (exit={result.exit_code}): {stderr}" + f"Remote math_verify failed (exit={result.exit_code}): {stderr}" ) - return 0.0 + score = 0.0 except Exception as e: - self.logger.warning( - f"Remote math_verify scorer error: {type(e).__name__}: {e}" - ) - return 0.0 - - async def local_math_verify_score( - self, completion: vf.Messages, answer: str, **kwargs - ) -> float: - response = self.parser.parse_answer(completion) or "" - if response == "": - self.logger.debug("Parsed response is empty.") - return 0.0 - - try: - score = float( - verify( - parse(f"\\boxed{{{answer}}}", parsing_timeout=5), - parse(f"\\boxed{{{response}}}", parsing_timeout=5), - timeout_seconds=5, - ) - ) - self.logger.debug(f"Local math_verify scored {score=}") - return score - except BaseException as e: - self.logger.warning(f"Math verification failed: {e!r}") - return 0.0 + self.logger.warning(f"Remote math_verify error: {type(e).__name__}: {e}") + score = 0.0 - async def math_verify_score( - self, completion: vf.Messages, answer: str, state: vf.State, **kwargs - ) -> float: - """Basic rule-based math verification. - - When ``score_remotely=True``, runs the scorer script inside the - sandbox that the environment created. - """ - if self.score_remotely: - math_verify_score = await self.remote_math_verify_score(answer, state) - else: - math_verify_score = await self.local_math_verify_score(completion, answer) - state["math_verify_score"] = math_verify_score - return math_verify_score + state["math_verify_score"] = score + return score async def judge_score( self, @@ -271,10 +301,18 @@ async def judge_score( state: vf.State, **kwargs, ) -> float: - """Calls judge model if math verification did not pass and a judge model is set, else returns math verification score.""" + """Judge with response read from the sandbox file.""" if state.get("math_verify_score", 0) == 1 or self.judge_model is None: return state.get("math_verify_score", 0) + sandbox_id = state.get("sandbox_id") + if not sandbox_id: + return 0.0 + response = await self.read_file(sandbox_id, self.answer_path) + if not response: + return 0.0 + completion = [vf.AssistantMessage(content=response)] + judge_response = await self.judge(prompt, completion, answer, state) judge_result = ( extract_boxed_answer(judge_response) @@ -287,17 +325,9 @@ async def judge_score( state["judge_score"] = judge_score return judge_score - async def correct_answer(self, state: vf.State, **kwargs) -> float: - """Whether either math verification or judge passed.""" - return float( - state.get("math_verify_score", 0.0) or state.get("judge_score", 0.0) - ) - @vf.cleanup async def cleanup_sandbox(self, state: vf.State) -> None: """Delete the sandbox after scoring is complete.""" - if not self.score_remotely: - return sandbox_id = state.get("sandbox_id") if sandbox_id: await self.delete_sandbox(sandbox_id) From 724383dd37572bb5be3f8c95133299e685c31a75 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Fri, 13 Mar 2026 01:53:52 +0530 Subject: [PATCH 44/47] remove script --- .../scripts/math_verify_scorer.py | 27 ------------------- 1 file changed, 27 deletions(-) delete mode 100644 verifiers/rubrics/experimental/scripts/math_verify_scorer.py diff --git a/verifiers/rubrics/experimental/scripts/math_verify_scorer.py b/verifiers/rubrics/experimental/scripts/math_verify_scorer.py deleted file mode 100644 index 5e2b9df4f..000000000 --- a/verifiers/rubrics/experimental/scripts/math_verify_scorer.py +++ /dev/null @@ -1,27 +0,0 @@ -"""Sandbox scoring script for math_verify. - -Reads ground truth from ground_truth.txt and the agent's answer from answer.txt. -Prints a single float (1.0 or 0.0) to stdout. -""" - -from pathlib import Path - -from math_verify import parse, verify - -ground_truth = Path("ground_truth.txt").read_text() -response = Path("answer.txt").read_text() - -if not response: - print(0.0) -else: - try: - score = float( - verify( - parse(ground_truth, parsing_timeout=5), - parse(response, parsing_timeout=5), - timeout_seconds=5, - ) - ) - print(score) - except BaseException: - print(0.0) From 4f46a4edfa7e55769c3aa2ba52bff2a3ef9f4a29 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Fri, 13 Mar 2026 02:29:38 +0530 Subject: [PATCH 45/47] ind retries for scoring --- tests/test_environment_extra.py | 3 ++ verifiers/envs/env_group.py | 10 ++++++- verifiers/envs/environment.py | 38 +++++++++++++++++++++----- verifiers/types.py | 2 ++ verifiers/utils/eval_utils.py | 4 +++ verifiers/workers/client/env_client.py | 8 ++++++ verifiers/workers/server/env_server.py | 4 +++ verifiers/workers/types.py | 4 +++ 8 files changed, 65 insertions(+), 8 deletions(-) diff --git a/tests/test_environment_extra.py b/tests/test_environment_extra.py index 789e477df..df7fc46b8 100644 --- a/tests/test_environment_extra.py +++ b/tests/test_environment_extra.py @@ -329,6 +329,7 @@ async def run_group( sampling_args, max_retries, state_columns, + **kwargs, ): assert isinstance(client_config, ClientConfig) self.client_urls_per_group.append(str(client_config.api_base_url)) @@ -424,6 +425,7 @@ async def run_group( sampling_args, max_retries, state_columns, + **kwargs, ): assert isinstance(client_config, ClientConfig) self.client_url = str(client_config.api_base_url) @@ -483,6 +485,7 @@ async def run_rollout( sampling_args, max_retries, state_columns, + **kwargs, ): assert isinstance(client_config, ClientConfig) self.client_url = str(client_config.api_base_url) diff --git a/verifiers/envs/env_group.py b/verifiers/envs/env_group.py index bf17df5ba..67f0b8f97 100644 --- a/verifiers/envs/env_group.py +++ b/verifiers/envs/env_group.py @@ -278,11 +278,15 @@ async def run_rollout( # type: ignore[override] max_retries: int = 0, state_columns: list[str] | None = None, env_client: EnvClient | None = None, + max_rollout_retries: int | None = None, + max_scoring_retries: int | None = None, ) -> vf.RolloutOutput: env = self.get_env_for_task(input["task"]) env_client = env_client or env.env_client or self.env_client return await env.run_rollout( - input, client, model, sampling_args, max_retries, state_columns, env_client + input, client, model, sampling_args, max_retries, state_columns, env_client, + max_rollout_retries=max_rollout_retries, + max_scoring_retries=max_scoring_retries, ) @final @@ -295,6 +299,8 @@ async def run_group( # type: ignore[override] max_retries: int = 0, state_columns: list[str] | None = None, env_client: EnvClient | None = None, + max_rollout_retries: int | None = None, + max_scoring_retries: int | None = None, ) -> list[vf.RolloutOutput]: env = self.get_env_for_task(group_inputs[0]["task"]) env_client = env_client or env.env_client or self.env_client @@ -306,6 +312,8 @@ async def run_group( # type: ignore[override] max_retries, state_columns, env_client, + max_rollout_retries=max_rollout_retries, + max_scoring_retries=max_scoring_retries, ) @final diff --git a/verifiers/envs/environment.py b/verifiers/envs/environment.py index f880ab8b7..845b7b85f 100644 --- a/verifiers/envs/environment.py +++ b/verifiers/envs/environment.py @@ -710,9 +710,14 @@ async def run_rollout( max_retries: int = 0, state_columns: list[str] | None = None, env_client: EnvClient | None = None, + max_rollout_retries: int | None = None, + max_scoring_retries: int | None = None, ) -> RolloutOutput: """Generate and, optionally, score a rollout.""" + effective_rollout_retries = max_rollout_retries if max_rollout_retries is not None else max_retries + effective_scoring_retries = max_scoring_retries if max_scoring_retries is not None else max_retries + resolved_client_config: ClientConfig | None = None if isinstance(client, ClientConfig): resolved_client_config = resolve_client_config(client) @@ -730,28 +735,31 @@ async def run_rollout( sampling_args, max_retries, state_columns, + max_rollout_retries=effective_rollout_retries, + max_scoring_retries=effective_scoring_retries, ) resolved_client = resolve_client(client) async def run_rollout_attempt() -> State: - state = await self.rollout( + return await self.rollout( input, resolved_client, model, sampling_args, ) + state = await maybe_retry(run_rollout_attempt, max_retries=effective_rollout_retries)() + + async def run_scoring_attempt() -> State: if self.score_rollouts: await self.rubric.score_rollout(state) else: await self.rubric.dummy_score_rollout(state) - await self.rubric.cleanup(state) - return state - state = await maybe_retry(run_rollout_attempt, max_retries=max_retries)() + state = await maybe_retry(run_scoring_attempt, max_retries=effective_scoring_retries)() output = state_to_output(state, state_columns or []) return output @@ -765,10 +773,15 @@ async def run_group( max_retries: int = 0, state_columns: list[str] | None = None, env_client: EnvClient | None = None, + max_rollout_retries: int | None = None, + max_scoring_retries: int | None = None, **kwargs, ) -> list[RolloutOutput]: """Generate and, optionally, score one group.""" + effective_rollout_retries = max_rollout_retries if max_rollout_retries is not None else max_retries + effective_scoring_retries = max_scoring_retries if max_scoring_retries is not None else max_retries + resolved_client_config: ClientConfig | None = None if isinstance(client, ClientConfig): resolved_client_config = resolve_client_config(client) @@ -786,11 +799,13 @@ async def run_group( sampling_args, max_retries, state_columns, + max_rollout_retries=effective_rollout_retries, + max_scoring_retries=effective_scoring_retries, ) resolved_client = resolve_client(client) - async def run_group_attempt() -> list[State]: + async def run_group_rollout_attempt() -> list[State]: rollout_tasks = [ self.rollout( input, @@ -800,8 +815,11 @@ async def run_group_attempt() -> list[State]: ) for input in group_inputs ] - group_states = await asyncio.gather(*rollout_tasks) + return await asyncio.gather(*rollout_tasks) + + group_states = await maybe_retry(run_group_rollout_attempt, max_retries=effective_rollout_retries)() + async def run_group_scoring_attempt() -> list[State]: if self.score_rollouts: await self.rubric.score_group(group_states) else: @@ -812,7 +830,7 @@ async def run_group_attempt() -> list[State]: return group_states - group_states = await maybe_retry(run_group_attempt, max_retries=max_retries)() + group_states = await maybe_retry(run_group_scoring_attempt, max_retries=effective_scoring_retries)() outputs = [ state_to_output(state, state_columns or []) for state in group_states ] @@ -832,6 +850,8 @@ async def generate( hf_hub_dataset_name: str | None = None, independent_scoring: bool = False, max_retries: int = 0, + max_rollout_retries: int | None = None, + max_scoring_retries: int | None = None, on_start: StartCallback | None = None, on_progress: ProgressCallback | list[ProgressCallback] | None = None, on_log: LogCallback | None = None, @@ -1027,6 +1047,8 @@ def get_client_for_group() -> Client | ClientConfig: sampling_args, max_retries=max_retries, state_columns=state_columns, + max_rollout_retries=max_rollout_retries, + max_scoring_retries=max_scoring_retries, ), ), ) @@ -1053,6 +1075,8 @@ def get_client_for_group() -> Client | ClientConfig: sampling_args, max_retries=max_retries, state_columns=state_columns, + max_rollout_retries=max_rollout_retries, + max_scoring_retries=max_scoring_retries, ), ), ) diff --git a/verifiers/types.py b/verifiers/types.py index 76a0069dc..f6bb8fd2d 100644 --- a/verifiers/types.py +++ b/verifiers/types.py @@ -491,6 +491,8 @@ class EvalConfig(BaseModel): independent_scoring: bool = False extra_env_kwargs: dict = {} max_retries: int = 0 + max_rollout_retries: int | None = None + max_scoring_retries: int | None = None disable_env_server: bool = False # logging verbose: bool = False diff --git a/verifiers/utils/eval_utils.py b/verifiers/utils/eval_utils.py index 7737a3002..9499679d3 100644 --- a/verifiers/utils/eval_utils.py +++ b/verifiers/utils/eval_utils.py @@ -329,6 +329,8 @@ def load_toml_config(path: Path) -> list[dict]: "max_concurrent", "independent_scoring", "max_retries", + "max_rollout_retries", + "max_scoring_retries", "disable_env_server", # logging "verbose", @@ -656,6 +658,8 @@ async def run_evaluation( hf_hub_dataset_name=config.hf_hub_dataset_name, independent_scoring=config.independent_scoring, max_retries=config.max_retries, + max_rollout_retries=config.max_rollout_retries, + max_scoring_retries=config.max_scoring_retries, on_start=on_start, on_progress=on_progress, on_log=on_log, diff --git a/verifiers/workers/client/env_client.py b/verifiers/workers/client/env_client.py index 3a0e6c806..c8e018179 100644 --- a/verifiers/workers/client/env_client.py +++ b/verifiers/workers/client/env_client.py @@ -50,6 +50,8 @@ async def run_rollout( sampling_args: SamplingArgs, max_retries: int = 0, state_columns: list[str] | None = None, + max_rollout_retries: int | None = None, + max_scoring_retries: int | None = None, ) -> RolloutOutput: resolved_client_config = resolve_client_config(client_config) request = RunRolloutRequest( @@ -59,6 +61,8 @@ async def run_rollout( sampling_args=sampling_args, max_retries=max_retries, state_columns=state_columns, + max_rollout_retries=max_rollout_retries, + max_scoring_retries=max_scoring_retries, ) response = await self.handle_run_rollout_request(request, timeout=None) assert response.output is not None @@ -72,6 +76,8 @@ async def run_group( sampling_args: SamplingArgs, max_retries: int = 0, state_columns: list[str] | None = None, + max_rollout_retries: int | None = None, + max_scoring_retries: int | None = None, ) -> list[RolloutOutput]: resolved_client_config = resolve_client_config(client_config) request = RunGroupRequest( @@ -81,6 +87,8 @@ async def run_group( sampling_args=sampling_args, max_retries=max_retries, state_columns=state_columns, + max_rollout_retries=max_rollout_retries, + max_scoring_retries=max_scoring_retries, ) response = await self.handle_run_group_request(request, timeout=None) assert response.outputs is not None diff --git a/verifiers/workers/server/env_server.py b/verifiers/workers/server/env_server.py index cf279549e..b783432b7 100644 --- a/verifiers/workers/server/env_server.py +++ b/verifiers/workers/server/env_server.py @@ -145,6 +145,8 @@ async def handle_run_rollout( sampling_args=request.sampling_args, max_retries=request.max_retries, state_columns=request.state_columns, + max_rollout_retries=request.max_rollout_retries, + max_scoring_retries=request.max_scoring_retries, ) return RunRolloutResponse(output=output) @@ -157,6 +159,8 @@ async def handle_run_group(self, request: RunGroupRequest) -> RunGroupResponse: sampling_args=request.sampling_args, max_retries=request.max_retries, state_columns=request.state_columns, + max_rollout_retries=request.max_rollout_retries, + max_scoring_retries=request.max_scoring_retries, ) return RunGroupResponse(outputs=outputs) diff --git a/verifiers/workers/types.py b/verifiers/workers/types.py index 25ddbbc94..a0eb10223 100644 --- a/verifiers/workers/types.py +++ b/verifiers/workers/types.py @@ -49,6 +49,8 @@ class RunRolloutRequest(BaseRequest): sampling_args: SamplingArgs max_retries: int state_columns: list[str] | None + max_rollout_retries: int | None = None + max_scoring_retries: int | None = None class RunRolloutResponse(BaseResponse): @@ -66,6 +68,8 @@ class RunGroupRequest(BaseRequest): sampling_args: SamplingArgs max_retries: int state_columns: list[str] | None + max_rollout_retries: int | None = None + max_scoring_retries: int | None = None class RunGroupResponse(BaseResponse): From f607f4abf25310fc75a6ffd08f3927a0a0ccfc16 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Fri, 13 Mar 2026 02:36:36 +0530 Subject: [PATCH 46/47] catch vf.errors to populate state to allow retries --- verifiers/rubrics/rubric.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/verifiers/rubrics/rubric.py b/verifiers/rubrics/rubric.py index 57b98dc74..83fadb64d 100644 --- a/verifiers/rubrics/rubric.py +++ b/verifiers/rubrics/rubric.py @@ -14,6 +14,7 @@ from verifiers.utils.async_utils import maybe_await + class Rubric: """ Rubric class for reward functions. @@ -148,6 +149,9 @@ def func(completion, answer, **kwargs): if any(p.kind == p.VAR_KEYWORD for p in sig.parameters.values()): try: ans = float(await maybe_await(func, **merged)) + except vf.Error as e: + state["error"] = e + ans = 0.0 except Exception as e: self.logger.error( f"Error calling reward function {func.__name__}: {e}" # type: ignore[unresolved-attribute] @@ -157,6 +161,9 @@ def func(completion, answer, **kwargs): allowed = {k: v for k, v in merged.items() if k in sig.parameters} try: ans = float(await maybe_await(func, **allowed)) + except vf.Error as e: + state["error"] = e + ans = 0.0 except Exception as e: self.logger.error( f"Error calling reward function {func.__name__}: {e}" # type: ignore[unresolved-attribute] @@ -207,6 +214,10 @@ async def _call_group_reward_func( if any(p.kind == p.VAR_KEYWORD for p in sig.parameters.values()): try: ans = await maybe_await(func, **merged) + except vf.Error as e: + for state in states: + state["error"] = e + ans = [0.0] * len(states) except Exception as e: self.logger.error( f"Error calling group reward function {func.__name__}: {e}" # type: ignore[unresolved-attribute] @@ -216,6 +227,10 @@ async def _call_group_reward_func( allowed = {k: v for k, v in merged.items() if k in sig.parameters} try: ans = await maybe_await(func, **allowed) + except vf.Error as e: + for state in states: + state["error"] = e + ans = [0.0] * len(states) except Exception as e: self.logger.error( f"Error calling group reward function {func.__name__}: {e}" # type: ignore[unresolved-attribute] From 302f0d0e82515768643767b301dbe953f979dfe2 Mon Sep 17 00:00:00 2001 From: Mika Senghaas Date: Fri, 13 Mar 2026 02:50:52 +0530 Subject: [PATCH 47/47] scoring retry tests and state copy for retry isolation Co-Authored-By: Claude Opus 4.6 --- tests/test_environment.py | 110 ++++++++++++++++++++++++++++++++++ verifiers/envs/environment.py | 13 ++++ 2 files changed, 123 insertions(+) diff --git a/tests/test_environment.py b/tests/test_environment.py index fce273e2a..ad228f7d4 100644 --- a/tests/test_environment.py +++ b/tests/test_environment.py @@ -619,6 +619,116 @@ async def test_error_in_state_after_max_retries_exhausted( assert "InfraError" == error_info["error"] +class RetryScoringRubric(Rubric): + """Rubric with a reward function that fails first N times with configurable error type.""" + + def __init__(self, fail_count: int, error_type: type = vf.InfraError, **kwargs): + self.fail_count = fail_count + self.error_type = error_type + self.scoring_call_count = 0 + + def failing_reward(completion, answer, **kw): + self.scoring_call_count += 1 + if self.scoring_call_count <= self.fail_count: + raise self.error_type( + f"Simulated scoring failure {self.scoring_call_count}/{self.fail_count}" + ) + return 1.0 + + super().__init__(funcs=[failing_reward], **kwargs) + + +class TestScoringRetry: + """Test cases for scoring retry functionality.""" + + @pytest.mark.asyncio + async def test_scoring_retry_after_retryable_error(self, mock_client, make_input): + """Scoring retries on InfraError, succeeds after failures.""" + dataset = Dataset.from_dict({"question": ["test"], "answer": ["test"]}) + rubric = RetryScoringRubric(fail_count=2) + env = SimpleEnvironment( + dataset=dataset, parser=Parser(), rubric=rubric, score_rollouts=True + ) + + inputs = [make_input()] + outputs = await env.generate( + inputs, client=mock_client, model="test-model", max_scoring_retries=3 + ) + + assert rubric.scoring_call_count == 3 # 2 failures + 1 success + assert outputs["outputs"][0].get("error") is None + assert outputs["outputs"][0]["reward"] == 1.0 + + @pytest.mark.asyncio + async def test_scoring_no_retry_after_non_retryable_error( + self, mock_client, make_input + ): + """Non-retryable error type is NOT retried during scoring.""" + dataset = Dataset.from_dict({"question": ["test"], "answer": ["test"]}) + rubric = RetryScoringRubric(fail_count=10, error_type=vf.ToolError) + env = SimpleEnvironment( + dataset=dataset, parser=Parser(), rubric=rubric, score_rollouts=True + ) + + inputs = [make_input()] + outputs = await env.generate( + inputs, client=mock_client, model="test-model", max_scoring_retries=3 + ) + + assert rubric.scoring_call_count == 1 # No retries for non-retryable error + # ToolError is stored in state but not retried; reward falls back to 0.0 + assert outputs["outputs"][0]["reward"] == 0.0 + + @pytest.mark.asyncio + async def test_scoring_error_after_retries_exhausted( + self, mock_client, make_input + ): + """Error persists after all scoring retries exhausted.""" + dataset = Dataset.from_dict({"question": ["test"], "answer": ["test"]}) + rubric = RetryScoringRubric(fail_count=10) + env = SimpleEnvironment( + dataset=dataset, parser=Parser(), rubric=rubric, score_rollouts=True + ) + + inputs = [make_input()] + outputs = await env.generate( + inputs, client=mock_client, model="test-model", max_scoring_retries=2 + ) + + assert rubric.scoring_call_count == 3 # 1 initial + 2 retries + assert outputs["outputs"][0].get("error") is not None + assert outputs["outputs"][0]["error"]["error"] == "InfraError" + + @pytest.mark.asyncio + async def test_independent_rollout_and_scoring_retries( + self, mock_client, make_input + ): + """Rollout and scoring use independent retry counts.""" + dataset = Dataset.from_dict({"question": ["test"], "answer": ["test"]}) + rubric = RetryScoringRubric(fail_count=1) + env = RetryCounterEnv( + fail_count=1, + dataset=dataset, + parser=Parser(), + rubric=rubric, + score_rollouts=True, + ) + + inputs = [make_input()] + outputs = await env.generate( + inputs, + client=mock_client, + model="test-model", + max_rollout_retries=2, + max_scoring_retries=2, + ) + + assert env.call_counts[0] == 2 # 1 failure + 1 success for rollout + assert rubric.scoring_call_count == 2 # 1 failure + 1 success for scoring + assert outputs["outputs"][0].get("error") is None + assert outputs["outputs"][0]["reward"] == 1.0 + + class TestEmptyModelResponseErrors: """Test cases for empty and invalid model response error handling.""" diff --git a/verifiers/envs/environment.py b/verifiers/envs/environment.py index 845b7b85f..4d3a89abd 100644 --- a/verifiers/envs/environment.py +++ b/verifiers/envs/environment.py @@ -88,6 +88,14 @@ _MESSAGE_TYPE_UNSET = object() +def _copy_state_for_scoring(state: "State") -> "State": + """Shallow-copy a state, duplicating only the mutable objects that scoring writes to.""" + copied = State(state) + copied["timing"] = dict(state.get("timing", {})) + copied["trajectory"] = [dict(t) for t in state.get("trajectory", [])] + return copied + + class Environment(ABC): """ Base class for all environments. @@ -750,8 +758,10 @@ async def run_rollout_attempt() -> State: ) state = await maybe_retry(run_rollout_attempt, max_retries=effective_rollout_retries)() + rollout_state = _copy_state_for_scoring(state) async def run_scoring_attempt() -> State: + state = _copy_state_for_scoring(rollout_state) if self.score_rollouts: await self.rubric.score_rollout(state) else: @@ -818,8 +828,11 @@ async def run_group_rollout_attempt() -> list[State]: return await asyncio.gather(*rollout_tasks) group_states = await maybe_retry(run_group_rollout_attempt, max_retries=effective_rollout_retries)() + rollout_group_states = [_copy_state_for_scoring(s) for s in group_states] async def run_group_scoring_attempt() -> list[State]: + group_states = [_copy_state_for_scoring(s) for s in rollout_group_states] + if self.score_rollouts: await self.rubric.score_group(group_states) else: