"""
Types holding intermediate and final data for the algorithm.
"""
from __future__ import annotations
import logging
import typing as t
from collections import abc, Counter
from dataclasses import dataclass, field
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from eBoruta.base import _X, _Y, ValidationError
from eBoruta.dataprep import prepare_x, prepare_y, prepare_w, has_missing
from eBoruta.utils import get_duplicates
LOGGER = logging.getLogger(__name__)
@dataclass(frozen=True)
class TrialData(t.Generic[_Y]):
    """
    Immutable bundle of the train/test folds used by a single Boruta trial.
    """

    #: Feature tables of the train and test folds.
    x_train: pd.DataFrame
    x_test: pd.DataFrame
    #: Targets of the train and test folds.
    y_train: _Y
    y_test: _Y
    #: Weights for train and test folds
    w_train: t.Optional[np.ndarray] = None
    w_test: t.Optional[np.ndarray] = None

    @property
    def shapes(self) -> str:
        """
        Human-readable summary of the fold sizes.

        :return: a string with shapes of x and y attributes.
        """
        labelled = (
            ("x_train", self.x_train),
            ("y_train", self.y_train),
            ("x_test", self.x_test),
            ("y_test", self.y_test),
        )
        return ", ".join(f"{label}: {fold.shape}" for label, fold in labelled)
