Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
292 changes: 284 additions & 8 deletions src/converter/anthropic2gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,163 @@ def clean_json_schema(schema: Any) -> Any:
return cleaned


# ============================================================================
# 4a. WebSearch 工具检测和分离
# ============================================================================

_WEB_SEARCH_TOOL_PREFIX = "web_search_"


def _is_web_search_tool(tool: Dict[str, Any]) -> bool:
"""检查工具是否为 Anthropic web_search 类型"""
tool_type = tool.get("type", "")
return isinstance(tool_type, str) and tool_type.startswith(_WEB_SEARCH_TOOL_PREFIX)


def separate_web_search_tools(
anthropic_tools: Optional[List[Dict[str, Any]]]
) -> tuple:
"""
从 Anthropic tools[] 中分离 web_search 工具和 function 工具

Args:
anthropic_tools: Anthropic 格式的工具列表

Returns:
(web_search_config, function_tools):
- web_search_config: dict 或 None (googleSearch 配置)
- function_tools: list 或 None (function 工具,用于 convert_tools)
"""
if not anthropic_tools:
return None, None

web_search_tool = None
function_tools: List[Dict[str, Any]] = []

for tool in anthropic_tools:
if not isinstance(tool, dict):
function_tools.append(tool)
continue
if _is_web_search_tool(tool):
if web_search_tool is not None:
log.warning("[WEB_SEARCH] Multiple web_search tools found, using the first one")
continue
web_search_tool = tool
else:
function_tools.append(tool)

if web_search_tool is None:
return None, anthropic_tools # 无 web_search,原样返回

# 记录不支持的参数
unsupported = []
if web_search_tool.get("allowed_domains"):
unsupported.append("allowed_domains")
if web_search_tool.get("blocked_domains"):
unsupported.append("blocked_domains")
if web_search_tool.get("user_location"):
unsupported.append("user_location")
if unsupported:
log.warning(
f"[WEB_SEARCH] Unsupported Anthropic params ignored: {unsupported}. "
f"Gemini googleSearch does not support domain filtering or user_location."
)

# GCLI/Antigravity 使用简化 googleSearch: {}
return {}, function_tools or None


# ============================================================================
# 4b. GroundingMetadata → Anthropic 响应块 转换
# ============================================================================


