I’ve changed the child Workflow ID to use UUID.new() but it still shows CHILD_WORKFLOW in the ID. How to purge this behavior. Is there a command to restart the cluster?
package workflow
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"time"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
_ "go.temporal.io/sdk/contrib/tools/workflowcheck/determinism"
"github.com/google/uuid"
)
type contextKey string
type MapperStruct struct {
Type string `json:"type"`
LHS string `json:"lhs"`
LHSID string `json:"lhsId"`
RHS string `json:"rhs"`
RHSID string `json:"rhsId"`
Mapping map[string]string `json:"mapping"`
}
type HTTPLoad struct {
Method string
URL string
QueryParams map[string]string
Headers map[string]string
Triggers []string
NodeID string
Body string
WorkflowID string
NodeName string
ActivityName string
Mapper []MapperStruct
}
var HTTPNodePayloadKey contextKey = "workflow_payload"
var HTTPNodeParamKey contextKey = "activity_params"
type HTTPNodeActivity struct {
Method string
URL string
QueryParams map[string]string
Headers map[string]string
Triggers []string
NodeID string
Body string
WorkflowID string
NodeName string
}
type HTTPNodeWorkFlowParam struct {
Payload map[string]*HTTPLoad
Name string
WorkflowID string
}
type ChildWorkflowResponse struct {
Result string
Err error
}
// Workflow is a Hello World workflow definition.
func HTTPNodeWorkflow(ctx workflow.Context, param HTTPNodeWorkFlowParam) error {
var childResult string
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
logger := workflow.GetLogger(ctx)
logger.Info("HTTPNode workflow started", "name", param.Name)
var result string
var childResponse workflow.Execution
childWorkflowOptions := workflow.ChildWorkflowOptions{
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
WorkflowID: uuid.New().String(),
}
ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)
childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, HTTPNodeChildWorkflowDefinition, param.Payload, childResult, 0, len(param.Payload))
// Wait for the Child Workflow Execution to spawn
if err := childWorkflowFuture.GetChildWorkflowExecution().Get(ctx, &childResponse); err != nil {
logger.Error("Parent execution received child execution failure.", "Error", err)
return err
}
logger.Info("Parent execution completed.", "Result", result)
return nil
}
func HTTPNodeChildWorkflowDefinition(ctx workflow.Context, params map[string]HTTPLoad, result string, totalCount, runCount int) (string, error) {
logger := workflow.GetLogger(ctx)
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
logger.Info("Child workflow execution started.")
if runCount < 0 {
logger.Error("Invalid valid for run count.", "RunCount", runCount)
return "", errors.New("invalid run count")
}
var payload HTTPLoad
if runCount > 0 {
var cnt int = 0
for thisid, param := range params {
if cnt == totalCount {
// Iterate over the params at each child workflow execution
payload = param
// Schema Mapping
// Check if Result is not empty and method is POST
if result != "" && param.Method == "POST" {
// Initate maps for result and body store
resultMap := make(map[string]string, 0)
body := make(map[string]string, 0)
// JSON unmarshal from string to Map
json.Unmarshal([]byte(result), &resultMap)
// Mapper
for _, prm := range param.Mapper {
// Check if RHSID is same as current context ID
if prm.RHSID == thisid {
// Iterate Over Mapping Map
for k, v := range prm.Mapping {
body[k] = resultMap[v]
}
// Marshal bck to byte
out, _ := json.Marshal(body)
// and then convert byte to string to assign it to payload Body
payload.Body = string(out)
}
}
}
}
// Increment the count for this loop, too mantain the count
cnt++
}
}
if runCount == 0 {
result := fmt.Sprintf("Child workflow execution completed after %v runs", totalCount)
logger.Info("Child workflow completed.", "Result", result)
return result, nil
}
totalCount++
runCount--
err := workflow.ExecuteActivity(ctx, HTTPNodeActivityDefinition, payload).Get(ctx, &result)
if err != nil {
logger.Error("Activity failed.", "Error", err)
return "", err
}
logger.Info("Child workflow starting new run.", "RunCount", runCount, "TotalCount", totalCount)
childWorkflowOptions := workflow.ChildWorkflowOptions{
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
WorkflowID: uuid.New().String(),
}
ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)
return result, workflow.NewContinueAsNewError(ctx, HTTPNodeChildWorkflowDefinition, params, result, totalCount, runCount)
}
func HTTPNodeActivityDefinition(ctx context.Context, params HTTPLoad) (string, error) {
logger := activity.GetLogger(ctx)
logger.Info("Activity", "name", params.ActivityName)
var req *http.Request
var httpErr error
var bd map[string]interface{}
// HTTP Client
client := &http.Client{}
if params.Method == "GET" {
req, httpErr = http.NewRequest(params.Method, params.URL, nil)
} else if params.Method == "POST" {
b := new(bytes.Buffer)
json.Unmarshal([]byte(params.Body), bd)
json.NewEncoder(b).Encode(params.Body)
req, httpErr = http.NewRequest(params.Method, params.URL, b)
req.Header.Set("Content-Type", "application/json")
}
if httpErr != nil {
logger.Info("HTTP Error", httpErr)
return "HTTP Error", httpErr
}
for k, v := range params.Headers {
req.Header.Set(k, v)
}
q := req.URL.Query()
for k, v := range params.QueryParams {
q.Add(k, v)
}
req.URL.RawQuery = q.Encode()
// Execute client with request
resp, clientErr := client.Do(req)
// HTTP Client Error
if clientErr != nil {
log.Print("HTTP Error", clientErr)
return "HTTP client error", clientErr
}
// Parse Response
parsedResp, readErr := io.ReadAll(resp.Body)
// Read Error
if readErr != nil {
log.Print("Error while parsing response body")
return "Error while parsing response body", readErr
}
return string(parsedResp), nil
}