My changes to Workflow definition is not reflecting


I’ve changed the child Workflow ID to use UUID.new() but it still shows CHILD_WORKFLOW in the ID. How to purge this behavior. Is there a command to restart the cluster?

package workflow

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"time"

	"go.temporal.io/api/enums/v1"
	"go.temporal.io/sdk/activity"
	"go.temporal.io/sdk/workflow"

	_ "go.temporal.io/sdk/contrib/tools/workflowcheck/determinism"

	"github.com/google/uuid"
)

type contextKey string

type MapperStruct struct {
	Type    string            `json:"type"`
	LHS     string            `json:"lhs"`
	LHSID   string            `json:"lhsId"`
	RHS     string            `json:"rhs"`
	RHSID   string            `json:"rhsId"`
	Mapping map[string]string `json:"mapping"`
}

type HTTPLoad struct {
	Method       string
	URL          string
	QueryParams  map[string]string
	Headers      map[string]string
	Triggers     []string
	NodeID       string
	Body         string
	WorkflowID   string
	NodeName     string
	ActivityName string
	Mapper       []MapperStruct
}

var HTTPNodePayloadKey contextKey = "workflow_payload"
var HTTPNodeParamKey contextKey = "activity_params"

type HTTPNodeActivity struct {
	Method      string
	URL         string
	QueryParams map[string]string
	Headers     map[string]string
	Triggers    []string
	NodeID      string
	Body        string
	WorkflowID  string
	NodeName    string
}

type HTTPNodeWorkFlowParam struct {
	Payload    map[string]*HTTPLoad
	Name       string
	WorkflowID string
}

type ChildWorkflowResponse struct {
	Result string
	Err    error
}

// Workflow is a Hello World workflow definition.
func HTTPNodeWorkflow(ctx workflow.Context, param HTTPNodeWorkFlowParam) error {
	var childResult string
	ao := workflow.ActivityOptions{
		StartToCloseTimeout: 10 * time.Second,
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	logger := workflow.GetLogger(ctx)
	logger.Info("HTTPNode workflow started", "name", param.Name)

	var result string
	var childResponse workflow.Execution

	childWorkflowOptions := workflow.ChildWorkflowOptions{
		ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
		WorkflowID:        uuid.New().String(),
	}

	ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)

	childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, HTTPNodeChildWorkflowDefinition, param.Payload, childResult, 0, len(param.Payload))

	// Wait for the Child Workflow Execution to spawn

	if err := childWorkflowFuture.GetChildWorkflowExecution().Get(ctx, &childResponse); err != nil {
		logger.Error("Parent execution received child execution failure.", "Error", err)
		return err
	}

	logger.Info("Parent execution completed.", "Result", result)
	return nil
}

func HTTPNodeChildWorkflowDefinition(ctx workflow.Context, params map[string]HTTPLoad, result string, totalCount, runCount int) (string, error) {
	logger := workflow.GetLogger(ctx)
	ao := workflow.ActivityOptions{
		StartToCloseTimeout: 10 * time.Second,
	}
	ctx = workflow.WithActivityOptions(ctx, ao)

	logger.Info("Child workflow execution started.")

	if runCount < 0 {
		logger.Error("Invalid valid for run count.", "RunCount", runCount)
		return "", errors.New("invalid run count")
	}

	var payload HTTPLoad

	if runCount > 0 {
		var cnt int = 0
		for thisid, param := range params {
			if cnt == totalCount {
				// Iterate over the params at each child workflow execution
				payload = param

				// Schema Mapping
				// Check if Result is not empty and method is POST
				if result != "" && param.Method == "POST" {
					// Initate maps for result and body store
					resultMap := make(map[string]string, 0)
					body := make(map[string]string, 0)

					// JSON unmarshal from string to Map
					json.Unmarshal([]byte(result), &resultMap)

					// Mapper
					for _, prm := range param.Mapper {
						// Check if RHSID is same as current context ID
						if prm.RHSID == thisid {
							// Iterate Over Mapping Map
							for k, v := range prm.Mapping {
								body[k] = resultMap[v]
							}
							// Marshal bck to byte
							out, _ := json.Marshal(body)
							// and then convert byte to string to assign it to payload Body
							payload.Body = string(out)
						}
					}
				}
			}
			// Increment the count for this loop, too mantain the count
			cnt++
		}
	}

	if runCount == 0 {
		result := fmt.Sprintf("Child workflow execution completed after %v runs", totalCount)
		logger.Info("Child workflow completed.", "Result", result)
		return result, nil
	}

	totalCount++
	runCount--

	err := workflow.ExecuteActivity(ctx, HTTPNodeActivityDefinition, payload).Get(ctx, &result)
	if err != nil {
		logger.Error("Activity failed.", "Error", err)
		return "", err
	}

	logger.Info("Child workflow starting new run.", "RunCount", runCount, "TotalCount", totalCount)

	childWorkflowOptions := workflow.ChildWorkflowOptions{
		ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
		WorkflowID:        uuid.New().String(),
	}

	ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)

	return result, workflow.NewContinueAsNewError(ctx, HTTPNodeChildWorkflowDefinition, params, result, totalCount, runCount)
}

