`client.workflows.executions` inspects and manages individual workflow executions.

| Run length | Use |
|---|---|
| 0–60s | `client.workflows.run(id, input, wait_for_completion=60)`: the server holds the request until the run completes. |
| 60s–5min | `client.workflows.executions.run_and_wait(id, input)`: the client polls every 2s. |
| Driving a UI | `client.workflows.run(...)`, then poll `client.workflows.executions.get(execution_id)` yourself. |

    created → pending → running → waiting → finalizing → completed
                                                       ↘ failed
                                                       ↘ cancelled
                                                       ↘ rejected

The first five states (`created` through `finalizing`) are non-terminal; the last four (`completed`, `failed`, `cancelled`, `rejected`) are terminal. Always check `status` before reading `result`.
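
The lifecycle above can be captured in a small helper so that calling code never compares raw strings in several places. This is a minimal sketch: `NON_TERMINAL`, `TERMINAL`, and `is_terminal` are illustrative names, not part of the SDK; only the status strings come from the diagram.

```python
# Illustrative helper, not part of the SDK. The status strings are taken
# from the lifecycle diagram above.
NON_TERMINAL = frozenset({"created", "pending", "running", "waiting", "finalizing"})
TERMINAL = frozenset({"completed", "failed", "cancelled", "rejected"})


def is_terminal(status: str) -> bool:
    """Return True once an execution can no longer change state."""
    if status in TERMINAL:
        return True
    if status in NON_TERMINAL:
        return False
    # Fail loudly on a status this sketch does not know about.
    raise ValueError(f"unknown execution status: {status!r}")
```

Raising on an unrecognised status is a design choice for the sketch: it surfaces a newly introduced state immediately instead of polling forever.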

    execution = client.workflows.executions.get(execution_id)
    # ExecutionStatusResponse(execution_id, status, created_at, completed_at?, result?, error?)

`result` is set when `status == "completed"`; `error` is set when `status == "failed"`.

    detail = client.workflows.executions.get(execution_id, include_steps=True)

With `include_steps=True`, the call returns the full per-step execution payload (heavier; intended for debugging, not happy-path UI).
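
Because `result` and `error` are each populated for only one status, it can help to route every read through a function that checks `status` first. The sketch below uses a stand-in dataclass (`FakeExecution`) instead of the real response type so that it runs without the SDK. The helper `unwrap_result` and the exception `ExecutionNotSuccessful` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeExecution:
    """Stand-in for the status response; fields mirror a subset of those listed above."""
    execution_id: str
    status: str
    result: Optional[Any] = None
    error: Optional[Any] = None


class ExecutionNotSuccessful(Exception):
    """Hypothetical error raised when an execution has no result to read."""


def unwrap_result(execution: Any) -> Any:
    """Return execution.result, but only when status is "completed"."""
    if execution.status == "completed":
        return execution.result
    if execution.status == "failed":
        raise ExecutionNotSuccessful(
            f"{execution.execution_id} failed: {execution.error}"
        )
    # Covers cancelled, rejected, and every non-terminal status.
    raise ExecutionNotSuccessful(
        f"{execution.execution_id} is {execution.status}; no result available"
    )
```

With the real client, this would presumably be called as `unwrap_result(client.workflows.executions.get(execution_id))`, assuming the response exposes the same attribute names.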
The convenience helper that wraps trigger + poll:

    from pathlib import Path

    final = client.workflows.executions.run_and_wait(
        "extract-invoice",
        input={"contract_document": Path("contract.pdf")},
        poll_interval_seconds=2.0,  # default 2.0
        timeout_seconds=5 * 60.0,   # default 300.0
    )

It triggers the workflow asynchronously, then polls `workflows.executions.get` until the execution reaches a terminal status or the timeout elapses. Raises `EigenpalTimeoutError` on timeout.
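
Conceptually, a trigger-and-poll helper reduces to the loop below. This is a sketch of the general technique, not the SDK's actual implementation. The function `poll_until_terminal`, the injected `get_status`, `sleep`, and `clock` callables, and the use of the built-in `TimeoutError` (rather than `EigenpalTimeoutError`) are all assumptions made so the example runs standalone.

```python
import time
from typing import Callable

TERMINAL = {"completed", "failed", "cancelled", "rejected"}


def poll_until_terminal(
    get_status: Callable[[], str],
    poll_interval_seconds: float = 2.0,
    timeout_seconds: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Call get_status() until it returns a terminal status or time runs out."""
    deadline = clock() + timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"still {status!r} after {timeout_seconds}s")
        sleep(poll_interval_seconds)
```

Injecting `sleep` and `clock` keeps the loop testable without real waiting. A monotonic clock is used for the deadline so that wall-clock adjustments cannot shorten or extend the timeout.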

    executions = client.workflows.executions.list(
        "wf_abc123",
        status=["failed", "cancelled"],  # str or list[str]
        from_date="now()-7d",            # ISO-8601 or relative expression
        to_date="now()",
        limit=50,
    )

Item shape: `{ id, workflow_id, status, trigger_type, trigger_input, result, error, created_at, started_at, completed_at, workflow }`.
`workflow` is `{ id, name }` of the owning workflow, or `None` if it has been deleted.
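
Since `workflow` can be `None`, code that aggregates list results should handle deleted workflows explicitly. The following is a sketch that assumes each item behaves like a dict with the keys listed above; the real SDK may return objects with attributes instead. The helper `count_by_workflow`, the `"(deleted workflow)"` label, and the sample records are made up for illustration.

```python
from collections import Counter
from typing import Any, Iterable, Mapping


def count_by_workflow(items: Iterable[Mapping[str, Any]]) -> Counter:
    """Count executions per owning workflow name."""
    counts: Counter = Counter()
    for item in items:
        workflow = item.get("workflow")
        # workflow is None when the owning workflow has been deleted.
        name = workflow["name"] if workflow is not None else "(deleted workflow)"
        counts[name] += 1
    return counts


# Made-up sample records, reduced to the fields this sketch reads.
sample = [
    {"id": "ex_1", "status": "failed", "workflow": {"id": "wf_abc123", "name": "extract-invoice"}},
    {"id": "ex_2", "status": "cancelled", "workflow": {"id": "wf_abc123", "name": "extract-invoice"}},
    {"id": "ex_3", "status": "failed", "workflow": None},
]
counts = count_by_workflow(sample)
```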

    client.workflows.executions.cancel(execution_id)

Idempotent. For runs not yet picked up by a worker (`created`/`pending`), the execution transitions immediately to `cancelled`. For running executions, the call stamps `cancel_requested_at` so the worker honors the cancel at its next checkpoint.
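
Because cancelling a running execution only records a request, a caller that needs the final state still has to poll after calling `cancel`. The sketch below shows that pattern against an in-memory stand-in, `FakeExecutions`, which is invented for this example and only imitates the behaviour described above. `cancel_and_wait` and its `max_polls` parameter are likewise hypothetical.

```python
import time
from types import SimpleNamespace

TERMINAL = {"completed", "failed", "cancelled", "rejected"}


class FakeExecutions:
    """In-memory stand-in that imitates the cancel behaviour described above."""

    def __init__(self, status: str, polls_until_checkpoint: int = 2):
        self.status = status
        self.cancel_requested = False
        self._polls_left = polls_until_checkpoint

    def cancel(self, execution_id: str) -> None:
        if self.status in TERMINAL:
            return  # idempotent: nothing left to cancel
        if self.status in {"created", "pending"}:
            self.status = "cancelled"  # not picked up yet: cancel immediately
        else:
            self.cancel_requested = True  # worker honors it at a checkpoint

    def get(self, execution_id: str) -> SimpleNamespace:
        # Simulate the worker reaching a checkpoint after a few polls.
        if self.cancel_requested and self.status not in TERMINAL:
            self._polls_left -= 1
            if self._polls_left <= 0:
                self.status = "cancelled"
        return SimpleNamespace(execution_id=execution_id, status=self.status)


def cancel_and_wait(executions, execution_id, poll_interval_seconds=2.0,
                    max_polls=30, sleep=time.sleep):
    """Request cancellation, then poll until the execution is terminal."""
    executions.cancel(execution_id)
    for _ in range(max_polls):
        execution = executions.get(execution_id)
        if execution.status in TERMINAL:
            return execution
        sleep(poll_interval_seconds)
    raise TimeoutError(f"{execution_id} not terminal after {max_polls} polls")
```

The helper returns whatever terminal status the execution reaches, so callers should inspect `status` on the returned object rather than assume it is `cancelled`.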
If `run_and_wait` doesn't fit (e.g. you're driving a UI progress bar), poll manually:

    import time

    TERMINAL = {"completed", "failed", "cancelled", "rejected"}

    triggered = client.workflows.run("extract-invoice", input=input)
    while True:
        # Fetch the latest state; stop as soon as it is terminal.
        execution = client.workflows.executions.get(triggered.execution_id)
        if execution.status in TERMINAL:
            break
        time.sleep(2)
    print(execution.status, execution.result, execution.error)

The client owns an httpx connection pool. Use a `with` block to close it deterministically:

    with EigenpalClient(api_key=api_key) as client:
        client.workflows.run("extract-invoice", input=input)