Error unknown command CommandType: Activity

I need some help understanding why I am getting this error and what I need to do differently.
I have a cron-type workflow running every minute. Sometimes I change an object used by one of the activities. For example, if this is my activity:

func myActivity(ctx context.Context, param1 string) {
	order, err := storage.getOrder(....)
}

If I change the order struct (for example, by adding fields), I get a panic:
Error unknown command CommandType: Activity, ID: 597f0f6a-df97-4a16-bb0b-e23978e0dbe2, possible causes are nondeterministic workflow definition code or incompatible change in the workflow definition

This keeps happening until I terminate the workflow.

I understand workflow versioning and have used it before, but my understanding is that activities are open to changes. So why am I getting this panic?

Thanks!

Update: the cron fails with this error even if the worker just stops and restarts while it still has activities running. It doesn't depend on any change inside the activity.

From the description, it's not clear to me whether you are changing the activity input parameters or the result.
Can you share the event history when this happens?
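A side note on the struct change itself: assuming the payloads go through the Go SDK's default data converter, which serializes ordinary Go structs as JSON, adding fields is usually harmless. encoding/json leaves fields missing from the payload at their zero value and, by default, ignores payload fields the target struct does not have. The sketch below uses plain encoding/json (not the Temporal SDK) and hypothetical OrderV1/OrderV2 types to show an "old" payload decoding into the extended struct.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OrderV1 is a hypothetical "old" version of the order struct.
type OrderV1 struct {
	ID     string `json:"id"`
	Amount int    `json:"amount"`
}

// OrderV2 is the same struct with one added field.
type OrderV2 struct {
	ID       string `json:"id"`
	Amount   int    `json:"amount"`
	Currency string `json:"currency"` // new field
}

// decodeOldPayload encodes an OrderV1 (standing in for a payload recorded
// before the change) and decodes it into the new OrderV2 type.
func decodeOldPayload(old OrderV1) (OrderV2, error) {
	data, err := json.Marshal(old)
	if err != nil {
		return OrderV2{}, err
	}
	var v2 OrderV2
	err = json.Unmarshal(data, &v2)
	return v2, err
}

func main() {
	v2, err := decodeOldPayload(OrderV1{ID: "o-1", Amount: 42})
	if err != nil {
		panic(err)
	}
	// Currency is absent from the old payload, so it stays "".
	fmt.Printf("%+v\n", v2)
}
```

So a panic that appears right after adding fields is more likely caused by the workflow code taking a different path on replay than by the payload format.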

It looks like your workflow code is not deterministic. Make sure that it obeys all the constraints outlined in the documentation.

My workflow is simple: it gets an array of items from one activity, then ranges over that array and executes an activity for each item.
I just saw this: "Workflow code can not directly iterate over maps using range because the order of the map's iteration is randomized."
I'm not using a map; I'm using an array (a slice). Could that still be the reason?

Ranging over an array shouldn't be a problem. Are you doing any time manipulation?
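For the map question above: ranging over a slice or array always visits elements in index order, whereas Go intentionally randomizes map iteration order. If workflow code ever needs one activity per map entry, a common approach is to sort the keys first so that every replay schedules the activities in the same order. A minimal stdlib-only sketch; sortedKeys is a hypothetical helper, not a Temporal SDK function.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedKeys returns the map's keys in ascending order, so iterating over
// the result gives the same order on every run (unlike ranging over the map).
func sortedKeys(m map[string]int) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	items := map[string]int{"c": 3, "a": 1, "b": 2}
	// In workflow code, each iteration could call workflow.ExecuteActivity;
	// the order of those calls would then be identical on every replay.
	for _, k := range sortedKeys(items) {
		fmt.Println(k, items[k])
	}
}
```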

Are you 100% sure that the workflow code wasn't changed while some workflows were running?

This is the workflow code:

	var a *UpdateViewActivities

	progress := &NextCronStart{
		Latest: 0,
	}
	// Resume from the previous cron run's result, if there is one.
	if workflow.HasLastCompletionResult(ctx) {
		_ = workflow.GetLastCompletionResult(ctx, &progress)
	}

	var entries []*eventstore.EventEntry
	ctx = workflow.WithActivityOptions(ctx, defaultOptions.NewActivityOptionsForOrdersViewQueue())
	if err := workflow.ExecuteActivity(ctx, a.GetAllEventsActivity, progress).Get(ctx, &entries); err != nil {
		return nil, err
	}

	opts := defaultOptions.NewActivityOptionsForOrdersViewQueue()
	for _, eventEntry := range entries {
		ctx = workflow.WithActivityOptions(ctx, opts)
		if err := workflow.ExecuteActivity(ctx, a.ApplyEventActivity, viewName, eventEntry).Get(ctx, nil); err != nil {
			// Note: an error returned by an activity reaches the workflow wrapped
			// (as *temporal.ActivityError), so a direct == comparison against a
			// sentinel error may never be true.
			if err == events.ErrEventsOutOfOrder {
				// The workflow logger expects key/value pairs after the message.
				workflow.GetLogger(ctx).Error("events out of order",
					"Seq", eventEntry.Seq, "StreamID", eventEntry.StreamID, "EventType", eventEntry.EventType)
			} else {
				workflow.GetLogger(ctx).Error("error on applying to order",
					"Seq", eventEntry.Seq, "StreamID", eventEntry.StreamID, "EventType", eventEntry.EventType)
				return &NextCronStart{Latest: int64(eventEntry.EventID) - 1}, nil
			}
		}
	}

	// Remember how far this run got so the next cron run continues from there.
	res := &NextCronStart{Latest: progress.Latest}
	if len(entries) > 0 {
		res.Latest = int64(entries[len(entries)-1].EventID)
	}

	return res, nil

At first I thought this error happened only when I changed the fields of a struct that the activity uses, but I realized that if my cron is in the middle of a run while my server is deploying (meaning the worker stops and starts again), I get this error. No matter what I do, I have to terminate the old cron and start a new one.

I don’t see anything obvious. Is this the complete code? Could it be that the issue is somewhere else?

You can try running this code under a debugger (just set the environment variable TEMPORAL_DEBUG to true).

That's the complete workflow code.
I will try, but just to understand: what should I be looking for?

Find why the workflow takes a different path during initial execution versus recovery after the worker restart.
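To make "a different path" concrete, here is a deliberately simplified toy model in plain Go (not the SDK's actual implementation): the first execution records the commands it emits, and after a restart the workflow function runs again from the top while each new command is compared with the recorded one. The names command, orderWorkflow and replay are hypothetical. The workflow branches on a value that is not recorded in the history (think wall-clock time, a random number, or a package-level variable), so if that value differs on replay, the emitted commands diverge.

```go
package main

import "fmt"

// command is a toy stand-in for a workflow command such as "schedule activity".
type command struct {
	Type string
	ID   string
}

// orderWorkflow emits one activity command per item. The external flag models
// any input that is NOT recorded in history (time.Now, rand, globals, env vars).
func orderWorkflow(items []string, external bool) []command {
	var cmds []command
	if external {
		// e.g. "if time.Now().Hour() < 12 { sleep first }" in workflow code
		cmds = append(cmds, command{Type: "Timer", ID: "t1"})
	}
	for i := range items {
		cmds = append(cmds, command{Type: "Activity", ID: fmt.Sprint(i + 1)})
	}
	return cmds
}

// replay checks the re-generated commands against the recorded history and
// reports the first mismatch. Commands past the end of history are new work.
func replay(history, produced []command) error {
	if len(produced) < len(history) {
		return fmt.Errorf("nondeterministic: history has %d commands, replay produced %d",
			len(history), len(produced))
	}
	for i, want := range history {
		if got := produced[i]; got != want {
			return fmt.Errorf("nondeterministic: history has %s %s, replay produced %s %s",
				want.Type, want.ID, got.Type, got.ID)
		}
	}
	return nil
}

func main() {
	items := []string{"a", "b"}
	history := orderWorkflow(items, false) // first execution

	// Same external value on replay: commands match.
	fmt.Println(replay(history, orderWorkflow(items, false)))
	// Different external value on replay: commands diverge.
	fmt.Println(replay(history, orderWorkflow(items, true)))
}
```

In the debugger, the equivalent question is which input made the replayed run emit a command that differs from the one in the event history.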

Hey,

I managed to reproduce this consistently with a very simple workflow and activity.
I think the problem is calling ExecuteWorkflow with a new options object while the cron is running.
The options are exactly the same, but it's a different instance (which will always happen because the server is redeployed).

Workflow code:

func UpdateOrdersViewWorkflow(ctx workflow.Context, defaultOptions *options.ActivitiesOptions, viewName string) (*NextCronStart, error) {

	var a *UpdateViewActivities
	for i := 0; i < 1000; i++ {
		opts := defaultOptions.NewActivityOptionsForOrdersViewQueue()

		ctx = workflow.WithActivityOptions(ctx, opts)
		if err := workflow.ExecuteActivity(ctx, a.ApplyEventActivity, viewName, nil).Get(ctx, nil); err != nil {
			return nil, err
		}
	}
	return nil, nil
}

Activity code:

func (a *UpdateViewActivities) ApplyEventActivity(ctx context.Context, viewName string, eventEntry *eventstore.EventEntry) error {
	return nil
}

So this workflow basically just runs 1000 activities that return nil.
My way to replicate the issue:

  1. Run the client and worker.
  2. Execute the workflow with:

	options1 := client.StartWorkflowOptions{
		ID:                                       "1",
		TaskQueue:                                "ordersViewQueue",
		WorkflowExecutionErrorWhenAlreadyStarted: false,
		CronSchedule:                             "* * * * *",
		WorkflowIDReusePolicy:                    enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
	}

  3. Let the workflow start.
  4. Stop the workers and the client.
  5. Init a new client and worker.
  6. Execute the workflow again with:

	options2 := client.StartWorkflowOptions{
		ID:                                       "1",
		TaskQueue:                                "ordersViewQueue",
		WorkflowExecutionErrorWhenAlreadyStarted: false,
		CronSchedule:                             "* * * * *",
		WorkflowIDReusePolicy:                    enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
	}

So if I use the same struct (options1), I don't get an error and the cron just keeps running from where it stopped; if I use options2 (which has the same values!), I get a panic.

Of course, reusing the same struct is not an option for me when the server is redeploying. The only way to avoid this is to check whether the workflow is running before trying to execute it again, instead of counting on Temporal's reuse policy.

I copied your code into this change. I’m not able to reproduce the problem.

Would you create an executable example that reproduces the issue you are having?

Thank you for doing that. I don't know in which order you ran the steps, so I created a single function that demonstrates the whole issue.

If you run this, depending on how long it takes your first workflow to actually start processing activities, you will encounter the issue once the second workflow tries to start. The important part is that the first workflow has to be in the middle of running its activities before the new workflow starts (that's why I'm using sleep in the main func).

Let me know if this happens for you as well.

package main

import (
	"context"
	"log"
	"time"

	"github.com/google/uuid"
	"go.temporal.io/api/enums/v1"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/temporal"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

func main() {

	c1 := createClient()

	w1 := createWorker(c1)
	w1.Start()

	workflowOptions1 := client.StartWorkflowOptions{
		ID:                                       "1",
		TaskQueue:                                "ordersViewQueue",
		WorkflowExecutionErrorWhenAlreadyStarted: false,
		CronSchedule:                             "* * * * *",
		WorkflowIDReusePolicy:                    enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
	}

	we1, err := c1.ExecuteWorkflow(context.Background(), workflowOptions1, UpdateOrdersViewWorkflow, "ViewName1")
	if err != nil {
		log.Fatalln("Unable to execute workflow", err)
	}

	log.Println("Started workflow", "WorkflowID", we1.GetID(), "RunID", we1.GetRunID())

	time.Sleep(time.Second * 40)

	w1.Stop()
	c1.Close()

	// "server redeploys": start a fresh client and worker

	c2 := createClient()

	defer c2.Close()

	w2 := createWorker(c2)
	w2.Start()

	defer w2.Stop()

	log.Println("After start second worker")

	time.Sleep(time.Second * 10)
	workflowOptions2 := client.StartWorkflowOptions{
		ID:                                       "1",
		TaskQueue:                                "ordersViewQueue",
		WorkflowExecutionErrorWhenAlreadyStarted: false,
		CronSchedule:                             "* * * * *",
		WorkflowIDReusePolicy:                    enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
	}

	we2, err := c2.ExecuteWorkflow(context.Background(), workflowOptions2, UpdateOrdersViewWorkflow, "ViewName1")
	if err != nil {
		log.Fatalln("Unable to execute workflow", err)
	}

	log.Println("Started workflow", "WorkflowID", we2.GetID(), "RunID", we2.GetRunID())

	time.Sleep(time.Second * 40)

}

func createClient() client.Client {
	c, err := client.NewClient(client.Options{})
	if err != nil {
		log.Fatalln("Unable to create client", err)
	}

	return c
}

func createWorker(c client.Client) worker.Worker {
	w := worker.New(c, "ordersViewQueue", worker.Options{})

	w.RegisterWorkflow(UpdateOrdersViewWorkflow)
	w.RegisterActivity(ApplyEventActivity)

	return w
}

func ApplyEventActivity(ctx context.Context, viewName string) error {
	return nil
}

func UpdateOrdersViewWorkflow(ctx workflow.Context, viewName string) error {

	for i := 0; i < 1000; i++ {
		opts := workflow.ActivityOptions{
			TaskQueue:              "ordersViewQueue",
			ScheduleToCloseTimeout: time.Hour * 5,
			StartToCloseTimeout:    time.Second * 10,
			HeartbeatTimeout:       time.Second * 10,
			WaitForCancellation:    false,
			ActivityID:             uuid.New().String(),
			RetryPolicy: &temporal.RetryPolicy{
				InitialInterval:    time.Second * 10,
				BackoffCoefficient: 2,
				MaximumInterval:    time.Minute * 10,
				MaximumAttempts:    0},
		}

		ctx = workflow.WithActivityOptions(ctx, opts)
		// ApplyEventActivity takes a single argument after ctx, so pass only viewName.
		if err := workflow.ExecuteActivity(ctx, ApplyEventActivity, viewName).Get(ctx, nil); err != nil {
			return err
		}
	}
	return nil
}

Thank you for creating the stand-alone reproduction. Replay fails with "possible causes are nondeterministic workflow definition code" because the code is indeed not deterministic: it uses a randomly generated UUID as the activity ID. During replay, all activity IDs are regenerated, and the framework cannot match them to the history events. The only reason to assign an activity ID explicitly is when you want to complete the activity by ID from a client (asynchronous completion, for example with the client's CompleteActivityByID). Otherwise, I would recommend not assigning IDs at all and letting the framework handle the assignments. If you really want to use UUIDs in the workflow code, use SideEffect to generate them.
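To illustrate why SideEffect helps: workflow.SideEffect in the Go SDK records the value the first time it runs and returns the recorded value on replay. The toy model below (plain Go; the history type, its sideEffect method and activityIDs are hypothetical, not the SDK's implementation) contrasts calling a random ID generator directly with memoizing it in that way. Simply dropping ActivityID from the options, as suggested above, avoids the problem altogether.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newID stands in for uuid.New().String(): a fresh random value on every call.
func newID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// history is a toy event log that survives a worker restart.
type history struct {
	markers []string // values recorded by sideEffect, in call order
	next    int      // replay cursor into markers
}

// sideEffect returns the recorded value when one exists at the cursor;
// otherwise it calls f and records the result (loosely like workflow.SideEffect).
func (h *history) sideEffect(f func() string) string {
	if h.next < len(h.markers) {
		v := h.markers[h.next]
		h.next++
		return v
	}
	v := f()
	h.markers = append(h.markers, v)
	h.next++
	return v
}

// activityIDs models the loop in the reproduction: one ID per activity.
// h is only used when memoize is true.
func activityIDs(h *history, n int, memoize bool) []string {
	ids := make([]string, 0, n)
	for i := 0; i < n; i++ {
		if memoize {
			ids = append(ids, h.sideEffect(newID))
		} else {
			ids = append(ids, newID())
		}
	}
	return ids
}

// sameIDs reports whether two ID slices are identical.
func sameIDs(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func main() {
	h := &history{}
	first := activityIDs(h, 3, true)
	h.next = 0 // simulated worker restart: replay from the start of history
	replayed := activityIDs(h, 3, true)
	fmt.Println("memoized IDs match on replay:", sameIDs(first, replayed))
	fmt.Println("raw random IDs match on replay:",
		sameIDs(activityIDs(nil, 3, false), activityIDs(nil, 3, false)))
}
```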


Thank you!
I removed the activity ID from options.
