# Copyright (c) 2019, Michael Boyle
# See LICENSE file for details: <https://github.com/moble/scri/blob/master/LICENSE>
import functools
import numpy as np
from . import jit
# Largest argument for which np.exp will not overflow a double (log of the max float), with a 1% safety margin
maxexp = np.finfo(float).maxexp * np.log(2) * 0.99
@jit
def _transition_function(x, x0, x1, y0, y1):
    """Compiled core of `transition_function`.

    Fills an output array with `y0` where x <= x0, `y1` where x >= x1, and the
    standard smooth-step value `y0 + (y1 - y0) / (1 + exp(1/tau - 1/(1-tau)))`
    in between, where tau is the fractional position inside (x0, x1).  Also
    returns the indices (i0, i1) bounding the transition region, so that
    out[:i0] == y0 and out[i1:] == y1.
    """
    out = np.empty_like(x)
    dy = y1 - y0
    # Scan past the constant region before x0
    j = 0
    while x[j] <= x0:
        j += 1
    i0 = j
    out[:i0] = y0
    width = x1 - x0
    while x[j] < x1:
        tau = (x[j] - x0) / width
        arg = 1.0 / tau - 1.0 / (1.0 - tau)
        if arg >= maxexp:
            # exp would overflow; the limiting value of the formula is y0
            out[j] = y0
        else:
            out[j] = y0 + dy / (1.0 + np.exp(arg))
        j += 1
    i1 = j
    out[i1:] = y1
    return out, i0, i1
def transition_function(x, x0, x1, y0=0.0, y1=1.0, return_indices=False):
    """Return a smooth function that is constant outside (x0, x1).

    The transition uses the standard smooth (C^infinity) function with
    derivatives of compact support, so the result equals `y0` for x <= x0,
    equals `y1` for x >= x1, and varies smoothly in between.

    Parameters
    ==========
    x: array_like
        One-dimensional monotonic array of floats.
    x0: float
        Value before which the output will equal `y0`.
    x1: float
        Value after which the output will equal `y1`.
    y0: float [defaults to 0.0]
        Value of the output before `x0`.
    y1: float [defaults to 1.0]
        Value of the output after `x1`.
    return_indices: bool [defaults to False]
        If True, return the array and the indices (i0, i1) at which the transition occurs, such that
        t[:i0]==y0 and t[i1:]==y1.
    """
    result = _transition_function(x, x0, x1, y0, y1)
    if not return_indices:
        return result[0]
    return result
@jit
def transition_function_derivative(x, x0, x1, y0=0.0, y1=1.0):
    """Return derivative of the transition function

    This function simply returns the derivative of `transition_function` with respect to the `x`
    parameter.  The parameters to this function are identical to those of that function.

    Parameters
    ==========
    x: array_like
        One-dimensional monotonic array of floats.
    x0: float
        Value before which the output will equal `y0`.
    x1: float
        Value after which the output will equal `y1`.
    y0: float [defaults to 0.0]
        Value of the output before `x0`.
    y1: float [defaults to 1.0]
        Value of the output after `x1`.

    Returns
    =======
    transition_prime: ndarray
        Derivative values; zero outside (x0, x1).
    """
    transition_prime = np.zeros_like(x)
    ydiff = y1 - y0
    i = 0
    # Derivative is exactly zero up to x0 (array is pre-zeroed)
    while x[i] <= x0:
        i += 1
    while x[i] < x1:
        tau = (x[i] - x0) / (x1 - x0)
        exponent = 1.0 / tau - 1.0 / (1.0 - tau)
        if exponent >= maxexp:
            # exp would overflow; the derivative vanishes in this limit
            transition_prime[i] = 0.0
        else:
            # Reuse `exponent` rather than recomputing 1/tau - 1/(1-tau) from scratch
            exponential = np.exp(exponent)
            # Chain rule applied to y0 + ydiff / (1 + exp(exponent(tau(x))))
            transition_prime[i] = (
                -ydiff
                * exponential
                * (-1.0 / tau ** 2 - 1.0 / (1.0 - tau) ** 2)
                * (1 / (x1 - x0))
                / (1.0 + exponential) ** 2
            )
        i += 1
    return transition_prime
