Typed mqtt.py

This commit is contained in:
Edgar P. Burkhart 2024-12-08 11:51:05 +01:00
parent 119feee5e5
commit 9f0c6ba3db
Signed by: edpibu
GPG key ID: 9833D3C5A25BD227

View file

@ -1,5 +1,8 @@
import json import json
import logging import logging
import sys
from collections.abc import Callable
from typing import Any
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
@ -14,7 +17,7 @@ class HAClient:
self, self,
entity: str, entity: str,
secondary_entities: list[str] = [], secondary_entities: list[str] = [],
mqtt_config: dict = dict(), mqtt_config: dict[str, str] = dict(),
) -> None: ) -> None:
self.entity = entity self.entity = entity
self.secondary_entities = secondary_entities self.secondary_entities = secondary_entities
@ -54,34 +57,48 @@ class HAClient:
host = self.config.get("host") host = self.config.get("host")
port = self.config.get("port", 1883) port = self.config.get("port", 1883)
logger.debug(f"Connecting to <{host}> on port <{port}>") logger.debug(f"Connecting to <{host}> on port <{port}>.")
self.client.connect(host, port) self.client.connect(host, port)
self.subscribe(entity_topic(self.entity), self.state_update) self.subscribe(entity_topic(self.entity), self.state_update)
for entity in self.secondary_entities: self.subscribe(
self.subscribe(entity_topic(entity, "state"), self.secondary_state_update) [entity_topic(entity) for entity in self.secondary_entities],
self.secondary_state_update,
)
self.publish("homeassistant/device/oin/config", self.ha_options, retain=True) self.publish("homeassistant/device/oin/config", self.ha_options, retain=True)
self.client.publish(self.availability_topic, "online", retain=True) self.client.publish(self.availability_topic, "online", retain=True)
def publish(self, topic, data, **kwargs): def publish(self, topic: str, data: Any, **kwargs) -> mqtt.MQTTMessageInfo:
logger.debug(f"Sending message on topic <{topic}>: {json.dumps(data)}") logger.debug(f"Sending message on topic <{topic}>: {json.dumps(data)}")
self.client.publish(topic, json.dumps(data), **kwargs) return self.client.publish(topic, json.dumps(data), **kwargs)
def subscribe(self, topic, callback): def subscribe(self, topic: str | list[str], callback: Callable) -> None:
logger.debug(f"Subscribe to <{topic}>") logger.debug(f"Subscribing to <{topic}>.")
self.client.subscribe(topic)
self.client.message_callback_add(topic, callback)
def unsubscribe(self, topic): match topic:
logger.debug(f"Unsubscribe from <{topic}>") case str():
self.client.unsubscribe(topic) self.client.message_callback_add(topic, callback)
code, _ = self.client.subscribe(topic)
case list():
for top in topic:
self.client.message_callback_add(top, callback)
code, _ = self.client.subscribe([(top, 0) for top in topic])
def loop(self): if code != 0:
logger.info("Starting MQTT client loop") logger.error(f"Failed subscribing to topic <{topic}> with code <{code}>.")
self.client.loop_forever() sys.exit(1)
def state_update(self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage): def loop(self) -> mqtt.MQTTErrorCode:
logger.info("Starting MQTT client loop.")
code = self.client.loop_forever(retry_first_connection=True)
if code != 0:
logger.error("MQTT client loop failed with code <{code}>.")
def state_update(
self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage
) -> None:
logger.debug(f"Message received on topic <{message.topic}>: {message.payload}.") logger.debug(f"Message received on topic <{message.topic}>: {message.payload}.")
subtopic = message.topic.rsplit("/", maxsplit=1)[1] subtopic = message.topic.rsplit("/", maxsplit=1)[1]
@ -109,8 +126,8 @@ class HAClient:
self.selector.switch = False self.selector.switch = False
def secondary_state_update( def secondary_state_update(
self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage
): ) -> None:
logger.debug(f"Message received on topic <{message.topic}>: {message.payload}.") logger.debug(f"Message received on topic <{message.topic}>: {message.payload}.")
_, grp, ent, subtopic = message.topic.split("/") _, grp, ent, subtopic = message.topic.split("/")
@ -119,14 +136,14 @@ class HAClient:
if subtopic == "state": if subtopic == "state":
self.screen.secondary |= {idx: message.payload.decode()} self.screen.secondary |= {idx: message.payload.decode()}
def send_data(self, data): def send_data(self, data: Any) -> mqtt.MQTTMessageInfo:
self.publish(self.state_topic, data) return self.publish(self.state_topic, data)
def parse(message): def parse(message: mqtt.MQTTMessage) -> Any:
return json.loads(message.payload.decode()) return json.loads(message.payload.decode())
def entity_topic(entity, subtopic="#"): def entity_topic(entity: str, subtopic: str = "#") -> str:
topic = entity.replace(".", "/") topic = entity.replace(".", "/")
return f"homeassistant/{topic}/{subtopic}" return f"homeassistant/{topic}/{subtopic}"