# Install required packages
!pip install --quiet ipywidgets 'pydantic-ai-slim[openrouter,web-fetch]' python-dotenv pymupdf requests
import os
import urllib.request
import zipfile
# Download and extract data files
url = 'https://github.com/jsoma/workshop-ai-agents/raw/main/docs/04-pydantic-ai-and-oil-spills-more-orchestration/04-pydantic-ai-north-dakota-spills-data.zip'
print(f'Downloading data from {url}...')
urllib.request.urlretrieve(url, '04-pydantic-ai-north-dakota-spills-data.zip')
print('Extracting 04-pydantic-ai-north-dakota-spills-data.zip...')
with zipfile.ZipFile('04-pydantic-ai-north-dakota-spills-data.zip', 'r') as zip_ref:
zip_ref.extractall('.')
os.remove('04-pydantic-ai-north-dakota-spills-data.zip')
print('✓ Data files extracted!')
Let's see what Pydantic AI can do for us.
We start with PDF incident reports about oil spills from North Dakota's HazConnect system.
Because the information in these incident reports can be incomplete, we use Pydantic AI to extract the entities from each document, research them in public records and on the web, and then write a report.
The important thing about this example is that it follows a fixed, structured path: extract, then research, then synthesize. Does it actually benefit from being agentic?
from __future__ import annotations
import asyncio
import ipaddress
import json
import os
import re
import socket
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
from typing import Any, Literal
from urllib.parse import quote_plus, urljoin, urlparse
import braintrust
import pymupdf
import requests
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from markdownify import markdownify as md
from pydantic import BaseModel, Field
from pydantic_ai import Agent
from getpass import getpass
from braintrust.wrappers.pydantic_ai import setup_pydantic_ai
# Load keys from a local .env file (if present), then prompt for anything still missing.
load_dotenv()
if not os.getenv("OPENROUTER_API_KEY"):
    os.environ["OPENROUTER_API_KEY"] = getpass("OpenRouter API key: ")
# tavily_search() reads TAVILY_API_KEY; ask for it up front instead of failing
# halfway through the first incident's research.
if not os.getenv("TAVILY_API_KEY"):
    os.environ["TAVILY_API_KEY"] = getpass("Tavily API key: ")
if not os.getenv("BRAINTRUST_API_KEY"):
    os.environ["BRAINTRUST_API_KEY"] = getpass("Braintrust API key: ")
# An empty answer at the prompt above leaves the variable empty (falsy),
# which skips tracing setup.
if os.getenv("BRAINTRUST_API_KEY"):
    setup_pydantic_ai(project_name="dataharvest-2026")
    print("Braintrust connection established.")
else:
    print("Set BRAINTRUST_API_KEY to enable Braintrust tracing.")
# Model string shared by every agent below.
MODEL = "openrouter:google/gemini-3.1-flash-lite"

# Local HazConnect incident PDFs unpacked from the workshop data archive.
INCIDENT_PDF_PATHS = [
    Path("data/incidents") / f"{stem}.pdf"
    for stem in ("5981_95272", "5984_95276", "5976_95267", "5978_95269", "5986_95278")
]
OUTPUT_PATH = Path("outputs") / "north_dakota_spill_research_reports.json"

# Public record endpoints queried by the research tools.
DMR_WELL_SEARCH_URL = "https://www.dmr.nd.gov/oilgas/findwellsvw.asp"
ND_SOS_BASE_URL = "https://firststop.sos.nd.gov"
ND_SOS_BUSINESS_SEARCH_URL = ND_SOS_BASE_URL + "/api/Records/businesssearch"
FMCSA_BASE_URL = "https://safer.fmcsa.dot.gov/"
TAVILY_SEARCH_URL = "https://api.tavily.com/search"

# Browser-like User-Agent header sent by make_session().
USER_AGENT = " ".join(
    (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
        "AppleWebKit/537.36 (KHTML, like Gecko)",
        "Chrome/148.0.0.0 Safari/537.36",
    )
)
# One provenance record: where a piece of evidence came from and when it was read.
# Tools build these as plain dicts via ledger(); agents are instructed to carry
# them through into their structured outputs.
class SourceLedgerEntry(BaseModel):
    title: str
    # A web URL, or a local file path for PDFs read from disk.
    url: str
    # Category string. Values produced by tools in this file include "local_pdf",
    # "state_oil_gas_records", "state_business_records",
    # "federal_transportation_records", "web_search", "web_page", "remote_pdf".
    source_type: str
    # ISO-8601 UTC timestamp, filled in at construction time when not supplied.
    accessed_at: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    # Free-form detail, e.g. an HTTP status or the error text of a failed request.
    notes: str | None = None
