# Source code for scri.waveform_base
# Copyright (c) 2015, Michael Boyle
# See LICENSE file for details: <https://github.com/moble/scri/blob/master/LICENSE>
import os
import inspect
import functools
import warnings
import socket
import datetime
import pprint
import copy
import numpy as np
import quaternion
import scipy.constants as spc
from scipy.interpolate import CubicSpline
from . import *
[docs]
@jit("void(c16[:,:], f8[:])")
def complex_array_norm(c, s):
    """Write the squared L2 norm of each row of `c` into `s`.

    Numba-compiled kernel: `c` is a complex 2-d array, and `s` is a
    preallocated real output of length `c.shape[0]`.  Each entry of `s`
    receives the sum of |c[i, j]|**2 over the row -- note: *no* square
    root is taken here (compare `complex_array_abs`).  Nothing is returned;
    the result is written in place.
    """
    for i in range(len(s)):
        s[i] = 0.0
        for j in range(c.shape[1]):
            s[i] += c[i, j].real ** 2 + c[i, j].imag ** 2
    return
[docs]
@jit("void(c16[:,:], f8[:])")
def complex_array_abs(c, s):
    """Write the L2 norm (with square root) of each row of `c` into `s`.

    Numba-compiled kernel: identical to `complex_array_norm`, except that
    the square root of the accumulated sum of squared moduli is stored,
    giving the true Euclidean norm of each row.  `s` must be a preallocated
    real array of length `c.shape[0]`.  Nothing is returned; the result is
    written in place.
    """
    for i in range(len(s)):
        s[i] = 0.0
        for j in range(c.shape[1]):
            s[i] += c[i, j].real ** 2 + c[i, j].imag ** 2
        s[i] = np.sqrt(s[i])
    return
[docs]
def waveform_alterations(func):
    """Decorator that brackets a call with history-depth bookkeeping.

    The wrapper saves ``self.__history_depth__``, increments it by one for
    the duration of the wrapped call, restores the saved value afterwards,
    and returns whatever the wrapped function returned.  If the depth was
    zero on entry, a blank line is first appended to the history.

    Apply this to any member function that may alter the waveform it is
    called on, or that returns an altered copy.  Inside such a function you
    will typically decrement the depth manually just before appending to
    the history (usually at the end); there is no need to undo that, since
    this wrapper restores the original depth on exit.
    """

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.__history_depth__ == 0:
            self._append_history("")
        saved_depth = self.__history_depth__
        self.__history_depth__ = saved_depth + 1
        outcome = func(self, *args, **kwargs)
        self.__history_depth__ = saved_depth
        return outcome

    return wrapper
[docs]
def test_without_assertions(errs, val, msg=""):
"""Replacement for np.testing.assert_
This function should be able to replace `assert_`, but rather than raising an exception, this just adds a
description of the problem to the `errors` variable.
"""
if not val:
try:
smsg = msg()
except TypeError:
smsg = msg
errs += [smsg]
[docs]
def test_with_assertions(errs, val, msg=""):
np.testing.assert_(val, "Failed assertion:\n\t" + msg)
class _object:
    """Useless class to allow multiple inheritance

    Terminates the cooperative `super().__init__` chain: it accepts and
    discards any positional/keyword arguments, so classes like WaveformBase
    can forward *args/**kwargs up the MRO safely.
    """

    def __init__(self, *args, **kwargs):
        # Swallow all arguments and stop the chain here.
        super().__init__()
