Run in Colab Download notebook
In [ ]:
# Install required packages
!pip install --upgrade --quiet 'pydantic-ai-slim[google]'
!pip install --upgrade --quiet playwright
!pip install --upgrade --quiet beautifulsoup4
!pip install --upgrade --quiet pandas
!pip install --upgrade --quiet lxml
!pip install --upgrade --quiet html5lib
!pip install --upgrade --quiet panel
!pip install --upgrade --quiet jupyter_bokeh

print('✓ Packages installed!')

Scraper-Writer Agent

A conversational AI agent that explores websites and writes Playwright scraping scripts for you.

How it works: You describe what you want to scrape in plain English. The agent navigates the page using Playwright, examines the DOM structure, and produces a complete scraping script that runs without AI in the loop.

What you need:

  • A Google account
  • A free GOOGLE_API_KEY from Google AI Studio
  • Set it as an environment variable, or paste it in the cell below
In [1]:
#%pip install --quiet "pydantic-ai-slim[google]" playwright beautifulsoup4 pandas lxml html5lib panel jupyter_bokeh
# !playwright install-deps
# !playwright install chromium firefox
In [2]:
import os
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass
# Detect if we're running in Google Colab
IN_COLAB = 'COLAB_GPU' in os.environ or 'COLAB_RELEASE_TAG' in os.environ

import re
from dataclasses import dataclass

from bs4 import BeautifulSoup, Comment, NavigableString
from playwright.async_api import async_playwright
from pydantic_ai import Agent, RunContext

# Your Google AI API key — set as env var or paste here
if not os.environ.get("GOOGLE_API_KEY"):
    raise ValueError("Set GOOGLE_API_KEY env var first — get one free at https://aistudio.google.com/apikey")
In [3]:
# Colab can't open a visible browser, so we run headless there
if IN_COLAB:
    use_headless = True
else:
    use_headless = False

playwright = await async_playwright().start()
browser = await playwright.firefox.launch(headless=use_headless)
page = await browser.new_page()
await page.add_init_script("""
    Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
""")
In [4]:
MAX_CAPTURED = 100
captured_requests = []

async def log_api_response(response):
    """Background listener that captures XHR/fetch JSON responses."""
    request = response.request
    if request.resource_type not in ["xhr", "fetch"]:
        return
    content_type = response.headers.get("content-type", "")
    if "json" not in content_type:
        return
    try:
        body = await response.json()
        record_count = "unknown"
        if isinstance(body, list):
            record_count = len(body)
        elif isinstance(body, dict):
            for key in ["results", "data", "items", "records", "rows", "hits"]:
                if key in body and isinstance(body[key], list):
                    record_count = len(body[key])
                    break
        captured_requests.append({
            "url": request.url,
            "method": request.method,
            "status": response.status,
            "record_count": record_count,
            "post_data": (request.post_data or "")[:500],
        })
        if len(captured_requests) > MAX_CAPTURED:
            captured_requests.pop(0)
        print(f"  API: [{request.method}] {response.status} | ~{record_count} records | {request.url[:120]}")
    except Exception:
        pass

page.on("response", log_api_response)
print("Network capture active — API requests will be logged automatically.")
Network capture active — API requests will be logged automatically.
In [5]:
JUNK_TAGS = {"script", "style", "svg", "noscript", "iframe", "path", "defs", "clippath"}
COLLAPSE_TAGS = {"nav", "footer", "header", "aside"}
KEY_ATTRS = ["name", "type", "role", "data-testid", "aria-label", "for", "action"]
# data-* attrs to skip — framework noise that doesn't help build selectors
_JUNK_DATA_ATTRS = {"data-testid", "data-reactid", "data-reactroot", "data-react-helmet",
                     "data-emotion-css", "data-styled", "data-rbd-draggable-id",
                     "data-radix-collection-item"}