# Where the incident happened, as stated in the PDF. Every field is optional
# because the extraction agent is told not to infer missing values.
class IncidentLocation(BaseModel):
    address: str | None = None
    description: str | None = None
    latitude: float | None = None
    longitude: float | None = None
    # Section/township/range kept as free text; when these are passed to
    # search_nd_oil_gas_wells only the first run of digits in each is used.
    section: str | None = None
    township: str | None = None
    range: str | None = None
    county_tribe: str | None = None
# A person or organization named in the incident report (used for the
# reporting party and for the list of contacts).
class IncidentParty(BaseModel):
    # Free-form role label for how this party relates to the incident.
    role: str
    name: str | None = None
    company: str | None = None
    title: str | None = None
    address: str | None = None
    phone: str | None = None
# One released substance. The quantity is free text rather than a parsed
# number + unit pair.
class ReleasedMaterial(BaseModel):
    material: str
    quantity: str | None = None
    notes: str | None = None
# A lead the extraction agent suggests an investigator should pursue next.
# `kind` is a closed set of categories; `reason` explains why it is worth checking.
class ResearchTarget(BaseModel):
    kind: Literal["company", "carrier", "transporter", "operator", "well", "field", "location", "document", "other"]
    name: str
    reason: str
# Structured output of the extraction agent for one HazConnect incident PDF.
class ExtractedIncident(BaseModel):
    # Identity and summary of the incident.
    pdf_path: str
    incident_id: str
    headline: str
    description: str
    incident_date_time: str | None = None
    location: IncidentLocation
    released_materials: list[ReleasedMaterial] = Field(default_factory=list)
    # Oil/gas and facility details, only when the PDF states them.
    well_name: str | None = None
    ndic_file_number: str | None = None
    oil_gas_operator: str | None = None
    oil_gas_field_name: str | None = None
    facility_company_name: str | None = None
    facility_name: str | None = None
    # People and organizations named in the report.
    reporting_party: IncidentParty | None = None
    contacts: list[IncidentParty] = Field(default_factory=list)
    # Required classification flags.
    transportation_related: bool
    oil_gas_related: bool
    # Leads for the specialist agents, plus provenance for this extraction.
    research_targets: list[ResearchTarget] = Field(default_factory=list)
    source_ledger: list[SourceLedgerEntry] = Field(default_factory=list)
# One conclusion about an entity, returned by the specialist agents and by the
# research editor, with its supporting sources.
# NOTE: `kind` is narrower than ResearchTarget.kind (no "transporter",
# "operator", or "location").
class ResearchFinding(BaseModel):
    kind: Literal["company", "carrier", "well", "field", "document", "other"]
    name: str
    # Free text. The research editor is told to use definitely_involved /
    # possibly_involved / context_only for wells, but nothing here enforces it.
    relation: str | None = None
    summary: str
    source_ledger: list[SourceLedgerEntry] = Field(default_factory=list)
# The research editor's merged record for one incident. After the agent runs,
# fill_research_bookkeeping() merges finding ledgers into `source_ledger` and
# backfills `searches_performed` when the model left it empty.
class IncidentResearch(BaseModel):
    summary: str
    searches_performed: list[str] = Field(default_factory=list)
    findings: list[ResearchFinding] = Field(default_factory=list)
    unresolved_questions: list[str] = Field(default_factory=list)
    source_ledger: list[SourceLedgerEntry] = Field(default_factory=list)
# Bundles the three specialists' outputs; research_incident serializes it to
# JSON for the research editor's prompt.
class SpecialistFindings(BaseModel):
    company_and_carrier: list[ResearchFinding]
    well_and_field: list[ResearchFinding]
    outside_document: list[ResearchFinding]
# Final write-up produced by the synthesis agent; markdown_report() renders it.
class IncidentSynthesis(BaseModel):
    incident_id: str
    narrative: str
    # Each list becomes a bulleted section of the Markdown report.
    parties_involved: list[str] = Field(default_factory=list)
    what_happened: list[str] = Field(default_factory=list)
    related_wells_or_fields: list[str] = Field(default_factory=list)
    # Preferred over research.source_ledger in the report when non-empty.
    useful_sources: list[SourceLedgerEntry] = Field(default_factory=list)
    unresolved_leads: list[str] = Field(default_factory=list)
@dataclass
class IncidentRun:
    """Everything produced for one incident PDF by run_one_incident.

    write_outputs() serializes a list of these to OUTPUT_PATH.
    """

    pdf_path: Path
    incident: ExtractedIncident  # stage 1: extraction agent output
    research: IncidentResearch  # stage 2: specialists + research editor
    synthesis: IncidentSynthesis  # stage 3: synthesis agent output
