# SPDX-License-Identifier: GPL-3.0-or-later
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import figure
from numpy import complex128, float64
from numpy.typing import NDArray
from scipy.signal import hilbert
from scipy.stats import rayleigh
import deltares_wave_toolbox.cores.core_engine as core_engine
import deltares_wave_toolbox.cores.core_spectral as core_spectral
import deltares_wave_toolbox.cores.core_time as core_time
import deltares_wave_toolbox.cores.core_wavefunctions as core_wavefunctions
import deltares_wave_toolbox.spectrum as spectrum
[docs]
class WaveHeights:
"""The WaveHeights class
Parameters
----------
hwave : NDArray[float64]
1D array containing the wave heights of the individual waves [m]
twave : NDArray[float64]
1D array containing the periods of the individual waves [s]
"""
def __init__(self, hwave: NDArray[float64], twave: NDArray[float64]) -> None:
hwave, _ = core_engine.convert_to_vector(hwave)
twave, _ = core_engine.convert_to_vector(twave)
self.hwave = hwave
self.nwave = len(hwave)
self.twave = twave
def __str__(self) -> str:
return f"Series with {self.nwave} elements"
def __repr__(self) -> str:
return f"{type(self).__name__} (series nt = {self.nwave})"
[docs]
def sort(self) -> None:
"""Sort wave heights and wave periods
Sorts the wave height and wave period.
The sorting is done such that in hWaveSorted the wave heights of hWave
are sorted in descending order. This same sorting is applied to
tWave.
"""
self.hwave, self.twave = core_time.sort_wave_params(self.hwave, self.twave)
[docs]
def get_Hrms(self) -> float:
"""Compute root-mean-squared wave height
Returns
-------
float
Hrms
"""
return np.sqrt(np.mean(self.hwave**2))
[docs]
def get_Hmax(self) -> float64:
"""Return maximum wave height
Returns
-------
float64
Hmax
"""
return np.max(self.hwave)
[docs]
def get_Hs(self) -> tuple[float, float]:
"""Compute significant wave height & associated period
Hs (significant wave height) is the average of the highest 1/3 of the waves
Returns
-------
tuple[float, float]
Hs, Ts
"""
return self.highest_waves(1 / 3)
[docs]
def get_H2p_Rayleigh(self) -> float:
"""Compute 2% exceedance wave height assuming theoretical Rayleigh distribution
Returns
-------
float
H2p
"""
return self.get_Hs()[0] * rayleigh.ppf(0.98, scale=1 / np.sqrt(2)) / np.sqrt(2)
[docs]
def get_exceedance_waveheight(self, excPerc: float) -> float:
"""Computes wave height with given exceedance probability
This function computes the wave height hExcPerc with given exceedance probability percentage excPerc.
Parameters
----------
excPerc : float
exceedance probability percentage. excPerc = 2 means an exceedance percentage of 2%. The value of excPerc
should not exceed 100, or be smaller than 0 [%]
Returns
-------
float
wave height with given exceedance probability [m]
"""
self.hwave, self.twave = core_time.sort_wave_params(self.hwave, self.twave)
return core_time.exceedance_wave_height(hWaveSorted=self.hwave, excPerc=excPerc)
[docs]
def highest_waves(self, fracP: float) -> tuple[float, float]:
"""Computes wave parameters of selection largest waves
This function computes the wave height hFracP and wave period tFracP by taking the average of the fraction
fracP of the highest waves. When fracP = 1/3, then hFracP is equal to the significant wave height and tFracP
is equal to the significant wave period.
Parameters
----------
fracP : float
fraction. Should be between 0 and 1 [-]
Returns
-------
tuple[float, float]
hFracP : float
average of the wave heights of the highest fracP waves [m]
tFracP : float
average of the wave periods of the highest fracP waves [s]
"""
self.hwave, self.twave = core_time.sort_wave_params(self.hwave, self.twave)
hFracP, tFracP = core_time.highest_waves_params(
hWaveSorted=self.hwave, tWaveSorted=self.twave, fracP=fracP
)
return hFracP, tFracP
[docs]
def plot_exceedance_waveheight(self, savepath: str = "") -> figure.Figure:
"""Plot exceedances of wave heights
Parameters
----------
savepath : str, optional
path to save figure, by default ""
Returns
-------
figure.Figure
figure object
"""
self.hwave, self.twave = core_time.sort_wave_params(self.hwave, self.twave)
fig = plt.figure()
plt.plot(
self.hwave, np.linspace(0, self.nwave, self.nwave, self.nwave) / self.nwave
)
plt.yscale("log")
plt.grid("on")
plt.xlabel("Wave height [$m$]")
plt.ylabel("P exceedance ")
if savepath != "":
plt.savefig(savepath)
return fig
[docs]
def plot_exceedance_waveheight_Rayleigh(
self,
savepath: str = "",
normalize: bool = False,
plot_BG: bool = False,
water_depth: float = -1.0,
cota_slope: float = 250.0,
hm0: float = -1.0,
) -> figure.Figure:
"""Plot exceedances of wave heights compared to Rayleigh distribution
Parameters
----------
savepath : str, optional
path to save figure, by default ""
normalize : bool, optional
normalize wave heights with significant wave height (Hs), by default False
plot_BG : bool, optional
include theoretical Battjes & Groenendijk (2000) distribution in plot, by default False
water_depth : float, optional
water depth needed for Battjes & Groenendijk (2000), by default -1.0 [m]
cota_slope : float, optional
foreshore slope needed for Battjes & Groenendijk (2000), by default 250.0 [-]
hm0 : float, optional
spectral wave height needed for Battjes & Groenendijk (2000), by default -1.0 [m]
Returns
-------
figure.Figure
figure object
Raises
------
ValueError
Raised when plot_BG is True and Hm0 is not provided
"""
if normalize:
# Normalize with significant wave height
H_normalize = self.get_Hs()[0]
y_label = r"Normalized wave height $\frac{H_{i}}{H_{s}}$ [-]"
else:
# no normalization
H_normalize = 1
y_label = r"Wave height $H_{i}$ [$m$]"
Rayleigh_x = np.sqrt(-np.log(np.arange(1, self.nwave + 1) / self.nwave))
H2p_Rayleigh = self.get_H2p_Rayleigh()
Rayleigh_theoretical_dist = H2p_Rayleigh * np.sqrt(
np.log(1 - np.arange(self.nwave, 0, -1) / (self.nwave + 1)) / np.log(0.02)
)
self.hwave, self.twave = core_time.sort_wave_params(self.hwave, self.twave)
fig = plt.figure()
plt.plot(
Rayleigh_x,
Rayleigh_theoretical_dist / H_normalize,
label="Theoretical Rayleigh distribution",
)
if plot_BG:
if hm0 == -1.0:
raise ValueError(
"Please provide Hm0 when using Battjes & Groenendijk distribution"
)
else:
(
hwave_BG,
Pexceedance_BG,
) = core_wavefunctions.compute_BattjesGroenendijk_wave_height_distribution(
hm0, self.nwave, water_depth, cota_slope=cota_slope
)
plt.plot(
np.sqrt(-np.log(Pexceedance_BG)),
hwave_BG / H_normalize,
label="Battjes & Groenendijk (2000) distribution",
)
plt.plot(Rayleigh_x, self.hwave / H_normalize, label="Wave height distribution")
plt.grid("on")
plt.xlabel(r"$P_{exceedance}$ [$\%$]")
plt.ylabel(y_label)
plt.legend()
xtick_positions = np.array([100, 50, 20, 10, 5, 2, 1, 0.1, 0.01, 0.001])
ax = plt.gca()
ax.set_xticks(
rayleigh.ppf(1 - xtick_positions / 100, scale=1 / np.sqrt(2)),
labels=xtick_positions,
)
if savepath != "":
plt.savefig(savepath)
return fig
[docs]
def plot_hist_waveheight(self, savepath: str = "") -> figure.Figure:
"""Plot Histogram of wave heights
Parameters
----------
savepath : str, optional
path to save figure, by default ""
Returns
-------
figure.Figure
figure object
"""
fig = plt.figure()
plt.hist(self.hwave, label="Distribution")
plt.grid("on")
plt.xlabel("H [$m$]")
plt.ylabel("P ")
plt.legend()
if savepath != "":
plt.savefig(savepath)
return fig
[docs]
class Series(WaveHeights):
"""The wave Series class
Contains a time series (typically) of water level elevations. Inherits from WaveHeights class.
Parameters
----------
time : NDArray[float64]
1D array containing time axis. The numbers in the array t must be increasing and uniformly spaced
(uniform time step) [s]
xTime : NDArray[float64]
1D array containing signal values, i.e. the time series of the signal. The value xTime(i) must be the signal
value at time t(i). Usually water surface elevation [m]
"""
def __init__(self, time: NDArray[float64], xTime: NDArray[float64]) -> None:
time, tSize = core_engine.convert_to_vector(time)
xTime, xSize = core_engine.convert_to_vector(xTime)
assert tSize[0] == xSize[0], "Input error: array sizes differ in dimension"
assert np.var(np.diff(time)) < np.mean(
np.diff(time) / 100
), "Input error: time step is not equidistant"
self.time = time
self.xTime = xTime
self.nt = len(time)
self.dt = np.mean(np.diff(self.time))
[
hWave,
tWave,
_,
_,
_,
_,
_,
] = self._determine_individual_waves()
super().__init__(hWave, tWave)
def __str__(self) -> str:
return f"Series with {self.nt} elements"
def __repr__(self) -> str:
return f"{type(self).__name__} (series nt = {self.nt})"
[docs]
def get_crossing(self, typeCross: str = "down") -> tuple[int, NDArray[float64]]:
"""Get zero crossings form time series
Determine either the zero up- or down-crossings of the time series.
Parameters
----------
typeCross : str, optional
Search for up- or down-crossings, by default "down"
Returns
-------
tuple[int, NDArray[float64]]
nWave : int
Number of waves in the signal, where one wave corresponds to two successive zero-crossings. Wave i
starts at time tCross(i), and end at time tCross(i+1) [-]
tCross : NDArray[float64]
1D array of length (nWave+1), containing the time of all zero-crossings. The time of the
zero-crossings is determined by linear interpolation. Note that in case of no zero-crossing, the
array tCross is empty. Note that in case of one zero-crossing, the number of waves is zero. [s]
"""
nWave, tCross = core_time.determine_zero_crossing(
t=self.time, xTime=self.xTime, typeCross=typeCross
)
return nWave, tCross
[docs]
def get_skewness(self) -> tuple[float, float]:
"""Compute skewness
Sk is skewness of the signal
Returns
-------
float
Sk
"""
Sk = np.mean((self.xTime - np.mean(self.xTime)) ** 3) / np.mean(
(self.xTime - np.mean(self.xTime)) ** 2
) ** (1.5)
return Sk
[docs]
def get_asymmetry(self) -> tuple[float, float]:
"""Compute asymmetry
As is asymmetry of the signal
Returns
-------
float
As
"""
H = np.imag(hilbert(self.xTime))
As = np.mean(H**3) / np.mean((self.xTime - np.mean(self.xTime)) ** 2) ** (1.5)
return As
[docs]
def get_spectrum(
self,
nperseg: int = 256,
noverlap: int = 0,
nfft: int = 0,
windows_type: str = "hann",
dfDesired: float = 0.01,
use_dfDesired: bool = True,
) -> spectrum.Spectrum:
"""create spectrum
Parameters
----------
nperseg : int, optional
Length of each segment, by default None
noverlap : int, optional
number of points in overlap, by default None
nfft : int, optional
length of fft, by default None
window_type : str, optional
window type, by default "hann"
Returns
-------
spectrum.Spectrum
Spectrum in spectrum object
"""
if use_dfDesired:
nperseg = np.round((1 / self.dt) / dfDesired).astype(int)
noverlap = np.round(nperseg / 2).astype(int)
[f, S] = core_spectral.compute_spectrum_welch_wrapper(
self.xTime,
dt=self.dt,
nperseg=nperseg,
noverlap=noverlap,
nfft=nfft,
window_type=windows_type,
)
return spectrum.Spectrum(f, S)
[docs]
def get_spectrum_raw(self, dfDesired: float = 0.01) -> spectrum.Spectrum:
"""create spectrum with a desired frequency, but without applying segments
Parameters
----------
dfDesired : float, optional
desired frequency spacing in Hertz on which the wave spectrum must be computed.
If this parameter is omitted, then dfDesired = f(1) - f(0), by default 0.01 [Hz]
Returns
-------
spectrum.Spectrum
Spectrum in spectrum object
"""
[f, S] = core_spectral.compute_spectrum_time_series(
self.time, self.xTime, dfDesired
)
return spectrum.Spectrum(f, S)
[docs]
def max(self) -> float:
return np.max(self.xTime)
[docs]
def min(self) -> float:
return np.min(self.xTime)
[docs]
def mean(self) -> float:
return np.mean(self.xTime)
[docs]
def var(self) -> float:
return np.var(self.xTime)
[docs]
def get_bandpassfilter(self, fmin: float, fmax: float):
"""get bandpassfiltered signal
Parameters
----------
fmin : lower frequency for bandpassfilter
fmax : higher frequency for bandpassfilter
Returns
-------
series.Series
Series in series object
"""
xTimeFiltered = core_spectral.bandpassfilter(self.time, self.xTime, fmin, fmax)
xTimeFiltered = Series(self.time, xTimeFiltered)
return xTimeFiltered
[docs]
def get_fourier_comp(
self,
) -> tuple[NDArray[float64], NDArray[complex128], bool]:
"""get Fourier components from series
Returns
-------
tuple[NDArray[float64], NDArray[complex128], bool]
f : NDArray[float64]
1D array containing frequency values, for folded Fourier transform. The frequency axis runs from 0 to
the Nyquist frequency. The number of elements in array f is close to half the number of elements in
array xTime. To be precise, length(f) = floor(nT/2) + 1, with nT the number of elements in array
xTime [Hz]
xFreq : NDArray[complex128]
1D array (complex!) containing the folded Fourier coefficients of xTime. The value xFreq(i) must be the
Fourier coefficient at frequency f(i). The number of elements in f and xFreq are the same.
isOdd : bool
logical indicating whether nT, the number of time points in xTime, is even (isOdd=False) or odd
(isOdd=True)
"""
f, xFreq, isOdd = core_spectral.time2freq_nyquist(self.time, self.xTime)
return f, xFreq, isOdd
def _determine_individual_waves(
self, typeCross: str = "down", fcutoff=0, minimum_wave=0
) -> tuple[
NDArray[float64],
NDArray[float64],
NDArray[float64],
NDArray[float64],
NDArray[float64],
NDArray[float64],
]:
"""determine individual waves in series
Parameters
----------
typeCross : str, optional
Search for up- or down-crossings, by default "down"
Returns
-------
tuple[ NDArray[float64], NDArray[float64], NDArray[float64], NDArray[float64], NDArray[float64],
NDArray[float64], ]
tWave : NDArray[float64]
1D array containing the periods of the individual waves [s]
hWave : NDArray[float64]
1D array containing the wave heights of the individual waves [m]
aCrest : NDArray[float64]
1D array containing the maximum amplitude of the crests of the individual waves [m]
aTrough : NDArray[float64]
1D array containing the maximum amplitude of the troughs of the individual waves [m]
tCrest : NDArray[float64]
1D array containing the time at which maximum crest amplitude of the individual waves occurs [s]
tTrough : NDArray[float64]
1D array containing the time at which maximum trough amplitude of the individual waves occurs [s]
Notes
-----
* All these arrays have a length equal to nWave, which is the number of waves in the wave train
* The values of aTrough are always smaller than zero
* hWave = aCrest - aTrough
"""
if fcutoff != 0:
xtime_low = core_spectral.bandpassfilter(
t=self.time, xTime=self.xTime, flow=0, fhigh=fcutoff
)
else:
xtime_low = 0
_, tCross = core_time.determine_zero_crossing(
t=self.time, xTime=self.xTime - xtime_low, typeCross=typeCross
)
(
hWave,
tWave,
aCrest,
aTrough,
tCrest,
tTrough,
) = core_time.determine_params_individual_waves(
tCross=tCross, t=self.time, xTime=self.xTime
)
# remove waves lower than minimum_wave
remove_index = np.where(hWave < minimum_wave)[0]
self.tCross = np.delete(tCross, remove_index)
hWave = np.delete(hWave, remove_index)
tWave = np.delete(tWave, remove_index)
aCrest = np.delete(aCrest, remove_index)
aTrough = np.delete(aTrough, remove_index)
tCrest = np.delete(tCrest, remove_index)
tTrough = np.delete(tTrough, remove_index)
super().__init__(hWave, tWave) # set values here instead of init
return hWave, tWave, aCrest, aTrough, tCrest, tTrough, tCross
[docs]
def plot(self, savepath: str = "", plot_crossing: bool = False) -> figure.Figure:
"""Plot Series
Parameters
----------
savepath : str, optional
path to save figure, by default ""
plot_crossing : bool, optional
plot zero crossings in figure, by default False
Returns
-------
figure.Figure
figure object
"""
fig = plt.figure()
plt.plot(self.time, self.xTime, label="series")
if plot_crossing and hasattr(self, "tCross"):
plt.plot(self.tCross, np.asarray(self.tCross) * 0, "ro", label="crossing")
plt.grid("on")
plt.xlabel("time [$s$]")
plt.ylabel("z [$m$]")
plt.legend()
if savepath != "":
plt.savefig(savepath)
return fig