[docs]
class WaveformBase(_object):
"""Object containing time, frame, and data, along with related information
This object is just the base object from which these other classes are derived:
* WaveformModes
* WaveformGrid
* WaveformInDetector
* WaveformInDetectorFT
For more specific information, see the documentation of those classes.
Attributes
----------
t : float array
Time steps corresponding to other data
frame : quaternion array
Rotors taking static basis onto decomposition basis
data : 2-d array of complex or real numbers
The nature of this data depends on the derived type. First index is time, second index depends on type.
history : list of strings
As far as possible, all functions applied to the object are recorded in the `history` variable. In fact,
the object should almost be able to be recreated using the commands in the history list. Commands taking
large arrays, however, are shortened -- so the data will not be entirely reconstructable.
version_hist : list of pairs of strings
Records the git hash and description for any change in the way SpEC outputs waveform data.
frameType : int
Index corresponding to `scri.FrameType` appropriate for `data`.
dataType : int
Index corresponding to `scri.DataType` appropriate for `data`.
r_is_scaled_out : bool
True if the `data` have been multiplied by the appropriate power of radius so that the asymptotic value can
be finite and nonzero.
m_is_scaled_out : bool
True if the `data` have been scaled by the appropriate value of the total mass so that they are dimensionless.
num : int (read only)
Automatically assigned number of this object. The constructor of this type keeps count of the number of
objects it has created, to assign each object a more-or-less unique ID for use in the history strings. This
counter is reset at the beginning of each python session. Subclasses should automatically have a different
counter.
Indexing
--------
WaveformBase objects can be indexed much like a numpy array, where the first dimension gives the time indices,
and the second gives the data-set indices. This will return another WaveformBase object containing slices of the
original data.
It is important to note, however, that as with numpy array slices, slicing a WaveformBase will not typically copy
the original data; the result will simply be a view into the data. This means that changing the data in the
slice can change the data in the original. If you want to make a copy, you should probably use the copy
constructor: `W2 = WaveformBase(W1)`. It is also possible to use the standard copy.deepcopy method.
Also note that the first slice dimension corresponds to the indices of the time, but the second dimension may NOT
correspond to indices for derived types. In particular, for `WaveformModes`, the second index corresponds to
modes, because this type enforces completeness of each ell mode. For the `WaveformBase` type, however,
the second index does correspond to the second dimension of the data.
For example,
>>> W = WaveformBase()
>>> W[10:-20]
will give all columns in the data, but only at times starting with the
10th time step, and ending one before the -20th time step. Meanwhile,
>>> W[10:-20,2]
will give the same range of times, but only the second column (unless the subclass overrides this behavior,
as in `WaveformModes`). Similarly,
>>> W[10:-20,2:5]
will return the same range of times, along with the 2,3,4 columns. Note the lack of 5 column, for consistency
with python's usual slice syntax.
>>> W[:,:0]
will return all time steps, along with all `frame` data, but `data` will be empty (because the `:0` term selects
everything before the 0th element). Similarly,
>>> W[:0,:0]
is empty of all numerical data.
"""
__num = 0 # Used to count number of Waveforms created
def __init__(self, *args, **kwargs):
    """Initializer for WaveformBase object

    WaveformBase objects may be created in two ways.  First, by copying an
    existing WaveformBase object -- in which case the only parameter should be
    that object.  Second, by passing any of the (writable) attributes as
    keywords.

    In both cases, the last step in initialization is to check the validity of
    the result.  By default, this will raise an exception if the result is not
    valid.  An additional keyword parameter `override_exception_from_invalidity`
    may be set if this is not desired.  This may be necessary if only some of
    the data can be passed in to the initializer, for example.

    Keyword parameters
    ------------------
    t : float array, empty default
    frame : quaternion array, empty default
    data : 2-d complex array, empty default
    history : list of strings, empty default
        This is the list of strings prepended to the history; an additional
        line is appended, showing the call to this initializer.
    version_hist : list of pairs of strings, empty default
        Remains empty if waveform data is on version 0.
    frameType : int, defaults to 0 (UnknownFrameType)
        See scri.FrameNames for possible values
    dataType : int, defaults to 0 (UnknownDataType)
        See scri.DataNames for possible values
    r_is_scaled_out : bool, defaults to False
        Set to True if the data represented could approach a nonzero value at Scri
    m_is_scaled_out : bool, defaults to False
        Set to True if the data represented are dimensionless and in units
        where the total mass is 1
    override_exception_from_invalidity : bool, defaults to False
        If True, report any errors, but do not raise them.
    constructor_statement : str, optional
        If this is present, it will replace the default constructor statement
        added to the history.  It is prepended with a string of the form
        `'{0} = '.format(self)`, which prints the ID of the resulting object
        (unique to this session only).
    """
    # Snapshot the kwargs before anything is popped, so the history record
    # below shows the call exactly as the user made it.
    original_kwargs = kwargs.copy()
    super().__init__(*args, **kwargs)  # to ensure proper calling in multiple inheritance
    override_exception_from_invalidity = kwargs.pop("override_exception_from_invalidity", False)
    # Assign this instance a session-unique ID from the per-class counter.
    self.__num = type(self).__num
    self.__history_depth__ = 0
    type(self).__num += 1  # Increment class's instance tracker
    if len(args) == 0:
        # Keyword construction: each attribute falls back to an empty/False default.
        self.t = kwargs.pop("t", np.empty((0,), dtype=float))
        self.frame = kwargs.pop("frame", np.empty((0,), dtype=np.quaternion))
        self.data = kwargs.pop("data", np.empty((0, 0), dtype=complex))
        # Information about this object
        self.history = kwargs.pop("history", [])
        self.version_hist = kwargs.pop("version_hist", [])
        self.frameType = kwargs.pop("frameType", UnknownFrameType)
        self.dataType = kwargs.pop("dataType", UnknownDataType)
        self.r_is_scaled_out = kwargs.pop("r_is_scaled_out", False)
        self.m_is_scaled_out = kwargs.pop("m_is_scaled_out", False)
        if "constructor_statement" in kwargs:
            self._append_history("{} = {}".format(self, kwargs.pop("constructor_statement")))
        else:
            # Temporarily shorten numpy's array printing so large data
            # arrays don't bloat the history record.
            opts = np.get_printoptions()
            np.set_printoptions(threshold=6)
            self._append_history(
                "{} = {}(**{})".format(self, type(self).__name__, pprint.pformat(original_kwargs, indent=4))
            )
            np.set_printoptions(**opts)
    elif len(args) == 1 and isinstance(args[0], type(self)):
        # Copy construction: deep-copy the numerical arrays, duplicate the metadata.
        other = args[0]
        self.t = np.copy(other.t)
        self.frame = np.copy(other.frame)
        self.data = np.copy(other.data)
        # Information about this object
        self.history = other.history[:]
        self.version_hist = other.version_hist[:]
        self.frameType = other.frameType
        self.dataType = other.dataType
        self.r_is_scaled_out = other.r_is_scaled_out
        self.m_is_scaled_out = other.m_is_scaled_out
        self._append_history(["", "{} = {}({})".format(self, type(self).__name__, other)])
    else:
        raise ValueError(
            "Did not understand input arguments to `{}` constructor.\n".format(type(self).__name__)
            + "Note that explicit data values must be passed as keywords,\n"
            + "whereas objects to be copied must be passed as the sole argument."
        )
    # Record provenance (host, directory, timestamp, versions) as comment
    # lines in the history, and validate the constructed object.
    hostname = socket.gethostname()
    cwd = os.getcwd()
    time = datetime.datetime.now().isoformat()
    self.__history_depth__ = 1
    self.ensure_validity(alter=True, assertions=(not override_exception_from_invalidity))
    self.__history_depth__ = 0
    self._append_history([f"hostname = {hostname}", f"cwd = {cwd}", f"datetime = {time}", version_info()], 1)
    if kwargs:
        # Anything still left in kwargs was not recognized; warn rather than fail.
        warning = "\nIn `{}` initializer, unused keyword arguments:\n".format(type(self).__name__)
        warning += pprint.pformat(kwargs, indent=4)
        warnings.warn(warning)
