From 3ef379b0b7f04ae58dabccd47fef7692661d8381 Mon Sep 17 00:00:00 2001 From: "Edgar P. Burkhart" Date: Sun, 9 Jun 2024 10:58:55 +0200 Subject: [PATCH] Initial commit --- .gitignore | 2 + oin/__main__.py | 86 +++++++++++++++++++++++++++++++++++++++++++ oin/tempo/__init__.py | 71 +++++++++++++++++++++++++++++++++++ 3 files changed, 159 insertions(+) create mode 100644 .gitignore create mode 100644 oin/__main__.py create mode 100644 oin/tempo/__init__.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ad6bfc1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +/config.toml diff --git a/oin/__main__.py b/oin/__main__.py new file mode 100644 index 0000000..ce90135 --- /dev/null +++ b/oin/__main__.py @@ -0,0 +1,86 @@ +import tomllib +from datetime import date, datetime, timedelta, timezone +import json +from threading import Timer +import psycopg2 +from sense_hat import SenseHat +from signal import pause +from urllib import request, parse + +from .tempo import TempoAPI, convert_color + +with open("config.toml", "rb") as config_file: + config = tomllib.load(config_file) + +sense = SenseHat() +tempo_api = TempoAPI(config.get("rte_api_key")) + + +def show_trash(): + if date.today().isocalendar().week % 2: + sense.show_message("DR", text_colour=[255, 255, 0]) + else: + sense.show_message("OM", text_colour=[0, 127, 0]) + + +def show_temp(): + sense.show_message(f"{sense.temperature:.1f}", text_colour=[255, 0, 255]) + + +def show_humidity(): + sense.show_message(f"{sense.humidity:.1f}", text_colour=[0, 0, 255]) + + +def show_tempo(): + tempo_colors = [convert_color(c) for c in tempo_api.tempo] + sense.set_pixels([tempo_colors[1]] * 48 + [tempo_colors[0]] * 16) + tc = Timer(5, sense.clear) + tc.start() + + +def stick_loop(): + event = sense.stick.wait_for_event(emptybuffer=True) + match event.direction: + case "up": + show_trash() + case "right": + show_temp() + case "left": + show_humidity() + case "down": + show_tempo() + case "middle": + sense.clear() + + ts = Timer(0, stick_loop) + ts.start() + + +def save_loop(conn): + tp = Timer(30, save_loop, (conn,)) + tp.start() + dt = datetime.now() + + temp = sense.temperature + pres = sense.pressure + humi = sense.humidity + + print(dt) + cursor = conn.cursor() + INS = "INSERT INTO sensor_data (time, sensor, value) VALUES (%s, %s, %s);" + try: + cursor.execute(INS, (dt, "temp", temp)) + cursor.execute(INS, (dt, "pres", pres)) + cursor.execute(INS, (dt, "humi", humi)) + except (Exception, psycopg2.Error) as error: + print(error.pgerror) + conn.commit() + + +with psycopg2.connect(database="tsdb", user="edpibu", host="/run/postgresql") as conn: + tp = Timer(0, save_loop, (conn,)) + tp.start() + +ts = Timer(0, stick_loop) +ts.start() +sense.show_message(">") diff --git a/oin/tempo/__init__.py b/oin/tempo/__init__.py new file mode 100644 index 0000000..b7660b7 --- /dev/null +++ b/oin/tempo/__init__.py @@ -0,0 +1,71 @@ +from datetime import date, datetime, timedelta, timezone +from threading import Timer +from urllib import request, parse +import json + + +class TempoAPI: + def __init__(self, api_key): + self.rte_api_key = api_key + self.update_rte_access_token() + + def update_rte_access_token(self): + with request.urlopen( + request.Request( + "https://digital.iservices.rte-france.com/token/oauth/", + headers={ + "Authorization": f"Basic {self.rte_api_key}", + "Content-Type": "application/x-www-form-urlencoded", + }, + method="POST", + ) + ) as res: + data = json.load(res) + + self.rte_access_token = data.get("access_token") + self.renewal_timer = Timer(data.get("expires_in"), self.update_rte_access_token) + self.renewal_timer.start() + + @property + def tempo(self): + today = datetime.combine(date.today(), datetime.min.time()).astimezone() + url = parse.urljoin( + "https://digital.iservices.rte-france.com/open_api/tempo_like_supply_contract/v1/tempo_like_calendars", + "?" + + parse.urlencode( + { + "start_date": today.isoformat(), + "end_date": (today + timedelta(days=2)).isoformat(), + }, + safe=":", + ), + ) + with request.urlopen( + request.Request( + url, + headers={ + "Authorization": f"Bearer {self.rte_access_token}", + "Accept": "application/json", + }, + ) + ) as res: + api_data = json.load(res) + + print(api_data.get("tempo_like_calendars").get("values")) + + return [ + val.get("value") + for val in api_data.get("tempo_like_calendars").get("values") + ] + + +def convert_color(color): + match color: + case "BLUE": + return [0, 0, 255] + case "WHITE": + return [255, 255, 255] + case "RED": + return [255, 0, 0] + case _: + return [255, 255, 0]