Understanding worker.run(), worker.getState(), and worker.shutdown() better (Express app and k8s probes)

I'm not sure I fully understand the behaviour of a Worker instance, worker.run(), etc.

We deploy the worker as an app via k8s. Currently the application just initializes the worker, and we also run an Express server to expose a /health endpoint.

The Worker has been working fine and polling etc., so no problems there. However, it was only when I wanted the /health endpoint to call worker.getState() (for k8s liveness/readiness probes) that I realised that none of the Express server logs would show after the worker was initialized and .run() was called (locally or deployed). Also, killing the app locally didn't necessarily kill the worker process, and I would have to force a SIGTERM.

Initial Setup Basic Example:

export const initExpressApp = () => {
  const app = express();

  app.get('/', (req, res) => {
    // this is where I wanted to add worker.getState()
    res.send({ message: 'Worker app running' });
  });

  return app;
};

export const initServer = (
  app: Express,
  config: {
    healthCheckHost: string;
    healthCheckPort: number;
  },
) => {
  return app.listen(config.healthCheckPort, config.healthCheckHost, () => {
    // Never see this log!!
    console.log(`[ ready ] http://${config.healthCheckHost}:${config.healthCheckPort}`);
  });
};

const initApp = async () => {
  // get config here etc

  // init the Worker, call worker.run() etc.
  // these seem to be the only logs that show - what is going on here?
  // I have some ideas but not sure how to confirm...
  await initWorker();

  // not sure this stuff ever gets done; the `/health` endpoint was flaky and didn't return `getState()` when I added it until I killed the app, at which point it returned `SHUTDOWN` (the worker was still running and able to return the state...)
  const app = initExpressApp();
  const server = initServer(app, config);

  process.on('SIGTERM', () => {
    // doesn't seem to kill the worker
    logger.error('SIGTERM signal received: stopping workflow-workers');
    server.close();
    process.exit(0);
  });
};

// init App
initApp().catch((error) => {
  console.log('Error initializing app', error);
  process.exit(1);
});

When I finally moved the code around a bit and called worker.run() after the Express server setup, it sort of worked as expected (although worker.shutdown() on app shutdown still doesn't seem to work as I would expect).

Changed Setup Basic Example:

// is this correct?
type HealthyState = Extract<State, 'INITIALIZED' | 'RUNNING'>;

function isHealthyState(state: State): state is HealthyState {
  return state === 'INITIALIZED' || state === 'RUNNING';
}

export const initExpressApp = (worker: Worker, logger: Logger) => {
  const app = express();

  app.get('/health', (req, res) => {
    // is worker.getState() the right thing to even use?
    logger.debug('/health - Worker State', { workerState: worker.getState() });
    if (isHealthyState(worker.getState())) {
      res.send({ message: 'Worker app running' });
    } else {
      res.status(500).send({ message: 'Worker app running, but Temporal Worker is not healthy' });
    }
  });

  return app;
};

export const initServer = (
  app: Express,
  config: {
    healthCheckHost: string;
    healthCheckPort: number;
  },
) => {
  return app.listen(config.healthCheckPort, config.healthCheckHost, () => {
    // see this log now!
    console.log(`[ ready ] http://${config.healthCheckHost}:${config.healthCheckPort}`);
  });
};

// init Temporal Workers (worker, workflows, activities)
const initApp = async () => {
  // get config etc

  // this creates an instance of Worker and then returns it
  const worker = await initOnboardingWorker();

  // can see these logs and hit the `/health` endpoint
  const app = initExpressApp(worker, logger);
  const server = initServer(app, config);

  // Step 2: Start accepting tasks on the queue
  logger.info('Starting Worker...');
  await worker.run();

  process.on('SIGTERM', () => {
    logger.error('SIGTERM signal received: stopping workflow-workers');
    server.close();
    worker.shutdown(); // this still doesn't seem to work...
    process.exit(0);
  });
};

// init App
initApp().catch((error) => {
  console.log('Error initializing app', error);
  process.exit(1);
});

This currently seems to work in terms of probing, and the pod seems to be OK in a dev deploy. That still doesn't mean it's the right thing to use…

So my questions are:

  • where is my gap in understanding what worker.run() is doing, and why didn't my initial setup work? Once you call worker.run(), what happens? Why did I not see any of the Express server logs? Is worker.run() blocking? I don't quite get it.
  • is worker.getState() the right thing to use for a /health endpoint in k8s probes?
  • does worker.shutdown() actually stop the worker or just stop polling? What is a graceful way to shut down the app otherwise?

Any other tips would be welcome. I may have some misunderstanding here that I would like to uncover. Thanks.

This looks a lot like our load test worker setup: sdk-typescript/packages/test/src/load/worker.ts at 840fc3649690c6fc799e1f31ec7a8a31b334d30c · temporalio/sdk-typescript · GitHub

Your intuition is correct here; the worker is considered healthy as long as its state is not FAILED. There are also a few shutdown stages that I would consider healthy (DRAINING, and SHUTDOWN - the latter can be ignored if you exit the program once the worker is shut down).
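
A /health handler based on that could look something like this (just a sketch; isWorkerHealthy is my own helper name, and the response shape is up to you):

import express from 'express';
import type { Worker } from '@temporalio/worker';

// Healthy unless the worker has failed. SHUTDOWN could also be treated as
// acceptable if the process exits once the worker has shut down.
const isWorkerHealthy = (worker: Worker): boolean => worker.getState() !== 'FAILED';

export const initExpressApp = (worker: Worker) => {
  const app = express();

  app.get('/health', (_req, res) => {
    const workerState = worker.getState();
    if (isWorkerHealthy(worker)) {
      res.send({ message: 'Worker app running', workerState });
    } else {
      res.status(500).send({ message: 'Temporal Worker is not healthy', workerState });
    }
  });

  return app;
};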

worker.run() blocks until the worker is shut down, and by default any of the configured shutdown signals on the Runtime object will cause your worker to shut down (you can set this to an empty array via Runtime.install() to handle shutdown yourself).
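
For example (a sketch; double-check the RuntimeOptions for your SDK version):

import { Runtime } from '@temporalio/worker';

// Must be installed before the first Worker is created. With shutdownSignals
// set to an empty array the SDK will not react to SIGINT/SIGTERM itself, so
// you then have to call worker.shutdown() from your own signal handlers.
Runtime.install({
  shutdownSignals: [],
});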

Shutdown stops polling and waits for running activities and workflows to drain for a configurable duration; you can also force shut down the worker with shutdownForceTime.
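
Roughly like this when creating the worker (a sketch; the task queue, workflows path and activities module below are placeholders, and shutdownForceTime may not be available on older SDK versions):

import { Worker } from '@temporalio/worker';
import * as activities from './activities'; // placeholder: your activities module

const createWorker = async () =>
  Worker.create({
    taskQueue: 'onboarding', // placeholder: your task queue
    workflowsPath: require.resolve('./workflows'), // placeholder: your workflows
    activities,
    // How long shutdown waits for in-flight activities and workflow tasks to
    // drain before they are cancelled.
    shutdownGraceTime: '30s',
    // Hard deadline after which the worker shuts down even if tasks have not
    // drained yet.
    shutdownForceTime: '1m',
  });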

In your example, process.on will never be called.
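
That is because everything after await worker.run() only executes once the worker has already shut down. If you do want your own handler, register it before awaiting run(); inside initApp from your changed example that would look roughly like this (a sketch reusing your existing worker, logger and server variables):

// Register your own handler *before* awaiting run(); anything written after
// the awaited run() call only runs once the worker has already shut down.
process.on('SIGTERM', () => {
  logger.info('SIGTERM received: requesting worker shutdown');
  worker.shutdown(); // asks the worker to start draining; run() resolves when done
});

await worker.run(); // blocks here until shutdown completes
server.close();     // safe to stop the health-check server now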

Thanks @bergundy

I'll stick with the failed states for now and can revisit if I need to.

Makes sense now, knowing that worker.run() is blocking.

However, I don't see the default shutdown signals being acknowledged by the worker. I moved the process.on below the worker.run() call and added handlers for both SIGTERM and SIGINT to get it to work as expected…

However, I don't see the default shutdown signals being acknowledged by the worker.

I'm not sure what you mean; maybe you have some code that clears our signal handlers.
You should be able to remove the process.on call, and when the signal is received the worker will start draining.
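
Something along these lines, reusing the helpers from your changed example (a sketch, not a definitive setup - adjust for your own config and logger):

const initApp = async () => {
  const worker = await initOnboardingWorker();

  // Start the health-check server before run() so the k8s probes can reach it
  // while the worker is polling.
  const app = initExpressApp(worker, logger);
  const server = initServer(app, config);

  // No process.on needed: by default the Runtime listens for the shutdown
  // signals and starts draining the worker; run() resolves once shutdown
  // has completed.
  try {
    await worker.run();
  } finally {
    server.close();
  }
};

initApp().catch((error) => {
  console.log('Error initializing app', error);
  process.exit(1);
});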