def _is_generated_class(cls):
    """Detect auto-generated CSS class names (styled-components, emotion, CSS modules, JSS)."""
    if re.match(r'^sc-[a-f0-9]+-\d+$', cls):
        return True
    if re.match(r'^css-[a-z0-9]+$', cls):
        return True
    if re.search(r'__[a-zA-Z0-9]{5,}$', cls):
        return True
    if re.match(r'^jss\d+$', cls):
        return True
    return False


def dom_to_tree(html, selector=None, max_depth=12, max_text=80):
    """Convert HTML to a compact tree outline for LLM consumption.

    Shows tag#id.class[attr=val] notation with sibling deduplication,
    collapsed nav/footer/header/aside, and junk tag removal.
    """
    soup = BeautifulSoup(html, "html.parser")

    if selector:
        root = soup.select_one(selector)
        if not root:
            return f"No element found for selector: {selector}"
    else:
        root = soup.find("body") or soup

    # Strip junk
    for tag in root.find_all(JUNK_TAGS):
        tag.decompose()
    for comment in root.find_all(string=lambda t: isinstance(t, Comment)):
        comment.extract()

    lines = []

    def sig(tag):
        """tag#id.class[attr=val] signature."""
        s = tag.name
        if tag.get("id"):
            s += f"#{tag['id']}"
        classes = [c for c in tag.get("class", []) if not _is_generated_class(c)]
        if classes:
            s += "." + ".".join(classes[:3])
        for attr in KEY_ATTRS:
            val = tag.get(attr)
            if val:
                s += f"[{attr}={str(val)[:40]}]"
        if tag.name == "a" and tag.get("href"):
            href = tag["href"]
            if not href.startswith("javascript:"):
                s += f"[href={href[:60]}]"
        # Show one meaningful data-* attr (skip framework junk and hex hashes)
        for attr in tag.attrs if hasattr(tag, 'attrs') else []:
            if attr.startswith("data-") and attr not in _JUNK_DATA_ATTRS:
                val = str(tag[attr])
                if val and not re.match(r'^[a-f0-9]{6,}$', val):
                    s += f"[{attr}={val[:30]}]"
                    break
        return s

    def structure(tag):
        """Fingerprint for sibling deduplication: signature + child tag names."""
        child_tags = tuple(c.name for c in tag.children if hasattr(c, 'name') and c.name)
        return (sig(tag), child_tags)

    def direct_text(node):
        """First direct text content of a node (not from child elements)."""
        for child in node.children:
            if isinstance(child, NavigableString) and not isinstance(child, Comment):
                text = child.strip()
                if text:
                    return text[:max_text] + ("..." if len(text) > max_text else "")
        return ""

    def render(node, depth=0):
        if not hasattr(node, 'name') or node.name is None:
            return

        if depth > max_depth:
            child_count = sum(1 for c in node.children if hasattr(c, 'name') and c.name)
            if child_count:
                lines.append("  " * depth + f"[{child_count} children, truncated]")
            return

        # Collapse nav, footer, header, aside
        if node.name in COLLAPSE_TAGS:
            lines.append("  " * depth + f"{sig(node)}: [collapsed]")
            return

        node_sig = sig(node)
        text = direct_text(node)
        if text:
            lines.append("  " * depth + f'{node_sig}: "{text}"')
        else:
            lines.append("  " * depth + node_sig)

        # Process children with sibling deduplication
        children = [c for c in node.children if hasattr(c, 'name') and c.name]
        i = 0
        while i < len(children):
            child = children[i]
            fingerprint = structure(child)

            # Count consecutive siblings with same structure
            run = 1
            while i + run < len(children) and structure(children[i + run]) == fingerprint:
                run += 1

            if run >= 3:
                # Show first example, then summary
                render(child, depth + 1)
                lines.append("  " * (depth + 1) + f"... and {run - 1} more {child.name} with same structure")
                i += run
            else:
                render(child, depth + 1)
                i += 1

    render(root)
    return "\n".join(lines)


