Issue with cancellation and activity running multiple times

Hey, I am Junaid,

I have been playing around with Temporal for some time and I am facing an issue. It would be really helpful if someone could help with this.

I am creating a workflow with a blocking activity (an infinite loop that exits when the context is canceled). Inside the loop the activity runs some logic on a timer (the interval is in seconds, so I am not using cron jobs). I am seeing multiple calls to the activity from the workflow, and I am not able to cancel the activity.

The workflow execution code is below.

wfopts := client.StartWorkflowOptions{
	TaskQueue:             temporalTaskQueue,
	ID:                    "CreateMatches_" + tournament.Id,
	WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
	WorkflowTaskTimeout:   1 * 24 * 7 * 52 * time.Hour,
	RetryPolicy:           &temporal.RetryPolicy{InitialInterval: time.Second},
}

wr, err := s.wfClient.ExecuteWorkflow(ctx, wfopts, workflowCreateMatches, mProfiles, houseId, tournament.Id)

err = wr.Get(ctx, &err)

Below is the activity invocation code within the workflow “workflowCreateMatches”:

func workflowCreateMatches(ctx workflow.Context, mProfiles []MatchProfile, houseId, tourId string) error {
	actx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		ScheduleToStartTimeout: 10 * time.Minute,
		StartToCloseTimeout:    1 * 24 * 7 * 52 * time.Hour,
		HeartbeatTimeout:       5 * time.Minute,
		WaitForCancellation:    false,
		RetryPolicy:            &temporal.RetryPolicy{InitialInterval: time.Minute},
	})

	future := workflow.ExecuteActivity(actx, new(DirectorSvc).CreateAndAssignMatches, mProfiles, houseId, tourId)
	err := future.Get(ctx, nil)
	if err != nil {
		if temporal.IsCanceledError(err) {
			// Treat cancellation as a clean exit.
			return nil
		}
		return err
	}
	return nil
}

The activity implementation looks like this:

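func (d *DirectorSvc) CreateAndAssignMatches(ctx context.Context, mProfiles []MatchProfile, houseId, tourId string) error {
	// heartBeatTime and logicInterval are not declared in the original snippet;
	// logicInterval is assumed to be defined elsewhere (in seconds), and
	// heartBeatTime is assumed to start at the activity start time.
	heartBeatTime := time.Now()
	for {
		select {
		case <-ctx.Done():
			return nil

		case <-time.After(time.Duration(logicInterval) * time.Second):
			// Heartbeat at most once a minute.
			if time.Now().After(heartBeatTime.Add(time.Minute)) {
				activity.RecordHeartbeat(ctx, "")
				log.Debug(ctx, "HeartBeat", log.KV("tourId", tourId))
				heartBeatTime = time.Now()
			}
			// logic
		}
	}
}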

When I run this workflow I face two issues that I have not been able to solve:

  1. The activity “CreateAndAssignMatches” is getting called multiple times. Initially the "HeartBeat" log line appeared at a 1-minute interval, but the frequency then increased to about 6 times a minute, so judging by the logs I am sure this activity is being called multiple times within this single workflow.

  2. I am trying to cancel this workflow so that the activity exits through context cancellation (I want to somehow stop the loop):

     err := s.wfClient.CancelWorkflow(ctx, "CreateMatches_"+tournament.Id, "")
     if err != nil {
         // handle the error
     }

It reports that the workflow was canceled, but the activity keeps running. I think the context is not being canceled, even though I am recording heartbeats.

My doubts are:

  1. Why is this single activity getting called multiple times and running separately on the worker? How did multiple invocations of the activity happen? I am sure they are not coming from retries, because no failure happened.
  2. How can I exit from this blocking activity (using heartbeat recording in Go)?

The steps shown in the Temporal UI are:

1. WorkflowExecutionStarted
     workflowType.name: workflowCreateMatches
2. WorkflowTaskScheduled
     taskQueue.name: matchCreate_tq
     taskQueue.kind: Normal
     startToCloseTimeout: 60
3. WorkflowTaskStarted
4. WorkflowTaskCompleted
5. ActivityTaskScheduled
     activityId: 5
     activityType.name: CreateAndAssignMatches

Exactly 1h 45s later:

6. ActivityTaskTimedOut (1h 45s, +1h 45s)
     failure.timeoutFailureInfo.timeoutType: ScheduleToStart
     failure.failureInfo: timeoutFailureInfo
     scheduledEventId: 5
7. WorkflowTaskScheduled (1h 45s)
     taskQueue.name: 8255f05a42c0:51047173-1314-40e7-82c5-1888c216503a
     taskQueue.kind: Sticky
     startToCloseTimeout: 60
8. WorkflowTaskTimedOut (1h 50s, +5s)
     scheduledEventId: 7
     startedEventId: 0
     timeoutType: ScheduleToStart
9. WorkflowTaskScheduled (1h 50s)
     taskQueue.name: matchCreate_tq
     taskQueue.kind: Normal
     startToCloseTimeout: 60
10. WorkflowTaskStarted
11. WorkflowTaskCompleted
12. WorkflowExecutionFailed (1h 50s)
     failure.message: failed to exicute CreateMatches: activity task error (scheduledEventID: 5, TimeoutType: ScheduleToStart, Cause: <nil>
     failure.source: GoSDK
     failure.cause.message: activity task error (scheduledEventID: 5, startedEventID: 0, identity: ): TimeoutType: ScheduleToStart, Cause: <nil>
     failure.cause.source: GoSDK

The entire program stops after 1h 50s, and I am not able to understand what is going on.

Please ignore the parameters.

Please help with this.

Thanks in advance,
Junaid

The most logical explanation is that the activity times out and is retried according to the retry policy. I would recommend setting the activity retryPolicy.maximumAttempts to 1 to disable retries and simplify troubleshooting.

I would also recommend removing the if condition that throttles heartbeat calls, as the SDK already throttles them.

To see what’s going on during activity execution, look at the summary page in the UI. It shows all in-flight activities with their number of attempts and the last failure they returned. The corresponding tctl command is tctl workflow describe.

I would recommend starting from the cancel activity sample; it might give you some idea of what is different about your workflow.

BTW, there are a couple of weird timeout values in your workflow.

WorkflowTaskTimeout should never be set to a large value. It defines how long the service waits to continue executing the workflow if a worker fails while processing a workflow task. I doubt you want to wait a year for your workflow to resume execution. I’m pretty sure the service detects this and caps this timeout at the allowed maximum of 1 minute. Still, I would recommend not setting it at all and relying on the default.

Activity ScheduleToStartTimeout defines how soon an activity times out if no worker picks it up from the task queue. This timeout is not retryable, because a retry would just put the task back into the same queue. So we recommend either not setting it (it then defaults to the ScheduleToClose or the workflow timeout) or setting it to a value large enough to account for all possible downtimes and outages of all system components.

Thank you @maxim for responding so fast. I will definitely try setting retryPolicy.maximumAttempts to 1 and will post an update.

  • What do you mean by the SDK throttling heartbeats? Say my heartbeat timeout is 5 minutes: even if I send one heartbeat per second, will the SDK throttle them to a smaller number? Where can I learn more about this throttling?

Thanks in advance,
Junaid


Yes, it is going to wait for 80% of the timeout before sending a heartbeat, then wait again. So you can call heartbeat as frequently as you want.

This is not an ideal throttling algorithm; we inherited it from Cadence. We have plans to improve it in the future.

Here is the actual implementation.


Glad that you responded so fast, @maxim. I removed the

   wr.Get(ctx, &err)

call for the workflow, since it is not present in the example code (the link you provided). But now a heartbeat timeout is happening (I can see it on the history page), even though I am calling heartbeat.

I also tried to cancel the workflow. The cancellation succeeded, but it did not cancel the activity, so I think heartbeating is not working in my case. Is there any reason why heartbeats would not get recorded?

Updated options for the workflow:

wfopts := client.StartWorkflowOptions{
	TaskQueue:             temporalTaskQueue,
	ID:                    "CreateMatches_" + tournament.Id,
	WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
	RetryPolicy:           &temporal.RetryPolicy{InitialInterval: time.Second},
}

and for the activity:

actx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
	ScheduleToCloseTimeout: 1 * 24 * 7 * 52 * time.Hour,
	HeartbeatTimeout:       5 * time.Minute,
	WaitForCancellation:    true,
	RetryPolicy:            &temporal.RetryPolicy{InitialInterval: time.Minute, MaximumAttempts: 1},
})

After exactly 5 minutes, a heartbeat timeout happened.

Please help me understand what could be going wrong. If there is some reference I can look at, that would be really great.

I would recommend double checking if your activity code indeed calls the heartbeat.

I double-checked and will debug it myself, but

wr.Get(ctx, &err)

with and without this call the result should be the same, right? I mean, the behavior should be the same whether or not I wait on the workflow future?

Behavior of the workflow is the same.