From d67fdd9392d6a7c0e6820768c36289c380b830d3 Mon Sep 17 00:00:00 2001 From: Alex Autem Date: Sat, 18 Apr 2026 12:42:38 -0500 Subject: [PATCH 01/11] fix: guard against None gateway in _startup_reset() When bellows connects to an EZSP coordinator over a TCP socket on Python 3.14, the gateway's wait_for_startup_reset() can fail with TypeError due to asyncio threading behavior changes. Add a null guard in _startup_reset() to raise a clear EzspError instead of an opaque TypeError, allowing the retry logic in startup_reset() to handle the failure cleanly. Co-Authored-By: Claude Opus 4.6 (1M context) --- bellows/ezsp/__init__.py | 3 +++ tests/test_ezsp.py | 24 ++++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index 7166847e..6fd86226 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -117,6 +117,9 @@ def is_tcp_serial_port(self) -> bool: async def _startup_reset(self) -> None: """Start EZSP and reset the stack.""" + if self._gw is None: + raise EzspError("Gateway is not connected") + # `zigbeed` resets on startup if self.is_tcp_serial_port: try: diff --git a/tests/test_ezsp.py b/tests/test_ezsp.py index d548309a..483ad085 100644 --- a/tests/test_ezsp.py +++ b/tests/test_ezsp.py @@ -789,6 +789,30 @@ async def wait_forever(*args, **kwargs): assert version_mock.await_count == 1 +async def test_startup_reset_gw_none(): + """Test _startup_reset raises EzspError when gateway is None.""" + ezsp = make_ezsp( + config={ + **DEVICE_CONFIG, + zigpy.config.CONF_DEVICE_PATH: "socket://localhost:1234", + } + ) + ezsp._gw = None + + with pytest.raises(EzspError, match="Gateway is not connected"): + await ezsp._startup_reset() + + +async def test_disconnect_gw_none(): + """Test disconnect doesn't raise when gateway is already None.""" + ezsp = make_ezsp() + ezsp._gw = None + + await ezsp.disconnect() # Should not raise + + assert ezsp._gw is None + + async def 
test_wait_for_stack_status(ezsp_f): assert not ezsp_f._stack_status_listeners[t.sl_Status.NETWORK_DOWN] From 19dd27c027baeab2fd4ca6fbab1fbabce9a07ebc Mon Sep 17 00:00:00 2001 From: Alex Autem Date: Sat, 18 Apr 2026 13:34:30 -0500 Subject: [PATCH 02/11] fix: use inspect.iscoroutinefunction for Python 3.14 compat asyncio.iscoroutinefunction() is deprecated as of Python 3.14 and scheduled for removal in Python 3.16; inspect.iscoroutinefunction() is the supported replacement. The ThreadsafeProxy was using it to detect async methods and dispatch them cross-thread via run_coroutine_threadsafe. When the check fails, the async method falls through to the sync path which returns None, causing `TypeError: 'NoneType' object can't be awaited`. Also raise ConnectionError instead of silently returning None when the secondary event loop is closed, so callers get a meaningful error instead of a TypeError. Co-Authored-By: Claude Opus 4.6 (1M context) --- bellows/thread.py | 10 ++++++---- tests/test_thread.py | 3 ++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/bellows/thread.py b/bellows/thread.py index 4311768d..73d9e5fe 100644 --- a/bellows/thread.py +++ b/bellows/thread.py @@ -1,6 +1,7 @@ import asyncio from concurrent.futures import ThreadPoolExecutor import functools +import inspect import logging LOGGER = logging.getLogger(__name__) @@ -95,10 +96,11 @@ def func_wrapper(*args, **kwargs): if loop == curr_loop: return call() if loop.is_closed(): - # Disconnected - LOGGER.warning("Attempted to use a closed event loop") - return - if asyncio.iscoroutinefunction(func): + raise ConnectionError( + "Attempted to use a closed event loop, " + "the connection may have been lost" + ) + if inspect.iscoroutinefunction(func): future = asyncio.run_coroutine_threadsafe(call(), loop) return asyncio.wrap_future(future, loop=curr_loop) else: diff --git a/tests/test_thread.py b/tests/test_thread.py index 72efa701..e8d35bc2 100644 --- a/tests/test_thread.py +++ b/tests/test_thread.py @@ -157,7 +157,8 @@ async def test_proxy_loop_closed(): obj = mock.MagicMock() proxy = 
ThreadsafeProxy(obj, loop) loop.close() - proxy.test() + with pytest.raises(ConnectionError, match="closed event loop"): + proxy.test() assert obj.test.call_count == 0 From bd0fa158406fb50745e159b6252d641961b13586 Mon Sep 17 00:00:00 2001 From: Alex Autem Date: Sat, 18 Apr 2026 13:51:54 -0500 Subject: [PATCH 03/11] fix: handle closed event loop in disconnect and update deprecated asyncio APIs - EZSP.disconnect() now catches ConnectionError from the proxy when the secondary thread's event loop is already closed, preventing cascading errors during cleanup - Replace deprecated asyncio.get_event_loop() calls with asyncio.get_running_loop() in thread.py and uart.py for Python 3.14 forward compatibility Co-Authored-By: Claude Opus 4.6 (1M context) --- bellows/ezsp/__init__.py | 7 +++++-- bellows/thread.py | 4 ++-- bellows/uart.py | 6 +++--- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index 6fd86226..12e774be 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -223,8 +223,11 @@ async def get_xncp_features(self) -> xncp.FirmwareFeatures: async def disconnect(self): self.stop_ezsp() - if self._gw: - await self._gw.disconnect() + if self._gw is not None: + try: + await self._gw.disconnect() + except ConnectionError: + pass self._gw = None async def _command(self, name: str, *args: Any, **kwargs: Any) -> Any: diff --git a/bellows/thread.py b/bellows/thread.py index 73d9e5fe..eb8f62da 100644 --- a/bellows/thread.py +++ b/bellows/thread.py @@ -15,7 +15,7 @@ def __init__(self): self.thread_complete = None def run_coroutine_threadsafe(self, coroutine): - current_loop = asyncio.get_event_loop() + current_loop = asyncio.get_running_loop() future = asyncio.run_coroutine_threadsafe(coroutine, self.loop) return asyncio.wrap_future(future, loop=current_loop) @@ -31,7 +31,7 @@ def _thread_main(self, init_task): self.loop = None async def start(self): - current_loop = asyncio.get_event_loop() + 
current_loop = asyncio.get_running_loop() if self.loop is not None and not self.loop.is_closed(): return diff --git a/bellows/uart.py b/bellows/uart.py index af274dc8..bf8c5700 100644 --- a/bellows/uart.py +++ b/bellows/uart.py @@ -98,7 +98,7 @@ async def reset(self): return await self._reset_future self._transport.send_reset() - self._reset_future = asyncio.get_event_loop().create_future() + self._reset_future = asyncio.get_running_loop().create_future() self._reset_future.add_done_callback(self._reset_cleanup) async with asyncio_timeout(RESET_TIMEOUT): @@ -106,7 +106,7 @@ async def reset(self): async def _connect(config, api): - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() connection_done_future = loop.create_future() @@ -135,7 +135,7 @@ async def _connect(config, api): async def connect(config, api, use_thread=True): if use_thread: - api = ThreadsafeProxy(api, asyncio.get_event_loop()) + api = ThreadsafeProxy(api, asyncio.get_running_loop()) thread = EventLoopThread() await thread.start() try: From 1e1fd579d80ffffe03e1d3601f23ee9397438074 Mon Sep 17 00:00:00 2001 From: Alex Autem Date: Sat, 18 Apr 2026 14:14:46 -0500 Subject: [PATCH 04/11] fix: pre-create startup reset future to close race window The EZSP coordinator sends a reset frame immediately after TCP connection. Previously, _startup_reset_future was only created when wait_for_startup_reset() was dispatched from the main thread through the ThreadsafeProxy. In Python 3.14, different event loop scheduling means the reset frame arrives before the future exists, causing reset_received() to call enter_failed_state() and close the secondary thread's event loop. Fix by pre-creating _startup_reset_future in _connect() right after the connection is established, so reset frames arriving during the proxy dispatch window are captured instead of treated as unexpected. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- bellows/uart.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/bellows/uart.py b/bellows/uart.py index bf8c5700..21803331 100644 --- a/bellows/uart.py +++ b/bellows/uart.py @@ -52,8 +52,8 @@ def error_received(self, code: t.NcpResetCode) -> None: async def wait_for_startup_reset(self) -> None: """Wait for the first reset frame on startup.""" - assert self._startup_reset_future is None - self._startup_reset_future = asyncio.get_running_loop().create_future() + if self._startup_reset_future is None: + self._startup_reset_future = asyncio.get_running_loop().create_future() try: await self._startup_reset_future @@ -129,6 +129,12 @@ async def _connect(config, api): await gateway.wait_until_connected() + # Pre-create the startup reset future so that reset frames arriving before + # wait_for_startup_reset() is called don't trigger enter_failed_state(). + # This closes a race window between _connect() returning to the main thread + # and wait_for_startup_reset() being dispatched back to this thread. + gateway._startup_reset_future = loop.create_future() + thread_safe_protocol = ThreadsafeProxy(gateway, loop) return thread_safe_protocol, connection_done_future From fa73046a8ab8a11a23fe662a173e5eade3fa4790 Mon Sep 17 00:00:00 2001 From: Alex Autem Date: Sat, 18 Apr 2026 14:25:41 -0500 Subject: [PATCH 05/11] fix: move startup reset future creation before serial connection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The pre-created future was placed after wait_until_connected(), but in Python 3.14 the reset frame can arrive and be processed in the same event loop iteration that resolves the connection — before the _connect coroutine resumes. Move the future creation to before create_serial_connection() so it exists before any data can flow. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- bellows/uart.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/bellows/uart.py b/bellows/uart.py index 21803331..f38b4155 100644 --- a/bellows/uart.py +++ b/bellows/uart.py @@ -113,6 +113,11 @@ async def _connect(config, api): gateway = Gateway(api, connection_done_future) protocol = AshProtocol(gateway) + # Pre-create the startup reset future before opening the connection so that + # reset frames arriving immediately after connect are captured by + # reset_received() instead of triggering enter_failed_state(). + gateway._startup_reset_future = loop.create_future() + if config[zigpy.config.CONF_DEVICE_FLOW_CONTROL] is None: xon_xoff, rtscts = True, False else: @@ -129,12 +134,6 @@ async def _connect(config, api): await gateway.wait_until_connected() - # Pre-create the startup reset future so that reset frames arriving before - # wait_for_startup_reset() is called don't trigger enter_failed_state(). - # This closes a race window between _connect() returning to the main thread - # and wait_for_startup_reset() being dispatched back to this thread. - gateway._startup_reset_future = loop.create_future() - thread_safe_protocol = ThreadsafeProxy(gateway, loop) return thread_safe_protocol, connection_done_future From 097a9bc0dc97ccee06bc16e3822199354711f639 Mon Sep 17 00:00:00 2001 From: Alex Autem Date: Sat, 18 Apr 2026 15:06:32 -0500 Subject: [PATCH 06/11] fix: skip threading for TCP socket connections The EventLoopThread and ThreadsafeProxy exist because pyserial uses blocking I/O that must run in a separate thread. TCP socket connections use native asyncio I/O and don't need threading at all. Running TCP connections on the main event loop eliminates the cross-thread race conditions that cause startup failures on Python 3.14: coroutine dispatch races, event loop lifecycle races, and future cancellation cleanup errors. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- bellows/uart.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/bellows/uart.py b/bellows/uart.py index f38b4155..32e6bc89 100644 --- a/bellows/uart.py +++ b/bellows/uart.py @@ -1,6 +1,7 @@ import asyncio from asyncio import timeout as asyncio_timeout import logging +import urllib.parse import zigpy.config import zigpy.serial @@ -139,6 +140,12 @@ async def _connect(config, api): async def connect(config, api, use_thread=True): + # TCP socket connections use native asyncio I/O and don't need a secondary + # thread. Threading is only required for pyserial's blocking serial I/O. + parsed_path = urllib.parse.urlparse(config[zigpy.config.CONF_DEVICE_PATH]) + if parsed_path.scheme in ("socket", "tcp"): + use_thread = False + if use_thread: api = ThreadsafeProxy(api, asyncio.get_running_loop()) thread = EventLoopThread() From 671ccc8c207d16176a01b612c00d5994d0664561 Mon Sep 17 00:00:00 2001 From: Alex Autem Date: Sat, 18 Apr 2026 15:19:50 -0500 Subject: [PATCH 07/11] fix: revert use_thread=False, handle coroutine lifecycle in proxy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Revert the use_thread=False change for TCP sockets — the main HA event loop is too busy for serial protocol timing, causing transport disconnects. Instead, properly handle coroutine lifecycle in ThreadsafeProxy: if run_coroutine_threadsafe fails because the loop closed between the is_closed() check and the dispatch, close the coroutine to prevent the "coroutine was never awaited" RuntimeWarning and raise a clean ConnectionError. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- bellows/thread.py | 11 ++++++++++- bellows/uart.py | 7 ------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/bellows/thread.py b/bellows/thread.py index eb8f62da..270402f6 100644 --- a/bellows/thread.py +++ b/bellows/thread.py @@ -101,7 +101,16 @@ def func_wrapper(*args, **kwargs): "the connection may have been lost" ) if inspect.iscoroutinefunction(func): - future = asyncio.run_coroutine_threadsafe(call(), loop) + coro = call() + try: + future = asyncio.run_coroutine_threadsafe(coro, loop) + except RuntimeError: + # Loop closed between is_closed() check and dispatch + coro.close() + raise ConnectionError( + "Attempted to use a closed event loop, " + "the connection may have been lost" + ) return asyncio.wrap_future(future, loop=curr_loop) else: diff --git a/bellows/uart.py b/bellows/uart.py index 32e6bc89..f38b4155 100644 --- a/bellows/uart.py +++ b/bellows/uart.py @@ -1,7 +1,6 @@ import asyncio from asyncio import timeout as asyncio_timeout import logging -import urllib.parse import zigpy.config import zigpy.serial @@ -140,12 +139,6 @@ async def _connect(config, api): async def connect(config, api, use_thread=True): - # TCP socket connections use native asyncio I/O and don't need a secondary - # thread. Threading is only required for pyserial's blocking serial I/O. - parsed_path = urllib.parse.urlparse(config[zigpy.config.CONF_DEVICE_PATH]) - if parsed_path.scheme in ("socket", "tcp"): - use_thread = False - if use_thread: api = ThreadsafeProxy(api, asyncio.get_running_loop()) thread = EventLoopThread() From 0d97cee1faedb85956b71bd8cf6590b712dfdbb1 Mon Sep 17 00:00:00 2001 From: Alex Autem Date: Sat, 18 Apr 2026 15:32:55 -0500 Subject: [PATCH 08/11] debug: add WARNING-level logging to diagnose startup failure Temporarily elevate connection_lost, reset_received, and error_received logging to WARNING level to diagnose why the secondary event loop closes during startup on Python 3.14. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- bellows/uart.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/bellows/uart.py b/bellows/uart.py index f38b4155..82043e0b 100644 --- a/bellows/uart.py +++ b/bellows/uart.py @@ -33,7 +33,12 @@ def data_received(self, data): def reset_received(self, code: t.NcpResetCode) -> None: """Reset acknowledgement frame receive handler""" - LOGGER.debug("Received reset: %r", code) + LOGGER.warning( + "Received reset: %r (reset_future=%s, startup_reset_future=%s)", + code, + self._reset_future, + self._startup_reset_future, + ) if self._reset_future and not self._reset_future.done(): self._reset_future.set_result(True) @@ -46,8 +51,9 @@ def reset_received(self, code: t.NcpResetCode) -> None: def error_received(self, code: t.NcpResetCode) -> None: """Error frame receive handler.""" if self._reset_future is not None or self._startup_reset_future is not None: - LOGGER.debug("Ignoring spurious error during reset: %r", code) + LOGGER.warning("Ignoring spurious error during reset: %r", code) else: + LOGGER.warning("Error received, entering failed state: %r", code) self._api.enter_failed_state(code) async def wait_for_startup_reset(self) -> None: @@ -68,7 +74,7 @@ def connection_lost(self, exc): """Port was closed unexpectedly.""" super().connection_lost(exc) - LOGGER.debug("Connection lost: %r", exc) + LOGGER.warning("Gateway connection lost: %r", exc) reason = exc or ConnectionResetError("Remote server closed connection") # XXX: The startup reset future must be resolved with an error *before* the From c561178649d308f60b6188e6b1b6cf766d1a1f96 Mon Sep 17 00:00:00 2001 From: Alex Autem Date: Sat, 18 Apr 2026 15:38:50 -0500 Subject: [PATCH 09/11] fix: prevent transport auto-close on EOF for TCP connections AshProtocol.eof_received() was returning None (falsy), which tells asyncio to auto-close the transport when the remote end signals EOF. For serial-over-TCP connections (e.g. 
ser2net), the remote end may signal EOF during initialization in Python 3.14 without intending to fully close the connection. This caused connection_lost(None) to fire immediately, closing the secondary event loop before startup could complete. Return True from eof_received() to keep the transport open and let bellows manage the connection lifecycle explicitly via disconnect(). Co-Authored-By: Claude Opus 4.6 (1M context) --- bellows/ash.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/bellows/ash.py b/bellows/ash.py index 48b492ca..0d999399 100644 --- a/bellows/ash.py +++ b/bellows/ash.py @@ -379,6 +379,10 @@ def connection_lost(self, exc: Exception | None) -> None: def eof_received(self): self._ezsp_protocol.eof_received() + # Return True to prevent the transport from auto-closing. + # For serial-over-TCP connections (e.g. ser2net), the remote end may + # signal EOF during initialization without intending to close. + return True def _cancel_pending_data_frames( self, exc: BaseException = RuntimeError("Connection has been closed") From aae14f881f68dfa466ea81e58a77623fa0546a1a Mon Sep 17 00:00:00 2001 From: Alex Autem Date: Sat, 18 Apr 2026 15:48:43 -0500 Subject: [PATCH 10/11] debug: add WARNING-level logging for ASH data flow Log data_received, eof_received, and send_reset at WARNING level to diagnose whether data is flowing between bellows and the coordinator during startup. Co-Authored-By: Claude Opus 4.6 (1M context) --- bellows/ash.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/bellows/ash.py b/bellows/ash.py index 0d999399..c76ab712 100644 --- a/bellows/ash.py +++ b/bellows/ash.py @@ -378,6 +378,7 @@ def connection_lost(self, exc: Exception | None) -> None: self._ezsp_protocol.connection_lost(exc) def eof_received(self): + _LOGGER.warning("EOF received from remote end") self._ezsp_protocol.eof_received() # Return True to prevent the transport from auto-closing. # For serial-over-TCP connections (e.g. 
ser2net), the remote end may @@ -449,7 +450,7 @@ def _unstuff_bytes(data: bytes) -> bytes: return out def data_received(self, data: bytes) -> None: - _LOGGER.debug("Received data %s", data.hex()) + _LOGGER.warning("ASH received %d bytes: %s", len(data), data[:32].hex()) self._buffer.extend(data) if len(self._buffer) > MAX_BUFFER_SIZE: @@ -746,5 +747,6 @@ async def send_data(self, data: bytes) -> None: ) def send_reset(self) -> None: + _LOGGER.warning("Sending ASH reset frame") # Some adapters seem to send a NAK immediately but still process the reset frame self._write_frame(RstFrame(), prefix=32 * (Reserved.CANCEL,)) From 7da58d8e8e8ab2f17b648084dfcb2280b65edaf7 Mon Sep 17 00:00:00 2001 From: Alex Autem Date: Sat, 18 Apr 2026 18:07:57 -0500 Subject: [PATCH 11/11] fix: force-close TCP socket when proxy dispatch fails in disconnect When the secondary event loop is dead and disconnect() cannot dispatch through the ThreadsafeProxy, the TCP socket stays open. This prevents ser2net from releasing the serial port, causing subsequent connection attempts to fail with "Device open failure". Force-close the underlying OS socket directly when the proxy dispatch fails, so ser2net detects the disconnect and releases the serial port for the next connection attempt. Co-Authored-By: Claude Opus 4.6 (1M context) --- bellows/ezsp/__init__.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index 12e774be..3b0c339c 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -227,7 +227,17 @@ async def disconnect(self): try: await self._gw.disconnect() except ConnectionError: - pass + # The secondary event loop is dead. Force-close the + # underlying TCP socket so ser2net (or similar) releases + # the serial port for subsequent connection attempts. 
+ try: + ash = self._gw._obj._transport + if ash is not None and ash._transport is not None: + sock = ash._transport.get_extra_info("socket") + if sock is not None: + sock.close() + except Exception: + pass self._gw = None async def _command(self, name: str, *args: Any, **kwargs: Any) -> Any: