I need to validate the data in activity before proceeding further, the question is how I can achieve that.
By re-running workflow if data is not valid? Can I do it inside the signal definition? Or is there a better way?
@workflow.run
async def run(self, uuid: str) -> str:
while True:
await workflow.wait_condition(lambda: not (self._data is None), timeout=45)
valid = await workflow.execute_activity(
validate_data,
self._data,
start_to_close_timeout=timedelta(minutes=2)
)
if valid:
#we are good
else
#signal that data is wrong, wait for new data
Yes, you can do it in the signal definition or, as is often clearer, what you have there where you wait for the an attribute like _data to be set by a signal handler. It’s up to you on how to handle invalid data. If you want to fail the workflow, you can raise temporalio.exceptions.ApplicationError("some error").
Yes, however it can be confusing for readers if you put too much async logic inside of signal handlers. It separates the logic quite a bit so if, say, someone comes later and wants to add a try/except to catch all workflow errors and do something, they will miss errors that could occur here (errors out of signal handlers suspend/fail workflows just like if they were thrown out of run).
Often people will do something like you had before with wait_condition and set an attribute, or use an instance attribute of asyncio.Queue to push signals to and get them on the main run method. This keeps logic close together.