diff --git a/.github/workflows/bench-pr.yml b/.github/workflows/bench-pr.yml index 1e4c062f5a2..9d40a527be5 100644 --- a/.github/workflows/bench-pr.yml +++ b/.github/workflows/bench-pr.yml @@ -46,8 +46,19 @@ jobs: - id: random-access-bench name: Random Access build_args: "--features lance" + run_args: "--formats parquet,lance,vortex" - id: compress-bench name: Compression + build_args: "" + run_args: "--formats parquet,lance,vortex" + - id: compress-memory-bench + name: Compression Memory + build_args: "--features dhat,lance" + run_args: "--formats parquet,vortex" + - id: tpch-datafusion-memory-bench + name: TPC-H DataFusion Memory + build_args: "--features dhat" + run_args: "" if: ${{ contains(github.event.head_commit.message, '[benchmark]') || github.event.label.name == 'action/benchmark' && github.event_name == 'pull_request' }} steps: - uses: runs-on/action@v2 @@ -93,7 +104,7 @@ jobs: env: RUST_BACKTRACE: full run: | - target/release_debug/${{ matrix.benchmark.id }} -d gh-json -o results.json + target/release_debug/${{ matrix.benchmark.id }} ${{ matrix.benchmark.run_args }} -d gh-json -o results.json - name: Setup AWS CLI if: github.event.pull_request.head.repo.fork == false diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 852baa26cda..1123b4d30a6 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -43,11 +43,19 @@ jobs: - id: random-access-bench name: Random Access build_args: "--features lance" - formats: "parquet,lance,vortex" + run_args: "--formats parquet,lance,vortex" - id: compress-bench name: Compression build_args: "--features lance" - formats: "parquet,lance,vortex" + run_args: "--formats parquet,lance,vortex" + - id: compress-memory-bench + name: Compression Memory + build_args: "--features dhat,lance" + run_args: "--formats parquet,vortex" + - id: tpch-datafusion-memory-bench + name: TPC-H DataFusion Memory + build_args: "--features dhat" + run_args: "" steps: - uses: runs-on/action@v2 if: github.repository == 'vortex-data/vortex' @@ -89,7 +97,7 @@ jobs: env: RUST_BACKTRACE: full run: | - target/release_debug/${{ matrix.benchmark.id }} --formats ${{ matrix.benchmark.formats }} -d gh-json -o results.json + target/release_debug/${{ matrix.benchmark.id }} ${{ matrix.benchmark.run_args }} -d gh-json -o results.json - name: Setup AWS CLI uses: aws-actions/configure-aws-credentials@v5 diff --git a/Cargo.lock b/Cargo.lock index d380a3d6229..810a228c216 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "addr2line" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b5d307320b3181d6d7954e663bd7c774a838b8220fe0593c86d9fb09f498b4b" +dependencies = [ + "gimli", +] + [[package]] name = "adler2" version = "2.0.1" @@ -657,6 +666,21 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "backtrace" +version = "0.3.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb531853791a215d7c62a30daf0dde835f381ab5de4589cfe7c649d2cbe92bd6" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-link 0.2.1", +] + [[package]] name = "base64" version = "0.22.1" @@ -718,13 +742,13 @@ dependencies = [ "bitflags", "cexpr", "clang-sys", - "itertools 0.11.0", + "itertools 0.13.0", "log", "prettyplease", "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 2.1.1", "shlex", "syn 2.0.117", ] @@ -1361,6 +1385,20 @@ dependencies = [ "vortex-bench", ] +[[package]] +name = "compress-memory-bench" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "compress-bench", + "indicatif", + "regex", + "tokio", + "tracing", + "vortex-bench", +] + [[package]] name = "compression-codecs" version = "0.4.37" @@ -3247,6 +3285,22 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "dhat" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98cd11d84628e233de0ce467de10b8633f4ddaecafadefc86e13b84b8739b827" +dependencies = [ + "backtrace", + "lazy_static", + "mintex", + "parking_lot", + "rustc-hash 1.1.0", + "serde", + "serde_json", + "thousands", +] + [[package]] name = "digest" version = "0.10.7" @@ -4028,6 +4082,12 @@ dependencies = [ "wasip3", ] +[[package]] +name = "gimli" +version = "0.32.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" + [[package]] name = "glob" version = "0.3.3" @@ -5801,6 +5861,12 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "mintex" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c505b3e17ed6b70a7ed2e67fbb2c560ee327353556120d6e72f5232b6880d536" + [[package]] name = "mio" version = "1.1.1" @@ -6849,7 +6915,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" dependencies = [ "heck", - "itertools 0.11.0", + "itertools 0.14.0", "log", "multimap", "petgraph", @@ -6881,7 +6947,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.117", @@ -7087,7 +7153,7 @@ dependencies = [ "pin-project-lite", "quinn-proto", "quinn-udp", - "rustc-hash", + "rustc-hash 2.1.1", "rustls", "socket2 0.6.2", "thiserror 2.0.18", @@ -7107,7 +7173,7 @@ dependencies = [ "lru-slab", "rand 0.9.2", "ring", - "rustc-hash", + "rustc-hash 2.1.1", "rustls", "rustls-pki-types", "slab", @@ -7712,6 +7778,18 @@ dependencies = [ "serde_json", ] +[[package]] +name = "rustc-demangle" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b50b8869d9fc858ce7266cce0194bd74df58b9d0e3f6df3a9fc8eb470d95c09d" + +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hash" version = "2.1.1" @@ -8698,7 +8776,7 @@ dependencies = [ "rayon", "regex", "rust-stemmers", - "rustc-hash", + "rustc-hash 2.1.1", "serde", "serde_json", "sketches-ddsketch", @@ -8957,6 +9035,12 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "thousands" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bf63baf9f5039dadc247375c29eb13706706cfde997d0330d05aa63a77d8820" + [[package]] name = "thread-id" version = "4.2.2" @@ -9278,6 +9362,20 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" +[[package]] +name = "tpch-datafusion-memory-bench" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "datafusion 52.2.0", + "datafusion-bench", + "indicatif", + "tokio", + "tracing", + "vortex-bench", +] + [[package]] name = "tpchgen" version = "2.0.2" @@ -9668,7 +9766,7 @@ dependencies = [ "prost 0.14.3", "rand 0.9.2", "rstest", - "rustc-hash", + "rustc-hash 2.1.1", "vortex-array", "vortex-buffer", "vortex-error", @@ -9721,7 +9819,7 @@ dependencies = [ "rand_distr 0.5.1", "rstest", "rstest_reuse", - "rustc-hash", + "rustc-hash 2.1.1", "serde", "serde_json", "serde_test", @@ -9753,6 +9851,7 @@ dependencies = [ "bytes", "bzip2", "clap", + "dhat", "futures", "get_dir", "glob", @@ -9797,7 +9896,7 @@ dependencies = [ "pco", "rand 0.9.2", "rstest", - "rustc-hash", + "rustc-hash 2.1.1", "test-with", "tracing", "vortex-alp", @@ -10246,7 +10345,7 @@ dependencies = [ "pin-project-lite", "prost 0.14.3", "rstest", - "rustc-hash", + "rustc-hash 2.1.1", "termtree", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 0da5ee805ba..8e75afa5256 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,9 +51,11 @@ members = [ # Benchmarks "benchmarks/lance-bench", "benchmarks/compress-bench", + "benchmarks/compress-memory-bench", "benchmarks/datafusion-bench", "benchmarks/duckdb-bench", "benchmarks/random-access-bench", + "benchmarks/tpch-datafusion-memory-bench", "vortex-sqllogictest", ] exclude = ["java/testfiles", "wasm-test"] @@ -136,6 +138,7 @@ datafusion-physical-expr-common = { version = "52" } datafusion-physical-plan = { version = "52" } datafusion-pruning = { version = "52" } datafusion-sqllogictest = { version = "52" } +dhat = "0.3.3" dirs = "6.0.0" divan = { package = "codspeed-divan-compat", version = "4.0.4" } enum-iterator = "2.0.0" diff --git a/benchmarks-website/server.js b/benchmarks-website/server.js index e1320ceaef0..cefaaa0dac3 100644 --- a/benchmarks-website/server.js +++ b/benchmarks-website/server.js @@ -27,7 +27,9 @@ const USE_LOCAL_DATA = process.env.USE_LOCAL_DATA === "true"; const GROUPS = [ "Random Access", "Compression", + "Compression Memory", "Compression Size", + "TPC-H Memory", ...QUERY_SUITES.filter((s) => !s.skip && !s.fanOut).map((s) => s.displayName), ...FAN_OUT_GROUPS, ]; @@ -105,6 +107,21 @@ function getGroup(benchmark) { return "Compression"; } + if ( + lower.startsWith("compress peak memory/") || + lower.startsWith("decompress peak memory/") || + lower.startsWith("parquet_rs-zstd compress peak memory/") || + lower.startsWith("parquet_rs-zstd decompress peak memory/") || + lower.startsWith("lance compress peak memory/") || + lower.startsWith("lance decompress peak memory/") + ) { + return "Compression Memory"; + } + + if (lower.startsWith("tpch peak memory/")) { + return "TPC-H Memory"; + } + // SQL query suites: match "{prefix}_q..." or "{prefix}/..." for (const suite of QUERY_SUITES) { if ( diff --git a/benchmarks-website/src/config.js b/benchmarks-website/src/config.js index c6ce9060428..4c81e800c9b 100644 --- a/benchmarks-website/src/config.js +++ b/benchmarks-website/src/config.js @@ -123,6 +123,30 @@ const BESPOKE_CONFIGS = [ ]), renamedDatasets: { lance: "lance", Lance: "lance", LANCE: "lance" }, }, + { + name: "Compression Memory", + keptCharts: [ + "COMPRESS PEAK MEMORY", + "DECOMPRESS PEAK MEMORY", + "PARQUET RS ZSTD COMPRESS PEAK MEMORY", + "PARQUET RS ZSTD DECOMPRESS PEAK MEMORY", + "LANCE COMPRESS PEAK MEMORY", + "LANCE DECOMPRESS PEAK MEMORY", + ], + hiddenDatasets: new Set([ + "wide table cols=1000 chunks=1 rows=1000", + "wide table cols=1000 chunks=50 rows=1000", + ]), + removedDatasets: new Set([ + "TPC-H l_comment canonical", + "TPC-H l_comment chunked without fsst", + "wide table cols=10 chunks=1 rows=1000", + "wide table cols=100 chunks=1 rows=1000", + "wide table cols=10 chunks=50 rows=1000", + "wide table cols=100 chunks=50 rows=1000", + ]), + renamedDatasets: { lance: "lance", Lance: "lance", LANCE: "lance" }, + }, { name: "Compression Size", keptCharts: [ @@ -141,6 +165,10 @@ const BESPOKE_CONFIGS = [ ]), renamedDatasets: { lance: "lance", Lance: "lance", LANCE: "lance" }, }, + { + name: "TPC-H Memory", + renamedDatasets: { ...ENGINE_RENAMES }, + }, ]; function querySuiteConfig(name, suite) { @@ -180,6 +208,12 @@ export const CHART_NAME_MAP = { "PARQUET RS ZSTD DECOMPRESS TIME": "PARQUET SCAN TIME (DECOMPRESSION)", "LANCE COMPRESS TIME": "LANCE WRITE TIME (COMPRESSION)", "LANCE DECOMPRESS TIME": "LANCE SCAN TIME (DECOMPRESSION)", + "COMPRESS PEAK MEMORY": "VORTEX WRITE PEAK MEMORY", + "DECOMPRESS PEAK MEMORY": "VORTEX SCAN PEAK MEMORY", + "PARQUET RS ZSTD COMPRESS PEAK MEMORY": "PARQUET WRITE PEAK MEMORY", + "PARQUET RS ZSTD DECOMPRESS PEAK MEMORY": "PARQUET SCAN PEAK MEMORY", + "LANCE COMPRESS PEAK MEMORY": "LANCE WRITE PEAK MEMORY", + "LANCE DECOMPRESS PEAK MEMORY": "LANCE SCAN PEAK MEMORY", "VORTEX SIZE": "VORTEX SIZE", "PARQUET ZSTD SIZE": "PARQUET SIZE", "LANCE SIZE": "LANCE SIZE", @@ -198,7 +232,9 @@ export const CHART_NAME_MAP = { export const CATEGORY_TAGS = { "Random Access": ["Read/Write"], Compression: ["Read/Write"], + "Compression Memory": ["Memory"], "Compression Size": ["Read/Write"], + "TPC-H Memory": ["Memory"], }; for (const s of QUERY_SUITES) { if (!s.skip && !s.fanOut && s.tags) CATEGORY_TAGS[s.displayName] = s.tags; @@ -217,8 +253,12 @@ export const BENCHMARK_DESCRIPTIONS = { "Tests performance of selecting arbitrary row indices from a file on NVMe storage", Compression: "Measures encoding and decoding throughput (MB/s) for Vortex files and Parquet files (with zstd page compression)", + "Compression Memory": + "Measures peak heap usage during compression and decompression workloads using dhat", "Compression Size": "Compares compressed file sizes and compression ratios across different encoding strategies", + "TPC-H Memory": + "Measures peak heap usage for the full TPC-H workload on DataFusion over Parquet and Vortex files using dhat", }; for (const s of QUERY_SUITES) { if (s.description) BENCHMARK_DESCRIPTIONS[s.displayName] = s.description; diff --git a/benchmarks-website/src/utils.js b/benchmarks-website/src/utils.js index 2d6575fe814..f393010e532 100644 --- a/benchmarks-website/src/utils.js +++ b/benchmarks-website/src/utils.js @@ -95,7 +95,9 @@ export function getBenchmarkDescription(categoryName) { const descriptions = { 'Random Access': 'Tests performance of selecting arbitrary row indices from a file on NVMe storage', 'Compression': 'Measures encoding and decoding throughput (MB/s) for Vortex and Parquet files', + 'Compression Memory': 'Measures peak heap usage during compression and decompression workloads using dhat', 'Compression Size': 'Compares compressed file sizes across different encoding strategies', + 'TPC-H Memory': 'Measures peak heap usage for the full TPC-H workload on DataFusion over Parquet and Vortex files using dhat', 'Clickbench': "ClickHouse's analytical benchmark suite on web analytics data", 'Statistical and Population Genetics': 'Statistical and population genetics queries on gnomAD dataset', }; diff --git a/benchmarks/compress-memory-bench/Cargo.toml b/benchmarks/compress-memory-bench/Cargo.toml new file mode 100644 index 00000000000..63aa43955cb --- /dev/null +++ b/benchmarks/compress-memory-bench/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "compress-memory-bench" +description = "Compression memory benchmarks for Vortex and other formats" +authors.workspace = true +categories.workspace = true +edition.workspace = true +homepage.workspace = true +include.workspace = true +keywords.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true +publish = false + +[dependencies] +anyhow = { workspace = true } +clap = { workspace = true, features = ["derive"] } +compress-bench = { path = "../compress-bench" } +indicatif = { workspace = true } +regex = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +vortex-bench = { workspace = true } + +[features] +dhat = ["vortex-bench/dhat-heap"] +lance = ["compress-bench/lance"] + +[lints] +workspace = true diff --git a/benchmarks/compress-memory-bench/src/main.rs b/benchmarks/compress-memory-bench/src/main.rs new file mode 100644 index 00000000000..520affbc2a8 --- /dev/null +++ b/benchmarks/compress-memory-bench/src/main.rs @@ -0,0 +1,204 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#[cfg(not(feature = "dhat"))] +compile_error!("compress-memory-bench requires the `dhat` feature"); + +use std::path::Path; +use std::path::PathBuf; + +use clap::Parser; +#[cfg(feature = "lance")] +use compress_bench::LanceCompressor; +use compress_bench::parquet::ParquetCompressor; +use compress_bench::vortex::VortexCompressor; +use indicatif::ProgressBar; +use regex::Regex; +use vortex_bench::BenchmarkOutput; +use vortex_bench::Engine; +use vortex_bench::Format; +use vortex_bench::Target; +use vortex_bench::compress::CompressOp; +use vortex_bench::compress::Compressor; +use vortex_bench::datasets::Dataset; +use vortex_bench::datasets::struct_list_of_ints::StructListOfInts; +use vortex_bench::datasets::taxi_data::TaxiData; +use vortex_bench::datasets::tpch_l_comment::TPCHLCommentCanonical; +use vortex_bench::datasets::tpch_l_comment::TPCHLCommentChunked; +use vortex_bench::dhat::start_heap_profiling; +use vortex_bench::display::DisplayFormat; +use vortex_bench::display::print_measurements_json; +use vortex_bench::display::render_table; +use vortex_bench::downloadable_dataset::DownloadableDataset; +use vortex_bench::measurements::CompressionMemoryMeasurement; +use vortex_bench::public_bi::PBI_DATASETS; +use vortex_bench::public_bi::PBIDataset::Arade; +use vortex_bench::public_bi::PBIDataset::Bimbo; +use vortex_bench::public_bi::PBIDataset::CMSprovider; +use vortex_bench::public_bi::PBIDataset::Euro2016; +use vortex_bench::public_bi::PBIDataset::Food; +use vortex_bench::public_bi::PBIDataset::HashTags; +use vortex_bench::setup_logging_and_tracing; + +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + #[arg( + long, + value_delimiter = ',', + value_enum, + default_values_t = vec![Format::Parquet, Format::OnDiskVortex] + )] + formats: Vec, + #[arg( + long, + value_enum, + default_values_t = vec![CompressOp::Compress, CompressOp::Decompress] + )] + ops: Vec, + #[arg(long)] + datasets: Option, + #[arg(short, long)] + verbose: bool, + #[arg(short, long, default_value_t, value_enum)] + display_format: DisplayFormat, + #[arg(short, long)] + output_path: Option, + #[arg(long)] + tracing: bool, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); + + setup_logging_and_tracing(args.verbose, args.tracing)?; + + run_compress_memory( + args.datasets.map(|d| Regex::new(&d)).transpose()?, + args.formats, + args.ops, + args.display_format, + args.output_path, + ) + .await +} + +fn get_compressor(format: Format) -> Box { + match format { + Format::OnDiskVortex => Box::new(VortexCompressor), + Format::Parquet => Box::new(ParquetCompressor::new()), + #[cfg(feature = "lance")] + Format::Lance => Box::new(LanceCompressor), + _ => unimplemented!("Compression memory benchmark not implemented for {format}"), + } +} + +fn measurement_name(op: CompressOp, bench_name: &str) -> String { + match op { + CompressOp::Compress => format!("compress peak memory/{bench_name}"), + CompressOp::Decompress => format!("decompress peak memory/{bench_name}"), + } +} + +async fn measure_peak_memory( + compressor: &dyn Compressor, + parquet_path: &Path, + op: CompressOp, + bench_name: &str, +) -> anyhow::Result { + let profiler = start_heap_profiling()?; + match op { + CompressOp::Compress => { + let _ = compressor.compress(parquet_path).await?; + } + CompressOp::Decompress => { + let _ = compressor.decompress(parquet_path).await?; + } + } + let stats = profiler.finish(); + + Ok(CompressionMemoryMeasurement { + name: measurement_name(op, bench_name), + format: compressor.format(), + value_mib: stats.max_mib(), + }) +} + +async fn run_compress_memory( + datasets_filter: Option, + formats: Vec, + ops: Vec, + display_format: DisplayFormat, + output_path: Option, +) -> anyhow::Result<()> { + let targets = formats + .iter() + .map(|f| Target::new(Engine::default(), *f)) + .collect::>(); + + let struct_list_of_ints = [ + StructListOfInts::new(100, 1000, 1), + StructListOfInts::new(1000, 1000, 1), + StructListOfInts::new(10000, 1000, 1), + StructListOfInts::new(100, 1000, 50), + StructListOfInts::new(1000, 1000, 50), + StructListOfInts::new(10000, 1000, 50), + ]; + + let datasets: Vec<&dyn Dataset> = [ + &TaxiData as &dyn Dataset, + PBI_DATASETS.get(Arade), + PBI_DATASETS.get(Bimbo), + PBI_DATASETS.get(CMSprovider), + PBI_DATASETS.get(Euro2016), + PBI_DATASETS.get(Food), + PBI_DATASETS.get(HashTags), + &TPCHLCommentChunked, + &TPCHLCommentCanonical, + &DownloadableDataset::RPlace, + &DownloadableDataset::AirQuality, + ] + .into_iter() + .chain(struct_list_of_ints.iter().map(|d| d as &dyn Dataset)) + .filter(|d| { + if let Some(filter) = datasets_filter.as_ref() { + filter.is_match(d.name()) + } else { + d.name() != "airquality" && d.name() != "rplace" + } + }) + .collect(); + + let progress = ProgressBar::new((datasets.len() * formats.len() * ops.len()) as u64); + let mut measurements = Vec::new(); + + for dataset in datasets { + let parquet_path = dataset.to_parquet_path().await?; + let bench_name = dataset.name(); + tracing::info!("Running memory benchmark for {bench_name}"); + + for format in &formats { + let compressor = get_compressor(*format); + for op in &ops { + measurements.push( + measure_peak_memory(compressor.as_ref(), &parquet_path, *op, bench_name) + .await?, + ); + progress.inc(1); + } + } + } + + progress.finish(); + + let output = BenchmarkOutput::with_path("compress-memory", output_path); + let mut writer = output.create_writer()?; + + match display_format { + DisplayFormat::Table => render_table(&mut writer, measurements, &targets)?, + DisplayFormat::GhJson => print_measurements_json(&mut writer, measurements)?, + } + + Ok(()) +} diff --git a/benchmarks/tpch-datafusion-memory-bench/Cargo.toml b/benchmarks/tpch-datafusion-memory-bench/Cargo.toml new file mode 100644 index 00000000000..6a3dd68c4f6 --- /dev/null +++ b/benchmarks/tpch-datafusion-memory-bench/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "tpch-datafusion-memory-bench" +description = "TPC-H DataFusion/Vortex memory benchmark" +authors.workspace = true +categories.workspace = true +edition.workspace = true +homepage.workspace = true +include.workspace = true +keywords.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true +publish = false + +[dependencies] +anyhow = { workspace = true } +clap = { workspace = true, features = ["derive"] } +datafusion = { workspace = true } +datafusion-bench = { path = "../datafusion-bench" } +indicatif = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tracing = { workspace = true } +vortex-bench = { workspace = true } + +[features] +dhat = ["vortex-bench/dhat-heap"] + +[lints] +workspace = true diff --git a/benchmarks/tpch-datafusion-memory-bench/src/main.rs b/benchmarks/tpch-datafusion-memory-bench/src/main.rs new file mode 100644 index 00000000000..cb02988798c --- /dev/null +++ b/benchmarks/tpch-datafusion-memory-bench/src/main.rs @@ -0,0 +1,172 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#[cfg(not(feature = "dhat"))] +compile_error!("tpch-datafusion-memory-bench requires the `dhat` feature"); + +use std::path::PathBuf; +use std::sync::Arc; + +use clap::Parser; +use datafusion::datasource::listing::ListingOptions; +use datafusion::datasource::listing::ListingTable; +use datafusion::datasource::listing::ListingTableConfig; +use datafusion::datasource::listing::ListingTableUrl; +use datafusion::prelude::SessionContext; +use indicatif::ProgressBar; +use vortex_bench::Benchmark; +use vortex_bench::BenchmarkOutput; +use vortex_bench::CompactionStrategy; +use vortex_bench::Engine; +use vortex_bench::Format; +use vortex_bench::Target; +use vortex_bench::conversions::convert_parquet_directory_to_vortex; +use vortex_bench::dhat::start_heap_profiling; +use vortex_bench::display::DisplayFormat; +use vortex_bench::display::print_measurements_json; +use vortex_bench::display::render_table; +use vortex_bench::measurements::NamedMeasurement; +use vortex_bench::setup_logging_and_tracing; +use vortex_bench::tpch::benchmark::TpcHBenchmark; +use vortex_bench::utils::constants::STORAGE_NVME; + +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + #[arg(long, default_value = "1.0")] + scale_factor: String, + #[arg( + long, + value_delimiter = ',', + value_enum, + default_values_t = vec![Format::Parquet, Format::OnDiskVortex] + )] + formats: Vec, + #[arg(short, long)] + verbose: bool, + #[arg(long)] + tracing: bool, + #[arg(short, long, default_value_t, value_enum)] + display_format: DisplayFormat, + #[arg(short, long)] + output_path: Option, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); + setup_logging_and_tracing(args.verbose, args.tracing)?; + + run_tpch_memory( + args.scale_factor, + args.formats, + args.display_format, + args.output_path, + ) + .await +} + +async fn run_tpch_memory( + scale_factor: String, + formats: Vec, + display_format: DisplayFormat, + output_path: Option, +) -> anyhow::Result<()> { + for format in &formats { + anyhow::ensure!( + matches!(format, Format::Parquet | Format::OnDiskVortex), + "TPC-H memory benchmark only supports parquet and vortex formats", + ); + } + + let benchmark = TpcHBenchmark::new(scale_factor, None)?; + benchmark.generate_base_data().await?; + + let queries = benchmark.queries()?; + let expected_row_counts = benchmark.expected_row_counts().map(|rows| rows.to_vec()); + let progress = ProgressBar::new((queries.len() * formats.len()) as u64); + let targets = formats + .iter() + .map(|format| Target::new(Engine::DataFusion, *format)) + .collect::>(); + let mut measurements = Vec::with_capacity(formats.len()); + + if formats.contains(&Format::OnDiskVortex) { + let base_path = benchmark + .data_url() + .to_file_path() + .map_err(|_| anyhow::anyhow!("Invalid file URL: {}", benchmark.data_url()))?; + convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Default).await?; + } + + for format in formats { + let session = datafusion_bench::get_session_context(); + datafusion_bench::make_object_store(&session, benchmark.data_url())?; + register_tables(&session, &benchmark, format).await?; + + let profiler = start_heap_profiling()?; + for (query_idx, query) in &queries { + tracing::info!(query_idx, %format, "Running TPC-H query"); + let batches = session.sql(query).await?.collect().await?; + let row_count = batches.iter().map(|batch| batch.num_rows()).sum::(); + + if let Some(expected) = &expected_row_counts { + assert_eq!( + row_count, expected[*query_idx], + "Row count mismatch for query {query_idx}", + ); + } + + progress.inc(1); + } + + let stats = profiler.finish(); + measurements.push(NamedMeasurement { + name: format!("tpch peak memory/datafusion:{}", format.name()), + target: Target::new(Engine::DataFusion, format), + unit: "MiB".into(), + value: stats.max_mib(), + storage: Some(STORAGE_NVME.to_string()), + }); + } + progress.finish(); + let output = BenchmarkOutput::with_path("tpch-datafusion-memory", output_path); + let mut writer = output.create_writer()?; + + match display_format { + DisplayFormat::Table => render_table(&mut writer, measurements, &targets)?, + DisplayFormat::GhJson => print_measurements_json(&mut writer, measurements)?, + } + + Ok(()) +} + +async fn register_tables( + session: &SessionContext, + benchmark: &impl Benchmark, + format: Format, +) -> anyhow::Result<()> { + let benchmark_base = benchmark.data_url().join(&format!("{}/", format.name()))?; + let file_format = datafusion_bench::format_to_df_format(format); + + for table in benchmark.table_specs() { + let table_url = ListingTableUrl::try_new( + benchmark_base.clone(), + benchmark.pattern(table.name, format), + )?; + + let listing_options = ListingOptions::new(file_format.clone()) + .with_session_config_options(session.state().config()); + let mut config = ListingTableConfig::new(table_url).with_listing_options(listing_options); + + config = match table.schema.as_ref() { + Some(schema) => config.with_schema(Arc::new(schema.clone())), + None => config.infer_schema(&session.state()).await?, + }; + + let listing_table = Arc::new(ListingTable::try_new(config)?); + session.register_table(table.name, listing_table)?; + } + + Ok(()) +} diff --git a/docs/developer-guide/benchmarking.md b/docs/developer-guide/benchmarking.md index e971db2b239..f6bb1569fff 100644 --- a/docs/developer-guide/benchmarking.md +++ b/docs/developer-guide/benchmarking.md @@ -46,6 +46,24 @@ cargo run --release --bin datafusion-bench -- cargo run --release --bin duckdb-bench -- ``` +## Memory Benchmarks + +Peak-memory workload benchmarks use `dhat` and run as separate binaries so they do not affect +timing benchmarks. + +Run compression memory benchmarks with: + +```bash +cargo run --profile release_debug -p compress-memory-bench --features dhat,lance -- \ + --formats parquet,lance,vortex +``` + +Run the fixed TPC-H memory workload (DataFusion on Parquet and Vortex) with: + +```bash +cargo run --profile release_debug -p tpch-datafusion-memory-bench --features dhat -- +``` + ## Orchestrator The `bench-orchestrator` is a Python CLI tool (`vx-bench`) that coordinates running benchmarks @@ -100,6 +118,9 @@ Benchmarks run automatically on all commits to `develop` and can be run on-deman - **Post-commit** -- compression, random access, and SQL benchmarks run on every commit to `develop`, with results uploaded for historical tracking. +- **Memory workloads** -- compression and a fixed TPC-H DataFusion workload (Parquet + Vortex) + run with + `dhat` on every commit to `develop`, with results published to the benchmarks website. - **PR benchmarks** -- triggered by the `action/benchmark` label. Results are compared against the latest `develop` run and posted as a PR comment. - **SQL benchmarks** -- triggered by the `action/benchmark-sql` label. Runs a parametric matrix diff --git a/scripts/compare-benchmark-jsons.py b/scripts/compare-benchmark-jsons.py index eea999315ad..132f9e8a50d 100644 --- a/scripts/compare-benchmark-jsons.py +++ b/scripts/compare-benchmark-jsons.py @@ -10,6 +10,7 @@ # SPDX-FileCopyrightText: Copyright the Vortex contributors import math +from numbers import Integral import sys import pandas as pd @@ -207,11 +208,17 @@ def build_group_summary(group_df): return ratio_summary, significant_improvements, significant_regressions -def format_integer_value(value): +def format_value(value): if pd.isna(value): return "" - return str(int(value)) + if isinstance(value, Integral): + return str(int(value)) + + if isinstance(value, float) and value.is_integer(): + return str(int(value)) + + return f"{value:.2f}" def format_name_with_highlight(name, ratio): @@ -240,8 +247,8 @@ def format_name_with_highlight(name, ratio): "name": [ format_name_with_highlight(name, ratio) for name, ratio in zip(group_df["name"], group_df["ratio"]) ], - f"PR {pr_commit_id[:8]} ({unit})": group_df["value_pr"].map(format_integer_value), - f"base {base_commit_id[:8]} ({unit})": group_df["value_base"].map(format_integer_value), + f"PR {pr_commit_id[:8]} ({unit})": group_df["value_pr"].map(format_value), + f"base {base_commit_id[:8]} ({unit})": group_df["value_base"].map(format_value), "ratio (PR/base)": group_df["ratio"], } ) diff --git a/vortex-bench/Cargo.toml b/vortex-bench/Cargo.toml index 62d302f12e1..ae8d9d7a4d5 100644 --- a/vortex-bench/Cargo.toml +++ b/vortex-bench/Cargo.toml @@ -16,6 +16,9 @@ version = { workspace = true } [lints] workspace = true +[features] +dhat-heap = ["dep:dhat"] + [dependencies] anyhow = { workspace = true } arrow-array = { workspace = true } @@ -25,6 +28,7 @@ async-trait = { workspace = true } bytes = { workspace = true } bzip2 = { workspace = true } clap = { workspace = true, features = ["derive"] } +dhat = { workspace = true, optional = true } futures = { workspace = true } get_dir = { workspace = true } glob = { workspace = true } diff --git a/vortex-bench/src/dhat.rs b/vortex-bench/src/dhat.rs new file mode 100644 index 00000000000..d0db116b0b1 --- /dev/null +++ b/vortex-bench/src/dhat.rs @@ -0,0 +1,57 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#[cfg(not(feature = "dhat-heap"))] +use anyhow::bail; + +/// Peak heap statistics collected by dhat. +#[derive(Debug, Clone, Copy)] +pub struct HeapStats { + pub max_bytes: u64, +} + +impl HeapStats { + pub fn max_mib(self) -> f64 { + self.max_bytes as f64 / 1024.0 / 1024.0 + } +} + +/// Guard for a dhat heap profile. +pub struct HeapProfiler { + #[cfg(feature = "dhat-heap")] + profiler: ::dhat::Profiler, +} + +/// Start a heap profile for the current thread. +pub fn start_heap_profiling() -> anyhow::Result { + #[cfg(feature = "dhat-heap")] + { + Ok(HeapProfiler { + profiler: ::dhat::Profiler::builder().testing().build(), + }) + } + + #[cfg(not(feature = "dhat-heap"))] + { + bail!("dhat heap profiling is disabled; rebuild with the `dhat-heap` feature") + } +} + +impl HeapProfiler { + pub fn finish(self) -> HeapStats { + #[cfg(feature = "dhat-heap")] + { + let stats = ::dhat::HeapStats::get(); + drop(self.profiler); + HeapStats { + max_bytes: u64::try_from(stats.max_bytes).unwrap_or(u64::MAX), + } + } + + #[cfg(not(feature = "dhat-heap"))] + { + let _ = self; + unreachable!("HeapProfiler can only be constructed with the `dhat-heap` feature"); + } + } +} diff --git a/vortex-bench/src/lib.rs b/vortex-bench/src/lib.rs index 6dad0f0f6a1..683f74f2c21 100644 --- a/vortex-bench/src/lib.rs +++ b/vortex-bench/src/lib.rs @@ -37,6 +37,7 @@ pub mod clickbench; pub mod compress; pub mod conversions; pub mod datasets; +pub mod dhat; pub mod display; pub mod downloadable_dataset; pub mod fineweb; @@ -63,7 +64,13 @@ pub use vortex::error::vortex_panic; use vortex::io::session::RuntimeSessionExt; use vortex::session::VortexSession; -// All benchmarks run with mimalloc for consistency. +// Memory benchmarks opt into dhat so peak allocation tracking can be collected in-process. +#[cfg(feature = "dhat-heap")] +#[global_allocator] +static GLOBAL: ::dhat::Alloc = ::dhat::Alloc; + +// All non-memory benchmarks run with mimalloc for consistency. +#[cfg(not(feature = "dhat-heap"))] #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; diff --git a/vortex-bench/src/measurements.rs b/vortex-bench/src/measurements.rs index f49349cd95e..8c5a8a2d3f0 100644 --- a/vortex-bench/src/measurements.rs +++ b/vortex-bench/src/measurements.rs @@ -381,6 +381,50 @@ impl ToTable for CompressionTimingMeasurement { } } +#[derive(Clone, Debug)] +pub struct CompressionMemoryMeasurement { + pub name: String, + pub format: Format, + pub value_mib: f64, +} + +impl ToJson for CompressionMemoryMeasurement { + fn to_json(&self) -> serde_json::Value { + let (name, engine) = match self.format { + Format::OnDiskVortex => (self.name.to_string(), Engine::Vortex), + Format::Parquet => (format!("parquet_rs-zstd {}", self.name), Engine::Arrow), + Format::Lance => (format!("lance {}", self.name), Engine::Arrow), + _ => vortex_panic!( + "CompressionMemoryMeasurement only supports vortex, lance, and parquet formats" + ), + }; + + serde_json::to_value(JsonValue { + name, + storage: None, + unit: Some(Cow::from("MiB")), + value: MeasurementValue::Float(self.value_mib), + time: None, + bytes: None, + commit_id: Cow::from(GIT_COMMIT_ID.as_str()), + target: Target::new(engine, self.format), + }) + .expect("value is valid JSON") + } +} + +impl ToTable for CompressionMemoryMeasurement { + fn to_table(&self) -> TableValue { + TableValue { + id: None, + name: self.name.clone(), + target: Target::new(Engine::default(), self.format), + unit: Cow::from("MiB"), + value: MeasurementValue::Float(self.value_mib), + } + } +} + #[derive(Clone, Debug)] pub struct CustomUnitMeasurement { pub name: String, @@ -425,6 +469,43 @@ impl ToTable for CustomUnitMeasurement { } } +#[derive(Clone, Debug)] +pub struct NamedMeasurement { + pub name: String, + pub target: Target, + pub unit: Cow<'static, str>, + pub value: f64, + pub storage: Option, +} + +impl ToJson for NamedMeasurement { + fn to_json(&self) -> serde_json::Value { + serde_json::to_value(JsonValue { + name: self.name.clone(), + storage: self.storage.clone(), + unit: Some(self.unit.clone()), + value: MeasurementValue::Float(self.value), + time: None, + bytes: None, + commit_id: Cow::from(GIT_COMMIT_ID.as_str()), + target: self.target, + }) + .expect("value is valid JSON") + } +} + +impl ToTable for NamedMeasurement { + fn to_table(&self) -> TableValue { + TableValue { + id: None, + name: self.name.clone(), + target: self.target, + unit: self.unit.clone(), + value: MeasurementValue::Float(self.value), + } + } +} + #[derive(Clone, Debug)] pub struct MemoryMeasurement { pub query_idx: usize,