Hello all,
Our workflows are panicking with the message:
Potential deadlock detected: workflow goroutine “root” didn’t yield for over a second
Stack trace:
TaskQueue: "task_progress_q"
WorkerID: "7@7f1b9a937bfa@"
StackTrace: "process event for task_progress_q [panic]:
go.temporal.io/sdk/internal.(*coroutineState).call(0xc0063b8320)
go.temporal.io/sdk@v1.6.0/internal/internal_workflow.go:881 +0x217
go.temporal.io/sdk/internal.(*dispatcherImpl).ExecuteUntilAllBlocked(0xc0063b82d0, 0x0, 0x0)
go.temporal.io/sdk@v1.6.0/internal/internal_workflow.go:966 +0x2f2
go.temporal.io/sdk/internal.executeDispatcher(0x35d2ea0, 0xc00504e240, 0x35ed620, 0xc0063b82d0)
go.temporal.io/sdk@v1.6.0/internal/internal_workflow.go:569 +0xb3
go.temporal.io/sdk/internal.(*syncWorkflowDefinition).OnWorkflowTaskStarted(0xc00504e200)
go.temporal.io/sdk@v1.6.0/internal/internal_workflow.go:542 +0x4a
go.temporal.io/sdk/internal.(*workflowExecutionEventHandlerImpl).ProcessEvent(0xc0089e1c60, 0xc00267f2c0, 0x875d790100, 0x0, 0x0)
go.temporal.io/sdk@v1.6.0/internal/internal_event_handlers.go:795 +0x3c4
go.temporal.io/sdk/internal.(*workflowExecutionContextImpl).ProcessWorkflowTask(0xc0081a78c0, 0xc0047c8ff0, 0x4ac0ce0, 0xc0058bd9d0, 0xc0081a78c0, 0x0)
go.temporal.io/sdk@v1.6.0/internal/internal_task_handlers.go:876 +0x73c
go.temporal.io/sdk/internal.(*workflowTaskHandlerImpl).ProcessWorkflowTask(0xc000d40580, 0xc0047c8ff0, 0xc0049d0780, 0x0, 0x0, 0x0, 0x0)
go.temporal.io/sdk@v1.6.0/internal/internal_task_handlers.go:727 +0x739
go.temporal.io/sdk/internal.(*workflowTaskPoller).processWorkflowTask(0xc000745930, 0xc0047c8ff0, 0x0, 0x0)
go.temporal.io/sdk@v1.6.0/internal/internal_task_pollers.go:288 +0x4ae
go.temporal.io/sdk/internal.(*workflowTaskPoller).ProcessTask(0xc000745930, 0x2980340, 0xc0047c8ff0, 0x1, 0x0)
go.temporal.io/sdk@v1.6.0/internal/internal_task_pollers.go:259 +0x85
go.temporal.io/sdk/internal.(*baseWorker).processTask(0xc0012c83c0, 0x297ff00, 0xc004072150)
go.temporal.io/sdk@v1.6.0/internal/internal_worker_base.go:342 +0xba
created by go.temporal.io/sdk/internal.(*baseWorker).runTaskDispatcher
go.temporal.io/sdk@v1.6.0/internal/internal_worker_base.go:269 +0xff"
Namespace: "default"
RunID: "9bd60fd8-5d9c-4e91-8a5d-5a51d4e26b1e"
context: {3}
stack_trace: "Workflow panic: Potential deadlock detected: workflow goroutine "root" didn't yield for over a second
goroutine 28366 [running]:
getmega.com/pkg/temporal.(*zapWrapper).Error(0xc00000eb38, 0x3045581, 0xe, 0xc005eba6c0, 0x12, 0x12)
getmega.com/pkg@v0.0.0/temporal/logger.go:37 +0x105
go.temporal.io/sdk/log.(*withLogger).Error(0xc0008f2630, 0x3045581, 0xe, 0xc0087cfd40, 0xc, 0xc)
go.temporal.io/sdk@v1.6.0/log/with_logger.go:71 +0xf0
go.temporal.io/sdk/internal.(*workflowExecutionContextImpl).applyWorkflowPanicPolicy(0xc0081a78c0, 0xc0047c8ff0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
go.temporal.io/sdk@v1.6.0/internal/internal_task_handlers.go:947 +0x3a9
go.temporal.io/sdk/internal.(*workflowExecutionContextImpl).ProcessWorkflowTask(0xc0081a78c0, 0xc0047c8ff0, 0x4ac0ce0, 0xc0058bd9d0, 0xc0081a78c0, 0x0)
go.temporal.io/sdk@v1.6.0/internal/internal_task_handlers.go:925 +0x3ff
go.temporal.io/sdk/internal.(*workflowTaskHandlerImpl).ProcessWorkflowTask(0xc000d40580, 0xc0047c8ff0, 0xc0049d0780, 0x0, 0x0, 0x0, 0x0)
go.temporal.io/sdk@v1.6.0/internal/internal_task_handlers.go:727 +0x739
go.temporal.io/sdk/internal.(*workflowTaskPoller).processWorkflowTask(0xc000745930, 0xc0047c8ff0, 0x0, 0x0)
go.temporal.io/sdk@v1.6.0/internal/internal_task_pollers.go:288 +0x4ae
go.temporal.io/sdk/internal.(*workflowTaskPoller).ProcessTask(0xc000745930, 0x2980340, 0xc0047c8ff0, 0x1, 0x0)
go.temporal.io/sdk@v1.6.0/internal/internal_task_pollers.go:259 +0x85
go.temporal.io/sdk/internal.(*baseWorker).processTask(0xc0012c83c0, 0x297ff00, 0xc004072150)
go.temporal.io/sdk@v1.6.0/internal/internal_worker_base.go:342 +0xba
created by go.temporal.io/sdk/internal.(*baseWorker).runTaskDispatcher
go.temporal.io/sdk@v1.6.0/internal/internal_worker_base.go:269 +0xff
The underlying workflows are long-running, e.g. around 50 days.
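For context, these task workflows are started roughly like this (a simplified sketch, assuming the usual context and go.temporal.io/sdk/client imports; the helper name and the workflow ID scheme are placeholders, only the task queue name matches the panic log above):

// Simplified sketch of how a player-task workflow is started; the helper name
// and the workflow ID scheme are made up for illustration.
func StartPlayerTaskWorkflow(ctx context.Context, c client.Client, workflowID string, pTask *models.PlayerTask) error {
	_, err := c.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
		ID:        workflowID,        // e.g. "player-task-<playerID>-<taskID>" (placeholder scheme)
		TaskQueue: "task_progress_q", // same task queue as in the panic log
	}, Workflow, pTask) // Workflow is the function shown below
	return err
}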
Workflow code:
func Workflow(ctx workflow.Context, pTask *models.PlayerTask) (models.TaskState, error) {
	logger := workflow.GetLogger(ctx)
	actx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: ActivityStartToCloseTimeout,
		RetryPolicy:         defaultRetryPolicy,
	})
	if pTask.State.IsVisible() {
		// sleep till active
		if err := workflow.Sleep(ctx, pTask.ActiveTime.Sub(workflow.Now(ctx))); err != nil {
			return "", errors.WithStack(err)
		}
		pTask.State = models.TaskStateActive
		if err := workflow.ExecuteActivity(actx, new(progress_updater.Service).CustomActivityDBUpdate0, pTask).Get(ctx, nil); err != nil {
			return "", errors.WithStack(err)
		}
	}
	// update the current tracking metrics used to filter the task for datapoint creation
	currentTracking = some logic
	if err := workflow.ExecuteActivity(actx, new(progress_updater.Service).CustomActivityDBUpdate1, pId, tId, currentTracking).Get(ctx, nil); err != nil {
		return "", errors.WithStack(err)
	}
	if pTask.State.IsActive() {
		// selector, similar to select in Go
		s := workflow.NewSelector(ctx)
		// channel on which the task listens for progress updates
		signalChan := workflow.GetSignalChannel(ctx, signalChannel)
		var progress *models.UpdateHolder
		// infinite loop which breaks only when the task is expired or completed
		for {
			// listener on the progress update channel
			s.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
				c.Receive(ctx, &progress)
				if err := workflow.ExecuteActivity(actx, new(progress_updater.Service).CustomActivityDBUpdate2, pId, tId).Get(ctx, &pTask); err != nil {
					return
				}
				if pTask == nil {
					return
				}
				tMetric := pTask.SingleActionProps.TrackingMetric
				prgrs.UpdateProgress(progress, tMetric, pTask)
				if err := workflow.ExecuteActivity(actx, new(progress_updater.Service).CustomActivityDBUpdate3, pTask, prgrs).Get(ctx, nil); err != nil {
					return
				}
				if pTask.State.IsComplete() {
					if err := workflow.ExecuteActivity(actx, new(progress_updater.Service).CustomActivityDBUpdate4, pTask).Get(ctx, nil); err != nil {
						return
					}
					if err := workflow.ExecuteActivity(actx, new(progress_updater.Service).CustomActivityDBUpdate2, pId, tId).Get(ctx, &pTask); err != nil {
						return
					}
					if err := workflow.ExecuteActivity(actx, new(progress_updater.Service).CustomActivityDBUpdate0, pTask).Get(ctx, nil); err != nil {
						return
					}
				}
			}).AddFuture(workflow.NewTimer(ctx, pTask.ExpireTime.Sub(workflow.Now(ctx))), func(f workflow.Future) {
				pTask.State = models.TaskStateExpired
				if err := workflow.ExecuteActivity(actx, new(progress_updater.Service).CustomActivityDBUpdate0, pTask).Get(ctx, nil); err != nil {
					return
				}
			})
			// blocks until an update signal is received or the task expires
			s.Select(ctx)
			if pTask.State.IsComplete() || pTask.State.IsExpired() {
				break
			}
		}
	}
	return pTask.State, nil
}
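For completeness, the worker side is roughly set up like this (a simplified sketch, assuming the standard go.temporal.io/sdk/client and go.temporal.io/sdk/worker imports; client options and how the activities struct is constructed are omitted):

// Simplified sketch of the worker registration for this workflow; options are
// left at their defaults, and the task queue matches the TaskQueue in the panic log.
func RunTaskProgressWorker(c client.Client, activities *progress_updater.Service) error {
	w := worker.New(c, "task_progress_q", worker.Options{})
	w.RegisterWorkflow(Workflow)
	w.RegisterActivity(activities) // registers the CustomActivityDBUpdate* methods
	return w.Run(worker.InterruptCh())
}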
Logic:
The workflow tracks the progress of a task for a particular user based on their activities. Progress updates are recorded via signals, and if the configured time is exceeded the task expires.
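For reference, progress updates are delivered to a running workflow roughly like this (a simplified sketch, assuming the usual context and go.temporal.io/sdk/client imports; the signal name is passed in as a parameter because its constant value is not shown above):

// Simplified sketch of the signalling side. signalName is the same constant
// (signalChannel) that the workflow's GetSignalChannel call uses.
func SendProgressUpdate(ctx context.Context, c client.Client, workflowID, signalName string, update *models.UpdateHolder) error {
	// An empty run ID targets the currently open run of the workflow.
	return c.SignalWorkflow(ctx, workflowID, "", signalName, update)
}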
The underlying workflows are still running the user tasks as expected, but this error is logged occasionally. What could be the reason behind this?
Version: 1.8.2
Thanks in advance,
Junaid