@jit
def bump_function(x, x0, x1, x2, x3, y0=0.0, y12=1.0, y3=0.0):
    """Return a smooth bump function that is constant outside (x0, x3) and inside (x1, x2).
    This uses the standard C^infinity function with derivatives of compact support to transition
    between the given values.  By default, this is a standard bump function that is 0 outside of
    (x0, x3), and is 1 inside (x1, x2), but the constant values can all be adjusted optionally.
    Parameters
    ==========
    x: array_like
        One-dimensional monotonic array of floats.
    x0: float
        Value before which the output will equal `y0`.
    x1, x2: float
        Values between which the output will equal `y12`.
    x3: float
        Value after which the output will equal `y3`.
    y0: float [defaults to 0.0]
        Value of the output before `x0`.
    y12: float [defaults to 1.0]
        Value of the output after `x1` but before `x2`.
    y3: float [defaults to 0.0]
        Value of the output after `x3`.
    """
    bump = np.empty_like(x)
    ydiff01 = y12 - y0  # rise from y0 up to the plateau value
    ydiff23 = y3 - y12  # fall from the plateau down to y3
    i = 0
    # NOTE(review): these scans assume `x` extends beyond each boundary value;
    # if every element satisfies x <= x0 (etc.), indexing runs past the end — TODO confirm callers guarantee this.
    while x[i] <= x0:
        i += 1
    bump[:i] = y0
    # Rising transition on (x0, x1)
    while x[i] < x1:
        tau = (x[i] - x0) / (x1 - x0)  # fractional position within (x0, x1)
        exponent = 1.0 / tau - 1.0 / (1.0 - tau)
        if exponent >= maxexp:  # exp would overflow; limiting value of the formula is y0
            bump[i] = y0
        else:
            bump[i] = y0 + ydiff01 / (1.0 + np.exp(exponent))
        i += 1
    i1 = i
    # Constant plateau on [x1, x2]
    while x[i] <= x2:
        i += 1
    bump[i1:i] = y12
    # Falling transition on (x2, x3)
    while x[i] < x3:
        tau = (x[i] - x2) / (x3 - x2)  # fractional position within (x2, x3)
        exponent = 1.0 / tau - 1.0 / (1.0 - tau)
        if exponent >= maxexp:  # exp would overflow; limiting value of the formula is y12
            bump[i] = y12
        else:
            bump[i] = y12 + ydiff23 / (1.0 + np.exp(exponent))
        i += 1
    bump[i:] = y3
    return bump
def transition_to_constant(f, t, t1, t2):
    """Smoothly transition from the function to a constant.

    This works (implicitly) by multiplying the derivative of `f` with the
    transition function, and then integrating.  By integration by parts, that
    is the same as multiplying `f` itself by the transition function and
    subtracting the integral of `f` times the transition function's
    derivative — an integral effectively supported only on (t1, t2).  Note
    that the final value (after t2) will depend on the precise values of `t1`
    and `t2`, and the behavior of `f` in between.

    Parameters
    ==========
    f: array_like
        One-dimensional array corresponding to the following `t` parameter.
    t: array_like
        One-dimensional monotonic array of floats.
    t1: float
        Value before which the output will equal `f`.
    t2: float
        Value after which the output will be constant.
    """
    from quaternion import indefinite_integral

    window, i1, i2 = transition_function(t, t1, t2, y0=1.0, y1=0.0, return_indices=True)
    window_dot = transition_function_derivative(t, t1, t2, y0=1.0, y1=0.0)
    result = f * window
    # Integration-by-parts correction, restricted to the transition region
    result[i1:i2] -= indefinite_integral(f[i1:i2] * window_dot[i1:i2], t[i1:i2])
    # Hold the last transitioned value constant from t2 onward
    result[i2:] = result[i2 - 1]
    return result
@jit
def xor_timeseries(c):
    """XOR a time-series of data in place

    Assumes time varies along the first dimension of the input array, but any
    number of other dimensions are supported.

    The first time step is left unchanged; every later time step is replaced
    by its XOR with the preceding time step — storing only the bits that have
    changed.  This transformation is useful when storing the data because it
    allows for greater compression in many cases.

    Note that the direction in which this operation is done matters.  This
    function starts with the last time, changes that data in place, and
    proceeds to earlier times.  To undo this transformation, we need to start
    at early times and proceed to later times; `xor_timeseries_reverse`
    achieves that, recovering the original data with bit-for-bit accuracy.
    """
    bits = c.view(np.uint64)
    # Walk backwards so each step still sees the original preceding value
    i = bits.shape[0] - 1
    while i > 0:
        bits[i] = np.bitwise_xor(bits[i - 1], bits[i])
        i -= 1
    return c
