Skip to content

feat: parallelize parquet load by row group#24808

Open
iamlinjunhong wants to merge 9 commits into
matrixorigin:4.0-devfrom
iamlinjunhong:d4-24254
Open

feat: parallelize parquet load by row group#24808
iamlinjunhong wants to merge 9 commits into
matrixorigin:4.0-devfrom
iamlinjunhong:d4-24254

Conversation

@iamlinjunhong

Copy link
Copy Markdown
Contributor

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #24254

What this PR does / why we need it:

Plan Parquet LOAD around file and row-group fanout, carry shard metadata through ExternalScan, and keep S3 prefetch behavior bounded for sharded readers.

Harden unsupported option handling, add Parquet profile stats, and cover compile/runtime/BVT regressions for schema, conversion, rollback, and parallel-load paths.

@qodo-code-review

Copy link
Copy Markdown

Qodo reviews are paused for this user.

Troubleshooting steps vary by plan Learn more →

On a Teams plan?
Reviews resume once this user has a paid seat and their Git account is linked in Qodo.
Link Git account →

Using GitHub Enterprise Server, GitLab Self-Managed, or Bitbucket Data Center?
These require an Enterprise plan - Contact us
Contact us →

@matrix-meow matrix-meow added the size/XXL Denotes a PR that changes 2000+ lines label Jun 3, 2026
@iamlinjunhong

Copy link
Copy Markdown
Contributor Author

I found two substantive issues in the current patch.

  1. Empty parquet files can disable row-group fanout for the remaining non-empty files.
    The planner currently switches to row-group fanout only when footerStats.RowGroups > len(fileList). That means empty parquet files still count toward len(fileList) even though they contribute zero row groups. So a case like {empty.parquet, data.parquet(with 2 row groups)} falls back to whole-file fanout instead of splitting the non-empty file by row group, which defeats the new planning logic for a very realistic edge case.
  2. Small LOAD ... PARALLEL files now change empty numeric fields from the normal serial behavior to 0.
    ParallelLoadRequested is latched before the planner disables parallel load for small files. Later, the external path treats ParallelLoadRequested the same as actual parallel execution and turns on LoadEmptyNumericAsZero. So the same small CSV row with an empty numeric field can now be silently loaded as 0 just because the original statement asked for PARALLEL, even though the actual execution path is serial. That makes the result depend on the file-size threshold, which is a real semantic regression.

I think both of these should be fixed before approval.

fixed

iamlinjunhong and others added 6 commits June 23, 2026 14:34
Plan Parquet LOAD around file and row-group fanout, carry shard metadata through ExternalScan, and keep S3 prefetch behavior bounded for sharded readers.

Harden unsupported option handling, add Parquet profile stats, and cover compile/runtime/BVT regressions for schema, conversion, rollback, and parallel-load paths.
Add the missing DATE32 Parquet resource used by load_data_parquet.sql so the 4.0-dev BVT can load the cherry-picked date32-to-DATETIME case instead of failing on a missing file.
## What type of PR is this?

- [ ] API-change
- [x] BUG
- [ ] Improvement
- [ ] Documentation
- [ ] Feature
- [ ] Test and CI
- [ ] Code Refactoring

## Which issue(s) this PR fixes:

issue matrixorigin#24846

## What this PR does / why we need it:

This fixes LOAD external scan stats so large LOAD jobs keep
row/cardinality semantics for `Cost`, `Outcnt`, `TableCnt`, and
`BlockNum`, while preserving `Cost * Rowsize` as the input-size hint
used by external scan parallel sizing.

Previously the LOAD stats used input bytes as `Cost` with `Rowsize=1`
and forced `BlockNum=1` / `TableCnt=1`. That can make large CSV/TBL LOAD
choose `AP_ONECN` instead of the expected multi-CN AP path.

Tests:

```sh
CGO_CFLAGS="-I$(pwd)/cgo -I$(pwd)/thirdparties/install/include" go test ./pkg/sql/plan -count=1
```

(cherry picked from commit 4426ca7)

@XuPeng-SH XuPeng-SH left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I re-checked the latest head. The earlier blockers around row-group planning and the small-file ParallelLoadRequested regression look addressed, but there is still one remaining high-severity issue in the empty-file path.

In the new row-group fanout path, empty parquet files are effectively dropped from execution:

  • readLoadParquetRowGroupMetadata() only emits metadata rows for actual row groups
  • splitParquetRowGroupShards() only carries files into a shard when they own at least one row group
  • compileExternScanParquetRowGroupFanout() then executes only those sharded files

