from math import pi
from ..core import Orbit
from scipy.fft import rfft, irfft, rfftfreq
from scipy.linalg import block_diag
from scipy.sparse.linalg import LinearOperator
from mpl_toolkits.axes_grid1 import make_axes_locatable
from functools import lru_cache
import os
import numpy as np
import matplotlib.pyplot as plt
import warnings
__all__ = [
"OrbitKS",
"RelativeOrbitKS",
"ShiftReflectionOrbitKS",
"AntisymmetricOrbitKS",
"EquilibriumOrbitKS",
"RelativeEquilibriumOrbitKS",
]
[docs]
class OrbitKS(Orbit):
"""
Base class for orbits of the Kuramoto-Sivashinsky equation.
The Kuramoto-Sivashinsky equation is a fourth order partial differential equation that serves as a simplified
testing ground for the more complicated Navier-Stokes equation. It's form in configuration space, where the
state variable :math:`u(t,x)` is typically imagined as a velocity field of a laminar flame front.
It's spatiotemporal version with doubly periodic boundary conditions takes the form
$u_t + u_{xx} + u_{xxxx} + 1/2(u^2)_x = 0$
with boundary conditions
$u(t, x) = u(t + T, x) = u(t, x+L) = u(t+T, x+L)$.
This class and its subclasses is used to find solutions to the system of differential algebraic equations (DAEs)
which result from applying a discrete Fourier transform in both space and time.
The 'state' in configuration space is ordered such that when in the physical basis, the last row
corresponds to 't=0'. This results in an extra negative sign when computing time derivatives.
This convention was chosen because it is conventional to display positive time as 'up'. This convention prevents
errors due to flipping fields up and down.
To define an orbit, the configuration space (spatiotemporal dimensions) or tile must be defined. The
unconventional approach of this package is to keep these domain dimensions as free variables.
The only additional parameter beyond the dimensions is a spatial shift parameter for solutions with continuous
spatial translation symmetry; it only applies to :class:`orbithunter.ks.orbits.RelativeOrbitKS`
and :class:`orbithunter.ks.orbits.RelativeEquilibriumOrbitKS`. Its inclusion in the class
:class:`orbithunter.ks.orbits.OrbitKS` is due to the ability to convert between Orbit types.
The various subclasses represent symmetry invariant subspaces. Due to the nature of
subspaces, it is numerically possible to find, for example, solutions with spatial reflection symmetry using
OrbitKS. The discrete symmetry invariant orbits are literaly subspaces of solutions; any subclass member
can be found using its parent class.
Historically, only `adj` and `lstsq` were used, in combination, for OrbitKS and its subclasses:
All possible methods include:
- 'adj'
- 'newton_descent'
- 'lstsq'
- 'lsqr'
- 'lsmr'
- 'bicg'
- 'bicgstab'
- 'gmres'
- 'lgmres'
- 'cg'
- 'cgs'
- 'qmr'
- 'minres'
- 'gcrotmk'
- 'cg_min'
- 'bfgs'
- 'newton-cg'
- 'l-bfgs-b'
- 'tns'
- 'slsqp'
.. warning::
If dimensions change by dramatic/nonsensible amounts then preconditioning=True can be used with certain methods
(most notably, 'adj') to account for very large parameter gradients.
.. warning::
The following are supported but NOT recommended for the KSE.
- 'nelder-mead' (very very slow)
- 'powell' (very slow)
- 'cobyla' (slow),
See Also
--------
:class:`orbithunter.core.Orbit`
"""
[docs]
def __init__(
self,
state=None,
basis=None,
parameters=None,
discretization=None,
constraints=None,
**kwargs,
):
super().__init__(
state=state,
basis=basis,
parameters=parameters,
discretization=discretization,
constraints=constraints,
**kwargs,
)
# Explicit casting during construction; too hard to generalize to base class
# because technically complex types allows (e.g. parameter arrays)
if self.parameters:
self.parameters = tuple(float(p) for p in self.parameters)
[docs]
def periodic_dimensions(self):
"""
Bools indicating whether or not dimension is periodic.
Returns
-------
tuple of bool
Flags for whether dimensions are periodic or not.
Notes
-----
Non-static method to match the signature of :class:`orbithunter.core.Orbit`
"""
return True, True
[docs]
@staticmethod
def _default_shape():
"""
The default array shape when dimensions are not specified.
Returns
-------
tuple of int :
Tuple containing the default shape.
Notes
-----
(32, 32) is used because this is the shape that generally works for small tile sizes.
"""
return 32, 32
[docs]
@staticmethod
def minimal_shape_increments():
"""
The smallest valid increment to change the discretization by.
Returns
-------
tuple of int :
The smallest valid increments to changes in discretization size which retain all functionality.
"""
return 2, 2
[docs]
@staticmethod
def minimal_shape():
"""
The smallest possible compatible discretization to have full functionality.
Returns
-------
tuple of int :
The minimal shape for this class.
"""
return 2, 4
[docs]
@staticmethod
def bases_labels():
"""
Labels of the different bases produced by transforms.
Returns
-------
tuple of str
"""
return "field", "spatial_modes", "modes"
[docs]
@staticmethod
def discretization_labels():
"""
Strings to use to label dimensions/periods
Returns
-------
tuple of str
The labels for each dimension's number of collocation points (dimension in field basis)
"""
return "n", "m"
[docs]
@staticmethod
def parameter_labels():
"""
Labels of all parameters
Returns
-------
tuple of str
Notes
-----
The parameter 's' is never used outside of RelativeOrbitKS and RelativeEquilibriumKS, however, it is included
for compatibility between conversion between symmetry types.
"""
return "t", "x", "s"
[docs]
@staticmethod
def dimension_labels():
"""
Strings to use to label dimensions/periods.
"""
return "t", "x"
def from_numpy_array(self, cdof, *args, **kwargs):
"""
Utility to convert from numpy array (orbit_vector) to Orbit instance for scipy wrappers.
Parameters
----------
cdof : ndarray
Vector with (spatiotemporal basis) state values and parameters.
kwargs :
parameters : tuple
If parameters from another Orbit instance are provided, overwrite the values within the orbit_vector
parameter_constraints : dict
constraint dictionary, keys are parameter_labels, values are bools
subclass kwargs : dict
If special kwargs are required/desired for Orbit instantiation.
Returns
-------
Orbit instance :
Orbit instance whose state and parameters are extracted from the input orbit_vector.
Notes
-----
Overwritten version for OrbitKS which casts to real-valued state variables and parameters.
This function is mainly to retrieve output from (scipy) optimization methods and convert it back into Orbit
instances. Because constrained parameters are not included in the optimization process, this is not
as simple as merely slicing the parameters from the array, as the order of the elements is determined by
the constraints. If non-scalar parameters are used, user will need to overwrite the Orbit.from_numpy_array() method.
In order to completely specific the equations in the presence of constraints it is necessary to
.. warning::
If equation has components for any parameters:
However, if the equations of motion have components for the parameters; this will in correctly overwrite
the components stored in parameters and this will need an overwrite.
"""
# The parameters and possible constants are expected to be ordered
# parameters_list = list(cdof.ravel()[self.size:])
parameters_list = [p for p in cdof.ravel()[self.size :].real]
constants_list = list(args)
# The issue with parsing the parameters is that we do not know which list element corresponds to
# which parameter unless the constraints are checked. Parameter keys which are not in the constraints dict
# are assumed to be constrained. Pop from param_list if parameters 1. exist, 2. are unconstrained.
# Not having enough parameters to pop means something is going wrong in your matvec/rmatvec functions typically.
if self.parameters is not None:
parameters = tuple(
(
parameters_list.pop(0)
if (
not self.constraints.get(each_label, True)
and len(parameters_list) > 0
)
else (
float(constants_list.pop(0))
if len(constants_list) > 0.0
else 0.0
)
)
for each_label in self.parameter_labels()
)
else:
parameters = None
return self.__class__(
**{
**vars(self),
"state": np.reshape(cdof.ravel()[: self.size], self.shape),
"parameters": parameters,
**kwargs,
}
)
[docs]
def dt(self, order=1, array=False, **kwargs):
"""
Spectral time derivatives of the current state.
Parameters
----------
order :int
The order of the derivative.
array : bool
Whether or not to return a numpy array. Used for efficiency/avoiding construction of redundant
Orbit instances.
Returns
----------
orbit_dtn : OrbitKS
The class instance whose state is the time derivative in the spatiotemporal mode basis.
Notes
-----
It is very often the case that the derivative in the field basis is desired. Instead of forcing
the user to write self.transform(to='modes').dt().transform(to='field'), the functions are written
to incur extra overhead by transforming behind the scenes. To avoid unnecessary slow down, the OrbitKS
instances must be in the modes basis to compute self.eqn()
"""
if kwargs.get("inplace", False):
return_basis = kwargs.get("return_basis", self.basis)
# Need mode basis to compute derivatives
self.transform(to="modes", array=True, inplace=True)
# Elementwise multiplication of modes with frequencies, this is the derivative. Uses numpy broadcasting.
self.state = temporal_frequencies(self.t, self.n, order) * self.state
# If the order of the derivative is odd, then imaginary component and real components switch. Need to
# account for this for our real-valued transforms.
if np.mod(order, 2):
self.state = swap_modes(self.state, axis=0)
# To for numerical efficiency, NumPy arrays can be returned.
if array:
return self.transform(to=return_basis, inplace=True).state
else:
return self.transform(to=return_basis, inplace=True)
else:
# Need mode basis to compute derivatives
modes = self.transform(to="modes", array=True)
# Elementwise multiplication of modes with frequencies, this is the derivative. Uses numpy broadcasting.
dtn_modes = temporal_frequencies(self.t, self.n, order) * modes
# If the order of the derivative is odd, then imaginary component and real components switch. Need to
# account for this for our real-valued transforms.
if np.mod(order, 2):
dtn_modes = swap_modes(dtn_modes, axis=0)
# To for numerical efficiency, NumPy arrays can be returned.
if array:
return dtn_modes
else:
# return the derivative in an instance whose basis is equivalent to the original basis of self.
orbit_dtn = self.__class__(
**{**vars(self), "state": dtn_modes, "basis": "modes"}
)
return orbit_dtn.transform(to=self.basis)
[docs]
def dx(self, **kwargs):
"""
Spatial derivative of the current state.
Parameters
----------
kwargs :
order :int
The order of the derivative.
array : bool
Whether or not to return a numpy array. Used for efficiency/avoiding construction of redundant
Orbit instances.
computation_basis : str
The basis in which to compute the derivative.
return_basis : str
Which basis to return the ShiftReflectionOrbitKS in, if array=False.
Returns
----------
orbit_dxn : OrbitKS or subclass instance
Class instance whose spatiotemporal state represents the spatial derivative in the
the basis of the original state.
Notes
-----
It is very often the case that the derivative in the field basis is desired. Instead of forcing
the user to write self.transform(to='modes').dt().transform(to='field'), the functions are written
to incur extra overhead by transforming behind the scenes. To avoid unnecessary slow down, the OrbitKS
instances must be in the modes basis to compute self.eqn()
"""
# can compute spatial derivative in spatial mode or spatiotemporal mode basis. spatial_modes basis is required
# for orbits with discrete symmetry as they are orthogonal to the dx() direction.
computation_basis = kwargs.get("computation_basis", "modes")
inplace = kwargs.get("inplace", False)
order = kwargs.get("order", 1)
array = kwargs.get("array", False)
return_basis = kwargs.get("return_basis", self.basis)
if inplace:
self.transform(to=computation_basis, array=True, inplace=True)
self.state = (
spatial_frequencies(self.x, self.m, order)[:, : self.state.shape[1]]
* self.state
)
# If the order of the differentiation is odd, need to swap imaginary and real components.
if np.mod(order, 2):
self.state = swap_modes(self.state, axis=1)
self.transform(to=return_basis, inplace=True)
if array:
return self.state
else:
return self
else:
if computation_basis == "spatial_modes":
modes = self.transform(to="spatial_modes", array=True)
dxn_modes = spatial_frequencies(self.x, self.m, order) * modes
elif computation_basis == "modes":
modes = self.transform(to="modes", array=True)
# Slicing is a correction which only affects discrete symmetry orbits.
dxn_modes = (
spatial_frequencies(self.x, self.m, order)[:, : modes.shape[1]]
* modes
)
else:
raise ValueError(
f"{str(self)}.dx(computation_basis={computation_basis}); invalid basis for spectral differentiation. "
)
# If the order of the differentiation is odd, need to swap imaginary and real components.
if np.mod(order, 2):
dxn_modes = swap_modes(dxn_modes, axis=1)
if array:
return dxn_modes
else:
orbit_dxn = self.__class__(
**{**vars(self), "state": dxn_modes, "basis": computation_basis}
)
return orbit_dxn.transform(to=return_basis, inplace=True)
[docs]
def eqn(self, **kwargs):
"""
Instance whose state is the Kuramoto-Sivashinsky equation evaluated at the current state
kwargs :
Unused, to match signature of Orbit
Returns
-------
OrbitKS :
OrbitKS whose state is the spatiotamporal fourier modes resulting from the calculation of the K-S equation:
OrbitKS.state = u_t + u_xx + u_xxxx + 1/2 (u^2)_x
"""
# to be efficient, should be in modes basis.
assert (
self.basis == "modes"
), "Convert to spatiotemporal Fourier mode basis before computing K-S equation DAEs."
# to avoid two IFFT calls, convert before nonlinear product
orbit_field = self.transform(to="field")
# Compute the Kuramoto-sivashinsky equation; linear components differ between subclasses.
mapping_modes = self._eqn_linear_component(array=True) + orbit_field._nonlinear(
orbit_field, array=True
)
return self.__class__(
**{**vars(self), "state": mapping_modes, "basis": "modes"}
)
[docs]
def jacobian(self, **kwargs):
"""
Jacobian matrix evaluated at the current state.
kwargs :
Unused, included to match signature in Orbit class.
Returns
-------
jac_ : 2-d ndarray
Jacobian matrix whose columns are derivatives with respect to all unconstrained state variables;
including periods. Has dimensions dependent on number of spatiotemporal modes and free parameters,
(self.shapes()[-1].size, self.shapes()[-1].size + n_params)
Jacobian matrix of the KSe where n_params = 2 - sum(self.constraints)
Notes
-----
Original implementation was pretty, but very inefficient. This now computes the Jacobian matrix,
minimizing the amount of allocated memory by overwriting and performing matrix-free implementations of
Fourier transform matrix operations. Computes the Jacobian
$J = D_t + D_xx + D_xxxx + F_t D_x F_x Diag(u) F_x^{-1} F_t^{-1}$ in the following steps:
"""
assert (
self.basis == "modes"
), "Convert to spatiotemporal Fourier mode basis before computing Jacobian"
field_size, smode_size, mode_size = (np.prod(shp) for shp in self.shapes())
# Begin with nonlinear term. Apply matrix operators in matrix-free fashion. begin with diag(u)
J = np.diag(self.transform(to="field", array=True).ravel()).reshape(-1, self.m)
# By creatively reshaping J, can apply FFTs to 3-d tensor.
J = self.__class__(
**{**vars(self), "state": J, "basis": "field", "discretization": J.shape}
).transform(to="spatial_modes", array=True, inplace=True)
J = J.reshape(field_size, smode_size).T.reshape(-1, self.m)
# After transforming the columns, transpose and transform again to get F_x Diag(u) F_x^{-1}
J = self.__class__(
**{**vars(self), "state": J, "basis": "field", "discretization": J.shape}
)
# Reshape back into a matrix, and then into another 3-d tensor after transposing back, to take derivative.
J = J.transform(to="spatial_modes", array=True, inplace=True).reshape(
smode_size, smode_size
)
J = J.T.reshape(-1, self.m - 2)
J = self.__class__(
**{
**vars(self),
"state": J,
"basis": "spatial_modes",
"discretization": (J.shape[0], self.m),
}
)
J = J.dx(array=True, computation_basis="spatial_modes", inplace=True)
# At this point J represents D_x F_x Diag(u) F_x^{-1}; reshape into 3-d tensor again and apply
# time transforms
J = J.reshape(-1, smode_size).T.reshape(self.n, self.m - 2, -1)
J = self.__class__(
**{
**vars(self),
"state": J,
"discretization": (*self.discretization, J.shape[-1]),
"basis": "spatial_modes",
}
).transform(to="modes", array=True, inplace=True)
# reshape, transpose and transform.
J = (
J.reshape((*self.shapes()[2], -1))
.reshape(-1, smode_size)
.T.reshape(self.n, self.m - 2, -1)
)
J = self.__class__(
**{
**vars(self),
"state": J,
"discretization": (*self.discretization, J.shape[-1]),
"basis": "spatial_modes",
}
).transform(to="modes", array=True, inplace=True)
J = J.reshape((*self.shapes()[2], -1)).reshape(J.shape[-1], J.shape[-1]).T
# Produce the linear term; spatial derivatives are diagonal; time is more complicated due to SO(2) operator.
e = np.ones(self.shapes()[2])
dx2 = self.__class__(**{**vars(self), "state": e, "basis": "modes"}).dx(order=2)
J[np.diag_indices(J.shape[0])] += dx2.state.ravel() + dx2.state.ravel() ** 2
e = e.ravel()
# For time, get the correct frequency by taking the derivative of individual elements; swapping of real
# and imaginary components handled by .dt() Performed column-wise to avoid matrix representation.
for i in range(J.shape[0]):
e *= 0
e[i] = 1.0
J[:, i] += (
self.__class__(
**{**vars(self), "state": e.reshape(self.shape), "basis": "modes"}
)
.dt()
.state.ravel()
)
J = self._jacobian_parameter_derivatives_concat(J)
return J
[docs]
def matvec(self, other, **kwargs):
"""
Matrix-vector product of a vector with the Jacobian of the current state.
Parameters
----------
other : OrbitKS
OrbitKS instance whose state represents the vector in the matrix-vector multiplication.
kwargs : dict
Included to match Orbit signature
Returns
-------
OrbitKS :
OrbitKS whose state and other parameters result from the matrix-vector product.
Notes
-----
Equivalent to computation of v_t + v_xx + v_xxxx + d_x (u .* v), where v is the state of 'other'.
"""
assert (self.basis == "modes") and (other.basis == "modes")
self_field = self.transform(to="field")
# The correct derivative of the vector in the matrix vector product needs the current state parameters in
# self but the state stored in other.
other_mode_component = other.__class__(
**{**vars(self), "state": other.state, "basis": other.basis}
)
other_field = other_mode_component.transform(to="field")
# Factor of two corrects the 1/2 u^2 from differentiation of nonlinear term.
matvec_modes = other_mode_component._eqn_linear_component(
array=True
) + 2 * self_field._nonlinear(other_field, array=True)
if not self.constraints["t"]:
# Compute the product of the partial derivative with respect to T with the vector's value of T.
# This is only relevant when other.t an incremental value dt from a numerical method.
matvec_modes += other.t * (-1.0 / self.t) * self.dt(array=True)
if not self.constraints["x"]:
# Compute the product of the partial derivative with respect to L with the vector's value of L.
# This is only relevant when other.x an incremental value dx from a numerical method.
dfdl = (
(-2.0 / self.x) * self.dx(order=2, array=True)
+ (-4.0 / self.x) * self.dx(order=4, array=True)
+ (-1.0 / self.x) * self_field._nonlinear(self_field, array=True)
)
matvec_modes += other.x * dfdl
return self.__class__(**{**vars(self), "state": matvec_modes, "basis": "modes"})
[docs]
def rmatvec(self, other, **kwargs):
"""
Matrix-vector product with the adjoint of the Jacobian
Parameters
----------
other : OrbitKS
OrbitKS whose state represents the vector in the matrix-vector product.
Returns
-------
orbit_rmatvec : OrbitKS
OrbitKS with values representative of the adjoint-vector product
Notes
-----
The adjoint vector product in this case is defined as J^T * v, where J is the jacobian matrix. Equivalent to
evaluation of -v_t + v_xx + v_xxxx - (u .* v_x). In regards to preconditioning (which is very useful
for certain numerical methods, right preconditioning and left preconditioning switch meanings when the
jacobian is transposed. i.e. Right preconditi oning of the Jacobian can include preconditioning of the state
parameters (which in this case are usually incremental corrections dt, dx, ds);
this corresponds to LEFT preconditioning of the adjoint.
The derivatives always occur with respect to the parameters of u. therefore, the evaluation of
_rmatvec_linear_component and _rnonlinear require the parameters from self.
"""
assert (self.basis == "modes") and (other.basis == "modes")
# store the state in the field basis for the pseudospectral products
self_field = self.transform(to="field")
other_modes = other.__class__(**{**vars(other), "parameters": self.parameters})
rmatvec_modes = other_modes._rmatvec_linear_component(
array=True
) + self_field._rnonlinear(other_modes, array=True)
# parameters are derived by multiplying partial derivatives w.r.t. parameters with the other orbit.
rmatvec_params = self._rmatvec_parameters(self_field, other_modes)
return self.__class__(
**{
**vars(self),
"state": rmatvec_modes,
"basis": "modes",
"parameters": rmatvec_params,
}
)
def costgrad(self, *args, **kwargs):
"""
Derivative of $1/2 |F|^2$
Parameters
----------
eqn : OrbitKS
Orbit instance whose state equals DAE evaluated with respect to current state, i.e. F(v)
kwargs :
Any keyword arguments relevant for rmatvec, eqn, or 'preconditioning'.
Returns
-------
gradient :
OrbitKS instance whose state contains $(dF/dv)^T * F = J^T F$
Notes
-----
In this case, "preconditioning" is numerical rescaling of the gradient used as a numerical tool in descent
methods.
"""
if args:
eqn = args[0]
else:
eqn = self.eqn()
grad = self.rmatvec(eqn, **kwargs)
if kwargs.get("preconditioning", False):
# This preconditions with respect to the current state. not J^T F
grad = grad.precondition(
**{"pmult": self.preconditioning_parameters(), **kwargs}
)
return grad
[docs]
def plot(
self, show=True, save=False, padding=False, fundamental_domain=False, **kwargs
):
"""
Plot the velocity field as a 2-d density plot using matplotlib's imshow
Parameters
----------
show : bool
Whether or not to display the figure
save : bool
Whether to save the figure
padding : bool
Whether or not to interpolate more points before plotting. Done numerically instead of via plt.imshow
fundamental_domain : bool
Whether to plot only the fundamental domain or not.
**kwargs :
padding_shape : (int, int)
The field discretization to plot, will be used instead of default padding if padding is enabled.
filename : str
The (custom) save name of the figure, if save==True. Save name will be populated otherwise.
extension : str
The file extension to save as, .png, .pdf, etc. Values supported by matplotlib only.
figsize : (int, int)
The matplotlib figure size.
Notes
-----
Many of the defaults are experiential quantities; for comparison, i.e. larger domains require larger figures,
however, to avoid getting too large or small, a number of defaults are set in place to make sure that the
labels and scales are sensible.
The time axis for EquilibriumOrbitKS is labeled by infinity to indicate that they do not change over time.
"""
if padding:
padding_shape = kwargs.get("padding_shape", (16 * self.n, 16 * self.m))
plot_orbit = self.resize(padding_shape)
else:
plot_orbit = self.copy()
if fundamental_domain:
plot_orbit = plot_orbit.to_fundamental_domain().transform(to="field")
else:
# The fundamental domain is never used in computation, so no operations are required if we do not want
# to plot the fundamental domain explicitly.
plot_orbit = plot_orbit.transform(to="field")
# The following creates custom tick labels and accounts for some pathological cases
# where the period is too small (only a single label) or too large (many labels, overlapping due
# to font size) Default label tick size is 10 for time and the fundamental frequency, 2 pi sqrt(2) for space.
# Create time ticks, with the separation
if plot_orbit.t > 10:
timetick_step = np.max(
[
np.min(
[
100,
(
5
* 2
** (np.max([int(np.log2(plot_orbit.t // 2)) - 3, 1]))
),
]
),
5,
]
)
yticks = np.arange(0, plot_orbit.t, timetick_step)
ylabels = np.array([str(int(y)) for y in yticks])
elif 0 < plot_orbit.t <= 10:
scaled_T = np.round(plot_orbit.t, 1)
yticks = np.array([0, plot_orbit.t])
ylabels = np.array(["0", str(scaled_T)])
else:
plot_orbit.t = np.min([plot_orbit.x, 1])
yticks = np.array([0, plot_orbit.t])
ylabels = np.array(["0", "0"])
if plot_orbit.x > 2 * pi * np.sqrt(2):
xmult = (plot_orbit.x // 64) + 1
xscale = xmult * 2 * pi * np.sqrt(2)
xticks = np.arange(0, plot_orbit.x, xscale)
xlabels = [str(round((xmult * x) / xscale)) for x in xticks]
else:
scaled_L = np.round(plot_orbit.x / (2 * pi * np.sqrt(2)), 1)
xticks = np.array([0, plot_orbit.x])
xlabels = np.array(["0", str(scaled_L)])
default_figsize = (
min([max([0.25, 0.15 * plot_orbit.x**0.7]), 16]),
min([max([0.25, 0.15 * plot_orbit.t**0.7]), 16]),
)
# # this allows for local non-zero galilean velocity to be more easily displayed
maxval = np.round(
np.abs(
np.array(
[plot_orbit.state.ravel().min(), plot_orbit.state.ravel().max()]
)
).max(),
1,
)
cbarticks = [-maxval, maxval]
cbarticklabels = [str(i) for i in np.round(cbarticks, 1)]
cmap = kwargs.get("cmap", "twilight_shifted")
figsize = kwargs.get("figsize", default_figsize)
extentL, extentT = np.min([15, figsize[0]]), np.min([15, figsize[1]])
scaled_font = np.max([int(np.min([20, np.mean(figsize)])), 10])
plt.rcParams.update({"font.size": scaled_font})
fig, ax = plt.subplots(figsize=(extentL, extentT))
image = ax.imshow(
plot_orbit.state,
extent=[0, extentL, 0, extentT],
cmap=cmap,
interpolation="none",
aspect="auto",
vmin=-maxval,
vmax=maxval,
)
xticks = (xticks / plot_orbit.x) * extentL
yticks = (yticks / plot_orbit.plotting_dimensions()[0][1]) * extentT
# Include custom ticks and tick labels
ax.set_xticks(xticks)
ax.set_yticks(yticks)
ax.set_xticklabels(xlabels, ha="left")
ax.set_yticklabels(ylabels, va="center")
ax.grid(True, linestyle="dashed", color="k", alpha=0.8)
fig.subplots_adjust(right=0.95)
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size=0.075, pad=0.1)
cbar = plt.colorbar(image, cax=cax, ticks=cbarticks)
cbar.ax.set_yticklabels(cbarticklabels, fontdict={"fontsize": scaled_font})
if save or kwargs.get("filename", None):
extension = kwargs.get("extension", ".png")
filename = kwargs.get("filename", None) or self.filename(
extension=extension
)
# Create save name if one doesn't exist.
if filename.endswith(".h5"):
filename = "".join([filename.split(".h5")[0], extension])
if fundamental_domain:
# Need to rename fundamental domain or else it will overwrite, of course there
# is no such thing for solutions without any symmetries.
filename_tmp = filename.split(".")
filename = "".join([filename_tmp[0], "_fdomain.", filename_tmp[-1]])
# If filename is provided as an absolute path it overrides the value of 'directory'.
filename = os.path.abspath(os.path.join(filename))
if kwargs.get("verbose", False):
print("Saving figure to {}".format(filename))
plt.savefig(filename, bbox_inches="tight", pad_inches=0.05)
if show:
plt.show()
plt.close()
return None
[docs]
def mode_plot(self, show=True, save=False, scale="log", **kwargs):
"""
Plot the spatiotemporal Fourier spectrum as a 2-d density plot using matplotlib's imshow
Parameters
----------
show : bool
Whether or not to display the figure
save : bool
Whether to save the figure
scale : str
Whether or not to plot using transformation of `np.log10(self.state)`
**kwargs :
filename : str
The (custom) save name of the figure, if save==True. Save name will be populated otherwise.
verbose : bool
If True then prints the location of the file saving, if attempted.
extension : str
The file extension to save as, .png, .pdf, etc. Values supported by matplotlib only.
"""
if scale == "log":
modes = np.abs(self.transform(to="modes").state)
modes[modes > 0.0] = np.log10(modes[modes > 0.0])
else:
modes = self.transform(to="modes").state
fig, ax = plt.subplots()
image = ax.imshow(modes, interpolation="none", aspect="auto")
# Custom colorbar values
fig.subplots_adjust(right=0.95)
divider = make_axes_locatable(ax)
cax = divider.append_axes("right", size=0.075, pad=0.1)
plt.colorbar(image, cax=cax)
if save or kwargs.get("filename", None):
extension = kwargs.get("extension", ".png")
filename = kwargs.get("filename", None) or self.filename(
extension=extension
)
# Create save name if one doesn't exist.
if filename.endswith(".h5"):
filename = "".join([filename.split(".h5")[0], extension])
# If filename is provided as an absolute path it overrides the value of 'directory'.
filename = os.path.abspath(os.path.join(filename))
if kwargs.get("verbose", False):
print("Saving figure to {}".format(filename))
plt.savefig(filename, bbox_inches="tight", pad_inches=0.05)
if show:
plt.show()
plt.close()
return None
def preconditioning_parameters(self):
"""
Parameters bundled for convenience
Returns
-------
tuple(tuple, tuple) :
Time and spatial parameters for usage in preconditioning
Notes
-----
It is often the case that rescaling one OrbitKS with respect to another OrbitKS's parameters is desired.
This allows calls like self.precondition(other.parameters), for example.
"""
return (self.t, self.n), (self.x, self.m)
[docs]
def precondition(self, **kwargs):
"""
Rescale a vector with the inverse (absolute value) of linear spatial terms
Parameters
----------
kwargs :
pmult : tuple of tuples
Parameters passed in the form self.preconditioning_parameters
pexp : tuple
Exponents for the parameter scaling, default (1, 4)
Returns
-------
target : OrbitKS
Return the OrbitKS instance, whose state and parameters have been modified by preconditioning.
Notes
-----
Often we want to precondition a state derived from a mapping or rmatvec (gradient descent step),
with respect to another orbit's (current state's) parameters. By passing parameters we can access the
cached classmethods.
I never preconditioned the spatial shift for relative periodic solutions so I don't include it here.
"""
pmult = kwargs.get("pmult", self.preconditioning_parameters())
p_multipliers = 1.0 / (
np.abs(temporal_frequencies(*pmult[0], order=1))
+ np.abs(spatial_frequencies(*pmult[1], order=2)[:, : self.shapes()[2][1]])
+ spatial_frequencies(*pmult[1], order=4)[:, : self.shapes()[2][1]]
)
preconditioned_state = np.multiply(self.state, p_multipliers)
# Precondition the change in T and L
pexp = kwargs.get("pexp", (1, 4))
if not self.constraints["t"]:
# self is the orbit being preconditioned, i.e. the correction orbit; by default this is dt = dt / T
t = self.t * (pmult[0][0] ** -pexp[0])
else:
t = self.t
if not self.constraints["x"]:
# self is the orbit being preconditioned, i.e. the correction orbit; by default this is dx = dx / L^4
x = self.x * (pmult[1][0] ** -pexp[1])
else:
x = self.x
return self.__class__(
**{
**vars(self),
"state": preconditioned_state,
"basis": "modes",
"parameters": (t, x, self.s),
}
)
def preconditioner(self, **kwargs):
"""
Returns a diagonal preconditioner as a scipy LinearOperator instance
Parameters
----------
kwargs : dict
Any keyword arguments that are accepted by precondition.
Returns
-------
LinearOperator :
An object with callable methods that return matrix-vector products, for `v=vector` and `A=LinearOperator`,
A.matvec(v) and A.rmatvec(v) compute $A * v$ and $A^T * v$, respectively. Here the LinearOperator
approximates the inverse of the operator that defines the linear component of the KSe
"""
# To get the diagonal preconditioner, can apply preconditioning to an array of 1's, returning the multipliers.
# The orbit vector of this instance represents the diagonal of a diagonal preconditioning matrix.
multipliers = self.__class__(
**{**vars(self), "state": np.ones(self.shape), "parameters": (1, 1, 1)}
).precondition(**{"pmult": self.preconditioning_parameters(), **kwargs})
diag_M = np.concatenate(
(
multipliers.state.ravel(),
tuple(
p
for p, constr in zip(
multipliers.parameters, self.constraints.values()
)
if not constr
),
)
).reshape(-1, 1)
def matvec_(v):
# v is an orbit vector,
nonlocal diag_M
return v * diag_M.reshape(v.shape)
def matmat_(V):
# V is a matrix (M, K), this computes broadcasting via (M, 1)*(M, K)
return diag_M.reshape(-1, 1) * V
# rmatvec = matvec because diagonal
return LinearOperator(
shape=(diag_M.size, diag_M.size),
matvec=matvec_,
rmatvec=matvec_,
matmat=matmat_,
rmatmat=matmat_,
)
[docs]
def reflection(self, axis=1, signed=True):
"""
Reflect the velocity field about the spatial midpoint
Returns
-------
OrbitKS :
OrbitKS whose state is the reflected velocity field -u(L-x,t).
Notes
-----
The correction provided by np.roll is due to the location of the reflection axis in the field array.
Numerically it is located at the (self.m//2 + 1)-th element while intuitively it should be "between"
self.m//2 + 1 and self.m//2. Simply a consequence of implementation.
"""
# Different points in space represented by columns of the state array
reflected_field = -1.0 * np.roll(
np.fliplr(self.transform(to="field").state), 1, axis=axis
)
return self.__class__(
**{
**vars(self),
"state": reflected_field,
"basis": "field",
"parameters": (self.t, self.x, -1 * self.s),
}
).transform(to=self.basis)
[docs]
def roll(self, shift, axis=0):
"""
Apply numpy roll along specified axis.
Parameters
----------
shift : int or tuple of int
Number of discrete points to rotate by
axis : int or tuple of int
The numpy ndarray axes along which to roll
Returns
-------
OrbitKS :
Instance with rolled state
"""
return self.__class__(
**{
**vars(self),
"state": np.roll(self.transform(to="field").state, shift, axis=axis),
}
).transform(to=self.basis)
[docs]
def cell_shift(self, n_cell, axis=0):
"""
Rotate by fraction of the period in either axis; nearest discrete approximate is taken.
Parameters
----------
n_cell : int or array-like
Fraction of the domain to rotate by.
axis : int or array-like
The numpy ndarray axes along which to roll
Returns
-------
OrbitKS :
Instance with rolled state
Notes
-----
If provided tuples of ints, shift and axis need to be the same length as to be coherent.
"""
# To allow slicing, discretization temporarily cast as array.
return (
self.transform(to="field")
.roll(
np.sign(n_cell)
* np.array(self.discretization)[np.array(axis)]
// np.abs(n_cell),
axis=axis,
)
.transform(to=self.basis)
)
[docs]
def rotate(self, distance, axis=0, units="plotting"):
"""
Rotate the velocity field in either space or time.
Parameters
----------
distance : float
The rotation / translation amount, in dimensionless units of time or space.
axis : int
The axis of the ndarray (state) that rotations
units : str
Determines the spatial units of the provided rotation
Returns
-------
OrbitKS :
OrbitKS whose field has been rotated.
Notes
-----
Due to periodic boundary conditions, translation is equivalent to rotation on a fundamental level here.
Hence the use of 'distance' instead of 'angle'. This can be negative. Also due to the periodic boundary
conditions, a distance equaling the entire domain length is equivalent to no rotation. I.e.
the rotation is always modulo L or modulo T.
The orbit only remains a converged solution if rotations coincide with collocation
points. i.e. multiples of L / M and T / N. The reason for this is because arbitrary rotations require
interpolation of the field.
Rotation breaks discrete symmetry and destroys the solution. Users encouraged to change to OrbitKS first.
"""
if axis == 0:
orbit_to_rotate = self.transform(to="modes")
# angle to rotate by
thetaj = (
distance
* temporal_frequencies(self.t, self.n, 1)[
1 : -(orbit_to_rotate.n // 2 - 1), :
]
)
cosinej = np.cos(thetaj)
sinej = np.sin(thetaj)
# Splitting into real and imaginary components of temporal modes
modes_time_real = orbit_to_rotate.state[
1 : -(orbit_to_rotate.n // 2 - 1), :
]
modes_time_imaginary = orbit_to_rotate.state[
-(orbit_to_rotate.n // 2 - 1) :, :
]
# Elementwise product to account for matrix product with "2-D" rotation matrix
rotated_real = cosinej * modes_time_real + sinej * modes_time_imaginary
rotated_imag = -sinej * modes_time_real + cosinej * modes_time_imaginary
time_rotated_modes = np.concatenate(
(orbit_to_rotate.state[None, 0, :], rotated_real, rotated_imag),
axis=0,
)
return self.__class__(
**{**vars(self), "state": time_rotated_modes, "basis": "modes"}
).transform(to=self.basis)
else:
if units == "plotting":
# conversion from plotting units.
distance = distance * 2 * pi * np.sqrt(2)
orbit_to_rotate = self.transform(to="spatial_modes")
# angles to rotate by
thetak = (
distance
* spatial_frequencies(self.x, self.m, 1)[
:, : -(orbit_to_rotate.m // 2 - 1)
]
)
cosinek = np.cos(thetak)
sinek = np.sin(thetak)
# Rotation performed on spatial modes because otherwise rotation is ill-defined for Antisymmetric and
# Shift-reflection symmetric Orbits.
spatial_modes_real = orbit_to_rotate.state[
:, : -(orbit_to_rotate.m // 2 - 1)
]
spatial_modes_imaginary = orbit_to_rotate.state[
:, -(orbit_to_rotate.m // 2 - 1) :
]
rotated_real = (
cosinek * spatial_modes_real + sinek * spatial_modes_imaginary
)
rotated_imag = (
-sinek * spatial_modes_real + cosinek * spatial_modes_imaginary
)
rotated_spatial_modes = np.concatenate((rotated_real, rotated_imag), axis=1)
return self.__class__(
**{
**vars(self),
"state": rotated_spatial_modes,
"basis": "spatial_modes",
}
).transform(to=self.basis)
[docs]
def shift_reflection(self):
"""
Return a OrbitKS with shift-reflected velocity field
Returns
-------
OrbitKS :
OrbitKS with shift-reflected velocity field
Notes
-----
Shift reflection in this case is a composition of spatial reflection and temporal translation by
half of the period. Because these are in different dimensions these operations commute.
"""
shift_reflected_field = np.roll(
-1.0 * np.roll(np.fliplr(self.transform(to="field").state), 1, axis=1),
self.n // 2,
axis=0,
)
return self.__class__(
**{**vars(self), "state": shift_reflected_field, "basis": "field"}
).transform(to=self.basis)
def group_orbit(self, **kwargs):
"""
Group orbit generator.
Yields
------
An instance which is an element of the (discrete or continuous) group orbit (equivariance) of the
current instance.
"""
if kwargs.get("discrete", False):
# The discrete symmetry operations which preserve reflection symmetry axis. Spatial only.
for g_space in (
self,
self.reflection(),
self.cell_shift(2, axis=1),
self.cell_shift(2, axis=1).reflection(),
):
# Half cell shifts in time to account for shift-reflection invariant orbits
for g in [g_space, g_space.cell_shift(2, axis=0)]:
if kwargs.get("fundamental_domain", False):
yield g.to_fundamental_domain()
else:
yield g
elif kwargs.get("continuous", False):
rolls = kwargs.get("rolls", (1, 1))
for N in range(0, self.n, rolls[0]):
for M in range(0, self.m, rolls[1]):
if kwargs.get("fundamental_domain", False):
yield self.roll(N, axis=0).roll(
M, axis=1
).to_fundamental_domain()
else:
yield self.roll(N, axis=0).roll(M, axis=1)
else:
# Don't need cell shifts, these are within the rotations. Arbitrary rotations require interpolation;
# only roll preserves orbit's status as a solution.
rolls = kwargs.get("rolls", (1, 1))
for g in [self, self.reflection()]:
for N in range(0, g.n, rolls[0]):
for M in range(0, g.m, rolls[1]):
if kwargs.get("fundamental_domain", False):
yield g.roll(N, axis=0).roll(
M, axis=1
).to_fundamental_domain()
else:
yield g.roll(N, axis=0).roll(M, axis=1)
[docs]
def shapes(self):
"""
State array shapes in different bases; determined by symmetry selection rules.
"""
return (
(self.n, self.m),
(self.n, self.m - 2),
(max([self.n - 1, 1]), self.m - 2),
)
[docs]
def dimensions(self):
"""
Tile dimensions.
"""
return self.t, self.x
[docs]
def plotting_dimensions(self):
"""
Dimensions according to plot labels; used in clipping.
"""
return (0.0, self.t), (0.0, self.x / (2 * pi * np.sqrt(2)))
[docs]
def _pad(self, size, axis=0, **kwargs):
"""
Increase the size of the discretization via zero-padding
Parameters
----------
size : int
The new size of the discretization, must be an even integer
larger than the current size of the discretization (handled by resize method).
axis : int
The dimension of the state that will be padded.
Returns
-------
OrbitKS :
OrbitKS instance with larger discretization.
Notes
-----
Need to account for the normalization factors by multiplying by old, dividing by new.
"""
modes = self.transform(to="modes")
if np.mod(size, 2):
raise ValueError(
"New discretization size must be an even number, preferably a power of 2"
)
else:
if axis == 0:
# Due to formatting, can prepend and append zeros to second half as opposed to appending
# to first and second halves.
padding = (size - modes.n) // 2
padding_tuple = ((padding, padding), (0, 0))
padded_modes = np.concatenate(
(
modes.state[: -(modes.n // 2 - 1), :],
np.pad(modes.state[-(modes.n // 2 - 1) :, :], padding_tuple),
),
axis=0,
)
padded_modes *= np.sqrt(size / modes.n)
return self.__class__(
**{
**vars(self),
"state": padded_modes,
"basis": "modes",
"discretization": (size, self.m),
}
).transform(to=self.basis)
else:
padding = (size - modes.m) // 2
padding_tuple = ((0, 0), (padding, padding))
padded_modes = np.concatenate(
(
modes.state[:, : -(modes.m // 2 - 1)],
np.pad(modes.state[:, -(modes.m // 2 - 1) :], padding_tuple),
),
axis=1,
)
padded_modes *= np.sqrt(size / modes.m)
return self.__class__(
**{
**vars(self),
"state": padded_modes,
"basis": "modes",
"discretization": (self.n, size),
}
).transform(to=self.basis)
[docs]
def _truncate(self, size, axis=0):
"""
Decrease the size of the discretization via truncation
Parameters
----------
size : int
The new size of the discretization, must be an even integer
smaller than the current size of the discretization (handled by resize method).
axis : int
The dimension of the state that will be truncated.
Returns
-------
OrbitKS :
OrbitKS instance with smaller discretization.
Notes
-----
Need to account for the normalization factors by multiplying by old, dividing by new.
"""
modes = self.transform(to="modes")
if np.mod(size, 2):
raise ValueError(
"New discretization size must be an even number, preferably a power of 2"
)
else:
if axis == 0:
truncate_number = int(size // 2) - 1
# Split into real and imaginary components, truncate separately.
first_half = modes.state[: truncate_number + 1, :]
second_half = modes.state[
-(modes.n // 2 - 1) : -(modes.n // 2 - 1) + truncate_number, :
]
truncated_modes = np.sqrt(size / modes.n) * np.concatenate(
(first_half, second_half), axis=0
)
return self.__class__(
**{
**vars(self),
"state": truncated_modes,
"basis": "modes",
"discretization": (size, self.m),
}
).transform(to=self.basis)
else:
truncate_number = int(size // 2) - 1
# Split into real and imaginary components, truncate separately.
first_half = self.state[:, :truncate_number]
second_half = self.state[
:,
-(int(self.m // 2) - 1) : -(int(self.m // 2) - 1) + truncate_number,
]
truncated_modes = np.sqrt(size / modes.m) * np.concatenate(
(first_half, second_half), axis=1
)
return self.__class__(
**{
**vars(self),
"state": truncated_modes,
"basis": "modes",
"discretization": (self.n, size),
}
).transform(to=self.basis)
[docs]
def preprocess(self):
"""
Check whether the orbit converged to an equilibrium or close-to-zero solution
"""
# Take the L_2 norm of the field, if uniformly close to zero, the magnitude will be very small.
field_orbit = self.transform(to="field")
# Equilibrium is defined by having no temporal variation, i.e. time derivative is a uniformly zero.
if self.t == 0.0:
# If there is sufficient evidence that solution is an equilibrium, change its class
# store T just in case we want to refer to what the period was before conversion to EquilibriumOrbitKS
return EquilibriumOrbitKS(
state=field_orbit.state, basis="field", parameters=self.parameters
).transform(to=self.basis)
# See if the L_2 norm is beneath a threshold value, if so, replace with zeros.
elif field_orbit.norm() < field_orbit.size * 10**-9:
return EquilibriumOrbitKS(
state=np.zeros(self.discretization),
basis="field",
parameters=self.parameters,
).transform(to=self.basis)
elif field_orbit.dt().transform(to="field").norm() < field_orbit.size * 10**-9:
# If there is sufficient evidence that solution is an equilibrium, change its class
# code = 3
# store T just in case we want to refer to what the period was before conversion to EquilibriumOrbitKS
return EquilibriumOrbitKS(
state=field_orbit.state, basis="field", parameters=self.parameters
).transform(to=self.basis)
else:
return self
[docs]
def to_h5(
self,
filename=None,
dataname=None,
h5mode="a",
verbose=False,
include_cost=True,
**kwargs,
):
"""
Export current state information to HDF5 file. See core.py for more details
Parameters
----------
filename : str or None
If None then filename method will be called.
dataname : str or None
h5py group name for orbit
h5mode : str
The writing mode, see core.py for details
verbose : bool
Whether or not to print save destination.
include_cost :
Whether to save cost to h5 file.
Notes
-----
Mainly an overload simply to get a different default behavior for include_cost.
"""
super().to_h5(
filename=filename,
dataname=dataname,
h5mode=h5mode,
verbose=verbose,
include_cost=include_cost,
**kwargs,
)
[docs]
@classmethod
def dimension_based_discretization(cls, dimensions, **kwargs):
"""
Return discretization size according to orbithunter conventions for the KSe.
Parameters
----------
dimensions : tuple
tuple containing (T, L) as first two entries of tuple (i.e. self.parameters or self.dimensions)
kwargs :
resolution : str
Takes values 'coarse', 'normal', 'fine', 'power'.
These options return the according discretization sizes as described below.
discretization : tuple of int or None
If one wants to force or specify a particular dimension's discretization, it can be provided
as, for example, discretization=(N, None).
Returns
-------
discretization : tuple of ints
The new spatiotemporal field discretization; number of time points
(rows) and number of space points (columns)
Notes
-----
This function should only ever be called by resize, the returned values can always be accessed by
the appropriate attributes of the rediscretized orbit.
"""
resolution = kwargs.get("resolution", "default")
t, x = dimensions
N, M = kwargs.get("discretization", (None, None))
if N is None:
if t == 0:
N = cls._default_shape()[0]
elif resolution == "coarse":
N = np.max([2 ** (int(np.log2(t) - 2)), 16])
elif resolution == "fine":
N = np.max([2 ** (int(np.log2(t) + 1)), 32])
elif resolution == "power":
N = np.max([2 * (int(4 * t ** (1.0 / 2.0)) // 2), 32])
else:
N = np.max([2 ** (int(np.log2(t) - 1)), 32])
N = max(N, cls.minimal_shape()[0])
if M is None:
if x == 0:
M = cls._default_shape()[1]
elif resolution == "coarse":
M = np.max([2 ** (int(np.log2(x) - 1)), 16])
elif resolution == "fine":
M = np.max([2 ** (int(np.log2(x) + 2)), 32])
elif resolution == "power":
M = np.max([2 * (int(4 * x ** (1.0 / 2.0)) // 2), 32])
else:
M = np.max([2 ** (int(np.log2(x) + 0.5)), 32])
M = max(M, cls.minimal_shape()[1])
return N, M
[docs]
@classmethod
def positive_indexing(cls):
"""
Indicates whether numpy indexing corresponds to increasing or decreasing values configuration space variable
Returns
-------
tuple :
A tuple of bool, one for each continuous dimension which indicates whether positively increasing
indices indicate the 'positive direction' in configuration space.
Notes
-----
This is mainly for compatibility for the "upwards" time convention in the field of Physics, used for the
Kuramoto-Sivashinsky equation. That is, the last element along the time axis, i.e. index = -1
corresponds to t=0 if False, t=T if True, where T is the temporal period of the Orbit.
"""
return False, True
[docs]
@classmethod
def _default_parameter_ranges(cls):
"""
Default parameter ranges.
Returns
-------
dict :
Keys are parameter labels, values are intervals to sample from in parameter generation.
"""
return {"t": (20, 200), "x": (20, 100)}
[docs]
@classmethod
def _default_constraints(cls):
"""
Defaults for whether or not parameters are constrained. parameter labels are forced to be constant.
"""
return {"t": False, "x": False, "s": True}
[docs]
def _populate_state(self, **kwargs):
"""
Initialize a set of random spatiotemporal Fourier modes
Parameters
----------
**kwargs
tscale : int
"Mean" for temporal spectrum modulation
xscale : int
"Mean" for spatial spectrum modulation
xvar : float
"Variance" for temporal spectrum modulation
tvar : float
"Variance"for spatial spectrum modulation
seed : int
Value to seed random number generator with.
Returns
-------
self :
OrbitKS whose state has been modified to be a set of random Fourier modes.
Notes
-----
These are the initial condition generators that I find the most useful. If a different method is
desired, simply pass the array as 'state' variable to __init__.
By initializing the shape parameters and orbit parameters, the other properties get initialized, so
they can be referenced in what follows (). I am unsure whether or not
this is bad practice but they could be replaced by the corresponding tuples. The reason why this is avoided
is so this function generalizes to subclasses.
"""
spatial_modulation = kwargs.get("spatial_modulation", "laplace")
temporal_modulation = kwargs.get("temporal_modulation", "gaussian")
tscale = kwargs.get("tscale", int(np.round(self.t / 20.0)))
xscale = kwargs.get("xscale", int(np.round(self.x / (2 * pi * np.sqrt(2)))))
xvar = kwargs.get("xvar", max([np.sqrt(xscale), 1]))
tvar = kwargs.get("tvar", max([np.sqrt(tscale), 1]))
np.random.seed(kwargs.get("seed", None))
# also accepts discretization as kwarg
n, m = self.dimension_based_discretization(self.dimensions(), **kwargs)
if n < self.minimal_shape()[0] or m < self.minimal_shape()[1]:
warn_str = "\nminimum discretization requirements not met; methods may not work as intended."
warnings.warn(warn_str, RuntimeWarning)
self.discretization = n, m
# I think this is the easiest way to get symmetry-dependent Fourier mode arrays' shapes.
# power = 2 b.c. odd powers not defined for spacetime modes for discrete symmetries.
space_ = np.abs(
spatial_frequencies(2 * pi, self.m, 1)[:, : self.shapes()[2][1]]
).astype(int)
time_ = np.abs(temporal_frequencies(2 * pi, self.n, 1)).astype(int)
random_modes = np.random.randn(*self.shapes()[2])
# Anecdotal evidence shows that not enough space-time coupling produces traveling waves very often.
# For example, scaling the spatial and temporal modes with the same gaussian profile. Therefore,
# The following is added as a means of coupling space and time. It also mimics witnessed spectra.
xshift = kwargs.get("xshift", "sqrt")
if xshift == "sqrt":
xscale = xscale + np.sqrt(time_)
elif xshift == "log":
xscale = xscale + np.log(1 + time_)
elif xshift == "lin":
xscale = xscale + time_
elif isinstance(xshift, np.ndarray):
xscale = xscale + xshift
# Spatial then temporal modulation, done separately now.
original_mode_norm = np.linalg.norm(random_modes)
if spatial_modulation == "gaussian":
modulator = np.exp(-((space_ - xscale) ** 2) / (2 * xvar))
elif spatial_modulation == "laplace":
modulator = np.exp(-1.0 * np.abs(space_ - xscale) / np.sqrt(xvar))
elif spatial_modulation == "laplace_sqrt":
modulator = np.exp(-1.0 * np.sqrt(np.abs(space_ - xscale)) / np.sqrt(xvar))
elif spatial_modulation == "plateau_linear":
modulator = np.divide(
1, ((2 * pi * space_ / self.x) ** 2 - (2 * pi * space_ / self.x) ** 4)
) * np.ones(time_.shape)
modulator[space_ <= xscale] = 1
elif spatial_modulation == "exponential_linear":
modulator = np.exp(
(2 * pi * space_ / self.x) ** 2 - (2 * pi * space_ / self.x) ** 4
) * np.ones(time_.shape)
elif spatial_modulation == "flat_top":
modulator = np.exp(-((np.abs(space_ - xscale) / xvar) ** 5))
else:
modulator = np.ones(random_modes.shape)
modes = np.multiply(modulator, random_modes)
if temporal_modulation == "gaussian":
modulator = np.exp(-((time_ - tscale) ** 2) / (2 * tvar))
elif temporal_modulation == "laplace":
modulator = np.exp(-1.0 * np.abs(time_ - tscale) / np.sqrt(tvar))
elif temporal_modulation == "flat_top":
modulator = np.exp(-((np.abs(time_ - tscale) / tvar) ** 5))
elif temporal_modulation == "laplace_plateau":
modulator = np.exp(-1.0 * np.abs(time_ - tscale) / np.sqrt(tvar))
modulator[time_ < tscale] = 1
elif temporal_modulation == "truncate":
modulator = time_.copy()
modulator[time_ > tscale] = 0
modulator[time_ <= tscale] = 1
else:
modulator = np.ones(random_modes.shape)
modes = np.multiply(modulator, modes)
# Rescale
self.state = (original_mode_norm / np.linalg.norm(modes)) * modes
self.basis = "modes"
def _parse_state(self, state, basis, **kwargs):
"""
Instantiate state and infer shape of collocation grid from numpy array and basis
Parameters
----------
state : np.ndarray
The current spatiotemporal Orbit state.
basis : str
String taking value 'field', 'spatial_modes', 'modes'. Determines how discretization is inferred from
state array. If no state provided then this value is overwritten and None is set as the value.
**kwargs :
Unused, included so signature matches Orbit class.
"""
if isinstance(state, np.ndarray):
if len(state.shape) != 2:
raise ValueError('"state" array must be two-dimensional')
self.state = state
else:
self.state = np.array([], dtype=float).reshape(0, 0)
if self.size > 0:
# This is essentially the inverse of .shapes() method
if basis is None:
raise ValueError("basis must be provided when state is provided")
elif basis == "modes":
# N-1, M-2
n, m = self.shape[0] + 1, self.shape[1] + 2
elif basis == "field":
# N, M
n, m = self.shape
elif basis == "spatial_modes":
# N, M - 2
n, m = self.shape[0], self.shape[1] + 2
else:
raise ValueError(
'basis not recognized; must equal "field" or "spatial_modes", or "modes"'
)
if n < self.minimal_shape()[0] or m < self.minimal_shape()[1]:
warn_str = "\nminimum discretization requirements not met; methods may not work as intended."
warnings.warn(warn_str, RuntimeWarning)
self.basis = basis
self.discretization = n, m
else:
self.basis = None
self.discretization = None
def _eqn_linear_component(self, array=False):
"""
Linear component of the KSe.
Parameters
----------
array : bool
Whether to return np.ndarray or not.
Returns
-------
OrbitKS or np.ndarray :
Evaluation of the governing equations.
Notes
-----
Equal to u_t + u_xx + u_xxxx
"""
return (
self.dt(array=array)
+ self.dx(order=2, array=array)
+ self.dx(order=4, array=array)
)
def _nonlinear(self, other, array=False):
"""
Computation of the nonlinear term of the Kuramoto-Sivashinsky equation
Parameters
----------
other : OrbitKS
The second component of the nonlinear product.
array : bool
Whether to return a numpy array or OrbitKS instance
Returns
-------
np.ndarray or OrbitKS
Array or OrbitKS instance containing the nonlinear term of the KSE = 1/2 (u**2)_x
Notes
-----
The nonlinear product is the name given to the elementwise product in the field basis equivalent to the
convolution of spatiotemporal Fourier modes, the defining quality of a pseudospectral implementation.
The matrix vector product takes the form d_x (u * v), but the "normal" usage is d_x (u * u); in the latter
case 'other' should equal 'self', in the field basis.
"""
# Elementwise product, both self and other should be in physical field basis.
assert (self.basis == "field") and (other.basis == "field")
return 0.5 * (self * other).transform(to="modes").dx(array=array)
def _rnonlinear(self, other, array=False, **kwargs):
"""
Computation of the nonlinear term of the adjoint Kuramoto-Sivashinsky equation
Parameters
----------
other : OrbitKS
The second component of the nonlinear product
array : bool
Whether to return np.ndarray or OrbitKS.
Returns
-------
np.ndarray or OrbitKS :
Array or OrbitKS instance with -u*v_x as its state.
Notes
-----
The matrix-vector product comprised of adjoint Jacobian evaluated at 'self'
multiplied with the spatiotemporal modes from another orbit instance (typically the DAE modes)
elementwise/vectorized operation takes the form -u * v_x. self==u, other==v.
"""
assert self.basis == "field"
if array:
return -1.0 * (self * other.dx(return_basis="field")).transform(
to="modes", array=True
)
else:
# inplace is fine here because dx() is making a new array
return -1.0 * (self * other.dx(return_basis="field")).transform(to="modes")
def _rmatvec_parameters(self, self_field, other):
"""
Parameter values from product with partial derivatives
Parameters
----------
self_field : OrbitKS
The orbit in the field basis; this cuts down on redundant transforms.
other : OrbitKS
The adjoint/co-state variable Orbit instance.
Returns
-------
parameters : tuple
Set of parameters resulting from the last rows of the product with adjoint Jacobian.
"""
other_modes_in_vector_form = other.state.ravel()
if not self.constraints["t"]:
# partial derivative with respect to period times the adjoint/co-state variable state.
rmatvec_t = (-1.0 / self.t) * self.dt(array=True).ravel().dot(
other_modes_in_vector_form
)
else:
rmatvec_t = 0
if not self.constraints["x"]:
# change in L, dx, equal to DF/DL * v
rmatvec_x = (
(
(-2.0 / self.x) * self.dx(order=2, array=True)
+ (-4.0 / self.x) * self.dx(order=4, array=True)
+ (-1.0 / self.x) * self_field._nonlinear(self_field, array=True)
)
.ravel()
.dot(other_modes_in_vector_form)
)
else:
rmatvec_x = 0
return rmatvec_t, rmatvec_x, 0.0
def _rmatvec_linear_component(self, array=False):
"""
Linear component of the adjoint Jacobian-vector product
Parameters
----------
array : bool
Whether or not to return ndarray
Returns
-------
OrbitKS :
Linear component of the adjoint KSE
"""
return (
-1.0 * self.dt(array=array)
+ self.dx(order=2, array=array)
+ self.dx(order=4, array=array)
)
def _jac_lin(self):
"""
The linear component of the Jacobian matrix.
Returns
-------
OrbitKS or ndarray :
Linear component of the Jacobian matrix.
"""
return self._dt_matrix() + self._dx_matrix(order=2) + self._dx_matrix(order=4)
def _jac_nonlin(self):
"""
The nonlinear component of the Jacobian matrix.
Returns
-------
_jac_nonlin : matrix
Matrix which represents the nonlinear component of the Jacobian. The derivative of
the nonlinear term, which is
(D/DU) 1/2 d_x (u .* u) = (D/DU) 1/2 d_x F (diag(F^-1 u)^2) = d_x F( diag(F^-1 u) F^-1).
See
Chu, K.T. A direct matrix method for computing analytical Jacobians of discretized nonlinear
integro-differential equations. J. Comp. Phys. 2009 for details.
"""
_jac_nonlin_left = self._dx_matrix().dot(self._time_transform_matrix())
_jac_nonlin_middle = self._space_transform_matrix().dot(
np.diag(self.transform(to="field").state.ravel())
)
_jac_nonlin_right = self._inv_spacetime_transform_matrix()
_jac_nonlin = _jac_nonlin_left.dot(_jac_nonlin_middle).dot(_jac_nonlin_right)
return _jac_nonlin
def _jacobian_parameter_derivatives_concat(self, jac_):
"""
Compute and concatenate parameter partial derivative vectors to Jacobian matrix
Parameters
----------
jac_ : np.ndarray,
Jacobian matrix containined partial derivatives with respect to spatiotemporal modes.
Returns
-------
jac_ :
Jacobian augmented with parameter partial derivatives as columns. Required to solve for changes to period,
space period in optimization process.
"""
# If period is not fixed, need to include dF/dt in jacobian matrix
if not self.constraints["t"]:
time_period_derivative = (-1.0 / self.t) * self.dt(array=True).reshape(
-1, 1
)
jac_ = np.concatenate((jac_, time_period_derivative), axis=1)
# If spatial period is not fixed, need to include dF/dx in jacobian matrix
if not self.constraints["x"]:
self_field = self.transform(to="field")
spatial_period_derivative = (
(-2.0 / self.x) * self.dx(order=2, array=True)
+ (-4.0 / self.x) * self.dx(order=4, array=True)
+ (-1.0 / self.x) * self_field._nonlinear(self_field, array=True)
)
jac_ = np.concatenate(
(jac_, spatial_period_derivative.reshape(-1, 1)), axis=1
)
return jac_
def _dx_matrix(self, order=1, computation_basis="modes"):
"""
The spatial derivative matrix operator for the current state.
Parameters
----------
order :int
The order of the derivative.
computation_basis : str
The basis in which to produce the operator. 'modes' or 'spatial_modes'.
Returns
----------
spacetime_dxn : ndarray
The operator whose matrix-vector product with spatiotemporal
Fourier modes is equal to the time derivative. *Only used in
the construction of the Jacobian operator.
"""
# Coefficients which depend on the order of the derivative, see SO(2) generator of rotations for reference.
if computation_basis == "spatial_modes":
# else use time discretization size.
spacetime_dxn = np.kron(np.eye(self.n), dxn_block(self.x, self.m, order))
else:
# When the dimensions of dxn_block are the same as the mode tensor, the slicing does nothing.
spacetime_dxn = np.kron(
np.eye(self.shapes()[2][0]),
dxn_block(self.x, self.m, order)[
: self.shapes()[2][1], : self.shapes()[2][1]
],
)
return spacetime_dxn
def _dt_matrix(self, order=1):
"""
The time derivative matrix operator for the current state.
Parameters
----------
order :int
The order of the derivative.
Returns
----------
spacetime_dtn : ndarray
The operator whose matrix-vector product with spatiotemporal
Fourier modes is equal to the time derivative. *Only used in
the construction of the Jacobian operator.
"""
# Coefficients which depend on the order of the derivative, see SO(2) generator of rotations for reference.
# Zeroth frequency was not included in frequency vector.
dtn_matrix = block_diag([[0]], dtn_block(self.t, self.n, order))
# Take kronecker product to account for the number of spatial modes.
spacetime_dtn = np.kron(dtn_matrix, np.eye(self.shapes()[2][1]))
return spacetime_dtn
def _inv_spacetime_transform_matrix(self):
"""
Inverse Space-time Fourier transform operator
Returns
-------
ndarray :
Matrix operator whose action maps a set of spatiotemporal modes into a physical field u(x,t)
Notes
-----
Only used for the construction of the Jacobian matrix. Do not use this for the Fourier transform.
"""
return np.dot(
self._inv_space_transform_matrix(), self._inv_time_transform_matrix()
)
def _spacetime_transform_matrix(self):
"""
Space-time Fourier transform operator
Returns
-------
np.ndarray :
Matrix operator whose action maps a physical field u(x,t) into a set of spatiotemporal modes.
Notes
-----
Only used for the construction of the Jacobian matrix. Do not use this for the Fourier transform.
"""
return np.dot(self._time_transform_matrix(), self._space_transform_matrix())
def _time_transform(self, array=False, inplace=False):
"""
Temporal Fourier transform
Parameters
----------
array : bool
Whether or not to return a numpy array. Used for efficiency/avoiding construction of redundant
Orbit instances.
Returns
-------
OrbitKS :
OrbitKS whose state is in the spatial Fourier mode basis.
"""
if inplace:
# Take rfft, accounting for unitary normalization.
self.state = rfft(
self.state, workers=self.workers, norm="ortho", axis=0, overwrite_x=True
)
self.state = np.concatenate(
(self.state.real[:-1, :], self.state.imag[1:-1, :]), axis=0
)
self.state[1:, :] = np.sqrt(2) * self.state[1:, :]
self.basis = "modes"
if array:
return self.state
else:
return self
else:
# Take rfft, accounting for unitary normalization.
modes = rfft(self.state, workers=self.workers, norm="ortho", axis=0)
modes = np.concatenate((modes.real[:-1, :], modes.imag[1:-1, :]), axis=0)
modes[1:, :] = np.sqrt(2) * modes[1:, :]
if array:
return modes
else:
return self.__class__(
**{**vars(self), "state": modes, "basis": "modes"}
)
def _inv_time_transform(self, array=False, inplace=False):
"""
Temporal Fourier transform
Parameters
----------
array : bool
Whether or not to return a numpy array. Used for efficiency/avoiding construction of redundant
Orbit instances.
Returns
-------
OrbitKS or ndarray
OrbitKS whose state is in the temporal Fourier mode basis or the corresponding array.
"""
if inplace:
self.state = self.state.astype(complex)
self.state[1 : -max([int(self.n // 2) - 1, 1]), :] += (
1j * self.state[-max([int(self.n // 2) - 1, 1]) :, :]
)
# if first statement evaluates to zero, then take all rows of
# axis by providing None as end of slice.
n_rows = (-max([int(self.n // 2) - 1, 1]) + 1) or None
self.state = self.state[:n_rows, :]
self.state[1:, :] /= np.sqrt(2)
if self.state.shape[0] == 1:
self.state = np.pad(self.state, ((0, 1), (0, 0)))
self.state[-1, :] = 0
self.state = irfft(
self.state, workers=self.workers, norm="ortho", axis=0, overwrite_x=True
)
self.basis = "spatial_modes"
if array:
return self.state
else:
return self
else:
# Take rfft, accounting for unitary normalization.
modes = self.state.astype(complex)
modes[1 : -max([int(self.n // 2) - 1, 1]), :] += (
1j * modes[-max([int(self.n // 2) - 1, 1]) :, :]
)
# if first statement evaluates to zero, then take all rows of
# axis by providing None as end of slice.
n_rows = (-max([int(self.n // 2) - 1, 1]) + 1) or None
modes = modes[:n_rows, :]
modes[1:, :] /= np.sqrt(2)
if modes.shape[0] == 1:
modes = np.pad(modes, ((0, 1), (0, 0)))
modes[-1, :] = 0
space_modes = irfft(modes, workers=self.workers, norm="ortho", axis=0)
if array:
return space_modes
else:
return self.__class__(
**{**vars(self), "state": space_modes, "basis": "spatial_modes"}
)
def _space_transform(self, array=False, inplace=False):
"""
Spatial Fourier transform
Parameters
----------
array : bool
Whether to return numpy array or not.
Returns
-------
OrbitKS or np.ndarray:
OrbitKS instance in the physical field basis or corresponding array.
"""
if inplace:
# Take rfft, accounting for unitary normalization.
self.state = (
np.sqrt(2)
* rfft(
self.state,
workers=self.workers,
norm="ortho",
axis=1,
overwrite_x=True,
)[:, 1:-1]
)
self.state = np.concatenate((self.state.real, self.state.imag), axis=1)
self.basis = "spatial_modes"
if array:
return self.state
else:
return self
else:
# Take rfft, accounting for unitary normalization.
spatial_modes = (
np.sqrt(2)
* rfft(self.state, workers=self.workers, norm="ortho", axis=1)[:, 1:-1]
)
spatial_modes = np.concatenate(
(spatial_modes.real, spatial_modes.imag), axis=1
)
if array:
return spatial_modes
else:
return self.__class__(
**{**vars(self), "state": spatial_modes, "basis": "spatial_modes"}
)
def _inv_space_transform(self, array=False, inplace=False):
"""
Inverse spatial Fourier transform
Parameters
----------
array : bool
Whether to return numpy array or not.
Returns
-------
OrbitKS or np.ndarray:
OrbitKS instance in the physical field basis or corresponding array.
"""
if inplace:
# Do the transform inplace; do not need to change other attributes other than basis; discretization
# attribute is the shape in 'field' basis NOT current basis so it remains constant.
self.state = self.state.astype(complex)
self.state = (
self.state[:, : -(int(self.m // 2) - 1)]
+ 1j * self.state[:, -(int(self.m // 2) - 1) :]
)
# Re-add the zeroth and Nyquist spatial frequency modes (zeros) and then transform back
z = np.zeros([self.n, 1])
self.state = (1.0 / np.sqrt(2)) * irfft(
np.concatenate((z, self.state, z), axis=1),
workers=self.workers,
norm="ortho",
axis=1,
overwrite_x=True,
)
self.basis = "field"
if array:
return self.state
else:
return self
else:
# Make the modes complex valued again.
complex_modes = (
self.state[:, : -(int(self.m // 2) - 1)]
+ 1j * self.state[:, -(int(self.m // 2) - 1) :]
)
# Re-add the zeroth and Nyquist spatial frequency modes (zeros) and then transform back
z = np.zeros([self.n, 1])
field = (1.0 / np.sqrt(2)) * irfft(
np.concatenate((z, complex_modes, z), axis=1),
workers=self.workers,
norm="ortho",
axis=1,
)
if array:
return field
else:
return self.__class__(
**{**vars(self), "state": field, "basis": "field"}
)
def _space_transform_matrix(self):
"""
Spatial Fourier transform operator
Returns
-------
matrix :
Matrix operator whose action maps a physical field u(x,t) into a set of spatial Fourier modes.
Notes
-----
Only used for the construction of the Jacobian matrix. Do not use this for the Fourier transform.
"""
dft_mat = rfft(
np.eye(self.m), workers=self.workers, norm="ortho", axis=0, overwrite_x=True
)[1:-1, :]
space_dft_mat = np.sqrt(2) * np.concatenate(
(dft_mat.real, dft_mat.imag), axis=0
)
return np.kron(np.eye(self.n), space_dft_mat)
def _time_transform_matrix(self):
"""
Inverse Time Fourier transform operator
Returns
-------
matrix :
Matrix operator whose action maps a set of spatiotemporal modes into a set of spatial modes
Notes
-----
Only used for the construction of the Jacobian matrix. Do not use this for the Fourier transform.
"""
dft_mat = rfft(
np.eye(self.n), workers=self.workers, norm="ortho", axis=0, overwrite_x=True
)
time_dft_mat = np.concatenate(
(dft_mat[:-1, :].real, dft_mat[1:-1, :].imag), axis=0
)
time_dft_mat[1:, :] = np.sqrt(2) * time_dft_mat[1:, :]
return np.kron(time_dft_mat, np.eye(self.m - 2))
def _inv_time_transform_matrix(self):
"""
Time Fourier transform operator
Returns
-------
matrix :
Matrix operator whose action maps a set of spatial modes into a set of spatiotemporal modes.
Notes
-----
Only used for the construction of the Jacobian matrix. Do not use this for the Fourier transform.
"""
return self._time_transform_matrix().transpose()
def _inv_space_transform_matrix(self):
"""
Time Fourier transform operator
Returns
-------
matrix :
Matrix operator whose action maps a set of spatial modes into field basis.
Notes
-----
Only used for the construction of the Jacobian matrix. Do not use this for the Fourier transform.
"""
return self._space_transform_matrix().transpose()
def _inv_spacetime_transform(self, array=False, inplace=False):
"""
Inverse space-time Fourier transform
Parameters
----------
array : bool
Whether to return numpy array or not.
Returns
-------
OrbitKS or np.ndarray:
OrbitKS instance in the physical field basis or corresponding array.
"""
if array:
return (
self._inv_time_transform(inplace=inplace)
._inv_space_transform(inplace=inplace)
.state
)
else:
return self._inv_time_transform(inplace=inplace)._inv_space_transform(
inplace=inplace
)
def _spacetime_transform(self, array=False, inplace=False):
"""
Space-time Fourier transform
Parameters
----------
array : bool
Whether to return numpy array or not.
Returns
-------
OrbitKS or np.ndarray:
OrbitKS instance in the physical field basis or corresponding array.
"""
if array:
return (
self._space_transform(inplace=inplace)
._time_transform(inplace=inplace)
.state
)
else:
# Return transform of field
return self._space_transform(inplace=inplace)._time_transform(
inplace=inplace
)
[docs]
class RelativeOrbitKS(OrbitKS):
[docs]
def __init__(
self,
state=None,
basis=None,
parameters=None,
discretization=None,
constraints=None,
frame="comoving",
**kwargs,
):
"""
Same as OrbitKS except for setting the 'frame' attribute.
"""
self.frame = frame
super().__init__(
state=state,
basis=basis,
parameters=parameters,
discretization=discretization,
constraints=constraints,
**kwargs,
)
[docs]
def periodic_dimensions(self):
"""
Bools indicating whether or not dimension is periodic for persistent homology calculations.
Returns
-------
Notes
-----
Static for base class, however for relative periodic solutions this can be dependent on the frame/slice the
state is in.
"""
if self.frame == "comoving":
return True, True
else:
return False, True
[docs]
def dt(self, order=1, array=False, **kwargs):
"""
A time derivative of the current state.
Parameters
----------
order :int
The order of the derivative.
array : bool
Whether to return np.ndarray or Orbit instance
Returns
----------
orbit_dtn : OrbitKS or subclass instance
The class instance whose state is the time derivative in the spatiotemporal mode basis.
"""
if self.frame == "comoving":
return super().dt(order, array=array, **kwargs)
else:
raise ValueError(
"Attempting to compute time derivative of "
+ str(self)
+ " in physical reference frame."
)
[docs]
def matvec(self, other, **kwargs):
"""
Extension of parent class method
Parameters
----------
other : RelativeOrbitKS
RelativeOrbitKS instance whose state represents the vector in the matrix-vector multiplication.
Returns
-------
RelativeOrbitKS
RelativeOrbitKS whose state and other parameters result from the matrix-vector product.
Notes
-----
Equivalent to computation of (v_t + v_xx + v_xxxx + phi * v_x) + d_x (u .* v)
The additional term phi * v_x is in the linear portion of the equation, meaning that
we can calculate it and add it to the rest of the mapping a posteriori.
"""
assert (self.basis == "modes") and (other.basis == "modes")
matvec_orbit = super().matvec(other)
# this is needed unless all parameters are fixed, but that isn't ever a realistic choice.
self_dx = self.dx(array=True)
if not self.constraints["t"]:
matvec_orbit += other.t * (-1.0 / self.t) * (-self.s / self.t) * self_dx
if not self.constraints["x"]:
# Derivative of mapping with respect to T is the same as -1/T * u_t
matvec_orbit += other.x * (-1.0 / self.x) * (-self.s / self.t) * self_dx
if not self.constraints["s"]:
# technically could do self_comoving / self.s but this can be numerically unstable when self.s is small
matvec_orbit += other.s * (-1.0 / self.t) * self_dx
return matvec_orbit
[docs]
def jacobian(self, **kwargs):
"""
Jacobian matrix evaluated at the current state.
kwargs :
Unused, included to match signature in Orbit class.
Returns
-------
jac_ : 2-d ndarray
Jacobian matrix whose columns are derivatives with respect to all unconstrained state variables;
including periods. Has dimensions dependent on number of spatiotemporal modes and free parameters,
(self.shapes()[-1].size, self.shapes()[-1].size + n_params)
Jacobian matrix of the KSe where n_params = 2 - sum(self.constraints)
Notes
-----
Original implementation was pretty, but very inefficient. This now computes the Jacobian matrix,
minimizing the amount of allocated memory by overwriting and performing matrix-free implementations of
Fourier transform matrix operations. Computes the Jacobian
$J = D_t + D_xx + D_xxxx + F_t D_x F_x Diag(u) F_x^{-1} F_t^{-1}$ in the following steps:
"""
assert self.basis == "modes"
field_size, smode_size, mode_size = (np.prod(shp) for shp in self.shapes())
# Begin with nonlinear term. Apply matrix operators in matrix-free fashion. begin with diag(u)
J = np.diag(self.transform(to="field", array=True).ravel()).reshape(-1, self.m)
# By creatively reshaping J, can apply FFTs to 3-d tensor.
J = self.__class__(
**{**vars(self), "state": J, "basis": "field", "discretization": J.shape}
).transform(to="spatial_modes", array=True, inplace=True)
J = J.reshape(field_size, smode_size).T.reshape(-1, self.m)
# After transforming the columns, transpose and transform again to get F_x Diag(u) F_x^{-1}
J = self.__class__(
**{**vars(self), "state": J, "basis": "field", "discretization": J.shape}
)
# Reshape back into a matrix, and then into another 3-d tensor after transposing back, to take derivative.
J = J.transform(to="spatial_modes", array=True, inplace=True).reshape(
smode_size, smode_size
)
J = J.T.reshape(-1, self.m - 2)
J = self.__class__(
**{
**vars(self),
"state": J,
"basis": "spatial_modes",
"discretization": (J.shape[0], self.m),
}
)
J = J.dx(array=True, computation_basis="spatial_modes")
# At this point J represents D_x F_x Diag(u) F_x^{-1}; reshape into 3-d tensor again and apply
# time transforms
J = J.reshape(-1, smode_size).T.reshape(self.n, self.m - 2, -1)
J = self.__class__(
**{
**vars(self),
"state": J,
"discretization": (*self.discretization, J.shape[-1]),
"basis": "spatial_modes",
}
).transform(to="modes", array=True, inplace=True)
# reshape, transpose and transform.
J = (
J.reshape((*self.shapes()[2], -1))
.reshape(-1, smode_size)
.T.reshape(self.n, self.m - 2, -1)
)
J = self.__class__(
**{
**vars(self),
"state": J,
"discretization": (*self.discretization, J.shape[-1]),
"basis": "spatial_modes",
}
).transform(to="modes", array=True, inplace=True)
J = J.reshape((*self.shapes()[2], -1)).reshape(J.shape[-1], J.shape[-1]).T
# Produce the linear term; spatial derivatives are diagonal; time is more complicated due to SO(2) operator.
e = np.ones(self.shapes()[2])
dx2 = self.__class__(state=e, basis="modes", parameters=self.parameters).dx(
order=2
)
J[np.diag_indices(J.shape[0])] += dx2.state.ravel() + dx2.state.ravel() ** 2
e = e.ravel()
# For time, get the correct frequency by taking the derivative of individual elements; swapping of real
# and imaginary components handled by .dt()
for i in range(J.shape[0]):
e *= 0
e[i] = 1.0
J[:, i] += (
self.__class__(
state=e.reshape(self.shape),
basis="modes",
parameters=self.parameters,
)
.dt()
.state.ravel()
)
J[:, i] += (-self.s / self.t) * self.__class__(
state=e.reshape(self.shape), basis="modes", parameters=self.parameters
).dx().state.ravel()
J = self._jacobian_parameter_derivatives_concat(J)
return J
[docs]
def change_reference_frame(self, frame):
"""
Transform to (or from) the co-moving frame depending on the current reference frame
Parameters
----------
Returns
-------
RelativeOrbitKS :
RelativeOrbitKS in transformed reference frame.
Notes
-----
This operation occurs in spatial Fourier mode basis because I feel its more straightforward to understand
a time parameterized shift of spatial modes; this is the consistent approach given how the shifts are
calculated.
Spatiotemporal modes are not designed to be used in the physical reference frame due to Gibbs' phenomenon
due to discontinuity. Physical reference frame should really only be used to plot the relative periodic field.
"""
# shift is ALWAYS stored as the shift amount from comoving to physical frame.
if frame == "comoving":
if self.frame == "physical":
shift = -1.0 * self.s
else:
return self
elif frame == "physical":
if self.frame == "comoving":
shift = self.s
else:
return self
else:
raise ValueError("Trying to change to unrecognizable reference frame.")
spatial_modes = self.transform(to="spatial_modes").state
time_vector = np.flipud(
np.linspace(0, self.t, num=self.n, endpoint=True)
).reshape(-1, 1)
translation_per_period = shift / self.t
time_dependent_translations = translation_per_period * time_vector
thetak = (
time_dependent_translations
* spatial_frequencies(self.x, self.m, 1)[:, : -int(self.m // 2 - 1)]
)
cosinek = np.cos(thetak)
sinek = np.sin(thetak)
real_modes = spatial_modes[:, : -(int(self.m // 2) - 1)]
imag_modes = spatial_modes[:, -(int(self.m // 2) - 1) :]
frame_rotated_spatial_modes_real = cosinek * real_modes + sinek * imag_modes
frame_rotated_spatial_modes_imag = -sinek * real_modes + cosinek * imag_modes
frame_rotated_spatial_modes = np.concatenate(
(frame_rotated_spatial_modes_real, frame_rotated_spatial_modes_imag), axis=1
)
rotated_orbit = self.__class__(
**{
**vars(self),
"state": frame_rotated_spatial_modes,
"basis": "spatial_modes",
"frame": frame,
}
)
return rotated_orbit.transform(to=self.basis)
# def orbit_vector(self):
# """
# Vector which completely describes the orbit.
#
# """
# return np.concatenate(
# (
# self.state.reshape(-1, 1),
# np.array([[float(self.t)]]),
# np.array([[float(self.x)]]),
# np.array([[float(self.s)]]),
# ),
# axis=0,
# )
[docs]
def populate(self, attr="all", **kwargs):
"""
Randomly initialize parameters which are currently zero.
Parameters
----------
attr : str
Takes values 'all', 'parameters', or 'state'. Determines what is populated.
kwargs :74
p_ranges : dict
keys are parameter_labels, values are uniform sampling intervals or iterables to sample from
Returns
-------
Orbit :
self with new values for parameters or state or both.
"""
# Can only initialize spatial shift if both the shift and the parameters have been instantiated.
if attr in ["all", "parameters"] and self.size > 0 and self.s == 0.0:
shift = self.calculate_spatial_shift(**kwargs)
else:
shift = 0
# This will first replace any None valued parameters (or if parameters itself is None)
super().populate(attr=attr, **kwargs)
# For chaining operations.
parameters_with_shift = tuple(
shift if label == "s" else val
for label, val in zip(self.parameter_labels(), self.parameters)
)
setattr(self, "parameters", parameters_with_shift)
return self
[docs]
def calculate_spatial_shift(self, **kwargs):
"""
Calculate the phase difference between the spatial modes at t=0 and t=T
Parameters
----------
kwargs :
n_modes : int
Number of spatial modes to use in the phase calculation.
Returns
-------
shift : float
The best approximation for physical->comoving shift for relative periodic solutions.
"""
spatial_modes = self.transform(to="spatial_modes").state
m0 = spatial_modes.shape[1] // 2
modes_included = np.min([kwargs.get("n_modes", m0), m0])
if -m0 + modes_included == 0:
space_imag_slice_end = None
else:
space_imag_slice_end = -m0 + modes_included
# select the spatial modes at t=0 and t=T
modes_0 = np.concatenate(
(
spatial_modes[-1, :modes_included],
spatial_modes[-1, -m0:space_imag_slice_end],
)
).ravel()
modes_T = np.concatenate(
(
spatial_modes[0, :modes_included],
spatial_modes[0, -m0:space_imag_slice_end],
)
).ravel()
m = modes_T.size // 2
# This function is used very sparingly, extra imports kept in this scope only.
# Warnings come from fsolve not converging; only want approximate guess as exact solution won't generally exist
from scipy.optimize import fsolve
# If they are close enough to the same point, then shift equals 0
if np.linalg.norm(modes_0 - modes_T) <= 10**-6:
shift = self.x / spatial_modes.shape[1]
else:
# Get guess shift from the angle between the vectors
shift_guess = (self.x / (2 * pi)) * float(
np.arccos(
(
np.dot(np.transpose(modes_T), modes_0)
/ (np.linalg.norm(modes_T) * np.linalg.norm(modes_0))
)
)
)
def fun_(shift_):
# find shift which minimizes the differences at the boundaries.
thetak = shift_ * ((2 * pi) / self.x) * np.arange(1, m + 1)
cosinek = np.cos(thetak)
sinek = np.sin(thetak)
rotated_real_modes_T = np.multiply(cosinek, modes_T[:-m]) + np.multiply(
sinek, modes_T[-m:]
)
rotated_imag_modes_T = np.multiply(-sinek, modes_T[:-m]) + np.multiply(
cosinek, modes_T[-m:]
)
rotated_modes = np.concatenate(
(rotated_real_modes_T, rotated_imag_modes_T)
)
return np.linalg.norm(modes_0 - rotated_modes)
# suppress fsolve's warnings that occur when it stalls; not expecting an exact answer anyway.
warnings.simplefilter(action="ignore", category=RuntimeWarning)
shift = fsolve(fun_, np.array(shift_guess))[0]
warnings.resetwarnings()
# because periodic boundary conditions take modulo; "overstretching" doesn't occur from physical limits.
shift = np.sign(shift) * np.mod(np.abs(shift), self.x)
if self.frame == "comoving":
return -shift
else:
return shift
[docs]
def preprocess(self):
"""
Check whether the orbit converged to an equilibrium or close-to-zero solution
"""
orbit_with_inverted_shift = self.__class__(
**{**vars(self), "parameters": (self.t, self.x, -self.s)}
)
cost_imported_S = self.cost()
cost_negated_S = orbit_with_inverted_shift.cost()
if cost_imported_S > cost_negated_S:
orbit_ = orbit_with_inverted_shift
else:
orbit_ = self.copy()
# Take the L_2 norm of the field, if uniformly close to zero, the magnitude will be very small.
field_orbit = orbit_.transform(to="field")
# See if the L_2 norm is beneath a threshold value, if so, replace with zeros.
if field_orbit.norm() < 10**-5 or self.t == 0:
return RelativeEquilibriumOrbitKS(
state=np.zeros(self.discretization),
basis="field",
parameters=self.parameters,
).transform(to=self.basis)
# Equilibrium is defined by having no temporal variation, i.e. time derivative is a uniformly zero.
elif self.t == 0.0:
# If there is sufficient evidence that solution is an equilibrium, change its class
# store T just in case we want to refer to what the period was before conversion to EquilibriumOrbitKS
return EquilibriumOrbitKS(
state=field_orbit.state, basis="field", parameters=self.parameters
).transform(to=self.basis)
elif field_orbit.dt().transform(to="field").norm() < 10**-5:
# If there is sufficient evidence that solution is an equilibrium, change its class
return RelativeEquilibriumOrbitKS(
state=self.transform(to="modes").state,
basis="modes",
parameters=self.parameters,
).transform(to=self.basis)
else:
return orbit_
[docs]
def from_fundamental_domain(self):
return self.change_reference_frame(frame="comoving")
[docs]
def to_fundamental_domain(self):
return self.change_reference_frame(frame="physical")
[docs]
@classmethod
def _default_parameter_ranges(cls):
"""
Default parameter ranges.
Returns
-------
dict :
Keys are parameter labels, values are intervals to sample from in parameter generation.
Notes
-----
The shift parameter is included so that it can be iterated over; only s=0 allowed because this is
determined by state, in the _populate_parameters method.
"""
return {"t": (20, 200), "x": (20, 100), "s": (0, 0)}
[docs]
@classmethod
def _default_constraints(cls):
return {"t": False, "x": False, "s": False}
def _eqn_linear_component(self, array=False):
"""
Linear component of the KSE
Parameters
----------
array : bool
Whether to return ndarray or not.
Returns
-------
ndarray or class instance.
"""
return (
self.dt(array=array)
+ self.dx(order=2, array=array)
+ self.dx(order=4, array=array)
- (self.s / self.t) * self.dx(array=array)
)
def _rmatvec_parameters(self, self_field, other):
other_modes = other.state.ravel()
self_dx_modes = self.dx(array=True)
if not self.constraints["t"]:
# Derivative with respect to T term equal to DF/DT * v
rmatvec_t = (-1.0 / self.t) * (
self.dt(array=True) + (-self.s / self.t) * self_dx_modes
).ravel().dot(other_modes)
else:
rmatvec_t = 0
if not self.constraints["x"]:
# change in L, dx, equal to DF/DL * v
rmatvec_x = (
(
(-2.0 / self.x) * self.dx(order=2, array=True)
+ (-4.0 / self.x) * self.dx(order=4, array=True)
+ (-1.0 / self.x)
* (
self_field._nonlinear(self_field, array=True)
+ (-self.s / self.t) * self_dx_modes
)
)
.ravel()
.dot(other_modes)
)
else:
rmatvec_x = 0
if not self.constraints["s"]:
rmatvec_s = (-1.0 / self.t) * self_dx_modes.ravel().dot(other_modes)
else:
rmatvec_s = 0.0
return rmatvec_t, rmatvec_x, rmatvec_s
def _rmatvec_linear_component(self, array=False):
"""
Linear component of the adjoint Jacobian-vector product
Parameters
----------
array : bool
Whether or not to return ndarray
Returns
-------
Orbit or ndarray.
"""
return (
-1.0 * self.dt(array=array)
+ self.dx(order=2, array=array)
+ self.dx(order=4, array=array)
+ (self.s / self.t) * self.dx(array=array)
)
[docs]
def _pad(self, size, axis=0, **kwargs):
"""
Checks if in comoving frame then pads. See OrbitKS for more details
"""
assert (
self.frame == "comoving"
), "Mode padding requires comoving frame; set padding=False if plotting"
return super()._pad(size, axis=axis)
[docs]
def _truncate(self, size, axis=0):
"""
Checks if in comoving frame then truncates. See OrbitKS for more details
"""
assert (
self.frame == "comoving"
), "Mode truncation requires comoving frame; set padding=False if plotting"
return super()._truncate(size, axis=axis)
def _jacobian_parameter_derivatives_concat(self, jac_):
"""
Concatenate parameter partial derivatives to Jacobian matrix
Parameters
----------
jac_ : np.ndArray,
(N-1) * (M-2) dimensional array resultant from taking the derivative of the spatioatemporal mapping
with respect to Fourier modes.
Returns
-------
Jacobian augmented with parameter partial derivatives as columns. Required to solve for changes to period,
space period in optimization process. Makes the system rectangular; needs to be solved by least squares type
methods.
"""
# If period is not fixed, need to include dF/dt in jacobian matrix
if not self.constraints["t"]:
time_period_derivative = (-1.0 / self.t) * (
self.dt(array=True) + (-self.s / self.t) * self.dx(array=True)
).reshape(-1, 1)
jac_ = np.concatenate((jac_, time_period_derivative), axis=1)
# If spatial period is not fixed, need to include dF/dx in jacobian matrix
if not self.constraints["x"]:
self_field = self.transform(to="field")
spatial_period_derivative = (
(-2.0 / self.x) * self.dx(order=2, array=True)
+ (-4.0 / self.x) * self.dx(order=4, array=True)
+ (-1.0 / self.x) * self_field._nonlinear(self_field, array=True)
)
jac_ = np.concatenate(
(jac_, spatial_period_derivative.reshape(-1, 1)), axis=1
)
if not self.constraints["s"]:
spatial_shift_derivatives = (-1.0 / self.t) * self.dx(array=True)
jac_ = np.concatenate(
(jac_, spatial_shift_derivatives.reshape(-1, 1)), axis=1
)
return jac_
def _jac_lin(self):
"""
Extension of the OrbitKS method that includes the term for spatial translation symmetry.
"""
return super()._jac_lin() + (-self.s / self.t) * self._dx_matrix()
[docs]
class AntisymmetricOrbitKS(OrbitKS):
[docs]
def dx(self, **kwargs):
"""
Spatial derivative of the current state.
Parameters
----------
kwargs :
order :int
The order of the derivative.
array : bool
Whether or not to return a numpy array. Used for efficiency/avoiding construction of redundant
Orbit instances.
computation_basis : str
The basis in which to compute the derivative.
return_basis : str
Which basis to return the ShiftReflectionOrbitKS in, if array=False.
Returns
----------
ShiftReflectionOrbitKS :
Class instance whose spatiotemporal state represents the spatial derivative in the
the basis of the original state.
Notes
-----
See OrbitKS.dx() for more details
"""
order = kwargs.get("order", 1)
# can compute spatial derivative in spatial mode or spatiotemporal mode basis. spatial_modes basis is required
# for orbits with discrete symmetry as they are orthogonal to the dx() direction.
if order % 2:
# odd ordered derivatives NEED to be in spatial mode basis
return super().dx(**{**kwargs, "computation_basis": "spatial_modes"})
else:
# even ordered derivatives CAN be in the spatial mode basis or the spatiotemporal mode basis.
return super().dx(**kwargs)
[docs]
def from_fundamental_domain(self, half=0):
"""
Overwrite of parent method
"""
field = self.transform(to="field")
if half == 0.0:
return field.concat(field.reflection(), axis=1).transform(to=self.basis)
else:
return field.reflection().concat(field, axis=1).transform(to=self.basis)
[docs]
def to_fundamental_domain(self, half=0, **kwargs):
"""
Overwrite of parent method
"""
if half == 0:
return self.transform(to="field")[:, : -int(self.m // 2)].transform(
to=self.basis
)
else:
return self.transform(to="field")[:, -int(self.m // 2) :].transform(
to=self.basis
)
[docs]
def shapes(self):
"""
State array shapes in different bases.
"""
return (
(self.n, self.m),
(self.n, self.m - 2),
(max([self.n - 1, 1]), (int(self.m // 2) - 1)),
)
[docs]
def selection_rules(self):
# Apply the pattern to (int(self.m//2) - 1) modes
reflection_selection_rules_integer_flags = np.repeat(
(np.arange(0, 2 * (self.n - 1)) % 2).ravel(), (int(self.m // 2) - 1)
)
# These indices are used for the transform as well as the transform matrices; therefore they are returned
# in a format compatible with both; applying .nonzero() yields indices., resize(self.shapes()[2]) yields
# tensor format.
return reflection_selection_rules_integer_flags
[docs]
@staticmethod
def _default_shape():
"""
The shape of a generic state, see core.py for details
"""
return 1, 32
def _nonlinear(self, other, array=False):
"""
nonlinear computation of the nonlinear term of the Kuramoto-Sivashinsky equation
"""
# Elementwise product, both self and other should be in physical field basis.
assert (self.basis == "field") and (other.basis == "field")
# to get around the special behavior of discrete symmetries, will return spatial modes without this workaround.
nl_orbit = 0.5 * (self * other).transform(to="spatial_modes").dx(
computation_basis="spatial_modes", return_basis="modes"
)
if array:
return nl_orbit.state
else:
return nl_orbit
[docs]
@classmethod
def _default_parameter_ranges(cls):
"""
Default parameter ranges.
Returns
-------
dict :
Keys are parameter labels, values are intervals to sample from in parameter generation.
Notes
-----
The shift parameter is included so that it can be iterated over; only s=0 allowed because this is
determined by state, in the _populate_parameters method.
L=38.5 based on Yueheng Lan and Cvitanovic investigations
"""
return {"t": (20.0, 200.0), "x": (38.5, 100.0)}
[docs]
def _pad(self, size, axis=0, **kwargs):
"""
Overwrite of parent method
"""
modes = self.transform(to="modes")
if np.mod(size, 2):
raise ValueError(
"New discretization size must be an even number, preferably a power of 2"
)
else:
if axis == 0:
# Split into real and imaginary components, pad separately.
padding = (size - modes.n) // 2
padding_tuple = ((padding, padding), (0, 0))
padded_modes = np.concatenate(
(
modes.state[: -(modes.n // 2 - 1), :],
np.pad(modes.state[-(modes.n // 2 - 1) :, :], padding_tuple),
),
axis=0,
)
padded_modes *= np.sqrt(size / modes.n)
return self.__class__(
**{
**vars(self),
"state": padded_modes,
"basis": "modes",
"discretization": (size, self.m),
}
).transform(to=self.basis)
else:
padding_number = (size - modes.m) // 2
padded_modes = np.sqrt(size / modes.m) * np.pad(
modes.state, ((0, 0), (0, padding_number))
)
return self.__class__(
**{
**vars(self),
"state": padded_modes,
"basis": "modes",
"discretization": (self.n, size),
}
).transform(to=self.basis)
[docs]
def _truncate(self, size, axis=0):
"""
Overwrite of parent method
"""
modes = self.transform(to="modes")
if np.mod(size, 2):
raise ValueError(
"New discretization size must be an even number, preferably a power of 2"
)
else:
if axis == 0:
truncate_number = int(size // 2) - 1
first_half = modes.state[: truncate_number + 1, :]
second_half = modes.state[
-(modes.n // 2 - 1) : -(modes.n // 2 - 1) + truncate_number, :
]
truncated_modes = np.sqrt(size / modes.n) * np.concatenate(
(first_half, second_half), axis=0
)
return self.__class__(
**{
**vars(self),
"state": truncated_modes,
"basis": "modes",
"discretization": (size, self.m),
}
).transform(to=self.basis)
else:
truncate_number = int(size // 2) - 1
truncated_modes = (
np.sqrt(size / modes.m) * modes.state[:, :truncate_number]
)
return self.__class__(
**{
**vars(self),
"state": truncated_modes,
"basis": "modes",
"discretization": (self.n, size),
}
).transform(to=self.basis)
def _parse_state(self, state, basis, **kwargs):
if isinstance(state, np.ndarray):
if len(state.shape) != 2:
raise ValueError('"state" array must be two-dimensional')
self.state = state
else:
self.state = np.array([], dtype=float).reshape(0, 0)
if self.size > 0:
if basis is None:
raise ValueError("basis must be provided when state is provided")
elif basis == "modes":
n, m = self.shape[0] + 1, 2 * self.shape[1] + 2
elif basis == "field":
n, m = self.shape
elif basis == "spatial_modes":
n, m = self.shape[0], self.shape[1] + 2
else:
raise ValueError(
'basis not recognized; must equal "field" or "spatial_modes", or "modes"'
)
if n < self.minimal_shape()[0] or m < self.minimal_shape()[1]:
warn_str = "\nminimum discretization requirements not met; methods may not work as intended."
warnings.warn(warn_str, RuntimeWarning)
self.basis = basis
self.discretization = n, m
else:
self.discretization = None
self.basis = None
def _time_transform_matrix(self):
"""
Notes
-----
Dramatic simplification over old code; now just the full DFT matrix plus projection
"""
return super()._time_transform_matrix()[self.selection_rules().nonzero()[0], :]
def _jac_nonlin(self):
"""
The nonlinear component of the Jacobian matrix of the Kuramoto-Sivashinsky equation
Returns
-------
nonlinear_dx : matrix
Matrix which represents the nonlinear component of the Jacobian.
Notes
-----
See OrbitKS for more details.
"""
_jac_nonlin_left = self._time_transform_matrix().dot(
self._dx_matrix(computation_basis="spatial_modes")
)
_jac_nonlin_middle = self._space_transform_matrix().dot(
np.diag(self.transform(to="field").state.ravel())
)
_jac_nonlin_right = self._inv_spacetime_transform_matrix()
_jac_nonlin = _jac_nonlin_left.dot(_jac_nonlin_middle).dot(_jac_nonlin_right)
return _jac_nonlin
def _time_transform(self, array=False, inplace=False):
"""
Spatial Fourier transform
Parameters
----------
Returns
-------
OrbitKS :
OrbitKS whose state is in the spatial Fourier mode basis.
"""
if inplace:
# Take rfft, accounting for unitary normalization.
self.state = rfft(
self.state, workers=self.workers, norm="ortho", axis=0, overwrite_x=True
)
self.state = np.concatenate(
(
self.state.real[:-1, -(int(self.m // 2) - 1) :],
self.state.imag[1:-1, -(int(self.m // 2) - 1) :],
),
axis=0,
)
self.state[1:, :] = np.sqrt(2) * self.state[1:, :]
self.basis = "modes"
if array:
return self.state
else:
return self
else:
# Take rfft, accounting for unitary normalization.
modes = rfft(self.state, workers=self.workers, norm="ortho", axis=0)
modes = np.concatenate(
(
modes.real[:-1, -(int(self.m // 2) - 1) :],
modes.imag[1:-1, -(int(self.m // 2) - 1) :],
),
axis=0,
)
modes[1:, :] = np.sqrt(2) * modes[1:, :]
if array:
return modes
else:
return self.__class__(
**{**vars(self), "state": modes, "basis": "modes"}
)
def _inv_time_transform(self, array=False, inplace=False):
"""
Spatial Fourier transform
Parameters
----------
Returns
-------
OrbitKS :
OrbitKS whose state is in the spatial Fourier mode basis.
"""
if inplace:
# can take advantage of the array sizes by adding to current array and multiplying by zero instead
# of concatenating.
self.state = self.state.astype(complex)
self.state[1 : -max([int(self.n // 2) - 1, 1]), :] += (
1j * self.state[-max([int(self.n // 2) - 1, 1]) :, :]
)
# if first statement evaluates to zero, then take all rows of
# axis by providing None as end of slice.
n_rows = (-max([int(self.n // 2) - 1, 1]) + 1) or None
self.state = self.state[:n_rows, :]
self.state[1:, :] /= np.sqrt(2)
if self.state.shape[0] == 1:
self.state = np.pad(self.state, ((0, 1), (0, 0)))
self.state[-1, :] = 0
self.state = irfft(
self.state, workers=self.workers, norm="ortho", axis=0, overwrite_x=True
)
self.state = np.concatenate((0 * self.state, self.state), axis=1)
self.basis = "spatial_modes"
if array:
return self.state
else:
return self
else:
# Take rfft, accounting for unitary normalization.
modes = self.state.astype(complex)
modes[1 : -max([int(self.n // 2) - 1, 1]), :] += (
1j * modes[-max([int(self.n // 2) - 1, 1]) :, :]
)
# if first statement evaluates to zero, then take all rows of
# axis by providing None as end of slice.
n_rows = (-max([int(self.n // 2) - 1, 1]) + 1) or None
modes = modes[:n_rows, :]
modes[1:, :] /= np.sqrt(2)
if modes.shape[0] == 1:
modes = np.pad(modes, ((0, 1), (0, 0)))
modes[-1, :] = 0
modes = irfft(modes, workers=self.workers, norm="ortho", axis=0)
modes = np.concatenate((0 * modes, modes), axis=1)
if array:
return modes
else:
return self.__class__(
**{**vars(self), "state": modes, "basis": "spatial_modes"}
)
[docs]
class ShiftReflectionOrbitKS(OrbitKS):
[docs]
def dx(self, **kwargs):
"""
Spatial derivative of the current state.
Parameters
----------
kwargs :
order :int
The order of the derivative.
array : bool
Whether or not to return a numpy array. Used for efficiency/avoiding construction of redundant
Orbit instances.
computation_basis : str
The basis in which to compute the derivative.
return_basis : str
Which basis to return the ShiftReflectionOrbitKS in, if array=False.
Returns
----------
ShiftReflectionOrbitKS :
Class instance whose spatiotemporal state represents the spatial derivative in the
the basis of the original state.
Notes
-----
See OrbitKS.dx() for more details
"""
order = kwargs.get("order", 1)
# can compute spatial derivative in spatial mode or spatiotemporal mode basis. spatial_modes basis is required
# for orbits with discrete symmetry as they are orthogonal to the dx() direction.
if order % 2:
# odd ordered derivatives NEED to be in spatial mode basis
return super().dx(**{**kwargs, "computation_basis": "spatial_modes"})
else:
# even ordered derivatives CAN be in the spatial mode basis or the spatiotemporal mode basis.
return super().dx(**kwargs)
[docs]
def selection_rules(self):
"""
Symmetry selection rules
"""
# equivalent to indices 0 + j from thesis; time indices go like {0, j, j}
i = np.arange(0, self.n // 2)[:, None]
selection_tensor_pattern = np.block(
[[i % 2, (i + 1) % 2], [i[1:] % 2, (i[1:] + 1) % 2]]
)
# Apply the pattern to (int(orbit_.m//2) - 1) modes
shiftreflection_selection_rules_integer_flags = np.repeat(
selection_tensor_pattern.ravel(), (int(self.m // 2) - 1)
)
# These indices are used for the transform as well as the transform matrices; therefore they are returned
# in a format compatible with both; applying .nonzero() yields indices., resize(self.shapes()[2]) yields
# tensor format.
return shiftreflection_selection_rules_integer_flags
def concat(self, other, axis=0):
"""
Join two Orbits together by concatenating their state arrays and adding/averaging their parameters.
Parameters
----------
other : Orbit
Orbit to be joined with.
axis : int
The axis along which to concatenate the state arrays.
Returns
-------
Orbit :
An instance whose stats have been concatenated, and whose parameters have been summed along that dimension.
Notes
-----
To cause the least amount of distortion, the grid spacings of each state should be approximately equivalent
(if discretizations of continuous dimension).
.. warning::
`self.concat(other)` is only equivalent to `other.concat(self)` if `other` and `self` both have parameters
defined, and the parameters not corresponding to dimensions are the same (i.e. categorical parameters).
When this is not the case, the parameters of the instance calling the method are used: `self.concat(other)`
will use `self`'s categoricals.
.. warning::
This function does not check the bases which each Orbit state is in.
"""
concat_orbit = super().concat(other, axis=axis)
if axis == 1:
return concat_orbit.roll(other.m // 2, axis=1)
else:
return concat_orbit
[docs]
def shapes(self):
"""
State array shapes in different bases. See core.py for details.
"""
return (
(self.n, self.m),
(self.n, self.m - 2),
(max([self.n - 1, 1]), (int(self.m // 2) - 1)),
)
[docs]
def to_fundamental_domain(self, half=0):
"""
Overwrite of parent method
"""
if half == 0:
return self.transform(to="field")[-int(self.n // 2) :, :].transform(
to=self.basis
)
else:
return self.transform(to="field")[: -int(self.n // 2), :].transform(
to=self.basis
)
[docs]
def from_fundamental_domain(self, half=0):
"""
Reconstruct full field from discrete fundamental domain
"""
field = self.transform(to="field")
if half == 0:
return field.reflection().concat(field, axis=0).transform(to=self.basis)
else:
return field.concat(field.reflection(), axis=0).transform(to=self.basis)
def _nonlinear(self, other, array=False):
"""
nonlinear computation of the nonlinear term of the Kuramoto-Sivashinsky equation
"""
# Elementwise product, both self and other should be in physical field basis.
assert (self.basis == "field") and (other.basis == "field")
# to get around the special behavior of discrete symmetries, will return spatial modes without this workaround.
nl_orbit = 0.5 * (self * other).transform(to="spatial_modes").dx(
computation_basis="spatial_modes", return_basis="modes"
)
if array:
return nl_orbit.state
else:
return nl_orbit
[docs]
def _pad(self, size, axis=0, **kwargs):
"""
Overwrite of parent method
"""
modes = self.transform(to="modes")
if np.mod(size, 2):
raise ValueError(
"New discretization size must be an even number, preferably a power of 2"
)
else:
if axis == 0:
padding = (size - modes.n) // 2
padding_tuple = ((padding, padding), (0, 0))
padded_modes = np.concatenate(
(
modes.state[: -(modes.n // 2 - 1), :],
np.pad(modes.state[-(modes.n // 2 - 1) :, :], padding_tuple),
),
axis=0,
)
padded_modes *= np.sqrt(size / modes.n)
return self.__class__(
**{
**vars(self),
"state": padded_modes,
"basis": "modes",
"discretization": (size, self.m),
}
).transform(to=self.basis)
else:
padding_number = (size - modes.m) // 2
padded_modes = np.sqrt(size / modes.m) * np.pad(
modes.state, ((0, 0), (0, padding_number))
)
return self.__class__(
**{
**vars(self),
"state": padded_modes,
"basis": "modes",
"discretization": (self.n, size),
}
).transform(to=self.basis)
[docs]
def _truncate(self, size, axis=0):
"""
Overwrite of parent method
"""
modes = self.transform(to="modes")
if np.mod(size, 2):
raise ValueError(
"New discretization size must be an even number, preferably a power of 2"
)
else:
if axis == 0:
truncate_number = int(size // 2) - 1
first_half = modes.state[: truncate_number + 1, :]
second_half = modes.state[
-(modes.n // 2 - 1) : -(modes.n // 2 - 1) + truncate_number, :
]
truncated_modes = np.sqrt(size / modes.n) * np.concatenate(
(first_half, second_half), axis=0
)
return self.__class__(
**{
**vars(self),
"state": truncated_modes,
"basis": "modes",
"discretization": (size, self.m),
}
).transform(to=self.basis)
else:
truncate_number = int(size // 2) - 1
truncated_modes = (
np.sqrt(size / modes.m) * modes.state[:, :truncate_number]
)
return self.__class__(
**{
**vars(self),
"state": truncated_modes,
"basis": "modes",
"discretization": (self.n, size),
}
).transform(to=self.basis)
def _jac_nonlin(self):
"""
The nonlinear component of the Jacobian matrix of the Kuramoto-Sivashinsky equation
Returns
-------
nonlinear_dx : matrix
Matrix which represents the nonlinear component of the Jacobian.
Notes
-----
See OrbitKS for more details.
"""
_jac_nonlin_left = self._time_transform_matrix().dot(
self._dx_matrix(computation_basis="spatial_modes")
)
_jac_nonlin_middle = self._space_transform_matrix().dot(
np.diag(self.transform(to="field").state.ravel())
)
_jac_nonlin_right = self._inv_spacetime_transform_matrix()
_jac_nonlin = _jac_nonlin_left.dot(_jac_nonlin_middle).dot(_jac_nonlin_right)
return _jac_nonlin
def _parse_state(self, state, basis, **kwargs):
if isinstance(state, np.ndarray):
if len(state.shape) != 2:
raise ValueError('"state" array must be two-dimensional')
self.state = state
else:
self.state = np.array([], dtype=float).reshape(0, 0)
if self.size > 0:
if basis is None:
raise ValueError("basis must be provided when state is provided")
elif basis == "modes":
n, m = self.shape[0] + 1, 2 * self.shape[1] + 2
elif basis == "field":
n, m = self.shape
elif basis == "spatial_modes":
n, m = self.shape[0], self.shape[1] + 2
else:
raise ValueError(
'basis not recognized; must equal "field" or "spatial_modes", or "modes"'
)
if n < self.minimal_shape()[0] or m < self.minimal_shape()[1]:
warn_str = "\nminimum discretization requirements not met; methods may not work as intended."
warnings.warn(warn_str, RuntimeWarning)
self.basis = basis
self.discretization = n, m
else:
self.discretization = None
self.basis = None
def _time_transform(self, array=False, inplace=False):
"""
Spatial Fourier transform
Parameters
----------
Returns
-------
OrbitKS :
OrbitKS whose state is in the spatial Fourier mode basis.
"""
if inplace:
# Take rfft, accounting for orthogonal normalization.
self.state = rfft(
self.state, workers=self.workers, norm="ortho", axis=0, overwrite_x=True
)
# Project onto shift-reflection subspace.
self.state[::2, : -(int(self.m // 2) - 1)] = 0
self.state[1::2, -(int(self.m // 2) - 1) :] = 0
# Due to projection, can add the different components without mixing information, this allows
# us to avoid a complex operation like shuffling.
self.state = np.concatenate(
(
(
self.state.real[:-1, : -(int(self.m // 2) - 1)]
+ self.state.real[:-1, -(int(self.m // 2) - 1) :]
),
(
self.state.imag[1:-1, : -(int(self.m // 2) - 1)]
+ self.state.imag[1:-1, -(int(self.m // 2) - 1) :]
),
),
axis=0,
)
self.state[1:, :] = np.sqrt(2) * self.state[1:, :]
self.basis = "modes"
if array:
return self.state
else:
return self
else:
# Take rfft, accounting for orthogonal normalization.
modes = rfft(self.state, workers=self.workers, norm="ortho", axis=0)
# Project onto shift-reflection subspace.
modes[::2, : -(int(self.m // 2) - 1)] = 0
modes[1::2, -(int(self.m // 2) - 1) :] = 0
# Due to projection, can add the different components without mixing information, this allows
# us to avoid a complex operation like shuffling.
modes = np.concatenate(
(
(
modes.real[:-1, : -(int(self.m // 2) - 1)]
+ modes.real[:-1, -(int(self.m // 2) - 1) :]
),
(
modes.imag[1:-1, : -(int(self.m // 2) - 1)]
+ modes.imag[1:-1, -(int(self.m // 2) - 1) :]
),
),
axis=0,
)
modes[1:, :] = np.sqrt(2) * modes[1:, :]
if array:
return modes
else:
return self.__class__(
**{**vars(self), "state": modes, "basis": "modes"}
)
def _inv_time_transform(self, array=False, inplace=False):
"""
Spatial Fourier transform
Parameters
----------
array : bool
Whether to return np.ndarray or Orbit subclass instance
Returns
-------
OrbitKS :
OrbitKS whose state is in the spatial Fourier mode basis.
"""
if inplace:
# can take advantage of the array sizes by adding to current array and multiplying by zero instead
# of concatenating.
self.state = self.state.astype(complex)
self.state[1 : -max([int(self.n // 2) - 1, 1]), :] += (
1j * self.state[-max([int(self.n // 2) - 1, 1]) :, :]
)
# if first statement evaluates to zero, then take all rows of
# axis by providing None as end of slice.
n_rows = (-max([int(self.n // 2) - 1, 1]) + 1) or None
self.state = self.state[:n_rows, :]
self.state[1:, :] /= np.sqrt(2)
if self.state.shape[0] == 1:
self.state = np.pad(self.state, ((0, 1), (0, 0)))
self.state[-1, :] = 0
self.state = np.concatenate((self.state, self.state), axis=1)
self.state[::2, : -(int(self.m // 2) - 1)] = 0
self.state[1::2, -(int(self.m // 2) - 1) :] = 0
self.state = irfft(
self.state, workers=self.workers, norm="ortho", axis=0, overwrite_x=True
)
self.basis = "spatial_modes"
if array:
return self.state
else:
return self
else:
modes = self.state.copy().astype(complex)
modes[1 : -max([int(self.n // 2) - 1, 1]), :] += (
1j * modes[-max([int(self.n // 2) - 1, 1]) :, :]
)
# if first statement evaluates to zero, then take all rows of
# axis by providing None as end of slice.
n_rows = (-max([int(self.n // 2) - 1, 1]) + 1) or None
modes = modes[:n_rows, :]
modes[1:, :] /= np.sqrt(2)
modes = np.concatenate((modes, modes), axis=1)
if modes.shape[0] == 1:
modes = np.pad(modes, ((0, 1), (0, 0)))
modes[-1, :] = 0
modes[::2, : -(int(self.m // 2) - 1)] = 0
modes[1::2, -(int(self.m // 2) - 1) :] = 0
modes = irfft(modes, workers=self.workers, norm="ortho", axis=0)
if array:
return modes
else:
return self.__class__(
**{**vars(self), "state": modes, "basis": "spatial_modes"}
)
def _time_transform_matrix(self):
"""
Notes
-----
Dramatic simplification over old code; now just the full DFT matrix plus projection
"""
return super()._time_transform_matrix()[self.selection_rules().nonzero()[0], :]
def _inv_time_transform_matrix(self):
"""
Notes
-----
Dramatic simplification over old code; now just transpose of forward dft matrix b.c. orthogonal
"""
return self._time_transform_matrix().transpose()
[docs]
class EquilibriumOrbitKS(AntisymmetricOrbitKS):
"""
Class for temporal equilibria
Notes
-----
For convenience, this subclass accepts any (even) value for the time discretization. Only a single time point
is required however to fully represent the solution and therefore perform any computations. If the
discretization size is greater than 1 then then different bases will have the following shapes: field (N, M).
spatial modes = (N, m), spatiotemporal modes (1, m). In other words, discretizations
of this type can still be used in the optimization codes but will be much more inefficient.
The reason for this choice is because it is possible
to start with a spatiotemporal orbit with no symmetry (i.e. OrbitKS) and still end up at an equilibrium
solution. Therefore, I am accommodating transformations from other orbit types to EquilibriumOrbitKS. To
make the computations more efficient all that is required is usage of the method
self.optimize_for_calculations(), which converts N -> 1, making the shape of the state (1, M) in the
physical field basis. Also can inherit more methods with this choice.
More details are included in the thesis and in the documentation.
While only the imaginary components of the spatial modes are non-zero, both real and imaginary components are
kept to allow for odd order spatial derivatives, required for the nonlinear term. Other allowed operations such
as rotation are preferably performed after converting to a different symmetry type such as AntisymmetricOrbitKS
or OrbitKS.
"""
[docs]
@staticmethod
def minimal_shape():
"""
The smallest possible compatible discretization
Returns
-------
tuple of int :
The default array shape when dimensions are not specified.
Notes
-----
Often symmetry constraints reduce the dimensionality; if too small this reduction may leave the state empty,
used for aspect ratio correction and possibly other gluing applications.
If parity is important, than that must be built into the definition.
"""
return 1, 4
[docs]
def dt(self, order=1, array=False, **kwargs):
"""
A time derivative of the current state.
Parameters
----------
order :int
The order of the derivative.
array : bool
Whether to return np.ndarray or Orbit
Returns
----------
EquilibriumOrbitKS or np.ndarray :
Orbit or array representation of the time derivative of an equilibrium (i.e. zero).
"""
if array:
return np.zeros(self.state.shape)
else:
return self.__class__(
**{**vars(self), "state": np.zeros(self.shape), "basis": "modes"}
)
[docs]
def jacobian(self, **kwargs):
"""
Jacobian matrix evaluated at the current state.
kwargs :
Unused, included to match signature in Orbit class.
Returns
-------
jac_ : 2-d ndarray
Jacobian matrix whose columns are derivatives with respect to all unconstrained state variables;
including periods. Has dimensions dependent on number of spatiotemporal modes and free parameters,
(self.shapes()[-1].size, self.shapes()[-1].size + n_params)
Jacobian matrix of the KSe where n_params = 2 - sum(self.constraints)
Notes
-----
Original implementation was pretty, but very inefficient. This now computes the Jacobian matrix,
minimizing the amount of allocated memory by overwriting and performing matrix-free implementations of
Fourier transform matrix operations. Computes the Jacobian
$J = D_t + D_xx + D_xxxx + F_t D_x F_x Diag(u) F_x^{-1} F_t^{-1}$ in the following steps:
"""
assert self.basis == "modes"
field_size, smode_size, mode_size = (np.prod(shp) for shp in self.shapes())
# Begin with nonlinear term. Apply matrix operators in matrix-free fashion. begin with diag(u)
J = np.diag(self.transform(to="field", array=True).ravel()).reshape(-1, self.m)
# By creatively reshaping J, can apply FFTs to 3-d tensor.
J = self.__class__(
**{**vars(self), "state": J, "basis": "field", "discretization": J.shape}
).transform(to="spatial_modes", array=True, inplace=True)
J = J.reshape(field_size, smode_size).T.reshape(-1, self.m)
# After transforming the columns, transpose and transform again to get F_x Diag(u) F_x^{-1}
J = self.__class__(
**{**vars(self), "state": J, "basis": "field", "discretization": J.shape}
)
# Reshape back into a matrix, and then into another 3-d tensor after transposing back, to take derivative.
J = J.transform(to="spatial_modes", array=True, inplace=True).reshape(
smode_size, smode_size
)
J = J.T.reshape(-1, self.m - 2)
J = self.__class__(
**{
**vars(self),
"state": J,
"basis": "spatial_modes",
"discretization": (J.shape[0], self.m),
}
)
J = J.dx(array=True, computation_basis="spatial_modes")
# At this point J represents D_x F_x Diag(u) F_x^{-1}; reshape into 3-d tensor again and apply
# time transforms
J = J.reshape(-1, smode_size).T.reshape(self.n, self.m - 2, -1)
J = self.__class__(
**{
**vars(self),
"state": J,
"discretization": (*self.discretization, J.shape[-1]),
"basis": "spatial_modes",
}
).transform(to="modes", array=True, inplace=True)
# reshape, transpose and transform.
J = (
J.reshape((*self.shapes()[2], -1))
.reshape(-1, smode_size)
.T.reshape(self.n, self.m - 2, -1)
)
J = self.__class__(
**{
**vars(self),
"state": J,
"discretization": (*self.discretization, J.shape[-1]),
"basis": "spatial_modes",
}
).transform(to="modes", array=True, inplace=True)
J = J.reshape((*self.shapes()[2], -1)).reshape(J.shape[-1], J.shape[-1]).T
# Produce the linear term; spatial derivatives are diagonal; time is more complicated due to SO(2) operator.
e = np.ones(self.shapes()[2])
dx2 = self.__class__(state=e, basis="modes", parameters=self.parameters).dx(
order=2
)
J[np.diag_indices(J.shape[0])] += dx2.state.ravel() + dx2.state.ravel() ** 2
J = self._jacobian_parameter_derivatives_concat(J)
return J
# def orbit_vector(self):
# """
# Overwrite of parent method
#
# """
# return np.concatenate(
# (self.state.reshape(-1, 1), np.array([[float(self.x)]])), axis=0
# )
[docs]
def shapes(self):
"""
State array shapes in different bases. See core.py for details.
"""
return (self.n, self.m), (self.n, self.m - 2), (1, (int(self.m // 2) - 1))
[docs]
def precondition(self, **kwargs):
"""
Precondition a vector with the inverse (aboslute value) of linear spatial terms
Parameters
----------
kwargs : dict
pmult : tuple of tuples
Values for the frequencies to use in the rescaling
pexp : tuple of int
The exponentiation factor used to rescale parameter corrections.
Returns
-------
target : OrbitKS
Return the OrbitKS instance, modified by preconditioning.
Notes
-----
Often we want to precondition a state derived from a mapping or rmatvec (gradient descent step),
with respect to ANOTHER orbit's (current state's) parameters.
"""
pmult = kwargs.get("pmult", self.preconditioning_parameters())
pexp = kwargs.get("pexp", (0, 4))
p_multipliers = 1.0 / (
np.abs(spatial_frequencies(*pmult[1], order=2)[:, : self.shapes()[2][1]])
+ spatial_frequencies(*pmult[1], order=4)[:, : self.shapes()[2][1]]
)
preconditioned_state = np.multiply(self.state, p_multipliers)
# Precondition the change in T and L so that they do not dominate
if not self.constraints["x"]:
# self is the orbit being preconditioned, i.e. the correction orbit; by default this is dx = dx / L^4
x = self.x * (pmult[1][0] ** -pexp[1])
else:
x = self.x
return self.__class__(
**{
**vars(self),
"state": preconditioned_state,
"basis": "modes",
"parameters": (0, x, 0),
}
)
[docs]
def preprocess(self):
"""
Check whether the orbit converged to an equilibrium or close-to-zero solution
"""
# Take the L_2 norm of the field, if uniformly close to zero, the magnitude will be very small.
field_orbit = self.transform(to="field")
# See if the L_2 norm is beneath a threshold value, if so, replace with zeros.
if field_orbit.norm() < 10**-5:
return self.__class__(
**{
**vars(self),
"state": np.zeros(self.discretization),
"basis": "field",
}
).transform(to=self.basis)
else:
return self
[docs]
@classmethod
def dimension_based_discretization(cls, dimensions, **kwargs):
"""
Orbithunter conventions for discretization size.
Parameters
----------
dimensions : tuple
tuple containing (t, L, S) i.e. OrbitKS().parameters
kwargs :
resolution : str
Takes values 'coarse', 'normal', 'fine'. These options return one of three orbithunter conventions for
the discretization size.
N : int, default None
Temporal discretization size, if provided
M : int, default None
Spatial discretization size, if provided
Returns
-------
int, int
The new spatiotemporal field discretization; number of time points (rows) and number of space points (columns)
Notes
-----
This function should only ever be called by rediscretize, the returned values can always be accessed by
the appropriate attributes of the rediscretized orbit.
"""
# Change the default to N = 1 from N = None, this ensures that the temporal period (t=0) is never used.
resolution = kwargs.get("resolution", "default")
n, m = kwargs.get("discretization", (1, None))
t, x = dimensions
if m is None:
if x == 0:
m = cls._default_shape()[1]
elif resolution == "coarse":
m = np.max([2 ** (int(np.log2(x) - 1)), 8])
elif resolution == "fine":
m = np.max([2 ** (int(np.log2(x) + 2)), 16])
elif resolution == "power":
m = np.max([2 * (int(4 * x ** (1.0 / 2.0)) // 2), 16])
else:
m = np.max([2 ** (int(np.log2(x) + 0.5)), 16])
m = max(m, cls.minimal_shape()[1])
return n, m
[docs]
@classmethod
def _default_constraints(cls):
return {"t": True, "x": False, "s": True}
def _eqn_linear_component(self, array=False):
"""
Linear component of the KSE
Parameters
----------
array : bool
Whether to return ndarray or not.
Returns
-------
ndarray or class instance.
"""
return self.dx(order=2, array=array) + self.dx(order=4, array=array)
def _rmatvec_parameters(self, self_field, other):
other_modes_in_vector_form = other.state.ravel()
if not self.constraints["x"]:
# change in L, dx, equal to DF/DL * v
rmatvec_x = (
(
(-2.0 / self.x) * self.dx(order=2, array=True)
+ (-4.0 / self.x) * self.dx(order=4, array=True)
+ (-1.0 / self.x) * self_field._nonlinear(self_field, array=True)
)
.ravel()
.dot(other_modes_in_vector_form)
)
else:
rmatvec_x = 0
return 0.0, rmatvec_x, 0.0
def _rmatvec_linear_component(self, array=False):
"""
Linear component of the adjoint Jacobian-vector product
Parameters
----------
array : bool
Whether or not to return ndarray
Returns
-------
Orbit or ndarray.
"""
return self._eqn_linear_component(array=array)
[docs]
def _pad(self, size, axis=0, **kwargs):
"""
Overwrite of parent method
Notes
-----
If starting and ending in the spatiotemporal modes basis, this will only create an instance with a different
value of time dimensionality attribute 'N'.
"""
spatial_modes = self.transform(to="spatial_modes")
if axis == 0:
# Not technically zero-padding, just copying. Just can't be in temporal mode basis
# because it is designed to only represent the zeroth modes.
padded_spatial_modes = np.tile(spatial_modes.state[None, 0, :], (size, 1))
return self.__class__(
**{
**vars(self),
"state": padded_spatial_modes,
"basis": "spatial_modes",
"discretization": (size, self.m),
}
).transform(to=self.basis)
else:
# Split into real and imaginary components, pad separately.
padding = (size - self.m) // 2
padding_tuple = ((0, 0), (padding, padding))
padded_modes = np.concatenate(
(
spatial_modes.state[:, : -(self.m // 2 - 1)],
np.pad(spatial_modes.state[:, -(self.m // 2 - 1) :], padding_tuple),
),
axis=1,
)
padded_modes *= np.sqrt(size / self.m)
return self.__class__(
**{
**vars(self),
"state": padded_modes,
"basis": "spatial_modes",
"discretization": (self.n, size),
}
).transform(to=self.basis)
[docs]
def _truncate(self, size, axis=0):
"""
Overwrite of parent method
"""
if axis == 0:
spatial_modes = self.transform(to="spatial_modes")
truncated_spatial_modes = spatial_modes.state[-size:, :]
return self.__class__(
**{
**vars(self),
"state": truncated_spatial_modes,
"basis": "spatial_modes",
"discretization": (size, self.m),
}
).transform(to=self.basis)
else:
modes = self.transform(to="modes")
truncate_number = int(size // 2) - 1
truncated_modes = np.sqrt(size / modes.m) * modes.state[:, :truncate_number]
# cannot distinguish between n != 1 and n == 1 when in modes basis; therefore, pass the shape to keep track
return self.__class__(
**{
**vars(self),
"state": truncated_modes,
"basis": "modes",
"discretization": (self.n, size),
}
).transform(to=self.basis)
[docs]
@classmethod
def _default_parameter_ranges(cls):
"""
Default parameter ranges.
Returns
-------
dict :
Keys are parameter labels, values are intervals to sample from in parameter generation.
Notes
-----
Default spatial domain size; minimum L=2*pi based on fundamental orbit.
"""
return {"x": (2 * pi, 100.0)}
def _parse_state(self, state, basis, **kwargs):
"""
Parse the provided state information of an equilibrium state.
"""
if isinstance(state, np.ndarray):
if len(state.shape) != 2:
raise ValueError('"state" array must be two-dimensional')
self.state = state
else:
self.state = np.array([], dtype=float).reshape(0, 0)
if self.size > 0:
if len(self.shape) == 1:
self.state = state[None, :]
if basis is None:
raise ValueError("basis must be provided when state is provided")
elif basis == "modes":
# If passed as modes; which only contain the zeroth modes, the shape in the other bases
# cannot be inferred from the NumPy array; it must either be provided via discretization keyword
# or it will default to the minimal value.
self.state = self.state[None, 0, :]
n = kwargs.get("discretization", (None, None))[0] or 1
m = 2 * self.shape[1] + 2
elif basis == "field":
n, m = self.shape
elif basis == "spatial_modes":
n, m = self.shape[0], self.shape[1] + 2
else:
raise ValueError(
'basis not recognized; must equal "field" or "spatial_modes", or "modes"'
)
if n < self.minimal_shape()[0] or m < self.minimal_shape()[1]:
warn_str = "\nminimum discretization requirements not met; methods may not work as intended."
warnings.warn(warn_str, RuntimeWarning)
self.discretization = n, m
self.basis = basis
else:
self.discretization = None
self.basis = None
def _jac_lin(self):
"""
Extension of the OrbitKS method that includes the term for spatial translation symmetry
"""
return self._dx_matrix(order=2) + self._dx_matrix(order=4)
def _jacobian_parameter_derivatives_concat(self, jac_):
"""
Concatenate parameter partial derivatives to Jacobian matrix
Parameters
----------
jac_ : np.ndArray,
(N-1) * (M-2) dimensional array resultant from taking the derivative of the spatioatemporal mapping
with respect to Fourier modes.
Returns
-------
Jacobian augmented with parameter partial derivatives as columns. Required to solve for changes to period,
space period in optimization process. Makes the system rectangular; needs to be solved by least squares type
methods.
"""
# If spatial period is not fixed, need to include dF/dx in jacobian matrix
if not self.constraints["x"]:
self_field = self.transform(to="field")
spatial_period_derivative = (
(-2.0 / self.x) * self.dx(order=2, array=True)
+ (-4.0 / self.x) * self.dx(order=4, array=True)
+ (-1.0 / self.x) * self_field._nonlinear(self_field, array=True)
)
jac_ = np.concatenate(
(jac_, spatial_period_derivative.reshape(-1, 1)), axis=1
)
return jac_
def _inv_time_transform_matrix(self):
"""
Overwrite of parent method
Notes
-----
Originally this transform just selected the antisymmetric spatial modes (imaginary component),
but in order to be consistent with all other time transforms, I will actually apply the normalization
constant associated with a forward in time transformation. The reason for this is for comparison
of states between different subclasses.
"""
return np.tile(
np.concatenate(
(0 * np.eye((int(self.m // 2) - 1)), np.eye((int(self.m // 2) - 1))),
axis=0,
),
(self.n, 1),
)
def _time_transform_matrix(self):
"""
Overwrite of parent method
"""
time_dft_mat = np.tile(
np.concatenate(
(0 * np.eye((int(self.m // 2) - 1)), np.eye((int(self.m // 2) - 1))),
axis=1,
),
(1, self.n),
)
time_dft_mat[:, 2 * (int(self.m // 2) - 1) :] = 0
return time_dft_mat
def _time_transform(self, array=False, inplace=False):
"""
Overwrite of parent method
Notes
-----
Taking the RFFT, with orthogonal normalization, of a constant time series defined on N points is equivalent
to multiplying the constant value by sqrt(N). This is because the transform sums over N repeats of the same
value and then divides by 1/sqrt(N). i.e. (N * C)/sqrt(N) = sqrt(N) * C. Therefore we can save some time by
just doing this without calling the rfft function.
"""
# Select the nonzero (imaginary) components of modes and transform in time (w.r.t. axis=0).
if inplace:
# reshapes but also generalizes for Jacobian calculation
self.state = self.state[np.newaxis, 0, -(int(self.m // 2) - 1) :]
self.basis = "modes"
if array:
return self.state
else:
return self
else:
modes = self.state[np.newaxis, 0, -(int(self.m // 2) - 1) :]
if array:
return modes
else:
return self.__class__(
**{**vars(self), "state": modes, "basis": "modes"}
)
def _inv_time_transform(self, array=False, inplace=False):
"""
Overwrite of parent method
Notes
-----
Taking the IRFFT, with orthogonal normalization is equivalent to dividing by the normalization constant; because
there would only be
"""
if inplace:
real = np.zeros(self.state.shape)
imaginary = self.state
self.state = np.tile(np.concatenate((real, imaginary), axis=1), (self.n, 1))
self.basis = "spatial_modes"
if array:
return self.state
else:
return self
else:
real = np.zeros(self.state.shape)
imaginary = self.state
spatial_modes = np.tile(
np.concatenate((real, imaginary), axis=1), (self.n, 1)
)
if array:
return spatial_modes
else:
return self.__class__(
**{**vars(self), "state": spatial_modes, "basis": "spatial_modes"}
)
[docs]
class RelativeEquilibriumOrbitKS(RelativeOrbitKS):
[docs]
@staticmethod
def minimal_shape():
"""
The smallest possible compatible discretization
Returns
-------
tuple of int :
The default array shape when dimensions are not specified.
Notes
-----
Often symmetry constraints reduce the dimensionality; if too small this reduction may leave the state empty,
used for aspect ratio correction and possibly other gluing applications.
If parity is important, than that must be built into the definition.
"""
return 1, 4
[docs]
def dt(self, order=1, array=False, **kwargs):
"""
A time derivative of the current state.
Parameters
----------
order :int
The order of the derivative.
array : bool
Whether to return np.ndarray or Orbit
Returns
----------
RelativeEquilibriumOrbitKS :
The class instance whose state is the time derivative in
the spatiotemporal mode basis (i.e. zero in comoving frame).
"""
if self.frame == "comoving":
if array:
return np.zeros(self.state.shape)
else:
return self.__class__(
**{**vars(self), "state": np.zeros(self.shape), "basis": "modes"}
)
else:
raise ValueError(
"Attempting to compute time derivative of "
+ str(self)
+ " in physical reference frame."
+ "If this is truly desired, convert to RelativeOrbitKS first."
)
[docs]
def shapes(self):
"""
State array shapes in different bases. See core.py for details.
"""
return (self.n, self.m), (self.n, self.m - 2), (1, self.m - 2)
[docs]
def preprocess(self):
"""
Check whether the orbit converged to an equilibrium or close-to-zero solution
"""
# Take the L_2 norm of the field, if uniformly close to zero, the magnitude will be very small.
orbit_with_inverted_shift = self.copy()
orbit_with_inverted_shift.s = -self.s
cost_imported_S = self.cost()
cost_negated_S = orbit_with_inverted_shift.cost()
if cost_imported_S > cost_negated_S:
orbit_ = orbit_with_inverted_shift
else:
orbit_ = self.copy()
# Take the L_2 norm of the field, if uniformly close to zero, the magnitude will be very small.
field_orbit = orbit_.transform(to="field")
zero_check = field_orbit.norm()
if zero_check < 10**-5:
return RelativeEquilibriumOrbitKS(
**{
**vars(self),
"state": np.zeros(self.discretization),
"basis": "field",
}
).transform(to=self.basis)
else:
return orbit_
[docs]
def to_fundamental_domain(self):
return self.change_reference_frame(frame="physical")
[docs]
def from_fundamental_domain(self):
"""
For compatibility purposes with plotting and other utilities
"""
return self.change_reference_frame(frame="comoving")
[docs]
def jacobian(self, **kwargs):
"""
Jacobian matrix evaluated at the current state.
kwargs :
Unused, included to match signature in Orbit class.
Returns
-------
jac_ : 2-d ndarray
Jacobian matrix whose columns are derivatives with respect to all unconstrained state variables;
including periods. Has dimensions dependent on number of spatiotemporal modes and free parameters,
(self.shapes()[-1].size, self.shapes()[-1].size + n_params)
Jacobian matrix of the KSe where n_params = 2 - sum(self.constraints)
Notes
-----
Original implementation was pretty, but very inefficient. This now computes the Jacobian matrix,
minimizing the amount of allocated memory by overwriting and performing matrix-free implementations of
Fourier transform matrix operations. Computes the Jacobian
$J = D_t + D_xx + D_xxxx + F_t D_x F_x Diag(u) F_x^{-1} F_t^{-1}$ in the following steps:
"""
assert self.basis == "modes"
field_size, smode_size, mode_size = (np.prod(shp) for shp in self.shapes())
# Begin with nonlinear term. Apply matrix operators in matrix-free fashion. begin with diag(u)
J = np.diag(self.transform(to="field", array=True).ravel()).reshape(-1, self.m)
# By creatively reshaping J, can apply FFTs to 3-d tensor.
J = self.__class__(
**{**vars(self), "state": J, "basis": "field", "discretization": J.shape}
).transform(to="spatial_modes", array=True, inplace=True)
J = J.reshape(field_size, smode_size).T.reshape(-1, self.m)
# After transforming the columns, transpose and transform again to get F_x Diag(u) F_x^{-1}
J = self.__class__(
**{**vars(self), "state": J, "basis": "field", "discretization": J.shape}
)
# Reshape back into a matrix, and then into another 3-d tensor after transposing back, to take derivative.
J = J.transform(to="spatial_modes", array=True, inplace=True).reshape(
smode_size, smode_size
)
J = J.T.reshape(-1, self.m - 2)
J = self.__class__(
**{
**vars(self),
"state": J,
"basis": "spatial_modes",
"discretization": (J.shape[0], self.m),
}
)
J = J.dx(array=True, computation_basis="spatial_modes")
# At this point J represents D_x F_x Diag(u) F_x^{-1}; reshape into 3-d tensor again and apply
# time transforms
J = J.reshape(-1, smode_size).T.reshape(self.n, self.m - 2, -1)
J = self.__class__(
**{
**vars(self),
"state": J,
"discretization": (*self.discretization, J.shape[-1]),
"basis": "spatial_modes",
}
).transform(to="modes", array=True, inplace=True)
# reshape, transpose and transform.
J = (
J.reshape((*self.shapes()[2], -1))
.reshape(-1, smode_size)
.T.reshape(self.n, self.m - 2, -1)
)
J = self.__class__(
**{
**vars(self),
"state": J,
"discretization": (*self.discretization, J.shape[-1]),
"basis": "spatial_modes",
}
).transform(to="modes", array=True, inplace=True)
J = J.reshape((*self.shapes()[2], -1)).reshape(J.shape[-1], J.shape[-1]).T
# Produce the linear term; spatial derivatives are diagonal; time is more complicated due to SO(2) operator.
e = np.ones(self.shapes()[2])
dx2 = self.__class__(state=e, basis="modes", parameters=self.parameters).dx(
order=2
)
J[np.diag_indices(J.shape[0])] += dx2.state.ravel() + dx2.state.ravel() ** 2
e = e.ravel()
# For time, get the correct frequency by taking the derivative of individual elements; swapping of real
# and imaginary components handled by .dt()
for i in range(J.shape[0]):
e *= 0
e[i] = 1.0
J[:, i] += (-self.s / self.t) * self.__class__(
state=e.reshape(self.shape), basis="modes", parameters=self.parameters
).dx().state.ravel()
J = self._jacobian_parameter_derivatives_concat(J)
return J
[docs]
@classmethod
def dimension_based_discretization(cls, dimensions, **kwargs):
"""
Subclassed method for equilibria.
"""
kwargs.setdefault("discretization", (1, None))
n, m = super().dimension_based_discretization(dimensions, **kwargs)
return n, m
[docs]
def _pad(self, size, axis=0):
"""
Overwrite of parent method
Notes
-----
If starting and ending in the spatiotemporal modes basis, this will only create an instance with a different
value of time dimensionality attribute 'N'.
"""
assert (
self.frame == "comoving"
), "Transform to comoving frame before padding modes"
if np.mod(size, 2):
raise ValueError(
"New discretization size must be an even number, preferably a power of 2"
)
else:
if axis == 0:
# Not technically zero-padding, just copying. Just can't be in temporal mode basis
# because it is designed to only represent the zeroth modes.
spatial_modes = self.transform(to="spatial_modes")
padded_spatial_modes = np.tile(
spatial_modes.state[None, -1, :], (size, 1)
)
return self.__class__(
**{
**vars(self),
"state": padded_spatial_modes,
"basis": "spatial_modes",
"discretization": (size, self.m),
}
).transform(to=self.basis)
else:
modes = self.transform(to="modes")
padding = (size - modes.m) // 2
padding_tuple = ((0, 0), (padding, padding))
padded_modes = np.concatenate(
(
modes.state[:, : -modes.m // 2 - 1],
np.pad(modes.state[:, -modes.m // 2 - 1 :], padding_tuple),
),
axis=1,
)
padded_modes *= np.sqrt(size / modes.m)
return self.__class__(
**{
**vars(self),
"state": padded_modes,
"basis": "modes",
"discretization": (self.n, size),
}
).transform(to=self.basis)
[docs]
def _truncate(self, size, axis=0):
"""
Subclassed method to handle RelativeEquilibriumOrbitKS mode's shape.
"""
assert (
self.frame == "comoving"
), "Transform to comoving frame before truncating modes"
if axis == 0:
truncated_spatial_modes = self.transform(to="spatial_modes").state[
-size:, :
]
return self.__class__(
**{
**vars(self),
"state": truncated_spatial_modes,
"basis": "spatial_modes",
"discretization": (size, self.m),
}
).transform(to=self.basis)
else:
truncate_number = int(size // 2) - 1
# Split into real and imaginary components, truncate separately.
spatial_modes = self.transform(to="spatial_modes")
first_half = spatial_modes.state[:, :truncate_number]
second_half = spatial_modes.state[
:,
-spatial_modes.m // 2 - 1 : -spatial_modes.m // 2 - 1 + truncate_number,
]
truncated_spatial_modes = np.sqrt(size / spatial_modes.m) * np.concatenate(
(first_half, second_half), axis=1
)
return self.__class__(
**{
**vars(self),
"state": truncated_spatial_modes,
"basis": "spatial_modes",
"discretization": (self.n, size),
}
).transform(to=self.basis)
def _eqn_linear_component(self, array=False):
"""
Linear component of the KSE
Parameters
----------
array : bool
Whether to return ndarray or not.
Returns
-------
ndarray or class instance.
"""
return (
self.dx(order=2, array=array)
+ self.dx(order=4, array=array)
- (self.s / self.t) * self.dx(array=array)
)
def _rmatvec_linear_component(self, array=False):
"""
Linear component of the adjoint Jacobian-vector product
Parameters
----------
array : bool
Whether or not to return ndarray
Returns
-------
Orbit or ndarray.
"""
return (
self.dx(order=2, array=array)
+ self.dx(order=4, array=array)
+ (self.s / self.t) * self.dx(array=array)
)
def _parse_state(self, state, basis, **kwargs):
if isinstance(state, np.ndarray):
if len(state.shape) != 2:
raise ValueError('"state" array must be two-dimensional')
self.state = state
else:
self.state = np.array([], dtype=float).reshape(0, 0)
if self.size > 0:
if basis is None:
raise ValueError("basis must be provided when state is provided")
elif basis == "modes":
self.state = self.state[None, 0, :]
# For uniformity with _populate_state, use 'shape' instead of 'n' for kwarg
n = kwargs.get("discretization", (None, None))[0] or 1
m = self.shape[1] + 2
elif basis == "field":
n, m = self.shape
elif basis == "spatial_modes":
n, m = self.shape[0], self.shape[1] + 2
else:
raise ValueError(
'basis not recognized; must equal "field" or "spatial_modes", or "modes"'
)
# To allow for multiple time point fields and spatial modes, for plotting purposes.
if n < self.minimal_shape()[0] or m < self.minimal_shape()[1]:
warn_str = "\nminimum discretization requirements not met; methods may not work as intended."
warnings.warn(warn_str, RuntimeWarning)
self.discretization = n, m
self.basis = basis
else:
self.discretization = None
self.basis = None
def _inv_time_transform_matrix(self):
"""
Overwrite of parent method
Notes
-----
Originally this transform just selected the antisymmetric spatial modes (imaginary component),
but in order to be consistent with all other time transforms, I will actually apply the normalization
constant associated with a forward in time transformation. The reason for this is for comparison
of states between different subclasses.
"""
return np.tile(np.eye(self.m - 2), (self.n, 1))
def _time_transform_matrix(self):
"""
Overwrite of parent method
Notes
-----
Input state is [N, M-2] dimensional array which is to be sliced to return only the last row.
N * (M-2) repeats of modes coming in, M-2 coming out, so M-2 rows.
"""
dft_mat = np.tile(np.eye(self.m - 2), (1, self.n))
dft_mat[:, self.m - 2 :] = 0
return dft_mat
def _time_transform(self, array=False, inplace=False):
"""
Overwrite of parent method
Notes
-----
Taking the RFFT, with orthogonal normalization, of a constant time series defined on N points is equivalent
to multiplying the constant value by sqrt(N). This is because the transform sums over N repeats of the same
value and then divides by 1/sqrt(N). i.e. (N * C)/sqrt(N) = sqrt(N) * C. Therefore we can save some time by
just doing this without calling the rfft function.
This always returns a single instant in time, for solving purposes; if N != 1 then computations will work
but will be much less efficient.
Originally wanted to include normalization but it just screws things up given how the modes are truncated.
"""
# Select the nonzero (imaginary) components of modes and transform in time (w.r.t. axis=0).
if inplace:
self.state = self.state[np.newaxis, 0, :]
self.basis = "modes"
if array:
return self.state
else:
return self
else:
modes = self.state[np.newaxis, 0, :]
if array:
return modes
else:
return self.__class__(
**{**vars(self), "state": modes, "basis": "modes"}
)
def _inv_time_transform(self, array=False, inplace=False):
"""
Overwrite of parent method
Notes
-----
Taking the IRFFT, with orthogonal normalization is equivalent to dividing by the normalization constant; because
there would only be
"""
spatial_modes = np.tile(self.state[None, 0, :], (self.n, 1))
if array:
return spatial_modes
else:
return self.__class__(
**{**vars(self), "state": spatial_modes, "basis": "spatial_modes"}
)
def _jac_lin(self):
"""
Extension of the OrbitKS method that includes the term for spatial translation symmetry.
"""
return (
self._dx_matrix(order=2)
+ self._dx_matrix(order=4)
+ (-self.s / self.t) * self._dx_matrix()
)
def _jacobian_parameter_derivatives_concat(self, jac_):
"""
Concatenate parameter partial derivatives to Jacobian matrix
Parameters
----------
jac_ : np.ndArray,
(N-1) * (M-2) dimensional array resultant from taking the derivative of the spatioatemporal mapping
with respect to Fourier modes.
Returns
-------
Jacobian augmented with parameter partial derivatives as columns. Required to solve for changes to period,
space period in optimization process. Makes the system rectangular; needs to be solved by least squares type
methods.
"""
# If period is not fixed, need to include dF/dt in jacobian matrix
if not self.constraints["t"]:
time_period_derivative = (
(-1.0 / self.t)
* (-self.s / self.t)
* self.dx(array=True).reshape(-1, 1)
)
jac_ = np.concatenate((jac_, time_period_derivative), axis=1)
# If spatial period is not fixed, need to include dF/dx in jacobian matrix
if not self.constraints["x"]:
self_field = self.transform(to="field")
spatial_period_derivative = (
(-2.0 / self.x) * self.dx(order=2, array=True)
+ (-4.0 / self.x) * self.dx(order=4, array=True)
+ (-1.0 / self.x) * self_field._nonlinear(self_field, array=True)
)
jac_ = np.concatenate(
(jac_, spatial_period_derivative.reshape(-1, 1)), axis=1
)
if not self.constraints["s"]:
spatial_shift_derivatives = (-1.0 / self.t) * self.dx(array=True)
jac_ = np.concatenate(
(jac_, spatial_shift_derivatives.reshape(-1, 1)), axis=1
)
return jac_
def swap_modes(modes, axis=0):
"""
Function which swaps real, imaginary components of arrays for SO(2) differentiation
Parameters
----------
modes : np.ndarray
Array of Fourier modes,
axis : int
Whether to swap along time (0) or space(1) dimension
Notes
-----
Both real and imaginary components are stored as real numbers. See class transforms for details.
"""
if axis == 1:
m = modes.shape[1] // 2
t_dim = modes.shape[0]
swapped_modes = np.concatenate(
(modes[:, -m:].reshape(t_dim, -1), modes[:, :-m].reshape(t_dim, -1)), axis=1
)
else:
n = (modes.shape[0] + 1) // 2 - 1
# do not need the special case as .dt() shouldn't be used for either subclass mentioned above.
swapped_modes = np.concatenate(
(modes[None, 0, :], modes[-n:, :], modes[1:-n, :]), axis=0
)
return swapped_modes
@lru_cache()
def so2_generator(order):
"""
Powers of the generator of the SO(2) Lie algebra
Parameters
----------
order : int
Order of the derivative for which this function is called to produce frequencies for.
Returns
-------
np.ndarray : 2x2 array
Equal to generator to the n-th power.
"""
return np.linalg.matrix_power(np.array([[0, -1], [1, 0]]), np.mod(order, 4))
@lru_cache()
def so2_coefficients(order):
"""
Non-zero elements of the Lie algebra generator to the order-th power
Parameters
----------
order : int
Order of the derivative for which this function is called to produce frequencies for.
Returns
-------
np.ndarray :
(2,) ndarray of correct powers of -1 for differentiation
"""
if (order % 4) == 0:
return 1, 1
elif (order % 4) == 1:
return 1, -1
elif (order % 4) == 2:
return -1, -1
elif (order % 4) == 3:
return -1, 1
@lru_cache()
def temporal_frequencies(t, n, order):
"""
Matrix/rank 2 tensor of temporal mode frequencies
Parameters
----------
t : float
Temporal period
n : int
Temporal discretization size
order : int
The order of the derivative/power of the frequencies desired.
Returns
----------
dtn_multipliers : np.ndarray
Array of spatial frequencies in the same shape as modes
Notes
-----
Creates and returns a rank 2 tensor whose elements are the properly ordered temporal frequencies,
which is the same shape as the spatiotemporal Fourier mode state. The elementwise product
with a set of spatiotemporal Fourier modes.
"""
# Extra factor of -1 because of time ordering in array.
# Extra factor of -1 because of time ordering in array.
w = (-1 * (2 * pi * n / t) * rfftfreq(n)[1:-1]) ** order
# Coefficients which depend on the order of the derivative, see SO(2) generator of rotations for reference.
c1, c2 = so2_coefficients(order)
# The Nyquist frequency is never included, this is how time frequency modes are ordered.
# Elementwise product of modes with time frequencies is the spectral derivative.
return np.concatenate(([0], c1 * w, c2 * w))[:, np.newaxis]
@lru_cache()
def spatial_frequencies(x, m, order):
"""
Matrix/rank 2 tensor of spatial mode frequencies
Parameters
----------
x : float
Spatial period
m : int
Spatial discretization size
order : int
The order of the derivative/power of the frequencies desired.
Returns
----------
dxn_multipliers : np.ndarray
Array of spatial frequencies in the same shape as modes
Notes
-----
Creates and returns a rank 2 tensor whose elements are the properly ordered spatial frequencies,
which is the same shape as the spatiotemporal Fourier mode state. The elementwise product
with a set of spatiotemporal Fourier modes.
"""
# Elementwise multiplication of modes with frequencies, this is the derivative.
q = ((2 * pi * m / x) * rfftfreq(m)[1:-1]) ** order
# Coefficients which depend on the order of the derivative, see SO(2) generator of rotations for reference.
c1, c2 = so2_coefficients(order)
# spatial frequency array, reshaped for broadcasting.
return np.concatenate((c1 * q, c2 * q))[None, :]
@lru_cache()
def dxn_block(x, m, order):
"""
Block diagonal matrix of spatial frequencies
Parameters
----------
x : float
spatial period
m : int
spatial discretization size.
order : int
Order of the desired derivative.
Returns
-------
np.ndarray :
Two dimensional block diagonal or diagonal array with SO(2) generator for multiple Fourier modes.
Notes
-----
This is the SO(2) generator for multiple Fourier modes. Only used in explicit construction of matrices.
"""
qkn = ((2 * pi * m / x) * rfftfreq(m)[None, 1:-1]) ** order
return np.kron(so2_generator(order), np.diag(qkn.ravel()))
@lru_cache()
def dtn_block(t, n, order):
"""
Block diagonal matrix of temporal frequencies
Parameters
----------
t : float
Temporal period
n : int
Temporal discretization size.
order : int
Order of the desired derivative.
Returns
-------
np.ndarray :
Two dimensional block diagonal or diagonal array with SO(2) generator for multiple Fourier modes.
Notes
-----
This is the SO(2) generator for multiple Fourier modes. Only used in explicit construction of matrices.
"""
wjn = (-(2 * pi * n / t) * rfftfreq(n)[1:-1, None]) ** order
return np.kron(so2_generator(order), np.diag(wjn.ravel()))