def now_iso() -> str:
    """Return the current time in UTC as an ISO-8601 string (with a +00:00 offset)."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat()
def make_session() -> requests.Session:
    """Create a requests session with a browser-like User-Agent and English Accept-Language.

    Some of the public record sites queried here expect browser-style headers.
    """
    default_headers = {"User-Agent": USER_AGENT, "Accept-Language": "en-US,en;q=0.9"}
    http = requests.Session()
    http.headers.update(default_headers)
    return http
def compact_text(value: Any) -> str:
    """Return ``str(value)`` on one line with every whitespace run collapsed to a single space.

    Non-breaking spaces are treated as ordinary spaces, and leading/trailing
    whitespace is stripped. ``None`` becomes ``""``. Other falsy values such as
    ``0`` or ``False`` are kept as ``"0"`` / ``"False"`` instead of being
    silently erased.
    """
    text = "" if value is None else str(value)
    return re.sub(r"\s+", " ", text.replace("\xa0", " ")).strip()
def tool_json(payload: dict[str, Any]) -> str:
    """Serialize a tool result as 2-space-indented JSON, leaving non-ASCII text unescaped."""
    encoder = json.JSONEncoder(ensure_ascii=False, indent=2)
    return encoder.encode(payload)
def ledger(title: str, url: str, source_type: str, notes: str | None = None) -> dict[str, str | None]:
    """Build one source-ledger entry as a plain dict, stamped with the current UTC time.

    The keys match the fields of SourceLedgerEntry, so the dict can be embedded in
    tool JSON output and later validated into that model.
    """
    return dict(
        title=title,
        url=url,
        source_type=source_type,
        accessed_at=now_iso(),
        notes=notes,
    )
def unique_ledgers(entries: list[SourceLedgerEntry]) -> list[SourceLedgerEntry]:
    """Drop repeated ledger entries, keeping the first one for each (title, url, source_type).

    Order of first appearance is preserved. Later duplicates are discarded even if
    their ``notes`` or ``accessed_at`` differ.
    """
    first_by_key: dict[tuple[str, str, str], SourceLedgerEntry] = {}
    for candidate in entries:
        first_by_key.setdefault((candidate.title, candidate.url, candidate.source_type), candidate)
    return list(first_by_key.values())
def fill_research_bookkeeping(research: IncidentResearch) -> IncidentResearch:
    """Merge finding-level ledgers into the research record and backfill searches.

    The record's own ledger plus every finding's ledger are combined and
    de-duplicated. If the model left ``searches_performed`` empty, it is rebuilt
    from the titles of search/fetch-type ledger entries. Either way, the list is
    de-duplicated with order preserved. *research* is mutated in place and returned.
    """
    search_like_types = {
        "state_business_records",
        "state_oil_gas_records",
        "federal_transportation_records",
        "web_search",
        "web_page",
        "remote_pdf",
    }

    combined = list(research.source_ledger)
    for finding in research.findings:
        combined.extend(finding.source_ledger)
    merged_ledger = unique_ledgers(combined)

    searches = research.searches_performed
    if not searches:
        searches = [item.title for item in merged_ledger if item.source_type in search_like_types]

    research.source_ledger = merged_ledger
    research.searches_performed = list(dict.fromkeys(searches))
    return research
def read_incident_pdf_text(pdf_path: str) -> str:
    """Read one local North Dakota HazConnect incident PDF."""
    # Tool for the extraction agent. Returns JSON containing the path, page count,
    # full text (pages joined by blank lines) and a "local_pdf" source ledger.
    # Raises FileNotFoundError when the path does not exist.
    path = Path(pdf_path)
    if not path.exists():
        raise FileNotFoundError(f"Incident PDF does not exist: {pdf_path}")
    # Use the document as a context manager so the file handle is released even
    # if text extraction fails.
    with pymupdf.open(path) as doc:
        page_count = doc.page_count
        text = "\n\n".join(page.get_text() for page in doc)
    return tool_json(
        {
            "pdf_path": str(path),
            "page_count": page_count,
            "text": text,
            "source_ledger": [
                ledger(
                    f"Local HazConnect incident PDF {path.name}",
                    str(path),
                    "local_pdf",
                    "Read with PyMuPDF from the local data/incidents folder.",
                )
            ],
        }
    )
def parse_dmr_well_table(html: str) -> list[dict[str, str]]:
    """Turn the DMR "Well Log search results" table into one dict per data row.

    Each row dict is keyed by the table's ``<th>`` header texts. ``<tr>`` rows with
    no ``<td>`` cells (such as the header row), or with a cell count different from
    the header count, are skipped. Returns ``[]`` when the results table is absent.
    """
    results_table = BeautifulSoup(html, "html.parser").find(
        "table", attrs={"summary": re.compile("Well Log search results", re.I)}
    )
    if results_table is None:
        return []

    def cell_texts(cells) -> list[str]:
        # Whitespace-normalized visible text of each cell.
        return [compact_text(cell.get_text(" ", strip=True)) for cell in cells]

    column_names = cell_texts(results_table.find_all("th"))
    parsed_rows = []
    for row in results_table.find_all("tr"):
        values = cell_texts(row.find_all("td"))
        if not values or len(values) != len(column_names):
            continue
        parsed_rows.append(dict(zip(column_names, values, strict=True)))
    return parsed_rows
def search_nd_oil_gas_wells(
    operator: str = "",
    field: str = "",
    section: str = "",
    township: str = "",
    range_number: str = "",
    limit: int = 25,
) -> str:
    """Search North Dakota DMR well records. Use numeric township/range values, e.g. 152 and 98."""
    # Tool for the well/field specialist. Posts the DMR well-search form and returns
    # JSON with the submitted query, total row count, up to `limit` rows and a
    # "state_oil_gas_records" ledger. Network/HTTP failures are returned as an
    # empty result with the error in the ledger notes (like search_fmcsa_carriers)
    # instead of raising and aborting the whole agent run.

    def dmr_number(value: str | int | None) -> str:
        # Keep only the first run of digits (e.g. "152N" -> "152"); "0" when there
        # are none. NOTE(review): "0" presumably means "no filter" on the form.
        match = re.search(r"\d+", str(value or ""))
        return match.group(0) if match else "0"

    payload = {
        "VTI-GROUP": "0",
        # A single space is sent for blank operator/field values.
        "ddmOperator": operator or " ",
        "ddmField": field or " ",
        "ddmSection": dmr_number(section),
        "ddmTownship": dmr_number(township),
        "ddmRange": dmr_number(range_number),
        "B1": "Submit",
    }
    try:
        with make_session() as session:
            response = session.post(
                DMR_WELL_SEARCH_URL,
                data=payload,
                headers={"Origin": "https://www.dmr.nd.gov", "Referer": DMR_WELL_SEARCH_URL},
                timeout=30,
            )
            response.raise_for_status()
    except requests.RequestException as error:
        return tool_json(
            {
                "query": payload,
                "row_count": 0,
                "rows": [],
                "truncated": False,
                "source_ledger": [
                    ledger(
                        "ND DMR oil/gas well search failed",
                        DMR_WELL_SEARCH_URL,
                        "state_oil_gas_records",
                        f"{type(error).__name__}: {error}",
                    )
                ],
            }
        )

    rows = parse_dmr_well_table(response.text)
    # Guard against a negative limit, which would otherwise slice from the end.
    limit = max(limit, 0)
    return tool_json(
        {
            "query": payload,
            "row_count": len(rows),
            "rows": rows[:limit],
            "truncated": len(rows) > limit,
            "source_ledger": [
                ledger("ND DMR oil/gas well search results", DMR_WELL_SEARCH_URL, "state_oil_gas_records")
            ],
        }
    )
def search_nd_sos_businesses(search_value: str, active_only: bool = False, limit: int = 5) -> str:
    """Search North Dakota Secretary of State FirstStop business records."""
    # Tool for the company/carrier specialist. Returns JSON with up to `limit`
    # records ordered by the API's SORT_INDEX plus a "state_business_records"
    # ledger. Network/HTTP/JSON-decoding failures come back as an empty result with
    # the error in the ledger notes instead of raising and aborting the agent run.
    search_page_url = f"{ND_SOS_BASE_URL}/search/business"
    try:
        with make_session() as session:
            # Visit the public search page first (presumably so the session holds
            # any cookies the JSON API expects).
            session.get(search_page_url, timeout=30)
            response = session.post(
                ND_SOS_BUSINESS_SEARCH_URL,
                json={"SEARCH_VALUE": search_value, "STARTS_WITH_YN": False, "ACTIVE_ONLY_YN": active_only},
                headers={
                    "authorization": "undefined",
                    "content-type": "application/json",
                    "origin": ND_SOS_BASE_URL,
                    "referer": search_page_url,
                },
                timeout=30,
            )
            response.raise_for_status()
            # "rows" maps record ids to records; treat a missing/null value as no matches.
            rows = response.json().get("rows") or {}
    except requests.RequestException as error:
        return tool_json(
            {
                "search_value": search_value,
                "record_count": 0,
                "records": [],
                "source_ledger": [
                    ledger(
                        "North Dakota FirstStop business search failed",
                        ND_SOS_BUSINESS_SEARCH_URL,
                        "state_business_records",
                        f"{type(error).__name__}: {error}",
                    )
                ],
            }
        )

    records = sorted(rows.values(), key=lambda row: row["SORT_INDEX"])[: max(limit, 0)]
    return tool_json(
        {
            "search_value": search_value,
            "record_count": len(records),
            "records": records,
            "source_ledger": [
                ledger("North Dakota FirstStop business search", ND_SOS_BUSINESS_SEARCH_URL, "state_business_records")
            ],
        }
    )
def search_fmcsa_carriers(company_name: str, limit: int = 5) -> str:
    """Search FMCSA SAFER carrier records by company name."""
    # Tool for the company/carrier specialist. Returns JSON with up to `limit`
    # carrier-snapshot links found on the SAFER keyword results page, plus a
    # "federal_transportation_records" ledger. Request failures are returned as
    # an empty result with the error in the ledger notes.
    # The name is wrapped in %2A ("*") on both sides, presumably a wildcard match.
    search_url = f"{FMCSA_BASE_URL}keywordx.asp?searchstring=%2A{quote_plus(company_name)}%2A&SEARCHTYPE="
    try:
        # Close the session (and its connection pool) once the page is downloaded.
        with make_session() as session:
            response = session.get(search_url, timeout=30)
        response.raise_for_status()
    except requests.RequestException as error:
        return tool_json(
            {
                "company_name": company_name,
                "match_count": 0,
                "matches": [],
                "source_ledger": [
                    ledger(
                        "FMCSA SAFER carrier search failed",
                        search_url,
                        "federal_transportation_records",
                        f"{type(error).__name__}: {error}",
                    )
                ],
            }
        )

    soup = BeautifulSoup(response.text, "html.parser")
    # Carrier result links point at queryCarrierSnapshot pages.
    matches = [
        {
            "match_name": compact_text(anchor.get_text(" ", strip=True)),
            "snapshot_url": urljoin(FMCSA_BASE_URL, anchor["href"]),
        }
        for anchor in soup.find_all("a", href=True)
        if "queryCarrierSnapshot" in anchor["href"]
    ][: max(limit, 0)]
    return tool_json(
        {
            "company_name": company_name,
            "match_count": len(matches),
            "matches": matches,
            "source_ledger": [ledger("FMCSA SAFER carrier search", search_url, "federal_transportation_records")],
        }
    )
def tavily_search(query: str, max_results: int = 5) -> str:
    """Search the web with Tavily."""
    # Tool for the outside-document specialist. Returns JSON with Tavily's result
    # list and a "web_search" ledger. A missing TAVILY_API_KEY or a failed request
    # is reported as an empty result with the reason in the ledger notes, matching
    # the other tools, instead of raising and aborting the agent run.

    def failed(reason: str) -> str:
        return tool_json(
            {
                "query": query,
                "results": [],
                "source_ledger": [
                    ledger(f"Tavily search failed: {query}", TAVILY_SEARCH_URL, "web_search", reason)
                ],
            }
        )

    api_key = os.environ.get("TAVILY_API_KEY")
    if not api_key:
        return failed("TAVILY_API_KEY is not set.")
    try:
        response = requests.post(
            TAVILY_SEARCH_URL,
            headers={"Authorization": f"Bearer {api_key}"},
            json={
                "query": query,
                # Clamp the requested result count to the range 1..8.
                "max_results": max(1, min(max_results, 8)),
                "search_depth": "basic",
                "include_answer": False,
                "include_raw_content": False,
            },
            timeout=30,
        )
        response.raise_for_status()
        results = response.json().get("results", [])
    except requests.RequestException as error:
        return failed(f"{type(error).__name__}: {error}")
    return tool_json(
        {
            "query": query,
            "results": results,
            "source_ledger": [ledger(f"Tavily search: {query}", TAVILY_SEARCH_URL, "web_search")],
        }
    )
def assert_safe_public_url(url: str) -> str:
    """Validate that *url* is an http(s) URL whose host resolves only to public addresses.

    Returns:
        The URL as re-assembled by ``urlparse(url).geturl()``.

    Raises:
        ValueError: for non-http(s) schemes, missing hosts, obviously local host
            names, hosts that cannot be resolved, or any resolved address that is
            not globally routable (private, loopback, link-local, multicast,
            reserved, unspecified, shared/CGNAT, ...).

    NOTE(review): the check runs before the request is made, so DNS that changes
    between this check and the fetch (rebinding) is not prevented here.
    """
    parsed = urlparse(url)
    if parsed.scheme not in {"http", "https"} or not parsed.hostname:
        raise ValueError("Only public http(s) URLs can be fetched.")
    hostname = parsed.hostname.lower()
    if hostname in {"localhost", "127.0.0.1", "::1"} or hostname.endswith(".local"):
        raise ValueError("Local URLs are not allowed.")
    try:
        address_infos = socket.getaddrinfo(hostname, None)
    except socket.gaierror as error:
        raise ValueError(f"Could not resolve host {hostname!r}.") from error
    for info in address_infos:
        ip = ipaddress.ip_address(info[4][0])
        # `is_global` rejects every non-public range (including ones the explicit
        # flags miss, such as 100.64.0.0/10); the explicit flags are kept because
        # some multicast/reserved addresses can still report is_global=True.
        if (
            not ip.is_global
            or ip.is_private
            or ip.is_loopback
            or ip.is_link_local
            or ip.is_multicast
            or ip.is_reserved
            or ip.is_unspecified
        ):
            raise ValueError(f"Refusing to fetch non-public address {ip}.")
    return parsed.geturl()
def safe_fetch_url(url: str, max_chars: int = 12000) -> str:
    """Fetch a public web page or PDF and return extracted text."""
    # Tool for the outside-document specialist. Returns JSON with the final URL,
    # content type, text truncated to `max_chars`, a truncation flag, and a
    # "web_page"/"remote_pdf" ledger. Any failure is returned as an empty-text
    # "Fetch failed" payload rather than raised.
    #
    # Redirects are followed manually so that every hop is re-validated with
    # assert_safe_public_url; letting requests follow them automatically would
    # allow a public URL to bounce the fetch to an internal address.
    max_redirects = 5
    try:
        current_url = assert_safe_public_url(url)
        with make_session() as session:
            for _ in range(max_redirects + 1):
                response = session.get(current_url, timeout=35, allow_redirects=False)
                if not response.is_redirect:
                    break
                current_url = assert_safe_public_url(urljoin(current_url, response.headers["location"]))
            else:
                raise ValueError(f"Too many redirects (more than {max_redirects}).")
        response.raise_for_status()
        content = response.content
        content_type = response.headers.get("content-type", "")
        # Treat it as a PDF if the header, the magic bytes, or the URL path says so.
        is_pdf = (
            "pdf" in content_type.lower()
            or content[:5] == b"%PDF-"
            or urlparse(response.url).path.lower().endswith(".pdf")
        )
        if is_pdf:
            with pymupdf.open(stream=content, filetype="pdf") as doc:
                text = "\n\n".join(page.get_text() for page in doc)
            source_type = "remote_pdf"
        else:
            html = content.decode(response.encoding or "utf-8", errors="replace")
            # Remove script/style elements entirely so their JS/CSS source never
            # ends up in the markdown text.
            soup = BeautifulSoup(html, "html.parser")
            for element in soup(["script", "style"]):
                element.decompose()
            text = md(str(soup), strip=["script", "style", "img"]).strip()
            source_type = "web_page"
        return tool_json(
            {
                "url": response.url,
                "content_type": content_type,
                "text": text[:max_chars],
                "truncated": len(text) > max_chars,
                "source_ledger": [ledger("Fetched URL", response.url, source_type, f"HTTP {response.status_code}")],
            }
        )
    except Exception as error:  # noqa: BLE001 - bad search results should become evidence, not crashes.
        return tool_json(
            {
                "url": url,
                "text": "",
                "truncated": False,
                "source_ledger": [ledger("Fetch failed", url, "web_page", f"{type(error).__name__}: {error}")],
            }
        )
# Stage 1: reads one incident PDF through the read_incident_pdf_text tool and
# returns a validated ExtractedIncident (including research_targets for the
# specialists). tool_retries=2 presumably bounds how often a failing tool call
# may be retried — confirm against the pydantic-ai docs.
extraction_agent = Agent(
    MODEL,
    output_type=ExtractedIncident,
    tool_retries=2,
    tools=[read_incident_pdf_text],
    instructions="""