That means an empty parquet file matched by the load pattern never reaches newParquetHandler() once the plan has switched to row-group fanout.

But the empty-file compatibility check still lives in pkg/sql/colexec/external/parquet.go:63-81: that is where an empty parquet with the wrong column count is supposed to fail. So with the current fanout logic, a wildcard load that matches {empty_bad_schema.parquet, data.parquet(with row groups)} can silently skip the empty file and lose the existing schema/column-count validation instead of rejecting the load.

The current tests actually show this behavior clearly:

  • pkg/sql/compile/scope_test.go:885-950 asserts the fanout scopes contain only the non-empty file
  • pkg/sql/colexec/external/parquet_test.go:3055-3093 shows the mismatch is only caught when the empty file is actually opened by newParquetHandler()

Concrete suggestions

  1. Validate empty parquet files during footer enumeration before discarding them from row-group metadata.
  2. Or, carry matched empty files into at least one execution shard so newParquetHandler() still opens them and applies the existing empty-file checks.
  3. Add one regression test for the mixed case explicitly: load pattern matches both
    • an empty parquet with schema/column-count mismatch, and
    • a non-empty multi-row-group parquet,
      and the whole load must still fail.

Non-blocking follow-up only: I still think an end-to-end case that actually forces the row-group-parallel execution path on a threshold-crossing parquet file would be a useful extra proof after this semantic hole is fixed.

@iamlinjunhong

Copy link
Copy Markdown
Contributor Author

I re-checked the latest head. The earlier blockers around row-group planning and the small-file ParallelLoadRequested regression look addressed, but there is still one remaining high-severity issue in the empty-file path.

In the new row-group fanout path, empty parquet files are effectively dropped from execution:

  • readLoadParquetRowGroupMetadata() only emits metadata rows for actual row groups
  • splitParquetRowGroupShards() only carries files into a shard when they own at least one row group
  • compileExternScanParquetRowGroupFanout() then executes only those sharded files

That means an empty parquet file matched by the load pattern never reaches newParquetHandler() once the plan has switched to row-group fanout.

But the empty-file compatibility check still lives in pkg/sql/colexec/external/parquet.go:63-81: that is where an empty parquet with the wrong column count is supposed to fail. So with the current fanout logic, a wildcard load that matches {empty_bad_schema.parquet, data.parquet(with row groups)} can silently skip the empty file and lose the existing schema/column-count validation instead of rejecting the load.

The current tests actually show this behavior clearly:

  • pkg/sql/compile/scope_test.go:885-950 asserts the fanout scopes contain only the non-empty file
  • pkg/sql/colexec/external/parquet_test.go:3055-3093 shows the mismatch is only caught when the empty file is actually opened by newParquetHandler()

Concrete suggestions

  1. Validate empty parquet files during footer enumeration before discarding them from row-group metadata.

  2. Or, carry matched empty files into at least one execution shard so newParquetHandler() still opens them and applies the existing empty-file checks.

  3. Add one regression test for the mixed case explicitly: load pattern matches both

    • an empty parquet with schema/column-count mismatch, and
    • a non-empty multi-row-group parquet,
      and the whole load must still fail.

Non-blocking follow-up only: I still think an end-to-end case that actually forces the row-group-parallel execution path on a threshold-crossing parquet file would be a useful extra proof after this semantic hole is fixed.

fixed

@XuPeng-SH XuPeng-SH left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-checked the latest head. The previous blockers around row-group planning and the small-file ParallelLoadRequested regression are fixed, and the remaining empty-parquet semantic hole is now closed as well: empty parquet files are validated during footer enumeration before being dropped from row-group fanout, with a mixed empty+non-empty regression test to prove it. I do not see a remaining correctness issue that should keep this blocked.

@mergify

mergify Bot commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Queued — the merge queue status continues in this comment ↓.

@mergify

mergify Bot commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Merge Queue Status

  • Entered queue2026-06-24 10:21 UTC · Rule: release-4.0 · triggered by rule Automatic queue on approval for release-4.0
  • Checks timed out · in-place
  • 🚫 Left the queue2026-06-24 14:22 UTC · at ebd471cb81582c166fa362bcd046d3f037d65ec9

This pull request spent 4 hours 1 minute 7 seconds in the queue, with no time running CI.

