diff --git a/oin_ha/__main__.py b/oin_ha/__main__.py deleted file mode 100644 index 0dff414..0000000 --- a/oin_ha/__main__.py +++ /dev/null @@ -1,44 +0,0 @@ -from threading import Timer - -import paho.mqtt.client as mqtt - -from .display import Display -from .sensors import Sensors - -dt = 10 -mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) -hat_sensors = Sensors(mqttc, "oin", "Oin") -hat_display = Display(mqttc, "oin", "Oin") - - -@mqttc.connect_callback() -def on_connect(client, userdata, flags, reason_code, properties): - print(f"Connected with result code {reason_code}") - - hat_sensors.publish_discovery() - hat_display.publish_discovery() - - hat_sensors.publish_online() - hat_display.publish_online() - - timer = Timer(0, send_data) - timer.start() - - -@mqttc.message_callback() -def on_message(*args, **kwargs): - hat_display.on_message(*args, **kwargs) - - -mqttc.username_pw_set(username="oin", password="n+Bi58l7LxbH5nEJ") -mqttc.connect("homeassistant.local", 1883, 60) - - -def send_data(): - timer = Timer(dt, send_data) - timer.start() - - hat_sensors.publish_state() - - -mqttc.loop_forever() diff --git a/oin_ha/display/__init__.py b/oin_ha/display/__init__.py deleted file mode 100644 index fa628d3..0000000 --- a/oin_ha/display/__init__.py +++ /dev/null @@ -1,282 +0,0 @@ -import json -from threading import Timer - -import bdfparser -from sense_hat import ACTION_HELD, ACTION_RELEASED, SenseHat - - -class Display: - def __init__(self, mqttc, uid, name): - self.mqttc = mqttc - self.sense = SenseHat() - self.uid = uid - self.name = name - - self.init_lights() - self.mqttc.will_set(self.availability_topic, "offline", retain=True) - - @property - def device(self): - return { - "identifiers": [self.uid], - "name": self.name, - } - - @property - def availability_topic(self): - return f"{self.uid}/display/availability" - - def init_lights(self): - options = { - "device": self.device, - "availability_topic": self.availability_topic, - } - - self.main_light = Light( - "LED", self.uid, "led", self.sense, mqttc=self.mqttc, **options - ) - - def publish_discovery(self): - self.main_light.publish_discovery() - - def publish_online(self): - self.subscribe() - self.mqttc.publish(self.availability_topic, "online", retain=True) - - def subscribe(self): - self.main_light.subscribe() - - def on_message(self, *args, **kwargs): - self.main_light.on_message(*args, **kwargs) - - -class Light: - _colors = { - "Bleu": [0, 0, 255], - "Blanc": [255, 255, 255], - "Rouge": [255, 0, 0], - "Verte": [0, 255, 0], - "Jaune": [255, 255, 0], - } - - def __init__(self, name, parent_uid, slug, sense, mqttc, n=6, **kwargs): - self.name = name - self.parent_uid = parent_uid - self.slug = slug - self.sense = sense - self.mqttc = mqttc - self.options = kwargs - - self._switch = False - self._color = [[255, 255, 255]] * n - self._pres = [""] * n - self._values = [""] * n - self._n = n - self._i = 0 - self._toggled = False - - self.font = bdfparser.Font("src/tom-thumb.bdf") - self.timer = Timer(0, self.__init__) - - self.sense.stick.direction_right = self.switch_screen - self.sense.stick.direction_left = self.switch_screen_rev - self.sense.stick.direction_middle = self.toggle - - def publish_discovery(self): - for i in range(self._n): - self.mqttc.publish( - self.get_discovery_topic(i), - json.dumps( - { - "command_topic": self.command_topic(i, "command"), - "effect_command_topic": self.command_topic(i, "effect"), - "effect_list": ["Low Light", "Normal"], - "effect_state_topic": self.state_topic, - "effect_value_template": "{{ value_json.effect }}", - "icon": "mdi:dots-grid", - "name": f"{self.name} {i}", - "on_command_type": "brightness", - "rgb_command_topic": self.command_topic(i, "rgb"), - "rgb_state_topic": self.state_topic, - "rgb_value_template": "{{" + f"value_json.rgb[{i}]" + "}}", - "retain": True, - "unique_id": f"{self.uid}_{i}", - "state_topic": self.state_topic, - "state_value_template": "{{" + f"value_json.state[{i}]" + "}}", - } - | self.options - ), - retain=True, - ) - self.publish_state() - - def subscribe(self): - for i in range(self._n): - self.mqttc.subscribe(self.command_topic(i, "command")) - self.mqttc.subscribe(self.command_topic(i, "effect")) - self.mqttc.subscribe(self.command_topic(i, "rgb")) - self.mqttc.subscribe(self.command_topic(i, "value")) - self.mqttc.subscribe(self.command_topic(i, "action_color")) - - def on_message(self, client, userdata, message): - data = message.payload.decode() - - match message.topic.rsplit("/", maxsplit=2): - case [self.base_topic, i, "command"]: - self.switch = False - case [self.base_topic, i, "rgb"]: - self._i = int(i) - self.set_color(int(i), list(map(int, data.split(",")))) - case [self.base_topic, i, "effect"]: - self.low_light = data == "Low Light" - case [self.base_topic, i, "value"]: - self.set_value(int(i), data) - case [self.base_topic, i, "action_color"]: - self.set_color(int(i), self._colors.get(data, [0, 0, 0]), switch=False) - case _: - return - - def publish_state(self): - self.mqttc.publish( - self.state_topic, - json.dumps( - { - "effect": "Low Light" if self.low_light else "Normal", - "rgb": self.rgb, - "state": [ - self.state if i == self._i else "OFF" for i in range(self._n) - ], - } - ), - retain=True, - ) - - @property - def uid(self): - return f"{self.parent_uid}_{self.slug}" - - def get_discovery_topic(self, i): - return f"homeassistant/light/{self.uid}_{i}/config" - - @property - def base_topic(self): - return f"{self.parent_uid}/display/{self.slug}" - - def command_topic(self, i, cmd): - return f"{self.base_topic}/{i}/{cmd}" - - @property - def state_topic(self): - return f"{self.parent_uid}/display/{self.slug}/state" - - @property - def switch(self): - return self._switch - - @switch.setter - def switch(self, value): - self._switch = value - if value: - self.update_value() - else: - self.sense.clear() - self.publish_state() - - @property - def state(self): - return "ON" if self.switch else "OFF" - - @property - def color(self): - return self._color - - def set_color(self, i, value, switch=True): - self._color[i] = value - if switch and not self.switch: - self.switch = True - if switch: - self.update_value() - elif i == self._i: - self.display_value() - self.publish_state() - - @property - def rgb(self): - return [",".join(map(str, self.color[i])) for i in range(self._n)] - - @property - def value(self): - return f"{self._pres[self._i]} {self._values[self._i]}" - - @property - def low_light(self): - return self.sense.low_light - - @low_light.setter - def low_light(self, value): - self.sense.low_light = value - self.publish_state() - - def set_value(self, i, value): - match value.split(): - case [val]: - self._pres[i] = "" - self._values[i] = val - case [pre, val, *_]: - self._pres[i] = pre - self._values[i] = val - if i == self._i: - self.display_value() - self.publish_state() - - def update_value(self): - if not self.switch: - return - - if not self._pres[self._i]: - self.display_value() - return - - pixels = self.to_pixels(self._pres[self._i]) - self.sense.set_pixels(pixels) - - self.timer = Timer(0.5, self.display_value, kwargs=dict(timer=True)) - self.timer.start() - - def display_value(self, timer=False): - if (not timer and self.timer.is_alive()) or not self.switch: - return - self.timer.cancel() - - pixels = self.to_pixels(self._values[self._i]) - self.sense.set_pixels(pixels) - - def to_pixels(self, text): - if text: - return [ - self.color[self._i] if x else [0, 0, 0] - for x in self.font.draw(text).crop(8, 8).todata(3) - ] - - return [self.color[self._i]] * 64 - - def switch_screen(self, event): - if event.action == ACTION_RELEASED: - self._i = (self._i + 1) % self._n - self.switch = True - - def switch_screen_rev(self, event): - if event.action == ACTION_RELEASED: - self._i = (self._i - 1) % self._n - self.switch = True - - def toggle(self, event): - if event.action == ACTION_HELD: - if not self._toggled: - self.low_light = not self.low_light - self._toggled = True - if event.action == ACTION_RELEASED: - if self._toggled: - self._toggled = False - else: - self.switch = not self.switch diff --git a/oin_ha/sensors/__init__.py b/oin_ha/sensors/__init__.py deleted file mode 100644 index 1ca1595..0000000 --- a/oin_ha/sensors/__init__.py +++ /dev/null @@ -1,129 +0,0 @@ -import json - -from sense_hat import SenseHat - - -class Sensors: - def __init__(self, mqttc, uid, name): - self.mqttc = mqttc - self.sense = SenseHat() - self.uid = uid - self.name = name - - self.init_sensors() - self.mqttc.will_set(self.availability_topic, "offline", retain=True) - - @property - def device(self): - return { - "identifiers": [self.uid], - "name": self.name, - } - - @property - def availability_topic(self): - return f"{self.uid}/availability" - - @property - def state_topic(self): - return f"{self.uid}/state" - - def init_sensors(self): - options = { - "device": self.device, - "availability_topic": self.availability_topic, - "state_topic": self.state_topic, - "entity_category": "diagnostic", - } - - self.sensors = [ - TemperatureSensor( - "Température", - self.uid, - "temperature", - self.sense.get_temperature, - **options, - ), - HumiditySensor( - "Humidité", self.uid, "humidity", self.sense.get_humidity, **options - ), - PressureSensor( - "Pression", self.uid, "pressure", self.sense.get_pressure, **options - ), - ] - - def publish_discovery(self): - for sensor in self.sensors: - sensor.publish_discovery(self.mqttc) - - def publish_online(self): - self.mqttc.publish(self.availability_topic, "online", retain=True) - - def publish_state(self): - self.mqttc.publish( - self.state_topic, - json.dumps({sensor.slug: sensor.get_value() for sensor in self.sensors}), - ) - - -class SingleSensor: - def __init__(self, name, parent_uid, slug, get_value, **kwargs): - self.name = name - self.parent_uid = parent_uid - self.slug = slug - self.get_value = get_value - self.options = kwargs - - def publish_discovery(self, mqttc): - mqttc.publish( - self.discovery_topic, - json.dumps( - { - "name": self.name, - "state_class": "MEASUREMENT", - "unique_id": self.uid, - "value_template": self.value_template, - } - | self.options - ), - retain=True, - ) - - @property - def uid(self): - return f"{self.parent_uid}_{self.slug}" - - @property - def discovery_topic(self): - return f"homeassistant/sensor/{self.uid}/config" - - @property - def value_template(self): - return "{{" + f"value_json.{self.slug}" + "}}" - - -class TemperatureSensor(SingleSensor): - def __init__(self, name, parent_uid, slug, get_value, **kwargs): - super().__init__(name, parent_uid, slug, get_value, **kwargs) - self.options["device_class"] = "temperature" - self.options["icon"] = "mdi:thermometer" - self.options["suggested_display_precision"] = 1 - self.options["unit_of_measurement"] = "°C" - - -class HumiditySensor(SingleSensor): - def __init__(self, name, parent_uid, slug, get_value, **kwargs): - super().__init__(name, parent_uid, slug, get_value, **kwargs) - self.options["device_class"] = "humidity" - self.options["icon"] = "mdi:water-percent" - self.options["suggested_display_precision"] = 0 - self.options["unit_of_measurement"] = "%" - - -class PressureSensor(SingleSensor): - def __init__(self, name, parent_uid, slug, get_value, **kwargs): - super().__init__(name, parent_uid, slug, get_value, **kwargs) - self.options["device_class"] = "pressure" - self.options["icon"] = "mdi:gauge" - self.options["suggested_display_precision"] = 0 - self.options["unit_of_measurement"] = "mbar" diff --git a/oin_thermostat/__main__.py b/oin_thermostat/__main__.py new file mode 100644 index 0000000..853abdb --- /dev/null +++ b/oin_thermostat/__main__.py @@ -0,0 +1,15 @@ +import logging + +from .mqtt import HAClient + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + + client = HAClient( + "climate.chaudiere", + ["sensor.esptic_tempo", "sensor.rte_tempo_prochaine_couleur"], + ) + + client.connect() + + client.loop() diff --git a/oin_thermostat/mqtt.py b/oin_thermostat/mqtt.py new file mode 100644 index 0000000..7a71055 --- /dev/null +++ b/oin_thermostat/mqtt.py @@ -0,0 +1,118 @@ +import json +import logging + +import paho.mqtt.client as mqtt + +from .screen import Screen +from .select import Selector + +logger = logging.getLogger(__name__) + + +class HAClient: + def __init__(self, entity, secondary_entities=[]): + self.entity = entity + self.secondary_entities = secondary_entities + + self.state_topic = "oin/state" + self.availability_topic = "oin/availability" + + self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) + self.client.username_pw_set(username="oin", password="n+Bi58l7LxbH5nEJ") + + self.screen = Screen() + self.selector = Selector(self.send_data) + + @property + def ha_options(self): + return { + "dev": { + "ids": "oin", + "name": "Oin", + }, + "o": { + "name": "Oin", + }, + "availability_topic": self.availability_topic, + "state_topic": self.state_topic, + "cmps": self.selector.ha_options, + } + + def connect(self): + logger.debug("Connecting to HA...") + self.client.will_set(self.availability_topic, "offline", retain=True) + self.client.connect("homeassistant.local", 1883, 60) + + self.subscribe(entity_topic(self.entity), self.state_update) + for entity in self.secondary_entities: + self.subscribe(entity_topic(entity, "state"), self.secondary_state_update) + + self.publish("homeassistant/device/oin/config", self.ha_options, retain=True) + self.client.publish(self.availability_topic, "online", retain=True) + + def publish(self, topic, data, **kwargs): + logger.debug(f"Sending message on topic <{topic}>: {json.dumps(data)}") + self.client.publish(topic, json.dumps(data), **kwargs) + + def subscribe(self, topic, callback): + logger.debug(f"Subscribe to <{topic}>") + self.client.subscribe(topic) + self.client.message_callback_add(topic, callback) + + def unsubscribe(self, topic): + logger.debug(f"Unsubscribe from <{topic}>") + self.client.unsubscribe(topic) + + def loop(self): + logger.info("Starting MQTT client loop") + self.client.loop_forever() + + def state_update(self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage): + logger.debug(f"Message received on topic <{message.topic}>: {message.payload}.") + + subtopic = message.topic.rsplit("/", maxsplit=1)[1] + + match subtopic: + case "current_temperature": + self.screen.value = parse(message) + case "temperature": + if (value := parse(message)) != self.selector.temperature: + self.screen.tmp_value = value + self.selector.temperature = value + case "hvac_action": + self.screen.mode = parse(message) + case "preset_modes": + if (value := parse(message)) != self.selector.preset_modes: + self.selector.preset_modes = value + case "preset_mode": + if (value := parse(message)) != self.selector.mode: + self.selector.mode = value + case "state": + match message.payload.decode(): + case "heat": + self.selector.switch = True + case "off": + self.selector.switch = False + + def secondary_state_update( + self, client: mqtt.Client, userdata, message: mqtt.MQTTMessage + ): + logger.debug(f"Message received on topic <{message.topic}>: {message.payload}.") + + _, grp, ent, subtopic = message.topic.split("/") + idx = self.secondary_entities.index(f"{grp}.{ent}") + + if subtopic == "state": + self.screen.secondary |= {idx: message.payload.decode()} + + def send_data(self, data): + self.publish(self.state_topic, data) + + +def parse(message): + return json.loads(message.payload.decode()) + + +def entity_topic(entity, subtopic="#"): + topic = entity.replace(".", "/") + return f"homeassistant/{topic}/{subtopic}" diff --git a/oin_thermostat/screen.py b/oin_thermostat/screen.py new file mode 100644 index 0000000..9a32a95 --- /dev/null +++ b/oin_thermostat/screen.py @@ -0,0 +1,140 @@ +import logging +import math +from threading import Timer + +import bdfparser +from sense_hat import SenseHat + +logger = logging.getLogger(__name__) + +COLORS = { + "Bleu": [0, 0, 255], + "Blanc": [0, 0, 0], + "Rouge": [255, 0, 0], + "Verte": [0, 127, 31], + "Jaune": [255, 255, 0], + "heat": [255, 0, 0], + "heating": [255, 0, 0], + "idle": [127, 0, 255], + "off": [127, 127, 127], + "on_setting": [255, 255, 0], + "off_setting": [255, 255, 255], + None: [0, 0, 0], +} + + +class Screen: + def __init__(self): + self.sense = SenseHat() + self._value = "" + self._tmp = False + self._tmp_value = None + self._mode = None + self.font = bdfparser.Font("src/tom-thumb.bdf") + self._secondary = dict() + self._secondary_pixels = [[0, 0, 0]] * 8 + + self.timer = Timer(0, self.set_pixels) + + self._held = False + self.sense.stick.direction_middle = self.stick_click + + @property + def value(self): + return self._value + + @value.setter + def value(self, value): + logger.debug(f"Updated value: <{value}>") + + self._value = format_value(value) + if not self._tmp: + self.set_pixels() + + @property + def color(self): + return COLORS.get(self.mode, [0, 0, 0]) + + @property + def mode(self): + return self._mode + + @mode.setter + def mode(self, value): + self._mode = value + if not self._tmp: + self.set_pixels() + + @property + def tmp_value(self): + return self._tmp_value + + @tmp_value.setter + def tmp_value(self, value): + logger.debug(f"Show value: <{value}>") + + self.timer.cancel() + self._tmp_value = format_value(value) + self.show_tmp() + + def show_tmp(self): + self._tmp = True + self.set_pixels( + self.tmp_value, + color=COLORS.get("off_setting" if self.mode == "off" else "on_setting"), + ) + self.timer = Timer(3, self.set_pixels) + self.timer.start() + + def set_pixels(self, value=None, color=None, bg_color=[0, 0, 0]): + if value is None: + value = self.value + self._tmp = False + if color is None: + color = self.color + + if value: + pixels = [ + color if x else bg_color + for x in self.font.draw(value, mode=0).crop(8, 7).todata(3) + ] + else: + pixels = 48 * [[0, 0, 0]] + pixels += self.secondary_pixels + + self.sense.set_pixels(pixels) + + @property + def secondary(self): + return self._secondary + + @property + def secondary_pixels(self): + return self._secondary_pixels + + @secondary.setter + def secondary(self, value): + self._secondary = value + + for idx in range(2): + self._secondary_pixels[4 * idx : 4 * (idx + 1)] = [ + COLORS.get(value.get(idx, None), [0, 0, 0]) + ] * 4 + + if not self._tmp: + self.set_pixels() + + def stick_click(self, event): + match (event.action, self._held): + case ("held", False): + self._held = True + case ("released", True): + self._held = False + case ("released", False): + self.show_tmp() + + +def format_value(value): + v = math.trunc(value) + d = "." if (value - v) >= 0.5 else "" + return f"{v}{d}" diff --git a/oin_thermostat/select.py b/oin_thermostat/select.py new file mode 100644 index 0000000..a6c1976 --- /dev/null +++ b/oin_thermostat/select.py @@ -0,0 +1,90 @@ +import logging +import math + +from sense_hat import ACTION_HELD, ACTION_RELEASED, SenseHat + +logger = logging.getLogger(__name__) + + +class Selector: + def __init__(self, send_data): + self.stick = SenseHat().stick + self.temperature = None + self.mode = None + self.switch = None + self.preset_modes = [] + self.send_data = send_data + self.switch_held = False + self.default_data = {"temperature": None, "mode": None, "switch": None} + + self.stick.direction_middle = self.toggle + + self.stick.direction_up = self.increase_temperature + self.stick.direction_down = self.decrease_temperature + + self.stick.direction_right = self.next_mode + self.stick.direction_left = self.prev_mode + + @property + def ha_options(self): + return { + "temperature": { + "p": "sensor", + "value_template": "{{ value_json.temperature }}", + "unique_id": "oin_temp", + "device_class": "temperature", + "entity_category": "diagnostic", + "icon": "mdi:thermostat", + "unit_of_measurement": "°C", + }, + "mode": { + "p": "sensor", + "name": "Mode", + "value_template": "{{ value_json.mode }}", + "unique_id": "oin_mode", + "entity_category": "diagnostic", + "icon": "mdi:thermostat-auto", + }, + "switch": { + "p": "sensor", + "name": "Switch", + "value_template": "{{ value_json.switch }}", + "unique_id": "oin_switch", + "entity_category": "diagnostic", + "icon": "mdi:thermostat-auto", + }, + } + + def increase_temperature(self, event): + if event.action != ACTION_RELEASED and self.temperature is not None: + self.callback(temperature=math.floor(self.temperature * 2) / 2 + 0.5) + + def decrease_temperature(self, event): + if event.action != ACTION_RELEASED and self.temperature is not None: + self.callback(temperature=math.ceil(self.temperature * 2) / 2 - 0.5) + + def next_mode(self, event): + if event.action != ACTION_RELEASED and self.mode is not None: + self.callback( + mode=self.preset_modes[ + (self.preset_modes.index(self.mode) + 1) % len(self.preset_modes) + ] + ) + + def prev_mode(self, event): + if event.action != ACTION_RELEASED and self.mode is not None: + self.callback( + mode=self.preset_modes[ + (self.preset_modes.index(self.mode) - 1) % len(self.preset_modes) + ] + ) + + def toggle(self, event): + if not self.switch_held and event.action == ACTION_HELD: + self.switch_held = True + self.callback(switch="off" if self.switch else "heat") + elif self.switch_held and event.action == ACTION_RELEASED: + self.switch_held = False + + def callback(self, **kwargs): + self.send_data(self.default_data | kwargs) diff --git a/oin_thermostat/src/tom-thumb.bdf b/oin_thermostat/src/tom-thumb.bdf new file mode 100644 index 0000000..e69de29