"""
CG Core module containing classes and method related to the :py:class:`model.ClusterModel` class for representing
physically self-consistent galaxy cluster models.
"""
import os
from collections import OrderedDict
import h5py
import numpy as np
import yaml
from scipy.integrate import cumtrapz, quad
from scipy.interpolate import InterpolatedUnivariateSpline
from unyt import unyt_array, unyt_quantity
from cluster_generator.particles import ClusterParticles
from cluster_generator.utils import (
G,
_closest_factors,
_enforce_style,
ensure_ytquantity,
generate_particle_radii,
integrate,
integrate_mass,
kpc_to_cm,
mp,
mu,
mue,
mylog,
)
from cluster_generator.virial import VirialEquilibrium
tt = 2.0 / 3.0
mtt = -tt
ft = 5.0 / 3.0
tf = 3.0 / 5.0
mtf = -tf
gamma = ft
et = 8.0 / 3.0
te = 3.0 / 8.0
[docs]
class ClusterModel:
r"""
The :py:class:`model.ClusterModel` class is a comprehensive representation of the cluster being modeled and can be used to generate
accurate initial conditions. The class is predicated on a fixed number of sample radii in the cluster.
Parameters
----------
fields: dict of str: :py:class:`unyt.array.unyt_array`
The fields to attribute to the :py:class:`model.ClusterModel`.
properties: dict
Additional properties to pass into the :py:class:`model.ClusterModel` instance. These are found in the :py:attr:`model.ClusterModel.properties` attribute. See
the other parameters section for details on the available properties.
dm_virial: VirialEquilibrium, optional
The virialization class to use for dark matter virialization. By default, it is None and will be set when necessary.
star_virial: VirialEquilibrium, optional
The virialization class to use for stellar matter virialization. By default, it is None and will be set when necessary.
Notes
-----
- ``__getitem__`` and ``__contains__`` are aliased down to ``self.fields``. There is no ``__setitem__``, so index
/ key assignment cannot be done. use ``ClusterModel.set_field()`` instead.
"""
#: The default included fields that can be accessed.
default_fields = [
"density",
"temperature",
"pressure",
"total_density",
"gravitational_potential",
"gravitational_field",
"total_mass",
"gas_mass",
"dark_matter_mass",
"dark_matter_density",
"stellar_density",
"stellar_mass",
]
_keep_units = ["entropy", "electron_number_density", "magnetic_field_strength"]
[docs]
def __init__(self, fields, properties=None, dm_virial=None, star_virial=None):
"""Initialize the :py:class:`model.ClusterModel` instance."""
#: The ``fields`` associated with the ``Potential`` object.
self.fields = fields
#: The number of elements in each ``field`` array.
self.num_elements = len(self.fields["radius"])
if not properties:
self._properties = {}
else:
self._properties = properties
if "meth" not in self._properties:
self._properties["meth"] = {}
self._dm_virial = dm_virial
self._star_virial = star_virial
def __contains__(self, item):
return item in self.fields
def __getitem__(self, item):
return self.fields[item]
def __setitem__(self, key, value):
self.fields[key] = value
[docs]
def keys(self):
"""Equivalent to ``self.fields.keys()``"""
return self.fields.keys()
[docs]
def items(self):
"""Equivalent to ``self.fields.items()``"""
return self.fields.items()
[docs]
def values(self):
"""Equivalent to ``self.fields.values()``"""
return self.fields.values()
@property
def dm_virial(self):
"""
The :py:class:`virial.VirialEquilibrium` instance associated with the dark matter virialization process.
"""
if self._dm_virial is None:
self._dm_virial = VirialEquilibrium(self, "dark_matter")
return self._dm_virial
@property
def star_virial(self):
"""
The :py:class:`virial.VirialEquilibrium` instance associated with the stellar virialization process.
"""
if self._star_virial is None and "stellar_density" in self:
self._star_virial = VirialEquilibrium(self, "stellar")
return self._star_virial
@property
def properties(self):
"""
The properties of the :py:class:`model.ClusterModel` instance. When passed to the :py:class:`model.ClusterModel` object,
the structure should be as a dictionary with the keys listed below and structure described therein. All of these are optional.
Parameters
----------
gravity: dict or str
If a string is passed, it should be a gravity name corresponding to the desired gravity theory. All other properties
of the gravitational theory will be taken as the standard default. If the value is another dictionary, the name of
the preferred gravitational theory should be associated with the key ``'name'``. Further parameters should be supplied
as they are described in the documentation for the corresponding gravitational theory.
Notes
-----
In addition to the user specified properties, additional properties may be found in the ``'meth'`` key of the attribute.
These are specified behind the scenes and dictate the behavior of certain backend behaviors. It is ill-advised to alter
these properties without a thorough understanding of what they do.
"""
return self._properties
[docs]
@classmethod
def from_arrays(cls, fields, stellar_density=None, **kwargs):
r"""
Initialize the :py:class:`model.ClusterModel` from ``fields`` alone.
Parameters
----------
fields: dict of str: :py:class:`unyt.array.unyt_array`
The fields from which to generate the model.
.. admonition:: development note
The ``fields`` parameter must be self-consistent in its definition, and must contain a ``radius`` key, from
which the class assesses the number of elements.
stellar_density: radial_profiles.RadialProfile, optional
The stellar density component.
kwargs:
Additional attributes to pass through to the :py:attr:`model.ClusterModel.properties` attribute.
Returns
-------
ClusterModel
See Also
--------
:py:meth:`~model.ClusterModel.from_dens_and_tden`,:py:meth:`~model.ClusterModel.from_dens_and_temp`,:py:meth:`~model.ClusterModel.from_dens_and_entr`,:py:meth:`~model.ClusterModel.from_h5_file`
"""
kwargs = _force_method(kwargs, "from_arrays")
kwargs["meth"]["profiles"] = {}
if stellar_density is not None:
kwargs["meth"]["profiles"]["stellar_density"] = stellar_density
fields["stellar_density"] = stellar_density(fields["radius"].d)
else:
pass
return cls(fields, properties=kwargs)
[docs]
@classmethod
def from_h5_file(cls, filename, r_min=None, r_max=None):
r"""
Initialize a :py:class:`model.ClusterModel` instance from HDF5 file.
Parameters
----------
filename : str
The name of the file to read the model from.
r_min: float, optional
The minimum radius
r_max: float, optional
The maximal radius
Notes
-----
.. attention::
The :py:meth:`model.ClusterModel.from_h5_file` method will seek both ``filename`` and ``filename.properties``. If
the later cannot be found, the :py:class:`model.ClusterModel` will lose its attributes and issue a warning.
Examples
--------
>>> from cluster_generator import ClusterModel
>>> hse_model = ClusterModel.from_h5_file("hse_model.h5")
"""
import dill as pickle
from cluster_generator.virial import VirialEquilibrium
with h5py.File(filename, "r") as f:
fnames = list(f["fields"].keys())
get_dm_virial = "dm_df" in f
get_star_virial = "star_df" in f
properties_path = f"{filename}.properties"
try:
with open(properties_path, "rb") as fpkl:
attrs = pickle.load(fpkl)
except FileNotFoundError:
mylog.warning(f"Failed to load properties file {properties_path}.")
attrs = {}
except SystemError:
mylog.warning("Loading pickled file from unmatched python version.")
attrs = {}
fields = OrderedDict()
for field in fnames:
a = unyt_array.from_hdf5(filename, dataset_name=field, group_name="fields")
fields[field] = unyt_array(a.d, str(a.units))
if field not in cls._keep_units:
fields[field].convert_to_base("galactic")
if r_min is None:
r_min = 0.0
if r_max is None:
r_max = fields["radius"][-1].d * 2
mask = np.logical_and(fields["radius"].d >= r_min, fields["radius"].d <= r_max)
for field in fnames:
fields[field] = fields[field][mask]
model = cls(fields, properties=attrs)
if get_dm_virial:
mask = np.logical_and(
fields["radius"].d >= r_min, fields["radius"].d <= r_max
)
df = unyt_array.from_hdf5(filename, dataset_name="dm_df")[mask]
model._dm_virial = VirialEquilibrium(model, ptype="dark_matter", df=df)
if get_star_virial:
mask = np.logical_and(
fields["radius"].d >= r_min, fields["radius"].d <= r_max
)
df = unyt_array.from_hdf5(filename, dataset_name="star_df")[mask]
model._star_virial = VirialEquilibrium(model, ptype="stellar", df=df)
return model
@classmethod
def _from_scratch(cls, fields, stellar_density=None, **kwargs):
"""Load the cluster instance from total density, total mass, and radius."""
rr = fields["radius"].d
mylog.info("Integrating gravitational potential profile.")
tdens_func = InterpolatedUnivariateSpline(rr, fields["total_density"].d)
gpot_profile = lambda r: tdens_func(r) * r
gpot1 = fields["total_mass"] / fields["radius"]
gpot2 = unyt_array(4.0 * np.pi * integrate(gpot_profile, rr), "Msun/kpc")
fields["gravitational_potential"] = -G * (gpot1 + gpot2)
fields["gravitational_potential"].convert_to_units("kpc**2/Myr**2")
if "density" in fields and "gas_mass" not in fields:
mylog.info("Integrating gas mass profile.")
m0 = fields["density"].d[0] * rr[0] ** 3 / 3.0
fields["gas_mass"] = unyt_array(
4.0 * np.pi * cumtrapz(fields["density"] * rr * rr, x=rr, initial=0.0)
+ m0,
"Msun",
)
if stellar_density is not None:
fields["stellar_density"] = unyt_array(stellar_density(rr), "Msun/kpc**3")
mylog.info("Integrating stellar mass profile.")
fields["stellar_mass"] = unyt_array(
integrate_mass(stellar_density, rr), "Msun"
)
mdm = fields["total_mass"].copy()
ddm = fields["total_density"].copy()
if "density" in fields:
mdm -= fields["gas_mass"]
ddm -= fields["density"]
if "stellar_mass" in fields:
mdm -= fields["stellar_mass"]
ddm -= fields["stellar_density"]
fields["dark_matter_density"] = ddm
fields["dark_matter_mass"] = mdm
if "density" in fields:
fields["gas_fraction"] = fields["gas_mass"] / fields["total_mass"]
fields["electron_number_density"] = fields["density"].to(
"cm**-3", "number_density", mu=mue
)
fields["entropy"] = (
fields["temperature"] * fields["electron_number_density"] ** mtt
)
return cls(fields, properties=kwargs)
[docs]
def set_rmax(self, r_max):
"""
Truncate the model at a specified maximal radius.
Parameters
----------
r_max: float
The maximal radius at which to truncate.
Returns
-------
ClusterModel
A new :py:class:`model.ClusterModel` instance with a truncated maximal radius.
"""
mask = self.fields["radius"].d <= r_max
fields = {}
for field in self.fields:
fields[field] = self.fields[field][mask]
num_elements = mask.sum()
return ClusterModel(
num_elements, fields, dm_virial=self.dm_virial, star_virial=self.star_virial
)
[docs]
def write_model_to_ascii(self, output_filename, in_cgs=False, overwrite=False):
r"""
Write the equilibrium model to an ascii text file. Uses
AstroPy's `QTable` to write the file, so that units are
included.
Parameters
----------
output_filename : str
The file to write the model to.
in_cgs : bool, optional
Whether to convert the units to cgs before writing. Default: False.
overwrite : bool, optional
Overwrite an existing file with the same name. Default: False.
"""
from astropy.table import QTable
fields = {}
for k, v in self.fields.items():
if in_cgs:
if k == "temperature":
fd = v.to_equivalent("K", "thermal")
elif k not in self._keep_units:
fd = v.in_cgs()
else:
fd = v
else:
fd = v
fields[k] = fd.to_astropy()
t = QTable(fields)
t.meta["comments"] = f"unit_system={'cgs' if in_cgs else 'galactic'}"
t.write(output_filename, overwrite=overwrite)
[docs]
def write_model_to_h5(
self, output_filename, in_cgs=False, r_min=None, r_max=None, overwrite=False
):
r"""
Write the equilibrium model to an HDF5 file.
Parameters
----------
output_filename : str
The file to write the model to.
in_cgs : bool, optional
Whether to convert the units to cgs before writing. Default False.
overwrite : bool, optional
Overwrite an existing file with the same name. Default False.
r_min: float, optional
The minimum radius to write too.
r_max: float, optional
The maximum radius to write too.
"""
if os.path.exists(output_filename) and not overwrite:
raise IOError(
f"Cannot create {output_filename}. It exists and " f"overwrite=False."
)
f = h5py.File(output_filename, "w")
f.create_dataset("num_elements", data=self.num_elements)
f.close()
self.properties["meth"]["in_cgs"] = in_cgs
self._write_model_attrs(output_filename)
if r_min is None:
r_min = 0.0
if r_max is None:
r_max = self.fields["radius"][-1].d * 2
mask = np.logical_and(
self.fields["radius"].d >= r_min, self.fields["radius"].d <= r_max
)
for k, v in self.fields.items():
if in_cgs:
if k == "temperature":
fd = v[mask].to_equivalent("K", "thermal")
elif k not in self._keep_units:
fd = v[mask].in_cgs()
else:
fd = v[mask]
else:
fd = v[mask]
fd.write_hdf5(output_filename, dataset_name=k, group_name="fields")
if getattr(self, "_dm_virial", None):
fd = self.dm_virial.df
fd.write_hdf5(output_filename, dataset_name="dm_df")
if getattr(self, "_star_virial", None):
fd = self.star_virial.df
fd.write_hdf5(output_filename, dataset_name="star_df")
def _write_model_attrs(self, output_filename):
"""Write the model attributes to file."""
import dill as pickle
with open(f"{output_filename}.properties", "wb") as atf:
pickle.dump(self.properties, atf)
[docs]
def write_model_to_binary(
self,
output_filename,
fields_to_write=None,
in_cgs=False,
r_min=None,
r_max=None,
overwrite=False,
):
"""
Write the model to unformatted Fortran binary.
.. attention::
But why would you want Fortran binary??
.. warning::
This procedure will lose all of the metadata from the ``self.gravity`` attribute and the ``self.attrs``
dictionary. As such, this should only be used in cases where the user can specify those properties manually
upon reopening the file.
Parameters
----------
output_filename: str
The output filename.
fields_to_write: list
The list of field names to write.
in_cgs: bool
If ``True``, will write in cgs.
r_min: float
The minimum radius
r_max: float
The maximum radius
overwrite: bool
``True`` will overwrite any existing file with the same path.
Returns
-------
None
"""
if fields_to_write is None:
fields_to_write = list(self.fields.keys())
from scipy.io import FortranFile
if os.path.exists(output_filename) and not overwrite:
raise IOError(
f"Cannot create {output_filename}. It exists and " f"overwrite=False."
)
if r_min is None:
r_min = 0.0
if r_max is None:
r_max = self.fields["radius"][-1].d * 2
mask = np.logical_and(
self.fields["radius"].d >= r_min, self.fields["radius"].d <= r_max
)
with FortranFile(output_filename, "w") as f:
f.write_record(self.fields["radius"][mask].size)
prof_rec = []
for k in fields_to_write:
v = self.fields[k]
if in_cgs:
if k == "temperature":
fd = v[mask].to_equivalent("K", "thermal")
elif k not in self._keep_units:
fd = v[mask].in_cgs()
else:
fd = v[mask]
else:
fd = v[mask]
prof_rec.append(fd)
f.write_record(np.array(prof_rec).T)
[docs]
def set_field(self, name, value):
r"""
Set a field with name `name` to value `value`, which is an `unyt_array`.
The array will be checked to make sure that it has the appropriate size.
"""
if not isinstance(value, unyt_array):
raise TypeError("value needs to be an unyt_array")
if value.size == self.num_elements:
if name in self.fields:
mylog.warning("Overwriting field %s." % name)
self.fields[name] = value
else:
raise ValueError(
f"The length of the array needs to be " f"{self.num_elements} elements!"
)
[docs]
@classmethod
def from_dens_and_temp(
cls,
rmin,
rmax,
density,
temperature,
stellar_density=None,
num_points=1000,
**kwargs,
):
"""
Construct a hydrostatic equilibrium model using gas density
and temperature profiles.
Parameters
----------
rmin : float
Minimum radius of profiles in kpc.
rmax : float
Maximum radius of profiles in kpc.
density : :class:`~cluster_generator.radial_profiles.RadialProfile`
A radial profile describing the gas mass density.
temperature : :class:`~cluster_generator.radial_profiles.RadialProfile`
A radial profile describing the gas temperature.
stellar_density : :class:`~cluster_generator.radial_profiles.RadialProfile`, optional
A radial profile describing the stellar mass density, if desired.
num_points : integer, optional
The number of points the profiles are evaluated at.
"""
kwargs = _force_method(kwargs, "from_dens_and_temp")
kwargs["meth"]["profiles"] = {
"temperature": temperature,
"density": density,
"stellar_density": stellar_density,
}
mylog.info("Computing the profiles from density and temperature.")
rr = np.logspace(np.log10(rmin), np.log10(rmax), num_points, endpoint=True)
fields = OrderedDict()
fields["radius"] = unyt_array(rr, "kpc")
fields["density"] = unyt_array(density(rr), "Msun/kpc**3")
fields["temperature"] = unyt_array(temperature(rr), "keV")
fields["pressure"] = fields["density"] * fields["temperature"]
fields["pressure"] /= mu * mp
fields["pressure"].convert_to_units("Msun/(Myr**2*kpc)")
pressure_spline = InterpolatedUnivariateSpline(rr, fields["pressure"].d)
dPdr = unyt_array(pressure_spline(rr, 1), "Msun/(Myr**2*kpc**2)")
fields["gravitational_field"] = dPdr / fields["density"]
fields["gravitational_field"].convert_to_units("kpc/Myr**2")
fields["gas_mass"] = unyt_array(integrate_mass(density, rr), "Msun")
fields["total_mass"] = (
-fields["radius"] ** 2 * fields["gravitational_field"] / G
)
total_mass_spline = InterpolatedUnivariateSpline(rr, fields["total_mass"].v)
dMdr = unyt_array(total_mass_spline(rr, nu=1), "Msun/kpc")
fields["total_density"] = dMdr / (4.0 * np.pi * fields["radius"] ** 2)
return cls._from_scratch(fields, stellar_density=stellar_density, **kwargs)
[docs]
@classmethod
def from_dens_and_entr(
cls,
rmin,
rmax,
density,
entropy,
stellar_density=None,
num_points=1000,
**kwargs,
):
"""
Construct the model from density and entropy.
Parameters
----------
rmin: float
The minimum radius at which to compute
rmax: float
The maximum radius at which to compute.
num_points: int
The number of sample points.
density: callable
The gas density profile.
entropy: callable
The gas entropy profile.
stellar_density: callable or unyt_array
The stellar density profile.
Returns
-------
ClusterModel
"""
kwargs = _force_method(kwargs, "from_dens_and_entr")
kwargs["meth"]["profiles"] = {
"entropy": entropy,
"density": density,
"stellar_density": stellar_density,
}
n_e = density / (mue * mp * kpc_to_cm**3)
temperature = entropy * n_e**tt
return cls.from_dens_and_temp(
rmin,
rmax,
density,
temperature,
stellar_density=stellar_density,
num_points=num_points,
**kwargs,
)
[docs]
@classmethod
def from_dens_and_tden(
cls,
rmin,
rmax,
density,
total_density,
stellar_density=None,
num_points=1000,
**kwargs,
):
"""
Construct a hydrostatic equilibrium model using gas density
and total density profiles
Parameters
----------
rmin : float
Minimum radius of profiles in kpc.
rmax : float
Maximum radius of profiles in kpc.
density : :class:`~cluster_generator.radial_profiles.RadialProfile`
A radial profile describing the gas mass density.
total_density : :class:`~cluster_generator.radial_profiles.RadialProfile`
A radial profile describing the total mass density.
stellar_density : :class:`~cluster_generator.radial_profiles.RadialProfile`, optional
A radial profile describing the stellar mass density, if desired.
num_points : integer, optional
The number of points the profiles are evaluated at.
"""
kwargs = _force_method(kwargs, "from_dens_and_tden")
kwargs["meth"]["profiles"] = {
"total_density": total_density,
"density": density,
"stellar_density": stellar_density,
}
mylog.info("Computing the profiles from density and total density.")
rr = np.logspace(np.log10(rmin), np.log10(rmax), num_points, endpoint=True)
fields = OrderedDict()
fields["radius"] = unyt_array(rr, "kpc")
fields["density"] = unyt_array(density(rr), "Msun/kpc**3")
fields["total_density"] = unyt_array(total_density(rr), "Msun/kpc**3")
mylog.info("Integrating total mass profile.")
fields["total_mass"] = unyt_array(integrate_mass(total_density, rr), "Msun")
fields["gas_mass"] = unyt_array(integrate_mass(density, rr), "Msun")
fields["gravitational_field"] = (
-G * fields["total_mass"] / (fields["radius"] ** 2)
)
fields["gravitational_field"].convert_to_units("kpc/Myr**2")
g = fields["gravitational_field"].in_units("kpc/Myr**2").v
g_r = InterpolatedUnivariateSpline(rr, g)
dPdr_int = lambda r: density(r) * g_r(r)
mylog.info("Integrating pressure profile.")
P = -integrate(dPdr_int, rr)
dPdr_int2 = lambda r: density(r) * g[-1] * (rr[-1] / r) ** 2
P -= quad(dPdr_int2, rr[-1], np.inf, limit=100)[0]
fields["pressure"] = unyt_array(P, "Msun/kpc/Myr**2")
fields["temperature"] = fields["pressure"] * mu * mp / fields["density"]
fields["temperature"].convert_to_units("keV")
return cls._from_scratch(fields, stellar_density=stellar_density, **kwargs)
[docs]
@classmethod
def no_gas(
cls, rmin, rmax, total_density, stellar_density=None, num_points=1000, **kwargs
):
"""
Initialize a :py:class:`model.ClusterModel` which is composed only of collisionless species.
Parameters
----------
rmin : float
Minimum radius of profiles in kpc.
rmax : float
Maximum radius of profiles in kpc.
total_density : :class:`~cluster_generator.radial_profiles.RadialProfile`
A radial profile describing the total mass density.
stellar_density : :class:`~cluster_generator.radial_profiles.RadialProfile`, optional
A radial profile describing the stellar mass density, if desired.
num_points : integer, optional
The number of points the profiles are evaluated at.
kwargs:
Additional keywords to pass to the properties of the resulting instance.
Returns
-------
:py:class:`model.ClusterModel`
"""
kwargs = _force_method(kwargs, "no_gas")
kwargs["meth"]["profiles"] = {
"total_density": total_density,
"stellar_density": stellar_density,
}
rr = np.logspace(np.log10(rmin), np.log10(rmax), num_points, endpoint=True)
fields = OrderedDict()
fields["radius"] = unyt_array(rr, "kpc")
fields["total_density"] = unyt_array(total_density(rr), "Msun/kpc**3")
mylog.info("Integrating total mass profile.")
fields["total_mass"] = unyt_array(integrate_mass(total_density, rr), "Msun")
fields["gravitational_field"] = (
-G * fields["total_mass"] / (fields["radius"] ** 2)
)
fields["gravitational_field"].convert_to_units("kpc/Myr**2")
return cls._from_scratch(fields, stellar_density=stellar_density, **kwargs)
[docs]
def find_field_at_radius(self, field, r):
"""
Find the value of a *field* in the profiles
at radius *r*.
"""
return unyt_array(np.interp(r, self["radius"], self[field]), self[field].units)
[docs]
def check_hse(self):
r"""
Determine the deviation of the model from hydrostatic equilibrium.
Returns
-------
chk : NumPy array
An array containing the relative deviation from hydrostatic
equilibrium as a function of radius.
"""
if "pressure" not in self.fields:
raise RuntimeError("This ClusterModel contains no gas!")
rr = self.fields["radius"].v
pressure_spline = InterpolatedUnivariateSpline(rr, self.fields["pressure"].v)
dPdx = unyt_array(pressure_spline(rr, 1), "Msun/(Myr**2*kpc**2)")
rhog = self.fields["density"] * self.fields["gravitational_field"]
chk = dPdx - rhog
chk /= rhog
mylog.info(
"The maximum relative deviation of this profile from "
"hydrostatic equilibrium is %g",
np.abs(chk).max(),
)
return chk
[docs]
def check_dm_virial(self):
"""Equivalent to ``self.dm_virial.check_virial``"""
return self.dm_virial.check_virial()
[docs]
def check_star_virial(self):
"""Equivalent to ``self.star_virial.check_virial``"""
if self._star_virial is None:
raise RuntimeError(
"Cannot check the virial equilibrium of "
"the stars because there are no stars in "
"this model!"
)
return self.star_virial.check_virial()
[docs]
def set_magnetic_field_from_beta(self, beta, gaussian=True):
"""
Set a magnetic field radial profile from
a plasma beta parameter, assuming beta = p_th/p_B.
The field can be set in Gaussian or Lorentz-Heaviside
(dimensionless) units.
Parameters
----------
beta : float
The ratio of the thermal pressure to the
magnetic pressure.
gaussian : boolean, optional
Set the field in Gaussian units such that
p_B = B^2/(8*pi), otherwise p_B = B^2/2.
Default: True
"""
B = np.sqrt(2.0 * self["pressure"] / beta)
if gaussian:
B *= np.sqrt(4.0 * np.pi)
B.convert_to_units("gauss")
self.set_field("magnetic_field_strength", B)
[docs]
def set_magnetic_field_from_density(self, B0, eta=2.0 / 3.0, gaussian=True):
"""
Set a magnetic field radial profile assuming it is proportional
to some power of the density, usually 2/3. The field can be set
in Gaussian or Lorentz-Heaviside (dimensionless) units.
Parameters
----------
B0 : float
The central magnetic field strength in units of
gauss.
eta : float, optional
The power of the density which the field is
proportional to. Default: 2/3.
gaussian : boolean, optional
Set the field in Gaussian units such that
p_B = B^2/(8*pi), otherwise p_B = B^2/2.
Default: True
"""
B0 = ensure_ytquantity(B0, "gauss")
B = B0 * (self["density"] / self["density"][0]) ** eta
if not gaussian:
B /= np.sqrt(4.0 * np.pi)
self.set_field("magnetic_field_strength", B)
[docs]
def generate_tracer_particles(
self, num_particles, r_max=None, sub_sample=1, prng=None
):
"""
Generate a set of tracer particles based on the gas distribution.
Parameters
----------
num_particles : integer
The number of particles to generate.
r_max : float, optional
The maximum radius in kpc within which to generate
particle positions. If not supplied, it will generate
positions out to the maximum radius available. Default: None
sub_sample : integer, optional
This option allows one to generate a sub-sample of unique
particle radii, densities, and energies which will then be
repeated to fill the required number of particles. Default: 1,
which means no sub-sampling.
prng : :class:`~numpy.random.RandomState` object, integer, or None
A pseudo-random number generator. Typically will only
be specified if you have a reason to generate the same
set of random numbers, such as for a test. Default is None,
which sets the seed based on the system time.
"""
from cluster_generator.utils import parse_prng
prng = parse_prng(prng)
mylog.info("We will be assigning %d tracer particles.", num_particles)
mylog.info("Compute particle positions.")
num_particles_sub = num_particles // sub_sample
radius_sub, mtot = generate_particle_radii(
self["radius"].d,
self["gas_mass"].d,
num_particles_sub,
r_max=r_max,
prng=prng,
)
if sub_sample > 1:
radius = np.tile(radius_sub, sub_sample)[:num_particles]
else:
radius = radius_sub
theta = np.arccos(prng.uniform(low=-1.0, high=1.0, size=num_particles))
phi = 2.0 * np.pi * prng.uniform(size=num_particles)
fields = OrderedDict()
fields["tracer", "particle_position"] = unyt_array(
[
radius * np.sin(theta) * np.cos(phi),
radius * np.sin(theta) * np.sin(phi),
radius * np.cos(theta),
],
"kpc",
).T
fields["tracer", "particle_velocity"] = unyt_array(
np.zeros(fields["tracer", "particle_position"].shape), "kpc/Myr"
)
fields["tracer", "particle_mass"] = unyt_array(np.zeros(num_particles), "Msun")
return ClusterParticles("tracer", fields)
[docs]
def generate_gas_particles(
self,
num_particles,
r_max=None,
sub_sample=1,
compute_potential=False,
prng=None,
):
"""
Generate a set of gas particles in hydrostatic equilibrium.
Parameters
----------
num_particles : integer
The number of particles to generate.
r_max : float, optional
The maximum radius in kpc within which to generate
particle positions. If not supplied, it will generate
positions out to the maximum radius available. Default: None
sub_sample : integer, optional
This option allows one to generate a sub-sample of unique
particle radii, densities, and energies which will then be
repeated to fill the required number of particles. Default: 1,
which means no sub-sampling.
compute_potential : boolean, optional
If True, the gravitational potential for each particle will
be computed. Default: False
prng : :class:`~numpy.random.RandomState` object, integer, or None
A pseudo-random number generator. Typically will only
be specified if you have a reason to generate the same
set of random numbers, such as for a test. Default is None,
which sets the seed based on the system time.
"""
from cluster_generator.utils import parse_prng
prng = parse_prng(prng)
mylog.info("We will be assigning %d gas particles.", num_particles)
mylog.info("Compute particle positions.")
num_particles_sub = num_particles // sub_sample
radius_sub, mtot = generate_particle_radii(
self["radius"].d,
self["gas_mass"].d,
num_particles_sub,
r_max=r_max,
prng=prng,
)
if sub_sample > 1:
radius = np.tile(radius_sub, sub_sample)[:num_particles]
else:
radius = radius_sub
theta = np.arccos(prng.uniform(low=-1.0, high=1.0, size=num_particles))
phi = 2.0 * np.pi * prng.uniform(size=num_particles)
fields = OrderedDict()
fields["gas", "particle_position"] = unyt_array(
[
radius * np.sin(theta) * np.cos(phi),
radius * np.sin(theta) * np.sin(phi),
radius * np.cos(theta),
],
"kpc",
).T
mylog.info("Compute particle thermal energies, densities, and masses.")
e_arr = 1.5 * self.fields["pressure"] / self.fields["density"]
get_energy = InterpolatedUnivariateSpline(self.fields["radius"], e_arr)
if sub_sample > 1:
energy = np.tile(get_energy(radius_sub), sub_sample)[:num_particles]
else:
energy = get_energy(radius)
fields["gas", "thermal_energy"] = unyt_array(energy, "kpc**2/Myr**2")
fields["gas", "particle_mass"] = unyt_array(
[mtot / num_particles] * num_particles, "Msun"
)
get_density = InterpolatedUnivariateSpline(
self.fields["radius"], self.fields["density"]
)
if sub_sample > 1:
density = np.tile(get_density(radius_sub), sub_sample)[:num_particles]
else:
density = get_density(radius)
fields["gas", "density"] = unyt_array(density, "Msun/kpc**3")
mylog.info("Set particle velocities to zero.")
fields["gas", "particle_velocity"] = unyt_array(
np.zeros((num_particles, 3)), "kpc/Myr"
)
if compute_potential:
energy_spline = InterpolatedUnivariateSpline(
self["radius"].d, -self["gravitational_potential"]
)
phi = -energy_spline(radius_sub)
if sub_sample > 1:
phi = np.tile(phi, sub_sample)
fields["gas", "particle_potential"] = unyt_array(phi, "kpc**2/Myr**2")
return ClusterParticles("gas", fields)
[docs]
def generate_dm_particles(
self,
num_particles,
r_max=None,
sub_sample=1,
compute_potential=False,
prng=None,
):
"""
Generate a set of dark matter particles in virial equilibrium.
Parameters
----------
num_particles : integer
The number of particles to generate.
r_max : float, optional
The maximum radius in kpc within which to generate
particle positions. If not supplied, it will generate
positions out to the maximum radius available. Default: None
sub_sample : integer, optional
This option allows one to generate a sub-sample of unique
particle radii and velocities which will then be repeated
to fill the required number of particles. Default: 1, which
means no sub-sampling.
compute_potential : boolean, optional
If True, the gravitational potential for each particle will
be computed. Default: False
prng : :class:`~numpy.random.RandomState` object, integer, or None
A pseudo-random number generator. Typically will only
be specified if you have a reason to generate the same
set of random numbers, such as for a test. Default is None,
which sets the seed based on the system time.
Returns
-------
particles : :class:`~cluster_generator.particles.ClusterParticles`
A set of dark matter particles.
"""
return self.dm_virial.generate_particles(
num_particles,
r_max=r_max,
sub_sample=sub_sample,
compute_potential=compute_potential,
prng=prng,
)
[docs]
def generate_star_particles(
self,
num_particles,
r_max=None,
sub_sample=1,
compute_potential=False,
prng=None,
):
"""
Generate a set of star particles in virial equilibrium.
Parameters
----------
num_particles : integer
The number of particles to generate.
r_max : float, optional
The maximum radius in kpc within which to generate
particle positions. If not supplied, it will generate
positions out to the maximum radius available. Default: None
sub_sample : integer, optional
This option allows one to generate a sub-sample of unique
particle radii and velocities which will then be repeated
to fill the required number of particles. Default: 1, which
means no sub-sampling.
compute_potential : boolean, optional
If True, the gravitational potential for each particle will
be computed. Default: False
prng : :class:`~numpy.random.RandomState` object, integer, or None
A pseudo-random number generator. Typically will only
be specified if you have a reason to generate the same
set of random numbers, such as for a test. Default is None,
which sets the seed based on the system time.
Returns
-------
particles : :class:`~cluster_generator.particles.ClusterParticles`
A set of star particles.
"""
return self.star_virial.generate_particles(
num_particles,
r_max=r_max,
sub_sample=sub_sample,
compute_potential=compute_potential,
prng=prng,
)
[docs]
@_enforce_style
def plot(
self, field, r_min=None, r_max=None, fig=None, ax=None, defaults=None, **kwargs
):
"""
Plot a field vs radius from this model using Matplotlib.
Parameters
----------
field : string
The field to plot.
r_min : float
The minimum radius of the plot in kpc.
r_max : float
The maximum radius of the plot in kpc.
fig : Matplotlib Figure
The figure to plot in. Default; None, in which case
one will be generated.
ax : Matplotlib Axes
The axes to plot in. Default: None, in which case
one will be generated.
defaults: dict
A dictionary containing defaults for the specific field. This is loaded by default if left unset.
The dictionary must contain keys ``'label','scale','revision'``. ``label`` must be a string with a formatting
character of the form ``"%(units)s"``; the scale make be any valid input to ``ax.set_yscale``, and the revision should
be 1 if the plot should be left as normal and ``-1`` if the plot should be flipped over the x axis (gravitational potential).
Returns
-------
The Figure, Axes tuple used for the plot.
"""
import pathlib as pt
import matplotlib.pyplot as plt
if fig is None:
fig = plt.figure(figsize=(10, 10))
if ax is None:
ax = fig.add_subplot(111)
if defaults is None:
# Loading the plot defaults
_config_directory = os.path.join(
pt.Path(__file__).parents[0], "bin", "resources", "plot_defaults.yaml"
)
with open(_config_directory, "r") as f:
plt_defaults = yaml.load(f, yaml.FullLoader)
defaults = plt_defaults[field]
if "kwargs" in defaults:
for k, v in defaults["kwargs"]:
if k not in kwargs:
kwargs[k] = v
ax.loglog(
self["radius"], defaults["revision"] * self[field], **kwargs
) # revision flips negative plots.
ax.set_xlim(r_min, r_max)
ax.set_yscale(defaults["scale"])
ax.set_ylabel(
defaults["label"] % {"unit": self[field].units.latex_representation()}
)
ax.set_xlabel(
r"Radius / $\left[%(unit)s\right]$"
% {"unit": self["radius"].units.latex_representation()}
)
return fig, ax
[docs]
@_enforce_style
def panel_plot(
self,
fields="all",
r_min=None,
r_max=None,
fig=None,
axes=None,
aspect_ratio=1,
base_length=3,
gs_kwargs=None,
**kwargs,
):
"""
Plot all of the selected fields in a grid of axes. Useful for checking for non-physical issues and getting
a general impression of the generated model.
Parameters
----------
fields: list or str
The fields to include in the plot. If ``fields="all"``, then all of the fields available will be plotted. Otherwise, ``fields`` should
be a list of field names.
r_min: float or int, optional
The minimum radius (kpc) from which to plot. Determined if not specified.
r_max: float or int, optional
The maximum radius (kpc) from which to plot. Determined if not specified.
fig: :py:class:`~matplotlib.figure.Figure`, optional
A :py:class:`~matplotlib.figure.Figure` instance onto which to place the plots. If not specified, a fresh one will be generated.
.. warning::
If a figure is specified but the axes are not specified, then this will overwrite / cover up any pre-existing
plots on those axes.
axes: dict of str: :py:class:`~matplotlib.axes.Axes`, optional
The axes onto which the plots should be drawn. These should be organized as a dictionary with keys corresponding to the
field name belonging to the plot and the corresponding axes on which that plot is drawn. If not specified, a grid is
generated on the figure and subdivided to facilitate the drawing.
aspect_ratio: float, optional
The aspect ratio of the individual subplots.
base_length: float, optional
The base length of each plot. Used to determine rest of the geometry.
gs_kwargs: dict, optional
additional kwargs to pass to the :py:class:`matplotlib.figure.GridSpec` instance.
**kwargs
Additional keyword arguments which are to be passed directly to :py:func:`matplotlib.pyplot.plot`. There are several formatting
options for these keyword arguments and the behavior depends on their format.
- If the kwarg values are :py:class:`dict`, then they should have keys corresponding to the different fields and
those kwargs will be passed to the field plot. If a key is missing, then no kwargs are passed for that
setting.
- If kwarg values are not dictionaries, they will be applied uniformly across all of the subplots.
Returns
-------
:py:class:`~matplotlib.figure.Figure`
The figure corresponding to the plot.
:py:class:`~matplotlib.axes.Axes`
The axes dictionary corresponding to the plot
"""
import pathlib as pt
from itertools import product
import matplotlib.pyplot as plt
# Loading the plot defaults
_config_directory = os.path.join(
pt.Path(__file__).parents[0], "bin", "resources", "plot_defaults.yaml"
)
with open(_config_directory, "r") as f:
plt_defaults = yaml.load(f, yaml.FullLoader)
if fields == "all":
fields = [i for i in self.fields.keys()]
else:
assert isinstance(
fields, (list, tuple)
), "The fields must be specified as an array or a tuple."
if fig is None:
fig = plt.figure()
if axes is None:
axes = {}
r, c = _closest_factors(len(fields))
# computing the figure behavior
h = (1 / (1 - 0.16)) * r * aspect_ratio * base_length
w = ((1 + 0.2) / (1 - 0.16)) * c * base_length
fig.set_figwidth(w)
fig.set_figheight(h)
fig.subplots_adjust(left=0.1, right=0.99, top=0.99, bottom=0.05)
if gs_kwargs is None:
gs_kwargs = {}
gridspec = fig.add_gridspec(r, c, **gs_kwargs)
gspec_it = product(range(0, r), range(0, c))
for gi, fi in zip(gspec_it, fields):
ri, ci = gi
axes[fi] = fig.add_subplot(gridspec[ri, ci])
else:
assert all(i in axes for i in fields), "Some fields do not have axes."
for field, ax in axes.items():
tmp_kwargs = {
k: (v[field] if isinstance(v, dict) else v) for k, v in kwargs.items()
}
self.plot(
field,
r_min=r_min,
r_max=r_max,
fig=fig,
ax=ax,
defaults=plt_defaults[field],
**tmp_kwargs,
)
return fig, axes
[docs]
def mass_in_radius(self, radius):
"""Determine the mass within a given radius."""
masses = {}
r = self.fields["radius"].to_value("kpc")
for mtype in ["total", "gas", "dark_matter", "stellar"]:
if f"{mtype}_mass" in self.fields:
masses[mtype] = self.fields[f"{mtype}_mass"][r < radius][-1]
return masses
[docs]
def find_radius_for_density(self, density):
"""Determine the radius at which the model reaches the specified density"""
density = ensure_ytquantity(density, "Msun/kpc**3").value
r = self.fields["radius"].to_value("kpc")[::-1]
d = self.fields["density"].to_value("Msun/kpc**3")[::-1]
return unyt_quantity(np.interp(density, d, r), "kpc")
[docs]
def create_dataset(
self, domain_dimensions, box_size=None, left_edge=None, velocity=None, **kwargs
):
"""
.. warning::
This method can be memory intensive. We suggest being conservative in your choice of domain size to
begin in order to avoid OOM issues. For reference, a domain dimension of ``[500,500,500]`` will take appox.
3Gb / field.
Create an in-memory, uniformly gridded dataset in 3D using yt by
placing the cluster into a box.
Parameters
----------
domain_dimensions : 3-tuple of ints
The number of cells on a side for the domain.
box_size : float, optional
The size of the box in kpc. If not specified, will default to the size of the cluster.
left_edge : array_like, optional
The minimum coordinate of the box in all three dimensions,
in kpc. Default: None, which means the left edge will
be [-r,-r,-r].
velocity: unyt_array, optional
The velocity of the cluster relative to the frame of the box. Default is 0. If specified, must be a 1X3 array.
"""
import gc
from yt.loaders import load_uniform_grid
from cluster_generator.data_structures import build_yt_dataset_fields
from cluster_generator.utils import mylog
mylog.info(f"Loading yt dataset of {self}...")
if box_size is None:
box_size = 2 * np.amax(self["radius"].d)
# Dealing with the left edge of the domain.
if left_edge is None:
left_edge = -np.amax(box_size / 2) * np.ones(3)
left_edge = np.array(left_edge)
# Building the boundary box
bbox = [
[left_edge[0], left_edge[0] + box_size],
[left_edge[1], left_edge[1] + box_size],
[left_edge[2], left_edge[2] + box_size],
]
# Creating the underlying meshgrid
try:
x, y, z = np.mgrid[
bbox[0][0] : bbox[0][1] : domain_dimensions[0] * 1j,
bbox[1][0] : bbox[1][1] : domain_dimensions[1] * 1j,
bbox[2][0] : bbox[2][1] : domain_dimensions[2] * 1j,
]
except MemoryError as err:
raise MemoryError(
f"Failed to allocate memory for the grid. Error msg = {err.__str__()}."
)
gc.collect()
data = build_yt_dataset_fields(
[x, y, z],
[self],
domain_dimensions,
unyt_array([[0, 0, 0]], "kpc"),
velocity if velocity is not None else unyt_array([[0, 0, 0]], "kpc/Myr"),
)
gc.collect()
return load_uniform_grid(
data,
domain_dimensions,
length_unit="kpc",
bbox=np.array(bbox),
mass_unit="Msun",
time_unit="Myr",
**kwargs,
)
@property
def is_physical(self):
"""
Determines if the :py:class:`model.ClusterModel` instance is physically realizable.
Returns
-------
bool
``True`` if the system is physically realizable, ``False`` otherwise.
Notes
-----
See the documentation on correcting non-physical regions. This method uses the fact that all non-physicalities in
the system is manifested in the dynamical density. The algorithm therefore checks the dynamical density for signs of
non-physicality and returns the results.
"""
density_fields = [
v.d
for k, v in self.fields.items()
if k in ["dark_matter_density", "density", "stellar_density"]
]
# Correcting negative values
for i, arr in enumerate(density_fields):
density_fields[i][np.where(arr < 0)] = 0
critical_density = np.sum([k for k in density_fields], axis=0)
if np.any(np.where(~np.isclose(critical_density, self["total_density"].v))):
# There is missing mass or too much mass.
return False
else:
# There is no mass differential.
return True
[docs]
def correct(self, mode="minimal"):
"""
Correct the :py:class:`model.ClusterModel` instance. The ``mode`` kwarg may be used to determine the precise algorithm
that is used to complete the correction.
Parameters
----------
mode: str
The algorithm to use for correction. Options are ``minimal`` and ``smooth``. If ``minimal``, then the dynamical density
is replaced with the minimum allowed consistent value but may lack smoothness. If ``smooth``, then a smooth monotone
interpolation scheme is used.
Returns
-------
:py:class:`model.ClusterModel`
"""
if self.is_physical:
mylog.info("No corrections were necessary, returned copy...")
return self
density_fields = {
k: v.d
for k, v in self.fields.items()
if ("density" in k) and (k != "total_density")
}
critical_density = np.sum(
np.array([u for u in density_fields.values()]), axis=0
)
if mode == "minimal":
self.fields["total_density"][
np.where(self.fields["total_density"].d < critical_density)
] = critical_density * (1.01)
if "density" in self.properties["meth"]["profiles"]:
density_function = self.properties["meth"]["profiles"]["density"]
else:
density_function = InterpolatedUnivariateSpline(
self["radius"].d, self["density"].d
)
total_density_function = InterpolatedUnivariateSpline(
self["radius"].d, self["total_density"].d
)
elif mode == "smooth":
from cluster_generator.numeric import hfa
self.fields["total_density"][
np.where(self.fields["total_density"].d < critical_density)
] = critical_density * (1.01)
u = np.log10(self["radius"].d)
v = np.log10(self["total_density"].d)
_u, _v = hfa(u, v, 50)
_y = 10 ** (_v)
if "density" in self.properties["meth"]["profiles"]:
density_function = self.properties["meth"]["profiles"]["density"]
else:
density_function = InterpolatedUnivariateSpline(
self["radius"].d, self["density"].d
)
total_density_function = InterpolatedUnivariateSpline(self["radius"].d, _y)
else:
raise ValueError(f"The correction mode {mode} is not a valid mode.")
return ClusterModel.from_dens_and_tden(
np.amin(self["radius"]),
np.amax(self["radius"]),
density_function,
total_density_function,
self.properties["meth"]["profiles"]["stellar_density"],
num_points=len(self["radius"]),
)
# This is only for backwards-compatibility
[docs]
class HydrostaticEquilibrium(ClusterModel):
"""Equivalent to :py:class:`ClusterModel`"""
pass
def _force_method(dictionary, meth):
if "meth" in dictionary:
if "method" in dictionary["meth"]:
pass
else:
dictionary["meth"]["method"] = meth
else:
dictionary["meth"] = {"method": meth}
return dictionary