Reorganize code

This commit is contained in:
Edgar P. Burkhart 2024-06-09 13:11:34 +02:00
parent 3ef379b0b7
commit 2bb3159385
Signed by: edpibu
GPG key ID: 9833D3C5A25BD227
5 changed files with 243 additions and 70 deletions

View file

@ -2,12 +2,13 @@ import tomllib
from datetime import date, datetime, timedelta, timezone
import json
from threading import Timer
import psycopg2
from sense_hat import SenseHat
from signal import pause
from urllib import request, parse
from .tempo import TempoAPI, convert_color
from .tempo import TempoAPI
from .logger import SenseLogger
from .info import InfoPanel
with open("config.toml", "rb") as config_file:
config = tomllib.load(config_file)
@ -16,71 +17,7 @@ sense = SenseHat()
tempo_api = TempoAPI(config.get("rte_api_key"))
def show_trash():
if date.today().isocalendar().week % 2:
sense.show_message("DR", text_colour=[255, 255, 0])
else:
sense.show_message("OM", text_colour=[0, 127, 0])
sense_logger = SenseLogger(sense, "dbname=tsdb user=edpibu host=/run/postgresql")
info_panel = InfoPanel(sense, tempo_api)
def show_temp():
sense.show_message(f"{sense.temperature:.1f}", text_colour=[255, 0, 255])
def show_humidity():
sense.show_message(f"{sense.humidity:.1f}", text_colour=[0, 0, 255])
def show_tempo():
tempo_colors = [convert_color(c) for c in tempo_api.tempo]
sense.set_pixels([tempo_colors[1]] * 48 + [tempo_colors[0]] * 16)
tc = Timer(5, sense.clear)
tc.start()
def stick_loop():
event = sense.stick.wait_for_event(emptybuffer=True)
match event.direction:
case "up":
show_trash()
case "right":
show_temp()
case "left":
show_humidity()
case "down":
show_tempo()
case "middle":
sense.clear()
ts = Timer(0, stick_loop)
ts.start()
def save_loop(conn):
tp = Timer(30, save_loop, (conn,))
tp.start()
dt = datetime.now()
temp = sense.temperature
pres = sense.pressure
humi = sense.humidity
print(dt)
cursor = conn.cursor()
INS = "INSERT INTO sensor_data (time, sensor, value) VALUES (%s, %s, %s);"
try:
cursor.execute(INS, (dt, "temp", temp))
cursor.execute(INS, (dt, "pres", pres))
cursor.execute(INS, (dt, "humi", humi))
except (Exception, psycopg2.Error) as error:
print(error.pgerror)
conn.commit()
with psycopg2.connect(database="tsdb", user="edpibu", host="/run/postgresql") as conn:
tp = Timer(0, save_loop, (conn,))
tp.start()
ts = Timer(0, stick_loop)
ts.start()
sense.show_message(">")

65
oin/info/__init__.py Normal file
View file

@ -0,0 +1,65 @@
from threading import Thread, Timer
from datetime import date
from .utils import num_to_pixels
from ..tempo import convert_color
class InfoPanel:
def __init__(self, sense, tempo_api, rotation=0):
self.sense = sense
self.stick = self.sense.stick
self.rotation = rotation
self.sense.set_rotation(self.rotation)
self.tempo_api = tempo_api
self.start_stick_info()
self.clear_timer = Timer(5, self.sense.clear)
def start_stick_info(self):
self.thread = Thread(target=self.stick_info)
self.thread.start()
def start_clear_timer(self):
self.clear_timer.cancel()
self.clear_timer = Timer(5, self.sense.clear)
self.clear_timer.start()
def stick_info(self):
event = self.stick.wait_for_event(emptybuffer=True)
self.start_stick_info()
match event.direction:
case "up":
self.show_trash()
case "right":
self.show_temp()
case "left":
self.show_humidity()
case "down":
self.show_tempo()
case "middle":
self.sense.clear()
self.start_clear_timer()
def show_trash(self):
today = date.today().isocalendar()
if (today.week % 2) ^ (today.weekday >= 4):
self.sense.set_pixels(64 * [[255, 255, 0]])
else:
grid_line = 4 * [[0, 127, 0], [0, 15, 0]]
self.sense.set_pixels(4 * (grid_line + grid_line[::-1]))
def show_temp(self):
self.sense.set_pixels(
num_to_pixels(self.sense.temperature, color=[255, 0, 255])
)
def show_humidity(self):
self.sense.set_pixels(num_to_pixels(self.sense.humidity, color=[0, 255, 255]))
def show_tempo(self):
tempo_colors = [convert_color(c) for c in self.tempo_api.tempo]
self.sense.set_pixels([tempo_colors[1]] * 48 + [tempo_colors[0]] * 16)

