Enable mypy strict compliance

This commit is contained in:
Edgar P. Burkhart 2024-12-09 12:16:26 +01:00
parent 9f0c6ba3db
commit 7b26d3a160
Signed by: edpibu
GPG key ID: 9833D3C5A25BD227
7 changed files with 151 additions and 90 deletions

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
__pycache__ __pycache__
/env /env
/out

View file

@ -13,7 +13,6 @@ repos:
hooks: hooks:
- id: flake8 - id: flake8
args: ["--max-line-length=88", "--extend-ignore=E203"] args: ["--max-line-length=88", "--extend-ignore=E203"]
exclude: "lyceedupaysdesoule/settings/|migrations"
- repo: https://github.com/pre-commit/mirrors-mypy - repo: https://github.com/pre-commit/mirrors-mypy
rev: "v1.13.0" rev: "v1.13.0"

4
mypy.ini Normal file
View file

@ -0,0 +1,4 @@
[mypy]
python_executable = ./env/bin/python
strict = True
pretty = True

View file

@ -8,7 +8,7 @@ logger = logging.getLogger(__name__)
config_path = "config.toml" config_path = "config.toml"
def main(): def main() -> int:
with open(config_path, "rb") as config_file: with open(config_path, "rb") as config_file:
config = tomllib.load(config_file) config = tomllib.load(config_file)
@ -18,7 +18,7 @@ def main():
if ha_config is None: if ha_config is None:
logger.error(f"Missing home assistant config in <{config_path}>") logger.error(f"Missing home assistant config in <{config_path}>")
logger.error(f"\t{config}") logger.error(f"\t{config}")
sys.exit(1) return 1
client = HAClient( client = HAClient(
ha_config.get("entity"), ha_config.get("entity"),
@ -26,10 +26,16 @@ def main():
mqtt_config=ha_config.get("mqtt"), mqtt_config=ha_config.get("mqtt"),
) )
client.connect() code = client.connect()
if code != 0:
return 1
client.loop() code = client.loop()
if code != 0:
return 1
return 0
if __name__ == "__main__": if __name__ == "__main__":
main() sys.exit(main())

View file

