Source code for dte_adj

import numpy as np
from typing import Tuple, Optional, Any
from scipy.stats import norm
from copy import deepcopy
from abc import ABC
from .util import compute_confidence_intervals, compute_ldte, compute_lpte

__all__ = [
    "SimpleDistributionEstimator",
    "AdjustedDistributionEstimator",
    "SimpleStratifiedDistributionEstimator",
    "AdjustedStratifiedDistributionEstimator",
    "SimpleLocalDistributionEstimator",
    "AdjustedLocalDistributionEstimator",
]


class DistributionEstimatorBase(ABC):
    """A mixin including several convenience functions to compute and display distribution functions."""

    def __init__(self):
        """
        Initializes the DistributionFunctionMixin.

        Returns:
            DistributionFunctionMixin: An instance of the estimator.
        """
        self.covariates = None
        self.outcomes = None
        self.treatment_arms = None

    def predict_dte(
        self,
        target_treatment_arm: int,
        control_treatment_arm: int,
        locations: np.ndarray,
        alpha: float = 0.05,
        variance_type="moment",
        n_bootstrap=500,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Compute Distribution Treatment Effects (DTE) based on the estimator for the distribution function.

        The DTE measures the difference in cumulative distribution functions between treatment groups
        at specified locations. It quantifies how treatment affects the probability of observing
        outcomes below each threshold.

        Args:
            target_treatment_arm (int): The index of the treatment arm of the treatment group.
            control_treatment_arm (int): The index of the treatment arm of the control group.
            locations (np.ndarray): Scalar values to be used for computing the cumulative distribution.
            alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05.
            variance_type (str, optional): Variance type to be used to compute confidence intervals.
                Available values are "moment", "simple", and "uniform". Defaults to "moment".
            n_bootstrap (int, optional): Number of bootstrap samples. Defaults to 500.

        Returns:
            Tuple[np.ndarray, np.ndarray, np.ndarray]: A tuple containing:
                - Expected DTEs (np.ndarray): Treatment effect estimates at each location
                - Lower bounds (np.ndarray): Lower confidence interval bounds
                - Upper bounds (np.ndarray): Upper confidence interval bounds

        Example:
            .. code-block:: python

                import numpy as np
                from dte_adj import SimpleDistributionEstimator

                # Generate sample data
                X = np.random.randn(1000, 5)
                D = np.random.binomial(1, 0.5, 1000)
                Y = X[:, 0] + 2 * D + np.random.randn(1000)

                # Fit estimator
                estimator = SimpleDistributionEstimator()
                estimator.fit(X, D, Y)

                # Compute DTE
                locations = np.linspace(Y.min(), Y.max(), 20)
                dte, lower, upper = estimator.predict_dte(
                    target_treatment_arm=1,
                    control_treatment_arm=0,
                    locations=locations,
                    variance_type="moment"
                )

                print(f"DTE shape: {dte.shape}")  # Should match locations.shape
                print(f"Average DTE: {dte.mean():.3f}")
        """
        return self._compute_dtes(
            target_treatment_arm,
            control_treatment_arm,
            locations,
            alpha,
            variance_type,
            n_bootstrap,
        )

    def predict_pte(
        self,
        target_treatment_arm: int,
        control_treatment_arm: int,
        locations: np.ndarray,
        alpha: float = 0.05,
        variance_type="moment",
        n_bootstrap=500,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Compute Probability Treatment Effects (PTE) based on the estimator for the distribution function.

        The PTE measures the difference in probability mass between treatment groups for intervals
        defined by consecutive location pairs. It quantifies how treatment affects the probability
        of observing outcomes within specific ranges.

        Args:
            target_treatment_arm (int): The index of the treatment arm of the treatment group.
            control_treatment_arm (int): The index of the treatment arm of the control group.
            locations (np.ndarray): Scalar values defining interval boundaries for probability computation.
                For each interval (locations[i], locations[i+1]], the PTE is computed.
            alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05.
            variance_type (str, optional): Variance type to be used to compute confidence intervals.
                Available values are "moment", "simple", and "uniform". Defaults to "moment".
            n_bootstrap (int, optional): Number of bootstrap samples. Defaults to 500.

        Returns:
            Tuple[np.ndarray, np.ndarray, np.ndarray]: A tuple containing:
                - Expected PTEs (np.ndarray): Treatment effect estimates for each interval,
                  shape (len(locations)-1,)
                - Lower bounds (np.ndarray): Lower confidence interval bounds
                - Upper bounds (np.ndarray): Upper confidence interval bounds

        Example:
            .. code-block:: python

                import numpy as np
                from dte_adj import SimpleDistributionEstimator

                # Generate sample data
                X = np.random.randn(1000, 5)
                D = np.random.binomial(1, 0.5, 1000)
                Y = X[:, 0] + 2 * D + np.random.randn(1000)

                # Fit estimator
                estimator = SimpleDistributionEstimator()
                estimator.fit(X, D, Y)

                # Define interval boundaries
                locations = np.array([-2, -1, 0, 1, 2])  # Creates intervals: (-2,-1], (-1,0], (0,1], (1,2]

                # Compute PTE
                pte, lower, upper = estimator.predict_pte(
                    target_treatment_arm=1,
                    control_treatment_arm=0,
                    locations=locations,
                    variance_type="moment"
                )

                print(f"PTE shape: {pte.shape}")  # Should be (4,) for 4 intervals
                print(f"Interval effects: {pte}")
        """
        return self._compute_ptes(
            target_treatment_arm,
            control_treatment_arm,
            locations,
            alpha,
            variance_type,
            n_bootstrap,
        )

    def predict_qte(
        self,
        target_treatment_arm: int,
        control_treatment_arm: int,
        quantiles: Optional[np.ndarray] = None,
        alpha: float = 0.05,
        n_bootstrap=500,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Compute Quantile Treatment Effects (QTE) based on the estimator for the distribution function.

        The QTE measures the difference in quantiles between treatment groups, providing insights
        into how treatment affects different parts of the outcome distribution. For stratified
        estimators, the computation properly accounts for strata.

        Args:
            target_treatment_arm (int): The index of the treatment arm of the treatment group.
            control_treatment_arm (int): The index of the treatment arm of the control group.
            quantiles (np.ndarray, optional): Quantiles used for QTE. Defaults to [0.1, 0.2, ..., 0.9].
            alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05.
            n_bootstrap (int, optional): Number of bootstrap samples. Defaults to 500.

        Returns:
            Tuple[np.ndarray, np.ndarray, np.ndarray]: A tuple containing:
                - Expected QTEs (np.ndarray): Treatment effect estimates at each quantile
                - Lower bounds (np.ndarray): Lower confidence interval bounds
                - Upper bounds (np.ndarray): Upper confidence interval bounds

        Example:
            .. code-block:: python

                import numpy as np
                from dte_adj import SimpleStratifiedDistributionEstimator

                # Generate stratified sample data
                X = np.random.randn(1000, 5)
                strata = np.random.choice([0, 1, 2], size=1000)
                D = np.random.binomial(1, 0.5, 1000)
                Y = X[:, 0] + 2 * D + 0.5 * strata + np.random.randn(1000)

                # Fit stratified estimator
                estimator = SimpleStratifiedDistributionEstimator()
                estimator.fit(X, D, Y, strata)

                # Compute QTE at specific quantiles
                quantiles = np.array([0.25, 0.5, 0.75])  # 25th, 50th, 75th percentiles
                qte, lower, upper = estimator.predict_qte(
                    target_treatment_arm=1,
                    control_treatment_arm=0,
                    quantiles=quantiles,
                    n_bootstrap=100
                )

                print(f"QTE at quantiles {quantiles}: {qte}")
                print(f"Median effect (50th percentile): {qte[1]:.3f}")
        """
        qte = self._compute_qtes(
            target_treatment_arm,
            control_treatment_arm,
            quantiles,
            self.covariates,
            self.treatment_arms,
            self.outcomes,
            self.strata,
        )
        n_obs = len(self.outcomes)
        indexes = np.arange(n_obs)

        qtes = np.zeros((n_bootstrap, qte.shape[0]))
        for b in range(n_bootstrap):
            bootstrap_indexes = np.random.choice(indexes, size=n_obs, replace=True)

            qtes[b] = self._compute_qtes(
                target_treatment_arm,
                control_treatment_arm,
                quantiles,
                self.covariates[bootstrap_indexes],
                self.treatment_arms[bootstrap_indexes],
                self.outcomes[bootstrap_indexes],
                self.strata[bootstrap_indexes],
            )

        qte_var = qtes.var(axis=0)

        qte_lower = qte + norm.ppf(alpha / 2) * np.sqrt(qte_var)
        qte_upper = qte + norm.ppf(1 - alpha / 2) * np.sqrt(qte_var)

        return qte, qte_lower, qte_upper

    def _compute_dtes(
        self,
        target_treatment_arm: int,
        control_treatment_arm: int,
        locations: np.ndarray,
        alpha: float,
        variance_type: str,
        n_bootstrap: int,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Compute expected DTEs."""
        treatment_cdf, treatment_cdf_mat, _ = self._compute_cumulative_distribution(
            target_treatment_arm,
            locations,
            self.covariates,
            self.treatment_arms,
            self.outcomes,
        )
        control_cdf, control_cdf_mat, _ = self._compute_cumulative_distribution(
            control_treatment_arm,
            locations,
            self.covariates,
            self.treatment_arms,
            self.outcomes,
        )

        dte = treatment_cdf - control_cdf

        mat_indicator = (self.outcomes[:, np.newaxis] <= locations).astype(int)

        lower_bound, upper_bound = compute_confidence_intervals(
            vec_y=self.outcomes,
            vec_d=self.treatment_arms,
            vec_loc=locations,
            mat_y_u=mat_indicator,
            vec_prediction_target=treatment_cdf,
            vec_prediction_control=control_cdf,
            mat_entire_predictions_target=treatment_cdf_mat,
            mat_entire_predictions_control=control_cdf_mat,
            ind_target=target_treatment_arm,
            ind_control=control_treatment_arm,
            alpha=alpha,
            variance_type=variance_type,
            n_bootstrap=n_bootstrap,
        )

        return (
            dte,
            lower_bound,
            upper_bound,
        )

    def _compute_ptes(
        self,
        target_treatment_arm: int,
        control_treatment_arm: int,
        locations: np.ndarray,
        alpha: float,
        variance_type: str,
        n_bootstrap: int,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Compute expected PTEs."""
        treatment_pdf, treatment_pdf_mat, _ = self._compute_interval_probability(
            target_treatment_arm,
            locations,
            self.covariates,
            self.treatment_arms,
            self.outcomes,
        )
        control_pdf, control_pdf_mat, _ = self._compute_interval_probability(
            control_treatment_arm,
            locations,
            self.covariates,
            self.treatment_arms,
            self.outcomes,
        )

        pte = treatment_pdf - control_pdf

        # Compute interval indicators for confidence intervals
        mat_indicator = (self.outcomes[:, np.newaxis] <= locations).astype(int)
        mat_interval_indicator = mat_indicator[:, 1:] - mat_indicator[:, :-1]

        lower_bound, upper_bound = compute_confidence_intervals(
            vec_y=self.outcomes,
            vec_d=self.treatment_arms,
            vec_loc=locations[:-1],  # Use interval boundaries
            mat_y_u=mat_interval_indicator,
            vec_prediction_target=treatment_pdf,
            vec_prediction_control=control_pdf,
            mat_entire_predictions_target=treatment_pdf_mat,
            mat_entire_predictions_control=control_pdf_mat,
            ind_target=target_treatment_arm,
            ind_control=control_treatment_arm,
            alpha=alpha,
            variance_type=variance_type,
            n_bootstrap=n_bootstrap,
        )

        return (
            pte,
            lower_bound,
            upper_bound,
        )

    def _compute_qtes(
        self,
        target_treatment_arm: int,
        control_treatment_arm: int,
        quantiles: np.ndarray,
        covariates: np.ndarray,
        treatment_arms: np.ndarray,
        outcomes: np.array,
        strata: np.ndarray,
    ) -> np.ndarray:
        """Compute expected QTEs."""
        locations = np.sort(outcomes)

        def find_quantile(quantile, arm):
            low, high = 0, locations.shape[0] - 1
            result = -1
            while low <= high:
                mid = (low + high) // 2
                # Temporarily store original strata and use the provided strata
                original_strata = self.strata
                self.strata = strata

                val, _, _ = self._compute_cumulative_distribution(
                    arm,
                    np.full((1), locations[mid]),
                    covariates,
                    treatment_arms,
                    outcomes,
                )

                # Restore original strata
                self.strata = original_strata

                if val[0] <= quantile:
                    result = locations[mid]
                    low = mid + 1
                else:
                    high = mid - 1
            return result

        result = np.zeros(quantiles.shape)
        for i, q in enumerate(quantiles):
            result[i] = find_quantile(q, target_treatment_arm) - find_quantile(
                q, control_treatment_arm
            )

        return result

    def predict(self, treatment_arm: int, locations: np.ndarray) -> np.ndarray:
        """
        Compute cumulative distribution values.

        Args:
            treatment_arm (int): The index of the treatment arm.
            outcomes (np.ndarray): Scalar values to be used for computing the cumulative distribution.

        Returns:
            np.ndarray: Estimated cumulative distribution values for the input.
        """
        if self.outcomes is None:
            raise ValueError(
                "This estimator has not been trained yet. Please call fit first"
            )

        if treatment_arm not in self.treatment_arms:
            raise ValueError(
                f"This target treatment arm was not included in the training data: {treatment_arm}"
            )

        return self._compute_cumulative_distribution(
            treatment_arm,
            locations,
            self.covariates,
            self.treatment_arms,
            self.outcomes,
        )[0]

    def _compute_cumulative_distribution(
        self,
        target_treatment_arm: int,
        locations: np.ndarray,
        covariates: np.ndarray,
        treatment_arms: np.ndarray,
        outcomes: np.array,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Compute the cumulative distribution values.

        Args:
            target_treatment_arm (int): The index of the treatment arm.
            locations (np.ndarray): Scalar values to be used for computing the cumulative distribution.
            covariates: (np.ndarray): An array of covariates variables in the observed data.
            treatment_arms (np.ndarray): An array of treatment arms in the observed data.
            outcomes (np.ndarray): An array of outcomes in the observed data.

        Returns:
            Tuple[np.ndarray, np.ndarray, np.ndarray]: Estimated cumulative distribution values, prediction for each observation, and superset prediction for each observation.
        """
        raise NotImplementedError()


[docs] class SimpleStratifiedDistributionEstimator(DistributionEstimatorBase): """A class is for estimating the empirical distribution function and computing the Distributional parameters for CAR."""
[docs] def fit( self, covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.ndarray, strata: np.ndarray, ) -> "DistributionEstimatorBase": """ Train the DistributionEstimatorBase. Args: covariates (np.ndarray): Pre-treatment covariates. treatment_arms (np.ndarray): The index of the treatment arm. outcomes (np.ndarray): Scalar-valued observed outcome. Returns: DistributionEstimatorBase: The fitted estimator. """ if covariates.shape[0] != treatment_arms.shape[0]: raise ValueError("The shape of covariates and treatment_arm should be same") if covariates.shape[0] != outcomes.shape[0]: raise ValueError("The shape of covariates and outcome should be same") self.covariates = covariates self.treatment_arms = treatment_arms self.outcomes = outcomes self.strata = strata return self
def _compute_cumulative_distribution( self, target_treatment_arm: int, locations: np.ndarray, covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.array, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Compute the cumulative distribution values. Args: target_treatment_arm (int): The index of the treatment arm. locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. covariates: (np.ndarray): An array of covariates variables in the observed data. treatment_arm (np.ndarray): An array of treatment arms in the observed data. outcomes (np.ndarray): An array of outcomes in the observed data Returns: Tuple of numpy arrays: - np.ndarray: Unconditional cumulative distribution values. - np.ndarray: Adjusted cumulative distribution for each observation. - np.ndarray: Conditional cumulative distribution for each observation. """ n_records = outcomes.shape[0] n_loc = locations.shape[0] prediction = np.zeros((n_records, n_loc)) treatment_mask = treatment_arms == target_treatment_arm strata = self.strata s_list = np.unique(strata) w_s = {} for s in s_list: s_mask = strata == s w_s[s] = (s_mask & treatment_mask).sum() / s_mask.sum() n_obs = outcomes.shape[0] n_loc = locations.shape[0] for i, outcome in enumerate(locations): for j in range(n_obs): s = strata[j] prediction[j, i] = (outcomes[j] <= outcome) / w_s[s] * treatment_mask[j] unconditional_pred = {s: prediction[s == strata].mean(axis=0) for s in s_list} conditional_prediction = np.array([unconditional_pred[s] for s in strata]) weights = np.array([w_s[s] for s in strata])[:, np.newaxis] prediction = ( (outcomes[:, np.newaxis] <= locations) - conditional_prediction ) / weights * treatment_mask[:, np.newaxis] + conditional_prediction return prediction.mean(axis=0), prediction, conditional_prediction def _compute_interval_probability( self, target_treatment_arm: int, locations: np.ndarray, covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.array, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """Compute the interval probabilities. Args: target_treatment_arm (int): The index of the treatment arm. locations (np.ndarray): Scalar values to be used for computing the interval probabilities. covariates: (np.ndarray): An array of covariates variables in the observed data. treatment_arm (np.ndarray): An array of treatment arms in the observed data. outcomes (np.ndarray): An array of outcomes in the observed data Returns: Tuple of numpy arrays: - np.ndarray: Estimated unconditional interval probabilities. - np.ndarray: Adjusted for each observation. - np.ndarray: Conditional for each observation. """ n_records = outcomes.shape[0] n_loc = locations.shape[0] prediction = np.zeros((n_records, n_loc)) treatment_mask = treatment_arms == target_treatment_arm strata = self.strata s_list = np.unique(strata) w_s = {} for s in s_list: s_mask = strata == s w_s[s] = (s_mask & treatment_mask).sum() / s_mask.sum() n_obs = outcomes.shape[0] n_loc = locations.shape[0] for i, outcome in enumerate(locations): for j in range(n_obs): s = strata[j] prediction[j, i] = (outcomes[j] <= outcome) / w_s[s] * treatment_mask[j] unconditional_pred = {s: prediction[s == strata].mean(axis=0) for s in s_list} conditional_prediction = np.array([unconditional_pred[s] for s in strata]) weights = np.array([w_s[s] for s in strata])[:, np.newaxis] prediction = ( (outcomes[:, np.newaxis] <= locations) - conditional_prediction ) / weights * treatment_mask[:, np.newaxis] + conditional_prediction cdf = prediction.mean(axis=0) return ( cdf[1:] - cdf[:-1], prediction[:, 1:] - prediction[:, :-1], conditional_prediction[:, 1:] - conditional_prediction[:, :-1], )
[docs] class AdjustedStratifiedDistributionEstimator(DistributionEstimatorBase): """A class is for estimating the adjusted distribution function and computing the Distributional parameters for CAR.""" def __init__(self, base_model: Any, folds=3, is_multi_task=False): """ Initializes the AdjustedDistributionEstimator. Args: base_model (scikit-learn estimator): The base model implementing used for conditional distribution function estimators. The model should implement fit(data, targets) and predict_proba(data). folds (int): The number of folds for cross-fitting. is_multi_task(bool): Whether to use multi-task learning. If True, your base model needs to support multi-task prediction (n_samples, n_features) -> (n_samples, n_targets). Returns: AdjustedDistributionEstimator: An instance of the estimator. """ if (not hasattr(base_model, "predict")) and ( not hasattr(base_model, "predict_proba") ): raise ValueError( "Base model should implement either predict_proba or predict" ) self.base_model = base_model self.folds = folds self.is_multi_task = is_multi_task super().__init__()
[docs] def fit( self, covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.ndarray, strata: np.ndarray, ) -> "DistributionEstimatorBase": """ Train the DistributionEstimatorBase. Args: covariates (np.ndarray): Pre-treatment covariates. treatment_arms (np.ndarray): The index of the treatment arm. outcomes (np.ndarray): Scalar-valued observed outcome. Returns: DistributionEstimatorBase: The fitted estimator. """ if covariates.shape[0] != treatment_arms.shape[0]: raise ValueError("The shape of covariates and treatment_arm should be same") if covariates.shape[0] != outcomes.shape[0]: raise ValueError("The shape of covariates and outcome should be same") self.covariates = covariates self.treatment_arms = treatment_arms self.outcomes = outcomes self.strata = strata return self
def _compute_cumulative_distribution( self, target_treatment_arm: int, locations: np.ndarray, covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.array, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Compute the cumulative distribution values. Args: target_treatment_arm (int): The index of the treatment arm. locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. covariates: (np.ndarray): An array of covariates variables in the observed data. treatment_arm (np.ndarray): An array of treatment arms in the observed data. outcomes (np.ndarray): An array of outcomes in the observed data Returns: Tuple of numpy arrays: - np.ndarray: Unconditional cumulative distribution values. - np.ndarray: Adjusted cumulative distribution for each observation. - np.ndarray: Conditional cumulative distribution for each observation. """ n_records = outcomes.shape[0] n_loc = locations.shape[0] superset_prediction = np.zeros((n_records, n_loc)) prediction = np.zeros((n_records, n_loc)) treatment_mask = treatment_arms == target_treatment_arm folds = np.random.randint(self.folds, size=n_records) strata = self.strata s_list = np.unique(strata) if self.is_multi_task: binomial = (outcomes.reshape(-1, 1) <= locations) * 1 # (n_records, n_loc) for fold in range(self.folds): fold_mask = (folds != fold) & treatment_mask for s in s_list: s_mask = strata == s weight = (s_mask & treatment_mask).sum() / s_mask.sum() superset_mask = (folds == fold) & s_mask subset_train_mask = (folds != fold) & s_mask & treatment_mask covariates_train = covariates[subset_train_mask] binomial_train = binomial[subset_train_mask] if len(np.unique(binomial_train)) > 1: self.model = deepcopy(self.base_model) self.model.fit(covariates_train, binomial_train) pred = self._compute_model_prediction( self.model, covariates[superset_mask] ) prediction[superset_mask] = ( pred + treatment_mask[superset_mask].reshape(-1, 1) * (binomial[superset_mask] - pred) / weight ) superset_prediction[superset_mask] = pred else: for i, location in enumerate(locations): binomial = (outcomes <= location) * 1 # (n_records) for fold in range(self.folds): fold_mask = (folds != fold) & treatment_mask covariates_train = covariates[fold_mask] binomial_train = binomial[fold_mask] # Pool the records across strata and train the model if len(np.unique(binomial_train)) > 1: self.model = deepcopy(self.base_model) self.model.fit(covariates_train, binomial_train) for s in s_list: s_mask = strata == s weight = (s_mask & treatment_mask).sum() / s_mask.sum() superset_mask = (folds == fold) & s_mask subset_train_mask = (folds != fold) & s_mask & treatment_mask covariates_train = covariates[subset_train_mask] binomial_train = binomial[subset_train_mask] # TODO: revisit the logic here if len(np.unique(binomial_train)) > 1: # self.model = deepcopy(self.base_model) # self.model.fit(covariates_train, binomial_train) pass else: pred = binomial_train[0] superset_prediction[superset_mask, i] = pred prediction[superset_mask, i] = ( pred + treatment_mask[superset_mask] * (binomial[superset_mask] - pred) / weight ) continue pred = self._compute_model_prediction( self.model, covariates[superset_mask] ) prediction[superset_mask, i] = ( pred + treatment_mask[superset_mask] * (binomial[superset_mask] - pred) / weight ) superset_prediction[superset_mask, i] = pred return prediction.mean(axis=0), prediction, superset_prediction def _compute_interval_probability( self, target_treatment_arm: int, locations: np.ndarray, covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.array, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Compute the interval probabilities. Args: target_treatment_arm (int): The index of the treatment arm. locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. covariates: (np.ndarray): An array of covariates variables in the observed data. treatment_arm (np.ndarray): An array of treatment arms in the observed data. outcomes (np.ndarray): An array of outcomes in the observed data Returns: Tuple of numpy arrays: - np.ndarray: Unconditional interval probabilities. - np.ndarray: Adjusted interval probabilities for each observation. - np.ndarray: Conditional interval probabilities for each observation. """ n_records = outcomes.shape[0] n_loc = locations.shape[0] superset_prediction = np.zeros((n_records, n_loc - 1)) prediction = np.zeros((n_records, n_loc - 1)) treatment_mask = treatment_arms == target_treatment_arm folds = np.random.randint(self.folds, size=n_records) strata = self.strata s_list = np.unique(strata) binominals = (outcomes[:, np.newaxis] <= locations) * 1 # (n_records, n_loc) for i in range(len(locations) - 1): binomial = binominals[:, i + 1] - binominals[:, i] for fold in range(self.folds): fold_mask = (folds != fold) & treatment_mask covariates_train = covariates[fold_mask] binomial_train = binomial[fold_mask] if len(np.unique(binomial_train)) > 1: self.model = deepcopy(self.base_model) self.model.fit(covariates_train, binomial_train) for s in s_list: s_mask = strata == s wight = (s_mask & treatment_mask).sum() / s_mask.sum() superset_mask = (folds == fold) & s_mask subset_train_mask = (folds != fold) & s_mask & treatment_mask covariates_train = covariates[subset_train_mask] binomial_train = binomial[subset_train_mask] if len(np.unique(binomial_train)) == 1: pred = binomial_train[0] superset_prediction[superset_mask, i] = pred prediction[superset_mask, i] = ( pred + treatment_mask[superset_mask] * (binomial[superset_mask] - pred) / wight ) continue pred = self._compute_model_prediction( self.model, covariates[superset_mask] ) prediction[superset_mask, i] = ( pred + treatment_mask[superset_mask] * (binomial[superset_mask] - pred) / wight ) superset_prediction[superset_mask, i] = pred return prediction.mean(axis=0), prediction, superset_prediction def _compute_model_prediction(self, model, covariates: np.ndarray) -> np.ndarray: if hasattr(model, "predict_proba"): if self.is_multi_task: # suppose the shape of prediction is (n_records, n_locations) return model.predict_proba(covariates) probabilities = model.predict_proba(covariates) if probabilities.ndim == 1: # when the shape of prediction is (n_records) return probabilities # when the shape of prediction is (n_records, 2) return probabilities[:, 1] else: return model.predict(covariates)
[docs] class SimpleDistributionEstimator(SimpleStratifiedDistributionEstimator): """ A class for computing the empirical distribution function and distributional treatment effects using simple (unadjusted) estimation methods. This estimator computes Distribution Treatment Effects (DTE), Probability Treatment Effects (PTE), and Quantile Treatment Effects (QTE) without using machine learning models for adjustment. It provides a baseline approach suitable when treatment assignment is random or when covariate adjustment is not needed. Example: .. code-block:: python import numpy as np from dte_adj import SimpleDistributionEstimator # Generate sample data X = np.random.randn(1000, 5) D = np.random.binomial(1, 0.5, 1000) # Random treatment Y = X[:, 0] + 2 * D + np.random.randn(1000) # Fit simple estimator estimator = SimpleDistributionEstimator() estimator.fit(X, D, Y) # Compute treatment effects locations = np.linspace(Y.min(), Y.max(), 20) dte, lower, upper = estimator.predict_dte(1, 0, locations) pte, pte_lower, pte_upper = estimator.predict_pte(1, 0, locations) """ def __init__(self): """Initializes the SimpleDistributionEstimator. Returns: SimpleDistributionEstimator: An instance of the estimator. """ super().__init__()
[docs] def fit( self, covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.ndarray ) -> "SimpleDistributionEstimator": """ Set parameters. Args: covariates (np.ndarray): Pre-treatment covariates. treatment_arms (np.ndarray): The index of the treatment arm. outcomes (np.ndarray): Scalar-valued observed outcome. Returns: SimpleDistributionEstimator: The fitted estimator. """ if covariates.shape[0] != treatment_arms.shape[0]: raise ValueError("The shape of covariates and treatment_arm should be same") if covariates.shape[0] != outcomes.shape[0]: raise ValueError("The shape of covariates and outcome should be same") self.covariates = covariates self.treatment_arms = treatment_arms self.outcomes = outcomes self.strata = np.zeros(len(self.covariates)) return self
[docs] class AdjustedDistributionEstimator(AdjustedStratifiedDistributionEstimator): """ A class for computing distribution treatment effects using machine learning adjustment. This estimator uses cross-fitting with ML models to adjust for confounding when computing Distribution Treatment Effects (DTE), Probability Treatment Effects (PTE), and Quantile Treatment Effects (QTE). It provides more precise estimates when treatment assignment depends on observed covariates. Example: .. code-block:: python import numpy as np from sklearn.ensemble import RandomForestClassifier from dte_adj import AdjustedDistributionEstimator # Generate confounded data X = np.random.randn(1000, 5) treatment_prob = 1 / (1 + np.exp(-(X[:, 0] + X[:, 1]))) D = np.random.binomial(1, treatment_prob, 1000) Y = X.sum(axis=1) + 2 * D + np.random.randn(1000) # Fit adjusted estimator base_model = RandomForestClassifier(n_estimators=100) estimator = AdjustedDistributionEstimator(base_model, folds=3) estimator.fit(X, D, Y) # Compute adjusted treatment effects locations = np.linspace(Y.min(), Y.max(), 20) dte, lower, upper = estimator.predict_dte(1, 0, locations, variance_type="moment") """
[docs] def fit( self, covariates: np.ndarray, treatment_arms: np.ndarray, outcomes: np.ndarray ) -> "AdjustedDistributionEstimator": """ Set parameters. Args: covariates (np.ndarray): Pre-treatment covariates. treatment_arms (np.ndarray): The index of the treatment arm. outcomes (np.ndarray): Scalar-valued observed outcome. Returns: AdjustedDistributionEstimator: The fitted estimator. """ if covariates.shape[0] != treatment_arms.shape[0]: raise ValueError("The shape of covariates and treatment_arm should be same") if covariates.shape[0] != outcomes.shape[0]: raise ValueError("The shape of covariates and outcome should be same") self.covariates = covariates self.treatment_arms = treatment_arms self.outcomes = outcomes self.strata = np.zeros(len(self.covariates)) return self
[docs] class SimpleLocalDistributionEstimator(SimpleStratifiedDistributionEstimator): """ A class for computing Local Distribution Treatment Effects (LDTE) and Local Probability Treatment Effects (LPTE) using simple empirical estimation. This estimator computes treatment effects that are weighted by treatment propensity within each stratum, providing estimates that are locally robust to treatment assignment heterogeneity across strata. It uses empirical methods without ML adjustment. """ def __init__(self): """ Initializes the SimpleLocalDistributionEstimator. Returns: SimpleLocalDistributionEstimator: An instance of the estimator. """ super().__init__()
[docs] def fit( self, covariates: np.ndarray, treatment_arms: np.ndarray, treatment_indicator: np.ndarray, outcomes: np.ndarray, strata: np.ndarray, ) -> "SimpleLocalDistributionEstimator": """ Train the SimpleLocalDistributionEstimator. Args: covariates (np.ndarray): Pre-treatment covariates. treatment_arms (np.ndarray): Treatment assignment variable (Z). treatment_indicator (np.ndarray): Treatment indicator variable (D). outcomes (np.ndarray): Scalar-valued observed outcome. strata (np.ndarray): Stratum indicators. Returns: SimpleLocalDistributionEstimator: The fitted estimator. """ super().fit(covariates, treatment_arms, outcomes, strata) self.treatment_indicator = treatment_indicator return self
[docs] def predict_ldte( self, target_treatment_arm: int, control_treatment_arm: int, locations: np.ndarray, alpha: float = 0.05, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Compute Local Distribution Treatment Effects (LDTE). LDTE measures the difference in cumulative distribution functions between treatment groups weighted by treatment propensity within each stratum. This provides estimates that are locally robust to treatment assignment heterogeneity across strata. Args: target_treatment_arm (int): The index of the treatment arm of the treatment group. control_treatment_arm (int): The index of the treatment arm of the control group. locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05. Returns: Tuple[np.ndarray, np.ndarray, np.ndarray]: A tuple containing: - Expected LDTEs (np.ndarray): Local treatment effect estimates at each location - Lower bounds (np.ndarray): Lower confidence interval bounds - Upper bounds (np.ndarray): Upper confidence interval bounds Example: .. code-block:: python import numpy as np from sklearn.linear_model import LogisticRegression from dte_adj import AdjustedLocalDistributionEstimator # Generate sample data with strata np.random.seed(42) X = np.random.randn(1000, 5) strata = np.random.choice([0, 1], size=1000) # Binary strata D = np.random.binomial(1, 0.3 + 0.4 * strata, 1000) # Treatment depends on strata Y = X[:, 0] + 2 * D + strata + np.random.randn(1000) # Fit local estimator base_model = LogisticRegression() estimator = AdjustedLocalDistributionEstimator(base_model) estimator.fit(X, D, D, Y, strata) # treatment_arms = treatment_indicator for binary case # Compute LDTE locations = np.linspace(Y.min(), Y.max(), 20) ldte, lower, upper = estimator.predict_ldte( target_treatment_arm=1, control_treatment_arm=0, locations=locations ) print(f"LDTE shape: {ldte.shape}") # Should match locations.shape print(f"Average LDTE: {ldte.mean():.3f}") """ return compute_ldte( self, target_treatment_arm, control_treatment_arm, locations, alpha )
[docs] def predict_lpte( self, target_treatment_arm: int, control_treatment_arm: int, locations: np.ndarray, alpha: float = 0.05, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Compute Local Probability Treatment Effects (LPTE). LPTE measures the difference in probability mass between treatment groups for intervals weighted by treatment propensity within each stratum. This provides interval-based treatment effect estimates that are locally robust to treatment assignment heterogeneity. Args: target_treatment_arm (int): The index of the treatment arm of the treatment group. control_treatment_arm (int): The index of the treatment arm of the control group. locations (np.ndarray): Scalar values defining interval boundaries for probability computation. For each interval (locations[i], locations[i+1]], the LPTE is computed. alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05. Returns: Tuple[np.ndarray, np.ndarray, np.ndarray]: A tuple containing: - Expected LPTEs (np.ndarray): Local treatment effect estimates for each interval, shape (len(locations)-1,) - Lower bounds (np.ndarray): Lower confidence interval bounds - Upper bounds (np.ndarray): Upper confidence interval bounds Example: .. code-block:: python import numpy as np from sklearn.linear_model import LogisticRegression from dte_adj import SimpleLocalDistributionEstimator # Generate sample data with strata np.random.seed(42) X = np.random.randn(1000, 5) strata = np.random.choice([0, 1, 2], size=1000) # Multiple strata D = np.random.binomial(1, 0.2 + 0.3 * (strata == 1) + 0.4 * (strata == 2), 1000) Y = X[:, 0] + 1.5 * D + 0.5 * strata + np.random.randn(1000) # Fit simple local estimator estimator = SimpleLocalDistributionEstimator() estimator.fit(X, D, D, Y, strata) # Define interval boundaries locations = np.array([-2, -1, 0, 1, 2]) # Creates 4 intervals # Compute LPTE lpte, lower, upper = estimator.predict_lpte( target_treatment_arm=1, control_treatment_arm=0, locations=locations ) print(f"LPTE shape: {lpte.shape}") # Should be (4,) for 4 intervals print(f"Interval effects: {lpte}") """ return compute_lpte( self, target_treatment_arm, control_treatment_arm, locations, alpha )
[docs] class AdjustedLocalDistributionEstimator(AdjustedStratifiedDistributionEstimator): """ A class for computing Local Distribution Treatment Effects (LDTE) and Local Probability Treatment Effects (LPTE) using ML-adjusted estimation. This estimator combines local treatment effect estimation with machine learning adjustment, providing treatment effects that are both locally robust to treatment assignment heterogeneity and adjusted for confounding through observed covariates. It uses cross-fitting for more precise estimates in complex treatment assignment scenarios. """ def __init__(self, base_model: Any, folds=3, is_multi_task=False): """ Initializes the AdjustedLocalDistributionEstimator. Args: base_model (scikit-learn estimator): The base model implementing used for conditional distribution function estimators. folds (int): The number of folds for cross-fitting. is_multi_task(bool): Whether to use multi-task learning. Returns: AdjustedLocalDistributionEstimator: An instance of the estimator. """ super().__init__(base_model, folds, is_multi_task)
[docs] def fit( self, covariates: np.ndarray, treatment_arms: np.ndarray, treatment_indicator: np.ndarray, outcomes: np.ndarray, strata: np.ndarray, ) -> "AdjustedLocalDistributionEstimator": """ Train the AdjustedLocalDistributionEstimator. Args: covariates (np.ndarray): Pre-treatment covariates. treatment_arms (np.ndarray): Treatment assignment variable (Z). treatment_indicator (np.ndarray): Treatment indicator variable (D). outcomes (np.ndarray): Scalar-valued observed outcome. strata (np.ndarray): Stratum indicators. Returns: AdjustedLocalDistributionEstimator: The fitted estimator. """ super().fit(covariates, treatment_arms, outcomes, strata) self.treatment_indicator = treatment_indicator return self
[docs] def predict_ldte( self, target_treatment_arm: int, control_treatment_arm: int, locations: np.ndarray, alpha: float = 0.05, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Compute Local Distribution Treatment Effects (LDTE) with ML-adjusted estimation. LDTE measures the difference in cumulative distribution functions between treatment groups weighted by treatment propensity within each stratum, using ML models for adjustment. Currently, this API only supports analytical confidence interval. Args: target_treatment_arm (int): The index of the treatment arm of the treatment group. control_treatment_arm (int): The index of the treatment arm of the control group. locations (np.ndarray): Scalar values to be used for computing the cumulative distribution. alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05. Returns: Tuple[np.ndarray, np.ndarray, np.ndarray]: A tuple containing: - Expected LDTEs (np.ndarray): ML-adjusted local treatment effect estimates - Lower bounds (np.ndarray): Lower confidence interval bounds - Upper bounds (np.ndarray): Upper confidence interval bounds Example: .. code-block:: python import numpy as np from sklearn.ensemble import RandomForestClassifier from dte_adj import AdjustedLocalDistributionEstimator # Generate sample data with complex treatment assignment np.random.seed(42) X = np.random.randn(1000, 5) strata = np.random.choice([0, 1], size=1000) # Treatment depends on covariates and strata treatment_prob = 0.2 + 0.3 * (X[:, 0] > 0) + 0.2 * strata D = np.random.binomial(1, treatment_prob, 1000) Y = X.sum(axis=1) + 2 * D + strata + np.random.randn(1000) # Fit adjusted local estimator base_model = RandomForestClassifier(n_estimators=50, random_state=42) estimator = AdjustedLocalDistributionEstimator(base_model, folds=3) estimator.fit(X, D, D, Y, strata) # Compute LDTE locations = np.linspace(Y.min(), Y.max(), 15) ldte, lower, upper = estimator.predict_ldte( target_treatment_arm=1, control_treatment_arm=0, locations=locations ) print(f"ML-adjusted LDTE shape: {ldte.shape}") print(f"Average LDTE: {ldte.mean():.3f}") """ return compute_ldte( self, target_treatment_arm, control_treatment_arm, locations, alpha )
[docs] def predict_lpte( self, target_treatment_arm: int, control_treatment_arm: int, locations: np.ndarray, alpha: float = 0.05, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Compute Local Probability Treatment Effects (LPTE) with ML-adjusted estimation. LPTE measures the difference in probability mass between treatment groups for intervals weighted by treatment propensity within each stratum, using ML models for adjustment. Currently, this API only supports analytical confidence interval. Args: target_treatment_arm (int): The index of the treatment arm of the treatment group. control_treatment_arm (int): The index of the treatment arm of the control group. locations (np.ndarray): Scalar values defining interval boundaries for probability computation. For each interval (locations[i], locations[i+1]], the LPTE is computed. alpha (float, optional): Significance level of the confidence bound. Defaults to 0.05. Returns: Tuple[np.ndarray, np.ndarray, np.ndarray]: A tuple containing: - Expected LPTEs (np.ndarray): ML-adjusted local treatment effect estimates, shape (len(locations)-1,) - Lower bounds (np.ndarray): Lower confidence interval bounds - Upper bounds (np.ndarray): Upper confidence interval bounds Example: .. code-block:: python import numpy as np from sklearn.ensemble import GradientBoostingClassifier from dte_adj import AdjustedLocalDistributionEstimator # Generate sample data with confounding np.random.seed(42) X = np.random.randn(1000, 5) strata = np.random.choice([0, 1, 2], size=1000) # Complex treatment assignment mechanism logit_score = X[:, 0] + 0.5 * X[:, 1] + strata treatment_prob = 1 / (1 + np.exp(-logit_score)) D = np.random.binomial(1, treatment_prob, 1000) Y = X.sum(axis=1) + 1.5 * D + 0.3 * strata + np.random.randn(1000) # Fit adjusted estimator with gradient boosting base_model = GradientBoostingClassifier(n_estimators=100, random_state=42) estimator = AdjustedLocalDistributionEstimator(base_model, folds=5) estimator.fit(X, D, D, Y, strata) # Define intervals and compute LPTE locations = np.array([-3, -1, 0, 1, 3]) # 4 intervals lpte, lower, upper = estimator.predict_lpte( target_treatment_arm=1, control_treatment_arm=0, locations=locations ) print(f"ML-adjusted LPTE shape: {lpte.shape}") # Should be (4,) print(f"Interval effects: {lpte}") """ return compute_lpte( self, target_treatment_arm, control_treatment_arm, locations, alpha )