133
oin/info/utils.py Normal file
View file

@ -0,0 +1,133 @@
from itertools import chain
def digit_to_pixels(n, color=[255, 255, 255], bg_color=[0, 0, 0]):
X = color
O = bg_color
match n:
case "0":
return [
[O, O, O, O],
[O, X, X, O],
[X, O, O, X],
[X, O, O, X],
[X, O, O, X],
[X, O, O, X],
[O, X, X, O],
[O, O, O, O],
]
case "1":
return [
[O, O, O, O],
[O, O, O, X],
[O, O, X, X],
[O, X, O, X],
[O, O, O, X],
[O, O, O, X],
[O, O, O, X],
[O, O, O, O],
]
case "2":
return [
[O, O, O, O],
[O, X, X, O],
[X, O, O, X],
[O, O, O, X],
[O, O, X, O],
[O, X, O, O],
[X, X, X, X],
[O, O, O, O],
]
case "3":
return [
[O, O, O, O],
[X, X, X, X],
[O, O, O, X],
[O, O, X, O],
[O, O, O, X],
[X, O, O, X],
[O, X, X, O],
[O, O, O, O],
]
case "4":
return [
[O, O, O, O],
[O, O, O, X],
[O, O, X, X],
[O, X, O, X],
[X, O, X, X],
[X, X, X, X],
[O, O, O, X],
[O, O, O, O],
]
case "5":
return [
[O, O, O, O],
[X, X, X, X],
[X, O, O, O],
[X, X, X, O],
[O, O, O, X],
[X, O, O, X],
[O, X, X, O],
[O, O, O, O],
]
case "6":
return [
[O, O, O, O],
[O, X, X, X],
[X, O, O, O],
[X, X, X, O],
[X, O, O, X],
[X, O, O, X],
[O, X, X, O],
[O, O, O, O],
]
case "7":
return [
[O, O, O, O],
[X, X, X, X],
[O, O, O, X],
[O, O, X, O],
[O, X, O, O],
[X, O, O, O],
[X, O, O, O],
[O, O, O, O],
]
case "8":
return [
[O, O, O, O],
[O, X, X, O],
[X, O, O, X],
[O, X, X, O],
[X, O, O, X],
[X, O, O, X],
[O, X, X, O],
[O, O, O, O],
]
case "9":
return [
[O, O, O, O],
[O, X, X, O],
[X, O, O, X],
[X, O, O, X],
[O, X, X, X],
[O, O, O, X],
[O, X, X, O],
[O, O, O, O],
]
case _:
return 8 * [4 * [O]]
def num_to_pixels(n, color=[255, 255, 255], bg_color=[0, 0, 0]):
digits = f"{n:02.0f}"
if len(digits) > 2:
digits = "XX"
return list(
chain.from_iterable(
chain.from_iterable(
zip(*[digit_to_pixels(i, color, bg_color) for i in digits])
)
)
)

40
oin/logger/__init__.py Normal file
View file

@ -0,0 +1,40 @@
from datetime import date, datetime, timedelta, timezone
from threading import Timer
import psycopg
from psycopg_pool import ConnectionPool
class SenseLogger:
def __init__(self, sense, conninfo):
self.sense = sense
self.database_pool = ConnectionPool(conninfo)
self.timer = Timer(0, self.log)
self.timer.start()
def log(self):
self.timer = Timer(30, self.log)
self.timer.start()
dt = datetime.now()
temp = self.sense.temperature
pres = self.sense.pressure
humi = self.sense.humidity
print(dt)
INS = "INSERT INTO sensor_data (time, sensor, value) VALUES (%s, %s, %s);"
conn = self.database_pool.getconn()
try:
with conn.cursor() as cursor:
cursor.execute(INS, (dt, "temp", temp))
cursor.execute(INS, (dt, "pres", pres))
cursor.execute(INS, (dt, "humi", humi))
conn.commit()
except:
conn.rollback()
raise
finally:
self.database_pool.putconn(conn)

View file

@ -51,8 +51,6 @@ class TempoAPI:
) as res:
api_data = json.load(res)
print(api_data.get("tempo_like_calendars").get("values"))
return [
val.get("value")
for val in api_data.get("tempo_like_calendars").get("values")