Adding a codec without breaking determinism of existing long-running workflows

Hi guys

Is there any way to add a codec to the DataConverter without breaking determinism of existing long-running workflows (that weren’t using the codec for data conversion)?

My codec is identical to the temporal example: https://github.com/temporalio/samples-python/blob/main/encryption/codec.py

And this is the code for the data converter

class PydanticPayloadConverter(CompositePayloadConverter):
    """Payload converter that replaces Temporal JSON conversion with Pydantic
    JSON conversion.
    """

    def __init__(self) -> None:
        super().__init__(
            *(
                c
                if not isinstance(c, JSONPlainPayloadConverter)
                else PydanticJSONPayloadConverter()
                for c in DefaultPayloadConverter.default_encoding_payload_converters
            )
        )


pydantic_data_converter = DataConverter(
    payload_converter_class=PydanticPayloadConverter, payload_codec=EncryptionCodec()
)
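
For context, the converter (with the codec) only takes effect where it is passed to the client that the worker and replayer use. A minimal sketch of that wiring, with the address, namespace, and module name as placeholders:

from temporalio.client import Client

from data_converter import pydantic_data_converter  # module name illustrative


async def connect() -> Client:
    # Address and namespace are placeholders for this sketch.
    return await Client.connect(
        "localhost:7233",
        namespace="default",
        data_converter=pydantic_data_converter,
    )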

When I add the codec and run the replayer for existing workflows that have not used the codec, it fails with the error below.

When I run the replayer with a workflow that has used the codec, it works perfectly.

This is the error

E RuntimeError: 4: Workflow activation completion failed: Failure { failure: Some(Failure { message: "Failed decoding arguments", source: "", stack_trace: " File "/usr/local/lib/python3.11/site-packages/temporalio/worker/_workflow_instance.py", line 301, in activate\n self._apply(job)\n\n File "/usr/local/lib/python3.11/site-packages/temporalio/worker/_workflow_instance.py", line 373, in _apply\n self._apply_resolve_activity(job.resolve_activity)\n\n File "/usr/local/lib/python3.11/site-packages/temporalio/worker/_workflow_instance.py", line 490, in _apply_resolve_activity\n ret_vals = self._convert_payloads(\n ^^^^^^^^^^^^^^^^^^^^^^^\n\n File "/usr/local/lib/python3.11/site-packages/temporalio/worker/_workflow_instance.py", line 1232, in _convert_payloads\n raise RuntimeError("Failed decoding arguments") from err\n", encoded_attributes: None, cause: Some(Failure { message: "'Unknown payload encoding '", source: "", stack_trace: " File "/usr/local/lib/python3.11/site-packages/temporalio/worker/_workflow_instance.py", line 1224, in _convert_payloads\n return self._payload_converter.from_payloads(\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n File "/usr/local/lib/python3.11/site-packages/temporalio/converter.py", line 305, in from_payloads\n raise KeyError(f"Unknown payload encoding {encoding.decode()}")\n", encoded_attributes: None, cause: None, failure_info: Some(ApplicationFailureInfo(ApplicationFailureInfo { r#type: "KeyError", non_retryable: false, details: None })) }), failure_info: Some(ApplicationFailureInfo(ApplicationFailureInfo { r#type: "RuntimeError", non_retryable: false, details: None })) }), force_cause: Unspecified }

/usr/local/lib/python3.11/site-packages/temporalio/worker/_replayer.py:105: RuntimeError

It looks like something has an empty encoding or the payload is otherwise malformed?

I cannot see your codec, so I cannot know whether it was developed in a backwards-compatible way. But basically, if you want a codec to work with payloads the codec was never applied to, it needs to be backwards compatible. I fear your codec isn’t setting an encoding. Can you alter the encryption sample to demonstrate the exact error you are seeing? Also, showing your codec code may help.

Hi Chad, below is all the code

The strange thing is that in the decode method of codec.py, the encoding property of some of the payloads is not set - i.e. it is None. I can demonstrate this by putting in a print(p.metadata.get("encoding")).
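
A more detailed print along these lines (just a debug aid) would show whether the key is truly absent rather than present but empty:

# Debug aid inside decode(): distinguish a missing "encoding" key
# from one that is present but empty.
for p in payloads:
    print("encoding" in p.metadata, dict(p.metadata))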

Furthermore, with the CompositePayloadConverter (packages/temporalio/converter.py) that I use for the converter, I can also see that the "from_payloads" method sets encoding to "" for said payloads on this line: encoding = payload.metadata.get("encoding", b"<unknown>").

I am certain that every payload in the workflow history JSON has an encoding property (see the history after the code below), and I can confirm that running the replayer without the codec works 100%.
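
For reference, this is roughly how the replayer is run against the exported history (the workflow class, workflow ID, and history file name here are placeholders):

import json

from temporalio.client import WorkflowHistory
from temporalio.worker import Replayer

from data_converter import pydantic_data_converter
from workflows import MyWorkflow  # placeholder workflow class


async def replay() -> None:
    with open("workflow_history.json") as f:
        history = WorkflowHistory.from_json("my-workflow-id", json.load(f))
    replayer = Replayer(
        workflows=[MyWorkflow],
        # Same converter (including the codec) the worker now uses
        data_converter=pydantic_data_converter,
    )
    await replayer.replay_workflow(history)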

data_converter.py

class PydanticPayloadConverter(CompositePayloadConverter):
    """Payload converter that replaces Temporal JSON conversion with Pydantic
    JSON conversion.
    """

    def __init__(self) -> None:
        super().__init__(
            *(
                c
                if not isinstance(c, JSONPlainPayloadConverter)
                else PydanticJSONPayloadConverter()
                for c in DefaultPayloadConverter.default_encoding_payload_converters
            )
        )


pydantic_data_converter = DataConverter(
    payload_converter_class=PydanticPayloadConverter, payload_codec=EncryptionCodec()
)
"""Data converter using Pydantic JSON conversion."""

codec.py

class EncryptionCodec(PayloadCodec):
    def __init__(
        self,
        key_id: str = get_settings().temporal_codec_key_id,
        key: bytes = get_settings().temporal_codec_key.encode("utf-8"),
    ) -> None:
        super().__init__()
        self.key_id = key_id
        self.encryptor = AESGCM(key)

    async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
        return [
            Payload(
                metadata={
                    "encoding": b"binary/encrypted",
                    "encryption-key-id": self.key_id.encode(),
                },
                data=self.encrypt(p.SerializeToString()),
            )
            for p in payloads
        ]

    async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
        ret: List[Payload] = []
        for p in payloads:
            # Ignore ones w/out our expected encoding
            print(p.metadata.get("encoding")) # ********* some payloads do not have this property set
            if p.metadata.get("encoding", b"").decode() != "binary/encrypted":
                ret.append(p)
                continue
            # Confirm our key ID is the same
            key_id = p.metadata.get("encryption-key-id", b"").decode()
            if key_id != self.key_id:
                raise ValueError(
                    f"Unrecognized key ID {key_id}. Current key ID is {self.key_id}."
                )
            # Decrypt and append
            ret.append(Payload.FromString(self.decrypt(p.data)))
        return ret

    def encrypt(self, data: bytes) -> bytes:
        nonce = os.urandom(12)
        return nonce + self.encryptor.encrypt(nonce, data, None)

    def decrypt(self, data: bytes) -> bytes:
        return self.encryptor.decrypt(data[:12], data[12:], None)

converter.py (in the temporalio package)

    def from_payloads(
        self,
        payloads: Sequence[temporalio.api.common.v1.Payload],
        type_hints: Optional[List[Type]] = None,
    ) -> List[Any]:
        """Decode values trying each converter.

        See base class. Always returns the same number of values as payloads.

        Raises:
            KeyError: Unknown payload encoding
            RuntimeError: Error during decode
        """
        values = []
        for index, payload in enumerate(payloads):
            type_hint = None
            if type_hints and len(type_hints) > index:
                type_hint = type_hints[index]
            # Raw value should just wrap
            if type_hint == temporalio.common.RawValue:
                values.append(temporalio.common.RawValue(payload))
                continue
            print(payload.metadata.get("encoding")) # ******** this is None for certain payloads
            encoding = payload.metadata.get("encoding", b"<unknown>")

workflow_history.json

{
    "events": [{
            "eventId": "1",
            "eventTime": "2023-08-24T10:58:05.769575887Z",
            "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
            "taskId": "1048587",
            "workflowExecutionStartedEventAttributes": {
                ......
                "input": {
                    "payloads": [{
                        "metadata": {
                            "encoding": "anNvbi9wbGFpbg=="
                        },
                        "data": "eyJidXNp......"
                    }]
                },
                ....
            }
        },
        {
            "eventId": "5",
            "eventTime": "2023-08-24T10:58:05.782837845Z",
            "eventType": "EVENT_TYPE_ACTIVITY_TASK_SCHEDULED",
            "taskId": "1048599",
            "activityTaskScheduledEventAttributes": {
                .....
                "input": {
                    "payloads": [{
                        "metadata": {
                            "encoding": "anNvbi9wbGFpbg=="
                        },
                        "data": "eyJmaXJ....."
                    }]
                },
                ....
            }
        },
        {
            "eventId": "7",
            "eventTime": "2023-08-24T10:58:07.973903054Z",
            "eventType": "EVENT_TYPE_ACTIVITY_TASK_COMPLETED",
            "taskId": "1048606",
            "activityTaskCompletedEventAttributes": {
                "result": {
                    "payloads": [{
                        "metadata": {
                            "encoding": "YmluYXJ5L251bGw="
                        }
                    }]
                },
                "scheduledEventId": "5",
                "startedEventId": "6",
                "identity": "7@e35051cbd582"
            }
        },
        {
            "eventId": "11",
            "eventTime": "2023-08-24T10:58:07.983431138Z",
            "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
            "taskId": "1048616",
            "workflowExecutionCompletedEventAttributes": {
                "result": {
                    "payloads": [{
                        "metadata": {
                            "encoding": "anNvbi9wbGFpbg=="
                        },
                        "data": "eyJkYXRhI...."
                    }]
                },
                "workflowTaskCompletedEventId": "10"
            }
        }
    ]
}

I don’t see anything obviously wrong (and though I can’t see the PydanticJSONPayloadConverter in use, it shouldn’t matter). So to confirm: somehow between history and the codec, the encoding metadata is removed?

But that’s a getter call, so I wonder why it would be setting anything.

I have not seen this happen before, and I wonder if it’s something surprising with protobuf or a specific call where we’re not properly decoding payloads.

Can you confirm your Temporal version? Is it possible to provide a standalone Python script showing the codec decode call not getting metadata reliably? Maybe just with a tiny client + worker + workflow or something? That way I can debug exactly what code you are running to replicate.
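
Something like this sketch could be a starting point for the codec-level part (not the full client + worker reproduction); the test key and sample payload here are made up:

import asyncio
import os

from temporalio.api.common.v1 import Payload

# codec.py shown above; note its default arguments call get_settings()
# at import time, so settings must be available (or the defaults patched).
from codec import EncryptionCodec


async def main() -> None:
    codec = EncryptionCodec(key_id="test-key", key=os.urandom(32))

    # A payload written before the codec existed: plain JSON, no encryption.
    plain = Payload(metadata={"encoding": b"json/plain"}, data=b'{"value": 1}')

    # decode() must leave a payload it did not encode completely untouched.
    [decoded] = await codec.decode([plain])
    assert decoded == plain, decoded.metadata

    # And an encode/decode round trip must restore the original payload.
    [round_tripped] = await codec.decode(await codec.encode([plain]))
    assert round_tripped == plain


if __name__ == "__main__":
    asyncio.run(main())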