I’m a Temporal Newbie that suspects that I might be reinventing the wheel with an overly complex solution.
Question: Is there a Temporal native way to enforce a hierarchy of multiple TaskQueues: Forced, high, medium, and low?
As example, ALL ‘forced’ priority tasks must complete before ANY ‘high’ priority tasks can be executed…and so on with medium and low priority.
Is there a more Temporal-friendly way to accomplish the below with less code? Any push in the right direction is appreciated as I’m still deep in the learning curve of Temporal: It still feels heavy and complex.
import asyncio
import logging
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.exceptions import WorkflowAlreadyStartedError
from app.debug.process_prompt.llm_workflow import PromptWorkflow, PriorityLevel, TaskQueueManager
from app.debug.process_prompt.llm_activities import generate_llm_completion
from app.debug.process_prompt.prompt_config import PROMPTS
workflow_logger = logging.getLogger("PromptWorkflow")
logging.basicConfig(level=logging.INFO)
async def execute_workflow(client, workflow_id, prompts, priority):
"""
Execute a single workflow with retry logic for already started workflows.
This function attempts to execute a workflow and handles the case where
the workflow is already running by terminating it and retrying.
"""
while True:
try:
# Execute the workflow using Temporal
result = await client.execute_workflow(
"PromptWorkflow",
{"prompts": prompts},
id=workflow_id,
task_queue=TaskQueueManager.route_task(PriorityLevel(priority)),
)
workflow_logger.info(f"Workflow {workflow_id} results: {result}")
return result
except WorkflowAlreadyStartedError:
# If the workflow is already running, terminate it and retry
workflow_logger.info(f"Workflow {workflow_id} already exists. Cancelling and restarting.")
await client.get_workflow_handle(workflow_id).terminate()
await asyncio.sleep(2) # Wait before retrying
except Exception as e:
workflow_logger.error(f"Unexpected error in workflow {workflow_id}: {e}")
return None
async def execute_priority_group(client, prompts, priority):
"""
Execute a group of prompts with the same priority concurrently.
This function uses asyncio.TaskGroup to run multiple workflows
in parallel for prompts with the same priority level.
"""
async with asyncio.TaskGroup() as tg:
for i, prompt_set in enumerate(prompts):
workflow_id = f"{priority}-workflow-id-{i}"
tg.create_task(execute_workflow(client, workflow_id, [prompt_set], priority))
async def execute_prompts_by_priority(client, prompts):
"""
Execute prompts grouped by their priority levels.
This function organizes prompts into priority groups and executes them
sequentially, ensuring higher priority prompts are processed first.
"""
# Group prompts by priority
priority_groups = {level: [] for level in PriorityLevel}
for prompt in prompts:
priority_str = prompt.get("priority", "medium").lower()
priority = PriorityLevel(priority_str)
priority_groups[priority].append(prompt)
# Execute prompts for each priority level
for priority in PriorityLevel:
if priority_groups[priority]:
workflow_logger.info(f"Executing {priority.value} priority prompts")
await execute_priority_group(client, priority_groups[priority], priority.value)
# Wait for all workflows in this priority group to complete before moving to the next
handles = client.list_workflows(query=f"WorkflowType='PromptWorkflow' AND TaskQueue='{TaskQueueManager.route_task(priority)}'")
async for handle in handles:
try:
await handle.result()
except Exception as e:
workflow_logger.error(f"Error in workflow {handle.id}: {e}")
# TODO Check if Retry Logic Is Working/Implemented
# TODO Is LLM Logic Returning everything properly
# TODO ask community about Python Debugger and other issues
# TODO Attempt to add a ArangoDB Insert in workflow.
# TODO With LLM Tool Calls, try Recursion with a Temporal child workflow over an activity
async def main():
"""
Main function to set up and run the workflow execution process.
This function initializes the Temporal client, sets up workers for each
priority level, and executes the prompts using the priority-based system.
"""
# Connect to the Temporal server
client = await Client.connect("localhost:7233")
# Create workers for each priority level
workers = {
priority: Worker(
client,
task_queue=TaskQueueManager.route_task(priority),
workflows=[PromptWorkflow],
activities=[generate_llm_completion]
) for priority in PriorityLevel
}
async with asyncio.TaskGroup() as tg:
# Start all workers concurrently
for worker in workers.values():
tg.create_task(worker.run())
workflow_logger.info("All task workers started")
try:
# Execute prompts based on their priority
await execute_prompts_by_priority(client, PROMPTS)
except* asyncio.ExceptionGroup as eg:
# Handle exceptions from the TaskGroup
for exc in eg.exceptions:
workflow_logger.error(f"Error in TaskGroup: {exc}")
except* Exception as e:
# Handle any unexpected errors
workflow_logger.error(f"Unexpected error: {e}")
if __name__ == "__main__":
asyncio.run(main())
PROMPTS for reference:
[ {
"service": "anthropic",
"prompt": "What is the weather in London? Provide an answer only in JSON format",
"tools": ["get_weather"],
"model": "claude-3-5-sonnet-20240620",
"json_mode": True,
"priority": "high"
},
{
"service": "groq",
"prompt": "What is the weather in Dublin? Provide an answer only in JSON format",
"tools": ["all_tools"],
"model": "mixtral-8x7b-32768",
"json_mode": True,
"priority": "high"
},
...
]