1
Fork 0
internship/olaflow/processing/olaflow.py

117 lines
2.9 KiB
Python

import pathlib
import re
from fluidfoam import readof
import numpy as np
class OFModel:
    """Thin reader/writer around an OpenFOAM case directory.

    The mesh and fields are read through ``fluidfoam.readof``; time steps are
    discovered from the numerically named sub-directories of the case root.
    Typical use is ``read_mesh()`` then ``read_time()`` and finally
    ``read_field_all(name)``; the later calls rely on attributes set by the
    earlier ones.
    """

    # Directory names that look like a plain decimal time ("0", "12", "0.05").
    _TIME_DIR_RE = re.compile(r"^[0-9]+(\.[0-9]+)?$")
    # Everything between the first "(\n" and the next "\n)" of a field file.
    _LIST_BODY_RE = re.compile(r"(?<=\(\n).*?(?=\n\))", flags=re.S)

    def __init__(self, root):
        """Create a model for the case located at *root*.

        Args:
            root: Case directory, as a ``str`` or any path-like object. It is
                converted to ``pathlib.Path`` because the other methods call
                ``glob``/``joinpath`` on it.
        """
        self._root = pathlib.Path(root)
        self._fields = {}

    def read_mesh(self):
        """Read the cell coordinates and store them with the cell count."""
        self._x, self._y, self._z = readof.readmesh(str(self._root))
        self._n = self._x.size

    def read_time(self):
        """Discover the time directories of the case and sort them.

        Only sub-directories whose name is a plain decimal number are kept.
        The names are stored in ``_t_dirs`` and their numeric values in ``t``,
        both ordered by increasing time.

        Returns:
            The sorted time values as a float64 array (empty when the case
            holds no time directory).
        """
        names = sorted(
            entry.name
            for entry in self._root.glob("*")
            if entry.is_dir() and self._TIME_DIR_RE.match(entry.name)
        )
        # Let NumPy size the string dtype itself so long names are not cut.
        dirs = np.array(names, dtype=str)
        # float64 keeps distinct times distinct; half precision would merge
        # close time steps and overflow for large times.
        times = dirs.astype(np.float64)
        order = np.argsort(times, kind="stable")
        self._t_dirs = dirs[order]
        self._t = times[order]
        return self.t

    def read_field(self, field, t):
        """Read *field* from the time directory named *t*.

        Args:
            field: Name of the field file.
            t: Name of the time directory (a string such as ``"0.5"``).

        Returns:
            The array returned by ``readof.readfield``, or an array of
            ``self._n`` NaN values when the field file does not exist.
            Requires ``read_mesh()`` to have been called for the fallback.
        """
        if not self._root.joinpath(t, field).exists():
            # NaN marks the data as missing instead of exposing
            # uninitialised memory.
            return np.full(self._n, np.nan)
        return readof.readfield(str(self._root), time_name=t, name=field)

    def read_field_all(self, field):
        """Read *field* for every known time step and cache the result.

        The field type is probed in the last time directory: a ``"scalar"``
        field gives an array of shape ``(n_times, n_cells)``, anything else
        ``(n_times, 3, n_cells)``. Values are stored as single precision.
        Requires ``read_mesh()`` and ``read_time()`` to have been called.

        Returns:
            The filled array, also stored in ``self.fields[field]``.
        """
        kind = readof.typefield(
            str(self._root), time_name=self._t_dirs[-1], name=field
        )
        if kind == "scalar":
            shape = (self.t.size, self._n)
        else:
            shape = (self.t.size, 3, self._n)

        data = np.empty(shape, dtype=np.single)
        for row, time_dir in zip(data, self._t_dirs):
            # ``row`` is a view into ``data``; a missing field broadcasts NaN.
            row[:] = self.read_field(field, time_dir)
        self.fields[field] = data
        return data

    def _replace_first_list(self, field, body):
        """Replace the first parenthesised list of ``0/<field>`` with *body*."""
        path = self._root.joinpath("0", field)
        with open(path, "r") as src:
            raw = src.read()
        # Build the new content BEFORE reopening in "w" mode so that a failure
        # here cannot leave a truncated file behind. A callable replacement
        # keeps ``body`` literal (no backslash/group expansion).
        updated = self._LIST_BODY_RE.sub(lambda _match: body, raw, count=1)
        with open(path, "w") as dst:
            dst.write(updated)

    def write_field(self, field, values):
        """Overwrite the values of scalar field *field* in the ``0`` directory.

        The first block delimited by ``"(\\n"`` and ``"\\n)"`` in the existing
        file is replaced by one value per line. If the file holds no such
        block it is rewritten unchanged.

        Args:
            field: Name of the field file inside ``<root>/0``.
            values: One-dimensional array-like of values.
        """
        lines = np.asarray(values).astype(str)
        self._replace_first_list(field, "\n".join(lines))

    def write_vector_field(self, field, values):
        """Overwrite the values of vector field *field* in the ``0`` directory.

        Same as :meth:`write_field`, but every row of *values* is written as
        ``(c0 c1 ...)`` on its own line.

        Args:
            field: Name of the field file inside ``<root>/0``.
            values: Two-dimensional array-like, one vector per row.
        """
        rows = (
            f"({' '.join(np.asarray(row).astype('str'))})" for row in values
        )
        self._replace_first_list(field, "\n".join(rows))

    @property
    def x(self):
        """First mesh coordinate array, set by ``read_mesh()``."""
        return self._x

    @property
    def y(self):
        """Second mesh coordinate array, set by ``read_mesh()``."""
        return self._y

    @property
    def z(self):
        """Third mesh coordinate array, set by ``read_mesh()``."""
        return self._z

    @property
    def coords(self):
        """Mesh coordinates stacked along axis 1 as ``(x, y, z)`` columns."""
        return np.stack((self._x, self._y, self._z), axis=1)

    @property
    def t(self):
        """Sorted time values, set by ``read_time()``."""
        return self._t

    @property
    def fields(self):
        """Dictionary of the fields cached by ``read_field_all()``."""
        return self._fields

    # NOTE(review): ``_X``, ``_Z`` and ``_C`` are not assigned anywhere in
    # this class; presumably they are set by the caller (structured-grid
    # coordinates and a cell-index map) -- confirm before relying on them.
    @property
    def X(self):
        """Return ``self._X`` (must have been assigned beforehand)."""
        return self._X

    @property
    def Z(self):
        """Return ``self._Z`` (must have been assigned beforehand)."""
        return self._Z

    def FIELD(self, field):
        """Gather *field* through the index array ``self._C``.

        Args:
            field: Array indexed as ``field[:, self._C]``.

        Returns:
            ``field[:, self._C]`` where ``self._C > 0`` and NaN elsewhere.
        """
        # NOTE(review): the mask is ``> 0``, so index 0 is also replaced by
        # NaN -- confirm whether ``>= 0`` was intended.
        return np.where(self._C > 0, field[:, self._C], np.nan)