Mutex workflow and how to track workflows that have requested the lock

Hi,

  • Let’s say you have workflows A, B, C, and D, and each of them calls the mutex workflow M on resource resource_1.
  • Workflow A starts, locks resource_1, and runs for 20 minutes.
  • Within those 20 minutes, workflows B, C, and D start but are effectively queued until workflow A completes. When A completes, B starts, and when B completes, C starts.

In my case, I want every workflow queued on the mutex other than the latest one to be canceled; in this example, B and C get canceled, and when A completes, D starts.

I looked at the mutex sample: samples-go/mutex in temporalio/samples-go on GitHub.

In samples-go/mutex/mutex_workflow.go, line 100 is hit when a workflow requests the lock.

Initially, I thought I could add a query handler backed by workflow state that tracks all the workflow IDs that have requested the lock.

var queuedResources []string

if err := workflow.SetQueryHandler(ctx, "queueRequest", func() ([]string, error) {
	return queuedResources, nil
}); err != nil {
	return err
}

_ = workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
	workflow.GetLogger(ctx).Info("request lock signaled")
	queuedResources = append(queuedResources, senderWorkflowID)
	return generateUnlockChannelName(senderWorkflowID)
}).Get(&releaseLockChannelName)

When I run the example, I see the “request lock signaled” log for each of workflows A, B, C, and D, but the query returns only one workflow ID, for workflow A.

I was hoping that with the queue I could check its size, dequeue from queuedResources, and cancel each dequeued workflow until only one element remains.

Questions:

  • Is there a better approach to implement this?
  • Can you not update workflow state inside a SideEffect, or is it because SignalWithStartWorkflow is being used to start the mutex workflow?

Thanks,
Derek

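You cannot update workflow state from inside a SideEffect; this can break determinism. On replay, the SDK returns the recorded SideEffect result without calling the function again, so any state change made inside it is not reproduced.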

I think the simplest solution in your case is to start a goroutine for each received signal. In that goroutine, either grant the lock if it is not taken, or cancel the sender workflow if the lock is taken.

Not fully tested, but I believe it solves your problem:

func MutexWorkflowWithCancellation(
	ctx workflow.Context,
	namespace string,
	resourceID string,
	unlockTimeout time.Duration,
) error {
	currentWorkflowID := workflow.GetInfo(ctx).WorkflowExecution.ID
	if currentWorkflowID == "default-test-workflow-id" {
		// unit testing hack, see https://github.com/uber-go/cadence-client/issues/663
		_ = workflow.Sleep(ctx, 10*time.Millisecond)
	}
	logger := workflow.GetLogger(ctx)
	logger.Info("started", "currentWorkflowID", currentWorkflowID)
	requestLockCh := workflow.GetSignalChannel(ctx, RequestLockSignalName)
	locked := false
	goroutineCount := 0
	for {
		var senderWorkflowID string
		if !requestLockCh.ReceiveAsync(&senderWorkflowID) {
			logger.Info("no more signals")
			break
		}
		goroutineCount++
		workflow.Go(ctx, func(ctx workflow.Context) {
			if locked {
				cancelSender(ctx, senderWorkflowID)
			} else {
				locked = true
				tryLock(ctx, senderWorkflowID, unlockTimeout)
				locked = false
			}
			goroutineCount--
		})
	}
	workflow.Await(ctx, func() bool {
		return goroutineCount == 0
	})
	return nil
}

func cancelSender(ctx workflow.Context, senderWorkflowID string) {
	logger := workflow.GetLogger(ctx)
	err := workflow.RequestCancelExternalWorkflow(ctx, senderWorkflowID, "").Get(ctx, nil)
	if err != nil {
		logger.Info("CancelExternalWorkflow error", "Error", err)
	}
}

func tryLock(ctx workflow.Context, senderWorkflowID string, unlockTimeout time.Duration) {
	logger := workflow.GetLogger(ctx)
	var releaseLockChannelName string
	_ = workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
		return generateUnlockChannelName(senderWorkflowID)
	}).Get(&releaseLockChannelName)
	logger.Info("generated release lock channel name", "releaseLockChannelName", releaseLockChannelName)
	// Send release lock channel name back to a senderWorkflowID, so that it can
	// release the lock using release lock channel name
	err := workflow.SignalExternalWorkflow(ctx, senderWorkflowID, "",
		AcquireLockSignalName, releaseLockChannelName).Get(ctx, nil)
	if err != nil {
		// .Get(ctx, nil) blocks until the signal is sent.
		// If the senderWorkflowID is closed (terminated/canceled/timeouted/completed/etc), this would return error.
		// In this case we release the lock immediately instead of failing the mutex workflow.
		// Mutex workflow failing would lead to all workflows that have sent requestLock will be waiting.
		logger.Info("SignalExternalWorkflow error", "Error", err)
		return
	}
	logger.Info("signaled external workflow")
	var ack string
	workflow.GetSignalChannel(ctx, releaseLockChannelName).ReceiveWithTimeout(ctx, unlockTimeout, &ack)
	logger.Info("release signal received: " + ack)
}


@maxim

Maybe I’m missing something, but when I tested this:

func cancelSender(ctx workflow.Context, senderWorkflowID string) {
	logger := workflow.GetLogger(ctx)
	err := workflow.RequestCancelExternalWorkflow(ctx, senderWorkflowID, "").Get(ctx, nil)
	if err != nil {
		logger.Info("CancelExternalWorkflow error", "Error", err)
	}
	logger.Info(fmt.Sprintf("Request to cancel workflow: [%s]", senderWorkflowID))
}

I see the “Request to cancel workflow” log line, but the workflows are not actually canceled:

2024/08/01 21:26:48 INFO  No logger configured for temporal client. Created default one.
2024/08/01 21:26:48 INFO  Started Worker Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@
2024/08/01 21:26:53 INFO  started Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType SampleWorkflowWithMutex WorkflowID SampleWorkflow1WithMutex_2e2b99f9-98fb-4303-a129-d133ea2c4774 RunID b1ecc4d2-dd4b-4210-8915-462992c63840 Attempt 1 currentWorkflowID SampleWorkflow1WithMutex_2e2b99f9-98fb-4303-a129-d133ea2c4774 resourceID 3a47066d-6ad3-48ba-b622-afe7c53eb0b7
2024/08/01 21:26:53 INFO  started Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType SampleWorkflowWithMutex WorkflowID SampleWorkflow2WithMutex_fb1aaec4-a3b0-4966-b6c2-50efac14ff1f RunID e2ba1cdf-ab10-405c-9747-ea67d29946a9 Attempt 1 currentWorkflowID SampleWorkflow2WithMutex_fb1aaec4-a3b0-4966-b6c2-50efac14ff1f resourceID 3a47066d-6ad3-48ba-b622-afe7c53eb0b7
2024/08/01 21:26:53 INFO  Signaled and started Workflow Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ ActivityID 1 ActivityType SignalWithStartMutexWorkflowActivity Attempt 1 WorkflowType SampleWorkflowWithMutex WorkflowID SampleWorkflow1WithMutex_2e2b99f9-98fb-4303-a129-d133ea2c4774 RunID b1ecc4d2-dd4b-4210-8915-462992c63840 WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d
2024/08/01 21:26:53 INFO  started Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType SampleWorkflowWithMutex WorkflowID SampleWorkflow3WithMutex_9aca808f-b713-48c3-9b12-d4c0a95967f0 RunID ca029a60-70b9-4a9e-b17f-dc26346757f2 Attempt 1 currentWorkflowID SampleWorkflow3WithMutex_9aca808f-b713-48c3-9b12-d4c0a95967f0 resourceID 3a47066d-6ad3-48ba-b622-afe7c53eb0b7
2024/08/01 21:26:53 INFO  Signaled and started Workflow Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ ActivityID 1 ActivityType SignalWithStartMutexWorkflowActivity Attempt 1 WorkflowType SampleWorkflowWithMutex WorkflowID SampleWorkflow2WithMutex_fb1aaec4-a3b0-4966-b6c2-50efac14ff1f RunID e2ba1cdf-ab10-405c-9747-ea67d29946a9 WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d
2024/08/01 21:26:53 INFO  Signaled and started Workflow Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ ActivityID 1 ActivityType SignalWithStartMutexWorkflowActivity Attempt 1 WorkflowType SampleWorkflowWithMutex WorkflowID SampleWorkflow3WithMutex_9aca808f-b713-48c3-9b12-d4c0a95967f0 RunID ca029a60-70b9-4a9e-b17f-dc26346757f2 WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d
2024/08/01 21:26:53 INFO  started Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType MutexWorkflowWithCancellation WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d Attempt 1 currentWorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7
2024/08/01 21:26:53 INFO  no more signals Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType MutexWorkflowWithCancellation WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d Attempt 1
2024/08/01 21:26:53 INFO  Start go routines Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType MutexWorkflowWithCancellation WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d Attempt 1
2024/08/01 21:26:53 INFO  routine:  Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType MutexWorkflowWithCancellation WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d Attempt 1 locked false
2024/08/01 21:26:53 INFO  tryLock Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType MutexWorkflowWithCancellation WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d Attempt 1 senderWorkflowID SampleWorkflow1WithMutex_2e2b99f9-98fb-4303-a129-d133ea2c4774
2024/08/01 21:26:53 DEBUG SideEffect Marker added Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType MutexWorkflowWithCancellation WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d Attempt 1 SideEffectID 1
2024/08/01 21:26:53 INFO  generated release lock channel name Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType MutexWorkflowWithCancellation WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d Attempt 1 releaseLockChannelName unlock-event-SampleWorkflow1WithMutex_2e2b99f9-98fb-4303-a129-d133ea2c4774
2024/08/01 21:26:53 INFO  routine:  Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType MutexWorkflowWithCancellation WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d Attempt 1 locked true
2024/08/01 21:26:53 INFO  cancel Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType MutexWorkflowWithCancellation WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d Attempt 1 senderWorkflowID SampleWorkflow2WithMutex_fb1aaec4-a3b0-4966-b6c2-50efac14ff1f
2024/08/01 21:26:53 INFO  routine:  Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType MutexWorkflowWithCancellation WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d Attempt 1 locked true
2024/08/01 21:26:53 INFO  cancel Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType MutexWorkflowWithCancellation WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d Attempt 1 senderWorkflowID SampleWorkflow3WithMutex_9aca808f-b713-48c3-9b12-d4c0a95967f0
2024/08/01 21:26:53 INFO  critical operation started Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType SampleWorkflowWithMutex WorkflowID SampleWorkflow1WithMutex_2e2b99f9-98fb-4303-a129-d133ea2c4774 RunID b1ecc4d2-dd4b-4210-8915-462992c63840 Attempt 1
2024/08/01 21:26:53 DEBUG NewTimer Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType SampleWorkflowWithMutex WorkflowID SampleWorkflow1WithMutex_2e2b99f9-98fb-4303-a129-d133ea2c4774 RunID b1ecc4d2-dd4b-4210-8915-462992c63840 Attempt 1 TimerID 10 Duration 10m0s
2024/08/01 21:26:53 INFO  signaled external workflow Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType MutexWorkflowWithCancellation WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d Attempt 1
2024/08/01 21:26:53 DEBUG NewTimer Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType MutexWorkflowWithCancellation WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d Attempt 1 TimerID 18 Duration 10m0s
2024/08/01 21:26:53 INFO  Request to cancel workflow: [SampleWorkflow2WithMutex_fb1aaec4-a3b0-4966-b6c2-50efac14ff1f] Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType MutexWorkflowWithCancellation WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d Attempt 1
2024/08/01 21:26:53 INFO  Request to cancel workflow: [SampleWorkflow3WithMutex_9aca808f-b713-48c3-9b12-d4c0a95967f0] Namespace default TaskQueue mutex WorkerID 36561@T5FXQ5YV1Y@ WorkflowType MutexWorkflowWithCancellation WorkflowID mutex:TestUseCase:3a47066d-6ad3-48ba-b622-afe7c53eb0b7 RunID fa2a4f69-3f67-45f5-8b28-02b029e0516d Attempt 1

On the workflows that received the cancel request, I would expect to see a cancellation message, for example a banner in the UI saying that the cancel request was received.

Code:


func (s *Mutex) LockWithCancellation(ctx workflow.Context,
	resourceID string, unlockTimeout time.Duration) (UnlockFunc, error) {

	activityCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
		ScheduleToCloseTimeout: time.Minute,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2.0,
			MaximumInterval:    time.Minute,
			MaximumAttempts:    5,
		},
	})

	var releaseLockChannelName string
	var execution workflow.Execution
	err := workflow.ExecuteLocalActivity(activityCtx,
		SignalWithStartMutexWorkflowActivity, s.lockNamespace,
		resourceID, s.currentWorkflowID, unlockTimeout).Get(ctx, &execution)
	if err != nil {
		return nil, err
	}
	workflow.GetSignalChannel(ctx, AcquireLockSignalName).
		Receive(ctx, &releaseLockChannelName)

	unlockFunc := func() error {
		return workflow.SignalExternalWorkflow(ctx, execution.ID, execution.RunID,
			releaseLockChannelName, "releaseLock").Get(ctx, nil)
	}
	return unlockFunc, nil
}

func MutexWorkflowWithCancellation(
	ctx workflow.Context,
	namespace string,
	resourceID string,
	unlockTimeout time.Duration,
) error {
	currentWorkflowID := workflow.GetInfo(ctx).WorkflowExecution.ID
	if currentWorkflowID == "default-test-workflow-id" {
		// unit testing hack, see https://github.com/uber-go/cadence-client/issues/663
		_ = workflow.Sleep(ctx, 10*time.Millisecond)
	}
	logger := workflow.GetLogger(ctx)
	logger.Info("started", "currentWorkflowID", currentWorkflowID)
	requestLockCh := workflow.GetSignalChannel(ctx, RequestLockSignalName)
	locked := false
	goroutineCount := 0
	for {
		var senderWorkflowID string
		if !requestLockCh.ReceiveAsync(&senderWorkflowID) {
			logger.Info("no more signals")
			break
		}
		goroutineCount++
		workflow.Go(ctx, func(ctx workflow.Context) {
			logger.Info("routine: ", "locked", locked)
			if locked {
				logger.Info("cancel", "senderWorkflowID", senderWorkflowID)
				cancelSender(ctx, senderWorkflowID)
			} else {
				logger.Info("tryLock", "senderWorkflowID", senderWorkflowID)
				locked = true
				tryLock(ctx, senderWorkflowID, unlockTimeout)
				locked = false
			}
			goroutineCount--
		})
	}

	// when mutex can exit
	logger.Info("Start go routines")
	workflow.Await(ctx, func() bool {
		return goroutineCount == 0
	})
	return nil
}

func cancelSender(ctx workflow.Context, senderWorkflowID string) {
	logger := workflow.GetLogger(ctx)
	err := workflow.RequestCancelExternalWorkflow(ctx, senderWorkflowID, "").Get(ctx, nil)
	if err != nil {
		logger.Info("CancelExternalWorkflow error", "Error", err)
		return
	}
	logger.Info(fmt.Sprintf("Request to cancel workflow: [%s]", senderWorkflowID))
}

func tryLock(ctx workflow.Context, senderWorkflowID string, unlockTimeout time.Duration) {
	logger := workflow.GetLogger(ctx)
	var releaseLockChannelName string
	_ = workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
		return generateUnlockChannelName(senderWorkflowID)
	}).Get(&releaseLockChannelName)
	logger.Info("generated release lock channel name", "releaseLockChannelName", releaseLockChannelName)
	// Send release lock channel name back to a senderWorkflowID, so that it can
	// release the lock using release lock channel name
	err := workflow.SignalExternalWorkflow(ctx, senderWorkflowID, "",
		AcquireLockSignalName, releaseLockChannelName).Get(ctx, nil)
	if err != nil {
		// .Get(ctx, nil) blocks until the signal is sent.
		// If the senderWorkflowID is closed (terminated/canceled/timeouted/completed/etc), this would return error.
		// In this case we release the lock immediately instead of failing the mutex workflow.
		// Mutex workflow failing would lead to all workflows that have sent requestLock will be waiting.
		logger.Info("SignalExternalWorkflow error", "Error", err)
		return
	}
	logger.Info("signaled external workflow")
	var ack string
	workflow.GetSignalChannel(ctx, releaseLockChannelName).ReceiveWithTimeout(ctx, unlockTimeout, &ack)
	logger.Info("release signal received: " + ack)
}


func SampleWorkflowWithMutex(
	ctx workflow.Context,
	resourceID string,
) error {

	currentWorkflowID := workflow.GetInfo(ctx).WorkflowExecution.ID
	logger := workflow.GetLogger(ctx)
	logger.Info("started", "currentWorkflowID", currentWorkflowID, "resourceID", resourceID)

	mutex := NewMutex(currentWorkflowID, "TestUseCase")
	unlockFunc, err := mutex.LockWithCancellation(ctx, resourceID, 10*time.Minute)
	if err != nil {
		return err
	}

	// emulate long running process
	logger.Info("critical operation started")
	_ = workflow.Sleep(ctx, 10*time.Minute)
	logger.Info("critical operation finished")

	_ = unlockFunc()

	logger.Info("finished")
	return nil
}

main

func main() {
	// The client is a heavyweight object that should be created once per process.
	c, err := client.Dial(client.Options{
		HostPort: client.DefaultHostPort,
	})
	if err != nil {
		log.Fatalln("Unable to create client", err)
	}
	defer c.Close()

	// This workflow ID can be user business logic identifier as well.
	resourceID := uuid.New()
	workflow1Options := client.StartWorkflowOptions{
		ID:        "SampleWorkflow1WithMutex_" + uuid.New(),
		TaskQueue: "mutex",
	}

	workflow2Options := client.StartWorkflowOptions{
		ID:        "SampleWorkflow2WithMutex_" + uuid.New(),
		TaskQueue: "mutex",
	}

	workflow3Options := client.StartWorkflowOptions{
		ID:        "SampleWorkflow3WithMutex_" + uuid.New(),
		TaskQueue: "mutex",
	}

	we, err := c.ExecuteWorkflow(context.Background(), workflow1Options, mutex.SampleWorkflowWithMutex, resourceID)
	if err != nil {
		log.Fatalln("Unable to execute workflow1", err)
	} else {
		log.Println("Started workflow1", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
	}

	we, err = c.ExecuteWorkflow(context.Background(), workflow2Options, mutex.SampleWorkflowWithMutex, resourceID)
	if err != nil {
		log.Fatalln("Unable to execute workflow2", err)
	} else {
		log.Println("Started workflow2", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
	}

	we, err = c.ExecuteWorkflow(context.Background(), workflow3Options, mutex.SampleWorkflowWithMutex, resourceID)
	if err != nil {
		log.Fatalln("Unable to execute workflow3", err)
	} else {
		log.Println("Started workflow3", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
	}
}

Based on the call stack of the workflow, it still seems to be waiting for the mutex to be unlocked, even though it should have received a cancel request.

Even after the timeout fires, the workflows that were asked to cancel remain stuck.

Wouldn’t the code be something like this?

goroutineCount++
workflow.Go(ctx, func(ctx workflow.Context) {
	// If more than one request is queued, cancel this one, since we only
	// want the latest workflow request waiting for the mutex.
	if goroutineCount > 1 {
		cancelSender(ctx, senderWorkflowID)
	}

	tryLock(ctx, senderWorkflowID, unlockTimeout)
	goroutineCount--
})

In the example, workflow A holds the mutex, and workflows B and C request the lock in that order. We want B to be canceled so that C is the only one waiting for the lock.

One concern I have, though, is that the cancel call is asynchronous. If a workflow hasn’t actually been canceled by the time the mutex is unlocked, you could run into a race condition.

I could have it trigger an activity afterwards that blocks until the cancellation is complete. Does the SDK provide a blocking mechanism like that?
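The “only the latest waiter survives” rule can be kept as plain, deterministic state inside the mutex workflow. The following is a hypothetical, SDK-free model of that bookkeeping (`latestWinsLock`, `Request`, and `Release` are illustrative names); the Temporal wiring is omitted, and the caller would turn `evicted` into a `RequestCancelExternalWorkflow` call and a new holder into the acquire-lock signal.

```go
package main

// latestWinsLock models a mutex that keeps at most one pending waiter:
// each new request evicts the previous waiter.
type latestWinsLock struct {
	holder string // workflow ID currently holding the lock, "" if free
	waiter string // the single pending waiter, "" if none
}

// Request registers a lock request from workflowID. It reports whether the
// lock was granted immediately and, if not, which older waiter (if any) was
// evicted and should now be canceled.
func (l *latestWinsLock) Request(workflowID string) (granted bool, evicted string) {
	if l.holder == "" {
		l.holder = workflowID
		return true, ""
	}
	evicted = l.waiter
	l.waiter = workflowID
	return false, evicted
}

// Release frees the lock and hands it to the pending waiter, if any.
// It returns the new holder, or "" if the lock is now free.
func (l *latestWinsLock) Release() string {
	l.holder, l.waiter = l.waiter, ""
	return l.holder
}
```

Because the evicted waiter is removed from the mutex's own state before the cancel request is sent, the mutex never grants it the lock, however long the target takes to process its cancellation. This sidesteps the race between sending the cancel request and the target actually stopping.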

Could you post the mutex workflow history?

I think my mistake was looking at the compact view. On closer inspection, the cancel request is sent and received, but the target workflow isn’t actually being canceled.

Mutex workflow history:

{
  "events": [
    {
      "eventId": "1",
      "eventTime": "2024-08-02T20:24:41.269566426Z",
      "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
      "taskId": "12585703",
      "workflowExecutionStartedEventAttributes": {
        "workflowType": {
          "name": "MutexWorkflowWithCancellation"
        },
        "taskQueue": {
          "name": "mutex",
          "kind": "TASK_QUEUE_KIND_NORMAL"
        },
        "input": {
          "payloads": [
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": "TestUseCase"
            },
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": "6fe24f77-c1a4-4791-ae2e-b99826ef1565"
            },
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": 600000000000
            }
          ]
        },
        "workflowExecutionTimeout": "0s",
        "workflowRunTimeout": "0s",
        "workflowTaskTimeout": "10s",
        "originalExecutionRunId": "8c205172-9953-4d1f-beac-81efba44da32",
        "identity": "59433@T5FXQ5YV1Y@",
        "firstExecutionRunId": "8c205172-9953-4d1f-beac-81efba44da32",
        "retryPolicy": {
          "initialInterval": "1s",
          "backoffCoefficient": 2,
          "maximumInterval": "60s",
          "maximumAttempts": 5
        },
        "attempt": 1,
        "firstWorkflowTaskBackoff": "0s",
        "header": {},
        "workflowId": "mutex:TestUseCase:6fe24f77-c1a4-4791-ae2e-b99826ef1565"
      }
    },
    {
      "eventId": "2",
      "eventTime": "2024-08-02T20:24:41.269642010Z",
      "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
      "taskId": "12585704",
      "workflowExecutionSignaledEventAttributes": {
        "signalName": "request-lock-event",
        "input": {
          "payloads": [
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": "SampleWorkflow1WithMutex_ec110573-3884-429e-ad44-eafa8a189ff2"
            }
          ]
        },
        "identity": "59433@T5FXQ5YV1Y@",
        "header": {}
      }
    },
    {
      "eventId": "3",
      "eventTime": "2024-08-02T20:24:41.269647343Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
      "taskId": "12585705",
      "workflowTaskScheduledEventAttributes": {
        "taskQueue": {
          "name": "mutex",
          "kind": "TASK_QUEUE_KIND_NORMAL"
        },
        "startToCloseTimeout": "10s",
        "attempt": 1
      }
    },
    {
      "eventId": "4",
      "eventTime": "2024-08-02T20:24:41.289274635Z",
      "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
      "taskId": "12585709",
      "workflowExecutionSignaledEventAttributes": {
        "signalName": "request-lock-event",
        "input": {
          "payloads": [
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": "SampleWorkflow2WithMutex_dd026b7f-f321-4ac0-a548-2fa4cccf118a"
            }
          ]
        },
        "identity": "59433@T5FXQ5YV1Y@",
        "header": {}
      }
    },
    {
      "eventId": "5",
      "eventTime": "2024-08-02T20:24:41.298923426Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
      "taskId": "12585711",
      "workflowTaskStartedEventAttributes": {
        "scheduledEventId": "3",
        "identity": "59433@T5FXQ5YV1Y@",
        "requestId": "e1ba985f-b228-45c0-9707-907edfc135a8",
        "historySizeBytes": "787"
      }
    },
    {
      "eventId": "6",
      "eventTime": "2024-08-02T20:24:41.305215051Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
      "taskId": "12585715",
      "workflowTaskCompletedEventAttributes": {
        "scheduledEventId": "3",
        "startedEventId": "5",
        "identity": "59433@T5FXQ5YV1Y@",
        "workerVersion": {
          "buildId": "18984158064982fa1d18d2dc18288cd4"
        },
        "sdkMetadata": {
          "langUsedFlags": [
            3
          ],
          "sdkName": "temporal-go",
          "sdkVersion": "1.28.1"
        },
        "meteringMetadata": {}
      }
    },
    {
      "eventId": "7",
      "eventTime": "2024-08-02T20:24:41.305251218Z",
      "eventType": "EVENT_TYPE_MARKER_RECORDED",
      "taskId": "12585716",
      "markerRecordedEventAttributes": {
        "markerName": "SideEffect",
        "details": {
          "data": {
            "payloads": [
              {
                "metadata": {
                  "encoding": "anNvbi9wbGFpbg==",
                  "encodingDecoded": "json/plain"
                },
                "data": "unlock-event-SampleWorkflow1WithMutex_ec110573-3884-429e-ad44-eafa8a189ff2"
              }
            ]
          },
          "side-effect-id": {
            "payloads": [
              {
                "metadata": {
                  "encoding": "anNvbi9wbGFpbg==",
                  "encodingDecoded": "json/plain"
                },
                "data": 1
              }
            ]
          }
        },
        "workflowTaskCompletedEventId": "6"
      }
    },
    {
      "eventId": "8",
      "eventTime": "2024-08-02T20:24:41.305271135Z",
      "eventType": "EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED",
      "taskId": "12585717",
      "signalExternalWorkflowExecutionInitiatedEventAttributes": {
        "workflowTaskCompletedEventId": "6",
        "namespace": "default",
        "namespaceId": "dfb9aeb1-1e26-4ebf-9486-e9f6582d43ea",
        "workflowExecution": {
          "workflowId": "SampleWorkflow1WithMutex_ec110573-3884-429e-ad44-eafa8a189ff2"
        },
        "signalName": "acquire-lock-event",
        "input": {
          "payloads": [
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": "unlock-event-SampleWorkflow1WithMutex_ec110573-3884-429e-ad44-eafa8a189ff2"
            }
          ]
        },
        "control": "8",
        "header": {}
      }
    },
    {
      "eventId": "9",
      "eventTime": "2024-08-02T20:24:41.305290135Z",
      "eventType": "EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED",
      "taskId": "12585718",
      "requestCancelExternalWorkflowExecutionInitiatedEventAttributes": {
        "workflowTaskCompletedEventId": "6",
        "namespace": "default",
        "namespaceId": "dfb9aeb1-1e26-4ebf-9486-e9f6582d43ea",
        "workflowExecution": {
          "workflowId": "SampleWorkflow2WithMutex_dd026b7f-f321-4ac0-a548-2fa4cccf118a"
        },
        "control": "9"
      }
    },
    {
      "eventId": "10",
      "eventTime": "2024-08-02T20:24:41.301776885Z",
      "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
      "taskId": "12585719",
      "workflowExecutionSignaledEventAttributes": {
        "signalName": "request-lock-event",
        "input": {
          "payloads": [
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": "SampleWorkflow3WithMutex_2ad3e1a1-4782-4423-a167-494300f9ee01"
            }
          ]
        },
        "identity": "59433@T5FXQ5YV1Y@",
        "header": {}
      }
    },
    {
      "eventId": "11",
      "eventTime": "2024-08-02T20:24:41.305302801Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
      "taskId": "12585720",
      "workflowTaskScheduledEventAttributes": {
        "taskQueue": {
          "name": "T5FXQ5YV1Y:573d7b95-7fc7-4ea7-b529-59e4fbc84a4b",
          "kind": "TASK_QUEUE_KIND_STICKY",
          "normalName": "mutex"
        },
        "startToCloseTimeout": "10s",
        "attempt": 1
      }
    },
    {
      "eventId": "12",
      "eventTime": "2024-08-02T20:24:41.305307510Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
      "taskId": "12585721",
      "workflowTaskStartedEventAttributes": {
        "scheduledEventId": "11",
        "identity": "59433@T5FXQ5YV1Y@",
        "requestId": "request-from-RespondWorkflowTaskCompleted",
        "historySizeBytes": "912"
      }
    },
    {
      "eventId": "13",
      "eventTime": "2024-08-02T20:24:41.316277468Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
      "taskId": "12585727",
      "workflowTaskCompletedEventAttributes": {
        "scheduledEventId": "11",
        "startedEventId": "12",
        "identity": "59433@T5FXQ5YV1Y@",
        "workerVersion": {
          "buildId": "18984158064982fa1d18d2dc18288cd4"
        },
        "sdkMetadata": {},
        "meteringMetadata": {}
      }
    },
    {
      "eventId": "14",
      "eventTime": "2024-08-02T20:24:41.314178593Z",
      "eventType": "EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED",
      "taskId": "12585728",
      "externalWorkflowExecutionCancelRequestedEventAttributes": {
        "initiatedEventId": "9",
        "namespace": "default",
        "namespaceId": "dfb9aeb1-1e26-4ebf-9486-e9f6582d43ea",
        "workflowExecution": {
          "workflowId": "SampleWorkflow2WithMutex_dd026b7f-f321-4ac0-a548-2fa4cccf118a"
        }
      }
    },
    {
      "eventId": "15",
      "eventTime": "2024-08-02T20:24:41.316310343Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
      "taskId": "12585729",
      "workflowTaskScheduledEventAttributes": {
        "taskQueue": {
          "name": "T5FXQ5YV1Y:573d7b95-7fc7-4ea7-b529-59e4fbc84a4b",
          "kind": "TASK_QUEUE_KIND_STICKY",
          "normalName": "mutex"
        },
        "startToCloseTimeout": "10s",
        "attempt": 1
      }
    },
    {
      "eventId": "16",
      "eventTime": "2024-08-02T20:24:41.316314843Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
      "taskId": "12585730",
      "workflowTaskStartedEventAttributes": {
        "scheduledEventId": "15",
        "identity": "59433@T5FXQ5YV1Y@",
        "requestId": "request-from-RespondWorkflowTaskCompleted",
        "historySizeBytes": "2042"
      }
    },
    {
      "eventId": "17",
      "eventTime": "2024-08-02T20:24:41.341047802Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
      "taskId": "12585733",
      "workflowTaskCompletedEventAttributes": {
        "scheduledEventId": "15",
        "startedEventId": "16",
        "identity": "59433@T5FXQ5YV1Y@",
        "workerVersion": {
          "buildId": "18984158064982fa1d18d2dc18288cd4"
        },
        "sdkMetadata": {},
        "meteringMetadata": {}
      }
    },
    {
      "eventId": "18",
      "eventTime": "2024-08-02T20:24:41.333563468Z",
      "eventType": "EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED",
      "taskId": "12585734",
      "externalWorkflowExecutionSignaledEventAttributes": {
        "initiatedEventId": "8",
        "namespace": "default",
        "namespaceId": "dfb9aeb1-1e26-4ebf-9486-e9f6582d43ea",
        "workflowExecution": {
          "workflowId": "SampleWorkflow1WithMutex_ec110573-3884-429e-ad44-eafa8a189ff2"
        },
        "control": "8"
      }
    },
    {
      "eventId": "19",
      "eventTime": "2024-08-02T20:24:41.341084593Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
      "taskId": "12585735",
      "workflowTaskScheduledEventAttributes": {
        "taskQueue": {
          "name": "T5FXQ5YV1Y:573d7b95-7fc7-4ea7-b529-59e4fbc84a4b",
          "kind": "TASK_QUEUE_KIND_STICKY",
          "normalName": "mutex"
        },
        "startToCloseTimeout": "10s",
        "attempt": 1
      }
    },
    {
      "eventId": "20",
      "eventTime": "2024-08-02T20:24:41.341089593Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
      "taskId": "12585736",
      "workflowTaskStartedEventAttributes": {
        "scheduledEventId": "19",
        "identity": "59433@T5FXQ5YV1Y@",
        "requestId": "request-from-RespondWorkflowTaskCompleted",
        "historySizeBytes": "2499"
      }
    },
    {
      "eventId": "21",
      "eventTime": "2024-08-02T20:24:41.368550593Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
      "taskId": "12585739",
      "workflowTaskCompletedEventAttributes": {
        "scheduledEventId": "19",
        "startedEventId": "20",
        "identity": "59433@T5FXQ5YV1Y@",
        "workerVersion": {
          "buildId": "18984158064982fa1d18d2dc18288cd4"
        },
        "sdkMetadata": {},
        "meteringMetadata": {}
      }
    },
    {
      "eventId": "22",
      "eventTime": "2024-08-02T20:24:41.368585760Z",
      "eventType": "EVENT_TYPE_TIMER_STARTED",
      "taskId": "12585740",
      "timerStartedEventAttributes": {
        "timerId": "22",
        "startToFireTimeout": "600s",
        "workflowTaskCompletedEventId": "21"
      }
    }
  ]
}

So it looks like the cancel request did get sent out, and it was received, but the workflow is still running…

{
  "events": [
    {
      "eventId": "1",
      "eventTime": "2024-08-02T20:24:41.258784968Z",
      "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
      "taskId": "13633231",
      "workflowExecutionStartedEventAttributes": {
        "workflowType": {
          "name": "SampleWorkflowWithMutex"
        },
        "taskQueue": {
          "name": "mutex",
          "kind": "TASK_QUEUE_KIND_NORMAL"
        },
        "input": {
          "payloads": [
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": "6fe24f77-c1a4-4791-ae2e-b99826ef1565"
            }
          ]
        },
        "workflowExecutionTimeout": "0s",
        "workflowRunTimeout": "0s",
        "workflowTaskTimeout": "10s",
        "originalExecutionRunId": "0ad839ad-b5e2-4300-9d42-3b6f3a2a9123",
        "identity": "60922@T5FXQ5YV1Y@",
        "firstExecutionRunId": "0ad839ad-b5e2-4300-9d42-3b6f3a2a9123",
        "attempt": 1,
        "firstWorkflowTaskBackoff": "0s",
        "header": {},
        "workflowId": "SampleWorkflow2WithMutex_dd026b7f-f321-4ac0-a548-2fa4cccf118a"
      }
    },
    {
      "eventId": "2",
      "eventTime": "2024-08-02T20:24:41.258833301Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
      "taskId": "13633232",
      "workflowTaskScheduledEventAttributes": {
        "taskQueue": {
          "name": "mutex",
          "kind": "TASK_QUEUE_KIND_NORMAL"
        },
        "startToCloseTimeout": "10s",
        "attempt": 1
      }
    },
    {
      "eventId": "3",
      "eventTime": "2024-08-02T20:24:41.268374468Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
      "taskId": "13633241",
      "workflowTaskStartedEventAttributes": {
        "scheduledEventId": "2",
        "identity": "59433@T5FXQ5YV1Y@",
        "requestId": "560ca5c0-72fe-4d49-b283-48b1b3e7e489",
        "historySizeBytes": "354"
      }
    },
    {
      "eventId": "4",
      "eventTime": "2024-08-02T20:24:41.298014176Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
      "taskId": "13633249",
      "workflowTaskCompletedEventAttributes": {
        "scheduledEventId": "2",
        "startedEventId": "3",
        "identity": "59433@T5FXQ5YV1Y@",
        "workerVersion": {
          "buildId": "18984158064982fa1d18d2dc18288cd4"
        },
        "sdkMetadata": {
          "langUsedFlags": [
            3
          ],
          "sdkName": "temporal-go",
          "sdkVersion": "1.28.1"
        },
        "meteringMetadata": {}
      }
    },
    {
      "eventId": "5",
      "eventTime": "2024-08-02T20:24:41.298052760Z",
      "eventType": "EVENT_TYPE_MARKER_RECORDED",
      "taskId": "13633250",
      "markerRecordedEventAttributes": {
        "markerName": "LocalActivity",
        "details": {
          "data": {
            "payloads": [
              {
                "metadata": {
                  "encoding": "anNvbi9wbGFpbg==",
                  "encodingDecoded": "json/plain"
                },
                "data": {
                  "ActivityID": "1",
                  "ActivityType": "SignalWithStartMutexWorkflowActivity",
                  "ReplayTime": "2024-08-02T20:24:41.278378551Z",
                  "Attempt": 1,
                  "Backoff": 0
                }
              }
            ]
          },
          "result": {
            "payloads": [
              {
                "metadata": {
                  "encoding": "anNvbi9wbGFpbg==",
                  "encodingDecoded": "json/plain"
                },
                "data": {
                  "ID": "mutex:TestUseCase:6fe24f77-c1a4-4791-ae2e-b99826ef1565",
                  "RunID": "8c205172-9953-4d1f-beac-81efba44da32"
                }
              }
            ]
          }
        },
        "workflowTaskCompletedEventId": "4"
      }
    },
    {
      "eventId": "6",
      "eventTime": "2024-08-02T20:24:41.311327343Z",
      "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED",
      "taskId": "13633253",
      "workflowExecutionCancelRequestedEventAttributes": {
        "externalInitiatedEventId": "9",
        "externalWorkflowExecution": {
          "workflowId": "mutex:TestUseCase:6fe24f77-c1a4-4791-ae2e-b99826ef1565",
          "runId": "8c205172-9953-4d1f-beac-81efba44da32"
        },
        "identity": "history-service"
      }
    },
    {
      "eventId": "7",
      "eventTime": "2024-08-02T20:24:41.311363260Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
      "taskId": "13633254",
      "workflowTaskScheduledEventAttributes": {
        "taskQueue": {
          "name": "T5FXQ5YV1Y:573d7b95-7fc7-4ea7-b529-59e4fbc84a4b",
          "kind": "TASK_QUEUE_KIND_STICKY",
          "normalName": "mutex"
        },
        "startToCloseTimeout": "10s",
        "attempt": 1
      }
    },
    {
      "eventId": "8",
      "eventTime": "2024-08-02T20:24:41.315159510Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
      "taskId": "13633258",
      "workflowTaskStartedEventAttributes": {
        "scheduledEventId": "7",
        "identity": "59433@T5FXQ5YV1Y@",
        "requestId": "a0882d43-0218-4f1c-973d-ee0b6a67fc8f",
        "historySizeBytes": "1213"
      }
    },
    {
      "eventId": "9",
      "eventTime": "2024-08-02T20:24:41.320556885Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
      "taskId": "13633262",
      "workflowTaskCompletedEventAttributes": {
        "scheduledEventId": "7",
        "startedEventId": "8",
        "identity": "59433@T5FXQ5YV1Y@",
        "workerVersion": {
          "buildId": "18984158064982fa1d18d2dc18288cd4"
        },
        "sdkMetadata": {},
        "meteringMetadata": {}
      }
    }
  ]
}

Call stack for that

Based on the suggestion in "Cancelled workflow waiting for signals" (use a selector together with ctx.Done()), I'm able to get it to cancel.

The flow is a bit strange, though, because at this point I have to manually handle the cancellation received while waiting on the signal and propagate it to the callers. Example:

func (s *Mutex) LockWithCancellation(ctx workflow.Context,
	resourceID string, unlockTimeout time.Duration) (UnlockFunc, error) {

	activityCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
		ScheduleToCloseTimeout: time.Minute,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2.0,
			MaximumInterval:    time.Minute,
			MaximumAttempts:    5,
		},
	})

	var releaseLockChannelName string
	var execution workflow.Execution
	err := workflow.ExecuteLocalActivity(activityCtx, SignalWithStartMutexWorkflowActivity, s.lockNamespace,
		resourceID, s.currentWorkflowID, unlockTimeout).Get(ctx, &execution)
	if err != nil {
		return nil, err
	}

	isCanceled := false
	selector := workflow.NewSelector(ctx)
	selector.AddReceive(workflow.GetSignalChannel(ctx, AcquireLockSignalName), func(c workflow.ReceiveChannel, more bool) {
		c.Receive(ctx, &releaseLockChannelName)
		workflow.GetLogger(ctx).Info("release signal received")
	})
	selector.AddReceive(ctx.Done(), func(c workflow.ReceiveChannel, more bool) {
		// ctx.Done() is closed when this workflow is asked to cancel
		isCanceled = true
		workflow.GetLogger(ctx).Info("received cancel")
	})
	selector.Select(ctx)

	if !isCanceled {
		unlockFunc := func() error {
			return workflow.SignalExternalWorkflow(ctx, execution.ID, execution.RunID,
				releaseLockChannelName, "releaseLock").Get(ctx, nil)
		}
		return unlockFunc, nil
	}

	// canceled while waiting: no lock was acquired, so there is no unlock func
	return nil, nil
}

In this case, I use isCanceled to detect whether a cancellation has been propagated. The workflow that uses this mutex has to handle the case where the returned unlock function is nil.

Summary of the questions left:

  • Is there a better way to handle this cancel propagation?
  • Any advice on this one:
The one concern I have, though, is with the asynchronous call to cancel the workflow. If the queued workflow hasn't actually been canceled by the time the mutex gets unlocked (that is, between sending the cancel request and the unlock), you could potentially run into a race condition.

I could have it trigger an activity afterwards that blocks until the cancellation is complete; I wonder if the SDK provides a blocking mechanism like that?

@maxim, here is the repo to reproduce it: GitHub - darewreck54/samples-go: Temporal Go SDK samples

Listening on ctx.Done is the right way to handle cancellation in this case.

Thanks @maxim,

To implement the original use case (a mutex that queues only the latest lock request), I have a mutex workflow that tracks what is currently queued. When a new request comes in, it updates that internal workflow state.

In my example, I have workflows A, B, C, and D. When all are queued up, I expect A to be running, B and C to be canceled, and D to still be running, waiting for the mutex to unlock.
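Before wiring this into goroutines and signals, it can help to pin down the intended state transitions on their own. A minimal, single-threaded sketch in plain Go with no Temporal SDK calls; latestOnlyMutex, Request, and Release are hypothetical names. In the workflow, Request would correspond to handling a request-lock signal, and Release to the holder unlocking or the lock timing out.

```go
package main

// latestOnlyMutex models the desired bookkeeping: one holder, at most one
// waiter, and any older waiter canceled when a newer request arrives.
type latestOnlyMutex struct {
	holder   string   // workflow ID currently holding the lock ("" if free)
	waiter   string   // latest queued workflow ID ("" if none)
	canceled []string // IDs displaced by a newer request, in order
}

// Request records a lock request from id.
func (m *latestOnlyMutex) Request(id string) {
	switch {
	case m.holder == "":
		m.holder = id // lock is free: grant it immediately
	case m.waiter == "":
		m.waiter = id // first waiter
	default:
		m.canceled = append(m.canceled, m.waiter) // displace the older waiter
		m.waiter = id
	}
}

// Release frees the lock and hands it to the waiter, if any.
// It returns the new holder ("" if the lock is now free).
func (m *latestOnlyMutex) Release() string {
	m.holder, m.waiter = m.waiter, ""
	return m.holder
}
```

Feeding it A, B, C, D leaves A holding the lock, D waiting, and B and C in canceled. Because every transition happens in one place, there is no shared queued ID for several goroutines to race on.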

Code:

func MutexWorkflowWithCancellation(
	ctx workflow.Context,
	namespace string,
	resourceID string,
	unlockTimeout time.Duration,
) error {
	currentWorkflowID := workflow.GetInfo(ctx).WorkflowExecution.ID
	if currentWorkflowID == "default-test-workflow-id" {
		// unit testing hack, see https://github.com/uber-go/cadence-client/issues/663
		_ = workflow.Sleep(ctx, 10*time.Millisecond)
	}
	logger := workflow.GetLogger(ctx)
	logger.Info("started", "currentWorkflowID", currentWorkflowID)
	requestLockCh := workflow.GetSignalChannel(ctx, RequestLockSignalName)
	locked := false
	goroutineCount := 0
	var queuedWorkerId string
	for {
		var senderWorkflowID string

		if !requestLockCh.ReceiveAsync(&senderWorkflowID) {
			logger.Info("no more signals")
			break
		} else {
			logger.Info(fmt.Sprintf("request lock: [%s]", senderWorkflowID))
		}

		goroutineCount++
		workflow.Go(ctx, func(ctx workflow.Context) {
			if locked {
				logger.Info(fmt.Sprintf("lock request for [%s] w/ queuedWorkerId: [%s]", senderWorkflowID,
					queuedWorkerId))
				if queuedWorkerId != "" {
					idCancel := queuedWorkerId
					queuedWorkerId = senderWorkflowID
					logger.Info(fmt.Sprintf("cancel [%s]", idCancel))
					cancelSender(ctx, idCancel)
				} else {
					queuedWorkerId = senderWorkflowID
				}
				logger.Info(fmt.Sprintf("updated queuedWorkerId: [%v]", queuedWorkerId))
			} else {
				logger.Info(fmt.Sprintf("tryLock [%s]", senderWorkflowID))
				locked = true
				tryLock(ctx, senderWorkflowID, unlockTimeout)
				locked = false
				logger.Info(fmt.Sprintf("lock freed [%s]", senderWorkflowID))
			}
			goroutineCount--
		})
	}

	// when mutex can exit
	workflow.Await(ctx, func() bool {
		return goroutineCount == 0 && queuedWorkerId == ""
	})
	return nil
}

In this case, B gets canceled but C doesn't.

I think there's something wrong with how I'm managing the queue state across the multiple goroutines.

Thanks,
Derek

After taking a closer look, I'm not sure this approach can work. This is the original mutex code I was trying to extend to support the use case: samples-go/mutex/mutex_workflow.go at main · temporalio/samples-go · GitHub. The original works because the mutex workflow actually blocks in the for loop until it gets an acquire signal, and while it is blocked, it records every request-lock signal and queues it up.

However, with workflow.Await at the very end, outside the for loop, you're only processing the requests that have already been received at that moment; as soon as ReceiveAsync finds no more signals, the code jumps right out of the for loop. This is more apparent when you start the workflows with a 1-second delay between them: in that case, the mutex only processes the first workflow before the mutex workflow ends.

You would have to update the code so that it keeps listening for lock requests while the mutex is still processing the goroutines.
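The difference can be illustrated outside Temporal with ordinary Go channels. This is a plain-Go analogy, not SDK code, and drainPending and receiveAll are hypothetical names: a non-blocking drain, like the ReceiveAsync loop, sees only what is buffered at that instant, while a long-lived receive loop keeps picking up requests that arrive later.

```go
package main

// drainPending mimics the ReceiveAsync loop: it collects only the requests
// already buffered and returns as soon as the channel is momentarily empty.
func drainPending(requests chan string) []string {
	var got []string
	for {
		select {
		case id, ok := <-requests:
			if !ok {
				return got // channel closed
			}
			got = append(got, id)
		default:
			return got // nothing buffered right now, so the loop exits
		}
	}
}

// receiveAll mimics a listener that keeps blocking on Receive: it handles
// every request until the channel is closed.
func receiveAll(requests chan string, handle func(id string)) {
	for id := range requests {
		handle(id)
	}
}
```

With A and B buffered, drainPending returns just those two; C and D, sent afterwards, are only seen by a loop that is still receiving, which is the role the long-lived listener goroutine has to play in the mutex workflow.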

I got a working solution, but I'm not sure if it's the best approach or whether there are any potential issues: samples-go/mutex at workingMutex · darewreck54/samples-go · GitHub

func MutexWorkflowWithCancellation(
	ctx workflow.Context,
	namespace string,
	resourceID string,
	unlockTimeout time.Duration,
) error {
	currentWorkflowID := workflow.GetInfo(ctx).WorkflowExecution.ID
	if currentWorkflowID == "default-test-workflow-id" {
		// unit testing hack, see https://github.com/uber-go/cadence-client/issues/663
		_ = workflow.Sleep(ctx, 10*time.Millisecond)
	}
	logger := workflow.GetLogger(ctx)
	logger.Info("started", "currentWorkflowID", currentWorkflowID)
	requestLockCh := workflow.GetSignalChannel(ctx, RequestLockSignalName)
	locked := false

	goRoutineCount := 0
	queuedIds := make([]string, 0)
	workflow.Go(ctx, func(ctx workflow.Context) {
		for {
			var senderWorkflowID string
			if requestLockCh.Receive(ctx, &senderWorkflowID) {
				goRoutineCount++
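				if locked {
					// when locked, we only allow one queued item, and the rest is canceled.
					// Detach the queue before canceling: cancelSender may block, and the
					// main loop must not pick up an ID that is being canceled.
					toCancel := queuedIds
					queuedIds = make([]string, 0)
					for _, id := range toCancel {
						cancelSender(ctx, id)
						goRoutineCount--
					}
				}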

				logger.Info(fmt.Sprintf("Adding [%s] to [%v]", senderWorkflowID, queuedIds))
				queuedIds = append(queuedIds, senderWorkflowID)
			}
		}
	})

	// Since we start a goroutine to listen for "request-lock" signals, delay by a second.
	// Not sure if this is needed, or how quickly `requestLockCh.Receive` gets the signal before the `for` loop below runs.
	_ = workflow.Sleep(ctx, time.Second)

	for {
		if !locked {
			logger.Info(fmt.Sprintf("Processing [%v]", queuedIds))
			if len(queuedIds) == 0 {
				break
			} else if len(queuedIds) == 1 {
				locked = true
				id := queuedIds[0]
				queuedIds = make([]string, 0)
				tryLock(ctx, id, unlockTimeout)
				locked = false
				goRoutineCount--
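			} else {
				// More than one request arrived while the lock was free (e.g. during
				// the initial Sleep). Keep only the latest and cancel the rest;
				// without this, the loop would spin forever without yielding.
				toCancel := queuedIds[:len(queuedIds)-1]
				queuedIds = queuedIds[len(queuedIds)-1:]
				for _, id := range toCancel {
					cancelSender(ctx, id)
					goRoutineCount--
				}
			}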
		}
	}

	workflow.Await(ctx, func() bool {
		return goRoutineCount == 0
	})

	return nil
}

The other part is that when the mutex expires, I also cancel the locked workflow. However, workflow.RequestCancelExternalWorkflow doesn't seem to have a way to attach a message indicating why it's being canceled. Is there a way to pass that in the ctx?

Would you file an issue to get the reason added to this call?

SignalExternalWorkflow might help for now

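cancelWorkflowWithReason := func(workflowID string, reason string) {
	// Signal the workflow with the cancellation reason and wait until the
	// signal is delivered, so the reason arrives before the cancel request
	if err := workflow.SignalExternalWorkflow(ctx, workflowID, "", "cancellationReason", reason).Get(ctx, nil); err != nil {
		logger.Error("failed to signal cancellation reason", "workflowID", workflowID, "error", err)
	}
	// Request cancellation
	if err := workflow.RequestCancelExternalWorkflow(ctx, workflowID, "").Get(ctx, nil); err != nil {
		logger.Error("failed to request cancellation", "workflowID", workflowID, "error", err)
	}
}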

if len(lockQueue) > 2 {
	// keep only the first and the last entries; cancel everything in between
	for i := 1; i < len(lockQueue)-1; i++ {
		cancelWorkflowWithReason(lockQueue[i], "cancelling this workflow, resource not available")
	}
	lockQueue = append([]string{lockQueue[0]}, lockQueue[len(lockQueue)-1])
	logger.Info("Updated queue", "lockQueue", lockQueue)
}
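The trimming step can be isolated and checked on its own. A plain-Go sketch, assuming lockQueue[0] is the current holder and the last element is the newest request; trimQueue is a hypothetical name, and cancel stands in for cancelWorkflowWithReason.

```go
package main

// trimQueue keeps only the first entry (assumed current holder) and the last
// entry (newest request), calling cancel for every entry in between.
func trimQueue(lockQueue []string, cancel func(id string)) []string {
	if len(lockQueue) <= 2 {
		return lockQueue // nothing to cancel
	}
	for _, id := range lockQueue[1 : len(lockQueue)-1] {
		cancel(id)
	}
	return []string{lockQueue[0], lockQueue[len(lockQueue)-1]}
}
```

Applied after each new request is appended, this keeps the queue at most two entries long, so with A, B, C, D it cancels B and C and leaves A and D.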

[Feature Request] Add reason field to RequestCancelWorkflowExecutionRequest · Issue #2426 · temporalio/temporal · GitHub created 2 years ago