Source code for pysdkit._vmd.mvmd
# -*- coding: utf-8 -*-
"""
Created on 2024/6/1 18:44
@author: Whenxuan Wang
@email: wwhenxuan@gmail.com
"""
import numpy as np
from typing import Optional, Tuple, Union
from .base import Base
[docs]
class MVMD(Base):
"""
Multivariate Variational mode decomposition, object-oriented interface.
ur Rehman, Naveed and Aftab, Hania (2019) 'Multivariate Variational Mode Decomposition',
IEEE Transactions on Signal Processing, 67(23), pp. 6039–6052.
Python code: https://github.com/yunyueye/MVMD
MATLAB code: https://www.mathworks.com/matlabcentral/fileexchange/72814-multivariate-variational-mode-decomposition-mvmd
"""
[docs]
def __init__(
self,
alpha: float,
K: int,
tau: float,
init: str = "zero",
DC: bool = False,
tol: float = 1e-7,
max_iter: int = 100,
) -> None:
"""
Multivariate Variational Mode Decomposition (MVMD) algorithm.
:param alpha: float
The balancing parameter of the data-fidelity constraint, controlling the trade-off
between the smoothness of the modes and the accuracy of the reconstruction.
:param K: int
The number of modes to be recovered, determining how many intrinsic mode functions
are decomposed from the input signal.
:param tau: float
The time-step of the dual ascent optimization algorithm. Setting tau to 0 applies a
noise-slack variable that aids in noise robustness.
:param init: str, optional
Initialization method for the center frequencies of the modes. Can be 'zero' (all omegas
start at 0 frequency), 'uniform' (uniformly distributed), or 'random' (random values).
Defaults to 'zero'.
:param DC: bool, optional
If True, the first mode is constrained to zero frequency, extracting the mean (DC component)
of the signal. Defaults to False.
:param tol: float, optional
Tolerance for convergence. The algorithm stops when the difference between spectra
across iterations is below this threshold. Typically around 1e-6 to 1e-7. Defaults to 1e-7.
:param max_iter: int, optional
Maximum number of iterations if convergence is not reached. Defaults to 100.
Attributes:
-----------
DTYPE : numpy.dtype
The data type for internal computations, set to numpy.complex64 to optimize
performance by reducing memory and computation requirements.
"""
super().__init__()
self.alpha = alpha
self.K = K
self.tau = tau
self.init = init.lower()
self.DC = DC
self.tol = tol
self.max_iter = max_iter
self.DTYPE = np.complex64
[docs]
def __call__(
self, signal: np.ndarray, return_all: bool = False
) -> Optional[np.ndarray]:
"""allow instances to be called like functions"""
return self.fit_transform(signal=signal, return_all=return_all)
[docs]
def __str__(self) -> str:
"""Get the full name and abbreviation of the algorithm"""
return "Multivariate Variational mode decomposition (MVMD)"
def __init_omega(self, fs: float) -> np.ndarray:
"""Initialization of omega_k"""
omega_plus = np.zeros(shape=(self.max_iter, self.K), dtype=self.DTYPE)
if self.init == "uniform":
for i in range(1, self.K + 1):
omega_plus[0, i - 1] = (0.5 / self.K) * (i - 1)
elif self.init == "random":
omega_plus[0, :] = np.sort(
np.exp(np.log(fs))
+ (np.log(0.5) - np.log(fs)) * np.random.rand(1, self.K)
)
else:
omega_plus[0, :] = 0
# Processing the DC component of the signal
if self.DC is True:
omega_plus[0, 0] = 0
return omega_plus
[docs]
def fit_transform(
self, signal: np.ndarray, return_all: bool = False
) -> Union[Tuple[np.ndarray, np.ndarray, np.ndarray], np.ndarray]:
"""
Multivariate signal decomposition using MVMD algorithm
:param signal: the time domain signal (ndarray) to be decomposed
:param return_all: Whether to return all results of the algorithm, False only return the collection of decomposed modes
:return: - u - the collection of decomposed modes, shape: [K, length, num_channels]
- u_hat - spectra of the modes,
- omega - estimated mode center-frequencies
"""
# Get the number of channels and the length of the input signal
C, T = signal.shape
# Get the sampling frequency of the signal
fs = 1.0 / float(T)
# Perform signal mirroring expansion
fMirr = self.multi_fmirror(ts=signal, C=C, T=T)
# Get the new length of the signal
T = int(fMirr.shape[1])
t = np.linspace(1.0 / float(T), 1, int(T))
# Discretization in the frequency domain
freqs = t - 0.5 - 1 / T
# Each mode has its own alpha
Alpha = self.alpha * np.ones(self.K, dtype=self.DTYPE)
# Construct and center f_hat
f_hat = self.fftshift(ts=self.fft(ts=fMirr))
f_hat_plus = f_hat
f_hat_plus[:, 0 : int(T / 2)] = 0
# matrix keeping track of every iterant // could be discarded for mem
u_hat_plus = np.zeros(
shape=(self.max_iter, len(freqs), self.K, C), dtype=self.DTYPE
)
omega_plus = self.__init_omega(fs=fs)
# Start with empty dual variables
lambda_hat = np.zeros(shape=(self.max_iter, len(freqs), C), dtype=self.DTYPE)
# Initialization of omega_k
uDiff = self.tol + np.spacing(1)
# Loop counter
n = 1
# Accumulator
sum_uk = np.zeros(shape=(len(freqs), C))
"""Start main loop"""
while uDiff > self.tol and n < self.max_iter - 1:
# update first mode accumulator
k = 1
sum_uk = (
u_hat_plus[n - 1, :, self.K - 1, :]
+ sum_uk
- u_hat_plus[n - 1, :, 0, :]
)
# update spectrum of first mode through Wiener filter of residuals
for c in range(C):
u_hat_plus[n, :, k - 1, c] = (
f_hat_plus[c, :] - sum_uk[:, c] - lambda_hat[n - 1, :, c] / 2
) / (1 + Alpha[k - 1] * np.square(freqs - omega_plus[n - 1, k - 1]))
# update first omega if not held at 0
if self.DC is False:
omega_plus[n, k - 1] = np.sum(
np.matmul(
np.expand_dims(freqs[T // 2 : T], axis=0),
np.square(np.abs(u_hat_plus[n, T // 2 : T, k - 1, :])),
)
) / np.sum(np.square(np.abs(u_hat_plus[n, T // 2 : T, k - 1, :])))
for k in range(2, self.K + 1):
# update mode accumulator
sum_uk = (
u_hat_plus[n, :, k - 2, :] + sum_uk - u_hat_plus[n - 1, :, k - 1, :]
)
# update mode spectrum
for c in range(C):
u_hat_plus[n, :, k - 1, c] = (
f_hat_plus[c, :] - sum_uk[:, c] - lambda_hat[n - 1, :, c] / 2
) / (1 + Alpha[k - 1] * np.square(freqs - omega_plus[n - 1, k - 1]))
# center frequencies
omega_plus[n, k - 1] = np.sum(
np.matmul(
np.expand_dims(freqs[T // 2 : T], axis=0),
np.square(np.abs(u_hat_plus[n, T // 2 : T, k - 1, :])),
)
) / np.sum(np.square(np.abs(u_hat_plus[n, T // 2 : T, k - 1, :])))
# Dual ascent
lambda_hat[n, :, :] = lambda_hat[n - 1, :, :] + self.tau * np.sum(
u_hat_plus[n, :, :, :], axis=1
)
# loop counter
n = n + 1
# Determine whether the algorithm converges
uDiff = np.spacing(1)
for i in range(1, self.K + 1):
uDiff = uDiff + 1 / float(T) * np.dot(
u_hat_plus[n - 1, :, i - 1, :] - u_hat_plus[n - 2, :, i - 1, :],
np.conj(
u_hat_plus[n - 1, :, i - 1, :] - u_hat_plus[n - 2, :, i - 1, :]
).T,
)
uDiff = np.sum(np.abs(uDiff))
N = min(n, self.max_iter)
omega = omega_plus[0:N, :]
# Signal reconstruction
u_hat = np.zeros(shape=(T, self.K, C), dtype=self.DTYPE)
for c in range(C):
u_hat[T // 2 : T, :, c] = np.squeeze(u_hat_plus[N - 1, T // 2 : T, :, c])
second_index = list(range(1, T // 2 + 1))
second_index.reverse()
u_hat[second_index, :, c] = np.squeeze(
np.conj(u_hat_plus[N - 1, T // 2 : T, :, c])
)
u_hat[0, :, c] = np.conj(u_hat[-1, :, c])
u = np.zeros(shape=(self.K, len(t), C), dtype=self.DTYPE)
for k in range(1, self.K + 1):
for c in range(C):
u[k - 1, :, c] = (self.ifft(self.ifftshift(u_hat[:, k - 1, c]))).real
# remove mirror part
u = u[:, T // 4 : 3 * T // 4, :]
# recompute spectrum
u_hat = np.zeros(shape=(T // 2, self.K, C), dtype=self.DTYPE)
for k in range(1, self.K + 1):
for c in range(C):
u_hat[:, k - 1, c] = self.fftshift(
ts=self.fft(ts=u[k - 1, :, c])
).conj()
# ifftshift
u = np.fft.ifftshift(u, axes=-1)
if return_all is True:
return u.real, u_hat, omega
else:
return u.real