Function bodies 108 total
_cleanup_old_audio method · python · L233-L247 (15 LOC)tools/meeting-transcriber/meeting_transcriber/daemon.py
def _cleanup_old_audio(self) -> None:
"""Delete audio chunks older than retention period.
When audio_retention_days is None, audio is kept forever.
"""
if self.audio_retention_days is None:
return
if not self._audio_dir.exists():
return
cutoff = time.time() - (self.audio_retention_days * 86400)
for session_dir in self._audio_dir.iterdir():
if session_dir.is_dir() and session_dir.stat().st_mtime < cutoff:
for f in session_dir.iterdir():
f.unlink()
session_dir.rmdir()_update_status method · python · L249-L257 (9 LOC)tools/meeting-transcriber/meeting_transcriber/daemon.py
def _update_status(self, **kwargs) -> None:
    """Refresh the persisted daemon status file.

    Loads the stored status, stamps it with the current PID and check time,
    applies each keyword argument as an attribute override, syncs the
    recorded-chunk count from the recorder (if one is set) and saves it back.
    """
    snapshot = DaemonStatus.load(self._status_file)
    snapshot.pid = os.getpid()
    snapshot.last_check = datetime.now().isoformat()

    # Caller-supplied fields are applied after the stamped values above.
    for field_name, value in kwargs.items():
        setattr(snapshot, field_name, value)

    recorder = self._recorder
    if recorder:
        # Applied after kwargs, so the recorder's chunk count always wins.
        snapshot.chunks_recorded = len(recorder.chunks)

    snapshot.save(self._status_file)
# Next listing entry: _handle_signal method · python · L259-L261 (3 LOC) tools/meeting-transcriber/meeting_transcriber/daemon.py
def _handle_signal(self, signum, frame) -> None:
print(f"\nReceived signal {signum} — stopping...")
self._running = False_cleanup method · python · L263-L274 (12 LOC)tools/meeting-transcriber/meeting_transcriber/daemon.py
def _cleanup(self) -> None:
"""Final cleanup on daemon exit."""
if self._current_meeting:
# _on_meeting_end handles stopping the recorder internally
self._on_meeting_end()
elif self._recorder:
# No active meeting but recorder running — just stop it
self._recorder.stop()
if self._pid_file.exists():
self._pid_file.unlink()
self._update_status(running=False, meeting_active=False)
print("Daemon stopped.")_stitch_wav_chunks function · python · L277-L292 (16 LOC)tools/meeting-transcriber/meeting_transcriber/daemon.py
def _stitch_wav_chunks(chunk_paths: list[Path], output_path: Path) -> None:
"""Concatenate WAV chunks into a single file (standalone fallback)."""
import wave
valid_paths = [p for p in chunk_paths if p.exists()]
if not valid_paths:
return
with wave.open(str(valid_paths[0]), "rb") as first:
params = first.getparams()
with wave.open(str(output_path), "wb") as out:
out.setparams(params)
for chunk_path in valid_paths:
with wave.open(str(chunk_path), "rb") as chunk:
out.writeframes(chunk.readframes(chunk.getnframes()))DetectedMeeting class · python · L9-L15 (7 LOC)tools/meeting-transcriber/meeting_transcriber/detector_base.py
class DetectedMeeting:
    """A detected meeting process."""
    # Platform label taken from the MEETING_PATTERNS entry that matched.
    platform: str
    # Name of the process that matched the pattern.
    process_name: str
    # Window title associated with the process — presumably the title that
    # matched the pattern's title regex; verify against the detectors.
    window_title: str
    # PID of the matched process; is_still_active() looks the process up by it.
    pid: int
