Source code for particles

CG module for managing particle type initial conditions.
from collections import OrderedDict, defaultdict
from pathlib import Path

import h5py
import numpy as np
from scipy.interpolate import InterpolatedUnivariateSpline
from unyt import uconcatenate, unyt_array

from cluster_generator.utils import ensure_list, ensure_ytarray, mylog

gadget_fields = {
    "dm": ["Coordinates", "Velocities", "Masses", "ParticleIDs", "Potential"],
    "gas": [
    "star": ["Coordinates", "Velocities", "Masses", "ParticleIDs", "Potential"],
    "black_hole": ["Coordinates", "Velocities", "Masses", "ParticleIDs"],
    "tracer": ["Coordinates"],

gadget_field_map = {
    "Coordinates": "particle_position",
    "Velocities": "particle_velocity",
    "Masses": "particle_mass",
    "Density": "density",
    "Potential": "potential_energy",
    "InternalEnergy": "thermal_energy",
    "MagneticField": "magnetic_field",

gadget_field_units = {
    "Coordinates": "kpc",
    "Velocities": "km/s",
    "Masses": "1e10*Msun",
    "Density": "1e10*Msun/kpc**3",
    "InternalEnergy": "km**2/s**2",
    "Potential": "km**2/s**2",
    "PassiveScalars": "",
    "MagneticField": "1e5*sqrt(Msun)*km/s/(kpc**1.5)",

ptype_map = OrderedDict(
        ("PartType0", "gas"),
        ("PartType1", "dm"),
        ("PartType2", "tracer"),
        ("PartType4", "star"),
        ("PartType5", "black_hole"),

rptype_map = OrderedDict([(v, k) for k, v in ptype_map.items()])

[docs] class ClusterParticles: """Container class for particle initial conditions"""
[docs] def __init__(self, particle_types, fields): self.particle_types = ensure_list(particle_types) self.fields = fields self._update_num_particles() self._update_field_names() self.passive_scalars = []
def __getitem__(self, key): return self.fields[key] def __setitem__(self, key, value): self.fields[key] = value
[docs] def keys(self): return self.fields.keys()
def _update_num_particles(self): self.num_particles = {} for ptype in self.particle_types: self.num_particles[ptype] = self.fields[ptype, "particle_mass"].size def _update_field_names(self): self.field_names = defaultdict(list) for field in self.fields: self.field_names[field[0]].append(field[1]) def _clip_to_box(self, ptype, box_size): pos = self.fields[ptype, "particle_position"] return ~np.logical_or((pos < 0.0).any(axis=1), (pos > box_size).any(axis=1)) def __add__(self, other): fields = self.fields.copy() for field in other.fields: if field in fields: fields[field] = uconcatenate([self[field], other[field]]) else: fields[field] = other[field] particle_types = list(set(self.particle_types + other.particle_types)) return ClusterParticles(particle_types, fields) @property def num_passive_scalars(self): return len(self.passive_scalars)
[docs] def drop_ptypes(self, ptypes): """ Drop all particles with a type in *ptypes*. """ ptypes = ensure_list(ptypes) for ptype in ptypes: self.particle_types.remove(ptype) names = list(self.fields.keys()) for name in names: if name[0] in ptypes: self.fields.pop(name) self._update_num_particles() self._update_field_names()
[docs] def make_radial_cut(self, r_max, center=None, ptypes=None): """ Make a radial cut on particles. All particles outside a certain radius will be removed. Parameters ---------- r_max : float The maximum radius of the particles in kpc. center : array-like, optional The center coordinate of the system of particles to define the radius from, in units of kpc. Default: [0.0, 0.0, 0.0] ptypes : list of strings, optional The particle types to perform the radial cut on. If not set, all will be exported. """ rm2 = r_max * r_max if center is None: center = np.array([0.0] * 3) if ptypes is None: ptypes = self.particle_types ptypes = ensure_list(ptypes) for part in ptypes: cidx = ((self[part, "particle_position"].d - center) ** 2).sum( axis=1 ) <= rm2 for field in self.field_names[part]: self.fields[part, field] = self.fields[part, field][cidx] self._update_num_particles()
[docs] def add_black_hole(self, bh_mass, pos=None, vel=None, use_pot_min=False): r""" Add a black hole particle to the set of cluster particles. Parameters ---------- bh_mass : float The mass of the black hole particle in solar masses. pos : array-like, optional The position of the particle, assumed to be in units of kpc if units are not given. If use_pot_min=True this argument is ignored. Default: None, in which case the particle position is [0.0, 0.0, 0.0] kpc. vel : array-like, optional The velocity of the particle, assumed to be in units of kpc/Myr if units are not given. If use_pot_min=True this argument is ignored. Default: None, in which case the particle velocity is [0.0, 0.0, 0.0] kpc/Myr. use_pot_min : boolean, optional If True, use the dark matter particle with the minimum value of the gravitational potential to determine the position and velocity of the black hole particle. Default: False """ mass = unyt_array([bh_mass], "Msun") if use_pot_min: if ("dm", "potential_energy") not in self.fields: raise KeyError("('dm', 'potential_energy') is not available!") idx = np.argmin(self.fields["dm", "potential_energy"]) pos = unyt_array(self.fields["dm", "particle_position"][idx]).reshape(1, 3) vel = unyt_array(self.fields["dm", "particle_velocity"][idx]).reshape(1, 3) else: if pos is None: pos = unyt_array(np.zeros((1, 3)), "kpc") if vel is None: vel = unyt_array(np.zeros((1, 3)), "kpc/Myr") pos = ensure_ytarray(pos, "kpc").reshape(1, 3) vel = ensure_ytarray(vel, "kpc/Myr").reshape(1, 3) if "black_hole" not in self.particle_types: self.particle_types.append("black_hole") self.fields["black_hole", "particle_position"] = pos self.fields["black_hole", "particle_velocity"] = vel self.fields["black_hole", "particle_mass"] = mass else: uappend = lambda x, y: unyt_array(np.append(x, y, axis=0).v, x.units) self.fields["black_hole", "particle_position"] = uappend( self.fields["black_hole", "particle_position"], pos ) self.fields["black_hole", "particle_velocity"] = uappend( self.fields["black_hole", "particle_velocity"], vel ) self.fields["black_hole", "particle_mass"] = uappend( self.fields["black_hole", "particle_mass"], mass ) self._update_num_particles()
[docs] @classmethod def from_fields(cls, fields): particle_types = [] for key in fields: if key[0] not in particle_types: particle_types.append(key[0]) return cls(particle_types, fields)
[docs] @classmethod def from_file(cls, filename, ptypes=None): r""" Generate cluster particles from an HDF5 file. Parameters ---------- filename : string The name of the file to read the model from. ptypes: list A list of the particle types to load. Examples -------- >>> from cluster_generator import ClusterParticles >>> dm_particles = ClusterParticles.from_file("dm_particles.h5") """ names = {} with h5py.File(filename, "r") as f: if ptypes is None: ptypes = list(f.keys()) ptypes = ensure_list(ptypes) for ptype in ptypes: names[ptype] = list(f[ptype].keys()) fields = OrderedDict() for ptype in ptypes: for field in names[ptype]: if field == "particle_index": with h5py.File(filename, "r") as f: fields[ptype, field] = f[ptype][field][:] else: a = unyt_array.from_hdf5( filename, dataset_name=field, group_name=ptype ) fields[ptype, field] = unyt_array( a.d.astype("float64"), str(a.units) ).in_base("galactic") return cls(ptypes, fields)
[docs] @classmethod def from_h5_file(cls, filename, ptypes=None): return cls.from_file(filename, ptypes=ptypes)
[docs] @classmethod def from_gadget_file(cls, filename, ptypes=None): """ Read in particle data from a Gadget (or Arepo, GIZMO, etc.) snapshot Parameters ---------- filename : string The name of the file to read from. ptypes : string or list of strings, optional The particle types to read from the file, either a single string or a list of strings. If None, all particle types will be read from the file. Examples -------- >>> from cluster_generator import ClusterParticles >>> ptypes = ["gas", "dm"] >>> particles = ClusterParticles.from_gadget_file("snapshot_060.h5", ptypes=ptypes) """ fields = OrderedDict() f = h5py.File(filename, "r") particle_types = [] if ptypes is None: ptypes = [k for k in f if k.startswith("PartType")] else: ptypes = ensure_list(ptypes) ptypes = [rptype_map[k] for k in ptypes] for ptype in ptypes: my_ptype = ptype_map[ptype] particle_types.append(my_ptype) g = f[ptype] for field in gadget_fields[my_ptype]: if field in g: if field == "ParticleIDs": fields[my_ptype, "particle_index"] = g[field][:] else: fd = gadget_field_map[field] units = gadget_field_units[field] fields[my_ptype, fd] = unyt_array( g[field], units, dtype="float64" ).in_base("galactic") if "Masses" not in g: n_ptype = g["ParticleIDs"].size units = gadget_field_units["Masses"] n_type = int(ptype[-1]) fields[my_ptype, "particle_mass"] = unyt_array( [f["Header"].attrs["MassTable"][n_type]] * n_ptype, units ).in_base("galactic") f.close() return cls(particle_types, fields)
[docs] def write_particles(self, output_filename, overwrite=False): """ Write the particles to an HDF5 file. Parameters ---------- output_filename : string The file to write the particles to. overwrite : boolean, optional Overwrite an existing file with the same name. Default False. """ if Path(output_filename).exists() and not overwrite: raise IOError( f"Cannot create {output_filename}. It exists and overwrite=False." ) with h5py.File(output_filename, "w") as f: for ptype in self.particle_types: f.create_group(ptype) for field in self.fields: if field[1] == "particle_index": with h5py.File(output_filename, "r+") as f: g = f[field[0]] g.create_dataset("particle_index", data=self.fields[field]) else: self.fields[field].write_hdf5( output_filename, dataset_name=field[1], group_name=field[0] )
[docs] def write_particles_to_h5(self, output_filename, overwrite=False): self.write_particles(output_filename, overwrite=overwrite)
[docs] def set_field( self, ptype, name, value, units=None, add=False, passive_scalar=False ): """ Add or update a particle field using a unyt_array. The array will be checked to make sure that it has the appropriate size. Parameters ---------- ptype : string The particle type of the field to add or update. name : string The name of the field to add or update. value : unyt_array The particle field itself--an array with the same shape as the number of particles. units : string, optional The units to convert the field to. Default: None, indicating the units will be preserved. add : boolean, optional If True and the field already exists, the values in the array will be added to the already existing field array. passive_scalar : boolean, optional If set, the field to be added is a passive scalar. Default: False """ if not isinstance(value, unyt_array): value = unyt_array(value, "dimensionless") num_particles = self.num_particles[ptype] exists = (ptype, name) in self.fields if value.shape[0] == num_particles: if exists: if add: self.fields[ptype, name] += value else: mylog.warning(f"Overwriting field ({ptype}, {name}).") self.fields[ptype, name] = value else: if add: raise RuntimeError( f"Field ({ptype}, {name}) does not " f"exist and add=True!" ) else: self.fields[ptype, name] = value if passive_scalar and ptype == "gas": self.passive_scalars.append(name) if units is not None: self.fields[ptype, name].convert_to_units(units) else: raise ValueError( f"The length of the array needs to be {num_particles} particles!" )
[docs] def add_offsets(self, r_ctr, v_ctr, ptypes=None): """ Add offsets in position and velocity to the cluster particles, which can be added to one or more particle types. Parameters ---------- r_ctr : array-like A 3-element list, NumPy array, or unyt_array of the coordinates of the new center of the particle distribution. If units are not given, they are assumed to be in kpc. v_ctr : array-like A 3-element list, NumPy array, or unyt_array of the coordinates of the new bulk velocity of the particle distribution. If units are not given, they are assumed to be in kpc/Myr. ptypes : string or list of strings, optional A single string or list of strings indicating the particle type(s) to be offset. Default: None, meaning all of the particle types will be offset. This should not be used in normal circumstances. """ if ptypes is None: ptypes = self.particle_types ptypes = ensure_list(ptypes) r_ctr = ensure_ytarray(r_ctr, "kpc") v_ctr = ensure_ytarray(v_ctr, "kpc/Myr") for ptype in ptypes: self.fields[ptype, "particle_position"] += r_ctr self.fields[ptype, "particle_velocity"] += v_ctr
def _write_gadget_fields(self, ptype, h5_group, idxs, dtype): for field in gadget_fields[ptype]: if field == "ParticleIDs": continue if field == "PassiveScalars" and ptype == "gas": if self.num_passive_scalars > 0: data = np.stack( [self[ptype, s].d for s in self.passive_scalars], axis=-1 ) h5_group.create_dataset("PassiveScalars", data=data) else: my_field = gadget_field_map[field] if (ptype, my_field) in self.fields: units = gadget_field_units[field] fd = self.fields[ptype, my_field] data = fd[idxs].to(units).d.astype(dtype) h5_group.create_dataset(field, data=data)
[docs] def write_to_gadget_file( self, ic_filename, box_size, dtype="float32", overwrite=False, code=None ): """ Write the particles to a file in the HDF5 Gadget format which can be used as initial conditions for a simulation. Parameters ---------- ic_filename : string The name of the file to write to. box_size : float The width of the cubical box that the initial condition will be within in units of kpc. dtype : string, optional The datatype of the fields to write, either 'float32' or 'float64'. Default: 'float32' overwrite : boolean, optional Whether to overwrite an existing file. Default: False code : string, optional If specified, additional information will be written to the file so that it can be identified by yt as belonging to a specific frontend. Default: None """ if Path(ic_filename).exists() and not overwrite: raise IOError( f"Cannot create {ic_filename}. It exists and " f"overwrite=False." ) num_particles = {} npart = 0 mass_table = np.zeros(6) f = h5py.File(ic_filename, "w") for ptype in self.particle_types: gptype = rptype_map[ptype] idxs = self._clip_to_box(ptype, box_size) num_particles[ptype] = idxs.sum() g = f.create_group(gptype) self._write_gadget_fields(ptype, g, idxs, dtype) ids = np.arange(num_particles[ptype]) + 1 + npart g.create_dataset("ParticleIDs", data=ids.astype("uint32")) npart += num_particles[ptype] if ptype in ["star", "dm", "black_hole"]: mass_table[int(rptype_map[ptype][-1])] = g["Masses"][0] f.flush() hg = f.create_group("Header") hg.attrs["Time"] = 0.0 hg.attrs["Redshift"] = 0.0 hg.attrs["BoxSize"] = box_size hg.attrs["Omega0"] = 0.0 hg.attrs["OmegaLambda"] = 0.0 hg.attrs["HubbleParam"] = 1.0 hg.attrs["NumPart_ThisFile"] = np.array( [ num_particles.get("gas", 0), num_particles.get("dm", 0), num_particles.get("tracer", 0), 0, num_particles.get("star", 0), num_particles.get("black_hole", 0), ], dtype="uint32", ) hg.attrs["NumPart_Total"] = hg.attrs["NumPart_ThisFile"] hg.attrs["NumPart_Total_HighWord"] = np.zeros(6, dtype="uint32") hg.attrs["NumFilesPerSnapshot"] = 1 hg.attrs["MassTable"] = mass_table hg.attrs["Flag_Sfr"] = 0 hg.attrs["Flag_Cooling"] = 0 hg.attrs["Flag_StellarAge"] = 0 hg.attrs["Flag_Metals"] = 0 hg.attrs["Flag_Feedback"] = 0 hg.attrs["Flag_DoublePrecision"] = 0 hg.attrs["Flag_IC_Info"] = 0 if code == "arepo": cg = f.create_group("Config") cg.attrs["VORONOI"] = 1 f.flush() f.close()
[docs] def to_yt_dataset(self, box_size, ptypes=None): """ Create an in-memory yt dataset for the particles. Parameters ---------- box_size : float The width of the domain on a side, in kpc. ptypes : string or list of strings, optional The particle types to export to the dataset. If not set, all will be exported. """ from yt import load_particles data = self.fields.copy() if ptypes is None: ptypes = self.particle_types ptypes = ensure_list(ptypes) for ptype in ptypes: pos = data.pop((ptype, "particle_position")) vel = data.pop((ptype, "particle_velocity")) for i, ax in enumerate("xyz"): data[ptype, f"particle_position_{ax}"] = pos[:, i] data[ptype, f"particle_velocity_{ax}"] = vel[:, i] return load_particles( data, length_unit="kpc", bbox=[[0.0, box_size]] * 3, mass_unit="Msun", time_unit="Myr", )
def _sample_clusters( particles, hses, center, velocity, radii=None, resample=False, passive_scalars=None ): num_halos = len(hses) center = [ensure_ytarray(c, "kpc") for c in center] velocity = [ensure_ytarray(v, "kpc/Myr") for v in velocity] r = np.zeros((num_halos, particles.num_particles["gas"])) for i, c in enumerate(center): r[i, :] = ((particles["gas", "particle_position"] - c) ** 2).sum(axis=1).d np.sqrt(r, r) if radii is None: idxs = slice(None, None, None) else: radii = np.array(radii) idxs = np.any(r <= radii[:, np.newaxis], axis=0) d = np.zeros((num_halos, particles.num_particles["gas"])) e = np.zeros((num_halos, particles.num_particles["gas"])) m = np.zeros((num_halos, 3, particles.num_particles["gas"])) num_scalars = 0 if passive_scalars is not None: num_scalars = len(passive_scalars) s = np.zeros((num_halos, num_scalars, particles.num_particles["gas"])) for i in range(num_halos): hse = hses[i] if "density" not in hse: continue get_density = InterpolatedUnivariateSpline(hse["radius"], hse["density"]) d[i, :] = get_density(r[i, :]) e_arr = 1.5 * hse["pressure"] / hse["density"] get_energy = InterpolatedUnivariateSpline(hse["radius"], e_arr) e[i, :] = get_energy(r[i, :]) * d[i, :] m[i, :, :] = velocity[i].d[:, np.newaxis] * d[i, :] if num_scalars > 0: for j, name in enumerate(passive_scalars): get_scalar = InterpolatedUnivariateSpline(hse["radius"], hse[name]) s[i, j, :] = get_scalar(r[i, :]) * d[i, :] dens = d.sum(axis=0) eint = e.sum(axis=0) / dens mom = m.sum(axis=0) / dens if num_scalars > 0: ps = s.sum(axis=0) / dens if resample: vol = particles["gas", "particle_mass"] / particles["gas", "density"] particles["gas", "particle_mass"][idxs] = dens[idxs] * vol.d[idxs] particles["gas", "density"][idxs] = dens[idxs] particles["gas", "thermal_energy"][idxs] = eint[idxs] particles["gas", "particle_velocity"][idxs] = mom.T[idxs] if num_scalars > 0: for j, name in enumerate(passive_scalars): particles["gas", name][idxs] = ps[j, idxs] return particles
[docs] def combine_two_clusters( particles1, particles2, hse1, hse2, center1, center2, velocity1, velocity2 ): """Combines 2 clusters""" center1 = ensure_ytarray(center1, "kpc") center2 = ensure_ytarray(center2, "kpc") velocity1 = ensure_ytarray(velocity1, "kpc/Myr") velocity2 = ensure_ytarray(velocity2, "kpc/Myr") if "gas" in particles1.particle_types: particles1.add_offsets(center1, [0.0] * 3, ptypes=["gas"]) if "gas" in particles2.particle_types: particles2.add_offsets(center2, [0.0] * 3, ptypes=["gas"]) ptypes1 = particles1.particle_types.copy() ptypes2 = particles2.particle_types.copy() if "gas" in ptypes1: ptypes1.remove("gas") if "gas" in ptypes2: ptypes2.remove("gas") particles1.add_offsets(center1, velocity1, ptypes=ptypes1) particles2.add_offsets(center2, velocity2, ptypes=ptypes2) particles = particles1 + particles2 if "gas" in particles.particle_types: particles = _sample_clusters( particles, [hse1, hse2], [center1, center2], [velocity1, velocity2] ) return particles
[docs] def combine_three_clusters( particles1, particles2, particles3, hse1, hse2, hse3, center1, center2, center3, velocity1, velocity2, velocity3, ): """Combine 3 clusters""" center1 = ensure_ytarray(center1, "kpc") center2 = ensure_ytarray(center2, "kpc") center3 = ensure_ytarray(center3, "kpc") velocity1 = ensure_ytarray(velocity1, "kpc/Myr") velocity2 = ensure_ytarray(velocity2, "kpc/Myr") velocity3 = ensure_ytarray(velocity3, "kpc/Myr") if "gas" in particles1.particle_types: particles1.add_offsets(center1, [0.0] * 3, ptypes=["gas"]) if "gas" in particles2.particle_types: particles2.add_offsets(center2, [0.0] * 3, ptypes=["gas"]) if "gas" in particles3.particle_types: particles3.add_offsets(center3, [0.0] * 3, ptypes=["gas"]) ptypes1 = particles1.particle_types.copy() ptypes2 = particles2.particle_types.copy() ptypes3 = particles3.particle_types.copy() if "gas" in ptypes1: ptypes1.remove("gas") if "gas" in ptypes2: ptypes2.remove("gas") if "gas" in ptypes3: ptypes3.remove("gas") particles1.add_offsets(center1, velocity1, ptypes=ptypes1) particles2.add_offsets(center2, velocity2, ptypes=ptypes2) particles3.add_offsets(center3, velocity3, ptypes=ptypes3) particles = particles1 + particles2 + particles3 if "gas" in particles.particle_types: particles = _sample_clusters( particles, [hse1, hse2, hse3], [center1, center2, center3], [velocity1, velocity2, velocity3], ) return particles
[docs] def resample_one_cluster(particles, hse, center, velocity): """Resample radial profiles onto a single cluster's particle distribution. TODO: complete docstring""" if "gas" not in particles.particle_types: return particles center = ensure_ytarray(center, "kpc") velocity = ensure_ytarray(velocity, "kpc/Myr") r = ((particles["gas", "particle_position"] - center) ** 2).sum(axis=1).d np.sqrt(r, r) get_density = InterpolatedUnivariateSpline(hse["radius"], hse["density"]) dens = get_density(r) e_arr = 1.5 * hse["pressure"] / hse["density"] get_energy = InterpolatedUnivariateSpline(hse["radius"], e_arr) particles["gas", "thermal_energy"] = unyt_array(get_energy(r), "kpc**2/Myr**2") vol = particles["gas", "particle_mass"] / particles["gas", "density"] particles["gas", "particle_mass"] = unyt_array(dens * vol.d, "Msun") particles["gas", "particle_velocity"][:, :] = velocity particles["gas", "density"] = unyt_array(dens, "Msun/kpc**3") return particles
[docs] def resample_two_clusters( particles, hse1, hse2, center1, center2, velocity1, velocity2, radii, passive_scalars=None, ): """Resample 2 clusters""" particles = _sample_clusters( particles, [hse1, hse2], [center1, center2], [velocity1, velocity2], radii=radii, resample=True, passive_scalars=passive_scalars, ) return particles
[docs] def resample_three_clusters( particles, hse1, hse2, hse3, center1, center2, center3, velocity1, velocity2, velocity3, radii, passive_scalars=None, ): """Resample 3 clusters""" particles = _sample_clusters( particles, [hse1, hse2, hse3], [center1, center2, center3], [velocity1, velocity2, velocity3], radii=radii, resample=True, passive_scalars=passive_scalars, ) return particles