I am trying to dynamically create a new workflow class for every new workflow definition (JSON format) that the system receives. I have defined the run method with the workflow.run decorator in the base class. I create a new class using the type function, passing the base class as its parent, and then decorate the new class with the workflow.defn decorator.
from temporalio import activity, workflow
from activities import compose_greeting
from dataclasses import dataclass
from datetime import timedelta

class BaseWorkflow:
    @workflow.run
    async def run(self, name: str, definition: dict) -> str:
        workflow.logger.info("Running workflow with parameter %s" % name)
        # Code to start parsing workflow
        pass

new_workflow = type("New", (BaseWorkflow,), {})
new_workflow = workflow.defn(new_workflow)
However, when I run the code, I get the following error:
ValueError: Invalid workflow class for 2 reasons: @workflow.run method run must be defined on New, @workflow.run defined on BaseWorkflow.run but not on the override
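
The message says run "must be defined on New", which suggests the decorator looks at what the new class defines itself and not at what it inherits. The sketch below is plain Python with no Temporal involved. It shows that type("New", (BaseWorkflow,), {}) leaves the class's own namespace empty, and that a run function can instead be placed in the namespace dict passed to type. The make_workflow_class factory and the rewritten __qualname__ are hypothetical illustrations, and the Temporal decorators are left out on purpose: whether workflow.defn accepts a class built this way is an assumption that has not been verified here.

```python
import asyncio


class BaseWorkflow:
    # Stand-in for the base class in the question (no Temporal decorators here).
    async def run(self, name: str, definition: dict) -> str:
        return f"base ran {name}"


# Same construction as in the question: the namespace dict is empty,
# so nothing is defined on the new class itself.
Inherited = type("New", (BaseWorkflow,), {})


def make_workflow_class(class_name: str) -> type:
    """Hypothetical factory that puts a run wrapper in the new class's own namespace."""

    async def run(self, name: str, definition: dict) -> str:
        # Delegate to the shared implementation on the base class.
        return await BaseWorkflow.run(self, name, definition)

    # A nested function gets a "<locals>" qualified name; rewrite it so it
    # reads as if it had been written inside the new class body.
    run.__qualname__ = f"{class_name}.run"
    return type(class_name, (BaseWorkflow,), {"run": run})


Defined = make_workflow_class("New")

print("run" in vars(Inherited))         # False: run is only inherited
print(Inherited.run is BaseWorkflow.run)  # True: it is the base class function
print("run" in vars(Defined))           # True: run lives on the new class
print(Defined.run.__qualname__)         # New.run
print(asyncio.run(Defined().run("demo", {})))  # base ran demo
```

In the real code, the wrapper inside the factory would presumably be the function that carries @workflow.run, with the parsing logic kept in an undecorated helper on the base class.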