"""
File upload security validation for video files.
Provides comprehensive security checks for uploaded video files including:
- File size validation
- Extension whitelisting
- MIME type verification
- File header (magic bytes) verification
- Filename sanitization
- Path traversal prevention
- Malicious content detection
All validation failures are logged for security auditing.
"""
import logging
import os
import re
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
import magic
from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.files.uploadedfile import UploadedFile
logger = logging.getLogger(__name__)
class FileSecurityValidator:
    """Common base class for upload security checks.

    Concrete validators override ``_validate_impl()``; callers invoke
    ``validate()``, which honours the ``enabled`` switch. The two logging
    helpers emit structured records so validation outcomes can be audited.
    """

    def __init__(self, enabled: bool = True):
        """Set up the validator.

        Args:
            enabled: When False, ``validate()`` returns without checking anything.
        """
        self.enabled = enabled

    def validate(self, file: UploadedFile, **kwargs) -> None:
        """Run the check against ``file`` unless this validator is disabled.

        Args:
            file: The uploaded file to check.
            **kwargs: Passed through unchanged to ``_validate_impl()``.

        Raises:
            ValidationError: Propagated from ``_validate_impl()`` on failure.
        """
        if self.enabled:
            self._validate_impl(file, **kwargs)

    def _validate_impl(self, file: UploadedFile, **kwargs) -> None:
        """Perform the actual check; subclasses must override this hook.

        Args:
            file: The uploaded file to check.
            **kwargs: Validator-specific parameters.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError("Validators must implement _validate_impl()")

    def _log_validation_failure(
        self,
        file: UploadedFile,
        reason: str,
        severity: str = "WARNING",
        **extra_context
    ) -> None:
        """Write an audit log record describing a failed check.

        Args:
            file: The uploaded file that was rejected.
            reason: Human-readable explanation of the failure.
            severity: "ERROR" or "CRITICAL" select those log levels; any
                other value is logged as a warning.
            **extra_context: Extra fields merged into the log record (they
                override the standard fields on key collision).
        """
        record = {
            "event": "file_upload_validation_failed",
            "validator": self.__class__.__name__,
            "upload_filename": file.name,
            "file_size": file.size,
            "content_type": getattr(file, 'content_type', None),
            "reason": reason,
        }
        record.update(extra_context)

        # CRITICAL is the only level with its own message wording.
        if severity == "CRITICAL":
            logger.critical(f"Critical file validation failure: {reason}", extra=record)
            return

        emit = logger.error if severity == "ERROR" else logger.warning
        emit(f"File validation failed: {reason}", extra=record)

    def _log_validation_success(self, file: UploadedFile, **extra_context) -> None:
        """Write an audit log record describing a passed check.

        Args:
            file: The uploaded file that was accepted.
            **extra_context: Extra fields merged into the log record.
        """
        validator_name = self.__class__.__name__
        record = {
            "event": "file_upload_validation_passed",
            "validator": validator_name,
            "upload_filename": file.name,
            "file_size": file.size,
            "content_type": getattr(file, 'content_type', None),
        }
        record.update(extra_context)
        logger.info(f"File validation passed: {validator_name}", extra=record)
class FileSizeValidator(FileSecurityValidator):
    """Rejects uploads whose size falls outside a configured byte range.

    The upper bound guards against oversized uploads (denial of service,
    storage exhaustion); the lower bound catches empty uploads.
    """

    def __init__(
        self,
        max_size_bytes: Optional[int] = None,
        min_size_bytes: int = 1,
        enabled: bool = True,
    ):
        """Configure the size limits.

        Args:
            max_size_bytes: Largest accepted size in bytes. A falsy value
                falls back to ``settings.VIDEO_MAX_UPLOAD_SIZE_BYTES`` and,
                if that setting is absent, to 500 MB.
            min_size_bytes: Smallest accepted size in bytes (default: 1).
            enabled: Whether this validator is enabled.
        """
        super().__init__(enabled=enabled)
        if not max_size_bytes:
            max_size_bytes = getattr(
                settings,
                "VIDEO_MAX_UPLOAD_SIZE_BYTES",
                500 * 1024 * 1024,  # 500 MB default
            )
        self.max_size_bytes = max_size_bytes
        self.min_size_bytes = min_size_bytes

    def _validate_impl(self, file: UploadedFile, **kwargs) -> None:
        """Check that ``file.size`` lies within the configured limits.

        Args:
            file: The uploaded file to check.

        Raises:
            ValidationError: If the file is smaller than the minimum or
                larger than the maximum.
        """
        actual = file.size
        lower = self.min_size_bytes

        # Too small: empty or truncated upload.
        if actual < lower:
            self._log_validation_failure(
                file,
                f"File size ({actual} bytes) below minimum ({lower} bytes)",
                severity="WARNING",
                file_size_bytes=actual,
                min_size_bytes=lower,
            )
            raise ValidationError(
                f"File size ({actual} bytes) is below the minimum allowed size "
                f"({lower} bytes)."
            )

        upper = self.max_size_bytes

        # Too large: report both figures in megabytes for readability.
        if actual > upper:
            bytes_per_mb = 1024 * 1024
            size_mb = actual / bytes_per_mb
            max_mb = upper / bytes_per_mb
            self._log_validation_failure(
                file,
                f"File size ({size_mb:.1f} MB) exceeds maximum ({max_mb:.1f} MB)",
                severity="WARNING",
                file_size_bytes=actual,
                max_size_bytes=upper,
            )
            raise ValidationError(
                f"File size ({size_mb:.1f} MB) exceeds the maximum allowed size "
                f"({max_mb:.1f} MB)."
            )

        self._log_validation_success(
            file,
            file_size_bytes=actual,
            max_size_bytes=upper,
        )
class FileExtensionValidator(FileSecurityValidator):
    """Accepts only files whose extension appears in a whitelist.

    The comparison is case-insensitive: both the whitelist and the uploaded
    file's extension are lowercased before matching.
    """

    def __init__(
        self,
        allowed_extensions: Optional[Set[str]] = None,
        enabled: bool = True,
    ):
        """Configure the whitelist.

        Args:
            allowed_extensions: Allowed extensions including the leading dot
                (e.g. {'.mp4', '.mov'}). A falsy value falls back to
                ``settings.VIDEO_ALLOWED_EXTENSIONS`` and, if that setting is
                absent, to a built-in list of common video extensions.
            enabled: Whether this validator is enabled.
        """
        super().__init__(enabled=enabled)
        if not allowed_extensions:
            allowed_extensions = set(
                getattr(
                    settings,
                    "VIDEO_ALLOWED_EXTENSIONS",
                    [".mp4", ".mov", ".avi", ".mkv", ".webm"],
                )
            )
        # Store lowercased so lookups can be done on a lowercased extension.
        self.allowed_extensions = {item.lower() for item in allowed_extensions}

    def _validate_impl(self, file: UploadedFile, **kwargs) -> None:
        """Check the file's extension against the whitelist.

        Args:
            file: The uploaded file to check. A missing name is treated as
                an empty filename (and therefore an empty extension).

        Raises:
            ValidationError: If the extension is not whitelisted.
        """
        _, raw_ext = os.path.splitext(file.name or "")
        ext = raw_ext.lower()

        if ext in self.allowed_extensions:
            self._log_validation_success(file, extension=ext)
            return

        self._log_validation_failure(
            file,
            f"File extension '{ext}' not in whitelist",
            severity="WARNING",
            extension=ext,
            allowed_extensions=list(self.allowed_extensions),
        )
        raise ValidationError(
            f"File extension '{ext}' is not allowed. "
            f"Allowed extensions: {', '.join(sorted(self.allowed_extensions))}"
        )
class MimeTypeValidator(FileSecurityValidator):
    """Validates the MIME type of an upload against a whitelist.

    With ``verify_content`` enabled the type is detected from the file's
    leading bytes via python-magic (libmagic), so a spoofed extension or
    ``Content-Type`` header is not trusted. Otherwise only the declared
    ``content_type`` is checked.
    """

    def __init__(
        self,
        allowed_mime_types: Optional[Set[str]] = None,
        verify_content: bool = True,
        enabled: bool = True,
    ):
        """Configure the whitelist and detection mode.

        Args:
            allowed_mime_types: Allowed MIME types. A falsy value falls back
                to ``settings.VIDEO_ALLOWED_MIME_TYPES`` and, if that setting
                is absent, to a built-in list of common video types.
            verify_content: If True, detect the MIME type from file content
                with libmagic. If False, only check the declared
                ``content_type``.
            enabled: Whether this validator is enabled.
        """
        super().__init__(enabled=enabled)
        self.allowed_mime_types = allowed_mime_types or set(
            getattr(
                settings,
                "VIDEO_ALLOWED_MIME_TYPES",
                [
                    "video/mp4",
                    "video/quicktime",
                    "video/x-msvideo",
                    "video/x-matroska",
                    "video/webm",
                ],
            )
        )
        self.verify_content = verify_content

    def _detect_mime_type(self, file: UploadedFile) -> str:
        """Detect the MIME type from the first 8 KB of the file.

        The stream is rewound to the start afterwards, even when reading
        fails, so later consumers never see a half-consumed stream.

        Args:
            file: The uploaded file to analyse.

        Returns:
            The MIME type string reported by libmagic.
        """
        file.seek(0)
        try:
            chunk = file.read(8192)  # 8 KB is enough for magic detection
        finally:
            # Always restore the position, including on a failed read.
            file.seek(0)

        mime = magic.Magic(mime=True)
        return mime.from_buffer(chunk)

    def _validate_impl(self, file: UploadedFile, **kwargs) -> None:
        """Check the file's MIME type against the whitelist.

        Args:
            file: The uploaded file to check.

        Raises:
            ValidationError: If the type cannot be detected, or the detected
                (or, without content verification, declared) type is not
                whitelisted.
        """
        declared_mime = getattr(file, 'content_type', 'application/octet-stream')

        if not self.verify_content:
            # Header-only mode: trust the declared content type.
            if declared_mime not in self.allowed_mime_types:
                self._log_validation_failure(
                    file,
                    f"Declared MIME type '{declared_mime}' not in whitelist",
                    severity="WARNING",
                    declared_mime=declared_mime,
                    allowed_mime_types=list(self.allowed_mime_types),
                )
                raise ValidationError(
                    f"File type '{declared_mime}' is not allowed. "
                    f"Allowed types: {', '.join(sorted(self.allowed_mime_types))}"
                )
            self._log_validation_success(file, declared_mime=declared_mime)
            return

        # Content mode: ask libmagic what the bytes actually are.
        try:
            detected_mime = self._detect_mime_type(file)
        except Exception as e:
            # Any detection failure is treated as a rejected upload.
            self._log_validation_failure(
                file,
                f"Failed to detect MIME type: {e}",
                severity="ERROR",
                declared_mime=declared_mime,
                error=str(e),
            )
            raise ValidationError(
                f"Unable to verify file type. The file may be corrupted. Error: {e}"
            ) from e

        if detected_mime not in self.allowed_mime_types:
            self._log_validation_failure(
                file,
                f"Detected MIME type '{detected_mime}' not in whitelist",
                severity="ERROR",
                declared_mime=declared_mime,
                detected_mime=detected_mime,
                allowed_mime_types=list(self.allowed_mime_types),
            )
            raise ValidationError(
                f"File type '{detected_mime}' is not allowed. "
                f"Allowed types: {', '.join(sorted(self.allowed_mime_types))}"
            )

        # A mismatch between header and content is suspicious but not fatal:
        # the detected type already passed the whitelist.
        if declared_mime and declared_mime != detected_mime:
            logger.warning(
                f"MIME type mismatch: declared '{declared_mime}' but detected '{detected_mime}'",
                extra={
                    "event": "mime_type_mismatch",
                    "upload_filename": file.name,
                    "declared_mime": declared_mime,
                    "detected_mime": detected_mime,
                },
            )

        self._log_validation_success(
            file,
            declared_mime=declared_mime,
            detected_mime=detected_mime,
        )
class FilenameSanitizer(FileSecurityValidator):
    """Sanitizes filenames to prevent path traversal and other attacks.

    Removes dangerous characters and patterns from both the name and the
    extension. Does not raise ValidationError - instead it rewrites
    ``file.name`` in place when the sanitized form differs.
    """

    # Patterns removed from the name part (the text before the extension).
    DANGEROUS_PATTERNS = [
        r"\.\.",  # Parent directory reference
        r"[<>:\"|?*]",  # Windows forbidden characters
        r"[\x00-\x1f]",  # Control characters
        # Leading dots (hidden-file marker). Only the dots are dropped; the
        # previous ``^\..*`` erased the entire name of any dot-prefixed file.
        r"^\.+",
    ]

    # Forbidden/control characters must not survive in the extension either.
    _UNSAFE_EXTENSION_CHARS = re.compile(r"[<>:\"|?*\x00-\x1f]")

    # Maximum filename length (without extension)
    MAX_FILENAME_LENGTH = 255

    def __init__(
        self,
        remove_path_components: bool = True,
        replace_spaces: bool = True,
        lowercase: bool = False,
        enabled: bool = True,
    ):
        """Initialize filename sanitizer.

        Args:
            remove_path_components: Remove any path components (keep only basename).
            replace_spaces: Replace spaces in the name part with underscores.
            lowercase: Convert name and extension to lowercase.
            enabled: Whether this validator is enabled.
        """
        super().__init__(enabled=enabled)
        self.remove_path_components = remove_path_components
        self.replace_spaces = replace_spaces
        self.lowercase = lowercase

    def _sanitize_filename(self, filename: str) -> str:
        """Return a sanitized version of ``filename``.

        Args:
            filename: The original filename (may contain path components).

        Returns:
            The sanitized filename; the name part falls back to "unnamed"
            when nothing usable is left.
        """
        # Remove path components (security: prevent directory traversal).
        if self.remove_path_components:
            # Normalise Windows separators first so basename handles both.
            filename = filename.replace("\\", "/")
            filename = os.path.basename(filename)

        name, ext = os.path.splitext(filename)

        for pattern in self.DANGEROUS_PATTERNS:
            name = re.sub(pattern, "", name)

        # The extension comes from untrusted input too: strip the same
        # forbidden/control characters, then trailing whitespace. A bare "."
        # left over after cleaning is dropped.
        ext = self._UNSAFE_EXTENSION_CHARS.sub("", ext).rstrip(". ")

        if self.replace_spaces:
            name = name.replace(" ", "_")

        # Remove leading/trailing dots and whitespace.
        name = name.strip(". ")

        # Ensure we have a valid name before lowercase/truncate.
        if not name:
            name = "unnamed"

        if self.lowercase:
            name = name.lower()
            ext = ext.lower()

        # The length limit applies to the name part only.
        if len(name) > self.MAX_FILENAME_LENGTH:
            name = name[:self.MAX_FILENAME_LENGTH]

        return f"{name}{ext}"

    def _validate_impl(self, file: UploadedFile, **kwargs) -> None:
        """Sanitize the filename (modifies ``file.name`` in place).

        Args:
            file: The uploaded file to sanitize. A missing name is treated
                as an empty filename and becomes "unnamed".
        """
        original_name = file.name
        # Guard against a missing name instead of crashing on ``None``.
        sanitized_name = self._sanitize_filename(original_name or "")

        if original_name != sanitized_name:
            logger.warning(
                f"Filename sanitized: '{original_name}' → '{sanitized_name}'",
                extra={
                    "event": "filename_sanitized",
                    "original_name": original_name,
                    "sanitized_name": sanitized_name,
                },
            )
            # Modify the file object's name
            file.name = sanitized_name

        self._log_validation_success(
            file,
            original_name=original_name,
            sanitized_name=sanitized_name,
        )
class VideoFileValidator:
    """Composite validator for video file uploads.

    Chains the individual validators together behind a single ``validate()``
    call. Use this as the main entry point for video file validation.
    """

    def __init__(
        self,
        max_size_bytes: Optional[int] = None,
        allowed_extensions: Optional[Set[str]] = None,
        allowed_mime_types: Optional[Set[str]] = None,
        verify_mime_content: bool = True,
        validate_headers: bool = True,
        sanitize_filenames: bool = True,
        enable_all: bool = True,
    ):
        """Initialize composite video file validator.

        Args:
            max_size_bytes: Maximum file size in bytes.
            allowed_extensions: Set of allowed extensions (e.g., {'.mp4', '.mov'}).
            allowed_mime_types: Set of allowed MIME types.
            verify_mime_content: Use libmagic to verify MIME from content.
            validate_headers: Add the file header (magic bytes) validator.
            sanitize_filenames: Add the filename sanitizer.
            enable_all: Passed as ``enabled`` to every validator created here.
        """
        self.validators: List[FileSecurityValidator] = [
            FileSizeValidator(
                max_size_bytes=max_size_bytes,
                enabled=enable_all,
            ),
            FileExtensionValidator(
                allowed_extensions=allowed_extensions,
                enabled=enable_all,
            ),
            MimeTypeValidator(
                allowed_mime_types=allowed_mime_types,
                verify_content=verify_mime_content,
                enabled=enable_all,
            ),
        ]

        # File header validation.
        # NOTE(review): FileHeaderValidator is not defined in the visible part
        # of this module - confirm it is defined/imported before relying on
        # validate_headers=True.
        if validate_headers:
            self.validators.append(FileHeaderValidator(enabled=enable_all))

        # Filename sanitization (should run last)
        if sanitize_filenames:
            self.validators.append(
                FilenameSanitizer(
                    remove_path_components=True,
                    replace_spaces=True,
                    lowercase=False,
                    enabled=enable_all,
                )
            )

    def validate(self, file: UploadedFile) -> None:
        """Run all validators on the uploaded file.

        Every validator is run even after an earlier one fails, so that all
        problems are reported together.

        Args:
            file: The uploaded file to validate (UploadedFile for new uploads,
                FieldFile for existing files, which are skipped).

        Raises:
            ValidationError: If any validator fails. The message joins the
                individual error messages with " | ".
        """
        # Skip validation for existing files (FieldFile) - only validate new uploads
        from django.db.models.fields.files import FieldFile

        if isinstance(file, FieldFile):
            logger.debug(
                f"Skipping validation for existing file '{file.name}'",
                extra={
                    "event": "video_validation_skipped",
                    "upload_filename": file.name,
                    "reason": "existing_file",
                },
            )
            return

        # Not every file object carries a content_type attribute.
        content_type = getattr(file, 'content_type', None)
        logger.info(
            f"Starting video file validation for '{file.name}'",
            extra={
                "event": "video_validation_started",
                "upload_filename": file.name,
                "file_size": file.size,
                "content_type": content_type,
                "validator_count": len(self.validators),
            },
        )

        validation_errors: List[str] = []
        for validator in self.validators:
            try:
                validator.validate(file)
            except ValidationError as e:
                # str(e) would yield the repr of the message list
                # ("['...']"); e.messages gives the plain message strings.
                validation_errors.extend(e.messages)

        if validation_errors:
            combined_error = " | ".join(validation_errors)
            logger.error(
                f"Video file validation failed for '{file.name}': {combined_error}",
                extra={
                    "event": "video_validation_failed",
                    "upload_filename": file.name,
                    "file_size": file.size,
                    "errors": validation_errors,
                },
            )
            raise ValidationError(combined_error)

        logger.info(
            f"Video file validation passed for '{file.name}'",
            extra={
                "event": "video_validation_passed",
                "upload_filename": file.name,
                "file_size": file.size,
            },
        )

    def validate_multiple(self, files: List[UploadedFile]) -> Dict[str, List[str]]:
        """Validate multiple files and return validation results.

        Results are keyed by ``file.name`` as it stands after validation, so
        files that share a name overwrite each other's entry.

        Args:
            files: List of uploaded files to validate.

        Returns:
            Dictionary mapping filenames to list of validation errors
            (empty list if valid).
        """
        results: Dict[str, List[str]] = {}
        for file in files:
            try:
                self.validate(file)
            except ValidationError as e:
                # Plain message strings, not the list repr from str(e).
                results[file.name] = list(e.messages)
            else:
                results[file.name] = []  # No errors
        return results