hasspy/hasspy/mqtt.py
Edgar P. Burkhart e54223e361
feat: Add cover art support for user client
This commit adds support for displaying cover art from the currently playing media in the Home Assistant user client.

- The `HassUserClient` now publishes cover art to the `user/image/cover` MQTT topic.
- The `publish_cover` method uses `playerctl` to retrieve the art URL and `pillow` to convert the image to WebP format before publishing.
- The `components` property now includes an `image` component for cover art.
- The `cover` attribute on `HassUserClient` is used to avoid resending the same cover multiple times.
- The `publish_state` method has been extended to spawn a thread for updating the cover.
2025-03-09 18:51:38 +01:00

337 lines
10 KiB
Python

import io
import json
import logging
import re
from subprocess import run
from threading import Thread, Timer
from typing import Any, Mapping
from urllib.parse import unquote

from paho.mqtt.client import Client, MQTTMessage, MQTTMessageInfo
from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode
from PIL import Image
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class HassClient(Client):
    """Base MQTT client that publishes discovery, availability and state.

    Subclasses describe their entities through ``components``, report their
    values through ``state_payload`` and react to incoming commands in
    ``do_command``. State is re-published periodically by a ``Timer`` that
    re-arms itself in ``publish_state``.
    """

    def __init__(self, node_id: str, config: Mapping[str, Any]) -> None:
        """Store the configuration and connect to the broker.

        Args:
            node_id: Identifier used to build every topic of this client.
            config: Settings mapping. The keys read by this class are
                ``username``, ``password``, ``interval``, ``host`` and
                ``port``.
        """
        super().__init__(CallbackAPIVersion.VERSION2)

        self.node_id = node_id
        self.config = config

        username = self.config.get("username")
        if username:
            self.username_pw_set(username, self.config.get("password"))

        # Seconds between two periodic state publications.
        self.interval = self.config.get("interval", 60)
        self.power_on = True
        # Created here but only started from on_connect.
        self.timer = Timer(0, self.publish_state)
        self.cover = ""

        self.connect()

    def connect(self, *args: Any, **kwargs: Any) -> MQTTErrorCode:
        """Register the "offline" last will and connect to the broker.

        The positional and keyword arguments are accepted but not used; host
        and port always come from ``self.config``.
        """
        log.debug("Connecting to MQTT broker")
        self.will_set(self.availability_topic, "offline", retain=True)
        return super().connect(
            self.config.get("host", ""), self.config.get("port", 1883)
        )

    def publish(self, *args: Any, **kwargs: Any) -> MQTTMessageInfo:
        """Publish a message, defaulting to ``qos=1`` and ``retain=True``."""
        kwargs.setdefault("qos", 1)
        kwargs.setdefault("retain", True)
        return super().publish(*args, **kwargs)

    def publish_json(self, *args: Any, **kwargs: Any) -> MQTTMessageInfo:
        """Publish a message whose payload is serialised to JSON first.

        The payload may be given as the ``payload`` keyword or as the second
        positional argument (after the topic).

        Raises:
            KeyError: If no payload is supplied at all.
        """
        if len(args) > 1 and "payload" not in kwargs:
            # Payload passed positionally: serialise it in place.
            args = (args[0], json.dumps(args[1]), *args[2:])
        else:
            kwargs["payload"] = json.dumps(kwargs["payload"])
        return self.publish(*args, **kwargs)

    def publish_discovery(self) -> MQTTMessageInfo:
        """Publish ``discovery_payload`` on ``discovery_topic``."""
        log.debug("Publishing discovery message")
        return self.publish_json(self.discovery_topic, payload=self.discovery_payload)

    def publish_availability(self) -> MQTTMessageInfo:
        """Publish "online" on ``availability_topic``."""
        log.debug("Publishing availability message")
        return self.publish(self.availability_topic, payload="online")

    def init_subs(self) -> None:
        """Subscribe to ``command_topic`` and route its messages to ``on_command``."""
        log.debug("Initializing subscriptions")
        self.subscribe(self.command_topic)
        self.message_callback_add(self.command_topic, self.on_command)

    def publish_state(self) -> MQTTMessageInfo:
        """Publish ``state_payload`` and schedule the next publication."""
        # Re-arm first so the periodic cycle continues even if building or
        # publishing the payload below raises.
        self.timer = Timer(self.interval, self.publish_state)
        self.timer.start()

        log.debug("Publishing state message")
        return self.publish_json(
            topic=self.state_topic,
            payload=self.state_payload,
        )

    def on_command(self, client: Client, userdata: Any, message: MQTTMessage) -> None:
        """Handle a ``COMMAND`` or ``COMMAND:value`` message.

        After dispatching to ``do_command`` the pending state publication is
        replaced by one that fires after one second.
        """
        payload = message.payload.decode("utf-8")
        log.debug(f"Received command: {payload}")

        # Split on the first colon only: do_command takes at most two
        # arguments, so extra colons must stay inside the value.
        self.do_command(*payload.split(":", 1))

        self.timer.cancel()
        self.timer = Timer(1, self.publish_state)
        self.timer.start()

    def do_command(self, cmd: str, value: str = "") -> None:
        """Execute a command; the base implementation does nothing."""
        pass

    def on_connect(self, *args: Any, **kwargs: Any) -> None:
        """Announce the device, subscribe, and start publishing state."""
        log.info("Connected to MQTT broker")
        self.publish_discovery()
        self.publish_availability()
        self.init_subs()

        # A Timer can be started only once. If this callback runs again (for
        # example after the connection is re-established), self.timer is the
        # already-started timer created by publish_state, so replace it
        # instead of calling start() on it a second time.
        self.timer.cancel()
        self.timer = Timer(0, self.publish_state)
        self.timer.start()

    @property
    def state_topic(self) -> str:
        """Topic on which the JSON state is published."""
        return f"{self.node_id}/state"

    @property
    def availability_topic(self) -> str:
        """Topic carrying "online" / "offline"."""
        return f"{self.node_id}/availability"

    @property
    def command_topic(self) -> str:
        """Topic on which commands are received."""
        return f"{self.node_id}/set"

    @property
    def discovery_topic(self) -> str:
        """Topic on which the discovery payload is published."""
        return f"homeassistant/device/{self.node_id}/config"

    @property
    def discovery_payload(self) -> dict[str, Any]:
        """Discovery document: device, origin, components and shared topics."""
        return {
            "dev": {
                "ids": self.node_id,
                "name": self.node_id.capitalize(),
            },
            "o": {
                "name": "HassPy",
                "url": "https://git.edgarpierre.fr/edpibu/hasspy",
            },
            "cmps": self.components,
            "availability_topic": self.availability_topic,
            "command_topic": self.command_topic,
            "state_topic": self.state_topic,
        }

    @property
    def state_payload(self) -> dict[str, Any]:
        """State values to publish; empty in the base class."""
        return {}

    @property
    def components(self) -> dict[str, dict[str, Any]]:
        """Component definitions for discovery; empty in the base class."""
        return {}
