← back to drzachconner__bizbrain-os-plugin

Function bodies 108 total

All specs Real LLM only Function bodies
_record_chunk method · python · L76-L128 (53 LOC)
tools/meeting-transcriber/meeting_transcriber/_recorder_windows.py
    def _record_chunk(self, pa, device: dict, output_path: Path) -> None:
        """Record a single chunk of audio to a WAV file."""
        import pyaudiowpatch as pyaudio

        device_rate = int(device["defaultSampleRate"])
        device_channels = min(device["maxInputChannels"], 2)
        frames_per_buffer = 512

        stream = pa.open(
            format=pyaudio.paInt16,
            channels=device_channels,
            rate=device_rate,
            input=True,
            input_device_index=device["index"],
            frames_per_buffer=frames_per_buffer,
        )

        frames: list[bytes] = []
        total_frames = 0
        target_frames = device_rate * self.chunk_seconds

        try:
            while self._recording and total_frames < target_frames:
                data = stream.read(frames_per_buffer, exception_on_overflow=False)
                frames.append(data)
                total_frames += frames_per_buffer
        finally:
            stream.stop_stream(
_find_loopback_device method · python · L130-L156 (27 LOC)
tools/meeting-transcriber/meeting_transcriber/_recorder_windows.py
    def _find_loopback_device(self, pa) -> dict | None:
        """Find the WASAPI loopback device for the default output."""
        try:
            wasapi_info = pa.get_host_api_info_by_type(
                __import__("pyaudiowpatch").paWASAPI
            )
        except OSError:
            return None

        default_output = pa.get_device_info_by_index(wasapi_info["defaultOutputDevice"])

        # Find the loopback device matching the default output
        for i in range(pa.get_device_count()):
            device = pa.get_device_info_by_index(i)
            if (
                device.get("isLoopbackDevice", False)
                and device["name"].startswith(default_output["name"].split(" (")[0])
            ):
                return device

        # Fallback: any loopback device
        for i in range(pa.get_device_count()):
            device = pa.get_device_info_by_index(i)
            if device.get("isLoopbackDevice", False):
                return device

        ret
WhisperTranscriber class · python · L15-L115 (101 LOC)
tools/meeting-transcriber/meeting_transcriber/transcriber.py
class WhisperTranscriber:
    """Transcribes WAV audio files using faster-whisper.

    Models are downloaded on first use (~75MB for base, ~3GB for large-v3).
    VAD filtering is enabled by default to skip silence.
    """

    def __init__(self, model_size: str = DEFAULT_MODEL, device: str = "auto"):
        if model_size not in MODEL_SIZES:
            raise ValueError(f"Invalid model size: {model_size}. Choose from {MODEL_SIZES}")
        self.model_size = model_size
        self.device = device
        self._model = None

    def _load_model(self):
        if self._model is not None:
            return
        from faster_whisper import WhisperModel

        compute_type = "int8" if self.device == "cpu" else "auto"
        self._model = WhisperModel(
            self.model_size,
            device=self.device,
            compute_type=compute_type,
        )

    def transcribe(self, audio_path: Path, language: str | None = None) -> list[TranscriptSegment]:
        """Transcribe 
__init__ method · python · L22-L27 (6 LOC)
tools/meeting-transcriber/meeting_transcriber/transcriber.py
    def __init__(self, model_size: str = DEFAULT_MODEL, device: str = "auto"):
        if model_size not in MODEL_SIZES:
            raise ValueError(f"Invalid model size: {model_size}. Choose from {MODEL_SIZES}")
        self.model_size = model_size
        self.device = device
        self._model = None
_load_model method · python · L29-L39 (11 LOC)
tools/meeting-transcriber/meeting_transcriber/transcriber.py
    def _load_model(self):
        if self._model is not None:
            return
        from faster_whisper import WhisperModel

        compute_type = "int8" if self.device == "cpu" else "auto"
        self._model = WhisperModel(
            self.model_size,
            device=self.device,
            compute_type=compute_type,
        )
transcribe method · python · L41-L75 (35 LOC)
tools/meeting-transcriber/meeting_transcriber/transcriber.py
    def transcribe(self, audio_path: Path, language: str | None = None) -> list[TranscriptSegment]:
        """Transcribe a WAV file and return segments.

        Args:
            audio_path: Path to 16kHz mono WAV file.
            language: ISO language code (e.g., "en"). None for auto-detect.

        Returns:
            List of TranscriptSegment with timestamps and text.
        """
        self._load_model()

        segments, info = self._model.transcribe(
            str(audio_path),
            language=language,
            vad_filter=True,
            vad_parameters={"min_silence_duration_ms": 500},
            beam_size=5,
            word_timestamps=False,
        )

        result = []
        for seg in segments:
            text = seg.text.strip()
            if not text:
                continue
            result.append(TranscriptSegment(
                start=seg.start,
                end=seg.end,
                text=text,
                language=info.language,
 
transcribe_chunks method · python · L77-L109 (33 LOC)
tools/meeting-transcriber/meeting_transcriber/transcriber.py
    def transcribe_chunks(
        self,
        chunk_paths: list[Path],
        language: str | None = None,
    ) -> list[TranscriptSegment]:
        """Transcribe multiple chunks with cumulative timestamps.

        Adjusts timestamps so they're continuous across all chunks.
        """
        self._load_model()
        all_segments: list[TranscriptSegment] = []
        time_offset = 0.0

        for chunk_path in sorted(chunk_paths):
            if not chunk_path.exists():
                continue

            # Get chunk duration for offset calculation
            chunk_duration = self._get_wav_duration(chunk_path)
            segments = self.transcribe(chunk_path, language=language)

            for seg in segments:
                all_segments.append(TranscriptSegment(
                    start=seg.start + time_offset,
                    end=seg.end + time_offset,
                    text=seg.text,
                    language=seg.language,
                    probability=seg
Open data scored by Repobility · https://repobility.com
_get_wav_duration method · python · L112-L115 (4 LOC)
tools/meeting-transcriber/meeting_transcriber/transcriber.py
    def _get_wav_duration(path: Path) -> float:
        import wave
        with wave.open(str(path), "rb") as wf:
            return wf.getnframes() / wf.getframerate()
‹ prevpage 3 / 3