Newbie: Temporal says activity decorator is missing, when not missing (activity is async)

Hi I’m trying to get a simple example to work within Temporal. I keep getting the error:
TypeError: Activity <unknown> missing attributes, was it decorated with @activity.def

Can anyone help a Newbie out–I can bribe with good Karma?

run_worker.py

import os

# Add the directory containing 'activities' and 'workflows' to the Python path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

import asyncio
import logging
from temporalio import worker
from temporalio.client import Client
from activities.openai_activities import OpenAIActivities
from workflows.openai_workflow import OpenAIWorkflow

# Set up logging
logger = logging.getLogger("worker")
logging.basicConfig(level=logging.INFO)

async def main():
    logger.info("Starting the Temporal worker...")
    try:
        client = await Client.connect("localhost:7233")
        logger.info("Connected to Temporal server.")

        activities = [OpenAIActivities()]
        workflows = [OpenAIWorkflow]

        worker_instance = worker.Worker(
            client,
            task_queue="openai-task-queue",
            workflows=workflows,
            activities=activities
        )

        logger.info("Worker instance created, now running...")
        await worker_instance.run()
        logger.info("Worker is running.")
    except Exception as e:
        logger.error(f"Error in running worker: {e}", exc_info=True)

if __name__ == "__main__":
    asyncio.run(main())

openai_activities.py

import logging
from temporalio import activity
import os
from dotenv import load_dotenv


load_dotenv()
openai.api_key = os.environ.get('OPENAI_API_KEY')

# Set up logging
logger = logging.getLogger("OpenAIActivities")
logging.basicConfig(level=logging.INFO)


class OpenAIActivities:
    @activity.defn(name="call_openai_api")  # Ensure the activity is decorated and named
    async def call_openai_api(self, prompt: str) -> str:
        logger.info(f"Calling OpenAI API with prompt: {prompt}")
        try:
            client = openai.AsyncOpenAI(api_key=os.getenv('OPENAI_API_KEY'))
            response = await client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=100
            )
            result = response['choices'][0]['message']['content'].strip()
            logger.info(f"Received response: {result}")
            return result
        except Exception as e:
            logger.error(f"Error calling OpenAI API: {e}", exc_info=True)
            raise

main.py

import asyncio
import logging
from temporalio import client

# Set up logging
logger = logging.getLogger("main")
logging.basicConfig(level=logging.INFO)

async def main():
    temporal_client = await client.connect("localhost:7233")

    prompts = ["What is the capital of Spain?", "What is the most common color of an apple?", "What is the meaning of life?"]
    try:
        result = await temporal_client.execute_workflow(
            workflow_type="OpenAIWorkflow.run",
            workflow_args=[prompts],
            id="openai-workflow-id",
            task_queue="openai-task-queue"
        )
        logger.info(f"Workflow results: {result}")
    except Exception as e:
        logger.error(f"Failed to execute workflow: {e}", exc_info=True)

if __name__ == "__main__":
    asyncio.run(main())

Activities are the decorated callables, not the containing class. This should be activities = [OpenAIActivities().call_openai_api] (or you can instantiate the OpenAIActivities earlier instead of inline).

thank you for your response. With that change to the code, run_worker executes without error:
When running main.py I get the below:

{
  "message": "Workflow class OpenAIWorkflow.run is not registered on this worker, available workflows: OpenAIWorkflow",
  "stackTrace": "  File \"/Users/robert/Desktop/dev/tutorials/temporal/data_pipeline/.venv/lib/python3.11/site-packages/temporalio/worker/_workflow.py\", line 232, in _handle_activation\n    workflow = self._create_workflow_instance(act, start_job)\n               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/Users/robert/Desktop/dev/tutorials/temporal/data_pipeline/.venv/lib/python3.11/site-packages/temporalio/worker/_workflow.py\", line 345, in _create_workflow_instance\n    raise temporalio.exceptions.ApplicationError(\n",
  "applicationFailureInfo": {
    "type": "NotFoundError"
  }
}

getting this simple example to work (so I can learn for my own project) is greatly appreciated.