You extract structured details from one North Dakota HazConnect incident PDF.
Read the PDF path provided by the user, then fill the schema literally.
Do not infer missing names or numbers. Capture companies, transporters,
operators, wells, fields, facilities, addresses, location clues, and materials
when the PDF states them. Create research_targets for leads an investigator
should pursue next, and preserve the local PDF source ledger.
""",
)
# Stage 2 specialist: checks company/operator names against ND Secretary of
# State records and likely carriers against FMCSA SAFER. Returns a (possibly
# empty) list of ResearchFinding.
company_agent = Agent(
    MODEL,
    output_type=list[ResearchFinding],
    tool_retries=2,
    tools=[search_nd_sos_businesses, search_fmcsa_carriers],
    instructions="""
You are the company and carrier research specialist.
Use the incident details and research targets to decide which names are worth
checking. Search North Dakota business records for companies/operators, and
FMCSA records for likely transporters, haulers, or carriers. Return [] if this
specialty has no useful lead. Preserve source ledger entries from tool results.
""",
)
# Stage 2 specialist: queries the ND DMR well search by operator, field, or
# section/township/range. The relation vocabulary in its instructions matches
# the research editor's well relation rules.
well_agent = Agent(
    MODEL,
    output_type=list[ResearchFinding],
    tool_retries=2,
    tools=[search_nd_oil_gas_wells],
    instructions="""
