Validating Spot Durations Against Broadcast Standards

In broadcast traffic and advertising scheduling automation, spot duration mismatches are silent pipeline killers. When Avion exports deliver durations as 00:00:29.97, 30.1s, or malformed :30 strings, downstream schedulers reject the traffic log, truncate creative assets, or trigger regulatory compliance violations. The remedy is deterministic validation at ingestion, not post-scheduling patching. This guide walks through a Python validation layer that parses heterogeneous Avion export formats, enforces NAB/ATSC duration standards, and synchronizes compliant records to Avstar while quarantining anomalies for traffic manager review.

Pipeline Architecture Context

Within the Avion & Avstar Ingestion Pipelines, duration validation must execute strictly before log normalization and rate-card reconciliation. Traffic managers depend on rigid broadcast standard lengths: 5, 10, 15, 30, 60, and 120 seconds. Yet, Avion CSV/XML exports routinely mix HH:MM:SS, MM:SS, raw floats, and frame-count representations. Without schema enforcement, these inconsistencies cascade into Avstar’s scheduling engine, producing break overruns, black frames, and automated make-good generation failures.

Media operations teams typically encounter three primary failure modes:

  1. Frame-rate drift: exports from 29.97 fps sources reporting a :30 spot as 29.97 s instead of 30.0 s
  2. Non-standard buys: Custom :45 or :90 spots that violate network clearance rules
  3. Malformed exports: Trailing whitespace, UTF-8 BOM characters, or localized decimal separators

Resolving these requires a validation layer that normalizes inputs, applies broadcast-compliant tolerances, and routes exceptions without blocking the main ingestion thread.

Schema Design and Compliance Logic

Implementing Schema Validation with Pydantic for Traffic Data provides type coercion, field-level validators, and explicit compliance gates. The schema below parses raw duration strings, calculates frame-accurate deltas against NAB standards, and flags non-compliant records with actionable error messages. Pydantic v2’s model_validator ensures cross-field state consistency before the record enters the async processing queue.

flowchart TD
    A["Parse duration<br/>HH:MM:SS, MM:SS, or raw seconds"] --> B["Convert to seconds"]
    B --> C["Find nearest<br/>NAB standard length"]
    C --> D{"Within tolerance?"}
    D -->|"yes"| E["Accept"]
    D -->|"no"| F["Flag / Reject"]

Figure — Duration strings are parsed, converted to seconds, snapped to the nearest NAB standard, and accepted or flagged based on frame tolerance.

python
import asyncio
import csv
import logging
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional
from pydantic import BaseModel, Field, ValidationError, model_validator

# Structured logging configuration for broadcast ops audit trails
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("duration_validation_audit.log", mode="a", encoding="utf-8")
    ]
)
logger = logging.getLogger("broadcast_duration_validator")

# NAB/ATSC Standard Durations (seconds)
ALLOWED_DURATIONS_SEC = {5, 10, 15, 30, 60, 120}
# Frame tolerance: one frame at 29.97 fps is 1001/30000 ≈ 0.03337 s, rounded up for safety
FRAME_TOLERANCE_SEC = 0.034

class SpotRecord(BaseModel):
    spot_id: str = Field(..., description="Unique traffic log identifier")
    raw_duration: str = Field(..., description="Raw duration string from Avion export")
    client_code: str
    creative_title: str
    parsed_seconds: Optional[float] = Field(default=None, description="Normalized duration in seconds")
    compliance_status: str = Field(default="PENDING")
    validation_notes: List[str] = Field(default_factory=list)

    @model_validator(mode="before")
    @classmethod
    def normalize_and_validate(cls, data: dict) -> dict:
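        # Guard against non-dict inputs (e.g. when re-validating an existing model)
        if not isinstance(data, dict):
            return data
        # Work on a shallow copy so the caller's dict is not mutated
        data = dict(data)

        # Remove any BOM before trimming: str.strip() does not treat U+FEFF as whitespace
        raw = str(data.get("raw_duration", "")).replace("\ufeff", "").strip()
        # Normalize localized decimal separators
        raw = raw.replace(",", ".")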

        # validation_notes may be absent in raw input; ensure it exists before appending
        notes = list(data.get("validation_notes") or [])

        parsed = cls._parse_duration_string(raw)
        data["parsed_seconds"] = parsed

        if parsed is None:
            data["compliance_status"] = "REJECTED"
            notes.append("MALFORMED_DURATION: Unable to parse duration string")
            data["validation_notes"] = notes
            return data

        # Check against NAB/ATSC standards with frame tolerance
        closest_standard = min(ALLOWED_DURATIONS_SEC, key=lambda x: abs(x - parsed))
        delta = abs(parsed - closest_standard)

        if delta <= FRAME_TOLERANCE_SEC:
            # Within one frame of a standard length (exact matches included): snap to it
            data["parsed_seconds"] = float(closest_standard)
            data["compliance_status"] = "COMPLIANT"
        else:
            data["compliance_status"] = "NON_STANDARD"
            notes.append(
                f"DEVIATION: Parsed {parsed:.3f}s deviates {delta:.3f}s from nearest standard {closest_standard}s"
            )

        data["validation_notes"] = notes
        return data

    @staticmethod
    def _parse_duration_string(raw: str) -> Optional[float]:
        # Pattern: HH:MM:SS or MM:SS with optional fractional seconds
        time_match = re.match(r"^(?:(\d{1,2}):)?(\d{1,2}):(\d{1,2})(?:\.(\d+))?$", raw)
        if time_match:
            h = int(time_match.group(1) or 0)
            m = int(time_match.group(2))
            s = int(time_match.group(3))
            frac = float(f"0.{time_match.group(4)}") if time_match.group(4) else 0.0
            return h * 3600 + m * 60 + s + frac

        # Pattern: Raw seconds (e.g., "30.1", "30", "60s")
        sec_match = re.match(r"^(\d+(?:\.\d+)?)s?$", raw)
        if sec_match:
            return float(sec_match.group(1))

        return None
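
To sanity-check expectations without Pydantic installed, the parsing and tolerance rules can be restated with the standard library alone. This is a minimal sketch, not part of the pipeline: `parse_seconds` and `classify` are hypothetical helper names, and the constants simply repeat the values used by the schema.

```python
import re
from typing import Optional, Tuple

ALLOWED = (5, 10, 15, 30, 60, 120)   # same standard lengths as the schema
TOLERANCE = 0.034                    # same one-frame tolerance as the schema

def parse_seconds(raw: str) -> Optional[float]:
    """Mirror of the schema's two regexes: clock-style first, then raw seconds."""
    clock = re.match(r"^(?:(\d{1,2}):)?(\d{1,2}):(\d{1,2})(?:\.(\d+))?$", raw)
    if clock:
        h = int(clock.group(1) or 0)
        m, s = int(clock.group(2)), int(clock.group(3))
        frac = float(f"0.{clock.group(4)}") if clock.group(4) else 0.0
        return h * 3600 + m * 60 + s + frac
    plain = re.match(r"^(\d+(?:\.\d+)?)s?$", raw)
    return float(plain.group(1)) if plain else None

def classify(raw: str) -> Tuple[str, Optional[float]]:
    """Return (status, seconds) for one raw duration string."""
    cleaned = raw.replace("\ufeff", "").strip().replace(",", ".")
    seconds = parse_seconds(cleaned)
    if seconds is None:
        return "REJECTED", None
    nearest = min(ALLOWED, key=lambda std: abs(std - seconds))
    if abs(seconds - nearest) <= TOLERANCE:
        return "COMPLIANT", float(nearest)   # snapped to the standard length
    return "NON_STANDARD", seconds

# The three examples from the introduction
for sample in ("00:00:29.97", "30.1s", ":30"):
    print(sample, classify(sample))
```

Under these rules 00:00:29.97 sits 0.03 s from the 30-second standard and is snapped to 30.0, 30.1s deviates by 0.1 s and is flagged NON_STANDARD, and :30 matches neither pattern and is rejected.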

Async Processing and Quarantine Routing

Production traffic systems cannot afford to block on network I/O during ingestion. The following implementation validates a batch of records, routes compliant spots to the Avstar sync endpoint asynchronously, and isolates anomalies in a structured quarantine directory for manual traffic manager review.

python
import json
import aiohttp
from asyncio import Queue

QUARANTINE_DIR = Path("./quarantine")
QUARANTINE_DIR.mkdir(exist_ok=True)

async def process_batch(records: List[dict], avstar_endpoint: str) -> None:
    compliant_queue: Queue = Queue()
    quarantine_list: List[SpotRecord] = []

    for rec in records:
        try:
            spot = SpotRecord(**rec)
            if spot.compliance_status == "COMPLIANT":
                compliant_queue.put_nowait(spot)
            else:
                quarantine_list.append(spot)
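        except ValidationError as e:
            logger.error("SCHEMA_FAILURE: %s | Data: %s", e, rec)
            # model_construct() skips validation, so this fallback cannot raise a
            # second ValidationError or have its REJECTED status overwritten
            quarantine_list.append(SpotRecord.model_construct(
                spot_id=str(rec.get("spot_id", "UNKNOWN")),
                raw_duration=str(rec.get("raw_duration", "")),
                client_code=str(rec.get("client_code", "UNKNOWN")),
                creative_title=str(rec.get("creative_title", "")),
                parsed_seconds=None,
                compliance_status="REJECTED",
                validation_notes=[f"VALIDATION_ERROR: {e}"]
            ))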

    # Sync compliant records to Avstar
    await sync_to_avstar(compliant_queue, avstar_endpoint)

    # Persist anomalies for traffic ops review
    await persist_quarantine(quarantine_list)

async def sync_to_avstar(queue: Queue, endpoint: str) -> None:
    if queue.empty():
        logger.info("SYNC: No compliant records to transmit.")
        return

    batch = []
    while not queue.empty():
        batch.append(queue.get_nowait().model_dump())

    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(endpoint, json=batch, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                resp.raise_for_status()
                logger.info("SYNC_SUCCESS: Transmitted %d compliant spots to Avstar.", len(batch))
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            logger.critical("SYNC_FAILURE: Avstar sync failed; writing batch to avstar_retry_batch.json. Error: %s", e)
            # Operational recovery: dump to local JSON for manual retry
            # (this overwrites any earlier unsent retry batch)
            Path("avstar_retry_batch.json").write_text(json.dumps(batch, indent=2), encoding="utf-8")

async def persist_quarantine(records: List[SpotRecord]) -> None:
    if not records:
        return

    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    quarantine_file = QUARANTINE_DIR / f"quarantine_{timestamp}.json"
    
    payload = [r.model_dump() for r in records]
    quarantine_file.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    logger.warning("QUARANTINE: %d non-compliant records isolated at %s", len(records), quarantine_file)
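
process_batch expects a list of plain dicts. One way to produce them from a CSV export is sketched below with the standard csv module. The column names are assumptions chosen to match the SpotRecord fields, not a documented Avion header, and `load_avion_csv` is a hypothetical helper.

```python
import csv
from pathlib import Path
from typing import Dict, List

# Assumed column names; adjust to match the real export header.
REQUIRED_COLUMNS = ("spot_id", "raw_duration", "client_code", "creative_title")

def load_avion_csv(path: Path) -> List[Dict[str, str]]:
    """Read an export into plain dicts suitable for process_batch()."""
    # utf-8-sig transparently drops a leading BOM if the export carries one
    with path.open(newline="", encoding="utf-8-sig") as handle:
        reader = csv.DictReader(handle)
        header = reader.fieldnames or []
        missing = [col for col in REQUIRED_COLUMNS if col not in header]
        if missing:
            raise ValueError(f"Export is missing columns: {missing}")
        # Keep only the expected columns; absent cells become empty strings
        return [{col: (row.get(col) or "") for col in REQUIRED_COLUMNS}
                for row in reader]
```

The returned list can be passed straight to process_batch together with the Avstar endpoint URL. Extra columns in the export are dropped on purpose, so a change in the upstream header does not leak unknown keys into the schema.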

Operational Recovery and Troubleshooting

Even with deterministic validation, broadcast environments introduce edge cases that require structured recovery workflows. The following protocols address common failure vectors and ensure audit-ready traceability.

Frame-Rate and Drop-Frame Ambiguity

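NTSC drop-frame timecode (00:00:29;29) frequently surfaces in legacy Avion exports. The regex parser intentionally rejects semicolon delimiters to prevent silent frame-count corruption: the field after the semicolon is a frame number, not a decimal fraction, so 00:00:29;29 means 29 seconds plus 29 frames (just under 30 s), not 29.29 s. When drop-frame strings appear, traffic managers should configure the upstream export profile to output HH:MM:SS with decimal fractional seconds, or raw seconds. If conversion is unavoidable, implement a pre-ingestion translation layer that converts the frame field to seconds at the 29.97 fps rate before the FRAME_TOLERANCE_SEC check runs. Simply mapping ; to . would misread frames as hundredths of a second.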
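
A translation layer of that kind might look like the following sketch. It assumes 29.97 fps drop-frame timecode in the form HH:MM:SS;FF and treats the value as elapsed time from zero. `drop_frame_to_seconds` is a hypothetical helper, not part of the pipeline above.

```python
import re
from typing import Optional

DF_PATTERN = re.compile(r"^(\d{1,2}):(\d{2}):(\d{2});(\d{2})$")

def drop_frame_to_seconds(timecode: str) -> Optional[float]:
    """Convert 29.97 fps drop-frame timecode (HH:MM:SS;FF) to real seconds."""
    match = DF_PATTERN.match(timecode.strip())
    if not match:
        return None
    h, m, s, f = (int(part) for part in match.groups())
    if m > 59 or s > 59 or f > 29:
        return None
    # Frame labels 00 and 01 are skipped at the start of every minute
    # except minutes 00, 10, 20, 30, 40 and 50, so those labels are invalid.
    if s == 0 and f < 2 and m % 10 != 0:
        return None
    total_minutes = 60 * h + m
    dropped = 2 * (total_minutes - total_minutes // 10)
    frame_number = (3600 * h + 60 * m + s) * 30 + f - dropped
    # Each frame lasts 1001/30000 s at the NTSC rate of 30000/1001 fps
    return frame_number * 1001 / 30000
```

The result is a float in seconds, which can be formatted as a raw-seconds string and handed to the existing parser. For 00:00:29;29 the function returns roughly 29.997 s, which falls within the one-frame tolerance of the 30-second standard.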

Audit Log Analysis

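The logger writes timestamped plain-text entries to stdout and duration_validation_audit.log in a fixed format that ELK or Splunk can parse with a simple pattern. Per-record compliance status is stored in the quarantine JSON files rather than in the log, so filter those files for a compliance_status of NON_STANDARD to identify recurring deviation patterns. A spike in MALFORMED_DURATION notes typically indicates a CSV encoding shift (e.g., Windows-1252 vs UTF-8) or a third-party trafficking system update. Verify file encoding with file -i (file -I on macOS) or the third-party chardet package before pipeline execution.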
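
When installing chardet is not an option, a cruder standard-library check can at least separate UTF-8 exports from legacy Windows ones. The sketch below assumes Windows-1252 is the only fallback worth trying, which may not hold for every source system. `read_export_text` is a hypothetical helper.

```python
from pathlib import Path
from typing import Tuple

def read_export_text(path: Path) -> Tuple[str, str]:
    """Return (text, encoding_used) for an export file."""
    data = path.read_bytes()
    try:
        # utf-8-sig accepts plain UTF-8 and strips a leading BOM if present
        return data.decode("utf-8-sig"), "utf-8-sig"
    except UnicodeDecodeError:
        # Assumed fallback for legacy Windows exports;
        # bytes undefined in cp1252 are replaced with U+FFFD
        return data.decode("cp1252", errors="replace"), "cp1252"
```

Logging the returned encoding name alongside each ingestion run makes an encoding shift visible in the audit trail before it shows up as a wave of rejected durations.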

Quarantine Review and Rollback

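Traffic managers should review quarantine files daily. Each file is a JSON array of serialized SpotRecord objects, including compliance_status and validation_notes. Clear validation_notes in corrected records, because existing notes are carried over on re-validation. Records can then be re-ingested with a command like the following, substituting the actual quarantine file name for quarantine/last.json and the module that defines process_batch for main: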

bash
python -c "import json, asyncio; from main import process_batch; asyncio.run(process_batch(json.load(open('quarantine/last.json')), 'https://avstar.internal/api/v1/ingest'))"

If the Avstar sync request fails, the avstar_retry_batch.json artifact preserves the unsent batch. Each failure overwrites the file, so process or move it before the next run. A cron job can monitor this file and retry with exponential backoff.
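
The retry logic itself can be kept independent of the transport. The sketch below is one possible shape: `retry_with_backoff` and its parameters are hypothetical, and `send` stands in for whatever coroutine posts a batch to Avstar.

```python
import asyncio
import random
from typing import Awaitable, Callable, List

async def retry_with_backoff(
    send: Callable[[List[dict]], Awaitable[None]],
    batch: List[dict],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> bool:
    """Call send(batch) until it succeeds or attempts run out."""
    for attempt in range(max_attempts):
        try:
            await send(batch)
            return True
        except Exception:  # narrow this to the transport errors you expect
            if attempt == max_attempts - 1:
                return False
            # Delay doubles each attempt (1s, 2s, 4s, ...) up to max_delay, with
            # up to 10% random jitter so parallel workers do not retry in lockstep
            delay = min(base_delay * 2 ** attempt, max_delay)
            await sleep(delay + random.uniform(0, delay * 0.1))
    return False
```

A cron-driven wrapper would load avstar_retry_batch.json, call this function, and delete the file only when it returns True. The `sleep` parameter exists so the delays can be replaced with a stub in tests.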

Performance Tuning

For high-volume markets (>10k spots/day), batch CSV reads using pandas or polars before Pydantic coercion, and if several windows are sent concurrently, bound the number of in-flight requests with an asyncio.Semaphore. Memory consumption scales linearly with batch size. Because validation runs synchronously on the event loop, capping ingestion windows at 500 records is a reasonable starting point to prevent event loop starvation. Reference the official Python asyncio documentation for event loop optimization patterns.
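
The windowing and concurrency cap can be sketched as follows. `chunked` and `process_in_windows` are hypothetical helpers, and `handler` is any coroutine that accepts one window of records. The default limits are illustrative, not measured.

```python
import asyncio
from typing import Awaitable, Callable, Iterator, List, Sequence

def chunked(records: Sequence[dict], size: int = 500) -> Iterator[List[dict]]:
    """Yield consecutive slices of at most `size` records."""
    for start in range(0, len(records), size):
        yield list(records[start:start + size])

async def process_in_windows(
    records: Sequence[dict],
    handler: Callable[[List[dict]], Awaitable[None]],
    window: int = 500,
    max_concurrent: int = 4,
) -> int:
    """Run handler on each window with at most max_concurrent in flight."""
    gate = asyncio.Semaphore(max_concurrent)

    async def guarded(batch: List[dict]) -> None:
        async with gate:          # blocks when the limit is reached
            await handler(batch)

    batches = list(chunked(records, window))
    await asyncio.gather(*(guarded(batch) for batch in batches))
    return len(batches)
```

If the handler wraps process_batch, start with max_concurrent=1: windows that finish within the same second would otherwise write to the same quarantine file name and the same retry file.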

Deployment and Monitoring

Deploy the validation module as a containerized microservice or embedded cron worker within your traffic orchestration stack. Enforce strict version pinning for Pydantic and aiohttp to prevent breaking changes during minor releases. Integrate Prometheus metrics for spots_validated_total, quarantine_count, and avstar_sync_latency. Alert thresholds should trigger at quarantine_count > 50 per hour, indicating upstream trafficking degradation.

For compliance auditing, retain duration_validation_audit.log for a minimum of 13 months, and confirm that period against the FCC record-keeping rules that apply to your station. Rotate logs weekly and archive to cold storage. See FCC Broadcast Record Retention Guidelines for regulatory specifics.
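
Weekly rotation can be handled inside the process with the standard library's TimedRotatingFileHandler. The sketch below assumes rollover on Mondays and 57 weekly files kept locally (roughly 13 months). `build_audit_handler` is a hypothetical helper, and archiving to cold storage remains a separate job.

```python
import logging
from logging.handlers import TimedRotatingFileHandler

def build_audit_handler(
    log_path: str = "duration_validation_audit.log",
    weeks_to_keep: int = 57,
) -> TimedRotatingFileHandler:
    """Rotate the audit log every Monday and keep roughly 13 months locally."""
    handler = TimedRotatingFileHandler(
        log_path,
        when="W0",                  # W0 = roll over weekly, on Monday
        backupCount=weeks_to_keep,  # older rotated files are deleted
        encoding="utf-8",
        utc=True,
        delay=True,                 # do not create the file until the first record
    )
    handler.setFormatter(
        logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
    )
    return handler
```

Attach the handler with logging.getLogger("broadcast_duration_validator").addHandler(build_audit_handler()). Because backupCount deletes files beyond the limit, the archive job should copy rotated files out before they age past it.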