Skip to content

fix: delete stale error.pb before uploading outputs on JobSet restart#1204

Open
AdilFayyaz wants to merge 2 commits into
mainfrom
adil/jobsets-restart-fix
Open

fix: delete stale error.pb before uploading outputs on JobSet restart#1204
AdilFayyaz wants to merge 2 commits into
mainfrom
adil/jobsets-restart-fix

Conversation

@AdilFayyaz

Copy link
Copy Markdown
Collaborator

Motivation

When a clustered (JobSet) task fails, it writes an error.pb to the output path. If a later restart attempt succeeds, that stale error file is left behind and the action can be misreported as failed despite a successful run.

Summary

  • Add _clear_stale_clustered_error_if_needed() to upload_outputs: on restart attempts (JOBSET_RESTART_ATTEMPT > 0), delete a leftover error.pb from the output path before writing outputs.
  • Add _delete_path() helper that deletes via the underlying fsspec filesystem, handling both async (_rm_file/_rm) and sync (rm_file/rm) backends.
  • Delete is best-effort: missing files are debug-logged, other failures are downgraded to a warning so output upload still proceeds.
  • Attempt 0 is a no-op (no exists/filesystem calls), keeping the non-restart hot path unchanged.

Test Plan

  • New tests/flyte/clustered/test_runtime_io_cleanup.py covers:
  • restart attempt > 0 deletes the stale error then uploads outputs (correct call order),
  • attempt 0 skips the existence check / delete entirely,
  • a delete failure is soft-failed (warning logged, outputs still uploaded).
  • Run: uv run pytest tests/flyte/clustered/test_runtime_io_cleanup.py -v

Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com>
@AdilFayyaz AdilFayyaz self-assigned this Jun 12, 2026
Signed-off-by: M. Adil Fayyaz <62440954+AdilFayyaz@users.noreply.github.com>
@AdilFayyaz AdilFayyaz requested a review from pingsutw June 12, 2026 23:02
if hasattr(fs, "_rm_file"):
await fs._rm_file(path) # pylint: disable=W0212
return
if hasattr(fs, "_rm"):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need rm? from my claude:

  _delete_path should follow the same shape. I verified the methods exist on the backends you actually hit:
  - AsyncFileSystem._rm_file is defined on the base, and obstore's FsspecStore implements _rm_file directly (S3/GCS/ABFS).
  - sync LocalFileSystem has rm_file.

  So the whole thing collapses to:

  async def _delete_path(path: str) -> None:
      fs = storage.get_underlying_filesystem(path=path)
      if isinstance(fs, AsyncFileSystem):
          await fs._rm_file(path)  # pylint: disable=W0212
      else:
          fs.rm_file(path)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is cleaner I agree

Comment on lines +84 to +85
stale_error_uri = error_path(output_path)
if not await storage.exists(stale_error_uri):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need it? Don't we write output.pb and error.pb to different folder (<raw_output>/0, <raw_output>/1) for different attempts?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because a Jobset restart attempt is a k8s Jobsets restart within the same attempt. This is different from flyte attempts because they go to different output folders. For Jobsets, the crashed errors.pb and successful restarts error.pb go in the same folder.
But are you suggesting we write per restart outputs to a jobsets suffixed restart attempt subfolder instead of deleting? Will have to scope that out and would require plugin changes.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use Jobsets restart, do we see the attempt (1/2/3/4) on UI

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we delete the stale error.pb, how do see the error on UI for the previous attempt

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope you wont see attempt 1/2/3/4 because this is a Jobset level restart not a Flyte one. And regarding the deletion, we only clear error.pb when the run is about to write to outputs.pb (on success - i.e. on recovery). If the run fails it goes through a different path are returns before deletion so the real error is seen on the UI.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @EngHabu @kumare3 do you have any suggestions? I'm not sure if we need to use jobset restart policy or not.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants