Asif Rahman

Feature engineering for time series data using Numba

Feature engineering over a multivariate time series with missing data.

Given a sequence of measurements values: np.ndarray and observation times times: np.ndarray, we want to engineer features for a machine learning model that captures the temporal trends and statistics over different temporal windows. Our data is irregularly sampled and have missing data.

Features representing the magnitude, dispersion, direction of change, and temporal trends are derived. Every time point is represented with 4 categories of features:

The engineered features include:

  1. dt: time elapsed since the measurement was made
  2. val: most recent measurement, measurements are forward filled up to a maximum duration after
  3. srng: range as \(\frac{x_{max}-x_{min}}{x_{max}+x_{min}}100\) over a short window
  4. ssgn: sign of the change (-1 or +1) between the first and last measurement in a short window, windows without measurements are filled with 0
  5. lrng: range as (max-min)/(max+min) * 100 over a long window
  6. lsgn: sign of the change (-1 or +1) between the first and last measurement in a long window, windows without measurements are filled with 0
  7. sema: slow exponential moving average, calculated after forward filling
  8. fema: fast exponential moving average, calculated after forward filling

We also need to define the following settings for every variable:

  1. forward_fill_duration: duration in minutes to forward fill a missing variable
  2. short_rolling_window_size: short rolling window size in minutes
  3. long_rolling_window_size: long rolling window size in minutes
  4. slow_ema_tau: decay rate for slow exponential moving average in minutes
  5. fast_ema_tau: decay rate for fast exponential moving average in minutes

Some derived features have no missing data, including dt, ssgn, srng, lsgn, and lrng. Other variables like val, sema, fema have missing values encoded as NaN. Missing data is forward filled up to forward_fill_duration and all missing values after the forward filling time are NaN.

from numba import njit, prange

@njit()
def ema(times, values, tau):
    """Exponential moving average for irregularly sampled time series.
    Units for ``times`` and ``tau`` should be in hours.

    Args:
        times (np.ndarray): 1D array of measurement times in hours
        values (np.ndarray): 1D array of measurements
        tau (float): time decay in hours
    """
    n = len(values)
    ret = np.empty(n, dtype=np.float64) * np.nan
    ret[0] = values[0]
    last_i = 0
    for i in range(1, n):
        if np.isnan(times[i]) | np.isnan(values[i]):
            continue
        alpha = (times[i] - times[last_i]) / tau
        w = np.exp(-alpha)
        if alpha > 1e-6:
            w2 = (1 - w) / alpha
        else:
            # use Taylor expansion for numerical stability
            w2 = 1 - (alpha / 2) + (alpha * alpha / 6) - (alpha * alpha * alpha / 24)
        ret[i] = (ret[last_i] * w) + (values[i] * (1 - w2)) + (values[last_i] * (w2 - w))
        last_i = i
    return ret


@njit()
def ffill(times, values):
    """Forward fill an array of values and times.
    Times indicate the time of last observation.
    
    Args:
        arr (np.ndarray): array of values with nan
        times (np.ndarray): array of times

    Returns:
        2D array with forward filled times (index 0)
        and values (index 1).
    """
    out_values = values.copy()
    out_times = times.copy()
    n = values.shape[0]
    out = np.zeros((n, 2), dtype=np.float64)
    lastval = np.nan
    lasttime = np.nan
    for row_idx in range(n):
        if np.isfinite(values[row_idx]):
            lastval = values[row_idx]
            lasttime = times[row_idx]
        out_values[row_idx] = lastval
        out_times[row_idx] = lasttime
    out[:, 0] = out_times
    out[:, 1] = out_values
    return out


@njit()
def sliding_windows(times, window_size):
    tdiff = times - times.reshape(-1, 1)
    twindows = (tdiff >= 0) & (tdiff <= window_size)
    return twindows