print("DOM tree helper ready.")
DOM tree helper ready.
In [ ]:
@dataclass
class Deps:
    page: object

MODEL_NAME = "google-gla:gemini-2.5-flash"
agent = Agent(
    MODEL_NAME,
    deps_type=Deps,
    retries=3,
    system_prompt=(
        "You are a scraper-writing assistant for journalists. You explore websites "
        "using your browser tools and write Playwright + BeautifulSoup scraping scripts.\n"
        "\n"
        "## Your workflow\n"
        "1. Navigate to the target URL and read the page structure\n"
        "2. Use scan_page() on long pages to find tables and data sections that might be below the fold\n"
        "3. Use list_elements() to see forms, dropdowns, and buttons\n"
        "4. Interact with forms if needed (fill inputs, select dropdowns, click buttons)\n"
        "5. Check get_network_requests() to see if the site uses a hidden JSON API\n"
        "6. Write a complete, runnable Playwright script\n"
        "\n"
        "## Code style rules (MANDATORY)\n"
        "- Use flat async/await Playwright — NO asyncio, NO sync_playwright\n"
        "    - NO: playwright = sync_playwright().start()\n"
        "    - YES: playwright = await async_playwright().start()\n"
        "- Do not put main code in function, it will be run in a cell and we need the outputs\n"
        "    - NO: async def scrape_page(): ...; await main(), `df` in function\n"
        "    - YES: all code at top level, `df` available at the end of the cell\n"
        "- Code must be runnable in Jupyter notebook cells (top-level await)\n"
        "- Use firefox as your browser of choice\n"
        "- Use `await page.content()` + BeautifulSoup for extraction\n"
        "- Build DataFrames from a list of dicts\n"
        "- Extract each field defensively with try/except per field\n"
        "- Use meaningful CSS selectors — NEVER use generated classes like `.sc-ae8b6d27-3` or `.iUtzsJ`\n"
        "- Prefer: `.result`, `.card`, `div:has(h2)`, `tr:has(td)`, `[data-testid=...]`\n"
        "- All timeouts in your generated code must be >= 10 seconds (10000ms)\n"
        "- Avoid page.wait_for_timeout() — use page.wait_for_selector() instead\n"
        "- Don't close the browser — the user handles that\n"
        "- Show results with df.head()\n"
        "- Use Firefox (`playwright.firefox.launch`) when downloading PDFs\n"
        "\n"
        "## Important behaviors\n"
        "- If you discover a hidden JSON API via network requests, TELL THE USER immediately "
        "and offer a simpler requests-based approach instead\n"
        "- Always test one page of results before writing a pagination loop\n"
        "- When writing the final script, include ALL setup code (imports, browser launch) "
        "so it runs in a fresh notebook cell\n"
        "- Be concise — journalists are busy\n"
        "- If results use pagination, check read_page('nav') or scan_page() to find pagination controls\n"
        "\n"
        "## When you get stuck\n"
        "- ASK THE USER a clarifying question instead of giving up\n"
        "- If a button appears disabled, WAIT — pages often enable buttons after JS runs\n"
        "- If a submit button won't click or stays disabled after using fill_input(), try "
        "type_input() instead — some sites need real keypress events to enable their buttons.\n"
        "Remember that you'll have to hit backspace if there is existing content in the form.\n"
        "- Try at least 2-3 approaches before asking for help\n"
        "- If you successfully performed an action (clicked, filled a form), read the page again to see what changed\n"
    ),
)


# --- Tools ---
    
@agent.tool
async def navigate(ctx: RunContext[Deps], url: str) -> str:
    """Navigate to a URL. Returns the page title."""
    print(f"  -> navigate({url})")
    try:
        await ctx.deps.page.goto(url, timeout=30000, wait_until="domcontentloaded")
        try:
            await ctx.deps.page.wait_for_load_state("networkidle", timeout=5000)
        except Exception:
            pass
        title = await ctx.deps.page.title()
        return f"Navigated to: {title} ({url})"
    except Exception as e:
        return f"Error navigating to {url}: {str(e)[:200]}"


