Asif Rahman

Quantile binning with missing data

This uses NumPy and Numba to quickly bin numerical data into quantile-based bins. Missing values (NaN) are also supported.

import numpy as np
from numba import njit

def _midpoint_percentile(values, percentiles):
    """Return midpoint-interpolated percentiles of ``values`` as float64.

    NumPy 1.22 renamed the ``interpolation`` keyword of ``np.percentile`` to
    ``method``; the old spelling is deprecated.  Try the current keyword
    first and fall back to the legacy one for older NumPy releases.
    """
    try:
        result = np.percentile(values, percentiles, method="midpoint")
    except TypeError:
        # Older NumPy does not know ``method`` and rejects it as an
        # unexpected keyword argument.
        result = np.percentile(values, percentiles, interpolation="midpoint")
    return result.astype(np.float64)


def _find_binning_thresholds(data, max_bins=256, subsample=int(2e5)):
    """Compute the bin thresholds of every column of ``data``.

    Non-finite entries (NaN and +/-inf) are ignored when deriving the
    thresholds.  For a column with at most ``max_bins`` distinct finite
    values, the thresholds are the midpoints between consecutive distinct
    values.  Otherwise they are the ``max_bins - 1`` interior percentiles of
    the column, computed with midpoint interpolation.  In both cases
    duplicate thresholds are removed.

    Parameters
    ----------
    data : 2-D array
        Numerical data indexed as ``data[:, feature]``.
    max_bins : int, default=256
        Maximum number of bins per feature; must satisfy
        ``2 <= max_bins <= 256``.
    subsample : int, default=200000
        Accepted for interface compatibility; it is currently not used and
        all rows always contribute to the thresholds.

    Returns
    -------
    list of 1-D float64 arrays
        One sorted array of unique thresholds per column.  The array is
        empty when a column has fewer than two distinct finite values.

    Raises
    ------
    ValueError
        If ``max_bins`` is outside ``[2, 256]``.
    """
    if not (2 <= max_bins <= 256):
        raise ValueError(
            "max_bins={} should be no smaller than 2 "
            "and no larger than 256.".format(max_bins)
        )

    # Keep only the interior percentiles: the 0th and 100th are the column
    # minimum and maximum and would not separate any values.
    percentiles = np.linspace(0, 100, num=max_bins + 1)[1:-1]

    binning_thresholds = []
    for f_idx in range(data.shape[1]):
        col_data = np.ascontiguousarray(data[:, f_idx], dtype=np.float64)
        # Missing and infinite entries must not influence the thresholds.
        col_data = col_data[np.isfinite(col_data)]
        distinct_values = np.unique(col_data)
        if len(distinct_values) <= max_bins:
            # Few enough distinct values: one bin per value, split halfway
            # between neighbours.
            midpoints = distinct_values[:-1] + distinct_values[1:]
            midpoints *= 0.5
        else:
            midpoints = _midpoint_percentile(col_data, percentiles)
        # Percentiles of heavily repeated values can coincide; np.unique
        # drops the duplicates and guarantees ascending order.
        binning_thresholds.append(np.unique(midpoints))
    return binning_thresholds


@njit()
def _map_num_col_to_bins(data, binning_thresholds, binned):
    """Store the bin index of each entry of ``data`` in ``binned``.

    A binary search over ``binning_thresholds`` (assumed to be sorted in
    ascending order) locates the smallest index ``k`` such that
    ``data[i] <= binning_thresholds[k]``; that index is the bin.  A value
    larger than every threshold is given ``binning_thresholds.shape[0]``.
    NaN fails every ``<=`` comparison, so it also ends up with that last
    index.

    ``binned`` is written in place; nothing is returned.
    """
    n_thresholds = binning_thresholds.shape[0]
    for row in range(data.shape[0]):
        value = data[row]
        lo = 0
        hi = n_thresholds
        while lo < hi:
            # Lower midpoint, so the search window always shrinks.
            mid = (lo + hi - 1) // 2
            if value <= binning_thresholds[mid]:
                hi = mid
            else:
                lo = mid + 1
        binned[row] = lo