@njit()
def feature_engineering(
    times,
    values,
    forward_fill_duration,
    short_rolling_window_size,
    long_rolling_window_size,
    slow_ema_tau,
    fast_ema_tau,
):
    """Feature engineering.
    
    1. Age of the observation
    2. Most recent forward-filled measurement
    3. Dispersion as (max-min)/(max+min) over short window
    4. Sign of change between first and last value in short window
    5. Dispersion as (max-min)/(max+min) over long window
    6. Sign of change between first and last value in long window
    7. slow EMA
    8. fast EMA

    Forward filling replaces nan values up to a maximum duration given by
    `forward_fill_duration`. Any remaining missing values should be imputed
    with the median or by sampling from a standard reference range.

    Args:
        values (np.ndarry): 2D array of observations
        times (np.ndarry): 1D array of observation times
        forward_fill_duration (float): duration to forward fill missing values
        short_rolling_window_size (float): observation duration for rolling 
            statistics with a short lookback window
        long_rolling_window_size (float): observation duration for rolling 
            statistics with a long lookback window
        slow_ema_tau (float): slow decay rate weights more of the past
        fast_ema_tau (float): fast decay rate weights more of the present

    Returns:
        2D matrix with shape [n_samples, 8], where the features are:
        suffixes = ['dt', 'val', 'short_rng', 'short_sgn', 'long_rng', 'long_sgn', 'slow_ema', 'fast_ema']
    """
    DT_IX = 0
    VAL_IX = 1
    SRNG_IX = 2
    SSGN_IX = 3
    LRNG_IX = 4
    LSGN_IX = 5
    SEMA_IX = 6
    FEMA_IX = 7
    
    n = values.shape[0]
    derived = np.zeros((n, 8)) * np.nan

    missing = np.isnan(values)

    # return a median imputed array if there are no observations
    if np.all(missing):
        derived[:, SSGN_IX] = 0.
        derived[:, LSGN_IX] = 0.
        derived[:, SRNG_IX] = 0.
        derived[:, LRNG_IX] = 0.
        return derived

    # indicate samples within a fixed window for each sample
    short_windows = sliding_windows(times, short_rolling_window_size)
    short_windows = short_windows & ~missing.reshape(-1,1)
    long_windows = sliding_windows(times, long_rolling_window_size)
    long_windows = long_windows & ~missing.reshape(-1,1)

    # forward fill
    xfill = ffill(times, values)
    dt = times - xfill[:,0]

    # 0. Age of the measurement
    derived[:, DT_IX] = dt

    # 1. last measured value
    derived[:, VAL_IX] = xfill[:, 1]
    expired = ~np.isnan(dt)
    expired[expired] = dt[expired] >= forward_fill_duration
    derived[expired, VAL_IX] = np.nan

    # rolling statistics
    for t in range(n):
        # samples in this window
        short_inds = short_windows[:, t]
        long_inds = long_windows[:, t]
        short_vals = values[short_inds]
        long_vals = values[long_inds]

        if short_vals.size > 0:
            # 2. rolling dispersion
            high = np.max(short_vals)
            low = np.min(short_vals)
            total = high + low
            if total > 1e-6:
                derived[t, SRNG_IX] = (high - low) / (total + 1e-6) * 100
            else:
                derived[t, SRNG_IX] = 0.

            # 3. sign of change between last and first value
            if short_vals.size >= 2:
                derived[t, SSGN_IX] = np.sign(short_vals[-1] - short_vals[0])
            else:
                derived[t, SSGN_IX] = 0.
        else:
            derived[t,SRNG_IX] = 0.
            derived[t,SSGN_IX] = 0.

        if long_vals.size > 0:
            # 4. rolling dispersion
            high = np.max(long_vals)
            low = np.min(long_vals)
            total = high + low
            if total > 1e-6:
                derived[t, LRNG_IX] = (high - low) / (total + 1e-6) * 100
            else:
                derived[t, LRNG_IX] = 0.

            # 5. sign of change between last and first value
            if long_vals.size >= 2:
                derived[t, LSGN_IX] = np.sign(long_vals[-1] - long_vals[0])
            else:
                derived[t, LSGN_IX] = 0.
        else:
            derived[t, LRNG_IX] = 0.
            derived[t, LSGN_IX] = 0.

    # exponential moving average using the forward filled data
    ema_slow = np.empty(n, dtype=np.float64) * np.nan
    ema_fast = np.empty(n, dtype=np.float64) * np.nan
    m = np.isfinite(derived[:, 1])
    vals = derived[m, 1]
    if vals.size > 0:
        ema_slow = ema(times[m], derived[m, 1], slow_ema_tau)
        ema_fast = ema(times[m], derived[m, 1], fast_ema_tau)
        # 6. slow EMA
        derived[m, SEMA_IX] = ema_slow
        # 7. fast EMA
        derived[m, FEMA_IX] = ema_fast

    return derived