Try it
Download notebook
In [ ]:
# Install required packages
!pip install --quiet ipywidgets 'pydantic-ai-slim[openrouter,web-fetch]' python-dotenv pymupdf requests

import os
import urllib.request
import zipfile

# Download and extract data files
url = 'https://github.com/jsoma/workshop-ai-agents/raw/main/docs/04-pydantic-ai-and-oil-spills-more-orchestration/04-pydantic-ai-north-dakota-spills-data.zip'
print(f'Downloading data from {url}...')
urllib.request.urlretrieve(url, '04-pydantic-ai-north-dakota-spills-data.zip')

print('Extracting 04-pydantic-ai-north-dakota-spills-data.zip...')
with zipfile.ZipFile('04-pydantic-ai-north-dakota-spills-data.zip', 'r') as zip_ref:
    zip_ref.extractall('.')

os.remove('04-pydantic-ai-north-dakota-spills-data.zip')
print('✓ Data files extracted!')

A very complicated - overly complicated? - search for oil spills in North Dakota

Let's see what Pydantic AI can do for us.

We start with a PDF about an oil spill from the Hazconnect system.

Information on the incident reports can be incomplete, so we then use Pydantic AI to extract the entities from the document, perform research, and write a report.

The important part of this one is it's a structured path. Does it actually benefit from being agentic?

Setup portion

In [ ]:
from __future__ import annotations

import asyncio
import ipaddress
import json
import os
import re
import socket
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
from typing import Any, Literal
from urllib.parse import quote_plus, urljoin, urlparse

import braintrust
import pymupdf
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from markdownify import markdownify as md
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from getpass import getpass

from braintrust.wrappers.pydantic_ai import setup_pydantic_ai

load_dotenv()
if not os.getenv("OPENROUTER_API_KEY"):
    os.environ["OPENROUTER_API_KEY"] = getpass("OpenRouter API key: ")
if not os.getenv("BRAINTRUST_API_KEY"):
    os.environ["BRAINTRUST_API_KEY"] = getpass("Braintrust API key: ")

if os.getenv("BRAINTRUST_API_KEY"):
    setup_pydantic_ai(project_name="dataharvest-2026")
    print("Braintrust connection established.")
else:
    print("Set BRAINTRUST_API_KEY to enable Braintrust tracing.")

MODEL = "openrouter:google/gemini-3.1-flash-lite"
INCIDENT_PDF_PATHS = [
    Path("data/incidents/5981_95272.pdf"),
    Path("data/incidents/5984_95276.pdf"),
    Path("data/incidents/5976_95267.pdf"),
    Path("data/incidents/5978_95269.pdf"),
    Path("data/incidents/5986_95278.pdf"),
]
OUTPUT_PATH = Path("outputs/north_dakota_spill_research_reports.json")

DMR_WELL_SEARCH_URL = "https://www.dmr.nd.gov/oilgas/findwellsvw.asp"
ND_SOS_BUSINESS_SEARCH_URL = "https://firststop.sos.nd.gov/api/Records/businesssearch"
ND_SOS_BASE_URL = "https://firststop.sos.nd.gov"
FMCSA_BASE_URL = "https://safer.fmcsa.dot.gov/"
TAVILY_SEARCH_URL = "https://api.tavily.com/search"
USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/148.0.0.0 Safari/537.36"
)

Pydantic data classes

In [ ]:
class SourceLedgerEntry(BaseModel):
    title: str
    url: str
    source_type: str
    accessed_at: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    notes: str | None = None


class IncidentLocation(BaseModel):
    address: str | None = None
    description: str | None = None
    latitude: float | None = None
    longitude: float | None = None
    section: str | None = None
    township: str | None = None
    range: str | None = None
    county_tribe: str | None = None


class IncidentParty(BaseModel):
    role: str
    name: str | None = None
    company: str | None = None
    title: str | None = None
    address: str | None = None
    phone: str | None = None


class ReleasedMaterial(BaseModel):
    material: str
    quantity: str | None = None
    notes: str | None = None


