Skip to content

Metrics

apply_par(performance, budget, par_factor=10.0)

Apply PAR-k (Penalized Average Runtime) transformation to performance data.

This function replaces timeout values (values > budget) with budget * par_factor. This is crucial for algorithm selection because raw timeout values (e.g., 1200.999) look almost identical to near-timeout solves (e.g., 1199), but in practice timeouts should be heavily penalized.

Parameters:

Name Type Description Default
performance DataFrame | ndarray

Performance data where each value represents the runtime of an algorithm on an instance. Values greater than the budget indicate timeouts.

required
budget float

The algorithm cutoff time. Values exceeding this are considered timeouts.

required
par_factor float

The penalization factor. Timeouts will be replaced with budget * par_factor. Defaults to 10.0 (PAR10).

10.0

Returns:

Type Description
DataFrame | ndarray

pd.DataFrame | np.ndarray: Performance data with timeouts penalized. Returns the same type as the input.

Examples:

>>> import pandas as pd
>>> perf = pd.DataFrame({'algo1': [100, 1201, 500], 'algo2': [200, 200, 1201]})
>>> apply_par(perf, budget=1200, par_factor=10)
   algo1   algo2
0    100     200
1  12000     200
2    500   12000
Source code in asf/metrics/par10.py
def apply_par(
    performance: pd.DataFrame | np.ndarray,
    budget: float,
    par_factor: float = 10.0,
) -> pd.DataFrame | np.ndarray:
    """
    Apply PAR-k (Penalized Average Runtime) transformation to performance data.

    This function replaces timeout values (values > budget) with budget * par_factor.
    This is crucial for algorithm selection because raw timeout values (e.g., 1200.999)
    look almost identical to near-timeout solves (e.g., 1199), but in practice
    timeouts should be heavily penalized.

    Args:
        performance (pd.DataFrame | np.ndarray): Performance data where each value
            represents the runtime of an algorithm on an instance. Values greater
            than the budget indicate timeouts.
        budget (float): The algorithm cutoff time. Values exceeding this are considered
            timeouts.
        par_factor (float, optional): The penalization factor. Timeouts will be
            replaced with budget * par_factor. Defaults to 10.0 (PAR10).

    Returns:
        pd.DataFrame | np.ndarray: Performance data with timeouts penalized.
            Returns the same type as the input.

    Examples:
        >>> import pandas as pd
        >>> perf = pd.DataFrame({'algo1': [100, 1201, 500], 'algo2': [200, 200, 1201]})
        >>> apply_par(perf, budget=1200, par_factor=10)
           algo1   algo2
        0    100     200
        1  12000     200
        2    500   12000
    """
    if isinstance(performance, pd.DataFrame):
        result = performance.copy()
        result = result.where(result <= budget, budget * par_factor)
        return result
    else:
        return np.where(performance <= budget, performance, budget * par_factor)

apply_par10(performance, budget)

Apply PAR10 (Penalized Average Runtime with factor 10) transformation.

Convenience function that calls apply_par with par_factor=10.

Parameters:

Name Type Description Default
performance DataFrame | ndarray

Performance data.

required
budget float

The algorithm cutoff time.

required

Returns:

Type Description
DataFrame | ndarray

pd.DataFrame | np.ndarray: Performance data with timeouts penalized by 10x.

See Also

apply_par: The general PAR-k transformation function.

Source code in asf/metrics/par10.py
def apply_par10(
    performance: pd.DataFrame | np.ndarray,
    budget: float,
) -> pd.DataFrame | np.ndarray:
    """
    Apply PAR10 (Penalized Average Runtime with factor 10) transformation.

    Convenience function that calls apply_par with par_factor=10.

    Args:
        performance (pd.DataFrame | np.ndarray): Performance data.
        budget (float): The algorithm cutoff time.

    Returns:
        pd.DataFrame | np.ndarray: Performance data with timeouts penalized by 10x.

    See Also:
        apply_par: The general PAR-k transformation function.
    """
    return apply_par(performance, budget, par_factor=10.0)

compute_solve_rate(schedules, performance, budget)

Compute the solve rate for selector predictions.

For each instance in the schedules, determines if it was solved within budget. An instance is solved if at least one algorithm in the schedule completes within its allocated time.

Parameters

schedules : dict[str, list[tuple[str, float] | str]] Selector predictions mapping instance_id to schedule/selections. performance : pd.DataFrame Performance data for the algorithms. budget : float The time budget for solving.

Returns

float Solve rate (fraction of instances solved within budget, 0-1).

