hasspy/hasspy/mqtt.py

423 lines
13 KiB
Python

import io
import json
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from subprocess import run
from threading import Thread, Timer
from typing import Any, Mapping, Tuple
from paho.mqtt.client import Client, MQTTMessage, MQTTMessageInfo
from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode
from PIL import Image
log = logging.getLogger(__name__)
class HassClient(Client):
    """Base MQTT client that announces a device to Home Assistant.

    The client publishes a discovery message, an availability message and a
    periodic JSON state message, and listens on a command topic. Subclasses
    customise behaviour by overriding ``do_command``, ``components`` and
    ``state_payload`` (all of which are no-ops / empty here).
    """

    def __init__(self, node_id: str, config: Mapping[str, Any]) -> None:
        """Configure the client and connect to the broker.

        Args:
            node_id: Identifier used as topic prefix and as device id in the
                discovery payload.
            config: Settings mapping. The keys read are ``username``,
                ``password``, ``interval`` (default 60), ``host``
                (default ``""``) and ``port`` (default 1883).
        """
        super().__init__(CallbackAPIVersion.VERSION2)
        self.node_id = node_id
        self.config = config

        username = self.config.get("username")
        if username:
            self.username_pw_set(username, self.config.get("password"))

        # Delay, in seconds, between two periodic state publications.
        self.interval = self.config.get("interval", 60)
        # Placeholder timer; the real one is armed in ``on_connect``.
        self.timer = Timer(0, self.publish_state)
        # Last cover path seen; only used by subclasses that publish covers.
        self.cover = ""

        self.connect()

    def connect(self, *args: Any, **kwargs: Any) -> MQTTErrorCode:
        """Register the "offline" last will and connect using ``self.config``.

        Positional and keyword arguments are accepted for signature
        compatibility but ignored: host and port always come from the config.
        """
        log.debug("Connecting to MQTT broker")
        self.will_set(self.availability_topic, "offline", retain=True)
        return super().connect(
            self.config.get("host", ""), self.config.get("port", 1883)
        )

    def publish(self, *args: Any, **kwargs: Any) -> MQTTMessageInfo:
        """Publish a message, defaulting to ``qos=1`` and ``retain=True``."""
        kwargs.setdefault("qos", 1)
        kwargs.setdefault("retain", True)
        return super().publish(*args, **kwargs)

    def publish_json(self, *args: Any, **kwargs: Any) -> MQTTMessageInfo:
        """Publish the ``payload`` keyword argument serialised as JSON.

        Raises:
            KeyError: If ``payload`` is not given as a keyword argument.
        """
        kwargs["payload"] = json.dumps(kwargs["payload"])
        return self.publish(*args, **kwargs)

    def publish_discovery(self) -> MQTTMessageInfo:
        """Publish ``discovery_payload`` on ``discovery_topic``."""
        log.debug("Publishing discovery message")
        return self.publish_json(self.discovery_topic, payload=self.discovery_payload)

    def publish_availability(self) -> MQTTMessageInfo:
        """Publish "online" on ``availability_topic``."""
        log.debug("Publishing availability message")
        return self.publish(self.availability_topic, payload="online")

    def init_subs(self) -> None:
        """Subscribe to the command topic and route its messages to ``on_command``."""
        log.debug("Initializing subscriptions")
        self.subscribe(self.command_topic)
        self.message_callback_add(self.command_topic, self.on_command)

    def publish_state(self) -> MQTTMessageInfo:
        """Publish ``state_payload`` and schedule the next publication.

        The next timer is armed before the payload is built, so the periodic
        cycle continues even if building or publishing the payload raises.
        """
        self.timer = Timer(self.interval, self.publish_state)
        self.timer.start()

        log.debug("Publishing state message")
        return self.publish_json(
            topic=self.state_topic,
            payload=self.state_payload,
        )

    def on_command(self, client: Client, userdata: Any, message: MQTTMessage) -> None:
        """Handle a message received on the command topic.

        The payload has the form ``CMD`` or ``CMD:VALUE``. After the command
        is dispatched, the pending state publication is replaced by one that
        fires one second later so the new state is reported promptly.
        """
        payload = message.payload.decode("utf-8")
        log.debug("Received command: %s", payload)

        # Split on the first colon only: a value that itself contains ":"
        # must not turn into extra positional arguments for do_command.
        cmd, _, value = payload.partition(":")
        self.do_command(cmd, value)

        self.timer.cancel()
        self.timer = Timer(1, self.publish_state)
        self.timer.start()

    def do_command(self, cmd: str, value: str = "") -> None:
        """Execute a command; the base implementation does nothing."""
        pass

    def on_connect(self, *args: Any, **kwargs: Any) -> None:
        """Announce the device and start the state cycle once connected."""
        log.info("Connected to MQTT broker")
        self.publish_discovery()
        self.publish_availability()
        self.init_subs()

        # This callback runs again on every reconnect, and by then
        # ``self.timer`` has already been started. Starting a thread twice
        # raises RuntimeError, so drop the old timer and arm a new one.
        self.timer.cancel()
        self.timer = Timer(0, self.publish_state)
        self.timer.start()

    @property
    def state_topic(self) -> str:
        """Topic on which the JSON state is published."""
        return f"{self.node_id}/state"

    @property
    def availability_topic(self) -> str:
        """Topic carrying the "online" / "offline" availability messages."""
        return f"{self.node_id}/availability"

    @property
    def command_topic(self) -> str:
        """Topic on which commands are received."""
        return f"{self.node_id}/set"

    @property
    def discovery_topic(self) -> str:
        """Topic on which the discovery payload is published."""
        return f"homeassistant/device/{self.node_id}/config"

    @property
    def discovery_payload(self) -> dict[str, Any]:
        """Discovery message describing the device, its origin and components."""
        return {
            "dev": {
                "ids": self.node_id,
                "name": self.node_id.capitalize(),
            },
            "o": {
                "name": "HassPy",
                "url": "https://git.edgarpierre.fr/edpibu/hasspy",
            },
            "cmps": self.components,
            "availability_topic": self.availability_topic,
            "command_topic": self.command_topic,
            "state_topic": self.state_topic,
        }

    @property
    def state_payload(self) -> dict[str, Any]:
        """State to publish; empty in the base class."""
        return {}

    @property
    def components(self) -> dict[str, dict[str, Any]]:
        """Component definitions for discovery; empty in the base class."""
        return {}