@agent.tool
async def read_page(ctx: RunContext[Deps], selector: str = "") -> str:
    """Get a compact tree outline of the page DOM showing tag names, classes, IDs, and attributes.
    Pass a CSS selector to zoom into a specific section for more detail."""
    print(f"  -> read_page({selector!r})")
    try:
        html = await ctx.deps.page.content()
        tree = dom_to_tree(html, selector=selector if selector else None)
        if not tree.strip():
            return "Page appears empty or could not be parsed."
        if len(tree) > 15000:
            tree = tree[:15000] + "\n... [truncated — use a more specific selector to zoom in, or try scan_page() to find data sections]"
        return tree
    except Exception as e:
        return f"Error reading page: {str(e)[:200]}"


@agent.tool
async def scan_page(ctx: RunContext[Deps]) -> str:
    """Quick inventory of data-bearing elements on the full page: tables, long lists,
    and landmark sections. Use on long pages where read_page might truncate content."""
    print("  -> scan_page()")
    try:
        result = await ctx.deps.page.evaluate("""() => {
            const items = [];
            document.querySelectorAll('table').forEach(el => {
                const rows = el.querySelectorAll('tr').length;
                const id = el.id ? '#' + el.id : '';
                const cls = [...el.classList].find(c => !/^(sc-|css-|jss)/.test(c));
                const selector = 'table' + id + (cls ? '.' + cls : '');
                const headers = Array.from(el.querySelectorAll('th'))
                    .map(th => th.textContent.trim()).filter(Boolean).slice(0, 8);
                items.push('TABLE ' + selector + ': ' + rows + ' rows'
                    + (headers.length ? ' | cols: ' + headers.join(', ') : ''));
            });
            document.querySelectorAll('ul, ol').forEach(el => {
                const liCount = el.querySelectorAll(':scope > li').length;
                if (liCount < 3) return;
                const id = el.id ? '#' + el.id : '';
                const cls = [...el.classList].find(c => !/^(sc-|css-|jss)/.test(c));
                const tag = el.tagName.toLowerCase();
                const sample = el.querySelector('li')?.textContent?.trim()?.slice(0, 50) || '';
                items.push(tag.toUpperCase() + ' ' + tag + id + (cls ? '.' + cls : '')
                    + ': ' + liCount + ' items | first: "' + sample + '"');
            });
            document.querySelectorAll('main, article, section').forEach(el => {
                const id = el.id ? '#' + el.id : '';
                if (!id && !el.getAttribute('aria-label')) return;
                const tag = el.tagName.toLowerCase();
                const label = el.getAttribute('aria-label') || '';
                const heading = el.querySelector('h1,h2,h3')?.textContent?.trim()?.slice(0, 60) || '';
                items.push('SECTION ' + tag + id + (label ? ' [' + label + ']' : '')
                    + (heading ? ': "' + heading + '"' : ''));
            });
            return items.slice(0, 30);
        }""")
        if not result:
            return "No tables, lists, or labeled sections found."
        return "\n".join(result)
    except Exception as e:
        return f"Error scanning page: {str(e)[:200]}"


