Batch Processing Pipeline

This example demonstrates building a batch data processing pipeline using Da Vinci.

Overview

We’ll create a data processing pipeline that:

  • Imports data from S3

  • Processes records in batches

  • Stores results in DynamoDB

  • Tracks processing status

  • Handles failures gracefully

Table Definitions

from da_vinci.core.orm.table_object import (
    TableObject,
    TableObjectAttribute,
    TableObjectAttributeType,
)

class DataRecordTable(TableObject):
    """Table for processed data records"""

    table_name = "data_records"
    partition_key_attribute = "record_id"

    global_secondary_indexes = [
        {
            "index_name": "batch_status_index",
            "partition_key": "batch_id",
            "sort_key": "status",
        }
    ]

    attributes = [
        TableObjectAttribute(
            name="record_id",
            attribute_type=TableObjectAttributeType.STRING,
        ),
        TableObjectAttribute(
            name="batch_id",
            attribute_type=TableObjectAttributeType.STRING,
        ),
        TableObjectAttribute(
            name="source_data",
            attribute_type=TableObjectAttributeType.JSON_STRING,
        ),
        TableObjectAttribute(
            name="processed_data",
            attribute_type=TableObjectAttributeType.JSON_STRING,
            optional=True,
        ),
        TableObjectAttribute(
            name="status",
            attribute_type=TableObjectAttributeType.STRING,
            default="pending",
        ),
        TableObjectAttribute(
            name="error_message",
            attribute_type=TableObjectAttributeType.STRING,
            optional=True,
        ),
        TableObjectAttribute(
            name="created_at",
            attribute_type=TableObjectAttributeType.DATETIME,
        ),
        TableObjectAttribute(
            name="processed_at",
            attribute_type=TableObjectAttributeType.DATETIME,
            optional=True,
        ),
    ]

class BatchJobTable(TableObject):
    """Table for tracking batch jobs"""

    table_name = "batch_jobs"
    partition_key_attribute = "batch_id"

    attributes = [
        TableObjectAttribute(
            name="batch_id",
            attribute_type=TableObjectAttributeType.STRING,
        ),
        TableObjectAttribute(
            name="s3_key",
            attribute_type=TableObjectAttributeType.STRING,
        ),
        TableObjectAttribute(
            name="total_records",
            attribute_type=TableObjectAttributeType.NUMBER,
            default=0,
        ),
        TableObjectAttribute(
            name="processed_records",
            attribute_type=TableObjectAttributeType.NUMBER,
            default=0,
        ),
        TableObjectAttribute(
            name="failed_records",
            attribute_type=TableObjectAttributeType.NUMBER,
            default=0,
        ),
        TableObjectAttribute(
            name="status",
            attribute_type=TableObjectAttributeType.STRING,
            default="running",
        ),
        TableObjectAttribute(
            name="started_at",
            attribute_type=TableObjectAttributeType.DATETIME,
        ),
        TableObjectAttribute(
            name="completed_at",
            attribute_type=TableObjectAttributeType.DATETIME,
            optional=True,
        ),
    ]

Batch Processor

import json
import uuid
from datetime import UTC, datetime
from typing import Any

import boto3
from da_vinci.core.orm.client import TableClient
from tables import DataRecordTable, BatchJobTable


class BatchProcessor:
    """Process data in batches from S3"""

    def __init__(self):
        self.record_client = TableClient(DataRecordTable)
        self.job_client = TableClient(BatchJobTable)
        self.s3 = boto3.client('s3')

    def start_batch(self, s3_bucket: str, s3_key: str) -> str:
        """Start a new batch processing job"""
        batch_id = str(uuid.uuid4())

        # Create batch job record
        job = BatchJobTable(
            batch_id=batch_id,
            s3_key=f"s3://{s3_bucket}/{s3_key}",
            total_records=0,
            processed_records=0,
            failed_records=0,
            status="running",
            started_at=datetime.now(UTC),
        )
        self.job_client.put(job)

        # Load and process data
        records = self._load_from_s3(s3_bucket, s3_key)

        # Update total count
        job.total_records = len(records)
        self.job_client.put(job)

        # Process in batches
        self._process_records(batch_id, records)

        return batch_id

    def _load_from_s3(self, bucket: str, key: str) -> list[dict]:
        """Load JSON data from S3"""
        response = self.s3.get_object(Bucket=bucket, Key=key)
        data = json.loads(response['Body'].read())
        return data

    def _process_records(self, batch_id: str, records: list[dict]) -> None:
        """Process records in batches"""
        batch_size = 25  # chunk size; 25 mirrors DynamoDB's batch write limit, though records are written individually here

        for i in range(0, len(records), batch_size):
            batch = records[i:i + batch_size]
            self._process_batch(batch_id, batch)

    def _process_batch(self, batch_id: str, records: list[dict]) -> None:
        """Process a single batch of records"""
        job = self.job_client.get(batch_id)

        for record_data in records:
            try:
                # Process the record
                processed = self._process_record(record_data)

                # Save successful result
                record = DataRecordTable(
                    record_id=str(uuid.uuid4()),
                    batch_id=batch_id,
                    source_data=record_data,
                    processed_data=processed,
                    status="completed",
                    created_at=datetime.now(UTC),
                    processed_at=datetime.now(UTC),
                )
                self.record_client.put(record)

                job.processed_records += 1

            except Exception as e:
                # Save failed record
                record = DataRecordTable(
                    record_id=str(uuid.uuid4()),
                    batch_id=batch_id,
                    source_data=record_data,
                    status="failed",
                    error_message=str(e),
                    created_at=datetime.now(UTC),
                )
                self.record_client.put(record)

                job.failed_records += 1

        # Update job status
        if job.processed_records + job.failed_records >= job.total_records:
            job.status = "completed"
            job.completed_at = datetime.now(UTC)

        self.job_client.put(job)

    def _process_record(self, data: dict) -> dict:
        """
        Process a single record
        Implement your business logic here
        """
        # Example: transform and enrich data
        result = {
            "original": data,
            "processed_at": datetime.now(UTC).isoformat(),
            "transformed": {
                k.upper(): v for k, v in data.items()
            }
        }
        return result

    def get_batch_status(self, batch_id: str) -> dict:
        """Get status of a batch job"""
        job = self.job_client.get(batch_id)

        return {
            "batch_id": job.batch_id,
            "status": job.status,
            "total": job.total_records,
            "processed": job.processed_records,
            "failed": job.failed_records,
            "progress": (
                job.processed_records + job.failed_records
            ) / job.total_records if job.total_records > 0 else 0,
        }

    def retry_failed_records(self, batch_id: str) -> None:
        """Retry failed records from a batch"""
        # Query records for this batch using the GSI
        # Then filter for failed status
        all_batch_records = list(self.record_client.query(
            index_name="batch_status_index",
            partition_key_value=batch_id
        ))

        # Filter for failed records
        failed_records = [r for r in all_batch_records if r.status == "failed"]

        # Retry each failed record
        for record in failed_records:
            try:
                processed = self._process_record(record.source_data)
                record.processed_data = processed
                record.status = "completed"
                record.processed_at = datetime.now(UTC)
                record.error_message = None
            except Exception as e:
                record.error_message = str(e)

            self.record_client.put(record)

Usage Example

processor = BatchProcessor()

# Start processing a file from S3
batch_id = processor.start_batch(
    s3_bucket="my-data-bucket",
    s3_key="data/import-2024-01-01.json"
)

print(f"Started batch: {batch_id}")

# Check status
status = processor.get_batch_status(batch_id)
print(f"Progress: {status['progress']:.1%}")
print(f"Processed: {status['processed']}/{status['total']}")
print(f"Failed: {status['failed']}")

# Retry failed records
if status['failed'] > 0:
    processor.retry_failed_records(batch_id)

Key Concepts

Batch Processing

Records are processed in fixed-size chunks of 25, so the job record is updated as each chunk completes rather than only at the end.

Status Tracking

Processing status is tracked at two levels: BatchJobTable carries aggregate counters for the job, while each DataRecordTable item records the outcome of an individual record.
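
For instance, the batch_status_index GSI can be used to tally a batch's records by status. This is a minimal sketch that reuses the query signature shown in retry_failed_records above; the count_by_status helper itself is illustrative, not part of Da Vinci.

def count_by_status(self, batch_id: str) -> dict:
    """Tally this batch's records by status (illustrative helper)"""
    counts: dict[str, int] = {}

    for record in self.record_client.query(
        index_name="batch_status_index",
        partition_key_value=batch_id,
    ):
        counts[record.status] = counts.get(record.status, 0) + 1

    return counts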

Error Handling

Each record is wrapped in its own try/except, so a failing record is stored with its error message and the rest of the batch keeps processing.

Retry Logic

Failed records can be retried on their own via retry_failed_records, without reprocessing records that already succeeded.
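
Note that retry_failed_records does not touch the job counters. A hedged sketch of how they could be reconciled after a retry pass, using only the client calls already shown in this example (resync_job_counters is an illustrative name):

def resync_job_counters(self, batch_id: str) -> None:
    """Recompute job counters from stored record statuses (illustrative helper)"""
    job = self.job_client.get(batch_id)

    records = list(self.record_client.query(
        index_name="batch_status_index",
        partition_key_value=batch_id,
    ))

    job.processed_records = sum(1 for r in records if r.status == "completed")
    job.failed_records = sum(1 for r in records if r.status == "failed")

    self.job_client.put(job)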

Progress Monitoring

get_batch_status exposes counts and a completion ratio, so callers can monitor a batch while it runs.
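
When processing runs asynchronously (see the SQS variation below), a caller can poll get_batch_status until the job reaches a terminal state. A minimal sketch; wait_for_batch and the 10-second interval are illustrative choices:

import time

def wait_for_batch(processor: BatchProcessor, batch_id: str, poll_seconds: int = 10) -> dict:
    """Poll a batch job until it completes, printing progress along the way"""
    while True:
        status = processor.get_batch_status(batch_id)
        print(f"{batch_id}: {status['progress']:.1%} ({status['processed']} ok, {status['failed']} failed)")

        if status["status"] == "completed":
            return status

        time.sleep(poll_seconds)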

Variations

Add SQS Queue

Process records asynchronously via SQS:

def queue_for_processing(self, batch_id: str, records: list[dict]):
    """Queue records for async processing"""
    sqs = boto3.client('sqs')
    queue_url = "your-queue-url"

    for record in records:
        sqs.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps({
                "batch_id": batch_id,
                "record": record
            })
        )
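
The queue also needs a consumer. A hedged sketch of an SQS-triggered Lambda handler that writes one DataRecordTable item per message, mirroring _process_batch; the handler name and wiring are assumptions, and the module is expected to share the imports used by BatchProcessor above.

def sqs_handler(event: dict, context: Any) -> None:
    """Lambda handler that processes one queued record per SQS message"""
    processor = BatchProcessor()

    for message in event['Records']:
        body = json.loads(message['body'])

        try:
            processed = processor._process_record(body['record'])

            record = DataRecordTable(
                record_id=str(uuid.uuid4()),
                batch_id=body['batch_id'],
                source_data=body['record'],
                processed_data=processed,
                status="completed",
                created_at=datetime.now(UTC),
                processed_at=datetime.now(UTC),
            )
        except Exception as e:
            record = DataRecordTable(
                record_id=str(uuid.uuid4()),
                batch_id=body['batch_id'],
                source_data=body['record'],
                status="failed",
                error_message=str(e),
                created_at=datetime.now(UTC),
            )

        processor.record_client.put(record)

Job counters still need updating in this setup, for example with a periodic re-sync like the resync_job_counters sketch in Key Concepts above.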

Add Lambda Trigger

Automatically process files when uploaded to S3:

from urllib.parse import unquote_plus


def s3_handler(event: dict, context: Any) -> None:
    """Lambda handler for S3 object-created events"""
    processor = BatchProcessor()

    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # Object keys in S3 event notifications are URL-encoded
        key = unquote_plus(record['s3']['object']['key'])

        processor.start_batch(bucket, key)