import json from threading import Timer import bdfparser from sense_hat import SenseHat class Display: def __init__(self, mqttc, uid, name): self.mqttc = mqttc self.sense = SenseHat() self.uid = uid self.name = name self.init_lights() self.mqttc.will_set(self.availability_topic, "offline", retain=True) @property def device(self): return { "identifiers": [self.uid], "name": self.name, } @property def availability_topic(self): return f"{self.uid}/display/availability" def init_lights(self): options = { "device": self.device, "availability_topic": self.availability_topic, } self.main_light = Light("LED", self.uid, "led", self.sense, **options) def publish_discovery(self): self.main_light.publish_discovery(self.mqttc) def publish_online(self): self.subscribe() self.mqttc.publish(self.availability_topic, "online", retain=True) def subscribe(self): self.main_light.subscribe(self.mqttc) def on_message(self, *args, **kwargs): self.main_light.on_message(*args, **kwargs) class Light: def __init__(self, name, parent_uid, slug, sense, **kwargs): self.name = name self.parent_uid = parent_uid self.slug = slug self.sense = sense self.options = kwargs self._switch = False self._color = [255, 255, 255] self._pre = "" self._value = "" self.font = bdfparser.Font("src/tom-thumb.bdf") self.timer = Timer(0, self.__init__) def publish_discovery(self, mqttc): mqttc.publish( self.discovery_topic, json.dumps( { "command_topic": self.command_topic("command"), "effect_command_topic": self.command_topic("effect"), "effect_list": ["Low Light", "Normal"], "effect_state_topie": self.state_topic, "effect_value_template": "{{ value_json.effect }}", "icon": "mdi:dots-grid", "name": self.name, "on_command_type": "brightness", "rgb_command_topic": self.command_topic("rgb"), "rgb_state_topic": self.state_topic, "rgb_value_template": "{{ value_json.rgb }}", "retain": True, "unique_id": self.uid, "state_topic": self.state_topic, "state_value_template": "{{ value_json.state }}", } | self.options ), retain=True, ) self.publish_state(mqttc) def subscribe(self, mqttc): mqttc.subscribe(self.command_topic("command")) mqttc.subscribe(self.command_topic("effect")) mqttc.subscribe(self.command_topic("rgb")) mqttc.subscribe(self.command_topic("value")) def on_message(self, client, userdata, message): data = message.payload.decode() print(data) match message.topic.rsplit("/", maxsplit=1): case [self.base_topic, "command"]: self.switch = data == "ON" case [self.base_topic, "rgb"]: self.color = list(map(int, data.split(","))) case [self.base_topic, "effect"]: self.sense.low_light = data == "Low Light" case [self.base_topic, "value"]: self.value = data case _: return self.publish_state(client) def publish_state(self, mqttc): mqttc.publish( self.state_topic, json.dumps( { "effect": "Low Light" if self.sense.low_light else "Normal", "rgb": self.rgb, "state": self.state, } ), retain=True, ) @property def uid(self): return f"{self.parent_uid}_{self.slug}" @property def discovery_topic(self): return f"homeassistant/light/{self.uid}/config" @property def base_topic(self): return f"{self.parent_uid}/display/{self.slug}" def command_topic(self, cmd): return f"{self.base_topic}/{cmd}" @property def state_topic(self): return f"{self.parent_uid}/display/{self.slug}/state" @property def switch(self): return self._switch @switch.setter def switch(self, value): self._switch = value if value: self.update_value() else: self.sense.clear() @property def state(self): return "ON" if self.switch else "OFF" @property def color(self): return self._color @color.setter def color(self, value): self._color = value if not self.switch: self.switch = True self.display_value() @property def rgb(self): return ",".join(map(str, self.color)) @property def value(self): return f"{self._pre} {self._value}" @value.setter def value(self, value): match value.split(): case [val]: self._pre = "" self._value = val case [pre, val, *_]: self._pre = pre self._value = val self.display_value() def update_value(self): if not self.switch: return if not self._pre: self.display_value() return self.timer.cancel() pixels = self.to_pixels(self._pre) self.sense.set_pixels(pixels) self.timer = Timer(1, self.display_value, kwargs=dict(timer=True)) self.timer.start() def display_value(self, timer=False): if (not timer and self.timer.is_alive()) or not self.switch: return pixels = self.to_pixels(self._value) self.sense.set_pixels(pixels) def to_pixels(self, text): if text: return [ self.color if x else [0, 0, 0] for x in self.font.draw(text).crop(8, 8, yoff=-2).todata(3) ] return [self.color] * 64