Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ ctx.add(
)
print(ctx.get(external_id="conversation-2026-03-01#turn-1"))
ctx.delete(external_id="conversation-2026-03-01#turn-1")

# Cross-modal retrieval: plug in a multi-modal (e.g. CLIP-style) embedder that
# maps text and media into one shared space. Images are auto-embedded via
# `embed_media`; a text query (embedded via `embed_texts`) then retrieves them.
# lance-context bundles no models — supply your own provider.
clip_ctx = Context.create("multimodal.lance", embedding_dim=512,
embedding_provider=my_clip_provider) # implements MultiModalEmbeddingProvider
clip_ctx.add("user", image_bytes, content_type="image/png", external_id="img-1")
results = clip_ctx.search("a photo of a cat") # text query -> image results
assert ctx.get(external_id="conversation-2026-03-01#turn-1") is None

# Scoped recall and provenance-oriented metadata
Expand Down Expand Up @@ -137,6 +146,12 @@ hits = ctx.search(
)
service_context = ctx.related("service://service-a", relation="describes")

# Multi-modal-friendly reads: skip large media bytes for metadata/search
# queries, then fetch a record's bytes on demand.
lean = ctx.list(filters={"content_type": "image/png"}, include_binary=False)
image_bytes = ctx.get_blob(lean[0]["id"])
hits = ctx.search(query_embedding, include_binary=False) # no bytes pulled into results

