Inworld websocket improvements #979
🦋 Changeset detected
Latest commit: ef62402
The changes in this PR will be included in the next version bump.
This PR includes changesets to release 1 package
📝 Walkthrough
A minor version bump changeset for the Inworld TTS plugin is introduced, along with documentation of the plugin in the README. The TTS implementation is refactored to use a connection pooling architecture with shared WebSocket connections, context lifecycle management, and per-connection capacity handling, replacing the previous per-instance WebSocket approach.
Sequence Diagram(s)
sequenceDiagram
participant Client as Client Code
participant TTS as TTS Instance
participant Pool as ConnectionPool
participant Conn as InworldConnection
participant WS as WebSocket
participant Server as Inworld Server
Client->>TTS: synthesize(text)
TTS->>Pool: acquireContext()
Pool->>Pool: check capacity & pooled connections
alt Need new connection
Pool->>Conn: new InworldConnection()
Conn->>WS: connect to wsURL
WS->>Server: WebSocket upgrade
Server-->>WS: connection established
end
Pool-->>TTS: return context (contextId)
TTS->>Conn: sendText(contextId, chunks)
Conn->>WS: send CREATE_CONTEXT + TEXT messages
WS->>Server: forward messages
Server->>Server: process TTS request
Server->>WS: emit audio frames & CONTEXT_ENDED
WS->>Conn: handleMessage(message)
Conn->>Conn: update ContextState (audio, status)
Conn->>TTS: call context listener callback
TTS-->>Client: emit audio frames
Server->>WS: CONTEXT_ENDED signal
WS->>Conn: handleMessage(CONTEXT_ENDED)
Conn->>Conn: resolve context waiter
TTS->>Pool: releaseContext(contextId)
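The acquire/release flow in the diagram can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: `SketchPool`, `SketchConnection`, and the `maxContextsPerConnection` parameter are hypothetical names, and no real WebSocket is opened. It only shows the capacity rule: reuse a pooled connection that still has room, otherwise create a new one.

```typescript
// Hypothetical sketch of per-connection capacity handling; names are illustrative only.
class SketchConnection {
  private readonly contexts = new Set<string>();

  constructor(private readonly maxContexts: number) {}

  get hasCapacity(): boolean {
    return this.contexts.size < this.maxContexts;
  }

  openContext(contextId: string): void {
    if (!this.hasCapacity) {
      throw new Error('Connection has no capacity for new contexts');
    }
    this.contexts.add(contextId);
  }

  closeContext(contextId: string): void {
    this.contexts.delete(contextId);
  }
}

class SketchPool {
  private readonly connections: SketchConnection[] = [];
  private readonly owners = new Map<string, SketchConnection>();
  private nextId = 0;

  constructor(private readonly maxContextsPerConnection: number) {}

  get connectionCount(): number {
    return this.connections.length;
  }

  acquireContext(): { contextId: string; connection: SketchConnection } {
    // Prefer an existing connection with spare capacity.
    let connection = this.connections.find((c) => c.hasCapacity);
    if (!connection) {
      // Otherwise grow the pool by one connection.
      connection = new SketchConnection(this.maxContextsPerConnection);
      this.connections.push(connection);
    }
    const contextId = `ctx-${this.nextId++}`;
    connection.openContext(contextId);
    this.owners.set(contextId, connection);
    return { contextId, connection };
  }

  releaseContext(contextId: string): void {
    // Return the slot to whichever connection owns this context.
    const connection = this.owners.get(contextId);
    if (!connection) return;
    connection.closeContext(contextId);
    this.owners.delete(contextId);
  }
}
```

Releasing a context returns its slot to the owning connection, so a later `acquireContext()` can reuse that connection instead of opening another one.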
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/inworld/src/tts.ts (1)
908-961: Ensure context cleanup on error paths in SynthesizeStream.
If sendLoop, flushContext, or waiter rejects, the context can remain open and capacity never returns to the pool. Close the context on failure.
🩹 Proposed fix
  try {
    // Acquire a context from the shared pool
    const acquired = await pool.acquireContext(handleMessage, config);
    contextId = acquired.contextId;
    connection = acquired.connection;
    waiter = acquired.waiter;
@@
    await waiter;
@@
    for (const frame of bstream.flush()) {
      this.queue.put({
        requestId: contextId,
        segmentId: contextId,
        frame,
        final: false,
      });
    }
  } catch (e) {
    this.#logger.error({ error: e, contextId }, 'Error in SynthesizeStream run');
+   if (connection && contextId) {
+     try {
+       await connection.closeContext(contextId);
+     } catch (closeErr) {
+       this.#logger.warn({ error: closeErr, contextId }, 'Failed to close context after error');
+     }
+   }
    throw e;
  }
🤖 Fix all issues with AI agents
In `@plugins/inworld/src/tts.ts`:
- Around line 220-255: In acquireContext, if this.#sendCreateContext(contextId,
config) throws the created ContextInfo remains in this.#contexts and capacity is
leaked; wrap the send in try/catch, and on error remove the contextId from
this.#contexts, clear or call rejectWaiter(err) if set, then rethrow the error;
ensure this.#lastActivityAt is only left as-is or updated appropriately. Apply
the same cleanup pattern to the other send-create-context call noted (lines
446-448) so any failed send removes the context entry and signals the waiter.
- Around line 471-479: The recurring setInterval in ConnectionPool's constructor
(the `#idleCleanupInterval` created for `#cleanupIdleConnections`) prevents process
exit in short-lived contexts; update the interval creation to call .unref() if
available (e.g., setInterval(...).unref?.()) so the timer is non-blocking, and
also ensure resource cleanup by wiring TTS.close() to call
ConnectionPool.close() and remove entries from the module-level sharedPools Map
so intervals are cleared when a TTS instance is closed; apply the same .unref()
fix to the other setInterval usages mentioned (around the other constructors at
the referenced locations).
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
.changeset/chatty-rockets-start.md
README.md
plugins/inworld/src/tts.ts
🧰 Additional context used
🧬 Code graph analysis (1)
plugins/inworld/src/tts.ts (1)
agents/src/utils.ts (1)
shortuuid(554-556)
🔇 Additional comments (2)
README.md (1)
74-74: Inworld plugin entry looks good. Clear addition to the supported plugins table.
.changeset/chatty-rockets-start.md (1)
1-5: Changeset entry is clear. Version bump and summary look fine.
async acquireContext(
  listener: (msg: InworldMessage) => void,
  config: CreateContextConfig,
): Promise<{ contextId: string; waiter: Promise<void> }> {
  if (!this.hasCapacity) {
    throw new Error('Connection has no capacity for new contexts');
  }

  // Ensure connection is established
  await this.#ensureConnected();

  const contextId = shortuuid();
  let resolveWaiter: (() => void) | null = null;
  let rejectWaiter: ((err: Error) => void) | null = null;

  const waiter = new Promise<void>((resolve, reject) => {
    resolveWaiter = resolve;
    rejectWaiter = reject;
  });

  const contextInfo: ContextInfo = {
    contextId,
    state: ContextState.CREATING,
    listener,
    resolveWaiter,
    rejectWaiter,
    createdAt: Date.now(),
  };

  this.#contexts.set(contextId, contextInfo);
  this.#lastActivityAt = Date.now();

  // Send create context command
  await this.#sendCreateContext(contextId, config);

  return { contextId, waiter };
Guard against context leaks when create-context send fails.
If #sendCreateContext throws, the context stays in #contexts, permanently reducing capacity. Clean it up on error.
🩹 Proposed fix
this.#contexts.set(contextId, contextInfo);
this.#lastActivityAt = Date.now();
// Send create context command
- await this.#sendCreateContext(contextId, config);
+ try {
+ await this.#sendCreateContext(contextId, config);
+ } catch (err) {
+ this.#contexts.delete(contextId);
+ if (rejectWaiter) {
+ const error = err instanceof Error ? err : new Error(String(err));
+ rejectWaiter(error);
+ }
+ throw err;
+ }
return { contextId, waiter };
Also applies to: 446-448
🤖 Prompt for AI Agents
In `@plugins/inworld/src/tts.ts` around lines 220 - 255, In acquireContext, if
this.#sendCreateContext(contextId, config) throws the created ContextInfo
remains in this.#contexts and capacity is leaked; wrap the send in try/catch,
and on error remove the contextId from this.#contexts, clear or call
rejectWaiter(err) if set, then rethrow the error; ensure this.#lastActivityAt is
only left as-is or updated appropriately. Apply the same cleanup pattern to the
other send-create-context call noted (lines 446-448) so any failed send removes
the context entry and signals the waiter.
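A minimal sketch of the cleanup pattern described above, under stated assumptions: `LeakSafeConnection` and the injected `send` function are hypothetical stand-ins for the connection class and `#sendCreateContext`, and the waiter is never resolved here because only the failure path is illustrated. One detail worth noting is that if the waiter is rejected before the caller ever receives it, Node.js may report an unhandled rejection, so this sketch attaches a no-op handler up front.

```typescript
// Hypothetical sketch; `send` stands in for the real create-context call.
type SendCreateContext = (contextId: string) => Promise<void>;

class LeakSafeConnection {
  private readonly contexts = new Map<string, { rejectWaiter: (err: Error) => void }>();
  private counter = 0;

  constructor(private readonly send: SendCreateContext) {}

  get activeContexts(): number {
    return this.contexts.size;
  }

  async acquireContext(): Promise<{ contextId: string; waiter: Promise<void> }> {
    const contextId = `ctx-${this.counter++}`;
    let rejectWaiter: (err: Error) => void = () => {};
    const waiter = new Promise<void>((_resolve, reject) => {
      rejectWaiter = reject;
    });
    // If we throw below, the caller never sees `waiter`; swallow its rejection
    // on this derived promise so it is not reported as unhandled.
    waiter.catch(() => {});

    this.contexts.set(contextId, { rejectWaiter });

    try {
      await this.send(contextId);
    } catch (err) {
      // Give the slot back before propagating the failure.
      this.contexts.delete(contextId);
      rejectWaiter(err instanceof Error ? err : new Error(String(err)));
      throw err;
    }

    return { contextId, waiter };
  }
}
```

With this shape, a failed send leaves `activeContexts` unchanged, so the connection's capacity is not reduced by contexts that were never created on the server.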
constructor(url: string, auth: string) {
  this.#url = url;
  this.#auth = auth;

  // Start idle connection cleanup
  this.#idleCleanupInterval = setInterval(() => {
    this.#cleanupIdleConnections();
  }, 60_000); // Check every minute
}
🧩 Analysis chain
🏁 Script executed:
# First, let's locate and examine the TTS file
find . -name "tts.ts" -type f | head -5
Repository: livekit/agents-js
Length of output: 206
🏁 Script executed:
# Get the file size to understand how large it is
wc -l plugins/inworld/src/tts.ts 2>/dev/null || find . -path "*plugins/inworld/src/tts.ts" -exec wc -l {} \;
Repository: livekit/agents-js
Length of output: 91
🏁 Script executed:
# Search for the TTS class and close method
rg -n "close\(\)" plugins/inworld/src/tts.ts 2>/dev/null | head -20
Repository: livekit/agents-js
Length of output: 232
🏁 Script executed:
# Let's read the specific lines mentioned: 471-479 and around the close method
sed -n '450,650p' plugins/inworld/src/tts.ts 2>/dev/null || find . -path "*plugins/inworld/src/tts.ts" -exec sed -n '450,650p' {} \;
Repository: livekit/agents-js
Length of output: 5652
🏁 Script executed:
# Find TTS.close() method and the lines mentioned in "Also applies to"
sed -n '570,700p' plugins/inworld/src/tts.ts
Repository: livekit/agents-js
Length of output: 3390
🏁 Script executed:
# Also get the export and see if TTS exposes a close method
rg -n "close.*\(\)" plugins/inworld/src/tts.ts | head -20
Repository: livekit/agents-js
Length of output: 265
🏁 Script executed:
# Check lines 586-597 and get context around them
sed -n '580,600p' plugins/inworld/src/tts.ts
Repository: livekit/agents-js
Length of output: 555
🏁 Script executed:
# Let's see the full file structure to understand what's at line 586-597
sed -n '586,597p' plugins/inworld/src/tts.ts
Repository: livekit/agents-js
Length of output: 437
🏁 Script executed:
# Find where getSharedPool is called
rg -n "getSharedPool" plugins/inworld/src/tts.ts
Repository: livekit/agents-js
Length of output: 221
🏁 Script executed:
# Get the ChunkedStream and SynthesizeStream classes to see if they use the pool
sed -n '700,850p' plugins/inworld/src/tts.ts
Repository: livekit/agents-js
Length of output: 4549
🏁 Script executed:
# Check if there are any tests or examples that might show the issue
find . -type f \( -name "*.test.ts" -o -name "*.spec.ts" -o -name "example*.ts" \) | xargs grep -l "inworld.*TTS\|new TTS" 2>/dev/null | head -5
Repository: livekit/agents-js
Length of output: 248
🏁 Script executed:
# Let's check the base TTS class to understand the expected close() behavior
rg -n "class TTS" plugins/inworld/src/tts.ts -A 5
Repository: livekit/agents-js
Length of output: 240
🏁 Script executed:
# Check if there are other TTS implementations in the codebase to compare
rg -n "async close\(\)" agents/src/tts/tts.ts | head -10
Repository: livekit/agents-js
Length of output: 97
🏁 Script executed:
# Let's see the full base TTS class definition and close() method
sed -n '100,120p' agents/src/tts/tts.ts
Repository: livekit/agents-js
Length of output: 608
🏁 Script executed:
# Let's check if there's documentation about TTS lifecycle
rg -B 3 -A 3 "async close" agents/src/tts/tts.ts | head -20
Repository: livekit/agents-js
Length of output: 202
🏁 Script executed:
# Let's check the whole InworldTTS close method again and understand the full picture
sed -n '680,690p' plugins/inworld/src/tts.ts
Repository: livekit/agents-js
Length of output: 273
🏁 Script executed:
# Let's look at how the shared pool is initialized and understand its lifecycle
sed -n '586,610p' plugins/inworld/src/tts.ts
Repository: livekit/agents-js
Length of output: 833
🏁 Script executed:
# Check if there's any code that removes items from sharedPools
rg -n "sharedPools\." plugins/inworld/src/tts.ts
Repository: livekit/agents-js
Length of output: 135
🏁 Script executed:
# Verify the complete lifecycle: check all usages of getSharedPool
rg -n "getSharedPool\|sharedPools" plugins/inworld/src/tts.ts
Repository: livekit/agents-js
Length of output: 43
🏁 Script executed:
# Check if there's a module-level cleanup or export function
rg -n "export.*function\|export.*const" plugins/inworld/src/tts.ts | tail -20
Repository: livekit/agents-js
Length of output: 43
The shared pool's cleanup interval prevents process exit in short-lived contexts.
The ConnectionPool sets an active setInterval that runs every 60 seconds. Although ConnectionPool has a close() method that clears this interval, it is never invoked because:
- TTS.close() is a no-op with no cleanup logic
- The shared pool instances are stored indefinitely in a module-level sharedPools Map with no cleanup mechanism
In tests or short-lived scripts, the active interval keeps the process alive. Use setInterval(...).unref?.() to make the timer non-blocking.
🩹 Minimal mitigation
this.#idleCleanupInterval = setInterval(() => {
this.#cleanupIdleConnections();
}, 60_000); // Check every minute
+ this.#idleCleanupInterval.unref?.();
Also applies to: 586-597, 680-682
🤖 Prompt for AI Agents
In `@plugins/inworld/src/tts.ts` around lines 471 - 479, The recurring setInterval
in ConnectionPool's constructor (the `#idleCleanupInterval` created for
`#cleanupIdleConnections`) prevents process exit in short-lived contexts; update
the interval creation to call .unref() if available (e.g.,
setInterval(...).unref?.()) so the timer is non-blocking, and also ensure
resource cleanup by wiring TTS.close() to call ConnectionPool.close() and remove
entries from the module-level sharedPools Map so intervals are cleared when a
TTS instance is closed; apply the same .unref() fix to the other setInterval
usages mentioned (around the other constructors at the referenced locations).
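A small sketch of the timer handling suggested here, assuming a Node.js runtime where the object returned by `setInterval` exposes `unref()`. `IdleSweeper` is a hypothetical name used only for illustration; it pairs the unref'd interval with an explicit, idempotent `close()` so the timer can also be cleared deterministically.

```typescript
// Hypothetical sketch; not the PR's ConnectionPool.
class IdleSweeper {
  private timer: ReturnType<typeof setInterval> | null;

  constructor(intervalMs: number, sweep: () => void) {
    this.timer = setInterval(sweep, intervalMs);
    // In Node.js, unref() lets the process exit even while this timer is pending.
    // Browser-style numeric handles have no unref, hence the optional call.
    (this.timer as unknown as { unref?: () => void }).unref?.();
  }

  get running(): boolean {
    return this.timer !== null;
  }

  close(): void {
    // Safe to call more than once.
    if (this.timer !== null) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }
}
```

The `unref` call only stops the timer from keeping the process alive; explicit `close()` is still what releases the interval while the process keeps running.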
@codex review this PR
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ef62402c77
async close() {
  this.#pool.close();
  // No per-instance cleanup needed - pool is shared
}
Add shutdown path for shared pools
The new shared ConnectionPool starts a setInterval for idle cleanup, but TTS.close() is now a no-op and there is no other path that calls ConnectionPool.close(). Because pools are stored in a module-level map, those intervals and any open WebSockets will persist even after callers invoke close(), keeping the Node event loop alive and leaking resources in short-lived scripts or tests. Consider reference counting or exposing a shutdown API that clears the interval and closes pooled connections when no instances are using them.
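One way the reference-counting idea could look, as a sketch under assumptions: `RefCountedRegistry` and `Closable` are hypothetical names, and the registry key stands in for whatever identifies a shared pool (for example its URL). Each `acquire` increments a counter, and the last `release` closes the value and drops it from the map.

```typescript
// Hypothetical sketch of a reference-counted registry for shared resources.
interface Closable {
  close(): void;
}

class RefCountedRegistry<T extends Closable> {
  private readonly entries = new Map<string, { value: T; refs: number }>();

  constructor(private readonly create: (key: string) => T) {}

  get size(): number {
    return this.entries.size;
  }

  acquire(key: string): T {
    let entry = this.entries.get(key);
    if (!entry) {
      // First user of this key: create the shared resource lazily.
      entry = { value: this.create(key), refs: 0 };
      this.entries.set(key, entry);
    }
    entry.refs += 1;
    return entry.value;
  }

  release(key: string): void {
    const entry = this.entries.get(key);
    if (!entry) return;
    entry.refs -= 1;
    if (entry.refs <= 0) {
      // Last user gone: remove the entry and shut the resource down.
      this.entries.delete(key);
      entry.value.close();
    }
  }
}
```

In this arrangement a `close()` on one instance would call `release(key)` rather than closing the shared resource directly, so other instances using the same key are unaffected.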
if (result.status && result.status.code !== 0) {
  this.#logger.error({ contextId, status: result.status }, 'Inworld stream error');
  const ctx = this.#contexts.get(contextId);
  if (ctx?.rejectWaiter) {
    ctx.rejectWaiter(new Error(`Inworld error: ${result.status.message}`));
Remove contexts from pool on error status
When the server replies with a non-zero status, the handler rejects the waiter but leaves the context in #contexts and does not notify capacity. In cases like invalid config or server-side stream errors, this permanently consumes a slot, so hasCapacity can go false and acquireContext may block or create unnecessary extra connections. Consider deleting the context and signaling capacity available when a status error is received.
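The suggestion above might be sketched as follows. This is illustrative only: `ContextTracker`, `onCapacity`, and `handleStatus` are hypothetical names, and the status shape (`code`, `message`) is assumed from the snippet. On a non-zero status the context is deleted before its waiter is rejected, and one pending capacity listener is notified.

```typescript
// Hypothetical sketch; the real handler and capacity signalling may differ.
interface StreamStatus {
  code: number;
  message: string;
}

class ContextTracker {
  private readonly contexts = new Map<string, (err: Error) => void>();
  private readonly capacityListeners: Array<() => void> = [];

  constructor(private readonly maxContexts: number) {}

  get hasCapacity(): boolean {
    return this.contexts.size < this.maxContexts;
  }

  add(contextId: string, rejectWaiter: (err: Error) => void): void {
    if (!this.hasCapacity) {
      throw new Error('Connection has no capacity for new contexts');
    }
    this.contexts.set(contextId, rejectWaiter);
  }

  // Register a one-shot callback for the next freed slot.
  onCapacity(listener: () => void): void {
    this.capacityListeners.push(listener);
  }

  handleStatus(contextId: string, status: StreamStatus): void {
    if (status.code === 0) return;
    const rejectWaiter = this.contexts.get(contextId);
    if (!rejectWaiter) return;
    // Free the slot first so capacity is accurate when callbacks run.
    this.contexts.delete(contextId);
    rejectWaiter(new Error(`Inworld error: ${status.message}`));
    const next = this.capacityListeners.shift();
    if (next) next();
  }
}
```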
Description
Update the WebSocket implementation to follow Inworld's connection/context setup:
https://docs.inworld.ai/api-reference/ttsAPI/texttospeech/synthesize-speech-websocket
Pre-Review Checklist
Testing
Tested in examples/src/inworld_tts.ts
Summary by CodeRabbit
Release Notes
Bug Fixes
Documentation