[docs]
@waveform_alterations
def ensure_validity(self, alter=True, assertions=False):
    """Try to ensure that the `WaveformBase` object is valid

    This tests various qualities of the WaveformBase's members that are
    frequently assumed throughout the code.  If the optional argument `alter`
    is `True` (which is the default), this function tries to alter the
    WaveformBase in place to ensure validity.  Note that this is not always
    possible.  If that is the case, an exception may be raised.  For example,
    if the `t` member is not a one-dimensional array of floats, it is not
    clear what that data should be.  Similarly, if the `t` and `data` members
    have mismatched dimensions, there is no way to resolve that automatically.

    Also note that this is almost certainly not an exhaustive test of all
    assumptions made in the code.

    If the optional `assertions` argument is `True` (default is `False`), the
    first test that fails will raise an assertion error.

    Returns True if all checks pass (possibly after in-place alterations);
    otherwise issues a warning listing the failures and returns False.
    """
    import numbers

    errors = []  # descriptions of failed checks (when not asserting)
    alterations = []  # history-style records of any in-place fixes
    # Select failure handling: raise immediately, or just accumulate.
    if assertions:
        test = test_with_assertions
    else:
        test = test_without_assertions
    # Ensure that the various data are correct and compatible
    test(
        errors,
        isinstance(self.t, np.ndarray),
        "isinstance(self.t, np.ndarray) # type(self.t)={}".format(type(self.t)),
    )
    test(
        errors,
        self.t.dtype == np.dtype(float),
        f"self.t.dtype == np.dtype(float) # self.t.dtype={self.t.dtype}",
    )
    # Collapse an (n, 1) column vector of times to a plain 1-d array.
    if alter and self.t.ndim == 2 and self.t.shape[1] == 1:
        self.t = self.t[:, 0]
        alterations += ["{0}.t = {0}.t[:,0]".format(self)]
    test(
        errors,
        not self.t.size or self.t.ndim == 1,
        f"not self.t.size or self.t.ndim==1 # self.t.size={self.t.size}; self.t.ndim={self.t.ndim}",
    )
    # Time steps must be strictly increasing.
    test(
        errors,
        self.t.size <= 1 or np.all(np.diff(self.t) > 0.0),
        "self.t.size<=1 or np.all(np.diff(self.t)>0.0) "
        "# self.t.size={}; max(np.diff(self.t))={}".format(
            self.t.size, (max(np.diff(self.t)) if self.t.size > 1 else np.nan)
        ),
    )
    test(errors, np.all(np.isfinite(self.t)), "np.all(np.isfinite(self.t))")
    # A missing frame is replaced by an empty quaternion array.
    if alter and self.frame is None:
        self.frame = np.empty((0,), dtype=np.quaternion)
        alterations += [f"{self}.frame = np.empty((0,), dtype=np.quaternion)"]
    test(
        errors,
        isinstance(self.frame, np.ndarray),
        "isinstance(self.frame, np.ndarray) # type(self.frame)={}".format(type(self.frame)),
    )
    # A float frame array may just be an un-viewed (n, 4) quaternion array.
    if alter and self.frame.dtype == np.dtype(float):
        try:  # Might fail because of shape
            self.frame = quaternion.as_quat_array(self.frame)
            alterations += ["{0}.frame = quaternion.as_quat_array({0}.frame)".format(self)]
        except (AssertionError, ValueError):
            pass
    test(
        errors,
        self.frame.dtype == np.dtype(np.quaternion),
        f"self.frame.dtype == np.dtype(np.quaternion) # self.frame.dtype={self.frame.dtype}",
    )
    # The frame is either empty, a single constant rotor, or one rotor per time step.
    test(
        errors,
        self.frame.size <= 1 or self.frame.size == self.t.size,
        "self.frame.size<=1 or self.frame.size==self.t.size "
        "# self.frame.size={}; self.t.size={}".format(self.frame.size, self.t.size),
    )
    test(errors, np.all(np.isfinite(self.frame)), "np.all(np.isfinite(self.frame))")
    test(
        errors,
        isinstance(self.data, np.ndarray),
        "isinstance(self.data, np.ndarray) # type(self.data)={}".format(type(self.data)),
    )
    test(errors, self.data.ndim >= 1, f"self.data.ndim >= 1 # self.data.ndim={self.data.ndim}")
    # First axis of `data` must match the time axis.
    test(
        errors,
        self.data.shape[0] == self.t.shape[0],
        "self.data.shape[0]==self.t.shape[0] "
        "# self.data.shape[0]={}; self.t.shape[0]={}".format(self.data.shape[0], self.t.shape[0]),
    )
    test(errors, np.all(np.isfinite(self.data)), "np.all(np.isfinite(self.data))")
    # Information about this object
    if alter and not self.history:
        self.history = [""]
        alterations += [f"{self}.history = ['']"]
    if alter and isinstance(self.history, str):
        self.history = self.history.split("\n")
        alterations += ["{0}.history = {0}.history.split('\n')".format(self)]
    test(
        errors,
        isinstance(self.history, list),
        "isinstance(self.history, list) # type(self.history)={}".format(type(self.history)),
    )
    # NOTE(review): if `history` is an empty list and `alter` is False, the
    # next check raises IndexError rather than recording a failure — confirm
    # whether that is intended.
    test(
        errors,
        isinstance(self.history[0], str),
        "isinstance(self.history[0], str) # type(self.history[0])={}".format(type(self.history[0])),
    )
    test(
        errors,
        isinstance(self.frameType, numbers.Integral),
        "isinstance(self.frameType, numbers.Integral) # type(self.frameType)={}".format(type(self.frameType)),
    )
    test(errors, self.frameType in FrameType, f"self.frameType in FrameType # self.frameType={self.frameType}")
    test(
        errors,
        isinstance(self.dataType, numbers.Integral),
        "isinstance(self.dataType, numbers.Integral) # type(self.dataType)={}".format(type(self.dataType)),
    )
    test(errors, self.dataType in DataType, f"self.dataType in DataType # self.dataType={self.dataType}")
    test(
        errors,
        isinstance(self.r_is_scaled_out, bool),
        "isinstance(self.r_is_scaled_out, bool) # type(self.r_is_scaled_out)={}".format(type(self.r_is_scaled_out)),
    )
    test(
        errors,
        isinstance(self.m_is_scaled_out, bool),
        "isinstance(self.m_is_scaled_out, bool) # type(self.m_is_scaled_out)={}".format(type(self.m_is_scaled_out)),
    )
    test(
        errors,
        isinstance(self.num, numbers.Integral),
        "isinstance(self.num, numbers.Integral) # type(self.num)={}".format(type(self.num)),
    )
    # Record any in-place fixes in the history, and warn about them.
    if alterations:
        self._append_history(alterations)
        warnings.warn("The following alterations were made:\n\t" + "\n\t".join(alterations))
    if errors:
        warnings.warn("The following conditions were found to be incorrectly False:\n\t" + "\n\t".join(errors))
        return False
    # Success: record this call itself in the history (the decorator
    # restores the depth we decrement here).
    self.__history_depth__ -= 1
    self._append_history("WaveformBase.ensure_validity" + f"({self}, alter={alter}, assertions={assertions})")
    return True
