Error propagation with Activity triggered by Signal

Hi team,
I have a main workflow where the @workflow.run function executes Activity A and awaits its completion. For error handling, the ActivityError is caught and re-raised, which fails the workflow as expected.

I also have a @workflow.signal function that can be triggered and starts Activity B (this signal does not wait for Activity B to complete). I expect a failure of Activity B to also fail the workflow and stop Activity A in the main function from continuing, since both activities share the same task queue.

The problem is that when Activity B fails, neither the @workflow.run function nor the @workflow.signal function catches its error and fails the workflow; instead, Activity A continues to execute. Is there a way to get the result I want?

You don’t have to catch it; the workflow will fail on its own if the activity error bubbles out of the workflow function.

Can you show how you’re calling this? If you don’t wait for the activity to complete, its failure has nothing to bubble out of. Consider having the signal handler set an attribute on the class, having the primary workflow function call wait_condition until that attribute is set, running the activity in the primary workflow function, and letting the failure raise out of the workflow function so that it fails the workflow.

Thank you Chad.

Here is my use case for signaling: I’m trying to use it to pause and resume an already running workflow, but I noticed that since I don’t wait for the pausing Activity, its failure does not propagate to the workflow.
There are three endpoints on the application side, and here is the high-level logic:

```python
@router.post("/runWorkflow")
async def run_workflow(data):
    pass

@router.post("/pauseWorkflow")
async def pause_workflow(data):
    workflow_handle = get_workflow_handle_from_workflow_id(data)
    await workflow_handle.signal(MyWorkflow.pause, args=[...])
    # This endpoint returns a "success" code as soon as the signal is sent;
    # it does not wait for the whole length of the pause.
    # But if the Activity for the pausing logic fails, I expect the workflow
    # execution to fail instead of continuing with the main workflow logic.

@router.post("/resumeWorkflow")
async def resume_workflow(data):
    workflow_handle = get_workflow_handle_from_workflow_id(data)
    await workflow_handle.signal(MyWorkflow.resume, args=[...])
```

Note that “pause” and “resume” can be called at any time during the main workflow logic.

And here is the general structure of my Workflow and Activity classes:

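```python
import asyncio

from temporalio import activity, workflow
from temporalio.exceptions import ActivityError, CancelledError


@workflow.defn
class MyWorkflow:
    def __init__(self):
        self.activity_handle_pause = None

    @workflow.run
    async def run(self, data):
        # main workflow logic
        # since it is uncertain when pause/resume happens, it's hard to add "wait_condition" in the main logic
        ...

    @workflow.signal
    async def pause(self, data):
        # start_activity_method only schedules the activity and returns a handle;
        # a later failure of MyActivity.pause is not raised here, so there is
        # nothing to catch at this point
        self.activity_handle_pause = workflow.start_activity_method(MyActivity.pause, ...)

    @workflow.signal
    async def resume(self, data):
        if self.activity_handle_pause is None:
            return
        self.activity_handle_pause.cancel()
        try:
            await self.activity_handle_pause
        except ActivityError as err:
            # a cancelled activity surfaces as an ActivityError caused by CancelledError;
            # any other failure (e.g. the pause logic itself failed) is re-raised
            if not isinstance(err.cause, CancelledError):
                raise
        finally:
            self.activity_handle_pause = None


class MyActivity:
    @activity.defn
    async def pause(self, data):
        try:
            # some business logic
            while True:
                await asyncio.sleep(1)
                # Temporal delivers cancellation to an activity through its heartbeats
                activity.heartbeat("paused")
        except asyncio.CancelledError:
            # re-raise so the activity is reported as cancelled
            raise
```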

Do you have a recommendation for how I can achieve this with proper error handling?

You don’t need an activity to “pause” a workflow. Timers are a much simpler option.

Pausing your signal handlers will not pause the main workflow method, so the sample will not achieve what you want.

Is Timer a Temporal feature? Are there any samples showing how to use timers?
My current approach is to have the signaled activity occupy the task queue so that the main workflow’s activities cannot proceed.

Inside a workflow, asyncio.sleep is backed by a durable Temporal timer, so it can block the workflow for any amount of time.