Mutex Workflow and how to track workflow that have requested lock

I think it was my mistake by looking at the compact view. After taking a closer look, it does seem like the request is sent and recieved, but the workflow isn’t really canceling it.

Mutex

{
  "events": [
    {
      "eventId": "1",
      "eventTime": "2024-08-02T20:24:41.269566426Z",
      "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
      "taskId": "12585703",
      "workflowExecutionStartedEventAttributes": {
        "workflowType": {
          "name": "MutexWorkflowWithCancellation"
        },
        "taskQueue": {
          "name": "mutex",
          "kind": "TASK_QUEUE_KIND_NORMAL"
        },
        "input": {
          "payloads": [
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": "TestUseCase"
            },
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": "6fe24f77-c1a4-4791-ae2e-b99826ef1565"
            },
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": 600000000000
            }
          ]
        },
        "workflowExecutionTimeout": "0s",
        "workflowRunTimeout": "0s",
        "workflowTaskTimeout": "10s",
        "originalExecutionRunId": "8c205172-9953-4d1f-beac-81efba44da32",
        "identity": "59433@T5FXQ5YV1Y@",
        "firstExecutionRunId": "8c205172-9953-4d1f-beac-81efba44da32",
        "retryPolicy": {
          "initialInterval": "1s",
          "backoffCoefficient": 2,
          "maximumInterval": "60s",
          "maximumAttempts": 5
        },
        "attempt": 1,
        "firstWorkflowTaskBackoff": "0s",
        "header": {},
        "workflowId": "mutex:TestUseCase:6fe24f77-c1a4-4791-ae2e-b99826ef1565"
      }
    },
    {
      "eventId": "2",
      "eventTime": "2024-08-02T20:24:41.269642010Z",
      "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
      "taskId": "12585704",
      "workflowExecutionSignaledEventAttributes": {
        "signalName": "request-lock-event",
        "input": {
          "payloads": [
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": "SampleWorkflow1WithMutex_ec110573-3884-429e-ad44-eafa8a189ff2"
            }
          ]
        },
        "identity": "59433@T5FXQ5YV1Y@",
        "header": {}
      }
    },
    {
      "eventId": "3",
      "eventTime": "2024-08-02T20:24:41.269647343Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
      "taskId": "12585705",
      "workflowTaskScheduledEventAttributes": {
        "taskQueue": {
          "name": "mutex",
          "kind": "TASK_QUEUE_KIND_NORMAL"
        },
        "startToCloseTimeout": "10s",
        "attempt": 1
      }
    },
    {
      "eventId": "4",
      "eventTime": "2024-08-02T20:24:41.289274635Z",
      "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
      "taskId": "12585709",
      "workflowExecutionSignaledEventAttributes": {
        "signalName": "request-lock-event",
        "input": {
          "payloads": [
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": "SampleWorkflow2WithMutex_dd026b7f-f321-4ac0-a548-2fa4cccf118a"
            }
          ]
        },
        "identity": "59433@T5FXQ5YV1Y@",
        "header": {}
      }
    },
    {
      "eventId": "5",
      "eventTime": "2024-08-02T20:24:41.298923426Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
      "taskId": "12585711",
      "workflowTaskStartedEventAttributes": {
        "scheduledEventId": "3",
        "identity": "59433@T5FXQ5YV1Y@",
        "requestId": "e1ba985f-b228-45c0-9707-907edfc135a8",
        "historySizeBytes": "787"
      }
    },
    {
      "eventId": "6",
      "eventTime": "2024-08-02T20:24:41.305215051Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
      "taskId": "12585715",
      "workflowTaskCompletedEventAttributes": {
        "scheduledEventId": "3",
        "startedEventId": "5",
        "identity": "59433@T5FXQ5YV1Y@",
        "workerVersion": {
          "buildId": "18984158064982fa1d18d2dc18288cd4"
        },
        "sdkMetadata": {
          "langUsedFlags": [
            3
          ],
          "sdkName": "temporal-go",
          "sdkVersion": "1.28.1"
        },
        "meteringMetadata": {}
      }
    },
    {
      "eventId": "7",
      "eventTime": "2024-08-02T20:24:41.305251218Z",
      "eventType": "EVENT_TYPE_MARKER_RECORDED",
      "taskId": "12585716",
      "markerRecordedEventAttributes": {
        "markerName": "SideEffect",
        "details": {
          "data": {
            "payloads": [
              {
                "metadata": {
                  "encoding": "anNvbi9wbGFpbg==",
                  "encodingDecoded": "json/plain"
                },
                "data": "unlock-event-SampleWorkflow1WithMutex_ec110573-3884-429e-ad44-eafa8a189ff2"
              }
            ]
          },
          "side-effect-id": {
            "payloads": [
              {
                "metadata": {
                  "encoding": "anNvbi9wbGFpbg==",
                  "encodingDecoded": "json/plain"
                },
                "data": 1
              }
            ]
          }
        },
        "workflowTaskCompletedEventId": "6"
      }
    },
    {
      "eventId": "8",
      "eventTime": "2024-08-02T20:24:41.305271135Z",
      "eventType": "EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED",
      "taskId": "12585717",
      "signalExternalWorkflowExecutionInitiatedEventAttributes": {
        "workflowTaskCompletedEventId": "6",
        "namespace": "default",
        "namespaceId": "dfb9aeb1-1e26-4ebf-9486-e9f6582d43ea",
        "workflowExecution": {
          "workflowId": "SampleWorkflow1WithMutex_ec110573-3884-429e-ad44-eafa8a189ff2"
        },
        "signalName": "acquire-lock-event",
        "input": {
          "payloads": [
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": "unlock-event-SampleWorkflow1WithMutex_ec110573-3884-429e-ad44-eafa8a189ff2"
            }
          ]
        },
        "control": "8",
        "header": {}
      }
    },
    {
      "eventId": "9",
      "eventTime": "2024-08-02T20:24:41.305290135Z",
      "eventType": "EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED",
      "taskId": "12585718",
      "requestCancelExternalWorkflowExecutionInitiatedEventAttributes": {
        "workflowTaskCompletedEventId": "6",
        "namespace": "default",
        "namespaceId": "dfb9aeb1-1e26-4ebf-9486-e9f6582d43ea",
        "workflowExecution": {
          "workflowId": "SampleWorkflow2WithMutex_dd026b7f-f321-4ac0-a548-2fa4cccf118a"
        },
        "control": "9"
      }
    },
    {
      "eventId": "10",
      "eventTime": "2024-08-02T20:24:41.301776885Z",
      "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED",
      "taskId": "12585719",
      "workflowExecutionSignaledEventAttributes": {
        "signalName": "request-lock-event",
        "input": {
          "payloads": [
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": "SampleWorkflow3WithMutex_2ad3e1a1-4782-4423-a167-494300f9ee01"
            }
          ]
        },
        "identity": "59433@T5FXQ5YV1Y@",
        "header": {}
      }
    },
    {
      "eventId": "11",
      "eventTime": "2024-08-02T20:24:41.305302801Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
      "taskId": "12585720",
      "workflowTaskScheduledEventAttributes": {
        "taskQueue": {
          "name": "T5FXQ5YV1Y:573d7b95-7fc7-4ea7-b529-59e4fbc84a4b",
          "kind": "TASK_QUEUE_KIND_STICKY",
          "normalName": "mutex"
        },
        "startToCloseTimeout": "10s",
        "attempt": 1
      }
    },
    {
      "eventId": "12",
      "eventTime": "2024-08-02T20:24:41.305307510Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
      "taskId": "12585721",
      "workflowTaskStartedEventAttributes": {
        "scheduledEventId": "11",
        "identity": "59433@T5FXQ5YV1Y@",
        "requestId": "request-from-RespondWorkflowTaskCompleted",
        "historySizeBytes": "912"
      }
    },
    {
      "eventId": "13",
      "eventTime": "2024-08-02T20:24:41.316277468Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
      "taskId": "12585727",
      "workflowTaskCompletedEventAttributes": {
        "scheduledEventId": "11",
        "startedEventId": "12",
        "identity": "59433@T5FXQ5YV1Y@",
        "workerVersion": {
          "buildId": "18984158064982fa1d18d2dc18288cd4"
        },
        "sdkMetadata": {},
        "meteringMetadata": {}
      }
    },
    {
      "eventId": "14",
      "eventTime": "2024-08-02T20:24:41.314178593Z",
      "eventType": "EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED",
      "taskId": "12585728",
      "externalWorkflowExecutionCancelRequestedEventAttributes": {
        "initiatedEventId": "9",
        "namespace": "default",
        "namespaceId": "dfb9aeb1-1e26-4ebf-9486-e9f6582d43ea",
        "workflowExecution": {
          "workflowId": "SampleWorkflow2WithMutex_dd026b7f-f321-4ac0-a548-2fa4cccf118a"
        }
      }
    },
    {
      "eventId": "15",
      "eventTime": "2024-08-02T20:24:41.316310343Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
      "taskId": "12585729",
      "workflowTaskScheduledEventAttributes": {
        "taskQueue": {
          "name": "T5FXQ5YV1Y:573d7b95-7fc7-4ea7-b529-59e4fbc84a4b",
          "kind": "TASK_QUEUE_KIND_STICKY",
          "normalName": "mutex"
        },
        "startToCloseTimeout": "10s",
        "attempt": 1
      }
    },
    {
      "eventId": "16",
      "eventTime": "2024-08-02T20:24:41.316314843Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
      "taskId": "12585730",
      "workflowTaskStartedEventAttributes": {
        "scheduledEventId": "15",
        "identity": "59433@T5FXQ5YV1Y@",
        "requestId": "request-from-RespondWorkflowTaskCompleted",
        "historySizeBytes": "2042"
      }
    },
    {
      "eventId": "17",
      "eventTime": "2024-08-02T20:24:41.341047802Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
      "taskId": "12585733",
      "workflowTaskCompletedEventAttributes": {
        "scheduledEventId": "15",
        "startedEventId": "16",
        "identity": "59433@T5FXQ5YV1Y@",
        "workerVersion": {
          "buildId": "18984158064982fa1d18d2dc18288cd4"
        },
        "sdkMetadata": {},
        "meteringMetadata": {}
      }
    },
    {
      "eventId": "18",
      "eventTime": "2024-08-02T20:24:41.333563468Z",
      "eventType": "EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_SIGNALED",
      "taskId": "12585734",
      "externalWorkflowExecutionSignaledEventAttributes": {
        "initiatedEventId": "8",
        "namespace": "default",
        "namespaceId": "dfb9aeb1-1e26-4ebf-9486-e9f6582d43ea",
        "workflowExecution": {
          "workflowId": "SampleWorkflow1WithMutex_ec110573-3884-429e-ad44-eafa8a189ff2"
        },
        "control": "8"
      }
    },
    {
      "eventId": "19",
      "eventTime": "2024-08-02T20:24:41.341084593Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
      "taskId": "12585735",
      "workflowTaskScheduledEventAttributes": {
        "taskQueue": {
          "name": "T5FXQ5YV1Y:573d7b95-7fc7-4ea7-b529-59e4fbc84a4b",
          "kind": "TASK_QUEUE_KIND_STICKY",
          "normalName": "mutex"
        },
        "startToCloseTimeout": "10s",
        "attempt": 1
      }
    },
    {
      "eventId": "20",
      "eventTime": "2024-08-02T20:24:41.341089593Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
      "taskId": "12585736",
      "workflowTaskStartedEventAttributes": {
        "scheduledEventId": "19",
        "identity": "59433@T5FXQ5YV1Y@",
        "requestId": "request-from-RespondWorkflowTaskCompleted",
        "historySizeBytes": "2499"
      }
    },
    {
      "eventId": "21",
      "eventTime": "2024-08-02T20:24:41.368550593Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
      "taskId": "12585739",
      "workflowTaskCompletedEventAttributes": {
        "scheduledEventId": "19",
        "startedEventId": "20",
        "identity": "59433@T5FXQ5YV1Y@",
        "workerVersion": {
          "buildId": "18984158064982fa1d18d2dc18288cd4"
        },
        "sdkMetadata": {},
        "meteringMetadata": {}
      }
    },
    {
      "eventId": "22",
      "eventTime": "2024-08-02T20:24:41.368585760Z",
      "eventType": "EVENT_TYPE_TIMER_STARTED",
      "taskId": "12585740",
      "timerStartedEventAttributes": {
        "timerId": "22",
        "startToFireTimeout": "600s",
        "workflowTaskCompletedEventId": "21"
      }
    }
  ]
}

