Using a channel from the Temporal library

I am trying to send items into a channel but I am getting an error. Does anyone have an idea what is going wrong here?

inChan := workflow.NewChannel(ctx)
for _, item := range items {
	inChan.Send(ctx, item)
}
inChan.Close()

Error: Potential deadlock detected: workflow goroutine "root" didn't yield for over a second. StackTrace: process event for TASK_QUEUE_cadence_lib_dsl [panic]:

go.temporal.io/sdk/internal.(*coroutineState).call(0xc000032a50)
/Users/bgao/go/pkg/mod/go.temporal.io/sdk@v1.5.0/internal/internal_workflow.go:881 +0x379
go.temporal.io/sdk/internal.(*dispatcherImpl).ExecuteUntilAllBlocked(0xc0000329b0, 0x0, 0x0)
/Users/bgao/go/pkg/mod/go.temporal.io/sdk@v1.5.0/internal/internal_workflow.go:966 +0x535

Thanks
Bo

What are other workflow goroutines doing? What is the type of items?

Thanks for the response! I do not have any other goroutines running yet, as this is the first step after the input. I was hoping to use a channel to hold the input items and then spin up multiple goroutines to consume the channel, but the channel send gives me errors. The type of item is interface{}, a string in this case.

I copy pasted your code into a workflow and it worked as expected. The workflow is blocked (which is visible through stack trace view in the UI) on Send as the channel is not buffered.

func Workflow(ctx workflow.Context, name string) (string, error) {
	items := []string {"aa", "bb", "cc"}
	inChan := workflow.NewChannel(ctx)
	for _, item := range items {
		inChan.Send(ctx, item)
	}
	inChan.Close()
	return "foo", nil
}

What is the type of items in your case?
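
For comparison, here is a minimal sketch in plain Go, using native channels rather than workflow.Channel, of why the Send above blocks. A send on an unbuffered channel cannot complete until a receiver is ready, while a buffered channel accepts sends up to its capacity. The helper trySend is hypothetical and exists only for this illustration; it is not part of the Temporal SDK.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
// Hypothetical helper, for illustration only.
func trySend(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // the send would have blocked
	}
}

func main() {
	unbuffered := make(chan string)
	buffered := make(chan string, 3)

	// No receiver is waiting, so the unbuffered send cannot proceed.
	fmt.Println(trySend(unbuffered, "aa")) // false

	// The buffered channel accepts sends until its buffer is full.
	for _, item := range []string{"aa", "bb", "cc"} {
		fmt.Println(trySend(buffered, item)) // true each time
	}
	fmt.Println(trySend(buffered, "dd")) // false: the buffer is full
}
```

In the workflow above nothing ever receives from inChan, so the first Send on the unbuffered channel never returns.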

Maybe it is easier to paste the code snippet here. inChan is working as expected now; the debugger shows its buffer field as:
<interface {}> (length: 2, cap: 2)
[0]:<interface {}(string)>)
data:"hello"

I wonder why inChan.Receive keeps erroring out with "illegal access from outside of workflow context". I guess the workflow just keeps retrying and finally ends with: Potential deadlock detected: workflow goroutine "root" didn't yield for over a second.

inChan := workflow.NewBufferedChannel(ctx, len(items))
outChan := workflow.NewBufferedChannel(ctx, len(items))
// insert input
for _, item := range items {
	inChan.Send(ctx, item)
}
inChan.Close()

// spin up workers to consume input channel
for x := 0; x < *s.MaxConcurrency; x++ {
	childCtx, cancelHandler := workflow.WithCancel(ctx)
	workflow.Go(ctx, func(ctx workflow.Context) {
		var inVal interface{}
		for inChan.Receive(ctx, inVal) {
			if inVal == nil {
				outChan.Send(ctx, fmt.Errorf("Input item can not be nil"))
			} else {
				selector := workflow.NewSelector(ctx)
				future, settable := workflow.NewFuture(ctx)
				selector.AddFuture(future, func(f workflow.Future) {
					var r interface{}
					err := f.Get(ctx, &r)
					if err != nil {
						cancelHandler()
						outChan.Send(ctx, err)
					} else {
						outChan.Send(ctx, r)
					}
				})
				output, _, err := s.Iterator.Execute(ctx, *s, inVal)
				settable.Set(output, err)
				selector.Select(childCtx)
			}
		}
	})
}

var respVal interface{}
for outChan.Receive(ctx, respVal) {
	resp = append(resp, respVal)
}

One issue I see with this code is that it uses contexts across goroutines which is not allowed. For example the childCtx is used inside a goroutine that is not the same as the one that created it.

Only context (or its derivative) that is passed to the goroutine function as an argument can be used inside that goroutine.

It does not seem to be context-specific. I have changed the code to use a single ctx and I am still getting the same "root didn't yield" error. Do you have an example of a concurrency pattern like this in Go (The Go Playground)?

	inChan := workflow.NewBufferedChannel(ctx, len(items))
	outChan := workflow.NewBufferedChannel(ctx, len(items))
	// insert input
	for _, item := range items {
		inChan.Send(ctx, item)
	}
	inChan.Close()

	// spin up workers to consume input channel
	selector := workflow.NewSelector(ctx)
	for x := 0; x < *s.MaxConcurrency; x++ {
		workflow.Go(ctx, func(ctx workflow.Context) {
			var inVal interface{}
			for inChan.Receive(ctx, &inVal) {
				if inVal == nil {
					outChan.Send(ctx, fmt.Errorf("Input item can not be nil"))
				} else {
					//create future for each channel item
					future, settable := workflow.NewFuture(ctx)
					// register callback for select(ctx)
					selector.AddFuture(future, func(f workflow.Future) {
						var r interface{}
						err := f.Get(ctx, &r)
						if err != nil {
							outChan.Send(ctx, err)
						} else {
							outChan.Send(ctx, r)
						}
					})
					//mimic future ready by setting settable
					output := inVal
					settable.Set(output, nil)
				}
			}

		})
	}
	//block for each item
	for i := 0; i < len(items); i++ {
		selector.Select(ctx)
	}

I believe Selector was never intended to be used from multiple goroutines either.

Now I have removed the selector, but I am getting: Error unknown command CommandType: Timer, ID: 5, possible causes are nondeterministic workflow definition code or incompatible change in the workflow definition. Is it possible to spin up multiple workers that consume items from a channel using the Temporal API?

	inChan := workflow.NewNamedBufferedChannel(ctx, "in", len(items))
	outChan := workflow.NewNamedBufferedChannel(ctx, "out", len(items))
	// insert input
	for _, item := range items {
		inChan.Send(ctx, item)
	}
	// spin up workers to consume input channel
	for x := 0; x < *s.MaxConcurrency; x++ {
		workflow.Go(ctx, func(ctx workflow.Context) {
			var inVal interface{}
			for inChan.Receive(ctx, &inVal) {
				if inVal == nil {
					outChan.Send(ctx, fmt.Errorf("Input item can not be nil"))
				} else {
					output, _, err := s.Iterator.Execute(ctx, *s, inVal)
					if err != nil {
						outChan.Send(ctx, err)
					} else {
						outChan.Send(ctx, output)
					}
				}
			}

		})
	}

	var respVal interface{}
	for outChan.Receive(ctx, &respVal) {
		resp = append(resp, respVal)
	}

It looks like the code is not deterministic. Have you checked that none of the constraints is violated?

  • What is the type of items?
  • What happens inside s.Iterator.Execute?

I copied your code and set items to be of type []string and replaced s.Iterator.Execute(ctx, *s, inVal) with an activity invocation. Code runs without any problems.

Interesting. Now I have commented out the s.Iterator.Execute call and replaced it with outChan.Send(ctx, inVal), and the workflow times out. From the debugger I can see that outChan.Send does not change outChan's buffer size. In the test output I see various goroutine errors.

goroutine 83 [running]:
testing.(*M).startAlarm.func1()
/usr/local/Cellar/go/1.16.3/libexec/src/testing/testing.go:1700 +0xe5
created by time.goFunc
/usr/local/Cellar/go/1.16.3/libexec/src/time/sleep.go:180 +0x45

goroutine 1 [chan receive]:
testing.(*T).Run(0xc000102d80, 0x1b6b573, 0x11, 0x1bacda8, 0x10945c6)
/usr/local/Cellar/go/1.16.3/libexec/src/testing/testing.go:1239 +0x2da
testing.runTests.func1(0xc000102c00)
/usr/local/Cellar/go/1.16.3/libexec/src/testing/testing.go:1511 +0x78
testing.tRunner(0xc000102c00, 0xc0003b3de0)
/usr/local/Cellar/go/1.16.3/libexec/src/testing/testing.go:1193 +0xef
testing.runTests(0xc00032a8a0, 0x21eed70, 0x1, 0x1, 0xc015b34ce2237c98, 0x6fc544772, 0x2208000, 0x1b683de)
/usr/local/Cellar/go/1.16.3/libexec/src/testing/testing.go:1509 +0x2fe
testing.(*M).Run(0xc00018c480, 0x0)
/usr/local/Cellar/go/1.16.3/libexec/src/testing/testing.go:1417 +0x1eb
main.main()
_testmain.go:43 +0x138

goroutine 20 [select]:
google.golang.org/grpc/internal/transport.(*Stream).waitOnHeader(0xc000624300)
/Users/bgao/go/pkg/mod/google.golang.org/grpc@v1.36.1/internal/transport/transport.go:321 +0x99
google.golang.org/grpc/internal/transport.(*Stream).RecvCompress(...)
/Users/bgao/go/pkg/mod/google.golang.org/grpc@v1.36.1/internal/transport/transport.go:336
google.golang.org/grpc.(*csAttempt).recvMsg(0xc00018de80, 0x1b27240, 0xc000732640, 0x0, 0x0, 0x0)
/Users/bgao/go/pkg/mod/google.golang.org/grpc@v1.36.1/stream.go:915 +0x731
google.golang.org/grpc.(*clientStream).RecvMsg.func1(0xc00018de80, 0xc0006320d0, 0xcf)
/Users/bgao/go/pkg/mod/google.golang.org/grpc@v1.36.1/stream.go:780 +0x46
google.golang.org/grpc.(*clientStream).withRetry(0xc00062ca20, 0xc000194ff8, 0xc000194fc8, 0xc00071e1d8, 0x2236b00)
/Users/bgao/go/pkg/mod/google.golang.org/grpc@v1.36.1/stream.go:638 +0x9f
google.golang.org/grpc.(*clientStream).RecvMsg(0xc00062ca20, 0x1b27240, 0xc000732640, 0x0, 0x0)
/Users/bgao/go/pkg/mod/google.golang.org/grpc@v1.36.1/stream.go:779 +0x105
google.golang.org/grpc.invoke(0x1c8a468, 0xc00072ad20, 0x1b9e0b2, 0x4c, 0x1b33c00, 0xc0000a2910, 0x1b27240, 0xc000732640, 0xc0003cf180, 0x0, ...)
/Users/bgao/go/pkg/mod/google.golang.org/grpc@v1.36.1/call.go:73 +0x142
reflect.Value.call(0xc0003f4ae0, 0xc00012eef8, 0x13, 0x1b5feda, 0x4, 0xc000080e30, 0x1, 0x1, 0xc000080cf8, 0x100d50a, ...)
/usr/local/Cellar/go/1.16.3/libexec/src/reflect/value.go:476 +0x8e7
reflect.Value.Call(0xc0003f4ae0, 0xc00012eef8, 0x13, 0xc000080e30, 0x1, 0x1, 0x1df9461, 0x3a, 0x438)
/usr/local/Cellar/go/1.16.3/libexec/src/reflect/value.go:337 +0xb9
github.com/stretchr/testify/suite.Run.func1(0xc000103080)
/Users/bgao/go/pkg/mod/github.com/stretchr/testify@v1.6.1/suite/suite.go:158 +0x379
testing.tRunner(0xc000103080, 0xc000164bd0)
/usr/local/Cellar/go/1.16.3/libexec/src/testing/testing.go:1193 +0xef
created by testing.(*T).Run
/usr/local/Cellar/go/1.16.3/libexec/src/testing/testing.go:1238 +0x2b3

goroutine 34 [IO wait]:
internal/poll.runtime_pollWait(0x24f4660, 0x72, 0xffffffffffffffff)
/usr/local/Cellar/go/1.16.3/libexec/src/runtime/netpoll.go:222 +0x55
internal/poll.(*pollDesc).wait(0xc0003e3c98, 0x72, 0x8000, 0x8000, 0xffffffffffffffff)
/usr/local/Cellar/go/1.16.3/libexec/src/internal/poll/fd_poll_runtime.go:87 +0x45
internal/poll.(*pollDesc).waitRead(...)
/usr/local/Cellar/go/1.16.3/libexec/src/internal/poll/fd_poll_runtime.go:92
internal/poll.(*FD).Read(0xc0003e3c80, 0xc00048a000, 0x8000, 0x8000, 0x0, 0x0, 0x0)
/usr/local/Cellar/go/1.16.3/libexec/src/internal/poll/fd_unix.go:166 +0x1d5
net.(*netFD).Read(0xc0003e3c80, 0xc00048a000, 0x8000, 0x8000, 0x16ad4de, 0x800000601, 0x0)
/usr/local/Cellar/go/1.16.3/libexec/src/net/fd_posix.go:55 +0x4f
net.(*conn).Read(0xc000486000, 0xc00048a000, 0x8000, 0x8000, 0x0, 0x0, 0x0)
/usr/local/Cellar/go/1.16.3/libexec/src/net/net.go:183 +0x91
bufio.(*Reader).Read(0xc000482060, 0xc0004a4038, 0x9, 0x9, 0x14, 0x24373c8, 0xc0000b9de0)
/usr/local/Cellar/go/1.16.3/libexec/src/bufio/bufio.go:227 +0x222
io.ReadAtLeast(0x1c6ed20, 0xc000482060, 0xc0004a4038, 0x9, 0x9, 0x9, 0x18, 0xc000882cd8, 0xc000a90000)
/usr/local/Cellar/go/1.16.3/libexec/src/io/io.go:328 +0x87
io.ReadFull(...)
/usr/local/Cellar/go/1.16.3/libexec/src/io/io.go:347
golang.org/x/net/http2.readFrameHeader(0xc0004a4038, 0x9, 0x9, 0x1c6ed20, 0xc000482060, 0x0, 0xc000000000, 0xc000882c01, 0xc0000b9e60)
/Users/bgao/go/pkg/mod/golang.org/x/net@v0.0.0-20201224014010-6772e930b67b/http2/frame.go:237 +0x89
golang.org/x/net/http2.(*Framer).ReadFrame(0xc0004a4000, 0xc0008980f0, 0xc0008980f0, 0x0, 0x0)
/Users/bgao/go/pkg/mod/golang.org/x/net@v0.0.0-20201224014010-6772e930b67b/http2/frame.go:492 +0xa5
google.golang.org/grpc/internal/transport.(*http2Client).reader(0xc0004ac000)
/Users/bgao/go/pkg/mod/google.golang.org/grpc@v1.36.1/internal/transport/http2_client.go:1330 +0x185
created by google.golang.org/grpc/internal/transport.newHTTP2Client
/Users/bgao/go/pkg/mod/google.golang.org/grpc@v1.36.1/internal/transport/http2_client.go:345 +0x1011

goroutine 35 [select]:
google.golang.org/grpc/internal/transport.(*controlBuffer).get(0xc0004a2050, 0x1, 0x0, 0x0, 0x0, 0x0)
/Users/bgao/go/pkg/mod/google.golang.org/grpc@v1.36.1/internal/transport/controlbuf.go:395 +0xff
google.golang.org/grpc/internal/transport.(*loopyWriter).run(0xc000482120, 0x0, 0x0)
/Users/bgao/go/pkg/mod/google.golang.org/grpc@v1.36.1/internal/transport/controlbuf.go:515 +0x1dd
google.golang.org/grpc/internal/transport.newHTTP2Client.func3(0xc0004ac000)
/Users/bgao/go/pkg/mod/google.golang.org/grpc@v1.36.1/internal/transport/http2_client.go:391 +0x7b
created by google.golang.org/grpc/internal/transport.newHTTP2Client
/Users/bgao/go/pkg/mod/google.golang.org/grpc@v1.36.1/internal/transport/http2_client.go:389 +0x11e7

exit status 2

Good. Try getting the workflow stack dump using the Temporal UI or CLI while the workflow is running. Consider increasing the workflow timeout if it closes too quickly.

My guess is that outChan is not closed which causes the loop at the end of the workflow to never exit.

Yes, that was the issue. Once I updated the code to close the channel after all items are received, the workflow succeeds. Thank you!
