# -*- coding: utf-8 -*-
"""
Created on 2025/01/31 23:33:43
@author: Whenxuan Wang
@coauthor: Wentong Zhao
@email: wwhenxuan@gmail.com
"""
import numpy as np
from typing import Optional, Union, Tuple
[docs]
class RLMD(object):
"""
Robust Local Mean Decomposition
The RLMD is an improved local mean decomposition powered by a set of optimization strategies.
The optimization strategies can deal with boundary condition, envelope estimation, and sifting stopping criterion in the LMD.
It simultaneously extracts a set of mono-component signals (called product functions) and
their associated demodulation signals (i.e. AM signal and FM signal) from a mixed signal,
which is the most attracting feature comparing with other adaptive signal processing methods,
such as the EMD. The RLMD can be used for time-frequency analysis.
Zhiliang Liu, Yaqiang Jin, Ming J. Zuo, and Zhipeng Feng.
Time-frequency representation based on robust local mean decomposition for multi-component AM-FM signal analysis.
Mechanical Systems and Signal Processing. 95: 468-487, 2017.
Liu Zhiliang (2025). Robust Local Mean Decomposition (RLMD)
(https://www.mathworks.com/matlabcentral/fileexchange/66935-robust-local-mean-decomposition-rlmd),
MATLAB Central File Exchange. Retrieved July 13, 2025.
"""
[docs]
def __init__(
self,
max_imfs: Optional[int] = 10,
max_iter: Optional[int] = 30,
smooth_mode: Optional[str] = "ma",
ma_span: Optional[str] = "liu",
ma_iter_mode: Optional[str] = "fixed",
stop_threshold: Optional[Tuple[float, float, float]] = (0.005, 0.7, 0.005),
extd_r: Optional[float] = 0.2,
sifting_stopping_mode: Optional[str] = "liu",
min_peak: Optional[int] = 3,
min_ratio: Optional[float] = 0.001,
):
"""
The above parameters are the default parameters of the algorithm.
Please modify and adjust them according to the specific data when using them.
:param max_imfs: Maximum number of intrinsic mode functions to be decomposed.
:param max_iter: max iteration number in a PF sifting process.
:param smooth_mode: ma - moving average, spline - pchip.
:param ma_span: pdmax span method, see function ma_span.
:param ma_iter_mode: fixed or dynamic span for iterate ma.
:param stop_threshold: sifting stopping thresholds for Rilling's criterion.
:param extd_r: end extension length to original data.
:param sifting_stopping_mode: the sifting stoppling optimizaion.
:param min_peak: When the number of remaining extreme points of the input signal is less than a certain value,
the algorithm stops iterating.
:param min_ratio: When the energy of the remaining signal is less than a certain value,
the algorithm stops iterative updating.
"""
self.max_imfs = max_imfs
self.max_iter = max_iter
self.smooth_mode = smooth_mode
self.ma_span = ma_span
self.ma_iter_mode = ma_iter_mode
self.stop_threshold = stop_threshold
self.extd_r = extd_r
self.sifting_stopping_mode = sifting_stopping_mode
# Parameters related to stopping the algorithm's iterative update
self.min_peak = min_peak
self.min_ratio = min_ratio
[docs]
def __call__(
self, signal: np.ndarray, return_all: Optional[bool] = False
) -> np.ndarray:
"""Allow instances to be called like functions"""
return self.fit_transform(signal=signal, return_all=return_all)
[docs]
def __str__(self) -> str:
"""Get the full name and abbreviation of the algorithm"""
return "Robust Local Mean Decomposition (RLMD)"
[docs]
def _stop_lmd(
self, signal: np.ndarray, signal_energy: Union[np.ndarray, float]
) -> bool:
"""
Stopping criterion of LMD algorithm.
Check if there are enough (3) extrema to continue the decomposition
:param signal: The original input signal or the residual component of the signal
:param signal_energy: The energy of the original signal
:return:
"""
# Get the maximum and minimum index positions of the input signal
indmin, indmax, _ = extr(x=signal)
# Calculate the number of extreme points
number_peak = len(indmax) + len(indmin)
# Energy ratio of decomposed signal
ratio = np.sum(signal**2) / signal_energy
# Determine whether the algorithm can be stopped
stop = number_peak < self.min_peak or ratio < self.min_ratio
if stop:
return True
return False
[docs]
def _lmd_mean_amp(
self, x: np.ndarray
) -> Tuple[np.ndarray, np.ndarray, Union[np.ndarray, int]]:
"""
Compute mean function and amplitude function of x in LMD
:param x: the input signal.
:return: the results of mean iterative moving average
"""
# Find extremum indices
indmin, indmax, _ = extr(x=x)
# Total amount of extrema
n_extr = len(indmin) + len(indmax)
if n_extr < self.min_peak:
m = np.array([])
a = np.array([])
return m, a, n_extr
# extend original data to refrain end effect
# ext_indmin(max) contains the end point's index
ext_indmin, ext_indmax, ext_x, cut_index = extend(
x=x, indmin=indmin, indmax=indmax, extd_r=self.extd_r
)
# preparation
m0 = np.zeros(len(ext_x))
a0 = np.zeros(len(ext_x))
# TODO: Here the index array is processed by -1
ind_extextr = np.sort(np.hstack([ext_indmin, ext_indmax])) - 1
# Compute local mean and amplititude sequence
if self.smooth_mode == "ma":
for k in range(
0, len(ind_extextr) - 1
): # TODO: Note that you don't need to subtract 1 here.
subm1 = ind_extextr[k]
subm2 = ind_extextr[k + 1]
# print(subm1, subm2, len(m0), len(a0), len(ext_x))
m0[subm1:subm2] = (ext_x[subm1] + ext_x[subm2]) / 2
a0[subm1:subm2] = np.abs(ext_x[subm1] + ext_x[subm2]) / 2
span, smax = self._get_best_span(ind_extextr, x=ext_x)
# iterative moving average
m = self._itrma(x=m0, span=span, smax=smax)
a = self._itrma(x=a0, span=span, smax=smax)
# cut extension
# TODO: Here the index array is processed by -1
m = m[cut_index[0] : cut_index[1] + 1]
a = a[cut_index[0] : cut_index[1] + 1]
else:
raise ValueError(f"No specifications {self.smooth_mode} for smooth_mode!")
return m, a, n_extr
[docs]
def _itrma(self, x: np.ndarray, span: int, smax: int) -> np.ndarray:
"""
Iterative moving average dynamic step.
:param x: the input signal to be decomposed.
:param span: int, moving-average span (always odd).
:param smax: Maximum step between consecutive extrema.
:return: get the iterative moving average of the input signal.
"""
x = smooth(x, span=span)
nm = len(x)
if self.ma_iter_mode == "fixed":
# Stick Step
cntr = 0 # count times of moving average
# theoretic
max_c = np.ceil(smax / span) * 15
k = int((span + 1) / 2)
kmax = int(nm - (span - 1) / 2)
while k < kmax and cntr < max_c:
# find flat step
if x[k] == x[k + 1]:
x = smooth(x, span=span)
cntr = cntr + 1
k -= 1
k += 1
else:
raise ValueError(f"No specifications {self.ma_iter_mode} for ma_iter_mode!")
return x
[docs]
def _get_best_span(
self, ind_extr: np.ndarray, x: np.ndarray
) -> Tuple[Union[np.ndarray, int], int]:
"""
Moving average span selection.
Return the best span of moving average
:param ind_extr: indices of extremum of x
:param x: the input signal to be decomposed.
:return: - span : int, Optimal moving-average span (always odd).
- smax : int, Maximum step between consecutive extrema.
"""
ind_extr = np.asarray(ind_extr, dtype=int)
x = np.asarray(x, dtype=float)
x_extr = x[ind_extr]
# 0 elements' indices of eql_extr are the first indices of identical maximum ,or minimum, pairs
eql_extr = x_extr[2:] - x_extr[:-2]
# delete the extremum indecies between two identical maximum(minimum)
# eql_extr == 0 indicates identical values separated by one point
to_remove = np.where(eql_extr == 0)[0] + 1
# +1 because diff shrinks length
ind_extr = np.delete(ind_extr, to_remove)
# Steps between successive extrema
step_vec = ind_extr[1:] - ind_extr[:-1] + 1
smax = int(step_vec.max())
smean = step_vec.mean()
if self.ma_span.lower() == "liu":
# Probability density via histogram
counts, bins = np.histogram(step_vec, bins="auto", density=True)
bin_centers = bins[:-1] + np.diff(bins) / 2.0
span_c = np.sum(bin_centers * counts * np.diff(bins))
span_std = np.sqrt(
np.sum((bin_centers - span_c) ** 2 * counts * np.diff(bins))
)
span = int(np.ceil(span_c + 3 * span_std))
else:
raise ValueError("Unsupported ma_span specification.")
# Force span to be odd
span = int(np.ceil(span))
if span % 2 == 0:
span += 1
return span, smax
[docs]
def _is_sifting_stopping(
self, a_j: np.ndarray, j: np.ndarray, fv_i: np.ndarray
) -> Tuple[bool, np.ndarray]:
"""Sifting stopping criterion of Robust Local Mean Decomposition."""
# baseline is y = 1.
base = np.ones(shape=a_j.shape)
# df = abs(a_j - base); % difference between a_i and baseline.
df = a_j - base
# Initialize the algorithm to stop the flag
stop_sifting = False
if self.sifting_stopping_mode.lower() == "liu":
# local optimal iteration.
fv_i[j] = rms(df) + np.abs(kurtosis(x=df) - 3)
if j >= 3:
if (fv_i[j] >= fv_i[j - 1]) and (fv_i[j - 1] >= fv_i[j - 2]):
stop_sifting = True
return stop_sifting, fv_i
else:
raise ValueError(
f"No specifications {self.sifting_stopping_mode} for sifting_stopping_mode!"
)
return stop_sifting, fv_i
def extr(x: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Python implementation of the MATLAB `extr` function from the EMD toolbox.
Extracts the indices of extrema
Copied from emd toolbox by G.Rilling and P.Flandrin
http://perso.ens-lyon.fr/patrick.flandrin/emd.html
:param x: 1-D array_like Input signal.
:return: - indmin : ndarray Indices of local minima.
- indmax : ndarray Indices of local maxima.
- indzer : ndarray Indices of zero-crossings (including mid-points of flat zero segments).
"""
x = np.asarray(x, dtype=float)
n = len(x)
# ------------------------------------------------------------------
# 1. Zero-crossings
# ------------------------------------------------------------------
# Detect sign changes between consecutive samples
x1, x2 = x[:-1], x[1:]
indzer = np.where(x1 * x2 < 0)[0]
# Handle exact zeros and flat zero segments
iz = np.where(x == 0)[0]
if iz.size > 0:
# Detect contiguous zero segments
zeros = x == 0
dz = np.diff(np.r_[0, zeros, 0])
debz = np.where(dz == 1)[0]
finz = np.where(dz == -1)[0] - 1
# Middle of each flat zero segment
midz = np.round((debz + finz) / 2).astype(int)
indzer = np.unique(np.concatenate((indzer, midz)))
# ------------------------------------------------------------------
# 2. Extrema
# ------------------------------------------------------------------
d = np.diff(x)
d1, d2 = d[:-1], d[1:]
indmin = np.where((d1 * d2 < 0) & (d1 < 0))[0] + 1
indmax = np.where((d1 * d2 < 0) & (d1 > 0))[0] + 1
# Handle constant (plateau) segments
if np.any(d == 0):
bad = d == 0
dd = np.diff(np.r_[0, bad, 0])
debs = np.where(dd == 1)[0]
fins = np.where(dd == -1)[0] - 1
# Discard segments at the very edges
if debs.size and debs[0] == 0:
debs, fins = debs[1:], fins[1:]
if fins.size and fins[-1] == n - 1:
debs, fins = debs[:-1], fins[:-1]
imin, imax = [], []
for k in range(debs.size):
if d[debs[k] - 1] > 0 > d[fins[k]]:
imax.append((debs[k] + fins[k]) // 2)
elif d[debs[k] - 1] < 0 < d[fins[k]]:
imin.append((debs[k] + fins[k]) // 2)
indmin = np.unique(np.concatenate((indmin, imin)))
indmax = np.unique(np.concatenate((indmax, imax)))
return indmin.astype(int), indmax.astype(int), indzer.astype(int)
def extend(
x: np.ndarray,
indmin: np.ndarray,
indmax: np.ndarray,
extd_r: Union[np.ndarray, float],
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Python implementation of the MATLAB `extend` routine from the EMD toolbox.
:param x: 1-D ndarray, Original signal.
:param indmin: 1-D array_like, Indices of local minima (0-based).
:param indmax: 1-D array_lik, Extension ratio. 0 → no extension.
:param extd_r: float, Extension ratio. 0 → no extension.
:return: - ext_indmin : ndarray, Indices of minima in the extended signal.
- ext_indmax : ndarray, Indices of maxima in the extended signal.
- ext_x : ndarray, Extended signal.
- cut_index : ndarray, Two-element array [start, end] giving indices to cut back to the original range in the extended signal.
"""
x = np.asarray(x, dtype=float)
indmin = np.asarray(indmin, dtype=int)
indmax = np.asarray(indmax, dtype=int)
xlen = x.size
if extd_r == 0:
return indmin, indmax, x, np.array([0, xlen - 1])
# do not extend x
nbsym = int(np.ceil(extd_r * indmax.size)) # number of extrema to use
t = np.arange(xlen)
# boundary conditions for interpolations :
# ------------------------------------------------------------
# 1. Left boundary
# ------------------------------------------------------------
if indmax[0] < indmin[0]:
# first extremum is a maximum
if x[0] > x[indmin[0]]:
lmax = indmax[2 : min(indmax.size, nbsym + 1)][::-1]
lmin = indmin[1 : min(indmin.size, nbsym)][::-1]
lsym = indmax[0]
else:
lmax = indmax[1 : min(indmax.size, nbsym + 1)][::-1]
lmin = np.r_[indmin[1 : min(indmin.size, nbsym)][::-1], 0]
lsym = 0
else: # first extremum is a minimum
if x[0] < x[indmax[0]]:
lmax = indmax[1 : min(indmax.size, nbsym)][::-1]
lmin = indmin[2 : min(indmin.size, nbsym + 1)][::-1]
lsym = indmin[0]
else:
lmax = np.r_[indmax[1 : min(indmax.size, nbsym)][::-1], 0]
lmin = indmin[1 : min(indmin.size, nbsym + 1)][::-1]
lsym = 0
# ------------------------------------------------------------
# 2. Right boundary
# ------------------------------------------------------------
if indmax[-1] < indmin[-1]: # last extremum is a minimum
if x[-1] < x[indmax[-1]]:
rmax = indmax[max(indmax.size - nbsym, 0) :][::-1]
rmin = indmin[max(indmin.size - nbsym, 0) : -1][::-1]
rsym = indmin[-1]
else:
rmax = np.r_[xlen - 1, indmax[max(indmax.size - nbsym + 1, 0) :][::-1]]
rmin = indmin[max(indmin.size - nbsym, 0) :][::-1]
rsym = xlen - 1
else: # last extremum is a maximum
if x[-1] > x[indmin[-1]]:
rmax = indmax[max(indmax.size - nbsym, 0) : -1][::-1]
rmin = indmin[max(indmin.size - nbsym + 1, 0) :][::-1]
rsym = indmax[-1]
else:
rmax = indmax[max(indmax.size - nbsym, 0) :][::-1]
rmin = np.r_[xlen - 1, indmin[max(indmin.size - nbsym + 1, 0) :][::-1]]
rsym = xlen - 1
# Mirror locations
tlmin = 2 * t[lsym] - t[lmin]
tlmax = 2 * t[lsym] - t[lmax]
trmin = 2 * t[rsym] - t[rmin]
trmax = 2 * t[rsym] - t[rmax]
# Ensure the extension reaches far enough
if tlmin.size and tlmin[0] > t[0] or tlmax.size and tlmax[0] > t[0]:
if lsym == indmax[0]:
lmax = indmax[1 : min(indmax.size, nbsym + 1)][::-1]
else:
lmin = indmin[1 : min(indmin.size, nbsym + 1)][::-1]
if lsym == 0:
raise RuntimeError("Left extension bug")
lsym = 0
if trmin.size and trmin[-1] < t[-1] or trmax.size and trmax[-1] < t[-1]:
if rsym == indmax[-1]:
rmax = indmax[max(indmax.size - nbsym, 0) :][::-1]
else:
rmin = indmin[max(indmin.size - nbsym, 0) :][::-1]
if rsym == xlen - 1:
raise RuntimeError("Right extension bug")
rsym = xlen - 1
# ------------------------------------------------------------
# 3. Build extended signal
# ------------------------------------------------------------
l_end = max(np.max(lmax) if lmax.size else 0, np.max(lmin) if lmin.size else 0)
r_end = min(
np.min(rmax) if rmax.size else (xlen - 1),
np.min(rmin) if rmin.size else (xlen - 1),
)
new_lmax = l_end + 1 - lmax
new_lmin = l_end + 1 - lmin
new_rmax = rsym - rmax
new_rmin = rsym - rmin
lx_len = l_end - lsym
lx = x[lsym + 1 : l_end + 1][::-1]
rx = x[r_end:rsym][::-1]
ext_x = np.r_[lx, x[lsym : rsym + 1], rx]
lx_shift = lx_len - lsym
ext_indmin = np.r_[
new_lmin, indmin + lx_shift + 1, new_rmin + lx_shift + 1 + rsym
].astype(int)
ext_indmax = np.r_[
new_lmax, indmax + lx_shift + 1, new_rmax + lx_shift + 1 + rsym
].astype(int)
cut_index = np.array([lx_shift + 1, xlen + lx_shift], dtype=int)
return ext_indmin, ext_indmax, ext_x, cut_index
def smooth(x: np.ndarray, span: int) -> np.ndarray:
"""
The Python version for smooth(x, span) -- centered moving average in Matlab.
:param x: 1-D array_like
:param span: int, Window width (number of points). Must be an odd number; if it is an even number, 1 will be added automatically
:return: y: np.ndarray, Smoothed input sequence
"""
x = np.asarray(x, dtype=float).ravel()
N = x.size
# Ensure that span is an odd number and does not exceed the data length
span = int(span)
if span <= 0:
raise ValueError("span must be a positive integer")
if span % 2 == 0:
span += 1
if span > N:
span = (
N if N % 2 else N - 1
) # Take the largest odd number less than or equal to N
k = (span - 1) // 2
y = np.empty_like(x)
for i in range(N):
# Calculate the number of points available on the left and right
left = min(i, k)
right = min(N - 1 - i, k)
win = x[i - left : i + right + 1]
y[i] = win.mean()
return y
def rms(
x: np.ndarray,
dim: Optional[int] = 0,
*,
keepdims: bool = False,
omitnan: bool = False,
) -> Union[np.ndarray, float]:
"""
The Python version of rms function in Matlab.
:param x: array_like.
:param dim: int, optional, The axis to calculate along, default is 0 (column-wise)
:param keepdims: Whether to preserve compressed dimensions (False is consistent with MATLAB default)
:param omitnan: Whether to skip NaN (same as MATLAB default when False)
:return: the results of rms of the input signal.
"""
x = np.asarray(x, dtype=float)
if omitnan:
# 跳过 NaN:用 nanmean / nanstd 思路
mean_sq = np.nanmean(x**2, axis=dim, keepdims=keepdims)
else:
mean_sq = np.mean(x**2, axis=dim, keepdims=keepdims)
return np.sqrt(mean_sq)
def kurtosis(
x: np.ndarray,
axis: Optional[int] = 0,
bias: Optional[bool] = False,
nan_policy: Optional[str] = "propagate",
) -> Union[np.ndarray, float]:
"""
Reproduce MATLAB kurtosis(x) behavior (global formula, flag=0)
:param x: array_like.
:param axis: int or tuple, optional.
:param bias: bool, optional,
False is the population formula (equivalent to MATLAB flag=0), True is the sample formula (equivalent to MATLAB flag=1)
:param nan_policy: {'propagate', 'omit', 'raise'}, NaN handling strategy.
:return: ndarray, Kurtosis value (minus 3).
"""
x = np.asarray(x, dtype=float)
# Handling NaNs
if nan_policy == "omit":
m4 = np.nanmean((x - np.nanmean(x, axis=axis, keepdims=True)) ** 4, axis=axis)
var2 = np.nanvar(x, axis=axis, ddof=0) ** 2
elif nan_policy == "propagate":
m4 = np.mean((x - np.mean(x, axis=axis, keepdims=True)) ** 4, axis=axis)
var2 = np.var(x, axis=axis, ddof=0) ** 2
else:
raise ValueError("nan_policy must be 'propagate', 'omit' or 'raise'")
k = m4 / var2
if not bias:
# The overall formula does not need to be modified; MATLAB has subtracted 3 by default
k = k - 3
else:
# Sample unbiased correction (MATLAB flag=1)
n = np.sum(~np.isnan(x), axis=axis) if nan_policy == "omit" else x.shape[axis]
k = ((n + 1) * k - 3 * (n - 1)) * (n - 1) / ((n - 2) * (n - 3)) - 3
return k
if __name__ == "__main__":
from pysdkit.data import test_univariate_signal
from pysdkit.plot import plot_IMFs
from matplotlib import pyplot as plt
time, signal = test_univariate_signal()
rlmd = RLMD(max_imfs=8, max_iter=100, extd_r=0.1)
imfs = rlmd.fit_transform(signal)
plot_IMFs(signal, imfs)
plt.show()