Any way to get notified or execute a callback on a change in WorkflowExecutionStatus?

I’m building a “job status” system that will allow users to see the status (and potentially results or errors) of long-running tasks that we execute via Temporal.

Roughly, I’m thinking I’ll keep track of each customer’s jobs by saving the workflowId/runId to the database when a job is started.

In non-exceptional cases (a job completes successfully or a known error state occurs) I can easily write the success/failure status back to the database from within a final activity in temporal.

However, I’m struggling to find a way to capture WorkflowExecutionStatus for outcomes such as cancelled, terminated, or timed out, or even for exceptional failures (unless I wrap all of my workflows in try/catch).

Ideally I’d be able to always execute some callback when WorkflowExecutionStatus changes for specific workflow types.

If that doesn’t exist, the only other thing I can think of is creating a cron workflow that occasionally scans my database for “running” jobs and uses DescribeWorkflowExecution to query the WorkflowExecutionStatus and write it back to the database if it’s in a terminal state.
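The polling fallback described above could look roughly like the sketch below. It is a minimal outline under stated assumptions, not a confirmed recipe: `JobRow`, `ReconcileDeps`, `isTerminal`, and `reconcileJobs` are hypothetical names, and the database and Temporal calls are injected as plain functions so the logic can be read on its own. With the TypeScript client, `describeStatus` would presumably wrap something like `(await client.workflow.getHandle(workflowId).describe()).status.name`. Because the loop does I/O, it would have to run in an activity (or an ordinary scheduled process) rather than directly in workflow code.

```typescript
// Status names as the TypeScript client is assumed to report them in
// `describe().status.name`; declared locally so the sketch has no SDK dependency.
type StatusName =
  | 'UNSPECIFIED'
  | 'RUNNING'
  | 'COMPLETED'
  | 'FAILED'
  | 'CANCELLED'
  | 'TERMINATED'
  | 'CONTINUED_AS_NEW'
  | 'TIMED_OUT'
  | 'UNKNOWN';

// CONTINUED_AS_NEW is deliberately excluded: the job lives on in a new run.
const TERMINAL_STATUSES: ReadonlySet<StatusName> = new Set<StatusName>([
  'COMPLETED',
  'FAILED',
  'CANCELLED',
  'TERMINATED',
  'TIMED_OUT',
]);

function isTerminal(status: StatusName): boolean {
  return TERMINAL_STATUSES.has(status);
}

// Hypothetical row shape for a job that the database still marks as running.
interface JobRow {
  workflowId: string;
}

// Hypothetical dependencies: database access and the Temporal lookup.
interface ReconcileDeps {
  listRunningJobs(): Promise<JobRow[]>;
  describeStatus(workflowId: string): Promise<StatusName>;
  saveStatus(workflowId: string, status: StatusName): Promise<void>;
}

// Checks every "running" job and writes back any terminal status it finds.
// Returns the number of rows that were updated.
async function reconcileJobs(deps: ReconcileDeps): Promise<number> {
  let updated = 0;
  for (const job of await deps.listRunningJobs()) {
    const status = await deps.describeStatus(job.workflowId);
    if (isTerminal(status)) {
      await deps.saveStatus(job.workflowId, status);
      updated++;
    }
  }
  return updated;
}
```

Looking a job up by workflowId alone is assumed to inspect the latest run, which is why a continued-as-new job is treated as still in progress here.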

I would recommend not using termination and workflow timeouts for business-level features. For all other scenarios, you can write an interceptor that stores the status of a workflow in your DB.

Thanks!

Re: termination – I don’t use termination as part of the workflow, but am just trying to guard against edge cases, such as an engineer having to terminate a workflow via the Temporal UI because of some unforeseen bug. In this case I’d like my job status system to eventually reflect that it was terminated.

Re: workflow timeouts – Do you also suggest not using activity timeouts? For some types of jobs it feels strange to allow infinite time. One example would be bulk importing data from an external system. I’ve seen cases where a connection to the external system hung, etc. Ideally we’d implement the workflow so that a connection timeout makes this impossible and throws an appropriate error that fails the workflow, but a sensible activity timeout has previously saved me from a workflow that would otherwise have run forever.

For termination, you can also implement a process that updates the job status.

> Do you also suggest not using activity timeouts?

No. You must use activity timeouts, as activities use retries to deal with failures. Timeouts are the only way to detect process crashes and other edge cases. So use the shortest practical StartToClose timeout, and use heartbeating with a HeartbeatTimeout for activities that can take a long time.
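As a rough illustration of that advice, the option objects below pair a short StartToClose timeout for a quick call with a longer one plus a HeartbeatTimeout for a bulk import. This is a sketch under assumptions: the `ActivityTimeoutOptions` interface is declared locally to mirror the relevant fields of the options passed to `proxyActivities` in `@temporalio/workflow`, and the option names and durations are made-up examples, not values from this thread.

```typescript
// Local mirror of the timeout fields assumed to be accepted by proxyActivities;
// declared here so the snippet stands alone.
interface ActivityTimeoutOptions {
  startToCloseTimeout: string;
  heartbeatTimeout?: string;
}

// Quick call: a single attempt should finish fast, so a crashed worker is
// detected (and the activity retried) after 30 seconds at most.
const quickCallOptions: ActivityTimeoutOptions = {
  startToCloseTimeout: '30 seconds',
};

// Long-running bulk import: the attempt may legitimately take hours, so the
// heartbeat timeout is what detects a hung connection or a dead worker early.
const bulkImportOptions: ActivityTimeoutOptions = {
  startToCloseTimeout: '2 hours',
  heartbeatTimeout: '1 minute',
};

// In workflow code these would be passed along the lines of:
//   const { importRecords } = proxyActivities<typeof activities>(bulkImportOptions);
// where `importRecords` and `activities` are hypothetical names.
```

The heartbeat timeout only helps if the activity actually reports progress, for example by calling `Context.current().heartbeat()` from `@temporalio/activity` inside its import loop.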

Thanks again! Last question (I think) – which interceptor method should I implement to handle workflow cancellation due to activity timeout / no more activity retries?

When Temporal talks about workflow cancellation, it is always in the context of an external request to cancel the workflow.

When a workflow doesn’t handle activity failure (after retries are exhausted), it is considered failed and not canceled.

The specific interceptor method depends on the SDK.

Sorry, I’m specifically asking about the TypeScript SDK.

Is an interceptor able to handle failed workflows due to activity failure (after retries are exhausted)?

Any workflow failure is reported as an exception. So, the interceptor would need to handle the exception from the main workflow method. Implement WorkflowInboundCallsInterceptor.execute for this.

@maxim – Two more questions:

  1. In the typescript SDK, is it possible to get runId from the handle returned by client.start()? I see firstExecutionRunId is available but not runId. Perhaps my question is irrelevant and they will always be the same upon client.start()?

  2. ContinueAsNew seems to surface as an exception in WorkflowInboundCallsInterceptor.execute and I can’t seem to figure out how to distinguish it from a real failure by any of the properties in the exception. Any ideas?

Hi @Shawn_Jones

To your first question, please see this comment in code:

    // runId is not used in handles created with `start*` calls because these
    // handles should allow interacting with the workflow if it continues as new.

For 2, I think this will work:


      async execute(input: WorkflowExecuteInput, next) {
        logger.debug('Workflow started');
        const p = next(input).then(
            (res) => {
              logger.debug('Workflow completed');
              return res;
            },
            (error) => {
              if (typeof error === 'object' && error != null) {
                if (error.name === 'ContinueAsNew') {
                  logger.debug('Workflow continued as new');
                  throw error;
                }
              }
              logger.debug('Workflow failed');
              throw error;
            }
        );
        return p;
      },

Antonio

Thanks! Any idea why the temporal logger doesn’t log the error here?

import { log } from '@temporalio/workflow'

// within WorkflowStatusInterceptor.execute()
try {
  const ret = await next(input)
  return ret
} catch (e) {
  log.error('banana', e)
  log.error(e.name)
  log.error((e instanceof Error).toString())
  throw e // rethrow so ContinueAsNew and real failures still propagate
}

produces:

2024-08-06T20:43:23.561Z [ERROR] banana {
  sdkComponent: 'workflow',
  taskQueue: 'my-queue',
  namespace: 'default',
  workflowId: 'my-wf-id',
  runId: '1b4076e7-a5a7-4ac0-9e8c-1acedb9401a3',
  workflowType: 'my-wf-type',
  command: {
    workflowType: 'my-wf-type',
    arguments: [ [Object] ],
    headers: {},
    taskQueue: 'my-queue',
    memo: undefined,
    searchAttributes: undefined,
    workflowRunTimeout: undefined,
    workflowTaskTimeout: undefined,
    versioningIntent: 0
  }
}

2024-08-06T20:43:23.561Z [ERROR] ContinueAsNew {
  sdkComponent: 'workflow',
  taskQueue: 'my-queue',
  namespace: 'default',
  workflowId: 'my-wf-id',
  runId: '1b4076e7-a5a7-4ac0-9e8c-1acedb9401a3',
  workflowType: 'my-wf-type'
}

2024-08-06T20:43:23.561Z [ERROR] true {
  sdkComponent: 'workflow',
  taskQueue: 'my-queue',
  namespace: 'default',
  workflowId: 'my-wf-id',
  runId: '1b4076e7-a5a7-4ac0-9e8c-1acedb9401a3',
  workflowType: 'my-wf-type'
}

@Shawn_Jones

Sorry, I don’t get the question. Which error is not logged?

What do you expect to see in logs?

Thanks

I was expecting to see the ContinueAsNew error in the first log. Somewhere in that log should be the error name: ContinueAsNew along with whatever other properties a ContinueAsNew error has (since the second log, log.error(e.name) logs ContinueAsNew).

@antonio.perez does this make sense? I can file a bug in github if that’s a better place, but it seems like logging an error only logs the temporal context and not any properties of the error itself.

Hi @Shawn_Jones

Not sure if it should be printed; I would have to debug how the object is created.

> but it seems like logging an error only logs the temporal context and not any properties of the error itself.

Feel free to open an issue, please

Antonio

Thank you sir! Issue opened here: [Bug] log.error doesn't log the error · Issue #1491 · temporalio/sdk-typescript · GitHub
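One plausible explanation for the first log, not confirmed in this thread: if the logger treats its second argument as a plain attributes object and merges it into the log context, only the error's own enumerable properties survive. On a standard JavaScript `Error`, `message` and `stack` are non-enumerable and `name` normally lives on the prototype, while an extra public field such as `command` is enumerable, which would match the output above. The sketch below shows that behaviour with a stand-in class; `CommandError` and `errorAttrs` are hypothetical names.

```typescript
// Stand-in for an error that carries one extra public field, similar in shape
// to what the log output above suggests for ContinueAsNew.
class CommandError extends Error {
  constructor(public readonly command: { workflowType: string }) {
    super('Workflow continued as new');
  }
}

const err = new CommandError({ workflowType: 'my-wf-type' });

// Merging the error into an attributes object, roughly as a structured logger
// might do with its second argument (an assumption about the logger).
const merged = { sdkComponent: 'workflow', ...err };

// Only the enumerable own property `command` is copied; message and stack are lost.
console.log(Object.keys(merged)); // [ 'sdkComponent', 'command' ]

// Possible workaround: copy the interesting fields into plain attributes.
function errorAttrs(error: Error): Record<string, unknown> {
  return {
    errorName: error.name,
    errorMessage: error.message,
    errorStack: error.stack,
  };
}

console.log(errorAttrs(err).errorMessage); // Workflow continued as new
```

Under that assumption, a call such as `log.error('banana', errorAttrs(e))` would put the name, message, and stack into the log entry.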


@antonio.perez How can I handle workflows that are cancelled (not terminated) via the temporal UI in an interceptor? I assumed this implementation would mark it as failed but it doesn’t (it stays in RUNNING).

import {
  Next,
  WorkflowExecuteInput,
  WorkflowInboundCallsInterceptor,
} from '@temporalio/workflow'

export class WorkflowStatusInterceptor
  implements WorkflowInboundCallsInterceptor
{
  public async execute(
    input: WorkflowExecuteInput,
    next: Next<WorkflowInboundCallsInterceptor, 'execute'>
  ): Promise<unknown> {
    try {
      // update status to RUNNING
      const ret = await next(input)
      // update status to COMPLETED
      return ret
    } catch (error) {
      // ContinueAsNew surfaces here as an Error, so it is told apart by name
      if (error instanceof Error && error.name === 'ContinueAsNew') {
        // update status to CONTINUEDASNEW
        throw error
      }
      // update status to FAILED
      throw error
    }
  }
}

@maxim or any other temporal folks, do you have any recommendations for how to capture “cancelled” (not terminated) workflows via my interceptor?

Bump, hoping for insight from any of the Temporal folks :)

I’m not sure exactly what is wrong with your code. You absolutely can catch and handle the cancellation exception in an interceptor. If the workflow keeps running, it is usually because the code throws an unexpected exception type instead of a Temporal exception.
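To make the cancellation case explicit in the interceptor's catch block, the error could be classified before the status is written. The sketch below is a dependency-free approximation: `JobStatus` and `classifyWorkflowError` are hypothetical names, and the checks assume the SDK's error classes report the `name` values `ContinueAsNew`, `CancelledFailure`, `ActivityFailure`, and `ChildWorkflowFailure`. Inside a real interceptor, the `isCancellation` helper exported by `@temporalio/workflow` is the SDK's own check and would be the safer choice.

```typescript
// Job statuses the interceptor could write back; the names are illustrative.
type JobStatus = 'CONTINUED_AS_NEW' | 'CANCELLED' | 'FAILED';

// Structural view of a thrown value with an optional cause.
interface ErrorLike {
  name?: unknown;
  cause?: unknown;
}

function isErrorLike(value: unknown): value is ErrorLike {
  return typeof value === 'object' && value !== null;
}

// Maps whatever `next(input)` threw to a job status.
// The name-based checks stand in for the SDK's own helpers.
function classifyWorkflowError(error: unknown): JobStatus {
  if (!isErrorLike(error)) return 'FAILED';
  if (error.name === 'ContinueAsNew') return 'CONTINUED_AS_NEW';
  if (error.name === 'CancelledFailure') return 'CANCELLED';
  // A cancelled activity or child workflow wraps the cancellation as its cause.
  const wrapsCause =
    error.name === 'ActivityFailure' || error.name === 'ChildWorkflowFailure';
  if (wrapsCause && isErrorLike(error.cause) && error.cause.name === 'CancelledFailure') {
    return 'CANCELLED';
  }
  return 'FAILED';
}
```

In the catch block, the result of `classifyWorkflowError(error)` would pick the status to store before the error is rethrown unchanged. Rethrowing the original Temporal error, rather than a new wrapper, matters for the reason given above.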