So it looks like the request did get sent out. Looks like the request was recieved but the workflow is still running…

{
  "events": [
    {
      "eventId": "1",
      "eventTime": "2024-08-02T20:24:41.258784968Z",
      "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
      "taskId": "13633231",
      "workflowExecutionStartedEventAttributes": {
        "workflowType": {
          "name": "SampleWorkflowWithMutex"
        },
        "taskQueue": {
          "name": "mutex",
          "kind": "TASK_QUEUE_KIND_NORMAL"
        },
        "input": {
          "payloads": [
            {
              "metadata": {
                "encoding": "anNvbi9wbGFpbg==",
                "encodingDecoded": "json/plain"
              },
              "data": "6fe24f77-c1a4-4791-ae2e-b99826ef1565"
            }
          ]
        },
        "workflowExecutionTimeout": "0s",
        "workflowRunTimeout": "0s",
        "workflowTaskTimeout": "10s",
        "originalExecutionRunId": "0ad839ad-b5e2-4300-9d42-3b6f3a2a9123",
        "identity": "60922@T5FXQ5YV1Y@",
        "firstExecutionRunId": "0ad839ad-b5e2-4300-9d42-3b6f3a2a9123",
        "attempt": 1,
        "firstWorkflowTaskBackoff": "0s",
        "header": {},
        "workflowId": "SampleWorkflow2WithMutex_dd026b7f-f321-4ac0-a548-2fa4cccf118a"
      }
    },
    {
      "eventId": "2",
      "eventTime": "2024-08-02T20:24:41.258833301Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
      "taskId": "13633232",
      "workflowTaskScheduledEventAttributes": {
        "taskQueue": {
          "name": "mutex",
          "kind": "TASK_QUEUE_KIND_NORMAL"
        },
        "startToCloseTimeout": "10s",
        "attempt": 1
      }
    },
    {
      "eventId": "3",
      "eventTime": "2024-08-02T20:24:41.268374468Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
      "taskId": "13633241",
      "workflowTaskStartedEventAttributes": {
        "scheduledEventId": "2",
        "identity": "59433@T5FXQ5YV1Y@",
        "requestId": "560ca5c0-72fe-4d49-b283-48b1b3e7e489",
        "historySizeBytes": "354"
      }
    },
    {
      "eventId": "4",
      "eventTime": "2024-08-02T20:24:41.298014176Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
      "taskId": "13633249",
      "workflowTaskCompletedEventAttributes": {
        "scheduledEventId": "2",
        "startedEventId": "3",
        "identity": "59433@T5FXQ5YV1Y@",
        "workerVersion": {
          "buildId": "18984158064982fa1d18d2dc18288cd4"
        },
        "sdkMetadata": {
          "langUsedFlags": [
            3
          ],
          "sdkName": "temporal-go",
          "sdkVersion": "1.28.1"
        },
        "meteringMetadata": {}
      }
    },
    {
      "eventId": "5",
      "eventTime": "2024-08-02T20:24:41.298052760Z",
      "eventType": "EVENT_TYPE_MARKER_RECORDED",
      "taskId": "13633250",
      "markerRecordedEventAttributes": {
        "markerName": "LocalActivity",
        "details": {
          "data": {
            "payloads": [
              {
                "metadata": {
                  "encoding": "anNvbi9wbGFpbg==",
                  "encodingDecoded": "json/plain"
                },
                "data": {
                  "ActivityID": "1",
                  "ActivityType": "SignalWithStartMutexWorkflowActivity",
                  "ReplayTime": "2024-08-02T20:24:41.278378551Z",
                  "Attempt": 1,
                  "Backoff": 0
                }
              }
            ]
          },
          "result": {
            "payloads": [
              {
                "metadata": {
                  "encoding": "anNvbi9wbGFpbg==",
                  "encodingDecoded": "json/plain"
                },
                "data": {
                  "ID": "mutex:TestUseCase:6fe24f77-c1a4-4791-ae2e-b99826ef1565",
                  "RunID": "8c205172-9953-4d1f-beac-81efba44da32"
                }
              }
            ]
          }
        },
        "workflowTaskCompletedEventId": "4"
      }
    },
    {
      "eventId": "6",
      "eventTime": "2024-08-02T20:24:41.311327343Z",
      "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED",
      "taskId": "13633253",
      "workflowExecutionCancelRequestedEventAttributes": {
        "externalInitiatedEventId": "9",
        "externalWorkflowExecution": {
          "workflowId": "mutex:TestUseCase:6fe24f77-c1a4-4791-ae2e-b99826ef1565",
          "runId": "8c205172-9953-4d1f-beac-81efba44da32"
        },
        "identity": "history-service"
      }
    },
    {
      "eventId": "7",
      "eventTime": "2024-08-02T20:24:41.311363260Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
      "taskId": "13633254",
      "workflowTaskScheduledEventAttributes": {
        "taskQueue": {
          "name": "T5FXQ5YV1Y:573d7b95-7fc7-4ea7-b529-59e4fbc84a4b",
          "kind": "TASK_QUEUE_KIND_STICKY",
          "normalName": "mutex"
        },
        "startToCloseTimeout": "10s",
        "attempt": 1
      }
    },
    {
      "eventId": "8",
      "eventTime": "2024-08-02T20:24:41.315159510Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
      "taskId": "13633258",
      "workflowTaskStartedEventAttributes": {
        "scheduledEventId": "7",
        "identity": "59433@T5FXQ5YV1Y@",
        "requestId": "a0882d43-0218-4f1c-973d-ee0b6a67fc8f",
        "historySizeBytes": "1213"
      }
    },
    {
      "eventId": "9",
      "eventTime": "2024-08-02T20:24:41.320556885Z",
      "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
      "taskId": "13633262",
      "workflowTaskCompletedEventAttributes": {
        "scheduledEventId": "7",
        "startedEventId": "8",
        "identity": "59433@T5FXQ5YV1Y@",
        "workerVersion": {
          "buildId": "18984158064982fa1d18d2dc18288cd4"
        },
        "sdkMetadata": {},
        "meteringMetadata": {}
      }
    }
  ]
}

