Skip to content

Conversation

@cshape
Copy link
Contributor

@cshape cshape commented Jan 16, 2026

Description

Update Websockets implementation as per Inworld's connection/context setup

https://docs.inworld.ai/api-reference/ttsAPI/texttospeech/synthesize-speech-websocket

Pre-Review Checklist

  • Build passes: All builds (lint, typecheck, tests) pass locally
  • AI-generated code reviewed: Removed unnecessary comments and ensured code quality
  • Changes explained: All changes are properly documented and justified above
  • Scope appropriate: All changes relate to the PR title, or explanations provided for why they're included

Testing

Tested in examples/src/inworld_tts.ts

Summary by CodeRabbit

Release Notes

  • Bug Fixes

    • Fixed punctuation issue in Inworld TTS
  • Documentation

    • Added Inworld plugin TTS to the supported plugins table in README

✏️ Tip: You can customize this high-level summary in your review settings.

@changeset-bot
Copy link

changeset-bot bot commented Jan 16, 2026

🦋 Changeset detected

Latest commit: ef62402

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 1 package
Name Type
@livekit/agents-plugin-inworld Minor

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@coderabbitai
Copy link

coderabbitai bot commented Jan 16, 2026

📝 Walkthrough

Walkthrough

A minor version bump changeset for the Inworld TTS plugin is introduced, along with documentation of the plugin in the README. The TTS implementation is refactored to use a connection pooling architecture with shared WebSocket connections, context lifecycle management, and per-connection capacity handling, replacing the previous per-instance WebSocket approach.

Changes

Cohort / File(s) Summary
Documentation & Configuration
.changeset/chatty-rockets-start.md, README.md
Added changeset entry for minor version bump (punctuation fix). Added inworld plugin row to supported plugins table with TTS capability.
TTS Connection Pooling Refactor
plugins/inworld/src/tts.ts
Introduced connection pooling layer with shared WebSocket connections managing multiple concurrent contexts. Added InworldConnection class for lifecycle management, new internal types (ContextState, ContextInfo, AcquireContextResult), and pooling logic supporting idle timeout and capacity limits. Refactored SynthesizeStream and ChunkedStream to acquire contexts from shared pool and push chunked text. Replaced per-context listener mechanism with callback-based message routing. Narrowed Encoding type signature (removed string union). Added public accessors wsURL and opts to TTS class. Updated tokenizer default to SentenceTokenizer with retainFormat option.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client Code
    participant TTS as TTS Instance
    participant Pool as ConnectionPool
    participant Conn as InworldConnection
    participant WS as WebSocket
    participant Server as Inworld Server

    Client->>TTS: synthesize(text)
    TTS->>Pool: acquireContext()
    Pool->>Pool: check capacity & pooled connections
    alt Need new connection
        Pool->>Conn: new InworldConnection()
        Conn->>WS: connect to wsURL
        WS->>Server: WebSocket upgrade
        Server-->>WS: connection established
    end
    Pool-->>TTS: return context (contextId)
    TTS->>Conn: sendText(contextId, chunks)
    Conn->>WS: send CREATE_CONTEXT + TEXT messages
    WS->>Server: forward messages
    Server->>Server: process TTS request
    Server->>WS: emit audio frames & CONTEXT_ENDED
    WS->>Conn: handleMessage(message)
    Conn->>Conn: update ContextState (audio, status)
    Conn->>TTS: call context listener callback
    TTS-->>Client: emit audio frames
    Server->>WS: CONTEXT_ENDED signal
    WS->>Conn: handleMessage(CONTEXT_ENDED)
    Conn->>Conn: resolve context waiter
    TTS->>Pool: releaseContext(contextId)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 Connections pooled and shared with care,
No more WebSockets floating in air!
Contexts dancing in harmony,
Efficiency blooms for all to see.

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Description check ❓ Inconclusive The description is incomplete. It lacks a 'Changes Made' section detailing the specific modifications, and the pre-review checklist is only partially completed with missing video demo and testing evidence. Expand the description to include a detailed 'Changes Made' section outlining the connection pooling, context management refactoring, and other structural changes. Complete the remaining pre-review checklist items, particularly the video demo verification.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title 'Inworld websocket improvements' accurately summarizes the main change: refactoring the Inworld TTS WebSocket implementation to support connection pooling and updated context management.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
plugins/inworld/src/tts.ts (1)