@agent.tool
async def list_elements(ctx: RunContext[Deps]) -> str:
    """List interactive elements on the page: buttons, inputs, dropdowns (with their options), and links."""
    print(f"  -> list_elements()")
    try:
        result = await ctx.deps.page.evaluate("""() => {
            const items = [];
            document.querySelectorAll('button, [role=button], input[type=submit], input[type=button]').forEach(el => {
                const text = (el.textContent || el.value || '').trim().slice(0, 80);
                if (text) items.push({type: 'button', text, id: el.id || '', name: el.name || ''});
            });
            document.querySelectorAll('input[type=text], input[type=search], input:not([type]), textarea').forEach(el => {
                const label = el.labels?.[0]?.textContent?.trim() || el.placeholder || el.getAttribute('aria-label') || el.name || el.id || '';
                items.push({type: 'input', label: label.slice(0, 80), id: el.id || '', name: el.name || ''});
            });
            document.querySelectorAll('select').forEach(el => {
                const label = el.labels?.[0]?.textContent?.trim() || el.getAttribute('aria-label') || el.name || el.id || '';
                const options = Array.from(el.options).map(o => o.text.trim()).filter(t => t).slice(0, 20);
                items.push({type: 'select', label: label.slice(0, 80), id: el.id || '', name: el.name || '', options});
            });
            let linkCount = 0;
            document.querySelectorAll('a[href]').forEach(el => {
                if (linkCount >= 20) return;
                const text = (el.textContent || '').trim().slice(0, 80);
                if (text && text.length > 1) {
                    items.push({type: 'link', text, href: el.href.slice(0, 120)});
                    linkCount++;
                }
            });
            return items;
        }""")
        if not result:
            return "No interactive elements found."
        lines = []
        for item in result:
            if item["type"] == "select":
                opts = ", ".join(item.get("options", [])[:10])
                more = f" (+{len(item.get('options', [])) - 10} more)" if len(item.get("options", [])) > 10 else ""
                lines.append(f"SELECT '{item['label']}' (id={item.get('id','')}): [{opts}{more}]")
            elif item["type"] == "input":
                lines.append(f"INPUT '{item['label']}' (id={item.get('id','')}, name={item.get('name','')})")
            elif item["type"] == "button":
                lines.append(f"BUTTON '{item['text']}' (id={item.get('id','')})")
            elif item["type"] == "link":
                lines.append(f"LINK '{item['text']}'")
        return "\n".join(lines[:50])
    except Exception as e:
        return f"Error listing elements: {str(e)[:200]}"


@agent.tool
async def click(ctx: RunContext[Deps], text_or_selector: str) -> str:
    """Click an element by its visible text (tried first) or a CSS selector (fallback).
    Use a CSS selector like 'button.submit' or '#search-btn' when multiple elements share the same text."""
    print(f"  -> click({text_or_selector!r})")
    try:
        locator = ctx.deps.page.get_by_text(text_or_selector, exact=False)
        if await locator.count() > 0:
            await locator.first.click(timeout=10000)
            try:
                await ctx.deps.page.wait_for_load_state("domcontentloaded", timeout=5000)
            except Exception:
                pass
            return f"Clicked element with text '{text_or_selector}'"
    except Exception:
        pass
    try:
        await ctx.deps.page.click(text_or_selector, timeout=10000)
        try:
            await ctx.deps.page.wait_for_load_state("domcontentloaded", timeout=5000)
        except Exception:
            pass
        return f"Clicked element: {text_or_selector}"
    except Exception as e:
        return f"Could not click '{text_or_selector}': {str(e)[:200]}"


@agent.tool
async def fill_input(ctx: RunContext[Deps], label_or_selector: str, value: str) -> str:
    """Fill a text input field by its label text (tried first) or CSS selector (fallback).
    Uses Playwright's fill() which sets the value directly. If a submit button stays
    disabled after filling, try type_input() instead — it simulates real keypresses."""
    print(f"  -> fill_input({label_or_selector!r}, {value!r})")
    try:
        locator = ctx.deps.page.get_by_label(label_or_selector)
        if await locator.count() > 0:
            await locator.first.fill(value)
            return f"Filled '{label_or_selector}' with '{value}'"
    except Exception:
        pass
    try:
        await ctx.deps.page.fill(label_or_selector, value)
        return f"Filled '{label_or_selector}' with '{value}'"
    except Exception as e:
        return f"Could not fill '{label_or_selector}': {str(e)[:200]}"


