My workflow receives multiple signals, and each signal runs a single activity. I have set the workflow to continue-as-new when wf.workflowInfo().continueAsNewSuggested is true. When I send signals concurrently, I get the messages below.
[TMPRL1102] Workflow finished while a signal handler was still running. This may have interrupted work that the signal handler was doing. You can wait for all update and signal handlers to complete by using await workflow.condition(workflow.allHandlersFinished). Alternatively, if both you and the clients sending the update are okay with interrupting running handlers when the workflow finishes, then you can disable this warning by passing an option when setting the handler: workflow.setHandler(mySignal, mySignalHandler, {unfinishedPolicy: HandlerUnfinishedPolicy.ABANDON});. The following signals were unfinished (and warnings were not disabled for their handler): [{"name":"arrowBill","count":38},{"name":"billToSyncTransaction","count":89}]
and
WARN temporal_sdk_core::worker::activities: Activity not found on completion. This may happen if the activity has already been cancelled but completed anyway. task_token=TaskToken(CiQzNzU1Y2NkZi1kNDYzLTQ0NTgtYTg3ZC0zZmNiMmZjMTAwMDESDWJpbGwtc3luYy1kZXYaJDM1MzI0ZGU5LWQzMzctNDA3OS04NjM2LWY1NjdmMWQ1MjZiOSC2BigBMgMyMTdCDGluc2VydExlZGdlckoJCAMQ1a6qDhgB) details=Status { code: NotFound, message: "workflow execution already completed", details: b"\x08\x05\x12$workflow execution already completed\x1aB\n@type.googleapis.com/temporal.api.errordetails.v1.NotFoundFailure", metadata: MetadataMap { headers: {"content-type": "application/grpc"} }, source: None }
I think this happens because some activities are still pending when the workflow is closed with continue-as-new. This is a capture of my workflow history before it closes.
There, the lengthHistory query is used to get the workflow's history length.
If the history length reaches a certain value, I send conditionSignal to close the workflow and restart it with continue-as-new.
This works better than before: in the workflow history I noticed that only the last child workflow is not finished, while the header information shows zero pending activities. This is a capture of the workflow history before it closes with continue-as-new.
Can you explain what you mean by “until workflow can’t received a signal anymore”? Do you mean that the workflow is not calling continueAsNew while you continue to send the workflow signals?
If you’re sending the workflow signals faster than the workflow is processing those signals (by calling syncArrowBill and insertLedger in your example), the workflow won’t reach a state where it isn’t currently processing a signal, and allHandlersFinished will continue to return false.
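To make that concrete, here is a rough model in plain TypeScript (no Temporal SDK involved). It only counts the moments at which no handler is in flight, which is roughly when allHandlersFinished would be true. The function name, the tick-based timing, and the numbers are all made up for illustration.

```typescript
// Hypothetical model: a signal arrives every `arrivalInterval` ticks and
// its handler stays in flight for `handlerDuration` ticks.
function countIdleTicks(
  arrivalInterval: number,
  handlerDuration: number,
  totalTicks: number
): number {
  let finishTimes: number[] = []; // tick at which each in-flight handler completes
  let idleTicks = 0;
  for (let tick = 0; tick < totalTicks; tick++) {
    // A new signal arrives and its handler starts running.
    if (tick % arrivalInterval === 0) {
      finishTimes.push(tick + handlerDuration);
    }
    // Keep only the handlers that are still running at this tick.
    finishTimes = finishTimes.filter((finish) => finish > tick);
    // No handler in flight: the "all handlers finished" condition would hold here.
    if (finishTimes.length === 0) {
      idleTicks++;
    }
  }
  return idleTicks;
}

console.log(countIdleTicks(2, 5, 100));  // → 0: signals arrive faster than handlers finish
console.log(countIdleTicks(10, 3, 100)); // → 70: handlers finish well before the next signal
```

In the first case there is never a gap in which the workflow could continue-as-new; in the second there are plenty.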
Why do you need an external signal to decide when to continue as new?
I use an external signal to decide when to continue-as-new because, with the code below, the workflow never continues as new:
if (wf.workflowInfo().continueAsNewSuggested) {
  await wf.condition(wf.allHandlersFinished);
  await wf.continueAsNew();
}
How do child workflows help here?
I don’t really understand this either; I just split the execution time across different (child) workflows. Actually, is there a difference between receiving a signal that runs one activity and starting a child workflow?
That depends on how fast you’re sending signals and how long your signal handlers are taking to run your activities to handle the signal. If no signals arrive for long enough so that all your current signal handlers finish, then I imagine at that point allHandlersFinished would return true because you wouldn’t have any signal handlers running at that point.
If you’re concerned that in production you might never reach the state of not having any signal handlers currently running (you might be continuously sending signals faster than your long-running signal handlers process them), one pattern I’ve seen is at the point that you decide that it’s time to continue-as-new, you start buffering new signals instead of processing them. Then because in your signal handler you’re simply recording the signals you’ve received the signal handler won’t take any time to execute, and you’ll get allHandlersFinished true once all the previous signals have finished being processed. Then when you call continueAsNew you can pass the list of unprocessed signals to the new execution to be processed there. But that’s a complication you only need if you really do expect to be handling signals continuously in production.
I think there was a code example somewhere but I don’t remember where now.
Something like
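let draining = false;
// Signals received while draining; passed to the next run via continueAsNew.
const bufferedSignals: Array<{ type: string; kind: string; bill: Bill }> = [];

wf.setHandler(arrowBill, async (kind: string, bill: Bill) => {
  if (draining) {
    // Only record the signal, so this handler finishes immediately.
    bufferedSignals.push({ type: "arrowBill", kind, bill });
    return;
  }
  // ... process the signal as before
});

// ... later, once you decide it is time to continue-as-new:
draining = true;
await wf.condition(wf.allHandlersFinished);
await wf.continueAsNew(bufferedSignals);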
Once we’ve started draining, we record which signals we’ve received and then pass that list to the new workflow. The new workflow can then go through that list and process the signals as if it had received them itself.
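For the receiving side, a minimal sketch of how the new execution might walk through the carried-over list, again in plain TypeScript without the SDK. The Bill shape, the payload of billToSyncTransaction, and the replayBuffered helper are assumptions for illustration; in a real workflow the handlers would be the same async functions you register with wf.setHandler, and you would await them.

```typescript
// Hypothetical shapes: the real Bill type and signal payloads may differ.
interface Bill {
  id: string;
}

type BufferedSignal =
  | { type: "arrowBill"; kind: string; bill: Bill }
  | { type: "billToSyncTransaction"; bill: Bill };

interface Handlers {
  arrowBill: (kind: string, bill: Bill) => void;
  billToSyncTransaction: (bill: Bill) => void;
}

// Route each carried-over signal to the handler that would have run
// if the signal had arrived in this execution, preserving arrival order.
function replayBuffered(signals: BufferedSignal[], handlers: Handlers): void {
  for (const signal of signals) {
    switch (signal.type) {
      case "arrowBill":
        handlers.arrowBill(signal.kind, signal.bill);
        break;
      case "billToSyncTransaction":
        handlers.billToSyncTransaction(signal.bill);
        break;
    }
  }
}

// Usage: record what gets processed so the order is visible.
const processedLog: string[] = [];
replayBuffered(
  [
    { type: "arrowBill", kind: "create", bill: { id: "b1" } },
    { type: "billToSyncTransaction", bill: { id: "b2" } },
  ],
  {
    arrowBill: (kind, bill) => processedLog.push(`arrowBill:${kind}:${bill.id}`),
    billToSyncTransaction: (bill) => processedLog.push(`billToSyncTransaction:${bill.id}`),
  }
);
console.log(processedLog);
```

You would call something like this at the start of the new run, before or alongside handling newly arriving signals, depending on whether ordering between old and new signals matters to you.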