class ResearchTarget(BaseModel):
    kind: Literal["company", "carrier", "transporter", "operator", "well", "field", "location", "document", "other"]
    name: str
    reason: str


class ExtractedIncident(BaseModel):
    pdf_path: str
    incident_id: str
    headline: str
    description: str
    incident_date_time: str | None = None
    location: IncidentLocation
    released_materials: list[ReleasedMaterial] = Field(default_factory=list)
    well_name: str | None = None
    ndic_file_number: str | None = None
    oil_gas_operator: str | None = None
    oil_gas_field_name: str | None = None
    facility_company_name: str | None = None
    facility_name: str | None = None
    reporting_party: IncidentParty | None = None
    contacts: list[IncidentParty] = Field(default_factory=list)
    transportation_related: bool
    oil_gas_related: bool
    research_targets: list[ResearchTarget] = Field(default_factory=list)
    source_ledger: list[SourceLedgerEntry] = Field(default_factory=list)


class ResearchFinding(BaseModel):
    kind: Literal["company", "carrier", "well", "field", "document", "other"]
    name: str
    relation: str | None = None
    summary: str
    source_ledger: list[SourceLedgerEntry] = Field(default_factory=list)


class IncidentResearch(BaseModel):
    summary: str
    searches_performed: list[str] = Field(default_factory=list)
    findings: list[ResearchFinding] = Field(default_factory=list)
    unresolved_questions: list[str] = Field(default_factory=list)
    source_ledger: list[SourceLedgerEntry] = Field(default_factory=list)


class SpecialistFindings(BaseModel):
    company_and_carrier: list[ResearchFinding]
    well_and_field: list[ResearchFinding]
    outside_document: list[ResearchFinding]


class IncidentSynthesis(BaseModel):
    incident_id: str
    narrative: str
    parties_involved: list[str] = Field(default_factory=list)
    what_happened: list[str] = Field(default_factory=list)
    related_wells_or_fields: list[str] = Field(default_factory=list)
    useful_sources: list[SourceLedgerEntry] = Field(default_factory=list)
    unresolved_leads: list[str] = Field(default_factory=list)


@dataclass
class IncidentRun:
    pdf_path: Path
    incident: ExtractedIncident
    research: IncidentResearch
    synthesis: IncidentSynthesis

Our tools

In [ ]:
def now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def make_session() -> requests.Session:
    session = requests.Session()
    session.headers.update({"User-Agent": USER_AGENT, "Accept-Language": "en-US,en;q=0.9"})
    return session


def compact_text(value: Any) -> str:
    return re.sub(r"\s+", " ", str(value or "").replace("\xa0", " ")).strip()


def tool_json(payload: dict[str, Any]) -> str:
    return json.dumps(payload, indent=2, ensure_ascii=False)


def ledger(title: str, url: str, source_type: str, notes: str | None = None) -> dict[str, str | None]:
    return {"title": title, "url": url, "source_type": source_type, "accessed_at": now_iso(), "notes": notes}


def unique_ledgers(entries: list[SourceLedgerEntry]) -> list[SourceLedgerEntry]:
    seen: set[tuple[str, str, str]] = set()
    unique = []
    for entry in entries:
        key = (entry.title, entry.url, entry.source_type)
        if key in seen:
            continue
        seen.add(key)
        unique.append(entry)
    return unique


def fill_research_bookkeeping(research: IncidentResearch) -> IncidentResearch:
    source_ledger = unique_ledgers(
        research.source_ledger + [entry for finding in research.findings for entry in finding.source_ledger]
    )
    searches_performed = research.searches_performed or [
        entry.title
        for entry in source_ledger
        if entry.source_type
        in {
            "state_business_records",
            "state_oil_gas_records",
            "federal_transportation_records",
            "web_search",
            "web_page",
            "remote_pdf",
        }
    ]
    research.source_ledger = source_ledger
    research.searches_performed = list(dict.fromkeys(searches_performed))
    return research


