yes, I did double-check that part.
In my setup, the workflow is started via gRPC, and right after that I call an Update API that returns the workflow state. Before any branching/processing happens, the workflow always executes one initial activity that loads the workflow definition from the DB.
The issue only appears when this DB-loading step is a local activity. When it’s local, the Update handler returns a reduced state (only the initial inputs). When I switch that same step back to a normal activity, the update returns the full state as expected.
I’m sharing a minimal version of the workflow below — all internal logic removed — just to show the structure and where the initial activity is executed. My goal here is to understand why using a local activity affects the workflow state seen by the Update call.
package workflow
import (
“encoding/json”
“fmt”
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)
// Generic public-safe structs representing workflow metadata and definitions.
type FlowVariables struct{}
type WorkflowContext struct{}
type WorkflowDefinition struct {
Model
TaskNode
VarMap map[string]VariableDef
}
type TaskNode struct {
ID string
Name string
TaskType string
Next *NextNode
Input map[string]any
}
type NextNode struct{ Node string }
type VariableDef struct{ Value any }
// Placeholder activity handler for demonstration.
type Activities struct{}
// Example dynamic workflow structure demonstrating:
// 1. Loading workflow definition
// 2. Walking through tasks dynamically
// 3. Executing activities/local activities based on task type
func DynamicWorkflow(ctx workflow.Context, reference string, fv *FlowVariables) (*FlowVariables, error) {
// Load workflow model (typically from a DB or external storage)
var workflowMap map[string]any
loadCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 0,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: 0,
BackoffCoefficient: 0,
MaximumAttempts: 0,
},
})
// All sensitive logic removed — only illustrating flow
err := workflow.ExecuteActivity(loadCtx, "LoadWorkflowDefinition", reference).Get(loadCtx, &workflowMap)
if err != nil {
return nil, fmt.Errorf("failed to load workflow definition: %w", err)
}
// Convert the fetched map into a structured workflow definition
def, err := parseMapToWorkflowDefinition(workflowMap)
if err != nil {
return nil, err
}
// Configure Local Activity options for task execution
ctx = workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: 0,
RetryPolicy: &temporal.RetryPolicy{
MaximumAttempts: 0,
},
})
activities := &Activities{}
// Build lookup for tasks by ID to support dynamic workflow jumps
taskMap := map[string]*TaskNode{}
for i := range def.Model {
taskMap[def.Model[i].ID] = &def.Model[i]
}
// Find the START node of the workflow
var current *TaskNode
for _, node := range def.Model {
if node.TaskType == "start" {
current = &node
break
}
}
// Main execution loop — walks through tasks based on dynamic rules
for current != nil {
switch current.TaskType {
case "start":
// Simply proceed to the next node
if current.Next != nil {
current = taskMap[current.Next.Node]
continue
}
current = nil
continue
case "condition":
// Branching logic hidden; demonstration only
// Conceptually: evaluate branch expressions → pick matching next node
current = pickNextBranchBasedOnConditions(current, taskMap)
continue
case "end":
// Marks completion of the workflow
current = nil
continue
default:
// Execute a task via a Local Activity.
// Actual internal logic is hidden; this demonstrates the pattern only.
result := &WorkflowContext{}
future := workflow.ExecuteLocalActivity(
ctx,
activities.RunGenericActivity, // generic placeholder
current.ID,
current.Name,
current.TaskType,
current.Input,
)
if err := future.Get(ctx, &result); err != nil {
return nil, fmt.Errorf("activity failed: %w", err)
}
// Move to next task if defined
if current.Next != nil {
current = taskMap[current.Next.Node]
} else {
current = nil
}
}
}
return fv, nil
}
// Converts an untyped map into a typed workflow definition.
// Safe for public demonstration.
func parseMapToWorkflowDefinition(m map[string]any) (*WorkflowDefinition, error) {
b, err := json.Marshal(m)
if err != nil {
return nil, err
}
var def WorkflowDefinition
if err := json.Unmarshal(b, &def); err != nil {
return nil, err
}
return &def, nil
}
// Simplified branching helper for demonstration.
// Real branching conditions are intentionally not included.
func pickNextBranchBasedOnConditions(node *TaskNode, taskMap map[string]*TaskNode) *TaskNode {
if node.Next != nil {
return taskMap[node.Next.Node]
}
return nil
}