@agent.tool
async def type_input(ctx: RunContext[Deps], label_or_selector: str, value: str) -> str:
    """Type into a text input field character by character, simulating real keypresses.
    Use this instead of fill_input() when a site relies on keydown/keyup/input events
    (e.g. to enable a submit button, trigger autocomplete, or validate as you type)."""
    print(f"  -> type_input({label_or_selector!r}, {value!r})")
    try:
        locator = ctx.deps.page.get_by_label(label_or_selector)
        if await locator.count() > 0:
            await locator.first.click()
            await locator.first.press_sequentially(value, delay=50)
            return f"Typed '{value}' into '{label_or_selector}'"
    except Exception:
        pass
    try:
        await ctx.deps.page.click(label_or_selector)
        await ctx.deps.page.locator(label_or_selector).press_sequentially(value, delay=50)
        return f"Typed '{value}' into '{label_or_selector}'"
    except Exception as e:
        return f"Could not type into '{label_or_selector}': {str(e)[:200]}"


@agent.tool
async def select_dropdown(ctx: RunContext[Deps], label_or_selector: str, option: str) -> str:
    """Select an option from a <select> dropdown by its label text or CSS selector."""
    print(f"  -> select_dropdown({label_or_selector!r}, {option!r})")
    try:
        locator = ctx.deps.page.get_by_label(label_or_selector)
        if await locator.count() > 0:
            await locator.first.select_option(label=option)
            return f"Selected '{option}' from '{label_or_selector}'"
    except Exception:
        pass
    try:
        await ctx.deps.page.select_option(label_or_selector, label=option)
        return f"Selected '{option}' from '{label_or_selector}'"
    except Exception as e:
        return f"Could not select from '{label_or_selector}': {str(e)[:200]}"


@agent.tool
async def get_network_requests(ctx: RunContext[Deps]) -> str:
    """Return captured XHR/fetch API requests (JSON only). Useful for discovering hidden APIs behind JS-heavy sites."""
    print(f"  -> get_network_requests()")
    if not captured_requests:
        return "No API requests captured yet. Navigate to a page and interact with it first."
    lines = []
    for r in captured_requests[-20:]:
        line = f"[{r['method']}] {r['status']} | ~{r['record_count']} records | {r['url'][:150]}"
        if r.get('post_data'):
            line += f"\n  POST body: {r['post_data'][:200]}"
        lines.append(line)
    return "\n".join(lines)

@agent.tool
async def wait_for(ctx: RunContext[Deps], selector: str, timeout: int = 10000) -> str:
    """Wait for an element matching a CSS selector to appear or become visible. Use this when
    a page is loading dynamic content, or when a button needs time to become enabled."""
    print(f"  -> wait_for({selector!r}, timeout={timeout})")
    try:
        await ctx.deps.page.wait_for_selector(selector, state="visible", timeout=timeout)
        return f"Element '{selector}' is now visible."
    except Exception as e:
        return f"Timed out waiting for '{selector}': {str(e)[:200]}"

@agent.tool                                                                                                           
async def open_tab(ctx: RunContext[Deps]) -> str:                                                                     
    """Open a new browser tab and switch to it."""                                                                    
    print("  -> open_tab()")                                                                                          
    try:                                                                                                              
        new_page = await ctx.deps.page.context.new_page()
        new_page.on("response", log_api_response)
        ctx.deps.page = new_page
        return f"Opened new tab. You now have {len(ctx.deps.page.context.pages)} tabs."
    except Exception as e:
        return f"Error opening tab: {str(e)[:200]}"


