PrefectHQ
prefect

Mitigate memory leak in in-memory messaging system

#19136 (Merged)
Comparing fix-memory-leak-in-messaging-subscriptions (162de5e) with main (cfac1c6)
CodSpeed Performance Gauge: 0% (Untouched: 2, Ignored: 6)

Benchmarks

Passed

| Benchmark | File | Change | Timings |
| --- | --- | --- | --- |
| bench_task_decorator | benches/bench_tasks.py | 0% | 458.6 µs, 457.7 µs |
| bench_import_prefect_flow | benches/bench_import.py | 0% | 1.4 s, 1.4 s |

Ignored

| Benchmark | File | Change | Timings |
| --- | --- | --- | --- |
| bench_flow_call[options0] | benches/bench_flows.py | +5% | 170.8 ms, 163.2 ms |
| bench_import_prefect | benches/bench_import.py | 0% | 2.7 ms, 2.6 ms |
| bench_flow_decorator | benches/bench_flows.py | -3% | 4.1 ms, 4.3 ms |
| bench_task_submit | benches/bench_tasks.py | +6% | 5.9 ms, 5.6 ms |
| bench_flow_call[options1] | benches/bench_flows.py | 0% | 106 ms, 106 ms |
| bench_task_call | benches/bench_tasks.py | -7% | 157.1 ms, 169.7 ms |

Commits

Base: main (cfac1c6)
+0.08%
Fix memory leak in in-memory messaging system

closes #18605

This PR fixes a memory leak in the Prefect server when running without Redis for messaging.

<details>
<summary>Root Causes</summary>

1. **Orphaned Subscriptions**: Event services (EventPersister, ReactiveTriggers, EventLogger, Actions) created consumers with subscriptions but never cleaned them up on shutdown, leaving subscriptions and their unbounded queues in memory.
2. **Unbounded Queues**: Each subscription had two `asyncio.Queue` objects with no maxsize, allowing unlimited memory growth.
3. **No Cleanup Mechanism**: Consumers had no way to unsubscribe from topics when services stopped.

</details>

<details>
<summary>Changes Made</summary>

**Bounded Queues** (src/prefect/server/utilities/messaging/memory.py)
- Main queue: maxsize=10,000
- Retry queue: maxsize=1,000
- Messages dropped gracefully with warnings when queues are full

**Consumer Cleanup** (src/prefect/server/utilities/messaging/memory.py)
- Added cleanup() method to properly unsubscribe from topics
- Includes debug logging for observability

**Service Integration** (4 service files)
- EventPersister, ReactiveTriggers, EventLogger, and Actions now call cleanup() on shutdown
- Prevents subscription accumulation over time

**Test Coverage** (tests/server/utilities/test_messaging.py)
- 6 new tests covering bounded queues, cleanup, and memory leak prevention
- All 24 tests pass

</details>

**Benefits:**
- Fixes memory leak from orphaned subscriptions
- Prevents unbounded growth with queue limits
- Observable via warning logs when backpressure occurs
- Backward compatible with sensible defaults
- Can add configuration later if needed

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
508081a
2 days ago
by desertaxle
+0.11%
Merge branch 'main' into fix-memory-leak-in-messaging-subscriptions
162de5e
2 days ago
by desertaxle
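
The commit message for 508081a describes two mechanisms: bounded queues that drop messages with a warning when full, and a consumer `cleanup()` that unsubscribes from its topic. The sketch below illustrates both ideas in isolation. It is not the code in `src/prefect/server/utilities/messaging/memory.py`: the `Topic`, `Subscription`, and `Consumer` classes, the `deliver` method, and the `dropped` counter are hypothetical names chosen for this example. Only the queue sizes (10,000 and 1,000) and the `cleanup()` method name come from the commit message.

```python
import asyncio
import logging

logger = logging.getLogger("memory_messaging_sketch")


class Subscription:
    """Hypothetical subscription with a bounded main queue and retry queue."""

    def __init__(self, topic, maxsize=10_000, retry_maxsize=1_000):
        self.topic = topic
        # Bounded queues: put_nowait raises QueueFull instead of growing forever.
        self.queue = asyncio.Queue(maxsize=maxsize)
        self.retry_queue = asyncio.Queue(maxsize=retry_maxsize)
        self.dropped = 0  # illustrative counter, not taken from the PR

    def deliver(self, message):
        """Enqueue a message; drop it with a warning if the queue is full."""
        try:
            self.queue.put_nowait(message)
            return True
        except asyncio.QueueFull:
            self.dropped += 1
            logger.warning("Queue full on topic %r, dropping message", self.topic.name)
            return False


class Topic:
    """Hypothetical in-memory topic that fans messages out to subscriptions."""

    def __init__(self, name):
        self.name = name
        self.subscriptions = set()

    def subscribe(self, maxsize=10_000):
        subscription = Subscription(self, maxsize=maxsize)
        self.subscriptions.add(subscription)
        return subscription

    def unsubscribe(self, subscription):
        # discard() is a no-op if the subscription was already removed.
        self.subscriptions.discard(subscription)

    def publish(self, message):
        """Return the number of subscriptions that accepted the message."""
        return sum(sub.deliver(message) for sub in self.subscriptions)


class Consumer:
    """Hypothetical consumer that owns one subscription and can release it."""

    def __init__(self, topic, maxsize=10_000):
        self.topic = topic
        self.subscription = topic.subscribe(maxsize=maxsize)

    async def cleanup(self):
        """Unsubscribe so the topic no longer references this consumer's queues."""
        if self.subscription is not None:
            self.topic.unsubscribe(self.subscription)
            self.subscription = None
            logger.debug("Unsubscribed consumer from topic %r", self.topic.name)


async def demo():
    topic = Topic("events")
    consumer = Consumer(topic, maxsize=2)  # tiny limit to force a drop
    delivered = [topic.publish(f"event-{i}") for i in range(3)]
    dropped = consumer.subscription.dropped
    await consumer.cleanup()  # what a service would call on shutdown
    return delivered, dropped, len(topic.subscriptions)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With the queue limit set to 2, the third publish is dropped rather than queued, and after `cleanup()` the topic holds no subscriptions, so nothing keeps the consumer's queues alive once the service stops.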