fix(shuffle): concat recordbatches before repartition (#7064)
## Summary
Follows #7056 (merged).
The flight repartition sink buffers ~256MB of scan morsels, partitions
the buffer, and accumulates the partitioned output until finalize writes
it in one shot. `partition_by_*` partitions **each record batch of its
input separately**, so a B-batch flush buffer (dozens of morsels) yields
B fragments per output partition per flush. At high partition counts the
accumulated per-fragment overhead dwarfs the data: **~1.7GB of measured
map input reached 40–57GB resident** at 8192 partitions on TPC-H SF1000
(512 × ~2GB files), OOM-killing workers within the first minute of the
map phase.
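The fragment arithmetic behind the OOM can be written out as a worked example. This is a rough sketch. B = 32 is an assumed stand-in for "dozens of morsels", and the flush count assumes the ~1.7GB map input is drained in ~256MB flushes. Because finalize writes everything in one shot, fragments from every flush accumulate until the end of the task:

$$
\begin{aligned}
F_{\text{before}} &= B \cdot P = 32 \cdot 8192 = 262{,}144 \ \text{fragments per flush} \\
F_{\text{after}} &= P = 8192 \ \text{fragments per flush} \\
N_{\text{flush}} &\approx \frac{1.7\ \text{GB}}{256\ \text{MB}} \approx 7 \;\Rightarrow\; 7 \cdot 262{,}144 \approx 1.8 \times 10^{6} \ \text{fragments held until finalize}
\end{aligned}
$$

The measured resident memory is roughly 23.5× to 33.5× the input (40/1.7 and 57/1.7). Fusing the buffer divides the per-flush fragment count by B.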
Fix: fuse each flush buffer into a single record batch before
partitioning — one fragment per partition per flush instead of one per
buffered batch, with each input byte copied exactly once. As a side
effect, partitioning hashes one contiguous batch instead of dozens of
small ones, which is a sizeable map-phase win at scale. The write path
and on-disk layout are unchanged.
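The two strategies can be compared with a minimal pure-Python sketch. It assumes a record batch can be modeled as a list of integer keys and uses `key % num_partitions` as a stand-in for the real hash. The helper names (`partition_batch`, `partition_each_batch`, `concat_then_partition`) are hypothetical and are not the project's `partition_by_*` API. The sketch counts fragments and checks that both strategies route the same rows to each partition.

```python
from typing import List

Batch = List[int]  # hypothetical model: a record batch is a list of integer keys


def partition_batch(batch: Batch, num_partitions: int) -> List[Batch]:
    """Split one batch into num_partitions fragments (empty ones included).

    `key % num_partitions` stands in for the real hash function.
    """
    out: List[Batch] = [[] for _ in range(num_partitions)]
    for key in batch:
        out[key % num_partitions].append(key)
    return out


def partition_each_batch(buffer: List[Batch], num_partitions: int) -> List[List[Batch]]:
    """Old behavior: partition every buffered batch separately.

    Each output partition accumulates one fragment per buffered batch.
    """
    fragments: List[List[Batch]] = [[] for _ in range(num_partitions)]
    for batch in buffer:
        for p, frag in enumerate(partition_batch(batch, num_partitions)):
            fragments[p].append(frag)
    return fragments


def concat_then_partition(buffer: List[Batch], num_partitions: int) -> List[List[Batch]]:
    """Fixed behavior: fuse the flush buffer first, then partition once.

    Each row is copied once into the fused batch, and each output
    partition receives a single fragment per flush.
    """
    fused = [key for batch in buffer for key in batch]
    return [[frag] for frag in partition_batch(fused, num_partitions)]


if __name__ == "__main__":
    buffer = [list(range(i * 100, (i + 1) * 100)) for i in range(32)]  # 32 batches
    old = partition_each_batch(buffer, 8)
    new = concat_then_partition(buffer, 8)
    print(sum(len(f) for f in old), sum(len(f) for f in new))
```

With 32 buffered batches and 8 partitions, the per-batch path produces 32 × 8 fragments and the fused path produces 8. The rows in each partition are identical and in the same order, because the fused batch preserves batch order.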
One file, +13 lines.
## Benchmarks
TPC-H lineitem shuffle (hash repartition on orderkey + parquet write to
S3), 32 × r8g.2xlarge workers, lz4 IPC compression, flight shuffle. All
binaries sha256-verified on every node before each run; "without fix" =
current `main` shuffle code minus this commit.
| config | without fix | with fix |
|---|---|---|
| 1TB, 2048 partitions | 135.4s | **129.5s** |
| 1TB, 4096 partitions | 175–179s | **161.2s** |
| 1TB, 8192 partitions | **worker OOM** (map task killed at 57.5GB, ~33s into the map phase; reproduced) | **222.4s** |
| 10TB, 4096 partitions | 1083s (best prior reference) | **934.4s** |
| 10TB, 8192 partitions | 1457s (best prior reference) | **1154.8s** |
The speedup grows with partition count, as expected from the mechanism:
partitioning previously ran once per buffered batch (dozens per flush),
each pass building `num_partitions` outputs.
All outputs verified row-exact (1TB = 5,999,989,709 rows; 10TB =
59,999,994,267). Correctness additionally validated via materialize-once
fingerprinting (row counts, integer column sums, distinct keys,
per-group count fingerprints) at P ∈ {8, 512, 2048}.
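One reading of materialize-once fingerprinting is to compute order-independent aggregates over the input once, then compare every shuffled output against them. The sketch below is written under that assumption. Rows are modeled as hypothetical `(key, value)` integer pairs, and the `fingerprint` helper is illustrative rather than the harness actually used. It covers the four checks listed above: row count, integer column sum, distinct keys, and a digest of per-group counts.

```python
import hashlib
from collections import Counter
from typing import Dict, List, Tuple

Row = Tuple[int, int]  # hypothetical (key, value) row


def fingerprint(partitions: List[List[Row]]) -> Dict[str, object]:
    """Order-independent summary of rows spread across any number of partitions."""
    rows = [row for part in partitions for row in part]
    group_counts = Counter(key for key, _ in rows)
    # Per-group count fingerprint: hash of the sorted (key, count) pairs.
    digest = hashlib.sha256(repr(sorted(group_counts.items())).encode()).hexdigest()
    return {
        "row_count": len(rows),
        "value_sum": sum(value for _, value in rows),
        "distinct_keys": len(group_counts),
        "group_count_digest": digest,
    }


def hash_repartition(rows: List[Row], num_partitions: int) -> List[List[Row]]:
    """Stand-in shuffle: route each row by key modulo num_partitions."""
    parts: List[List[Row]] = [[] for _ in range(num_partitions)]
    for key, value in rows:
        parts[key % num_partitions].append((key, value))
    return parts


if __name__ == "__main__":
    rows = [(i % 97, i) for i in range(10_000)]
    expected = fingerprint([rows])  # materialize the input once
    for p in (8, 512, 2048):
        print(p, fingerprint(hash_repartition(rows, p)) == expected)
```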
Note: `partition_by_random` previously seeded each input batch as `seed + batch_idx`; with a single fused batch it always uses `seed`.
Distribution remains uniform; row→partition placement changes for random
repartitions.
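The seeding change can be illustrated with a small sketch. It assumes each row's partition is drawn sequentially from a PRNG seeded once per batch, and it uses Python's `random.Random` as a stand-in for whatever generator `partition_by_random` actually uses. The helper names are hypothetical. Under that assumption, rows in the first batch keep their placement because `seed + 0 == seed`. Later rows move, and every assignment stays uniform over `[0, num_partitions)`.

```python
import random
from typing import List


def assign_random(num_rows: int, num_partitions: int, seed: int) -> List[int]:
    """Draw one uniform partition id per row from a generator seeded with `seed`."""
    rng = random.Random(seed)
    return [rng.randrange(num_partitions) for _ in range(num_rows)]


def per_batch_assignment(batch_sizes: List[int], num_partitions: int, seed: int) -> List[int]:
    """Old behavior (as modeled here): each buffered batch is seeded with seed + batch_idx."""
    out: List[int] = []
    for batch_idx, n in enumerate(batch_sizes):
        out.extend(assign_random(n, num_partitions, seed + batch_idx))
    return out


def fused_assignment(batch_sizes: List[int], num_partitions: int, seed: int) -> List[int]:
    """New behavior (as modeled here): one fused batch, always seeded with seed."""
    return assign_random(sum(batch_sizes), num_partitions, seed)


if __name__ == "__main__":
    sizes = [100, 100, 100, 100]
    old = per_batch_assignment(sizes, 8, seed=42)
    new = fused_assignment(sizes, 8, seed=42)
    print(old[:100] == new[:100], old == new)
```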
## Follow-ups
1. Gather low-32 sequence-id collision fix (per-state counters can
collide across concurrent states in the server registry).
2. Compact `FlightMapOutput` map-task results (one summary object per
task instead of `num_partitions` pyo3 refs).
3. Optional stripe-spill hard memory bound for map inputs beyond a
per-task budget (this fix bounds memory to the measured input size;
spill would bound it regardless of input size).
🤖 Generated with [Claude Code](https://claude.com/claude-code)
---------
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>