import logging import subprocess import tempfile import numpy as np import scipy.io as sio log = logging.getLogger("read_mat") class ReadSwash: def __init__(self, root, out): self._root = root self._out = out self._n_x = None self._n_t = None self._i = None self._t = None self._x = None def read_raw(self, var, var_names=None): res = sio.loadmat(self._root.joinpath(f"{var}.mat"), variable_names=var_names) res.pop("__header__") res.pop("__version__") res.pop("__globals__") return res def read_t(self): raw_t = self.read_raw("tsec") self._i = np.char.lstrip(np.fromiter(raw_t.keys(), dtype="U15"), "Tsec_") self._t = np.fromiter( (raw_t[k][0, 0] * 10**3 for k in np.char.add("Tsec_", self._i)), dtype=np.uintc, ) self._n_t = self._t.size return self.t def read_x(self): self._x = self.read_const("xp", "Xp") self._n_x = self._x.size return self.x def read_scalar(self, var, var_name): raw = self.read_raw(var) return np.asarray( [raw[i0][0] for i0 in np.char.add(f"{var_name}_", self._i)], dtype=np.single ) def read_const(self, var, var_name): raw = self.read_raw(var, var_name) return raw[var_name][0] def read_vector(self, var, var_name): raw = self.read_raw(var) return ( np.asarray( [raw[i0][0] for i0 in np.char.add(f"{var_name}_x_", self._i)], dtype=np.single, ), np.asarray( [raw[i0][0] for i0 in np.char.add(f"{var_name}_y_", self._i)], dtype=np.single, ), ) def read_scalar_lay(self, var, var_name): raw = self.read_raw(var) n = len(raw.keys()) // self._n_t ra = (n,) if f"{var_name}0_{self._i[0]}" in raw.keys() else (1, n + 1) return np.asarray( [ raw[i0][0] for i0 in np.char.add( np.char.replace( f"{var_name}[]_", "[]", np.arange(*ra).astype("U1") )[None, :], self._i[:, None], ).reshape(-1) ], dtype=np.single, ).reshape((self._n_t, n, self._n_x)) def read_vector_lay(self, var, var_name): raw = self.read_raw(var) n = len(raw.keys()) // (self._n_t * 2) ra = (n,) if f"{var_name}0_x_{self._i[0]}" in raw.keys() else (1, n + 1) return ( np.asarray( [ raw[i0][0] for i0 in np.char.add( np.char.replace( f"{var_name}[]_x_", "[]", np.arange(*ra).astype("U1") )[None, :], self._i[:, None], ).reshape(-1) ], dtype=np.single, ).reshape((self._n_t, n, self._n_x)), np.asarray( [ raw[i0][0] for i0 in np.char.add( np.char.replace( f"{var_name}[]_y_", "[]", np.arange(*ra).astype("U1") )[None, :], self._i[:, None], ).reshape(-1) ], dtype=np.single, ).reshape((self._n_t, n, self._n_x)), ) def save(self, t, var=None, var_name=None): fct = { "t": self.read_t, "x": self.read_x, "s": self.read_scalar, "c": self.read_const, "v": self.read_vector, "sk": self.read_scalar_lay, "vk": self.read_vector_lay, } if t in ("x", "t"): log.info(f"Converting {t}") np.save(self._out.joinpath(t), fct[t]()) else: log.info(f"Converting {var}") np.save(self._out.joinpath(var), fct[t](var, var_name)) @property def t(self): return self._t @property def x(self): return self._x