Hi, I’m trying to get a simple example to work with Temporal. I keep getting the error: TypeError: Activity <unknown> missing attributes, was it decorated with @activity.defn?
Can anyone help a newbie out? I can bribe with good karma.
run_worker.py
import os
import sys

# Add the directory containing 'activities' and 'workflows' to the Python path.
# This has to run before the project imports below so they can be resolved.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

import asyncio
import logging

from temporalio import worker
from temporalio.client import Client

from activities.openai_activities import OpenAIActivities
from workflows.openai_workflow import OpenAIWorkflow

# Set up logging
logger = logging.getLogger("worker")
logging.basicConfig(level=logging.INFO)


async def main():
    """Connect to the Temporal server at localhost:7233 and run a worker.

    The worker polls the "openai-task-queue" task queue. Any exception raised
    while connecting or running is logged with its traceback and not
    re-raised.
    """
    logger.info("Starting the Temporal worker...")
    try:
        client = await Client.connect("localhost:7233")
        logger.info("Connected to Temporal server.")

        # NOTE: this list holds an OpenAIActivities *instance*, not a callable
        # decorated with @activity.defn. This is the line behind the
        # "Activity <unknown> missing attributes" TypeError described above;
        # it is left as posted so the question stays reproducible.
        activities = [OpenAIActivities()]
        workflows = [OpenAIWorkflow]

        worker_instance = worker.Worker(
            client,
            task_queue="openai-task-queue",
            workflows=workflows,
            activities=activities,
        )
        logger.info("Worker instance created, now running...")
        await worker_instance.run()
        # Only reached after run() has returned.
        logger.info("Worker is running.")
    except Exception as e:
        logger.error(f"Error in running worker: {e}", exc_info=True)


if __name__ == "__main__":
    asyncio.run(main())
openai_activities.py
import logging
import os

import openai
from dotenv import load_dotenv
from temporalio import activity

# Load variables from a local .env file before the API key is read.
load_dotenv()
openai.api_key = os.environ.get('OPENAI_API_KEY')

# Set up logging
logger = logging.getLogger("OpenAIActivities")
logging.basicConfig(level=logging.INFO)


class OpenAIActivities:
    """Temporal activities that call the OpenAI chat completions API."""

    @activity.defn(name="call_openai_api")  # Ensure the activity is decorated and named
    async def call_openai_api(self, prompt: str) -> str:
        """Send ``prompt`` to the "gpt-4o" chat model and return the reply text.

        Args:
            prompt: The user message to send.

        Returns:
            The first choice's message content with surrounding whitespace
            removed; an empty string if the message carries no content.

        Raises:
            Exception: Whatever the OpenAI client raises is logged and
                re-raised unchanged.
        """
        logger.info(f"Calling OpenAI API with prompt: {prompt}")
        try:
            client = openai.AsyncOpenAI(api_key=os.getenv('OPENAI_API_KEY'))
            response = await client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": prompt},
                ],
                max_tokens=100,
            )
            # The client returns a response object, so the reply is read via
            # attributes rather than dict-style subscripting. The content
            # field is optional, hence the fallback to an empty string.
            content = response.choices[0].message.content
            result = (content or "").strip()
            logger.info(f"Received response: {result}")
            return result
        except Exception as e:
            logger.error(f"Error calling OpenAI API: {e}", exc_info=True)
            raise
main.py
import asyncio
import logging

from temporalio import client

# Set up logging
logger = logging.getLogger("main")
logging.basicConfig(level=logging.INFO)


