Lineage Or Die: The Quiet Control Plane That Keeps Your AI From Lying In Prod
If you can’t trace what trained your model or what fed your last inference, you’re not running AI—you’re running a slot machine. Here’s the instrumentation, lineage, and guardrails that actually work.
If you can’t tie an answer to the data and prompt that produced it, you’re not doing AI—you’re guessing with better syntax.
The outage you’ve already lived through
Three quarters into a RAG rollout, we watched a search-to-answer service go sideways after a seemingly innocent backfill. Features updated, a new embedding index rolled out, and latency crept from 120ms p50 to 1.8s. Hallucinations spiked. Product swore it was the model. Data swore it was the retriever. Infra swore it was the mesh. The truth? No one could trace a single answer from click to corpus.
We had no consistent lineage: no dataset_id on the features, no prompt_template_version in logs, no retrieval_index_sha in spans. We were debugging vibes. Two weeks later we put a lineage spine in place. The next incident? Ten minutes to root cause, five to rollback. This is the difference between hope and operations.
What lineage means for AI (training and inference)
Forget the pretty lineage graph in a deck. In production, lineage is the join key between:
- Training: raw sources -> transforms -> feature store snapshot -> model artifact + hyperparams
- Inference: request context -> feature reads -> retriever docs -> prompt/template -> model version -> output + post-process
It must be:
- Versioned: immutable IDs for datasets (dataset_id), feature views (feature_view:ver), indexes (corpus_sha), prompts (prompt_v3.2), and models (model:sha256).
- Propagated: carried across jobs, services, queues, and traces.
- Queryable: searchable in a catalog (Marquez/DataHub) and joinable with metrics and logs.
Why it matters when things go wrong:
- Hallucination: If you can’t tie an answer to the documents retrieved and the prompt version, you can’t tell if it’s a model issue or retrieval/grounding.
- Drift: Without feature and dataset versioning, you can’t correlate KPI drops with upstream schema changes or distribution shifts.
- Latency spikes: If spans don’t carry corpus_sha or feature_view, you won’t see that a larger index or cold path is to blame.
Pick a lineage spine and tag everything
You need a standard. I’ve seen DIY JSON blobs rot in six months. What sticks:
- Lineage protocol: OpenLineage (emits well-defined events). Alternatives: Apache Atlas, DataHub.
- Catalog: Marquez (the OpenLineage reference implementation) or DataHub for broader metadata.
- Orchestrator integration: Airflow (via openlineage-airflow), Dagster (native), Spark (openlineage-spark).
Minimal Airflow wiring:
pip install openlineage-airflow
export OPENLINEAGE_URL="https://marquez.yourco.internal:5000"
export OPENLINEAGE_NAMESPACE="ai-prod"
export OPENLINEAGE_API_KEY="…"

# dags/feature_build.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# OpenLineage picks up inlets/outlets automatically if you use providers that support it.
# Otherwise, annotate explicitly with TaskFlow API or custom lineage facets.
def build_features(**context):
    # Your feature code
    pass
dag = DAG(
dag_id="feature_build",
start_date=datetime(2024, 1, 1),
schedule_interval="0 * * * *",
catchup=False,
)
PythonOperator(task_id="build", python_callable=build_features, dag=dag)

Spark job lineage (no code change, just submit with the agent):
spark-submit \
--packages io.openlineage:openlineage-spark:1.12.0 \
--conf spark.openlineage.transport.type=http \
--conf spark.openlineage.transport.url=$OPENLINEAGE_URL \
your_job.py

Now you have a graph of datasets, jobs, and runs. The trick is to include custom facets with the IDs you care about: dataset_id, feature_view, corpus_sha, prompt_template, model_version.
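Custom facets don’t require the SDK at all: an OpenLineage event is plain JSON, and Marquez accepts it at `POST /api/v1/lineage`. A minimal sketch of building such an event — the `ai` facet namespace, producer URI, and job/dataset names are illustrative, and a spec-complete facet would also carry `_producer`/`_schemaURL` fields:

```python
import json
import uuid
from datetime import datetime, timezone

def build_lineage_event(job_name, inputs, outputs, ai_facets):
    """Build a raw OpenLineage RunEvent dict carrying custom AI facets."""
    return {
        "eventType": "COMPLETE",
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "producer": "https://github.com/yourco/lineage-emitter",  # hypothetical
        "run": {
            "runId": str(uuid.uuid4()),
            # Custom "ai" facet: the join keys we want queryable later.
            "facets": {"ai": ai_facets},
        },
        "job": {"namespace": "ai-prod", "name": job_name},
        "inputs": [{"namespace": "s3", "name": n} for n in inputs],
        "outputs": [{"namespace": "mlflow", "name": n} for n in outputs],
    }

event = build_lineage_event(
    "build_support_index",
    inputs=["s3://support/corpus/2024-10-01"],
    outputs=["support_index:v12"],
    ai_facets={"corpus_sha": "sha256:12ab...", "prompt_template": "support_prompt_v3.2"},
)
# Ship it: requests.post(f"{OPENLINEAGE_URL}/api/v1/lineage", json=event)
print(json.dumps(event)[:80])
```

Because the facet values are the same IDs you log in traces, one catalog query joins a bad answer back to the exact index build that produced it.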
Training: version every hop and emit lineage
If it isn’t versioned, it didn’t happen.
- Datasets: Use DVC or lakeFS to version raw and curated datasets. Emit the content-addressed hash as dataset_id.
- Data quality: Great Expectations checks before writes; fail closed.
- Features: Use Feast and snapshot the view per training run.
- Models: Track with MLflow model registry; log params, metrics, and lineage facets.
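DVC and lakeFS give you content addressing for free; if you’re not on either yet, the idea is small enough to sketch yourself. A minimal, assumption-laden version (hash file paths plus bytes in sorted order so the ID is deterministic):

```python
import hashlib
from pathlib import Path

def dataset_content_id(root: str) -> str:
    """Content-addressed dataset ID: hash every file's relative path and
    bytes in sorted order, so identical data always yields the same ID."""
    h = hashlib.sha256()
    for path in sorted(Path(root).rglob("*")):
        if path.is_file():
            h.update(path.relative_to(root).as_posix().encode())
            h.update(path.read_bytes())
    return f"{root}#sha256:{h.hexdigest()[:16]}"
```

Emit this as dataset_id on every training run. If a backfill rewrites even one file, the ID changes and the lineage graph shows exactly which runs consumed the new data.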
Example: saving a training run with lineage in MLflow and pushing to OpenLineage:
# training/run.py
import mlflow, os
from datetime import datetime, timezone
from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Run, Job, Dataset

mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI"))
run = mlflow.start_run(run_name="ctr_model_v42")
lineage = {
    "dataset_id": "s3://ads/raw/2024-10-01#sha256:6a2f…",
    "feature_view": "ctr_features:v7",
    "prompt_template": None,
    "model_version": None,
}
mlflow.log_params({"learning_rate": 0.002, "tree_depth": 6})
mlflow.log_dict(lineage, "lineage.json")
# After training
mlflow.sklearn.log_model(model, "model")  # model: the estimator trained above
model_version = "ctr_model@sha256:9f31…"
client = OpenLineageClient.from_environment()  # reads OPENLINEAGE_URL / OPENLINEAGE_API_KEY
client.emit(RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=run.info.run_id),
    job=Job(namespace="ai-prod", name="train_ctr"),
    inputs=[Dataset(namespace="s3", name=lineage["dataset_id"])],
    outputs=[Dataset(namespace="mlflow", name=model_version)],
    producer="git@github.com:yourco/ads@abc123",  # source repo + revision
))
mlflow.end_run()

Great Expectations as a gate:
# great_expectations/checkpoints/train.yml
name: train_quality_gate
validations:
  - batch_request:
      datasource_name: ads_curated
      data_asset_name: ctr_training
    expectation_suite_name: ctr_suite_v3
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: stop_pipeline_on_failure
    action:
      class_name: FailPipelineAction  # custom action: raise when validation fails

If the suite fails, the training job fails. You do not build new features or models on bad data. Sounds obvious. I’ve seen teams skip this and ship regressions because “it passed unit tests”.
Inference: propagate context, measure, and guard the hot path
This is where most teams faceplant. You need to carry lineage across the entire request. At minimum, propagate:
- request_id, user_id (or cohort), geo
- dataset_id or feature_snapshot_ts (for batch features)
- feature_view, model_version
- For RAG: retrieval_index or corpus_sha, doc_ids, prompt_template, temperature, max_tokens
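One low-ceremony way to propagate these across services is to carry them as request headers on every hop. A sketch with hypothetical header names — the only thing that matters is that every service agrees on them:

```python
# Hypothetical header names -- pick your own, but pick them once.
LINEAGE_HEADERS = ("x-request-id", "x-model-version", "x-prompt-template", "x-corpus-sha")

def extract_lineage(headers):
    """Pull lineage fields off an incoming request, defaulting to 'unknown'
    so a hop that dropped the context is visible in logs, not a KeyError."""
    return {h: headers.get(h, "unknown") for h in LINEAGE_HEADERS}

def inject_lineage(ctx, outbound_headers):
    """Copy lineage fields onto an outbound call so every downstream hop
    logs and traces the same IDs."""
    outbound_headers.update({h: ctx[h] for h in LINEAGE_HEADERS})
    return outbound_headers
```

OpenTelemetry baggage gives you the same effect with automatic propagation; the explicit version above is easier to grep for when you’re debugging at 3am.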
Instrument with OpenTelemetry and structured logs:
# inference/app.py
from fastapi import FastAPI, Request
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
import json, time

app = FastAPI()
FastAPIInstrumentor.instrument_app(app)
tracer = trace.get_tracer(__name__)

MODEL_VERSION = "gpt-4o-mini@2025-09-15"
PROMPT_V = "support_prompt_v3.2"
CORPUS_SHA = "sha256:12ab…"

@app.post("/answer")
async def answer(req: Request):
    body = await req.json()
    request_id = body.get("request_id")
    with tracer.start_as_current_span("inference") as span:
        span.set_attribute("model.version", MODEL_VERSION)
        span.set_attribute("prompt.template", PROMPT_V)
        span.set_attribute("retrieval.corpus_sha", CORPUS_SHA)
        span.set_attribute("user.cohort", body.get("cohort", "unknown"))
        t0 = time.time()
        docs = retrieve(body["query"])  # returns [(doc_id, score), ...]
        span.set_attribute("retrieval.doc_ids", ",".join(d[0] for d in docs[:5]))
        answer, conf = generate(body["query"], docs, PROMPT_V, MODEL_VERSION)
        latency = time.time() - t0
        span.set_attribute("inference.latency_ms", int(latency * 1000))
        log = {
            "request_id": request_id,
            "model_version": MODEL_VERSION,
            "prompt_template": PROMPT_V,
            "retrieval_corpus": CORPUS_SHA,
            "doc_ids": [d[0] for d in docs],
            "confidence": conf,
            "latency_ms": int(latency * 1000),
        }
        print(json.dumps(log))
        # Guardrail: abstain if low confidence
        if conf < 0.45:
            return {"answer": None, "fallback": "human_handoff", "request_id": request_id}
        return {"answer": answer, "request_id": request_id}

Add a simple output safety filter. I like to start with regex + policy and graduate to NeMo Guardrails or Llama Guard:
import re

PII_RE = re.compile(r"(\d{3}-\d{2}-\d{4}|\b\d{16}\b)")

def safe_output(txt: str) -> bool:
    return not bool(PII_RE.search(txt))

And don’t forget caching and circuit breaking. A noisy upstream embedding service will take you down if you don’t have an Envoy/Istio circuit breaker.
# istio/embedding-resilience.yaml
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: embedding-vs
spec:
  hosts: [embedding-svc]
  http:
    - route:
        - destination: { host: embedding-svc.default.svc.cluster.local }
      timeout: 2s
      retries: { attempts: 2, perTryTimeout: 500ms }
---
# Connection pooling and outlier ejection live on a DestinationRule, not the VirtualService.
apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: embedding-dr
spec:
  host: embedding-svc.default.svc.cluster.local
  trafficPolicy:
    connectionPool:
      http: { http1MaxPendingRequests: 50 }
    outlierDetection:
      consecutive5xxErrors: 5
      interval: 1s
      baseEjectionTime: 30s

Observability and guardrails: measure what you can roll back on
Define SLOs before the incident. Make them visible to the pager.
- Latency SLO: p95 < 500ms. Alert if burn rate > 2x over 10m.
- Answer quality proxy: confidence/grounding rate, refusal rate, post-filter hit rate.
- Retrieval health: docs per query, average recall from canary set.
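Before wiring the PromQL, it helps to see what the latency rule actually computes. A plain-Python sketch of the same logic — the 500ms SLO and 20 rps floor mirror the alert below; names like slo_burning are mine:

```python
import math

def p95(latencies_ms):
    """Nearest-rank p95 over a window of request latencies."""
    ordered = sorted(latencies_ms)
    rank = max(0, math.ceil(0.95 * len(ordered)) - 1)
    return float(ordered[rank])

def slo_burning(latencies_ms, slo_ms=500, min_rps=20, window_s=300):
    """Page only when p95 exceeds the SLO *and* traffic is above a minimum
    rate -- the rate guard keeps low-volume noise from waking anyone up."""
    rps = len(latencies_ms) / window_s
    return rps > min_rps and p95(latencies_ms) > slo_ms
```

The traffic guard is the part teams forget: a p95 computed over six requests during a quiet hour will page you for nothing.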
Prometheus rules:
# prometheus/alerts.yaml
groups:
  - name: ai-inference
    rules:
      - alert: InferenceLatencySLOBurn
        expr: |
          (
            histogram_quantile(0.95, sum(rate(http_server_request_duration_seconds_bucket{service="inference"}[5m])) by (le)) > 0.5
          )
          and (
            sum(rate(http_server_request_duration_seconds_count{service="inference"}[5m])) > 20
          )
        for: 10m
        labels: { severity: page }
        annotations:
          summary: p95 latency over SLO for 10m
      - alert: LowConfidenceSpike
        expr: sum(rate(ai_answer_confidence_bucket{le="0.5"}[10m])) / sum(rate(ai_answer_confidence_count[10m])) > 0.2
        for: 10m
        labels: { severity: page }
        annotations:
          summary: Low-confidence answers exceed 20%

Canary with Argo Rollouts using a metric threshold:
# rollouts/canary.yaml
apiVersion: argoproj.io/v1alpha1
kind: Rollout
metadata:
  name: inference-rollout
spec:
  strategy:
    canary:
      steps:
        - setWeight: 10
        - pause: { duration: 300 }
        - analysis:
            templates:
              - templateName: latency-check
            args:
              - name: slo
                value: "0.5"
      trafficRouting:
        istio: { virtualService: { name: inference-vs, routes: [primary] } }

Tie it back to lineage: your alerts should include model_version, corpus_sha, and prompt_template. If canary fails, you immediately know what to roll back.
Drift and hallucination: detect, attribute, and respond
Don’t wait for users to tweet your failures. Automate drift checks and ground outputs.
- Data drift: Schedule Evidently to compare production feature distributions vs training.
- Concept drift: Track label performance on golden sets (Yes, build golden sets. Even for LLMs—use human-rated answers.)
- Hallucination control: Log retrieval doc IDs and compute grounding rate (e.g., percentage of answers with citations that match the corpus).
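Grounding rate is cheap to compute from the structured inference logs: every log line already carries doc_ids, so you just check citations against the live corpus. A sketch — the "all cited docs must exist in the corpus" definition is one reasonable choice, not the only one:

```python
def grounding_rate(answers, corpus_doc_ids):
    """Share of answers whose cited doc_ids all exist in the current corpus.
    Each answer dict is assumed to carry the 'doc_ids' field from the
    structured inference log; uncited answers count as ungrounded."""
    if not answers:
        return 1.0
    grounded = sum(
        1 for a in answers
        if a["doc_ids"] and set(a["doc_ids"]) <= corpus_doc_ids
    )
    return grounded / len(answers)
```

Run it per corpus_sha: a grounding rate that drops right after an index rollout points at retrieval, not the model.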
Evidently sample job:
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
import pandas as pd
train = pd.read_parquet("s3://ads/curated/train_2024_10_01.parquet")
prod = pd.read_parquet("s3://ads/prod/features_rolling.parquet")
report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=train, current_data=prod)
summary = report.as_dict()
if summary["metrics"][0]["result"]["dataset_drift"]:
    raise SystemExit("Drift detected: gating deploy")

Gate merges in CI if drift exceeds threshold. Yes, your PM will complain “it slows us down.” What slows you down is a week of firefighting after silent drift.
For hallucinations, simple rule: if confidence < x or grounding_rate < y, abstain or fallback to a smaller, faster model with stricter prompts. Add a circuit breaker that routes to a static FAQ when the retriever is unhealthy.
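That routing rule is small enough to keep explicit in code rather than buried in config. A sketch with illustrative thresholds (tune the floors against your golden set; the path names are placeholders):

```python
def route_answer(confidence, grounding, retriever_healthy,
                 conf_floor=0.45, ground_floor=0.6):
    """Pick a serving path from the guardrail signals.
    Floors are illustrative defaults, not recommendations."""
    if not retriever_healthy:
        return "static_faq"            # circuit open: don't generate at all
    if confidence < conf_floor or grounding < ground_floor:
        return "fallback_small_model"  # cheaper model, stricter prompt
    return "primary_model"
```

Log the chosen route alongside model_version and corpus_sha, and your fallback rate becomes one more lineage-joined metric you can alert on.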
What ‘good’ looks like (and how we got there fast)
On a recent GitPlumbers engagement, we took a flailing LLM support assistant from “random vibes” to “predictable system” in three sprints:
- Stood up Marquez and tagged Airflow + Spark with OpenLineage.
- Versioned datasets with lakeFS, features with Feast, models with MLflow.
- Instrumented inference with OTel and added abstain + PII guards.
- Defined p95 latency and grounding SLOs; wired Prometheus/Alertmanager.
- Added Evidently drift checks and Argo Rollouts canary gates.
Results in 30 days:
- MTTR from 4h to 15m for inference incidents.
- p95 latency down 42% after killing a bloated index we could finally see.
- Hallucination complaints dropped 60% after adding abstain and doc citations.
- Cost per answer down 23% via dynamic fallback when confidence is low.
If you’re here, you already know: lineage isn’t a dashboard. It’s the control plane for your AI.
Key takeaways
- Pick a lineage spine (e.g., OpenLineage) and tag every hop—training and inference—with stable IDs.
- Version everything: datasets, features, prompts, embeddings, and model artifacts; push those versions into logs and traces.
- Wire OTel in your inference services and propagate correlation IDs so you can replay, debug, and rollback fast.
- Put safety guardrails on the hot path: content filters, confidence thresholds, circuit breakers, canaries, and cost/latency budgets.
- Automate drift and data quality checks (Evidently/Great Expectations) and gate deploys with them, not vibes.
- Make lineage queryable in a catalog (Marquez/DataHub) and link it to your SLOs and dashboards.
Implementation checklist
- Adopt `OpenLineage` and stand up `Marquez` or `DataHub`.
- Emit lineage from your orchestrator (`Airflow`/`Dagster`/`Prefect`) and from Spark/DBT jobs.
- Version datasets with `DVC` or `lakeFS`; version features with `Feast`; track models with `MLflow`.
- Instrument inference with `OpenTelemetry` and structured logs; include `dataset_id`, `feature_view`, `model_version`, `prompt_template`, `retrieval_corpus`.
- Add `Great Expectations` in training; add `Evidently` drift checks on a schedule and in CI.
- Enforce guardrails: redaction, policy checks (OPA/Rego), confidence thresholds with abstain, and circuit breakers in `Istio`.
- Define SLOs and alerts (Prometheus) for latency, error rate, and hallucination proxies; use `Argo Rollouts` for canaries.
Questions we hear from teams
- What’s the fastest path to lineage if we’re already in production?
- Stand up Marquez (or DataHub) in a day, wire OpenLineage to your orchestrator (Airflow/Dagster) and Spark, and add OTel + structured logs to the inference service. Start by emitting `model_version`, `prompt_template`, `corpus_sha`, and `dataset_id`. You’ll get immediate incident response benefits without boiling the ocean.
- How do we measure hallucinations objectively?
- Use proxies: confidence scoring, citation/grounding rate (answers backed by retrieved documents), refusal rate, and golden set evaluations. Tie these metrics to lineage—prompt version, corpus SHA, model version—so you can attribute changes and roll back the right thing.
- Is OpenLineage overkill for a small team?
- No. The protocol is lightweight and the Airflow/Spark integrations are low-friction. Even a single service benefits from consistent IDs and a queryable lineage store. You can start small and add custom facets as your pipeline grows.
- Where should guardrails live: app, gateway, or model?
- All three. Put PII redaction and policy checks in the app layer (cheap), circuit breakers and rate limits in the mesh/gateway, and model-specific constraints (e.g., NeMo Guardrails) at generation. Defense in depth prevents one bad hop from taking you out.
Ready to modernize your codebase?
Let GitPlumbers help you transform AI-generated chaos into clean, scalable applications.
