How queries on completed workflows work

I’m querying a list of completed workflows that match certain search criteria to get their results. From my understanding, the worker has to pull and replay the workflow history to respond to a query. Is any optimisation done for completed workflows to avoid this?

I also believe I can store the result using memo, bypassing the need to query the result of each workflow after using ListWorkflow to get the list of executions I want. I’m exploring that as well; let me know if this is a good use case for memo.

I’ve also noticed that queries using workflowID and runID seem to ignore the runID and give me the result of the most recent workflow run for that workflowID.

@maxim just wondering if you have any thoughts?

Hi @Julio_Jones

From my understanding the worker has to pull and replay the workflow history to respond to a query.

Only if the worker does not have the workflow in cache.

I also believe I can store the result using memo,

It depends on the SDK. Some SDKs do not allow setting the memo at runtime, only when the workflow is scheduled.

If you only want the result, why not get the workflow result using the workflow client? For example, in Java it would be something like:

String result = client.newUntypedWorkflowStub(WORKFLOW_ID).getResult(String.class);

Which SDK do you use?

I’ve also noticed that queries using workflowID and runID seem to ignore the runID and give me the result of the most recent workflow run for that workflowID.

Could you share some sample code?

Thanks for getting back to me. I was trying to avoid fetching the result because I might have hundreds of workflow runs for a given workflowID, and I don’t want to make 100 getResult calls to Temporal Cloud.

This is the workflow code:

func QueryWorkflow(ctx workflow.Context) (QueryReturn, error) {
	resp := QueryReturn{}

	resp.QueryResult = "started"

	logger := workflow.GetLogger(ctx)
	logger.Info("QueryWorkflow started")
	// setup query handler for query type "state"
	err := workflow.SetQueryHandler(ctx, "state", func() (string, error) {
		return resp.QueryResult, nil
	})
	if err != nil {
		logger.Error("SetQueryHandler failed", "error", err)
		return resp, err
	}

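	resp.QueryResult = "waiting on timer"
	// Surface the timer error (for example, on cancellation) instead of discarding it.
	if err := workflow.NewTimer(ctx, time.Second*120).Get(ctx, nil); err != nil {
		return resp, err
	}
	logger.Info("Timer fired")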

	resp.QueryResult = "donezo"

	memo := map[string]interface{}{
		"queryResult": resp.QueryResult,
	}
	err = workflow.UpsertMemo(ctx, memo)
	if err != nil {
		return resp, err
	}

	return resp, nil
}

This is how I’m starting the workflow, and I’ve fired off a bunch of runs:

	workflowOptions := client.StartWorkflowOptions{
		ID:        "query_workflow",
		TaskQueue: "query",
	}

	we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, query.QueryWorkflow)
	if err != nil {
		log.Fatalln("Unable to execute workflow", err)
	}

This is how I’m fetching the workflow results:

func main() {
	ctx := context.Background()

	// The client is a heavyweight object that should be created once per process.
	c, err := client.Dial(client.Options{
		HostPort: client.DefaultHostPort,
	})
	if err != nil {
		log.Fatalln("Unable to create client", err)
	}
	defer c.Close()

	// Get list of workflows
	queryStr := "WorkflowId='query_workflow'"
	results, err := GetWorkflows(ctx, c, queryStr)
	if err != nil {
		log.Fatalln("Unable to get workflows", err)
	}

	// Get result of each workflow
	for _, execution := range results {
		exec := execution.Execution
		log.Println("WorkflowID", exec.GetWorkflowId(), "RunID", exec.GetRunId())

		// Query each workflow to get the result
		q, err := c.QueryWorkflow(ctx, exec.GetWorkflowId(), exec.GetRunId(), "state")
		if err != nil {
			log.Fatalln("Unable to query workflow", err)
		}
		var result string
		if err := q.Get(&result); err != nil {
			log.Fatalln("Unable to decode query result", err)
		}
		log.Println("Received query result", "Result", result)

		// Or we can get the result that's stored in the memo
		log.Println("Memo", execution.Memo)
	}
}

func GetWorkflows(ctx context.Context, c client.Client, query string) ([]*workflowpb.WorkflowExecutionInfo, error) {
	var nextPageToken []byte
	var workflowExecutions []*workflowpb.WorkflowExecutionInfo
	for {
		resp, err := c.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
			Query:         query,
			NextPageToken: nextPageToken,
		})
		if err != nil {
			return nil, err
		}
		workflowExecutions = append(workflowExecutions, resp.Executions...)
		nextPageToken = resp.NextPageToken
		if len(nextPageToken) == 0 {
			return workflowExecutions, nil
		}
	}
}

What I’m finding is this: if I change QueryResult to be "done" instead of "donezo", restart my worker, start a new workflow, and then run the main function above, the query returns "done" for all workflow runs instead of just the ones started after the workflow code was updated.

My apologies, I think this is actually my misunderstanding of how Temporal works. I think that when the worker responds to a query, it replays the history using the current workflow code, which has been modified, so all the queries return "done".
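A toy model of that behaviour, using only the standard library and no Temporal APIs, may make it easier to see. Event, workflowV1, workflowV2 and answerQuery are made-up names for illustration, and real replay is far more involved than this; the only point is that the query answer is computed by the code the worker runs now, not read from the history.

```go
package main

import "fmt"

// Event is a hypothetical, heavily simplified stand-in for a history event.
type Event string

// WorkflowFunc models workflow code: it walks the recorded events and
// returns the state that a "state" query would observe afterwards.
type WorkflowFunc func(history []Event) string

// workflowV1 models the original code, which ends in state "donezo".
func workflowV1(history []Event) string {
	state := "started"
	for _, e := range history {
		switch e {
		case "TimerStarted":
			state = "waiting on timer"
		case "TimerFired":
			state = "donezo"
		}
	}
	return state
}

// workflowV2 models the modified code, which ends in state "done".
func workflowV2(history []Event) string {
	state := "started"
	for _, e := range history {
		switch e {
		case "TimerStarted":
			state = "waiting on timer"
		case "TimerFired":
			state = "done"
		}
	}
	return state
}

// answerQuery replays the history through whatever code is currently
// deployed. The history records events, not the query state itself.
func answerQuery(current WorkflowFunc, history []Event) string {
	return current(history)
}

func main() {
	// A run that completed while workflowV1 was deployed.
	oldRun := []Event{"WorkflowStarted", "TimerStarted", "TimerFired"}

	fmt.Println(answerQuery(workflowV1, oldRun)) // answered before the code change
	fmt.Println(answerQuery(workflowV2, oldRun)) // answered after the code change
}
```

The same recorded run gives a different query answer once the deployed code changes, which matches what I observed.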

And just to double check, I should be fine setting the memo at runtime using the Go SDK, right?

The purpose of Memo is to include additional information in the “visibility record” that is returned as a result of list calls. The workflow query feature is not related to this, as it is not used for listing workflows.

@maxim But couldn’t I store workflow state in the memo, eliminating the need for individual getResult/query calls following a list call to get workflow state?

Edit: I guess the drawback of doing this is that I’d need to keep track of modifications made to the object stored in the memo, as deserialisation would fail if I change it.
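One possible way to soften that drawback is to wrap the stored value in a small versioned envelope, so that older records can still be read after the shape changes. This is only a sketch using plain encoding/json: memoEnvelope, Result and decodeResult are hypothetical names, the version numbers are invented, and how the bytes end up in the memo is left out entirely.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// memoEnvelope is a hypothetical wrapper that records which shape Data has.
type memoEnvelope struct {
	Version int             `json:"version"`
	Data    json.RawMessage `json:"data"`
}

// Result is the shape the reading code wants to work with today.
type Result struct {
	Status   string `json:"status"`
	Attempts int    `json:"attempts"`
}

// decodeResult reads any known envelope version and normalises it to Result.
func decodeResult(raw []byte) (Result, error) {
	var env memoEnvelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return Result{}, err
	}
	switch env.Version {
	case 1:
		// Version 1 (assumed): the payload was a bare status string.
		var status string
		if err := json.Unmarshal(env.Data, &status); err != nil {
			return Result{}, err
		}
		return Result{Status: status}, nil
	case 2:
		// Version 2 (assumed): the payload is the full Result object.
		var r Result
		if err := json.Unmarshal(env.Data, &r); err != nil {
			return Result{}, err
		}
		return r, nil
	default:
		return Result{}, fmt.Errorf("unsupported memo version %d", env.Version)
	}
}

func main() {
	records := []string{
		`{"version":1,"data":"donezo"}`,
		`{"version":2,"data":{"status":"done","attempts":3}}`,
	}
	for _, rec := range records {
		r, err := decodeResult([]byte(rec))
		if err != nil {
			fmt.Println("decode failed:", err)
			continue
		}
		fmt.Printf("%+v\n", r)
	}
}
```

The cost is that every shape ever written needs a case in the reader for as long as such records can still be listed.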

What is the actual business requirement? Do you want to query individual workflows about their current state? Or do you want to list workflows based on their current state?

@maxim We have a workflow that only runs one at a time per account; that limitation is enforced by using a consistent workflow ID per account. I want to get the result of completed workflows and the state of the in-flight workflow based on this workflow ID. It’s essentially to get the history of workflow results as well as the status of the current workflow for an account.

That’s why I was looking at storing the workflow result in the memo: I could then search workflow executions by workflowID and have their result in the memo instead of having to get the result of each workflow.

I see. Currently, updating the memo from the workflow code is not yet supported. Also don’t forget that Temporal doesn’t store completed workflows and their associated visibility records (returned by list) for long.

So, I recommend storing these results in an external DB by executing an activity at the end of each workflow.

@maxim Thanks, understood. We don’t have a DB, but now might be the time to look into it. By the way, I see that the doc comment for UpsertMemo in the Go SDK says this:

// UpsertMemo is used to add or update workflow memo.
// UpsertMemo will merge keys to the existing map in workflow. For example:
//
//	func MyWorkflow(ctx workflow.Context, input string) error {
//		memo1 := map[string]interface{}{
//			"Key1": 1,
//			"Key2": true,
//		}
//		workflow.UpsertMemo(ctx, memo1)
//
//		memo2 := map[string]interface{}{
//			"Key1": 2,
//			"Key3": "seattle",
//		}
//		workflow.UpsertMemo(ctx, memo2)
//	}
//
// The workflow memo will eventually be:
//
//	map[string]interface{}{
//		"Key1": 2,
//		"Key2": true,
//		"Key3": "seattle",
//	}

Doesn’t this imply that I can update the memo from the workflow? I wrote a simple test on a local cluster, and I seem to be able to update the memo.

I was wrong. The memo update was added since I last looked at this, so it is supported. I still would consider using an external DB if you need to keep these records beyond the workflow retention period.
