Worker.suspendPolling not working?

Hi, I am having a problem with an activity that I want to make sure is executed only once on a given worker. I placed a Worker.suspendPolling() call at the beginning of the activity function; however, this does not seem to prevent the method from being invoked again on the same worker.

Has anyone encountered anything like this?

Can you give more info on your use case, please? Are you asking how to limit activity retries?
Suspending a worker should stop it from polling for workflow and activity tasks, so if you can share more info and show some code as well, I think that would help.

The use case is as follows.

I have a workflow that needs to execute a sequence of activities on the same worker (the activities share state in memory). I accomplish that by having the first activity in this chain report a host-specific task queue (inspired by the fileprocessing sample in temporalio/samples-java: samples-java/src/main/java/io/temporal/samples/fileprocessing). Subsequent activities are then sent via this queue so no other worker grabs them. The first activity calls Worker.suspendPolling to make sure the worker doesn’t take any additional requests (this is because in my application a worker can only process one such sequence of activities and must then be killed and restarted). Unfortunately, this worker still receives other tasks, leading me to suspect that Worker.suspendPolling is actually not working as expected.

leading me to suspect that Worker.suspendPolling is actually not working as expected.

I think it's by design: suspendPolling suspends any new worker poll requests, but currently outstanding ones could still return workflow/activity tasks after it is called (see comment in code here). The long-poll timeout is 60s.
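To make the "outstanding poll" point concrete, here is a small toy model in plain Java. It is not Temporal SDK code and does not reflect how the SDK is implemented internally; every name in it (LongPollModel, the in-memory queue, the suspended flag) is hypothetical. It only assumes the behavior described above: suspension stops new polls from being issued, while a poll that is already in flight can still come back with a task.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of a long-polling worker. This is NOT Temporal SDK code;
// every name here is hypothetical.
class LongPollModel {

    // Returns {tasks delivered after suspend, tasks left in the queue}.
    static int[] run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicBoolean suspended = new AtomicBoolean(false);
        AtomicInteger deliveredAfterSuspend = new AtomicInteger();
        CountDownLatch pollIssued = new CountDownLatch(1);

        Thread poller = new Thread(() -> {
            try {
                // The flag is checked only before a NEW poll is issued.
                while (!suspended.get()) {
                    pollIssued.countDown();
                    // Stand-in for a long poll (60s in the thread, 5s here).
                    String task = queue.poll(5, TimeUnit.SECONDS);
                    if (task != null && suspended.get()) {
                        deliveredAfterSuspend.incrementAndGet();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();

        try {
            pollIssued.await();   // one poll has been issued
            suspended.set(true);  // "suspend polling": blocks only new polls
            queue.put("task-1");  // answered by the poll that was already issued
            poller.join();        // the poller exits without issuing another poll
            queue.put("task-2");  // nobody is polling, so this task stays queued
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {deliveredAfterSuspend.get(), queue.size()};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("delivered after suspend: " + r[0] + ", left in queue: " + r[1]);
    }
}
```

In this model one task is still delivered after the suspend call, and only the later task is left unclaimed, which matches the symptom reported in the question.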

this is because in my application a worker can only process one such sequence of activities and must then be killed and restarted

Can you give more info on this part please?

What does “long polls” mean in this context? What causes the creation of a new worker poll request vs continuing to use the initial one?

To add some color, we have a pretty large legacy application that we have moved to run under Temporal on AKS. For performance reasons, we want the app to start up and do much of its initialization before a specific piece of data that it needs is ready. When that piece is ready, we send a CONTINUE signal to the workflow, which then invokes a CONTINUE activity on the worker which unblocks the app to proceed. The app has a lot of state, so it cannot process the next run without being restarted.

Note also that we actually have two instances of io.temporal.worker.Worker at play in our activity worker process. The first listens on a fixed primary task queue, and the only activity it services is GET_HOST_SPECIFIC_QUEUE, which reports the host-specific queue (and then suspends polling so that this is done only once). The second Worker instance services that host-specific queue with two activities: START and CONTINUE.

I wouldn’t use suspend; I would recreate the worker instead. You can decide whether you need a clean worker shutdown, which can take a long time (up to 60 seconds due to the long poll), or ask it to shut down as fast as possible, which should be faster.

Hi Maxim, what does “recreate worker” mean? Do you mean io.temporal.worker.Worker or the process?
Note that I have two instances of io.temporal.worker.Worker - the first listening on a generically named task queue, the second on a unique queue for this process. I want the second one to keep running as it will be receiving and handling additional activities. The first one should not receive any more requests – that is why I want to suspend polling on it. I cannot restart the process.

You want two factories in this case, as only the factory has shutdown methods. So you create each worker with its own factory, then shut that factory down and start a new one when needed. All factories should share the same WorkflowClient to avoid excessive resource usage.
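A minimal sketch of that layout with the Temporal Java SDK, assuming a locally reachable Temporal service. The queue names, the class name, and the commented-out activity implementation classes are hypothetical placeholders, not taken from the original application. It is meant to show the shape of the setup (one shared WorkflowClient, one WorkerFactory per Worker) rather than a definitive implementation.

```java
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

public class TwoFactoriesSketch {

    public static void main(String[] args) {
        // One service connection and one client, shared by both factories.
        WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
        WorkflowClient client = WorkflowClient.newInstance(service);

        // Hypothetical queue names.
        String primaryQueue = "PRIMARY_TASK_QUEUE";
        String hostQueue = "host-" + UUID.randomUUID();

        // Factory 1: serves only the fixed primary queue.
        WorkerFactory primaryFactory = WorkerFactory.newInstance(client);
        Worker primaryWorker = primaryFactory.newWorker(primaryQueue);
        // Hypothetical implementation class; it would need a way to reach
        // primaryFactory so that it can trigger the shutdown.
        // primaryWorker.registerActivitiesImplementations(new GetHostQueueActivityImpl(hostQueue));

        // Factory 2: serves the host-specific queue and keeps running.
        WorkerFactory hostFactory = WorkerFactory.newInstance(client);
        Worker hostWorker = hostFactory.newWorker(hostQueue);
        // Hypothetical implementation class for START and CONTINUE.
        // hostWorker.registerActivitiesImplementations(new StartContinueActivitiesImpl());

        hostFactory.start();
        primaryFactory.start();
    }

    // Stops polling on the primary queue only; hostFactory is untouched.
    static void stopPrimary(WorkerFactory primaryFactory) {
        // shutdown() lets in-flight tasks finish; shutdownNow() is the
        // faster variant that attempts to stop them.
        primaryFactory.shutdown();
        // Assumption: this is called from outside the activity thread,
        // otherwise the wait would include the activity that is waiting.
        primaryFactory.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

If the shutdown is triggered from inside the activity itself, it is probably safer to call only shutdown() there and skip the awaitTermination call, since the activity would otherwise be waiting on its own completion.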

Yes, I am creating a separate factory for each of the workers. So it sounds like you are suggesting calling WorkerFactory.shutdown instead of (or maybe in addition to?) Worker.suspendPolling. I will give that a try. Thanks!

Thanks, this solved the issue.