Source code for modularml.core.experiment.results.eval_results

"""Results container for evaluation phases."""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Literal

from modularml.core.data.batch import Batch
from modularml.core.experiment.results.phase_results import PhaseResults

if TYPE_CHECKING:
    from modularml.core.data.featureset_view import FeatureSetView
    from modularml.core.references.execution_reference import TensorLike
    from modularml.core.topology.graph_node import GraphNode
    from modularml.utils.data.data_format import DataFormat


@dataclass
class EvalResults(PhaseResults):
    """
    Results container for one forward-pass evaluation phase.

    Description:
        EvalResults wraps the outputs of an EvalPhase, which executes a
        single epoch (epoch=0) over multiple batches. On top of the query
        interface inherited from PhaseResults it offers shortcuts for:

        - Automatic tensor stacking across batches
        - Loss aggregation (sum/mean) over batches

        Batch aggregation of tensors and losses is delegated to
        AxisSeries.collapse().

    Attributes:
        label (str): Phase label inherited from :class:`PhaseResults`.

    Example:
        Accessing EvalPhase results

        >>> # Run evaluation
        >>> eval_results = experiment.run_evaluation(phase=eval_phase)  # doctest: +SKIP
        >>> # Get stacked outputs for a node (all batches concatenated)
        >>> outputs = eval_results.stacked_tensors(  # doctest: +SKIP
        ...     node="output_node", domain="outputs"
        ... )
        >>> # Get total loss across all batches
        >>> total_loss = eval_results.aggregated_losses(  # doctest: +SKIP
        ...     node="output_node"
        ... )
        >>> # Get all source data utilized in evaluation
        >>> source_view = eval_results.source_view(node="output_node")  # doctest: +SKIP

    """

    # ================================================
    # Representation
    # ================================================
    def __repr__(self):
        # Only query the batch axis when `_execution` is truthy; otherwise
        # report zero batches without touching the execution contexts.
        count = 0
        if self._execution:
            count = self.n_batches
        return f"EvalResults(label='{self.label}', batches={count})"

    # ================================================
    # Properties
    # ================================================
    @property
    def batch_indices(self) -> list[int]:
        """
        Recorded batch indices, sorted ascending.

        Returns:
            list[int]: Values of the "batch" axis of the execution contexts,
                cast to ``int`` and sorted in ascending order.

        """
        raw_values = self.execution_contexts().axis_values("batch")
        indices = [int(value) for value in raw_values]
        indices.sort()
        return indices

    @property
    def n_batches(self) -> int:
        """
        The number of batches executed during evaluation.

        Returns:
            int: Length of :attr:`batch_indices`.

        """
        return len(self.batch_indices)

    # ================================================
    # Execution Data & Loss Querying
    # ================================================
    def stacked_tensors(
        self,
        node: str | GraphNode,
        domain: Literal["outputs", "targets", "tags", "sample_uuids"],
        *,
        role: str = "default",
        fmt: DataFormat | None = None,
        unscale: bool = False,
    ) -> TensorLike:
        """
        Retrieve tensors for a node, concatenated across all batches.

        Description:
            Collects tensors from the specified domain across all evaluation
            batches and concatenates them along the batch dimension using
            backend-aware concatenation (torch.cat, np.concatenate, or
            tf.concat).

            This is the primary method for retrieving complete evaluation
            outputs or targets in a single tensor.

        Args:
            node (str | GraphNode):
                The node to retrieve tensors for. Can be the node instance,
                its ID, or its label.
            domain (Literal["outputs", "targets", "tags", "sample_uuids"]):
                The domain of data to return:

                - outputs: the tensors produced by the node forward pass
                - targets: the expected output tensors (only for tail nodes)
                - tags: any tracked tags during the node's forward pass
                - sample_uuids: the sample identifiers
            role (str, optional):
                If multi-role data, specifies which role to return.
                Defaults to "default".
            fmt (DataFormat | None, optional):
                Format to cast returned tensors to. If None, uses the
                as-produced format. Defaults to None.
            unscale (bool, optional):
                Whether to inverse any applied scalers. Only valid for tail
                nodes with domain in ["outputs", "targets"].
                Defaults to False.

        Returns:
            TensorLike: A single tensor containing concatenated data from
                all batches.

        Example:
            Accessing all predictions across all eval batches:

            >>> # Get all predictions stacked
            >>> predictions = eval_results.stacked_tensors(  # doctest: +SKIP
            ...     node="output_node",
            ...     domain="outputs",
            ... )
            >>> # Get targets, unscaled, as numpy
            >>> targets = eval_results.stacked_tensors(  # doctest: +SKIP
            ...     node="output_node",
            ...     domain="targets",
            ...     fmt="np",
            ...     unscale=True,
            ... )

        """
        # Concatenate over the batch axis; `one()` then pulls out the single
        # remaining entry (an eval phase only records epoch=0).
        return (
            self.tensors(
                node=node,
                domain=domain,
                role=role,
                fmt=fmt,
                unscale=unscale,
            )
            .collapse(axis="batch", reducer="concat")
            .one()
        )

    def stacked_batches(
        self,
        node: str | GraphNode,
        *,
        fmt: DataFormat | None = None,
    ) -> Batch:
        """
        Retrieve all batches for a node, concatenated into a single Batch.

        Description:
            Collects Batch objects from all evaluation batches and
            concatenates them using Batch.concat(). This provides access to
            all data domains (outputs, targets, tags, sample_uuids) plus role
            weights and masks in a single container.

        Args:
            node (str | GraphNode):
                The node to retrieve batches for.
            fmt (DataFormat | None, optional):
                Format to cast tensor data to. When given, every batch is
                converted with ``to_format`` before concatenation.
                Defaults to None.

        Returns:
            Batch: A single Batch containing concatenated data from all
                batches.

        Example:
            Getting concatenated batches across all execution batches

            >>> batch = eval_results.stacked_batches(  # doctest: +SKIP
            ...     node="output_node"
            ... )
            >>> print(f"Total samples: {batch.batch_size}")  # doctest: +SKIP
            >>> print(f"Outputs shape: {batch.outputs.shape}")  # doctest: +SKIP

        """
        recorded = list(self.batches(node=node).values())

        # No target format requested: concatenate the batches as recorded.
        if fmt is None:
            return Batch.concat(*recorded, fmt=fmt)

        converted = [item.to_format(fmt) for item in recorded]
        return Batch.concat(*converted, fmt=fmt)

    def aggregated_losses(
        self,
        node: str | GraphNode,
        *,
        reducer: Literal["mean", "sum"] = "mean",
    ) -> dict[str, float]:
        """
        Aggregate losses over all batches within this eval phase.

        Args:
            node (str | GraphNode):
                The node to filter losses to. Can be the node instance, its
                ID, or its label.
            reducer (Literal['mean', 'sum']):
                How losses should be aggregated over batches.
                Defaults to "mean".

        Returns:
            dict[str, float]: Aggregated losses, keyed by the AppliedLoss
                label. Each value is the ``auxiliary`` attribute of the
                collapsed loss record for that label.

        Raises:
            ValueError: If the loss series does not hold exactly one epoch.
            RuntimeError: If anything other than the single "label" axis
                remains after collapsing the epoch and batch axes.

        """
        loss_series = self.losses(node=node)

        # Ensure only one epoch
        if loss_series.shape["epoch"] != 1:
            msg = "EvalResults contain more than one epoch. Cannot collapse losses."
            raise ValueError(msg)
        single_epoch = loss_series.collapse(axis="epoch", reducer="first")

        # Aggregate over batches
        per_label = single_epoch.collapse(axis="batch", reducer=reducer)

        # Ensure only "label" axis remains
        only_label_left = len(per_label.axes) == 1 and per_label.axes[0] == "label"
        if not only_label_left:
            msg = (
                "Failed to collapse losses. Expected only a remaining axis of "
                f"'label'. Got: {per_label.axes}."
            )
            raise RuntimeError(msg)

        return {name: record.auxiliary for name, record in per_label.items()}

    # ================================================
    # Source Data Access
    # ================================================
    def source_views(
        self,
        node: str | GraphNode,
        *,
        role: str = "default",
        batch: int | None = None,
    ) -> dict[str, FeatureSetView]:
        """
        Get the source FeatureSetViews that contributed data to the given node.

        Description:
            Traces the node back to its upstream FeatureSets, collects all
            unique sample UUIDs from execution results, and returns a view of
            each upstream FeatureSet filtered to only the samples used.

            Note that the returned views contain only unique sample UUIDs
            used in generating these phase results. They are not a 1-to-1
            mapping of result sample to source sample. Use `tensors()` to get
            exact execution data.

        Args:
            node (str | GraphNode):
                The node to trace upstream from. Can be the node instance,
                its ID, or its label.
            role (str, optional):
                Restrict to samples from this role only.
                Defaults to "default".
            batch (int | None, optional):
                Restrict to samples from this batch only.

        Returns:
            dict[str, FeatureSetView]: A mapping of FeatureSet label to
                FeatureSetView containing only the samples used during
                execution.

        """
        # This override exposes no epoch argument and always forwards
        # `epoch=None` to the parent implementation.
        return super().source_views(
            node=node,
            role=role,
            epoch=None,
            batch=batch,
        )

    def source_view(
        self,
        node: str | GraphNode,
        *,
        role: str = "default",
        batch: int | None = None,
    ) -> FeatureSetView:
        """
        Get the single source FeatureSetView for the given node.

        Description:
            Convenience method for the common case where a node has exactly
            one upstream FeatureSet.

            Note that the returned view contains only unique sample UUIDs
            used in generating these phase results. It is not a 1-to-1
            mapping of result sample to source sample. Use `tensors()` to get
            exact execution data.

        Args:
            node (str | GraphNode):
                The node to trace upstream from.
            role (str, optional):
                Restrict to samples from this role only.
                Defaults to "default".
            batch (int | None, optional):
                Restrict to samples from this batch only.

        Returns:
            FeatureSetView: A view of the single upstream FeatureSet filtered
                to only the samples used during execution.

        Raises:
            ValueError: If :meth:`source_views` does not return exactly one
                upstream FeatureSet for the node.

        """
        found = self.source_views(node=node, role=role, batch=batch)
        if len(found) == 1:
            return next(iter(found.values()))

        msg = (
            f"Node has {len(found)} upstream FeatureSets: "
            f"{list(found.keys())}. Use source_views() instead."
        )
        raise ValueError(msg)