Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions getstream/video/rtc/connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import asyncio
import logging
import uuid
import functools
from typing import Optional, Dict, Any

import aioice
Expand Down Expand Up @@ -281,6 +282,9 @@ async def _connect_coordinator_ws(self):
user_details={"id": self.user_id},
)
self._coordinator_ws_client.on_wildcard("*", _log_event)
self._coordinator_ws_client.on(
"custom", functools.partial(self.emit, "custom")
)
await self._coordinator_ws_client.connect()

with telemetry.start_as_current_span(
Expand Down
72 changes: 72 additions & 0 deletions tests/rtc/coordinator/test_custom_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
Integration tests for custom event pub/sub via coordinator WebSocket.

Tests the full round-trip: send a custom event via REST, receive it on the
ConnectionManager via its re-emitted "custom" event.

Requires Stream API credentials (STREAM_API_KEY, STREAM_API_SECRET).
"""

import asyncio
import logging
import uuid

import pytest
import pytest_asyncio

from getstream import AsyncStream
from getstream.models import CallRequest, UserRequest
from getstream.video import rtc
from getstream.video.rtc.connection_utils import ConnectionState
from tests.conftest import skip_on_rate_limit

logger = logging.getLogger(__name__)


@pytest_asyncio.fixture()
async def test_users(async_client: AsyncStream):
user_ids = [f"test-user-{uuid.uuid4()}" for _ in range(2)]
await async_client.upsert_users(*[UserRequest(id=uid) for uid in user_ids])
yield user_ids
try:
await async_client.delete_users(
user_ids=user_ids, user="hard", conversations="hard", messages="hard"
)
except Exception:
logger.warning("Failed to clean up test users %s", user_ids, exc_info=True)


@pytest.mark.asyncio
@pytest.mark.integration
@skip_on_rate_limit
async def test_custom_event_round_trip(async_client: AsyncStream, test_users: list):
"""Send a custom event via REST and verify it arrives on ConnectionManager."""
sender, receiver = test_users

call = async_client.video.call("default", str(uuid.uuid4()))
await call.get_or_create(data=CallRequest(created_by_id=sender))

async with await rtc.join(call, receiver) as connection:
assert connection.connection_state == ConnectionState.JOINED

received_event = None
event_received = asyncio.Event()

@connection.on("custom")
def on_custom(event):
nonlocal received_event
received_event = event
event_received.set()

await call.send_call_event(
user_id=sender,
custom={"type": "test_event", "payload": "hello from sender"},
)

await asyncio.wait_for(event_received.wait(), timeout=10.0)

assert received_event is not None
custom_data = received_event.get("custom", {})
assert custom_data.get("type") == "test_event"
assert custom_data.get("payload") == "hello from sender"
assert received_event.get("user", {}).get("id") == sender
Loading