Hi,
can someone help explain the difference between a local activity and a regular activity?
This part I already know. To clarify, my question is what's going to happen when a workflow has 2 local activities and 1 regular activity: when the last regular activity fails, is the entire workflow going to be retried, or only the 3rd one on a different machine?
Here is a sequence of steps to execute a single activity workflow. The worker completes one workflow task with the ScheduleActivityTask command and the final workflow task with the CompleteWorkflowExecution command.
Here is a sequence of steps to execute a local activity. The worker completes the workflow task with the RecordMarker and CompleteWorkflowExecution commands. The marker command contains the result of the local activity execution. Note that steps 3, 4, and 5 are in the memory of the worker and do not require any DB operations. So the number of DB updates in the case of a normal activity is 9 versus 3 in the case of a local one. The difference becomes even more pronounced when multiple activities are executed in parallel.
The advantages of the local activity are clear, but it also comes with drawbacks.
We’ve seen multiple cases where the use of local activities without understanding their limitations caused various production issues. So my advice is to stick to normal activities unless your use case requires very high throughput and large fan-outs of very short activities.
In the long term, we plan to implement a mixed model. An activity invocation with a short timeout would start as a local activity, but in case of failure it would be converted to a normal activity, which is more efficient for long retries.
my question is what’s going to happen when a workflow has 2 local activities and 1 regular activity: when the last regular activity fails, is the entire workflow going to be retried, or only the 3rd one on a different machine?
It depends on how the activities are chained together. Let’s say they are executed in a sequence. When the 2 local activities complete, the workflow task is completed. The completion request is going to contain three commands: two RecordMarker commands (one per local activity) and a ScheduleActivityTask command for the regular activity.
So when activity 3 fails, the workflow knows that the local activities have already executed by looking at the corresponding MarkerRecorded events and is not going to rerun them.
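For illustration, here is a minimal Java SDK sketch of that scenario; the SampleWorkflow and MyActivities interfaces, method names, and timeouts are assumptions for the example, not taken from the original question. The first two steps go through a local activity stub, so their results are recorded as markers, while the third goes through a regular activity stub and is the only thing retried when it fails.

import io.temporal.activity.ActivityOptions;
import io.temporal.activity.LocalActivityOptions;
import io.temporal.workflow.Workflow;
import java.time.Duration;

public class SampleWorkflowImpl implements SampleWorkflow {

  // Local activity stub: results come back as RecordMarker commands / MarkerRecorded events.
  private final MyActivities localActivities =
      Workflow.newLocalActivityStub(
          MyActivities.class,
          LocalActivityOptions.newBuilder()
              .setStartToCloseTimeout(Duration.ofSeconds(5))
              .build());

  // Regular activity stub: scheduled through the service as a ScheduleActivityTask command.
  private final MyActivities activities =
      Workflow.newActivityStub(
          MyActivities.class,
          ActivityOptions.newBuilder()
              .setStartToCloseTimeout(Duration.ofMinutes(5))
              .build());

  @Override
  public void run() {
    localActivities.stepOne();
    localActivities.stepTwo();
    // If this fails and is retried, the two markers above already hold the
    // local activity results, so steps one and two are not re-executed.
    activities.stepThree();
  }
}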
Appreciate the long and detailed answer Maxim! My colleague will post some follow up questions on the local state handling and state transfer. Thank you!
Hello,
Could you please clarify whether an activity, a local activity, or direct IO from the workflow is recommended in the following scenario:
Looks like I have 2 options if I want to avoid overloading the Temporal backend with IO:
If I want to avoid handling host machine failure manually, it seems to me Option 2 is the way to go. If I choose #1, I’ll have to manually reschedule the workflow from scratch to reconstruct the local machine state.
Artem,
Local activities are not a good fit for data processing scenarios as they are not tied to any specific host and can migrate across workers at any time.
Option 2 is similar to executing the whole process in a single activity. If you want to do this, don’t use a local activity; just use a single normal activity that executes all the steps.
The design depends on what type of processing each chunk requires. If it is purely CPU-intensive work, then having an activity that iterates over records and executes the logic one chunk at a time, heartbeating progress back, is the way to go (see the sketch below). You can consider running multiple such activities on different boxes if downloading the file (or part of the file) to each box is feasible.
But if processing each chunk is a complex operation that makes remote calls and can have its own lifecycle then you might end up creating a separate workflow for each chunk.
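A rough Java sketch of the heartbeating approach mentioned above; the ChunkProcessingActivities interface, the processChunk helper, and the use of an integer index as heartbeat details are illustrative assumptions:

import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;

public class ChunkProcessingActivitiesImpl implements ChunkProcessingActivities {

  @Override
  public void processFile(String fileLocation, int totalChunks) {
    ActivityExecutionContext ctx = Activity.getExecutionContext();
    // On a retry, resume from the last heartbeated chunk instead of starting over.
    int firstChunk = ctx.getHeartbeatDetails(Integer.class).orElse(0);
    for (int chunk = firstChunk; chunk < totalChunks; chunk++) {
      processChunk(fileLocation, chunk); // hypothetical helper that does the actual work
      ctx.heartbeat(chunk + 1);          // report progress back as heartbeat details
    }
  }

  private void processChunk(String fileLocation, int chunk) {
    // CPU-intensive processing of a single chunk goes here.
  }
}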
Thank you Maxim,
Each chunk represents a create or update RPC call to a remote server. I want to execute those as fast as possible. Putting everything in a single activity would work, but that would eliminate all of the checkpointing that activities provide in case a replay on a different host is necessary.
It sounds like you recommend reading a big file from Cloud Storage, splitting it into separate work items, and writing smaller files back to Cloud Storage. Finally, reading those smaller files back in subsequent activities that process chunks of data.
Let me use a simpler scenario to clarify my understanding.
Let’s say the workflow does the following:
Read media file from Cloud Storage (100Mb), save to local file
Transcode media file (from local file)
Save the result in Cloud Storage (from local file)
Let’s say I want to have 1, 2, and 3 as separate activities for checkpointing purposes. This means I have to use a host-specific task queue to make sure 2 and 3 run on the same host as #1.
What are possible/recommended ways to keep 1, 2, and 3 as separate activities, and at the same time allow the workflow to automatically retry successfully on a different host if the workflow machine fails during #2 or #3? My understanding is that when the workflow recovers on a different host, #1 will not execute again as it previously completed and the completion was recorded in the Temporal backend. That means #2 and #3 can’t succeed (the local file does not exist).
That would mean the whole workflow would have to be cancelled and scheduled again by some external process.
It seems the only way to avoid that is to move #1 from an activity into the workflow itself doing the IO. When the workflow is replayed, the file would be read and stored on the new local host, with a name accessible to subsequent activities.
Or is there some configuration setting that can force Activity #1 to rerun after workflow execution is moved to a different host?
Each chunk represents a create or update RPC call to a remote server. I want to execute those as fast as possible. Putting everything in a single activity would work, but that would eliminate all of the checkpointing that activities provide in case a replay on a different host is necessary.
Not all of it, as an activity can save progress information in the heartbeat.
It sounds like you recommend reading a big file from Cloud Storage, splitting it into separate work items, and writing smaller files back to Cloud Storage. Finally, reading those smaller files back in subsequent activities that process chunks of data.
AFAIK at least S3 supports reading big files from any offset. So each activity would be able to download its part of the file directly from the blob store.
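As an aside, a ranged read with the AWS SDK for Java v2 might look roughly like this; the bucket, key, and offsets are placeholders, and this snippet is an assumption for illustration rather than something from the original thread:

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;

public class RangeDownload {
  // Downloads only the [start, endInclusive] byte range of the object,
  // so each activity can fetch just its own part of a big file.
  public static ResponseInputStream<GetObjectResponse> downloadPart(
      S3Client s3, String bucket, String key, long start, long endInclusive) {
    GetObjectRequest request =
        GetObjectRequest.builder()
            .bucket(bucket)
            .key(key)
            .range("bytes=" + start + "-" + endInclusive) // HTTP Range header syntax
            .build();
    return s3.getObject(request);
  }
}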
What are possible/recommended ways to keep 1, 2, and 3 as separate activities, and at the same time allow the workflow to automatically retry successfully on a different host if the workflow machine fails during #2 or #3? My understanding is that when the workflow recovers on a different host, #1 will not execute again as it previously completed and the completion was recorded in the Temporal backend. That means #2 and #3 can’t succeed (the local file does not exist).
I think there is some confusion here. Use host-specific task lists to route activities only. Workflows are never expected to be linked to any specific machine. So if a workflow machine fails then no action is needed as the workflow is automatically migrated to any other machine. If the machine that executes activities fails then you have to execute the whole sequence from the beginning on a different machine as the cached file is lost.
See the fileprocessing sample, which demonstrates exactly this technique (Java and Go versions). Note that the Go version uses the higher-level abstraction of a Session, which Java doesn’t implement yet.
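A rough Java sketch of the routing technique the fileprocessing sample uses (the activity interfaces, the DownloadResult type, and the timeouts here are illustrative assumptions): the download activity runs on a shared task queue and returns the name of a host-specific task queue its worker also listens on; the remaining activities are invoked through a stub bound to that queue, so they run on the same machine. If a host-specific step fails because the machine is gone, the whole sequence is retried from the download step.

import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import java.time.Duration;

public class FileProcessingWorkflowImpl implements FileProcessingWorkflow {

  private final DownloadActivities downloadActivities =
      Workflow.newActivityStub(
          DownloadActivities.class,
          ActivityOptions.newBuilder()
              .setStartToCloseTimeout(Duration.ofMinutes(10))
              .build());

  @Override
  public void processFile(String url) {
    // Runs on any worker and returns the worker's host-specific task queue
    // name together with the local path of the downloaded file.
    DownloadResult downloaded = downloadActivities.download(url);

    // Bind the remaining steps to the same host via its task queue.
    HostActivities hostActivities =
        Workflow.newActivityStub(
            HostActivities.class,
            ActivityOptions.newBuilder()
                .setTaskQueue(downloaded.getHostTaskQueue())
                .setStartToCloseTimeout(Duration.ofMinutes(30))
                .build());

    String transcodedPath = hostActivities.transcode(downloaded.getLocalPath());
    hostActivities.upload(transcodedPath);
  }
}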
Let’s say I used Workflow.retry(execute(activity1, activity2 (host-specific), activity3 (host-specific))). Let’s assume activity1 and activity2 finished successfully, but activity3 failed.
What will be retried?
I assume #1 will happen (because the framework doesn’t know whether #2’s results can be reused when running on a new host). Does some method exist in the framework that would magically allow #2 to not be retried? Something like: retry all non-host-specific activities that incomplete host-specific activities depend on, and reuse the recorded activity result otherwise?
Actually, will
Workflow.retry(execute(activity1));
execute(activity2);
execute(activity3);
do what I want? Retry activity1 on any failure, and execute activity2 & activity3 outside of the retry clause only if they failed before.
I’m confused about your requirements. I thought that you want to retry all three activities if the host goes down. So
Workflow.retry(() -> {
    execute(activity1);
    execute(activity2); // host-specific
    execute(activity3); // host-specific
});
implements this.
If you want to skip retrying activity2 in case of a retry, you can do something like:
boolean activity2Executed = false; // workflow object field, so the lambda can update it
Workflow.retry(() -> {
    execute(activity1);
    if (!activity2Executed) {
        execute(activity2); // host-specific
        activity2Executed = true;
    }
    execute(activity3); // host-specific
});
Note that there is no “magic” in the workflow code. It is just code, and you use your normal programming techniques to achieve what you want.
I want the fastest possible execution with maximum parallelism and as close as possible to exactly-once semantics. I’m trying to understand the limits of the framework: what it will do for me, and what I’ll have to implement myself. I’ll play more with it to see what happens with different kinds of failures.
So trying to achieve something like below:
Workflow.retry(() -> {
    result1 = execute(activity1); // I want this to execute, as it populates machine-specific context.
    result2 = execute(activity2(result1)); // I want this to return the recorded activity result during retry instead of a brand new execution.
    result3 = execute(activity3(result1, result2)); // I want this to execute, as it failed before.
});
I think the following implements the requirements:
Activity2ResultType activity2Result; // workflow object field, so the lambda can update it
Workflow.retry(() -> {
    execute(activity1);
    if (activity2Result == null) {
        activity2Result = execute(activity2); // host-specific
    }
    execute(activity3); // host-specific
});
Hi Maxim, I have a follow-up question here.
In this case, I am thinking of changing all activities into local activities, but I am concerned about 2 cases, which I want to explain using this pseudo-code:
1. activity-1: save some data
2. activity-2: query data-A from DB
3. some workflow logic
4. activity-3: send out a Kafka message
5. wait on a signal based on
6. activity-4: send out a Kafka message
7. activity-5: save some data
concern 1) more re-execution in abnormal cases: since “local activities only send an event to the cloud when all the locals are finished, while a normal activity sends its event to the cloud one by one”, if I change them to local activities and it breaks halfway through No. 4, all of the activities from No. 1 to No. 4 are missing from Temporal’s cloud-side view and will be re-executed next time.
concern 2) you mentioned that retries are always okay for normal activities. I want these activities to retry forever; will local activities stop me from doing that?
The change to local comes from a simple idea: since the activity messages go from DC-1 to DC-2, it costs too much if the content of the activity is just sending out some message to Kafka or querying some data from our local DC’s cluster.
concern 1)
This is correct. The whole sequence from 1 to 4 will be re-executed on worker failure. A worker failure is expected to be a pretty rare event, so I wouldn’t be too concerned about a performance hit in this case. Obviously, you need to make sure that such re-execution doesn’t break functional correctness.
concern 2)
Local activities can be retried for a long time. The problem is that each retry appends multiple events to the history, so very long retries can lead to the workflow history exceeding its limit.
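A minimal sketch of keeping local activity retries bounded in the Java SDK; the specific limits below are illustrative assumptions, not recommendations from this thread:

import io.temporal.activity.LocalActivityOptions;
import io.temporal.common.RetryOptions;
import java.time.Duration;

public class LocalActivityOptionsExample {
  // Bounding retries keeps a failing local activity from appending
  // retry-related events to the workflow history indefinitely.
  static final LocalActivityOptions OPTIONS =
      LocalActivityOptions.newBuilder()
          .setStartToCloseTimeout(Duration.ofSeconds(5))
          .setRetryOptions(
              RetryOptions.newBuilder()
                  .setInitialInterval(Duration.ofSeconds(1))
                  .setMaximumAttempts(5) // give up locally instead of retrying forever
                  .build())
          .build();
}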
The change to local comes from a simple idea: since the activity messages go from DC-1 to DC-2, it costs too much if the content of the activity is just sending out some message to Kafka or querying some data from our local DC’s cluster.
I would measure the performance with normal activities in your setup and start thinking about optimizations only if needed.
I made some experiments and found the following:
1. activity-1: save some data
2. activity-2: query data-A from DB
3. some workflow logic
4. activity-3: send out a Kafka message
5. wait on a signal based on
6. activity-4: send out a Kafka message
7. activity-5: save some data
for concern 1):
I find that if LocalActivity-1 succeeded and LocalActivity-2 failed, the SDK uploaded the marker for the success of 1 to the cloud so that it does not retry from the beginning, and starts another round of workflow tasks from LocalActivity-2. If both of the local activities succeeded, the SDK just uploads all of their marker records together to the cloud.
So what do you mean by referring to the failure of the workflow and restarting from the beginning? There seems to be some gap in my understanding here.
another finding:
for a local activity, the event history doesn’t record its name, which makes it hard to read from the events
another suggestion:
the input window is very small, and I feel clumsy putting complex ideas and well-formatted explanations in it.
So what do you mean by referring to the failure of the workflow and restarting from the beginning? There seems to be some gap in my understanding here.
I meant not a particular activity failure, but a crash of the workflow worker while the second activity is executing. It would cause a workflow task retry that would re-execute both activities.
Please file issues for the UI improvements.
I seem to have found another question about local activities. I read some Go SDK code, and I'm not sure whether local activities affect queries. My basic understanding is:
If this is the case, I suppose workflow execution should use one thread for local activities and one thread for pure orchestration… T_T Doing them in the same processTask function seems to make things complicated.
Yes, local activities do delay queries, as a query cannot be delivered to the worker while a local activity is running, until the workflow task heartbeat. So if low query latency is a requirement, then local activities are not a good fit.