I’m looking for recommendations on how to handle different task queues in Temporal for production versus testing scenarios.
In our case, we have a workflow that distributes documents to different channels, and each channel has its own task queue and workers. Currently, these task queues are defined as constants. However, from what I understand, when testing a workflow, we’re limited to listening on a single task queue.
Since workflows don’t support dependency injection, I’m unsure how to override the task queues during testing. One idea is to pass the task queues as workflow arguments, and default to the constants if they’re null—but that feels a bit messy.
Is there a better pattern for this? For example, is it possible to configure alternate task queues when creating the worker in tests?
The start-local dev test server is a full server and can support as many task queues as needed.
If a workflow sometimes needs to customize what it does, or what it passes to things like activities, based on who started it (in this case, tests), the workflow code itself has to support that customization.
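For what it's worth, the "pass overrides in the arguments, fall back to the constants" idea can stay fairly small. Below is a minimal sketch in plain C#, not tied to any Temporal API. The constant values, the `ResolveTaskQueue` helper, and the string channel keys are all made up for illustration; in the real workflow the overrides would presumably travel inside the workflow arguments and the keys would be `ConsignmentChannel` values.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical production defaults; in real code these would be the existing constants.
const string EmailTaskQueue = "consignment-email";
const string PrintTaskQueue = "consignment-print";

// Resolve the queue for a channel: use the override supplied by the starter
// when there is one, otherwise fall back to the production default.
string ResolveTaskQueue(IReadOnlyDictionary<string, string>? overrides, string channel)
{
    if (overrides is not null && overrides.TryGetValue(channel, out var queue))
    {
        return queue;
    }

    return channel switch
    {
        "Email" => EmailTaskQueue,
        "Print" => PrintTaskQueue,
        _ => throw new ArgumentOutOfRangeException(nameof(channel), channel, "Unknown channel"),
    };
}

// A production starter passes no overrides; a test passes unique queue names.
var testOverrides = new Dictionary<string, string>
{
    ["Email"] = $"task-queue-{Guid.NewGuid()}-Email",
};

Console.WriteLine(ResolveTaskQueue(null, "Email"));          // production default
Console.WriteLine(ResolveTaskQueue(testOverrides, "Email")); // test override
Console.WriteLine(ResolveTaskQueue(testOverrides, "Print")); // no override, so default
```

The lookup is a pure function of the workflow input, so it should be safe to call from workflow code when choosing the task queue for each activity.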
Sure. Most users create a task queue as Guid.NewGuid().ToString() in their tests (including the SDK tests themselves). Same with workflow IDs.
Thanks, I realized that I can create multiple workers with unique task queues in the same test environment. I was stuck on thinking that I needed to use one worker, don’t know why.
But I still had to work around it, since we use the same activity interface for all the “channels” we deliver documents through, and the workflow tries to match/send in each channel in a specific priority order.
Is this a reasonable way to run multiple workers in the test environment? Specifically, using a TaskCompletionSource to stop the workers once the main workflow is done:
[Fact]
public async Task SendThrowsRetryableError_ScheduleToClose_TimeoutElapsed_Should_Skip_To_Next_Channel()
{
    await using var env = await StartTestEnvironment();
    string mainTaskQueue = $"task-queue-{Guid.NewGuid()}";
    var taskQueueActivities = GetTaskQueueActivities(mainTaskQueue);
    var commonActivities = new MockCommonActivities([ConsignmentChannel.Email, ConsignmentChannel.Print]);
    var tcs = new TaskCompletionSource();

    var emailSender = new MockRetryableSenderActivities(true, true, ConsignmentChannel.Email);
    using var emailWorker = new TemporalWorker(env.Client, new TemporalWorkerOptions($"{mainTaskQueue}-{ConsignmentChannel.Email}")
        .AddAllActivities(emailSender));
    var emailWorkerRun = emailWorker.ExecuteAsync(async () => await tcs.Task);

    var printSender = new MockSenderActivities(ConsignmentChannel.Print);
    using var printWorker = new TemporalWorker(env.Client, new TemporalWorkerOptions($"{mainTaskQueue}-{ConsignmentChannel.Print}")
        .AddAllActivities(printSender));
    var printWorkerRun = printWorker.ExecuteAsync(async () => await tcs.Task);

    var settingsActivities = new MockDistributionSettingsActivities(new ConsignmentDistributionSettings
    {
        WaitForSignalOnNonRetryableError = false,
        RetryableErrorsTimeout = TimeSpan.FromSeconds(30)
    });

    using var workflowWorker = new TemporalWorker(env.Client, new TemporalWorkerOptions(mainTaskQueue)
        .AddWorkflow<ConsignmentDistributionWorkflow>()
        .AddAllActivities(commonActivities)
        .AddAllActivities(taskQueueActivities)
        .AddAllActivities(settingsActivities)
    );

    await workflowWorker.ExecuteAsync(async () =>
    {
        var handle = await env.Client.StartWorkflowAsync(
            (ConsignmentDistributionWorkflow wf) =>
                wf.Send(new ConsignmentWorkflowArguments(new ConsignmentKey(Guid.Empty))),
            new(id: $"wf-{Guid.NewGuid()}", taskQueue: workflowWorker.Options.TaskQueue!));
        var result = await handle.GetResultAsync();
        Assert.True(result.Delivered);
        Assert.Equal(ConsignmentChannel.Print, result.Channel);
        tcs.SetResult();
        await Task.WhenAll(emailWorkerRun, printWorkerRun);
    });
}
Yes, if you need to run multiple workers (which is cheap), this is a reasonable way. You can also pass a cancellation token to ExecuteAsync if that’s easier for you than a lambda + TaskCompletionSource. But it’s all basically the same thing.
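A rough sketch of the cancellation-token variant, in case it helps. The `StopWorkersAsync` helper is hypothetical, and the two `Task.Delay` calls are only stand-ins so the snippet runs without a Temporal server; in the test above they would be `emailWorker.ExecuteAsync(cts.Token)` and `printWorker.ExecuteAsync(cts.Token)`. It assumes a cancelled worker run ends with an `OperationCanceledException`, which as far as I know is how the .NET SDK behaves.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Cancel the shared token, then wait for every worker run to finish.
// Cancellation is the expected shutdown path here, so that exception is
// swallowed; any other failure from a worker still propagates.
async Task StopWorkersAsync(CancellationTokenSource source, params Task[] workerRuns)
{
    source.Cancel();
    try
    {
        await Task.WhenAll(workerRuns);
    }
    catch (OperationCanceledException)
    {
        // Expected: the runs were stopped on purpose.
    }
}

using var cts = new CancellationTokenSource();

// Stand-ins for emailWorker.ExecuteAsync(cts.Token) and printWorker.ExecuteAsync(cts.Token).
Task emailWorkerRun = Task.Delay(Timeout.Infinite, cts.Token);
Task printWorkerRun = Task.Delay(Timeout.Infinite, cts.Token);

// ... start the workflow and assert on its result here ...

await StopWorkersAsync(cts, emailWorkerRun, printWorkerRun);
Console.WriteLine(emailWorkerRun.IsCanceled && printWorkerRun.IsCanceled); // True
```

Calling the helper from a `finally` block would also stop the extra workers when an assertion fails, rather than only on the success path.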