Your Model Didn’t Fail — Your Data Pipeline Did: Training+Serving Data That Doesn’t Lie
The fastest way to fix ML in production isn’t a new model—it’s a reliable data pipeline that feeds training and serving from the same, tested truth.
If your training and serving paths don’t share code, tests, and data contracts, you don’t have one model—you have two that disagree in production.
The incident you’ve lived: great offline metrics, garbage in prod
Two quarters ago we walked into a retail client where a “state-of-the-art” churn model face-planted in prod. Offline AUC: 0.91. In production: customers targeted for retention were 18% less likely to churn than the “control.” Turned out the model was fine. The data pipeline lied. Training used end-of-day aggregates. Serving used a homegrown API stitching Redis, two Kafka topics, and a CSV snapshot on S3. The snapshot schema changed on Tuesdays. Guess which day they deployed.
I’ve seen this movie at unicorns and at banks. The cure isn’t a better model; it’s a pipeline that treats data as a first-class production system—same transforms, same contracts, same tests, same versions.
What a train+serve-ready pipeline actually looks like
Keep the architecture boring, observable, and versioned:
- Ingest: Kafka/Kinesis for streams; batch from Snowflake/BigQuery/Postgres via Fivetran or Airbyte.
- Storage: Lake with Delta Lake or Apache Iceberg on S3/GCS/ADLS; warehouse if you must, but version the data.
- Transform: dbt for SQL, Spark for heavy lifting. All transformations tested and documented.
- Validate: Great Expectations / dbt tests with gates that can stop the line.
- Lineage: OpenLineage + Marquez (or Monte Carlo/Bigeye if you’re spending money).
- Features: Feast to define features once and materialize to batch + online stores (e.g., Redis, DynamoDB).
- Training: Reproducible jobs with MLflow tracking and containerized runners.
- Serving: Seldon Core/KServe/TF Serving behind Istio with canary routes.
- Orchestration: Airflow or Argo Workflows. Deploy infra with Terraform. Ops with GitOps via ArgoCD.
Glue it with SLOs, not vibes.
Reliability and quality: data SLOs or it didn’t happen
Stop arguing “is this data good?” and publish SLOs:
- Freshness: 99.5% of days, features are updated by 00:05 UTC.
- Completeness: 99.9% of records have non-null customer_id, order_id.
- Schema stability: breaking changes require 7-day notice and dual-write compatibility.
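Publish them next to the dataset as a small, reviewable file. The slo.yml layout below is a sketch we adapt per team, not a standard format:
# slo.yml (illustrative format, checked into the repo next to the dataset)
dataset: analytics.orders
owner: data-platform
slos:
  freshness:
    objective: "features updated by 00:05 UTC"
    attainment_pct: 99.5   # of days
  completeness:
    objective: "non-null customer_id, order_id"
    attainment_pct: 99.9   # of records
  schema_stability:
    objective: "breaking changes need 7-day notice and dual-write compatibility"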
Enforce with tests and failures that matter.
# dbt_project.yml (dbt 1.7+)
name: core
version: 1.0.0
models:
  core:
    +materialized: table
    +schema: analytics
# models/orders.yml
version: 2
models:
  - name: orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: total_cents
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"
Layer Great Expectations for distribution checks:
# check.py
from great_expectations.dataset import PandasDataset
import pandas as pd
df = pd.read_parquet("s3://lake/analytics/orders.parquet")
orders = PandasDataset(df)
orders.expect_column_values_to_not_be_null("order_id")
orders.expect_column_values_to_be_between("total_cents", min_value=0, max_value=10_000_000)
res = orders.validate()
if not res["success"]:
    raise SystemExit("Validation failed")
Wire it into the DAG and fail fast:
# Airflow 2.x
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from datetime import datetime
with DAG("ml_training_pipeline", schedule_interval="@daily", start_date=datetime(2024,1,1), catchup=False) as dag:
    dbt_run = KubernetesPodOperator(
        task_id="dbt_run",
        name="dbt-run",
        image="ghcr.io/dbt-labs/dbt:1.7.2",
        cmds=["dbt","run","--project-dir","/dbt"]
    )
    ge_check = KubernetesPodOperator(
        task_id="ge_check",
        name="ge-check",
        image="python:3.11-slim",
        cmds=["python","/app/check.py"]
    )
    dbt_run >> ge_check
Bonus points: publish SLO burn-rate alerts to Slack and PagerDuty, same as you do for API SLOs.
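A Prometheus rule for that can stay tiny. This is a sketch that assumes the features_customers_freshness_seconds gauge exported later in this post; the rule file name and thresholds are illustrative:
# slo_burn_alerts.yml (illustrative; route to Slack/PagerDuty via Alertmanager)
groups:
  - name: data-slo-burn
    rules:
      - alert: FeatureFreshnessSLOBurn
        # Freshness SLO says "updated by 00:05 UTC"; page if the newest row is over two hours old
        expr: features_customers_freshness_seconds > 7200
        for: 15m
        labels:
          severity: page
        annotations:
          summary: "customer features are stale; freshness SLO burning"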
Reproducible training: version data, features, and code
If you can’t recreate last week’s training set, you’re not doing ML—you’re doing improv.
- Version the data with Delta Lake (or Iceberg). Use time travel for exact snapshots.
- Track experiments and artifacts in MLflow. Log the data version used for training.
- Containerize training with pinned images; pin Python packages (no “latest”). A pinned-image sketch follows the MLflow example below.
# Spark + Delta time travel in training
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
features = spark.read.format("delta").option("versionAsOf", 352).load("s3://lake/features/customers")
# MLflow 2.x logging
import mlflow
mlflow.set_experiment("churn_xgb")
with mlflow.start_run() as run:
    mlflow.log_param("data_version", "delta:customers@v=352")
    mlflow.log_param("code_sha", "9d1e6c7")
    mlflow.sklearn.log_model(model, "model", registered_model_name="churn_xgb")
That data_version is your escape hatch during incident response. When someone asks, “what changed?”, you can answer in seconds, not in a war room.
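On the pinned-image point, the training runner can be as boring as this; the base tag and package versions are placeholders, not recommendations:
# Dockerfile.train (illustrative pins; never "latest")
FROM python:3.11.8-slim

WORKDIR /app

# requirements.txt pins exact versions, e.g. mlflow==2.10.2, scikit-learn==1.4.0, xgboost==2.0.3
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY train.py .
ENTRYPOINT ["python", "train.py"]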
Feature parity in serving: stop paying the skew tax
Training computed avg_ticket_30d with a precise SQL window. Serving reimplemented it in Node with a different rounding rule. Congratulations, you just built two models.
Use a feature store so training and serving read the same definitions.
# Feast 0.36+ feature definitions
from datetime import timedelta
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int32

customer = Entity(name="customer_id", join_keys=["customer_id"])
source = FileSource(path="s3://lake/features/customers.parquet", timestamp_field="event_time")
customer_features = FeatureView(
    name="customer_features",
    entities=[customer],
    ttl=timedelta(days=14),
    schema=[
        Field(name="tenure_months", dtype=Int32),
        Field(name="avg_ticket", dtype=Float32),
    ],
    online=True,
    source=source,
)
Configure online store for low-latency reads:
# feature_store.yaml
project: churn
registry: s3://feast/registry.db
provider: aws
online_store:
  type: redis
  connection_string: redis-master:6379
offline_store:
  type: file
Now your batch training dataset and your online lookups share the same feature semantics. If you must compute streaming features, compute them once in a streaming job (e.g., Flink/Spark Structured Streaming) that also writes to both the offline table and the online store.
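Here’s a rough Spark Structured Streaming sketch of that dual write; the broker, topic, schema, and the push_to_online_store helper are assumptions for illustration:
# streaming_features.py (sketch: one job feeds both the offline table and the online store)
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

orders = (spark.readStream.format("kafka")
          .option("kafka.bootstrap.servers", "kafka:9092")   # assumed broker
          .option("subscribe", "orders")                     # assumed topic
          .load()
          .selectExpr("CAST(value AS STRING) AS json")
          .select(F.from_json("json", "customer_id LONG, total_cents LONG, event_time TIMESTAMP").alias("o"))
          .select("o.*"))

def write_both(batch_df, batch_id):
    # Offline: append to the Delta table that training reads with time travel
    batch_df.write.format("delta").mode("append").save("s3://lake/features/customers_stream")
    # Online: push the same rows to Redis/DynamoDB (e.g., via Feast's push API); hypothetical helper
    push_to_online_store(batch_df)

(orders.writeStream
 .foreachBatch(write_both)
 .option("checkpointLocation", "s3://lake/_checkpoints/customer_features")
 .start())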
Ship it safely: GitOps, canaries, and rollbacks that actually roll back
If you’re still scp’ing models onto EC2, we need to talk.
- Deploy model servers declaratively with KServe/Seldon Core and manage via ArgoCD (see the manifest sketch after this list).
- Version configuration alongside models; link the model registry version to the deployment.
- Use Istio canaries to ramp traffic with guardrails.
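In Git, that can be a single manifest ArgoCD keeps in sync; the names, annotation, and storage path below are illustrative:
# churn-v2-inferenceservice.yaml (sketch)
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: churn-v2
  annotations:
    # Hypothetical annotation tying this deployment to the MLflow registry version
    model-registry/version: "churn_xgb@7"
spec:
  predictor:
    model:
      modelFormat:
        name: sklearn
      storageUri: s3://models/churn_xgb/7
The traffic ramp itself still lives in Istio: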
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: churn-model
spec:
  hosts: ["churn.api.svc.cluster.local"]
  http:
  - route:
    - destination: {host: churn-v2, subset: v2, port: {number: 9000}}
      weight: 10
    - destination: {host: churn-v1, subset: v1, port: {number: 9000}}
      weight: 90
Wire canary decisions to business metrics, not just model metrics. If conversion_rate or call_center_load tanks during the ramp, abort automatically.
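If Argo Rollouts is orchestrating the ramp, the guardrail can be an AnalysisTemplate like this sketch; the metric names, threshold, and Prometheus address are assumptions:
# conversion-guardrail.yaml (sketch)
apiVersion: argoproj.io/v1alpha1
kind: AnalysisTemplate
metadata:
  name: conversion-guardrail
spec:
  metrics:
  - name: conversion-rate
    interval: 1m
    failureLimit: 3
    provider:
      prometheus:
        address: http://prometheus.monitoring:9090
        query: sum(rate(orders_completed_total[5m])) / sum(rate(checkout_sessions_total[5m]))
    # Abort the rollout if conversion sags below the guardrail during the ramp
    successCondition: result[0] >= 0.025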
Observe the pipeline and the business: one pane that matters
Telemetry that doesn’t tie to money is noise. Instrument from ingestion to decisions:
- Data Observability: OpenLineage -> Marquez to see what upstream changed.
# Airflow openlineage config
openlineage:
  transport:
    type: http
    url: http://marquez:5000
- Pipeline Metrics: expose Prometheus gauges for freshness and null ratios.
# metrics.py
from datetime import datetime
from prometheus_client import Gauge, start_http_server
freshness_g = Gauge("features_customers_freshness_seconds","Age of latest feature row")
null_ratio_g = Gauge("features_customers_null_ratio","Null ratio of critical cols")
start_http_server(9102)
# ... compute latest_event_time and null_ratio ...
freshness_g.set((datetime.utcnow() - latest_event_time).total_seconds())
null_ratio_g.set(null_ratio)
- Streaming Health: watch Kafka consumer lag via kafka_exporter; alert when lag threatens SLOs (rule sketch below).
- Model + Biz KPIs: export prediction_latency_ms, pct_missing_features, and tie to retained_customers, avg_handle_time in Grafana.
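For the streaming-health check, the alert can be one rule; this sketch assumes kafka_exporter’s lag metric plus an illustrative consumer group and threshold:
# kafka_lag_alerts.yml (illustrative)
groups:
  - name: streaming-health
    rules:
      - alert: FeatureStreamLagHigh
        expr: sum by (consumergroup) (kafka_consumergroup_lag{consumergroup="feature-builder"}) > 100000
        for: 10m
        labels:
          severity: warn
        annotations:
          summary: "feature stream consumer lag threatens the freshness SLO"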
When we installed this end-to-end at the retailer, MTTR for data incidents dropped from 10 hours to 90 minutes, freshness SLO went from 94% to 99.7%, and we stopped three bad deploys at 10% canary before customers noticed.
Monday morning plan (and what we’d do differently next time)
- Write down data SLOs for your top 5 datasets and publish them in the repo.
- Add dbt tests and a Great Expectations gate; fail the pipeline on breach.
- Convert training datasets to Delta/Iceberg; log versionAsOf to MLflow.
- Stand up Feast for one use case; reuse features in both training and serving.
- Deploy your model via GitOps with a 10% Istio canary; link rollout to business KPIs.
- Add Prometheus metrics and OpenLineage; build a single Grafana dashboard that mixes data, model, and business signals.
What we’d change sooner: stop dual-maintaining feature logic in ad-hoc scripts. Centralize. Every minute keeping two versions in sync is a minute you’re not spending on a better model.
If your training and serving paths don’t share code, tests, and data contracts, you don’t have one model—you have two that disagree in production.
GitPlumbers plugs these holes quickly because we’ve broken our heads on them before—at scale, with real revenue on the line. If you’re tired of “just retrain it” as the answer, let’s fix the pipeline and make the model boring again.
Key takeaways
- Training and serving must read from the same validated, versioned data contracts or you’ll pay the training-serving skew tax forever.
- Set data SLOs (freshness, completeness, schema) and enforce them with `dbt` tests, `Great Expectations`, and lineage; fail fast before bad data hits models.
- Version everything: data (Delta/Iceberg time travel), features (Feast), code (Git), and models/params (MLflow).
- Deploy serving with GitOps and canary (Istio); wire data and model metrics into Prometheus to tie quality to business KPIs.
- Measure outcomes: cut MTTR for data incidents, improve freshness SLO attainment, and reduce bad-release rate via canaries.
Implementation checklist
- Define data SLOs: freshness, completeness, schema stability; publish them next to the dataset.
- Add `dbt` tests (plus `dbt-utils`) on your transformation layer; fail pipelines on test failure.
- Instrument ingestion and feature jobs with Prometheus metrics (freshness, row counts, null ratios).
- Use Delta/Iceberg time travel for training datasets; log the exact data version/hash to MLflow.
- Adopt Feast (or equivalent) to reuse feature logic in both batch (training) and online (serving).
- Orchestrate with Airflow/Argo; block training on validation gates and lineage completeness (OpenLineage/Marquez).
- Deploy model servers via GitOps (ArgoCD) and roll out with Istio canaries tied to business guardrails.
- Alert on SLO burn rates and Kafka consumer lag; page humans only when customer impact is imminent.
Questions we hear from teams
- Do I really need both dbt and Great Expectations?
- Use dbt tests for schema-level and relational checks close to your SQL transforms; add Great Expectations for richer distribution and field-level validations. They’re complementary. Start with dbt tests; layer GE where drift risk is high.
- Feast vs custom feature service?
- You can roll your own, but you’ll reimplement registry, materialization, and online/offline parity. Feast gives you consistent definitions and pluggable stores (Redis/Dynamo/Bigtable) with minimal ceremony. Start with Feast; customize only where latency or scale demands.
- Warehouse or lake for ML features?
- Use a lake with Delta/Iceberg for time travel and cost control. If your org is warehouse-heavy (Snowflake/BigQuery), it’s fine—just ensure versioning and reproducibility. The anti-pattern is ephemeral, unversioned training extracts.
- How do I tie model rollout to business guardrails?
- Emit business KPIs to Prometheus (or pull from your analytics system) and build canary rules that halt or roll back when KPIs degrade beyond thresholds. Pair with Istio for traffic ramp and Argo Rollouts/Seldon for orchestrated promotion.
- What measurable outcomes should I expect?
- Teams who implement this pattern typically see 50–80% reduction in MTTR for data incidents, 99%+ freshness attainment, 30–60% fewer bad model releases (thanks to canaries), and faster retrains (days to hours) due to reproducibility.
Ready to modernize your codebase?
Let GitPlumbers help you transform AI-generated chaos into clean, scalable applications.
