Local Activity vs Activity

Hi,

Can someone help explain the difference between a local activity and a regular activity?

Creates client stub to local activities that implement given interface. A local activity is similar to a regular activity, but with some key differences:

  1. Local activity is scheduled and run by the workflow worker locally.
  2. Local activity does not need Temporal server to schedule activity task and does not rely on activity worker.
  3. Local activity is for short living activities (usually finishes within seconds).
  4. Local activity cannot heartbeat.

This part I already know. To clarify, my question is: what is going to happen when a workflow has 2 local activities and 1 regular activity, and the last regular activity fails? Is the entire workflow going to be retried, or only the 3rd activity on a different machine?

Here is a sequence of steps to execute a single activity workflow.

  1. Workflow is started which adds a workflow task into a workflow task queue.
  2. A workflow task is received by a workflow worker listening to the workflow task queue.
  3. The workflow task is completed with ScheduleActivityTask command.
  4. An activity task is added to an activity task queue.
  5. The activity task is received by an activity worker.
  6. The activity task is completed by the activity worker.
  7. The workflow task is added to the workflow task queue.
  8. The workflow task with the result of the activity is received by a workflow worker.
  9. The workflow task is completed with CompleteWorkflowExecution command.

Here is a sequence of steps to execute a local activity.

  1. Workflow is started which adds a workflow task into a workflow task queue.
  2. A workflow task is received by a workflow worker listening to the workflow task queue.
  3. A workflow schedules a local activity task into the in-process local activity task queue.
  4. Local activity is executed by a local activity worker that listens to the in-process local activity task queue.
  5. The local activity task is completed back to the workflow.
  6. The workflow task is completed with RecordMarker and CompleteWorkflowExecution commands. The marker command contains the result of the local activity execution.

Note that steps 3, 4, and 5 happen in the worker's memory and do not require any DB operations. So the number of DB updates for a normal activity is 9 versus 3 for the local one. The difference becomes even more pronounced when multiple activities are executed in parallel.

The advantages of the local activity are clear. Here are the drawbacks:

  • It works only for short activities that do not exceed the workflow task timeout. It implies that heartbeating is not supported for local activities.
  • It is not efficient for long retries. When a retry interval exceeds the workflow task timeout a timer is used to schedule the next retry. This implies that multiple events are added to the history on each retry. Normal activities can be retried practically indefinitely.
  • Local activities have at-least-once semantics, as a failure of a workflow task leads to their re-execution. This includes re-execution of whole sequences of local activities.
  • A local activity extends workflow task execution. While the task is running, the workflow cannot react to signals. So it increases the latency of signal handling.

We’ve seen multiple cases where the use of local activities without understanding their limitations caused various production issues. So my advice is to stick to normal activities unless your use case requires very high throughput and large fanouts of very short activities.

In the long term, we plan to implement a mixed model. An activity invocation with a short timeout would start as a local activity, but in case of failure would be converted to a normal activity, which is more efficient for long retries.

My question is: what is going to happen when a workflow has 2 local activities and 1 regular activity, and the last regular activity fails? Is the entire workflow going to be retried, or only the 3rd activity on a different machine?

It depends on how the activities are chained together. Let's say they are executed in a sequence. When the 2 local activities complete, the workflow task is completed. The completion request is going to contain three commands:

  1. RecordMarker with the result of local activity 1
  2. RecordMarker with the result of local activity 2
  3. ScheduleActivityTask for the activity 3

So when activity 3 fails, the workflow knows that the local activities were already executed by looking at the corresponding MarkerRecorded events and is not going to rerun them.
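For illustration, here is a minimal Java SDK sketch of that sequence. The MyWorkflow/MyActivities interfaces, their method names, and the timeouts are hypothetical placeholders, not from this thread; the comments note which command or event each call corresponds to.

import io.temporal.activity.ActivityOptions;
import io.temporal.activity.LocalActivityOptions;
import io.temporal.workflow.Workflow;
import java.time.Duration;

// MyWorkflow and MyActivities are hypothetical interfaces used only for this sketch.
public class MyWorkflowImpl implements MyWorkflow {

  // Local activity stub: executed in-process by the workflow worker.
  private final MyActivities local =
      Workflow.newLocalActivityStub(
          MyActivities.class,
          LocalActivityOptions.newBuilder()
              .setStartToCloseTimeout(Duration.ofSeconds(10))
              .build());

  // Regular activity stub: dispatched through the server via an activity task queue.
  private final MyActivities remote =
      Workflow.newActivityStub(
          MyActivities.class,
          ActivityOptions.newBuilder()
              .setStartToCloseTimeout(Duration.ofMinutes(5))
              .build());

  @Override
  public void run() {
    local.stepOne();    // result recorded via a RecordMarker command / MarkerRecorded event
    local.stepTwo();    // result recorded via a RecordMarker command / MarkerRecorded event
    remote.stepThree(); // ScheduleActivityTask command; if this fails, only this activity is
                        // retried, while the two markers above are replayed from history
  }
}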


Appreciate the long and detailed answer, Maxim! My colleague will post some follow-up questions on local state handling and state transfer. Thank you!


Hello,

Could you please clarify what is recommended (activity vs local activity vs direct IO from the workflow) in the following scenario:

  1. Read 100Mb payload from Cloud Storage.
  2. Split payload into 100Kb chunks on average. One chunk can be as big as 10Mb in extreme cases.
  3. Invoke 1000 parallel activities to process small payloads.
  4. Reconcile the result into 100Mb output payload and write back to Cloud Storage.

Looks like I have 2 options if I want to avoid overloading the Temporal backend with IO:

  1. Use workflow with host-specific task queues & local files:
  • Use a local activity to read the 100Mb payload, save it in a local file.
  • Make the workflow read the local file, split it 1000 ways, write back to local files.
  • Invoke multiple local host-specific activities to process those files.
  • Once all is done, have a final activity combine everything from the local files into a single result.
  • Write the combined file back to Cloud Storage.
  2. Read a file from Cloud storage from the workflow directly:
  • Ignore the recommendation to not do IO from the workflow itself.
  • Read the 100Mb payload from Cloud storage.
  • Split it 1000 times.
  • Invoke 1000 activities / local activities to process chunks.
  • Reconcile, write to a local file.
  • Invoke a host-specific activity reading from the local file and writing back to Cloud storage.

If I want to avoid handling host machine failure manually, it seems to me Option 2 is the way to go. If I choose #1, I'll have to manually reschedule the workflow from scratch to reconstruct the local machine state.


Artem,
Local activities are not a good fit for data processing scenarios as they are not tied to any specific host and can migrate across workers at any time.

Option 2 is similar to executing the whole process in a single activity. If you want to do this, don't use a local activity; just use a single normal activity that executes all the steps.

The design depends on what type of processing each chunk requires. If it is purely CPU-intensive work, then having an activity that iterates over records and executes the logic one chunk at a time, heartbeating progress back, is the way to go. You can consider running multiple such activities on different boxes if downloading the file (or part of the file) to each box is feasible.
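A hedged sketch of that pattern with the Java SDK heartbeat API; the ChunkProcessor interface, Chunk type, and process() are hypothetical, and the heartbeat detail is just the index of the last completed chunk, so a retry can resume instead of starting over.

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;
import java.util.List;

// ChunkProcessor and Chunk are hypothetical types used only for this sketch.
public class ChunkProcessorImpl implements ChunkProcessor {

  @Override
  public void processChunks(List<Chunk> chunks) {
    ActivityExecutionContext ctx = Activity.getExecutionContext();
    // On a retry, pick up where the previous attempt left off.
    int start = ctx.getHeartbeatDetails(Integer.class).orElse(0);
    for (int i = start; i < chunks.size(); i++) {
      process(chunks.get(i)); // the CPU-intensive work on one chunk
      ctx.heartbeat(i + 1);   // record progress; becomes the details on the next retry
    }
  }

  private void process(Chunk chunk) { /* ... */ }
}

For the heartbeats to be reported and the details to survive a retry, the activity needs a heartbeat timeout set in its ActivityOptions.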

But if processing each chunk is a complex operation that makes remote calls and can have its own lifecycle then you might end up creating a separate workflow for each chunk.

Thank you Maxim,

Each chunk represents a create or update RPC call to a remote server. I want to execute those as fast as possible. Putting everything in a single activity would work, but that would eliminate all of the checkpointing that activities provide in case a replay on a different host is necessary.

It sounds like you recommend reading a big file from Cloud storage, split it into separate work items and write smaller files back to Cloud Storage. Finally, read those smaller files back in subsequent activities that process chunks of data.

Let me use a simpler scenario to clarify my understanding.

Let’s say workflow does the following:

  1. Read media file from Cloud Storage (100Mb), save to local file

  2. Transcode media file (from local file)

  3. Save the result in Cloud Storage (from local file)

Let’s say I want to have 1, 2 and 3 as separate activities for checkpointing purposes. This means I have to use a host-specific task queue to make sure 2 and 3 run on the same host as #1.

What are possible/recommended ways to keep 1, 2 and 3 as separate activities, and at the same time allow the workflow to automatically and successfully retry on a different host if the workflow machine fails during #2 or #3? My understanding is that when the workflow recovers on a different host, #1 will not execute again as it was previously completed and its completion was recorded in the Temporal backend. That means #2 and #3 can't succeed (the local file does not exist).

That would mean the whole workflow would have to be cancelled and scheduled again by some external process.

It seems that the only way to avoid that is to move #1 from an activity into the workflow itself doing IO. When the workflow is replayed, the file will be read and stored on the new local host, with a name accessible to subsequent activities.

Or is there some configuration setting that can force Activity #1 to rerun after workflow execution is moved to a different host?

Each chunk represents a create or update RPC call to a remote server. I want to execute those as fast as possible. Putting everything in a single activity would work, but that would eliminate all of the checkpointing that activities provide in case a replay on a different host is necessary.

Not all of it, as an activity can save progress information in the heartbeat.

It sounds like you recommend reading a big file from Cloud storage, split it into separate work items and write smaller files back to Cloud Storage. Finally, read those smaller files back in subsequent activities that process chunks of data.

AFAIK at least S3 supports reading big files from any offset. So each activity would be able to download its part of the file directly from the blob store.
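As an illustration (not from this thread): ranged reads are just standard HTTP Range requests, so each activity could fetch only its own slice, for example via a presigned URL. A rough Java sketch follows, with the URL and offsets being placeholders.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ChunkDownloader {
  private final HttpClient client = HttpClient.newHttpClient();

  // Downloads only the requested byte range of the object behind the (hypothetical) presigned URL.
  public byte[] downloadRange(String presignedUrl, long start, long endInclusive) throws Exception {
    HttpRequest request = HttpRequest.newBuilder(URI.create(presignedUrl))
        .header("Range", "bytes=" + start + "-" + endInclusive)
        .GET()
        .build();
    HttpResponse<byte[]> response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());
    return response.body();
  }
}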

What are possible/recommended ways to keep 1, 2 and 3 as separate activities, and at the same time allow the workflow to automatically and successfully retry on a different host if the workflow machine fails during #2 or #3? My understanding is that when the workflow recovers on a different host, #1 will not execute again as it was previously completed and its completion was recorded in the Temporal backend. That means #2 and #3 can't succeed (the local file does not exist).

I think there is some confusion here. Use host-specific task queues to route activities only. Workflows are never expected to be linked to any specific machine, so if a workflow machine fails then no action is needed, as the workflow is automatically migrated to any other machine. If the machine that executes activities fails, then you have to execute the whole sequence from the beginning on a different machine, as the cached file is lost.

See the fileprocessing sample, which demonstrates exactly this technique (Java and Go versions). Note that the Go version uses the higher-level abstraction of a Session, which Java doesn't implement yet.
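For reference, here is a rough Java sketch of the routing idea from that sample; StoreActivities, HostActivities, and DownloadResult are hypothetical stand-ins, and the real fileprocessing sample differs in details. The download activity runs on the shared task queue and returns the host-specific task queue name of the worker that now holds the file; the follow-up activities are then routed to that queue.

import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import java.time.Duration;

// FileProcessingWorkflow, StoreActivities, HostActivities, and DownloadResult
// are hypothetical types used only for this sketch.
public class FileProcessingWorkflowImpl implements FileProcessingWorkflow {

  // Runs on the shared task queue; any worker can pick it up.
  private final StoreActivities store =
      Workflow.newActivityStub(
          StoreActivities.class,
          ActivityOptions.newBuilder()
              .setStartToCloseTimeout(Duration.ofMinutes(10))
              .build());

  @Override
  public void processFile(String sourceUrl, String destinationUrl) {
    // Step 1: download; the result carries the host-specific task queue name
    // of the worker that saved the file to its local disk.
    DownloadResult downloaded = store.download(sourceUrl);

    // Steps 2 and 3: create a stub bound to that host-specific task queue
    // so they run on the same machine as the download.
    HostActivities host =
        Workflow.newActivityStub(
            HostActivities.class,
            ActivityOptions.newBuilder()
                .setTaskQueue(downloaded.getHostTaskQueue())
                .setStartToCloseTimeout(Duration.ofMinutes(30))
                .build());
    String transcodedPath = host.transcode(downloaded.getLocalPath());
    host.upload(transcodedPath, destinationUrl);
  }
}

If the host-specific activities fail because that machine is gone, the whole download/transcode/upload sequence has to be retried from the top, which is the retry loop discussed below.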

Let’s say I used Workflow.retry(execute(activity1, activity2 (host-specific), activity3 (host-specific))). Let’s assume activity1, activity2 finished successfully. But activity3 failed.

What will be retried?

  1. activity1, activity2, activity3
  2. activity1, activity3 (because activity2 is recorded as completed successfully)

I assume #1 will happen (because the framework doesn't know whether activity2's results can be reused when running on a new host). Does some method exist in the framework that would magically allow activity2 to not be retried? That is, retry all non-host-specific activities that incomplete host-specific activities depend on, and reuse the recorded activity result otherwise?

Actually, will

Workflow.retry(execute(activity1));
execute(activity2);
execute(activity3);

do what I want? That is, retry activity1 on any failure, and execute activity2 & activity3 outside of the retry clause only if they failed before.

I’m confused about your requirements. I thought that you want to retry all three activities if the host goes down. So

Workflow.retry(() -> {
        execute(activity1);
        execute(activity2); // host-specific
        execute(activity3); // host-specific
  });

implements this:

If you want to skip retrying activity2 in case of a retry, you can do something like:

// A workflow instance field, so the flag survives across retry attempts.
boolean activity2Executed = false;

Workflow.retry(() -> {
        execute(activity1);
        if (!activity2Executed) {
           execute(activity2); // host-specific
           activity2Executed = true;
        }
        execute(activity3); // host-specific
  });

Note that there is no “magically” in the workflow code. It is just code and you use your normal programming techniques to achieve what you want.

I want the fastest possible, maximally parallel execution that comes as close as possible to exactly-once semantics. I'm trying to understand the limits of the framework: what it will do for me and what I'll have to implement myself. I'll play more with it to see what happens with different kinds of failures.
So I'm trying to achieve something like the following:

Workflow.retry(() -> {
        result1 = execute(activity1); // I want this to execute as it populates machine specific context.
        result2 = execute(activity2(result1)); // I want this to return recorded activity result during retry instead of brand new execution.
        result3 = execute(activity3(result1, result2)); // I want this to execute as it failed before.
  });

I think the following implements the requirements:

// A workflow instance field, so the recorded result survives across retry attempts.
Activity2ResultType activity2Result;

Workflow.retry(() -> {
        execute(activity1);
        if (activity2Result == null) {
           activity2Result = execute(activity2); // host-specific
        }
        execute(activity3); // host-specific
  });

Hi Maxim, I have a follow-up question here.

  1. Our data center is in a different DC from the Temporal cloud, which implies a complicated network environment, say from Aliyun to AWS.
  2. Our activities are just simple CRUD logic against a DB.

In this case, I am thinking of changing all activities into local activities. But I am concerned about two cases, which I want to explain using this pseudo-code:

1. activity-1: save some data
2. activity-2: query data-A from DB
3. some workflow logic 
4. activity-3: send out a Kafka message
5. wait on a signal based on 
6. activity-4: send out a Kafka message
7. activity-5: save some data

concern 1) more re-execution in abnormal cases: since “local activities only send an event to the cloud when all of them have finished, while normal activities send events to the cloud one by one”, if I change them to local activities and the workflow breaks halfway through No. 4, all of the activities from No. 1 to No. 4 are missing from the Temporal cloud's sight and will be re-executed next time.

concern 2) you mentioned that retries are always okay for normal activities. I want these activities to retry forever; will local activities prevent that?

The change to local activities comes from a simple idea: since the activity messages go from DC-1 to DC-2, it costs too much if all the activity does is send a message to Kafka or query some data from our local DC's cluster.

concern 1)

This is correct. The whole sequence from 1 to 4 will be re-executed on worker failure. A worker failure is expected to be a pretty rare event, so I wouldn't be too concerned about the performance hit in this case. Obviously, you need to make sure that such re-execution doesn't break functional correctness.

concern 2)

Local activities can be retried for a long time. The problem is that each retry appends multiple events to the history. So very long retries can lead to a workflow history exceeding its limit.
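If you do go with local activities, the retry policy is configured on the local activity options. Here is a hedged sketch; the values are illustrative and builder defaults may vary by SDK version. Keep in mind the point above: once the retry interval outgrows the workflow task timeout, each further retry adds events to the history.

import io.temporal.activity.LocalActivityOptions;
import io.temporal.common.RetryOptions;
import java.time.Duration;

// Illustrative local activity options with an explicit retry policy.
LocalActivityOptions options =
    LocalActivityOptions.newBuilder()
        .setStartToCloseTimeout(Duration.ofSeconds(5))
        .setRetryOptions(
            RetryOptions.newBuilder()
                .setInitialInterval(Duration.ofSeconds(1))
                .setBackoffCoefficient(2.0)
                .setMaximumInterval(Duration.ofMinutes(1))
                .build())
        .build();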

The change to local activities comes from a simple idea: since the activity messages go from DC-1 to DC-2, it costs too much if all the activity does is send a message to Kafka or query some data from our local DC's cluster.

I would measure the performance with normal activities in your setup and start thinking about optimizations only if needed.


I made some experiments and found the following:

1. activity-1: save some data
2. activity-2: query data-A from DB
3. some workflow logic 
4. activity-3: send out a Kafka message
5. wait on a signal based on 
6. activity-4: send out a Kafka message
7. activity-5: save some data

for concern 1):
I find that if LocalActivity-1 succeeded and LocalActivity-2 failed, the SDK uploads the MarkerRecorded event for the success of 1 to the cloud so that it does not retry from the beginning, and starts another round of workflow tasks from LocalActivity-2. If both of the local activities succeed, the SDK just uploads all their MarkerRecorded events together to the cloud.

So what do you mean by the failure of the workflow and restarting from the beginning? There seems to be some gap in my understanding here.

Another finding: for a local activity, the event history doesn't record its name, which makes the events hard to read.

Another suggestion: the input window is very small, and it feels clumsy putting complex ideas and well-formatted explanations into it.

So what do you mean by the failure of the workflow and restarting from the beginning? There seems to be some gap in my understanding here.

I meant not a particular activity failure, but a crash of the workflow worker while the second activity is executing. It would cause a workflow task retry that would re-execute both activities.


Please file issues for the UI improvements.


I seem to have found another question about local activities. I read some Go SDK code and am not very sure whether local activities affect queries. My basic understanding is:

  1. Though queries are read-only, they must have the current runtime context of the running workflow, so they must be handled by the same worker that is handling the queried workflow execution.
  2. And I find the query is only handled by the workflow's contexts.CompleteWorkflowTask, which is a heartbeat function of the local activity, so if I change the activities to local, queries may become slow, right?

If this is the case, I suppose workflow execution should use one thread for local activities and one thread for pure orchestration… T_T doing them in the same processTask function seems to make things complicated.

Yes, local activities do delay queries, as a query cannot be delivered to the worker while a local activity is running, until the workflow task heartbeat. So if low query latency is a requirement, then local activities are not a good fit.
