Source code for pysdkit.tsa._dtw

# -*- coding: utf-8 -*-
"""
Created on 2025/02/12 00:11:39
@author: Whenxuan Wang
@email: wwhenxuan@gmail.com
"""
from sys import maxsize as MAXSIZE
import numpy as np

from typing import Optional, Callable


[docs] def dtw_distance( ts_a: np.ndarray, ts_b: np.ndarray, d: Callable = lambda x, y: abs(x - y), max_warping_window: Optional[int] = 10000, ) -> np.ndarray: """ Returns the DTW similarity distance between two 2-D timeseries numpy arrays. :param ts_a: array of shape [n_samples, n_timepoints] :param ts_b: array of shape [n_samples, n_timepoints] Two arrays containing n_samples of timeseries data whose DTW distance between each sample of A and B will be compared :param d: DistanceMetric object (default = abs(x-y)) the distance measure used for A_i - B_j in the DTW dynamic programming function :param max_warping_window: int, optional (default = infinity) Maximum warping window allowed by the DTW dynamic programming function :return: DTW distance between A and B """ # Create cost matrix via broadcasting with large int ts_a, ts_b = np.array(ts_a), np.array(ts_b) M, N = len(ts_a), len(ts_b) cost = MAXSIZE * np.ones((M, N)) # Initialize the first row and column cost[0, 0] = d(ts_a[0], ts_b[0]) for i in range(1, M): cost[i, 0] = cost[i - 1, 0] + d(ts_a[i], ts_b[0]) for j in range(1, N): cost[0, j] = cost[0, j - 1] + d(ts_a[0], ts_b[j]) # Populate rest of cost matrix within window for i in range(1, M): for j in range(max(1, i - max_warping_window), min(N, i + max_warping_window)): choices = cost[i - 1, j - 1], cost[i, j - 1], cost[i - 1, j] cost[i, j] = np.min(choices) + d(ts_a[i], ts_b[j]) # Return DTW distance given window return cost[-1, -1]