The 80% Problem
It is often claimed that "data scientists spend 80% of their time on data preparation." Whether or not the figure is precisely true, the implication is real: data quality determines model quality, and most real-world data requires substantial preparation.
This guide covers the most important preprocessing patterns with production-ready implementations.
Handling Missing Values
Understanding Missingness Types
Before imputing, understand why data is missing:
- MCAR (Missing Completely at Random): Missingness is random — no relationship to observed or unobserved data. Safe to impute.
- MAR (Missing at Random): Missingness depends on observed data. Imputation based on observed features is valid.
- MNAR (Missing Not at Random): Missingness relates to the missing value itself. Example: people with high income skip income field. Imputation can introduce bias.
import pandas as pd
import numpy as np
def analyze_missingness(df: pd.DataFrame) -> pd.DataFrame:
    """Summarize missing values per column.

    Returns a DataFrame with a "count" and a "percent" column, restricted
    to columns that have at least one missing value, sorted by percent
    missing in descending order.
    """
    null_counts = df.isnull().sum()  # computed once, reused for both columns
    summary = pd.DataFrame({
        "count": null_counts,
        "percent": (null_counts / len(df) * 100).round(2),
    })
    has_missing = summary["count"] > 0
    return summary.loc[has_missing].sort_values("percent", ascending=False)
# Example usage — assumes `df` is an already-loaded DataFrame:
print(analyze_missingness(df))
# count percent
# income 1250 12.50
# last_login_days 89 0.89
# acquisition_channel 12 0.12
Imputation Strategies
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer # noqa
from sklearn.impute import IterativeImputer
# Simple imputation (fast, works for most cases)
numerical_imputer = SimpleImputer(strategy="median")  # Robust to outliers
categorical_imputer = SimpleImputer(strategy="most_frequent")
# KNN imputation (better for features with complex relationships)
# NOTE: KNN distances are scale-sensitive — scale features before imputing.
knn_imputer = KNNImputer(n_neighbors=5)
# Iterative imputation (best quality, slowest)
# Each feature imputed as a function of all others
# (requires the enable_iterative_imputer experimental import above)
iterative_imputer = IterativeImputer(max_iter=10, random_state=42)
The Missing Indicator Pattern
For MNAR data, add a binary indicator for missingness:
from sklearn.impute import MissingIndicator
from sklearn.pipeline import FeatureUnion
# Income is often MNAR — missing correlates with value
# Create both imputed value AND indicator
income_pipeline = FeatureUnion([
    ("imputed", SimpleImputer(strategy="median")),
    ("indicator", MissingIndicator()),
])
# Now the model sees both: imputed_income and income_was_missing
# NOTE(review): MissingIndicator defaults to features="missing-only", so a
# column with no missing values in TRAIN emits no indicator — confirm this
# is acceptable, or use SimpleImputer(add_indicator=True) as an alternative.
Outlier Handling
Detection
def detect_outliers(series: pd.Series, method: str = "iqr") -> pd.Series:
    """Flag outliers in *series*, returning a boolean Series (True = outlier).

    Methods:
        "iqr":    value outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]
        "zscore": |z| > 3 (pandas .std() uses sample std, ddof=1)
        "isolation_forest": IsolationForest with 5% contamination

    Raises:
        ValueError: if *method* is not one of the supported names.
    """
    if method == "iqr":
        Q1, Q3 = series.quantile([0.25, 0.75])
        IQR = Q3 - Q1
        lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR
        return (series < lower) | (series > upper)
    elif method == "zscore":
        z_scores = (series - series.mean()) / series.std()
        return z_scores.abs() > 3
    elif method == "isolation_forest":
        # Local import keeps the cheap methods free of the sklearn dependency.
        from sklearn.ensemble import IsolationForest
        # random_state makes results reproducible (the original was nondeterministic).
        iso = IsolationForest(contamination=0.05, random_state=42)
        flags = iso.fit_predict(series.values.reshape(-1, 1)) == -1
        # Wrap as a Series aligned to the input index, matching the other
        # branches and the declared return type.
        return pd.Series(flags, index=series.index)
    else:
        # The original fell through and silently returned None here.
        raise ValueError(f"Unknown outlier detection method: {method!r}")