# Hybrid retrieval combines lexical recall, vector recall, and existing filters
# over the same context records.
hybrid_hits = ctx.retrieve(
Expand Down
2 changes: 1 addition & 1 deletion crates/lance-context-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use record::{
};
pub use store::{
CompactionConfig, CompactionStats, ContextStore, ContextStoreOptions, DistanceMetric,
IdIndexType,
IdIndexType, ReadProjection,
};

// Re-export CompactionMetrics from lance for Python bindings
Expand Down
272 changes: 252 additions & 20 deletions crates/lance-context-core/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,55 @@ struct ExternalIdState {
has_non_tombstone: bool,
}

/// Which large/optional payload columns to load on a read.
///
/// Excluding `binary` (and optionally `text` / `embedding`) lets metadata and
/// search queries avoid materializing large media bytes; the omitted fields
/// come back as `None`. Fetch a single record's bytes on demand with
/// [`ContextStore::get_blob`]. Defaults to loading everything (backward
/// compatible).
#[derive(Debug, Clone, Copy)]
pub struct ReadProjection {
pub text: bool,
pub binary: bool,
pub embedding: bool,
}

impl Default for ReadProjection {
fn default() -> Self {
Self {
text: true,
binary: true,
embedding: true,
}
}
}

impl ReadProjection {
/// Load only scalar/metadata columns (no text, binary, or embedding).
#[must_use]
pub fn metadata_only() -> Self {
Self {
text: false,
binary: false,
embedding: false,
}
}

/// Load everything except the (potentially large) `binary_payload`.
#[must_use]
pub fn without_binary() -> Self {
Self {
binary: false,
..Self::default()
}
}

fn loads_all(self) -> bool {
self.text && self.binary && self.embedding
}
}

impl ContextStore {
/// Open an existing context dataset or create a new one with the project schema.
pub async fn open(uri: &str) -> LanceResult<Self> {
Expand Down Expand Up @@ -1164,7 +1213,23 @@ impl ContextStore {
filters: Option<&RecordFilters>,
options: LifecycleQueryOptions,
) -> LanceResult<Vec<ContextRecord>> {
let scanner = self.lsm_scanner().await?;
self.list_filtered_projected(limit, offset, filters, options, ReadProjection::default())
.await
}

/// Like [`Self::list_filtered_with_options`] but with column projection, so
/// large payload columns can be skipped (see [`ReadProjection`]). Omitted
/// payloads come back as `None`; fetch bytes on demand via
/// [`Self::get_blob`].
pub async fn list_filtered_projected(
&self,
limit: Option<usize>,
offset: Option<usize>,
filters: Option<&RecordFilters>,
options: LifecycleQueryOptions,
projection: ReadProjection,
) -> LanceResult<Vec<ContextRecord>> {
let scanner = self.lsm_scanner_projected(projection).await?;
let mut stream = scanner.try_into_stream().await?;
let mut results = Vec::new();
while let Some(batch) = stream.try_next().await? {
Expand Down Expand Up @@ -1296,6 +1361,22 @@ impl ContextStore {
limit: Option<usize>,
filters: Option<&RecordFilters>,
options: LifecycleQueryOptions,
) -> LanceResult<Vec<SearchResult>> {
self.search_filtered_projected(query, limit, filters, options, ReadProjection::default())
.await
}

/// Like [`Self::search_filtered_with_options`] but with column projection on
/// the returned records (see [`ReadProjection`]). Embeddings are always read
/// internally to score, then dropped from the results if `projection.embedding`
/// is `false`.
pub async fn search_filtered_projected(
&self,
query: &[f32],
limit: Option<usize>,
filters: Option<&RecordFilters>,
options: LifecycleQueryOptions,
projection: ReadProjection,
) -> LanceResult<Vec<SearchResult>> {
validate_query_dimension(query, self.embedding_dim)?;

Expand All @@ -1304,14 +1385,23 @@ impl ContextStore {
return Ok(Vec::new());
}

// Embedding is required to score; force it on for the scan but honor the
// caller's text/binary choices.
let scan_projection = ReadProjection {
embedding: true,
..projection
};
let mut results: Vec<SearchResult> = self
.list_filtered_with_options(None, None, filters, options)
.list_filtered_projected(None, None, filters, options, scan_projection)
.await?
.into_iter()
.filter_map(|record| {
.filter_map(|mut record| {
let distance = self
.distance_metric
.distance(query, record.embedding.as_ref()?);
if !projection.embedding {
record.embedding = None;
}
Some(SearchResult { record, distance })
})
.collect();
Expand Down Expand Up @@ -1449,6 +1539,60 @@ impl ContextStore {
))
}

/// Top-level column names to read for a projection (drops the excluded
/// payload columns; everything else is always loaded so lifecycle
/// filtering and metadata stay correct).
fn projected_columns(&self, projection: ReadProjection) -> Vec<String> {
self.dataset
.schema()
.fields
.iter()
.map(|field| field.name.clone())
.filter(|name| {
(projection.text || name != "text_payload")
&& (projection.binary || name != "binary_payload")
&& (projection.embedding || name != "embedding")
})
.collect()
}

/// An LSM scanner that only reads the columns required by `projection`.
async fn lsm_scanner_projected(&self, projection: ReadProjection) -> LanceResult<LsmScanner> {
let scanner = self.lsm_scanner().await?;
if projection.loads_all() {
return Ok(scanner);
}
let columns = self.projected_columns(projection);
let refs: Vec<&str> = columns.iter().map(String::as_str).collect();
Ok(scanner.project(&refs))
}

/// Fetch a single record's `binary_payload` on demand, without loading it
/// during `list`/`search`. Returns `None` if the record or its binary
/// payload is absent.
pub async fn get_blob(&self, id: &str) -> LanceResult<Option<Vec<u8>>> {
let filter = format!("id IN ({})", sql_quoted_list(&[id]));
let scanner = self
.lsm_scanner()
.await?
.project(&["id", "binary_payload"])
.filter(&filter)?;
let mut stream = scanner.try_into_stream().await?;
while let Some(batch) = stream.try_next().await? {
let id_array = column_as::<StringArray>(&batch, "id")?;
let binary_array = column_as_optional::<LargeBinaryArray>(&batch, "binary_payload");
for row in 0..batch.num_rows() {
if id_array.value(row) == id {
return Ok(match binary_array {
Some(arr) if !arr.is_null(row) => Some(arr.value(row).to_vec()),
_ => None,
});
}
}
}
Ok(None)
}

/// Manually trigger compaction to merge small fragments.
pub async fn compact(
&mut self,
Expand Down Expand Up @@ -2222,21 +2366,22 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult<Vec<ContextRecord>> {
let supersedes_id_array = column_as_optional::<StringArray>(batch, "supersedes_id");
let superseded_by_id_array = column_as_optional::<StringArray>(batch, "superseded_by_id");
let content_type_array = column_as::<StringArray>(batch, "content_type")?;
let binary_array = column_as::<LargeBinaryArray>(batch, "binary_payload")?;
let embedding_array = column_as::<FixedSizeListArray>(batch, "embedding")?;
let binary_array = column_as_optional::<LargeBinaryArray>(batch, "binary_payload");
let embedding_array = column_as_optional::<FixedSizeListArray>(batch, "embedding");

// Auto-detect whether text_payload is LargeBinary (blob) or LargeUtf8 (default)
// `text_payload` may be projected out, or stored as LargeBinary (blob) or LargeUtf8.
let has_text = batch.schema().field_with_name("text_payload").is_ok();
let text_is_binary = batch
.schema()
.field_with_name("text_payload")
.is_ok_and(|f| f.data_type() == &DataType::LargeBinary);

let text_string_array = if !text_is_binary {
let text_string_array = if has_text && !text_is_binary {
Some(column_as::<LargeStringArray>(batch, "text_payload")?)
} else {
None
};
let text_binary_array = if text_is_binary {
let text_binary_array = if has_text && text_is_binary {
Some(column_as::<LargeBinaryArray>(batch, "text_payload")?)
} else {
None
Expand Down Expand Up @@ -2314,32 +2459,30 @@ fn batch_to_records(batch: &RecordBatch) -> LanceResult<Vec<ContextRecord>> {
})
};

let text_payload = if text_is_binary {
let arr = text_binary_array.unwrap();
let text_payload = if let Some(arr) = text_binary_array {
if arr.is_null(row) {
None
} else {
Some(String::from_utf8_lossy(arr.value(row)).to_string())
}
} else {
let arr = text_string_array.unwrap();
} else if let Some(arr) = text_string_array {
if arr.is_null(row) {
None
} else {
Some(arr.value(row).to_string())
}
} else {
None
};

let binary_payload = if binary_array.is_null(row) {
None
} else {
Some(binary_array.value(row).to_vec())
let binary_payload = match binary_array {
Some(arr) if !arr.is_null(row) => Some(arr.value(row).to_vec()),
_ => None,
};

let embedding = if embedding_array.is_null(row) {
None
} else {
Some(embedding_from_list(embedding_array, row)?)
let embedding = match embedding_array {
Some(arr) if !arr.is_null(row) => Some(embedding_from_list(arr, row)?),
_ => None,
};

let role = if role_array.is_null(row) {
Expand Down Expand Up @@ -4779,4 +4922,93 @@ mod tests {
assert_eq!(v1, v2, "ensure_id_index should not recreate existing index");
});
}

#[test]
fn projection_excludes_binary_but_keeps_metadata() {
let dir = TempDir::new().unwrap();
let uri = dir.path().to_string_lossy().to_string();
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(async {
let mut store = ContextStore::open(&uri).await.unwrap();
let mut record = text_record("img", 0.0);
record.content_type = "image/png".to_string();
record.binary_payload = Some(vec![1, 2, 3, 4]);
store.add(std::slice::from_ref(&record)).await.unwrap();

// Default read still includes the bytes.
let full = store.list(None, None).await.unwrap();
assert_eq!(full[0].binary_payload.as_deref(), Some(&[1, 2, 3, 4][..]));
assert!(full[0].embedding.is_some());

// Projected read drops binary, keeps metadata + embedding.
let projected = store
.list_filtered_projected(
None,
None,
None,
LifecycleQueryOptions::default(),
ReadProjection::without_binary(),
)
.await
.unwrap();
assert_eq!(projected.len(), 1);
assert!(projected[0].binary_payload.is_none());
assert_eq!(projected[0].id, "img");
assert_eq!(projected[0].content_type, "image/png");
assert!(projected[0].embedding.is_some());

// metadata_only drops embedding too.
let meta = store
.list_filtered_projected(
None,
None,
None,
LifecycleQueryOptions::default(),
ReadProjection::metadata_only(),
)
.await
.unwrap();
assert!(meta[0].binary_payload.is_none());
assert!(meta[0].embedding.is_none());
assert_eq!(meta[0].id, "img");

// get_blob fetches the bytes on demand.
let blob = store.get_blob("img").await.unwrap();
assert_eq!(blob.as_deref(), Some(&[1, 2, 3, 4][..]));
assert!(store.get_blob("missing").await.unwrap().is_none());
});
}

#[test]
fn search_projection_excludes_binary_keeps_ranking() {
let dir = TempDir::new().unwrap();
let uri = dir.path().to_string_lossy().to_string();
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(async {
let mut store = ContextStore::open(&uri).await.unwrap();
let mut a = text_record("a", 0.0);
a.binary_payload = Some(vec![9, 9, 9]);
let mut b = text_record("b", 1.0);
b.binary_payload = Some(vec![8, 8, 8]);
store.add(&[a, b]).await.unwrap();

let query = make_embedding(0.0);
let results = store
.search_filtered_projected(
&query,
Some(5),
None,
LifecycleQueryOptions::default(),
ReadProjection::without_binary(),
)
.await
.unwrap();

assert_eq!(results.len(), 2);
assert_eq!(results[0].record.id, "a"); // closest to query pivot 0.0
assert!(results.iter().all(|r| r.record.binary_payload.is_none()));
// embedding kept by default projection (without_binary keeps embedding)
assert!(results[0].record.embedding.is_some());
});
}
}
Loading
Loading