class HassSystemClient(HassClient):
    """Client exposing a power switch and a session-lock button."""

    # Command name -> argv handed to run_command.
    commands = {
        "POWER_ON": ["systemctl", "poweroff", "--when=cancel"],
        "POWER_OFF": ["systemctl", "poweroff", "--when=+1m"],
        "LOCK": ["loginctl", "lock-sessions"],
    }

    def do_command(self, cmd: str, value: str = "") -> None:
        """Run the program mapped to *cmd* and track the power switch state.

        Args:
            cmd: Command name; names missing from ``commands`` run nothing.
            value: Unused by this client.
        """
        is_known = cmd in self.commands
        if is_known:
            log.debug(f"Executing command: {cmd}")
            run_command(self.commands[cmd])

        # Only the two power commands change the reported switch state.
        if cmd in ("POWER_ON", "POWER_OFF"):
            self.power_on = cmd == "POWER_ON"

    @property
    def components(self) -> dict[str, dict[str, Any]]:
        """Discovery definitions for the power switch and the lock button."""
        power_switch: dict[str, Any] = {
            "unique_id": f"{self.node_id}_power",
            "p": "switch",
            "name": "Power",
            "icon": "mdi:power",
            "payload_off": "POWER_OFF",
            "payload_on": "POWER_ON",
            "value_template": "{{ value_json.power }}",
        }
        lock_button: dict[str, Any] = {
            "unique_id": f"{self.node_id}_lock",
            "p": "button",
            "name": "Lock",
            "icon": "mdi:account-lock",
            "payload_press": "LOCK",
        }
        return {"power": power_switch, "lock": lock_button}

    @property
    def state_payload(self) -> dict[str, Any]:
        """Current power state, expressed as the matching switch payload."""
        if self.power_on:
            power = "POWER_ON"
        else:
            power = "POWER_OFF"
        return {"power": power}
