Problem Statement
Three classes of problems with the current manual 5-step pipeline (make pipeline):
1. Version skew
Each --detach step does git clone -b <branch>, so if the branch moves between steps, artifacts are built from different code versions. The calibration_package_meta.json sidecar already tracks per-step provenance but nothing enforces consistency across steps.
2. Artifact overwrites & unnecessary HF round-trips
--push-results uploads weights to HF between the calibrate and stage steps; subsequent runs overwrite calibration/calibration_weights.npy and calibration/logs/* without versioning. The pipeline-artifacts volume avoids some round-trips, but the pattern is inconsistent.
3. Manual orchestration burden
Operator must monitor Modal dashboard, wait for each step, then run the next with correct flags. Error-prone and adds hours of human latency.
Relationship to #600 (Version Registry)
Issue #600 introduces a version_manifest.json registry with VersionManifest entries mapping each semver to GCS generations and HF commit SHAs. This pipeline issue feeds into #600 in two ways:
The promote step should register the version in #600's registry — when pipeline-promote runs, it writes a new VersionManifest entry with the GCS generations and HF commit from the promotion. This replaces the latest.json concept from an earlier draft; #600's current pointer in version_manifest.json serves that role.
Run provenance feeds the registry — VersionManifest should include a pipeline_run_id field linking back to the diagnostics stored under runs/{run_id}/ on HF. This connects "what version is deployed" (#600) to "how was it built and what were the calibration metrics" (this issue).
Rollback-as-release (#600) works naturally — since each run's artifacts persist on the volume and on HF under runs/{run_id}/, a rollback can reference a previous run's artifacts without rebuilding.
Potential improvement to #600: The VersionManifest type should include an optional pipeline_run_id: str field and a diagnostics_path: str field so consumers can trace any published version back to its full calibration diagnostics.
Proposed Architecture
Run ID
{version}_{git_sha_short}_{YYYYMMDD_HHMMSS} (e.g., 1.57.0_a1b2c3d4_20260318_143022)
Coordinator pattern
A lightweight CPU function in modal_app/pipeline.py that spawns the existing Modal functions (build_datasets, build_package, fit_weights, coordinate_publish) sequentially, passing each the same pinned git SHA and run-scoped volume paths. This matches the pattern already used by coordinate_publish in local_area.py.
Volume layout on pipeline-artifacts
Backward compat: flat artifacts/ is symlinked to the latest run's artifacts so existing entrypoints keep working.
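The run ID format {version}_{git_sha_short}_{YYYYMMDD_HHMMSS} (e.g., 1.57.0_a1b2c3d4_20260318_143022) could be minted with a small helper; make_run_id is a hypothetical name, not an existing function:

```python
from datetime import datetime, timezone

def make_run_id(version: str, git_sha: str) -> str:
    """Compose {version}_{git_sha_short}_{YYYYMMDD_HHMMSS} using a UTC timestamp."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    return f"{version}_{git_sha[:8]}_{stamp}"
```

Because the SHA is embedded in the ID, the run directory name alone is enough to tell which code built any artifact on the volume.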
HF versioned paths (append-only, never overwritten)
Production paths (states/AL.h5, calibration/calibration_weights.npy) are only written during promote. The promote step registers the version in #600's version_manifest.json rather than maintaining a separate latest.json.
Promote remains a separate manual step
Preserves the staging-before-promoting rationale: the operator can inspect diagnostics under runs/{run_id}/diagnostics/ before promoting.
Pipeline Flow
make pipeline-auto BRANCH=main GPU=A100-80GB EPOCHS=200 BUILD_DB=1
│
├─ 1. Coordinator starts (CPU, lightweight, 48h timeout)
│ - Clones at branch tip, records exact SHA
│ - Creates /pipeline/runs/{run_id}/ on volume
│ - Writes meta.json with status="running"
│
├─ 1b. (Optional, if BUILD_DB=1) Builds policy_data.db from scratch
│ - Runs the 10 ETL scripts (create_database_tables → validate_database)
│ - Fetches from Census/IRS/CDC APIs (cached after first run)
│ - Ensures DB matches pinned code version, not stale HF copy
│ - ~10-15 min with cached downloads
│
├─ 2. Spawns build_datasets(run_id=..., sha=..., build_database=...)
│ - Clones at pinned SHA (not branch)
│ - If build_database: uses freshly-built DB; else downloads from HF
│ - Writes dataset, db to runs/{run_id}/artifacts/
│ - Existing checkpointing still works within this step
│
├─ 3. Spawns build_package_remote(run_id=...)
│ - Reads dataset/db from runs/{run_id}/artifacts/
│ - Writes calibration_package.pkl to same dir
│
├─ 4. Spawns fit_weights (GPU) for state + national in parallel
│ - Reads package from runs/{run_id}/artifacts/
│ - Writes weights + diagnostics to run dir
│ - NO HF upload between steps
│
├─ 5. Spawns coordinate_publish(run_id=...)
│ - Reads weights/dataset/db from runs/{run_id}/artifacts/
│ - Writes 488 H5s to runs/{run_id}/h5s/
│ - Stages to HF staging/ paths
│
├─ 6. Uploads diagnostics to HF runs/{run_id}/diagnostics/
│ - Calibration logs, validation results, run config
│ - Append-only — previous runs untouched
│
└─ 7. Updates meta.json status="completed", prints run_id
Operator reviews diagnostics, then:
make pipeline-promote RUN_ID=1.57.0_a1b2c3d4_20260318_143022
│
├─ Promotes staging/ → production on HF
├─ Uploads to GCS
├─ Registers version in version_manifest.json (#600)
└─ Cleans up staging/
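The numbered flow above reduces to a plain sequential loop that records progress in meta.json after every step. This is an illustrative shape for modal_app/pipeline.py, not its real API — run_pipeline and the (name, step) pairs are hypothetical, with each step standing in for a blocking Modal call such as build_datasets.remote(...):

```python
import json
from pathlib import Path

def run_pipeline(run_dir: Path, steps: list) -> dict:
    """Chain pipeline steps sequentially, persisting status to meta.json."""
    run_dir.mkdir(parents=True, exist_ok=True)
    meta_path = run_dir / "meta.json"
    meta = {"status": "running", "completed_steps": []}
    meta_path.write_text(json.dumps(meta))
    try:
        for name, step in steps:
            step()  # in the real app: a blocking Modal .remote(...) call
            meta["completed_steps"].append(name)
            meta_path.write_text(json.dumps(meta))
        meta["status"] = "completed"
    except Exception as exc:
        meta["status"] = "failed"
        meta["error"] = str(exc)
        raise
    finally:
        meta_path.write_text(json.dumps(meta))  # final status always persisted
    return meta
```

The try/finally means a mid-pipeline failure still leaves meta.json with status="failed" and the list of steps that did complete, which is what pipeline-status would read back.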
Function Reuse Analysis: What the Orchestrator Calls
The orchestrator needs to call 4 existing functions. Here's what each does, what's problematic for pipeline use, and the proposed approach.
Step 1: build_datasets() in data_build.py
What it does well: Parallel dependency-aware script execution, checkpointing for preemption resilience, copies source_imputed_*.h5 + policy_data.db to pipeline volume.
Embedded I/O issues:
L339: Runs download_private_prerequisites.py, which downloads policy_data.db from HF. In a pipeline context, we want the freshly-built DB to flow forward, not a stale HF copy. However, this download also fetches PUF CSVs from a different HF repo (policyengine/irs-soi-puf), which are genuine prerequisites.
L514-524: Hardcodes output path as flat artifacts/ on pipeline volume. Needs to write to runs/{run_id}/artifacts/ instead.
L534-543: upload flag triggers HF upload via upload_completed_datasets.py. The orchestrator should pass upload=False and handle uploads centrally.
L320: git clone -b branch — orchestrator needs this to clone at a pinned SHA instead.
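To address the L320 issue, the coordinator can resolve the branch tip to a SHA once, then have every step clone at that exact commit. A minimal sketch — resolve_branch_tip and clone_at_sha are hypothetical helper names, not existing code:

```python
import subprocess

def resolve_branch_tip(repo: str, branch: str) -> str:
    """Ask the remote for the commit SHA currently at the tip of `branch`."""
    out = subprocess.run(
        ["git", "ls-remote", repo, f"refs/heads/{branch}"],
        check=True, capture_output=True, text=True,
    ).stdout
    return out.split()[0]

def clone_at_sha(repo: str, sha: str, dest: str) -> None:
    """Clone, then pin the working tree to an exact commit (not a moving branch)."""
    subprocess.run(["git", "clone", repo, dest], check=True, capture_output=True)
    subprocess.run(
        ["git", "checkout", "--detach", sha],
        cwd=dest, check=True, capture_output=True,
    )
```

The coordinator calls resolve_branch_tip exactly once at start; every subsequent step receives the SHA, so a branch moving mid-run can no longer cause version skew.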
Database build option: Currently policy_data.db is built locally via make database (10 sequential ETL scripts in policyengine_us_data/db/, ~10-15 min with cached downloads, fetches from Census/IRS/CDC APIs). For full encapsulation, the orchestrator should optionally run this step before dataset building. The ETL scripts have a raw input caching layer (utils/raw_cache.py) so API downloads only happen once. Adding --build-database flag to the orchestrator would run make database as Step 0, ensuring the DB matches the pinned code version rather than relying on a pre-promoted HF copy.
Proposed approach: The orchestrator calls build_datasets with upload=False and the existing code mostly works. The changes needed are:
Accept an optional git_sha parameter — if provided, clone at that SHA instead of branch tip
Accept an optional artifact_dir parameter — if provided, copy pipeline artifacts there instead of flat artifacts/
Accept an optional build_database parameter — if True, run the make database ETL scripts before dataset building (replacing the HF download of policy_data.db). When False (default), download from HF as today for backward compat.
These are additive parameters with backward-compatible defaults.
Step 2: build_package_remote() → _build_package_impl() in remote_calibration_runner.py
What it does well: Reads dataset/db from pipeline volume, builds X matrix, saves .pkl to volume.
Embedded I/O issues:
L321-323: Hardcodes artifact paths as /pipeline/artifacts/{file}. Needs to read from runs/{run_id}/artifacts/.
L330: Hardcodes package output to /pipeline/artifacts/calibration_package.pkl.
L365-366: Writes sidecar JSON + commits to volume — fine, just needs scoped path.
Proposed approach: Add optional artifact_dir parameter. When provided, read inputs and write outputs to that path. When absent, use flat artifacts/ (backward compat).
Step 3: fit_weights_*() → _fit_weights_impl() / _fit_from_package_impl() in remote_calibration_runner.py
What it does well: GPU calibration, returns weights + diagnostics as byte buffers.
Embedded I/O issues:
L161-163: Hardcodes artifact read paths (/pipeline/artifacts/...). Same as Step 2.
The GPU functions themselves are clean — they return byte dicts, don't touch HF.
But the main() local entrypoint (L805-1014) does a LOT of post-processing that blends concerns:
L964-983: Writes result bytes to local files
L988-1000: Pushes weights back to pipeline volume at flat artifacts/ path
L1002-1011: Optionally uploads to HF via upload_calibration_artifacts()
L1013-1014: Optionally triggers GitHub dispatch
Key insight: The orchestrator should NOT call main(). It should call the GPU functions directly (e.g., fit_from_package_a100_80.remote(...)) which return clean byte dicts. The orchestrator then writes those bytes to the run-scoped volume path itself. This avoids all the main() entrypoint clutter.
Proposed approach: No changes needed to the GPU functions themselves. The orchestrator calls them directly and handles volume writes. The main() entrypoint continues to work standalone for manual one-off calibration runs.
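Concretely, the orchestrator's side of this could look like the following; write_fit_results is a hypothetical helper, and the filenames used in the test are illustrative:

```python
from pathlib import Path

def write_fit_results(artifact_dir: Path, results: dict[str, bytes]) -> list[Path]:
    """Persist the byte buffers returned by a GPU fit call (e.g. the dict from
    fit_from_package_*.remote(...)) into the run-scoped artifact directory."""
    artifact_dir.mkdir(parents=True, exist_ok=True)
    written = []
    for name, payload in results.items():
        path = artifact_dir / name
        path.write_bytes(payload)
        written.append(path)
    return written
```

Because the GPU functions only return bytes, the orchestrator alone decides where those bytes land — which is exactly what keeps main()'s HF-upload and dispatch logic out of the pipeline path.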
Step 4: coordinate_publish() in local_area.py
What it does well: Partitions 488 areas across workers, spawns parallel build_areas_worker functions, validates manifest, stages to HF.
Embedded I/O issues:
L628: Validates artifacts against unified_run_config.json — good, should continue.
L718-731: Validates and uploads to HF staging — in pipeline mode, we may want to defer HF staging to the orchestrator so all H5s + diagnostics go up in one operation.
L596-599: Clears stale version directory on staging volume — fine, scoped by version.
Proposed approach: Add an artifact_dir parameter for reading inputs. Use the existing skip_upload flag to skip the HF staging step — the orchestrator calls with skip_upload=True, then handles HF staging centrally after collecting diagnostics.
Summary: Complexity Strategy
| Function | Orchestrator calls | Changes needed | Standalone still works? |
| --- | --- | --- | --- |
| build_datasets | .remote(upload=False, ...) | Add git_sha, artifact_dir params | Yes (params optional) |
| build_package_remote | .remote(...) | Add artifact_dir param | Yes |
| fit_from_package_* | .remote(...) directly | None — returns bytes | Yes |
| coordinate_publish | .remote(skip_upload=True, ...) | Add artifact_dir param | Yes |
The strategy is additive parameters with backward-compatible defaults — no function signatures break, no existing make targets change. The orchestrator is the only caller that passes the new params. The main() local entrypoints remain untouched for manual use.
The one thing the orchestrator does differently from the current main() entrypoints is owning the HF upload step. Currently, HF uploads are scattered: build_datasets has upload=True, remote_calibration_runner/main() has --push-results, coordinate_publish stages to HF. In pipeline mode, the orchestrator collects all artifacts on the volume first, then does a single coordinated upload at the end (diagnostics to runs/{run_id}/, H5s to staging/). This eliminates the mid-pipeline HF round-trips that cause overwrites.
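One way to make the single end-of-run upload explicit is to build the full upload plan first, then execute it in one pass. plan_final_upload and the staging/{run_id}/h5s repo path are assumptions for illustration, not existing code:

```python
from pathlib import Path

def plan_final_upload(run_id: str, volume_root: Path) -> list[tuple[Path, str]]:
    """Map run-scoped local dirs to HF repo paths for one end-of-run upload."""
    run_dir = volume_root / "runs" / run_id
    return [
        (run_dir / "diagnostics", f"runs/{run_id}/diagnostics"),  # append-only
        (run_dir / "h5s", f"staging/{run_id}/h5s"),               # staged, promoted later
    ]
# Executing the plan would be one HfApi().upload_folder(folder_path=..., path_in_repo=...)
# call per entry, after all steps finish — nothing touches HF mid-pipeline.
```

Since every destination is keyed by run_id, re-running the pipeline can never overwrite a previous run's weights or logs.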
Implementation Phases
| Phase | Description | Files |
| --- | --- | --- |
| 1 | Run ID utilities, RunMetadata dataclass, volume layout helpers | modal_app/pipeline.py (new) |
| 2 | Thread optional run_id/artifact_dir through existing functions; when absent, fall back to flat artifacts/ layout | data_build.py, remote_calibration_runner.py, local_area.py |
| 3 | run_pipeline() coordinator — pins SHA, chains steps, collects diagnostics, handles failures | modal_app/pipeline.py |
| 4 | upload_run_diagnostics(); promote writes to the #600 registry | huggingface.py, data_upload.py |
| 5 | pipeline-auto, pipeline-status, pipeline-promote targets | Makefile |
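Phase 1's RunMetadata could be as small as the following; the field set is inferred from the meta.json contents described earlier and is not a committed design:

```python
import json
from dataclasses import asdict, dataclass, field
from pathlib import Path

@dataclass
class RunMetadata:
    """Sketch of the per-run metadata persisted as runs/{run_id}/meta.json."""
    run_id: str
    git_sha: str
    branch: str
    status: str = "running"
    completed_steps: list = field(default_factory=list)

    def save(self, run_dir: Path) -> Path:
        run_dir.mkdir(parents=True, exist_ok=True)
        path = run_dir / "meta.json"
        path.write_text(json.dumps(asdict(self), indent=2))
        return path

    @classmethod
    def load(cls, run_dir: Path) -> "RunMetadata":
        return cls(**json.loads((run_dir / "meta.json").read_text()))
```

Keeping it a plain dataclass serialized to JSON means pipeline-status and the promote step can read it without importing any pipeline code.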
Acceptance Criteria
- make pipeline-auto BRANCH=main GPU=T4 EPOCHS=200 runs the entire pipeline as a single --detach invocation
- Diagnostics land under runs/{run_id}/diagnostics/ on HF (append-only)
- make pipeline-promote RUN_ID=... promotes staging → production and registers the version in version_manifest.json (#600)
- make build-data-modal, make calibrate-modal, make stage-h5s continue unchanged (backward compat)
- Old runs on the volume can be cleaned up (--clean-old-runs flag available)
- make pipeline-status shows active and completed runs on the volume
- --build-database flag builds policy_data.db from scratch within the pipeline (optional; default: download from HF)
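The pipeline-status criterion could be backed by nothing more than a scan of the volume's runs/ directory; pipeline_status below is a hypothetical sketch of that logic:

```python
import json
from pathlib import Path

def pipeline_status(volume_root: Path) -> list[dict]:
    """Summarize runs on the volume by reading each runs/*/meta.json."""
    rows = []
    for meta_path in sorted(volume_root.glob("runs/*/meta.json")):
        meta = json.loads(meta_path.read_text())
        rows.append({"run_id": meta_path.parent.name, "status": meta.get("status", "?")})
    return rows
```

A `make pipeline-status` target could shell out to a tiny Modal function wrapping this, printing one line per run.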