Operational data synced hourly from MongoDB into Snowflake — fully orchestrated, watermark-driven, no data loss
Note: Cloud accounts are optional for local development. The pipeline uses mock connectors automatically when credentials are not set — all 20+ tests pass without a Snowflake or MongoDB account.
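The automatic fallback to mock connectors can be pictured as a small factory that checks for credentials before choosing an implementation. This is a minimal sketch, not the project's code. The class names, get_mongo_connector, and the MONGODB_URI environment variable are all hypothetical.

```python
import os
from typing import Any, Dict, List, Optional


class MockMongoConnector:
    """Hypothetical in-memory stand-in that serves fixture documents instead of querying MongoDB."""

    def __init__(self, documents: Optional[List[Dict[str, Any]]] = None) -> None:
        self.documents = list(documents or [])

    def extract(self) -> List[Dict[str, Any]]:
        # Return copies so callers cannot mutate the fixtures between test cases.
        return [dict(d) for d in self.documents]


class LiveMongoConnector:
    """Hypothetical live connector; only constructed when credentials are present."""

    def __init__(self, uri: str) -> None:
        # Imported lazily, so a mock-only run never imports pymongo at all.
        from pymongo import MongoClient
        self.client = MongoClient(uri)


def get_mongo_connector() -> Any:
    # MONGODB_URI is an assumed variable name; use whatever .env.example defines.
    uri = os.environ.get("MONGODB_URI", "").strip()
    if not uri:
        return MockMongoConnector()
    return LiveMongoConnector(uri)
```

Because the live connector imports pymongo only inside its constructor, a test run with no credentials never touches the driver.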
flowchart LR
A["MongoDB Atlas\nOrders Collection\n(operational DB)"] -->|"pymongo\nextract"| B["MongoConnector\n(incremental\nwatermark)"]
B -->|"flatten\nnested docs"| C["normalize.py\nTransformation\nLayer"]
C -->|"type coercion\nbusiness rules\nderived columns"| D["SnowflakeLoader\nMERGE upsert"]
D -->|"MERGE INTO\nupsert on ORDER_ID"| E["Snowflake\nRAW_ORDERS\n(analytics DW)"]
F["Apache Airflow\nDAG Orchestration"] -.->|"schedules\nretries\nmonitoring"| B
F -.->|"XCom metrics"| G["Airflow UI\nMetrics + Logs"]
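The "flatten nested docs" step between the connector and the transformation layer can be sketched as a recursive function. Nested objects become prefixed columns (so a shipping sub-document yields shipping_city and shipping_country, matching SHIPPING_CITY and SHIPPING_COUNTRY). Arrays are serialized to JSON, matching the ITEMS column. The function name and the underscore separator are assumptions, not necessarily what normalize.py does.

```python
import json
from typing import Any, Dict


def flatten_document(doc: Dict[str, Any], parent_key: str = "", sep: str = "_") -> Dict[str, Any]:
    """Flatten nested dicts into one level; serialize lists to JSON strings (hypothetical helper)."""
    flat: Dict[str, Any] = {}
    for key, value in doc.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse so {"shipping": {"city": ...}} becomes {"shipping_city": ...}.
            flat.update(flatten_document(value, new_key, sep))
        elif isinstance(value, list):
            # Arrays such as order items are stored as a JSON string (see the ITEMS column).
            # default=str stringifies values json cannot encode, e.g. ObjectId or datetime.
            flat[new_key] = json.dumps(value, default=str)
        else:
            # Scalars pass through unchanged; a top-level ObjectId still needs str() later.
            flat[new_key] = value
    return flat
```

Keeping arrays as JSON text preserves them losslessly, and Snowflake can still query them with PARSE_JSON.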
Incremental sync pattern:
- The Airflow scheduler triggers the DAG on the configured interval (default: hourly)
- check_connections validates that both MongoDB and Snowflake are reachable
- extract_transform_load pulls only records updated since the last watermark
- MERGE INTO ensures no duplicates, so the DAG is safe to re-run at any time (idempotent)
- log_metrics reports row counts extracted and loaded per entity
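The "only records updated since the last watermark" step amounts to building a MongoDB filter from the previous MAX(updated_at). This is a minimal sketch under stated assumptions: the function names are hypothetical, and updated_at is assumed to be stored as a BSON date in MongoDB. With no watermark, the filter is empty, which turns the first run into a full load.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def parse_watermark(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO 8601 UTC string such as '2024-01-15T23:59:59Z' into an aware datetime."""
    if value is None:
        return None  # no rows in Snowflake yet, so the first run is a full load
    # fromisoformat() only accepts a trailing 'Z' on Python 3.11+, so normalize it first.
    dt = datetime.fromisoformat(value.replace("Z", "+00:00"))
    return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)


def build_incremental_filter(watermark: Optional[str]) -> Dict[str, Any]:
    """Build a pymongo filter selecting documents updated at or after the watermark."""
    since = parse_watermark(watermark)
    if since is None:
        return {}  # an empty filter matches every document
    # $gte (not $gt) re-reads rows that share the boundary timestamp; the MERGE absorbs them.
    return {"updated_at": {"$gte": since}}
```

A caller could pass the result to pymongo, for example collection.find(build_incremental_filter(watermark)).sort("updated_at", 1), with watermark taken from SELECT MAX(UPDATED_AT) FROM RAW_ORDERS. If updated_at is stored in MongoDB as an ISO string rather than a date, compare against the raw string instead.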
| Component | Technology | Why |
|---|---|---|
| Orchestration | Apache Airflow 2.9 | Industry-standard scheduler with retry logic, UI monitoring, and DAG dependencies |
| Source | MongoDB Atlas | Represents real operational/transactional systems (NoSQL, flexible schema) |
| Destination | Snowflake | Cloud DW with auto-scaling, zero-copy cloning, and native SQL analytics |
| Connector | snowflake-connector-python | Official Snowflake Python driver with pandas integration |
| Transformation | pandas | Tabular data manipulation for schema normalization |
| Testing | pytest + mock connectors | Unit and integration tests that run without cloud accounts |
| Containerisation | Docker Compose | Single-command Airflow setup |
Run 1: Extract ALL orders (full load)
→ Load 10,000 rows to Snowflake
→ Snowflake now has MAX(updated_at) = 2024-01-15T23:59:59Z
Run 2: Query MongoDB for orders WHERE updated_at >= 2024-01-15T23:59:59Z
→ Extract only 47 changed/new records
→ MERGE INTO updates matched rows, inserts new ones
→ No duplicates, no full table scans
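This is a common pattern for keeping an operational source in sync with an analytics warehouse. The >= comparison deliberately re-reads any records that share the boundary timestamp, so a document updated at the same instant as the previous watermark is not skipped. The MERGE statement then makes that overlap harmless by handling both inserts and updates in one atomic operation.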
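The upsert can be illustrated by rendering the MERGE statement from a column list. This is a sketch, not the loader's actual SQL. It assumes the batch is first written to a staging table (RAW_ORDERS_STAGE below is a hypothetical name), and it interpolates identifiers directly, so it should only be used with trusted column names.

```python
from typing import Sequence


def build_merge_sql(target: str, staging: str, key: str, columns: Sequence[str]) -> str:
    """Render a Snowflake MERGE that upserts every staged row into `target` on `key`."""
    if key not in columns:
        raise ValueError(f"merge key {key!r} must be one of the columns")
    non_key = [c for c in columns if c != key]
    # Matched rows: overwrite every non-key column with the staged value.
    set_clause = ",\n        ".join(f"{c} = s.{c}" for c in non_key)
    # Unmatched rows: insert the full staged row.
    insert_cols = ", ".join(columns)
    insert_vals = ", ".join(f"s.{c}" for c in columns)
    return (
        f"MERGE INTO {target} AS t\n"
        f"USING {staging} AS s\n"
        f"    ON t.{key} = s.{key}\n"
        f"WHEN MATCHED THEN UPDATE SET\n"
        f"        {set_clause}\n"
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols})\n"
        f"    VALUES ({insert_vals})"
    )


sql = build_merge_sql("RAW_ORDERS", "RAW_ORDERS_STAGE", "ORDER_ID", ["ORDER_ID", "STATUS", "UPDATED_AT"])
```

One design caveat: if the staged batch contains two rows with the same ORDER_ID, Snowflake rejects the MERGE by default (ERROR_ON_NONDETERMINISTIC_MERGE is TRUE). The batch should therefore be deduplicated on the key, keeping the latest updated_at, before merging.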
snowsync/
├── dags/
│ └── snowsync_dag.py # Airflow DAG (4 tasks)
├── src/
│ ├── connectors/
│ │ ├── mongodb_connector.py # Extract from MongoDB + MockConnector
│ │ └── snowflake_connector.py # Load to Snowflake + MockConnector
│ ├── transformations/
│ │ └── normalize.py # Type coercion, business rules, derived cols
│ └── loaders/
│ └── snowflake_loader.py # Orchestrates ETL + DDL management
├── tests/
│ └── test_transformations.py # 20+ unit + integration tests
├── .env.example # Environment variable template
├── docker-compose.yml # Airflow + Postgres setup
└── requirements.txt
git clone https://github.com/YOUR_USERNAME/snowsync.git
cd snowsync
python -m venv venv && source venv/bin/activate # Windows: venv\Scripts\activate
pip install -r requirements.txt
pytest tests/ -v
All 20+ tests use mock connectors, so no Snowflake or MongoDB account is required.
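The idempotency claim is the kind of property a mock-backed test can check without Snowflake. The sketch below uses a dict keyed on order_id to mimic MERGE semantics, then loads the same batch twice. InMemoryMergeTarget and the test name are hypothetical illustrations, not the repository's MockConnector or its actual tests.

```python
from typing import Any, Dict, Iterable


class InMemoryMergeTarget:
    """Hypothetical stand-in for RAW_ORDERS that mimics MERGE: update on match, insert otherwise."""

    def __init__(self) -> None:
        self.table: Dict[str, Dict[str, Any]] = {}

    def merge(self, rows: Iterable[Dict[str, Any]]) -> Dict[str, int]:
        inserted = updated = 0
        for row in rows:
            key = row["order_id"]
            if key in self.table:
                updated += 1
            else:
                inserted += 1
            self.table[key] = dict(row)  # matched: overwrite; not matched: insert
        return {"inserted": inserted, "updated": updated}


def test_rerun_is_idempotent() -> None:
    rows = [
        {"order_id": "ORD-1", "status": "pending"},
        {"order_id": "ORD-2", "status": "shipped"},
    ]
    target = InMemoryMergeTarget()
    first = target.merge(rows)
    second = target.merge(rows)  # simulate an Airflow retry of the same batch
    assert first == {"inserted": 2, "updated": 0}
    assert second == {"inserted": 0, "updated": 2}
    assert len(target.table) == 2  # no duplicates after the re-run
```

pytest collects test_rerun_is_idempotent automatically when it lives in a file under tests/.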
# 1. Copy environment template
cp .env.example .env
# 2. Fill in your credentials
# - Snowflake: free 30-day trial at snowflake.com
# - MongoDB: free Atlas cluster at mongodb.com/atlas
# 3. Start Airflow
docker compose up -d
# 4. Open Airflow UI → trigger snowsync_mongo_to_snowflake DAG
open http://localhost:8080  # macOS; on Linux use xdg-open, or paste the URL into a browser
# Username: admin | Password: admin
# 5. Monitor logs in real time
docker compose logs -f airflow-scheduler
CREATE TABLE IF NOT EXISTS RAW_ORDERS (
MONGO_ID VARCHAR(256), -- Original MongoDB ObjectId
ORDER_ID VARCHAR(256) PRIMARY KEY, -- Natural key for MERGE (not enforced by Snowflake)
CUSTOMER_ID VARCHAR(256),
CUSTOMER_NAME VARCHAR(512),
CUSTOMER_EMAIL VARCHAR(512),
STATUS VARCHAR(64), -- normalized: pending/completed/shipped/...
TOTAL_AMOUNT FLOAT,
CURRENCY VARCHAR(8), -- normalized: USD/EUR/GBP/...
ITEMS VARCHAR(16000), -- JSON string (queryable with PARSE_JSON)
SHIPPING_CITY VARCHAR(256),
SHIPPING_COUNTRY VARCHAR(64),
CREATED_AT VARCHAR(64), -- ISO 8601 UTC
UPDATED_AT VARCHAR(64), -- ISO 8601 UTC (watermark column)
_SYNCED_AT VARCHAR(64), -- Pipeline audit column
IS_HIGH_VALUE BOOLEAN, -- Derived: total_amount > 500
ORDER_MONTH VARCHAR(8), -- Derived: YYYY-MM
CUSTOMER_DOMAIN VARCHAR(256) -- Derived: email domain
);
- Incremental load patterns: watermark-based sync using MAX(updated_at) to avoid full table scans
- MongoDB document flattening: handling nested dicts and arrays for tabular storage
- Snowflake MERGE: upsert semantics — update matched rows, insert new — in one atomic SQL statement
- Airflow XCom: passing metrics between tasks without shared state
- Mock connectors: writing testable ETL code that doesn't require live cloud services in CI
- Schema normalization: converting semi-structured NoSQL data into typed relational columns
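The schema normalization and derived columns listed in the RAW_ORDERS comments (lowercase status, uppercase currency, IS_HIGH_VALUE, ORDER_MONTH, CUSTOMER_DOMAIN) can be sketched on plain dicts. The project itself uses pandas, so this stdlib version is only an illustration. The function name, the lowercase keys, deriving ORDER_MONTH from created_at, and treating a missing amount as 0.0 are all assumptions.

```python
from typing import Any, Dict

HIGH_VALUE_THRESHOLD = 500.0  # from the schema comment: IS_HIGH_VALUE = total_amount > 500


def derive_columns(row: Dict[str, Any]) -> Dict[str, Any]:
    """Coerce types and add derived columns to one flattened order (hypothetical helper)."""
    out = dict(row)
    # Type coercion: a flexible-schema source may send amounts as strings or ints.
    amount = float(row.get("total_amount") or 0)  # assumption: missing amount -> 0.0
    out["total_amount"] = amount
    # Normalized enums, matching the STATUS and CURRENCY column comments.
    out["status"] = str(row.get("status") or "").strip().lower()
    out["currency"] = str(row.get("currency") or "").strip().upper()
    # Derived columns.
    out["is_high_value"] = amount > HIGH_VALUE_THRESHOLD
    created = str(row.get("created_at") or "")
    out["order_month"] = created[:7] if len(created) >= 7 else None  # 'YYYY-MM' prefix of ISO 8601
    email = str(row.get("customer_email") or "")
    out["customer_domain"] = email.rsplit("@", 1)[1].lower() if "@" in email else None
    return out
```

Working row by row keeps each rule easy to unit-test; a pandas version would apply the same rules as vectorized column operations.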
MIT