hasspy/hasspy/mqtt.py

107 lines
3.6 KiB
Python

import json
from subprocess import run
from typing import Any, Mapping
from paho.mqtt.client import Client, MQTTMessage, MQTTMessageInfo
from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode
class HassClient(Client):
    """MQTT client that exposes this machine's power state as a switch.

    On connect it publishes a discovery document, an ``online`` availability
    message and the current power state, then listens on the command topic
    for ``POWER_ON`` / ``POWER_OFF`` payloads, which cancel or schedule a
    ``systemctl poweroff``.

    The discovery topic and payload appear to follow Home Assistant's
    device-based MQTT discovery format -- confirm against the Home Assistant
    documentation before changing any key.
    """

    def __init__(self, node_id: str, config: Mapping[str, Any]) -> None:
        """Derive the topics, apply credentials and connect to the broker.

        Args:
            node_id: Identifier used as the prefix of the state, availability
                and command topics, and as the device id/name in the
                discovery payload.
            config: Connection settings. The keys read by this class are
                ``username``, ``password``, ``host`` and ``port``.
        """
        super().__init__(CallbackAPIVersion.VERSION2)
        self.node_id = node_id
        self.config = config
        self.state_topic = f"{self.node_id}/state"
        self.availability_topic = f"{self.node_id}/availability"
        self.command_topic = f"{self.node_id}/set"
        self.discovery_topic = f"homeassistant/device/{self.node_id}/config"
        username = self.config.get("username")
        if username:
            self.username_pw_set(username, self.config.get("password"))
        # Start out assuming the machine is on with no shutdown pending.
        self.power_on = True
        self.connect()

    def connect(self, *args: Any, **kwargs: Any) -> MQTTErrorCode:
        """Register the ``offline`` will message and connect to the broker.

        The host and port always come from ``self.config`` (``host`` defaults
        to ``""`` and ``port`` to ``1883``); any arguments passed by the
        caller are accepted for signature compatibility and ignored.

        Returns:
            The result of the parent class's ``connect``.
        """
        # The will has to be registered before the connection is opened.
        self.will_set(self.availability_topic, "offline", retain=True)
        return super().connect(
            self.config.get("host", ""), self.config.get("port", 1883)
        )

    def publish(self, *args: Any, **kwargs: Any) -> MQTTMessageInfo:
        """Publish a message, defaulting to ``qos=1`` and ``retain=True``.

        Explicit ``qos`` / ``retain`` keyword arguments take precedence over
        these defaults. Everything else is forwarded to the parent class.
        """
        kwargs.setdefault("qos", 1)
        kwargs.setdefault("retain", True)
        return super().publish(*args, **kwargs)

    def publish_json(self, *args: Any, **kwargs: Any) -> MQTTMessageInfo:
        """Serialize the payload with ``json.dumps`` and publish it.

        The payload may be given as the ``payload`` keyword argument or as
        the second positional argument (after the topic).

        Raises:
            KeyError: If no payload is supplied in either form.
        """
        if len(args) >= 2 and "payload" not in kwargs:
            # Positional form publish_json(topic, payload, ...).
            args = (args[0], json.dumps(args[1]), *args[2:])
        else:
            kwargs["payload"] = json.dumps(kwargs["payload"])
        return self.publish(*args, **kwargs)

    def publish_discovery(self) -> MQTTMessageInfo:
        """Publish ``discovery_payload`` as JSON on the discovery topic."""
        return self.publish_json(self.discovery_topic, payload=self.discovery_payload)

    def publish_availability(self) -> MQTTMessageInfo:
        """Publish ``online`` on the availability topic."""
        return self.publish(self.availability_topic, payload="online")

    def init_subs(self) -> None:
        """Subscribe to the command topic and route it to ``on_command``."""
        self.subscribe(self.command_topic)
        self.message_callback_add(self.command_topic, self.on_command)

    def publish_state(self) -> MQTTMessageInfo:
        """Publish the current power state as ``{"power": ...}`` JSON."""
        return self.publish_json(
            topic=self.state_topic,
            payload={
                "power": "POWER_ON" if self.power_on else "POWER_OFF",
            },
        )

    def on_connect(self, *args: Any, **kwargs: Any) -> None:
        """Announce the device and start listening for commands.

        All arguments are accepted and ignored. The method publishes the
        discovery document and availability, subscribes to the command
        topic, and finally publishes the current state.
        """
        self.publish_discovery()
        self.publish_availability()
        self.init_subs()
        self.publish_state()

    def on_command(self, client: Client, userdata: Any, message: MQTTMessage) -> None:
        """Handle a message received on the command topic.

        ``POWER_OFF`` while on schedules a power-off one minute ahead;
        ``POWER_ON`` while a power-off is pending cancels it. Any other
        payload, or a command matching the current state, changes nothing.
        The state is re-published after every message.

        ``self.power_on`` is only updated when the ``systemctl`` call
        succeeds, so the published state keeps matching what was actually
        scheduled.

        Args:
            client: Unused.
            userdata: Unused.
            message: Incoming message; only its ``payload`` bytes are read.
        """
        # Undecodable bytes become U+FFFD, which matches neither command, so
        # a malformed payload is ignored instead of raising in the callback.
        payload = message.payload.decode("utf-8", errors="replace")
        if not self.power_on and payload == "POWER_ON":
            print("Cancelling shutdown…")
            if self._systemctl_poweroff("cancel"):
                self.power_on = True
        elif self.power_on and payload == "POWER_OFF":
            print("Powering off…")
            if self._systemctl_poweroff("+1m"):
                self.power_on = False
        self.publish_state()

    @staticmethod
    def _systemctl_poweroff(when: str) -> bool:
        """Run ``systemctl poweroff --when=<when>`` and report success.

        Returns:
            ``True`` if the command ran and exited with status 0, ``False``
            if it could not be started or exited with a non-zero status.
        """
        command = ["systemctl", "poweroff", f"--when={when}"]
        try:
            completed = run(command)
        except OSError as err:
            # E.g. systemctl is not installed or not executable.
            print(f"Could not run {' '.join(command)}: {err}")
            return False
        if completed.returncode != 0:
            print(f"{' '.join(command)} exited with status {completed.returncode}")
            return False
        return True

    @property
    def discovery_payload(self) -> dict[str, Any]:
        """Build the discovery document describing this device.

        The short keys (``dev``, ``o``, ``cmps``, ``p``) are presumably the
        abbreviated forms of device, origin, components and platform used by
        Home Assistant discovery -- verify before renaming.
        """
        return {
            "dev": {
                "ids": self.node_id,
                "name": self.node_id,
            },
            "o": {
                "name": "hasspy",
            },
            "cmps": {
                "power": {
                    "unique_id": f"{self.node_id}_power",
                    "p": "switch",
                    "name": "Power",
                    "payload_off": "POWER_OFF",
                    "payload_on": "POWER_ON",
                    # Extracts the "power" field written by publish_state.
                    "value_template": "{{ value_json.power }}",
                },
            },
            "availability_topic": self.availability_topic,
            "command_topic": self.command_topic,
            "state_topic": self.state_topic,
        }