908-961: Ensure context cleanup on error paths in SynthesizeStream.

If sendLoop, flushContext, or waiter rejects, the context can remain open and capacity never returns to the pool. Close the context on failure.

🩹 Proposed fix
     try {
       // Acquire a context from the shared pool
       const acquired = await pool.acquireContext(handleMessage, config);
       contextId = acquired.contextId;
       connection = acquired.connection;
       waiter = acquired.waiter;
@@
       await waiter;
@@
       for (const frame of bstream.flush()) {
         this.queue.put({
           requestId: contextId,
           segmentId: contextId,
           frame,
           final: false,
         });
       }
     } catch (e) {
       this.#logger.error({ error: e, contextId }, 'Error in SynthesizeStream run');
+      if (connection && contextId) {
+        try {
+          await connection.closeContext(contextId);
+        } catch (closeErr) {
+          this.#logger.warn({ error: closeErr, contextId }, 'Failed to close context after error');
+        }
+      }
       throw e;
     }
🤖 Fix all issues with AI agents
In `@plugins/inworld/src/tts.ts`:
- Around line 220-255: In acquireContext, if this.#sendCreateContext(contextId,
config) throws the created ContextInfo remains in this.#contexts and capacity is
leaked; wrap the send in try/catch, and on error remove the contextId from
this.#contexts, clear or call rejectWaiter(err) if set, then rethrow the error;
ensure this.#lastActivityAt is only left as-is or updated appropriately. Apply
the same cleanup pattern to the other send-create-context call noted (lines
446-448) so any failed send removes the context entry and signals the waiter.
- Around line 471-479: The recurring setInterval in ConnectionPool's constructor
(the `#idleCleanupInterval` created for `#cleanupIdleConnections`) prevents process
exit in short-lived contexts; update the interval creation to call .unref() if
available (e.g., setInterval(...).unref?.()) so the timer is non-blocking, and
also ensure resource cleanup by wiring TTS.close() to call
ConnectionPool.close() and remove entries from the module-level sharedPools Map
so intervals are cleared when a TTS instance is closed; apply the same .unref()
fix to the other setInterval usages mentioned (around the other constructors at
the referenced locations).
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between df523ab and ef62402.

📒 Files selected for processing (3)
  • .changeset/chatty-rockets-start.md
  • README.md
  • plugins/inworld/src/tts.ts
🧰 Additional context used
🧬 Code graph analysis (1)
plugins/inworld/src/tts.ts (1)
agents/src/utils.ts (1)
  • shortuuid (554-556)
🔇 Additional comments (2)
README.md (1)

74-74: Inworld plugin entry looks good.

Clear addition to the supported plugins table.

.changeset/chatty-rockets-start.md (1)

1-5: Changeset entry is clear.

