Stop Flying Blind: Data Lineage That Keeps Your AI From Burning Prod
If you can’t answer “Which data, code, and config produced this model output?” in under 60 seconds, you don’t have an AI system—you have a slot machine. Here’s the instrumentation blueprint I wish we’d had before the 3 a.m. incident call.
The outage that finally made us instrument everything
Two summers ago, a retail client’s LLM started quoting 2019 clearance prices in production. Same prompt, same user segment, wildly different answers. Pager went off at 2:07 a.m. We spent three hours guessing: was it the prompt template, the feature store backfill, the vector index swap, or the model pin? We had logs, but no lineage. The root cause was a silent feature-schema change in a backfilled parquet table plus a prompt tweak that bypassed a safety rule. We fixed the data and rolled back the template, but what actually fixed the incident was what we shipped after: end-to-end lineage with real guardrails.
If you can’t trace a prediction back to the dataset, code commit, configuration, and model artifact that produced it, you’re playing Russian roulette with your SLOs. Here’s what we now install on day one at GitPlumbers.
What “lineage” means for AI systems (training + inference)
Forget the slideware. Real lineage needs to capture:
- Datasets: raw sources, feature tables, vector indexes; versions/hashes (e.g., s3://bucket/train/2025-10-01/sha256:...).
- Code + config: git SHAs, Docker image digests, requirements.txt/poetry.lock, prompt template versions, inference routing rules.
- Model artifacts: MLflow run IDs, model registry version, quantization flags (e.g., gguf q4_1), serving runtime (vLLM 0.5.4).
- Runtime context: request/user segment (pseudonymized), feature vector hash, safety_policy_version, A/B or canary flag, region/cluster.
- Results + evaluations: latency, token counts, content filter outcomes, offline/online eval scores.
And it needs to be queryable: “Show me all predictions produced by prompt_template=v23 using model=v1.12 fed by dataset=features/2025-10-01.” That’s not a log search. That’s a lineage graph.
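Once Marquez is in place, that graph query is one HTTP call. A sketch against Marquez's lineage endpoint (endpoint shape per Marquez's REST API; the `marquez:5000` host and the dataset name are illustrative, so verify against your deployed version):

```python
import json
import urllib.parse
import urllib.request

def lineage_node_id(kind: str, namespace: str, name: str) -> str:
    # Marquez addresses graph nodes as "<kind>:<namespace>:<name>"
    return f"{kind}:{namespace}:{name}"

def fetch_lineage(base_url: str, node_id: str, depth: int = 3) -> dict:
    # walk the lineage graph `depth` hops out from the given node
    qs = urllib.parse.urlencode({"nodeId": node_id, "depth": depth})
    with urllib.request.urlopen(f"{base_url}/api/v1/lineage?{qs}") as resp:
        return json.load(resp)

node = lineage_node_id("dataset", "feature-store", "features/2025-10-01")
# graph = fetch_lineage("http://marquez:5000", node)
```

From that graph response you can follow edges from the dataset to every job and model run that consumed it.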
We use a two-layer approach:
- Traces as the spine (OpenTelemetry): every hop from request to model server is a span with a shared trace_id.
- Lineage events as enrichment (OpenLineage via Marquez/DataHub/Atlas): attach datasets, models, and versions to those spans.
Instrumentation blueprint: trace first, then enrich with lineage
The trick is minimal ceremony with maximal signal. A thin middleware does 80% of the work.
```python
# app/inference_middleware.py
# Python 3.11, opentelemetry==1.26.0, openlineage-python==1.27.0
import hashlib
import uuid
from datetime import datetime, timezone

from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor  # instrument_app(app) at startup
from prometheus_client import Counter, Histogram
from openlineage.client import OpenLineageClient
from openlineage.client.facet import NominalTimeRunFacet
from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState

REQUESTS = Counter('llm_requests_total', 'Total inference requests', ['model', 'template', 'policy'])
LATENCY = Histogram('llm_request_latency_seconds', 'Inference latency', ['model'])
HALLUCINATIONS = Counter('llm_hallucinations_total', 'Flagged hallucinations', ['model'])

ol = OpenLineageClient(url="http://marquez:5000")
tracer = trace.get_tracer(__name__)

def feature_hash(features: dict) -> str:
    return hashlib.sha256(str(sorted(features.items())).encode()).hexdigest()

async def infer_with_lineage(app, request, handler):
    ctx = request.headers
    model_v = ctx.get('x-model-version', 'gpt-4o-2024-10')
    template_v = ctx.get('x-prompt-template', 'v23')
    policy_v = ctx.get('x-safety-policy', 'lcg-1.2')
    start = datetime.now(timezone.utc)
    with tracer.start_as_current_span("inference") as span, LATENCY.labels(model_v).time():
        span.set_attribute("model.version", model_v)
        span.set_attribute("prompt.template", template_v)
        span.set_attribute("safety.policy", policy_v)
        payload = await request.json()
        feats = payload.get('features', {})
        fhash = feature_hash(feats)
        span.set_attribute("features.hash", fhash)
        REQUESTS.labels(model_v, template_v, policy_v).inc()
        response = await handler(request)  # call downstream model server
        flagged = response.headers.get('x-content-flag', 'ok') != 'ok'
        if flagged:
            HALLUCINATIONS.labels(model_v).inc()
        # Emit lineage (dataset + model) linked to the current span: reuse the
        # OTel trace_id as the OpenLineage run UUID so the two systems join.
        run_id = str(uuid.UUID(int=span.get_span_context().trace_id))
        ol.emit(
            RunEvent(
                eventType=RunState.COMPLETE,
                eventTime=start.isoformat(),
                run=Run(runId=run_id, facets={
                    'nominalTime': NominalTimeRunFacet(
                        nominalStartTime=start.isoformat(),
                        nominalEndTime=datetime.now(timezone.utc).isoformat(),
                    ),
                }),
                job=Job(namespace="inference", name="llm-api"),
                producer="https://github.com/gitplumbers/inference-middleware",
                inputs=[Dataset(namespace="feature-store", name=f"reco_features@{fhash}")],
                outputs=[Dataset(namespace="models", name=model_v),
                         Dataset(namespace="prompts", name=f"template:{template_v}")],
            )
        )
        return response
```

- Why this works: trace_id ties logs, metrics, and lineage. We enrich spans with versions and emit an OpenLineage event with inputs/outputs.
- Where to run it: edge API, KServe/BentoML/vLLM gateway, or as a FastAPI/Express middleware.
- What you get: one click from a Grafana panel to the exact run, dataset, and template that produced a response.
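One sharp edge worth noting: hashing features with `str(sorted(...))` is stable for flat dicts but not canonical for nested ones. A canonical-JSON variant (stdlib-only sketch; the function name is ours):

```python
import hashlib
import json

def canonical_feature_hash(features: dict) -> str:
    # sort_keys + fixed separators yield one byte string per logical payload,
    # regardless of key order or nesting; default=str handles dates/UUIDs
    canonical = json.dumps(features, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode()).hexdigest()
```

Same features in any key order hash identically, so your lineage graph doesn't fork on serialization noise.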
Training pipeline: version everything, prove it happened
For training, we make lineage boring and deterministic.
- Version data and models
  - Data: DVC or lakeFS to content-address datasets; emit dataset_version.
  - Models: MLflow for params/artifacts and the model registry; pin model_version in serving.
- Contract and validate
  - Schema constraints with Great Expectations (GX) or pandera; fail fast.
  - Data drift profiles with Evidently/WhyLabs on train/holdout.
- Emit lineage on every step
  - Airflow/Prefect/OpenLineage integration describes inputs/outputs; one Marquez/DataHub backend.
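The contract idea in miniature: GX and pandera give you declarative schemas and far richer checks, but the fail-fast shape a pipeline step needs is just this (stdlib sketch, not the GX or pandera API):

```python
def check_contract(rows: list[dict], schema: dict) -> list[str]:
    # schema maps column -> expected type; return violations instead of silently coercing
    errors = []
    for i, row in enumerate(rows):
        for col, typ in schema.items():
            if col not in row:
                errors.append(f"row {i}: missing column {col!r}")
            elif not isinstance(row[col], typ):
                errors.append(f"row {i}: {col!r} expected {typ.__name__}, got {type(row[col]).__name__}")
    return errors

schema = {"sku": str, "price": float}
violations = check_contract([{"sku": "A1", "price": 9.99}, {"sku": "A2", "price": "9.99"}], schema)
# in the pipeline step: if violations: raise ValueError(violations)
```

The point is the failure mode: a schema breach kills the run loudly before a backfilled table poisons training.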
```ini
# airflow.cfg (excerpt)
[openlineage]
enabled = True
transport = http
url = http://marquez:5000
namespace = training
```

```yaml
# docker-compose.yml for Marquez (OpenLineage backend)
version: '3.8'
services:
  marquez:
    image: marquezproject/marquez:0.51.0
    environment:
      MARQUEZ_PORT: 5000
    ports: ["5000:5000"]
```

```python
# dags/train_model.py (Airflow 2.9 + OpenLineage provider)
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def prepare_features(**context):
    # produce s3://data/features/dt={{ ds }}
    pass

def train(**context):
    # write MLflow run and model artifact
    pass

with DAG("train_llm_reranker", start_date=datetime(2025, 10, 1), schedule="@daily") as dag:
    prep = PythonOperator(task_id="prepare_features", python_callable=prepare_features)
    fit = PythonOperator(task_id="train", python_callable=train)
    prep >> fit
    # Attach custom facets (promptTemplateVersion, safetyPolicyVersion) via the
    # OpenLineage provider's custom-facet hooks; the exact API varies by provider version.
```

```shell
# Data versioning example with DVC
$ dvc add data/train.parquet
$ git add data/train.parquet.dvc .gitignore && git commit -m "version training data 2025-10-01"
$ dvc push
```

Tie it together in Marquez/DataHub and you can answer audit questions in minutes, not hours.
Inference pipeline: request-level lineage + guardrails
Production inference is where incidents happen. Instrument for reality, not theory.
- IDs that matter
  - request_id, trace_id: propagate via headers.
  - model_version, dataset_version, feature_hash, prompt_template_version, safety_policy_version.
- Guardrails
  - Schema: reject malformed input with pydantic.
  - Content: LlamaGuard, OpenAI moderation, or Azure AI Content Safety; log outcomes.
  - Eval hooks: run lightweight post-hoc checks (toxicity < threshold, json_valid=true).
  - Fallbacks: route to safer templates or smaller deterministic models on policy fail.
- Routing flags
  - Canary and A/B with LaunchDarkly or Unleash; record flag values in spans.
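Propagation is mostly carrying one header. The W3C trace context format that OpenTelemetry propagates looks like this; it's hand-built here for illustration only, since in an instrumented service the SDK's propagator injects it for you:

```python
import re
import secrets

def make_traceparent(trace_id: str = "", sampled: bool = True) -> str:
    # version "00" - 16-byte trace id - 8-byte span id - flags, all lowercase hex
    trace_id = trace_id or secrets.token_hex(16)
    span_id = secrets.token_hex(8)
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{span_id}-{flags}"

TRACEPARENT_RE = re.compile(r"^00-[0-9a-f]{32}-[0-9a-f]{16}-[0-9a-f]{2}$")
headers = {"traceparent": make_traceparent()}
```

Every service that forwards this header keeps the request on the same trace, which is what lets a lineage event emitted deep in the stack join back to the edge request.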
```python
# FastAPI guardrail sketch — is_safe, call_model, and the feature-flag
# client (ld_client) are assumed to be defined elsewhere in the app.
from fastapi import FastAPI, Request
from pydantic import BaseModel, Field

app = FastAPI()

class InferenceRequest(BaseModel):
    user_id: str
    query: str
    features: dict = Field(default_factory=dict)

@app.post("/generate")
async def generate(req: InferenceRequest, request: Request):
    # content safety check
    safe = await is_safe(req.query)
    if not safe:
        return {"output": "Sorry, can’t help with that.", "policy": "blocked"}
    # route by flag (LaunchDarkly-style; newer SDKs take a Context object instead of a dict)
    if ld_client.variation("llm_canary_v2", {"key": req.user_id}, False):
        model = "gpt-4o-2024-10"
        template = "v24"
    else:
        model = "gpt-4o-2024-06"
        template = "v23"
    # expose versions to the lineage middleware; Starlette headers are immutable,
    # so append to the underlying ASGI scope (or stash them on request.state)
    for k, v in {"x-model-version": model, "x-prompt-template": template,
                 "x-safety-policy": "lcg-1.2"}.items():
        request.scope["headers"].append((k.encode(), v.encode()))
    return await call_model(req, model, template)
```

- SLOs and alerts
  - P90 latency < 350ms, non-2xx < 1%, hallucinations flagged < 0.5%.
```promql
# Prometheus
histogram_quantile(0.90, sum(rate(llm_request_latency_seconds_bucket[5m])) by (le))
sum(rate(llm_hallucinations_total[5m])) / sum(rate(llm_requests_total[5m]))
```

Catching the big three: hallucination, drift, latency spikes
I’ve seen these failure modes in every AI stack since 2020. Lineage makes them diagnosable.
- Hallucination
  - Symptom: sudden spike in “nonsense” answers post-deploy.
  - Typical cause: prompt/template change, missing grounding doc, content filter bypass.
  - Mitigation:
    - Pin prompt_template_version and record it in lineage.
    - Add retrieval lineage: document IDs and index version used.
    - Canary templates and auto-rollback when hallucinations/requests crosses the SLO for 10m.
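The auto-rollback condition doesn't need ML of its own; a windowed rate check against the SLO is enough. A sketch with illustrative thresholds, fed from the Prometheus counters:

```python
def should_rollback(flagged: int, total: int, slo: float = 0.005, min_requests: int = 200) -> bool:
    # require a minimum sample so one bad answer in a quiet window can't trip a rollback
    if total < min_requests:
        return False
    return flagged / total > slo

# 3 flagged out of 400 requests in the window is 0.75%, which breaches the 0.5% SLO
```

Run it per canary window; when it fires for two consecutive windows, shift the VirtualService weights back to stable.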
```yaml
# Istio canary with circuit breaking
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata: {name: llm}
spec:
  host: llm.svc.cluster.local
  subsets:  # referenced by the VirtualService below
  - {name: stable, labels: {version: stable}}
  - {name: canary, labels: {version: canary}}
  trafficPolicy:
    outlierDetection:
      consecutive5xxErrors: 5
      interval: 5s
      baseEjectionTime: 1m
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata: {name: llm}
spec:
  hosts: ["llm.example.com"]
  http:
  - route:
    - destination: {host: llm.svc.cluster.local, subset: stable}
      weight: 90
    - destination: {host: llm.svc.cluster.local, subset: canary}
      weight: 10
    timeout: 2s
    retries: {attempts: 1, perTryTimeout: 800ms}
```

- Drift
  - Symptom: gradual accuracy decay; support tickets rise; guardrails “feel” stricter.
  - Typical cause: upstream feature drift, seasonal effects, schema changes.
  - Mitigation:
    - Online monitors (Evidently, WhyLabs) compare live feature distributions to the training baseline.
    - Gate training jobs on passing drift checks; emit lineage linking the drift report to the dataset.
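Under the hood, most drift monitors reduce to comparing binned distributions. A population stability index sketch (Evidently and WhyLabs compute richer metrics, but the shape is the same; bin proportions are illustrative):

```python
import math

def psi(expected: list[float], actual: list[float], eps: float = 1e-6) -> float:
    # expected/actual are per-bin proportions: training baseline vs. live traffic;
    # eps guards against empty bins blowing up the log
    score = 0.0
    for e, a in zip(expected, actual):
        e, a = max(e, eps), max(a, eps)
        score += (a - e) * math.log(a / e)
    return score

# common rule of thumb: < 0.1 stable, 0.1–0.25 watch, > 0.25 drifted
```

Gate the nightly training DAG on this score per feature and you get drift checks that fail loudly instead of decaying quietly.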
```shell
# Nightly drift job pseudo
python drift_check.py --live s3://features/live/2025-11-08 \
  --baseline s3://features/train/2025-10-01 \
  --out s3://reports/drift/2025-11-08.json && \
openlineage emit --job drift_check --input features/live@2025-11-08 --output reports/drift@2025-11-08
```

- Latency spikes
  - Symptom: P99 jumps to 2s; tokens-per-second nosedives.
  - Typical cause: vector index compaction, cold pods, a noisy neighbor on the GPU, or an upstream rate limit.
  - Mitigation:
    - Record index version and knn_params in lineage; correlate spikes to index rebuilds.
    - Autoscale on RPS and queue depth; keep warm spares.
    - Circuit-break to a smaller model or cached answer when above the P99 SLO for 5m.
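The circuit-break-to-fallback logic is a few lines of state. A consecutive-breach sketch (thresholds and the fallback target are illustrative):

```python
class LatencyBreaker:
    """Trip to the fallback model after `threshold` consecutive slow requests."""

    def __init__(self, slo_seconds: float = 0.35, threshold: int = 5):
        self.slo = slo_seconds
        self.threshold = threshold
        self.slow_streak = 0

    def record(self, latency_seconds: float) -> None:
        # any fast request resets the streak; only sustained slowness trips the breaker
        self.slow_streak = self.slow_streak + 1 if latency_seconds > self.slo else 0

    @property
    def open(self) -> bool:
        # open circuit == stop sending traffic to the slow model
        return self.slow_streak >= self.threshold

breaker = LatencyBreaker()
# route = "small-model-or-cache" if breaker.open else "primary-llm"
```

Record the breaker state as a span attribute so lineage shows which answers came from the fallback path.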
Rollout plan: two weeks to stop guessing
You don’t need a platform team of 50. You need focus.
- Day 1–3
  - Deploy Marquez. Turn on OpenLineage in Airflow/Prefect and your model gateway.
  - Add OpenTelemetry SDKs and propagate trace_id across services.
- Day 4–6
  - Version datasets with lakeFS or DVC; record dataset_version in spans.
  - Register models in MLflow. Pin versions in serving (KServe, BentoML, or vLLM).
- Day 7–9
  - Add input schema validation and content safety checks; log outcomes as metrics.
  - Create Grafana dashboards for latency, error rate, and hallucinations.
- Day 10–14
  - Introduce canary routing via Istio and a feature flag. Set SLOs + alerts.
  - Backfill lineage for the last 30 days to seed the graph.
At the end, you can answer: “Why did user X get output Y?” with a link, not a hunch.
What I’d do differently if starting today
- Make lineage IDs part of the API contract: requests without trace_id and model_version get rejected.
- Keep one graph: don’t split lineage across multiple catalogs. Pick Marquez or DataHub and stick to it.
- Evaluate in prod, safely: shadow traffic to new templates/models, record lineage separately, and promote only when eval SLOs are met.
- Budget for storage: retain lineage events 90 days hot, 1 year cold; PII redaction at the edge.
- Automate audits: nightly job dumps “top 100 errors” with lineage links for review.
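Making lineage IDs part of the contract is a few lines at the edge (the header names here are whatever your contract says; these two are illustrative):

```python
REQUIRED_LINEAGE_HEADERS = ("x-trace-id", "x-model-version")

def missing_lineage_ids(headers: dict) -> list[str]:
    # reject (e.g., HTTP 422) any request that arrives without its lineage identity
    present = {k.lower() for k in headers}
    return [h for h in REQUIRED_LINEAGE_HEADERS if h not in present]

problems = missing_lineage_ids({"X-Trace-Id": "abc"})
# problems == ["x-model-version"]
```

Enforced at the gateway, this guarantees the lineage graph never has orphan predictions to begin with.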
The goal isn’t pretty diagrams. It’s answering hard questions fast, under pressure, with evidence.
If this sounds like the kind of plumbing you want done once and done right, we’ve built and rescued these pipelines at fintechs, marketplaces, and healthcare companies where compliance isn’t optional. GitPlumbers can wire this up without turning your stack into a science project.
Key takeaways
- Lineage must span training and inference, not just ETL: datasets, features, model artifacts, prompts, and runtime configs are all first-class citizens.
- Use OpenTelemetry traces as the spine and OpenLineage events to enrich spans with datasets, versions, and model metadata.
- Standardize IDs: request_id, trace_id, model_version, dataset_version, feature_hash, prompt_template_version, and safety_policy_version.
- Instrument guardrails as code: schema checks, content filters, drift monitors, canaries, and circuit breakers with observable outcomes.
- Make lineage queryable and boring: one graph (Marquez/DataHub), one tracing backend, and SLO dashboards wired to alerts.
Implementation checklist
- Define lineage entities and IDs for training and inference.
- Deploy Marquez (OpenLineage) and connect Airflow/Prefect, Spark/Flink, and model servers.
- Instrument OpenTelemetry tracing at every hop; propagate context across services.
- Version datasets (DVC/lakeFS) and models (MLflow); emit lineage on every run/event.
- Add guardrails: data contracts (GX), content filters, drift monitors (Evidently/WhyLabs).
- Wire SLOs to Prometheus/Grafana; add canaries and circuit breakers (Istio).
- Operationalize: runbooks, backfills with lineage, and audit-ready storage policies.
Questions we hear from teams
- Do we need both OpenTelemetry and OpenLineage?
- Yes. OTel gives you request/trace context, timing, and service maps. OpenLineage describes data and model inputs/outputs with versions. Together they let you answer both “what happened” and “what it used.”
- Can we start without a data catalog?
- Start with Marquez. It’s lightweight and purpose-built for OpenLineage. If you already run DataHub/Atlas, you can integrate OpenLineage there—just keep one source of truth.
- How do we avoid storing sensitive content in lineage?
- Hash or tokenize PII at the edge, store only IDs and hashes in lineage, and gate raw payloads behind audit roles with strict retention. Keep prompts/templates/version IDs—not raw user text—unless required for regulated investigations.
Ready to modernize your codebase?
Let GitPlumbers help you transform AI-generated chaos into clean, scalable applications.