Call stack for that

Looks like based on this Cancelled workflow waiting for signals suggestion with using a selector and ctx.done i’m able to get it to cancel.

The flow is a bit strange thou because at this point I have to manually handle the cancelation that you receive in the signal and propagate it to the callers. Example:

func (s *Mutex) LockWithCancellation(ctx workflow.Context,
	resourceID string, unlockTimeout time.Duration) (UnlockFunc, error) {

	activityCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
		ScheduleToCloseTimeout: time.Minute,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2.0,
			MaximumInterval:    time.Minute,
			MaximumAttempts:    5,
		},
	})

	var releaseLockChannelName string
	var execution workflow.Execution
	err := workflow.ExecuteLocalActivity(activityCtx, SignalWithStartMutexWorkflowActivity, s.lockNamespace,
		resourceID, s.currentWorkflowID, unlockTimeout).Get(ctx, &execution)
	if err != nil {
		return nil, err
	}

	isCanceled := false
	selector := workflow.NewSelector(ctx)
	selector.AddReceive(workflow.GetSignalChannel(ctx, AcquireLockSignalName), func(c workflow.ReceiveChannel, more bool) {
		c.Receive(ctx, &releaseLockChannelName)
		workflow.GetLogger(ctx).Info("release signal received")
	})
	selector.AddReceive(ctx.Done(), func(c workflow.ReceiveChannel, more bool) {
		isCanceled = true
		workflow.GetLogger(ctx).Info("recieved cancel")
	})
	selector.Select(ctx)

	if !isCanceled {
		unlockFunc := func() error {
			return workflow.SignalExternalWorkflow(ctx, execution.ID, execution.RunID,
				releaseLockChannelName, "releaseLock").Get(ctx, nil)
		}
		return unlockFunc, nil
	}

	return nil, nil
}

in this case, i use isCanceled to identify if a cancel has been propagated. The workfow that references this mutex has tohandle the case with unlock return is nil.

Summary of the questions left:

  • Is there a better way to handle this cancel propagation?
  • And advice on this one
The one concern I have thou is with the async call to cancel the workflow. If the workflow doesn’t cancel between the time request is sent to the mutex getting unlocked, you could potentially run into a race condition.

I could have it trigger an activity afterwards to block till its complete, wonder if the sdk provides a blocking mechanism like that?