SDK: go.temporal.io/sdk v1.7.0
Temporal: v1.9.0
DB: cassandra 3.11.9
Temporal workflow history - see here
Expected output:
2021/06/22 10:40:46 INFO workflow started
2021/06/22 10:40:46 INFO start-id id=0
2021/06/22 10:40:51 WARN got interupt! id=5
2021/06/22 10:40:51 INFO continued activity id=5
2021/06/22 10:40:51 INFO Stopped Worker Namespace default
2021/06/22 10:40:51 INFO Started Worker Namespace default
2021/06/22 10:40:52 INFO start-id id=5
2021/06/22 10:40:59 INFO workflow finished
Actual output:
2021/06/22 10:40:46 INFO Started Worker Namespace default TaskQueue split-merge WorkerID 5263@m1.local@
2021/06/22 10:40:46 INFO workflow started Namespace default TaskQueue split-merge WorkerID 5263@m1.local@ WorkflowType SampleWorkflow WorkflowID bacaf3a6-c2e8-4e3f-8da8-18b138c877a3 RunID 4e8e62dc-542c-4c08-8ec3-72c07549d556 Attempt 1
2021/06/22 10:40:46 DEBUG ExecuteActivity Namespace default TaskQueue split-merge WorkerID 5263@m1.local@ WorkflowType SampleWorkflow WorkflowID bacaf3a6-c2e8-4e3f-8da8-18b138c877a3 RunID 4e8e62dc-542c-4c08-8ec3-72c07549d556 Attempt 1 ActivityID 5 ActivityType SampleActivity
2021/06/22 10:40:46 INFO start id Namespace default TaskQueue split-merge WorkerID 5263@m1.local@ ActivityID 5 ActivityType SampleActivity Attempt 1 WorkflowType SampleWorkflow WorkflowID bacaf3a6-c2e8-4e3f-8da8-18b138c877a3 RunID 4e8e62dc-542c-4c08-8ec3-72c07549d556 id 0
2021/06/22 10:40:51 WARN got interupt! Namespace default TaskQueue split-merge WorkerID 5263@m1.local@ ActivityID 5 ActivityType SampleActivity Attempt 1 WorkflowType SampleWorkflow WorkflowID bacaf3a6-c2e8-4e3f-8da8-18b138c877a3 RunID 4e8e62dc-542c-4c08-8ec3-72c07549d556 id 5
2021/06/22 10:40:51 INFO Task processing failed with error Namespace default TaskQueue split-merge WorkerID 5263@m1.local@ WorkerType ActivityWorker Error worker stopping
2021/06/22 10:40:51 INFO Stopped Worker Namespace default TaskQueue split-merge WorkerID 5263@m1.local@
2021/06/22 10:40:51 INFO Started Worker Namespace default TaskQueue split-merge WorkerID 5263@m1.local@
package main
import (
"context"
"flag"
"time"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)
// newWorker builds a worker bound to the "split-merge" task queue and
// registers SampleWorkflow and SampleActivity on it.
//
// The worker is limited to 10 concurrent activity executions and is created
// with a WorkerStopTimeout of 15 minutes. The returned worker has not been
// started; the caller decides when to Start/Run and Stop it.
func newWorker(cl client.Client) worker.Worker {
	opts := worker.Options{
		WorkerStopTimeout:                  15 * time.Minute,
		MaxConcurrentActivityExecutionSize: 10,
	}

	wrk := worker.New(cl, "split-merge", opts)
	wrk.RegisterActivity(SampleActivity)
	wrk.RegisterWorkflow(SampleWorkflow)
	return wrk
}
// main reproduces a worker restart while an activity is in flight.
//
// It connects to the Temporal frontend given by the -addr flag, starts one
// SampleWorkflow execution on the "split-merge" task queue, runs a worker for
// five seconds, stops it to simulate a graceful shutdown, and then runs a
// fresh worker until the process receives an interrupt signal.
//
// Any failure to connect, to start the workflow, or to start either worker
// panics, so a broken setup is not mistaken for the behavior under test.
func main() {
	var temporalAddr string
	flag.StringVar(&temporalAddr, "addr", client.DefaultHostPort, "temporal addr")
	flag.Parse()

	cl, err := client.NewClient(client.Options{HostPort: temporalAddr})
	if err != nil {
		panic(err)
	}
	// Release the client's resources on every exit path, including panics.
	defer cl.Close()

	_, err = cl.ExecuteWorkflow(
		context.Background(),
		client.StartWorkflowOptions{TaskQueue: "split-merge"},
		SampleWorkflow,
	)
	if err != nil {
		panic(err)
	}

	w := newWorker(cl)
	if err := w.Start(); err != nil {
		panic(err)
	}

	// simulate graceful shutdown after 5 seconds
	time.Sleep(5 * time.Second)
	w.Stop()

	// A second worker picks up whatever the first one left behind.
	w = newWorker(cl)
	if err := w.Run(worker.InterruptCh()); err != nil {
		panic(err)
	}
}
// Resp is the result SampleActivity returns to SampleWorkflow.
//
// Both fields must be exported: the activity result is serialized between the
// worker and the workflow, and encoding/json (used by the SDK's default data
// converter for struct values) silently skips unexported fields. With an
// unexported id field the workflow would always decode 0 and restart the
// activity from the beginning instead of resuming it.
type Resp struct {
	// LastID is the loop index the activity had reached when it returned;
	// it is the id to pass to the next SampleActivity call when resuming.
	LastID int
	// Interupt is true when the activity returned early because its worker
	// was stopping, false when it ran to completion.
	Interupt bool
}

