diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 94c8fd510a382..22f4e54846e9b 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -18,7 +18,7 @@ //! Planner for [`LogicalPlan`] to [`ExecutionPlan`] use std::borrow::Cow; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use crate::datasource::file_format::file_type_to_format; @@ -84,7 +84,7 @@ use datafusion_expr::expr::{ }; use datafusion_expr::expr_rewriter::unnormalize_cols; use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary; -use datafusion_expr::utils::split_conjunction; +use datafusion_expr::utils::{expr_to_columns, split_conjunction}; use datafusion_expr::{ Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension, FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType, StringifiedPlan, @@ -613,7 +613,7 @@ impl DefaultPhysicalPlanner { if let Some(provider) = target.as_any().downcast_ref::() { - let filters = extract_dml_filters(input)?; + let filters = extract_dml_filters(input, table_name)?; provider .table_provider .delete_from(session_state, filters) @@ -639,7 +639,7 @@ impl DefaultPhysicalPlanner { { // For UPDATE, the assignments are encoded in the projection of input // We pass the filters and let the provider handle the projection - let filters = extract_dml_filters(input)?; + let filters = extract_dml_filters(input, table_name)?; // Extract assignments from the projection in input plan let assignments = extract_update_assignments(input)?; provider @@ -1936,24 +1936,149 @@ fn get_physical_expr_pair( } /// Extract filter predicates from a DML input plan (DELETE/UPDATE). -/// Walks the logical plan tree and collects Filter predicates, -/// splitting AND conjunctions into individual expressions. -/// Column qualifiers are stripped so expressions can be evaluated against -/// the TableProvider's schema. /// -fn extract_dml_filters(input: &Arc) -> Result> { +/// Walks the logical plan tree and collects Filter predicates and any filters +/// pushed down into TableScan nodes, splitting AND conjunctions into individual expressions. +/// +/// For UPDATE...FROM queries involving multiple tables, this function only extracts predicates +/// that reference the target table. Filters from source table scans are excluded to prevent +/// incorrect filter semantics. +/// +/// Column qualifiers are stripped so expressions can be evaluated against the TableProvider's +/// schema. Deduplication is performed because filters may appear in both Filter nodes and +/// TableScan.filters when the optimizer performs partial (Inexact) filter pushdown. +/// +/// # Parameters +/// - `input`: The logical plan tree to extract filters from (typically a DELETE or UPDATE plan) +/// - `target`: The target table reference to scope filter extraction (prevents multi-table filter leakage) +/// +/// # Returns +/// A vector of unqualified filter expressions that can be passed to the TableProvider for execution. +/// Returns an empty vector if no applicable filters are found. +/// +fn extract_dml_filters( + input: &Arc, + target: &TableReference, +) -> Result> { let mut filters = Vec::new(); + let mut allowed_refs = vec![target.clone()]; + // First pass: collect any alias references to the target table input.apply(|node| { - if let LogicalPlan::Filter(filter) = node { - // Split AND predicates into individual expressions - filters.extend(split_conjunction(&filter.predicate).into_iter().cloned()); + if let LogicalPlan::SubqueryAlias(alias) = node + // Check if this alias points to the target table + && let LogicalPlan::TableScan(scan) = alias.input.as_ref() + && scan.table_name.resolved_eq(target) + { + allowed_refs.push(TableReference::bare(alias.alias.to_string())); } Ok(TreeNodeRecursion::Continue) })?; - // Strip table qualifiers from column references - filters.into_iter().map(strip_column_qualifiers).collect() + input.apply(|node| { + match node { + LogicalPlan::Filter(filter) => { + // Split AND predicates into individual expressions + for predicate in split_conjunction(&filter.predicate) { + if predicate_is_on_target_multi(predicate, &allowed_refs)? { + filters.push(predicate.clone()); + } + } + } + LogicalPlan::TableScan(TableScan { + table_name, + filters: scan_filters, + .. + }) => { + // Only extract filters from the target table scan. + // This prevents incorrect filter extraction in UPDATE...FROM scenarios + // where multiple table scans may have filters. + if table_name.resolved_eq(target) { + for filter in scan_filters { + filters.extend(split_conjunction(filter).into_iter().cloned()); + } + } + } + // Plans without filter information + LogicalPlan::EmptyRelation(_) + | LogicalPlan::Values(_) + | LogicalPlan::DescribeTable(_) + | LogicalPlan::Explain(_) + | LogicalPlan::Analyze(_) + | LogicalPlan::Distinct(_) + | LogicalPlan::Extension(_) + | LogicalPlan::Statement(_) + | LogicalPlan::Dml(_) + | LogicalPlan::Ddl(_) + | LogicalPlan::Copy(_) + | LogicalPlan::Unnest(_) + | LogicalPlan::RecursiveQuery(_) => { + // No filters to extract from leaf/meta plans + } + // Plans with inputs (may contain filters in children) + LogicalPlan::Projection(_) + | LogicalPlan::SubqueryAlias(_) + | LogicalPlan::Limit(_) + | LogicalPlan::Sort(_) + | LogicalPlan::Union(_) + | LogicalPlan::Join(_) + | LogicalPlan::Repartition(_) + | LogicalPlan::Aggregate(_) + | LogicalPlan::Window(_) + | LogicalPlan::Subquery(_) => { + // Filter information may appear in child nodes; continue traversal + // to extract filters from Filter/TableScan nodes deeper in the plan + } + } + Ok(TreeNodeRecursion::Continue) + })?; + + // Strip qualifiers and deduplicate. This ensures: + // 1. Only target-table predicates are retained from Filter nodes + // 2. Qualifiers stripped for TableProvider compatibility + // 3. Duplicates removed (from Filter nodes + TableScan.filters) + // + // Deduplication is necessary because filters may appear in both Filter nodes + // and TableScan.filters when the optimizer performs partial (Inexact) pushdown. + let mut seen_filters = HashSet::new(); + filters + .into_iter() + .try_fold(Vec::new(), |mut deduped, filter| { + let unqualified = strip_column_qualifiers(filter).map_err(|e| { + e.context(format!( + "Failed to strip column qualifiers for DML filter on table '{target}'" + )) + })?; + if seen_filters.insert(unqualified.clone()) { + deduped.push(unqualified); + } + Ok(deduped) + }) +} + +/// Determine whether a predicate references only columns from the target table +/// or its aliases. +/// +/// Columns may be qualified with the target table name or any of its aliases. +/// Unqualified columns are also accepted as they implicitly belong to the target table. +fn predicate_is_on_target_multi( + expr: &Expr, + allowed_refs: &[TableReference], +) -> Result { + let mut columns = HashSet::new(); + expr_to_columns(expr, &mut columns)?; + + // Short-circuit on first mismatch: returns false if any column references a table not in allowed_refs. + // Columns are accepted if: + // 1. They are unqualified (no relation specified), OR + // 2. Their relation matches one of the allowed table references using resolved equality + Ok(!columns.iter().any(|column| { + column.relation.as_ref().is_some_and(|relation| { + !allowed_refs + .iter() + .any(|allowed| relation.resolved_eq(allowed)) + }) + })) } /// Strip table qualifiers from column references in an expression. diff --git a/datafusion/core/tests/custom_sources_cases/dml_planning.rs b/datafusion/core/tests/custom_sources_cases/dml_planning.rs index a4033e445c213..8ba2980bd339d 100644 --- a/datafusion/core/tests/custom_sources_cases/dml_planning.rs +++ b/datafusion/core/tests/custom_sources_cases/dml_planning.rs @@ -25,9 +25,12 @@ use async_trait::async_trait; use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result; use datafusion::execution::context::{SessionConfig, SessionContext}; -use datafusion::logical_expr::Expr; +use datafusion::logical_expr::{ + Expr, LogicalPlan, TableProviderFilterPushDown, TableScan, +}; use datafusion_catalog::Session; use datafusion_common::ScalarValue; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_physical_plan::ExecutionPlan; use datafusion_physical_plan::empty::EmptyExec; @@ -35,6 +38,8 @@ use datafusion_physical_plan::empty::EmptyExec; struct CaptureDeleteProvider { schema: SchemaRef, received_filters: Arc>>>, + filter_pushdown: TableProviderFilterPushDown, + per_filter_pushdown: Option>, } impl CaptureDeleteProvider { @@ -42,6 +47,32 @@ impl CaptureDeleteProvider { Self { schema, received_filters: Arc::new(Mutex::new(None)), + filter_pushdown: TableProviderFilterPushDown::Unsupported, + per_filter_pushdown: None, + } + } + + fn new_with_filter_pushdown( + schema: SchemaRef, + filter_pushdown: TableProviderFilterPushDown, + ) -> Self { + Self { + schema, + received_filters: Arc::new(Mutex::new(None)), + filter_pushdown, + per_filter_pushdown: None, + } + } + + fn new_with_per_filter_pushdown( + schema: SchemaRef, + per_filter_pushdown: Vec, + ) -> Self { + Self { + schema, + received_filters: Arc::new(Mutex::new(None)), + filter_pushdown: TableProviderFilterPushDown::Unsupported, + per_filter_pushdown: Some(per_filter_pushdown), } } @@ -92,6 +123,19 @@ impl TableProvider for CaptureDeleteProvider { Field::new("count", DataType::UInt64, false), ]))))) } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + if let Some(per_filter) = &self.per_filter_pushdown + && per_filter.len() == filters.len() + { + return Ok(per_filter.clone()); + } + + Ok(vec![self.filter_pushdown.clone(); filters.len()]) + } } /// A TableProvider that captures filters and assignments passed to update(). @@ -100,6 +144,8 @@ struct CaptureUpdateProvider { schema: SchemaRef, received_filters: Arc>>>, received_assignments: Arc>>>, + filter_pushdown: TableProviderFilterPushDown, + per_filter_pushdown: Option>, } impl CaptureUpdateProvider { @@ -108,6 +154,21 @@ impl CaptureUpdateProvider { schema, received_filters: Arc::new(Mutex::new(None)), received_assignments: Arc::new(Mutex::new(None)), + filter_pushdown: TableProviderFilterPushDown::Unsupported, + per_filter_pushdown: None, + } + } + + fn new_with_filter_pushdown( + schema: SchemaRef, + filter_pushdown: TableProviderFilterPushDown, + ) -> Self { + Self { + schema, + received_filters: Arc::new(Mutex::new(None)), + received_assignments: Arc::new(Mutex::new(None)), + filter_pushdown, + per_filter_pushdown: None, } } @@ -164,6 +225,19 @@ impl TableProvider for CaptureUpdateProvider { Field::new("count", DataType::UInt64, false), ]))))) } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result> { + if let Some(per_filter) = &self.per_filter_pushdown + && per_filter.len() == filters.len() + { + return Ok(per_filter.clone()); + } + + Ok(vec![self.filter_pushdown.clone(); filters.len()]) + } } /// A TableProvider that captures whether truncate() was called. @@ -307,6 +381,168 @@ async fn test_delete_complex_expr() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_delete_filter_pushdown_extracts_table_scan_filters() -> Result<()> { + let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::clone(&provider) as Arc)?; + + let df = ctx.sql("DELETE FROM t WHERE id = 1").await?; + let optimized_plan = df.clone().into_optimized_plan()?; + + let mut scan_filters = Vec::new(); + optimized_plan.apply(|node| { + if let LogicalPlan::TableScan(TableScan { filters, .. }) = node { + scan_filters.extend(filters.clone()); + } + Ok(TreeNodeRecursion::Continue) + })?; + + assert_eq!(scan_filters.len(), 1); + assert!(scan_filters[0].to_string().contains("id")); + + df.collect().await?; + + let filters = provider + .captured_filters() + .expect("filters should be captured"); + assert_eq!(filters.len(), 1); + assert!(filters[0].to_string().contains("id")); + Ok(()) +} + +#[tokio::test] +async fn test_delete_compound_filters_with_pushdown() -> Result<()> { + let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::clone(&provider) as Arc)?; + + ctx.sql("DELETE FROM t WHERE id = 1 AND status = 'active'") + .await? + .collect() + .await?; + + let filters = provider + .captured_filters() + .expect("filters should be captured"); + // Should receive both filters, not deduplicate valid separate predicates + assert_eq!( + filters.len(), + 2, + "compound filters should not be over-suppressed" + ); + + let filter_strs: Vec = filters.iter().map(|f| f.to_string()).collect(); + assert!( + filter_strs.iter().any(|s| s.contains("id")), + "should contain id filter" + ); + assert!( + filter_strs.iter().any(|s| s.contains("status")), + "should contain status filter" + ); + Ok(()) +} + +#[tokio::test] +async fn test_delete_mixed_filter_locations() -> Result<()> { + // Test mixed-location filters: some in Filter node, some in TableScan.filters + // This happens when provider uses TableProviderFilterPushDown::Inexact, + // meaning it can push down some predicates but not others. + let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Inexact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::clone(&provider) as Arc)?; + + // Execute DELETE with compound WHERE clause + ctx.sql("DELETE FROM t WHERE id = 1 AND status = 'active'") + .await? + .collect() + .await?; + + // Verify that both predicates are extracted and passed to delete_from(), + // even though they may be split between Filter node and TableScan.filters + let filters = provider + .captured_filters() + .expect("filters should be captured"); + assert_eq!( + filters.len(), + 2, + "should extract both predicates (union of Filter and TableScan.filters)" + ); + + let filter_strs: Vec = filters.iter().map(|f| f.to_string()).collect(); + assert!( + filter_strs.iter().any(|s| s.contains("id")), + "should contain id filter" + ); + assert!( + filter_strs.iter().any(|s| s.contains("status")), + "should contain status filter" + ); + Ok(()) +} + +#[tokio::test] +async fn test_delete_per_filter_pushdown_mixed_locations() -> Result<()> { + // Force per-filter pushdown decisions to exercise mixed locations in one query. + // First predicate is pushed down (Exact), second stays as residual (Unsupported). + let provider = Arc::new(CaptureDeleteProvider::new_with_per_filter_pushdown( + test_schema(), + vec![ + TableProviderFilterPushDown::Exact, + TableProviderFilterPushDown::Unsupported, + ], + )); + + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::clone(&provider) as Arc)?; + + let df = ctx + .sql("DELETE FROM t WHERE id = 1 AND status = 'active'") + .await?; + let optimized_plan = df.clone().into_optimized_plan()?; + + // Only the first predicate should be pushed to TableScan.filters. + let mut scan_filters = Vec::new(); + optimized_plan.apply(|node| { + if let LogicalPlan::TableScan(TableScan { filters, .. }) = node { + scan_filters.extend(filters.clone()); + } + Ok(TreeNodeRecursion::Continue) + })?; + assert_eq!(scan_filters.len(), 1); + assert!(scan_filters[0].to_string().contains("id")); + + // Both predicates should still reach the provider (union + dedup behavior). + df.collect().await?; + + let filters = provider + .captured_filters() + .expect("filters should be captured"); + assert_eq!(filters.len(), 2); + + let filter_strs: Vec = filters.iter().map(|f| f.to_string()).collect(); + assert!( + filter_strs.iter().any(|s| s.contains("id")), + "should contain pushed-down id filter" + ); + assert!( + filter_strs.iter().any(|s| s.contains("status")), + "should contain residual status filter" + ); + + Ok(()) +} + #[tokio::test] async fn test_update_assignments() -> Result<()> { let provider = Arc::new(CaptureUpdateProvider::new(test_schema())); @@ -330,6 +566,80 @@ async fn test_update_assignments() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_update_filter_pushdown_extracts_table_scan_filters() -> Result<()> { + let provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::clone(&provider) as Arc)?; + + let df = ctx.sql("UPDATE t SET value = 100 WHERE id = 1").await?; + let optimized_plan = df.clone().into_optimized_plan()?; + + // Verify that the optimizer pushed down the filter into TableScan + let mut scan_filters = Vec::new(); + optimized_plan.apply(|node| { + if let LogicalPlan::TableScan(TableScan { filters, .. }) = node { + scan_filters.extend(filters.clone()); + } + Ok(TreeNodeRecursion::Continue) + })?; + + assert_eq!(scan_filters.len(), 1); + assert!(scan_filters[0].to_string().contains("id")); + + // Execute the UPDATE and verify filters were extracted and passed to update() + df.collect().await?; + + let filters = provider + .captured_filters() + .expect("filters should be captured"); + assert_eq!(filters.len(), 1); + assert!(filters[0].to_string().contains("id")); + Ok(()) +} + +#[tokio::test] +async fn test_update_filter_pushdown_passes_table_scan_filters() -> Result<()> { + let provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::clone(&provider) as Arc)?; + + let df = ctx + .sql("UPDATE t SET value = 42 WHERE status = 'ready'") + .await?; + let optimized_plan = df.clone().into_optimized_plan()?; + + let mut scan_filters = Vec::new(); + optimized_plan.apply(|node| { + if let LogicalPlan::TableScan(TableScan { filters, .. }) = node { + scan_filters.extend(filters.clone()); + } + Ok(TreeNodeRecursion::Continue) + })?; + + assert!( + !scan_filters.is_empty(), + "expected filter pushdown to populate TableScan filters" + ); + + df.collect().await?; + + let filters = provider + .captured_filters() + .expect("filters should be captured"); + assert!( + !filters.is_empty(), + "expected filters extracted from TableScan during UPDATE" + ); + Ok(()) +} + #[tokio::test] async fn test_truncate_calls_provider() -> Result<()> { let provider = Arc::new(CaptureTruncateProvider::new(test_schema())); @@ -379,6 +689,120 @@ async fn test_unsupported_table_update() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_delete_target_table_scoping() -> Result<()> { + // Test that DELETE only extracts filters from the target table, + // not from other tables (important for DELETE...FROM safety) + let target_provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table( + "target_t", + Arc::clone(&target_provider) as Arc, + )?; + + // For now, we test single-table DELETE + // and validate that the scoping logic is correct + let df = ctx.sql("DELETE FROM target_t WHERE id > 5").await?; + df.collect().await?; + + let filters = target_provider + .captured_filters() + .expect("filters should be captured"); + assert_eq!(filters.len(), 1); + assert!( + filters[0].to_string().contains("id"), + "Filter should be for id column" + ); + assert!( + filters[0].to_string().contains("5"), + "Filter should contain the value 5" + ); + Ok(()) +} + +#[tokio::test] +async fn test_update_from_drops_non_target_predicates() -> Result<()> { + // UPDATE ... FROM is currently not working + // TODO fix https://github.com/apache/datafusion/issues/19950 + let target_provider = Arc::new(CaptureUpdateProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t1", Arc::clone(&target_provider) as Arc)?; + + let source_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("status", DataType::Utf8, true), + // t2-only column to avoid false negatives after qualifier stripping + Field::new("src_only", DataType::Utf8, true), + ])); + let source_table = datafusion::datasource::empty::EmptyTable::new(source_schema); + ctx.register_table("t2", Arc::new(source_table))?; + + let result = ctx + .sql( + "UPDATE t1 SET value = 1 FROM t2 \ + WHERE t1.id = t2.id AND t2.src_only = 'active' AND t1.value > 10", + ) + .await; + + // Verify UPDATE ... FROM is rejected with appropriate error + // TODO fix https://github.com/apache/datafusion/issues/19950 + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.to_string().contains("UPDATE ... FROM is not supported"), + "Expected 'UPDATE ... FROM is not supported' error, got: {err}" + ); + Ok(()) +} + +#[tokio::test] +async fn test_delete_qualifier_stripping_and_validation() -> Result<()> { + // Test that filter qualifiers are properly stripped and validated + // Unqualified predicates should work fine + let provider = Arc::new(CaptureDeleteProvider::new_with_filter_pushdown( + test_schema(), + TableProviderFilterPushDown::Exact, + )); + let ctx = SessionContext::new(); + ctx.register_table("t", Arc::clone(&provider) as Arc)?; + + // Execute DELETE with unqualified column reference + // (After parsing, the planner adds qualifiers, but our validation should accept them) + let df = ctx.sql("DELETE FROM t WHERE id = 1").await?; + df.collect().await?; + + let filters = provider + .captured_filters() + .expect("filters should be captured"); + assert!(!filters.is_empty(), "Should have extracted filter"); + + // Verify qualifiers are stripped: check that Column expressions have no qualifier + let has_qualified_column = filters[0] + .exists(|expr| Ok(matches!(expr, Expr::Column(col) if col.relation.is_some())))?; + assert!( + !has_qualified_column, + "Filter should have unqualified columns after stripping" + ); + + // Also verify the string representation doesn't contain table qualifiers + let filter_str = filters[0].to_string(); + assert!( + !filter_str.contains("t.id"), + "Filter should not contain qualified column reference, got: {filter_str}" + ); + assert!( + filter_str.contains("id") || filter_str.contains("1"), + "Filter should reference id column or the value 1, got: {filter_str}" + ); + Ok(()) +} + #[tokio::test] async fn test_unsupported_table_truncate() -> Result<()> { let schema = test_schema(); diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 4981db5537a74..b086e89f3e9ac 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -1067,9 +1067,18 @@ impl SqlToRel<'_, S> { }); // TODO: support multiple tables in UPDATE SET FROM if from_clauses.as_ref().is_some_and(|f| f.len() > 1) { - plan_err!("Multiple tables in UPDATE SET FROM not yet supported")?; + not_impl_err!( + "Multiple tables in UPDATE SET FROM not yet supported" + )?; } let update_from = from_clauses.and_then(|mut f| f.pop()); + + // UPDATE ... FROM is currently not working + // TODO fix https://github.com/apache/datafusion/issues/19950 + if update_from.is_some() { + return not_impl_err!("UPDATE ... FROM is not supported"); + } + if returning.is_some() { plan_err!("Update-returning clause not yet supported")?; } diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt index a652ae7633e44..1cd2b626e3b8e 100644 --- a/datafusion/sqllogictest/test_files/update.slt +++ b/datafusion/sqllogictest/test_files/update.slt @@ -67,39 +67,48 @@ logical_plan physical_plan_error This feature is not implemented: Physical plan does not support logical expression ScalarSubquery() # set from other table -query TT +# UPDATE ... FROM is currently unsupported +# TODO fix https://github.com/apache/datafusion/issues/19950 +query error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported explain update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; ----- -logical_plan -01)Dml: op=[Update] table=[t1] -02)--Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d -03)----Filter: t1.a = t2.a AND t1.b > CAST(Utf8("foo") AS Utf8View) AND t2.c > Float64(1) -04)------Cross Join: -05)--------TableScan: t1 -06)--------TableScan: t2 -physical_plan -01)CooperativeExec -02)--DmlResultExec: rows_affected=0 +# test update from other table with actual data statement ok -create table t3(a int, b varchar, c double, d int); +insert into t1 values (1, 'zoo', 2.0, 10), (2, 'qux', 3.0, 20), (3, 'bar', 4.0, 30); + +statement ok +insert into t2 values (1, 'updated_b', 5.0, 40), (2, 'updated_b2', 2.5, 50), (4, 'updated_b3', 1.5, 60); + +# UPDATE ... FROM is currently unsupported - qualifier stripping breaks source column references +# causing assignments like 'b = t2.b' to resolve to target table's 'b' instead of source table's 'b' +# TODO fix https://github.com/apache/datafusion/issues/19950 +statement error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported +update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0; # set from multiple tables, DataFusion only supports from one table -query error DataFusion error: Error during planning: Multiple tables in UPDATE SET FROM not yet supported +statement error DataFusion error: This feature is not implemented: Multiple tables in UPDATE SET FROM not yet supported explain update t1 set b = t2.b, c = t3.a, d = 1 from t2, t3 where t1.a = t2.a and t1.a = t3.a; # test table alias -query TT +# UPDATE ... FROM is currently unsupported +# TODO fix https://github.com/apache/datafusion/issues/19950 +statement error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported explain update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0; ----- -logical_plan -01)Dml: op=[Update] table=[t1] -02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d -03)----Filter: t.a = t2.a AND t.b > CAST(Utf8("foo") AS Utf8View) AND t2.c > Float64(1) -04)------Cross Join: -05)--------SubqueryAlias: t -06)----------TableScan: t1 -07)--------TableScan: t2 -physical_plan -01)CooperativeExec -02)--DmlResultExec: rows_affected=0 + +# test update with table alias with actual data +statement ok +delete from t1; + +statement ok +delete from t2; + +statement ok +insert into t1 values (1, 'zebra', 1.5, 5), (2, 'wolf', 2.0, 10), (3, 'apple', 3.5, 15); + +statement ok +insert into t2 values (1, 'new_val', 2.0, 100), (2, 'new_val2', 1.5, 200); + +# UPDATE ... FROM is currently unsupported +# TODO fix https://github.com/apache/datafusion/issues/19950 +statement error DataFusion error: This feature is not implemented: UPDATE ... FROM is not supported +update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0;