Source code for rank_preserving_calibration.nearly

# rank_preserving_calibration/nearly.py
"""
Nearly isotonic regression utilities.

This module provides "relaxed" isotonic constraints that allow small
violations of monotonicity—useful when strict isotonicity is too restrictive.

Exports:
    - project_near_isotonic_euclidean: ε-slack projection (exact L2), optional sum target.
    - prox_near_isotonic: exact proximal operator for λ * sum (z_i - z_{i+1})_+.
    - prox_near_isotonic_with_sum: same prox with an exact sum constraint via translation.
"""

from __future__ import annotations

import numpy as np

__all__ = [
    "project_near_isotonic_euclidean",
    "prox_near_isotonic",
    "prox_near_isotonic_with_sum",
]

# ---------------------------------------------------------------------
# Weighted isotonic regression (PAV) in O(n)
# ---------------------------------------------------------------------


def _pav_increasing(
    y: np.ndarray, w: np.ndarray | None = None, rtol: float = 0.0
) -> np.ndarray:
    """
    Pool-Adjacent-Violators (L2) for a 1D sequence (nondecreasing), supporting weights.

    Parameters
    ----------
    y : (n,) array_like
        Sequence to fit monotonically (already in the order you care about).
    w : (n,) array_like, optional
        Positive weights. If None, all ones.
    rtol : float, optional (default 0.0)
        Tolerance used in merge decision: we treat blocks as monotone if
        left_mean <= right_mean + rtol * (|left_mean| + |right_mean| + 1).

    Returns
    -------
    z : (n,) ndarray
        Isotonic fit minimizing sum_i w_i * (z_i - y_i)^2 subject to z nondecreasing.

    Notes
    -----
    - Strict by default (rtol=0.0) to avoid micro-violations in tests.
    - Idempotent: applying to an already isotone sequence returns it unchanged.
    """
    y = np.asarray(y, dtype=np.float64)
    n = y.size
    if n <= 1:
        return y.copy()

    if w is None:
        w = np.ones(n, dtype=np.float64)
    else:
        w = np.asarray(w, dtype=np.float64)
        if w.shape != y.shape:
            raise ValueError("weights must have the same shape as y")
        if np.any(w <= 0):
            raise ValueError("weights must be strictly positive")

    # Block stacks: start index (in original index space), weighted mean, weight sum.
    start = np.empty(n, dtype=np.int64)
    mean = np.empty(n, dtype=np.float64)
    wsum = np.empty(n, dtype=np.float64)
    top = -1

    def _tol(a: float, b: float) -> float:
        return rtol * (abs(a) + abs(b) + 1.0)

    for i in range(n):
        top += 1
        start[top] = i
        mean[top] = y[i]
        wsum[top] = w[i]

        # Merge backward while violating monotonicity beyond tolerance
        while top > 0:
            left, right = mean[top - 1], mean[top]
            if left <= right + _tol(left, right):
                break
            new_w = wsum[top - 1] + wsum[top]
            mean[top - 1] = (wsum[top - 1] * left + wsum[top] * right) / new_w
            wsum[top - 1] = new_w
            top -= 1

    # Expand the piecewise-constant block means back to length n
    z = np.empty(n, dtype=np.float64)
    for j in range(top + 1):
        s = start[j]
        e = start[j + 1] if j < top else n
        z[s:e] = mean[j]
    return z


# ---------------------------------------------------------------------
# (A) Hard-slack (ε) nearly isotonic projection
# ---------------------------------------------------------------------


