diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index b8d992f1..6c6f2879 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -2,6 +2,7 @@ import asyncio import logging import uuid +import functools from typing import Optional, Dict, Any import aioice @@ -281,6 +282,9 @@ async def _connect_coordinator_ws(self): user_details={"id": self.user_id}, ) self._coordinator_ws_client.on_wildcard("*", _log_event) + self._coordinator_ws_client.on( + "custom", functools.partial(self.emit, "custom") + ) await self._coordinator_ws_client.connect() with telemetry.start_as_current_span( diff --git a/tests/rtc/coordinator/test_custom_events.py b/tests/rtc/coordinator/test_custom_events.py new file mode 100644 index 00000000..6795fe2e --- /dev/null +++ b/tests/rtc/coordinator/test_custom_events.py @@ -0,0 +1,72 @@ +""" +Integration tests for custom event pub/sub via coordinator WebSocket. + +Tests the full round-trip: send a custom event via REST, receive it on the +ConnectionManager via its re-emitted "custom" event. + +Requires Stream API credentials (STREAM_API_KEY, STREAM_API_SECRET). +""" + +import asyncio +import logging +import uuid + +import pytest +import pytest_asyncio + +from getstream import AsyncStream +from getstream.models import CallRequest, UserRequest +from getstream.video import rtc +from getstream.video.rtc.connection_utils import ConnectionState +from tests.conftest import skip_on_rate_limit + +logger = logging.getLogger(__name__) + + +@pytest_asyncio.fixture() +async def test_users(async_client: AsyncStream): + user_ids = [f"test-user-{uuid.uuid4()}" for _ in range(2)] + await async_client.upsert_users(*[UserRequest(id=uid) for uid in user_ids]) + yield user_ids + try: + await async_client.delete_users( + user_ids=user_ids, user="hard", conversations="hard", messages="hard" + ) + except Exception: + logger.warning("Failed to clean up test users %s", user_ids, exc_info=True) + + +@pytest.mark.asyncio +@pytest.mark.integration +@skip_on_rate_limit +async def test_custom_event_round_trip(async_client: AsyncStream, test_users: list): + """Send a custom event via REST and verify it arrives on ConnectionManager.""" + sender, receiver = test_users + + call = async_client.video.call("default", str(uuid.uuid4())) + await call.get_or_create(data=CallRequest(created_by_id=sender)) + + async with await rtc.join(call, receiver) as connection: + assert connection.connection_state == ConnectionState.JOINED + + received_event = None + event_received = asyncio.Event() + + @connection.on("custom") + def on_custom(event): + nonlocal received_event + received_event = event + event_received.set() + + await call.send_call_event( + user_id=sender, + custom={"type": "test_event", "payload": "hello from sender"}, + ) + + await asyncio.wait_for(event_received.wait(), timeout=10.0) + + assert received_event is not None + custom_data = received_event.get("custom", {}) + assert custom_data.get("type") == "test_event" + assert custom_data.get("payload") == "hello from sender" + assert received_event.get("user", {}).get("id") == sender