diff --git a/docs/action-queue.md b/docs/action-queue.md index 8da02d90..8a55c6b2 100644 --- a/docs/action-queue.md +++ b/docs/action-queue.md @@ -2,12 +2,65 @@ The action queue automatically groups rapid, consecutive calls to `execute_action_group()` into a single ActionGroup execution. This minimizes the number of API calls and helps prevent rate limiting issues, such as `TooManyRequestsError`, `TooManyConcurrentRequestsError`, `TooManyExecutionsError`, or `ExecutionQueueFullError` which can occur if actions are sent individually in quick succession. -Important limitation: -- Gateways only allow a single action per device in each action group. The queue - merges commands for the same `device_url` into a single action to keep the - batch valid and preserve command order for that device. -- If you pass multiple actions for the same `device_url` in a single - `execute_action_group()` call, the queue will merge them for you. +## How batching and merging works + +The Overkiz API uses three levels of nesting: + +- **Command** — a single device instruction (e.g. `close`, `setClosure(50)`) +- **Action** — one device URL + one or more commands +- **ActionGroup** — a batch of actions submitted as a single API call + +The gateway enforces **one action per device** in each action group. The queue handles this automatically: when multiple actions target the same `device_url`, their commands are merged into a single action while preserving order. + +### Different devices — no merging needed + +Three commands for three different devices produce three actions in one action group: + +```python +# These three calls arrive within the delay window: +await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.CLOSE)])]) +await client.execute_action_group([Action(device_url="io://1234-5678-1234/87654321", commands=[Command(name=OverkizCommand.OPEN)])]) +await client.execute_action_group([Action(device_url="io://1234-5678-1234/11111111", commands=[Command(name=OverkizCommand.STOP)])]) + +# Sent as one API call: +# ActionGroup(actions=[ +# Action(device_url="io://…/12345678", commands=[close]), +# Action(device_url="io://…/87654321", commands=[open]), +# Action(device_url="io://…/11111111", commands=[stop]), +# ]) +``` + +### Same device — commands are merged + +When two calls target the same device, the queue merges their commands into a single action: + +```python +await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.CLOSE)])]) +await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.SET_CLOSURE, parameters=[50])])]) + +# Sent as one API call: +# ActionGroup(actions=[ +# Action(device_url="io://…/12345678", commands=[close, setClosure(50)]), +# ]) +``` + +### Mixed — both behaviors combined + +```python +await client.execute_action_group([Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.CLOSE)])]) +await client.execute_action_group([ + Action(device_url="io://1234-5678-1234/87654321", commands=[Command(name=OverkizCommand.OPEN)]), + Action(device_url="io://1234-5678-1234/12345678", commands=[Command(name=OverkizCommand.SET_CLOSURE, parameters=[50])]), +]) + +# Sent as one API call: +# ActionGroup(actions=[ +# Action(device_url="io://…/12345678", commands=[close, setClosure(50)]), # merged +# Action(device_url="io://…/87654321", commands=[open]), +# ]) +``` + +The original action objects passed to `execute_action_group()` are never mutated — the queue works on internal copies. ## Enable with defaults diff --git a/pyoverkiz/action_queue.py b/pyoverkiz/action_queue.py index f78be230..8a124686 100644 --- a/pyoverkiz/action_queue.py +++ b/pyoverkiz/action_queue.py @@ -71,16 +71,35 @@ def __await__(self) -> Generator[Any, None, str]: class ActionQueue: - """Batches multiple action executions into single API calls. + """Batches device actions into single API calls (action groups). - When actions are added, they are held for a configurable delay period. - If more actions arrive during this window, they are batched together. - The batch is flushed when: + The Overkiz API executes commands via action groups. Each action group + contains one Action per device, and each Action holds one or more Commands. + The gateway enforces at most one Action per device per action group. + + Batching example — two add() calls arriving within the delay window:: + + add([Action(device_url="device/1", commands=[close])]) + add([ + Action(device_url="device/2", commands=[open]), + Action(device_url="device/1", commands=[setClosure(50)]), + ]) + + Produces one action group with two actions:: + + ActionGroup(actions=[ + Action(device_url="device/1", commands=[close, setClosure(50)]), # merged + Action(device_url="device/2", commands=[open]), + ]) + + Three separate devices would remain three separate actions in the group. + Merging only happens when the same device_url appears more than once. + + The queue flushes when: - The delay timer expires - The max actions limit is reached - - The execution mode changes - - The label changes - - Manual flush is requested + - The execution mode or label changes + - flush() or shutdown() is called """ def __init__( @@ -107,13 +126,31 @@ def __init__( self._lock = asyncio.Lock() @staticmethod - def _copy_action(action: Action) -> Action: - """Return an `Action` copy with an independent commands list. + def _merge_actions( + target: list[Action], + index: dict[str, Action], + source: list[Action], + *, + copy: bool = False, + ) -> None: + """Merge *source* actions into *target*, combining commands for duplicate devices. - The queue merges commands for duplicate devices, so caller-owned action - instances must be copied to avoid mutating user input while batching. + New device_urls are appended to *target*; existing ones get their commands + extended. When *copy* is True, source actions are copied to avoid mutating + caller-owned objects. """ - return Action(device_url=action.device_url, commands=list(action.commands)) + for action in source: + existing = index.get(action.device_url) + if existing is None: + merged = ( + Action(device_url=action.device_url, commands=list(action.commands)) + if copy + else action + ) + target.append(merged) + index[action.device_url] = merged + else: + existing.commands.extend(action.commands) async def add( self, @@ -146,14 +183,7 @@ async def add( normalized_actions: list[Action] = [] normalized_index: dict[str, Action] = {} - for action in actions: - existing = normalized_index.get(action.device_url) - if existing is None: - action_copy = self._copy_action(action) - normalized_actions.append(action_copy) - normalized_index[action.device_url] = action_copy - else: - existing.commands.extend(action.commands) + self._merge_actions(normalized_actions, normalized_index, actions, copy=True) async with self._lock: # If mode or label changes, flush existing queue first @@ -162,18 +192,10 @@ async def add( ): batches_to_execute.append(self._prepare_flush()) - # Add actions to pending queue - pending_index = { - pending_action.device_url: pending_action - for pending_action in self._pending_actions - } - for action in normalized_actions: - pending = pending_index.get(action.device_url) - if pending is None: - self._pending_actions.append(action) - pending_index[action.device_url] = action - else: - pending.commands.extend(action.commands) + pending_index = {a.device_url: a for a in self._pending_actions} + self._merge_actions( + self._pending_actions, pending_index, normalized_actions + ) self._pending_mode = mode self._pending_label = label @@ -196,39 +218,25 @@ async def add( # Execute batches outside the lock if we flushed for batch in batches_to_execute: - if batch[0]: - await self._execute_batch(*batch) + batch_actions, batch_mode, batch_label, batch_waiters = batch + if batch_actions: + await self._execute_batch( + batch_actions, batch_mode, batch_label, batch_waiters + ) return waiter async def _delayed_flush(self) -> None: """Wait for the delay period, then flush the queue.""" - waiters: list[QueuedExecution] = [] - try: - await asyncio.sleep(self._settings.delay) - async with self._lock: - if not self._pending_actions: - return - - # Take snapshot and clear state while holding lock - actions = self._pending_actions - mode = self._pending_mode - label = self._pending_label - waiters = self._pending_waiters - - self._pending_actions = [] - self._pending_mode = None - self._pending_label = None - self._pending_waiters = [] - self._flush_task = None + await asyncio.sleep(self._settings.delay) + async with self._lock: + self._flush_task = None + # Another coroutine may have already flushed the queue before we acquired the lock. + actions, mode, label, waiters = self._prepare_flush() + if not actions: + return - # Execute outside the lock - await self._execute_batch(actions, mode, label, waiters) - except asyncio.CancelledError as exc: - # Ensure all waiters are notified if this task is cancelled - for waiter in waiters: - waiter.set_exception(exc) - raise + await self._execute_batch(actions, mode, label, waiters) def _prepare_flush( self, @@ -317,19 +325,20 @@ def get_pending_count(self) -> int: async def shutdown(self) -> None: """Shutdown the queue, flushing any pending actions.""" + cancelled_task: asyncio.Task[None] | None = None batch_to_execute = None async with self._lock: if self._flush_task and not self._flush_task.done(): - task = self._flush_task - task.cancel() + cancelled_task = self._flush_task + cancelled_task.cancel() self._flush_task = None - # Wait for cancellation to complete - with contextlib.suppress(asyncio.CancelledError): - await task if self._pending_actions: batch_to_execute = self._prepare_flush() - # Execute outside the lock + if cancelled_task: + with contextlib.suppress(asyncio.CancelledError): + await cancelled_task + if batch_to_execute: await self._execute_batch(*batch_to_execute) diff --git a/tests/test_action_queue.py b/tests/test_action_queue.py index 0a8c0219..06f5658b 100644 --- a/tests/test_action_queue.py +++ b/tests/test_action_queue.py @@ -350,3 +350,102 @@ async def set_result(): # Ensure background task has completed await task + + +@pytest.mark.asyncio +async def test_action_queue_settings_validate(): + """Test that validate raises on invalid settings.""" + with pytest.raises(ValueError, match="positive"): + ActionQueueSettings(delay=-1).validate() + + with pytest.raises(ValueError, match="at least 1"): + ActionQueueSettings(max_actions=0).validate() + + # Valid settings should not raise + ActionQueueSettings(delay=0.5, max_actions=10).validate() + + +@pytest.mark.asyncio +async def test_action_queue_add_empty_actions(mock_executor): + """Test that add raises ValueError for empty action list.""" + queue = ActionQueue(executor=mock_executor, settings=ActionQueueSettings(delay=0.1)) + + with pytest.raises(ValueError, match="at least one Action"): + await queue.add([]) + + +@pytest.mark.asyncio +async def test_action_queue_executor_cancelled_propagates(): + """Test that CancelledError during execution propagates to waiters.""" + + async def cancelling_executor(actions, mode, label): + raise asyncio.CancelledError + + queue = ActionQueue( + executor=AsyncMock(side_effect=cancelling_executor), + settings=ActionQueueSettings(delay=0.05), + ) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + queued = await queue.add([action]) + + with pytest.raises(asyncio.CancelledError): + await queued + + +@pytest.mark.asyncio +async def test_action_queue_flush_empty(mock_executor): + """Test that flushing an empty queue is a no-op.""" + queue = ActionQueue(executor=mock_executor, settings=ActionQueueSettings(delay=0.1)) + + await queue.flush() + mock_executor.assert_not_called() + + +@pytest.mark.asyncio +async def test_action_queue_shutdown_empty(mock_executor): + """Test that shutting down an empty queue is a no-op.""" + queue = ActionQueue(executor=mock_executor, settings=ActionQueueSettings(delay=0.1)) + + await queue.shutdown() + mock_executor.assert_not_called() + + +@pytest.mark.asyncio +async def test_action_queue_no_self_cancel_during_delayed_flush(): + """Test that _delayed_flush does not cancel itself via _prepare_flush. + + When _delayed_flush fires and calls _prepare_flush, the flush task is still + the running coroutine. _prepare_flush must not cancel it, otherwise the batch + would fail with CancelledError when the executor performs I/O. + """ + cancel_detected = False + + async def slow_executor(actions, mode, label): + nonlocal cancel_detected + try: + await asyncio.sleep(0.05) + except asyncio.CancelledError: + cancel_detected = True + raise + return "exec-ok" + + queue = ActionQueue( + executor=AsyncMock(side_effect=slow_executor), + settings=ActionQueueSettings(delay=0.05), + ) + + action = Action( + device_url="io://1234-5678-9012/1", + commands=[Command(name=OverkizCommand.CLOSE)], + ) + + queued = await queue.add([action]) + exec_id = await queued + + assert exec_id == "exec-ok" + assert not cancel_detected, "_delayed_flush cancelled itself via _prepare_flush"