← back to drzachconner__bizbrain-os-plugin

Function bodies 108 total

All specs Real LLM only Function bodies
_cleanup_old_audio method · python · L233-L247 (15 LOC)
tools/meeting-transcriber/meeting_transcriber/daemon.py
    def _cleanup_old_audio(self) -> None:
        """Delete audio chunks older than retention period.

        When audio_retention_days is None, audio is kept forever.
        """
        if self.audio_retention_days is None:
            return
        if not self._audio_dir.exists():
            return
        cutoff = time.time() - (self.audio_retention_days * 86400)
        for session_dir in self._audio_dir.iterdir():
            if session_dir.is_dir() and session_dir.stat().st_mtime < cutoff:
                for f in session_dir.iterdir():
                    f.unlink()
                session_dir.rmdir()
_update_status method · python · L249-L257 (9 LOC)
tools/meeting-transcriber/meeting_transcriber/daemon.py
    def _update_status(self, **kwargs) -> None:
        """Refresh the persisted daemon status file.

        Loads the status stored at ``self._status_file``, stamps it with the
        current process ID and the current local time, applies every keyword
        argument as an attribute override, and writes it back. When a recorder
        is present, ``chunks_recorded`` is set from its chunk list last, so it
        wins over a same-named keyword argument.
        """
        snapshot = DaemonStatus.load(self._status_file)
        snapshot.pid = os.getpid()
        snapshot.last_check = datetime.now().isoformat()
        # Caller-supplied overrides are applied after the automatic stamps.
        for field_name in kwargs:
            setattr(snapshot, field_name, kwargs[field_name])
        recorder = self._recorder
        if recorder:
            snapshot.chunks_recorded = len(recorder.chunks)
        snapshot.save(self._status_file)
_handle_signal method · python · L259-L261 (3 LOC)
tools/meeting-transcriber/meeting_transcriber/daemon.py
    def _handle_signal(self, signum, frame) -> None:
        print(f"\nReceived signal {signum} — stopping...")
        self._running = False
_cleanup method · python · L263-L274 (12 LOC)
tools/meeting-transcriber/meeting_transcriber/daemon.py
    def _cleanup(self) -> None:
        """Final cleanup on daemon exit."""
        if self._current_meeting:
            # _on_meeting_end handles stopping the recorder internally
            self._on_meeting_end()
        elif self._recorder:
            # No active meeting but recorder running — just stop it
            self._recorder.stop()
        if self._pid_file.exists():
            self._pid_file.unlink()
        self._update_status(running=False, meeting_active=False)
        print("Daemon stopped.")
_stitch_wav_chunks function · python · L277-L292 (16 LOC)
tools/meeting-transcriber/meeting_transcriber/daemon.py
def _stitch_wav_chunks(chunk_paths: list[Path], output_path: Path) -> None:
    """Concatenate WAV chunks into a single file (standalone fallback)."""
    import wave

    valid_paths = [p for p in chunk_paths if p.exists()]
    if not valid_paths:
        return

    with wave.open(str(valid_paths[0]), "rb") as first:
        params = first.getparams()

    with wave.open(str(output_path), "wb") as out:
        out.setparams(params)
        for chunk_path in valid_paths:
            with wave.open(str(chunk_path), "rb") as chunk:
                out.writeframes(chunk.readframes(chunk.getnframes()))
DetectedMeeting class · python · L9-L15 (7 LOC)
tools/meeting-transcriber/meeting_transcriber/detector_base.py
class DetectedMeeting:
    """A detected meeting process.

    Built by the platform detectors' ``detect()`` and later handed to
    ``is_still_active()`` to check whether the process is still running.
    """

    # Platform label taken from the MEETING_PATTERNS entry that matched.
    platform: str
    # Name of the matched process as reported by psutil.
    process_name: str
    # Title of a window owned by the process
    # (presumably the one matched against the title pattern; confirm against detect()).
    window_title: str
    # Process ID; used to look the process up again later.
    pid: int