def read_incident_pdf_text(pdf_path: str) -> str:
    """Read one local North Dakota HazConnect incident PDF."""
    path = Path(pdf_path)
    if not path.exists():
        raise FileNotFoundError(f"Incident PDF does not exist: {pdf_path}")
    doc = pymupdf.open(path)
    text = "\n\n".join(page.get_text() for page in doc)
    return tool_json(
        {
            "pdf_path": str(path),
            "page_count": doc.page_count,
            "text": text,
            "source_ledger": [
                ledger(
                    f"Local HazConnect incident PDF {path.name}",
                    str(path),
                    "local_pdf",
                    "Read with PyMuPDF from the local data/incidents folder.",
                )
            ],
        }
    )


def parse_dmr_well_table(html: str) -> list[dict[str, str]]:
    soup = BeautifulSoup(html, "html.parser")
    table = soup.find("table", attrs={"summary": re.compile("Well Log search results", re.I)})
    if table is None:
        return []
    headers = [compact_text(cell.get_text(" ", strip=True)) for cell in table.find_all("th")]
    rows = []
    for tr in table.find_all("tr"):
        cells = [compact_text(cell.get_text(" ", strip=True)) for cell in tr.find_all("td")]
        if cells and len(cells) == len(headers):
            rows.append(dict(zip(headers, cells, strict=True)))
    return rows


def search_nd_oil_gas_wells(
    operator: str = "",
    field: str = "",
    section: str = "",
    township: str = "",
    range_number: str = "",
    limit: int = 25,
) -> str:
    """Search North Dakota DMR well records. Use numeric township/range values, e.g. 152 and 98."""
    def dmr_number(value: str | int | None) -> str:
        match = re.search(r"\d+", str(value or ""))
        return match.group(0) if match else "0"

    payload = {
        "VTI-GROUP": "0",
        "ddmOperator": operator or " ",
        "ddmField": field or " ",
        "ddmSection": dmr_number(section),
        "ddmTownship": dmr_number(township),
        "ddmRange": dmr_number(range_number),
        "B1": "Submit",
    }
    response = make_session().post(
        DMR_WELL_SEARCH_URL,
        data=payload,
        headers={"Origin": "https://www.dmr.nd.gov", "Referer": DMR_WELL_SEARCH_URL},
        timeout=30,
    )
    response.raise_for_status()
    rows = parse_dmr_well_table(response.text)
    return tool_json(
        {
            "query": payload,
            "row_count": len(rows),
            "rows": rows[:limit],
            "truncated": len(rows) > limit,
            "source_ledger": [
                ledger("ND DMR oil/gas well search results", DMR_WELL_SEARCH_URL, "state_oil_gas_records")
            ],
        }
    )


def search_nd_sos_businesses(search_value: str, active_only: bool = False, limit: int = 5) -> str:
    """Search North Dakota Secretary of State FirstStop business records."""
    session = make_session()
    session.get(f"{ND_SOS_BASE_URL}/search/business", timeout=30)
    response = session.post(
        ND_SOS_BUSINESS_SEARCH_URL,
        json={"SEARCH_VALUE": search_value, "STARTS_WITH_YN": False, "ACTIVE_ONLY_YN": active_only},
        headers={
            "authorization": "undefined",
            "content-type": "application/json",
            "origin": ND_SOS_BASE_URL,
            "referer": f"{ND_SOS_BASE_URL}/search/business",
        },
        timeout=30,
    )
    response.raise_for_status()
    records = sorted(response.json()["rows"].values(), key=lambda row: row["SORT_INDEX"])[:limit]
    return tool_json(
        {
            "search_value": search_value,
            "record_count": len(records),
            "records": records,
            "source_ledger": [
                ledger("North Dakota FirstStop business search", ND_SOS_BUSINESS_SEARCH_URL, "state_business_records")
            ],
        }
    )