class HassUserClient(HassClient):
    """Client exposing media-player controls, volume and cover art."""

    # Command name -> argv handed to run_command.
    commands = {
        "PLAY_PAUSE": ["playerctl", "play-pause"],
        "PLAY_NEXT": ["playerctl", "next"],
        "PLAY_PREV": ["playerctl", "previous"],
        "PLAY_STOP": ["playerctl", "stop"],
    }

    # Captures the path of a local-file art URL in `playerctl metadata` output.
    _ART_URL_RE = re.compile(r"mpris:artUrl\s+file://(.*)")
    # Captures the number after "Volume:" and ignores anything following it
    # (wpctl may append a marker such as " [MUTED]").
    _VOLUME_RE = re.compile(r"Volume:\s*(\d+(?:\.\d+)?)")

    def __init__(self, node_id: str, config: Mapping[str, Any]) -> None:
        """Initialise the base client with *node_id* formatted as a string."""
        super().__init__(f"{node_id}", config)

    def do_command(self, cmd: str, value: str = "") -> None:
        """Run a playback command or set the volume.

        Args:
            cmd: A key of ``commands``, or ``"VOLUME"``.
            value: For ``"VOLUME"``, the target volume as an integer
                percentage; ignored otherwise.
        """
        if cmd in self.commands:
            log.debug(f"Executing command: {cmd}")
            run_command(self.commands[cmd])

        if cmd == "VOLUME":
            try:
                percent = int(value)
            except ValueError:
                # A malformed payload must not raise out of the MQTT callback.
                log.warning(f"Ignoring invalid volume value: {value!r}")
                return

            log.debug(f"Executing command: {cmd}:{value}")
            run_command(
                [
                    "wpctl",
                    "set-volume",
                    "@DEFAULT_AUDIO_SINK@",
                    f"{percent / 100:.2f}",
                ]
            )

    @property
    def availability_topic(self) -> str:
        """Availability topic specific to the user client."""
        return f"{self.node_id}/user/availability"

    @property
    def components(self) -> dict[str, dict[str, Any]]:
        """Discovery definitions: playback buttons, player sensor, cover, volume."""
        return {
            "play-pause": {
                "unique_id": f"{self.node_id}_play_pause",
                "p": "button",
                "name": "Play/Pause",
                "icon": "mdi:play-pause",
                "payload_press": "PLAY_PAUSE",
            },
            "next": {
                "unique_id": f"{self.node_id}_next",
                "p": "button",
                "name": "Next",
                "icon": "mdi:skip-next",
                "payload_press": "PLAY_NEXT",
            },
            "prev": {
                "unique_id": f"{self.node_id}_prev",
                "p": "button",
                "name": "Previous",
                "icon": "mdi:skip-previous",
                "payload_press": "PLAY_PREV",
            },
            "stop": {
                "unique_id": f"{self.node_id}_stop",
                "p": "button",
                "name": "Stop",
                "icon": "mdi:stop",
                "payload_press": "PLAY_STOP",
            },
            "player": {
                "unique_id": f"{self.node_id}_player",
                "p": "sensor",
                "name": "Player",
                "icon": "mdi:music",
                "value_template": "{{ value_json.player.value }}",
                "json_attributes_topic": self.state_topic,
                "json_attributes_template": "{{ value_json.player.attributes | to_json }}",
            },
            "cover": {
                "unique_id": f"{self.node_id}_cover",
                "p": "image",
                "name": "Cover",
                "icon": "mdi:disc-player",
                "content_type": "image/webp",
                "image_topic": self.cover_topic,
            },
            "volume": {
                "unique_id": f"{self.node_id}_volume",
                "p": "number",
                "name": "Volume",
                "icon": "mdi:volume-high",
                "command_template": "VOLUME:{{ value }}",
                "step": 10,
                "min": 0,
                "max": 100,
                "unit_of_measurement": "%",
                "value_template": "{{ value_json.volume }}",
            },
        }

    @property
    def state_payload(self) -> dict[str, Any]:
        """Current volume and player information."""
        return {
            "volume": self.volume_value,
            "player": self.player_value,
        }

    @property
    def volume_value(self) -> int | None:
        """Volume of the default audio sink as a percentage.

        Returns:
            The volume rounded to the nearest integer percent, or ``None``
            (serialised as JSON ``null``) when the ``wpctl`` output cannot be
            parsed, e.g. because the command failed.
        """
        vol = run_command(["wpctl", "get-volume", "@DEFAULT_AUDIO_SINK@"])
        found = self._VOLUME_RE.search(vol)
        if not found:
            log.warning(f"Could not parse volume from: {vol!r}")
            return None
        # round() rather than int(): 0.57 * 100 is 56.99999999999999 in binary
        # floating point and would otherwise be truncated to 56.
        return round(float(found.group(1)) * 100)

    @property
    def player_value(self) -> str | dict[str, str | dict[str, str]]:
        """Playback status plus title, album and artist of the current track.

        Surrounding whitespace, including the newline that ends the command
        output, is stripped from every value.
        """
        return {
            "value": run_command(["playerctl", "status"]).strip(),
            "attributes": {
                k: run_command(["playerctl", "metadata", k]).strip()
                for k in ["title", "album", "artist"]
            },
        }

    def publish_state(self) -> MQTTMessageInfo:
        """Publish state and refresh the cover image in a separate thread."""
        Thread(target=self.publish_cover).start()
        return super().publish_state()

    @property
    def cover_topic(self) -> str:
        """Topic on which the cover image bytes are published."""
        return f"{self.node_id}/image/cover"

    def publish_cover(self) -> None:
        """Publish the current cover art as WebP if it changed.

        Only ``file://`` art URLs are handled. The path is remembered in
        ``self.cover`` once the image has been converted and handed to
        ``publish``, so a cover that failed to convert is attempted again on
        the next call.
        """
        log.debug("Publishing cover image")

        out = run_command(["playerctl", "metadata"])
        found = self._ART_URL_RE.search(out)
        if not found:
            return

        # The URL path is percent-encoded (e.g. "%20" for a space); decode it
        # to obtain the real filesystem path.
        art = unquote(found.group(1))
        if art == self.cover:
            return

        buffer = io.BytesIO()
        try:
            with Image.open(art) as im:
                im.save(buffer, format="webp")
        except (OSError, ValueError) as err:
            # Missing, unreadable or unsupported image: log instead of letting
            # the exception escape the worker thread.
            log.error(f"Failed to convert cover image {art!r}: {err}")
            return

        self.publish(self.cover_topic, buffer.getvalue())
        self.cover = art
def run_command(cmd: list[str]) -> str:
    """Run *cmd* and return its standard output as text.

    Args:
        cmd: Program and arguments, executed without a shell.

    Returns:
        The decoded standard output, or the literal string ``"null"`` when
        the program cannot be started or exits with a non-zero status.
    """
    try:
        proc = run(cmd, capture_output=True)
    except OSError:
        # Raised when the program cannot be launched at all, e.g. because the
        # executable is not installed; report it like any other failure.
        log.error(f"Failed to execute command: {cmd}")
        return "null"

    if proc.returncode != 0:
        log.error(f"Failed to execute command: {cmd}")
        return "null"

    # Replace undecodable bytes instead of raising on non-UTF-8 output.
    return proc.stdout.decode("utf-8", errors="replace")