Use case: async input collector

Hi @maxim,

As discussed on Slack, I will try to explain in detail my current use case, which I am trying to migrate to Temporal. Attached is a drawing of the various parts of the system to support the description. I would like to get your input on the design before I start the implementation.

The initial design, which you can see in the attachment, comes from these business requirements:

  1. A workflow consists of multiple input tasks (T#1, T#2, T#3…T#8), which are organized into task groups. Each task group can contain multiple tasks, which can be fulfilled in parallel.
  2. Task groups (TG#1, TG#2, TG#4, TG#5, TG#8) are notified sequentially, each one when the previous task group has completed.
  3. A task group completes when all tasks/inputs within the group have been marked completed (e.g. TG#2 completes when T#2 and T#3 have both been supplied with external input).
  4. A workflow completes when the last task group is completed.
  5. A task/input has to be able to accept input even before being assigned/notified (e.g. the last notification was for TG#2, but T#8 wants to provide its input and complete task T#8).
  6. Previously completed input tasks can be marked as incomplete by a third-party user/service, so they have to be reopened while the main workflow is running. This action should make the reopened task group the current task group (e.g. the current task group is TG#5 but the T#4 input contains invalid data: an administrator has to be able to reopen task T#4, making TG#4 the currently active task group and hence sending out the notification for input completion again for TG#4).
  7. For a completed business workflow, an administrator has to be able to reopen tasks, reverting the business workflow to a “running” status. The notification for input completion should be sent out for the newly active task group (e.g. after the workflow has completed, T#4 is deemed incorrect or incomplete; the workflow should be started again, but rehydrated with the already completed task groups, so that only TG#4 is assigned and required for completion, because TG#1, TG#2, TG#5 and TG#8 are already correct and complete).
  8. The required inputs for the individual tasks can be provided either by humans (e-mail notification) or by multiple third-party services (event notification).
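
To make the requirements concrete, here is a minimal plain-Go sketch of the state they imply. It is not Temporal code, and the names `Task`, `TaskGroup`, `Workflow`, `SetDone` and `Current` are hypothetical. It assumes the "current" task group can be derived as the first incomplete group in notification order, which would cover early input (requirement 5) and reopening (requirements 6 and 7) without extra bookkeeping.

```go
package main

// Task is one input task (T#n); Done marks that its input was supplied.
type Task struct {
	ID   string
	Done bool
}

// TaskGroup is an ordered group (TG#n) whose tasks may complete in parallel.
type TaskGroup struct {
	ID    string
	Tasks []*Task
}

// Complete reports whether every task in the group has its input (requirement 3).
func (g *TaskGroup) Complete() bool {
	for _, t := range g.Tasks {
		if !t.Done {
			return false
		}
	}
	return true
}

// Workflow holds the task groups in notification order.
type Workflow struct {
	Groups []*TaskGroup
}

// SetDone supplies or revokes a task's input, regardless of which group is
// currently active (requirements 5 and 6). It returns false for unknown IDs.
func (w *Workflow) SetDone(taskID string, done bool) bool {
	for _, g := range w.Groups {
		for _, t := range g.Tasks {
			if t.ID == taskID {
				t.Done = done
				return true
			}
		}
	}
	return false
}

// Current returns the first incomplete group, which is the one to notify next,
// or nil when the whole workflow is complete (requirements 2, 4 and 7).
func (w *Workflow) Current() *TaskGroup {
	for _, g := range w.Groups {
		if !g.Complete() {
			return g
		}
	}
	return nil
}
```

With this model, calling `SetDone("T#4", false)` on a finished workflow makes `Current()` return TG#4 again, which is the point at which the notification for TG#4 would be resent.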

The question I raised on Slack came from trying to tackle state management and rehydration within replays and/or between run IDs. This is where I really confused myself, having state both within Temporal and in the database.

Side note: The database behind the workflows is currently used as the primary source of truth for the clients, and for search and reporting purposes.

I appreciate you taking the time to dissect the concept. :slight_smile:

Interesting use case. Thanks for sharing!

Ideally, you wouldn’t need a database for storing workflow state. But the requirement of updating a closed workflow is not directly supported by Temporal. I filed an issue to add SignalWithReset API to support your scenario.

In the meantime I propose the following workaround:

  • Have an activity that checks whether there are any outstanding “revert task” requests. This requires storing such requests in some external datastore.
  • Call this activity just before the workflow completes. If no such request is found, complete the workflow. If one is found, handle it.
  • To “revert task” in a closed workflow, reset the workflow to a point just before that activity invocation. The activity is then re-executed and finds the new “revert task” request, and the workflow re-executes the necessary part of your logic.
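
The control flow of this workaround can be sketched in plain Go, independent of the Temporal SDK. Everything here is a hypothetical stand-in: `RequestStore` models the external datastore, `sliceStore` is an in-memory version of it, and the `store.Next()` call marks the spot where the real workflow would invoke the check activity.

```go
package main

// RevertRequest asks for a previously completed task to be reopened.
type RevertRequest struct {
	TaskID string
}

// RequestStore is a hypothetical external datastore of pending revert requests.
type RequestStore interface {
	// Next returns the next pending request, or false when none is outstanding.
	Next() (RevertRequest, bool)
}

// sliceStore is an in-memory stand-in for that datastore.
type sliceStore struct {
	pending []RevertRequest
}

func (s *sliceStore) Next() (RevertRequest, bool) {
	if len(s.pending) == 0 {
		return RevertRequest{}, false
	}
	r := s.pending[0]
	s.pending = s.pending[1:]
	return r, true
}

// finishOrReopen models the tail of the workflow: check for pending requests
// right before completing, handle each one, and only return (complete) once
// the check comes back empty. It returns the number of requests handled.
func finishOrReopen(store RequestStore, reopen func(RevertRequest)) int {
	handled := 0
	for {
		req, ok := store.Next() // in the workflow, this is the check activity
		if !ok {
			return handled // nothing outstanding: safe to complete
		}
		reopen(req) // re-run the affected part of the logic, then check again
		handled++
	}
}
```

Resetting a closed workflow to just before the check corresponds to calling `finishOrReopen` again after a new request has been stored: the check then finds the request instead of completing.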

Thanks @maxim for filing the SignalWithReset!

Do you mean that we should drop the database and just use the query capabilities of Temporal to present the state to the clients? Can that perform well even in NDC environments? It would also be interesting to see whether more complex queries via custom search attributes perform well enough, or is it just a question of resources?

I like your workaround idea and sat down to write a pseudocode version of it. Can you check it to make sure I understood your idea? I can’t find much on “reset” itself, only the tctl descriptions. Can you link a Go example for resetting a workflow?

The main workflow:

func Container(ctx workflow.Context, workflowInstance *WorkflowData) (*WorkflowData, error) {
	state := NewStateMachine(ctx, workflowInstance)
	err := state.RegisterQueryHandlers()
	if err != nil {
		return nil, err
	}
	sel := workflow.NewSelector(ctx)
	signalHandlers := state.PrepareSignalHandlers()
	for signalCh, handler := range signalHandlers {
		sel.AddReceive(signalCh, handler)
	}
	taskGroups := state.CreateTaskGroups()
	for tg, fn := range taskGroups {
		sel.AddFuture(tg, fn)
	}
	// main loop
	for state.Current() != CompletedState {
		sel.Select(ctx)
		// workaround code
		err := state.CheckReopenTaskGroup()
		if err != nil {
			return nil, err
		}
	}
	return state.WorkflowData(), nil
}

The task group itself:

func TaskGroup(ctx workflow.Context, taskGroupInstance *TaskGroupData) (*TaskGroupData, error) {
	sel := workflow.NewSelector(ctx)
	finishedStepsC := 0
	allStepsC := len(taskGroupInstance.Steps)
	sel.AddReceive(workflow.GetSignalChannel(ctx, "reopen"), func(c workflow.ReceiveChannel, more bool) {
		var in *input
		c.Receive(ctx, &in)
		if in == nil {
			return
		}

		openStep(taskGroupInstance.Steps, in)
	})
	sel.AddReceive(workflow.GetSignalChannel(ctx, "input"), func(c workflow.ReceiveChannel, more bool) {
		var in *input
		c.Receive(ctx, &in)
		if in == nil {
			return
		}

		closeStep(taskGroupInstance.Steps, in)
	})
	// main loop
	for finishedStepsC < allStepsC {
		sel.Select(ctx)

		// workaround code
		checkReopenStep(ctx, func(in *input) {
			openStep(taskGroupInstance.Steps, in)
		})
	}
	return taskGroupInstance, nil
}

A few key methods for the main workflow:

func (s *stateMachine) PrepareSignalHandlers() map[workflow.ReceiveChannel]func(c workflow.ReceiveChannel, more bool) {
	sm := make(map[workflow.ReceiveChannel]func(c workflow.ReceiveChannel, more bool), 10)
	sm[workflow.GetSignalChannel(s.ctx, "reopen")] = func(c workflow.ReceiveChannel, more bool) {
		var in *input
		c.Receive(s.ctx, &in)
		if in == nil {
			return
		}

		openTaskGroup(in.StepId, in.Payload, func() {
			// patch-through
			workflow.SignalExternalWorkflow(s.ctx, tg.Id, "", "reopen", in)
		})
	}
	return sm
}

func (s *stateMachine) CheckReopenTaskGroup() error {
	var in *input
	if err := workflow.ExecuteLocalActivity(s.ctx, CheckReopenTask, s.workflow.Id()).Get(s.ctx, &in); err != nil {
		return err
	}
	// No pending reopen request: nothing to do.
	if in == nil {
		return nil
	}

	tg := s.FindTaskGroup(in.StepId)
	resetTaskGroupWorkflowToListeningState(s.ctx, tg)
	return nil
}

func (s *stateMachine) CreateTaskGroups() map[workflow.ChildWorkflowFuture]func(f workflow.Future) {
	taskGroups := s.calculateAllTaskGroups()
	tgmp := make(map[workflow.ChildWorkflowFuture]func(f workflow.Future), len(taskGroups))
	for _, tg := range taskGroups {
		we := workflow.ExecuteChildWorkflow(s.ctx, TaskGroup, tg)
		tgmp[we] = func(f workflow.Future) {
			var taskGroup *TaskGroupData
			// Close the task group only when the child workflow returned a result.
			if err := f.Get(s.ctx, &taskGroup); err == nil {
				s.closeTaskGroup(taskGroup)
			}
		}
	}
	return tgmp
}

It is hard for me to follow the complete logic, but I see a couple of issues.

The most important one is that, due to the determinism requirements, ranging over a map is prohibited in workflow code, because the Go runtime randomizes the iteration order.
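
One common way around this is to iterate over sorted keys instead of over the map itself. The sketch below is plain Go with hypothetical helper names (`sortedKeys`, `registerAll`), and it assumes the handlers are keyed by signal name rather than by channel, since channels cannot be sorted. The inner `range` still visits the map in random order, but it only collects keys that are sorted before anything observable happens.

```go
package main

import "sort"

// sortedKeys returns the map's keys in ascending order, so callers can
// iterate deterministically.
func sortedKeys(m map[string]func()) []string {
	keys := make([]string, 0, len(m))
	for k := range m { // random order here, but it only feeds the sort
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// registerAll calls register once per handler in a stable, name-sorted order.
func registerAll(handlers map[string]func(), register func(name string, h func())) {
	for _, name := range sortedKeys(handlers) {
		register(name, handlers[name])
	}
}
```

Keeping the handlers in a slice from the start would avoid the map entirely and is arguably simpler if the set of signals is fixed.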

I also see that workflow.GetSignalChannel(s.ctx, "reopen") is called in multiple places. Note that this call returns the same channel instance. So if multiple consumers receive from the same channel in parallel, each is going to receive a subset of the messages. I’m not sure whether that is a problem in your case.
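
The "subset of messages" effect can be illustrated with ordinary Go channels. This is only an analogy, not Temporal workflow code, and `consume` is a hypothetical helper: several consumers read from one shared channel, and each value is delivered to exactly one of them.

```go
package main

import "sync"

// consume starts n consumers on one shared channel and returns what each saw.
// Every value is received by exactly one consumer, so each slice holds only a
// subset of the messages; which consumer gets which value is not predictable.
func consume(ch <-chan int, n int) [][]int {
	seen := make([][]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for v := range ch {
				seen[i] = append(seen[i], v) // each goroutine writes only its own slot
			}
		}(i)
	}
	wg.Wait()
	return seen
}
```

If every consumer needs to see every message, a single receiver that fans the messages out is the usual alternative.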

Thanks, I’ll put the concept to the test and try to come up with a demo for next week.

Good point regarding the range over maps. I always forget about that :+1:

Those two “reopen” signals are supposed to be in two separate workflow types/contexts. I was under the impression that a signal name only has to be unique per workflow. Do I need to start working with global uniqueness?

Then you are fine. I missed that these calls live in different workflows. All signal types are per workflow instance.

@maxim can you point me to a Go code sample for “reset”? I assume I would need to find the event ID of the last activity, which would be the “check pending” activity, and reset to that point? Or do you have a nicer way to do this?

We don’t have a sample. Here is the tctl implementation of reset.
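
As a rough illustration of the "find the event ID" step only, here is a plain-Go sketch over a simplified history. The `Event` struct and `findResetPoint` are hypothetical stand-ins, not SDK types. The sketch assumes that the reset point has to be the workflow-task-completed event that scheduled the last run of the check activity, and that the check runs as a regular activity, so that an activity-scheduled event appears in the history. Both assumptions should be verified against the tctl implementation.

```go
package main

// Event is a simplified stand-in for a workflow history event.
type Event struct {
	ID           int64
	Type         string // e.g. "WorkflowTaskCompleted", "ActivityTaskScheduled"
	ActivityType string // set only for ActivityTaskScheduled events
}

// findResetPoint returns the ID of the WorkflowTaskCompleted event that
// preceded the last scheduling of the given activity, or false if the
// activity was never scheduled after a completed workflow task.
func findResetPoint(history []Event, activityType string) (int64, bool) {
	var lastCompleted int64 // most recent WorkflowTaskCompleted seen so far
	var resetID int64
	found := false
	for _, e := range history {
		switch e.Type {
		case "WorkflowTaskCompleted":
			lastCompleted = e.ID
		case "ActivityTaskScheduled":
			if e.ActivityType == activityType && lastCompleted != 0 {
				resetID = lastCompleted
				found = true
			}
		}
	}
	return resetID, found
}
```

The returned ID would then be passed to whatever reset call is used, so that the workflow task is replayed and the check activity is scheduled again.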