Pending activities when continuing as new

My workflow receives multiple signals, and each signal runs a single activity. I have set the workflow to continue as new when wf.workflowInfo().continueAsNewSuggested is true. I then tried sending signals concurrently. This is my code:

try {
    let isClose = false;
    wf.setHandler(arrowBill, async (kind: string, bill: Bill) => {
      try {
        await syncArrowBill(kind, bill);
      } catch (error) {
        await wf.continueAsNew();
      }
    });

    wf.setHandler(billToSyncTransaction, async (billReference: string) => {
      try {
        await insertLedger(billReference);
      } catch (error) {
        await wf.continueAsNew();
      }
    });

    if (wf.workflowInfo().continueAsNewSuggested) {
      await wf.continueAsNew();
    }

    await wf.condition(() => isClose);
  } catch (error) {
    return { error: 'Failed to handle workflow' };
  }

But Temporal shows this warning:

[TMPRL1102] Workflow finished while a signal handler was still running. This may have interrupted work that the signal handler was doing. You can wait for all update and signal handlers to complete by using await workflow.condition(workflow.allHandlersFinished). Alternatively, if both you and the clients sending the update are okay with interrupting running handlers when the workflow finishes, then you can disable this warning by passing an option when setting the handler: workflow.setHandler(mySignal, mySignalHandler, {unfinishedPolicy: HandlerUnfinishedPolicy.ABANDON});. The following signals were unfinished (and warnings were not disabled for their handler): [{"name":"arrowBill","count":38},{"name":"billToSyncTransaction","count":89}]
and
WARN temporal_sdk_core::worker::activities: Activity not found on completion. This may happen if the activity has already been cancelled but completed anyway. task_token=TaskToken(CiQzNzU1Y2NkZi1kNDYzLTQ0NTgtYTg3ZC0zZmNiMmZjMTAwMDESDWJpbGwtc3luYy1kZXYaJDM1MzI0ZGU5LWQzMzctNDA3OS04NjM2LWY1NjdmMWQ1MjZiOSC2BigBMgMyMTdCDGluc2VydExlZGdlckoJCAMQ1a6qDhgB) details=Status { code: NotFound, message: "workflow execution already completed", details: b"\x08\x05\x12$workflow execution already completed\x1aB\n@type.googleapis.com/temporal.api.errordetails.v1.NotFoundFailure", metadata: MetadataMap { headers: {"content-type": "application/grpc"} }, source: None }

I think this happens because some activities are still pending when the workflow closes with continue-as-new. This is a capture of the workflow history before it closed.

Then I changed the following lines:

if (wf.workflowInfo().continueAsNewSuggested) {
      await wf.continueAsNew();
    }

to

if (wf.workflowInfo().continueAsNewSuggested) {
      await wf.condition(wf.allHandlersFinished);
      await wf.continueAsNew();
    }

Then I tried sending concurrent signals again, but the workflow does not continue as new until it can’t receive a signal anymore.

My questions are:

  1. What is the best way to handle concurrent signals at once?
  2. What is the best way to close a workflow with continue-as-new?

After several tries I decided to use child workflows. This is my code:

try {
    let isClose = false;
    wf.setHandler(arrowBill, async (kind: string, bill: Bill) => {
      try {
        await wf.startChild(childArrowBill, {
          workflowId: `childArrowBill-${kind}-${bill.reference}`,
          args: [kind, bill],
          parentClosePolicy: 'ABANDON',
        });
      } catch (error) {
        await wf.continueAsNew();
      }
    });

    wf.setHandler(billToSyncTransaction, async (billReference: string) => {
      try {
        await wf.startChild(childInsertLedger, {
          workflowId: `childInsertLedger-${billReference}`,
          args: [billReference],
          parentClosePolicy: 'ABANDON',
        });
      } catch (error) {
        await wf.continueAsNew();
      }
    });

    wf.setHandler(lengthHistory, () => {
      return wf.workflowInfo().historyLength;
    });

    wf.setHandler(conditionSignal, async (condition: boolean) => {
      if (condition) {
        await wf.continueAsNew();
      }
    });

    await wf.condition(() => isClose);
  } catch (error) {
    return { error: 'Failed to handle workflow' };
  }

The lengthHistory query is used to get the workflow's history length.
When the history length reaches a certain threshold, I send conditionSignal to close the workflow and restart it with continue-as-new.
This works better than before: in the workflow history I noticed that only the last child workflow did not finish, and the summary at the top shows zero pending activities. This is a capture of the workflow history before it closed with continue-as-new.

More questions:

  1. Is this the correct way to handle signals?
  2. My last child workflow was not processed. Is this expected Temporal behavior?
  3. I am thinking of building a batching system that runs after signals are received. Is this possible, and are there any references where I can learn more about it?

I’m not sure how child workflows help here and why you need an external signal to decide when to continue as new.

  1. What is the best way to handle concurrent signals at once?

Invoke their handling logic from the handler. I’m not sure I understand the question here.

  2. What is the best way to close a workflow with continue-as-new?

This looks correct to me:

if (wf.workflowInfo().continueAsNewSuggested) {
  await wf.condition(wf.allHandlersFinished);
  await wf.continueAsNew();
}

Can you explain what you mean by “until workflow can’t received a signal anymore”? Do you mean that the workflow is not calling continueAsNew while you continue to send the workflow signals?

If you’re sending the workflow signals faster than the workflow is processing those signals (by calling syncArrowBill and insertLedger in your example), the workflow won’t reach a state where it isn’t currently processing a signal, and allHandlersFinished will continue to return false.
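To illustrate, here is a toy model in plain TypeScript (not Temporal SDK code). It assumes each signal starts a handler at its arrival tick and that every handler runs for a fixed number of ticks; `runningHandlersAt`, `firstIdleTick`, and the tick values are made up for the illustration. "Idle" stands in for the moment allHandlersFinished would be true.

```typescript
// Toy model only: each signal starts a handler at its arrival tick,
// and that handler stays running for `handlerTicks` ticks.
function runningHandlersAt(tick: number, arrivals: number[], handlerTicks: number): number {
  return arrivals.filter((start) => start <= tick && tick < start + handlerTicks).length;
}

// First tick in [fromTick, horizon] with no handler running, or undefined if there is none.
function firstIdleTick(
  fromTick: number,
  arrivals: number[],
  handlerTicks: number,
  horizon: number,
): number | undefined {
  for (let tick = fromTick; tick <= horizon; tick++) {
    if (runningHandlersAt(tick, arrivals, handlerTicks) === 0) {
      return tick;
    }
  }
  return undefined;
}

// One signal per tick at ticks 0..9, each handler takes 3 ticks.
const steady = Array.from({ length: 10 }, (_, i) => i);

console.log(firstIdleTick(0, steady, 3, 9)); // undefined: never idle while signals keep arriving
console.log(firstIdleTick(0, steady, 3, 20)); // 12: idle only after the last handler has finished
```

In this model the workflow only becomes idle once signals stop and the last handler has run to completion, which matches what you are seeing.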

  1. Why do I need an external signal to decide when to continue as new?
    I use an external signal to decide when to continue as new because, with the code below, the workflow never continues as new:
    if (wf.workflowInfo().continueAsNewSuggested) {
      await wf.condition(wf.allHandlersFinished);
      await wf.continueAsNew();
    }

  2. How do child workflows help here?
    I don’t really understand this either. I just moved the execution into separate (child) workflows. Is there actually a difference between running one activity when a signal is received and starting a child workflow?

Correct, the workflow is not calling continueAsNew. I see. But if the rate of incoming signals slows down at some point, will allHandlersFinished return true?

In my case, what is a safe way to continueAsNew?

That depends on how fast you’re sending signals and how long your signal handlers are taking to run your activities to handle the signal. If no signals arrive for long enough so that all your current signal handlers finish, then I imagine at that point allHandlersFinished would return true because you wouldn’t have any signal handlers running at that point.

If you’re concerned that in production you might never reach the state of not having any signal handlers currently running (you might be continuously sending signals faster than your long-running signal handlers process them), one pattern I’ve seen is to start buffering new signals instead of processing them once you decide that it’s time to continue-as-new. Because the signal handler is then simply recording the signals you’ve received, it won’t take any time to execute, and allHandlersFinished will become true once all the previous signals have finished being processed. When you then call continueAsNew, you can pass the list of unprocessed signals to the new execution to be processed there. But that’s a complication you only need if you really do expect to be handling signals continuously in production.

Can you provide a code sample or a reference showing how to buffer signals and process them in the new workflow?

I think there was a code example somewhere but I don’t remember where now.

Something like

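let draining = false;
// Signals received after draining starts; handed to the next run.
const bufferedSignals: Array<{ type: 'arrowBill'; kind: string; bill: Bill }> = [];

wf.setHandler(arrowBill, async (kind: string, bill: Bill) => {
  if (draining) {
    // Only record the signal, so this handler finishes immediately.
    bufferedSignals.push({ type: 'arrowBill', kind, bill });
    return;
  }
  // ... process the signal as before
});

// ... later, once you decide it is time to continue as new:
draining = true;
await wf.condition(wf.allHandlersFinished);
await wf.continueAsNew(bufferedSignals);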

Once we’ve started draining, we record which signals we’ve received and then pass that list to the new workflow. The new workflow can then go through that list and process the signals as if it had received them itself.
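On the receiving side, a rough sketch of how the new run might work through that list could look like this. It is plain TypeScript with no SDK calls; `BufferedSignal`, `SignalProcessors`, and `processBuffered` are names I made up, the `Bill` shape is an assumption (only `reference` appears in your code), and covering `billToSyncTransaction` as well assumes you buffer both signal types.

```typescript
// Assumed shape: only `reference` is known from the code in this thread.
interface Bill {
  reference: string;
}

// One entry per signal recorded while draining (hypothetical type).
type BufferedSignal =
  | { type: 'arrowBill'; kind: string; bill: Bill }
  | { type: 'billToSyncTransaction'; billReference: string };

// The same logic the signal handlers normally run, e.g. wrappers around
// the syncArrowBill and insertLedger activities (hypothetical interface).
interface SignalProcessors {
  arrowBill(kind: string, bill: Bill): Promise<void>;
  billToSyncTransaction(billReference: string): Promise<void>;
}

// Process the buffered signals one at a time, in the order they were recorded.
async function processBuffered(
  buffered: BufferedSignal[],
  processors: SignalProcessors,
): Promise<void> {
  for (const signal of buffered) {
    switch (signal.type) {
      case 'arrowBill':
        await processors.arrowBill(signal.kind, signal.bill);
        break;
      case 'billToSyncTransaction':
        await processors.billToSyncTransaction(signal.billReference);
        break;
    }
  }
}
```

The idea would be that the workflow function takes the buffered list as its argument (defaulting to an empty list on the first run) and calls something like processBuffered near the start, with processors that invoke the same activities your handlers call. Processing them sequentially is just the simplest choice for the sketch; running them concurrently would also be possible.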

Thanks, I will try it.