# Source code for dte_adj

import numpy as np
from typing import Tuple
from scipy.stats import norm
from copy import deepcopy
from abc import ABC
from .util import compute_confidence_intervals

__all__ = ["SimpleDistributionEstimator", "AdjustedDistributionEstimator"]


class DistributionEstimatorBase(ABC):
    """Abstract base class for distribution-function estimators.

    Provides convenience methods to compute distributional treatment effects
    (DTE), probability treatment effects (PTE), and quantile treatment effects
    (QTE) on top of an estimated cumulative distribution function. Subclasses
    must implement :meth:`_compute_cumulative_distribution`.
    """

    def __init__(self):
        """Initializes the DistributionEstimatorBase with empty training data.

        Returns:
            DistributionEstimatorBase: An instance of the estimator.
        """
        # Training data; all three are populated by fit() and stay None until then.
        self.confoundings = None
        self.outcomes = None
        self.treatment_arms = None

    def predict_dte(
        self,
        target_treatment_arm: int,
        control_treatment_arm: int,
        locations: np.ndarray,
        alpha: float = 0.05,
        variance_type: str = "moment",
        n_bootstrap: int = 500,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Compute DTE based on the estimator for the distribution function.

        Args:
            target_treatment_arm (int): The index of the treatment arm of the treatment group.
            control_treatment_arm (int): The index of the treatment arm of the control group.
            locations (np.ndarray): Scalar values to be used for computing the cumulative distribution.
            alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05.
            variance_type (str, optional): Variance type to be used to compute confidence intervals. Available values are moment, simple, and uniform.
            n_bootstrap (int, optional): Number of bootstrap samples. Defaults to 500.

        Returns:
            Tuple[np.ndarray, np.ndarray, np.ndarray]: A tuple containing:
                - Expected DTEs
                - Lower bounds
                - Upper bounds
        """
        return self._compute_dtes(
            target_treatment_arm,
            control_treatment_arm,
            locations,
            alpha,
            variance_type,
            n_bootstrap,
        )

    def predict_pte(
        self,
        target_treatment_arm: int,
        control_treatment_arm: int,
        width: float,
        locations: np.ndarray,
        alpha: float = 0.05,
        variance_type: str = "moment",
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Compute PTE based on the estimator for the distribution function.

        A PTE is the treatment effect on the probability mass of the interval
        ``(location, location + width]``.

        Args:
            target_treatment_arm (int): The index of the treatment arm of the treatment group.
            control_treatment_arm (int): The index of the treatment arm of the control group.
            width (float): The width of each outcome interval.
            locations (np.ndarray): Scalar values to be used for computing the cumulative distribution.
            alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05.
            variance_type (str, optional): Variance type to be used to compute confidence intervals. Available values are moment, simple, and uniform.

        Returns:
            Tuple[np.ndarray, np.ndarray, np.ndarray]: A tuple containing:
                - Expected PTEs
                - Lower bounds
                - Upper bounds
        """
        return self._compute_ptes(
            target_treatment_arm,
            control_treatment_arm,
            locations,
            width,
            alpha,
            variance_type,
        )

    def predict_qte(
        self,
        target_treatment_arm: int,
        control_treatment_arm: int,
        # NOTE(review): np.ndarray default is shared across calls; it is treated
        # as read-only here, so the shared-mutable-default hazard does not bite.
        quantiles: np.ndarray = np.array(
            [0.1 * i for i in range(1, 10)], dtype=np.float32
        ),
        alpha: float = 0.05,
        n_bootstrap: int = 500,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Compute QTE based on the estimator for the distribution function.

        Confidence intervals are obtained by bootstrapping the observations,
        recomputing the QTEs per resample, and using a normal approximation
        with the bootstrap variance.

        Args:
            target_treatment_arm (int): The index of the treatment arm of the treatment group.
            control_treatment_arm (int): The index of the treatment arm of the control group.
            quantiles (np.ndarray, optional): Quantiles used for QTE. Defaults to [0.1 * i for i in range(1, 10)].
            alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05.
            n_bootstrap (int, optional): Number of bootstrap samples. Defaults to 500.

        Returns:
            Tuple[np.ndarray, np.ndarray, np.ndarray]: A tuple containing:
                - Expected QTEs
                - Lower bounds
                - Upper bounds
        """
        # Point estimate on the full training data.
        qte = self._compute_qtes(
            target_treatment_arm,
            control_treatment_arm,
            quantiles,
            self.confoundings,
            self.treatment_arms,
            self.outcomes,
        )
        n_obs = len(self.outcomes)
        indexes = np.arange(n_obs)

        # Bootstrap: resample observations with replacement and recompute QTEs.
        qtes = np.zeros((n_bootstrap, qte.shape[0]))
        for b in range(n_bootstrap):
            bootstrap_indexes = np.random.choice(indexes, size=n_obs, replace=True)
            qtes[b] = self._compute_qtes(
                target_treatment_arm,
                control_treatment_arm,
                quantiles,
                self.confoundings[bootstrap_indexes],
                self.treatment_arms[bootstrap_indexes],
                self.outcomes[bootstrap_indexes],
            )

        # Per-quantile bootstrap variance, then symmetric normal-approximation CI.
        qte_var = qtes.var(axis=0)

        qte_lower = qte + norm.ppf(alpha / 2) * np.sqrt(qte_var)
        qte_upper = qte + norm.ppf(1 - alpha / 2) * np.sqrt(qte_var)

        return qte, qte_lower, qte_upper

    def _compute_dtes(
        self,
        target_treatment_arm: int,
        control_treatment_arm: int,
        locations: np.ndarray,
        alpha: float,
        variance_type: str,
        n_bootstrap: int,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Compute expected DTEs with confidence bounds.

        Returns:
            Tuple[np.ndarray, np.ndarray, np.ndarray]: (dte, lower_bound, upper_bound),
            each of shape (len(locations),).
        """
        # Estimated CDF of each arm at the requested locations, plus the
        # per-observation prediction matrix used for variance estimation.
        treatment_cdf, treatment_cdf_mat = self._compute_cumulative_distribution(
            target_treatment_arm,
            locations,
            self.confoundings,
            self.treatment_arms,
            self.outcomes,
        )
        control_cdf, control_cdf_mat = self._compute_cumulative_distribution(
            control_treatment_arm,
            locations,
            self.confoundings,
            self.treatment_arms,
            self.outcomes,
        )

        dte = treatment_cdf - control_cdf

        # (n_obs, n_loc) 0/1 matrix: outcome i is <= location j.
        mat_indicator = (self.outcomes[:, np.newaxis] <= locations).astype(int)

        lower_bound, upper_bound = compute_confidence_intervals(
            vec_y=self.outcomes,
            vec_d=self.treatment_arms,
            vec_loc=locations,
            mat_y_u=mat_indicator,
            vec_prediction_target=treatment_cdf,
            vec_prediction_control=control_cdf,
            mat_entire_predictions_target=treatment_cdf_mat,
            mat_entire_predictions_control=control_cdf_mat,
            ind_target=target_treatment_arm,
            ind_control=control_treatment_arm,
            alpha=alpha,
            variance_type=variance_type,
            n_bootstrap=n_bootstrap,
        )

        return (
            dte,
            lower_bound,
            upper_bound,
        )

    def _compute_ptes(
        self,
        target_treatment_arm: int,
        control_treatment_arm: int,
        locations: np.ndarray,
        width: float,
        alpha: float,
        variance_type: str,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Compute expected PTEs with confidence bounds.

        The probability of falling in ``(location, location + width]`` is
        computed as CDF(location + width) - CDF(location) for each arm.

        Returns:
            Tuple[np.ndarray, np.ndarray, np.ndarray]: (pte, lower_bound, upper_bound),
            each of shape (len(locations),).
        """
        treatment_cumulative_pre, treatment_cdf_mat_pre = (
            self._compute_cumulative_distribution(
                target_treatment_arm,
                locations,
                self.confoundings,
                self.treatment_arms,
                self.outcomes,
            )
        )
        treatment_cumulative_post, treatment_cdf_mat_post = (
            self._compute_cumulative_distribution(
                target_treatment_arm,
                locations + width,
                self.confoundings,
                self.treatment_arms,
                self.outcomes,
            )
        )
        # Interval probability mass for the treatment arm.
        treatment_pdf = treatment_cumulative_post - treatment_cumulative_pre
        control_cumulative_pre, control_cdf_mat_pre = (
            self._compute_cumulative_distribution(
                control_treatment_arm,
                locations,
                self.confoundings,
                self.treatment_arms,
                self.outcomes,
            )
        )
        control_cumulative_post, control_cdf_mat_post = (
            self._compute_cumulative_distribution(
                control_treatment_arm,
                locations + width,
                self.confoundings,
                self.treatment_arms,
                self.outcomes,
            )
        )
        # Interval probability mass for the control arm.
        control_pdf = control_cumulative_post - control_cumulative_pre

        pte = treatment_pdf - control_pdf

        # Indicators at both interval edges; their difference marks outcomes
        # falling inside (location, location + width].
        mat_indicator_pre = (self.outcomes[:, np.newaxis] <= locations).astype(int)
        mat_indicator_post = (self.outcomes[:, np.newaxis] <= locations + width).astype(
            int
        )

        # NOTE(review): unlike _compute_dtes, no n_bootstrap is forwarded here,
        # so compute_confidence_intervals falls back to its own default.
        lower_bound, upper_bound = compute_confidence_intervals(
            vec_y=self.outcomes,
            vec_d=self.treatment_arms,
            vec_loc=locations,
            mat_y_u=mat_indicator_post - mat_indicator_pre,
            vec_prediction_target=treatment_pdf,
            vec_prediction_control=control_pdf,
            mat_entire_predictions_target=treatment_cdf_mat_post
            - treatment_cdf_mat_pre,
            mat_entire_predictions_control=control_cdf_mat_post - control_cdf_mat_pre,
            ind_target=target_treatment_arm,
            ind_control=control_treatment_arm,
            alpha=alpha,
            variance_type=variance_type,
        )

        return (
            pte,
            lower_bound,
            upper_bound,
        )

    def _compute_qtes(
        self,
        target_treatment_arm: int,
        control_treatment_arm: int,
        quantiles: np.ndarray,
        confoundings: np.ndarray,
        treatment_arms: np.ndarray,
        outcomes: np.ndarray,
    ) -> np.ndarray:
        """Compute expected QTEs.

        For each quantile q, finds the q-th quantile of each arm's estimated
        distribution by binary search over the observed outcome values, and
        returns their difference (target minus control).

        Returns:
            np.ndarray: Estimated QTEs, one per entry of ``quantiles``.
        """
        # Candidate quantile locations: the sorted observed outcomes.
        locations = np.sort(outcomes)

        def find_quantile(quantile, arm):
            # Binary search for the largest observed outcome whose estimated
            # CDF value does not exceed `quantile`.
            low, high = 0, locations.shape[0] - 1
            # NOTE(review): stays -1 if the CDF exceeds `quantile` at every
            # candidate location — confirm callers are fine with that sentinel.
            result = -1
            while low <= high:
                mid = (low + high) // 2
                val, _ = self._compute_cumulative_distribution(
                    arm,
                    np.full((1), locations[mid]),
                    confoundings,
                    treatment_arms,
                    outcomes,
                )
                if val[0] <= quantile:
                    result = locations[mid]
                    low = mid + 1
                else:
                    high = mid - 1
            return result

        result = np.zeros(quantiles.shape)
        for i, q in enumerate(quantiles):
            result[i] = find_quantile(q, target_treatment_arm) - find_quantile(
                q, control_treatment_arm
            )

        return result

    def fit(
        self, confoundings: np.ndarray, treatment_arms: np.ndarray, outcomes: np.ndarray
    ) -> "DistributionEstimatorBase":
        """Train the DistributionEstimatorBase.

        Stores the training arrays on the instance after validating that they
        share the same first dimension.

        Args:
            confoundings (np.ndarray): Pre-treatment covariates.
            treatment_arms (np.ndarray): The index of the treatment arm.
            outcomes (np.ndarray): Scalar-valued observed outcome.

        Raises:
            ValueError: If the input arrays have mismatched lengths.

        Returns:
            DistributionEstimatorBase: The fitted estimator.
        """
        if confoundings.shape[0] != treatment_arms.shape[0]:
            raise ValueError(
                "The shape of confounding and treatment_arm should be same"
            )

        if confoundings.shape[0] != outcomes.shape[0]:
            raise ValueError("The shape of confounding and outcome should be same")

        self.confoundings = confoundings
        self.treatment_arms = treatment_arms
        self.outcomes = outcomes

        return self

    def predict(self, treatment_arm: int, locations: np.ndarray) -> np.ndarray:
        """Compute cumulative distribution values.

        Args:
            treatment_arm (int): The index of the treatment arm.
            locations (np.ndarray): Scalar values to be used for computing the cumulative distribution.

        Raises:
            ValueError: If the estimator has not been fitted, or if the
                requested treatment arm was absent from the training data.

        Returns:
            np.ndarray: Estimated cumulative distribution values for the input.
        """
        if self.outcomes is None:
            raise ValueError(
                "This estimator has not been trained yet. Please call fit first"
            )

        if treatment_arm not in self.treatment_arms:
            raise ValueError(
                f"This target treatment arm was not included in the training data: {treatment_arm}"
            )

        # Subclass implementations return (cdf, prediction matrix); only the
        # CDF values are exposed publicly.
        return self._compute_cumulative_distribution(
            treatment_arm,
            locations,
            self.confoundings,
            self.treatment_arms,
            self.outcomes,
        )[0]

    def _compute_cumulative_distribution(
        self,
        target_treatment_arm: int,
        locations: np.ndarray,
        confoundings: np.ndarray,
        treatment_arms: np.ndarray,
        outcomes: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Compute the cumulative distribution values.

        Subclasses return a tuple of the CDF evaluated at ``locations`` and an
        (n_obs, n_locations) matrix of per-observation predictions used for
        variance estimation.
        """
        raise NotImplementedError()


class SimpleDistributionEstimator(DistributionEstimatorBase):
    """A class for computing the empirical distribution function and the
    distributional parameters based on the distribution function.
    """

    def __init__(self):
        """Initializes the SimpleDistributionEstimator.

        Returns:
            SimpleDistributionEstimator: An instance of the estimator.
        """
        super().__init__()

    def _compute_cumulative_distribution(
        self,
        target_treatment_arm: int,
        locations: np.ndarray,
        confoundings: np.ndarray,
        treatment_arms: np.ndarray,
        outcomes: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Compute the empirical cumulative distribution values.

        Args:
            target_treatment_arm (int): The index of the treatment arm.
            locations (np.ndarray): Scalar values to be used for computing the cumulative distribution.
            confoundings (np.ndarray): An array of confounding variables in the observed data.
                Unused here; kept for interface compatibility with the base class.
            treatment_arms (np.ndarray): An array of treatment arms in the observed data.
            outcomes (np.ndarray): An array of outcomes in the observed data.

        Raises:
            KeyError: If no observation belongs to ``target_treatment_arm``.

        Returns:
            Tuple[np.ndarray, np.ndarray]: The empirical CDF evaluated at each
            location, and an all-zero (n_obs, n_locations) prediction matrix
            (this estimator performs no regression adjustment).
        """
        n_obs = outcomes.shape[0]
        n_loc = locations.shape[0]

        # Only outcomes observed under the target arm enter the empirical CDF.
        sorted_outcomes = np.sort(outcomes[treatment_arms == target_treatment_arm])
        if sorted_outcomes.shape[0] == 0:
            # Mirror the KeyError the previous dict-based lookup raised for an
            # unseen arm instead of silently dividing by zero.
            raise KeyError(target_treatment_arm)

        # searchsorted(..., side="right") counts outcomes <= location, so
        # dividing by the arm size yields the empirical CDF at every location
        # in a single vectorized call.
        cumulative_distribution = (
            np.searchsorted(sorted_outcomes, locations, side="right")
            / sorted_outcomes.shape[0]
        )

        return cumulative_distribution, np.zeros((n_obs, n_loc))
class AdjustedDistributionEstimator(DistributionEstimatorBase):
    """A class for estimating the adjusted distribution function and computing
    the distributional parameters based on the trained conditional estimator."""

    def __init__(self, base_model, folds=3, is_multi_task=False):
        """Initializes the AdjustedDistributionEstimator.

        Args:
            base_model (scikit-learn estimator): The base model used for the
                conditional distribution function estimators. The model should
                implement fit(data, targets) and predict_proba(data).
            folds (int): The number of folds for cross-fitting.
            is_multi_task (bool): Whether to use multi-task learning. If True,
                your base model needs to support multi-task prediction
                (n_samples, n_features) -> (n_samples, n_targets).

        Raises:
            ValueError: If the base model implements neither predict_proba
                nor predict.

        Returns:
            AdjustedDistributionEstimator: An instance of the estimator.
        """
        if (not hasattr(base_model, "predict")) and (
            not hasattr(base_model, "predict_proba")
        ):
            raise ValueError(
                "Base model should implement either predict_proba or predict"
            )
        self.base_model = base_model
        self.folds = folds
        self.is_multi_task = is_multi_task
        super().__init__()

    def _compute_cumulative_distribution(
        self,
        target_treatment_arm: int,
        locations: np.ndarray,
        confoundings: np.ndarray,
        treatment_arms: np.ndarray,
        outcomes: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Compute the regression-adjusted cumulative distribution values.

        Uses cross-fitting: per fold, a copy of the base model is trained on
        the target arm's out-of-fold data to predict the binary indicator
        1{outcome <= location}, and the empirical CDF is debiased with
        ``cdf - mean(in-arm predictions) + mean(all-sample predictions)``.

        Args:
            target_treatment_arm (int): The index of the treatment arm.
            locations (np.ndarray): Scalar values to be used for computing the cumulative distribution.
            confoundings (np.ndarray): An array of confounding variables in the observed data.
            treatment_arms (np.ndarray): An array of treatment arms in the observed data.
            outcomes (np.ndarray): An array of outcomes in the observed data.

        Returns:
            Tuple[np.ndarray, np.ndarray]: The adjusted CDF evaluated at each
            location, and the (n_records, n_locations) matrix of model
            predictions over the whole sample.
        """
        n_records = outcomes.shape[0]
        n_loc = locations.shape[0]
        cumulative_distribution = np.zeros(n_loc)
        superset_prediction = np.zeros((n_records, n_loc))
        treatment_mask = treatment_arms == target_treatment_arm
        confounding_in_arm = confoundings[treatment_mask]
        n_records_in_arm = len(confounding_in_arm)
        if self.is_multi_task:
            # One model per fold predicts all locations jointly.
            subset_prediction = np.zeros(
                (n_records_in_arm, n_loc)
            )  # (n_records_in_arm, n_loc)
            binominal = (outcomes.reshape(-1, 1) <= locations) * 1  # (n_records, n_loc)
            cdf = binominal[treatment_mask].mean(axis=0)  # (n_loc)
            for fold in range(self.folds):
                # Fold membership by record index modulo fold count.
                superset_mask = np.arange(n_records) % self.folds == fold
                subset_test_mask = superset_mask & treatment_mask
                subset_train_mask = (~superset_mask) & treatment_mask
                # Position of this fold's arm records within the arm subset.
                subset_test_mask_inner = superset_mask[treatment_mask]
                confounding_train = confoundings[subset_train_mask]
                binominal_train = binominal[subset_train_mask]
                model = deepcopy(self.base_model)
                model.fit(confounding_train, binominal_train)
                subset_prediction[subset_test_mask_inner] = (
                    self._compute_model_prediction(
                        model, confoundings[subset_test_mask]
                    )
                )
                superset_prediction[superset_mask] = self._compute_model_prediction(
                    model, confoundings[superset_mask]
                )
            cumulative_distribution = (
                cdf - subset_prediction.mean(axis=0) + superset_prediction.mean(axis=0)
            )  # (n_loc)
        else:
            # One cross-fitting pass per location.
            for i, location in enumerate(locations):
                subset_prediction = np.zeros(n_records_in_arm)
                binominal = (outcomes <= location) * 1  # (n_records)
                cdf = binominal[treatment_mask].mean()
                for fold in range(self.folds):
                    superset_mask = np.arange(n_records) % self.folds == fold
                    subset_test_mask = superset_mask & treatment_mask
                    subset_train_mask = (~superset_mask) & treatment_mask
                    subset_test_mask_inner = superset_mask[treatment_mask]
                    confounding_train = confoundings[subset_train_mask]
                    binominal_train = binominal[subset_train_mask]
                    if len(np.unique(binominal_train)) == 1:
                        # Degenerate training target (all 0s or all 1s): skip
                        # fitting and predict the constant label directly.
                        subset_prediction[subset_test_mask_inner] = binominal_train[0]
                        superset_prediction[superset_mask, i] = binominal_train[0]
                        continue
                    model = deepcopy(self.base_model)
                    model.fit(confounding_train, binominal_train)
                    subset_prediction[subset_test_mask_inner] = (
                        self._compute_model_prediction(
                            model, confoundings[subset_test_mask]
                        )
                    )
                    superset_prediction[superset_mask, i] = (
                        self._compute_model_prediction(
                            model, confoundings[superset_mask]
                        )
                    )
                cumulative_distribution[i] = (
                    cdf - subset_prediction.mean() + superset_prediction[:, i].mean()
                )
        return cumulative_distribution, superset_prediction

    def _compute_model_prediction(self, model, confoundings: np.ndarray) -> np.ndarray:
        """Return the model's probability-style prediction for ``confoundings``.

        Prefers predict_proba when available, normalizing its output shape;
        otherwise falls back to predict.
        """
        if hasattr(model, "predict_proba"):
            if self.is_multi_task:
                # suppose the shape of prediction is (n_records, n_locations)
                return model.predict_proba(confoundings)
            probabilities = model.predict_proba(confoundings)
            if probabilities.ndim == 1:
                # when the shape of prediction is (n_records)
                return probabilities
            # when the shape of prediction is (n_records, 2)
            return probabilities[:, 1]
        else:
            return model.predict(confoundings)