# Batch Processing Pipeline
This example demonstrates building a batch data processing pipeline using Da Vinci.

## Overview

We’ll create a data processing pipeline that:

- Imports data from S3
- Processes records in batches
- Stores results in DynamoDB
- Tracks processing status
- Handles failures gracefully

## Table Definitions

```python
# tables.py
from da_vinci.core.orm.table_object import (
    TableObject,
    TableObjectAttribute,
    TableObjectAttributeType,
)


class DataRecordTable(TableObject):
    """Table for processed data records"""

    table_name = "data_records"
    partition_key_attribute = "record_id"

    global_secondary_indexes = [
        {
            "index_name": "batch_status_index",
            "partition_key": "batch_id",
            "sort_key": "status",
        }
    ]

    attributes = [
        TableObjectAttribute(
            name="record_id",
            attribute_type=TableObjectAttributeType.STRING,
        ),
        TableObjectAttribute(
            name="batch_id",
            attribute_type=TableObjectAttributeType.STRING,
        ),
        TableObjectAttribute(
            name="source_data",
            attribute_type=TableObjectAttributeType.JSON_STRING,
        ),
        TableObjectAttribute(
            name="processed_data",
            attribute_type=TableObjectAttributeType.JSON_STRING,
            optional=True,
        ),
        TableObjectAttribute(
            name="status",
            attribute_type=TableObjectAttributeType.STRING,
            default="pending",
        ),
        TableObjectAttribute(
            name="error_message",
            attribute_type=TableObjectAttributeType.STRING,
            optional=True,
        ),
        TableObjectAttribute(
            name="created_at",
            attribute_type=TableObjectAttributeType.DATETIME,
        ),
        TableObjectAttribute(
            name="processed_at",
            attribute_type=TableObjectAttributeType.DATETIME,
            optional=True,
        ),
    ]


class BatchJobTable(TableObject):
    """Table for tracking batch jobs"""

    table_name = "batch_jobs"
    partition_key_attribute = "batch_id"

    attributes = [
        TableObjectAttribute(
            name="batch_id",
            attribute_type=TableObjectAttributeType.STRING,
        ),
        TableObjectAttribute(
            name="s3_key",
            attribute_type=TableObjectAttributeType.STRING,
        ),
        TableObjectAttribute(
            name="total_records",
            attribute_type=TableObjectAttributeType.NUMBER,
            default=0,
        ),
        TableObjectAttribute(
            name="processed_records",
            attribute_type=TableObjectAttributeType.NUMBER,
            default=0,
        ),
        TableObjectAttribute(
            name="failed_records",
            attribute_type=TableObjectAttributeType.NUMBER,
            default=0,
        ),
        TableObjectAttribute(
            name="status",
            attribute_type=TableObjectAttributeType.STRING,
            default="running",
        ),
        TableObjectAttribute(
            name="started_at",
            attribute_type=TableObjectAttributeType.DATETIME,
        ),
        TableObjectAttribute(
            name="completed_at",
            attribute_type=TableObjectAttributeType.DATETIME,
            optional=True,
        ),
    ]
```

## Batch Processor

```python
import json
import uuid
from datetime import UTC, datetime
from typing import Any

import boto3

from da_vinci.core.orm.client import TableClient

from tables import DataRecordTable, BatchJobTable


class BatchProcessor:
    """Process data in batches from S3"""

    def __init__(self):
        self.record_client = TableClient(DataRecordTable)
        self.job_client = TableClient(BatchJobTable)
        self.s3 = boto3.client('s3')

    def start_batch(self, s3_bucket: str, s3_key: str) -> str:
        """Start a new batch processing job"""
        batch_id = str(uuid.uuid4())

        # Create batch job record
        job = BatchJobTable(
            batch_id=batch_id,
            s3_key=f"s3://{s3_bucket}/{s3_key}",
            total_records=0,
            processed_records=0,
            failed_records=0,
            status="running",
            started_at=datetime.now(UTC),
        )

        self.job_client.put(job)

        # Load and process data
        records = self._load_from_s3(s3_bucket, s3_key)

        # Update total count
        job.total_records = len(records)

        self.job_client.put(job)

        # Process in batches
        self._process_records(batch_id, records)

        return batch_id

    def _load_from_s3(self, bucket: str, key: str) -> list[dict]:
        """Load JSON data from S3"""
        response = self.s3.get_object(Bucket=bucket, Key=key)

        return json.loads(response['Body'].read())

    def _process_records(self, batch_id: str, records: list[dict]) -> None:
        """Process records in batches"""
        batch_size = 25  # chunk size; 25 matches DynamoDB's batch-write limit

        for i in range(0, len(records), batch_size):
            batch = records[i:i + batch_size]

            self._process_batch(batch_id, batch)

    def _process_batch(self, batch_id: str, records: list[dict]) -> None:
        """Process a single batch of records"""
        job = self.job_client.get(batch_id)

        for record_data in records:
            try:
                # Process the record
                processed = self._process_record(record_data)

                # Save successful result
                record = DataRecordTable(
                    record_id=str(uuid.uuid4()),
                    batch_id=batch_id,
                    source_data=record_data,
                    processed_data=processed,
                    status="completed",
                    created_at=datetime.now(UTC),
                    processed_at=datetime.now(UTC),
                )

                self.record_client.put(record)

                job.processed_records += 1
            except Exception as e:
                # Save failed record
                record = DataRecordTable(
                    record_id=str(uuid.uuid4()),
                    batch_id=batch_id,
                    source_data=record_data,
                    status="failed",
                    error_message=str(e),
                    created_at=datetime.now(UTC),
                )

                self.record_client.put(record)

                job.failed_records += 1

        # Update job status
        if job.processed_records + job.failed_records >= job.total_records:
            job.status = "completed"
            job.completed_at = datetime.now(UTC)

        self.job_client.put(job)

    def _process_record(self, data: dict) -> dict:
        """
        Process a single record

        Implement your business logic here
        """
        # Example: transform and enrich data
        return {
            "original": data,
            "processed_at": datetime.now(UTC).isoformat(),
            "transformed": {
                k.upper(): v for k, v in data.items()
            },
        }

    def get_batch_status(self, batch_id: str) -> dict:
        """Get status of a batch job"""
        job = self.job_client.get(batch_id)

        return {
            "batch_id": job.batch_id,
            "status": job.status,
            "total": job.total_records,
            "processed": job.processed_records,
            "failed": job.failed_records,
            "progress": (
                job.processed_records + job.failed_records
            ) / job.total_records if job.total_records > 0 else 0,
        }
    def retry_failed_records(self, batch_id: str) -> None:
        """Retry failed records from a batch"""
        job = self.job_client.get(batch_id)

        # Query all records for this batch using the GSI, then filter for failed status
        all_batch_records = list(self.record_client.query(
            index_name="batch_status_index",
            partition_key_value=batch_id,
        ))

        failed_records = [r for r in all_batch_records if r.status == "failed"]

        # Retry each failed record
        for record in failed_records:
            try:
                processed = self._process_record(record.source_data)

                record.processed_data = processed
                record.status = "completed"
                record.processed_at = datetime.now(UTC)
                record.error_message = None

                # Keep the job counters in sync with the retried record
                job.processed_records += 1
                job.failed_records -= 1
            except Exception as e:
                record.error_message = str(e)

            self.record_client.put(record)

        self.job_client.put(job)
```

## Usage Example

```python
processor = BatchProcessor()

# Start processing a file from S3
batch_id = processor.start_batch(
    s3_bucket="my-data-bucket",
    s3_key="data/import-2024-01-01.json",
)

print(f"Started batch: {batch_id}")

# Check status
status = processor.get_batch_status(batch_id)

print(f"Progress: {status['progress']:.1%}")
print(f"Processed: {status['processed']}/{status['total']}")
print(f"Failed: {status['failed']}")

# Retry failed records
if status['failed'] > 0:
    processor.retry_failed_records(batch_id)
```
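
In this example `start_batch` runs synchronously, so the status call above already reflects the finished job. If processing is moved to a background worker (see the variations below), progress can be watched with a simple polling loop. This is a minimal sketch that relies only on `get_batch_status`; the 5-second interval is an arbitrary choice:

```python
import time

# Poll the job until it reports completion (sketch; interval is arbitrary)
while True:
    status = processor.get_batch_status(batch_id)
    print(f"{status['status']}: {status['progress']:.1%} complete")

    if status['status'] == "completed":
        break

    time.sleep(5)
```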

## Key Concepts

- Batch Processing: Process records in chunks to optimize throughput.
- Status Tracking: Track processing status at both job and record level.
- Error Handling: Capture errors without stopping the entire batch.
- Retry Logic: Ability to retry failed records separately.
- Progress Monitoring: Track progress through the batch job.

## Variations

### Add SQS Queue

Process records asynchronously via SQS:

```python
def queue_for_processing(self, batch_id: str, records: list[dict]):
    """Queue records for async processing"""
    sqs = boto3.client('sqs')
    queue_url = "your-queue-url"

    for record in records:
        sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps({
                "batch_id": batch_id,
                "record": record,
            }),
        )
```
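
The sending side alone does not process anything, so a consumer is needed at the other end of the queue. Below is a minimal sketch of an SQS-triggered Lambda worker, assuming the queue receives the messages produced above and that the handler module has the same imports as the Batch Processor module plus access to `BatchProcessor` and `DataRecordTable`; the handler name and structure are illustrative, not part of Da Vinci:

```python
def sqs_worker(event: dict, context: Any) -> None:
    """Process records queued by queue_for_processing (sketch)"""
    processor = BatchProcessor()

    for message in event['Records']:
        payload = json.loads(message['body'])  # SQS delivers the message body as a string

        # Mirror the successful path of _process_batch for a single record
        processed = processor._process_record(payload['record'])

        processor.record_client.put(DataRecordTable(
            record_id=str(uuid.uuid4()),
            batch_id=payload['batch_id'],
            source_data=payload['record'],
            processed_data=processed,
            status="completed",
            created_at=datetime.now(UTC),
            processed_at=datetime.now(UTC),
        ))
```

Exceptions raised here would surface through the queue's retry and dead-letter settings rather than the `failed_records` counter, so a production worker would also need to update the batch job record.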

### Add Lambda Trigger

Automatically process files when uploaded to S3:

```python
from typing import Any
from urllib.parse import unquote_plus


def s3_handler(event: dict, context: Any) -> None:
    """Lambda handler for S3 events"""
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # Object keys in S3 event notifications arrive URL-encoded
        key = unquote_plus(record['s3']['object']['key'])

        processor = BatchProcessor()
        processor.start_batch(bucket, key)
```
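
For a quick local check, the handler can be invoked directly with a hand-built event. This is a sketch with illustrative values; real S3 notifications carry many more fields than the two the handler reads:

```python
# Minimal S3-style event for local testing (illustrative values only)
sample_event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-data-bucket"},
                "object": {"key": "data/import-2024-01-01.json"},
            }
        }
    ]
}

s3_handler(sample_event, context=None)
```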