Add log level configuration and improve logging in MQTT client

This commit is contained in:
Edgar P. Burkhart 2025-03-09 11:54:33 +01:00
parent be2381429f
commit cbb6c71f09
Signed by: edpibu
GPG key ID: 9833D3C5A25BD227
3 changed files with 44 additions and 12 deletions

View file

@ -2,3 +2,5 @@ host ="homeassistant.local"
port = 1883
username = "hasspy"
password = "password"
log_level = "INFO"

View file

@ -1,27 +1,46 @@
import logging
import tomllib
from argparse import ArgumentParser
from pathlib import Path
from hasspy.mqtt import HassClient
def main() -> None:
logging.basicConfig(level=logging.INFO)
config_file = next(
(
x
for x in (Path("/etc/hasspy.toml"), Path("/etc/hasspy/config.toml"))
if x.exists()
),
None,
parser = ArgumentParser(
prog="HassPy",
description="Home Assistant MQTT client",
)
if not config_file:
parser.add_argument("-c", "--config", help="Path to configuration file")
parser.add_argument(
"-v", "--verbose", help="Enable verbose logging", action="count", default=0
)
args = parser.parse_args()
if args.config:
config_file = Path(args.config)
else:
config_file = next(
(
x
for x in (Path("/etc/hasspy.toml"), Path("/etc/hasspy/config.toml"))
if x.exists()
),
Path("config.toml"),
)
if not config_file or not config_file.exists():
raise FileNotFoundError("No configuration file found")
with open(config_file, "rb") as file:
config = tomllib.load(file)
if isinstance(config.get("log_level"), str):
config["log_level"] = getattr(logging, config["log_level"])
logging.basicConfig(
level=config.get("log_level", logging.INFO) - (args.verbose * 10)
)
ha = HassClient(
"orchomenos",
config=config,

View file

@ -30,6 +30,7 @@ class HassClient(Client):
self.connect()
def connect(self, *args: Any, **kwargs: Any) -> MQTTErrorCode:
log.debug("Connecting to MQTT broker")
self.will_set(self.availability_topic, "offline", retain=True)
return super().connect(
@ -46,12 +47,15 @@ class HassClient(Client):
return self.publish(*args, **kwargs)
def publish_discovery(self) -> MQTTMessageInfo:
log.debug("Publishing discovery message")
return self.publish_json(self.discovery_topic, payload=self.discovery_payload)
def publish_availability(self) -> MQTTMessageInfo:
log.debug("Publishing availability message")
return self.publish(self.availability_topic, payload="online")
def init_subs(self) -> None:
log.debug("Initializing subscriptions")
self.subscribe(self.command_topic)
self.message_callback_add(self.command_topic, self.on_command)
@ -73,18 +77,25 @@ class HassClient(Client):
def on_command(self, client: Client, userdata: Any, message: MQTTMessage) -> None:
payload = message.payload.decode("utf-8")
log.debug(f"Received command: {payload}")
match payload:
case "POWER_ON":
if not self.power_on:
log.info("Cancelling shutdown…")
self.power_on = True
run(["systemctl", "poweroff", "--when=cancel"])
proc = run(["systemctl", "poweroff", "--when=cancel"])
if proc.returncode != 0:
log.error("Failed to cancel shutdown")
case "POWER_OFF":
if self.power_on:
log.info("Powering off…")
self.power_on = False
run(["systemctl", "poweroff", "--when=+1m"])
proc = run(["systemctl", "poweroff", "--when=+1m"])
if proc.returncode != 0:
log.error("Failed to schedule shutdown")
self.publish_state()