@ -1,10 +1,10 @@
import json import json
import logging import logging
import sys
from collections.abc import Callable from collections.abc import Callable
from typing import Any from typing import Any
import paho.mqtt.client as mqtt import paho.mqtt.client as mqtt
from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode
from .screen import Screen from .screen import Screen
from .select import Selector from .select import Selector
@ -26,7 +26,7 @@ class HAClient:
self.state_topic = "oin/state" self.state_topic = "oin/state"
self.availability_topic = "oin/availability" self.availability_topic = "oin/availability"
self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) self.client = mqtt.Client(CallbackAPIVersion.VERSION2)
username = self.config.get("username", None) username = self.config.get("username", None)
logger.debug(f"Setting up MQTT with user <{username}>") logger.debug(f"Setting up MQTT with user <{username}>")
self.client.username_pw_set( self.client.username_pw_set(
@ -52,28 +52,68 @@ class HAClient:
"cmps": self.selector.ha_options, "cmps": self.selector.ha_options,
} }
def connect(self) -> None: def connect(self) -> int:
self.client.will_set(self.availability_topic, "offline", retain=True) self.client.will_set(self.availability_topic, "offline", retain=True)
host = self.config.get("host") host = self.config.get("host")
port = self.config.get("port", 1883) port = self.config.get("port", 1883)
logger.debug(f"Connecting to <{host}> on port <{port}>.")
self.client.connect(host, port)
self.subscribe(entity_topic(self.entity), self.state_update) if host is None:
self.subscribe( logger.error("Host not found in config.")
logger.error(f"\t{self.config}")
return 1
if not isinstance(port, int):
logger.warning(f"Invalid port config : <{port}> ; using port 1883.")
port = 1883
logger.debug(f"Connecting to <{host}> on port <{port}>.")
code = self.client.connect(host, port)
if code != 0:
logger.error(f"Could not connect to host <{host}> on port <{port}>.")
return 1
code = self.subscribe(entity_topic(self.entity), self.state_update)
if code != 0:
return 1
code = self.subscribe(
[entity_topic(entity) for entity in self.secondary_entities], [entity_topic(entity) for entity in self.secondary_entities],
self.secondary_state_update, self.secondary_state_update,
) )
if code != 0:
return 1
self.publish("homeassistant/device/oin/config", self.ha_options, retain=True) m_info = self.publish_json(
self.client.publish(self.availability_topic, "online", retain=True) "homeassistant/device/oin/config", self.ha_options, retain=True
)
m_info.wait_for_publish(60)
if not m_info.is_published():
logger.error("Config message timed out")
return 1
def publish(self, topic: str, data: Any, **kwargs) -> mqtt.MQTTMessageInfo: m_info = self.publish(self.availability_topic, "online", retain=True)
logger.debug(f"Sending message on topic <{topic}>: {json.dumps(data)}") m_info.wait_for_publish(60)
return self.client.publish(topic, json.dumps(data), **kwargs) if not m_info.is_published():
logger.error("Availability message timed out")
return 1
def subscribe(self, topic: str | list[str], callback: Callable) -> None: return 0
def publish(
self, topic: str, data: str, retain: bool = False
) -> mqtt.MQTTMessageInfo:
logger.debug(f"Sending message on topic <{topic}>.")
return self.client.publish(topic, data, retain=retain)
def publish_json(
self, topic: str, data: Any, retain: bool = False
) -> mqtt.MQTTMessageInfo:
return self.publish(topic, json.dumps(data), retain)
def subscribe(
self,
topic: str | list[str],
callback: Callable[[mqtt.Client, Any, mqtt.MQTTMessage], None],
) -> MQTTErrorCode:
logger.debug(f"Subscribing to <{topic}>.") logger.debug(f"Subscribing to <{topic}>.")
match topic: match topic:
@ -87,39 +127,41 @@ class HAClient:
if code != 0: if code != 0:
logger.error(f"Failed subscribing to topic <{topic}> with code <{code}>.") logger.error(f"Failed subscribing to topic <{topic}> with code <{code}>.")
sys.exit(1) return code
def loop(self) -> mqtt.MQTTErrorCode: def loop(self) -> MQTTErrorCode:
logger.info("Starting MQTT client loop.") logger.info("Starting MQTT client loop.")
code = self.client.loop_forever(retry_first_connection=True) code = self.client.loop_forever(retry_first_connection=True)
if code != 0: if code != 0:
logger.error("MQTT client loop failed with code <{code}>.") logger.error("MQTT client loop failed with code <{code}>.")
return code
def state_update( def state_update(
self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage
) -> None: ) -> None:
logger.debug(f"Message received on topic <{message.topic}>: {message.payload}.") data = message.payload.decode()
logger.debug(f"Message received on topic <{message.topic}>: {data}.")
subtopic = message.topic.rsplit("/", maxsplit=1)[1] subtopic = message.topic.rsplit("/", maxsplit=1)[1]
match subtopic: match subtopic:
case "current_temperature": case "current_temperature":
self.screen.value = parse(message) self.screen.value = json.loads(data)
case "temperature": case "temperature":
if (value := parse(message)) != self.selector.temperature: if (value := json.loads(data)) != self.selector.temperature:
self.screen.tmp_value = value self.screen.tmp_value = value
self.selector.temperature = value self.selector.temperature = value
case "hvac_action": case "hvac_action":
self.screen.mode = parse(message) self.screen.mode = json.loads(data)
case "preset_modes": case "preset_modes":
if (value := parse(message)) != self.selector.preset_modes: if (value := json.loads(data)) != self.selector.preset_modes:
self.selector.preset_modes = value self.selector.preset_modes = value
case "preset_mode": case "preset_mode":
if (value := parse(message)) != self.selector.mode: if (value := json.loads(data)) != self.selector.mode:
self.selector.mode = value self.selector.mode = value
case "state": case "state":
match message.payload.decode(): match data:
case "heat": case "heat":
self.selector.switch = True self.selector.switch = True
case "off": case "off":
@ -128,20 +170,17 @@ class HAClient:
def secondary_state_update( def secondary_state_update(
self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage self, client: mqtt.Client, userdata: Any, message: mqtt.MQTTMessage
) -> None: ) -> None:
logger.debug(f"Message received on topic <{message.topic}>: {message.payload}.") data = message.payload.decode()
logger.debug(f"Message received on topic <{message.topic}>: {data}.")
_, grp, ent, subtopic = message.topic.split("/") _, grp, ent, subtopic = message.topic.split("/")
idx = self.secondary_entities.index(f"{grp}.{ent}") idx = self.secondary_entities.index(f"{grp}.{ent}")
if subtopic == "state": if subtopic == "state":
self.screen.secondary |= {idx: message.payload.decode()} self.screen.secondary |= {idx: data}
def send_data(self, data: Any) -> mqtt.MQTTMessageInfo: def send_data(self, data: Any) -> mqtt.MQTTMessageInfo:
return self.publish(self.state_topic, data) return self.publish_json(self.state_topic, data)
def parse(message: mqtt.MQTTMessage) -> Any:
return json.loads(message.payload.decode())
def entity_topic(entity: str, subtopic: str = "#") -> str: def entity_topic(entity: str, subtopic: str = "#") -> str:

View file

