Task 1: ETL Pipeline - Flow & Pseudocode
Overview
This document provides a high-level pseudocode representation and visual diagrams of the transaction ingestion ETL pipeline. The full implementation is available in the appendix.
Note: This reflects the PySpark implementation (recommended for production). The Pandas implementation (for development/testing) stubs out some features (e.g., the quarantine history check).
For detailed, comprehensive diagrams showing all steps and decision points, see Appendix A: Complete ETL Pipeline Diagrams.
ETL Pipeline Flow
This diagram shows the complete ETL flow with all decision points and validation steps:
Detailed Flow: See High-Level Data Flow in Appendix A for additional diagram variations.
Main ETL Flow (Pseudocode)
```
PROCEDURE ProcessTransactions;
BEGIN
    { 1. Read CSV from S3 (Bronze Layer) }
    raw_data := ReadCSVFromS3(bronze_layer_path);

    { 2. Enrich Metadata }
    enriched_data := EnrichMetadata(raw_data);
    { Add row_hash, source_file_id, attempt_count, ingestion_timestamp to each row }

    { 3. Loop Prevention }
    condemned_rows := CheckLoopPrevention(enriched_data);
    { Check for duplicate transactions, previously failed rows, and rows exceeding retry limits }
    { Auto-condemn any matching rows (no retries) }

    { 4. Validate Rows }
    validation_result := ValidateRows(enriched_data);
    { Check schema, required fields, currency codes, data types, and timestamps }
    { Flag invalid rows for quarantine }

    { 5. Circuit Breaker Check }
    IF CircuitBreakerThresholdExceeded() THEN
    BEGIN
        HaltPipeline('Too many errors detected - human intervention required');
        EXIT;
    END;
    { Otherwise continue processing }

    { 6. Transform Data }
    transformed_data := TransformData(validation_result.valid_rows);
    { Add partition columns (year, month) from transaction timestamps }

    { 7. Split Data }
    SplitData(transformed_data, valid_rows, invalid_rows, condemned_rows);
    { Separate into valid (Silver), invalid (quarantine), condemned (no retries) }

    { 8. Write Output }
    WriteToSilverLayer(valid_rows);        { Partitioned Parquet }
    WriteToQuarantine(invalid_rows);       { With error details }
    WriteToCondemnedLayer(condemned_rows);

    { 9. Write Success Marker }
    WriteSuccessMarker(run_metrics);
    { Create _SUCCESS file with run metrics }

    { 10. Publish Metrics }
    PublishMetricsToCloudWatch(run_metrics);
    { Send metrics to CloudWatch for monitoring }

    { 11. Human Approval Required }
    RequireHumanApproval();
    { Approval needed before promoting data to production }
END;
```
Detailed pseudocode: See Complete ETL Pseudocode in Appendix B for the full implementation details including all helper functions, validation logic, S3 operations, and error handling.
High-Level Data Flow
This simplified diagram shows the main stages of the ETL pipeline:
Validation Process
The validation process consists of multiple checks that can result in quarantine or condemnation:
Detailed Validation Flow: See Detailed Validation Flow in Appendix A for the complete validation diagram with all error types and handling.
Data Lake Structure
The ETL pipeline writes to three main layers in S3:
Detailed S3 Structure: See S3 Storage Structure in Appendix A for the complete directory structure with example paths.
Component Interaction
The ETL pipeline interacts with several AWS services:
Detailed Component Interaction: See Component Interaction in Appendix A for the complete sequence diagram with all interactions.
Error Handling
The pipeline implements comprehensive error handling with try-catch blocks and graceful degradation:
Detailed Error Handling: See Error Handling & Resilience in Appendix A for the complete error handling diagram.
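As a minimal sketch of the graceful-degradation pattern (not the pipeline's actual helper), a non-critical step can be wrapped so that its failure is logged as a warning rather than raised; the function name and logger setup here are illustrative assumptions.

```python
import logging

logger = logging.getLogger("etl")


def run_non_critical_step(step_name, fn, *args, **kwargs):
    """Run a step whose failure should degrade gracefully instead of failing the job."""
    try:
        return fn(*args, **kwargs)
    except Exception as exc:  # intentional catch-all: non-critical steps only warn
        logger.warning("Non-critical step %s failed: %s", step_name, exc)
        return None
```

Critical steps (Bronze reads, Silver/quarantine writes) would still raise; only steps such as metric publishing go through a wrapper like this.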
Data Quality Metrics
Metrics are calculated and published to multiple destinations:
Detailed Metrics Flow: See Data Quality Metrics Flow in Appendix A for the complete metrics diagram.
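A minimal sketch of how a subset of the run metrics (see Monitoring & Observability below) could be assembled before publishing; the function name and argument list are assumptions, and derived metrics such as AvgAttemptCount are omitted.

```python
def build_run_metrics(input_rows, valid_rows, quarantined_rows, condemned_rows, duration_seconds):
    """Assemble per-run data quality metrics (names follow the CloudWatch metric names used by the pipeline)."""
    quarantine_rate = quarantined_rows / input_rows if input_rows else 0.0
    return {
        "InputRows": input_rows,
        "ValidRows": valid_rows,
        "QuarantinedRows": quarantined_rows,
        "CondemnedRows": condemned_rows,
        "QuarantineRate": quarantine_rate,
        "DurationSeconds": duration_seconds,
    }
```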
Key Design Decisions
- Run Isolation: Each ETL run writes to a unique `run_id` path to prevent data corruption during retries (path layout sketched below)
- Metadata Enrichment: All rows receive tracking metadata (`row_hash`, `source_file_id`, `attempt_count`, `ingestion_timestamp`) for loop prevention
- Loop Prevention: TransactionID deduplication (Silver layer scan), duplicate detection in quarantine history, attempt limits (max 3), and a circuit breaker (>100 same errors/hour) prevent infinite retry loops
- Partitioning: Valid data partitioned by `year` and `month` for query performance
- Quarantine: Invalid rows are never dropped; they're preserved in quarantine with error details and retry tracking
- Condemned Layer: Rows exceeding max attempts (≤3 retries allowed, condemned after the 3rd failure) or exact duplicates are moved to the condemned layer (no automatic retries). Human review and approval required before reprocessing or deletion.
- Idempotency: Run isolation via `run_id` ensures safe reruns without overwriting previous outputs
- Metadata: All runs include a `_SUCCESS` marker with metrics for monitoring and lineage
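To illustrate the run-isolation and idempotency decisions, the sketch below builds a run-scoped output prefix. The exact prefix layout is an assumption borrowed from the condemned-layer path template; the authoritative Silver convention is defined in the data lake architecture documentation.

```python
import uuid
from datetime import datetime, timezone


def build_run_output_prefix(base_uri, domain, dataset, run_id=None):
    """Build a run-isolated output prefix so reruns never overwrite earlier outputs."""
    run_id = run_id or uuid.uuid4().hex  # unique per ETL run
    ingest_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    # Layout mirrors the condemned-layer template; the real Silver convention may differ.
    return f"{base_uri}/{domain}/{dataset}/ingest_date={ingest_date}/run_id={run_id}/"
```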
Key Components
Loop Prevention
- TransactionID Deduplication: Checks Silver layer for existing transactions
- Quarantine History Check: Prevents reprocessing failed rows
- Attempt Limit: Maximum 3 retries before condemnation
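A Pandas-style sketch of these three checks, assuming the enriched frame carries `TransactionID`, `row_hash`, and `attempt_count` columns; the lookup sets would come from the Silver layer scan and the quarantine history described above.

```python
MAX_ATTEMPTS = 3  # condemned once the attempt limit is reached


def apply_loop_prevention(df, silver_transaction_ids, quarantined_row_hashes):
    """Split the enriched frame into rows that may proceed and rows that are auto-condemned.

    silver_transaction_ids: TransactionIDs already written to the Silver layer.
    quarantined_row_hashes: row_hash values of rows that previously failed (exact duplicates).
    """
    duplicate_txn = df["TransactionID"].isin(silver_transaction_ids)
    duplicate_failure = df["row_hash"].isin(quarantined_row_hashes)
    over_limit = df["attempt_count"] >= MAX_ATTEMPTS
    condemned = duplicate_txn | duplicate_failure | over_limit
    return df[~condemned], df[condemned]
```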
Validation Steps
- Schema/Null Check: Required fields present
- Currency Validation: ISO-4217 currency codes
- Type Check: Numeric amount validation
- Timestamp Parse: Valid date/time format
- Business Rules: Duplicate account/date detection
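A simplified sketch of the per-row checks. Column names, error codes, and the truncated currency set are assumptions; the business-rule check (duplicate account/date) and the full ISO-4217 list live in the appendix pseudocode and implementation.

```python
import pandas as pd

REQUIRED_FIELDS = ["TransactionID", "AccountID", "Amount", "Currency", "Timestamp"]  # assumed schema
ISO_4217_CODES = {"EUR", "USD", "GBP", "SEK"}  # stand-in for the full ISO-4217 code list


def validate_row(row):
    """Return the first validation error code for a row, or None if every check passes."""
    if any(pd.isna(row.get(field)) for field in REQUIRED_FIELDS):
        return "MISSING_REQUIRED_FIELD"
    if str(row["Currency"]).upper() not in ISO_4217_CODES:
        return "INVALID_CURRENCY"
    try:
        float(row["Amount"])
    except (TypeError, ValueError):
        return "INVALID_AMOUNT"
    if pd.isna(pd.to_datetime(row["Timestamp"], errors="coerce")):
        return "INVALID_TIMESTAMP"
    return None
```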
Circuit Breaker
- Monitors quarantine rate
- Halts pipeline if threshold exceeded (>100 same errors/hour)
- Requires human intervention
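A minimal sketch of the threshold check, assuming the caller supplies the error types observed over the last hour (from the current run plus quarantine history).

```python
from collections import Counter

CIRCUIT_BREAKER_THRESHOLD = 100  # more than 100 occurrences of the same error within an hour


def circuit_breaker_tripped(recent_error_types):
    """Return True when any single error type exceeds the hourly threshold."""
    counts = Counter(recent_error_types)
    return any(count > CIRCUIT_BREAKER_THRESHOLD for count in counts.values())
```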
Output Layers
- Silver: Validated, partitioned Parquet files
- Quarantine: Invalid rows with error details and retry tracking
- Condemned: Rows that cannot be retried (no automatic retries)
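For the Silver layer, the PySpark write reduces to a partitioned Parquet write; this sketch assumes the `year`/`month` columns added in the transform step and an already run-isolated `silver_path`. Quarantine and condemned writes follow the same pattern under their own prefixes.

```python
def write_silver(valid_df, silver_path):
    """Write validated rows as Parquet partitioned by year and month (PySpark DataFrame assumed)."""
    (
        valid_df.write
        .mode("append")                 # run isolation makes the write mode largely a safety choice
        .partitionBy("year", "month")   # partition columns added in the transform step
        .parquet(silver_path)
    )
```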
Metadata Enrichment
All rows receive tracking metadata before validation:
- `row_hash` (SHA-256): Computed from all column values for exact duplicate detection
- `source_file_id`: Extracted from the S3 path or a generated identifier
- `attempt_count`: Starts at 0, incremented on retries, loaded from quarantine history
- `ingestion_timestamp`: First ingestion time (preserved across retries for audit trail)
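A Pandas-style sketch of the enrichment step. The hashing scheme shown (pipe-joined, column-sorted values) is illustrative; the real implementation defines the canonical ordering.

```python
import hashlib
from datetime import datetime, timezone


def enrich_row(row, source_file_id, attempt_count=0, first_ingestion_ts=None):
    """Attach the tracking metadata used for loop prevention (row is a dict of column -> value)."""
    # SHA-256 over all column values in a stable order for exact duplicate detection.
    payload = "|".join(str(row[column]) for column in sorted(row))
    return {
        **row,
        "row_hash": hashlib.sha256(payload.encode("utf-8")).hexdigest(),
        "source_file_id": source_file_id,            # from the S3 path or a generated identifier
        "attempt_count": attempt_count,              # loaded from quarantine history on retries
        "ingestion_timestamp": first_ingestion_ts
        or datetime.now(timezone.utc).isoformat(),   # first ingestion time, preserved on retries
    }
```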
Monitoring & Observability
CloudWatch Metrics: Published to namespace Ohpen/ETL:
- Volume Metrics: InputRows, ValidRows, QuarantinedRows, CondemnedRows
- Quality Metrics: QuarantineRate, error type distribution
- Loop Prevention Metrics: AvgAttemptCount, AutoCondemnationRate
- Performance Metrics: DurationSeconds
CloudWatch Logs: Structured JSON logging with run_id, timestamp, level, message for CloudWatch Logs Insights queries.
Local Testing: CloudWatch automatically disabled when S3_ENDPOINT_URL is set (MinIO) or DISABLE_CLOUDWATCH=true.
Error Handling: CloudWatch publishing failures do not fail the ETL job (logged as warnings).
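A sketch of the publishing path combining the three behaviours above (namespace, local disable, warning-only failures). Metric units and client configuration are simplified.

```python
import logging
import os

import boto3

logger = logging.getLogger("etl")


def publish_metrics(run_metrics, namespace="Ohpen/ETL"):
    """Publish run metrics to CloudWatch; skipped for local runs and never fails the job."""
    if os.getenv("S3_ENDPOINT_URL") or os.getenv("DISABLE_CLOUDWATCH", "").lower() == "true":
        return  # local/MinIO runs skip CloudWatch entirely
    try:
        boto3.client("cloudwatch").put_metric_data(
            Namespace=namespace,
            MetricData=[
                {"MetricName": name, "Value": float(value)}
                for name, value in run_metrics.items()
            ],
        )
    except Exception as exc:  # publishing failures are logged as warnings, not raised
        logger.warning("CloudWatch metric publish failed: %s", exc)
```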
S3 Compatibility
- Reads use `boto3`
- Partitioned Parquet writes use `s3fs` for dataset writes to AWS S3
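A sketch of the two access paths, assuming the Pandas implementation; bucket/key handling, credentials, and filesystem options are simplified.

```python
import io
import os

import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs


def read_bronze_csv(bucket, key):
    """Read a Bronze-layer CSV with boto3 (endpoint override supports MinIO for local testing)."""
    s3 = boto3.client("s3", endpoint_url=os.getenv("S3_ENDPOINT_URL"))
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    return pd.read_csv(io.BytesIO(body))


def write_partitioned_parquet(df, root_uri):
    """Write a partitioned Parquet dataset through s3fs (the Pandas path of the pipeline)."""
    fs = s3fs.S3FileSystem()
    table = pa.Table.from_pandas(df)
    pq.write_to_dataset(table, root_path=root_uri, partition_cols=["year", "month"], filesystem=fs)
```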
Quarantine Metadata
Quarantine includes full audit trail:
- `ingest_date`, `run_id`, `ingest_time`, `source_file`, `row_hash`, `source_file_id`
- `attempt_count`, `retry_history` (JSON array), `ingestion_timestamp`
- `validation_error` (error type)
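An illustrative quarantine record showing how these fields fit together, in particular `retry_history` serialized as a JSON array; every value below is hypothetical.

```python
# Illustrative quarantine record; all values are hypothetical.
quarantine_record = {
    "ingest_date": "2024-01-15",
    "run_id": "a1b2c3d4e5f6",
    "ingest_time": "2024-01-15T02:10:33Z",
    "source_file": "s3://bronze/transactions/transactions_2024-01-15.csv",
    "row_hash": "<sha256 of all column values>",
    "source_file_id": "transactions_2024-01-15",
    "attempt_count": 1,
    "retry_history": '[{"run_id": "f6e5d4c3b2a1", "error": "INVALID_CURRENCY"}]',  # JSON array
    "ingestion_timestamp": "2024-01-14T02:08:12Z",  # first ingestion, preserved across retries
    "validation_error": "INVALID_CURRENCY",
}
```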
Condemned Layer
- Path: `quarantine/{domain}/{dataset}/condemned/ingest_date={YYYY-MM-DD}/run_id={...}/`
- Retention: 7-year retention for compliance/audit
- Lifecycle: Transitions to Glacier after 5 years; deletion after 7 years requires human approval
- Condemnation reasons: `DUPLICATE_FAILURE`, `MAX_ATTEMPTS`, `DUPLICATE_TRANSACTION_ID`
- Human approval required for reprocessing or deletion (see Human Validation Policy)
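A hedged sketch of the lifecycle rule implied above: only the Glacier transition is automated, since deletion after 7 years goes through the human-approval workflow. Bucket name, rule ID, and prefix are assumptions.

```python
import boto3


def apply_condemned_lifecycle(bucket_name):
    """Configure the condemned-layer lifecycle: Glacier after ~5 years, no automatic expiration."""
    boto3.client("s3").put_bucket_lifecycle_configuration(
        Bucket=bucket_name,
        LifecycleConfiguration={
            "Rules": [
                {
                    "ID": "condemned-to-glacier",
                    "Filter": {"Prefix": "quarantine/"},  # would be scoped to .../condemned/ in practice
                    "Status": "Enabled",
                    "Transitions": [{"Days": 1825, "StorageClass": "GLACIER"}],
                    # No Expiration action: deletion after 7 years requires human approval.
                }
            ]
        },
    )
```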
See Also
Task 1 Documentation
- Complete ETL Diagrams - All detailed diagrams
- Complete ETL Pseudocode - Detailed pseudocode with all helper functions
- Full ETL Implementation - Complete implementation code
- Assumptions and Edge Cases - Design assumptions and edge case handling
- Human Validation Policy - Approval workflows referenced in the pipeline
Related Tasks
- Data Lake Architecture - Where this ETL writes validated data
- SQL Query - Example query on processed data
- CI/CD Workflow - How this ETL is deployed
Technical Documentation
- Testing Guide - Comprehensive testing documentation
- Testing Quick Start - How to run and validate tests