
CDC Replication Toolkit

$49

Change data capture patterns with Debezium, database replication scripts, event sourcing integration, and consistency checks.

📁 19 files · 🏷 v1.0.0
Python · YAML · Markdown · JSON · Databricks · PostgreSQL

📁 File Structure (19 files)

cdc-replication-toolkit/
├── LICENSE
├── README.md
├── configs/
│   ├── replication_config.yaml
│   └── source_mappings/
│       ├── mysql_to_delta.yaml
│       ├── postgres_to_delta.yaml
│       └── sqlserver_to_delta.yaml
├── guides/
│   └── cdc-replication-guide.md
├── notebooks/
│   ├── replication_status.py
│   └── start_replication.py
├── src/
│   ├── cdc_processor.py
│   ├── debezium_parser.py
│   ├── merge_applier.py
│   ├── offset_manager.py
│   ├── replication_monitor.py
│   └── schema_mapper.py
└── tests/
    ├── conftest.py
    ├── test_cdc_processor.py
    └── test_debezium_parser.py

📖 Documentation Preview (README excerpt)

CDC Replication Toolkit

Production-ready change data capture pipeline for Databricks — Debezium parsing, MERGE INTO application, offset management, and replication monitoring.

By [Datanest Digital](https://datanest.dev) | Version 1.0.0 | $49

---

What You Get

  • CDC Event Processor: processes insert, update, and delete events and applies them to target Delta tables
  • Debezium Parser: parses the Debezium JSON envelope format, including before/after row images
  • MERGE Applier: applies CDC changes via MERGE INTO, supporting create, update, delete, and snapshot-read operations
  • Offset Manager: tracks Kafka offsets and LSN positions with checkpoint management
  • Schema Mapper: maps source database schemas to Delta targets with type conversion
  • Replication Monitor: tracks replication lag, throughput, and error rates
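The core MERGE semantics behind the processor and applier can be illustrated without Spark. The sketch below is a minimal, hypothetical model. The names `latest_per_key` and `apply_changes` are not the toolkit's API. A plain dict stands in for the Delta table, and events are Debezium-style dicts with `op`, `before`, and `after`. It assumes events for a given key arrive in commit order, which Debezium's default keying by primary key preserves within a Kafka partition. The batch is collapsed to one event per key first because Delta's MERGE INTO fails when several source rows match the same target row.

```python
from typing import Any, Dict, Iterable

Row = Dict[str, Any]


def latest_per_key(events: Iterable[Row], key: str) -> Dict[Any, Row]:
    """Collapse a batch to the last event per primary key (hypothetical helper).

    Assumes events are in commit order for each key. Deletes carry the key
    in the "before" image; creates, updates and snapshot reads in "after".
    """
    latest: Dict[Any, Row] = {}
    for event in events:
        image = event["before"] if event["op"] == "d" else event["after"]
        latest[image[key]] = event  # later events overwrite earlier ones
    return latest


def apply_changes(target: Dict[Any, Row], events: Iterable[Row], key: str) -> Dict[str, int]:
    """Apply one batch to `target` in place, mirroring a three-clause MERGE."""
    counts = {"inserted": 0, "updated": 0, "deleted": 0}
    for pk, event in latest_per_key(events, key).items():
        if event["op"] == "d":
            # WHEN MATCHED AND op = 'd' THEN DELETE (unmatched deletes are no-ops)
            if pk in target:
                del target[pk]
                counts["deleted"] += 1
        elif pk in target:
            # WHEN MATCHED THEN UPDATE
            target[pk] = dict(event["after"])
            counts["updated"] += 1
        else:
            # WHEN NOT MATCHED THEN INSERT (c, u, or r with no existing row)
            target[pk] = dict(event["after"])
            counts["inserted"] += 1
    return counts
```

In a Spark job, the same collapse step would typically be a window over the primary key that keeps the highest Kafka offset or LSN. That step would run ahead of a single MERGE INTO with a delete clause for `op = 'd'`.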

File Tree


cdc-replication-toolkit/
├── README.md
├── manifest.json
├── LICENSE
├── src/
│   ├── cdc_processor.py            # CDC event processing engine
│   ├── debezium_parser.py          # Debezium JSON format parser
│   ├── merge_applier.py            # MERGE INTO change application
│   ├── offset_manager.py           # Kafka offset & LSN tracking
│   ├── schema_mapper.py            # Source→target schema mapping
│   └── replication_monitor.py      # Lag, throughput, error monitoring
├── configs/
│   ├── replication_config.yaml     # Main replication configuration
│   └── source_mappings/
│       ├── postgres_to_delta.yaml  # PostgreSQL type mappings
│       ├── mysql_to_delta.yaml     # MySQL type mappings
│       └── sqlserver_to_delta.yaml # SQL Server type mappings
├── notebooks/
│   ├── start_replication.py        # Start CDC streaming pipeline
│   └── replication_status.py       # Replication status dashboard
├── tests/
│   ├── conftest.py                 # Shared fixtures
│   ├── test_cdc_processor.py       # CDC processing tests
│   └── test_debezium_parser.py     # Debezium parsing tests
└── guides/
    └── cdc-replication-guide.md    # CDC patterns & Debezium guide
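The monitoring role of `replication_monitor.py` (lag, throughput, error monitoring) reduces to simple arithmetic per micro-batch. This is a minimal sketch with several assumptions. Lag is taken as apply time minus the Debezium `source.ts_ms`, which is when the change was made in the source database. The caller supplies the error count. `BatchStats` and `batch_stats` are hypothetical names, not the module's API.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class BatchStats:
    """Hypothetical per-batch metrics record."""

    events: int
    errors: int
    max_lag_ms: Optional[int]  # None when no event carried a source timestamp
    throughput_eps: float      # events per second of batch processing time
    error_rate: float          # failed events / total events


def batch_stats(
    source_ts_ms: List[Optional[int]],
    applied_at_ms: int,
    duration_s: float,
    errors: int = 0,
) -> BatchStats:
    """Summarise one micro-batch.

    source_ts_ms: source change timestamps (ms since epoch), one per event.
    applied_at_ms: when the batch was committed to the target (ms since epoch).
    duration_s: wall-clock time spent processing the batch.
    errors: how many of the events failed to apply.
    """
    if duration_s <= 0:
        raise ValueError("duration_s must be positive")
    events = len(source_ts_ms)
    # End-to-end lag per event: target commit time minus source change time.
    lags = [applied_at_ms - ts for ts in source_ts_ms if ts is not None]
    return BatchStats(
        events=events,
        errors=errors,
        max_lag_ms=max(lags) if lags else None,
        throughput_eps=events / duration_s,
        error_rate=errors / events if events else 0.0,
    )
```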

Getting Started

1. Configure Source Mappings

Edit configs/replication_config.yaml with your source database and Kafka settings:


source:
  type: postgres
  kafka_bootstrap_servers: "broker1:9092,broker2:9092"
  topic_prefix: "dbserver1"
  tables:

*... continues with setup instructions, usage examples, and more.*
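Once the `source` block from `configs/replication_config.yaml` is loaded (for example with `yaml.safe_load`), it maps fairly directly onto the options of Spark's Kafka source. This is a minimal sketch with two assumptions. First, `tables` is taken to be a list of `schema.table` strings; the excerpt stops at `tables:`, so its real shape is unknown. Second, topics follow the Debezium PostgreSQL naming scheme `<topic prefix>.<schema>.<table>`. `kafka_reader_options` is a hypothetical helper.

```python
from typing import Any, Dict, List


def kafka_reader_options(source: Dict[str, Any]) -> Dict[str, str]:
    """Hypothetical helper: map a parsed `source` block to Kafka source options.

    Assumes source["tables"] is a list of "schema.table" strings.
    """
    tables: List[str] = source.get("tables") or []  # empty "tables:" loads as None
    if not tables:
        raise ValueError("source.tables must list at least one table")
    # Debezium's PostgreSQL connector names topics <prefix>.<schema>.<table>.
    topics = [f"{source['topic_prefix']}.{t}" for t in tables]
    return {
        "kafka.bootstrap.servers": source["kafka_bootstrap_servers"],
        "subscribe": ",".join(topics),
        # Only consulted when the streaming query starts without a checkpoint.
        "startingOffsets": "earliest",
    }


# In a Databricks notebook the result would feed the Kafka source, e.g.:
#   spark.readStream.format("kafka").options(**kafka_reader_options(cfg["source"])).load()
```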

📄 Code Sample (.py preview)

src/cdc_processor.py

"""
CDC Processor — Process CDC events from Kafka and apply to target Delta tables.

Orchestrates the end-to-end CDC pipeline: read from Kafka, parse Debezium
envelopes, map schemas, apply changes via MERGE INTO, and manage offsets.

Author: Datanest Digital
"""

from __future__ import annotations

import logging
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional

import yaml
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.streaming import StreamingQuery
from pyspark.sql.types import StringType, StructField, StructType

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------


class OperationType(str, Enum):
    """CDC operation types aligned with Debezium conventions."""

    CREATE = "c"
    UPDATE = "u"
    DELETE = "d"
    READ = "r"  # Snapshot read


@dataclass
class CDCEvent:
    """Parsed CDC event with before/after images."""

    operation: OperationType
    table: str
    before: Optional[Dict[str, Any]] = None
    after: Optional[Dict[str, Any]] = None
    source_ts_ms: Optional[int] = None
    lsn: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

# ... 201 more lines ...
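Here is a minimal, self-contained sketch of how a Debezium message might populate the `CDCEvent` model. `parse_envelope` is hypothetical and is not the toolkit's `debezium_parser.py`. It repeats the two data classes so it runs on its own. It accepts messages with or without the JsonConverter `schema`/`payload` wrapper. It returns `None` for the tombstone (null-value) record that Debezium emits after a delete by default.

```python
import json
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional


class OperationType(str, Enum):
    """Same values as the preview above."""

    CREATE = "c"
    UPDATE = "u"
    DELETE = "d"
    READ = "r"  # Snapshot read


@dataclass
class CDCEvent:
    """Same fields as the preview above."""

    operation: OperationType
    table: str
    before: Optional[Dict[str, Any]] = None
    after: Optional[Dict[str, Any]] = None
    source_ts_ms: Optional[int] = None
    lsn: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)


def parse_envelope(raw: Optional[str]) -> Optional[CDCEvent]:
    """Hypothetical parser for one Debezium JSON message value."""
    if raw is None:
        return None  # tombstone record following a delete
    msg = json.loads(raw)
    # With schemas enabled, the change event sits under "payload".
    payload = msg["payload"] if "payload" in msg else msg
    source = payload.get("source") or {}
    # PostgreSQL source metadata has a schema; MySQL has a database name.
    namespace = source.get("schema") or source.get("db")
    name = source.get("table", "")
    table = f"{namespace}.{name}" if namespace else name
    lsn = source.get("lsn")  # numeric for PostgreSQL, absent for MySQL
    return CDCEvent(
        operation=OperationType(payload["op"]),  # ValueError for other ops, e.g. "t"
        table=table,
        before=payload.get("before"),
        after=payload.get("after"),
        source_ts_ms=source.get("ts_ms"),  # when the change happened at the source
        lsn=str(lsn) if lsn is not None else None,
        metadata={"connector": source.get("connector"), "processed_ts_ms": payload.get("ts_ms")},
    )
```

Operations outside the four enum values, such as Debezium's `t` (truncate) events, raise `ValueError` here. A production parser would need an explicit policy for them.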