Conversation
Signed-off-by: Albert Callarisa <albert@diagrid.io>
Pull request overview
This PR updates the durable workflow SDK to populate the newly added protobuf origin oneof on timer actions/events, and extends wait_for_external_event to support optional timeouts by internally scheduling a timer (or a far-future timer when no timeout is provided), aligning behavior with other SDKs.
Changes:
- Add a
timeoutparameter towait_for_external_eventacross the public context APIs and implement it via a composite task that races the external event against a timer. - Populate timer
originmetadata for user timers (create_timer), external-event wait timers, and retry delay timers (activity + child workflow). - Rebuild protobuf Python outputs/stubs and add/adjust unit tests for the new timer-origin behavior.
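
The "race the external event against a timer" idea in the list above can be illustrated with a small analogy. This is only a sketch using `asyncio`, not the SDK's `ExternalEventWithTimeoutTask`: durable workflows use replayable durable timers rather than `asyncio.sleep`, and the names `wait_for_event` and `run_demo` are hypothetical.

```python
import asyncio
from typing import Optional


async def wait_for_event(event: asyncio.Event, timeout: Optional[float] = None) -> None:
    """Wait for `event`; raise TimeoutError if `timeout` seconds elapse first.

    Analogy only: the real SDK schedules a durable timer, not asyncio.sleep.
    """
    event_task = asyncio.ensure_future(event.wait())
    tasks = {event_task}
    if timeout is not None:
        # The timer is the second contestant in the race.
        tasks.add(asyncio.ensure_future(asyncio.sleep(timeout)))
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # the loser is no longer needed
    if event_task not in done:
        raise TimeoutError('timed out waiting for event')


def run_demo() -> tuple:
    """Return (event_won, timer_won) for two small scenarios."""

    async def scenario() -> tuple:
        fired = asyncio.Event()
        fired.set()
        await wait_for_event(fired, timeout=1.0)  # event already set: it wins
        never = asyncio.Event()
        try:
            await wait_for_event(never, timeout=0.01)
            timed_out = False
        except TimeoutError:
            timed_out = True
        return True, timed_out

    return asyncio.run(scenario())
```

If both contestants finish in the same loop iteration, this sketch lets the event win; whether the SDK makes the same choice is not stated in the overview.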
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| ext/dapr-ext-workflow/tests/durabletask/test_orchestration_executor.py | Updates existing external-event tests for the new “auto timer” behavior and adds new assertions/tests around timer origin fields. |
| ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py | Extends the public workflow context interface to support wait_for_external_event(..., timeout=...). |
| ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py | Threads the new timeout keyword through the public wrapper context to the underlying durable task context. |
| ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/worker.py | Sets timer origins for create-timer and retry timers, and implements external-event timeout/far-future timer scheduling. |
| ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/task.py | Adds ExternalEventWithTimeoutTask composite task and updates the orchestration context contract for the new timeout parameter. |
| ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/helpers.py | Extends timer action helper to populate the correct origin oneof field. |
| ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.pyi | Regenerated protobuf typing stubs for new timer origin fields. |
| ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/orchestrator_actions_pb2.py | Regenerated protobuf runtime code for new timer origin fields. |
| ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.pyi | Regenerated protobuf typing stubs for new timer origin fields on history events. |
| ext/dapr-ext-workflow/dapr/ext/workflow/_durabletask/internal/history_events_pb2.py | Regenerated protobuf runtime code for new timer origin fields on history events. |
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.
Codecov Report ❌

Coverage diff, main vs. #976:

| Metric | main | #976 | +/- |
|---|---|---|---|
| Coverage | 86.63% | 80.93% | -5.70% |
| Files | 84 | 136 | +52 |
| Lines | 4473 | 13090 | +8617 |
| Hits | 3875 | 10594 | +6719 |
| Misses | 598 | 2496 | +1898 |
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 1 comment.
sicoyle left a comment
nice job 👏 👏 👏 🙌 few comments so far and I've reviewed mostttt files

    import dapr.ext.workflow._durabletask.internal.protos as pb
    from google.protobuf import timestamp_pb2, wrappers_pb2

    TimerOrigin = Union[

can you mv these changes into a timer.py file instead of adding to the helpers.py pls

    def is_optional_timer_action(action: pb.WorkflowAction) -> bool:
        """Returns True if the action is an optional TimerOriginExternalEvent timer
        with the sentinel fireAt — i.e. created by an indefinite wait_for_external_event.

        Pre-patch histories (from prior SDK versions that didn't schedule a timer for
        indefinite waits) won't carry a matching TimerCreatedEvent; the replay logic
        uses this check to drop the optional action and shift sequence ids.
        """
        if not action.HasField('createTimer'):
            return False
        timer = action.createTimer
        if timer.WhichOneof('origin') != 'externalEvent':
            return False
        return (
            timer.fireAt.seconds == OPTIONAL_TIMER_FIRE_AT.seconds
            and timer.fireAt.nanos == OPTIONAL_TIMER_FIRE_AT.nanos
        )


    def is_optional_timer_event(event: pb.HistoryEvent) -> bool:
        """Returns True if a TimerCreatedEvent is the optional sentinel timer.

        For replay compatibility, treat a timerCreated event with the sentinel
        fireAt as optional even if the proto3 ``origin`` oneof is unset (e.g. when
        reading histories emitted by older sidecars that didn't populate it). When
        ``origin`` *is* populated, it must match TimerOriginExternalEvent.
        """
        if not event.HasField('timerCreated'):
            return False
        timer = event.timerCreated
        if (
            timer.fireAt.seconds != OPTIONAL_TIMER_FIRE_AT.seconds
            or timer.fireAt.nanos != OPTIONAL_TIMER_FIRE_AT.nanos
        ):
            return False
        origin = timer.WhichOneof('origin')
        return origin in (None, 'externalEvent')

these look almost like the same func... can we create a helper for the same logic between the two?
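
One way the shared logic could be factored out is sketched below. It is a minimal illustration under stated assumptions, not the PR's code: `Timestamp` and `Timer` are plain dataclass stand-ins for the protobuf messages, `has_sentinel_fire_at` is a hypothetical helper name, the sentinel value is an arbitrary placeholder, and the `HasField` guards from the original functions are omitted for brevity.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Timestamp:
    """Stand-in for google.protobuf.Timestamp (seconds and nanos only)."""
    seconds: int = 0
    nanos: int = 0


@dataclass(frozen=True)
class Timer:
    """Stand-in for CreateTimerAction / TimerCreatedEvent."""
    fire_at: Timestamp
    origin: Optional[str] = None  # mimics the result of WhichOneof('origin')


# Placeholder only; the real sentinel is whatever the SDK defines.
OPTIONAL_TIMER_FIRE_AT = Timestamp(seconds=253402300799, nanos=0)


def has_sentinel_fire_at(timer: Timer) -> bool:
    """Hypothetical shared helper: does fire_at equal the sentinel?"""
    return (
        timer.fire_at.seconds == OPTIONAL_TIMER_FIRE_AT.seconds
        and timer.fire_at.nanos == OPTIONAL_TIMER_FIRE_AT.nanos
    )


def is_optional_timer_action(timer: Timer) -> bool:
    # Actions must carry the externalEvent origin explicitly.
    return timer.origin == 'externalEvent' and has_sentinel_fire_at(timer)


def is_optional_timer_event(timer: Timer) -> bool:
    # History events may have an unset origin (older sidecars).
    return has_sentinel_fire_at(timer) and timer.origin in (None, 'externalEvent')
```

With the comparison in one place, the two public checks differ only in how strictly they treat `origin`, which is the actual behavioral difference between them.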

    timer.fireAt.seconds == OPTIONAL_TIMER_FIRE_AT.seconds
    and timer.fireAt.nanos == OPTIONAL_TIMER_FIRE_AT.nanos

    timer.fireAt.seconds != OPTIONAL_TIMER_FIRE_AT.seconds
    or timer.fireAt.nanos != OPTIONAL_TIMER_FIRE_AT.nanos

why is this one or if the func above uses and for this check?
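
For context on the `and` / `or` question: by De Morgan's law, "seconds match and nanos match" is the exact negation of "seconds differ or nanos differ", so a positive `return` and an early-exit guard can express the same test. The check below is a small sketch with an arbitrary placeholder sentinel and hypothetical function names.

```python
from itertools import product

SENTINEL = (100, 5)  # arbitrary (seconds, nanos) placeholder


def matches(seconds: int, nanos: int) -> bool:
    """Positive form, as used in a final `return`."""
    return seconds == SENTINEL[0] and nanos == SENTINEL[1]


def mismatches(seconds: int, nanos: int) -> bool:
    """Negated form, as used in an early-return guard."""
    return seconds != SENTINEL[0] or nanos != SENTINEL[1]


def forms_agree() -> bool:
    """Check every combination of matching / non-matching fields."""
    cases = product((99, 100), (4, 5))
    return all(matches(s, n) == (not mismatches(s, n)) for s, n in cases)
```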

    @@ -202,10 +272,20 @@ def new_workflow_version_not_available_action(
         )


    -def new_create_timer_action(id: int, fire_at: datetime) -> pb.WorkflowAction:
    -    timestamp = timestamp_pb2.Timestamp()
    -    timestamp.FromDatetime(fire_at)
    -    return pb.WorkflowAction(id=id, createTimer=pb.CreateTimerAction(fireAt=timestamp))
    +def new_create_timer_action(
    +    id: int,
    +    fire_at: Union[datetime, timestamp_pb2.Timestamp],
    +    origin: Optional[TimerOrigin] = None,
    +) -> pb.WorkflowAction:
    +    if isinstance(fire_at, timestamp_pb2.Timestamp):
    +        timestamp = fire_at

pls keep naming consistent between the two funcs: the params (timer_id vs id) and the vars inside them (ts vs timestamp) refer to the same thing

    DESCRIPTOR: _descriptor.Descriptor

    TASKEXECUTIONID_FIELD_NUMBER: _builtins.int

Suggested change:

    -TASKEXECUTIONID_FIELD_NUMBER: _builtins.int
    +TASK_EXECUTION_ID_FIELD_NUMBER: _builtins.int

why do we need to track the field number for this? can it just be TASK_EXECUTION_ID instead?

    * ``None`` (default) or a *negative* ``timedelta`` — wait indefinitely.
      An optional sentinel timer is scheduled internally for runtime
      tracking, but ``TimeoutError`` is never raised on its own.
    * ``timedelta(0)`` — do not wait at all. The returned task fails
      immediately with ``TimeoutError``.
    * A future ``datetime`` or a positive ``timedelta`` — wait until
      that deadline / for that duration; ``TimeoutError`` is raised if
      the event has not been received in time.

which of these does plain 0 fall under? How do these compare to the other SDK timeout field defaults? How does this compare to other workflow providers for default behaviors?
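
The three documented cases can be written out as a small classifier, which also makes the gaps visible. This is a sketch, not the SDK's logic: `classify_timeout` is a hypothetical name, and two behaviors are assumptions because the docstring excerpt does not cover them (a plain `0` is rejected with `TypeError`, and a `datetime` that is not in the future is treated as "immediate").

```python
from datetime import datetime, timedelta
from typing import Optional, Union


def classify_timeout(
    timeout: Optional[Union[datetime, timedelta]],
    now: datetime,
) -> str:
    """Map a timeout argument to 'indefinite', 'immediate', or 'deadline'."""
    if timeout is None:
        return 'indefinite'
    if isinstance(timeout, timedelta):
        if timeout < timedelta(0):
            return 'indefinite'  # negative duration: documented as indefinite
        if timeout == timedelta(0):
            return 'immediate'   # documented: fail at once with TimeoutError
        return 'deadline'
    if isinstance(timeout, datetime):
        # Assumption: a datetime at or before `now` behaves like timedelta(0).
        return 'deadline' if timeout > now else 'immediate'
    # Assumption: bare numbers such as 0 are not accepted.
    raise TypeError(f'unsupported timeout type: {type(timeout).__name__}')
```

For example, `classify_timeout(timedelta(seconds=-1), now)` yields `'indefinite'`, which may surprise callers who expect a negative duration to time out immediately.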

        wait_for_external_event being canceled before any history event arrives.
        """
        if self._is_complete:
            raise ValueError('The task has already completed.')

should this instead be a noop?

    if self.is_complete:
        return
    if completed_task is self._event_task:
        if completed_task.is_failed:

this is missing a few other terminal states i believe in the if conditional check

    if self._timeout is not None:
        msg = (
            f'Timed out after {self._timeout!r} waiting for '
            f'external event {self._event_name!r}'
        )
    else:
        msg = f'Timed out waiting for external event {self._event_name!r}'

can these just be combined instead of split in an if else?
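
One possible way to combine the two branches is to build only the optional "after ..." fragment conditionally. This is a sketch with a hypothetical free function `timeout_message`; in the PR the values come from `self._timeout` and `self._event_name`.

```python
from typing import Any, Optional


def timeout_message(event_name: str, timeout: Optional[Any] = None) -> str:
    """Build the timeout error text with a single f-string."""
    # Only the duration fragment depends on whether a timeout was given.
    after = f' after {timeout!r}' if timeout is not None else ''
    return f'Timed out{after} waiting for external event {event_name!r}'
```

Both outputs match the strings produced by the original `if`/`else`, so the change would be purely structural.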
Populate the `origin` field in timers. Update `wait_for_external_event` to create a timer associated with the event. Other SDKs follow this same approach.