Alert merging workflow using signal-with-start - how to close atomically?

Hello everyone!

I am looking at implementing an alert-merging process in Temporal. This is how it works:

  • We have alerts and cases; alerts get merged into cases
  • Alerts have a “correlation hash” used to determine whether they should be merged into the same case or not
  • Alerts with the same hash get merged into the same case; if there is no open case with this hash, a new case is created
  • Alerts come from an external system

I thought of using signal-with-start for the merging strategy, with the “correlation hash” present in the workflow_id.
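
For illustration, something like this hypothetical helper (the hash value and the `case-` prefix are made up, and it uses the CaseWorkflow defined in the example below; the runnable example just hardcodes the ID):

from temporalio.client import Client

async def merge_alert_into_case(client: Client, correlation_hash: str, alert: str) -> None:
    # signal-with-start: signals the open case for this hash, or creates a new one
    await client.start_workflow(
        CaseWorkflow.run,
        id=f"case-{correlation_hash}",  # correlation hash embedded in the workflow ID
        task_queue="case-management-task-queue",
        start_signal="merge_alert",
        start_signal_args=[alert],
    )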

It works well unless my workflow has some remaining work to do when I want to close it.

See the code below for an example (can be run as-is):

import asyncio

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker

@workflow.defn
class CaseWorkflow:
    def __init__(self) -> None:
        self._pending_alerts: asyncio.Queue[str] = asyncio.Queue()
        self._exit = False

    @workflow.run
    async def run(self) -> list[str]:
        # Continually handle from queue or wait for exit to be received
        result: list[str] = []

        # do something that takes time (uncomment the next line to reproduce the issue described below)
        # await asyncio.sleep(5)

        # main loop
        while True:
            # Wait for queue item or exit
            await workflow.wait_condition(
                lambda: not self._pending_alerts.empty() or self._exit
            )

            # Drain and process queue
            while not self._pending_alerts.empty():
                result.append(f"Got alert {self._pending_alerts.get_nowait()}")

            # Exit if complete
            if self._exit:
                return result

    @workflow.signal
    async def merge_alert(self, alert: str) -> None:
        await self._pending_alerts.put(alert)


    @workflow.update
    async def close(self) -> None:
        self._exit = True



async def main():
    # Start client
    client = await Client.connect("localhost:7233")

    # Run a worker for the workflow
    async with Worker(
        client,
        task_queue="case-management-task-queue",
        workflows=[CaseWorkflow],
    ):

        # signal-with-start: no workflow with this ID is running yet, so a new one is started
        await client.start_workflow(
            CaseWorkflow.run,
            id="case_workflow_id_1",
            task_queue="case-management-task-queue",
            start_signal="merge_alert",
            start_signal_args=["alert 1"],
        )

        # this only signals, because a workflow with this ID is already running
        await client.start_workflow(
            CaseWorkflow.run,
            id="case_workflow_id_1",
            task_queue="case-management-task-queue",
            start_signal="merge_alert",
            start_signal_args=["alert 2"],
        )
        
        # update to exit workflow
        handle = client.get_workflow_handle_for(CaseWorkflow.run, "case_workflow_id_1")
        run_id = (await handle.describe()).run_id
        print(f'closing. run_id: {run_id}')
        await handle.execute_update(
            CaseWorkflow.close
        )

        # I wish this started a new run, but if the previous workflow run has not
        # finished yet, it just signals the existing run instead
        await client.start_workflow(
            CaseWorkflow.run,
            id="case_workflow_id_1",
            task_queue="case-management-task-queue",
            start_signal="merge_alert",
            start_signal_args=["alert 3"],
        )

        # close again, printing the run id so we can see which run received alert 3
        handle = client.get_workflow_handle_for(CaseWorkflow.run, "case_workflow_id_1")
        run_id = (await handle.describe()).run_id
        print(f'closing. run_id: {run_id}')
        await handle.execute_update(
            CaseWorkflow.close
        )



if __name__ == "__main__":
    asyncio.run(main())

I get the following output:

closing. run_id: 3381fd17-f34d-48ba-a661-7c4a10ab8124
closing. run_id: d43a960b-7cc4-449d-9be7-080dcec9b9fb

Alerts 1 & 2 get merged into the case, then the case is closed, and a new case is created with alert 3.

Now, if I uncomment the # await asyncio.sleep(5) line, the first workflow run has not finished yet when I call the update to stop it, so the third alert gets merged into that run (which is not desirable).

I get the following output:

closing. run_id: 358d5b24-7382-4d87-93f8-b022d86da7d8
closing. run_id: 358d5b24-7382-4d87-93f8-b022d86da7d8

Is this solvable with Temporal features, or should I manage the state separately (maybe using search attributes)?

Thank you!

I would pass the remaining alerts in the queue to the next workflow using continue-as-new.
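
Here is a minimal sketch of what I mean, reusing your CaseWorkflow (the carry-over list and the optional run parameter are just illustrative, not something the SDK requires): close marks the case as closing, merge_alert diverts any alert that arrives after that point into a carry-over list, and run hands that list to the next run via continue-as-new. Because continue-as-new keeps the same workflow ID, later signal-with-start calls for that correlation hash keep targeting the new case.

import asyncio

from temporalio import workflow

@workflow.defn
class CaseWorkflow:
    def __init__(self) -> None:
        self._pending_alerts: asyncio.Queue[str] = asyncio.Queue()
        self._carry_over: list[str] = []
        self._exit = False

    @workflow.run
    async def run(self, carried_alerts: list[str] | None = None) -> list[str]:
        # Alerts handed over by the previous (closed) run belong to this new case
        for alert in carried_alerts or []:
            await self._pending_alerts.put(alert)

        result: list[str] = []
        while True:
            # Wait for queue item or exit
            await workflow.wait_condition(
                lambda: not self._pending_alerts.empty() or self._exit
            )

            # Drain and process the alerts that belong to this case
            while not self._pending_alerts.empty():
                result.append(f"Got alert {self._pending_alerts.get_nowait()}")

            if self._exit:
                if self._carry_over:
                    # Hand the late alerts to the next run; this raises and ends the current run
                    workflow.continue_as_new(self._carry_over)
                return result

    @workflow.signal
    async def merge_alert(self, alert: str) -> None:
        if self._exit:
            # The case is already closing: keep the alert for the next run
            self._carry_over.append(alert)
        else:
            await self._pending_alerts.put(alert)

    @workflow.update
    async def close(self) -> None:
        self._exit = True

With the asyncio.sleep(5) uncommented, alert 3 should now land in _carry_over while the first run is closing and end up in the follow-up run instead of the closed case.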
