oin/oin_thermostat/mqtt.py

203 lines
6.6 KiB
Python

import json
import logging
from collections.abc import Callable
from typing import Any, TypedDict
import paho.mqtt.client as mqtt
from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode
from .screen import Screen
from .select import Selector
logger = logging.getLogger(__name__)
class HAOptions(TypedDict):
dev: dict[str, str]
o: dict[str, str]
availability_topic: str
state_topic: str
cmps: dict[str, dict[str, str]]
class HAClient:
def __init__(
self,
entity: str,
secondary_entities: list[str] = [],
mqtt_config: dict[str, str] = dict(),
) -> None:
self.entity = entity
self.secondary_entities = secondary_entities
self.config = mqtt_config
self.state_topic = "oin/state"
self.availability_topic = "oin/availability"
self.client = mqtt.Client(CallbackAPIVersion.VERSION2)
username = self.config.get("username", None)
logger.debug(f"Setting up MQTT with user <{username}>")
self.client.username_pw_set(
username=username,
password=self.config.get("password", None),
)
self.screen = Screen()
self.selector = Selector(self.send_data)
@property
def ha_options(self) -> HAOptions:
return {
"dev": {
"ids": "oin",
"name": "Oin",
},
"o": {
"name": "Oin",
},
"availability_topic": self.availability_topic,
"state_topic": self.state_topic,
"cmps": self.selector.ha_options,
}
def connect(self) -> int:
self.client.will_set(self.availability_topic, "offline", retain=True)
host = self.config.get("host")
port = self.config.get("port", 1883)
if host is None:
logger.error("Host not found in config.")
logger.error(f"\t{self.config}")
return 1
if not isinstance(port, int):
logger.warning(f"Invalid port config : <{port}> ; using port 1883.")
port = 1883
logger.debug(f"Connecting to <{host}> on port <{port}>.")
code = self.client.connect(host, port)
if code != 0:
logger.error(f"Could not connect to host <{host}> on port <{port}>.")
return 1
code = self.subscribe(entity_topic(self.entity), self.state_update)
if code != 0:
return 1
code = self.subscribe(
[entity_topic(entity) for entity in self.secondary_entities],
self.secondary_state_update,
)
if code != 0:
return 1
m_info = self.publish_json(
"homeassistant/device/oin/config", self.ha_options, retain=True
)
m_info.wait_for_publish(60)
if not m_info.is_published():
logger.error("Config message timed out")
return 1
m_info = self.publish(self.availability_topic, "online", retain=True)
m_info.wait_for_publish(60)
if not m_info.is_published():
logger.error("Availability message timed out")
return 1
logger.info("Connected to Home Assistant.")
return 0
def publish(
self, topic: str, data: str, retain: bool = False
) -> mqtt.MQTTMessageInfo:
logger.debug(f"Sending message on topic <{topic}>.")
return self.client.publish(topic, data, retain=retain)
def publish_json(
self, topic: str, data: Any, retain: bool = False
) -> mqtt.MQTTMessageInfo:
return self.publish(topic, json.dumps(data), retain)
def subscribe(
self,
topic: str | list[str],
callback: Callable[[mqtt.Client, Any, mqtt.MQTTMessage], None],
) -> MQTTErrorCode:
logger.debug(f"Subscribing to <{topic}>.")
match topic:
case str():
self.client.message_callback_add(topic, callback)
code, _ = self.client.subscribe(topic)
case list():
for top in topic:
self.client.message_callback_add(top, callback)
code, _ = self.client.subscribe([(top, 0) for top in topic])
if code != 0:
logger.error(f"Failed subscribing to topic <{topic}> with code <{code}>.")
return code
def loop(self) -> MQTTErrorCode:
logger.info("Starting MQTT client loop.")
code = self.client.loop_forever(retry_first_connection=True)
if code != 0:
logger.error("MQTT client loop failed with code <{code}>.")
else:
logger.info("MQTT client loop successfully exited")
return code
def state_update(
self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage
) -> None:
data = message.payload.decode()
logger.debug(f"Message received on topic <{message.topic}>: {data}.")
subtopic = message.topic.rsplit("/", maxsplit=1)[1]
match subtopic:
case "current_temperature":
self.screen.value = json.loads(data)
case "temperature":
if (value := json.loads(data)) != self.selector.temperature:
self.screen.tmp_value = value
self.selector.temperature = value
case "hvac_action":
self.screen.mode = json.loads(data)
case "preset_modes":
if (value := json.loads(data)) != self.selector.preset_modes:
self.selector.preset_modes = value
case "preset_mode":
if (value := json.loads(data)) != self.selector.mode:
self.selector.mode = value
case "state":
match data:
case "heat":
self.selector.switch = True
case "off":
self.selector.switch = False
case other:
logger.warning(f"Unknown state received: <{other}>.")
case _:
pass
def secondary_state_update(
self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage
) -> None:
data = message.payload.decode()
logger.debug(f"Message received on topic <{message.topic}>: {data}.")
_, grp, ent, subtopic = message.topic.split("/")
idx = self.secondary_entities.index(f"{grp}.{ent}")
if subtopic == "state":
self.screen.secondary |= {idx: data}
def send_data(self, data: Any) -> mqtt.MQTTMessageInfo:
return self.publish_json(self.state_topic, data)
def entity_topic(entity: str, subtopic: str = "#") -> str:
topic = entity.replace(".", "/")
return f"homeassistant/{topic}/{subtopic}"