@agent.tool
async def switch_tab(ctx: RunContext[Deps], tab_index: int) -> str:
    """Switch to a different browser tab by index (0-based)."""
    print(f"  -> switch_tab({tab_index})")
    try:
        pages = ctx.deps.page.context.pages
        if tab_index < 0 or tab_index >= len(pages):
            return f"Invalid tab index. You have {len(pages)} tabs (0-{len(pages)-1})."
        ctx.deps.page = pages[tab_index]
        title = await ctx.deps.page.title()
        return f"Switched to tab {tab_index}: {title} ({ctx.deps.page.url})"
    except Exception as e:
        return f"Error switching tabs: {str(e)[:200]}"


@agent.tool
async def list_tabs(ctx: RunContext[Deps]) -> str:
    """List all open browser tabs."""
    print("  -> list_tabs()")
    try:
        pages = ctx.deps.page.context.pages
        lines = []
        for i, p in enumerate(pages):
            title = await p.title()
            active = " (active)" if p == ctx.deps.page else ""
            lines.append(f"[{i}] {title}{p.url}{active}")
        return "\n".join(lines)
    except Exception as e:
        return f"Error listing tabs: {str(e)[:200]}"


@agent.tool
async def go_back(ctx: RunContext[Deps]) -> str:
    """Navigate back (browser back button)."""
    print("  -> go_back()")
    try:
        await ctx.deps.page.go_back(timeout=10000)
        title = await ctx.deps.page.title()
        return f"Went back to: {title}"
    except Exception as e:
        return f"Error going back: {str(e)[:200]}"


@agent.tool
async def run_javascript(ctx: RunContext[Deps], code: str) -> str:
    """Execute JavaScript on the page and return the result. Use for checking element
    state (disabled, hidden), triggering events, or reading values not in the DOM tree."""
    print(f"  -> run_javascript({code[:80]})")
    try:
        result = await ctx.deps.page.evaluate(code)
        return str(result)[:2000]
    except Exception as e:
        return f"JS error: {str(e)[:200]}"

print("Agent ready — navigate, read_page, scan_page, list_elements, click, fill_input, type_input, select_dropdown, get_network_requests")
Agent ready — navigate, read_page, scan_page, list_elements, click, fill_input, type_input, select_dropdown, get_network_requests
In [7]:
class ChatSession:
    """Multi-turn conversation wrapper."""

    def __init__(self, agent, deps):
        self.agent = agent
        self.deps = deps
        self.history = []

    async def chat(self, message):
        result = await self.agent.run(
            message, deps=self.deps, message_history=self.history
        )
        self.history = result.all_messages()
        return result.output

    def reset(self):
        self.history = []
        captured_requests.clear()
        print("Conversation reset!")


session = ChatSession(agent, Deps(page=page))
print("Session ready. Start chatting!")
Session ready. Start chatting!

Try it!

Describe a website you want to scrape. The agent will explore it and write a Playwright script for you.

Example prompts:

Go to https://webapps1.chicago.gov/buildingrecords/home and figure out how to search for building permits at 121 N La Salle St in Chicago and save them as a CSV

Explore https://analytics.opensyllabus.org/record/works and write a scraper for the top 100 books

In [8]:
# You used to have to do this. You don't now! You have a chatbot!

