hey team I’m having a problem I could use a extra pair of eyes cause it’s killing me… it’s related to long living workflows which is create using a “workflow” factory…
The problem… I can create multiple instance of the workflow without problem… the problem is, the state of all these workflows is going to be in sync with the last created instead of having independent states
type Workflow struct {
state map[string]interface{}
}
func (w *Workflow) Exec(ctx context.Context, arg1, arg2 string) (interface{}, error) {
/* Exec logic and save eventually to state */
state["arg1"] = arg1
state["arg2"] = arg2
return w, nil
}
func (w *Workflow) GetState() map[string]interface{} {
return w.state
}
func (w *Workflow) SendSignal(payload Payload) map[string]interface{} {
/* */
}
type MyWorkflowFactory struct {
workflow Workflow
client client.Client
}
func (mw MyWorkflowFactory) getState() interface{} {
return mw.workflow.GetState()
}
func (mw MyWorkflowFactory) sendSignal(payload Payload) {
return mw.workflow.SendSignal(payload)
}
func (mw MyWorkflowFactory) init(wkr worker.Worker) client.Client {
var err error
mw.client, err = client.Dial(/* dial */)
if err != nil {
return nil, err
}
wkr.RegisterWorkflowWithOptions(mw.exec, workflow.RegisterOptions{
Name: "SampleWorkflow",
DisableAlreadyRegisteredCheck: false,
})
return mw.client, nil
}
func (mw MyWorkflowFactory) exec(ctx workflow.Context, arg1, arg2 string) (interface{}, error) {
getStateFn := func() error {
if err := workflow.SetQueryHandler(ctx, ENGINE_STATE, func(input []byte) (interface{}, error) {
return mw.getState(), nil
}); err != nil {
return err
}
return nil
}
sendSignalFn := func() error {
var signal Payload
signalChan := workflow.GetSignalChannel(ctx, SIGNAL_CHANNEL)
selector := workflow.NewSelector(ctx)
selector.AddReceive(signalChan, func(channel workflow.ReceiveChannel, more bool) {
channel.Receive(ctx, &signal)
})
for {
selector.Select(ctx)
mw.sendSignal(&signal)
}
}
// Register getStateFn
if err := getStateFn(); err != nil {
return nil, err
}
c := context.Background()
r, err := mw.workflow.Execute(c, arg1, arg2)
if err != nil {
return nil, err
}
// Block the workflow and wait for signals
if err := sendSignalFn(); err != nil {
return nil, err
}
}
func (mw MyWorkflowFactory) StartWorkflow(ctx context.Context, arg1, arg2 string) (string, error) {
id := GenerateID()
workflowOptions := client.StartWorkflowOptions{
ID: id,
TaskQueue: "THE_QUEUE",
}
l, err := mw.client.ExecuteWorkflow(ctx, workflowOptions, "SampleWorkflow", arg1, arg2); if err != nil {
return "", err
}
return l.GetRunID(), nil
}
func NewFactory(workflow Workflow) *MyWorkflowFactory {
return &MyWorkflowFactory{
workflow: workflow,
}
}
func main() {
w := Workflow{
state: map[string]interface{}
}
factory := NewFactory(w)
c := factory.init()
defer c.Close()
ctx := context.Background()
id1, _ := factory.StartWorkflow(c, "foo", "bar")
id2, _ := factory.StartWorkflow(c, "jaz", "zaz")
}
Assume a few stuff… the Exec Logic of Workflow basically means receive signals and update the state base on the payload… it can add anything to the state map.
At this point… i can create 2 instances of the workflow as the example, the problem is that the two workflows state will look like “jaz”, “zaz”… or the last one created… so it’s shares the state for all the instances.
Each workflow instance has it’s own id and runid…
Does it ring any bells? I think it’s cause the way I’m blocking the workflow using sendSignalFn, i think it might now be working the way I think… in fact, I think somehow I’m sharing the context between all the workflows.
Any help would be appreciated.