Commits
Click on a commit to change the comparison rangeFix memory leak in in-memory messaging system
closes #18605
This PR fixes a memory leak in the Prefect server when running without Redis for messaging.
<details>
<summary>Root Causes</summary>
1. **Orphaned Subscriptions**: Event services (EventPersister, ReactiveTriggers, EventLogger, Actions) created consumers with subscriptions but never cleaned them up on shutdown, leaving subscriptions and their unbounded queues in memory.
2. **Unbounded Queues**: Each subscription had two `asyncio.Queue` objects with no maxsize, allowing unlimited memory growth.
3. **No Cleanup Mechanism**: Consumers had no way to unsubscribe from topics when services stopped.
</details>
<details>
<summary>Changes Made</summary>
**Bounded Queues** (src/prefect/server/utilities/messaging/memory.py)
- Main queue: maxsize=10,000
- Retry queue: maxsize=1,000
- Messages dropped gracefully with warnings when queues are full
**Consumer Cleanup** (src/prefect/server/utilities/messaging/memory.py)
- Added cleanup() method to properly unsubscribe from topics
- Includes debug logging for observability
**Service Integration** (4 service files)
- EventPersister, ReactiveTriggers, EventLogger, and Actions now call cleanup() on shutdown
- Prevents subscription accumulation over time
**Test Coverage** (tests/server/utilities/test_messaging.py)
- 6 new tests covering bounded queues, cleanup, and memory leak prevention
- All 24 tests pass
</details>
**Benefits:**
- Fixes memory leak from orphaned subscriptions
- Prevents unbounded growth with queue limits
- Observable via warning logs when backpressure occurs
- Backward compatible with sensible defaults
- Can add configuration later if needed
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com> Merge branch 'main' into fix-memory-leak-in-messaging-subscriptions