Source code for scri.SpEC.com_motion

#!/usr/bin/env python

# Copyright (c) 2020, Michael Boyle
# See LICENSE file for details: <https://github.com/moble/spherical_functions/blob/master/LICENSE>
"""Script and/or library to calculate optimal translation and boost from Horizons.h5

See docstrings below for more detail.

To use this function from another script, simply do something like the following:

    from scri.SpEC.com_motion import com_motion, estimate_avg_com_motion
    x_0, v_0, t_0 = estimate_avg_com_motion('path/to/Horizons.h5')

Other options may be passed to the `estimate_avg_com_motion` function; see its docstring for details.

"""


[docs] def com_motion(path_to_horizons_h5, path_to_matter_h5=None, m_A=None, m_B=None): """Calculate the center-of-mass motion from Horizons.h5 and/or Matter.h5 Note that no input checking is done here. If the data sets are missing, incomplete, or corrupted, you should expect to get exceptions or silently incorrect data. Parameters ---------- path_to_horizons_h5 : string, optional [default is 'Horizons.h5'] Absolute or relative path to 'Horizons.h5' path_to_matter_h5: None or string [default is None] Absolute or relative path to 'Matter.h5' for neutron star trajectories. If None, we assume that only black holes are involved, so that both A and B trajectories will be in Horizons.h5. m_A: None or float [default is None] If None, the mass will be read from Horizons.h5 (for BHs) or Matter.h5 (for NSs). m_B: None or float [default is None] If None, the mass will be read from Horizons.h5 (for BHs) or Matter.h5 (for NSs). Returns ------- t : float array These are simply the coordinate times com : 3xN float array Position of CoM at each instant of time """ import numpy as np import h5py if path_to_matter_h5 is None: # BBH with h5py.File(path_to_horizons_h5, "r") as horizons: t = horizons["AhA.dir/ChristodoulouMass.dat"][:, 0] m_A = horizons["AhA.dir/ChristodoulouMass.dat"][:, 1] x_A = horizons["AhA.dir/CoordCenterInertial.dat"][:, 1:] m_B = horizons["AhB.dir/ChristodoulouMass.dat"][:, 1] x_B = horizons["AhB.dir/CoordCenterInertial.dat"][:, 1:] m = m_A + m_B CoM = ((m_A[:, np.newaxis] * x_A) + (m_B[:, np.newaxis] * x_B)) / m[:, np.newaxis] else: # BHNS or NSNS with h5py.File(path_to_matter_h5, "r") as matter: if "InertialCenterOfMassNS1.dat" in matter and "InertialCenterOfMassNS2.dat" in matter: # This is an NSNS. x_A = matter["InertialCenterOfMassNS1.dat"][:, 1:] t = matter["InertialCenterOfMassNS1.dat"][:, 0] x_B = matter["InertialCenterOfMassNS2.dat"][:, 1:] t_check = matter["InertialCenterOfMassNS2.dat"][:, 0] if (t != t_check).any(): raise ValueError(f"Two NSs do not have the same time: diff = {t-t_check}") if m_A is None or m_B is None: raise ValueError(f"Cannot figure out masses of NSs: {m_A}, {m_B}") m_A = np.atleast_1d(m_A) m_B = np.atleast_1d(m_B) elif "InertialCenterOfMassNS1.dat" in matter: # This is a BHNS with h5py.File(path_to_horizons_h5, "r") as horizons: t = horizons["AhA.dir/ChristodoulouMass.dat"][:, 0] x_A = horizons["AhA.dir/CoordCenterInertial.dat"][:, 1:] if m_A is None: m_A = horizons["AhA.dir/ChristodoulouMass.dat"][:, 1] else: m_A = np.atleast_1d(m_A) x_B = matter["InertialCenterOfMassNS1.dat"][:, 1:] t_check = matter["InertialCenterOfMassNS1.dat"][:, 0] if len(t) < len(t_check): raise ValueError( f"BH and NS time arrays have different lengths: {len(t)} vs {len(t_check)}" ) elif len(t) > len(t_check): # We allow AhA.dir/CoordCenterInertial.dat to have # more timesteps than InertialCenterOfMassNS1.dat, # as long as the early timesteps agree and # AhA.dir/CoordCenterInertial.dat just has extra steps # at the end. length = len(t_check) if m_A.shape[0] == len(t): m_A = m_A[:length] t = t[:length] x_A = x_A[:length, :] if (t != t_check).any(): raise ValueError( "BH and NS do not have the same time: " f"tsize={len(t)}, tchecksize={len(t_check)}, max diff = {max(abs(t-t_check))}" ) if m_B is None: raise ValueError(f"Cannot figure out mass of NS: {m_B}") m_B = np.atleast_1d(m_B) else: raise Exception(f"Cannot find expected keys in file. Found {matter.keys()}") m = m_A + m_B CoM = ((m_A[:, np.newaxis] * x_A) + (m_B[:, np.newaxis] * x_B)) / m[:, np.newaxis] # The input and output waveforms are in units of the total # mass, but t and CoM are in units of solar masses. Therefore we # convert the time and CoM arrays here. t /= m CoM /= m[:, np.newaxis] return t, CoM
def estimate_avg_com_motion( path_to_horizons_h5="Horizons.h5", skip_beginning_fraction=0.01, skip_ending_fraction=0.10, plot=False, fit_acceleration=False, path_to_matter_h5=None, m_A=None, m_B=None, ): """Calculate optimal translation and boost from Horizons.h5 This returns the optimal initial position and velocity such that the CoM is best approximated as having these initial values. If the coordinate system is transformed by these values, the new CoM motion will be as close to the origin as possible (in the sense of squared distance from the origin integrated over time). The translation to be applied to the data should be calculated given the values returned by this function as np.array([x_i+v_i*t_j+0.5*a_i*t_j**2 for t_j in t]) Parameters ---------- path_to_horizons_h5: string, optional [default is 'Horizons.h5'] Absolute or relative path to 'Horizons.h5' skip_beginning_fraction : float, optional Exclude this portion from the beginning of the data. Note that this is a fraction, rather than a percentage. The default value is 0.01, meaning the first 1% of the data will be ignored. skip_ending_fraction : float, optional Exclude this portion from the end of the data. Note that this is a fraction, rather than a percentage. The default value is 0.10, meaning the last 10% of the data will be ignored. plot : bool, optional If True, save plot showing CoM tracks before and after offset and translation, to file `CoM_before_and_after_translation.pdf` in the same directory as Horizons.h5. Default: False. fit_acceleration: bool, optional If True, allow for an acceleration in the fit, and return as third parameter. Default: False. path_to_matter_h5: None or string [default is None] Absolute or relative path to 'Matter.h5' for neutron star trajectories. If None, we assume that only black holes are involved, so that both A and B trajectories will be in Horizons.h5. m_A: None or float [default is None] If None, the mass will be read from Horizons.h5 (for BHs) or Matter.h5 (for NSs). m_B: None or float [default is None] If None, the mass will be read from Horizons.h5 (for BHs) or Matter.h5 (for NSs). Returns ------- x_i : length-3 array of floats Best-fit initial position of the center of mass v_i : length-3 array of floats Best-fit initial velocity of the center of mass a_i : length-3 array of floats Best-fit initial acceleration of the center of mass [only if `fit_acceleration=True` is in input arguments] t_i : float Initial time used. This is determined by the `skip_beginning_fraction` input parameter. t_f : float Final time used. This is determined by the `skip_ending_fraction` input parameter. """ import os.path import numpy as np from scipy.integrate import simps t, com = com_motion(path_to_horizons_h5, path_to_matter_h5, m_A, m_B) # We will be skipping the beginning and end of the data; # this gives us the initial and final indices t_i, t_f = t[0] + (t[-1] - t[0]) * skip_beginning_fraction, t[-1] - (t[-1] - t[0]) * skip_ending_fraction i_i, i_f = np.argmin(np.abs(t - t_i)), np.argmin(np.abs(t - t_f)) # Find the optimum analytically CoM_0 = simps(com[i_i : i_f + 1], t[i_i : i_f + 1], axis=0) CoM_1 = simps((t[:, np.newaxis] * com)[i_i : i_f + 1], t[i_i : i_f + 1], axis=0) if fit_acceleration: CoM_2 = simps((t[:, np.newaxis] ** 2 * com)[i_i : i_f + 1], t[i_i : i_f + 1], axis=0) x_i = ( 3 * ( CoM_0 * (3 * t_f**4 + 12 * t_f**3 * t_i + 30 * t_f**2 * t_i**2 + 12 * t_f * t_i**3 + 3 * t_i**4) - 12 * CoM_1 * (t_f + t_i) * (t_f**2 + 3 * t_f * t_i + t_i**2) + CoM_2 * (10 * t_f**2 + 40 * t_f * t_i + 10 * t_i**2) ) / (t_f - t_i) ** 5 ) v_i = ( 12 * ( -3 * CoM_0 * (t_f + t_i) * (t_f**2 + 3 * t_f * t_i + t_i**2) + CoM_1 * (16 * t_f**2 + 28 * t_f * t_i + 16 * t_i**2) + CoM_2 * (-15 * t_f - 15 * t_i) ) / (t_f - t_i) ** 5 ) a_i = ( 60 * (CoM_0 * (t_f**2 + 4 * t_f * t_i + t_i**2) + CoM_1 * (-6 * t_f - 6 * t_i) + 6 * CoM_2) / (t_f - t_i) ** 5 ) else: x_i = 2 * (CoM_0 * (2 * t_f**3 - 2 * t_i**3) + CoM_1 * (-3 * t_f**2 + 3 * t_i**2)) / (t_f - t_i) ** 4 v_i = 6 * (CoM_0 * (-t_f - t_i) + 2 * CoM_1) / (t_f - t_i) ** 3 a_i = 0.0 # If desired, save the plots if plot: import matplotlib as mpl mpl.use("Agg") # Must come after importing mpl, but before importing plt import matplotlib.pyplot as plt directory = os.path.dirname(os.path.abspath(path_to_horizons_h5)) try: from subprocess import check_output SXS_BBH = check_output( "awk '/alternative-names/{{print $3}}' " + os.path.join(directory, "metadata.txt"), shell=True ) if SXS_BBH: SXS_BBH = "\n" + SXS_BBH.strip() except: SXS_BBH = "" delta_x = np.array([x_i + v_i * t_j + 0.5 * a_i * t_j**2 for t_j in t]) comprm = com - delta_x max_displacement = np.linalg.norm(delta_x, axis=1).max() max_d_color = min(1.0, 10 * max_displacement) fig = plt.figure(figsize=(10, 7)) plt.plot(t, com, alpha=0.25, lw=1.5) plt.gca().set_prop_cycle(None) lineObjects = plt.plot(t, comprm, lw=2) plt.xlabel(r"Coordinate time") plt.ylabel(r"CoM coordinate values") plt.legend(iter(lineObjects), ("x", "y", "z"), loc="upper left") fig.text( 0.5, 0.91, directory + SXS_BBH + "\n$x_i$ = [{}]".format(", ".join([str(tmp) for tmp in x_i])) + "\n$v_i$ = [{}]".format(", ".join([str(tmp) for tmp in v_i])), fontsize=8, ha="center", ) fig.text( 0.004, 0.004, str(max_displacement), fontsize=24, ha="left", va="bottom", bbox=dict(facecolor=mpl.cm.jet(max_d_color), alpha=max_d_color, boxstyle="square,pad=0.2"), ) fig.savefig(os.path.join(os.path.dirname(path_to_horizons_h5), "CoM_before_and_after_transformation.pdf")) plt.close() print("Optimal x_i: [{}, {}, {}]".format(*x_i)) print("Optimal v_i: [{}, {}, {}]".format(*v_i)) if fit_acceleration: print("Optimal a_i: [{}, {}, {}]".format(*a_i)) print(f"t_i, t_f: {t_i}, {t_f}") if fit_acceleration: return x_i, v_i, a_i, t_i, t_f else: return x_i, v_i, t_i, t_f def remove_avg_com_motion( w_m=None, path_to_waveform_h5="rhOverM_Asymptotic_GeometricUnits.h5/Extrapolated_N2.dir", path_to_horizons_h5=None, skip_beginning_fraction=0.01, skip_ending_fraction=0.10, plot=False, file_write_mode="w", path_to_matter_h5=None, m_A=None, m_B=None, file_format="NRAR", write_corrected_file=True, ): """Rewrite waveform data in center-of-mass frame This simply uses `estimate_avg_com_motion`, and then transforms to that frame as appropriate. Most of the options are simply passed to that function. Note, however, that the path to the Horizons.h5 file defaults to the directory of the waveform H5 file. Additional parameters --------------------- w_m : WaveformModes, optional The original waveform to be corrected. If this is not given, it will be read from file. Whether or not it is given, the Horizons and/or Matter files must still be present. path_to_waveform_h5 : str, optional Absolute or relative path to SpEC waveform file, including the directory within the H5 file, if appropriate. Default value is 'rhOverM_Asymptotic_GeometricUnits.h5/Extrapolated_N2.dir'. path_to_horizons_h5: None or string [default is None] Absolute or relative path to 'Horizons.h5'. If None, this will be inferred from the waveform path. path_to_matter_h5: None or string [default is None] Absolute or relative path to 'Matter.h5' for neutron star trajectories. If None, this will be inferred from the waveformpath. If the file does not exist, we assume that only black holes are involved, so that both A and B trajectories will be in Horizons.h5. m_A: None or float [default is None] If None and there is no Matter.h5 file, the mass will be read from Horizons.h5; otherwise, the mass will be read from the metadata `reference_mass1`. m_B: None or float [default is None] If None and there is no Matter.h5 file, the mass will be read from Horizons.h5; otherwise, the mass will be read from the metadata `reference_mass2`. file_format: 'NRAR' or 'RPXMB' [default is 'NRAR'] The file format of the waveform data H5 file. The 'NRAR' format is the default file format found in the SXS Catalog. The 'RPXMB' format is data compressed with the rotating_paired_xor_multishuffle_bzip2 scheme. write_corrected_file: bool [default is True] Returns ------- w_m : WaveformModes object This is the transformed object in the new frame """ import os.path import pathlib import re import numpy as np from .file_io import read_from_h5, write_to_h5, rotating_paired_xor_multishuffle_bzip2 from .. import h, hdot, psi4, psi3, psi2, psi1, psi0 directory = os.path.dirname(os.path.abspath(path_to_waveform_h5.split(".h5", 1)[0] + ".h5")) subdir = os.path.basename(path_to_waveform_h5.split(".h5", 1)[1]) # Read the waveform data in if w_m is not None: pass elif file_format.lower() == "nrar": w_m = read_from_h5(path_to_waveform_h5) elif file_format.lower() == "rpxmb" or file_format.lower() == "rpxm": w_m = rotating_paired_xor_multishuffle_bzip2.load(path_to_waveform_h5)[0].to_inertial_frame() else: raise ValueError(f"File format {file_format} not recognized. Must be either 'NRAR' or 'RPXMB'.") version_hist = getattr(w_m, "version_hist", None) extrapolate_coord_radii = getattr(w_m, "extrapolate_coord_radii", None) if path_to_horizons_h5 is None: path_to_horizons_h5 = os.path.join(directory, "Horizons.h5") if path_to_matter_h5 is None: tmp = pathlib.Path(directory) / "Matter.h5" if tmp.exists(): path_to_matter_h5 = str(tmp) if m_A is None: m_A = w_m.metadata.reference_mass1 if m_B is None: m_B = w_m.metadata.reference_mass2 # Get the CoM motion from Horizons.h5 x_0, v_0, t_0, t_f = estimate_avg_com_motion( path_to_horizons_h5=path_to_horizons_h5, skip_beginning_fraction=skip_beginning_fraction, skip_ending_fraction=skip_ending_fraction, plot=plot, path_to_matter_h5=path_to_matter_h5, m_A=m_A, m_B=m_B, ) # Set up the plot and plot the original data if plot: import matplotlib as mpl mpl.use("Agg") # Must come after importing mpl, but before importing plt import matplotlib.pyplot as plt import scri.plotting try: if isinstance(w_m.metadata.alternative_names, list): SXS_BBH = "\n" + ", ".join(w_m.metadata.alternative_names) else: SXS_BBH = "\n" + w_m.metadata.alternative_names except: SXS_BBH = "" t_merger = w_m.max_norm_time() - 300.0 t_ringdown = w_m.max_norm_time() + 100.0 t_final = w_m.t[-1] delta_x = np.array([x_0 + v_0 * t_j for t_j in w_m.t]) max_displacement = np.linalg.norm(delta_x, axis=1).max() max_d_color = min(1.0, 9 * max_displacement) LM_indices1 = [[2, 2], [2, 1], [3, 3], [3, 1], [4, 3]] LM_indices1 = [[ell, m] for ell, m in LM_indices1 if [ell, m] in w_m.LM.tolist()] indices1 = [(ell * (ell + 1) - 2**2 + m) for ell, m in LM_indices1] LM_indices2 = [[2, 2], [3, 2], [4, 4], [4, 2]] LM_indices2 = [[ell, m] for ell, m in LM_indices2 if [ell, m] in w_m.LM.tolist()] indices2 = [(ell * (ell + 1) - 2**2 + m) for ell, m in LM_indices2] fig1 = plt.figure(1, figsize=(10, 7)) plt.gca().set_xscale("merger_zoom", t_merger=t_merger, t_ringdown=t_ringdown, t_final=t_final) lines1 = plt.semilogy(w_m.t, abs(w_m.data[:, indices1]), alpha=0.35, lw=1.5) fig2 = plt.figure(2, figsize=(10, 7)) plt.gca().set_xscale("merger_zoom", t_merger=t_merger, t_ringdown=t_ringdown, t_final=t_final) lines2 = plt.semilogy(w_m.t, abs(w_m.data[:, indices2]), alpha=0.35, lw=1.5) # Import auxilliary waveforms if we are transforming Psi3, Psi2, Psi1, or Psi0. To apply a CoM # correction to these data types, information is required from all higher ranked Weyl scalars, # e.g. Psi2 requires information from Psi3 and Psi4. aux_waveforms = {} if w_m.dataType not in [h, hdot, psi4]: file_prefix = ["rMPsi4", "r2Psi3", "r3Psi2OverM", "r4Psi1OverM2"] regex_template = r"""(r2News|rhOverM|rMPsi4|r2Psi3|r3Psi2OverM|r4Psi1OverM2|r5Psi0OverM3)""" for i in range(5 - w_m.dataType): match = re.search(regex_template, path_to_waveform_h5) # Assume the auxiliary waveforms are in the same directory as the waveform to be transformed aux_path = path_to_waveform_h5.replace(match.group(1), file_prefix[i]) try: if file_format.lower() == "nrar": aux_waveforms[f"psi{4-i}_modes"] = read_from_h5(aux_path) elif file_format.lower() == "rpxmb" or file_format.lower() == "rpxm": aux_waveforms[f"psi{4-i}_modes"] = rotating_paired_xor_multishuffle_bzip2.load(aux_path)[0] aux_waveforms[f"psi{4-i}_modes"].to_inertial_frame() except (FileNotFoundError, OSError): raise FileNotFoundError( f"A transformation of {w_m.data_type_string} requires waveform data from Psi{4-i}, " "but the file {aux_path} could not be found." ) # Transform the mode data w_m = w_m.transform(space_translation=x_0, boost_velocity=v_0, **aux_waveforms) # Write the data to the new file if write_corrected_file and file_format.lower() == "nrar": path_to_new_waveform_h5 = re.sub( w_m.descriptor_string + "_", "", # Remove 'rhOverM_', 'rMPsi4_', or whatever path_to_waveform_h5.replace(".h5", "_CoM.h5", 1), # Add '_CoM' once flags=re.I, ) # Ignore case of 'psi4'/'Psi4', etc. write_to_h5( w_m, path_to_new_waveform_h5, file_write_mode=file_write_mode, attributes={"space_translation": x_0, "boost_velocity": v_0}, ) w_m.boost_velocity = v_0 w_m.space_translation = x_0 if version_hist is not None: w_m.version_hist = version_hist if extrapolate_coord_radii is not None: w_m.extrapolate_coord_radii = extrapolate_coord_radii if write_corrected_file and (file_format.lower() == "rpxmb" or file_format.lower() == "rpxm"): path_to_new_waveform_h5 = path_to_waveform_h5.replace(".h5", "_CoM.h5", 1) rotating_paired_xor_multishuffle_bzip2.save(w_m, path_to_new_waveform_h5) # Finish by plotting the new data and save to PDF if plot: plt.figure(1) for line, index, (ell, m) in zip(lines1, indices1, LM_indices1): plt.semilogy(w_m.t, abs(w_m.data[:, index]), color=plt.getp(line, "color"), lw=1.5, label=f"({ell}, {m})") plt.figure(2) for line, index, (ell, m) in zip(lines2, indices2, LM_indices2): plt.semilogy(w_m.t, abs(w_m.data[:, index]), color=plt.getp(line, "color"), lw=1.5, label=f"({ell}, {m})") for fig, num in [(fig1, 1), (fig2, 2)]: plt.figure(num) plt.axvline(t_merger, color="black", lw=2, alpha=0.125) plt.axvline(t_ringdown, color="black", lw=2, alpha=0.125) plt.xlabel(r"Coordinate time") plt.ylabel(r"Mode amplitudes") plt.xlim(0.0, w_m.t[-1]) plt.ylim(1e-6, 0.6) plt.grid(axis="y") fig.text( 0.5, 0.91, os.path.abspath(path_to_waveform_h5) + SXS_BBH + "\n$x_0$ = [{}]".format(", ".join([str(tmp) for tmp in x_0])) + "\n$v_0$ = [{}]".format(", ".join([str(tmp) for tmp in v_0])), fontsize=8, ha="center", ) fig.text( 0.004, 0.004, str(max_displacement), fontsize=24, ha="left", va="bottom", bbox=dict(facecolor=mpl.cm.jet(max_d_color), alpha=max_d_color, boxstyle="square,pad=0.2"), ) plt.legend(loc="upper left", framealpha=0.9) fig.savefig(os.path.join(directory, "Modes_{}_{}_{}.pdf".format(w_m.descriptor_string, subdir[:-4], num))) plt.close() return w_m if __name__ == "__main__": import argparse parser = argparse.ArgumentParser( description="Calculate optimal translation and boost from Horizons.h5", epilog=( "This simply attempts to minimize the squared distance of the CoM from the origin by applying some " "constant offset and linear-in-time translation. Because it uses optimization, it may be sensitive " "to the initial guess and may not converge." ), ) parser.add_argument( "path_to_horizons_h5", default="Horizons.h5", nargs="?", action="store", help="path to the Horizons.h5 file" ) parser.add_argument( "--skip_beginning_fraction", default=0.01, nargs=1, type=float, action="store", help="optional initial portion of the data to ignore", ) parser.add_argument( "--skip_ending_fraction", default=0.10, nargs=1, type=float, action="store", help="optional final portion of the data to ignore", ) parser.add_argument( "--plot", action="store_true", help="make plots as `CoM_before_and_after_transformation.pdf` in same directory as Horizons.h5", ) args = parser.parse_args() estimate_avg_com_motion( path_to_horizons_h5=args.path_to_horizons_h5, skip_beginning_fraction=args.skip_beginning_fraction, skip_ending_fraction=args.skip_ending_fraction, plot=args.plot, )