# Copyright (c) 2021, Michael Boyle
# See LICENSE file for details: <https://github.com/moble/spherical_functions/blob/master/LICENSE>
import warnings
import os
import re
import ast
import numpy as np
import h5py
import quaternion
import spherical_functions as sf
from ... import (
jit,
WaveformModes,
FrameNames,
DataType,
DataNames,
UnknownDataType,
h,
hdot,
psi4,
psi3,
psi2,
psi1,
psi0,
Inertial,
)
import sxs
from sxs.metadata import Metadata
from . import corotating_paired_xor, rotating_paired_xor_multishuffle_bzip2
from ...asymptotic_bondi_data import AsymptoticBondiData
rpxmb = rotating_paired_xor_multishuffle_bzip2
[docs]
@jit
def index_is_monotonic(y):
length = y.size
monotonic = np.ones_like(y, dtype=np.bool_)
direction = y[-1] - y[0]
if direction > 0.0:
max_value = y[0]
for i in range(1, length):
if y[i] <= max_value:
monotonic[i] = False
else:
max_value = y[i]
else:
min_value = y[0]
for i in range(1, length):
if y[i] >= min_value:
monotonic[i] = False
else:
min_value = y[i]
return monotonic
[docs]
def monotonic_indices(y):
indices = np.arange(y.size)
return indices[index_is_monotonic(y)]
[docs]
def monotonize(y):
return y[monotonic_indices(y)]
[docs]
def read_from_h5(file_name, **kwargs):
"""Read data from an H5 file in SXS format
Note that SXS format is essentially compatible with NRAR format. The existence of this function is not to be
taken as the author's endorsement of either SXS or NRAR format.
Parameters
----------
file_name : str
Path to H5 file containing the data, optionally including the path within the file itself to the directory
containing the data. For example, the standard SXS data with N=2 might be obtained with the file name
`'rhOverM_Asymptotic_GeometricUnits.h5/Extrapolated_N2.dir'`.
Keyword parameters
------------------
frameType : int, optional
dataType : int, optional
r_is_scaled_out : bool, optional
m_is_scaled_out : bool, optional
These four parameters are documented in the docstring of the WaveformBase object. Note that if any of these
is present in the H5 file (which is not common) that value will override this argument. If neither the file
nor these parameters are present, defaults will be applied, assuming that the frame is inertial, R and M are
both scaled out, and the data type (hdot, h, psi4, psi4, psi2, psi1, or psi0) can be gleaned from `file_name`.
See also
--------
scri.rpxmb.save : Save a waveform in RPXMB format
scri.rpxmb.load : Load a file in RPXMB format
"""
import os.path
import re
import h5py
import quaternion
# This unfortunate concoction is needed to determine the (ell,m) values of the various mode data sets
pattern_Ylm = re.compile(r"""Y_l(?P<L>[0-9]+)_m(?P<M>[-+0-9]+)\.dat""")
# Initialize an empty object to be filled with goodies
w = WaveformModes(constructor_statement=f'scri.SpEC.read_from_h5("{file_name}", **{kwargs})')
# Get an h5py handle to the desired part of the h5 file
try:
file_name, root_group = file_name.rsplit(".h5", 1)
file_name += ".h5"
except ValueError:
root_group = "" # FileName is just a file, not a group in a file
try:
f_h5 = h5py.File(file_name, "r")
except OSError:
print(f"\n`read_from_h5` could not open the file '{file_name}'\n\n")
raise
if root_group:
f = f_h5[root_group]
else:
f = f_h5
# If it exists, add the metadata file to `w` as an object. So, for example, the initial spin on
# object 1 can be accessed as `w.metadata.initial_spin1`. See the documentation of
# `sxs.metadata.Metadata` for more details. And in IPython, tab completion works on the
# `w.metadata` object.
try:
w.metadata = Metadata.from_file(
os.path.join(os.path.dirname(file_name), "metadata"), ignore_invalid_lines=True, cache_json=False
)
except:
pass # Probably couldn't find the metadata.json/metadata.txt file
try: # Make sure the h5py.File gets closed, even in the event of an exception
# Add the old history to the new history, if found
try:
try:
old_history = f["History.txt"][()].decode()
except AttributeError:
old_history = f["History.txt"][()]
w._append_history("", 0)
w._append_history("<previous_history>", 2)
w._append_history(old_history, 1)
w._append_history("</previous_history>", 2)
except KeyError:
old_history = False
# Get the frame data, converting to quaternion objects
try:
w.frame = quaternion.as_quat_array(f["Frame"])
except KeyError:
pass # There was no frame data
# Get the descriptive items
try:
w.frameType = int(f.attrs["FrameType"])
except KeyError:
if "frameType" in kwargs:
w.frameType = int(kwargs.pop("frameType"))
else:
warning = (
f"\n`frameType` was not found in '{file_name}' or the keyword arguments.\n"
+ "Using default value `{}`. You may want to set it manually.\n\n".format(FrameNames[1])
)
warnings.warn(warning)
w.frameType = 1
try:
w.dataType = translate_data_types_GWFrames_to_waveforms(int(f.attrs["DataType"]))
except KeyError:
if "dataType" in kwargs:
w.dataType = int(kwargs.pop("dataType"))
else:
found = False
for type_int, type_name in zip(reversed(DataType), reversed(DataNames)):
if type_name.lower() in file_name.lower():
found = True
w.dataType = type_int
warning = (
f"\n`dataType` was not found in '{file_name}' or the keyword arguments.\n"
+ f"Using default value `{type_name}`. You may want to set it manually."
)
warnings.warn(warning)
break
if not found:
warning = (
f"\n`dataType` was not found in '{file_name}' or the keyword arguments.\n"
+ "You may want to set it manually."
)
warnings.warn(warning)
try:
w.r_is_scaled_out = bool(f.attrs["RIsScaledOut"])
except KeyError:
if "r_is_scaled_out" in kwargs:
w.r_is_scaled_out = bool(kwargs.pop("r_is_scaled_out"))
else:
warning = (
f"\n`r_is_scaled_out` was not found in '{file_name}' or the keyword arguments.\n"
+ "Using default value `True`. You may want to set it manually.\n\n"
)
warnings.warn(warning)
w.r_is_scaled_out = True
try:
w.m_is_scaled_out = bool(f.attrs["MIsScaledOut"])
except KeyError:
if "m_is_scaled_out" in kwargs:
w.m_is_scaled_out = bool(kwargs.pop("m_is_scaled_out"))
else:
warning = (
f"\n`m_is_scaled_out` was not found in '{file_name}' or the keyword arguments.\n"
+ "Using default value `True`. You may want to set it manually.\n\n"
)
warnings.warn(warning)
w.m_is_scaled_out = True
# Get the names of all the data sets in the h5 file, and check for matches
YLMdata = [data_set for data_set in list(f) for m in [pattern_Ylm.search(data_set)] if m]
if len(YLMdata) == 0:
raise ValueError(
f"Couldn't understand data set names in '{file_name}'.\n"
+ "Maybe you need to add the directory within the h5 file.\n"
+ f"E.g.: '{file_name}/Extrapolated_N2.dir'."
)
# Sort the data set names by increasing ell, then increasing m
YLMdata = sorted(
YLMdata,
key=lambda data_set: [
int(pattern_Ylm.search(data_set).group("L")),
int(pattern_Ylm.search(data_set).group("M")),
],
)
LM = np.array(
sorted(
[
[int(m.group("L")), int(m.group("M"))]
for data_set in YLMdata
for m in [pattern_Ylm.search(data_set)]
if m
]
)
)
ell_min, ell_max = min(LM[:, 0]), max(LM[:, 0])
if not np.array_equal(LM, sf.LM_range(ell_min, ell_max)):
raise ValueError(f"Input [ell,m] modes are not complete. Found modes:\n{LM}\n")
n_modes = len(LM)
# Get the time data (assuming all are equal)
T = f[YLMdata[0]][:, 0]
monotonic = index_is_monotonic(T)
w.t = T[monotonic]
n_times = len(w.t)
# Loop through, setting data in each mode
w.data = np.empty((n_times, n_modes), dtype=complex)
for m, DataSet in enumerate(YLMdata):
if f[DataSet].shape[0] != n_times:
raise ValueError(
f"The number of time steps in this dataset should be {n_times}; "
+ "it is {} in '{}'.".format(f[DataSet].shape[0], DataSet)
)
w.data[:, m] = f[DataSet][:, 1:3].view(dtype=complex)[monotonic, 0]
# Now that the data is set, we can set these
w.ells = ell_min, ell_max
# If possible, retrieve the CoM-correction parameters
try:
if hasattr(f, "attrs") and "space_translation" in f.attrs:
w.space_translation = np.array(list(f.attrs["space_translation"]))
elif old_history:
pattern = r"'{}': array\((.*?)\)".format("space_translation")
matches = re.search(pattern, old_history)
if matches:
w.space_translation = np.array(ast.literal_eval(matches.group(1)))
except:
pass
try:
if hasattr(f, "attrs") and "boost_velocity" in f.attrs:
w.boost_velocity = np.array(list(f.attrs["boost_velocity"]))
elif old_history:
pattern = r"'{}': array\((.*?)\)".format("boost_velocity")
matches = re.search(pattern, old_history)
if matches:
w.boost_velocity = np.array(ast.literal_eval(matches.group(1)))
except:
pass
# If possible, retrieve the CoM-correction parameters
try:
if "VersionHist.ver" in f_h5:
w.version_hist = [
[git_hash.decode("ascii"), message.decode("ascii")]
for git_hash, message in f_h5["VersionHist.ver"][()].tolist()
]
except:
pass
# Check up on the validity of the waveform
if not w.ensure_validity(alter=True, assertions=False):
raise ValueError("The data resulting from this input is invalid")
except KeyError:
print("\nThis H5 file appears to have not stored all the required information.\n\n")
raise # Re-raise the exception after adding our information
finally: # Use `finally` to make sure this happens:
f_h5.close()
if kwargs:
import pprint
warnings.warn("\nUnused kwargs passed to this function:\n{}".format(pprint.pformat(kwargs, width=1)))
return w
[docs]
def write_to_h5(w, file_name, file_write_mode="w", attributes={}, use_NRAR_format=True):
"""
Output the Waveform to an HDF5 file. Default behavior uses the NRAR format.
Note that the file_name is prepended with some descriptive information involving the data type and the frame type,
such as 'rhOverM_Corotating_' or 'rMpsi4_Aligned_'.
See also
--------
scri.rpxmb.save : Save a waveform in RPXMB format
scri.rpxmb.load : Load a file in RPXMB format
"""
import os.path
import h5py
import warnings
group = None
if ".h5" in file_name and not file_name.endswith(".h5"):
file_name, group = file_name.split(".h5")
file_name += ".h5"
# Add descriptive prefix to file_name
base_name = w.descriptor_string + "_" + os.path.basename(file_name)
if not os.path.dirname(file_name):
file_name = base_name
else:
file_name = os.path.join(os.path.dirname(file_name), base_name)
# Open the file for output
try:
f = h5py.File(file_name, file_write_mode)
except OSError: # If that did not work...
print(f"write_to_h5 was unable to open the file '{file_name}'.\n\n")
raise # re-raise the exception after the informative message above
try:
# If we are writing to a group within the file, create it
if group:
g = f.create_group(group)
else:
g = f
# Now write all the data to various groups in the file
g.attrs["OutputFormatVersion"] = "scri.SpEC"
g.create_dataset("History.txt", data="\n".join(w.history) + f"\n\nwrite_to_h5({w}, {file_name})\n")
g.attrs["FrameType"] = w.frameType
g.attrs["DataType"] = translate_data_types_waveforms_to_GWFrames(w.dataType)
g.attrs["RIsScaledOut"] = int(w.r_is_scaled_out)
g.attrs["MIsScaledOut"] = int(w.m_is_scaled_out)
if len(w.version_hist) > 0:
try:
version_hist = [[e.encode("ascii", "ignore") for e in hm] for hm in w.version_hist]
except AttributeError:
version_hist = w.version_hist
g.create_dataset("VersionHist.ver", data=version_hist)
for attr in attributes:
try:
g.attrs[attr] = attributes[attr]
except:
warning = f"scri.SpEC.write_to_h5 unable to output attribute {attr}={attributes[attr]}"
warnings.warn(warning)
if use_NRAR_format:
for i_m in range(w.n_modes):
ell, m = w.LM[i_m]
Data_m = g.create_dataset(
f"Y_l{ell}_m{m}.dat",
data=[[t, d.real, d.imag] for t, d in zip(w.t, w.data[:, i_m])],
compression="gzip",
shuffle=True,
)
Data_m.attrs["ell"] = ell
Data_m.attrs["m"] = m
else:
g.create_dataset("Time", data=w.t.tolist(), compression="gzip", shuffle=True)
if len(w.frame) > 0:
g.create_dataset("Frame", data=[[r.w, r.x, r.y, r.z] for r in w.frame])
else:
g.create_dataset("Frame", shape=())
Data = g.create_group("Data")
for i_m in range(w.n_modes):
ell, m = w.LM[i_m]
Data_m = Data.create_dataset(
"l{}_m{:+}".format(int(ell), int(m)), data=w.data[:, i_m], compression="gzip", shuffle=True
)
Data_m.attrs["ell"] = ell
Data_m.attrs["m"] = m
finally: # Use `finally` to make sure this happens:
f.close()
[docs]
def create_abd_from_h5(file_format, convention="SpEC", **kwargs):
"""Returns an AsymptoticBondiData object with waveform data loaded from specified H5 files.
The AsymptoticBondiData class internally uses the Moreschi-Boyle conventions, see the following reference:
O. Moreschi, On angular momentum at future null infinity, DOI:10.1088/0264-9381/3/4/006
If necessary, the waveform data will be converted to the Moreschi-Boyle conventions when loaded.
Parameters
----------
file_format : 'SXS', 'CCE', 'SpECTRECCE1', 'RPDMB', or 'RPXMB'
The H5 files may be in the one of the following file formats:
* 'SXS' - Dimensionless extrapolated waveform files found in the SXS Catalog, also known as NRAR format.
* 'CCE' - Asymptotic waveforms output by SpECTRE CCE. These are not dimensionless.
* 'SpECTRECCE1' - Asymptotic waveforms output by SpECTRE CCE's original format.
* SpECTRECCE' - Asymptotic waveforms output by SpECTRE CCE's newest format. (NOTE: this may break compatibility)
* 'RPDMB' - Dimensionless waveforms compressed using the rotating_paired_diff_multishuffle_bzip2 format.
* 'RPXMB' - Dimensionless waveforms compressed using the rotating_paired_xor_multishuffle_bzip2 format.
convention : 'SpEC' or 'Moreschi-Boyle'
The data conventions of the waveform data that will be loaded. This defaults to 'SpEC' since this will be
most often used with 'SpEC' convention waveforms.
Keyword Parameters
------------------
Psi4 : str, optional
Psi3 : str, optional
Psi2 : str, optional
Psi1 : str, optional
Psi0 : str, optional
h : str, optional
Path to H5 file containing the data. At least ONE the above waveform quantities is required.
Returns
-------
AsymptoticBondiData
"""
# Use case insensitive parameters
file_format = file_format.lower()
convention = convention.lower()
# Load waveform data from H5 files into WaveformModes objects
WMs = {}
filenames = {}
if file_format == "spectrecce" or file_format == "spectrecce1":
file_name = kwargs.pop("file_name")
with h5py.File(file_name, "r") as f:
cce = f["Cce"]
time = cce["Strain.dat"][:, 0]
indices = monotonic_indices(time)
time = time[indices]
ell_max = int(np.sqrt((cce["Strain.dat"].shape[1]-1) / 2) - 1)
for data_label in ["Psi4", "Psi3", "Psi2", "Psi1", "Psi0", "Strain"]:
if data_label != "Strain":
dataType = DataNames.index(data_label)
else:
dataType = DataNames.index("h")
WMs[data_label] = WaveformModes(
t=time,
data=cce[f"{data_label}.dat"][indices, 1:].view(np.complex128),
ell_min=0,
ell_max=ell_max,
frameType=Inertial,
dataType=dataType,
constructor_statement=f"create_abd_from_h5({file_format}, {convention=}, {file_name=})",
r_is_scaled_out=True,
m_is_scaled_out=False,
)
else:
for data_label in ["Psi4", "Psi3", "Psi2", "Psi1", "Psi0", "h"]:
if data_label in kwargs:
filenames[data_label] = kwargs.pop(data_label)
if file_format == "sxs":
WMs[data_label] = read_from_h5(filenames[data_label])
elif file_format == "cce":
WMs[data_label] = read_from_h5(
filenames[data_label],
dataType=DataNames.index(data_label),
frameType=Inertial,
r_is_scaled_out=True,
m_is_scaled_out=False,
)
elif file_format == "rpxmb" or file_format == "rpxm":
WMs[data_label] = rotating_paired_xor_multishuffle_bzip2.load(filenames[data_label])[0]
WMs[data_label].to_inertial_frame()
elif file_format == "rpdmb" or file_format == "rpdm":
WMs[data_label] = WaveformModes.from_sxs(
sxs.rpdmb.load(filenames[data_label])
)
else:
raise ValueError(
f"File format '{file_format}' not recognized. "
"Must be 'SXS', 'CCE', 'SpECTRECCE', 'RPDMB', or 'RPXMB'."
)
if kwargs:
import pprint
warnings.warn("\nUnused kwargs passed to this function:\n{}".format(pprint.pformat(kwargs, width=1)))
# Sanity check
if not WMs:
raise ValueError("No filenames have been provided. The data of at least one waveform quantity is required.")
WM_ref = WMs[list(WMs.keys())[0]]
for i in WMs:
if not (WM_ref.t == WMs[i].t).all():
raise ValueError(
f"All waveforms must share the same set of times. The data "
f"for {list(WMs.keys())[i].data_type_string} has a different set of times."
)
# Create an instance of AsymptoticBondiData
abd = AsymptoticBondiData(
time=WM_ref.t,
ell_max=WM_ref.ell_max,
multiplication_truncator=max,
)
# Define factors to convert between input waveform convention and Moreschi-Boyle convention
conversion_factor = {
# "input convention" : [Ψ₀, Ψ₁, Ψ₂, Ψ₃, Ψ₄, h]
"moreschi-boyle": [1, 1, 1, 1, 1, 1],
"spec": [2, -np.sqrt(2), 1, -1 / np.sqrt(2), 0.5, 0.5],
}
# Load the WaveformModes data into the ABD object and convert to the Moreschi-Boyle convention.
if "Psi4" in WMs:
abd.psi4[:, sf.LM_index(WMs["Psi4"].ell_min, -WMs["Psi4"].ell_min, 0) :] = (
conversion_factor[convention][4] * WMs["Psi4"].data
)
if "Psi3" in WMs:
abd.psi3[:, sf.LM_index(WMs["Psi3"].ell_min, -WMs["Psi3"].ell_min, 0) :] = (
conversion_factor[convention][3] * WMs["Psi3"].data
)
if "Psi2" in WMs:
abd.psi2[:, sf.LM_index(WMs["Psi2"].ell_min, -WMs["Psi2"].ell_min, 0) :] = (
conversion_factor[convention][2] * WMs["Psi2"].data
)
if "Psi1" in WMs:
abd.psi1[:, sf.LM_index(WMs["Psi1"].ell_min, -WMs["Psi1"].ell_min, 0) :] = (
conversion_factor[convention][1] * WMs["Psi1"].data
)
if "Psi0" in WMs:
abd.psi0[:, sf.LM_index(WMs["Psi0"].ell_min, -WMs["Psi0"].ell_min, 0) :] = (
conversion_factor[convention][0] * WMs["Psi0"].data
)
# ABD uses the Newman-Penrose scalar sigma instead of the strain h, so we have to take
# the complex conjugate.
if "h" in WMs:
abd.sigma[:, sf.LM_index(WMs["h"].ell_min, -WMs["h"].ell_min, 0) :] = (
conversion_factor[convention][5] * WMs["h"].data
)
abd.sigma = abd.sigma.bar
elif "Strain" in WMs:
abd.sigma[:, sf.LM_index(WMs["Strain"].ell_min, -WMs["Strain"].ell_min, 0) :] = (
conversion_factor[convention][5] * WMs["Strain"].data
)
abd.sigma = abd.sigma.bar
return abd