Skip to content

feat(amber): add loop-bookkeeping columns to materialized State (dormant)#5900

Open
aglinxinyuan wants to merge 3 commits into
apache:mainfrom
aglinxinyuan:state-format-loop-columns
Open

feat(amber): add loop-bookkeeping columns to materialized State (dormant)#5900
aglinxinyuan wants to merge 3 commits into
apache:mainfrom
aglinxinyuan:state-format-loop-columns

Conversation

@aglinxinyuan

Copy link
Copy Markdown
Contributor

What changes were proposed in this PR?

Extends the cross-region State materialization format from a single content column to 4 columnscontent, loop_counter, loop_start_id, loop_start_state_uri — promoting loop bookkeeping to first-class columns (never inside the content JSON). The transport carries them end to end: the OutputManager state writer + emit_state, the Python network sender/receiver, the materialization reader, and the Scala state.toTuple call sites.

Dormant on main — nothing observable changes without the loop operators:

  • to_tuple() / toTuple() and OutputManager.save_state_to_storage_if_needed / emit_state default the three loop columns to 0 / "", so every existing non-loop caller is unchanged.
  • from_tuple / fromTuple read only the content column, so round-trip identity is preserved and the extra columns are inert.

No backward-compatible read of old 1-column State is needed: State materialization is intra-execution only — the iceberg state-document URI is execution-scoped (…/eid/{executionId}/) and recreated fresh each run, and State tuples are never replayed across executions or engine versions, so a 1-column tuple can never reach the 4-column reader.

This is the state-format prerequisite the loop operators build on; the columns carry non-default values only once Loop Start/End set them (follow-up PR).

Any related issues, documentation, discussions?

Extracted from #5700 (loop operators) per @Xiao-zhen-Liu's split request; part of #4442 ("Introduce for loop").

How was this PR tested?

  • Format / round-trip: test_state.py (loop_counter is its own column, never in content JSON, defaults to 0), Scala StateSpec, ArrowUtilsSpec (4-column Arrow vector round-trip), IcebergDocumentSpec (iceberg state-doc round-trip).
  • Transport: test_network_receiver.py, test_input_port_materialization_reader_runnable.py, and test_state_materialization_e2e.py (hermetic sqlite catalog).
  • Dormancy: new test_output_manager.py::test_defaults_loop_columns_when_omitted pins that a no-loop caller (no loop_counter) still produces a valid 4-column tuple with the loop columns at 0 / "".
  • Local: workflow-core + amber compile; StateSpec + ArrowUtilsSpec pass; 26 Python state tests pass; scalafmt + scalafix + black clean. (IcebergDocumentSpec needs the iceberg catalog backend, so it runs in CI.)

Was this PR authored or co-authored using generative AI tooling?

Co-authored with Claude Opus 4.8 in compliance with ASF.

…ant)

Extend the cross-region State materialization format from 1 column (content)
to 4 columns: content + loop_counter + loop_start_id + loop_start_state_uri.
The loop bookkeeping is promoted to first-class columns (never inside the
content JSON), and the transport carries them: OutputManager state writer +
emit, the Python network sender/receiver, the materialization reader, and the
Scala state.toTuple call sites.

Dormant on main: to_tuple()/toTuple() and OutputManager.save_state_to_storage_if_needed
/ emit_state default the loop columns to 0/"", so every existing non-loop caller
is unchanged, and fromTuple/from_tuple read only the content column. The columns
activate only once the loop operators set them (follow-up PR). State
materialization is intra-execution (execution-scoped iceberg URI, recreated
fresh each run), so no backward-compatible read of old 1-column data is needed.

Extracted from apache#5700 (loop operators); part of apache#4442.
Copilot AI review requested due to automatic review settings June 23, 2026 01:35
@github-actions

Copy link
Copy Markdown
Contributor

Automated Reviewer Suggestions

Based on the git blame history of the changed files, we recommend the following reviewers:

  • Contributors with relevant context: @Yicong-Huang, @Ma77Ball
    You can notify them by mentioning @Yicong-Huang, @Ma77Ball in a comment.

@codecov-commenter

