refactor(execution-service): consolidate init-error reporting into WorkflowExecutionService#5922
Conversation
…rkflowExecutionService Per apache#5921 (follow-up to apache#5781): make error reporting a single site owned by WorkflowExecutionService instead of the two-phase split in WorkflowService.initExecutionService. - WorkflowExecutionService registers its fatalErrors -> WorkflowErrorEvent diff handler as the FIRST construction action. Construction does no external work and cannot throw (workflowSettings assignment, WebsocketInput creation, handler registration); all throwing work is in executeWorkflow(), which runs after the execution is published, so its failures surface through this same handler. - WorkflowService.initExecutionService drops the pre-publish errorSubject fallback (reportFatalErrorsToSubscribers + the executionPublished gating); the catch is simply errorHandler(e). Removes the now-unused errorSubject field and its connect() subscription. - Remove WorkflowServiceSpec (it only tested the removed reportFatalErrorsToSubscribers); the behavior is exercised by the integration/e2e suites. Resolves apache#5921.
Automated Reviewer SuggestionsBased on the
|
There was a problem hiding this comment.
Pull request overview
This PR refactors Amber’s workflow execution initialization error surfacing to use a single reporting path owned by WorkflowExecutionService (the execution metadata-store diff handler), removing the pre-publish websocket fallback previously implemented in WorkflowService.
Changes:
- Removed
WorkflowService’s pre-publisherrorSubjectfallback and simplified init-time exception handling to rely on the metadata-store diff handler. - Reordered
WorkflowExecutionServiceconstruction to register the fatal-error/state diff handler before other initialization steps. - Deleted
WorkflowServiceSpecwhich only covered the removed fallback helper.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| amber/src/main/scala/org/apache/texera/web/service/WorkflowService.scala | Removes errorSubject + pre-publish fallback logic; relies on metadata-store diff handler for fatal error surfacing. |
| amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala | Registers metadata-store diff handler as the first constructor action to ensure fatal error/state events always have an emitter. |
| amber/src/test/scala/org/apache/texera/web/service/WorkflowServiceSpec.scala | Removes unit tests tied to the deleted fallback method. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
| config | throughput | MB/s | latency | max Δ latest / 7d | |
|---|---|---|---|---|---|
| 🔴 | bs=10 sw=10 sl=64 | 413 | 0.252 | 23,470/36,442/36,442 us | 🔴 +9.5% / ⚪ within ±5% |
| 🔴 | bs=100 sw=10 sl=64 | 815 | 0.497 | 121,650/155,397/155,397 us | 🔴 +14.2% / 🔴 +11.2% |
| ⚪ | bs=1000 sw=10 sl=64 | 975 | 0.595 | 1,022,476/1,064,671/1,064,671 us | ⚪ within ±5% / 🔴 -6.4% |
Baseline details
Latest main 1c580e5 from same runner
| config | metric | PR | latest main | 7d avg | Δ latest | Δ 7d |
|---|---|---|---|---|---|---|
| bs=10 sw=10 sl=64 | throughput | 413 tuples/sec | 445 tuples/sec | 410.82 tuples/sec | -7.2% | +0.5% |
| bs=10 sw=10 sl=64 | MB/s | 0.252 MB/s | 0.271 MB/s | 0.251 MB/s | -7.0% | +0.5% |
| bs=10 sw=10 sl=64 | p50 | 23,470 us | 21,607 us | 23,785 us | +8.6% | -1.3% |
| bs=10 sw=10 sl=64 | p95 | 36,442 us | 33,274 us | 34,980 us | +9.5% | +4.2% |
| bs=10 sw=10 sl=64 | p99 | 36,442 us | 33,274 us | 34,980 us | +9.5% | +4.2% |
| bs=100 sw=10 sl=64 | throughput | 815 tuples/sec | 874 tuples/sec | 891.94 tuples/sec | -6.8% | -8.6% |
| bs=100 sw=10 sl=64 | MB/s | 0.497 MB/s | 0.534 MB/s | 0.544 MB/s | -6.9% | -8.7% |
| bs=100 sw=10 sl=64 | p50 | 121,650 us | 112,128 us | 112,277 us | +8.5% | +8.3% |
| bs=100 sw=10 sl=64 | p95 | 155,397 us | 136,062 us | 139,802 us | +14.2% | +11.2% |
| bs=100 sw=10 sl=64 | p99 | 155,397 us | 136,062 us | 139,802 us | +14.2% | +11.2% |
| bs=1000 sw=10 sl=64 | throughput | 975 tuples/sec | 945 tuples/sec | 1,041 tuples/sec | +3.2% | -6.3% |
| bs=1000 sw=10 sl=64 | MB/s | 0.595 MB/s | 0.577 MB/s | 0.635 MB/s | +3.1% | -6.4% |
| bs=1000 sw=10 sl=64 | p50 | 1,022,476 us | 1,052,923 us | 972,714 us | -2.9% | +5.1% |
| bs=1000 sw=10 sl=64 | p95 | 1,064,671 us | 1,099,197 us | 1,023,057 us | -3.1% | +4.1% |
| bs=1000 sw=10 sl=64 | p99 | 1,064,671 us | 1,099,197 us | 1,023,057 us | -3.1% | +4.1% |
Raw CSV
config_idx,batch_size,schema_width,string_len,num_batches,total_ms,total_tuples,total_bytes,tuples_per_sec,mb_per_sec,lat_p50_us,lat_p95_us,lat_p99_us
0,10,10,64,20,484.53,200,128000,413,0.252,23469.58,36441.60,36441.60
1,100,10,64,20,2454.67,2000,1280000,815,0.497,121650.01,155397.26,155397.26
2,1000,10,64,20,20522.36,20000,12800000,975,0.595,1022475.66,1064670.56,1064670.56
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #5922 +/- ##
============================================
- Coverage 54.60% 54.58% -0.02%
+ Complexity 2927 2920 -7
============================================
Files 1109 1109
Lines 42828 42821 -7
Branches 4608 4607 -1
============================================
- Hits 23385 23373 -12
- Misses 18081 18083 +2
- Partials 1362 1365 +3
*This pull request uses carry forward flags. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
Need test coverage ;) |
Address review (Copilot + @Yicong-Huang): deleting WorkflowServiceSpec left the single-reporting-site invariant untested. Add WorkflowExecutionServiceSpec: - a recorded fatal error surfaces as a WorkflowErrorEvent through the metadata-store diff handler registered at construction (the regression guard for init-error surfacing); - a state change emits a WorkflowStateEvent (handler's other branch). The construction-unused controllerConfig/resultService are passed as null on purpose, so a future change that does external work during construction (which would reopen the pre-publish gap) fails this test.
|
Added test coverage in |
What changes were proposed in this PR?
Consolidates execution init-error reporting into a single site owned by
WorkflowExecutionService, replacing the two-phase split #5781 introduced (per @Yicong-Huang's suggestion, tracked in #5921).WorkflowExecutionServicenow registers itsfatalErrors → WorkflowErrorEvent(and state) diff handler as the first construction action. Construction itself does no external work and cannot throw — it only assignsworkflowSettings, creates aWebsocketInput(aPublishSubject), and registers the handler. All throwing work lives inexecuteWorkflow(), which runs afterexecutionService.onNext(...)publishes the execution, so any failure there is recorded byerrorHandlerinto the metadata store and surfaced by the handler thatconnectToExecutionforwards.WorkflowService.initExecutionServicedrops the pre-publisherrorSubjectfallback: theexecutionPublishedgating andreportFatalErrorsToSubscribersare gone, and the catch is simplyerrorHandler(e). The now-unusederrorSubjectfield and itsconnect()subscription are removed.WorkflowServiceSpecis removed — it only tested the deletedreportFatalErrorsToSubscribers; the surfacing behavior is exercised by the integration/e2e suites.Net: a single reporting path (the metadata-store diff handler), with no change to the init-error surfacing #5781 added — construction is provably side-effect-free, so the pre-publish window no longer has a failure mode.
Any related issues, documentation, discussions?
Resolves #5921 — the follow-up refactor agreed during the #5781 review.
How was this PR tested?
scalafmtCheckAllclean. This is a behavior-preserving refactor of init-error reporting: the single reporting path is the metadata-store diff handler that already surfaced post-publish errors, and construction is side-effect-free so no error can escape it. The full compile, scalafix, and the integration/e2e suites that exercise init-error surfacing run in CI.Was this PR authored or co-authored using generative AI tooling?
Co-authored with Claude Opus 4.8 in compliance with ASF.