Hello team,
I discovered Temporal a few months ago and almost instantly started evaluating whether it would fit our use cases. It’s been a great journey so far - definitely seems like it would streamline and simplify a lot of stuff.
Our service is responsible for executing heavyweight processes on virtual machines in the cloud. We have a workflow that represents one process on one machine, but we also have a workflow that represents multiple processes (P) spread across multiple machines (M), where M is less than or equal to P, depending on our business rules for the user. The latter is where most of the complexity comes from, as we need some sort of coordinator that receives updates and synchronizes state. This "batch" workflow also performs a lot of optimizations, so it has some additional activities of its own to complete - reusing hardware between jobs and so on.
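To make that concrete, the two workflows have roughly the following shape (just a sketch; the names, the Job type and the parameters are placeholders, not our real signatures):

import "go.temporal.io/sdk/workflow"

type Job struct{} // placeholder for whatever describes a single job

// Child workflow: runs one heavyweight process on one machine.
func RunProcessWorkflow(ctx workflow.Context, job Job, machine string) error {
    // <activities that prepare the machine, start the process and wait for it to finish>
    return nil
}

// Parent ("batch") workflow: coordinates P jobs across M machines (M <= P),
// reusing machines as jobs finish.
func BatchWorkflow(ctx workflow.Context, jobs []Job) (string, error) {
    // <the flow-control loop from the snippet below>
    return "", nil
}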
My initial thought was that a single parent workflow would be enough, but I can't find a good way to implement it without running into the workflow history limit of 50000 events. I saw that there is workflow.ContinueAsNew, but it is really inconvenient to use with my current implementation, because at any given moment I am waiting on whichever of several futures completes first. My implementation boils down to the following (consider the snippet pseudocode; bits that are not important to my questions are not shown):
// <workflow setup and additional steps, which are not important to the main flow control>
// We have the following variables prepared:
var (
    selector          = workflow.NewSelector(ctx)
    availableMachines []string
    pollCtx           workflow.Context
    schedulePoll      = true
    finishedJobs      = 0
    jobsIter          JobsIterator // orders the remaining jobs; has the standard iterator methods (HasNext, Next)
    cancelLongPoll    = func() {}
    err               error
)
for {
    if schedulePoll {
        pollCtx, cancelLongPoll = workflow.WithCancel(ctx) // we also apply activity options here
        newMachinesFuture := workflow.ExecuteActivity(pollCtx /* + the activity that does long polling for new machines */)
        selector.AddFuture(newMachinesFuture, func(f workflow.Future) {
            var machines []string
            err = f.Get(ctx, &machines)
            availableMachines = append(availableMachines, machines...)
            schedulePoll = true
        })
        schedulePoll = false
    }
    if !jobsIter.HasNext() {
        // Doing some machine release here, not important to the question
        availableMachines = nil
    }
    dispatched := 0
    for _, machine := range availableMachines {
        if !jobsIter.HasNext() {
            break
        }
        nextJob := jobsIter.Next()
        machine := machine // capture the loop variable for the callback below
        childFuture := workflow.ExecuteChildWorkflow(ctx /* + the child workflow that starts a heavyweight process (nextJob) on this machine */)
        selector.AddFuture(childFuture, func(f workflow.Future) {
            err = f.Get(ctx, nil)
            // this is executed outside of the loop, during selector.Select,
            // so it is okay to append here
            availableMachines = append(availableMachines, machine)
            finishedJobs++
        })
        dispatched++
    }
    // the machines we just handed jobs to are busy until their callbacks return them
    availableMachines = availableMachines[dispatched:]
    // this basically waits for a single child to finish or for new machines to appear
    selector.Select(ctx)
    if err != nil {
        return "", err
    }
    if jobsIter.TotalLen() == finishedJobs {
        // cancel the long poll and wait for its future to finish
        cancelLongPoll()
        selector.Select(ctx)
        break
    }
}
The whole workflow works perfectly in my local environment, but I am afraid we will hit the history limit pretty fast. Is there anything I can do about it that would fit reasonably well into an implementation like this? I hope you can see why ContinueAsNew is rather hard to use here: there is no single point at which all futures have been awaited. I was thinking that manual or automatic event compaction (merging multiple history events into one) would be pretty useful, but that is probably quite hard to implement on your side.
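For what it's worth, the closest I have come to a ContinueAsNew-friendly shape is to pick a checkpoint inside the loop, stop producing new work, drain whatever is still in flight, and only then continue as new, but it feels clumsy. Roughly (sketch only; shouldContinueAsNew, pending and remainingJobs are hypothetical helpers I would have to add, and BatchWorkflow is the workflow function itself):

// Inside the main loop, right after selector.Select(ctx) and the error check:
if shouldContinueAsNew() { // e.g. after every N finished jobs; the trigger is up to us
    // Stop producing new work and drain everything still registered with the selector.
    // "pending" would be incremented on every AddFuture and decremented at the end of
    // each callback, so it counts the poll future plus the in-flight children.
    cancelLongPoll()
    for pending > 0 {
        selector.Select(ctx)
    }
    // (the cancelled poll sets err to a cancellation error in its callback; we would
    // filter that out, e.g. with temporal.IsCanceledError, before treating it as a failure)

    // Carry the remaining jobs and the machines we already hold into the next run.
    return "", workflow.NewContinueAsNewError(ctx, BatchWorkflow, remainingJobs(jobsIter), availableMachines)
}

Even then I would be paying for a full drain at every checkpoint, with freed machines sitting idle until the last child of the current run finishes, which defeats part of the point of the coordinator.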
Also, what are the implications of going over 50000 history events? I didn't see that the limit is configurable, so I assume there are restrictions on your side.