Go SDK - Sequential by ID otherwise in parallel

Ref: Events | Temporal / design patterns - Job queue with job affinity - Stack Overflow

I want to implement this with the Go SDK but cannot figure out how it would be done.

Essentially, I want tasks that share an ID to be processed sequentially; tasks with different IDs can be run in parallel by other workers.

My main goal is to never run a task in parallel with another task for an ID that is already being worked on.

You may need to clarify your question a little. If you understand how the implementation suggested by Maxim in the Stack Overflow thread above works and want to learn how to write a workflow in Go, please consult our documentation and take a look at the Go samples for reference.

In Go it is even simpler than in Java. Signals are delivered through a Channel instance, so the workflow should implement a loop that consumes from that channel and executes the activity for each signal. It should also call continue as new periodically to reset the history size.
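To make the shape of the pattern concrete, here is a minimal plain-Go sketch of "sequential per ID, parallel across IDs". It is an analogy only, not Temporal SDK code: `task`, `dispatcher`, `Submit`, `Close` and `runDemo` are hypothetical names invented for the illustration. In the Temporal version, the workflow ID would play the role of the map key, the signal channel the role of the per-ID queue, and the workflow loop the role of the consumer goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical unit of work addressed to one router.
type task struct {
	RouterID string
	Step     int
}

// dispatcher (hypothetical) keeps one queue and one consumer goroutine per ID.
type dispatcher struct {
	mu     sync.Mutex
	queues map[string]chan task
	wg     sync.WaitGroup
	handle func(task)
}

func newDispatcher(handle func(task)) *dispatcher {
	return &dispatcher{queues: make(map[string]chan task), handle: handle}
}

// Submit routes t to the queue for its ID, starting a consumer on first use.
// It must not be called concurrently with Close.
func (d *dispatcher) Submit(t task) {
	d.mu.Lock()
	q, ok := d.queues[t.RouterID]
	if !ok {
		q = make(chan task, 16)
		d.queues[t.RouterID] = q
		d.wg.Add(1)
		go func() {
			defer d.wg.Done()
			for next := range q { // one task at a time for this ID
				d.handle(next)
			}
		}()
	}
	d.mu.Unlock()
	q <- t
}

// Close closes every queue and waits until all queued tasks are handled.
func (d *dispatcher) Close() {
	d.mu.Lock()
	for _, q := range d.queues {
		close(q)
	}
	d.mu.Unlock()
	d.wg.Wait()
}

// runDemo submits three steps for each of two routers and records the
// order in which each router's steps were handled.
func runDemo() map[string][]int {
	var mu sync.Mutex
	seen := make(map[string][]int)
	d := newDispatcher(func(t task) {
		mu.Lock()
		seen[t.RouterID] = append(seen[t.RouterID], t.Step)
		mu.Unlock()
	})
	for step := 1; step <= 3; step++ {
		d.Submit(task{RouterID: "router-a", Step: step})
		d.Submit(task{RouterID: "router-b", Step: step})
	}
	d.Close()
	return seen
}

func main() {
	seen := runDemo()
	fmt.Println(seen["router-a"], seen["router-b"])
}
```

Each router's steps are handled in submission order because only one goroutine ever reads that router's queue, while the two routers make progress independently of each other.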

@Vitaly, thank you for taking the time to respond.

I have spent many hours reading all of your documentation, watched every video I could find, and looked at all of the samples.
I have also looked at sdk · pkg.go.dev.

There is actually a lot of information to digest and I am still getting my head around all of this.

To be more specific with what I am trying to do:

Think of each task as configuring a router.

Many tasks can arrive for many different routers.

While a router is being configured, it can only be edited by one worker at a time.

Once the router is configured, another task for that router can run.

Other routers can be configured at the same time without an issue; there could be hundreds of routers.

I understand that the first step is to set a unique ID for the queue.

The method I can use in the Go SDK is SignalWithStartWorkflow.

I am a little confused about how to build the workflow to achieve the above.

The basic idea for the Go SDK seems to be:

var signalVal string
signalChan := workflow.GetSignalChannel(ctx, signalName)

s := workflow.NewSelector(ctx)
s.AddReceive(signalChan, func(c workflow.Channel, more bool) {
	c.Receive(ctx, &signalVal)
	workflow.GetLogger(ctx).Info("Received signal!", "signal", signalName, "value", signalVal)
})
s.Select(ctx)

if len(signalVal) > 0 && signalVal != "SOME_VALUE" {
	return errors.New("signalVal")
}

If you can direct me to the documentation/examples I am happy to do more reading.

There is no need to use Selector if you are reading from a single channel. Channel.Receive already blocks if a channel is empty. I would rewrite this code as:

	signalChan := workflow.GetSignalChannel(ctx, signalName)

	// Handle a bounded number of signals per run; Receive blocks until one arrives.
	for i := 0; i < CONTINUE_AS_NEW_FREQUENCY; i++ {
		var signalVal string
		signalChan.Receive(ctx, &signalVal)
		workflow.GetLogger(ctx).Info("Received signal!", "signal", signalName, "value", signalVal)
		// PROCESS SIGNAL
	}
	// Drain signal channel asynchronously to avoid signal loss.
	// ReceiveAsync returns false as soon as no signal is buffered.
	for {
		var signalVal string
		ok := signalChan.ReceiveAsync(&signalVal)
		if !ok {
			break
		}
		workflow.GetLogger(ctx).Info("Received signal!", "signal", signalName, "value", signalVal)
		// PROCESS SIGNAL
	}
	// Start a fresh run of the workflow to reset the history size.
	return workflow.NewContinueAsNewError(ctx, WorkflowFn)

Thank you @maxim, sorry to say I am still a little lost with your suggestion.

Where do I get the value for “CONTINUE_AS_NEW_FREQUENCY”, and what is WorkflowFn supposed to be?
I thought this was already the workflow function.

CONTINUE_AS_NEW_FREQUENCY is a constant that you define. There is no hard rule about how many iterations a workflow can make before calling continue as new. I would keep it under 1k.

I used WorkflowFn as the name of the workflow function. Put there whatever workflow function you use for your workflow. It is possible to call continue as new with a function that is different from the original workflow. It is rarely needed, but this is the reason that you have to pass it to the NewContinueAsNewError function.

Here is the documentation of continue as new.
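As a rough illustration of how a self-chosen frequency constant and the drain step interact, here is a plain-Go sketch of a single workflow run. It is not Temporal SDK code: an ordinary buffered Go channel stands in for the signal channel, and `continueAsNewFrequency`, `runOnce` and `runDemo` are hypothetical names. The value 3 is chosen only to keep the demo short.

```go
package main

import "fmt"

// continueAsNewFrequency is a value you pick yourself; 3 keeps the demo short.
const continueAsNewFrequency = 3

// runOnce models a single workflow run. The Go channel is a stand-in for the
// Temporal signal channel; this is not Temporal SDK code.
func runOnce(signals chan string, process func(string)) int {
	handled := 0
	// Blocking phase: wait for a fixed number of signals.
	for i := 0; i < continueAsNewFrequency; i++ {
		process(<-signals)
		handled++
	}
	// Drain phase: take whatever is already buffered, without blocking.
	for {
		select {
		case s := <-signals:
			process(s)
			handled++
		default:
			// Nothing left. A real workflow would now return
			// workflow.NewContinueAsNewError(ctx, <its own workflow function>).
			return handled
		}
	}
}

// runDemo buffers five signals up front and runs one simulated workflow run.
func runDemo() (int, []string) {
	signals := make(chan string, 8)
	for i := 1; i <= 5; i++ {
		signals <- fmt.Sprintf("task-%d", i)
	}
	var order []string
	handled := runOnce(signals, func(s string) { order = append(order, s) })
	return handled, order
}

func main() {
	handled, order := runDemo()
	fmt.Println(handled, order)
}
```

With five signals already waiting and a frequency of 3, the run handles all five: three in the blocking loop and two in the drain loop. The constant is therefore a lower bound on signals handled per run, not an exact count.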

Thank you @maxim for your assistance, still a lot to learn but my solution is coming together thanks to you.