Source code for pysdkit._vmd2d.cvmd2d
# -*- coding: utf-8 -*-
"""
Created on 2025/02/12 11:06:23
@author: Whenxuan Wang
@email: wwhenxuan@gmail.com
"""
import numpy as np
from scipy.linalg import norm
from scipy.sparse import coo_matrix, csr_matrix
from typing import Optional, Union, Tuple, Any
from pysdkit.utils import fft2d, ifft2d, fftshift, ifftshift
[docs]
class CVMD2D(object):
"""
Compact Variational Mode Decomposition for 2D Images
Spatially Compact and Spectrally Sparse Image Decomposition and Segmentation.
Decomposing multidimensional signals, such as images, into spatially compact,
potentially overlapping modes of essentially wavelike nature makes these components accessible for further downstream analysis.
This decomposition enables space-frequency analysis, demodulation, estimation of local orientation, edge and corner detection,
texture analysis, denoising, inpainting, or curvature estimation.
[1] D. Zosso, K. Dragomiretskiy, A.L. Bertozzi, P.S. Weiss, Two-Dimensional Compact Variational Mode Decomposition,
Journal of Mathematical Imaging and Vision, 58(2):294–320, 2017. DOI:10.1007/s10851-017-0710-z.
[2] K. Dragomiretskiy, D. Zosso, Variational Mode Decomposition,
IEEE Trans. on Signal Processing, 62(3):531-544, 2014. DOI:10.1109/TSP.2013.2288675.
[3] K. Dragomiretskiy, D. Zosso, Two-Dimensional Variational Mode Decomposition,
EMMCVPR 2015, Hong Kong, LNCS 8932:197-208, 2015. DOI:10.1007/978-3-319-14612-6_15.
MATLAB code: https://ww2.mathworks.cn/matlabcentral/fileexchange/67285-two-dimensional-compact-variational-mode-decomposition-2d-tv-vmd?s_tid=FX_rc2_behav
"""
[docs]
def __init__(
self,
K: Optional[int] = 5,
alpha: Optional[int] = 1000,
beta: Optional[float] = 0.5,
gamma: Optional[int] = 500,
delta: Optional[np.ndarray] = np.inf,
rho: Optional[int] = 10,
rho_k: Optional[int] = 10,
tau: Optional[float] = 0.0,
tau_k: Optional[float] = 2.5,
t: Optional[float] = 1.5,
DC: Optional[bool] = False,
init: Union[str, int] = "uniform",
u_tol: Optional[float] = 1e-10,
A_tol: Optional[float] = 1e-4,
omega_tol: Optional[float] = 1e-10,
max_iter: Optional[int] = 130,
M: Optional[int] = 1,
A_phase: Optional[np.ndarray] = np.array([100, 150]),
random_seed: Optional[int] = 42,
) -> None:
"""
Please note that this algorithm is very sensitive to hyperparameters.
When using it, please try multiple configurations based on the input image features.
You can try the VMD2D algorithm instead of the complex hyperparameter configuration of the algorithm.
:param K: the number of modes to be recovered
:param alpha: narrowbandedness of subsignals coefficient (scalar)
:param beta: L1 penalty coefficient of spatial mode support (scalar)
:param gamma: Spatial support TV-term: heat diffusion coefficient
:param delta: Artifact classification threshold (inf for no artifacts)
:param rho: data fidelity coefficient
:param rho_k: u-v splitting coefficient
:param tau: time-step of dual ascent for data ( pick 0 for noise-slack )
:param tau_k: time-step of dual ascent for u-v splitting
:param t: spatial support TV-term: time-step of ODE/PDE
:param DC: true, if the first mode is put and kept at DC (0-freq)
:param init: "radially" = all omegas start initialized radially uniformly
"random" = all omegas start initialized randomly on half plane
2*K*M = use given omega list for initialization; should be 2xKxM
:param u_tol: Tolerance for u convergence
:param A_tol: Tolerance for A convergence
:param omega_tol: Tolerance for omega convergence
:param max_iter: maximum number of iterations
:param M: number of submodes
:param A_phase: 2D VMD - 2D-TV-VMD - 2D-TV-VMD-Seg scheduling
% iterations 1:a --> 2D VMD
% iterations a:b --> 2D TV VMD
% iterations b:end --> 2D TV VMD Segmentation
:param: random_seed: random seed for the omega initialization
"""
self.K = K
self.alpha = alpha
self.beta = beta
self.gamma = gamma
self.delta = delta
self.rho = rho
self.rho_k = rho_k
self.tau = tau
self.tau_k = tau_k
self.t = t
self.DC = DC
self.init = init
self.u_tol = u_tol
self.A_tol = A_tol
self.omega_tol = omega_tol
self.max_iter = max_iter
self.M = M
self.A_phase = A_phase
# A list of initialization methods
self.init_omega_list = ["uniform", "random"]
# Parameter Test
if self.init not in self.init_omega_list:
raise ValueError("init should be one of {}".format(self.init_omega_list))
# Create a generator based on a random number seed
self.rng = np.random.default_rng(seed=random_seed)
[docs]
def __call__(
self, image: np.ndarray, return_all: Optional[str] = False
) -> Union[
Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray], np.ndarray
]:
"""allow instances to be called like functions"""
return self.fit_transform(image=image, return_all=return_all)
[docs]
def __str__(self) -> str:
"""Get the full name and abbreviation of the algorithm"""
return "Compact Variational Mode Decomposition for 2D Images (CVMD2D)"
[docs]
def _init_omega(self) -> np.ndarray:
"""Initialization of omega_k"""
# N iterations at most, 2 spatial coordinates, K modes, M submodes
omega = np.zeros(shape=(self.max_iter, 2, self.K, self.M))
if isinstance(self.init, str):
# 如果输入是字符串则通过已有的三种方法进行初始化
if self.init == "radially":
# Radially Uniform
# if DC, Keep first mode at 0, 0
if self.DC is True:
maxK = self.K - 1
else:
maxK = self.K
radius = 0.3
for k in range(int(self.DC), int(self.DC) + maxK):
for m in range(0, self.M):
omega[0, 0, k, m] = radius * np.cos(
np.pi * (k - 1 + (m - 1) * maxK) / maxK / self.M
)
omega[0, 1, k, m] = radius * np.sin(
np.pi * (k - 1 + (m - 1) * maxK) / maxK / self.M
)
elif self.init == "random":
# Random on Half-Plane
for k in range(0, self.K):
for m in range(0, self.M):
omega[0, 0, k, m] = self.rng.random() - 1 / 2
omega[0, 1, k, m] = self.rng.random() / 2
# DC component (if expected)
if self.DC is True:
omega[0, 0, 0, :] = 0.0
omega[0, 1, 0, :] = 0.0
else:
raise ValueError(
"init should be one of {}".format(self.init_omega_list)
)
elif np.size(self.init) == 2 * self.K * self.M:
# use given omega list for initialization; should be 2xKxM
if np.size(self.init, 0) != 2 or np.size(self.init, 1) != self.K:
raise ValueError("init parameter has inappropriate size")
omega[0, :, :, :] = self.init
else:
raise ValueError("Wrong input for `init`")
return omega
[docs]
def fit_transform(
self, image: np.ndarray, return_all: Optional[str] = False
) -> Union[
Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray], np.ndarray
]:
"""
Execute the signal decomposition algorithm for 2D images
Please note that this algorithm is very sensitive to hyperparameters.
Please try multiple configurations based on the input image features.
You can try the VMD2D algorithm instead of the complex hyperparameter configuration of the algorithm.
"""
# Resolution of image
Hy, Hx = image.shape
X, Y = np.meshgrid(np.arange(1, Hx + 1) / Hx, np.arange(1, Hy + 1) / Hy)
# Spectral Domain discretization
fx = 1 / Hx
fy = 1 / Hx
freqs_1 = X - 0.5 - fx
freqs_2 = Y - 0.5 - fy
# Storage matrices for (Fourier) modes. Iterations are not recorded
u_hat = np.zeros(shape=(Hy, Hx, self.K, self.M))
# copy the u_hat
u = u_hat.copy()
u_old = u.copy()
v = u.copy()
print("origin v", v.shape)
# Augmented Lagrangian Variables
# linking variable u/v ~rho_k
lambda_k = u.copy()
# data fidelity ~rho
lambda_d = np.zeros(shape=(Hy, Hx))
# Spatial support variables
A = np.ones(shape=(Hy, Hx, self.K))
A_old = A.copy()
# Artifact map
X = np.zeros(shape=(Hy, Hx))
# Initialization of omega_k
omega = self._init_omega()
##### Main Loop for iterative updates #####
# Stopping criteria tolerances
uDiff = np.inf
ADiff = np.inf
omegaDiff = np.inf
# Stores the sum of A_j*v_j for all j not equal to k
sum_Avk = 0
# Init the interation counter
n = 0
# Main loop - run until convergence or max number of iterations
while (
(uDiff > self.u_tol or ADiff > self.A_tol or omegaDiff > self.omega_tol)
and n < self.max_iter - 1
) or n <= np.max(np.isfinite(self.A_phase) * self.A_phase):
# Mode
for k in range(0, self.K):
# Submodes
for m in range(0, self.M):
# Compute the halfplane spectral mask for the 2D "analytic signal"
HilbertMask = (
np.sign(
freqs_1 * omega[n, 0, k, m] + freqs_2 * omega[n, 1, k, m]
)
+ 1
)
# Update accumulator
if m == 0:
if k == 0:
sum_Avk = (
sum_Avk
+ A[:, :, -1] * v[:, :, -1, -1]
- A[:, :, 0] * v[:, :, 0, 0]
)
else:
sum_Avk = (
sum_Avk
+ A[:, :, k - 1] * v[:, :, k - 1, -1]
- A[:, :, k] * v[:, :, k, 0]
)
else:
sum_Avk = (
sum_Avk
+ A[:, :, k] * v[:, :, k, m - 1]
- A[:, :, k] * v[:, :, k, m]
)
# Update v (time domain averaging)
v[:, :, k, m] = (
self.rho_k * u[:, :, k, m]
+ lambda_k[:, :, k, m]
+ self.rho
* A[:, :, k]
* (image - sum_Avk + lambda_d / self.rho)
* (1 - X)
) / (self.rho_k + self.rho * (1 - X) * A[:, :, k] ** 2)
# Update u_hat (analytic signal spectrum via Wiener filter)
u_hat[:, :, k, m] = (
fftshift(
fft2d(self.rho_k * v[:, :, k, m] - lambda_k[:, :, k, m])
)
* HilbertMask
) / (
self.rho_k
+ 2
* self.alpha
* (
(freqs_1 - omega[n, 0, k, m]) ** 2
+ (freqs_2 - omega[n, 1, k, m]) ** 2
)
)
# Update center frequencies (first mode is kept at omega = 0 if DC = 1)
if not self.DC or k > 0:
# Update signal frequencies as center of mass of power spectrum
omega[n + 1, 0, k, m] = np.sum(
np.sum(freqs_1 * (np.abs(u_hat[:, :, k, m]) ** 2))
) / np.sum(np.sum(np.abs(u_hat[:, :, k, m]) ** 2))
omega[n + 1, 1, k, m] = np.sum(
np.sum(freqs_2 * (np.abs(u_hat[:, :, k, m]) ** 2))
) / np.sum(np.sum(np.abs(u_hat[:, :, k, m]) ** 2))
# Keep omegas on same halfplane (top half)
if omega[n + 1, 1, k, m] < 0:
omega[n + 1, :, k, m] = -omega[n + 1, :, k, m]
# recover full spectrum (and signal) from analytic signal spectrum
u[:, :, k, m] = np.real(
ifft2d(ifftshift(np.squeeze(u_hat[:, :, k, m])))
)
# No MBO/TV-term propagation in phase I (2D VMD, only)
# Individual MBO for TV-term Propagation in phase II (2D TV VMD)
if self.A_phase[0] <= n + 1 < self.A_phase[1]:
# Reconstruction Fidelity + Area Penalty + Segmentation Penalty
A[:, :, k] = A[:, :, k] + self.t * (
-self.beta
+ 2
* self.rho
* np.sum(v[:, :, k, :], axis=2)
* (
image
- np.sum(A * np.sum(v, axis=3), axis=2)
+ A[:, :, k] * np.sum(v[:, :, k, :], axis=2)
+ lambda_d / self.rho
)
* (1 - X)
)
A[:, :, k] = A[:, :, k] / (
1
+ self.t
* 2
* self.rho
* (1 - X)
* np.sum(v[:, :, k, :], axis=2) ** 2
)
# Project to characteristic range
A[A > 1] = 1
A[A < 0] = 0
# Propagate by heat equation
A[:, :, k] = ifft2d(
fft2d(A[:, :, k])
/ (1 + self.t * self.gamma * ifftshift(freqs_1**2 + freqs_2**2))
)
# individual MBO thresholding [0,1] (no segmentation constraint)
A[:, :, k] = A[:, :, k] >= 0.5
# Joint MBO prop. with segmentation, "Winner Takes All", phase III
if n + 1 >= self.A_phase[1]:
sum_Avk = np.sum(A * np.sum(v, axis=3), axis=2)
for k in range(0, self.K):
A[:, :, k] = A[:, :, k] + self.t * (
-self.beta
+ 2
* self.rho
* np.sum(v[:, :, k, :], axis=2)
* (
image
- sum_Avk
+ A[:, :, k] * np.sum(v[:, :, k, :], axis=2)
+ lambda_d / self.rho
)
)
A[:, :, k] = A[:, :, k] / (
1 + self.t * 2 * self.rho * np.sum(v[:, :, k, :], axis=2) ** 2
)
A[:, :, k] = ifft2d(
fft2d(A[:, :, k])
/ (1 + self.t * self.gamma * ifftshift(freqs_1**2 + freqs_2**2))
)
# Reshape A to a 2D array of shape [Hx*Hy, K]
A_reshaped = A.reshape(Hx * Hy, self.K)
# Find the index of the maximum value along the second axis (axis=1)
I = np.argmax(A_reshaped, axis=1)
# Create a sparse matrix with 1s at the positions specified by I
# Use scipy.sparse.csr_matrix to create a sparse matrix
row_indices = np.arange(Hx * Hy) # Row indices (1:Hx*Hy)
data = np.ones(Hx * Hy) # All values are 1
sparse_matrix = csr_matrix(
(data, (row_indices, I)), shape=(Hx * Hy, self.K)
)
# Convert the sparse matrix to a dense array and reshape back to 3D
A = sparse_matrix.toarray().reshape(Hx, Hy, self.K)
# Artifact thresholding
DF = image - np.sum(A * np.sum(v, axis=3), axis=2)
# Update the X array
X = DF**2 >= self.delta
# data fidelity dual ascent
lambda_d = lambda_d + self.tau * DF
# Update Lagrangian multiplier variables via gradient ascent
lambda_k = lambda_k + self.tau * (u - v)
# Update counter
n += 1
# Tolerance calculation for stopping criteria
uDiff = norm(u - u_old) ** 2 / norm(u**2 / (Hx * Hy))
ADiff = norm(A.ravel() - A_old.ravel(), ord=1) / (Hx * Hy)
# Storage of n-th iteration
u_old = u.copy()
A_old = A.copy()
omega = omega[n, :, :, :]
# Whether to return all the information of decomposition
if return_all is True:
return u, v, omega, A, X
return u
if __name__ == "__main__":
from pysdkit.data import test_grayscale
img = test_grayscale()
vmd2d = CVMD2D(
K=5,
alpha=1000,
tau=2.5,
DC=True,
init="uniform",
max_iter=130,
M=1,
A_phase=np.array([100, np.inf]),
beta=0.5,
gamma=500,
rho=10,
rho_k=10,
tau_k=2.5,
t=1.5,
)
u = vmd2d.fit_transform(img)
print(u.shape)
from matplotlib import pyplot as plt
plt.imshow(img, cmap="gray")
plt.show()
for i in range(u.shape[2]):
plt.imshow(u[:, :, i], cmap="gray")
plt.show()