class HassSystemClient(HassClient):
commands = {
"POWER_ON": ["systemctl", "poweroff", "--when=cancel"],
"POWER_OFF": ["systemctl", "poweroff", "--when=+1m"],
"LOCK": ["loginctl", "lock-sessions"],
"SUSPEND": ["systemctl", "suspend"],
}
def do_command(self, cmd: str, value: str = "") -> None:
if cmd in self.commands:
log.debug(f"Executing command: {cmd}")
code, _ = run_command(self.commands[cmd])
if code != 0:
log.error(f"Failed to execute command: {cmd}")
match cmd:
case "POWER_OFF":
self.publish_notify("System will be shutting down in 1 minute.")
case "POWER_ON":
self.publish_notify("Shutdown cancelled.")
def publish_notify(
self, payload: str, *args: Any, **kwargs: Any
) -> MQTTMessageInfo:
kwargs.setdefault("retain", False)
return self.publish(f"{self.node_id}/user/notify", payload, *args, **kwargs)
@property
def state_topic(self) -> str:
return f"{self.node_id}/system/state"
@property
def components(self) -> dict[str, dict[str, Any]]:
return {
"power": {
"unique_id": f"{self.node_id}_power",
"p": "switch",
"name": "Power",
"icon": "mdi:power",
"payload_off": "POWER_OFF",
"payload_on": "POWER_ON",
"value_template": "{{ value_json.power }}",
},
"lock": {
"unique_id": f"{self.node_id}_lock",
"p": "button",
"name": "Lock",
"icon": "mdi:account-lock",
"payload_press": "LOCK",
},
"suspend": {
"unique_id": f"{self.node_id}_suspend",
"p": "button",
"name": "Suspend",
"icon": "mdi:sleep",
"payload_press": "SUSPEND",
},
"uptime": {
"unique_id": f"{self.node_id}_uptime",
"p": "sensor",
"name": "Uptime",
"icon": "mdi:clock",
"device_class": "timestamp",
"value_template": "{{ value_json.uptime|timestamp_utc }}",
},
}
@property
def state_payload(self) -> dict[str, Any]:
return {
"power": "POWER_OFF"
if Path("/run/systemd/shutdown/scheduled").exists()
else "POWER_ON",
"uptime": self.uptime_value,
}
@property
def uptime_value(self) -> float | None:
code, out = run_command(["uptime", "--since"])
if code != 0:
log.error("Failed to get uptime")
return None
return datetime.fromisoformat(out.strip()).astimezone(timezone.utc).timestamp()
class HassUserClient(HassClient):
    """Client exposing user-session controls: media player, volume, cover art
    and desktop notifications."""

    # Maps a command name received over MQTT to the argv that implements it.
    commands = {
        "PLAY_PAUSE": ["playerctl", "play-pause"],
        "PLAY_NEXT": ["playerctl", "next"],
        "PLAY_PREV": ["playerctl", "previous"],
        "PLAY_STOP": ["playerctl", "stop"],
    }

    # Extracts the path of a local (file://) cover from `playerctl metadata`.
    _ART_URL_RE = re.compile(r"mpris:artUrl\s+file://(.*)")

    def __init__(self, node_id: str, config: Mapping[str, Any]) -> None:
        """Initialise the client; ``node_id`` is passed on formatted as a string."""
        super().__init__(f"{node_id}", config)

    def do_command(self, cmd: str, value: str = "") -> None:
        """Run a player command, or set the volume for ``VOLUME`` with a percentage.

        Unknown commands are ignored.
        """
        if cmd in self.commands:
            log.debug("Executing command: %s", cmd)
            code, _ = run_command(self.commands[cmd])
            if code != 0:
                log.error("Failed to execute command: %s", cmd)
            return

        if cmd != "VOLUME":
            return

        log.debug("Executing command: %s:%s", cmd, value)
        try:
            # float() rather than int(): a payload such as "50.0" is still a
            # valid percentage and must not crash the MQTT callback.
            level = float(value) / 100
        except ValueError:
            log.error("Invalid volume value: %r", value)
            return

        code, _ = run_command(
            ["wpctl", "set-volume", "@DEFAULT_AUDIO_SINK@", f"{level:.2f}"]
        )
        if code != 0:
            log.error("Failed to execute command: %s:%s", cmd, value)

    def init_subs(self) -> None:
        """Add the notification subscription to the base subscriptions."""
        super().init_subs()
        self.subscribe(self.notify_topic)
        self.message_callback_add(self.notify_topic, self.on_notify)

    def on_notify(self, client: Client, userdata: Any, message: MQTTMessage) -> None:
        """Show a received notification on the desktop via ``notify-send``."""
        payload = message.payload.decode("utf-8")
        log.info("Received notification: %s", payload)

        # NOTE(review): a leftover debug print here referenced the icon
        # /usr/share/icons/hicolor/scalable/apps/fr.edgarpierre.hasspy.svg
        # while the executed command used "icon.svg"; the executed value is
        # kept. Confirm which icon path is intended.
        # "--" ends option parsing so a payload beginning with "-" is shown
        # as text instead of being interpreted as a notify-send option.
        code, _ = run_command(["notify-send", "-i", "icon.svg", "--", payload])
        if code != 0:
            log.error("Failed to send notification")

    @property
    def availability_topic(self) -> str:
        """Topic carrying this client's availability messages."""
        return f"{self.node_id}/user/availability"

    @property
    def notify_topic(self) -> str:
        """Topic on which notifications to display are received."""
        return f"{self.node_id}/user/notify"

    @property
    def state_topic(self) -> str:
        """Topic on which the user state is published."""
        return f"{self.node_id}/user/state"

    @property
    def components(self) -> dict[str, dict[str, Any]]:
        """Discovery definitions: player buttons, player sensor, cover, volume."""
        return {
            "play-pause": {
                "unique_id": f"{self.node_id}_play_pause",
                "p": "button",
                "name": "Play/Pause",
                "icon": "mdi:play-pause",
                "payload_press": "PLAY_PAUSE",
            },
            "next": {
                "unique_id": f"{self.node_id}_next",
                "p": "button",
                "name": "Next",
                "icon": "mdi:skip-next",
                "payload_press": "PLAY_NEXT",
            },
            "prev": {
                "unique_id": f"{self.node_id}_prev",
                "p": "button",
                "name": "Previous",
                "icon": "mdi:skip-previous",
                "payload_press": "PLAY_PREV",
            },
            "stop": {
                "unique_id": f"{self.node_id}_stop",
                "p": "button",
                "name": "Stop",
                "icon": "mdi:stop",
                "payload_press": "PLAY_STOP",
            },
            "player": {
                "unique_id": f"{self.node_id}_player",
                "p": "sensor",
                "name": "Player",
                "icon": "mdi:music",
                "value_template": "{{ value_json.player.value }}",
                "json_attributes_topic": self.state_topic,
                "json_attributes_template": "{{ value_json.player.attributes | to_json }}",
            },
            "cover": {
                "unique_id": f"{self.node_id}_cover",
                "p": "image",
                "name": "Cover",
                "icon": "mdi:disc-player",
                "content_type": "image/webp",
                "image_topic": self.cover_topic,
            },
            "volume": {
                "unique_id": f"{self.node_id}_volume",
                "p": "number",
                "name": "Volume",
                "icon": "mdi:volume-high",
                "command_template": "VOLUME:{{ value }}",
                "step": 10,
                "min": 0,
                "max": 100,
                "unit_of_measurement": "%",
                "value_template": "{{ value_json.volume }}",
            },
        }

    @property
    def state_payload(self) -> dict[str, Any]:
        """Current volume and player information."""
        return {
            "volume": self.volume_value,
            "player": self.player_value,
        }

    @property
    def volume_value(self) -> int | str:
        """Volume of the default sink as a percentage, or "none" on failure."""
        code, out = run_command(["wpctl", "get-volume", "@DEFAULT_AUDIO_SINK@"])
        if code != 0:
            log.error("Failed to get volume")
            return "none"

        try:
            # Only the first token after the label is the number; wpctl can
            # append a marker such as " [MUTED]" that float() would reject.
            level = float(out.split(": ", 1)[1].split()[0])
        except (IndexError, ValueError):
            log.error("Failed to parse volume: %r", out)
            return "none"
        # round() rather than int(): 0.29 * 100 is 28.999... in floating point.
        return round(level * 100)

    @property
    def player_value(self) -> str | dict[str, str | dict[str, str]]:
        """Player status plus title/album/artist attributes.

        When ``playerctl status`` fails the value is an empty string and the
        attributes are empty.
        """
        code, status = run_command(["playerctl", "status"])
        attrs: dict[str, str] = {}

        if code != 0:
            log.debug("Player is not running")
        else:
            for key in ("title", "album", "artist"):
                code, text = run_command(["playerctl", "metadata", key])
                if code == 0:
                    # Command output ends with a newline; keep only the text.
                    attrs[key] = text.strip()
                else:
                    log.error("Failed to get metadata: %s", key)

        return {
            "value": status.strip(),
            "attributes": attrs,
        }

    def publish_state(self) -> MQTTMessageInfo:
        """Publish the state and refresh the cover image in a separate thread."""
        Thread(target=self.publish_cover).start()
        return super().publish_state()

    @property
    def cover_topic(self) -> str:
        """Topic on which the cover image is published."""
        return f"{self.node_id}/image/cover"

    def publish_cover(self) -> None:
        """Publish the current track's local cover as WebP if it changed.

        Nothing is published when the metadata cannot be read, when no
        ``file://`` art URL is present, or when the cover path is the same as
        the last one handled.
        """
        from urllib.parse import unquote

        log.debug("Publishing cover image")
        code, out = run_command(["playerctl", "metadata"])
        if code != 0:
            return

        found = self._ART_URL_RE.search(out)
        if not found:
            return

        # The art location is a URL, so its path may be percent-encoded
        # (e.g. "%20" for a space); decode it before using it as a file path.
        art = unquote(found.group(1).strip())
        if art == self.cover:
            return

        # Recorded before conversion, so an unreadable file is reported once
        # rather than on every state publication.
        self.cover = art

        buffer = io.BytesIO()
        try:
            with Image.open(art) as im:
                im.save(buffer, format="webp")
        except OSError:
            log.exception("Failed to read cover image: %s", art)
            return

        self.publish(self.cover_topic, buffer.getvalue())
def run_command(cmd: list[str]) -> Tuple[int, str]:
    """Run ``cmd`` and return its exit code and decoded standard output.

    Args:
        cmd: Program and arguments, executed without a shell.

    Returns:
        ``(returncode, stdout)``. ``stdout`` is the empty string whenever the
        exit code is non-zero. If the program cannot be started at all, the
        exit code is 127 when it was not found and 126 for any other OS
        error, so callers only ever need to check for a non-zero code.
    """
    try:
        proc = run(cmd, capture_output=True)
    except FileNotFoundError:
        # Missing executable: report it as a failure instead of letting the
        # exception escape into timer threads and MQTT callbacks.
        return 127, ""
    except OSError:
        return 126, ""

    if proc.returncode != 0:
        return proc.returncode, ""

    # Undecodable bytes are replaced rather than raising UnicodeDecodeError.
    return proc.returncode, proc.stdout.decode("utf-8", errors="replace")