# Next listing entry: MacOSDetector class · python · L39-L86 (48 LOC) tools/meeting-transcriber/meeting_transcriber/_detector_macos.py
class MacOSDetector:
"""Meeting detection using macOS APIs."""
@staticmethod
def detect() -> DetectedMeeting | None:
"""Scan running processes for active meeting applications.
Returns the first detected meeting, or None if no meeting is active.
"""
for proc in psutil.process_iter(["pid", "name"]):
try:
name = proc.info["name"] or ""
pid = proc.info["pid"]
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
for proc_pattern, platform, title_pattern in MEETING_PATTERNS:
if not re.search(proc_pattern, name, re.IGNORECASE):
continue
if title_pattern:
window_title = _get_window_title(pid)
if not window_title or not re.search(title_pattern, window_title, re.IGNORECASE):
continue
return DetectedMeeting(
Want this analysis on your repo? https://repobility.com/scan/
detect method · python · L43-L77 (35 LOC)tools/meeting-transcriber/meeting_transcriber/_detector_macos.py
def detect() -> DetectedMeeting | None:
"""Scan running processes for active meeting applications.
Returns the first detected meeting, or None if no meeting is active.
"""
for proc in psutil.process_iter(["pid", "name"]):
try:
name = proc.info["name"] or ""
pid = proc.info["pid"]
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
for proc_pattern, platform, title_pattern in MEETING_PATTERNS:
if not re.search(proc_pattern, name, re.IGNORECASE):
continue
if title_pattern:
window_title = _get_window_title(pid)
if not window_title or not re.search(title_pattern, window_title, re.IGNORECASE):
continue
return DetectedMeeting(
platform=platform,
process_name=name,
is_still_active method · python · L80-L86 (7 LOC)tools/meeting-transcriber/meeting_transcriber/_detector_macos.py
def is_still_active(meeting: DetectedMeeting) -> bool:
    """Check if a previously detected meeting process is still running.

    Returns False when the process is gone, is a zombie, cannot be queried,
    or when the PID now belongs to a differently named process.
    """
    try:
        proc = psutil.Process(meeting.pid)
        if not proc.is_running() or proc.status() == psutil.STATUS_ZOMBIE:
            return False
        # A freshly created Process object cannot notice PID reuse by itself,
        # so confirm the PID still belongs to the process that was detected.
        # An empty recorded name disables the comparison.
        return not meeting.process_name or proc.name() == meeting.process_name
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return False
# Next listing entry: _get_window_title function · python · L89-L111 (23 LOC) tools/meeting-transcriber/meeting_transcriber/_detector_macos.py
def _get_window_title(pid: int) -> str:
    """Return a window title for ``pid`` using Quartz CGWindowList.

    Gives the first non-empty ``kCGWindowName`` among the listed on-screen
    windows owned by the process.  When Quartz is unavailable the process
    name is returned instead.  Any failure results in an empty string.
    """
    if _QUARTZ_AVAILABLE:
        try:
            on_screen = CGWindowListCopyWindowInfo(
                kCGWindowListOptionOnScreenOnly, kCGNullWindowID
            )
            owned_titles = (
                entry.get("kCGWindowName", "")
                for entry in on_screen
                if entry.get("kCGWindowOwnerPID", 0) == pid
            )
            return next((title for title in owned_titles if title), "")
        except Exception:
            return ""

    # Without Quartz, fall back to process name only
    try:
        return psutil.Process(pid).name()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return ""
# Next listing entry: detect_meeting function · python · L19-L23 (5 LOC) tools/meeting-transcriber/meeting_transcriber/detector.py
def detect_meeting() -> DetectedMeeting | None:
    """Return the meeting reported by the platform detector, if any.

    Gives None when no detector is available (``_Detector`` is None).
    """
    return None if _Detector is None else _Detector.detect()
# Next listing entry: is_meeting_still_active function · python · L26-L30 (5 LOC) tools/meeting-transcriber/meeting_transcriber/detector.py
def is_meeting_still_active(meeting: DetectedMeeting) -> bool:
    """Ask the platform detector whether ``meeting`` is still running.

    Gives False when no detector is available (``_Detector`` is None).
    """
    return False if _Detector is None else _Detector.is_still_active(meeting)
# Next listing entry: WindowsDetector class · python · L25-L72 (48 LOC) tools/meeting-transcriber/meeting_transcriber/_detector_windows.py
class WindowsDetector:
"""Meeting detection using Win32 APIs."""
@staticmethod
def detect() -> DetectedMeeting | None:
"""Scan running processes for active meeting applications.
Returns the first detected meeting, or None if no meeting is active.
"""
for proc in psutil.process_iter(["pid", "name"]):
try:
name = proc.info["name"] or ""
pid = proc.info["pid"]
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
for proc_pattern, platform, title_pattern in MEETING_PATTERNS:
if not re.search(proc_pattern, name, re.IGNORECASE):
continue
if title_pattern:
window_title = _get_window_title(pid)
if not window_title or not re.search(title_pattern, window_title, re.IGNORECASE):
continue
return DetectedMeeting(
detect method · python · L29-L63 (35 LOC)tools/meeting-transcriber/meeting_transcriber/_detector_windows.py
def detect() -> DetectedMeeting | None:
"""Scan running processes for active meeting applications.
Returns the first detected meeting, or None if no meeting is active.
"""
for proc in psutil.process_iter(["pid", "name"]):
try:
name = proc.info["name"] or ""
pid = proc.info["pid"]
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
for proc_pattern, platform, title_pattern in MEETING_PATTERNS:
if not re.search(proc_pattern, name, re.IGNORECASE):
continue
if title_pattern:
window_title = _get_window_title(pid)
if not window_title or not re.search(title_pattern, window_title, re.IGNORECASE):
continue
return DetectedMeeting(
platform=platform,
process_name=name,
is_still_active method · python · L66-L72 (7 LOC)tools/meeting-transcriber/meeting_transcriber/_detector_windows.py
def is_still_active(meeting: DetectedMeeting) -> bool:
    """Check if a previously detected meeting process is still running.

    Returns False when the process is gone, is a zombie, cannot be queried,
    or when the PID now belongs to a differently named process.
    """
    try:
        proc = psutil.Process(meeting.pid)
        if not proc.is_running() or proc.status() == psutil.STATUS_ZOMBIE:
            return False
        # A freshly created Process object cannot notice PID reuse by itself,
        # so confirm the PID still belongs to the process that was detected.
        # An empty recorded name disables the comparison.
        return not meeting.process_name or proc.name() == meeting.process_name
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return False
_get_window_title function · python · L75-L101 (27 LOC)tools/meeting-transcriber/meeting_transcriber/_detector_windows.py
def _get_window_title(pid: int) -> str:
"""Get the window title for a process via ctypes/user32."""
try:
import ctypes
from ctypes import wintypes
user32 = ctypes.windll.user32
titles: list[str] = []
def enum_callback(hwnd, _):
if not user32.IsWindowVisible(hwnd):
return True
window_pid = wintypes.DWORD()
user32.GetWindowThreadProcessId(hwnd, ctypes.byref(window_pid))
if window_pid.value == pid:
length = user32.GetWindowTextLengthW(hwnd)
if length > 0:
buf = ctypes.create_unicode_buffer(length + 1)
user32.GetWindowTextW(hwnd, buf, length + 1)
titles.append(buf.value)
return True
WNDENUMPROC = ctypes.WINFUNCTYPE(ctypes.c_bool, wintypes.HWND, wintypes.LPARAM)
user32.EnumWindows(WNDENUMPROC(enum_callback), 0)
return titles[0] if titles else ""
exSpeakerDiarizer class · python · L25-L127 (103 LOC)tools/meeting-transcriber/meeting_transcriber/diarizer.py
class SpeakerDiarizer:
"""Identifies speakers in audio and merges with transcript segments.
Uses pyannote-audio's pretrained speaker diarization pipeline.
Requires a HuggingFace auth token with access to pyannote/speaker-diarization-3.1.
"""
def __init__(self, hf_token: str | None = None):
if not DIARIZATION_AVAILABLE:
raise RuntimeError(
"Speaker diarization requires pyannote-audio. "
"Install with: uv pip install bizbrain-meetings[diarization]"
)
self.hf_token = hf_token
self._pipeline = None
def _load_pipeline(self):
if self._pipeline is not None:
return
self._pipeline = Pipeline.from_pretrained(
"pyannote/speaker-diarization-3.1",
use_auth_token=self.hf_token,
)
def diarize_and_merge(
self,
audio_path: Path,
segments: list[TranscriptSegment],
) -> list[SpeakerSegment]:
"""Run d__init__ method · python · L32-L39 (8 LOC)tools/meeting-transcriber/meeting_transcriber/diarizer.py
def __init__(self, hf_token: str | None = None):
    """Store the HuggingFace token; no pipeline is loaded yet.

    Raises:
        RuntimeError: when ``DIARIZATION_AVAILABLE`` is false.
    """
    if DIARIZATION_AVAILABLE:
        self.hf_token = hf_token
        self._pipeline = None
        return

    message = (
        "Speaker diarization requires pyannote-audio. "
        "Install with: uv pip install bizbrain-meetings[diarization]"
    )
    raise RuntimeError(message)
# Next listing entry: _load_pipeline method · python · L41-L47 (7 LOC) tools/meeting-transcriber/meeting_transcriber/diarizer.py
def _load_pipeline(self):
    """Load the pyannote diarization pipeline on first use.

    Does nothing when a pipeline is already cached on the instance.

    Raises:
        RuntimeError: if ``Pipeline.from_pretrained`` yields no pipeline.
    """
    if self._pipeline is not None:
        return
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token=self.hf_token,
    )
    if pipeline is None:
        # NOTE(review): pyannote.audio 3.x appears to return None (after
        # printing a hint) when the gated model cannot be downloaded.  Fail
        # here with a clear message rather than later with
        # "'NoneType' object is not callable".
        raise RuntimeError(
            "Could not load pyannote/speaker-diarization-3.1. Check that the "
            "HuggingFace token is valid and that the model's user conditions "
            "have been accepted."
        )
    self._pipeline = pipeline
# Next listing entry: diarize_and_merge method · python · L49-L63 (15 LOC) tools/meeting-transcriber/meeting_transcriber/diarizer.py
def diarize_and_merge(
    self,
    audio_path: Path,
    segments: list[TranscriptSegment],
) -> list[SpeakerSegment]:
    """Diarize one audio file and attach speaker labels to ``segments``.

    The pipeline is loaded on demand, run on ``audio_path``, and its result
    is turned into a speaker timeline that is merged with the transcript
    segments (each segment gets the speaker with the most overlap).
    """
    self._load_pipeline()
    annotation = self._pipeline(str(audio_path))
    return _merge_speakers(segments, _build_speaker_timeline(annotation))
# Next listing entry: diarize_full_meeting method · python · L65-L109 (45 LOC) tools/meeting-transcriber/meeting_transcriber/diarizer.py
def diarize_full_meeting(
self,
chunk_paths: list[Path],
segments: list[TranscriptSegment],
tmp_dir: Path | None = None,
) -> list[SpeakerSegment]:
"""Stitch all audio chunks and run diarization on the full meeting.
Fixes the single-chunk diarization bug by concatenating all WAV chunks
into one temp file, running pyannote on the complete audio, then cleaning up.
"""
if not chunk_paths:
return [
SpeakerSegment(
start=s.start, end=s.end, text=s.text,
speaker="Unknown", language=s.language,
)
for s in segments
]
# Single chunk — no stitching needed
if len(chunk_paths) == 1:
return self.diarize_and_merge(chunk_paths[0], segments)
# Stitch all chunks into a single temp WAV
self._load_pipeline()
cleanup_tmp_dir = tmp_dir is None
if tmp_dir i_find_dominant_speaker method · python · L112-L127 (16 LOC)tools/meeting-transcriber/meeting_transcriber/diarizer.py
def _find_dominant_speaker(
start: float,
end: float,
timeline: list[tuple[float, float, str]],
) -> str:
"""Find which speaker talked most during [start, end]."""
overlap: dict[str, float] = {}
for t_start, t_end, speaker in timeline:
o_start = max(start, t_start)
o_end = min(end, t_end)
if o_start < o_end:
overlap[speaker] = overlap.get(speaker, 0) + (o_end - o_start)
if not overlap:
return "Unknown"
return max(overlap, key=overlap.get)_build_speaker_timeline function · python · L130-L135 (6 LOC)tools/meeting-transcriber/meeting_transcriber/diarizer.py
def _build_speaker_timeline(diarization) -> list[tuple[float, float, str]]:
"""Extract speaker timeline from pyannote diarization result."""
timeline: list[tuple[float, float, str]] = []
for turn, _, speaker in diarization.itertracks(yield_label=True):
timeline.append((turn.start, turn.end, speaker))
return timelinePowered by Repobility — scan your code at https://repobility.com
_merge_speakers function · python · L138-L155 (18 LOC)tools/meeting-transcriber/meeting_transcriber/diarizer.py
def _merge_speakers(
    segments: list[TranscriptSegment],
    speaker_timeline: list[tuple[float, float, str]],
) -> list[SpeakerSegment]:
    """Build one SpeakerSegment per transcript segment, labelled with the
    speaker that dominates the segment's time range.

    Start, end, text and language are copied from the source segment.
    """
    return [
        SpeakerSegment(
            start=seg.start,
            end=seg.end,
            text=seg.text,
            speaker=SpeakerDiarizer._find_dominant_speaker(
                seg.start, seg.end, speaker_timeline
            ),
            language=seg.language,
        )
        for seg in segments
    ]
# Next listing entry: _stitch_wav_files function · python · L158-L176 (19 LOC) tools/meeting-transcriber/meeting_transcriber/diarizer.py
def _stitch_wav_files(chunk_paths: list[Path], output_path: Path) -> None:
"""Concatenate multiple WAV files into a single file.
All chunks must have the same sample rate, channels, and sample width.
"""
if not chunk_paths:
return
# Read params from first chunk
with wave.open(str(chunk_paths[0]), "rb") as first:
params = first.getparams()
with wave.open(str(output_path), "wb") as out:
out.setparams(params)
for chunk_path in chunk_paths:
if not chunk_path.exists():
continue
with wave.open(str(chunk_path), "rb") as chunk:
out.writeframes(chunk.readframes(chunk.getnframes()))format_timestamp function · python · L13-L20 (8 LOC)tools/meeting-transcriber/meeting_transcriber/formatter.py
def format_timestamp(seconds: float) -> str:
    """Format a time offset in seconds as ``MM:SS``, or ``HH:MM:SS`` from one hour up.

    Fractional seconds are truncated.  Negative input is clamped to zero so
    the result is always a well-formed timestamp.
    """
    total = max(0, int(seconds))
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    if hours > 0:
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"
    return f"{minutes:02d}:{secs:02d}"
# Next listing entry: format_transcript_markdown function · python · L23-L72 (50 LOC) tools/meeting-transcriber/meeting_transcriber/formatter.py
def format_transcript_markdown(
meeting: MeetingInfo,
segments: list[TranscriptSegment | SpeakerSegment],
) -> str:
"""Generate brain-compatible markdown transcript.
Output format matches the brain's entity/project file conventions:
YAML-style header, then structured content.
"""
lines = [
f"# Meeting Transcript — {meeting.title}",
"",
f"**Platform:** {meeting.platform}",
f"**Date:** {meeting.started_at.strftime('%Y-%m-%d')}",
f"**Started:** {meeting.started_at.strftime('%H:%M')}",
]
if meeting.ended_at:
lines.append(f"**Ended:** {meeting.ended_at.strftime('%H:%M')}")
lines.append(f"**Duration:** {meeting.duration_minutes:.0f} minutes")
# Include recording reference if available
if meeting.recording_path:
rel_path = meeting.recording_path
# Try to make path relative to brain
try:
rel_path = meeting.recording_path.relative_to(meeting.recording_path.pformat_metadata_json function · python · L75-L115 (41 LOC)tools/meeting-transcriber/meeting_transcriber/formatter.py
def format_metadata_json(
meeting: MeetingInfo,
segments: list[TranscriptSegment | SpeakerSegment],
detected_entities: list[str] | None = None,
) -> dict:
"""Generate JSON metadata sidecar for brain integration."""
speakers = set()
if segments and isinstance(segments[0], SpeakerSegment):
for seg in segments:
if isinstance(seg, SpeakerSegment):
speakers.add(seg.speaker)
word_count = sum(len(seg.text.split()) for seg in segments)
meta = {
"type": "meeting-transcript",
"version": "2.0.0",
"meeting": {
"platform": meeting.platform,
"title": meeting.title,
"date": meeting.started_at.strftime("%Y-%m-%d"),
"started_at": meeting.started_at.isoformat(),
"ended_at": meeting.ended_at.isoformat() if meeting.ended_at else None,
"duration_minutes": round(meeting.duration_minutes, 1),
},
"transcript": {
"segmeformat_intake_summary function · python · L118-L197 (80 LOC)tools/meeting-transcriber/meeting_transcriber/formatter.py
def format_intake_summary(
meeting: MeetingInfo,
segments: list[TranscriptSegment | SpeakerSegment],
brain_path: Path | None = None,
) -> str:
"""Generate an enriched intake summary for the brain's intake system.
Dropped into _intake-dump/files/ for entity linking, action item extraction,
and AI summarization. Includes the full transcript text so Claude can generate
a proper summary without needing to read additional files.
"""
word_count = sum(len(seg.text.split()) for seg in segments)
full_text = "\n".join(
f"[{format_timestamp(seg.start)}] {seg.text}" for seg in segments
)
# Detect entities if brain path available
detected = []
if brain_path:
detected = _detect_entities_in_transcript(
segments, brain_path
)
date_str = meeting.started_at.strftime("%Y-%m-%d")
transcript_ref = f"Operations/meetings/transcripts/{date_str}-{meeting.slug}.md"
recording_ref = ""
if meeting.record_detect_entities_in_transcript function · python · L200-L251 (52 LOC)tools/meeting-transcriber/meeting_transcriber/formatter.py
def _detect_entities_in_transcript(
segments: list[TranscriptSegment | SpeakerSegment],
brain_path: Path,
) -> list[str]:
"""Scan transcript text against ENTITY-INDEX.md names and aliases.
Returns a list of entity names that appear in the transcript.
"""
entity_index = brain_path / "Operations" / "entity-watchdog" / "ENTITY-INDEX.md"
if not entity_index.exists():
return []
# Parse entity names and aliases from the index
try:
content = entity_index.read_text(encoding="utf-8")
except Exception:
return []
# Build keyword set from entity names
# The ENTITY-INDEX.md typically has lines like:
# | Entity Name | Type | Aliases | ...
keywords: dict[str, str] = {} # lowercase keyword → entity name
for line in content.splitlines():
if not line.startswith("|") or "---" in line or "Entity" in line:
continue
parts = [p.strip() for p in line.split("|")]
if len(parts) >= 3:
save_transcript function · python · L254-L291 (38 LOC)tools/meeting-transcriber/meeting_transcriber/formatter.py
def save_transcript(
brain_path: Path,
meeting: MeetingInfo,
segments: list[TranscriptSegment | SpeakerSegment],
) -> Path:
"""Save transcript markdown, JSON metadata, and intake summary to brain.
Returns the path to the saved transcript markdown file.
"""
date_str = meeting.started_at.strftime("%Y-%m-%d")
filename = f"{date_str}-{meeting.slug}"
# Detect entities for metadata
detected_entities = _detect_entities_in_transcript(segments, brain_path)
# Transcript directory
transcript_dir = brain_path / "Operations" / "meetings" / "transcripts"
transcript_dir.mkdir(parents=True, exist_ok=True)
# Save markdown transcript
md_path = transcript_dir / f"{filename}.md"
md_path.write_text(format_transcript_markdown(meeting, segments), encoding="utf-8")
# Save JSON metadata sidecar
meta_path = transcript_dir / f"{filename}.meta.json"
meta = format_metadata_json(meeting, segments, detected_entities=detected_entities)
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
MeetingInfo class · python · L12-L44 (33 LOC)tools/meeting-transcriber/meeting_transcriber/models.py
class MeetingInfo:
"""Metadata about a detected or active meeting."""
platform: str # zoom, meet, teams, slack, discord, unknown
title: str
started_at: datetime
ended_at: datetime | None = None
process_name: str = ""
window_title: str = ""
audio_chunks: list[Path] = field(default_factory=list)
transcript_path: Path | None = None
recording_path: Path | None = None
@property
def slug(self) -> str:
"""URL-safe slug for filenames."""
safe = self.title.lower().replace(" ", "-")
safe = "".join(c for c in safe if c.isalnum() or c == "-")
return safe[:60] or self.platform
@property
def duration_minutes(self) -> float:
end = self.ended_at or datetime.now()
return (end - self.started_at).total_seconds() / 60
def to_dict(self) -> dict:
d = asdict(self)
d["started_at"] = self.started_at.isoformat()
d["ended_at"] = self.ended_at.isoformat() if self.ended_at else Nto_dict method · python · L37-L44 (8 LOC)tools/meeting-transcriber/meeting_transcriber/models.py
def to_dict(self) -> dict:
    """Return the fields as a dict with datetimes and paths turned into strings.

    ``started_at``/``ended_at`` become ISO-format strings, ``audio_chunks``
    a list of strings, and the two optional paths strings; unset optional
    values stay None.
    """
    def _text_or_none(value):
        return str(value) if value else None

    finished = self.ended_at
    payload = asdict(self)
    payload.update(
        started_at=self.started_at.isoformat(),
        ended_at=finished.isoformat() if finished else None,
        audio_chunks=[str(chunk) for chunk in self.audio_chunks],
        transcript_path=_text_or_none(self.transcript_path),
        recording_path=_text_or_none(self.recording_path),
    )
    return payload
# Next listing entry: TranscriptSegment class · python · L48-L55 (8 LOC) tools/meeting-transcriber/meeting_transcriber/models.py
class TranscriptSegment:
    """A single transcribed segment from Whisper."""
    start: float  # seconds
    # End of the segment — presumably seconds on the same time base as
    # ``start``; it is compared against diarization turn times.
    end: float
    # Transcribed text of the segment.
    text: str
    # Language label for the segment; defaults to "en".
    language: str = "en"
    # NOTE(review): presumably a confidence value reported by the
    # transcriber — confirm where it is populated.
    probability: float = 0.0
# Next listing entry: SpeakerSegment class · python · L59-L66 (8 LOC) tools/meeting-transcriber/meeting_transcriber/models.py
class SpeakerSegment:
    """A transcript segment with speaker attribution."""
    # Start/end of the segment, copied from the source TranscriptSegment by
    # _merge_speakers (presumably seconds, like TranscriptSegment.start).
    start: float
    end: float
    # Transcribed text of the segment.
    text: str
    # Speaker label; "Unknown" is also what _find_dominant_speaker returns
    # when no diarization turn overlaps the segment.
    speaker: str = "Unknown"
    # Language label for the segment; defaults to "en".
    language: str = "en"
# Next listing entry: DaemonStatus class · python · L70-L113 (44 LOC) tools/meeting-transcriber/meeting_transcriber/models.py
class DaemonStatus:
"""Status of the meeting transcription daemon."""
running: bool = False
pid: int | None = None
meeting_active: bool = False
current_meeting: MeetingInfo | None = None
chunks_recorded: int = 0
chunks_transcribed: int = 0
started_at: str | None = None
last_check: str | None = None
def save(self, path: Path) -> None:
data = {
"running": self.running,
"pid": self.pid,
"meeting_active": self.meeting_active,
"chunks_recorded": self.chunks_recorded,
"chunks_transcribed": self.chunks_transcribed,
"started_at": self.started_at,
"last_check": self.last_check,
}
if self.current_meeting:
data["current_meeting"] = self.current_meeting.to_dict()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(data, indent=2))
@classmethod
def load(cls, path: Path) -> DaemonStatus:
save method · python · L82-L95 (14 LOC)tools/meeting-transcriber/meeting_transcriber/models.py
def save(self, path: Path) -> None:
    """Write the status to ``path`` as indented JSON.

    Parent directories are created when missing.  ``current_meeting`` is
    only included when set, serialized through its ``to_dict()``.
    """
    scalar_fields = (
        "running",
        "pid",
        "meeting_active",
        "chunks_recorded",
        "chunks_transcribed",
        "started_at",
        "last_check",
    )
    payload = {name: getattr(self, name) for name in scalar_fields}

    meeting = self.current_meeting
    if meeting:
        payload["current_meeting"] = meeting.to_dict()

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, indent=2))
# Next listing entry: load method · python · L98-L113 (16 LOC) tools/meeting-transcriber/meeting_transcriber/models.py
def load(cls, path: Path) -> DaemonStatus:
    """Load a status from the JSON file at ``path``.

    Returns a default instance when the file is missing or unreadable, is
    not valid JSON, or does not contain a JSON object.  Missing keys fall
    back to their defaults.  ``current_meeting`` is not restored.
    """
    try:
        data = json.loads(path.read_text())
    except (OSError, ValueError):
        # OSError: missing or unreadable file (no separate exists() check,
        # so the file vanishing mid-way is covered too).
        # ValueError: covers json.JSONDecodeError and UnicodeDecodeError.
        return cls()
    if not isinstance(data, dict):
        # Valid JSON that is not an object (e.g. a list) has no .get().
        return cls()
    return cls(
        running=data.get("running", False),
        pid=data.get("pid"),
        meeting_active=data.get("meeting_active", False),
        chunks_recorded=data.get("chunks_recorded", 0),
        chunks_transcribed=data.get("chunks_transcribed", 0),
        started_at=data.get("started_at"),
        last_check=data.get("last_check"),
    )
# Next listing entry: _find_blackhole_device function · python · L17-L24 (8 LOC) tools/meeting-transcriber/meeting_transcriber/_recorder_macos.py
def _find_blackhole_device() -> int | None:
    """Return the index of the first BlackHole device usable as an input.

    A device qualifies when its name contains "blackhole" (case-insensitive)
    and it reports at least one input channel.  Gives None if none does.
    """
    for index, info in enumerate(sd.query_devices()):
        is_blackhole = "blackhole" in info["name"].lower()
        if is_blackhole and info["max_input_channels"] > 0:
            return index
    return None
BlackHoleRecorder class · python · L27-L144 (118 LOC)tools/meeting-transcriber/meeting_transcriber/_recorder_macos.py
class BlackHoleRecorder:
"""Records system audio via BlackHole virtual audio device on macOS.
Requires BlackHole (https://existential.audio/blackhole/) to be installed
and configured as part of a Multi-Output Device in Audio MIDI Setup so that
system audio is routed to both speakers and BlackHole simultaneously.
"""
def __init__(self, output_dir: Path, chunk_seconds: int = CHUNK_DURATION_SEC):
self.output_dir = output_dir
self.chunk_seconds = chunk_seconds
self._recording = False
self._thread: threading.Thread | None = None
self._chunks: list[Path] = []
self._lock = threading.Lock()
@property
def chunks(self) -> list[Path]:
with self._lock:
return list(self._chunks)
def start(self) -> None:
"""Start recording system audio in background thread."""
if self._recording:
return
self.output_dir.mkdir(parents=True, exist_ok=True)
self._recordi__init__ method · python · L35-L41 (7 LOC)tools/meeting-transcriber/meeting_transcriber/_recorder_macos.py
def __init__(self, output_dir: Path, chunk_seconds: int = CHUNK_DURATION_SEC):
    """Set up recorder state; nothing is recorded until ``start()`` is called."""
    # Configuration
    self.chunk_seconds = chunk_seconds
    self.output_dir = output_dir

    # Background-thread state
    self._thread: threading.Thread | None = None
    self._recording = False

    # Recorded chunk paths; accessed under ``_lock``
    self._lock = threading.Lock()
    self._chunks: list[Path] = []
# Next listing entry: start method · python · L48-L55 (8 LOC) tools/meeting-transcriber/meeting_transcriber/_recorder_macos.py
def start(self) -> None:
    """Begin recording system audio on a background daemon thread.

    Does nothing when the recorder is already flagged as recording.
    The output directory is created if needed.
    """
    if not self._recording:
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self._recording = True
        worker = threading.Thread(target=self._record_loop, daemon=True)
        self._thread = worker
        worker.start()
# Next listing entry: stop method · python · L57-L63 (7 LOC) tools/meeting-transcriber/meeting_transcriber/_recorder_macos.py
def stop(self) -> list[Path]:
    """Clear the recording flag, wait for the worker, and return the chunks.

    The worker thread is joined for at most 10 seconds and then forgotten.
    """
    self._recording = False
    worker = self._thread
    if worker:
        worker.join(timeout=10)
        self._thread = None
    return self.chunks
# Next listing entry: _record_loop method · python · L65-L82 (18 LOC) tools/meeting-transcriber/meeting_transcriber/_recorder_macos.py
def _record_loop(self) -> None:
    """Main recording loop — captures audio in chunks.

    Used as the target of the background thread created by ``start()``.
    Records consecutive ``chunk_NNNN.wav`` files into ``output_dir`` and
    registers each path until the ``_recording`` flag is cleared.  If no
    BlackHole device is found, an error is printed, the flag is cleared and
    the loop exits without recording.
    """
    device_idx = _find_blackhole_device()
    if device_idx is None:
        print(
            "Error: BlackHole audio device not found.\n"
            "Install BlackHole: https://existential.audio/blackhole/\n"
            "Then run: bizbrain-meetings setup"
        )
        # Nothing is being recorded: clear the flag so the recorder does not
        # claim to be active and a later start() is not turned into a no-op.
        self._recording = False
        return
    chunk_idx = 0
    while self._recording:
        chunk_path = self.output_dir / f"chunk_{chunk_idx:04d}.wav"
        self._record_chunk(device_idx, chunk_path)
        with self._lock:
            self._chunks.append(chunk_path)
        chunk_idx += 1
# Next listing entry: _record_chunk method · python · L84-L144 (61 LOC) tools/meeting-transcriber/meeting_transcriber/_recorder_macos.py
def _record_chunk(self, device_idx: int, output_path: Path) -> None:
"""Record a single chunk of audio to a WAV file."""
device_info = sd.query_devices(device_idx)
device_rate = int(device_info["default_samplerate"])
device_channels = min(device_info["max_input_channels"], 2)
# Calculate total frames for this chunk
total_samples = device_rate * self.chunk_seconds
# Record the chunk (blocking, but we check _recording via callback)
frames_collected: list[np.ndarray] = []
samples_so_far = 0
block_size = 1024
def callback(indata, frames, time_info, status):
nonlocal samples_so_far
if not self._recording:
raise sd.CallbackAbort
frames_collected.append(indata.copy())
samples_so_far += frames
try:
with sd.InputStream(
samplerate=device_rate,
channels=device_channels,
dWASAPILoopbackRecorder class · python · L18-L156 (139 LOC)tools/meeting-transcriber/meeting_transcriber/_recorder_windows.py
class WASAPILoopbackRecorder:
"""Records system audio via WASAPI loopback into WAV chunks.
Uses pyaudiowpatch to capture whatever is playing through the system's
default output device. Windows-only.
"""
def __init__(self, output_dir: Path, chunk_seconds: int = CHUNK_DURATION_SEC):
self.output_dir = output_dir
self.chunk_seconds = chunk_seconds
self._recording = False
self._thread: threading.Thread | None = None
self._chunks: list[Path] = []
self._lock = threading.Lock()
@property
def chunks(self) -> list[Path]:
with self._lock:
return list(self._chunks)
def start(self) -> None:
"""Start recording system audio in background thread."""
if self._recording:
return
self.output_dir.mkdir(parents=True, exist_ok=True)
self._recording = True
self._thread = threading.Thread(target=self._record_loop, daemon=True)
self._thread.start()
__init__ method · python · L25-L31 (7 LOC)tools/meeting-transcriber/meeting_transcriber/_recorder_windows.py
def __init__(self, output_dir: Path, chunk_seconds: int = CHUNK_DURATION_SEC):
    """Set up recorder state; nothing is recorded until ``start()`` is called."""
    # Configuration
    self.chunk_seconds = chunk_seconds
    self.output_dir = output_dir

    # Background-thread state
    self._thread: threading.Thread | None = None
    self._recording = False

    # Recorded chunk paths; accessed under ``_lock``
    self._lock = threading.Lock()
    self._chunks: list[Path] = []
start method · python · L38-L45 (8 LOC)tools/meeting-transcriber/meeting_transcriber/_recorder_windows.py
def start(self) -> None:
    """Begin recording system audio on a background daemon thread.

    Does nothing when the recorder is already flagged as recording.
    The output directory is created if needed.
    """
    if not self._recording:
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self._recording = True
        worker = threading.Thread(target=self._record_loop, daemon=True)
        self._thread = worker
        worker.start()
# Next listing entry: stop method · python · L47-L53 (7 LOC) tools/meeting-transcriber/meeting_transcriber/_recorder_windows.py
def stop(self) -> list[Path]:
    """Clear the recording flag, wait for the worker, and return the chunks.

    The worker thread is joined for at most 10 seconds and then forgotten.
    """
    self._recording = False
    worker = self._thread
    if worker:
        worker.join(timeout=10)
        self._thread = None
    return self.chunks
# Next listing entry: _record_loop method · python · L55-L74 (20 LOC) tools/meeting-transcriber/meeting_transcriber/_recorder_windows.py
def _record_loop(self) -> None:
    """Main recording loop — captures audio in chunks.

    Used as the target of the background thread created by ``start()``.
    Records consecutive ``chunk_NNNN.wav`` files into ``output_dir`` and
    registers each path until the ``_recording`` flag is cleared.  If no
    loopback device is found, an error is printed, the flag is cleared and
    the loop exits without recording.  The PyAudio instance is always
    terminated on exit.
    """
    import pyaudiowpatch as pyaudio

    pa = pyaudio.PyAudio()
    try:
        device = self._find_loopback_device(pa)
        if device is None:
            print("Error: No WASAPI loopback device found.")
            # Nothing is being recorded: clear the flag so the recorder does
            # not claim to be active and a later start() is not a no-op.
            self._recording = False
            return
        chunk_idx = 0
        while self._recording:
            chunk_path = self.output_dir / f"chunk_{chunk_idx:04d}.wav"
            self._record_chunk(pa, device, chunk_path)
            with self._lock:
                self._chunks.append(chunk_path)
            chunk_idx += 1
    finally:
        pa.terminate()