You are the oil/gas well and field research specialist.
Use explicit well, operator, field, section, township, and range details from
the incident. If the incident gives section/township/range, a location search is
usually the best first move. Return [] if there is no meaningful oil/gas or
location lead.
Do not treat a transportation or service company as a DMR operator unless the
incident says it is one. A location-only well match is possibly_involved or
context_only, not definitely_involved. Preserve source ledger entries.
""",
)
# Stage 2 specialist: web search (Tavily) plus guarded fetching of pages/PDFs.
# The "at most two searches and two fetches" budget is only stated in the
# prompt; nothing in code enforces it.
document_agent = Agent(
    MODEL,
    output_type=list[ResearchFinding],
    tool_retries=2,
    tools=[tavily_search, safe_fetch_url],
    instructions="""
You are the outside-document research specialist.
Build your own web queries from the incident. Look for public pages or PDFs that
explain ownership, operators, fields, facilities, permits, stops, providers, or
prior incidents. Keep this bounded: at most two web searches and two fetches.
Tavily search results are leads, not evidence: fetch a promising result before
turning it into a finding. Return document findings only, or [] if no outside
document adds useful context. Preserve source ledgers.
""",
)
# Stage 2 editor: has no tools. It merges the incident plus the three
# specialists' findings into one IncidentResearch, applying the well
# relation vocabulary (definitely_involved / possibly_involved / context_only).
research_editor_agent = Agent(
    MODEL,
    output_type=IncidentResearch,
    instructions="""
