import numpy as np
import warnings
from json import dumps
__all__ = ["cover", "scoring_functions", "OrbitCover"]
def threshold_scores(scores, thresholds):
"""
Experimental shadowing metric
Parameters
----------
scores : np.ma.masked_array
An array of scores
thresholds : np.ndarray
Upper bound for the shadowing metric to be labeled a 'detection'
Returns
-------
mask : np.ndarray
An array which masks the regions of spacetime which do not represent detections of Orbits.
"""
if len(thresholds) == 1 and isinstance(*thresholds, tuple):
thresholds = tuple(*thresholds)
if thresholds.size == scores.shape[0]:
# if more than one threshold, assume that each subset of the score array is to be scored with
# a different threshold, i.e. score array shape (5, 32, 32) and 5 thresholds, can use broadcasting to score
# each (32, 32) slice independently.
thresholds = np.array(thresholds).reshape(
(-1, *tuple(1 for i in range(len(scores.shape) - 1)))
)
# Depending on the pivots supplied, some places in the scoring array may be empty; do not threshold these elements.
if isinstance(scores, np.ma.masked_array):
return scores.data > thresholds
else:
return scores > thresholds
class OrbitCover:
"""
Class which bundles all information and data relevant for covering/shadowing computations.
Parameters
----------
base : Orbit
The Orbit instance whose state is to be scanned.
windows: list, tuple or array
An iterable of orbits which we are looking for in the base state; the orbits shadowed.
thresholds : list, tuple, array
The values used to judge whether a shadowed orbit has been detected or not. Order respects the order of
window orbits.
hull : tuple, default None
Tuple whose elements are the maximum of the discretization in each dimension; see Notes for details
core : tuple, default None
Tuple whose elements are the minima of the discretization in each dimension; see Notes for details
padded_orbit : Orbit, default None
The padded version of `base`, it is allowed to be passed because it can be computationally expensive to
repeatedly produce.
coordinate_map : function, callable, default None
Maps an array of indices (i.e. that which would slice a describes a hypercube/rectangle/orthotope into
an arbitrary selection of indices. Allows for arbitrarily complex slicing of base fields. Common application
would be to map a hypercube into a parallelogram.
window_caching_function : function, callable
If the scoring function requires a calculation which is expensive but only needs to be performed once per
window (i.e. persistent homology of an orbit) then this argument can be used to propagate the result through
the covering/shadowing process.
scoring_method : function, callable or str, default 'l2_density_mfc'
Takes in slices of base orbit and window orbit fields and returns a scalar value.
return_oob : bool
If coordinate_map is used, then it is possible to have the computational domain be mapped completely outside
of the base orbit; to keep track of these points they are allowed to be returned as a list of tuples.
ignore_oob : bool, default True
If False, then warns the user that pivots are being mapped out of bounds.
replacement : bool, default False
If True then allows pivots to be scored more than once, even after a "detection" has been recorded.
min_overlap : float, default 1.
Defines the fraction of the window that is required to overlap the base orbit, required to be a value
within interval (0, 1].
target_cover_proportion :
The fraction of unmasked pivots that need to have shadowing detections recorded before the computation
is potentially terminated midway. Value should be in (0, 1] e.g. if target_cover_proportion is 0.9, then computation
is terminated after 90% of the pivots have orbits detected at them.
periodicity : tuple of bool
Indicates whether a dimension of the base orbit's field is periodic.
"""
def __init__(
self,
base,
windows,
thresholds,
hull=None,
core=None,
padded_orbit=None,
coordinate_map=None,
window_caching_function=None,
scoring_method="l2_density_mfc",
return_oob=False,
ignore_oob=True,
replacement=True,
min_overlap=1,
target_cover_proportion=1,
periodicity=(),
**kwargs,
):
self.base = base
if (
isinstance(windows, list)
or isinstance(windows, tuple)
or isinstance(windows, np.ndarray)
):
self.windows = np.array(windows)
else:
self.windows = np.array((windows,))
self.thresholds = np.array(thresholds).reshape((-1, *(len(base.shape) * (1,))))
self.target_cover_proportion = target_cover_proportion
self.hull = hull or tuple(
max([window.discretization[i] for window in windows])
for i in range(len(base.discretization))
)
self.core = core or tuple(
min([window.discretization[i] for window in windows])
for i in range(len(base.discretization))
)
if not periodicity:
self.periodicity = tuple([False] * len(base.discretization))
else:
self.periodicity = periodicity
if padded_orbit is None:
self.padded_orbit = _pad_orbit(base, self.hull, self.periodicity)
else:
self.padded_orbit = padded_orbit
scores = kwargs.get("scores", kwargs.get("_scores", None))
if scores is None:
scores = np.full((len(self.windows), *self.padded_orbit.shape), np.inf)
self._scores = np.ma.masked_array(
scores, mask=np.ones_like(scores, dtype=bool)
)
elif isinstance(scores, np.ma.masked_array):
self._scores = scores
elif isinstance(scores, np.array):
self._scores = np.ma.masked_array(
scores, mask=threshold_scores(scores, self.thresholds)
)
else:
raise TypeError(
f"If provided, scores must be of type {type(np.ma.masked_array())}"
)
self.coordinate_map = coordinate_map
self.window_caching_function = window_caching_function
self.min_overlap = min_overlap
self.scoring_function = scoring_functions(scoring_method)
self.return_oob = return_oob
self.ignore_oob = ignore_oob
self.replacement = replacement
self.cover_percentages = kwargs.get("cover_percentages", None)
if not self.replacement and (len(self.thresholds) != len(self.windows)):
raise ValueError(
"If scoring without replacement, need to have a threshold for each window orbit."
)
def __str__(self):
return self.__class__.__name__
def __repr__(self):
dict_ = {"base shape": self.base.shape, "windows": len(self.windows)}
# convert the dictionary to a string via json.dumps
dictstr = dumps(dict_)
return self.__class__.__name__ + "(" + dictstr + ")"
@property
def scores(self):
return self._scores
@scores.setter
def scores(self, array):
self._scores = array
def mask_invalid_scores(self):
"""
Masks scores which do not satisfy the provided threshold constraint.
"""
self.scores.mask = np.logical_or(
self.scores.mask, self.scores > self.thresholds
)
return None
def trim(self, with_padding=False, masked=True):
"""
Remove all unscored pivots (i.e. padding) from the score arrays. NOT necessarily the same shape as base orbit.
"""
assert self.scores is not None, "cannot trim an empty set of scores."
if with_padding:
trimmed_scores = self.scores[
(
slice(None),
*tuple(
slice(hull_size - 1, -(hull_size - 1))
for hull_size in self.hull
),
)
]
if masked:
return trimmed_scores
else:
return trimmed_scores.data
else:
# If a mask was applied on a subdomain, return the region of spacetime scores which have finite values.
maximal_set_of_pivots = pivot_iterator(
self.padded_orbit.shape,
self.base.shape,
self.hull,
self.core,
self.periodicity,
False,
min_overlap=self.min_overlap,
)
maximal_pivot_slices = tuple(
slice(axis.min(), axis.max() + 1) for axis in maximal_set_of_pivots.T
)
trimmed_scores = self.scores[(slice(None), *maximal_pivot_slices)]
if masked:
return trimmed_scores
else:
return trimmed_scores.data
def map(self, masked=True, verbose=False):
""" Map scores representing detections back onto the spatiotemporal tile from the original base orbit.
Parameters
----------
masked: bool
If True, then only map the regions of space-time where scores meet threshold.
verbose : bool
Whether to print '-' as a form of crude progress bar
Returns
-------
ndarray :
The scores at each pivot and their mapping onto an array the same shape as `base_orbit.state`
Notes
-----
This takes the scores returned by :func:`cover` and converts them to the appropriate orbit sized array)
"""
oob_pivots = []
# Only have to iterate once per unique discretization shape, take advantage of this.
all_window_shapes = [tuple(w.discretization) for w in self.windows]
# The bases orbit periodicity has to do with scoring and whether or not to wrap windows around.
mapped_orbit_scores = np.full_like(self.scores, np.inf)
# array_of_window_shapes = np.array(all_window_shapes)
unique_window_shapes = set(all_window_shapes)
# By iterating over shapes and not windows, cut down on
for index, window_shape in enumerate(unique_window_shapes):
window_grid = np.indices(window_shape)
# Do not want to map invalid scores; scores.mask.all(axis=0) is True when there are no scores beneath threshold
if (self.scores.mask.size - self.scores.mask.sum()) > 0:
if masked:
mapping_mask = np.all(
threshold_scores(self.scores, self.thresholds), axis=0
)
else:
# even if we want unmasked, having no finite values leads to redundant computations.
mapping_mask = np.all(self.scores.mask, axis=0)
ordered_pivots = pivot_iterator(
self.padded_orbit.shape,
self.base.shape,
self.hull,
self.core,
self.periodicity,
mapping_mask,
min_overlap=self.min_overlap,
)
for i, each_pivot in enumerate(ordered_pivots):
each_pivot = tuple(each_pivot)
if verbose:
if i != 0 and i % max([1, len(ordered_pivots) // 10]) == 0:
print("-", end="")
orbit_coordinates = _subdomain_coordinates(
each_pivot,
self.base.shape,
window_shape,
window_grid,
self.hull,
self.periodicity,
coordinate_map=self.coordinate_map,
)
if np.size(orbit_coordinates) > 0:
# for window_idx in where_this_shape:
best_score_idx = np.argmin(
self.scores[(slice(None), *each_pivot)]
)
pivot_score = self.scores[(best_score_idx, *each_pivot)]
# easier to just check scores instead of masks.
if pivot_score <= self.thresholds[best_score_idx]:
filling_window = mapped_orbit_scores[
(best_score_idx, *orbit_coordinates)
]
filling_window[filling_window > pivot_score] = pivot_score
mapped_orbit_scores[
(best_score_idx, *orbit_coordinates)
] = filling_window
if len(oob_pivots) > 0 and not self.ignore_oob:
warn_str = " ".join(
[
f"\n{len(oob_pivots)} shadowing pivots unable to be scored for one or more windows because all",
f"subdomain coordinates were mapped out-of-bounds. To disable this message and return the"
f" out-of-bounds pivots along with the scores, oob_pivots, set return_oob=True or ignore_oob=True",
]
)
warnings.warn(warn_str, RuntimeWarning)
trimmed_mapped_orbit_scores = mapped_orbit_scores[
(
slice(None),
*tuple(
slice(hull_size - 1, -(hull_size - 1)) for hull_size in self.hull
),
)
]
return trimmed_mapped_orbit_scores
def minimal_covering_set(self, cover_threshold=0.99, verbose=False):
"""
Find the smallest number of masks which cover a specified proportion of the total cover area.
"""
assert (
isinstance(cover_threshold, float) and cover_threshold < 1
), "cover threshold must be provided as a float between (0, 1)."
cover_percentages = {}
# Find the orbit mask with the largest covering.
detections = np.invert(self.scores.mask).sum(
axis=tuple(range(1, len(self.scores.mask.shape)))
)
next_best_orbit_idx = np.argmax(detections)
minimal_cover_inclusion_flags = np.zeros(self.scores.mask.shape[0], dtype=bool)
minimal_cover_inclusion_flags[next_best_orbit_idx] = True
# percentage of unmasked points accounted for.
valid_pivots = np.invert(self.scores.mask)
total_valid_pivots = np.any(valid_pivots, axis=0).astype(bool).sum()
minimal_cover = valid_pivots[minimal_cover_inclusion_flags, ...]
cover_percentages[next_best_orbit_idx] = (
valid_pivots[next_best_orbit_idx, ...].sum() / total_valid_pivots
)
total_cover_percentage = (
np.any(minimal_cover, axis=0).astype(bool).sum() / total_valid_pivots
)
while total_cover_percentage < cover_threshold:
# The next-best orbit cover to add is the one which covers the most uncovered area.
covered = np.any(minimal_cover, axis=0)
currently_uncovered = np.logical_and(
valid_pivots, np.invert(covered).reshape((1, *covered.shape))
)
# Find the next best mask to add by totaling the number of points it covers which are not in minimal cover yet.
next_greatest_contribution_index = np.argmax(
currently_uncovered.sum(axis=tuple(range(1, len(self.base.shape) + 1)))
)
# demarcate the newer member of the cover.
minimal_cover_inclusion_flags[next_greatest_contribution_index] = True
minimal_cover = valid_pivots[minimal_cover_inclusion_flags, ...]
cover_percentages[next_greatest_contribution_index] = (
valid_pivots[next_greatest_contribution_index, ...].sum()
/ total_valid_pivots
)
total_cover_percentage = (
np.any(minimal_cover, axis=0).astype(bool).sum() / total_valid_pivots
)
if verbose:
print(sum(minimal_cover_inclusion_flags), total_cover_percentage)
minimal_orbitcover = OrbitCover(
**{
**vars(self),
"scores": self.scores[minimal_cover_inclusion_flags, ...],
"thresholds": self.thresholds[minimal_cover_inclusion_flags, ...],
"windows": self.windows[minimal_cover_inclusion_flags, ...],
"cover_percentages": cover_percentages,
}
)
return minimal_orbitcover
def best_subset(self, n):
# Find the orbit mask with the largest covering.
detections = np.invert(self.scores.mask).sum(
axis=tuple(range(1, len(self.scores.shape)))
)
best_n = np.argsort(detections)[-n:]
best_n_masked_scores = self.scores[best_n, ...]
cover_subset = OrbitCover(
**{
**vars(self),
"scores": best_n_masked_scores,
"thresholds": self.thresholds[best_n, ...],
"windows": self.windows[best_n, ...],
}
)
cover_subset.thresholds = np.array(cover_subset.thresholds).reshape(
(-1, *(len(self.base.shape) * (1,)))
)
return cover_subset
def scoring_functions(method):
"""
Return a callable to act as shadowing scoring metric.
Parameters
----------
method : str
The name of the callable to return.
Returns
-------
callable :
function which takes arguments `(base_slice, window, *args, **kwargs)` and returns a (scalar) score.
comparing the base slice and window. `base_slice` and `window` are arrays.
"""
if method == "amplitude":
func_ = amplitude_difference
elif method == "l2":
func_ = l2_difference
elif method == "l2_density":
func_ = l2_difference_density
elif method == "l2_mfc":
func_ = l2_difference_mean_flow_correction
elif method == "l2_density_mfc":
func_ = l2_difference_mean_flow_correction_density
elif method == "masked_l2_density":
func_ = masked_l2_difference_density
elif method == "masked_l2_density_mfc":
func_ = masked_l2_difference_mean_flow_correction_density
else:
raise ValueError(
f"name {method} of scoring function not in methods provided by orbithunter; define\n"
f"callable externally if still desired to be passed to shadowing functions."
)
return func_
def amplitude_difference(base_slice, window, *args, **kwargs):
"""
Experimental shadowing metric
Parameters
----------
base_slice : Orbit
Orbit whose state is the same shape as `window`
window : Orbit
Orbit whose state is the same shape as `window`
Returns
-------
The 'amplitude difference' metric between `base_slice` and `window`
"""
return np.sum(
np.abs(base_slice ** 2 - window ** 2),
axis=tuple(range(1, len(base_slice.shape) + 1)),
)
def l2_difference(base_slice, window, *args, **kwargs):
"""
Experimental shadowing metric
Parameters
----------
base_slice : Orbit
Orbit whose state is the same shape as `window`
window : Orbit
Orbit whose state is the same shape as `window`
Returns
-------
The 'amplitude difference' metric between `base_slice` and `window`
"""
# account for local mean flow by normalization; windows have zero mean flow by definition
return np.linalg.norm(
base_slice - window, axis=tuple(range(1, len(base_slice.shape) + 1))
)
def l2_difference_density(base_slice, window, *args, **kwargs):
"""
Experimental shadowing metric
Parameters
----------
base_slice : Orbit
Orbit whose state is the same shape as `window`
window : Orbit
Orbit whose state is the same shape as `window`
Returns
-------
The 'amplitude difference' metric between `base_slice` and `window`
"""
# account for local mean flow by normalization; windows have zero mean flow by definition
norm = np.linalg.norm(
base_slice - window, axis=tuple(range(1, len(base_slice.shape) + 1))
)
norm_density = norm / base_slice.size
return norm_density
def masked_l2_difference_density(base_slice, window, *args, **kwargs):
"""
Experimental shadowing metric
Parameters
----------
base_slice : Orbit
Orbit whose state is the same shape as `window`
window : Orbit
Orbit whose state is the same shape as `window`
Returns
-------
norm_density : float
The density of $L_2$ difference score
"""
base_mask = base_slice.copy().astype(bool)
norm = np.linalg.norm(
base_slice[base_mask] - window[base_mask],
axis=tuple(range(1, len(base_slice.shape) + 1)),
)
norm_density = norm / max([1, base_mask.sum()])
return norm_density
def masked_l2_difference_mean_flow_correction_density(
base_slice, window, *args, **kwargs
):
"""
Experimental shadowing metric
Parameters
----------
base_slice : Orbit
Orbit whose state is the same shape as `window`
window : Orbit
Orbit whose state is the same shape as `window`
Returns
-------
norm_density : float
The density of $L_2$ difference score
"""
base_mask = base_slice.copy().astype(bool)
# account for local mean flow by normalization; windows have zero mean flow by definition
norm = np.linalg.norm(
(base_slice[base_mask] - base_slice[base_mask].mean()) - window[base_mask],
axis=tuple(range(1, len(base_slice.shape) + 1)),
)
norm_density = norm / max([1, base_mask.sum()])
return norm_density
def l2_difference_mean_flow_correction(base_slice, window, *args, **kwargs):
"""
Experimental shadowing metric
Parameters
----------
base_slice : Orbit
Orbit whose state is the same shape as `window`
window : Orbit
Orbit whose state is the same shape as `window`
Returns
-------
float :
$L_2$ difference between normalized base slice and window.
"""
# account for local mean flow by normalization; windows have zero mean flow by definition
return np.linalg.norm(
(base_slice - base_slice.mean())[np.newaxis, ...] - window,
axis=tuple(range(1, len(base_slice.shape) + 1)),
)
def l2_difference_mean_flow_correction_density(base_slice, window, *args, **kwargs):
"""
Experimental shadowing metric
Parameters
----------
base_slice : Orbit
Orbit whose state is the same shape as `window`
window : Orbit
Orbit whose state is the same shape as `window`
Returns
-------
norm_density : float
The density of $L_2$ difference score between normalized base slice and window.
"""
# account for local mean flow by normalization; windows have zero mean flow by definition
norm = np.linalg.norm(
(base_slice - base_slice.mean()) - window,
axis=tuple(range(1, len(base_slice.shape) + 1)),
)
norm_density = norm / base_slice.size
return norm_density
def pivot_iterator(
pivot_array_shape, base_shape, hull, core, periodicity, mask, min_overlap=1,
):
"""
Generator for the valid window pivots for shadowing metric evaluation
Parameters
----------
pivot_array_shape : tuple of int
Shape of the numpy array corresponding to the padded base orbit
base_shape : tuple of int
Shape of the numpy array corresponding to the base orbit
hull : tuple of int
Shape of the convex hull of a set of windows (largest size in each dimension).
core : tuple of int
Shape of smallest discretization sizes in each dimension.
periodicity : tuple of bool
Elements indicate if corresponding axis is periodic. i.e. (True, False) implies axis=0 is periodic.
kwargs : dict
Keyword arguments for when user provides masking or pivot information.
mask : np.ndarray or None
numpy array, dtype boolean which indicates which pivots to mask. values of True imply exclusion of pivot.
pivots : iterable
Either a generator or iterable container containing the (unmasked) pivot positions to iterate over
scanning_region : str
Default 'all', if equals 'interior' then only permits pivots within the original base orbit's span.
Yields
------
tuple :
Contains the positions of unmasked pivots, for whom scores will be computed.
Notes
-----
Instead of constructing a generator for the desired pivots directly, it is easier to create the
iterator for all points and then subset it using a mask, especially in the context where inside-out iteration
is desired.
Could likely reduce memory costs by using np.ndindex and not np.indices but it would likely be a speed trade-off
Instead of returning a generator, yield from is used; in case of errors this function will be in the traceback
The masking that occurs ensures that every pivot which is scored, is scored with respect to ALL windows
"""
pivot_coordinates = np.indices(pivot_array_shape)
for axis_coord, base_size, hull_size, core_size, periodic in zip(
pivot_coordinates, base_shape, hull, core, periodicity
):
pad_size = hull_size - 1
if periodic:
mask = np.logical_or(mask, axis_coord < pad_size)
mask = np.logical_or(mask, axis_coord >= base_size + pad_size)
else:
# The minimum pivot can't be further from boundary than window size, else some windows will be
# completely out of bounds.
mask = np.logical_or(
mask, axis_coord < pad_size - int((1 - min_overlap) * (core_size - 1))
)
# The maximum can't be determined by window size, as smaller windows can get "closer" to the edge;
# this leads to results where only a subset of windows can provide scores, which is undesirable.
mask = np.logical_or(
mask,
axis_coord
>= (base_size + pad_size) - int(min_overlap * (hull_size - 1)),
)
truncated_pivots = []
for axis_coord, base_size, hull_size, periodic in zip(
pivot_coordinates, base_shape, hull, periodicity
):
truncated_pivots.append(axis_coord[~mask].reshape(-1, 1))
# clean up and remove the potentially very large array.
del pivot_coordinates
# If there are no pivots to order, then we can quit here.
if np.array(truncated_pivots).flatten().size > 0:
# order the pivots so they follow an inside-out approach.
truncated_pivots = np.concatenate(truncated_pivots, axis=-1)
approximate_centroid = truncated_pivots.max(axis=0) // 2
ordered_pivots = truncated_pivots[
np.argsort(np.sum(np.abs(approximate_centroid - truncated_pivots), axis=-1))
]
return ordered_pivots
else:
# so return type are identical, cast as array
return np.array(truncated_pivots).flatten()
def _subdomain_slices(
pivot, base_shape, window_shape, hull, periodicity, coordinate_map=None
):
"""
The slices for the window at the current pivot before any transformations.
Parameters
----------
pivot : tuple of int
Shape of the numpy array corresponding to the padded base orbit
base_orbit : Orbit
The orbit being scanned
window_orbit : Orbit
The orbit being used to scan with
hull : tuple of int
Convex hull of the set of windows of the current run.
periodicity : tuple of bool
Elements indicate if corresponding axis is periodic. i.e. (True, False) implies axis=0 is periodic.
Returns
-------
tuple, tuple
The slices which produce the correct subdomains of the base orbit and window orbit for
the shadowing metric function.
"""
base_slices = []
window_slices = []
for piv, base_size, span, hull_size, periodic in zip(
pivot, base_shape, window_shape, hull, periodicity
):
pad_size = hull_size - 1
# This allows generalization of rectangular to more nonlinear domains via functional mappings
if coordinate_map is None and not periodic:
base_start = max([piv, pad_size])
base_end = min([piv + span, base_size + pad_size])
base_slices.append(slice(base_start, base_end))
# base_start - piv represents the starting point after shaving off points in the boundary.
window_start = base_start - piv
window_end = window_start + base_end - base_start
window_slices.append(slice(window_start, window_end))
else:
base_slices.append(slice(piv, piv + span))
window_slices.append(slice(None))
return tuple(base_slices), tuple(window_slices)
def _subdomain_windows(
pivot,
base_shape,
window_shape,
window_grid,
hull,
periodicity,
/,
coordinate_map=None,
**kwargs,
):
"""
Parameters
----------
pivot : tuple of int
Shape of the numpy array corresponding to the padded base orbit
base_orbit : Orbit
The orbit being scanned
window_orbit : Orbit
The orbit being used to scan with
hull : tuple of int
Convex hull of the set of windows of the current run.
hull_grid : tuple of ndarray
Equal to np.indices(hull); saved because it is re-used constantly.
periodicity : tuple of bool
Elements indicate if corresponding axis is periodic. i.e. (True, False) implies axis=0 is periodic.
kwargs : dict
Keyword arguments for when user provides masking or pivot information.
mask : np.ndarray or None
numpy array, dtype boolean which indicates which pivots to mask. values of True imply exclusion of pivot.
pivots : iterable
Either a generator or iterable container containing the (unmasked) pivot positions to iterate over
scanning_region : str
Default 'all', if equals 'interior' then only permits pivots within the original base orbit's span.
coordinate_map : callable
A function which maps a "rectangular" array shape into the desired shadowing shape
(typically parallelipiped)
"""
base_slices, window_slices = _subdomain_slices(
pivot,
base_shape,
window_shape,
hull,
periodicity,
coordinate_map=coordinate_map,
)
if coordinate_map is not None:
broadcasting_shaped_pivot = np.array(pivot).reshape(
len(pivot), *tuple(len(pivot) * [1])
)
window_grid = window_grid[(slice(None), *window_slices)]
# Add the pivot to translate the pivot; for proper broadcasting this requires reshaping
subdomain_grid = window_grid + broadcasting_shaped_pivot
# Specify which type of coordinates are manipulated explicitly, may be unused, depending on coordinate_map
mapped_subdomain_grid = coordinate_map(subdomain_grid, base=True, **kwargs)
mapped_window_grid = coordinate_map(window_grid, window=True, **kwargs)
submask = np.zeros(mapped_subdomain_grid.shape[1:], dtype=bool)
for sub_coords, base_size, window_size, hull_size, periodic in zip(
mapped_subdomain_grid, base_shape, window_shape, hull, periodicity
):
pad_size = hull_size - 1
if periodic:
sub_coords[sub_coords >= (base_size + pad_size)] -= base_size
sub_coords[sub_coords < pad_size] += base_size
else:
# This type of masking was used for pivots but now because we are mapping coordinates we have
# to recheck. The bounds are different for pivots and actual coordinates.
submask = np.logical_or(submask, sub_coords < pad_size)
submask = np.logical_or(submask, sub_coords >= base_size + pad_size)
subdomain_coordinates = tuple(
c[~submask].ravel() for c in mapped_subdomain_grid
)
window_coordinates = tuple(c[~submask].ravel() for c in mapped_window_grid)
base_indexer = subdomain_coordinates
window_indexer = window_coordinates
else:
base_indexer = base_slices
window_indexer = window_slices
return base_indexer, window_indexer
def _subdomain_coordinates(
pivot,
base_shape,
window_shape,
hull_grid,
hull,
periodicity,
/,
coordinate_map=None,
**kwargs,
):
"""
Parameters
----------
pivot : tuple of int
Shape of the numpy array corresponding to the padded base orbit
base_orbit : Orbit
The orbit being scanned
window_orbit : Orbit
The orbit being used to scan with
hull : tuple of int
Convex hull of the set of windows of the current run.
hull_grid : tuple of ndarray
Equal to np.indices(hull); saved because it is re-used constantly.
periodicity : tuple of bool
Elements indicate if corresponding axis is periodic. i.e. (True, False) implies axis=0 is periodic.
kwargs : dict
Keyword arguments for when user provides masking or pivot information.
mask : np.ndarray or None
numpy array, dtype boolean which indicates which pivots to mask. values of True imply exclusion of pivot.
pivots : iterable
Either a generator or iterable container containing the (unmasked) pivot positions to iterate over
scanning_region : str
Default 'all', if equals 'interior' then only permits pivots within the original base orbit's span.
"""
base_slices, window_slices = _subdomain_slices(
pivot,
base_shape,
window_shape,
hull,
periodicity,
coordinate_map=coordinate_map,
)
broadcasting_shaped_pivot = np.array(pivot).reshape(
len(pivot), *tuple(len(pivot) * [1])
)
# Get the subdomain's coordinates with respect to the padded array, if returning more than the pivot scores
if coordinate_map is not None:
broadcasting_shaped_pivot = np.array(pivot).reshape(
len(pivot), *tuple(len(pivot) * [1])
)
window_grid = hull_grid[(slice(None), *window_slices)]
subdomain_grid = window_grid + broadcasting_shaped_pivot
# Add the pivot to translate the pivot; for proper broadcasting this requires reshaping
mapped_subdomain_grid = coordinate_map(subdomain_grid, base=True, **kwargs)
submask = np.zeros(mapped_subdomain_grid.shape[1:], dtype=bool)
for sub_coords, base_size, window_size, hull_size, periodic in zip(
mapped_subdomain_grid, base_shape, window_shape, hull, periodicity,
):
pad_size = hull_size - 1
if periodic:
sub_coords[sub_coords >= (base_size + pad_size)] -= base_size
sub_coords[sub_coords < pad_size] += base_size
else:
# This type of masking was used for pivots but now because we are mapping coordinates we have
# to recheck. The bounds are different for pivots and actual coordinates.
submask = np.logical_or(submask, sub_coords < pad_size)
submask = np.logical_or(submask, sub_coords >= base_size + pad_size)
subdomain_coordinates = tuple(
c[~submask].ravel() for c in mapped_subdomain_grid
)
else:
subdomain_grid = (
hull_grid[(slice(None), *tuple(window_slices))] + broadcasting_shaped_pivot
)
# if all boundaries are aperiodic, then we are done.
if True in periodicity:
for sub_coords, base_size, window_size, hull_size, periodic in zip(
subdomain_grid, base_shape, window_shape, hull, periodicity,
):
pad_size = hull_size - 1
if periodic:
sub_coords[sub_coords >= (base_size + pad_size)] -= base_size
sub_coords[sub_coords < pad_size] += base_size
# unravel and cast as tuples for basic and not advanced indexing
subdomain_coordinates = tuple(c.ravel() for c in subdomain_grid)
return subdomain_coordinates
def _pad_orbit(base_orbit, padding_dim, periodicity, aperiodic_mode="constant"):
"""
Pad the base orbit with the convex hull of the set of scanning windows.
Parameters
----------
base_orbit : Orbit
The Orbit to be padded (and scanned)
hull : tuple of int
The shape of the convex hull of the set of windows
periodicity : tuple of bool
Flags indicating whether or not base_orbit state array axes represent periodic dimensions or not.
aperiodic_mode : str or function
How to treat padding of aperiodic dimensions.
Returns
-------
padded_orbit : Orbit
An orbit whose state array has been padded with zeros (aperiodic dimensions) or padded by wrapping values
around the array (periodic dimensions).
"""
# This looks redundant but it is not because two padding calls with different modes are required.
periodic_padding = tuple(
(pad - 1, pad - 1) if bc else (0, 0)
for bc, pad in zip(periodicity, padding_dim)
)
aperiodic_padding = tuple(
(pad - 1, pad - 1) if not bc else (0, 0)
for bc, pad in zip(periodicity, padding_dim)
)
# Pad the base orbit state to use for computing scores, create the score array which will contain each pivot's score
padded_state = np.pad(
np.pad(base_orbit.state, periodic_padding, mode="wrap"),
aperiodic_padding,
mode=aperiodic_mode,
)
padded_orbit = base_orbit.__class__(
**{
**vars(base_orbit),
"state": padded_state,
"discretization": padded_state.shape,
}
)
return padded_orbit
[docs]def cover(orbit_cover, verbose=False, **kwargs):
"""
Function to perform multiple shadowing computations given a collection of orbits.
Parameters
----------
orbit_cover : OrbitCover
Object which contains the base orbit, window orbits, thresholds, and everything else.
Returns
-------
tuple : (orbit_cover, np.ndarray, [np.ndarray, np.ndarray])
NumPy arrays whose indices along the first axis correspond to the window orbits' position in ```window_orbits```
and whose other dimensions consist of scores or masking values. If return_pivot_arrays==True then also
return the score and masking arrays for the pivots in addition to the orbit scores and pivots.
Notes
-----
If a pivot mask is not provided, then any elements of the score array which are beneath threshold are not recalculated.
"""
thresholds = np.array(orbit_cover.thresholds)
window_orbits = np.array(orbit_cover.windows)
base_orbit = orbit_cover.base
# Only have to iterate once per unique discretization shape, take advantage of this.
all_window_shapes = [tuple(w.discretization) for w in window_orbits]
# array_of_window_shapes = np.array(all_window_shapes)
unique_window_shapes = set(all_window_shapes)
# Masking array is used for very specific or multi-part computations.
periodicity = orbit_cover.periodicity
padded_orbit = orbit_cover.padded_orbit
oob_pivots = []
# Score mask is True if scores are INVALID. If they are ALL invalid, then we do NOT want to mask them
# in the scoring process, hence the pivot should be False
pivot_mask = kwargs.get(
"pivot_mask", np.invert(orbit_cover.scores.mask.all(axis=0))
)
if pivot_mask.shape != padded_orbit.shape:
raise ValueError("Pivot mask must be same shape are padded orbit array.")
for index, window_shape in enumerate(unique_window_shapes):
where_this_shape = tuple(
i for i, shape in enumerate(all_window_shapes) if shape == window_shape
)
relevant_window_states = np.array(
[window_orbits[i].state for i in where_this_shape]
)
# retrieve the function for scoring, using a default function based on the l2 difference.
scoring_function = kwargs.get(
"scoring_function", l2_difference_mean_flow_correction_density
)
# Sometimes there are metrics, like bottleneck distance between persistence diagrams,
# which need only be computed once per window. To avoid redundant calculations, cache this result.
# if orbit_cover.window_caching_function is not None:
ordered_pivots = pivot_iterator(
orbit_cover.scores.shape[1:],
base_orbit.shape,
orbit_cover.hull,
orbit_cover.core,
periodicity,
pivot_mask,
min_overlap=orbit_cover.min_overlap,
)
for w_dim, b_dim in zip(window_shape, base_orbit.shape):
assert (
w_dim < b_dim
), "Shadowing window discretization is larger than the base orbit. resize first. "
window_grid = np.indices(window_shape)
window_oob_pivots = []
for i, each_pivot in enumerate(ordered_pivots):
each_pivot = tuple(each_pivot)
score_mask_slicer = (np.array(where_this_shape), *each_pivot)
if verbose:
if i != 0 and i % max([1, len(ordered_pivots) // 10]) == 0:
print("-", end="")
base_indexer, window_indexer = _subdomain_windows(
each_pivot,
base_orbit.shape,
window_shape,
window_grid,
orbit_cover.hull,
periodicity,
coordinate_map=orbit_cover.coordinate_map,
**kwargs,
)
base_subdomain = padded_orbit.state[base_indexer]
window_subdomains = relevant_window_states[(slice(None), *window_indexer)]
# if subdomains are empty, there is nothing to score.
if not base_subdomain.size > 0 or not window_subdomains[0].size > 0:
if orbit_cover.return_oob:
window_oob_pivots.append(each_pivot)
else:
orbit_cover.scores[score_mask_slicer] = scoring_function(
base_subdomain, window_subdomains, **kwargs
)
# Masking the scores at this point means that pivots won't be scored multiple times if they satisfy the
# corresponding threshold.
orbit_cover.scores.mask[np.array(where_this_shape), ...] = threshold_scores(
orbit_cover.scores[np.array(where_this_shape), ...],
thresholds[np.array(where_this_shape)],
)
if not orbit_cover.replacement:
pivot_mask = np.logical_or(
pivot_mask,
np.invert(orbit_cover.scores.mask[np.array(where_this_shape), ...]).any(
axis=0
),
)
if len(window_oob_pivots) > 0 and not orbit_cover.ignore_oob:
warn_str = " ".join(
[
f"\n{len(window_oob_pivots)} shadowing pivot unable to be scored for one or more windows because all",
f"subdomain coordinates were mapped out-of-bounds. To disable this message and return the"
f" out-of-bounds pivots along with the scores, oob_pivots, set return_oob=True or ignore_oob=True",
]
)
warnings.warn(warn_str, RuntimeWarning)
if orbit_cover.return_oob:
orbit_cover.oob_pivots = oob_pivots
return orbit_cover