
Task 1: ETL Pipeline - Flow & Pseudocode

Overview

This document provides a high-level pseudocode representation and visual diagrams of the transaction ingestion ETL pipeline. The full implementation is available in the appendix.

Note: This reflects the PySpark implementation (recommended for production). The Pandas implementation (intended for development and testing) stubs out some features, such as the quarantine history check.

For detailed, comprehensive diagrams showing all steps and decision points, see Appendix A: Complete ETL Pipeline Diagrams.


ETL Pipeline Flow

This diagram shows the complete ETL flow with all decision points and validation steps:

Detailed Flow: See High-Level Data Flow in Appendix A for additional diagram variations.


Main ETL Flow (Pseudocode)

PROCEDURE ProcessTransactions;
BEGIN
    { 1. Read CSV from S3 (Bronze Layer) }
    raw_data := ReadCSVFromS3(bronze_layer_path);

    { 2. Enrich Metadata }
    enriched_data := EnrichMetadata(raw_data);
    { Add row_hash, source_file_id, attempt_count, ingestion_timestamp to each row }

    { 3. Loop Prevention }
    condemned_rows := CheckLoopPrevention(enriched_data);
    { Check for duplicate transactions, previously failed rows, and rows exceeding retry limits }
    { Auto-condemn any matching rows (no retries) }

    { 4. Validate Rows }
    validation_result := ValidateRows(enriched_data);
    { Check schema, required fields, currency codes, data types, and timestamps }
    { Flag invalid rows for quarantine }

    { 5. Circuit Breaker Check }
    IF CircuitBreakerThresholdExceeded() THEN
    BEGIN
        HaltPipeline('Too many errors detected - human intervention required');
        EXIT;
    END;
    { Otherwise continue processing }

    { 6. Transform Data }
    transformed_data := TransformData(validation_result.valid_rows);
    { Add partition columns (year, month) from transaction timestamps }

    { 7. Split Data }
    SplitData(transformed_data, valid_rows, invalid_rows, condemned_rows);
    { Separate into valid (Silver), invalid (quarantine), condemned (no retries) }

    { 8. Write Output }
    WriteToSilverLayer(valid_rows);        { Partitioned Parquet }
    WriteToQuarantine(invalid_rows);       { With error details }
    WriteToCondemnedLayer(condemned_rows);

    { 9. Write Success Marker }
    WriteSuccessMarker(run_metrics);
    { Create _SUCCESS file with run metrics }

    { 10. Publish Metrics }
    PublishMetricsToCloudWatch(run_metrics);
    { Send metrics to CloudWatch for monitoring }

    { 11. Human Approval Required }
    RequireHumanApproval();
    { Approval needed before promoting data to production }
END;

Detailed pseudocode: See Complete ETL Pseudocode in Appendix B for the full implementation details including all helper functions, validation logic, S3 operations, and error handling.


High-Level Data Flow

This simplified diagram shows the main stages of the ETL pipeline:


Validation Process

The validation process consists of multiple checks that can result in quarantine or condemnation:

Detailed Validation Flow: See Detailed Validation Flow in Appendix A for the complete validation diagram with all error types and handling.


Data Lake Structure

The ETL pipeline writes to three main layers in S3:

Detailed S3 Structure: See S3 Storage Structure in Appendix A for the complete directory structure with example paths.


Component Interaction

The ETL pipeline interacts with several AWS services:

Detailed Component Interaction: See Component Interaction in Appendix A for the complete sequence diagram with all interactions.


Error Handling

The pipeline implements comprehensive error handling with try-catch blocks and graceful degradation:

Detailed Error Handling: See Error Handling & Resilience in Appendix A for the complete error handling diagram.


Data Quality Metrics

Metrics are calculated and published to multiple destinations:

Detailed Metrics Flow: See Data Quality Metrics Flow in Appendix A for the complete metrics diagram.


Key Design Decisions

  1. Run Isolation: Each ETL run writes to a unique run_id path to prevent data corruption during retries
  2. Metadata Enrichment: All rows receive tracking metadata (row_hash, source_file_id, attempt_count, ingestion_timestamp) for loop prevention
  3. Loop Prevention: TransactionID deduplication (Silver layer scan), duplicate detection against quarantine history, attempt limits (max 3), and a circuit breaker (more than 100 identical errors per hour) prevent infinite retry loops
  4. Partitioning: Valid data partitioned by year and month for query performance
  5. Quarantine: Invalid rows are never dropped; they're preserved in quarantine with error details and retry tracking
  6. Condemned Layer: Rows that exceed the attempt limit (at most 3 retries allowed; condemned after the third failure) or are exact duplicates of previously failed rows are moved to the condemned layer (no automatic retries). Human review and approval are required before reprocessing or deletion.
  7. Idempotency: Run isolation via run_id ensures safe reruns without overwriting previous outputs
  8. Metadata: All runs include a _SUCCESS marker with run metrics for monitoring and lineage (a sketch of the run-isolated path and success marker follows this list)
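
For illustration, a minimal sketch of the run-isolation and _SUCCESS-marker conventions in Python, assuming boto3 and hypothetical bucket and prefix names (data-lake, silver/transactions); the real paths follow the S3 Storage Structure in Appendix A:

import json
import uuid
from datetime import datetime, timezone

import boto3

BUCKET = "data-lake"                    # hypothetical bucket name
SILVER_PREFIX = "silver/transactions"   # hypothetical Silver-layer prefix


def build_run_path(run_id: str) -> str:
    """Every run writes under its own run_id= prefix, so retries never overwrite earlier output."""
    ingest_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    return f"{SILVER_PREFIX}/ingest_date={ingest_date}/run_id={run_id}/"


def write_success_marker(run_id: str, metrics: dict) -> None:
    """Write the _SUCCESS marker containing the run metrics next to the run's output."""
    key = build_run_path(run_id) + "_SUCCESS"
    body = json.dumps({"run_id": run_id, "metrics": metrics}, default=str)
    boto3.client("s3").put_object(Bucket=BUCKET, Key=key, Body=body.encode("utf-8"))


# Example usage with illustrative metric values: a fresh run_id per execution.
run_id = uuid.uuid4().hex
write_success_marker(run_id, {"InputRows": 1000, "ValidRows": 990, "QuarantinedRows": 10})

Because every run writes under its own run_id= prefix, a rerun can never clobber the output of an earlier attempt, which is what makes reruns safe.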

Key Components

Loop Prevention

  • TransactionID Deduplication: Checks Silver layer for existing transactions
  • Quarantine History Check: Prevents reprocessing failed rows
  • Attempt Limit: Maximum 3 retries before condemnation
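
The sketch below illustrates the shape of these checks in Python, assuming dict-per-row input plus two precomputed lookups (silver_txn_ids from a Silver-layer scan, duplicate_failure_hashes from the quarantine-history check); the condemnation reasons match those listed under Condemned Layer below, and the full logic is in Appendix B:

MAX_ATTEMPTS = 3  # retries allowed before a row is condemned


def condemnation_reason(row, silver_txn_ids, duplicate_failure_hashes):
    """Return a condemnation reason for a row, or None if the row may proceed.

    row                      -- enriched row dict with transaction_id, row_hash, attempt_count
    silver_txn_ids           -- set of TransactionIDs already written to the Silver layer
    duplicate_failure_hashes -- row hashes the quarantine-history check flagged as
                                duplicates of previously failed rows
    """
    if row["transaction_id"] in silver_txn_ids:
        return "DUPLICATE_TRANSACTION_ID"   # already landed in Silver: never retry
    if row["attempt_count"] >= MAX_ATTEMPTS:
        return "MAX_ATTEMPTS"               # retry budget exhausted
    if row["row_hash"] in duplicate_failure_hashes:
        return "DUPLICATE_FAILURE"          # exact duplicate of a previously failed row
    return None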

Validation Steps

  1. Schema/Null Check: Required fields present
  2. Currency Validation: ISO-4217 currency codes
  3. Type Check: Numeric amount validation
  4. Timestamp Parse: Valid date/time format
  5. Business Rules: Duplicate account/date detection
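
A hedged sketch of the per-row checks in Python, assuming hypothetical field names (transaction_id, account_id, amount, currency, transaction_timestamp) and an illustrative subset of ISO-4217 codes; the production rules live in the validation module described in Appendix B:

from datetime import datetime

ISO_4217_CODES = {"EUR", "USD", "GBP"}  # illustrative subset; the real set comes from a reference list
REQUIRED_FIELDS = ["transaction_id", "account_id", "amount", "currency", "transaction_timestamp"]


def validate_row(row: dict) -> str | None:
    """Return the first validation error for a row, or None if the row is valid."""
    # 1. Schema / null check: all required fields present and non-empty
    for field in REQUIRED_FIELDS:
        if row.get(field) in (None, ""):
            return f"MISSING_FIELD:{field}"
    # 2. Currency validation: ISO-4217 code
    if row["currency"] not in ISO_4217_CODES:
        return "INVALID_CURRENCY"
    # 3. Type check: amount must be numeric
    try:
        float(row["amount"])
    except (TypeError, ValueError):
        return "INVALID_AMOUNT"
    # 4. Timestamp parse: ISO-8601 assumed here for illustration
    try:
        datetime.fromisoformat(str(row["transaction_timestamp"]))
    except ValueError:
        return "INVALID_TIMESTAMP"
    # 5. Business rules (duplicate account/date detection) need the whole batch,
    #    so they are not shown in this per-row sketch.
    return None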

Circuit Breaker

  • Monitors quarantine rate
  • Halts the pipeline if the threshold is exceeded (more than 100 identical errors per hour)
  • Requires human intervention
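
A minimal sketch of the circuit-breaker decision in Python, assuming error types are aggregated over a one-hour window; the threshold mirrors the rule above:

from collections import Counter

CIRCUIT_BREAKER_THRESHOLD = 100  # occurrences of the same error type within one hour


def circuit_breaker_tripped(errors_last_hour: list[str]) -> bool:
    """errors_last_hour is the list of validation error types raised in the last hour."""
    counts = Counter(errors_last_hour)
    return any(count > CIRCUIT_BREAKER_THRESHOLD for count in counts.values())


# Example: 150 identical errors within the hour trips the breaker and halts the run.
if circuit_breaker_tripped(["INVALID_CURRENCY"] * 150):
    raise RuntimeError("Circuit breaker tripped - human intervention required")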

Output Layers

  • Silver: Validated, partitioned Parquet files
  • Quarantine: Invalid rows with error details and retry tracking
  • Condemned: Rows that cannot be retried (no automatic retries)

Metadata Enrichment

All rows receive tracking metadata before validation:

  • row_hash (SHA256): Computed from all column values for exact duplicate detection
  • source_file_id: Extracted from S3 path or generated identifier
  • attempt_count: Starts at 0, incremented on retries, loaded from quarantine history
  • ingestion_timestamp: First ingestion time (preserved across retries for audit trail)
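
A minimal sketch of this enrichment step in Python, assuming a dict-per-row representation; the PySpark implementation computes the same columns with built-in hash and timestamp functions:

import hashlib
from datetime import datetime, timezone


def enrich_row(row: dict, source_file_id: str, previous: dict | None = None) -> dict:
    """Attach tracking metadata to a row before validation.

    previous -- the row's earlier quarantine record, if any, used to carry
                attempt_count and the original ingestion_timestamp across retries.
    """
    # SHA-256 over all column values, in a stable key order, for exact duplicate detection
    canonical = "|".join(f"{key}={row[key]}" for key in sorted(row))
    row_hash = hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    return {
        **row,
        "row_hash": row_hash,
        "source_file_id": source_file_id,
        "attempt_count": (previous["attempt_count"] + 1) if previous else 0,
        "ingestion_timestamp": previous["ingestion_timestamp"] if previous
                               else datetime.now(timezone.utc).isoformat(),
    }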

Monitoring & Observability

CloudWatch Metrics: Published to namespace Ohpen/ETL:

  • Volume Metrics: InputRows, ValidRows, QuarantinedRows, CondemnedRows
  • Quality Metrics: QuarantineRate, error type distribution
  • Loop Prevention Metrics: AvgAttemptCount, AutoCondemnationRate
  • Performance Metrics: DurationSeconds

CloudWatch Logs: Structured JSON logging with run_id, timestamp, level, message for CloudWatch Logs Insights queries.

Local Testing: CloudWatch automatically disabled when S3_ENDPOINT_URL is set (MinIO) or DISABLE_CLOUDWATCH=true.

Error Handling: CloudWatch publishing failures do not fail the ETL job (logged as warnings).
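
A hedged sketch of the metrics publisher, assuming boto3; the namespace, the local-testing switch, and the warn-rather-than-fail behaviour follow the notes above:

import logging
import os

import boto3

logger = logging.getLogger(__name__)


def cloudwatch_enabled() -> bool:
    """CloudWatch is skipped when running against MinIO or when explicitly disabled."""
    return not os.environ.get("S3_ENDPOINT_URL") and os.environ.get("DISABLE_CLOUDWATCH") != "true"


def publish_metrics(run_id: str, metrics: dict) -> None:
    """Publish run metrics to the Ohpen/ETL namespace; failures are logged, never raised."""
    if not cloudwatch_enabled():
        logger.info("CloudWatch publishing disabled for run %s", run_id)
        return
    try:
        boto3.client("cloudwatch").put_metric_data(
            Namespace="Ohpen/ETL",
            MetricData=[
                {"MetricName": name, "Value": float(value),
                 "Dimensions": [{"Name": "RunId", "Value": run_id}]}
                for name, value in metrics.items()
            ],
        )
    except Exception:  # CloudWatch problems must not fail the ETL job
        logger.warning("Failed to publish CloudWatch metrics for run %s", run_id, exc_info=True)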

S3 Compatibility

  • Reads use boto3
  • Partitioned Parquet dataset writes to AWS S3 use s3fs
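
A minimal sketch of both access paths, assuming Pandas with pyarrow, a hypothetical bucket name (data-lake), and a hypothetical object key; the PySpark job relies on Spark's own S3 connectors instead:

import io

import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs

BUCKET = "data-lake"  # hypothetical bucket name

# Read: plain boto3 GetObject for a Bronze-layer CSV (hypothetical key).
obj = boto3.client("s3").get_object(Bucket=BUCKET, Key="bronze/transactions/batch.csv")
df = pd.read_csv(io.BytesIO(obj["Body"].read()))

# Write: partitioned Parquet dataset via s3fs + pyarrow.
# df is assumed to already carry the year and month partition columns from the transform step.
fs = s3fs.S3FileSystem()
pq.write_to_dataset(
    pa.Table.from_pandas(df),
    root_path=f"{BUCKET}/silver/transactions",
    partition_cols=["year", "month"],
    filesystem=fs,
)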

Quarantine Metadata

Quarantine includes full audit trail:

  • ingest_date, run_id, ingest_time, source_file, row_hash, source_file_id
  • attempt_count, retry_history (JSON array), ingestion_timestamp
  • validation_error (error type)
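
A minimal sketch of how a failed row is wrapped with these audit fields, assuming the row already carries the enrichment metadata (row_hash, source_file_id, attempt_count, ingestion_timestamp):

import json
from datetime import datetime, timezone


def build_quarantine_record(row: dict, error_type: str, run_id: str, source_file: str) -> dict:
    """Wrap a failed row with the quarantine audit-trail fields."""
    now = datetime.now(timezone.utc)
    retry_history = json.loads(row.get("retry_history", "[]"))
    retry_history.append({"run_id": run_id, "error": error_type, "at": now.isoformat()})
    return {
        **row,                                  # original columns plus enrichment metadata
        "ingest_date": now.strftime("%Y-%m-%d"),
        "ingest_time": now.isoformat(),
        "run_id": run_id,
        "source_file": source_file,
        "validation_error": error_type,
        "retry_history": json.dumps(retry_history),  # JSON array of prior attempts
    }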

Condemned Layer

  • Path: quarantine/{domain}/{dataset}/condemned/ingest_date={YYYY-MM-DD}/run_id={...}/
  • Retention: 7-year retention for compliance/audit
  • Lifecycle: Transitions to Glacier after 5 years; deletion after 7 years requires human approval
  • Condemnation reasons: DUPLICATE_FAILURE, MAX_ATTEMPTS, DUPLICATE_TRANSACTION_ID
  • Human approval required for reprocessing or deletion (see Human Validation Policy)
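
A hedged sketch of a matching S3 lifecycle rule, assuming boto3, a hypothetical bucket name, and 5 years approximated as 1825 days; expiration is deliberately omitted because deletion after 7 years requires human approval:

import boto3

# Hypothetical bucket; substitute the real domain and dataset in the prefix below.
boto3.client("s3").put_bucket_lifecycle_configuration(
    Bucket="data-lake",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "condemned-to-glacier",
                "Filter": {"Prefix": "quarantine/{domain}/{dataset}/condemned/"},
                "Status": "Enabled",
                # Transition to Glacier after roughly 5 years. No Expiration action:
                # deletion after 7 years requires explicit human approval.
                "Transitions": [{"Days": 1825, "StorageClass": "GLACIER"}],
            }
        ]
    },
)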

See Also

Task 1 Documentation

Technical Documentation