fix: make module-level syncify wrappers cloudpickleable#1102
Open
EngHabu wants to merge 1 commit into
Open
Conversation
Contributor
Author
|
@kumare3 @wild-endeavor I think I need an expert review here... I don't know if any of the things it says in the PR descriptions are true or 100% hallucinated |
16bcc96 to
4f73ad6
Compare
7e6b538 to
eb9e1e5
Compare
`_SyncWrapper` transitively references a `_BackgroundLoop` whose asyncio loop owns a `ThreadPoolExecutor` backed by a `_queue.SimpleQueue`. When cloudpickle serializes a user function that captures a module-level syncify-wrapped helper (notably `flyte._trace._fetch_action_outputs`), the default reducer walks into the loop's state and dies with: `TypeError: cannot pickle '_queue.SimpleQueue' object`. Add a `__reduce__` that pickles a `_SyncWrapper` by reference (module + qualname) whenever the wrapper is importable at its original module path. The consumer just re-imports it on load, which is the only sane contract anyway — the bg loop in the producer is meaningless in the consumer process. Falls back to the default reducer for unnamed (closure-local) wrappers so behavior is unchanged outside the regression. Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
eb9e1e5 to
7ec617e
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
_SyncWrappertransitively references a_BackgroundLoopwhose asyncio loop owns aThreadPoolExecutorbacked by a_queue.SimpleQueue. When cloudpickle serializes a user function that captures a module-level syncify-wrapped helper — notablyflyte._trace._fetch_action_outputs— the default reducer walks into the loop's state and dies with:This shows up in user-facing
flyte.deploy(...)flows whenever the user has any@trace-decorated task —wrapper_sync's globals capture_fetch_action_outputs(a_SyncWrapper), and that captures_bg_loop.PR #1051 added a generic
try/exceptaround the deploy-time pickle that converts this into aClickExceptionand points the user atversion=..., but the underlying SDK object is unpicklable for legitimate reasons we control — the proper fix is to make it serialize.Fix
Add a
__reduce__to_SyncWrapperthat pickles by reference (module + qualname) whenever the wrapper is importable at its original module path. The consumer just re-imports it on load, which is the only sane contract anyway — the bg loop in the producer is meaningless in the consumer process.Falls back to the default reducer for unnamed (closure-local) wrappers so behavior outside the regression is unchanged.
Closes
TypeError: cannot pickle '_queue.SimpleQueue' objectfromcloudpickle.dumpduringflyte deployfixes FLYTE-SDK-38Test plan
test_module_level_sync_wrapper_is_cloudpickleable— direct round-trip offlyte._trace._fetch_action_outputs.test_module_level_sync_wrapper_pickleable_via_closure— round-trip of a function that closes over the wrapper.cloudpickle.dumps(TaskEnvironment(...))with a@trace-decorated task now succeeds (was raisingTypeError).tests/flyte/syncify/suite green (34 passed).make fmtclean.🤖 Generated with Claude Code