Source code for pysdkit._vncmd.vncmd

# -*- coding: utf-8 -*-
"""
Created on Sat Mar 18 12:11:34 2024
@author: Whenxuan Wang
@email: wwhenxuan@gmail.com

MATLAB code source https://www.mathworks.com/matlabcentral/fileexchange/64292-variational-nonlinear-chirp-mode-decomposition
"""
import numpy as np
from numpy.linalg import solve, norm
from scipy.sparse import diags, eye

try:
    from scipy.integrate import cumulative_trapezoid
except ImportError:
    from scipy.integrate import cumtrapz as cumulative_trapezoid
from typing import Tuple, Optional
from pysdkit._vmd.base import Base


[docs] class VNCMD(Base): """ Variational Nonlinear Chirp Mode Decomposition Chen S, Dong X, Peng Z, et al. Nonlinear chirp mode decomposition: A variational method[J]. IEEE Transactions on Signal Processing, 2017, 65(22): 6024-6037. """
[docs] def __init__( self, eIF: Optional[np.ndarray] = None, fs: Optional[float] = None, alpha: float = 3e-4, beta: float = 1e-9, var: float = 1.0, max_iter: int = 300, tol: float = 1e-5, dtype: np.dtype = np.float64, ): """ :param eIF: initial instantaneous frequency (IF) time series for all the signal modes; each row of eIF corresponds to the IF of each mode :param fs: sampling frequency :param alpha: penalty parameter controling the filtering bandwidth of VNCMD;the smaller the alpha is, the narrower the bandwidth would be :param beta: penalty parameter controling the smooth degree of the IF increment during iterations;the smaller the beta is, the more smooth the IF increment would be :param var: the variance of the Gaussian white noise; if we set var to zero, the noise variable u (see the following code) will be dropped. :param max_iter: the maximum number of iterations :param tol: tolerance of convergence criterion; typically 1e-7, 1e-8, 1e-9... :param dtype: data types used by all operations """ self.fs = fs self.eIF = eIF if self.eIF is None: self.K, self.N = None, None else: self.K, self.N = self.eIF.shape[0], self.eIF.shape[1] self.alpha = alpha self.beta = beta self.var = var self.max_iter = max_iter self.tol = tol # 保存本次运算结果 self.IFmset = None self.smset = None self.IA = None self.DTYPE = dtype
[docs] def __call__(self, signal: np.ndarray, eIF: Optional[np.ndarray] = None): """allow instances to be called like functions""" return self.fit_transform(signal=signal, eIF=eIF)
[docs] def __str__(self) -> str: """Get the full name and abbreviation of the algorithm""" return "Variational Nonlinear Chirp Mode Decomposition (VNCMD)"
[docs] @staticmethod def projec(vec: np.ndarray, var: float) -> np.ndarray: """ Projection operation. :param vec: The vector for projection. :param var: The variance of the noise. :return: numpy.ndarray: The projected vector. """ M = len(vec) e = np.sqrt(M * var) # Upper bound determined by the noise level u = vec.copy() # Copy the input vector to avoid modifying the original if np.linalg.norm(vec) > e: u = (e / np.linalg.norm(vec)) * vec return u
[docs] def difference_matrix(self, N: int) -> np.ndarray: """ Constructs an NxN second-order difference matrix. :param N: The size of the matrix. :return: The second-order difference matrix. """ # Create an K-element column vector filled with ones e = np.ones(N, dtype=self.DTYPE) # Create an K-element column vector filled with -2 e2 = -2 * np.ones(N, dtype=self.DTYPE) # Set the first element of e2 to -1 e2[0] = -1 # Set the last element of e2 to -1 e2[-1] = -1 # Create an NxN matrix filled with zeros oper = np.zeros((N, N), dtype=self.DTYPE) # Fill the main diagonal with e2 np.fill_diagonal(oper, e2) # Fill the first upper diagonal with 1s, leaving the last element np.fill_diagonal(oper[1:], e[:-1]) # Fill the first lower diagonal with 1s, leaving the last element np.fill_diagonal(oper[:, 1:], e[:-1]) return oper
[docs] def differ(self, y: np.ndarray, delta: float) -> np.ndarray: """ Compute the derivative of a discrete time series y. :param y: The input time series. :param delta: The sampling time interval of y. :return: numpy.ndarray: The derivative of the time series. """ L = len(y) ybar = np.zeros(L - 2, dtype=self.DTYPE) for i in range(1, L - 1): ybar[i - 1] = (y[i + 1] - y[i - 1]) / (2 * delta) # Prepend and append the boundary differences ybar = np.concatenate( ( np.array([(y[1] - y[0]) / delta], dtype=self.DTYPE), ybar, np.array([(y[-1] - y[-2]) / delta], dtype=self.DTYPE), ) ) return ybar
[docs] def init_K_N(self, eIF: Optional[np.ndarray]) -> Tuple[int, int, np.ndarray]: """Initialize the frequency and get its size""" if eIF is not None: K, N = eIF.shape[0], eIF.shape[1] elif self.eIF is None: raise ValueError() else: K, N = self.K, self.N eIF = self.eIF return K, N, eIF
[docs] def fit_transform(self, signal: np.ndarray, eIF: Optional[np.ndarray] = None): """ Execute VNCMD algorithm for signal decomposition :param signal: the time domain signal (1D numpy array) to be decomposed :param eIF: initial instantaneous frequency (IF) time series for all the signal modes; each row of eIF corresponds to the IF of each mode :return: - IFmset: the collection of the obtained IF time series of all the signal modes at each iteration - smset: the collection of the obtained signal modes at each iteration - IA: the finally estimated instantaneous amplitudes of the obtained signal modes """ signal = signal.astype(self.DTYPE) K, N, eIF = self.init_K_N(eIF=eIF) t = np.arange(0, N, dtype=self.DTYPE) / self.fs # Get the improved second-order difference matrix oper = self.difference_matrix(N) opedoub = np.dot(oper.T, oper) # Used to store the demodulated orthogonal signal components sinm, cosm = np.zeros((K, N), dtype=self.DTYPE), np.zeros( (K, N), dtype=self.DTYPE ) # Used to store the demodulated orthogonal signal components xm, ym = np.zeros((K, N), dtype=self.DTYPE), np.zeros((K, N), dtype=self.DTYPE) # Stores a collection of instantaneous frequency sequences of all signal modes obtained at each iteration IFsetiter = np.zeros((K, N, self.max_iter + 1), dtype=self.DTYPE) # Initialize the instantaneous frequency of the first iteration IFsetiter[:, :, 0] = eIF # Stores the collection of signal patterns obtained at each iteration ssetiter = np.zeros((K, N, self.max_iter + 1), dtype=self.DTYPE) # Lagrange multiplier, used to adjust constraints lamuda = np.zeros(N, dtype=self.DTYPE) # Initialize the variables defined above through loops for i in range(K): sinm[i, :] = np.sin( 2 * np.pi * cumulative_trapezoid(eIF[i, :], t, initial=0), dtype=self.DTYPE, ) cosm[i, :] = np.cos( 2 * np.pi * cumulative_trapezoid(eIF[i, :], t, initial=0), dtype=self.DTYPE, ) Bm = diags( sinm[i, :].T, offsets=0, shape=(N, N), dtype=self.DTYPE ).toarray() Bdoubm = diags( (sinm[i, :] ** 2).T, offsets=0, shape=(N, N), dtype=self.DTYPE ).toarray() Am = diags( cosm[i, :].T, offsets=0, shape=(N, N), dtype=self.DTYPE ).toarray() Adoubm = diags( (cosm[i, :] ** 2).T, offsets=0, shape=(N, N), dtype=self.DTYPE ).toarray() xm[i, :] = solve(2 / self.alpha * opedoub + Adoubm, np.dot(Am.T, signal)) ym[i, :] = solve(2 / self.alpha * opedoub + Bdoubm, np.dot(Bm.T, signal)) ssetiter[i, :, 0] = xm[i, :] * cosm[i, :] + ym[i, :] * sinm[i, :] # iteration counter iter = 0 # Indicates the change difference between the current iteration and the previous iteration sDif = self.tol + 1 # The cumulative sum sum_x, sum_y = np.sum(xm * cosm, axis=0), np.sum(ym * sinm, axis=0) sum_x, sum_y = np.squeeze(sum_x), np.squeeze(sum_y) # Start main loop while sDif > self.tol and iter <= self.max_iter: # Gradually increase betathr during the cycle but not exceed beta. betathr = 10 ** (iter / 36 - 10) if betathr > self.beta: betathr = self.beta u = self.projec( vec=signal - sum_x - sum_y - lamuda / self.alpha, var=self.var ) for i in range(K): Bm = diags(sinm[i, :].T, offsets=0, shape=(N, N)).toarray() Bdoubm = diags((sinm[i, :] ** 2).T, offsets=0, shape=(N, N)).toarray() Am = diags(cosm[i, :].T, offsets=0, shape=(N, N)).toarray() Adoubm = diags((cosm[i, :] ** 2).T, offsets=0, shape=(N, N)).toarray() # remove the relevant component from the sum sum_x = sum_x - xm[i, :] * cosm[i, :] xm[i, :] = np.linalg.solve( 2 / self.alpha * opedoub + Adoubm, np.dot(Am.T, (signal - sum_x - sum_y - u - lamuda / self.alpha).T), ) interx = xm[i, :] * cosm[i, :] sum_x = sum_x + interx sum_y = sum_y - ym[i, :] * sinm[i, :] ym[i, :] = np.linalg.solve( 2 / self.alpha * opedoub + Bdoubm, np.dot(Bm.T, (signal - sum_x - sum_y - u - lamuda / self.alpha).T), ) sum_y = sum_y + ym[i, :] * sinm[i, :] # compute the derivative of the functions xbar = self.differ(xm[i, :], self.fs) ybar = self.differ(ym[i, :], self.fs) # obtain the frequency increment by arctangent demodulation deltaIF = ( (xm[i, :] * ybar - ym[i, :] * xbar) / (xm[i, :] ** 2 + ym[i, :] ** 2) / 2 / np.pi ) # smooth the frequency increment by low pass filtering deltaIF = solve(2 / betathr * opedoub + eye(N), deltaIF.T) # update the IF eIF[i, :] = eIF[i, :] - 0.5 * deltaIF.T # update cos and sin functions sinm[i, :] = np.sin(2 * np.pi * cumtrapz(eIF[i, :], t, initial=0)) cosm[i, :] = np.cos(2 * np.pi * cumtrapz(eIF[i, :], t, initial=0)) # update sums sum_x = sum_x - interx + xm[i, :] * cosm[i, :] sum_y = sum_y + ym[i, :] * sinm[i, :] ssetiter[i, :, iter + 1] = xm[i, :] * cosm[i, :] + ym[i, :] * sinm[i, :] IFsetiter[:, :, iter + 1] = eIF # update the Lagrangian multiplier lamuda = lamuda + self.alpha * (u + sum_x + sum_y - signal) # restart scheme if norm(u + sum_x + sum_y - signal) > norm(signal): lamuda = np.zeros(shape=(1, len(t))) for i in range(K): Bm = diags(sinm[i, :].T, offsets=0, shape=(N, N)).toarray() Bdoubm = diags( (sinm[i, :] ** 2).T, offsets=0, shape=(N, N) ).toarray() Am = diags(cosm[i, :].T, offsets=0, shape=(N, N)).toarray() Adoubm = diags( (cosm[i, :] ** 2).T, offsets=0, shape=(N, N) ).toarray() xm[i, :] = solve( 2 / self.alpha * opedoub + Adoubm, np.dot(Am.T, signal) ) ym[i, :] = solve( 2 / self.alpha * opedoub + Bdoubm, np.dot(Bm.T, signal) ) ssetiter[i, :, iter + 1] = ( xm[i, :] * cosm[i, :] + ym[i, :] * sinm[i, :] ) sum_x, sum_y = np.sum(xm * cosm, axis=1), np.sum(ym * sinm, axis=1) # compute the convergence index sDif = 0 for i in range(K): sDif += ( norm(ssetiter[i, :, iter + 1] - ssetiter[i, :, iter]) / norm(ssetiter[i, :, iter]) ) ** 2 # Increase the number of push iterations iter += 1 print(sDif) self.IFmset = IFsetiter[:, :, 0:iter] self.smset = ssetiter[:, :, 0:iter] self.IA = np.sqrt(xm**2 + ym**2) # print(self.IFmset.shape, self.smset.shape) return self.IFmset[:, :, -1], self.smset[:, :, -1], self.IA