65 changes: 59 additions & 6 deletions docs/action-queue.md
@@ -2,12 +2,65 @@

The action queue automatically groups rapid, consecutive calls to `execute_action_group()` into a single ActionGroup execution. This minimizes the number of API calls and helps prevent rate-limiting errors such as `TooManyRequestsError`, `TooManyConcurrentRequestsError`, `TooManyExecutionsError`, or `ExecutionQueueFullError`, which can occur when actions are sent individually in quick succession.
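The delay-window behavior is essentially a debounce. The sketch below is illustrative only (the `Debouncer` class is a stand-in, not pyoverkiz's `ActionQueue`; the real queue also merges per-device actions, caps batch size, and flushes on mode or label changes):

```python
import asyncio

# Illustrative debounce: rapid add() calls within the delay window
# are collected and flushed together as one batch.
class Debouncer:
    def __init__(self, delay: float):
        self.delay = delay
        self.pending: list[str] = []
        self.batches: list[list[str]] = []
        self._task: asyncio.Task | None = None

    def add(self, item: str) -> None:
        self.pending.append(item)
        # Only the first add in a window schedules a flush.
        if self._task is None or self._task.done():
            self._task = asyncio.create_task(self._flush_later())

    async def _flush_later(self) -> None:
        await asyncio.sleep(self.delay)
        self.batches.append(self.pending)
        self.pending = []

async def main() -> list[list[str]]:
    d = Debouncer(delay=0.05)
    d.add("close")  # three rapid calls...
    d.add("open")
    d.add("stop")
    await asyncio.sleep(0.1)  # ...become one batch after the delay
    return d.batches

print(asyncio.run(main()))  # → [['close', 'open', 'stop']]
```

Three calls, one flush: this is why a burst of `execute_action_group()` calls results in a single API request.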

Important limitation:
- Gateways only allow a single action per device in each action group. The queue
merges commands for the same `device_url` into a single action to keep the
batch valid and preserve command order for that device.
- If you pass multiple actions for the same `device_url` in a single
`execute_action_group()` call, the queue will merge them for you.
## How batching and merging works

The Overkiz API uses three levels of nesting:

- **Command** — a single device instruction (e.g. `close`, `setClosure(50)`)
- **Action** — one device URL + one or more commands
- **ActionGroup** — a batch of actions submitted as a single API call

The gateway enforces **one action per device** in each action group. The queue handles this automatically: when multiple actions target the same `device_url`, their commands are merged into a single action while preserving order.

### Different devices — no merging needed

Three commands for three different devices produce three actions in one action group:

```python
# These three calls arrive within the delay window:
await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.CLOSE)])])
await client.execute_action_group([Action(device_url="io://1234-5678-1234/87654321", commands=[Command(name=OverkizCommand.OPEN)])])
await client.execute_action_group([Action(device_url="io://1234-5678-1234/11111111", commands=[Command(name=OverkizCommand.STOP)])])

# Sent as one API call:
# ActionGroup(actions=[
# Action(device_url="io://…/12345678", commands=[close]),
# Action(device_url="io://…/87654321", commands=[open]),
# Action(device_url="io://…/11111111", commands=[stop]),
# ])
```

### Same device — commands are merged

When two calls target the same device, the queue merges their commands into a single action:

```python
await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.CLOSE)])])
await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.SET_CLOSURE, parameters=[50])])])

# Sent as one API call:
# ActionGroup(actions=[
# Action(device_url="io://…/12345678", commands=[close, setClosure(50)]),
# ])
```

### Mixed — both behaviors combined

```python
await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.CLOSE)])])
await client.execute_action_group([
Action(device_url="io://1234-5678-1234/87654321", commands=[Command(name=OverkizCommand.OPEN)]),
Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.SET_CLOSURE, parameters=[50])]),
])

# Sent as one API call:
# ActionGroup(actions=[
# Action(device_url="io://…/12345678", commands=[close, setClosure(50)]), # merged
# Action(device_url="io://…/87654321", commands=[open]),
# ])
```

The original action objects passed to `execute_action_group()` are never mutated — the queue works on internal copies.
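The copy-then-merge behavior can be illustrated with a minimal stand-in. The `Command` and `Action` dataclasses and the `copy_action` helper below only mirror pyoverkiz's models for the example; they are not the library's real implementation:

```python
from dataclasses import dataclass, field

# Simplified stand-ins for pyoverkiz's Command/Action models.
@dataclass
class Command:
    name: str

@dataclass
class Action:
    device_url: str
    commands: list[Command] = field(default_factory=list)

def copy_action(action: Action) -> Action:
    # The queue batches an independent copy with its own commands list,
    # so merging later commands never touches the caller's object.
    return Action(device_url=action.device_url, commands=list(action.commands))

original = Action("io://1234-5678-1234/12345678", [Command("close")])
queued = copy_action(original)
queued.commands.append(Command("setClosure"))  # merge into the copy only

assert len(original.commands) == 1  # caller's object is untouched
assert len(queued.commands) == 2    # the queued copy holds the merged commands
```

Because the queue only ever extends its internal copies, callers can safely reuse the same `Action` objects across multiple calls.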

## Enable with defaults

139 changes: 74 additions & 65 deletions pyoverkiz/action_queue.py
@@ -71,16 +71,35 @@ def __await__(self) -> Generator[Any, None, str]:


class ActionQueue:
"""Batches multiple action executions into single API calls.
"""Batches device actions into single API calls (action groups).

When actions are added, they are held for a configurable delay period.
If more actions arrive during this window, they are batched together.
The batch is flushed when:
The Overkiz API executes commands via action groups. Each action group
contains one Action per device, and each Action holds one or more Commands.
The gateway enforces at most one Action per device per action group.

Batching example — two add() calls arriving within the delay window::

add([Action(device_url="device/1", commands=[close])])
add([
Action(device_url="device/2", commands=[open]),
Action(device_url="device/1", commands=[setClosure(50)]),
])

Produces one action group with two actions::

ActionGroup(actions=[
Action(device_url="device/1", commands=[close, setClosure(50)]), # merged
Action(device_url="device/2", commands=[open]),
])

Three separate devices would remain three separate actions in the group.
Merging only happens when the same device_url appears more than once.

The queue flushes when:
- The delay timer expires
- The max actions limit is reached
- The execution mode changes
- The label changes
- Manual flush is requested
- The execution mode or label changes
- flush() or shutdown() is called
"""

def __init__(
@@ -107,13 +126,31 @@ def __init__(
self._lock = asyncio.Lock()

@staticmethod
def _copy_action(action: Action) -> Action:
"""Return an `Action` copy with an independent commands list.
def _merge_actions(
target: list[Action],
index: dict[str, Action],
source: list[Action],
*,
copy: bool = False,
) -> None:
"""Merge *source* actions into *target*, combining commands for duplicate devices.

The queue merges commands for duplicate devices, so caller-owned action
instances must be copied to avoid mutating user input while batching.
New device_urls are appended to *target*; existing ones get their commands
extended. When *copy* is True, source actions are copied to avoid mutating
caller-owned objects.
"""
return Action(device_url=action.device_url, commands=list(action.commands))
for action in source:
existing = index.get(action.device_url)
if existing is None:
merged = (
Action(device_url=action.device_url, commands=list(action.commands))
if copy
else action
)
target.append(merged)
index[action.device_url] = merged
else:
existing.commands.extend(action.commands)

async def add(
self,
@@ -146,14 +183,7 @@ def add(

normalized_actions: list[Action] = []
normalized_index: dict[str, Action] = {}
for action in actions:
existing = normalized_index.get(action.device_url)
if existing is None:
action_copy = self._copy_action(action)
normalized_actions.append(action_copy)
normalized_index[action.device_url] = action_copy
else:
existing.commands.extend(action.commands)
self._merge_actions(normalized_actions, normalized_index, actions, copy=True)

async with self._lock:
# If mode or label changes, flush existing queue first
@@ -162,18 +192,10 @@
):
batches_to_execute.append(self._prepare_flush())

# Add actions to pending queue
pending_index = {
pending_action.device_url: pending_action
for pending_action in self._pending_actions
}
for action in normalized_actions:
pending = pending_index.get(action.device_url)
if pending is None:
self._pending_actions.append(action)
pending_index[action.device_url] = action
else:
pending.commands.extend(action.commands)
pending_index = {a.device_url: a for a in self._pending_actions}
self._merge_actions(
self._pending_actions, pending_index, normalized_actions
)
self._pending_mode = mode
self._pending_label = label

@@ -196,39 +218,25 @@

# Execute batches outside the lock if we flushed
for batch in batches_to_execute:
if batch[0]:
await self._execute_batch(*batch)
batch_actions, batch_mode, batch_label, batch_waiters = batch
if batch_actions:
await self._execute_batch(
batch_actions, batch_mode, batch_label, batch_waiters
)

return waiter

async def _delayed_flush(self) -> None:
"""Wait for the delay period, then flush the queue."""
waiters: list[QueuedExecution] = []
try:
await asyncio.sleep(self._settings.delay)
async with self._lock:
if not self._pending_actions:
return

# Take snapshot and clear state while holding lock
actions = self._pending_actions
mode = self._pending_mode
label = self._pending_label
waiters = self._pending_waiters

self._pending_actions = []
self._pending_mode = None
self._pending_label = None
self._pending_waiters = []
self._flush_task = None
await asyncio.sleep(self._settings.delay)
async with self._lock:
self._flush_task = None
# Another coroutine may have already flushed the queue before we acquired the lock.
actions, mode, label, waiters = self._prepare_flush()
if not actions:
return

# Execute outside the lock
await self._execute_batch(actions, mode, label, waiters)
except asyncio.CancelledError as exc:
# Ensure all waiters are notified if this task is cancelled
for waiter in waiters:
waiter.set_exception(exc)
raise
await self._execute_batch(actions, mode, label, waiters)

def _prepare_flush(
self,
@@ -317,19 +325,20 @@ def get_pending_count(self) -> int:

async def shutdown(self) -> None:
"""Shutdown the queue, flushing any pending actions."""
cancelled_task: asyncio.Task[None] | None = None
batch_to_execute = None
async with self._lock:
if self._flush_task and not self._flush_task.done():
task = self._flush_task
task.cancel()
cancelled_task = self._flush_task
cancelled_task.cancel()
self._flush_task = None
# Wait for cancellation to complete
with contextlib.suppress(asyncio.CancelledError):
await task

if self._pending_actions:
batch_to_execute = self._prepare_flush()

# Execute outside the lock
if cancelled_task:
with contextlib.suppress(asyncio.CancelledError):
await cancelled_task

if batch_to_execute:
await self._execute_batch(*batch_to_execute)
99 changes: 99 additions & 0 deletions tests/test_action_queue.py
@@ -350,3 +350,102 @@ async def set_result():

# Ensure background task has completed
await task


@pytest.mark.asyncio
async def test_action_queue_settings_validate():
"""Test that validate raises on invalid settings."""
with pytest.raises(ValueError, match="positive"):
ActionQueueSettings(delay=-1).validate()

with pytest.raises(ValueError, match="at least 1"):
ActionQueueSettings(max_actions=0).validate()

# Valid settings should not raise
ActionQueueSettings(delay=0.5, max_actions=10).validate()


@pytest.mark.asyncio
async def test_action_queue_add_empty_actions(mock_executor):
"""Test that add raises ValueError for empty action list."""
queue = ActionQueue(executor=mock_executor, settings=ActionQueueSettings(delay=0.1))

with pytest.raises(ValueError, match="at least one Action"):
await queue.add([])


@pytest.mark.asyncio
async def test_action_queue_executor_cancelled_propagates():
"""Test that CancelledError during execution propagates to waiters."""

async def cancelling_executor(actions, mode, label):
raise asyncio.CancelledError

queue = ActionQueue(
executor=AsyncMock(side_effect=cancelling_executor),
settings=ActionQueueSettings(delay=0.05),
)

action = Action(
device_url="io://1234-5678-9012/1",
commands=[Command(name=OverkizCommand.CLOSE)],
)

queued = await queue.add([action])

with pytest.raises(asyncio.CancelledError):
await queued


@pytest.mark.asyncio
async def test_action_queue_flush_empty(mock_executor):
"""Test that flushing an empty queue is a no-op."""
queue = ActionQueue(executor=mock_executor, settings=ActionQueueSettings(delay=0.1))

await queue.flush()
mock_executor.assert_not_called()


@pytest.mark.asyncio
async def test_action_queue_shutdown_empty(mock_executor):
"""Test that shutting down an empty queue is a no-op."""
queue = ActionQueue(executor=mock_executor, settings=ActionQueueSettings(delay=0.1))

await queue.shutdown()
mock_executor.assert_not_called()


@pytest.mark.asyncio
async def test_action_queue_no_self_cancel_during_delayed_flush():
"""Test that _delayed_flush does not cancel itself via _prepare_flush.

When _delayed_flush fires and calls _prepare_flush, the flush task is still
the running coroutine. _prepare_flush must not cancel it, otherwise the batch
would fail with CancelledError when the executor performs I/O.
"""
cancel_detected = False

async def slow_executor(actions, mode, label):
nonlocal cancel_detected
try:
await asyncio.sleep(0.05)
except asyncio.CancelledError:
cancel_detected = True
raise
return "exec-ok"

queue = ActionQueue(
executor=AsyncMock(side_effect=slow_executor),
settings=ActionQueueSettings(delay=0.05),
)

action = Action(
device_url="io://1234-5678-9012/1",
commands=[Command(name=OverkizCommand.CLOSE)],
)

queued = await queue.add([action])
exec_id = await queued

assert exec_id == "exec-ok"
assert not cancel_detected, "_delayed_flush cancelled itself via _prepare_flush"