feat(dashboard): per-task rows/bytes stats with task-topology markers (#6861)
## Summary
Per-task external I/O tracking (rows in/out, bytes in/out) in the
Flotilla Tasks sidebar, with correct attribution across fused operator
chains.
The naive aggregator that just summed `rows.in`/`rows.out` across every
snapshot in `TaskEndEvent.stats` double-counted internal data: e.g. for
a fused chain `Source(rows_out=100) -> Filter(rows_in=100, rows_out=50)
-> Project(rows_in=50, rows_out=50)` it would report `rows_in=150` and
`rows_out=200` instead of the correct 0 / 50 (only external traffic
counts). This PR tags each local plan node as `is_task_root` /
`is_task_leaf` on the driver before the plan ships to a worker, and uses
those markers to filter snapshots during aggregation.
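
The double-counting above can be reproduced with a minimal sketch. `OpSnapshot`, `naive_totals`, `external_totals`, and `fused_chain` are hypothetical names made up for illustration (the real code works on `NodeInfo` and `StatSnapshot`), and the sketch assumes the source exposes no `rows.in` key, which is why the strictly filtered input comes out as 0:

```rust
/// Simplified stand-in for one operator snapshot inside a task.
/// Hypothetical type: the real code pairs `NodeInfo` with `StatSnapshot`.
#[derive(Clone, Copy)]
struct OpSnapshot {
    rows_in: Option<u64>, // `None` when the operator exposes no `rows.in` key
    rows_out: u64,
    is_task_root: bool,
    is_task_leaf: bool,
}

/// Naive aggregation: sums every snapshot, so internal hand-offs are counted.
fn naive_totals(snaps: &[OpSnapshot]) -> (u64, u64) {
    let rows_in: u64 = snaps.iter().filter_map(|s| s.rows_in).sum();
    let rows_out: u64 = snaps.iter().map(|s| s.rows_out).sum();
    (rows_in, rows_out)
}

/// Marker-filtered aggregation: input from leaves only, output from the root only.
fn external_totals(snaps: &[OpSnapshot]) -> (u64, u64) {
    let rows_in: u64 = snaps
        .iter()
        .filter(|s| s.is_task_leaf)
        .filter_map(|s| s.rows_in)
        .sum();
    let rows_out: u64 = snaps
        .iter()
        .filter(|s| s.is_task_root)
        .map(|s| s.rows_out)
        .sum();
    (rows_in, rows_out)
}

/// The chain from the summary: Source -> Filter -> Project.
/// Source is the only leaf; Project emits the task's output, so it is the root.
fn fused_chain() -> [OpSnapshot; 3] {
    [
        OpSnapshot { rows_in: None, rows_out: 100, is_task_root: false, is_task_leaf: true },
        OpSnapshot { rows_in: Some(100), rows_out: 50, is_task_root: false, is_task_leaf: false },
        OpSnapshot { rows_in: Some(50), rows_out: 50, is_task_root: true, is_task_leaf: false },
    ]
}
```

Under these assumptions `naive_totals(&fused_chain())` gives `(150, 200)` and `external_totals(&fused_chain())` gives `(0, 50)`.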
<img width="970" height="703" alt="image"
src="https://github.com/user-attachments/assets/a198d214-7c9c-4786-ab88-99636448b426"
/>
## Backend changes
- **`LocalNodeContext::is_task_root` / `is_task_leaf`**
(`src/daft-local-plan/src/plan.rs`): two boolean tree-position markers,
plus `LocalPhysicalPlan::mark_task_topology()` that walks the plan tree
and tags the root and every leaf. Called from
`SwordfishTaskBuilder::build` so the markers ride along on the plan to
the worker.
- **`NodeInfo`** (`src/common/metrics/src/ops.rs`): new `is_task_root` /
`is_task_leaf` fields with `#[serde(default)]` for backward-compat with
old serialized snapshots. Set in `next_node_info` from the
`LocalNodeContext`.
- **`TaskExternalIo`** (`src/common/metrics/src/task_io.rs`, new):
shared helper with an `accumulate(&NodeInfo, &StatSnapshot)` method.
Sums `cpu_us` across every node, but takes `rows.in` / `bytes.in` from
leaves only and `rows.out` / `bytes.out` from the root only. Source-leaf
snapshots (`SourceSnapshot`) report `rows.out` / `bytes.read` instead of
`rows.in` / `bytes.in` because a source has no upstream operator; both
equal the task's external input, so we fall back to them when the leaf
doesn't expose an `*.in` key.
- **Aggregation sites** use the helper:
- `on_task_end` in `src/daft-context/src/subscribers/dashboard.rs`
(final task totals).
- `emit_per_task_stats_updates` in
`src/daft-local-execution/src/runtime_stats/mod.rs` (mid-flight per-tick
updates). Both sites go through the same helper, so changes to
attribution semantics happen in one place.
- **Dashboard server state** (`src/daft-dashboard/src/`):
- `TaskTotals`, `TaskInfo`, `TaskGroupSummary` gained `rows_in` /
`rows_out` / `bytes_in` / `bytes_out`.
- `update_task_stats` writes per-task totals; `end_task` adds the task's
total to the group sum on completion (same model as `cpu_us`).
- **Per-tick events** (`src/daft-context/src/subscribers/events.rs`):
`TaskStatsSnapshot` extended with the four I/O fields.
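
As a rough illustration of how the topology markers and the source-leaf fallback fit together, here is a self-contained sketch. It is not the code in this PR: `PlanNode`, `ExternalIo`, `op`, `collect`, and `scan_filter_limit` are hypothetical simplifications (the real types are `LocalPhysicalPlan`, `NodeInfo`, `StatSnapshot`, and `TaskExternalIo`), stats are stored directly on the plan node for brevity, and only rows are tracked.

```rust
/// Hypothetical, simplified plan node carrying its own stats.
#[derive(Debug, Clone)]
struct PlanNode {
    name: &'static str,
    children: Vec<PlanNode>,
    is_task_root: bool,
    is_task_leaf: bool,
    rows_in: Option<u64>, // `None` for sources, which expose no `rows.in`
    rows_out: u64,
}

/// Convenience constructor for an unmarked node.
fn op(name: &'static str, rows_in: Option<u64>, rows_out: u64, children: Vec<PlanNode>) -> PlanNode {
    PlanNode { name, children, is_task_root: false, is_task_leaf: false, rows_in, rows_out }
}

/// Returns a marked copy of the plan: the top node becomes the task root and
/// every childless node becomes a task leaf. The input plan is left unchanged.
fn mark_task_topology(plan: &PlanNode) -> PlanNode {
    fn mark(node: &PlanNode, is_root: bool) -> PlanNode {
        PlanNode {
            name: node.name,
            children: node.children.iter().map(|c| mark(c, false)).collect(),
            is_task_root: is_root,
            is_task_leaf: node.children.is_empty(),
            rows_in: node.rows_in,
            rows_out: node.rows_out,
        }
    }
    mark(plan, true)
}

/// External I/O totals for one task (rows only in this sketch).
#[derive(Debug, Default, PartialEq)]
struct ExternalIo {
    rows_in: u64,
    rows_out: u64,
}

impl ExternalIo {
    /// Leaves contribute input, the root contributes output.
    fn accumulate(&mut self, node: &PlanNode) {
        if node.is_task_leaf {
            // Source fallback: a leaf without `rows.in` reports what it read
            // as `rows.out`, which equals the task's external input.
            self.rows_in += node.rows_in.unwrap_or(node.rows_out);
        }
        if node.is_task_root {
            self.rows_out += node.rows_out;
        }
    }
}

/// Walks the marked plan and feeds every node to the accumulator.
fn collect(node: &PlanNode, io: &mut ExternalIo) {
    io.accumulate(node);
    for child in &node.children {
        collect(child, io);
    }
}

/// scan (1000 rows read) -> filter (200 kept) -> limit (10 emitted).
fn scan_filter_limit() -> PlanNode {
    let scan = op("scan", None, 1000, vec![]);
    let filter = op("filter", Some(1000), 200, vec![scan]);
    op("limit", Some(200), 10, vec![filter])
}
```

For the scan + filter + limit plan, marking and collecting yields `rows_in = 1000` (the scan's read count, via the fallback) and `rows_out = 10` (the limit's emit count), which is the outcome the end-to-end item in the test plan expects. A single-node task is both root and leaf, so it contributes to both totals.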
## Frontend changes (`src/daft-dashboard/frontend/src/app/query/`)
- `formatCount` helper in `stats-utils.tsx` (compact rows: `1.2K`,
`1.2M`).
- `TaskInfo` / `TaskGroupSummary` types extended (`types.ts`).
- `TaskTypeRow` extended with group totals (`tasks-grouping.ts`).
- `tasks-sidebar.tsx`:
- Running-tasks "top" section: added Rows in/out, Bytes in/out columns;
CPU still last.
- Group table: same four columns added before the CPU column.
- Expanded sub-rows: same four columns.
- Layout: bumped sidebar wrapper to `min-w-[940px] max-w-[1200px]` in
`page.tsx` so the expanded grid fits without internal scrolling on
typical screens (plan view yields width and scrolls itself instead).
Running-tasks animation wrapper switched from `overflow-hidden` to
`overflow-x-auto overflow-y-hidden` so horizontal overflow scrolls when
the sidebar is unusually narrow.
## Test plan
- [x] Unit tests for `mark_task_topology` covering single-node, linear
chain, multi-leaf (cross join), and "original plan unchanged": `cargo
test -p daft-local-plan --lib mark_task_topology` (4/4 pass).
- [x] Existing dashboard `task_store` tests updated for the new
`end_task` signature and new fields (18/18 pass).
- [x] `cargo check` clean across the affected crates with
`--all-features --all-targets`.
- [x] `cargo fmt --check` and the full `pre-commit run --all-files`
clean.
- [ ] End-to-end: run a Flotilla query with internal row reduction (scan
+ filter + limit) with `DAFT_TASK_EVENTS_ENABLED=true` and
`DAFT_DASHBOARD_URL=...`. Per-task `rows_out` should match the limit's
emit count; `rows_in` the scan's read count.
## Related
Implements DF-1992. Builds on #6783 (which removed an earlier broken
version of these stats) and the task-snapshot-events branch.
https://claude.ai/code/session_01RXTspChUo4qFRxfFjJZqjr
---------
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>