It’s easy to build a data extraction pipeline that works for a while. The hard part is building a pipeline that doesn’t break when things start to shift.
Some machine learning (ML) pipelines fail not because the model is incorrect, but because the data feeding it is flawed. APIs throttle. Schema structures change. IPs get blocked. Jobs fail quietly. And before you notice, corrupted inputs are degrading your model’s performance.
The real challenge is building a resilient automated pipeline that can adapt to the chaos of the real web. That means modular extractors, validation layers, observability and retry logic baked in from the start.
In this guide, you’ll learn how to design data pipelines that not only work once but continue to deliver high-quality data for your ML workflows even when individual components break. The pipeline remains intact, the system recovers and the data continues to flow.
What are automated data extraction pipelines?
An automated data extraction pipeline is a modular system designed to reliably collect, process and deliver web or API-sourced data for downstream applications like ML training, analytics or dashboards.
Unlike one-time scrapers, these pipelines:
- Extract data at set intervals or based on triggers
- Transform and validate incoming data in batch or near-real-time using structured logic
- Store or route outputs into systems like PostgreSQL, object storage or vector databases for indexing and retrieval
For example, a weekly fashion trends pipeline might:
- Scrape data or call APIs every 24 hours using scheduled jobs (like Prefect, Dagster or Airflow)
- Run the data through a validation and cleaning service (e.g., FastAPI or Pydantic models)
- Store structured data in PostgreSQL and embed rich metadata in Pinecone for retrieval
Automation reduces failure points, ensures consistency and allows for versioned, reproducible inputs, which are key requirements for ML models in production.
Why most data extraction pipelines break
Before we build a solid one, let’s talk about why most don’t last:
- Tight coupling: When your scraper is hardcoded to a specific page layout or DOM structure. If one tag changes, like a renamed class or reordered div, everything breaks.
- No observability: Without logs or monitoring tools, it’s impossible to tell if a job ran successfully, how long it took or where it failed. You won’t know if your data is delayed, malformed or missing until it’s too late.
- Lack of retry logic: A single timeout or server hiccup can bring down the whole process. Without catching and retrying failures, you’ll lose data without even knowing it.
- Scalability ignored: A pipeline that works fine with 1,000 rows might choke on 100,000 rows. Without memory management, batching or streaming, you’re one large dataset away from an Out-of-Memory error.
- Missing fail-safes: No validation steps, no alerts when things go wrong and no automatic fallbacks. You won’t notice misaligned data until your model produces unexpected results or fails to perform as expected.
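The scalability point deserves a concrete sketch. One simple way to avoid loading an entire dataset into memory is to process records in fixed-size chunks. The `batched` helper below is an illustrative example (the name and chunk size are our own, not part of the pipeline built later):

```python
from itertools import islice

def batched(iterable, size):
    """Yield successive lists of at most `size` items, so the full
    dataset never has to fit in memory at once."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk

# Process 100,000 rows in 1,000-row chunks instead of one giant list
for chunk in batched(range(100_000), 1_000):
    pass  # transform/validate/store each chunk here
```

The same pattern applies whether the source is a database cursor, a paginated API or a file of URLs.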
Anatomy of a modern data pipeline
A modern data extraction pipeline is one that’s built to handle change and scale without breaking your entire workflow. Here’s an overview of what it should look like:
- Modular design: Each part of the pipeline (extraction, parsing, validation and delivery) is a self-contained unit. If one part fails, the others don’t collapse with it.
- Flexible selectors over fixed paths: Rather than relying on static element paths, the scraper uses XPath heuristics, semantic cues and AI-driven parsing to stay resilient as layouts evolve.
- Built-in observability: From job start to finish, the pipeline tracks every request, response and transformation. If something breaks, you know exactly where and why.
- Smart retry logic: Failed requests don’t kill the job. They trigger intelligent retries with exponential backoff, rotating proxies or fallback methods.
- Scalable infrastructure: Whether you’re pulling 100 rows or 10 million, the pipeline spins up distributed workers to keep pace, with minimal stress.
- Validation and alerts: The pipeline doesn’t just assume success. It checks data against expected formats or thresholds.
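To make “smart retry logic” concrete, here is a minimal, illustrative backoff helper. The function name and defaults are our own, and production pipelines often reach for a library such as tenacity instead:

```python
import random
import time

def with_retries(fn, max_attempts=4, base_delay=1.0):
    """Call fn(); on failure, wait base_delay * 2**attempt plus a little
    jitter, then try again. Re-raises after max_attempts failures."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of retries: surface the error to monitoring
            time.sleep(base_delay * (2 ** attempt) + random.uniform(0, base_delay))
```

Wrapping each request in a helper like this means a transient timeout costs a few seconds, not the whole job.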
Tools for building automated data extraction pipelines (for ML workflows)
Here’s the tech stack for our extraction pipeline:
| Tool | What is it? | Why did we use it? |
| --- | --- | --- |
| Pydantic | Schema and data validation library for Python | Defines strict, reusable data schemas to catch malformed records before they enter the ML pipeline |
| Playwright | Headless browser automation for dynamic sites | Extracts data from JavaScript-rendered pages (e.g., product listings) like a real user |
| ScraperAPI | API for scraping with built-in proxy rotation, CAPTCHA handling and headers | Helps extract data from sites with anti-bot protection, without managing infrastructure |
| Dagster | Data orchestration framework for building and monitoring jobs | Schedules and connects all pipeline stages with failure hooks and observability |
| Parquet + JSON | Data formats used for storage and model input | Stores clean, deduplicated datasets in formats optimized for ML and debugging |
You can easily replace any part of this pipeline with your preferred tools. For scraping, use Bright Data for AI-focused parsing or Apify for social media; in fact, both support hybrid browser and API-based extractors. Switch validation libraries from Pydantic to something like Cerberus or Marshmallow. Prefer Airflow or Prefect over Dagster? Go for it.
What matters isn’t the specific stack, it’s the architecture: Modular extractors, strict data contracts, resilient scheduling and observability at every stage.
Once you adopt that mindset, you can plug in whatever tools best fit your team, infrastructure or use case. Let’s dive into the main tutorial.
How to build a modern automated data extraction pipeline
We’ll walk you through how to build a pipeline that scrapes product data, like name, price and availability, from Amazon and delivers it in a clean, ML-ready format for a price prediction model.
Why Amazon? It’s a high-traffic, well-structured site that serves as a solid representation of how most modern e-commerce websites are built. Once this pipeline is working reliably with Amazon, you can easily extend it to scrape data from multiple retailers in the future or a different ML use case entirely.
Step 1: Define data requirements for your ML use case
Before you touch code, you need to clearly define what kind of data your ML model actually needs, how often it needs it, what format and quality it expects and what systems and decisions depend on the data.
Our test use case involves predicting future product prices based on historical data, merchant behavior and market signals. So our requirements will look like this:
| Requirement | Description |
| --- | --- |
| Domains | E-commerce (e.g., Amazon, eBay, Footlocker) |
| Data sources | Public product pages, APIs if available |
| Target fields | product_id, product_name, price, currency, availability, merchant, timestamp |
| Update frequency | Every six hours (prices and availability change frequently) |
| Min records/run | 10,000 product records per scheduled job |
| Output formats | Parquet (for ML training & analytics), JSON (for retrieval/vector search), CSV (for prototyping/debugging) |
| Quality thresholds | ≥ 80% completeness; ≤ 20% null rate; must pass schema validation (ProductData model) |
| Target site (current) | amazon.com |
| Monitoring metrics | Extraction success rate (%); total records extracted; invalid records; schema drift detection |
| Alert triggers | Success rate < 90%; total records < 10,000; invalid record rate > 20%; no output file generated within the expected window |
Your data requirements serve as a source of truth that shapes what you build, how you monitor it and how you know it’s working.
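Those monitoring metrics and alert triggers can be encoded directly as a run health check. The sketch below is illustrative: the `check_run_health` name and the `{"success": bool}` record shape are assumptions, but the thresholds come straight from the requirements table above.

```python
def check_run_health(records, min_records=10_000, max_invalid_rate=0.20,
                     min_success_rate=0.90):
    """Return a list of alert messages based on the requirement thresholds;
    an empty list means the run is healthy."""
    total = len(records)
    invalid = sum(1 for r in records if not r.get("success"))
    alerts = []
    if total < min_records:
        alerts.append(f"Total records {total} below minimum {min_records}")
    if total and invalid / total > max_invalid_rate:
        alerts.append(f"Invalid record rate {invalid / total:.0%} above {max_invalid_rate:.0%}")
    if total and (total - invalid) / total < min_success_rate:
        alerts.append(f"Success rate {(total - invalid) / total:.0%} below {min_success_rate:.0%}")
    return alerts
```

A function like this can run at the end of every job and feed whatever alerting channel you use.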
Step 2: Define the product data schema (data contract)
Once you know what data you need and where to get it, define a strict schema for it. This prevents malformed or missing data from slipping into your ML workflow.
We’ll use Pydantic to define a reusable data contract that all extractors must return. To create the data contract:
- Make a directory for the entire project. I called mine ml_data_pipeline.
```bash
mkdir ml_data_pipeline
cd ml_data_pipeline
```
- Inside your directory, create a schema folder and a product_data.py file to store your data contract.
- Within your product data file, store all your data schemas for different types of data, like merchant_data or review_data. This keeps your schemas modular, readable and reusable across multiple pipelines.
For this tutorial, we’re defining a product_data.py because we only need product-related data for now.
```bash
mkdir schema
touch schema/product_data.py
```
- In product_data.py, write a ProductData schema using Pydantic
Pydantic is a data validation library for Python. It’s used to define your data schema and offers built-in type validation, error handling and clean serialization, making your pipeline more reliable and maintainable.
It makes sure all incoming data matches expected formats, catching issues before they corrupt your ML workflow. It also integrates well with tools like Prefect and Dagster, making it ideal for modern, production-grade pipelines.
```python
from datetime import datetime, timezone
from pydantic import BaseModel, Field


class ProductData(BaseModel):
    product_id: str
    product_name: str
    price: float
    currency: str = "USD"
    availability: bool
    merchant: str
    # default_factory gives each record a fresh timestamp, rather than one
    # frozen at class-definition time
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    class Config:
        extra = "allow"
```
Here we’re creating a model called ProductData that inherits from BaseModel. This gives us automatic validation, type checking and serialization. Think of it as a contract: Every product record needs to follow this structure. Fields we’re capturing:
| Field | Description |
| --- | --- |
| product_id | Unique identifier for the product (string) |
| product_name | The product title |
| price | Float value; this gets cast/validated automatically |
| currency | Defaults to “USD” unless we specify otherwise |
| availability | Boolean (e.g., in stock = True) |
| merchant | The seller or brand name |
| timestamp | Captures the time the record was created or scraped. We default it to the current UTC time. |
However, we are also setting class Config: extra to “allow”. This instructs Pydantic not to raise an error if extra fields appear in the data. It just lets them pass through. That’s useful if Amazon changes its structure or adds metadata. We don’t lose the data, and the pipeline doesn’t crash.
- Create a test_schema.py file at the root of your project to simulate using the schema.
To make sure our schema actually works the way we expect, we wrote a simple test script using test_schema.py. All it does is take a sample product dictionary (like the kind you’d scrape from Amazon), validate it against the ProductData schema and print it out in dictionary format.
```python
from schema.product_data import ProductData

data = {
    "product_id": "SKU12345",
    "product_name": "Nike Air Max",
    "price": 129.99,
    "availability": True,
    "merchant": "Footlocker",
}

validated = ProductData(**data)
print(validated.dict())
```
Pydantic instantly checks that:
- Price is a float
- Availability is a boolean
- Required fields like product_name and product_id are present
If anything is incorrect, such as passing a string for a price or omitting a required field, you’ll receive a clear, structured validation error. That means when Amazon changes its structure (like swapping class names or removing an element), your pipeline doesn’t just crash, or worse, pass through silent, insufficient data. You’ll catch the problem at the validation step and can log it, raise an alert or route it to a fallback.
You can pair Pydantic with observability tools to know when inputs shift. By failing fast, you prevent broken or incomplete data from polluting your storage, model features or downstream tasks.
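As a quick illustration of failing fast, the sketch below catches Pydantic’s ValidationError and reports the offending fields instead of crashing. The simplified ProductData model and the `safe_validate` helper here are for demonstration only, not part of the pipeline code:

```python
from pydantic import BaseModel, ValidationError

class ProductData(BaseModel):
    product_id: str
    product_name: str
    price: float

def safe_validate(raw: dict):
    """Return (record, None) on success, or (None, [bad field names])
    so bad rows can be logged or routed to a fallback."""
    try:
        return ProductData(**raw), None
    except ValidationError as e:
        return None, [err["loc"][0] for err in e.errors()]

# A price that can't be coerced to float is caught, not crashed on
record, errors = safe_validate(
    {"product_id": "SKU1", "product_name": "Shoe", "price": "not-a-number"}
)
```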
Step 3: Build modular extractors for any site
To make the pipeline modular and source-agnostic, we start by defining a shared interface that all extractors must follow:
- In your root folder, create the extractor directory and base file:
```bash
cd ml_data_pipeline
mkdir extractor
touch extractor/base.py
```
- Inside base.py, add the following:
```python
from abc import ABC, abstractmethod
from typing import List, Dict, Any
from datetime import datetime, timezone
```
These imports allow us to define an abstract base class, specify input/output types and generate UTC timestamps for extracted data.
- Define the BaseExtractor class. This is the base class from which all extractors will inherit.
```python
class BaseExtractor(ABC):
```
- Add the extract() method signature. This method creates a template for extracting data from a single URL.
```python
    @abstractmethod
    async def extract(self, url: str, config: Dict[str, Any]) -> Dict[str, Any]:
        pass
```
- Add the extract_batch() method signature. This method creates a template for extracting data from multiple URLs in one run.
```python
    @abstractmethod
    async def extract_batch(
        self, urls: List[str], config: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        pass
```
- Create a _safe_result() utility method.
```python
    def _safe_result(self, data=None, error=None):
        if data is not None and "timestamp" not in data:
            data["timestamp"] = datetime.now(timezone.utc).isoformat()
        return {
            "success": data is not None,
            "data": data,
            "error": str(error) if error else None,
        }
```
This helper method wraps all extractor responses in a consistent format. If the data is valid, it adds a timestamp (if missing). If an error occurs, it captures the exception without crashing the pipeline.
Step 4: Implement extractors for different use cases
Now that you’ve defined a shared interface (BaseExtractor), the next step is to create real extractors that follow that contract. Each one implements the same two methods — extract() and extract_batch() — but uses a different strategy based on the site’s structure and behavior.
The benefit? Your pipeline stays modular. You can switch between scraping methods — headless browsers, proxy-based services, static HTML fetchers or mock extractors — without touching any downstream logic.
- In an extractor/playwright_extractor.py file, create a PlaywrightExtractor for dynamic, JavaScript-rendered pages.
If a site loads its content dynamically with JavaScript, static HTTP requests won’t cut it. That’s where Playwright comes in. Playwright emulates real browser behavior, allowing you to interact with the page as if you were a user. This extractor uses flexible strategies like XPath heuristics, semantic tags and content-aware fallbacks. That way, it can adapt as site layouts evolve.
```python
from .base import BaseExtractor
from typing import Dict, List
from playwright.async_api import async_playwright
from datetime import datetime, timezone
import re


class PlaywrightExtractor(BaseExtractor):
    async def extract(self, url: str, config: Dict) -> Dict:
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=True)
                page = await browser.new_page()
                await page.goto(url, timeout=30000)
                html = await page.content()

                merchant = url.split("/")[2]
                product_id = None
                product_name = None
                price = None
                currency = "USD"
                availability = None

                # --- Amazon selectors ---
                if "amazon." in merchant:
                    # Product ID (ASIN)
                    asin_match = re.search(r"/dp/([A-Z0-9]{10})", url)
                    product_id = asin_match.group(1) if asin_match else None
                    if not product_id:
                        asin_tag = await page.query_selector("input#ASIN")
                        if asin_tag:
                            product_id = await asin_tag.get_attribute("value")

                    # Product name
                    title_tag = await page.query_selector("#productTitle")
                    if title_tag:
                        product_name = (await title_tag.inner_text()).strip()

                    # Price
                    price_tag = await page.query_selector(".a-offscreen")
                    if price_tag:
                        price_text = (await price_tag.inner_text()).strip()
                        currency = "USD"
                        if price_text.startswith("$"):
                            currency = "USD"
                        elif price_text.startswith("£"):
                            currency = "GBP"
                        elif price_text.startswith("€"):
                            currency = "EUR"
                        price = float(re.sub(r"[^\d.]", "", price_text))

                    # Availability
                    avail_tag = await page.query_selector("#availability")
                    if avail_tag:
                        avail_text = (await avail_tag.inner_text()).lower()
                        availability = (
                            "in stock" in avail_text or "available" in avail_text
                        )

                # --- Jumia example ---
                elif "jumia." in merchant:
                    # Use appropriate selectors for Jumia...
                    title_tag = await page.query_selector("h1.-fs20")
                    if title_tag:
                        product_name = (await title_tag.inner_text()).strip()

                    price_tag = await page.query_selector("span.-b")
                    if price_tag:
                        price_text = (await price_tag.inner_text()).strip()
                        currency = "NGN"
                        price = float(re.sub(r"[^\d.]", "", price_text))

                    avail_tag = await page.query_selector("div.-df.-i-ctr.-gy-5")
                    if avail_tag:
                        avail_text = (await avail_tag.inner_text()).lower()
                        availability = (
                            "in stock" in avail_text or "available" in avail_text
                        )

                # --- Generic fallback ---
                else:
                    # Fallback to generic selectors for title, price, etc.
                    h1 = await page.query_selector("h1")
                    product_name = (await h1.inner_text()).strip() if h1 else None

                    price_sel = await page.query_selector(".price, [class*='price']")
                    price_text = (
                        (await price_sel.inner_text()).strip() if price_sel else None
                    )
                    if price_text:
                        price = float(re.sub(r"[^\d.]", "", price_text))

                    # Try basic keywords for availability
                    html_text = html.lower()
                    if (
                        "out of stock" in html_text
                        or "currently unavailable" in html_text
                    ):
                        availability = False
                    elif "in stock" in html_text or "available" in html_text:
                        availability = True

                    # Product ID fallback: last path part or query param
                    path = url.split("/")
                    product_id = path[-1][:10] if path else None

                # Timestamp
                timestamp = datetime.now(timezone.utc).isoformat()

                await browser.close()

                return self._safe_result(
                    data={
                        "product_id": product_id,
                        "product_name": product_name,
                        "price": price,
                        "currency": currency,
                        "availability": availability,
                        "merchant": merchant,
                        "timestamp": timestamp,
                    }
                )
        except Exception as e:
            return self._safe_result(data=None, error=e)

    async def extract_batch(self, urls: List[str], config: Dict) -> List[Dict]:
        return [await self.extract(u, config) for u in urls]
```
- Create another file, extractor/scraperapi_extractor.py. ScraperAPI programmatically handles proxy rotation, request headers and access control challenges, improving reliability when accessing content at scale. It’s well-suited for high-volume scraping, especially when direct access is inconsistent or blocked.
```python
from .base import BaseExtractor
from typing import Dict, List
import os
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timezone
import re
from urllib.parse import urlparse


class ScraperAPIExtractor(BaseExtractor):
    def __init__(self):
        self.base_url = "https://api.scraperapi.com/"
        # Read the key from the environment (or a secure config source);
        # never hardcode API keys
        self.api_key = os.environ.get("SCRAPERAPI_KEY")

    async def extract(self, url: str, config: Dict) -> Dict:
        payload = {"api_key": self.api_key, "url": url}
        try:
            r = requests.get(self.base_url, params=payload, timeout=30)
            r.raise_for_status()
            html = r.text
            soup = BeautifulSoup(html, "html.parser")
            domain = urlparse(url).netloc.lower()

            # Defaults
            product_id = None
            product_name = None
            price = None
            currency = "USD"
            availability = False
            merchant = domain.split(".")[-2].capitalize()

            # ---------------------- AMAZON ----------------------
            if "amazon" in domain:
                # Product ID (ASIN)
                asin_tag = soup.find("input", {"id": "ASIN"})
                if asin_tag:
                    product_id = asin_tag.get("value", None)
                if not product_id:
                    match = re.search(r"/dp/([A-Z0-9]{10})", url)
                    if match:
                        product_id = match.group(1)

                # Product name
                title_tag = soup.find(id="productTitle")
                product_name = title_tag.text.strip() if title_tag else None

                # Price
                price_tag = soup.find("span", {"class": "a-offscreen"})
                if price_tag:
                    price_text = price_tag.text.strip()
                    if price_text.startswith("$"):
                        currency = "USD"
                    elif price_text.startswith("£"):
                        currency = "GBP"
                    elif price_text.startswith("€"):
                        currency = "EUR"
                    price = float(re.sub(r"[^\d.]", "", price_text))

                # Availability
                availability = self._extract_availability(soup)

                # Merchant
                merchant_tag = soup.find("a", id="bylineInfo")
                if merchant_tag:
                    merchant = merchant_tag.text.strip()

            # ---------------------- JUMIA ----------------------
            elif "jumia" in domain:
                # Product ID (SKU)
                sku_tag = soup.find("input", {"name": "sku"})
                if sku_tag:
                    product_id = sku_tag.get("value", None)

                # Product name
                name_tag = soup.find("h1", {"class": re.compile("prd-title", re.I)})
                product_name = name_tag.text.strip() if name_tag else None

                # Price
                price_tag = soup.find("span", {"class": re.compile("price", re.I)})
                if price_tag:
                    price_text = price_tag.text.strip()
                    price = float(re.sub(r"[^\d.]", "", price_text))
                    currency = "NGN"

                # Availability
                # Jumia hides "out of stock" in product-unavailable; else it's available
                if soup.find("div", {"class": re.compile("product-unavailable", re.I)}):
                    availability = False
                else:
                    availability = True

                # Merchant
                merchant_tag = soup.find(
                    "a", {"class": re.compile("seller-name", re.I)}
                )
                if merchant_tag:
                    merchant = merchant_tag.text.strip()

            # ---------------------- WALMART ----------------------
            elif "walmart" in domain:
                # Product ID (ID in URL)
                match = re.search(r"/(\d{6,})", url)
                if match:
                    product_id = match.group(1)

                # Product name
                name_tag = soup.find("h1")
                product_name = name_tag.text.strip() if name_tag else None

                # Price
                price_tag = soup.find("span", {"class": re.compile("price", re.I)})
                if price_tag:
                    price_text = price_tag.text.strip()
                    price = float(re.sub(r"[^\d.]", "", price_text))
                    currency = "USD"

                # Availability
                availability = self._extract_availability(
                    soup,
                    in_stock_keywords=["in stock", "pickup today", "delivery"],
                    out_stock_keywords=["out of stock", "not available"],
                )

                # Merchant
                merchant = "Walmart"

            # ---------------------- GENERIC (fallback) ----------------------
            else:
                # Try to find a product name by common selectors
                name_tag = soup.find(
                    ["h1", "h2"], {"class": re.compile("(title|name)", re.I)}
                )
                product_name = name_tag.text.strip() if name_tag else None

                # Price
                price_tag = soup.find(
                    "span", {"class": re.compile("(price|amount)", re.I)}
                )
                if price_tag:
                    price_text = price_tag.text.strip()
                    price = float(re.sub(r"[^\d.]", "", price_text))

                # Merchant
                merchant = domain.split(".")[-2].capitalize()

                # Availability (generic keyword scan)
                availability = self._extract_availability(soup)

            # Timestamp
            timestamp = datetime.now(timezone.utc).isoformat()

            return self._safe_result(
                data={
                    "product_id": product_id,
                    "product_name": product_name,
                    "price": price,
                    "currency": currency,
                    "availability": availability,
                    "merchant": merchant,
                    "timestamp": timestamp,
                }
            )
        except Exception as e:
            return self._safe_result(data=None, error=e)

    def _extract_availability(
        self, soup, in_stock_keywords=None, out_stock_keywords=None
    ):
        """Universal availability extraction with keyword scanning."""
        if in_stock_keywords is None:
            in_stock_keywords = [
                "in stock",
                "available to ship",
                "usually ships",
                "order soon",
                "pickup today",
                "delivery",
                "available",
            ]
        if out_stock_keywords is None:
            out_stock_keywords = [
                "out of stock",
                "currently unavailable",
                "temporarily out of stock",
                "not available",
            ]

        # Try id="availability"
        avail_tag = soup.find(attrs={"id": "availability"})
        avail_text = avail_tag.get_text(strip=True).lower() if avail_tag else ""

        # Try class
        if not avail_text:
            avail_tag = soup.find(attrs={"class": re.compile("availability", re.I)})
            if avail_tag:
                avail_text = avail_tag.get_text(strip=True).lower()

        # Fallback: keyword scan
        if not avail_text:
            page_text = soup.get_text(separator=" ", strip=True).lower()
            for txt in in_stock_keywords + out_stock_keywords:
                if txt in page_text:
                    avail_text = txt
                    break

        if avail_text:
            if any(k in avail_text for k in in_stock_keywords):
                return True
            if any(k in avail_text for k in out_stock_keywords):
                return False
        return None  # Unknown

    async def extract_batch(self, urls: List[str], config: Dict) -> List[Dict]:
        return [await self.extract(url, config) for url in urls]
```
Make sure your API key is retrieved from os.environ or passed via configuration.
- Create another file, extractor/mock_extractor.py, for local testing and debugging. This is useful when developing or testing other parts of the pipeline without making real requests.
```python
from .base import BaseExtractor
from typing import Dict, List
import asyncio


class MockExtractor(BaseExtractor):
    async def extract(self, url: str, config: Dict) -> Dict:
        await asyncio.sleep(0.1)  # simulate network latency
        return self._safe_result(
            data={
                "product_id": "SKU123",
                "product_name": "Test Shoe",
                "price": 99.99,
                "currency": "USD",
                "availability": True,
                "merchant": "MockStore",
            }
        )

    async def extract_batch(self, urls: List[str], config: Dict) -> List[Dict]:
        return [await self.extract(url, config) for url in urls]
```
The mock extractor returns dummy data instantly using the BaseExtractor interface. This keeps your pipeline fast and predictable during development.
Step 5: Route extractors with a factory
Create a factory.py file inside your extractor folder. This is where you centralize routing logic. Instead of hardcoding which extractor to use in your main pipeline, you create a small factory function called get_extractor that picks which scraping backend to use, based on the name you pass in.
It supports three options:
- “mock” – for testing locally with fake data (MockExtractor)
- “scraperapi” – to use the ScraperAPI backend (good for sites that block direct requests)
- “playwright” – to run a headless browser and scrape JavaScript-heavy pages
```python
from .mock_extractor import MockExtractor
from .scraperapi_extractor import ScraperAPIExtractor
from .playwright_extractor import PlaywrightExtractor


def get_extractor(name: str = "mock"):
    if name == "scraperapi":
        return ScraperAPIExtractor()
    elif name == "playwright":
        return PlaywrightExtractor()
    elif name == "mock":
        return MockExtractor()
    else:
        raise ValueError(f"Unknown extractor: {name}")
```
Now your main job runner just does:
```python
extractor = get_extractor("playwright")  # or "scraperapi", "mock"
data = await extractor.extract(url, config)
```
Step 6: Clean and validate the data
In the root directory, create a validate.py file where you can clean and validate the data your scrapers are extracting against the data schema you defined in step two.
```python
from schema.product_data import ProductData


def clean_record(raw: dict) -> dict:
    try:
        validated = ProductData(**raw)
        return {"success": True, "data": validated.dict()}
    except Exception as e:
        return {"success": False, "error": str(e), "raw": raw}


def validate_batch(records: list) -> list:
    return [
        clean_record(record.get("data", {}))
        for record in records
        if record.get("success")
    ]
```
Here’s what this code does:
- We’re importing our product schema. This is the ProductData model we defined earlier with Pydantic. It serves as our contract; every record must match it.
- clean_record() validates a single record. It tries to create a ProductData instance from the raw data. If it passes, we return it as a clean dict. If it fails, we return the raw record along with the error, so we don’t just silently drop anything.
- validate_batch() runs the validation across all extractor outputs. We loop through every record where extraction was successful, grab the raw data and validate it using clean_record().
- The result is a list of structured outputs: Clean data or flagged errors.
This helps us catch bad inputs before they pollute the pipeline. Whether it’s a missing field, a type mismatch or a layout shift upstream, this layer protects everything downstream, from storage to model training.
Step 7: Convert to ML formats
The format_for_ml function takes your raw scraped product data and transforms it into a clean, ML-ready DataFrame. It ensures every record has a timestamp, calculates how old each entry is, converts availability to a numeric format for modeling and removes unnecessary columns, so your data is perfectly structured for training machine learning models or running analytics.
```python
from datetime import datetime, timezone

import pandas as pd


def format_for_ml(data: list[dict]) -> pd.DataFrame:
    df = pd.DataFrame(data)
    if "timestamp" not in df.columns:
        df["timestamp"] = datetime.now(timezone.utc)
    # utc=True keeps the column timezone-aware so the subtraction below works
    df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce", utc=True)
    df["days_since_seen"] = (datetime.now(timezone.utc) - df["timestamp"]).dt.days
    df["in_stock"] = df["availability"].astype(int)
    df.drop(columns=["availability"], inplace=True, errors="ignore")
    return df
```
Step 8: Orchestrate with Dagster
Dagster is a data orchestration platform that lets you define each step of your data process as an op (operation). These are connected in a job (a directed graph), which ensures your data is processed in the right order, handles errors and supports powerful logging, testing and scheduling.
In our pipeline:
- Ops are defined for:
- Loading URLs to scrape
- Extracting product data from those URLs
- Validating and cleaning the extracted data
- Saving the final output in JSON and Parquet
- Training an ML model on the processed data
- The job connects all these ops in sequence.
- A schedule triggers this job automatically every 6 hours (using cron).
```python
import asyncio
import os

import pandas as pd
from dagster import (
    Definitions,
    Failure,
    Output,
    ScheduleDefinition,
    failure_hook,
    get_dagster_logger,
    job,
    op,
)

# --- For extraction, validation, and formatting ---
from ml_formatter import format_for_ml
from extractor.factory import get_extractor
from validate import validate_batch


@op
def load_urls_op(context) -> list:
    path = "urls.txt"
    with open(path) as f:
        urls = [line.strip() for line in f if line.strip()]
    if not urls:
        raise Failure("No URLs loaded from file!")
    context.log.info(f"Loaded {len(urls)} URLs.")
    return urls


@op
def extract_data(context, urls: list, extractor_type="scraperapi") -> list:
    extractor = get_extractor(extractor_type)
    results = asyncio.run(extractor.extract_batch(urls, {}))
    if not results or all(r.get("data") is None for r in results):
        raise Failure("All extractions failed.")
    context.log.info(f"Extracted {len(results)} records.")
    return results


@op
def validate_data(context, raw_data: list) -> list:
    validated = validate_batch(raw_data)
    n_valid = sum(1 for v in validated if v.get("success"))
    if n_valid == 0:
        raise Failure("No valid records after validation!")
    context.log.info(f"Validated {n_valid} records.")
    return validated


@op
def save_data(context, validated: list) -> int:
    os.makedirs("output", exist_ok=True)
    new_df = format_for_ml([v["data"] for v in validated if v.get("success")])

    json_path = "output/data.json"
    parquet_path = "output/data.parquet"

    if os.path.exists(json_path):
        old_df = pd.read_json(json_path)
        combined_df = pd.concat([old_df, new_df], ignore_index=True)
        if not pd.api.types.is_datetime64_any_dtype(combined_df["timestamp"]):
            combined_df["timestamp"] = pd.to_datetime(
                combined_df["timestamp"], errors="coerce"
            )
        combined_df = combined_df.sort_values("timestamp").drop_duplicates(
            "product_id", keep="last"
        )
    else:
        combined_df = new_df

    combined_df.to_parquet(parquet_path, index=False)
    combined_df.to_json(json_path, orient="records", indent=2, date_format="iso")

    if len(combined_df) < 10:
        raise Failure(f"ALERT: Only {len(combined_df)} records saved!")
    context.log.info(f"Saved {len(combined_df)} records.")
    return len(combined_df)
```
Step 9: Add monitoring and alerts
No pipeline is reliable without monitoring! Here, we set up proactive failure alerts:
- If any step fails (e.g., data extraction returns no results or validation finds no suitable records), the pipeline sends an email notification.
- This uses Dagster’s @failure_hook mechanism.
- The email contains info about which job, which op, the run ID and the error message.
This makes sure you never “silently” miss a failure; problems are brought to your attention immediately, not weeks later when the business is affected.
```python
import smtplib
from email.mime.text import MIMEText

from dagster import failure_hook

# --- Email config (edit these) ---
EMAIL_FROM = "your_email@gmail.com"
EMAIL_TO = "your_destination_email@gmail.com"
EMAIL_PASSWORD = "your_app_password"  # For Gmail, use an "App Password", not your main password
SMTP_SERVER = "smtp.gmail.com"
SMTP_PORT = 587


def send_failure_email(subject, body):
    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = EMAIL_FROM
    msg["To"] = EMAIL_TO
    try:
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()
            server.login(EMAIL_FROM, EMAIL_PASSWORD)
            server.sendmail(EMAIL_FROM, [EMAIL_TO], msg.as_string())
    except Exception as e:
        print("Email alert failed:", e)


@failure_hook
def email_failure_hook(context):
    subject = f"Dagster Pipeline FAILURE: {context.job_name}"
    body = (
        f"Job: {context.job_name}\n"
        f"Op: {context.op.name}\n"
        f"Run: {context.run_id}\n"
        f"Error: {context.failure_description}"
    )
    send_failure_email(subject, body)
```
Step 10: Connect to ML pipeline
The final step of our process is to train a machine learning model on the newly validated, deduplicated data.
This is part of the same orchestrated flow, so you never have stale or out-of-date models.
The ML step:
- Loads the latest saved data
- Trains a regression model (RandomForestRegressor) to predict product prices
It also stores the model and training metrics for future reference. If there isn't enough data, or a required column is missing, it raises a Failure, which triggers the alerting hook. This keeps your analytics or recommendation system up to date.
```python
@op
def train_ml_model(context):
    import pandas as pd
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import mean_squared_error
    import joblib
    import os

    df = pd.read_parquet("output/data.parquet")
    if df.empty:
        raise Failure("No data to train ML model.")
    features = ["availability"]  # You can add more features as needed
    if not all(f in df.columns for f in features):
        raise Failure(f"Missing feature columns: {features}")
    X = df[features].astype(float)
    y = df["price"].astype(float)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    model = RandomForestRegressor(n_estimators=10, random_state=42)
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)
    mse = mean_squared_error(y_test, predictions)
    context.log.info(f"ML Model trained. Test MSE: {mse:.4f}")
    os.makedirs("models", exist_ok=True)
    joblib.dump(model, "models/price_predictor.joblib")
    with open("models/metrics.txt", "w") as f:
        f.write(f"Test MSE: {mse:.4f}\n")
    return {"test_mse": mse}
```
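The metrics file can double as a cheap quality check: before overwriting it, compare the new MSE against the previous run's value and alert if model quality dropped sharply. A minimal sketch, matching the `Test MSE: …` line format written above; the `mse_regressed` helper and its tolerance threshold are illustrative, not part of the repo:

```python
import re
from pathlib import Path


def mse_regressed(metrics_path, new_mse, tolerance=1.10):
    """Return True if new_mse is worse than the last recorded MSE
    by more than `tolerance`x (e.g. 1.10 = 10% worse)."""
    path = Path(metrics_path)
    if not path.exists():
        return False  # first run: nothing to compare against
    match = re.search(r"Test MSE:\s*([0-9.]+)", path.read_text())
    if not match:
        return False  # unreadable metrics file: don't block the run
    previous = float(match.group(1))
    return new_mse > previous * tolerance
```

Inside `train_ml_model`, you could call this just before writing the new metrics file and either log a warning or raise a `Failure` to trigger the email hook.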
Bringing orchestration together: Jobs and schedules
Now we connect everything into a single Dagster job: data loading, extraction, validation, training and monitoring. We also schedule it to run every six hours and attach our failure alert hook.
```python
from dagster import (
    op,
    job,
    Failure,
    failure_hook,
    ScheduleDefinition,
    Definitions,
)
import os
import asyncio
import smtplib
from email.mime.text import MIMEText

import pandas as pd
import joblib
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# -- Your own imports for extraction & ML --
from ml_formatter import format_for_ml
from extractors.factory import get_extractor
from validator import validate_batch

# -- Email Config (edit these) --
EMAIL_FROM = "your_email@gmail.com"
EMAIL_TO = "your_destination_email@gmail.com"
EMAIL_PASSWORD = "your_app_password"  # For Gmail, use an "App Password", not your main password
SMTP_SERVER = "smtp.gmail.com"
SMTP_PORT = 587


# -- Email notification hook --
def send_failure_email(subject, body):
    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = EMAIL_FROM
    msg["To"] = EMAIL_TO
    try:
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
            server.starttls()
            server.login(EMAIL_FROM, EMAIL_PASSWORD)
            server.sendmail(EMAIL_FROM, [EMAIL_TO], msg.as_string())
    except Exception as e:
        print("Email alert failed:", e)


@failure_hook
def email_failure_hook(context):
    subject = f"Dagster Pipeline FAILURE: {context.job_name}"
    body = (
        f"Job: {context.job_name}\n"
        f"Op: {context.op.name}\n"
        f"Run: {context.run_id}\n"
        f"Error: {context.failure_description}"
    )
    send_failure_email(subject, body)


# -- Ops --
@op
def load_urls_op(context) -> list:
    path = "urls.txt"
    with open(path) as f:
        urls = [line.strip() for line in f if line.strip()]
    if not urls:
        raise Failure("No URLs loaded from file!")
    context.log.info(f"Loaded {len(urls)} URLs.")
    return urls


@op
def extract_data(context, urls: list, extractor_type="scraperapi") -> list:
    extractor = get_extractor(extractor_type)
    results = asyncio.run(extractor.extract_batch(urls, {}))
    if not results or all(r.get("data") is None for r in results):
        raise Failure("All extractions failed.")
    context.log.info(f"Extracted {len(results)} records.")
    return results


@op
def validate_data(context, raw_data: list) -> list:
    validated = validate_batch(raw_data)
    n_valid = sum(1 for v in validated if v.get("success"))
    if n_valid == 0:
        raise Failure("No valid records after validation!")
    context.log.info(f"Validated {n_valid} records.")
    return validated


@op
def save_data(context, validated: list) -> int:
    os.makedirs("output", exist_ok=True)
    new_df = format_for_ml([v["data"] for v in validated if v.get("success")])
    json_path = "output/data.json"
    parquet_path = "output/data.parquet"
    if os.path.exists(json_path):
        old_df = pd.read_json(json_path)
        combined_df = pd.concat([old_df, new_df], ignore_index=True)
        if not pd.api.types.is_datetime64_any_dtype(combined_df["timestamp"]):
            combined_df["timestamp"] = pd.to_datetime(
                combined_df["timestamp"], errors="coerce"
            )
        combined_df = combined_df.sort_values("timestamp").drop_duplicates(
            "product_id", keep="last"
        )
    else:
        combined_df = new_df
    combined_df.to_parquet(parquet_path, index=False)
    combined_df.to_json(json_path, orient="records", indent=2, date_format="iso")
    if len(combined_df) < 10:
        raise Failure(f"ALERT: Only {len(combined_df)} records saved!")
    context.log.info(f"Saved {len(combined_df)} records.")
    return len(combined_df)


@op
def train_ml_model(context, n_saved: int):
    # Taking n_saved as an input makes this op depend on save_data,
    # so training always runs after the fresh parquet file is written.
    df = pd.read_parquet("output/data.parquet")
    if df.empty:
        raise Failure("No data to train ML model.")
    # Minimal preprocessing
    if "price" in df.columns and "availability" in df.columns:
        features = ["availability"]  # You can add more features here
        X = df[features].astype(float)
        y = df["price"].astype(float)
    else:
        raise Failure("Required columns ('price', 'availability') not found in data.")
    # Train/test split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    # Model
    model = RandomForestRegressor(n_estimators=10, random_state=42)
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)
    mse = mean_squared_error(y_test, predictions)
    context.log.info(f"ML Model trained. Test MSE: {mse:.4f}")
    # Save the model to file for future prediction runs
    os.makedirs("models", exist_ok=True)
    joblib.dump(model, "models/price_predictor.joblib")
    # Optionally, save metrics for monitoring
    with open("models/metrics.txt", "w") as f:
        f.write(f"Test MSE: {mse:.4f}\n")
    return {"test_mse": mse}


# -- Job (connect all) --
@job(hooks={email_failure_hook})
def ecommerce_monitoring_job():
    urls = load_urls_op()
    raw = extract_data(urls)
    validated = validate_data(raw)
    n = save_data(validated)
    train_ml_model(n)


ecommerce_monitoring_schedule = ScheduleDefinition(
    job=ecommerce_monitoring_job,
    cron_schedule="0 */6 * * *",  # Every 6 hours
)

defs = Definitions(
    jobs=[ecommerce_monitoring_job],
    schedules=[ecommerce_monitoring_schedule],
)
```
Save the file as `pipeline_dagster.py` and use `dagster dev -f pipeline_dagster.py` to run the Dagster server locally.
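Before wiring everything into the scheduler, it helps to smoke-test individual rules in a plain Python session. For example, the keep-newest-per-product dedup rule that `save_data` applies can be exercised in isolation; the sample records below are made up, but the column names match the pipeline above:

```python
import pandas as pd

# Toy records: two snapshots of product "A"; keep-last should retain the newer price.
df = pd.DataFrame(
    {
        "product_id": ["A", "A", "B"],
        "price": [10.0, 12.0, 5.0],
        "timestamp": pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-01"]),
    }
)

# Same rule save_data applies: sort by timestamp, keep the newest row per product_id.
deduped = df.sort_values("timestamp").drop_duplicates("product_id", keep="last")

assert set(deduped["product_id"]) == {"A", "B"}
assert deduped.loc[deduped["product_id"] == "A", "price"].item() == 12.0
```

Running these few lines takes seconds and catches schema or ordering mistakes long before a scheduled six-hour cycle would surface them.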
An automated pipeline built for the real world
A production-ready data extraction pipeline isn’t one that never fails; it’s one that knows how to fail well. It isolates errors, prevents incomplete or invalid data from flowing downstream and keeps the rest of the system running smoothly, even when the web gets unpredictable.
You can also use this pipeline as a template and scale your extraction logic, expand your sources and validate multiple data types with minimal overhead.
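When you expand to new data types, schema validation is usually the first thing to generalize. A minimal sketch of what a Pydantic-based validator could look like, producing the same `{"success": ..., "data": ...}` record shape the ops above consume; the `ProductRecord` fields and the `validate_records` helper are illustrative, and the repo's actual `validator.py` may differ:

```python
from pydantic import BaseModel, ValidationError


# Hypothetical record schema -- adjust fields to your own sources.
class ProductRecord(BaseModel):
    product_id: str
    price: float
    availability: int
    timestamp: str


def validate_records(raw_records):
    """Split raw dicts into successes and failures instead of
    letting one bad record break the whole batch."""
    results = []
    for raw in raw_records:
        try:
            record = ProductRecord(**raw)
            # model_dump() is Pydantic v2; fall back to .dict() on v1.
            data = record.model_dump() if hasattr(record, "model_dump") else record.dict()
            results.append({"success": True, "data": data})
        except (ValidationError, TypeError) as exc:
            results.append({"success": False, "error": str(exc)})
    return results
```

Adding a new data type then means adding a new model class, not rewriting the pipeline.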
Clone the whole pipeline from the GitHub repo and customize it to your data sources, schema, and ML use case.