I have a workflow where I send a request to an endpoint and returns a job Id. Once the job completes it will send a webhook message to my endpoint and this is where I want to send a signal back to my scheduled workflow along with the jobId to continue with the workflow.
Workflow.ts
// Define a signal to indicate that a job scrape has been completed
const signalJobCompleted = defineSignal<[string]>('signalJobCompleted');
export async function jobManagerWorkflow() : Promise<void> {
const activeJobs : Jobs = {
'1' : false,
'2' : false,
'3' : false,
'4' : false,
'5' : false,
}; // Tracks job completion status
let jobsInitialized = false;
// When a job is completed, update the status in the activeJobs map
setHandler(
signalJobCompleted, (jobId
: string) = > {
console.log(jobId);
if (jobsInitialized && activeJobs[jobId] == = false) {
activeJobs[jobId] = true;
}
});
for (const req of requests) {
const jobs = await createJob(req);
activeJobs[Object.keys(scrapeJob)[0]] = Object.values(scrapeJob)[0];
}
// All jobs have been initialized
jobsInitialized = true;
// Wait for all jobs to complete
while (Object.values(activeJobs).some((status) = > !status)) {
for (const[jobId, isComplete] of Object.entries(activeJobs)) {
if (isComplete) {
await processJobA(jobId); // Process completed jobs
delete activeJobs[jobId]; // Clean up processed jobs
}
}
}
}
Then in my client I specifically defined workflowId
async function run() {
const client = new Client({
connection: await Connection.connect(),
});
// https://typescript.temporal.io/api/classes/client.ScheduleClient#create
const schedule = await client.schedule.create({
action: {
type: 'startWorkflow',
workflowType: jobManagerWorkflow,
args: [],
taskQueue: 'job-queue',
workflowId: 'unique-workflow-id',
},
scheduleId: 'job-schedules',
policies: {
catchupWindow: '1 day',
overlap: ScheduleOverlapPolicy.ALLOW_ALL,
},
spec: {
intervals: [{ every: '10s' }],
// or periodic calendar times:
// calendars: [
// {
// comment: 'every thursday at 8:30pm',
// dayOfWeek: 'THURSDAY',
// hour: 17,
// minute: 37,
// },
// ],
// or a single datetime:
// calendars: [
// {
// comment: '1/1/23 at 9am',
// year: 2023,
// month: 1,
// dayOfMonth: 1,
// hour: 9,
// },
// ],
},
});
await client.connection.close();
}
run().catch((err) => {
console.error(err);
process.exit(1);
});
Finally here is my webhook endpoint
import { Connection, Client } from "@temporalio/client";
import { NextRequest, NextResponse } from "next/server";
export async function POST(request: NextRequest) {
const { id, type } = await request.json();
// Establish a connection to Temporal
const connection = await Connection.connect();
const client = new Client();
// Retrieve the workflow handle using the known workflow ID
const workflow = client.workflow.getHandle('unique-workflow-id');
console.log(workflow);
// Send the signal to the workflow
await workflow.signal("signalJobCompleted", id);
return NextResponse.json({ status: 200, message: "OK" });
}
I can see in my console.log that the workflowId is correct but I get
{
client: WorkflowClient {
connection: Connection {
options: [Object],
client: [ServiceClientImpl],
workflowService: [Service],
operatorService: [Service],
healthService: [Service],
callContextStorage: [AsyncLocalStorage],
apiKeyFnRef: {}
},
loadedDataConverter: {
payloadConverter: [DefaultPayloadConverter],
failureConverter: [DefaultFailureConverter],
payloadCodecs: []
},
options: {
dataConverter: [Object],
identity: '123@private-Laptop.local',
namespace: 'default',
interceptors: [],
queryRejectCondition: 'NONE',
connection: [Connection],
loadedDataConverter: [Object]
}
},
workflowId: 'unique-workflow-id',
result: [AsyncFunction: result],
terminate: [AsyncFunction: terminate],
cancel: [AsyncFunction: cancel],
describe: [AsyncFunction: describe],
fetchHistory: [AsyncFunction: fetchHistory],
startUpdate: [AsyncFunction: startUpdate],
executeUpdate: [AsyncFunction: executeUpdate],
getUpdateHandle: [Function: getUpdateHandle],
signal: [AsyncFunction: signal],
query: [AsyncFunction: query]
}
тип Error [WorkflowNotFoundError]: workflow not found for ID: unique-workflow-id
at async POST (apps/web/app/api/webhook/route.ts:15:2)
13 | console.log(workflow);
14 | // Send the signal to the workflow
> 15 | await workflow.signal("signalJobCompleted", id);
| ^
16 |
17 | return NextResponse.json({ status: 200, message: "OK" });
18 | } {
workflowId: 'unique-workflow-id',
runId: undefined
}