Quantile binning with missing data
Posted on 2021-10-03
import numpy as np
from numba import njit
def _find_binning_thresholds(data, max_bins=256, subsample=int(2e5)):
    """Compute, for every column of ``data``, the thresholds separating bins.

    Non-finite entries (NaN and +/-inf) of a column are discarded first. If
    the remaining values contain at most ``max_bins`` distinct values, the
    thresholds are the midpoints between consecutive distinct values.
    Otherwise they are the ``max_bins - 1`` evenly spaced interior
    percentiles of the column, computed with the "midpoint" rule.

    Parameters
    ----------
    data : ndarray
        Two-dimensional array; columns are indexed as ``data[:, j]``.
    max_bins : int, default=256
        Maximum number of bins per column; must lie in ``[2, 256]``.
    subsample : int, default=200000
        Accepted for interface compatibility; this function does not
        currently use it.

    Returns
    -------
    list of ndarray
        One sorted float64 array of unique thresholds per column. The array
        is empty when a column has fewer than two distinct finite values.

    Raises
    ------
    ValueError
        If ``max_bins`` is outside ``[2, 256]``.
    """
    if not (2 <= max_bins <= 256):
        raise ValueError(
            "max_bins={} should be no smaller than 2 "
            "and no larger than 256.".format(max_bins)
        )

    # max_bins + 1 evenly spaced points; dropping 0 and 100 leaves the
    # max_bins - 1 interior cut points.
    percentiles = np.linspace(0, 100, num=max_bins + 1)[1:-1]

    binning_thresholds = []
    for f_idx in range(data.shape[1]):
        col_data = np.ascontiguousarray(data[:, f_idx], dtype=np.float64)
        col_data = col_data[np.isfinite(col_data)]
        distinct_values = np.unique(col_data)

        if len(distinct_values) <= max_bins:
            # Few distinct values: cut halfway between each neighbouring pair.
            midpoints = (distinct_values[:-1] + distinct_values[1:]) * 0.5
        else:
            try:
                midpoints = np.percentile(
                    col_data, percentiles, method="midpoint"
                )
            except TypeError:
                # NumPy releases older than 1.22 only know the former
                # keyword name ``interpolation``.
                midpoints = np.percentile(
                    col_data, percentiles, interpolation="midpoint"
                )
            midpoints = midpoints.astype(np.float64)

        # Several percentiles can coincide; keep each threshold once.
        binning_thresholds.append(np.unique(midpoints))
    return binning_thresholds
@njit()
def _map_num_col_to_bins(data, binning_thresholds, binned):
    """Fill ``binned`` with the bin index of each entry of one column.

    For every ``data[row]`` a binary search finds the smallest index ``k``
    such that ``data[row] <= binning_thresholds[k]``; if no threshold
    satisfies this, ``k`` is the number of thresholds. The result is written
    to ``binned[row]``; nothing is returned.

    The search assumes ``binning_thresholds`` is sorted in ascending order.
    A NaN value never satisfies the ``<=`` comparison, so it ends up with
    the index equal to the number of thresholds.
    """
    n_thresholds = binning_thresholds.shape[0]
    for row in range(data.shape[0]):
        value = data[row]
        lo = 0
        hi = n_thresholds
        # Invariant: the answer always lies in the closed range [lo, hi].
        while lo < hi:
            mid = (lo + hi - 1) // 2
            if value <= binning_thresholds[mid]:
                hi = mid
            else:
                lo = mid + 1
        binned[row] = lo
def _map_to_bins(data, binning_thresholds, binned):
    """Bin numerical values to discrete integer-coded levels.

    Each column ``j`` of ``data`` is binned with ``binning_thresholds[j]``
    and the resulting indices are written in place into column ``j`` of
    ``binned``. Nothing is returned.
    """
    n_features = data.shape[1]
    for col in range(n_features):
        column_values = data[:, col]
        column_thresholds = binning_thresholds[col]
        column_bins = binned[:, col]
        _map_num_col_to_bins(column_values, column_thresholds, column_bins)
def _assign_nan_to_bin(binned, X, actual_n_bins, assign_nan_to_unique_bin=False):
    """Overwrite the entries of ``binned`` whose source value in ``X`` is NaN.

    For each column ``j`` of ``X``, rows where ``X`` is NaN receive
    ``actual_n_bins[j]`` when ``assign_nan_to_unique_bin`` is true, and NaN
    otherwise. ``binned`` is modified in place and also returned.
    """
    nan_rows = np.isnan(X)
    for col in range(X.shape[1]):
        if assign_nan_to_unique_bin:
            fill_value = actual_n_bins[col]
        else:
            fill_value = np.nan
        binned[nan_rows[:, col], col] = fill_value
    return binned
class QuantileBinning:
    """Discretise numerical columns into bins, with optional NaN handling.

    ``fit`` learns one array of thresholds per column; ``transform`` replaces
    each value by the index of the bin it falls into.
    """

    def __init__(self):
        # Per-column threshold arrays; filled in by ``fit``.
        self.bin_thresholds = []
        # Per-column bin counts (number of thresholds + 1); filled in by ``fit``.
        self.n_bins = []

    def fit(self, X):
        """Learn the bin thresholds of every column of ``X``.

        Parameters
        ----------
        X : ndarray
            Two-dimensional array of values to derive thresholds from.

        Returns
        -------
        QuantileBinning
            The fitted instance itself, so calls can be chained.
        """
        self.bin_thresholds = _find_binning_thresholds(X)
        # k thresholds split the axis into k + 1 bins.
        self.n_bins = np.array(
            [thresholds.shape[0] + 1 for thresholds in self.bin_thresholds],
            dtype=np.uint32,
        )
        return self

    def transform(self, X, assign_nan_to_unique_bin=False):
        """Map every value of ``X`` to its bin index.

        Parameters
        ----------
        X : ndarray
            Two-dimensional array to bin, using the thresholds stored by
            ``fit``.
        assign_nan_to_unique_bin : bool, default=False
            If true, NaN entries of ``X`` receive the value ``n_bins[j]`` of
            their column ``j``; otherwise they are NaN in the output.

        Returns
        -------
        ndarray
            Float32, Fortran-ordered array shaped like ``X`` holding the bin
            indices.
        """
        binned = np.zeros_like(X, dtype=np.float32, order="F")
        _map_to_bins(X, self.bin_thresholds, binned)
        # NaN inputs are overwritten after the regular binning pass.
        binned = _assign_nan_to_bin(
            binned, X, self.n_bins, assign_nan_to_unique_bin
        )
        return binned