Signed-off-by: Albert Callarisa <albert@diagrid.io>
Pull request overview
This PR updates the durable workflow SDK to populate the newly added protobuf `origin` oneof on timer actions/events, and extends `wait_for_external_event` to support an optional timeout by internally scheduling a timer (a far-future timer when no timeout is provided), aligning its behavior with the other SDKs.
Changes:
- Add a `timeout` parameter to `wait_for_external_event` across the public context APIs and implement it via a composite task that races the external event against a timer.
- Populate timer `origin` metadata for user timers (`create_timer`), external-event wait timers, and retry delay timers (activity + child workflow).
- Rebuild protobuf Python outputs/stubs and add/adjust unit tests for the new timer-origin behavior.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py | Updates existing external-event tests for the new “auto timer” behavior and adds new assertions/tests around timer origin fields. |
| ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py | Extends the public workflow context interface to support wait_for_external_event(..., timeout=...). |
| ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py | Threads the new timeout keyword through the public wrapper context to the underlying durable task context. |
| ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py | Sets timer origins for create-timer and retry timers, and implements external-event timeout/far-future timer scheduling. |
| ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py | Adds ExternalEventWithTimeoutTask composite task and updates the orchestration context contract for the new timeout parameter. |
| ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py | Extends timer action helper to populate the correct origin oneof field. |
| ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.pyi | Regenerated protobuf typing stubs for new timer origin fields. |
| ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.py | Regenerated protobuf runtime code for new timer origin fields. |
| ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.pyi | Regenerated protobuf typing stubs for new timer origin fields on history events. |
| ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.py | Regenerated protobuf runtime code for new timer origin fields on history events. |
```python
    self._result = completed_task.get_result()
    self._is_complete = True
elif completed_task is self._timer_task:
    self._exception = TimeoutError('The operation timed out waiting for an external event')
    self._is_complete = True
```
ExternalEventWithTimeoutTask assigns TimeoutError to self._exception, but Task._exception / Task.get_exception() are typed as TaskFailedError. This makes task failure typing inconsistent (and may break static typing and any code that assumes task failures are always TaskFailedError). Consider widening the base task exception type to Exception (and updating get_exception/annotations) or introducing a dedicated durable timeout failure type that is still catchable as TimeoutError.
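The "dedicated durable timeout failure type" option could look like the sketch below. The `TaskFailedError` here is a stand-in for the durabletask type, and `WorkflowTimeoutError` is a hypothetical name; the point is that subclassing both keeps the existing `get_exception()` annotations valid while remaining catchable as `TimeoutError`:

```python
class TaskFailedError(Exception):
    """Stand-in for durabletask's TaskFailedError."""

class WorkflowTimeoutError(TaskFailedError, TimeoutError):
    """Hypothetical failure type for an external-event wait that timed out.

    Inheriting from TaskFailedError keeps Task.get_exception() typing
    consistent; inheriting from TimeoutError lets callers write
    `except TimeoutError:` as they would for any timeout.
    """

err = WorkflowTimeoutError('The operation timed out waiting for an external event')
```

Callers that already catch `TaskFailedError` and callers that catch `TimeoutError` would both handle this error without changes.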
```python
new_events = [
    helpers.new_workflow_started_event(current_timestamp),
    helpers.new_child_workflow_created_event(3, 'child_orchestrator', expected_first_child_id),
```
This test intends to verify that retry timers keep pointing at the first child workflow instance ID even when later retries create a different child instance. However, the simulated second new_child_workflow_created_event(...) uses expected_first_child_id again, so the test doesn't actually cover the “different instance_id” scenario described in the comment. Use a distinct second child instance ID here and keep asserting that the timer origin still references the first ID.
Suggested change:

```diff
-new_events = [
-    helpers.new_workflow_started_event(current_timestamp),
-    helpers.new_child_workflow_created_event(3, 'child_orchestrator', expected_first_child_id),
+expected_second_child_id = f'{TEST_INSTANCE_ID}:0002'
+new_events = [
+    helpers.new_workflow_started_event(current_timestamp),
+    helpers.new_child_workflow_created_event(3, 'child_orchestrator', expected_second_child_id),
```
```python
fire_at = timeout if timeout is not None else datetime(9999, 12, 31, 23, 59, 59)
timer_task = self.create_timer_internal(
    fire_at,
    origin=pb.TimerOriginExternalEvent(name=name),
)
```
wait_for_external_event() now always schedules a timer action (timeout or far-future). This changes the action sequence compared to previous versions and will cause in-flight orchestrations that were started on older code to begin emitting a new createTimer action when replayed after upgrade. If backward compatibility for running instances matters, consider gating the new behavior behind is_patched(...) (or another versioning mechanism) so existing histories can replay without introducing new actions unexpectedly.
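The gating idea can be sketched as follows. The patch name and the `StubContext` are illustrative, and the real durable task versioning API may differ; the point is only that pre-upgrade histories replay without the new action:

```python
from datetime import datetime

FAR_FUTURE = datetime(9999, 12, 31, 23, 59, 59)

class StubContext:
    """Minimal illustrative stand-in for the orchestration context."""
    def __init__(self, patched: bool):
        self._patched = patched
        self.timers = []

    def is_patched(self, name: str) -> bool:
        return self._patched

    def create_timer_internal(self, fire_at):
        self.timers.append(fire_at)
        return fire_at

def schedule_event_timer(ctx, timeout=None):
    # Only orchestrations started on the new code emit the extra
    # createTimer action; old histories replay unchanged.
    if not ctx.is_patched('external-event-timeout-v1'):
        return None
    fire_at = timeout if timeout is not None else FAR_FUTURE
    return ctx.create_timer_internal(fire_at)

old_ctx = StubContext(patched=False)
print(schedule_event_timer(old_ctx))  # -> None (no new action on replay)
```

An unpatched context schedules nothing, while a patched one emits the timer action (far-future when no timeout is given), keeping replays of in-flight instances deterministic across the upgrade.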
`origin` field in timers. `wait_for_external_event` to create a timer associated with such an event. Other SDKs follow this same approach.