Suggestions to increase worker throughput

I am evaluating Temporal for a large-scale cloud workflow and async/batch job engine, and I need some help tuning performance at various levels. The performance-related topics on this forum did not help much.

My goal is to scale the workers horizontally and vertically. I am using a slightly modified version of the money-transfer workflow from the examples to add load to the system and measure activity throughput. However, the total number of running activities never goes beyond the 35-45 range, even with several thousand workflows pending in the queue. I see the same throughput with two workers or ten workers. CPU usage on individual worker processes stays in the single digits most of the time. The Docker host (24 HT cores, 128GB memory) is generally at around 20% CPU usage with ~100GB of free memory. I am using MySQL for persistence.

Changing matching.numTaskqueueReadPartitions to 10 (from the default of 4) did not seem to help. The bottleneck seems to be somewhere else.

Can you please point me to other performance tuning options? Thanks in advance.

Is this the server host?

Could you provide the service configuration as well as the workflow logic sample?

Make sure the number of history shards is large enough (for your load test, try 512 to 2K):

Thanks.

Is this the server host?

Yes. This is the server that runs all the Docker containers.

Could you provide the service configuration as well as the workflow logic sample?

The code is essentially the same as the MoneyTransfer Workflow example.

Make sure the number of history shards is large enough (for your load test, try 512 to 2K):

Can I add the history shards setting to dynamic config like this?

persistence:
  numHistoryShards: 1000

This worked for me:

worker.Options{
    MaxConcurrentWorkflowTaskPollers: 40,
    MaxConcurrentActivityTaskPollers: 40,
}

I tried to increase workflow and activity poll thread count to 40, but I don’t think it is helping. Not sure if I got the config right. I’ve copied the worker setup code below.

Any help in making progress here is appreciated. It would also be good to have a tutorial page on general performance tuning options.

@Singleton
public class WorkerService {
    private final Logger logger;
    private final Configuration configuration;
    private final MetricRegistry metricRegistry;
    private final HdrBuilder hdrBuilder;

    @Inject
    WorkerService(Configuration configuration, MetricRegistry metricRegistry, HdrBuilder hdrBuilder) {
        this.configuration = configuration;
        this.metricRegistry = metricRegistry;
        this.hdrBuilder = hdrBuilder;
        this.logger = LoggerFactory.getLogger(WorkerService.class);
    }

    public void run() {
        // WorkflowServiceStubs is a gRPC stubs wrapper that talks to the local Docker instance of the Temporal server.
        String temporalHostPort = configuration.getTemporalHost() + ":" + configuration.getTemporalPort();
        logger.info("Worker connecting to temporal service: " + temporalHostPort);
        WorkflowServiceStubsOptions workflowServiceStubsOptions =
                WorkflowServiceStubsOptions.newBuilder(WorkflowServiceStubsOptions.getDefaultInstance()).setTarget(temporalHostPort).build();
        WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(workflowServiceStubsOptions);
        WorkflowClient client = WorkflowClient.newInstance(service);
        // Worker factory is used to create Workers that poll specific Task Queues.
        WorkerFactoryOptions workerFactoryOptions = WorkerFactoryOptions.newBuilder().setWorkflowHostLocalPollThreadCount(40).build();
        WorkerFactory factory = WorkerFactory.newInstance(client, workerFactoryOptions);
        WorkerOptions workerOptions = WorkerOptions.newBuilder().setWorkflowPollThreadCount(40).setActivityPollThreadCount(40).build();
        Worker worker = factory.newWorker(Shared.MONEY_TRANSFER_TASK_QUEUE, workerOptions);
        // A factory is used so each workflow execution gets a fresh implementation instance.
        worker.addWorkflowImplementationFactory(MoneyTransferWorkflow.class,
                () -> new MoneyTransferWorkflowImpl(metricRegistry, hdrBuilder));
        // Activities are stateless and thread safe so a shared instance is used.
        worker.registerActivitiesImplementations(new AccountActivityImpl(metricRegistry));
        // Start listening to the Task Queue.
        factory.start();
    }
}

You have to wipe out and recreate the database after changing the shard count. Just changing the config after the DB was initialized is not going to change the number of shards.
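
For anyone wondering why the shard count matters this much, here is a rough sketch of the idea, not the server's actual code. It assumes each workflow is assigned to a history shard by hashing its workflow ID modulo the shard count, and that work within a single shard is largely serialized. The CRC32 hash, the class name ShardDistribution, and the "money-transfer-N" workflow IDs are all placeholders I made up for illustration; the real server uses its own hash function.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class ShardDistribution {
    // Hypothetical stand-in for the workflow-to-shard mapping:
    // hash the workflow ID and take it modulo the shard count.
    public static int shardFor(String workflowId, int numShards) {
        CRC32 crc = new CRC32();
        crc.update(workflowId.getBytes(StandardCharsets.UTF_8));
        // getValue() is a non-negative long, so the remainder is in [0, numShards).
        return (int) (crc.getValue() % numShards);
    }

    // Counts how many synthetic workflow IDs land on each shard.
    public static int[] countPerShard(int workflows, int numShards) {
        int[] counts = new int[numShards];
        for (int i = 0; i < workflows; i++) {
            counts[shardFor("money-transfer-" + i, numShards)]++;
        }
        return counts;
    }

    // Returns the largest per-shard count, i.e. the busiest shard.
    public static int max(int[] values) {
        int best = 0;
        for (int v : values) {
            best = Math.max(best, v);
        }
        return best;
    }

    public static void main(String[] args) {
        int workflows = 10_000;
        for (int shards : new int[] {4, 1024}) {
            int[] counts = countPerShard(workflows, shards);
            System.out.println(shards + " shards: busiest shard holds "
                    + max(counts) + " of " + workflows + " workflows");
        }
    }
}
```

With only 4 shards, every shard has to carry at least a quarter of the pending workflows, so adding workers does not help once the shards themselves are the limit. With around a thousand shards the same load is spread much more thinly.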

Thanks. I had a quick call with Samar and it was very helpful. I am making progress. I will post my findings here soon.

With 1024 history shards, I can now vertically scale the various Temporal components in my first round of perf tests. The MySQL DB was the second bottleneck (after history shards). With better DB system specs, I am able to get almost six times the worker throughput I saw earlier with four history shards.

Thanks for everyone’s help here.