response = await session.chat(
    "Go to https://webapps1.chicago.gov/buildingrecords/home and figure out how to search for building permits at 121 N La Salle St in Chicago and save them as a CSV"
)
print(response)
  -> navigate(https://webapps1.chicago.gov/buildingrecords/home)
  -> read_page('')
  -> run_javascript(document.querySelector("#rbnAgreement1").click())
  -> click('#submit')
  -> read_page('')
  -> fill_input('#fullAddress', '121 N La Salle St')
  -> click('#submit')
  -> wait_for('#submit:not([disabled])', timeout=15000)
  -> type_input('#fullAddress', '121 N La Salle St')
  -> wait_for('#submit:not([disabled])', timeout=15000)
  -> click('#submit')
  -> read_page('')
  -> navigate(https://webapps1.chicago.gov/buildingrecords/home)
  -> run_javascript(document.querySelector("#rbnAgreement1").click())
  -> click('#submit')
  -> type_input('#fullAddress', '121 North LaSalle Street')
  -> wait_for('#submit:not([disabled])', timeout=15000)
  -> click('#submit')
  -> read_page('')
  -> scan_page()
To get the data, I'll write a script that navigates to the page, accepts the agreement, enters the address "121 North LaSalle Street", and then extracts the permit data from the `resultstable_permits` table.

Here's the script:

```python
import pandas as pd
from playwright.async_api import async_playwright
from bs4 import BeautifulSoup

async def scrape():
    async with async_playwright() as p:
        browser = await p.firefox.launch()
        page = await browser.new_page()

        print("Navigating to https://webapps1.chicago.gov/buildingrecords/home")
        await page.goto("https://webapps1.chicago.gov/buildingrecords/home", timeout=60000)

        print("Accepting agreement...")
        await page.click("#rbnAgreement1", timeout=10000)
        await page.click("#submit", timeout=10000)
        await page.wait_for_load_state("networkidle")

        print("Entering address: 121 North LaSalle Street")
        await page.type("#fullAddress", "121 North LaSalle Street", timeout=10000)

        print("Waiting for submit button to be enabled...")
        await page.wait_for_selector("#submit:not([disabled])", timeout=15000)

        print("Clicking submit button...")
        await page.click("#submit", timeout=10000)
        await page.wait_for_load_state("networkidle")

        print("Extracting data from the permit table...")
        html_content = await page.content()
        soup = BeautifulSoup(html_content, 'html.parser')

        permit_table = soup.find('table', id='resultstable_permits')
        if permit_table:
            headers = [th.get_text(strip=True) for th in permit_table.find_all('th')]
            rows = []
            for tr in permit_table.find_all('tr')[1:]: # Skip header row
                cells = [td.get_text(strip=True) for td in tr.find_all('td')]
                if len(cells) == len(headers):
                    rows.append(cells)
            
            df = pd.DataFrame(rows, columns=headers)
            print("Permit data extracted:")
            print(df.head())

            # Save to CSV
            df.to_csv("building_permits_121_n_la_salle_st.csv", index=False)
            print("\nData saved to building_permits_121_n_la_salle_st.csv")
        else:
            print("Permit table not found.")

        await browser.close()

await scrape()
```

In [9]:
# response = await session.chat("Now write me the complete Playwright script.")
# print(response)
In [10]:
import io                                                                                                             
import contextlib
import panel as pn                                                                                                    
pn.extension()                                                                                                      
pn.config.theme = "dark"                                                                                              

async def callback(contents, user, instance):
    if contents.strip().lower() in ("reset", "/reset"):
        session.reset()
        return "Conversation reset! Start fresh."

    log = io.StringIO()
    try:
        with contextlib.redirect_stdout(log):
            response = await session.chat(contents)
    except Exception as e:
        return f"**Error:** `{type(e).__name__}: {e}`"

    tool_log = log.getvalue().strip()
    if tool_log:
        instance.send(f"```\n{tool_log}\n```", user="Tools", respond=False)

    return response

chat = pn.chat.ChatInterface(
    callback=callback,
    show_rerun=False,
    show_undo=False,
)
chat.send(
    "Ready! Describe a site you want to scrape. Type **reset** to start over.",
    user="Agent",
    respond=False,
)
chat
Out[10]:

Tips

Start a new conversation: session.reset() — clears chat history and captured API requests.

Switch models: Change the model string in the agent definition above:

  • google-gla:gemini-2.5-flash — free, fast, good enough for most sites
  • google-gla:gemini-2.5-pro — free tier available, better at complex multi-step sites

Check captured APIs: Run captured_requests in any cell to see what JSON APIs the agent discovered.

If the agent gets stuck: Try being more specific — "click the Search button", "look at the table in div.results" — or session.reset() and try a different approach. It's very stupid so it definitely won't work perfectly.