[docs] def project_near_isotonic_euclidean( v: np.ndarray, eps: float, sum_target: float | None = None, weights: np.ndarray | None = None, ) -> np.ndarray: """ Project v onto the set { z : z_{i+1} >= z_i - eps } in L2. Reduction: define w_i = v_i + i * eps. Then the constraint becomes standard isotonic on w. Let w* = isotonic(w) (weighted if weights are provided). The exact projection is z* = w* - i * eps. If sum_target is given, apply a uniform shift so 1^T z* = sum_target (translation invariance makes this exact). Parameters ---------- v : (n,) array_like Input vector to project. eps : float Slack parameter (>= 0). Allows z[i+1] >= z[i] - eps instead of strict z[i+1] >= z[i]. sum_target : float, optional If provided, perform an exact uniform shift so the result sums to this value. weights : (n,) array_like, optional Positive weights for a weighted L2 projection. Returns ------- z : (n,) ndarray Projected vector satisfying near-isotonic constraint (and sum, if requested). Notes ----- - Idempotent: applying the projection twice gives the same result. """ v = np.asarray(v, dtype=np.float64) n = v.size if n <= 1: z = v.copy() if sum_target is not None: z += (float(sum_target) - float(z.sum())) / max(n, 1) return z if eps < 0: raise ValueError("eps must be nonnegative.") ar = np.arange(n, dtype=np.float64) w = v + eps * ar iw = _pav_increasing(w, weights, rtol=0.0) z = iw - eps * ar if sum_target is not None: z += (float(sum_target) - float(z.sum())) / n return z
# --------------------------------------------------------------------- # (B) Penalized (λ) nearly isotonic proximal operator # --------------------------------------------------------------------- def _diff(z: np.ndarray) -> np.ndarray: """Forward differences Dz with (Dz)_i = z_i - z_{i+1}.""" return z[:-1] - z[1:] def _diffT(p: np.ndarray, n: int) -> np.ndarray: """ Adjoint of the forward-difference: D^T p. For D with rows [0..0,1,-1,0..0], we have: (D^T p)_0 = p_0 (D^T p)_i = p_i - p_{i-1}, i = 1..n-2 (D^T p)_{n-1} = -p_{n-1} """ out = np.empty(n, dtype=p.dtype) out[0] = p[0] out[1:-1] = p[1:] - p[:-1] out[-1] = -p[-1] return out def _solve_tridiag( low: np.ndarray, d: np.ndarray, u: np.ndarray, b: np.ndarray ) -> np.ndarray: """ Solve a tridiagonal system with the Thomas algorithm: low: sub-diagonal (length n-1) d: main diagonal (length n) u: super-diagonal (length n-1) b: RHS (length n) Returns x solving T x = b where T has (low, d, u). """ n = d.size # Working copies c = u.copy() dd = d.copy() bb = b.copy() # Forward elimination for i in range(1, n): w = low[i - 1] / dd[i - 1] dd[i] -= w * c[i - 1] bb[i] -= w * bb[i - 1] # Back substitution x = np.empty_like(bb) x[-1] = bb[-1] / dd[-1] for i in range(n - 2, -1, -1): x[i] = (bb[i] - c[i] * x[i + 1]) / dd[i] return x
[docs] def prox_near_isotonic( y: np.ndarray, lam: float, *, rho: float = 1.0, max_iters: int = 2000, abstol: float = 1e-8, reltol: float = 1e-6, return_info: bool = False, ) -> np.ndarray | tuple[np.ndarray, dict]: r""" Exact proximal operator for the λ-penalty nearly-isotonic term: prox_{λ R}(y), R(z) = ∑_{i=1}^{n-1} (z_i - z_{i+1})_+, where (x)_+ = max(0, x). This penalizes *downward* steps, relaxing strict isotonicity. We solve: minimize_z 0.5 * ||z - y||_2^2 + λ * ∑ (Dz)_+ with (Dz)_i = z_i - z_{i+1} by ADMM on the split Dz = r: z-update: (I + ρ D^T D) z = y + ρ D^T (r - u) [SPD tridiagonal solve] r-update: r = prox_{(λ/ρ)‖·‖_{+,1}}(Dz + u) [one-sided soft-threshold] u-update: u ← u + Dz - r One-sided soft-threshold (elementwise): prox_{t(·)_+}(v) = { v - t, if v > t 0, if 0 ≤ v ≤ t v, if v < 0 } """ y = np.asarray(y, dtype=np.float64) n = y.size if n <= 1: return ( (y.copy(), {"iterations": 0, "pr_res": 0.0, "du_res": 0.0}) if return_info else y.copy() ) if lam < 0: raise ValueError("lam must be nonnegative.") if lam == 0.0: return ( (y.copy(), {"iterations": 0, "pr_res": 0.0, "du_res": 0.0}) if return_info else y.copy() ) # Precompute tridiagonal system for z-update: A = I + ρ D^T D (SPD). main = np.empty(n, dtype=np.float64) main[0] = 1.0 + rho main[1:-1] = 1.0 + 2.0 * rho main[-1] = 1.0 + rho off = -rho * np.ones(n - 1, dtype=np.float64) # Initialize z = y.copy() r = _diff(z) u = np.zeros(n - 1, dtype=np.float64) pr_res = du_res = np.nan for k in range(1, max_iters + 1): # z-update rhs = y + rho * _diffT(r - u, n) z = _solve_tridiag(off, main, off, rhs) # r-update: one-sided soft-threshold at t = λ/ρ v = _diff(z) + u t = lam / rho r_prev = r r = v.copy() mask_pos = v > t mask_mid = (v >= 0.0) & (~mask_pos) r[mask_pos] = v[mask_pos] - t r[mask_mid] = 0.0 # v < 0 unchanged (no penalty) # u-update Dz = _diff(z) u += Dz - r # Stopping criteria (Boyd et al.) pr = Dz - r dr = rho * _diffT(r - r_prev, n) pr_res = np.linalg.norm(pr) du_res = np.linalg.norm(dr) m = n - 1 eps_pr = np.sqrt(m) * abstol + reltol * max( np.linalg.norm(Dz), np.linalg.norm(r) ) eps_du = np.sqrt(n) * abstol + reltol * np.linalg.norm(rho * _diffT(u, n)) if pr_res <= eps_pr and du_res <= eps_du: if return_info: return z, { "iterations": k, "pr_res": float(pr_res), "du_res": float(du_res), } return z # Hit max_iters if return_info: return z, { "iterations": max_iters, "pr_res": float(pr_res), "du_res": float(du_res), } return z
[docs] def prox_near_isotonic_with_sum( y: np.ndarray, lam: float, sum_target: float, *, return_info: bool = False, **kwargs, ) -> np.ndarray | tuple[np.ndarray, dict]: """ Prox with a **sum constraint**: minimize_z 0.5 * ||z - y||_2^2 + λ ∑ (z_i - z_{i+1})_+ subject to 1^T z = sum_target. Because R(z) depends only on *differences*, it is translation-invariant: R(z + c·1) = R(z). Hence the constrained prox is obtained exactly by: z* = prox_{λR}(y) + ((sum_target - 1^T prox_{λR}(y)) / n) · 1 Returns ------- If return_info = False (default): z* (ndarray) If return_info = True: (z*, info_dict) """ ret = prox_near_isotonic(y, lam, return_info=return_info, **kwargs) if return_info: z, info = ret if isinstance(ret, tuple) else (ret, {}) c = (float(sum_target) - float(z.sum())) / z.size z_shifted = z + c return z_shifted, info else: z = ret if isinstance(ret, np.ndarray) else ret[0] c = (float(sum_target) - float(z.sum())) / z.size return z + c