138 lines
4.1 KiB
Python
138 lines
4.1 KiB
Python
import logging
|
|
import subprocess
|
|
import tempfile
|
|
|
|
import numpy as np
|
|
import scipy.io as sio
|
|
|
|
|
|
# Module-level logger shared by everything in this file. It is registered
# under the fixed name "read_mat" (not ``__name__``), so logging configuration
# has to target that name.
log = logging.getLogger("read_mat")
|
|
|
|
|
|
class ReadSwash:
    """Read per-variable ``.mat`` files and convert them to NumPy arrays.

    Every variable lives in ``<root>/<var>.mat``.  Time-dependent variables are
    stored as one MAT variable per output instant, named
    ``<var_name>_<stamp>`` (optionally with a layer number and/or an ``x``/``y``
    component in between), where ``<stamp>`` is the suffix found on the
    ``Tsec_<stamp>`` keys of ``tsec.mat``.  Going by the class name these files
    are presumably SWASH model output -- the code itself only relies on the
    key naming scheme described here.

    Parameters
    ----------
    root : path-like object exposing ``joinpath`` (e.g. ``pathlib.Path``)
        Directory containing the input ``.mat`` files.
    out : path-like object exposing ``joinpath`` (e.g. ``pathlib.Path``)
        Directory in which :meth:`save` writes ``.npy`` files.
    """

    # Prefix of the per-instant keys inside ``tsec.mat``.
    _T_PREFIX = "Tsec_"
    # Bookkeeping entries that ``scipy.io.loadmat`` may add to its result.
    _META_KEYS = ("__header__", "__version__", "__globals__")

    def __init__(self, root, out):
        self._root = root
        self._out = out
        self._n_x = None  # number of entries in the x axis (set by read_x)
        self._n_t = None  # number of output instants (set by read_t)
        self._i = None  # array of time-stamp suffixes used to build keys
        self._t = None  # output instants, scaled by 10**3 and stored as uintc
        self._x = None  # x axis as read from xp.mat

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #
    def _ensure_t(self):
        """Load the time axis on first use so key building never sees ``None``."""
        if self._i is None:
            self.read_t()

    def _ensure_x(self):
        """Load the x axis on first use so reshaping never sees ``None``."""
        if self._n_x is None:
            self.read_x()

    @staticmethod
    def _stack_rows(raw, keys):
        """Stack the first row of ``raw[key]`` for each key into a float32 array."""
        return np.asarray([raw[key][0] for key in keys], dtype=np.single)

    @staticmethod
    def _layer_numbers(raw, n, zero_probe_key):
        """Return the ``n`` layer numbers, starting at 0 if ``zero_probe_key`` exists, else at 1."""
        return range(n) if zero_probe_key in raw else range(1, n + 1)

    def _read_layers(self, raw, n, layers, var_name, infix):
        """Assemble a ``(n_t, n, n_x)`` float32 array from per-layer, per-instant keys.

        Keys are ``<var_name><layer>_<infix><stamp>``; they are visited with the
        time stamp as the outer loop and the layer as the inner loop, which is
        the order the final reshape expects.  The layer number is formatted as
        a plain integer, so numbers with more than one digit are handled.
        """
        keys = [
            f"{var_name}{layer}_{infix}{stamp}"
            for stamp in self._i
            for layer in layers
        ]
        return self._stack_rows(raw, keys).reshape((self._n_t, n, self._n_x))

    # ------------------------------------------------------------------ #
    # Readers
    # ------------------------------------------------------------------ #
    def read_raw(self, var, var_names=None):
        """Load ``<root>/<var>.mat`` and return its variables as a dict.

        Parameters
        ----------
        var : str
            File stem of the ``.mat`` file.
        var_names : str or sequence of str, optional
            Forwarded to ``scipy.io.loadmat`` as ``variable_names`` to restrict
            which variables are read.

        Returns
        -------
        dict
            Mapping of MAT variable name to array, without the loader's
            ``__header__`` / ``__version__`` / ``__globals__`` entries.
        """
        res = sio.loadmat(self._root.joinpath(f"{var}.mat"), variable_names=var_names)
        # Use a default so that files for which the loader does not emit these
        # entries do not raise KeyError.
        for meta_key in self._META_KEYS:
            res.pop(meta_key, None)
        return res

    def read_t(self):
        """Read ``tsec.mat`` and populate the time axis and time-stamp suffixes.

        Every key ``Tsec_<stamp>`` contributes one instant.  The ``[0, 0]``
        element of its value is multiplied by ``10**3`` and rounded to the
        nearest integer (presumably seconds converted to milliseconds --
        inferred from the file name, confirm against the data producer).

        Returns
        -------
        numpy.ndarray
            The scaled instants as ``numpy.uintc``.
        """
        prefix = self._T_PREFIX
        raw_t = self.read_raw("tsec")
        # Remove the prefix explicitly: str.lstrip/np.char.lstrip strip a *set*
        # of characters, which only works by accident for digit-leading stamps.
        stamps = [key[len(prefix):] for key in raw_t if key.startswith(prefix)]
        self._i = np.array(stamps, dtype=str)
        scaled = np.array(
            [raw_t[prefix + stamp][0, 0] for stamp in stamps], dtype=np.float64
        ) * 10**3
        # Round before the integer cast; a bare cast truncates, turning e.g.
        # 32.3 * 1000 == 32299.999999999996 into 32299.
        self._t = np.rint(scaled).astype(np.uintc)
        self._n_t = self._t.size
        return self.t

    def read_x(self):
        """Read variable ``Xp`` from ``xp.mat``, store it as the x axis and return it."""
        self._x = self.read_const("xp", "Xp")
        self._n_x = self._x.size
        return self.x

    def read_scalar(self, var, var_name):
        """Read a time-dependent scalar field.

        Looks up ``<var_name>_<stamp>`` in ``<var>.mat`` for every time stamp.

        Returns
        -------
        numpy.ndarray
            float32 array with one row per time stamp (the first row of each
            stored variable).
        """
        self._ensure_t()
        raw = self.read_raw(var)
        return self._stack_rows(raw, [f"{var_name}_{stamp}" for stamp in self._i])

    def read_const(self, var, var_name):
        """Read the single variable ``var_name`` from ``<var>.mat`` and return its first row."""
        raw = self.read_raw(var, var_name)
        return raw[var_name][0]

    def read_vector(self, var, var_name):
        """Read a time-dependent vector field.

        Looks up ``<var_name>_x_<stamp>`` and ``<var_name>_y_<stamp>`` in
        ``<var>.mat`` for every time stamp.

        Returns
        -------
        tuple of numpy.ndarray
            ``(x_component, y_component)``, each a float32 array with one row
            per time stamp.
        """
        self._ensure_t()
        raw = self.read_raw(var)
        return tuple(
            self._stack_rows(
                raw, [f"{var_name}_{comp}_{stamp}" for stamp in self._i]
            )
            for comp in ("x", "y")
        )

    def read_scalar_lay(self, var, var_name):
        """Read a time-dependent, layered scalar field.

        Keys are ``<var_name><layer>_<stamp>``.  The number of layers is the
        number of variables in the file divided by the number of time stamps;
        layer numbering starts at 0 if ``<var_name>0_<first stamp>`` exists and
        at 1 otherwise.

        Returns
        -------
        numpy.ndarray
            float32 array of shape ``(n_t, n_layers, n_x)``.
        """
        self._ensure_t()
        self._ensure_x()
        raw = self.read_raw(var)
        n = len(raw) // self._n_t
        layers = self._layer_numbers(raw, n, f"{var_name}0_{self._i[0]}")
        return self._read_layers(raw, n, layers, var_name, "")

    def read_vector_lay(self, var, var_name):
        """Read a time-dependent, layered vector field.

        Keys are ``<var_name><layer>_x_<stamp>`` and
        ``<var_name><layer>_y_<stamp>``.  The number of layers is the number of
        variables in the file divided by twice the number of time stamps; layer
        numbering starts at 0 if ``<var_name>0_x_<first stamp>`` exists and at
        1 otherwise.

        Returns
        -------
        tuple of numpy.ndarray
            ``(x_component, y_component)``, each a float32 array of shape
            ``(n_t, n_layers, n_x)``.
        """
        self._ensure_t()
        self._ensure_x()
        raw = self.read_raw(var)
        n = len(raw) // (self._n_t * 2)
        layers = self._layer_numbers(raw, n, f"{var_name}0_x_{self._i[0]}")
        return tuple(
            self._read_layers(raw, n, layers, var_name, f"{comp}_")
            for comp in ("x", "y")
        )

    # ------------------------------------------------------------------ #
    # Conversion
    # ------------------------------------------------------------------ #
    def save(self, t, var=None, var_name=None):
        """Read one variable and write it to the output directory with ``numpy.save``.

        Parameters
        ----------
        t : str
            Kind of data: ``"t"`` (time axis), ``"x"`` (x axis), ``"s"``
            (scalar), ``"c"`` (constant), ``"v"`` (vector), ``"sk"`` (layered
            scalar) or ``"vk"`` (layered vector).
        var : str, optional
            File stem of the input ``.mat`` file; also used as the output file
            name.  Ignored for ``"t"`` and ``"x"``, whose output is named after
            ``t`` itself.
        var_name : str, optional
            Variable name (prefix) inside the ``.mat`` file.  Ignored for
            ``"t"`` and ``"x"``.

        Raises
        ------
        KeyError
            If ``t`` is not one of the kinds listed above.
        """
        readers = {
            "t": self.read_t,
            "x": self.read_x,
            "s": self.read_scalar,
            "c": self.read_const,
            "v": self.read_vector,
            "sk": self.read_scalar_lay,
            "vk": self.read_vector_lay,
        }
        if t not in readers:
            raise KeyError(
                f"unknown data kind {t!r}; expected one of {sorted(readers)}"
            )
        if t in ("x", "t"):
            log.info("Converting %s", t)
            np.save(self._out.joinpath(t), readers[t]())
        else:
            log.info("Converting %s", var)
            np.save(self._out.joinpath(var), readers[t](var, var_name))

    @property
    def t(self):
        """Time axis as populated by :meth:`read_t` (``None`` before that)."""
        return self._t

    @property
    def x(self):
        """X axis as populated by :meth:`read_x` (``None`` before that)."""
        return self._x
|