Source code in asf/metrics/baselines.py
def compute_solve_rate(
    schedules: dict[str, list[tuple[str, float] | str]],
    performance: pd.DataFrame,
    budget: float,
) -> float:
    """
    Compute the solve rate for selector predictions.

    For each instance in the schedules, determines if it was solved within budget.
    An instance is solved if at least one algorithm in the schedule completes within
    its allocated time.

    Parameters
    ----------
    schedules : dict[str, list[tuple[str, float] | str]]
        Selector predictions mapping instance_id to schedule/selections.
    performance : pd.DataFrame
        Performance data for the algorithms.
    budget : float
        The time budget for solving.

    Returns
    -------
    float
        Solve rate (fraction of instances solved within budget, 0-1).
    """
    # Get per-instance times using the performance metrics
    times_dict: dict[str, float] | float = running_time_selector_performance(
        schedules, performance, budget=budget, par=10.0, return_per_instance=True
    )

    if not isinstance(times_dict, dict):
        return 0.0

    # Count instances solved within budget (where time is not penalized)
    solved_count = sum(1 for time in times_dict.values() if time <= budget)
    total_count = len(times_dict)

    return float(solved_count / total_count) if total_count > 0 else 0.0

running_time_closed_gap(schedules, performance, budget, feature_time, par=10.0, feature_groups=None)

Calculates the closed gap metric for a given selector.

Parameters

schedules : dict[str, list[tuple[str, float] | str]] The schedules to evaluate. performance : pd.DataFrame The performance data for the algorithms. budget : float The budget for the scenario. feature_time : pd.DataFrame The feature time data for each instance. par : float, default=10.0 The penalization factor for unsolved instances. feature_groups : dict[str, Any] or None, default=None Feature group definitions including prerequisite information.

Returns

float The closed gap value, representing the improvement over the single best solver.

Source code in asf/metrics/baselines.py
def running_time_closed_gap(
    schedules: dict[str, list[tuple[str, float] | str]],
    performance: pd.DataFrame,
    budget: float,
    feature_time: pd.DataFrame,
    par: float = 10.0,
    feature_groups: dict[str, Any] | None = None,
) -> float:
    """
    Calculates the closed gap metric for a given selector.

    Parameters
    ----------
    schedules : dict[str, list[tuple[str, float] | str]]
        The schedules to evaluate.
    performance : pd.DataFrame
        The performance data for the algorithms.
    budget : float
        The budget for the scenario.
    feature_time : pd.DataFrame
        The feature time data for each instance.
    par : float, default=10.0
        The penalization factor for unsolved instances.
    feature_groups : dict[str, Any] or None, default=None
        Feature group definitions including prerequisite information.

    Returns
    -------
    float
        The closed gap value, representing the improvement over the single best solver.
    """
    # Validate feature group prerequisites if feature_groups is provided
    if feature_groups is not None:
        _validate_schedule_prerequisites(schedules, feature_groups)

    sbs_val = single_best_solver(performance, False, budget, par)
    vbs_val = virtual_best_solver(performance, False, budget, par)
    s_val = running_time_selector_performance(
        schedules, performance, budget, feature_time, par
    )

    if isinstance(s_val, dict):
        s_val = float(sum(s_val.values()))

    denominator = sbs_val - vbs_val
    if abs(denominator) < 1e-9:
        return 0.0

    return (sbs_val - s_val) / denominator

running_time_selector_performance(schedules, performance, budget=5000.0, feature_time=None, par=10.0, return_per_instance=False)

Calculates the total running time for a selector based on the given schedules and performance data.

The schedule can contain both feature groups (strings) and algorithm selections (tuples). Feature groups are evaluated in order, and their computation time is only added if the instance is not yet solved when the feature group appears in the schedule.

Parameters

schedules : dict[str, list[tuple[str, float] | str]] The schedules to evaluate, where each key is an instance and the value is a list of items. Each item can be: - A string: the name of a feature group to compute (uses full actual time) - A tuple (feature_group, budget): a feature group with a time budget - A tuple (algorithm, budget): an algorithm to run with its allocated budget performance : pd.DataFrame The performance data for the algorithms. budget : float, default=5000.0 The budget for the scenario. feature_time : pd.DataFrame or None, default=None The feature time data for each instance. Columns should be feature group names. par : float, default=10.0 The penalization factor for unsolved instances. return_per_instance : bool, default=False If True, return a dict mapping instance to running time. If False, return the sum of all running times.

Returns

dict[str, float] or float If return_per_instance is True, returns a dictionary mapping each instance to its total running time. Otherwise, returns the sum of all running times.

