# tools.py
"""
Pythonnet Tool Registry - MCP Client
====================================
Connects to headless Pythonnet MCP server via HTTP.
Tool definitions are fetched from the server (single source of truth).
"""
import requests
import json
import uuid
import time
import logging
import os
from typing import Optional, Dict, Any, Type, List
logger = logging.getLogger(__name__)

# The FastMCP streamable-HTTP logger emits noisy session-cleanup errors;
# let only CRITICAL records from it through.
logging.getLogger("mcp.server.streamable_http").setLevel(logging.CRITICAL)

# Endpoint of the Pythonnet MCP server, resolved once at import time:
#   1. the PYTHONNET_MCP_URL environment variable, when set and non-empty
#   2. the Docker service name, when /.dockerenv exists (inside a container)
#   3. localhost, when running directly on the host
PYTHONNET_MCP_URL = os.getenv("PYTHONNET_MCP_URL") or (
    "http://pythonnet-mcp:8001/mcp"
    if os.path.exists("/.dockerenv")
    else "http://localhost:8001/mcp"
)

# Module-wide cache of the tool definitions, so repeated lookups do not
# trigger repeated HTTP calls.
_tools_cache: Optional[List[Dict[str, Any]]] = None  # None until first fetch
_tools_cache_time: float = 0  # time.time() of the last cache fill
_TOOLS_CACHE_TTL: float = 300.0  # 5 minutes
def _list_pythonnet_tools(base_url: str, timeout: int = 10) -> List[Dict[str, Any]]:
    """Ask the MCP server which tools it exposes (JSON-RPC ``tools/list``).

    The server owns the tool schemas; this client only relays them, so no
    definitions are duplicated on this side.

    Args:
        base_url: MCP server URL (e.g., http://localhost:8001/mcp)
        timeout: Request timeout in seconds

    Returns:
        The tool definitions reported by the server (name, description,
        inputSchema). An empty list is returned when the server answers with
        a JSON-RPC error or when the request fails for any reason; failures
        are logged and never raised to the caller.
    """
    http_headers = {"Content-Type": "application/json", "Accept": "application/json"}
    rpc_request = {"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}}

    try:
        reply = requests.post(
            base_url,
            headers=http_headers,
            json=rpc_request,
            timeout=timeout,
        ).json()

        if reply.get("error"):
            logger.warning(f"[PYTHONNET] tools/list error: {reply['error']}")
            return []

        tool_defs = reply.get("result", {}).get("tools", [])
        logger.info(f"[PYTHONNET] Fetched {len(tool_defs)} tools from MCP server")
        return tool_defs
    except requests.exceptions.Timeout:
        # Must precede ConnectionError: a connect timeout is an instance of both.
        logger.error(f"[PYTHONNET] tools/list timed out ({timeout}s)")
        return []
    except requests.exceptions.ConnectionError:
        logger.error("[PYTHONNET] Cannot connect to MCP server for tools/list")
        return []
    except Exception as e:
        # Catch-all boundary: malformed JSON, unexpected payload shape, etc.
        logger.error(f"[PYTHONNET] tools/list failed: {e}")
        return []
def _decode_mcp_response(response) -> Any:
    """Decode the HTTP reply of an MCP POST into a JSON-RPC message.

    The request advertises ``Accept: application/json, text/event-stream``,
    so the server may answer with either a single JSON document or a
    Server-Sent-Events stream carrying JSON-RPC messages in ``data:`` fields.

    Args:
        response: The ``requests`` response object of the POST.

    Returns:
        The decoded JSON payload. For an SSE body this is the first event
        that is a JSON-RPC response (an object with ``result`` or ``error``).

    Raises:
        ValueError: If the body is not valid JSON, or an SSE body contains
            no JSON-RPC response.
    """
    content_type = response.headers.get("Content-Type", "") or ""
    if "text/event-stream" not in content_type.lower():
        try:
            return response.json()
        except ValueError as exc:
            # Typically an HTML/plain-text error page; surface the status
            # instead of an opaque JSON parser message.
            raise ValueError(
                f"HTTP {response.status_code}: response body is not valid JSON"
            ) from exc

    # Event streams are UTF-8 by definition. Decode the raw bytes rather than
    # using response.text, which may fall back to a different charset for
    # text/* bodies sent without an explicit charset.
    body = response.content.decode("utf-8", errors="replace")

    # Split only on CR/LF variants: str.splitlines() would also split on
    # characters (e.g. U+2028) that may legally appear inside JSON strings.
    lines = body.replace("\r\n", "\n").replace("\r", "\n").split("\n")

    data_lines: List[str] = []
    # The trailing "" flushes a last event that is not followed by a blank line.
    for line in lines + [""]:
        if line.startswith("data:"):
            data_lines.append(line[5:].lstrip(" "))
        elif not line.strip() and data_lines:
            raw_event, data_lines = "\n".join(data_lines), []
            try:
                message = json.loads(raw_event)
            except ValueError:
                continue  # not JSON (e.g. keep-alive payload); try next event
            # Skip notifications/requests; only a response ends the call.
            if isinstance(message, dict) and ("result" in message or "error" in message):
                return message
    raise ValueError("no JSON-RPC response found in event stream")


def _format_mcp_result(message: Any) -> str:
    """Render a JSON-RPC ``tools/call`` response as the string given to callers.

    Args:
        message: Decoded JSON-RPC response.

    Returns:
        ``"Error: <message>"`` for a JSON-RPC error; otherwise the pretty-printed
        ``structuredContent`` if present, else the newline-joined text blocks
        of ``content``, else ``str()`` of the raw result.

    Raises:
        ValueError: If *message* is not a JSON object.
    """
    if not isinstance(message, dict):
        raise ValueError(
            f"unexpected JSON-RPC payload of type {type(message).__name__}"
        )

    error = message.get("error")
    if error:
        # JSON-RPC errors are objects, but tolerate a bare string/number.
        if isinstance(error, dict):
            error_msg = error.get("message", "Unknown error")
        else:
            error_msg = str(error)
        return f"Error: {error_msg}"

    content = message.get("result", {})
    if not isinstance(content, dict):
        return str(content)

    # Prefer structuredContent (cleaner). A null value counts as absent so it
    # does not hide the text blocks behind the string "null".
    if content.get("structuredContent") is not None:
        return json.dumps(content["structuredContent"], indent=2)

    blocks = content.get("content")
    if isinstance(blocks, list):
        texts = [
            block["text"]
            for block in blocks
            if isinstance(block, dict)
            and block.get("type") == "text"
            and isinstance(block.get("text"), str)
        ]
        return "\n".join(texts)

    return str(content)


def _call_pythonnet_mcp(
    base_url: str,
    tool_name: str,
    arguments: dict,
    timeout: int = 60,
    session_id: Optional[str] = None,
) -> str:
    """Call a Pythonnet MCP tool via HTTP (JSON-RPC ``tools/call``).

    Args:
        base_url: MCP server URL (e.g., http://localhost:8001/mcp)
        tool_name: Name of the tool to invoke.
        arguments: Tool arguments, sent as the JSON-RPC ``arguments`` object.
        timeout: Request timeout in seconds.
        session_id: Session identifier appended as the ``session_id`` query
            parameter. A random ``sara-xxxxxxxx`` id is generated when omitted.

    Returns:
        The tool output as a string. Failures are never raised; they are
        reported as a string starting with ``"Error"``.
    """
    # Generate unique session ID if not provided
    if session_id is None:
        session_id = f"sara-{uuid.uuid4().hex[:8]}"

    try:
        response = requests.post(
            f"{base_url}?session_id={session_id}",
            headers={
                "Content-Type": "application/json",
                "Accept": "application/json, text/event-stream",
            },
            json={
                "jsonrpc": "2.0",
                "id": 1,
                "method": "tools/call",
                "params": {"name": tool_name, "arguments": arguments},
            },
            timeout=timeout,
        )
        return _format_mcp_result(_decode_mcp_response(response))
    except requests.exceptions.Timeout:
        # Must precede ConnectionError: a connect timeout is an instance of both.
        return f"Error: Pythonnet MCP request timed out ({timeout}s)"
    except requests.exceptions.ConnectionError:
        # Report the URL actually used; it is not always localhost:8001.
        return (
            f"Error: Cannot connect to Pythonnet MCP server at {base_url}. "
            "Is the Docker container running?"
        )
    except Exception as e:
        return f"Error calling Pythonnet MCP: {str(e)}"
class PythonnetToolRegistry:
    """MCP client for the Pythonnet tool set, reached over Docker HTTP.

    Tool definitions are fetched from the MCP server rather than hardcoded;
    the server is the single source of truth.

    NOTE: This class is intentionally *instance-based*.
    Each analysis run should create its own instance with its own session_id
    to avoid cross-run lock conflicts.

    Backward compatibility:
    - is_available() and get_tool_definitions() remain static and always use
      the module-level PYTHONNET_MCP_URL
    - execute_tool() is an instance method and uses the instance's base_url
    """

    def __init__(
        self, session_id: Optional[str] = None, base_url: Optional[str] = None
    ):
        """Create a registry bound to one session.

        Args:
            session_id: Session identifier sent with every tool call. A random
                ``sara-xxxxxxxx`` id is generated when omitted or empty.
            base_url: MCP server URL. Defaults to PYTHONNET_MCP_URL.
        """
        self.session_id = session_id or f"sara-{uuid.uuid4().hex[:8]}"
        self.base_url = base_url or PYTHONNET_MCP_URL

    @staticmethod
    def get_policy() -> Optional[Type]:
        """Return the policy class for Pythonnet tools, or None if unavailable."""
        try:
            # Imported lazily so this module still loads without the policy module.
            from services.pythonnet.policy import PythonnetToolPolicy

            return PythonnetToolPolicy
        except ImportError:
            return None

    @staticmethod
    def is_available() -> bool:
        """Check if the Pythonnet Docker server is available.

        Returns:
            True when a GET on the MCP endpoint answers with status 200, 400
            or 406; False for any other status or when the request fails.
        """
        try:
            url = f"{PYTHONNET_MCP_URL}?session_id=health-check"
            logger.debug(f"[PYTHONNET] Health check: {url}")
            response = requests.get(url, timeout=5)
            logger.debug(f"[PYTHONNET] Response: {response.status_code}")
            # These statuses are accepted as proof that the server is up.
            # NOTE(review): presumably 400/406 is how the MCP endpoint rejects
            # a bare GET; confirm against the server. Other statuses are
            # treated as "not available".
            result = response.status_code in (200, 400, 406)
            if not result:
                logger.warning(f"[PYTHONNET] Unexpected status: {response.status_code}")
            return result
        except Exception as e:
            logger.error(f"[PYTHONNET] Health check error: {e}")
            return False

    @staticmethod
    def get_tool_definitions() -> list[dict]:
        """Fetch tool definitions from the MCP server, with a TTL cache.

        Schemas come from the server via tools/list, not from hardcoded
        duplicates. Successful results are cached module-wide for
        _TOOLS_CACHE_TTL seconds. An empty result (which is also what a failed
        fetch yields) is NOT cached, so a server that was briefly unreachable
        is retried on the next call instead of looking tool-less for a full
        TTL.

        Returns:
            List of tool definitions with name, description, inputSchema;
            empty when the server could not be queried.
        """
        global _tools_cache, _tools_cache_time

        # Return cached tools if still valid
        if (
            _tools_cache is not None
            and (time.time() - _tools_cache_time) < _TOOLS_CACHE_TTL
        ):
            return _tools_cache

        # Fetch fresh tools from server
        tools = _list_pythonnet_tools(PYTHONNET_MCP_URL)
        if not tools:
            # Do not poison the cache with a failed or empty fetch.
            return tools

        # Add client-side metadata that the server doesn't know about
        for tool in tools:
            # pythonnet_load_binary should never be extracted from LLM narration
            if tool.get("name") == "pythonnet_load_binary":
                tool["extractable"] = False

        # Update cache
        _tools_cache = tools
        _tools_cache_time = time.time()
        return tools

    @staticmethod
    def _normalise_load_binary_input(tool_input: dict) -> dict:
        """Return tool_input with a legacy relative binary_path made absolute.

        ``data/samples/file.dll`` becomes ``/data/samples/file.dll``. The
        caller's dict is never modified: a shallow copy is returned when a
        rewrite is needed, otherwise the very same object is returned.
        """
        if not isinstance(tool_input, dict):
            return tool_input
        path = tool_input.get("binary_path")
        if isinstance(path, str) and path.startswith("data/samples/"):
            return {**tool_input, "binary_path": f"/{path}"}
        return tool_input

    def execute_tool(self, tool_name: str, tool_input: dict) -> str:
        """Execute a Pythonnet tool via the Docker HTTP MCP server.

        Args:
            tool_name: Tool name with pythonnet_ prefix
            tool_input: Tool parameters (left unmodified)

        Returns:
            Execution result as string. Errors are returned as a string
            starting with "Error", never raised.
        """
        try:
            # Use longer timeout for binary loading (5 minutes)
            timeout = 300 if tool_name == "pythonnet_load_binary" else 60

            # Path normalisation for pythonnet_load_binary.
            # Both SARA and the pythonnet-mcp container mount the host directory
            # ./data/samples at /data/samples, so the path is already correct in
            # the normal case. Only the legacy relative form "data/samples/file"
            # is rewritten here (made absolute). Bare filenames are resolved
            # server-side (searches /data/samples/ then
            # /data/pythonnet_projects/_uploads/).
            arguments = tool_input
            if tool_name == "pythonnet_load_binary":
                arguments = self._normalise_load_binary_input(tool_input)
                if arguments is not tool_input:
                    logger.info(
                        f"[PYTHONNET] Path normalised: {tool_input['binary_path']} -> {arguments['binary_path']}"
                    )

            # Call the MCP server with session tracking
            return _call_pythonnet_mcp(
                self.base_url,
                tool_name,
                arguments,
                timeout=timeout,
                session_id=self.session_id,
            )
        except Exception as e:
            import traceback

            return f"Error executing {tool_name}: {str(e)}\n{traceback.format_exc()}"
# Public API of this module: the registry class and the resolved MCP server URL
__all__ = ["PythonnetToolRegistry", "PYTHONNET_MCP_URL"]