This problem is isolated to a single namespace. One of our workers is logging DEADLINE_EXCEEDED exceptions, but the stack trace doesn’t tie back to any of our own code, and there’s no clear indication of which part of the SDK is initiating the request.
I’ve included as much context as I thought might be helpful, but do let me know if you’d like us to hunt anything else down.
Server: 1.15.2 prod, 1.16.1 staging
Java SDK: 1.10.0
2022-05-04 11:34:03.616 INFO 64260 --- [nPool-worker-19] i.t.i.r.GrpcAsyncRetryer : Retrying after failure
io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 9.999851250s. [remote_addr=PLACEHOLDER]
at io.grpc.Status.asRuntimeException(Status.java:535)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at io.temporal.serviceclient.GrpcMetricsInterceptor$MetricsClientCall$1.onClose(GrpcMetricsInterceptor.java:123)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:553)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:68)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:739)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:718)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
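One detail that stands out to me: the deadline in the message is almost exactly 10s, which I believe matches the Java SDK’s default unary RPC timeout on the service stubs. For reference, this is the option I understand controls that deadline; a minimal sketch with illustrative values, not something we’ve changed or verified as relevant:
import io.temporal.serviceclient.WorkflowServiceStubs
import io.temporal.serviceclient.WorkflowServiceStubsOptions
import java.time.Duration

// Sketch only: the per-call gRPC deadline for unary (non-long-poll) calls made by the SDK.
// The 10s value mirrors what I understand the default to be; target and value are illustrative.
val service = WorkflowServiceStubs.newInstance(
  WorkflowServiceStubsOptions.newBuilder()
    .setTarget("temporal-frontend:7233")
    .setRpcTimeout(Duration.ofSeconds(10))
    .build()
)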
Context
We know that the Temporal server is fine, because other namespaces are performing great.
This is isolated to a newer namespace that just fully rolled out its first use case: ~6,000 workflows, each calling an activity that throws a retryable ApplicationFailure on every attempt, with a retry interval of 30s (so on the order of 6,000 / 30 ≈ 200 activity attempts per second against this namespace). We also tried a 60s retry interval, speculating that 30s was too aggressive, but it made no material difference to this error.
We’re also observing this across two Temporal cluster environments (staging and prod), so I’m assuming that it’s related to our implementation or the configuration of the worker and/or namespace itself. The namespace is configured the same between the two environments.
We don’t see any interesting / related errors on the Temporal server side, nor do we see any metrics that look abnormal from the server side.
From a dynamic config perspective, we have this task queue’s read & write partitions set to 7. I also made a speculative change of frontend.namespaceCount for this namespace to 3000, but didn’t see any change in behavior (I also don’t see any rate limits that would really make me believe this was a resolution).
From the workers’ perspective, the error isn’t isolated to a single worker process (although it is unevenly distributed across those workers). No worker has a shortfall of available threads, and all latencies (including schedule-to-start) look great. The only “weird” metric we see is a high number of active workflow threads, but we don’t have another server-side polling implementation in our ecosystem to compare against, so this may be completely expected.
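Since the workflow-thread count is the one metric that looks unusual, for reference this is where I understand its ceiling (and the sticky cache size) is configured in the Java SDK. A sketch only; the values shown are illustrative, not necessarily what we run with:
import io.temporal.client.WorkflowClient
import io.temporal.serviceclient.WorkflowServiceStubs
import io.temporal.serviceclient.WorkflowServiceStubsOptions
import io.temporal.worker.WorkerFactory
import io.temporal.worker.WorkerFactoryOptions

// Sketch only: WorkerFactory-level knobs bounding active workflow threads and the
// number of cached workflow executions. Values here are illustrative.
val service = WorkflowServiceStubs.newInstance(WorkflowServiceStubsOptions.newBuilder().build())
val client = WorkflowClient.newInstance(service)
val factoryOptions = WorkerFactoryOptions.newBuilder()
  .setMaxWorkflowThreadCount(600) // cap on workflow threads across all workers in the factory
  .setWorkflowCacheSize(600)      // sticky cache of workflow executions kept in memory
  .build()
val factory = WorkerFactory.newInstance(client, factoryOptions)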
Example activity exception
2022-05-04 11:35:03.950 WARN 64260 --- [ed-delivery": 6] i.t.i.a.POJOActivityTaskHandler : Activity failure. ActivityId=150b09a5-4a33-3575-bf6f-2763e2762d2f, activityType=ActuatorActivities-MonitorResource, attempt=3
io.temporal.failure.ApplicationFailure: message='continuation', type='expected', nonRetryable=false
at io.temporal.failure.ApplicationFailure.newFailureWithCause(ApplicationFailure.java:92)
at io.temporal.failure.ApplicationFailure.newFailure(ApplicationFailure.java:72)
at com.netflix.spinnaker.keel.scheduling.activities.DefaultActuatorActivities.monitorResource(DefaultActuatorActivities.kt:59)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at io.temporal.internal.activity.POJOActivityTaskHandler$POJOActivityInboundCallsInterceptor.execute(POJOActivityTaskHandler.java:214)
at io.temporal.internal.activity.POJOActivityTaskHandler$POJOActivityImplementation.execute(POJOActivityTaskHandler.java:180)
at io.temporal.internal.activity.POJOActivityTaskHandler.handle(POJOActivityTaskHandler.java:120)
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:204)
at io.temporal.internal.worker.ActivityWorker$TaskHandlerImpl.handle(ActivityWorker.java:164)
at io.temporal.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:93)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Workflow & activity impl
I’m not sure if seeing our implementations would help any, but here they are:
@WorkflowInterface
interface ResourceScheduler {
  @WorkflowMethod
  fun schedule(request: ScheduleResourceRequest)

  @SignalMethod
  fun checkNow()

  data class ScheduleResourceRequest(
    val resourceId: String,
    val resourceKind: String
  )
}
class ResourceSchedulerImpl : ResourceScheduler {
  private val configActivites = SchedulingConfigActivities.get()
  private val actuatorActivities = ActuatorActivities.get()
  private var checkNow = false

  override fun schedule(request: ResourceScheduler.ScheduleResourceRequest) {
    var interval = configActivites.getResourceKindCheckInterval(CheckResourceKindRequest(request.resourceKind))
    val minInterval = Duration.ofSeconds(30)
    if (minInterval > interval) {
      Workflow.getLogger(javaClass).warn("Configured interval for ${request.resourceKind} ($interval) is less than minimum $minInterval: Using minimum")
      interval = minInterval
    }

    val pollerActivity = Workflow.newActivityStub(
      ActuatorActivities::class.java,
      ActivityOptions.newBuilder()
        .setTaskQueue(Workflow.getInfo().taskQueue)
        .setStartToCloseTimeout(Duration.ofMinutes(5))
        .setRetryOptions(
          RetryOptions.newBuilder()
            .setBackoffCoefficient(1.0)
            .setInitialInterval(interval)
            .setMaximumInterval(interval)
            .build()
        )
        .build()
    )

    lateinit var promise: Promise<Void>
    val scope = Workflow.newCancellationScope(
      Runnable {
        promise = Async.procedure {
          pollerActivity.monitorResource(
            ActuatorActivities.MonitorResourceRequest(
              request.resourceId,
              request.resourceKind
            )
          )
        }
      }
    )
    scope.run()

    Workflow.await { checkNow }
    scope.cancel()

    try {
      promise.get()
    } catch (e: ActivityFailure) {
      if (e.cause is CanceledFailure && checkNow) {
        Workflow.getLogger(javaClass.simpleName).info(
          "Received checkNow signal for ${request.resourceKind}/${request.resourceId}: " +
            "Aborted poller and will continueAsNew after immediate check"
        )
        actuatorActivities.checkResource(CheckResourceRequest(request.resourceId))
        Workflow.continueAsNew(request)
      }
    }
  }

  override fun checkNow() {
    checkNow = true
  }
}
@Component
class DefaultActuatorActivities(
  private val keelRepository: KeelRepository,
  private val resourceActuator: ResourceActuator,
  private val publisher: ApplicationEventPublisher,
  private val clock: Clock,
  private val spectator: Registry
) : ActuatorActivities {

  override fun checkResource(request: ActuatorActivities.CheckResourceRequest) {
    // ...
  }

  override fun monitorResource(request: ActuatorActivities.MonitorResourceRequest) {
    checkResource(request.toCheckResourceRequest())
    throw ApplicationFailure.newFailure("continuation", "expected")
  }
}
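And for completeness, roughly how these get registered on a worker. This is a simplified sketch; the task queue name and the way the activity instances are obtained are illustrative stand-ins for our Spring wiring, not the exact code:
import io.temporal.client.WorkflowClient
import io.temporal.serviceclient.WorkflowServiceStubs
import io.temporal.serviceclient.WorkflowServiceStubsOptions
import io.temporal.worker.WorkerFactory

// Simplified registration sketch; "resource-scheduler" and the parameters are illustrative.
fun startWorker(
  schedulingConfigActivities: SchedulingConfigActivities,
  actuatorActivities: DefaultActuatorActivities
) {
  val service = WorkflowServiceStubs.newInstance(WorkflowServiceStubsOptions.newBuilder().build())
  val client = WorkflowClient.newInstance(service)
  val factory = WorkerFactory.newInstance(client)
  val worker = factory.newWorker("resource-scheduler")
  worker.registerWorkflowImplementationTypes(ResourceSchedulerImpl::class.java)
  worker.registerActivitiesImplementations(schedulingConfigActivities, actuatorActivities)
  factory.start()
}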