design pattern 2025-04-29 12 min read

A/B Testing ML Models in Production: Shadow Mode, Canary Releases, and Interleaving

Learn how to safely roll out new ML models using shadow mode testing, canary releases, and interleaving experiments—with concrete implementation patterns and statistical guidance.

Tags: A/B testing, canary deployment, shadow mode, model rollout, MLOps, experimentation

The ML Model Rollout Problem

Shipping a new model version is not like shipping a bug fix. A bug fix is either correct or incorrect. A new model version might improve average-case performance while degrading performance for a specific user segment. It might improve click-through rate while decreasing session length. Because these effects trade off against each other and differ by segment, a single pass/fail check before launch is not enough.

Production ML systems use a layered approach to model rollouts:

  1. Shadow mode: Run the new model on live traffic, but don't serve its predictions
  2. Canary release: Route a small percentage of traffic to the new model
  3. Interleaving (ranking systems): Mix results from both models in a single response
  4. Full rollout: Cut over all traffic

Pattern 1: Shadow Mode (Dark Launch)

The new model receives all requests and generates predictions, but the old model's predictions are served to users. Shadow predictions are logged for offline comparison.

Architecture

Request → Feature Extraction → Old Model → Response to user
                           ↓
                       New Model → Predictions logged (NOT served)

When to use

  • Before any live traffic experiment
  • To validate that the new model can handle production traffic patterns
  • To check for prediction distribution drift and anomalies

Implementation

import asyncio
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class ShadowPrediction:
    request_id: str
    primary_prediction: float
    shadow_prediction: float
    features: dict

class ShadowModeServer:
    def __init__(self, primary_model, shadow_model, logger_client):
        self.primary = primary_model
        self.shadow = shadow_model
        self.logger = logger_client
        # Strong references to in-flight logging tasks. The event loop keeps
        # only weak references, so an unreferenced task can be garbage-collected
        # before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

    async def predict(self, request_id: str, features: dict) -> float:
        # Run both models concurrently
        primary_task = asyncio.create_task(
            asyncio.to_thread(self.primary.predict, features)
        )
        shadow_task = asyncio.create_task(
            asyncio.to_thread(self.shadow.predict, features)
        )

        # Return primary prediction as soon as it is ready
        primary_pred = await primary_task

        # Log shadow prediction in the background (fire-and-forget)
        async def log_shadow():
            try:
                shadow_pred = await shadow_task
                self.logger.log(ShadowPrediction(
                    request_id=request_id,
                    primary_prediction=primary_pred,
                    shadow_prediction=shadow_pred,
                    features=features,
                ))
            except Exception as e:
                # Shadow failures MUST NOT affect primary path
                logger.error(f"Shadow model failed: {e}")

        task = asyncio.create_task(log_shadow())
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return primary_pred  # user gets this

Shadow mode analysis

After 24–48 hours, analyze shadow predictions:

import pandas as pd
from scipy import stats

shadow_logs = pd.read_parquet("s3://shadow-logs/date=2025-04-29/")

# Distribution comparison
ks_stat, p_value = stats.ks_2samp(
    shadow_logs["primary_prediction"],
    shadow_logs["shadow_prediction"],
)
print(f"KS statistic: {ks_stat:.4f}, p-value: {p_value:.4f}")
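# p < 0.05 → distributions differ significantly → investigate before proceeding
# (with very large samples even tiny shifts reach p < 0.05, so also look at
# the size of ks_stat, not just the p-value)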

# Agreement rate (useful for classification)
agreement = (
    (shadow_logs["primary_prediction"] > 0.5) ==
    (shadow_logs["shadow_prediction"] > 0.5)
).mean()
print(f"Decision agreement rate: {agreement:.2%}")

# Segment analysis: find where models disagree
# (assumes the logs carry a "country" column alongside the predictions)
shadow_logs["disagreement"] = (
    shadow_logs["primary_prediction"] - shadow_logs["shadow_prediction"]
).abs()
print(shadow_logs.groupby("country")["disagreement"].mean().sort_values(ascending=False).head(10))
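
Because a KS test on a day of production logs will flag almost any difference as significant, an effect-size measure can be a useful complement. The sketch below computes the Population Stability Index (PSI), a drift measure that is not part of the analysis above; the function name `population_stability_index`, the ten quantile bins, and the `eps` floor are illustrative assumptions. A commonly quoted rule of thumb treats PSI below 0.1 as a small shift and above 0.25 as a large one, but those cut-offs are conventions, not guarantees.

```python
import numpy as np

def population_stability_index(expected, actual, n_bins: int = 10, eps: float = 1e-6) -> float:
    """PSI between two score samples, binned on the quantiles of `expected`."""
    expected = np.asarray(expected, dtype=float)
    actual = np.asarray(actual, dtype=float)

    # Bin edges come from the baseline (primary) predictions
    edges = np.unique(np.quantile(expected, np.linspace(0, 1, n_bins + 1)))
    inner = edges[1:-1]  # interior edges; the outer bins are open-ended

    exp_counts = np.bincount(np.digitize(expected, inner), minlength=len(inner) + 1)
    act_counts = np.bincount(np.digitize(actual, inner), minlength=len(inner) + 1)

    # Floor the fractions so empty bins do not produce log(0)
    exp_frac = np.clip(exp_counts / exp_counts.sum(), eps, None)
    act_frac = np.clip(act_counts / act_counts.sum(), eps, None)
    return float(np.sum((act_frac - exp_frac) * np.log(act_frac / exp_frac)))

# Synthetic example: a shadow model whose scores are shifted upward
rng = np.random.default_rng(0)
primary_scores = rng.uniform(0, 1, 5000)
shadow_scores = np.clip(primary_scores + 0.2, 0, 1)
print(f"PSI: {population_stability_index(primary_scores, shadow_scores):.3f}")
```

On real logs the two arguments would be the `primary_prediction` and `shadow_prediction` columns.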

Pattern 2: Canary Release

Route a controlled percentage of traffic (typically 1% → 5% → 20% → 50% → 100%) to the new model while monitoring metrics.

Traffic splitting implementation

import hashlib

def get_model_variant(user_id: str, canary_fraction: float = 0.05) -> str:
    """
    Deterministic assignment: same user always gets same variant.
    Uses modulo hashing so assignments are stable across restarts.
    """
    # Add experiment name to hash for independent experiments
    hash_input = f"model-rollout-v2:{user_id}".encode()
    bucket = int(hashlib.md5(hash_input).hexdigest(), 16) % 10000
    # round() guards against float products that land just below an integer
    threshold = round(canary_fraction * 10000)

    return "v2" if bucket < threshold else "v1"

class CanaryModelServer:
    def __init__(self, model_v1, model_v2, canary_fraction: float = 0.05):
        self.models = {"v1": model_v1, "v2": model_v2}
        self.canary_fraction = canary_fraction

    def predict(self, user_id: str, features: dict) -> dict:
        variant = get_model_variant(user_id, self.canary_fraction)
        prediction = self.models[variant].predict(features)

        return {
            "prediction": prediction,
            "model_variant": variant,  # log this for analysis
        }
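
One property of threshold-based bucketing is worth checking before ramping: because a user is in the canary whenever their bucket falls below the threshold, raising the fraction should only add users and never move anyone back to v1. The sketch below is a quick sanity check of that property and of the realized split. The helpers `bucket_for` and `in_canary` are hypothetical names that mirror the hashing scheme above, and the user IDs are synthetic.

```python
import hashlib

def bucket_for(user_id: str, salt: str = "model-rollout-v2") -> int:
    """Stable bucket in [0, 10000), using the same salted-MD5 scheme as above."""
    digest = hashlib.md5(f"{salt}:{user_id}".encode()).hexdigest()
    return int(digest, 16) % 10000

def in_canary(user_id: str, canary_fraction: float) -> bool:
    return bucket_for(user_id) < round(canary_fraction * 10000)

users = [f"user-{i}" for i in range(20000)]  # synthetic IDs for illustration
at_1pct = {u for u in users if in_canary(u, 0.01)}
at_5pct = {u for u in users if in_canary(u, 0.05)}

# Everyone in the 1% stage is still in the canary at the 5% stage
assert at_1pct <= at_5pct
print(f"1% stage: {len(at_1pct) / len(users):.2%}, 5% stage: {len(at_5pct) / len(users):.2%}")
```

Changing the salt reshuffles every assignment, so the experiment name should stay fixed for the whole ramp.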

Canary monitoring dashboard

Track these metrics per variant in real time:

from dataclasses import dataclass, field
from collections import defaultdict
import numpy as np

@dataclass
class VariantMetrics:
    predictions: list = field(default_factory=list)
    latencies_ms: list = field(default_factory=list)
    errors: int = 0
    requests: int = 0

    @property
    def error_rate(self) -> float:
        return self.errors / max(self.requests, 1)

    @property
    def p99_latency(self) -> float:
        return np.percentile(self.latencies_ms, 99) if self.latencies_ms else 0

    @property
    def mean_prediction(self) -> float:
        return np.mean(self.predictions) if self.predictions else 0

metrics = defaultdict(VariantMetrics)

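def record_prediction(variant: str, prediction: float, latency_ms: float):
    m = metrics[variant]
    m.predictions.append(prediction)
    m.latencies_ms.append(latency_ms)
    m.requests += 1

def record_error(variant: str):
    # Failed requests must be counted too, otherwise error_rate is always 0
    m = metrics[variant]
    m.errors += 1
    m.requests += 1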

def check_canary_health() -> dict:
    v1, v2 = metrics["v1"], metrics["v2"]

    # Auto-rollback triggers
    alerts = []
    if v2.error_rate > v1.error_rate * 2 and v2.error_rate > 0.01:
        alerts.append("ERROR_RATE_REGRESSION")
    # Skip the latency check until v1 has data (an empty baseline reports p99 = 0)
    if v1.latencies_ms and v2.p99_latency > v1.p99_latency * 1.5:
        alerts.append("LATENCY_REGRESSION")

    # Prediction distribution shift
    if len(v1.predictions) > 1000 and len(v2.predictions) > 1000:
        from scipy import stats
        _, p = stats.ks_2samp(v1.predictions[-1000:], v2.predictions[-1000:])
        if p < 0.001:
            alerts.append("PREDICTION_DISTRIBUTION_SHIFT")

    return {"alerts": alerts, "v1": v1, "v2": v2}

Statistical significance check before ramp-up

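import numpy as np
from scipy import stats

def is_experiment_significant(
    v1_metric: list[float],
    v2_metric: list[float],
    min_samples: int = 1000,
    alpha: float = 0.05,
) -> dict:
    """Compare a metric between variants; assumes higher values are better."""
    if len(v1_metric) < min_samples or len(v2_metric) < min_samples:
        return {"significant": False, "reason": "insufficient_samples"}

    # Welch's t-test: does not assume equal variances, which is safer when
    # the canary group is much smaller than the control group
    t_stat, p_value = stats.ttest_ind(v1_metric, v2_metric, equal_var=False)

    v1_mean = np.mean(v1_metric)
    if v1_mean == 0:
        # Relative lift is undefined against a zero baseline
        return {"significant": False, "reason": "zero_baseline_mean"}
    lift = (np.mean(v2_metric) - v1_mean) / v1_mean

    significant = p_value < alpha
    if not significant or lift == 0:
        recommendation = "continue_experiment"
    elif lift > 0:
        recommendation = "rollout"
    else:
        recommendation = "rollback"

    return {
        "significant": significant,
        "p_value": p_value,
        "lift": lift,
        "direction": "positive" if lift > 0 else "negative",
        "recommendation": recommendation,
    }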
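
A fixed sample floor guards against reading noise, but it does not say whether the experiment can detect the lift you care about. As a rough planning aid, the sketch below estimates the per-variant sample size for a conversion-style metric using the standard normal approximation for a two-sided two-proportion test. The function name `samples_per_variant` and its defaults (5% significance, 80% power) are assumptions for illustration, and the formula assumes equal-sized groups, which a small canary split does not satisfy.

```python
import math
from scipy.stats import norm

def samples_per_variant(
    baseline_rate: float,
    min_detectable_lift: float,
    alpha: float = 0.05,
    power: float = 0.8,
) -> int:
    """Approximate per-variant n for a two-sided two-proportion z-test."""
    p1 = baseline_rate
    p2 = baseline_rate * (1 + min_detectable_lift)  # relative lift
    z_alpha = norm.ppf(1 - alpha / 2)
    z_power = norm.ppf(power)
    variance = p1 * (1 - p1) + p2 * (1 - p2)
    return math.ceil((z_alpha + z_power) ** 2 * variance / (p2 - p1) ** 2)

# Example: 5% baseline click-through rate, detecting a 2% relative lift
print(samples_per_variant(baseline_rate=0.05, min_detectable_lift=0.02))
```

Small relative lifts on low base rates need far more than 1,000 samples per variant, so it helps to run this estimate before choosing how long each canary stage lasts.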

Pattern 3: Interleaving (for Ranking Systems)

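Interleaving is a widely used technique for comparing ranking models. Every user sees results from both models, so each user acts as their own control and between-user variance drops out of the comparison. As a result it can require 10–100× fewer samples than A/B testing to reach significance. Randomizing which model picks first keeps either model from systematically occupying the top positions.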

How it works

Instead of showing user A results from model 1 and user B results from model 2, you show every user a blended list that mixes results from both models. Attribution is based on which model contributed the clicked/purchased item.

import random

def team_draft_interleave(list_a: list, list_b: list, size: int) -> tuple[list, dict[int, str]]:
    """
    Team Draft Interleaving: a widely used interleaving method.
    Returns (interleaved_list, item_to_team_mapping).
    """
    result = []
    attribution = {}  # item_id → "A" or "B"
    seen = set()      # items already placed, for O(1) duplicate checks

    a_idx, b_idx = 0, 0
    team_a_picks, team_b_picks = 0, 0

    while len(result) < size:
        # Skip items that have already been placed by either team
        while a_idx < len(list_a) and list_a[a_idx] in seen:
            a_idx += 1
        while b_idx < len(list_b) and list_b[b_idx] in seen:
            b_idx += 1

        a_can_pick = a_idx < len(list_a)
        b_can_pick = b_idx < len(list_b)
        if not (a_can_pick or b_can_pick):
            break  # both lists exhausted

        # Team with fewer picks gets to pick next (coin flip on tie)
        a_turn = team_a_picks < team_b_picks or (
            team_a_picks == team_b_picks and random.random() < 0.5
        )
        # If the team whose turn it is has nothing left, the other team picks,
        # so the loop always makes progress
        if (a_turn and a_can_pick) or not b_can_pick:
            item = list_a[a_idx]
            attribution[item] = "A"
            team_a_picks += 1
            a_idx += 1
        else:
            item = list_b[b_idx]
            attribution[item] = "B"
            team_b_picks += 1
            b_idx += 1

        result.append(item)
        seen.add(item)

    return result, attribution

# Score after collecting user interactions
def score_interleaving_experiment(interaction_log: list[dict]) -> dict:
    wins_a, wins_b = 0, 0

    for session in interaction_log:
        attribution = session["attribution"]
        clicks = session["clicked_items"]

        a_clicks = sum(1 for item in clicks if attribution.get(item) == "A")
        b_clicks = sum(1 for item in clicks if attribution.get(item) == "B")

        if a_clicks > b_clicks:
            wins_a += 1
        elif b_clicks > a_clicks:
            wins_b += 1

    total = wins_a + wins_b
    return {
        "wins_A": wins_a,
        "wins_B": wins_b,
        "win_rate_B": wins_b / total if total > 0 else 0,
        "total_decisive_sessions": total,
    }
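
A raw win rate does not say whether the difference could be chance. One simple option, sketched below, is a sign test: under the null hypothesis of no preference, each decisive session is a fair coin flip, so the number of B wins follows a binomial distribution with p = 0.5. The function name `interleaving_significance` and the default `alpha` are assumptions for illustration; it takes the `wins_A` and `wins_B` counts from the scoring output.

```python
from scipy.stats import binomtest

def interleaving_significance(wins_a: int, wins_b: int, alpha: float = 0.05) -> dict:
    """Two-sided sign test on decisive interleaving sessions (ties excluded)."""
    total = wins_a + wins_b
    if total == 0:
        return {"significant": False, "p_value": 1.0, "winner": None}

    # H0: each decisive session is equally likely to favor A or B
    p_value = float(binomtest(wins_b, n=total, p=0.5, alternative="two-sided").pvalue)
    significant = p_value < alpha

    winner = None
    if significant:
        winner = "B" if wins_b > wins_a else "A"
    return {"significant": significant, "p_value": p_value, "winner": winner}

print(interleaving_significance(wins_a=400, wins_b=600))
```

Sessions where neither model wins carry no information for this test, which is why only decisive sessions are counted.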

Rollout Decision Checklist

Before moving from one stage to the next:

Shadow → Canary (1%)

  • Shadow ran for ≥ 24 hours of production traffic
  • Prediction distribution matches expected range
  • No crashes or timeout spikes in shadow logs
  • Decision agreement rate > 85% (unless expecting major changes)

Canary 1% → 5%

  • Error rate and p99 latency parity with v1
  • No prediction distribution anomalies
  • At least 1,000 samples per variant

Canary 5% → 20% → 50% → 100%

  • Primary metric shows a statistically significant positive lift, or no statistically significant regression
  • No segment regressions in top 3 user segments
  • On-call engineer aware and monitoring

Auto-rollback triggers (should be automated)

  • Error rate > 2× baseline
  • p99 latency > 1.5× baseline
  • Prediction mean deviation > 20% from baseline
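
The three triggers above can be sketched as a single check that a monitoring job might run on a schedule. This is a minimal illustration, not a production implementation: the `Snapshot` dataclass, the `rollback_reasons` function, and the reason strings are hypothetical names, and the 1% absolute error floor mirrors the one used in `check_canary_health` rather than anything in the checklist itself.

```python
from dataclasses import dataclass

@dataclass
class Snapshot:
    error_rate: float
    p99_latency_ms: float
    mean_prediction: float

def rollback_reasons(baseline: Snapshot, canary: Snapshot, error_floor: float = 0.01) -> list[str]:
    """Return the list of triggered rollback conditions (empty means healthy)."""
    reasons = []

    # Error rate > 2x baseline, ignoring noise below an absolute floor (assumption)
    if canary.error_rate > 2 * baseline.error_rate and canary.error_rate > error_floor:
        reasons.append("ERROR_RATE")

    # p99 latency > 1.5x baseline
    if canary.p99_latency_ms > 1.5 * baseline.p99_latency_ms:
        reasons.append("P99_LATENCY")

    # Prediction mean deviates by more than 20% from baseline
    if baseline.mean_prediction != 0:
        deviation = abs(canary.mean_prediction - baseline.mean_prediction) / abs(baseline.mean_prediction)
        if deviation > 0.20:
            reasons.append("PREDICTION_MEAN")

    return reasons

baseline = Snapshot(error_rate=0.01, p99_latency_ms=100.0, mean_prediction=0.30)
healthy = Snapshot(error_rate=0.015, p99_latency_ms=120.0, mean_prediction=0.31)
degraded = Snapshot(error_rate=0.03, p99_latency_ms=200.0, mean_prediction=0.40)
print(rollback_reasons(baseline, healthy))
print(rollback_reasons(baseline, degraded))
```

Returning the reasons rather than a boolean makes it easier to attach the cause to the rollback alert.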