// SampleWorkflow runs SampleActivity starting from id 0 and, if the activity
// reports that it was interrupted, runs it once more starting from the id it
// had reached.
//
// Activities are scheduled with a one-hour StartToCloseTimeout. An activity
// error fails the workflow with that error rather than being logged and
// ignored, so the workflow never reports success after a failed activity.
func SampleWorkflow(ctx workflow.Context) error {
	log := workflow.GetLogger(ctx)
	log.Info("workflow started")

	ao := workflow.ActivityOptions{StartToCloseTimeout: time.Hour}
	ctx = workflow.WithActivityOptions(ctx, ao)

	var resp Resp
	if err := workflow.ExecuteActivity(ctx, SampleActivity, 0).Get(ctx, &resp); err != nil {
		return err
	}

	if resp.Interupt {
		// Resume from where the interrupted attempt stopped.
		log.Info("continued activity", "id", resp.LastID)
		if err := workflow.ExecuteActivity(ctx, SampleActivity, resp.LastID).Get(ctx, &resp); err != nil {
			return err
		}
	}

	log.Info("workflow finished")
	return nil
}
// SampleActivity simulates resumable work: it counts from id up to 20,
// spending one second on each step.
//
// If the worker-stop channel fires before a step completes, the activity
// returns immediately with a Resp whose first field is the index of the
// unfinished step and whose Interupt flag is true, so the caller can resume
// from that index. If ctx is cancelled or expires, it returns ctx.Err().
// Otherwise it returns a Resp with index 20 and Interupt false.
func SampleActivity(ctx context.Context, id int) (*Resp, error) {
	log := activity.GetLogger(ctx)
	log.Info("start-id", "id", id)

	stopCh := activity.GetWorkerStopChannel(ctx)
	for i := id; i < 20; i++ {
		// Block on all three events instead of polling and sleeping, so a
		// stop or cancellation is noticed without waiting out the second.
		select {
		case <-stopCh:
			log.Warn("got interupt!", "id", i)
			return &Resp{i, true}, nil
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(time.Second):
			// Step i is done; move on to the next one.
		}
	}
	return &Resp{20, false}, nil
}