Stop Blaming the Model: Build ML Data Pipelines That Don’t Lie in Training or Serving
What it takes to ship reliable ML: contracts, quality gates, feature parity, and canary serving—backed by metrics, not vibes.
The incident every ML team eventually hits
Three quarters into a churn-reduction push at a subscription company, conversion cratered 14% after a “harmless” experimentation change. The model hadn’t changed in two weeks. Our dashboards were green. The pager wasn’t.
The root cause wasn’t fancy: an upstream schema tweak renamed plan_tier to plan_level in a Kafka topic. The online service defaulted the missing field to basic. The training job silently imputed a median from last week’s parquet. Boom: training-serving skew. The model was making bad assumptions about “basic” plans that never existed in training.
We fixed it the boring way: contracts at the borders, quality gates in the middle, and release discipline at the end. MTTR dropped from 6 hours to 35 minutes. Skew alerts catch drift early. Conversions rebounded in 48 hours.
This is how we build it now at GitPlumbers, every time.
The architecture that actually works
Keep it simple, observable, and testable:
- Ingest: Kafka with Debezium CDC from OLTP; batch from S3/GCS; Schema Registry enforced.
- Storage: Delta Lake or Apache Iceberg on object store; time travel enabled; compacted with ZSTD.
- Transform: dbt for SQL lineage + tests; Spark/Beam for heavy lifting.
- Quality gates: Great Expectations pre/post validations; null, uniqueness, and distribution checks.
- Feature store: Feast for offline (BigQuery/Snowflake/Delta) and online (Redis/DynamoDB) parity.
- Training: reproducible snapshots; runs tracked in MLflow; feature retrieval via point-in-time joins.
- Serving: KServe/BentoML with Istio for canary/shadow; online feature lookup.
- Observability: OpenLineage + Marquez for lineage; pipeline metrics in Prometheus/Grafana; data monitors (WhyLabs/Arize) for drift; logs via OTel.
- Delivery: GitOps via ArgoCD; infra via Terraform; job orchestration with Airflow or Dagster.
If any of that feels optional, that’s where your next outage is hiding.
Lock the front door: data contracts and schema governance
You don’t need a 50-slide deck on “data mesh.” You need hard edges and automated enforcement.
- Define contracts at the Kafka topic or API boundary using protobuf or avro.
- Enforce compatibility with Confluent Schema Registry or Karapace, plus CI checks.
- Block breaking changes automatically. No human in the loop.
Example: creating a Debezium CDC connector with a schema registry and compatibility set to BACKWARD:
```shell
curl -X POST http://kafka-connect:8083/connectors -H 'Content-Type: application/json' -d '
{
  "name": "users-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "pg",
    "database.port": "5432",
    "database.user": "replica",
    "database.password": "***",
    "database.dbname": "app",
    "table.include.list": "public.users",
    "plugin.name": "pgoutput",
    "topic.prefix": "cdc",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}'
```

In CI, fail on incompatible schemas:

```shell
# Set compatibility (the registry's config PUT expects {"compatibility": ...})
curl -X PUT http://schema-registry:8081/config \
  -H 'Content-Type: application/json' \
  -d '{"compatibility": "BACKWARD"}'

# Validate a candidate schema against the latest registered version.
# The request body must wrap the escaped schema string: {"schema": "..."}
curl -X POST http://schema-registry:8081/compatibility/subjects/cdc-users-value/versions/latest \
  -H 'Content-Type: application/json' \
  -d @new-schema.json
```

For REST sources, generate client-side validators with pydantic or cue and treat violations as deploy blockers.
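As a sketch, the pydantic approach might look like this, assuming pydantic v2; the `UserEvent` fields and the `validate_batch` helper are illustrative, not a real schema:

```python
from pydantic import BaseModel, ValidationError, field_validator

class UserEvent(BaseModel):
    """Illustrative contract for an inbound REST payload."""
    user_id: str
    plan_tier: str
    event_ts: str

    @field_validator("plan_tier")
    @classmethod
    def plan_tier_known(cls, v: str) -> str:
        allowed = {"basic", "pro", "enterprise"}
        if v not in allowed:
            raise ValueError(f"unknown plan_tier: {v!r}")
        return v

def validate_batch(records: list[dict]) -> tuple[list[UserEvent], list[dict]]:
    """Split a batch into valid events and contract violations."""
    valid, violations = [], []
    for rec in records:
        try:
            valid.append(UserEvent(**rec))
        except ValidationError as e:
            violations.append({"record": rec, "errors": e.errors()})
    return valid, violations
```

In CI, a non-empty violations list fails the build; in the serving path, it routes the record to quarantine instead of silently defaulting.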
If it’s not enforced in automation, it’s not a contract—it’s a suggestion.
Quality gates that fail fast (and explain why)
I’ve seen teams run daily jobs that “succeed” while writing 30% nulls. Your DAG being green isn’t a KPI. Put tests in the path.
- dbt tests catch relational sins: uniqueness, referential integrity, accepted values.
- Great Expectations covers distributional checks and row-level constraints.
- Quarantine bad batches to a dead_letter table with lineage.
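The quarantine step can be sketched in plain Python over row dicts; the check names, the `_dl_*` columns, and the `dim_users` source tag are illustrative assumptions, not a fixed schema:

```python
from datetime import datetime, timezone

def quarantine_split(rows, checks):
    """Apply row-level checks; passing rows continue downstream,
    failing rows go to dead_letter with reasons and a lineage stamp."""
    good, dead_letter = [], []
    for row in rows:
        failed = [name for name, check in checks.items() if not check(row)]
        if failed:
            dead_letter.append({
                **row,
                "_dl_reasons": failed,
                "_dl_quarantined_at": datetime.now(timezone.utc).isoformat(),
                "_dl_source": "dim_users",  # lineage pointer; illustrative
            })
        else:
            good.append(row)
    return good, dead_letter

# Row-level checks mirroring the dbt/GE tests below.
checks = {
    "user_id_not_null": lambda r: r.get("user_id") is not None,
    "plan_tier_known": lambda r: r.get("plan_tier") in {"basic", "pro", "enterprise"},
}
```

The point is that a bad batch never reaches the feature tables, and the dead_letter row tells you exactly why it was held.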
dbt test example:
```yaml
# models/core/users.yml
version: 2
models:
  - name: dim_users
    columns:
      - name: user_id
        tests:
          - unique
          - not_null
      - name: plan_tier
        tests:
          - accepted_values:
              values: ['basic', 'pro', 'enterprise']
```

Great Expectations suite:
```yaml
# expectations/dim_users_suite.yml
expectations:
  - expect_table_row_count_to_be_between:
      min_value: 100000
  - expect_column_values_to_not_be_null:
      column: user_id
  - expect_column_values_to_be_in_set:
      column: plan_tier
      value_set: ["basic", "pro", "enterprise"]
  - expect_column_kl_divergence_to_be_less_than:
      column: country
      partition_object: "ref:country_histogram_last_14_days"
      threshold: 0.05
```

Wire these into the DAG as pre/post steps. If expectations fail, block downstream jobs and page the on-call with a human-readable diff and sample rows. Don’t bury it in logs.
We measure:
- Freshness SLO: 99.5% of feature tables updated within 15 minutes.
- Bad record rate: <0.5% rows quarantined per day.
- Time-to-detect: <5 minutes from upstream schema drift to alert.
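Turning those three SLOs into pageable booleans is mechanical. A hedged sketch, where the `metrics` keys are assumptions rather than a real exporter schema:

```python
def evaluate_data_slos(metrics: dict) -> dict:
    """Return per-SLO pass/fail given pipeline metrics.
    Thresholds mirror the SLOs above; metric keys are illustrative."""
    return {
        "freshness": metrics["pct_tables_fresh_15m"] >= 99.5,
        "bad_record_rate": metrics["pct_rows_quarantined_24h"] < 0.5,
        "time_to_detect": metrics["drift_detect_minutes"] < 5,
    }
```

Anything returning False pages the on-call; everything else stays a dashboard line.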
Kill skew at the source: feature store with offline/online parity
Training-serving skew is a career tax. A feature store isn’t “nice to have”; it’s the way you ensure the same logic runs in both worlds.
With Feast:
- Offline store: BigQuery, Snowflake, or Delta on S3/GCS.
- Online store: Redis or DynamoDB for low-latency reads.
- Point-in-time joins prevent leakage during training.
- Same transformations via feature views.
Example Feast setup:
```python
# feature_repo/feature_store.py
from datetime import timedelta

from feast import Entity, FeatureStore, FeatureView, Field, FileSource
from feast.types import Float32, String

user = Entity(name="user_id")

user_stats = FileSource(
    path="s3://lake/features/user_stats.parquet",
    timestamp_field="event_ts",
)

user_features = FeatureView(
    name="user_features_v1",
    entities=[user],
    ttl=timedelta(days=7),
    schema=[
        Field(name="ctr_30d", dtype=Float32),
        Field(name="plan_tier", dtype=String),
    ],
    source=user_stats,
)

store = FeatureStore(repo_path=".")
```

Training materialization:
```python
# Point-in-time-correct training dataset
training_df = store.get_historical_features(
    entity_df=events_df,  # contains user_id and event_ts
    features=["user_features_v1:ctr_30d", "user_features_v1:plan_tier"],
).to_df()
```

Online read in serving path:
```python
feature_vector = store.get_online_features(
    features=["user_features_v1:ctr_30d", "user_features_v1:plan_tier"],
    entity_rows=[{"user_id": uid}],
).to_dict()
```

We set a hard skew SLO: JS divergence between training and online feature distributions < 0.02 over 24h. When it breaches, we cut canary traffic and investigate.
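The skew check itself is a few lines of numpy. This sketch assumes both sides have been histogrammed over the same bins; the `SKEW_SLO` constant mirrors the 0.02 threshold above:

```python
import numpy as np

def js_divergence(p: np.ndarray, q: np.ndarray) -> float:
    """Jensen-Shannon divergence (base 2, in [0, 1]) between two
    discrete distributions binned over the same support."""
    p = p / p.sum()
    q = q / q.sum()
    m = 0.5 * (p + q)

    def kl(a, b):
        mask = a > 0  # skip zero-probability bins; m > 0 wherever a > 0
        return float(np.sum(a[mask] * np.log2(a[mask] / b[mask])))

    return 0.5 * kl(p, m) + 0.5 * kl(q, m)

SKEW_SLO = 0.02  # matches the skew SLO above

def skew_breached(train_hist, online_hist) -> bool:
    """True when the rolling-window skew exceeds the SLO."""
    return js_divergence(np.asarray(train_hist, dtype=float),
                         np.asarray(online_hist, dtype=float)) > SKEW_SLO
```

Run it per feature over a rolling 24h window; a breach is the signal to cut canary traffic.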
Treat training like production: deterministic, versioned, auditable
I’ve been paged at 2 a.m. because someone “retrained on the latest data” and replaced a model artifact. That’s not MLOps; that’s roulette.
Non-negotiables:
- Versioned data: use Delta Lake or Iceberg and pin by commit or timestamp.
- Run tracking: MLflow for params/metrics/artifacts, with the dataset signature recorded.
- Reproducible envs: lock dependencies with poetry.lock/requirements.txt and hashed conda envs; build images via CI.
Training example:
```python
import mlflow
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

DELTA_URI = "s3://lake/features/user_features_v1"
SNAPSHOT_TS = "2025-09-21T00:00:00Z"

features = (
    spark.read.format("delta")
    .option("timestampAsOf", SNAPSHOT_TS)
    .load(DELTA_URI)
)

with mlflow.start_run() as run:
    mlflow.set_tag("data.timestampAsOf", SNAPSHOT_TS)
    mlflow.set_tag("data.path", DELTA_URI)
    # train ...
    mlflow.log_metric("auc", 0.812)
    mlflow.sklearn.log_model(model, "model")
```

This is the difference between “we think it’s better” and “we can prove it’s better and re-run it.”
Ship models like services: canary, shadow, rollback
Your model is a production service. Treat releases like you would for a critical API.
- Shadow new models first: duplicate traffic, no user impact, measure offline KPIs.
- Canary at 5–10% with guardrails: p95 latency, error rate, business metrics.
- Automated rollback if SLOs breach for N minutes.
KServe inference service:
```yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: recsys-v2
spec:
  predictor:
    sklearn:
      storageUri: s3://models/recsys/v2/
    minReplicas: 2
    maxReplicas: 10
```

Istio canary routing:
```yaml
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: recsys
spec:
  hosts: ["recsys.svc.cluster.local"]
  http:
    - route:
        - destination:
            host: recsys-v1-predictor
          weight: 90
        - destination:
            host: recsys-v2-predictor
          weight: 10
```

We track via Prometheus:
- recsys_prediction_latency_ms{quantile="0.95"} < 100 for 30m
- recsys_error_rate < 0.5%
- Business: CTR lift on canary ≥ +1.5% vs control
If any fail, an Argo Rollouts analysis template flips traffic back in under 2 minutes.
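The guardrail decision that analysis encodes can be sketched as follows; the metric names, limits, and one-sample-per-minute cadence are illustrative assumptions, not the actual Argo Rollouts template:

```python
def should_rollback(samples, p95_limit_ms=100.0, err_limit=0.005, breach_minutes=5):
    """Roll back when latency or error-rate guardrails breach for
    `breach_minutes` consecutive one-minute samples."""
    consecutive = 0
    for s in samples:  # each sample: {"p95_ms": ..., "error_rate": ...}
        breached = s["p95_ms"] > p95_limit_ms or s["error_rate"] > err_limit
        consecutive = consecutive + 1 if breached else 0
        if consecutive >= breach_minutes:
            return True
    return False
```

Requiring consecutive breaches keeps a single noisy scrape from flapping traffic back and forth.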
Observability and lineage: shorten MTTR, stop guessing
When something breaks, you need to see exactly what and where.
- Lineage with OpenLineage + Marquez from Airflow/Dagster: which upstream tables fed this feature view, and at which commit IDs?
- Metrics: pipeline success, latency, row counts, and null rates to Prometheus; alerts in PagerDuty.
- Drift: embedding and feature drift via WhyLabs/Arize; link alerts to model version and feature set.
Airflow with OpenLineage config:
```ini
# airflow.cfg (snippet); requires the apache-airflow-providers-openlineage package
[openlineage]
transport = {"type": "http", "url": "http://marquez:5000"}
```

The non-negotiable KPI here is MTTR. Target: reduce MTTR for data-caused model incidents to <45 minutes. With contracts + tests + lineage, that’s realistic.
Results we’ve delivered (and you can too)
At three different clients in the last 18 months:
- 80% reduction in bad-data incidents within 60 days by enforcing contracts + dbt/GE tests.
- 99.7% freshness compliance for critical features after moving to Delta + incremental models.
- Training-serving skew cut by 70% using Feast parity and explicit skew SLOs.
- Retrain time down from 6 hours to 90 minutes with deterministic snapshots and better partitioning.
- 22% infra cost reduction by deleting zombie jobs, right-sizing KServe autoscaling, and ZSTD compression.
It’s not magic. It’s discipline.
How to roll this out in 30-60 days
- Write the SLOs: freshness targets, null thresholds, skew limits. Tie them to KPIs.
- Put contracts on your top 5 sources with schema registry. Break the build on incompatible changes.
- Add dbt tests and GE suites to the critical path; quarantine bad data.
- Stand up Feast with one high-impact feature group; wire online Redis and offline Delta/BigQuery.
- Snapshot training data and track runs in MLflow; stop using “latest.”
- Deploy the next model via shadow + canary on KServe; add rollback.
- Instrument lineage and pipeline metrics; make MTTR visible.
If you can’t do all seven, start with 1–3. You’ll still buy yourself a lot of sleep.
Key takeaways
- Pin reliability to SLOs: freshness, quality, and skew—not vague “pipeline is green.”
- Enforce data contracts at ingestion; auto-block breaking schema changes with a registry and tests.
- Use a feature store with online/offline parity to kill skew before it kills your KPIs.
- Treat training like prod: versioned data snapshots, deterministic pipelines, MLflow tracking.
- Release models like services: canary, shadow, and rollback with clear guardrails and metrics.
Implementation checklist
- Define data SLOs: freshness, null rates, and skew thresholds tied to business KPIs.
- Implement data contracts with `protobuf`/`avro` and a schema registry; block incompatible changes.
- Add quality gates: `dbt` tests + `Great Expectations` pre/post validations.
- Adopt a feature store (e.g., Feast) for online/offline parity and time-travel retrieval.
- Snapshot training data (Delta/Iceberg), track runs in MLflow, and pin datasets by version.
- Instrument lineage (OpenLineage) and observability (Prometheus/Grafana) across the stack.
- Ship models via canary/shadow (KServe/Istio) with automated rollback on SLO breach.
- Automate with GitOps (ArgoCD) and IaC (Terraform) to keep infra consistent and reviewable.
Questions we hear from teams
- Do I need a feature store, or can I roll my own with tables and Redis?
- You can roll your own, and that’s how most teams end up with skew. A feature store like Feast gives you offline/online parity, point-in-time joins, and a single definition of features. That’s the difference between “works in training” and “works in prod.”
- What’s the minimum viable stack to get reliability?
- Contracts + tests + versioning. Specifically: Schema Registry on ingestion topics, dbt tests + Great Expectations on critical models, and versioned data (Delta/Iceberg) with MLflow run tracking. You can add KServe/Istio and Feast as you scale.
- How do I quantify training-serving skew?
- Track distributional distance (e.g., Jensen–Shannon divergence) for key features between offline and online over a rolling window. Alert when it exceeds a threshold (e.g., 0.02). Tie breaches to automated rollback for canaries.
- We’re on Snowflake/BigQuery—does this still apply?
- Yes. Swap Spark/Delta for Snowflake/BigQuery-native transforms. Feast supports both as offline stores. Use Redis/DynamoDB for online. KServe works regardless of your warehouse; the key is consistent feature definitions and versioned snapshots.
- Isn’t this overkill for a small team?
- It’s cheaper than firefighting. Start with the top 5 data sources and 1–2 feature groups. The contracts + tests + versioning combo removes 80% of surprises. You can layer on serving discipline as traffic grows.
Ready to modernize your codebase?
Let GitPlumbers help you transform AI-generated chaos into clean, scalable applications.