@ -3,36 +3,36 @@ import math
from threading import Timer from threading import Timer
import bdfparser import bdfparser
from sense_hat import SenseHat from sense_hat import InputEvent, SenseHat
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
COLORS = { COLORS = {
"Bleu": [0, 0, 255], "Bleu": (0, 0, 255),
"Blanc": [255, 255, 255], "Blanc": (255, 255, 255),
"Rouge": [255, 0, 0], "Rouge": (255, 0, 0),
"Verte": [0, 127, 31], "Verte": (0, 127, 31),
"Jaune": [255, 255, 0], "Jaune": (255, 255, 0),
"heat": [255, 0, 0], "heat": (255, 0, 0),
"heating": [255, 0, 0], "heating": (255, 0, 0),
"idle": [127, 0, 255], "idle": (127, 0, 255),
"off": [127, 127, 127], "off": (127, 127, 127),
"on_setting": [255, 255, 0], "on_setting": (255, 255, 0),
"off_setting": [255, 255, 255], "off_setting": (255, 255, 255),
None: [0, 0, 0], None: (0, 0, 0),
} }
class Screen: class Screen:
def __init__(self): def __init__(self) -> None:
self.sense = SenseHat() self.sense = SenseHat()
self._value = "" self._value = ""
self._tmp = False self._tmp = False
self._tmp_value = None self._tmp_value = ""
self._mode = None self._mode = ""
self.font = bdfparser.Font("src/tom-thumb.bdf") self.font = bdfparser.Font("src/tom-thumb.bdf")
self._secondary = dict() self._secondary: dict[int, str] = dict()
self._secondary_pixels = [[0, 0, 0]] * 8 self._secondary_pixels: list[tuple[int, int, int]] = [(0, 0, 0)] * 8
self.timer = Timer(0, self.set_pixels) self.timer = Timer(0, self.set_pixels)
@ -40,11 +40,11 @@ class Screen:
self.sense.stick.direction_middle = self.stick_click self.sense.stick.direction_middle = self.stick_click
@property @property
def value(self): def value(self) -> None | str:
return self._value return self._value
@value.setter @value.setter
def value(self, value): def value(self, value: float) -> None:
logger.debug(f"Updated value: <{value}>") logger.debug(f"Updated value: <{value}>")
self._value = format_value(value) self._value = format_value(value)
@ -52,32 +52,32 @@ class Screen:
self.set_pixels() self.set_pixels()
@property @property
def color(self): def color(self) -> tuple[int, int, int]:
return COLORS.get(self.mode, [0, 0, 0]) return COLORS.get(self.mode, (0, 0, 0))
@property @property
def mode(self): def mode(self) -> str:
return self._mode return self._mode
@mode.setter @mode.setter
def mode(self, value): def mode(self, value: str) -> None:
self._mode = value self._mode = value
if not self._tmp: if not self._tmp:
self.set_pixels() self.set_pixels()
@property @property
def tmp_value(self): def tmp_value(self) -> None | str:
return self._tmp_value return self._tmp_value
@tmp_value.setter @tmp_value.setter
def tmp_value(self, value): def tmp_value(self, value: float) -> None:
logger.debug(f"Show value: <{value}>") logger.debug(f"Show value: <{value}>")
self.timer.cancel() self.timer.cancel()
self._tmp_value = format_value(value) self._tmp_value = format_value(value)
self.show_tmp() self.show_tmp()
def show_tmp(self): def show_tmp(self) -> None:
self._tmp = True self._tmp = True
self.set_pixels( self.set_pixels(
self.tmp_value, self.tmp_value,
@ -86,7 +86,12 @@ class Screen:
self.timer = Timer(3, self.set_pixels) self.timer = Timer(3, self.set_pixels)
self.timer.start() self.timer.start()
def set_pixels(self, value=None, color=None, bg_color=[0, 0, 0]): def set_pixels(
self,
value: str | None = None,
color: tuple[int, int, int] | None = None,
bg_color: tuple[int, int, int] = (0, 0, 0),
) -> None:
if value is None: if value is None:
value = self.value value = self.value
self._tmp = False self._tmp = False
@ -99,32 +104,32 @@ class Screen:
for x in self.font.draw(value, mode=0).crop(8, 7).todata(3) for x in self.font.draw(value, mode=0).crop(8, 7).todata(3)
] ]
else: else:
pixels = 48 * [[0, 0, 0]] pixels = 48 * [(0, 0, 0)]
pixels += self.secondary_pixels pixels += self.secondary_pixels
self.sense.set_pixels(pixels) self.sense.set_pixels(pixels)
@property @property
def secondary(self): def secondary(self) -> dict[int, str]:
return self._secondary return self._secondary
@secondary.setter @secondary.setter
def secondary(self, value): def secondary(self, value: dict[int, str]) -> None:
self._secondary = value self._secondary = value
for idx in range(2): for idx in range(2):
self._secondary_pixels[4 * idx : 4 * (idx + 1)] = [ self._secondary_pixels[4 * idx : 4 * (idx + 1)] = [
COLORS.get(value.get(idx, None), [0, 0, 0]) COLORS.get(value.get(idx, None), (0, 0, 0))
] * 4 ] * 4
if not self._tmp: if not self._tmp:
self.set_pixels() self.set_pixels()
@property @property
def secondary_pixels(self): def secondary_pixels(self) -> list[tuple[int, int, int]]:
return self._secondary_pixels return self._secondary_pixels
def stick_click(self, event): def stick_click(self, event: InputEvent) -> None:
match (event.action, self._held): match (event.action, self._held):
case ("held", False): case ("held", False):
self._held = True self._held = True
@ -134,7 +139,7 @@ class Screen:
self.show_tmp() self.show_tmp()
def format_value(value): def format_value(value: float) -> str:
v = math.trunc(value) v = math.trunc(value)
d = "." if (value - v) >= 0.5 else "" d = "." if (value - v) >= 0.5 else ""
return f"{v}{d}" return f"{v}{d}"

