Python activity worker in parallel

Hi everyone.

I am writing a system that basically runs on the JVM, but there are some activities that I need to implement in Python.
I created an activities-only worker in Python and tested it. I noticed that the tasks run one by one, so the remaining tasks wait in a pending state.
In the Java worker everything runs in parallel as expected. Is it possible to get the same behaviour in Python?

In Python they are expected to run in parallel too; many users run hundreds of activities concurrently. Are you using async activities or sync activities in Python? If the former, make sure you are not doing anything that blocks the thread. If the latter, make sure your thread pool executor has enough max workers and you aren't setting max concurrent activities to a low value.
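For example, here is a minimal sketch of a worker running sync activities (the activity body, queue name, address, and pool size here are all illustrative, not taken from your setup):

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

from temporalio import activity
from temporalio.client import Client
from temporalio.worker import Worker


@activity.defn
def blocking_sleep(seconds: int) -> str:
    # A sync (plain def) activity: it runs on a thread from the
    # worker's activity_executor, so blocking here does not stop
    # other activities from running in parallel.
    time.sleep(seconds)
    return f"slept {seconds}s"


async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="example-queue",
        activities=[blocking_sleep],
        # Required for sync activities; the pool size caps how many
        # of them can run at the same time.
        activity_executor=ThreadPoolExecutor(max_workers=10),
        # Optional overall cap on concurrently running activities.
        max_concurrent_activities=10,
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

With a setup like this, ten blocking activities can be in flight at once; with max_workers=1 you would see exactly the one-by-one behaviour you describe.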

I am quite a novice in Python, so maybe I am missing something.

I have worker code like this:

import argparse
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

# run_python_notebook is the activity shown in the next snippet


async def main():
    parser = argparse.ArgumentParser(description='Python action worker')
    parser.add_argument('--temporal_address', type=str, help="Address of temporal server", required=True)
    parser.add_argument('--temporal_namespace', type=str, help="Name of temporal namespace", required=True)
    parser.add_argument('--queue', type=str, help='Queue this worker will listen to', required=True)
    args = parser.parse_args()

    client = await Client.connect(target_host=args.temporal_address, namespace=args.temporal_namespace)

    worker = Worker(client, task_queue=args.queue, activities=[run_python_notebook])
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

And activity code like this:

import papermill as pm

from temporalio import activity
from temporalio.exceptions import ApplicationError
from papermill import PapermillExecutionError


@activity.defn(name='run_python_notebook')
async def run_python_notebook(args):
    notebook = args["notebook"]
    notebook_name = args["name"]
    notebook_args = args["parameters"]
    output_cell = args.get('outputCell')

    print(f"Settings arguments to {str(notebook_args)}")

    try:
        nb = pm.execute_notebook(
            notebook,
            f"executed_{notebook_name}.ipynb",
            parameters=notebook_args,
            progress_bar=False
        )
    except PapermillExecutionError as e:
        msg = f'Error executing the notebook: {e.ename}: {e.evalue}'
        print(msg)
        raise ApplicationError(
            message=f'Error executing the notebook {notebook}: {e.ename}: {e.evalue}',
            type="notebook_execution_error",
            non_retryable=True
        )

    print(f"Notebook {notebook} has been executed")

    if output_cell is None:
        return None

    # Find the cell whose execution_count matches the requested output cell
    cells = list(filter(lambda c: hasattr(c, 'execution_count') and c.execution_count == output_cell, nb.cells))
    cell = cells[0] if cells else None

    if cell is not None:
        # Return the text/plain representation of the cell's first output, if any
        outputs = cell.outputs
        output = outputs[0] if outputs else None
        return output.data.get('text/plain') if output else None

    return None

And the Java workflow pushes 3 Python notebooks to be executed, and I can see from the logs that they run one by one :frowning:

This appears to be dangerous thread-blocking code, which is not allowed in async def functions in Python: pm.execute_notebook is a blocking call, so it stalls the whole async system by tying up the asyncio event loop thread. You should not make this activity async, since papermill is not async-capable. Make it a normal def, and you'll then be required to pass an activity_executor to the worker (usually best as a thread pool executor). See types of activities.
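Concretely, something like this sketch of the change, keeping your names (max_workers=4 is just an illustrative value; size it to how many notebooks you want running at once):

from concurrent.futures import ThreadPoolExecutor

@activity.defn(name='run_python_notebook')
def run_python_notebook(args):  # plain def, not async def
    # Same body as before; blocking is safe here because the worker
    # runs this on a thread from its activity_executor pool, not on
    # the asyncio event loop.
    ...

worker = Worker(
    client,
    task_queue=args.queue,
    activities=[run_python_notebook],
    activity_executor=ThreadPoolExecutor(max_workers=4),
)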


Yes, that helped. Now I get the idea.
Thanks a lot!