diff --git a/README.md b/README.md index 98c8f4d..53d0e35 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ relies on the PlugApi as well. There are also some small utilities included,`ps-plugevents` and `ps-rawplug` showcasing the use of the first interface approach, and `ps-events` the latter. . -The `ps-events` is effectively a consumer of the the PowersensorDevices event +The `ps-events` is effectively a consumer the PowersensorDevices event stream and dumps all events to standard out. Similary, `ps-plugevents` shows the event stream from a single plug (plus whatever it might be relaying for), and `ps-rawplug` shows the raw event stream from the plug. Note that the format diff --git a/pyproject.toml b/pyproject.toml index b121353..c83b81a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,12 +25,16 @@ Issues = "https://github.com/DiUS/python-powersensor_local/issues" ps-events = "powersensor_local.events:app" ps-rawplug = "powersensor_local.rawplug:app" ps-plugevents = "powersensor_local.plugevents:app" +ps-zcevents = "powersensor_local.zc_events:app" [build-system] requires = [ "hatchling" ] build-backend = "hatchling.build" [project.optional-dependencies] +zeroconf = [ + "zeroconf>=0.38.0", +] docs = [ "sphinx>=7.0.0", "sphinx-rtd-theme>=1.3.0", diff --git a/requirements.test.txt b/requirements.test.txt index b08591f..8fd6ba3 100644 --- a/requirements.test.txt +++ b/requirements.test.txt @@ -1,3 +1,4 @@ mypy pytest pytest-coverage +zeroconf \ No newline at end of file diff --git a/src/powersensor_local/__init__.py b/src/powersensor_local/__init__.py index 3b57563..f10a99d 100644 --- a/src/powersensor_local/__init__.py +++ b/src/powersensor_local/__init__.py @@ -39,12 +39,16 @@ 'PlugApi', '__version__', 'PlugListenerTcp', - 'PlugListenerUdp' + 'PlugListenerUdp', + 'PowersensorDevices', + 'PowersensorLegacyDevices', + 'PowersensorZeroconfDevices', ] -__version__ = "2.1.2" -from .devices import PowersensorDevices +__version__ = "2.1.3" +from .devices import PowersensorDevices, PowersensorLegacyDevices from .legacy_discovery import LegacyDiscovery from .plug_api import PlugApi from .plug_listener_tcp import PlugListenerTcp from .plug_listener_udp import PlugListenerUdp from .virtual_household import VirtualHousehold +from .zeroconf_devices import PowersensorZeroconfDevices diff --git a/src/powersensor_local/devices.py b/src/powersensor_local/devices.py old mode 100644 new mode 100755 index 4093ee9..d0f811a --- a/src/powersensor_local/devices.py +++ b/src/powersensor_local/devices.py @@ -1,4 +1,4 @@ -"""Abstraction interface for unified event stream from Powersensor devices""" +"""Abstraction interface for unified event stream from Powersensor devices.""" import asyncio import sys @@ -15,188 +15,273 @@ EXPIRY_CHECK_INTERVAL_S = 30 EXPIRY_TIMEOUT_S = 5 * 60 -class PowersensorDevices: - """Abstraction interface for the unified event stream from all Powersensor - devices on the local network. +_KNOWN_PLUG_EVENTS = [ + 'average_flow', + 'average_power', + 'average_power_components', + 'battery_level', + 'exception', + 'now_relaying_for', + 'radio_signal_quality', + 'summation_energy', + 'summation_volume', +] + + +class _PowersensorDevicesBase: + """Shared base for PowersensorLegacyDevices and PowersensorZeroconfDevices. + + Manages the PlugApi lifecycle, device tracking, expiry, and the event + callback. Subclasses are responsible for discovery — they call + _plug_discovered(mac, ip, port) when a plug appears and + _plug_lost(mac) when one disappears. """ - def __init__(self, bcast_addr=''): - """Creates a fresh instance, without scanning for devices.""" + def __init__(self, relay_now_relaying_for: bool = False) -> None: + """Initialise the base. + + Parameters + ---------- + relay_now_relaying_for: + When False (default), ``now_relaying_for`` messages are consumed + internally to synthesise ``device_found`` / ``device_lost`` events, + matching the behaviour of the original PowersensorDevices class. + When True the raw ``now_relaying_for`` event is forwarded to the + caller's callback unchanged, in addition to any ``device_found`` + synthesis. Set this to True when the caller wants to inspect relay + metadata directly (e.g. the HA dispatcher). + """ self._event_cb = None - self._discovery = LegacyDiscovery(bcast_addr) - self._devices = {} - self._timer = None - self._plug_apis = {} - - async def start(self, async_event_cb): - """Registers the async event callback function and starts the scan - of the local network to discover present devices. The callback is - of the form + self._devices: dict[str, '_PowersensorDevicesBase._Device'] = {} + self._plug_apis: dict[str, PlugApi] = {} + self._timer: '_PowersensorDevicesBase._Timer | None' = None + self._relay_now_relaying_for = relay_now_relaying_for - async def yourcallback(event: dict) -> None - - Known events: - - **scan_complete** - Indicates the discovery of Powersensor devices has completed. - Emitted in response to start() and rescan() calls. - The number of found gateways (plugs) is reported. + # ------------------------------------------------------------------ + # Public subscription API + # ------------------------------------------------------------------ - { event: "scan_complete", gateway_count: N } - - **device_found** - A new device found on the network. - The order found devices are announced is not fixed. - - { event: "device_found", - device_type: "plug" or "sensor", - mac: "...", - } + def subscribe(self, mac: str) -> None: + """Subscribe to events from the device with the given MAC address.""" + device = self._devices.get(mac) + if device: + device.subscribed = True - **device_lost** - A device appears to no longer be present on the network. + def unsubscribe(self, mac: str) -> None: + """Unsubscribe from events from the given MAC address.""" + device = self._devices.get(mac) + if device: + device.subscribed = False - { event: "device_lost", mac: "..." } + # ------------------------------------------------------------------ + # Teardown + # ------------------------------------------------------------------ - Additionally, all events described in xlatemsg.translate_raw_message - may be issued. The event name is inserted into the field 'event'. + async def stop(self) -> None: + """Stop event streaming and disconnect from all devices. - The start function returns the number of found gateway plugs. - Powersensor devices aren't found directly as they are typically not - on the network, but are instead detected when they relay data through - a plug via long-range radio. + To restart, call start() (legacy) or add_plug() (zeroconf) again. """ - self._event_cb = async_event_cb - await self._on_scanned(await self._discovery.scan()) - self._timer = self._Timer(EXPIRY_CHECK_INTERVAL_S, self._on_timer) - return len(self._plug_apis) - - async def rescan(self): - """Performs a fresh scan of the network to discover added devices, - or devices which have changed their IP address for some reason.""" - await self._on_scanned(await self._discovery.scan()) - - async def stop(self): - """Stops the event streaming and disconnects from the devices. - To restart the event streaming, call start() again.""" - for plug in self._plug_apis.values(): + for plug in list(self._plug_apis.values()): await plug.disconnect() - self._plug_apis = {} + self._plug_apis.clear() self._event_cb = None if self._timer: self._timer.terminate() self._timer = None - def subscribe(self, mac): - """Subscribes to events from the device with the given MAC address.""" - device = self._devices.get(mac) - if device: - device.subscribed = True + # ------------------------------------------------------------------ + # Called by subclasses when discovery reports changes + # ------------------------------------------------------------------ - def unsubscribe(self, mac): - """Unsubscribes from events from the given MAC address.""" - device = self._devices.get(mac) - if device: - device.subscribed = False + async def _plug_discovered(self, mac: str, ip: str, port: int) -> None: + """Called by the subclass when a plug is found or updated. - async def _emit_if_subscribed(self, ev, obj): + Creates a PlugApi for the plug if one doesn't exist yet, or + reconnects with a new address if the IP/port has changed. + """ + if mac in self._plug_apis: + api = self._plug_apis[mac] + if api.ip_address == ip and api.port == port: + return # no change + # Address changed — disconnect stale connection and reconnect. + await api.disconnect() + self._plug_apis.pop(mac) + + await self._add_device(mac, 'plug') + api = PlugApi(mac, ip, port) + self._plug_apis[mac] = api + for event in _KNOWN_PLUG_EVENTS: + api.subscribe(event, self._reemit) + api.connect() + + async def _plug_lost(self, mac: str) -> None: + """Called by the subclass when a plug has definitively disappeared.""" + if mac in self._plug_apis: + await self._plug_apis.pop(mac).disconnect() + await self._remove_device(mac) + + # ------------------------------------------------------------------ + # Internal event routing + # ------------------------------------------------------------------ + + async def _emit_if_subscribed(self, ev: str, mac: str, obj: dict) -> None: if self._event_cb is None: return - device = self._devices.get(obj.get('mac')) + device = self._devices.get(mac) if device is not None and device.subscribed: obj['event'] = ev await self._event_cb(obj) - async def _reemit(self, ev, obj): - mac = obj['mac'] + async def _reemit(self, ev: str, obj: dict[str, str]) -> None: + mac: str|None = obj.get('mac') + if mac is None: + # we don't log anything in this library, but if we did perhaps + # _LOGGER.warning("Received event '%s' with no MAC address -- ignoring", ev) might be appropriate + # for now...silence + return device = self._devices.get(mac) if device is not None: device.mark_active() if ev == 'now_relaying_for': await self._add_device(mac, 'sensor') + if self._relay_now_relaying_for and self._event_cb is not None: + obj['event'] = ev + await self._event_cb(obj) else: - await self._emit_if_subscribed(ev, obj) - - async def _on_scanned(self, found): - for device in found: - mac = device['id'] - ip = device['ip'] - if not mac in self._devices: - await self._add_device(mac, 'plug') - api = PlugApi(mac, ip) - self._plug_apis[mac] = api - api.subscribe('average_flow', self._reemit) - api.subscribe('average_power', self._reemit) - api.subscribe('average_power_components', self._reemit) - api.subscribe('battery_level', self._reemit) - api.subscribe('exception', self._reemit) - api.subscribe('now_relaying_for', self._reemit) - api.subscribe('radio_signal_quality', self._reemit) - api.subscribe('summation_energy', self._reemit) - api.subscribe('summation_volume', self._reemit) - api.connect() - - await self._event_cb({ - 'event': 'scan_complete', - 'gateway_count': len(found), - }) - - async def _on_timer(self): - devices = list(self._devices.values()) - for device in devices: - if device.has_expired(): - await self._remove_device(device.mac) + await self._emit_if_subscribed(ev, mac, obj) - async def _add_device(self, mac, typ): + async def _add_device(self, mac: str, typ: str) -> None: if mac in self._devices: return self._devices[mac] = self._Device(mac) - await self._event_cb({ - 'event': 'device_found', - 'mac': mac, - 'device_type': typ, - }) - - async def _remove_device(self, mac): - if mac in self._devices: - self._devices.pop(mac) + if self._event_cb is not None: await self._event_cb({ - 'event': 'device_lost', + 'event': 'device_found', 'mac': mac, + 'device_type': typ, }) - ### Supporting classes ### + async def _remove_device(self, mac: str) -> None: + if mac in self._devices: + self._devices.pop(mac) + if self._event_cb is not None: + await self._event_cb({ + 'event': 'device_lost', + 'mac': mac, + }) + + async def _on_timer(self) -> None: + for device in list(self._devices.values()): + if device.has_expired(): + await self._remove_device(device.mac) + + def _start_expiry_timer(self) -> None: + self._timer = self._Timer(EXPIRY_CHECK_INTERVAL_S, self._on_timer) + + # ------------------------------------------------------------------ + # Supporting inner classes + # ------------------------------------------------------------------ class _Device: - def __init__(self, mac): + def __init__(self, mac: str) -> None: self.mac = mac self.subscribed = False self._last_active = datetime.now(timezone.utc) - def mark_active(self): - """Updates the last activity time to prevent expiry.""" + def mark_active(self) -> None: + """Update the last activity timestamp to prevent expiry.""" self._last_active = datetime.now(timezone.utc) - def has_expired(self): - """Checks whether the last activity time is past the expiry.""" - now = datetime.now(timezone.utc) - delta = now - self._last_active + def has_expired(self) -> bool: + """Return True if last activity is past the expiry window.""" + delta = datetime.now(timezone.utc) - self._last_active return delta.total_seconds() > EXPIRY_TIMEOUT_S - class _Timer: # pylint: disable=R0903 - def __init__(self, interval_s, callback): + class _Timer: + def __init__(self, interval_s: float, callback) -> None: self._terminate = False self._interval = interval_s self._callback = callback self._task = asyncio.create_task(self._run()) - def terminate(self): - """Disables the timer and cancels the associated task.""" + def terminate(self) -> None: + """Cancel the timer task.""" self._terminate = True self._task.cancel() - async def _run(self): + async def _run(self) -> None: while not self._terminate: await asyncio.sleep(self._interval) await self._callback() + + +class PowersensorLegacyDevices(_PowersensorDevicesBase): + """Abstraction interface for the unified event stream from all Powersensor + devices on the local network, using the legacy broadcast UDP discovery. + + This is the original PowersensorDevices implementation, renamed to make + room for PowersensorZeroconfDevices. The name PowersensorDevices is kept + as an alias for backwards compatibility. + """ + + def __init__( + self, + bcast_addr: str = '', + relay_now_relaying_for: bool = False, + ) -> None: + """Create a fresh instance, without scanning for devices.""" + super().__init__(relay_now_relaying_for=relay_now_relaying_for) + self._discovery = LegacyDiscovery(bcast_addr) + + async def start(self, async_event_cb) -> int: + """Register the async event callback and scan the local network. + + The callback has the form:: + + async def yourcallback(event: dict) -> None + + Known lifecycle events emitted: + + **scan_complete** + Indicates discovery has completed. + ``{ event: "scan_complete", gateway_count: N }`` + + **device_found** + A new device found on the network. + ``{ event: "device_found", device_type: "plug"|"sensor", mac: "..." }`` + + **device_lost** + A device appears to no longer be present. + ``{ event: "device_lost", mac: "..." }`` + + Additionally all events from xlatemsg.translate_raw_message may be + issued, with the event name inserted into the ``event`` field. + + Returns the number of gateway plugs found. + """ + self._event_cb = async_event_cb + await self._on_scanned(await self._discovery.scan()) + self._start_expiry_timer() + return len(self._plug_apis) + + async def rescan(self) -> None: + """Perform a fresh scan to discover added or moved devices.""" + await self._on_scanned(await self._discovery.scan()) + + async def _on_scanned(self, found: list) -> None: + for device in found: + mac = device['id'] + ip = device['ip'] + await self._plug_discovered(mac, ip, 49476) + + if self._event_cb is not None: + await self._event_cb({ + 'event': 'scan_complete', + 'gateway_count': len(self._plug_apis), + }) + + +# Backwards-compatible alias. +PowersensorDevices = PowersensorLegacyDevices diff --git a/src/powersensor_local/zc_events.py b/src/powersensor_local/zc_events.py new file mode 100644 index 0000000..b7bfdf3 --- /dev/null +++ b/src/powersensor_local/zc_events.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 + +"""Firehose event stream from all Powersensor devices via mDNS/zeroconf discovery. + +Intended for debugging use only. Please use the proper interface in +zeroconf_devices.py rather than parsing the output from this script. + +All events from all discovered plugs and their relayed sensors are printed +to stdout as they arrive. Every device is subscribed to automatically. + +Requires the zeroconf extra:: + + pip install powersensor-local[zeroconf] +""" +import sys +from pathlib import Path + +PROJECT_ROOT = str(Path(__file__).parents[1]) +if PROJECT_ROOT not in sys.path: + sys.path.append(PROJECT_ROOT) + +# pylint: disable=C0413 +from powersensor_local.abstract_event_handler import AbstractEventHandler +from powersensor_local.zeroconf_devices import PowersensorZeroconfDevices + + +class ZcEventLoopRunner(AbstractEventHandler): + """Main logic wrapper.""" + + def __init__(self) -> None: + self.devices: PowersensorZeroconfDevices = PowersensorZeroconfDevices( + relay_now_relaying_for=True, + ) + + async def on_exit(self) -> None: + await self.devices.stop() + + async def on_message(self, obj: dict) -> None: + """Print every event and subscribe to any newly discovered device.""" + print(obj) + if obj['event'] == 'device_found': + self.devices.subscribe(obj['mac']) + + async def main(self) -> None: + self.register_sigint_handler() + await self.devices.start(self.on_message) + await self.wait() + + +def app() -> None: + """Application entry point.""" + ZcEventLoopRunner().run() + + +if __name__ == '__main__': + app() diff --git a/src/powersensor_local/zeroconf_devices.py b/src/powersensor_local/zeroconf_devices.py new file mode 100755 index 0000000..60f5ed9 --- /dev/null +++ b/src/powersensor_local/zeroconf_devices.py @@ -0,0 +1,379 @@ +"""Zeroconf/mDNS-based discovery for Powersensor devices. + +This module provides PowersensorZeroconfDevices, which uses continuous mDNS +browsing to discover Powersensor plugs rather than the legacy one-shot UDP +broadcast in PowersensorLegacyDevices. + +The zeroconf package is an optional dependency. Install it via:: + + pip install powersensor-local[zeroconf] + +Architecture +------------ +PowersensorZeroconfDevices owns the full lifecycle: + +- It starts a zeroconf ServiceBrowser that calls back on plug add/update/remove. +- Plug removals are debounced (default 60 s) to absorb transient disappearances + such as reboots or DHCP renewals. +- The public add_plug() / remove_plug() methods are the seam between discovery + and the plug API lifecycle, and can also be called directly (e.g. from a + test or from HA's own mDNS handler) without needing a real zeroconf instance. + +Zeroconf instance ownership +---------------------------- +If ``zeroconf_instance`` is None (the default), the class creates and owns a +Zeroconf instance and closes it in stop(). If a Zeroconf instance is passed +in, the caller owns it and the class will not close it — this is the correct +pattern for Home Assistant, which maintains a single shared zeroconf instance. + +Thread safety +------------- +On zeroconf >= 0.32 (including Home Assistant's 0.149.x), ServiceBrowser +callbacks run inside the asyncio event loop rather than on a background thread. +On older zeroconf (e.g. 1.0.0, which used a select() thread), they run on a +background thread. + +``_Listener`` is therefore written to be safe in both models: + +- ``_extract`` uses ``ServiceInfo.load_from_cache()`` rather than + ``Zeroconf.get_service_info()``. On >= 0.32, calling get_service_info() + from inside a ServiceBrowser callback deadlocks — it blocks waiting for a + DNS reply that can never arrive because it holds the event loop. + load_from_cache() is synchronous, non-blocking, and explicitly threadsafe; + the ServiceBrowser guarantees the cache is populated before firing the + callback, so the record is always present. + +- ``_name_to_mac`` is populated in ``add_service`` / ``update_service`` and + consumed in ``remove_service``. On >= 0.32 all three callbacks run on the + same event loop thread, so no locking is needed. On older versions they run + on the same zeroconf background thread, so no locking is needed there either. + +- All work that touches PowersensorZeroconfDevices state crosses the thread + boundary via ``loop.call_soon_threadsafe``, making it safe regardless of + which threading model the installed zeroconf uses. +""" +from __future__ import annotations + +import asyncio +import logging +import sys +from pathlib import Path +from typing import Any + +PROJECT_ROOT = str(Path(__file__).parents[1]) +if PROJECT_ROOT not in sys.path: + sys.path.append(PROJECT_ROOT) + +from powersensor_local.devices import _PowersensorDevicesBase + +_LOGGER = logging.getLogger(__name__) + +_SERVICE_TYPE_UDP = '_powersensor._udp.local.' +_SERVICE_TYPE_TCP = '_powersensor._tcp.local.' + +_DEBOUNCE_DEFAULT_S = 60.0 + + +try: + import zeroconf as _zc + class PowersensorZeroconfDevices(_PowersensorDevicesBase): + """Discovers and manages Powersensor plugs via continuous mDNS browsing. + + Usage example (no existing zeroconf instance):: + + devices = PowersensorZeroconfDevices() + await devices.start(my_callback) + # plugs arrive via my_callback as device_found events + # ... + await devices.stop() + + Usage example (HA — pass the shared zeroconf instance):: + + zc = await homeassistant.components.zeroconf.async_get_instance(hass) + devices = PowersensorZeroconfDevices(zeroconf_instance=zc) + await devices.start(my_callback) + + The callback signature is the same as PowersensorLegacyDevices.start(): + ``async def callback(event: dict) -> None`` + + Lifecycle events emitted: + + **device_found** + ``{ event: "device_found", device_type: "plug"|"sensor", mac: "..." }`` + + **device_lost** + ``{ event: "device_lost", mac: "..." }`` + + Note: ``scan_complete`` is NOT emitted — mDNS browsing is continuous and + has no defined completion point. + """ + + def __init__( + self, + zeroconf_instance: Any = None, + service_type: str = _SERVICE_TYPE_UDP, + debounce_timeout: float = _DEBOUNCE_DEFAULT_S, + relay_now_relaying_for: bool = False, + ) -> None: + """Initialise. + + Parameters + ---------- + zeroconf_instance: + An existing ``Zeroconf`` instance to use. If None, one is created + and owned by this object (and closed in stop()). + service_type: + The mDNS service type to browse. Defaults to the UDP service + ``_powersensor._udp.local.``; pass ``_powersensor._tcp.local.`` + to use TCP transport instead. + debounce_timeout: + Seconds to wait after a ``remove_service`` callback before treating + the plug as truly gone. Defaults to 60 s. + relay_now_relaying_for: + See _PowersensorDevicesBase for documentation. + """ + super().__init__(relay_now_relaying_for=relay_now_relaying_for) + self._zc_instance = zeroconf_instance + self._zc_owned = zeroconf_instance is None # True → we close it in stop() + self._service_type = service_type + self._debounce_seconds = debounce_timeout + self._browser: Any = None + self._listener: _Listener | None = None + self._pending_removals: dict[str, asyncio.TimerHandle] = {} + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def start(self, async_event_cb) -> None: + """Register the event callback and start the mDNS service browser. + + The browser is event-driven; no polling loop is started here. + Plugs already present on the network will trigger add_service + callbacks shortly after the browser starts. + """ + self._event_cb = async_event_cb + self._start_expiry_timer() + + if self._zc_instance is None: + self._zc_instance = _zc.Zeroconf() + + loop = asyncio.get_running_loop() + self._listener = _Listener(self, loop) + self._browser = _zc.ServiceBrowser( + self._zc_instance, self._service_type, self._listener + ) + + async def stop(self) -> None: + """Stop browsing, cancel pending removals, and disconnect all plugs.""" + for handle in list(self._pending_removals.values()): + handle.cancel() + self._pending_removals.clear() + + if self._browser is not None: + self._browser.cancel() + self._browser = None + + self._listener = None + + if self._zc_owned and self._zc_instance is not None: + self._zc_instance.close() + self._zc_instance = None + + await super().stop() + + # ------------------------------------------------------------------ + # Public discovery seam — may also be called directly + # ------------------------------------------------------------------ + + def add_plug(self, mac: str, ip: str, port: int) -> None: + """Notify that a plug is present at the given address. + + Creates or reconnects the PlugApi for this plug. Safe to call + directly without a zeroconf browser (e.g. from tests, or from HA's + own mDNS handler). Cancels any pending debounced removal for this MAC. + + Must be called from the event loop thread. + """ + self._cancel_pending_removal(mac, source='add_plug') + asyncio.get_running_loop().create_task( + self._plug_discovered(mac, ip, port) + ) + + def remove_plug(self, mac: str) -> None: + """Schedule a debounced removal for the given plug. + + After ``debounce_timeout`` seconds with no re-announcement, the plug + API is disconnected and a ``device_lost`` event is emitted. + + Must be called from the event loop thread. + """ + self._schedule_removal(mac) + + # ------------------------------------------------------------------ + # Debounce helpers (event-loop side only) + # ------------------------------------------------------------------ + + def _schedule_removal(self, mac: str) -> None: + if mac in self._pending_removals: + return + loop = asyncio.get_running_loop() + handle = loop.call_later( + self._debounce_seconds, + self._on_debounce_expired, + mac, + ) + self._pending_removals[mac] = handle + _LOGGER.debug("Scheduled removal for %s in %.0f s", mac, self._debounce_seconds) + + def _on_debounce_expired(self, mac: str) -> None: + """Called by the event loop when the debounce timer fires.""" + self._pending_removals.pop(mac, None) + _LOGGER.info("Plug %s still absent after debounce — removing", mac) + asyncio.get_running_loop().create_task(self._plug_lost(mac)) + + def _cancel_pending_removal(self, mac: str, source: str) -> None: + handle = self._pending_removals.pop(mac, None) + if handle: + handle.cancel() + _LOGGER.debug("Cancelled pending removal for %s (%s)", mac, source) + + # ------------------------------------------------------------------ + # Called from _Listener (zeroconf thread → event loop via stored loop ref) + # ------------------------------------------------------------------ + + def _on_zc_add(self, mac: str, ip: str, port: int, loop: asyncio.AbstractEventLoop) -> None: + loop.call_soon_threadsafe(self._cancel_pending_removal, mac, 'zeroconf add') + loop.call_soon_threadsafe( + lambda: loop.create_task(self._plug_discovered(mac, ip, port)) + ) + + def _on_zc_update(self, mac: str, ip: str, port: int, loop: asyncio.AbstractEventLoop) -> None: + loop.call_soon_threadsafe(self._cancel_pending_removal, mac, 'zeroconf update') + loop.call_soon_threadsafe( + lambda: loop.create_task(self._plug_discovered(mac, ip, port)) + ) + + def _on_zc_remove(self, mac: str, loop: asyncio.AbstractEventLoop) -> None: + loop.call_soon_threadsafe(self._schedule_removal, mac) + + + class _Listener(_zc.ServiceListener): + """Zeroconf ServiceListener that forwards events to PowersensorZeroconfDevices. + + Internal implementation detail. All ServiceListener callbacks arrive on + the zeroconf event loop (>= 0.32) or background thread (< 0.32 / 1.0.0). + + Thread safety + ------------- + ``_name_to_mac`` is populated in ``add_service`` / ``update_service`` and + consumed in ``remove_service``. In both threading models all three + callbacks arrive on the same thread/loop, so no locking is required. + + The stored ``_loop`` reference is captured once at construction from the + running asyncio event loop, and is used (read-only) from the callback + context to schedule work back onto that loop via ``call_soon_threadsafe``. + """ + + def __init__(self, owner: PowersensorZeroconfDevices, loop: asyncio.AbstractEventLoop) -> None: + self._owner = owner + self._loop = loop + self._name_to_mac: dict[str, str] = {} + + def _extract(self, zc: Any, type_: str, name: str) -> tuple[str, str, int] | None: + """Return (mac, ip, port) from the zeroconf cache, or None. + + Uses ServiceInfo.load_from_cache() rather than + Zeroconf.get_service_info() for two reasons: + + 1. On zeroconf >= 0.32 (including HA's 0.149.x), ServiceBrowser + callbacks run inside the asyncio event loop. Calling + get_service_info() from there blocks waiting for a DNS reply + that can never be processed because the event loop is occupied — + it deadlocks, times out after 3 s, and silently returns None, + causing the device to be dropped. + + 2. load_from_cache() is synchronous, non-blocking, and explicitly + documented as threadsafe. The ServiceBrowser guarantees the + cache is populated before firing add_service / update_service, + so the record is always present when this method is called. + """ + info = _zc.ServiceInfo(type_, name) + if not info.load_from_cache(zc): + _LOGGER.warning( + "No cache entry for %s — device will appear on next mDNS announcement", + name, + ) + return None + + addresses = info.parsed_addresses() + if not addresses: + _LOGGER.warning("No addresses in zeroconf cache record for %s", name) + return None + + try: + raw_id = info.properties[b'id'] + except KeyError: + _LOGGER.error("Missing 'id' property in zeroconf record for %s", name) + return None + + if raw_id is None: + _LOGGER.error("'id' property in zeroconf record for %s has no value", name) + return None + + if info.port is None: + _LOGGER.error("No port in zeroconf record for %s", name) + return None + + return raw_id.decode('utf-8'), addresses[0], info.port + + def add_service(self, zc: Any, type_: str, name: str) -> None: + result = self._extract(zc, type_, name) + if result is None: + _LOGGER.warning( + "add_service: no info available for %s — will retry on next announcement", + name, + ) + return + mac, ip, port = result + self._name_to_mac[name] = mac + self._owner._on_zc_add(mac, ip, port, self._loop) + + def update_service(self, zc: Any, type_: str, name: str) -> None: + result = self._extract(zc, type_, name) + if result is None: + _LOGGER.warning( + "update_service: no info available for %s — will retry on next announcement", + name, + ) + return + mac, ip, port = result + self._name_to_mac[name] = mac + self._owner._on_zc_update(mac, ip, port, self._loop) + + def remove_service(self, zc: Any, type_: str, name: str) -> None: + mac = self._name_to_mac.pop(name, None) + if mac is None: + _LOGGER.warning( + "remove_service for %s: MAC not in cache — removal ignored", name + ) + return + self._owner._on_zc_remove(mac, self._loop) + +except ImportError as exc: + _zeroconf_import_error = exc + + class PowersensorZeroconfDevices(_PowersensorDevicesBase): # type: ignore[no-redef] + """Stub raised when the optional zeroconf package is not installed. + + To use mDNS-based discovery, install the zeroconf extra:: + + pip install powersensor-local[zeroconf] + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__() + raise ImportError( + "The 'zeroconf' package is required for PowersensorZeroconfDevices. " + "Install it with: pip install powersensor-local[zeroconf]" + ) from _zeroconf_import_error