Description
Follow-up to #723 / #839.
The set()-time memory value validator added in #839 (validate_memory_value in python/flink_agents/api/memory_object.py) accepts only values that satisfy a recursive, checkpoint-stable contract: None | bool | int | float | str | list[...] | dict[str, ...]. The accepted scalar set is exact-typed:
_CHECKPOINT_STABLE_SCALARS = (bool, int, float, str)
bytes was part of the contract originally proposed in #723:
None | bool | int | float | str | bytes | list[MemoryValue] | dict[str, MemoryValue]
but was intentionally excluded from the validator. The reason: the Python→Java bytes conversion through Pemja is unverified end-to-end, and accepting an unsafe value would defeat the validator's purpose (turning an early, debuggable rejection back into a checkpoint-time JVM crash on restore). Excluding it was the safe default. This issue tracks closing that gap.
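For readers without #839 open, here is a minimal sketch of what an exact-typed, recursive check of this shape could look like. It is not the actual validate_memory_value implementation: the function name validate_memory_value_sketch, the path argument, the use of TypeError, and the error messages are all assumptions made for illustration. Only the scalar tuple is taken from the text above.

```python
from typing import Any

# Copied from the issue text; everything else in this block is hypothetical.
_CHECKPOINT_STABLE_SCALARS = (bool, int, float, str)


def validate_memory_value_sketch(value: Any, path: str = "value") -> None:
    """Raise TypeError if value falls outside the checkpoint-stable contract.

    Uses exact type checks (type(x) in ...), so subclasses of the accepted
    types are rejected as well. That reading of "exact-typed" is an assumption.
    """
    if value is None or type(value) in _CHECKPOINT_STABLE_SCALARS:
        return
    if type(value) is list:
        for index, item in enumerate(value):
            validate_memory_value_sketch(item, f"{path}[{index}]")
        return
    if type(value) is dict:
        for key, item in value.items():
            if type(key) is not str:
                raise TypeError(f"{path}: dict key {key!r} is not a str")
            validate_memory_value_sketch(item, f"{path}[{key!r}]")
        return
    # bytes ends up here today, which is the gap this issue tracks.
    raise TypeError(f"{path}: unsupported type {type(value).__name__}")
```

Under this sketch, {"a": [1, 2.0, "x", None, True]} passes, while {"a": [b"raw"]} is rejected at set() time with a message pointing at value['a'][0], rather than failing later on restore.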
Task
Verify whether bytes survives the full Pemja → Flink state → checkpoint → TaskManager-restart → restore path, the same real-job path used to reproduce #723 (not just direct Flink serializer behavior):
- Confirm whether Pemja materializes a Python bytes value into a native, checkpoint-stable JVM type (e.g. byte[]) on the FlinkMemoryObject.set() path, rather than a PyObject/PyJObject wrapper.
- If it does materialize safely:
  - Add bytes to _CHECKPOINT_STABLE_SCALARS.
  - Extend test_memory_value_validation.py to cover bytes accept (incl. nested in list/dict).
  - Update the contract docs to include bytes.
- If it does not materialize safely:
  - Keep bytes rejected, and document in the contract docs that bytes must be materialized before storing (e.g. base64-encode to str), with the rationale.
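If bytes stays rejected, the base64 workaround mentioned above could look roughly like this on the user side. The helper names bytes_to_memory_str and memory_str_to_bytes are hypothetical and not part of flink_agents. The sketch only shows the encode/decode round trip with the standard library, not the set() call itself.

```python
import base64


def bytes_to_memory_str(raw: bytes) -> str:
    """Encode raw bytes as an ASCII base64 str, which the validator accepts."""
    return base64.b64encode(raw).decode("ascii")


def memory_str_to_bytes(encoded: str) -> bytes:
    """Decode a value produced by bytes_to_memory_str back into bytes."""
    # validate=True rejects characters outside the base64 alphabet
    # instead of silently discarding them.
    return base64.b64decode(encoded, validate=True)


payload = b"\x00\xffbinary"
stored = bytes_to_memory_str(payload)  # a plain str, safe to store
assert memory_str_to_bytes(stored) == payload
```

The cost is roughly a 33% size increase and an explicit decode on every read, which is worth stating in the contract docs alongside the rationale.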
Notes
A true restore test can't run on the MiniCluster — in-place recovery does not recreate the JVM, so the Pemja conversion path isn't crossed. Verification needs the real restart path described in #723 (RocksDB + filesystem checkpointing + manual TaskManager kill/restart), or the e2e checkpoint-recovery harness tracked in #836.
Are you willing to submit a PR?