@property
def is_valid(self):
    # Non-mutating validity check: never alters the waveform and never
    # raises assertions; returns True/False (warnings describe failures).
    return self.ensure_validity(alter=False, assertions=False)
# Data sizes
@property
def n_data_sets(self):
    # Total number of data series: product of all non-time dimensions,
    # so this works for `data` arrays of any rank >= 1.
    return int(np.prod(self.data.shape[1:]))

@property
def n_times(self):
    # Number of time steps (length of the first axis of `t`).
    return self.t.shape[0]
# Calculate weights
@property
def spin_weight(self):
    # Spin weight of this data type, from the module-level lookup table.
    return SpinWeights[self.dataType]

@property
def conformal_weight(self):
    # Conformal weight of the data; if the radius factor has been scaled
    # out, its r-scaling no longer contributes, hence the subtraction.
    return ConformalWeights[self.dataType] + (-RScaling[self.dataType] if self.r_is_scaled_out else 0)

@property
def gamma_weight(self):
    """Non-conformal effect of a boost.

    This factor allows for mass-scaling, for example.  If the waveform
    describes `r*h/M`, for example, then `r` and `h` vary by the conformal
    weight, which depends on the direction; whereas `M` is a monopole, and
    thus cannot depend on the direction.  Instead, `M` simply obeys the
    standard formula, scaling with gamma.
    """
    return (MScaling[self.dataType] if self.m_is_scaled_out else 0) + (
        -RScaling[self.dataType] if (self.r_is_scaled_out and self.m_is_scaled_out) else 0
    )

@property
def r_scaling(self):
    # Power of radius scaled out of this data type.
    return RScaling[self.dataType]

@property
def m_scaling(self):
    # Power of total mass scaled out of this data type.
    return MScaling[self.dataType]
# Text descriptions
@property
def num(self):
    # Read-only session-unique ID assigned by the constructor.
    return self.__num

@property
def frame_type_string(self):
    # Human-readable name of the frame type, e.g. for display.
    return FrameNames[self.frameType]

@property
def data_type_string(self):
    # Human-readable name of the data type.
    return DataNames[self.dataType]

@property
def data_type_latex(self):
    # LaTeX form of the data-type name, for plot labels.
    return DataNamesLaTeX[self.dataType]
@property
def descriptor_string(self):
    """Create a simple string describing the content of the waveform

    This string will be suitable for file names.  For example, 'rMpsi4' or
    'rhOverM'.  It uses the waveform's knowledge of itself, so if this is
    incorrect, the result will be incorrect.
    """
    if self.dataType == UnknownDataType:
        return self.data_type_string
    # Radius prefix: 'r' (with an exponent when greater than one) if the
    # radius factor has been scaled out.
    prefix = ""
    if self.r_is_scaled_out:
        r_exponent = RScaling[self.dataType]
        if r_exponent == 1:
            prefix = "r"
        elif r_exponent > 1:
            prefix = "r" + str(r_exponent)
    if not self.m_is_scaled_out:
        return prefix + self.data_type_string
    # Net mass exponent, after accounting for any radius scaling.
    m_exponent = MScaling[self.dataType] - (RScaling[self.dataType] if self.r_is_scaled_out else 0)
    if m_exponent < -1:
        return prefix + self.data_type_string + "OverM" + str(-m_exponent)
    if m_exponent == -1:
        return prefix + self.data_type_string + "OverM"
    if m_exponent == 0:
        return prefix + self.data_type_string
    if m_exponent == 1:
        return prefix + "M" + self.data_type_string
    return prefix + "M" + str(m_exponent) + self.data_type_string
# Data simplifications
@property
def data_2d(self):
    # `data` flattened to shape (n_times, n_data_sets); a reshape, so it
    # shares memory with `data` when the layout allows.
    return self.data.reshape((self.n_times, self.n_data_sets))

@property
def abs(self):
    # Pointwise modulus of the complex data.
    return np.abs(self.data)

@property
def arg(self):
    # Pointwise complex argument (phase) of the data.
    return np.angle(self.data)

@property
def arg_unwrapped(self):
    # Phase unwrapped along the time axis, removing 2*pi discontinuities.
    return np.unwrap(np.angle(self.data), axis=0)
