feat(checkpoint): distributed checkpoint filter via optimizer rule (DF-1779)
Replaces the temporary in-process HashSet checkpoint filter with a
distributed anti-join using the existing KeyFilteringJoin infrastructure,
and wires checkpoint tracking end-to-end from Python API through the
optimizer to the S3-backed store.
## How it works
When a user writes `daft.read_parquet(path, checkpoint=store, on="key")`,
the optimizer rewrites the Source node into:
```
Sink
└─ [user ops: Filter / Project / UDF]
   └─ StageCheckpointKeys
      └─ Join(Anti, KeyFiltering) {
           left:  Source { checkpoint cleared }
           right: Source(BlobStoreCheckpointedKeysScanOperator)
         }
```
The right side lazily enumerates sealed key parquet files from the
checkpoint store at scan-task time — no I/O at plan-build or optimizer
time. Already-processed rows are filtered out before reaching the
StageCheckpointKeys node or the write sink.
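The anti-join step can be sketched in a few lines. This is a minimal model, not the real operator: `KeyFilteringJoin` streams columnar batches from the checkpoint store, while this sketch uses plain tuples and an in-memory key set.

```rust
use std::collections::HashSet;

/// Hypothetical row type: (key, payload). The real operator works on
/// columnar record batches, not tuples.
type Row = (String, i64);

/// Anti-join: keep only left-side rows whose key does NOT appear on the
/// right. This is how already-checkpointed keys are filtered out of the
/// source before they reach StageCheckpointKeys or the sink.
fn anti_join(left: Vec<Row>, checkpointed_keys: &HashSet<String>) -> Vec<Row> {
    left.into_iter()
        .filter(|(key, _)| !checkpointed_keys.contains(key))
        .collect()
}

fn main() {
    let checkpointed: HashSet<String> =
        ["a".to_string(), "b".to_string()].into_iter().collect();
    let rows = vec![("a".to_string(), 1), ("c".to_string(), 3)];
    let remaining = anti_join(rows, &checkpointed);
    // Only the row with key "c" survives; "a" was already checkpointed.
    assert_eq!(remaining, vec![("c".to_string(), 3)]);
    println!("{:?}", remaining);
}
```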
## Key changes
- Split `common/checkpoint-config` off daft-checkpoint so the logical
plan and execution sides share types without the full store dep.
- `CheckpointConfig` on Source node, threaded through the optimizer and
physical-plan translator.
- `LogicalPlan::StageCheckpointKeys` variant + matching LocalPhysicalPlan
and distributed pipeline node.
- `RewriteCheckpointSource` optimizer rule with map-only validation
(rejects shuffles, Limit, Offset, Sample downstream of a checkpointed
source, with worked examples in the denylist comments).
- `CheckpointTerminusNode` for sink-less plans (DF-1920).
- `BlobStoreCheckpointedKeysScanOperator` — scan operator for sealed key
files.
- Pushdown guards on `PushDownFilter` / `PushDownProjection` so rows
  don't drop at the Parquet scan before `StageCheckpointKeys` can stage
  them.
- `CheckpointIdMap` for per-input `CheckpointId` in shared Flotilla
pipelines (fixes "already sealed" error under multi-partition input).
- Delete `InMemoryCheckpointStore` (no use case under Flotilla's
cross-process model); extract checkpoint-store contract tests into a
`generate_checkpoint_store_tests!` macro.
- Python API: `CheckpointStore` class, `read_parquet(checkpoint=, on=)`
kwargs, native-runner gate with remediation guidance.
- Terminology: rename "seal"/"sealed" → "checkpoint"/"checkpointed" in
docstrings, Rust comments, and test names.
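The contract-test macro mentioned above follows a common Rust pattern: generate the same suite of assertions against any store constructor. The trait, method names, and `MockStore` below are illustrative, not Daft's actual `CheckpointStore` API; the real macro presumably emits `#[test]` items, while this sketch emits a plain function so the example runs standalone.

```rust
use std::collections::HashSet;

/// Illustrative store contract (names are assumptions, not Daft's API):
/// mark a key as checkpointed and query whether it has been.
trait CheckpointStore {
    fn checkpoint(&mut self, key: &str);
    fn is_checkpointed(&self, key: &str) -> bool;
}

/// Generates one contract-test function per backend, so every store
/// implementation is held to identical behavior.
macro_rules! generate_checkpoint_store_tests {
    ($fn_name:ident, $make_store:expr) => {
        fn $fn_name() {
            let mut store = $make_store;
            assert!(!store.is_checkpointed("k1"), "fresh store must be empty");
            store.checkpoint("k1");
            assert!(store.is_checkpointed("k1"), "checkpointed key visible");
            assert!(!store.is_checkpointed("k2"), "other keys unaffected");
        }
    };
}

/// Trivial in-memory store used only to exercise the macro.
#[derive(Default)]
struct MockStore(HashSet<String>);

impl CheckpointStore for MockStore {
    fn checkpoint(&mut self, key: &str) {
        self.0.insert(key.to_string());
    }
    fn is_checkpointed(&self, key: &str) -> bool {
        self.0.contains(key)
    }
}

generate_checkpoint_store_tests!(mock_store_contract, MockStore::default());

fn main() {
    mock_store_contract();
    println!("contract tests passed");
}
```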
## Follow-ups
- DF-1915: KeyFilteringJoin actor config (num_workers, cpus) is hardcoded
- DF-1916: right-side key column name should be canonical, not
source-matching
- DF-1917: idempotent catalog commits for Iceberg/Delta (consume
`get_checkpointed_files`)
- DF-1918: empty-source xfail — store needs to track in-flight writes
- DF-1921: multi-source (concat/union) checkpointing

fix(checkpoint): preserve checkpoint field through pushdown optimizations
Pushdown rules (filter, projection, limit, aggregation, shard) all create
a new Source node when pushing a predicate, columns, limit, or shard into
the scan. They used `Source::new()`, which resets every field, including
the new `checkpoint: Option<CheckpointConfig>`, silently dropping it.
Any pipeline with an intermediate filter, projection, limit, etc. between
a checkpoint-enabled source and the sink would therefore lose the
checkpoint config before RewriteCheckpointSource could see it. No
anti-join, no stage_keys, no sealing — re-runs reprocess everything.
Add `Source::with_source_info(new_info)` that preserves every other
metadata field (output schema, plan/node ids, stats state, checkpoint).
Update all pushdown sites to use it instead of `Source::new`.
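The before/after behavior can be modeled with a toy `Source` struct (the fields are stand-ins; the real node carries scan info, plan/node ids, and stats state): `new()` resets everything, while `with_source_info` uses struct-update syntax to preserve all other metadata.

```rust
/// Simplified Source node. `String` stands in for the scan info and
/// `Option<String>` for `Option<CheckpointConfig>`.
#[derive(Clone, Debug, PartialEq)]
struct Source {
    source_info: String,
    output_schema: Vec<String>,
    checkpoint: Option<String>,
}

impl Source {
    /// The constructor pushdown rules used before the fix: every field
    /// not passed in is reset, so `checkpoint` is silently dropped.
    fn new(source_info: String, output_schema: Vec<String>) -> Self {
        Source { source_info, output_schema, checkpoint: None }
    }

    /// The fix: swap only the source info; struct-update syntax copies
    /// every other field from the existing node.
    fn with_source_info(&self, new_info: String) -> Self {
        Source { source_info: new_info, ..self.clone() }
    }
}

fn main() {
    let src = Source {
        source_info: "scan parquet".to_string(),
        output_schema: vec!["key".to_string(), "value".to_string()],
        checkpoint: Some("s3://bucket/ckpt".to_string()),
    };

    // Old path: pushing a filter into the scan loses the checkpoint.
    let old = Source::new("scan + filter".to_string(), src.output_schema.clone());
    assert_eq!(old.checkpoint, None);

    // New path: the checkpoint config survives the pushdown.
    let new = src.with_source_info("scan + filter".to_string());
    assert_eq!(new.checkpoint, Some("s3://bucket/ckpt".to_string()));
}
```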
Integration test: `test_checkpoint_s3_filter_between_source_and_sink`
verifies that rows filtered out between source and sink still have their
source keys sealed — an expensive filter is never re-evaluated on re-run
(matches the source-oriented design intent from discussion #6446).