130 lines
3.5 KiB
Python
130 lines
3.5 KiB
Python
import pathlib
|
|
import re
|
|
|
|
from fluidfoam import readof
|
|
import numpy as np
|
|
|
|
|
|
class OFModel:
    """Access mesh, time steps, fields and post-processing data of a case.

    The case lives in a root directory and is read through
    ``fluidfoam.readof`` (presumably an OpenFOAM case -- the layout used
    here is numerically named time directories, a ``postProcessing``
    folder and initial fields under ``0``).

    Typical call order: ``read_mesh()`` and ``read_time()`` first, because
    the other readers rely on the point count and the time-directory list
    those two methods store on the instance.
    """

    # Names made only of digits with an optional decimal fraction.
    _TIME_DIR = re.compile(r"^[0-9]+(\.[0-9]+)?$")
    # Body of the first "(" newline ... newline ")" list found in a file.
    _LIST_BODY = re.compile(r"(?<=\(\n).*?(?=\n\))", flags=re.S)

    def __init__(self, root):
        """Store the case location and create empty result caches.

        Args:
            root: Path of the case directory, as a ``str`` or any
                path-like object.
        """
        # Every method uses pathlib operations (glob / joinpath) on the
        # root, so normalise it once instead of requiring a Path.
        self._root = pathlib.Path(root)
        self._fields = {}
        self._post_fields = {}

    def read_mesh(self):
        """Load the mesh coordinates via ``readof.readmesh``.

        Stores the three returned coordinate arrays (exposed as ``x``,
        ``y`` and ``z``) and the number of entries of the first one.
        """
        self._x, self._y, self._z = readof.readmesh(str(self._root))
        self._n = self._x.size

    def read_time(self):
        """Discover the time directories of the case, sorted by time.

        Only directories whose name is a plain non-negative decimal number
        (digits with an optional fraction) are considered.

        Returns:
            1-D ``float64`` array of the times in ascending order.  The
            matching directory names are kept, in the same order, for the
            other readers.
        """
        names = [
            entry.name
            for entry in self._root.glob("*")
            if self._TIME_DIR.match(entry.name) and entry.is_dir()
        ]
        # Full double precision: half-precision floats cannot tell apart
        # neighbouring times beyond ~3 significant digits and overflow
        # above 65504, which corrupts both the values and the ordering.
        times = np.array([float(name) for name in names], dtype=np.float64)
        order = np.argsort(times)

        # dtype=str sizes the string width to the longest name, so long
        # directory names are never truncated.
        self._t_dirs = np.array(names, dtype=str)[order]
        self._t = times[order]
        return self.t

    def read_field(self, field, t):
        """Read one field at one time step.

        Args:
            field: Name of the field file inside the time directory.
            t: Name of the time directory.

        Returns:
            The array returned by ``readof.readfield``.  If the file does
            not exist, a NaN-filled array with one entry per mesh point is
            returned instead (requires ``read_mesh()`` to have been
            called), so missing data is recognisable rather than being
            uninitialised memory.
        """
        if not self._root.joinpath(t, field).exists():
            return np.full(self._n, np.nan)
        return readof.readfield(str(self._root), time_name=t, name=field)

    def read_field_all(self, field):
        """Read a field at every known time step and cache the result.

        The field type is looked up with ``readof.typefield`` in the last
        time directory: ``"scalar"`` gives an array of shape
        ``(n_times, n_points)``; any other type is stored as
        ``(n_times, 3, n_points)``.

        Args:
            field: Name of the field to read.

        Returns:
            ``float32`` array holding the field for all time steps; the
            same array is stored in ``fields[field]``.
        """
        kind = readof.typefield(
            str(self._root), time_name=self._t_dirs[-1], name=field
        )
        if kind == "scalar":
            shape = (self.t.size, self._n)
        else:
            shape = (self.t.size, 3, self._n)

        data = np.empty(shape, dtype=np.single)
        for step, t_dir in zip(data, self._t_dirs):
            # Slice assignment broadcasts, so a placeholder for a missing
            # file also fills a multi-component slot.
            step[:] = self.read_field(field, t_dir)
        self.fields[field] = data
        return data

    def read_post(self, func, field):
        """Read ``line_<field>.xy`` samples of a function for all times.

        For each known time directory name ``t`` the file
        ``postProcessing/<func>/<t>/line_<field>.xy`` is loaded with
        ``np.loadtxt``.  The first column of the first file is taken as
        the abscissa; the remaining columns are the sampled values.

        Args:
            func: Name of the sub-directory of ``postProcessing``.
            field: Field name used in the sample file name.

        Returns:
            Dict with ``"x_<field>"`` (abscissa) and ``"<field>"`` (values
            stacked over time, with length-one axes squeezed out).  The
            entries are also merged into ``post_fields[func]``.
        """

        def sample_path(t_dir):
            return self._root.joinpath(
                "postProcessing", func, t_dir, f"line_{field}.xy"
            )

        first = np.loadtxt(sample_path(self._t_dirs[0]))
        abscissa = first[:, 0]
        values = np.empty((self._t.size, abscissa.size, first.shape[1] - 1))
        values[0] = first[:, 1:]
        for index, t_dir in enumerate(self._t_dirs[1:], start=1):
            values[index] = np.loadtxt(sample_path(t_dir))[:, 1:]

        result = {
            f"x_{field}": abscissa,
            field: np.squeeze(values),
        }
        self._post_fields.setdefault(func, {}).update(result)
        return result

    def _replace_first_list(self, field, payload):
        """Swap the body of the first parenthesised list in ``0/<field>``.

        The new file content is built completely before the file is
        opened for writing, so a failure cannot leave a truncated file.
        If the file contains no matching list it is rewritten unchanged.
        """
        path = self._root.joinpath("0", field)
        with open(path, "r") as handle:
            raw = handle.read()
        # A callable replacement inserts the payload verbatim; a plain
        # string would be interpreted as a template with escapes.
        updated = self._LIST_BODY.sub(lambda _match: payload, raw, count=1)
        with open(path, "w") as handle:
            handle.write(updated)

    def write_field(self, field, values):
        """Write scalar values into the first list of the file ``0/<field>``.

        Args:
            field: Name of the field file in the ``0`` directory.
            values: 1-D array-like; each entry is written on its own line.
                The element count stored in the file is not updated, so
                the number of values should match the existing list.
        """
        lines = np.asarray(values).astype(str)
        self._replace_first_list(field, "\n".join(lines))

    def write_vector_field(self, field, values):
        """Write vector values into the first list of the file ``0/<field>``.

        Args:
            field: Name of the field file in the ``0`` directory.
            values: 2-D array-like iterated row by row; each row is
                written as ``(c0 c1 ...)`` on its own line.  The element
                count stored in the file is not updated.
        """
        rows = (
            f"({' '.join(row.astype('str'))})" for row in np.asarray(values)
        )
        self._replace_first_list(field, "\n".join(rows))

    @property
    def x(self):
        """First coordinate array stored by ``read_mesh()``."""
        return self._x

    @property
    def y(self):
        """Second coordinate array stored by ``read_mesh()``."""
        return self._y

    @property
    def z(self):
        """Third coordinate array stored by ``read_mesh()``."""
        return self._z

    @property
    def coords(self):
        """The x, y and z arrays stacked along a new axis 1."""
        return np.stack((self._x, self._y, self._z), axis=1)

    @property
    def t(self):
        """Sorted time values stored by ``read_time()``."""
        return self._t

    @property
    def fields(self):
        """Dict of fields cached by ``read_field_all()``, keyed by name."""
        return self._fields

    @property
    def post_fields(self):
        """Dict of ``read_post()`` results, keyed by function name."""
        return self._post_fields
|