Go SDK - Sequential by ID otherwise in parallel

ref: Events | Temporal / design patterns - Job queue with job affinity - Stack Overflow,

I want to implement this with the Go SDK but cannot figure out how to do it.

Essentially, I want to process tasks sequentially per ID; tasks whose IDs don’t match can be run by another worker in parallel.

The main goal for me is to never run a task in parallel with another task whose ID is already being worked on.

You may need to clarify your question a little. If you already understand how the implementation suggested by Maxim in the above Stack Overflow thread works and want to learn how to write a workflow in Go, please consult our documentation and take a look at the Go samples for reference.

In Go it is even simpler than in Java. Signals are delivered through a Channel instance, so the workflow should implement a loop that consumes from that channel and executes the activity. It should also call continue as new periodically to reset the history size.
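
As a mental model only, here is the same idea in plain Go with no Temporal involved: one consumer loop per ID, fed by its own channel, handles that ID's tasks one at a time while different IDs proceed in parallel. In the Temporal version, the workflow instance for an ID plays the role of the goroutine and its signal channel plays the role of the Go channel. `Task`, `Dispatcher` and `runDemo` are hypothetical names made up for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// Task is a hypothetical unit of work addressed to one router.
type Task struct {
	RouterID string
	Seq      int
}

// Dispatcher routes each task to a queue dedicated to its router ID.
type Dispatcher struct {
	mu     sync.Mutex
	queues map[string]chan Task
	wg     sync.WaitGroup
	handle func(Task)
}

func NewDispatcher(handle func(Task)) *Dispatcher {
	return &Dispatcher{queues: make(map[string]chan Task), handle: handle}
}

// Submit enqueues a task, starting the router's consumer loop on first use.
func (d *Dispatcher) Submit(t Task) {
	d.mu.Lock()
	q, ok := d.queues[t.RouterID]
	if !ok {
		q = make(chan Task, 16)
		d.queues[t.RouterID] = q
		d.wg.Add(1)
		go func() {
			defer d.wg.Done()
			for task := range q { // one task at a time for this router
				d.handle(task)
			}
		}()
	}
	d.mu.Unlock()
	q <- t
}

// Close stops accepting tasks and waits until every queue has been emptied.
func (d *Dispatcher) Close() {
	d.mu.Lock()
	for _, q := range d.queues {
		close(q)
	}
	d.mu.Unlock()
	d.wg.Wait()
}

// runDemo submits three tasks to each of two routers and records,
// per router, the order in which the tasks were handled.
func runDemo() map[string][]int {
	var mu sync.Mutex
	seen := map[string][]int{}
	d := NewDispatcher(func(t Task) {
		mu.Lock()
		seen[t.RouterID] = append(seen[t.RouterID], t.Seq)
		mu.Unlock()
	})
	for seq := 1; seq <= 3; seq++ {
		d.Submit(Task{RouterID: "router-a", Seq: seq})
		d.Submit(Task{RouterID: "router-b", Seq: seq})
	}
	d.Close()
	return seen
}

func main() {
	seen := runDemo()
	fmt.Println(seen["router-a"], seen["router-b"]) // prints "[1 2 3] [1 2 3]"
}
```

Unlike this in-memory model, the Temporal workflow keeps its queue durable across worker restarts, which is the reason to use signals and a workflow per ID rather than goroutines.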

@Vitaly, thank you for taking the time to respond.

I have spent many hours reading all of your documentation, watched every video I could find, and looked at all of the samples.
I have also looked at sdk · pkg.go.dev.

There is actually a lot of information to digest and I am still getting my head around all of this.

To be more specific about what I am trying to do:

Think of each task as configuring a router.

Many tasks can arrive for many different routers.

While a router is being configured, it can only be edited by one worker at a time.

Once the router is configured, another task for that router can run.

Other routers can be configured in the meantime without issue; there could be hundreds of routers.

I understand that the first step is to set a unique ID for the queue.

The method I can use with the Go SDK is SignalWithStartWorkflow.

I am a little confused about how to build the workflow to achieve the above.
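
A minimal sketch of the starting side described above, assuming a Temporal server on the default local address and a recent SDK version that provides `client.Dial`. The router ID, the workflow ID prefix, the signal name `configure-router`, the task queue `router-config`, the payload string and the workflow type name `RouterWorkflow` are all placeholders invented for illustration. Because the workflow ID is derived from the router ID, every task for the same router is delivered to the same workflow execution, and the execution is started first if it is not already running.

```go
package main

import (
	"context"
	"log"

	"go.temporal.io/sdk/client"
)

func main() {
	// Connects to localhost:7233 by default (assumption: a local dev server).
	c, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalln("unable to create Temporal client:", err)
	}
	defer c.Close()

	routerID := "router-42"                   // hypothetical router identifier
	workflowID := "router-config-" + routerID // one workflow per router

	run, err := c.SignalWithStartWorkflow(
		context.Background(),
		workflowID,
		"configure-router",     // signal name the workflow listens on (placeholder)
		"set-hostname edge-42", // signal payload: the change to apply (placeholder)
		client.StartWorkflowOptions{TaskQueue: "router-config"},
		"RouterWorkflow", // workflow type name, assumed to be registered on a worker
		routerID,         // workflow argument, used only if a new run is started
	)
	if err != nil {
		log.Fatalln("unable to signal-with-start workflow:", err)
	}
	log.Println("task queued", "workflowID", run.GetID(), "runID", run.GetRunID())
}
```

Calling this repeatedly with different router IDs starts independent workflows that run in parallel, while repeated calls for the same router only add signals to the one workflow that is already running.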

The basic idea with the Go SDK seems to be:

var signalVal string
signalChan := workflow.GetSignalChannel(ctx, signalName)

s := workflow.NewSelector(ctx)
s.AddReceive(signalChan, func(c workflow.Channel, more bool) {
	c.Receive(ctx, &signalVal)
	workflow.GetLogger(ctx).Info("Received signal!", "signal", signalName, "value", signalVal)
})
s.Select(ctx)

if len(signalVal) > 0 && signalVal != "SOME_VALUE" {
	return errors.New("signalVal")
}

If you can direct me to the documentation/examples I am happy to do more reading.

There is no need to use Selector if you are reading from a single channel. Channel.Receive already blocks if a channel is empty. I would rewrite this code as:

	signalChan := workflow.GetSignalChannel(ctx, signalName)

	for i := 0; i < CONTINUE_AS_NEW_FREQUENCY; i++ {
		var signalVal string
		signalChan.Receive(ctx, &signalVal)
		workflow.GetLogger(ctx).Info("Received signal!", "signal", signalName, "value", signalVal)
		// PROCESS SIGNAL
	}
	// Drain signal channel asynchronously to avoid signal loss
	for {
		var signalVal string
		ok := signalChan.ReceiveAsync(&signalVal)
		if !ok {
			break
		}
		workflow.GetLogger(ctx).Info("Received signal!", "signal", signalName, "value", signalVal)
		// PROCESS SIGNAL
	}
	return workflow.NewContinueAsNewError(ctx, WorkflowFn)

Thank you @maxim, sorry to say I am still a little lost with your suggestion.

Where do I get the value for “CONTINUE_AS_NEW_FREQUENCY”, and what should WorkflowFn be?
I thought this was already the workflow function.

CONTINUE_AS_NEW_FREQUENCY is a constant that you define. There is no hard rule for how many iterations a workflow can make before calling continue as new. I would keep it under 1,000.

I used WorkflowFn as the name of the workflow function. Put there whatever workflow function you use for your workflow. It is possible to call continue as new with a function that is different from the original workflow. It is rarely needed, but this is the reason that you have to pass it to the NewContinueAsNewError function.
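
Putting these two answers together, a complete workflow might look roughly like the sketch below. It is only an illustration: `RouterWorkflow`, `ConfigureRouter`, the signal name, the timeout and the value 500 are assumptions made for the example, and the activity body is left empty. The workflow passes itself to NewContinueAsNewError, together with the argument the next run should receive.

```go
package routerqueue

import (
	"context"
	"time"

	"go.temporal.io/sdk/workflow"
)

const (
	// Both values are illustrative choices, not SDK constants.
	signalName             = "configure-router"
	continueAsNewFrequency = 500
)

// ConfigureRouter is a hypothetical activity that applies one change to one router.
func ConfigureRouter(ctx context.Context, routerID, change string) error {
	// Talk to the router here.
	return nil
}

// RouterWorkflow applies changes to a single router, one at a time.
func RouterWorkflow(ctx workflow.Context, routerID string) error {
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 10 * time.Minute, // assumed upper bound for one change
	})
	logger := workflow.GetLogger(ctx)
	signalChan := workflow.GetSignalChannel(ctx, signalName)

	process := func(change string) {
		// Get blocks until the activity finishes, so changes never overlap.
		err := workflow.ExecuteActivity(ctx, ConfigureRouter, routerID, change).Get(ctx, nil)
		if err != nil {
			// Decide here whether a failed change should stop the queue.
			logger.Error("Router change failed", "router", routerID, "error", err)
		}
	}

	// Handle a bounded number of signals so the history stays small.
	for i := 0; i < continueAsNewFrequency; i++ {
		var change string
		signalChan.Receive(ctx, &change) // blocks until a signal arrives
		process(change)
	}

	// Drain anything already buffered before handing over to a new run.
	for {
		var change string
		if ok := signalChan.ReceiveAsync(&change); !ok {
			break
		}
		process(change)
	}

	// Start a fresh run of the same workflow with the same router ID.
	return workflow.NewContinueAsNewError(ctx, RouterWorkflow, routerID)
}
```

Both functions still need to be registered on a worker (RegisterWorkflow and RegisterActivity) that polls the task queue used when starting the workflow.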

Here is the documentation of continue as new.

Thank you @maxim for your assistance, still a lot to learn but my solution is coming together thanks to you.

Thank you for the solution. Is there a chance that the workflow will miss a signal if it is sent in the instant between the last call to ReceiveAsync and the end of the workflow?

No. As long as you don’t do anything else that would cause the workflow to yield and process more server tasks (i.e. the ReceiveAsync call that returns not-ok is the last thing before returning continue as new), you will not miss the signal.

(Internally, if the server delivers a signal in that window, the continue as new will be rejected and the workflow will be rerun with the signal. But you should not have to care about the internals.)