import json import logging from collections.abc import Callable from typing import Any, TypedDict import paho.mqtt.client as mqtt from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode from .screen import Screen from .select import Selector logger = logging.getLogger(__name__) class HAOptions(TypedDict): dev: dict[str, str] o: dict[str, str] availability_topic: str state_topic: str cmps: dict[str, dict[str, str]] class HAClient: def __init__( self, entity: str, secondary_entities: list[str] = [], mqtt_config: dict[str, str] = dict(), ) -> None: self.entity = entity self.secondary_entities = secondary_entities self.config = mqtt_config self.state_topic = "oin/state" self.availability_topic = "oin/availability" self.client = mqtt.Client(CallbackAPIVersion.VERSION2) username = self.config.get("username", None) logger.debug(f"Setting up MQTT with user <{username}>") self.client.username_pw_set( username=username, password=self.config.get("password", None), ) self.screen = Screen() self.selector = Selector(self.send_data) @property def ha_options(self) -> HAOptions: return { "dev": { "ids": "oin", "name": "Oin", }, "o": { "name": "Oin", }, "availability_topic": self.availability_topic, "state_topic": self.state_topic, "cmps": self.selector.ha_options, } def connect(self) -> int: self.client.will_set(self.availability_topic, "offline", retain=True) host = self.config.get("host") port = self.config.get("port", 1883) if host is None: logger.error("Host not found in config.") logger.error(f"\t{self.config}") return 1 if not isinstance(port, int): logger.warning(f"Invalid port config : <{port}> ; using port 1883.") port = 1883 logger.debug(f"Connecting to <{host}> on port <{port}>.") code = self.client.connect(host, port) if code != 0: logger.error(f"Could not connect to host <{host}> on port <{port}>.") return 1 code = self.subscribe(entity_topic(self.entity), self.state_update) if code != 0: return 1 code = self.subscribe( [entity_topic(entity) for entity in self.secondary_entities], self.secondary_state_update, ) if code != 0: return 1 m_info = self.publish_json( "homeassistant/device/oin/config", self.ha_options, retain=True ) m_info.wait_for_publish(60) if not m_info.is_published(): logger.error("Config message timed out") return 1 m_info = self.publish(self.availability_topic, "online", retain=True) m_info.wait_for_publish(60) if not m_info.is_published(): logger.error("Availability message timed out") return 1 return 0 def publish( self, topic: str, data: str, retain: bool = False ) -> mqtt.MQTTMessageInfo: logger.debug(f"Sending message on topic <{topic}>.") return self.client.publish(topic, data, retain=retain) def publish_json( self, topic: str, data: Any, retain: bool = False ) -> mqtt.MQTTMessageInfo: return self.publish(topic, json.dumps(data), retain) def subscribe( self, topic: str | list[str], callback: Callable[[mqtt.Client, Any, mqtt.MQTTMessage], None], ) -> MQTTErrorCode: logger.debug(f"Subscribing to <{topic}>.") match topic: case str(): self.client.message_callback_add(topic, callback) code, _ = self.client.subscribe(topic) case list(): for top in topic: self.client.message_callback_add(top, callback) code, _ = self.client.subscribe([(top, 0) for top in topic]) if code != 0: logger.error(f"Failed subscribing to topic <{topic}> with code <{code}>.") return code def loop(self) -> MQTTErrorCode: logger.info("Starting MQTT client loop.") code = self.client.loop_forever(retry_first_connection=True) if code != 0: logger.error("MQTT client loop failed with code <{code}>.") return code def state_update( self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage ) -> None: data = message.payload.decode() logger.debug(f"Message received on topic <{message.topic}>: {data}.") subtopic = message.topic.rsplit("/", maxsplit=1)[1] match subtopic: case "current_temperature": self.screen.value = json.loads(data) case "temperature": if (value := json.loads(data)) != self.selector.temperature: self.screen.tmp_value = value self.selector.temperature = value case "hvac_action": self.screen.mode = json.loads(data) case "preset_modes": if (value := json.loads(data)) != self.selector.preset_modes: self.selector.preset_modes = value case "preset_mode": if (value := json.loads(data)) != self.selector.mode: self.selector.mode = value case "state": match data: case "heat": self.selector.switch = True case "off": self.selector.switch = False case other: logger.warning(f"Unknown state received: <{other}>.") case _: pass def secondary_state_update( self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage ) -> None: data = message.payload.decode() logger.debug(f"Message received on topic <{message.topic}>: {data}.") _, grp, ent, subtopic = message.topic.split("/") idx = self.secondary_entities.index(f"{grp}.{ent}") if subtopic == "state": self.screen.secondary |= {idx: data} def send_data(self, data: Any) -> mqtt.MQTTMessageInfo: return self.publish_json(self.state_topic, data) def entity_topic(entity: str, subtopic: str = "#") -> str: topic = entity.replace(".", "/") return f"homeassistant/{topic}/{subtopic}"