View file

@ -1,18 +1,19 @@
import logging import logging
import math import math
from collections.abc import Callable
from sense_hat import ACTION_HELD, ACTION_RELEASED, SenseHat from sense_hat import ACTION_HELD, ACTION_RELEASED, InputEvent, SenseHat
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Selector: class Selector:
def __init__(self, send_data): def __init__(self, send_data: Callable[[dict[str, float | str | None]], None]):
self.stick = SenseHat().stick self.stick = SenseHat().stick
self.temperature = None self.temperature = None
self.mode = None self.mode = None
self.switch = None self.switch = None
self.preset_modes = [] self.preset_modes: list[str] = []
self.send_data = send_data self.send_data = send_data
self.switch_held = False self.switch_held = False
self.default_data = {"temperature": None, "mode": None, "switch": None} self.default_data = {"temperature": None, "mode": None, "switch": None}
@ -26,7 +27,7 @@ class Selector:
self.stick.direction_left = self.prev_mode self.stick.direction_left = self.prev_mode
@property @property
def ha_options(self): def ha_options(self) -> dict[str, dict[str, str]]:
return { return {
"temperature": { "temperature": {
"p": "sensor", "p": "sensor",
@ -55,36 +56,42 @@ class Selector:
}, },
} }
def increase_temperature(self, event): def increase_temperature(self, event: InputEvent) -> None:
if event.action != ACTION_RELEASED and self.temperature is not None: if event.action != ACTION_RELEASED and self.temperature is not None:
self.callback(temperature=math.floor(self.temperature * 2) / 2 + 0.5) self.callback({"temperature": math.floor(self.temperature * 2) / 2 + 0.5})
def decrease_temperature(self, event): def decrease_temperature(self, event: InputEvent) -> None:
if event.action != ACTION_RELEASED and self.temperature is not None: if event.action != ACTION_RELEASED and self.temperature is not None:
self.callback(temperature=math.ceil(self.temperature * 2) / 2 - 0.5) self.callback({"temperature": math.ceil(self.temperature * 2) / 2 - 0.5})
def next_mode(self, event): def next_mode(self, event: InputEvent) -> None:
if event.action != ACTION_RELEASED and self.mode is not None: if event.action != ACTION_RELEASED and self.mode is not None:
self.callback( self.callback(
mode=self.preset_modes[ {
(self.preset_modes.index(self.mode) + 1) % len(self.preset_modes) "mode": self.preset_modes[
(self.preset_modes.index(self.mode) + 1)
% len(self.preset_modes)
] ]
}
) )
def prev_mode(self, event): def prev_mode(self, event: InputEvent) -> None:
if event.action != ACTION_RELEASED and self.mode is not None: if event.action != ACTION_RELEASED and self.mode is not None:
self.callback( self.callback(
mode=self.preset_modes[ {
(self.preset_modes.index(self.mode) - 1) % len(self.preset_modes) "mode": self.preset_modes[
(self.preset_modes.index(self.mode) - 1)
% len(self.preset_modes)
] ]
}
) )
def toggle(self, event): def toggle(self, event: InputEvent) -> None:
if not self.switch_held and event.action == ACTION_HELD: if not self.switch_held and event.action == ACTION_HELD:
self.switch_held = True self.switch_held = True
self.callback(switch="off" if self.switch else "heat") self.callback({"switch": "off" if self.switch else "heat"})
elif self.switch_held and event.action == ACTION_RELEASED: elif self.switch_held and event.action == ACTION_RELEASED:
self.switch_held = False self.switch_held = False
def callback(self, **kwargs): def callback(self, data: dict[str, float | str | None]) -> None:
self.send_data(self.default_data | kwargs) self.send_data(self.default_data | data)