# -*- coding: utf-8 -*-
"""
Created on 2025/02/10 13:14:41
@author: Whenxuan Wang
@email: wwhenxuan@gmail.com
"""
import numpy as np
from matplotlib import pyplot as plt
from pysdkit.utils import (
simple_moving_average,
weighted_moving_average,
gaussian_smoothing,
savgol_smoothing,
exponential_smoothing,
)
from typing import Optional, Tuple, List
[docs]
class Moving_Decomp(object):
"""
Moving Average decomposition.
The 1D signal is decomposed into two parts, trend and cycle, by sliding average.
This method is very simple and very suitable for processing non-stationary time series data.
"""
[docs]
def __init__(
self,
window_size: int = 5,
method: str = "simple",
sigma: int = 2,
poly_order: int = 2,
alpha: float = 0.4,
) -> None:
"""
The input signal is decomposed by sliding average to obtain the trend and cycle parts.
:param window_size: The window size of the sliding average decomposition is preferably an odd number
:param method: Sliding decomposition method, optional ["simple", "weighted", "gaussian", "savgol", "exponential"]
:param sigma: Standard deviation for Gaussian kernel (default is 2)
:param poly_order: Order of the polynomial used to fit the samples (default is 2)
:param alpha: Smoothing factor, range from 0 to 1 (default is 0.4)
"""
self.window_size = window_size
self.method = method
# Specific parameter settings for various methods
self.sigma = sigma
self.poly_order = poly_order
self.alpha = alpha
# A list of all methods of moving average decomposition
self.methods_list = ["simple", "weighted", "gaussian", "savgol", "exponential"]
if self.method not in self.methods_list:
# Wrong smoothing method used
raise ValueError("method must be one of {}".format(self.methods_list))
[docs]
def __call__(self, signal: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""allow instances to be called like functions"""
return self.fit_transform(signal=signal)
[docs]
def __str__(self) -> str:
"""Get the full name and abbreviation of the algorithm"""
return "Moving Average decomposition (Moving_Decomp)"
[docs]
def _decomposition(
self, signal: np.ndarray, method: str
) -> Tuple[np.ndarray, np.ndarray]:
"""
Execute a sliding average decomposition algorithm.
The input signal must be a univariate signal.
For multivariate signals, multiple calls are required to decompose
:param signal: the input univariate signal of 1D numpy ndarray
:return: the trend and seasonality of the input signal
"""
# Use a specific moving average decomposition method
if method == "simple":
trend = simple_moving_average(signal=signal, window_size=self.window_size)
elif method == "weighted":
trend = weighted_moving_average(signal=signal, window_size=self.window_size)
elif method == "gaussian":
trend = gaussian_smoothing(signal=signal, sigma=self.sigma)
elif method == "savgol":
trend = savgol_smoothing(
signal=signal,
window_length=self.window_size,
poly_order=self.poly_order,
)
elif method == "exponential":
trend = exponential_smoothing(signal=signal, alpha=self.alpha)
else:
raise ValueError(
"method must be 'simple' or 'weighted' or 'gaussian' or 'savgol' or 'exponential'"
)
# Subtract the trend component from the original input signal
seasonality = signal - trend
# Returns both trend and seasonal components
return trend, seasonality
[docs]
@staticmethod
def plot_decomposition(
signal: np.ndarray,
trend: np.ndarray,
seasonality: np.ndarray,
colors: List[str] = None,
) -> Optional[plt.Figure]:
"""
Visualize the decomposition results of the input signal
:param signal: The input 1D signal of numpy ndarray
:param trend: The trend of the input 1D signal decomposed by `Moving_Decomp`
:param seasonality: The seasonality of the input 1D signal decomposed by `Moving_Decomp`
:param colors: The colors for plotting signal, trend and seasonality
:return: the figure of matplotlib for plotting
"""
# Determine the dimensionality of the input data by its shape
if colors is None:
colors = ["royalblue", "royalblue", "royalblue"]
shape = signal.shape
# If the inputs is univariate signal
if len(shape) == 1:
# Creating a drawing object
fig, ax = plt.subplots(nrows=3, ncols=1, figsize=(10, 5), sharex=True)
# Start drawing the image
ax[0].plot(signal, color=colors[0])
ax[1].plot(trend, color=colors[1])
ax[2].plot(seasonality, color=colors[2])
ax[0].set_ylabel("input signal")
ax[1].set_ylabel("trend")
ax[2].set_ylabel("seasonality")
elif len(shape) == 2:
# Get the number of input signals
n_vars, seq_len = shape
# Creating a drawing object
fig, ax = plt.subplots(
nrows=3, ncols=n_vars, figsize=(4 * n_vars, 5), sharex=True
)
# Iterate over all dimensions of variables and draw images
for n in range(n_vars):
ax[0, n].plot(signal[n, :], color=colors[n])
ax[1, n].plot(trend[n, :], color=colors[n])
ax[2, n].plot(seasonality[n, :], color=colors[n])
ax[0, n].set_title(f"Input channels {n}")
ax[0, 0].set_ylabel("input signal")
ax[1, 0].set_ylabel("trend")
ax[2, 0].set_ylabel("seasonality")
else:
raise ValueError(
"The input must be 1D univariate or multivariate signal with shape [seq_len] or [n_vars, seq_len]"
)
return fig
if __name__ == "__main__":
from pysdkit.data import generate_time_series
# univariate time series
time_series = generate_time_series()
moving_decomp = Moving_Decomp(window_size=5)
trends, seasonalities = moving_decomp.fit_transform(signal=time_series)
moving_decomp.plot_decomposition(
signal=time_series, trend=trends, seasonality=seasonalities
)
plt.show()
# multivariate time series
time_series = np.vstack([time_series] * 3)
trends, seasonalities = moving_decomp.fit_transform(signal=time_series)
moving_decomp.plot_decomposition(
signal=time_series, trend=trends, seasonality=seasonalities
)
plt.show()