[docs]
def norm(self, take_sqrt=False, indices=slice(None, None, None)):
    """L2 norm of the waveform

    The optional arguments say whether to take the square-root of the norm
    at each time, and allow restriction to a slice of the data,
    respectively.  Note that with the default `take_sqrt=False`, the
    returned values are the *squared* norms at each time (no square root
    is applied by `complex_array_norm`).
    """
    # Preallocate the output, sized to the (possibly sliced) time axis.
    if indices == slice(None, None, None):
        n = np.empty((self.n_times,), dtype=float)
    else:
        n = np.empty((self.t[indices].shape[0],), dtype=float)
    # Delegate the row-wise reduction to the numba-compiled kernels.
    if take_sqrt:
        complex_array_abs(self.data_2d[indices], n)
    else:
        complex_array_norm(self.data_2d[indices], n)
    return n
[docs]
def max_norm_index(self, skip_fraction_of_data=4):
    """Index of time step with largest norm

    The optional argument skips a fraction of the data.  The default is 4,
    which means that it only searches the last three-fourths of the data
    for the max.  If 0 or 1 is input, this is ignored, and all the data is
    searched.
    """
    if skip_fraction_of_data in (0, 1):
        # Search the entire time range.
        return np.argmax(self.norm(indices=slice(None, None, None)))
    # Skip the first 1/skip_fraction_of_data of the time steps, then shift
    # the argmax back into the full array's index space.
    offset = self.n_times // skip_fraction_of_data
    return np.argmax(self.norm(indices=slice(offset, None, None))) + offset
[docs]
def max_norm_time(self, skip_fraction_of_data=4):
    """Return time at which largest norm occurs in data

    See `help(max_norm_index)` for explanation of the optional argument.
    """
    peak_index = self.max_norm_index(skip_fraction_of_data=skip_fraction_of_data)
    return self.t[peak_index]
[docs]
def compare(self, w_a, min_time_step=0.005, min_time=-3.0e300):
    """Return a waveform with differences between the two inputs

    This function simply subtracts the data in this waveform from the data
    in Waveform A, and finds the rotation needed to take this frame into
    frame A.  Note that the waveform data are stored as complex numbers,
    rather than as modulus and phase.

    The `min_time_step` and `min_time` parameters are currently unused;
    they are retained for backward compatibility of the signature.

    Fixes relative to the previous version:
    * `np.quaternion.squad` / `np.quaternions.inverse` do not exist; the
      interpolant is the module-level `quaternion.squad`, and inversion is
      the rotor's own `.inverse()` method.
    * The branch for (A has one frame rotor, B has none) repeated the
      (one, one) condition and was unreachable; its condition is corrected.
    """
    from quaternion.means import mean_rotor_in_chordal_metric
    from scri.extrapolation import intersection
    import scri.waveform_modes

    if self.frameType != w_a.frameType:
        warning = (
            "\nWarning:"
            + "\n This Waveform is in the "
            + self.frame_type_string
            + " frame,"
            + "\n The Waveform in the argument is in the "
            + w_a.frame_type_string
            + " frame."
            + "\n Comparing them probably does not make sense.\n"
        )
        warnings.warn(warning)
    if self.n_modes != w_a.n_modes:
        raise Exception(
            "Trying to compare waveforms with mismatched LM data."
            + "\nA.n_modes="
            + str(w_a.n_modes)
            + "\tB.n_modes()="
            + str(self.n_modes)
        )
    # Build the difference waveform on the common (intersected) time grid.
    new_times = intersection(self.t, w_a.t)
    w_c = scri.waveform_modes.WaveformModes(
        t=new_times,
        data=np.zeros((new_times.shape[0], self.n_modes), dtype=self.data.dtype),
        history=[],
        version_hist=self.version_hist,
        frameType=self.frameType,
        dataType=self.dataType,
        r_is_scaled_out=self.r_is_scaled_out,
        m_is_scaled_out=self.m_is_scaled_out,
        ell_min=self.ell_min,
        ell_max=self.ell_max,
    )
    w_c.history += ["B.compare(A)\n"]
    w_c.history += ["### A.history.str():\n" + "".join(w_a.history)]
    w_c.history += ["### B.history.str():\n" + "".join(self.history)]
    w_c.history += ["### End of old histories from `compare`"]
    # Process the frame, depending on the sizes of the input frames.  The
    # result frame is (A frame) * (B frame)^{-1} wherever both exist.
    if w_a.frame.shape[0] > 1 and self.frame.shape[0] > 1:
        # Interpolate both frames onto the common times, then compose
        Aframe = quaternion.squad(w_a.frame, w_a.t, w_c.t)
        Bframe = quaternion.squad(self.frame, self.t, w_c.t)
        w_c.frame = Aframe * np.array([v.inverse() for v in Bframe])
    elif w_a.frame.shape[0] == 1 and self.frame.shape[0] > 1:
        # A's frame is constant; only B's needs interpolation
        Bframe = quaternion.squad(self.frame, self.t, w_c.t)
        w_c.frame = w_a.frame[0] * np.array([v.inverse() for v in Bframe])
    elif w_a.frame.shape[0] > 1 and self.frame.shape[0] == 1:
        # B's frame is constant; only A's needs interpolation
        Aframe = quaternion.squad(w_a.frame, w_a.t, w_c.t)
        w_c.frame = Aframe * self.frame[0].inverse()
    elif w_a.frame.shape[0] == 1 and self.frame.shape[0] == 1:
        # Both frames are single constant rotors
        w_c.frame = np.array(w_a.frame[0] * self.frame[0].inverse())
    elif w_a.frame.shape[0] == 0 and self.frame.shape[0] == 1:
        # Only B has a (constant) frame
        w_c.frame = np.array(self.frame[0].inverse())
    elif w_a.frame.shape[0] == 1 and self.frame.shape[0] == 0:
        # Only A has a (constant) frame
        w_c.frame = np.array(w_a.frame[0])
    # else, leave the frame data empty
    # If the average frame rotor is closer to -1 than to 1, flip the sign
    if w_c.frame.shape[0] == w_c.n_times:
        R_m = mean_rotor_in_chordal_metric(w_c.frame, w_c.t)
        if quaternion.rotor_chordal_distance(R_m, -quaternion.one) < quaternion.rotor_chordal_distance(
            R_m, quaternion.one
        ):
            w_c.frame = -w_c.frame
    elif w_c.frame.shape[0] == 1:
        if quaternion.rotor_chordal_distance(w_c.frame[0], -quaternion.one) < quaternion.rotor_chordal_distance(
            w_c.frame[0], quaternion.one
        ):
            w_c.frame[0] = -w_c.frame[0]
    # Now loop over each mode filling in the waveform data
    for AMode in range(w_a.n_modes):
        # Assume that all the ell,m data are the same, but not necessarily in the same order
        BMode = self.index(w_a.LM[AMode][0], w_a.LM[AMode][1])
        # Initialize the interpolators for this data set
        # (Can't just re-view here because data are not contiguous)
        splineReA = CubicSpline(w_a.t, w_a.data[:, AMode].real)
        splineImA = CubicSpline(w_a.t, w_a.data[:, AMode].imag)
        splineReB = CubicSpline(self.t, self.data[:, BMode].real)
        splineImB = CubicSpline(self.t, self.data[:, BMode].imag)
        # Assign the data from the transition
        w_c.data[:, AMode] = (splineReA(w_c.t) - splineReB(w_c.t)) + 1j * (splineImA(w_c.t) - splineImB(w_c.t))
    return w_c
