Python for Data Ops

Exploring Python's role in modern data operations, from ETL pipelines to automated reporting and system integration.


Python for Data Operations

Python has become the lingua franca of data operations, but writing production-grade pipelines requires more than just knowing pandas and SQL. This guide covers the operational patterns that separate proof-of-concepts from systems that run reliably at scale.

The Production Mindset

The biggest shift from exploratory data work to production DataOps isn't about technology—it's about mindset. In notebooks, failing fast is fine. In production, every failure has a cost: wasted compute, stale dashboards, missed SLAs, and eroded trust.

Production-grade pipelines must be:

  • Idempotent: Running twice produces the same result
  • Observable: You know when things break and why
  • Testable: Automated tests catch regressions before deployment
  • Recoverable: Failures don't corrupt state
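
Idempotency, in particular, is mostly a matter of how you write outputs. A minimal, file-based sketch of the write-to-temp-then-atomic-rename pattern (stdlib only; the function name is illustrative), where a re-run replaces the output instead of appending to it or leaving it half-written:

```python
import os
import tempfile

def write_partition(path: str, data: str) -> None:
    """Write atomically: readers see the old file or the new one, never a partial write."""
    directory = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "w") as f:
            f.write(data)
        # os.replace is atomic on POSIX: a re-run simply swaps in a fresh copy.
        os.replace(tmp_path, path)
    except BaseException:
        os.remove(tmp_path)  # never leave orphaned temp files behind
        raise
```

The same shape applies to object stores and warehouses: stage the output under a temporary key or table, then promote it in one atomic operation.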

Error Handling That Actually Works

Most pipeline failures aren't code bugs—they're data quality issues. Schema changes, null values in unexpected places, upstream delays, API rate limits. Your error handling needs to account for this reality.

Pattern 1: Fail Loudly, Recover Gracefully

from typing import Optional
import logging

import pandas as pd

logger = logging.getLogger(__name__)

def extract_data(source: str, date: str) -> Optional[pd.DataFrame]:
    try:
        df = pd.read_parquet(f"{source}/{date}.parquet")
        validate_schema(df)  # project-specific check; raises SchemaValidationError
        return df
    except FileNotFoundError:
        logger.error(f"Missing data for {date} from {source}")
        return None  # transient: skip this partition, keep the pipeline alive
    except SchemaValidationError as e:
        logger.critical(f"Schema mismatch in {source}: {e}")
        raise  # Schema changes require human intervention
    except Exception:
        logger.exception(f"Unexpected error reading {source}")
        raise

The key insight: not all errors are equal. Missing data might be acceptable (return None, skip processing). Schema changes are critical (raise, alert, stop the pipeline).
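
That split pairs naturally with retries: transient failures (network blips, rate limits, flaky storage) get retried with backoff, while everything else propagates immediately. A stdlib-only sketch; the exception and function names here are illustrative, not part of the code above:

```python
import time
import logging

logger = logging.getLogger(__name__)

class TransientError(Exception):
    """An upstream hiccup worth retrying: timeout, rate limit, flaky storage."""

def with_retries(fn, max_attempts: int = 3, base_delay: float = 1.0):
    """Retry only transient errors, with exponential backoff; re-raise everything else."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except TransientError:
            if attempt == max_attempts:
                raise  # exhausted: let the orchestrator's alerting take over
            delay = base_delay * 2 ** (attempt - 1)
            logger.warning("attempt %d failed, retrying in %.1fs", attempt, delay)
            time.sleep(delay)
```

A schema error would never be wrapped in `TransientError`, so it escapes on the first attempt, which is exactly the behavior the extract function above encodes.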

Testing Strategies for Data Pipelines

Unit tests for data pipelines look different from application code. You're not just testing logic—you're testing assumptions about data shape, volume, and distribution.

The Three Layers

1. Unit Tests (Fast, Many)

Test individual transformations with synthetic data:

import pandas as pd

def test_deduplicate_users():
    input_df = pd.DataFrame({
        'user_id': [1, 1, 2],
        'event': ['login', 'login', 'signup']
    })
    result = deduplicate_users(input_df)
    assert len(result) == 2
    assert result['user_id'].is_unique

2. Integration Tests (Slower, Fewer)

Test end-to-end flows with realistic data samples:

import pytest

@pytest.mark.integration
def test_daily_aggregation_pipeline(sample_events):
    result = run_daily_pipeline(sample_events, date='2024-01-15')
    assert result is not None
    assert set(result.columns) == expected_schema  # expected_schema defined alongside the pipeline
    assert result['total_events'].sum() == len(sample_events)

3. Data Quality Tests (Production)

Validate assumptions about production data:

import pandas as pd

def validate_aggregates(df: pd.DataFrame) -> None:
    """Run in production after every transform."""
    assert df['revenue'].min() >= 0, "Negative revenue detected"
    assert df['user_count'].sum() > 0, "Zero users in output"
    assert not df.duplicated(subset=['date', 'cohort']).any()

These run inline with your pipeline code. If they fail, the pipeline stops before writing corrupt data downstream.
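
One caveat: bare `assert` statements vanish when Python runs with `-O`, and a failing assert reports only the first broken check. A sketch of a small check runner that evaluates every check and raises once with all failures (`DataQualityError` and `run_checks` are names assumed here, not part of the code above):

```python
from typing import Callable, List, Tuple

class DataQualityError(Exception):
    """Raised when one or more production data checks fail."""

def run_checks(checks: List[Tuple[str, Callable[[], bool]]]) -> None:
    """Evaluate every named check, then fail once with the full list of problems."""
    failures = [name for name, check in checks if not check()]
    if failures:
        raise DataQualityError("failed checks: " + ", ".join(failures))

# Usage with the aggregates above might look like:
# run_checks([
#     ("non_negative_revenue", lambda: df["revenue"].min() >= 0),
#     ("has_users", lambda: df["user_count"].sum() > 0),
# ])
```

Getting every failure in one error message saves a round of fix-rerun-fix when several assumptions break at once, which is common after an upstream schema change.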

Dependency Management

Reproducibility matters. Your pipeline needs to run identically on your laptop, in CI, and in production.

Use pyproject.toml + Lock Files

[project]
name = "data-pipeline"
version = "1.0.0"
dependencies = [
    "pandas>=2.0,<3.0",
    "pyarrow>=14.0",
    "pydantic>=2.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=7.4",
    "black>=23.0",
]

Lock files (poetry.lock, requirements.txt with hashes) ensure everyone gets the same versions.

Configuration Management

Never hardcode paths, credentials, or thresholds. Use environment-aware configs:

# In pydantic v2, BaseSettings lives in the separate pydantic-settings package
from pydantic_settings import BaseSettings, SettingsConfigDict

class PipelineConfig(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_prefix="PIPELINE_")

    data_lake_path: str
    warehouse_url: str
    max_retries: int = 3
    batch_size: int = 10000

config = PipelineConfig()

Now your pipeline reads from environment variables or .env files. Different configs for dev, staging, prod—same code.

Logging for Operators

Your logs are for the person debugging at 2am, not for you while writing the code. Structure your logs:

import time

import structlog

log = structlog.get_logger()

def process_batch(batch_id: str, size: int):
    start = time.monotonic()
    log.info(
        "processing_batch",
        batch_id=batch_id,
        size=size,
        pipeline="daily_aggregates"
    )
    rows = run_transforms(batch_id)  # stand-in for your actual processing step
    log.info(
        "batch_complete",
        batch_id=batch_id,
        rows_processed=rows,
        duration_sec=time.monotonic() - start
    )

Structured logs let you query: "Show me all batches that took >5min" or "What was the average batch size yesterday?"
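
Because every event is a flat key-value record, those queries reduce to a filter over the log stream. A sketch assuming the logs were rendered as JSON lines (for example via structlog's JSONRenderer; the field names match the example above):

```python
import json

def slow_batches(log_lines, threshold_sec: float = 300.0):
    """Return batch_ids of completed batches that exceeded the duration threshold."""
    slow = []
    for line in log_lines:
        event = json.loads(line)
        if event.get("event") == "batch_complete" and event.get("duration_sec", 0) > threshold_sec:
            slow.append(event["batch_id"])
    return slow
```

The same records feed log aggregators (Loki, CloudWatch Insights, Datadog) directly, so the ad hoc script and the production dashboard query the same fields.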

Orchestration Patterns

Whether you use Airflow, Prefect, or Dagster, the same patterns apply:

Atomic Tasks

Each task should be independently retryable:

# Good: atomic, idempotent
def materialize_daily_summary(date: str):
    temp_table = f"summary_temp_{date}"
    final_table = "summary"
    
    compute_summary(date, output=temp_table)
    swap_tables(temp_table, final_table, partition=date)

If the task fails halfway, retry doesn't corrupt the final table.
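
The swap itself can be a single transaction. A sqlite-flavored sketch of `swap_tables` (table names and SQL dialect are illustrative; warehouses like BigQuery or Snowflake have their own atomic create-or-replace syntax):

```python
import sqlite3

def swap_tables(conn: sqlite3.Connection, staging: str, final: str) -> None:
    """Atomically promote the staging table: readers never see a half-built result."""
    with conn:  # one transaction: commits on success, rolls back on error
        conn.execute(f"DROP TABLE IF EXISTS {final}")
        conn.execute(f"ALTER TABLE {staging} RENAME TO {final}")
```

Because all the expensive work happened in the staging table, the critical section is two cheap statements, which keeps the window for a mid-swap failure as small as possible.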

Backfill-Friendly

Your pipeline should work for any date, not just "today":

# Bad: assumes "today"
df = read_data("s3://bucket/latest/")

# Good: explicit date parameter
df = read_data(f"s3://bucket/{date}/")

This makes backfills trivial: just re-run with date='2024-01-15'.
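
With an explicit date parameter, a multi-day backfill is just a loop over the range. A minimal stdlib sketch (`run_daily_pipeline` stands in for your actual entry point):

```python
from datetime import date, timedelta

def date_range(start: date, end: date):
    """Yield each date from start to end inclusive, as ISO strings."""
    current = start
    while current <= end:
        yield current.isoformat()
        current += timedelta(days=1)

# Backfill: re-run the pipeline for every missing day.
# for d in date_range(date(2024, 1, 1), date(2024, 1, 15)):
#     run_daily_pipeline(date=d)
```

Combined with idempotent writes, this loop is safe to interrupt and restart: days that already ran are simply overwritten with identical output.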

Performance Considerations

Python is slow, but your pipeline probably isn't CPU-bound—it's I/O-bound. Focus on reducing data movement:

  • Filter early: Push predicates into your data lake (Parquet column pruning)
  • Batch writes: Write 10K rows at once, not one at a time
  • Use Arrow: pandas → pyarrow → parquet is faster than pure pandas
  • Profile first: Don't optimize guesses, measure with cProfile or py-spy
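
The batching point is mechanical but worth getting right: accumulate rows and flush in chunks rather than issuing one write per row. Python 3.12 ships `itertools.batched` for exactly this; a backport-style sketch for earlier versions (the `warehouse.insert_many` call is a hypothetical stand-in for your sink):

```python
from itertools import islice
from typing import Iterable, Iterator, List

def batched(rows: Iterable, size: int) -> Iterator[List]:
    """Yield lists of up to `size` rows; the final batch may be smaller."""
    it = iter(rows)
    while batch := list(islice(it, size)):
        yield batch

# for batch in batched(row_stream, 10_000):
#     warehouse.insert_many(batch)  # one round-trip per 10K rows, not per row
```

Because it consumes the source iterator lazily, this works on streams too large to fit in memory, which is usually the situation that motivated batching in the first place.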

Deployment Checklist

Before pushing to production:

  • ✅ Pipeline runs idempotently (can re-run safely)
  • ✅ All dependencies pinned in lock file
  • ✅ Config externalized (no hardcoded paths)
  • ✅ Structured logging in place
  • ✅ Data quality tests inline
  • ✅ Error handling distinguishes transient vs critical failures
  • ✅ Monitoring/alerting configured
  • ✅ Backfill procedure documented

The Path Forward

Production DataOps is an exercise in defensive programming. Assume upstream data will change. Assume your pipeline will fail. Build observability, testing, and recovery into every layer.

The pipelines that survive in production aren't the cleverest—they're the most boring. Predictable, testable, debuggable. That's the goal.