Event-Driven Architecture

This example demonstrates building an event-driven application using Da Vinci’s event bus capabilities.

Overview

We’ll build a user registration system that:

  • Creates user records

  • Publishes events when users are created

  • Sends welcome emails in response to events

  • Updates user statistics when events occur

This demonstrates decoupled, event-driven architecture where components communicate through events rather than direct calls.

Step 1: Define Tables

Create tables.py:

from datetime import UTC, datetime
from da_vinci.core.orm.table_object import (
    TableObject,
    TableObjectAttribute,
    TableObjectAttributeType,
)

class UserTable(TableObject):
    """Table for storing user records

    One item per user, keyed by user_id.
    """

    # Table name and the attribute used as the partition key
    table_name = "users"
    partition_key_attribute = "user_id"

    attributes = [
        # Unique identifier of the user (partition key)
        TableObjectAttribute(
            name="user_id",
            attribute_type=TableObjectAttributeType.STRING,
        ),
        # Email address of the user
        TableObjectAttribute(
            name="email",
            attribute_type=TableObjectAttributeType.STRING,
        ),
        # Display name of the user
        TableObjectAttribute(
            name="name",
            attribute_type=TableObjectAttributeType.STRING,
        ),
        # Lifecycle status; "pending" unless another value is supplied
        TableObjectAttribute(
            name="status",
            attribute_type=TableObjectAttributeType.STRING,
            default="pending",
        ),
        # Timestamp of when the user record was created
        TableObjectAttribute(
            name="created_at",
            attribute_type=TableObjectAttributeType.DATETIME,
        ),
    ]

class UserStatsTable(TableObject):
    """Table for tracking user statistics

    Each item holds a set of counters identified by stat_key.
    """

    # Table name and the attribute used as the partition key
    table_name = "user_stats"
    partition_key_attribute = "stat_key"

    attributes = [
        # Identifier of the statistics record (partition key)
        TableObjectAttribute(
            name="stat_key",
            attribute_type=TableObjectAttributeType.STRING,
        ),
        # Count of users created; starts at 0
        TableObjectAttribute(
            name="total_users",
            attribute_type=TableObjectAttributeType.NUMBER,
            default=0,
        ),
        # Count of users activated; starts at 0
        TableObjectAttribute(
            name="active_users",
            attribute_type=TableObjectAttributeType.NUMBER,
            default=0,
        ),
        # Timestamp of the most recent change to the counters
        TableObjectAttribute(
            name="updated_at",
            attribute_type=TableObjectAttributeType.DATETIME,
        ),
    ]

Step 2: Create User Service with Events

Create user_service.py:

import uuid
from datetime import UTC, datetime

from da_vinci.core.orm.client import TableClient
from da_vinci.event_bus.client import EventPublisher
from da_vinci.event_bus.event import Event
from tables import UserTable


class UserService:
    """Service for managing users with event publishing"""

    def __init__(self):
        self.client = TableClient(UserTable)
        self.event_publisher = EventPublisher()

    def create_user(self, email: str, name: str) -> UserTable:
        """Create a new user and publish event"""
        user_id = str(uuid.uuid4())
        now = datetime.now(UTC)

        user = UserTable(
            user_id=user_id,
            email=email,
            name=name,
            status="pending",
            created_at=now,
        )

        # Save user
        self.client.put(user)

        # Publish event
        event = Event(
            event_type="user.created",
            body={
                "user_id": user.user_id,
                "email": user.email,
                "name": user.name,
            }
        )
        self.event_publisher.submit(event)

        return user

    def activate_user(self, user_id: str) -> UserTable:
        """Activate a user and publish event"""
        user = self.client.get(user_id)
        user.status = "active"
        self.client.put(user)

        # Publish activation event
        event = Event(
            event_type="user.activated",
            body={
                "user_id": user.user_id,
                "email": user.email,
            }
        )
        self.event_publisher.submit(event)

        return user

Step 3: Create Event Handlers

Create event_handlers.py:

from datetime import UTC, datetime
from da_vinci.core.orm.client import TableClient
from tables import UserStatsTable


def handle_user_created(event: dict) -> None:
    """
    React to a user.created event by sending a welcome email

    The recipient address and name are read from the "email" and "name"
    entries of the event's "detail" mapping.
    """
    payload = event.get("detail", {})
    recipient = payload.get("email")
    display_name = payload.get("name")

    # Compose and send the welcome message (pseudo-code)
    greeting = f"Hello {display_name}, welcome to our platform!"
    send_email(to=recipient, subject="Welcome!", body=greeting)

    print(f"Welcome email sent to {recipient}")


def handle_user_activated(event: dict) -> None:
    """
    React to a user.activated event by bumping the active-user counter

    Loads the "global" statistics record (starting from zeroed counters when
    none is stored), increments active_users, refreshes updated_at and
    writes the record back.
    """
    table = TableClient(UserStatsTable)

    record = table.get("global")
    if not record:
        # No stats record stored yet: start from zeroed counters
        record = UserStatsTable(
            stat_key="global",
            total_users=0,
            active_users=0,
            updated_at=datetime.now(UTC),
        )

    record.active_users += 1
    record.updated_at = datetime.now(UTC)
    table.put(record)

    print(f"Stats updated: {record.active_users} active users")


def handle_any_user_event(event: dict) -> None:
    """
    Handler for all user.* events
    Updates total user count
    """
    event_type = event.get("detail-type")

    if event_type == "user.created":
        stats_client = TableClient(UserStatsTable)

        stats = stats_client.get("global") or UserStatsTable(
            stat_key="global",
            total_users=0,
            active_users=0,
            updated_at=datetime.now(UTC)
        )

        stats.total_users += 1
        stats.updated_at = datetime.now(UTC)

        stats_client.put(stats)
        print(f"Stats updated: {stats.total_users} total users")


def send_email(to: str, subject: str, body: str) -> None:
    """Stand-in for real email delivery: only prints the recipient and subject"""
    # A real implementation would hand the message to SES or a similar service
    summary = f"Email to {to}: {subject}"
    print(summary)

Step 4: Deploy with Event Bus

Create table_stacks.py:

from constructs import Construct
from da_vinci_cdk.stack import Stack
from da_vinci_cdk.constructs.dynamodb import DynamoDBTable
from tables import UserTable, UserStatsTable

class UserTableStack(Stack):
    """CDK stack that defines the DynamoDB table for UserTable"""

    def __init__(
        self,
        app_name: str,
        deployment_id: str,
        scope: Construct,
        stack_name: str,
    ) -> None:
        super().__init__(app_name, deployment_id, scope, stack_name)

        # Build the table construct from the ORM definition
        self.table = DynamoDBTable.from_orm_table_object(
            scope=self,
            table_object=UserTable,
        )

class UserStatsTableStack(Stack):
    """CDK stack that defines the DynamoDB table for UserStatsTable"""

    def __init__(
        self,
        app_name: str,
        deployment_id: str,
        scope: Construct,
        stack_name: str,
    ) -> None:
        super().__init__(app_name, deployment_id, scope, stack_name)

        # Build the table construct from the ORM definition
        self.table = DynamoDBTable.from_orm_table_object(
            scope=self,
            table_object=UserStatsTable,
        )

Create app.py:

from os.path import dirname, abspath
from da_vinci_cdk.application import Application
from table_stacks import UserTableStack, UserStatsTableStack

# Create the application with the event bus enabled.
# app_entry is the absolute path of the directory containing this file.
app = Application(
    app_name="user-system",
    deployment_id="dev",
    app_entry=abspath(dirname(__file__)),
    enable_event_bus=True,  # Enable event bus
)

# Add table stacks. The stack classes themselves are passed, not instances;
# presumably the application instantiates them - confirm in the CDK docs.
app.add_uninitialized_stack(UserTableStack)
app.add_uninitialized_stack(UserStatsTableStack)

# Note: Event handlers are Lambda functions that are triggered by
# SQS queues. This script does not create them: you would create Lambda
# functions for your handlers and configure them to consume from the
# event queues.
# See the CDK documentation for details on wiring Lambda functions to SQS.

# Synthesize the application
app.synth()

Step 5: Use the Event-Driven System

from user_service import UserService

# Create service
service = UserService()

# Create a user (saves the record, then publishes a user.created event)
user = service.create_user(
    email="alice@example.com",
    name="Alice Smith"
)
print(f"Created user: {user.user_id}")

# This will trigger:
# 1. user.created event
# 2. handle_user_created() - sends welcome email
# 3. handle_any_user_event() - updates total user count
# The handlers are separate Lambda functions fed by the event queues,
# so they do not run inside this script.

# Activate the user (updates the record, then publishes a user.activated event)
activated = service.activate_user(user.user_id)
print(f"Activated user: {activated.user_id}")

# This will trigger:
# 1. user.activated event
# 2. handle_user_activated() - updates active user count
# handle_any_user_event() ignores this event type: it only acts on user.created.

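Expected Output (the "Welcome email sent" and "Stats updated" lines are printed by the event handlers when they process the events, so in a deployed system they appear in the handlers' logs rather than in the caller's output):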

Created user: abc-123-def
Welcome email sent to alice@example.com
Stats updated: 1 total users

Activated user: abc-123-def
Stats updated: 1 active users

Key Concepts

Event Publisher

Publishes events to the event bus when actions occur.

Event Handlers

Lambda functions that respond to specific event types.

Decoupling

Services don’t directly call each other - they communicate through events.

Event Types

Use dot notation (user.created, user.activated) for event organization.

Wildcard Subscriptions

Use * to subscribe to multiple related event types (for example, user.* covers both user.created and user.activated).

Benefits

Complete Event Traceability

All events and processing results are stored in DynamoDB. Query event history, track which subscribers processed which events, identify failures, and build audit reports programmatically.

Extensibility

Add new handlers without modifying existing code.

Reliability

If one handler fails, others continue processing.

Scalability

Each handler scales independently.

Flexibility

Multiple handlers can respond to the same event.

Variations

Add Event Validation

from pydantic import BaseModel

class UserCreatedEvent(BaseModel):
    """Expected payload of a user.created event

    The fields match the body published by UserService.create_user.
    """

    # Identifier of the created user
    user_id: str
    # Email address of the created user
    email: str
    # Display name of the created user
    name: str

def handle_user_created(event: dict) -> None:
    """
    Validate the user.created payload before acting on it

    Building UserCreatedEvent from the event's "detail" mapping lets pydantic
    reject payloads with missing or mistyped fields up front.
    """
    payload = event.get("detail", {})
    validated = UserCreatedEvent(**payload)
    # Use validated.email, validated.name, etc.

Add Dead Letter Queue

Configure a dead letter queue (DLQ) for failed event processing in your CDK application:

# Register handle_user_created for user.created events with a dead letter
# queue enabled.
# NOTE(review): confirm the exact method name and parameters against the
# CDK documentation for your installed version.
app.add_event_handler(
    event_type="user.created",
    handler_function=handle_user_created,
    dead_letter_queue=True,  # Enable DLQ
)

Batch Event Processing

Process events in batches for efficiency:

def handle_user_events_batch(events: list[dict]) -> None:
    """Handle several user events in a single call

    Walks the batch in order; the per-event work is left as a placeholder.
    """
    for _event in events:
        # Placeholder: process the individual event here
        continue

Next Steps