Raises

ValueError If the schedule is invalid (e.g., total allocated time to algorithms is zero).

Source code in asf/metrics/baselines.py
def running_time_selector_performance(
    schedules: dict[str, list[tuple[str, float] | str]],
    performance: pd.DataFrame,
    budget: float = 5000.0,
    feature_time: pd.DataFrame | None = None,
    par: float = 10.0,
    return_per_instance: bool = False,
) -> dict[str, float] | float:
    """
    Calculates the total running time for a selector based on the given schedules and performance data.

    The schedule can contain both feature groups (strings) and algorithm selections (tuples).
    Feature groups are evaluated in order, and their computation time is only added if the
    instance is not yet solved when the feature group appears in the schedule.

    Parameters
    ----------
    schedules : dict[str, list[tuple[str, float] | str]]
        The schedules to evaluate, where each key is an instance and the value is a list of items.
        Each item can be:
        - A string: the name of a feature group to compute (uses full actual time)
        - A tuple (feature_group, budget): a feature group with a time budget
        - A tuple (algorithm, budget): an algorithm to run with its allocated budget
    performance : pd.DataFrame
        The performance data for the algorithms.
    budget : float, default=5000.0
        The budget for the scenario.
    feature_time : pd.DataFrame or None, default=None
        The feature time data for each instance. Columns should be feature group names.
    par : float, default=10.0
        The penalization factor for unsolved instances.
    return_per_instance : bool, default=False
        If True, return a dict mapping instance to running time.
        If False, return the sum of all running times.

    Returns
    -------
    dict[str, float] or float
        If return_per_instance is True, returns a dictionary mapping each instance
        to its total running time. Otherwise, returns the sum of all running times.

    Raises
    ------
    ValueError
        If the schedule is invalid (e.g., total allocated time to algorithms is zero).
    """
    if feature_time is None:
        feature_time = pd.DataFrame(
            0.0,
            index=performance.index,
            columns=["feature_time"],
        )

    total_time: dict[str, float] = {}
    for instance, schedule in schedules.items():
        instance_feature_time = 0.0
        algorithm_items = []  # List of (algorithm, budget) tuples
        saw_feature_group = False

        # Process schedule items
        for item in schedule:
            if isinstance(item, str):
                # Feature group without budget: add its full computation time if available
                if item in feature_time.columns:
                    saw_feature_group = True
                    ft_val = feature_time.loc[instance, item]
                    if hasattr(ft_val, "item"):
                        ft_val = ft_val.item()
                    instance_feature_time += (
                        0.0
                        if (
                            ft_val is None
                            or (isinstance(ft_val, float) and np.isnan(ft_val))
                        )
                        else float(ft_val)
                    )
            elif isinstance(item, tuple) and len(item) == 2:
                item_name, item_budget = item
                if item_name in feature_time.columns:
                    saw_feature_group = True
                    # Feature group with budget: use min(actual_time, budget)
                    ft_val = feature_time.loc[instance, item_name]
                    if hasattr(ft_val, "item"):
                        ft_val = ft_val.item()
                    actual_ft = (
                        0.0
                        if (
                            ft_val is None
                            or (isinstance(ft_val, float) and np.isnan(ft_val))
                        )
                        else float(ft_val)
                    )
                    instance_feature_time += min(actual_ft, item_budget or 0.0)
                else:
                    # Algorithm selection
                    algorithm_items.append(
                        (item_name, item_budget if item_budget is not None else 0.0)
                    )

        if not saw_feature_group:
            instance_feature_time = float(feature_time.loc[instance].sum())

        # Calculate total algorithm time used
        total_algorithm_time = sum(alloc_budget for _, alloc_budget in algorithm_items)

        # Validate: at least some algorithm time was allocated
        if total_algorithm_time <= 0.0:
            raise ValueError(
                f"Instance {instance}: No algorithm time allocated in schedule {schedule}. "
            )

        # Check if this is a parallel portfolio (all algorithms get the same budget)
        # or sequential (budgets may vary)
        budgets = [alloc_budget for _, alloc_budget in algorithm_items]
        is_parallel = (
            len(set(budgets)) == 1 and len(algorithm_items) > 1 and budgets[0] >= budget
        )

        if is_parallel:
            # Parallel portfolio: each algorithm runs for its budget concurrently
            # Overall time is the minimum time needed to solve
            times = []
            solved = False
            for algorithm, allocated_budget in algorithm_items:
                if algorithm in performance.columns:
                    algo_perf = performance.loc[instance, algorithm]
                    if algo_perf <= allocated_budget:
                        times.append(algo_perf)
                        solved = True

            if solved:
                total_time[instance] = min(times) + instance_feature_time
            else:
                total_time[instance] = budget * par
        else:
            # Sequential: algorithms run one after another until one solves
            cumulative_time = instance_feature_time
            solved = False
            for algorithm, allocated_budget in algorithm_items:
                if solved:
                    break
                if algorithm in performance.columns:
                    algo_perf = performance.loc[instance, algorithm]
                    if algo_perf <= allocated_budget:
                        cumulative_time += algo_perf
                        solved = True
                    else:
                        cumulative_time += allocated_budget

            if solved:
                total_time[instance] = cumulative_time
            else:
                total_time[instance] = budget * par

    if return_per_instance:
        return total_time

    return float(sum(total_time.values()))