@property
def data_dot(self):
    # First time-derivative of the data, via cubic-spline differentiation.
    return CubicSpline(self.t, self.data).derivative()(self.t)

@property
def data_ddot(self):
    # Second time-derivative of the data, via cubic-spline differentiation.
    return CubicSpline(self.t, self.data).derivative(2)(self.t)

@property
def data_int(self):
    # First antiderivative (indefinite integral) of the data in time.
    return CubicSpline(self.t, self.data).antiderivative()(self.t)

@property
def data_iint(self):
    # Second antiderivative of the data in time.
    return CubicSpline(self.t, self.data).antiderivative(2)(self.t)
# Data representations
def _append_history(self, hist, additional_depth=0):
    """Add to the object's history log

    Input may be a single string or a list of strings.  Each string is
    split on newlines, and every resulting line is prepended with a run of
    `# ` markers indicating that it was recorded from within a member
    function (or is purely informational) -- the idea being that the
    history should read as nearly as possible like a script that could
    reproduce the waveform, with `#`-prefixed lines excluded from running.

    The number of markers is the sum of the object's `__history_depth__`
    and the optional `additional_depth`.  Member functions increment
    `__history_depth__` before calling other member functions and decrement
    it before recording themselves; purely informational lines (hostname,
    cwd, date, versions in `__init__`) pass `1` as the optional argument.
    """
    entries = hist if isinstance(hist, list) else [hist]
    marker = "# " * (self.__history_depth__ + additional_depth)
    recorded = []
    for entry in entries:
        for line in entry.split("\n"):
            recorded.append(marker + line)
    self.history += recorded
def __str__(self):
    # Readable session-unique label, e.g. "WaveformModes_3".
    # ("The goal of __str__ is to be readable; the goal of __repr__ is to
    # be unambiguous." --- stackoverflow)
    return f"{type(self).__name__}_{self.num}"
def __repr__(self):
    # "The goal of __str__ is to be readable; the goal of __repr__ is to be unambiguous." --- stackoverflow
    from textwrap import dedent

    # Temporarily abbreviate numpy's array printing so huge arrays don't
    # dominate the representation; restored below.
    opts = np.get_printoptions()
    np.set_printoptions(threshold=6, linewidth=150, precision=6)
    # NOTE(review): positional fields {3} (history) and {4} (version_hist)
    # are supplied to `format` below but intentionally absent from this
    # template, keeping the repr short.
    rep = """
{0}(
t={1},
frame={2},
data={5},
frameType={6}, dataType={7},
r_is_scaled_out={8}, m_is_scaled_out={9}) # num = {10}"""
    rep = rep.format(
        type(self).__name__,
        str(self.t).replace("\n", "\n" + " " * 15),
        str(self.frame).replace("\n", "\n" + " " * 19),
        self.history,
        self.version_hist,
        str(self.data).replace("\n", "\n" + " " * 18),
        self.frameType,
        self.dataType,
        self.r_is_scaled_out,
        self.m_is_scaled_out,
        self.num,
    )
    np.set_printoptions(**opts)
    return dedent(rep)
def __getstate__(self):
    """Get state of object for copying and pickling

    The only nontrivial operation is with quaternions, since they can't
    currently be pickled automatically.  We just view the frame array as
    a float array, and pickle as usual.

    Note that the `num` value is left in the state here; `__setstate__`
    discards it (keeping only for a history note) in favor of the new
    object's own value.
    """
    state = copy.deepcopy(self.__dict__)
    # Store the frame as an (n, 4) float array so it pickles cleanly.
    state["frame"] = quaternion.as_float_array(self.frame)
    return state
def __setstate__(self, state):
    """Set state of object for copying and pickling

    The only nontrivial operation is with quaternions, since they can't
    currently be pickled automatically.  We just view the frame array as
    a float array, and unpickle as usual, then convert the float array
    back to a quaternion array.
    """
    # NOTE(review): on a freshly `__new__`-created instance this attribute
    # lookup falls through to the class-level counter; the counter itself
    # is not incremented here, so successive unpickled objects may share a
    # `num` -- confirm whether that is intended.
    new_num = self.__num
    # The original object's num from the pickled state, used only for the
    # history note below.
    old_num = state.get("_WaveformBase__num")
    self.__dict__.update(state)
    # Make sure to preserve auto-incremented num
    self.__num = new_num
    # Restore the quaternion view of the frame data (see __getstate__).
    self.frame = quaternion.as_quat_array(self.frame)
    self._append_history(f"copied, deepcopied, or unpickled as {self}", 1)
    # Record a line mapping the old object's ID to this one's.
    self._append_history("{} = {}".format(self, f"{self}".replace(str(self.num), str(old_num))))
