I have an implementation with two primary activities (please see the code below). The loader activity is kicked off first; it sends all the signals that cause the describe activities to be started. But when I test for a failure of the loader activity, I get a “potential deadlock detected” error.
Full error:
Error: Error "workflow execution error (type: SyncWorkflow, workflowID: default-test-workflow-id, runID: default-test-run-id): [TMPRL1101] Potential deadlock detected: workflow goroutine \"root\" didn't yield for over a second" does not contain "loader activity failed"
// SyncWorkflow runs the loader activity and then starts one describe activity
// for every shared.IDSignal signal it receives, returning only after every
// describe activity it started has finished.
//
// The integer result of the loader activity (totalSignals) is used as the
// number of signals to wait for. If the loader activity fails, its error is
// logged and returned unchanged. Failures of individual describe activities
// are logged but do not fail the workflow.
func SyncWorkflow(ctx workflow.Context, w WorkflowArgs) error {
	logger := workflow.GetLogger(ctx)
	metrics := &WorkflowMetrics{}

	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		TaskQueue:           sync.TaskQueue,
		StartToCloseTimeout: time.Minute * 15,
		RetryPolicy: &temporal.RetryPolicy{
			InitialInterval:    time.Second,
			BackoffCoefficient: 2.0,
			MaximumInterval:    100 * time.Second,
			MaximumAttempts:    3,
		},
	})

	info := workflow.GetInfo(ctx)

	var totalSignals int
	loaderErr := workflow.ExecuteActivity(ctx, shared.LoaderActivity, w.Notification, info.WorkflowExecution.ID, info.WorkflowExecution.RunID).Get(ctx, &totalSignals)
	if loaderErr != nil {
		logger.Error("Loader activity failed", "error", loaderErr)
		return loaderErr
	}

	signalChan := workflow.GetSignalChannel(ctx, shared.IDSignal)
	selector := workflow.NewSelector(ctx)

	// describeCompleted counts describe activities whose result has been
	// observed, so the loop below can tell "started" apart from "finished".
	describeCompleted := 0

	// Each signal carries a subscription ID and starts one describe activity.
	// The activity's future is added to the same selector so its completion is
	// handled by a later Select call.
	selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
		var subID string
		c.Receive(ctx, &subID)
		metrics.DescribeStarted++

		fut := workflow.ExecuteActivity(ctx, shared.DescribeSyncActivity, w.Notification, subID)
		selector.AddFuture(fut, func(f workflow.Future) {
			describeCompleted++
			if err := f.Get(ctx, nil); err != nil {
				logger.Error("Describe activity failed", "Subscription ID", subID, "error", err)
				return
			}
			logger.Info("Describe activity completed", "Subscription ID", subID)
		})
	})

	// Keep selecting while signals are still expected OR while any started
	// describe activity has not finished yet. Stopping as soon as the last
	// activity is merely started would let the workflow return with
	// activities still in flight.
	for metrics.DescribeStarted < totalSignals || describeCompleted < metrics.DescribeStarted {
		logger.Info("Waiting for activities to complete", "describeStarted", metrics.DescribeStarted, "describeCompleted", describeCompleted, "totalSignals", totalSignals)
		selector.Select(ctx)
	}

	logger.Info("All activities completed")
	return nil
}
Below is the test setup:
// UnitTestSuite is the test suite for the workflows in this package. It
// combines testify's suite.Suite with Temporal's testsuite.WorkflowTestSuite.
type UnitTestSuite struct {
	// Embedded testify suite.
	suite.Suite

	// Embedded Temporal workflow test suite.
	testsuite.WorkflowTestSuite

	// env is the test workflow environment; SetupTest assigns it.
	env *testsuite.TestWorkflowEnvironment
}
// SetupTest prepares the suite for a test: it installs a zerolog console
// logger on the suite and then creates a new test workflow environment.
//
// The logger is installed before the environment is created because the
// environment is expected to take the suite's logger when it is constructed
// (NOTE(review): confirm against the Temporal SDK version in use).
func (s *UnitTestSuite) SetupTest() {
	logger := zerolog.New(zerolog.NewConsoleWriter()).With().Logger()
	s.SetLogger(utils.NewZeroLoggerAdapter(logger))

	s.env = s.NewTestWorkflowEnvironment()
}
func TestUnitTestSuite(t *testing.T) {
suite.Run(t, new(UnitTestSuite))
}
// Test_LoaderActivity_Failure checks that SyncWorkflow completes with an error
// containing "loader activity failed" when the mocked loader activity fails on
// every attempt, and that the activity was attempted exactly three times.
func (s *UnitTestSuite) Test_LoaderActivity_Failure() {
	ctrl := gomock.NewController(s.T())
	defer ctrl.Finish()

	temporalMock := mocks.NewMockTemporal(ctrl)
	lHandler := shared.NewLoaderActivityHandler(shared.LoaderParams{
		Client: temporalMock,
	})

	// Mock LoaderActivity so that it always fails. Times(3) mirrors the
	// MaximumAttempts value in SyncWorkflow's retry policy.
	s.env.OnActivity(lHandler.LoaderActivity, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Return(0, errors.New("loader activity failed")).Times(3)

	// Prepare workflow arguments.
	notification := entity.Notification{
		RequestID:           "test-request-id",
		Tables:              nil,
		SkipDependentTables: false,
		Details:             &pb.Details{SubscriptionId: []string{"subscription1", "subscription2"}},
	}

	// Execute the workflow.
	s.env.ExecuteWorkflow(SyncWorkflow, WorkflowArgs{Notification: notification})

	// The workflow must finish, and finish with the loader's error.
	s.True(s.env.IsWorkflowCompleted())
	err := s.env.GetWorkflowError()
	s.ErrorContains(err, "loader activity failed")

	// Times(3) on its own only limits the number of calls; asserting the
	// expectations verifies that all three attempts really happened.
	s.env.AssertExpectations(s.T())
}
Is there something that might be causing the loader to not yield?