def _extract_grounding_metadata(
response_data: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""从 Gemini 响应中提取 groundingMetadata"""
candidates = response_data.get("candidates", []) or []
if not candidates:
return None
candidate = candidates[0] or {}
return candidate.get("groundingMetadata")


def _build_web_search_tool_result(
grounding_meta: Dict[str, Any],
tool_use_id: str,
) -> Dict[str, Any]:
"""将 Gemini groundingChunks 转换为 Anthropic web_search_tool_result 块"""
chunks = grounding_meta.get("groundingChunks", []) or []
results = []
for chunk in chunks:
if not isinstance(chunk, dict):
continue
web = chunk.get("web", {}) or {}
uri = web.get("uri", "")
title = web.get("title", "")
if uri or title:
results.append({
"type": "web_search_result",
"url": uri,
"title": title,
})

return {
"type": "web_search_tool_result",
"tool_use_id": tool_use_id,
"content": results,
}


def _build_citations_for_text(
text: str,
grounding_meta: Dict[str, Any],
) -> List[Dict[str, Any]]:
"""
根据 groundingSupports 为 text 块构建 citations 列表
将 segment 匹配到 groundingChunks 以构建 web_search_result_location
"""
supports = grounding_meta.get("groundingSupports", []) or []
chunks = grounding_meta.get("groundingChunks", []) or []

citations: List[Dict[str, Any]] = []
for support in supports:
if not isinstance(support, dict):
continue
segment = support.get("segment", {}) or {}
cited_text = segment.get("text", "")
indices = support.get("groundingChunkIndices", []) or []

# 取第一个关联的 chunk 的 url/title
url = ""
title = ""
if indices and 0 <= indices[0] < len(chunks):
chunk = chunks[indices[0]]
if isinstance(chunk, dict):
web = chunk.get("web", {}) or {}
url = web.get("uri", "")
title = web.get("title", "")

if url and cited_text:
citations.append({
"type": "web_search_result_location",
"url": url,
"title": title,
"cited_text": cited_text[:150], # Anthropic 规范: 最多 150 字符
})

return citations


def _has_grounding_content(grounding_meta: Optional[Dict[str, Any]]) -> bool:
"""检查 groundingMetadata 是否包含有效内容"""
if not grounding_meta:
return False
chunks = grounding_meta.get("groundingChunks", []) or []
return len(chunks) > 0


# ============================================================================
# 4. Tools 转换
# ============================================================================
Expand Down Expand Up @@ -748,9 +905,17 @@ async def anthropic_to_gemini_request(payload: Dict[str, Any]) -> Dict[str, Any]

contents = reorganize_tool_messages(contents)

# 转换工具
tools = convert_tools(payload.get("tools"))

# 转换工具 — 先分离 web_search,再转换 function 工具
web_search_config, function_tools = separate_web_search_tools(payload.get("tools"))
tools = convert_tools(function_tools)

# 合并 googleSearch 到 tools(如有 web_search 工具)
if web_search_config is not None:
if tools is None:
tools = []
tools.append({"googleSearch": web_search_config})
log.debug("[ANTHROPIC2GEMINI] Injected googleSearch tool from web_search tool definition")

# 转换 tool_choice
tool_config = convert_tool_choice_to_tool_config(payload.get("tool_choice"))

Expand Down Expand Up @@ -806,6 +971,9 @@ def gemini_to_anthropic_response(
candidate = response_data.get("candidates", [{}])[0] or {}
parts = candidate.get("content", {}).get("parts", []) or []

# 提取 grounding metadata (web search results)
grounding_meta = _extract_grounding_metadata(response_data)

# 获取 usage metadata
usage_metadata = {}
if "usageMetadata" in response_data:
Expand All @@ -817,6 +985,27 @@ def gemini_to_anthropic_response(
content = []
has_tool_use = False

# 插入 web search grounding 块 (如果有)
if _has_grounding_content(grounding_meta):
tool_use_id = f"srvtoolu_{uuid.uuid4().hex}"
queries = grounding_meta.get("webSearchQueries", []) or []

# server_tool_use 块
content.append({
"type": "server_tool_use",
"id": tool_use_id,
"name": "web_search",
"input": {"query": queries[0] if queries else ""},
})

# web_search_tool_result 块
content.append(_build_web_search_tool_result(grounding_meta, tool_use_id))

log.debug(
f"[ANTHROPIC2GEMINI] Injected grounding blocks: "
f"{len(grounding_meta.get('groundingChunks', []) or [])} sources"
)

for part in parts:
if not isinstance(part, dict):
continue
Expand All @@ -839,7 +1028,13 @@ def gemini_to_anthropic_response(

# 处理文本块
if "text" in part:
content.append({"type": "text", "text": part.get("text", "")})
text_block = {"type": "text", "text": part.get("text", "")}
# 添加 citations (from groundingSupports)
if _has_grounding_content(grounding_meta):
citations = _build_citations_for_text(text_block["text"], grounding_meta)
if citations:
text_block["citations"] = citations
content.append(text_block)
continue

# 处理工具调用
Expand Down Expand Up @@ -895,6 +1090,16 @@ def gemini_to_anthropic_response(

# 构建 Anthropic 响应
message_id = f"msg_{uuid.uuid4().hex}"
usage_dict: Dict[str, Any] = {
"input_tokens": int(input_tokens or 0),
"output_tokens": int(output_tokens or 0),
}

# 添加 web_search_requests 计数
if _has_grounding_content(grounding_meta):
usage_dict["server_tool_use"] = {
"web_search_requests": 1
}

return {
"id": message_id,
Expand All @@ -904,10 +1109,7 @@ def gemini_to_anthropic_response(
"content": content,
"stop_reason": stop_reason,
"stop_sequence": None,
"usage": {
"input_tokens": int(input_tokens or 0),
"output_tokens": int(output_tokens or 0),
},
"usage": usage_dict,
}


Expand Down Expand Up @@ -945,6 +1147,7 @@ async def gemini_stream_to_anthropic_stream(
input_tokens = 0
output_tokens = 0
finish_reason: Optional[str] = None
grounding_meta: Optional[Dict[str, Any]] = None # web search grounding metadata

def _sse_event(event: str, data: Dict[str, Any]) -> bytes:
"""生成 SSE 事件"""
Expand Down Expand Up @@ -1005,6 +1208,14 @@ def _close_block() -> Optional[bytes]:
candidate = (response.get("candidates", []) or [{}])[0] or {}
parts = (candidate.get("content", {}) or {}).get("parts", []) or []

# 提取 grounding metadata (可能在任意 chunk 中出现)
if not grounding_meta:
candidate_gm = candidate.get("groundingMetadata")
if candidate_gm:
grounding_meta = candidate_gm
log.debug(f"[GEMINI_TO_ANTHROPIC] Found groundingMetadata with "
f"{len(grounding_meta.get('groundingChunks', []) or [])} sources")

# 更新 usage metadata
if "usageMetadata" in response:
usage = response["usageMetadata"]
Expand Down Expand Up @@ -1220,6 +1431,69 @@ def _close_block() -> Optional[bytes]:
f"input_tokens={input_tokens}, output_tokens={output_tokens}"
)

# 发送 web search grounding 事件 (如果有)
if _has_grounding_content(grounding_meta):
tool_use_id = f"srvtoolu_{uuid.uuid4().hex}"
queries = grounding_meta.get("webSearchQueries", []) or [] # type: ignore[union-attr]

# server_tool_use content_block_start
current_block_index += 1
yield _sse_event(
"content_block_start",
{
"type": "content_block_start",
"index": current_block_index,
"content_block": {
"type": "server_tool_use",
"id": tool_use_id,
"name": "web_search",
"input": {},
},
},
)
# input_json_delta
yield _sse_event(
"content_block_delta",
{
"type": "content_block_delta",
"index": current_block_index,
"delta": {
"type": "input_json_delta",
"partial_json": json.dumps(
{"query": queries[0] if queries else ""},
ensure_ascii=False, separators=(",", ":")
),
},
},
)
# close server_tool_use
yield _sse_event(
"content_block_stop",
{"type": "content_block_stop", "index": current_block_index},
)

# web_search_tool_result content_block_start
current_block_index += 1
result_block = _build_web_search_tool_result(grounding_meta, tool_use_id) # type: ignore[arg-type]
yield _sse_event(
"content_block_start",
{
"type": "content_block_start",
"index": current_block_index,
"content_block": result_block,
},
)
# close web_search_tool_result (single complete block)
yield _sse_event(
"content_block_stop",
{"type": "content_block_stop", "index": current_block_index},
)

log.debug(
f"[GEMINI_TO_ANTHROPIC] Injected stream grounding events: "
f"{len(grounding_meta.get('groundingChunks', []) or [])} sources" # type: ignore[union-attr]
)

# 发送 message_delta 和 message_stop
yield _sse_event(
"message_delta",
Expand All @@ -1228,6 +1502,8 @@ def _close_block() -> Optional[bytes]:
"delta": {"stop_reason": stop_reason, "stop_sequence": None},
"usage": {
"output_tokens": output_tokens,
**({"server_tool_use": {"web_search_requests": 1}}
if _has_grounding_content(grounding_meta) else {}),
},
},
)
Expand Down
Loading