examples(volumes): typed RO/RW lifecycle + enable_fuse_mount + reusable env overrides#1113
Open
EngHabu wants to merge 34 commits into
Open
examples(volumes): typed RO/RW lifecycle + enable_fuse_mount + reusable env overrides#1113EngHabu wants to merge 34 commits into
EngHabu wants to merge 34 commits into
Conversation
74b6abe to
5500a78
Compare
24dafdb to
a129418
Compare
5500a78 to
ad58998
Compare
a129418 to
9f6feca
Compare
ad58998 to
0382fea
Compare
9f6feca to
16d095f
Compare
46408e0 to
cd03283
Compare
0cb31c2 to
dbcc828
Compare
aebf6c1 to
1909013
Compare
kumare3
reviewed
May 28, 2026
| # dependencies = [ | ||
| # "kubernetes", | ||
| # "flyte", | ||
| # "flyteplugins-union", |
Contributor
There was a problem hiding this comment.
we should probably put kubernetes as a dependency of this, its weird to add kuberenetes otherwise or we should raise an error when you import volume to add k8s
Contributor
Author
There was a problem hiding this comment.
Removed... This was needed when I had the pod template in here... it's already in flyteplugins-union
kumare3
reviewed
May 28, 2026
| # forks Just Work with no extra image deps. | ||
| vol = Volume.new(name=volume_name) | ||
| logger.info("init_volume: bucket resolved to %s", vol.bucket) | ||
| await vol.mount() |
Contributor
There was a problem hiding this comment.
can we pass the path where we want to mount to this?
Contributor
Author
There was a problem hiding this comment.
it takes mount_path... added to the example
kumare3
reviewed
May 28, 2026
| # finalize() drains writeback, unmounts, and publishes an immutable | ||
| # ROVolume. (Returning the mounted RWVolume directly would do the same | ||
| # via the type transformer.) | ||
| sealed = await vol.finalize(message="initial write") |
Contributor
There was a problem hiding this comment.
how about volume.commit? or persist?
Set a contextvar with the output slot name (e.g. "o0") around each TypeEngine.to_literal call in convert_from_native_to_outputs, and add current_output_name() to read it. Lets a TypeTransformer attribute a returned value to the output it's being serialized as, without changing to_literal's signature. None on the input path and outside conversion. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
dbcc828 to
e3e18ea
Compare
1909013 to
b8bf6fa
Compare
Cover the three cases Ketan asked about: None outside conversion, correct slot name set during each output's to_literal call, and guaranteed cleanup on transformer error. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@union.ai>
b8bf6fa to
fe926ea
Compare
Signed-off-by: Haytham Abuelfutuh <haytham@union.ai>
fe926ea to
8238fa7
Compare
Set a contextvar with the output slot name (e.g. "o0") around each TypeEngine.to_literal call in convert_from_native_to_outputs, and add current_output_name() to read it. Lets a TypeTransformer attribute a returned value to the output it's being serialized as, without changing to_literal's signature. None on the input path and outside conversion. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
Cover the three cases Ketan asked about: None outside conversion, correct slot name set during each output's to_literal call, and guaranteed cleanup on transformer error. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@union.ai>
Signed-off-by: Haytham Abuelfutuh <haytham@union.ai>
Volume / volume_image / volume_pod_template moved from flyte.extras to the flyteplugins-union package. Update the three examples to import from flyteplugins.union.io.volume and bake the local plugin wheel via volume_image(base, install_local=True) instead of with_local_v2(). uv sources point at a sibling checkout at ../../../../unionai/ flyteplugins-union for editable iteration; run `make dist` in that repo before running these examples. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
Three changes to the Volume bench: - Add 'badger' to the engine matrix (alongside redis/sqlite) for the existing workload×engine×writeback sweep. Scoped out of fork workloads for now; their counter-bump path needs a juicefs-dump key-casing fix on the flyteplugins-union side. - New dataset_sweep=True path that fans out (engine, n) cells across small_files (n up to 100k @ 256 B per file) and big_files (n up to 10k @ 64 KiB per file) and renders a 'Dataset sweep' tab with two log-log line plots — one curve per engine. - Rename metadata_engine -> metadata_store_type to match the flyteplugins-union Volume.empty() signature; the old kwarg name no longer existed and every cell was failing to spawn. Verified end-to-end: https://demo.hosted.unionai.cloud/v2/domain/development/project/flytesnacks/runs/umpk9hlrjt877wsnjmqw Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
Both run_cell and run_dataset_cell (and the self-managed _cold_fork workload) now return a Tuple[Volume, Dict[str, float]]. The driver unpacks (_vol, stats) into the rendered tables; the Volume travels into Flyte's per-task output panel so you can inspect bucket / index path / parent / used_bytes / inode_count / metadata_store_type for each cell directly in the UI without re-deriving them. Verified: https://demo.hosted.unionai.cloud/v2/domain/development/project/flytesnacks/runs/utcdndfc6hs2nlzmblpq Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
…ection
Mounting FUSE inside a task container needs three things on the pod:
- the /dev/fuse character device mounted in via hostPath
- the primary container marked privileged with CAP_SYS_ADMIN
- volume_mount the device into the primary container
Previously, plugins shipping FUSE-backed types (flyteplugins-union's
Volume) carried a `volume_pod_template()` helper that built this by
hand, forcing every consumer to wire it explicitly:
env = flyte.TaskEnvironment(name=..., pod_template=volume_pod_template(), ...)
That's both repetitive and conflicts with user-supplied pod_templates
(can't merge two PodTemplates cleanly). Move the capability into the
SDK as a single declarative flag:
env = flyte.TaskEnvironment(name=..., enable_fuse_mount=True, ...)
Implementation:
* `enable_fuse_mount: bool = False` on `Environment` (base class), so
both `TaskEnvironment` and `AppEnvironment` inherit it.
* Propagated to `TaskTemplate` via `TaskEnvironment.task()`.
* New `pod_template_with_fuse_mount(pt)` helper in `flyte._pod` that
takes any user-supplied PodTemplate (including None) and returns a
copy with the FUSE bits merged in. Idempotent on repeated apply;
preserves existing volumes, sidecars, and security_context fields.
* Injected at serialization time in `task_serde.py` and `app_serde.py`
via `_resolve_pod_template(task)` — string/named PodTemplates emit a
warning (server-side resolved, can't merge) and fall through.
Examples ported: bench.py, volume_example.py, volume_cold_fork.py now
use `flyte.TaskEnvironment(enable_fuse_mount=True, image=with_volume_deps(...))`
instead of the explicit volume_pod_template plumbing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
Mirror the simplification already applied to volume_example.py across
bench.py and volume_cold_fork.py:
- Drop the per-script IDL2 git+url, `kubernetes` pip, and `git` apt
layers — flyteplugins-union's with_volume_deps now ships everything
the volume runtime needs (juicefs binary + fuse userspace), and
with_local_v2() on the base pulls in the right flyteidl2 transitively.
- Custom-base path is now:
base = Image.from_debian_base(...).with_local_v2()
image = with_volume_deps(base, install_local=True)
- TaskEnvironment uses enable_fuse_mount=True instead of an explicit
volume_pod_template() PodTemplate.
- Pin the script-header flyte source to ../.. (editable) on all three
scripts so uv resolves enable_fuse_mount, which only exists on local
flyte-sdk source — published flyte 2.3.2 doesn't have the field yet.
Verified:
bench.py -> u4gvpmnsxgbtqxnwjhxf
volume_example.py -> uxf7c7rdr5ws9lwl82p6
volume_cold_fork.py -> ubrvg6zphj7mvm7xqbxz
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
Update the Volume examples for the new flyteplugins-union API: - Volume.empty(...) -> Volume.new(...) (returns a writable RWVolume). - Terminal `await vol.commit()` -> `await vol.finalize()` (drain writeback, unmount, publish an immutable ROVolume). - Tasks now exchange immutable ROVolume values; to mutate, a task forks one into an RWVolume, writes, and finalize()s back. volume_example threads commit messages through the lineage. - bench cells return ROVolume; cold-fork forks the sealed ROVolume. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
flyteplugins-union removed the image-builder constants + with_volume_deps() (a plain image + the pip wheel is all sqlite/badger need) and reverted the default store to sqlite (forkable). Update the examples: - bake the local dev wheel via with_local_flyteplugins_union instead of with_volume_deps(install_local=True) / with_high_throughput_volume_deps(install_local=True) - drop the now-redundant metadata_store_type="sqlite" pins (sqlite is default) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
… vs commit - Remove `kubernetes` from script headers: it is a transitive dep of flyteplugins-union, not something users should list explicitly. - Pass mount_path explicitly in init_volume so users see it is configurable (default is /workspace). - Add comment distinguishing finalize() (unmounts, returns ROVolume) from commit() (flushes but keeps the volume mounted). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@union.ai>
The flyteplugins-union Volume PR flattened its public API from flyteplugins.union.io.volume to flyteplugins.union.io. Update the import in bench.py, volume_cold_fork.py, and volume_example.py (plus a stale docstring reference) accordingly. Add volume_checkpoint.py demonstrating the long-running-task pattern: a single task holds a mounted RWVolume across a multi-epoch loop with mount(checkpoint_interval_seconds=..., on_checkpoint=report_checkpoint_trace), covering both background crash recovery and per-checkpoint lineage traces. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
Replace the (removed) built-in periodic checkpointing with the idiomatic pattern: a long-running train task mounts a writable volume once and records one @flyte.trace span per epoch, each committing a durable ROVolume. On a retry, trace memoization replays completed epochs and the first live epoch forks the last checkpoint into a writable copy to resume — exercised by a simulated crash (CRASH_AT_EPOCH, retries=2). Key design points captured in the file: mount once / commit many; no Volume crosses the trace boundary (the writable handle is closure-captured and only the stable epoch int is the memo key — a threaded Volume would either be auto-finalized on input hashing or re-hash unstably across the record/restore boundary); resume via fork() (the blessed RO->RW path). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
The example's focus is crash recovery / resume-from-checkpoint, so the new name reads truer to intent. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
The Badger metadata store was dropped from flyteplugins-union, so the bench sweep failed instantiating volumes with metadata_store_type="badger". Drop "badger" from ENGINES / DATASET_ENGINES and the engine color map, and delete the _FORK_WORKLOADS / _FORK_ENGINES machinery whose only purpose was to keep fork workloads off Badger — fork now runs on every supported engine (redis, sqlite). Also fixes a stale "sqlite/badger" comment in volume_example.py. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
0e014a2 to
13b0198
Compare
53ae2a3 to
cc9b267
Compare
Set a contextvar with the output slot name (e.g. "o0") around each TypeEngine.to_literal call in convert_from_native_to_outputs, and add current_output_name() to read it. Lets a TypeTransformer attribute a returned value to the output it's being serialized as, without changing to_literal's signature. None on the input path and outside conversion. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@union.ai>
Cover the three cases Ketan asked about: None outside conversion, correct slot name set during each output's to_literal call, and guaranteed cleanup on transformer error. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@union.ai>
Signed-off-by: Haytham Abuelfutuh <haytham@union.ai>
cc9b267 to
d851ff5
Compare
Signed-off-by: Ketan Umare <kumare3@users.noreply.github.com>
…into haytham/output-name-in-context
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
98e4bcc to
d606b01
Compare
…ples
The Volume examples previously baked a locally-built flyteplugins-union
wheel (with_local_flyteplugins_union + `make dist-bundled`). 0.4.0 is now
on PyPI with juicefs bundled in the platform wheels, so the images just
pip-install it.
- PEP 723 headers: pin flyteplugins-union>=0.4.0, drop the local
[tool.uv.sources] path for the plugin
- Images: replace local-wheel baking with
.with_pip_packages("flyteplugins-union>=0.4.0"); pip layer goes BEFORE
.with_local_v2() so the local-SDK wheel (installed --no-deps
--reinstall) isn't clobbered by the plugin's flyte<2.5 dep re-resolving
flyte from PyPI
- Drop stale "not yet released to PyPI / make dist-bundled" docstring notes
- volume_example.py: remove leftover AZURE-creds debug logging
- pyproject.toml: opt flyteplugins_union out of the 5-day exclude-newer
recency cutoff (same pattern as flyteidl2)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Top of the stack — the flyte-sdk support + examples for the
flyteplugins-unionVolume type.SDK
enable_fuse_mountflag onEnvironment/TaskEnvironment: at serialize time, injectsCAP_SYS_ADMIN+ a/dev/fusehost device into the pod spec, so in-process FUSE mounts work without a hand-written PodTemplate (_environment.py,_pod.py,_internal/runtime/task_serde.py,app/_runtime/app_serde.py).env_varsoverrides on reusable tasks (_task.py,tests/user_api/test_override.py).Examples (
examples/volumes/)ROVolume/RWVolumelifecycle across a multi-task lineage (init → append → branch → read_all), a cold-fork demo, and a metadata-store benchmark (sqlite / badger / redis).flyteplugins-unionwheel (viawith_local_flyteplugins_union) is all that's needed; the defaultsqlitestore is forkable, so nometadata_store_type=pin.Validated end-to-end on demo: full
init → append → branch → append → read_alllineage SUCCEEDED.Stack (bottom → top):
haytham/fix-reusable-secret-override→main(feat(_task): allow secret and env_vars overrides on reusable tasks #1108)haytham/output-name-in-context(feat(convert): expose current output name to type transformers #1112)haytham/volume-examples(examples(volumes): typed RO/RW lifecycle + enable_fuse_mount + reusable env overrides #1113, this)🤖 Generated with Claude Code