The workflow type is “OpenAIWorkflow”, though most use the typesafe form of execute_workflow(OpenAIWorkflow.run, prompts, ....

with those parameters, I get:

   result = await temporal_client.execute_workflow(
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: Client.execute_workflow() got an unexpected keyword argument 'workflow_type'

are there different parameters that Temporal expects?

Looking at a Temporal example, those parameters don’t seem to be in there

Right, there is no workflow_type argument. Which Temporal sample uses workflow_type=? Can you link it so we can to correct it? This is the common way:

For posterity, this simple openai async example completes successfully
Hopefully, the temporal learning curve will flatten…at least a little.
Thanks for pointing me in the right direction :slight_smile:

run_worker.py

import sys
import os

# Add the directory containing 'activities' and 'workflows' to the Python path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

import asyncio
import logging
from temporalio import worker, activity
from temporalio.client import Client
from activities.openai_activities import OpenAIActivities
from workflows.openai_workflow import OpenAIWorkflow

# Set up logging
logger = logging.getLogger("worker")
logging.basicConfig(level=logging.INFO)

async def main():
    logger.info("Starting the Temporal worker...")
    try:
        client = await Client.connect("localhost:7233")
        logger.info("Connected to Temporal server.")
        
        # Create an instance of OpenAIActivities
        openai_activities = OpenAIActivities()

        activities = [openai_activities.call_openai_api]
        workflows = [OpenAIWorkflow]

        worker_instance = worker.Worker(
            client,
            task_queue="openai-task-queue",
            workflows=workflows,
            activities=activities
        )

        activity.logger.info("Worker instance created, now running...")
        await worker_instance.run()
        activity.logger.info("Worker is running.")
    except Exception as e:
        activity.logger.error(f"Error in running worker: {e}", exc_info=True)

if __name__ == "__main__":
    asyncio.run(main())

main.py

import logging
from temporalio import activity
from temporalio.client import Client
from workflows.openai_workflow import OpenAIWorkflow

# Set up logging
logger = logging.getLogger("main")
logging.basicConfig(level=logging.INFO)

async def main():
    temporal_client = await Client.connect("localhost:7233")

    prompts = ["What is the capital of Spain?", "What is the most common color of an apple?", "What is the meaning of life?"]
    try:
        result = await temporal_client.execute_workflow(
            OpenAIWorkflow.run,
            prompts,
            id="openai-workflow-id",
            task_queue="openai-task-queue"
        )
        activity.logger.info(f"Workflow results: {result}")
    except Exception as e:
        activity.logger.error(f"Failed to execute workflow: {e}", exc_info=True)

if __name__ == "__main__":
    asyncio.run(main())

openai_activities

import logging
from temporalio import activity
import os
from dotenv import load_dotenv
import asyncio

load_dotenv()
openai.api_key = os.environ.get('OPENAI_API_KEY')

# Set up logging
logger = logging.getLogger("OpenAIActivities")
logging.basicConfig(level=logging.INFO)


class OpenAIActivities:
    @activity.defn(name="call_openai_api")  # Ensure the activity is decorated and named
    async def call_openai_api(self, prompt: str) -> str:
        logger.info(f"Calling OpenAI API with prompt: {prompt}")
        try:
            client = openai.AsyncOpenAI(api_key=os.getenv('OPENAI_API_KEY'))
            response = await client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": prompt}
                ],
                max_tokens=100
            )
            result = response.choices[0].message.content.strip()
            logger.info(f"Received response: {result}")
            # result = 'Is this thing on'
            return result
        except Exception as e:
            logger.error(f"Error calling OpenAI API: {e}", exc_info=True)
            raise

def test_openai_activities():
    async def run_test():
        activity_instance = OpenAIActivities()
        prompt = "What is the capital of France?"
        try:
            result = await activity_instance.call_openai_api(prompt)
            print(f"Test result: {result}")
        except Exception as e:
            print(f"Test failed with error: {e}")

    asyncio.run(run_test())

# Uncomment the following line to run the test function
if __name__ == "__main__":
    test_openai_activities()

openai_workflow.py

from datetime import timedelta
import logging
import asyncio
# with workflow.unsafe.imports_passed_through():
#        import openai

# Configure the logger for the workflow
workflow_logger = logging.getLogger("OpenAIWorkflow")

@workflow.defn(name="OpenAIWorkflow") 
class OpenAIWorkflow:
    @workflow.run
    async def run(self, prompts: list) -> list:
        results = []
        futures = []

        for prompt in prompts:
            # Log the processing of each prompt at a high level
            # workflow.side_effect(lambda: workflow_logger.info(f"Processing prompt: {prompt}"))
            workflow_logger.info(f"Processing prompt: {prompt}")
            future = workflow.execute_activity(
                "call_openai_api",  # Use the activity by name
                args=[prompt],
                start_to_close_timeout=timedelta(seconds=30)
            )
            futures.append(future)
        
        results = await asyncio.gather(*futures)
        # results = await workflow.await_all(*futures)
        #workflow.side_effect(lambda: workflow_logger.info(f"Workflow completed with results: {results}"))
        return results