# feat(checkpoint): distributed checkpoint filter via optimizer rule (DF-1779)
Replaces the temporary in-process HashSet checkpoint filter with a
distributed anti-join using the existing KeyFilteringJoin infrastructure,
and wires checkpoint tracking end-to-end from the Python API through the
optimizer to the S3-backed store.
## How it works
When a user writes `daft.read_parquet(path, checkpoint=store, on="key")`,
the optimizer rewrites the Source node into:
```
Sink
└─ [user ops: Filter / Project / UDF]
   └─ StageCheckpointKeys
      └─ Join(Anti, KeyFiltering) {
           left:  Source { checkpoint cleared }
           right: Source(BlobStoreCheckpointedKeysScanOperator)
         }
```
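The effect of the anti-join can be sketched in plain Python (the helper name and row representation here are illustrative, not Daft's actual operators): the right side supplies already-checkpointed keys, and the anti-join keeps only left-side rows whose key is absent from that set.

```python
def anti_join_filter(source_rows, checkpointed_keys, on):
    """Model of Join(Anti, KeyFiltering): keep only rows whose `on`
    value has not been checkpointed yet."""
    seen = set(checkpointed_keys)
    return [row for row in source_rows if row[on] not in seen]

rows = [{"key": 1, "v": "a"}, {"key": 2, "v": "b"}, {"key": 3, "v": "c"}]
# Keys 1 and 3 were checkpointed by a previous run; only key 2 survives.
remaining = anti_join_filter(rows, checkpointed_keys=[1, 3], on="key")
assert remaining == [{"key": 2, "v": "b"}]
```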
The right side lazily enumerates sealed key parquet files from the
checkpoint store at scan-task time, so no I/O happens at plan-build or
optimizer time. Already-processed rows are filtered out before they reach
the StageCheckpointKeys node or the write sink.
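The lazy-enumeration behavior can be illustrated with a toy store (class and method names are illustrative stand-ins for the S3-backed store, not Daft's API): the scan task captures the store at plan-build time but defers listing sealed key files until it actually runs.

```python
class ToyCheckpointStore:
    """Stand-in for the blob-store-backed checkpoint store; counts
    how many times sealed key files are listed."""

    def __init__(self, sealed_files):
        self._sealed_files = sealed_files
        self.list_calls = 0

    def list_sealed_key_files(self):
        self.list_calls += 1
        return list(self._sealed_files)


def build_scan_task(store):
    """Plan-build time: capture the store, but perform no I/O yet."""
    def scan():
        # Scan-task time: only now are sealed key files enumerated.
        return store.list_sealed_key_files()
    return scan


store = ToyCheckpointStore(["keys-0001.parquet", "keys-0002.parquet"])
task = build_scan_task(store)
assert store.list_calls == 0   # nothing listed at plan-build time
files = task()
assert store.list_calls == 1   # listing happened at scan-task time
assert files == ["keys-0001.parquet", "keys-0002.parquet"]
```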
## Key changes
- Split `common/checkpoint-config` off daft-checkpoint so the logical
plan and execution sides share types without the full store dep.
- `CheckpointConfig` on Source node, threaded through the optimizer and
physical-plan translator.
- `LogicalPlan::StageCheckpointKeys` variant + matching LocalPhysicalPlan
and distributed pipeline node.
- `RewriteCheckpointSource` optimizer rule with map-only validation
(rejects shuffles, Limit, Offset, Sample downstream of a checkpointed
source, with worked examples in the denylist comments).
- `CheckpointTerminusNode` for sink-less plans (DF-1920).
- `BlobStoreCheckpointedKeysScanOperator` — scan operator for sealed key
files.
- Pushdown guards on `PushDownFilter` / `PushDownProjection` so rows
  aren't dropped at the Parquet scan before SCKO can stage them.
- `CheckpointIdMap` for per-input `CheckpointId` in shared Flotilla
pipelines (fixes "already sealed" error under multi-partition input).
- Delete `InMemoryCheckpointStore` (no use case under Flotilla's
cross-process model); extract checkpoint-store contract tests into a
`generate_checkpoint_store_tests!` macro.
- Python API: `CheckpointStore` class, `read_parquet(checkpoint=, on=)`
kwargs, native-runner gate with remediation guidance.
- Terminology: rename "seal"/"sealed" → "checkpoint"/"checkpointed" in
docstrings, Rust comments, and test names.
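The map-only validation in `RewriteCheckpointSource` can be approximated as a denylist walk over the ops between a checkpointed source and the sink (op names and the exact denylist contents here are illustrative; the real rule lives in the Rust optimizer). Ops that reorder, drop rows non-deterministically, or move rows across partitions break the correspondence between source rows and checkpoint keys, so the rewrite rejects them.

```python
# Illustrative denylist: shuffle-like ops plus Limit/Offset/Sample,
# per the rule's map-only restriction.
DENYLIST = {"Repartition", "Sort", "Limit", "Offset", "Sample"}


def validate_map_only(ops_downstream_of_source):
    """Raise ValueError if any downstream op is not a per-row map."""
    for op in ops_downstream_of_source:
        if op in DENYLIST:
            raise ValueError(
                f"checkpointed source cannot feed a {op}: rows must "
                "map deterministically onto checkpoint keys"
            )


# Per-row ops (Filter / Project / UDF) pass validation.
validate_map_only(["Filter", "Project", "UDF"])

# A Limit downstream of the checkpointed source is rejected.
try:
    validate_map_only(["Project", "Limit"])
    rejected = False
except ValueError:
    rejected = True
assert rejected
```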
## Follow-ups
- DF-1915: KFJ actor config (num_workers, cpus) is hardcoded
- DF-1916: right-side key column name should be canonical, not
source-matching
- DF-1917: idempotent catalog commits for Iceberg/Delta (consume
`get_checkpointed_files`)
- DF-1918: empty-source xfail — store needs to track in-flight writes
- DF-1921: multi-source (concat/union) checkpointing