Parameters for Dynamic Workflows

Hey there! I have a use case where I’m trying to supply a parameter (a struct) to a dynamic workflow, i.e., one started by name with ExecuteWorkflow(ctx, opts, "WorkflowName", <args>).

There is an example here that shows how to start a workflow by name (as above), but without any arguments.

Essentially, I have something like this:

type (
    Args struct {
        Prop1 string
        Prop2 string
    }

    Result struct { ... }
)

func MyWorkflow(ctx workflow.Context, args Args) (*Result, error) {
    // implementation here
}

I want to be able to launch it (from a CLI in a different repo) with something like this:

func main() {
    c, err := ... // create a Temporal client

    c.ExecuteWorkflow(context.Background(), client.StartWorkflowOptions{
        ID:        "workflowID",
        TaskQueue: "work",
    }, "MyWorkflow", "JSON Representation of Args")
}

I’ve tried using converter.JSONPayloadConverter and just extracting the Data field, but no luck.

Has anyone figured out a way to do this?

I don’t think this is supported out of the box. The workaround is to create your own type (“MyJSON”, for example) and then create your own DataConverter for that type.

Hey @maxim thanks so much for the response!

Would you suggest I create a new wrapper type for all args and then update all of my existing workflows to use this new type as an argument?

No, I suggest you create a single type for all raw JSON arguments, something like a “RawJSON” type. It would produce an output Payload in exactly the same format as JSONPayloadConverter.
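
To make “exactly the same format” concrete, here is a minimal sketch of the Payload shape that JSONPayloadConverter emits; a raw-JSON converter just needs to produce the same shape with caller-supplied bytes in Data:

package main

import (
	"fmt"
	"log"

	"go.temporal.io/sdk/converter"
)

func main() {
	jc := converter.NewJSONPayloadConverter()

	// Serialize an ordinary struct the way the SDK would.
	payload, err := jc.ToPayload(struct{ Prop1, Prop2 string }{"a", "b"})
	if err != nil {
		log.Fatal(err)
	}

	// Metadata carries the encoding ("json/plain"); Data holds the JSON bytes.
	// A raw-JSON converter would emit this exact shape, but with
	// pre-encoded JSON supplied by the caller in Data.
	fmt.Printf("encoding=%s data=%s\n",
		payload.Metadata[converter.MetadataEncoding], payload.Data)
}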

Thank you so much for the pointers @maxim! I was able to get this working using the method you described.

In case anyone else runs into this, here’s what I ended up with:

package workflows

import (
	"go.temporal.io/api/common/v1"
	"go.temporal.io/sdk/converter"
)

type (
	// Arg represents a dynamic JSON argument. It is encoded as a JSON payload under the hood so that the Temporal
	// worker decodes it into the corresponding argument type for the workflow.
	//
	// It is up to the caller to ensure what's passed in can be decoded server-side.
	Arg struct {
		Json []byte // The raw JSON.
	}

	// payloadConverter is a custom payload converter for encoding Arg values.
	payloadConverter struct {
		*converter.JSONPayloadConverter
	}
)

// NewDataConverter returns a composite DataConverter that includes support for encoding Arg objects.
func NewDataConverter() converter.DataConverter {
	return converter.NewCompositeDataConverter(
		converter.NewNilPayloadConverter(),
		converter.NewByteSlicePayloadConverter(),
		converter.NewProtoJSONPayloadConverter(),
		converter.NewProtoPayloadConverter(),
		converter.NewJSONPayloadConverter(),
		&payloadConverter{converter.NewJSONPayloadConverter()},
	)
}

// ToPayload creates a Payload for the bytes within the Arg object.
//
// NB: This returns nil (and no error) when this converter should not be used.
func (p *payloadConverter) ToPayload(v any) (*common.Payload, error) {
	if v, ok := v.(Arg); ok {
		return &common.Payload{
			Metadata: map[string][]byte{
				converter.MetadataEncoding: []byte(p.Encoding()),
			},
			Data: v.Json,
		}, nil
	}

	return nil, nil
}

And then when creating the client:

c, err := client.Dial(client.Options{
	HostPort:      "localhost:7233",
	DataConverter: workflows.NewDataConverter(),
	...
})
...
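
And then, continuing with that client c, the CLI starts the workflow by name and passes the raw JSON through the Arg wrapper (a sketch; the JSON literal is illustrative):

run, err := c.ExecuteWorkflow(context.Background(), client.StartWorkflowOptions{
	ID:        "workflowID",
	TaskQueue: "work",
}, "MyWorkflow", workflows.Arg{Json: []byte(`{"Prop1":"a","Prop2":"b"}`)})
if err != nil {
	log.Fatal(err)
}
log.Println("started run:", run.GetRunID())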

I don’t understand how your code works. The converters are evaluated in the order of registration with the composite data converter. So your custom data converter is never called, as the JSONPayloadConverter would try to serialize the Arg and not give it to the next converter. I think you want to change the order of registration so that your custom converter precedes the JSONPayloadConverter. And I would not serialize any type other than Arg; just pass everything else to the next converter.
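
That is, something like this (an untested sketch of the reordering):

func NewDataConverter() converter.DataConverter {
	return converter.NewCompositeDataConverter(
		converter.NewNilPayloadConverter(),
		converter.NewByteSlicePayloadConverter(),
		converter.NewProtoJSONPayloadConverter(),
		converter.NewProtoPayloadConverter(),
		// The Arg converter now precedes the generic JSON converter, so it
		// gets the first chance at Arg values; it returns (nil, nil) for
		// everything else, which hands off to the JSON converter below.
		&payloadConverter{converter.NewJSONPayloadConverter()},
		converter.NewJSONPayloadConverter(),
	)
}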

That was my understanding as well. However, if I move the custom one anywhere above JSONPayloadConverter, I get { "Json": "<value>" } instead of the custom output.

I would look at this in the debugger, as this doesn’t make sense.
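
For anyone who hits this, one way to reproduce it outside a worker is to exercise the converter directly; a minimal sketch (the import path for the workflows package is hypothetical):

package main

import (
	"fmt"
	"log"

	"example.com/cli/workflows" // hypothetical import path for the package above
)

func main() {
	dc := workflows.NewDataConverter()

	// Serialize an Arg directly and inspect the resulting payload.
	payload, err := dc.ToPayload(workflows.Arg{Json: []byte(`{"Prop1":"a","Prop2":"b"}`)})
	if err != nil {
		log.Fatal(err)
	}

	// If the custom converter handled the value, Data holds the raw JSON;
	// if the generic JSONPayloadConverter won, Data holds {"Json":"..."} instead.
	fmt.Println(string(payload.GetData()))
}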