Initial commit
This commit is contained in:
commit
3ef379b0b7
3 changed files with 159 additions and 0 deletions
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
__pycache__
|
||||
/config.toml
|
86
oin/__main__.py
Normal file
86
oin/__main__.py
Normal file
|
@ -0,0 +1,86 @@
|
|||
import tomllib
|
||||
from datetime import date, datetime, timedelta, timezone
|
||||
import json
|
||||
from threading import Timer
|
||||
import psycopg2
|
||||
from sense_hat import SenseHat
|
||||
from signal import pause
|
||||
from urllib import request, parse
|
||||
|
||||
from .tempo import TempoAPI, convert_color
|
||||
|
||||
with open("config.toml", "rb") as config_file:
|
||||
config = tomllib.load(config_file)
|
||||
|
||||
sense = SenseHat()
|
||||
tempo_api = TempoAPI(config.get("rte_api_key"))
|
||||
|
||||
|
||||
def show_trash():
|
||||
if date.today().isocalendar().week % 2:
|
||||
sense.show_message("DR", text_colour=[255, 255, 0])
|
||||
else:
|
||||
sense.show_message("OM", text_colour=[0, 127, 0])
|
||||
|
||||
|
||||
def show_temp():
|
||||
sense.show_message(f"{sense.temperature:.1f}", text_colour=[255, 0, 255])
|
||||
|
||||
|
||||
def show_humidity():
|
||||
sense.show_message(f"{sense.humidity:.1f}", text_colour=[0, 0, 255])
|
||||
|
||||
|
||||
def show_tempo():
|
||||
tempo_colors = [convert_color(c) for c in tempo_api.tempo]
|
||||
sense.set_pixels([tempo_colors[1]] * 48 + [tempo_colors[0]] * 16)
|
||||
tc = Timer(5, sense.clear)
|
||||
tc.start()
|
||||
|
||||
|
||||
def stick_loop():
|
||||
event = sense.stick.wait_for_event(emptybuffer=True)
|
||||
match event.direction:
|
||||
case "up":
|
||||
show_trash()
|
||||
case "right":
|
||||
show_temp()
|
||||
case "left":
|
||||
show_humidity()
|
||||
case "down":
|
||||
show_tempo()
|
||||
case "middle":
|
||||
sense.clear()
|
||||
|
||||
ts = Timer(0, stick_loop)
|
||||
ts.start()
|
||||
|
||||
|
||||
def save_loop(conn):
|
||||
tp = Timer(30, save_loop, (conn,))
|
||||
tp.start()
|
||||
dt = datetime.now()
|
||||
|
||||
temp = sense.temperature
|
||||
pres = sense.pressure
|
||||
humi = sense.humidity
|
||||
|
||||
print(dt)
|
||||
cursor = conn.cursor()
|
||||
INS = "INSERT INTO sensor_data (time, sensor, value) VALUES (%s, %s, %s);"
|
||||
try:
|
||||
cursor.execute(INS, (dt, "temp", temp))
|
||||
cursor.execute(INS, (dt, "pres", pres))
|
||||
cursor.execute(INS, (dt, "humi", humi))
|
||||
except (Exception, psycopg2.Error) as error:
|
||||
print(error.pgerror)
|
||||
conn.commit()
|
||||
|
||||
|
||||
with psycopg2.connect(database="tsdb", user="edpibu", host="/run/postgresql") as conn:
|
||||
tp = Timer(0, save_loop, (conn,))
|
||||
tp.start()
|
||||
|
||||
ts = Timer(0, stick_loop)
|
||||
ts.start()
|
||||
sense.show_message(">")
|
71
oin/tempo/__init__.py
Normal file
71
oin/tempo/__init__.py
Normal file
|
@ -0,0 +1,71 @@
|
|||
from datetime import date, datetime, timedelta, timezone
|
||||
from threading import Timer
|
||||
from urllib import request, parse
|
||||
import json
|
||||
|
||||
|
||||
class TempoAPI:
|
||||
def __init__(self, api_key):
|
||||
self.rte_api_key = api_key
|
||||
self.update_rte_access_token()
|
||||
|
||||
def update_rte_access_token(self):
|
||||
with request.urlopen(
|
||||
request.Request(
|
||||
"https://digital.iservices.rte-france.com/token/oauth/",
|
||||
headers={
|
||||
"Authorization": f"Basic {self.rte_api_key}",
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
},
|
||||
method="POST",
|
||||
)
|
||||
) as res:
|
||||
data = json.load(res)
|
||||
|
||||
self.rte_access_token = data.get("access_token")
|
||||
self.renewal_timer = Timer(data.get("expires_in"), self.update_rte_access_token)
|
||||
self.renewal_timer.start()
|
||||
|
||||
@property
|
||||
def tempo(self):
|
||||
today = datetime.combine(date.today(), datetime.min.time()).astimezone()
|
||||
url = parse.urljoin(
|
||||
"https://digital.iservices.rte-france.com/open_api/tempo_like_supply_contract/v1/tempo_like_calendars",
|
||||
"?"
|
||||
+ parse.urlencode(
|
||||
{
|
||||
"start_date": today.isoformat(),
|
||||
"end_date": (today + timedelta(days=2)).isoformat(),
|
||||
},
|
||||
safe=":",
|
||||
),
|
||||
)
|
||||
with request.urlopen(
|
||||
request.Request(
|
||||
url,
|
||||
headers={
|
||||
"Authorization": f"Bearer {self.rte_access_token}",
|
||||
"Accept": "application/json",
|
||||
},
|
||||
)
|
||||
) as res:
|
||||
api_data = json.load(res)
|
||||
|
||||
print(api_data.get("tempo_like_calendars").get("values"))
|
||||
|
||||
return [
|
||||
val.get("value")
|
||||
for val in api_data.get("tempo_like_calendars").get("values")
|
||||
]
|
||||
|
||||
|
||||
def convert_color(color):
|
||||
match color:
|
||||
case "BLUE":
|
||||
return [0, 0, 255]
|
||||
case "WHITE":
|
||||
return [255, 255, 255]
|
||||
case "RED":
|
||||
return [255, 0, 0]
|
||||
case _:
|
||||
return [255, 255, 0]
|
Loading…
Reference in a new issue