Hello everyone, another topic on handling large payloads.
What I found when exploring this forum and Slack is:
- A codec server transparently handles large payloads, but may have a large impact on performance when replaying workflows (source);
- The recommended approach is to upload the payload to blob storage and pass keys around.
I chose to explore the latter option, and came to the following conclusion:
Let’s say I have these message definitions:
message Payload {
  google.protobuf.Any data = 1;
}

// Reference to a Payload stored in a blob storage
message PayloadRef {
  string id = 1;
  uint64 content_size_bytes = 2;
}

message Image {
  bytes data = 1; // Can be very large
  string description = 2;
}
Option 1: automatically handle payloads with a wrapper around Activities
Much like this post, the idea is to “intercept” an Activity input and output, serialize them, send them to the blob storage and return a key (or equivalent).
from typing import Type, TypeVar
from google.protobuf.message import Message
from temporalio import activity

T = TypeVar("T", bound=Message)

def to_payload(obj: Message) -> PayloadRef:
    # 1) Build `Payload` from `obj`
    # 2) Upload it, and return the received `PayloadRef`
    ...

def from_payload(payload_ref: PayloadRef, message_type: Type[T]) -> T:
    # Do the opposite of `to_payload`
    ...

@activity.defn
def first_activity() -> PayloadRef:
    image: Image = do_activity_stuff()
    return to_payload(image)

@activity.defn
def second_activity(payload_ref: PayloadRef) -> None:
    image = from_payload(payload_ref, Image)
Pros:
- Can be implemented generically, once and for all for any Activity, making the devs’ life easier
Cons:
- We lose all type safety. I really don’t like manipulating keys as inputs/outputs, because it is almost like manipulating void pointers, in the sense that keys act like pointers to data of arbitrary type.
- It is difficult to do any operation on Activity inputs/outputs in the workflow code (filtering, merging…)
Option 2: manually handle large payloads
To alleviate Option 1’s cons, I imagined the following:
message Image {
  bytes data = 1; // Can be very large
  string description = 2;
}

message ImageRef {
  PayloadRef payload_ref = 1;
  string description = 2;
}

message ActivityInput {
  repeated ImageRef images = 1;
}
With these, I can implement the following functions:
def as_ref(image: Image) -> ImageRef:
    # 1) Serialize `image` into bytes
    # 2) Upload these bytes, get a `PayloadRef` back
    # 3) Return an `ImageRef` with the received `PayloadRef`
    #    and `image`'s description
    ...

def deref(image_ref: ImageRef) -> Image:
    # Do the opposite of `as_ref`
    ...
Pros:
- “Type safety”
- Possibility of manipulating data outside of Activities (e.g.: reading an image’s description)
Cons:
- For each potential large object LargeObject, we have to:
  - define LargeObjectRef
  - implement as_ref(LargeObject) -> LargeObjectRef
  - implement deref(LargeObjectRef) -> LargeObject.
Conclusion
The cons of each option are hard to ignore.
I prefer Option 2, but long-term maintenance is a bit frightening. It could be alleviated with code gen for as_ref and deref though.
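As a rough idea of what such code gen could look like, here is a small sketch that emits the `...Ref` message definition and the Python source of the two helpers from a message name and its small fields. Everything here is hypothetical: the generator function names, the `<name>_as_ref` / `<name>_deref` naming, and the `upload` / `download` helpers that the generated code expects to exist. The generated code is only assumed to target protobuf's `SerializeToString` / `FromString`; I have not run it against real generated classes.

```python
from typing import Dict


def generate_ref_message(name: str, small_fields: Dict[str, str]) -> str:
    """Emit the protobuf definition of `<name>Ref` (field name -> proto type)."""
    lines = [f"message {name}Ref {{", "  PayloadRef payload_ref = 1;"]
    # Field 1 is the reference; the small fields are numbered from 2.
    for number, (field_name, field_type) in enumerate(small_fields.items(), start=2):
        lines.append(f"  {field_type} {field_name} = {number};")
    lines.append("}")
    return "\n".join(lines)


def generate_ref_helpers(name: str, small_fields: Dict[str, str]) -> str:
    """Emit Python source for the as_ref/deref pair specialised to `<name>`."""
    prefix = name.lower()
    copied = "".join(f", {f}=obj.{f}" for f in small_fields)
    return (
        f"def {prefix}_as_ref(obj: {name}) -> {name}Ref:\n"
        f"    # `upload` is a hypothetical helper: bytes -> PayloadRef\n"
        f"    payload_ref = upload(obj.SerializeToString())\n"
        f"    return {name}Ref(payload_ref=payload_ref{copied})\n"
        f"\n\n"
        f"def {prefix}_deref(ref: {name}Ref) -> {name}:\n"
        f"    # `download` is a hypothetical helper: PayloadRef -> bytes\n"
        f"    return {name}.FromString(download(ref.payload_ref))\n"
    )


print(generate_ref_message("Image", {"description": "string"}))
print(generate_ref_helpers("Image", {"description": "string"}))
```

For `Image` with a single `description` field, the first call reproduces the `ImageRef` definition from Option 2.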
What are your thoughts? What solutions have been found out there? Thank you very much