Build a RAG ingestion pipeline
This tutorial builds a Python pipeline that turns a folder of PDFs into a queryable index for an LLM. The Data Extraction API handles the first step — converting PDFs to clean Markdown — and the rest of the pipeline chunks, embeds, stores, and retrieves.
What you’ll build
A Python CLI that:
- Extracts clean Markdown from PDFs via the Data Extraction API
- Chunks the Markdown by heading boundaries
- Embeds chunks with OpenAI
- Stores vectors in Chroma
- Answers questions with Claude, citing source sections
Why use the Data Extraction API for RAG
The Data Extraction API’s understand mode runs a full layout analysis pipeline that preserves headings, lists, tables, and reading order in the Markdown output. Stable structure means:
- Heading-aware chunking that follows the document’s actual sections
- Smaller chunks with less noise, which reduces token costs
- More reliable retrieval and fewer hallucinated answers
Prerequisites
- Python 3.10+
- A Nutrient DWS account and Data Extraction API key — sign up at the Nutrient dashboard
- An OpenAI key for embeddings (or swap to Voyage, Cohere, or a local model)
- An Anthropic key for the LLM step (or swap to OpenAI)
Project setup
```shell
mkdir pdf-rag-data-extraction && cd pdf-rag-data-extraction
python -m venv .venv && source .venv/bin/activate
pip install requests chromadb openai anthropic python-dotenv tqdm
```
Create a .env file:
```
NUTRIENT_API_KEY=your_data_extraction_api_key_here
OPENAI_API_KEY=your_openai_key_here
ANTHROPIC_API_KEY=your_anthropic_key_here
```
Folder layout:
```
pdf-rag-data-extraction/
├─ .env
├─ pdfs/                # Drop your PDFs here
├─ ingestion/
│  ├─ extract.py        # PDF → Markdown via Data Extraction API
│  ├─ chunk.py          # Markdown → chunks
│  ├─ embed.py          # Chunks → vectors
│  └─ store.py          # Vectors → Chroma
├─ retrieval/
│  └─ ask.py            # Query → top-k context → LLM answer
└─ run.py               # End-to-end CLI
```
Step 1 — Extract Markdown from PDFs
The Data Extraction API accepts a file upload and returns Markdown when output.format is "markdown":
```python
# ingestion/extract.py
import os
import pathlib

import requests
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.environ["NUTRIENT_API_KEY"]
ENDPOINT = "https://api.nutrient.io/extraction/parse"


def pdf_to_markdown(pdf_path: pathlib.Path) -> str:
    """Convert a single PDF to Markdown via the Data Extraction API."""
    with open(pdf_path, "rb") as f:
        response = requests.post(
            ENDPOINT,
            headers={"Authorization": f"Bearer {API_KEY}"},
            files={"file": f},
            data={
                "instructions": '{"mode":"text","output":{"format":"markdown"}}'
            },
        )
    response.raise_for_status()
    result = response.json()
    return result["output"]["markdown"]


if __name__ == "__main__":
    pdf = pathlib.Path("pdfs/sample.pdf")
    md = pdf_to_markdown(pdf)
    print(md[:500])
```
The Markdown preserves headings, lists, and table structure, which is the format LLMs and chunkers handle best.
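The instructions form field in the request above is a hand-written JSON string. A minimal sketch of building it with json.dumps instead is shown here. It assumes the only fields involved are the mode and output.format values mentioned in this tutorial; build_instructions and the two KNOWN_ sets are hypothetical names for illustration, not part of any Nutrient SDK.

```python
import json

# Values taken from this tutorial only (assumption: the API may accept others).
KNOWN_MODES = {"text", "understand"}
KNOWN_FORMATS = {"markdown", "spatial"}


def build_instructions(mode: str = "text", output_format: str = "markdown") -> str:
    """Serialize the `instructions` form field as compact JSON (hypothetical helper)."""
    if mode not in KNOWN_MODES:
        raise ValueError(f"unexpected mode: {mode!r}")
    if output_format not in KNOWN_FORMATS:
        raise ValueError(f"unexpected output format: {output_format!r}")
    payload = {"mode": mode, "output": {"format": output_format}}
    # Compact separators reproduce the hand-written string used in extract.py.
    return json.dumps(payload, separators=(",", ":"))
```

Passing data={"instructions": build_instructions("understand")} to requests.post would then switch modes without editing a string literal, and a typo in the mode name fails locally before any request is sent.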
Step 2 — Chunk by heading
Markdown structure makes chunking straightforward. Split at heading boundaries and cap each chunk at 1,800 characters (about 450 tokens, assuming roughly four characters per token):
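```python
# ingestion/chunk.py
import re

HEADING_RE = re.compile(r"^(#{1,6})\s+(.*)$", re.MULTILINE)


def split_by_heading(md: str, max_chars: int = 1800) -> list[dict]:
    """Split Markdown into chunks at heading boundaries."""
    headings = list(HEADING_RE.finditer(md))
    if not headings:
        # No headings found: fall back to fixed-size windows.
        return [
            {"title": "(untitled)", "text": md[i : i + max_chars]}
            for i in range(0, len(md), max_chars)
        ]

    sections = []
    for i, h in enumerate(headings):
        end = headings[i + 1].start() if i + 1 < len(headings) else len(md)
        title = h.group(2).strip()
        body = md[h.end() : end].strip()
        sections.append({"title": title, "body": body})

    chunks = []
    # Keep any text that precedes the first heading; it would otherwise be dropped.
    preamble = md[: headings[0].start()].strip()
    for i in range(0, len(preamble), max_chars):
        chunks.append({"title": "(untitled)", "text": preamble[i : i + max_chars]})

    for s in sections:
        # Each section is re-emitted under a level-1 heading, whatever its original level.
        text = f"# {s['title']}\n\n{s['body']}"
        # Sections longer than max_chars are cut into fixed-size windows.
        for i in range(0, len(text), max_chars):
            chunks.append({"title": s["title"], "text": text[i : i + max_chars]})
    return chunks
```
Step 3 — Embed and store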
Use OpenAI’s text-embedding-3-small for embeddings and Chroma for local vector storage:
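```python
# ingestion/embed.py
import os

from dotenv import load_dotenv
from openai import OpenAI

# Load .env here as well: run.py imports this module before extract.py,
# so OPENAI_API_KEY would otherwise be missing at import time.
load_dotenv()

client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])


def embed(texts: list[str]) -> list[list[float]]:
    response = client.embeddings.create(
        model="text-embedding-3-small", input=texts
    )
    return [d.embedding for d in response.data]
```
```python
# ingestion/store.py
import chromadb

client = chromadb.PersistentClient(path=".chroma")
collection = client.get_or_create_collection("pdf-rag")


def upsert(ids, docs, metas, embeddings):
    collection.upsert(
        ids=ids, documents=docs, metadatas=metas, embeddings=embeddings
    )
```
Swap Chroma for Pinecone, pgvector, Weaviate, or Qdrant by replacing store.py; the upsert interface stays the same. retrieval/ask.py queries the Chroma collection directly, so update it as well.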
Step 4 — Wire it together
```python
# run.py
import hashlib
import pathlib

from tqdm import tqdm

from ingestion.chunk import split_by_heading
from ingestion.embed import embed
from ingestion.extract import pdf_to_markdown
from ingestion.store import upsert


def ingest(folder: str = "pdfs") -> None:
    for pdf in pathlib.Path(folder).glob("*.pdf"):
        md = pdf_to_markdown(pdf)
        chunks = split_by_heading(md)
        # Deterministic IDs: re-ingesting the same file overwrites its chunks.
        ids = [
            hashlib.sha1(f"{pdf.name}-{i}".encode()).hexdigest()
            for i in range(len(chunks))
        ]
        docs = [c["text"] for c in chunks]
        metas = [
            {"source": pdf.name, "section": c["title"]} for c in chunks
        ]
        # Embed and upsert in batches of 64 chunks.
        for i in tqdm(range(0, len(docs), 64), desc=pdf.name):
            batch = slice(i, i + 64)
            upsert(
                ids=ids[batch],
                docs=docs[batch],
                metas=metas[batch],
                embeddings=embed(docs[batch]),
            )


if __name__ == "__main__":
    ingest()
```
Step 5 — Retrieve and answer
Top-k retrieval from Chroma into Claude, with sources cited by section:
```python
# retrieval/ask.py
import os
import sys

import anthropic
import chromadb
from dotenv import load_dotenv

# Load .env before importing ingestion.embed, which reads OPENAI_API_KEY at import time.
load_dotenv()

from ingestion.embed import embed  # noqa: E402

llm = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
collection = chromadb.PersistentClient(path=".chroma").get_or_create_collection(
    "pdf-rag"
)


def ask(question: str, k: int = 6) -> str:
    q_embedding = embed([question])[0]
    res = collection.query(query_embeddings=[q_embedding], n_results=k)
    docs = res["documents"][0]
    sources = [
        f"{m['source']} — {m['section']}" for m in res["metadatas"][0]
    ]
    context = "\n\n---\n\n".join(docs)
    msg = llm.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=800,
        messages=[
            {
                "role": "user",
                "content": (
                    "Answer the question using only the context. "
                    "Cite sources by section name when relevant.\n\n"
                    f"CONTEXT:\n{context}\n\nQUESTION: {question}"
                ),
            }
        ],
    )
    return msg.content[0].text + "\n\nSources:\n" + "\n".join(sources)


if __name__ == "__main__":
    print(ask(sys.argv[1]))
```
Run it
```shell
python run.py
python -m retrieval.ask "What does this document say about termination?"
```
Production considerations
- Cache extraction — Hash PDF bytes and skip reextraction on unchanged files.
- Route by document type — Use text mode for born-digital PDFs (fastest, cheapest). Use understand mode if you work with complex layouts and need formatting information, tables, and formulas extracted.
- Switch to spatial for structured data — If you need bounding boxes, table cells, or key-value pairs from forms, use output.format: "spatial" instead of Markdown. Refer to the guide on building a document extraction pipeline.
- Add evaluation — Track retrieval hit rate and answer correctness whenever you change a pipeline component.
- Multilingual documents — Set the options.language parameter for non-English PDFs. See the guide on multilingual extraction.
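The caching suggestion in the list above could be sketched as follows. This is one possible approach under stated assumptions, not a prescribed implementation: cached_markdown, its extract argument, and the .cache/markdown directory are hypothetical names chosen for illustration, and the cache key is a SHA-256 hash of the PDF bytes, so a renamed but unchanged file still hits the cache.

```python
import hashlib
import pathlib
from typing import Callable

# Hypothetical default location for cached Markdown files.
DEFAULT_CACHE_DIR = pathlib.Path(".cache/markdown")


def cached_markdown(
    pdf_path: pathlib.Path,
    extract: Callable[[pathlib.Path], str],
    cache_dir: pathlib.Path = DEFAULT_CACHE_DIR,
) -> str:
    """Return Markdown for pdf_path, calling extract only for unseen file content."""
    # Key the cache on file content, not on the file name or modification time.
    digest = hashlib.sha256(pdf_path.read_bytes()).hexdigest()
    cache_file = cache_dir / f"{digest}.md"
    if cache_file.exists():
        return cache_file.read_text(encoding="utf-8")
    markdown = extract(pdf_path)
    cache_dir.mkdir(parents=True, exist_ok=True)
    cache_file.write_text(markdown, encoding="utf-8")
    return markdown
```

In run.py, the call pdf_to_markdown(pdf) could then become cached_markdown(pdf, pdf_to_markdown), so repeated ingestion runs only send new or modified PDFs to the API.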
Data Extraction API vs. DWS Processor for RAG
| Feature | Data Extraction API | DWS Processor |
|---|---|---|
| Endpoint | POST /extraction/parse | POST /build |
| Markdown output | Yes (output.format: "markdown") | Yes (output.type: "markdown") |
| Structured spatial elements | Yes (output.format: "spatial") | No |
| Layout analysis | Full segmentation + AI augmentation (understand) | Born-digital extraction |
| Best for | Complex documents, mixed workflows (Markdown + spatial) | Born-digital PDFs, Markdown-only pipelines |
Both APIs return clean Markdown suitable for RAG. The Data Extraction API adds structured element extraction and deeper layout understanding for complex documents.