Treatment
def winsorize(series: pd.Series, lower_q: float = 0.01, upper_q: float = 0.99) -> pd.Series:
    """Clamp extreme values to percentile bounds instead of dropping rows."""
    bounds = series.quantile([lower_q, upper_q])
    return series.clip(lower=bounds.iloc[0], upper=bounds.iloc[1])
# Apply to training data
# Compute clipping bounds from TRAIN ONLY and remember them, so the exact
# same transformation can be replayed on test (and later, serving) data.
train_bounds = {}
for col in numerical_cols:
    train_bounds[col] = {
        "lower": X_train[col].quantile(0.01),
        "upper": X_train[col].quantile(0.99),
    }
    X_train[col] = X_train[col].clip(**train_bounds[col])
# Apply SAME bounds to test data (computed from training data)
for col in numerical_cols:
    X_test[col] = X_test[col].clip(**train_bounds[col])
The key pattern: compute bounds from training data, apply to test. Never recompute on test data.
Categorical Encoding at Scale
Low Cardinality: One-Hot Encoding
from sklearn.preprocessing import OneHotEncoder
encoder = OneHotEncoder(
    handle_unknown="ignore",  # Unknown categories at test time → all zeros
    sparse_output=False,
    drop="first",  # Avoid multicollinearity for linear models
)
# NOTE(review): with drop="first" AND handle_unknown="ignore", an unknown
# category encodes as all zeros — identical to the dropped first category,
# so the two are indistinguishable to the model. Confirm this is intended.
# Fit on training data only
encoder.fit(X_train[cat_cols])
High Cardinality: Target Encoding with Cross-fitting
from sklearn.model_selection import KFold
import numpy as np
def target_encode_cv(X_train, y_train, col, n_splits=5, smoothing=10):
    """
    Target encoding with cross-fitting to prevent leakage.

    Each sample's encoding uses only data from the OTHER folds: both the
    per-category means and the smoothing prior (the global mean) are
    computed on the training fold, so no sample's own target value ever
    influences its encoding. (Previously the prior was computed on all of
    y_train, which leaked the validation fold's targets.)

    Args:
        X_train: feature DataFrame containing *col*.
        y_train: target Series positionally aligned with X_train.
        col: name of the categorical column to encode.
        n_splits: number of cross-fitting folds.
        smoothing: pseudo-count weight pulling rare categories toward the
            fold's global mean.

    Returns:
        np.ndarray of encoded values, one per row of X_train.
    """
    encodings = np.zeros(len(X_train))
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    for train_idx, val_idx in kf.split(X_train):
        fold_target = y_train.iloc[train_idx]
        # Prior from the train fold only — no validation-fold targets.
        fold_global_mean = fold_target.mean()
        # Compute mean target per category on the train fold
        fold_stats = (
            pd.DataFrame({"cat": X_train.iloc[train_idx][col], "target": fold_target})
            .groupby("cat")["target"]
            .agg(["mean", "count"])
        )
        # Smoothing: blend category mean with the fold's global mean,
        # weighted by how many samples the category has
        fold_stats["smoothed"] = (
            (fold_stats["mean"] * fold_stats["count"] + fold_global_mean * smoothing)
            / (fold_stats["count"] + smoothing)
        )
        # Apply to validation fold; categories unseen in the train fold
        # fall back to the fold's prior.
        encodings[val_idx] = X_train.iloc[val_idx][col].map(
            fold_stats["smoothed"]
        ).fillna(fold_global_mean)
    return encodings
# For test data: use encoding trained on all training data
def fit_target_encoder(X_train, y_train, col, smoothing=10):
    """Fit a smoothed target encoding on the full training set.

    Returns a (mapping, global_mean) pair: *mapping* maps each category to
    its smoothed mean target; *global_mean* is the fallback value for
    categories unseen at training time.
    """
    global_mean = y_train.mean()
    grouped = (
        pd.DataFrame({"cat": X_train[col], "target": y_train})
        .groupby("cat")["target"]
    )
    means = grouped.mean()
    counts = grouped.count()
    # Shrink small-count categories toward the global mean.
    smoothed = (means * counts + global_mean * smoothing) / (counts + smoothing)
    return smoothed.to_dict(), global_mean
