External library returning a go channel

Hello there !

Another day, another question (maybe @maxim will answer again :P): I have an external library that returns a go channel that returns to the application any new outside world events. Problem is that I can’t use this library in the workflow level because it is a go channel and not a workflow channel. I can’t also use an activity because I can’t return a go channel as it is not serializable.

How would you do it, knowing that you have to maintain the channel open to get new events ?

Thanks a lot !!

Hi @maxim, as the library that I’m using is listening to a websocket, I found this thread (which is a bit old now) that has the same problem : Modelling temporal workflows for websockets and event streaming

Is that still the solution today ? And what do you mean by “separate” ? I would have find it quite useful to encapsulate the websocket into a workflow or an activity to start it again automatically if it goes down, so an out-of-temporal process that send signals to my workflow would require control management (like the one mentioned, or to kill it if the main workflow is stopped).

Sorry to bother you especially with that, but as you were the one answering this topic, you may remember that :slight_smile:

Okay, so I discovered the AI button on the bottom right of the documentation page (I’m not a great observer apparently :dotted_line_face:) and tried to ask the same question. Here is the response:

Hello! I'd be happy to help you with your question about integrating an external library that returns a Go channel with Temporal workflows. This is indeed a tricky situation due to the constraints of Temporal workflows. Let's explore a possible solution based on the information available in the knowledge sources.
The key challenge here is that Temporal workflows cannot directly use Go channels due to determinism requirements. Instead, they must use Temporal's workflow.Channel Go SDK Channel.
Here's an approach you could consider:

    Create an Activity that interacts with your external library and listens to the Go channel it returns.
    Within this Activity, use the Temporal SDK to send signals to the workflow for each event received on the Go channel.
    In your workflow, use a workflow.GetSignalChannel to receive these signals.

Here's a high-level outline of how this might look:

// Activity
func ListenToExternalEvents(ctx context.Context) error {
    // Initialize your external library here
    externalChan := externalLibrary.GetEventChannel()

    // Use the activity's context to get the workflow ID
    info := activity.GetInfo(ctx)
    workflowID := info.WorkflowExecution.ID

    // Create a client to send signals
    c, err := client.NewClient(client.Options{})
    if err != nil {
        return err
    }

    for event := range externalChan {
        // Send each event as a signal to the workflow
        err := c.SignalWorkflow(ctx, workflowID, "", "externalEvent", event)
        if err != nil {
            // Handle error
        }
    }

    return nil
}

// Workflow
func YourWorkflow(ctx workflow.Context) error {
    // Create a channel to receive signals
    signalChan := workflow.GetSignalChannel(ctx, "externalEvent")

    // Start the activity to listen to external events
    future := workflow.ExecuteActivity(ctx, ListenToExternalEvents)

    for {
        selector := workflow.NewSelector(ctx)
        
        selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
            var event YourEventType
            c.Receive(ctx, &event)
            // Process the event
        })

        selector.Select(ctx)
    }
}

This approach allows you to maintain an open channel in the Activity while still adhering to Temporal's workflow constraints. The Activity acts as a bridge between the external library's Go channel and the workflow's signal channel.
A few important points to note:

    The Activity will run continuously, so you might want to consider using a long-running Activity pattern Temporal Docs: Long-Running Activities.
    Be mindful of the rate at which you're sending signals. If the external events come in very rapidly, you might need to implement some form of batching or rate limiting.
    Error handling and graceful shutdown of the Activity are important considerations that you'll need to implement based on your specific requirements.
    Remember that workflows have deterministic execution requirements, so all interactions with external systems should be done through Activities.

This solution allows you to maintain the open channel from your external library while still working within Temporal's workflow model. However, please note that the exact implementation details might vary depending on your specific use case and the nature of the events you're processing.

It seems like a convincing way of doing this (that respect the “all external interactions into an activity”), so I’ll try it and I’ll come back here to tell what went good and what went wrong :slight_smile:

We might one day provide direct integration with WebSockets. But for now, the advice from the post you linked is solid.

I confirm that it works flawlessly, so let’s do it that way. Thanks @maxim for taking the time to confirme that :slight_smile:

1 Like