Signaling activity from workflow

My use case: I have two sets of activities, one that produces events in an external system (let’s say Kafka) and another that consumes those events. Both activities run simultaneously. Is there a mechanism for notifying the consumer activity that the producer activity has ended, so that once it has processed all remaining events from the external system it can exit instead of waiting for more events to arrive?

In the normal case (when workers are not failing), cancelling the consumer activity’s context will do the job. But is there a more reliable way of doing that?

I’m not sure I understand your design. What do you mean by activities that consume Kafka events? We recommend having an independent Kafka consumer that consumes events and sends them as signals to the workflows that care about them.

In my use case, Kafka is just a third-party system unrelated to Temporal (it could be anything else).

In pseudocode, my workflow looks like this:

consumer := workflow.ExecuteActivity(ctx, ConsumerActivity)

ch := workflow.NewChannel(ctx)
for i := 0; i < numPublishers; i++ {
   producer := workflow.ExecuteActivity(ctx, ProducerActivity)
   workflow.Go(ctx, func(gCtx workflow.Context) {
      var numRows int
      producer.Get(gCtx, &numRows)
      ch.Send(gCtx, numRows)
   })
}

var rowsProduced int
for i := 0; i < numPublishers; i++ {
   var res int
   ch.Receive(ctx, &res)
   rowsProduced += res
}

// signal consumer somehow that all producers ended working
// HOW TO DO THAT?

var rowsProcessed int
consumer.Get(ctx, &rowsProcessed)

return rowsProduced, rowsProcessed, nil

where the consumer activity looks like:

var numRows int
for {
   item, err := externalQueue.Pop()
   if err == io.EOF {
      if signalFromWorkflowReceived() { // pseudocode: check for the stop signal
         return numRows
      }
      time.Sleep(time.Second)
      continue
   }
   doProcessing(item)
   numRows++
}

Are you creating a Kafka stream per workflow instance? Then your design makes sense.

But if you have a single Kafka stream for all workflow instances, then you don’t need to run the consumer in an activity. It can just be an always-running consumer that signals the workflow upon receiving a message.

Yes, exactly: a single stream per workflow.

The only difficulty I have is signaling the consumer activity that it no longer needs to wait once it has consumed all the messages from the stream.

There is of course a workaround: add an activity after publishing that writes a dummy EOF message at the end of the stream. But I just wanted to know if there is a better option.

Interesting. Have you considered not creating such a stream and just signaling the workflow directly? A Kafka stream per workflow instance sounds like a very heavyweight solution.

Independently, if you decide to stick with your current design, just cancel the second activity. This is done by cancelling its context. The activity must heartbeat and have a reasonably small heartbeat timeout to be cancellable. Here is the relevant sample.

Sorry, I looked at the sample, and while it demonstrates how to implement an activity so that it is cancellable, it doesn’t demonstrate how to cancel one explicitly.

Use workflow.WithCancel to create a cancellable context.

workflow.WithCancel was my initial thought, but it wouldn’t work in the case where all the producers have finished their job and the consumer’s context was cancelled, but not all the messages from the stream have been consumed yet. Correct me if I’m wrong: if the worker crashes at that moment, the consuming activity will not be restarted, since its context was already cancelled.

Huge thanks @maxim for answering me. I find Temporal an extremely nice tool, which has removed a lot of burden from my service! I will for sure recommend it to my colleagues.

You are welcome!

Then you have to indicate through the stream that all messages have been delivered. It looks like the workflow code doesn’t know when to stop the consuming activity. IMHO you would be much better off just sending signals to the workflow, without Kafka.