Thank you for doing that, I don’t know in which order you ran it so I created one function to demonstrate the whole issue.
If you run this, depends of how long it takes your first workflow to actually start processing activities, you will encounter the issue once the second workflow tries to start. The important part here is that the first workflow has to be in the middle of running the activities before the new workflow starts (that’s why I’m using sleep in the main func.
Let me know if this happens for you as well.
package main
import (
"context"
"log"
"time"
"github.com/google/uuid"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)
func main() {
c1 := createClient()
w1 := createWorker(c1)
w1.Start()
workflowOptions1 := client.StartWorkflowOptions{
ID: "1",
TaskQueue: "ordersViewQueue",
WorkflowExecutionErrorWhenAlreadyStarted: false,
CronSchedule: "* * * * *",
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
we1, err := c1.ExecuteWorkflow(context.Background(), workflowOptions1, UpdateOrdersViewWorkflow, "ViewName1")
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
log.Println("Started workflow", "WorkflowID", we1.GetID(), "RunID", we1.GetRunID())
time.Sleep(time.Second * 40)
w1.Stop()
c1.Close()
// "server redeploys"
c2 := createClient()
defer c2.Close()
w2 := createWorker(c2)
w2.Start()
defer w2.Stop()
log.Println("After start second worker")
time.Sleep(time.Second * 10)
workflowOptions2 := client.StartWorkflowOptions{
ID: "1",
TaskQueue: "ordersViewQueue",
WorkflowExecutionErrorWhenAlreadyStarted: false,
CronSchedule: "* * * * *",
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
we2, err := c2.ExecuteWorkflow(context.Background(), workflowOptions2, UpdateOrdersViewWorkflow, "ViewName1")
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
log.Println("Started workflow", "WorkflowID", we2.GetID(), "RunID", we2.GetRunID())
time.Sleep(time.Second * 40)
}
func createClient() client.Client {
c, err := client.NewClient(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
return c
}
func createWorker(c client.Client) worker.Worker {
w := worker.New(c, "ordersViewQueue", worker.Options{})
w.RegisterWorkflow(UpdateOrdersViewWorkflow)
w.RegisterActivity(ApplyEventActivity)
return w
}
func ApplyEventActivity(ctx context.Context, viewName string) error {
return nil
}
func UpdateOrdersViewWorkflow(ctx workflow.Context, viewName string) error {
for i := 0; i < 1000; i++ {
opts := workflow.ActivityOptions{
TaskQueue: "ordersViewQueue",
ScheduleToCloseTimeout: time.Hour * 5,
StartToCloseTimeout: time.Second * 10,
HeartbeatTimeout: time.Second * 10,
WaitForCancellation: false,
ActivityID: uuid.New().String(),
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second * 10,
BackoffCoefficient: 2,
MaximumInterval: time.Minute * 10,
MaximumAttempts: 0},
}
ctx = workflow.WithActivityOptions(ctx, opts)
if err := workflow.ExecuteActivity(ctx, ApplyEventActivity, viewName, nil).Get(ctx, nil); err != nil {
return err
}
}
return nil
}