Version bump and summary look fine.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Comment on lines +220 to +255
async acquireContext(
listener: (msg: InworldMessage) => void,
config: CreateContextConfig,
): Promise<{ contextId: string; waiter: Promise<void> }> {
if (!this.hasCapacity) {
throw new Error('Connection has no capacity for new contexts');
}

// Ensure connection is established
await this.#ensureConnected();

const contextId = shortuuid();
let resolveWaiter: (() => void) | null = null;
let rejectWaiter: ((err: Error) => void) | null = null;

const waiter = new Promise<void>((resolve, reject) => {
resolveWaiter = resolve;
rejectWaiter = reject;
});

const contextInfo: ContextInfo = {
contextId,
state: ContextState.CREATING,
listener,
resolveWaiter,
rejectWaiter,
createdAt: Date.now(),
};

this.#contexts.set(contextId, contextInfo);
this.#lastActivityAt = Date.now();

// Send create context command
await this.#sendCreateContext(contextId, config);

return { contextId, waiter };
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Guard against context leaks when create-context send fails.

If #sendCreateContext throws, the context stays in #contexts, permanently reducing capacity. Clean it up on error.

🩹 Proposed fix
     this.#contexts.set(contextId, contextInfo);
     this.#lastActivityAt = Date.now();
 
     // Send create context command
-    await this.#sendCreateContext(contextId, config);
+    try {
+      await this.#sendCreateContext(contextId, config);
+    } catch (err) {
+      this.#contexts.delete(contextId);
+      if (rejectWaiter) {
+        const error = err instanceof Error ? err : new Error(String(err));
+        rejectWaiter(error);
+      }
+      throw err;
+    }
 
     return { contextId, waiter };

Also applies to: 446-448

🤖 Prompt for AI Agents
In `@plugins/inworld/src/tts.ts` around lines 220 - 255, In acquireContext, if
this.#sendCreateContext(contextId, config) throws the created ContextInfo
remains in this.#contexts and capacity is leaked; wrap the send in try/catch,
and on error remove the contextId from this.#contexts, clear or call
rejectWaiter(err) if set, then rethrow the error; ensure this.#lastActivityAt is
only left as-is or updated appropriately. Apply the same cleanup pattern to the
other send-create-context call noted (lines 446-448) so any failed send removes
the context entry and signals the waiter.

Comment on lines +471 to +479
constructor(url: string, auth: string) {
this.#url = url;
this.#auth = auth;

// Start idle connection cleanup
this.#idleCleanupInterval = setInterval(() => {
this.#cleanupIdleConnections();
}, 60_000); // Check every minute
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's locate and examine the TTS file
find . -name "tts.ts" -type f | head -5

Repository: livekit/agents-js

Length of output: 206


🏁 Script executed:

# Get the file size to understand how large it is
wc -l plugins/inworld/src/tts.ts 2>/dev/null || find . -path "*plugins/inworld/src/tts.ts" -exec wc -l {} \;

Repository: livekit/agents-js

Length of output: 91


🏁 Script executed:

# Search for the TTS class and close method
rg -n "close\(\)" plugins/inworld/src/tts.ts 2>/dev/null | head -20

Repository: livekit/agents-js

Length of output: 232


🏁 Script executed:

# Let's read the specific lines mentioned: 471-479 and around the close method
sed -n '450,650p' plugins/inworld/src/tts.ts 2>/dev/null || find . -path "*plugins/inworld/src/tts.ts" -exec sed -n '450,650p' {} \;

Repository: livekit/agents-js

Length of output: 5652


🏁 Script executed:

# Find TTS.close() method and the lines mentioned in "Also applies to"
sed -n '570,700p' plugins/inworld/src/tts.ts

Repository: livekit/agents-js

Length of output: 3390


🏁 Script executed:

# Also get the export and see if TTS exposes a close method
rg -n "close.*\(\)" plugins/inworld/src/tts.ts | head -20

Repository: livekit/agents-js

Length of output: 265


🏁 Script executed:

# Check lines 586-597 and get context around them
sed -n '580,600p' plugins/inworld/src/tts.ts

Repository: livekit/agents-js

Length of output: 555


🏁 Script executed:

# Let's see the full file structure to understand what's at line 586-597
sed -n '586,597p' plugins/inworld/src/tts.ts

Repository: livekit/agents-js

Length of output: 437


🏁 Script executed:

# Find where getSharedPool is called
rg -n "getSharedPool" plugins/inworld/src/tts.ts

Repository: livekit/agents-js

Length of output: 221


🏁 Script executed:

# Get the ChunkedStream and SynthesizeStream classes to see if they use the pool
sed -n '700,850p' plugins/inworld/src/tts.ts

Repository: livekit/agents-js

Length of output: 4549


🏁 Script executed:

# Check if there are any tests or examples that might show the issue
find . -type f \( -name "*.test.ts" -o -name "*.spec.ts" -o -name "example*.ts" \) | xargs grep -l "inworld.*TTS\|new TTS" 2>/dev/null | head -5

Repository: livekit/agents-js

Length of output: 248


🏁 Script executed:

# Let's check the base TTS class to understand the expected close() behavior
rg -n "class TTS" plugins/inworld/src/tts.ts -A 5

Repository: livekit/agents-js

Length of output: 240


🏁 Script executed:

# Check if there are other TTS implementations in the codebase to compare
rg -n "async close\(\)" agents/src/tts/tts.ts | head -10

Repository: livekit/agents-js

Length of output: 97


🏁 Script executed:

