import json
import logging
from datetime import UTC, datetime
from os import environ
from typing import Any
from uuid import uuid4
import boto3
DEFAULT_LOG_LEVEL_NAME = "INFO"
S3_BUCKET_ENV_VAR = "DA_VINCI_S3_LOGGING_BUCKET"
[docs]
class S3LogHandler(logging.Handler):
"""Custom log handler to store logs in memory and offload them to S3."""
[docs]
def __init__(
self, execution_id: str, namespace: str, metadata: dict[str, Any] | None = None
) -> None:
"""
Set up the S3 log handler with the execution ID and namespace.
Keyword Arguments:
execution_id -- The execution ID
namespace -- The namespace for the logs
metadata -- Additional metadata to store with the logs
"""
super().__init__()
self.execution_id = execution_id
self.namespace = namespace
self.log_entries: list[dict[str, Any]] = []
self.metadata = metadata or {}
[docs]
def emit(self, record: logging.LogRecord) -> None:
"""
Capture log records and store them in memory.
Keyword Arguments:
record -- The log record
"""
log_entry = {
"timestamp": datetime.now(tz=UTC).isoformat(),
"level": record.levelname,
"message": record.getMessage(),
}
self.log_entries.append(log_entry)
[docs]
def get_log_entries(self) -> list:
"""
Return the collected log entries.
"""
return self.log_entries
[docs]
def to_dict(self) -> dict:
"""
Return the collected log entries as a dictionary.
"""
return {
"execution_id": self.execution_id,
"metadata": self.metadata,
"namespace": self.namespace,
"entries": self.log_entries,
}
[docs]
class Logger:
[docs]
def __init__(
self,
namespace: str,
log_level_name: str | None = None,
s3_logging_enabled: bool | None = False,
s3_logging_bucket_name: str | None = None,
) -> None:
"""
Initialize the logger with the namespace and log level name.
Keyword Arguments:
namespace -- The namespace for the logger
log_level_name -- The log level name (default: INFO)
s3_logging_enabled -- Enable logging to S3 (default: False)
s3_logging_bucket_name -- The S3 bucket name for logging (default: None)
"""
self.namespace = namespace
self.execution_id = str(uuid4()) # Unique ID for each execution
log_level_name = log_level_name or environ.get("LOG_LEVEL", DEFAULT_LOG_LEVEL_NAME)
self.pylogger = logging.getLogger()
self.log_level_name = log_level_name.upper()
self.pylogger.setLevel(self.log_level_name)
# Custom handler to collect logs in memory
self.s3_log_handler = S3LogHandler(self.execution_id, namespace)
self.pylogger.addHandler(self.s3_log_handler)
# S3 logging configuration
self.s3_logging_enabled = s3_logging_enabled or S3_BUCKET_ENV_VAR in environ
if self.s3_logging_enabled:
self.s3_bucket = s3_logging_bucket_name or environ.get(S3_BUCKET_ENV_VAR)
else:
self.s3_bucket = None
self.s3_client = boto3.client("s3") if self.s3_logging_enabled else None
if self.s3_logging_enabled:
logging.info(f"S3 logging enabled: {self.s3_bucket}")
logging.info(f"Logger initialized with execution ID: {self.execution_id}")
[docs]
def dump_to_s3(self) -> None:
"""
Dump the collected logs to an S3 bucket as a JSON file.
"""
if not self.s3_logging_enabled:
self.pylogger.warning("S3 logging is disabled. Skipping S3 log upload.")
return
log_filename = f"logs/{self.namespace}/{self.execution_id}.json"
try:
log_data = json.dumps(self.s3_log_handler.to_dict(), indent=4)
if self.s3_client is None or self.s3_bucket is None:
self.pylogger.warning("S3 client or bucket not configured. Skipping S3 log upload.")
return
self.s3_client.put_object(Bucket=self.s3_bucket, Key=log_filename, Body=log_data)
self.pylogger.info(
f"Log successfully uploaded to S3: s3://{self.s3_bucket}/{log_filename}"
)
except Exception as e:
self.pylogger.error(f"Failed to upload log to S3: {e}")
[docs]
def finalize(self) -> None:
"""
Call this method to finalize logging and upload logs to S3 if enabled.
"""
self.dump_to_s3()
[docs]
def debug(self, message: str) -> None:
"""
Add a log message with level DEBUG.
Keyword Arguments:
message -- The log message
"""
self.pylogger.debug(message)
[docs]
def info(self, message: str) -> None:
"""
Add a log message with level INFO.
Keyword Arguments:
message -- The log message
"""
self.pylogger.info(message)
[docs]
def warning(self, message: str) -> None:
"""
Add a log message with level WARNING.
Keyword Arguments:
message -- The log message
"""
self.pylogger.warning(message)
[docs]
def error(self, message: str) -> None:
"""
Add a log message with level ERROR.
Keyword Arguments:
message -- The log message
"""
self.pylogger.error(message)