Appendix B: Complete ETL Pseudocode

This appendix contains moderately detailed pseudocode for the full ETL pipeline. For a simplified high-level overview, see the main ETL Flow & Pseudocode document.

Note: This pseudocode reflects the PySpark implementation (recommended for production). The Pandas implementation (used for development and testing) stubs some features, such as the quarantine history check.


Main ETL Flow

PROCEDURE Main();
BEGIN
  { Initialize run with unique run_id and timestamps }
  run_id := GenerateRunId();
  ingestion_timestamp := CurrentTimestamp();
  InitializeS3Client();

  { Step 1: Read raw CSV from Bronze layer }
  raw_data := ReadCSVFromS3(bronze_bucket, bronze_key);

  { Step 2: Enrich with metadata (row_hash, source_file_id, attempt_count, ingestion_timestamp) }
  enriched_data := EnrichMetadata(raw_data, bronze_key, ingestion_timestamp);

  { Step 3: Loop Prevention - check for duplicates and retry limits }
  loop_result := ApplyLoopPrevention(enriched_data);
  { Returns: condemned_rows (auto-condemned), continue_rows (for validation) }

  { Step 4: Validate remaining rows (schema, currency, types, timestamps) }
  validation_result := ValidateRows(loop_result.continue_rows);

  { Step 5: Circuit Breaker - halt if the same error occurs more than 100 times within one hour }
  IF CircuitBreakerThresholdExceeded(validation_result.error_counts) THEN
    HaltPipeline('Too many errors - human intervention required');

  { Step 6: Transform valid data - add partition keys (year, month) }
  transformed_data := AddPartitionKeys(validation_result.valid_rows);

  { Step 7: Write outputs }
  WriteToSilverLayer(transformed_data, run_id);               { Partitioned Parquet }
  WriteToQuarantine(validation_result.invalid_rows, run_id);  { With error details }
  WriteToCondemnedLayer(loop_result.condemned_rows, run_id);

  { Step 8: Write success marker and publish metrics }
  metrics := CalculateMetrics(raw_data, transformed_data, validation_result, loop_result);
  WriteSuccessMarker(metrics, run_id);
  PublishCloudWatchMetrics(metrics);

  { Step 9: Require human approval before production promotion }
  LogInfo('ETL completed - human approval required');
END;
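
For orientation, a minimal PySpark-style sketch of how these steps might be wired together is shown below. The helper functions mirror the sketches later in this appendix; the entry-point signature, parameter names, and the pre-loaded Silver/quarantine DataFrames are illustrative assumptions, not the production module API.

import uuid
from datetime import datetime, timezone

def run_pipeline(spark, bronze_bucket, bronze_key, silver_bucket, silver_prefix,
                 silver_df, quarantine_hashes):
    run_id = str(uuid.uuid4())                               # stand-in for GenerateRunId()
    ingest_time = datetime.now(timezone.utc).isoformat()

    raw = read_csv_from_s3(spark, bronze_bucket, bronze_key)
    enriched = enrich_metadata(raw, bronze_key, ingest_time)
    condemned, to_validate = apply_loop_prevention(enriched, silver_df, quarantine_hashes)
    valid, invalid = validate_rows(to_validate)

    # Circuit breaker: the per-error-per-hour threshold is simplified to a single count here.
    if invalid.count() > 100:
        raise RuntimeError("Too many errors - human intervention required")

    transformed = add_partition_keys(valid)
    write_parquet_to_s3(transformed, silver_bucket, f"{silver_prefix}/{run_id}", ["year", "month"])
    # Quarantine/condemned writes, metrics, the _SUCCESS marker, and CloudWatch publishing
    # follow the same pattern and are omitted here.
    return run_id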

Helper Functions

EnrichMetadata

Adds tracking metadata to each row for loop prevention and lineage.

FUNCTION EnrichMetadata(dataframe: DATAFRAME; source_file: STRING; ingest_time: TIMESTAMP): DATAFRAME;
BEGIN
  FOR each row IN dataframe DO
  BEGIN
    row['row_hash'] := SHA256Hash(row);                  { Unique identifier for duplicate detection }
    row['source_file_id'] := ExtractFileId(source_file);
    row['attempt_count'] := 0;                           { Loaded from quarantine history for retries }
    row['ingestion_timestamp'] := ingest_time;
  END;
  RETURN dataframe;
END;
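
In PySpark terms, this enrichment might look like the following sketch. The hash recipe (SHA-256 over all source columns joined with a separator) and the column names follow the pseudocode and are assumptions, not the exact production logic.

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def enrich_metadata(df: DataFrame, source_file: str, ingest_time: str) -> DataFrame:
    # Hash all source columns into a stable per-row identifier for duplicate detection.
    return (
        df.withColumn("row_hash", F.sha2(F.concat_ws("||", *df.columns), 256))
          .withColumn("source_file_id", F.lit(source_file))
          .withColumn("attempt_count", F.lit(0))           # replaced later from quarantine history
          .withColumn("ingestion_timestamp", F.lit(ingest_time))
    )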

ApplyLoopPrevention

Prevents infinite retry loops by checking for duplicates and retry limits. Returns condemned rows (no further retries) and the rows that continue into validation.

FUNCTION ApplyLoopPrevention(dataframe: DATAFRAME): LOOP_PREVENTION_RESULT;
BEGIN
  { Check 1: TransactionID deduplication against the Silver layer }
  IF silver_layer_exists THEN
  BEGIN
    existing_transactions := ScanSilverLayerForTransactionIDs();
    FOR each row WHERE (TransactionID, event_date) IN existing_transactions DO
      MarkAsCondemned(row, 'DUPLICATE_TRANSACTION_ID');
  END;

  { Check 2: Duplicate detection against quarantine history }
  quarantine_history := LoadQuarantineHistory(dataframe['row_hash']);
  FOR each row WHERE row_hash IN quarantine_history DO
    MarkAsCondemned(row, 'DUPLICATE_FAILURE');

  { Check 3: Attempt limit (max 3 retries) }
  FOR each row WHERE attempt_count >= 3 DO
    MarkAsCondemned(row, 'MAX_ATTEMPTS');

  RETURN SplitIntoCondemnedAndContinue(dataframe);
END;
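
A possible PySpark rendering of these checks is sketched below. It assumes the Silver keys and quarantined row hashes have already been loaded into DataFrames, and it uses semi/anti joins in place of the per-row loops.

from pyspark.sql import DataFrame, functions as F

def apply_loop_prevention(df: DataFrame, silver_df: DataFrame, quarantine_hashes: DataFrame):
    # Check 1: rows whose (TransactionID, event_date) already landed in Silver.
    dup_silver = (df.join(silver_df.select("TransactionID", "event_date"),
                          ["TransactionID", "event_date"], "left_semi")
                    .withColumn("condemned_reason", F.lit("DUPLICATE_TRANSACTION_ID")))
    # Check 2: rows whose row_hash already failed before.
    dup_quarantine = (df.join(quarantine_hashes.select("row_hash"), "row_hash", "left_semi")
                        .withColumn("condemned_reason", F.lit("DUPLICATE_FAILURE")))
    # Check 3: rows that have exhausted their retries.
    max_attempts = (df.filter(F.col("attempt_count") >= 3)
                      .withColumn("condemned_reason", F.lit("MAX_ATTEMPTS")))

    condemned = (dup_silver.unionByName(dup_quarantine).unionByName(max_attempts)
                           .dropDuplicates(["row_hash"]))
    continue_rows = df.join(condemned.select("row_hash"), "row_hash", "left_anti")
    return condemned, continue_rows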

ValidateRows

Validates data quality: schema checks, currency codes, data types, timestamps, and business logic duplicates.

FUNCTION ValidateRows(dataframe: DATAFRAME): VALIDATION_RESULT;
BEGIN
  FOR each row IN dataframe DO
  BEGIN
    { Schema: Check required fields are not null }
    IF AnyRequiredFieldIsNull(row) THEN
      MarkAsInvalid(row, 'NULL_VALUE_ERROR')

    { Currency: Must be in allowed list (EUR, USD, GBP, etc.) }
    ELSE IF row['Currency'] NOT IN allowed_currencies THEN
      MarkAsInvalid(row, 'CURRENCY_ERROR')

    { Type: TransactionAmount must be numeric }
    ELSE IF NOT IsNumeric(row['TransactionAmount']) THEN
      MarkAsInvalid(row, 'TYPE_ERROR')

    { Timestamp: Must be parseable }
    ELSE IF ParseTimestamp(row['TransactionTimestamp']) = NULL THEN
      MarkAsInvalid(row, 'TIMESTAMP_ERROR')

    { Business logic: No duplicate account + date combinations }
    ELSE IF DuplicateAccountDateExists(row) THEN
      MarkAsInvalid(row, 'DUPLICATE_ACCOUNT_DATE');

    { Increment attempt_count for invalid rows }
    IF row is invalid THEN
      row['attempt_count'] := row['attempt_count'] + 1;
  END;

  RETURN SplitIntoValidAndInvalid(dataframe);
END;
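
The same checks can be expressed column-wise in PySpark, roughly as in the sketch below. The required-field list, allowed currencies, and column names are illustrative, and the business-logic duplicate check is omitted for brevity.

from pyspark.sql import DataFrame, functions as F

ALLOWED_CURRENCIES = ["EUR", "USD", "GBP"]            # example subset, not the full list
REQUIRED_FIELDS = ["TransactionID", "AccountID", "TransactionAmount",
                   "Currency", "TransactionTimestamp"]  # assumed required columns

def validate_rows(df: DataFrame):
    # A row fails the schema check if any required field is null.
    has_null = F.greatest(*[F.col(c).isNull().cast("int") for c in REQUIRED_FIELDS]) == 1
    error = (F.when(has_null, "NULL_VALUE_ERROR")
              .when(~F.col("Currency").isin(ALLOWED_CURRENCIES), "CURRENCY_ERROR")
              .when(F.col("TransactionAmount").cast("double").isNull(), "TYPE_ERROR")
              .when(F.to_timestamp("TransactionTimestamp").isNull(), "TIMESTAMP_ERROR"))
    tagged = df.withColumn("error_type", error)        # null error_type means the row is valid
    valid = tagged.filter(F.col("error_type").isNull()).drop("error_type")
    invalid = (tagged.filter(F.col("error_type").isNotNull())
                     .withColumn("attempt_count", F.col("attempt_count") + 1))
    return valid, invalid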

AddPartitionKeys

Extracts year and month from transaction timestamps for partitioning.

FUNCTION AddPartitionKeys(dataframe: DATAFRAME): DATAFRAME;
BEGIN
  FOR each row IN dataframe DO
  BEGIN
    parsed_timestamp := ParseTimestamp(row['TransactionTimestamp']);
    row['year'] := Year(parsed_timestamp);
    row['month'] := ZeroPad(Month(parsed_timestamp), 2);
  END;
  RETURN dataframe;
END;
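
In PySpark this is typically two derived columns, as in the minimal sketch below; the timestamp column name is assumed from the validation step.

from pyspark.sql import DataFrame, functions as F

def add_partition_keys(df: DataFrame) -> DataFrame:
    ts = F.to_timestamp("TransactionTimestamp")
    return (df.withColumn("year", F.year(ts).cast("string"))
              .withColumn("month", F.lpad(F.month(ts).cast("string"), 2, "0")))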

S3 Operations

ReadCSVFromS3

Reads CSV file from S3 and returns as dataframe.

FUNCTION ReadCSVFromS3(bucket: STRING; key: STRING): DATAFRAME;
BEGIN
  TRY
    content := s3_client.GetObject(bucket, key);
    RETURN ParseCSV(content);
  EXCEPT
    ON error DO
    BEGIN
      LogError('Failed to read from S3: ' + error);
      RAISE error;
    END;
  END;
END;
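
In the PySpark job this is usually a direct spark.read call rather than an explicit GetObject. A hedged sketch, with the header option and path scheme as assumptions:

import logging

logger = logging.getLogger(__name__)

def read_csv_from_s3(spark, bucket: str, key: str):
    path = f"s3://{bucket}/{key}"    # s3a:// may be required depending on the runtime
    try:
        return spark.read.option("header", "true").csv(path)
    except Exception as exc:
        logger.error("Failed to read from S3: %s", exc)
        raise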

WriteParquetToS3

Writes dataframe as partitioned Parquet files to S3.

PROCEDURE WriteParquetToS3(dataframe: DATAFRAME; bucket: STRING; prefix: STRING; partition_cols: ARRAY);
BEGIN
  IF IsEmpty(dataframe) THEN EXIT;

  TRY
    ConvertToParquet(dataframe);
    WriteToS3(bucket, prefix, partition_cols);
  EXCEPT
    ON error DO
    BEGIN
      LogError('Failed to write Parquet: ' + error);
      RAISE error;
    END;
  END;
END;
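
A minimal PySpark equivalent, assuming an append write mode and an s3:// path layout:

from pyspark.sql import DataFrame

def write_parquet_to_s3(df: DataFrame, bucket: str, prefix: str, partition_cols) -> None:
    if df.rdd.isEmpty():             # mirror the empty-input guard in the pseudocode
        return
    path = f"s3://{bucket}/{prefix}"
    df.write.mode("append").partitionBy(*partition_cols).parquet(path)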

WriteSuccessMarker

Creates _SUCCESS file with run metrics for monitoring and lineage.

PROCEDURE WriteSuccessMarker(metrics: METRICS; run_id: STRING);
BEGIN
  content := JSONSerialize(metrics);
  s3_client.PutObject(bucket, prefix + '/_SUCCESS', content);
END;
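
With boto3 this reduces to a single put_object call; the key layout shown is an assumption.

import json
import boto3

def write_success_marker(metrics: dict, bucket: str, prefix: str) -> None:
    s3 = boto3.client("s3")
    s3.put_object(
        Bucket=bucket,
        Key=f"{prefix}/_SUCCESS",
        Body=json.dumps(metrics, default=str).encode("utf-8"),
    )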

Monitoring & Metrics

PublishCloudWatchMetrics

Publishes key metrics to CloudWatch for monitoring.

PROCEDURE PublishCloudWatchMetrics(metrics: METRICS);
BEGIN
  IF CloudWatchDisabled() THEN EXIT;  { Local testing }

  TRY
    metric_data := [
      ('InputRows', metrics.input_rows),
      ('ValidRows', metrics.valid_rows),
      ('QuarantinedRows', metrics.quarantined_rows),
      ('CondemnedRows', metrics.condemned_rows),
      ('QuarantineRate', CalculateRate(metrics.quarantined_rows, metrics.input_rows)),
      ('DurationSeconds', metrics.duration_seconds)
    ];
    cloudwatch_client.PutMetricData('Ohpen/ETL', metric_data);
  EXCEPT
    ON error DO
      LogWarning('Failed to publish metrics: ' + error);
      { Don't fail the job if metrics publishing fails }
  END;
END;
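
A boto3 sketch of the publish step; the metric names and namespace follow the pseudocode, while the units and the rate calculation are assumptions.

import logging
import boto3

logger = logging.getLogger(__name__)

def publish_cloudwatch_metrics(metrics: dict) -> None:
    metric_data = [
        {"MetricName": "InputRows", "Value": metrics["input_rows"], "Unit": "Count"},
        {"MetricName": "ValidRows", "Value": metrics["valid_rows"], "Unit": "Count"},
        {"MetricName": "QuarantinedRows", "Value": metrics["quarantined_rows"], "Unit": "Count"},
        {"MetricName": "CondemnedRows", "Value": metrics["condemned_rows"], "Unit": "Count"},
        {"MetricName": "QuarantineRate",
         "Value": 100.0 * metrics["quarantined_rows"] / max(metrics["input_rows"], 1),
         "Unit": "Percent"},
        {"MetricName": "DurationSeconds", "Value": metrics["duration_seconds"], "Unit": "Seconds"},
    ]
    try:
        boto3.client("cloudwatch").put_metric_data(Namespace="Ohpen/ETL", MetricData=metric_data)
    except Exception as exc:
        logger.warning("Failed to publish metrics: %s", exc)   # never fail the job over metrics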