Source code for pysdkit._vmd2d.vmd2d
# -*- coding: utf-8 -*-
"""
Created on 2025/02/02 17:00:47
@author: Whenxuan Wang
@email: wwhenxuan@gmail.com
"""
import numpy as np
from pysdkit.utils import fft2d, ifft2d, fftshift, ifftshift
from typing import Optional, Union, Tuple
[docs]
class VMD2D(object):
"""
Variational Mode Decomposition for 2D Image
Konstantin, Dragomiretskiy, and Dominique Zosso. "Two-dimensional variational mode decomposition."
Energy Minimization Methods in Computer Vision and Pattern Recognition. Vol. 8932. 2015.
MATLAB code: https://www.mathworks.com/matlabcentral/fileexchange/45918-two-dimensional-variational-mode-decomposition?s_tid=srchtitle
"""
[docs]
def __init__(
self,
K: int,
alpha: int,
tau: Optional[float] = 0.0,
DC: Optional[bool] = False,
init: Optional[str] = "zero",
max_iter: Optional[int] = 3000,
tol: Optional[float] = 1e-7,
random_seed: Optional[int] = 42,
) -> None:
"""
:param K: the number of modes to be recovered
:param alpha: the balancing parameter for data fidelity constraint
:param tau: time-step of dual ascent ( pick 0 for noise-slack )
:param DC: true, if the first mode is put and kept at DC (0-freq)
:param init: zero means all omegas start at 0 and random means all omegas start initialized randomly
:param max_iter: the maximum number of iterations
:param tol: the tolerance parameter for data fidelity constraint
:param random_seed: random seed for randomly initialization
"""
self.K = K # 分解子信号的数目
# 调控分解效果的参数
self.alpha, self.tau, self.DC, self.init = alpha, tau, DC, init
# 算法停止迭代的准则
self.max_iter, self.tol = max_iter, tol
# 创建一个随机数生成器用于初始化
self.rng = np.random.RandomState(random_seed)
[docs]
def __call__(
self,
img: np.ndarray,
K: Optional[int] = None,
return_all: Optional[bool] = False,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray, np.ndarray]]:
"""allow instances to be called like functions"""
return self.fit_transform(img=img, K=K, return_all=return_all)
[docs]
def __str__(self) -> str:
"""Get the full name and abbreviation of the algorithm"""
return "Variational mode decomposition for 2D Images (VMD2D)"
[docs]
def init_omega(self, K: int) -> np.ndarray:
"""Initialization of omega_k"""
omega = np.zeros(shape=(self.max_iter, 2, K))
if self.init == "zero":
# spread omegas radially
if self.DC is True:
# if DC, keep first mode at 0, 0
maxK = K + 1
else:
maxK = K
for k in range(0, maxK):
omega[0, 0, k] = 0.25 * np.cos(np.pi * (k - 1) / maxK)
omega[0, 1, k] = 0.25 * np.sin(np.pi * (k - 1) / maxK)
elif self.init == "random":
# random on half-plane
for k in range(0, K):
omega[0, 0, k] = self.rng.rand() - 1 / 2
omega[0, 1, k] = self.rng.rand() / 2
if self.DC is True:
# DC component (if expected)
omega[0, 0, 0] = 0
omega[0, 1, 0] = 0
else:
raise ValueError("init must be 'zero' or 'random'")
return omega
[docs]
def fit_transform(
self,
img: np.ndarray,
K: Optional[int] = None,
return_all: Optional[bool] = False,
) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray, np.ndarray]]:
"""
Image decomposition using VMD algorithm
:param img: the input image of 2D ndarray in numpy
:param K: the maximum number of modes to be decomposed
:param return_all: Whether to return all results of the algorithm, False only return the collection of decomposed modes,
True plus the spectra of the modes and the estimated mode center-frequencies
:return: The extracted modes from the signals / images
"""
K = K if K is not None else self.K
# Resolution of image
Hy, Hx = img.shape
X, Y = np.meshgrid(np.arange(1, Hx + 1) / Hx, np.arange(1, Hy + 1) / Hy)
# Spectral domain discretization
fx = 1 / Hx
fy = 1 / Hy
freqs_1 = X - 0.5 - fx
freqs_2 = Y - 0.5 - fy
# For future generalizations: alpha might be individual for each mode
Alpha = self.alpha * np.ones(K)
# Construct img and image_hat
img_hat = fftshift(fft2d(img))
# Storage matrices for (Fourier) modes. All iterations are not recorded.
u_hat = np.zeros(shape=(Hy, Hx, K)) # 用于记录每个模态的分解结果
u_hat_old = u_hat.copy()
sum_uk = 0
# Storage matrices for (Fourier) Lagrange Multiplier
mu_hat = np.zeros(shape=(Hy, Hx))
# N iterations at most, 2 spatial coordinates, K clusters
omega = self.init_omega(K)
##### Main loop for iterative updates #####
# Stopping criteria tolerances
uDiff = self.tol + np.spacing(1)
omegaDiff = self.tol + np.spacing(1)
# first run
n = 0
# run until convergence or max number of iterations
while (uDiff > self.tol or omegaDiff > self.tol) and n < self.max_iter - 1:
# first things first
k = 0
# compute the halfplane mask for the 2D "analytic signal"
HilbertMask = (
np.sign(freqs_1 * omega[n, 0, k] + freqs_2 * omega[n, 1, k]) + 1
)
# update first mode accumulator
sum_uk = u_hat[:, :, -1] + sum_uk - u_hat[:, :, k]
# update first mode's spectrum through wiener filter (on half plane)
u_hat[:, :, k] = ((img_hat - sum_uk - mu_hat[:, :] / 2) * HilbertMask) / (
1
+ Alpha[k]
* ((freqs_1 - omega[n, 0, k]) ** 2 + (freqs_2 - omega[n, 1, k]) ** 2)
)
# update first mode's central frequency as spectral center of gravity
if self.DC is False:
omega[n + 1, 0, k] = np.sum(
np.sum(freqs_1 * (np.abs(u_hat[:, :, k]) ** 2))
) / np.sum(np.sum(np.abs(u_hat[:, :, k]) ** 2))
omega[n + 1, 1, k] = np.sum(
np.sum(freqs_2 * (np.abs(u_hat[:, :, k]) ** 2))
) / np.sum(np.sum(np.abs(u_hat[:, :, k]) ** 2))
# Keep omegas on same halfplane
if omega[n + 1, 1, k] < 0:
omega[n + 1, :, k] = -omega[n + 1, :, k]
# recover full spectrum from analytic signal
u_hat[:, :, k] = fftshift(
fft2d(np.real(ifft2d(ifftshift(np.squeeze(u_hat[:, :, k])))))
)
# work on other modes
for k in range(1, K):
# recompute Hilbert mask
HilbertMask = (
np.sign(freqs_1 * omega[n, 0, k] + freqs_2 * omega[n, 1, k]) + 1
)
# update accumulator
sum_uk = u_hat[:, :, k - 1] + sum_uk - u_hat[:, :, k]
# update signal spectrum
u_hat[:, :, k] = (
(img_hat - sum_uk - mu_hat[:, :] / 2) * HilbertMask
) / (
1
+ Alpha[k]
* (
(freqs_1 - omega[n, 0, k]) ** 2
+ (freqs_2 - omega[n, 1, k]) ** 2
)
)
# update signal frequencies
omega[n + 1, 0, k] = np.sum(
np.sum(freqs_1 * (np.abs(u_hat[:, :, k]) ** 2))
) / np.sum(np.sum(np.abs(u_hat[:, :, k]) ** 2))
omega[n + 1, 1, k] = np.sum(
np.sum(freqs_2 * (np.abs(u_hat[:, :, k]) ** 2))
) / np.sum(np.sum(np.abs(u_hat[:, :, k]) ** 2))
# Keep omegas on same halfplane
if omega[n + 1, 1, k] < 0:
omega[n + 1, :, k] = -omega[n + 1, :, k]
# recover full spectrum from analytic signal
u_hat[:, :, k] = fftshift(
fft2d(np.real(ifft2d(ifftshift(np.squeeze(u_hat[:, :, k])))))
)
# Gradient ascent for augmented Lagrangian
mu_hat[:, :] = mu_hat[:, :] + self.tau * (np.sum(u_hat, axis=2) - img_hat)
# increment iteration counter
n += 1
# whether convergence?
uDiff = np.spacing(1)
omegaDiff = np.spacing(1)
for k in range(0, K):
omegaDiff = omegaDiff + np.sum(
np.sum(np.abs(omega[n, :, :] - omega[n - 1, :, :]) ** 2)
)
uDiff = uDiff + np.sum(
np.sum(
1
/ (Hx * Hy)
* (u_hat[:, :, k] - u_hat_old[:, :, k])
* np.conj((u_hat[:, :, k] - u_hat_old[:, :, k]))
)
)
uDiff = np.abs(uDiff)
# update the u_hat old
u_hat_old = u_hat.copy()
##### Signal or Image Reconstruction #####
# Inverse Fourier Transform to compute (spatial) modes
u = np.zeros(shape=(Hy, Hx, K))
for k in range(0, K):
u[:, :, k] = np.real(ifft2d(ifftshift(np.squeeze(u_hat[:, :, k]))))
# Should the omega-history be returned, or just the final results?
if return_all:
return u, u_hat, omega
else:
return u
if __name__ == "__main__":
from pysdkit.data import test_grayscale
img = test_grayscale()
vmd2d = VMD2D(
K=5, alpha=5000, tau=0.25, DC=True, init="random", tol=1e-6, max_iter=3000
)
u = vmd2d.fit_transform(img)
print(u.shape)
from matplotlib import pyplot as plt
plt.imshow(img, cmap="gray")
plt.show()
for i in range(u.shape[2]):
plt.imshow(u[:, :, i], cmap="gray")
plt.show()