Stop Flying Blind: Data Lineage That Keeps Your AI From Burning Prod

If you can’t answer “Which data, code, and config produced this model output?” in under 60 seconds, you don’t have an AI system—you have a slot machine. Here’s the instrumentation blueprint I wish we’d had before the 3 a.m. incident call.

If you can’t trace a prediction back to data, code, and config, you’re not doing AI—you’re doing astrology.

The outage that finally made us instrument everything

Two summers ago, a retail client’s LLM started quoting 2019 clearance prices in production. Same prompt, same user segment, wildly different answers. Pager went off at 2:07 a.m. We spent three hours guessing: was it the prompt template, the feature store backfill, the vector index swap, or the model pin? We had logs, but no lineage. The root cause was a silent feature-schema change in a backfilled parquet table plus a prompt tweak that bypassed a safety rule. We fixed the data and rolled back the template, but what actually fixed the incident was what we shipped after: end-to-end lineage with real guardrails.

If you can’t trace a prediction back to the dataset, code commit, configuration, and model artifact that produced it, you’re playing Russian roulette with your SLOs. Here’s what we now install on day one at GitPlumbers.

What “lineage” means for AI systems (training + inference)

Forget the slideware. Real lineage needs to capture:

  • Datasets: raw sources, feature tables, vector indexes; versions/hashes (e.g., s3://bucket/train/2025-10-01/sha256:...).
  • Code + config: git SHAs, Docker image digests, requirements.txt/poetry.lock, prompt template versions, inference routing rules.
  • Model artifacts: mlflow run IDs, model registry version, quantization flags (e.g., gguf q4_1), serving runtime (vLLM 0.5.4).
  • Runtime context: request/user segment (pseudonymized), feature vector hash, safety_policy_version, A/B or canary flag, region/cluster.
  • Results + evaluations: latency, token counts, content filter outcomes, offline/online eval scores.

And it needs to be queryable: “Show me all predictions produced by prompt_template=v23 using model=v1.12 fed by dataset=features/2025-10-01.” That’s not a log search. That’s a lineage graph.
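To make the difference from log search concrete, here's a toy in-memory version of that query. This is illustrative only; in practice Marquez or DataHub answer it over their lineage APIs, and the class and attribute names below are made up for the sketch.

```python
from collections import defaultdict

class LineageIndex:
    """Toy lineage index: map (dimension, value) pairs to prediction IDs,
    then intersect sets to answer multi-attribute lineage questions."""
    def __init__(self):
        self._index = defaultdict(set)

    def record(self, prediction_id, **attrs):
        for key, value in attrs.items():
            self._index[(key, value)].add(prediction_id)

    def query(self, **attrs):
        sets = [self._index[(k, v)] for k, v in attrs.items()]
        return set.intersection(*sets) if sets else set()

idx = LineageIndex()
idx.record("pred-1", prompt_template="v23", model="v1.12", dataset="features/2025-10-01")
idx.record("pred-2", prompt_template="v24", model="v1.12", dataset="features/2025-10-01")

# "All predictions produced by template v23 using model v1.12 fed by that dataset"
print(idx.query(prompt_template="v23", model="v1.12", dataset="features/2025-10-01"))
```

A log search scans text; this is a set intersection over versioned edges, which is why a real lineage backend answers it in milliseconds.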

We use a two-layer approach:

  • Traces as the spine (OpenTelemetry): every hop from request to model server is a span with shared trace_id.
  • Lineage events as enrichment (OpenLineage via Marquez/DataHub/Atlas): attach datasets, models, and versions to those spans.

Instrumentation blueprint: trace first, then enrich with lineage

The trick is minimal ceremony with maximal signal. A thin middleware does 80% of the work.

# app/inference_middleware.py
# Python 3.11, opentelemetry==1.26.0, openlineage-python==1.27.0
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from prometheus_client import Counter, Histogram
from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Run, Job, Dataset
from openlineage.client.facet import NominalTimeRunFacet
from datetime import datetime
import hashlib

REQUESTS = Counter('llm_requests_total', 'Total inference requests', ['model', 'template', 'policy'])
LATENCY = Histogram('llm_request_latency_seconds', 'Inference latency', ['model'])
HALLUCINATIONS = Counter('llm_hallucinations_total', 'Flagged hallucinations', ['model'])

ol = OpenLineageClient(url="http://marquez:5000")
tracer = trace.get_tracer(__name__)

def feature_hash(features: dict) -> str:
    return hashlib.sha256(str(sorted(features.items())).encode()).hexdigest()

async def infer_with_lineage(app, request, handler):
    ctx = request.headers
    model_v = ctx.get('x-model-version', 'gpt-4o-2024-10')
    template_v = ctx.get('x-prompt-template', 'v23')
    policy_v = ctx.get('x-safety-policy', 'lcg-1.2')
    start = datetime.utcnow()

    with tracer.start_as_current_span("inference") as span, LATENCY.labels(model_v).time():
        span.set_attribute("model.version", model_v)
        span.set_attribute("prompt.template", template_v)
        span.set_attribute("safety.policy", policy_v)

        payload = await request.json()
        feats = payload.get('features', {})
        fhash = feature_hash(feats)
        span.set_attribute("features.hash", fhash)

        REQUESTS.labels(model_v, template_v, policy_v).inc()

        response = await handler(request)  # call downstream model server
        flagged = response.headers.get('x-content-flag', 'ok') != 'ok'
        if flagged:
            HALLUCINATIONS.labels(model_v).inc()

        # Emit lineage (dataset + model) linked to the current span.
        # OTel trace IDs are 128-bit ints; Marquez expects a UUID-formatted runId,
        # so reshape the hex trace id into UUID form.
        h = format(span.get_span_context().trace_id, "032x")
        run_id = f"{h[:8]}-{h[8:12]}-{h[12:16]}-{h[16:20]}-{h[20:]}"
        ol.emit(
            RunEvent(
                eventType=RunState.COMPLETE,
                eventTime=start.isoformat() + "Z",
                run=Run(runId=run_id, facets={
                    'nominalTime': NominalTimeRunFacet(
                        nominalStartTime=start.isoformat() + "Z",
                        nominalEndTime=datetime.utcnow().isoformat() + "Z"),
                }),
                job=Job(namespace="inference", name="llm-api"),
                producer="llm-api",  # identifies the emitting service
                inputs=[Dataset(namespace="feature-store", name=f"reco_features@{fhash}")],
                outputs=[Dataset(namespace="models", name=model_v),
                         Dataset(namespace="prompts", name=f"template:{template_v}")],
            )
        )
        return response
  • Why this works: trace_id ties logs, metrics, and lineage. We enrich spans with versions and emit an OpenLineage event with inputs/outputs.
  • Where to run it: edge API, KServe/BentoML/vLLM gateway, or as a FastAPI/Express middleware.
  • What you get: one click from Grafana panel to the exact run, dataset, and template that produced a response.

Training pipeline: version everything, prove it happened

For training, we make lineage boring and deterministic.

  1. Version data and models
    • Data: DVC or lakeFS to content-address datasets; emit dataset_version.
    • Models: MLflow for params/artifacts, model registry; pin model_version in serving.
  2. Contract and validate
    • Schema constraints with Great Expectations (GX) or pandera; fail fast.
    • Data drift profiles with Evidently/WhyLabs on train/holdout.
  3. Emit lineage on every step
    • Airflow/Prefect/OpenLineage integration describes inputs/outputs; one Marquez/DataHub backend.
# airflow.cfg (excerpt)
[openlineage]
disabled = False
transport = {"type": "http", "url": "http://marquez:5000"}
namespace = training

# docker-compose for Marquez (OpenLineage backend)
version: '3.8'
services:
  marquez:
    image: marquezproject/marquez:0.51.0
    environment:
      MARQUEZ_PORT: 5000
    ports: ["5000:5000"]
# dags/train_model.py (Airflow 2.9 + OpenLineage provider)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG("train_llm_reranker", start_date=datetime(2025, 10, 1), schedule="@daily") as dag:
    def prepare_features(**context):
        # produce s3://data/features/dt={{ ds }}
        pass

    def train(**context):
        # write MLflow run and model artifact
        pass

    prep = PythonOperator(task_id="prepare_features", python_callable=prepare_features)
    fit = PythonOperator(task_id="train", python_callable=train)
    prep >> fit

    # The provider emits lineage for each task automatically. Custom facets
    # (prompt template version, safety policy version) are attached via the
    # provider's facet hooks; the exact API varies by provider version.
# Data versioning example with DVC
$ dvc add data/train.parquet
$ git add data/train.parquet.dvc .gitignore && git commit -m "version training data 2025-10-01"
$ dvc push
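If you also want a content-addressed dataset_version to stamp into your own span attributes and lineage events, the idea behind DVC's addressing fits in a few lines. This is a sketch of the concept, not DVC's actual internals:

```python
import hashlib
import tempfile

def dataset_version(path: str, chunk_size: int = 1 << 20) -> str:
    """Content-address a file: identical bytes always map to the same version."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return f"sha256:{digest.hexdigest()}"

# Stamp this into span attributes, OpenLineage dataset names, and model metadata.
with tempfile.NamedTemporaryFile(suffix=".parquet", delete=False) as f:
    f.write(b"bytes standing in for data/train.parquet")
    tmp_path = f.name
print(dataset_version(tmp_path))
```

Because the version is derived from the bytes, a silent backfill that changes the data changes the version, and your lineage graph shows it.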

Tie it together in Marquez/DataHub and you can answer audit questions in minutes, not hours.

Inference pipeline: request-level lineage + guardrails

Production inference is where incidents happen. Instrument for reality, not theory.

  • IDs that matter
    • request_id, trace_id: propagate via headers.
    • model_version, dataset_version, feature_hash, prompt_template_version, safety_policy_version.
  • Guardrails
    • Schema: reject malformed input with pydantic.
    • Content: LlamaGuard, OpenAI moderation, or Azure AI Content Safety; log outcomes.
    • Eval hooks: run lightweight post-hoc checks (toxicity < threshold, json_valid=true).
    • Fallbacks: route to safer templates or smaller deterministic models on policy fail.
  • Routing flags
    • Canary and A/B with LaunchDarkly or Unleash; record flag values in spans.
# FastAPI guardrail sketch (is_safe, call_model, ld_client are assumed helpers)
from pydantic import BaseModel, Field
from fastapi import FastAPI, Request
from ldclient import Context  # LaunchDarkly server-side SDK

app = FastAPI()

class InferenceRequest(BaseModel):
    user_id: str
    query: str
    features: dict = Field(default_factory=dict)

@app.post("/generate")
async def generate(req: InferenceRequest, request: Request):
    # content safety check
    if not await is_safe(req.query):
        return {"output": "Sorry, can’t help with that.", "policy": "blocked"}
    # route by flag: variation(flag_key, context, default)
    if ld_client.variation("llm_canary_v2", Context.create(req.user_id), False):
        model, template = "gpt-4o-2024-10", "v24"
    else:
        model, template = "gpt-4o-2024-06", "v23"
    # append to the raw ASGI scope so the lineage middleware can pick
    # these up as request headers
    request.scope["headers"].extend([
        (b"x-model-version", model.encode()),
        (b"x-prompt-template", template.encode()),
        (b"x-safety-policy", b"lcg-1.2"),
    ])
    return await call_model(req, model, template)
  • SLOs and alerts
    • P90 latency < 350ms, non-2xx < 1%, hallucinations flagged < 0.5%.
# Prometheus
histogram_quantile(0.90, sum(rate(llm_request_latency_seconds_bucket[5m])) by (le))
sum(rate(llm_hallucinations_total[5m])) / sum(rate(llm_requests_total[5m]))
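Wiring the second query into an alert might look like the rule below. The rule and alert names are illustrative; the 0.5% threshold comes from the SLOs above.

```yaml
# prometheus/alerts.yml (sketch)
groups:
  - name: llm-slos
    rules:
      - alert: LLMHallucinationRateHigh
        expr: |
          sum(rate(llm_hallucinations_total[5m]))
            / sum(rate(llm_requests_total[5m])) > 0.005
        for: 10m
        labels: {severity: page}
        annotations:
          summary: "Hallucination rate above 0.5% for 10m; check prompt template and index versions in lineage"
```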

Catching the big three: hallucination, drift, latency spikes

I’ve seen these failure modes in every AI stack since 2020. Lineage makes them diagnosable.

  • Hallucination
    • Symptom: sudden spike in “nonsense” answers post-deploy.
    • Typical cause: prompt/template change, missing grounding doc, content filter bypass.
    • Mitigation:
      • Pin prompt_template_version and record in lineage.
      • Add retrieval lineage: document IDs and index version used.
      • Canary templates and auto-rollback when the hallucination rate breaches SLO for 10 minutes.
# Istio canary with circuit breaking
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata: {name: llm}
spec:
  host: llm.svc.cluster.local
  trafficPolicy:
    outlierDetection:
      consecutive5xxErrors: 5
      interval: 5s
      baseEjectionTime: 1m
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata: {name: llm}
spec:
  hosts: ["llm.example.com"]
  http:
  - route:
    - destination: {host: llm.svc.cluster.local, subset: stable}
      weight: 90
    - destination: {host: llm.svc.cluster.local, subset: canary}
      weight: 10
    timeout: 2s
    retries: {attempts: 1, perTryTimeout: 800ms}
  • Drift
    • Symptom: gradual accuracy decay; support tickets rise; guardrails “feel” stricter.
    • Typical cause: upstream feature drift, seasonal effects, schema changes.
    • Mitigation:
      • Online monitors (Evidently, WhyLabs) compare live feature distributions to training baseline.
      • Gate training jobs on passing drift checks; emit lineage linking drift report to dataset.
# Nightly drift job pseudo
python drift_check.py --live s3://features/live/2025-11-08 \
  --baseline s3://features/train/2025-10-01 \
  --out s3://reports/drift/2025-11-08.json && \
openlineage emit --job drift_check --input features/live@2025-11-08 --output reports/drift@2025-11-08
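Evidently and WhyLabs do the heavy lifting in production, but the core idea (compare live feature distributions to the training baseline) fits in a page. Below is a minimal population stability index (PSI) check; it is a sketch, not either tool's actual implementation:

```python
import math
import random
import statistics

def psi(baseline, live, buckets=10, eps=1e-6):
    """Population stability index between two numeric samples.
    Rule of thumb: < 0.1 stable, 0.1-0.25 moderate drift, > 0.25 act."""
    edges = statistics.quantiles(baseline, n=buckets)  # bucket edges from baseline
    def proportions(sample):
        counts = [0] * buckets
        for x in sample:
            counts[sum(1 for e in edges if x > e)] += 1
        return [max(c / len(sample), eps) for c in counts]
    p, q = proportions(baseline), proportions(live)
    return sum((pi - qi) * math.log(pi / qi) for pi, qi in zip(p, q))

random.seed(0)
train = [random.gauss(0, 1) for _ in range(5000)]
stable = [random.gauss(0, 1) for _ in range(5000)]
shifted = [random.gauss(0.8, 1) for _ in range(5000)]
print(psi(train, stable))   # small: distributions agree
print(psi(train, shifted))  # large: gate the training job and page someone
```

A nightly job computes this per feature, writes the report, and emits the lineage event linking report to dataset, exactly as in the shell sketch above.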
  • Latency spikes
    • Symptom: P99 jumps to 2s; tokens-per-second nosedives.
    • Typical cause: vector index compaction, cold pods, noisy neighbor on the GPU, or upstream rate limit.
    • Mitigation:
      • Record index version and knn_params in lineage; correlate spikes to index rebuilds.
      • Autoscale on RPS and queue depth; warm spares.
      • Circuit-break to a smaller model or cached answer when > P99 SLO for 5m.
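The last mitigation is the one teams skip. A minimal latency circuit breaker is sketched below with hypothetical names; it trips on the first quantile breach rather than implementing the full "above SLO for 5m" dwell, and in practice Istio outlier detection or your gateway may give you this for free.

```python
import time
from collections import deque

class LatencyBreaker:
    """Trip to a fallback model when the recent latency quantile breaches the SLO."""
    def __init__(self, slo_seconds=2.0, quantile=0.99, window=200, cooldown=300):
        self.slo = slo_seconds
        self.quantile = quantile
        self.samples = deque(maxlen=window)
        self.cooldown = cooldown          # seconds to keep serving the fallback
        self.tripped_at = None

    def record(self, latency_seconds, now=None):
        now = now if now is not None else time.monotonic()
        self.samples.append(latency_seconds)
        ordered = sorted(self.samples)
        p = ordered[int(self.quantile * (len(ordered) - 1))]
        if p > self.slo:
            self.tripped_at = now

    def use_fallback(self, now=None):
        now = now if now is not None else time.monotonic()
        return self.tripped_at is not None and now - self.tripped_at < self.cooldown

breaker = LatencyBreaker(slo_seconds=2.0)
for _ in range(50):
    breaker.record(0.3, now=0.0)       # healthy traffic
print(breaker.use_fallback(now=1.0))   # False: route to the big model
for _ in range(50):
    breaker.record(3.5, now=2.0)       # sustained spike past the 2s SLO
print(breaker.use_fallback(now=3.0))   # True: serve smaller model / cached answer
```

Record the breaker state in span attributes too, so lineage shows which answers were served by the fallback.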

Rollout plan: two weeks to stop guessing

You don’t need a platform team of 50. You need focus.

  • Day 1–3
    • Deploy Marquez. Turn on OpenLineage in Airflow/Prefect and your model gateway.
    • Add OpenTelemetry SDKs and propagate trace_id across services.
  • Day 4–6
    • Version datasets with lakeFS or DVC; record dataset_version in spans.
    • Register models in MLflow. Pin versions in serving (KServe, BentoML, or vLLM).
  • Day 7–9
    • Add input schema validation and content safety checks; log outcomes as metrics.
    • Create Grafana dashboards for latency, error rate, and hallucinations.
  • Day 10–14
    • Introduce canary routing via Istio and a feature flag. Set SLOs + alerts.
    • Backfill lineage for last 30 days to seed the graph.

At the end, you can answer: “Why did user X get output Y?” with a link, not a hunch.

What I’d do differently if starting today

  • Make lineage IDs part of the API contract: requests without trace_id and model_version get rejected.
  • Keep one graph: don’t split lineage across multiple catalogs. Pick Marquez or DataHub and stick to it.
  • Evaluate in prod, safely: shadow traffic to new templates/models, record lineage separately, and promote only when eval SLOs are met.
  • Budget for storage: retain lineage events 90 days hot, 1 year cold; PII redaction at the edge.
  • Automate audits: nightly job dumps “top 100 errors” with lineage links for review.
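Making the first bullet concrete: a framework-agnostic check you can drop into any middleware. The header names mirror the IDs used throughout this post (the `traceparent` value below is the W3C Trace Context example); wiring the 4xx response into FastAPI or Express is left to your gateway.

```python
REQUIRED_LINEAGE_HEADERS = (
    "x-request-id",
    "traceparent",        # W3C Trace Context header carries the trace_id
    "x-model-version",
)

def check_lineage_contract(headers: dict) -> list:
    """Return missing lineage headers; an empty list means the request is admissible."""
    present = {k.lower() for k in headers}
    return [h for h in REQUIRED_LINEAGE_HEADERS if h not in present]

missing = check_lineage_contract(
    {"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"}
)
print(missing)  # reject with a 4xx before the request ever touches a model
```

Rejecting at the edge is what makes lineage IDs a contract rather than a convention.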

The goal isn’t pretty diagrams. It’s answering hard questions fast, under pressure, with evidence.

If this sounds like the kind of plumbing you want done once and done right, we’ve built and rescued these pipelines at fintechs, marketplaces, and healthcare companies where compliance isn’t optional. GitPlumbers can wire this up without turning your stack into a science project.

Key takeaways

  • Lineage must span training and inference, not just ETL: datasets, features, model artifacts, prompts, and runtime configs are all first-class citizens.
  • Use OpenTelemetry traces as the spine and OpenLineage events to enrich spans with datasets, versions, and model metadata.
  • Standardize IDs: request_id, trace_id, model_version, dataset_version, feature_hash, prompt_template_version, and safety_policy_version.
  • Instrument guardrails as code: schema checks, content filters, drift monitors, canaries, and circuit breakers with observable outcomes.
  • Make lineage queryable and boring: one graph (Marquez/DataHub), one tracing backend, and SLO dashboards wired to alerts.

Implementation checklist

  • Define lineage entities and IDs for training and inference.
  • Deploy Marquez (OpenLineage) and connect Airflow/Prefect, Spark/Flink, and model servers.
  • Instrument OpenTelemetry tracing at every hop; propagate context across services.
  • Version datasets (DVC/lakeFS) and models (MLflow); emit lineage on every run/event.
  • Add guardrails: data contracts (GX), content filters, drift monitors (Evidently/WhyLabs).
  • Wire SLOs to Prometheus/Grafana; add canaries and circuit breakers (Istio).
  • Operationalize: runbooks, backfills with lineage, and audit-ready storage policies.

Questions we hear from teams

Do we need both OpenTelemetry and OpenLineage?
Yes. OTel gives you request/trace context, timing, and service maps. OpenLineage describes data and model inputs/outputs with versions. Together they let you answer both “what happened” and “what it used.”
Can we start without a data catalog?
Start with Marquez. It’s lightweight and purpose-built for OpenLineage. If you already run DataHub/Atlas, you can integrate OpenLineage there—just keep one source of truth.
How do we avoid storing sensitive content in lineage?
Hash or tokenize PII at the edge, store only IDs and hashes in lineage, and gate raw payloads behind audit roles with strict retention. Keep prompts/templates/version IDs—not raw user text—unless required for regulated investigations.
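"Hash or tokenize at the edge" in practice means a keyed hash, not a bare sha256: low-entropy fields like emails are brute-forceable from an unkeyed digest. A sketch using stdlib `hmac`; the key name is a placeholder and key management (KMS/Vault, rotation) is yours.

```python
import hashlib
import hmac

def pseudonymize(value: str, key: bytes) -> str:
    """Keyed, deterministic token: same input + same key -> same token,
    but unguessable without the key (unlike a plain sha256)."""
    return hmac.new(key, value.encode("utf-8"), hashlib.sha256).hexdigest()[:32]

key = b"rotate-me-from-your-secret-manager"  # assumption: fetched from KMS/Vault
token = pseudonymize("alice@example.com", key)
print(token)                                  # store this in lineage, never the email
print(pseudonymize("alice@example.com", key) == token)  # deterministic: joins still work
```

Determinism is the point: the same user yields the same token across events, so lineage joins keep working without raw PII ever entering the graph.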

Ready to modernize your codebase?

Let GitPlumbers help you transform AI-generated chaos into clean, scalable applications.

Talk to GitPlumbers about wiring lineage into your AI stack

Download the lineage + observability checklist
