Source code for da_vinci.core.orm.client

import logging
from collections.abc import Generator
from enum import StrEnum, auto
from typing import Any

import boto3
from botocore.exceptions import ClientError

from da_vinci.core.exceptions import ResourceNotFoundError
from da_vinci.core.orm.orm_exceptions import (
    TableScanInvalidAttributeError,
    TableScanInvalidComparisonError,
)
from da_vinci.core.orm.table_object import (
    TableObject,
    TableObjectAttributeType,
)
from da_vinci.core.resource_discovery import ResourceDiscovery


class TableResultSortOrder(StrEnum):
    """Sort order for table query results"""
    ASCENDING = auto()
    DESCENDING = auto()


class PaginatorCall:
    """Paginator call types for DynamoDB operations"""
    QUERY = "query"
    SCAN = "scan"


class PaginatedResults:
    """
    Container for paginated DynamoDB query results

    Keyword Arguments:
        items -- List of table objects returned from the query
        last_evaluated_key -- Key to use for fetching the next page (optional)
    """

    def __init__(self, items: list[TableObject], last_evaluated_key: dict | None = None) -> None:
        self.items = items
        self.last_evaluated_key = last_evaluated_key
        self.has_more = last_evaluated_key is not None

    def __iter__(self) -> Any:
        return iter(self.items)
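

# Usage sketch (illustrative, not part of the original module): each page
# yielded by TableClient.paginated (defined below) is a PaginatedResults.
# Iterating the page yields TableObject instances; last_evaluated_key and
# has_more can be used to checkpoint and resume pagination later. ``client``
# is an assumed TableClient instance:
#
#     for page in client.paginated(call=PaginatorCall.SCAN):
#         for obj in page:
#             ...
#         checkpoint = page.last_evaluated_key  # persist to resume later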


class TableScanDefinition:
    """
    Define filters for scanning a DynamoDB table

    Provides a builder pattern for constructing filter expressions for
    DynamoDB scan and query operations. Supports various comparison
    operators and handles attribute type conversions.
    """
    _comparison_operators = {
        "contains": "contains",
        "equal": "=",
        "greater_than": ">",
        "greater_than_or_equal": ">=",
        "less_than": "<",
        "less_than_or_equal": "<=",
        "not_equal": "!=",
    }

    def __init__(
        self, table_object_class: type[TableObject], attribute_prefix: str | None = None
    ) -> None:
        """
        Create a new scan definition for a DynamoDB table

        Keyword Arguments:
            table_object_class: Table object class to scan
            attribute_prefix: Prefix to use for attribute names (default: None)
        """
        self._attribute_filters: list[tuple[str, str, Any]] = []
        self.attribute_prefix = attribute_prefix
        self.table_object_class = table_object_class

    def add(self, attribute_name: str, comparison: str, value: Any) -> None:
        """
        Add an attribute filter to the scan definition

        Keyword Arguments:
            attribute_name: Name of the attribute to filter on
            comparison: Comparison operator to use (ex: equal or greater_than)
            value: Value to compare against
        """
        if comparison not in self._comparison_operators:
            raise TableScanInvalidComparisonError(comparison)

        attr_name = attribute_name

        if self.attribute_prefix:
            attr_name = f"{self.attribute_prefix}_{attribute_name}"

        attribute_definition = self.table_object_class.attribute_definition(
            name=attr_name,
        )

        if not attribute_definition:
            raise TableScanInvalidAttributeError(attr_name)

        comparison = self._comparison_operators[comparison]

        self._attribute_filters.append((attr_name, comparison, value))

    def to_expression(self) -> tuple[str, dict[str, Any]] | tuple[None, None]:
        """
        Convert the scan definition to a DynamoDB expression

        Returns:
            Tuple of (filter expression string, expression attribute values),
            or (None, None) when no filters have been added
        """
        attr_keys = "abcdefghijklmnopqrstuvwxyz"

        # Cache loaded attribute definitions to avoid repeated
        # attribute_definition lookups while building the expression
        loaded_attrs: dict[str, Any] = {}

        expression: list[str] = []
        expression_attributes: dict[str, Any] = {}

        if not self._attribute_filters:
            return None, None

        for idx, fltr in enumerate(self._attribute_filters):
            name, comparison, value = fltr

            if name in loaded_attrs:
                attr = loaded_attrs[name]
            else:
                attr = self.table_object_class.attribute_definition(name)

                if not attr:
                    raise TableScanInvalidAttributeError(name)

                loaded_attrs[name] = attr

            attr_key = ":" + attr_keys[idx]

            if comparison == "contains":
                expr_part = f"contains({attr.dynamodb_key_name}, {attr_key})"
            else:
                expr_part = f"{attr.dynamodb_key_name} {comparison} {attr_key}"

            if (
                (comparison == "contains" and attr.attribute_type == TableObjectAttributeType.STRING_LIST)
                or (attr.attribute_type == TableObjectAttributeType.JSON and isinstance(value, str))
            ):
                attr_dynamodb = {attr.dynamodb_key_name: {"S": value}}
            else:
                attr_dynamodb = attr.as_dynamodb_attribute(value)

            expression_attributes[attr_key] = attr_dynamodb[attr.dynamodb_key_name]

            expression.append(expr_part)

        return " AND ".join(expression), expression_attributes

    def to_instructions(self) -> list[str]:
        """
        Convert the scan definition to a list of basic scan instructions

        Returns:
            List of DynamoDB scan instructions
        """
        instructions: list[str] = []

        for fltr in self._attribute_filters:
            name, comparison, value = fltr

            instructions.append(f"{name} {comparison} {value}")

        return instructions

    def __getattr__(self, attr: str) -> Any:
        """
        Override to dynamically add attribute filters to the scan definition
        using comparison names as method names.

        Example:
            scan_definition.equal('foo', 'bar') will add a filter for the
            attribute foo equal to bar

        Keyword Arguments:
            attr: Name of the attribute being accessed, expected to be a
                comparison operator name (ex: equal or greater_than)
        """
        if attr in self._comparison_operators:
            def add_filter(attribute_name: str, value: Any) -> None:
                self.add(
                    attribute_name=attribute_name,
                    comparison=attr,
                    value=value,
                )

            return add_filter

        raise AttributeError(attr)
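

# Illustrative usage sketch added for documentation; it is not part of the
# original module. ``table_object_class`` stands in for any TableObject
# subclass, and the ``status`` / ``retry_count`` attribute names are
# hypothetical -- they must be defined on the supplied class.
def _example_scan_definition(table_object_class: type[TableObject]) -> None:
    """Sketch: build a filter expression with TableScanDefinition."""
    scan_definition = TableScanDefinition(table_object_class=table_object_class)

    # Explicit form
    scan_definition.add(attribute_name="status", comparison="equal", value="active")

    # Equivalent shorthand provided by __getattr__: the comparison name becomes the method name
    scan_definition.greater_than("retry_count", 3)

    filter_expression, attribute_values = scan_definition.to_expression()

    logging.debug(f"Filter expression: {filter_expression}")
    logging.debug(f"Expression attribute values: {attribute_values}")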


class TableClient:
    """
    Client for interacting with DynamoDB tables using Da Vinci table objects

    Provides high-level operations for querying, scanning, and managing
    DynamoDB tables with automatic resource discovery and pagination support.
    """

    def __init__(
        self,
        default_object_class: type[TableObject],
        app_name: str | None = None,
        deployment_id: str | None = None,
        table_endpoint_name: str | None = None,
        resource_discovery_storage_solution: str | None = None,
    ) -> None:
        """
        Initialize the table client with resource discovery

        Keyword Arguments:
            default_object_class -- TableObject subclass defining the table schema
            app_name -- Application name for resource discovery (optional)
            deployment_id -- Deployment identifier for resource discovery (optional)
            table_endpoint_name -- Explicit table name (optional, will be discovered if not provided)
            resource_discovery_storage_solution -- Storage solution for resource discovery (optional)
        """
        self.default_object_class = default_object_class

        self.table_name = self.default_object_class.table_name

        self.table_endpoint_name = table_endpoint_name

        if not self.table_endpoint_name:
            from da_vinci.core.resource_discovery import ResourceDiscoveryStorageSolution

            storage_solution = None

            if resource_discovery_storage_solution:
                storage_solution = ResourceDiscoveryStorageSolution(
                    resource_discovery_storage_solution
                )

            resource_discovery = ResourceDiscovery(
                resource_name=self.table_name,
                resource_type="table",
                app_name=app_name,
                deployment_id=deployment_id,
                storage_solution=storage_solution,
            )

            self.table_endpoint_name = resource_discovery.endpoint_lookup()

        self.client = boto3.client("dynamodb")

    @classmethod
    def table_resource_exists(
        cls,
        table_object_class: type[TableObject],
        app_name: str | None = None,
        deployment_id: str | None = None,
    ) -> bool:
        """
        Check if a Da Vinci based DynamoDB table exists

        Arguments:
            table_object_class: The object class of the table to check
            app_name: Name of the application
            deployment_id: Unique identifier for the installation
        """
        try:
            resource_discovery = ResourceDiscovery(
                resource_name=table_object_class.table_name,
                resource_type="table",
                app_name=app_name,
                deployment_id=deployment_id,
            )

            resource_discovery.endpoint_lookup()

        except ResourceNotFoundError:
            return False

        return True
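
    # Usage sketch (illustrative, not part of the original source). ``Order``
    # is a hypothetical TableObject subclass used only for demonstration:
    #
    #     if TableClient.table_resource_exists(table_object_class=Order):
    #         client = TableClient(default_object_class=Order)
    #     else:
    #         logging.warning("Order table has not been deployed")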

    def paginated(
        self,
        call: str | PaginatorCall = PaginatorCall.QUERY,
        last_evaluated_key: dict | None = None,
        last_evaluated_object: TableObject | None = None,
        limit: int | None = None,
        max_pages: int | None = None,
        parameters: dict | None = None,
        sort_order: TableResultSortOrder | None = TableResultSortOrder.ASCENDING,
    ) -> Generator[PaginatedResults, None, None]:
        """
        Handle paginated DynamoDB table results. The last item in a page
        should be the last evaluated item.

        Keyword Arguments:
            call: Name of the DynamoDB client method to call, either a scan or query (default: query)
            last_evaluated_key: Last evaluated key from a previous page of results (default: None)
            last_evaluated_object: Last evaluated object from a previous page of results
                (default: None), only supported for query
            limit: Maximum number of items to retrieve per page (default: None)
            max_pages: Maximum number of pages to retrieve, if None it will return all available (default: None)
            parameters: Parameters to pass to the client method
            sort_order: Sort order to use for the results, only works for query calls (default: ASCENDING)
        """
        more_results = True

        params = parameters or {}

        if "TableName" not in params:
            params["TableName"] = self.table_endpoint_name

        if "Select" not in params:
            params["Select"] = "ALL_ATTRIBUTES"

        if limit and "Limit" not in params:
            params["Limit"] = limit

        mthd = getattr(self.client, str(call))

        if call == "query" and sort_order:
            if not self.default_object_class.sort_key_attribute:
                raise Exception("Table object must have sort key to enable sorting")

            params["ScanIndexForward"] = sort_order == TableResultSortOrder.ASCENDING

        if last_evaluated_key:
            if call == "scan" and not isinstance(last_evaluated_key, dict):
                raise Exception("Last evaluated key must be a dictionary for scan operations")

            params["ExclusiveStartKey"] = last_evaluated_key

        elif last_evaluated_object:
            key_gen_args = {
                "partition_key_value": last_evaluated_object.attribute_value(
                    last_evaluated_object.partition_key_attribute.name
                )
            }

            if self.default_object_class.sort_key_attribute:
                key_gen_args["sort_key_value"] = last_evaluated_object.attribute_value(
                    self.default_object_class.sort_key_attribute.name
                )

            params["ExclusiveStartKey"] = last_evaluated_object.gen_dynamodb_key(**key_gen_args)

        logging.debug(f"Created paginated parameters: {params}")

        # Page iteration counter
        retrieved_pages = 0

        # Iterate through each page of results, yielding the results as
        # a list of TableObjects
        while more_results:
            items: list = []

            response = mthd(**params)

            logging.debug(f"Paginated response: {response}")

            for item in response.get("Items", []):
                item_obj = self.default_object_class.from_dynamodb_item(item)

                items.append(item_obj)

            yield PaginatedResults(items=items, last_evaluated_key=response.get("LastEvaluatedKey"))

            more_results = "LastEvaluatedKey" in response

            if more_results:
                logging.debug(
                    f"More results found, continuing paginated query: {response['LastEvaluatedKey']}"
                )

                params["ExclusiveStartKey"] = response["LastEvaluatedKey"]

            retrieved_pages += 1

            # Break if max_pages is set and we've reached the requested limit
            if max_pages and retrieved_pages >= max_pages:
                break
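
    # Usage sketch (illustrative, not part of the original source). ``client``
    # is an assumed TableClient instance and ``order_id`` a hypothetical
    # partition key attribute; the KeyConditionExpression parameters are the
    # standard boto3 DynamoDB query arguments passed straight through:
    #
    #     for page in client.paginated(
    #         call=PaginatorCall.QUERY,
    #         parameters={
    #             "KeyConditionExpression": "order_id = :order_id",
    #             "ExpressionAttributeValues": {":order_id": {"S": "abc-123"}},
    #         },
    #         limit=50,
    #         max_pages=2,
    #     ):
    #         for obj in page:
    #             ...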

    def _all_objects(self) -> list[TableObject]:
        """
        Load all objects from a DynamoDB table into memory. Not recommended
        for large tables.
        """
        all_objects: list[TableObject] = []

        for page in self.paginated(call="scan"):
            all_objects.extend(page)

        return all_objects

    def get_object(
        self,
        partition_key_value: Any,
        sort_key_value: Any = None,
        consistent_read: bool | None = False,
    ) -> TableObject | None:
        """
        Retrieve a single object from the table by partition and sort key

        Keyword Arguments:
            partition_key_value: Value of the partition key
            sort_key_value: Value of the sort key (default: None)
            consistent_read: Whether to use consistent read (default: False)
        """
        dynamodb_key = self.default_object_class.gen_dynamodb_key(
            partition_key_value=partition_key_value,
            sort_key_value=sort_key_value,
        )

        if self.table_endpoint_name is None:
            raise ValueError("Table endpoint name is not set")

        results = self.client.get_item(
            TableName=self.table_endpoint_name,
            Key=dynamodb_key,
            ConsistentRead=consistent_read if consistent_read is not None else False,
        )

        logging.debug(f"Get object results: {results}")

        if "Item" not in results:
            return None

        return self.default_object_class.from_dynamodb_item(results["Item"])

    def put_object(self, table_object: TableObject) -> None:
        """
        Save a single object to the table

        Keyword Arguments:
            table_object: Object to save
        """
        logging.debug(f"Saving object: {table_object.to_dynamodb_item()}")

        if self.table_endpoint_name is None:
            raise ValueError("Table endpoint name is not set")

        try:
            table_object.execute_on_update()

            self.client.put_item(
                TableName=self.table_endpoint_name,
                Item=table_object.to_dynamodb_item(),
            )

        except ClientError as e:
            if e.response["Error"]["Code"] == "ValidationException":
                error_message = e.response["Error"]["Message"]

                if "Supplied AttributeValue is empty" in error_message:
                    raise Exception(
                        "Empty attribute value detected; if using the JSON type, attributes "
                        f"cannot be empty. Original error: {error_message}"
                    ) from e

                else:
                    raise

            else:
                # Re-raise the error if it's not a ValidationException
                raise
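
    # Usage sketch (illustrative, not part of the original source). ``client``
    # is an assumed TableClient and ``item`` an instance of its default object
    # class; the key value is read back with helpers defined on TableObject.
    # Pass sort_key_value as well when the table defines a sort key:
    #
    #     client.put_object(item)
    #
    #     fetched = client.get_object(
    #         partition_key_value=item.attribute_value(item.partition_key_attribute.name),
    #         consistent_read=True,
    #     )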

    def delete_object_by_key(self, partition_key_value: Any, sort_key_value: Any = None) -> None:
        """
        Delete a single object from the table by partition and sort key

        Keyword Arguments:
            partition_key_value: Value of the partition key
            sort_key_value: Value of the sort key (default: None)
        """
        key_args = {
            "partition_key_value": partition_key_value,
        }

        if sort_key_value:
            key_args["sort_key_value"] = sort_key_value

        if self.table_endpoint_name is None:
            raise ValueError("Table endpoint name is not set")

        self.client.delete_item(
            TableName=self.table_endpoint_name,
            Key=self.default_object_class.gen_dynamodb_key(**key_args),
        )

    def delete_object(self, table_object: TableObject) -> None:
        """
        Delete a single object from the table

        Keyword Arguments:
            table_object: Object to remove
        """
        partition_key = table_object.partition_key_attribute

        key_args = {
            "partition_key_value": table_object.attribute_value(partition_key.name),
        }

        if table_object.sort_key_attribute:
            key_args["sort_key_value"] = table_object.attribute_value(
                table_object.sort_key_attribute.name
            )

        if self.table_endpoint_name is None:
            raise ValueError("Table endpoint name is not set")

        self.client.delete_item(
            TableName=self.table_endpoint_name,
            Key=table_object.gen_dynamodb_key(**key_args),
        )
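
    # Usage sketch (illustrative, not part of the original source): pass a
    # loaded object, or delete directly by key when the object is not in hand.
    # ``client``, ``item``, and the key value are assumptions:
    #
    #     client.delete_object(item)
    #
    #     client.delete_object_by_key(partition_key_value="abc-123")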

    def scanner(
        self, scan_definition: TableScanDefinition
    ) -> Generator[PaginatedResults, None, None]:
        """
        Perform a scan on the table, works similarly to the paginator.

        Keyword Arguments:
            scan_definition: Scan definition to use
        """
        filter_expression, attribute_values = scan_definition.to_expression()

        params: dict[str, Any] = {
            "Select": "ALL_ATTRIBUTES",
            "TableName": self.table_endpoint_name,
        }

        if filter_expression:
            params["ExpressionAttributeValues"] = attribute_values

            params["FilterExpression"] = filter_expression

        yield from self.paginated(call="scan", parameters=params)

    def full_scan(self, scan_definition: TableScanDefinition) -> list[TableObject]:
        """
        Perform a full scan on the table, returning all items matching the
        scan definition at once.

        Keyword Arguments:
            scan_definition: Scan definition to use
        """
        all_items: list[TableObject] = []

        for page in self.scanner(scan_definition=scan_definition):
            all_items.extend(page)

        return all_items
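
    # Usage sketch (illustrative, not part of the original source). ``Order``
    # and its ``status`` attribute are hypothetical; see TableScanDefinition
    # above for the filter builder:
    #
    #     scan_definition = TableScanDefinition(table_object_class=Order)
    #     scan_definition.equal("status", "active")
    #
    #     client = TableClient(default_object_class=Order)
    #     active_orders = client.full_scan(scan_definition=scan_definition)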

    def update_object(
        self,
        partition_key_value: Any,
        sort_key_value: Any,
        updates: dict[str, Any] | None = None,
        remove_keys: list[str] | None = None,
    ) -> None:
        """
        Updates an item in the DynamoDB table by applying SET and REMOVE operations.

        This method allows partial updates to items by setting new values for attributes
        or removing existing ones. It supports dot notation for nested JSON map updates,
        enabling the modification of specific keys within a JSON-like structure in DynamoDB.

        Arguments:
            partition_key_value (Any): The value of the partition key for the item to be updated.
            sort_key_value (Any): The value of the sort key for the item to be updated.
            updates (Dict[str, Any], optional): A dictionary containing attribute names (as keys)
                and their new values (as values) to be updated in the table. If dot notation is
                used in the attribute name (e.g., 'json_map.sub_key'), it will update a nested
                key within a DynamoDB MAP type.
            remove_keys (List[str], optional): A list of attribute names to be removed from the
                item. Dot notation can be used to remove nested attributes from a DynamoDB MAP.

        Example Usage:
            - To update a nested key inside a JSON map:
                updates = {'json_map.sub_key': 'new_value'}
                remove_keys = ['json_map.another_sub_key']

            - To update a top-level attribute and remove another:
                updates = {'attribute1': 'new_value'}
                remove_keys = ['attribute2']

        Notes:
            - This method generates DynamoDB UpdateExpressions to execute the SET and REMOVE
              operations in a single request.
            - If both updates and remove_keys are provided, they are combined in the final
              update expression.
            - Dot notation in updates or remove_keys will handle nested attributes within
              DynamoDB MAP types.
            - This method assumes the object's table schema is already defined in the
              `default_object_class`.

        Raises:
            ClientError: If a client error occurs during the DynamoDB update operation.
            Exception: If an attribute with an empty value is provided for a DynamoDB JSON attribute.

        Returns:
            None
        """
        update_expressions: list[str] = []

        expression_attribute_values: dict[str, Any] = {}

        expression_attribute_names: dict[str, str] = {}

        # Handle updates (SET operations)
        if updates:
            update_instructions: list[str] = []

            for attribute_name, value in updates.items():
                # Check for dot notation (e.g. 'json_map.sub_key')
                if "." in attribute_name:
                    parts = attribute_name.split(".")

                    dynamo_key = f"#{parts[0]}"

                    nested_key = ".".join([f"#{part}" for part in parts[1:]])

                    dynamo_value = f":val_{attribute_name.replace('.', '_')}"

                    # Construct the SET expression for nested MAP
                    update_instructions.append(f"{dynamo_key}.{nested_key} = {dynamo_value}")

                    # Prepare the attribute value and name mappings
                    expression_attribute_values[dynamo_value] = value

                    expression_attribute_names.update({f"#{part}": part for part in parts})

                    attr_def = self.default_object_class.attribute_definition(parts[0])

                    if attr_def is None:
                        raise ValueError(f"Attribute {parts[0]} not found in table definition")

                    expression_attribute_names[dynamo_key] = attr_def.dynamodb_key_name

                # Regular attribute (non-nested)
                else:
                    dynamo_key = f"#{attribute_name}"

                    dynamo_value = f":val_{attribute_name}"

                    attr_definition = self.default_object_class.attribute_definition(attribute_name)

                    if attr_definition is None:
                        raise ValueError(
                            f"Attribute {attribute_name} not found in table definition"
                        )

                    update_instructions.append(f"{dynamo_key} = {dynamo_value}")

                    # Wrap in a list because dict_values is not subscriptable
                    expression_attribute_values[dynamo_value] = list(
                        attr_definition.as_dynamodb_attribute(value).values()
                    )[0]

                    expression_attribute_names[dynamo_key] = attr_definition.dynamodb_key_name

            # Combine all SET expressions into a single string
            update_expressions.append("SET " + ", ".join(update_instructions))

        # Handle removals (REMOVE operations)
        if remove_keys:
            removals: list[str] = []

            for attribute_name in remove_keys:
                if "." in attribute_name:
                    # Dot notation for removing nested MAP attributes
                    parts = attribute_name.split(".")

                    dynamo_key = f"#{parts[0]}"

                    nested_key = ".".join([f"#{part}" for part in parts[1:]])

                    removals.append(f"{dynamo_key}.{nested_key}")

                    expression_attribute_names.update({f"#{part}": part for part in parts})

                else:
                    # Regular attribute (non-nested)
                    dynamo_key = f"#{attribute_name}"

                    removals.append(dynamo_key)

                    expression_attribute_names[dynamo_key] = attribute_name

            update_expressions.append(f"REMOVE {', '.join(removals)}")

        # Combine all expressions into a single DynamoDB expression
        update_expression = " ".join(update_expressions)

        logging.debug(f"Update expression: {update_expression}")

        logging.debug(f"Expression attribute values: {expression_attribute_values}")

        logging.debug(f"Expression attribute names: {expression_attribute_names}")

        # Generate the DynamoDB key for the object
        dynamodb_key = self.default_object_class.gen_dynamodb_key(
            partition_key_value=partition_key_value,
            sort_key_value=sort_key_value,
        )

        logging.debug(f"DynamoDB key: {dynamodb_key}")

        if self.table_endpoint_name is None:
            raise ValueError("Table endpoint name is not set")

        # Execute the update in DynamoDB
        self.client.update_item(
            TableName=self.table_endpoint_name,
            Key=dynamodb_key,
            UpdateExpression=update_expression,
            ExpressionAttributeValues=expression_attribute_values,
            ExpressionAttributeNames=expression_attribute_names,
        )
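

# Illustrative usage sketch added for documentation; it is not part of the
# original module. The client and key values are supplied by the caller, and
# the attribute names ('status', 'metadata.last_error') are hypothetical --
# they must exist on the table object class backing the client.
def _example_update_object(
    client: TableClient, partition_key_value: Any, sort_key_value: Any
) -> None:
    """Sketch: partial update combining a SET and a dot-notation REMOVE."""
    client.update_object(
        partition_key_value=partition_key_value,
        sort_key_value=sort_key_value,
        # SET a top-level attribute; per the method's docstring, nested keys
        # can also be set with dot notation (e.g. 'metadata.retries')
        updates={"status": "processed"},
        # REMOVE a nested key inside a MAP attribute using dot notation
        remove_keys=["metadata.last_error"],
    )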