← back to dotradepro__Piper-Master-Trainer

Function bodies 178 total

All specs Real LLM only Function bodies
get_all method · python · L28-L32 (5 LOC)
backend/app/services/project_service.py
    async def get_all(self) -> list[Project]:
        result = await self.db.execute(
            select(Project).order_by(Project.created_at.desc())
        )
        return list(result.scalars().all())
get_by_id method · python · L34-L38 (5 LOC)
backend/app/services/project_service.py
    async def get_by_id(self, project_id: str) -> Project | None:
        result = await self.db.execute(
            select(Project).where(Project.id == project_id)
        )
        return result.scalar_one_or_none()
update method · python · L40-L51 (12 LOC)
backend/app/services/project_service.py
    async def update(self, project_id: str, data: ProjectUpdate) -> Project | None:
        project = await self.get_by_id(project_id)
        if not project:
            return None

        update_data = data.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(project, field, value)

        await self.db.commit()
        await self.db.refresh(project)
        return project
delete method · python · L53-L65 (13 LOC)
backend/app/services/project_service.py
    async def delete(self, project_id: str) -> bool:
        project = await self.get_by_id(project_id)
        if not project:
            return False

        # Delete project files
        project_dir = settings.projects_path / project_id
        if project_dir.exists():
            shutil.rmtree(project_dir)

        await self.db.delete(project)
        await self.db.commit()
        return True
update_status method · python · L67-L74 (8 LOC)
backend/app/services/project_service.py
    async def update_status(self, project_id: str, status: str) -> Project | None:
        project = await self.get_by_id(project_id)
        if not project:
            return None
        project.status = status
        await self.db.commit()
        await self.db.refresh(project)
        return project