async def main():
    """Start the OpenAI workflow with three sample prompts and log its result.

    Connects to the Temporal server at localhost:7233. A failure while
    executing the workflow is logged with its traceback and not re-raised.
    """
    # connect() is a classmethod of Client; the temporalio.client module has
    # no top-level connect().
    temporal_client = await client.Client.connect("localhost:7233")
    prompts = [
        "What is the capital of Spain?",
        "What is the most common color of an apple?",
        "What is the meaning of life?",
    ]
    try:
        # NOTE: the workflow is requested by the name "OpenAIWorkflow.run",
        # while the worker reports "OpenAIWorkflow" as the registered name
        # (see the NotFoundError quoted further down). The name is left as
        # posted so that follow-up question stays reproducible.
        result = await temporal_client.execute_workflow(
            "OpenAIWorkflow.run",
            args=[prompts],
            id="openai-workflow-id",
            task_queue="openai-task-queue",
        )
        logger.info(f"Workflow results: {result}")
    except Exception as e:
        logger.error(f"Failed to execute workflow: {e}", exc_info=True)


if __name__ == "__main__":
    asyncio.run(main())
Activities are the decorated callables, not the containing class. This should be activities = [OpenAIActivities().call_openai_api] (or you can instantiate the OpenAIActivities earlier instead of inline).
Thank you for your response. With that change to the code, run_worker executes without error.
When running main.py I get the below:
{
"message": "Workflow class OpenAIWorkflow.run is not registered on this worker, available workflows: OpenAIWorkflow",
"stackTrace": " File \"/Users/robert/Desktop/dev/tutorials/temporal/data_pipeline/.venv/lib/python3.11/site-packages/temporalio/worker/_workflow.py\", line 232, in _handle_activation\n workflow = self._create_workflow_instance(act, start_job)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n File \"/Users/robert/Desktop/dev/tutorials/temporal/data_pipeline/.venv/lib/python3.11/site-packages/temporalio/worker/_workflow.py\", line 345, in _create_workflow_instance\n raise temporalio.exceptions.ApplicationError(\n",
"applicationFailureInfo": {
"type": "NotFoundError"
}
}
Any help getting this simple example to work (so I can learn for my own project) is greatly appreciated.
For posterity, this simple OpenAI async example completes successfully.
Hopefully, the Temporal learning curve will flatten…at least a little.
Thanks for pointing me in the right direction.
run_worker.py
import sys
import os

# Add the directory containing 'activities' and 'workflows' to the Python path.
# This has to run before the project imports below so they can be resolved.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

import asyncio
import logging

from temporalio import worker, activity
from temporalio.client import Client

from activities.openai_activities import OpenAIActivities
from workflows.openai_workflow import OpenAIWorkflow

# Set up logging
logger = logging.getLogger("worker")
logging.basicConfig(level=logging.INFO)


async def main():
    """Connect to the Temporal server at localhost:7233 and run a worker.

    The worker polls the "openai-task-queue" task queue and serves
    OpenAIWorkflow together with the call_openai_api activity. Any exception
    raised while connecting or running is logged with its traceback and not
    re-raised.
    """
    logger.info("Starting the Temporal worker...")
    try:
        client = await Client.connect("localhost:7233")
        logger.info("Connected to Temporal server.")

        # Register the decorated bound method, not the containing instance.
        openai_activities = OpenAIActivities()
        activities = [openai_activities.call_openai_api]
        workflows = [OpenAIWorkflow]

        worker_instance = worker.Worker(
            client,
            task_queue="openai-task-queue",
            workflows=workflows,
            activities=activities,
        )
        # This code runs outside any activity, so it logs through the module
        # logger rather than activity.logger.
        logger.info("Worker instance created, now running...")
        await worker_instance.run()
        # Only reached after run() has returned, i.e. once the worker stopped.
        logger.info("Worker has shut down.")
    except Exception as e:
        logger.error(f"Error in running worker: {e}", exc_info=True)


if __name__ == "__main__":
    asyncio.run(main())
main.py
import asyncio
import logging

from temporalio import activity
from temporalio.client import Client

from workflows.openai_workflow import OpenAIWorkflow

# Set up logging
logger = logging.getLogger("main")
logging.basicConfig(level=logging.INFO)


