"""Functions to write, run, and read for the Turbostream 3 solver."""
import numpy as np
from timeit import default_timer as timer
import turbigen.grid
import turbigen.solvers.ts3
import h5py
import sys
import shutil
import os
import subprocess
from scipy.spatial import KDTree
from time import sleep
import turbigen.util
import json
import re
from turbigen.solvers.base import BaseSolver
logger = turbigen.util.make_logger()
class TS4Config(BaseSolver):
"""Settings with default values for the TS4 solver."""
_name = "TS4"
cfl = 25.0
"""Courant--Friedrichs--Lewy number, reduce for more stability."""
cfl_ramp_nstep = 500
"""Ramp the CFL number up over the first `cfl_ramp_nstep` steps."""
cfl_ramp_st = 1.0
"""Starting value for CFL ramp."""
custom_pipeline = ""
"""Specify a custom pipeline to convert Turbostream 3 to 4 input file. Should run
using pvpython and take two command-line arguments like `pvpython
custom_pipeline.py input_ts3.hdf5 input_ts4`"""
environment_script = (
"/usr/local/software/turbostream/ts42111/bashrc_module_ts42111_a100"
)
"""Setup environment shell script to be sourced before running."""
implicit_scheme = 1
"""Whether to use implicit time-marching scheme."""
nstep = 5000
"""Number of time steps for the calculation."""
nstep_avg = 1000
"""Number of time steps to average over."""
nstep_ts3 = 0
"""Number of steps for a Turbostream 3 initial guess"""
spf_probe = []
"""List of span fractions to place probes."""
point_probe = []
"""Specify point probes."""
logical_probe = []
"""Specify logical probes."""
tables_path = ""
"""Path to gas tables for real working fluids."""
body_force_template = ""
"""Path to a body_force.ofp definition file template."""
body_force_params = {}
"""Parameters to be added to body force template."""
viscous_model = 2
"""Turbulence model, 0 for inviscid, 1 for laminar, 2 for Spalart-Allmaras."""
outlet_tag = "Outlet"
"""Identifier string for the outlet patch."""
monitor_script = ""
"""Path to a monitoring script, given the workdir as an argument."""
area_avg_pout = True
pout_fac_ramp_nstep = 0
inlet_relax_fac = 0.5
nstep_save_start_probe_1d = 0
nstep_save_probe_1d = 100
nstep_save_start_probe_2d = 0
nstep_save_probe_2d = 100
cfl_turb_fac = 0.5
plot_conv = True
precon = 0
precon_fac_ramp_nstep = 100
precon_fac_ramp_st = 0.1
precon_fac_ramp_en = 1.0
precon_sigma_pgr = 3.0
halo_implementation = 1
    interpolation_update = 1  # 1 to freeze the interpolating plane position
@property
def config_path(self):
return os.path.join(self.workdir, "config.ofp")
def to_ofp(self):
v = DEFAULT_CONFIG.copy()
for k in v:
if hasattr(self, k):
v[k] = getattr(self, k)
# Raise error if averaging not OK
nstep = v["nstep"]
istep_avg_start = v["istep_avg_start"] = v["nstep"] - self.nstep_avg
if istep_avg_start >= nstep and nstep > 0:
raise Exception(f"istep_avg_start={istep_avg_start} is > nstep={nstep}")
if v["cfl_ramp"]:
v["cfl_ramp_en"] = v["cfl"]
if v.get("pout_fac_ramp_nstep"):
v["pout_fac_ramp"] = 1
if v["precon"]:
v["precon_fac_ramp"] = 1
return v
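# A minimal usage sketch of this configuration class (illustrative only; it assumes
# `workdir` is accepted and stored by BaseSolver, as the `config_path` property implies,
# and uses `_write_ofp` defined further down this module):
#
#   conf = TS4Config(workdir="runs/ts4_case", nstep=2000, nstep_avg=500)
#   _write_ofp(conf.config_path, conf.to_ofp())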
re_nan = re.compile(r"^detected NAN$", flags=re.MULTILINE)
re_current_step = re.compile(r"^RK LOOP NO:\s*(\d*)$", flags=re.MULTILINE)
def _read_convergence(fname, nstep):
"""Parse a log file to get convergence history."""
# Preallocate
resid = np.full((nstep,), np.nan)
inlet = np.full((nstep, 3), np.nan)
outlet = np.full((nstep, 3), np.nan)
# Loop over lines in file
istep = 0
with open(fname, "r") as f:
for line in f:
if line.startswith("ROVX DELTA:"):
resid[istep] = float(line.split()[-1])
elif line.startswith("INLET: "):
inlet[istep, :] = [float(x) for x in line.split()[1:]]
elif line.startswith("OUTLET: "):
outlet[istep, :] = [float(x) for x in line.split()[1:]]
istep += 1
return resid, inlet, outlet
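# _read_convergence expects per-step log lines with the prefixes parsed above, each
# INLET/OUTLET line carrying three numbers. For example (the values here are made up;
# the first INLET/OUTLET value is treated as a per-passage mass flow when checking
# convergence in run()):
#
#   ROVX DELTA:  1.234e-05
#   INLET:  12.34  0.56  0.78
#   OUTLET:  12.30  0.55  0.77
#
# The OUTLET line closes a step, so istep is only incremented there.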
def _check_nan(fname):
"""Return step number of divergence from TS4 log, or zero if no NANs found."""
NBYTES = 2048
with open(fname, "r") as f:
while chunk := f.read(NBYTES):
if re_nan.search(chunk):
try:
return int(re_current_step.findall(chunk)[-1])
except Exception:
return -1
return 0
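# The divergence check above relies on log lines matched by re_nan and
# re_current_step, e.g. (illustrative excerpt):
#
#   RK LOOP NO:  1234
#   detected NAN
#
# in which case _check_nan returns 1234, or -1 if the step number cannot be
# recovered from the same chunk of the file.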
def _write_wall_distance(g, fname):
"""Given a TS4 input file, use wall nodes from this grid to get wall distance."""
# Read the unstructured data from TS4 hdf5
f = h5py.File(fname, "r+")
# Extract coordinates
x = np.copy(f["node_array_x"])
y = np.copy(f["node_array_y"])
z = np.copy(f["node_array_z"])
# Convert Cartesian coordinates to polar
r = np.sqrt(y**2.0 + z**2.0)
t = -np.arctan2(z, y)
xrrt = np.stack((x, r, r * t), axis=1)
# Initialise a kdtree of wall points
kdtree = KDTree(g.get_wall_nodes().T)
# Query the wall distances
wdist, _ = kdtree.query(xrrt, workers=32)
# Insert into grid
f["node_array_wdist"][:] = wdist
f.close()
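# A minimal standalone sketch (not called by the solver) of the same nearest-neighbour
# wall-distance query on synthetic points; the function and array names here are
# illustrative only.
def _example_wall_distance_sketch():
    rng = np.random.default_rng(0)
    wall_xrrt = rng.random((100, 3))  # wall-node coordinates, shape (npoint, 3)
    node_xrrt = rng.random((1000, 3))  # query-node coordinates
    # Distance from each query node to its nearest wall node
    wdist, _ = KDTree(wall_xrrt).query(node_xrrt, workers=-1)
    return wdist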
def _write_ofp(fname, var):
"""Write a dictionary of configuration variables in ofp format."""
with open(fname, "w") as f:
f.writelines([k + " = " + str(v) + "\n" for k, v in var.items()])
def _read_ofp(fname):
    """Read a dictionary of configuration variables in ofp format."""
    var = {}
    with open(fname, "r") as f:
        for line in f:
            k, v = line.split(" = ", 1)
            v = v.strip()
            # Crude type inference: values with a "." or exponent are floats
            if "." in v or "e" in v:
                var[k] = float(v)
            else:
                var[k] = int(v)
    return var
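# A round-trip sketch (not called by the solver) of the ofp `key = value` format
# handled by _write_ofp and _read_ofp; the file path is supplied by the caller and
# the values are illustrative.
def _example_ofp_round_trip(fname):
    _write_ofp(fname, {"nstep": 5000, "cfl": 25.0})
    return _read_ofp(fname)  # -> {"nstep": 5000, "cfl": 25.0}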
DEFAULT_CONFIG = {
"cfl": 50.0,
"cfl_ramp": 1,
"cfl_ramp_nstep": 500,
"cfl_ramp_st": 1.0,
"cfl_turb_fac": 0.25,
"fluid_model": 0,
"implicit_scheme": 1,
"inlet_relax_fac": 0.5,
"interpolation_extend_fac": 0.5,
"interpolation_time_step_fac": 0.1,
"istep_avg_start": 4000,
# "jacobian_diagonal_fac": 0.3,
"kappa2": 1.0,
"kappa4": 1.0 / 128.0,
"mixing_alpha": 1.0,
"mixing_bc_method": 2,
"mixing_distribution_kind": 1,
"mixing_distribution_spacing": 5e-4,
"mixing_method": 2,
"mixing_nstation": 200,
"mixing_rf": 0.25,
"mixing_rf_ramp": 1,
"mixing_rf_ramp_nstep": 500,
"mixing_rf_ramp_st": 0.05,
"mixing_rf_ramp_en": 0.25,
"node_ordering_option": 2,
"nstep": 5000,
"prandtl_turbulent": 0.9,
"sa_helicity_model": 0,
"sa_lim_neg": 1,
"time_step_mod_fac_nsmooth": 0,
"time_step_mod_fac_smooth_frac": 0.5,
"update_mixing_nstep": 5,
"viscous_model": 2,
"precon": 0,
"precon_dissipation": 0,
"precon_sigma_pgr": 3.0,
"precon_beta": 3.0,
"precon_mach_ref": 0.1,
"precon_mach_lim": 0.1,
"precon_fac_ramp": 0,
"precon_fac_ramp_nstep": 100,
"precon_fac_ramp_st": 0.1,
"precon_fac_ramp_en": 1.0,
"pout_fac_ramp_nstep": 0,
"pout_fac_ramp": 0,
"pout_fac_ramp_st": 0.8,
"pout_fac_ramp_en": 1.0,
"use_gpu_direct": 1,
}
def _read_flow(grid, fname, fname_avg):
start = timer()
# Read the unstructured data from TS4 hdf5
f = h5py.File(fname, "r")
fa = h5py.File(fname_avg, "r")
x = np.copy(f["node_array_x"])
y = np.copy(f["node_array_y"])
z = np.copy(f["node_array_z"])
ro = np.copy(fa["node_array_ro"])
rovx = np.copy(fa["node_array_rovx"])
rovy = np.copy(fa["node_array_rovy"])
rovz = np.copy(fa["node_array_rovz"])
roe = np.copy(fa["node_array_roe"])
turb0 = np.copy(fa["node_array_turb0"])
f.close()
fa.close()
end = timer()
logger.debug(f"Read flow in {end-start} s")
# Divide out density
vx = rovx / ro
vy = rovy / ro
vz = rovz / ro
vsq = vx**2.0 + vy**2.0 + vz**2.0
# Convert Cartesian coordinates to polar
r = np.sqrt(y**2.0 + z**2.0)
t = np.arctan2(-z, y)
rt = r * t
# Convert Cartesian velocities to polar
vt = -(vy * np.sin(t) + vz * np.cos(t))
vr = vy * np.cos(t) - vz * np.sin(t)
# Calculate internal energy
u = roe / ro - 0.5 * vsq
# Build kdtree for nodes in the TS4 hdf5
kdtree = KDTree(np.column_stack((x, r, rt)))
# Now loop over blocks in the grid
for block in grid:
        # Extract unstructured coordinates for this block
xrrtb = block.to_unstructured().xrrt.T
# Get nearest neighbours from the TS4 grid
_, ind_pts = kdtree.query(xrrtb, workers=-1)
# Check these really are the points we want
# assert np.allclose(block.x, x[ind_pts].reshape(block.shape))
# assert np.allclose(block.r, r[ind_pts].reshape(block.shape))
# assert np.allclose(block.t, t[ind_pts].reshape(block.shape))
# Assign the velocities
block.Vx = vx[ind_pts].reshape(block.shape)
block.Vr = vr[ind_pts].reshape(block.shape)
block.Vt = vt[ind_pts].reshape(block.shape)
# Set thermodynamic properties
Tu0_old = block.Tu0 + 0.0
block.Tu0 = 0.0
block.set_rho_u(
ro[ind_pts].reshape(block.shape), u[ind_pts].reshape(block.shape)
)
block.set_Tu0(Tu0_old)
# Assign turbulent viscosity
block.mu_turb = turb0[ind_pts].reshape(block.shape)
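# A check of the Cartesian-to-polar velocity conversion used in _read_flow, on a
# single hand-worked point (illustrative sketch, not called by the solver).
def _example_polar_velocity_check():
    # A point on the positive y-axis: t = 0, so Vr is aligned with +y and,
    # with the sign convention above, Vt is aligned with -z
    y, z = 1.0, 0.0
    vy, vz = 2.0, -3.0
    t = np.arctan2(-z, y)
    vr = vy * np.cos(t) - vz * np.sin(t)
    vt = -(vy * np.sin(t) + vz * np.cos(t))
    assert np.isclose(vr, 2.0) and np.isclose(vt, 3.0)
    return vr, vt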
def _write_throttle(ts4_conf, grid, fname):
# Get indices for outlet bcells
with h5py.File(fname, "r") as f:
bcell_names = list(f["bcell_names"].attrs["names"])
bcell_ind = [i for i, b in enumerate(bcell_names) if ts4_conf.outlet_tag == b]
if not bcell_ind:
raise Exception(
f"Could not find throttle outlet tag {ts4_conf.outlet_tag}, "
f"should be one of {bcell_names}"
)
# Loop over outlet patches
mass_target = []
throttle_kind = []
throttle_k0 = []
mdot_flag = False
for patch in grid.outlet_patches:
        # TS4 only has integral control. Divide by Nb because TS4 targets the
        # *patch*, not the *annulus*, mass flow. Reduce the gain a little for
        # the implicit scheme because TS4 converges in fewer steps than TS3.
Nb = patch.block.Nb
fac_adjust = 0.5 if ts4_conf.implicit_scheme else 1.0
if patch.Kpid is not None:
mdot_flag = True
mass_target.append(patch.mdot_target / float(Nb))
throttle_kind.append(0)
throttle_k0.append(patch.Kpid[1] * fac_adjust)
else:
mass_target.append(0.0)
throttle_kind.append(1)
throttle_k0.append(0.0)
# Assemble throttle data
throt_ofp = {
"throttle_tag_list": bcell_ind,
"throttle_target_list": mass_target,
"throttle_k0_list": throttle_k0,
"throttle_correction_initial_list": [0.0 for _ in bcell_ind],
"throttle_kind_list": throttle_kind,
}
throt_file_path = os.path.join(ts4_conf.workdir, "throttle_config.ofp")
if os.path.exists(throt_file_path):
os.remove(throt_file_path)
if mdot_flag or ts4_conf.area_avg_pout:
_write_ofp(throt_file_path, throt_ofp)
return mdot_flag
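# For a single outlet bcell with a mass-flow target, the resulting
# throttle_config.ofp contains lines of the following form (values are
# illustrative; the actual lists depend on the bcells and patches found above):
#
#   throttle_tag_list = [3]
#   throttle_target_list = [12.5]
#   throttle_k0_list = [2.5]
#   throttle_correction_initial_list = [0.0]
#   throttle_kind_list = [0]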
def _write_probes_ofp(ts4_conf):
probe_ofp = os.path.join(ts4_conf.workdir, "probes.ofp")
with open(probe_ofp, "w") as f:
# Initialise probe list
f.write(
"""
import ts.process.probe.probe_definition
probe_list = []
"""
)
def _write_point_probe(ts4_conf, xyzp, dom, label):
probe_ofp = os.path.join(ts4_conf.workdir, "probes.ofp")
istep_save_start = ts4_conf.nstep - ts4_conf.nstep_avg
with open(probe_ofp, "a") as f:
        # Append a point probe definition
f.write(
f"""
p = ts.process.probe.probe_definition.ProbeDefinition()
p.kind = "point"
p.x = {xyzp[0].tolist()}
p.y = {xyzp[1].tolist()}
p.z = {xyzp[2].tolist()}
p.idomain = {dom}
p.absolute_frame = True
p.fname_root = "point_probe_{label}"
p.write_2d = True
p.nstep_save_start_1d = {istep_save_start}
p.nstep_save_1d = {ts4_conf.nstep_save_probe_1d}
p.nstep_save_start_2d = {istep_save_start}
p.nstep_save_2d = {ts4_conf.nstep_save_probe_2d}
p.time_average = False # Must be False in a steady calc?
probe_list.append(p)
"""
)
def _write_logical_probe(ts4_conf, tag, label):
probe_ofp = os.path.join(ts4_conf.workdir, "probes.ofp")
istep_save_start = ts4_conf.nstep - ts4_conf.nstep_avg
with open(probe_ofp, "a") as f:
        # Append a logical probe definition
f.write(
f"""
p = ts.process.probe.probe_definition.ProbeDefinition()
p.kind = "logical"
p.val = "{tag}"
p.fname_root = "logical_probe_{label}"
p.write_2d = False
p.nstep_save_start_1d = {istep_save_start}
p.nstep_save_1d = {ts4_conf.nstep_save_probe_1d}
p.nstep_save_start_2d = 99999
p.nstep_save_2d = 0
probe_list.append(p)
"""
)
def _write_xr_probe(machine, ts4_conf, spf):
probe_ofp = os.path.join(ts4_conf.workdir, "probes.ofp")
# Evaluate the xr curve, extrapolate a little
eps = 0.01
mq = np.linspace(-eps, machine.ann.npts - 1 + eps, 100)
xr = machine.ann.evaluate_xr(mq, spf)
logger.debug(f"min xr={xr.min(axis=1)}, max xr={xr.max(axis=1)}")
istep_save_start = ts4_conf.nstep - ts4_conf.nstep_avg
# Fill values into template
pstr = []
for irow in range(machine.ann.nrow):
pstr.append(
f"""
p = ts.process.probe.probe_definition.ProbeDefinition()
p.kind = "xr"
p.s1 = {xr[0].tolist()}
p.s2 = {xr[1].tolist()}
p.idomain = {irow}
p.fname_root = "xr_probe_row_{irow}_spf_{str(spf).replace('.','')}"
p.write_2d = True
p.nstep_save_start_1d = {istep_save_start}
p.nstep_save_1d = {ts4_conf.nstep_save_probe_1d}
p.nstep_save_start_2d = {istep_save_start}
p.nstep_save_2d = {ts4_conf.nstep_save_probe_2d}
probe_list.append(p)
"""
)
with open(probe_ofp, "a") as f:
f.writelines(pstr)
def run(grid, settings, machine):
"""Write, run, and read TS4 results for a grid object, specifying some settings."""
ts4_conf = TS4Config(**settings)
input_file_path = os.path.join(ts4_conf.workdir, "input_ts4.hdf5")
output_file_path = os.path.join(ts4_conf.workdir, "output_ts4.hdf5")
output_avg_file_path = os.path.join(ts4_conf.workdir, "output_ts4_avg.hdf5")
soln_exists = os.path.exists(output_file_path)
if ts4_conf.skip:
if soln_exists:
logger.info("Skipping running, loading previous solution.")
_read_flow(grid, output_file_path, output_avg_file_path)
else:
logger.info("Skipping running, keeping initial guess.")
return
ofp = ts4_conf.to_ofp()
# Invert the sign of inlet pitch angle
inpatch = grid.inlet_patches[0]
inpatch.Beta *= -1.0
# Extract nominal fluid properties
ofp["gamma"] = inpatch.state.gamma
ofp["cp"] = inpatch.state.cp
ofp["viscosity"] = inpatch.state.mu
ofp["prandtl_laminar"] = inpatch.state.Pr
ofp["prandtl_turbulent"] = inpatch.state.Pr * 1.25
ofp["fluid_model"] = 0 if isinstance(grid[0], turbigen.grid.PerfectBlock) else 1
if ofp["fluid_model"]:
if not ts4_conf.tables_path:
raise Exception("No `tables_path` specified for real gas fluid")
dest_tables = os.path.join(ts4_conf.workdir, "fluid_model_tables.npz")
shutil.copyfile(ts4_conf.tables_path, dest_tables)
# Put body force definition into working directory, with substitutions
if ts4_conf.body_force_template:
with open(ts4_conf.body_force_template, "r") as f:
body_force_str = f.read()
for k, v in ts4_conf.body_force_params.items():
body_force_str = re.sub(
rf"{k}\s*=.*$", f"{k} = {v}", body_force_str, flags=re.MULTILINE
)
# Check syntax
try:
compile(body_force_str, 'dummy.py', 'exec')
except Exception as e:
raise Exception('Error in body force template!') from e
dest_body_force = os.path.join(ts4_conf.workdir, "body_force_config.ofp")
with open(dest_body_force, "w") as f:
f.write(body_force_str)
ofp["body_force_mode"] = 1
_write_ofp(ts4_conf.config_path, ofp)
ts3_conf = turbigen.solvers.ts3.TS3Config(workdir=ts4_conf.workdir).robust()
# Get number of GPUs from environment var
ngpu = int(os.environ["SLURM_NTASKS"])
nnode = int(os.environ["SLURM_NNODES"])
npernode = ngpu // nnode
if ts4_conf.nstep_ts3:
logger.info("Running TS3 initial guess...")
ts3_conf.nstep = ts4_conf.nstep_ts3
ts3_conf.ntask = ngpu
ts3_conf.nnode = nnode
turbigen.solvers.ts3._run(grid, ts3_conf)
grid.update_outlet()
turbigen.solvers.ts3._write_hdf5(grid, ts3_conf)
if pipeline := ts4_conf.custom_pipeline:
        # Write a dictionary of information that *might* be needed by custom pipelines.
        # In particular, TS4 cannot work out Nblade for the wall distance filter by itself.
pipeline_params = {
"Nb_row": [int(blks[0].Nb) for blks in grid.row_blocks],
"cp": inpatch.state.cp,
"ho": inpatch.state.h,
}
pipeline_params_path = os.path.join(ts4_conf.workdir, "pipeline_params.json")
with open(pipeline_params_path, "w") as f:
json.dump(pipeline_params, f)
convert_cmd = (
f"pvpython {os.path.abspath(pipeline)} input.hdf5 input_ts4 > convert.log"
)
logger.iter(f"Converting TS3->TS4 using custom pipeline {pipeline}...")
        # When running headless, ParaView seems to segfault on exit, despite the
        # pipeline completing successfully. So we ignore errors when executing
        # pvpython. We could instead verify success by checking for the
        # existence of the TS4 input file.
check = False
else:
convert_script = os.path.join(
os.path.dirname(__file__), "convert_ts3_to_ts4_native.py"
)
convert_cmd = f"{convert_script} input.hdf5 input_ts4 > convert.log"
check = True
logger.info("Converting TS3->TS4...")
cmd_str = (
f"source {ts4_conf.environment_script} 2> /dev/null;"
f"cd {ts4_conf.workdir}; {convert_cmd}"
)
try:
subprocess.run(cmd_str, shell=True, check=check, stderr=subprocess.PIPE)
except subprocess.CalledProcessError as e:
raise Exception(
f"""Running TS3->TS4 conversion failed, exit code {e.returncode}
COMMAND: {cmd_str}
STDERR: {e.stderr.decode(sys.getfilesystemencoding()).strip()}
"""
) from None
input_file_path = os.path.join(ts4_conf.workdir, "input_ts4.hdf5")
# Write throttle config file
throttle_flag = _write_throttle(ts4_conf, grid, input_file_path)
# Assume that the custom pipeline will set wall distances for us
if not ts4_conf.custom_pipeline:
_write_wall_distance(grid, input_file_path)
# Write probe config file
if ts4_conf.spf_probe or ts4_conf.point_probe:
_write_probes_ofp(ts4_conf)
# Write span fraction probes
if ts4_conf.spf_probe:
for spf in ts4_conf.spf_probe:
_write_xr_probe(machine, ts4_conf, spf)
# Write point probes
if ts4_conf.point_probe:
# Loop over a list of probes
for pp_conf in ts4_conf.point_probe:
xyz = np.array(pp_conf["xyz"]).T
idomain = int(pp_conf["domain"])
# Get dimensions of input data
assert xyz.ndim == 2
assert xyz.shape[0] == 3
_write_point_probe(ts4_conf, xyz, idomain, pp_conf["label"])
# Write logical probes
if ts4_conf.logical_probe:
# Loop over a list of probes
for pp_conf in ts4_conf.logical_probe:
_write_logical_probe(ts4_conf, **pp_conf)
logger.info(f"Using {ngpu} GPUs on {nnode} nodes, {npernode} per node.")
logger.info("Running TS4...")
cmd_str = (
f"source {ts4_conf.environment_script} 2> /dev/null;"
f"cd {ts4_conf.workdir};"
f"mpirun -np {ngpu} python $TSHOME/$TSDIR/bin/turbostream.py"
" config.ofp input_ts4.hdf5 input_ts4.hdf5 input_ts4.hdf5 output_ts4"
f" {npernode} --fname_out_procid=procids.hdf5 > log_ts4.txt 2> err.txt"
)
sleep(1)
# Start monitoring script
if ts4_conf.monitor_script:
monitor_cmd = f"python {ts4_conf.monitor_script} {ts4_conf.workdir}"
monitor_proc = subprocess.Popen(monitor_cmd, shell=True)
try:
        subprocess.run(cmd_str, shell=True, check=True, stderr=subprocess.PIPE)
# TODO catch keyboard interrupt here
except subprocess.CalledProcessError as e:
raise Exception(
f"""Running TS4 failed, exit code {e.returncode}
COMMAND: {cmd_str}
STDERR: {e.stderr.decode(sys.getfilesystemencoding()).strip()}
"""
) from None
if ts4_conf.monitor_script:
monitor_proc.kill()
log_path = os.path.join(ts4_conf.workdir, "log_ts4.txt")
if istep_nan := _check_nan(log_path):
raise Exception(f"TS4 diverged at step {istep_nan}")
else:
# Check convergence
        istep = np.arange(ts4_conf.nstep)
        resid, inlet, outlet = _read_convergence(log_path, ts4_conf.nstep)
        # Multiply by the number of blades in each row to get whole-annulus values
        Nb = [b[0].Nb for b in grid.row_blocks]
inlet[:, :2] *= Nb[0]
outlet[:, :2] *= Nb[-1]
mdot = np.stack((inlet[:, 0], outlet[:, 0]))
err = mdot[0] / np.maximum(mdot[1], 1e-9) - 1.0
if ts4_conf.plot_conv:
import matplotlib.pyplot as plt
fig, ax = plt.subplots(1, 2)
ax[0].plot(istep, np.log10(resid))
ax[1].plot(istep, err)
ax[1].set_ylim([-0.05, 0.05])
plt.tight_layout()
plt.savefig(os.path.join(ts4_conf.workdir, "conv.pdf"))
plt.close()
# output_file_path = os.path.join(ts4_conf.workdir, "output_ts4.hdf5")
_read_flow(grid, output_file_path, output_avg_file_path)
# Restore the sign of inlet pitch angle
inpatch.Beta *= -1.0
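# Example driver usage (illustrative sketch; `grid` and `machine` are built elsewhere
# by turbigen, and `workdir`/`skip` are assumed to be handled by BaseSolver):
#
#   settings = {
#       "workdir": "runs/ts4_case",
#       "nstep": 5000,
#       "nstep_avg": 1000,
#       "spf_probe": [0.5],
#   }
#   run(grid, settings, machine)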