import numpy as np
from typing import Tuple, Any
from copy import deepcopy
from .base import DistributionEstimatorBase
[docs]
class SimpleStratifiedDistributionEstimator(DistributionEstimatorBase):
"""A class is for estimating the empirical distribution function and computing the Distributional parameters for CAR."""
[docs]
def fit(
self,
covariates: np.ndarray,
treatment_arms: np.ndarray,
outcomes: np.ndarray,
strata: np.ndarray,
) -> "DistributionEstimatorBase":
"""
Train the DistributionEstimatorBase.
Args:
covariates (np.ndarray): Pre-treatment covariates.
treatment_arms (np.ndarray): The index of the treatment arm.
outcomes (np.ndarray): Scalar-valued observed outcome.
Returns:
DistributionEstimatorBase: The fitted estimator.
"""
if covariates.shape[0] != treatment_arms.shape[0]:
raise ValueError("The shape of covariates and treatment_arm should be same")
if covariates.shape[0] != outcomes.shape[0]:
raise ValueError("The shape of covariates and outcome should be same")
self.covariates = covariates
self.treatment_arms = treatment_arms
self.outcomes = outcomes
self.strata = strata
return self
def _compute_cumulative_distribution(
self,
target_treatment_arm: int,
locations: np.ndarray,
covariates: np.ndarray,
treatment_arms: np.ndarray,
outcomes: np.array,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Compute the cumulative distribution values.
Args:
target_treatment_arm (int): The index of the treatment arm.
locations (np.ndarray): Scalar values to be used for computing the cumulative distribution.
covariates: (np.ndarray): An array of covariates variables in the observed data.
treatment_arm (np.ndarray): An array of treatment arms in the observed data.
outcomes (np.ndarray): An array of outcomes in the observed data
Returns:
Tuple of numpy arrays:
- np.ndarray: Unconditional cumulative distribution values.
- np.ndarray: Adjusted cumulative distribution for each observation.
- np.ndarray: Conditional cumulative distribution for each observation.
"""
n_records = outcomes.shape[0]
n_loc = locations.shape[0]
prediction = np.zeros((n_records, n_loc))
treatment_mask = treatment_arms == target_treatment_arm
strata = self.strata
s_list = np.unique(strata)
w_s = {}
for s in s_list:
s_mask = strata == s
w_s[s] = (s_mask & treatment_mask).sum() / s_mask.sum()
for i, outcome in enumerate(locations):
for j in range(n_records):
s = strata[j]
prediction[j, i] = (outcomes[j] <= outcome) / w_s[s] * treatment_mask[j]
unconditional_pred = {s: prediction[s == strata].mean(axis=0) for s in s_list}
conditional_prediction = np.array([unconditional_pred[s] for s in strata])
weights = np.array([w_s[s] for s in strata])[:, np.newaxis]
prediction = (
(outcomes[:, np.newaxis] <= locations) - conditional_prediction
) / weights * treatment_mask[:, np.newaxis] + conditional_prediction
return prediction.mean(axis=0), prediction, conditional_prediction
def _compute_interval_probability(
self,
target_treatment_arm: int,
locations: np.ndarray,
covariates: np.ndarray,
treatment_arms: np.ndarray,
outcomes: np.array,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Compute the interval probabilities.
Args:
target_treatment_arm (int): The index of the treatment arm.
locations (np.ndarray): Scalar values to be used for computing the interval probabilities.
covariates: (np.ndarray): An array of covariates variables in the observed data.
treatment_arm (np.ndarray): An array of treatment arms in the observed data.
outcomes (np.ndarray): An array of outcomes in the observed data
Returns:
Tuple of numpy arrays:
- np.ndarray: Estimated unconditional interval probabilities.
- np.ndarray: Adjusted for each observation.
- np.ndarray: Conditional for each observation.
"""
n_records = outcomes.shape[0]
n_loc = locations.shape[0]
prediction = np.zeros((n_records, n_loc))
treatment_mask = treatment_arms == target_treatment_arm
strata = self.strata
s_list = np.unique(strata)
w_s = {}
for s in s_list:
s_mask = strata == s
w_s[s] = (s_mask & treatment_mask).sum() / s_mask.sum()
for i, outcome in enumerate(locations):
for j in range(n_records):
s = strata[j]
prediction[j, i] = (outcomes[j] <= outcome) / w_s[s] * treatment_mask[j]
unconditional_pred = {s: prediction[s == strata].mean(axis=0) for s in s_list}
conditional_prediction = np.array([unconditional_pred[s] for s in strata])
weights = np.array([w_s[s] for s in strata])[:, np.newaxis]
prediction = (
(outcomes[:, np.newaxis] <= locations) - conditional_prediction
) / weights * treatment_mask[:, np.newaxis] + conditional_prediction
cdf = prediction.mean(axis=0)
return (
cdf[1:] - cdf[:-1],
prediction[:, 1:] - prediction[:, :-1],
conditional_prediction[:, 1:] - conditional_prediction[:, :-1],
)
[docs]
class AdjustedStratifiedDistributionEstimator(DistributionEstimatorBase):
"""A class is for estimating the adjusted distribution function and computing the Distributional parameters for CAR."""
def __init__(self, base_model: Any, folds=3, is_multi_task=False):
"""
Initializes the AdjustedDistributionEstimator.
Args:
base_model (scikit-learn estimator): The base model implementing used for conditional distribution function estimators. The model should implement fit(data, targets) and predict_proba(data).
folds (int): The number of folds for cross-fitting.
is_multi_task(bool): Whether to use multi-task learning. If True, your base model needs to support multi-task prediction (n_samples, n_features) -> (n_samples, n_targets).
Returns:
AdjustedDistributionEstimator: An instance of the estimator.
"""
if (not hasattr(base_model, "predict")) and (
not hasattr(base_model, "predict_proba")
):
raise ValueError(
"Base model should implement either predict_proba or predict"
)
self.base_model = base_model
self.folds = folds
self.is_multi_task = is_multi_task
super().__init__()
[docs]
def fit(
self,
covariates: np.ndarray,
treatment_arms: np.ndarray,
outcomes: np.ndarray,
strata: np.ndarray,
) -> "DistributionEstimatorBase":
"""
Train the DistributionEstimatorBase.
Args:
covariates (np.ndarray): Pre-treatment covariates.
treatment_arms (np.ndarray): The index of the treatment arm.
outcomes (np.ndarray): Scalar-valued observed outcome.
Returns:
DistributionEstimatorBase: The fitted estimator.
"""
if covariates.shape[0] != treatment_arms.shape[0]:
raise ValueError("The shape of covariates and treatment_arm should be same")
if covariates.shape[0] != outcomes.shape[0]:
raise ValueError("The shape of covariates and outcome should be same")
self.covariates = covariates
self.treatment_arms = treatment_arms
self.outcomes = outcomes
self.strata = strata
return self
def _compute_cumulative_distribution(
self,
target_treatment_arm: int,
locations: np.ndarray,
covariates: np.ndarray,
treatment_arms: np.ndarray,
outcomes: np.array,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Compute the cumulative distribution values.
Args:
target_treatment_arm (int): The index of the treatment arm.
locations (np.ndarray): Scalar values to be used for computing the cumulative distribution.
covariates: (np.ndarray): An array of covariates variables in the observed data.
treatment_arm (np.ndarray): An array of treatment arms in the observed data.
outcomes (np.ndarray): An array of outcomes in the observed data
Returns:
Tuple of numpy arrays:
- np.ndarray: Unconditional cumulative distribution values.
- np.ndarray: Adjusted cumulative distribution for each observation.
- np.ndarray: Conditional cumulative distribution for each observation.
"""
n_records = outcomes.shape[0]
n_loc = locations.shape[0]
superset_prediction = np.zeros((n_records, n_loc))
prediction = np.zeros((n_records, n_loc))
treatment_mask = treatment_arms == target_treatment_arm
folds = np.random.randint(self.folds, size=n_records)
strata = self.strata
s_list = np.unique(strata)
if self.is_multi_task:
binomial = (outcomes.reshape(-1, 1) <= locations) * 1 # (n_records, n_loc)
for fold in range(self.folds):
fold_mask = (folds != fold) & treatment_mask
for s in s_list:
s_mask = strata == s
weight = (s_mask & treatment_mask).sum() / s_mask.sum()
superset_mask = (folds == fold) & s_mask
subset_train_mask = (folds != fold) & s_mask & treatment_mask
covariates_train = covariates[subset_train_mask]
binomial_train = binomial[subset_train_mask]
if len(np.unique(binomial_train)) > 1:
self.model = deepcopy(self.base_model)
self.model.fit(covariates_train, binomial_train)
pred = self._compute_model_prediction(
self.model, covariates[superset_mask]
)
prediction[superset_mask] = (
pred
+ treatment_mask[superset_mask].reshape(-1, 1)
* (binomial[superset_mask] - pred)
/ weight
)
superset_prediction[superset_mask] = pred
else:
for i, location in enumerate(locations):
binomial = (outcomes <= location) * 1 # (n_records)
for fold in range(self.folds):
fold_mask = (folds != fold) & treatment_mask
covariates_train = covariates[fold_mask]
binomial_train = binomial[fold_mask]
# Pool the records across strata and train the model
if len(np.unique(binomial_train)) > 1:
self.model = deepcopy(self.base_model)
self.model.fit(covariates_train, binomial_train)
for s in s_list:
s_mask = strata == s
weight = (s_mask & treatment_mask).sum() / s_mask.sum()
superset_mask = (folds == fold) & s_mask
subset_train_mask = (folds != fold) & s_mask & treatment_mask
covariates_train = covariates[subset_train_mask]
binomial_train = binomial[subset_train_mask]
# TODO: revisit the logic here
if len(np.unique(binomial_train)) > 1:
# self.model = deepcopy(self.base_model)
# self.model.fit(covariates_train, binomial_train)
pass
else:
pred = binomial_train[0]
superset_prediction[superset_mask, i] = pred
prediction[superset_mask, i] = (
pred
+ treatment_mask[superset_mask]
* (binomial[superset_mask] - pred)
/ weight
)
continue
pred = self._compute_model_prediction(
self.model, covariates[superset_mask]
)
prediction[superset_mask, i] = (
pred
+ treatment_mask[superset_mask]
* (binomial[superset_mask] - pred)
/ weight
)
superset_prediction[superset_mask, i] = pred
return prediction.mean(axis=0), prediction, superset_prediction
def _compute_interval_probability(
self,
target_treatment_arm: int,
locations: np.ndarray,
covariates: np.ndarray,
treatment_arms: np.ndarray,
outcomes: np.array,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Compute the interval probabilities.
Args:
target_treatment_arm (int): The index of the treatment arm.
locations (np.ndarray): Scalar values to be used for computing the cumulative distribution.
covariates: (np.ndarray): An array of covariates variables in the observed data.
treatment_arm (np.ndarray): An array of treatment arms in the observed data.
outcomes (np.ndarray): An array of outcomes in the observed data
Returns:
Tuple of numpy arrays:
- np.ndarray: Unconditional interval probabilities.
- np.ndarray: Adjusted interval probabilities for each observation.
- np.ndarray: Conditional interval probabilities for each observation.
"""
n_records = outcomes.shape[0]
n_loc = locations.shape[0]
superset_prediction = np.zeros((n_records, n_loc - 1))
prediction = np.zeros((n_records, n_loc - 1))
treatment_mask = treatment_arms == target_treatment_arm
folds = np.random.randint(self.folds, size=n_records)
strata = self.strata
s_list = np.unique(strata)
binominals = (outcomes[:, np.newaxis] <= locations) * 1 # (n_records, n_loc)
for i in range(len(locations) - 1):
binomial = binominals[:, i + 1] - binominals[:, i]
for fold in range(self.folds):
fold_mask = (folds != fold) & treatment_mask
covariates_train = covariates[fold_mask]
binomial_train = binomial[fold_mask]
if len(np.unique(binomial_train)) > 1:
self.model = deepcopy(self.base_model)
self.model.fit(covariates_train, binomial_train)
for s in s_list:
s_mask = strata == s
weight = (s_mask & treatment_mask).sum() / s_mask.sum()
superset_mask = (folds == fold) & s_mask
subset_train_mask = (folds != fold) & s_mask & treatment_mask
covariates_train = covariates[subset_train_mask]
binomial_train = binomial[subset_train_mask]
if len(np.unique(binomial_train)) == 1:
pred = binomial_train[0]
superset_prediction[superset_mask, i] = pred
prediction[superset_mask, i] = (
pred
+ treatment_mask[superset_mask]
* (binomial[superset_mask] - pred)
/ weight
)
continue
pred = self._compute_model_prediction(
self.model, covariates[superset_mask]
)
prediction[superset_mask, i] = (
pred
+ treatment_mask[superset_mask]
* (binomial[superset_mask] - pred)
/ weight
)
superset_prediction[superset_mask, i] = pred
return prediction.mean(axis=0), prediction, superset_prediction
def _compute_model_prediction(self, model, covariates: np.ndarray) -> np.ndarray:
if hasattr(model, "predict_proba"):
if self.is_multi_task:
# suppose the shape of prediction is (n_records, n_locations)
return model.predict_proba(covariates)
probabilities = model.predict_proba(covariates)
if probabilities.ndim == 1:
# when the shape of prediction is (n_records)
return probabilities
# when the shape of prediction is (n_records, 2)
return probabilities[:, 1]
else:
return model.predict(covariates)