$59
Spark ETL Framework
Production PySpark ETL framework with schema validation, data quality checks, incremental processing, and medallion architecture.
Python · YAML · Markdown · JSON · Databricks · PySpark · Spark · Delta Lake
📁 File Structure (16 files)
spark-etl-framework/
├── LICENSE
├── README.md
├── configs/
│   ├── pipeline_config.yaml
│   └── quality_rules.yaml
├── guides/
│   └── etl-patterns.md
├── notebooks/
│   ├── backfill.py
│   └── run_pipeline.py
├── src/
│   ├── bronze_loader.py
│   ├── config_manager.py
│   ├── etl_base.py
│   ├── gold_aggregator.py
│   ├── quality_gate.py
│   └── silver_transformer.py
└── tests/
    ├── conftest.py
    └── test_etl_base.py
📖 Documentation Preview (README excerpt)
Spark ETL Framework
Production-ready medallion architecture ETL framework for Databricks and Apache Spark.
Build reliable, observable, and maintainable data pipelines with a battle-tested extract-transform-load pattern that scales from prototype to petabyte.
---
What You Get
- Abstract ETL base class with built-in logging, metrics collection, and error handling
- Medallion architecture (Bronze / Silver / Gold) with production patterns baked in
- Data quality gates between every layer — catch issues before they propagate
- YAML-driven configuration with environment overrides and secret scope integration
- Databricks notebooks for orchestration and date-range backfills
- Comprehensive test suite with mock Spark sessions and fixture data
- Architecture guide covering idempotency, partitioning, and error recovery
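The feature list leads with the abstract ETL base class, but `etl_base.py` itself is not shown in this preview. As a rough illustration only, the template-method pattern it describes (built-in logging and metrics around extract/transform/load) can be sketched in plain Python; the `run` method, the metrics dict keys, and the constructor signature here are assumptions, not the product's exact API:

```python
import logging
import time
from abc import ABC, abstractmethod
from typing import Any, Dict


class ETLBase(ABC):
    """Template-method pipeline: extract -> transform -> load.

    Subclasses implement the three stages; run() wraps them with
    logging, timing, and a status metric.
    """

    def __init__(self, pipeline_name: str) -> None:
        self.pipeline_name = pipeline_name
        self.logger = logging.getLogger(pipeline_name)
        self.metrics: Dict[str, Any] = {}

    @abstractmethod
    def extract(self) -> Any: ...

    @abstractmethod
    def transform(self, data: Any) -> Any: ...

    @abstractmethod
    def load(self, data: Any) -> None: ...

    def run(self) -> Dict[str, Any]:
        """Execute the pipeline, recording duration and final status."""
        start = time.monotonic()
        try:
            self.load(self.transform(self.extract()))
            self.metrics["status"] = "succeeded"
        except Exception:
            self.metrics["status"] = "failed"
            self.logger.exception("pipeline %s failed", self.pipeline_name)
            raise
        finally:
            self.metrics["duration_s"] = time.monotonic() - start
        return self.metrics
```

A concrete loader then only has to fill in the three stages, and every pipeline gets the same logging and metrics behavior for free.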
File Tree
```
spark-etl-framework/
├── README.md
├── manifest.json
├── LICENSE
├── src/
│   ├── etl_base.py            # Abstract base ETL class (~200 lines)
│   ├── bronze_loader.py       # Bronze layer ingestion (~150 lines)
│   ├── silver_transformer.py  # Silver layer transforms (~200 lines)
│   ├── gold_aggregator.py     # Gold layer aggregations (~150 lines)
│   ├── quality_gate.py        # Inter-layer quality checks (~120 lines)
│   └── config_manager.py      # YAML config + env overrides (~100 lines)
├── configs/
│   ├── pipeline_config.yaml   # Pipeline configuration
│   └── quality_rules.yaml     # Quality rule definitions
├── notebooks/
│   ├── run_pipeline.py        # Orchestration entry point
│   └── backfill.py            # Date-range backfill utility
├── tests/
│   ├── conftest.py            # Spark fixtures & sample data
│   └── test_etl_base.py       # Unit tests
└── guides/
    └── etl-patterns.md        # Architecture & patterns guide
```
Getting Started
1. Configure Your Pipeline
Edit configs/pipeline_config.yaml to define your source, destination, and quality rules:
```yaml
pipeline:
  name: "customer_orders"
  schedule: "0 6 * * *"

source:
  format: "json"
  path: "/mnt/raw/customer_orders/"
```
*... continues with setup instructions, usage examples, and more.*
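The feature list describes `config_manager.py` as YAML configuration with environment overrides; its code is not part of this preview. One common way to layer environment variables onto an already-parsed config dict looks like the sketch below. The `ETL__` variable prefix and the double-underscore nesting convention are assumptions for illustration, not necessarily what the product uses:

```python
import copy
import os
from typing import Any, Dict


def apply_env_overrides(config: Dict[str, Any], prefix: str = "ETL__") -> Dict[str, Any]:
    """Return a copy of config with values overridden from the environment.

    ETL__SOURCE__PATH=/mnt/new overrides config["source"]["path"]:
    the prefix is stripped, and double underscores mark nesting levels.
    """
    result = copy.deepcopy(config)
    for var, value in os.environ.items():
        if not var.startswith(prefix):
            continue
        keys = var[len(prefix):].lower().split("__")
        node = result
        for key in keys[:-1]:
            node = node.setdefault(key, {})
        node[keys[-1]] = value
    return result
```

This keeps `pipeline_config.yaml` as the single source of defaults while letting a Databricks job (or CI) redirect paths per environment without editing the file.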
📄 Code Sample (.py preview)
src/bronze_loader.py
"""
Bronze Layer Loader — Spark ETL Framework
============================================
Ingests raw data into the Bronze (landing) layer of the medallion
architecture. Automatically infers schema, appends audit metadata
columns, and writes to Delta format.
Metadata columns added:
_source_file — originating file path
_ingested_at — UTC ingestion timestamp
_batch_id — unique batch identifier
By Datanest Digital — https://datanest.dev
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType, TimestampType
from src.etl_base import ETLBase
class BronzeLoader(ETLBase):
"""
Load raw files into the Bronze Delta table with metadata enrichment.
Supports JSON, CSV, Parquet, and Avro source formats. Schema can be
explicitly provided via config or auto-inferred from the source data.
Config keys (under ``source``):
format: str — file format (json | csv | parquet | avro)
path: str — source directory path
options: dict — extra reader options (e.g. ``{"header": "true"}``)
schema: dict — optional explicit StructType definition
Config keys (under ``destination``):
database: str — target database / catalog
table: str — target table name
path: str — Delta table storage path (optional)
"""
def __init__(
self,
spark: SparkSession,
pipeline_name: str,
# ... 127 more lines ...
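In Spark, a loader like this would typically attach the three audit columns with `withColumn` (for example `F.input_file_name()` for `_source_file` and `F.current_timestamp()` for the ingestion time). Since the enrichment logic is truncated in the preview, here is a Spark-free sketch of just the audit values; the helper name `audit_columns` is an assumption for illustration:

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Dict


def audit_columns(source_file: str) -> Dict[str, Any]:
    """Build the Bronze audit metadata for one ingested file.

    Mirrors the columns described in the module docstring:
    _source_file, _ingested_at (UTC), and a unique _batch_id.
    """
    return {
        "_source_file": source_file,
        "_ingested_at": datetime.now(timezone.utc),  # timezone-aware UTC
        "_batch_id": uuid.uuid4().hex,               # 32-char hex identifier
    }
```

Carrying these columns from Bronze onward is what makes later layers traceable: any Silver or Gold row can be tied back to the exact file and batch that produced it.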