Add workflow cancellation support for Redis queue mode#321
Add workflow cancellation support for Redis queue mode#321t0mdavid-m wants to merge 4 commits intomainfrom
Conversation
Check for job cancellation before each command execution in CommandExecutor. This allows workflows running in Redis queue mode to stop at the next command boundary when the user presses Stop, rather than requiring multiple presses. https://claude.ai/code/session_013VFNJ1Sznugpony3r2Mgxt
📝 WalkthroughWalkthroughThe Changes
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/workflow/CommandExecutor.py`:
- Around line 112-116: run_multiple_commands() currently swallows exceptions
raised by run_command() running in threads, so WorkflowCancelled may be ignored;
modify run_multiple_commands() to collect exceptions from each thread (e.g.,
append exception objects to a shared list or set via a thread-safe structure)
and also store per-task success flags, then after joining all threads check the
collected exceptions and if any contain a WorkflowCancelled instance re-raise
that exception (or raise the first WorkflowCancelled) before returning; ensure
run_command(), run_multiple_commands(), and any usage of _check_cancellation()
reflect that exceptions propagate out of the parallel join logic rather than
being masked by all(results).
src/workflow/CommandExecutor.py
Outdated
| WorkflowCancelled: If the workflow was cancelled by the user. | ||
| Exception: If the command execution results in any errors. | ||
| """ | ||
| # Check for cancellation before starting the command | ||
| self._check_cancellation() |
There was a problem hiding this comment.
Propagate WorkflowCancelled out of parallel execution
When run_command() raises inside run_multiple_commands() threads, the exception is swallowed and all(results) can still return True, so cancellations can be ignored. Consider capturing thread exceptions and re-raising after joins.
Suggested fix
- results = []
+ results = []
+ exceptions = []
lock = threading.Lock()
def run_and_track(cmd):
- success = self.run_command(cmd)
- with lock:
- results.append(success)
+ try:
+ success = self.run_command(cmd)
+ with lock:
+ results.append(success)
+ except Exception as e:
+ with lock:
+ exceptions.append(e)
@@
for thread in threads:
thread.join()
+ if exceptions:
+ for e in exceptions:
+ if isinstance(e, WorkflowCancelled):
+ raise e
+ raise exceptions[0]
+
- return all(results)
+ return len(results) == len(commands) and all(results)🤖 Prompt for AI Agents
In `@src/workflow/CommandExecutor.py` around lines 112 - 116,
run_multiple_commands() currently swallows exceptions raised by run_command()
running in threads, so WorkflowCancelled may be ignored; modify
run_multiple_commands() to collect exceptions from each thread (e.g., append
exception objects to a shared list or set via a thread-safe structure) and also
store per-task success flags, then after joining all threads check the collected
exceptions and if any contain a WorkflowCancelled instance re-raise that
exception (or raise the first WorkflowCancelled) before returning; ensure
run_command(), run_multiple_commands(), and any usage of _check_cancellation()
reflect that exceptions propagate out of the parallel join logic rather than
being masked by all(results).
This reverts commit 43800b8.
Don't clear the .job_id file immediately after calling cancel_job(). This allows get_workflow_status() to see the "canceled" state and correctly report the workflow as stopped. Previously, clearing the file caused a fallback to PID checking, which showed "running" if commands were still executing. https://claude.ai/code/session_013VFNJ1Sznugpony3r2Mgxt
This reverts commit d4effba.
Summary
This PR adds support for cancelling workflows that are running in Redis queue mode. Users can now stop long-running workflows between command executions, with proper cleanup and status reporting.
Key Changes
WorkflowCancelledexception: Custom exception to signal user-initiated workflow cancellationset_cancellation_check()method toCommandExecutorthat accepts a callable to check if a workflow should stop_check_cancellation()is called before each command execution to allow stopping workflows between commandsexecute_workflow(), set up a cancellation check that monitors the Redis job'sis_stoppedstatusWorkflowCancelledthat:cancelled: TrueflagImplementation Details
Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings.