17  Data pipelines: ETL and feature stores

17.1 The glue code problem

A common misconception about machine learning systems is that they’re mostly model code — the elegant lines where you fit a random forest or tune a gradient boosting classifier. In practice, the model is a small box in the centre of a much larger system. The rest is data pipelines: extracting raw data from sources, transforming it into features the model can consume, loading it into the right place at the right time, and doing all of this reliably, repeatedly, and at scale.

Google’s influential paper on technical debt in ML systems (Sculley et al., “Hidden Technical Debt in Machine Learning Systems”, 2015) called this “glue code”: the infrastructure that connects data sources to models and models to decisions. The authors observed that a mature system might end up being at most 5% machine learning code and at least 95% glue code. The bulk of the codebase is data collection, feature extraction, validation, serving infrastructure, and monitoring.

If you’ve built microservices, this ratio will feel familiar. The business logic in a typical service is a thin layer; the rest is routing, serialisation, error handling, retries, logging, and health checks. Data pipelines are the same pattern applied to data: the statistical transformation is the thin layer, surrounded by engineering that makes it reliable.

This chapter covers the engineering patterns that make data pipelines robust. If Section 16.1 was about ensuring your results are reproducible, this chapter is about ensuring the data that feeds those results is correct, consistent, and available when you need it.

17.2 ETL: a pattern you already know

ETL stands for Extract, Transform, Load — a three-stage pattern for moving data from source systems into a form suitable for analysis or modelling.

Extract reads raw data from its source: a database, an API, a CSV dump, a message queue, or an event stream. The key engineering concern is that sources are unreliable. APIs have rate limits. Database queries can time out. CSV files arrive late, malformed, or not at all. Every extraction step needs error handling, retries, and validation — the same defensive programming you’d apply to any external dependency.

Transform converts raw data into a useful shape. This might mean joining tables, filtering rows, computing derived columns, handling missing values, encoding categorical variables, or normalising numerical features. Transforms are where most bugs live, because they encode domain assumptions. A transform that fills missing revenue with zero is making a very different assumption from one that fills it with the column median — and both might be wrong.

Load writes the transformed data to its destination: a data warehouse, a feature store, a parquet file, or a model’s input buffer. The key engineering concern is atomicity — if a load fails partway through, the destination should not be left in an inconsistent state. This is the same problem as database transactions, and the solutions are similar: write to a staging location first, validate, then swap.
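
The staging-then-swap idea can be sketched with the standard library alone. This is a minimal illustration rather than a production loader: the function name `atomic_write_csv`, the row-count check, and the use of CSV instead of parquet are all assumptions made to keep the example dependency-free. It relies on `os.replace`, which renames atomically when the staging file and the destination are on the same filesystem.

```python
import csv
import os
import tempfile


def atomic_write_csv(rows: list[dict], dest: str) -> None:
    """Write rows to a staging file, validate it, then swap it into place."""
    if not rows:
        raise ValueError("refusing to load an empty dataset")

    dest_dir = os.path.dirname(os.path.abspath(dest))
    os.makedirs(dest_dir, exist_ok=True)

    # Stage inside the destination directory so the final rename stays on
    # one filesystem, which is what makes os.replace atomic.
    fd, staging = tempfile.mkstemp(dir=dest_dir, suffix=".staging")
    try:
        with os.fdopen(fd, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
            writer.writeheader()
            writer.writerows(rows)

        # Validate the staged file before any reader can see it.
        with open(staging, newline="") as f:
            written = sum(1 for _ in csv.DictReader(f))
        if written != len(rows):
            raise ValueError(f"expected {len(rows)} rows, staged {written}")

        os.replace(staging, dest)  # readers see the old file or the new one
    except BaseException:
        # Clean up the staging file; the destination is left untouched.
        if os.path.exists(staging):
            os.remove(staging)
        raise
```

If the write or the validation fails, the exception propagates and the previous version of the destination file is still in place, which is the property the load stage needs.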

import pandas as pd
import numpy as np

rng = np.random.default_rng(42)

# ---- Extract: simulate reading from a messy source ----
raw = pd.DataFrame({
    "user_id": range(1, 1001),
    "signup_date": pd.date_range("2023-01-01", periods=1000, freq="8h"),
    "revenue": rng.exponential(50, 1000),
    "country": rng.choice(["GB", "US", "DE", "FR", None], 1000,
                          p=[0.3, 0.3, 0.15, 0.15, 0.1]),
    "sessions": rng.poisson(12, 1000),
})

# Introduce realistic messiness
raw.loc[rng.choice(1000, 30, replace=False), "revenue"] = np.nan
raw.loc[rng.choice(1000, 15, replace=False), "sessions"] = -1  # bad data

print(f"Raw: {raw.shape[0]} rows, {raw.isna().sum().sum()} nulls, "
      f"{(raw['sessions'] < 0).sum()} invalid sessions")
Raw: 1000 rows, 137 nulls, 15 invalid sessions

The raw data have the problems you’d expect from a real source: missing values (in revenue and country) and invalid entries (negative session counts). The transform step is where we impose structure.

# ---- Transform: clean, validate, and derive features ----
def transform_user_data(df: pd.DataFrame) -> pd.DataFrame:
    """Clean raw user data and compute features for modelling."""
    clean = df.copy()

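    # Handle invalid sessions: mask negatives to NaN (filled with the median below).
    # Series.where upcasts the integer column to float so NaN can be stored;
    # assigning NaN into an int64 column via .loc is deprecated in recent pandas.
    clean["sessions"] = clean["sessions"].where(clean["sessions"] >= 0)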

    # Fill missing revenue with median — assumes data are missing completely
    # at random (MCAR). If missingness is related to the value itself
    # (e.g. high earners less likely to report), this biases the result.
    clean["revenue"] = clean["revenue"].fillna(clean["revenue"].median())

    # Fill missing sessions with median
    clean["sessions"] = clean["sessions"].fillna(clean["sessions"].median())

    # Fill missing country with "Unknown" rather than dropping rows
    clean["country"] = clean["country"].fillna("Unknown")

    # Derive features
    clean["revenue_per_session"] = clean["revenue"] / clean["sessions"].clip(lower=1)
    clean["days_since_signup"] = (
        pd.Timestamp("2024-01-01") - clean["signup_date"]
    ).dt.days
    clean["is_high_value"] = (clean["revenue"] > clean["revenue"].quantile(0.75)).astype(int)

    return clean

features = transform_user_data(raw)
print(f"Transformed: {features.shape[0]} rows, {features.isna().sum().sum()} nulls")
print(f"New columns: {[c for c in features.columns if c not in raw.columns]}")
Transformed: 1000 rows, 0 nulls
New columns: ['revenue_per_session', 'days_since_signup', 'is_high_value']

# ---- Load: write to a versioned output with validation ----
import hashlib
import io

def load_with_validation(df: pd.DataFrame, path: str) -> dict:
    """Write a DataFrame to parquet with a content hash for verification."""
    buf = io.BytesIO()
    df.to_parquet(buf, index=False)
    content_hash = hashlib.sha256(buf.getvalue()).hexdigest()[:16]

    # In production, write to path; here we just report
    record = {
        "rows": len(df),
        "columns": list(df.columns),
        "content_hash": content_hash,
        "null_count": int(df.isna().sum().sum()),
    }
    return record

manifest = load_with_validation(features, "data/features/v=2024-01-01/users.parquet")
print(f"Load manifest: {manifest}")
Load manifest: {'rows': 1000, 'columns': ['user_id', 'signup_date', 'revenue', 'country', 'sessions', 'revenue_per_session', 'days_since_signup', 'is_high_value'], 'content_hash': '990ea38c175be4b2', 'null_count': 0}

The content hash from the load step connects directly to the data reproducibility patterns in Section 16.4 — it’s a fingerprint that lets you verify that the feature data hasn’t changed between when it was written and when a model reads it.
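
On the reading side, the check might look like the sketch below. The helper names `fingerprint` and `verify_fingerprint` are hypothetical, and the byte strings stand in for the file contents; the only thing taken from the load step is the truncated SHA-256 digest. A 16-character prefix is 64 bits of the digest, which is plenty for detecting accidental changes but should not be treated as a security control.

```python
import hashlib


def fingerprint(payload: bytes, length: int = 16) -> str:
    """Truncated SHA-256 hex digest, matching the load step's content hash."""
    return hashlib.sha256(payload).hexdigest()[:length]


def verify_fingerprint(payload: bytes, expected: str) -> None:
    """Raise if the bytes read back do not match the recorded hash."""
    actual = fingerprint(payload, len(expected))
    if actual != expected:
        raise ValueError(f"content hash mismatch: {actual} != {expected}")


written = b"user_id,revenue\n1,10.5\n"
recorded = fingerprint(written)        # stored in the manifest at load time

verify_fingerprint(written, recorded)  # unchanged bytes pass silently

try:
    verify_fingerprint(written + b"2,99.0\n", recorded)
except ValueError as exc:
    print(f"Rejected: {exc}")
```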

Engineering Bridge

If you’ve built microservices, ETL maps cleanly onto a familiar pattern. Extract is calling an upstream service or reading from a database — you’d wrap it in retries with exponential backoff, validate the response schema, and handle timeouts. Transform is your business logic layer — pure functions that convert one data shape into another, testable in isolation. Load is writing to your own data store — you’d use transactions or write-ahead patterns to ensure consistency. The main difference is scale: a microservice processes one request at a time; an ETL pipeline processes an entire dataset in a single batch. But the engineering principles — idempotency, validation, error handling, atomicity — are identical.
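
To make the extract half of that analogy concrete, here is a minimal retry wrapper with exponential backoff and jitter. It is a sketch under assumptions: `with_retries` and `flaky_extract` are hypothetical names, the retryable exceptions are limited to `ConnectionError` and `TimeoutError`, and the `sleep` parameter exists only so the example can run without real delays.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(fetch: Callable[[], T], attempts: int = 3,
                 base_delay: float = 0.5,
                 sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fetch(), retrying transient failures with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return fetch()
        except (ConnectionError, TimeoutError):
            if attempt == attempts - 1:
                raise  # out of attempts: surface the failure
            # The delay doubles each attempt; jitter avoids synchronised retries.
            delay = base_delay * 2 ** attempt
            sleep(delay + random.uniform(0, delay / 2))
    raise AssertionError("unreachable")


calls = {"n": 0}


def flaky_extract() -> list[dict]:
    """Simulated source that fails twice before responding."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("upstream unavailable")
    return [{"user_id": 1, "revenue": 10.5}]


rows = with_retries(flaky_extract, attempts=3, sleep=lambda seconds: None)
print(f"Succeeded after {calls['n']} calls: {rows}")
```

Retrying is only safe when the extraction is idempotent: reading the same source twice must not change it or double-count anything downstream.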

17.3 Feature engineering: the transform layer

In the ETL pattern, the transform step is where data science diverges most from traditional data engineering. A data engineer’s transform typically cleans and restructures data — joins, filters, type conversions. A data scientist’s transform also creates features: derived quantities that encode domain knowledge in a form that models can learn from.

Feature engineering is the process of turning raw data into model inputs. It sounds mechanical, but it’s one of the highest-leverage activities in applied data science. A good feature can improve a model more than switching algorithms or tuning hyperparameters.

import pandas as pd
import numpy as np

rng = np.random.default_rng(42)

# Simulated e-commerce transaction data
n = 2000
transactions = pd.DataFrame({
    "user_id": rng.integers(1, 201, n),
    "timestamp": pd.date_range("2023-06-01", periods=n, freq="37min"),
    "amount": np.round(rng.lognormal(3, 1, n), 2),
    "category": rng.choice(["electronics", "clothing", "food", "books"], n),
})

# Feature engineering: aggregate per-user behaviour
user_features = transactions.groupby("user_id").agg(
    total_spend=("amount", "sum"),
    avg_transaction=("amount", "mean"),
    transaction_count=("amount", "count"),
    unique_categories=("category", "nunique"),
    max_single_purchase=("amount", "max"),
).reset_index()

# Derived ratios
user_features["avg_to_max_ratio"] = (
    user_features["avg_transaction"] / user_features["max_single_purchase"]
)

print(user_features.describe().round(2))
       user_id  total_spend  avg_transaction  transaction_count  \
count   200.00       200.00           200.00             200.00   
mean    100.50       323.36            31.83              10.00   
std      57.88       175.18            13.03               3.14   
min       1.00        36.76             6.13               3.00   
25%      50.75       193.16            22.23               8.00   
50%     100.50       299.44            29.96              10.00   
75%     150.25       422.16            37.69              12.00   
max     200.00      1082.31            79.26              19.00   

       unique_categories  max_single_purchase  avg_to_max_ratio  
count             200.00               200.00            200.00  
mean                3.71               104.72              0.37  
std                 0.51                74.71              0.12  
min                 2.00                10.00              0.13  
25%                 3.00                53.85              0.28  
50%                 4.00                83.82              0.35  
75%                 4.00               123.71              0.45  
max                 4.00               428.17              0.80  

Each of these features encodes a specific hypothesis about user behaviour: users who spend across many categories might behave differently from those who concentrate in one; users whose average transaction is close to their maximum are consistent spenders, while those with a low ratio make occasional large purchases. The model doesn’t know these hypotheses — it just sees numbers. The feature engineer’s job is to present the numbers in a way that makes the underlying patterns learnable.
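
The aggregates above summarise each user’s entire history. Hypotheses about recent behaviour need a cutoff date and a trailing window instead. The sketch below shows one way to compute them; the function name `trailing_features`, the toy transactions, and the 30-day default are assumptions for illustration. Only rows strictly before the cutoff are used, so the features describe what was known at that moment.

```python
import pandas as pd

tx = pd.DataFrame({
    "user_id": [1, 1, 1, 2, 2],
    "timestamp": pd.to_datetime([
        "2023-05-01", "2023-06-10", "2023-06-25",
        "2023-06-05", "2023-07-02",
    ]),
    "amount": [20.0, 35.0, 15.0, 50.0, 80.0],
})


def trailing_features(df: pd.DataFrame, as_of: pd.Timestamp,
                      window_days: int = 30) -> pd.DataFrame:
    """Per-user recency and windowed spend from rows strictly before as_of."""
    history = df[df["timestamp"] < as_of]
    window_start = as_of - pd.Timedelta(days=window_days)
    recent = history[history["timestamp"] >= window_start]

    out = history.groupby("user_id").agg(last_purchase=("timestamp", "max"))
    out["days_since_last_purchase"] = (as_of - out["last_purchase"]).dt.days
    # Users with history but no purchases inside the window get zero spend.
    out["spend_in_window"] = (
        recent.groupby("user_id")["amount"].sum()
        .reindex(out.index, fill_value=0.0)
    )
    return out.drop(columns="last_purchase").reset_index()


result = trailing_features(tx, pd.Timestamp("2023-07-01"))
print(result)
```

With a cutoff of 1 July 2023, user 2’s purchase on 2 July is excluded, and user 1’s purchase on 1 May falls outside the 30-day window.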

Author’s Note

I spent my first few months in data science thinking feature engineering was beneath me — just plumbing work before the real modelling began. Then I watched a colleague take a model I’d been tuning for a week and improve its performance by 8 percentage points in an afternoon, simply by adding three well-chosen features. The model was the same. The algorithm was the same. The hyperparameters were the same. What changed was the representation of the data. That was the day I understood that in applied data science, features matter more than algorithms.

17.4 The train/serve skew problem

One of the most insidious bugs in production ML systems is train/serve skew — a mismatch between how features are computed during training and how they’re computed at serving time. The model learns from one representation of the data but makes predictions on a subtly different one.

This happens more easily than you’d think. During training, you compute features in a batch pipeline using pandas on a static dataset. At serving time, you compute features in real time using a different code path — perhaps a Java service that reimplements the same logic but handles edge cases differently. A rounding difference, a different null-handling strategy, or a time-zone conversion that behaves differently between Python and Java, and your model’s predictions degrade silently.

import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(42)

# Training: compute features from historical data
train_data = rng.normal(50, 15, (500, 3))
scaler = StandardScaler()
X_train = scaler.fit_transform(train_data)
y_train = (X_train[:, 0] + 0.5 * X_train[:, 1] > 0).astype(int)

model = LogisticRegression(random_state=42)
model.fit(X_train, y_train)

# Serving: a new observation arrives
new_obs = np.array([[65, 40, 55]])

# Correct: use the fitted scaler from training
X_correct = scaler.transform(new_obs)
pred_correct = model.predict_proba(X_correct)[0, 1]

# Skewed: accidentally re-fit the scaler on just the new observation
bad_scaler = StandardScaler()
X_skewed = bad_scaler.fit_transform(new_obs)  # this centres on the single obs!
pred_skewed = model.predict_proba(X_skewed)[0, 1]

print(f"Correct prediction:  {pred_correct:.4f}")
print(f"Skewed prediction:   {pred_skewed:.4f}")
print(f"Difference:          {abs(pred_correct - pred_skewed):.4f}")
Correct prediction:  0.9678
Skewed prediction:   0.4463
Difference:          0.5215

The correct serving path uses the training scaler, the one fitted on the training data’s mean and standard deviation. The skewed path re-fits a scaler on the single new observation. A single point has zero variance, and scikit-learn guards against dividing by zero by substituting a scale of 1, so subtracting the observation’s own mean leaves every feature at exactly zero. The model then returns the same intercept-only probability (0.4463 here) whatever the input was. No exception or warning is raised; the code runs to completion and returns an answer. It’s just the wrong answer.

To see this at scale, we can generate many test observations and compare the correct and skewed predictions for each:

import matplotlib.pyplot as plt
import warnings

test_data = rng.normal(50, 15, (100, 3))
correct_preds = model.predict_proba(scaler.transform(test_data))[:, 1]

skewed_preds = []
for obs in test_data:
    s = StandardScaler()
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")  # defensive only: scikit-learn guards against zero variance
        x = s.fit_transform(obs.reshape(1, -1))
    skewed_preds.append(model.predict_proba(x)[0, 1])
skewed_preds = np.array(skewed_preds)

fig, ax = plt.subplots(figsize=(6, 5))
ax.scatter(correct_preds, skewed_preds, alpha=0.5, s=30, edgecolors="none")
ax.plot([0, 1], [0, 1], "--", color="grey", linewidth=1, label="Perfect agreement")
ax.set_xlabel("Correct prediction (training scaler)")
ax.set_ylabel("Skewed prediction (re-fitted scaler)")
ax.set_title("Train/serve skew distorts predictions")
ax.legend(frameon=False)
ax.set_xlim(-0.05, 1.05)
ax.set_ylim(-0.05, 1.05)
plt.tight_layout()
plt.show()

Train/serve skew in action. Each point is one of 100 test observations scored two ways: correctly (using the training scaler) and incorrectly (re-fitting a scaler per observation). Correct predictions spread from 0 to 1 along the x-axis, while the skewed predictions all take the same value just below 0.5, the model’s output when every feature is zero. Re-fitting the scaler per observation destroys the model’s discrimination.

Engineering Bridge

Train/serve skew is the data science equivalent of dev/prod parity — one of the twelve-factor app principles. In web development, you learn (usually the hard way) that if your dev environment uses SQLite but production uses PostgreSQL, you’ll discover bugs only in production. The same principle applies to feature computation: if the training pipeline computes features differently from the serving pipeline, your model will behave differently in production than in evaluation. The solution is also the same — minimise the gap. Use the same code, the same libraries, and ideally the same feature computation path for both training and serving.
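
One way to minimise the gap is to make the feature transform a single function that both the training job and the serving process import, and to persist the fitted parameters rather than re-fitting them. The sketch below uses only the standard library; `fit_scaling`, `scale`, and the JSON artefact are hypothetical stand-ins for whatever your stack uses to fit and serialise a scaler.

```python
import json
import statistics


def fit_scaling(rows: list[list[float]]) -> dict:
    """Learn per-column mean and population standard deviation."""
    columns = list(zip(*rows))
    return {
        "mean": [statistics.fmean(c) for c in columns],
        # Fall back to 1.0 for constant columns to avoid dividing by zero.
        "std": [statistics.pstdev(c) or 1.0 for c in columns],
    }


def scale(row: list[float], params: dict) -> list[float]:
    """The single feature transform, shared by training and serving."""
    return [(x - m) / s for x, m, s in zip(row, params["mean"], params["std"])]


# Training job: fit once and persist the parameters alongside the model.
train_rows = [[50.0, 40.0], [60.0, 44.0], [70.0, 36.0]]
params = fit_scaling(train_rows)
artefact = json.dumps(params)

# Serving process: load the persisted parameters; never re-fit on live data.
serving_params = json.loads(artefact)
new_obs = [65.0, 40.0]

# Parity check: both paths must produce identical features for the same input.
assert scale(new_obs, params) == scale(new_obs, serving_params)
print(scale(new_obs, serving_params))
```

A parity assertion like the last one is cheap to run in CI against a handful of recorded inputs, and it turns silent skew into a failing test.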

17.5 Feature stores: the shared library of features

A feature store is infrastructure that solves two problems at once: it provides a single, shared repository of feature definitions (eliminating duplicate feature engineering across teams), and it serves the same features consistently for both training and inference (eliminating train/serve skew).

Instead of each model reimplementing “days since last purchase” or “average transaction amount over 30 days,” these features are defined once, computed by a shared pipeline, and stored in a system that serves them to any model that needs them. Training reads historical features as of the time each training example occurred (point-in-time correctness). Serving reads the latest feature values for real-time predictions.

Feature stores typically have two storage layers. The offline store — a data warehouse or file system — holds historical features used in training, stored as a large columnar dataset keyed by entity ID and timestamp. The online store — a low-latency key-value store like Redis or DynamoDB — holds the latest feature values used in real-time serving. A synchronisation process keeps the online store up to date as the offline store is refreshed.
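
The synchronisation step can be pictured as “take the most recent row per entity and write it to the key-value store”. Here is a minimal sketch in which a plain dictionary stands in for the online store; the toy data and the name `online_store` are assumptions, and a real system would write to Redis, DynamoDB, or similar through its own client.

```python
import pandas as pd

offline = pd.DataFrame({
    "user_id": [1, 1, 2, 2],
    "snapshot_date": pd.to_datetime(
        ["2023-01-01", "2023-02-01", "2023-01-01", "2023-02-01"]),
    "total_spend_30d": [120.0, 95.5, 40.0, 61.25],
})

# Keep only the most recent snapshot for each user.
latest = (
    offline.sort_values("snapshot_date")
    .groupby("user_id")
    .tail(1)
)

# A dict keyed by entity ID stands in for the low-latency online store.
online_store = {
    int(row.user_id): {"total_spend_30d": float(row.total_spend_30d)}
    for row in latest.itertuples(index=False)
}

print(online_store[1])
```

Serving then becomes a single key lookup, while the offline store keeps the full history for training.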

import pandas as pd
import numpy as np

rng = np.random.default_rng(42)

# Simulate a feature store's offline layer: historical features keyed by
# (user_id, snapshot_date)
n_users = 50
n_snapshots = 12  # monthly snapshots

records = []
for user_id in range(1, n_users + 1):
    base_spend = rng.exponential(200)
    for month in range(n_snapshots):
        records.append({
            "user_id": user_id,
            "snapshot_date": pd.Timestamp("2023-01-01") + pd.DateOffset(months=month),
            "total_spend_30d": round(base_spend * rng.uniform(0.5, 1.5), 2),
            "transaction_count_30d": int(rng.poisson(8)),
            "days_since_last_purchase": int(rng.exponential(5)),
        })

offline_store = pd.DataFrame(records)
print(f"Offline store: {offline_store.shape[0]} rows "
      f"({n_users} users × {n_snapshots} snapshots)")
print(f"Columns: {list(offline_store.columns)}")
print()

# Point-in-time lookup: get features as they were on a specific date
def get_features_at(store: pd.DataFrame, user_id: int,
                    as_of: pd.Timestamp) -> pd.Series:
    """Retrieve features for a user as of a given date."""
    mask = (store["user_id"] == user_id) & (store["snapshot_date"] <= as_of)
    if mask.sum() == 0:
        return pd.Series(dtype=float)
    return store.loc[mask].sort_values("snapshot_date").iloc[-1]

# Training: get features as of training label date (avoids future leakage)
train_features = get_features_at(offline_store, user_id=1,
                                  as_of=pd.Timestamp("2023-06-15"))
print(f"Training features for user 1 as of 2023-06-15:")
print(train_features[["total_spend_30d", "transaction_count_30d",
                       "days_since_last_purchase"]].to_string())
Offline store: 600 rows (50 users × 12 snapshots)
Columns: ['user_id', 'snapshot_date', 'total_spend_30d', 'transaction_count_30d', 'days_since_last_purchase']

Training features for user 1 as of 2023-06-15:
total_spend_30d             545.62
transaction_count_30d            5
days_since_last_purchase         7

The get_features_at function illustrates the critical concept of point-in-time correctness. When building training data, you must retrieve features as they existed at the time the label was observed, not as they exist today. Using today’s features to predict yesterday’s outcome is data leakage — the model sees information from the future during training, learns to rely on it, and then fails in production where that information isn’t available.
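
Looking up one user at a time is fine for illustration, but a training set needs the same lookup for every labelled example. In pandas, `merge_asof` performs that join in one call: for each label row it picks the latest feature snapshot at or before the label date. The tables below (`features`, `labels`, the `churned` column) are made up for the sketch.

```python
import pandas as pd

features = pd.DataFrame({
    "user_id": [1, 1, 1, 2, 2],
    "snapshot_date": pd.to_datetime(
        ["2023-04-01", "2023-05-01", "2023-06-01", "2023-05-01", "2023-06-01"]),
    "total_spend_30d": [100.0, 150.0, 90.0, 30.0, 45.0],
})

labels = pd.DataFrame({
    "user_id": [1, 2, 1],
    "label_date": pd.to_datetime(["2023-05-20", "2023-05-10", "2023-03-15"]),
    "churned": [0, 1, 0],
})

# Both frames must be sorted by their time key for merge_asof.
training_set = pd.merge_asof(
    labels.sort_values("label_date"),
    features.sort_values("snapshot_date"),
    left_on="label_date",
    right_on="snapshot_date",
    by="user_id",            # match snapshots within the same user only
    direction="backward",    # latest snapshot at or before the label date
)

print(training_set)
```

The label dated 15 March has no earlier snapshot, so its feature columns come back as missing rather than being filled from the future. That is the behaviour you want: a gap you can see is better than leakage you can’t.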

Engineering Bridge

A feature store is to ML features what a package registry (npm, PyPI, Maven) is to code libraries. Without a registry, every team copies and modifies shared code independently, leading to inconsistent behaviour and duplicated maintenance. A package registry provides a single source of truth with versioning and dependency management. A feature store does the same for computed features: it provides a canonical definition, historical versioning, and consistent access for both training (batch reads of historical data) and serving (real-time reads of current values). The parallel even extends to the governance problem — just as a package registry lets you audit who depends on a library before changing its API, a feature store lets you audit which models depend on a feature before changing its computation logic.

17.6 Data validation: catching problems early

The most expensive bugs in data pipelines are the ones that don’t crash. A malformed API response that gets silently ingested, a join that drops rows due to a key mismatch, a feature column that drifts to all zeros after an upstream schema change — these produce valid-looking data that quietly degrade your model’s predictions.

Data validation is the practice of asserting expectations about your data at each stage of the pipeline, the same way you’d add assertions and contract tests to a service boundary.

import pandas as pd
import numpy as np

def validate_features(df: pd.DataFrame) -> list[str]:
    """Validate a feature DataFrame against expected contracts."""
    errors = []

    # Schema checks
    required_cols = {"user_id", "total_spend", "transaction_count",
                     "unique_categories"}
    missing = required_cols - set(df.columns)
    if missing:
        errors.append(f"Missing columns: {missing}")

    # Null checks
    null_pct = df.isnull().mean()
    high_null_cols = null_pct[null_pct > 0.05].index.tolist()
    if high_null_cols:
        errors.append(f"Columns exceed 5% null threshold: {high_null_cols}")

    # Range checks
    if "total_spend" in df.columns and (df["total_spend"] < 0).any():
        errors.append("Negative values in total_spend")

    if "transaction_count" in df.columns and (df["transaction_count"] < 0).any():
        errors.append("Negative values in transaction_count")

    # Distribution checks (detect drift)
    if "total_spend" in df.columns:
        median_spend = df["total_spend"].median()
        if median_spend < 1 or median_spend > 10_000:
            errors.append(f"Suspicious median total_spend: {median_spend:.2f}")

    # Completeness check
    if len(df) < 100:
        errors.append(f"Fewer than 100 rows ({len(df)}) — possible extraction failure")

    return errors

# Test with clean data
rng = np.random.default_rng(42)
good_data = pd.DataFrame({
    "user_id": range(500),
    "total_spend": rng.exponential(200, 500),
    "transaction_count": rng.poisson(10, 500),
    "unique_categories": rng.integers(1, 6, 500),
})

errors = validate_features(good_data)
print(f"Clean data validation: {'PASSED' if not errors else errors}")

# Test with problematic data
bad_data = good_data.copy()
bad_data.loc[:40, "total_spend"] = np.nan  # >5% nulls
bad_data.loc[100:105, "transaction_count"] = -1  # negative counts

errors = validate_features(bad_data)
print(f"Bad data validation:   {errors}")
Clean data validation: PASSED
Bad data validation:   ["Columns exceed 5% null threshold: ['total_spend']", 'Negative values in transaction_count']

The validation function checks five categories of expectation: schema (are the right columns present?), nulls (is data completeness within tolerance?), ranges (are values physically plausible?), distributions (has the data drifted from expected patterns?), and completeness (did the extraction return a plausible number of rows?). In production, these checks run at every pipeline stage: after extraction, after each transform, and before load. If any check fails, the pipeline halts and alerts the team rather than silently writing bad data.
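
Halting can be as simple as turning a non-empty error list into an exception that the orchestrator treats as a task failure. A minimal sketch follows; the `DataValidationError` class and the `enforce` helper are hypothetical names, and alerting is left to whatever runs the pipeline.

```python
class DataValidationError(Exception):
    """Raised when a pipeline stage produces data that fails its checks."""


def enforce(stage: str, errors: list[str]) -> None:
    """Stop the pipeline if a validation function reported any errors."""
    if errors:
        raise DataValidationError(f"{stage}: " + "; ".join(errors))


enforce("extract", [])  # an empty error list lets the pipeline continue

try:
    enforce("transform", ["Negative values in transaction_count"])
except DataValidationError as exc:
    print(f"Pipeline halted: {exc}")
```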

Author’s Note

I used to think of data validation as a “nice to have” — something you add after the pipeline is working. Then I spent two days debugging a model accuracy drop that turned out to be caused by a single upstream column changing from British date format (DD/MM/YYYY) to American format (MM/DD/YYYY). The pipeline happily ingested the data, the dates parsed without error (1 March became 3 January), and the model retrained on quietly wrong features. A simple validation check — “assert max(month) <= 12” — would have caught it in seconds. Now I write the validation before I write the transform.
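
A check along those lines, applied to the raw strings before any parsing, might look like the sketch below. The function name and the sample values are illustrative, and the check assumes the pipeline expects DD/MM/YYYY.

```python
def month_fields_out_of_range(raw_dates: list[str]) -> list[str]:
    """Return the DD/MM/YYYY strings whose month field is not in 1..12."""
    bad = []
    for value in raw_dates:
        parts = value.split("/")
        month_ok = (
            len(parts) == 3
            and parts[1].isdigit()
            and 1 <= int(parts[1]) <= 12
        )
        if not month_ok:
            bad.append(value)
    return bad


british = ["01/03/2023", "25/03/2023", "31/12/2023"]   # DD/MM/YYYY
american = ["03/01/2023", "03/25/2023", "12/31/2023"]  # MM/DD/YYYY

print(month_fields_out_of_range(british))
print(month_fields_out_of_range(american))
```

Note the limitation: “03/01/2023” is valid under both conventions, so the check only fires once a batch contains a day later than the 12th. On any realistic volume of dates that happens almost immediately.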

17.7 Orchestrating pipeline stages

A real-world feature pipeline is rarely a single script. It’s a directed acyclic graph (DAG) of dependent stages: extraction depends on the source being available, transforms depend on the extraction completing, validation depends on the transform, and loading depends on validation passing.

In Section 16.7 we saw how DVC encodes these dependencies for reproducibility. For scheduled production pipelines — the ones that run nightly or hourly to keep feature stores up to date — dedicated orchestration tools manage execution, retries, scheduling, and alerting. The most widely used are Airflow (the established standard), Prefect (a more Pythonic alternative), and Dagster (which emphasises typed data contracts between stages).

All three share the same core abstraction: a DAG of tasks, where each task is a Python function or shell command, and edges define data dependencies. If you’ve used CI/CD systems like GitHub Actions or GitLab CI, the structure will feel familiar — jobs, stages, dependencies, and retry policies.

# Conceptual Airflow DAG — not executable in this environment
# but illustrates the pipeline structure
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG(
    "user_features_pipeline",
    schedule="@daily",  # Airflow 2.4+; earlier releases call this schedule_interval
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

extract = PythonOperator(
    task_id="extract_transactions",
    python_callable=extract_transactions,  # reads from source DB
    dag=dag,
)

transform = PythonOperator(
    task_id="compute_user_features",
    python_callable=compute_user_features,  # aggregates per user
    dag=dag,
)

validate = PythonOperator(
    task_id="validate_features",
    python_callable=validate_feature_data,  # checks quality gates
    dag=dag,
)

load = PythonOperator(
    task_id="load_to_feature_store",
    python_callable=load_to_feature_store,  # writes to offline + online stores
    dag=dag,
)

extract >> transform >> validate >> load  # define execution order

The >> operator defines dependencies: transform won’t run until extract succeeds, and validate won’t run until transform succeeds. If validate fails, load never executes, so bad data never reaches the feature store. Beyond ordering, the orchestrator handles scheduling, retries, and alerting. The DAG above sets only a daily schedule; a production DAG would typically also configure a specific run time (say, 02:00), a retry policy (retry extract up to 3 times with exponential backoff), and alerts (page the on-call engineer if the pipeline hasn’t succeeded by 06:00).
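
The core behaviour, running tasks in dependency order and skipping everything downstream of a failure, can be sketched without Airflow using the standard library’s `graphlib`. This is a toy runner for illustration only: `run_pipeline` and the task functions are hypothetical, and it has no scheduling, retries, or parallelism.

```python
from graphlib import TopologicalSorter
from typing import Callable


def run_pipeline(tasks: dict[str, Callable[[], None]],
                 upstream: dict[str, set[str]]) -> dict[str, str]:
    """Run tasks in dependency order; skip anything downstream of a failure."""
    status: dict[str, str] = {}
    for name in TopologicalSorter(upstream).static_order():
        # A task runs only if every task it depends on succeeded.
        if any(status[dep] != "success" for dep in upstream.get(name, set())):
            status[name] = "skipped"
            continue
        try:
            tasks[name]()
            status[name] = "success"
        except Exception:
            status[name] = "failed"
    return status


def fail_validation() -> None:
    raise ValueError("Negative values in total_spend")


loaded: list[str] = []
tasks = {
    "extract": lambda: None,
    "transform": lambda: None,
    "validate": fail_validation,
    "load": lambda: loaded.append("written"),
}
# Each task maps to the set of tasks that must succeed before it.
upstream = {
    "transform": {"extract"},
    "validate": {"transform"},
    "load": {"validate"},
}

status = run_pipeline(tasks, upstream)
print(status)
```

Because validate fails, load is marked as skipped and nothing is written, which mirrors what the orchestrator does with the DAG above.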

17.8 A worked example: end-to-end feature pipeline

The following example brings together the concepts from this chapter: extraction, validation, feature engineering, and reproducible output. We’ll build a feature pipeline for the service metrics scenario from Section 16.9 and demonstrate how each stage validates its outputs.

import numpy as np
import pandas as pd
import hashlib
import io
from datetime import datetime, timezone

# ---- Stage 1: Extract ----
rng = np.random.default_rng(42)
n = 1500

raw_metrics = pd.DataFrame({
    "service_id": rng.choice(["api-gateway", "auth-service", "payment-svc",
                               "search-api", "user-svc"], n),
    "timestamp": pd.date_range("2023-01-01", periods=n, freq="6h"),
    "request_rate": rng.exponential(500, n),
    "error_count": rng.poisson(5, n).astype(float),  # float so missing values can be stored as NaN
    "p50_latency_ms": (p50 := rng.lognormal(3, 0.5, n)),
    "p99_latency_ms": p50 + rng.exponential(30, n),  # p99 >= p50 by definition
    "cpu_pct": np.clip(rng.normal(45, 20, n), 0, 100),
})

# Introduce realistic messiness
raw_metrics.loc[rng.choice(n, 20, replace=False), "error_count"] = np.nan
raw_metrics.loc[rng.choice(n, 10, replace=False), "cpu_pct"] = -1  # sensor glitch

print(f"Extracted: {raw_metrics.shape[0]} rows from 5 services")
print(f"  Nulls: {raw_metrics.isna().sum().sum()}, "
      f"Invalid CPU: {(raw_metrics['cpu_pct'] < 0).sum()}")
Extracted: 1500 rows from 5 services
  Nulls: 20, Invalid CPU: 10

The raw extract has the usual problems: null error counts (perhaps the monitoring agent was down during those intervals) and negative CPU readings from a sensor glitch. Before transforming this data, we validate the extraction to catch catastrophic failures early — a truncated extract or a missing column would waste everything downstream.

# ---- Stage 2: Validate extraction ----
def validate_extraction(df: pd.DataFrame) -> list[str]:
    """Gate check after extraction."""
    errors = []
    required = {"service_id", "timestamp", "request_rate", "error_count",
                "p50_latency_ms", "p99_latency_ms", "cpu_pct"}
    missing = required - set(df.columns)
    if missing:
        errors.append(f"Missing columns: {missing}")
    if len(df) < 100:
        errors.append(f"Too few rows: {len(df)}")
    if df["timestamp"].nunique() < 10:
        errors.append("Suspiciously few unique timestamps")
    return errors

extract_errors = validate_extraction(raw_metrics)
print(f"Extraction validation: {'PASSED' if not extract_errors else extract_errors}")
Extraction validation: PASSED

With the extraction validated, the transform stage cleans the data and engineers features. Notice that we fill missing values using per-service medians rather than a global median: the error count for payment-svc is likely on a different scale from that of search-api, so a global fill would blur meaningful differences between services.

# ---- Stage 3: Transform — clean and engineer features ----
def transform_service_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """Clean raw metrics and compute service-level features."""
    clean = df.copy()

    # Fix invalid values
    clean.loc[clean["cpu_pct"] < 0, "cpu_pct"] = np.nan

    # Fill missing values with per-service medians
    for col in ["error_count", "cpu_pct"]:
        clean[col] = clean.groupby("service_id")[col].transform(
            lambda s: s.fillna(s.median())
        )

    # Derive features
    clean["error_rate"] = clean["error_count"] / clean["request_rate"].clip(lower=1)
    clean["latency_ratio"] = clean["p99_latency_ms"] / clean["p50_latency_ms"].clip(lower=1)
    clean["is_high_cpu"] = (clean["cpu_pct"] > 80).astype(int)

    # Time-based features
    clean["hour"] = clean["timestamp"].dt.hour
    clean["is_business_hours"] = clean["hour"].between(9, 17).astype(int)

    return clean

features = transform_service_metrics(raw_metrics)
print(f"Transformed: {features.shape[0]} rows, {features.shape[1]} columns")
print(f"  New features: error_rate, latency_ratio, is_high_cpu, hour, is_business_hours")
print(f"  Remaining nulls: {features.isna().sum().sum()}")
Transformed: 1500 rows, 12 columns
  New features: error_rate, latency_ratio, is_high_cpu, hour, is_business_hours
  Remaining nulls: 0

The transform has cleaned the data and produced five new features. Visualising their distributions gives us a quick sanity check — and establishes a baseline that future pipeline runs can be compared against to detect drift.

import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 3, figsize=(10, 3.5))
feat_cols = ["error_rate", "latency_ratio", "cpu_pct"]
titles = ["Error rate", "Latency ratio (p99/p50)", "CPU %"]

for ax, col, title in zip(axes, feat_cols, titles):
    ax.hist(features[col].dropna(), bins=40, edgecolor="white", linewidth=0.5)
    ax.set_xlabel(col)
    ax.set_title(title)
    ax.axvline(features[col].median(), color="red", linestyle="--", linewidth=1,
               label=f"Median: {features[col].median():.2f}")
    ax.legend(fontsize=8, frameon=False)

plt.tight_layout()
plt.show()

Distributions of the three continuous features produced by the transform stage. These baselines are what a distribution drift check would compare against on future pipeline runs. The three side-by-side histograms show error rate (right-skewed, mostly near zero), latency ratio (right-skewed, centred around 2), and CPU percentage (roughly normal, centred around 45).

Now we validate the output — this is the gate between transform and load, and it’s where we check invariants that should hold after a correct transformation. If any check fails, the pipeline halts rather than writing bad features to the store.

# ---- Stage 4: Validate transform output ----
def validate_features_pipeline(df: pd.DataFrame) -> list[str]:
    """Gate check after feature engineering."""
    errors = []

    # No nulls should remain after transform
    null_cols = df.columns[df.isna().any()].tolist()
    if null_cols:
        errors.append(f"Unexpected nulls in: {null_cols}")

    # Derived features should be non-negative
    for col in ["error_rate", "latency_ratio"]:
        if col in df.columns and (df[col] < 0).any():
            errors.append(f"Negative values in {col}")

    # Latency ratio should be >= 1 (p99 >= p50 by definition); the gate
    # tolerates up to 5% of rows below 1 before reporting an error
    if "latency_ratio" in df.columns:
        below_one = (df["latency_ratio"] < 1.0).mean()
        if below_one > 0.05:
            errors.append(f"Latency ratio < 1 for {below_one:.1%} of rows")

    return errors

transform_errors = validate_features_pipeline(features)
print(f"Transform validation: {'PASSED' if not transform_errors else transform_errors}")
Transform validation: PASSED
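
The calls above print the validation result but do not stop anything by themselves. A minimal sketch of how the “halt rather than load” behaviour might be enforced is to turn a non-empty error list into an exception. `PipelineValidationError` and `enforce_gate` are hypothetical names introduced here, not part of the chapter’s code.

```python
class PipelineValidationError(RuntimeError):
    """Raised when a validation gate reports one or more errors."""


def enforce_gate(stage: str, errors: list[str]) -> None:
    """Raise if a gate returned errors, so later stages never run."""
    if errors:
        raise PipelineValidationError(
            f"{stage} validation failed: " + "; ".join(errors)
        )


# An empty error list passes silently
enforce_gate("transform", [])

# A non-empty list stops the pipeline before the load stage
try:
    enforce_gate("transform", ["Negative values in error_rate"])
except PipelineValidationError as exc:
    print(f"Pipeline halted: {exc}")
```

In the pipeline above, this would be called as `enforce_gate("transform", transform_errors)` immediately before Stage 5, so that an unhandled exception prevents the load from running.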

With both validation gates passed, we can safely load the features. The load step serialises the data (here to an in-memory Parquet buffer standing in for a real feature store) and produces a manifest: a summary record that captures the pipeline’s provenance, including row counts, a content hash, and the result of each validation gate.

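# ---- Stage 5: Load with provenance ----
# Serialise to Parquet in memory and fingerprint the resulting bytes
buf = io.BytesIO()
features.to_parquet(buf, index=False)
# Keep the first 16 hex characters of the SHA-256 digest as a short content hash
content_hash = hashlib.sha256(buf.getvalue()).hexdigest()[:16]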

manifest = {
    "pipeline": "service_features",
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "input_rows": len(raw_metrics),
    "output_rows": len(features),
    "output_columns": len(features.columns),
    "content_hash": content_hash,
    "validation": {
        "extract": "PASSED" if not extract_errors else extract_errors,
        "transform": "PASSED" if not transform_errors else transform_errors,
    },
}

print("Pipeline manifest:")
for key, value in manifest.items():
    print(f"  {key}: {value}")
Pipeline manifest:
  pipeline: service_features
  timestamp: 2026-02-18T18:08:19.174728+00:00
  input_rows: 1500
  output_rows: 1500
  output_columns: 12
  content_hash: 7d316065dc147dfd
  validation: {'extract': 'PASSED', 'transform': 'PASSED'}

Each stage validates its output before the next stage begins. The final manifest records the full provenance: how many rows entered, how many survived, what the output hash is, and whether every validation gate passed. This is the data pipeline equivalent of a CI build report — a complete record of what happened, in what order, with what results.
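
The example above serialises to an in-memory buffer. Writing to disk adds one more concern: a crash mid-write must not leave a half-written file where readers expect a complete one. A common approach is to write to a temporary file in the same directory and then rename it into place. The sketch below assumes a local filesystem; `atomic_write_bytes`, the file names, and the placeholder payload are hypothetical.

```python
import hashlib
import json
import os
import tempfile
from pathlib import Path


def atomic_write_bytes(path: Path, data: bytes) -> None:
    """Write data to a temp file beside path, then rename it into place."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())  # push bytes to disk before the rename
        # The rename is atomic on POSIX when both paths share a filesystem
        os.replace(tmp_name, path)
    except BaseException:
        # Remove the partial temp file; the target path is left untouched
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise


with tempfile.TemporaryDirectory() as out_dir:
    out = Path(out_dir)
    payload = b"placeholder for the Parquet bytes"  # e.g. buf.getvalue()
    demo_manifest = {"content_hash": hashlib.sha256(payload).hexdigest()[:16]}

    # Write the data first, then the manifest that vouches for it
    atomic_write_bytes(out / "features.parquet", payload)
    atomic_write_bytes(out / "manifest.json", json.dumps(demo_manifest).encode())

    # A reader can recompute the hash to confirm the file matches its manifest
    stored = json.loads((out / "manifest.json").read_text())
    on_disk = (out / "features.parquet").read_bytes()
    verified = hashlib.sha256(on_disk).hexdigest()[:16] == stored["content_hash"]
    print(f"Hash verified: {verified}")
```

Writing the manifest last means that its presence can serve as the signal that the data file is complete, which is one reasonable ordering rather than the only one.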

17.9 Summary

  1. Data pipelines are the majority of ML code. The model is a small component; the infrastructure that extracts, transforms, validates, and serves data is where most engineering effort goes.

  2. ETL is a pattern, not a technology. Extract handles unreliable sources with defensive programming. Transform encodes domain knowledge as features. Load writes results atomically with content hashes for verification.

  3. Train/serve skew is a silent killer. Use the same code path for computing features in training and in serving. Feature stores formalise this by providing a single source of truth for feature definitions and values.

  4. Validate data at every pipeline boundary. Check schemas, null rates, value ranges, and distributions after each stage. A pipeline that halts on bad data is better than one that silently produces bad predictions.

  5. Orchestrate with DAGs. Express pipeline dependencies explicitly using tools like Airflow, Prefect, or DVC. This gives you automatic retries, dependency tracking, and the confidence that no stage runs before its inputs are ready and validated.

17.10 Exercises

  1. Extend the validate_features function from Section 17.6 to include a distribution drift check: compare the mean and standard deviation of each numerical column against reference statistics (provided as a dictionary), and flag any column whose mean has moved more than 2 reference standard deviations away from the reference mean. Test it by creating a “drifted” dataset where one feature’s mean has shifted.

  2. Write a feature engineering function that takes the raw transaction data from Section 17.3 and computes time-windowed features: for each user, calculate total spend and transaction count over the last 7 days, 30 days, and 90 days relative to a given reference date. Discuss why the reference date matters for avoiding data leakage.

  3. Demonstrate the train/serve skew problem with a StandardScaler: train a logistic regression model using fit_transform on the training set, then show that (a) using transform with the training scaler produces correct serving predictions, and (b) calling fit_transform on each new observation individually produces wrong predictions. Measure the average prediction error between the two approaches.

  4. Conceptual: Your company has 15 ML models in production, and 8 of them use a feature called “customer lifetime value” (CLV). Each model team computes CLV slightly differently — different time windows, different revenue definitions, different null handling. A colleague proposes building a feature store to standardise this. What are the benefits? What are the risks? How would you handle the transition without breaking existing models?

  5. Conceptual: A junior data scientist argues that data validation is unnecessary because “the model will just learn to handle bad data.” In what sense are they wrong? In what narrow sense might they have a point? What’s the strongest argument for validation that goes beyond model accuracy?