[docs]
@waveform_alterations
def deepcopy(self):
    """Return a deep copy of the object

    This is just an alias for `copy`, which is deep anyway.
    """
    duplicate = self.copy()
    # Record this call (rather than `copy`) in the new object's history.
    duplicate.__history_depth__ -= 1
    duplicate._append_history(f"{duplicate} = {self}.deepcopy()")
    return duplicate
[docs]
@waveform_alterations
def copy(self):
    """Return a (deep) copy of the object

    All members are copied, including any added by a subclass.  For a
    forgetful WaveformBase object, use the copy constructor instead.
    """
    duplicate = type(self)()
    new_state = copy.deepcopy(self.__dict__)
    # Keep the fresh object's own auto-assigned ID.
    del new_state["_WaveformBase__num"]
    duplicate.__dict__.update(new_state)
    duplicate.__history_depth__ -= 1
    duplicate._append_history(f"{duplicate} = {self}.copy()")
    return duplicate
[docs]
@waveform_alterations
def copy_without_data(self):
    """Return a copy of the object, with empty `t`, `frame`, and `data` fields

    Note that subclasses may override this to set some related data members.
    For example, `WaveformModes.copy_without_data` sets the `ell_min` and
    `ell_max` fields appropriately.  If you wish to only skip `t`, `frame`,
    and `data`, you can simply use `WaveformBase.copy_without_data(W)`.  The
    latter is useful if, for example, you will only be making changes to
    those three fields, and want everything else preserved.

    Also note that some slicing operations can achieve similar -- but
    different -- goals.  For example, `w = w[:, :0]` will simply empty
    `data` and `ells`, without affecting the `time` and `frame`.
    """
    duplicate = type(self)()
    new_state = copy.deepcopy(self.__dict__)
    # Drop the ID (the fresh object keeps its own) and the bulk data fields,
    # leaving the empty defaults that type(self)() just created.
    for dropped in ("_WaveformBase__num", "t", "frame", "data"):
        new_state.pop(dropped)
    duplicate.__dict__.update(new_state)
    duplicate.__history_depth__ -= 1
    duplicate._append_history(f"{duplicate} = {self}.copy_without_data()")
    return duplicate
def _allclose(
    self, other, report_all=True, rtol=1e-10, atol=1e-10, compare_history_beginnings=False, exceptions=()
):
    """Check that member data in two waveforms are the same

    For data sets (time, modes, etc.), the numpy function `np.allclose` is
    used, with the input tolerances.  See that function's documentation for
    more details.  The `*__num` datum is always ignored.  By default, the
    `history` is ignored, though this can be partially overridden -- in
    which case, the shortest subset of the histories is compared for exact
    equality.  This is probably only appropriate for the case where one
    waveform was created from the other.

    Parameters
    ----------
    other : object
        Another object subclassing WaveformBase to compare
    report_all : bool, optional
        Wait until all attributes have been checked (and reported on)
        before returning the verdict
    rtol : float, optional
        Relative tolerance to which to compare arrays (see np.allclose),
        defaults to 1e-10
    atol : float, optional
        Absolute tolerance to which to compare arrays (see np.allclose),
        defaults to 1e-10
    compare_history_beginnings : bool, optional
        Compare the shortest common part of the `history` fields for
        equality, defaults to False
    exceptions : iterable of str, optional
        Don't compare elements in this collection, corresponding to keys in
        the object's `__dict__`; defaults to an empty tuple.  (The default
        is a tuple rather than a list to avoid the shared-mutable-default
        pitfall.)
    """
    equality = True
    # Exact type match required; subclasses are not considered equal.
    if not type(self) == type(other):  # not isinstance(other, self.__class__):
        warnings.warn("\n (type(self)={}) != (type(other)={})".format(type(self), type(other)))
        equality = False
    if not report_all and not equality:
        return False
    for key, val in self.__dict__.items():
        if key.endswith("__num") or key in exceptions:
            continue
        elif key == "history":
            # Histories diverge legitimately; compare only on request, and
            # then only the shared prefix.
            if compare_history_beginnings:
                min_length = min(len(self.history), len(other.history))
                if self.history[:min_length] != other.history[:min_length]:
                    warnings.warn("\n `history` fields differ")
                    equality = False
        elif key == "version_hist":
            if self.version_hist != other.version_hist:
                warnings.warn("\n `version_hist` fields differ")
                equality = False
        elif isinstance(val, np.ndarray):
            if val.dtype == np.quaternion:
                # np.allclose doesn't handle quaternion dtype; compare the
                # underlying (n, 4) float components instead.
                if not np.allclose(
                    quaternion.as_float_array(val), quaternion.as_float_array(other.__dict__[key]), rtol, atol
                ):
                    warnings.warn(f"\n `{key}` fields differ")
                    equality = False
            elif not np.allclose(val, other.__dict__[key], rtol, atol):
                warnings.warn(f"\n `{key}` fields differ")
                equality = False
        else:
            # Everything else (ints, bools, etc.) must compare exactly equal.
            if not val == other.__dict__[key]:
                warnings.warn(
                    "\n (self.{0}={1}) != (other.{0}={2}) fields differ".format(key, val, other.__dict__[key])
                )
                equality = False
        if not report_all and not equality:
            return False
    return equality
