IoT device management

Hi there,
I’m exploring Temporal and trying to understand whether it can fit my needs. Since the project looks amazing but huge, I’d love to hear your feedback.

My goal is to be able to define a workflow like the following:

  • start the workflow based on time (cron-like) OR based on kafka event OR grpc/http call
  • make an http/grpc call (at the moment, but it could also be a message sent over kafka) in order to send a command to an IoT device
  • wait for feedback (command ack): http/grpc call or kafka message
  • (optional) each device sends a report every X minutes; handle and process these events while waiting for the “command execution result” (next step). This is optional because these messages could perhaps bypass temporal; I guess it depends on the final design
  • wait for the command execution result: this could take minutes, hours or even days. The “feedback” should be received through a kafka message, but if that isn’t possible, it can be replaced with an http/grpc call.
  • proceed by making a new http/grpc call or producing a kafka message (new command for an IoT device)
  • wait again for the result
  • and so on…
  • each “wait action” must have a known timeout
  • if something fails, the workflow must be stopped and the error reported via http/grpc/kafka
  • each workflow must be created/deleted/managed programmatically: the users, through our UI, will be able to create/delete/update workflows by themselves

Since this is a brand new project, we currently have a very low message rate, but we must be ready for a fast-growing scenario.

Thank you very much

This scenario is a very good fit for the Temporal platform. Use activities to initiate the async actions and use signals to report their completion.

Hi @maxim, thanks for your fast feedback!

So if I got it right, my workflow should look like the following:

  1. Activity which sends the command to the IoT device (through grpc/http/kafka, this doesn’t matter)
  2. Activity which waits for the command ack message through signal
  3. Activity which waits for the command completed message through signal
  4. Activity which sends the next command to the next device
  5. and so on…

My workflow can be started with:

  • directly using the SDK
  • using the CronSchedule parameter
  • sending a signal through the SignalWithStart SDK API. In this scenario, must the first activity of my workflow be the one that waits for the signal?

There is still a part that I have no idea how to do; could you help me with that?
At the moment, almost every workflow consists of a sequence of points 1, 2 and 3. What changes is the command sent, the target device (id, type, …) and the wait time for the command completed message; basically, our users must be able to “put together” different groups of activities 1, 2 and 3.

Thanks!

  1. Activity which sends the command to the IoT device (through grpc/http/kafka, this doesn’t matter)

Correct.

  2. Activity which waits for the command ack message through signal

Signal is a way to deliver an event to a workflow directly. No activity is used to wait for signal. This is workflow only code.

  3. Activity which waits for the command completed message through signal

No activity involved. Workflow waits for the signal directly.

  4. Activity which sends the next command to the next device

I think in your case you want to have a workflow execution (instance) per device. So there is no “next device” for each workflow execution as it always talks to the same device.

Thanks for the clarification.

Just to understand and take the right path, why would you say this is a better approach? I thought of each “next command” as the next workflow step, which could simply be a new activity in the same workflow. If I put each “send command” activity in a separate workflow, I would have a lot of workflows with just one activity each, and I would start the “next workflow” from the previous one.

From the original post, I’ve got the impression that managing an IoT device is more complicated than executing a single activity.

Hi @maxim, you are right, managing an IoT device is more complicated than executing a single activity.
I’ve been thinking about what you said and came up with the following (possible) solution for composing temporal workflows dynamically. It should allow our customers to define their own workflow, where each “customer workflow” step is a temporal workflow. I don’t know if I’ve managed to explain myself clearly; it’s a tricky situation (and having to explain it in English doesn’t help…)

Here is my thought about how a temporal workflow can be made up:

  • Declare “wait for command ack signal” (s1 := workflow.NewSelector(ctx); s1.AddReceive(...))
  • Declare “wait for command completed signal” (s2 := workflow.NewSelector(ctx); s2.AddReceive(...))
  • Run activity to send command to device
  • Run activity to dispatch COMMAND_SENT kafka event (auditing and user-feedback purposes)
  • Wait for device command ack (s1.Select)
  • Run activity to dispatch DEVICE_COMMAND_ACK kafka event
  • Wait for device command completed (s2.Select)
  • Run activity to dispatch DEVICE_COMMAND_COMPLETED kafka event (auditing, user-feedback purposes, and to trigger the next workflow execution)

At this point, a service outside the temporal environment will consume the DEVICE_COMMAND_COMPLETED kafka event, check in our db whether the user has defined another “customer workflow” step, and start another temporal workflow (the next “customer workflow” step) if required.

What do you think? Is it conceptually correct? Is there a better way? Is the service outside the temporal environment really required?

Thanks

Let’s discuss your use case over Zoom. DM me your available times and email.

Hi there,

thanks to the very nice zoom call with @maxim a few weeks ago, I’ve been able to come up with a very elegant, reliable and powerful solution (in my opinion, of course). I’ll try to describe here what I’ve done; perhaps it can be useful to someone else.

To control a single device, I’ve created a workflow based on the timer example, with the difference that I don’t wait for an activity to complete, but for a specific signal. Basically, what I do is the following:

// send command
if err := workflow.ExecuteActivity(ctx, activities.SendCommandActivity, cmd, device).Get(ctx, nil); err != nil {
	return err // do not wait for an ack if the command was never sent
}
// end send command

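// wait for command ack (function)
// signalChan, timerCtx, telemetrySignalVal and the logger l are declared elsewhere (not shown)
var workingTelemetryTimeout error

s := workflow.NewSelector(ctx)
s.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
	c.Receive(ctx, &telemetrySignalVal)
})

// the device has two minutes to acknowledge the command
timerFuture := workflow.NewTimer(timerCtx, 2*time.Minute)
s.AddFuture(timerFuture, func(f workflow.Future) {
	workingTelemetryTimeout = errors.New("working telemetry timeout")
})

for {
	// blocks until either a telemetry signal arrives or the timer fires
	s.Select(ctx)
	if workingTelemetryTimeout != nil {
		l.Error("timeout waiting for working telemetry")
		return workingTelemetryTimeout
	}

	// telemetry with any other status is ignored and the loop keeps waiting
	if telemetrySignalVal.Telemetry.Status == types.TelemetryStatusWorking {
		l.Info("command successfully acknowledged")
		return nil
	}
}
// end wait for command ack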

// wait for command completed (function)
s := workflow.NewSelector(ctx)

s.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
	c.Receive(ctx, &telemetrySignalVal)
})

var stopWorkingCmdError error
// allow the command's own duration plus one minute of slack
timerFuture := workflow.NewTimer(timerCtx, time.Duration(cmd.Watering.Interval)*time.Second+time.Minute)
s.AddFuture(timerFuture, func(f workflow.Future) {
	// the command should have completed by now; try to force-stop it with a stop command
	err := workflow.ExecuteActivity(ctx, activities.SendCommandActivity, stopCmd, device).Get(ctx, nil)
	if err != nil {
		stopWorkingCmdError = err
	}
})

for {
	s.Select(ctx)
	if stopWorkingCmdError != nil {
		// reached only when the timer fired and the stop command itself failed
		l.Error("timeout waiting for stop telemetry, and the stop command failed")
		return stopWorkingCmdError
	}

	// NOTE: if the stop command succeeds, the loop keeps waiting for the
	// StopWatering telemetry with no further timeout
	if telemetrySignalVal.Telemetry.Status == types.TelemetryStatusStopWatering {
		l.Info("command successfully executed")
		return nil
	}
}
// end wait for command completed

N.B.: there are two workflow.NewSelector(ctx) calls because they live in different functions; the first selector goes out of scope before the second one is created.

In order to make users able to define their “customer workflow”, I’ve created a second workflow based on the dsl example with a simple difference: the activity has been replaced with the above workflow.

// was ActivityInvocation
func (a WorkflowInvocation) execute(ctx workflow.Context, bindings map[string]string) error {
	var result string
	err := workflow.ExecuteChildWorkflow(ctx, workflows.ExecuteCommand, a.DeviceID, a.Interval).Get(ctx, &result)
	if err != nil {
		return err
	}
	if a.Result != "" {
		bindings[a.Result] = result
	}
	return nil
}
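
For readers wondering what a stored “customer workflow” could look like before it reaches the DSL workflow, here is a minimal sketch that uses only the Go standard library. The JSON layout, the Step and Definition types and the ParseDefinition function are hypothetical names chosen for this example; only DeviceID, Interval and Result mirror the fields used in the snippet above, and the dsl example defines its own structure.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Step is one user-defined command (hypothetical layout, not taken from the dsl example).
type Step struct {
	DeviceID string `json:"deviceId"`
	Interval int    `json:"intervalSeconds"`  // how long the command should run
	Result   string `json:"result,omitempty"` // optional binding name for the step result
}

// Definition is an ordered list of steps, executed one after another.
type Definition struct {
	Steps []Step `json:"steps"`
}

// ParseDefinition decodes and validates a definition, so that bad input is
// rejected before any workflow is started.
func ParseDefinition(raw []byte) (Definition, error) {
	var def Definition
	if err := json.Unmarshal(raw, &def); err != nil {
		return Definition{}, fmt.Errorf("decode definition: %w", err)
	}
	if len(def.Steps) == 0 {
		return Definition{}, errors.New("definition has no steps")
	}
	for i, s := range def.Steps {
		if s.DeviceID == "" {
			return Definition{}, fmt.Errorf("step %d: missing deviceId", i)
		}
		if s.Interval <= 0 {
			return Definition{}, fmt.Errorf("step %d: intervalSeconds must be positive", i)
		}
	}
	return def, nil
}

func main() {
	// Example input as it might be saved by a UI (made-up device IDs).
	raw := []byte(`{"steps":[
		{"deviceId":"valve-1","intervalSeconds":600,"result":"r1"},
		{"deviceId":"valve-2","intervalSeconds":300}
	]}`)

	def, err := ParseDefinition(raw)
	if err != nil {
		fmt.Println("invalid definition:", err)
		return
	}
	for i, s := range def.Steps {
		fmt.Printf("step %d: device=%s interval=%ds\n", i, s.DeviceID, s.Interval)
	}
}
```

Validating the definition up front keeps malformed user input out of the workflow history; each validated step would then map to one child workflow invocation.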

I’d love to hear your opinion. In particular, I’d like to know whether there are any problems with using child workflows (performance, reliability, restore, …) and whether the number of signals can have an impact on performance (database storage?)

Thanks!

The design looks good. One possible improvement would be to create a reusable function or structure that encapsulates the behavior of waiting for a signal with a specific payload with timeout.
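
One way such a reusable function might look is sketched below with the Temporal Go SDK. AwaitSignal, ErrSignalTimeout and the package name are hypothetical names that are not part of the SDK; the sketch assumes Go 1.18+ for generics and that all telemetry arrives on a single named signal. It has not been run against the code in this thread.

```go
package workflows

import (
	"errors"
	"time"

	"go.temporal.io/sdk/workflow"
)

// ErrSignalTimeout is returned when no matching signal arrives in time.
var ErrSignalTimeout = errors.New("timed out waiting for matching signal")

// AwaitSignal blocks the workflow until a signal received on signalName
// satisfies match, or until timeout elapses. Non-matching signals are
// consumed and ignored. (Hypothetical helper, not provided by the SDK.)
func AwaitSignal[T any](
	ctx workflow.Context,
	signalName string,
	timeout time.Duration,
	match func(T) bool,
) (T, error) {
	var (
		zero     T
		last     T
		fired    bool
		timerErr error
	)

	// A cancellable child context lets the deferred call cancel the timer
	// once a matching signal has been found.
	timerCtx, cancelTimer := workflow.WithCancel(ctx)
	defer cancelTimer()

	sel := workflow.NewSelector(ctx)
	sel.AddReceive(workflow.GetSignalChannel(ctx, signalName),
		func(c workflow.ReceiveChannel, more bool) {
			var v T // decode into a fresh value so stale fields never leak
			c.Receive(ctx, &v)
			last = v
		})
	// One timer covers the whole wait; it is not restarted per signal.
	sel.AddFuture(workflow.NewTimer(timerCtx, timeout), func(f workflow.Future) {
		fired = true
		timerErr = f.Get(ctx, nil) // non-nil if the timer was cancelled
	})

	for {
		sel.Select(ctx)
		if fired {
			if timerErr != nil {
				return zero, timerErr
			}
			return zero, ErrSignalTimeout
		}
		if match(last) {
			return last, nil
		}
	}
}
```

With a helper like this, the ack wait above would reduce to a single call that passes the signal name, a two-minute timeout and a predicate checking for types.TelemetryStatusWorking, and the completion wait would differ only in its timeout and predicate.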

I’d like to know whether there are any problems with using child workflows (performance, reliability, restore, …) and whether the number of signals can have an impact on performance (database storage?)

Child workflows have their price, but I would start with a design that fits your use case and only try to optimize things that cause performance issues. A single workflow has a default limit of 100k events. So unless signals contain large payloads you can process quite a few of them before having issues.
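
To get a feel for what the 100k-event figure means for a device that reports telemetry periodically, a back-of-the-envelope calculation can help. The sketch below assumes roughly four history events per signal (the signal itself plus the scheduled, started and completed events of the workflow task that processes it) and ignores events produced by activities and timers. The per-signal cost and the function names are assumptions made for illustration, not numbers published by Temporal.

```go
package main

import "fmt"

// defaultHistoryLimit is the per-workflow event limit quoted in the reply above.
const defaultHistoryLimit = 100_000

// SignalsUntilLimit returns how many signals fit into the history budget,
// given an assumed number of history events per signal.
func SignalsUntilLimit(limit, eventsPerSignal int) int {
	if limit <= 0 || eventsPerSignal <= 0 {
		return 0
	}
	return limit / eventsPerSignal
}

// DaysUntilLimit estimates how many days a workflow can keep receiving one
// signal every reportEverySeconds before its history reaches the limit.
func DaysUntilLimit(limit, eventsPerSignal, reportEverySeconds int) float64 {
	if reportEverySeconds <= 0 {
		return 0
	}
	const secondsPerDay = 24 * 60 * 60
	signals := SignalsUntilLimit(limit, eventsPerSignal)
	return float64(signals) * float64(reportEverySeconds) / secondsPerDay
}

func main() {
	// Assumption: 4 history events per signal, one report every 5 minutes.
	fmt.Printf("signals before limit: %d\n", SignalsUntilLimit(defaultHistoryLimit, 4))
	fmt.Printf("days before limit: %.1f\n", DaysUntilLimit(defaultHistoryLimit, 4, 300))
}
```

Under these assumptions, a device reporting every five minutes would need roughly 87 days to reach the limit, which is consistent with the advice to optimize only once a real problem shows up.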