Newbie: Is there a Native Temporal Solution for Enforced Hierarchy Task Polling?

I’m a Temporal newbie who suspects I might be reinventing the wheel with an overly complex solution.
Question: Is there a Temporal-native way to enforce a hierarchy of multiple task queues: forced, high, medium, and low?
As an example, ALL ‘forced’ priority tasks must complete before ANY ‘high’ priority tasks can be executed, and so on with medium and low priority.
Is there a more Temporal-friendly way to accomplish the below with less code? Any push in the right direction is appreciated, as I’m still deep in the Temporal learning curve: it still feels heavy and complex.

import asyncio
import logging
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.exceptions import WorkflowAlreadyStartedError
from app.debug.process_prompt.llm_workflow import PromptWorkflow, PriorityLevel, TaskQueueManager
from app.debug.process_prompt.llm_activities import generate_llm_completion
from app.debug.process_prompt.prompt_config import PROMPTS


workflow_logger = logging.getLogger("PromptWorkflow")
logging.basicConfig(level=logging.INFO)

async def execute_workflow(client, workflow_id, prompts, priority):
    """
    Execute a single workflow with retry logic for already started workflows.
    
    This function attempts to execute a workflow and handles the case where
    the workflow is already running by terminating it and retrying.
    """
    while True:
        try:
            # Execute the workflow using Temporal
            result = await client.execute_workflow(
                "PromptWorkflow",
                {"prompts": prompts},
                id=workflow_id,
                task_queue=TaskQueueManager.route_task(PriorityLevel(priority)),
            )
            workflow_logger.info(f"Workflow {workflow_id} results: {result}")
            return result
        
        except WorkflowAlreadyStartedError:
            # If the workflow is already running, terminate it and retry
            workflow_logger.info(f"Workflow {workflow_id} already exists. Terminating and restarting.")
            await client.get_workflow_handle(workflow_id).terminate()
            await asyncio.sleep(2)  # Wait before retrying
        except Exception as e:
            workflow_logger.error(f"Unexpected error in workflow {workflow_id}: {e}")
            return None

async def execute_priority_group(client, prompts, priority):
    """
    Execute a group of prompts with the same priority concurrently.
    
    This function uses asyncio.TaskGroup to run multiple workflows
    in parallel for prompts with the same priority level.
    """
    async with asyncio.TaskGroup() as tg:
        for i, prompt_set in enumerate(prompts):
            workflow_id = f"{priority}-workflow-id-{i}"
            tg.create_task(execute_workflow(client, workflow_id, [prompt_set], priority))

async def execute_prompts_by_priority(client, prompts):
    """
    Execute prompts grouped by their priority levels.
    
    This function organizes prompts into priority groups and executes them
    sequentially, ensuring higher priority prompts are processed first.
    """
    # Group prompts by priority
    priority_groups = {level: [] for level in PriorityLevel}
    for prompt in prompts:
        priority_str = prompt.get("priority", "medium").lower()
        priority = PriorityLevel(priority_str)
        priority_groups[priority].append(prompt)
    
    # Execute prompts for each priority level
    for priority in PriorityLevel:
        if priority_groups[priority]:
            workflow_logger.info(f"Executing {priority.value} priority prompts")
            await execute_priority_group(client, priority_groups[priority], priority.value)
        
        # Wait for all workflows in this priority group to complete before moving to the next
        executions = client.list_workflows(query=f"WorkflowType='PromptWorkflow' AND TaskQueue='{TaskQueueManager.route_task(priority)}'")
        async for execution in executions:
            try:
                # list_workflows yields execution descriptions, so fetch a handle to await the result
                await client.get_workflow_handle(execution.id, run_id=execution.run_id).result()
            except Exception as e:
                workflow_logger.error(f"Error in workflow {execution.id}: {e}")

# TODO Check if Retry Logic Is Working/Implemented
# TODO Is LLM Logic Returning everything properly
# TODO ask community about Python Debugger and other issues
# TODO Attempt to add a ArangoDB Insert in workflow.
# TODO With LLM Tool Calls, try Recursion with a Temporal child workflow over an activity

async def main():
    """
    Main function to set up and run the workflow execution process.
    
    This function initializes the Temporal client, sets up workers for each
    priority level, and executes the prompts using the priority-based system.
    """
    # Connect to the Temporal server
    client = await Client.connect("localhost:7233")
    
    # Create workers for each priority level
    workers = {
        priority: Worker(
            client,
            task_queue=TaskQueueManager.route_task(priority),
            workflows=[PromptWorkflow],
            activities=[generate_llm_completion]
        ) for priority in PriorityLevel
    }

    async with asyncio.TaskGroup() as tg:
        # Start all workers concurrently
        for worker in workers.values():
            tg.create_task(worker.run())

        workflow_logger.info("All task workers started")

        try:
            # Execute prompts based on their priority
            await execute_prompts_by_priority(client, PROMPTS)
        except* Exception as eg:
            # except* binds an exception group (and may not name ExceptionGroup itself), so log each wrapped error
            for exc in eg.exceptions:
                workflow_logger.error(f"Unexpected error: {exc}")
        finally:
            # Shut the workers down so the TaskGroup, and therefore main(), can exit
            for worker in workers.values():
                await worker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

PROMPTS for reference:
[ {
        "service": "anthropic",
        "prompt": "What is the weather in London? Provide an answer only in JSON format",
        "tools": ["get_weather"],
        "model": "claude-3-5-sonnet-20240620",
        "json_mode": True,
        "priority": "high"
    },
    {
        "service": "groq",
        "prompt": "What is the weather in Dublin? Provide an answer only in JSON format",
        "tools": ["all_tools"],
        "model": "mixtral-8x7b-32768",
        "json_mode": True,
        "priority": "high"
    },
...
]

When you say “all” higher priority tasks must complete before lower priority tasks, do you mean globally, across all client requests? It looks like right now you’re ordering the tasks in the client, which is fine, but it means the tasks are ordered within a particular client request, not globally.

I also notice that you’re running workflows in different Temporal task queues, do you have a reason to do that?

Thanks for your response :) In my case, I would prefer that priority be global, as other long-running tasks will be occurring, mostly long-running machine learning tasks.
The code I provided is an isolated example so I can learn how to integrate/use Temporal.

As far as code/design reasons, I’m still struggling a bit with Temporal. I’m trying to determine whether a recursive LLM call (for multiple tool calls and error correction) should be a child workflow, an activity, or something else.

Upshot: If a temporal native hierarchical task queue and/or a temporal-friendly workflow design pattern (for a large number of async LLM calls with LLM call recursion) exists, I’d be much appreciative of a similar preexisting temporal example and/or pseudo-code.

Hopefully, I’ll get over the learning curve soon.

OK, if you want multiple workflows to coordinate, one way to do that is to add a coordination workflow that the other workflows send signals to. Another way is to give them an external system to coordinate with.

The potential issue with a coordination workflow is that a single workflow instance can only process a few events per second. So you’d want to be confident that the coordination workflow wouldn’t become a bottleneck.
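
To make the coordination idea concrete, here is a minimal in-process sketch of the bookkeeping such a coordinator might do. It uses plain asyncio rather than the Temporal SDK: in a real coordination workflow the acquire and release requests would arrive as signals, and the throughput limit mentioned above would apply. The class name `PriorityGate`, the priority ordering, and the demo task names are assumptions for illustration only.

```python
import asyncio

# Assumed ordering, highest priority first (taken from the question above).
PRIORITIES = ["forced", "high", "medium", "low"]


class PriorityGate:
    """Admits a task only when no higher-priority task is pending or running."""

    def __init__(self) -> None:
        self._active = {p: 0 for p in PRIORITIES}
        self._changed = asyncio.Condition()

    def _higher_idle(self, priority: str) -> bool:
        rank = PRIORITIES.index(priority)
        return all(self._active[p] == 0 for p in PRIORITIES[:rank])

    async def acquire(self, priority: str) -> None:
        async with self._changed:
            # Register first, so lower priorities see this task as pending.
            self._active[priority] += 1
            await self._changed.wait_for(lambda: self._higher_idle(priority))

    async def release(self, priority: str) -> None:
        async with self._changed:
            self._active[priority] -= 1
            self._changed.notify_all()


async def demo() -> list[str]:
    gate = PriorityGate()
    order: list[str] = []

    async def task(name: str, priority: str) -> None:
        await gate.acquire(priority)
        try:
            await asyncio.sleep(0.01)  # stand-in for the real work
            order.append(name)
        finally:
            await gate.release(priority)

    # Hold a "forced" slot while the other tasks queue up behind it.
    await gate.acquire("forced")
    pending = [
        asyncio.create_task(task("low-1", "low")),
        asyncio.create_task(task("high-1", "high")),
    ]
    await asyncio.sleep(0.01)  # let both tasks register with the gate
    await gate.release("forced")
    await asyncio.gather(*pending)
    return order
```

In this sketch the low priority task was submitted first but still finishes after the high priority one. Note that the gate does not preempt: a lower priority task that has already been admitted keeps running when a higher priority task arrives.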

Using an external system, you could for example use a database to store how many executions are currently being performed at each priority level. When a workflow started an execution, it could call an activity to increment the count in the database for that priority level, and then call an activity to decrement the count when it finished the execution.

When a workflow was ready to start an execution, it could poll the database to wait until the higher priority counts were zero. You could have the activity that reads the counts from the database fail if they weren’t zero. That way you could use a retry policy to automatically retry until the counts did reach zero.
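
A rough sketch of that fail-and-retry check, in plain Python so it runs on its own. The dictionary `counts` stands in for the database table, and `wait_until_idle` stands in for what an activity retry policy would do automatically; all of the names here are hypothetical.

```python
import time

# Assumed ordering, highest priority first.
PRIORITIES = ["forced", "high", "medium", "low"]


class HigherPriorityBusy(Exception):
    """Raised so that the caller's retry mechanism tries again later."""


def check_higher_priorities_idle(counts: dict[str, int], priority: str) -> None:
    """Fail unless every higher priority level has zero running executions."""
    rank = PRIORITIES.index(priority)
    busy = {p: counts[p] for p in PRIORITIES[:rank] if counts[p] > 0}
    if busy:
        raise HigherPriorityBusy(f"{priority} must wait for {busy}")


def wait_until_idle(counts: dict[str, int], priority: str,
                    attempts: int = 5, delay: float = 0.01) -> int:
    """Stand-in for a retry policy: repeat the check with a fixed delay.

    Returns the number of attempts used and re-raises after the last one.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            check_higher_priorities_idle(counts, priority)
            return attempt
        except HigherPriorityBusy:
            if attempt == attempts:
                raise
            time.sleep(delay)
```

With Temporal, only the body of `check_higher_priorities_idle` would live in the activity; the retry loop would be replaced by the retry policy configured when the workflow calls the activity.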

Note this doesn’t absolutely prevent a lower priority task from starting after a higher priority task starts. It would be possible for a workflow to read a count of zero, and then for another workflow to start a higher priority task, and then for the first workflow to start executing the lower priority task. This design would be more for limiting the runs of lower priority tasks. We’d need a different design if you need to always prevent the execution of a lower priority task while a higher priority task is running.

Making LLM calls would be done in an activity.

Whether to perform the workflow logic in one workflow or to divide it up into child workflows is largely a matter of convenience. For example, workflows are listed in the Temporal UI, so you might want to use child workflows if being able to see their execution independently might be useful, or you might prefer not to use child workflows if they might simply clutter the UI.

What is LLM call recursion?

Thank you for the detailed reply.
LLM recursion is an LLM call function that keeps calling itself until all potential tools have been called and the answer is deemed correct.

Instead of a single question and answer, the function may call the LLM 5-10 times if necessary, with a cutoff enforced on the number of iterations.

Well, what makes a workflow useful is when you have logic that needs to be executed reliably, even when servers crash or you have other kinds of failures.

You could have a single activity that does the entire recursive llm call function.

Then, if the activity worker crashed in the middle, Temporal would run the entire activity again.

Or, if perhaps the calls are expensive enough or time consuming enough that you wouldn’t want to repeat calls that had successfully completed, it might be useful to have a workflow execute each call in a separate activity so that they can be retried individually.
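
As a sketch of the second option, the loop below treats each LLM request as one separate step with an iteration cutoff. It is plain Python: `call_llm` is a hypothetical callable representing a single LLM request, and the response shape (`done`, `content`) is an assumption. In a workflow, each `call_llm` invocation would be one activity execution, so a call that already completed would not be repeated when a later one fails.

```python
from typing import Callable


def run_llm_loop(call_llm: Callable[[list[dict]], dict], prompt: str,
                 max_iterations: int = 10) -> dict:
    """Call the model until it reports a final answer or the cutoff is hit."""
    messages = [{"role": "user", "content": prompt}]
    for iteration in range(1, max_iterations + 1):
        reply = call_llm(messages)  # one step; one activity in a workflow
        messages.append({"role": "assistant", "content": reply["content"]})
        if reply["done"]:
            return {"answer": reply["content"], "iterations": iteration,
                    "complete": True}
    # Cutoff reached: return the last reply and flag it as incomplete.
    return {"answer": messages[-1]["content"], "iterations": max_iterations,
            "complete": False}


# Usage with a fake model that needs three rounds before it is done.
calls = []


def fake_llm(messages: list[dict]) -> dict:
    calls.append(len(messages))
    return {"done": len(calls) >= 3, "content": f"step {len(calls)}"}


result = run_llm_loop(fake_llm, "What is the weather in London?", max_iterations=5)
```

Written as a loop rather than as true recursion, the cutoff is simply the loop bound, and the caller can tell from `complete` whether the answer was accepted or the limit was reached.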

Temporal is a solution for reliable execution in the presence of failures. So one question is, is this a problem you have, that needs solving? What failures, if any, do you anticipate, that you want to ensure are handled?

Your original question was whether Temporal had a native or built-in solution for task prioritization. I think the answer to that is not really? In Temporal, you implement the logic of your system in workflows, and that could include prioritization. A workflow could decide to execute a task, or to delay executing a task, based on its priority and what other tasks are running.

Oh, I just realized that a better way to use the database would be for the workflow to make a single database query that in a transaction would increment the count of the current priority executions only if no higher priority executions are executing. This would avoid a lower priority execution starting in between the two database calls I had mentioned before.
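
One way such a combined check-and-increment might look, sketched with SQLite so it runs on its own. The table name `priority_counts`, its columns, and the function names are assumptions; a production database would need the equivalent statement for its own SQL dialect and isolation level.

```python
import sqlite3

# Assumed ordering, highest priority first; level 0 is the highest.
PRIORITIES = ["forced", "high", "medium", "low"]


def init_db(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE priority_counts "
        "(level INTEGER PRIMARY KEY, name TEXT, n_running INTEGER)"
    )
    conn.executemany(
        "INSERT INTO priority_counts VALUES (?, ?, 0)",
        list(enumerate(PRIORITIES)),
    )
    conn.commit()


def try_acquire(conn: sqlite3.Connection, priority: str) -> bool:
    """Increment the count for `priority` only if all higher levels are idle.

    The check and the increment are a single UPDATE statement, so another
    writer cannot start a higher priority execution in between them.
    """
    level = PRIORITIES.index(priority)
    with conn:  # commits on success, rolls back on error
        cur = conn.execute(
            """
            UPDATE priority_counts SET n_running = n_running + 1
            WHERE level = ?
              AND NOT EXISTS (
                  SELECT 1 FROM priority_counts AS h
                  WHERE h.level < ? AND h.n_running > 0
              )
            """,
            (level, level),
        )
    return cur.rowcount == 1


def release(conn: sqlite3.Connection, priority: str) -> None:
    """Decrement the count when an execution finishes (never below zero)."""
    with conn:
        conn.execute(
            "UPDATE priority_counts SET n_running = n_running - 1 "
            "WHERE level = ? AND n_running > 0",
            (PRIORITIES.index(priority),),
        )
```

A caller would retry `try_acquire` until it returns `True`, run the task, and then call `release` in a `finally` block so the count is decremented even when the task raises.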

Why might you want to use Temporal for this? Why not just put the logic in a regular service without using Temporal?

Consider that if the system crashes, we could have recorded that we started a task and then not decremented the count again. This would block lower priority tasks from running until we went in and cleaned it up.

Using Temporal, execution would be reliable, so even if a workflow or activity worker crashed, the workflow execution would resume so that it would finish the task and decrement the count.

But where Temporal really becomes useful is with long-running workflows. If all I needed to do was prioritize tasks, and after a crash I could simply have the client retry as I would for a typical service, I would at least look to see whether there was a simpler solution.

Thank you for your replies :) I’ve come to the conclusion that I need to finish a (working) solution first, and then decide how/if Temporal can/should be used to support it with respect to failures. When it’s done in a week or two, I’ll reassess. The process doesn’t run for a year, but it will run overnight or for several days. I think I’ll have better-refined questions when it’s complete.