Data Splitting Patterns
Standard Split (Tabular, IID Data)
from sklearn.model_selection import train_test_split
# 60/20/20 split — test set touched only once for final evaluation
# 60/20/20 split — test set touched only once for final evaluation
# stratify=y keeps class proportions identical across splits (classification).
X_temp, X_test, y_temp, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
# Second split carves validation out of the remaining 80%.
X_train, X_val, y_train, y_val = train_test_split(
    X_temp, y_temp, test_size=0.25, random_state=42, stratify=y_temp
)
# 0.25 * 0.8 = 0.2 → 60/20/20
Time Series Split
# Temporal split: no shuffling, gap between train and test
def temporal_split(df, date_col, val_months=2, test_months=2, gap_days=7):
    """Split a time-indexed DataFrame into train/val/test by date.

    Layout (oldest → newest):

        train | gap_days | val (val_months) | gap_days | test (test_months)

    A gap of *gap_days* separates EACH pair of adjacent splits so that
    short-horizon leakage (e.g. rolling-window features) cannot cross a
    boundary. The prior version left no gap between val and test and let
    the val window run gap_days longer than val_months.

    Args:
        df: DataFrame with a datetime column *date_col*.
        date_col: name of the datetime column to split on.
        val_months / test_months: window lengths for val and test.
        gap_days: buffer between adjacent splits.

    Returns:
        (train, val, test) DataFrames.
    """
    max_date = df[date_col].max()
    test_start = max_date - pd.DateOffset(months=test_months)
    val_end = test_start - pd.Timedelta(days=gap_days)    # gap before test
    val_start = val_end - pd.DateOffset(months=val_months)
    train_end = val_start - pd.Timedelta(days=gap_days)   # gap before val
    train = df[df[date_col] <= train_end]
    val = df[(df[date_col] >= val_start) & (df[date_col] < val_end)]
    test = df[df[date_col] >= test_start]
    return train, val, test
Group-Based Split
For data where rows aren't independent (multiple rows per user, per document):
from sklearn.model_selection import GroupShuffleSplit
# Ensure all rows for a given user_id are in the same split
# (otherwise the model can memorize a user seen in training and score
# optimistically on that user's test rows).
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
# split() yields (train_idx, test_idx) pairs; n_splits=1 → take the first.
train_idx, test_idx = next(gss.split(X, y, groups=df["user_id"]))
X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]
# Verify no user_id overlap
assert len(
    set(df.iloc[train_idx]["user_id"]) & set(df.iloc[test_idx]["user_id"])
) == 0
The Pipeline Pattern (Putting It Together)
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
# Numerical: impute → winsorize → scale
# NOTE(review): the winsorize step named above is not actually present in
# this pipeline; StandardScaler also needs importing from
# sklearn.preprocessing — confirm both against the full script.
numerical_pipeline = Pipeline([
    ("impute", SimpleImputer(strategy="median")),
    ("scale", StandardScaler()),
])
# Categorical: impute → encode
# Missing categories become an explicit "MISSING" level before one-hot.
categorical_pipeline = Pipeline([
    ("impute", SimpleImputer(strategy="constant", fill_value="MISSING")),
    ("encode", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
])
# Route each column list through its matching sub-pipeline.
preprocessor = ColumnTransformer([
    ("num", numerical_pipeline, numerical_cols),
    ("cat", categorical_pipeline, categorical_cols),
])
# Full pipeline
# Bundling preprocessing with the model guarantees identical
# transformations at fit and predict time. XGBClassifier requires xgboost.
pipeline = Pipeline([
    ("preprocess", preprocessor),
    ("model", XGBClassifier(n_estimators=200, learning_rate=0.05)),
])
pipeline.fit(X_train, y_train)
# Save — preprocessing params included
import joblib
joblib.dump(pipeline, "model_pipeline.joblib")
Next: learn how features flow from preprocessing through feature stores to model serving in our feature store guide.