import pathlib import re from fluidfoam import readof import numpy as np class OFModel: def __init__(self, root): self._root = root self._fields = {} self._post_fields = {} def read_mesh(self): self._x, self._y, self._z = readof.readmesh(str(self._root)) self._n = self._x.size def read_time(self): _dirs = np.fromiter( map( lambda f: f.name, filter( lambda f: re.match(r"^[0-9]+(\.[0-9]+)?$", f.name), self._root.glob("*"), ), ), dtype="U10", ) _t = _dirs.astype(np.half) _sort = np.argsort(_t) self._t_dirs = _dirs[_sort] self._t = _t[_sort] return self.t def read_field(self, field, t): if not self._root.joinpath(t, field).exists(): return np.empty((self._n)) return readof.readfield(self._root, time_name=t, name=field) def read_field_all(self, field): _shape = ( (self.t.size, self._n) if readof.typefield(self._root, time_name=self._t_dirs[-1], name=field) == "scalar" else (self.t.size, 3, self._n) ) _field = np.empty(_shape, dtype=np.single) for _f, _dir in zip(_field, self._t_dirs): _f[:] = self.read_field(field, _dir) self.fields[field] = _field return _field def read_post(self, func, field): _ft = lambda _d: self._root.joinpath("postProcessing", func, _d, f"line_{field}.xy") _res_0 = np.loadtxt(_ft(self._t_dirs[0])) _x = _res_0[:, 0] _res = np.empty((self._t.size, _x.size)) _res[0] = _res_0[:, 1] for _r, _dir in zip(_res[1:], self._t_dirs[1:]): _r[:] = np.loadtxt(_ft(_dir))[:, 1] _dict = { f"x_{field}": _x, field: _res, } if func not in self._post_fields.keys(): self._post_fields[func] = {} self._post_fields[func] |= _dict return _dict def write_field(self, field, values): with open(self._root.joinpath("0", field), "r") as aw_file: aw_raw = aw_file.read() with open(self._root.joinpath("0", field), "w") as aw_file: aw_file.write( re.sub( r"(?<=\(\n).*?(?=\n\))", "\n".join(values.astype(str)), aw_raw, count=1, flags=re.S, ) ) def write_vector_field(self, field, values): with open(self._root.joinpath("0", field), "r") as aw_file: aw_raw = aw_file.read() with open(self._root.joinpath("0", field), "w") as aw_file: aw_file.write( re.sub( r"(?<=\(\n).*?(?=\n\))", "\n".join(map(lambda x: f"({' '.join(x.astype('str'))})", values)), aw_raw, count=1, flags=re.S, ) ) @property def x(self): return self._x @property def y(self): return self._y @property def z(self): return self._z @property def coords(self): return np.stack((self._x, self._y, self._z), axis=1) @property def t(self): return self._t @property def fields(self): return self._fields @property def post_fields(self): return self._post_fields