import json import logging from collections.abc import Callable from typing import Any import paho.mqtt.client as mqtt from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode from .screen import Screen from .select import Selector logger = logging.getLogger(__name__) class HAClient: def __init__( self, entity: str, secondary_entities: list[str] = [], mqtt_config: dict[str, str] = dict(), ) -> None: self.entity = entity self.secondary_entities = secondary_entities self.config = mqtt_config self.state_topic = "oin/state" self.availability_topic = "oin/availability" self.client = mqtt.Client(CallbackAPIVersion.VERSION2) username = self.config.get("username", None) logger.debug(f"Setting up MQTT with user <{username}>") self.client.username_pw_set( username=username, password=self.config.get("password", None), ) self.screen = Screen() self.selector = Selector(self.send_data) @property def ha_options(self) -> dict[str, Any]: return { "dev": { "ids": "oin", "name": "Oin", }, "o": { "name": "Oin", }, "availability_topic": self.availability_topic, "state_topic": self.state_topic, "cmps": self.selector.ha_options, } def connect(self) -> int: self.client.will_set(self.availability_topic, "offline", retain=True) host = self.config.get("host") port = self.config.get("port", 1883) if host is None: logger.error("Host not found in config.") logger.error(f"\t{self.config}") return 1 if not isinstance(port, int): logger.warning(f"Invalid port config : <{port}> ; using port 1883.") port = 1883 logger.debug(f"Connecting to <{host}> on port <{port}>.") code = self.client.connect(host, port) if code != 0: logger.error(f"Could not connect to host <{host}> on port <{port}>.") return 1 code = self.subscribe(entity_topic(self.entity), self.state_update) if code != 0: return 1 code = self.subscribe( [entity_topic(entity) for entity in self.secondary_entities], self.secondary_state_update, ) if code != 0: return 1 m_info = self.publish_json( "homeassistant/device/oin/config", self.ha_options, retain=True ) m_info.wait_for_publish(60) if not m_info.is_published(): logger.error("Config message timed out") return 1 m_info = self.publish(self.availability_topic, "online", retain=True) m_info.wait_for_publish(60) if not m_info.is_published(): logger.error("Availability message timed out") return 1 return 0 def publish( self, topic: str, data: str, retain: bool = False ) -> mqtt.MQTTMessageInfo: logger.debug(f"Sending message on topic <{topic}>.") return self.client.publish(topic, data, retain=retain) def publish_json( self, topic: str, data: Any, retain: bool = False ) -> mqtt.MQTTMessageInfo: return self.publish(topic, json.dumps(data), retain) def subscribe( self, topic: str | list[str], callback: Callable[[mqtt.Client, Any, mqtt.MQTTMessage], None], ) -> MQTTErrorCode: logger.debug(f"Subscribing to <{topic}>.") match topic: case str(): self.client.message_callback_add(topic, callback) code, _ = self.client.subscribe(topic) case list(): for top in topic: self.client.message_callback_add(top, callback) code, _ = self.client.subscribe([(top, 0) for top in topic]) if code != 0: logger.error(f"Failed subscribing to topic <{topic}> with code <{code}>.") return code def loop(self) -> MQTTErrorCode: logger.info("Starting MQTT client loop.") code = self.client.loop_forever(retry_first_connection=True) if code != 0: logger.error("MQTT client loop failed with code <{code}>.") return code def state_update( self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage ) -> None: data = message.payload.decode() logger.debug(f"Message received on topic <{message.topic}>: {data}.") subtopic = message.topic.rsplit("/", maxsplit=1)[1] match subtopic: case "current_temperature": self.screen.value = json.loads(data) case "temperature": if (value := json.loads(data)) != self.selector.temperature: self.screen.tmp_value = value self.selector.temperature = value case "hvac_action": self.screen.mode = json.loads(data) case "preset_modes": if (value := json.loads(data)) != self.selector.preset_modes: self.selector.preset_modes = value case "preset_mode": if (value := json.loads(data)) != self.selector.mode: self.selector.mode = value case "state": match data: case "heat": self.selector.switch = True case "off": self.selector.switch = False case other: logger.warning(f"Unknown state received: <{other}>.") case _: pass def secondary_state_update( self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage ) -> None: data = message.payload.decode() logger.debug(f"Message received on topic <{message.topic}>: {data}.") _, grp, ent, subtopic = message.topic.split("/") idx = self.secondary_entities.index(f"{grp}.{ent}") if subtopic == "state": self.screen.secondary |= {idx: data} def send_data(self, data: Any) -> mqtt.MQTTMessageInfo: return self.publish_json(self.state_topic, data) def entity_topic(entity: str, subtopic: str = "#") -> str: topic = entity.replace(".", "/") return f"homeassistant/{topic}/{subtopic}"