I wrote this while learning how retrieval systems fail in practice on my own home-lab setup.
The first version of my ingestion pipeline looked fine in local tests, but it quietly failed halfway through on larger runs. That left my knowledge index in a mixed state: some files were current, others were stale, and answers became inconsistent. I did not catch it quickly because I had no checkpoints, weak logging, and no reliable run summary. On constrained local hardware, those silent failures were expensive because every rerun consumed hours.
This post is my learning log for fixing that. Not a perfect system, just a progression from fragile scripts to a pipeline I could actually trust.
What I Was Trying to Build
Across this series, the goal is one system: a technical documentation repository that AI coding agents can query reliably when they need syntax, framework conventions, and implementation patterns.
I broke that into three parts:
- Build a trustworthy ingestion and indexing pipeline.
- Build a retrieval layer that returns the right chunks, not just semantically similar ones.
- Build an evaluation loop that measures faithfulness and catches regressions.
This post is Part 1: the ingestion/data foundation.
The State of Affairs (The Bad Old Days)
My file ingestion “infrastructure” at the time was, charitably, a collection of Python scripts loosely organized by document type. Source code had one script. Markdown documentation had another. JSON configs and CSV reports had a third. Each had its own way of reading, cleaning, and chunking content. When I found a bug in how I handled Unicode in file paths, I fixed it in two of the three scripts and left the third broken for weeks.
The shared data layer was the real problem. Every script reached directly into Azure Blob Storage, did its own thing, and wrote chunks to a second container. If a script died mid-run, it just stopped. No checkpoint. No retry. No way to resume from where it left off — I had to re-run the whole thing from scratch and hope.
Here’s what that world looked like:
flowchart TD
A[Files in Azure Blob Storage] --> B[Script A: ingest_code.py]
A --> C[Script B: ingest_docs.py]
A --> D[Script C: ingest_configs_v3_FINAL.py]
B --> E[embed_and_store_v1.py]
C --> F[embed_maybe.py]
D --> G[embed_new.py]
E --> H[(Vector Index — ???)]
F --> H
G --> H
H --> I[agent_serve_i_think.py]
style A fill:#374151,stroke:#6b7280,color:#f9fafb
style H fill:#7f1d1d,stroke:#ef4444,color:#f9fafb
style I fill:#7f1d1d,stroke:#ef4444,color:#f9fafb
Every box in that diagram was a point of failure. Every arrow was an implicit contract nobody had written down. When something broke at 2am, debugging meant SSH-ing into a VM and grepping log files.
I knew this was unsustainable. I just did not yet have a clean pattern for fixing it.
The First Principle: File Pipelines Are Not Just Ingestion Scripts
My first attempt at improvement went wrong by treating the file pipeline as just “a smarter script.” I moved from Python scripts to Azure Logic Apps for orchestration, felt good for a week, and then realized I had added overhead without solving the real problem: there was no shared processing layer, no consistent chunking strategy, and no way to reproduce a specific knowledge-base snapshot.
Here’s what makes a knowledge pipeline for LLM-driven systems different from a typical ETL job: you need to go backwards. At any point, you should be able to answer “what exact file versions were indexed when the system gave that answer on Tuesday?” That’s not a nice-to-have — when the agent makes a mistake and a user files a ticket, it’s the first question you need to answer.
I ended up with three non-negotiables:
- Reproducibility: A pipeline run with the same config produces the same index, period.
- Observability: Every processing stage emits metrics. You can tell at a glance that chunk quality is healthy.
- Isolation: File processing logic lives in one place, not scattered across ingestion scripts.
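The “go backwards” requirement is concrete enough to sketch. Below is a minimal snapshot manifest, using local files as a stand-in for Azure blobs; `build_manifest` and the JSON layout are my own illustrative choices, not the production pipeline:

```python
import hashlib
import json
from pathlib import Path

def build_manifest(root: str, snapshot_id: str) -> dict:
    """Record the exact content hash of every file in a snapshot.

    Hypothetical helper: the real pipeline lists blobs in Azure Blob
    Storage, but the idea is the same -- a manifest that can answer
    'what exact file versions were indexed?' months later.
    """
    entries = {}
    for path in sorted(Path(root).rglob("*")):
        if path.is_file():
            digest = hashlib.sha256(path.read_bytes()).hexdigest()
            entries[str(path.relative_to(root))] = digest
    return {"snapshot_id": snapshot_id, "files": entries}

def write_manifest(manifest: dict, out_path: str) -> None:
    # Sorted keys keep the manifest diff-friendly across runs
    Path(out_path).write_text(json.dumps(manifest, indent=2, sort_keys=True))
```

Store the manifest next to the index it describes, and “what was indexed on Tuesday?” becomes a file read instead of a forensic exercise.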
Why I Chose These Tools
Before the implementation details, here is the plain-language reason for each major dependency.
- Ray: I needed Python-native parallelism for file processing and embedding jobs without rewriting everything into a new framework.
- Azure Blob Storage: It was already my source of truth for files, so keeping snapshots there reduced moving parts.
- Azure AI Search: It gave me both vector and keyword search in one managed index.
- MLflow: I needed run-level metadata and artifacts so I could compare pipeline runs and debug regressions.
None of these are mandatory. If your corpus is small, a single-process pipeline plus SQLite logs is enough to start.
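For that small-corpus starting point, “SQLite logs” can be one table keyed by snapshot. A sketch, with table and column names of my own choosing:

```python
import sqlite3

def log_run(db_path: str, snapshot_id: str, total_chunks: int, errors: int) -> None:
    """Minimal run log for a single-process pipeline.

    Illustrative schema, not the post's production setup: one row per
    ingestion run, keyed by snapshot id so reruns overwrite cleanly.
    """
    con = sqlite3.connect(db_path)
    con.execute(
        """CREATE TABLE IF NOT EXISTS runs (
               snapshot_id       TEXT PRIMARY KEY,
               total_chunks      INTEGER,
               validation_errors INTEGER,
               finished_at       TEXT DEFAULT CURRENT_TIMESTAMP)"""
    )
    con.execute(
        "INSERT OR REPLACE INTO runs (snapshot_id, total_chunks, validation_errors) "
        "VALUES (?, ?, ?)",
        (snapshot_id, total_chunks, errors),
    )
    con.commit()
    con.close()
```

When a run count in this table looks wrong, you know within seconds, which is most of what the heavier MLflow setup buys you later.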
Enter Ray: Distributed Processing Without the Ceremony
I evaluated a few options — Azure Data Factory, Dask, and Ray. Data Factory was fine for simple copy jobs but clunky for the Python-heavy transformation I needed. Dask was appealing but its in-memory model made large file batches tricky on my setup. Ray felt like it was built for exactly this.
The thing that sold me was how little ceremony it requires. You take an existing Python function and run it across a cluster by adding a decorator:
import ray
from azure.storage.blob import BlobServiceClient

ray.init()

@ray.remote
def process_file_chunk(blob_path: str) -> dict:
    # AZURE_CONN_STR, semantic_chunk, and embed_batch are defined elsewhere
    client = BlobServiceClient.from_connection_string(AZURE_CONN_STR)
    blob = client.get_blob_client(container="project-files", blob=blob_path)
    raw = blob.download_blob().readall().decode("utf-8", errors="replace")
    chunks = semantic_chunk(raw)
    embeddings = embed_batch(chunks)
    return {"source": blob_path, "chunks": chunks, "embeddings": embeddings}

# Fan out across 500 files in parallel
blob_paths = list_blobs(AZURE_CONN_STR, container="project-files")
futures = [process_file_chunk.remote(p) for p in blob_paths]
results = ray.get(futures)
That’s it. No JVM, no YARN, no Spark context. Just Python functions that scale across machines.
But the real power came when I started using Ray Data for the full ingestion pipeline. Ray Data gives you a distributed dataset abstraction that handles streaming from Azure Blob Storage, batching for embedding calls, and memory management — without requiring you to think about the distributed execution yourself.
import ray.data as rd

# Build the pipeline declaratively
dataset = (
    rd.read_binary_files("az://project-files/docs/**")
    .map(decode_and_parse)                        # text extraction
    .map_batches(semantic_chunk, batch_size=64)   # structure-aware chunking
    .filter(lambda row: len(row["text"]) > 50)    # discard micro-fragments
    .map_batches(embed_chunks, batch_size=256,    # Azure OpenAI embeddings
                 num_cpus=2)
    .random_shuffle(seed=42)
)

# Ray handles parallelism, streaming, and memory — you just iterate
train_ds, val_ds = dataset.train_test_split(test_size=0.1)
The lazy evaluation here is key. Ray doesn’t read from Azure Blob Storage until you iterate over the dataset. Define your full pipeline, validate on a small sample, then scale up — no code changes.
Minified Production Architecture
This is the reduced version I actually reason with day to day:
flowchart TD
A[Blob files] --> B[Ray processing]
B --> C[Versioned chunks]
C --> D[Embedding workers]
D --> E[Azure AI Search index]
D --> F[MLflow run logs]
E --> G[LLM retrieval tool]
F --> H[Debug + compare runs]
style A fill:#1e3a5f,stroke:#3b82f6,color:#f9fafb
style B fill:#14532d,stroke:#22c55e,color:#f9fafb
style D fill:#3b0764,stroke:#a855f7,color:#f9fafb
style E fill:#7c2d12,stroke:#f97316,color:#f9fafb
style F fill:#1c1917,stroke:#78716c,color:#f9fafb
The File Processing Pipeline
The file processing pipeline runs on every code push and on a nightly schedule. It materializes processed chunks to a versioned store in Azure Blob Storage. The key rule I settled on early: the retrieval system never reads raw files directly. Raw files change shape. Processed chunks should be stable.
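The “never read raw files” rule implies a stable, versioned address for every batch of processed chunks. Here is a minimal sketch of the path scheme, mirroring the `az://bucket/chunks/v{timestamp}/{file_type}.parquet` layout discussed later in this post; the function name is my own:

```python
from datetime import datetime, timezone
from typing import Optional

def chunk_store_path(base: str, file_type: str,
                     snapshot_id: Optional[str] = None) -> str:
    """Build the versioned path the retrieval layer reads from.

    Illustrative helper: new snapshots get a fresh timestamped prefix,
    so old indexes remain readable and reproducible.
    """
    if snapshot_id is None:
        snapshot_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return f"{base}/chunks/v{snapshot_id}/{file_type}.parquet"
```

Because each snapshot lands under its own prefix, “roll back the index” is just pointing the retrieval layer at a previous `v…` path.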
I validate chunk quality using Pandera, which lets you define DataFrame schemas as code and validate them at runtime:
import mlflow
import pandas as pd
import pandera as pa
import ray
from pandera import Column, DataFrameSchema, Check

chunk_schema = DataFrameSchema({
    "source_path": Column(pa.String, nullable=False),
    "chunk_text": Column(pa.String, checks=Check.str_length(min_value=50)),
    "chunk_index": Column(pa.Int, checks=Check.ge(0)),
    "file_type": Column(pa.String),      # "code", "markdown", "json", "csv"
    "embedding": Column(pa.Object),      # List[float], validated separately
    "last_modified": Column(pa.DateTime),
})

@ray.remote
def validate_and_enrich(batch: pd.DataFrame) -> pd.DataFrame:
    try:
        validated = chunk_schema.validate(batch, lazy=True)
        return enrich_with_metadata(validated)
    except pa.errors.SchemaErrors as e:
        # Fail loudly with metrics — don't silently drop bad chunks
        mlflow.log_metric("validation_errors", len(e.failure_cases))
        raise
Silent data quality failures are how you end up with an agent confidently citing deleted files six months from now.
Distributed Embedding with Ray Actors
The embedding step — turning text chunks into vectors — is the most compute-intensive part of the pipeline. I use Azure OpenAI for embeddings, but calling it sequentially for 800k chunks would take hours. With Ray, I fan out across worker actors, each managing its own Azure OpenAI client and rate-limit budget:
import ray
from openai import AzureOpenAI

@ray.remote
class EmbeddingWorker:
    def __init__(self):
        # AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_KEY come from config
        self.client = AzureOpenAI(
            azure_endpoint=AZURE_OPENAI_ENDPOINT,
            api_key=AZURE_OPENAI_KEY,
            api_version="2024-02-01",
        )

    def embed_batch(self, chunks: list[str]) -> list[list[float]]:
        response = self.client.embeddings.create(
            input=chunks,
            model="text-embedding-3-large",
        )
        return [item.embedding for item in response.data]

# Fan batches out across 8 workers, each holding its own client.
# Checkpointing of completed batches (not shown here) is what lets
# runs survive Azure Spot VM preemption.
workers = [EmbeddingWorker.remote() for _ in range(8)]
futures = [
    workers[i % len(workers)].embed_batch.remote(batch)
    for i, batch in enumerate(chunk_batches)
]
embeddings = ray.get(futures)
The first time this survived a preemption and resumed from checkpoint, I finally trusted unattended runs.
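The checkpoint logic itself can be very small. Here is a sketch of the idea, assuming completed batch IDs are persisted to a JSON file between runs; file name and helpers are my own, not the production code:

```python
import json
from pathlib import Path

def load_done(checkpoint_path: str) -> set:
    """Return the set of batch IDs already embedded in a prior run."""
    p = Path(checkpoint_path)
    return set(json.loads(p.read_text())) if p.exists() else set()

def mark_done(checkpoint_path: str, batch_id: str) -> None:
    """Record a batch as completed; survives process death."""
    done = load_done(checkpoint_path)
    done.add(batch_id)
    Path(checkpoint_path).write_text(json.dumps(sorted(done)))

def pending_batches(all_ids: list, checkpoint_path: str) -> list:
    """On restart, only the batches not yet marked done are resubmitted."""
    done = load_done(checkpoint_path)
    return [b for b in all_ids if b not in done]
```

After a preemption, the rerun calls `pending_batches` instead of re-embedding everything — which is the difference between losing minutes and losing hours.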
MLflow for Pipeline Tracking
Every ingestion run logs its parameters and output metrics to MLflow — which files were processed, what schema version was used, how many chunks were produced, and embedding quality signals:
import mlflow

with mlflow.start_run(run_name=f"ingest-{snapshot_id}"):
    mlflow.log_params({
        "chunk_strategy": "semantic",
        "embedding_model": "text-embedding-3-large",
        "file_types": ["py", "md", "json", "csv"],
        "schema_version": SCHEMA_VERSION,
        "snapshot_id": snapshot_id,
    })

    # ... run pipeline ...

    mlflow.log_metrics({
        "total_files": total_files,
        "total_chunks": total_chunks,
        "validation_errors": validation_errors,
        "avg_chunk_length": avg_chunk_length,
    })
    mlflow.log_artifact(chunk_manifest_path)
Always log the snapshot ID and schema version alongside every run. When the agent gives a wrong answer three months from now, you want a five-second MLflow query — not a three-hour archaeology project.
The Operational Wins
After switching to this setup:
- I stopped losing long runs to partial failures because checkpointing made retries practical.
- Full corpus processing time dropped from roughly 14 hours to around 2.5 hours on similar compute.
- Trying a new chunking strategy became a same-day task instead of a multi-day rewrite.
- Fixing parsing bugs became centralized instead of scattered across script variants.
But the most important win was psychological. I stopped dreading the pipeline. When you trust the infrastructure, you spend your time on the actual problem.
What I’d Do Differently
I over-engineered the chunk store early. I spent two weeks building versioning tooling before validating that my chunks produced measurably better retrieval. Start simple — az://bucket/chunks/v{timestamp}/{file_type}.parquet — and add complexity only when you hit a specific, measurable pain point.
Semantic chunking degrades on certain file types. The similarity-merge approach works well for prose markdown and docstrings, but produces oversized, incoherent chunks on heavily decorated Python (files with stacked @pytest.mark, @app.route, or @dataclass decorators). For those I fall back to function-level AST splitting.
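The fallback is straightforward with the standard library's ast module. A sketch, assuming top-level functions and classes are the unit of splitting; `split_by_function` is an illustrative name, not the production implementation:

```python
import ast

def split_by_function(source: str) -> list:
    """Fallback chunker for heavily decorated Python: one chunk per
    top-level function or class, with decorators included."""
    tree = ast.parse(source)
    lines = source.splitlines()
    chunks = []
    for node in tree.body:
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef,
                             ast.ClassDef)):
            # Decorators sit above node.lineno; include them in the chunk
            start = min([node.lineno] +
                        [d.lineno for d in node.decorator_list]) - 1
            chunks.append("\n".join(lines[start:node.end_lineno]))
    return chunks
```

Because each chunk is a complete, syntactically valid definition, retrieval never hands the agent half a function.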
Ray’s heterogeneous cluster support is powerful but opaque. Placement groups and resource scheduling have a steeper learning curve than the docs suggest. I spent roughly three days debugging tasks routing to the wrong node type. Budget for this.
MLflow’s SQLite backend is a silent bottleneck. At 200+ runs per week it becomes the limiting factor. Configure PostgreSQL before you hit it.
The biggest change was confidence: I could explain why a run failed, what data version was indexed, and how to recover without guesswork.
Next in this series: with a reliable knowledge index in place, we build the retrieval system that queries it — hybrid search, cross-encoder reranking, and the failure modes that only show up in production.