feat(amber): add loop-bookkeeping columns to materialized State (dormant)#5900
feat(amber): add loop-bookkeeping columns to materialized State (dormant)#5900aglinxinyuan wants to merge 3 commits into
Conversation
…ant) Extend the cross-region State materialization format from 1 column (content) to 4 columns: content + loop_counter + loop_start_id + loop_start_state_uri. The loop bookkeeping is promoted to first-class columns (never inside the content JSON), and the transport carries them: OutputManager state writer + emit, the Python network sender/receiver, the materialization reader, and the Scala state.toTuple call sites. Dormant on main: to_tuple()/toTuple() and OutputManager.save_state_to_storage_if_needed / emit_state default the loop columns to 0/"", so every existing non-loop caller is unchanged, and fromTuple/from_tuple read only the content column. The columns activate only once the loop operators set them (follow-up PR). State materialization is intra-execution (execution-scoped iceberg URI, recreated fresh each run), so no backward-compatible read of old 1-column data is needed. Extracted from apache#5700 (loop operators); part of apache#4442.
Automated Reviewer SuggestionsBased on the
|
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #5900 +/- ##
=========================================
Coverage 54.11% 54.11%
+ Complexity 2819 2818 -1
=========================================
Files 1103 1103
Lines 42650 42659 +9
Branches 4588 4588
=========================================
+ Hits 23079 23086 +7
- Misses 18226 18228 +2
Partials 1345 1345
*This pull request uses carry forward flags. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
| config | throughput | MB/s | latency | max Δ latest / 7d | |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 378 | 0.231 | 19,839/143,094/143,094 us | 🔴 +415.1% / 🔴 +309.1% |
| 🟢 | bs=100 sw=10 sl=64 | 1,280 | 0.781 | 74,666/100,106/100,106 us | 🟢 -16.6% / 🟢 +43.5% |
| ⚪ | bs=1000 sw=10 sl=64 | 1,435 | 0.876 | 693,306/728,996/728,996 us | ⚪ within ±5% / 🟢 +37.9% |
Baseline details
Latest main 8803d08 from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 378 tuples/sec | 555 tuples/sec | 410.82 tuples/sec | -31.9% | -8.0% |
| bs=10 sw=10 sl=64 | MB/s | 0.231 MB/s | 0.338 MB/s | 0.251 MB/s | -31.7% | -7.9% |
| bs=10 sw=10 sl=64 | p50 | 19,839 us | 16,765 us | 23,785 us | +18.3% | -16.6% |
| bs=10 sw=10 sl=64 | p95 | 143,094 us | 27,781 us | 34,980 us | +415.1% | +309.1% |
| bs=10 sw=10 sl=64 | p99 | 143,094 us | 27,781 us | 34,980 us | +415.1% | +309.1% |
| bs=100 sw=10 sl=64 | throughput | 1,280 tuples/sec | 1,204 tuples/sec | 891.94 tuples/sec | +6.3% | +43.5% |
| bs=100 sw=10 sl=64 | MB/s | 0.781 MB/s | 0.735 MB/s | 0.544 MB/s | +6.3% | +43.5% |
| bs=100 sw=10 sl=64 | p50 | 74,666 us | 79,603 us | 112,277 us | -6.2% | -33.5% |
| bs=100 sw=10 sl=64 | p95 | 100,106 us | 120,039 us | 139,802 us | -16.6% | -28.4% |
| bs=100 sw=10 sl=64 | p99 | 100,106 us | 120,039 us | 139,802 us | -16.6% | -28.4% |
| bs=1000 sw=10 sl=64 | throughput | 1,435 tuples/sec | 1,427 tuples/sec | 1,041 tuples/sec | +0.6% | +37.8% |
| bs=1000 sw=10 sl=64 | MB/s | 0.876 MB/s | 0.871 MB/s | 0.635 MB/s | +0.6% | +37.9% |
| bs=1000 sw=10 sl=64 | p50 | 693,306 us | 702,360 us | 972,714 us | -1.3% | -28.7% |
| bs=1000 sw=10 sl=64 | p95 | 728,996 us | 746,245 us | 1,023,057 us | -2.3% | -28.7% |
| bs=1000 sw=10 sl=64 | p99 | 728,996 us | 746,245 us | 1,023,057 us | -2.3% | -28.7% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,528.69,200,128000,378,0.231,19838.76,143093.65,143093.65
1,100,10,64,20,1562.50,2000,1280000,1280,0.781,74665.58,100106.23,100106.23
2,1000,10,64,20,13933.75,20000,12800000,1435,0.876,693305.53,728995.60,728995.60There was a problem hiding this comment.
Pull request overview
This PR updates Amber’s cross-region State materialization wire format from a single content column to a 4-column schema (content, loop_counter, loop_start_id, loop_start_state_uri) so loop bookkeeping is carried as first-class columns rather than embedded in the user state JSON. It touches both Scala and Python runtimes plus tests to keep the change dormant for non-loop workflows via default values.
Changes:
- Extend Scala/Python
Stateschemas andtoTuple/to_tuplewriters to emit the 4-column state tuple with defaults for non-loop callers. - Update Python materialization reader and Python network sender/receiver to read/write the new loop bookkeeping columns on
StateFrame. - Add/adjust Scala + Python tests to pin Arrow vector round-trips and state materialization behavior.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala | Expands State.schema to 4 columns and updates toTuple to write loop bookkeeping columns with defaults. |
| common/workflow-core/src/test/scala/org/apache/texera/amber/core/state/StateSpec.scala | Updates state tuple tests to align with the new toTuple() signature and schema. |
| common/workflow-core/src/test/scala/org/apache/texera/amber/util/ArrowUtilsSpec.scala | Adds a data-level Arrow encode/decode round-trip test to ensure multi-column State tuples survive Arrow vector conversion. |
| common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala | Updates call sites to toTuple() in state-document tests (but some test semantics still use reserved loop keys in JSON). |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala | Updates Scala state persistence path to call state.toTuple(). |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonProxyClient.scala | Updates Scala→Python Arrow send path to call state.toTuple(). |
| amber/src/main/python/core/models/state.py | Expands Python State schema and adds loop-bookkeeping parameters to to_tuple(...). |
| amber/src/main/python/core/models/payload.py | Extends Python StateFrame envelope with loop bookkeeping fields (defaulting to no-loop values). |
| amber/src/main/python/core/architecture/packaging/output_manager.py | Threads loop bookkeeping through save_state_to_storage_if_needed(...) and emit_state(...) and refactors writer setup/close logic. |
| amber/src/main/python/core/runnables/network_sender.py | Serializes StateFrame as a 4-column Arrow table (content + loop bookkeeping columns). |
| amber/src/main/python/core/runnables/network_receiver.py | Deserializes StateFrame from a 4-column Arrow table and populates loop bookkeeping fields. |
| amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py | Reads loop bookkeeping columns from the state table rows and carries them on emitted StateFrames. |
| amber/src/test/python/core/models/test_state.py | Updates tests to assert the expanded schema and that loop bookkeeping does not leak into the content JSON. |
| amber/src/test/python/core/runnables/test_network_receiver.py | Updates unit test to verify non-zero loop_counter survives Python sender→receiver serialization. |
| amber/src/test/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py | Updates reader test to assert loop bookkeeping values are carried on emitted StateFrames. |
| amber/src/test/python/core/architecture/packaging/test_output_manager.py | Updates tests for new signature and adds a dormancy test ensuring omitted loop args default to 0/empty strings. |
| amber/src/test/python/core/architecture/packaging/test_state_materialization_e2e.py | Updates e2e state materialization test to use sqlite-backed catalog and assert loop_counter column round-trip. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Address review (Copilot): the materialization round-trip tests used
"loop_counter" as a user-state key (landing in the content JSON), which is
misleading now that loop bookkeeping is a dedicated column. Rename the user key
to "i" and write/assert the loop columns via toTuple(loopCounter = ...) +
row.getField("loop_counter"/"loop_start_id"/"loop_start_state_uri"), matching
how StateSpec/test_state were updated.
What changes were proposed in this PR?
Extends the cross-region State materialization format from a single
contentcolumn to 4 columns —content,loop_counter,loop_start_id,loop_start_state_uri— promoting loop bookkeeping to first-class columns (never inside the content JSON). The transport carries them end to end: theOutputManagerstate writer +emit_state, the Python network sender/receiver, the materialization reader, and the Scalastate.toTuplecall sites.Dormant on
main— nothing observable changes without the loop operators:to_tuple()/toTuple()andOutputManager.save_state_to_storage_if_needed/emit_statedefault the three loop columns to0/"", so every existing non-loop caller is unchanged.from_tuple/fromTupleread only thecontentcolumn, so round-trip identity is preserved and the extra columns are inert.No backward-compatible read of old 1-column State is needed: State materialization is intra-execution only — the iceberg state-document URI is execution-scoped (
…/eid/{executionId}/) and recreated fresh each run, and State tuples are never replayed across executions or engine versions, so a 1-column tuple can never reach the 4-column reader.This is the state-format prerequisite the loop operators build on; the columns carry non-default values only once Loop Start/End set them (follow-up PR).
Any related issues, documentation, discussions?
Extracted from #5700 (loop operators) per @Xiao-zhen-Liu's split request; part of #4442 ("Introduce for loop").
How was this PR tested?
test_state.py(loop_counter is its own column, never in content JSON, defaults to 0), ScalaStateSpec,ArrowUtilsSpec(4-column Arrow vector round-trip),IcebergDocumentSpec(iceberg state-doc round-trip).test_network_receiver.py,test_input_port_materialization_reader_runnable.py, andtest_state_materialization_e2e.py(hermetic sqlite catalog).test_output_manager.py::test_defaults_loop_columns_when_omittedpins that a no-loop caller (noloop_counter) still produces a valid 4-column tuple with the loop columns at0/"".workflow-core+ambercompile;StateSpec+ArrowUtilsSpecpass; 26 Python state tests pass; scalafmt + scalafix + black clean. (IcebergDocumentSpecneeds the iceberg catalog backend, so it runs in CI.)Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.