async def main():
    """Run OpenAIWorkflow with three sample prompts and log the results.

    Connects to the Temporal server at localhost:7233. A failure while
    executing the workflow is logged with its traceback and not re-raised.
    """
    temporal_client = await Client.connect("localhost:7233")
    prompts = [
        "What is the capital of Spain?",
        "What is the most common color of an apple?",
        "What is the meaning of life?",
    ]
    try:
        # Passing the run method (instead of a name string) lets the SDK
        # resolve the workflow's registered name.
        result = await temporal_client.execute_workflow(
            OpenAIWorkflow.run,
            prompts,
            id="openai-workflow-id",
            task_queue="openai-task-queue",
        )
        # This code runs outside any activity, so it logs through the module
        # logger rather than activity.logger.
        logger.info(f"Workflow results: {result}")
    except Exception as e:
        logger.error(f"Failed to execute workflow: {e}", exc_info=True)


if __name__ == "__main__":
    asyncio.run(main())
openai_activities.py
import asyncio
import logging
import os

import openai
from dotenv import load_dotenv
from temporalio import activity

# Load variables from a local .env file before the API key is read.
load_dotenv()
openai.api_key = os.environ.get('OPENAI_API_KEY')

# Set up logging
logger = logging.getLogger("OpenAIActivities")
logging.basicConfig(level=logging.INFO)


class OpenAIActivities:
    """Temporal activities that call the OpenAI chat completions API."""

    @activity.defn(name="call_openai_api")  # Ensure the activity is decorated and named
    async def call_openai_api(self, prompt: str) -> str:
        """Send ``prompt`` to the "gpt-4o" chat model and return the reply text.

        Args:
            prompt: The user message to send.

        Returns:
            The first choice's message content with surrounding whitespace
            removed; an empty string if the message carries no content.

        Raises:
            Exception: Whatever the OpenAI client raises is logged and
                re-raised unchanged.
        """
        logger.info(f"Calling OpenAI API with prompt: {prompt}")
        try:
            client = openai.AsyncOpenAI(api_key=os.getenv('OPENAI_API_KEY'))
            response = await client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": prompt},
                ],
                max_tokens=100,
            )
            # The content field is optional in the response, hence the
            # fallback to an empty string before stripping.
            content = response.choices[0].message.content
            result = (content or "").strip()
            logger.info(f"Received response: {result}")
            return result
        except Exception as e:
            logger.error(f"Error calling OpenAI API: {e}", exc_info=True)
            raise


def test_openai_activities():
    """Manually call the activity method once, outside Temporal, and print the outcome."""

    async def run_test():
        activity_instance = OpenAIActivities()
        prompt = "What is the capital of France?"
        try:
            result = await activity_instance.call_openai_api(prompt)
            print(f"Test result: {result}")
        except Exception as e:
            print(f"Test failed with error: {e}")

    asyncio.run(run_test())


# Run the manual test when this file is executed directly.
if __name__ == "__main__":
    test_openai_activities()
openai_workflow.py
from datetime import timedelta
import logging
import asyncio

from temporalio import workflow


@workflow.defn(name="OpenAIWorkflow")
class OpenAIWorkflow:
    """Workflow that sends every prompt to the call_openai_api activity."""

    @workflow.run
    async def run(self, prompts: list[str]) -> list[str]:
        """Start one activity per prompt and wait for all of them.

        Args:
            prompts: The prompts to send, one activity execution each.

        Returns:
            The activity results, in the same order as ``prompts``.
        """
        futures = []
        for prompt in prompts:
            # workflow.logger is the SDK's workflow-aware logger; it is used
            # here instead of a plain stdlib logger.
            workflow.logger.info(f"Processing prompt: {prompt}")
            futures.append(
                workflow.execute_activity(
                    "call_openai_api",  # Use the activity by name
                    args=[prompt],
                    start_to_close_timeout=timedelta(seconds=30),
                )
            )
        # All activities are started before any is awaited, so they can run
        # concurrently; gather() keeps the results in input order.
        results = await asyncio.gather(*futures)
        return results