
Streaming Pipeline Kit

$49

Kafka consumer/producer templates, Spark Structured Streaming jobs, exactly-once processing patterns, and dead letter queues.

📁 15 files · 🏷 v1.0.0

Python · YAML · JSON · Markdown · Databricks · Spark · Delta Lake

📁 File Structure (15 files)

streaming-pipeline-kit/
├── LICENSE
├── README.md
├── configs/
│   ├── schemas/
│   │   ├── order_events.avsc
│   │   └── user_events.avsc
│   └── streaming_config.yaml
├── guides/
│   └── streaming-patterns.md
├── notebooks/
│   ├── monitor_streams.py
│   └── start_stream.py
├── src/
│   ├── event_processor.py
│   ├── kafka_consumer.py
│   ├── schema_registry.py
│   ├── stream_monitor.py
│   └── stream_to_delta.py
└── tests/
    └── test_event_processor.py

📖 Documentation Preview (README excerpt)

Streaming Pipeline Kit

Real-time data pipelines that just work. Production-ready Spark Structured Streaming templates with Kafka integration, exactly-once semantics, and Delta Lake sinks.

By [Datanest Digital](https://datanest.dev) | Version 1.0.0 | $49

---

What You Get

  • Kafka Consumer — Structured Streaming reader with schema registry integration, watermarking, and configurable checkpointing
  • Event Processor — Deduplication by event ID, late-arrival handling, and windowed aggregations with customizable window sizes
  • Delta Lake Writer — foreachBatch sink with merge/append modes, schema evolution, and inline data quality checks
  • Stream Monitor — Query progress listener, consumer lag tracking, dead letter queue routing, and alerting hooks
  • Schema Registry Client — Confluent-compatible client for fetching, registering, and validating Avro schemas with compatibility checks
  • Databricks Notebooks — Ready-to-run notebooks for starting streams and monitoring active queries in real time
  • Avro Schemas — Example schemas for user events and order events
  • Streaming Patterns Guide — Best practices for watermarks, triggers, state management, and failure recovery
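For orientation, the pieces above typically compose into one Structured Streaming pipeline: a Kafka source with a watermark, event-ID deduplication, and a Delta sink. The sketch below is illustrative only; the topic name, schema, paths, and option values are placeholder assumptions, not the kit's actual API.

```python
def build_order_stream(spark, bootstrap_servers: str, topic: str,
                       delta_path: str, checkpoint_path: str):
    """Kafka -> parse -> watermark + dedup -> Delta. Placeholder names throughout."""
    # pyspark is imported inside the function so this sketch can be read
    # (and imported) without a local Spark installation.
    from pyspark.sql import functions as F

    raw = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .option("startingOffsets", "latest")
        .load()
    )

    events = (
        raw.selectExpr("CAST(value AS STRING) AS json")
        # In the kit the schema would come from the registry; hard-coded here.
        .select(F.from_json(
            "json", "event_id STRING, amount DOUBLE, event_timestamp TIMESTAMP"
        ).alias("e"))
        .select("e.*")
        .withWatermark("event_timestamp", "10 minutes")  # bounds dedup state
        .dropDuplicates(["event_id"])                    # one row per event ID
    )

    return (
        events.writeStream.format("delta")
        .option("checkpointLocation", checkpoint_path)
        .outputMode("append")
        .start(delta_path)
    )
```

The watermark is what keeps deduplication state bounded: without it, Spark would have to remember every event ID it has ever seen.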

File Tree


streaming-pipeline-kit/
├── README.md
├── manifest.json
├── LICENSE
├── src/
│   ├── kafka_consumer.py          # Kafka source with schema registry
│   ├── event_processor.py         # Dedup, late arrivals, windowed aggs
│   ├── stream_to_delta.py         # foreachBatch Delta Lake writer
│   ├── stream_monitor.py          # Progress listener & lag monitoring
│   └── schema_registry.py         # Schema Registry client
├── configs/
│   ├── streaming_config.yaml      # Kafka, checkpoint, trigger settings
│   └── schemas/
│       ├── user_events.avsc       # User event Avro schema
│       └── order_events.avsc      # Order event Avro schema
├── notebooks/
│   ├── start_stream.py            # Launch streaming pipeline
│   └── monitor_streams.py         # Real-time monitoring dashboard
├── tests/
│   └── test_event_processor.py    # Unit tests for event processing
└── guides/
    └── streaming-patterns.md      # Patterns & best practices

Getting Started

1. Configure your environment

Edit configs/streaming_config.yaml with your Kafka bootstrap servers, schema registry URL, and checkpoint locations:
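As a rough illustration, that file might take a shape like the following; these key names and values are guesses for illustration and may not match the kit's actual config schema:

```yaml
# Illustrative only — actual keys in streaming_config.yaml may differ.
kafka:
  bootstrap_servers: "broker-1:9092,broker-2:9092"
  topic: "order_events"
  starting_offsets: "latest"
schema_registry:
  url: "https://schema-registry.example.com:8081"
checkpoints:
  base_path: "/mnt/checkpoints/streaming-pipeline"
trigger:
  processing_time: "30 seconds"
```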

... continues with setup instructions, usage examples, and more.

📄 Code Sample (.py preview)

src/event_processor.py

"""
Event Processor — Deduplication, late-arrival handling, and windowed aggregations.

Provides a composable processing pipeline for streaming events:
- Exact deduplication by event ID within watermark window
- Late-arrival detection and routing to a separate stream
- Tumbling and sliding window aggregations
- Custom transformation hooks

Designed for Databricks Runtime — uses global `spark` session.

By Datanest Digital | https://datanest.dev
"""
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Optional

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType
from pyspark.sql.window import Window


class WindowType(Enum):
    """Supported window aggregation types."""
    TUMBLING = "tumbling"
    SLIDING = "sliding"
    SESSION = "session"


@dataclass
class WindowConfig:
    """Configuration for windowed aggregations."""
    window_type: WindowType = WindowType.TUMBLING
    window_duration: str = "5 minutes"
    slide_duration: Optional[str] = None  # Only for sliding windows
    event_time_column: str = "event_timestamp"
    group_columns: list[str] = field(default_factory=list)
    aggregations: dict[str, str] = field(default_factory=dict)
    # Example: {"amount": "sum", "event_id": "count"}


@dataclass
class DeduplicationConfig:
    """Configuration for event deduplication."""
    # ... 336 more lines ...
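To show how a WindowConfig like the one in the sample might be constructed, here is a small standalone sketch. The classes are re-declared locally so the snippet runs on its own, and the sliding-window values are arbitrary examples, not kit defaults.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional

# Re-declared here to mirror the WindowType/WindowConfig shown in the
# sample above, so this snippet is self-contained.
class WindowType(Enum):
    TUMBLING = "tumbling"
    SLIDING = "sliding"
    SESSION = "session"

@dataclass
class WindowConfig:
    window_type: WindowType = WindowType.TUMBLING
    window_duration: str = "5 minutes"
    slide_duration: Optional[str] = None  # Only for sliding windows
    event_time_column: str = "event_timestamp"
    group_columns: list[str] = field(default_factory=list)
    aggregations: dict[str, str] = field(default_factory=dict)

# A 10-minute sliding window advancing every 2 minutes, grouped per user
# (illustrative values):
cfg = WindowConfig(
    window_type=WindowType.SLIDING,
    window_duration="10 minutes",
    slide_duration="2 minutes",
    group_columns=["user_id"],
    aggregations={"amount": "sum", "event_id": "count"},
)
print(cfg.window_type.value)  # sliding
```

The aggregations mapping pairs each column with an aggregate function name, matching the `{"amount": "sum", "event_id": "count"}` example in the sample's docstring comment.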