Postgres Table as Event Source

Still new to Temporal. I have a table in my postgresql db that represents ‘tickets’.

model Ticket{
  id              String          @id @default(uuid())
  ticket_date    DateTime
  status          TICKET_STATUS   @default(PENDING)
  @@map("ticket")
}

I want to use temporal to process pending tickets. I have already built the activities and a workflow that can process a single ticket, updating it’s status on the db to ‘DONE’.

What I can’t quite get my head around is how to handle the event sourcing. Do I

a) Have a non-temporal process that reads ‘Pending’ tickets, updates the status to PROCESSING and uses a client to trigger an instance of the processing workflow

b) have a ‘runs forever’ temporal workflow (maybe on a seperate queue) that reads ‘Pending’ tickets, updates the status to PROCESSING and uses a client to trigger an instance of the processing workflow.

c) make use of signals somehow… I don’t really get this one

d) some other thing.

I guess the design constraint is that whatever is dealing initially with the tickets table, needs to be singular and always on. If there are two, then they will get all tangled and I might need to mess about with mutexes which I want to avoid. if there is none, then my tickets won’t get processed. If it’s a workflow, how do I ‘trigger’ the one instance? can I just configure temporal to say … always have one of these running?

Why do you want to keep the database table? It is not needed for individual ticket processing. You might need it for indexing across tickets and for keeping historical data.

The main design principle with Tempoal is that DB is updated by workflows, but almost never read by them to decide what to do next. So, workflows are source of truth and DB is a mirror used for querying only.

What business problem are you trying to solve by “event sourcing”?

hi @maxim , great callout. The table is part of an existing application so this is something of a retrofit. I get that Ideally, my application would submit a workflow instance directly instead of inserting a row into a table but that obviously requires the temporal instance to be available to the rest of my application. For the moment, this is not the case and I am self hosting temporal locally and my web-application is hosted in the cloud on netlify… so they can’t talk. the Database is also cloud hosted so it’s accessible to both.

I’ve thought about it some more and I think I will have an ‘event source’ workflow which just runs a while true loop. checks the table, updates to processing and kicks off a processing workflow for each ticket. I’m intending to use WorkflowIdReusePolicy.REJECT_DUPLICATE to prevent multiple instances of the event source workflow

Consider using an activity to poll the table: What is the best practice for a polling activity? - #2 by maxim

1 Like

@maxim thans for your help. FWIW, this is what i’m going with (typescript)

export async function pollForTicket(): Promise<void> {
  const { getPendingTickets } = proxyActivities<typeof activities>({
    retry: {
      initialInterval: '1 minute',
      maximumInterval: '1 minute',
      backoffCoefficient: 1,  //spin forever (maximumAttempts not set + no exponential backoff)
      nonRetryableErrorTypes: ['InvalidConfigurationError']
    },
    startToCloseTimeout: '1 minute'
  });

  const { markTicketAsProcessing } = proxyActivities<typeof activities>({
    retry: {
      initialInterval: '1 second',
      maximumInterval: '10 minutes',
      backoffCoefficient: 2,
      maximumAttempts: 500,
      nonRetryableErrorTypes: ['InvalidConfigurationError']
    },
    startToCloseTimeout: '1 minute'
  });

  while (true) {
    // because of the retry config, this will retry forever.  it throws if it finds no tickets.
    const pendingTickets = await getPendingTickets();

    for (const ticket of pendingTickets) {
      let markTicketAsProcessingResult: boolean;
      try {
        markTicketAsProcessingResult = await markTicketAsProcessing(ticket);
      } catch (markProcessingErr) {
        continue;
      }
      if (!markTicketAsProcessingResult) {
        continue;
      }

      const handle = await startChild(ticketProcessingActivity, {
        args: [ticket],
        taskQueue: taskQueueName,
        workflowId: `process-ticket-${ticket.id}`
      });

      log.info(
        `Started Workflow ${handle.workflowId} with RunID ${handle.firstExecutionRunId} for ticket ${ticket.id}`
      );
    }

    log.info('Waiting 1 minute before looking for new tickets...');
    await sleep('1 minute');
  }
}
2 Likes