# @dataclass
class Dataset(t.Generic[_X, _Y]):
    """
    A container holding permanent data (x, y and weights) for
    training/validation/testing/etc.
    """

    def __init__(self, x: t.Any, y: t.Any, w: t.Any = None, min_features: int = 5):
        """
        :param x: Explanatory variables; converted by :func:`prepare_x`.
        :param y: Target variable(s); converted by :func:`prepare_y`.
        :param w: Optional sample weights; converted by :func:`prepare_w`.
        :param min_features: The minimum number of shadow features produced
            by :meth:`generate_trial_sample`.
        :raises ValidationError: If ``y`` has missing values or the number
            of observations in ``y`` or ``w`` differs from ``x``.
        """
        self._x, self._y, self._w = prepare_x(x), prepare_y(y), prepare_w(w)
        self.min_features = min_features

        if has_missing(self.y):
            raise ValidationError("Missing values in y")
        if len(self.x) != len(self.y):
            raise ValidationError(
                f"The number of observations in x {len(self.x)} "
                f"does not match the number in y {len(self.y)}"
            )
        if self.w is not None and len(self.x) != len(self.w):
            raise ValidationError(
                f"The number of observations in x {len(self.x)} "
                f"does not match the number in w {len(self.w)}"
            )

    @property
    def x(self) -> pd.DataFrame:
        """
        :return: Variables' dataframe.
        """
        return self._x

    @property
    def y(self) -> np.ndarray:
        """
        :return: Target variables' array.
        """
        return self._y

    @property
    def w(self) -> np.ndarray | None:
        """
        :return: Sample weights' array.
        """
        return self._w

    def generate_trial_sample(
        self, columns: None | list[str] | np.ndarray = None, **kwargs
    ) -> TrialData:
        """
        Generates data for a single Boruta trial based on :attr:`x`, :attr:`y`,
        and :attr:`w`. Creates a copy of :attr:`x`, permutes rows, and renames
        columns as "shadow_{original_name}". Concatenates original dataframe
        and the one with the shadow features to create a copy of the learning
        data with at least twice as many features.

        If the number of features in :attr:`x` after selecting by ``columns``
        is below :attr:`min_features`, shadow features are randomly oversampled
        (with replacement, rows permuted again, and named
        "shadow_{original_name}_{k}") so that the shadow set has
        :attr:`min_features` columns.

        Rows of the returned frames are indexed positionally (``0..n-1``).

        :param columns: An optional list or array of columns to select from
            :attr:`x`.
        :param kwargs: Keyword args passed to :func:`train_test_split` used to
            create train/test splits. Enable this feature by passing
            ``test_size={f}`` where ``f`` is the test size fraction.
            This allows using different datasets for training and importance
            computation. Without ``test_size`` no split is made and the other
            keyword args are ignored.
        :return: A prepared trial data.
        :raises RuntimeError: If resulting features have duplicate names.
        """
        if columns is None:
            columns = list(self.x.columns)
        if not isinstance(columns, list):
            columns = list(columns)

        # The shadow frames below carry a fresh positional index. Concatenation
        # along axis=1 aligns on index labels, so the real features must be
        # positional as well; otherwise a non-default index on `x` would yield
        # misaligned, NaN-padded rows. No-op when the index is already 0..n-1.
        x_init = self.x[columns].reset_index(drop=True)
        LOGGER.debug(f"Using columns {columns} as features")

        x_shadow = (
            x_init.sample(frac=1)
            .reset_index(drop=True)
            .rename(columns={c: f"shadow_{c}" for c in columns})
        )

        if self.min_features is not None:
            n_add_samples = self.min_features - len(columns)
            if n_add_samples > 0:
                sampled = (
                    x_shadow.sample(n=n_add_samples, axis=1, replace=True)
                    .sample(frac=1)
                    .reset_index(drop=True)
                )
                # Sampling with replacement may pick a column several times:
                # suffix every pick with its occurrence number so the names
                # are unique and differ from the plain shadow columns.
                occurrences: Counter = Counter()
                unique_names = []
                for name in sampled.columns:
                    unique_names.append(f"{name}_{occurrences[name]}")
                    occurrences[name] += 1
                sampled.columns = unique_names

                x_shadow = pd.concat([x_shadow, sampled], axis=1)
                LOGGER.debug(
                    f"Added {sampled.shape[1]} columns to reach "
                    f"the min number of features {self.min_features}"
                )
        LOGGER.debug(
            f"Created a dataset of shadow features with shape {x_shadow.shape}"
        )

        x = pd.concat([x_init, x_shadow], axis=1)
        duplicates = list(get_duplicates(x.columns))
        if duplicates:
            raise RuntimeError(f"Features contain duplicate names {duplicates}")
        LOGGER.debug(
            f"Merged with initial dataset to get a dataset with shape {x.shape}"
        )

        y = self.y.copy()
        w = None if self.w is None else self.w.copy()

        # No split requested: train and test folds are the same objects.
        if kwargs.get("test_size") is None:
            return TrialData(x, x, y, y, w, w)

        # `train_test_split` returns a (train, test) pair per input array, in
        # input order, which matches the field order of `TrialData`.
        arrays = (x, y) if w is None else (x, y, w)
        return TrialData(*train_test_split(*arrays, **kwargs))
