Adding a state to the mutex workflow example

This is based on the sample mutex workflow: samples-go/mutex/mutex_workflow.go at main · temporalio/samples-go · GitHub

For my use case, I need to know which workflow currently holds the mutex, so that another workflow can cancel it and thereby release the lock.

For example, with two workflows, workflow1 and workflow2:

  • workflow1 locks the resource by starting the mutex workflow, and defers an unlock that runs when workflow1 is canceled.
  • workflow2 starts and wants to know whether the same resource is locked by that mutex workflow. If it is, workflow2 cancels workflow1, which unlocks the mutex.

To solve this, I added queryable state to the mutex workflow. It tracks which workflow currently holds the lock on the resource.

I was wondering whether there are any concerns with this approach, or whether there is a better solution.

Example code:

func MutexWorkflow(...) {
	// ID of the workflow that currently holds the lock; "" when unlocked.
	var lockedResourceWorkflowId string
	err := workflow.SetQueryHandler(ctx, QueryKeyForStatus, func(_ []byte) (string, error) {
		return lockedResourceWorkflowId, nil
	})
	if err != nil {
		return fmt.Errorf("failed to initialize setQueryHandler: %w", err)
	}

	....
	for {
		var senderWorkflowID string
		....
		logger.Info("signaled external workflow")
		lockedResourceWorkflowId = senderWorkflowID
		selector := workflow.NewSelector(ctx)
		selector.AddFuture(workflow.NewTimer(ctx, unlockTimeout), func(f workflow.Future) {
			logger.Info("unlockTimeout exceeded")
		})
		selector.AddReceive(workflow.GetSignalChannel(ctx, releaseLockChannelName), func(c workflow.ReceiveChannel, more bool) {
			c.Receive(ctx, &ack)
			logger.Info("release signal received")
		})
		selector.Select(ctx)
		lockedResourceWorkflowId = ""
	}
	return nil
}

The key part is just setting and resetting the workflow state.

Question

  • I wasn’t sure whether I need to worry about Select failing for some reason, so that lockedResourceWorkflowId never gets reset to "". I’m also unsure whether there are any concurrency concerns.

Alternatively, I could store the lockedResourceWorkflowId in the context. I’m not sure this really works, though, since I don’t know whether the context gets propagated to the handler registered with SetQueryHandler. I tried to test this with a unit test and it didn’t seem to work.

Example:

func MutexWorkflow(...) {
	type workflowKey struct{}
	err := workflow.SetQueryHandler(ctx, QueryKeyForStatus, func(_ []byte) (Status, error) {
		// Reads from the ctx captured by this closure.
		wfID, ok := ctx.Value(workflowKey{}).(string)
		if ok {
			return Status{
				AcquiredLockWorkflowID: wfID,
			}, nil
		}
		return Status{
			AcquiredLockWorkflowID: "",
		}, nil
	})
	if err != nil {
		return fmt.Errorf("failed to initialize setQueryHandler: %w", err)
	}

	....
	for {
		var senderWorkflowID string
		....
		logger.Info("signaled external workflow")
		selector := workflow.NewSelector(ctx)
		selector.AddFuture(workflow.NewTimer(ctx, unlockTimeout), func(f workflow.Future) {
			logger.Info("unlockTimeout exceeded")
		})
		selector.AddReceive(workflow.GetSignalChannel(ctx, releaseLockChannelName), func(c workflow.ReceiveChannel, more bool) {
			c.Receive(ctx, &ack)
			logger.Info("release signal received")
		})
		selectorCtx := workflow.WithValue(ctx, workflowKey{}, senderWorkflowID)
		selector.Select(selectorCtx)
	}
	return nil
}

Test (fails when the workflow ID is put into the context):

func (s *mutexSuite) Test_MutexWorkflow_Success() {
	requestWorkflowID1 := "requestWorkflowID_1"
	requestWorkflowID2 := "requestWorkflowID_2"
	s.env.RegisterDelayedCallback(
		func() {
			s.env.SignalWorkflow(signalRequestLock, mutexRequest{
				WorkflowID: requestWorkflowID1,
				Timeout:    time.Minute,
			})
		},
		0*time.Millisecond,
	)

	s.env.RegisterDelayedCallback(
		func() {
			s.env.SignalWorkflow(signalRequestLock, mutexRequest{
				WorkflowID: requestWorkflowID2,
				Timeout:    time.Minute,
			})
		},
		2*time.Millisecond,
	)

	s.env.RegisterDelayedCallback(
		func() {
			encodedStatus, err := s.env.QueryWorkflow(QueryKeyForStatus)
			s.NoError(err)
			s.NotNil(encodedStatus)
			var status Status
			err = encodedStatus.Get(&status)
			s.NoError(err)
			s.True(status.IsLocked())
			s.Equal(requestWorkflowID1, status.AcquiredLockWorkflowID)

			s.env.SignalWorkflow("unlock-"+requestWorkflowID1, "releaseLock")
		},
		5*time.Millisecond,
	)

	s.env.RegisterDelayedCallback(
		func() {
			encodedStatus, err := s.env.QueryWorkflow(QueryKeyForStatus)
			s.NoError(err)
			s.NotNil(encodedStatus)
			var status Status
			err = encodedStatus.Get(&status)
			s.NoError(err)
			s.True(status.IsLocked())
			s.Equal(requestWorkflowID2, status.AcquiredLockWorkflowID)

			s.env.SignalWorkflow("unlock-"+requestWorkflowID2, "releaseLock")
		},
		10*time.Millisecond,
	)

	s.env.OnSignalExternalWorkflow(
		mock.Anything,
		requestWorkflowID1, "",
		signalAcquireLock,
		mock.Anything,
	).Return(nil).Times(1)
	s.env.OnSignalExternalWorkflow(
		mock.Anything,
		requestWorkflowID2, "",
		signalAcquireLock,
		mock.Anything,
	).Return(nil).Times(1)

	s.env.ExecuteWorkflow(mutexWorkflow)
	s.True(s.env.IsWorkflowCompleted())
	s.NoError(s.env.GetWorkflowError())
}

Questions for this example:

  • Is the context in SetQueryHandler the same as the context passed into selector.Select(selectorCtx), or is it separate?
  • If this is supposed to work, is there a bug in the test framework where the context is not propagated from Select to the query handler?
  • If I go this route, I’m guessing I would also need context propagation for this to work properly across different workers.

Thanks

In Go, you can use a defer statement to ensure that cleanup code runs even in the presence of multiple returns and panics. So you can reset lockedResourceWorkflowId to "" in a defer statement.
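As a minimal sketch of that idea in plain Go (no Temporal SDK): the names lockState and holdLock are hypothetical, and wait stands in for the selector.Select(ctx) call. A deferred call runs only when the enclosing function returns, not at the end of a loop iteration, so the sketch wraps one lock-holding period in its own function.

```go
package main

import "fmt"

// lockState is a hypothetical stand-in for the mutex workflow's local state.
type lockState struct {
	holder string // ID of the workflow holding the lock; "" when free
}

// holdLock records the holder for the duration of wait and always clears it
// afterwards: the deferred reset runs on normal return and on panic.
func (s *lockState) holdLock(workflowID string, wait func()) {
	s.holder = workflowID
	defer func() { s.holder = "" }()
	wait() // stands in for selector.Select(ctx)
}

func main() {
	s := &lockState{}
	s.holdLock("workflow1", func() {
		fmt.Println("holder while locked:", s.holder)
	})
	fmt.Printf("holder after release: %q\n", s.holder)
}
```

Calling a helper like this once per iteration of the for loop would reset the holder after every lock period rather than only when the workflow function exits.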

I wouldn’t use context-based values when normal local or other variables can do the work.

I’m unsure why you need to use a query for your use case. I would make the mutex workflow send a cancellation signal to the workflow that is currently holding the mutex if a new higher-priority request is received instead.
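A rough plain-Go model of that suggestion, under stated assumptions: the Priority field, the mutexState type, and the cancel hook are all hypothetical and not part of the sample. Inside a real workflow the hook might signal the holder or use something like workflow.RequestCancelExternalWorkflow, and the mutex would normally wait for the release signal or unlock timeout before granting the lock again; the model below skips that wait.

```go
package main

import "fmt"

// request is a hypothetical lock request; Priority is not part of the sample.
type request struct {
	WorkflowID string
	Priority   int
}

// mutexState models the mutex workflow's view of the current holder.
type mutexState struct {
	holder *request
	// cancel is a hypothetical hook that asks the holder to cancel itself.
	cancel func(workflowID string)
}

// onRequest grants the lock if it is free, preempts the current holder when
// the new request has strictly higher priority, and otherwise returns false
// so the caller can keep the request queued.
// Simplification: the lock is handed over immediately after cancel is called.
func (m *mutexState) onRequest(r request) bool {
	if m.holder != nil {
		if r.Priority <= m.holder.Priority {
			return false
		}
		m.cancel(m.holder.WorkflowID)
	}
	m.holder = &r
	return true
}

func main() {
	m := &mutexState{cancel: func(id string) {
		fmt.Println("cancel requested for", id)
	}}
	fmt.Println(m.onRequest(request{WorkflowID: "workflow1", Priority: 1}))
	fmt.Println(m.onRequest(request{WorkflowID: "workflow2", Priority: 2}))
	fmt.Println("holder:", m.holder.WorkflowID)
}
```

With this shape the requester never needs to query the mutex workflow: the decision to preempt is made where the holder is already known.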

Hi Maxim, regardless of the recommended approach, is it expected that the value was not pulled from the context or is it a bug of the testing framework? We faced similar behavior in a few other places and couldn’t figure out what was actually expected.

Thanks!

@Anton I’m not sure what exact problem you faced. Looking at the sample above, it is clear that it cannot work as written. Attaching a value to a context creates a new, derived context; it does not modify the parent context.

Yes, I misread the code. Thanks.