Asif Rahman

Quantile binning with missing data

2021-10-03

This uses numpy and numba for fast binning of numerical data to quantiles. It also supports missing data.

import numpy as np
from numba import njit

def _find_binning_thresholds(data, max_bins=256, subsample=int(2e5)):
    if not (2 <= max_bins <= 256):
        raise ValueError(
            "max_bins={} should be no smaller than 2 "
            "and no larger than 256.".format(max_bins)
        )

    percentiles = np.linspace(0, 100, num=max_bins + 1)
    percentiles = percentiles[1:-1]
    binning_thresholds = []
    for f_idx in range(data.shape[1]):
        col_data = np.ascontiguousarray(data[:, f_idx], dtype=np.float64)
        mask = np.isfinite(col_data)
        col_data = col_data[mask]
        distinct_values = np.unique(col_data)
        if len(distinct_values) <= max_bins:
            midpoints = distinct_values[:-1] + distinct_values[1:]
            midpoints *= 0.5
        else:
            midpoints = np.percentile(
                col_data, percentiles, interpolation="midpoint"
            ).astype(np.float64)
        binning_thresholds.append(np.unique(midpoints))
    return binning_thresholds


@njit()
def _map_num_col_to_bins(data, binning_thresholds, binned):
    for i in range(data.shape[0]):
        left, right = 0, binning_thresholds.shape[0]
        while left < right:
            middle = (right + left - 1) // 2
            if data[i] <= binning_thresholds[middle]:
                right = middle
            else:
                left = middle + 1
        binned[i] = left


def _map_to_bins(data, binning_thresholds, binned):
    """Bin numerical values to discrete integer-coded levels."""
    for feature_idx in range(data.shape[1]):
        _map_num_col_to_bins(
            data[:, feature_idx],
            binning_thresholds[feature_idx],
            binned[:, feature_idx],
        )


def _assign_nan_to_bin(binned, X, actual_n_bins, assign_nan_to_unique_bin=False):
    mask = np.isnan(X)
    for i in range(X.shape[1]):
        binned[mask[:, i], i] = actual_n_bins[i] if assign_nan_to_unique_bin else np.nan
    return binned


class QuantileBinning():
    def __init__(self):
        self.bin_thresholds = []
        self.n_bins = []

    def fit(self, X):
        self.bin_thresholds = _find_binning_thresholds(X)
        self.n_bins = np.array(
            [thresholds.shape[0] + 1 for thresholds in self.bin_thresholds], dtype=np.uint32
        )

    def transform(self, X, assign_nan_to_unique_bin=False):
        binned = np.zeros_like(X, dtype=np.float32, order="F")
        _map_to_bins(X, self.bin_thresholds, binned)
        binned = _assign_nan_to_bin(binned, X, self.n_bins, assign_nan_to_unique_bin)
        return binned