MacOSDetector class · python · L39-L86 (48 LOC)
tools/meeting-transcriber/meeting_transcriber/_detector_macos.py
class MacOSDetector:
    """Meeting detection using macOS APIs."""

    @staticmethod
    def detect() -> DetectedMeeting | None:
        """Scan running processes for active meeting applications.

        Returns the first detected meeting, or None if no meeting is active.
        """
        for proc in psutil.process_iter(["pid", "name"]):
            try:
                name = proc.info["name"] or ""
                pid = proc.info["pid"]
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

            for proc_pattern, platform, title_pattern in MEETING_PATTERNS:
                if not re.search(proc_pattern, name, re.IGNORECASE):
                    continue

                if title_pattern:
                    window_title = _get_window_title(pid)
                    if not window_title or not re.search(title_pattern, window_title, re.IGNORECASE):
                        continue
                    return DetectedMeeting(
                 
Want this analysis on your repo? https://repobility.com/scan/
detect method · python · L43-L77 (35 LOC)
tools/meeting-transcriber/meeting_transcriber/_detector_macos.py
    def detect() -> DetectedMeeting | None:
        """Scan running processes for active meeting applications.

        Returns the first detected meeting, or None if no meeting is active.
        """
        for proc in psutil.process_iter(["pid", "name"]):
            try:
                name = proc.info["name"] or ""
                pid = proc.info["pid"]
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

            for proc_pattern, platform, title_pattern in MEETING_PATTERNS:
                if not re.search(proc_pattern, name, re.IGNORECASE):
                    continue

                if title_pattern:
                    window_title = _get_window_title(pid)
                    if not window_title or not re.search(title_pattern, window_title, re.IGNORECASE):
                        continue
                    return DetectedMeeting(
                        platform=platform,
                        process_name=name,
                 
is_still_active method · python · L80-L86 (7 LOC)
tools/meeting-transcriber/meeting_transcriber/_detector_macos.py
    def is_still_active(meeting: DetectedMeeting) -> bool:
        """Report whether the process behind a detected meeting is still alive.

        Returns False when the process no longer exists, cannot be inspected,
        is not running, or is a zombie.
        """
        try:
            process = psutil.Process(meeting.pid)
            if not process.is_running():
                return False
            # A zombie still has a PID but is no longer a live meeting app.
            return process.status() != psutil.STATUS_ZOMBIE
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return False
_get_window_title function · python · L89-L111 (23 LOC)
tools/meeting-transcriber/meeting_transcriber/_detector_macos.py
def _get_window_title(pid: int) -> str:
    """Return a window title for the process ``pid`` using Quartz CGWindowList.

    With Quartz available, the on-screen window list is scanned and the first
    non-empty title of a window owned by ``pid`` is returned; any failure
    yields an empty string. Without Quartz, the process name is returned
    instead, or an empty string if the process cannot be inspected.
    """
    if _QUARTZ_AVAILABLE:
        try:
            windows = CGWindowListCopyWindowInfo(
                kCGWindowListOptionOnScreenOnly, kCGNullWindowID
            )
            for entry in windows:
                if entry.get("kCGWindowOwnerPID", 0) != pid:
                    continue
                name = entry.get("kCGWindowName", "")
                if name:
                    return name
            return ""
        except Exception:
            return ""

    # Without Quartz, fall back to the process name only.
    try:
        return psutil.Process(pid).name()
    except (psutil.NoSuchProcess, psutil.AccessDenied):
        return ""
detect_meeting function · python · L19-L23 (5 LOC)
tools/meeting-transcriber/meeting_transcriber/detector.py
def detect_meeting() -> DetectedMeeting | None:
    """Return the meeting found by the platform detector, if any.

    Returns None when no detector is available or it finds no meeting.
    """
    return None if _Detector is None else _Detector.detect()
is_meeting_still_active function · python · L26-L30 (5 LOC)
tools/meeting-transcriber/meeting_transcriber/detector.py
def is_meeting_still_active(meeting: DetectedMeeting) -> bool:
    """Ask the platform detector whether ``meeting`` is still running.

    Returns False when no detector is available.
    """
    return False if _Detector is None else _Detector.is_still_active(meeting)
WindowsDetector class · python · L25-L72 (48 LOC)
tools/meeting-transcriber/meeting_transcriber/_detector_windows.py
class WindowsDetector:
    """Meeting detection using Win32 APIs."""

    @staticmethod
    def detect() -> DetectedMeeting | None:
        """Scan running processes for active meeting applications.

        Returns the first detected meeting, or None if no meeting is active.
        """
        for proc in psutil.process_iter(["pid", "name"]):
            try:
                name = proc.info["name"] or ""
                pid = proc.info["pid"]
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

            for proc_pattern, platform, title_pattern in MEETING_PATTERNS:
                if not re.search(proc_pattern, name, re.IGNORECASE):
                    continue

                if title_pattern:
                    window_title = _get_window_title(pid)
                    if not window_title or not re.search(title_pattern, window_title, re.IGNORECASE):
                        continue
                    return DetectedMeeting(
               
detect method · python · L29-L63 (35 LOC)
tools/meeting-transcriber/meeting_transcriber/_detector_windows.py
    def detect() -> DetectedMeeting | None:
        """Scan running processes for active meeting applications.

        Returns the first detected meeting, or None if no meeting is active.
        """
        for proc in psutil.process_iter(["pid", "name"]):
            try:
                name = proc.info["name"] or ""
                pid = proc.info["pid"]
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                continue

            for proc_pattern, platform, title_pattern in MEETING_PATTERNS:
                if not re.search(proc_pattern, name, re.IGNORECASE):
                    continue

                if title_pattern:
                    window_title = _get_window_title(pid)
                    if not window_title or not re.search(title_pattern, window_title, re.IGNORECASE):
                        continue
                    return DetectedMeeting(
                        platform=platform,
                        process_name=name,
                 
is_still_active method · python · L66-L72 (7 LOC)
tools/meeting-transcriber/meeting_transcriber/_detector_windows.py
    def is_still_active(meeting: DetectedMeeting) -> bool:
        """Tell whether a previously detected meeting's process is still alive.

        A process that is gone, inaccessible, not running, or a zombie counts
        as inactive.
        """
        try:
            handle = psutil.Process(meeting.pid)
            alive = handle.is_running() and handle.status() != psutil.STATUS_ZOMBIE
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            alive = False
        return alive
Repobility (the analyzer behind this table) · https://repobility.com
_get_window_title function · python · L75-L101 (27 LOC)
tools/meeting-transcriber/meeting_transcriber/_detector_windows.py
def _get_window_title(pid: int) -> str:
    """Get the window title for a process via ctypes/user32."""
    try:
        import ctypes
        from ctypes import wintypes

        user32 = ctypes.windll.user32
        titles: list[str] = []

        def enum_callback(hwnd, _):
            if not user32.IsWindowVisible(hwnd):
                return True
            window_pid = wintypes.DWORD()
            user32.GetWindowThreadProcessId(hwnd, ctypes.byref(window_pid))
            if window_pid.value == pid:
                length = user32.GetWindowTextLengthW(hwnd)
                if length > 0:
                    buf = ctypes.create_unicode_buffer(length + 1)
                    user32.GetWindowTextW(hwnd, buf, length + 1)
                    titles.append(buf.value)
            return True

        WNDENUMPROC = ctypes.WINFUNCTYPE(ctypes.c_bool, wintypes.HWND, wintypes.LPARAM)
        user32.EnumWindows(WNDENUMPROC(enum_callback), 0)
        return titles[0] if titles else ""
    ex
SpeakerDiarizer class · python · L25-L127 (103 LOC)
tools/meeting-transcriber/meeting_transcriber/diarizer.py
class SpeakerDiarizer:
    """Identifies speakers in audio and merges with transcript segments.

    Uses pyannote-audio's pretrained speaker diarization pipeline.
    Requires a HuggingFace auth token with access to pyannote/speaker-diarization-3.1.
    """

    def __init__(self, hf_token: str | None = None):
        if not DIARIZATION_AVAILABLE:
            raise RuntimeError(
                "Speaker diarization requires pyannote-audio. "
                "Install with: uv pip install bizbrain-meetings[diarization]"
            )
        self.hf_token = hf_token
        self._pipeline = None

    def _load_pipeline(self):
        if self._pipeline is not None:
            return
        self._pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1",
            use_auth_token=self.hf_token,
        )

    def diarize_and_merge(
        self,
        audio_path: Path,
        segments: list[TranscriptSegment],
    ) -> list[SpeakerSegment]:
        """Run d
__init__ method · python · L32-L39 (8 LOC)
tools/meeting-transcriber/meeting_transcriber/diarizer.py
    def __init__(self, hf_token: str | None = None):
        """Store the HuggingFace token; the pipeline itself is loaded lazily.

        Raises:
            RuntimeError: If DIARIZATION_AVAILABLE is false.
        """
        if not DIARIZATION_AVAILABLE:
            message = (
                "Speaker diarization requires pyannote-audio. "
                "Install with: uv pip install bizbrain-meetings[diarization]"
            )
            raise RuntimeError(message)
        # Filled in by _load_pipeline() on first use.
        self._pipeline = None
        self.hf_token = hf_token
_load_pipeline method · python · L41-L47 (7 LOC)
tools/meeting-transcriber/meeting_transcriber/diarizer.py
    def _load_pipeline(self):
        """Load the pretrained pyannote diarization pipeline on first use.

        The loaded pipeline is cached on ``self._pipeline``; later calls return
        immediately. A failed load leaves the cache empty so a later call can
        retry.

        Raises:
            RuntimeError: If the pipeline could not be loaded.
        """
        if self._pipeline is not None:
            return
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1",
            use_auth_token=self.hf_token,
        )
        # pyannote.audio 3.x reports a failed download of a gated or private
        # pipeline by printing a hint and returning None rather than raising.
        # Fail here with a clear message instead of a later
        # "'NoneType' object is not callable".
        if pipeline is None:
            raise RuntimeError(
                "Could not load pyannote/speaker-diarization-3.1. "
                "Check the HuggingFace token and that the model's user "
                "conditions have been accepted."
            )
        self._pipeline = pipeline
diarize_and_merge method · python · L49-L63 (15 LOC)
tools/meeting-transcriber/meeting_transcriber/diarizer.py
    def diarize_and_merge(
        self,
        audio_path: Path,
        segments: list[TranscriptSegment],
    ) -> list[SpeakerSegment]:
        """Diarize one audio file and attach speakers to transcript segments.

        The pipeline is loaded if necessary and run on ``audio_path``. Each
        transcript segment is then labelled with the speaker who talked for
        the largest share of that segment's time range.
        """
        self._load_pipeline()
        annotation = self._pipeline(str(audio_path))
        return _merge_speakers(segments, _build_speaker_timeline(annotation))
diarize_full_meeting method · python · L65-L109 (45 LOC)
tools/meeting-transcriber/meeting_transcriber/diarizer.py
    def diarize_full_meeting(
        self,
        chunk_paths: list[Path],
        segments: list[TranscriptSegment],
        tmp_dir: Path | None = None,
    ) -> list[SpeakerSegment]:
        """Stitch all audio chunks and run diarization on the full meeting.

        Fixes the single-chunk diarization bug by concatenating all WAV chunks
        into one temp file, running pyannote on the complete audio, then cleaning up.
        """
        if not chunk_paths:
            return [
                SpeakerSegment(
                    start=s.start, end=s.end, text=s.text,
                    speaker="Unknown", language=s.language,
                )
                for s in segments
            ]

        # Single chunk — no stitching needed
        if len(chunk_paths) == 1:
            return self.diarize_and_merge(chunk_paths[0], segments)

        # Stitch all chunks into a single temp WAV
        self._load_pipeline()
        cleanup_tmp_dir = tmp_dir is None
        if tmp_dir i
_find_dominant_speaker method · python · L112-L127 (16 LOC)
tools/meeting-transcriber/meeting_transcriber/diarizer.py
    def _find_dominant_speaker(
        start: float,
        end: float,
        timeline: list[tuple[float, float, str]],
    ) -> str:
        """Find which speaker talked most during [start, end]."""
        overlap: dict[str, float] = {}
        for t_start, t_end, speaker in timeline:
            o_start = max(start, t_start)
            o_end = min(end, t_end)
            if o_start < o_end:
                overlap[speaker] = overlap.get(speaker, 0) + (o_end - o_start)

        if not overlap:
            return "Unknown"
        return max(overlap, key=overlap.get)
_build_speaker_timeline function · python · L130-L135 (6 LOC)
tools/meeting-transcriber/meeting_transcriber/diarizer.py
def _build_speaker_timeline(diarization) -> list[tuple[float, float, str]]:
    """Extract speaker timeline from pyannote diarization result."""
    timeline: list[tuple[float, float, str]] = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        timeline.append((turn.start, turn.end, speaker))
    return timeline
Powered by Repobility — scan your code at https://repobility.com
_merge_speakers function · python · L138-L155 (18 LOC)
tools/meeting-transcriber/meeting_transcriber/diarizer.py
def _merge_speakers(
    segments: list[TranscriptSegment],
    speaker_timeline: list[tuple[float, float, str]],
) -> list[SpeakerSegment]:
    """Label every transcript segment with its dominant speaker.

    Returns one SpeakerSegment per input segment, in the same order, carrying
    over the segment's start, end, text and language.
    """
    return [
        SpeakerSegment(
            start=segment.start,
            end=segment.end,
            text=segment.text,
            speaker=SpeakerDiarizer._find_dominant_speaker(
                segment.start, segment.end, speaker_timeline
            ),
            language=segment.language,
        )
        for segment in segments
    ]
_stitch_wav_files function · python · L158-L176 (19 LOC)
tools/meeting-transcriber/meeting_transcriber/diarizer.py
def _stitch_wav_files(chunk_paths: list[Path], output_path: Path) -> None:
    """Concatenate multiple WAV files into a single file.

    All chunks must have the same sample rate, channels, and sample width.
    """
    if not chunk_paths:
        return

    # Read params from first chunk
    with wave.open(str(chunk_paths[0]), "rb") as first:
        params = first.getparams()

    with wave.open(str(output_path), "wb") as out:
        out.setparams(params)
        for chunk_path in chunk_paths:
            if not chunk_path.exists():
                continue
            with wave.open(str(chunk_path), "rb") as chunk:
                out.writeframes(chunk.readframes(chunk.getnframes()))
format_timestamp function · python · L13-L20 (8 LOC)
tools/meeting-transcriber/meeting_transcriber/formatter.py
def format_timestamp(seconds: float) -> str:
    """Render a duration in seconds as MM:SS, or HH:MM:SS from one hour up.

    Fractions of a second are dropped and every field is zero-padded to at
    least two digits.
    """
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    # The hours field only appears once there is at least one full hour.
    fields = [hours, minutes, secs] if hours > 0 else [minutes, secs]
    return ":".join(f"{value:02d}" for value in fields)
format_transcript_markdown function · python · L23-L72 (50 LOC)
tools/meeting-transcriber/meeting_transcriber/formatter.py
def format_transcript_markdown(
    meeting: MeetingInfo,
    segments: list[TranscriptSegment | SpeakerSegment],
) -> str:
    """Generate brain-compatible markdown transcript.

    Output format matches the brain's entity/project file conventions:
    YAML-style header, then structured content.
    """
    lines = [
        f"# Meeting Transcript — {meeting.title}",
        "",
        f"**Platform:** {meeting.platform}",
        f"**Date:** {meeting.started_at.strftime('%Y-%m-%d')}",
        f"**Started:** {meeting.started_at.strftime('%H:%M')}",
    ]

    if meeting.ended_at:
        lines.append(f"**Ended:** {meeting.ended_at.strftime('%H:%M')}")
        lines.append(f"**Duration:** {meeting.duration_minutes:.0f} minutes")

    # Include recording reference if available
    if meeting.recording_path:
        rel_path = meeting.recording_path
        # Try to make path relative to brain
        try:
            rel_path = meeting.recording_path.relative_to(meeting.recording_path.p
format_metadata_json function · python · L75-L115 (41 LOC)
tools/meeting-transcriber/meeting_transcriber/formatter.py
def format_metadata_json(
    meeting: MeetingInfo,
    segments: list[TranscriptSegment | SpeakerSegment],
    detected_entities: list[str] | None = None,
) -> dict:
    """Generate JSON metadata sidecar for brain integration."""
    speakers = set()
    if segments and isinstance(segments[0], SpeakerSegment):
        for seg in segments:
            if isinstance(seg, SpeakerSegment):
                speakers.add(seg.speaker)

    word_count = sum(len(seg.text.split()) for seg in segments)

    meta = {
        "type": "meeting-transcript",
        "version": "2.0.0",
        "meeting": {
            "platform": meeting.platform,
            "title": meeting.title,
            "date": meeting.started_at.strftime("%Y-%m-%d"),
            "started_at": meeting.started_at.isoformat(),
            "ended_at": meeting.ended_at.isoformat() if meeting.ended_at else None,
            "duration_minutes": round(meeting.duration_minutes, 1),
        },
        "transcript": {
            "segme
format_intake_summary function · python · L118-L197 (80 LOC)
tools/meeting-transcriber/meeting_transcriber/formatter.py
def format_intake_summary(
    meeting: MeetingInfo,
    segments: list[TranscriptSegment | SpeakerSegment],
    brain_path: Path | None = None,
) -> str:
    """Generate an enriched intake summary for the brain's intake system.

    Dropped into _intake-dump/files/ for entity linking, action item extraction,
    and AI summarization. Includes the full transcript text so Claude can generate
    a proper summary without needing to read additional files.
    """
    word_count = sum(len(seg.text.split()) for seg in segments)
    full_text = "\n".join(
        f"[{format_timestamp(seg.start)}] {seg.text}" for seg in segments
    )

    # Detect entities if brain path available
    detected = []
    if brain_path:
        detected = _detect_entities_in_transcript(
            segments, brain_path
        )

    date_str = meeting.started_at.strftime("%Y-%m-%d")
    transcript_ref = f"Operations/meetings/transcripts/{date_str}-{meeting.slug}.md"

    recording_ref = ""
    if meeting.record
_detect_entities_in_transcript function · python · L200-L251 (52 LOC)
tools/meeting-transcriber/meeting_transcriber/formatter.py
def _detect_entities_in_transcript(
    segments: list[TranscriptSegment | SpeakerSegment],
    brain_path: Path,
) -> list[str]:
    """Scan transcript text against ENTITY-INDEX.md names and aliases.

    Returns a list of entity names that appear in the transcript.
    """
    entity_index = brain_path / "Operations" / "entity-watchdog" / "ENTITY-INDEX.md"
    if not entity_index.exists():
        return []

    # Parse entity names and aliases from the index
    try:
        content = entity_index.read_text(encoding="utf-8")
    except Exception:
        return []

    # Build keyword set from entity names
    # The ENTITY-INDEX.md typically has lines like:
    # | Entity Name | Type | Aliases | ...
    keywords: dict[str, str] = {}  # lowercase keyword → entity name
    for line in content.splitlines():
        if not line.startswith("|") or "---" in line or "Entity" in line:
            continue
        parts = [p.strip() for p in line.split("|")]
        if len(parts) >= 3:
     
save_transcript function · python · L254-L291 (38 LOC)
tools/meeting-transcriber/meeting_transcriber/formatter.py
def save_transcript(
    brain_path: Path,
    meeting: MeetingInfo,
    segments: list[TranscriptSegment | SpeakerSegment],
) -> Path:
    """Save transcript markdown, JSON metadata, and intake summary to brain.

    Returns the path to the saved transcript markdown file.
    """
    date_str = meeting.started_at.strftime("%Y-%m-%d")
    filename = f"{date_str}-{meeting.slug}"

    # Detect entities for metadata
    detected_entities = _detect_entities_in_transcript(segments, brain_path)

    # Transcript directory
    transcript_dir = brain_path / "Operations" / "meetings" / "transcripts"
    transcript_dir.mkdir(parents=True, exist_ok=True)

    # Save markdown transcript
    md_path = transcript_dir / f"{filename}.md"
    md_path.write_text(format_transcript_markdown(meeting, segments), encoding="utf-8")

    # Save JSON metadata sidecar
    meta_path = transcript_dir / f"{filename}.meta.json"
    meta = format_metadata_json(meeting, segments, detected_entities=detected_entities)
 
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
MeetingInfo class · python · L12-L44 (33 LOC)
tools/meeting-transcriber/meeting_transcriber/models.py
class MeetingInfo:
    """Metadata about a detected or active meeting."""

    platform: str  # zoom, meet, teams, slack, discord, unknown
    title: str
    started_at: datetime
    ended_at: datetime | None = None
    process_name: str = ""
    window_title: str = ""
    audio_chunks: list[Path] = field(default_factory=list)
    transcript_path: Path | None = None
    recording_path: Path | None = None

    @property
    def slug(self) -> str:
        """URL-safe slug for filenames."""
        safe = self.title.lower().replace(" ", "-")
        safe = "".join(c for c in safe if c.isalnum() or c == "-")
        return safe[:60] or self.platform

    @property
    def duration_minutes(self) -> float:
        end = self.ended_at or datetime.now()
        return (end - self.started_at).total_seconds() / 60

    def to_dict(self) -> dict:
        d = asdict(self)
        d["started_at"] = self.started_at.isoformat()
        d["ended_at"] = self.ended_at.isoformat() if self.ended_at else N
to_dict method · python · L37-L44 (8 LOC)
tools/meeting-transcriber/meeting_transcriber/models.py
    def to_dict(self) -> dict:
        """Return the meeting as a plain dict with JSON-friendly values.

        Starts from ``asdict(self)`` and then replaces the datetime fields
        with ISO-8601 strings and the path fields with strings. Unset optional
        fields become None.
        """
        payload = asdict(self)
        ended = self.ended_at
        transcript = self.transcript_path
        recording = self.recording_path
        payload.update(
            started_at=self.started_at.isoformat(),
            ended_at=ended.isoformat() if ended else None,
            audio_chunks=list(map(str, self.audio_chunks)),
            transcript_path=str(transcript) if transcript else None,
            recording_path=str(recording) if recording else None,
        )
        return payload
TranscriptSegment class · python · L48-L55 (8 LOC)
tools/meeting-transcriber/meeting_transcriber/models.py
class TranscriptSegment:
    """A single transcribed segment from Whisper."""

    # Segment start time.
    start: float  # seconds
    # Segment end time (presumably also in seconds, like ``start``).
    end: float
    # Transcribed text of the segment.
    text: str
    # Language of the segment; defaults to "en".
    language: str = "en"
    # Defaults to 0.0. NOTE(review): presumably a confidence value reported by
    # the transcriber; confirm what it measures.
    probability: float = 0.0
SpeakerSegment class · python · L59-L66 (8 LOC)
tools/meeting-transcriber/meeting_transcriber/models.py
class SpeakerSegment:
    """A transcript segment with speaker attribution."""

    # Segment start and end times, copied from the source transcript segment.
    start: float
    end: float
    # Transcribed text of the segment.
    text: str
    # Speaker label; "Unknown" when no speaker could be attributed.
    speaker: str = "Unknown"
    # Language of the segment; defaults to "en".
    language: str = "en"
DaemonStatus class · python · L70-L113 (44 LOC)
tools/meeting-transcriber/meeting_transcriber/models.py
class DaemonStatus:
    """Status of the meeting transcription daemon."""

    running: bool = False
    pid: int | None = None
    meeting_active: bool = False
    current_meeting: MeetingInfo | None = None
    chunks_recorded: int = 0
    chunks_transcribed: int = 0
    started_at: str | None = None
    last_check: str | None = None

    def save(self, path: Path) -> None:
        data = {
            "running": self.running,
            "pid": self.pid,
            "meeting_active": self.meeting_active,
            "chunks_recorded": self.chunks_recorded,
            "chunks_transcribed": self.chunks_transcribed,
            "started_at": self.started_at,
            "last_check": self.last_check,
        }
        if self.current_meeting:
            data["current_meeting"] = self.current_meeting.to_dict()
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(data, indent=2))

    @classmethod
    def load(cls, path: Path) -> DaemonStatus:
      
save method · python · L82-L95 (14 LOC)
tools/meeting-transcriber/meeting_transcriber/models.py
    def save(self, path: Path) -> None:
        """Write this status to ``path`` as indented JSON.

        The scalar status fields are always written. ``current_meeting`` is
        added, via its ``to_dict()``, only when a meeting is set. Missing
        parent directories are created first.
        """
        scalar_fields = (
            "running",
            "pid",
            "meeting_active",
            "chunks_recorded",
            "chunks_transcribed",
            "started_at",
            "last_check",
        )
        payload = {name: getattr(self, name) for name in scalar_fields}
        meeting = self.current_meeting
        if meeting:
            payload["current_meeting"] = meeting.to_dict()
        path.parent.mkdir(parents=True, exist_ok=True)
        serialized = json.dumps(payload, indent=2)
        path.write_text(serialized)
load method · python · L98-L113 (16 LOC)
tools/meeting-transcriber/meeting_transcriber/models.py
    def load(cls, path: Path) -> DaemonStatus:
        """Read a status object from the JSON file at ``path``.

        Loading is best-effort: a default instance is returned when the file
        is missing, is not valid JSON or text, or does not hold a JSON object.
        Keys absent from the file fall back to the field defaults. A stored
        ``current_meeting`` is not restored.
        """
        try:
            data = json.loads(path.read_text())
        except (FileNotFoundError, ValueError):
            # FileNotFoundError also covers a file removed between an
            # existence check and the read. ValueError covers both
            # json.JSONDecodeError and UnicodeDecodeError.
            return cls()
        if not isinstance(data, dict):
            # Valid JSON such as [] or null has no .get(); treat it as corrupt.
            return cls()
        return cls(
            running=data.get("running", False),
            pid=data.get("pid"),
            meeting_active=data.get("meeting_active", False),
            chunks_recorded=data.get("chunks_recorded", 0),
            chunks_transcribed=data.get("chunks_transcribed", 0),
            started_at=data.get("started_at"),
            last_check=data.get("last_check"),
        )
_find_blackhole_device function · python · L17-L24 (8 LOC)
tools/meeting-transcriber/meeting_transcriber/_recorder_macos.py
def _find_blackhole_device() -> int | None:
    """Return the index of the BlackHole input device, or None if absent.

    The first device whose name contains "blackhole" (case-insensitive) and
    that offers at least one input channel is chosen.
    """
    candidates = (
        index
        for index, info in enumerate(sd.query_devices())
        if "blackhole" in info["name"].lower() and info["max_input_channels"] > 0
    )
    return next(candidates, None)
Want this analysis on your repo? https://repobility.com/scan/
BlackHoleRecorder class · python · L27-L144 (118 LOC)
tools/meeting-transcriber/meeting_transcriber/_recorder_macos.py
class BlackHoleRecorder:
    """Records system audio via BlackHole virtual audio device on macOS.

    Requires BlackHole (https://existential.audio/blackhole/) to be installed
    and configured as part of a Multi-Output Device in Audio MIDI Setup so that
    system audio is routed to both speakers and BlackHole simultaneously.
    """

    def __init__(self, output_dir: Path, chunk_seconds: int = CHUNK_DURATION_SEC):
        self.output_dir = output_dir
        self.chunk_seconds = chunk_seconds
        self._recording = False
        self._thread: threading.Thread | None = None
        self._chunks: list[Path] = []
        self._lock = threading.Lock()

    @property
    def chunks(self) -> list[Path]:
        with self._lock:
            return list(self._chunks)

    def start(self) -> None:
        """Start recording system audio in background thread."""
        if self._recording:
            return
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self._recordi
__init__ method · python · L35-L41 (7 LOC)
tools/meeting-transcriber/meeting_transcriber/_recorder_macos.py
    def __init__(self, output_dir: Path, chunk_seconds: int = CHUNK_DURATION_SEC):
        """Set up an idle recorder.

        Args:
            output_dir: Directory that will receive the recorded chunk files.
            chunk_seconds: Length of each recorded chunk, in seconds.
        """
        # Configuration.
        self.chunk_seconds = chunk_seconds
        self.output_dir = output_dir
        # Recording state; _lock guards _chunks.
        self._lock = threading.Lock()
        self._chunks: list[Path] = []
        self._thread: threading.Thread | None = None
        self._recording = False
start method · python · L48-L55 (8 LOC)
tools/meeting-transcriber/meeting_transcriber/_recorder_macos.py
    def start(self) -> None:
        """Begin recording system audio on a background daemon thread.

        Does nothing if recording is already flagged as active. Otherwise the
        output directory is created and the record loop is launched.
        """
        if not self._recording:
            self.output_dir.mkdir(parents=True, exist_ok=True)
            self._recording = True
            worker = threading.Thread(target=self._record_loop, daemon=True)
            self._thread = worker
            worker.start()
stop method · python · L57-L63 (7 LOC)
tools/meeting-transcriber/meeting_transcriber/_recorder_macos.py
    def stop(self) -> list[Path]:
        """Signal the recorder to stop and return the recorded chunk paths.

        Clears the recording flag, waits up to 10 seconds for the background
        thread, then drops the reference to it.
        """
        self._recording = False
        worker = self._thread
        if worker:
            worker.join(timeout=10)
            self._thread = None
        return self.chunks
_record_loop method · python · L65-L82 (18 LOC)
tools/meeting-transcriber/meeting_transcriber/_recorder_macos.py
    def _record_loop(self) -> None:
        """Record consecutive audio chunks until recording is switched off.

        Looks up the BlackHole device once; if it is missing, prints setup
        instructions and returns. Otherwise chunks are recorded to
        ``chunk_NNNN.wav`` files in the output directory, and each path is
        appended to the chunk list under the lock once its recording call
        returns.
        """
        blackhole_idx = _find_blackhole_device()
        if blackhole_idx is None:
            print(
                "Error: BlackHole audio device not found.\n"
                "Install BlackHole: https://existential.audio/blackhole/\n"
                "Then run: bizbrain-meetings setup"
            )
            return

        sequence = 0
        while self._recording:
            target = self.output_dir / f"chunk_{sequence:04d}.wav"
            sequence += 1
            self._record_chunk(blackhole_idx, target)
            with self._lock:
                self._chunks.append(target)
_record_chunk method · python · L84-L144 (61 LOC)
tools/meeting-transcriber/meeting_transcriber/_recorder_macos.py
    def _record_chunk(self, device_idx: int, output_path: Path) -> None:
        """Record a single chunk of audio to a WAV file."""
        device_info = sd.query_devices(device_idx)
        device_rate = int(device_info["default_samplerate"])
        device_channels = min(device_info["max_input_channels"], 2)

        # Calculate total frames for this chunk
        total_samples = device_rate * self.chunk_seconds

        # Record the chunk (blocking, but we check _recording via callback)
        frames_collected: list[np.ndarray] = []
        samples_so_far = 0
        block_size = 1024

        def callback(indata, frames, time_info, status):
            nonlocal samples_so_far
            if not self._recording:
                raise sd.CallbackAbort
            frames_collected.append(indata.copy())
            samples_so_far += frames

        try:
            with sd.InputStream(
                samplerate=device_rate,
                channels=device_channels,
                d
WASAPILoopbackRecorder class · python · L18-L156 (139 LOC)
tools/meeting-transcriber/meeting_transcriber/_recorder_windows.py
class WASAPILoopbackRecorder:
    """Capture system audio through WASAPI loopback as a series of WAV chunks.

    Relies on pyaudiowpatch to record whatever the system's default output
    device is playing. Windows-only.
    """

    def __init__(self, output_dir: Path, chunk_seconds: int = CHUNK_DURATION_SEC):
        # Caller-supplied configuration.
        self.output_dir = output_dir
        self.chunk_seconds = chunk_seconds
        # Internal state: the lock guards the chunk list.
        self._lock = threading.Lock()
        self._chunks: list[Path] = []
        self._thread: threading.Thread | None = None
        self._recording = False

    @property
    def chunks(self) -> list[Path]:
        """Return a copy of the chunk paths, taken while holding the lock."""
        with self._lock:
            snapshot = list(self._chunks)
        return snapshot

    def start(self) -> None:
        """Launch the background recording thread unless already recording."""
        if not self._recording:
            # The output directory must exist before the worker starts.
            self.output_dir.mkdir(parents=True, exist_ok=True)
            self._recording = True
            worker = threading.Thread(target=self._record_loop, daemon=True)
            self._thread = worker
            worker.start()

__init__ method · python · L25-L31 (7 LOC)
tools/meeting-transcriber/meeting_transcriber/_recorder_windows.py
    def __init__(self, output_dir: Path, chunk_seconds: int = CHUNK_DURATION_SEC):
        """Store the recorder configuration and set up idle recording state.

        Args:
            output_dir: Stored as ``self.output_dir``.
            chunk_seconds: Stored as ``self.chunk_seconds``; defaults to
                ``CHUNK_DURATION_SEC``.
        """
        # Caller-supplied configuration.
        self.output_dir = output_dir
        self.chunk_seconds = chunk_seconds
        # Internal state: nothing recorded yet, no worker thread, flag off.
        self._lock = threading.Lock()
        self._chunks: list[Path] = []
        self._thread: threading.Thread | None = None
        self._recording = False
Repobility (the analyzer behind this table) · https://repobility.com
start method · python · L38-L45 (8 LOC)
tools/meeting-transcriber/meeting_transcriber/_recorder_windows.py
    def start(self) -> None:
        """Launch the background recording thread unless already recording.

        Creates ``output_dir`` (including parents) if needed, sets the
        recording flag, and starts a daemon thread running ``_record_loop``.
        Does nothing when the recording flag is already set.
        """
        if not self._recording:
            # The output directory must exist before the worker starts.
            self.output_dir.mkdir(parents=True, exist_ok=True)
            self._recording = True
            worker = threading.Thread(target=self._record_loop, daemon=True)
            self._thread = worker
            worker.start()
stop method · python · L47-L53 (7 LOC)
tools/meeting-transcriber/meeting_transcriber/_recorder_windows.py
    def stop(self) -> list[Path]:
        """Clear the recording flag, wait for the worker, and return the chunks.

        The worker thread, if there is one, is joined for at most 10 seconds
        and the stored thread reference is then dropped.

        Returns:
            The value of ``self.chunks``.
        """
        self._recording = False
        worker = self._thread
        if not worker:
            return self.chunks
        # Bounded wait so the caller is never blocked indefinitely.
        worker.join(timeout=10)
        self._thread = None
        return self.chunks
_record_loop method · python · L55-L74 (20 LOC)
tools/meeting-transcriber/meeting_transcriber/_recorder_windows.py
    def _record_loop(self) -> None:
        """Capture audio chunk after chunk until recording is switched off.

        A PyAudio instance is created for the lifetime of the loop and is
        always terminated on exit. When no loopback device is found, an error
        is printed and nothing is recorded. Otherwise each pass records one
        numbered WAV file under ``output_dir`` and then adds its path to the
        chunk list while holding the lock.
        """
        import pyaudiowpatch as pyaudio

        audio = pyaudio.PyAudio()
        try:
            loopback = self._find_loopback_device(audio)
            if loopback is None:
                print("Error: No WASAPI loopback device found.")
            else:
                seq = 0
                # The flag is re-read before every chunk so a stop request
                # ends the loop at the next chunk boundary.
                while self._recording:
                    wav_path = self.output_dir / f"chunk_{seq:04d}.wav"
                    self._record_chunk(audio, loopback, wav_path)
                    with self._lock:
                        self._chunks.append(wav_path)
                    seq += 1
        finally:
            # Runs on every exit path, including exceptions from recording.
            audio.terminate()
‹ prev · page 2 / 3 · next ›