
How to build automated data extraction pipelines for machine learning

Explore web data sources, how to extract data from them and the different use cases for AI training data

It’s easy to build a data extraction pipeline that works for a while. The hard part is building a pipeline that doesn’t break when things start to shift. 

Some machine learning (ML) pipelines fail not because the model is incorrect, but because the data feeding it is flawed. APIs throttle. Schema structures change. IPs get blocked. Jobs fail quietly. And before you notice, corrupted inputs are degrading your model’s performance.

The real challenge is building a resilient automated pipeline that can adapt to the chaos of the real web. That means modular extractors, validation layers, observability and retry logic baked in from the start.

In this guide, you’ll learn how to design data pipelines that not only work once but continue to deliver high-quality data for your ML workflows even when individual components break. The pipeline remains intact, the system recovers and the data continues to flow.

What are automated data extraction pipelines? 

An automated data extraction pipeline is a modular system designed to reliably collect, process and deliver web or API-sourced data for downstream applications like ML training, analytics or dashboards.

Unlike one-time scrapers, these pipelines:

  • Extract data at set intervals or based on triggers 
  • Transform and validate incoming data in batch or near-real-time using structured logic
  • Store or route outputs into systems like PostgreSQL, object storage or vector databases for indexing and retrieval

For example, a weekly fashion trends pipeline might:

  • Scrape data or call APIs every 24 hours using scheduled jobs (like Prefect, Dagster or Airflow)
  • Run the data through a validation and cleaning service (e.g., FastAPI or Pydantic models)
  • Store structured data in PostgreSQL and embed rich metadata in Pinecone for retrieval

Automation reduces failure points, ensures consistency and allows for versioned, reproducible inputs, which are key requirements for ML models in production.

Why most data extraction pipelines break 

Before we build a solid one, let’s talk about why most don’t last:

  • Tight coupling: When your scraper is hardcoded to a specific page layout or DOM structure. If one tag changes, like a renamed class or reordered div, everything breaks.
  • No observability: Without logs or monitoring tools, it’s impossible to tell if a job ran successfully, how long it took or where it failed. You won’t know if your data is delayed, malformed or missing until it’s too late.
  • Lack of retry logic: A single timeout or server hiccup can bring down the whole process. Without catching and retrying failures, you’ll lose data without even knowing it.
  • Scalability ignored: A pipeline that works fine with 1,000 rows might choke on 100,000 rows. Without memory management, batching or streaming, you’re one large dataset away from an Out-of-Memory error.
  • Missing fail-safes: No validation steps, no alerts when things go wrong and no automatic fallbacks. You won’t notice misaligned data until your model produces unexpected results or fails to perform as expected.

Anatomy of a modern data pipeline 

A modern data extraction pipeline is one that’s built to handle change and scale without breaking your entire workflow. Here’s an overview of what it should look like:

  • Modular design: Each part of the pipeline, extraction, parsing, validation and delivery, is a self-contained unit. If one part fails, the others don’t collapse with it.
  • Flexible selectors over fixed paths: Rather than relying on static element paths, the scraper uses XPath heuristics, semantic cues and AI-driven parsing to stay resilient as layouts evolve.
  • Built-in observability: From job start to finish, the pipeline tracks every request, response and transformation. If something breaks, you know exactly where and why.
  • Smart retry logic: Failed requests don’t kill the job. They trigger intelligent retries with exponential backoff, rotating proxies or fallback methods.
  • Scalable infrastructure: Whether you’re pulling 100 rows or 10 million, the pipeline spins up distributed workers to keep pace, with minimal stress. 
  • Validation and alerts: The pipeline doesn’t just assume success. It checks data against expected formats or thresholds. 
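The smart retry behavior described above can be sketched as a small decorator. The names and delay values here are illustrative, and flaky_fetch merely simulates a request that succeeds on the third try:

```python
import random
import time
from functools import wraps


def with_retries(max_attempts=4, base_delay=0.5):
    """Retry a callable with exponential backoff plus jitter."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise  # retries exhausted: surface the failure
                    # 0.5s, 1s, 2s, ... plus jitter to avoid thundering herds
                    time.sleep(base_delay * 2 ** attempt + random.uniform(0, 0.1))
        return wrapper
    return decorator


@with_retries(max_attempts=3, base_delay=0.01)
def flaky_fetch(state={"calls": 0}):
    # Simulates a request that fails twice, then succeeds
    state["calls"] += 1
    if state["calls"] < 3:
        raise TimeoutError("simulated network hiccup")
    return "ok"
```

In a real pipeline the except clause would also rotate proxies or switch to a fallback extractor before sleeping.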

Tools for building automated data extraction pipelines (for ML workflows)

Here’s the tech stack for our extraction pipeline:

| Tool | What is it? | Why did we use it? |
| --- | --- | --- |
| Pydantic | Schema and data validation library for Python | Defines strict, reusable data schemas to catch malformed records before they enter the ML pipeline |
| Playwright | Headless browser automation for dynamic sites | Extracts data from JavaScript-rendered pages (e.g., product listings) like a real user |
| ScraperAPI | API for scraping with built-in proxy rotation, CAPTCHA handling and headers | Helps extract data from sites with anti-bot protection, without managing infrastructure |
| Dagster | Data orchestration framework for building and monitoring jobs | Schedules and connects all pipeline stages with failure hooks and observability |
| Parquet + JSON | Data formats used for storage and model input | Stores clean, deduplicated datasets in formats optimized for ML and debugging |

You can easily replace any part of this pipeline with your preferred tools. For scraping, use Bright Data for AI-focused parsing or Apify for social media; in fact, both support hybrid browser and API-based extractors. Switch validation libraries from Pydantic to something like Cerberus or Marshmallow. Prefer Airflow or Prefect over Dagster? Go for it.

What matters isn’t the specific stack, it’s the architecture: Modular extractors, strict data contracts, resilient scheduling and observability at every stage.

Once you adopt that mindset, you can plug in whatever tools best fit your team, infrastructure or use case. Let’s dive into the main tutorial.

How to build a modern automated data extraction pipeline

We’ll walk you through how to build a pipeline that scrapes product data, like name, price and availability, from Amazon and delivers it in a clean, ML-ready format for a price prediction model.

Why Amazon? It’s a high-traffic, well-structured site that serves as a solid representation of how most modern e-commerce websites are built. Once this pipeline is working reliably with Amazon, you can easily extend it to scrape data from multiple retailers in the future or a different ML use case entirely. 

Step 1: Define data requirements for your ML use case

Before you touch code, you need to clearly define what kind of data your ML model actually needs, how often it needs it, what format and quality it expects and what systems and decisions depend on the data. 

Our test use case involves predicting future product prices based on historical data, merchant behavior and market signals. So our requirements will look like this: 

| Requirement | Description |
| --- | --- |
| Domains | E-commerce (e.g., Amazon, eBay, Footlocker) |
| Data sources | Public product pages, APIs if available |
| Target fields | product_id, product_name, price, currency, availability, merchant, timestamp |
| Update frequency | Every six hours (new prices and availability change frequently) |
| Min records/run | 10,000 product records per scheduled job |
| Output formats | Parquet (for ML training and analytics), JSON (for retrieval/vector search), CSV (for prototyping/debugging) |
| Quality thresholds | ≥ 80% completeness, ≤ 20% null rate, must pass schema validation (ProductData model) |
| Target site (current) | amazon.com |
| Monitoring metrics | Extraction success rate (%), total records extracted, invalid records, schema drift detection |
| Alert triggers | Success rate < 90%, total records < 10,000, invalid record rate > 20%, no output file generated within the expected window |

Your data requirements serve as a source of truth that shapes what you build, how you monitor it and how you know it’s working.

Step 2: Define the product data schema (data contract)

Once you know what data you need and where to get it, define a strict schema for it. This prevents malformed or missing data from slipping into your ML workflow.

We’ll use Pydantic to define a reusable data contract that all extractors must return. To create the data contract:

  1. Make a directory for the entire project. I called mine ml_data_pipeline.
mkdir ml_data_pipeline
cd ml_data_pipeline
  2. Inside your directory, create a schema folder and a product_data.py file to store your data contract.
  3. Within your product data file, store all your data schemas for different types of data, like merchant_data or review_data. This keeps your schemas modular, readable and reusable across multiple pipelines.

For this tutorial, we’re defining a product_data.py because we only need product-related data for now. 

mkdir schema
touch schema/product_data.py
  4. In product_data.py, write a ProductData schema using Pydantic.

Pydantic is a data validation library for Python. It’s used to define your data schema and offers built-in type validation, error handling and clean serialization, making your pipeline more reliable and maintainable. 

It makes sure all incoming data matches expected formats, catching issues before they corrupt your ML workflow. It also integrates well with tools like Prefect and Dagster, making it ideal for modern, production-grade pipelines.

from datetime import datetime, timezone

from pydantic import BaseModel, Field


class ProductData(BaseModel):
    product_id: str
    product_name: str
    price: float
    currency: str = "USD"
    availability: bool
    merchant: str
    # default_factory generates a fresh UTC timestamp per record,
    # rather than one frozen at import time
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    class Config:
        extra = "allow"

Here we’re creating a model called ProductData that inherits from BaseModel. This gives us automatic validation, type checking and serialization. Think of it as a contract: Every product record needs to follow this structure. Fields we’re capturing:

| Field | Description |
| --- | --- |
| product_id | Unique identifier for the product (string) |
| product_name | The product title |
| price | Float value; this gets cast/validated automatically |
| currency | Defaults to "USD" unless we specify otherwise |
| availability | Boolean (e.g., in stock = True) |
| merchant | The seller or brand name |
| timestamp | Captures the time the record was created or scraped. We default it to the current UTC time. |

However, we also set extra = "allow" in the Config class. This instructs Pydantic not to raise an error if extra fields appear in the data; it just lets them pass through. That's useful if Amazon changes its structure or adds metadata: We don't lose the data, and the pipeline doesn't crash.
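To see the contract in action, here's a minimal standalone model with extra = "allow". The badge field is hypothetical metadata a site might add without warning:

```python
from pydantic import BaseModel


class ProductData(BaseModel):
    product_id: str
    price: float

    class Config:
        extra = "allow"  # unknown fields pass through instead of raising


# "badge" is not in the schema, but extra = "allow" keeps it
record = ProductData(product_id="SKU12345", price=129.99, badge="Best Seller")
print(record.dict())
```

The extra field survives serialization, so downstream consumers can decide whether to use or ignore it.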

  5. Create a test_schema.py file at the root of your project to simulate using the schema.

To make sure our schema actually works the way we expect, we wrote a simple test script using test_schema.py. All it does is take a sample product dictionary (like the kind you’d scrape from Amazon), validate it against the ProductData schema and print it out in dictionary format.

from schema.product_data import ProductData

data = {
    "product_id": "SKU12345",
    "product_name": "Nike Air Max",
    "price": 129.99,
    "availability": True,
    "merchant": "Footlocker",
}

validated = ProductData(**data)
print(validated.dict())

Pydantic instantly checks that:

  • Price is a float
  • Availability is a boolean
  • Required fields like product_name and product_id are present

If anything is incorrect, such as passing a string for a price or omitting a required field, you’ll receive a clear, structured validation error. That means when Amazon changes its structure (like swapping class names or removing an element), your pipeline doesn’t just crash, or worse, pass through silent, insufficient data. You’ll catch the problem at the validation step and can log it, raise an alert or route it to a fallback.

You can pair Pydantic with observability tools to know when inputs shift. By failing fast, you prevent broken or incomplete data from polluting your storage, model features or downstream tasks.
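Here's what failing fast looks like in practice, using a trimmed-down standalone model. Both the missing product_name and the unparseable price are reported in one structured error:

```python
from pydantic import BaseModel, ValidationError


class ProductData(BaseModel):
    product_id: str
    product_name: str
    price: float


bad_record = {"product_id": "SKU12345", "price": "not-a-number"}
try:
    ProductData(**bad_record)
except ValidationError as e:
    # Each error names the offending field, so it can be logged or alerted on
    failed_fields = {err["loc"][0] for err in e.errors()}
```

From here, failed_fields can feed a log line, a metrics counter or an alert trigger.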

Step 3: Build modular extractors for any site

To make the pipeline modular and source-agnostic, we start by defining a shared interface that all extractors must follow:

  1. In your root folder, create the extractor directory and its base file (the empty __init__.py makes extractor an importable package):
cd ml_data_pipeline
mkdir extractor
touch extractor/__init__.py extractor/base.py
  2. Inside base.py, add the following:
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from datetime import datetime, timezone

These imports allow us to define an abstract base class, specify input/output types and generate UTC timestamps for extracted data.

  3. Define the BaseExtractor class. This is the base class from which all extractors will inherit.
class BaseExtractor(ABC):
  4. Add the extract() method signature. This method creates a template for extracting data from a single URL.
    @abstractmethod
    async def extract(self, url: str, config: Dict[str, Any]) -> Dict[str, Any]:
        pass
  5. Add the extract_batch() method signature. This method creates a template for extracting data from multiple URLs in one run.
    @abstractmethod
    async def extract_batch(
        self, urls: List[str], config: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        pass
  6. Create a _safe_result() utility method.
    def _safe_result(self, data=None, error=None):
        if data is not None and "timestamp" not in data:
            data["timestamp"] = datetime.now(timezone.utc).isoformat()
        return {
            "success": data is not None,
            "data": data,
            "error": str(error) if error else None,
        }

This helper method wraps all extractor responses in a consistent format. If the data is valid, it adds a timestamp (if missing). If an error occurs, it captures the exception without crashing the pipeline.
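Called in isolation, _safe_result produces envelopes like these (the helper is re-declared here so the snippet runs standalone):

```python
from datetime import datetime, timezone


def _safe_result(data=None, error=None):
    # Standalone copy of the helper: wraps every outcome in one envelope
    if data is not None and "timestamp" not in data:
        data["timestamp"] = datetime.now(timezone.utc).isoformat()
    return {
        "success": data is not None,
        "data": data,
        "error": str(error) if error else None,
    }


ok = _safe_result(data={"product_id": "SKU123", "price": 99.99})
failed = _safe_result(error=TimeoutError("page load timed out"))
```

Downstream code only ever checks the success flag, so a failed URL never crashes a batch.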

Step 4: Implement extractors for different use cases

Now that you’ve defined a shared interface (BaseExtractor), the next step is to create real extractors that follow that contract. Each one implements the same two methods — extract() and extract_batch() — but uses a different strategy based on the site’s structure and behavior.

The benefit? Your pipeline stays modular. You can switch between scraping methods — headless browsers, proxy-based services, static HTML fetchers or mock extractors — without touching any downstream logic.

  1. In an extractor/playwright_extractor.py file, create a PlaywrightExtractor for dynamic, JavaScript-heavy pages.

If a site loads its content dynamically with JavaScript, static HTTP requests won’t cut it. That’s where Playwright comes in. Playwright emulates real browser behavior, allowing you to interact with the page as if you were a user. This extractor uses flexible strategies like XPath heuristics, semantic tags and content-aware fallbacks. That way, it can adapt as site layouts evolve.

from .base import BaseExtractor
from typing import Dict, List
from playwright.async_api import async_playwright
from datetime import datetime, timezone
import re


class PlaywrightExtractor(BaseExtractor):
    async def extract(self, url: str, config: Dict) -> Dict:
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=True)
                page = await browser.new_page()
                await page.goto(url, timeout=30000)
                html = await page.content()
                merchant = url.split("/")[2]
                product_id = None
                product_name = None
                price = None
                currency = "USD"
                availability = None

                # --- Amazon selectors ---
                if "amazon." in merchant:
                    # Product ID (ASIN)
                    asin_match = re.search(r"/dp/([A-Z0-9]{10})", url)
                    product_id = asin_match.group(1) if asin_match else None
                    if not product_id:
                        asin_tag = await page.query_selector("input#ASIN")
                        if asin_tag:
                            product_id = await asin_tag.get_attribute("value")
                    # Product name
                    title_tag = await page.query_selector("#productTitle")
                    if title_tag:
                        product_name = (await title_tag.inner_text()).strip()
                    # Price
                    price_tag = await page.query_selector(".a-offscreen")
                    if price_tag:
                        price_text = (await price_tag.inner_text()).strip()
                        currency = "USD"
                        if price_text.startswith("$"):
                            currency = "USD"
                        elif price_text.startswith("£"):
                            currency = "GBP"
                        elif price_text.startswith("€"):
                            currency = "EUR"
                        price = float(re.sub(r"[^\d.]", "", price_text))
                    # Availability
                    avail_tag = await page.query_selector("#availability")
                    if avail_tag:
                        avail_text = (await avail_tag.inner_text()).lower()
                        availability = (
                            "in stock" in avail_text or "available" in avail_text
                        )

                # --- Jumia example ---
                elif "jumia." in merchant:
                    # Use appropriate selectors for Jumia...
                    title_tag = await page.query_selector("h1.-fs20")
                    if title_tag:
                        product_name = (await title_tag.inner_text()).strip()
                    price_tag = await page.query_selector("span.-b")
                    if price_tag:
                        price_text = (await price_tag.inner_text()).strip()
                        currency = "NGN"
                        price = float(re.sub(r"[^\d.]", "", price_text))
                    avail_tag = await page.query_selector("div.-df.-i-ctr.-gy-5")
                    if avail_tag:
                        avail_text = (await avail_tag.inner_text()).lower()
                        availability = (
                            "in stock" in avail_text or "available" in avail_text
                        )

                # --- Generic fallback ---
                else:
                    # Fallback to generic selectors for title, price, etc.
                    h1 = await page.query_selector("h1")
                    product_name = (await h1.inner_text()).strip() if h1 else None
                    price_sel = await page.query_selector(".price, [class*='price']")
                    price_text = (
                        (await price_sel.inner_text()).strip() if price_sel else None
                    )
                    if price_text:
                        price = float(re.sub(r"[^\d.]", "", price_text))
                    # Try basic keywords for availability
                    html_text = html.lower()
                    if (
                        "out of stock" in html_text
                        or "currently unavailable" in html_text
                    ):
                        availability = False
                    elif "in stock" in html_text or "available" in html_text:
                        availability = True

                    # Product ID fallback: last path part or query param
                    path = url.split("/")
                    product_id = path[-1][:10] if path else None

                # Timestamp
                timestamp = datetime.now(timezone.utc).isoformat()

                await browser.close()

                return self._safe_result(
                    data={
                        "product_id": product_id,
                        "product_name": product_name,
                        "price": price,
                        "currency": currency,
                        "availability": availability,
                        "merchant": merchant,
                        "timestamp": timestamp,
                    }
                )
        except Exception as e:
            return self._safe_result(data=None, error=e)

    async def extract_batch(self, urls: List[str], config: Dict) -> List[Dict]:
        return [await self.extract(u, config) for u in urls]
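The extract_batch() above processes URLs one at a time. If you need more throughput, a concurrency-bounded variant built on asyncio.gather is a natural extension; this sketch uses a stub extract() in place of the real browser call so it runs standalone:

```python
import asyncio
from typing import Dict, List


async def extract(url: str, config: Dict) -> Dict:
    # Stand-in for PlaywrightExtractor.extract(): sleeps instead of scraping
    await asyncio.sleep(0.05)
    return {"success": True, "data": {"url": url}, "error": None}


async def extract_batch(urls: List[str], config: Dict,
                        max_concurrency: int = 5) -> List[Dict]:
    # A semaphore caps how many pages are in flight at once
    sem = asyncio.Semaphore(max_concurrency)

    async def bounded(url: str) -> Dict:
        async with sem:
            return await extract(url, config)

    return await asyncio.gather(*(bounded(u) for u in urls))


urls = [f"https://example.com/p/{i}" for i in range(10)]
results = asyncio.run(extract_batch(urls, {}))
```

The semaphore keeps memory and target-site load predictable, which matters once batches grow past a few hundred URLs.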
  2. Create another file, extractor/scraperapi_extractor.py. ScraperAPI programmatically handles proxy rotation, request headers and access control challenges, improving reliability when accessing content at scale. It's well-suited for high-volume scraping, especially when direct access is inconsistent or blocked.
from .base import BaseExtractor
from typing import Dict, List
import os
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timezone
import re
from urllib.parse import urlparse


class ScraperAPIExtractor(BaseExtractor):
    def __init__(self):
        self.base_url = "https://api.scraperapi.com/"
        # Load the key from the environment; never hardcode secrets
        self.api_key = os.getenv("SCRAPERAPI_KEY", "")

    async def extract(self, url: str, config: Dict) -> Dict:
        payload = {"api_key": self.api_key, "url": url}
        try:
            r = requests.get(self.base_url, params=payload, timeout=30)
            r.raise_for_status()
            html = r.text
            soup = BeautifulSoup(html, "html.parser")
            domain = urlparse(url).netloc.lower()

            # Defaults
            product_id = None
            product_name = None
            price = None
            currency = "USD"
            availability = False
            merchant = domain.split(".")[-2].capitalize()

            # ---------- AMAZON ----------
            if "amazon" in domain:
                # Product ID (ASIN)
                asin_tag = soup.find("input", {"id": "ASIN"})
                if asin_tag:
                    product_id = asin_tag.get("value", None)
                if not product_id:
                    match = re.search(r"/dp/([A-Z0-9]{10})", url)
                    if match:
                        product_id = match.group(1)
                # Product Name
                title_tag = soup.find(id="productTitle")
                product_name = title_tag.text.strip() if title_tag else None
                # Price
                price_tag = soup.find("span", {"class": "a-offscreen"})
                if price_tag:
                    price_text = price_tag.text.strip()
                    if price_text.startswith("$"):
                        currency = "USD"
                    elif price_text.startswith("£"):
                        currency = "GBP"
                    elif price_text.startswith("€"):
                        currency = "EUR"
                    price = float(re.sub(r"[^\d.]", "", price_text))
                # Availability
                availability = self._extract_availability(soup)
                # Merchant
                merchant_tag = soup.find("a", id="bylineInfo")
                if merchant_tag:
                    merchant = merchant_tag.text.strip()
            # ---------- JUMIA ----------
            elif "jumia" in domain:
                # Product ID (SKU)
                sku_tag = soup.find("input", {"name": "sku"})
                if sku_tag:
                    product_id = sku_tag.get("value", None)
                # Product Name
                name_tag = soup.find("h1", {"class": re.compile("prd-title", re.I)})
                product_name = name_tag.text.strip() if name_tag else None
                # Price
                price_tag = soup.find("span", {"class": re.compile("price", re.I)})
                if price_tag:
                    price_text = price_tag.text.strip()
                    price = float(re.sub(r"[^\d.]", "", price_text))
                    currency = "NGN"
                # Availability
                # Jumia hides "out of stock" in product-unavailable, else it's available
                if soup.find("div", {"class": re.compile("product-unavailable", re.I)}):
                    availability = False
                else:
                    availability = True
                # Merchant
                merchant_tag = soup.find(
                    "a", {"class": re.compile("seller-name", re.I)}
                )
                if merchant_tag:
                    merchant = merchant_tag.text.strip()
            # ---------- WALMART ----------
            elif "walmart" in domain:
                # Product ID (ID in URL)
                match = re.search(r"/(\d{6,})", url)
                if match:
                    product_id = match.group(1)
                # Product Name
                name_tag = soup.find("h1")
                product_name = name_tag.text.strip() if name_tag else None
                # Price
                price_tag = soup.find("span", {"class": re.compile("price", re.I)})
                if price_tag:
                    price_text = price_tag.text.strip()
                    price = float(re.sub(r"[^\d.]", "", price_text))
                    currency = "USD"
                # Availability
                availability = self._extract_availability(
                    soup,
                    in_stock_keywords=["in stock", "pickup today", "delivery"],
                    out_stock_keywords=["out of stock", "not available"],
                )
                # Merchant
                merchant = "Walmart"
            # ---------- GENERIC (fallback) ----------
            else:
                # Try to find a product name by common selectors
                name_tag = soup.find(
                    ["h1", "h2"], {"class": re.compile("(title|name)", re.I)}
                )
                product_name = name_tag.text.strip() if name_tag else None
                # Price
                price_tag = soup.find(
                    "span", {"class": re.compile("(price|amount)", re.I)}
                )
                if price_tag:
                    price_text = price_tag.text.strip()
                    price = float(re.sub(r"[^\d.]", "", price_text))
                # Merchant
                merchant = domain.split(".")[-2].capitalize()
                # Availability (generic keyword scan)
                availability = self._extract_availability(soup)

            # Timestamp
            timestamp = datetime.now(timezone.utc).isoformat()

            return self._safe_result(
                data={
                    "product_id": product_id,
                    "product_name": product_name,
                    "price": price,
                    "currency": currency,
                    "availability": availability,
                    "merchant": merchant,
                    "timestamp": timestamp,
                }
            )
        except Exception as e:
            return self._safe_result(data=None, error=e)

    def _extract_availability(
        self, soup, in_stock_keywords=None, out_stock_keywords=None
    ):
        """Universal availability extraction with keyword scanning."""
        if in_stock_keywords is None:
            in_stock_keywords = [
                "in stock",
                "available to ship",
                "usually ships",
                "order soon",
                "pickup today",
                "delivery",
                "available",
            ]
        if out_stock_keywords is None:
            out_stock_keywords = [
                "out of stock",
                "currently unavailable",
                "temporarily out of stock",
                "not available",
            ]
        # Try id="availability"
        avail_tag = soup.find(attrs={"id": "availability"})
        avail_text = avail_tag.get_text(strip=True).lower() if avail_tag else ""
        # Try class
        if not avail_text:
            avail_tag = soup.find(attrs={"class": re.compile("availability", re.I)})
            if avail_tag:
                avail_text = avail_tag.get_text(strip=True).lower()
        # Fallback: keyword scan
        if not avail_text:
            page_text = soup.get_text(separator=" ", strip=True).lower()
            for txt in in_stock_keywords + out_stock_keywords:
                if txt in page_text:
                    avail_text = txt
                    break
        if avail_text:
            if any(k in avail_text for k in in_stock_keywords):
                return True
            if any(k in avail_text for k in out_stock_keywords):
                return False
        return None  # Unknown

    async def extract_batch(self, urls: List[str], config: Dict) -> List[Dict]:
        return [await self.extract(url, config) for url in urls]

Make sure your API key is retrieved from os.environ or passed via configuration.

  3. Create another file, extractor/mock_extractor.py, for local testing and debugging. This is useful when developing or testing other parts of the pipeline without making real requests.
from .base import BaseExtractor
from typing import Dict, List
import asyncio


class MockExtractor(BaseExtractor):
    async def extract(self, url: str, config: Dict) -> Dict:
        await asyncio.sleep(0.1)  # simulate network latency
        return self._safe_result(
            data={
                "product_id": "SKU123",
                "product_name": "Test Shoe",
                "price": 99.99,
                "currency": "USD",
                "availability": True,
                "merchant": "MockStore",
            }
        )

    async def extract_batch(self, urls: List[str], config: Dict) -> List[Dict]:
        return [await self.extract(url, config) for url in urls]

The mock extractor returns dummy data instantly using the BaseExtractor interface. This keeps your pipeline fast and predictable during development.

Step 5: Route extractors with a factory

Create a factory.py file inside your extractor folder. This is where you centralize routing logic. Instead of hardcoding which extractor to use in your main pipeline, you create a small factory function called get_extractor that picks which scraping backend to use, based on the name you pass in.

It supports three options:

  • "mock" – for testing locally with fake data (MockExtractor)
  • "scraperapi" – to use the ScraperAPI backend (good for sites that block direct requests)
  • "playwright" – to run a headless browser and scrape JavaScript-heavy pages
import os
from .mock_extractor import MockExtractor
from .scraperapi_extractor import ScraperAPIExtractor
from .playwright_extractor import PlaywrightExtractor


def get_extractor(name: str = "mock"):
    if name == “scraperapi”:
        return ScraperAPIExtractor()
    elif name == “playwright”:
        return PlaywrightExtractor()
    elif name == “mock”:
        return MockExtractor()
    else:
        raise ValueError(f"Unknown extractor: {name}")

Now your main job runner just does:

extractor = get_extractor("playwright")  # or "scraperapi" / "mock"
data = await extractor.extract(url, config)
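If you expect to add more backends over time, a dictionary registry is a slightly more extensible variant of the same factory. MockExtractor is stubbed here so the snippet runs standalone:

```python
import asyncio


class MockExtractor:
    # Stub standing in for the real class in extractor/mock_extractor.py
    async def extract(self, url, config):
        return {"success": True, "data": {"product_id": "SKU123"}, "error": None}


EXTRACTORS = {"mock": MockExtractor}  # register "playwright", "scraperapi" here too


def get_extractor(name: str = "mock"):
    try:
        return EXTRACTORS[name]()
    except KeyError:
        raise ValueError(f"Unknown extractor: {name}") from None


result = asyncio.run(get_extractor("mock").extract("https://example.com/p/1", {}))
```

Adding a new backend then becomes a one-line registry entry rather than another elif branch.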

Step 6: Clean and validate the data 

In the root directory, create a validate.py file where you can clean and validate the data your scrapers are extracting against the data schema you defined in step two.

from schema.product_data import ProductData


def clean_record(raw: dict) -> dict:
    try:
        validated = ProductData(**raw)
        return {"success": True, "data": validated.dict()}
    except Exception as e:
        return {"success": False, "error": str(e), "raw": raw}


def validate_batch(records: list) -> list:
    return [
        clean_record(record.get("data", {}))
        for record in records
        if record.get("success")
    ]
Here’s what this code does: 

  • We’re importing our product schema. This is the ProductData model we defined earlier with Pydantic. It serves as our contract; every record must match it.
  • clean_record() validates a single record. It tries to create a ProductData instance from the raw data. If it passes, we return it as a clean dict. If it fails, we return the raw record along with the error, so we don’t just silently drop anything.
  • validate_batch() runs the validation across all extractor outputs. We loop through every record where extraction was successful, grab the raw data and validate it using clean_record().
  • The result is a list of structured outputs: clean data or flagged errors.

This helps us catch bad inputs before they pollute the pipeline. Whether it’s a missing field, a type mismatch or a layout shift upstream, this layer protects everything downstream, from storage to model training.
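As a quick sanity check, here is how clean_record behaves on a well-formed and a malformed record. The ProductData stand-in below assumes a few plausible fields (product_id, price, availability); swap in your real model from step two:

```python
from pydantic import BaseModel

# Minimal stand-in for the ProductData schema from step two (assumed fields).
class ProductData(BaseModel):
    product_id: str
    price: float
    availability: bool

def clean_record(raw: dict) -> dict:
    try:
        validated = ProductData(**raw)
        return {"success": True, "data": validated.dict()}
    except Exception as e:
        return {"success": False, "error": str(e), "raw": raw}

# Pydantic coerces the numeric string "19.99" to a float; "n/a" fails validation.
good = clean_record({"product_id": "sku-1", "price": "19.99", "availability": True})
bad = clean_record({"product_id": "sku-2", "price": "n/a", "availability": True})
print(good["success"], bad["success"])  # True False
```

Note that the failing record is returned with its error message rather than dropped, so you can log or quarantine it for later inspection.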

Step 7: Convert to ML formats 

The format_for_ml function takes validated product records and turns them into a clean, ML-ready DataFrame. It ensures every record has a timestamp, computes how old each entry is, converts availability into a numeric in_stock flag for modeling and drops the columns you no longer need, so the data is ready for model training or analytics.

import pandas as pd
from datetime import datetime, timezone


def format_for_ml(data: list[dict]) -> pd.DataFrame:
    df = pd.DataFrame(data)
    if "timestamp" not in df.columns:
        df["timestamp"] = datetime.now(timezone.utc)
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    df["days_since_seen"] = (datetime.now(timezone.utc) - df["timestamp"]).dt.days
    df["in_stock"] = df["availability"].astype(int)
    df.drop(columns=["availability"], inplace=True, errors="ignore")
    return df
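To see what the formatter produces, you can feed it a couple of hand-written records. The definition is repeated here so the snippet runs on its own; field names follow the sample product schema:

```python
import pandas as pd
from datetime import datetime, timezone

# format_for_ml, repeated so this snippet is self-contained.
def format_for_ml(data: list[dict]) -> pd.DataFrame:
    df = pd.DataFrame(data)
    if "timestamp" not in df.columns:
        df["timestamp"] = datetime.now(timezone.utc)
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
    df["days_since_seen"] = (datetime.now(timezone.utc) - df["timestamp"]).dt.days
    df["in_stock"] = df["availability"].astype(int)
    df.drop(columns=["availability"], inplace=True, errors="ignore")
    return df

df = format_for_ml([
    {"product_id": "sku-1", "price": 19.99, "availability": True},
    {"product_id": "sku-2", "price": 5.49, "availability": False},
])
print(df[["product_id", "in_stock", "days_since_seen"]])
```

Because the records carry no timestamp, both rows get the current UTC time, days_since_seen comes out as 0 and the boolean availability column is replaced by an integer in_stock column.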

Step 8: Orchestrate with Dagster

Dagster is a data orchestration platform that lets you define each step of your data process as an op (operation). These are connected in a job (a directed graph), which ensures your data is processed in the right order, handles errors and supports powerful logging, testing and scheduling.

In our pipeline:

  • Ops are defined for:
    • Loading URLs to scrape
    • Extracting product data from those URLs
    • Validating and cleaning the extracted data
    • Saving the final output in JSON and Parquet
    • Training an ML model on the processed data
  • The job connects all these ops in sequence.
  • A schedule triggers this job automatically every 6 hours (using cron).
from dagster import op, job, ScheduleDefinition, Definitions, get_dagster_logger, Failure, Output, failure_hook
import os
import asyncio
import pandas as pd

# -- For extraction, validation, and formatting --
from ml_formatter import format_for_ml
from extractors.factory import get_extractor
from validator import validate_batch

@op
def load_urls_op(context) -> list:
    path = "urls.txt"
    with open(path) as f:
        urls = [line.strip() for line in f if line.strip()]
    if not urls:
        raise Failure("No URLs loaded from file!")
    context.log.info(f"Loaded {len(urls)} URLs.")
    return urls

@op
def extract_data(context, urls: list, extractor_type="scraperapi") -> list:
    extractor = get_extractor(extractor_type)
    results = asyncio.run(extractor.extract_batch(urls, {}))
    if not results or all(r.get("data") is None for r in results):
        raise Failure("All extractions failed.")
    context.log.info(f"Extracted {len(results)} records.")
    return results

@op
def validate_data(context, raw_data: list) -> list:
    validated = validate_batch(raw_data)
    n_valid = sum(1 for v in validated if v.get("success"))
    if n_valid == 0:
        raise Failure("No valid records after validation!")
    context.log.info(f"Validated {n_valid} records.")
    return validated

@op
def save_data(context, validated: list) -> int:
    os.makedirs("output", exist_ok=True)
    new_df = format_for_ml([v["data"] for v in validated if v.get("success")])
    json_path = "output/data.json"
    parquet_path = "output/data.parquet"

    if os.path.exists(json_path):
        old_df = pd.read_json(json_path)
        combined_df = pd.concat([old_df, new_df], ignore_index=True)
        if not pd.api.types.is_datetime64_any_dtype(combined_df["timestamp"]):
            combined_df["timestamp"] = pd.to_datetime(combined_df["timestamp"], errors="coerce")
        combined_df = combined_df.sort_values("timestamp").drop_duplicates("product_id", keep="last")
    else:
        combined_df = new_df

    combined_df.to_parquet(parquet_path, index=False)
    combined_df.to_json(json_path, orient="records", indent=2, date_format="iso")
    if len(combined_df) < 10:
        raise Failure(f"ALERT: Only {len(combined_df)} records saved!")
    context.log.info(f"Saved {len(combined_df)} records.")
    return len(combined_df)

Step 9: Add monitoring and alerts

No pipeline is reliable without monitoring! Here, we set up proactive failure alerts:

  • If any step fails (e.g., data extraction returns no results or validation yields no valid records), the pipeline sends an email notification.
  • This uses Dagster’s @failure_hook mechanism.
  • The email contains info about which job, which op, the run ID and the error message.

This makes sure you never “silently” miss a failure; problems are brought to your attention immediately, not weeks later when the business is affected.

import smtplib
from email.mime.text import MIMEText
from dagster import failure_hook

# -- Email Config (edit these) --
EMAIL_FROM = "your_email@gmail.com"
EMAIL_TO = "your_destination_email@gmail.com"
EMAIL_PASSWORD = "your_app_password"   # For Gmail, use an "App Password", not your main password
SMTP_SERVER = "smtp.gmail.com"
SMTP_PORT = 587

def send_failure_email(subject, body):
    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = EMAIL_FROM
    msg["To"] = EMAIL_TO
    try:
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()
            server.login(EMAIL_FROM, EMAIL_PASSWORD)
            server.sendmail(EMAIL_FROM, [EMAIL_TO], msg.as_string())
    except Exception as e:
        print("Email alert failed:", e)

@failure_hook
def email_failure_hook(context):
    subject = f"Dagster Pipeline FAILURE: {context.job_name}"
    body = (
        f"Job: {context.job_name}\n"
        f"Op: {context.op.name}\n"
        f"Run: {context.run_id}\n"
        f"Error: {context.op_exception}"
    )
    send_failure_email(subject, body)

Step 10: Connect to ML pipeline

The final step of our process is to train a machine learning model on the newly validated, deduplicated data.

This is part of the same orchestrated flow, so you never have stale or out-of-date models.

The ML step:

  • Loads the latest saved data
  • Trains a regression model (RandomForestRegressor) to predict product prices

It also stores the model and training metrics for future reference. If there’s not enough data or a required column is missing, it raises an error, triggering monitoring. This keeps our analytics or recommendation system always up to date.

@op
def train_ml_model(context):
    import pandas as pd
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import mean_squared_error
    import joblib
    import os

    df = pd.read_parquet("output/data.parquet")
    if df.empty:
        raise Failure("No data to train ML model.")

    features = ["in_stock"]  # produced by format_for_ml; add more features as needed
    if not all(f in df.columns for f in features):
        raise Failure(f"Missing feature columns: {features}")

    X = df[features].astype(float)
    y = df["price"].astype(float)

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    model = RandomForestRegressor(n_estimators=10, random_state=42)
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)
    mse = mean_squared_error(y_test, predictions)
    context.log.info(f"ML Model trained. Test MSE: {mse:.4f}")

    os.makedirs("models", exist_ok=True)
    joblib.dump(model, "models/price_predictor.joblib")

    with open("models/metrics.txt", "w") as f:
        f.write(f"Test MSE: {mse:.4f}\n")

    return {"test_mse": mse}
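The saved model can then be reloaded anywhere with joblib. The sketch below trains and dumps a throwaway regressor to a temp path so it runs standalone; in the real pipeline you would load models/price_predictor.joblib directly:

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.ensemble import RandomForestRegressor

# Stand-in: train and save a tiny model so this snippet is self-contained.
path = os.path.join(tempfile.gettempdir(), "price_predictor.joblib")
model = RandomForestRegressor(n_estimators=5, random_state=42)
model.fit(np.array([[0.0], [1.0], [1.0], [0.0]]), np.array([4.0, 9.0, 11.0, 5.0]))
joblib.dump(model, path)

# Downstream service: reload the artifact and score new records.
loaded = joblib.load(path)
pred = loaded.predict(np.array([[1.0]]))  # a single in-stock product
print(round(float(pred[0]), 2))
```

Keeping the serving side decoupled like this means a prediction service only needs the joblib artifact, not the whole pipeline codebase.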

Bringing orchestration together: Jobs and schedules

Now we connect everything into a single Dagster job: data loading, extraction, validation and monitoring. We also schedule it to run every 6 hours and attach our failure alert hook.

from dagster import (
    op,
    job,
    get_dagster_logger,
    Failure,
    Output,
    failure_hook,
    ScheduleDefinition,
    Definitions,
)
import pandas as pd
import os
import asyncio
import smtplib
from email.mime.text import MIMEText
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import joblib

# -- Your own imports for extraction & ML --
from ml_formatter import format_for_ml
from extractors.factory import get_extractor
from validator import validate_batch

# -- Email Config (edit these) --
EMAIL_FROM = "your_email@gmail.com"
EMAIL_TO = "your_destination_email@gmail.com"
EMAIL_PASSWORD = (
    "your_app_password"  # For Gmail, use an "App Password", not your main password
)
SMTP_SERVER = "smtp.gmail.com"
SMTP_PORT = 587


# -- Email notification hook --
def send_failure_email(subject, body):
    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = EMAIL_FROM
    msg["To"] = EMAIL_TO
    try:
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()
            server.login(EMAIL_FROM, EMAIL_PASSWORD)
            server.sendmail(EMAIL_FROM, [EMAIL_TO], msg.as_string())
    except Exception as e:
        print("Email alert failed:", e)


@failure_hook
def email_failure_hook(context):
    subject = f"Dagster Pipeline FAILURE: {context.job_name}"
    body = (
        f"Job: {context.job_name}\n"
        f"Op: {context.op.name}\n"
        f"Run: {context.run_id}\n"
        f"Error: {context.op_exception}"
    )
    send_failure_email(subject, body)


# -- Ops --


@op
def load_urls_op(context) -> list:
    path = "urls.txt"
    with open(path) as f:
        urls = [line.strip() for line in f if line.strip()]
    if not urls:
        raise Failure("No URLs loaded from file!")
    context.log.info(f"Loaded {len(urls)} URLs.")
    return urls


@op
def extract_data(context, urls: list, extractor_type="scraperapi") -> list:
    extractor = get_extractor(extractor_type)
    results = asyncio.run(extractor.extract_batch(urls, {}))
    if not results or all(r.get("data") is None for r in results):
        raise Failure("All extractions failed.")
    context.log.info(f"Extracted {len(results)} records.")
    return results


@op
def validate_data(context, raw_data: list) -> list:
    validated = validate_batch(raw_data)
    n_valid = sum(1 for v in validated if v.get("success"))
    if n_valid == 0:
        raise Failure("No valid records after validation!")
    context.log.info(f"Validated {n_valid} records.")
    return validated


@op
def save_data(context, validated: list) -> int:
    os.makedirs("output", exist_ok=True)
    new_df = format_for_ml([v["data"] for v in validated if v.get("success")])
    json_path = "output/data.json"
    parquet_path = "output/data.parquet"

    if os.path.exists(json_path):
        old_df = pd.read_json(json_path)
        combined_df = pd.concat([old_df, new_df], ignore_index=True)
        if not pd.api.types.is_datetime64_any_dtype(combined_df["timestamp"]):
            combined_df["timestamp"] = pd.to_datetime(
                combined_df["timestamp"], errors="coerce"
            )
        combined_df = combined_df.sort_values("timestamp").drop_duplicates(
            "product_id", keep="last"
        )
    else:
        combined_df = new_df

    combined_df.to_parquet(parquet_path, index=False)
    combined_df.to_json(json_path, orient="records", indent=2, date_format="iso")
    if len(combined_df) < 10:
        raise Failure(f"ALERT: Only {len(combined_df)} records saved!")
    context.log.info(f"Saved {len(combined_df)} records.")
    return len(combined_df)


@op
def train_ml_model(context, record_count: int):
    # Taking save_data's output as an input makes Dagster run this op
    # only after the data has been saved.
    df = pd.read_parquet("output/data.parquet")
    if df.empty:
        raise Failure("No data to train ML model.")

    # Minimal preprocessing
    if "price" in df.columns and "in_stock" in df.columns:
        features = ["in_stock"]  # produced by format_for_ml; add more features here
        X = df[features].astype(float)
        y = df["price"].astype(float)
    else:
        raise Failure("Required columns ('price', 'in_stock') not found in data.")

    # Train/Test Split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Model
    model = RandomForestRegressor(n_estimators=10, random_state=42)
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)
    mse = mean_squared_error(y_test, predictions)
    context.log.info(f"ML Model trained. Test MSE: {mse:.4f}")

    # Save the model to file for future prediction runs
    os.makedirs("models", exist_ok=True)
    joblib.dump(model, "models/price_predictor.joblib")

    # Optionally, save metrics for monitoring
    with open("models/metrics.txt", "w") as f:
        f.write(f"Test MSE: {mse:.4f}\n")

    return {"test_mse": mse}


# -- Job (connect all) --


@job(hooks={email_failure_hook})
def ecommerce_monitoring_job():
    urls = load_urls_op()
    raw = extract_data(urls)
    validated = validate_data(raw)
    record_count = save_data(validated)
    train_ml_model(record_count)


ecommerce_monitoring_schedule = ScheduleDefinition(
    job=ecommerce_monitoring_job,
    cron_schedule="0 */6 * * *",  # Every 6 hours
)

defs = Definitions(
    jobs=[ecommerce_monitoring_job],
    schedules=[ecommerce_monitoring_schedule],
)

Save the file as pipeline_dagster.py and use dagster dev -f pipeline_dagster.py to run the Dagster server locally.

An automated pipeline built for the real world

A production-ready data extraction pipeline isn’t one that never fails; it’s one that knows how to fail well. It isolates errors, prevents bad data from flowing downstream and keeps the rest of the system running smoothly, even when the web gets unpredictable.

You can also use this pipeline as a template and scale your extraction logic, expand your sources and validate multiple data types with minimal overhead.

Clone the whole pipeline from the GitHub repo and customize it to your data sources, schema, and ML use case.