You are the research editor for one North Dakota spill report.
Specialist agents have already searched company/carrier records, oil/gas well
records, and outside public documents. Combine their outputs into one concise
research record. Use only the supplied incident and specialist findings. Do not
invent facts. Preserve source ledger entries from the specialist findings.
Keep useful outside-document findings visible as kind="document"; do not fold
them into company or well findings.
Well relation rules:
- definitely_involved: the PDF or a public record names the specific well,
operator, file number, or facility involved.
- possibly_involved: a well appears in the same section/township/range or field
but the incident does not name that well directly.
- context_only: nearby or background records useful for understanding the area.
""",
)
# Stage 3: has no tools. It turns the extracted incident plus the research
# record into the IncidentSynthesis that markdown_report() renders.
synthesis_agent = Agent(
    MODEL,
    output_type=IncidentSynthesis,
    instructions="""
Synthesize one North Dakota spill investigation using only the extracted
incident and research findings. Write a compact, specific narrative explaining
what happened, who appears involved, which public records were found, what
wells/fields may be related, and what remains unresolved. Preserve useful
source ledger entries.
""",
)
async def extract_incident(pdf_path: Path) -> ExtractedIncident:
    """Ask the extraction agent to read one incident PDF and return its structured output."""
    request = f"Extract the incident report at {pdf_path}."
    return (await extraction_agent.run(request)).output
async def research_incident(incident: ExtractedIncident) -> IncidentResearch:
    """Run the three specialist agents on one incident, then merge their findings.

    The company/carrier, well/field, and outside-document specialists receive the
    same prompt and do not depend on each other's output, so they run
    concurrently with ``asyncio.gather``. The research editor then combines their
    findings into one IncidentResearch, and ``fill_research_bookkeeping`` merges
    and de-duplicates the source ledgers.
    """
    # Serialize once; both prompts embed the same incident JSON.
    incident_json = incident.model_dump_json(indent=2)
    specialist_prompt = dedent(
        """
        Research your specialty for this spill report.
        Incident:
        {incident}
        """
    ).format(incident=incident_json)

    print(" company/carrier, well/field, and outside-document specialists (in parallel)...")
    company_response, well_response, document_response = await asyncio.gather(
        company_agent.run(specialist_prompt),
        well_agent.run(specialist_prompt),
        document_agent.run(specialist_prompt),
    )
    specialist_findings = SpecialistFindings(
        company_and_carrier=company_response.output,
        well_and_field=well_response.output,
        outside_document=document_response.output,
    )

    editor_prompt = dedent(
        """
        Combine this extracted incident and the specialist findings into one
        structured research record.
        Incident:
        {incident}
        Specialist findings:
        {findings}
        """
    ).format(
        incident=incident_json,
        findings=specialist_findings.model_dump_json(indent=2),
    )
    response = await research_editor_agent.run(editor_prompt)
    return fill_research_bookkeeping(response.output)
async def synthesize_incident(incident: ExtractedIncident, research: IncidentResearch) -> IncidentSynthesis:
    """Ask the synthesis agent to write the final write-up for one incident.

    The extracted incident and the research record are both embedded in the
    prompt as 2-space-indented JSON.
    """
    template = dedent(
        """
        Extracted incident:
        {incident}
        Research findings:
        {research}
        """
    )
    message = template.format(
        incident=incident.model_dump_json(indent=2),
        research=research.model_dump_json(indent=2),
    )
    result = await synthesis_agent.run(message)
    return result.output
def markdown_bullets(items: list[str]) -> str:
    """Render *items* as a Markdown bullet list, or a single placeholder bullet if empty."""
    if not items:
        return "- None noted."
    return "\n".join(map("- {}".format, items))
def markdown_sources(sources: list[SourceLedgerEntry]) -> str:
    """Render ledger entries as Markdown link bullets: ``- [title](url) (type)[ - notes]``.

    Returns a placeholder bullet when *sources* is empty.
    """
    if not sources:
        return "- None recorded."
    lines = []
    for source in sources:
        line = f"- [{source.title}]({source.url}) ({source.source_type})"
        if source.notes:
            line += f" - {source.notes}"
        lines.append(line)
    return "\n".join(lines)
def markdown_report(
    incident: ExtractedIncident,
    research: IncidentResearch,
    synthesis: IncidentSynthesis,
) -> str:
    """Render one incident's extraction, research, and synthesis as a Markdown section.

    Sections appear in this order: heading, narrative, parties involved, what
    happened, related wells/fields, research findings, unresolved leads, sources.
    Sources are ``synthesis.useful_sources`` when non-empty, otherwise
    ``research.source_ledger``, de-duplicated either way. Each section is stripped
    and sections are separated by blank lines.
    """
    finding_lines = []
    for finding in research.findings:
        relation = f" ({finding.relation})" if finding.relation else ""
        finding_lines.append(f"- **{finding.kind}: {finding.name}**{relation} - {finding.summary}")
    findings_text = "\n".join(finding_lines) or "- None recorded."

    chosen_sources = unique_ledgers(synthesis.useful_sources or research.source_ledger)

    parts = (
        f"## Incident {incident.incident_id}: {incident.headline}",
        synthesis.narrative,
        f"### Parties Involved\n{markdown_bullets(synthesis.parties_involved)}",
        f"### What Happened\n{markdown_bullets(synthesis.what_happened)}",
        f"### Related Wells or Fields\n{markdown_bullets(synthesis.related_wells_or_fields)}",
        f"### Research Findings\n{findings_text}",
        f"### Unresolved Leads\n{markdown_bullets(synthesis.unresolved_leads)}",
        f"### Sources\n{markdown_sources(chosen_sources)}",
    )
    return "\n\n".join(part.strip() for part in parts)
def write_outputs(runs: list[IncidentRun]) -> None:
    """Write all incident runs to OUTPUT_PATH as one pretty-printed UTF-8 JSON file.

    The payload records the generation time and model name. Each run contributes
    its PDF path and its incident, research, and synthesis models, dumped in
    pydantic's JSON mode so every value is JSON-serializable. Missing parent
    directories of OUTPUT_PATH are created, and an existing file is overwritten.
    """
    OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    payload = {
        "generated_at": now_iso(),
        "model": MODEL,
        "incidents": [
            {
                "pdf_path": str(run.pdf_path),
                "incident": run.incident.model_dump(mode="json"),
                "research": run.research.model_dump(mode="json"),
                "synthesis": run.synthesis.model_dump(mode="json"),
            }
            for run in runs
        ],
    }
    OUTPUT_PATH.write_text(json.dumps(payload, indent=2, ensure_ascii=False), encoding="utf-8")
    print(f"\nWrote {OUTPUT_PATH}")
@braintrust.traced
async def run_one_incident(pdf_path: Path) -> IncidentRun:
    """Extract, research, and synthesize one incident PDF, printing progress and the final report.

    Traced with Braintrust as one span per incident. Returns an IncidentRun
    bundling the PDF path with the outputs of all three stages.
    """
    print(f"\n=== {pdf_path} ===")

    print("1. Extracting incident details...")
    incident = await extract_incident(pdf_path)
    print(f" Incident {incident.incident_id}: {incident.headline}")
    targets = incident.research_targets
    if targets:
        target_lines = [f" - {t.kind}: {t.name} ({t.reason})" for t in targets]
        print("\n".join([" Research targets:", *target_lines]))

    print("2. Researching records and web sources...")
    research = await research_incident(incident)
    print(f" {research.summary}")

    print("3. Synthesizing...")
    synthesis = await synthesize_incident(incident, research)
    report = markdown_report(incident, research, synthesis)
    print("\n" + report)

    return IncidentRun(pdf_path=pdf_path, incident=incident, research=research, synthesis=synthesis)
runs = []
for pdf_path in INCIDENT_PDF_PATHS[:3]:
runs.append(await run_one_incident(pdf_path))
write_outputs(runs)