Stop Faking Real‑Time: The Data Pipeline That Closes the CFO’s Tab, Not Your Pager
Real-time that actually drives decisions: contracts, CDC, stream processing, and ops discipline that keep dashboards right and teams sane.
The 2 a.m. page you’ve lived through
You roll in hot to a war room: the CFO’s “live” revenue dashboard is off by 12%, Finance paused a promo, and Marketing’s blaming “the pipeline.” The culprit? A backfilled job labeled “near real-time,” a schema change that slipped through, and a Kafka consumer lagging into oblivion. I’ve seen this movie at startups and at banks. Real-time isn’t a logo on a slide; it’s a set of operational promises you can keep.
This is the playbook we use at GitPlumbers to build pipelines that the business trusts—and that don’t page you when a single upstream table sneezes.
Define real-time in business terms, not milliseconds
Before tools, write the contract with the business.
- Decision and tolerance: “Pricing uses inventory within 60 seconds; P95 absolute error < 1%.”
- SLOs: Freshness (P95 < 60s), correctness (P99 pass on schema + domain rules), availability (>= 99.9% for reads).
- Error budget policy: What happens when we burn it? Slow promotions? Kill non-critical consumers? This is where execs nod until you enforce it.
Track these as first-class metrics. If your Grafana board doesn’t show freshness and correctness next to revenue, you’re flying blind.
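Freshness is easy to claim and easy to measure badly. A minimal sketch of the freshness check, assuming paired event-time and ingest-time stamps in milliseconds; the function names and the 60-second target are illustrative, not a prescribed API:

```python
from statistics import quantiles

def p95_freshness_seconds(event_ts_ms, ingest_ts_ms):
    """P95 ingest lag in seconds, from paired event/ingest timestamps (ms)."""
    lags = [(i - e) / 1000.0 for e, i in zip(event_ts_ms, ingest_ts_ms)]
    # quantiles(n=20) yields 19 cut points; index 18 is the 95th percentile
    return quantiles(lags, n=20)[18]

def freshness_slo_met(event_ts_ms, ingest_ts_ms, target_s=60.0):
    """True when the P95 freshness SLO (default: 60s) holds for this sample."""
    return p95_freshness_seconds(event_ts_ms, ingest_ts_ms) <= target_s
```

Run it over a sliding sample of recent events and export the P95 as a gauge next to revenue, not in a separate "data team" dashboard.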
An architecture that won’t lie to you
You don’t need to boil the ocean. Use boring, proven pieces:
- Ingestion (CDC): Debezium streaming from OLTP (Postgres, MySQL, SQL Server) into Kafka with Schema Registry (Avro or Protobuf).
- Processing: Flink (SQL or DataStream) or ksqlDB for joins/windows, exactly-once with Kafka transactions.
- Quality gates: Contracts at the edge (+ DLQ), Great Expectations or in-stream validators.
- Storage: Lakehouse (Iceberg/Delta) in S3/GCS for history/backfill; a serving store (Pinot, ClickHouse, Druid, or Materialize) for sub-second queries.
- Orchestration & deploy: GitOps with ArgoCD/Flux, jobs via Argo Workflows/Airflow where needed.
- Observability: Prometheus/Grafana for lag and SLOs, OpenLineage (Marquez) for lineage, plus logs/traces via OpenTelemetry.
Don’t stream straight into Snowflake and call it real-time. It’s fine as a consumer, not as your serving layer for decisions with sub-second SLAs.
A Debezium source connector that’s saved us more than once:
{
"name": "inventory-postgres-cdc",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "inventory-db.internal",
"database.port": "5432",
"database.user": "debezium",
"database.password": "${file:/etc/creds/pg:password}",
"database.dbname": "inventory",
"schema.include.list": "public",
"table.include.list": "public.products,public.stock_levels",
"plugin.name": "pgoutput",
"slot.name": "debezium_inventory",
"publication.autocreate.mode": "filtered",
"tombstones.on.delete": "false",
"topic.prefix": "cdc.inventory",
"decimal.handling.mode": "double",
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.headers": "op,ts_ms",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "(.*)",
"transforms.route.replacement": "$1.v1",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://schema-registry:8081"
}
}
Pin versioned topics (.v1, .v2) to explicit schemas. No silent breaking changes.
Data quality is a product: contracts, tests, and DLQs
Stop hoping producers “do the right thing.” Enforce it.
- Data contracts: Producer-owned schemas in Schema Registry with compatibility set to BACKWARD. Treat fields like API surfaces.
- Validation at ingress: Reject events that fail schema or domain rules; route to DLQ with context.
- Downstream assertions: Great Expectations or streaming validators for invariants (e.g., qty >= 0).
Avro schema example for an inventory_update event:
{
"type": "record",
"name": "InventoryUpdate",
"namespace": "com.company.inventory",
"fields": [
{"name": "sku", "type": "string"},
{"name": "warehouse_id", "type": "string"},
{"name": "delta_qty", "type": "int"},
{"name": "source", "type": {"type": "enum", "name": "Source", "symbols": ["ERP", "WMS", "RETURNS", "MANUAL"]}},
{"name": "event_time", "type": {"type": "long", "logicalType": "timestamp-millis"}}
]
}
Great Expectations checkpoint for a batch-safe guardrail on the lake (run on micro-batches or snapshot tables):
# great_expectations/checkpoints/inventory.yaml
name: inventory_freshness_and_bounds
config_version: 1
class_name: Checkpoint
validations:
  - batch_request:
      datasource_name: lake_s3
      data_asset_name: s3://data-lake/iceberg/inventory_updates/*
    expectation_suite_name: inventory_suite
In-stream validation with Flink SQL and DLQ routing:
-- Filter obviously bad events; keep context for DLQ
CREATE TABLE inventory_updates (
sku STRING,
warehouse_id STRING,
delta_qty INT,
source STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (...);
CREATE TABLE inventory_updates_valid WITH (...);
CREATE TABLE inventory_updates_dlq WITH (...);
INSERT INTO inventory_updates_valid
SELECT * FROM inventory_updates
WHERE delta_qty IS NOT NULL AND delta_qty BETWEEN -100000 AND 100000;
INSERT INTO inventory_updates_dlq
SELECT *, 'delta_qty_out_of_bounds' AS reason FROM inventory_updates
WHERE delta_qty IS NULL OR delta_qty < -100000 OR delta_qty > 100000;
DLQs aren’t graveyards—give them owners, SLAs, and replay tooling.
Reliability: engineer for replay, backpressure, and SLOs
Pipelines are production systems. Treat them like it.
- Exactly-once-ish: Kafka transactions + Flink checkpoints. Make sinks idempotent (upserts by key, dedupe on primary keys).
- Backpressure: Bound concurrency. If one consumer lags, don’t drown the cluster. Autoscale conservatively.
- Replay: Immutable log + versioned lake tables (Iceberg/Delta) with time travel.
- Runbooks: “How to reprocess yesterday’s partition,” “How to scrub PII,” “How to rewind a topic safely.”
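"Exactly-once-ish" ultimately lands on the sink. A minimal sketch of an idempotent upsert sink that dedupes on (key, event id); the class name is illustrative, and in production the `applied` set would be a TTL'd dedupe window in the store itself, not process memory:

```python
class IdempotentUpsertSink:
    """Upsert-by-key sink that ignores replays of already-applied event ids."""

    def __init__(self):
        self.state = {}        # key -> latest value (the upsert table)
        self.applied = set()   # (key, event_id) pairs already written

    def write(self, key, event_id, value):
        if (key, event_id) in self.applied:
            return False       # replayed event: safe no-op, state untouched
        self.state[key] = value
        self.applied.add((key, event_id))
        return True
```

With this shape, rewinding a topic or re-running a Flink job from a checkpoint is boring: duplicates hit the no-op path instead of double-counting inventory.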
Flink config that’s saved our bacon:
# application.properties for Flink job (passed as -D or env)
state.backend: rocksdb
state.checkpoints.dir: s3://flink-checkpoints/prod/
execution.checkpointing.interval: 30000
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.unaligned: true
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 10 s
Topic configs to keep Kafka from turning into a thundering herd during outages:
kafka-configs.sh --bootstrap-server $BS --alter --entity-type topics --entity-name cdc.inventory.v1 \
  --add-config min.insync.replicas=2,retention.ms=604800000,segment.ms=3600000
Prometheus alert when consumer lag threatens your freshness SLO:
# alerting-rules.yaml
- alert: RealTimeFreshnessAtRisk
  expr: sum(kafka_consumer_group_lag{group="inventory-aggregator"}) > 200000
  for: 5m
  labels:
    severity: page
  annotations:
    summary: "Consumer lag threatens freshness SLO"
    description: "inventory-aggregator lag > 200k for 5m; risk to P95 < 60s freshness."
If you can’t tie an alert to an SLO, it should be a dashboard, not a page.
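A raw lag threshold like the 200k above is a proxy; what the SLO actually cares about is drain time. A hedged sketch of that conversion, assuming you can observe consume and produce rates (the function name and thresholds are illustrative):

```python
def freshness_at_risk(lag_msgs, consume_rate_msgs_per_s,
                      produce_rate_msgs_per_s, slo_s=60.0):
    """Estimate seconds to drain the backlog; flag only when that threatens the SLO."""
    net = consume_rate_msgs_per_s - produce_rate_msgs_per_s
    if net <= 0:
        return True  # lag is growing: freshness will be violated eventually
    return lag_msgs / net > slo_s
```

The same arithmetic can live in the PromQL expression itself (lag divided by the rate delta), which makes the page threshold track traffic instead of a hardcoded message count.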
From events to decisions: serving layers that answer fast
The business doesn’t query Kafka. They ask questions.
- Operational analytics:
PinotorClickHousefor sub-second aggregates with upserts and late-arrival handling. - Exploratory: Land curated streams in
Iceberg/Delta->Snowflake/BigQueryfor ad hoc. - Materialized decisions: Precompute to a table the app or exec dashboard reads.
Example: near real-time inventory position per SKU/warehouse with 30s latency.
-- Flink SQL to maintain a running inventory position per SKU/warehouse
CREATE TABLE stock_levels (
sku STRING,
warehouse_id STRING,
on_hand INT,
PRIMARY KEY (sku, warehouse_id) NOT ENFORCED
) WITH (...);
CREATE TABLE inventory_updates_valid (...) WITH (...);
CREATE TABLE inventory_position_sink (
sku STRING,
warehouse_id STRING,
position BIGINT,
window_end TIMESTAMP(3),
PRIMARY KEY (sku, warehouse_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'inventory.position.v1',
'key.format' = 'json',
'value.format' = 'json'
);
INSERT INTO inventory_position_sink
SELECT u.sku,
       u.warehouse_id,
       CAST(COALESCE(MAX(s.on_hand), 0) + SUM(u.delta_qty) AS BIGINT) AS position,
       MAX(u.event_time) AS window_end  -- latest contributing event time
FROM inventory_updates_valid u
LEFT JOIN stock_levels s
  ON u.sku = s.sku AND u.warehouse_id = s.warehouse_id
GROUP BY u.sku, u.warehouse_id;
Serve inventory.position.v1 into Pinot for sub-second queries to the pricing service. We’ve seen this cut stockout mis-pricing by 20% in two weeks.
A 90-day path to “good enough real-time”
You don’t need a platform committee. You need a win.
- Weeks 1–2: Define SLOs and the decision. Pick one KPI (e.g., “pricing accuracy depends on inventory fresh < 60s”).
- Weeks 2–4: Stand up MSK/Confluent or use managed Kafka; deploy Debezium for 1–2 source tables. Register schemas and lock compatibility.
- Weeks 4–6: Implement contracts and DLQ. Build a simple Flink SQL job to join and aggregate. Land to a serving store (Pinot/ClickHouse) and the lake.
- Weeks 6–8: Instrument lag/freshness. Add runbooks. Canary the consumer group and roll out behind a feature flag.
- Weeks 8–10: Backfill via lake to sync historical state. Validate with finance partners. Publish the decision API.
- Weeks 10–12: Harden (chaos tests, failover drills), document, then expand sources gradually.
Terraforming an MSK cluster the boring way:
resource "aws_msk_cluster" "rt_msk" {
cluster_name = "rt-core"
kafka_version = "3.6.0"
number_of_broker_nodes = 3
broker_node_group_info {
instance_type = "kafka.m5.large"
client_subnets = var.private_subnets
security_groups = [aws_security_group.msk.id]
storage_info { ebs_storage_info { volume_size = 1000 } }
}
configuration_info {
arn = aws_msk_configuration.default.arn
revision = aws_msk_configuration.default.latest_revision
}
}
Keep IaC in Git, deploy with ArgoCD, and version your Flink jobs like any other service.
What I’d do differently (and traps I still see)
- Fan-out storms: Ten teams subscribe to a raw topic and apply their own “business rules.” Centralize contracts and publish a curated stream.
- Join explosions: Stream-join batch tables without TTL and you’ll DOS yourself. Use keyed state with eviction, or precompute reference dims.
- Schema roulette: “Optional” fields that are actually required. Set compatibility and fail fast at the registry.
- PII leakage: Mask at the edge. Keep PII in separate topics/namespaces with strict ACLs and purpose-based access.
- Infinite replays: Protect sinks with idempotency keys and dedupe windows; throttle reprocessing jobs.
- Snowflake-as-stream: It’s a consumer. Great for modeling; not your serving layer for sub-second decisions.
- Observability theater: If your alerts don’t map to SLOs, you’re collecting graphs, not running a service.
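For the "infinite replays" trap, throttling a reprocessing job is mostly a token bucket. A minimal sketch (class and parameter names are illustrative, not a specific library's API):

```python
import time

class ReplayThrottle:
    """Token bucket that caps replay throughput so backfills don't starve live traffic."""

    def __init__(self, rate_per_s, burst):
        self.rate = rate_per_s       # tokens refilled per second
        self.capacity = burst        # maximum burst size
        self.tokens = float(burst)
        self.last = time.monotonic()

    def try_acquire(self, n=1):
        now = time.monotonic()
        # refill proportionally to elapsed time, capped at the burst size
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False
```

Wrap the replay consumer's poll loop in `try_acquire` and the backfill degrades to a steady trickle instead of a cluster-wide DOS.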
When we applied this playbook at a retail client: P95 freshness dropped from ~12 min to 35s, pricing errors fell 18%, and on-calls went from 11/month to 2/month—without doubling infra spend. That’s the bar.
Key takeaways
- Define “real-time” with business SLOs (P95 freshness, correctness, availability) before buying tools.
- Use CDC + schemas + contracts to stop bad data at the door; make validation and DLQs first-class.
- Stream processing (Flink/ksqlDB) + a serving store (Pinot/ClickHouse) beats duct-taped batch for decisions.
- Engineer for replay, idempotency, and backpressure; measure lag and error budgets like production services.
- Deliver one business-critical decision first, then scale—90 days is enough if you cut scope and do the boring ops work.
Implementation checklist
- Write explicit data SLOs: freshness, correctness, and availability targets tied to decisions.
- Stand up CDC from source-of-truth DBs with `Debezium` and register schemas (Avro/Protobuf).
- Enforce data contracts at the edge; route failures to DLQ with context and replay paths.
- Implement windowed aggregations in `Flink SQL` and publish to a queryable store (Pinot/ClickHouse).
- Version data in a lake (Iceberg/Delta) for backfill/replay; keep lineage with OpenLineage.
- Instrument lag, error rate, and checkpoint health in Prometheus; page only on SLO threats.
- Ship with GitOps (ArgoCD); canary consumer groups and keep rollback scripts ready.
- Prove value with one KPI (e.g., inventory accuracy or fraud latency) before broad rollout.
Questions we hear from teams
- How “real-time” is realistic without blowing budget?
- For most decisions, P95 freshness under 60 seconds with >99.9% availability is enough. CDC + Kafka + modest Flink clusters + a right-sized serving store (Pinot/ClickHouse) will get you there without 7-figure spend. Reserve sub-10s for true edge cases (fraud, trading).
- Can we do this on Snowflake/BigQuery only?
- They’re excellent consumers and lakes but not serving layers for sub-second SLAs. Use them for modeling and historical analytics. For decisions with tight latency, keep a streaming path into a fast OLAP store or materialized view engine and mirror curated data to Snowflake/BigQuery.
- Flink vs. ksqlDB vs. Spark Structured Streaming?
- If you need heavy joins, exactly-once sinks, and scale, Flink is the safe bet. ksqlDB is simpler for Kafka-first shops. Spark is fine for micro-batch and unified batch/stream—but its latency floor is higher. Pick the tool your team can operate at 2 a.m.
- What about schema evolution without outages?
- Use Schema Registry with compatibility (BACKWARD or FULL). Version topics for breaking changes (`topic.v2`) and dual-publish during migration. Consumers can roll forward on their schedules.
- How do we keep PII out of the wrong places?
- Define data classes, segment topics by sensitivity, and enforce masking at ingestion. Use field-level encryption where needed. Keep PII out of DLQs by hashing or dropping fields before routing.
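Masking at the edge can be as simple as salted hashing of classified fields before an event is routed anywhere, DLQ included. A sketch with hypothetical field names; a real deployment would use a managed key or field-level encryption rather than a static salt:

```python
import hashlib

def mask_pii(event, pii_fields, salt):
    """Return a copy of the event with classified fields replaced by salted SHA-256 hashes."""
    out = dict(event)
    for f in pii_fields:
        if f in out and out[f] is not None:
            # deterministic hash: joins on the masked value still work downstream
            out[f] = hashlib.sha256((salt + str(out[f])).encode()).hexdigest()
    return out
```

Deterministic hashing keeps masked fields joinable across topics, which is usually enough for debugging a DLQ record without ever exposing the raw value.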
Ready to modernize your codebase?
Let GitPlumbers help you transform AI-generated chaos into clean, scalable applications.
