Your AI Pipeline Is Just ETL With Extra Steps (And That's Fine)


AI data pipelines aren't some new paradigm. They're ETL with a retrieval layer bolted on. The discipline that makes them work is the same discipline that has always made pipelines work: detect change, chunk intelligently, keep indexes fresh.

Quick take

Stop overcomplicating AI pipelines. They’re ETL plus retrieval ops. Diff your inputs, chunk by structure (not token count), upsert with stable IDs, and treat reindexing as a deliberate, versioned event. Skip the diffing step and retrieval drifts into garbage. I’ve seen it happen three times this year alone.


I’ve been building data pipelines since before anyone called them “data pipelines.” At a fintech startup, we were ingesting financial news from hundreds of sources, normalizing it, and serving it for real-time retrieval. That was 2017. The core problems haven’t changed.

What has changed is that your pipeline now has a second consumer: a retrieval system feeding an LLM. If you treat that consumer as an afterthought, your AI product will deliver confidently wrong answers. Ruthless focus on the basics separates pipelines that work from pipelines that demo well.

The Shape of an AI Pipeline

Every AI pipeline I’ve seen in production boils down to six stages. Here’s the skeleton:

pipeline:
  stages:
    - name: extract
      # Pull from sources, normalize formats
      # PDF, HTML, API responses -> clean markdown or structured text

    - name: diff
      # Hash-based change detection
      # This is the stage most teams skip. Don't.

    - name: chunk
      # Split by document structure first, token count second
      # Preserve section boundaries and headings

    - name: embed
      # Generate vectors using a pinned model version
      # Log the model version. You will need it later.

    - name: index
      # Upsert with stable IDs and rich metadata
      # source_id + chunk_position = deterministic ID

    - name: verify
      # Check for missing chunks, stale entries, orphans
      # Alert on drift from expected source freshness

Nothing exotic. The magic is in the discipline of each stage, not in clever architecture.
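
The verify stage is the one people ask about most, so here is a minimal sketch of one check it might run. It assumes the pipeline can list the chunk IDs the current sources should produce and the IDs actually in the index. `VerifyReport` and `verify_index` are hypothetical names for illustration, not part of any library.

```python
from dataclasses import dataclass


@dataclass
class VerifyReport:
    missing: set[str]   # expected from the sources but absent from the index
    orphaned: set[str]  # present in the index but no longer backed by a source

    @property
    def ok(self) -> bool:
        return not self.missing and not self.orphaned


def verify_index(expected_ids: set[str], indexed_ids: set[str]) -> VerifyReport:
    """Compare the chunk IDs the sources should produce with what the index holds."""
    return VerifyReport(
        missing=expected_ids - indexed_ids,
        orphaned=indexed_ids - expected_ids,
    )


report = verify_index(
    expected_ids={"doc-1:0", "doc-1:1", "doc-2:0"},
    indexed_ids={"doc-1:0", "doc-2:0", "doc-9:0"},
)
print(report.missing)   # {'doc-1:1'}
print(report.orphaned)  # {'doc-9:0'}
```

Two set differences are enough to catch both failure directions. A real verify stage would likely add freshness checks on top.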

The Diff Step Is Everything

Most teams skip change detection and reprocess everything on every run. At small scale, this is fine. At production scale, it’s expensive and noisy, and it makes debugging a nightmare.

A simple content-hash approach works well:

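import "crypto/sha256"

// HashStore persists the last-seen content hash per source.
// (Assumed shape; back it with whatever store you already run.)
type HashStore interface {
    Get(sourceID string) ([sha256.Size]byte, bool)
    Set(sourceID string, hash [sha256.Size]byte)
}

// hasChanged reports whether content differs from the hash last stored
// for sourceID, and records the new hash when it does.
func hasChanged(sourceID string, content []byte, store HashStore) bool {
    newHash := sha256.Sum256(content)
    existing, found := store.Get(sourceID)
    if found && existing == newHash {
        return false
    }
    // New source or changed content: remember the hash for the next run.
    store.Set(sourceID, newHash)
    return true
}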

When I built the ingestion pipeline at the fintech startup, adding a diff layer cut downstream processing costs by roughly 60%. Most sources don’t change on most runs. Detecting that early saves everything downstream.

The diff step also gives you auditability. You can answer “what changed and when” instead of shrugging at a vector store that silently drifted.
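
To make the auditability point concrete, here is a Python sketch of the same content-hash check that also records what changed and when. `DiffTracker` is a hypothetical in-memory stand-in; a real pipeline would persist both the hashes and the log.

```python
import hashlib
from datetime import datetime, timezone


class DiffTracker:
    """In-memory hash store plus change log (illustrative only)."""

    def __init__(self) -> None:
        self.hashes: dict[str, str] = {}
        self.audit_log: list[dict] = []

    def check(self, source_id: str, content: bytes) -> bool:
        """Return True if content is new or changed, and log the event."""
        new_hash = hashlib.sha256(content).hexdigest()
        old_hash = self.hashes.get(source_id)
        if old_hash == new_hash:
            return False
        self.hashes[source_id] = new_hash
        self.audit_log.append({
            "source_id": source_id,
            "event": "created" if old_hash is None else "updated",
            "old_hash": old_hash,
            "new_hash": new_hash,
            "detected_at": datetime.now(timezone.utc).isoformat(),
        })
        return True


tracker = DiffTracker()
first = tracker.check("feed-a", b"rates unchanged")   # True: never seen before
second = tracker.check("feed-a", b"rates unchanged")  # False: identical content
third = tracker.check("feed-a", b"rates raised")      # True: content changed
```

After these three calls the log holds two entries, one `created` and one `updated`, which is the trail you want when someone asks why retrieval shifted.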

Chunking: Structure Before Size

This is where most RAG pipelines go wrong. Teams reach for a token-count splitter because it’s the default in every tutorial, then wonder why retrieval returns fragments of ideas instead of coherent answers.

Split by document structure first. Headings, sections, code blocks, and list items are natural semantic boundaries. Only fall back to token-count splitting when a single section exceeds your maximum chunk size.

# Document, Chunk, sliding_window, and MAX_CHUNK_TOKENS are assumed to be
# defined elsewhere in the pipeline.
def chunk_by_structure(doc: Document) -> list[Chunk]:
    chunks = []
    for section in doc.sections:
        # Metadata shared by every chunk cut from this section
        base_metadata = {
            "source_id": doc.id,
            "section_heading": section.heading,
            "position": section.index,
            "doc_version": doc.version,
        }
        if section.token_count <= MAX_CHUNK_TOKENS:
            chunks.append(Chunk(
                content=section.text,
                metadata=base_metadata,
                # Deterministic ID: no duplicates on re-ingestion
                id=f"{doc.id}:{section.index}",
            ))
        else:
            # Fall back to sliding window only for oversized sections
            for sub in sliding_window(section, MAX_CHUNK_TOKENS, overlap=100):
                chunks.append(Chunk(
                    content=sub.text,
                    # Sub-chunks keep the section metadata plus their own position
                    metadata={**base_metadata, "sub_position": sub.index},
                    id=f"{doc.id}:{section.index}:{sub.index}",
                ))
    return chunks

Two things matter here. First, the id is deterministic, derived from source and position, not random. This means re-ingesting the same content produces upserts, not duplicates. Second, metadata travels with every chunk. When retrieval returns a chunk, you know exactly where it came from, which version, and which section.

I can’t overstate how many production RAG systems I’ve reviewed where chunks had no stable ID. Every reindex created duplicates. Users got the same passage three times in their context window, and the model hallucinated a consensus that didn’t exist.
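
Here is a small sketch of why that happens. A plain dict stands in for the vector store, and `upsert` is a hypothetical helper, not any real client API. The same two-section document is ingested twice, once with deterministic IDs and once with random ones.

```python
import uuid


def upsert(index: dict[str, str], chunk_id: str, content: str) -> None:
    """Insert or overwrite by ID; a dict stands in for the vector store here."""
    index[chunk_id] = content


sections = ["Intro text", "Auth text"]

# Deterministic IDs: two ingestion runs of the same document
stable_index: dict[str, str] = {}
for _run in range(2):
    for position, text in enumerate(sections):
        upsert(stable_index, f"doc-1:{position}", text)

# Random IDs: the same two runs
random_index: dict[str, str] = {}
for _run in range(2):
    for text in sections:
        upsert(random_index, str(uuid.uuid4()), text)

print(len(stable_index))  # 2: the second run overwrote the first
print(len(random_index))  # 4: every chunk is now duplicated
```

One caveat with position-based IDs: if a document loses sections, the highest-numbered IDs from the old version linger in the index unless something purges them.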

Freshness Is an Operational Problem

Your pipeline isn’t done when it runs once. Sources change, APIs update, and documents get deleted. If your index doesn’t reflect reality, your AI lies with confidence.

Three rules I enforce on every pipeline:

  1. Reindex on embedding model changes. If you swap or upgrade your embedding model, every existing vector is stale. This is a full reindex event. No exceptions. Pin your model version and log it.

  2. Purge on source deletion. If a document disappears from the source, its chunks must disappear from the index. Orphaned chunks are a retrieval poison pill.

  3. Alert on freshness drift. Every source has an expected update cadence. If your financial news feed hasn’t updated in 6 hours, something is wrong. Don’t wait for a user to notice.

freshness_policy:
  sources:
    - name: product_docs
      expected_interval: 24h
      alert_after: 36h
    - name: api_changelog
      expected_interval: 7d
      alert_after: 10d
    - name: support_kb
      expected_interval: 48h
      alert_after: 72h
  on_embedding_change: full_reindex
  on_source_delete: purge_chunks
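
A rough sketch of how the `alert_after` thresholds in the policy above might be evaluated. `parse_duration` and `stale_sources` are hypothetical helpers, the duration parser handles only the `h` and `d` suffixes used in the config, and the timestamps are made up for the example.

```python
from datetime import datetime, timedelta, timezone

UNITS = {"h": "hours", "d": "days"}


def parse_duration(value: str) -> timedelta:
    """Parse strings like '36h' or '10d' (only the units used in the policy)."""
    amount, unit = int(value[:-1]), value[-1]
    return timedelta(**{UNITS[unit]: amount})


def stale_sources(
    policy: list[dict], last_updated: dict[str, datetime], now: datetime
) -> list[str]:
    """Return names of sources whose last update is older than alert_after."""
    stale = []
    for source in policy:
        seen = last_updated.get(source["name"])
        # A source that has never been seen is treated as stale
        if seen is None or now - seen > parse_duration(source["alert_after"]):
            stale.append(source["name"])
    return stale


policy = [
    {"name": "product_docs", "alert_after": "36h"},
    {"name": "api_changelog", "alert_after": "10d"},
]
now = datetime(2024, 1, 10, 12, 0, tzinfo=timezone.utc)
last_updated = {
    "product_docs": now - timedelta(hours=40),
    "api_changelog": now - timedelta(days=3),
}
alerts = stale_sources(policy, last_updated, now)
print(alerts)  # ['product_docs']
```

Passing `now` in explicitly keeps the check deterministic and easy to test. In production the caller would supply the current UTC time.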

The Mistakes I Keep Seeing

After building AI infrastructure across telecom and fintech, I’ve found the failure patterns remarkably consistent:

No stable IDs. Updates create duplicates. Retrieval returns the same content multiple times. The model treats repetition as emphasis and doubles down on whatever it found.

Token-count-only chunking. A paragraph about authentication gets split mid-sentence. The first half lands in one chunk, the second half in another. Retrieval finds the first half. The model confidently gives half an answer.

Ad-hoc reindexing. Someone runs a reindex on a Friday afternoon. Nobody knows what changed. Retrieval quality shifts. The team argues about whether it got better or worse. No one can prove it either way because there’s no baseline.
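
One lightweight way to make reindexing deliberate is to record each event before it runs. The sketch below assumes an append-only history kept alongside the index. `ReindexEvent`, `plan_reindex`, and `embedding_changed` are hypothetical names, and the model names are placeholders.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class ReindexEvent:
    index_version: int
    embedding_model: str
    reason: str
    started_at: str


def plan_reindex(
    history: list[ReindexEvent], embedding_model: str, reason: str
) -> ReindexEvent:
    """Append a new versioned reindex event to the history and return it."""
    event = ReindexEvent(
        index_version=len(history) + 1,
        embedding_model=embedding_model,
        reason=reason,
        started_at=datetime.now(timezone.utc).isoformat(),
    )
    history.append(event)
    return event


def embedding_changed(history: list[ReindexEvent], embedding_model: str) -> bool:
    """True when the pinned model differs from the one used for the latest index."""
    return not history or history[-1].embedding_model != embedding_model


history: list[ReindexEvent] = []
plan_reindex(history, "embed-model-v1", reason="initial build")
must_reindex = embedding_changed(history, "embed-model-v2")  # True: model was swapped
```

Storing retrieval evaluation results against each `index_version` would supply the baseline that the Friday-afternoon argument lacks.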

Missing permission metadata. The chunks are indexed without access control data. A user with restricted access asks a question and gets an answer sourced from documents they shouldn’t see. This is a compliance incident waiting to happen.
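
A minimal sketch of the fix, assuming each chunk carries an `allowed_groups` metadata field (a hypothetical field name) and that filtering happens before any chunk reaches the model. Many vector stores can apply a filter like this at query time, but the logic is the same.

```python
def filter_by_access(chunks: list[dict], user_groups: set[str]) -> list[dict]:
    """Keep only chunks whose allowed_groups overlap the user's groups.

    Chunks with no allowed_groups metadata are dropped (fail closed).
    """
    visible = []
    for chunk in chunks:
        allowed = set(chunk.get("metadata", {}).get("allowed_groups", []))
        if allowed & user_groups:
            visible.append(chunk)
    return visible


retrieved = [
    {"id": "hr-policy:0", "metadata": {"allowed_groups": ["hr"]}},
    {"id": "handbook:2", "metadata": {"allowed_groups": ["hr", "staff"]}},
    {"id": "legacy:5", "metadata": {}},
]
visible = filter_by_access(retrieved, user_groups={"staff"})
print([chunk["id"] for chunk in visible])  # ['handbook:2']
```

Failing closed matters here: a chunk indexed without permission data is hidden from everyone rather than shown to everyone.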

What matters

AI pipelines are pipelines. The retrieval layer adds real complexity, but the solution isn’t a new paradigm. It’s the same discipline that has always worked: detect change early, preserve meaning when you split, keep identifiers stable, and make freshness an operational concern with clear owners and alerts.

Same fundamentals, new surface area. That’s the whole story.