def _map_to_bins(data, binning_thresholds, binned):
    """Bin every column of ``data`` into integer-coded levels.

    Column ``j`` of ``data`` is binned with ``binning_thresholds[j]`` and the
    resulting bin indices are written into column ``j`` of ``binned``.
    ``binned`` is filled in place; nothing is returned.
    """
    n_features = data.shape[1]
    for col in range(n_features):
        column_values = data[:, col]
        column_thresholds = binning_thresholds[col]
        column_output = binned[:, col]
        _map_num_col_to_bins(column_values, column_thresholds, column_output)


def _assign_nan_to_bin(binned, X, actual_n_bins, assign_nan_to_unique_bin=False):
    """Overwrite the bins of missing entries and return ``binned``.

    Wherever ``X`` is NaN, the matching cell of ``binned`` is replaced: with
    ``actual_n_bins[column]`` when ``assign_nan_to_unique_bin`` is true,
    otherwise with NaN.  ``binned`` is modified in place and also returned.
    """
    nan_positions = np.isnan(X)
    for col in range(X.shape[1]):
        if assign_nan_to_unique_bin:
            fill_value = actual_n_bins[col]
        else:
            fill_value = np.nan
        missing_rows = nan_positions[:, col]
        binned[missing_rows, col] = fill_value
    return binned


class QuantileBinning:
    """Bin numerical features into integer-coded quantile bins.

    ``fit`` learns one array of thresholds per column; ``transform`` maps
    each value to the index of its bin and handles missing values (NaN)
    separately.
    """

    def __init__(self):
        # One 1-D array of thresholds per fitted column (empty until fit).
        self.bin_thresholds = []
        # Number of bins per fitted column: len(thresholds) + 1.
        self.n_bins = []

    def fit(self, X):
        """Learn the bin thresholds of every column of ``X``.

        Parameters
        ----------
        X : 2-D array
            Training data indexed as ``X[:, feature]``.

        Returns
        -------
        QuantileBinning
            The fitted instance itself, so calls can be chained.
        """
        self.bin_thresholds = _find_binning_thresholds(X)
        self.n_bins = np.array(
            [thresholds.shape[0] + 1 for thresholds in self.bin_thresholds],
            dtype=np.uint32,
        )
        return self

    def transform(self, X, assign_nan_to_unique_bin=False):
        """Replace every value of ``X`` by the index of its bin.

        Parameters
        ----------
        X : 2-D array
            Data to bin; column ``j`` is binned with the thresholds learned
            for column ``j`` during ``fit``.
        assign_nan_to_unique_bin : bool, default=False
            If true, NaN entries of column ``j`` receive the extra bin index
            ``n_bins[j]``; otherwise they stay NaN in the output.

        Returns
        -------
        2-D float32 array
            Array shaped like ``X`` (allocated in Fortran order) holding the
            bin indices.

        Raises
        ------
        ValueError
            If ``X`` has more columns than thresholds were fitted for, which
            includes calling ``transform`` before ``fit``.
        """
        n_fitted = len(self.bin_thresholds)
        # Fail early with a clear message instead of an IndexError raised
        # from inside the per-column binning loop.
        if X.shape[1] > n_fitted:
            raise ValueError(
                "X has {} feature(s) but thresholds are available for {}; "
                "call fit() on data with a matching number of features "
                "first.".format(X.shape[1], n_fitted)
            )
        binned = np.zeros_like(X, dtype=np.float32, order="F")
        _map_to_bins(X, self.bin_thresholds, binned)
        # The search above places NaN in the last regular bin; overwrite
        # those cells with the requested missing-value encoding.
        return _assign_nan_to_bin(
            binned, X, self.n_bins, assign_nan_to_unique_bin
        )