← back to dany2048__morningside-xml-pipeline

Function bodies 59 total

All specs Real LLM only Function bodies
get_video_metadata function · python · L13-L52 (40 LOC)
audio.py
def get_video_metadata(mp4_path: str) -> dict:
    """Extract video metadata using ffprobe."""
    cmd = [
        "ffprobe", "-v", "quiet",
        "-print_format", "json",
        "-show_streams", "-show_format",
        str(mp4_path),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    data = json.loads(result.stdout)

    video_stream = None
    audio_stream = None
    for stream in data.get("streams", []):
        if stream["codec_type"] == "video" and not video_stream:
            video_stream = stream
        elif stream["codec_type"] == "audio" and not audio_stream:
            audio_stream = stream

    fps = 24.0
    if video_stream:
        r_frame_rate = video_stream.get("r_frame_rate", "24/1")
        num, den = r_frame_rate.split("/")
        fps = float(num) / float(den) if float(den) != 0 else 24.0

    duration = float(data.get("format", {}).get("duration", 0))

    audio_channels = int(audio_stream.get("channels", 2)) if audio_stream
extract_audio function · python · L55-L69 (15 LOC)
audio.py
def extract_audio(mp4_path: str, output_path: str) -> str:
    """Run ffmpeg to write the audio of ``mp4_path`` to ``output_path`` as MP3.

    The encoder is libmp3lame at the bitrate given by ``AUDIO_BITRATE``.
    Progress and the resulting file size are printed to stdout.

    Args:
        mp4_path: Source media file handed to ffmpeg via ``-i``.
        output_path: Destination path for the encoded audio.

    Returns:
        ``output_path``, exactly as it was passed in.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    ffmpeg_args = ["ffmpeg", "-y", "-i", str(mp4_path), "-vn"]
    ffmpeg_args += ["-acodec", "libmp3lame", "-ab", AUDIO_BITRATE]
    ffmpeg_args.append(str(output_path))

    print(f"  Extracting audio to {output_path}...")
    subprocess.run(ffmpeg_args, capture_output=True, check=True)

    bytes_written = os.path.getsize(output_path)
    print(f"  Audio extracted: {bytes_written / (1024 * 1024):.1f} MB")
    return output_path
chunk_audio function · python · L72-L119 (48 LOC)
audio.py
def chunk_audio(audio_path: str, chunk_dir: str, max_mb: int = WHISPER_MAX_CHUNK_MB) -> list[tuple[str, float]]:
    """Split audio into chunks under max_mb, with overlap for boundary safety.

    Returns list of (chunk_path, start_offset_seconds).
    """
    file_size_mb = os.path.getsize(audio_path) / (1024 * 1024)

    if file_size_mb <= max_mb:
        return [(audio_path, 0.0)]

    # Get audio duration
    cmd = [
        "ffprobe", "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        str(audio_path),
    ]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    duration = float(json.loads(result.stdout)["format"]["duration"])

    # Calculate chunk duration to stay under max_mb
    num_chunks = math.ceil(file_size_mb / max_mb)
    chunk_duration = duration / num_chunks

    chunks = []
    Path(chunk_dir).mkdir(parents=True, exist_ok=True)

    for i in range(num_chunks):
        start = max(0, i * chunk_duration - (CHUNK_OVER
_get_service function · python · L23-L53 (31 LOC)
drive.py
def _get_service():
    """Build Drive API service using OAuth2 credentials."""
    creds = None

    if os.path.exists(_TOKEN_PATH):
        creds = Credentials.from_authorized_user_file(_TOKEN_PATH, SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            # Find client secret file
            client_secret = os.getenv("GOOGLE_CLIENT_SECRET_PATH")
            if not client_secret:
                # Look in reference/ for client_secret*.json
                for f in os.listdir(_CLIENT_SECRET_GLOB):
                    if f.startswith("client_secret") and f.endswith(".json"):
                        client_secret = os.path.join(_CLIENT_SECRET_GLOB, f)
                        break
            if not client_secret:
                raise RuntimeError(
                    "No Google OAuth client secret found. Set GOOGLE_CLIENT_SECRET_PATH "
                    "or place client_se
parse_drive_file_id function · python · L56-L67 (12 LOC)
drive.py
def parse_drive_file_id(url: str) -> str:
    """Extract the file ID from the common Google Drive / Docs URL shapes.

    Patterns are tried in priority order and the first match wins:

    1. ``/file/d/<id>``            (e.g. ``.../file/d/<id>/view``)
    2. an ``id=<id>`` parameter    (e.g. ``.../open?id=<id>``, ``...&id=<id>``)
    3. ``/d/<id>``                 (e.g. ``.../document/d/<id>/edit``)

    The ``id=`` parameter must sit at the start of the string or directly
    after ``?``, ``&`` or ``#``. Without that anchor, parameters that merely
    end in "id" (``tid=``, ``resid=``, ...) were mistaken for the file ID and,
    because this pattern is tried before ``/d/``, shadowed the real ID in the
    URL path.

    Args:
        url: A Drive/Docs sharing URL (or a bare ``id=<id>`` fragment).

    Returns:
        The extracted file ID.

    Raises:
        ValueError: If none of the known patterns match ``url``.
    """
    patterns = (
        r"/file/d/([a-zA-Z0-9_-]+)",
        r"(?:^|[?&#])id=([a-zA-Z0-9_-]+)",
        r"/d/([a-zA-Z0-9_-]+)",
    )
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    raise ValueError(f"Could not extract Drive file ID from URL: {url}")
download_file function · python · L70-L95 (26 LOC)
drive.py
def download_file(file_id: str, dest_path: str) -> str:
    """Fetch a Drive file's content into ``dest_path``, logging progress.

    The file's name and size are read first and printed, then the content is
    pulled chunk by chunk. A progress line is printed each time the completed
    percentage has advanced by at least 10 points since the last one printed.

    Args:
        file_id: Drive ID of the file to download.
        dest_path: Local path opened in binary write mode for the content.

    Returns:
        ``dest_path``, exactly as it was passed in.
    """
    drive = _get_service()

    # Metadata lookup is only used for the "Downloading: ..." banner.
    info = drive.files().get(fileId=file_id, fields="name,size,mimeType").execute()
    display_name = info.get("name", "unknown")
    total_bytes = int(info.get("size", 0))
    print(f"  Downloading: {display_name} ({total_bytes / (1024**3):.1f} GB)")

    media_request = drive.files().get_media(fileId=file_id)
    with open(dest_path, "wb") as sink:
        fetcher = MediaIoBaseDownload(sink, media_request)
        reported_pct = 0
        finished = False
        while not finished:
            progress, finished = fetcher.next_chunk()
            if not progress:
                continue
            current_pct = int(progress.progress() * 100)
            if current_pct - reported_pct >= 10:
                print(f"  Download: {current_pct}%")
                reported_pct = current_pct

    print(f"  Download complete: {dest_path}")
    return dest_path
upload_file function · python · L98-L115 (18 LOC)
drive.py
def upload_file(local_path: str, folder_id: str, filename: str) -> str:
    """Create a Drive file from ``local_path`` and return a link to it.

    The upload is sent with MIME type ``application/xml`` and placed in the
    folder identified by ``folder_id``.

    Args:
        local_path: Path of the local file to upload.
        folder_id: Drive ID of the parent folder.
        filename: Name the file gets on Drive.

    Returns:
        The ``webViewLink`` reported by Drive, or a ``/file/d/<id>/view`` URL
        built from the new file's ID when no link is returned.
    """
    drive = _get_service()

    payload = MediaFileUpload(local_path, mimetype="application/xml")
    created = drive.files().create(
        body={"name": filename, "parents": [folder_id]},
        media_body=payload,
        fields="id,webViewLink",
    ).execute()

    # The fallback URL is built unconditionally, so the response must carry "id".
    fallback_link = f"https://drive.google.com/file/d/{created['id']}/view"
    share_link = created.get("webViewLink", fallback_link)
    print(f"  Uploaded to Drive: {share_link}")
    return share_link
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
get_file_name function · python · L118-L122 (5 LOC)
drive.py
def get_file_name(file_id: str) -> str:
    """Return the Drive file's ``name`` field, or ``"unknown"`` if absent."""
    lookup = _get_service().files().get(fileId=file_id, fields="name")
    return lookup.execute().get("name", "unknown")
_run_core function · python · L45-L90 (46 LOC)
main.py
def _run_core(mp4_path: str, output_path: str, use_local: bool = False) -> str:
    """Core pipeline: MP4 in, FCPXML out. Returns output path."""
    start_time = time.time()

    with tempfile.TemporaryDirectory(prefix="morningside_") as tmp_dir:
        # Step 1: Video metadata
        print("\n[1/5] Analyzing video...")
        metadata = get_video_metadata(mp4_path)
        print(f"  Resolution: {metadata['width']}x{metadata['height']}")
        print(f"  FPS: {metadata['fps']}")
        print(f"  Duration: {metadata['duration_seconds']:.1f}s ({metadata['duration_seconds']/60:.1f} min)")
        print(f"  Codec: {metadata['codec_name']}")

        # Step 2: Extract audio
        print("\n[2/5] Extracting audio...")
        audio_path = os.path.join(tmp_dir, "audio.mp3")
        extract_audio(mp4_path, audio_path)

        # Step 3: Chunk + transcribe
        print("\n[3/5] Transcribing...")
        chunk_dir = os.path.join(tmp_dir, "chunks")
        chunks = chunk_audio(audio_path,
run_local function · python · L93-L125 (33 LOC)
main.py
def run_local(mp4_path: str, output_path: str | None = None, whisper_model: str | None = None, use_local: bool = False) -> str:
    """Local file mode — for testing or manual runs."""
    mp4_path = os.path.abspath(mp4_path)
    if not os.path.exists(mp4_path):
        print(f"Error: File not found: {mp4_path}")
        sys.exit(1)

    source_filename = Path(mp4_path).stem
    file_size_mb = os.path.getsize(mp4_path) / (1024 * 1024)

    if whisper_model:
        config.WHISPER_MODEL = whisper_model

    if not output_path:
        output_path = os.path.join(
            os.path.dirname(mp4_path),
            f"{source_filename} - Clean Cut.xml",
        )

    print(f"\n{'='*60}")
    print(f"Morningside XML Pipeline")
    print(f"{'='*60}")
    print(f"Source: {mp4_path}")
    print(f"Size: {file_size_mb:.0f} MB")

    result = _run_core(mp4_path, output_path, use_local)

    print(f"\n{'='*60}")
    print(f"Output: {result}")
    print(f"{'='*60}")
    print(f"\nNext: Open Premiere
run_notion function · python · L128-L181 (54 LOC)
main.py
def run_notion(page_id: str, use_local: bool = False) -> str:
    """Notion + Drive mode — download from Drive, process, upload FCPXML, update Notion."""
    from drive import parse_drive_file_id, download_file, upload_file
    from notion_handler import get_page, update_xml_property

    output_folder_id = os.getenv("GOOGLE_DRIVE_OUTPUT_FOLDER_ID")
    if not output_folder_id:
        print("Error: GOOGLE_DRIVE_OUTPUT_FOLDER_ID not set in .env")
        sys.exit(1)

    print(f"\n{'='*60}")
    print(f"Morningside XML Pipeline — Notion Mode")
    print(f"{'='*60}")

    # Read Notion page
    print(f"\nReading Notion page {page_id}...")
    page = get_page(page_id)
    print(f"  Title: {page['title']}")

    if not page["raws_url"]:
        print("Error: No RAWs URL found on this Notion page")
        sys.exit(1)

    print(f"  RAWs URL: {page['raws_url']}")
    file_id = parse_drive_file_id(page["raws_url"])

    with tempfile.TemporaryDirectory(prefix="morningside_") as tmp_dir:
   
run_watch function · python · L184-L220 (37 LOC)
main.py
def run_watch(interval: int = 120, use_local: bool = False):
    """Watch mode — poll Notion DB every N seconds for new pages to process."""
    from notion_handler import get_ready_pages

    db_id = os.getenv("MORNINGSIDE_NOTION_DB_ID")
    if not db_id:
        print("Error: MORNINGSIDE_NOTION_DB_ID not set in .env")
        sys.exit(1)

    print(f"\n{'='*60}")
    print(f"Morningside XML Pipeline — Watch Mode")
    print(f"Polling every {interval}s for 'Started Editing' pages...")
    print(f"{'='*60}")

    while True:
        try:
            pages = get_ready_pages(db_id)
            if pages:
                print(f"\nFound {len(pages)} page(s) to process")
                for page in pages:
                    print(f"\n--- Processing: {page['title']} ---")
                    try:
                        run_notion(page["page_id"], use_local)
                    except Exception as e:
                        print(f"  Error processing {page['title']}: {e}")
                 
main function · python · L223-L249 (27 LOC)
main.py
def main():
    parser = argparse.ArgumentParser(description="Morningside XML Pipeline")

    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument("--file", help="Path to local MP4 file")
    mode.add_argument("--notion-id", help="Notion page ID to process")
    mode.add_argument("--watch", action="store_true", help="Poll Notion DB for new pages")

    parser.add_argument("--out", help="Output FCPXML path (local mode only)")
    parser.add_argument("--local", action="store_true", help="Use local Whisper model instead of API")
    parser.add_argument("--interval", type=int, default=120, help="Watch mode poll interval in seconds (default: 120)")
    parser.add_argument(
        "--whisper-model",
        choices=["tiny", "base", "small", "medium", "large"],
        help="Local Whisper model size (default: base)",
    )
    args = parser.parse_args()

    if args.whisper_model:
        config.WHISPER_MODEL = args.whisper_model

    if args.file:
        run_loca
_run_core function · python · L56-L121 (66 LOC)
main_v2.py
def _run_core(mp4_path: str, output_path: str, use_local: bool = False, transcript_path: str | None = None) -> str:
    """Core pipeline: MP4 in, FCPXML out. Returns output path.

    If transcript_path is given, skips audio extraction + Whisper and uses
    the Premiere Pro transcript directly.
    """
    start_time = time.time()

    # Step 1: Video metadata
    print("\n[1] Analyzing video...")
    metadata = get_video_metadata(mp4_path)
    print(f"  Resolution: {metadata['width']}x{metadata['height']}")
    print(f"  FPS: {metadata['fps']}")
    print(f"  Duration: {metadata['duration_seconds']:.1f}s ({metadata['duration_seconds']/60:.1f} min)")
    print(f"  Codec: {metadata['codec_name']}")

    if transcript_path:
        # Premiere transcript mode — skip Whisper
        print(f"\n[2] Parsing transcript: {transcript_path}")
        lines = detect_and_parse(transcript_path)
        if not lines:
            raise RuntimeError("No lines parsed from transcript")
        total_dur
run_local function · python · L124-L167 (44 LOC)
main_v2.py
def run_local(mp4_path: str, output_path: str | None = None, whisper_model: str | None = None,
               use_local: bool = False, transcript_path: str | None = None) -> str:
    """Local file mode."""
    mp4_path = os.path.abspath(mp4_path)
    if not os.path.exists(mp4_path):
        print(f"Error: File not found: {mp4_path}")
        sys.exit(1)

    if transcript_path:
        transcript_path = os.path.abspath(transcript_path)
        if not os.path.exists(transcript_path):
            print(f"Error: Transcript not found: {transcript_path}")
            sys.exit(1)

    source_filename = Path(mp4_path).stem
    file_size_mb = os.path.getsize(mp4_path) / (1024 * 1024)

    if whisper_model:
        config.WHISPER_MODEL = whisper_model

    if not output_path:
        tag = "transcript" if transcript_path else "v2"
        output_path = os.path.join(
            os.path.dirname(mp4_path),
            f"{source_filename} - Clean Cut {tag}.xml",
        )

    mode = "Premiere Tra
All rows above produced by Repobility · https://repobility.com
run_notion function · python · L170-L217 (48 LOC)
main_v2.py
def run_notion(page_id: str, use_local: bool = False) -> str:
    """Notion + Drive mode."""
    from drive import parse_drive_file_id, download_file, upload_file
    from notion_handler import get_page, update_xml_property

    output_folder_id = os.getenv("GOOGLE_DRIVE_OUTPUT_FOLDER_ID")
    if not output_folder_id:
        print("Error: GOOGLE_DRIVE_OUTPUT_FOLDER_ID not set in .env")
        sys.exit(1)

    print(f"\n{'='*60}")
    print(f"Morningside XML Pipeline v2 (GPT-5.4) — Notion Mode")
    print(f"{'='*60}")

    print(f"\nReading Notion page {page_id}...")
    page = get_page(page_id)
    print(f"  Title: {page['title']}")

    if not page["raws_url"]:
        print("Error: No RAWs URL found on this Notion page")
        sys.exit(1)

    print(f"  RAWs URL: {page['raws_url']}")
    file_id = parse_drive_file_id(page["raws_url"])

    with tempfile.TemporaryDirectory(prefix="morningside_v2_") as tmp_dir:
        print("\nDownloading from Drive...")
        mp4_path = os.path
run_watch function · python · L220-L256 (37 LOC)
main_v2.py
def run_watch(interval: int = 120, use_local: bool = False):
    """Watch mode — poll Notion DB every N seconds."""
    from notion_handler import get_ready_pages

    db_id = os.getenv("MORNINGSIDE_NOTION_DB_ID")
    if not db_id:
        print("Error: MORNINGSIDE_NOTION_DB_ID not set in .env")
        sys.exit(1)

    print(f"\n{'='*60}")
    print(f"Morningside XML Pipeline v2 (GPT-5.4) — Watch Mode")
    print(f"Polling every {interval}s for 'Started Editing' pages...")
    print(f"{'='*60}")

    while True:
        try:
            pages = get_ready_pages(db_id)
            if pages:
                print(f"\nFound {len(pages)} page(s) to process")
                for page in pages:
                    print(f"\n--- Processing: {page['title']} ---")
                    try:
                        run_notion(page["page_id"], use_local)
                    except Exception as e:
                        print(f"  Error processing {page['title']}: {e}")
                        conti
main function · python · L259-L287 (29 LOC)
main_v2.py
def main():
    parser = argparse.ArgumentParser(description="Morningside XML Pipeline v2 (GPT-5.4)")

    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument("--file", help="Path to local MP4 file")
    mode.add_argument("--notion-id", help="Notion page ID to process")
    mode.add_argument("--watch", action="store_true", help="Poll Notion DB for new pages")

    parser.add_argument("--transcript", help="Premiere Pro transcript file (.txt, .srt, .vtt) — skips Whisper")
    parser.add_argument("--out", help="Output FCPXML path (local mode only)")
    parser.add_argument("--local", action="store_true", help="Use local Whisper model instead of API")
    parser.add_argument("--interval", type=int, default=120, help="Watch mode poll interval in seconds (default: 120)")
    parser.add_argument(
        "--whisper-model",
        choices=["tiny", "base", "small", "medium", "large"],
        help="Local Whisper model size (default: base)",
    )
    args = parser.p
_get_client function · python · L8-L12 (5 LOC)
notion_handler.py
def _get_client() -> Client:
    """Build a ``Client`` authenticated with the ``NOTION_TOKEN`` env variable.

    Raises:
        RuntimeError: If ``NOTION_TOKEN`` is unset or empty.
    """
    notion_token = os.environ.get("NOTION_TOKEN")
    if notion_token:
        return Client(auth=notion_token)
    raise RuntimeError("NOTION_TOKEN not set in environment")
get_page function · python · L15-L49 (35 LOC)
notion_handler.py
def get_page(page_id: str) -> dict:
    """Get a Notion page and extract the RAWs Drive URL."""
    client = _get_client()
    page = client.pages.retrieve(page_id=page_id)
    props = page.get("properties", {})

    # RAWs property — could be URL type or rich_text
    raws_url = None
    raws_prop = props.get("RAWs") or props.get("Raws") or props.get("raws")
    if raws_prop:
        prop_type = raws_prop.get("type")
        if prop_type == "url":
            raws_url = raws_prop.get("url")
        elif prop_type == "rich_text":
            texts = raws_prop.get("rich_text", [])
            if texts:
                raws_url = texts[0].get("plain_text") or texts[0].get("href")
        elif prop_type == "files":
            files = raws_prop.get("files", [])
            if files:
                raws_url = files[0].get("external", {}).get("url") or files[0].get("name")

    title_prop = props.get("Name") or props.get("Title") or props.get("title")
    title = ""
    if title_prop:
    
update_xml_property function · python · L52-L61 (10 LOC)
notion_handler.py
def update_xml_property(page_id: str, drive_url: str):
    """Set the page's ``XML`` URL property to ``drive_url`` and log the update.

    Args:
        page_id: ID of the Notion page to update.
        drive_url: URL stored in the ``XML`` property.
    """
    xml_patch = {"XML": {"url": drive_url}}
    _get_client().pages.update(page_id=page_id, properties=xml_patch)
    print(f"  Notion updated: XML property set on {page_id}")
get_ready_pages function · python · L64-L92 (29 LOC)
notion_handler.py
def get_ready_pages(db_id: str) -> list[dict]:
    """Query DB for pages with Status = 'Started Editing', RAWs filled, XML empty.

    Database queries are paginated, so this follows ``next_cursor`` until
    ``has_more`` is false. Reading only the first response would silently drop
    matches beyond the first page of results.

    Args:
        db_id: ID of the Notion database to query.

    Returns:
        One ``get_page`` result per matching page, in query order.
    """
    client = _get_client()

    ready_filter = {
        "and": [
            {
                "property": "Status",
                "status": {"equals": "Started Editing"},
            },
            {
                "property": "RAWs",
                "url": {"is_not_empty": True},
            },
            {
                "property": "XML",
                "url": {"is_empty": True},
            },
        ]
    }

    pages = []
    cursor = None
    while True:
        query_args = {"database_id": db_id, "filter": ready_filter}
        if cursor:
            query_args["start_cursor"] = cursor
        results = client.databases.query(**query_args)

        # Each hit is re-read through get_page so callers get its dict shape.
        pages.extend(get_page(page["id"]) for page in results.get("results", []))

        cursor = results.get("next_cursor")
        # Stop when done, or if has_more is set without a cursor (avoid looping).
        if not results.get("has_more") or not cursor:
            break

    return pages
_get_client function · python · L13-L17 (5 LOC)
processor.py
def _get_client():
    """Return an ``OpenAI`` client configured from ``OPENAI_API_KEY``.

    Raises:
        RuntimeError: If ``OPENAI_API_KEY`` is unset or empty.
    """
    key = os.environ.get("OPENAI_API_KEY")
    if key:
        return OpenAI(api_key=key)
    raise RuntimeError("OPENAI_API_KEY not set")
All rows scored by the Repobility analyzer (https://repobility.com)
_build_numbered_lines function · python · L20-L59 (40 LOC)
processor.py
def _build_numbered_lines(words: list[dict]) -> list[dict]:
    """Break words into numbered lines. Each line has an ID, start, end, and text.

    Lines break on pauses > 0.3s or every ~15 words.
    """
    lines = []
    current_words = []
    current_start = None
    last_end = 0

    def flush():
        if current_words:
            text = " ".join(w["word"] for w in current_words)
            lines.append({
                "id": len(lines) + 1,
                "start": current_start,
                "end": current_words[-1]["end"],
                "text": text,
            })

    for w in words:
        if current_start is None:
            current_start = w["start"]

        gap = w["start"] - last_end if last_end > 0 else 0
        if gap > 0.3 and current_words:
            flush()
            current_words = []
            current_start = w["start"]

        current_words.append(w)
        last_end = w["end"]

        if len(current_words) >= 15:
            flush()
       
_format_for_llm function · python · L62-L67 (6 LOC)
processor.py
def _format_for_llm(lines: list[dict]) -> str:
    """Render numbered lines as text, one ``L<id> [<start>s-<end>s] <text>`` row each.

    IDs are zero-padded to four digits and timestamps shown with one decimal.
    Rows are joined with newlines; an empty input yields an empty string.
    """
    rendered = []
    for entry in lines:
        line_id = entry["id"]
        begin = entry["start"]
        finish = entry["end"]
        text = entry["text"]
        rendered.append(f"L{line_id:04d} [{begin:.1f}s-{finish:.1f}s] {text}")
    return "\n".join(rendered)
_parse_line_numbers function · python · L70-L85 (16 LOC)
processor.py
def _parse_line_numbers(content: str) -> list[int]:
    """Parse line numbers from an LLM response, tolerating several formats.

    Strategy:
      1. Strip markdown code fences.
      2. If the remainder is a JSON array whose items all convert with
         ``int()``, return those values.
      3. Otherwise fall back to collecting every integer in the text, with an
         optional ``L`` prefix and leading zeros ignored (``L0012`` -> 12).

    Args:
        content: Raw model output. ``None`` or an empty string (e.g. a
            response with no text) is treated as "no lines".

    Returns:
        Line numbers in the order they appear; duplicates are preserved.
    """
    if not content:
        return []

    # Remove markdown fences
    content = re.sub(r"```(?:json)?\s*", "", content).rstrip("`").strip()

    # Try JSON array first
    try:
        nums = json.loads(content)
        if isinstance(nums, list):
            return [int(n) for n in nums]
    except (ValueError, TypeError):
        # ValueError: not JSON (JSONDecodeError) or items like "L0001".
        # TypeError: items like null or nested lists. Either way, use the
        # regex fallback instead of crashing.
        pass

    # Fallback: extract all numbers that look like line references
    nums = re.findall(r"L?0*(\d+)", content)
    return [int(n) for n in nums]
_lines_to_segments function · python · L88-L111 (24 LOC)
processor.py
def _lines_to_segments(lines: list[dict], keep_ids: list[int]) -> list[dict]:
    """Turn the kept line IDs into ``{"start", "end"}`` time segments.

    Lines are visited in the order given. A kept line extends the current
    segment when it starts no more than ``REMOVABLE_PAUSE`` seconds after that
    segment's end; otherwise it opens a new segment.

    Returns:
        The merged segments, or an empty list when no line is kept.
    """
    wanted = set(keep_ids)
    merged = []

    for entry in lines:
        if entry["id"] not in wanted:
            continue
        if merged and entry["start"] - merged[-1]["end"] <= REMOVABLE_PAUSE:
            # Close enough to the previous kept line: grow that segment.
            merged[-1]["end"] = entry["end"]
        else:
            merged.append({"start": entry["start"], "end": entry["end"]})

    return merged
process function · python · L178-L280 (103 LOC)
processor.py
def process(words: list[dict], total_duration: float) -> list[dict]:
    """Multi-pass GPT-4o processing with numbered lines.

    Pass 1: Full transcript → aggressive cut
    Pass 2: Survivors only → catch semantic repeats, incomplete thoughts
    Pass 3: Final QC → perfection check

    Returns list of {start: float, end: float, label: str} segments.
    """
    client = _get_client()

    # Build numbered lines
    lines = _build_numbered_lines(words)
    print(f"  {len(lines)} numbered lines from {len(words)} words")

    full_transcript = _format_for_llm(lines)

    # === PASS 1 ===
    print("\n  [Pass 1] Full transcript → GPT-4o...")
    r1 = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": PASS_1_PROMPT},
            {"role": "user", "content": full_transcript},
        ],
        temperature=0.1,
        max_tokens=16000,
    )
    print(f"  [Pass 1] Tokens — in: {r1.usage.prompt_tokens}, out: {r1.usage.comple
_get_client function · python · L20-L24 (5 LOC)
processor_v2.py
def _get_client():
    """Create the ``OpenAI`` client, taking the key from ``OPENAI_API_KEY``.

    Raises:
        RuntimeError: If the variable is missing or empty.
    """
    secret = os.getenv("OPENAI_API_KEY")
    if secret:
        client = OpenAI(api_key=secret)
        return client
    raise RuntimeError("OPENAI_API_KEY not set")
_build_numbered_lines function · python · L27-L66 (40 LOC)
processor_v2.py
def _build_numbered_lines(words: list[dict]) -> list[dict]:
    """Break words into numbered lines. Each line has an ID, start, end, and text.

    Lines break on pauses > 0.3s or every ~15 words.
    """
    lines = []
    current_words = []
    current_start = None
    last_end = 0

    def flush():
        if current_words:
            text = " ".join(w["word"] for w in current_words)
            lines.append({
                "id": len(lines) + 1,
                "start": current_start,
                "end": current_words[-1]["end"],
                "text": text,
            })

    for w in words:
        if current_start is None:
            current_start = w["start"]

        gap = w["start"] - last_end if last_end > 0 else 0
        if gap > 0.3 and current_words:
            flush()
            current_words = []
            current_start = w["start"]

        current_words.append(w)
        last_end = w["end"]

        if len(current_words) >= 15:
            flush()
       
_format_for_llm function · python · L69-L74 (6 LOC)
processor_v2.py
def _format_for_llm(lines: list[dict]) -> str:
    """Serialize numbered lines into the text block given to the model.

    Every line becomes ``L<id, 4 digits> [<start>s-<end>s] <text>`` with
    one-decimal timestamps; rows are separated by a single newline.
    """
    row_template = "L{id:04d} [{start:.1f}s-{end:.1f}s] {text}"
    return "\n".join(map(row_template.format_map, lines))
Open data scored by Repobility · https://repobility.com
_parse_line_numbers function · python · L77-L92 (16 LOC)
processor_v2.py
def _parse_line_numbers(content: str) -> list[int]:
    """Parse line numbers from an LLM response, tolerating several formats.

    Markdown fences are stripped first. A JSON array whose items all convert
    with ``int()`` is returned as-is; anything else (non-JSON text, a JSON
    object, or an array holding ``null`` / strings like ``"L0001"`` / nested
    arrays) goes through a regex that collects every integer in the text,
    ignoring an optional ``L`` prefix and leading zeros.

    Args:
        content: Raw model output. ``None`` or an empty string is treated as
            "no lines" rather than raising.

    Returns:
        Line numbers in order of appearance; duplicates are preserved.
    """
    if not content:
        return []

    # Remove markdown fences
    content = re.sub(r"```(?:json)?\s*", "", content).rstrip("`").strip()

    # Try JSON array first
    try:
        nums = json.loads(content)
        if isinstance(nums, list):
            return [int(n) for n in nums]
    except (ValueError, TypeError):
        # ValueError covers JSONDecodeError and int("L0001"); TypeError covers
        # int(None) / int([...]). All of them mean "use the regex fallback".
        pass

    # Fallback: extract all numbers that look like line references
    nums = re.findall(r"L?0*(\d+)", content)
    return [int(n) for n in nums]
_lines_to_segments function · python · L95-L117 (23 LOC)
processor_v2.py
def _lines_to_segments(lines: list[dict], keep_ids: list[int]) -> list[dict]:
    """Collapse the kept lines into merged ``{"start", "end"}`` segments.

    Kept lines are taken in input order. Consecutive kept lines whose gap
    (next start minus current segment end) is at most ``REMOVABLE_PAUSE`` are
    fused into one segment; a larger gap closes the segment and starts another.

    Returns:
        The list of segments, empty when none of ``keep_ids`` match a line.
    """
    wanted = set(keep_ids)
    survivors = [row for row in lines if row["id"] in wanted]
    if len(survivors) == 0:
        return []

    head, *tail = survivors
    current = {"start": head["start"], "end": head["end"]}
    result = [current]

    for row in tail:
        if row["start"] - current["end"] > REMOVABLE_PAUSE:
            # Pause too long to bridge: open a new segment.
            current = {"start": row["start"], "end": row["end"]}
            result.append(current)
        else:
            current["end"] = row["end"]

    return result
_cost_estimate function · python · L120-L127 (8 LOC)
processor_v2.py
def _cost_estimate(usage, label: str) -> float:
    """Print a token/cost summary for one call and return the cost.

    Cost is priced at 2.50 per million input tokens plus 15.00 per million
    output tokens. The reasoning-token count is read from
    ``usage.output_tokens_details.reasoning_tokens`` when available and shown
    as 0 otherwise; it appears in the printed line only, not in the cost.

    Args:
        usage: Object exposing ``input_tokens`` and ``output_tokens``.
        label: Tag shown in brackets at the start of the printed line.

    Returns:
        The computed total cost.
    """
    tokens_in = usage.input_tokens
    tokens_out = usage.output_tokens
    total = tokens_in * 2.50 / 1_000_000 + tokens_out * 15.00 / 1_000_000

    details = getattr(usage, "output_tokens_details", None)
    reasoning = getattr(details, "reasoning_tokens", 0) or 0

    print(f"  [{label}] Tokens — in: {tokens_in}, out: {tokens_out} (reasoning: {reasoning}) | Cost: ${total:.4f}")
    return total
process_lines function · python · L165-L215 (51 LOC)
processor_v2.py
def process_lines(lines: list[dict], total_duration: float) -> list[dict]:
    """Single-pass GPT-5.4 processing with pre-built numbered lines.

    Accepts lines from any source (Whisper word-level or Premiere transcript).
    Each line must have: id, start, end, text.

    Returns list of {start: float, end: float, label: str} segments.
    """
    client = _get_client()

    print(f"  {len(lines)} lines, {total_duration:.0f}s ({total_duration/60:.1f} min)")

    full_transcript = _format_for_llm(lines)
    valid_ids = {l["id"] for l in lines}

    # === SINGLE PASS ===
    print(f"\n  Full transcript -> {MODEL} (reasoning: medium)...")
    r = client.responses.create(
        model=MODEL,
        instructions=SINGLE_PASS_PROMPT,
        input=full_transcript,
        reasoning={"effort": "medium"},
        max_output_tokens=32000,
    )
    cost = _cost_estimate(r.usage, "Single pass")

    keep = _parse_line_numbers(r.output_text)
    keep = [k for k in keep if k in valid_ids]
    
process function · python · L218-L222 (5 LOC)
processor_v2.py
def process(words: list[dict], total_duration: float) -> list[dict]:
    """Entry point for word-level input: build numbered lines, then process them.

    Args:
        words: Word dicts handed to ``_build_numbered_lines``.
        total_duration: Passed through unchanged to ``process_lines``.

    Returns:
        Whatever ``process_lines`` returns for the built lines.
    """
    numbered = _build_numbered_lines(words)
    line_count, word_count = len(numbered), len(words)
    print(f"  Built {line_count} numbered lines from {word_count} words")
    return process_lines(numbered, total_duration)
step_1_extract_audio function · python · L38-L46 (9 LOC)
rlhf_capture.py
def step_1_extract_audio():
    """Return the audio path, extracting from ``INPUT_FILE`` on a cache miss.

    When ``AUDIO_CACHE`` already exists it is returned as-is; otherwise
    ``extract_audio`` writes it and the elapsed time is printed.
    """
    if not os.path.exists(AUDIO_CACHE):
        print("[1/4] Extracting audio from 33GB file...")
        started = time.time()
        extracted = extract_audio(INPUT_FILE, AUDIO_CACHE)
        print(f"  Done in {time.time()-started:.0f}s")
        return extracted

    print(f"[1/4] Audio cached at {AUDIO_CACHE}")
    return AUDIO_CACHE
step_2_transcribe function · python · L49-L62 (14 LOC)
rlhf_capture.py
def step_2_transcribe(audio_path):
    """Return the transcribed words, loading ``WORDS_CACHE`` when it exists.

    On a cache miss the audio is chunked under ``OUTPUT_DIR/chunks``,
    transcribed with ``transcribe_all``, and the result is saved to
    ``WORDS_CACHE`` as JSON before being returned.
    """
    if not os.path.exists(WORDS_CACHE):
        print("[2/4] Chunking + transcribing with Whisper API...")
        started = time.time()
        pieces = chunk_audio(audio_path, os.path.join(OUTPUT_DIR, "chunks"))
        transcript_words = transcribe_all(pieces)
        with open(WORDS_CACHE, "w") as cache_file:
            json.dump(transcript_words, cache_file)
        print(f"  Done in {time.time()-started:.0f}s — {len(transcript_words)} words")
        return transcript_words

    print(f"[2/4] Transcript cached at {WORDS_CACHE}")
    with open(WORDS_CACHE) as cache_file:
        return json.load(cache_file)
step_3_build_lines function · python · L65-L73 (9 LOC)
rlhf_capture.py
def step_3_build_lines(words):
    """Build numbered lines from ``words`` and save the formatted transcript.

    The transcript is written to ``c5296_numbered_transcript.txt`` inside
    ``OUTPUT_DIR`` in the same text form produced by ``_format_for_llm``.

    Args:
        words: Word dicts handed to ``_build_numbered_lines``.

    Returns:
        The numbered lines.
    """
    print("[3/4] Building numbered lines...")
    lines = _build_numbered_lines(words)
    # Save raw transcript
    transcript_path = os.path.join(OUTPUT_DIR, "c5296_numbered_transcript.txt")
    # Explicit UTF-8: transcript text may contain non-ASCII characters, which
    # the platform default encoding cannot always represent.
    with open(transcript_path, "w", encoding="utf-8") as f:
        f.write(_format_for_llm(lines))
    print(f"  {len(lines)} lines saved to {transcript_path}")
    return lines
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
step_4_run_passes function · python · L76-L150 (75 LOC)
rlhf_capture.py
def step_4_run_passes(lines, total_duration):
    print(f"[4/4] Running 3 passes through GPT-4o...")
    client = _get_client()
    valid_ids = {l["id"] for l in lines}
    full_transcript = _format_for_llm(lines)
    results = {}

    # Pass 1
    print("\n  [Pass 1] Full transcript → GPT-4o...")
    t0 = time.time()
    r1 = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": PASS_1_PROMPT},
            {"role": "user", "content": full_transcript},
        ],
        temperature=0.1,
        max_tokens=16000,
    )
    keep_1 = [k for k in _parse_line_numbers(r1.choices[0].message.content) if k in valid_ids]
    dur_1 = sum(l["end"] - l["start"] for l in lines if l["id"] in set(keep_1))
    print(f"  [Pass 1] {len(keep_1)}/{len(lines)} lines kept ({dur_1:.0f}s, {dur_1/total_duration*100:.0f}%) — {time.time()-t0:.0f}s")
    results["pass_1"] = {"kept": keep_1, "raw_response": r1.choices[0].message.content}

    # Save Pa
build_rlhf_file function · python · L153-L218 (66 LOC)
rlhf_capture.py
def build_rlhf_file(lines, results, total_duration):
    """Build the annotatable RLHF review file."""
    keep_1 = set(results["pass_1"]["kept"])
    keep_2 = set(results["pass_2"]["kept"])
    keep_3 = set(results["pass_3"]["kept"])

    dur_1 = sum(l["end"] - l["start"] for l in lines if l["id"] in keep_1)
    dur_2 = sum(l["end"] - l["start"] for l in lines if l["id"] in keep_2)
    dur_3 = sum(l["end"] - l["start"] for l in lines if l["id"] in keep_3)

    out = []
    out.append("=" * 90)
    out.append("RLHF REVIEW — Project C5296 (32 min raw → 14:38 published)")
    out.append("=" * 90)
    out.append("")
    out.append("HOW TO USE THIS FILE:")
    out.append("  Each line shows: line number, timestamps, the LLM's decision, and the text.")
    out.append("  Decision key:")
    out.append("    KEEP     = survived all 3 passes (in final cut)")
    out.append("    CUT@P1   = cut in Pass 1 (initial aggressive cut)")
    out.append("    CUT@P2   = survived Pass 1, cut in Pass 2 (fals
main function · python · L221-L228 (8 LOC)
rlhf_capture.py
def main():
    """Run the capture pipeline end to end.

    Steps: extract audio, transcribe it, build numbered lines, run the LLM
    passes, then write the annotatable RLHF review file.

    Raises:
        SystemExit: If transcription yields no usable duration (no words, or
            a last word ending at 0), since the later steps cannot proceed.
    """
    audio_path = step_1_extract_audio()
    words = step_2_transcribe(audio_path)
    total_duration = words[-1]["end"] if words else 0
    if total_duration <= 0:
        # The pass summaries compute a percentage by dividing by the total
        # duration; a zero here would crash with ZeroDivisionError only after
        # the first LLM call has already been made. Stop early instead.
        raise SystemExit("No words were transcribed; nothing to review.")
    lines = step_3_build_lines(words)
    results = step_4_run_passes(lines, total_duration)
    build_rlhf_file(lines, results, total_duration)
    print("\nDone. Review the file, add your --> comments, then I'll rebuild the prompts.")
parse_srt function · python · L39-L78 (40 LOC)
rlhf_from_transcript.py
def parse_srt(path: str) -> list[dict]:
    """Parse SRT file into numbered lines with timestamps."""
    with open(path, "r", encoding="utf-8") as f:
        content = f.read()

    blocks = re.split(r"\n\s*\n", content.strip())
    lines = []

    for block in blocks:
        block_lines = block.strip().split("\n")
        if len(block_lines) < 3:
            continue

        # Line 1: sequence number (ignore, we renumber)
        # Line 2: timecodes  00:01:23,456 --> 00:01:25,789
        # Line 3+: text
        tc_match = re.match(
            r"(\d{2}):(\d{2}):(\d{2})[,.](\d{3})\s*-->\s*(\d{2}):(\d{2}):(\d{2})[,.](\d{3})",
            block_lines[1].strip(),
        )
        if not tc_match:
            continue

        g = tc_match.groups()
        start = int(g[0]) * 3600 + int(g[1]) * 60 + int(g[2]) + int(g[3]) / 1000
        end = int(g[4]) * 3600 + int(g[5]) * 60 + int(g[6]) + int(g[7]) / 1000

        text = " ".join(block_lines[2:]).strip()
        # Strip HTML tags (Prem
parse_vtt function · python · L81-L132 (52 LOC)
rlhf_from_transcript.py
def parse_vtt(path: str) -> list[dict]:
    """Parse WebVTT file into numbered lines with timestamps."""
    with open(path, "r", encoding="utf-8") as f:
        content = f.read()

    # Strip WEBVTT header and any metadata
    content = re.sub(r"^WEBVTT.*?\n\n", "", content, flags=re.DOTALL)
    # Strip NOTE blocks
    content = re.sub(r"NOTE.*?\n\n", "", content, flags=re.DOTALL)

    blocks = re.split(r"\n\s*\n", content.strip())
    lines = []

    for block in blocks:
        block_lines = block.strip().split("\n")

        # Find the timecode line (might have optional cue ID before it)
        tc_line = None
        text_start = 0
        for i, bl in enumerate(block_lines):
            if "-->" in bl:
                tc_line = bl
                text_start = i + 1
                break

        if not tc_line:
            continue

        # VTT timecodes: 00:01:23.456 --> 00:01:25.789  or  01:23.456 --> 01:25.789
        tc_match = re.match(
            r"(?:(\d{2}):)?(\d{2}):
parse_plain_text function · python · L135-L153 (19 LOC)
rlhf_from_transcript.py
def parse_plain_text(path: str) -> list[dict]:
    """Parse a plain-text transcript that carries no timestamps.

    Every non-blank line becomes one entry. Because the file has no timing
    information, synthetic timestamps are assigned: each line lasts 0.3 s per
    whitespace-separated word (at least 1.0 s), and consecutive lines are
    separated by a 0.2 s gap.

    Args:
        path: Path to a UTF-8 text file. A leading byte-order mark is ignored.

    Returns:
        A list of dicts with keys ``id`` (1-based), ``start``, ``end``
        (seconds) and ``text`` (the stripped line).
    """
    # "utf-8-sig" decodes plain UTF-8 as well, but also drops a leading BOM.
    # str.strip() does not treat U+FEFF as whitespace, so with plain "utf-8"
    # the BOM would end up glued to the first line's text.
    with open(path, "r", encoding="utf-8-sig") as f:
        stripped = (raw.strip() for raw in f)
        raw_lines = [text for text in stripped if text]

    lines = []
    t = 0.0
    for line_id, text in enumerate(raw_lines, start=1):
        # Rough speaking-rate estimate: 0.3 s per word, never below 1 s.
        duration = max(1.0, len(text.split()) * 0.3)
        lines.append({
            "id": line_id,
            "start": t,
            "end": t + duration,
            "text": text,
        })
        # Leave a small pause before the next line starts.
        t += duration + 0.2

    return lines
parse_premiere_txt function · python · L156-L202 (47 LOC)
rlhf_from_transcript.py
def parse_premiere_txt(path: str, fps: float = 29.97) -> list[dict]:
    """Parse Premiere Pro text transcript export.

    Format:
        00:00:03:05 - 00:00:10:03
        Unknown
        Michael. Yeah.

    Timecodes are HH:MM:SS:FF (frames, not milliseconds).
    """
    with open(path, "r", encoding="utf-8") as f:
        content = f.read()

    blocks = re.split(r"\n\s*\n", content.strip())
    lines = []

    for block in blocks:
        block_lines = block.strip().split("\n")
        if len(block_lines) < 3:
            continue

        # Line 1: timecodes  00:00:03:05 - 00:00:10:03
        tc_match = re.match(
            r"(\d{2}):(\d{2}):(\d{2}):(\d{2})\s*-\s*(\d{2}):(\d{2}):(\d{2}):(\d{2})",
            block_lines[0].strip(),
        )
        if not tc_match:
            continue

        g = tc_match.groups()
        start = int(g[0]) * 3600 + int(g[1]) * 60 + int(g[2]) + int(g[3]) / fps
        end = int(g[4]) * 3600 + int(g[5]) * 60 + int(g[6]) + int(g[7]) / fps

    
detect_and_parse function · python · L205-L232 (28 LOC)
rlhf_from_transcript.py
def detect_and_parse(path: str) -> list[dict]:
    """Auto-detect format and parse."""
    ext = os.path.splitext(path)[1].lower()

    # Peek at content to auto-detect
    with open(path, "r", encoding="utf-8") as f:
        head = f.read(500)

    # Premiere Pro TXT: "00:00:03:05 - 00:00:10:03" (frame-based, dash separator)
    if re.search(r"\d{2}:\d{2}:\d{2}:\d{2}\s*-\s*\d{2}:\d{2}:\d{2}:\d{2}", head):
        print(f"  Detected Premiere Pro transcript format (frame-based timecodes)")
        return parse_premiere_txt(path)

    if ext == ".srt":
        print(f"  Detected SRT format")
        return parse_srt(path)
    elif ext == ".vtt":
        print(f"  Detected WebVTT format")
        return parse_vtt(path)
    elif "WEBVTT" in head:
        print(f"  Detected WebVTT format (from content)")
        return parse_vtt(path)
    elif re.search(r"\d{2}:\d{2}:\d{2}[,.]\d{3}\s*-->", head):
        print(f"  Detected SRT format (from content)")
        return parse_srt(path)
    else:
All rows above produced by Repobility · https://repobility.com
run_passes function · python · L239-L317 (79 LOC)
rlhf_from_transcript.py
def run_passes(lines, total_duration):
    print(f"Running 3 passes through GPT-4o...")
    client = _get_client()
    valid_ids = {l["id"] for l in lines}
    full_transcript = _format_for_llm(lines)
    results = {}

    # Pass 1
    print(f"\n  [Pass 1] {len(lines)} lines → GPT-4o...")
    t0 = time.time()
    r1 = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": PASS_1_PROMPT},
            {"role": "user", "content": full_transcript},
        ],
        temperature=0.1,
        max_tokens=16000,
    )
    keep_1 = [k for k in _parse_line_numbers(r1.choices[0].message.content) if k in valid_ids]
    dur_1 = sum(l["end"] - l["start"] for l in lines if l["id"] in set(keep_1))
    print(f"  [Pass 1] {len(keep_1)}/{len(lines)} lines kept ({dur_1:.0f}s, {dur_1/total_duration*100:.0f}%) — {time.time()-t0:.0f}s")
    results["pass_1"] = {"kept": keep_1, "raw_response": r1.choices[0].message.content}

    with open(os.path.
build_rlhf_file function · python · L324-L388 (65 LOC)
rlhf_from_transcript.py
def build_rlhf_file(lines, results, total_duration, source_label):
    keep_1 = set(results["pass_1"]["kept"])
    keep_2 = set(results["pass_2"]["kept"])
    keep_3 = set(results["pass_3"]["kept"])

    dur_1 = sum(l["end"] - l["start"] for l in lines if l["id"] in keep_1)
    dur_2 = sum(l["end"] - l["start"] for l in lines if l["id"] in keep_2)
    dur_3 = sum(l["end"] - l["start"] for l in lines if l["id"] in keep_3)

    out = []
    out.append("=" * 90)
    out.append(f"RLHF REVIEW — Project C5296 (transcript source: {source_label})")
    out.append("=" * 90)
    out.append("")
    out.append("HOW TO USE THIS FILE:")
    out.append("  Each line shows: line number, timestamps, the LLM's decision, and the text.")
    out.append("  Decision key:")
    out.append("    KEEP     = survived all 3 passes (in final cut)")
    out.append("    CUT@P1   = cut in Pass 1 (initial aggressive cut)")
    out.append("    CUT@P2   = survived Pass 1, cut in Pass 2 (false start / repeat hunting)")
  
main function · python · L395-L430 (36 LOC)
rlhf_from_transcript.py
def main():
    if len(sys.argv) < 2:
        print("Usage: python3 rlhf_from_transcript.py <transcript_file>")
        print("  Supports: .srt, .vtt, .txt")
        sys.exit(1)

    transcript_path = sys.argv[1]
    if not os.path.exists(transcript_path):
        print(f"File not found: {transcript_path}")
        sys.exit(1)

    print(f"[1/3] Parsing transcript: {transcript_path}")
    lines = detect_and_parse(transcript_path)
    total_duration = lines[-1]["end"] if lines else 0
    print(f"  {len(lines)} lines, {total_duration:.0f}s ({total_duration/60:.1f} min)")

    # Save parsed lines as JSON for reuse
    parsed_path = os.path.join(OUTPUT_DIR, "parsed_lines.json")
    with open(parsed_path, "w") as f:
        json.dump(lines, f, indent=2)
    print(f"  Saved parsed lines to {parsed_path}")

    # Save numbered transcript
    transcript_out = os.path.join(OUTPUT_DIR, "numbered_transcript.txt")
    with open(transcript_out, "w") as f:
        f.write(_format_for_llm(lines))
   
page 1 / 2next ›