Inworld websocket improvements #4533
Conversation
@coderabbitai review

✅ Actions performed: Review triggered.
📝 Walkthrough
Introduced a connection-pooling architecture for Inworld TTS: a _ConnectionPool manages multiple WebSocket connections and enforces per-connection context limits, and TTS has been refactored to obtain shared contexts from the pool, chunk text, and route all WebSocket interactions through pooled connections.
Sequence Diagram(s)
```mermaid
sequenceDiagram
    actor Client
    participant TTS
    participant Pool as ConnectionPool
    participant Conn as Connection
    participant Inworld as InworldAPI
    Client->>TTS: synthesize(text)
    TTS->>Pool: acquire_context()
    alt capacity available
        Pool->>Conn: assign existing connection/context
    else wait for capacity
        Pool-->>TTS: wait (capacity event)
        Pool->>Conn: create new connection if under max
    end
    Pool-->>TTS: return context_id, connection
    loop for each text chunk (<=1000 chars)
        TTS->>Conn: send_text(chunk)
        Conn->>Inworld: WebSocket send
        Inworld-->>Conn: audio chunk
        Conn-->>TTS: deliver audio chunk
    end
    TTS->>Conn: flush_context()
    Conn->>Inworld: flush message
    Inworld-->>Conn: final frames
    TTS->>Conn: close_context()
    Conn->>Pool: release context capacity
    Pool->>Pool: signal waiting acquirers
```
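As a rough illustration of the flow above, the per-request path on the TTS side might look like the sketch below. The method names (acquire_context, send_text, flush_context, close_context) come from the walkthrough; the exact signatures and return values are assumptions, not the plugin's actual API.

```python
MAX_CHUNK_CHARS = 1000  # Inworld's per-message text limit, per the diagram above


async def _synthesize_via_pool(pool, text: str) -> None:
    # Acquire a shared context; the pool reuses an existing connection when it
    # has spare context capacity, otherwise it opens a new one (up to the cap).
    context_id, connection = await pool.acquire_context()
    try:
        # Chunk the text so every WebSocket message stays within the limit.
        for i in range(0, len(text), MAX_CHUNK_CHARS):
            connection.send_text(context_id, text[i : i + MAX_CHUNK_CHARS])
        # Ask Inworld to emit any buffered audio for this context.
        connection.flush_context(context_id)
    finally:
        # Release the context so the pool can signal waiting acquirers.
        connection.close_context(context_id)
```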
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py`:
- Around line 102-108: The stale-context cleanup uses created_at so long-lived
streams get evicted immediately after close_context and their waiter futures
stay unresolved; add a close_started_at timestamp to _ContextInfo, set it in
close_context, update the cleanup logic to base eviction age on close_started_at
when present (fallback to created_at), and when evicting a context ensure any
waiter (tts._ContextInfo.waiter) is resolved (e.g., set_result(None) if not
done) so upstream timeouts don't occur; also make sure the existing
contextClosed handling still resolves the waiter if it arrives before cleanup (a sketch of this shape appears after this list).
- Around line 522-543: Before checking capacity and creating a new connection,
prune closed entries from self._connections so closed connections don't count
toward self._max_connections; iterate self._connections, remove or filter out
conn where conn._closed is True, then perform the existing capacity loop and the
len(self._connections) < self._max_connections check. Update references in this
block that create _InworldConnection, call conn.acquire_context, and use
notify_capacity_available so behavior is unchanged except closed connections are
excluded (see the sketches after this list).
- Around line 212-257: The acquire_context loops have a lost-wakeup race because
they call self._context_available.clear() before awaiting
self._context_available.wait(); move the clear() call to after wait() returns so
a set() that happens between the capacity check and the wait won't be lost.
Update both _InworldConnection.acquire_context and
_ConnectionPool.acquire_context: remove/relocate the pre-wait clear() and call
clear() only after asyncio.Event.wait() completes (or equivalently clear
immediately after the awaited wait_for returns) while preserving the timeout
logic and exception handling (see the sketches after this list).
- Around line 365-372: When status.code != 0 in the response-handling block, you
must release the stuck creation context instead of leaving it in self._contexts
in CREATING state; after setting ctx.waiter exception
(ctx.waiter.set_exception(error)) call the cleanup path (e.g., invoke
close_context(ctx.id) or transition ctx.state to ContextState.CLOSING and remove
it from self._contexts) and then signal capacity (call the existing
_signal_capacity() or equivalent) so the pool no longer counts the failed
context toward capacity. Ensure you reference the same symbols used in the file:
ctx, ctx.waiter, self._contexts, close_context (or ContextState.CLOSING) and
_signal_capacity when implementing the fix.
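For the first comment above (stale-context cleanup), a minimal sketch of the suggested shape, assuming a dataclass-based _ContextInfo; only created_at, close_started_at, and waiter come from the review, the constant and helper names are illustrative:

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Dict, Optional

STALE_CONTEXT_TIMEOUT = 120.0  # seconds; illustrative value


@dataclass
class _ContextInfo:
    id: str
    created_at: float = field(default_factory=time.monotonic)
    close_started_at: Optional[float] = None  # set when close_context() begins
    waiter: Optional[asyncio.Future] = None


def evict_stale_contexts(contexts: Dict[str, _ContextInfo]) -> None:
    now = time.monotonic()
    for ctx_id, ctx in list(contexts.items()):
        # Age the context from when its close started, falling back to creation
        # time, so long-lived active streams are not evicted prematurely.
        reference = ctx.close_started_at if ctx.close_started_at is not None else ctx.created_at
        if now - reference < STALE_CONTEXT_TIMEOUT:
            continue
        # Resolve the waiter before eviction so callers blocked on it
        # don't hit their own timeout upstream.
        if ctx.waiter is not None and not ctx.waiter.done():
            ctx.waiter.set_result(None)
        del contexts[ctx_id]
```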
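For the second comment (pruning closed connections before the capacity check), a standalone sketch of the pattern; the connection objects' closed, has_capacity, and acquire_context attributes are stand-ins for the plugin's internals:

```python
from typing import Any, List, Tuple


async def acquire_from_pool(connections: List[Any], max_connections: int, new_connection) -> Tuple:
    """Illustrative only: prune dead connections before counting toward the cap."""
    # Drop closed entries first so they no longer count toward max_connections.
    connections[:] = [conn for conn in connections if not conn.closed]

    # Prefer an existing connection that still has spare context capacity.
    for conn in connections:
        if conn.has_capacity():
            return await conn.acquire_context(), conn

    # Only live connections remain, so this limit check is now accurate.
    if len(connections) < max_connections:
        conn = new_connection()  # stands in for _InworldConnection(...)
        ctx_id = await conn.acquire_context()
        connections.append(conn)  # append only after acquisition succeeds
        return ctx_id, conn

    raise RuntimeError("pool at capacity; caller should wait for the capacity event")
```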
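For the third comment (the lost-wakeup race), the general clear-after-wait pattern looks roughly like this; the predicate, event, and timeout names are illustrative, and the real methods carry additional bookkeeping:

```python
import asyncio


async def wait_until(predicate, event: asyncio.Event, timeout: float) -> None:
    """Block until predicate() is true without losing a set() that races the check."""
    while not predicate():
        # Do NOT clear() here: a set() arriving between the predicate check and
        # wait() would be erased, and this coroutine could block until timeout.
        await asyncio.wait_for(event.wait(), timeout=timeout)
        # Clear only after the wait returns, then loop and re-check the predicate.
        event.clear()
```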
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Format code with ruff
Run ruff linter and auto-fix issues
Run mypy type checker in strict mode
Maintain line length of 100 characters maximum
Ensure Python 3.9+ compatibility
Use Google-style docstrings
Files:
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py
🧬 Code graph analysis (1)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py (2)
livekit-agents/livekit/agents/utils/misc.py (2)
  shortuuid (21-22)
  is_given (25-26)
livekit-agents/livekit/agents/_exceptions.py (1)
  APIError (14-42)
@davidzhao made small updates in response to the CodeRabbit review. Re-tested both the normal agent convo flow and connection pool performance, and it looks good.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py`:
- Around line 1037-1039: The generic exception handler logs the error and raises
APIConnectionError but does not call close_context, which can leak context_id in
the pool; update the except Exception as e block (where logger.error and raise
APIConnectionError() from e occur) to call close_context(context_id) before
raising (ideally in a finally-like cleanup or immediately after logging), and
ensure close_context is itself protected (e.g., swallow/log any errors from
close_context) so the original exception is re-raised as APIConnectionError.
- Around line 548-561: The new connection is appended to self._connections
before calling _InworldConnection.acquire_context, so if acquire_context raises
the pool retains a failed connection; either defer appending until after ctx
acquisition succeeds or catch exceptions around acquire_context, remove the conn
from self._connections and close it. Specifically, create the _InworldConnection
with session/_ws_url/_authorization, then call await
conn.acquire_context(emitter, opts, remaining_timeout) inside a try block; on
success append conn to self._connections and log; on exception ensure you call
conn.close()/cleanup, do not append (or remove) and re-raise or return the
error. Ensure notify_capacity_available usage remains unchanged.
🧹 Nitpick comments (5)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py (5)
159-160: Consider bounding the outbound queue to prevent memory growth.
The _outbound_queue is unbounded. If the WebSocket send rate is slower than the producer rate (e.g., during network issues), the queue can grow indefinitely. Consider setting a reasonable maxsize (e.g., 1000) to apply backpressure.
♻️ Suggested change
```diff
- self._outbound_queue: asyncio.Queue[_OutboundMessage] = asyncio.Queue()
+ self._outbound_queue: asyncio.Queue[_OutboundMessage] = asyncio.Queue(maxsize=1000)
```
833-838: prewarm doesn't actually establish a connection.
The current implementation only creates the pool object but doesn't establish any WebSocket connection until the first acquire_context call. If the intent is to reduce latency on first synthesis, consider having prewarm actually create and connect at least one connection.
♻️ Suggested enhancement
```diff
 async def _prewarm_impl(self) -> None:
-    # Just ensure the pool is created - first acquire will establish a connection
-    await self._get_pool()
+    pool = await self._get_pool()
+    # Create one connection to reduce latency on first synthesis
+    conn = _InworldConnection(
+        session=self._ensure_session(),
+        ws_url=self._ws_url,
+        authorization=self._authorization,
+        on_capacity_available=pool.notify_capacity_available,
+    )
+    await conn.connect()
+    async with pool._pool_lock:
+        pool._connections.append(conn)
```
1028-1028: Consider extracting the magic number 60 to a named constant.
The + 60 buffer on the waiter timeout is not immediately clear. Consider defining a constant like SYNTHESIS_TIMEOUT_BUFFER = 60.0 to document its purpose.
1015-1019: _mark_started() is called for every chunk; consider calling once.
_mark_started() is called inside the loop for each 1000-char chunk. If this method has side effects or is not idempotent, it may cause unexpected behavior. Consider moving it outside the inner loop or guarding with a flag.
♻️ Suggested fix
```diff
 async def _send_task() -> None:
+    started = False
     async for ev in sent_tokenizer_stream:
         text = ev.token
         # Chunk to stay within Inworld's 1000 char limit
         for i in range(0, len(text), 1000):
             connection.send_text(context_id, text[i : i + 1000])
-            self._mark_started()
+            if not started:
+                self._mark_started()
+                started = True
     connection.flush_context(context_id)
     connection.close_context(context_id)
```
235-235: Use asyncio.get_running_loop() instead of get_event_loop() in async context.
asyncio.get_event_loop() emits a DeprecationWarning in Python 3.10+ when called from a coroutine. Since acquire_context() is an async function, use asyncio.get_running_loop().create_future() instead.
♻️ Suggested fix
```diff
- waiter: asyncio.Future[None] = asyncio.get_event_loop().create_future()
+ waiter: asyncio.Future[None] = asyncio.get_running_loop().create_future()
```
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Format code with ruff
Run ruff linter and auto-fix issues
Run mypy type checker in strict mode
Maintain line length of 100 characters maximum
Ensure Python 3.9+ compatibility
Use Google-style docstrings
Files:
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: type-check (3.9)
- GitHub Check: type-check (3.13)
- GitHub Check: unit-tests
🔇 Additional comments (6)
livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py (6)
21-25: LGTM! The new imports (time, field, Enum) are appropriate for the connection pool infrastructure.
94-135: LGTM! The context state tracking and message types are well-structured. The addition of close_started_at properly addresses the stale-context cleanup concern from prior review.
424-440: LGTM on stale context cleanup implementation. The cleanup logic now correctly uses close_started_at for timing (falling back to created_at) and resolves the waiter before eviction. This addresses the prior review feedback.
588-615: LGTM! The idle connection cleanup logic properly keeps at least one connection, checks periodically, and closes connections outside the lock to avoid blocking the pool.
759-770: LGTM! The lazy pool initialization with proper locking and closed-pool detection is well implemented.
855-862: LGTM! The shutdown sequence properly closes streams first, then the pool, ensuring clean resource cleanup.
```python
if len(self._connections) < self._max_connections:
    conn = _InworldConnection(
        session=self._session,
        ws_url=self._ws_url,
        authorization=self._authorization,
        on_capacity_available=self.notify_capacity_available,
    )
    self._connections.append(conn)
    logger.debug(
        "Created new Inworld connection",
        extra={"pool_size": len(self._connections)},
    )
    ctx_id, waiter = await conn.acquire_context(emitter, opts, remaining_timeout)
    return ctx_id, waiter, conn
```
New connection acquisition should handle exceptions to avoid pool corruption.
If conn.acquire_context() fails after the connection is appended to _connections (line 555), the newly created connection remains in the pool even if it failed to connect. Consider wrapping in try/except or deferring the append until after successful acquisition.
🛠️ Suggested fix
```diff
 if len(self._connections) < self._max_connections:
     conn = _InworldConnection(
         session=self._session,
         ws_url=self._ws_url,
         authorization=self._authorization,
         on_capacity_available=self.notify_capacity_available,
     )
-    self._connections.append(conn)
     logger.debug(
         "Created new Inworld connection",
-        extra={"pool_size": len(self._connections)},
+        extra={"pool_size": len(self._connections) + 1},
     )
-    ctx_id, waiter = await conn.acquire_context(emitter, opts, remaining_timeout)
-    return ctx_id, waiter, conn
+    try:
+        ctx_id, waiter = await conn.acquire_context(emitter, opts, remaining_timeout)
+        self._connections.append(conn)
+        return ctx_id, waiter, conn
+    except Exception:
+        await conn.aclose()
+        raise
```
🤖 Prompt for AI Agents
In `@livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py`
around lines 548 - 561, The new connection is appended to self._connections
before calling _InworldConnection.acquire_context, so if acquire_context raises
the pool retains a failed connection; either defer appending until after ctx
acquisition succeeds or catch exceptions around acquire_context, remove the conn
from self._connections and close it. Specifically, create the _InworldConnection
with session/_ws_url/_authorization, then call await
conn.acquire_context(emitter, opts, remaining_timeout) inside a try block; on
success append conn to self._connections and log; on exception ensure you call
conn.close()/cleanup, do not append (or remove) and re-raise or return the
error. Ensure notify_capacity_available usage remains unchanged.
```diff
 except Exception as e:
-    logger.error(
-        "Inworld WebSocket connection error",
-        extra={"context_id": context_id, "error": e},
-    )
+    logger.error("Inworld stream error", extra={"context_id": context_id, "error": e})
     raise APIConnectionError() from e
```
Missing close_context call in generic exception handler may leak contexts.
When a generic exception occurs (lines 1037-1039), close_context is not called, leaving the context in the pool until stale cleanup runs (120+ seconds). Consider closing the context on all error paths.
🛠️ Suggested fix
```diff
 except Exception as e:
     logger.error("Inworld stream error", extra={"context_id": context_id, "error": e})
+    connection.close_context(context_id)
     raise APIConnectionError() from e
```
🤖 Prompt for AI Agents
In `@livekit-plugins/livekit-plugins-inworld/livekit/plugins/inworld/tts.py`
around lines 1037 - 1039, The generic exception handler logs the error and
raises APIConnectionError but does not call close_context, which can leak
context_id in the pool; update the except Exception as e block (where
logger.error and raise APIConnectionError() from e occur) to call
close_context(context_id) before raising (ideally in a finally-like cleanup or
immediately after logging), and ensure close_context is itself protected (e.g.,
swallow/log any errors from close_context) so the original exception is
re-raised as APIConnectionError.
Inworld allows a maximum of 20 WebSocket connections and 5 contexts per connection, so the plugin has been updated to work within those limits.
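For reference, those limits cap the pool at 20 × 5 = 100 concurrent contexts. Expressed as constants, the arithmetic looks like this (names here are illustrative, not necessarily the ones used in the plugin):

```python
MAX_CONNECTIONS = 20  # Inworld's WebSocket connection limit
MAX_CONTEXTS_PER_CONNECTION = 5  # contexts allowed on a single WebSocket connection

# Upper bound on concurrent synthesis contexts the pool can serve.
MAX_CONCURRENT_CONTEXTS = MAX_CONNECTIONS * MAX_CONTEXTS_PER_CONNECTION  # 100
```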
Summary by CodeRabbit
New Features
Performance Improvements