codecov-commenter commented Jun 23, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 95.45455% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 54.11%. Comparing base (8803d08) to head (7d5ab80).

Files with missing lines Patch % Lines
...ne/architecture/messaginglayer/OutputManager.scala 0.00% 1 Missing ⚠️
.../architecture/pythonworker/PythonProxyClient.scala 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff            @@
##               main    #5900   +/-   ##
=========================================
  Coverage     54.11%   54.11%           
+ Complexity     2819     2818    -1     
=========================================
  Files          1103     1103           
  Lines         42650    42659    +9     
  Branches       4588     4588           
=========================================
+ Hits          23079    23086    +7     
- Misses        18226    18228    +2     
  Partials       1345     1345           
Flag Coverage Δ *Carryforward flag
access-control-service 70.44% <ø> (ø)
agent-service 34.36% <ø> (ø) Carriedforward from 439ea72
amber 55.69% <86.66%> (+0.04%) ⬆️
computing-unit-managing-service 1.65% <ø> (ø)
config-service 57.35% <ø> (ø)
file-service 58.59% <ø> (ø)
frontend 48.09% <ø> (-0.03%) ⬇️ Carriedforward from 439ea72
pyamber 91.09% <100.00%> (+0.89%) ⬆️
python 90.69% <ø> (-0.08%) ⬇️ Carriedforward from 439ea72
workflow-compiling-service 58.69% <ø> (ø)

*This pull request uses carry forward flags. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@github-actions

github-actions Bot commented Jun 23, 2026

Copy link
Copy Markdown
Contributor

⚠️ Benchmark changes need a look

🟢 5 better · 🔴 5 worse · ⚪ 5 noise (<±5%) · 0 without baseline

Compared against main 8803d08 benchmarked on this same runner, so the delta is largely free of cross-runner hardware noise. The "7d avg" column still reflects the gh-pages dashboard. Treat <±5% as noise unless repeated.

Dashboard · Run

config throughput MB/s latency max Δ latest / 7d
🔴 bs=10 sw=10 sl=64 378 0.231 19,839/143,094/143,094 us 🔴 +415.1% / 🔴 +309.1%
🟢 bs=100 sw=10 sl=64 1,280 0.781 74,666/100,106/100,106 us 🟢 -16.6% / 🟢 +43.5%
bs=1000 sw=10 sl=64 1,435 0.876 693,306/728,996/728,996 us ⚪ within ±5% / 🟢 +37.9%
Baseline details

Latest main 8803d08 from same runner

config metric PR latest main 7d avg Δ latest Δ 7d
bs=10 sw=10 sl=64 throughput 378 tuples/sec 555 tuples/sec 410.82 tuples/sec -31.9% -8.0%
bs=10 sw=10 sl=64 MB/s 0.231 MB/s 0.338 MB/s 0.251 MB/s -31.7% -7.9%
bs=10 sw=10 sl=64 p50 19,839 us 16,765 us 23,785 us +18.3% -16.6%
bs=10 sw=10 sl=64 p95 143,094 us 27,781 us 34,980 us +415.1% +309.1%
bs=10 sw=10 sl=64 p99 143,094 us 27,781 us 34,980 us +415.1% +309.1%
bs=100 sw=10 sl=64 throughput 1,280 tuples/sec 1,204 tuples/sec 891.94 tuples/sec +6.3% +43.5%
bs=100 sw=10 sl=64 MB/s 0.781 MB/s 0.735 MB/s 0.544 MB/s +6.3% +43.5%
bs=100 sw=10 sl=64 p50 74,666 us 79,603 us 112,277 us -6.2% -33.5%
bs=100 sw=10 sl=64 p95 100,106 us 120,039 us 139,802 us -16.6% -28.4%
bs=100 sw=10 sl=64 p99 100,106 us 120,039 us 139,802 us -16.6% -28.4%
bs=1000 sw=10 sl=64 throughput 1,435 tuples/sec 1,427 tuples/sec 1,041 tuples/sec +0.6% +37.8%
bs=1000 sw=10 sl=64 MB/s 0.876 MB/s 0.871 MB/s 0.635 MB/s +0.6% +37.9%
bs=1000 sw=10 sl=64 p50 693,306 us 702,360 us 972,714 us -1.3% -28.7%
bs=1000 sw=10 sl=64 p95 728,996 us 746,245 us 1,023,057 us -2.3% -28.7%
bs=1000 sw=10 sl=64 p99 728,996 us 746,245 us 1,023,057 us -2.3% -28.7%
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,528.69,200,128000,378,0.231,19838.76,143093.65,143093.65
1,100,10,64,20,1562.50,2000,1280000,1280,0.781,74665.58,100106.23,100106.23
2,1000,10,64,20,13933.75,20000,12800000,1435,0.876,693305.53,728995.60,728995.60

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR updates Amber’s cross-region State materialization wire format from a single content column to a 4-column schema (content, loop_counter, loop_start_id, loop_start_state_uri) so loop bookkeeping is carried as first-class columns rather than embedded in the user state JSON. It touches both Scala and Python runtimes plus tests to keep the change dormant for non-loop workflows via default values.

