Context:
The workflow I’m working on creates multiple copies of the same resource. Once a resource is created, it has to be polled until it is ready before the next action can be performed on it. Polling can take a long time, an hour or so.
Currently I compose the workflow like this:
```python
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

# (inside the workflow's @workflow.run method; payload is defined elsewhere)

# Create the resources, say 5 copies in total.
# Resource creation does not take much time,
# so it is OK to create them one by one.
resources = []
for _ in range(5):
    rsc_handle = await workflow.execute_activity(
        create_resource,
        payload,
        start_to_close_timeout=timedelta(seconds=10),
    )
    resources.append(rsc_handle)

# Start polling for each resource.
# Ideally, the next action should be performed on a resource
# as soon as it becomes ready.
for rsc_handle in resources:
    # Poll for this resource; Temporal retries the activity while it raises.
    await workflow.execute_activity(
        polling,
        rsc_handle,
        start_to_close_timeout=timedelta(seconds=5),
        retry_policy=RetryPolicy(
            initial_interval=timedelta(minutes=10),
            backoff_coefficient=1.5,
            maximum_attempts=10,
        ),
    )
    # Once the resource is ready, proceed with the next set of actions...
    await workflow.execute_activity(
        next_action,
        rsc_handle,
        start_to_close_timeout=timedelta(seconds=10),
    )
```
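To get a feel for how coarse retry-driven polling is, the sketch below computes when each of the ten polling attempts would start under the `RetryPolicy` above. It assumes Temporal's exponential backoff as I understand it from the docs (the first retry waits `initial_interval`, each later wait is the previous one times `backoff_coefficient`, capped at `maximum_interval`, which defaults to 100× `initial_interval`) and ignores the few seconds each attempt itself runs. `attempt_times` and `detection_time` are hypothetical helpers, not SDK functions.

```python
def attempt_times(initial, coefficient, max_attempts, max_interval=None):
    """Start time (minutes after the first attempt) of every attempt."""
    if max_interval is None:
        max_interval = 100 * initial  # assumed default: 100x initial_interval
    times = [0.0]
    interval = initial
    for _ in range(max_attempts - 1):
        # Wait the (capped) interval, then grow it for the next retry.
        times.append(times[-1] + min(interval, max_interval))
        interval *= coefficient
    return times


def detection_time(ready_at, times):
    """First attempt starting at or after ready_at; None if attempts run out."""
    return next((t for t in times if t >= ready_at), None)


# RetryPolicy from the workflow above: 10 min initial, x1.5, 10 attempts.
times = attempt_times(initial=10, coefficient=1.5, max_attempts=10)
print(detection_time(30, times))   # 47.5
print(detection_time(120, times))  # 131.875
```

Under these assumptions, a resource that becomes ready at 30 minutes is only noticed by the attempt at 47.5 minutes, one that becomes ready at 2 hours is noticed at about 132 minutes, and the tenth and last attempt starts roughly 12.5 hours after the first.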
Problems
I’m relying on Temporal to retry the periodic polling for me. That is, if the polling activity finds that the resource is not ready, it simply raises an exception, and Temporal retries the activity later according to the retry policy.
With this setup, Temporal doesn’t start polling the second resource until the first one is ready, and so on, because each loop iteration awaits the polling activity (including all of its retries) before moving on.
The polling time varies greatly from resource to resource: it can be as short as 30 minutes or as long as about 2 hours. In the unlucky case, the first resource I poll could be the last one to become ready. I want to proceed to the next step on each resource as soon as it becomes ready; the slowest resource should not block the others from proceeding to the next action.
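To make the blocking concrete, here is a plain-asyncio simulation of the polling loop in the workflow (no Temporal involved). `poll`, `next_action` and the `READY_AFTER` delays are hypothetical stand-ins for the activities, scaled down from minutes to hundredths of a second. Because each resource is awaited in turn, `rsc-1`, which becomes ready first, still waits behind `rsc-0`.

```python
import asyncio

# Hypothetical readiness delays in seconds (standing in for 30 min .. 2 h).
READY_AFTER = {"rsc-0": 0.06, "rsc-1": 0.01, "rsc-2": 0.03}


async def poll(rsc: str) -> None:
    """Stand-in for the polling activity: returns once the resource is ready."""
    await asyncio.sleep(READY_AFTER[rsc])


async def next_action(rsc: str, log: list) -> None:
    """Stand-in for the next_action activity: records when it ran."""
    log.append(rsc)


async def sequential(resources: list) -> list:
    log = []
    for rsc in resources:
        await poll(rsc)  # blocks the loop until THIS resource is ready
        await next_action(rsc, log)
    return log


order = asyncio.run(sequential(list(READY_AFTER)))
print(order)  # ['rsc-0', 'rsc-1', 'rsc-2']
```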
I could rewrite the polling activity’s logic from
```python
if not ready:
    # leave it to Temporal to schedule the next poll
    raise NotReady()
```
to
```python
import asyncio

for _ in range(50):
    if not ready:
        await asyncio.sleep(...)
```
but then I’d have to implement the retry logic myself, which I don’t want (I’d rather leave it to Temporal). I’d also have to change the code that launches the polling from
```python
for rsc_handle in resources:
    # polling for a resource
    await workflow.execute_activity(polling, rsc_handle, ...)
```
to
```python
for rsc_handle in resources:
    # polling for a resource
    asyncio.create_task(workflow.execute_activity(polling, rsc_handle, ...))
```
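The `create_task` variant can be completed so that each task runs the whole poll-then-act pipeline for one resource, and the caller still waits for all of them. The sketch below is plain asyncio with hypothetical stand-ins (`poll`, `next_action`, `READY_AFTER`) for the activities; it is not Temporal code. My understanding is that the Temporal Python SDK runs workflow code on its own deterministic event loop, so `asyncio.create_task` and `asyncio.gather` should be able to wrap `workflow.execute_activity` calls in the same way, with each activity still retried by Temporal; please verify that against the SDK docs.

```python
import asyncio

# Hypothetical readiness delays in seconds (standing in for 30 min .. 2 h).
READY_AFTER = {"rsc-0": 0.06, "rsc-1": 0.01, "rsc-2": 0.03}


async def poll(rsc: str) -> None:
    """Stand-in for the polling activity: returns once the resource is ready."""
    await asyncio.sleep(READY_AFTER[rsc])


async def next_action(rsc: str, log: list) -> None:
    """Stand-in for the next_action activity: records when it ran."""
    log.append(rsc)


async def handle(rsc: str, log: list) -> None:
    # One pipeline per resource: poll it, then act on that same resource.
    await poll(rsc)
    await next_action(rsc, log)


async def concurrent(resources: list) -> list:
    log = []
    tasks = [asyncio.create_task(handle(rsc, log)) for rsc in resources]
    await asyncio.gather(*tasks)  # wait for every pipeline; re-raise first failure
    return log


order = asyncio.run(concurrent(list(READY_AFTER)))
print(order)  # ['rsc-1', 'rsc-2', 'rsc-0']
```

Compared with bare `create_task` calls, `gather` keeps a reference to every task and makes the caller wait until all pipelines have finished, so an error in one of them is not silently lost.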
Please advise on the best practice for handling this situation. Thank you!