# Celery Tasks & Queues

Reference for all Celery tasks, queue configuration, and worker behavior.

## Task Reference
### enhance_prompt_task

| Signature | |
|---|---|
| Queue | `enhancement` |
| Arguments | |
| Returns | |
| Idempotent | Yes — skips if the prompt already has an enhancement |

Enhances a diffusion prompt using a local LLM (Qwen2.5-3B-Instruct). Reads `enhancement_style` and `creativity` from the `Prompt` record. The enhancer instance is cached in `_enhancer_cache` between invocations.
### generate_images_task

| Signature | |
|---|---|
| Queue | `default` |
| Arguments | |
| Returns | |

Primary image generation task. Execution sequence:

1. Evict the prompt enhancer and pipeline LLM from VRAM
2. Load the diffusion model (cached between tasks via `_model_cache`)
3. Load the LoRA if specified (auto-downloads from CivitAI if needed)
4. Generate N images sequentially (randomized seed per image)
5. Save to `media/diffusion/` as JPEG (quality 95)
6. Build comprehensive generation metadata
7. Unload the LoRA to reset state

Status flow: `pending` → `processing` → `completed` | `failed`
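The status flow can be sketched as a simple wrapper. This is illustrative only: `run_generation_job` and the dict-based job record stand in for the real ORM model and task body.

```python
def run_generation_job(job: dict, generate) -> dict:
    # Status flow: pending -> processing -> completed | failed.
    job["status"] = "processing"
    try:
        generate(job)  # the multi-step sequence described above
        job["status"] = "completed"
    except Exception as exc:
        # Errors are persisted rather than re-raised, consistent with
        # the no-automatic-retry behavior for this task.
        job["status"] = "failed"
        job["error_message"] = str(exc)
    return job
```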
### download_lora_task

| Signature | |
|---|---|
| Queue | |
| Arguments | |
| Returns | |

Downloads a LoRA from CivitAI using the model's AIR URN. Saves to `{MODEL_BASE_PATH}/loras/civitai_{version_id}.safetensors`. Skips if the file already exists.
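The skip-if-exists behavior can be sketched like this. The `fetch` callable is a hypothetical stand-in for the CivitAI HTTP download; only the path scheme comes from the task description.

```python
from pathlib import Path

def lora_path(base: Path, version_id: int) -> Path:
    # Mirrors the documented layout: {MODEL_BASE_PATH}/loras/civitai_{id}.safetensors
    return base / "loras" / f"civitai_{version_id}.safetensors"

def download_lora(base: Path, version_id: int, fetch) -> Path:
    # Idempotent: the network fetch is skipped when the file already exists.
    path = lora_path(base, version_id)
    if not path.exists():
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(fetch(version_id))
    return path
```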
### create_adaptation_task

| Signature | |
|---|---|
| Queue | |
| Arguments | |
| Returns | |

Runs the multi-agent adaptation pipeline (LangGraph) for a VideoAdUnit. The pipeline executes concept extraction, cultural research, script writing, and four evaluation gates. On failure, sets the ad unit status to `failed` with an error message.

See Adaptation Pipeline for the full pipeline architecture.
### generate_storyboard_task

| Signature | |
|---|---|
| Queue | |
| Arguments | |
| Returns | |

Generates storyboard frames from script rows. Execution sequence:

1. Evict the pipeline LLM from VRAM
2. Generate diffusion prompts from script rows (optionally LLM-enhanced)
3. Clear the GPU cache
4. Create `DiffusionJob` + `StoryboardImage` records (1280x720)
5. Queue each job to the `default` queue
6. Return an array of job IDs for tracking
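The fan-out in the last two steps can be sketched as follows. `dispatch_jobs` is a hypothetical helper, and `apply_async` stands in for a Celery task's `.apply_async` method.

```python
def dispatch_jobs(jobs: list[dict], apply_async) -> list[int]:
    # Queue each DiffusionJob on the "default" queue and return the job
    # IDs so callers can poll generation status.
    job_ids = []
    for job in jobs:
        apply_async(args=[job["id"]], queue="default")
        job_ids.append(job["id"])
    return job_ids
```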
### analyze_video_task

| Signature | |
|---|---|
| Queue | |
| Arguments | |
| Returns | |
| Retries | 3 (exponential backoff: 60s x 2^attempt) |

10-phase video analysis pipeline with progress tracking via `self.update_state()`:

| Progress | Phase | Tool |
|---|---|---|
| 0–10% | Metadata extraction | PyAV (duration, resolution, FPS, audio channels) |
| 10–30% | Scene detection | PySceneDetect |
| 30–50% | Transcription | Whisper Large v3 |
| 50–70% | Visual analysis | YOLO v8x + OpenCV (objects, style, colors) |
| 70–90% | Script generation | Scene-to-script mapping |
| 90–95% | Audience insights | LLM (Qwen 7B) |
| 95–100% | Finalization | Save to database |
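The progress reporting above can be sketched like this. `update_state` stands in for a bound Celery task's `self.update_state`, and the phase names and meta keys are illustrative shorthand.

```python
# Illustrative phase table and driver for progress reporting.
PHASES = [
    (10, "metadata"),
    (30, "scene_detection"),
    (50, "transcription"),
    (70, "visual_analysis"),
    (90, "script_generation"),
    (95, "audience_insights"),
    (100, "finalization"),
]

def run_phases(update_state, phase_fns) -> None:
    # After each phase completes, publish a PROGRESS state so clients
    # polling the task result can render a progress bar.
    for (percent, name), fn in zip(PHASES, phase_fns):
        fn()
        update_state(state="PROGRESS", meta={"percent": percent, "phase": name})
```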
## Queue Configuration

Two queues handle all tasks:

| Queue | Exchange | Tasks |
|---|---|---|
| `default` | | `generate_images_task` plus all tasks without explicit routing |
| `enhancement` | | `enhance_prompt_task` |

Task routing is configured in `settings.py`:

```python
CELERY_TASK_ROUTES = {
    "cw.diffusion.tasks.enhance_prompt_task": {"queue": "enhancement"},
    "cw.diffusion.tasks.generate_images_task": {"queue": "default"},
}
```

Tasks without explicit routing (TV Spots tasks) use the `default` queue.
## Worker Configuration

### Pool

Workers use the solo pool (single-threaded):

```shell
uv run celery -A cw worker -Q default
```

The solo pool is required because:

- GPU safety — prevents concurrent model loading that would exhaust VRAM
- macOS MPS — `fork()` is not compatible with the Metal Performance Shaders backend
- Sequential execution — one task at a time ensures predictable memory usage
### Time Limits

| Setting | Value | Effect |
|---|---|---|
| | 3600s (1 hour) | Hard kill — worker process terminated |
| | 3300s (55 minutes) | Raises `SoftTimeLimitExceeded` inside the task |
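A task that wants to persist a failure state before the hard kill can catch the soft-limit exception. This is a sketch: `run_with_cleanup` and the dict-based job record are hypothetical, and the fallback exception class exists only so the example runs without Celery installed.

```python
try:
    from celery.exceptions import SoftTimeLimitExceeded
except ImportError:  # keeps the sketch runnable without Celery installed
    class SoftTimeLimitExceeded(Exception):
        pass

def run_with_cleanup(job: dict, work) -> dict:
    # The soft limit raises inside the task at 3300s, leaving roughly
    # five minutes to persist a failure state before the 3600s hard kill.
    try:
        work()
        job["status"] = "completed"
    except SoftTimeLimitExceeded:
        job["status"] = "failed"
        job["error_message"] = "soft time limit exceeded"
    return job
```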
## Result Storage

Task results are stored in the Django ORM via django-celery-results:

- Backend: `django-db`
- Model: `django_celery_results.TaskResult`
- Extended results enabled (`CELERY_RESULT_EXTENDED = True`)
- Viewable in Django admin under Celery > Task Results
## Broker

Valkey/Redis on database index 2:

```
redis://{VALKEY_HOST}:{VALKEY_PORT}/2
```

Serialization is JSON-only (`CELERY_TASK_SERIALIZER = "json"`). The timezone is UTC.
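Assembling the broker URL from the environment might look like this. The defaults here (`localhost`, `6379`) are assumptions; only the `VALKEY_HOST`/`VALKEY_PORT` names and database index come from the URL above.

```python
import os

def broker_url() -> str:
    # Builds the Valkey/Redis broker URL on database index 2.
    host = os.environ.get("VALKEY_HOST", "localhost")
    port = os.environ.get("VALKEY_PORT", "6379")
    return f"redis://{host}:{port}/2"
```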
## Model Caching

Module-level caches keep models warm between task invocations, avoiding expensive reload cycles.

| Cache | Location | Behavior |
|---|---|---|
| `_model_cache` | | Keeps one diffusion model loaded. Evicts the previous model when a different model is requested. |
| `_enhancer_cache` | | Keeps `HFPromptEnhancer` (Qwen2.5-3B) loaded between enhancement tasks. |

VRAM management — before image generation, the enhancer cache and pipeline LLM are evicted:

1. `_evict_enhancer()` clears the enhancer cache
2. `_evict_pipeline_model()` clears the `PipelineModelLoader` singleton
3. `gc.collect()` + `torch.cuda.empty_cache()` / `torch.mps.empty_cache()`

This ensures only one large model occupies VRAM at a time.
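The eviction sequence can be sketched as a single helper. `evict_models` is a hypothetical name; the real code splits this across `_evict_enhancer()` and `_evict_pipeline_model()`, and the `ImportError` guard exists only so the sketch runs without torch installed.

```python
import gc

def evict_models(*caches: dict) -> None:
    # Drop Python references first, then force a collection, then ask the
    # accelerator backend to release its cached allocations so VRAM is
    # actually returned to the system.
    for cache in caches:
        cache.clear()
    gc.collect()
    try:
        import torch
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        elif torch.backends.mps.is_available():
            torch.mps.empty_cache()
    except ImportError:
        pass  # sketch stays runnable without torch installed
```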
## Retry Behavior

| Task | Retries | Strategy |
|---|---|---|
| `analyze_video_task` | 3 | Exponential backoff: 60s x 2^attempt |
| All other tasks | 0 | No automatic retry; errors set the job status to `failed` |

All tasks catch exceptions and persist error information to the database (the `error_message` field on the job/ad-unit record).
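Concretely, the backoff formula yields delays of 60s, 120s, and 240s for the three retries. A minimal helper (the real task presumably computes this inside Celery's retry machinery):

```python
def backoff_delay(attempt: int, base: int = 60) -> int:
    # 60s * 2^attempt, matching analyze_video_task's retry strategy.
    return base * 2 ** attempt
```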
## Starting Workers

Single worker handling all queues:

```shell
uv run celery -A cw worker -Q default,enhancement
```

Separate workers per queue:

```shell
# Terminal 1: image generation and pipeline tasks
uv run celery -A cw worker -Q default

# Terminal 2: prompt enhancement
uv run celery -A cw worker -Q enhancement
```

Monitor with Flower:

```shell
uv run celery -A cw flower --port=5555
```

The recommended approach is `./start.sh` or `uv run honcho start`, which launches all processes defined in the Procfile.