Skip to content

Commit

Permalink
Add broadcast listener to receive real-time updates (#9)
Browse files Browse the repository at this point in the history
* use fail counter to avoid unavailable state

* move api to module

* split api into multiple files

* fix some issues

* broadcast stuff

* config option

* destroy coordinator on unload

* add "last" sensor to broadcast measurements
  • Loading branch information
siku2 committed Jun 17, 2021
1 parent d0ec08f commit f9e77b7
Show file tree
Hide file tree
Showing 25 changed files with 1,173 additions and 647 deletions.
105 changes: 87 additions & 18 deletions custom_components/weatherlink/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import logging
from datetime import timedelta
from typing import Optional

from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
Expand All @@ -9,7 +11,9 @@
DataUpdateCoordinator,
)

from .api import CurrentConditions, WeatherLinkSession
from .api import CurrentConditions, WeatherLinkBroadcast, WeatherLinkRest
from .api.conditions import DeviceType
from .config_flow import get_listen_to_broadcasts
from .const import DOMAIN, PLATFORMS
from .units import UnitConfig, get_unit_config

Expand All @@ -35,39 +39,100 @@ def get_update_interval(entry: ConfigEntry) -> timedelta:
return timedelta(seconds=seconds)


class WeatherLinkCoordinator(DataUpdateCoordinator):
session: WeatherLinkSession
MAX_FAIL_COUNTER: int = 3
FAIL_TIMEOUT: float = 3.0


class WeatherLinkCoordinator(DataUpdateCoordinator[CurrentConditions]):
session: WeatherLinkRest
units: UnitConfig
current_conditions: CurrentConditions

_device_type: DeviceType
device_did: str
device_name: str
device_model_name: str

__broadcast_task: Optional[asyncio.Task] = None

def __set_broadcast_task_state(self, on: bool) -> None:
if self.__broadcast_task:
logger.debug("stopping current broadcast task")
self.__broadcast_task.cancel()

if on:
logger.info("starting live broadcast listener")
self.__broadcast_task = asyncio.create_task(
self.__broadcast_loop(), name="broadcast listener loop"
)
else:
self.__broadcast_task = None

async def __update_config(self, hass: HomeAssistant, entry: ConfigEntry):
self.units = get_unit_config(hass, entry)
self.update_interval = get_update_interval(entry)

async def __initalize(
self, session: WeatherLinkSession, entry: ConfigEntry
) -> None:
self.__set_broadcast_task_state(
self._device_type.supports_real_time_api()
and get_listen_to_broadcasts(entry)
)

async def __initalize(self, session: WeatherLinkRest, entry: ConfigEntry) -> None:
self.session = session
entry.add_update_listener(self.__update_config)
await self.__update_config(self.hass, entry)

self.update_method = self.__fetch_data
await self.__fetch_data()

conditions = self.current_conditions
conditions = self.data = await self.__fetch_data()
if conditions is None:
raise RuntimeError(f"failed to get conditions from {session.base_url!r}")
self._device_type = conditions.determine_device_type()
self.device_did = conditions.did
self.device_model_name = conditions.determine_device_type().value
self.device_model_name = self._device_type.value
self.device_name = conditions.determine_device_name()

async def __fetch_data(self) -> None:
self.current_conditions = await self.session.current_conditions()
await self.__update_config(self.hass, entry)

async def __fetch_data(self) -> CurrentConditions:
for i in range(MAX_FAIL_COUNTER):
try:
conditions = await self.session.current_conditions()
except Exception as exc:
logger.warning(
"failed to get current conditions, error %s / %s",
i + 1,
MAX_FAIL_COUNTER,
exc_info=exc,
)
await asyncio.sleep(FAIL_TIMEOUT)
else:
break
else:
conditions = self.data

return conditions

async def __broadcast_loop_once(self, broadcast: WeatherLinkBroadcast) -> None:
logger.debug("received broadcast conditions")
conditions = await broadcast.read()
self.data.update_from(conditions)
self.async_set_updated_data(self.data)

async def __broadcast_loop(self) -> None:
try:
broadcast = await WeatherLinkBroadcast.start(self.session)
except Exception:
logger.exception("failed to start broadcast")
return
try:
while True:
try:
await self.__broadcast_loop_once(broadcast)
except Exception:
logger.exception("failed to read broadcast")
finally:
await broadcast.stop()

@classmethod
async def build(cls, hass, session: WeatherLinkSession, entry: ConfigEntry):
async def build(cls, hass, session: WeatherLinkRest, entry: ConfigEntry):
coordinator = cls(
hass,
logger,
Expand All @@ -78,13 +143,16 @@ async def build(cls, hass, session: WeatherLinkSession, entry: ConfigEntry):

return coordinator

async def destroy(self) -> None:
self.__set_broadcast_task_state(False)


async def setup_coordinator(hass, entry: ConfigEntry):
host = entry.data["host"]

coordinator = await WeatherLinkCoordinator.build(
hass,
WeatherLinkSession(aiohttp_client.async_get_clientsession(hass), host),
WeatherLinkRest(aiohttp_client.async_get_clientsession(hass), host),
entry,
)
hass.data[DOMAIN][entry.entry_id] = coordinator
Expand All @@ -105,7 +173,8 @@ async def async_unload_entry(hass, entry):
for platform in PLATFORMS:
await hass.config_entries.async_forward_entry_unload(entry, platform)

del hass.data[DOMAIN][entry.entry_id]
coordinator: WeatherLinkCoordinator = hass.data[DOMAIN].pop(entry.entry_id)
await coordinator.destroy()

return True

Expand All @@ -118,7 +187,7 @@ def __init__(self, coordinator: WeatherLinkCoordinator) -> None:

@property
def _conditions(self) -> CurrentConditions:
return self.coordinator.current_conditions
return self.coordinator.data

@property
def units(self) -> UnitConfig:
Expand Down
4 changes: 2 additions & 2 deletions custom_components/weatherlink/air_quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
from homeassistant.components.air_quality import AirQualityEntity

from . import WeatherLinkCoordinator, WeatherLinkEntity
from .api import AirQualityCondition
from .api.conditions import AirQualityCondition
from .const import DOMAIN

logger = logging.getLogger(__name__)


async def async_setup_entry(hass, entry, async_add_entities):
c: WeatherLinkCoordinator = hass.data[DOMAIN][entry.entry_id]
if AirQualityCondition in c.current_conditions:
if AirQualityCondition in c.data:
async_add_entities([AirQuality(c)])

return True
Expand Down
Loading

0 comments on commit f9e77b7

Please sign in to comment.