Based on this sample mutex workflow: samples-go/mutex/mutex_workflow.go in the temporalio/samples-go repository on GitHub.
For my use case, I need to know which workflow currently holds the mutex, so that another workflow can cancel it and thereby release the lock.
For example:
Two workflows, workflow1 and workflow2:
- workflow1 locks the resource by starting the mutex workflow, and defers the unlock so that the lock is released when workflow1 is canceled.
- workflow2 starts and wants to know whether the same resource is currently locked through that mutex workflow. If it is, workflow2 cancels workflow1, which releases the mutex.
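The flow workflow2 follows can be sketched in plain Go. `lockService`, `CurrentHolder`, `Cancel`, `preemptLock`, and `fakeLockService` are hypothetical names, not Temporal APIs. As far as I know, a Temporal workflow cannot query another workflow directly, so `CurrentHolder` would have to be an activity that calls the client's `QueryWorkflow`, while `Cancel` could map to `workflow.RequestCancelExternalWorkflow`; that wiring is assumed and not shown.

```go
package main

import (
	"errors"
	"fmt"
)

// lockService is a hypothetical abstraction over the two operations workflow2
// needs. In Temporal these would be backed by a query against the mutex
// workflow and a cancellation request for the holder (assumed, not shown).
type lockService interface {
	// CurrentHolder returns the workflow ID holding the lock, or "" if free.
	CurrentHolder(resourceID string) (string, error)
	// Cancel requests cancellation of the given workflow.
	Cancel(workflowID string) error
}

// preemptLock cancels whichever other workflow holds resourceID.
// It returns the ID it canceled, or "" if nothing needed canceling.
func preemptLock(svc lockService, resourceID, selfID string) (string, error) {
	holder, err := svc.CurrentHolder(resourceID)
	if err != nil {
		return "", fmt.Errorf("query lock holder: %w", err)
	}
	if holder == "" || holder == selfID {
		return "", nil // lock is free, or we already hold it
	}
	if err := svc.Cancel(holder); err != nil {
		return "", fmt.Errorf("cancel %s: %w", holder, err)
	}
	return holder, nil
}

// fakeLockService is an in-memory stand-in used only to exercise preemptLock.
type fakeLockService struct {
	holders  map[string]string // resourceID -> holder workflow ID
	canceled []string          // workflow IDs passed to Cancel, in order
}

func (f *fakeLockService) CurrentHolder(resourceID string) (string, error) {
	if f.holders == nil {
		return "", errors.New("no lock table")
	}
	return f.holders[resourceID], nil
}

func (f *fakeLockService) Cancel(workflowID string) error {
	f.canceled = append(f.canceled, workflowID)
	return nil
}
```

One caveat with any query-then-cancel design: the two steps are not atomic, so by the time the cancel lands, workflow1 may already have released the lock and another requester may hold it.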
To solve this, I added state to the mutex workflow that can be read with a query. It tracks which workflow currently holds the lock on the resource.
I was wondering whether there are any concerns with this approach, or whether there is a better solution.
Example code:
```go
func MutexWorkflow(...) error {
	// ID of the workflow that currently holds the lock; "" when the lock is free.
	var lockedResourceWorkflowId string
	err := workflow.SetQueryHandler(ctx, QueryKeyForStatus, func() (string, error) {
		return lockedResourceWorkflowId, nil
	})
	if err != nil {
		return fmt.Errorf("failed to initialize setQueryHandler: %w", err)
	}
	....
	for {
		var senderWorkflowID string
		....
		logger.Info("signaled external workflow")
		// Record the new holder before blocking on the release signal or timeout.
		lockedResourceWorkflowId = senderWorkflowID
		selector := workflow.NewSelector(ctx)
		selector.AddFuture(workflow.NewTimer(ctx, unlockTimeout), func(f workflow.Future) {
			logger.Info("unlockTimeout exceeded")
		})
		selector.AddReceive(workflow.GetSignalChannel(ctx, releaseLockChannelName), func(c workflow.ReceiveChannel, more bool) {
			c.Receive(ctx, &ack)
			logger.Info("release signal received")
		})
		// Blocks until either the timer fires or the release signal arrives.
		selector.Select(ctx)
		// Lock released or timed out: clear the holder.
		lockedResourceWorkflowId = ""
	}
	return nil
}
```
The key part is simply setting and resetting this workflow state.
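The second example and the test below use a `Status` type with an `AcquiredLockWorkflowID` field and an `IsLocked()` method, but its definition isn't shown. A minimal guess at it follows; only the field and method names come from the post, everything else is assumed. Because a query result travels as an encoded payload (JSON for plain structs with the default data converter, as far as I know), the caller should decode into the same type the handler returns, and the field must be exported. `roundTrip` is a hypothetical helper that imitates that encode/decode step.

```go
package main

import "encoding/json"

// Status is an assumed shape for the query result used in the test.
type Status struct {
	// Exported so that a JSON-based payload converter can encode it.
	AcquiredLockWorkflowID string
}

// IsLocked reports whether any workflow currently holds the lock.
func (s Status) IsLocked() bool {
	return s.AcquiredLockWorkflowID != ""
}

// roundTrip imitates encoding a query result on the worker and decoding it
// on the caller's side with JSON.
func roundTrip(in Status) (Status, error) {
	data, err := json.Marshal(in)
	if err != nil {
		return Status{}, err
	}
	var out Status
	err = json.Unmarshal(data, &out)
	return out, err
}
```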
Question
- I wasn't sure whether I need to worry about `Select` failing for some reason, so that `lockedResourceWorkflowId` never gets reset to `""`. I also wasn't sure whether there are any concurrency concerns.
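On the reset question: as far as I know, `selector.Select(ctx)` has no error return; it blocks until one of the registered cases (here the timer or the release signal) is ready and then runs that case's callback. If a workflow task itself fails, the SDK replays the workflow from its history, which recomputes the variable. The remaining risk is mostly structural, for example a later early return added between the set and the reset. The plain-Go sketch below pairs the two with `defer`; `lockTracker`, `Holder`, and `withLock` are hypothetical names, and `wait` stands in for `selector.Select(ctx)`.

```go
package main

// lockTracker holds the ID of the workflow that currently owns the lock.
// Hypothetical type, not part of the Temporal SDK.
type lockTracker struct {
	holder string
}

// Holder is what the query handler would return.
func (t *lockTracker) Holder() string { return t.holder }

// withLock records id as the holder, runs wait, and clears the holder on
// every exit path from wait, including a panic.
func (t *lockTracker) withLock(id string, wait func()) {
	t.holder = id
	defer func() { t.holder = "" }()
	wait()
}
```

Inside the loop this would read `tracker.withLock(senderWorkflowID, func() { selector.Select(ctx) })`, with the query handler returning `tracker.Holder(), nil`.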
Alternatively, I could store `lockedResourceWorkflowId` in the context. I'm not sure this actually works, though, because I don't know whether the context is propagated to the `SetQueryHandler` callback. I tried testing it with a unit test, and it didn't seem to work.
Example:
```go
func MutexWorkflow(...) error {
	// Private key type for storing the holder's workflow ID in a context.
	type workflowKey struct{}
	err := workflow.SetQueryHandler(ctx, QueryKeyForStatus, func() (Status, error) {
		// Reads from ctx, the context this closure captured when it was created.
		wfID, _ := ctx.Value(workflowKey{}).(string)
		return Status{AcquiredLockWorkflowID: wfID}, nil
	})
	if err != nil {
		return fmt.Errorf("failed to initialize setQueryHandler: %w", err)
	}
	....
	for {
		var senderWorkflowID string
		....
		logger.Info("signaled external workflow")
		selector := workflow.NewSelector(ctx)
		selector.AddFuture(workflow.NewTimer(ctx, unlockTimeout), func(f workflow.Future) {
			logger.Info("unlockTimeout exceeded")
		})
		selector.AddReceive(workflow.GetSignalChannel(ctx, releaseLockChannelName), func(c workflow.ReceiveChannel, more bool) {
			c.Receive(ctx, &ack)
			logger.Info("release signal received")
		})
		// Attach the holder's ID to a derived context and select on it.
		selectorCtx := workflow.WithValue(ctx, workflowKey{}, senderWorkflowID)
		selector.Select(selectorCtx)
	}
	return nil
}
```
Test (fails when the workflow ID is stored in the context):
```go
func (s *mutexSuite) Test_MutexWorkflow_Success() {
	requestWorkflowID1 := "requestWorkflowID_1"
	requestWorkflowID2 := "requestWorkflowID_2"
	// workflow1 requests the lock immediately.
	s.env.RegisterDelayedCallback(
		func() {
			s.env.SignalWorkflow(signalRequestLock, mutexRequest{
				WorkflowID: requestWorkflowID1,
				Timeout:    time.Minute,
			})
		},
		0*time.Millisecond,
	)
	// workflow2 requests the same lock shortly afterwards.
	s.env.RegisterDelayedCallback(
		func() {
			s.env.SignalWorkflow(signalRequestLock, mutexRequest{
				WorkflowID: requestWorkflowID2,
				Timeout:    time.Minute,
			})
		},
		2*time.Millisecond,
	)
	// While workflow1 holds the lock, the query should report it; then release it.
	s.env.RegisterDelayedCallback(
		func() {
			encodedStatus, err := s.env.QueryWorkflow(QueryKeyForStatus)
			s.NoError(err)
			s.NotNil(encodedStatus)
			var status Status
			err = encodedStatus.Get(&status)
			s.NoError(err)
			s.True(status.IsLocked())
			s.Equal(requestWorkflowID1, status.AcquiredLockWorkflowID)
			s.env.SignalWorkflow("unlock-"+requestWorkflowID1, "releaseLock")
		},
		5*time.Millisecond,
	)
	// After that release, workflow2 should hold the lock; then release it too.
	s.env.RegisterDelayedCallback(
		func() {
			encodedStatus, err := s.env.QueryWorkflow(QueryKeyForStatus)
			s.NoError(err)
			s.NotNil(encodedStatus)
			var status Status
			err = encodedStatus.Get(&status)
			s.NoError(err)
			s.True(status.IsLocked())
			s.Equal(requestWorkflowID2, status.AcquiredLockWorkflowID)
			s.env.SignalWorkflow("unlock-"+requestWorkflowID2, "releaseLock")
		},
		10*time.Millisecond,
	)
	// Each requester should be signaled exactly once when it acquires the lock.
	s.env.OnSignalExternalWorkflow(
		mock.Anything,
		requestWorkflowID1, "",
		signalAcquireLock,
		mock.Anything,
	).Return(nil).Times(1)
	s.env.OnSignalExternalWorkflow(
		mock.Anything,
		requestWorkflowID2, "",
		signalAcquireLock,
		mock.Anything,
	).Return(nil).Times(1)
	s.env.ExecuteWorkflow(mutexWorkflow)
	s.True(s.env.IsWorkflowCompleted())
	s.NoError(s.env.GetWorkflowError())
}
```
Questions for this example:
- Is the context in `SetQueryHandler` the same as the context passed into `selector.Select(selectorCtx)`, or is it separate?
- If this is supposed to work, is there a bug in the test framework where the context is not propagated from `Select` to the query handler?
- If I go this route, I'm guessing I would also need context propagation for this to work across different workers.
Thanks