Non-determinism and Delayed Workflows

I have a workflow that behaves as follows (basically a queue-like system)

A message gets sent to my API from user A. After receiving it, I start a timeout of N seconds to wait for any new messages from user A. If user A sends another request within the timeout period, the timeout resets and the new request is added to that user's message buffer. When the timeout finally expires, whether because only one message was sent and the initial timeout ran out, or because X messages were sent and the timeout after the Xth message ran out, the entire block is passed to an activity that processes those messages.

Once an activity has started processing a block, any messages user A sends between the start and end of that activity are held until the block currently being processed has completed.
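The flow described above is essentially a debounce/batch loop. As a stdlib-only sketch (no Temporal APIs here; `process` is just a stand-in for starting the activity, and the sub-second timeouts are only for demonstration), the reset-on-new-message behavior looks like:

```python
import asyncio

async def debounce_batcher(queue: asyncio.Queue, timeout: float, process) -> None:
    """Buffer messages; each new message resets the timeout.
    When the timeout expires, hand the whole batch to `process`."""
    batch = []
    while True:
        try:
            msg = await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            if batch:
                await process(batch)  # timeout expired: process the block
                batch = []
            continue
        if msg is None:  # sentinel: shut down, flushing what's left
            break
        batch.append(msg)  # new message arrived in time: batch it
    if batch:
        await process(batch)

async def demo() -> list:
    q: asyncio.Queue = asyncio.Queue()
    batches = []

    async def process(b):
        batches.append(list(b))

    task = asyncio.create_task(debounce_batcher(q, 0.05, process))
    await q.put("hi")
    await q.put("one more thing")  # arrives before the timeout: same batch
    await asyncio.sleep(0.2)       # timeout expires: first batch processed
    await q.put("new batch")
    await q.put(None)
    await task
    return batches

batches = asyncio.run(demo())
```

In the Temporal version the same shape falls out of `workflow.wait_condition` plus a timer, with signals feeding the queue.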

I need to add a quiet-hours feature to this workflow: even if N messages are sent within a 3-hour window every day (in local time), the messages are held until the end of that window. As I understand it, this would impact determinism if the logic is built into the workflow. That would mean either moving the logic into an activity, signaling every workflow each day to pause during the window, or stopping the workers for those 3 hours every day.

Stopping the workers doesn't really work for my use case, though: when a worker restarts it just replays through the events, which would delay processing the messages into groups just as it would if the worker were live. After the 3-hour window I want all queued messages to be processed at once.

Since I am essentially changing the default timeout depending on the time of day, is the only way to do this to make the timeout a signal, and then signal the new timeout to every workflow during that downtime each day?

Basically even if N messages are sent within a time period of 3 hours every day (in local time), the messages are held up until the end of that time period. As I understand it, this would impact determinism if the logic is built into the workflow.

This should not cause any determinism problems if you use approved APIs for time and timers.

What is the maximum number of messages per workflow you expect in this 3-hour period?

The max number of messages per workflow would be fewer than 100 in this 3-hour period.

My queue timeout defaults to 15 minutes for processing groups of messages. I want the timeout to change during a specific window of CT time every day: inside that window, the timeout goes from 15 minutes to however long remains until the window ends. So if 10 messages were sent with random gaps between them, the queue would be drained after the interval between the last message received and the end of the CT window.
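The "drain at the end of the window" arithmetic can be sketched with the stdlib alone; the 06:00 end time below is a made-up placeholder, not the actual window:

```python
from datetime import datetime, time, timedelta
from zoneinfo import ZoneInfo

CT = ZoneInfo("America/Chicago")
QUIET_END = time(6, 0)  # hypothetical end of the daily quiet window (CT)

def seconds_until_quiet_end(now_ct: datetime, quiet_end: time = QUIET_END) -> float:
    """Seconds from now_ct until the next occurrence of quiet_end in CT."""
    end = datetime.combine(now_ct.date(), quiet_end, tzinfo=CT)
    if end <= now_ct:
        # Already past today's end: the next occurrence is tomorrow
        end = datetime.combine(now_ct.date() + timedelta(days=1), quiet_end, tzinfo=CT)
    return (end - now_ct).total_seconds()

# e.g. at 04:30 CT with a 06:00 end, the queue drains in 90 minutes
wait = seconds_until_quiet_end(datetime(2024, 1, 15, 4, 30, tzinfo=CT))
```

Inside a workflow you would feed `workflow.now().astimezone(CT)` into this instead of a wall-clock `datetime`, and sleep for the result.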

If I use workflow.now(), can I check the current workflow time on each signal and, if it falls inside the CT window, change the timeout? Or would it be better to signal the timeout change from outside the workflow?

import asyncio
from datetime import timedelta, datetime, time
from typing import List
from temporalio import workflow
from temporalio.common import RetryPolicy
from zoneinfo import ZoneInfo

from messaging.message_processing_activities import send_messages_to_api
from messaging.input_models import UserMessage, UserMessageBatch

ESTIMATED_MESSAGE_PROCESSING_TIME = 500  # seconds
TIMEOUT_FOR_NEW_MESSAGES = 900  # seconds
CT_TIMEZONE = ZoneInfo("America/Chicago")
DAILY_TIMEOUT_START = time(3, 0)  # start of the daily quiet window (CT)
DAILY_TIMEOUT_END = time(23, 0)  # end of the daily quiet window (CT)

@workflow.defn
class MessageProcessingWorkflow:
    def __init__(self) -> None:
        self._pending_user_messages: asyncio.Queue[UserMessage] = asyncio.Queue()
        self.user_id = None
        self.trainer_id = None
        self._exit = False
        self._last_message_time = None

    @workflow.run
    async def run(self) -> None:
        while True:
            # Wait for messages or exit signal
            await workflow.wait_condition(
                lambda: not self._pending_user_messages.empty() or self._exit,
                timeout=None,
            )

            # Check if the workflow should exit
            if self._exit:
                return

            # Get the current workflow time in CT
            current_time_ct = workflow.now().astimezone(CT_TIMEZONE)

            if DAILY_TIMEOUT_START <= current_time_ct.time() < DAILY_TIMEOUT_END:
                # Inside the quiet window: sleep until the window ends today.
                # (The original branch always added a day here, which would
                # sleep until tomorrow's window end instead of today's.)
                next_processing_time = datetime.combine(
                    current_time_ct.date(), DAILY_TIMEOUT_END, tzinfo=CT_TIMEZONE
                )
                remaining_time = next_processing_time - current_time_ct
                await asyncio.sleep(remaining_time.total_seconds())
            else:
                # Wait for X seconds after the last message
                if self._last_message_time is not None:
                    remaining_time = timedelta(seconds=TIMEOUT_FOR_NEW_MESSAGES) - (workflow.now() - self._last_message_time)
                    if remaining_time > timedelta(seconds=0):
                        await asyncio.sleep(remaining_time.total_seconds())

            # Drain and process queue
            messages: List[str] = []
            timestamp: int = int(workflow.now().timestamp())
            while not self._pending_user_messages.empty():
                batched_message = self._pending_user_messages.get_nowait()
                messages.append(batched_message.message)
                timestamp = batched_message.timestamp

            # Submit messages to activity if any
            if messages:
                user_message_batch = UserMessageBatch(
                    user_id=self.user_id,
                    trainer_id=self.trainer_id,
                    messages=messages,
                    timestamp=timestamp
                )
                await workflow.execute_activity(
                    send_messages_to_api,
                    user_message_batch,
                    schedule_to_close_timeout=timedelta(seconds=ESTIMATED_MESSAGE_PROCESSING_TIME),
                    retry_policy=RetryPolicy(),
                )

So something like this ^ is what I was thinking of versus adding a signal

    @workflow.signal
    def update_timeout(self, timeout: int) -> None:
        self._timeout_for_new_messages = timeout

Your code looks reasonable. I would not rely on an external timer for this.

Okay, so since I am using workflow.now() and basing decisions on the time it returns, the workflow remains deterministic and I can implement time-based logic this way?

Yes. Non-determinism is usually caused by external APIs like time and random. That's why Temporal provides the workflow-safe equivalents.


Is there a workflow-safe equivalent for random? For example, if I wanted TIMEOUT_FOR_NEW_MESSAGES to be a random value within a range, would that be possible?

There is: workflow.random() returns a deterministic random.Random instance.


Ah, missed this. So if I use workflow.random() to get a deterministic random.Random instance and produce a random integer, what is the relationship across workflow runs? Will it produce the same random integer per unique workflow?

For example, in the workflow above, if I set TIMEOUT_FOR_NEW_MESSAGES = workflow.random().randint(15, 20), the value will be the same through the lifecycle of the workflow, right?

It is basically seeded with the workflow run identifier. So it will be the same for the life of the workflow run (i.e. deterministic and safe for replay), but likely different for different runs.
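workflow.random() is only callable inside a workflow, but plain random.Random with an explicit seed shows the same replay property described above (the run IDs here are made up for illustration):

```python
import random

# Temporal seeds workflow.random() from the workflow run ID, so a replay
# of the same run reproduces the same sequence.
replay_1 = random.Random("run-abc123")
replay_2 = random.Random("run-abc123")   # same run replayed -> same seed
timeout_1 = replay_1.randint(15, 20)
timeout_2 = replay_2.randint(15, 20)
# timeout_1 == timeout_2: identical on every replay of this run

other_run = random.Random("run-def456")  # a different run, its own sequence
other_timeout = other_run.randint(15, 20)
```

One caveat: each call to randint advances the sequence, so if you want a single stable timeout, capture the value once (e.g. at workflow start) rather than calling workflow.random().randint(...) every time you need it.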