diff --git a/.gitignore b/.gitignore index e73ce05..1dab207 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ __pycache__ /env +/out diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0cb2f3c..5f7ffe1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -13,7 +13,6 @@ repos: hooks: - id: flake8 args: ["--max-line-length=88", "--extend-ignore=E203"] - exclude: "lyceedupaysdesoule/settings/|migrations" - repo: https://github.com/pre-commit/mirrors-mypy rev: "v1.13.0" diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..8a26835 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,4 @@ +[mypy] +python_executable = ./env/bin/python +strict = True +pretty = True diff --git a/oin_thermostat/__main__.py b/oin_thermostat/__main__.py index b64e819..4e8e737 100644 --- a/oin_thermostat/__main__.py +++ b/oin_thermostat/__main__.py @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) config_path = "config.toml" -def main(): +def main() -> int: with open(config_path, "rb") as config_file: config = tomllib.load(config_file) @@ -18,7 +18,7 @@ def main(): if ha_config is None: logger.error(f"Missing home assistant config in <{config_path}>") logger.error(f"\t{config}") - sys.exit(1) + return 1 client = HAClient( ha_config.get("entity"), @@ -26,10 +26,16 @@ def main(): mqtt_config=ha_config.get("mqtt"), ) - client.connect() + code = client.connect() + if code != 0: + return 1 - client.loop() + code = client.loop() + if code != 0: + return 1 + + return 0 if __name__ == "__main__": - main() + sys.exit(main()) diff --git a/oin_thermostat/mqtt.py b/oin_thermostat/mqtt.py index 934e002..b5fa0e5 100644 --- a/oin_thermostat/mqtt.py +++ b/oin_thermostat/mqtt.py @@ -1,10 +1,10 @@ import json import logging -import sys from collections.abc import Callable from typing import Any import paho.mqtt.client as mqtt +from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode from .screen import Screen from .select import Selector @@ -26,7 +26,7 @@ class HAClient: self.state_topic = "oin/state" self.availability_topic = "oin/availability" - self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) + self.client = mqtt.Client(CallbackAPIVersion.VERSION2) username = self.config.get("username", None) logger.debug(f"Setting up MQTT with user <{username}>") self.client.username_pw_set( @@ -52,28 +52,68 @@ class HAClient: "cmps": self.selector.ha_options, } - def connect(self) -> None: + def connect(self) -> int: self.client.will_set(self.availability_topic, "offline", retain=True) host = self.config.get("host") port = self.config.get("port", 1883) - logger.debug(f"Connecting to <{host}> on port <{port}>.") - self.client.connect(host, port) - self.subscribe(entity_topic(self.entity), self.state_update) - self.subscribe( + if host is None: + logger.error("Host not found in config.") + logger.error(f"\t{self.config}") + return 1 + if not isinstance(port, int): + logger.warning(f"Invalid port config : <{port}> ; using port 1883.") + port = 1883 + + logger.debug(f"Connecting to <{host}> on port <{port}>.") + code = self.client.connect(host, port) + if code != 0: + logger.error(f"Could not connect to host <{host}> on port <{port}>.") + return 1 + + code = self.subscribe(entity_topic(self.entity), self.state_update) + if code != 0: + return 1 + code = self.subscribe( [entity_topic(entity) for entity in self.secondary_entities], self.secondary_state_update, ) + if code != 0: + return 1 - self.publish("homeassistant/device/oin/config", self.ha_options, retain=True) - self.client.publish(self.availability_topic, "online", retain=True) + m_info = self.publish_json( + "homeassistant/device/oin/config", self.ha_options, retain=True + ) + m_info.wait_for_publish(60) + if not m_info.is_published(): + logger.error("Config message timed out") + return 1 - def publish(self, topic: str, data: Any, **kwargs) -> mqtt.MQTTMessageInfo: - logger.debug(f"Sending message on topic <{topic}>: {json.dumps(data)}") - return self.client.publish(topic, json.dumps(data), **kwargs) + m_info = self.publish(self.availability_topic, "online", retain=True) + m_info.wait_for_publish(60) + if not m_info.is_published(): + logger.error("Availability message timed out") + return 1 - def subscribe(self, topic: str | list[str], callback: Callable) -> None: + return 0 + + def publish( + self, topic: str, data: str, retain: bool = False + ) -> mqtt.MQTTMessageInfo: + logger.debug(f"Sending message on topic <{topic}>.") + return self.client.publish(topic, data, retain=retain) + + def publish_json( + self, topic: str, data: Any, retain: bool = False + ) -> mqtt.MQTTMessageInfo: + return self.publish(topic, json.dumps(data), retain) + + def subscribe( + self, + topic: str | list[str], + callback: Callable[[mqtt.Client, Any, mqtt.MQTTMessage], None], + ) -> MQTTErrorCode: logger.debug(f"Subscribing to <{topic}>.") match topic: @@ -87,39 +127,41 @@ class HAClient: if code != 0: logger.error(f"Failed subscribing to topic <{topic}> with code <{code}>.") - sys.exit(1) + return code - def loop(self) -> mqtt.MQTTErrorCode: + def loop(self) -> MQTTErrorCode: logger.info("Starting MQTT client loop.") code = self.client.loop_forever(retry_first_connection=True) if code != 0: logger.error("MQTT client loop failed with code <{code}>.") + return code def state_update( self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage ) -> None: - logger.debug(f"Message received on topic <{message.topic}>: {message.payload}.") + data = message.payload.decode() + logger.debug(f"Message received on topic <{message.topic}>: {data}.") subtopic = message.topic.rsplit("/", maxsplit=1)[1] match subtopic: case "current_temperature": - self.screen.value = parse(message) + self.screen.value = json.loads(data) case "temperature": - if (value := parse(message)) != self.selector.temperature: + if (value := json.loads(data)) != self.selector.temperature: self.screen.tmp_value = value self.selector.temperature = value case "hvac_action": - self.screen.mode = parse(message) + self.screen.mode = json.loads(data) case "preset_modes": - if (value := parse(message)) != self.selector.preset_modes: + if (value := json.loads(data)) != self.selector.preset_modes: self.selector.preset_modes = value case "preset_mode": - if (value := parse(message)) != self.selector.mode: + if (value := json.loads(data)) != self.selector.mode: self.selector.mode = value case "state": - match message.payload.decode(): + match data: case "heat": self.selector.switch = True case "off": @@ -128,20 +170,17 @@ class HAClient: def secondary_state_update( self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage ) -> None: - logger.debug(f"Message received on topic <{message.topic}>: {message.payload}.") + data = message.payload.decode() + logger.debug(f"Message received on topic <{message.topic}>: {data}.") _, grp, ent, subtopic = message.topic.split("/") idx = self.secondary_entities.index(f"{grp}.{ent}") if subtopic == "state": - self.screen.secondary |= {idx: message.payload.decode()} + self.screen.secondary |= {idx: data} def send_data(self, data: Any) -> mqtt.MQTTMessageInfo: - return self.publish(self.state_topic, data) - - -def parse(message: mqtt.MQTTMessage) -> Any: - return json.loads(message.payload.decode()) + return self.publish_json(self.state_topic, data) def entity_topic(entity: str, subtopic: str = "#") -> str: diff --git a/oin_thermostat/screen.py b/oin_thermostat/screen.py index e28b143..a9bf6ee 100644 --- a/oin_thermostat/screen.py +++ b/oin_thermostat/screen.py @@ -3,36 +3,36 @@ import math from threading import Timer import bdfparser -from sense_hat import SenseHat +from sense_hat import InputEvent, SenseHat logger = logging.getLogger(__name__) COLORS = { - "Bleu": [0, 0, 255], - "Blanc": [255, 255, 255], - "Rouge": [255, 0, 0], - "Verte": [0, 127, 31], - "Jaune": [255, 255, 0], - "heat": [255, 0, 0], - "heating": [255, 0, 0], - "idle": [127, 0, 255], - "off": [127, 127, 127], - "on_setting": [255, 255, 0], - "off_setting": [255, 255, 255], - None: [0, 0, 0], + "Bleu": (0, 0, 255), + "Blanc": (255, 255, 255), + "Rouge": (255, 0, 0), + "Verte": (0, 127, 31), + "Jaune": (255, 255, 0), + "heat": (255, 0, 0), + "heating": (255, 0, 0), + "idle": (127, 0, 255), + "off": (127, 127, 127), + "on_setting": (255, 255, 0), + "off_setting": (255, 255, 255), + None: (0, 0, 0), } class Screen: - def __init__(self): + def __init__(self) -> None: self.sense = SenseHat() self._value = "" self._tmp = False - self._tmp_value = None - self._mode = None + self._tmp_value = "" + self._mode = "" self.font = bdfparser.Font("src/tom-thumb.bdf") - self._secondary = dict() - self._secondary_pixels = [[0, 0, 0]] * 8 + self._secondary: dict[int, str] = dict() + self._secondary_pixels: list[tuple[int, int, int]] = [(0, 0, 0)] * 8 self.timer = Timer(0, self.set_pixels) @@ -40,11 +40,11 @@ class Screen: self.sense.stick.direction_middle = self.stick_click @property - def value(self): + def value(self) -> None | str: return self._value @value.setter - def value(self, value): + def value(self, value: float) -> None: logger.debug(f"Updated value: <{value}>") self._value = format_value(value) @@ -52,32 +52,32 @@ class Screen: self.set_pixels() @property - def color(self): - return COLORS.get(self.mode, [0, 0, 0]) + def color(self) -> tuple[int, int, int]: + return COLORS.get(self.mode, (0, 0, 0)) @property - def mode(self): + def mode(self) -> str: return self._mode @mode.setter - def mode(self, value): + def mode(self, value: str) -> None: self._mode = value if not self._tmp: self.set_pixels() @property - def tmp_value(self): + def tmp_value(self) -> None | str: return self._tmp_value @tmp_value.setter - def tmp_value(self, value): + def tmp_value(self, value: float) -> None: logger.debug(f"Show value: <{value}>") self.timer.cancel() self._tmp_value = format_value(value) self.show_tmp() - def show_tmp(self): + def show_tmp(self) -> None: self._tmp = True self.set_pixels( self.tmp_value, @@ -86,7 +86,12 @@ class Screen: self.timer = Timer(3, self.set_pixels) self.timer.start() - def set_pixels(self, value=None, color=None, bg_color=[0, 0, 0]): + def set_pixels( + self, + value: str | None = None, + color: tuple[int, int, int] | None = None, + bg_color: tuple[int, int, int] = (0, 0, 0), + ) -> None: if value is None: value = self.value self._tmp = False @@ -99,32 +104,32 @@ class Screen: for x in self.font.draw(value, mode=0).crop(8, 7).todata(3) ] else: - pixels = 48 * [[0, 0, 0]] + pixels = 48 * [(0, 0, 0)] pixels += self.secondary_pixels self.sense.set_pixels(pixels) @property - def secondary(self): + def secondary(self) -> dict[int, str]: return self._secondary @secondary.setter - def secondary(self, value): + def secondary(self, value: dict[int, str]) -> None: self._secondary = value for idx in range(2): self._secondary_pixels[4 * idx : 4 * (idx + 1)] = [ - COLORS.get(value.get(idx, None), [0, 0, 0]) + COLORS.get(value.get(idx, None), (0, 0, 0)) ] * 4 if not self._tmp: self.set_pixels() @property - def secondary_pixels(self): + def secondary_pixels(self) -> list[tuple[int, int, int]]: return self._secondary_pixels - def stick_click(self, event): + def stick_click(self, event: InputEvent) -> None: match (event.action, self._held): case ("held", False): self._held = True @@ -134,7 +139,7 @@ class Screen: self.show_tmp() -def format_value(value): +def format_value(value: float) -> str: v = math.trunc(value) d = "." if (value - v) >= 0.5 else "" return f"{v}{d}" diff --git a/oin_thermostat/select.py b/oin_thermostat/select.py index a6c1976..f8ca95f 100644 --- a/oin_thermostat/select.py +++ b/oin_thermostat/select.py @@ -1,18 +1,19 @@ import logging import math +from collections.abc import Callable -from sense_hat import ACTION_HELD, ACTION_RELEASED, SenseHat +from sense_hat import ACTION_HELD, ACTION_RELEASED, InputEvent, SenseHat logger = logging.getLogger(__name__) class Selector: - def __init__(self, send_data): + def __init__(self, send_data: Callable[[dict[str, float | str | None]], None]): self.stick = SenseHat().stick self.temperature = None self.mode = None self.switch = None - self.preset_modes = [] + self.preset_modes: list[str] = [] self.send_data = send_data self.switch_held = False self.default_data = {"temperature": None, "mode": None, "switch": None} @@ -26,7 +27,7 @@ class Selector: self.stick.direction_left = self.prev_mode @property - def ha_options(self): + def ha_options(self) -> dict[str, dict[str, str]]: return { "temperature": { "p": "sensor", @@ -55,36 +56,42 @@ class Selector: }, } - def increase_temperature(self, event): + def increase_temperature(self, event: InputEvent) -> None: if event.action != ACTION_RELEASED and self.temperature is not None: - self.callback(temperature=math.floor(self.temperature * 2) / 2 + 0.5) + self.callback({"temperature": math.floor(self.temperature * 2) / 2 + 0.5}) - def decrease_temperature(self, event): + def decrease_temperature(self, event: InputEvent) -> None: if event.action != ACTION_RELEASED and self.temperature is not None: - self.callback(temperature=math.ceil(self.temperature * 2) / 2 - 0.5) + self.callback({"temperature": math.ceil(self.temperature * 2) / 2 - 0.5}) - def next_mode(self, event): + def next_mode(self, event: InputEvent) -> None: if event.action != ACTION_RELEASED and self.mode is not None: self.callback( - mode=self.preset_modes[ - (self.preset_modes.index(self.mode) + 1) % len(self.preset_modes) - ] + { + "mode": self.preset_modes[ + (self.preset_modes.index(self.mode) + 1) + % len(self.preset_modes) + ] + } ) - def prev_mode(self, event): + def prev_mode(self, event: InputEvent) -> None: if event.action != ACTION_RELEASED and self.mode is not None: self.callback( - mode=self.preset_modes[ - (self.preset_modes.index(self.mode) - 1) % len(self.preset_modes) - ] + { + "mode": self.preset_modes[ + (self.preset_modes.index(self.mode) - 1) + % len(self.preset_modes) + ] + } ) - def toggle(self, event): + def toggle(self, event: InputEvent) -> None: if not self.switch_held and event.action == ACTION_HELD: self.switch_held = True - self.callback(switch="off" if self.switch else "heat") + self.callback({"switch": "off" if self.switch else "heat"}) elif self.switch_held and event.action == ACTION_RELEASED: self.switch_held = False - def callback(self, **kwargs): - self.send_data(self.default_data | kwargs) + def callback(self, data: dict[str, float | str | None]) -> None: + self.send_data(self.default_data | data)