Skip to content
60 changes: 58 additions & 2 deletions datafusion/core/tests/parquet/page_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use std::sync::Arc;
use crate::parquet::Unit::Page;
use crate::parquet::{ContextWithParquet, Scenario};

use arrow::array::RecordBatch;
use arrow::array::{Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::PartitionedFile;
Expand All @@ -30,7 +31,7 @@ use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::metrics::MetricValue;
use datafusion::prelude::SessionContext;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{Expr, col, lit};
Expand All @@ -40,6 +41,8 @@ use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use futures::StreamExt;
use object_store::ObjectMeta;
use object_store::path::Path;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

async fn get_parquet_exec(
state: &SessionState,
Expand Down Expand Up @@ -961,3 +964,56 @@ fn cast_count_metric(metric: MetricValue) -> Option<usize> {
_ => None,
}
}

#[tokio::test]
async fn test_parquet_opener_without_page_index() {
// Defines a simple schema and batch
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
)
.unwrap();

// Create a temp file
let file = tempfile::Builder::new()
.suffix(".parquet")
.tempfile()
.unwrap();
let path = file.path().to_str().unwrap().to_string();

// Write parquet WITHOUT page index
// The default WriterProperties does not write page index, but we set it explicitly
// to be robust against future changes in defaults as requested by reviewers.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 -- I like the comments

let props = WriterProperties::builder()
.set_statistics_enabled(parquet::file::properties::EnabledStatistics::None)
.build();

let file_fs = std::fs::File::create(&path).unwrap();
let mut writer = ArrowWriter::try_new(file_fs, batch.schema(), Some(props)).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();

// Setup SessionContext with PageIndex enabled
// This triggers the ParquetOpener to try and load page index if available
let config = SessionConfig::new().with_parquet_page_index_pruning(true);

let ctx = SessionContext::new_with_config(config);

// Register the table
ctx.register_parquet("t", &path, Default::default())
.await
.unwrap();

// Query the table
// If the bug exists, this might fail because Opener tries to load PageIndex forcefully
let df = ctx.sql("SELECT * FROM t").await.unwrap();
let batches = df
.collect()
.await
.expect("Failed to read parquet file without page index");

// We expect this to succeed, but currently it might fail
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].num_rows(), 3);
}
4 changes: 2 additions & 2 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ impl FileOpener for ParquetOpener {
// Don't load the page index yet. Since it is not stored inline in
// the footer, loading the page index if it is not needed will do
// unnecessary I/O. We decide later if it is needed to evaluate the
// pruning predicates. Thus default to not requesting if from the
// pruning predicates. Thus default to not requesting it from the
// underlying reader.
let mut options = ArrowReaderOptions::new().with_page_index(false);
#[cfg(feature = "parquet_encryption")]
Expand Down Expand Up @@ -436,7 +436,7 @@ impl FileOpener for ParquetOpener {
reader_metadata,
&mut async_file_reader,
// Since we're manually loading the page index the option here should not matter but we pass it in for consistency
options.with_page_index(true),
options.with_page_index_policy(PageIndexPolicy::Optional),
)
.await?;
}
Expand Down