From 80f1c98a71a91eca986d4825e3aa99db8e2ccd27 Mon Sep 17 00:00:00 2001
From: zho
Date: Thu, 12 Mar 2026 01:24:30 +0800
Subject: [PATCH 1/5] #56-added the processing for private api key

---
 pinecone_rag/config.py                        |  78 ++-
 pinecone_rag/ingestion.py                     | 639 ++++++++++++++----
 pinecone_rag/preprocessor/utility.py          |  11 +
 .../preprocessor/wg21paper_preprocessor.py    |  12 +-
 4 files changed, 596 insertions(+), 144 deletions(-)

diff --git a/pinecone_rag/config.py b/pinecone_rag/config.py
index d983cd3..69b5304 100644
--- a/pinecone_rag/config.py
+++ b/pinecone_rag/config.py
@@ -6,8 +6,17 @@
 from typing import Optional
 from dataclasses import dataclass, field
+from enum import Enum
 import os
 
+
+class PineconeInstance(str, Enum):
+    """Selects which Pinecone API key (instance) to use for index/upsert operations."""
+
+    PUBLIC = "public"
+    PRIVATE = "private"
+
+
 try:
     from dotenv import load_dotenv
 except ImportError:
@@ -25,17 +34,18 @@ class PineconeConfig:
     IS_TEST = False
 
     api_key: str = os.getenv("PINECONE_API_KEY", "your-pinecone-api-key-here")
+    private_api_key: str = os.getenv("PINECONE_PRIVATE_API_KEY", "")
    environment: str = os.getenv("PINECONE_ENVIRONMENT", "us-central1")
     cloud: str = os.getenv("PINECONE_CLOUD", "gcp")
     index_name: str = os.getenv("PINECONE_INDEX_NAME", "rag-hybrid")
 
     # Note: dimension is automatically set by Pinecone when using integrated embeddings
     top_k: int = 5  # Default number of documents to retrieve
-    chunk_size: int = 1000
+    chunk_size: int = 2000  # Document chunk size in characters
     # Note: Chunk size compatibility:
     # - llama-text-embed-v2 (dense): max 2048 tokens, recommends 400-500 tokens (~1600-2000 chars)
-    # - pinecone-sparse-english-v0 (sparse): default 512 tokens, max 2048 tokens (if configured)
-    # Current 1000 chars (~200-250 tokens) is safe for both models but smaller than optimal for dense.
+    # - pinecone-sparse-english-v0 (sparse): default 512 tokens, max 2048 tokens — configured via sparse_max_tokens
+    # Current 2000 chars (~400-500 tokens) stays within both model limits and matches the dense model's recommended range.
     chunk_overlap: int = 200  # Chunk overlap in characters
@@ -67,6 +77,18 @@ class EmbeddingConfig:
     pinecone_sparse_model: str = os.getenv(
         "PINECONE_SPARSE_EMBEDDING_MODEL", "pinecone-sparse-english-v0"
     )
+    # Truncation when input exceeds model limit: "END" (truncate) or "NONE" (error).
+    # See https://docs.pinecone.io/reference/api/2025-10/control-plane/create_for_model
+    embed_truncate: str = os.getenv("PINECONE_EMBED_TRUNCATE", "END")
+    # input_type for upsert; use "passage" for documents.
+    embed_write_input_type: str = os.getenv(
+        "PINECONE_EMBED_WRITE_INPUT_TYPE", "passage"
+    )
+    # input_type for search; use "query" for search text.
+    embed_read_input_type: str = os.getenv("PINECONE_EMBED_READ_INPUT_TYPE", "query")
+    # Max tokens per sequence for the sparse model (default 512, max 2048).
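+    # At roughly 4 characters per token, the 2048-token ceiling comfortably covers the 2000-character chunk_size.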
+ # See https://docs.pinecone.io/reference/api/2025-10/control-plane/create_for_model + sparse_max_tokens: int = int(os.getenv("PINECONE_SPARSE_MAX_TOKENS", "2048")) @dataclass @@ -80,6 +102,17 @@ class CacheConfig: cache_dir: str = "./cache" # Directory for disk-based cache +@dataclass +class SourceBaseConfig: + """Base configuration for all source types""" + + data_dir: str = os.getenv("DATA_DIR", "data") + md_data_dir: str = os.getenv("MD_DATA_DIR", "data/markdown") + namespace: str = os.getenv("NAMESPACE", "namespace") + is_chunked: bool = False + pinecone_instance: PineconeInstance = PineconeInstance.PUBLIC + + @dataclass class LoggingConfig: """Logging configuration""" @@ -90,19 +123,27 @@ class LoggingConfig: @dataclass -class MailConfig: +class MailConfig(SourceBaseConfig): """Mail preprocessor configuration""" # mail_data_dir: str = os.getenv("MAIL_DATA_DIR", "data/message_by_thread/en") - mail_data_dir: str = os.getenv("MAIL_DATA_DIR", "data/message_by_thread/json") - markdown_data_dir: str = os.getenv( - "MARKDOWN_DATA_DIR", "data/message_by_thread/markdown" - ) + data_dir: str = os.getenv("MAIL_DATA_DIR", "data/message_by_thread/json") + md_data_dir: str = os.getenv("MARKDOWN_DATA_DIR", "data/message_by_thread/markdown") namespace: str = os.getenv("MAIL_NAMESPACE", "mailing") @dataclass -class DocuConfig: +class ReflectorConfig(MailConfig): + """Reflector preprocessor configuration""" + + data_dir: str = os.getenv("REFLECTOR_DATA_DIR", "data/reflector/json") + md_data_dir: str = os.getenv("REFLECTOR_MD_DATA_DIR", "data/reflector/markdown") + namespace: str = os.getenv("REFLECTOR_NAMESPACE", "wg21-reflector") + pinecone_instance: PineconeInstance = PineconeInstance.PRIVATE + + +@dataclass +class DocuConfig(SourceBaseConfig): """Documentation preprocessor configuration""" raw_data_dir: str = os.getenv("RAW_DATA_DIR", "data/cpp_documentation") @@ -111,7 +152,7 @@ class DocuConfig: @dataclass -class SlackConfig: +class SlackConfig(SourceBaseConfig): """Slack database configuration""" db_host: str = os.getenv("SLACK_DB_HOST", "localhost") @@ -128,7 +169,7 @@ class SlackConfig: @dataclass -class WG21Config: +class WG21Config(SourceBaseConfig): """WG21 paper preprocessor configuration""" data_dir: str = os.getenv("WG21_DATA_DIR", "data/wg21_paper_1989_2025") @@ -136,34 +177,35 @@ class WG21Config: @dataclass -class YouTubeConfig: +class YouTubeConfig(SourceBaseConfig): """YouTube video preprocessor configuration""" transcripts_dir: str = os.getenv( "YOUTUBE_TRANSCRIPTS_DIR", "data/youtube/transcripts" ) metainfo_json_dir: str = os.getenv( - "YOUTUBE_METAINFO_JSON_DIR", "data/youtube/metainfo/missing_json" + "YOUTUBE_METAINFO_JSON_DIR", "data/youtube/metainfo" ) metainfo_raw_dir: Optional[str] = os.getenv("YOUTUBE_METAINFO_RAW_DIR", None) namespace: str = os.getenv("YOUTUBE_NAMESPACE", "youtube-scripts") + is_chunked: bool = True @dataclass -class BlogPdfConfig: +class BlogPdfConfig(SourceBaseConfig): """Blog PDF preprocessor configuration (e.g. 
Bjarne Stroustrup papers)""" data_dir: str = os.getenv( "BLOG_PDF_DATA_DIR", "data/blog-posts/Bjarne Stroustrup", ) - namespace: str = os.getenv("BLOG_PDF_NAMESPACE", "stroustrup-papers") + namespace: str = os.getenv("BLOG_PDF_NAMESPACE", "blog-posts") author: str = os.getenv("BLOG_PDF_AUTHOR", "Bjarne Stroustrup") source_url: str = os.getenv("BLOG_PDF_SOURCE_URL", "https://www.stroustrup.com") @dataclass -class BlogConfig: +class BlogConfig(SourceBaseConfig): """Blog preprocessor configuration (JSON + PDF under data/blog-posts)""" data_dir: str = os.getenv("BLOG_DATA_DIR", "data/blog-posts") @@ -176,11 +218,11 @@ class BlogConfig: @dataclass -class GitConfig: +class GitConfig(SourceBaseConfig): """GitHub issue/PR preprocessor configuration""" - data_dir: str = os.getenv("GIT_DATA_DIR", "data/github") - namespace: str = os.getenv("GIT_NAMESPACE", "github-compiler") + data_dir: str = os.getenv("GIT_DATA_DIR", "data/github/Clang") + namespace: str = os.getenv("GIT_NAMESPACE", "github-clang") min_content_length: int = int(os.getenv("GIT_MIN_CONTENT_LENGTH", "10")) diff --git a/pinecone_rag/ingestion.py b/pinecone_rag/ingestion.py index 170651e..fa924f5 100644 --- a/pinecone_rag/ingestion.py +++ b/pinecone_rag/ingestion.py @@ -9,8 +9,11 @@ import logging import re +import requests from typing import List, Dict, Any, Optional import hashlib +from enum import Enum +from tqdm import tqdm try: from pinecone import Pinecone @@ -24,20 +27,47 @@ else: _IMPORT_ERROR = None -from config import PineconeConfig, EmbeddingConfig +from config import PineconeConfig, EmbeddingConfig, PineconeInstance logger = logging.getLogger(__name__) +class ProcessType(str, Enum): + """Supported processing operations for Pinecone document handling.""" + + UPSERT = "upsert" + UPDATE_METADATA_BY_ID = "update_metadata_by_id" + UPDATE_DOCUMENT_BY_ID = "update_document_by_id" + UPDATE_VALUE_BY_ID = "update_value_by_id" + + @classmethod + def from_value(cls, value: "ProcessType | str") -> "ProcessType": + """Parse process type from enum or string value.""" + if isinstance(value, cls): + return value + try: + return cls(value) + except ValueError as exc: + allowed = ", ".join(v.value for v in cls) + raise ValueError( + f"Invalid process_type: {value}. Use one of: {allowed}" + ) from exc + + class PineconeIngestion: """Handles Pinecone index creation, document chunking, and vector operations.""" - def __init__(self): - """Initialize with configuration from environment variables.""" + def __init__(self, instance: PineconeInstance = PineconeInstance.PUBLIC): + """Initialize with configuration from environment variables. + + Args: + instance: Whether to use the public or private Pinecone API key. 
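+
+        Example (a sketch; assumes PINECONE_PRIVATE_API_KEY is set and `docs`
+        is an already-prepared list of Document objects):
+
+            ingestion = PineconeIngestion(instance=PineconeInstance.PRIVATE)
+            result = ingestion.process_documents(docs, namespace="wg21-reflector")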
+ """ self._validate_imports() self.config = PineconeConfig() self.embedding_config = EmbeddingConfig() + self.instance = instance self._setup_client() self._initialize_text_splitter() @@ -45,7 +75,8 @@ def __init__(self): logger.info( f"Using Pinecone hybrid search with dense model: {self.embedding_config.pinecone_model} " - f"and sparse model: {self.embedding_config.pinecone_sparse_model}" + f"and sparse model: {self.embedding_config.pinecone_sparse_model} " + f"(instance: {self.instance.value})" ) def _validate_imports(self) -> None: @@ -81,9 +112,11 @@ def _ensure_pinecone_client(self) -> None: """Initialize Pinecone client if needed.""" if not self._pc_initialized: try: - self.pc = Pinecone(api_key=self.config.api_key) + self.pc = Pinecone(api_key=self._active_api_key) self._pc_initialized = True - logger.info("Pinecone client initialized") + logger.info( + "Pinecone client initialized (instance: %s)", self.instance.value + ) except Exception as e: logger.error(f"Failed to initialize Pinecone client: {e}") raise ConnectionError( @@ -91,6 +124,13 @@ def _ensure_pinecone_client(self) -> None: f"Error: {e}" ) from e + @property + def _active_api_key(self) -> str: + """Return the API key for the currently selected instance.""" + if self.instance == PineconeInstance.PRIVATE: + return self.config.private_api_key + return self.config.api_key + def _get_or_create_indexes(self) -> None: """Get existing indexes or create new ones if they don't exist.""" if self._dense_index_initialized and self._sparse_index_initialized: @@ -151,7 +191,7 @@ def _create_new_indexes( dense_name, self.embedding_config.pinecone_model ) if sparse_name not in existing_indexes: - self._create_pinecone_index( + self._create_sparse_pinecone_index( sparse_name, self.embedding_config.pinecone_sparse_model ) @@ -161,10 +201,48 @@ def _create_new_indexes( self._handle_index_creation_error(create_error) def _create_pinecone_index(self, index_name: str, model_name: str) -> None: - """Create Pinecone index with embedding model.""" + """Create Pinecone index with embedding model and write/read parameters.""" logger.info(f"Creating index '{index_name}' with model: {model_name}") if self.pc is None: raise RuntimeError("Pinecone client not initialized") + write_params = { + "input_type": self.embedding_config.embed_write_input_type, + "truncate": self.embedding_config.embed_truncate, + } + read_params = { + "input_type": self.embedding_config.embed_read_input_type, + "truncate": self.embedding_config.embed_truncate, + } + self.pc.create_index_for_model( + name=index_name, + cloud=self.config.cloud, + region=self.config.environment, + embed={ + "model": model_name, + "field_map": {"text": "chunk_text"}, + "write_parameters": write_params, + "read_parameters": read_params, + }, + ) + + def _create_sparse_pinecone_index(self, index_name: str, model_name: str) -> None: + """Create sparse Pinecone index with max_tokens_per_sequence configured.""" + logger.info( + f"Creating sparse index '{index_name}' with model: {model_name}, " + f"max_tokens_per_sequence: {self.embedding_config.sparse_max_tokens}" + ) + if self.pc is None: + raise RuntimeError("Pinecone client not initialized") + write_params = { + "input_type": self.embedding_config.embed_write_input_type, + "truncate": self.embedding_config.embed_truncate, + "max_tokens_per_sequence": self.embedding_config.sparse_max_tokens, + } + read_params = { + "input_type": self.embedding_config.embed_read_input_type, + "truncate": self.embedding_config.embed_truncate, + 
"max_tokens_per_sequence": self.embedding_config.sparse_max_tokens, + } self.pc.create_index_for_model( name=index_name, cloud=self.config.cloud, @@ -172,6 +250,8 @@ def _create_pinecone_index(self, index_name: str, model_name: str) -> None: embed={ "model": model_name, "field_map": {"text": "chunk_text"}, + "write_parameters": write_params, + "read_parameters": read_params, }, ) @@ -186,6 +266,73 @@ def _handle_index_creation_error(self, error: Exception) -> None: ) from error raise error + def _ensure_namespace_schema( + self, namespace: str, schema_fields: List[str] + ) -> None: + """Create a namespace with a filterable metadata schema on both indexes if absent. + + The namespace creation API (2025-10) accepts a schema that marks metadata + fields as filterable. This is called once per upsert session, before the + first batch is written. The call is idempotent: if the namespace already + exists it is skipped. + + Args: + namespace: The Pinecone namespace name. + schema_fields: Metadata field names to mark as filterable. + """ + if not namespace or not schema_fields: + return + + schema_payload = {"fields": {f: {"filterable": True} for f in schema_fields}} + + for index in (self.dense_index, self.sparse_index): + if index is None: + continue + try: + existing_names = {ns.name for ns in index.list_namespaces()} + if namespace in existing_names: + logger.debug( + "Namespace '%s' already exists on index '%s', skipping schema creation", + namespace, + getattr(index, "_index_name", "unknown"), + ) + continue + + index_host = index._config.host + # Strip any scheme prefix — index_host may already be "https://..." + host_clean = index_host.removeprefix("https://").removeprefix("http://") + response = requests.post( + f"https://{host_clean}/namespaces", + headers={ + "Api-Key": self._active_api_key, + "X-Pinecone-Api-Version": "2025-10", + "Content-Type": "application/json", + }, + json={"name": namespace, "schema": schema_payload}, + timeout=30, + ) + if response.status_code == 409: + logger.debug( + "Namespace '%s' already exists (409), skipping", namespace + ) + elif not response.ok: + logger.warning( + "Failed to create namespace '%s' with schema: %s %s", + namespace, + response.status_code, + response.text, + ) + else: + logger.info( + "Created namespace '%s' with schema fields: %s", + namespace, + schema_fields, + ) + except Exception as e: + logger.warning( + "Could not create namespace '%s' with schema: %s", namespace, e + ) + def _is_valid_chunk(self, text: str, min_length: int, min_words: int) -> bool: """Check if chunk is valid.""" if not text or len(text) < min_length: @@ -223,106 +370,247 @@ def _is_mostly_punctuation(self, text: str) -> bool: len(non_space_chars) > 0 and punctuation_chars / len(non_space_chars) > 0.5 ) - def upsert_documents( + def process_documents( self, documents: List[Document], namespace: Optional[str] = None, is_chunked: bool = False, + process_type: ProcessType | str = ProcessType.UPSERT, + keys: Optional[List[str]] = None, ) -> Dict[str, Any]: - """Upsert documents to Pinecone indexes. Returns statistics.""" + """ + Process documents with one of supported process types. 
+
+        Supported process types:
+        - upsert
+        - update_metadata_by_id
+        - update_document_by_id
+        - update_value_by_id
+        """
         try:
+            process_mode = ProcessType.from_value(process_type)
             if not documents:
-                logger.warning("No documents to upsert")
-                return {"upserted": 0, "errors": [], "failed_documents": []}
+                logger.warning("No documents to process")
+                return {
+                    "processed": 0,
+                    "total": 0,
+                    "errors": [],
+                    "failed_documents": [],
+                    "process_type": process_mode.value,
+                }
 
             self._ensure_indexes_ready()
-            if not is_chunked:
-                chunked_documents = self.text_splitter.split_documents(documents)
-            else:
-                chunked_documents = documents
-            total_upserted, errors, failed_documents = self._upsert_all_batches(
-                chunked_documents, namespace
-            )
-            return self._build_upsert_result(
-                total_upserted, len(documents), errors, failed_documents
-            )
+            records = self._prepare_records_to_process(documents, is_chunked)
+
+            if process_mode == ProcessType.UPSERT:
+                if records:
+                    schema_fields = [
+                        k for k in records[0] if k not in ("id", "chunk_text")
+                    ]
+                    self._ensure_namespace_schema(namespace, schema_fields)
+                total_upserted, errors, failed_documents = self._upsert_records(
+                    records, namespace
+                )
+                return self._build_process_result(
+                    total_processed=total_upserted,
+                    total=len(documents),
+                    errors=errors,
+                    failed_documents=failed_documents,
+                    process_type=process_mode,
+                )
+
+            if process_mode == ProcessType.UPDATE_METADATA_BY_ID:
+                return self._update_by_id(
+                    records=records,
+                    namespace=namespace,
+                    process_type=ProcessType.UPDATE_METADATA_BY_ID,
+                    keys=keys,
+                )
+            if process_mode == ProcessType.UPDATE_DOCUMENT_BY_ID:
+                return self._update_by_id(
+                    records=records,
+                    namespace=namespace,
+                    process_type=ProcessType.UPDATE_DOCUMENT_BY_ID,
+                )
+            if process_mode == ProcessType.UPDATE_VALUE_BY_ID:
+                return self._update_by_id(
+                    records=records,
+                    namespace=namespace,
+                    process_type=ProcessType.UPDATE_VALUE_BY_ID,
+                )
+
+            raise ValueError(f"Unhandled process_type: {process_mode.value}")
         except Exception as e:
-            logger.error(f"Error in upsert_documents: {e}")
+            logger.error(
+                "Error in process_documents (type=%s): %s",
+                process_type,
+                e,
+            )
             raise
 
     def _mark_batch_failed(
-        self, batch: List[Document], error: Exception, start_idx: int
+        self, batch_records: List[Dict[str, Any]], error: Exception, start_idx: int
     ) -> List[Dict[str, Any]]:
         """Mark batch documents as failed."""
         failed = []
-        for idx, doc in enumerate(batch):
-            meta = doc.metadata or {}
+        for idx, doc in enumerate(batch_records):
             failed.append(
                 {
-                    "doc_id": meta.get("doc_id", f"doc_{start_idx}_{idx}"),
-                    "type": meta.get("type", "unknown"),
+                    "doc_id": doc.get("id", doc.get("url", f"doc_{start_idx}_{idx}")),
+                    "type": doc.get("type", "unknown"),
                     "reason": f"Batch upsert failed: {str(error)}",
-                    "text_length": len(doc.page_content) if doc.page_content else 0,
-                    "metadata": meta,
+                    "metadata": {
+                        k: v for k, v in doc.items() if k not in ("id", "chunk_text")
+                    },
                 }
             )
         return failed
 
-    def _upsert_all_batches(
+    def _upsert_records(
         self,
-        documents: List[Document],
+        documents: List[Dict[str, Any]],
         namespace: Optional[str],
     ) -> tuple[int, List[str], List[Dict[str, Any]]]:
         """Upsert all batches."""
         total_upserted, errors, failed_docs = 0, [], []
+        total_skipped_existing = 0
         batch_size = self.config.batch_size
-
-        for i in range(0, len(documents), batch_size):
+        batch_start_idxs = list(range(0, len(documents), batch_size))
+        for batch_idx, i in enumerate(batch_start_idxs):
             batch = documents[i : i + batch_size]
-            batch_num = i // batch_size + 1
+            batch_num = batch_idx + 1
             try:
-                records = 
self._prepare_batch_records(batch, i) - - if not records: - logger.warning(f"Batch {batch_num}: no valid records") + filtered_batch, skipped_existing = self._filter_existing_records( + batch, namespace + ) + total_skipped_existing += skipped_existing + + if not filtered_batch: + logger.info( + "Skipped batch %s/%s: %s records already exist", + batch_num, + len(batch_start_idxs), + skipped_existing, + ) continue - self._upsert_batch(records, namespace, batch_num) - total_upserted += len(records) + self._upsert_batch(filtered_batch, namespace, batch_num) + total_upserted += len(filtered_batch) logger.info( - f"Upserted batch {batch_num}: {len(records)}/{len(batch)} documents" + "Upserted batch %s/%s: %s documents (skipped existing: %s)", + batch_num, + len(batch_start_idxs), + len(filtered_batch), + skipped_existing, ) + except Exception as e: error_msg = f"Error upserting batch {batch_num}: {e}" logger.error(error_msg) errors.append(error_msg) failed_docs.extend(self._mark_batch_failed(batch, e, i)) + if total_skipped_existing > 0: + logger.info( + "Skipped %s existing records during upsert", + total_skipped_existing, + ) + return total_upserted, errors, failed_docs - def _build_upsert_result( + def _extract_existing_ids_from_fetch_response( + self, fetch_response: Any + ) -> set[str]: + """Normalize fetch response and return existing record IDs.""" + if fetch_response is None: + return set() + + # SDK responses may expose vectors either as an attribute or as a dict key. + vectors = getattr(fetch_response, "vectors", None) + if vectors is None and isinstance(fetch_response, dict): + vectors = fetch_response.get("vectors") + if not vectors: + return set() + + # vectors is commonly a dict[id, record], but keep fallback for list-like values. + if isinstance(vectors, dict): + return {str(record_id) for record_id in vectors.keys()} + + existing_ids: set[str] = set() + try: + for item in vectors: + record_id = getattr(item, "id", None) + if record_id is None and isinstance(item, dict): + record_id = item.get("id") + if record_id is not None: + existing_ids.add(str(record_id)) + except TypeError: + return set() + return existing_ids + + def _filter_existing_records( self, - total_upserted: int, + records: List[Dict[str, Any]], + namespace: Optional[str], + ) -> tuple[List[Dict[str, Any]], int]: + """ + Remove records that already exist in the dense index. + + This prevents unnecessary re-embedding and duplicate upsert writes when + the same document IDs are processed again. + """ + if not records: + return records, 0 + + self._ensure_indexes_ready() + record_ids = [str(r["id"]) for r in records if "id" in r] + if not record_ids: + return records, 0 + + try: + fetch_response = self.dense_index.fetch(ids=record_ids, namespace=namespace) + existing_ids = self._extract_existing_ids_from_fetch_response( + fetch_response + ) + if not existing_ids: + return records, 0 + + filtered = [r for r in records if str(r.get("id")) not in existing_ids] + return filtered, len(records) - len(filtered) + except Exception as e: # pylint: disable=broad-exception-caught + # Do not block ingestion if existence checks fail unexpectedly. 
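+            # Re-upserting an existing ID overwrites it in place, so the worst case
+            # here is redundant re-embedding, not duplicate records.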
+ logger.warning( + "Existing record check failed; proceeding with full upsert: %s", + e, + ) + return records, 0 + + def _build_process_result( + self, + total_processed: int, total: int, errors: List[str], failed_documents: List[Dict[str, Any]], + process_type: ProcessType, ) -> Dict[str, Any]: - """Build upsert result.""" + """Build process result.""" result = { - "upserted": total_upserted, + "processed": total_processed, + "upserted": total_processed if process_type == ProcessType.UPSERT else 0, "total": total, "errors": errors, "failed_documents": failed_documents, "failed_count": len(failed_documents), + "process_type": process_type.value, } logger.info( - f"Upsert complete: {result['upserted']}/{result['total']} documents, " + f"Process complete ({process_type.value}): {result['processed']}/{result['total']} documents, " f"{result['failed_count']} failed" ) if failed_documents: logger.warning( - f"Failed documents summary: {result['failed_count']} documents failed to upsert" + f"Failed documents summary: {result['failed_count']} documents failed to process" ) return result @@ -333,13 +621,22 @@ def _ensure_indexes_ready(self) -> None: if self.dense_index is None or self.sparse_index is None: raise RuntimeError("Pinecone indexes not initialized") - def _prepare_batch_records( - self, batch: List[Document], batch_start_idx: int + def _prepare_records_to_process( + self, documents: List[Document], is_chunked: bool ) -> List[Dict[str, Any]]: """Prepare batch records for upsert.""" records = [] + chunked_documents = ( + self.text_splitter.split_documents(documents) + if not is_chunked + else documents + ) - for doc in batch: + for doc in tqdm( + chunked_documents, + desc="Preparing records", + leave=False, + ): text = doc.page_content.strip() if doc.page_content else "" if not self._is_valid_chunk( text, self.config.min_text_length, self.config.min_words @@ -347,12 +644,18 @@ def _prepare_batch_records( continue metadata = doc.metadata or {} + if "author" in metadata: + if isinstance(metadata["author"], list): + text = f"Author: {', '.join(metadata['author'])}\n\n{text}" + elif metadata["author"] != "": + text = f"Author: {metadata['author']}\n\n{text}" + if "title" in metadata and metadata["title"] != "": text = f"Title: {metadata['title']}\n\n{text}" + elif "subject" in metadata and metadata["subject"] != "": + text = f"Subject: {metadata['subject']}\n\n{text}" - original_doc_id = metadata.get( - "doc_id", metadata.get("url", f"doc_{batch_start_idx}_{len(records)}") - ) + original_doc_id = metadata.get("doc_id", metadata.get("url")) if "start_index" in metadata: original_doc_id = f"{original_doc_id}_{metadata['start_index']}" else: @@ -391,96 +694,141 @@ def _upsert_batch( ) -> None: """Upsert batch to both indexes.""" self._ensure_indexes_ready() - self._upsert_to_index(self.dense_index, records, namespace, batch_num, "dense") - self._upsert_to_index( - self.sparse_index, records, namespace, batch_num, "sparse" - ) - - def _upsert_to_index( - self, - index: Any, - records: List[Dict[str, Any]], - namespace: Optional[str], - batch_num: int, - index_type: str, - ) -> None: - """Upsert to single index.""" try: - index.upsert_records(records=records, namespace=namespace) + self.dense_index.upsert_records(records=records, namespace=namespace) + self.sparse_index.upsert_records(records=records, namespace=namespace) except Exception as e: record_ids = [r.get("id", "unknown") for r in records] logger.error( - f"Failed to upsert batch {batch_num} to {index_type} index: {e}. 
" + f"Failed to upsert batch {batch_num} to dense and sparse indexes: {e}. " f"Records: {record_ids}" ) raise - def update_documents( + def _update_to_index( self, - documents: List[Document], - namespace: Optional[str] = None, - ) -> Dict[str, Any]: - """Update existing documents in Pinecone indexes.""" - try: - if not documents: - logger.warning("No documents to update") - return {"updated": 0, "errors": []} - - self._ensure_indexes_ready() - chunked_documents = self.chunk_documents(documents) - updated, errors = self._update_all_batches(chunked_documents, namespace) + index: Any, + doc_id: str, + namespace: Optional[str], + index_type: str, + values: Optional[List[float]] = None, + sparse_values: Optional[Dict[str, Any]] = None, + set_metadata: Optional[Dict[str, Any]] = None, + ) -> None: + """Update a single record in one index by ID.""" + update_kwargs: Dict[str, Any] = {"id": doc_id, "namespace": namespace} + if values is not None: + update_kwargs["values"] = values + if sparse_values is not None: + update_kwargs["sparse_values"] = sparse_values + if set_metadata is not None: + update_kwargs["set_metadata"] = set_metadata - result = { - "updated": updated, - "total": len(documents), - "errors": errors, - } - logger.info( - f"Update complete: {result['updated']}/{result['total']} documents" - ) - return result + try: + index.update(**update_kwargs) except Exception as e: - logger.error(f"Error updating documents: {e}") + logger.error(f"Failed to update id={doc_id} in {index_type} index: {e}") raise - def _update_all_batches( + def _resolve_target_id(self, metadata: Dict[str, Any], fallback_idx: int) -> str: + """Resolve target vector ID from metadata.""" + return str( + metadata.get("id") + or metadata.get("vector_id") + or metadata.get("pinecone_id") + or f"doc_{fallback_idx}" + ) + + def _extract_metadata_update(self, metadata: Dict[str, Any]) -> Dict[str, Any]: + """Extract metadata patch payload.""" + if isinstance(metadata.get("metadata_update"), dict): + return metadata["metadata_update"] + if isinstance(metadata.get("set_metadata"), dict): + return metadata["set_metadata"] + + reserved_keys = { + "id", + "vector_id", + "pinecone_id", + "values", + "sparse_values", + "chunk_text", + } + return {k: v for k, v in metadata.items() if k not in reserved_keys} + + def _update_by_id( self, - documents: List[Document], + records: List[Dict[str, Any]], namespace: Optional[str], - ) -> tuple[int, List[str]]: - """Update all batches.""" - total_updated, errors = 0, [] - batch_size = self.config.batch_size - - for i in range(0, len(documents), batch_size): - batch = documents[i : i + batch_size] - batch_num = i // batch_size + 1 - try: - records, _ = self._prepare_batch_records(batch, i) - if not records: + keys: Any = None, + process_type: ProcessType = ProcessType.UPDATE_DOCUMENT_BY_ID, + ) -> Dict[str, Any]: + """Update metadata only for existing vectors by ID.""" + processed, errors, failed_documents = 0, [], [] + selected_keys = [] + if process_type == ProcessType.UPDATE_DOCUMENT_BY_ID: + selected_keys = [k for k in records[0].keys() if k != "id"] + elif process_type == ProcessType.UPDATE_METADATA_BY_ID: + selected_keys = keys or [ + k for k in records[0].keys() if k not in ("id", "chunk_text") + ] + elif process_type == ProcessType.UPDATE_VALUE_BY_ID: + selected_keys = ["chunk_text"] + else: + raise ValueError(f"Unsupported process type: {process_type}") + + for record in tqdm( + records, + desc=f"Updating by id ({process_type.value})", + leave=False, + ): + doc_id = record["id"] + 
update_metadata = {} + for key in selected_keys: + if key not in record: continue - self._update_batch(records, namespace, batch_num) - total_updated += len(records) + update_metadata[key] = record[key] + if not update_metadata: + failed_documents.append( + { + "doc_id": doc_id, + "type": record.get("type", "unknown"), + "reason": f"Key {key} not found in record", + } + ) + continue + try: + self.dense_index.update( + id=doc_id, + namespace=namespace, + set_metadata=update_metadata, + ) + self.sparse_index.update( + id=doc_id, + namespace=namespace, + set_metadata=update_metadata, + ) + processed += 1 except Exception as e: - error_msg = f"Error updating batch {batch_num}: {e}" + error_msg = f"Error updating metadata for id={doc_id}: {e}" logger.error(error_msg) errors.append(error_msg) + failed_documents.append( + { + "doc_id": record.get("doc_id", record.get("url")), + "type": record.get("type", "unknown"), + "reason": error_msg, + "text_length": len(record.get("chunk_text", "")), + } + ) - return total_updated, errors - - def _update_batch( - self, - records: List[Dict[str, Any]], - namespace: Optional[str], - batch_num: int, - ) -> None: - """Update batch in both indexes.""" - self._ensure_indexes_ready() - self._upsert_to_index(self.dense_index, records, namespace, batch_num, "dense") - self._upsert_to_index( - self.sparse_index, records, namespace, batch_num, "sparse" + return self._build_process_result( + total_processed=processed, + total=len(records), + errors=errors, + failed_documents=failed_documents, + process_type=process_type, ) - logger.info(f"Updated batch {batch_num}: {len(records)} documents") def delete_documents( self, @@ -580,6 +928,20 @@ def get_index_stats(self) -> Dict[str, Any]: """Get statistics about the Pinecone indexes.""" try: self._ensure_indexes_ready() + # spares_name=f"{self.config.index_name}-sparse" + # self.pc.configure_index( + # name=spares_name, + # embed={ + # "write_parameters": { + # "max_tokens_per_sequence": 2048, + # "truncate": "END", + # }, + # "read_parameters": { + # "max_tokens_per_sequence": 2048, + # "truncate": "END", + # }, + # }, + # ) dense_stats = self.dense_index.describe_index_stats() # type: ignore sparse_stats = self.sparse_index.describe_index_stats() # type: ignore return self._format_index_stats(dense_stats, sparse_stats) @@ -590,20 +952,45 @@ def get_index_stats(self) -> Dict[str, Any]: def _format_index_stats( self, dense_stats: Dict[str, Any], sparse_stats: Dict[str, Any] ) -> Dict[str, Any]: - """Format index statistics.""" + """Format index statistics and compute per-namespace dense vs sparse deltas.""" + dense_ns = dense_stats.get("namespaces", {}) or {} + sparse_ns = sparse_stats.get("namespaces", {}) or {} + + def _vector_count(ns_dict: Dict[str, Any], name: str) -> int: + entry = ns_dict.get(name) + if entry is None: + return 0 + if isinstance(entry, dict): + return int(entry.get("vector_count", 0) or 0) + return int(getattr(entry, "vector_count", 0) or 0) + + all_names = set(dense_ns) | set(sparse_ns) + namespace_deltas: List[Dict[str, Any]] = [] + for name in sorted(all_names): + d = _vector_count(dense_ns, name) + s = _vector_count(sparse_ns, name) + delta = d - s + namespace_deltas.append( + {"namespace": name, "dense": d, "sparse": s, "delta": delta} + ) + + mismatched = [x for x in namespace_deltas if x["delta"] != 0] + return { "dense_index": { "total_vectors": dense_stats.get("total_vector_count", 0), "dimension": dense_stats.get("dimension", 0), "index_fullness": dense_stats.get("index_fullness", 0), - 
"namespaces": dense_stats.get("namespaces", {}), + "namespaces": dense_ns, }, "sparse_index": { "total_vectors": sparse_stats.get("total_vector_count", 0), "dimension": sparse_stats.get("dimension", 0), "index_fullness": sparse_stats.get("index_fullness", 0), - "namespaces": sparse_stats.get("namespaces", {}), + "namespaces": sparse_ns, }, + "namespace_deltas": namespace_deltas, + "mismatched_namespaces": mismatched, } def _get_empty_stats(self, error_msg: str) -> Dict[str, Any]: @@ -618,4 +1005,6 @@ def _get_empty_stats(self, error_msg: str) -> Dict[str, Any]: "error": error_msg, "dense_index": empty_stats.copy(), "sparse_index": empty_stats.copy(), + "namespace_deltas": [], + "mismatched_namespaces": [], } diff --git a/pinecone_rag/preprocessor/utility.py b/pinecone_rag/preprocessor/utility.py index 68ffb60..5a1a464 100644 --- a/pinecone_rag/preprocessor/utility.py +++ b/pinecone_rag/preprocessor/utility.py @@ -315,15 +315,26 @@ def clean_text(text: str, remove_extra_spaces: bool = True) -> str: if not text: return "" + from html import unescape + + text = unescape(text) + # Remove soft hyphens and other invisible characters text = ( text.replace("\xad", "") .replace("\u200b", "") .replace("\u200c", "") .replace("\u200d", "") + .replace("\xa0", " ") + .replace("\u2002", " ") + .replace("\u2003", " ") + .replace("\u2026", "...") + .replace("\u202f", "") + .replace("\\u00A0", " ") ) # Normalize line breaks + text = re.sub(r"\s+", " ", text) text = re.sub(r"\r\n", "\n", text) # Windows line breaks text = re.sub(r"\r", "\n", text) # Old Mac line breaks diff --git a/pinecone_rag/preprocessor/wg21paper_preprocessor.py b/pinecone_rag/preprocessor/wg21paper_preprocessor.py index 5f14fca..b1188bc 100644 --- a/pinecone_rag/preprocessor/wg21paper_preprocessor.py +++ b/pinecone_rag/preprocessor/wg21paper_preprocessor.py @@ -14,6 +14,7 @@ from preprocessor.utility import ( get_timestamp_from_date, validate_content_length, + clean_text, ) @@ -59,6 +60,9 @@ def _load_metadata(self) -> Dict[str, Dict[str, Any]]: with open(self.metadata_file, "r", encoding="utf-8") as f: reader = csv.DictReader(f) for row in reader: + for key, value in row.items(): + if isinstance(value, str): + row[key] = clean_text(value) # Use local_path as primary key local_path = row.get("local_path", "").strip() filename = row.get("filename", "").strip() @@ -142,7 +146,13 @@ def _create_document( # Get metadata fields url = metadata.get("url", "") if metadata else "" author = metadata.get("author", "").strip() if metadata else "" - authors = author.split(",") if author else ["unknown"] + if ", " not in author: + return None + authors = ( + [name.strip() for name in author.split(",") if name.strip()] + if author + else ["unknown"] + ) title = metadata.get("title", "").strip() if metadata else "" date_str = metadata.get("date", "") if metadata else "" From d96ecc8a32959bf8b3eb40051d7b03ac590336de Mon Sep 17 00:00:00 2001 From: zho Date: Sat, 14 Mar 2026 03:15:34 +0800 Subject: [PATCH 2/5] #56-addressed some issues suggested by coderabbitai --- pinecone_rag/ingestion.py | 33 ++++++++++++++-------------- pinecone_rag/preprocessor/utility.py | 9 ++++---- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/pinecone_rag/ingestion.py b/pinecone_rag/ingestion.py index fa924f5..2434def 100644 --- a/pinecone_rag/ingestion.py +++ b/pinecone_rag/ingestion.py @@ -128,7 +128,13 @@ def _ensure_pinecone_client(self) -> None: def _active_api_key(self) -> str: """Return the API key for the currently selected instance.""" if self.instance 
== PineconeInstance.PRIVATE: - return self.config.private_api_key + key = self.config.private_api_key + if not (key and key.strip()): + raise ValueError( + "PineconeInstance.PRIVATE is selected but PINECONE_PRIVATE_API_KEY " + "is not set or is empty. Set the environment variable before using the private instance." + ) + return key return self.config.api_key def _get_or_create_indexes(self) -> None: @@ -764,6 +770,14 @@ def _update_by_id( process_type: ProcessType = ProcessType.UPDATE_DOCUMENT_BY_ID, ) -> Dict[str, Any]: """Update metadata only for existing vectors by ID.""" + if not records: + return self._build_process_result( + total_processed=0, + total=0, + errors=[], + failed_documents=[], + process_type=process_type, + ) processed, errors, failed_documents = 0, [], [] selected_keys = [] if process_type == ProcessType.UPDATE_DOCUMENT_BY_ID: @@ -789,11 +803,12 @@ def _update_by_id( continue update_metadata[key] = record[key] if not update_metadata: + missing = [k for k in selected_keys if k not in record] failed_documents.append( { "doc_id": doc_id, "type": record.get("type", "unknown"), - "reason": f"Key {key} not found in record", + "reason": f"Keys not found in record: {missing}", } ) continue @@ -928,20 +943,6 @@ def get_index_stats(self) -> Dict[str, Any]: """Get statistics about the Pinecone indexes.""" try: self._ensure_indexes_ready() - # spares_name=f"{self.config.index_name}-sparse" - # self.pc.configure_index( - # name=spares_name, - # embed={ - # "write_parameters": { - # "max_tokens_per_sequence": 2048, - # "truncate": "END", - # }, - # "read_parameters": { - # "max_tokens_per_sequence": 2048, - # "truncate": "END", - # }, - # }, - # ) dense_stats = self.dense_index.describe_index_stats() # type: ignore sparse_stats = self.sparse_index.describe_index_stats() # type: ignore return self._format_index_stats(dense_stats, sparse_stats) diff --git a/pinecone_rag/preprocessor/utility.py b/pinecone_rag/preprocessor/utility.py index 5a1a464..f4664c2 100644 --- a/pinecone_rag/preprocessor/utility.py +++ b/pinecone_rag/preprocessor/utility.py @@ -329,18 +329,17 @@ def clean_text(text: str, remove_extra_spaces: bool = True) -> str: .replace("\u2002", " ") .replace("\u2003", " ") .replace("\u2026", "...") - .replace("\u202f", "") - .replace("\\u00A0", " ") + .replace("\u202f", " ") # Narrow no-break space -> space (preserve word boundary) ) - # Normalize line breaks - text = re.sub(r"\s+", " ", text) + # Normalize line breaks first so later steps can limit consecutive newlines text = re.sub(r"\r\n", "\n", text) # Windows line breaks text = re.sub(r"\r", "\n", text) # Old Mac line breaks if remove_extra_spaces: - # Remove multiple spaces + # Collapse horizontal whitespace (spaces, tabs) without removing newlines text = re.sub(r" +", " ", text) + text = re.sub(r"[^\S\n]+", " ", text) # Remove multiple newlines (keep max 2) text = re.sub(r"\n{3,}", "\n\n", text) # Remove spaces at start/end of lines From ab4071c1e38f76204a8cd0cad69c5e3bde8caf38 Mon Sep 17 00:00:00 2001 From: zho Date: Sat, 14 Mar 2026 04:21:33 +0800 Subject: [PATCH 3/5] #56-update ingestion.py --- pinecone_rag/ingestion.py | 70 +++++++++++++++++++++++++++++---------- 1 file changed, 52 insertions(+), 18 deletions(-) diff --git a/pinecone_rag/ingestion.py b/pinecone_rag/ingestion.py index 2434def..2be11ca 100644 --- a/pinecone_rag/ingestion.py +++ b/pinecone_rag/ingestion.py @@ -410,9 +410,14 @@ def process_documents( if process_mode == ProcessType.UPSERT: if records: - schema_fields = [ - k for k in records[0] if k not 
in ("id", "chunk_text") - ] + schema_fields = sorted( + { + k + for record in records + for k in record + if k not in ("id", "chunk_text") + } + ) self._ensure_namespace_schema(namespace, schema_fields) total_upserted, errors, failed_documents = self._upsert_records( records, namespace @@ -515,7 +520,7 @@ def _upsert_records( error_msg = f"Error upserting batch {batch_num}: {e}" logger.error(error_msg) errors.append(error_msg) - failed_docs.extend(self._mark_batch_failed(batch, e, i)) + failed_docs.extend(self._mark_batch_failed(filtered_batch, e, i)) if total_skipped_existing > 0: logger.info( @@ -698,17 +703,39 @@ def _upsert_batch( namespace: Optional[str], batch_num: int, ) -> None: - """Upsert batch to both indexes.""" + """Upsert batch to both indexes. On sparse failure, rolls back dense to avoid partial state.""" self._ensure_indexes_ready() + record_ids = [r.get("id") for r in records if r.get("id")] try: self.dense_index.upsert_records(records=records, namespace=namespace) + except Exception as e: + logger.error( + "Failed to upsert batch %s to dense index: %s. Records: %s", + batch_num, + e, + record_ids, + ) + raise + try: self.sparse_index.upsert_records(records=records, namespace=namespace) except Exception as e: - record_ids = [r.get("id", "unknown") for r in records] logger.error( - f"Failed to upsert batch {batch_num} to dense and sparse indexes: {e}. " - f"Records: {record_ids}" + "Sparse upsert failed for batch %s, rolling back dense upsert: %s", + batch_num, + e, ) + if record_ids and namespace: + try: + self.dense_index.delete(ids=record_ids, namespace=namespace) + logger.info( + "Rolled back %s records from dense index after sparse failure", + len(record_ids), + ) + except Exception as rollback_e: + logger.error( + "Rollback of dense index failed after sparse upsert error: %s", + rollback_e, + ) raise def _update_to_index( @@ -813,16 +840,23 @@ def _update_by_id( ) continue try: - self.dense_index.update( - id=doc_id, - namespace=namespace, - set_metadata=update_metadata, - ) - self.sparse_index.update( - id=doc_id, - namespace=namespace, - set_metadata=update_metadata, - ) + dense_kwargs: Dict[str, Any] = { + "id": doc_id, + "namespace": namespace, + "set_metadata": update_metadata, + } + sparse_kwargs: Dict[str, Any] = { + "id": doc_id, + "namespace": namespace, + "set_metadata": update_metadata, + } + if process_type == ProcessType.UPDATE_VALUE_BY_ID: + if record.get("values") is not None: + dense_kwargs["values"] = record["values"] + if record.get("sparse_values") is not None: + sparse_kwargs["sparse_values"] = record["sparse_values"] + self.dense_index.update(**dense_kwargs) + self.sparse_index.update(**sparse_kwargs) processed += 1 except Exception as e: error_msg = f"Error updating metadata for id={doc_id}: {e}" From 1c9de187c087f423651adb1332b8c11a403096f6 Mon Sep 17 00:00:00 2001 From: zho Date: Sat, 14 Mar 2026 04:32:27 +0800 Subject: [PATCH 4/5] #56-add filter_batch initialization --- pinecone_rag/ingestion.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pinecone_rag/ingestion.py b/pinecone_rag/ingestion.py index 2be11ca..0f2abbe 100644 --- a/pinecone_rag/ingestion.py +++ b/pinecone_rag/ingestion.py @@ -490,6 +490,7 @@ def _upsert_records( batch_start_idxs = list(range(0, len(documents), batch_size)) for batch_idx, i in enumerate(batch_start_idxs): batch = documents[i : i + batch_size] + filtered_batch = batch batch_num = batch_idx + 1 try: filtered_batch, skipped_existing = self._filter_existing_records( From 5c413947a72044431880dc401b3ce61446087878 
Mon Sep 17 00:00:00 2001 From: zho Date: Sat, 14 Mar 2026 04:50:54 +0800 Subject: [PATCH 5/5] #56-fix ingestion.py/ensure_namespace_schema --- pinecone_rag/ingestion.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/pinecone_rag/ingestion.py b/pinecone_rag/ingestion.py index 0f2abbe..05c2f4f 100644 --- a/pinecone_rag/ingestion.py +++ b/pinecone_rag/ingestion.py @@ -291,7 +291,10 @@ def _ensure_namespace_schema( schema_payload = {"fields": {f: {"filterable": True} for f in schema_fields}} - for index in (self.dense_index, self.sparse_index): + index_names = (self.config.index_name, f"{self.config.index_name}-sparse") + for index, index_name in zip( + (self.dense_index, self.sparse_index), index_names, strict=True + ): if index is None: continue try: @@ -300,12 +303,21 @@ def _ensure_namespace_schema( logger.debug( "Namespace '%s' already exists on index '%s', skipping schema creation", namespace, - getattr(index, "_index_name", "unknown"), + index_name, ) continue - index_host = index._config.host - # Strip any scheme prefix — index_host may already be "https://..." + index_desc = self.pc.describe_index(name=index_name) + index_host = getattr(index_desc, "host", None) or ( + index_desc.get("host") if isinstance(index_desc, dict) else None + ) + if not index_host: + logger.warning( + "No host in describe_index for '%s', skipping namespace schema creation", + index_name, + ) + continue + # Strip any scheme prefix — host may be "https://..." or plain hostname host_clean = index_host.removeprefix("https://").removeprefix("http://") response = requests.post( f"https://{host_clean}/namespaces",