Hello everyone!
I am looking at implementing an alert-merging process in Temporal. This is how it works:
- We have alerts and cases; alerts get merged into cases.
- Alerts have a “correlation hash” used to determine whether they should be merged into the same case or not.
- Alerts with the same hash get merged into the same case. If there is no open case with this hash, a new case is created.
- Alerts come from an external system.
I thought of using signal-with-start for the merging strategy, with the “correlation hash” present in the workflow_id.
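In other words, the routing would look something like this (just a sketch; the `alert` object and its fields are placeholders):

```python
# Sketch of the routing idea: the correlation hash becomes part of the
# workflow ID, so signal-with-start either creates a new case workflow
# or delivers the alert to the one already running for that hash.
workflow_id = f"case-{alert.correlation_hash}"  # hypothetical alert object
await client.start_workflow(
    CaseWorkflow.run,
    id=workflow_id,
    task_queue="case-management-task-queue",
    start_signal="merge_alert",
    start_signal_args=[alert.payload],
)
```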
It works well unless my workflow has some remaining work to do when I want to close it.
See the code below for an example (can be run as-is):
```python
import asyncio

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker


@workflow.defn
class CaseWorkflow:
    def __init__(self) -> None:
        self._pending_alerts: asyncio.Queue[str] = asyncio.Queue()
        self._exit = False

    @workflow.run
    async def run(self) -> list[str]:
        # Continually handle alerts from the queue until exit is requested
        result: list[str] = []
        # do something that takes time
        # await asyncio.sleep(5)
        # main loop
        while True:
            # Wait for a queue item or exit
            await workflow.wait_condition(
                lambda: not self._pending_alerts.empty() or self._exit
            )
            # Drain and process the queue
            while not self._pending_alerts.empty():
                result.append(f"Got alert {self._pending_alerts.get_nowait()}")
            # Exit if complete
            if self._exit:
                return result

    @workflow.signal
    async def merge_alert(self, alert: str) -> None:
        await self._pending_alerts.put(alert)

    @workflow.update
    async def close(self) -> None:
        self._exit = True


async def main():
    # Connect the client
    client = await Client.connect("localhost:7233")

    # Run a worker for the workflow
    async with Worker(
        client,
        task_queue="case-management-task-queue",
        workflows=[CaseWorkflow],
    ):
        # signal-with-start creates the workflow, since none is running yet
        await client.start_workflow(
            CaseWorkflow.run,
            id="case_workflow_id_1",
            task_queue="case-management-task-queue",
            start_signal="merge_alert",
            start_signal_args=["alert 1"],
        )
        # this only signals, because the workflow is already running
        await client.start_workflow(
            CaseWorkflow.run,
            id="case_workflow_id_1",
            task_queue="case-management-task-queue",
            start_signal="merge_alert",
            start_signal_args=["alert 2"],
        )
        # update to exit the workflow
        handle = client.get_workflow_handle_for(CaseWorkflow.run, "case_workflow_id_1")
        run_id = (await handle.describe()).run_id
        print(f"closing. run_id: {run_id}")
        await handle.execute_update(CaseWorkflow.close)
        # I wish this started a new run, but if the previous run has not
        # finished yet, it just signals the existing run instead
        await client.start_workflow(
            CaseWorkflow.run,
            id="case_workflow_id_1",
            task_queue="case-management-task-queue",
            start_signal="merge_alert",
            start_signal_args=["alert 3"],
        )


if __name__ == "__main__":
    asyncio.run(main())
```
Running the script twice in a row, I get the following output:

```
closing. run_id: 3381fd17-f34d-48ba-a661-7c4a10ab8124
closing. run_id: d43a960b-7cc4-449d-9be7-080dcec9b9fb
```
Alerts 1 & 2 get merged in case, then the case is closed, a new one is created with alert 3.
Now if I uncomment the `await asyncio.sleep(5)` line, the first workflow run is not done yet when I call the update to stop it, so the third alert is merged into that same run (which is not desirable).
In that case I get the following output (again over two consecutive runs of the script):

```
closing. run_id: 358d5b24-7382-4d87-93f8-b022d86da7d8
closing. run_id: 358d5b24-7382-4d87-93f8-b022d86da7d8
```
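As far as I can tell, the update returns as soon as the handler has set the exit flag, while the run itself is still busy, so the signal-with-start for alert 3 just attaches to the old run. One workaround would be to block until the run actually completes before sending the next alert, something like:

```python
await handle.execute_update(CaseWorkflow.close)
# Block until the current run has fully completed, so that the next
# signal-with-start is guaranteed to create a fresh run.
await handle.result()
```

But that serializes the sender, and in practice alerts arrive from the external system at arbitrary times.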
Is this solvable with Temporal features, or should I manage state separately (maybe using search attributes)?
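For example, I could keep a per-hash case counter outside Temporal and include it in the workflow ID, so that a closed case can never receive new alerts (a sketch; `get_case_sequence` is a hypothetical external lookup):

```python
# Hypothetical external state: map each correlation hash to a case
# sequence number that is bumped when the case closes, so that
# signal-with-start can never target a closed case's run.
seq = await get_case_sequence(correlation_hash)
await client.start_workflow(
    CaseWorkflow.run,
    id=f"case-{correlation_hash}-{seq}",
    task_queue="case-management-task-queue",
    start_signal="merge_alert",
    start_signal_args=["alert 3"],
)
```

But I would rather use a Temporal-native mechanism if one exists.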
Thank you!