Question about using either Workflow or Activity for a long-running Google Pubsub Subscription

On our service, we have a Google Pubsub subscription that is always waiting for messages. We used to have a systemd unit for it, but we are planning to move it to Temporal. So my question is: should the logic live in the Workflow or in an Activity? I currently have it implemented in the workflow, but I was thinking of moving it to an Activity because it might be non-deterministic; I'm not really sure. I believe both implementations would retry if the subscriber ever goes down or throws an error. Here is an example of that workflow:

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

public class FooWorkflowImpl implements FooWorkflow {
    @Override
    public void start() {
        String projectId = "foo";
        String subscriptionId = "foo-subscription";

        try {
            ProjectSubscriptionName subscriptionName =
                    ProjectSubscriptionName.of(projectId, subscriptionId);

            MessageReceiver receiver =
                    (PubsubMessage message, AckReplyConsumer consumer) -> {
                        ByteString data = message.getData();

                        try {
                            // DO SOMETHING HERE
                        } catch (Exception e) {
                            System.out.printf("%s\n", e);
                        }
                        consumer.ack();
                    };

            Subscriber subscriber = null;
            try {
                subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
                subscriber.startAsync().awaitRunning();
                // Keep listening
                subscriber.awaitTerminated();
            } catch (IllegalStateException illegalStateException) {
                // The subscriber failed; stop receiving messages.
                if (subscriber != null) {
                    subscriber.stopAsync();
                }
            }
        }  catch (Exception e) {
            System.out.printf("%s\n", e);
        }

    }
}

Thanks in advance

Hi @aalmacin,

It’s recommended that code that interacts with external systems (databases, services, libraries that depend on or use system time, etc.) be implemented as activities.

For your use case, it’s typical to have an outside consumer that consumes events and sends them as signals to the workflow executions responsible for handling them.
In that case your subscriber can be a process that receives events and signals your workflow upon receiving them. Your workflow implementation would wait on signals, then invoke activities that perform the // DO SOMETHING HERE operations.
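To make the "wait on signals, then invoke activities" idea concrete, here is a minimal sketch using the Temporal Java SDK. The names PubsubSignalSketch, MessageWorkflow, MessageActivities, onMessage and handle are made up for illustration, and the 30 second activity timeout is an assumption, not a recommendation.

```java
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical container class; all nested names are illustrative only.
public class PubsubSignalSketch {

    // The "DO SOMETHING HERE" work lives behind an activity interface,
    // because it may touch external systems.
    @ActivityInterface
    public interface MessageActivities {
        void handle(String payload);
    }

    @WorkflowInterface
    public interface MessageWorkflow {
        @WorkflowMethod
        void run();

        // Called by the external subscriber for every Pubsub message.
        @SignalMethod
        void onMessage(String payload);
    }

    public static class MessageWorkflowImpl implements MessageWorkflow {
        // Plain non-static field; signals are buffered here until processed.
        private final Queue<String> pending = new ArrayDeque<>();

        private final MessageActivities activities =
                Workflow.newActivityStub(
                        MessageActivities.class,
                        ActivityOptions.newBuilder()
                                // Assumed timeout; tune for the real workload.
                                .setStartToCloseTimeout(Duration.ofSeconds(30))
                                .build());

        @Override
        public void run() {
            while (true) {
                // Block (deterministically) until at least one signal arrived.
                Workflow.await(() -> !pending.isEmpty());
                activities.handle(pending.poll());
            }
        }

        @Override
        public void onMessage(String payload) {
            pending.add(payload);
        }
    }
}
```

Note that a workflow that loops forever like this keeps growing its event history, so a real implementation would periodically continue-as-new; that part is left out to keep the sketch short.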

Just to add regarding workflow determinism:

You can see workflow implementation constraints here.
Some other things to keep in mind:

  • Don’t use explicit synchronization in your workflow code.
  • You can use non-static fields in your workflow definition without having to worry about isolation issues.
  • For static fields use io.temporal.workflow.WorkflowLocal or io.temporal.workflow.WorkflowThreadLocal, depending on your use case. You can use Atomic variables, but they are not really needed.
  • Don’t use synchronized lists, as that will break workflow determinism.

@tihomir Thanks for the answer. If I understand correctly, by “consumer that consumes events and sends them as signals to workflows” you mean that the consumer is a separate process outside of Temporal. Is that similar to what we currently have, which is a systemd unit executing the pubsub subscriber script? With that suggestion, do you mean we should have:

  • pubsub process - separate script that runs outside of Temporal
  • a single workflow - receives signals from the pubsub process
  • activities - run every time a signal is received

If that’s the case, instead of having the pubsub subscriber inside a workflow/activity, we will have it outside of Temporal. We were hoping to get rid of the systemd unit and only worry about running Temporal.

Temporal doesn’t run your code for workflows and activities. So you have to run some process that hosts them. The same process can host the pubsub consumer.

Thanks for the response @maxim. If I understand correctly, it aligns with what I am planning to do. I will still need to run a process for my Temporal workers. Here is what I think I would need infrastructure-wise:

Deploy

  • Temporal Cluster
  • Service - runs the Temporal workers in one process and the Pubsub script as a separate process

I currently have a separate process for Temporal workers and another for Pubsub. I will have a systemd unit for each, but I am just wondering if I can use the workflow + activity so that I only need to worry about running workers on my server.

Let me know if I’m off on my assumptions. Thanks

Your approach looks good to me. Consider running the pubsub consumer in the same process if you want to minimize the number of processes.
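For anyone landing here later, a rough sketch of what "same process" could look like with the Temporal Java SDK and the Pubsub client from the question. The task queue, workflow ID, workflow type and signal name are all hypothetical, the code assumes a Temporal server reachable on the local default address, and the worker registration lines are left as comments because those classes are your own.

```java
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;

public class WorkerAndSubscriberMain {
    // All four values are hypothetical placeholders.
    static final String TASK_QUEUE = "foo-task-queue";
    static final String WORKFLOW_ID = "foo-subscription-handler";
    static final String WORKFLOW_TYPE = "FooWorkflow";
    static final String SIGNAL_NAME = "onMessage";

    public static void main(String[] args) {
        // Assumes a Temporal server on the local default address.
        WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
        WorkflowClient client = WorkflowClient.newInstance(service);

        // 1. Start the Temporal worker in this process.
        WorkerFactory factory = WorkerFactory.newInstance(client);
        Worker worker = factory.newWorker(TASK_QUEUE);
        // Register your own implementations here, for example:
        // worker.registerWorkflowImplementationTypes(FooWorkflowImpl.class);
        // worker.registerActivitiesImplementations(new FooActivitiesImpl());
        factory.start();

        // 2. Start the Pubsub subscriber in the same process. It only forwards
        //    each message to the workflow as a signal; the real work happens
        //    in activities invoked by the workflow.
        MessageReceiver receiver = (message, consumer) -> {
            try {
                WorkflowStub stub = client.newUntypedWorkflowStub(
                        WORKFLOW_TYPE,
                        WorkflowOptions.newBuilder()
                                .setTaskQueue(TASK_QUEUE)
                                .setWorkflowId(WORKFLOW_ID)
                                .build());
                // Starts the workflow if it is not running, then signals it.
                stub.signalWithStart(
                        SIGNAL_NAME,
                        new Object[] {message.getData().toStringUtf8()},
                        new Object[] {});
                consumer.ack();
            } catch (Exception e) {
                // Signal was not accepted; let Pubsub redeliver the message.
                consumer.nack();
            }
        };

        Subscriber subscriber = Subscriber.newBuilder(
                ProjectSubscriptionName.of("foo", "foo-subscription"), receiver).build();
        subscriber.startAsync().awaitRunning();
        // Keep the process alive while the subscriber is running.
        subscriber.awaitTerminated();
    }
}
```

Acking only after the signal call returns means a message is not lost if Temporal is briefly unreachable, at the cost of possible redelivery, so the activity logic should tolerate duplicates.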