Waiting for any of
  • check-neutral = Matrixone CI / SCA Test on Ubuntu/x86
  • check-skipped = Matrixone CI / SCA Test on Ubuntu/x86
  • check-success = Matrixone CI / SCA Test on Ubuntu/x86
All conditions
  • any of [🛡 GitHub branch protection]:
    • check-neutral = Matrixone CI / SCA Test on Ubuntu/x86
    • check-skipped = Matrixone CI / SCA Test on Ubuntu/x86
    • check-success = Matrixone CI / SCA Test on Ubuntu/x86
  • #approved-reviews-by >= 1 [🛡 GitHub branch protection]
  • #review-threads-unresolved = 0 [🛡 GitHub branch protection]
  • github-review-decision = APPROVED [🛡 GitHub branch protection]
  • any of [🛡 GitHub branch protection]:
    • check-success = Matrixone Compose CI / multi cn e2e bvt test docker compose(PESSIMISTIC)
    • check-neutral = Matrixone Compose CI / multi cn e2e bvt test docker compose(PESSIMISTIC)
    • check-skipped = Matrixone Compose CI / multi cn e2e bvt test docker compose(PESSIMISTIC)
  • any of [🛡 GitHub branch protection]:
    • check-success = Matrixone Standlone CI / Multi-CN e2e BVT Test on Linux/x64(LAUNCH, PROXY)
    • check-neutral = Matrixone Standlone CI / Multi-CN e2e BVT Test on Linux/x64(LAUNCH, PROXY)
    • check-skipped = Matrixone Standlone CI / Multi-CN e2e BVT Test on Linux/x64(LAUNCH, PROXY)
  • any of [🛡 GitHub branch protection]:
    • check-success = Matrixone Standlone CI / e2e BVT Test on Linux/x64(LAUNCH, PESSIMISTIC)
    • check-neutral = Matrixone Standlone CI / e2e BVT Test on Linux/x64(LAUNCH, PESSIMISTIC)
    • check-skipped = Matrixone Standlone CI / e2e BVT Test on Linux/x64(LAUNCH, PESSIMISTIC)
  • any of [🛡 GitHub branch protection]:
    • check-success = Matrixone Compose CI / multi cn e2e bvt test docker compose(Optimistic/PUSH)
    • check-neutral = Matrixone Compose CI / multi cn e2e bvt test docker compose(Optimistic/PUSH)
    • check-skipped = Matrixone Compose CI / multi cn e2e bvt test docker compose(Optimistic/PUSH)
  • any of [🛡 GitHub branch protection]:
    • check-success = Matrixone Standlone CI / e2e BVT Test on Linux/x64(LAUNCH,Optimistic)
    • check-neutral = Matrixone Standlone CI / e2e BVT Test on Linux/x64(LAUNCH,Optimistic)
    • check-skipped = Matrixone Standlone CI / e2e BVT Test on Linux/x64(LAUNCH,Optimistic)
  • any of [🛡 GitHub branch protection]:
    • check-success = Matrixone Upgrade CI / Compatibility Test With Target on Linux/x64(LAUNCH)
    • check-neutral = Matrixone Upgrade CI / Compatibility Test With Target on Linux/x64(LAUNCH)
    • check-skipped = Matrixone Upgrade CI / Compatibility Test With Target on Linux/x64(LAUNCH)
  • any of [🛡 GitHub branch protection]:
    • check-success = Matrixone Utils CI / Coverage
    • check-neutral = Matrixone Utils CI / Coverage
    • check-skipped = Matrixone Utils CI / Coverage
  • any of [🛡 GitHub branch protection]:
    • check-success = Matrixone CI / UT Test on Ubuntu/x86
    • check-neutral = Matrixone CI / UT Test on Ubuntu/x86
    • check-skipped = Matrixone CI / UT Test on Ubuntu/x86

Reason

The merge conditions cannot be satisfied: the checks did not pass within the checks timeout of 4 hours

Hint

You may have to fix your CI before adding the pull request to the queue again.

If you update this pull request, to fix the CI, it will automatically be requeued once the queue conditions match again.
If you think this was a flaky issue instead, you can requeue the pull request, without updating it, by posting a @mergifyio queue comment.

Tick the box to put this pull request back in the merge queue (same as @mergifyio queue).

  • Requeue this pull request

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size/XXL Denotes a PR that changes 2000+ lines

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants