Source code for sacroml.attacks.worst_case_attack

"""Run a worst case attack based upon predictive probabilities."""

from __future__ import annotations

import logging
from collections.abc import Callable, Iterable
from typing import Any

import numpy as np
from fpdf import FPDF
from sklearn.base import BaseEstimator
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import (
    GridSearchCV,
    RandomizedSearchCV,
    StratifiedShuffleSplit,
    train_test_split,
)

from sacroml import metrics
from sacroml.attacks import report
from sacroml.attacks._scorers import resolve_scorer
from sacroml.attacks.attack import Attack
from sacroml.attacks.target import Target
from sacroml.attacks.utils import get_class_by_name

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

P_THRESH = 0.05

_DEFAULT_PARAM_GRIDS: dict[str, dict[str, list]] = {
    "sklearn.ensemble.RandomForestClassifier": {
        "min_samples_split": [2, 10, 20],
        "min_samples_leaf": [1, 5, 10],
        "max_depth": [None, 5, 10],
    },
    "sklearn.neural_network.MLPClassifier": {
        "hidden_layer_sizes": [(50,), (100,), (50, 50)],
        "alpha": [1e-4, 1e-3, 1e-2],
    },
    "sklearn.linear_model.LogisticRegression": {
        "C": [0.01, 0.1, 1.0, 10.0],
    },
}

# Default hyperparameters used when attack_model is RandomForestClassifier
# and the user did not supply `attack_model_params`.
_DEFAULT_RF_PARAMS: dict[str, object] = {
    "min_samples_split": 20,
    "min_samples_leaf": 10,
    "max_depth": 5,
}


