Enhance command execution with error handling and return codes

This commit is contained in:
Edgar P. Burkhart 2025-03-09 20:06:36 +01:00
parent e54223e361
commit 8a6fb77de3
Signed by: edpibu
GPG key ID: 9833D3C5A25BD227

View file

@ -4,7 +4,7 @@ import logging
import re import re
from subprocess import run from subprocess import run
from threading import Thread, Timer from threading import Thread, Timer
from typing import Any, Mapping from typing import Any, Mapping, Tuple
from paho.mqtt.client import Client, MQTTMessage, MQTTMessageInfo from paho.mqtt.client import Client, MQTTMessage, MQTTMessageInfo
from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode from paho.mqtt.enums import CallbackAPIVersion, MQTTErrorCode
@ -145,7 +145,9 @@ class HassSystemClient(HassClient):
def do_command(self, cmd: str, value: str = "") -> None: def do_command(self, cmd: str, value: str = "") -> None:
if cmd in self.commands: if cmd in self.commands:
log.debug(f"Executing command: {cmd}") log.debug(f"Executing command: {cmd}")
run_command(self.commands[cmd]) code, _ = run_command(self.commands[cmd])
if code != 0:
log.error(f"Failed to execute command: {cmd}")
if cmd == "POWER_ON": if cmd == "POWER_ON":
self.power_on = True self.power_on = True
@ -194,13 +196,15 @@ class HassUserClient(HassClient):
def do_command(self, cmd: str, value: str = "") -> None: def do_command(self, cmd: str, value: str = "") -> None:
if cmd in self.commands: if cmd in self.commands:
log.debug(f"Executing command: {cmd}") log.debug(f"Executing command: {cmd}")
run_command(self.commands[cmd]) code, _ = run_command(self.commands[cmd])
if code != 0:
log.debug(f"Failed to execute command: {cmd}")
match [cmd, value]: match [cmd, value]:
case ["VOLUME", value]: case ["VOLUME", value]:
log.debug(f"Executing command: {cmd}:{value}") log.debug(f"Executing command: {cmd}:{value}")
run_command( code, _ = run_command(
[ [
"wpctl", "wpctl",
"set-volume", "set-volume",
@ -208,6 +212,8 @@ class HassUserClient(HassClient):
f"{int(value) / 100:.2f}", f"{int(value) / 100:.2f}",
] ]
) )
if code != 0:
log.error(f"Failed to execute command: {cmd}:{value}")
@property @property
def availability_topic(self) -> str: def availability_topic(self) -> str:
@ -283,19 +289,31 @@ class HassUserClient(HassClient):
} }
@property @property
def volume_value(self) -> int: def volume_value(self) -> int | str:
vol = run_command(["wpctl", "get-volume", "@DEFAULT_AUDIO_SINK@"]) code, vol = run_command(["wpctl", "get-volume", "@DEFAULT_AUDIO_SINK@"])
if code != 0:
log.error("Failed to get volume")
return "none"
return int(float(vol.split(": ")[1]) * 100) return int(float(vol.split(": ")[1]) * 100)
@property @property
def player_value(self) -> str | dict[str, str | dict[str, str]]: def player_value(self) -> str | dict[str, str | dict[str, str]]:
code, value = run_command(["playerctl", "status"])
attrs = dict()
if code == 0:
for k in ["title", "album", "artist"]:
code, v = run_command(["playerctl", "metadata", k])
if code == 0:
attrs[k] = v
else:
log.error(f"Failed to get metadata: {k}")
else:
log.debug("Player is not running")
return { return {
"value": run_command(["playerctl", "status"]), "value": value,
"attributes": { "attributes": attrs,
k: run_command(["playerctl", "metadata", k])
for k in ["title", "album", "artist"]
},
} }
def publish_state(self) -> MQTTMessageInfo: def publish_state(self) -> MQTTMessageInfo:
@ -308,7 +326,9 @@ class HassUserClient(HassClient):
def publish_cover(self) -> None: def publish_cover(self) -> None:
log.debug("Publishing cover image") log.debug("Publishing cover image")
out = run_command(["playerctl", "metadata"]) code, out = run_command(["playerctl", "metadata"])
if code != 0:
return
artUrl = re.compile(r"mpris:artUrl\s+file://(.*)").search(out) artUrl = re.compile(r"mpris:artUrl\s+file://(.*)").search(out)
if not artUrl: if not artUrl:
@ -328,10 +348,9 @@ class HassUserClient(HassClient):
self.publish(self.cover_topic, by.read()) self.publish(self.cover_topic, by.read())
def run_command(cmd: list[str]) -> str: def run_command(cmd: list[str]) -> Tuple[int, str]:
proc = run(cmd, capture_output=True) proc = run(cmd, capture_output=True)
if proc.returncode != 0: if proc.returncode != 0:
log.error(f"Failed to execute command: {cmd}") return proc.returncode, "null"
return "null"
return proc.stdout.decode("utf-8") return proc.returncode, proc.stdout.decode("utf-8")