SegmentService class · python · L7-L148 (142 LOC)
backend/app/services/segment_service.py
class SegmentService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def create_batch(self, project_id: str, audio_file_id: str, segments: list[dict]) -> list[Segment]:
        """Створити сегменти з результатів транскрипції."""
        db_segments = []
        for seg_data in segments:
            segment = Segment(
                project_id=project_id,
                audio_file_id=audio_file_id,
                start_time=seg_data["start"],
                end_time=seg_data["end"],
                text=seg_data["text"],
            )
            self.db.add(segment)
            db_segments.append(segment)

        await self.db.commit()
        for seg in db_segments:
            await self.db.refresh(seg)
        return db_segments

    async def get_by_project(self, project_id: str) -> list[Segment]:
        result = await self.db.execute(
            select(Segment)
            .where(Segment.project_id == project_id)
            .order_by(Segment.star
create_batch method · python · L11-L28 (18 LOC)
backend/app/services/segment_service.py
    async def create_batch(self, project_id: str, audio_file_id: str, segments: list[dict]) -> list[Segment]:
        """Створити сегменти з результатів транскрипції."""
        db_segments = []
        for seg_data in segments:
            segment = Segment(
                project_id=project_id,
                audio_file_id=audio_file_id,
                start_time=seg_data["start"],
                end_time=seg_data["end"],
                text=seg_data["text"],
            )
            self.db.add(segment)
            db_segments.append(segment)

        await self.db.commit()
        for seg in db_segments:
            await self.db.refresh(seg)
        return db_segments
Want this analysis on your repo? https://repobility.com/scan/
get_by_project method · python · L30-L36 (7 LOC)
backend/app/services/segment_service.py
    async def get_by_project(self, project_id: str) -> list[Segment]:
        result = await self.db.execute(
            select(Segment)
            .where(Segment.project_id == project_id)
            .order_by(Segment.start_time)
        )
        return list(result.scalars().all())
get_by_audio_file method · python · L38-L44 (7 LOC)
backend/app/services/segment_service.py
    async def get_by_audio_file(self, audio_file_id: str) -> list[Segment]:
        result = await self.db.execute(
            select(Segment)
            .where(Segment.audio_file_id == audio_file_id)
            .order_by(Segment.start_time)
        )
        return list(result.scalars().all())
get_by_id method · python · L46-L50 (5 LOC)
backend/app/services/segment_service.py
    async def get_by_id(self, segment_id: str) -> Segment | None:
        result = await self.db.execute(
            select(Segment).where(Segment.id == segment_id)
        )
        return result.scalar_one_or_none()
update method · python · L52-L63 (12 LOC)
backend/app/services/segment_service.py
    async def update(self, segment_id: str, text: str | None = None, included: bool | None = None) -> Segment | None:
        segment = await self.get_by_id(segment_id)
        if not segment:
            return None
        if text is not None:
            segment.text = text
            segment.text_edited = True
        if included is not None:
            segment.included = included
        await self.db.commit()
        await self.db.refresh(segment)
        return segment
delete method · python · L65-L71 (7 LOC)
backend/app/services/segment_service.py
    async def delete(self, segment_id: str) -> bool:
        segment = await self.get_by_id(segment_id)
        if not segment:
            return False
        await self.db.delete(segment)
        await self.db.commit()
        return True
merge method · python · L73-L106 (34 LOC)
backend/app/services/segment_service.py
    async def merge(self, segment_ids: list[str]) -> Segment | None:
        """Об'єднати кілька сегментів в один."""
        if len(segment_ids) < 2:
            return None

        segments = []
        for sid in segment_ids:
            seg = await self.get_by_id(sid)
            if seg:
                segments.append(seg)

        if len(segments) < 2:
            return None

        segments.sort(key=lambda s: s.start_time)

        # Create merged segment
        merged = Segment(
            project_id=segments[0].project_id,
            audio_file_id=segments[0].audio_file_id,
            start_time=segments[0].start_time,
            end_time=segments[-1].end_time,
            text=" ".join(s.text for s in segments),
            text_edited=True,
        )
        self.db.add(merged)

        # Delete originals
        for seg in segments:
            await self.db.delete(seg)

        await self.db.commit()
        await self.db.refresh(merged)
        return merged
split method · python · L108-L139 (32 LOC)
backend/app/services/segment_service.py
    async def split(self, segment_id: str, split_time: float) -> tuple[Segment, Segment] | None:
        """Розділити сегмент на два в заданій точці часу."""
        segment = await self.get_by_id(segment_id)
        if not segment:
            return None
        if split_time <= segment.start_time or split_time >= segment.end_time:
            return None

        # Create two new segments
        seg1 = Segment(
            project_id=segment.project_id,
            audio_file_id=segment.audio_file_id,
            start_time=segment.start_time,
            end_time=split_time,
            text=segment.text,
            text_edited=True,
        )
        seg2 = Segment(
            project_id=segment.project_id,
            audio_file_id=segment.audio_file_id,
            start_time=split_time,
            end_time=segment.end_time,
            text="",
            text_edited=True,
        )
        self.db.add(seg1)
        self.db.add(seg2)
        await self.db.delete(segment)
 
delete_by_project method · python · L141-L148 (8 LOC)
backend/app/services/segment_service.py
    async def delete_by_project(self, project_id: str) -> int:
        """Видалити всі сегменти проєкту."""
        segments = await self.get_by_project(project_id)
        count = len(segments)
        for seg in segments:
            await self.db.delete(seg)
        await self.db.commit()
        return count
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
SynthesisService class · python · L11-L75 (65 LOC)
backend/app/services/synthesis_service.py
class SynthesisService:
    _voice = None
    _model_path = None

    @classmethod
    def get_voice(cls, onnx_path: str, config_path: str | None = None):
        """Завантажити або повернути кешовану модель."""
        if cls._voice is not None and cls._model_path == onnx_path:
            return cls._voice

        from piper import PiperVoice

        abs_onnx = Path(onnx_path)
        if not abs_onnx.is_absolute():
            abs_onnx = settings.storage_path / onnx_path

        abs_config = None
        if config_path:
            abs_config = Path(config_path)
            if not abs_config.is_absolute():
                abs_config = settings.storage_path / config_path

        logger.info(f"Loading voice: {abs_onnx}")
        cls._voice = PiperVoice.load(str(abs_onnx), str(abs_config) if abs_config else None)
        cls._model_path = onnx_path
        return cls._voice

    @classmethod
    def release(cls):
        cls._voice = None
        cls._model_path = None

    def synt
get_voice method · python · L16-L36 (21 LOC)
backend/app/services/synthesis_service.py
    def get_voice(cls, onnx_path: str, config_path: str | None = None):
        """Завантажити або повернути кешовану модель."""
        if cls._voice is not None and cls._model_path == onnx_path:
            return cls._voice

        from piper import PiperVoice

        abs_onnx = Path(onnx_path)
        if not abs_onnx.is_absolute():
            abs_onnx = settings.storage_path / onnx_path

        abs_config = None
        if config_path:
            abs_config = Path(config_path)
            if not abs_config.is_absolute():
                abs_config = settings.storage_path / config_path

        logger.info(f"Loading voice: {abs_onnx}")
        cls._voice = PiperVoice.load(str(abs_onnx), str(abs_config) if abs_config else None)
        cls._model_path = onnx_path
        return cls._voice
release method · python · L39-L41 (3 LOC)
backend/app/services/synthesis_service.py
    def release(cls):
        cls._voice = None
        cls._model_path = None
synthesize method · python · L43-L75 (33 LOC)
backend/app/services/synthesis_service.py
    def synthesize(
        self,
        onnx_path: str,
        config_path: str | None,
        text: str,
        speaker_id: int | None = None,
        length_scale: float = 1.0,
        noise_scale: float = 0.667,
        noise_w: float = 0.8,
    ) -> bytes:
        """Синтезувати мовлення з тексту."""
        import tempfile
        from piper import SynthesisConfig

        voice = self.get_voice(onnx_path, config_path)

        synth_config = SynthesisConfig(
            speaker_id=speaker_id,
            length_scale=length_scale,
            noise_scale=noise_scale,
            noise_w_scale=noise_w,
        )

        # Use temp file because synthesize_wav needs a real wave.Wave_write
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name

        try:
            with wave.open(tmp_path, "wb") as wav_file:
                voice.synthesize_wav(text, wav_file, syn_config=synth_config)
            return Path(tmp_path).r
TrainingProcess class · python · L16-L23 (8 LOC)
backend/app/services/training_service.py
class TrainingProcess:
    """Контейнер для активного процесу тренування."""
    def __init__(self, run_id: str, process: subprocess.Popen):
        self.run_id = run_id
        self.process = process
        self.metrics: dict = {}
        self.logs: list[str] = []
        self.started_at = datetime.utcnow()
__init__ method · python · L18-L23 (6 LOC)
backend/app/services/training_service.py
    def __init__(self, run_id: str, process: subprocess.Popen):
        self.run_id = run_id
        self.process = process
        self.metrics: dict = {}
        self.logs: list[str] = []
        self.started_at = datetime.utcnow()
TrainingService class · python · L31-L201 (171 LOC)
backend/app/services/training_service.py
class TrainingService:
    def start_training(
        self,
        run_id: str,
        project_id: str,
        dataset_id: str,
        csv_path: str,
        audio_dir: str,
        mode: str = "scratch",
        base_checkpoint: str | None = None,
        batch_size: int = 4,
        max_epochs: int = 10000,
        precision: str = "32",
        accumulate_grad_batches: int = 8,
        sample_rate: int = 22050,
        espeak_voice: str = "uk",
        on_metrics=None,
        on_log=None,
    ) -> subprocess.Popen:
        """Запустити тренування VITS моделі."""
        global _active_training

        with _lock:
            if _active_training and _active_training.process.poll() is None:
                raise RuntimeError("Тренування вже запущено. Зупиніть поточне перед запуском нового.")

        # Release GPU memory from previous tasks (whisper, etc.)
        self._cleanup_gpu()

        project_dir = settings.projects_path / project_id
        cache_dir = project_dir / "c
start_training method · python · L32-L112 (81 LOC)
backend/app/services/training_service.py
    def start_training(
        self,
        run_id: str,
        project_id: str,
        dataset_id: str,
        csv_path: str,
        audio_dir: str,
        mode: str = "scratch",
        base_checkpoint: str | None = None,
        batch_size: int = 4,
        max_epochs: int = 10000,
        precision: str = "32",
        accumulate_grad_batches: int = 8,
        sample_rate: int = 22050,
        espeak_voice: str = "uk",
        on_metrics=None,
        on_log=None,
    ) -> subprocess.Popen:
        """Запустити тренування VITS моделі."""
        global _active_training

        with _lock:
            if _active_training and _active_training.process.poll() is None:
                raise RuntimeError("Тренування вже запущено. Зупиніть поточне перед запуском нового.")

        # Release GPU memory from previous tasks (whisper, etc.)
        self._cleanup_gpu()

        project_dir = settings.projects_path / project_id
        cache_dir = project_dir / "cache"
        log_dir =
If a scraper extracted this row, it came from Repobility (https://repobility.com)
stop_training method · python · L114-L130 (17 LOC)
backend/app/services/training_service.py
    def stop_training(self, run_id: str) -> bool:
        global _active_training
        with _lock:
            if not _active_training or _active_training.run_id != run_id:
                return False
            bridge = PiperBridge()
            bridge.stop_training(_active_training.process)
            _active_training = None
            # Cleanup GPU
            try:
                import torch
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
            except ImportError:
                pass
            gc.collect()
            return True
get_status method · python · L132-L145 (14 LOC)
backend/app/services/training_service.py
    def get_status(self) -> dict:
        if not _active_training:
            return {"active": False}

        is_running = _active_training.process.poll() is None
        return {
            "active": is_running,
            "run_id": _active_training.run_id,
            "pid": _active_training.process.pid,
            "metrics": _active_training.metrics,
            "log_lines": _active_training.logs[-50:],
            "started_at": _active_training.started_at.isoformat(),
            "elapsed_seconds": (datetime.utcnow() - _active_training.started_at).total_seconds(),
        }
get_logs method · python · L147-L150 (4 LOC)
backend/app/services/training_service.py
    def get_logs(self, last_n: int = 100) -> list[str]:
        if not _active_training:
            return []
        return _active_training.logs[-last_n:]
list_checkpoints method · python · L152-L170 (19 LOC)
backend/app/services/training_service.py
    def list_checkpoints(self, project_id: str) -> list[dict]:
        """Список чекпоінтів проєкту."""
        ckpt_dirs = [
            settings.projects_path / project_id / "checkpoints",
            settings.projects_path / project_id / "logs",
        ]
        checkpoints = []
        for d in ckpt_dirs:
            if not d.exists():
                continue
            for f in d.rglob("*.ckpt"):
                checkpoints.append({
                    "path": str(f),
                    "filename": f.name,
                    "size_mb": f.stat().st_size / 1024 / 1024,
                    "modified": datetime.fromtimestamp(f.stat().st_mtime).isoformat(),
                })
        checkpoints.sort(key=lambda x: x["modified"], reverse=True)
        return checkpoints
_cleanup_gpu method · python · L172-L187 (16 LOC)
backend/app/services/training_service.py
    def _cleanup_gpu(self):
        """Звільнити GPU пам'ять від попередніх задач."""
        try:
            # Release cached whisper model
            from app.services.transcription_service import TranscriptionService
            TranscriptionService.release_model()
        except Exception:
            pass
        try:
            import torch
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        except ImportError:
            pass
        gc.collect()
        logger.info("GPU memory cleaned up")
list_pretrained method · python · L189-L201 (13 LOC)
backend/app/services/training_service.py
    def list_pretrained(self) -> list[dict]:
        """Список претренованих чекпоінтів."""
        pretrained_dir = settings.pretrained_path
        if not pretrained_dir.exists():
            return []
        result = []
        for f in pretrained_dir.glob("*.ckpt"):
            result.append({
                "path": str(f),
                "filename": f.name,
                "size_mb": f.stat().st_size / 1024 / 1024,
            })
        return result
TranscriptionService class · python · L10-L116 (107 LOC)
backend/app/services/transcription_service.py
class TranscriptionService:
    _model = None
    _model_size = None

    @classmethod
    def get_model(cls, model_size: str = "small", device: str = "auto"):
        """Завантажити або повернути кешовану модель Whisper."""
        if cls._model is not None and cls._model_size == model_size:
            return cls._model

        from faster_whisper import WhisperModel

        # Auto-detect device
        if device == "auto":
            device, compute_type = cls._detect_device()
        elif device == "cuda":
            compute_type = "float16"
        else:
            compute_type = "int8"

        logger.info(f"Loading Whisper model: {model_size} on {device} ({compute_type})")

        cls._model = WhisperModel(
            model_size,
            device=device,
            compute_type=compute_type,
        )
        cls._model_size = model_size
        return cls._model

    @classmethod
    def _detect_device(cls) -> tuple[str, str]:
        """Визначити найкращий пристрій д
get_model method · python · L15-L38 (24 LOC)
backend/app/services/transcription_service.py
    def get_model(cls, model_size: str = "small", device: str = "auto"):
        """Завантажити або повернути кешовану модель Whisper."""
        if cls._model is not None and cls._model_size == model_size:
            return cls._model

        from faster_whisper import WhisperModel

        # Auto-detect device
        if device == "auto":
            device, compute_type = cls._detect_device()
        elif device == "cuda":
            compute_type = "float16"
        else:
            compute_type = "int8"

        logger.info(f"Loading Whisper model: {model_size} on {device} ({compute_type})")

        cls._model = WhisperModel(
            model_size,
            device=device,
            compute_type=compute_type,
        )
        cls._model_size = model_size
        return cls._model
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
_detect_device method · python · L41-L57 (17 LOC)
backend/app/services/transcription_service.py
    def _detect_device(cls) -> tuple[str, str]:
        """Визначити найкращий пристрій для Whisper."""
        try:
            import ctranslate2
            ctranslate2.get_supported_compute_types("cuda")
            return "cuda", "float16"
        except Exception:
            pass

        try:
            import torch
            if torch.cuda.is_available():
                return "cuda", "float16"
        except ImportError:
            pass

        return "cpu", "int8"
release_model method · python · L60-L70 (11 LOC)
backend/app/services/transcription_service.py
    def release_model(cls):
        """Звільнити модель та GPU пам'ять."""
        cls._model = None
        cls._model_size = None
        gc.collect()
        try:
            import torch
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        except ImportError:
            pass
transcribe method · python · L72-L116 (45 LOC)
backend/app/services/transcription_service.py
    def transcribe(
        self,
        audio_path: str,
        language: str | None = None,
        model_size: str = "small",
        progress_callback=None,
    ) -> list[dict]:
        """Транскрибувати аудіо файл.

        Returns: список сегментів [{start, end, text}]
        """
        model = self.get_model(model_size)

        abs_path = Path(audio_path)
        if not abs_path.is_absolute():
            abs_path = settings.storage_path / audio_path

        logger.info(f"Transcribing: {abs_path}")

        segments_iter, info = model.transcribe(
            str(abs_path),
            language=language,
            beam_size=5,
            word_timestamps=True,
            vad_filter=True,
            vad_parameters=dict(
                min_silence_duration_ms=500,
                speech_pad_ms=200,
            ),
        )

        logger.info(f"Detected language: {info.language} (prob: {info.language_probability:.2f})")

        segments = []
        for i, seg in enume
YoutubeService class · python · L20-L171 (152 LOC)
backend/app/services/youtube_service.py
class YoutubeService:
    def _get_common_opts(self) -> dict:
        """Загальні опції для yt-dlp."""
        opts: dict = {
            "quiet": True,
            "no_warnings": True,
            "remote_components": {"ejs:github"},
        }
        # Use cookies.txt if available
        cookies_file = settings.storage_path / "cookies.txt"
        if cookies_file.exists():
            opts["cookiefile"] = str(cookies_file)
        else:
            # Try browser cookies
            opts["cookiesfrombrowser"] = ("chrome",)
        return opts

    def ensure_cookies(self) -> bool:
        """Експортувати Chrome cookies, якщо cookies.txt ще не існує.
        Використовує yt-dlp напряму для коректного дешифрування."""
        cookies_file = settings.storage_path / "cookies.txt"
        if cookies_file.exists():
            return True

        import yt_dlp
        # Use yt-dlp's own cookie extraction to create cookies.txt
        ydl_opts = {
            "quiet": True,
            "co
_get_common_opts method · python · L21-L35 (15 LOC)
backend/app/services/youtube_service.py
    def _get_common_opts(self) -> dict:
        """Загальні опції для yt-dlp."""
        opts: dict = {
            "quiet": True,
            "no_warnings": True,
            "remote_components": {"ejs:github"},
        }
        # Use cookies.txt if available
        cookies_file = settings.storage_path / "cookies.txt"
        if cookies_file.exists():
            opts["cookiefile"] = str(cookies_file)
        else:
            # Try browser cookies
            opts["cookiesfrombrowser"] = ("chrome",)
        return opts
ensure_cookies method · python · L37-L59 (23 LOC)
backend/app/services/youtube_service.py
    def ensure_cookies(self) -> bool:
        """Експортувати Chrome cookies, якщо cookies.txt ще не існує.
        Використовує yt-dlp напряму для коректного дешифрування."""
        cookies_file = settings.storage_path / "cookies.txt"
        if cookies_file.exists():
            return True

        import yt_dlp
        # Use yt-dlp's own cookie extraction to create cookies.txt
        ydl_opts = {
            "quiet": True,
            "cookiesfrombrowser": ("chrome",),
            "remote_components": {"ejs:github"},
            "cookiefile": str(cookies_file),  # yt-dlp will save cookies here
            "simulate": True,
        }
        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # Just extract info from any video to trigger cookie export
                ydl.extract_info("https://www.youtube.com/watch?v=dQw4w9WgXcQ", download=False)
            return cookies_file.exists()
        except Exception:
            return cookies_file.exists()
get_video_info method · python · L61-L77 (17 LOC)
backend/app/services/youtube_service.py
    def get_video_info(self, url: str) -> dict:
        """Отримати інформацію про відео без завантаження."""
        import yt_dlp

        ydl_opts = {
            **self._get_common_opts(),
            "extract_flat": False,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
            return {
                "title": info.get("title", "Unknown"),
                "duration": info.get("duration", 0),
                "uploader": info.get("uploader", "Unknown"),
                "thumbnail": info.get("thumbnail", ""),
                "description": info.get("description", "")[:500],
            }
download_audio method · python · L79-L152 (74 LOC)
backend/app/services/youtube_service.py
    def download_audio(
        self,
        url: str,
        project_id: str,
        audio_format: str = "wav",
        progress_callback=None,
    ) -> dict:
        """Завантажити аудіо з YouTube та конвертувати в WAV."""
        import yt_dlp

        output_dir = settings.projects_path / project_id / "raw_audio"
        output_dir.mkdir(parents=True, exist_ok=True)

        output_template = str(output_dir / "%(title)s.%(ext)s")

        ydl_opts = {
            **self._get_common_opts(),
            "format": "bestaudio/best",
            "outtmpl": output_template,
            "postprocessors": [
                {
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": audio_format,
                    "preferredquality": "0",
                }
            ],
        }

        if progress_callback:
            ydl_opts["progress_hooks"] = [progress_callback]

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(ur
Want this analysis on your repo? https://repobility.com/scan/
_get_audio_duration method · python · L154-L171 (18 LOC)
backend/app/services/youtube_service.py
    def _get_audio_duration(self, file_path: Path) -> float:
        """Отримати тривалість аудіо через ffprobe."""
        try:
            result = subprocess.run(
                [
                    "ffprobe",
                    "-v", "quiet",
                    "-show_entries", "format=duration",
                    "-of", "default=noprint_wrappers=1:nokey=1",
                    str(file_path),
                ],
                capture_output=True,
                text=True,
                timeout=10,
            )
            return float(result.stdout.strip())
        except Exception:
            return 0.0
prepare_dataset function · python · L5-L7 (3 LOC)
backend/app/tasks/dataset_tasks.py
def prepare_dataset(self, project_id: str, config: dict):
    """Підготовка датасету - буде реалізовано у Фазі 4."""
    pass
export_onnx function · python · L5-L7 (3 LOC)
backend/app/tasks/export_tasks.py
def export_onnx(self, project_id: str, checkpoint_path: str, output_path: str):
    """Експорт моделі в ONNX формат - буде реалізовано у Фазі 6."""
    pass
train_model function · python · L5-L7 (3 LOC)
backend/app/tasks/training_tasks.py
def train_model(self, run_id: str, config: dict):
    """Тренування VITS моделі - буде реалізовано у Фазі 5."""
    pass
transcribe_audio function · python · L5-L7 (3 LOC)
backend/app/tasks/transcription_tasks.py
def transcribe_audio(self, project_id: str, audio_file_id: str, model_size: str = "small"):
    """Транскрипція аудіо через faster-whisper - буде реалізовано у Фазі 3."""
    pass
download_audio function · python · L5-L7 (3 LOC)
backend/app/tasks/youtube_tasks.py
def download_audio(self, project_id: str, url: str, audio_format: str = "wav"):
    """Завантаження аудіо з YouTube - буде реалізовано у Фазі 2."""
    pass
export_chrome_cookies function · python · L15-L111 (97 LOC)
backend/app/utils/chrome_cookies.py
def export_chrome_cookies(output_path: Path, domains: list[str] | None = None) -> int:
    """Експортувати Chrome cookies в Netscape format cookies.txt.

    Returns: кількість експортованих cookies.
    """
    try:
        from Crypto.Cipher import AES
        from Crypto.Protocol.KDF import PBKDF2
        import secretstorage
    except ImportError:
        return 0

    # Get Chrome Safe Storage key from GNOME keyring
    try:
        bus = secretstorage.dbus_init()
        collection = secretstorage.get_default_collection(bus)
        chrome_key = None
        for item in collection.get_all_items():
            if item.get_label() == "Chrome Safe Storage":
                chrome_key = item.get_secret()
                break
        if not chrome_key:
            return 0
    except Exception:
        return 0

    derived_key = PBKDF2(chrome_key, b"saltysalt", dkLen=16, count=1)

    def decrypt_v11(encrypted_value: bytes) -> str:
        if not encrypted_value or len(encrypted_va
TrainingConfig class · python · L18-L49 (32 LOC)
backend/app/utils/piper_bridge.py
class TrainingConfig:
    """Конфігурація тренування для RTX 3050."""

    def __init__(
        self,
        csv_path: str,
        audio_dir: str,
        cache_dir: str,
        config_path: str,
        espeak_voice: str = "uk",
        sample_rate: int = 22050,
        batch_size: int = 4,
        max_epochs: int = 10000,
        precision: str = "32",
        accumulate_grad_batches: int = 8,
        checkpoint_path: str | None = None,
        checkpoint_every_n_epochs: int = 500,
        log_dir: str | None = None,
    ):
        self.csv_path = csv_path
        self.audio_dir = audio_dir
        self.cache_dir = cache_dir
        self.config_path = config_path
        self.espeak_voice = espeak_voice
        self.sample_rate = sample_rate
        self.batch_size = batch_size
        self.max_epochs = max_epochs
        self.precision = precision
        self.accumulate_grad_batches = accumulate_grad_batches
        self.checkpoint_path = checkpoint_path
        self.checkpoint
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
__init__ method · python · L21-L49 (29 LOC)
backend/app/utils/piper_bridge.py
    def __init__(
        self,
        csv_path: str,
        audio_dir: str,
        cache_dir: str,
        config_path: str,
        espeak_voice: str = "uk",
        sample_rate: int = 22050,
        batch_size: int = 4,
        max_epochs: int = 10000,
        precision: str = "32",
        accumulate_grad_batches: int = 8,
        checkpoint_path: str | None = None,
        checkpoint_every_n_epochs: int = 500,
        log_dir: str | None = None,
    ):
        self.csv_path = csv_path
        self.audio_dir = audio_dir
        self.cache_dir = cache_dir
        self.config_path = config_path
        self.espeak_voice = espeak_voice
        self.sample_rate = sample_rate
        self.batch_size = batch_size
        self.max_epochs = max_epochs
        self.precision = precision
        self.accumulate_grad_batches = accumulate_grad_batches
        self.checkpoint_path = checkpoint_path
        self.checkpoint_every_n_epochs = checkpoint_every_n_epochs
        self.log_dir = log_
PiperBridge class · python · L52-L202 (151 LOC)
backend/app/utils/piper_bridge.py
class PiperBridge:
    def build_training_command(self, config: TrainingConfig) -> list[str]:
        """Побудувати CLI команду для piper.train."""
        cmd = [
            "python3", "-m", "piper.train", "fit",
            "--data.voice_name", "custom",
            "--data.csv_path", config.csv_path,
            "--data.audio_dir", config.audio_dir,
            "--data.cache_dir", config.cache_dir,
            "--data.config_path", config.config_path,
            "--data.espeak_voice", config.espeak_voice,
            "--model.sample_rate", str(config.sample_rate),
            "--data.batch_size", str(config.batch_size),
            "--trainer.max_epochs", str(config.max_epochs),
            "--trainer.precision", config.precision,
            "--trainer.accelerator", "gpu",
            "--trainer.devices", "1",
        ]

        # Checkpoint for fine-tuning
        if config.checkpoint_path:
            cmd.extend(["--ckpt_path", config.checkpoint_path])

        # Logging
      
build_training_command method · python · L53-L81 (29 LOC)
backend/app/utils/piper_bridge.py
    def build_training_command(self, config: TrainingConfig) -> list[str]:
        """Побудувати CLI команду для piper.train."""
        cmd = [
            "python3", "-m", "piper.train", "fit",
            "--data.voice_name", "custom",
            "--data.csv_path", config.csv_path,
            "--data.audio_dir", config.audio_dir,
            "--data.cache_dir", config.cache_dir,
            "--data.config_path", config.config_path,
            "--data.espeak_voice", config.espeak_voice,
            "--model.sample_rate", str(config.sample_rate),
            "--data.batch_size", str(config.batch_size),
            "--trainer.max_epochs", str(config.max_epochs),
            "--trainer.precision", config.precision,
            "--trainer.accelerator", "gpu",
            "--trainer.devices", "1",
        ]

        # Checkpoint for fine-tuning
        if config.checkpoint_path:
            cmd.extend(["--ckpt_path", config.checkpoint_path])

        # Logging
        if config.log_dir
‹ prevpage 3 / 4next ›