"""Code for automatic report generation."""
import json
import os
from typing import Any
import numpy as np
import pylab as plt
from fpdf import FPDF
from pypdf import PdfWriter
from sacroml.attacks.attack_report_formatter import GenerateJSONModule
# Adds a border to all pdf cells if set to 1 -- useful for debugging
BORDER = 0

# Metric keys that the worst-case MIA report summarises (mean, variance, min
# and max over the repetitions). Each key is looked up in every per-repetition
# metrics dictionary.
DISPLAY_METRICS = [
    "AUC",
    "ACC",
    "Advantage",
    "FDIF01",
    "PDIF01",
    "TPR@0.1",
    "TPR@0.01",
    "TPR@0.001",
    "TPR@1e-05",
]

# Per-metric transforms applied to the raw metric value before it is written
# to a report; metrics without an entry are reported unchanged.
MAPPINGS = {"PDIF01": lambda x: np.exp(-x)}
# Introductory text written at the top of the worst-case MIA report.
# TODO: the "[ADD URL]" placeholder below still needs a real reference.
INTRODUCTION = (
    "This report provides a summary of a series of simulated attack experiments "
    "performed on the model outputs provided. An attack model is trained to "
    "attempt to distinguish between outputs from training (in-sample) and "
    "testing (out-of-sample) data. The metrics below describe the success of "
    "this classifier. A successful classifier indicates that the original model "
    "is unsafe and should not be allowed to be released from the TRE.\n In "
    "particular, the simulation splits the data provided into test and train "
    "sets (each with in- and out-of-sample examples). The classifier is trained "
    "on the train set and evaluated on the test set. This is repeated with "
    "different train/test splits a user-specified number of times.\n To help "
    "place the results in context, the code may also have run a series of "
    "baseline experiments. In these, random model outputs for hypothetical in- "
    "and out-of-sample data are generated with identical statistical properties. "
    "In these baseline cases, there is no signal that an attacker could leverage "
    "and therefore these values provide a baseline against which the actual "
    "values can be compared.\n For some metrics (FDIF and AUC), we are able to "
    "compute p-values. In each case, shown below (in the Global metrics "
    "sections) is the number of repetitions that exceeded the p-value threshold "
    "both without, and with correction for multiple testing (Benjamini-Hochberg "
    "procedure).\n ROC curves for all real (red) and dummy (blue) repetitions are "
    "provided. These are shown in log space (as recommended here [ADD URL]) to "
    "emphasise the region in which risk is highest -- the bottom left (are high "
    "true positive rates possible with low false positive rates).\n A description "
    "of the metrics and how to interpret them within the context of an attack is "
    "given below."
)
# Caption written underneath the log-scale ROC figure in the worst-case MIA
# report.
LOGROC_CAPTION = (
    "This plot shows the False Positive Rate (x) versus the True Positive Rate "
    "(y). The axes are in log space enabling us to focus on areas where the "
    "False Positive Rate is low (left hand area). Curves above the y = x line "
    "(black dashes) in this region represent a disclosure risk as an attacker "
    "can obtain many more true than false positives. The solid coloured lines "
    "show the curves for the attack simulations with the true model outputs. The "
    "lighter grey lines show the curves for randomly generated outputs with no "
    "structure (i.e. in- and out-of-sample predictions are generated from the "
    "same distributions). Solid curves consistently higher than the "
    "grey curves in the left hand part of the plot are a sign of concern. "
)
# Glossary entries (term -> explanation) written to the final page of the
# worst-case MIA report.
MIA_GLOSSARY = {
    "AUC": "Area Under the ROC curve",
    "True Positive Rate (TPR)": (
        "The true positive rate is the number of True Positives that are "
        "predicted as positive as a proportion of the total number of positives. "
        "If an attacker has N examples that were actually in the training set, "
        "the TPR is the proportion of these that they predict as being in the "
        "training set."
    ),
    "ACC": "The proportion of predictions that the attacker makes that are correct.",
}
# Introductory text written at the top of the quantile regression (QMIA)
# attack report.
QMIA_INTRODUCTION = (
    "This report summarises a Quantile Membership Inference Attack (QMIA) "
    "based on Bertran et al., NeurIPS 2023 (arXiv:2307.03694). A quantile "
    "regressor is trained on the non-member (test) set to learn a per-sample "
    "threshold for the hinge confidence score. A sample is predicted as a "
    "training-set member when its observed score exceeds the predicted "
    "threshold. The attack is calibrated so that the false-positive rate on "
    "non-members approximates the target alpha."
)
# Introductory text written at the top of the structural attack report.
STRUCTURAL_INTRODUCTION = (
    "This report provides a summary of a series of 'static' structural "
    "attacks. These attacks do not require training a separate attack model, "
    "but instead analyse the properties of the target model and its training "
    "data to identify potential disclosure risks based on pre-defined rules "
    "and thresholds. A 'risk detected' result for any of the checks below "
    "indicates a potential vulnerability that should be investigated."
)
# Glossary entries (identifier -> explanation) written to the final page of
# the structural attack report.
STRUCTURAL_GLOSSARY = {
    "dof_risk": (
        "Degrees of Freedom (DoF) Risk: Checks if the model is overly complex "
        "relative to the size of the training data. A model with too many "
        "parameters (low residual DoF) can essentially memorize training "
        "records, making it vulnerable to membership inference."
    ),
    "k_anonymity_risk": (
        "K-Anonymity Risk: This checks if any training data points fall into "
        "very small groups (equivalence classes) based on the model's "
        "predictions. If a group has fewer than 'k' members, those "
        "individuals are more easily re-identifiable."
    ),
    "class_disclosure_risk": (
        "Class Disclosure Risk: Assesses whether the model's outputs might "
        "inadvertently reveal the presence of small, disclosive groups within "
        "the data, especially in combination with known class labels."
    ),
    "unnecessary_risk": (
        "Unnecessary Risk: Checks if the model's hyperparameters (e.g., max "
        "tree depth) are set to values known to be associated with higher "
        "membership inference risk, without necessarily providing a "
        "commensurate increase in accuracy."
    ),
    "lowvals_cd_risk": (
        "Low Values Class Disclosure Risk: A specific check for class "
        "disclosure where the frequency of a particular class within an "
        "equivalence class is below a safe threshold."
    ),
}
def _sanitise_floats(obj: Any) -> Any:
"""Recursively replace non-finite floats with None.
Parameters
----------
obj : Any
Object to sanitise.
Returns
-------
Any
Sanitised object with non-finite floats replaced by None.
"""
if isinstance(obj, float) and (np.isnan(obj) or np.isinf(obj)):
return None
if isinstance(obj, np.ndarray):
return _sanitise_floats(obj.tolist())
if isinstance(obj, dict):
return {k: _sanitise_floats(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_sanitise_floats(v) for v in obj]
return obj
def _strip_keys(obj: Any, exclude_keys: frozenset[str]) -> Any:
"""Recursively remove specified keys from nested dicts.
Creates new containers so the original object is not mutated.
Parameters
----------
obj : Any
Object to filter.
exclude_keys : frozenset[str]
Set of dictionary keys to exclude.
Returns
-------
Any
Filtered object with specified keys removed.
"""
if isinstance(obj, dict):
return {
k: _strip_keys(v, exclude_keys)
for k, v in obj.items()
if k not in exclude_keys
}
if isinstance(obj, list):
return [_strip_keys(v, exclude_keys) for v in obj]
return obj
def _externalise_arrays(
output: dict, dest: str, exclude_keys: frozenset[str]
) -> dict[str, str]:
"""Move large per-instance arrays out of the JSON into compressed .npz.
For each instance under ``attack_experiment_logger.attack_instance_logger``,
write the values of ``exclude_keys`` -- the same keys about to be stripped
from the JSON -- to a single compressed ``.npz`` next to the JSON, and
return ``{instance_key: filename}`` pointers. A key whose value is a nested
dict (e.g. the per-record ``individual`` block) is flattened, storing each
field under a ``key.field`` name. ``output`` is not mutated, so PDF
generation (which runs afterwards) still sees the full arrays in memory.
Driving externalisation off the same ``exclude_keys`` set keeps a single
source of truth: any key removed from the JSON is guaranteed a home in the
sidecar, with no second list to keep in sync.
Parameters
----------
output : dict
Attack output dictionary.
dest : str
Destination path (without extension); the .npz files are written
alongside it.
exclude_keys : frozenset[str]
Per-instance keys to externalise (the same set stripped from the JSON).
Returns
-------
dict[str, str]
Mapping of instance key to the relative .npz filename written.
"""
dest_dir: str = os.path.dirname(dest) or "."
base: str = os.path.basename(dest) or "report"
# log_id is the attack's stable per-instance uuid; including it keeps two
# runs (or two attacks) writing into the same directory from clobbering.
log_id: str = str(output.get("log_id", ""))[:8]
instances: dict = output.get("attack_experiment_logger", {}).get(
"attack_instance_logger", {}
)
pointers: dict[str, str] = {}
for inst_key, inst in instances.items():
if not isinstance(inst, dict):
continue
arrays: dict[str, np.ndarray] = {}
for key in exclude_keys:
val = inst.get(key)
if val is None:
continue
if isinstance(val, dict):
# Nested per-record block (e.g. `individual`): flatten each
# field under a `key.field` name so it round-trips from the npz.
for sub_key, sub_val in val.items():
arrays[f"{key}.{sub_key}"] = np.asarray(sub_val)
else:
arrays[key] = np.asarray(val)
if not arrays:
continue
suffix: str = f"_{log_id}" if log_id else ""
fname: str = f"{base}_arrays{suffix}_{inst_key}.npz"
np.savez_compressed(os.path.join(dest_dir, fname), **arrays)
pointers[inst_key] = fname
return pointers
[docs]
def write_json(
    output: dict, dest: str, exclude_keys: frozenset[str] = frozenset()
) -> None:
    """Write attack report to JSON.

    With a non-empty ``exclude_keys`` the named per-instance arrays are first
    written to sidecar ``.npz`` files (see ``_externalise_arrays``). A filtered
    copy of ``output`` without those keys is then serialised, and each instance
    that received a sidecar gains an ``arrays_file`` entry naming it. With an
    empty ``exclude_keys`` the output is serialised as-is.

    Parameters
    ----------
    output : dict
        Attack output dictionary; ``output["metadata"]["attack_name"]`` is
        passed to the formatter as the attack name.
    dest : str
        Destination path (without extension); ``.json`` is appended.
    exclude_keys : frozenset[str]
        Keys to exclude from the JSON output to reduce file size.
    """
    payload = output
    if exclude_keys:
        sidecars = _externalise_arrays(output, dest, exclude_keys)
        payload = _strip_keys(output, exclude_keys)
        logged = payload.get("attack_experiment_logger", {}).get(
            "attack_instance_logger", {}
        )
        # `payload` is a fresh copy, so adding the pointers leaves `output` alone.
        for name, npz_name in sidecars.items():
            if name in logged:
                logged[name]["arrays_file"] = npz_name

    formatter = GenerateJSONModule(dest + ".json")
    serialised: str = json.dumps(_sanitise_floats(payload), cls=CustomJSONEncoder)
    formatter.add_attack_output(serialised, output["metadata"]["attack_name"])
[docs]
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that can cope with numpy arrays, etc."""

    def default(self, o: object) -> object:
        """Convert objects the standard encoder cannot serialise.

        Parameters
        ----------
        o : object
            Object the base encoder could not handle.

        Returns
        -------
        object
            A list for numpy arrays, an int / float / bool for numpy scalars
            (``None`` for non-finite numpy floats, which strict JSON cannot
            represent), and otherwise ``str(o)`` as a last resort.
        """
        if isinstance(o, np.ndarray):
            return o.tolist()
        # np.integer / np.floating cover every width, not only int32 / int64,
        # so e.g. np.float32 is written as a number rather than a string.
        if isinstance(o, np.integer):
            return int(o)
        if isinstance(o, np.floating):
            return float(o) if np.isfinite(o) else None
        if isinstance(o, np.bool_):
            return bool(o)
        # Try the default method first
        try:  # pragma: no cover
            return super().default(o)
        except TypeError:
            return str(o)  # If object is not serializable, convert it to a string
def _write_dict(pdf: FPDF, data: dict, border: int = BORDER) -> None:
    """Write a dictionary to the pdf.

    Each item is written as a bold 14pt arial key followed by its value
    (converted with ``str``) in regular 12pt arial and a 5-unit line break.

    Parameters
    ----------
    pdf : FPDF
        Document to write to.
    data : dict
        Items to write, in iteration order.
    border : int
        Border argument passed to the key cell only; the value cell is always
        written with a border argument of 0.
    """
    for key, value in data.items():
        pdf.set_font("arial", "B", 14)
        pdf.cell(0, 5, key, border, 1, "L")
        pdf.set_font("arial", "", 12)
        pdf.multi_cell(0, 5, str(value), 0, 1)
        pdf.ln(h=5)
[docs]
def title(
    pdf: FPDF,
    text: str,
    border: int = BORDER,
    font_size: int = 24,
    font_style: str = "B",
) -> None:
    """Write a title block.

    Selects the arial font, then writes ``text`` in a single cell (alignment
    argument ``"C"``) with a 5-unit line break before and after it.

    Parameters
    ----------
    pdf : FPDF
        Document to write to.
    text : str
        Title text.
    border : int
        Border argument passed to the cell.
    font_size : int
        Font size passed to ``set_font``.
    font_style : str
        Font style passed to ``set_font``.
    """
    pdf.set_font("arial", font_style, font_size)
    pdf.ln(h=5)
    pdf.cell(0, 0, text, border, 1, "C")
    pdf.ln(h=5)
[docs]
def subtitle(
    pdf: FPDF,
    text: str,
    indent: int = 10,
    border: int = BORDER,
    font_size: int = 12,
    font_style: str = "B",
) -> None:
    """Write a subtitle block.

    Writes an empty cell of width ``indent`` as a spacer, selects the arial
    font, then writes ``text`` in a 75 x 10 cell.

    Parameters
    ----------
    pdf : FPDF
        Document to write to.
    text : str
        Subtitle text.
    indent : int
        Width of the empty spacer cell written before the text.
    border : int
        Border argument passed to both cells.
    font_size : int
        Font size passed to ``set_font``.
    font_style : str
        Font style passed to ``set_font``.
    """
    # The spacer cell is written unconditionally, before the font is changed.
    pdf.cell(indent, border=border)
    pdf.set_font("arial", font_style, font_size)
    pdf.cell(75, 10, text, border, 1)
[docs]
def line(
    pdf: FPDF,
    text: str,
    indent: int = 0,
    border: int = BORDER,
    font_size: int = 11,
    font_style: str = "",
    font: str = "arial",
) -> None:
    """Write a standard block.

    Optionally writes an empty spacer cell, selects the requested font, then
    writes ``text`` with ``multi_cell`` using a cell height of 5.

    Parameters
    ----------
    pdf : FPDF
        Document to write to.
    text : str
        Body text.
    indent : int
        Width of the empty spacer cell; no spacer is written unless this is
        greater than zero.
    border : int
        Border argument passed to the cells.
    font_size : int
        Font size passed to ``set_font``.
    font_style : str
        Font style passed to ``set_font``.
    font : str
        Font family passed to ``set_font``.
    """
    if indent > 0:
        pdf.cell(indent, border=border)
    pdf.set_font(font, font_style, font_size)
    pdf.multi_cell(0, 5, text, border, 1)
def _roc_plot_single(metrics: dict, save_name: str) -> None:
    """Save a log-log ROC plot for one experiment.

    Draws the dashed black y = x reference and the ROC curve (red, width 2)
    taken from ``metrics["fpr"]`` and ``metrics["tpr"]``, then writes the
    figure to ``save_name`` and closes it.

    Parameters
    ----------
    metrics : dict
        Metrics dictionary providing ``"fpr"`` and ``"tpr"``.
    save_name : str
        Path the figure is saved to.
    """
    plt.figure()
    plt.plot([0, 1], [0, 1], "k--")

    false_pos = metrics["fpr"]
    true_pos = metrics["tpr"]
    plt.plot(false_pos, true_pos, "r", linewidth=2)

    for set_scale in (plt.xscale, plt.yscale):
        set_scale("log")
    plt.grid()

    plt.ylabel("True Positive Rate")
    plt.xlabel("False Positive Rate")

    plt.tight_layout()
    plt.savefig(save_name)
    plt.close()
def _roc_plot(metrics: list[dict], save_name: str) -> None:
    """Create a roc plot for multiple repetitions.

    Each repetition's ROC curve is drawn as a thin light line, and the mean
    curve (TPR interpolated onto a common FPR grid and averaged over the
    repetitions) is drawn in red on top. Both axes are logarithmic.

    Parameters
    ----------
    metrics : list[dict]
        One metrics dictionary per repetition, each providing ``"fpr"`` and
        ``"tpr"``.
    save_name : str
        Path the figure is saved to.
    """
    plt.figure()
    plt.plot([0, 1], [0, 1], "k--")

    # Interpolate every curve onto a shared FPR grid so they can be averaged.
    base_fpr = np.linspace(0, 1, 1000)
    all_tpr = np.zeros((len(metrics), len(base_fpr)), float)
    for i, metric_set in enumerate(metrics):
        all_tpr[i, :] = np.interp(base_fpr, metric_set["fpr"], metric_set["tpr"])
        plt.plot(
            metric_set["fpr"], metric_set["tpr"], color="lightsalmon", linewidth=0.5
        )

    tpr_mu = all_tpr.mean(axis=0)
    plt.plot(base_fpr, tpr_mu, "r")

    plt.xscale("log")
    plt.yscale("log")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.tight_layout()
    plt.grid()
    plt.savefig(save_name)
    plt.close()
[docs]
def create_mia_report(attack_output: dict) -> FPDF:
    """Make a worst case membership inference report.

    A temporary ``log_roc.png`` is written to the attack's ``output_dir``,
    embedded in the document and removed again before returning.

    Parameters
    ----------
    attack_output : dict
        Dictionary with the following items:

        metadata : dict
            Metadata; ``sacroml_version``, ``attack_params`` (including
            ``output_dir``) and ``global_metrics`` are read.
        attack_experiment_logger : dict
            Holds ``attack_instance_logger``, whose values are the
            per-repetition metrics dictionaries.

    Returns
    -------
    pdf : fpdf.FPDF
        fpdf document object
    """
    instance_log: dict = attack_output["attack_experiment_logger"][
        "attack_instance_logger"
    ]
    mia_metrics = list(instance_log.values())
    metadata: dict = attack_output["metadata"]

    output_dir: str = metadata["attack_params"]["output_dir"]
    dest_log_roc = os.path.join(output_dir, "log_roc.png")
    _roc_plot(mia_metrics, dest_log_roc)

    pdf = FPDF()
    pdf.add_page()
    pdf.set_xy(0, 0)
    title(pdf, "WorstCase MIA attack result report")

    subtitle(pdf, "Introduction")
    line(pdf, INTRODUCTION)

    subtitle(pdf, "Experiment summary")
    line(
        pdf,
        f"{'sacroml_version':>30s}: {str(metadata['sacroml_version']):30s}",
        font="courier",
    )
    # The parameter listing and the global metrics share one row layout.
    for section, heading in (("attack_params", None), ("global_metrics", "Global metrics")):
        if heading is not None:
            subtitle(pdf, heading)
        for key, value in metadata[section].items():
            line(pdf, f"{key:>30s}: {str(value):30s}", font="courier")

    subtitle(pdf, "Metrics")
    line(
        pdf,
        "The following show summaries of the attack metrics over the repetitions",
        font="arial",
    )
    for metric in DISPLAY_METRICS:
        raw = np.array([repetition[metric] for repetition in mia_metrics])
        if metric not in MAPPINGS:
            vals = raw
        else:
            transform = MAPPINGS[metric]
            vals = np.array([transform(v) for v in raw])
        text = (
            f"{metric:>12} mean = {vals.mean():.2f}, var = {vals.var():.4f}, "
            f"min = {vals.min():.2f}, max = {vals.max():.2f}"
        )
        line(pdf, text, font="courier")

    _add_log_roc_to_page(dest_log_roc, pdf)
    line(pdf, LOGROC_CAPTION)

    pdf.add_page()
    title(pdf, "Glossary")
    _write_dict(pdf, MIA_GLOSSARY)

    # The image has been embedded by now, so the temporary file can go.
    if os.path.exists(dest_log_roc):
        os.remove(dest_log_roc)
    return pdf
[docs]
def create_structural_report(attack_output: dict) -> FPDF:
    """Make a structural attack report.

    Parameters
    ----------
    attack_output : dict
        Dictionary whose ``metadata`` entry provides ``sacroml_version``,
        ``attack_params`` and ``global_metrics``. Each global metric is
        reported as "Risk Detected" when its value is truthy.

    Returns
    -------
    pdf : fpdf.FPDF
        fpdf document object for the report.
    """
    metadata = attack_output["metadata"]
    metrics = metadata["global_metrics"]

    pdf = FPDF()
    pdf.add_page()
    pdf.set_xy(0, 0)
    title(pdf, "Structural Attack Report")

    subtitle(pdf, "Introduction")
    line(pdf, STRUCTURAL_INTRODUCTION)

    subtitle(pdf, "Experiment Summary")
    line(
        pdf,
        f"{'sacroml_version':>30s}: {str(metadata['sacroml_version']):30s}",
        font="courier",
    )
    attack_params = metadata["attack_params"]
    for key, value in attack_params.items():
        line(pdf, f"{key:>30s}: {str(value):30s}", font="courier")

    subtitle(pdf, "Risk Summary")
    for key, value in metrics.items():
        if value:
            risk_status = "Risk Detected"
        else:
            risk_status = "Not Detected"
        line(pdf, f"{key:>30s}: {risk_status:30s}", font="courier")

    pdf.add_page()
    title(pdf, "Glossary")
    _write_dict(pdf, STRUCTURAL_GLOSSARY)
    return pdf
[docs]
def write_pdf(report_dest: str, pdf_report: FPDF) -> None:
    """Create pdf and append contents if it already exists.

    When ``report_dest + ".pdf"`` does not exist, the report is written there
    directly. Otherwise the report is rendered to a temporary
    ``report_dest + "_new.pdf"``, the existing file and the new one are merged
    (existing pages first) back into the existing path, and the temporary file
    is removed. The temporary file and the writer are released even if
    rendering or merging fails.

    Parameters
    ----------
    report_dest : str
        Destination path (without the ``.pdf`` extension).
    pdf_report : FPDF
        Document to write.
    """
    old_pdf = report_dest + ".pdf"
    if not os.path.exists(old_pdf):
        pdf_report.output(old_pdf)
        return

    new_pdf = report_dest + "_new.pdf"
    try:
        pdf_report.output(new_pdf)
        merger = PdfWriter()
        try:
            for pdf in [old_pdf, new_pdf]:
                merger.append(pdf)
            merger.write(old_pdf)
        finally:
            merger.close()
    finally:
        # Never leave the intermediate file behind, whatever happened above.
        if os.path.exists(new_pdf):
            os.remove(new_pdf)
def _add_log_roc_to_page(log_roc: str = None, pdf_obj: FPDF = None) -> None:
    """Add a "Log ROC" page holding the given image to the document.

    Does nothing when ``log_roc`` is None. Otherwise a new page is started,
    the subtitle and the image (height 140) are written, and the font is reset
    to regular 12pt arial.

    Parameters
    ----------
    log_roc : str
        Path of the ROC image to embed, or None to skip.
    pdf_obj : FPDF
        Document to add the page to.
    """
    if log_roc is None:
        return
    pdf_obj.add_page()
    subtitle(pdf_obj, "Log ROC")
    pdf_obj.image(log_roc, x=None, y=None, w=0, h=140, type="", link="")
    # Restore the body font for whatever is written after the figure.
    pdf_obj.set_font("arial", "", 12)
def _plot_lira_individuals(metrics: dict, dest: str) -> None:
    """Save a two-panel scatter plot of per-record LiRA scores.

    The left panel shows the scores of records flagged as members
    (``member == 1``) and the right panel all remaining records; within each
    panel the scores are sorted in ascending order.

    Parameters
    ----------
    metrics : dict
        Metrics dictionary whose ``"individual"`` entry provides
        ``"member_prob"`` and ``"member"``.
    dest : str
        Path the figure is saved to.
    """
    scores = np.array(metrics["individual"]["member_prob"])
    member = np.array(metrics["individual"]["member"])
    _, axes = plt.subplots(1, 2, figsize=(12.4, 4.8))
    is_member = member == 1

    # (panel index, selection mask, marker colour, panel title)
    panels = (
        (0, is_member, "b", "Member Records"),
        (1, ~is_member, "r", "Nonmember Records"),
    )
    for idx, selection, colour, heading in panels:
        probs = scores[selection]
        positions = np.arange(probs.shape[0])
        ordered = probs[np.argsort(probs)]
        axis = axes[idx]
        axis.scatter(positions, ordered, color=colour, s=2, label="LiRA Probability")
        axis.set_title(heading)
        axis.set_xlabel("Record (sorted)")
        axis.legend(loc=0)

    plt.tight_layout()
    plt.savefig(dest)
    plt.close()
[docs]
def create_lr_report(output: dict) -> FPDF:
    """Make a lira membership inference report.

    Temporary plot images are written to the attack's ``output_dir``, embedded
    in the document and removed again before returning.

    Parameters
    ----------
    output : dict
        Dictionary with the following items:

        metadata : dict
            Metadata; ``sacroml_version``, ``attack_params`` (including
            ``output_dir``) and ``global_metrics`` are read.
        attack_experiment_logger : dict
            Holds ``attack_instance_logger``; only its first metrics
            dictionary is reported.

    Returns
    -------
    pdf : fpdf.FPDF
        fpdf document object
    """
    instance_log: dict = output["attack_experiment_logger"]["attack_instance_logger"]
    mia_metrics = list(instance_log.values())[0]
    metadata: dict = output["metadata"]

    path: str = metadata["attack_params"]["output_dir"]
    dest_log_roc = os.path.join(path, "log_roc.png")
    _roc_plot_single(mia_metrics, dest_log_roc)

    pdf = FPDF()
    pdf.add_page()
    pdf.set_xy(0, 0)
    title(pdf, "Likelihood Ratio Attack Report")
    subtitle(pdf, "Introduction")

    subtitle(pdf, "Metadata")
    line(
        pdf,
        f"{'sacroml_version':>30s}: {str(metadata['sacroml_version']):30s}",
        font="courier",
    )
    for section in ("attack_params", "global_metrics"):
        for key, value in metadata[section].items():
            line(pdf, f"{key:>30s}: {str(value):30s}", font="courier")

    subtitle(pdf, "Metrics")
    # Only plain float metrics are listed; arrays and nested blocks are skipped.
    float_metrics = [
        (key, value) for key, value in mia_metrics.items() if isinstance(value, float)
    ]
    for key, value in float_metrics:
        if key in MAPPINGS:
            val = MAPPINGS[key](value)
        else:
            val = value
        line(pdf, f"{key:>30s}: {val:.4f}", font="courier")

    pdf.add_page()
    subtitle(pdf, "ROC Curve")
    pdf.image(dest_log_roc, x=None, y=None, w=0, h=140, type="", link="")

    dest_ind_plot = os.path.join(path, "lira_individual.png")
    if "individual" in mia_metrics:
        _plot_lira_individuals(mia_metrics, dest_ind_plot)
        pdf.add_page()
        subtitle(pdf, "Individual LiRA Scores")
        pdf.image(dest_ind_plot, x=None, y=None, w=180, h=90, type="", link="")

    # clean up
    for leftover in (dest_log_roc, dest_ind_plot):
        if os.path.exists(leftover):
            os.remove(leftover)
    return pdf
[docs]
def create_qmia_report(output: dict) -> FPDF:
    """Make a quantile regression membership inference report.

    A temporary ``log_roc.png`` is written to the attack's ``output_dir``,
    embedded in the document and removed again before returning.

    Parameters
    ----------
    output : dict
        Dictionary with the following items:

        metadata : dict
            Metadata; ``sacroml_version``, ``attack_params`` (including
            ``output_dir``) and ``global_metrics`` are read.
        attack_experiment_logger : dict
            Dictionary containing ``attack_instance_logger``; only its first
            metrics dictionary is reported.

    Returns
    -------
    pdf : fpdf.FPDF
        fpdf document object.
    """
    instance_log: dict = output["attack_experiment_logger"]["attack_instance_logger"]
    mia_metrics = list(instance_log.values())[0]
    metadata = output["metadata"]

    path: str = metadata["attack_params"]["output_dir"]
    dest_log_roc = os.path.join(path, "log_roc.png")
    _roc_plot_single(mia_metrics, dest_log_roc)

    pdf = FPDF()
    pdf.add_page()
    pdf.set_xy(0, 0)
    title(pdf, "Quantile Regression Attack Report")
    subtitle(pdf, "Introduction")
    line(pdf, QMIA_INTRODUCTION)

    subtitle(pdf, "Metadata")
    line(
        pdf,
        f"{'sacroml_version':>30s}: {str(metadata['sacroml_version']):30s}",
        font="courier",
    )
    for section in ("attack_params", "global_metrics"):
        for key, value in metadata[section].items():
            line(pdf, f"{key:>30s}: {str(value):30s}", font="courier")

    subtitle(pdf, "Metrics")
    # Only plain float metrics are listed; arrays and nested blocks are skipped.
    float_metrics = [
        (key, value) for key, value in mia_metrics.items() if isinstance(value, float)
    ]
    for key, value in float_metrics:
        if key in MAPPINGS:
            val = MAPPINGS[key](value)
        else:
            val = value
        line(pdf, f"{key:>30s}: {val:.4f}", font="courier")

    pdf.add_page()
    subtitle(pdf, "ROC Curve")
    pdf.image(dest_log_roc, x=None, y=None, w=0, h=140, type="", link="")

    # clean up
    if os.path.exists(dest_log_roc):
        os.remove(dest_log_roc)
    return pdf
def _draw_n_vulnerable_histogram(n_vulnerable: list, output_dir: str) -> str:
    """Draw a histogram of how many attacks flagged each record.

    One unit-width bin is used per count from 0 up to the largest value in
    ``n_vulnerable`` (0 when the list is empty). ``output_dir`` is created if
    it does not exist.

    Parameters
    ----------
    n_vulnerable : list
        Per-record count of attacks that flagged each record.
    output_dir : str
        Directory in which to save the temporary PNG.

    Returns
    -------
    str
        Path to the saved PNG.
    """
    os.makedirs(output_dir, exist_ok=True)
    dest = os.path.join(output_dir, "_meta_n_vulnerable.png")

    max_n = 0
    if n_vulnerable:
        max_n = max(n_vulnerable)
    # One extra edge so the largest count gets a bin of its own.
    edges = list(range(max_n + 2))
    ticks = list(range(max_n + 1))

    fig, ax = plt.subplots(figsize=(6, 4))
    ax.hist(n_vulnerable, bins=edges, color="#2e5cb8", edgecolor="white", align="left")
    ax.set_xlabel("Number of attacks flagging the record")
    ax.set_ylabel("Number of records")
    ax.set_xticks(ticks)
    plt.tight_layout()
    fig.savefig(dest)
    plt.close(fig)
    return dest