
Appendix: ETL Complete Reference

This appendix contains all ETL-related reference materials: diagrams, pseudocode, and complete implementation code. For simplified, abstracted versions, see the main ETL Flow & Pseudocode document.

Note: These materials reflect the PySpark implementation (recommended for production). The Pandas implementation (used for development and testing) stubs some features, notably the quarantine history check.


Table of Contents

  • Part 1: ETL Diagrams
  • Part 2: ETL Pseudocode
  • Part 3: ETL Implementation Code

Part 1: ETL Diagrams

This section contains all detailed diagrams for the ETL pipeline.

High-Level Data Flow


Detailed Validation Flow

Note: This reflects the PySpark implementation (production); the Pandas version stubs the quarantine history check.


S3 Storage Structure


Component Interaction


Error Handling & Resilience


Data Quality Metrics Flow


Part 2: ETL Pseudocode

This section contains moderate-detail pseudocode for the ETL pipeline.

Main ETL Flow

PROCEDURE Main();
BEGIN
  { Initialize run with unique run_id and timestamps }
  run_id := GenerateRunId();
  InitializeS3Client();

  { Step 1: Read raw CSV from Bronze layer }
  raw_data := ReadCSVFromS3(bronze_bucket, bronze_key);

  { Step 2: Enrich with metadata (row_hash, source_file_id, attempt_count, ingestion_timestamp) }
  enriched_data := EnrichMetadata(raw_data);

  { Step 3: Loop Prevention - check for duplicates and retry limits }
  loop_result := ApplyLoopPrevention(enriched_data);
  { Returns: condemned_rows (auto-condemned), continue_rows (for validation) }

  { Step 4: Validate remaining rows (schema, currency, types, timestamps) }
  validation_result := ValidateRows(loop_result.continue_rows);

  { Step 5: Circuit Breaker - halt if >100 same errors/hour }
  IF CircuitBreakerThresholdExceeded(validation_result.error_counts) THEN
    HaltPipeline('Too many errors - human intervention required');

  { Step 6: Transform valid data - add partition keys (year, month) }
  transformed_data := AddPartitionKeys(validation_result.valid_rows);

  { Step 7: Write outputs }
  WriteToSilverLayer(transformed_data, run_id);               { Partitioned Parquet }
  WriteToQuarantine(validation_result.invalid_rows, run_id);  { With error details }
  WriteToCondemnedLayer(loop_result.condemned_rows, run_id);

  { Step 8: Write success marker and publish metrics }
  metrics := CalculateMetrics(raw_data, transformed_data, validation_result, loop_result);
  WriteSuccessMarker(metrics, run_id);
  PublishCloudWatchMetrics(metrics);

  { Step 9: Require human approval before production promotion }
  LogInfo('ETL completed - human approval required');
END;
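
Step 5 has no dedicated helper in the pseudocode above. The Python sketch below shows one way it could be wired up, reusing the CircuitBreaker class from Part 3; treating a trip as a raised RuntimeError (rather than a specific HaltPipeline call) is an illustrative assumption.

# Illustrative sketch only: wiring the CircuitBreaker from Part 3 into Step 5.
# quarantine_df is assumed to carry a 'validation_error' column, as produced by
# validate_and_transform() in Part 3.
def check_circuit_breaker(quarantine_df, circuit_breaker):
    for error_type in quarantine_df['validation_error'].dropna():
        circuit_breaker.record_error(error_type)           # one event per invalid row
    for error_type in quarantine_df['validation_error'].dropna().unique():
        if circuit_breaker.check_threshold(error_type):    # >= 100 of the same error within 1 hour
            raise RuntimeError(
                f"Circuit breaker tripped for {error_type}: "
                f"{circuit_breaker.get_error_count(error_type)} errors - human intervention required"
            )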

Helper Functions

EnrichMetadata

Adds tracking metadata to each row for loop prevention and lineage.

FUNCTION EnrichMetadata(dataframe: DATAFRAME; source_file: STRING; ingest_time: TIMESTAMP): DATAFRAME;
BEGIN
  FOR each row IN dataframe DO
  BEGIN
    row['row_hash'] := SHA256Hash(row);         { Unique identifier for duplicate detection }
    row['source_file_id'] := ExtractFileId(source_file);
    row['attempt_count'] := 0;                  { Loaded from quarantine history for retries }
    row['ingestion_timestamp'] := ingest_time;
  END;
  RETURN dataframe;
END;
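
As a quick illustration of why the row hash enables the duplicate checks: identical row content always produces the same hash. The snippet below uses the compute_row_hash helper from Part 3 on invented sample rows.

import pandas as pd

# Hypothetical sample rows; only the content determines the hash.
df = pd.DataFrame([
    {'TransactionID': 'T1', 'CustomerID': 'C1', 'TransactionAmount': 10.0,
     'Currency': 'EUR', 'TransactionTimestamp': '2025-01-01T10:00:00Z'},
    {'TransactionID': 'T1', 'CustomerID': 'C1', 'TransactionAmount': 10.0,
     'Currency': 'EUR', 'TransactionTimestamp': '2025-01-01T10:00:00Z'},
])
hashes = df.apply(lambda row: compute_row_hash(row, df.columns.tolist()), axis=1)
assert hashes.iloc[0] == hashes.iloc[1]  # same content -> same row_hash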

ApplyLoopPrevention

Prevents infinite retry loops by checking for duplicates and retry limits. Returns condemned rows (no retries) and rows to continue processing.

FUNCTION ApplyLoopPrevention(dataframe: DATAFRAME): LOOP_PREVENTION_RESULT;
BEGIN
  { Check 1: TransactionID deduplication from Silver layer }
  IF silver_layer_exists THEN
  BEGIN
    existing_transactions := ScanSilverLayerForTransactionIDs();
    FOR each row WHERE (TransactionID, event_date) IN existing_transactions DO
      MarkAsCondemned(row, 'DUPLICATE_TRANSACTION_ID');
  END;

  { Check 2: Duplicate detection in quarantine history }
  quarantine_history := LoadQuarantineHistory(row_hashes);
  FOR each row WHERE row_hash IN quarantine_history DO
    MarkAsCondemned(row, 'DUPLICATE_FAILURE');

  { Check 3: Attempt limit (max 3 retries) }
  FOR each row WHERE attempt_count >= 3 DO
    MarkAsCondemned(row, 'MAX_ATTEMPTS');

  RETURN SplitIntoCondemnedAndContinue(dataframe);
END;
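
Check 1 (scanning the Silver layer for existing TransactionIDs) is not implemented in the Pandas code in Part 3. The sketch below shows one possible pandas/pyarrow approach, assuming the Silver layer is Hive-partitioned Parquet reachable through s3fs; bucket and prefix names are placeholders, and only TransactionID is scanned here (the pseudocode pairs it with the event date).

import pyarrow.dataset as ds
import s3fs

def scan_silver_transaction_ids(silver_bucket, silver_prefix):
    """Return the set of TransactionIDs already present in the Silver layer (sketch)."""
    fs = s3fs.S3FileSystem()  # credentials resolved from the environment
    dataset = ds.dataset(
        f"{silver_bucket}/{silver_prefix}",
        filesystem=fs,
        format="parquet",
        partitioning="hive",   # year=YYYY/month=MM/... directories
    )
    table = dataset.to_table(columns=["TransactionID"])  # column pruning keeps the scan cheap
    return set(table.column("TransactionID").to_pylist())

# existing = scan_silver_transaction_ids("silver-bucket", "mortgages/transactions")   # placeholders
# condemned_mask = df["TransactionID"].isin(existing)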

ValidateRows

Validates data quality: schema checks, currency codes, data types, timestamps, and business logic duplicates.

FUNCTION ValidateRows(dataframe: DATAFRAME): VALIDATION_RESULT;
BEGIN
  FOR each row IN dataframe DO
  BEGIN
    { Schema: Check required fields are not null }
    IF AnyRequiredFieldIsNull(row) THEN
      MarkAsInvalid(row, 'NULL_VALUE_ERROR')

    { Currency: Must be in allowed list (EUR, USD, GBP, etc.) }
    ELSE IF Currency NOT IN allowed_currencies THEN
      MarkAsInvalid(row, 'CURRENCY_ERROR')

    { Type: TransactionAmount must be numeric }
    ELSE IF NOT IsNumeric(TransactionAmount) THEN
      MarkAsInvalid(row, 'TYPE_ERROR')

    { Timestamp: Must be parseable }
    ELSE IF ParseTimestamp(TransactionTimestamp) = NULL THEN
      MarkAsInvalid(row, 'TIMESTAMP_ERROR')

    { Business Logic: No duplicate CustomerID + date combinations }
    ELSE IF DuplicateAccountDateExists(row) THEN
      MarkAsInvalid(row, 'DUPLICATE_ACCOUNT_DATE');

    { Increment attempt_count for invalid rows }
    IF row is invalid THEN
      row['attempt_count'] := row['attempt_count'] + 1;
  END;

  RETURN SplitIntoValidAndInvalid(dataframe);
END;
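
For a concrete feel of the corresponding validate_and_transform function in Part 3, here is a small usage sketch with invented rows: the unknown currency lands in quarantine while the clean row proceeds to the Silver split, and the condemned split stays empty because the quarantine history check is stubbed.

import pandas as pd

# Hypothetical two-row input; run_id is an arbitrary label here.
df = pd.DataFrame([
    {'TransactionID': 'T1', 'CustomerID': 'C1', 'TransactionAmount': '100.50',
     'Currency': 'EUR', 'TransactionTimestamp': '2025-01-01T10:00:00Z'},
    {'TransactionID': 'T2', 'CustomerID': 'C2', 'TransactionAmount': '20.00',
     'Currency': 'XXX', 'TransactionTimestamp': '2025-01-02T11:00:00Z'},
])
valid_df, quarantine_df, condemned_df = validate_and_transform(df, run_id='local-test')
# valid_df: 1 row, with year/month partition columns added
# quarantine_df: 1 row, validation_error == 'Invalid Currency'
# condemned_df: empty (quarantine history check is stubbed, attempt_count not supplied)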

AddPartitionKeys

Extracts year and month from transaction timestamps for partitioning.

FUNCTION AddPartitionKeys(dataframe: DATAFRAME): DATAFRAME;
BEGIN
  FOR each row IN dataframe DO
  BEGIN
    row['year'] := Year(parsed_timestamp);
    row['month'] := ZeroPad(Month(parsed_timestamp), 2);
  END;
  RETURN dataframe;
END;

S3 Operations

ReadCSVFromS3

Reads CSV file from S3 and returns as dataframe.

FUNCTION ReadCSVFromS3(bucket: STRING; key: STRING): DATAFRAME;
BEGIN
  TRY
    content := s3_client.GetObject(bucket, key);
    RETURN ParseCSV(content);
  EXCEPT
    ON error DO
    BEGIN
      LogError('Failed to read from S3: ' + error);
      RAISE error;
    END;
  END;
END;

WriteParquetToS3

Writes dataframe as partitioned Parquet files to S3.

PROCEDURE WriteParquetToS3(dataframe: DATAFRAME; bucket: STRING; prefix: STRING; partition_cols: ARRAY);
BEGIN
  IF IsEmpty(dataframe) THEN EXIT;

  TRY
    ConvertToParquet(dataframe);
    WriteToS3(bucket, prefix, partition_cols);
  EXCEPT
    ON error DO
    BEGIN
      LogError('Failed to write Parquet: ' + error);
      RAISE error;
    END;
  END;
END;
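
On the consumer side (not part of this pipeline), a downstream job could read the partitioned Silver output back with partition pruning. A sketch under the assumption of the same Hive-style layout the writer produces; bucket, prefix, and run_id values are placeholders.

import pyarrow.dataset as ds
import s3fs

fs = s3fs.S3FileSystem()  # credentials from the environment
silver = ds.dataset(
    "silver-bucket/mortgages/transactions/schema_v=v1/run_id=20250101T000000Z",  # placeholder path
    filesystem=fs,
    format="parquet",
    partitioning="hive",
)
# Partition pruning: only the year=2025 directories are scanned.
df_2025 = silver.to_table(filter=ds.field("year") == 2025).to_pandas()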

WriteSuccessMarker

Creates _SUCCESS file with run metrics for monitoring and lineage.

PROCEDURE WriteSuccessMarker(metrics: METRICS; run_id: STRING);
BEGIN
  content := JSONSerialize(metrics);
  s3_client.PutObject(bucket, prefix + '/_SUCCESS', content);
END;
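
The marker doubles as a lightweight lineage record that other jobs or humans can read back. A minimal boto3 sketch follows; the bucket and key are placeholders, and the fields shown are the ones main() writes.

import json
import boto3

s3 = boto3.client("s3")
obj = s3.get_object(
    Bucket="silver-bucket",  # placeholder
    Key="mortgages/transactions/schema_v=v1/run_id=20250101T000000Z/_SUCCESS",  # placeholder
)
metrics = json.loads(obj["Body"].read())
print(metrics["run_id"], metrics["input_rows"], metrics["quarantined_rows"], metrics["status"])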

Monitoring & Metrics

PublishCloudWatchMetrics

Publishes key metrics to CloudWatch for monitoring.

PROCEDURE PublishCloudWatchMetrics(metrics: METRICS);
BEGIN
  IF CloudWatchDisabled() THEN EXIT;  { Local testing }

  TRY
    metric_data := [
      ('InputRows', metrics.input_rows),
      ('ValidRows', metrics.valid_rows),
      ('QuarantinedRows', metrics.quarantined_rows),
      ('CondemnedRows', metrics.condemned_rows),
      ('QuarantineRate', CalculateRate(metrics.quarantined_rows, metrics.input_rows)),
      ('DurationSeconds', metrics.duration_seconds)
    ];
    cloudwatch_client.PutMetricData('Ohpen/ETL', metric_data);
  EXCEPT
    ON error DO
      LogWarning('Failed to publish metrics: ' + error);
      { Don't fail job if metrics fail }
  END;
END;
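
Publishing metrics is only useful if something watches them. The sketch below creates a CloudWatch alarm on the QuarantineRate metric; the alarm name, 5% threshold, period, and SNS topic ARN are assumptions, not values taken from this document.

import boto3

cloudwatch = boto3.client("cloudwatch")
cloudwatch.put_metric_alarm(
    AlarmName="etl-quarantine-rate-high",   # assumed name
    Namespace="Ohpen/ETL",
    MetricName="QuarantineRate",
    Statistic="Average",
    Period=3600,                            # assumed: evaluate hourly
    EvaluationPeriods=1,
    Threshold=5.0,                          # assumed: alert above 5% quarantined
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=["arn:aws:sns:eu-west-1:123456789012:etl-alerts"],  # placeholder topic
)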

Part 3: ETL Implementation Code

This section contains the complete implementation of the transaction ingestion ETL pipeline.

File: src/etl/ingest_transactions.py

Note: The implementation has been enhanced with:

  • Metadata enrichment (row_hash, source_file_id, attempt_count, ingestion_timestamp)
  • Loop prevention (duplicate detection in quarantine history, attempt limits, circuit breaker)
  • Condemned layer for rows exceeding max attempts or exact duplicates
  • Updated error types (SCHEMA_ERROR, NULL_VALUE_ERROR, TYPE_ERROR, CURRENCY_ERROR, TIMESTAMP_ERROR, DUPLICATE_FAILURE, MAX_ATTEMPTS)
  • Enhanced CloudWatch metrics (loop prevention metrics, condemned rows)

For the most up-to-date code, see src/etl/ingest_transactions.py.


import argparse
import datetime
import hashlib
import json
import logging
import os
import sys
from collections import defaultdict
from io import StringIO

import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Configure logging with structured JSON support for CloudWatch Logs Insights

class StructuredFormatter(logging.Formatter):
    """Formatter that outputs structured JSON logs for CloudWatch Logs Insights"""
    def format(self, record):
        log_data = {
            'timestamp': datetime.datetime.utcnow().isoformat() + 'Z',
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
        }
        # Add any extra fields passed via extra= parameter
        if hasattr(record, 'run_id'):
            log_data['run_id'] = record.run_id
        if hasattr(record, 'metric_name'):
            log_data['metric_name'] = record.metric_name
        if hasattr(record, 'metric_value'):
            log_data['metric_value'] = record.metric_value
        return json.dumps(log_data)

# Configure logging

log_handler = logging.StreamHandler(sys.stdout)
log_handler.setFormatter(StructuredFormatter())
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(log_handler)

# ISO-4217 Currency Allowlist (Subset for case study)

ALLOWED_CURRENCIES = {
    'EUR', 'USD', 'GBP', 'JPY', 'AUD', 'CAD', 'CHF', 'CNY', 'HKD', 'NZD'
}

REQUIRED_COLUMNS = ['TransactionID', 'CustomerID', 'TransactionAmount', 'Currency', 'TransactionTimestamp']

# Error type constants matching onepager
ERROR_SCHEMA = 'SCHEMA_ERROR'
ERROR_NULL_VALUE = 'NULL_VALUE_ERROR'
ERROR_TYPE = 'TYPE_ERROR'
ERROR_CURRENCY = 'CURRENCY_ERROR'
ERROR_TIMESTAMP = 'TIMESTAMP_ERROR'
ERROR_DUPLICATE_FAILURE = 'DUPLICATE_FAILURE'
ERROR_MAX_ATTEMPTS = 'MAX_ATTEMPTS'

# Loop prevention constants
MAX_ATTEMPTS = 3
CIRCUIT_BREAKER_THRESHOLD = 100 # Same errors per hour
CIRCUIT_BREAKER_WINDOW_HOURS = 1

def get_s3_client(endpoint_url=None):
    """
    Returns a boto3 S3 client.
    If endpoint_url is provided (e.g., for local testing), it uses that.
    """
    return boto3.client(
        's3',
        endpoint_url=endpoint_url,
        aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY')
    )

def read_csv_from_s3(s3_client, bucket, key):
    """Reads CSV from S3 into a pandas DataFrame."""
    try:
        logger.info(f"Reading from s3://{bucket}/{key}")
        response = s3_client.get_object(Bucket=bucket, Key=key)
        content = response['Body'].read().decode('utf-8')
        return pd.read_csv(StringIO(content))
    except Exception as e:
        logger.error(f"Failed to read from S3: {e}")
        raise

def compute_row_hash(row, columns):
    """
    Compute SHA256 hash of row content for duplicate detection.

    Args:
        row: pandas Series representing a single row
        columns: list of column names to include in hash

    Returns:
        str: SHA256 hash as hexadecimal string
    """
    row_str = '|'.join([str(row[col]) if pd.notna(row[col]) else '' for col in columns])
    return hashlib.sha256(row_str.encode('utf-8')).hexdigest()

def extract_source_file_id(source_file):
    """
    Extract source_file_id from S3 path or generate from source_file.

    Args:
        source_file: Full S3 path (e.g., s3://bucket/path/file.csv)

    Returns:
        str: Source file identifier
    """
    if source_file and source_file.startswith('s3://'):
        parts = source_file.split('/')
        if len(parts) > 0:
            filename = parts[-1]
            return filename.rsplit('.', 1)[0] if '.' in filename else filename
    return source_file or 'unknown'

def enrich_metadata(df, source_file=None, ingest_time=None):
    """
    Enrich dataframe with tracking metadata: row_hash, source_file_id, attempt_count, ingestion_timestamp.

    Args:
        df: pandas DataFrame
        source_file: Source file path (optional)
        ingest_time: Ingestion timestamp (optional, defaults to now)

    Returns:
        pandas DataFrame with enriched metadata columns
    """
    df = df.copy()
    df['row_hash'] = df.apply(lambda row: compute_row_hash(row, df.columns.tolist()), axis=1)
    df['source_file_id'] = extract_source_file_id(source_file) if source_file else 'unknown'
    if ingest_time:
        df['ingestion_timestamp'] = ingest_time
    else:
        df['ingestion_timestamp'] = datetime.datetime.now(datetime.timezone.utc).isoformat()
    df['attempt_count'] = 0
    df['retry_history'] = None
    return df

def check_quarantine_history(s3_client, quarantine_bucket, quarantine_prefix, row_hashes):
    """
    Check quarantine history for duplicate row_hashes.
    For case study: simplified implementation (returns empty dict).
    In production, this would query Glue Catalog or use Athena for efficient lookup.
    """
    logger.debug(f"Checking quarantine history for {len(row_hashes)} row hashes")
    return {}

def check_attempt_limit(attempt_count):
    """Check if attempt_count exceeds maximum allowed attempts."""
    return attempt_count >= MAX_ATTEMPTS

class CircuitBreaker:
    """Circuit breaker to halt pipeline when error threshold exceeded."""

    def __init__(self, threshold=CIRCUIT_BREAKER_THRESHOLD, window_hours=CIRCUIT_BREAKER_WINDOW_HOURS):
        self.threshold = threshold
        self.window_hours = window_hours
        self.error_counts = defaultdict(list)

    def record_error(self, error_type, timestamp=None):
        """Record an error occurrence."""
        if timestamp is None:
            timestamp = datetime.datetime.now(datetime.timezone.utc)
        self.error_counts[error_type].append(timestamp)
        cutoff = timestamp - datetime.timedelta(hours=self.window_hours)
        self.error_counts[error_type] = [
            ts for ts in self.error_counts[error_type] if ts > cutoff
        ]

    def check_threshold(self, error_type):
        """Check if error threshold exceeded for given error type."""
        return len(self.error_counts[error_type]) >= self.threshold

    def get_error_count(self, error_type):
        """Get current error count for given error type."""
        return len(self.error_counts[error_type])

def validate_and_transform(df, run_id, source_file=None, ingest_time=None,
                           s3_client=None, quarantine_bucket=None, quarantine_prefix=None,
                           circuit_breaker=None):
    """
    Validates the dataframe and splits it into valid, quarantine and condemned dataframes.
    Valid rows are destined for the Silver layer (Parquet); invalid rows are quarantined with
    error details; rows that repeat a known failure or exceed the retry limit are condemned.
    Adds partition columns (year, month) to valid data for Silver layer.
    """
    # Check missing columns
    missing_cols = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing_cols:
        raise ValueError(f"Input missing required columns: {missing_cols}")

    # Initialize validation status
    df['validation_error'] = None

    # Optional metadata columns (passed in from the ingestion context)
    # Keep these stable in tests by only adding them when explicitly provided.
    if ingest_time is not None:
        df['ingest_time'] = ingest_time
    if source_file is not None:
        df['source_file'] = source_file

    # 1. Null checks
    null_mask = df[REQUIRED_COLUMNS].isnull().any(axis=1)
    df.loc[null_mask, 'validation_error'] = 'Missing required fields'

    # 2. Currency validation
    # Only check rows that passed null checks to avoid overwriting
    currency_mask = (~null_mask) & (~df['Currency'].isin(ALLOWED_CURRENCIES))
    df.loc[currency_mask, 'validation_error'] = 'Invalid Currency'

    # 3. Type checks (minimal, case-study safe)
    # Ensure TransactionAmount is numeric. Negative values are allowed (e.g., refunds/withdrawals);
    # non-numeric values are quarantined.
    amount_ok_mask = (~null_mask) & (df['validation_error'].isnull())
    if amount_ok_mask.any():
        amount_numeric_all = pd.to_numeric(df['TransactionAmount'], errors='coerce')
        invalid_amount_mask = amount_ok_mask & (amount_numeric_all.isnull())
        df.loc[invalid_amount_mask, 'validation_error'] = 'Invalid Amount'
        # Normalize the column type for rows that passed the numeric check
        ok_numeric_mask = amount_ok_mask & (~invalid_amount_mask)
        df.loc[ok_numeric_mask, 'TransactionAmount'] = amount_numeric_all.loc[ok_numeric_mask]

    # 4. Timestamp parsing
    # We attempt to parse timestamps. Failures get quarantined.
    # We use a temporary column for parsed datetime to avoid breaking the original for quarantine
    df['parsed_timestamp'] = pd.to_datetime(df['TransactionTimestamp'], errors='coerce', utc=True)

    time_mask = (~null_mask) & (df['parsed_timestamp'].isnull())
    # If it was already invalid, append error; otherwise set it
    df.loc[time_mask, 'validation_error'] = df.loc[time_mask, 'validation_error'].apply(
        lambda x: f"{x}; Invalid Timestamp" if x else "Invalid Timestamp"
    )

    # 5. Duplicate detection (account/date combinations)
    # Flag duplicates but don't drop - preserve for audit
    # Note: This assumes CustomerID + TransactionTimestamp (date part) as the business key
    # For production, this would be configurable based on business rules
    valid_for_dup_check = df[df['validation_error'].isnull()].copy()
    if not valid_for_dup_check.empty and 'CustomerID' in valid_for_dup_check.columns:
        valid_for_dup_check['tx_date'] = valid_for_dup_check['parsed_timestamp'].dt.date
        duplicate_mask = valid_for_dup_check.duplicated(
            subset=['CustomerID', 'tx_date'],
            keep=False
        )
        if duplicate_mask.any():
            # Mark duplicates in original dataframe
            dup_indices = valid_for_dup_check[duplicate_mask].index
            for idx in dup_indices:
                if df.loc[idx, 'validation_error'] is None:
                    df.loc[idx, 'validation_error'] = 'Duplicate account/date combination'
                else:
                    df.loc[idx, 'validation_error'] = f"{df.loc[idx, 'validation_error']}; Duplicate account/date combination"

    # 6. Loop prevention: condemn repeat failures and rows past the retry limit.
    # The quarantine history lookup is stubbed in this Pandas implementation, so in practice
    # condemnation is driven by attempt_count carried in from earlier retries.
    condemned_mask = pd.Series(False, index=df.index)
    if 'row_hash' in df.columns:
        history = check_quarantine_history(s3_client, quarantine_bucket, quarantine_prefix,
                                           df['row_hash'].tolist())
        if history:
            seen_mask = df['row_hash'].isin(history)
            df.loc[seen_mask, 'validation_error'] = ERROR_DUPLICATE_FAILURE
            condemned_mask |= seen_mask
    if 'attempt_count' in df.columns:
        over_limit_mask = df['attempt_count'].apply(check_attempt_limit) & (~condemned_mask)
        df.loc[over_limit_mask, 'validation_error'] = ERROR_MAX_ATTEMPTS
        condemned_mask |= over_limit_mask

    # Split into valid, quarantine and condemned
    condemned_df = df[condemned_mask].copy()
    quarantine_df = df[(~condemned_mask) & df['validation_error'].notnull()].copy()
    valid_df = df[(~condemned_mask) & df['validation_error'].isnull()].copy()

    # Feed quarantine errors into the circuit breaker so the caller can halt on error storms
    if circuit_breaker is not None:
        for error_type in quarantine_df['validation_error']:
            circuit_breaker.record_error(error_type)

    # Add partition columns to valid data (Silver layer partitioning by event time)
    if not valid_df.empty:
        valid_df['year'] = valid_df['parsed_timestamp'].dt.year
        valid_df['month'] = valid_df['parsed_timestamp'].dt.month.astype(str).str.zfill(2)
        # Drop helper columns, keep original timestamp
        del valid_df['parsed_timestamp']
        del valid_df['validation_error']

    # Increment attempt_count for quarantined rows so retries eventually hit MAX_ATTEMPTS
    if 'attempt_count' in quarantine_df.columns:
        quarantine_df['attempt_count'] = quarantine_df['attempt_count'] + 1

    # Add metadata to quarantine and condemned outputs
    for out_df in (quarantine_df, condemned_df):
        out_df['ingest_date'] = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d')
        out_df['run_id'] = run_id

    return valid_df, quarantine_df, condemned_df

def write_parquet_to_s3(df, s3_client, bucket, prefix, partition_cols=None):
    """
    Writes DataFrame to S3 as Parquet.
    If partition_cols is provided, writes partitioned dataset using pyarrow.
    """
    if df.empty:
        logger.warning(f"No data to write to s3://{bucket}/{prefix}")
        return

    table = pa.Table.from_pandas(df)

    # Note: For strict S3 partitioning with PyArrow fs, we need s3fs.
    # Use s3fs for partitioned write support which is cleaner than manual boto3 loops
    import s3fs

    endpoint = os.getenv('S3_ENDPOINT_URL')
    key = os.getenv('AWS_ACCESS_KEY_ID')
    secret = os.getenv('AWS_SECRET_ACCESS_KEY')

    # Configure s3fs for AWS S3 (or a local endpoint such as MinIO)
    s3_fs = s3fs.S3FileSystem(
        key=key,
        secret=secret,
        client_kwargs={'endpoint_url': endpoint} if endpoint else {},
        use_ssl=False if endpoint and endpoint.startswith('http://') else True
    )

    path = f"{bucket}/{prefix}"

    try:
        pq.write_to_dataset(
            table,
            root_path=path,
            partition_cols=partition_cols,
            filesystem=s3_fs,
            existing_data_behavior='overwrite_or_ignore'  # Safe because run_id makes path unique
        )
        logger.info(f"Successfully wrote {len(df)} rows to s3://{path}")
    except Exception as e:
        logger.error(f"Failed to write Parquet: {e}")
        raise

def write_condemned_to_s3(df, s3_client, bucket, prefix):
    """
    Writes condemned rows to S3 in the condemned layer.

    Args:
        df: pandas DataFrame with condemned rows
        s3_client: boto3 S3 client
        bucket: S3 bucket
        prefix: S3 prefix for condemned data
    """
    if df.empty:
        return
    write_parquet_to_s3(df, s3_client, bucket, prefix, partition_cols=None)
    logger.warning(f"Wrote {len(df)} condemned rows to s3://{bucket}/{prefix}")

def write_success_marker(s3_client, bucket, prefix, run_id, metrics):
    """Writes a _SUCCESS file with metrics JSON."""
    key = f"{prefix}/_SUCCESS"
    content = json.dumps(metrics, indent=2)
    s3_client.put_object(Bucket=bucket, Key=key, Body=content)
    logger.info(f"Wrote success marker to s3://{bucket}/{key}")

def publish_cloudwatch_metrics(metrics, namespace="Ohpen/ETL", enabled=None):
    """
    Publish ETL metrics to CloudWatch (optional, disabled for local testing).

    Args:
        metrics: Dictionary containing metric values (input_rows, valid_rows, quarantined_rows, etc.)
        namespace: CloudWatch namespace (default: "Ohpen/ETL")
        enabled: Whether to publish metrics. If None, auto-detects based on environment.
            Disabled if DISABLE_CLOUDWATCH=true or S3_ENDPOINT_URL is set (local testing)

    Returns:
        True if metrics were published, False otherwise
    """
    # Auto-detect if CloudWatch should be enabled
    if enabled is None:
        # Disable for local testing (MinIO) or if explicitly disabled
        enabled = (
            os.getenv('DISABLE_CLOUDWATCH', 'false').lower() != 'true' and
            os.getenv('S3_ENDPOINT_URL') is None  # No endpoint URL means AWS S3
        )

    if not enabled:
        logger.debug("CloudWatch metrics disabled (local testing or DISABLE_CLOUDWATCH=true)")
        return False

    try:
        cloudwatch = boto3.client('cloudwatch')

        # Calculate derived metrics
        quarantine_rate = (
            metrics['quarantined_rows'] / metrics['input_rows'] * 100
            if metrics['input_rows'] > 0 else 0.0
        )

        # Prepare metric data
        metric_data = [
            {
                'MetricName': 'InputRows',
                'Value': metrics['input_rows'],
                'Unit': 'Count',
                'Timestamp': datetime.datetime.utcnow()
            },
            {
                'MetricName': 'ValidRows',
                'Value': metrics['valid_rows'],
                'Unit': 'Count',
                'Timestamp': datetime.datetime.utcnow()
            },
            {
                'MetricName': 'QuarantinedRows',
                'Value': metrics['quarantined_rows'],
                'Unit': 'Count',
                'Timestamp': datetime.datetime.utcnow()
            },
            {
                'MetricName': 'QuarantineRate',
                'Value': quarantine_rate,
                'Unit': 'Percent',
                'Timestamp': datetime.datetime.utcnow()
            }
        ]

        # Add condemned rows metric
        if 'condemned_rows' in metrics:
            metric_data.append({
                'MetricName': 'CondemnedRows',
                'Value': metrics['condemned_rows'],
                'Unit': 'Count',
                'Timestamp': datetime.datetime.utcnow()
            })

        # Add loop prevention metrics
        if 'avg_attempt_count' in metrics:
            metric_data.append({
                'MetricName': 'AvgAttemptCount',
                'Value': metrics['avg_attempt_count'],
                'Unit': 'Count',
                'Timestamp': datetime.datetime.utcnow()
            })

        if 'duplicate_detection_rate' in metrics:
            metric_data.append({
                'MetricName': 'DuplicateDetectionRate',
                'Value': metrics['duplicate_detection_rate'],
                'Unit': 'Percent',
                'Timestamp': datetime.datetime.utcnow()
            })

        if 'auto_condemnation_rate' in metrics:
            metric_data.append({
                'MetricName': 'AutoCondemnationRate',
                'Value': metrics['auto_condemnation_rate'],
                'Unit': 'Percent',
                'Timestamp': datetime.datetime.utcnow()
            })

        # Add duration if available
        if 'duration_seconds' in metrics:
            metric_data.append({
                'MetricName': 'DurationSeconds',
                'Value': metrics['duration_seconds'],
                'Unit': 'Seconds',
                'Timestamp': datetime.datetime.utcnow()
            })

        # Publish metrics (CloudWatch allows up to 20 metrics per call)
        cloudwatch.put_metric_data(
            Namespace=namespace,
            MetricData=metric_data
        )

        # Log structured metrics for CloudWatch Logs Insights
        logger.info(
            "Published CloudWatch metrics",
            extra={
                'run_id': metrics.get('run_id'),
                'metric_name': 'ETLRunComplete',
                'metric_value': {
                    'input_rows': metrics['input_rows'],
                    'valid_rows': metrics['valid_rows'],
                    'quarantined_rows': metrics['quarantined_rows'],
                    'condemned_rows': metrics.get('condemned_rows', 0),
                    'quarantine_rate': quarantine_rate,
                    'avg_attempt_count': metrics.get('avg_attempt_count', 0.0),
                    'auto_condemnation_rate': metrics.get('auto_condemnation_rate', 0.0),
                    'duplicate_detection_rate': metrics.get('duplicate_detection_rate', 0.0)
                }
            }
        )

        return True

    except Exception as e:
        # Don't fail the job if CloudWatch publishing fails
        logger.warning(
            f"Failed to publish CloudWatch metrics: {e}",
            extra={'run_id': metrics.get('run_id')}
        )
        return False

def main():
    """
    ETL Pipeline: Bronze -> Silver
    Reads CSV from Bronze layer (raw, immutable), validates, and writes Parquet to Silver layer.
    Invalid rows are quarantined for audit; condemned rows are written to a separate layer.
    """
    parser = argparse.ArgumentParser(description="Ingest Transactions ETL (Bronze -> Silver)")
    parser.add_argument('--input-bucket', required=True)
    parser.add_argument('--input-key', required=True)
    parser.add_argument('--output-bucket', required=True)
    parser.add_argument('--output-prefix', required=True)
    parser.add_argument('--quarantine-bucket', required=True)
    parser.add_argument('--quarantine-prefix', required=True)
    parser.add_argument('--endpoint-url', default=os.getenv('S3_ENDPOINT_URL'), help="S3 Endpoint URL (optional, for local testing)")
    parser.add_argument('--disable-cloudwatch', action='store_true', help="Disable CloudWatch metrics publishing (for local testing)")

    args = parser.parse_args()

    start_time = datetime.datetime.now(datetime.timezone.utc)
    run_id = start_time.strftime('%Y%m%dT%H%M%SZ')
    ingest_time = start_time.isoformat()

    # Log with structured format for CloudWatch Logs Insights
    logger.info(
        "Starting ETL run",
        extra={'run_id': run_id, 'metric_name': 'ETLStart', 'metric_value': 1}
    )

    s3 = get_s3_client(args.endpoint_url)

    # 1. Read from Bronze layer (raw, immutable)
    # Bronze path convention: bronze/mortgages/transactions/ingest_date=YYYY-MM-DD/run_id=.../file.csv.gz
    raw_df = read_csv_from_s3(s3, args.input_bucket, args.input_key)
    logger.info(f"Read {len(raw_df)} rows from Bronze layer (s3://{args.input_bucket}/{args.input_key}).")

    # 2. Enrich with tracking metadata (row_hash, source_file_id, attempt_count, ingestion_timestamp)
    source_file = f"s3://{args.input_bucket}/{args.input_key}"
    enriched_df = enrich_metadata(raw_df, source_file=source_file, ingest_time=ingest_time)

    # 3. Initialize Circuit Breaker
    circuit_breaker = CircuitBreaker()

    # 4. Validate & Transform (Bronze -> Silver), including loop prevention
    valid_df, quarantine_df, condemned_df = validate_and_transform(
        enriched_df,
        run_id,
        source_file=source_file,
        ingest_time=ingest_time,
        s3_client=s3,
        quarantine_bucket=args.quarantine_bucket,
        quarantine_prefix=args.quarantine_prefix,
        circuit_breaker=circuit_breaker,
    )

    # 5. Circuit Breaker check: halt before any writes if one error type exceeds the hourly threshold
    if not quarantine_df.empty:
        for error_type in quarantine_df['validation_error'].dropna().unique():
            if circuit_breaker.check_threshold(error_type):
                logger.error(
                    f"Circuit breaker tripped for {error_type} "
                    f"({circuit_breaker.get_error_count(error_type)} errors) - halting pipeline"
                )
                sys.exit(1)

    # 6. Write Valid Data to Silver layer (Partitioned Parquet)
    # Silver path convention: silver/mortgages/transactions/year=YYYY/month=MM/schema_v=v1/run_id=.../part-0000.parquet
    # We include run_id and schema_v in the prefix to ensure idempotency, history, and schema versioning
    schema_version = 'v1'  # In production, this would come from config or schema registry
    run_output_prefix = f"{args.output_prefix}/schema_v={schema_version}/run_id={run_id}"

    if not valid_df.empty:
        write_parquet_to_s3(
            valid_df,
            s3,
            args.output_bucket,
            run_output_prefix,
            partition_cols=['year', 'month']
        )

    # 7. Write Quarantine Data (Partitioned by ingest_date manually or via col)
    # Structure: quarantine/transactions/ingest_date=.../run_id=.../
    if not quarantine_df.empty:
        q_prefix = f"{args.quarantine_prefix}/ingest_date={datetime.datetime.now().strftime('%Y-%m-%d')}/run_id={run_id}"
        write_parquet_to_s3(
            quarantine_df,
            s3,
            args.quarantine_bucket,
            q_prefix,
            partition_cols=None  # Flat file for quarantine usually easier to scan
        )
        logger.warning(f"Quarantined {len(quarantine_df)} invalid rows.")

    # 7b. Write Condemned Data (separate layer)
    # Structure: quarantine/transactions/condemned/ingest_date=.../run_id=.../
    if not condemned_df.empty:
        condemned_prefix = f"{args.quarantine_prefix}/condemned/ingest_date={datetime.datetime.now().strftime('%Y-%m-%d')}/run_id={run_id}"
        write_condemned_to_s3(
            condemned_df,
            s3,
            args.quarantine_bucket,
            condemned_prefix
        )
        logger.warning(f"Condemned {len(condemned_df)} rows (max attempts or exact duplicates).")

    # 8. Calculate duration
    end_time = datetime.datetime.now(datetime.timezone.utc)
    duration_seconds = (end_time - start_time).total_seconds()

    # 9. Prepare metrics
    metrics = {
        'run_id': run_id,
        'ingest_time': ingest_time,
        'source_file': source_file,
        'input_rows': len(raw_df),
        'valid_rows': len(valid_df),
        'quarantined_rows': len(quarantine_df),
        'condemned_rows': len(condemned_df),
        'duration_seconds': duration_seconds,
        'status': 'SUCCESS'
    }

    # Calculate loop prevention metrics
    all_processed = pd.concat([valid_df, quarantine_df, condemned_df], ignore_index=True) if not (valid_df.empty and quarantine_df.empty and condemned_df.empty) else pd.DataFrame()
    if not all_processed.empty and 'attempt_count' in all_processed.columns:
        metrics['avg_attempt_count'] = all_processed['attempt_count'].mean()

    if len(condemned_df) > 0:
        metrics['auto_condemnation_rate'] = (len(condemned_df) / len(raw_df)) * 100 if len(raw_df) > 0 else 0.0
        duplicate_condemned = condemned_df[condemned_df['validation_error'] == ERROR_DUPLICATE_FAILURE]
        if len(duplicate_condemned) > 0:
            metrics['duplicate_detection_rate'] = (len(duplicate_condemned) / len(raw_df)) * 100 if len(raw_df) > 0 else 0.0

    # Calculate quarantine breakdown by error type for detailed monitoring
    quarantine_by_reason = {}
    if not quarantine_df.empty and 'validation_error' in quarantine_df.columns:
        quarantine_by_reason = quarantine_df['validation_error'].value_counts().to_dict()
    metrics['quarantine_by_reason'] = quarantine_by_reason

    # Add condemned breakdown
    if not condemned_df.empty and 'validation_error' in condemned_df.columns:
        metrics['condemned_by_reason'] = condemned_df['validation_error'].value_counts().to_dict()

    # 10. Write Success Marker (for backward compatibility and manual inspection)
    write_success_marker(s3, args.output_bucket, run_output_prefix, run_id, metrics)

    # 11. Publish CloudWatch Metrics (optional, disabled for local testing)
    # Pass None when the flag is absent so publish_cloudwatch_metrics can auto-detect local runs.
    cloudwatch_enabled = False if args.disable_cloudwatch else None
    publish_cloudwatch_metrics(metrics, enabled=cloudwatch_enabled)

    # 12. Log completion with structured metrics for CloudWatch Logs Insights
    logger.info(
        "ETL run completed successfully",
        extra={
            'run_id': run_id,
            'metric_name': 'ETLComplete',
            'metric_value': {
                'input_rows': metrics['input_rows'],
                'valid_rows': metrics['valid_rows'],
                'quarantined_rows': metrics['quarantined_rows'],
                'duration_seconds': duration_seconds,
                'quarantine_rate': metrics['quarantined_rows'] / metrics['input_rows'] * 100 if metrics['input_rows'] > 0 else 0.0,
                'quarantine_by_reason': quarantine_by_reason
            }
        }
    )

if __name__ == "__main__":
    main()
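
For local testing against MinIO, the script can be driven directly from Python without packaging. A sketch assuming a MinIO endpoint on localhost:9000 and placeholder bucket names and credentials:

import os
import sys

os.environ["S3_ENDPOINT_URL"] = "http://localhost:9000"   # MinIO endpoint for local testing
os.environ["AWS_ACCESS_KEY_ID"] = "minioadmin"            # placeholder credentials
os.environ["AWS_SECRET_ACCESS_KEY"] = "minioadmin"

sys.argv = [
    "ingest_transactions.py",
    "--input-bucket", "bronze",
    "--input-key", "mortgages/transactions/ingest_date=2025-01-01/run_id=test/sample.csv",
    "--output-bucket", "silver",
    "--output-prefix", "mortgages/transactions",
    "--quarantine-bucket", "quarantine",
    "--quarantine-prefix", "transactions",
    "--disable-cloudwatch",
]
main()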

See Also

Task 1 Documentation

Technical Documentation

© 2026 Stephen Adei · CC BY 4.0