func HTTPNodeActivityDefinition(ctx context.Context, params HTTPLoad) (string, error) {
	logger := activity.GetLogger(ctx)
	logger.Info("Activity", "name", params.ActivityName)

	var req *http.Request
	var httpErr error
	var bd map[string]interface{}

	// HTTP Client
	client := &http.Client{}

	if params.Method == "GET" {
		req, httpErr = http.NewRequest(params.Method, params.URL, nil)
	} else if params.Method == "POST" {
		b := new(bytes.Buffer)
		json.Unmarshal([]byte(params.Body), bd)
		json.NewEncoder(b).Encode(params.Body)
		req, httpErr = http.NewRequest(params.Method, params.URL, b)
		req.Header.Set("Content-Type", "application/json")
	}

	if httpErr != nil {
		logger.Info("HTTP Error", httpErr)
		return "HTTP Error", httpErr
	}

	for k, v := range params.Headers {
		req.Header.Set(k, v)
	}

	q := req.URL.Query()
	for k, v := range params.QueryParams {
		q.Add(k, v)
	}

	req.URL.RawQuery = q.Encode()

	// Execute client with request
	resp, clientErr := client.Do(req)

	// HTTP Client Error
	if clientErr != nil {
		log.Print("HTTP Error", clientErr)
		return "HTTP client error", clientErr
	}

	// Parse Response
	parsedResp, readErr := io.ReadAll(resp.Body)

	// Read Error
	if readErr != nil {
		log.Print("Error while parsing response body")
		return "Error while parsing response body", readErr
	}

	return string(parsedResp), nil
}

Hello @Tirumal_Rao

Workflow and activity code is executed by your workers, Have you redeployed/restarted your workers after making the change?

or maybe you have more than one worker running? you can verify it with tctl

tctl taskqueue describe --taskqueue <taskqueueName>

1 Like

Hello @antonio.perez ,

Thanks for the reply. Yes, I’ve restarted the worker pod on k8s. And there is only one replica of workers running right now.

I got it, I didn’t re-deployed the worker on the cluster. Let me deploy it and check. Thanks :slight_smile:

After building and deploying the worker, I got it working. Thanks for helping me out. :slight_smile:

Don’t use uuid.New() in the workflow code. It is not deterministic and is going to break your workflows. Either use the SideEffect to generate UUID or don’t assign the child WorkflowID at all.

Also don’t iterate over maps in workflow code as map iteration is not deterministic in Go.

Please read the documentation that explains what type of code is prohibited in workflows.

1 Like

@maxim Thanks a lot for reviewing my code and pointing out the errors. I’m reading the documentation you’ve shared. I’ll make these changes accordingly and come back if have any doubts.

Thanks for your feedback, I really appreciate that :slight_smile:

Best regards.