Please find below the test code used :
func ParentWorkflow(ctx workflow.Context, sleepDuration time.Duration) error {
terminalResultChannel := workflow.NewChannel(ctx)
ao := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
}
ctx = workflow.WithActivityOptions(ctx, ao)
logger := workflow.GetLogger(ctx)
workflowID := workflow.GetInfo(ctx).WorkflowExecution.ID
var msg DummyRequest
msg.Id = "1234"
var a *Activities // Used to call Activities by function pointer
defer func() {
// When the Workflow is canceled, it has to get a new disconnected context to execute any Activities
newCtx, _ := workflow.NewDisconnectedContext(ctx)
err := workflow.ExecuteActivity(newCtx, a.CleanupActivity, " *** parent *** ").Get(ctx, nil)
if err != nil {
logger.Error("CleanupActivity failed", "Error", err)
}
}()
// Sleep till the scheduled time arrives
logger.Info("sleepDuration = ", sleepDuration)
if sleepDuration > 0 {
logger.Info("Sleep for ", sleepDuration, " seconds")
_ = workflow.Sleep(ctx, sleepDuration)
logger.Info("Woke up from sleep after ", sleepDuration, " seconds")
}
devPrefix := "D0001"
numOfRuns := 2
for i := 0; i < numOfRuns; i++ {
device := devPrefix + strconv.Itoa(i)
workflow.Go(ctx, func(ctx workflow.Context) {
var result DummyResponse
cwo := workflow.ChildWorkflowOptions{
WorkflowID: workflowID + "_" + device,
//ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
}
ctx = workflow.WithChildOptions(ctx, cwo)
err := workflow.ExecuteChildWorkflow(ctx, ChildWorkflow, device, workflowID, msg).Get(ctx, &result)
logger.Info("Parent received response from child . ", result)
if err != nil {
logger.Info("Failed for ", device, ", error = ", zap.Error(err))
}
terminalResultChannel.Send(ctx, result)
})
}
var results []DummyResponse
for i := 0; i < numOfDevices; i++ {
var result DummyResponse
terminalResultChannel.Receive(ctx, &result)
results = append(results, result)
}
log.Println("Parent Workflow Execution complete. ", results)
return nil
}
func ChildWorkflow(ctx workflow.Context, device string, workflowid string, msg DummyRequest) (DummyResponse, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
//HeartbeatTimeout: 5 * time.Second,
WaitForCancellation: true,
//ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
}
ctx = workflow.WithActivityOptions(ctx, ao)
logger := workflow.GetLogger(ctx)
var a *Activities // Used to call Activities by function pointer
defer func() {
// When the Workflow is canceled, it has to get a new disconnected context to execute any Activities
newCtx, _ := workflow.NewDisconnectedContext(ctx)
err := workflow.ExecuteActivity(newCtx, a.CleanupActivity, " *** child *** ").Get(ctx, nil)
if err != nil {
logger.Error("CleanupActivity failed", "Error", err)
}
}()
var result string
err := workflow.ExecuteActivity(ctx, a.ActivityToBeCanceled, device).Get(ctx, &result)
logger.Info(fmt.Sprintf("ActivityToBeCanceled returns %v, %v", result, err))
var event DummyResponse
signalName := SIGNAL_PREFIX + device
statusCh := workflow.GetSignalChannel(ctx, signalName)
selector := workflow.NewSelector(ctx)
selector.AddReceive(statusCh, func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &event)
logger.Info("ChildWorkflow Received signal ", signalName, " with event = ", event)
})
selector.AddReceive(ctx.Done(), func(c workflow.ReceiveChannel, more bool) {
logger.Info("!!!!!!!!!!! ChildWorkflow got cancelled !!!!!!!!!!!")
event.Result = "Cancelled"
})
selector.Select(ctx)
log.Println("Child Workflow Execution complete.", event)
return event, nil
}
type Activities struct{}
func (a *Activities) ActivityToBeCanceled(ctx context.Context, device string) (string, error) {
logger := activity.GetLogger(ctx)
return "sent", nil
}
func (a *Activities) CleanupActivity(ctx context.Context, from string) error {
logger := activity.GetLogger(ctx)
logger.Info("Cleanup Activity started for " + from)
return nil
}