Adding a state to the mutex workflow example

Based on this sample of creating a mutex workflow: samples-go/mutex/mutex_workflow.go at main · temporalio/samples-go · GitHub

For my use case, I need to know which workflow is currently holding the mutex, so that I can have another workflow call cancel on it to release the lock.

For example:
workflow1, workflow2

  • workflow 1 locks the resource by starting the mutex workflow, and defers the unlock so that it runs when workflow 1 is canceled.
  • workflow 2 starts and wants to know whether the same resource is locked by that mutex workflow. If it is, it wants to call cancel on workflow 1, which will unlock the mutex.

To solve this, I added queryable workflow state to the mutex workflow that tracks which workflow is currently locking the resource.

I was wondering whether there are any concerns with this approach, or whether there is a better solution:

Example of the code

func MutexWorkflow(...) {
	// Track which workflow currently holds the lock so the state can be queried.
	var lockedResourceWorkflowId string
	err := workflow.SetQueryHandler(ctx, QueryKeyForStatus, func() (string, error) {
		return lockedResourceWorkflowId, nil
	})
	if err != nil {
		return fmt.Errorf("failed to initialize setQueryHandler: %w", err)
	}

	....
	for {
		var senderWorkflowID string
		....
		logger.Info("signaled external workflow")
		lockedResourceWorkflowId = senderWorkflowID
		selector := workflow.NewSelector(ctx)
		selector.AddFuture(workflow.NewTimer(ctx, unlockTimeout), func(f workflow.Future) {
			logger.Info("unlockTimeout exceeded")
		})
		selector.AddReceive(workflow.GetSignalChannel(ctx, releaseLockChannelName), func(c workflow.ReceiveChannel, more bool) {
			c.Receive(ctx, &ack)
			logger.Info("release signal received")
		})
		selector.Select(ctx)
		lockedResourceWorkflowId = ""
	}
	return nil
}

The key part is just setting and resetting the workflow state.

Question

  • I wasn’t sure whether I had anything to worry about around Select, e.g. whether it could fail in some way so that lockedResourceWorkflowId never gets reset to "". I’m also not sure whether there are any concurrency concerns.

Alternatively, another option would be to store the lockedResourceWorkflowId in the context. I’m not sure this really works, though, since I don’t know whether the context gets propagated to the SetQueryHandler. I tried to verify this via a unit test and it didn’t seem to work.

Example:

func MutexWorkflow(...) {
	type workflowIdKey struct{}
	err := workflow.SetQueryHandler(ctx, QueryKeyForStatus, func() (Status, error) {
		// Read the locking workflow ID from a value attached to the context.
		wfID, ok := ctx.Value(workflowIdKey{}).(string)
		if ok {
			return Status{
				AcquiredLockWorkflowID: wfID,
			}, nil
		}
		return Status{
			AcquiredLockWorkflowID: "",
		}, nil
	})
	if err != nil {
		return fmt.Errorf("failed to initialize setQueryHandler: %w", err)
	}

	....
	for {
		var senderWorkflowID string
		....
		logger.Info("signaled external workflow")
		selector := workflow.NewSelector(ctx)
		selector.AddFuture(workflow.NewTimer(ctx, unlockTimeout), func(f workflow.Future) {
			logger.Info("unlockTimeout exceeded")
		})
		selector.AddReceive(workflow.GetSignalChannel(ctx, releaseLockChannelName), func(c workflow.ReceiveChannel, more bool) {
			c.Receive(ctx, &ack)
			logger.Info("release signal received")
		})
		selectorCtx := workflow.WithValue(ctx, workflowIdKey{}, senderWorkflowID)
		selector.Select(selectorCtx)
	}
	return nil
}

Test (Failed when trying to put the workflow ID into the context)

func (s *mutexSuite) Test_MutexWorkflow_Success() {
	requestWorkflowID1 := "requestWorkflowID_1"
	requestWorkflowID2 := "requestWorkflowID_2"
	s.env.RegisterDelayedCallback(
		func() {
			s.env.SignalWorkflow(signalRequestLock, mutexRequest{
				WorkflowID: requestWorkflowID1,
				Timeout:    time.Minute,
			})
		},
		0*time.Millisecond,
	)

	s.env.RegisterDelayedCallback(
		func() {
			s.env.SignalWorkflow(signalRequestLock, mutexRequest{
				WorkflowID: requestWorkflowID2,
				Timeout:    time.Minute,
			})
		},
		2*time.Millisecond,
	)

	s.env.RegisterDelayedCallback(
		func() {
			encodedStatus, err := s.env.QueryWorkflow(QueryKeyForStatus)
			s.NoError(err)
			s.NotNil(encodedStatus)
			var status Status
			err = encodedStatus.Get(&status)
			s.NoError(err)
			s.True(status.IsLocked())
			s.Equal(requestWorkflowID1, status.AcquiredLockWorkflowID)

			s.env.SignalWorkflow("unlock-"+requestWorkflowID1, "releaseLock")
		},
		5*time.Millisecond,
	)

	s.env.RegisterDelayedCallback(
		func() {
			encodedStatus, err := s.env.QueryWorkflow(QueryKeyForStatus)
			s.NoError(err)
			s.NotNil(encodedStatus)
			var status Status
			err = encodedStatus.Get(&status)
			s.NoError(err)
			s.True(status.IsLocked())
			s.Equal(requestWorkflowID2, status.AcquiredLockWorkflowID)

			s.env.SignalWorkflow("unlock-"+requestWorkflowID2, "releaseLock")
		},
		10*time.Millisecond,
	)

	s.env.OnSignalExternalWorkflow(
		mock.Anything,
		requestWorkflowID1, "",
		signalAcquireLock,
		mock.Anything,
	).Return(nil).Times(1)
	s.env.OnSignalExternalWorkflow(
		mock.Anything,
		requestWorkflowID2, "",
		signalAcquireLock,
		mock.Anything,
	).Return(nil).Times(1)

	s.env.ExecuteWorkflow(mutexWorkflow)
	s.True(s.env.IsWorkflowCompleted())
	s.NoError(s.env.GetWorkflowError())
}

Questions for this example:

  • Is the context in SetQueryHandler the same as the context passed into selector.Select(selectorCtx), or is it separate?
  • If this is supposed to work, is there a bug in the test framework where the context is not being propagated from the Select to the query handler?
  • If I go this route, I’m guessing I would also need context propagation for this to work properly across different workers.

Thanks

In Go you can use a defer statement to ensure that some cleanup code is executed in the presence of multiple returns and panics. So you can set lockedResourceWorkflowId to “” in a defer statement.
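
For example, a rough sketch using the variable from your first example (the elided pieces stay the same):

	for {
		var senderWorkflowID string
		// ... receive the lock request and signal the requesting workflow ...
		lockedResourceWorkflowId = senderWorkflowID
		func() {
			// The deferred reset runs even if the code below returns early or panics.
			defer func() { lockedResourceWorkflowId = "" }()
			selector := workflow.NewSelector(ctx)
			// ... AddFuture(unlock timeout) and AddReceive(release signal) as before ...
			selector.Select(ctx)
		}()
	}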

I wouldn’t use context-based values when normal local or other variables can do the work.

I’m unsure why you need to use a query for your use case. I would make the mutex workflow send a cancellation signal to the workflow that is currently holding the mutex if a new higher-priority request is received instead.
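
A rough sketch of that idea (newRequest, currentRequest, the Priority field, and the signal name are placeholders, not part of the sample):

	if newRequest.Priority > currentRequest.Priority {
		// Ask the workflow currently holding the lock to release it
		// (RequestCancelExternalWorkflow could be used instead of a signal).
		err := workflow.SignalExternalWorkflow(
			ctx, currentRequest.WorkflowID, "", "release-requested", nil,
		).Get(ctx, nil)
		if err != nil {
			workflow.GetLogger(ctx).Error("failed to signal current lock holder", "Error", err)
		}
	}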

Hi Maxim, regardless of the recommended approach, is it expected that the value was not pulled from the context, or is it a bug in the testing framework? We faced similar behavior in a few other places and couldn’t figure out what was actually expected.

Thanks!

@Anton I’m not sure what exact problem you faced. Looking at the sample above, it is clear that it cannot work as written. Attaching values to a context creates a copy of the context and does not modify the parent context.
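
To illustrate with the code above (a sketch, not the sample itself):

	type workflowIdKey struct{}

	// The query handler closes over the parent ctx, which never carries the value.
	_ = workflow.SetQueryHandler(ctx, QueryKeyForStatus, func() (string, error) {
		wfID, _ := ctx.Value(workflowIdKey{}).(string) // always "" here
		return wfID, nil
	})

	// WithValue returns a new child context; only code that receives childCtx
	// (here, the selector) can see the value. The parent ctx is unchanged.
	childCtx := workflow.WithValue(ctx, workflowIdKey{}, senderWorkflowID)
	selector.Select(childCtx)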

Yes, misread the code. Thanks.

“if a new higher-priority request is received instead.”

I guess the way you would extend the example is to keep workflow state with the current priority, compare the incoming signal’s priority to it, and then trigger a cancellation within the SideEffect.

I have another use case where, instead of determining priority, I want to process only the latest request workflow and cancel the ones in between. I thought I could do this by storing queryable workflow state in the mutex workflow to track this. As the mutex workflow receives lock requests, the logic would keep only the latest workflow ID that has requested a lock and cancel the rest. However, I’m running into issues actually updating the state, even though I see the log statement printed. I’m guessing this is due to the code being in a SideEffect, or to the mutex lock being triggered using SignalWithStartWorkflow.

I’m probably not understanding how the section of the code related to the SideEffect and the processing of the request-lock signal works.

After the first workflow that locks the mutex executes, isn’t the code just blocked at line 124 until the release signal or timeout occurs? When a new workflow triggers SignalWithStartWorkflow, the releaseLockCh channel gets a signal injected and line 100 gets executed. At least that’s what I can tell from the code and log printouts. However, I’m not clear how that even works, since the workflow is still blocked on line 124.

I’ve started another thread to capture this:

When the mutex workflow is waiting for the lock release at line 124, all new lock requests are buffered in the requestLockChannel. So the code at line 100 is executed only when line 124 returns.
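
If the goal is to process only the latest buffered request and drop (or cancel) the ones in between, one option is to drain the channel with the non-blocking ReceiveAsync after the blocking Receive returns. A rough sketch, using approximate names (requestLockCh, mutexRequest):

	var req mutexRequest
	// Blocks until the next lock request arrives.
	requestLockCh.Receive(ctx, &req)
	// Drain anything that was buffered while the previous holder had the lock,
	// keeping only the most recent request; the skipped requests could be canceled here.
	for requestLockCh.ReceiveAsync(&req) {
	}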

The side effect is used to generate the releaseLockChannel name using some nondeterministic code.
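
The pattern looks roughly like this (a sketch, not the sample verbatim; generateUnlockChannelName is a hypothetical stand-in for the nondeterministic name generation):

	// Run the nondeterministic name generation inside SideEffect so the result is
	// recorded in history and replay observes the same value.
	var releaseLockChannelName string
	err := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
		// e.g. append a random UUID suffix to make the channel name unique.
		return generateUnlockChannelName(senderWorkflowID)
	}).Get(&releaseLockChannelName)
	if err != nil {
		return err
	}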