@jit
def xor_timeseries_reverse(c):
    """Undo the in-place XOR transformation applied by `xor_timeseries`.

    See that function's docstring for details; this one walks forward in time,
    restoring each step from the already-restored preceding step.
    """
    bits = c.view(np.uint64)
    n = bits.shape[0]
    for i in range(1, n):
        bits[i] = np.bitwise_xor(bits[i - 1], bits[i])
    return c
@jit
def fletcher32(data):
    """Compute the Fletcher-32 checksum of an array

    This checksum is very easy to implement from scratch and very fast.
    Note that it's not entirely clear that everyone agrees on the naming of
    these functions.  This version uses 16-bit input, 32-bit accumulators,
    block sizes of 360, and a modulus of 65_535.

    Parameters
    ==========
    data: ndarray
        This array can have any dtype, but must be able to be viewed as uint16.

    Returns
    =======
    checksum: uint32
    """
    words = data.reshape((data.size,)).view(np.uint16)
    n = words.size
    modulus = np.uint32(65535)
    sum1 = np.uint32(0)
    sum2 = np.uint32(0)
    # 360 is the largest number of 16-bit additions that fit in the uint32
    # accumulators without overflowing before the modulo is applied.
    block_size = 360
    j = 0
    while j < n:
        for _ in range(min(block_size, n - j)):
            sum1 += words[j]
            sum2 += sum1
            j += 1
        sum1 %= modulus
        sum2 %= modulus
    return sum2 << np.uint32(16) | sum1
