Hi again all, I am implementing querying in our long-running workflow processes and have been having trouble.
I’m trying to pass an activity stub to our main workflow activity, which fires a data pipeline. The activity stub would call an activity method for each log message that the pipeline emits. I’m confused because when I pass the activity stub into the activity method, the call stack overflows.
I’ve been led to understand that periodic updates from a workflow activity could also be implemented as a heartbeat – though it’s difficult to find any information on what this would look like in practice. If I were to need the values from each heartbeat, where would I be able to retrieve those? Would they be yielded from the activity method call as if it were a generator?
Would you post the code snippet that explains what you are trying to do? I don’t understand what
pass an activity stub to our main workflow activity process
means.
The activity heartbeat value is not delivered to the workflow unless the activity times out. Then the last heartbeat value is available as part of the timeout exception.
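For reference, a minimal sketch of what heartbeating can look like in the Java SDK. The `io.temporal` calls are real SDK methods; the activity interface, the `runPipeline` method, and the per-line processing are illustrative placeholders:

```java
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityExecutionContext;

import java.util.Optional;

interface PipelineActivity {
  void runPipeline();
}

public class PipelineActivityImpl implements PipelineActivity {

  // Pure helper: where to resume given the previous attempt's last
  // heartbeat details, if any.
  static int resumeFrom(Optional<Integer> lastCompletedLine) {
    return lastCompletedLine.map(n -> n + 1).orElse(0);
  }

  @Override
  public void runPipeline() {
    ActivityExecutionContext ctx = Activity.getExecutionContext();

    // On a retry, the details recorded by the previous attempt's last
    // heartbeat are available here; on the first attempt this is empty.
    int start = resumeFrom(ctx.getHeartbeatDetails(Integer.class));

    for (int line = start; line < totalLines(); line++) {
      processLine(line);
      // Recorded server-side; surfaced to the workflow only inside the
      // timeout exception if the activity times out.
      ctx.heartbeat(line);
    }
  }

  private int totalLines() { return 0; } // placeholder
  private void processLine(int line) {}  // placeholder
}
```

So heartbeat details are not a stream the workflow can read; their main use is resuming work after a retry.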
Would you describe the actual business logic you are trying to implement? I might help you with the Temporal friendly design.
Yes – the business logic is to get live data from our workflow execution while it is running – ideally we would be able to get a stream of the logs from the process using a query method. As for code, I have a snippet:
Inside of the tapToTarget method, there are calls to a method that processes the logs line-by-line, and for each line a method from singerActivityCallback is called. In practice, though, this causes a stack overflow error.
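In simplified form (the names other than tapToTarget and singerActivityCallback are placeholders), the shape is roughly:

```java
// Simplified sketch of the failing shape: an activity stub is handed into
// an activity implementation and invoked from inside it.
public class SingerActivityImpl implements SingerActivity {

  @Override
  public void tapToTarget(SingerCallbackActivity singerActivityCallback) {
    for (String line : runPipelineAndCollectLogs()) {
      // Calling an activity stub from inside an activity is what blows
      // the stack in practice.
      singerActivityCallback.onLogLine(line);
    }
  }

  private java.util.List<String> runPipelineAndCollectLogs() {
    return java.util.List.of(); // placeholder for the real pipeline run
  }
}

interface SingerActivity {
  void tapToTarget(SingerCallbackActivity callback);
}

interface SingerCallbackActivity {
  void onLogLine(String line);
}
```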
Now I understand. What you ask for is possible to implement at the framework level, but it is not currently supported. The activity stub must be used only inside the workflow code and doesn’t make sense inside of another activity.
You can achieve the same result using signals. An activity that wants to send information to the workflow can signal it as many times as needed. But be careful about passing large amounts of data through a single workflow: the Temporal system doesn’t scale up with the size of a single workflow, it scales out with the number of workflow instances (we call them executions).
How much data do you want to stream from the activity into the workflow? What is the actual use case?
Thank you for the detailed reply @maxim – I appreciate it greatly.
You’re suggesting to send signals from the activity to the workflow, correct? Would you recommend creating a workflow stub within the activity to invoke the signal method? Or would some other approach be better suited?
The amount of data isn’t great, most likely fewer than 100 log lines per invocation of the activity. The use case is being able to monitor our long-running ETL processes during their execution.
You’re suggesting to send signals from the activity to the workflow, correct? Would you recommend creating a workflow stub within the activity to invoke the signal method? Or would some other approach be better suited?
Yes, create a workflow stub using WorkflowClient to send the signals.
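Assuming the Java SDK, it can look roughly like this. The `WorkflowClient`, `Activity`, and annotation APIs are real; the workflow interface, signal name, and helper methods are illustrative:

```java
import io.temporal.activity.Activity;
import io.temporal.client.WorkflowClient;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.WorkflowInterface;

@WorkflowInterface
interface EtlWorkflow {
  @SignalMethod
  void reportLogLine(String line);
}

interface SingerActivity {
  void tapToTarget();
}

public class SingerActivityImpl implements SingerActivity {

  private final WorkflowClient client; // injected when the worker is built

  public SingerActivityImpl(WorkflowClient client) {
    this.client = client;
  }

  // Pure helper: keep signal payloads small.
  static String truncate(String line, int max) {
    return line.length() <= max ? line : line.substring(0, max);
  }

  @Override
  public void tapToTarget() {
    // The activity context knows which workflow execution invoked it.
    String workflowId =
        Activity.getExecutionContext().getInfo().getWorkflowId();

    // A stub bound by workflow id routes the signal back to the caller.
    EtlWorkflow workflow = client.newWorkflowStub(EtlWorkflow.class, workflowId);

    for (String line : runPipelineAndCollectLogs()) {
      workflow.reportLogLine(truncate(line, 4096)); // delivered as a signal
    }
  }

  private java.util.List<String> runPipelineAndCollectLogs() {
    return java.util.List.of(); // placeholder for the real pipeline run
  }
}
```

The workflow implementation can accumulate the signaled lines in a list and expose them through a query method.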
The amount of data isn’t great, most likely fewer than 100 log lines per invocation of the activity. The use case is being able to monitor our long-running ETL processes during their execution.
Assuming that the total number of such signals is bounded, it would work. Another, more scalable option is to store these logs in some other storage (S3?) and only return pointers to them as the activity result.
Excellent, thank you @maxim! The possibility of sending logs to S3 is very interesting, and may be a better long-term solution as we build more functionality into our Temporal workers.
The general pattern is to avoid passing large amounts of data through a workflow. Use an external store, or route tasks to a specific process, and pass around references.
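For example (AWS SDK v2; the bucket name, key scheme, and pipeline call are illustrative), the activity could upload the logs and return only a key:

```java
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

import java.util.List;

interface PipelineActivity {
  String runPipeline(String runId);
}

public class PipelineActivityImpl implements PipelineActivity {

  private final S3Client s3 = S3Client.create();

  // Pure helper: a deterministic key per run keeps the activity
  // idempotent across retries.
  static String logKey(String runId) {
    return "pipeline-logs/" + runId + ".log";
  }

  @Override
  public String runPipeline(String runId) {
    List<String> logLines = executePipeline(); // placeholder pipeline call
    String key = logKey(runId);
    s3.putObject(
        PutObjectRequest.builder().bucket("etl-logs").key(key).build(),
        RequestBody.fromString(String.join("\n", logLines)));
    return key; // the workflow history stores only this small pointer
  }

  private List<String> executePipeline() {
    return List.of(); // placeholder
  }
}
```

The workflow then keeps only the returned key in its history and hands it to whatever needs the full logs.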