Best practice for exposing long-running Python activities to a Go orchestrator

We have a Python service that contains a few DB operations we want to execute in a workflow, and some of those operations can take a long time. There is also a Go service where we typically define and trigger workflows. One way to execute the workflow is to expose these operations through an API on the Python service and call that API from activities in the Go service. But long-running operations will need extra attention, and we are also trying not to put resource pressure on the Python service.

If I use the unofficial Python library to register those DB operations with Temporal as activities, is it possible to define and trigger the workflow from the Go microservice? Will the workflow engine call the Python microservice to run those DB-related activities, or will it run them in a lambda-function fashion (without putting pressure on the Python service)?

Thanks!

Invoking Python activities from Go workflow should work without any problem. Make sure that the Python activity worker listens on its own task queue and the Go process specifies that task queue name in the ActivityOptions.

Thanks for confirming. In this case it sounds like I will need both a Python worker and a Go worker, with the Python worker doing the DB work. Is there some sample code that demonstrates this registration and trigger process?

I wonder what is needed on the Python side to register a Python activity for use in a Go workflow. I tried some sample code in Python and got a connection error. Do I need to specify the Go workflow name somehow?
Thanks!

Activities Interface

class GreetingActivities:
    @activity_method(task_queue=PYTHON_TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=1000))
    async def compose_greeting(self, greeting: str, name: str) -> str:
        raise NotImplementedError

Activities Implementation

class GreetingActivitiesImpl:
    async def compose_greeting(self, greeting: str, name: str):
        return greeting + " " + name + " from python worker!"

async def client_main():
    client = WorkflowClient.new_client(namespace=NAMESPACE, host="temporal")
    factory = WorkerFactory(client, NAMESPACE)
    worker = factory.new_worker(PYTHON_TASK_QUEUE)

    worker.register_activities_implementation(GreetingActivitiesImpl(), "GreetingActivities")
    # worker.register_workflow_implementation_type(GreetingWorkflowImpl)
    factory.start()
    print("Worker factory start.....")

Worker factory start.....
INFO:temporal.activity_loop:Activity task worker started: 31@3af54240a8e4
INFO:temporal.activity_loop:Activity loop ended
ERROR:temporal.activity_loop:activity_task_loop_func failed: , retrying in 3 seconds
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/temporal/retry.py", line 17, in retry_loop
    await fp(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/temporal/activity_loop.py", line 39, in activity_task_loop_func
    task = await service.poll_activity_task_queue(request=polling_request)
  File "/usr/local/lib/python3.7/site-packages/temporal/api/workflowservice/v1.py", line 893, in poll_activity_task_queue
    PollActivityTaskQueueResponse,
  File "/usr/local/lib/python3.7/site-packages/betterproto/__init__.py", line 1132, in _unary_unary
    await stream.send_message(request, end=True)
  File "/usr/local/lib/python3.7/site-packages/grpclib/client.py", line 241, in send_message
    await self.send_request()
  File "/usr/local/lib/python3.7/site-packages/grpclib/client.py", line 173, in send_request
    protocol = await self._channel.connect()
  File "/usr/local/lib/python3.7/site-packages/grpclib/client.py", line 696, in connect
    self._protocol = await self._create_connection()
  File "/usr/local/lib/python3.7/site-packages/grpclib/client.py", line 681, in _create_connection
    ssl=self._ssl,
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 913, in create_connection
    type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 1290, in _ensure_resolved
    proto=proto, flags=flags)
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 792, in getaddrinfo
    None, getaddr_func, host, port, family, type, proto, flags)
concurrent.futures._base.CancelledError
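
The traceback above ends inside `getaddrinfo`, that is, while asyncio is still resolving the host name given to the client (`host="temporal"` in the snippet). That suggests the worker never reached the Temporal frontend at all, so workflow or activity names are probably not involved yet. A minimal sketch for checking this from the same machine or container, using only the standard library: the helper name `can_reach` is hypothetical, and port 7233 is assumed to be the frontend port (it matches the `localhost:7233` used on the Go side).

```python
import socket


def can_reach(host: str, port: int = 7233, timeout: float = 3.0) -> bool:
    """Return True if `host` resolves and accepts a TCP connection on `port`."""
    try:
        # create_connection performs the DNS lookup and the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers name-resolution failures (socket.gaierror),
        # refused connections, and timeouts.
        return False
```

Running `can_reach("temporal")` inside the worker's environment should return `True` before the worker is started; if it returns `False`, the host name or network setup likely needs fixing first.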

@firdaus do you have a sample of a Go workflow triggering Python activities? Thanks!

@Bo_Gao Can you reach out in the #python-sdk Slack channel? Quite a few people there run Go workflow → Python activity setups.

The polyglot sample doesn’t include Python, but it can still be useful.

Do you mean I should add a tag to this post, or is there a Slack channel link that I can join?

It is a Slack channel. You can find the link to the Temporal Slack at the bottom of the temporal.io page.

@Bo_Gao

Try doing something like this:

Python worker (activity):

import asyncio
import logging
from datetime import timedelta

from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import WorkflowClient

logging.basicConfig(level=logging.INFO)

TASK_QUEUE = "python-activity-task-queue"
NAMESPACE = "default"

class HelloActivities:
    @activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=60))
    async def hello(self, name: str) -> str:
        return "Hello, " + name

async def worker_main():
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    factory = WorkerFactory(client, NAMESPACE)
    worker = factory.new_worker(TASK_QUEUE)
    worker.register_activities_implementation(HelloActivities())
    factory.start()
    print("Worker started")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(worker_main())
    loop.run_forever()

Golang worker (workflow):

package main

import (
	"log"
	"time"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

func Greetings(ctx workflow.Context, name string) (string, error) {

	options := workflow.ActivityOptions{
		TaskQueue:           "python-activity-task-queue",
		StartToCloseTimeout: 60 * time.Second,
	}
	ctx = workflow.WithActivityOptions(ctx, options)
	var result string
	err := workflow.ExecuteActivity(ctx, "HelloActivities::hello", name).Get(ctx, &result)
	if err != nil {
		return "", err
	}

	return result, nil
}

func main() {
	serviceClient, err := client.NewClient(client.Options{HostPort: "localhost:7233"})

	if err != nil {
		log.Fatalf("Unable to create client.  Error: %v", err)
	}

	w := worker.New(serviceClient, "golang-workflow-tq", worker.Options{})

	w.RegisterWorkflow(Greetings)

	err = w.Run(worker.InterruptCh())
	if err != nil {
		log.Fatalf("Unable to start worker.  Error: %v", err)

	}
}

Then you can run it with tctl like this:

% tctl workflow run --workflow_type "Greetings" --taskqueue "golang-workflow-tq" --execution_timeout 120 --input '"Bob2"'
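
Note that the `--input` value is JSON, which is why the string argument appears as `'"Bob2"'`: double quotes for the JSON string, single quotes for the shell. A small sketch of building the same command from Python, for example to trigger the workflow from a script; the helper name `tctl_run_args` is hypothetical, and the flags are copied from the command above.

```python
import json
import shlex


def tctl_run_args(workflow_type: str, task_queue: str, arg: object,
                  execution_timeout: int = 120) -> list:
    """Build the argument list for the `tctl workflow run` command above."""
    return [
        "tctl", "workflow", "run",
        "--workflow_type", workflow_type,
        "--taskqueue", task_queue,
        "--execution_timeout", str(execution_timeout),
        # The workflow input is passed as JSON, so a plain string
        # must be serialized with its surrounding double quotes.
        "--input", json.dumps(arg),
    ]


args = tctl_run_args("Greetings", "golang-workflow-tq", "Bob2")
# shlex.join adds the shell quoting needed to paste the command into a terminal.
print(shlex.join(args))
```

The list form can be passed directly to `subprocess.run(args)`, which avoids shell quoting altogether.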