Changes:

  • Extend Scala/Python State schemas and toTuple/to_tuple writers to emit the 4-column state tuple with defaults for non-loop callers.
  • Update Python materialization reader and Python network sender/receiver to read/write the new loop bookkeeping columns on StateFrame.
  • Add/adjust Scala + Python tests to pin Arrow vector round-trips and state materialization behavior.

Reviewed changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala Expands State.schema to 4 columns and updates toTuple to write loop bookkeeping columns with defaults.
common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala Updates state tuple tests to align with the new toTuple() signature and schema.
common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala Adds a data-level Arrow encode/decode round-trip test to ensure multi-column State tuples survive Arrow vector conversion.
common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala Updates call sites to toTuple() in state-document tests (but some test semantics still use reserved loop keys in JSON).
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala Updates Scala state persistence path to call state.toTuple().
amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala Updates Scala→Python Arrow send path to call state.toTuple().
amber/src/main/python/core/models/state.py Expands Python State schema and adds loop-bookkeeping parameters to to_tuple(...).
amber/src/main/python/core/models/payload.py Extends Python StateFrame envelope with loop bookkeeping fields (defaulting to no-loop values).
amber/src/main/python/core/architecture/packaging/output_manager.py Threads loop bookkeeping through save_state_to_storage_if_needed(...) and emit_state(...) and refactors writer setup/close logic.
amber/src/main/python/core/runnables/network_sender.py Serializes StateFrame as a 4-column Arrow table (content + loop bookkeeping columns).
amber/src/main/python/core/runnables/network_receiver.py Deserializes StateFrame from a 4-column Arrow table and populates loop bookkeeping fields.
amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py Reads loop bookkeeping columns from the state table rows and carries them on emitted StateFrames.
amber/src/test/python/core/models/test_state.py Updates tests to assert the expanded schema and that loop bookkeeping does not leak into the content JSON.
amber/src/test/python/core/runnables/test_network_receiver.py Updates unit test to verify non-zero loop_counter survives Python sender→receiver serialization.
amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py Updates reader test to assert loop bookkeeping values are carried on emitted StateFrames.
amber/src/test/python/core/architecture/packaging/test_output_manager.py Updates tests for new signature and adds a dormancy test ensuring omitted loop args default to 0/empty strings.
amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py Updates e2e state materialization test to use sqlite-backed catalog and assert loop_counter column round-trip.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread amber/src/main/python/core/runnables/network_sender.py
Comment thread amber/src/main/python/core/runnables/network_receiver.py
Address review (Copilot): the materialization round-trip tests used
"loop_counter" as a user-state key (landing in the content JSON), which is
misleading now that loop bookkeeping is a dedicated column. Rename the user key
to "i" and write/assert the loop columns via toTuple(loopCounter = ...) +
row.getField("loop_counter"/"loop_start_id"/"loop_start_state_uri"), matching
how StateSpec/test_state were updated.
aglinxinyuan added a commit to aglinxinyuan/texera that referenced this pull request Jun 23, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants