hasspy/hasspy/mqtt.py

196 lines
6 KiB
Python

import getpass
import json
import logging
from subprocess import run
from typing import Any, Mapping
from paho.mqtt.client import Client, MQTTMessage, MQTTMessageInfo
from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode
# Module-level logger, named after this module; used by every client class below.
log = logging.getLogger(__name__)
class HassClient(Client):
    """Base MQTT client that exposes a node to Home Assistant via discovery.

    On construction the client configures optional credentials from
    ``config`` and immediately connects to the broker.  Once connected it
    publishes a discovery message, marks itself online, subscribes to its
    command topic and publishes its current state.

    Subclasses customise behaviour by overriding :meth:`do_command`,
    :attr:`components` and :attr:`state_payload`.
    """

    def __init__(self, node_id: str, config: Mapping[str, Any]) -> None:
        """Create the client and connect to the broker.

        Args:
            node_id: Identifier used as the prefix of every topic and as the
                device id/name in the discovery payload.
            config: Connection settings.  The keys read are ``username``,
                ``password``, ``host`` and ``port``.
        """
        super().__init__(CallbackAPIVersion.VERSION2)
        self.node_id = node_id
        self.config = config
        username = self.config.get("username")
        if username:
            self.username_pw_set(username, self.config.get("password"))
        # Tracked here (rather than in a subclass) so it exists before the
        # first state publication triggered by connecting.
        self.power_on = True
        self.connect()

    def connect(self, *args: Any, **kwargs: Any) -> MQTTErrorCode:
        """Connect to the broker named in ``config``.

        Positional and keyword arguments are accepted only for signature
        compatibility and are ignored: host and port always come from
        ``config`` (defaulting to ``""`` and ``1883``).  A retained
        ``"offline"`` will is registered on the availability topic first.
        """
        log.debug("Connecting to MQTT broker")
        self.will_set(self.availability_topic, "offline", retain=True)
        return super().connect(
            self.config.get("host", ""), self.config.get("port", 1883)
        )

    def publish(self, *args: Any, **kwargs: Any) -> MQTTMessageInfo:
        """Publish a message, defaulting to ``qos=1`` and ``retain=True``.

        The defaults are only injected as keywords when the caller did not
        already supply the value positionally (``topic, payload, qos,
        retain``); otherwise the parent would reject the duplicate argument.
        """
        if len(args) < 3:
            kwargs.setdefault("qos", 1)
        if len(args) < 4:
            kwargs.setdefault("retain", True)
        return super().publish(*args, **kwargs)

    def publish_json(self, *args: Any, **kwargs: Any) -> MQTTMessageInfo:
        """Publish ``payload`` serialised as JSON.

        The payload may be given as the ``payload`` keyword or as the second
        positional argument.

        Raises:
            KeyError: If no payload was supplied at all.
        """
        if len(args) >= 2 and "payload" not in kwargs:
            topic, payload, *rest = args
            return self.publish(topic, json.dumps(payload), *rest, **kwargs)
        kwargs["payload"] = json.dumps(kwargs["payload"])
        return self.publish(*args, **kwargs)

    def publish_discovery(self) -> MQTTMessageInfo:
        """Publish the discovery payload on the discovery topic."""
        log.debug("Publishing discovery message")
        return self.publish_json(self.discovery_topic, payload=self.discovery_payload)

    def publish_availability(self) -> MQTTMessageInfo:
        """Publish ``"online"`` on the availability topic."""
        log.debug("Publishing availability message")
        return self.publish(self.availability_topic, payload="online")

    def init_subs(self) -> None:
        """Subscribe to the command topic and route it to :meth:`on_command`."""
        log.debug("Initializing subscriptions")
        self.subscribe(self.command_topic)
        self.message_callback_add(self.command_topic, self.on_command)

    def publish_state(self) -> MQTTMessageInfo:
        """Publish :attr:`state_payload` as JSON on the state topic."""
        return self.publish_json(
            topic=self.state_topic,
            payload=self.state_payload,
        )

    def on_command(self, client: Client, userdata: Any, message: MQTTMessage) -> None:
        """Handle a message received on the command topic.

        The payload is decoded as UTF-8, passed to :meth:`do_command`, and the
        resulting state is published afterwards.
        """
        # The payload comes from the network: replace undecodable bytes
        # instead of raising inside the MQTT callback.  A mangled command
        # simply will not match any known command.
        payload = message.payload.decode("utf-8", errors="replace")
        log.debug("Received command: %s", payload)
        self.do_command(payload)
        self.publish_state()

    def do_command(self, payload: str) -> None:
        """Execute a command; the base implementation only logs it."""
        log.debug("Executing command: %s", payload)

    def on_connect(self, *args: Any, **kwargs: Any) -> None:
        """Announce the node once the broker has answered the connect request.

        With the version-2 callback API the callback receives
        ``(client, userdata, flags, reason_code, properties)`` and is invoked
        for refused connections as well, so the reason code is checked before
        anything is published.
        """
        reason_code = args[3] if len(args) > 3 else kwargs.get("reason_code")
        if getattr(reason_code, "is_failure", False):
            log.error("MQTT broker refused the connection: %s", reason_code)
            return
        log.info("Connected to MQTT broker")
        self.publish_discovery()
        self.publish_availability()
        self.init_subs()
        self.publish_state()

    @property
    def state_topic(self) -> str:
        """Topic on which the state payload is published."""
        return f"{self.node_id}/state"

    @property
    def availability_topic(self) -> str:
        """Topic carrying the ``online``/``offline`` availability marker."""
        return f"{self.node_id}/availability"

    @property
    def command_topic(self) -> str:
        """Topic on which commands are received."""
        return f"{self.node_id}/set"

    @property
    def discovery_topic(self) -> str:
        """Topic on which the discovery payload is published."""
        return f"homeassistant/device/{self.node_id}/config"

    @property
    def discovery_payload(self) -> dict[str, Any]:
        """Discovery document describing the device, its origin and components."""
        return {
            "dev": {
                "ids": self.node_id,
                "name": self.node_id,
            },
            "o": {
                "name": "hasspy",
            },
            "cmps": self.components,
            "availability_topic": self.availability_topic,
            "command_topic": self.command_topic,
            "state_topic": self.state_topic,
        }

    @property
    def state_payload(self) -> dict[str, Any]:
        """State document to publish; empty in the base class."""
        return {}

    @property
    def components(self) -> dict[str, dict[str, str]]:
        """Components advertised in the discovery payload; empty in the base class."""
        return {}
class HassSystemClient(HassClient):
def do_command(self, payload: str) -> None:
super().do_command(payload)
match payload:
case "POWER_ON":
if not self.power_on:
log.info("Cancelling shutdown…")
self.power_on = True
proc = run(["systemctl", "poweroff", "--when=cancel"])
if proc.returncode != 0:
log.error("Failed to cancel shutdown")
case "POWER_OFF":
if self.power_on:
log.info("Powering off…")
self.power_on = False
proc = run(["systemctl", "poweroff", "--when=+1m"])
if proc.returncode != 0:
log.error("Failed to schedule shutdown")
case "LOCK":
log.info("Locking screen…")
run(["loginctl", "lock-sessions"])
@property
def components(self) -> dict[str, dict[str, str]]:
return {
"power": {
"unique_id": f"{self.node_id}_power",
"p": "switch",
"name": "Power",
"payload_off": "POWER_OFF",
"payload_on": "POWER_ON",
"value_template": "{{ value_json.power }}",
},
"lock": {
"unique_id": f"{self.node_id}_power",
"p": "button",
"name": "Lock",
"payload_press": "LOCK",
},
}
@property
def state_payload(self) -> dict[str, Any]:
return {
"power": "POWER_ON" if self.power_on else "POWER_OFF",
}
class HassUserClient(HassClient):
    """Per-user client exposing a media play/pause button.

    The node id is suffixed with the current user's login name, so every
    topic and unique id of this client carries that suffix.
    """

    def __init__(self, node_id: str, config: Mapping[str, Any]) -> None:
        """Initialise the base client with a user-specific node id."""
        user = getpass.getuser()
        super().__init__(f"{node_id}_{user}", config)

    def do_command(self, payload: str) -> None:
        """Toggle playback through ``playerctl`` when asked to; ignore the rest."""
        super().do_command(payload)
        if payload == "PLAY_PAUSE":
            log.info("Toggling play/pause…")
            run(["playerctl", "play-pause"])

    @property
    def components(self) -> dict[str, dict[str, str]]:
        """Discovery entry for the play/pause button."""
        play_pause_button = {
            "unique_id": f"{self.node_id}_play_pause",
            "p": "button",
            "name": "Play/Pause",
            "payload_press": "PLAY_PAUSE",
        }
        return {"play-pause": play_pause_button}