Hi,
We are seeing an issue with a selector we use for our processing:
```go
for {
	// Listen on the progress-update channel; several progress updates are needed to complete a task.
	s.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
		c.Receive(ctx, &progress)
		if err := workflow.ExecuteActivity(actx, new(progress_updater.Service).UpdateTaskProgress, pTask, progress).Get(ctx, nil); err != nil {
			return
		}
	})
	// Wait for the task to expire.
	s.AddFuture(workflow.NewTimer(ctx, pTask.ExpireTime.Sub(workflow.Now(ctx))), func(f workflow.Future) {
		pTask.State = models.TaskStateExpired
		if err := workflow.ExecuteActivity(actx, new(progress_updater.Service).UpdateTaskState, pTask).Get(ctx, nil); err != nil {
			return
		}
	})
	// ... (remainder of the loop, including s.Select(ctx), omitted)
}
```
Our timer fires at 12:00 AM (pTask.ExpireTime), so ideally every workflow should process it at exactly 12:00 AM, since all workflows have their timer fire then. Instead, we see task pickup staggered across workers and workflows: some workflows complete at 12:00 AM, but others are delayed by roughly 5 to 50 minutes.
Distribution for reference: [chart]
Example history for a delayed workflow (it took about 41 minutes longer than expected to complete):
```
79  TimerFired             Sep 3rd 12:00:02 am
80  WorkflowTaskScheduled  Sep 3rd 12:00:02 am
81  WorkflowTaskTimedOut   Sep 3rd 12:00:07 am  (ScheduleToStart)
82  WorkflowTaskScheduled  Sep 3rd 12:00:07 am
83  WorkflowTaskStarted    Sep 3rd 12:16:19 am
84  WorkflowTaskCompleted  Sep 3rd 12:16:19 am
85  ActivityTaskScheduled  Sep 3rd 12:16:19 am
86  ActivityTaskStarted    Sep 3rd 12:28:13 am
87  ActivityTaskCompleted  Sep 3rd 12:28:13 am
88  WorkflowTaskScheduled  Sep 3rd 12:28:13 am
89  WorkflowTaskTimedOut   Sep 3rd 12:28:18 am  (ScheduleToStart)
90  WorkflowTaskScheduled  Sep 3rd 12:28:18 am
91  WorkflowTaskStarted    Sep 3rd 12:41:06 am
```
All activities use the workflow's default task queue, and the timer fires at 12:00 AM for every workflow.
- What could be causing this?
- Should we create a separate task queue for each workflow's activities (assuming the queue is being flooded and that is why the ScheduleToStart timeouts happen)?
- What worker configuration would you recommend to improve this? We tried the defaults; currently we have:
```go
options := worker.Options{
	MaxConcurrentActivityExecutionSize:     500,
	WorkerActivitiesPerSecond:              1000,
	WorkerLocalActivitiesPerSecond:         1000,
	TaskQueueActivitiesPerSecond:           1000,
	MaxConcurrentWorkflowTaskExecutionSize: 500,
	MaxConcurrentActivityTaskPollers:       4,
}
```
We have on the order of 5k workflows.
Any feedback would be much appreciated.
Thanks in advance,
Junaid