Skip to main content

PySpark Migration Guide

Overview

This guide explains how to migrate from the Pandas-based ETL pipeline to the PySpark-optimized version for improved performance and scalability.

Performance Benefits

  • 5-10x faster processing for large datasets
  • 100x scalability (2-100 DPUs vs 1 DPU max)
  • Distributed processing across multiple nodes
  • Predicate pushdown reduces I/O by 50-90%
  • Broadcast joins for efficient duplicate detection

Architecture Changes

Before (Pandas)

AWS Glue Python Shell (1 DPU)
├── Single-node processing
├── Row-by-row operations
├── In-memory limitations
└── Sequential processing

After (PySpark)

AWS Glue Spark Job (2-100 DPUs)
├── Distributed processing
├── Vectorized operations
├── Partition-based parallelism
└── Broadcast joins for small tables

Migration Steps

1. Update Terraform Infrastructure

The Terraform configuration has been updated to include a new Spark job:

cd tasks/04_devops_cicd/infra/terraform
terraform plan
terraform apply

This creates:

  • ohpen-transaction-etl-spark - New PySpark job (recommended)
  • ohpen-transaction-etl - Original Python Shell job (kept for compatibility)

2. Upload PySpark Scripts to S3

# Package and upload the Spark-optimized scripts
aws s3 cp tasks/01_data_ingestion_transformation/src/etl/ \
s3://ohpen-artifacts/scripts/ \
--recursive \
--exclude "*.pyc" \
--exclude "__pycache__"

3. Update Glue Job Script Location

The new Spark job uses:

  • Script: s3://ohpen-artifacts/scripts/ingest_transactions_spark.py
  • Entry point: src.etl.ingest_transactions_spark.main

4. Test the Migration

  1. Keep the original Python Shell job running
  2. Test the new Spark job with a subset of data
  3. Compare results and performance
  4. Switch over once validated

Option B: Gradual Migration

  1. Start with Spark job for new data only
  2. Backfill historical data using Spark job
  3. Decommission Python Shell job once stable

Code Changes Summary

Module Mapping

Original ModulePySpark ModuleKey Changes
s3_operations.pys3_operations_spark.pySpark DataFrame readers/writers, predicate pushdown
metadata.pymetadata_spark.pyVectorized hash computation (no df.apply())
validation.pyvalidation_spark.pySpark SQL filters, window functions
loop_prevention.pyloop_prevention_spark.pyBroadcast joins for duplicate detection
validator.pyvalidator_spark.pySpark DataFrame operations
ingest_transactions.pyingest_transactions_spark.pySparkSession initialization, Spark config

Key Optimizations Applied

1. Vectorized Operations

Before (Pandas):

df['row_hash'] = df.apply(lambda row: compute_row_hash(row, columns), axis=1)

After (PySpark):

df = df.withColumn(
'row_hash',
sha2(concat_ws('|', *[coalesce(col(c), lit('')) for c in columns]), 256)
)

Benefit: 10-100x faster, distributed across partitions

2. Broadcast Joins

Before (Pandas):

# Stub implementation - not actually checking history
quarantine_history = {}

After (PySpark):

quarantine_history_df = spark.read.parquet(quarantine_path)
df = df.join(broadcast(quarantine_history_df), on='row_hash', how='left')

Benefit: Efficient duplicate detection, no expensive shuffles

3. Predicate Pushdown

Before (Pandas):

raw_df = pd.read_csv(StringIO(content))  # Load entire file
valid_df = raw_df[raw_df['Currency'].isin(ALLOWED_CURRENCIES)] # Then filter

After (PySpark):

raw_df = spark.read.csv(path).filter(
col("Currency").isin(ALLOWED_CURRENCIES) # Filter at source
)

Benefit: 50-90% reduction in I/O, faster processing

Configuration Changes

Spark Job Settings

The new Spark job is configured with:

  • Workers: 2 DPUs (G.1X) - scale up as needed
  • Glue Version: 4.0 (Spark 3.3)
  • Optimizations:
    • Adaptive query execution
    • Partition coalescing
    • Skew join handling
    • Kryo serialization

Scaling Guidelines

Data VolumeRecommended DPUsWorker Type
< 1GB/month2G.1X
1-10GB/month4-8G.1X
10-100GB/month10-20G.2X
> 100GB/month20+G.2X or G.4X

Testing

Local Testing

For local development, install PySpark:

pip install -r requirements-spark.txt

Run tests:

cd tasks/01_data_ingestion_transformation
pytest tests/ -v

AWS Glue Testing

  1. Create a test run in AWS Glue console
  2. Use a small test file (e.g., 1000 rows)
  3. Verify output matches Pandas version
  4. Check CloudWatch logs for errors

Monitoring

Key Metrics to Watch

  • Processing Time: Should be 5-10x faster
  • DPU Hours: Should be lower due to faster execution
  • Data Quality: Should match Pandas version exactly
  • Error Rates: Should be same or lower

CloudWatch Metrics

The Spark job publishes the same CloudWatch metrics:

  • ETLStart / ETLComplete
  • QuarantinedRows
  • Custom metrics in Ohpen/ETL namespace

Rollback Plan

If issues occur:

  1. Immediate: Switch back to Python Shell job (ohpen-transaction-etl)
  2. Investigation: Check CloudWatch logs for errors
  3. Fix: Update Spark job configuration or code
  4. Re-test: Validate with test dataset before re-enabling

Performance Comparison

Expected Improvements

MetricPandas (Python Shell)PySpark (Spark Job)Improvement
Processing time (500MB)5-10 min1-2 min5-10x faster
Max file size~40MB100GB+2500x larger
Scalability1 DPU2-100 DPUs100x capacity
Cost per run$0.44/DPU-hr$0.44/DPU-hr (faster)5-10x cost reduction

Real-World Benchmarks

Based on industry benchmarks:

  • 100 large CSV files: PySpark 11 min vs Pandas 60+ min
  • 2B records (400GB): PySpark completes, Pandas fails

Troubleshooting

Common Issues

1. Out of Memory Errors

Solution: Increase number of workers or use larger worker types (G.2X, G.4X)

2. Slow Performance

Check:

  • Are predicate pushdown filters applied?
  • Is data properly partitioned?
  • Are broadcast joins being used for small tables?

3. Data Quality Differences

Solution: Compare row counts and sample data between Pandas and PySpark outputs

Debug Mode

Enable Spark UI for debugging:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

Access Spark UI via AWS Glue console → Job runs → Spark UI

Next Steps

  1. Phase 1: Deploy Spark job alongside Python Shell job
  2. Phase 2: Test with production data subset
  3. Phase 3: Full migration to Spark job
  4. 🔄 Phase 4: Implement Glue Data Catalog (for Athena)
  5. 🔄 Phase 5: Add Lambda triggers for event-driven processing

References

Support

For issues or questions:

  1. Check CloudWatch logs
  2. Review Spark UI for execution plans
  3. Compare with Pandas version output
  4. Consult optimization analysis document
© 2026 Stephen AdeiCC BY 4.0