import json from threading import Timer import bdfparser from sense_hat import ACTION_RELEASED, SenseHat class Display: def __init__(self, mqttc, uid, name): self.mqttc = mqttc self.sense = SenseHat() self.uid = uid self.name = name self.init_lights() self.mqttc.will_set(self.availability_topic, "offline", retain=True) @property def device(self): return { "identifiers": [self.uid], "name": self.name, } @property def availability_topic(self): return f"{self.uid}/display/availability" def init_lights(self): options = { "device": self.device, "availability_topic": self.availability_topic, } self.main_light = Light("LED", self.uid, "led", self.sense, **options) def publish_discovery(self): self.main_light.publish_discovery(self.mqttc) def publish_online(self): self.subscribe() self.mqttc.publish(self.availability_topic, "online", retain=True) def subscribe(self): self.main_light.subscribe(self.mqttc) def on_message(self, *args, **kwargs): self.main_light.on_message(*args, **kwargs) class Light: def __init__(self, name, parent_uid, slug, sense, n=3, **kwargs): self.name = name self.parent_uid = parent_uid self.slug = slug self.sense = sense self.options = kwargs self._switch = False self._color = [[255, 255, 255]] * n self._pres = [""] * n self._values = [""] * n self._n = n self._i = 0 self.font = bdfparser.Font("src/tom-thumb.bdf") self.timer = Timer(0, self.__init__) self.sense.stick.direction_right = self.switch_screen self.sense.stick.direction_left = self.switch_screen_rev def publish_discovery(self, mqttc): for i in range(self._n): mqttc.publish( self.get_discovery_topic(i), json.dumps( { "command_topic": self.command_topic(i, "command"), "effect_command_topic": self.command_topic(i, "effect"), "effect_list": ["Low Light", "Normal"], "effect_state_topie": self.state_topic, "effect_value_template": "{{ value_json.effect }}", "icon": "mdi:dots-grid", "name": f"{self.name} {i}", "on_command_type": "brightness", "rgb_command_topic": self.command_topic(i, "command"), "rgb_state_topic": self.state_topic, "rgb_value_template": "{{" + f"value_json.rgb[{i}]" + "}}", "retain": True, "unique_id": f"{self.uid}_{i}", "state_topic": self.state_topic, "state_value_template": "{{ value_json.state }}", } | self.options ), retain=True, ) self.publish_state(mqttc) def subscribe(self, mqttc): for i in range(self._n): mqttc.subscribe(self.command_topic(i, "command")) mqttc.subscribe(self.command_topic(i, "effect")) mqttc.subscribe(self.command_topic(i, "rgb")) mqttc.subscribe(self.command_topic(i, "value")) def on_message(self, client, userdata, message): data = message.payload.decode() print(message.topic) print(data) match message.topic.rsplit("/", maxsplit=2): case [self.base_topic, i, "command"]: match data.split(","): case ["OFF"]: self.switch = False case [*rgb]: self.set_color(int(i), list(map(int, rgb))) case [self.base_topic, i, "effect"]: self.sense.low_light = data == "Low Light" case [self.base_topic, i, "value"]: self.set_value(int(i), data) case _: return self.publish_state(client) def publish_state(self, mqttc): mqttc.publish( self.state_topic, json.dumps( { "effect": "Low Light" if self.sense.low_light else "Normal", "rgb": self.rgb, "state": self.state, } ), retain=True, ) @property def uid(self): return f"{self.parent_uid}_{self.slug}" def get_discovery_topic(self, i): return f"homeassistant/light/{self.uid}_{i}/config" @property def base_topic(self): return f"{self.parent_uid}/display/{self.slug}" def command_topic(self, i, cmd): return f"{self.base_topic}/{i}/{cmd}" @property def state_topic(self): return f"{self.parent_uid}/display/{self.slug}/state" @property def switch(self): return self._switch @switch.setter def switch(self, value): self._switch = value if value: self.update_value() else: self.sense.clear() @property def state(self): return "ON" if self.switch else "OFF" @property def color(self): return self._color def set_color(self, i, value): self._color[i] = value if not self.switch: self.switch = True if i == self._i: self.display_value() @property def rgb(self): return [",".join(map(str, self.color[i])) for i in range(self._n)] @property def value(self): return f"{self._pres[self._i]} {self._values[self._i]}" def set_value(self, i, value): match value.split(): case [val]: self._pres[i] = "" self._values[i] = val case [pre, val, *_]: self._pres[i] = pre self._values[i] = val if i == self._i: self.display_value() def update_value(self): if not self.switch: return if not self._pres[self._i]: self.display_value() return self.timer.cancel() pixels = self.to_pixels(self._pres[self._i]) print(pixels) self.sense.set_pixels(pixels) self.timer = Timer(0.25, self.display_value, kwargs=dict(timer=True)) self.timer.start() def display_value(self, timer=False): if (not timer and self.timer.is_alive()) or not self.switch: return pixels = self.to_pixels(self._values[self._i]) self.sense.set_pixels(pixels) def to_pixels(self, text): if text: return [ self.color[self._i] if x else [0, 0, 0] for x in self.font.draw(text).crop(8, 8, yoff=-2).todata(3) ] return [self.color[self._i]] * 64 def switch_screen(self, event): if event.action == ACTION_RELEASED: self._i = (self._i + 1) % self._n self.update_value() print(f"switch to {self._i}") def switch_screen_rev(self, event): if event.action == ACTION_RELEASED: self._i = (self._i - 1) % self._n self.update_value() print(f"switch to {self._i}")