Python for Data Ops
Exploring Python's role in modern data operations, from ETL pipelines to automated reporting and system integration.
Python has become the lingua franca of data operations, but writing production-grade pipelines requires more than just knowing pandas and SQL. This guide covers the operational patterns that separate proof-of-concepts from systems that run reliably at scale.
The Production Mindset
The biggest shift from exploratory data work to production DataOps isn't about technology—it's about mindset. In notebooks, failing fast is fine. In production, every failure has a cost: wasted compute, stale dashboards, missed SLAs, and eroded trust.
Production-grade pipelines must be:
- Idempotent: Running twice produces the same result
- Observable: You know when things break and why
- Testable: Automated tests catch regressions before deployment
- Recoverable: Failures don't corrupt state
Error Handling That Actually Works
Most pipeline failures aren't code bugs—they're data quality issues. Schema changes, null values in unexpected places, upstream delays, API rate limits. Your error handling needs to account for this reality.
Pattern 1: Fail Loudly, Recover Gracefully
```python
import logging
from typing import Optional

import pandas as pd

logger = logging.getLogger(__name__)

def extract_data(source: str, date: str) -> Optional[pd.DataFrame]:
    try:
        df = pd.read_parquet(f"{source}/{date}.parquet")
        validate_schema(df)  # raises SchemaValidationError on mismatch
        return df
    except FileNotFoundError:
        logger.error(f"Missing data for {date} from {source}")
        return None
    except SchemaValidationError as e:
        logger.critical(f"Schema mismatch in {source}: {e}")
        raise  # Schema changes require human intervention
    except Exception:
        logger.exception(f"Unexpected error reading {source}")
        raise
```
The key insight: not all errors are equal. Missing data might be acceptable (return None, skip processing). Schema changes are critical (raise, alert, stop the pipeline).
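Transient failures sit between these extremes: a rate limit or network blip usually deserves a retry with backoff rather than a crash. A minimal sketch (the helper name `with_retries` and the choice of exception types treated as transient are assumptions, not a prescribed API):

```python
import logging
import time

logger = logging.getLogger(__name__)

def with_retries(fn, max_attempts=3, base_delay=1.0):
    """Call fn(), retrying transient errors with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except (ConnectionError, TimeoutError) as e:  # transient by assumption
            if attempt == max_attempts:
                raise  # retries exhausted: fail loudly
            delay = base_delay * 2 ** (attempt - 1)
            logger.warning(
                "Attempt %d failed (%s); retrying in %.1fs", attempt, e, delay
            )
            time.sleep(delay)
```

Wrap only the calls you know are transient; retrying everything just hides real bugs behind three slow failures.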
Testing Strategies for Data Pipelines
Unit tests for data pipelines look different from application code. You're not just testing logic—you're testing assumptions about data shape, volume, and distribution.
The Three Layers
1. Unit Tests (Fast, Many)
Test individual transformations with synthetic data:
```python
def test_deduplicate_users():
    input_df = pd.DataFrame({
        'user_id': [1, 1, 2],
        'event': ['login', 'login', 'signup']
    })
    result = deduplicate_users(input_df)
    assert len(result) == 2
    assert result['user_id'].is_unique
```
2. Integration Tests (Slower, Fewer)
Test end-to-end flows with realistic data samples:
```python
@pytest.mark.integration
def test_daily_aggregation_pipeline(sample_events):
    result = run_daily_pipeline(sample_events, date='2024-01-15')
    assert result is not None
    assert set(result.columns) == expected_schema
    assert result['total_events'].sum() == len(sample_events)
```
3. Data Quality Tests (Production)
Validate assumptions about production data:
```python
def validate_aggregates(df: pd.DataFrame) -> None:
    """Run in production after every transform."""
    assert df['revenue'].min() >= 0, "Negative revenue detected"
    assert df['user_count'].sum() > 0, "Zero users in output"
    assert not df.duplicated(subset=['date', 'cohort']).any()
```
These run inline with your pipeline code. If they fail, the pipeline stops before writing corrupt data downstream.
Dependency Management
Reproducibility matters. Your pipeline needs to run identically on your laptop, in CI, and in production.
Use pyproject.toml + Lock Files
```toml
[project]
name = "data-pipeline"
version = "1.0.0"
dependencies = [
    "pandas>=2.0,<3.0",
    "pyarrow>=14.0",
    "pydantic>=2.0",
]

[dependency-groups]
dev = [
    "pytest>=7.4",
    "black>=23.0",
]
```
Lock files (poetry.lock, requirements.txt with hashes) ensure everyone gets the same versions.
Configuration Management
Never hardcode paths, credentials, or thresholds. Use environment-aware configs:
```python
# With Pydantic v2 (as pinned above), BaseSettings lives in the
# separate pydantic-settings package.
from pydantic_settings import BaseSettings, SettingsConfigDict

class PipelineConfig(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_prefix="PIPELINE_")

    data_lake_path: str
    warehouse_url: str
    max_retries: int = 3
    batch_size: int = 10000

config = PipelineConfig()
```
Now your pipeline reads from environment variables or .env files. Different configs for dev, staging, prod—same code.
Logging for Operators
Your logs are for the person debugging at 2am, not for you while writing the code. Structure your logs:
```python
import time

import structlog

log = structlog.get_logger()

def process_batch(batch_id: str, size: int):
    log.info(
        "processing_batch",
        batch_id=batch_id,
        size=size,
        pipeline="daily_aggregates"
    )
    start = time.monotonic()
    rows = 0
    # ... processing updates `rows` ...
    log.info(
        "batch_complete",
        batch_id=batch_id,
        rows_processed=rows,
        duration_sec=time.monotonic() - start
    )
```
Structured logs let you query: "Show me all batches that took >5min" or "What was the average batch size yesterday?"
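Once logs are rendered as JSON lines (for example via structlog's JSONRenderer), those questions become a few lines of stdlib code. The sample records below are illustrative:

```python
import json

# Two JSON log lines, as a JSON renderer would emit them
log_lines = [
    '{"event": "batch_complete", "batch_id": "a", "duration_sec": 420}',
    '{"event": "batch_complete", "batch_id": "b", "duration_sec": 95}',
]

# "Show me all batches that took >5min"
slow_batches = [
    rec["batch_id"]
    for rec in map(json.loads, log_lines)
    if rec["event"] == "batch_complete" and rec["duration_sec"] > 300
]
```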
Orchestration Patterns
Whether you use Airflow, Prefect, or Dagster, the same patterns apply:
Atomic Tasks
Each task should be independently retryable:
```python
# Good: atomic, idempotent
def materialize_daily_summary(date: str):
    temp_table = f"summary_temp_{date}"
    final_table = "summary"

    compute_summary(date, output=temp_table)
    swap_tables(temp_table, final_table, partition=date)
```
If the task fails halfway, retry doesn't corrupt the final table.
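The same write-then-swap idea works for plain files: write to a temporary path, then rename it over the final one. A stdlib sketch, assuming a filesystem where `os.replace` is atomic (POSIX guarantees this within one filesystem):

```python
import os
import tempfile

def atomic_write(path: str, data: bytes) -> None:
    """Write data to path atomically: readers never see a partial file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory, prefix=".tmp_")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        os.replace(tmp, path)  # atomic rename; overwrites any existing file
    except BaseException:
        os.unlink(tmp)  # clean up the temp file on any failure
        raise
```

Because the final path only ever appears fully written, re-running after a crash simply overwrites it with the same result.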
Backfill-Friendly
Your pipeline should work for any date, not just "today":
```python
# Bad: assumes "today"
df = read_data("s3://bucket/latest/")

# Good: explicit date parameter
df = read_data(f"s3://bucket/{date}/")
```
This makes backfills trivial: just re-run with date='2024-01-15'.
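The explicit date is easiest to enforce at the entry point. A sketch of a CLI wrapper using argparse (the flag name and the default of "today" are assumptions):

```python
import argparse
from datetime import date, datetime

def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="Run the daily pipeline.")
    parser.add_argument(
        "--date",
        type=lambda s: datetime.strptime(s, "%Y-%m-%d").date(),
        default=date.today(),
        help="Partition date to process (YYYY-MM-DD); defaults to today.",
    )
    return parser.parse_args(argv)
```

A scheduler runs it with no arguments; a backfill runs the same binary with `--date 2024-01-15`, and a malformed date fails before any data is touched.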
Performance Considerations
Python is slow, but your pipeline probably isn't CPU-bound—it's I/O-bound. Focus on reducing data movement:
- Filter early: Push predicates into your data lake (Parquet column pruning)
- Batch writes: Write 10K rows at once, not one at a time
- Use Arrow: pandas → pyarrow → parquet is faster than pure pandas
- Profile first: Don't optimize guesses, measure with cProfile or py-spy
Deployment Checklist
Before pushing to production:
- ✅ Pipeline runs idempotently (can re-run safely)
- ✅ All dependencies pinned in lock file
- ✅ Config externalized (no hardcoded paths)
- ✅ Structured logging in place
- ✅ Data quality tests inline
- ✅ Error handling distinguishes transient vs critical failures
- ✅ Monitoring/alerting configured
- ✅ Backfill procedure documented
The Path Forward
Production DataOps is an exercise in defensive programming. Assume upstream data will change. Assume your pipeline will fail. Build observability, testing, and recovery into every layer.
The pipelines that survive in production aren't the cleverest—they're the most boring. Predictable, testable, debuggable. That's the goal.