Source code for ics

"""User facing initial conditions structures for interfacing with simulation softwares."""
import os
from numbers import Number

import numpy as np
from ruamel.yaml import YAML

from cluster_generator.model import ClusterModel
from cluster_generator.particles import (
    ClusterParticles,
    combine_three_clusters,
    combine_two_clusters,
    resample_one_cluster,
    resample_three_clusters,
    resample_two_clusters,
)
from cluster_generator.utils import ensure_list, ensure_ytarray, parse_prng


[docs] def compute_centers_for_binary(center, d, b, a=0.0): """ Given a common center and distance parameters, calculate the central positions of two clusters. First, the separation along the x-direction is determined by: sep_x = sqrt(d**2-b**2-a**2) where d is the distance between the two clusters, b is the impact parameter in the y-direction, and a is the impact parameter in the z-direction. So the resulting centers are calculated as: center1 = [center-0.5*sep_x, center-0.5*b, center-0.5*a] center2 = [center+0.5*sep_x, center+0.5*b, center+0.5*a] Parameters ---------- center : array-like The center from which the distance parameters for the two clusters will be calculated. d : float The distance between the two clusters. b : float The impact parameter in the y-direction, in kpc. a : float, optional The impact parameter in the z-direction, in kpc. Default: 0.0 """ d = np.sqrt(d * d - b * b - a * a) diff = np.array([d, b, a]) center1 = center - 0.5 * diff center2 = center + 0.5 * diff return center1, center2
[docs] class ClusterICs: """ TODO: docstring """
[docs] def __init__( self, basename, num_halos, profiles, center, velocity, num_particles=None, mag_file=None, particle_files=None, r_max=20000.0, r_max_tracer=None, ): """ Initialize the :py:class:`ics.ClusterICs` instance from a set of models. Parameters ---------- basename: str The name used in reference to output files from this instance. num_halos: int The number of halos to be included in the initial conditions system. profiles: list of :py:class:`model.ClusterModel` The cluster models to initialize as part of the simulation ICs. center: :py:class:`unyt.array.unyt_array` The centers of the profiles. velocity: :py:class:`unyt.array.unyt_array` The velocities of the clusters in the ICs. num_particles: dict of str: int, optional The number of particles to include for each species. Should be a dictionary with keys ``stars`` and ``dm`` (and ``gas`` if running an SPH code) followed by values corresponding to the number of particles to initialize in each case. mag_file: str, optional The path to a file containing the magnetic field. particle_files: str, optional The path to any files containing pre-generated particle data. r_max: float or int or :py:class:`unyt.array.quantity`, optional The maximal radius at which to truncate the entire simulation domain. r_max_tracer: float or int or :py:class:`unyt.array.quantity`, optional The maximal tracer radius. """ self.basename = basename self.num_halos = num_halos self.profiles = ensure_list(profiles) self.center = ensure_ytarray(center, "kpc") self.velocity = ensure_ytarray(velocity, "kpc/Myr") if self.num_halos == 1: self.center = self.center.reshape(1, 3) self.velocity = self.velocity.reshape(1, 3) self.mag_file = mag_file if isinstance(r_max, Number): r_max = [r_max] * num_halos self.r_max = np.array(r_max) if r_max_tracer is None: r_max_tracer = r_max if isinstance(r_max_tracer, Number): r_max_tracer = [r_max_tracer] * num_halos self.r_max_tracer = np.array(r_max_tracer) if num_particles is None: self.tot_np = {"dm": 0, "gas": 0, "star": 0, "tracer": 0} else: self.tot_np = num_particles self._determine_num_particles() self.particle_files = [None] * 3 if particle_files is not None: self.particle_files[:num_halos] = particle_files[:]
def _determine_num_particles(self): from collections import defaultdict dm_masses = [] gas_masses = [] star_masses = [] tracer_masses = [] for i, pf in enumerate(self.profiles): p = ClusterModel.from_h5_file(pf) idxs = p["radius"] < self.r_max[i] dm_masses.append(p["dark_matter_mass"][idxs][-1].value) if "gas_mass" in p: gmass = p["gas_mass"][idxs][-1].value else: gmass = 0.0 gas_masses.append(gmass) if "stellar_mass" in p: smass = p["stellar_mass"][idxs][-1].value else: smass = 0.0 star_masses.append(smass) if self.tot_np.get("tracer", 0) > 0: idxst = p["radius"] < self.r_max_tracer[i] tmass = p["gas_mass"][idxst][-1].value else: tmass = 0.0 tracer_masses.append(tmass) tot_dm_mass = np.sum(dm_masses) tot_gas_mass = np.sum(gas_masses) tot_star_mass = np.sum(star_masses) tot_tracer_mass = np.sum(tracer_masses) self.num_particles = defaultdict(list) for i in range(self.num_halos): if self.tot_np.get("dm", 0) > 0: ndp = np.rint(self.tot_np["dm"] * dm_masses[i] / tot_dm_mass).astype( "int" ) else: ndp = 0 self.num_particles["dm"].append(ndp) if self.tot_np.get("gas", 0) > 0: ngp = np.rint(self.tot_np["gas"] * gas_masses[i] / tot_gas_mass).astype( "int" ) else: ngp = 0 self.num_particles["gas"].append(ngp) if self.tot_np.get("star", 0) > 0: nsp = np.rint( self.tot_np["star"] * star_masses[i] / tot_star_mass ).astype("int") else: nsp = 0 self.num_particles["star"].append(nsp) if self.tot_np.get("tracer", 0) > 0: ntp = np.rint( self.tot_np["tracer"] * tracer_masses[i] / tot_tracer_mass ).astype("int") else: ntp = 0 self.num_particles["tracer"].append(ntp) def _generate_particles(self, regenerate_particles=False, prng=None): prng = parse_prng(prng) parts = [] for i, pf in enumerate(self.profiles): if regenerate_particles or self.particle_files[i] is None: m = ClusterModel.from_h5_file(pf) p = m.generate_dm_particles( self.num_particles["dm"][i], r_max=self.r_max[i], prng=prng ) if self.num_particles["star"][i] > 0: sp = m.generate_star_particles( self.num_particles["star"][i], r_max=self.r_max[i], prng=prng ) p = p + sp if self.num_particles["gas"][i] > 0: gp = m.generate_gas_particles( self.num_particles["gas"][i], r_max=self.r_max[i], prng=prng ) p = p + gp if self.num_particles["tracer"][i] > 0: tp = m.generate_tracer_particles( self.num_particles["tracer"][i], r_max=self.r_max_tracer[i], prng=prng, ) p = p + tp parts.append(p) outfile = f"{self.basename}_{i}_particles.h5" p.write_particles(outfile, overwrite=True) self.particle_files[i] = outfile else: p = ClusterParticles.from_file(self.particle_files[i]) parts.append(p) return parts
[docs] def to_file(self, filename, overwrite=False): r""" Write the initial conditions information to a file. Parameters ---------- filename : string The file to write the initial conditions information to. overwrite : boolean, optional If True, overwrite a file with the same name. Default: False """ if os.path.exists(filename) and not overwrite: raise RuntimeError(f"{filename} exists and overwrite=False!") from ruamel.yaml.comments import CommentedMap out = CommentedMap() out["basename"] = self.basename out.yaml_add_eol_comment("base name for ICs", key="basename") out["num_halos"] = self.num_halos out.yaml_add_eol_comment("number of halos", key="num_halos") out["profile1"] = self.profiles[0] out.yaml_add_eol_comment("profile for cluster 1", key="profile1") out["center1"] = self.center[0].tolist() out.yaml_add_eol_comment("center for cluster 1", key="center1") out["velocity1"] = self.velocity[0].tolist() out.yaml_add_eol_comment("velocity for cluster 1", key="velocity1") if self.particle_files[0] is not None: out["particle_file1"] = self.particle_files[0] out.yaml_add_eol_comment( "particle file for cluster 1", key="particle_file1" ) if self.num_halos > 1: out["profile2"] = self.profiles[1] out.yaml_add_eol_comment("profile for cluster 2", key="profile2") out["center2"] = self.center[1].tolist() out.yaml_add_eol_comment("center for cluster 2", key="center2") out["velocity2"] = self.velocity[1].tolist() out.yaml_add_eol_comment("velocity for cluster 2", key="velocity2") if self.particle_files[1] is not None: out["particle_file2"] = self.particle_files[1] out.yaml_add_eol_comment( "particle file for cluster 2", key="particle_file2" ) if self.num_halos == 3: out["profile3"] = self.profiles[2] out.yaml_add_eol_comment("profile for cluster 3", key="profile3") out["center3"] = self.center[2].tolist() out.yaml_add_eol_comment("center for cluster 3", key="center3") out["velocity3"] = self.velocity[2].tolist() out.yaml_add_eol_comment("velocity for cluster 3", key="velocity3") if self.particle_files[2] is not None: out["particle_file3"] = self.particle_files[2] out.yaml_add_eol_comment( "particle file for cluster 3", key="particle_file3" ) if self.tot_np.get("dm", 0) > 0: out["num_dm_particles"] = self.tot_np["dm"] out.yaml_add_eol_comment("number of DM particles", key="num_dm_particles") if self.tot_np.get("gas", 0) > 0: out["num_gas_particles"] = self.tot_np["gas"] out.yaml_add_eol_comment("number of gas particles", key="num_gas_particles") if self.tot_np.get("star", 0) > 0: out["num_star_particles"] = self.tot_np["star"] out.yaml_add_eol_comment( "number of star particles", key="num_star_particles" ) if self.tot_np.get("tracer", 0) > 0: out["num_tracer_particles"] = self.tot_np["tracer"] out.yaml_add_eol_comment( "number of tracer particles", key="num_tracer_particles" ) if self.mag_file is not None: out["mag_file"] = self.mag_file out.yaml_add_eol_comment("3D magnetic field file", key="mag_file") out["r_max"] = self.r_max.tolist() out.yaml_add_eol_comment("Maximum radii of particles", key="r_max") if self.tot_np.get("tracer", 0) > 0: out["r_max_tracer"] = self.r_max_tracer.tolist() out.yaml_add_eol_comment("Maximum radii of tracer particles", key="r_max") yaml = YAML() with open(filename, "w") as f: yaml.dump(out, f)
[docs] @classmethod def from_file(cls, filename): r""" Read the initial conditions information from a YAML-formatted `filename`. """ from ruamel.yaml import YAML yaml = YAML() with open(filename, "r") as f: params = yaml.load(f) basename = params["basename"] num_halos = params["num_halos"] profiles = [params[f"profile{i}"] for i in range(1, num_halos + 1)] center = [np.array(params[f"center{i}"]) for i in range(1, num_halos + 1)] velocity = [np.array(params[f"velocity{i}"]) for i in range(1, num_halos + 1)] num_particles = { k: params.get(f"num_{k}_particles", 0) for k in ["gas", "dm", "star"] } mag_file = params.get("mag_file", None) particle_files = [ params.get(f"particle_file{i}", None) for i in range(1, num_halos + 1) ] r_max = params.get("r_max", 20000.0) r_max_tracer = params.get("r_max_tracer", r_max) return cls( basename, num_halos, profiles, center, velocity, num_particles=num_particles, mag_file=mag_file, particle_files=particle_files, r_max=r_max, r_max_tracer=r_max_tracer, )
[docs] def setup_particle_ics(self, regenerate_particles=False, prng=None): r""" From a set of cluster models and their relative positions and velocities, set up initial conditions for use with SPH codes. This routine will either generate a single cluster or will combine two or three clusters together. If more than one cluster is generated, the gas particles will have their densities set by adding the densities from the overlap of the two particles together, and will have their thermal energies and velocities set by mass-weighting them from the two profiles. Parameters ---------- regenerate_particles: bool True to regenerate the particles prng: Random number initializer. """ profiles = [ClusterModel.from_h5_file(hf) for hf in self.profiles] parts = self._generate_particles( regenerate_particles=regenerate_particles, prng=prng ) if self.num_halos == 1: all_parts = parts[0] all_parts.add_offsets(self.center[0], self.velocity[0]) elif self.num_halos == 2: all_parts = combine_two_clusters( parts[0], parts[1], profiles[0], profiles[1], self.center[0], self.center[1], self.velocity[0], self.velocity[1], ) else: all_parts = combine_three_clusters( parts[0], parts[1], parts[2], profiles[0], profiles[1], profiles[2], self.center[0], self.center[1], self.center[2], self.velocity[0], self.velocity[1], self.velocity[2], ) return all_parts
[docs] def resample_particle_ics(self, parts, passive_scalars=None): r""" Given a Gadget-HDF5-like initial conditions file which has been output from some type of relaxation process (such as making a glass or using MESHRELAX in the case of Arepo), resample the density, thermal energy, and velocity fields onto the gas particles/cells from the initial hydrostatic profiles. Parameters ---------- parts: TODO: passive_scalars: TODO: """ profiles = [ClusterModel.from_h5_file(hf) for hf in self.profiles] if self.num_halos == 1: new_parts = resample_one_cluster( parts, profiles[0], self.center[0], self.velocity[0] ) elif self.num_halos == 2: new_parts = resample_two_clusters( parts, profiles[0], profiles[1], self.center[0], self.center[1], self.velocity[0], self.velocity[1], self.r_max, passive_scalars=passive_scalars, ) else: new_parts = resample_three_clusters( parts, profiles[0], profiles[1], profiles[2], self.center[0], self.center[1], self.center[2], self.velocity[0], self.velocity[1], self.velocity[2], self.r_max, passive_scalars=passive_scalars, ) return new_parts
[docs] def create_dataset(self, domain_dimensions, box_size, left_edge=None, **kwargs): """ Create an in-memory, uniformly gridded dataset in 3D using yt by placing the clusters into a box. When adding multiple clusters, per-volume quantities from each cluster such as density and pressure are added, whereas per-mass quantities such as temperature and velocity are mass-weighted. Parameters ---------- domain_dimensions : 3-tuple of ints The number of cells on a side for the domain. box_size : float The size of the box in kpc. left_edge : array_like, optional The minimum coordinate of the box in all three dimensions, in kpc. Default: None, which means the left edge will be [0, 0, 0]. """ from scipy.interpolate import InterpolatedUnivariateSpline from yt.loaders import load_uniform_grid if left_edge is None: left_edge = np.zeros(3) left_edge = np.array(left_edge) bbox = [ [left_edge[0], left_edge[0] + box_size], [left_edge[1], left_edge[1] + box_size], [left_edge[2], left_edge[2] + box_size], ] x, y, z = np.mgrid[ bbox[0][0] : bbox[0][1] : domain_dimensions[0] * 1j, bbox[1][0] : bbox[1][1] : domain_dimensions[1] * 1j, bbox[2][0] : bbox[2][1] : domain_dimensions[2] * 1j, ] fields1 = [ "density", "pressure", "dark_matter_density" "stellar_density", "gravitational_potential", ] fields2 = ["temperature"] fields3 = ["velocity_x", "velocity_y", "velocity_z"] units = { "density": "Msun/kpc**3", "pressure": "Msun/kpc/Myr**2", "dark_matter_density": "Msun/kpc**3", "stellar_density": "Msun/kpc**3", "temperature": "K", "gravitational_potential": "kpc**2/Myr**2", "velocity_x": "kpc/Myr", "velocity_y": "kpc/Myr", "velocity_z": "kpc/Myr", "magnetic_field_strength": "G", } fields = fields1 + fields2 data = {} for i, profile in enumerate(self.profiles): p = ClusterModel.from_h5_file(profile) xx = x - self.center.d[i][0] yy = y - self.center.d[i][1] zz = z - self.center.d[i][2] rr = np.sqrt(xx * xx + yy * yy + zz * zz) fd = InterpolatedUnivariateSpline(p["radius"].d, p["density"].d) for field in fields: if field not in p: continue if field not in data: data[field] = (np.zeros(domain_dimensions), units[field]) f = InterpolatedUnivariateSpline(p["radius"].d, p[field].d) if field in fields1: data[field][0] += f(rr) elif field in fields2: data[field][0] += f(rr) * fd(rr) for field in fields3: data[field][0] += self.velocity.d[i][0] * fd(rr) if "density" in data: for field in fields2 + fields3: data[field][0] /= data["density"][0] return load_uniform_grid( data, domain_dimensions, length_unit="kpc", bbox=bbox, mass_unit="Msun", time_unit="Myr", **kwargs, )