[docs]
@dataclass
class Features:
# TODO: consider adding slicing on steps and selecting columns pandas-like
"""
A dynamic container representing a set of features used by Boruta
throughout the run.
It's created internally and maintained by :class:`eBoruta.algorithm.eBoruta`.
"""
#: An array of feature names.
names: np.ndarray
accepted_mask: np.ndarray = field(init=False)
rejected_mask: np.ndarray = field(init=False)
tentative_mask: np.ndarray = field(init=False)
hit_history: pd.DataFrame = field(init=False)
imp_history: pd.DataFrame = field(init=False)
dec_history: pd.DataFrame = field(init=False)
_history: pd.DataFrame | None = None
def __post_init__(self):
n = len(self.names)
self.accepted_mask, self.rejected_mask = np.zeros(n).astype(bool), np.zeros(
n
).astype(bool)
self.tentative_mask = np.ones(n).astype(bool)
self.hit_history = pd.DataFrame(columns=self.names)
self.imp_history = pd.DataFrame(columns=self.names)
self.dec_history = pd.DataFrame(columns=self.names)
def __len__(self) -> int:
return len(self.hit_history)
def __getitem__(self, item: t.Any) -> t.Self:
def get_selectors() -> tuple[int | slice, list]:
match item:
case [int() | slice(), abc.Sequence()]:
if len(item) != 2:
raise IndexError('Too many indexing items')
return item
case slice():
return item, list(self.names)
case list():
return slice(1, len(self.names)), item
case _:
raise IndexError('Unsupported idx type')
steps, cols = get_selectors()
if isinstance(steps, int):
steps = [steps]
new = Features(np.intersect1d(self.names, cols))
new.hit_history = self.hit_history.iloc[steps][cols].copy()
new.imp_history = self.imp_history.iloc[steps][cols + ['Threshold']].copy()
new.dec_history = self.dec_history.iloc[steps][cols].copy()
decisions = new.dec_history.iloc[-1]
new.accepted_mask = decisions == 1
new.rejected_mask = decisions == -1
new.tentative_mask = decisions == 0
return new
@property
def shape(self) -> tuple[int, int]:
"""
:return: (# steps, # features)
"""
return len(self), len(self.names)
@property
def accepted(self) -> np.ndarray:
"""
return: An array of feature names marked as accepted.
"""
return self.names[self.accepted_mask]
@property
def rejected(self) -> np.ndarray:
"""
:return: An array of feature names marked as rejected.
"""
return self.names[self.rejected_mask]
@property
def tentative(self) -> np.ndarray:
"""
:return: An array of feature names marked as tentative.
"""
return self.names[self.tentative_mask]
@property
def history(self) -> pd.DataFrame:
"""
:return: A history dataframe created using :meth:`compose_summary` if
it doesn't exist.
"""
if self._history is None:
self._history = self.compose_history()
return self._history
[docs]
def accepted_at_step(self, step: int) -> np.ndarray:
"""
:param step: Step (trial) number.
:return: Feature names accepted at `step`.
"""
df = self.history
return df[(df.Step == step) & (df.Decision == 'Accepted')]['Feature'].values
[docs]
def compose_history(self) -> pd.DataFrame:
"""
Access the selection history and compose a summary table.
:return: A history dataframe.
"""
if self._history is not None:
LOGGER.warning(
f"Overwriting existing history with shape {self._history.shape}"
)
self.reset_history_index()
imp = self.melt_history(
self.imp_history.drop(columns="Threshold"), "Importance"
)
hit = self.melt_history(self.hit_history, "Hit")
dec = self.melt_history(self.dec_history, "Decision")
threshold = self.imp_history.reset_index().rename(columns={"index": "Step"})[
["Step", "Threshold"]
]
_steps = imp["Step"].values
_feature = imp["Feature"].values
df = pd.concat(
(_x.drop(columns=["Step", "Feature"]) for _x in [imp, hit, dec]), axis=1
)
df["Step"] = _steps
df["Feature"] = _feature
df = df[["Feature", "Step", "Importance", "Hit", "Decision"]].merge(
threshold, on="Step", how="left"
)
df["Decision"] = df["Decision"].map(
{0: "Tentative", -1: "Rejected", 1: "Accepted"}
)
df["Step"] += 1
return df
[docs]
@staticmethod
def melt_history(df: pd.DataFrame, value_name: str) -> pd.DataFrame:
df = df.copy()
columns = df.columns
df["Step"] = np.arange(len(df), dtype=int)
df = df.melt(
id_vars="Step",
value_vars=columns,
var_name="Feature",
value_name=value_name,
)
return df
[docs]
def reset_history_index(self) -> None:
"""
Bulk-:meth:`pd.DataFrame.reset_index`. of importance, decision and
hit history dataframes.
"""
for df in [self.imp_history, self.dec_history, self.hit_history]:
df.reset_index(drop=True, inplace=True)
# Executing this file directly raises immediately; presumably the module is
# meant to be imported only (it defines data containers and no entry point).
if __name__ == "__main__":
    raise RuntimeError