@functools.lru_cache()
def multishuffle(shuffle_widths, forward=True):
    """Construct functions to "multi-shuffle" data

    The standard "shuffle" algorithm (as found in HDF5, for example) takes an
    array of numbers and shuffles their bytes so that all bytes of a given
    significance are stored together — the first byte of all the numbers are
    stored contiguously, then the second byte of all the numbers, and so on.
    The motivation for this is that — with reasonably smooth data — bytes in
    the same byte position in sequential numbers are usually more related to
    each other than they are to other bytes within the same number, which means
    that shuffling results in better compression of the data.

    There is no reason that shuffling can only work byte-by-byte, however.
    There is also a "bitshuffle" algorithm, which works in the same way, but
    collecting bits rather than bytes.  More generally, we could vary the
    number of bits stored together as we move along the numbers.  For example,
    we could store the first 8 bits of each number, followed by the next 4 bits
    of each number, etc.  This is the "multi-shuffle" algorithm.

    With certain types of data, this can reduce the compressed data size
    significantly.  For example, with float data for which successive values
    have been XOR-ed, the sign bit will very rarely change, the next 11 bits
    (representing the exponent) and a few of the following bits (representing
    the highest-significance digits) will typically be highly correlated, while
    as we move to lower significance there will be less correlation.  Thus, we
    might shuffle the first 8 bits together, followed by the next 8, then the
    next 4, the next 4, the next 2, and so on — decreasing the shuffle width as
    we go.  The `shuffle_widths` input might look like (8, 8, 4, 4, 2, 2, 1, 1,
    1, 1, ...).

    There are also some cases where we see correlation *increasing* again at
    low significance.  For example, if a number results from cancellation — the
    subtraction of two numbers much larger than their difference — then its
    lower-significance bits will be 0.  If we then multiply that by some
    integer (e.g., for normalization), there may be some very correlated but
    nonzero pattern.  In either case, compression might improve if the values
    at the end of our shuffle_widths list increase.

    Parameters
    ==========
    shuffle_widths: tuple of integers
        These integers represent the number of bits in each piece of each
        number that is shuffled, starting from the highest significance, and
        proceeding to the lowest.  The sum of these numbers must be the total
        bit width of the numbers that will be given as input — which must
        currently be 8, 16, 32, or 64.  There is no restriction on the
        individual widths, but note that if they do not fit evenly into 8-bit
        bytes, the result is unlikely to compress well.  Because this function
        is cached with `functools.lru_cache`, the argument must be hashable —
        pass a tuple, not a list.
    forward: bool [defaults to True]
        If True, the returned function will shuffle data; if False, the
        returned function will reverse this process — unshuffle.

    Returns
    =======
    shuffle_func: numba JIT function
        This function takes just one parameter — the array to be shuffled — and
        returns the shuffled array.  Note that the input array *must* be flat
        (have just one dimension), and will be viewed as an array of unsigned
        integers of the input bit width.  This can affect the shape of the
        array and order of elements.  You should ensure that this process will
        result in an array of numbers in the order that you want.  For example,
        if you have a 2-d array of floats `a` that are more continuous along
        the first dimension, you might pass `np.ravel(a.view(np.uint64), 'F')`,
        where F represents Fortran order, which varies more quickly in the
        first dimension.
    """
    import numpy as np
    import numba as nb

    bit_width = np.sum(shuffle_widths, dtype=np.int64)  # uint64 casts to float under floor division...
    if bit_width not in [8, 16, 32, 64]:
        raise ValueError(f"Total bit width must be one of [8, 16, 32, 64], not {bit_width}")
    dtype = np.dtype(f"u{bit_width//8}")
    bit_width = dtype.type(bit_width)
    # Process lowest-significance pieces first, so `mask_shift` below is a prefix sum
    reversed_shuffle_widths = np.array(list(reversed(shuffle_widths)), dtype=dtype)
    one = dtype.type(1)
    if forward:

        def shuffle(a):
            a = a.view(dtype)
            if a.ndim != 1:
                raise ValueError(
                    "\nThis function only accepts flat arrays. Make sure you flatten "
                    "(using ravel, reshape, or flatten)\n in a way that keeps your data "
                    "contiguous in the order you want."
                )
            b = np.zeros_like(a)
            # Absolute bit position in the output; uint64 so it cannot overflow
            # even for small dtypes on long arrays.
            b_array_bit = np.uint64(0)
            for i, shuffle_width in enumerate(reversed_shuffle_widths):
                # Bits below `mask_shift` were consumed by earlier (lower-significance) passes
                mask_shift = np.sum(reversed_shuffle_widths[:i])
                mask = dtype.type(2 ** shuffle_width - 1)
                for a_array_index in range(a.size):
                    b_array_index = b_array_bit // bit_width
                    b_element_bit = dtype.type(b_array_bit % bit_width)
                    masked = (a[a_array_index] >> mask_shift) & mask
                    b[b_array_index] += masked << b_element_bit
                    if b_element_bit + shuffle_width > bit_width:
                        # The piece straddles two output elements; carry the high bits over
                        b[b_array_index + one] += masked >> (bit_width - b_element_bit)
                    b_array_bit += shuffle_width
            return b

        return nb.jit(shuffle)
    else:
        # This function is almost the same as above, except for:
        # 1) swap a <-> b in input and output
        # 2) reverse the effect of the line in which b was set from a
        def unshuffle(b):
            b = b.view(dtype)
            if b.ndim != 1:
                raise ValueError(
                    "\nThis function only accepts flat arrays. Make sure you flatten "
                    "(using ravel, reshape, or flatten)\n in a way that keeps your data "
                    "contiguous in the order you want."
                )
            a = np.zeros_like(b)
            # uint64 (matching `shuffle`): a dtype-sized counter would overflow
            # for 8/16-bit dtypes once the total bit count exceeds the dtype's range.
            b_array_bit = np.uint64(0)
            for i, shuffle_width in enumerate(reversed_shuffle_widths):
                mask_shift = np.sum(reversed_shuffle_widths[:i])
                mask = dtype.type(2 ** shuffle_width - 1)
                for a_array_index in range(a.size):
                    b_array_index = b_array_bit // bit_width
                    b_element_bit = dtype.type(b_array_bit % bit_width)
                    masked = (b[b_array_index] >> b_element_bit) & mask
                    a[a_array_index] += masked << mask_shift
                    if b_element_bit + shuffle_width > bit_width:
                        # Recover the high bits that were carried into the next element
                        a[a_array_index] += (
                            (b[b_array_index + one] << (bit_width - b_element_bit)) & mask
                        ) << mask_shift
                    b_array_bit += shuffle_width
            return a

        return nb.jit(unshuffle)