Function bodies 108 total
_record_chunk method · python · L76-L128 (53 LOC)tools/meeting-transcriber/meeting_transcriber/_recorder_windows.py
def _record_chunk(self, pa, device: dict, output_path: Path) -> None:
"""Record a single chunk of audio to a WAV file."""
import pyaudiowpatch as pyaudio
device_rate = int(device["defaultSampleRate"])
device_channels = min(device["maxInputChannels"], 2)
frames_per_buffer = 512
stream = pa.open(
format=pyaudio.paInt16,
channels=device_channels,
rate=device_rate,
input=True,
input_device_index=device["index"],
frames_per_buffer=frames_per_buffer,
)
frames: list[bytes] = []
total_frames = 0
target_frames = device_rate * self.chunk_seconds
try:
while self._recording and total_frames < target_frames:
data = stream.read(frames_per_buffer, exception_on_overflow=False)
frames.append(data)
total_frames += frames_per_buffer
finally:
stream.stop_stream(
_find_loopback_device method · python · L130-L156 (27 LOC)tools/meeting-transcriber/meeting_transcriber/_recorder_windows.py
def _find_loopback_device(self, pa) -> dict | None:
"""Find the WASAPI loopback device for the default output."""
try:
wasapi_info = pa.get_host_api_info_by_type(
__import__("pyaudiowpatch").paWASAPI
)
except OSError:
return None
default_output = pa.get_device_info_by_index(wasapi_info["defaultOutputDevice"])
# Find the loopback device matching the default output
for i in range(pa.get_device_count()):
device = pa.get_device_info_by_index(i)
if (
device.get("isLoopbackDevice", False)
and device["name"].startswith(default_output["name"].split(" (")[0])
):
return device
# Fallback: any loopback device
for i in range(pa.get_device_count()):
device = pa.get_device_info_by_index(i)
if device.get("isLoopbackDevice", False):
return device
ret
WhisperTranscriber class · python · L15-L115 (101 LOC)tools/meeting-transcriber/meeting_transcriber/transcriber.py
class WhisperTranscriber:
"""Transcribes WAV audio files using faster-whisper.
Models are downloaded on first use (~75MB for base, ~3GB for large-v3).
VAD filtering is enabled by default to skip silence.
"""
def __init__(self, model_size: str = DEFAULT_MODEL, device: str = "auto"):
if model_size not in MODEL_SIZES:
raise ValueError(f"Invalid model size: {model_size}. Choose from {MODEL_SIZES}")
self.model_size = model_size
self.device = device
self._model = None
def _load_model(self):
if self._model is not None:
return
from faster_whisper import WhisperModel
compute_type = "int8" if self.device == "cpu" else "auto"
self._model = WhisperModel(
self.model_size,
device=self.device,
compute_type=compute_type,
)
def transcribe(self, audio_path: Path, language: str | None = None) -> list[TranscriptSegment]:
"""Transcribe
__init__ method · python · L22-L27 (6 LOC)tools/meeting-transcriber/meeting_transcriber/transcriber.py
def __init__(self, model_size: str = DEFAULT_MODEL, device: str = "auto"):
    """Remember the requested model size and device; no model is built here.

    Args:
        model_size: Must be one of the entries in ``MODEL_SIZES``.
        device: Stored as-is and handed to the model when it is created.

    Raises:
        ValueError: If ``model_size`` is not in ``MODEL_SIZES``.
    """
    size_is_known = model_size in MODEL_SIZES
    if not size_is_known:
        raise ValueError(f"Invalid model size: {model_size}. Choose from {MODEL_SIZES}")

    # Stays None until _load_model() assigns the real model object.
    self._model = None
    self.device = device
    self.model_size = model_size

# _load_model method · python · L29-L39 (11 LOC)tools/meeting-transcriber/meeting_transcriber/transcriber.py
def _load_model(self):
    """Build the faster-whisper model on the first call; later calls do nothing."""
    if self._model is None:
        # Function-scope import: faster_whisper is only touched when a model
        # actually has to be created.
        from faster_whisper import WhisperModel

        # "int8" when the device is explicitly "cpu"; otherwise "auto".
        if self.device == "cpu":
            precision = "int8"
        else:
            precision = "auto"

        self._model = WhisperModel(
            self.model_size,
            device=self.device,
            compute_type=precision,
        )

# transcribe method · python · L41-L75 (35 LOC)tools/meeting-transcriber/meeting_transcriber/transcriber.py
def transcribe(self, audio_path: Path, language: str | None = None) -> list[TranscriptSegment]:
"""Transcribe a WAV file and return segments.
Args:
audio_path: Path to 16kHz mono WAV file.
language: ISO language code (e.g., "en"). None for auto-detect.
Returns:
List of TranscriptSegment with timestamps and text.
"""
self._load_model()
segments, info = self._model.transcribe(
str(audio_path),
language=language,
vad_filter=True,
vad_parameters={"min_silence_duration_ms": 500},
beam_size=5,
word_timestamps=False,
)
result = []
for seg in segments:
text = seg.text.strip()
if not text:
continue
result.append(TranscriptSegment(
start=seg.start,
end=seg.end,
text=text,
language=info.language,
transcribe_chunks method · python · L77-L109 (33 LOC)tools/meeting-transcriber/meeting_transcriber/transcriber.py
def transcribe_chunks(
self,
chunk_paths: list[Path],
language: str | None = None,
) -> list[TranscriptSegment]:
"""Transcribe multiple chunks with cumulative timestamps.
Adjusts timestamps so they're continuous across all chunks.
"""
self._load_model()
all_segments: list[TranscriptSegment] = []
time_offset = 0.0
for chunk_path in sorted(chunk_paths):
if not chunk_path.exists():
continue
# Get chunk duration for offset calculation
chunk_duration = self._get_wav_duration(chunk_path)
segments = self.transcribe(chunk_path, language=language)
for seg in segments:
all_segments.append(TranscriptSegment(
start=seg.start + time_offset,
end=seg.end + time_offset,
text=seg.text,
language=seg.language,
probability=seg
Open data scored by Repobility · https://repobility.com
_get_wav_duration method · python · L112-L115 (4 LOC)tools/meeting-transcriber/meeting_transcriber/transcriber.py
def _get_wav_duration(path: Path) -> float:
    """Return the length of the WAV file at ``path`` in seconds.

    The value is the file's frame count divided by its frame rate.
    """
    import wave

    with wave.open(str(path), "rb") as wav_file:
        frame_count = wav_file.getnframes()
        sample_rate = wav_file.getframerate()
    return frame_count / sample_rate

# ‹ prevpage 3 / 3