Hello everyone!
I am looking at implementing an alert merging process in temporal. This is how it works :
- We have alerts and cases, alerts get merged in cases
 - alerts have a “correlation hash” used to determine if they should be merged in the same case or not
 - alerts with the same hash get merged in the same case. if there is no case open with this hash, a new case is created.
 - alerts come from an external system
 
I thought of using signal-with-start for the merging strategy, with the “correlation hash” present in the workflow_id.
It works well unless my workflow has some remaining work to do when I want to close it.
See the code below for an example (can be run as-is):
import asyncio
from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker
@workflow.defn
class CaseWorkflow:
    def __init__(self) -> None:
        self._pending_alerts: asyncio.Queue[str] = asyncio.Queue()
        self._exit = False
    @workflow.run
    async def run(self) -> list[str]:
        # Continually handle from queue or wait for exit to be received
        result: list[str] = []
        # do something that takes time
        # await asyncio.sleep(5)
        # main loop
        while True:
            # Wait for queue item or exit
            await workflow.wait_condition(
                lambda: not self._pending_alerts.empty() or self._exit
            )
            # Drain and process queue
            while not self._pending_alerts.empty():
                result.append(f"Got alert {self._pending_alerts.get_nowait()}")
            # Exit if complete
            if self._exit:
                return result
    @workflow.signal
    async def merge_alert(self, alert: str) -> None:
        await self._pending_alerts.put(alert)
    @workflow.update
    async def close(self) -> None:
        self._exit = True
async def main():
    # Start client
    client = await Client.connect("localhost:7233")
    # Run a worker for the workflow
    async with Worker(
        client,
        task_queue="case-management-task-queue",
        workflows=[CaseWorkflow],
    ):
        # signal-with-start starts a workflow
        await client.start_workflow(
            CaseWorkflow.run,
            id="case_workflow_id_1",
            task_queue="case-management-task-queue",
            start_signal="merge_alert",
            start_signal_args=["alert 1"],
        )
        # this signals because the workflow is already running
        await client.start_workflow(
            CaseWorkflow.run,
            id="case_workflow_id_1",
            task_queue="case-management-task-queue",
            start_signal="merge_alert",
            start_signal_args=["alert 2"],
        )
        
        # update to exit workflow
        handle = client.get_workflow_handle_for(CaseWorkflow.run, "case_workflow_id_1")
        run_id = (await handle.describe()).run_id
        print(f'closing. run_id: {run_id}')
        await handle.execute_update(
            CaseWorkflow.close
        )
        # I wish this started a new run, but if the previous workflow run did not finish, it will just
        await client.start_workflow(
            CaseWorkflow.run,
            id="case_workflow_id_1",
            task_queue="case-management-task-queue",
            start_signal="merge_alert",
            start_signal_args=["alert 3"],
        )
if __name__ == "__main__":
    asyncio.run(main())
I get the following output:
closing. run_id: 3381fd17-f34d-48ba-a661-7c4a10ab8124
closing. run_id: d43a960b-7cc4-449d-9be7-080dcec9b9fb
Alerts 1 & 2 get merged in case, then the case is closed, a new one is created with alert 3.
Now if I uncomment the line # await asyncio.sleep(5), the first workflow run is not done when I call the update to stop it. So the third alert is merged in this workflow (which is not desirable).
I get the following output:
closing. run_id: 358d5b24-7382-4d87-93f8-b022d86da7d8
closing. run_id: 358d5b24-7382-4d87-93f8-b022d86da7d8
Is this solvable with temporal features? or should I manage state separately (maybe using search attributes?)
Thank you!