Looking into this, I believe the issue is not with the 60 parallel activities, but rather with the signal-listener goroutine.
The listener sits in an infinite loop waiting for signals to process. For each signal, it evaluates changes to a struct and triggers an activity we call "Upsert", which sends the updated version of that struct to an external API. The activity receives the struct as its argument.
I created sample code that simulates this in my local env:
1 - Add a goroutine with a listener on a Signal channel that loops forever. A WaitGroup prevents the main workflow from completing.
2 - The signal-listener handler executes an activity, passing a large struct (ex: 100 KB) as the argument.
3 - The activity returns nil.
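Since I can't attach the sample here, the shape of the repro is roughly this (a stdlib-only sketch of the structure, without the Temporal SDK: the channel stands in for the workflow's signal channel, and upsert stands in for the real Upsert activity):

```go
package main

import (
	"fmt"
	"sync"
)

// BigPayload stands in for the ~100 KB struct we pass to the Upsert activity.
type BigPayload struct {
	Data []byte
}

// upsert simulates the Upsert activity: it receives the full struct as its
// argument and returns nil, just like the real activity in the repro.
func upsert(p BigPayload) error {
	_ = p
	return nil
}

func main() {
	signals := make(chan BigPayload)
	var wg sync.WaitGroup
	wg.Add(1)

	processed := 0

	// Signal-listener goroutine: loops forever over incoming signals and
	// fires the activity for each one, passing the large struct along.
	go func() {
		defer wg.Done()
		for p := range signals {
			if err := upsert(p); err == nil {
				processed++
			}
		}
	}()

	// Script a loop that sends 500 signals, each carrying ~100 KB.
	for i := 0; i < 500; i++ {
		signals <- BigPayload{Data: make([]byte, 100*1024)}
	}
	close(signals)
	wg.Wait()

	fmt.Println(processed)
}
```

In the real workflow each upsert call is an ExecuteActivity, so every one of those 500 payloads also lands in the workflow's event history.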
I scripted a loop to send 500 signals. After some of the signals are sent, the worker suddenly stops processing events.
In the UI, the workflow status is Terminated, and on the server I can see this error:
temporal_1 | {"level":"error","ts":"2020-12-07T18:22:59.087Z","msg":"history size exceeds error limit.","service":"history","shard-id":1,"address":"172.18.0.3:7234","shard-item":"0xc0004a5580","component":"history-cache","wf-namespace-id":"b0908e9c-8fbe-49f2-88b5-18c06e448a1e","wf-id":"hello_world_workflowID","wf-run-id":"27e91634-bca7-4f40-81e2-c1450dc55c0a","wf-history-size":52466599,"wf-event-count":3628,"logging-call-at":"workflowExecutionContext.go:1207","stacktrace":"go.temporal.io/server/common/log/loggerimpl.(*loggerImpl).Error\n\t/temporal/common/log/loggerimpl/logger.go:138\ngo.temporal.io/server/service/history.(*workflowExecutionContextImpl).enforceSizeCheck\n\t/temporal/service/history/workflowExecutionContext.go:1207\ngo.temporal.io/server/service/history.(*workflowExecutionContextImpl).updateWorkflowExecutionAsActive\n\t/temporal/service/history/workflowExecutionContext.go:596\ngo.temporal.io/server/service/history.(*historyEngineImpl).updateWorkflowHelper\n\t/temporal/service/history/historyEngine.go:2438\ngo.temporal.io/server/service/history.(*historyEngineImpl).updateWorkflowExecutionWithAction\n\t/temporal/service/history/historyEngine.go:2392\ngo.temporal.io/server/service/history.(*historyEngineImpl).updateWorkflowExecution\n\t/temporal/service/history/historyEngine.go:2462\ngo.temporal.io/server/service/history.(*historyEngineImpl).RecordActivityTaskStarted\n\t/temporal/service/history/historyEngine.go:1308\ngo.temporal.io/server/service/history.(*Handler).RecordActivityTaskStarted\n\t/temporal/service/history/handler.go:333\ngo.temporal.io/server/api/historyservice/v1._HistoryService_RecordActivityTaskStarted_Handler.func1\n\t/temporal/api/historyservice/v1/service.pb.go:947\ngo.temporal.io/server/common/rpc.ServiceErrorInterceptor\n\t/temporal/common/rpc/grpc.go:100\ngo.temporal.io/server/api/historyservice/v1._HistoryService_RecordActivityTaskStarted_Handler\n\t/temporal/api/historyservice/v1/service.pb.go:949\ngoogle.golang.org/grpc.(*Server).processUnaryRPC\n\t/go/pkg/mod/google.golang.org/grpc@v1.33.2/server.go:1210\ngoogle.golang.org/grpc.(*Server).handleStream\n\t/go/pkg/mod/google.golang.org/grpc@v1.33.2/server.go:1533\ngoogle.golang.org/grpc.(*Server).serveStreams.func1.2\n\t/go/pkg/mod/google.golang.org/grpc@v1.33.2/server.go:871"}
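The numbers in that log are consistent with the payload size: the limit tripped at wf-history-size 52466599 (~50 MB) after 3628 events, and the activity input is persisted inside the ActivityTaskScheduled event. Some back-of-the-envelope arithmetic (the "~7 events per signal" count is my rough assumption, covering the signal event, the workflow task events, and the activity's Scheduled/Started/Completed events):

```go
package main

import "fmt"

func main() {
	historySize := 52466599 // wf-history-size from the server log, ~50 MB
	eventCount := 3628      // wf-event-count from the same log line

	// Average bytes per history event at the moment the limit tripped.
	fmt.Println(historySize / eventCount)

	// Rough model: ~7 events per signal, with the ~100 KB input payload
	// persisted once, in the ActivityTaskScheduled event.
	const payloadBytes = 100 * 1024
	fmt.Println(payloadBytes / 7) // expected average bytes per event
	fmt.Println(eventCount / 7)   // implied number of signals processed
}
```

The observed average (~14.5 KB/event) matches 100 KB spread over ~7 events, and 3628 events / 7 is right around the point in the 500-signal loop where the worker stopped.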
The logical solution is to stop passing the 100 KB argument to the activity. The issue is that in our use case we really do need to send the struct to the activity. Another approach would be to compress the argument, but if the event history keeps growing we will hit the limit again.
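One workaround pattern I'm considering is passing a reference instead of the struct itself: the workflow stores the big struct in an external store and hands the activity only a small key, so the event history records a few bytes per activity instead of 100 KB. A sketch, where blobStore is a hypothetical stand-in for whatever external store we'd actually use (S3, Redis, a database):

```go
package main

import (
	"fmt"
	"sync"
)

// blobStore is a hypothetical stand-in for an external store. The caller
// writes the big struct here and hands the activity only the key.
type blobStore struct {
	mu   sync.Mutex
	data map[string][]byte
}

func (s *blobStore) put(key string, val []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = val
}

func (s *blobStore) get(key string) []byte {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.data[key]
}

// upsertByRef stands in for the Upsert activity: it receives only the key
// and fetches the full payload itself, so only the key would end up in the
// workflow's event history.
func upsertByRef(store *blobStore, key string) error {
	payload := store.get(key)
	_ = payload // send to the external API here
	return nil
}

func main() {
	store := &blobStore{data: map[string][]byte{}}

	big := make([]byte, 100*1024) // the ~100 KB struct
	key := "upsert-payload-42"    // hypothetical key naming scheme
	store.put(key, big)

	// The activity argument is now len(key) bytes instead of 100 KB.
	fmt.Println(len(key))
	fmt.Println(upsertByRef(store, key) == nil)
}
```

The trade-off is an extra round trip to the store inside the activity, but the history grows by tens of bytes per signal instead of 100 KB.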
@maxim glad to send the sample if needed. (Can't attach here.)
Thanks again.
Pedro Almeida