# Let's see the full base TTS class definition and close() method
sed -n '100,120p' agents/src/tts/tts.ts

Repository: livekit/agents-js

Length of output: 608


🏁 Script executed:

# Let's check if there's documentation about TTS lifecycle
rg -B 3 -A 3 "async close" agents/src/tts/tts.ts | head -20

Repository: livekit/agents-js

Length of output: 202


🏁 Script executed:

# Let's check the whole InworldTTS close method again and understand the full picture
sed -n '680,690p' plugins/inworld/src/tts.ts

Repository: livekit/agents-js

Length of output: 273


🏁 Script executed:

# Let's look at how the shared pool is initialized and understand its lifecycle
sed -n '586,610p' plugins/inworld/src/tts.ts

Repository: livekit/agents-js

Length of output: 833


🏁 Script executed:

# Check if there's any code that removes items from sharedPools
rg -n "sharedPools\." plugins/inworld/src/tts.ts

Repository: livekit/agents-js

Length of output: 135


🏁 Script executed:

# Verify the complete lifecycle: check all usages of getSharedPool
rg -n "getSharedPool\|sharedPools" plugins/inworld/src/tts.ts

Repository: livekit/agents-js

Length of output: 43


🏁 Script executed:

# Check if there's a module-level cleanup or export function
rg -n "export.*function\|export.*const" plugins/inworld/src/tts.ts | tail -20

Repository: livekit/agents-js

Length of output: 43


The shared pool's cleanup interval prevents process exit in short-lived contexts.

The ConnectionPool sets an active setInterval that runs every 60 seconds. Although ConnectionPool has a close() method that clears this interval, it is never invoked because:

  1. TTS.close() is a no-op with no cleanup logic
  2. The shared pool instances are stored indefinitely in a module-level sharedPools Map with no cleanup mechanism

In tests or short-lived scripts, the active interval keeps the process alive. Use setInterval(...).unref?.() to make the timer non-blocking.

🩹 Minimal mitigation
     this.#idleCleanupInterval = setInterval(() => {
       this.#cleanupIdleConnections();
     }, 60_000); // Check every minute
+    this.#idleCleanupInterval.unref?.();

Also applies to: 586-597, 680-682

🤖 Prompt for AI Agents
In `@plugins/inworld/src/tts.ts` around lines 471 - 479, The recurring setInterval
in ConnectionPool's constructor (the `#idleCleanupInterval` created for
`#cleanupIdleConnections`) prevents process exit in short-lived contexts; update
the interval creation to call .unref() if available (e.g.,
setInterval(...).unref?.()) so the timer is non-blocking, and also ensure
resource cleanup by wiring TTS.close() to call ConnectionPool.close() and remove
entries from the module-level sharedPools Map so intervals are cleared when a
TTS instance is closed; apply the same .unref() fix to the other setInterval
usages mentioned (around the other constructors at the referenced locations).

@toubatbrian
Copy link
Contributor

@codex review this PR

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ef62402c77

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines 680 to 682
async close() {
this.#pool.close();
// No per-instance cleanup needed - pool is shared
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Add shutdown path for shared pools

The new shared ConnectionPool starts a setInterval for idle cleanup, but TTS.close() is now a no-op and there is no other path that calls ConnectionPool.close(). Because pools are stored in a module-level map, those intervals and any open WebSockets will persist even after callers invoke close(), keeping the Node event loop alive and leaking resources in short-lived scripts or tests. Consider reference counting or exposing a shutdown API that clears the interval and closes pooled connections when no instances are using them.

Useful? React with 👍 / 👎.

Comment on lines +412 to +416
if (result.status && result.status.code !== 0) {
this.#logger.error({ contextId, status: result.status }, 'Inworld stream error');
const ctx = this.#contexts.get(contextId);
if (ctx?.rejectWaiter) {
ctx.rejectWaiter(new Error(`Inworld error: ${result.status.message}`));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Remove contexts from pool on error status

When the server replies with a non-zero status, the handler rejects the waiter but leaves the context in #contexts and does not notify capacity. In cases like invalid config or server-side stream errors, this permanently consumes a slot, so hasCapacity can go false and acquireContext may block or create unnecessary extra connections. Consider deleting the context and signaling capacity available when a status error is received.

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants