Hi!
I ran into unexpected trouble when trying to cancel a child workflow in response to an event received from a background coroutine. I hope you can explain whether this is my fault or not.
In the actual workflow I have a background coroutine that performs a polling activity and sends its results through a channel, and a child workflow that should be canceled on a certain event. When that event happens, the main workflow panics with “lookup failed for scheduledEventID to activityID”.
Minimal code to reproduce:
package main
import (
"context"
"fmt"
"log"
"time"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
tlog "go.temporal.io/sdk/log"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)
const (
	// TaskQueue is the task queue name the worker in main listens on.
	TaskQueue = "cancel"

	// heartbeatInterval is how long the Wait activity pauses between
	// two consecutive heartbeats.
	heartbeatInterval = 1 * time.Second
)
// Parent is the main workflow of the reproduction.
//
// It starts Child through spawnChild and waits until the child execution
// has started, then launches the "ctl" coroutine, which reports events as
// strings on a workflow channel. Every received value is logged; when the
// value is "cancel" the child is canceled, Parent waits for the child to
// finish and logs the outcome. Finally it sleeps for 30 seconds and
// returns nil. The only error it returns is a failure to start the child.
func Parent(ctx workflow.Context) error {
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 2 * time.Minute,
	})
	events := workflow.NewChannel(ctx)
	logger := workflow.GetLogger(ctx)

	child, cancel := spawnChild(ctx)
	// Block until the child execution has actually been started.
	if err := child.GetChildWorkflowExecution().Get(ctx, nil); err != nil {
		return fmt.Errorf("failed to start child workflow: %w", err)
	}

	workflow.GoNamed(ctx, "ctl", func(ctx workflow.Context) {
		ctl(ctx, events, logger)
	})

	var res string
	for {
		sel := workflow.NewSelector(ctx)
		sel.AddReceive(events, func(c workflow.ReceiveChannel, _ bool) {
			// The "more" flag returned by Receive is deliberately ignored.
			_ = c.Receive(ctx, &res)
		})
		sel.Select(ctx)

		logger.Info("Received", "Value", res)
		if res != "cancel" {
			continue
		}

		logger.Info("Canceling")
		cancel()
		// Wait for the child to finish; its error (if any) is only logged.
		childErr := child.Get(ctx, nil)
		logger.Info("Canceled", "Result", childErr)
		break
	}

	sleepErr := workflow.Sleep(ctx, 30*time.Second)
	if sleepErr != nil {
		logger.Error("Workflow sleep failed", "Error", sleepErr)
	}
	return nil
}
// Child is the child workflow started by Parent. It applies a two-minute
// StartToCloseTimeout to its context and then delegates to wait, wrapping
// any error wait returns.
func Child(ctx workflow.Context) error {
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 2 * time.Minute,
	})

	err := wait(ctx)
	if err == nil {
		return nil
	}
	return fmt.Errorf("wait failed: %w", err)
}
// Wait is an activity that records a heartbeat every heartbeatInterval
// until more than d has elapsed since it was called, then returns nil.
// The heartbeat details are the zero-based index of the heartbeat.
//
// If ctx is done before that, Wait returns ctx.Err(). The elapsed time is
// only checked after a heartbeat, so Wait always lasts at least one
// heartbeatInterval unless ctx is done first.
func Wait(ctx context.Context, d time.Duration) error {
	stopAt := time.Now().Add(d)
	for beat := 0; ; beat++ {
		select {
		case <-time.After(heartbeatInterval):
			activity.RecordHeartbeat(ctx, beat)
		case <-ctx.Done():
			return ctx.Err()
		}
		if time.Now().After(stopAt) {
			return nil
		}
	}
}
// spawnChild starts the Child workflow on a cancelable context derived
// from ctx, with the WaitForCancellation child option enabled. It returns
// the child workflow future together with the function that cancels that
// derived context.
func spawnChild(ctx workflow.Context) (workflow.ChildWorkflowFuture, func()) {
	cancelable, cancel := workflow.WithCancel(ctx)
	opts := workflow.ChildWorkflowOptions{
		WaitForCancellation: true,
	}
	childCtx := workflow.WithChildOptions(cancelable, opts)
	future := workflow.ExecuteChildWorkflow(childCtx, Child)
	return future, cancel
}
// wait runs the Wait activity with a duration of roughly one hundred
// years and blocks until the activity finishes, returning its error.
// The activity options set here (same value as StartToCloseTimeout,
// WaitForCancellation enabled, 30-second HeartbeatTimeout) replace the
// ones already present on ctx.
func wait(ctx workflow.Context) error {
	const forever = 100 * 365 * 24 * time.Hour

	opts := workflow.ActivityOptions{
		StartToCloseTimeout: forever,
		WaitForCancellation: true,
		HeartbeatTimeout:    30 * time.Second,
	}
	actCtx := workflow.WithActivityOptions(ctx, opts)

	return workflow.ExecuteActivity(actCtx, Wait, forever).Get(actCtx, nil)
}
// ctl is the background control coroutine. In a loop it runs the Wait
// activity for one second and, each time the activity succeeds, sends the
// string "cancel" on ch. Activity failures are logged through logger and
// the loop continues with the next attempt.
//
// The loop stops once ctx is done. Without that exit, a done context
// would make every further attempt fail as well, so the coroutine would
// keep logging errors without ever terminating or making progress.
func ctl(ctx workflow.Context, ch workflow.Channel, logger tlog.Logger) {
	for {
		err := workflow.ExecuteActivity(ctx, Wait, 1*time.Second).Get(ctx, nil)
		if err != nil {
			logger.Error("Wait failed", "Error", err)
			if ctx.Err() != nil {
				// The workflow context is canceled or expired; retrying
				// cannot succeed any more.
				return
			}
			continue
		}
		ch.Send(ctx, "cancel")
	}
}
// main connects to Temporal with default client options, registers the
// Parent and Child workflows and the Wait activity on a worker for
// TaskQueue, and runs that worker until it is interrupted. A failure to
// create the client or to run the worker terminates the process via
// log.Fatalln.
func main() {
	c, err := client.NewClient(client.Options{})
	if err != nil {
		log.Fatalln("unable to create Temporal client", err)
	}
	defer c.Close()

	w := worker.New(c, TaskQueue, worker.Options{})
	for _, wf := range []interface{}{Parent, Child} {
		w.RegisterWorkflow(wf)
	}
	w.RegisterActivity(Wait)

	if runErr := w.Run(worker.InterruptCh()); runErr != nil {
		log.Fatalln("unable to start Worker", runErr)
	}
}
And the resulting error:
2021/10/13 13:18:38 ERROR Workflow panic Namespace default TaskQueue cancel WorkerID 2132641@localhost@ WorkflowType Parent WorkflowID b6d5ed4a-3620-4898-a6c2-6e1c03e0308c RunID 061d626c-5782-49df-ba43-cf071a2469a2 Attempt 2 Error lookup failed for scheduledEventID to activityID: scheduleEventID: 17, activityID: 16 StackTrace process event for cancel [panic]:
go.temporal.io/sdk/internal.panicIllegalState(...)
.../go/pkg/mod/go.temporal.io/sdk@v1.10.0/internal/internal_decision_state_machine.go:395
go.temporal.io/sdk/internal.(*commandsHelper).handleActivityTaskScheduled(0xc0004ec230, 0xc000122530, 0x2, 0x11)
.../go/pkg/mod/go.temporal.io/sdk@v1.10.0/internal/internal_decision_state_machine.go:914 +0x165
go.temporal.io/sdk/internal.(*workflowExecutionEventHandlerImpl).ProcessEvent(0xc0003981f8, 0xc000496ec0, 0x1, 0x0, 0x0)
.../go/pkg/mod/go.temporal.io/sdk@v1.10.0/internal/internal_event_handlers.go:816 +0x45b
go.temporal.io/sdk/internal.(*workflowExecutionContextImpl).ProcessWorkflowTask(0xc0003c61c0, 0xc0003c9830, 0x1471b20, 0xc00031a690, 0xc0003c61c0, 0x0)
.../go/pkg/mod/go.temporal.io/sdk@v1.10.0/internal/internal_task_handlers.go:878 +0x72c
go.temporal.io/sdk/internal.(*workflowTaskHandlerImpl).ProcessWorkflowTask(0xc00049c000, 0xc0003c9830, 0xc0004de390, 0x0, 0x0, 0x0, 0x0)
.../go/pkg/mod/go.temporal.io/sdk@v1.10.0/internal/internal_task_handlers.go:729 +0x6d9
go.temporal.io/sdk/internal.(*workflowTaskPoller).processWorkflowTask(0xc000492270, 0xc0003c9830, 0x0, 0x0)
.../go/pkg/mod/go.temporal.io/sdk@v1.10.0/internal/internal_task_pollers.go:286 +0x4ae
go.temporal.io/sdk/internal.(*workflowTaskPoller).ProcessTask(0xc000492270, 0xd02760, 0xc0003c9830, 0x0, 0x0)
.../go/pkg/mod/go.temporal.io/sdk@v1.10.0/internal/internal_task_pollers.go:257 +0x85
go.temporal.io/sdk/internal.(*baseWorker).processTask(0xc0004a8000, 0xd02320, 0xc0004801b0)
.../go/pkg/mod/go.temporal.io/sdk@v1.10.0/internal/internal_worker_base.go:343 +0xb8
created by go.temporal.io/sdk/internal.(*baseWorker).runTaskDispatcher
.../go/pkg/mod/go.temporal.io/sdk@v1.10.0/internal/internal_worker_base.go:270 +0xff
Event history for parent:
1 WorkflowExecutionStarted Oct 13th 1:18:37 pm Close Timeout identity tctl@localhost Workflow Parent
2 WorkflowTaskScheduled Taskqueue cancel Timeout 10s
3 WorkflowTaskStarted requestId bde662e3-9878-46bd-a016-d28f0be33bf6
4 WorkflowTaskCompleted identity 2132641@localhost@
5 StartChildWorkflowExecutionInitiated Taskqueue cancel Workflow Child
6 ChildWorkflowExecutionStarted Workflow Child - 061d626c-5782-49df-ba43-cf071a2469a2_5
7 WorkflowTaskScheduled Taskqueue localhost:9fff2b4a-e875-4d99-bfa1-4203d6271561 Timeout 10s
8 WorkflowTaskStarted requestId 6b4e5800-10e3-4ab6-9821-507bd829d3fa
9 WorkflowTaskCompleted identity 2132641@localhost@
10 ActivityTaskScheduled Close Timeout Id 10 input [ 1000000000 ] Name Wait
11 ActivityTaskStarted attempt 1 identity 2132641@localhost@ requestId e0c7c083-7421-42f3-a0d2-d94744dde9fc
12 ActivityTaskCompleted 1s (+1s)
13 WorkflowTaskScheduled 1s Taskqueue localhost:9fff2b4a-e875-4d99-bfa1-4203d6271561 Timeout 10s
14 WorkflowTaskStarted 1s requestId 890e099a-518e-44e4-aecd-679a90d14f1a
15 WorkflowTaskCompleted 1s identity 2132641@localhost@
16 RequestCancelExternalWorkflowExecutionInitiated 1s workflowTaskCompletedEventId 15 namespace default workflowExecution.workflowId 061d626c-5782-49df-ba43-cf071a2469a2_5 workflowExecution.runId control childWorkflowOnly true
17 ActivityTaskScheduled 1s Close Timeout Id 16 input [ 1000000000 ] Name Wait
18 ExternalWorkflowExecutionCancelRequested 1s initiatedEventId 16 namespace default workflowExecution.workflowId 061d626c-5782-49df-ba43-cf071a2469a2_5 workflowExecution.runId
19 WorkflowTaskScheduled 1s Taskqueue localhost:9fff2b4a-e875-4d99-bfa1-4203d6271561 Timeout 10s
20 WorkflowTaskStarted 1s requestId 49846d98-240f-4883-bde1-03c33265182f
21 WorkflowTaskFailed 1s message lookup failed for scheduledEventID to activityID: scheduleEventID: 17, activityID: 16
One may notice that parts of the cancellation process are interleaved with the scheduling of an activity, and this possibly leads to the incorrect behaviour. Also, when I protect the event-processing section (the loop body) with a sort of semaphore built from a variable and `workflow.Await()` in the `ctl()` function, the error disappears. However, I had the impression that this shouldn’t be necessary, because Temporal uses cooperative multitasking, so its coroutines and channels don’t require such protection.
Software versions:
Go 1.16
Temporal server 1.12.3
Temporal SDK 1.10.0 and 1.9.0