Advanced Usage Examples¶
This section covers advanced calibration techniques and specialized use cases.
Multi-Class Calibration¶
While Calibre focuses on binary calibration, you can extend it to multi-class problems with a one-vs-rest scheme:
import numpy as np
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from calibre import NearlyIsotonicRegression
# Generate multi-class dataset
X, y = make_classification(
    n_samples=2000,
    n_features=20,
    n_classes=3,
    n_redundant=2,
    n_informative=18,
    random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.5, random_state=42
)
# Train model
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
y_pred_proba = model.predict_proba(X_test)
# Calibrate each class separately using a one-vs-rest approach
calibrators = {}
y_pred_cal = np.zeros_like(y_pred_proba)

for class_idx in range(3):
    # Create binary labels for the current class
    y_binary = (y_test == class_idx).astype(int)
    y_pred_binary = y_pred_proba[:, class_idx]

    # Fit a calibrator for this class
    calibrator = NearlyIsotonicRegression(lam=1.0, method='path')
    calibrator.fit(y_pred_binary, y_binary)

    # Store the calibrator and collect calibrated predictions
    calibrators[class_idx] = calibrator
    y_pred_cal[:, class_idx] = calibrator.transform(y_pred_binary)
# Renormalize probabilities to sum to 1
y_pred_cal = y_pred_cal / y_pred_cal.sum(axis=1, keepdims=True)
print("Multi-class calibration completed")
print(f"Original probability sums: {y_pred_proba.sum(axis=1)[:5]}")
print(f"Calibrated probability sums: {y_pred_cal.sum(axis=1)[:5]}")
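To check that the per-class calibration actually helped, you can compare the one-vs-rest calibration error before and after; a minimal sketch using calibre's mean_calibration_error on the arrays from the example above:
from calibre import mean_calibration_error

# Per-class, one-vs-rest calibration error before vs. after calibration
for class_idx in range(3):
    y_binary = (y_test == class_idx).astype(int)
    mce_before = mean_calibration_error(y_binary, y_pred_proba[:, class_idx])
    mce_after = mean_calibration_error(y_binary, y_pred_cal[:, class_idx])
    print(f"Class {class_idx}: MCE {mce_before:.4f} -> {mce_after:.4f}")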
Custom Calibration Pipelines¶
Creating Calibration Ensembles¶
import numpy as np

from calibre import (
    NearlyIsotonicRegression,
    ISplineCalibrator,
    RelaxedPAVA,
    mean_calibration_error
)
class CalibrationEnsemble:
    """Ensemble of calibration methods combined by weighted averaging."""

    def __init__(self, calibrators, weights=None):
        self.calibrators = calibrators
        self.weights = weights or [1.0] * len(calibrators)
        self.weights = np.array(self.weights) / np.sum(self.weights)

    def fit(self, X, y):
        """Fit all calibrators on the same data."""
        for calibrator in self.calibrators:
            calibrator.fit(X, y)
        return self

    def transform(self, X):
        """Ensemble prediction using a weighted average."""
        predictions = [calibrator.transform(X) for calibrator in self.calibrators]
        ensemble_pred = np.zeros_like(predictions[0])
        for pred, weight in zip(predictions, self.weights):
            ensemble_pred += weight * pred
        return ensemble_pred
# Create the ensemble (y_pred_uncal and y_test are the uncalibrated
# probabilities and labels from the earlier examples)
calibrators = [
    NearlyIsotonicRegression(lam=1.0, method='path'),
    ISplineCalibrator(n_splines=10, degree=3, cv=3),
    RelaxedPAVA(percentile=10, adaptive=True)
]
ensemble = CalibrationEnsemble(calibrators, weights=[0.4, 0.3, 0.3])
ensemble.fit(y_pred_uncal, y_test)
y_pred_ensemble = ensemble.transform(y_pred_uncal)
print(f"Ensemble MCE: {mean_calibration_error(y_test, y_pred_ensemble):.4f}")
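The weights above are hand-picked. A simple way to choose them is a small grid search against mean_calibration_error on a held-out split; the sketch below is illustrative, and the candidate grid is an arbitrary choice:
from itertools import product
from sklearn.model_selection import train_test_split

# Split the calibration data so the weights are scored on unseen points
X_fit, X_hold, y_fit, y_hold = train_test_split(
    y_pred_uncal, y_test, test_size=0.3, random_state=0
)
best_weights, best_mce = None, float('inf')
for w in product([0.2, 0.4, 0.6], repeat=3):
    if abs(sum(w) - 1.0) > 1e-9:
        continue  # only consider weight vectors that sum to 1
    ens = CalibrationEnsemble(
        [NearlyIsotonicRegression(lam=1.0, method='path'),
         ISplineCalibrator(n_splines=10, degree=3, cv=3),
         RelaxedPAVA(percentile=10, adaptive=True)],
        weights=list(w)
    ).fit(X_fit, y_fit)
    mce = mean_calibration_error(y_hold, ens.transform(X_hold))
    if mce < best_mce:
        best_weights, best_mce = list(w), mce
print(f"Best weights: {best_weights} (holdout MCE: {best_mce:.4f})")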
Adaptive Calibration Selection¶
from sklearn.model_selection import train_test_split

def select_best_calibrator(X, y, calibrators, cv=5):
    """Select the best calibrator using repeated holdout validation."""
    best_score = float('inf')
    best_calibrator = None
    best_name = None

    for name, calibrator in calibrators.items():
        try:
            scores = []
            for _ in range(cv):
                # Fresh holdout split each round
                X_train, X_val, y_train, y_val = train_test_split(
                    X, y, test_size=0.2, random_state=np.random.randint(1000)
                )
                calibrator.fit(X_train, y_train)
                y_pred = calibrator.transform(X_val)
                scores.append(mean_calibration_error(y_val, y_pred))

            avg_score = np.mean(scores)
            print(f"{name}: MCE = {avg_score:.4f} ± {np.std(scores):.4f}")

            if avg_score < best_score:
                best_score = avg_score
                best_calibrator = calibrator
                best_name = name
        except Exception as e:
            print(f"{name}: Failed - {e}")

    return best_calibrator, best_name, best_score
# Test different calibrators
calibrators = {
    'Nearly Isotonic (strict)': NearlyIsotonicRegression(lam=10.0),
    'Nearly Isotonic (moderate)': NearlyIsotonicRegression(lam=1.0),
    'Nearly Isotonic (relaxed)': NearlyIsotonicRegression(lam=0.1),
    'I-Spline': ISplineCalibrator(n_splines=10),
    'Relaxed PAVA': RelaxedPAVA(percentile=10)
}
best_cal, best_name, best_score = select_best_calibrator(
    y_pred_uncal, y_test, calibrators
)
print(f"\nBest calibrator: {best_name} (MCE: {best_score:.4f})")
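Note that the winning calibrator was last fitted on a holdout split inside the selection loop, so refit it on the full calibration set before using it:
# Refit the selected calibrator on all calibration data before deployment
best_cal.fit(y_pred_uncal, y_test)
y_pred_best = best_cal.transform(y_pred_uncal)
print(f"Refit {best_name} MCE: {mean_calibration_error(y_test, y_pred_best):.4f}")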
Temperature Scaling Integration¶
Combining with temperature scaling for neural networks:
import torch
import torch.nn as nn
import torch.optim as optim
class TemperatureScaling(nn.Module):
    """Temperature scaling for neural network calibration."""

    def __init__(self):
        super().__init__()
        self.temperature = nn.Parameter(torch.ones(1))

    def forward(self, logits):
        return logits / self.temperature

def temperature_scale_then_isotonic(logits, y_true, test_logits):
    """Apply temperature scaling followed by isotonic calibration."""
    # Convert to torch tensors
    logits_tensor = torch.FloatTensor(logits.reshape(-1, 1))
    y_tensor = torch.FloatTensor(y_true)

    # Fit the temperature with LBFGS on the binary cross-entropy loss
    temp_model = TemperatureScaling()
    optimizer = optim.LBFGS([temp_model.temperature], lr=0.01, max_iter=50)

    def eval_loss():
        optimizer.zero_grad()
        scaled_logits = temp_model(logits_tensor).squeeze(1)
        loss = nn.BCEWithLogitsLoss()(scaled_logits, y_tensor)
        loss.backward()
        return loss

    optimizer.step(eval_loss)

    # Apply temperature scaling to the test logits
    test_logits_tensor = torch.FloatTensor(test_logits.reshape(-1, 1))
    with torch.no_grad():
        temp_scaled = torch.sigmoid(temp_model(test_logits_tensor)).numpy().ravel()

    # Apply isotonic calibration on top of temperature scaling,
    # fitted on the temperature-scaled training predictions
    calibrator = NearlyIsotonicRegression(lam=1.0)
    with torch.no_grad():
        train_temp_scaled = torch.sigmoid(temp_model(logits_tensor)).numpy().ravel()
    calibrator.fit(train_temp_scaled, y_true)

    # Final calibrated predictions
    final_calibrated = calibrator.transform(temp_scaled)

    return final_calibrated, temp_model.temperature.item()
# Example usage (synthetic logits; labels are sampled from the sigmoid
# probability so the classes are not perfectly separable)
np.random.seed(42)
logits_train = np.random.normal(0, 2, 1000)
y_train_temp = (np.random.rand(1000) < 1 / (1 + np.exp(-logits_train))).astype(int)
logits_test = np.random.normal(0, 2, 500)

y_final, optimal_temp = temperature_scale_then_isotonic(
    logits_train, y_train_temp, logits_test
)
print(f"Optimal temperature: {optimal_temp:.3f}")
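To see what the two stages buy you, compare raw sigmoid probabilities against the full pipeline on held-out labels; a sketch that samples synthetic test labels the same way as the training labels:
# Synthetic test labels, drawn from the same sigmoid model as the training data
y_test_temp = (np.random.rand(500) < 1 / (1 + np.exp(-logits_test))).astype(int)

y_raw = 1 / (1 + np.exp(-logits_test))  # uncalibrated sigmoid probabilities
print(f"Raw sigmoid MCE:     {mean_calibration_error(y_test_temp, y_raw):.4f}")
print(f"Temp + isotonic MCE: {mean_calibration_error(y_test_temp, y_final):.4f}")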
Handling Concept Drift¶
Adaptive calibration for changing data distributions:
from collections import deque
class AdaptiveCalibrator:
    """Calibrator that adapts to concept drift by retraining on recent data."""

    def __init__(self, base_calibrator, window_size=1000, retrain_threshold=0.05):
        self.base_calibrator = base_calibrator
        self.window_size = window_size
        self.retrain_threshold = retrain_threshold
        self.prediction_buffer = deque(maxlen=window_size)
        self.target_buffer = deque(maxlen=window_size)
        self.calibration_error_history = deque(maxlen=100)
        self.is_fitted = False

    def update(self, y_pred, y_true):
        """Update with a new prediction and its true label."""
        self.prediction_buffer.append(y_pred)
        self.target_buffer.append(y_true)

        # Track the calibration error over the most recent samples
        if len(self.prediction_buffer) >= 50:  # Minimum samples for evaluation
            recent_error = mean_calibration_error(
                list(self.target_buffer)[-50:],
                list(self.prediction_buffer)[-50:]
            )
            self.calibration_error_history.append(recent_error)

            # Retrain if the error has drifted upward
            if len(self.calibration_error_history) >= 20:
                recent_avg = np.mean(list(self.calibration_error_history)[-10:])
                older_avg = np.mean(list(self.calibration_error_history)[-20:-10])
                if recent_avg > older_avg + self.retrain_threshold:
                    self._retrain()
                    print(f"Retrained calibrator: error increased "
                          f"from {older_avg:.4f} to {recent_avg:.4f}")

    def _retrain(self):
        """Retrain the calibrator on recent data."""
        if len(self.prediction_buffer) >= 100:
            X_recent = np.array(self.prediction_buffer)
            y_recent = np.array(self.target_buffer)
            self.base_calibrator.fit(X_recent, y_recent)
            self.is_fitted = True

    def fit(self, X, y):
        """Initial fit."""
        self.base_calibrator.fit(X, y)
        self.is_fitted = True
        # Seed the buffers with the initial data
        for x, y_val in zip(X, y):
            self.prediction_buffer.append(x)
            self.target_buffer.append(y_val)
        return self

    def transform(self, X):
        """Transform predictions."""
        if not self.is_fitted:
            raise ValueError("Calibrator not fitted")
        return self.base_calibrator.transform(X)
# Example usage
adaptive_cal = AdaptiveCalibrator(
    NearlyIsotonicRegression(lam=1.0),
    window_size=500,
    retrain_threshold=0.02
)

# Initial fit
adaptive_cal.fit(y_pred_uncal[:500], y_test[:500])

# Simulate streaming predictions
for i in range(500, len(y_pred_uncal), 10):
    batch_pred = y_pred_uncal[i:i+10]
    batch_true = y_test[i:i+10]

    # Get calibrated predictions for the incoming batch
    batch_cal = adaptive_cal.transform(batch_pred)

    # Update with the true labels (in practice, these arrive later)
    for pred, true in zip(batch_pred, batch_true):
        adaptive_cal.update(pred, true)
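With the stream above nothing forces a retrain, since the data distribution is static. To see the drift detector fire, you can inject synthetic drift, for example by flipping a fraction of the incoming labels partway through the stream; a minimal sketch:
# Inject synthetic drift: flip 30% of the labels in the second half of the stream
rng = np.random.default_rng(0)
y_drifted = y_test.copy()
half = len(y_drifted) // 2
flip = rng.random(len(y_drifted) - half) < 0.3
y_drifted[half:] = np.where(flip, 1 - y_drifted[half:], y_drifted[half:])

# Re-run the stream against the drifted labels and watch for retrain messages
for pred, true in zip(y_pred_uncal[500:], y_drifted[500:]):
    adaptive_cal.update(pred, true)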
Calibration for Specific Domains¶
Time Series Calibration¶
def time_series_calibration(y_pred, y_true, timestamps, window_days=30):
    """Time-aware calibration that fits on a rolling window of recent data."""
    from datetime import datetime, timedelta

    # Convert timestamps to datetime if needed
    if isinstance(timestamps[0], str):
        timestamps = [datetime.fromisoformat(ts) for ts in timestamps]

    calibrated_predictions = np.zeros_like(y_pred)

    for i, current_time in enumerate(timestamps):
        # Define the time window ending just before the current prediction
        window_start = current_time - timedelta(days=window_days)
        mask = np.array([(ts >= window_start) and (ts < current_time)
                         for ts in timestamps])

        if np.sum(mask) >= 50:  # Minimum samples for calibration
            # Fit a calibrator on the recent data only
            calibrator = NearlyIsotonicRegression(lam=1.0)
            calibrator.fit(y_pred[mask], y_true[mask])

            # Calibrate the current prediction
            calibrated_predictions[i] = calibrator.transform([y_pred[i]])[0]
        else:
            # Not enough recent data; keep the uncalibrated prediction
            calibrated_predictions[i] = y_pred[i]

    return calibrated_predictions
# Example with synthetic time series data
from datetime import datetime, timedelta
# Generate timestamps
start_date = datetime(2024, 1, 1)
timestamps = [start_date + timedelta(days=i) for i in range(len(y_pred_uncal))]
# Apply time-series calibration
y_pred_ts_cal = time_series_calibration(
    y_pred_uncal, y_test, timestamps, window_days=30
)
print(f"Time-series calibrated MCE: {mean_calibration_error(y_test, y_pred_ts_cal):.4f}")
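Refitting a calibrator for every single prediction is expensive on long series. A common compromise is to refit only every k steps and reuse the most recent model in between; a sketch (the refit_every parameter is illustrative):
def time_series_calibration_strided(y_pred, y_true, timestamps,
                                    window_days=30, refit_every=50):
    """Like time_series_calibration, but refits only every `refit_every` steps."""
    from datetime import timedelta

    calibrated = np.array(y_pred, dtype=float)
    calibrator = None
    for i, current_time in enumerate(timestamps):
        if i % refit_every == 0:
            # Refit on the rolling window, if it holds enough samples
            window_start = current_time - timedelta(days=window_days)
            mask = np.array([(ts >= window_start) and (ts < current_time)
                             for ts in timestamps])
            calibrator = None
            if mask.sum() >= 50:
                calibrator = NearlyIsotonicRegression(lam=1.0)
                calibrator.fit(y_pred[mask], y_true[mask])
        if calibrator is not None:
            calibrated[i] = calibrator.transform([y_pred[i]])[0]
    return calibrated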
High-Stakes Decision Making¶
def conservative_calibration(y_pred, y_true, risk_tolerance=0.05):
    """Conservative calibration that pushes predictions away from the decision boundary."""
    # Use stricter calibration for high-stakes scenarios
    calibrator = NearlyIsotonicRegression(lam=50.0, method='cvx')  # Very strict
    calibrator.fit(y_pred, y_true)
    y_cal = calibrator.transform(y_pred)

    # Apply an additional adjustment that pushes probabilities
    # away from the decision boundary
    decision_threshold = 0.5
    adjustment_strength = risk_tolerance

    conservative_cal = y_cal.copy()
    above_threshold = y_cal > decision_threshold
    below_threshold = y_cal <= decision_threshold

    conservative_cal[above_threshold] = np.minimum(
        1.0, y_cal[above_threshold] + adjustment_strength
    )
    conservative_cal[below_threshold] = np.maximum(
        0.0, y_cal[below_threshold] - adjustment_strength
    )

    return conservative_cal
# Apply conservative calibration
y_pred_conservative = conservative_calibration(y_pred_uncal, y_test)
print(f"Conservative calibration MCE: {mean_calibration_error(y_test, y_pred_conservative):.4f}")
print(f"Mean prediction shift: {np.mean(np.abs(y_pred_conservative - y_pred_uncal)):.4f}")
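One quick way to inspect the effect is to count how many predictions sit close to the 0.5 decision threshold before and after the adjustment:
# Fraction of predictions within 0.1 of the decision threshold
near_before = np.mean(np.abs(y_pred_uncal - 0.5) < 0.1)
near_after = np.mean(np.abs(y_pred_conservative - 0.5) < 0.1)
print(f"Near-threshold predictions: {near_before:.1%} -> {near_after:.1%}")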
Performance Optimization¶
Efficient Batch Processing¶
def batch_calibration(model, calibrator, X_large, batch_size=10000):
    """Efficiently calibrate predictions for large datasets."""
    n_samples = len(X_large)
    n_batches = (n_samples + batch_size - 1) // batch_size
    calibrated_predictions = []

    for i in range(n_batches):
        start_idx = i * batch_size
        end_idx = min((i + 1) * batch_size, n_samples)

        # Predict and calibrate one batch at a time to bound memory use
        X_batch = X_large[start_idx:end_idx]
        y_pred_batch = model.predict_proba(X_batch)[:, 1]
        y_cal_batch = calibrator.transform(y_pred_batch)
        calibrated_predictions.append(y_cal_batch)

        if (i + 1) % 10 == 0:
            print(f"Processed {i + 1}/{n_batches} batches")

    return np.concatenate(calibrated_predictions)
# Example with large synthetic dataset
np.random.seed(42)
X_large = np.random.randn(50000, 20)
# Assumes a fitted binary classifier (model) and calibrator from the earlier examples
y_pred_large_cal = batch_calibration(model, calibrator, X_large)
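Since the batches are independent, they can also be fanned out across worker processes with joblib; the sketch below assumes the fitted model and calibrator are picklable (as scikit-learn estimators are):
from joblib import Parallel, delayed

def _calibrate_slice(model, calibrator, X_slice):
    """Predict and calibrate one slice; runs in a worker process."""
    return calibrator.transform(model.predict_proba(X_slice)[:, 1])

def parallel_batch_calibration(model, calibrator, X_large,
                               batch_size=10000, n_jobs=4):
    """Parallel variant of batch_calibration using joblib."""
    slices = [X_large[i:i + batch_size]
              for i in range(0, len(X_large), batch_size)]
    results = Parallel(n_jobs=n_jobs)(
        delayed(_calibrate_slice)(model, calibrator, s) for s in slices
    )
    return np.concatenate(results)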