# Slicing
@waveform_alterations
def __getitem__(self, key):
    """Efficiently extract a subset of the waveform data.

    See the docstring of the WaveformBase class for examples.
    """
    sliced = WaveformBase.copy_without_data(self)
    # Unwrap a trivial one-element tuple so `W[(k,)]` behaves exactly like `W[k]`
    if isinstance(key, tuple) and len(key) == 1:
        key = key[0]
    # Decide which indices apply to the time axis and which to the full data array
    if isinstance(key, (slice, int)):
        # Complete data from a subset of times (slice), or from a single instant (int)
        time_index, data_index = key, key
    elif isinstance(key, tuple) and 2 <= len(key) <= self.n_data_sets:
        # A subset of the data sets from a subset of times; the first element
        # of the tuple indexes the time axis
        time_index, data_index = key[0], key
    else:
        raise ValueError("Could not understand input `{}` (of type `{}`) ".format(key, type(key)))
    sliced.t = self.t[time_index]
    sliced.frame = self.frame[time_index]
    sliced.data = self.data[data_index]
    # Record this slicing operation in the new waveform's history
    sliced.__history_depth__ -= 1
    sliced._append_history(f"{sliced} = {self}[{key}]")
    return sliced
[docs]
@waveform_alterations
def interpolate(self, tprime):
    """Interpolate the frame and data onto the new set of time steps.

    Note that only `t`, `frame`, and `data` are changed in this function.  If there is a
    corresponding data set in a subclass, for example, the subclass must override this
    function to set that data set -- though this function should probably be called to
    handle the ugly stuff.
    """
    # Copy the information fields, but not the data
    interpolated = WaveformBase.copy_without_data(self)
    interpolated.t = np.copy(tprime)
    # Rotors are interpolated on the new time grid via spherical quadrangle
    # interpolation ("squad") rather than component-wise splines
    interpolated.frame = quaternion.squad(self.frame, self.t, interpolated.t)
    # Allocate the output array, then fill its 2-d view with a cubic-spline
    # interpolant built over the original time grid
    new_shape = (interpolated.n_times,) + self.data.shape[1:]
    interpolated.data = np.empty(new_shape, dtype=self.data.dtype)
    interpolated.data_2d[:] = CubicSpline(self.t, self.data_2d)(interpolated.t)
    # Record this interpolation in the new waveform's history
    interpolated.__history_depth__ -= 1
    interpolated._append_history(f"{interpolated} = {self}.interpolate({tprime})")
    return interpolated
[docs]
@waveform_alterations
def SI_units(self, current_unit_mass_in_solar_masses, distance_from_source_in_megaparsecs=100):
    """Assuming current quantities are in geometric units, convert to SI units.

    This function assumes that the `dataType`, `r_is_scaled_out`, and `m_is_scaled_out`
    attributes are correct, then scales the amplitude and time data appropriately so that
    the data correspond to data that could be observed from a source with the given total
    mass at the given distance.

    Note that the curvature scalars will have units of s^-2, rather than the arguably more
    correct m^-2.  This seems to be more standard in numerical relativity.  The result can
    be divided by `scipy.constants.c**2` to give units of m^-2 if desired.

    Parameters
    ----------
    current_unit_mass_in_solar_masses : float
        Mass of the system in the data converted to solar masses
    distance_from_source_in_megaparsecs : float, optional
        Output will be waveform as observed from this distance, default=100 (Mpc)

    """
    # Warn (but continue) if either scale appears to have been applied already
    if not self.r_is_scaled_out:
        warnings.warn(
            "\nTrying to convert to SI units, the radius is supposedly not scaled out.\n"
            "This seems to suggest that the data may already be in some units..."
        )
    if not self.m_is_scaled_out:
        warnings.warn(
            "\nTrying to convert to SI units, the mass is supposedly not scaled out.\n"
            "This seems to suggest that the data may already be in some units..."
        )

    M_in_meters = current_unit_mass_in_solar_masses * m_sun_in_meters  # m
    M_in_seconds = M_in_meters / speed_of_light  # s
    R_in_meters = distance_from_source_in_megaparsecs * (1e6 * parsec_in_meters)  # m
    R_over_M = R_in_meters / M_in_meters  # [dimensionless]

    # The radius scaling `r_scaling` is the number of factors of the dimensionless quantity
    # `R_over_M` required to keep the waveform asymptotically constant.  So, for example,
    # h and Psi4 both have `r_scaling=1`.  The mass scaling `m_scaling` is the number of
    # factors of `M_in_meters` required to make the waveform dimensionless, and does not
    # account for the factors of mass in the radius scale.  The Newman-Penrose quantities
    # are curvature quantities, so they have dimensions 1/m^2, and thus have `m_scaling=2`.
    # Accumulate each factor only when the corresponding scale was removed; starting from
    # 1.0 is exact in floating point (1.0*x == x), so this matches the original branching.
    amplitude_scaling = 1.0
    if self.r_is_scaled_out:
        amplitude_scaling *= R_over_M**-self.r_scaling
    if self.m_is_scaled_out:
        amplitude_scaling *= M_in_meters**-self.m_scaling

    # Copy the information fields, but not the data
    converted = WaveformBase.copy_without_data(self)
    if self.m_is_scaled_out:
        converted.t = M_in_seconds * self.t  # s
    else:
        converted.t = np.copy(self.t)  # supposedly already in the appropriate units...
    converted.frame = np.copy(self.frame)
    converted.data = amplitude_scaling * self.data
    converted.m_is_scaled_out = False
    converted.r_is_scaled_out = False

    # Record this conversion in the new waveform's history
    converted.__history_depth__ -= 1
    converted._append_history(
        f"{converted} = {self}.SI_units(current_unit_mass_in_solar_masses="
        f"{current_unit_mass_in_solar_masses}, "
        f"distance_from_source_in_megaparsecs={distance_from_source_in_megaparsecs})"
    )
    return converted