fix: restore error handling in parallel execution and fix FileArray error object storage
This commit addresses two critical issues in the error handling system that were
causing failures in parallel execution with file-based storage backends.
## Issue 1: Missing `in_executor` parameter in error handling
The `in_executor` parameter was removed in commit 0d8e419b but was still being
used in the error handling logic. This parameter is crucial for distinguishing
between sequential (main process) and parallel (subprocess) execution contexts.
### Root Cause
- Error handling needs to know the execution context to wrap exceptions properly
- In sequential mode: exceptions are raised directly (in_executor=False)
- In parallel mode: exceptions are caught and wrapped as ErrorSnapshot (in_executor=True)
- The parameter was accidentally removed during refactoring
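The two contexts can be sketched with a minimal helper (hypothetical names; `ErrorSnapshot` here is a stand-in for pipefunc's real class, not its actual signature):

```python
# Minimal sketch of the two error-handling contexts.
# `ErrorSnapshot` is a stand-in for pipefunc's real class.
class ErrorSnapshot:
    def __init__(self, exception):
        self.exception = exception

def call_with_error_handling(func, *args, in_executor=False):
    """Raise directly in sequential mode; wrap in parallel mode."""
    try:
        return func(*args)
    except Exception as exc:
        if in_executor:
            # Subprocess: the exception cannot usefully propagate across
            # the process boundary, so capture it as a snapshot instead.
            return ErrorSnapshot(exc)
        raise  # Main process: fail fast.

def boom():
    raise ValueError("bad input")

result = call_with_error_handling(boom, in_executor=True)
assert isinstance(result, ErrorSnapshot)
```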
### Fix
Restored the conditional logic in `_run_iteration()`:
```python
in_executor = executor is not None
if error_handling == "raise":
    yield from func(*args, **kwargs, _return_mapspec_names=True, _in_executor=in_executor)
```
## Issue 2: FileArray not preserving ErrorSnapshot objects in parallel execution
FileArray storage was returning `np.ma.masked` instead of `PropagatedErrorSnapshot`
objects when errors occurred in parallel execution, causing test failures.
### Root Cause Analysis
1. **Storage Logic**: FileArray uses XOR logic to determine when to dump:
```python
if force_dump or (array.dump_in_subprocess ^ in_post_process):
```
2. **The Problem**:
- FileArray has `dump_in_subprocess=True`
- In subprocess: `True ^ False = True` → dumps correctly
- In main process: `True ^ True = False` → doesn't dump
- PropagatedErrorSnapshot created in main process wasn't being stored
3. **Execution Flow**:
- Sequential: ErrorSnapshot created and stored → PropagatedErrorSnapshot created and stored
   - Parallel: ErrorSnapshot created in subprocess → stored correctly;
     PropagatedErrorSnapshot created in main process → NOT stored due to XOR logic
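The gap is easiest to see from the predicate's truth table (plain Python, independent of pipefunc's internals):

```python
# Truth table for the dump predicate `dump_in_subprocess ^ in_post_process`
# with force_dump=False. FileArray sets dump_in_subprocess=True.
def should_dump(dump_in_subprocess, in_post_process, force_dump=False):
    return force_dump or (dump_in_subprocess ^ in_post_process)

# FileArray in a subprocess: dumps, as intended.
assert should_dump(dump_in_subprocess=True, in_post_process=False) is True
# FileArray in the main process (post-processing): does NOT dump --
# this is where PropagatedErrorSnapshot was silently dropped.
assert should_dump(dump_in_subprocess=True, in_post_process=True) is False
```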
### Fix
Modified `_update_array()` to always dump error objects regardless of process context:
```python
from pipefunc.exceptions import ErrorSnapshot, PropagatedErrorSnapshot
# Always dump error objects, or follow normal dump rules
is_error_object = isinstance(_output, (ErrorSnapshot, PropagatedErrorSnapshot))
if is_error_object or force_dump or (array.dump_in_subprocess ^ in_post_process):
# ... dump logic ...
```
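Extracted as a standalone predicate, the fixed decision looks like this (stand-in classes; the real logic lives inside `_update_array()`):

```python
# Stand-ins for pipefunc's error classes.
class ErrorSnapshot: ...
class PropagatedErrorSnapshot: ...

def should_dump(output, dump_in_subprocess, in_post_process, force_dump=False):
    # Error objects are always persisted; normal data keeps the XOR rule.
    is_error_object = isinstance(output, (ErrorSnapshot, PropagatedErrorSnapshot))
    return is_error_object or force_dump or (dump_in_subprocess ^ in_post_process)

# Main-process post-processing with FileArray (True ^ True == False):
assert should_dump(42, dump_in_subprocess=True, in_post_process=True) is False
# ...but an error object now bypasses the XOR rule and is dumped anyway:
assert should_dump(PropagatedErrorSnapshot(), dump_in_subprocess=True, in_post_process=True) is True
```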
## Testing and Validation
Created an extensive test suite to diagnose the issue:
- `test_file_array_error.py`: Verified FileArray can store/retrieve ErrorSnapshot
- `test_error_propagation.py`: Confirmed error propagation works with dict storage
- `test_error_propagation_trace.py`: Traced execution flow differences
- `test_debug_storage.py`: Compared parallel vs sequential execution
Key findings:
- Dict storage worked correctly (no serialization involved)
- FileArray failed only in parallel mode when storing PropagatedErrorSnapshot
- The issue was specific to the XOR dumping logic, not serialization
## Impact
This fix ensures:
1. Error handling works correctly in both sequential and parallel execution
2. All storage backends (dict, file_array, zarr) properly preserve error objects
3. Error propagation chain is maintained across pipeline stages
4. Tests in `test_error_handling_storage.py` now pass for all storage backends
## Technical Details
- ErrorSnapshot: Captures exception info with metadata (timestamp, user, machine, IP)
- PropagatedErrorSnapshot: Created when a function receives ErrorSnapshot as input
- FileArray: Uses cloudpickle for serialization, returns np.ma.masked for missing files
- The XOR logic `dump_in_subprocess ^ in_post_process` is preserved for normal data
  but bypassed for error objects to ensure they are always persisted
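The cloudpickle round-trip and the masked sentinel can be sketched as follows (the file name and `ErrorSnapshot` stand-in are illustrative, not FileArray's actual layout):

```python
import tempfile
from pathlib import Path

import cloudpickle
import numpy as np

class ErrorSnapshot:  # stand-in for pipefunc's class
    def __init__(self, exception):
        self.exception = exception

with tempfile.TemporaryDirectory() as tmp:
    # Persist an error object the way FileArray does: cloudpickle to a file.
    path = Path(tmp) / "output__0.pickle"
    snapshot = ErrorSnapshot(ValueError("boom"))
    with path.open("wb") as f:
        cloudpickle.dump(snapshot, f)
    with path.open("rb") as f:
        restored = cloudpickle.load(f)
    assert isinstance(restored, ErrorSnapshot)

    # A missing file maps to the masked sentinel instead of raising.
    missing = Path(tmp) / "output__1.pickle"
    assert not missing.exists()
    value = np.ma.masked  # FileArray's return value for a missing entry
```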