single_best_solver(performance, maximize=False, budget=5000.0, par=10.0, batch=False)

Selects the single best solver across all instances based on the aggregated performance.

Parameters

performance : pd.DataFrame The performance data for the algorithms. maximize : bool, default=False Whether to maximize or minimize the performance. budget : float or None, default=5000.0 The runtime budget. If provided with par, timeouts are penalized. par : float or None, default=10.0 The penalization factor for timeouts. batch : bool, default=False If True, return one score per algorithm.

Returns

float or pd.Series The best aggregated performance value across all instances. In batch mode, returns each algorithm's aggregated performance.

Source code in asf/metrics/baselines.py
def single_best_solver(
    performance: pd.DataFrame,
    maximize: bool = False,
    budget: float | None = 5000.0,
    par: float | None = 10.0,
    batch: bool = False,
) -> float | pd.Series:
    """
    Selects the single best solver across all instances based on the aggregated performance.

    Parameters
    ----------
    performance : pd.DataFrame
        The performance data for the algorithms.
    maximize : bool, default=False
        Whether to maximize or minimize the performance.
    budget : float or None, default=5000.0
        The runtime budget. If provided with par, timeouts are penalized.
    par : float or None, default=10.0
        The penalization factor for timeouts.
    batch : bool, default=False
        If True, return one score per algorithm.

    Returns
    -------
    float or pd.Series
        The best aggregated performance value across all instances. In batch
        mode, returns each algorithm's aggregated performance.
    """
    if budget is not None and par is not None:
        performance_vals = np.where(performance <= budget, performance, budget * par)
    else:
        performance_vals = performance.values

    perf_sum = np.sum(performance_vals, axis=0)
    if batch:
        return pd.Series(perf_sum, index=performance.columns)

    if maximize:
        return float(np.max(perf_sum))
    else:
        return float(np.min(perf_sum))

virtual_best_solver(performance, maximize=False, budget=5000.0, par=10.0, batch=False)

Selects the virtual best solver for each instance by choosing the best performance per instance.

Parameters

performance : pd.DataFrame The performance data for the algorithms. maximize : bool, default=False Whether to maximize or minimize the performance. budget : float or None, default=5000.0 The runtime budget. If provided with par, timeouts are penalized. par : float or None, default=10.0 The penalization factor for timeouts. batch : bool, default=False If True, return one score per algorithm.

Returns

float or pd.Series The sum of the best performance values for each instance. In batch mode, returns each algorithm's aggregated performance.

Source code in asf/metrics/baselines.py
def virtual_best_solver(
    performance: pd.DataFrame,
    maximize: bool = False,
    budget: float | None = 5000.0,
    par: float | None = 10.0,
    batch: bool = False,
) -> float | pd.Series:
    """
    Selects the virtual best solver for each instance by choosing the best performance per instance.

    Parameters
    ----------
    performance : pd.DataFrame
        The performance data for the algorithms.
    maximize : bool, default=False
        Whether to maximize or minimize the performance.
    budget : float or None, default=5000.0
        The runtime budget. If provided with par, timeouts are penalized.
    par : float or None, default=10.0
        The penalization factor for timeouts.
    batch : bool, default=False
        If True, return one score per algorithm.

    Returns
    -------
    float or pd.Series
        The sum of the best performance values for each instance. In batch
        mode, returns each algorithm's aggregated performance.
    """
    if budget is not None and par is not None:
        performance_vals = np.where(performance <= budget, performance, budget * par)
    else:
        performance_vals = performance.values

    if batch:
        return pd.Series(np.sum(performance_vals, axis=0), index=performance.columns)

    if maximize:
        return float(np.max(performance_vals, axis=1).sum())
    else:
        return float(np.min(performance_vals, axis=1).sum())