feat(flotilla): Distributed Limit Counter (#6942)
## Changes Made
Replaces the materializing two-phase local+global limit with a streaming
limit driven by a Ray-actor-backed global counter.
- **`LimitCounterActor`** (`daft/execution/ray_distributed_limit.py`)
holds `(remaining_skip, remaining_take)`. `claim(input_id, num_rows) ->
(skip, take, done)` is atomic; `start_task(input_id)` refunds a prior
attempt's claims so retries see consistent state.
`await_limit_completion()` resolves once the limit is fully claimed and
returns the input_ids that actually consumed budget. The actor is pinned
to the head node.
- **`DistributedLimitSink`**
(`src/daft-local-execution/src/streaming_sink/distributed_limit.rs`)
calls `claim(input_id, num_rows)` on the counter actor once per morsel,
and slices the morsel based on the returned `(skip, take, done)`.
- **`LimitNode`** (`src/daft-distributed/src/pipeline_node/limit.rs`)
creates the counter actor, and appends distributed_limit tasks to the
input tasks. It awaits the limit completion from the actor, and once
done, cancels the scheduling of any subsequent limit tasks. It knows
which input ids contributed to the limit and cancels only tasks whose
ids are not among them.
- **Scheduler** now filters cancelled tasks at `schedule_tasks` and
emits `TaskEvent::Cancelled` to avoid scheduling new limit tasks.
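
For reviewers, the claim/refund semantics described above can be sketched in plain Python. This is an illustrative model only, not the code in `ray_distributed_limit.py`: it leaves out Ray, async, and `await_limit_completion()`, and the names `LimitCounter`, `claims`, `contributing_inputs`, and `slice_morsel` are hypothetical. It also assumes that "consumed budget" means an input claimed any skip or take rows.

```python
from dataclasses import dataclass, field


@dataclass
class LimitCounter:
    """Plain-Python stand-in for the counter state (hypothetical, no Ray)."""

    remaining_skip: int
    remaining_take: int
    # input_id -> (skipped, taken) claimed so far by the current attempt.
    claims: dict = field(default_factory=dict)

    def start_task(self, input_id):
        # Refund whatever a previous attempt for this input had claimed,
        # so a retry starts from consistent state.
        skipped, taken = self.claims.pop(input_id, (0, 0))
        self.remaining_skip += skipped
        self.remaining_take += taken

    def claim(self, input_id, num_rows):
        # Spend the offset first, then the limit, from this morsel's rows.
        skip = min(self.remaining_skip, num_rows)
        take = min(self.remaining_take, num_rows - skip)
        self.remaining_skip -= skip
        self.remaining_take -= take
        prev_skip, prev_take = self.claims.get(input_id, (0, 0))
        self.claims[input_id] = (prev_skip + skip, prev_take + take)
        done = self.remaining_take == 0
        return skip, take, done

    def contributing_inputs(self):
        # Assumption: an input "consumed budget" if it claimed any rows.
        return {i for i, (s, t) in self.claims.items() if s > 0 or t > 0}


def slice_morsel(rows, skip, take):
    # What the sink does with a claim: drop `skip` rows, keep `take` rows.
    return rows[skip:skip + take]


counter = LimitCounter(remaining_skip=2, remaining_take=5)
skip, take, done = counter.claim("a", 4)
kept = slice_morsel(["r0", "r1", "r2", "r3"], skip, take)
```

In the real actor each call is serialized by Ray, which is what makes `claim` atomic; the sketch is single-threaded and relies on that being provided elsewhere.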
### Ordering note
Across-partition order is no longer preserved — workers race to claim.
`tests/integration/iceberg/test_partition_pruning.py` sorts before
limiting (matching the `_on_number` precedent already in that file); a
related dataframe test was loosened similarly.
## Related Issues
<!-- none -->
---------
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>