def search_fmcsa_carriers(company_name: str, limit: int = 5) -> str:
    """Search FMCSA SAFER carrier records by company name."""
    search_url = f"{FMCSA_BASE_URL}keywordx.asp?searchstring=%2A{quote_plus(company_name)}%2A&SEARCHTYPE="
    try:
        response = make_session().get(search_url, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        matches = [
            {
                "match_name": compact_text(anchor.get_text(" ", strip=True)),
                "snapshot_url": urljoin(FMCSA_BASE_URL, anchor["href"]),
            }
            for anchor in soup.find_all("a", href=True)
            if "queryCarrierSnapshot" in anchor["href"]
        ][:limit]
        return tool_json(
            {
                "company_name": company_name,
                "match_count": len(matches),
                "matches": matches,
                "source_ledger": [ledger("FMCSA SAFER carrier search", search_url, "federal_transportation_records")],
            }
        )
    except requests.RequestException as error:
        return tool_json(
            {
                "company_name": company_name,
                "match_count": 0,
                "matches": [],
                "source_ledger": [
                    ledger(
                        "FMCSA SAFER carrier search failed",
                        search_url,
                        "federal_transportation_records",
                        f"{type(error).__name__}: {error}",
                    )
                ],
            }
        )


def tavily_search(query: str, max_results: int = 5) -> str:
    """Search the web with Tavily."""
    response = requests.post(
        TAVILY_SEARCH_URL,
        headers={"Authorization": f"Bearer {os.environ['TAVILY_API_KEY']}"},
        json={
            "query": query,
            "max_results": max(1, min(max_results, 8)),
            "search_depth": "basic",
            "include_answer": False,
            "include_raw_content": False,
        },
        timeout=30,
    )
    response.raise_for_status()
    return tool_json(
        {
            "query": query,
            "results": response.json()["results"],
            "source_ledger": [ledger(f"Tavily search: {query}", TAVILY_SEARCH_URL, "web_search")],
        }
    )


def assert_safe_public_url(url: str) -> str:
    parsed = urlparse(url)
    if parsed.scheme not in {"http", "https"} or not parsed.hostname:
        raise ValueError("Only public http(s) URLs can be fetched.")
    hostname = parsed.hostname.lower()
    if hostname in {"localhost", "127.0.0.1", "::1"} or hostname.endswith(".local"):
        raise ValueError("Local URLs are not allowed.")
    for info in socket.getaddrinfo(hostname, None):
        ip = ipaddress.ip_address(info[4][0])
        if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_multicast or ip.is_reserved:
            raise ValueError(f"Refusing to fetch non-public address {ip}.")
    return parsed.geturl()


def safe_fetch_url(url: str, max_chars: int = 12000) -> str:
    """Fetch a public web page or PDF and return extracted text."""
    try:
        response = make_session().get(assert_safe_public_url(url), timeout=35)
        response.raise_for_status()
        content = response.content
        content_type = response.headers.get("content-type", "")
        is_pdf = "pdf" in content_type.lower() or content[:5] == b"%PDF-" or urlparse(response.url).path.endswith(".pdf")
        if is_pdf:
            doc = pymupdf.open(stream=content, filetype="pdf")
            text = "\n\n".join(page.get_text() for page in doc)
            source_type = "remote_pdf"
        else:
            html = content.decode(response.encoding or "utf-8", errors="replace")
            text = md(html, strip=["script", "style", "img"]).strip()
            source_type = "web_page"

        return tool_json(
            {
                "url": response.url,
                "content_type": content_type,
                "text": text[:max_chars],
                "truncated": len(text) > max_chars,
                "source_ledger": [ledger("Fetched URL", response.url, source_type, f"HTTP {response.status_code}")],
            }
        )
    except Exception as error:  # noqa: BLE001 - bad search results should become evidence, not crashes.
        return tool_json(
            {
                "url": url,
                "text": "",
                "truncated": False,
                "source_ledger": [ledger("Fetch failed", url, "web_page", f"{type(error).__name__}: {error}")],
            }
        )

Our agents

In [ ]:
extraction_agent = Agent(
    MODEL,
    output_type=ExtractedIncident,
    tool_retries=2,
    tools=[read_incident_pdf_text],
    instructions="""
You extract structured details from one North Dakota HazConnect incident PDF.
Read the PDF path provided by the user, then fill the schema literally.

Do not infer missing names or numbers. Capture companies, transporters,
operators, wells, fields, facilities, addresses, location clues, and materials
when the PDF states them. Create research_targets for leads an investigator
should pursue next, and preserve the local PDF source ledger.
""",
)


company_agent = Agent(
    MODEL,
    output_type=list[ResearchFinding],
    tool_retries=2,
    tools=[search_nd_sos_businesses, search_fmcsa_carriers],
    instructions="""
You are the company and carrier research specialist.

Use the incident details and research targets to decide which names are worth
checking. Search North Dakota business records for companies/operators, and
FMCSA records for likely transporters, haulers, or carriers. Return [] if this
specialty has no useful lead. Preserve source ledger entries from tool results.
""",
)


well_agent = Agent(
    MODEL,
    output_type=list[ResearchFinding],
    tool_retries=2,
    tools=[search_nd_oil_gas_wells],
    instructions="""
You are the oil/gas well and field research specialist.

Use explicit well, operator, field, section, township, and range details from
the incident. If the incident gives section/township/range, a location search is
usually the best first move. Return [] if there is no meaningful oil/gas or
location lead.

Do not treat a transportation or service company as a DMR operator unless the
incident says it is one. A location-only well match is possibly_involved or
context_only, not definitely_involved. Preserve source ledger entries.
""",
)


document_agent = Agent(
    MODEL,
    output_type=list[ResearchFinding],
    tool_retries=2,
    tools=[tavily_search, safe_fetch_url],
    instructions="""
You are the outside-document research specialist.

Build your own web queries from the incident. Look for public pages or PDFs that
explain ownership, operators, fields, facilities, permits, stops, providers, or
prior incidents. Keep this bounded: at most two web searches and two fetches.
Tavily search results are leads, not evidence: fetch a promising result before
turning it into a finding. Return document findings only, or [] if no outside
document adds useful context. Preserve source ledgers.
""",
)


research_editor_agent = Agent(
    MODEL,
    output_type=IncidentResearch,
    instructions="""
You are the research editor for one North Dakota spill report.

Specialist agents have already searched company/carrier records, oil/gas well
records, and outside public documents. Combine their outputs into one concise
research record. Use only the supplied incident and specialist findings. Do not
invent facts. Preserve source ledger entries from the specialist findings.
Keep useful outside-document findings visible as kind="document"; do not fold
them into company or well findings.

Well relation rules:
- definitely_involved: the PDF or a public record names the specific well,
  operator, file number, or facility involved.
- possibly_involved: a well appears in the same section/township/range or field
  but the incident does not name that well directly.
- context_only: nearby or background records useful for understanding the area.
""",
)


synthesis_agent = Agent(
    MODEL,
    output_type=IncidentSynthesis,
    instructions="""
Synthesize one North Dakota spill investigation using only the extracted
incident and research findings. Write a compact, specific narrative explaining
what happened, who appears involved, which public records were found, what
wells/fields may be related, and what remains unresolved. Preserve useful
source ledger entries.
""",
)

...some more little convenience pieces

In [ ]:
async def extract_incident(pdf_path: Path) -> ExtractedIncident:
    response = await extraction_agent.run(f"Extract the incident report at {pdf_path}.")
    return response.output


async def research_incident(incident: ExtractedIncident) -> IncidentResearch:
    prompt = dedent(
        """
        Research your specialty for this spill report.

        Incident:
        {incident}
        """
    ).format(incident=incident.model_dump_json(indent=2))

    print("   company/carrier specialist...")
    company_response = await company_agent.run(prompt)

    print("   well/field specialist...")
    well_response = await well_agent.run(prompt)

    print("   outside-document specialist...")
    document_response = await document_agent.run(prompt)

    specialist_findings = SpecialistFindings(
        company_and_carrier=company_response.output,
        well_and_field=well_response.output,
        outside_document=document_response.output,
    )
    response = await research_editor_agent.run(
        dedent(
            """
            Combine this extracted incident and the specialist findings into one
            structured research record.

            Incident:
            {incident}

            Specialist findings:
            {findings}
            """
        ).format(
            incident=incident.model_dump_json(indent=2),
            findings=specialist_findings.model_dump_json(indent=2),
        )
    )
    return fill_research_bookkeeping(response.output)


async def synthesize_incident(incident: ExtractedIncident, research: IncidentResearch) -> IncidentSynthesis:
    response = await synthesis_agent.run(
        dedent(
            """
            Extracted incident:
            {incident}

            Research findings:
            {research}
            """
        ).format(incident=incident.model_dump_json(indent=2), research=research.model_dump_json(indent=2))
    )
    return response.output


def markdown_bullets(items: list[str]) -> str:
    return "\n".join(f"- {item}" for item in items) if items else "- None noted."


def markdown_sources(sources: list[SourceLedgerEntry]) -> str:
    if not sources:
        return "- None recorded."
    return "\n".join(
        f"- [{source.title}]({source.url}) ({source.source_type})"
        + (f" - {source.notes}" if source.notes else "")
        for source in sources
    )


def markdown_report(
    incident: ExtractedIncident,
    research: IncidentResearch,
    synthesis: IncidentSynthesis,
) -> str:
    sources = unique_ledgers(synthesis.useful_sources or research.source_ledger)
    findings = "\n".join(
        f"- **{finding.kind}: {finding.name}**"
        + (f" ({finding.relation})" if finding.relation else "")
        + f" - {finding.summary}"
        for finding in research.findings
    )

    sections = [
        f"## Incident {incident.incident_id}: {incident.headline}",
        synthesis.narrative,
        "### Parties Involved\n" + markdown_bullets(synthesis.parties_involved),
        "### What Happened\n" + markdown_bullets(synthesis.what_happened),
        "### Related Wells or Fields\n" + markdown_bullets(synthesis.related_wells_or_fields),
        "### Research Findings\n" + (findings or "- None recorded."),
        "### Unresolved Leads\n" + markdown_bullets(synthesis.unresolved_leads),
        "### Sources\n" + markdown_sources(sources),
    ]

    return "\n\n".join(section.strip() for section in sections)
In [ ]:
def write_outputs(runs: list[IncidentRun]) -> None:
    OUTPUT_PATH.parent.mkdir(exist_ok=True)
    payload = {
        "generated_at": now_iso(),
        "model": MODEL,
        "incidents": [
            {
                "pdf_path": str(run.pdf_path),
                "incident": run.incident.model_dump(),
                "research": run.research.model_dump(),
                "synthesis": run.synthesis.model_dump(),
            }
            for run in runs
        ],
    }
    OUTPUT_PATH.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")
    print(f"\nWrote {OUTPUT_PATH}")

Let's go!

In [ ]:
@braintrust.traced
async def run_one_incident(pdf_path: Path) -> IncidentRun:
    print(f"\n=== {pdf_path} ===")
    print("1. Extracting incident details...")
    incident = await extract_incident(pdf_path)
    print(f"   Incident {incident.incident_id}: {incident.headline}")

    if incident.research_targets:
        print("   Research targets:")
        for target in incident.research_targets:
            print(f"      - {target.kind}: {target.name} ({target.reason})")

    print("2. Researching records and web sources...")
    research = await research_incident(incident)
    print(f"   {research.summary}")

    print("3. Synthesizing...")
    synthesis = await synthesize_incident(incident, research)
    print("\n" + markdown_report(incident, research, synthesis))
    return IncidentRun(pdf_path=pdf_path, incident=incident, research=research, synthesis=synthesis)
In [ ]:
runs = []
for pdf_path in INCIDENT_PDF_PATHS[:3]:
    runs.append(await run_one_incident(pdf_path))
write_outputs(runs)