Source code for cw.tvspots.admin

"""
Django admin configuration for campaigns and video ad units.

Uses Django Unfold for tabs, display decorators, and styled actions.
"""

from django.contrib import admin, messages
from django.shortcuts import redirect
from django.urls import path, reverse
from django.utils.html import format_html
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
from unfold.admin import ModelAdmin, TabularInline
from unfold.decorators import action, display

from cw.core.widgets import InsightsEditorWidget, ScriptEditorWidget, VideoPlayerWidget

from .models import (
    AdUnitMedia,
    AdUnitScriptRow,
    Brand,
    Campaign,
    KeyFrame,
    Storyboard,
    StoryboardImage,
    VideoAdUnit,
    VideoProcessingResult,
)

# ---------------------------------------------------------------------------
# Brand
# ---------------------------------------------------------------------------


[docs] @admin.register(Brand) class BrandAdmin(ModelAdmin): list_display = ["name", "code", "show_active", "show_campaign_count", "updated_at"] list_filter = ["is_active"] search_fields = ["name", "code", "description", "guidelines"] readonly_fields = ["created_at", "updated_at"] fieldsets = ( ( _("Brand"), { "classes": ["tab"], "fields": ("code", "name", "is_active"), }, ), ( _("Description"), { "classes": ["tab"], "fields": ("description",), }, ), ( _("Guidelines"), { "classes": ["tab"], "fields": ("guidelines",), }, ), ( _("Insights"), { "classes": ["tab"], "fields": ("insights",), }, ), ( _("Metadata"), { "classes": ["tab"], "fields": ("created_at", "updated_at"), }, ), )
[docs] def formfield_for_dbfield(self, db_field, request, **kwargs): if db_field.name == "insights": kwargs["widget"] = InsightsEditorWidget() return super().formfield_for_dbfield(db_field, request, **kwargs)
[docs] @display(description=_("Active"), boolean=True) def show_active(self, obj): return obj.is_active
[docs] @display(description=_("Campaigns")) def show_campaign_count(self, obj): return obj.campaigns.count()
# --------------------------------------------------------------------------- # Campaign # ---------------------------------------------------------------------------
[docs] class AdUnitMediaInline(TabularInline): """Inline display of uploaded videos for Campaign.""" model = AdUnitMedia tab = True extra = 1 fields = ["video_file", "show_status", "show_progress", "show_metadata", "show_actions"] readonly_fields = ["show_status", "show_progress", "show_metadata", "show_actions"] show_change_link = True
[docs] @display( description=_("Status"), label={ "pending": "info", "uploaded": "info", "processing": "warning", "completed": "success", "failed": "danger", "reviewed": "success", }, ) def show_status(self, obj): if obj.pk: return obj.get_status_display() return "-"
[docs] @display(description=_("Progress")) def show_progress(self, obj): """Display progress bar for processing tasks.""" if not obj.pk or obj.status not in ["processing"]: return "-" # Add progress bar with data attributes for JavaScript polling return mark_safe( f'<div class="video-progress-container" data-media-id="{obj.pk}">' f'<div class="progress-bar" style="width: 0%; height: 20px; background: #4CAF50; transition: width 0.3s;"></div>' f'<div class="progress-text" style="text-align: center; margin-top: 4px; font-size: 12px;">Starting...</div>' f'</div>' )
[docs] @display(description=_("Video Info")) def show_metadata(self, obj): if not obj.pk or not obj.duration: return "-" parts = [] if obj.duration: parts.append(f"{obj.duration:.1f}s") if obj.resolution_width and obj.resolution_height: parts.append(f"{obj.resolution_width}×{obj.resolution_height}") if obj.file_size: # Convert bytes to MB size_mb = obj.file_size / (1024 * 1024) parts.append(f"{size_mb:.1f}MB") return " • ".join(parts) if parts else "-"
[docs] @display(description=_("Actions")) def show_actions(self, obj): if not obj.pk: return "-" links = [] # Link to processing result if obj.result_id: result_url = reverse("admin:tvspots_videoprocessingresult_change", args=[obj.result_id]) links.append(format_html('<a href="{}">View Results</a>', result_url)) # Link to created ad unit if obj.video_ad_unit_id: ad_unit_url = reverse("admin:tvspots_videoadunit_change", args=[obj.video_ad_unit_id]) links.append(format_html('<a href="{}">View Ad Unit</a>', ad_unit_url)) from django.utils.safestring import mark_safe return mark_safe(" • ".join(str(link) for link in links)) if links else "-"
[docs] class VideoAdUnitInline(TabularInline): """Inline display of ad units for Campaign.""" model = VideoAdUnit tab = True extra = 0 fields = ["code", "title", "origin_or_adaptation", "language", "show_status"] readonly_fields = ["code", "title", "origin_or_adaptation", "language", "show_status"] can_delete = False show_change_link = True
[docs] def has_add_permission(self, request, obj=None): return False
[docs] @display( description=_("Status"), label={ "Pending": "info", "Processing": "warning", "Completed": "success", "Failed": "danger", "Concept Analysis": "warning", "Cultural Analysis": "warning", "Writing": "warning", "Format Evaluation": "warning", "Cultural Evaluation": "warning", "Concept Evaluation": "warning", "Evaluating Brand": "warning", "Revising": "warning", }, ) def show_status(self, obj): return obj.get_status_display()
[docs] @admin.register(Campaign) class CampaignAdmin(ModelAdmin): list_display = [ "script_title", "client_name", "product_name", "job_id", "show_ad_units_count", ] list_filter = ["client_name", "brand"] search_fields = ["script_title", "client_name", "job_id"] readonly_fields = ["created_at", "updated_at"] inlines = [AdUnitMediaInline, VideoAdUnitInline] actions_list = ["import_campaign_action"] actions_detail = [ "create_origin_ad_unit_action", "create_adaptation_action", "bulk_upload_videos_action", "export_campaign_action", ] fieldsets = ( ( _("Project"), { "classes": ["tab"], "fields": ( ("client_name", "brand"), ("product_name", "job_id"), "script_title", ), }, ), ( _("Original Script Data"), { "classes": ["tab"], "fields": ("original_script_data",), }, ), ( _("Metadata"), { "classes": ["tab"], "fields": ("created_at", "updated_at"), }, ), )
[docs] @display(description=_("Ad Units")) def show_ad_units_count(self, obj): count = obj.ad_units.count() if count > 0: url = reverse("admin:tvspots_videoadunit_changelist") return format_html( '<a href="{}?campaign__id__exact={}">{}</a>', url, obj.id, count, ) return "0"
[docs] def save_formset(self, request, form, formset, change): """Handle saving inline formsets and auto-queue video processing.""" # Only process AdUnitMedia formsets if formset.model != AdUnitMedia: super().save_formset(request, form, formset, change) return instances = formset.save(commit=False) # Track new AdUnitMedia instances with videos new_videos = [] for instance in instances: # Check if this is a new AdUnitMedia with a video file if not instance.pk and instance.video_file: instance.status = "uploaded" instance.save() new_videos.append(instance) else: instance.save() # Save many-to-many relationships formset.save_m2m() # Delete any marked for deletion for obj in formset.deleted_objects: obj.delete() # Queue processing tasks for new videos if new_videos: from .tasks import analyze_video_task for media in new_videos: result = analyze_video_task.apply_async(args=[media.pk], queue="default") # Store task ID for progress tracking media.celery_task_id = result.id media.save(update_fields=["celery_task_id"]) messages.info( request, f"Video processing queued for {media.video_file.name}", )
[docs] def get_urls(self): """Add custom URLs for import, origin creation, and adaptation actions.""" urls = super().get_urls() custom_urls = [ path( "import/", self.admin_site.admin_view(self.import_campaign_view), name="tvspots_campaign_import", ), path( "<int:object_id>/create-origin/", self.admin_site.admin_view(self.create_origin_ad_unit_view), name="tvspots_campaign_create_origin", ), path( "<int:object_id>/create-adaptation/", self.admin_site.admin_view(self.create_adaptation_view), name="tvspots_campaign_create_adaptation", ), path( "language-models/<int:language_id>/", self.admin_site.admin_view(self.language_models_api), name="tvspots_campaign_language_models_api", ), path( "<int:object_id>/bulk-upload-videos/", self.admin_site.admin_view(self.bulk_upload_videos_view), name="tvspots_campaign_bulk_upload_videos", ), ] return custom_urls + urls
[docs] def import_campaign_view(self, request): """Handle importing a campaign from JSON.""" import json from django.db import transaction from django.template.response import TemplateResponse if request.method == "POST": # Check for uploaded file first, then fallback to pasted JSON json_file = request.FILES.get("json_file") json_data = None if json_file: try: json_data = json_file.read().decode("utf-8") except Exception as e: messages.error(request, f"Error reading file: {e}") return redirect("admin:tvspots_campaign_import") else: json_data = request.POST.get("json_data", "").strip() if not json_data: messages.error(request, "JSON data or file is required.") return redirect("admin:tvspots_campaign_import") try: data = json.loads(json_data) except json.JSONDecodeError as e: messages.error(request, f"Invalid JSON: {e}") return redirect("admin:tvspots_campaign_import") # Validate required fields errors = self._validate_campaign_json(data) if errors: for error in errors: messages.error(request, error) return redirect("admin:tvspots_campaign_import") # Check for duplicate job_id job_id = data["job_id"] if Campaign.objects.filter(job_id=job_id).exists(): messages.error(request, f"Campaign with job_id '{job_id}' already exists.") return redirect("admin:tvspots_campaign_import") # Create records try: with transaction.atomic(): from cw.audiences.models import Language # Lookup Language by code language_code = data.get("language", "en-US") try: language = Language.objects.get(code=language_code) except Language.DoesNotExist: messages.error( request, f"Language '{language_code}' not found. Please create it first or use an existing language code.", ) return redirect("admin:tvspots_campaign_import") campaign = Campaign.objects.create( client_name=data["client_name"], product_name=data.get("product_name", ""), script_title=data["script_title"], job_id=job_id, original_script_data=data, ) video_ad_unit = VideoAdUnit.objects.create( campaign=campaign, origin_or_adaptation="ORIGIN", code="ORIGIN", title="Origin", language=language, status="completed", ) for idx, row_data in enumerate(data["script_rows"]): AdUnitScriptRow.objects.create( ad_unit=video_ad_unit, order_index=idx, shot_number=row_data.get("shot_number", f"{idx + 1:02d}"), timecode=row_data.get("timecode_start", ""), visual_text=row_data["visual_text"], audio_text=row_data["audio_text"], ) messages.success( request, f"Created Campaign '{campaign.script_title}' with {len(data['script_rows'])} script rows.", ) return redirect("admin:tvspots_campaign_change", campaign.pk) except Exception as e: messages.error(request, f"Failed to create Campaign: {e}") return redirect("admin:tvspots_campaign_import") # Render import form return TemplateResponse( request, "admin/tvspots/campaign/import_campaign.html", { **self.admin_site.each_context(request), "title": _("Import Campaign from JSON"), "opts": self.model._meta, }, )
def _validate_campaign_json(self, data: dict) -> list: """Validate JSON against expected schema. Returns list of errors.""" errors = [] required = ["client_name", "script_title", "job_id", "script_rows"] for field in required: if field not in data: errors.append(f"Missing required field: {field}") if errors: return errors if not isinstance(data["script_rows"], list): errors.append("script_rows must be an array") return errors if len(data["script_rows"]) == 0: errors.append("script_rows must have at least one row") for idx, row in enumerate(data["script_rows"]): if not isinstance(row, dict): errors.append(f"script_rows[{idx}] must be an object") continue if "visual_text" not in row or not row["visual_text"]: errors.append(f"script_rows[{idx}] missing or empty visual_text") if "audio_text" not in row or not row["audio_text"]: errors.append(f"script_rows[{idx}] missing or empty audio_text") return errors @action( description=_("Import Campaign"), url_path="import-campaign-action", ) def import_campaign_action(self, request): """Redirect to the import campaign view.""" return redirect("admin:tvspots_campaign_import") @action( description=_("Create Origin Ad Unit"), url_path="create-origin-ad-unit-action", ) def create_origin_ad_unit_action(self, request, object_id): """Redirect to the create origin ad unit view.""" return redirect("admin:tvspots_campaign_create_origin", object_id)
[docs] def create_origin_ad_unit_view(self, request, object_id): """Handle creating an origin ad unit for a campaign.""" import json from django.template.response import TemplateResponse from cw.audiences.models import Language campaign = Campaign.objects.get(pk=object_id) if request.method == "POST": from cw.audiences.models import Persona title = request.POST.get("title") code = request.POST.get("code") language_id = request.POST.get("language") brand_id = request.POST.get("brand") persona_id = request.POST.get("persona") script_data_raw = request.POST.get("script_data", "") # Validate required fields if not title or not code: messages.error(request, "Please provide both title and code.") return redirect("admin:tvspots_campaign_create_origin", object_id) # Get the dimension objects language = Language.objects.filter(pk=language_id).first() if language_id else None brand = Brand.objects.filter(pk=brand_id).first() if brand_id else campaign.brand persona = Persona.objects.filter(pk=persona_id).first() if persona_id else None # Parse script data if provided script_data = None if script_data_raw: try: script_data = json.loads(script_data_raw) except json.JSONDecodeError: messages.error(request, "Invalid JSON in script data.") return redirect("admin:tvspots_campaign_create_origin", object_id) # Check for existing origin with same code existing_origin = campaign.ad_units.filter( origin_or_adaptation="ORIGIN", code=code ).first() if existing_origin: messages.warning( request, f"An origin ad unit with code '{code}' already exists.", ) return redirect("admin:tvspots_videoadunit_change", existing_origin.pk) # Create VideoAdUnit for origin origin = VideoAdUnit.objects.create( campaign=campaign, origin_or_adaptation="ORIGIN", code=code, title=title, language=language, brand=brand, persona=persona, use_pipeline=False, # Origins don't use pipeline status="draft", ) # Create script rows if script data provided if script_data and "scenes" in script_data: for scene in script_data["scenes"]: AdUnitScriptRow.objects.create( ad_unit=origin, shot_number=scene.get("scene_number", 0), visual_description=scene.get("visual", ""), audio_voiceover=scene.get("audio", {}).get("voiceover", ""), audio_music=scene.get("audio", {}).get("music", ""), audio_sfx=scene.get("audio", {}).get("sfx", ""), notes=scene.get("action", ""), ) messages.success( request, f"Origin ad unit '{title}' created successfully.", ) return redirect("admin:tvspots_videoadunit_change", origin.pk) # Get available dimensions from cw.audiences.models import Persona languages = Language.objects.filter(is_active=True).select_related("primary_model").order_by("name") brands = Brand.objects.filter(is_active=True).order_by("name") personas = Persona.objects.filter(is_active=True).order_by("name") return TemplateResponse( request, "admin/tvspots/campaign/create_origin.html", { **self.admin_site.each_context(request), "title": _("Create Origin Ad Unit"), "opts": self.model._meta, "campaign": campaign, "languages": languages, "brands": brands, "personas": personas, "campaign_brand_id": campaign.brand_id, }, )
@action( description=_("Create Adaptation"), url_path="create-adaptation-action", permissions=["create_adaptation_action"], ) def create_adaptation_action(self, request, object_id): """Redirect to the create adaptation view.""" return redirect("admin:tvspots_campaign_create_adaptation", object_id)
[docs] def has_create_adaptation_action_permission(self, request, object_id=None): """Only show button if there's an origin video ad unit.""" if object_id: try: campaign = Campaign.objects.get(pk=object_id) return campaign.ad_units.filter(origin_or_adaptation="ORIGIN").exists() except Campaign.DoesNotExist: return False return False
@action( description=_("Bulk Upload Videos"), url_path="bulk-upload-videos-action", ) def bulk_upload_videos_action(self, request, object_id): """Redirect to the bulk upload videos view.""" return redirect("admin:tvspots_campaign_bulk_upload_videos", object_id)
[docs] def create_adaptation_view(self, request, object_id): """Handle creating an adaptation of a campaign.""" import json from django.template.response import TemplateResponse from cw.audiences.models import Language from cw.core.models import LLMModel from .tasks import create_adaptation_task campaign = Campaign.objects.get(pk=object_id) origin_ad_unit = campaign.ad_units.filter(origin_or_adaptation="ORIGIN").first() if not origin_ad_unit: messages.error(request, "No origin ad unit found for this campaign.") return redirect("admin:tvspots_campaign_change", object_id) if request.method == "POST": from cw.audiences.models import Country, Persona, Region region_id = request.POST.get("region") country_id = request.POST.get("country") language_id = request.POST.get("language") llm_model_id = request.POST.get("llm_model") brand_id = request.POST.get("brand") persona_id = request.POST.get("persona") # Validate required fields if not region_id or not language_id: messages.error(request, "Please select a region and language.") return redirect("admin:tvspots_campaign_create_adaptation", object_id) # Get the dimension objects region = Region.objects.get(pk=region_id) country = Country.objects.filter(pk=country_id).first() if country_id else None language = Language.objects.get(pk=language_id) llm_model = LLMModel.objects.filter(pk=llm_model_id).first() if llm_model_id else None brand = Brand.objects.filter(pk=brand_id).first() if brand_id else None persona = Persona.objects.filter(pk=persona_id).first() if persona_id else None # Build per-node model config from form pipeline_model_config = {} node_keys = ["concept", "culture", "format_gate", "culture_gate", "concept_gate", "brand_gate"] for key in node_keys: model_pk = request.POST.get(f"model_{key}") if model_pk: pipeline_model_config[key] = int(model_pk) # Check for pending/processing adaptation with same dimensions pending_adaptation = campaign.ad_units.filter( region=region, country=country, language=language, status__in=["pending", "processing"] ).first() if pending_adaptation: target_desc = f"{region.name}" if country: target_desc += f" / {country.name}" target_desc += f" ({language.code})" messages.warning( request, f"An adaptation to {target_desc} is already in progress " f"(status: {pending_adaptation.get_status_display()}).", ) return redirect("admin:tvspots_videoadunit_change", pending_adaptation.pk) # Generate code for adaptation code_parts = [] if country: code_parts.append(country.code.upper()) elif region: code_parts.append(region.code.upper()) code_parts.append(language.code.upper()) code = "-".join(code_parts) # Generate title title_parts = [] if country: title_parts.append(country.name) else: title_parts.append(region.name) title_parts.append(f"({language.code})") title = " ".join(title_parts) # Create VideoAdUnit for adaptation (always use pipeline) adaptation = VideoAdUnit.objects.create( campaign=campaign, origin_or_adaptation="ADAPTATION", code=code, title=title, region=region, country=country, language=language, llm_model=llm_model, brand=brand, persona=persona, pipeline_model_config=pipeline_model_config, source_ad_unit=origin_ad_unit, use_pipeline=True, # Always use multi-agent pipeline status="pending", ) # Queue the adaptation task result = create_adaptation_task.apply_async( args=[adaptation.pk], queue="default" ) # Store the Celery task ID adaptation.celery_task_id = result.id adaptation.save(update_fields=["celery_task_id"]) target_desc = f"{region.name}" if country: target_desc += f" / {country.name}" target_desc += f" ({language.code})" messages.success( request, f"Adaptation to {target_desc} queued for processing.", ) return redirect("admin:tvspots_campaign_change", object_id) # Get available dimensions from cw.audiences.models import Country, CountryLanguage, CountryRegion, Persona, Region regions = Region.objects.filter(is_active=True).order_by("name") countries = Country.objects.filter(is_active=True).order_by("name") languages = Language.objects.filter(is_active=True).select_related("primary_model").order_by("name") brands = Brand.objects.filter(is_active=True).order_by("name") personas = Persona.objects.filter(is_active=True).order_by("name") llm_models = LLMModel.objects.filter(is_active=True).order_by("name") # Build region → countries mapping region_countries = {} for cr in CountryRegion.objects.select_related("region", "country"): if cr.region_id not in region_countries: region_countries[cr.region_id] = [] region_countries[cr.region_id].append(cr.country_id) # Build country → languages mapping (with primary languages first) country_languages = {} for cl in CountryLanguage.objects.select_related("country", "language").order_by("-is_primary"): if cl.country_id not in country_languages: country_languages[cl.country_id] = [] country_languages[cl.country_id].append(cl.language_id) # Get pending adaptations to show pending_adaptations = campaign.ad_units.filter( status__in=["pending", "processing"] ).select_related("region", "country", "language") # Node model fields for template iteration node_model_fields = [ ("concept", _("Concept Analyst Model")), ("culture", _("Cultural Researcher Model")), ("format_gate", _("Format Gate Model")), ("culture_gate", _("Culture Gate Model")), ("concept_gate", _("Concept Gate Model")), ("brand_gate", _("Brand Gate Model")), ] return TemplateResponse( request, "admin/tvspots/campaign/create_adaptation.html", { **self.admin_site.each_context(request), "title": _("Create Adaptation"), "opts": self.model._meta, "campaign": campaign, "origin_ad_unit": origin_ad_unit, "regions": regions, "countries": countries, "languages": languages, "brands": brands, "personas": personas, "llm_models": llm_models, "campaign_brand_id": campaign.brand_id, "node_model_fields": node_model_fields, "pending_adaptations": pending_adaptations, "region_countries_json": json.dumps(region_countries), "country_languages_json": json.dumps(country_languages), }, )
[docs] def bulk_upload_videos_view(self, request, object_id): """Handle bulk upload of multiple video files for a campaign.""" from django.template.response import TemplateResponse from django.core.exceptions import ValidationError as DjangoValidationError from cw.lib.security import VideoFileValidator campaign = Campaign.objects.get(pk=object_id) # Initialize validator with settings-based configuration validator = VideoFileValidator() if request.method == "POST": video_files = request.FILES.getlist('video_files') if not video_files: messages.error(request, "Please select at least one video file to upload.") return redirect("admin:tvspots_campaign_bulk_upload_videos", object_id) # Validate all files using the comprehensive security validator validation_results = validator.validate_multiple(video_files) # Separate valid and invalid files valid_videos = [] errors = [] for video_file in video_files: file_errors = validation_results.get(video_file.name, []) if file_errors: # File failed validation for error in file_errors: errors.append(f"{video_file.name}: {error}") else: # File passed validation valid_videos.append(video_file) # Report validation errors if errors: for error in errors: messages.error(request, error) # Process valid videos if valid_videos: from .tasks import analyze_video_task created_media = [] for video_file in valid_videos: # Create AdUnitMedia instance media = AdUnitMedia.objects.create( campaign=campaign, video_file=video_file, status="uploaded", file_size=video_file.size, ) created_media.append(media) # Queue processing task and store task ID result = analyze_video_task.apply_async(args=[media.pk], queue="default") media.celery_task_id = result.id media.save(update_fields=["celery_task_id"]) # Report success messages.success( request, f"Successfully uploaded and queued {len(created_media)} video(s) for processing. " f"Skipped {len(errors)} file(s) due to validation errors." if errors else f"Successfully uploaded and queued {len(created_media)} video(s) for processing." ) return redirect("admin:tvspots_campaign_change", object_id) else: messages.error( request, "No valid videos were uploaded. Please check the validation errors above." ) return redirect("admin:tvspots_campaign_bulk_upload_videos", object_id) # Render upload form from django.conf import settings as django_settings max_size_mb = getattr( django_settings, "VIDEO_MAX_UPLOAD_SIZE_BYTES", 500 * 1024 * 1024 ) / (1024 * 1024) allowed_exts = getattr( django_settings, "VIDEO_ALLOWED_EXTENSIONS", [".mp4", ".mov", ".avi", ".mkv", ".webm"], ) return TemplateResponse( request, "admin/tvspots/campaign/bulk_upload_videos.html", { **self.admin_site.each_context(request), "title": _("Bulk Upload Videos"), "opts": self.model._meta, "campaign": campaign, "max_file_size_mb": max_size_mb, "allowed_extensions": ", ".join(allowed_exts), }, )
@action( description=_("Export Campaign with Results"), url_path="export-campaign-action", ) def export_campaign_action(self, request, object_id): """Export entire campaign with all ad unit media and processing results as JSON.""" from datetime import datetime from cw.lib.export import create_json_response, export_campaign_with_results campaign = Campaign.objects.prefetch_related( "ad_unit_media__result__key_frames", "ad_unit_media__video_ad_unit", ).get(pk=object_id) # Generate filename with job_id and timestamp timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") safe_job_id = campaign.job_id.replace("/", "-").replace(" ", "_") filename = f"campaign-{safe_job_id}-{timestamp}" # Export data data = export_campaign_with_results(campaign) return create_json_response(data, filename, pretty=True)
[docs] def language_models_api(self, request, language_id): """API endpoint to fetch available LLM models for a language.""" from django.http import JsonResponse from cw.audiences.models import Language from cw.core.models import LLMModel try: language = Language.objects.get(pk=language_id) except Language.DoesNotExist: return JsonResponse({"error": "Language not found"}, status=404) # Get primary model and alternative models models = [] if language.primary_model: models.append({ "id": language.primary_model.id, "name": language.primary_model.name, "is_primary": True, }) # Add alternative models for alt_model in language.alternative_models.all(): models.append({ "id": alt_model.id, "name": alt_model.name, "is_primary": False, }) return JsonResponse({"models": models})
# --------------------------------------------------------------------------- # VideoAdUnit # ---------------------------------------------------------------------------
[docs] class AdUnitScriptRowInline(TabularInline): """Inline display of script rows for VideoAdUnit.""" model = AdUnitScriptRow tab = True extra = 0 fields = [ "shot_number", "timecode", "visual_text", "audio_text", ] ordering = ["order_index"]
[docs] @admin.register(VideoAdUnit) class VideoAdUnitAdmin(ModelAdmin): list_display = [ "show_id", "show_campaign", "code", "title", "origin_or_adaptation", "show_target", "show_pipeline", "show_pipeline_progress", "show_status", ] list_filter = ["origin_or_adaptation", "status", "use_pipeline", "region", "country", "language"] search_fields = ["campaign__script_title", "code", "title", "error_message"] readonly_fields = [ "campaign", "ad_unit_type", "code", "title", "origin_or_adaptation", "persona", "region", "country", "language", "llm_model", "brand", "source_ad_unit", "status", "celery_task_id", "error_message", "created_at", "started_at", "completed_at", "updated_at", "concept_brief", "cultural_brief", "evaluation_history", "pipeline_metadata", "pipeline_model_config", ] inlines = [AdUnitScriptRowInline] actions_detail = ["view_storyboard_action", "generate_storyboard_action"] actions = ["retry_failed_adaptations"] fieldsets = ( ( _("Core"), { "classes": ["tab"], "fields": ( "campaign", "ad_unit_type", "origin_or_adaptation", ("code", "title"), ), }, ), ( _("Adaptation Target"), { "classes": ["tab"], "fields": ( "source_ad_unit", "persona", ("region", "country"), ("language", "llm_model"), "brand", "use_pipeline", ), }, ), ( _("Video Settings"), { "classes": ["tab"], "fields": ( "duration", "visual_style_prompt", ), }, ), ( _("Status"), { "classes": ["tab"], "fields": ( "status", "celery_task_id", "error_message", ), }, ), ( _("Timing"), { "classes": ["tab"], "fields": ("created_at", "started_at", "completed_at", "updated_at"), }, ), ( _("Concept Brief"), { "classes": ["tab"], "fields": ("concept_brief",), "description": "Output of the concept extraction pipeline node.", }, ), ( _("Cultural Brief"), { "classes": ["tab"], "fields": ("cultural_brief",), "description": "Output of the cultural research pipeline node.", }, ), ( _("Evaluation History"), { "classes": ["tab"], "fields": ("evaluation_history",), "description": "Chronological evaluation results from pipeline review nodes.", }, ), ( _("Pipeline Metadata"), { "classes": ["tab"], "fields": ("pipeline_metadata", "pipeline_model_config"), "description": "Models used, revision counts, and timing per pipeline phase.", }, ), )
[docs] @display(description=_("ID")) def show_id(self, obj): return f"#{obj.pk}"
[docs] @display(description=_("Campaign")) def show_campaign(self, obj): return obj.campaign.script_title
[docs] @display(description=_("Target")) def show_target(self, obj): if obj.origin_or_adaptation == "ORIGIN": return "—" parts = [] if obj.region: parts.append(obj.region.name) if obj.country: parts.append(obj.country.name) if obj.language: parts.append(f"({obj.language.code})") return " / ".join(parts) if parts else "—"
[docs] @display(description=_("Pipeline"), boolean=True) def show_pipeline(self, obj): return obj.use_pipeline
[docs] @display(description=_("Progress")) def show_pipeline_progress(self, obj): """Display visual progress bar for pipeline stages.""" # Only show for pipeline adaptations if not obj.use_pipeline or obj.origin_or_adaptation == "ORIGIN": return "—" # Define pipeline stages and their order stages = [ ("pending", "Queue"), ("concept_analysis", "Concept"), ("cultural_analysis", "Culture"), ("writing", "Writer"), ("format_evaluation", "Format"), ("cultural_evaluation", "Review"), ("brand_evaluation", "Brand"), ("completed", "Done"), ] # Map status to stage index status_map = { "pending": 0, "processing": 0, "concept_analysis": 1, "cultural_analysis": 2, "writing": 3, "revising": 3, "format_evaluation": 4, "cultural_evaluation": 5, "concept_evaluation": 5, "brand_evaluation": 6, "completed": 7, "failed": -1, } current_stage = status_map.get(obj.status, 0) # Handle failed status if obj.status == "failed": return mark_safe( '<div class="pipeline-progress">' '<span class="stage failed">✗ Failed</span>' '</div>' ) # Build progress bar HTML html_parts = ['<div class="pipeline-progress">'] for idx, (stage_key, stage_label) in enumerate(stages): if idx < current_stage: css_class = "stage completed" icon = "✓" elif idx == current_stage: css_class = "stage active" icon = "●" else: css_class = "stage pending" icon = "○" html_parts.append( f'<span class="{css_class}" title="{stage_label}">{icon}</span>' ) # Add connector between stages (except after last) if idx < len(stages) - 1: connector_class = "connector completed" if idx < current_stage else "connector" html_parts.append(f'<span class="{connector_class}">─</span>') html_parts.append('</div>') return mark_safe(''.join(html_parts))
show_pipeline_progress.allow_tags = True
[docs] @display( description=_("Status"), label={ "Pending": "info", "Processing": "warning", "Completed": "success", "Failed": "danger", "Concept Analysis": "warning", "Cultural Analysis": "warning", "Writing": "warning", "Format Evaluation": "warning", "Cultural Evaluation": "warning", "Concept Evaluation": "warning", "Evaluating Brand": "warning", "Revising": "warning", }, ) def show_status(self, obj): return obj.get_status_display()
@action(description=_("Retry selected failed adaptations")) def retry_failed_adaptations(self, request, queryset): """Retry failed adaptation video ad units by creating new ones with same parameters.""" from .tasks import create_adaptation_task failed_adaptations = queryset.filter(status="failed", origin_or_adaptation="ADAPTATION") if not failed_adaptations.exists(): self.message_user( request, _("No failed adaptations selected. Please select adaptations with 'Failed' status."), level=messages.WARNING, ) return retried_count = 0 for adaptation in failed_adaptations: # Create new adaptation with same parameters new_adaptation = VideoAdUnit.objects.create( campaign=adaptation.campaign, origin_or_adaptation="ADAPTATION", code=adaptation.code, title=adaptation.title, region=adaptation.region, country=adaptation.country, language=adaptation.language, llm_model=adaptation.llm_model, source_ad_unit=adaptation.source_ad_unit, use_pipeline=adaptation.use_pipeline, status="pending", ) # Queue the task task = create_adaptation_task.apply_async( args=[new_adaptation.pk], queue="default", ) new_adaptation.celery_task_id = task.id new_adaptation.save(update_fields=["celery_task_id"]) retried_count += 1 self.message_user( request, _(f"Created {retried_count} new adaptation(s) from failed adaptations."), level=messages.SUCCESS, )
[docs] def get_urls(self): """Add custom URLs for storyboard generation and viewing.""" urls = super().get_urls() custom_urls = [ path( "<int:object_id>/generate-storyboard/", self.admin_site.admin_view(self.generate_storyboard_view), name="tvspots_videoadunit_generate_storyboard", ), path( "<int:object_id>/storyboard/", self.admin_site.admin_view(self.storyboard_view), name="tvspots_videoadunit_storyboard", ), ] return custom_urls + urls
@action( description=_("View Storyboard"), url_path="view-storyboard-action", permissions=["view_storyboard_action"], ) def view_storyboard_action(self, request, object_id): """Redirect to the storyboard viewer.""" return redirect("admin:tvspots_videoadunit_storyboard", object_id)
[docs] def has_view_storyboard_action_permission(self, request, object_id=None): """Only show button if there are storyboard images.""" if object_id: try: video_ad_unit = VideoAdUnit.objects.get(pk=object_id) # Check if there are any storyboards with completed images return video_ad_unit.storyboards.filter( images__diffusion_job__status="completed" ).exists() except VideoAdUnit.DoesNotExist: return False return False
@action( description=_("Generate Storyboard"), url_path="generate-storyboard-action", permissions=["generate_storyboard_action"], ) def generate_storyboard_action(self, request, object_id): """Redirect to the generate storyboard view.""" return redirect("admin:tvspots_videoadunit_generate_storyboard", object_id)
[docs] def has_generate_storyboard_action_permission(self, request, object_id=None): """Only show button if there are script rows.""" if object_id: try: video_ad_unit = VideoAdUnit.objects.get(pk=object_id) return video_ad_unit.script_rows.exists() except VideoAdUnit.DoesNotExist: return False return False
[docs] def generate_storyboard_view(self, request, object_id): """Handle generating a storyboard for a video ad unit. Supports two modes: - **text** (default): generates from script row descriptions with optional LLM enhancement - **keyframe**: uses ControlNet with extracted video keyframes for wireframe generation """ from django.template.response import TemplateResponse from cw.diffusion.models import ControlNetModel, DiffusionModel, LoraModel video_ad_unit = VideoAdUnit.objects.get(pk=object_id) if not video_ad_unit.script_rows.exists(): messages.error(request, "No script rows found for this video ad unit.") return redirect("admin:tvspots_videoadunit_change", object_id) # Check if keyframes are available (for wireframe mode) source_media = getattr(video_ad_unit, "source_media", None) has_keyframes = ( source_media and source_media.result and source_media.result.key_frames.exists() ) if request.method == "POST": source_type = request.POST.get("source_type", "text") model_id = request.POST.get("diffusion_model") lora_id = request.POST.get("lora_model") or None images_per_row = int(request.POST.get("images_per_row", 1)) if not model_id: messages.error(request, "Please select a diffusion model.") return redirect("admin:tvspots_videoadunit_generate_storyboard", object_id) # Build storyboard creation kwargs storyboard_kwargs = { "video_ad_unit": video_ad_unit, "diffusion_model_id": model_id, "lora_model_id": lora_id, "images_per_row": images_per_row, "source_type": source_type, "status": "pending", } if source_type == "keyframe": # ControlNet wireframe mode controlnet_id = request.POST.get("controlnet_model") if not controlnet_id: messages.error(request, "Please select a ControlNet model for wireframe mode.") return redirect("admin:tvspots_videoadunit_generate_storyboard", object_id) storyboard_kwargs["controlnet_model_id"] = controlnet_id storyboard_kwargs["preprocessing_type"] = request.POST.get( "preprocessing_type", "" ) cond_scale = request.POST.get("conditioning_scale") if cond_scale: storyboard_kwargs["conditioning_scale"] = float(cond_scale) guidance_end = request.POST.get("control_guidance_end") if guidance_end: storyboard_kwargs["control_guidance_end"] = float(guidance_end) storyboard_kwargs["style_prompt"] = request.POST.get("style_prompt", "") storyboard = Storyboard.objects.create(**storyboard_kwargs) # Queue the appropriate task if source_type == "keyframe": from .tasks import generate_wireframe_storyboard_task generate_wireframe_storyboard_task.apply_async( args=[storyboard.pk], queue="default" ) keyframe_count = source_media.result.key_frames.count() total_images = keyframe_count * images_per_row messages.success( request, f"Wireframe storyboard queued ({total_images} images from " f"{keyframe_count} keyframes). Check the Storyboard page for progress.", ) else: from .tasks import generate_storyboard_task enhance_prompts = request.POST.get("enhance_prompts") == "on" generate_storyboard_task.apply_async( args=[storyboard.pk, enhance_prompts], queue="default" ) total_images = video_ad_unit.script_rows.count() * images_per_row messages.success( request, f"Storyboard generation queued ({total_images} images). " f"Check the Storyboard page for progress.", ) return redirect("admin:tvspots_storyboard_change", storyboard.pk) # Get available models, LoRAs, and ControlNets models = DiffusionModel.objects.filter(is_active=True) loras = LoraModel.objects.filter(is_active=True) controlnets = ControlNetModel.objects.filter(is_active=True) existing_storyboards = video_ad_unit.storyboards.all().select_related("diffusion_model") return TemplateResponse( request, "admin/tvspots/videoadunit/generate_storyboard.html", { **self.admin_site.each_context(request), "title": _("Generate Storyboard"), "opts": self.model._meta, "video_ad_unit": video_ad_unit, "models": models, "loras": loras, "controlnets": controlnets, "has_keyframes": has_keyframes, "existing_storyboards": existing_storyboards, "lora_compat_url": reverse("admin:diffusion_diffusionjob_compatible_loras"), }, )
[docs] def storyboard_view(self, request, object_id): """Display the storyboard viewer for a video ad unit.""" import os from django.conf import settings as django_settings from django.template.response import TemplateResponse video_ad_unit = VideoAdUnit.objects.get(pk=object_id) # Get the most recent storyboard storyboard = video_ad_unit.storyboards.order_by("-created_at").first() # Build frame data from storyboard images frames = [] completed_count = 0 processing_count = 0 pending_count = 0 if storyboard: for image in ( storyboard.images.all() .select_related("script_row", "diffusion_job", "key_frame") .order_by("script_row__order_index", "image_index") ): diffusion_job = image.diffusion_job script_row = image.script_row # Get image URL if completed image_url = None if diffusion_job.result_images: # Get the first image path img_path = diffusion_job.result_images[0] image_url = os.path.join(django_settings.MEDIA_URL, img_path) # Get keyframe thumbnail URL (for wireframe storyboards) keyframe_url = None if image.key_frame and image.key_frame.image: keyframe_url = image.key_frame.image.url # Track status counts if diffusion_job.status == "completed": completed_count += 1 elif diffusion_job.status == "processing": processing_count += 1 else: pending_count += 1 frames.append( { "shot_number": script_row.shot_number or f"{script_row.order_index + 1:02d}", "visual_text": script_row.visual_text, "audio_text": script_row.audio_text, "image_url": image_url, "keyframe_url": keyframe_url, "status": diffusion_job.status, "image_index": image.image_index, } ) return TemplateResponse( request, "admin/tvspots/videoadunit/storyboard_view.html", { **self.admin_site.each_context(request), "title": _("Storyboard"), "opts": self.model._meta, "video_ad_unit": video_ad_unit, "storyboard": storyboard, "frames": frames, "completed_count": completed_count, "processing_count": processing_count, "pending_count": pending_count, }, )
# --------------------------------------------------------------------------- # Storyboard # ---------------------------------------------------------------------------
[docs] class StoryboardImageInline(TabularInline): """Inline display of images for Storyboard.""" model = StoryboardImage tab = True extra = 0 fields = [ "script_row", "image_index", "show_key_frame_thumbnail", "diffusion_job", "show_status", ] readonly_fields = [ "script_row", "image_index", "show_key_frame_thumbnail", "diffusion_job", "show_status", ] can_delete = False show_change_link = True
[docs] def has_add_permission(self, request, obj=None): return False
[docs] @display(description=_("Status")) def show_status(self, obj): return obj.diffusion_job.get_status_display() if obj.diffusion_job else "—"
[docs] @display(description=_("Source Keyframe")) def show_key_frame_thumbnail(self, obj): if obj.key_frame and obj.key_frame.image: return format_html( '<img src="{}" style="max-width: 80px; max-height: 50px; border-radius: 4px;">', obj.key_frame.image.url, ) return "—"
[docs] @admin.register(Storyboard) class StoryboardAdmin(ModelAdmin): list_display = [ "show_id", "show_video_ad_unit", "show_source_type", "diffusion_model", "lora_model", "images_per_row", "show_status", ] list_filter = ["status", "source_type", "diffusion_model", "video_ad_unit__campaign"] search_fields = ["video_ad_unit__campaign__script_title", "video_ad_unit__code"] readonly_fields = ["created_at", "completed_at"] inlines = [StoryboardImageInline] fieldsets = ( ( _("Configuration"), { "classes": ["tab"], "fields": ( "video_ad_unit", "source_type", ("diffusion_model", "lora_model"), "images_per_row", ), }, ), ( _("ControlNet"), { "classes": ["tab"], "description": "Settings for keyframe-based wireframe generation. " "Only used when Source Type is 'Keyframe (ControlNet)'.", "fields": ( "controlnet_model", "preprocessing_type", ("conditioning_scale", "control_guidance_end"), "style_prompt", ), }, ), ( _("Status"), { "classes": ["tab"], "fields": ("status", "error_message"), }, ), ( _("Timing"), { "classes": ["tab"], "fields": ("created_at", "completed_at"), }, ), )
[docs] @display(description=_("ID")) def show_id(self, obj): return f"#{obj.pk}"
[docs] @display(description=_("Video Ad Unit")) def show_video_ad_unit(self, obj): return f"{obj.video_ad_unit.campaign.script_title} / {obj.video_ad_unit.code}"
[docs] @display( description=_("Source"), label={ "Text (Script Rows)": "info", "Keyframe (ControlNet)": "warning", }, ) def show_source_type(self, obj): return obj.get_source_type_display()
[docs] @display( description=_("Status"), label={ "Pending": "info", "Processing": "warning", "Completed": "success", "Failed": "danger", }, ) def show_status(self, obj): return obj.get_status_display()
[docs] def formfield_for_foreignkey(self, db_field, request, **kwargs): """Filter ControlNet models to only show active ones.""" if db_field.name == "controlnet_model": from cw.diffusion.models import ControlNetModel kwargs["queryset"] = ControlNetModel.objects.filter(is_active=True) return super().formfield_for_foreignkey(db_field, request, **kwargs)
[docs] def save_model(self, request, obj, form, change): """Auto-queue new storyboards on save.""" is_new = obj.pk is None super().save_model(request, obj, form, change) if is_new and obj.status == "pending": if obj.source_type == "keyframe": from .tasks import generate_wireframe_storyboard_task generate_wireframe_storyboard_task.apply_async( args=[obj.pk], queue="default" ) else: from .tasks import generate_storyboard_task generate_storyboard_task.apply_async( args=[obj.pk, True], queue="default" )
# --------------------------------------------------------------------------- # Ad Unit Media (Video Origin Extraction) # ---------------------------------------------------------------------------
[docs] class KeyFrameInline(TabularInline): """Inline display of key frames for VideoProcessingResult.""" model = KeyFrame tab = True extra = 0 fields = ["scene_number", "timestamp", "show_thumbnail", "show_objects_count"] readonly_fields = ["scene_number", "timestamp", "show_thumbnail", "show_objects_count"] can_delete = False
[docs] def has_add_permission(self, request, obj=None): return False
[docs] @display(description=_("Thumbnail")) def show_thumbnail(self, obj): if obj.image: return format_html( '<img src="{}" style="max-width: 120px; max-height: 80px; border-radius: 4px;">', obj.image.url, ) return "-"
[docs] @display(description=_("Objects")) def show_objects_count(self, obj): return len(obj.detected_objects) if obj.detected_objects else 0
[docs] @admin.register(VideoProcessingResult) class VideoProcessingResultAdmin(ModelAdmin): """Admin for video processing results with editable script.""" list_display = ["id", "show_media", "show_scene_count", "processing_time", "created_at"] search_fields = ["media__campaign__script_title", "media__campaign__client_name"] readonly_fields = [ "show_video_player", "scenes", "transcription", "visual_style", "objects_summary", "sentiment_analysis", "categories", "audience_insights", "processing_time", "models_used", "created_at", "updated_at", ] inlines = [KeyFrameInline] actions_detail = ["approve_script_action", "reject_script_action", "export_result_action"] actions = ["export_results_action"] fieldsets = ( ( _("Video Player"), { "classes": ["tab"], "fields": ("show_video_player",), }, ), ( _("Overview"), { "classes": ["tab"], "fields": ("processing_time", "models_used"), }, ), ( _("Edit Script"), { "classes": ["tab"], "fields": ("script",), "description": "Edit the generated script. Changes will be saved when you click Save.", }, ), ( _("Scenes"), { "classes": ["tab"], "fields": ("scenes",), }, ), ( _("Transcription"), { "classes": ["tab"], "fields": ("transcription",), }, ), ( _("Visual Analysis"), { "classes": ["tab"], "fields": ("visual_style", "objects_summary"), }, ), ( _("Sentiment & Categories"), { "classes": ["tab"], "fields": ("sentiment_analysis", "categories"), }, ), ( _("Audience Insights"), { "classes": ["tab"], "fields": ("audience_insights",), }, ), ( _("Metadata"), { "classes": ["tab"], "fields": ("created_at", "updated_at"), }, ), )
[docs] def formfield_for_dbfield(self, db_field, request, **kwargs): """Use custom widget for script field.""" if db_field.name == "script": kwargs["widget"] = ScriptEditorWidget() return super().formfield_for_dbfield(db_field, request, **kwargs)
[docs] @display(description=_("Video Player")) def show_video_player(self, obj): """Display video player with scene markers.""" if hasattr(obj, "media") and obj.media and obj.media.video_file: from django.templatetags.static import static video_url = obj.media.video_file.url scenes = obj.scenes or [] # Render widget manually widget = VideoPlayerWidget(video_url=video_url, scenes=scenes) return mark_safe(widget.render("video_player", None, {})) return mark_safe('<div class="text-base-500">No video available</div>')
[docs] def has_add_permission(self, request): """Results are created by the video processing pipeline only.""" return False
[docs] def has_delete_permission(self, request, obj=None): """Allow deletion of results.""" return True
[docs] @display(description=_("Media")) def show_media(self, obj): if hasattr(obj, "media") and obj.media: url = reverse("admin:tvspots_adunitmedia_change", args=[obj.media.pk]) return format_html('<a href="{}">{}</a>', url, obj.media) return "-"
[docs] @display(description=_("Scenes")) def show_scene_count(self, obj): return len(obj.scenes) if obj.scenes else 0
@action(description=_("Approve Script")) def approve_script_action(self, request, object_id): """Approve the edited script and mark media as reviewed.""" result = VideoProcessingResult.objects.get(pk=object_id) if hasattr(result, "media") and result.media: if result.media.status != "completed": messages.error( request, "Cannot approve script for media that is not in completed status.", ) return redirect( reverse("admin:tvspots_videoprocessingresult_change", args=[object_id]) ) # Update media status to reviewed result.media.status = "reviewed" result.media.save(update_fields=["status"]) messages.success( request, f"Script approved for {result.media}. You can now create an origin VideoAdUnit.", ) else: messages.warning(request, "No associated media found.") return redirect( reverse("admin:tvspots_videoprocessingresult_change", args=[object_id]) ) @action(description=_("Reject Script")) def reject_script_action(self, request, object_id): """Reject the script and reset media status for reprocessing.""" result = VideoProcessingResult.objects.get(pk=object_id) if hasattr(result, "media") and result.media: # Reset media status to uploaded for reprocessing result.media.status = "uploaded" result.media.processing_error = "Script rejected - needs reprocessing" result.media.save(update_fields=["status", "processing_error"]) messages.warning( request, f"Script rejected for {result.media}. Status reset to 'uploaded' for reprocessing.", ) else: messages.warning(request, "No associated media found.") return redirect( reverse("admin:tvspots_videoprocessingresult_change", args=[object_id]) ) @action(description=_("Export as JSON")) def export_result_action(self, request, object_id): """Export a single VideoProcessingResult as JSON.""" from datetime import datetime from cw.lib.export import create_json_response, export_video_processing_result result = VideoProcessingResult.objects.select_related("media").get(pk=object_id) # Generate filename with timestamp timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") filename = f"result-{result.pk}-{timestamp}" # Export data data = export_video_processing_result( result, include_keyframes=True, include_media_metadata=True, ) return create_json_response(data, filename, pretty=True) @action(description=_("Export selected results as JSON")) def export_results_action(self, request, queryset): """Export multiple VideoProcessingResults as JSON.""" from datetime import datetime from cw.lib.export import create_json_response, export_video_processing_result # Export all selected results results_data = [] for result in queryset.select_related("media"): results_data.append(export_video_processing_result( result, include_keyframes=True, include_media_metadata=True, )) # Generate filename with timestamp and count timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") filename = f"results-bulk-{len(results_data)}-{timestamp}" # Wrap in container object data = { "export_metadata": { "export_date": datetime.utcnow().isoformat(), "export_version": "1.0", "model": "VideoProcessingResult", "count": len(results_data), }, "results": results_data, } return create_json_response(data, filename, pretty=True)
[docs] @admin.register(AdUnitMedia) class AdUnitMediaAdmin(ModelAdmin): """Admin for uploaded video files awaiting processing.""" list_display = [ "id", "campaign", "show_status", "show_progress", "show_duration", "show_resolution", "created_at", ] list_filter = ["status", "created_at"] search_fields = ["campaign__script_title", "campaign__client_name"] readonly_fields = [ "status", "duration", "resolution_width", "resolution_height", "frame_rate", "audio_channels", "audio_sample_rate", "file_size", "processing_started_at", "processing_completed_at", "processing_error", "result", "video_ad_unit", "created_at", "updated_at", ] actions_detail = ["reprocess_video_action", "create_origin_ad_unit_action", "export_media_action"] actions = ["export_media_bulk_action"] fieldsets = ( ( _("Video Upload"), { "classes": ["tab"], "fields": ("campaign", "video_file"), }, ), ( _("Processing Status"), { "classes": ["tab"], "fields": ( "status", "processing_started_at", "processing_completed_at", "processing_error", ), }, ), ( _("Video Metadata"), { "classes": ["tab"], "fields": ( "duration", ("resolution_width", "resolution_height"), "frame_rate", ("audio_channels", "audio_sample_rate"), "file_size", ), }, ), ( _("Results"), { "classes": ["tab"], "fields": ("result", "video_ad_unit"), }, ), ( _("Metadata"), { "classes": ["tab"], "fields": ("created_at", "updated_at"), }, ), )
[docs] @display( description=_("Status"), label={ "Pending Upload": "info", "Uploaded": "info", "Processing": "warning", "Completed": "success", "Failed": "danger", "Reviewed": "success", }, ) def show_status(self, obj): return obj.get_status_display()
[docs] @display(description=_("Duration")) def show_duration(self, obj): if obj.duration: mins, secs = divmod(int(obj.duration), 60) return f"{mins}:{secs:02d}" return "-"
[docs] @display(description=_("Resolution")) def show_resolution(self, obj): if obj.resolution_width and obj.resolution_height: return f"{obj.resolution_width}×{obj.resolution_height}" return "-"
[docs] @display(description=_("Progress")) def show_progress(self, obj): """Display progress bar for processing tasks in list view.""" if obj.status != "processing": return "-" # Add progress bar with data attributes for JavaScript polling return mark_safe( f'<div class="video-progress-container" data-media-id="{obj.pk}" ' f'style="min-width: 150px;">' f'<div style="background: #e0e0e0; border-radius: 4px; overflow: hidden; height: 20px;">' f'<div class="progress-bar" style="width: 0%; height: 100%; background: #4CAF50; ' f'transition: width 0.3s;"></div>' f'</div>' f'<div class="progress-text" style="text-align: center; margin-top: 4px; ' f'font-size: 11px; color: #666;">Starting...</div>' f'</div>' )
[docs] def save_model(self, request, obj, form, change): """Auto-trigger processing when video is uploaded.""" is_new = obj.pk is None has_video = bool(obj.video_file) super().save_model(request, obj, form, change) # Auto-queue processing for newly uploaded videos if is_new and has_video: obj.status = "uploaded" obj.save(update_fields=["status"]) # Queue processing task and store task ID from .tasks import analyze_video_task result = analyze_video_task.apply_async(args=[obj.pk], queue="default") obj.celery_task_id = result.id obj.save(update_fields=["celery_task_id"]) messages.info( request, f"Video processing queued for {obj}. Check back in a few minutes.", )
@action(description=_("Reprocess Video")) def reprocess_video_action(self, request, object_id): """Reprocess a failed or completed video.""" media = AdUnitMedia.objects.get(pk=object_id) if media.status not in ["failed", "completed"]: messages.error( request, f"Cannot reprocess video with status '{media.get_status_display()}'. " "Only failed or completed videos can be reprocessed.", ) return redirect( reverse("admin:tvspots_adunitmedia_change", args=[object_id]) ) # Reset status and queue processing media.status = "uploaded" media.processing_error = "" media.save(update_fields=["status", "processing_error"]) from .tasks import analyze_video_task result = analyze_video_task.apply_async(args=[media.pk], queue="default") media.celery_task_id = result.id media.save(update_fields=["celery_task_id"]) messages.success(request, f"Video reprocessing queued for {media}.") return redirect(reverse("admin:tvspots_adunitmedia_change", args=[object_id])) @action(description=_("Create Origin VideoAdUnit")) def create_origin_ad_unit_action(self, request, object_id): """Create an origin VideoAdUnit from processed results with comprehensive error handling.""" import logging logger = logging.getLogger(__name__) try: media = AdUnitMedia.objects.get(pk=object_id) logger.info( f"Creating origin VideoAdUnit for AdUnitMedia {media.id}", extra={ "ad_unit_media_id": media.id, "campaign_id": media.campaign_id, "user": request.user.username, }, ) # Validation - Status check if media.status != "completed": logger.warning( f"Cannot create VideoAdUnit: invalid status '{media.status}'", extra={"ad_unit_media_id": media.id, "status": media.status}, ) messages.error( request, f"Cannot create VideoAdUnit from video with status '{media.get_status_display()}'. " "Video must be fully processed first.", ) return redirect( reverse("admin:tvspots_adunitmedia_change", args=[object_id]) ) # Validation - Duplicate check if media.video_ad_unit: logger.info( f"VideoAdUnit already exists for media {media.id}", extra={ "ad_unit_media_id": media.id, "existing_ad_unit_id": media.video_ad_unit.pk, }, ) messages.warning( request, f"VideoAdUnit already exists for this media: {media.video_ad_unit}", ) return redirect( reverse( "admin:tvspots_videoadunit_change", args=[media.video_ad_unit.pk] ) ) # Validation - Script check if not media.result: logger.error( f"No processing result found for media {media.id}", extra={"ad_unit_media_id": media.id}, ) messages.error(request, "No processing result found. Video may not have been processed yet.") return redirect( reverse("admin:tvspots_adunitmedia_change", args=[object_id]) ) if not media.result.script: logger.error( f"No script found in processing results for media {media.id}", extra={"ad_unit_media_id": media.id, "result_id": media.result.pk}, ) messages.error(request, "No script found in processing results.") return redirect( reverse("admin:tvspots_adunitmedia_change", args=[object_id]) ) # Validate script structure script_data = media.result.script if "script_rows" not in script_data and "scenes" not in script_data: logger.error( f"Invalid script structure for media {media.id}: missing script_rows/scenes", extra={"ad_unit_media_id": media.id, "script_keys": list(script_data.keys())}, ) messages.error( request, "Invalid script structure. Missing script_rows or scenes data.", ) return redirect( reverse("admin:tvspots_adunitmedia_change", args=[object_id]) ) # Create origin VideoAdUnit logger.info(f"Creating VideoAdUnit for media {media.id}") ad_unit = VideoAdUnit.objects.create( campaign=media.campaign, ad_unit_type="VIDEO", origin_or_adaptation="ORIGIN", code=f"ORIGIN-{media.id:04d}", title=script_data.get("script_title") or f"Origin from {media.campaign.script_title}", status="completed", duration=media.duration or script_data.get("total_runtime_seconds", 0), ) logger.info( f"VideoAdUnit {ad_unit.id} created for media {media.id}", extra={ "ad_unit_media_id": media.id, "video_ad_unit_id": ad_unit.id, "campaign_id": media.campaign_id, }, ) # Create script rows from generated script rows_created = 0 if "script_rows" in script_data: # New format: script_rows array for idx, row in enumerate(script_data["script_rows"]): AdUnitScriptRow.objects.create( ad_unit=ad_unit, order_index=idx, shot_number=row.get("shot_number", str(idx + 1)), visual_text=row.get("visual_text", ""), audio_text=row.get("audio_text", ""), ) rows_created += 1 elif "scenes" in script_data: # Old format: scenes array (Phase 1/2 compatibility) for idx, scene in enumerate(script_data["scenes"]): audio_dict = scene.get("audio", {}) audio_text = audio_dict.get("voiceover", "") if isinstance(audio_dict, dict) else str(audio_dict) AdUnitScriptRow.objects.create( ad_unit=ad_unit, order_index=idx, shot_number=str(scene.get("scene_number", idx + 1)), visual_text=scene.get("visual", ""), audio_text=audio_text, ) rows_created += 1 logger.info( f"Created {rows_created} script rows for VideoAdUnit {ad_unit.id}", extra={"video_ad_unit_id": ad_unit.id, "rows_created": rows_created}, ) # Link back to media media.video_ad_unit = ad_unit media.status = "reviewed" media.save(update_fields=["video_ad_unit", "status"]) logger.info( f"Origin VideoAdUnit creation complete for media {media.id}", extra={ "ad_unit_media_id": media.id, "video_ad_unit_id": ad_unit.id, "rows_created": rows_created, "user": request.user.username, }, ) messages.success( request, format_html( 'Origin VideoAdUnit created: <a href="{}">{}</a>. ' 'Created {} script row(s). <a href="{}">Create Adaptation →</a>', reverse("admin:tvspots_videoadunit_change", args=[ad_unit.pk]), ad_unit, rows_created, reverse("admin:tvspots_videoadunit_add") + f"?source_ad_unit={ad_unit.pk}", ), ) return redirect(reverse("admin:tvspots_videoadunit_change", args=[ad_unit.pk])) except AdUnitMedia.DoesNotExist: logger.error( f"AdUnitMedia {object_id} not found", extra={"ad_unit_media_id": object_id, "user": request.user.username}, ) messages.error(request, f"AdUnitMedia with ID {object_id} not found.") return redirect(reverse("admin:tvspots_adunitmedia_changelist")) except Exception as e: logger.exception( f"Unexpected error creating origin VideoAdUnit for media {object_id}", extra={ "ad_unit_media_id": object_id, "error": str(e), "user": request.user.username, }, ) messages.error( request, f"An unexpected error occurred while creating the VideoAdUnit: {str(e)}. " "Please contact support if this persists.", ) return redirect( reverse("admin:tvspots_adunitmedia_change", args=[object_id]) ) @action(description=_("Export as JSON")) def export_media_action(self, request, object_id): """Export a single AdUnitMedia with its processing result as JSON.""" from datetime import datetime from cw.lib.export import create_json_response, export_ad_unit_media_with_result media = AdUnitMedia.objects.select_related( "campaign", "result", "video_ad_unit" ).get(pk=object_id) # Generate filename with timestamp timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") filename = f"media-{media.pk}-{timestamp}" # Export data data = export_ad_unit_media_with_result(media) return create_json_response(data, filename, pretty=True) @action(description=_("Export selected media as JSON")) def export_media_bulk_action(self, request, queryset): """Export multiple AdUnitMedia instances with their results as JSON.""" from datetime import datetime from cw.lib.export import create_json_response, export_ad_unit_media_with_result # Export all selected media media_data = [] for media in queryset.select_related("campaign", "result", "video_ad_unit"): media_data.append(export_ad_unit_media_with_result(media)) # Generate filename with timestamp and count timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") filename = f"media-bulk-{len(media_data)}-{timestamp}" # Wrap in container object data = { "export_metadata": { "export_date": datetime.utcnow().isoformat(), "export_version": "1.0", "model": "AdUnitMedia", "count": len(media_data), }, "media": media_data, } return create_json_response(data, filename, pretty=True)
[docs] def get_urls(self): """Add custom URLs for reprocess, create, and progress actions.""" urls = super().get_urls() custom_urls = [ path( "<int:object_id>/reprocess/", self.admin_site.admin_view(self.reprocess_video_action), name="tvspots_adunitmedia_reprocess", ), path( "<int:object_id>/create-ad-unit/", self.admin_site.admin_view(self.create_origin_ad_unit_action), name="tvspots_adunitmedia_create_ad_unit", ), path( "<int:object_id>/progress/", self.admin_site.admin_view(self.progress_api), name="tvspots_adunitmedia_progress", ), ] return custom_urls + urls
[docs] def progress_api(self, request, object_id): """API endpoint to fetch current progress of video processing task.""" from django.http import JsonResponse from celery.result import AsyncResult try: media = AdUnitMedia.objects.get(pk=object_id) except AdUnitMedia.DoesNotExist: return JsonResponse({"error": "Media not found"}, status=404) # If no task ID or not processing, return current status if not media.celery_task_id or media.status not in ["processing"]: return JsonResponse({ "status": media.status, "progress": 100 if media.status == "completed" else 0, "phase": media.status, "message": media.get_status_display(), }) # Fetch task result from Celery task_result = AsyncResult(media.celery_task_id) # Handle different task states if task_result.state == "PENDING": response = { "status": "pending", "progress": 0, "phase": "queued", "message": "Task is queued...", } elif task_result.state == "PROGRESS": info = task_result.info or {} response = { "status": "processing", "progress": info.get("current", 0), "total": info.get("total", 100), "phase": info.get("phase", "unknown"), "message": info.get("status", "Processing..."), } elif task_result.state == "SUCCESS": response = { "status": "completed", "progress": 100, "phase": "completed", "message": "Processing complete", "result": task_result.result, } elif task_result.state == "FAILURE": response = { "status": "failed", "progress": 0, "phase": "failed", "message": str(task_result.info) if task_result.info else "Task failed", "error": str(task_result.info) if task_result.info else None, } else: # RETRY, REVOKED, etc. response = { "status": task_result.state.lower(), "progress": 0, "phase": task_result.state.lower(), "message": f"Task state: {task_result.state}", } return JsonResponse(response)