[docs] class WorstCaseAttack(Attack): """Worst case attack."""
[docs] def __init__( self, output_dir: str = "outputs", write_report: bool = True, n_reps: int = 10, reproduce_split: int | Iterable[int] | None = 5, p_thresh: float = 0.05, n_dummy_reps: int = 1, train_beta: int = 1, test_beta: int = 1, test_prop: float = 0.2, include_model_correct_feature: bool = False, sort_probs: bool = True, attack_model: str = "sklearn.ensemble.RandomForestClassifier", attack_model_params: dict[str, object] | None = None, attack_model_param_grid: dict | list[dict] | str | None = None, search_type: str = "grid", search_n_iter: int = 10, tuning_metric: str | Callable = "AUC", report_individual: bool = False, ) -> None: """Construct an object to execute a worst case attack. Parameters ---------- output_dir : str Name of the directory where outputs are stored. write_report : bool Whether to generate a JSON and PDF report. n_reps : int Number of attacks to run -- in each iteration an attack model is trained on a different subset of the data. reproduce_split : int or Iterable[int] or None Variable that controls the reproducibility of the data split. It can be an integer or a list of integers of length `n_reps`. Default : 5. p_thresh : float Threshold to determine significance of things. For instance `auc_p_value` and `pdif_vals`. n_dummy_reps : int Number of baseline (dummy) experiments to do. train_beta : int Value of b for beta distribution used to sample the in-sample (training) probabilities. test_beta : int Value of b for beta distribution used to sample the out-of-sample (test) probabilities. test_prop : float Proportion of data to use as a test set for the attack model. include_model_correct_feature : bool Inclusion of additional feature to hold whether or not the target model made a correct prediction for each example. sort_probs : bool Whether to sort combined preds (from training and test) to have highest probabilities in the first column. attack_model : str Class name of the attack model. attack_model_params : dict or None Dictionary of hyperparameters for the `attack_model` such as `min_sample_split`, `min_samples_leaf`, etc. attack_model_param_grid : dict, list[dict], "default", or None If ``None`` (default) no tuning is performed and behaviour matches earlier versions. If ``"default"``, a built-in grid for the configured ``attack_model`` is used (see ``_DEFAULT_PARAM_GRIDS``). Otherwise pass a sklearn-style grid (a dict or list of dicts). search_type : str ``"grid"`` (default) or ``"random"`` to select between :class:`~sklearn.model_selection.GridSearchCV` and :class:`~sklearn.model_selection.RandomizedSearchCV`. Ignored when ``attack_model_param_grid`` is ``None``. search_n_iter : int Number of parameter settings sampled when ``search_type='random'``. Ignored otherwise. tuning_metric : str or callable Scoring metric used by the search. Defaults to ``"AUC"``. Accepts any key in :data:`sacroml.attacks._scorers.SCORERS`, any sklearn scoring string, or a custom callable following the sklearn ``(estimator, X, y)`` protocol. report_individual : bool Whether to expose per-record membership probabilities in the output. When True, each repetition's metrics dict gains an ``"individual"`` key holding ``"member_prob"`` (the attack classifier's probability of membership for each test sample) and ``"member"`` (the ground truth label). The arrays are sized to the attack-model test slice, not the full target training set. """ super().__init__(output_dir=output_dir, write_report=write_report) self.n_reps: int = n_reps self.reproduce_split: int | Iterable[int] | None = reproduce_split self.p_thresh: float = p_thresh self.n_dummy_reps: int = n_dummy_reps self.train_beta: int = train_beta self.test_beta: int = test_beta self.test_prop: float = test_prop self.include_model_correct_feature: bool = include_model_correct_feature self.sort_probs: bool = sort_probs self.attack_model: str = attack_model self.attack_model_params: dict[str, object] | None = attack_model_params self.attack_model_param_grid: dict | list[dict] | str | None = ( attack_model_param_grid ) self.search_type: str = search_type if not isinstance(search_n_iter, int) or search_n_iter < 1: msg = f"search_n_iter must be a positive integer; got {search_n_iter!r}." raise ValueError(msg) self.search_n_iter: int = search_n_iter try: self._resolved_tuning_scorer: str | Callable = resolve_scorer(tuning_metric) self.tuning_metric: str | Callable = tuning_metric except ValueError as exc: logger.warning( "Invalid tuning_metric %r (%s); falling back to 'AUC'.", tuning_metric, exc, ) self.tuning_metric = "AUC" self._resolved_tuning_scorer = resolve_scorer("AUC") self.report_individual: bool = report_individual self.dummy_attack_metrics: list = [] self._tuned_params: dict | None = None self._tuning_info: dict | None = None
def __str__(self) -> str: """Return name of attack.""" return "WorstCase attack"
[docs] @classmethod def attackable(cls, target: Target) -> bool: # pragma: no cover """Return whether a target can be assessed with WorstCaseAttack.""" required_methods: list[str] = ["predict_proba", "predict"] if ( target.has_model() and target.has_data() and all(hasattr(target.model, method) for method in required_methods) ) or target.has_probas(): return True logger.info("WARNING: WorstCaseAttack requires more Target details.") return False
def _attack(self, target: Target) -> dict: """Run worst case attack. Parameters ---------- target : attacks.target.Target target as a Target class object Returns ------- dict Attack report. """ train_c: np.ndarray | None = None test_c: np.ndarray | None = None # compute target model probas if possible if target.has_model() and target.has_data(): # pragma: no cover proba_train = target.model.predict_proba(target.X_train) proba_test = target.model.predict_proba(target.X_test) if self.include_model_correct_feature: train_c = 1 * (target.y_train == target.model.predict(target.X_train)) test_c = 1 * (target.y_test == target.model.predict(target.X_test)) # use supplied target model probas if unable to compute else: proba_train = target.proba_train proba_test = target.proba_test # execute attack self.attack_from_preds( proba_train, proba_test, train_correct=train_c, test_correct=test_c, ) # create the report output: dict[str, Any] = self._make_report(target) # write the report self._write_report(output) # return the report return output def _make_report(self, target: Target) -> dict[str, Any]: """Create attack report.""" output = super()._make_report(target) output["dummy_attack_experiments_logger"] = ( self._get_dummy_attack_metrics_experiments_instances() ) return output
[docs] def attack_from_preds( self, proba_train: np.ndarray, proba_test: np.ndarray, train_correct: np.ndarray | None = None, test_correct: np.ndarray | None = None, ) -> None: """Run attack based upon the predictions in proba_train and proba_test. Parameters ---------- proba_train : np.ndarray Array of train predictions. One row per example, one column per class. proba_test : np.ndarray Array of test predictions. One row per example, one column per class. """ logger.info("Running main attack repetitions") attack_metric_dict = self.run_attack_reps( proba_train, proba_test, train_correct=train_correct, test_correct=test_correct, ) self.attack_metrics = attack_metric_dict["mia_metrics"] self.dummy_attack_metrics = [] if self.n_dummy_reps > 0: logger.info("Running dummy attack reps") n_train_rows = len(proba_train) n_test_rows = len(proba_test) for _ in range(self.n_dummy_reps): d_train_preds, d_test_preds = self.generate_arrays( n_train_rows, n_test_rows, self.train_beta, self.test_beta, ) temp_attack_metric_dict = self.run_attack_reps( d_train_preds, d_test_preds ) temp_metrics = temp_attack_metric_dict["mia_metrics"] self.dummy_attack_metrics.append(temp_metrics) logger.info("Finished running attacks")
def _prepare_attack_data( self, proba_train: np.ndarray, proba_test: np.ndarray, train_correct: np.ndarray = None, test_correct: np.ndarray = None, ) -> tuple[np.ndarray, np.ndarray]: """Prepare training data and labels for attack model. Combines the train and test preds into a single numpy array (optionally) sorting each row to have the highest probabilities in the first column. Constructs a label array that has ones corresponding to training rows and zeros to testing rows. """ if self.sort_probs: logger.info("Sorting probabilities to leave highest value in first column") proba_train = -np.sort(-proba_train, axis=1) proba_test = -np.sort(-proba_test, axis=1) logger.info("Creating MIA data") if self.include_model_correct_feature and train_correct is not None: proba_train = np.hstack((proba_train, train_correct[:, None])) proba_test = np.hstack((proba_test, test_correct[:, None])) mi_x: np.ndarray = np.vstack((proba_train, proba_test)) mi_y: np.ndarray = np.hstack( (np.ones(len(proba_train)), np.zeros(len(proba_test))) ) return (mi_x, mi_y) def _get_attack_model(self) -> BaseEstimator: """Return an instantiated attack model. After tuning has run, ``self._tuned_params`` is preferred over the constructor-supplied ``attack_model_params``. """ model = get_class_by_name(self.attack_model) if self._tuned_params is not None: return model(**self._tuned_params) params: dict[str, object] | None = self.attack_model_params # set custom default parameters for RF attack model if ( params is None and self.attack_model == "sklearn.ensemble.RandomForestClassifier" ): params = _DEFAULT_RF_PARAMS # Fallthrough: only reached when attack_model is not RF AND the user # did not supply attack_model_params -- instantiate with sklearn defaults. return model(**params) if params is not None else model() def _make_rep_splitter(self, random_state: int) -> StratifiedShuffleSplit: """Splitter used by both the tuning search and the rep loop. Both call sites must produce byte-identical folds for the "tuning counts toward n_reps" property to hold; centralising construction here prevents the two from drifting. """ return StratifiedShuffleSplit( n_splits=self.n_reps, test_size=self.test_prop, random_state=random_state, ) def _resolve_param_grid(self) -> dict | list[dict] | None: """Return the parameter grid, or ``None`` if tuning is disabled.""" grid = self.attack_model_param_grid if grid is None: return None if isinstance(grid, str): if grid == "default": if self.attack_model not in _DEFAULT_PARAM_GRIDS: available = sorted(_DEFAULT_PARAM_GRIDS.keys()) msg = ( f"No default param grid for attack_model " f"{self.attack_model!r}; defaults are available for: " f"{available}. Pass an explicit grid instead." ) raise ValueError(msg) return _DEFAULT_PARAM_GRIDS[self.attack_model] msg = ( "attack_model_param_grid must be a dict, list of dicts, " f'"default", or None; got string {grid!r}.' ) raise ValueError(msg) return grid def _tune_if_needed( self, mi_x: np.ndarray, mi_y: np.ndarray, random_state: int ) -> None: """Run the tuning search if a grid is configured. A no-op when tuning has already been performed on this instance, so dummy-attack repetitions reuse the params found by the real run. """ if self._tuned_params is not None: return grid = self._resolve_param_grid() if grid is None: return if self.n_reps < 2: msg = ( "Tuning requires n_reps >= 2 because cross-validation needs " f"at least 2 folds; got n_reps={self.n_reps}." ) raise ValueError(msg) splitter = self._make_rep_splitter(random_state) scorer = self._resolved_tuning_scorer base_estimator = self._get_attack_model() if self.search_type == "grid": search = GridSearchCV( base_estimator, grid, cv=splitter, scoring=scorer, refit=False, n_jobs=1, ) elif self.search_type == "random": search = RandomizedSearchCV( base_estimator, grid, n_iter=self.search_n_iter, cv=splitter, scoring=scorer, refit=False, random_state=random_state, n_jobs=1, ) else: msg = ( f"Unknown search_type {self.search_type!r}; " "expected 'grid' or 'random'." ) raise ValueError(msg) logger.info("Tuning attack model via %s search over %s", self.search_type, grid) search.fit(mi_x, mi_y) self._tuned_params = dict(search.best_params_) cv_results = search.cv_results_ candidates: list[dict] = [ { "params": dict(params), "mean_test_score": float(mean), "std_test_score": float(std), "rank_test_score": int(rank), } for params, mean, std, rank in zip( cv_results["params"], cv_results["mean_test_score"], cv_results["std_test_score"], cv_results["rank_test_score"], strict=True, ) ] best_idx = int(np.flatnonzero(cv_results["rank_test_score"] == 1)[0]) best_candidate_per_fold_scores = [ float(cv_results[f"split{i}_test_score"][best_idx]) for i in range(self.n_reps) ] self._tuning_info = { "best_params": dict(search.best_params_), "best_score": float(search.best_score_), "best_candidate_per_fold_scores": best_candidate_per_fold_scores, "tuning_metric": ( self.tuning_metric if isinstance(self.tuning_metric, str) else getattr(self.tuning_metric, "__name__", repr(self.tuning_metric)) ), "search_type": self.search_type, "param_grid": grid, "n_candidates": len(candidates), "cv_results": candidates, } logger.info( "Tuning best_params=%s best_score=%.4f", self._tuned_params, search.best_score_, ) def _get_reproducible_split(self) -> list: """Return a list of splits.""" split: int | Iterable[int] | None = self.reproduce_split n_reps = self.n_reps if isinstance(split, int): split = [split] + [x**2 for x in range(split, split + n_reps - 1)] else: # remove potential duplicates split = list(dict.fromkeys(split)) if len(split) == n_reps: pass elif len(split) > n_reps: print("split", split, "nreps", n_reps) split = list(split)[0:n_reps] print( "WARNING: the length of the parameter 'reproduce_split' " "is longer than n_reps. Values have been removed." ) else: # assign values to match length of n_reps split += [split[-1] * x for x in range(2, (n_reps - len(split) + 2))] print( "WARNING: the length of the parameter 'reproduce_split' " "is shorter than n_reps. Values have been added." ) print("reproduce split now", split) return split
[docs] def run_attack_reps( self, proba_train: np.ndarray, proba_test: np.ndarray, train_correct: np.ndarray = None, test_correct: np.ndarray = None, ) -> dict: """Run actual attack reps from train and test predictions. Parameters ---------- proba_train : np.ndarray Predictions from the model on training (in-sample) data. proba_test : np.ndarray Predictions from the model on testing (out-of-sample) data. Returns ------- dict Dictionary of mia_metrics (a list of metric across repetitions). """ mi_x, mi_y = self._prepare_attack_data( proba_train, proba_test, train_correct, test_correct ) split = self._get_reproducible_split() self._tune_if_needed(mi_x, mi_y, random_state=split[0]) if self._tuned_params is not None: # Use the same CV splits the search ran over so the "reps" and # the "tuning CV folds" coincide (the tuning counts toward n_reps). splitter = self._make_rep_splitter(split[0]) fold_splits = list(splitter.split(mi_x, mi_y)) else: # Legacy path: variable random states per rep (one resample each). indices = np.arange(len(mi_y)) fold_splits = [ train_test_split( indices, test_size=self.test_prop, stratify=mi_y, random_state=split[rep], shuffle=True, ) for rep in range(self.n_reps) ] mia_metrics: list[dict] = [] for rep, (train_idx, test_idx) in enumerate(fold_splits): logger.info("Rep %d of %d", rep + 1, self.n_reps) mi_train_x, mi_test_x = mi_x[train_idx], mi_x[test_idx] mi_train_y, mi_test_y = mi_y[train_idx], mi_y[test_idx] attack_classifier = self._get_attack_model() attack_classifier.fit(mi_train_x, mi_train_y) y_pred_proba = attack_classifier.predict_proba(mi_test_x) mia_metrics.append(metrics.get_metrics(y_pred_proba, mi_test_y)) if self.report_individual: mia_metrics[-1]["individual"] = { "member_prob": y_pred_proba[:, 1].tolist(), "member": np.asarray(mi_test_y).tolist(), } if self.include_model_correct_feature and train_correct is not None: # Compute the Yeom TPR and FPR yeom_preds = mi_test_x[:, -1] tn, fp, fn, tp = confusion_matrix(mi_test_y, yeom_preds).ravel() mia_metrics[-1]["yeom_tpr"] = tp / (tp + fn) mia_metrics[-1]["yeom_fpr"] = fp / (fp + tn) mia_metrics[-1]["yeom_advantage"] = ( mia_metrics[-1]["yeom_tpr"] - mia_metrics[-1]["yeom_fpr"] ) logger.info("Finished simulating attacks") return {"mia_metrics": mia_metrics}
def _get_global_metrics(self, attack_metrics: list) -> dict: """Summarise metrics from a metric list. Parameters ---------- attack_metrics : List list of attack metrics dictionaries Returns ------- global_metrics : Dict Dictionary of summary metrics """ global_metrics = {} if attack_metrics is not None and len(attack_metrics) != 0: auc_p_vals = [ metrics.auc_p_val( m["AUC"], m["n_pos_test_examples"], m["n_neg_test_examples"] )[0] for m in attack_metrics ] m = attack_metrics[0] _, auc_std = metrics.auc_p_val( 0.5, m["n_pos_test_examples"], m["n_neg_test_examples"] ) global_metrics["null_auc_3sd_range"] = ( f"{0.5 - 3 * auc_std:.4f} -> {0.5 + 3 * auc_std:.4f}" ) global_metrics["n_sig_auc_p_vals"] = self._get_n_significant( auc_p_vals, self.p_thresh ) global_metrics["n_sig_auc_p_vals_corrected"] = self._get_n_significant( auc_p_vals, self.p_thresh, bh_fdr_correction=True ) pdif_vals = [np.exp(-m["PDIF01"]) for m in attack_metrics] global_metrics["n_sig_pdif_vals"] = self._get_n_significant( pdif_vals, self.p_thresh ) global_metrics["n_sig_pdif_vals_corrected"] = self._get_n_significant( pdif_vals, self.p_thresh, bh_fdr_correction=True ) return global_metrics def _get_n_significant( self, p_val_list: list[float], p_thresh: float, bh_fdr_correction: bool = False ) -> int: """Return number of p-values significant at `p_thresh`. Can perform multiple testing correction. """ if not bh_fdr_correction: return sum(1 for p in p_val_list if p <= p_thresh) p_val_list = np.asarray(sorted(p_val_list)) n_vals = len(p_val_list) hoch_vals = np.array([(k / n_vals) * P_THRESH for k in range(1, n_vals + 1)]) bh_sig_list = p_val_list <= hoch_vals return np.where(bh_sig_list)[0].max() + 1 if any(bh_sig_list) else 0 def _generate_array(self, n_rows: int, beta: float) -> np.ndarray: """Generate array of predictions, used when doing baseline experiments. Parameters ---------- n_rows : int The number of rows worth of data to generate. beta : float The beta parameter for sampling probabilities. Returns ------- preds : np.ndarray Array of predictions. Two columns, `n_rows` rows. """ preds = np.zeros((n_rows, 2), float) for row_idx in range(n_rows): train_class = np.random.choice(2) train_prob = np.random.beta(1, beta) preds[row_idx, train_class] = train_prob preds[row_idx, 1 - train_class] = 1 - train_prob return preds
[docs] def generate_arrays( self, n_rows_in: int, n_rows_out: int, train_beta: float = 2, test_beta: float = 2, ) -> tuple[np.ndarray, np.ndarray]: """Generate train and test prediction arrays, used when computing baseline. Parameters ---------- n_rows_in : int Number of rows of in-sample (training) probabilities. n_rows_out : int Number of rows of out-of-sample (testing) probabilities. train_beta : float Beta value for generating train probabilities. test_beta : float: Beta value for generating test probabilities. Returns ------- proba_train : np.ndarray Array of train predictions (n_rows x 2 columns). proba_test : np.ndarray Array of test predictions (n_rows x 2 columns). """ proba_train = self._generate_array(n_rows_in, train_beta) proba_test = self._generate_array(n_rows_out, test_beta) return proba_train, proba_test
def _construct_metadata(self) -> None: """Construct the metadata object after attacks.""" super()._construct_metadata() self.metadata["global_metrics"] = self._get_global_metrics(self.attack_metrics) self.metadata["baseline_global_metrics"] = self._get_global_metrics( self._unpack_dummy_attack_metrics_experiments_instances() ) if self._tuning_info is not None: self.metadata["tuning"] = self._tuning_info def _unpack_dummy_attack_metrics_experiments_instances(self) -> list: """Construct the metadata object after attacks.""" dummy_attack_metrics_instances = [] for exp_rep, _ in enumerate(self.dummy_attack_metrics): temp_dummy_attack_metrics = self.dummy_attack_metrics[exp_rep] dummy_attack_metrics_instances += temp_dummy_attack_metrics return dummy_attack_metrics_instances def _get_attack_metrics_instances(self) -> dict: """Construct the metadata object after attacks.""" attack_metrics_experiment = {} attack_metrics_instances = {} for rep, _ in enumerate(self.attack_metrics): attack_metrics_instances["instance_" + str(rep)] = self.attack_metrics[rep] attack_metrics_experiment["attack_instance_logger"] = attack_metrics_instances return attack_metrics_experiment def _get_dummy_attack_metrics_experiments_instances(self) -> dict: """Construct the metadata object after attacks.""" dummy_attack_metrics_experiments = {} for exp_rep, _ in enumerate(self.dummy_attack_metrics): temp_dummy_attack_metrics = self.dummy_attack_metrics[exp_rep] dummy_attack_metric_instances = {} for rep, _ in enumerate(temp_dummy_attack_metrics): dummy_attack_metric_instances["instance_" + str(rep)] = ( temp_dummy_attack_metrics[rep] ) temp = {} temp["attack_instance_logger"] = dummy_attack_metric_instances dummy_attack_metrics_experiments[ "dummy_attack_metrics_experiment_" + str(exp_rep) ] = temp return dummy_attack_metrics_experiments def _make_pdf(self, output: dict) -> FPDF: """Create PDF report.""" return report.create_mia_report(output)