Function bodies 59 total
get_video_metadata function · python · L13-L52 (40 LOC)audio.py
def get_video_metadata(mp4_path: str) -> dict:
"""Extract video metadata using ffprobe."""
cmd = [
"ffprobe", "-v", "quiet",
"-print_format", "json",
"-show_streams", "-show_format",
str(mp4_path),
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
data = json.loads(result.stdout)
video_stream = None
audio_stream = None
for stream in data.get("streams", []):
if stream["codec_type"] == "video" and not video_stream:
video_stream = stream
elif stream["codec_type"] == "audio" and not audio_stream:
audio_stream = stream
fps = 24.0
if video_stream:
r_frame_rate = video_stream.get("r_frame_rate", "24/1")
num, den = r_frame_rate.split("/")
fps = float(num) / float(den) if float(den) != 0 else 24.0
duration = float(data.get("format", {}).get("duration", 0))
audio_channels = int(audio_stream.get("channels", 2)) if audio_streamextract_audio function · python · L55-L69 (15 LOC)audio.py
def extract_audio(mp4_path: str, output_path: str) -> str:
    """Transcode the audio track of a video file to MP3 using ffmpeg.

    The video stream is dropped (``-vn``), audio is encoded with
    ``libmp3lame`` at ``AUDIO_BITRATE``, and any existing file at
    ``output_path`` is overwritten (``-y``).

    Returns:
        ``output_path``, unchanged.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
    """
    ffmpeg_args = ["ffmpeg", "-y", "-i", str(mp4_path), "-vn"]
    ffmpeg_args += ["-acodec", "libmp3lame", "-ab", AUDIO_BITRATE]
    ffmpeg_args.append(str(output_path))

    print(f" Extracting audio to {output_path}...")
    # capture_output=True keeps ffmpeg's stdout/stderr off the console;
    # the captured text itself is not used here.
    subprocess.run(ffmpeg_args, capture_output=True, check=True)

    bytes_written = os.path.getsize(output_path)
    size_mb = bytes_written / (1024 * 1024)
    print(f" Audio extracted: {size_mb:.1f} MB")
    return output_path


# [listing] chunk_audio function · python · L72-L119 (48 LOC) · audio.py
def chunk_audio(audio_path: str, chunk_dir: str, max_mb: int = WHISPER_MAX_CHUNK_MB) -> list[tuple[str, float]]:
"""Split audio into chunks under max_mb, with overlap for boundary safety.
Returns list of (chunk_path, start_offset_seconds).
"""
file_size_mb = os.path.getsize(audio_path) / (1024 * 1024)
if file_size_mb <= max_mb:
return [(audio_path, 0.0)]
# Get audio duration
cmd = [
"ffprobe", "-v", "quiet",
"-print_format", "json",
"-show_format",
str(audio_path),
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
duration = float(json.loads(result.stdout)["format"]["duration"])
# Calculate chunk duration to stay under max_mb
num_chunks = math.ceil(file_size_mb / max_mb)
chunk_duration = duration / num_chunks
chunks = []
Path(chunk_dir).mkdir(parents=True, exist_ok=True)
for i in range(num_chunks):
start = max(0, i * chunk_duration - (CHUNK_OVER_get_service function · python · L23-L53 (31 LOC)drive.py
def _get_service():
"""Build Drive API service using OAuth2 credentials."""
creds = None
if os.path.exists(_TOKEN_PATH):
creds = Credentials.from_authorized_user_file(_TOKEN_PATH, SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
# Find client secret file
client_secret = os.getenv("GOOGLE_CLIENT_SECRET_PATH")
if not client_secret:
# Look in reference/ for client_secret*.json
for f in os.listdir(_CLIENT_SECRET_GLOB):
if f.startswith("client_secret") and f.endswith(".json"):
client_secret = os.path.join(_CLIENT_SECRET_GLOB, f)
break
if not client_secret:
raise RuntimeError(
"No Google OAuth client secret found. Set GOOGLE_CLIENT_SECRET_PATH "
"or place client_separse_drive_file_id function · python · L56-L67 (12 LOC)drive.py
def parse_drive_file_id(url: str) -> str:
    """Extract the file ID from the common Google Drive URL shapes.

    Patterns are tried in order and the first hit wins:

    1. ``/file/d/<id>``
    2. an ``id=<id>`` parameter (at the start of the string or right after
       ``?``, ``&`` or ``#``)
    3. ``/d/<id>``

    The ``id=`` pattern is anchored to a parameter boundary on purpose: an
    unanchored ``id=`` also matches the tail of unrelated parameters such as
    ``ouid=123``, and because it is tried before ``/d/`` it would return that
    value instead of the real ID embedded in the path.

    Raises:
        ValueError: none of the patterns match ``url``.
    """
    patterns = (
        r"/file/d/([a-zA-Z0-9_-]+)",
        r"(?:^|[?&#])id=([a-zA-Z0-9_-]+)",
        r"/d/([a-zA-Z0-9_-]+)",
    )
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    raise ValueError(f"Could not extract Drive file ID from URL: {url}")


# [listing] download_file function · python · L70-L95 (26 LOC) · drive.py
def download_file(file_id: str, dest_path: str) -> str:
    """Download a Drive file to ``dest_path``, printing progress as it goes.

    The file's name and size are fetched first and printed; the content is
    then pulled chunk by chunk via ``MediaIoBaseDownload``. A progress line
    is printed each time the percentage has advanced by at least 10 points
    since the last one printed.

    Returns:
        ``dest_path``, unchanged.
    """
    drive = _get_service()

    info = drive.files().get(fileId=file_id, fields="name,size,mimeType").execute()
    display_name = info.get("name", "unknown")
    size_gb = int(info.get("size", 0)) / (1024**3)
    print(f" Downloading: {display_name} ({size_gb:.1f} GB)")

    media_request = drive.files().get_media(fileId=file_id)
    with open(dest_path, "wb") as sink:
        downloader = MediaIoBaseDownload(sink, media_request)
        finished = False
        reported_pct = 0
        while not finished:
            progress, finished = downloader.next_chunk()
            if not progress:
                continue
            current_pct = int(progress.progress() * 100)
            if current_pct - reported_pct >= 10:
                print(f" Download: {current_pct}%")
                reported_pct = current_pct

    print(f" Download complete: {dest_path}")
    return dest_path


# [listing] upload_file function · python · L98-L115 (18 LOC) · drive.py
def upload_file(local_path: str, folder_id: str, filename: str) -> str:
    """Upload a local file into a Drive folder and return a link to it.

    The upload is always sent with the ``application/xml`` MIME type. The
    returned link is the ``webViewLink`` from the create response, falling
    back to a ``/file/d/<id>/view`` URL built from the new file's ID.
    """
    service = _get_service()
    media = MediaFileUpload(local_path, mimetype="application/xml")
    create_request = service.files().create(
        body={"name": filename, "parents": [folder_id]},
        media_body=media,
        fields="id,webViewLink",
    )
    created = create_request.execute()

    # Built unconditionally, exactly like the original default argument.
    fallback_link = f"https://drive.google.com/file/d/{created['id']}/view"
    link = created.get("webViewLink", fallback_link)
    print(f" Uploaded to Drive: {link}")
    return link
get_file_name function · python · L118-L122 (5 LOC)drive.py
def get_file_name(file_id: str) -> str:
    """Return a Drive file's name, or ``"unknown"`` if the metadata has none."""
    name_request = _get_service().files().get(fileId=file_id, fields="name")
    metadata = name_request.execute()
    return metadata.get("name", "unknown")


# [listing] _run_core function · python · L45-L90 (46 LOC) · main.py
def _run_core(mp4_path: str, output_path: str, use_local: bool = False) -> str:
"""Core pipeline: MP4 in, FCPXML out. Returns output path."""
start_time = time.time()
with tempfile.TemporaryDirectory(prefix="morningside_") as tmp_dir:
# Step 1: Video metadata
print("\n[1/5] Analyzing video...")
metadata = get_video_metadata(mp4_path)
print(f" Resolution: {metadata['width']}x{metadata['height']}")
print(f" FPS: {metadata['fps']}")
print(f" Duration: {metadata['duration_seconds']:.1f}s ({metadata['duration_seconds']/60:.1f} min)")
print(f" Codec: {metadata['codec_name']}")
# Step 2: Extract audio
print("\n[2/5] Extracting audio...")
audio_path = os.path.join(tmp_dir, "audio.mp3")
extract_audio(mp4_path, audio_path)
# Step 3: Chunk + transcribe
print("\n[3/5] Transcribing...")
chunk_dir = os.path.join(tmp_dir, "chunks")
chunks = chunk_audio(audio_path,run_local function · python · L93-L125 (33 LOC)main.py
def run_local(mp4_path: str, output_path: str | None = None, whisper_model: str | None = None, use_local: bool = False) -> str:
"""Local file mode — for testing or manual runs."""
mp4_path = os.path.abspath(mp4_path)
if not os.path.exists(mp4_path):
print(f"Error: File not found: {mp4_path}")
sys.exit(1)
source_filename = Path(mp4_path).stem
file_size_mb = os.path.getsize(mp4_path) / (1024 * 1024)
if whisper_model:
config.WHISPER_MODEL = whisper_model
if not output_path:
output_path = os.path.join(
os.path.dirname(mp4_path),
f"{source_filename} - Clean Cut.xml",
)
print(f"\n{'='*60}")
print(f"Morningside XML Pipeline")
print(f"{'='*60}")
print(f"Source: {mp4_path}")
print(f"Size: {file_size_mb:.0f} MB")
result = _run_core(mp4_path, output_path, use_local)
print(f"\n{'='*60}")
print(f"Output: {result}")
print(f"{'='*60}")
print(f"\nNext: Open Premiererun_notion function · python · L128-L181 (54 LOC)main.py
def run_notion(page_id: str, use_local: bool = False) -> str:
"""Notion + Drive mode — download from Drive, process, upload FCPXML, update Notion."""
from drive import parse_drive_file_id, download_file, upload_file
from notion_handler import get_page, update_xml_property
output_folder_id = os.getenv("GOOGLE_DRIVE_OUTPUT_FOLDER_ID")
if not output_folder_id:
print("Error: GOOGLE_DRIVE_OUTPUT_FOLDER_ID not set in .env")
sys.exit(1)
print(f"\n{'='*60}")
print(f"Morningside XML Pipeline — Notion Mode")
print(f"{'='*60}")
# Read Notion page
print(f"\nReading Notion page {page_id}...")
page = get_page(page_id)
print(f" Title: {page['title']}")
if not page["raws_url"]:
print("Error: No RAWs URL found on this Notion page")
sys.exit(1)
print(f" RAWs URL: {page['raws_url']}")
file_id = parse_drive_file_id(page["raws_url"])
with tempfile.TemporaryDirectory(prefix="morningside_") as tmp_dir:
run_watch function · python · L184-L220 (37 LOC)main.py
def run_watch(interval: int = 120, use_local: bool = False):
"""Watch mode — poll Notion DB every N seconds for new pages to process."""
from notion_handler import get_ready_pages
db_id = os.getenv("MORNINGSIDE_NOTION_DB_ID")
if not db_id:
print("Error: MORNINGSIDE_NOTION_DB_ID not set in .env")
sys.exit(1)
print(f"\n{'='*60}")
print(f"Morningside XML Pipeline — Watch Mode")
print(f"Polling every {interval}s for 'Started Editing' pages...")
print(f"{'='*60}")
while True:
try:
pages = get_ready_pages(db_id)
if pages:
print(f"\nFound {len(pages)} page(s) to process")
for page in pages:
print(f"\n--- Processing: {page['title']} ---")
try:
run_notion(page["page_id"], use_local)
except Exception as e:
print(f" Error processing {page['title']}: {e}")
main function · python · L223-L249 (27 LOC)main.py
def main():
parser = argparse.ArgumentParser(description="Morningside XML Pipeline")
mode = parser.add_mutually_exclusive_group(required=True)
mode.add_argument("--file", help="Path to local MP4 file")
mode.add_argument("--notion-id", help="Notion page ID to process")
mode.add_argument("--watch", action="store_true", help="Poll Notion DB for new pages")
parser.add_argument("--out", help="Output FCPXML path (local mode only)")
parser.add_argument("--local", action="store_true", help="Use local Whisper model instead of API")
parser.add_argument("--interval", type=int, default=120, help="Watch mode poll interval in seconds (default: 120)")
parser.add_argument(
"--whisper-model",
choices=["tiny", "base", "small", "medium", "large"],
help="Local Whisper model size (default: base)",
)
args = parser.parse_args()
if args.whisper_model:
config.WHISPER_MODEL = args.whisper_model
if args.file:
run_loca_run_core function · python · L56-L121 (66 LOC)main_v2.py
def _run_core(mp4_path: str, output_path: str, use_local: bool = False, transcript_path: str | None = None) -> str:
"""Core pipeline: MP4 in, FCPXML out. Returns output path.
If transcript_path is given, skips audio extraction + Whisper and uses
the Premiere Pro transcript directly.
"""
start_time = time.time()
# Step 1: Video metadata
print("\n[1] Analyzing video...")
metadata = get_video_metadata(mp4_path)
print(f" Resolution: {metadata['width']}x{metadata['height']}")
print(f" FPS: {metadata['fps']}")
print(f" Duration: {metadata['duration_seconds']:.1f}s ({metadata['duration_seconds']/60:.1f} min)")
print(f" Codec: {metadata['codec_name']}")
if transcript_path:
# Premiere transcript mode — skip Whisper
print(f"\n[2] Parsing transcript: {transcript_path}")
lines = detect_and_parse(transcript_path)
if not lines:
raise RuntimeError("No lines parsed from transcript")
total_durrun_local function · python · L124-L167 (44 LOC)main_v2.py
def run_local(mp4_path: str, output_path: str | None = None, whisper_model: str | None = None,
use_local: bool = False, transcript_path: str | None = None) -> str:
"""Local file mode."""
mp4_path = os.path.abspath(mp4_path)
if not os.path.exists(mp4_path):
print(f"Error: File not found: {mp4_path}")
sys.exit(1)
if transcript_path:
transcript_path = os.path.abspath(transcript_path)
if not os.path.exists(transcript_path):
print(f"Error: Transcript not found: {transcript_path}")
sys.exit(1)
source_filename = Path(mp4_path).stem
file_size_mb = os.path.getsize(mp4_path) / (1024 * 1024)
if whisper_model:
config.WHISPER_MODEL = whisper_model
if not output_path:
tag = "transcript" if transcript_path else "v2"
output_path = os.path.join(
os.path.dirname(mp4_path),
f"{source_filename} - Clean Cut {tag}.xml",
)
mode = "Premiere TraAll rows above produced by Repobility · https://repobility.com
run_notion function · python · L170-L217 (48 LOC)main_v2.py
def run_notion(page_id: str, use_local: bool = False) -> str:
"""Notion + Drive mode."""
from drive import parse_drive_file_id, download_file, upload_file
from notion_handler import get_page, update_xml_property
output_folder_id = os.getenv("GOOGLE_DRIVE_OUTPUT_FOLDER_ID")
if not output_folder_id:
print("Error: GOOGLE_DRIVE_OUTPUT_FOLDER_ID not set in .env")
sys.exit(1)
print(f"\n{'='*60}")
print(f"Morningside XML Pipeline v2 (GPT-5.4) — Notion Mode")
print(f"{'='*60}")
print(f"\nReading Notion page {page_id}...")
page = get_page(page_id)
print(f" Title: {page['title']}")
if not page["raws_url"]:
print("Error: No RAWs URL found on this Notion page")
sys.exit(1)
print(f" RAWs URL: {page['raws_url']}")
file_id = parse_drive_file_id(page["raws_url"])
with tempfile.TemporaryDirectory(prefix="morningside_v2_") as tmp_dir:
print("\nDownloading from Drive...")
mp4_path = os.pathrun_watch function · python · L220-L256 (37 LOC)main_v2.py
def run_watch(interval: int = 120, use_local: bool = False):
"""Watch mode — poll Notion DB every N seconds."""
from notion_handler import get_ready_pages
db_id = os.getenv("MORNINGSIDE_NOTION_DB_ID")
if not db_id:
print("Error: MORNINGSIDE_NOTION_DB_ID not set in .env")
sys.exit(1)
print(f"\n{'='*60}")
print(f"Morningside XML Pipeline v2 (GPT-5.4) — Watch Mode")
print(f"Polling every {interval}s for 'Started Editing' pages...")
print(f"{'='*60}")
while True:
try:
pages = get_ready_pages(db_id)
if pages:
print(f"\nFound {len(pages)} page(s) to process")
for page in pages:
print(f"\n--- Processing: {page['title']} ---")
try:
run_notion(page["page_id"], use_local)
except Exception as e:
print(f" Error processing {page['title']}: {e}")
contimain function · python · L259-L287 (29 LOC)main_v2.py
def main():
parser = argparse.ArgumentParser(description="Morningside XML Pipeline v2 (GPT-5.4)")
mode = parser.add_mutually_exclusive_group(required=True)
mode.add_argument("--file", help="Path to local MP4 file")
mode.add_argument("--notion-id", help="Notion page ID to process")
mode.add_argument("--watch", action="store_true", help="Poll Notion DB for new pages")
parser.add_argument("--transcript", help="Premiere Pro transcript file (.txt, .srt, .vtt) — skips Whisper")
parser.add_argument("--out", help="Output FCPXML path (local mode only)")
parser.add_argument("--local", action="store_true", help="Use local Whisper model instead of API")
parser.add_argument("--interval", type=int, default=120, help="Watch mode poll interval in seconds (default: 120)")
parser.add_argument(
"--whisper-model",
choices=["tiny", "base", "small", "medium", "large"],
help="Local Whisper model size (default: base)",
)
args = parser.p_get_client function · python · L8-L12 (5 LOC)notion_handler.py
def _get_client() -> Client:
    """Build a Notion ``Client`` authenticated with ``NOTION_TOKEN``.

    Raises:
        RuntimeError: ``NOTION_TOKEN`` is unset or empty.
    """
    if token := os.getenv("NOTION_TOKEN"):
        return Client(auth=token)
    raise RuntimeError("NOTION_TOKEN not set in environment")


# [listing] get_page function · python · L15-L49 (35 LOC) · notion_handler.py
def get_page(page_id: str) -> dict:
"""Get a Notion page and extract the RAWs Drive URL."""
client = _get_client()
page = client.pages.retrieve(page_id=page_id)
props = page.get("properties", {})
# RAWs property — could be URL type or rich_text
raws_url = None
raws_prop = props.get("RAWs") or props.get("Raws") or props.get("raws")
if raws_prop:
prop_type = raws_prop.get("type")
if prop_type == "url":
raws_url = raws_prop.get("url")
elif prop_type == "rich_text":
texts = raws_prop.get("rich_text", [])
if texts:
raws_url = texts[0].get("plain_text") or texts[0].get("href")
elif prop_type == "files":
files = raws_prop.get("files", [])
if files:
raws_url = files[0].get("external", {}).get("url") or files[0].get("name")
title_prop = props.get("Name") or props.get("Title") or props.get("title")
title = ""
if title_prop:
update_xml_property function · python · L52-L61 (10 LOC)notion_handler.py
def update_xml_property(page_id: str, drive_url: str):
    """Set the ``XML`` URL property of a Notion page to ``drive_url``.

    Prints a confirmation line once the update call has returned.
    """
    xml_property = {"XML": {"url": drive_url}}
    _get_client().pages.update(page_id=page_id, properties=xml_property)
    print(f" Notion updated: XML property set on {page_id}")


# [listing] get_ready_pages function · python · L64-L92 (29 LOC) · notion_handler.py
def get_ready_pages(db_id: str) -> list[dict]:
    """Return every page that is ready for processing.

    A page qualifies when Status equals 'Started Editing', the RAWs URL is
    filled in, and the XML URL is still empty. Each match is re-read through
    ``get_page`` so callers receive the same dict shape that function returns.

    Query results are paginated by the Notion API, so the query is repeated
    with ``start_cursor`` until the response stops reporting ``has_more``;
    reading only the first response would silently drop later matches.
    """
    client = _get_client()
    query = {
        "database_id": db_id,
        "filter": {
            "and": [
                {
                    "property": "Status",
                    "status": {"equals": "Started Editing"},
                },
                {
                    "property": "RAWs",
                    "url": {"is_not_empty": True},
                },
                {
                    "property": "XML",
                    "url": {"is_empty": True},
                },
            ]
        },
    }

    pages = []
    cursor = None
    while True:
        # Only send start_cursor on follow-up requests.
        extra = {"start_cursor": cursor} if cursor else {}
        results = client.databases.query(**query, **extra)
        pages.extend(get_page(page["id"]) for page in results.get("results", []))

        cursor = results.get("next_cursor")
        # Stop on the last page, or if the API gave no cursor to continue with.
        if not results.get("has_more") or not cursor:
            break
    return pages


# [listing] _get_client function · python · L13-L17 (5 LOC) · processor.py
def _get_client():
    """Build an ``OpenAI`` client from the ``OPENAI_API_KEY`` environment variable.

    Raises:
        RuntimeError: ``OPENAI_API_KEY`` is unset or empty.
    """
    if key := os.getenv("OPENAI_API_KEY"):
        return OpenAI(api_key=key)
    raise RuntimeError("OPENAI_API_KEY not set")
_build_numbered_lines function · python · L20-L59 (40 LOC)processor.py
def _build_numbered_lines(words: list[dict]) -> list[dict]:
"""Break words into numbered lines. Each line has an ID, start, end, and text.
Lines break on pauses > 0.3s or every ~15 words.
"""
lines = []
current_words = []
current_start = None
last_end = 0
def flush():
if current_words:
text = " ".join(w["word"] for w in current_words)
lines.append({
"id": len(lines) + 1,
"start": current_start,
"end": current_words[-1]["end"],
"text": text,
})
for w in words:
if current_start is None:
current_start = w["start"]
gap = w["start"] - last_end if last_end > 0 else 0
if gap > 0.3 and current_words:
flush()
current_words = []
current_start = w["start"]
current_words.append(w)
last_end = w["end"]
if len(current_words) >= 15:
flush()
_format_for_llm function · python · L62-L67 (6 LOC)processor.py
def _format_for_llm(lines: list[dict]) -> str:
    """Render numbered lines as text for the LLM, one line per row.

    Each row looks like ``L0001 [0.0s-1.5s] some text``: the ID zero-padded
    to four digits, then start/end with one decimal place, then the text.
    """
    rows = []
    for entry in lines:
        tag = f"L{entry['id']:04d}"
        span = f"[{entry['start']:.1f}s-{entry['end']:.1f}s]"
        rows.append(f"{tag} {span} {entry['text']}")
    return "\n".join(rows)


# [listing] _parse_line_numbers function · python · L70-L85 (16 LOC) · processor.py
def _parse_line_numbers(content: str) -> list[int]:
    """Parse line numbers out of an LLM response.

    Strategies, tried in order after markdown code fences are stripped:

    1. The whole response is a JSON list whose items all convert with
       ``int()``.
    2. The response contains ``L``-prefixed references such as ``L0012``;
       only those are returned, so digits from echoed timestamps like
       ``[0.0s-1.5s]`` are not mistaken for line numbers.
    3. Otherwise every run of digits in the response is returned.

    Returns an empty list when no number is found.
    """
    # Remove markdown fences
    content = re.sub(r"```(?:json)?\s*", "", content).rstrip("`").strip()

    # Try JSON array first. TypeError covers items int() rejects outright
    # (e.g. null or a nested list); those responses use the text fallbacks.
    try:
        nums = json.loads(content)
        if isinstance(nums, list):
            return [int(n) for n in nums]
    except (json.JSONDecodeError, ValueError, TypeError):
        pass

    # Prefer explicit L-prefixed references when the response has any.
    labelled = re.findall(r"\bL(\d+)\b", content)
    if labelled:
        return [int(n) for n in labelled]

    # Last resort: every bare number in the text.
    return [int(n) for n in re.findall(r"\d+", content)]


# [listing] _lines_to_segments function · python · L88-L111 (24 LOC) · processor.py
def _lines_to_segments(lines: list[dict], keep_ids: list[int]) -> list[dict]:
    """Turn the kept line IDs into ``{"start", "end"}`` time segments.

    Lines are visited in the order given. A kept line extends the current
    segment when it starts no more than ``REMOVABLE_PAUSE`` seconds after
    that segment's end; otherwise it opens a new segment. Returns an empty
    list when no line is kept.
    """
    wanted = set(keep_ids)
    segments: list[dict] = []
    current = None
    for entry in lines:
        if entry["id"] not in wanted:
            continue
        if current is not None and entry["start"] - current["end"] <= REMOVABLE_PAUSE:
            # Close enough to the running segment: stretch it.
            current["end"] = entry["end"]
            continue
        current = {"start": entry["start"], "end": entry["end"]}
        segments.append(current)
    return segments


# [listing] process function · python · L178-L280 (103 LOC) · processor.py
def process(words: list[dict], total_duration: float) -> list[dict]:
"""Multi-pass GPT-4o processing with numbered lines.
Pass 1: Full transcript → aggressive cut
Pass 2: Survivors only → catch semantic repeats, incomplete thoughts
Pass 3: Final QC → perfection check
Returns list of {start: float, end: float, label: str} segments.
"""
client = _get_client()
# Build numbered lines
lines = _build_numbered_lines(words)
print(f" {len(lines)} numbered lines from {len(words)} words")
full_transcript = _format_for_llm(lines)
# === PASS 1 ===
print("\n [Pass 1] Full transcript → GPT-4o...")
r1 = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": PASS_1_PROMPT},
{"role": "user", "content": full_transcript},
],
temperature=0.1,
max_tokens=16000,
)
print(f" [Pass 1] Tokens — in: {r1.usage.prompt_tokens}, out: {r1.usage.comple_get_client function · python · L20-L24 (5 LOC)processor_v2.py
def _get_client():
    """Build an ``OpenAI`` client from the ``OPENAI_API_KEY`` environment variable.

    Raises:
        RuntimeError: ``OPENAI_API_KEY`` is unset or empty.
    """
    if key := os.getenv("OPENAI_API_KEY"):
        return OpenAI(api_key=key)
    raise RuntimeError("OPENAI_API_KEY not set")


# [listing] _build_numbered_lines function · python · L27-L66 (40 LOC) · processor_v2.py
def _build_numbered_lines(words: list[dict]) -> list[dict]:
"""Break words into numbered lines. Each line has an ID, start, end, and text.
Lines break on pauses > 0.3s or every ~15 words.
"""
lines = []
current_words = []
current_start = None
last_end = 0
def flush():
if current_words:
text = " ".join(w["word"] for w in current_words)
lines.append({
"id": len(lines) + 1,
"start": current_start,
"end": current_words[-1]["end"],
"text": text,
})
for w in words:
if current_start is None:
current_start = w["start"]
gap = w["start"] - last_end if last_end > 0 else 0
if gap > 0.3 and current_words:
flush()
current_words = []
current_start = w["start"]
current_words.append(w)
last_end = w["end"]
if len(current_words) >= 15:
flush()
_format_for_llm function · python · L69-L74 (6 LOC)processor_v2.py
def _format_for_llm(lines: list[dict]) -> str:
    """Render numbered lines as text for the LLM, one line per row.

    Each row looks like ``L0001 [0.0s-1.5s] some text``: the ID zero-padded
    to four digits, then start/end with one decimal place, then the text.
    """
    rows = []
    for entry in lines:
        tag = f"L{entry['id']:04d}"
        span = f"[{entry['start']:.1f}s-{entry['end']:.1f}s]"
        rows.append(f"{tag} {span} {entry['text']}")
    return "\n".join(rows)
_parse_line_numbers function · python · L77-L92 (16 LOC)processor_v2.py
def _parse_line_numbers(content: str) -> list[int]:
    """Parse line numbers out of an LLM response.

    Strategies, tried in order after markdown code fences are stripped:

    1. The whole response is a JSON list whose items all convert with
       ``int()``.
    2. The response contains ``L``-prefixed references such as ``L0012``;
       only those are returned, so digits from echoed timestamps like
       ``[0.0s-1.5s]`` are not mistaken for line numbers.
    3. Otherwise every run of digits in the response is returned.

    Returns an empty list when no number is found.
    """
    # Remove markdown fences
    content = re.sub(r"```(?:json)?\s*", "", content).rstrip("`").strip()

    # Try JSON array first. TypeError covers items int() rejects outright
    # (e.g. null or a nested list); those responses use the text fallbacks.
    try:
        nums = json.loads(content)
        if isinstance(nums, list):
            return [int(n) for n in nums]
    except (json.JSONDecodeError, ValueError, TypeError):
        pass

    # Prefer explicit L-prefixed references when the response has any.
    labelled = re.findall(r"\bL(\d+)\b", content)
    if labelled:
        return [int(n) for n in labelled]

    # Last resort: every bare number in the text.
    return [int(n) for n in re.findall(r"\d+", content)]


# [listing] _lines_to_segments function · python · L95-L117 (23 LOC) · processor_v2.py
def _lines_to_segments(lines: list[dict], keep_ids: list[int]) -> list[dict]:
    """Turn the kept line IDs into ``{"start", "end"}`` time segments.

    Lines are visited in the order given. A kept line extends the current
    segment when it starts no more than ``REMOVABLE_PAUSE`` seconds after
    that segment's end; otherwise it opens a new segment. Returns an empty
    list when no line is kept.
    """
    wanted = set(keep_ids)
    segments: list[dict] = []
    current = None
    for entry in lines:
        if entry["id"] not in wanted:
            continue
        if current is not None and entry["start"] - current["end"] <= REMOVABLE_PAUSE:
            # Close enough to the running segment: stretch it.
            current["end"] = entry["end"]
            continue
        current = {"start": entry["start"], "end": entry["end"]}
        segments.append(current)
    return segments


# [listing] _cost_estimate function · python · L120-L127 (8 LOC) · processor_v2.py
def _cost_estimate(usage, label: str) -> float:
    """Print token counts and the dollar cost of one call, and return the cost.

    Input tokens are priced at 2.50 and output tokens at 15.00 per million.
    The reasoning-token count is read from
    ``usage.output_tokens_details.reasoning_tokens`` and shown as 0 when that
    attribute chain is missing or empty.
    """
    total = usage.input_tokens * 2.50 / 1_000_000 + usage.output_tokens * 15.00 / 1_000_000

    details = getattr(usage, "output_tokens_details", None)
    reasoning = getattr(details, "reasoning_tokens", 0) or 0

    print(f" [{label}] Tokens — in: {usage.input_tokens}, out: {usage.output_tokens} (reasoning: {reasoning}) | Cost: ${total:.4f}")
    return total


# [listing] process_lines function · python · L165-L215 (51 LOC) · processor_v2.py
def process_lines(lines: list[dict], total_duration: float) -> list[dict]:
"""Single-pass GPT-5.4 processing with pre-built numbered lines.
Accepts lines from any source (Whisper word-level or Premiere transcript).
Each line must have: id, start, end, text.
Returns list of {start: float, end: float, label: str} segments.
"""
client = _get_client()
print(f" {len(lines)} lines, {total_duration:.0f}s ({total_duration/60:.1f} min)")
full_transcript = _format_for_llm(lines)
valid_ids = {l["id"] for l in lines}
# === SINGLE PASS ===
print(f"\n Full transcript -> {MODEL} (reasoning: medium)...")
r = client.responses.create(
model=MODEL,
instructions=SINGLE_PASS_PROMPT,
input=full_transcript,
reasoning={"effort": "medium"},
max_output_tokens=32000,
)
cost = _cost_estimate(r.usage, "Single pass")
keep = _parse_line_numbers(r.output_text)
keep = [k for k in keep if k in valid_ids]
process function · python · L218-L222 (5 LOC)processor_v2.py
def process(words: list[dict], total_duration: float) -> list[dict]:
    """Entry point for Whisper word-level data.

    Groups ``words`` into numbered lines with ``_build_numbered_lines``,
    reports the counts, and returns whatever ``process_lines`` produces for
    those lines.
    """
    numbered = _build_numbered_lines(words)
    word_count = len(words)
    print(f" Built {len(numbered)} numbered lines from {word_count} words")
    segments = process_lines(numbered, total_duration)
    return segments


# [listing] step_1_extract_audio function · python · L38-L46 (9 LOC) · rlhf_capture.py
def step_1_extract_audio():
    """Return the path of the extracted audio, reusing ``AUDIO_CACHE`` if present.

    When the cache file does not exist yet, audio is extracted from
    ``INPUT_FILE`` into ``AUDIO_CACHE`` and the elapsed time is printed.
    """
    if not os.path.exists(AUDIO_CACHE):
        print("[1/4] Extracting audio from 33GB file...")
        started = time.time()
        extracted = extract_audio(INPUT_FILE, AUDIO_CACHE)
        print(f" Done in {time.time()-started:.0f}s")
        return extracted

    print(f"[1/4] Audio cached at {AUDIO_CACHE}")
    return AUDIO_CACHE


# [listing] step_2_transcribe function · python · L49-L62 (14 LOC) · rlhf_capture.py
def step_2_transcribe(audio_path):
    """Return the transcript words, loading them from ``WORDS_CACHE`` if present.

    On a cache miss the audio is split with ``chunk_audio`` into
    ``OUTPUT_DIR/chunks``, transcribed with ``transcribe_all``, and the
    result is written to ``WORDS_CACHE`` as JSON before being returned.
    """
    if os.path.exists(WORDS_CACHE):
        print(f"[2/4] Transcript cached at {WORDS_CACHE}")
        with open(WORDS_CACHE) as cached:
            return json.load(cached)

    print("[2/4] Chunking + transcribing with Whisper API...")
    started = time.time()
    pieces = chunk_audio(audio_path, os.path.join(OUTPUT_DIR, "chunks"))
    words = transcribe_all(pieces)
    with open(WORDS_CACHE, "w") as cache_file:
        json.dump(words, cache_file)
    # Timing includes writing the cache, as the clock is read after the dump.
    elapsed = time.time() - started
    print(f" Done in {elapsed:.0f}s — {len(words)} words")
    return words


# [listing] step_3_build_lines function · python · L65-L73 (9 LOC) · rlhf_capture.py
def step_3_build_lines(words):
    """Build numbered lines from ``words`` and save them as a text transcript.

    The transcript, formatted by ``_format_for_llm``, is written to
    ``c5296_numbered_transcript.txt`` inside ``OUTPUT_DIR``. Returns the
    numbered lines.
    """
    print("[3/4] Building numbered lines...")
    numbered = _build_numbered_lines(words)

    # Save raw transcript
    transcript_path = os.path.join(OUTPUT_DIR, "c5296_numbered_transcript.txt")
    with open(transcript_path, "w") as sink:
        sink.write(_format_for_llm(numbered))

    line_count = len(numbered)
    print(f" {line_count} lines saved to {transcript_path}")
    return numbered
step_4_run_passes function · python · L76-L150 (75 LOC)rlhf_capture.py
def step_4_run_passes(lines, total_duration):
print(f"[4/4] Running 3 passes through GPT-4o...")
client = _get_client()
valid_ids = {l["id"] for l in lines}
full_transcript = _format_for_llm(lines)
results = {}
# Pass 1
print("\n [Pass 1] Full transcript → GPT-4o...")
t0 = time.time()
r1 = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": PASS_1_PROMPT},
{"role": "user", "content": full_transcript},
],
temperature=0.1,
max_tokens=16000,
)
keep_1 = [k for k in _parse_line_numbers(r1.choices[0].message.content) if k in valid_ids]
dur_1 = sum(l["end"] - l["start"] for l in lines if l["id"] in set(keep_1))
print(f" [Pass 1] {len(keep_1)}/{len(lines)} lines kept ({dur_1:.0f}s, {dur_1/total_duration*100:.0f}%) — {time.time()-t0:.0f}s")
results["pass_1"] = {"kept": keep_1, "raw_response": r1.choices[0].message.content}
# Save Pabuild_rlhf_file function · python · L153-L218 (66 LOC)rlhf_capture.py
def build_rlhf_file(lines, results, total_duration):
"""Build the annotatable RLHF review file."""
keep_1 = set(results["pass_1"]["kept"])
keep_2 = set(results["pass_2"]["kept"])
keep_3 = set(results["pass_3"]["kept"])
dur_1 = sum(l["end"] - l["start"] for l in lines if l["id"] in keep_1)
dur_2 = sum(l["end"] - l["start"] for l in lines if l["id"] in keep_2)
dur_3 = sum(l["end"] - l["start"] for l in lines if l["id"] in keep_3)
out = []
out.append("=" * 90)
out.append("RLHF REVIEW — Project C5296 (32 min raw → 14:38 published)")
out.append("=" * 90)
out.append("")
out.append("HOW TO USE THIS FILE:")
out.append(" Each line shows: line number, timestamps, the LLM's decision, and the text.")
out.append(" Decision key:")
out.append(" KEEP = survived all 3 passes (in final cut)")
out.append(" CUT@P1 = cut in Pass 1 (initial aggressive cut)")
out.append(" CUT@P2 = survived Pass 1, cut in Pass 2 (falsmain function · python · L221-L228 (8 LOC)rlhf_capture.py
def main():
    """Run the four capture steps in order, then build the RLHF review file.

    The total duration handed to the later steps is the ``end`` value of the
    last transcribed word, or 0 when there are no words.
    """
    words = step_2_transcribe(step_1_extract_audio())

    if words:
        total_duration = words[-1]["end"]
    else:
        total_duration = 0

    lines = step_3_build_lines(words)
    results = step_4_run_passes(lines, total_duration)
    build_rlhf_file(lines, results, total_duration)
    print("\nDone. Review the file, add your --> comments, then I'll rebuild the prompts.")


# [listing] parse_srt function · python · L39-L78 (40 LOC) · rlhf_from_transcript.py
def parse_srt(path: str) -> list[dict]:
"""Parse SRT file into numbered lines with timestamps."""
with open(path, "r", encoding="utf-8") as f:
content = f.read()
blocks = re.split(r"\n\s*\n", content.strip())
lines = []
for block in blocks:
block_lines = block.strip().split("\n")
if len(block_lines) < 3:
continue
# Line 1: sequence number (ignore, we renumber)
# Line 2: timecodes 00:01:23,456 --> 00:01:25,789
# Line 3+: text
tc_match = re.match(
r"(\d{2}):(\d{2}):(\d{2})[,.](\d{3})\s*-->\s*(\d{2}):(\d{2}):(\d{2})[,.](\d{3})",
block_lines[1].strip(),
)
if not tc_match:
continue
g = tc_match.groups()
start = int(g[0]) * 3600 + int(g[1]) * 60 + int(g[2]) + int(g[3]) / 1000
end = int(g[4]) * 3600 + int(g[5]) * 60 + int(g[6]) + int(g[7]) / 1000
text = " ".join(block_lines[2:]).strip()
# Strip HTML tags (Premparse_vtt function · python · L81-L132 (52 LOC)rlhf_from_transcript.py
def parse_vtt(path: str) -> list[dict]:
"""Parse WebVTT file into numbered lines with timestamps."""
with open(path, "r", encoding="utf-8") as f:
content = f.read()
# Strip WEBVTT header and any metadata
content = re.sub(r"^WEBVTT.*?\n\n", "", content, flags=re.DOTALL)
# Strip NOTE blocks
content = re.sub(r"NOTE.*?\n\n", "", content, flags=re.DOTALL)
blocks = re.split(r"\n\s*\n", content.strip())
lines = []
for block in blocks:
block_lines = block.strip().split("\n")
# Find the timecode line (might have optional cue ID before it)
tc_line = None
text_start = 0
for i, bl in enumerate(block_lines):
if "-->" in bl:
tc_line = bl
text_start = i + 1
break
if not tc_line:
continue
# VTT timecodes: 00:01:23.456 --> 00:01:25.789 or 01:23.456 --> 01:25.789
tc_match = re.match(
r"(?:(\d{2}):)?(\d{2}):parse_plain_text function · python · L135-L153 (19 LOC)rlhf_from_transcript.py
def parse_plain_text(path: str) -> list[dict]:
    """Parse a timestamp-free transcript, inventing timings for each line.

    Blank lines are skipped and the rest are numbered from 1. Each line is
    given a duration of 0.3 seconds per whitespace-separated word (at least
    1.0 second), and a 0.2 second gap is left before the next line starts.

    Returns a list of ``{"id", "start", "end", "text"}`` dicts.
    """
    parsed: list[dict] = []
    cursor = 0.0
    with open(path, "r", encoding="utf-8") as handle:
        for raw in handle:
            text = raw.strip()
            if not text:
                continue
            duration = max(1.0, len(text.split()) * 0.3)
            entry = {
                "id": len(parsed) + 1,
                "start": cursor,
                "end": cursor + duration,
                "text": text,
            }
            parsed.append(entry)
            cursor += duration + 0.2
    return parsed


# [listing] parse_premiere_txt function · python · L156-L202 (47 LOC) · rlhf_from_transcript.py
def parse_premiere_txt(path: str, fps: float = 29.97) -> list[dict]:
"""Parse Premiere Pro text transcript export.
Format:
00:00:03:05 - 00:00:10:03
Unknown
Michael. Yeah.
Timecodes are HH:MM:SS:FF (frames, not milliseconds).
"""
with open(path, "r", encoding="utf-8") as f:
content = f.read()
blocks = re.split(r"\n\s*\n", content.strip())
lines = []
for block in blocks:
block_lines = block.strip().split("\n")
if len(block_lines) < 3:
continue
# Line 1: timecodes 00:00:03:05 - 00:00:10:03
tc_match = re.match(
r"(\d{2}):(\d{2}):(\d{2}):(\d{2})\s*-\s*(\d{2}):(\d{2}):(\d{2}):(\d{2})",
block_lines[0].strip(),
)
if not tc_match:
continue
g = tc_match.groups()
start = int(g[0]) * 3600 + int(g[1]) * 60 + int(g[2]) + int(g[3]) / fps
end = int(g[4]) * 3600 + int(g[5]) * 60 + int(g[6]) + int(g[7]) / fps
detect_and_parse function · python · L205-L232 (28 LOC)rlhf_from_transcript.py
def detect_and_parse(path: str) -> list[dict]:
"""Auto-detect format and parse."""
ext = os.path.splitext(path)[1].lower()
# Peek at content to auto-detect
with open(path, "r", encoding="utf-8") as f:
head = f.read(500)
# Premiere Pro TXT: "00:00:03:05 - 00:00:10:03" (frame-based, dash separator)
if re.search(r"\d{2}:\d{2}:\d{2}:\d{2}\s*-\s*\d{2}:\d{2}:\d{2}:\d{2}", head):
print(f" Detected Premiere Pro transcript format (frame-based timecodes)")
return parse_premiere_txt(path)
if ext == ".srt":
print(f" Detected SRT format")
return parse_srt(path)
elif ext == ".vtt":
print(f" Detected WebVTT format")
return parse_vtt(path)
elif "WEBVTT" in head:
print(f" Detected WebVTT format (from content)")
return parse_vtt(path)
elif re.search(r"\d{2}:\d{2}:\d{2}[,.]\d{3}\s*-->", head):
print(f" Detected SRT format (from content)")
return parse_srt(path)
else:All rows above produced by Repobility · https://repobility.com
run_passes function · python · L239-L317 (79 LOC)rlhf_from_transcript.py
def run_passes(lines, total_duration):
print(f"Running 3 passes through GPT-4o...")
client = _get_client()
valid_ids = {l["id"] for l in lines}
full_transcript = _format_for_llm(lines)
results = {}
# Pass 1
print(f"\n [Pass 1] {len(lines)} lines → GPT-4o...")
t0 = time.time()
r1 = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": PASS_1_PROMPT},
{"role": "user", "content": full_transcript},
],
temperature=0.1,
max_tokens=16000,
)
keep_1 = [k for k in _parse_line_numbers(r1.choices[0].message.content) if k in valid_ids]
dur_1 = sum(l["end"] - l["start"] for l in lines if l["id"] in set(keep_1))
print(f" [Pass 1] {len(keep_1)}/{len(lines)} lines kept ({dur_1:.0f}s, {dur_1/total_duration*100:.0f}%) — {time.time()-t0:.0f}s")
results["pass_1"] = {"kept": keep_1, "raw_response": r1.choices[0].message.content}
with open(os.path.build_rlhf_file function · python · L324-L388 (65 LOC)rlhf_from_transcript.py
def build_rlhf_file(lines, results, total_duration, source_label):
keep_1 = set(results["pass_1"]["kept"])
keep_2 = set(results["pass_2"]["kept"])
keep_3 = set(results["pass_3"]["kept"])
dur_1 = sum(l["end"] - l["start"] for l in lines if l["id"] in keep_1)
dur_2 = sum(l["end"] - l["start"] for l in lines if l["id"] in keep_2)
dur_3 = sum(l["end"] - l["start"] for l in lines if l["id"] in keep_3)
out = []
out.append("=" * 90)
out.append(f"RLHF REVIEW — Project C5296 (transcript source: {source_label})")
out.append("=" * 90)
out.append("")
out.append("HOW TO USE THIS FILE:")
out.append(" Each line shows: line number, timestamps, the LLM's decision, and the text.")
out.append(" Decision key:")
out.append(" KEEP = survived all 3 passes (in final cut)")
out.append(" CUT@P1 = cut in Pass 1 (initial aggressive cut)")
out.append(" CUT@P2 = survived Pass 1, cut in Pass 2 (false start / repeat hunting)")
main function · python · L395-L430 (36 LOC)rlhf_from_transcript.py
def main():
if len(sys.argv) < 2:
print("Usage: python3 rlhf_from_transcript.py <transcript_file>")
print(" Supports: .srt, .vtt, .txt")
sys.exit(1)
transcript_path = sys.argv[1]
if not os.path.exists(transcript_path):
print(f"File not found: {transcript_path}")
sys.exit(1)
print(f"[1/3] Parsing transcript: {transcript_path}")
lines = detect_and_parse(transcript_path)
total_duration = lines[-1]["end"] if lines else 0
print(f" {len(lines)} lines, {total_duration:.0f}s ({total_duration/60:.1f} min)")
# Save parsed lines as JSON for reuse
parsed_path = os.path.join(OUTPUT_DIR, "parsed_lines.json")
with open(parsed_path, "w") as f:
json.dump(lines, f, indent=2)
print(f" Saved parsed lines to {parsed_path}")
# Save numbered transcript
transcript_out = os.path.join(OUTPUT_DIR, "numbered_transcript.txt")
with open(transcript_out, "w") as f:
f.write(_format_for_llm(lines))
page 1 / 2next ›