Advanced Topics

This section covers advanced features and techniques for sophisticated use cases.

Cost-Aware Threshold Optimization

Beyond Standard Metrics

While metrics like F1 score are useful, real-world applications often have specific costs associated with different types of errors:

from optimal_cutoffs import get_optimal_threshold
import numpy as np

# Medical screening example
y_true = np.array([0, 0, 1, 1, 0, 1, 1, 0])
y_prob = np.array([0.1, 0.3, 0.7, 0.9, 0.2, 0.8, 0.6, 0.4])

# Define utility matrix (benefits positive, costs negative)
medical_costs = {
    "tp": 1000,    # Benefit: Early detection saves $1000 in treatment
    "tn": 0,       # No additional cost for correct negative
    "fp": -200,    # Cost: Unnecessary procedure costs $200
    "fn": -5000    # Cost: Missed diagnosis costs $5000 in delayed treatment
}

# Optimize for expected utility
threshold_cost = get_optimal_threshold(
    y_true, y_prob,
    utility=medical_costs
)

print(f"Cost-optimized threshold: {threshold_cost:.3f}")

Bayes-Optimal Thresholds

For well-calibrated probabilities, you can calculate the theoretical optimum without training data:

from optimal_cutoffs import bayes_threshold_from_utility, bayes_threshold_from_costs

# Method 1: Direct utility specification
threshold_bayes = bayes_threshold_from_utility(
    U_tp=1000, U_tn=0, U_fp=-200, U_fn=-5000
)

# Method 2: Cost/benefit specification (cleaner interface)
threshold_bayes2 = bayes_threshold_from_costs(
    fp_cost=200, fn_cost=5000, tp_benefit=1000
)

print(f"Bayes optimal threshold: {threshold_bayes:.3f}")
print(f"Equivalent calculation: {threshold_bayes2:.3f}")

# Use Bayes threshold directly (no training labels needed)
threshold_empirical = get_optimal_threshold(
    None, y_prob,  # None for true labels
    utility=medical_costs,
    bayes=True
)

The Bayes-optimal threshold is: t* = (U_tn - U_fp) / [(U_tn - U_fp) + (U_tp - U_fn)]

Probability Calibration

Threshold optimization assumes well-calibrated probabilities. If your model’s probabilities are poorly calibrated, consider calibration first:

from sklearn.calibration import CalibratedClassifierCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification

# Generate data
X, y = make_classification(n_samples=2000, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Train base classifier
base_clf = RandomForestClassifier(n_estimators=50, random_state=42)

# Apply calibration
calibrated_clf = CalibratedClassifierCV(base_clf, method='isotonic', cv=5)
calibrated_clf.fit(X_train, y_train)

# Compare calibrated vs uncalibrated probabilities
base_clf.fit(X_train, y_train)

y_prob_uncalibrated = base_clf.predict_proba(X_train)[:, 1]
y_prob_calibrated = calibrated_clf.predict_proba(X_train)[:, 1]

# Optimize thresholds for both
threshold_uncal = get_optimal_threshold(y_train, y_prob_uncalibrated, metric='f1')
threshold_cal = get_optimal_threshold(y_train, y_prob_calibrated, metric='f1')

print(f"Uncalibrated threshold: {threshold_uncal:.3f}")
print(f"Calibrated threshold: {threshold_cal:.3f}")

Advanced Multiclass Strategies

Coordinate Ascent Optimization

For multiclass problems requiring single-label consistency (exactly one prediction per sample):

# Standard One-vs-Rest (default)
thresholds_ovr = get_optimal_threshold(y_true, y_prob, metric='f1', method='auto')

# Coordinate ascent for single-label consistency
thresholds_coord = get_optimal_threshold(y_true, y_prob, metric='f1', method='coord_ascent')

print(f"OvR thresholds: {thresholds_ovr}")
print(f"Coordinate ascent thresholds: {thresholds_coord}")

The coordinate ascent method couples classes through joint assignment using argmax(p_j - tau_j), ensuring exactly one prediction per sample.

Custom Multiclass Costs

For different costs across classes (planned feature):

# Current workaround: Optimize each class separately
def optimize_multiclass_with_costs(y_true, y_prob, class_costs):
    """Optimize multiclass thresholds with different costs per class."""
    n_classes = y_prob.shape[1]
    thresholds = []

    for class_idx in range(n_classes):
        # Convert to binary problem
        y_binary = (y_true == class_idx).astype(int)
        y_prob_binary = y_prob[:, class_idx]

        # Get costs for this class
        costs = class_costs.get(class_idx, {"fp": -1.0, "fn": -1.0})

        threshold = get_optimal_threshold(
            y_binary, y_prob_binary,
            utility=costs
        )
        thresholds.append(threshold)

    return np.array(thresholds)

# Example usage
class_specific_costs = {
    0: {"fp": -1.0, "fn": -2.0},    # Class 0: FN twice as costly
    1: {"fp": -5.0, "fn": -1.0},    # Class 1: FP five times as costly
    2: {"fp": -1.0, "fn": -10.0},   # Class 2: FN ten times as costly
}

# custom_thresholds = optimize_multiclass_with_costs(y_true, y_prob, class_specific_costs)

Performance Optimization

Algorithm Selection

Understanding when to use each optimization method:

import time

def benchmark_methods(y_true, y_prob, methods=['auto', 'sort_scan', 'smart_brute', 'minimize']):
    """Benchmark different optimization methods."""
    results = {}

    for method in methods:
        start_time = time.time()
        try:
            threshold = get_optimal_threshold(y_true, y_prob, metric='f1', method=method)
            elapsed = time.time() - start_time
            results[method] = {'threshold': threshold, 'time': elapsed}
        except Exception as e:
            results[method] = {'threshold': None, 'time': None, 'error': str(e)}

    return results

# Test with different data sizes
for size in [1000, 5000, 10000]:
    y = np.random.randint(0, 2, size)
    y_prob = np.random.uniform(0, 1, size)

    results = benchmark_methods(y, y_prob)
    print(f"\nDataset size: {size}")
    for method, result in results.items():
        if result.get('time'):
            print(f"{method:>12}: {result['time']:.4f}s, threshold={result['threshold']:.3f}")
        else:
            print(f"{method:>12}: FAILED ({result.get('error', 'Unknown error')})")

Memory-Efficient Processing

For very large datasets:

def chunked_optimization(y_true, y_prob, chunk_size=10000, method='auto'):
    """Process large datasets in chunks."""
    n_samples = len(y_true)
    chunk_thresholds = []

    for start_idx in range(0, n_samples, chunk_size):
        end_idx = min(start_idx + chunk_size, n_samples)

        chunk_true = y_true[start_idx:end_idx]
        chunk_prob = y_prob[start_idx:end_idx]

        threshold = get_optimal_threshold(
            chunk_true, chunk_prob,
            metric='f1',
            method=method
        )
        chunk_thresholds.append(threshold)

    # Combine results (various strategies possible)
    final_threshold = np.median(chunk_thresholds)  # Robust to outliers
    return final_threshold, chunk_thresholds

# Example with large synthetic dataset
large_y = np.random.randint(0, 2, 100000)
large_prob = np.random.uniform(0, 1, 100000)

threshold, chunk_results = chunked_optimization(large_y, large_prob)
print(f"Final threshold: {threshold:.3f}")
print(f"Chunk variation: {np.std(chunk_results):.3f}")

Custom Metrics and Vectorization

Advanced Metric Registration

from optimal_cutoffs.metrics import register_metric
import numpy as np

# Define a complex custom metric
def balanced_accuracy(tp, tn, fp, fn):
    """Balanced accuracy: average of sensitivity and specificity."""
    sensitivity = tp / (tp + fn) if tp + fn > 0 else 0.0
    specificity = tn / (tn + fp) if tn + fp > 0 else 0.0
    return (sensitivity + specificity) / 2

# Vectorized version for O(n log n) optimization
def balanced_accuracy_vectorized(tp, tn, fp, fn):
    """Vectorized balanced accuracy computation."""
    sensitivity = np.divide(tp, tp + fn, out=np.zeros_like(tp, dtype=float),
                          where=(tp + fn) > 0)
    specificity = np.divide(tn, tn + fp, out=np.zeros_like(tn, dtype=float),
                          where=(tn + fp) > 0)
    return (sensitivity + specificity) / 2

# Register with vectorized version
register_metric(
    'balanced_accuracy',
    balanced_accuracy,
    vectorized_func=balanced_accuracy_vectorized,
    is_piecewise=True,
    maximize=True,
    needs_proba=False
)

# Use the custom metric
threshold = get_optimal_threshold(y_true, y_prob, metric='balanced_accuracy')
print(f"Balanced accuracy optimized threshold: {threshold:.3f}")

Non-Piecewise Metrics

For metrics that are not piecewise-constant:

def log_loss_metric(tp, tn, fp, fn):
    """Not piecewise - depends on actual probability values."""
    # This is just an example - real log loss needs probabilities
    return -(tp * np.log(0.9) + tn * np.log(0.9) + fp * np.log(0.1) + fn * np.log(0.1))

register_metric(
    'log_loss_approx',
    log_loss_metric,
    is_piecewise=False,  # Not piecewise-constant
    maximize=False,      # Minimize log loss
    needs_proba=True     # Requires probability values
)

# This will use continuous optimization methods
threshold = get_optimal_threshold(y_true, y_prob, metric='log_loss_approx', method='minimize')

Nested Cross-Validation

For unbiased performance estimation:

from optimal_cutoffs import nested_cv_threshold_optimization

# Nested cross-validation for unbiased performance estimation
outer_scores, inner_results = nested_cv_threshold_optimization(
    y_true, y_prob,
    metric='f1',
    outer_cv=5,
    inner_cv=3,
    method='auto'
)

print(f"Outer CV scores: {outer_scores}")
print(f"Unbiased performance estimate: {np.mean(outer_scores):.3f} ± {np.std(outer_scores):.3f}")

# Analyze inner fold variation
for fold_idx, inner_result in enumerate(inner_results):
    thresholds = inner_result['thresholds']
    scores = inner_result['scores']
    print(f"Outer fold {fold_idx}: threshold={np.mean(thresholds):.3f}, score={np.mean(scores):.3f}")

Production Deployment Considerations

Model Serialization

import joblib
from optimal_cutoffs import ThresholdOptimizer

# Train and optimize
optimizer = ThresholdOptimizer(metric='f1', method='auto')
optimizer.fit(y_train, y_prob_train)

# Save complete model
model_package = {
    'base_classifier': trained_classifier,  # Your trained classifier
    'threshold_optimizer': optimizer,
    'threshold': optimizer.threshold_,
    'training_metrics': {
        'f1_score': optimizer.score_,
        'threshold_std': 0.05  # From CV if available
    },
    'calibration_method': 'isotonic',  # If calibration was used
    'version': '1.0'
}

joblib.dump(model_package, 'production_model.pkl')

# Load and use in production
loaded_model = joblib.load('production_model.pkl')
base_clf = loaded_model['base_classifier']
optimizer = loaded_model['threshold_optimizer']

# Make predictions
def predict_production(X_new):
    y_prob = base_clf.predict_proba(X_new)[:, 1]
    y_pred = optimizer.predict(y_prob)
    return y_pred, y_prob

Monitoring and Drift Detection

def monitor_threshold_performance(y_true, y_prob, reference_threshold,
                                reference_score, tolerance=0.05):
    """Monitor if optimal threshold has drifted significantly."""
    from optimal_cutoffs.metrics import get_confusion_matrix, f1_score

    # Calculate current optimal threshold
    current_threshold = get_optimal_threshold(y_true, y_prob, metric='f1')

    # Calculate performance with reference threshold
    tp, tn, fp, fn = get_confusion_matrix(y_true, y_prob, reference_threshold)
    current_score_ref = f1_score(tp, tn, fp, fn)

    # Calculate performance with current optimal threshold
    tp, tn, fp, fn = get_confusion_matrix(y_true, y_prob, current_threshold)
    current_score_opt = f1_score(tp, tn, fp, fn)

    # Check for significant drift
    threshold_drift = abs(current_threshold - reference_threshold)
    performance_drop = reference_score - current_score_ref
    potential_improvement = current_score_opt - current_score_ref

    drift_detected = (threshold_drift > tolerance or
                     performance_drop > tolerance or
                     potential_improvement > tolerance)

    return {
        'drift_detected': drift_detected,
        'current_optimal_threshold': current_threshold,
        'threshold_drift': threshold_drift,
        'performance_with_reference': current_score_ref,
        'performance_drop': performance_drop,
        'potential_improvement': potential_improvement,
        'recommendation': 'retrain' if drift_detected else 'continue'
    }

# Example monitoring
monitoring_result = monitor_threshold_performance(
    y_test, y_prob_test,
    reference_threshold=0.3,
    reference_score=0.85
)

print(f"Drift detected: {monitoring_result['drift_detected']}")
print(f"Recommendation: {monitoring_result['recommendation']}")

A/B Testing Optimal Thresholds

def ab_test_thresholds(y_true, y_prob, threshold_a, threshold_b,
                      metric_func, confidence_level=0.95):
    """Statistical comparison of two thresholds."""
    from scipy import stats
    from optimal_cutoffs.metrics import get_confusion_matrix

    # Calculate performance for both thresholds
    tp_a, tn_a, fp_a, fn_a = get_confusion_matrix(y_true, y_prob, threshold_a)
    tp_b, tn_b, fp_b, fn_b = get_confusion_matrix(y_true, y_prob, threshold_b)

    score_a = metric_func(tp_a, tn_a, fp_a, fn_a)
    score_b = metric_func(tp_b, tn_b, fp_b, fn_b)

    # Bootstrap confidence intervals
    n_bootstrap = 1000
    scores_a_boot = []
    scores_b_boot = []

    for _ in range(n_bootstrap):
        # Resample with replacement
        indices = np.random.choice(len(y_true), len(y_true), replace=True)
        y_boot = y_true[indices]
        prob_boot = y_prob[indices]

        tp_a_boot, tn_a_boot, fp_a_boot, fn_a_boot = get_confusion_matrix(
            y_boot, prob_boot, threshold_a)
        tp_b_boot, tn_b_boot, fp_b_boot, fn_b_boot = get_confusion_matrix(
            y_boot, prob_boot, threshold_b)

        scores_a_boot.append(metric_func(tp_a_boot, tn_a_boot, fp_a_boot, fn_a_boot))
        scores_b_boot.append(metric_func(tp_b_boot, tn_b_boot, fp_b_boot, fn_b_boot))

    # Statistical test
    statistic, p_value = stats.ttest_rel(scores_b_boot, scores_a_boot)

    alpha = 1 - confidence_level
    significant = p_value < alpha

    return {
        'threshold_a': threshold_a,
        'threshold_b': threshold_b,
        'score_a': score_a,
        'score_b': score_b,
        'score_difference': score_b - score_a,
        'p_value': p_value,
        'significant': significant,
        'better_threshold': 'B' if score_b > score_a else 'A',
        'confidence_level': confidence_level
    }

# Example A/B test
from optimal_cutoffs.metrics import f1_score

result = ab_test_thresholds(
    y_test, y_prob_test,
    threshold_a=0.5,      # Default threshold
    threshold_b=0.3,      # Optimized threshold
    metric_func=f1_score
)

print(f"Threshold A: {result['threshold_a']:.3f}, Score: {result['score_a']:.3f}")
print(f"Threshold B: {result['threshold_b']:.3f}, Score: {result['score_b']:.3f}")
print(f"Difference: {result['score_difference']:.3f}")
print(f"Statistically significant: {result['significant']} (p={result['p_value']:.3f})")
print(f"Recommendation: Use threshold {result['better_threshold']}")