Source code for codes

"""
Code-specific utilities for the ``cluster_generator`` library.
"""
from pathlib import Path

import h5py
import numpy as np
from unyt import uconcatenate, unyt_array

from cluster_generator.model import ClusterModel
from cluster_generator.particles import ClusterParticles
from cluster_generator.utils import mylog


[docs] def write_amr_particles( particles, output_filename, ptypes, ptype_num, overwrite=True, in_cgs=False, format="hdf5", ): """ Write the particles to an HDF5 file to be read in by the GAMER, FLASH, or RAMSES codes. Parameters ---------- particles : :class:`cluster_generator.particles.ClusterParticles` The ClusterParticles instance which will be written. output_filename : string The file to write the particles to. overwrite : boolean, optional Overwrite an existing file with the same name. Default: False. ptypes: TODO ptype_num: TODO overwrite: TODO in_cgs: TODO format: TODO Returns ------- TODO """ import h5py from scipy.io import FortranFile if Path(output_filename).exists() and not overwrite: raise IOError( f"Cannot create {output_filename}. It exists and overwrite=False." ) nparts = [particles.num_particles[ptype] for ptype in ptypes] if format == "hdf5": write_class = h5py.File elif format == "fortran": write_class = FortranFile num_particles = 0 with write_class(output_filename, "w") as f: pdata = [] for field in ["particle_position", "particle_velocity", "particle_mass"]: fd = uconcatenate([particles[ptype, field] for ptype in ptypes], axis=0) if hasattr(fd, "units") and in_cgs: fd.convert_to_cgs() if format == "hdf5": f.create_dataset(field, data=np.asarray(fd)) else: if field == "particle_mass": num_particles = fd.size pdata.append(np.asarray(fd).astype("float64").T) if format == "hdf5": fd = np.concatenate( [ ptype_num[ptype] * np.ones(nparts[i]) for i, ptype in enumerate(ptypes) ] ) f.create_dataset("particle_type", data=fd) else: f.write_record(num_particles) f.write_record(np.vstack(pdata).T)
[docs] def setup_gamer_ics(ics, regenerate_particles=False, use_tracers=False): r""" Generate the "Input_TestProb" lines needed for use with the ClusterMerger setup in GAMER. If the particles (dark matter and potentially star) have not been created yet, they will be created at this step. New profile files will also be created which have all fields in CGS units for reading into GAMER. If a magnetic field file is present in the ICs, a note will be given about how it should be named for GAMER to use it. Parameters ---------- ics : ClusterICs object The ClusterICs object to generate the GAMER ICs from. regenerate_particles : boolean, optional If particle files have already been created and this flag is set to True, the particles will be re-created. Default: False use_tracers : boolean Set to True to add tracer particles. Default: False """ gamer_ptypes = ["dm", "star"] if use_tracers: gamer_ptypes.insert(0, "tracer") gamer_ptype_num = {"tracer": 0, "dm": 2, "star": 3} hses = [ClusterModel.from_h5_file(hf) for hf in ics.profiles] parts = ics._generate_particles(regenerate_particles=regenerate_particles) outlines = [f"Merger_Coll_NumHalos\t\t{ics.num_halos}\t# number of halos"] for i in range(ics.num_halos): particle_file = f"{ics.basename}_gamerp_{i+1}.h5" if ics.num_particles["star"][i] == 0: ptypes = gamer_ptypes[:-1] else: ptypes = gamer_ptypes write_amr_particles( parts[i], particle_file, ptypes, gamer_ptype_num, in_cgs=True, format="hdf5" ) hse_file_gamer = ics.profiles[i].replace(".h5", "_gamer.h5") hses[i].write_model_to_h5( hse_file_gamer, overwrite=True, in_cgs=True, r_max=ics.r_max ) vel = ics.velocity[i].to_value("km/s") outlines += [ f"Merger_File_Prof{i+1}\t\t{hse_file_gamer}\t# profile table of cluster {i+1}", f"Merger_File_Par{i+1}\t\t{particle_file}\t# particle file of cluster {i+1}", f"Merger_Coll_PosX{i+1}\t\t{ics.center[i][0].v}\t# X-center of cluster {i+1} in kpc", f"Merger_Coll_PosY{i+1}\t\t{ics.center[i][1].v}\t# Y-center of cluster {i+1} in kpc", f"Merger_Coll_VelX{i+1}\t\t{vel[0]}\t# X-velocity of cluster {i+1} in km/s", f"Merger_Coll_VelY{i+1}\t\t{vel[1]}\t# Y-velocity of cluster {i+1} in km/s", ] mylog.info("Write the following lines to Input__TestProblem: ") for line in outlines: print(line) if ics.mag_file is not None: mylog.info( f"Rename the file '{ics.mag_file}' to 'B_IC' " f"and place it in the same directory as the " f"Input__* files, and set OPT__INIT_BFIELD_BYFILE " f"to 1 in Input__Parameter" )
[docs] def setup_flash_ics(ics, use_particles=True, regenerate_particles=False): r""" Generate the "flash.par" lines needed for use with the GalaxyClusterMerger setup in FLASH. If the particles (dark matter and potentially star) have not been created yet, they will be created at this step. Parameters ---------- ics : ClusterICs object The ClusterICs object to generate the GAMER ICs from. use_particles : boolean, optional If True, set up particle distributions. Default: True regenerate_particles : boolean, optional If particle files have already been created, particles are being used, and this flag is set to True, the particles will be re-created. Default: False """ if use_particles: ics._generate_particles(regenerate_particles=regenerate_particles) outlines = [f"testSingleCluster\t=\t{ics.num_halos} # number of halos"] for i in range(ics.num_halos): vel = ics.velocity[i].to("km/s") outlines += [ f"profile{i+1}\t=\t{ics.profiles[i]}\t# profile table of cluster {i+1}", f"xInit{i+1}\t=\t{ics.center[i][0]}\t# X-center of cluster {i+1} in kpc", f"yInit{i+1}\t=\t{ics.center[i][1]}\t# Y-center of cluster {i+1} in kpc", f"vxInit{i+1}\t=\t{vel[0]}\t# X-velocity of cluster {i+1} in km/s", f"vyInit{i+1}\t=\t{vel[1]}\t# Y-velocity of cluster {i+1} in km/s", ] if use_particles: outlines.append( f"Merger_File_Par{i+1}\t=\t{ics.particle_files[i]}\t# particle file of cluster {i+1}", ) mylog.info("Add the following lines to flash.par: ") for line in outlines: print(line)
[docs] def setup_athena_ics(ics): r""" Parameters ---------- ics : ClusterICs object The ClusterICs object to generate the Athena ICs from. """ mylog.info("Add the following lines to athinput.cluster3d: ")
[docs] def setup_enzo_ics(ics): r""" Parameters ---------- ics : ClusterICs object The ClusterICs object to generate the Enzo ICs from. """ pass
[docs] def setup_ramses_ics(ics, regenerate_particles=False): r""" Parameters ---------- ics : ClusterICs object The ClusterICs object to generate the Ramses ICs from. regenerate_particles : boolean, optional If particle files have already been created, particles are being used, and this flag is set to True, the particles will be re-created. Default: False """ names = ["Main", "Sub", "Third"] config_lines = ["# Merger Dynamics Setting, do not change the general format"] hses = [ClusterModel.from_h5_file(hf) for hf in ics.profiles] parts = ics._generate_particles(regenerate_particles=regenerate_particles) fields_to_write = ["radius", "density", "pressure"] for i in range(ics.num_halos): if i > 0: config_lines.append("#") config_lines += [f"# {names[i]}", "#", "#", f"Halo {i+1}"] hses[i].write_model_to_binary( f"halo{i+1}_prof.dat", overwrite=True, in_cgs=True, r_max=ics.r_max, fields_to_write=fields_to_write, ) vel = ics.velocity[i].to_value("km/s") pos = ics.center[i].to_value("kpc") config_lines += [ f"x_cen[kpc] ={pos[0]:16.6e}", f"y_cen[kpc] ={pos[1]:16.6e}", f"z_cen[kpc] ={pos[2]:16.6e}", f"vx_cen[kms] ={vel[0]:16.6e}", f"vy_cen[kms] ={vel[1]:16.6e}", f"vz_cen[kms] ={vel[2]:16.6e}", ] write_amr_particles( parts[i], f"halo{i+1}_part.dat", ["dm"], {"dm": 1}, format="fortran", in_cgs=True, ) mylog.info("Simulation setups saved to Merger_Config.txt.") np.savetxt("Merger_Config.txt", config_lines, fmt="%s")
[docs] def setup_arepo_ics( ics, boxsize, nx, ic_file, overwrite=False, regenerate_particles=False, prng=None ): """Setup Arepo ICs""" fields = {} cparts = ics.setup_particle_ics( regenerate_particles=regenerate_particles, prng=prng ) ngrid = nx**3 dx = boxsize / nx le = 0.5 * dx re = boxsize - 0.5 * dx posg = ( np.mgrid[le : re : nx * 1j, le : re : nx * 1j, le : re : nx * 1j] .reshape(3, ngrid) .T ) rmax2 = ics.r_max**2 idxs = np.sum((posg - ics.center[0].v) ** 2, axis=1) > rmax2[0] if ics.num_halos > 1: idxs |= np.sum((posg - ics.center[1].v) ** 2, axis=1) > rmax2[1] if ics.num_halos > 2: idxs |= np.sum((posg - ics.center[2].v) ** 2, axis=1) > rmax2[2] dV = dx**3 nleft = idxs.sum() idens = np.argmin(cparts["gas", "density"].d) dens = cparts["gas", "density"].d[idens] * np.ones(nleft) eint = cparts["gas", "thermal_energy"].d[idens] * np.ones(nleft) fields["gas", "particle_position"] = unyt_array(posg[idxs, :], "kpc") fields["gas", "particle_velocity"] = unyt_array(np.zeros((nleft, 3)), "kpc/Myr") fields["gas", "particle_mass"] = unyt_array(dens * dV, "Msun") fields["gas", "density"] = unyt_array(dens, "Msun/kpc**3") fields["gas", "thermal_energy"] = unyt_array(eint, "kpc**2/Myr**2") mylog.info( "Background cell density is %g g/cm**3.", fields["gas", "density"][0].to_value("g/cm**3"), ) mylog.info( "Background cell mass is %g Msun.", fields["gas", "particle_mass"][0].to_value("Msun"), ) all_parts = cparts + ClusterParticles.from_fields(fields) all_parts.write_to_gadget_file(ic_file, boxsize, overwrite=overwrite, code="arepo")
[docs] def resample_arepo_ics(ics, infile, outfile, overwrite=False): """Resample Arepo ICs""" parts = ClusterParticles.from_gadget_file(infile) new_parts = ics.resample_particle_ics(parts) with h5py.File(infile, "r") as f: boxsize = f["Header"].attrs["BoxSize"] new_parts.write_to_gadget_file(outfile, boxsize, overwrite=overwrite)
[docs] def setup_gizmo_ics(ics): r""" Parameters ---------- ics : ClusterICs object The ClusterICs object to generate the GIZMO funcs from. """ pass
[docs] def setup_art_ics(ics): """Setup Art ICs""" pass