diff --git a/examples/cookbook/haystack/.env.example b/examples/cookbook/haystack/.env.example new file mode 100644 index 00000000..d7e0d7fa --- /dev/null +++ b/examples/cookbook/haystack/.env.example @@ -0,0 +1,3 @@ +MOSS_PROJECT_ID=your-project-id +MOSS_PROJECT_KEY=your-project-key +GEMINI_API_KEY=your-gemini-key diff --git a/examples/cookbook/haystack/README.md b/examples/cookbook/haystack/README.md new file mode 100644 index 00000000..7edf40e2 --- /dev/null +++ b/examples/cookbook/haystack/README.md @@ -0,0 +1,140 @@ +# Haystack + Moss Cookbook Example + +Use [Moss](https://moss.dev) as realtime semantic search in [Haystack](https://haystack.deepset.ai/) RAG pipelines. Moss provides sub-10ms semantic search, Haystack orchestrates the retrieval-to-generation pipeline. + +> **Note:** This is a cookbook example, not a packaged integration. `moss_haystack.py` is a self-contained module you can adapt into your own project. + +## Installation + +```bash +pip install haystack-ai moss python-dotenv +``` + +## Setup + +Set your credentials in a `.env` file (see `.env.example`): + +```bash +MOSS_PROJECT_ID=your-project-id +MOSS_PROJECT_KEY=your-project-key +GEMINI_API_KEY=your-gemini-key +``` + +## Quick Start + +```python +from haystack import Document +from moss_haystack import MossDocumentStore, MossRetriever + +store = MossDocumentStore(index_name="knowledge-base") +store.write_documents([ + Document(id="1", content="I wake up at 6:30 AM on weekdays."), + Document(id="2", content="Cold showers improve circulation and alertness."), +]) + +retriever = MossRetriever(document_store=store, top_k=3) +retriever.load_index() +result = retriever.run(query="when do I wake up?") + +for doc in result["documents"]: + print(f"[{doc.score:.2f}] {doc.content}") +``` + +## Demo: Multi-Index Life Assistant + +The included `example_usage.py` runs an interactive CLI life assistant with **keyword-based routing** across two Moss indexes: + +``` +User Question + | + v +Keyword Router + | + +-- personal ("my", "I", "me") --> MossRetriever (life-personal) + | | + +-- general ("how to", "tips") --> MossRetriever (life-general) + | | + +-- combined (both or neither) --> Both retrievers → DocumentJoiner + | + v + PromptBuilder → Gemini LLM + | + v + Final Answer +``` + +### How it works + +1. **Two Moss indexes** with synthetic data: + - `life-personal` (15 docs) — daily routines, fitness schedule, diet, sleep habits + - `life-general` (15 docs) — tips, research, and advice on health, fitness, productivity + +2. **Keyword router** classifies queries: + - Personal pronouns ("my", "I", "me") → search personal index + - General keywords ("how to", "benefits", "tips") → search general index + - Both or neither → search both indexes and join results + +3. **Haystack RAG pipeline** retrieves docs → builds prompt → generates answer via Gemini + +### Run the demo + +```bash +cd examples/cookbook/haystack +python example_usage.py +``` + +``` +=== Life Assistant (Haystack + Moss) === +Ask about your habits or get general advice. +Type 'quit' to exit. + +You: What is my gym routine? + [Routed to: personal] +Assistant: You go to the gym Monday, Wednesday, and Friday... + +You: What are the benefits of cold showers? + [Routed to: general] +Assistant: Cold exposure therapy benefits include improved circulation... + +You: Should I change my morning routine? + [Routed to: combined] +Assistant: Your current morning routine includes yoga and lemon water... +``` + +## Components + +### MossDocumentStore + +Implements Haystack's `DocumentStore` protocol. Creates its own `MossClient` from credentials. + +| Method | Description | +|--------|-------------| +| `write_documents(docs, policy)` | Write documents. First call creates the index, subsequent calls upsert. | +| `count_documents()` | Return document count | +| `delete_documents(ids)` | Delete documents by ID | +| `load_index()` | Download index for fast local queries | + +### MossRetriever + +Haystack `@component` for semantic search. + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `document_store` | required | MossDocumentStore instance | +| `top_k` | 5 | Number of results | +| `alpha` | 0.8 | Hybrid search balance (0=keyword, 1=semantic) | + +| Method | Description | +|--------|-------------| +| `load_index()` | Load Moss index for fast local queries | +| `run(query, top_k)` | Search and return `{"documents": list[Document]}` | + +## Files + +| File | Description | +|------|-------------| +| `moss_haystack.py` | MossDocumentStore + MossRetriever implementation | +| `example_usage.py` | Multi-index life assistant with keyword routing | +| `data/` | Synthetic data: `personal_habits.json`, `general_knowledge.json` | +| `test_live.py` | Live platform tests | +| `.env.example` | Template for required environment variables | diff --git a/examples/cookbook/haystack/data/general_knowledge.json b/examples/cookbook/haystack/data/general_knowledge.json new file mode 100644 index 00000000..8f7b28d8 --- /dev/null +++ b/examples/cookbook/haystack/data/general_knowledge.json @@ -0,0 +1,77 @@ +[ + { + "id": "gen-001", + "text": "The Pomodoro Technique was invented by Francesco Cirillo in the late 1980s. It uses a timer to break work into 25-minute intervals separated by short breaks. Named after a tomato-shaped kitchen timer.", + "metadata": {"category": "productivity", "topic": "techniques"} + }, + { + "id": "gen-002", + "text": "Intermittent fasting 16:8 means fasting for 16 hours and eating within an 8-hour window. Studies show it can improve insulin sensitivity, reduce inflammation, and support weight management.", + "metadata": {"category": "health", "topic": "nutrition"} + }, + { + "id": "gen-003", + "text": "A good 5K time for a beginner runner is 30-35 minutes. Intermediate runners aim for 22-25 minutes. Elite runners complete it under 17 minutes. Consistent training can improve times by 1-2 minutes per month.", + "metadata": {"category": "fitness", "topic": "running"} + }, + { + "id": "gen-004", + "text": "Cold exposure therapy benefits include improved circulation, reduced inflammation, enhanced immune function, and increased mental resilience. Start with 30 seconds and gradually increase to 2-5 minutes.", + "metadata": {"category": "wellness", "topic": "recovery"} + }, + { + "id": "gen-005", + "text": "To build a consistent meditation habit, start with just 5 minutes daily. Guided apps like Headspace, Calm, or Insight Timer can help. Morning meditation pairs well with a morning routine for consistency.", + "metadata": {"category": "wellness", "topic": "meditation"} + }, + { + "id": "gen-006", + "text": "The ideal daily water intake is approximately 3.7 liters for men and 2.7 liters for women, including water from food. Active people and those in hot climates need more. Signs of dehydration include dark urine and fatigue.", + "metadata": {"category": "health", "topic": "hydration"} + }, + { + "id": "gen-007", + "text": "Meal prepping saves an average of 5-7 hours per week on cooking. Best containers are glass with snap-lock lids. Most prepped meals stay fresh for 4-5 days in the fridge. Chicken, rice, and vegetables are the most popular combo.", + "metadata": {"category": "nutrition", "topic": "meal-prep"} + }, + { + "id": "gen-008", + "text": "The best time to exercise for muscle growth is between 2-6 PM when body temperature peaks and testosterone levels are higher. Morning exercise is better for fat burning and establishing consistency.", + "metadata": {"category": "fitness", "topic": "timing"} + }, + { + "id": "gen-009", + "text": "Sleep hygiene tips: keep your room at 65-68F (18-20C), avoid screens 1 hour before bed, maintain a consistent schedule, and limit caffeine after noon. 7-9 hours of sleep is optimal for adults.", + "metadata": {"category": "health", "topic": "sleep"} + }, + { + "id": "gen-010", + "text": "Walking after meals for 15-30 minutes can lower blood sugar by up to 30%. It also improves digestion and reduces bloating. Even a slow-paced walk is effective.", + "metadata": {"category": "health", "topic": "walking"} + }, + { + "id": "gen-011", + "text": "The weekly review is a core practice from Getting Things Done (GTD) by David Allen. It involves reviewing all projects, clearing inboxes, and planning the next week. Best done on Sunday evenings.", + "metadata": {"category": "productivity", "topic": "planning"} + }, + { + "id": "gen-012", + "text": "Yoga for beginners: start with 15-20 minutes of basic poses — downward dog, warrior I and II, child's pose, and cat-cow. Consistency matters more than duration. Morning yoga improves flexibility and reduces stress.", + "metadata": {"category": "fitness", "topic": "yoga"} + }, + { + "id": "gen-013", + "text": "Oat milk has become the most popular plant-based milk alternative. It's naturally sweet, froths well for coffee, and has more fiber than almond milk. However, it's higher in calories and carbs than almond milk.", + "metadata": {"category": "nutrition", "topic": "alternatives"} + }, + { + "id": "gen-014", + "text": "Reading before bed improves sleep quality and reduces stress by 68% according to a University of Sussex study. Physical books are better than screens. Even 6 minutes of reading can lower heart rate and ease tension.", + "metadata": {"category": "wellness", "topic": "reading"} + }, + { + "id": "gen-015", + "text": "The two-alarm technique: set alarms 5 minutes apart. The first alarm starts your wake-up process, the second confirms it. Place the second alarm across the room to force getting out of bed.", + "metadata": {"category": "productivity", "topic": "wake-up"} + } +] diff --git a/examples/cookbook/haystack/data/personal_habits.json b/examples/cookbook/haystack/data/personal_habits.json new file mode 100644 index 00000000..80d6e0e2 --- /dev/null +++ b/examples/cookbook/haystack/data/personal_habits.json @@ -0,0 +1,77 @@ +[ + { + "id": "habit-001", + "text": "I wake up at 6:30 AM on weekdays and 8:00 AM on weekends. I set two alarms 5 minutes apart.", + "metadata": {"category": "routine", "time": "morning"} + }, + { + "id": "habit-002", + "text": "My morning routine: wake up, drink a glass of warm lemon water, 20 minutes of yoga, then shower. Takes about 45 minutes total.", + "metadata": {"category": "routine", "time": "morning"} + }, + { + "id": "habit-003", + "text": "I do intermittent fasting 16:8. My eating window is 12 PM to 8 PM. I skip breakfast and have black coffee instead.", + "metadata": {"category": "diet", "time": "daily"} + }, + { + "id": "habit-004", + "text": "I go to the gym Monday, Wednesday, and Friday. Upper body on Monday, legs on Wednesday, full body on Friday. Each session is 45-60 minutes.", + "metadata": {"category": "fitness", "time": "weekly"} + }, + { + "id": "habit-005", + "text": "I run 5K every Tuesday and Thursday morning at 7 AM in the park near my house. My current best time is 24 minutes.", + "metadata": {"category": "fitness", "time": "weekly"} + }, + { + "id": "habit-006", + "text": "I read for 30 minutes before bed every night. Currently reading non-fiction books about psychology and habits.", + "metadata": {"category": "routine", "time": "evening"} + }, + { + "id": "habit-007", + "text": "I meditate for 10 minutes every morning using the Headspace app. I prefer guided meditation focused on focus and clarity.", + "metadata": {"category": "wellness", "time": "morning"} + }, + { + "id": "habit-008", + "text": "I meal prep every Sunday for the week. Usually cook chicken, rice, and roasted vegetables. Takes about 2 hours.", + "metadata": {"category": "diet", "time": "weekly"} + }, + { + "id": "habit-009", + "text": "I drink at least 3 liters of water daily. I use a marked water bottle to track intake throughout the day.", + "metadata": {"category": "health", "time": "daily"} + }, + { + "id": "habit-010", + "text": "My sleep schedule is 10:30 PM to 6:30 AM. I use night mode on all devices after 9 PM and avoid screens 30 minutes before bed.", + "metadata": {"category": "routine", "time": "evening"} + }, + { + "id": "habit-011", + "text": "I take a 15-minute walk after lunch every day. It helps with digestion and gives me a mental break from work.", + "metadata": {"category": "wellness", "time": "afternoon"} + }, + { + "id": "habit-012", + "text": "I do a weekly review every Sunday evening. I plan the upcoming week, review goals, and journal about what went well and what to improve.", + "metadata": {"category": "productivity", "time": "weekly"} + }, + { + "id": "habit-013", + "text": "I limit coffee to 2 cups per day, both before noon. I switched from regular milk to oat milk six months ago.", + "metadata": {"category": "diet", "time": "daily"} + }, + { + "id": "habit-014", + "text": "I practice cold showers every morning for the last 3 months. Started with 30 seconds, now up to 2 minutes. Great for alertness.", + "metadata": {"category": "wellness", "time": "morning"} + }, + { + "id": "habit-015", + "text": "I use the Pomodoro technique for work: 25 minutes focused work, 5 minute break. I do 8 pomodoros on a productive day.", + "metadata": {"category": "productivity", "time": "daily"} + } +] diff --git a/examples/cookbook/haystack/example_usage.py b/examples/cookbook/haystack/example_usage.py new file mode 100644 index 00000000..a033897d --- /dev/null +++ b/examples/cookbook/haystack/example_usage.py @@ -0,0 +1,179 @@ +import json +import os + +from dotenv import load_dotenv +from haystack import Document, Pipeline +from haystack.components.builders import PromptBuilder +from haystack.components.generators import OpenAIGenerator +from haystack.components.joiners import DocumentJoiner +from haystack.utils import Secret + +from moss_haystack import MossDocumentStore, MossRetriever + +load_dotenv() + +DATA_DIR = os.path.join(os.path.dirname(__file__), "data") + +PROMPT_TEMPLATE = """ +You are a helpful life assistant. You have access to two knowledge bases: +- Personal habits: the user's daily routines, fitness schedule, diet, and preferences +- General knowledge: tips, research, and advice on health, fitness, and productivity + +Answer the question using the context below. If it's a personal question, be specific +about the user's habits. If it's general, give helpful advice. If both are relevant, +combine personal context with general advice. + +Context: +{% for doc in documents %} +- {{ doc.content }} +{% endfor %} + +Question: {{ query }} + +Answer: +""" + + +PERSONAL_KEYWORDS = [ + " my ", " i ", " me ", " mine ", "do i ", "am i ","my routine", "my schedule", "my diet", "my workout", +] + +GENERAL_KEYWORDS = [ + "how to", "tips for", "benefits of", "best way", + "recommend", "ideal", "research", "studies show", "how much", + "how long should", "how often should", +] + + +def route_query(query: str) -> str: + """Route query based on keywords: personal, general, or combined.""" + q = f" {query.lower()} " + is_personal = any(kw in q for kw in PERSONAL_KEYWORDS) + is_general = any(kw in q for kw in GENERAL_KEYWORDS) + + if is_personal and is_general: + return "combined" + if is_personal: + return "personal" + if is_general: + return "general" + return "combined" + + +def load_index(name, data_file): + """Load data into a Moss index.""" + with open(os.path.join(DATA_DIR, data_file)) as f: + raw = json.load(f) + + docs = [ + Document(id=item["id"], content=item["text"], meta=item.get("metadata", {})) + for item in raw + ] + + store = MossDocumentStore(index_name=name) + try: + store.count_documents() + print(f" '{name}' already exists.") + except RuntimeError: + print(f" Creating '{name}' ({len(docs)} docs)...") + store.write_documents(docs) + + return store + + +def make_rag_pipe(retriever): + """Build a RAG pipeline: retriever → prompt → LLM.""" + pipe = Pipeline() + pipe.add_component("retriever", retriever) + pipe.add_component("prompt", PromptBuilder( + template=PROMPT_TEMPLATE, required_variables=["documents", "query"] + )) + pipe.add_component("llm", OpenAIGenerator( + api_key=Secret.from_token(os.getenv("GEMINI_API_KEY")), + model="gemini-3-flash-preview", + api_base_url="https://generativelanguage.googleapis.com/v1beta/openai", + )) + pipe.connect("retriever.documents", "prompt.documents") + pipe.connect("prompt", "llm") + return pipe + + +def build_pipelines(personal_store, general_store): + """Build three RAG pipelines — personal, general, and combined.""" + # Combined: both retrievers → joiner → prompt → LLM + combined = Pipeline() + combined.add_component( + "personal_retriever", MossRetriever(document_store=personal_store, top_k=3) + ) + combined.add_component( + "general_retriever", MossRetriever(document_store=general_store, top_k=3) + ) + combined.add_component("joiner", DocumentJoiner()) + combined.add_component("prompt", PromptBuilder( + template=PROMPT_TEMPLATE, required_variables=["documents", "query"] + )) + combined.add_component("llm", OpenAIGenerator( + api_key=Secret.from_token(os.getenv("GEMINI_API_KEY")), + model="gemini-3-flash-preview", + api_base_url="https://generativelanguage.googleapis.com/v1beta/openai", + )) + combined.connect("personal_retriever.documents", "joiner.documents") + combined.connect("general_retriever.documents", "joiner.documents") + combined.connect("joiner.documents", "prompt.documents") + combined.connect("prompt", "llm") + + return { + "personal": make_rag_pipe(MossRetriever(document_store=personal_store, top_k=3)), + "general": make_rag_pipe(MossRetriever(document_store=general_store, top_k=3)), + "combined": combined, + } + + +def main(): + print("Setting up indexes...") + personal_store = load_index("life-personal", "personal_habits.json") + general_store = load_index("life-general", "general_knowledge.json") + print() + + pipes = build_pipelines(personal_store, general_store) + + print("=== Life Assistant (Haystack + Moss) ===") + print("Ask about your habits or get general advice.") + print("Type 'quit' to exit.\n") + + while True: + try: + question = input("You: ").strip() + except (EOFError, KeyboardInterrupt): + print("\nGoodbye!") + break + if not question: + continue + if question.lower() in ("quit", "exit", "q"): + print("Goodbye!") + break + + route = route_query(question) + pipe = pipes[route] + print(f" [Routed to: {route}]") + + try: + if route == "combined": + result = pipe.run({ + "personal_retriever": {"query": question}, + "general_retriever": {"query": question}, + "prompt": {"query": question}, + }) + else: + result = pipe.run({ + "retriever": {"query": question}, + "prompt": {"query": question}, + }) + answer = result["llm"]["replies"][0] + print(f"\nAssistant: {answer}\n") + except Exception as e: + print(f"\nError: {e}\n") + + +if __name__ == "__main__": + main() diff --git a/examples/cookbook/haystack/moss_haystack.py b/examples/cookbook/haystack/moss_haystack.py new file mode 100644 index 00000000..fd68d1df --- /dev/null +++ b/examples/cookbook/haystack/moss_haystack.py @@ -0,0 +1,306 @@ +"""Moss integration for Haystack pipelines. + +This is a cookbook example, not a full production-grade DocumentStore. + +Notably, ``MossDocumentStore.filter_documents`` supports the common +``filters=None`` case (returning all documents) but does not translate +Haystack's full filter DSL (``$eq``, ``$and``, ``$or``, ``$in``, ``$not``, +nested combinations) into Moss queries. Implementing that fully would +require a client-side filter evaluator, since Moss's ``get_docs`` does not +accept filters — and that creates a footgun on large indexes. + +For filtered retrieval, use ``MossRetriever`` with +``QueryOptions(filter=...)`` at query time; Moss supports this natively +server-side. +""" + +import asyncio +import json +import os +from typing import Any, Optional + +from haystack import Document, component, default_from_dict, default_to_dict +from haystack.document_stores.types import DocumentStore, DuplicatePolicy +from haystack.utils import Secret +from moss import ( + DocumentInfo, + GetDocumentsOptions, + MossClient, + MutationOptions, + QueryOptions, +) + + +def _run_async(coro: Any) -> Any: + """Run an async coroutine synchronously.""" + try: + return asyncio.run(coro) + except RuntimeError as e: + if "cannot be called from a running event loop" in str(e): + raise RuntimeError( + "Cannot run sync in an existing event loop. " + "Use nest_asyncio or run from a standard script." + ) from e + raise + + +_MOSS_TYPED_PREFIX = "__moss_typed__:" + + +def _serialize_metadata(meta: Optional[dict]) -> Optional[dict]: + """Convert arbitrary-typed metadata to Moss string-only metadata. + + Moss only accepts string values in metadata. Non-string values are + JSON-encoded and prefixed with ``__moss_typed__:`` so the deserializer + can recover the original type. Plain strings are stored as-is. + """ + if meta is None: + return None + + result = {} + for k, v in meta.items(): + if isinstance(v, str): + result[k] = v + else: + result[k] = f"{_MOSS_TYPED_PREFIX}{json.dumps(v)}" + return result + + +def _deserialize_metadata(meta: Optional[dict]) -> dict: + """Convert Moss string metadata back to original types. + + Values prefixed with ``__moss_typed__:`` are JSON-decoded to restore + their original type. All other values are returned as plain strings. + """ + if meta is None: + return {} + + result = {} + for k, v in meta.items(): + if isinstance(v, str) and v.startswith(_MOSS_TYPED_PREFIX): + json_str = v[len(_MOSS_TYPED_PREFIX):] + try: + result[k] = json.loads(json_str) + except (json.JSONDecodeError, TypeError): + result[k] = v + else: + result[k] = v + return result + + +def _haystack_doc_to_moss(doc: Document) -> DocumentInfo: + """Convert Haystack Document to Moss DocumentInfo.""" + return DocumentInfo( + id=doc.id, + text=doc.content or "", + metadata=_serialize_metadata(doc.meta), + embedding=doc.embedding, + ) + + +def _moss_doc_to_haystack(doc: Any, score: Optional[float] = None) -> Document: + """Convert Moss DocumentInfo/QueryResultDocumentInfo to Haystack Document.""" + return Document( + id=doc.id, + content=doc.text, + meta=_deserialize_metadata(getattr(doc, "metadata", None)), + embedding=getattr(doc, "embedding", None), + score=score if score is not None else getattr(doc, "score", None), + ) + + +# --- Document Store --- +class MossDocumentStore(DocumentStore): + """Moss-backed document store implementing the Haystack DocumentStore protocol.""" + + def __init__( + self, + project_id: Optional[str] = None, + project_key: Optional[Secret] = None, + index_name: str = "default", + model_id: str = "moss-minilm", + ): + self.project_id = project_id or os.getenv("MOSS_PROJECT_ID") + + # Accept Secret, raw string, or fall back to env var + if isinstance(project_key, Secret): + self.project_key = project_key + elif isinstance(project_key, str): + self.project_key = Secret.from_token(project_key) + else: + self.project_key = Secret.from_env_var("MOSS_PROJECT_KEY") + + resolved_key = self.project_key.resolve_value() + if not self.project_id or not resolved_key: + raise ValueError( + "Moss credentials required. Pass project_id/project_key " + "or set MOSS_PROJECT_ID/MOSS_PROJECT_KEY env vars." + ) + self.client = MossClient(self.project_id, resolved_key) + self.index_name = index_name + self.model_id = model_id + self._index_created = False + self._index_loaded = False + + def to_dict(self) -> dict[str, Any]: + return default_to_dict( + self, + project_id=self.project_id, + project_key=self.project_key.to_dict(), + index_name=self.index_name, + model_id=self.model_id, + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "MossDocumentStore": + data["init_parameters"]["project_key"] = Secret.from_dict( + data["init_parameters"]["project_key"] + ) + return default_from_dict(cls, data) + + def count_documents(self) -> int: + info = _run_async(self.client.get_index(self.index_name)) + return getattr(info, "doc_count", 0) + + def write_documents( + self, + documents: list[Document], + policy: DuplicatePolicy = DuplicatePolicy.OVERWRITE, + ) -> int: + docs = [_haystack_doc_to_moss(doc) for doc in documents] + + if not self._index_created: + try: + _run_async( + self.client.create_index(self.index_name, docs, self.model_id) + ) + self._index_created = True + self._index_loaded = False + return len(docs) + except RuntimeError as e: + if "already exists" not in str(e): + raise + self._index_created = True + # Index exists — fall through to add/upsert docs below + + if policy == DuplicatePolicy.SKIP: + candidate_ids = [d.id for d in docs] + existing = _run_async( + self.client.get_docs( + self.index_name, GetDocumentsOptions(doc_ids=candidate_ids) + ) + ) + existing_ids = {doc.id for doc in existing} + docs = [d for d in docs if d.id not in existing_ids] + if not docs: + return 0 + + options = ( + MutationOptions(upsert=True) + if policy == DuplicatePolicy.OVERWRITE + else None + ) + _run_async(self.client.add_docs(self.index_name, docs, options)) + self._index_loaded = False + return len(docs) + + def delete_documents(self, document_ids: list[str]) -> None: + _run_async(self.client.delete_docs(self.index_name, document_ids)) + self._index_loaded = False + + def filter_documents(self, filters: Optional[dict] = None) -> list[Document]: + """Return documents from the index. + + When ``filters`` is None, returns every document in the index — the + common case used by Haystack writers and evaluation helpers. + + Filtered retrieval is not supported through this method because Moss + uses its own filter syntax (``$eq``, ``$and``, ``$in``, ``$near``) + applied at query time. For filtered search, use ``MossRetriever`` + with ``QueryOptions(filter=...)``. + """ + if filters: + raise NotImplementedError( + "Haystack-style filter_documents(filters=...) is not supported. " + "For filtered retrieval, pass filters via QueryOptions to " + "MossRetriever.run(). Call filter_documents(filters=None) to " + "fetch all documents." + ) + moss_docs = _run_async(self.client.get_docs(self.index_name)) + return [_moss_doc_to_haystack(doc) for doc in moss_docs] + + def load_index(self) -> None: + """Download index for fast local querying.""" + if not self._index_loaded: + _run_async(self.client.load_index(self.index_name)) + self._index_loaded = True + + +# --- Retriever --- +@component +class MossRetriever: + """Haystack retriever component backed by Moss semantic search. + + Use in a Haystack Pipeline to retrieve documents from a Moss index. + """ + + def __init__( + self, + document_store: MossDocumentStore, + top_k: int = 5, + alpha: float = 0.8, + ): + self.document_store = document_store + self.top_k = top_k + self.alpha = alpha + + def load_index(self) -> None: + """Load the Moss index for fast local queries.""" + self.document_store.load_index() + + def to_dict(self) -> dict[str, Any]: + return default_to_dict( + self, + document_store=self.document_store.to_dict(), + top_k=self.top_k, + alpha=self.alpha, + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "MossRetriever": + data["init_parameters"]["document_store"] = MossDocumentStore.from_dict( + data["init_parameters"]["document_store"] + ) + return default_from_dict(cls, data) + + @component.output_types(documents=list[Document]) + def run( + self, + query: str, + top_k: Optional[int] = None, + ) -> dict[str, list[Document]]: + """Run semantic search against Moss index. + + Args: + query: Search query text. + top_k: Override default number of results. + + Returns: + {"documents": list[Document]} with scores. + """ + self.document_store.load_index() + + resolved_top_k = top_k if top_k is not None else self.top_k + opts = QueryOptions(top_k=resolved_top_k, alpha=self.alpha) + + results = _run_async( + self.document_store.client.query( + self.document_store.index_name, query, opts + ) + ) + + documents = [ + _moss_doc_to_haystack(doc, score=doc.score) for doc in results.docs + ] + + return {"documents": documents} diff --git a/examples/cookbook/haystack/pyproject.toml b/examples/cookbook/haystack/pyproject.toml new file mode 100644 index 00000000..58400b10 --- /dev/null +++ b/examples/cookbook/haystack/pyproject.toml @@ -0,0 +1,28 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "haystack-moss" +version = "0.1.0" +description = "Haystack integration for Moss semantic search" +readme = "README.md" +requires-python = ">=3.11,<3.14" +license = { text = "BSD-2-Clause" } +authors = [ + { name = "InferEdge Inc.", email = "contact@moss.dev" } +] +dependencies = [ + "haystack-ai", + "moss>=1.0.0", + "python-dotenv", +] + +[tool.hatch.build.targets.wheel] +packages = ["moss_haystack.py"] + +[tool.hatch.build.targets.sdist] +include = [ + "README.md", + "moss_haystack.py", +] diff --git a/examples/cookbook/haystack/test_live.py b/examples/cookbook/haystack/test_live.py new file mode 100644 index 00000000..44879ac0 --- /dev/null +++ b/examples/cookbook/haystack/test_live.py @@ -0,0 +1,99 @@ +import asyncio +import os + +import pytest +from dotenv import load_dotenv +from haystack import Document +from haystack.document_stores.types import DuplicatePolicy +from moss import MossClient + +from moss_haystack import MossDocumentStore, MossRetriever + +load_dotenv() + +PROJECT_ID = os.getenv("MOSS_PROJECT_ID") +PROJECT_KEY = os.getenv("MOSS_PROJECT_KEY") +TEST_INDEX = "haystack-live-test" + +skip_no_creds = pytest.mark.skipif( + not PROJECT_ID or not PROJECT_KEY, + reason="MOSS_PROJECT_ID and MOSS_PROJECT_KEY must be set", +) + + +@skip_no_creds +class TestHaystackLive: + """Live integration tests against the Moss platform.""" + + @pytest.fixture(autouse=True) + def setup_and_teardown(self): + """Create client, store, and clean up index after tests.""" + self.client = MossClient(PROJECT_ID, PROJECT_KEY) + self.store = MossDocumentStore( + project_id=PROJECT_ID, project_key=PROJECT_KEY, index_name=TEST_INDEX + ) + self.docs = [ + Document(id="doc-1", content="Moss delivers sub-10ms semantic search."), + Document( + id="doc-2", + content="CrewAI is a multi-agent orchestration framework.", + ), + Document(id="doc-3", content="Python is a popular programming language."), + Document( + id="doc-4", + content="Vector databases store embeddings for similarity search.", + ), + Document( + id="doc-5", + content="Hybrid search combines semantic and keyword matching.", + ), + ] + yield + # Cleanup: delete the test index + try: + asyncio.run(self.client.delete_index(TEST_INDEX)) + except Exception: + pass + + def test_write_documents_creates_index(self): + count = self.store.write_documents(self.docs, policy=DuplicatePolicy.OVERWRITE) + assert count == 5 + + def test_count_documents(self): + self.store.write_documents(self.docs, policy=DuplicatePolicy.OVERWRITE) + count = self.store.count_documents() + assert count == 5 + + def test_write_documents_upsert(self): + self.store.write_documents(self.docs, policy=DuplicatePolicy.OVERWRITE) + extra = [Document(id="doc-6", content="Reranking improves search quality.")] + count = self.store.write_documents(extra, policy=DuplicatePolicy.OVERWRITE) + assert count == 1 + + def test_filter_documents_returns_all_when_no_filters(self): + self.store.write_documents(self.docs, policy=DuplicatePolicy.OVERWRITE) + all_docs = self.store.filter_documents() + assert len(all_docs) == len(self.docs) + ids = {d.id for d in all_docs} + assert ids == {d.id for d in self.docs} + + def test_retriever_run(self): + self.store.write_documents(self.docs, policy=DuplicatePolicy.OVERWRITE) + self.store.load_index() + retriever = MossRetriever(document_store=self.store, top_k=3) + result = retriever.run(query="semantic search latency") + assert len(result["documents"]) > 0 + assert result["documents"][0].score > 0 + + def test_delete_documents(self): + self.store.write_documents(self.docs, policy=DuplicatePolicy.OVERWRITE) + self.store.delete_documents(["doc-5"]) + count = self.store.count_documents() + assert count == 4 + + def test_delete_index(self): + self.store.write_documents(self.docs, policy=DuplicatePolicy.OVERWRITE) + asyncio.run(self.client.delete_index(TEST_INDEX)) + indexes = asyncio.run(self.client.list_indexes()) + names = [idx.name for idx in indexes] + assert TEST_INDEX not in names