← back to dotradepro__Piper-Master-Trainer

Function bodies 178 total

All specs Real LLM only Function bodies
run_migrations_offline function · python · L18-L27 (10 LOC)
backend/alembic/env.py
def run_migrations_offline() -> None:
    """Run migrations configured from a database URL instead of a connection.

    The URL is read from the ``sqlalchemy.url`` main option of the Alembic
    config. The context is configured with literal binds and the ``named``
    paramstyle, and the migrations run inside ``context.begin_transaction()``.
    """
    offline_options = {
        "url": config.get_main_option("sqlalchemy.url"),
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()
do_run_migrations function · python · L30-L33 (4 LOC)
backend/alembic/env.py
def do_run_migrations(connection):
    """Bind the Alembic context to ``connection`` and run the migrations.

    Args:
        connection: The connection handed to ``context.configure``.
    """
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
    )

    with context.begin_transaction():
        context.run_migrations()
run_async_migrations function · python · L36-L44 (9 LOC)
backend/alembic/env.py
async def run_async_migrations() -> None:
    """Build an async engine from the Alembic config and run the migrations.

    The engine is created from the ``sqlalchemy.``-prefixed options of the
    config's ini section, using ``pool.NullPool``. The synchronous
    ``do_run_migrations`` is executed on the connection via ``run_sync``.

    The engine is disposed in a ``finally`` clause so that it is released
    even when a migration raises.
    """
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    try:
        async with connectable.connect() as connection:
            await connection.run_sync(do_run_migrations)
    finally:
        # Dispose on both the success and the failure path.
        await connectable.dispose()
Settings class · python · L6-L46 (41 LOC)
backend/app/config.py
class Settings(BaseSettings):
    # App
    app_name: str = "Piper Master Trainer"
    debug: bool = True

    # Database
    database_url: str = "sqlite+aiosqlite:///./storage/db.sqlite3"

    # Redis
    redis_url: str = "redis://localhost:6379/0"

    # Storage
    storage_path: Path = Path("./storage")

    # GPU
    cuda_visible_devices: str = "0"

    # Whisper
    whisper_model_size: str = "small"
    whisper_device: str = "cuda"
    whisper_compute_type: str = "float16"

    # Training defaults
    default_batch_size: int = 4
    default_precision: str = "16-mixed"
    default_accumulate_grad_batches: int = 8
    default_max_epochs: int = 10000
    default_sample_rate: int = 22050

    # Piper
    piper_gpl_path: str = "/opt/piper1-gpl"

    model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}

    @property
    def projects_path(self) -> Path:
        return self.storage_path / "projects"

    @property
    def pretrained_path(self) -> Path:
        return self.st
get_db function · python · L19-L24 (6 LOC)
backend/app/database.py
async def get_db() -> AsyncSession:
    """Yield one session created by ``async_session`` and close it afterwards.

    This is an async generator: the consumer receives the session at the
    ``yield``, and the ``finally`` clause closes the session when the
    generator is resumed or closed.
    """
    async with async_session() as db_session:
        try:
            yield db_session
        finally:
            # Explicit close, in addition to leaving the ``async with`` block.
            await db_session.close()
init_db function · python · L27-L29 (3 LOC)
backend/app/database.py
async def init_db():
    """Run ``Base.metadata.create_all`` on a connection from ``engine.begin()``."""
    create_tables = Base.metadata.create_all
    async with engine.begin() as connection:
        await connection.run_sync(create_tables)
lifespan function · python · L14-L30 (17 LOC)
backend/app/main.py
async def lifespan(app: FastAPI):
    """Prepare storage directories and the database before the app starts.

    Startup steps, in order:
      1. create the storage, projects and pretrained directories;
      2. initialise the database via ``init_db``;
      3. if ``cookies.txt`` is missing from storage, try to export cookies
         through ``YoutubeService.ensure_cookies``.

    Cookie export is best-effort: a failure never blocks startup, but it is
    logged (with traceback) instead of being silently discarded.

    Args:
        app: The FastAPI application (not used by the body).
    """
    import logging

    # Startup
    settings.storage_path.mkdir(parents=True, exist_ok=True)
    settings.projects_path.mkdir(parents=True, exist_ok=True)
    settings.pretrained_path.mkdir(parents=True, exist_ok=True)
    await init_db()

    # Auto-export Chrome cookies for YouTube age-restricted videos
    cookies_file = settings.storage_path / "cookies.txt"
    if not cookies_file.exists():
        try:
            from app.services.youtube_service import YoutubeService
            YoutubeService().ensure_cookies()
        except Exception:
            # Cookies are optional, so keep starting up — but leave a trace
            # so a broken export can be diagnosed later.
            logging.getLogger(__name__).warning(
                "Automatic cookie export failed; continuing without cookies",
                exc_info=True,
            )

    yield
Open data scored by Repobility · https://repobility.com
gpu_status function · python · L65-L69 (5 LOC)
backend/app/main.py
async def gpu_status():
    """Return the status reported by a newly created ``GpuManager``."""
    # The service module is imported at call time, not at module import.
    from app.services.gpu_manager import GpuManager

    return GpuManager().get_status()
serve_audio function · python · L73-L84 (12 LOC)
backend/app/main.py
async def serve_audio(file_path: str):
    """Stream an audio file from the storage directory for the player.

    Args:
        file_path: Path of the file, joined onto ``settings.storage_path``.

    Returns:
        A ``FileResponse`` for the resolved file with an audio media type.

    Raises:
        HTTPException: 403 when the resolved path lies outside the storage
            directory (or cannot be resolved), 404 when it is not an
            existing regular file.
    """
    # Security: validate containment BEFORE touching the filesystem entry, so
    # the 404/403 distinction cannot be used to probe for files outside storage.
    try:
        storage_root = settings.storage_path.resolve()
        abs_path = (settings.storage_path / file_path).resolve()
        abs_path.relative_to(storage_root)
    except ValueError:
        # Raised by relative_to() for an outside path, and by resolve() for
        # malformed input such as an embedded NUL byte.
        raise HTTPException(status_code=403, detail="Доступ заборонено") from None

    # is_file() is already False for a missing path.
    if not abs_path.is_file():
        raise HTTPException(status_code=404, detail="Файл не знайдено")

    # Registered MIME types for extensions whose name differs from the subtype.
    known_media_types = {
        ".wav": "audio/wav",
        ".mp3": "audio/mpeg",
        ".m4a": "audio/mp4",
    }
    suffix = abs_path.suffix.lower()
    if suffix in known_media_types:
        media_type = known_media_types[suffix]
    elif suffix:
        media_type = f"audio/{suffix[1:]}"
    else:
        # No extension: avoid emitting the malformed type "audio/".
        media_type = "application/octet-stream"
    return FileResponse(abs_path, media_type=media_type)
AudioFile class · python · L10-L23 (14 LOC)
backend/app/models/audio_file.py
class AudioFile(Base):
    """ORM model for a row of the ``audio_files`` table.

    Every row references a project through ``project_id`` and is the parent
    of the ``Segment`` rows reachable through ``segments``.
    """

    __tablename__ = "audio_files"

    # Primary key: a UUID4 rendered as a 36-character string, produced by the
    # Python-side default.
    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    # Owning project; the foreign key is declared with ON DELETE CASCADE.
    project_id: Mapped[str] = mapped_column(ForeignKey("projects.id", ondelete="CASCADE"))
    filename: Mapped[str] = mapped_column(String(255), nullable=False)
    # Optional. Presumably the URL the audio was downloaded from — confirm against callers.
    source_url: Mapped[str | None] = mapped_column(Text, nullable=True)
    duration_sec: Mapped[float | None] = mapped_column(nullable=True)
    # NOTE(review): whether this path is absolute or relative to the storage
    # root is not visible from this class — confirm.
    file_path: Mapped[str] = mapped_column(Text, nullable=False)
    file_size_bytes: Mapped[int | None] = mapped_column(nullable=True)
    # Defaults to datetime.utcnow, i.e. a naive (tz-unaware) UTC timestamp.
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)

    project = relationship("Project", back_populates="audio_files")
    # ORM cascade "all, delete-orphan": segments are removed with their audio file.
    segments = relationship("Segment", back_populates="audio_file", cascade="all, delete-orphan")
Checkpoint class · python · L10-L23 (14 LOC)
backend/app/models/checkpoint.py
class Checkpoint(Base):
    """ORM model for a row of the ``checkpoints`` table.

    Every row references a training run through ``run_id`` and records the
    epoch, step and file location of one checkpoint.
    """

    __tablename__ = "checkpoints"

    # Primary key: a UUID4 rendered as a 36-character string, produced by the
    # Python-side default.
    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    # Owning training run; the foreign key is declared with ON DELETE CASCADE.
    run_id: Mapped[str] = mapped_column(ForeignKey("training_runs.id", ondelete="CASCADE"))
    epoch: Mapped[int] = mapped_column(nullable=False)
    step: Mapped[int] = mapped_column(nullable=False)
    file_path: Mapped[str] = mapped_column(Text, nullable=False)
    file_size_bytes: Mapped[int | None] = mapped_column(nullable=True)
    # Optional loss values. Presumably generator / discriminator losses — confirm.
    loss_g: Mapped[float | None] = mapped_column(nullable=True)
    loss_d: Mapped[float | None] = mapped_column(nullable=True)
    # Defaults to datetime.utcnow, i.e. a naive (tz-unaware) UTC timestamp.
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)

    training_run = relationship("TrainingRun", back_populates="checkpoints")
Dataset class · python · L10-L23 (14 LOC)
backend/app/models/dataset.py
class Dataset(Base):
    """ORM model for a row of the ``datasets`` table.

    Every row references a project through ``project_id`` and stores where
    the dataset's CSV and audio directory live, plus aggregate totals.
    """

    __tablename__ = "datasets"

    # Primary key: a UUID4 rendered as a 36-character string, produced by the
    # Python-side default.
    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    # Owning project; the foreign key is declared with ON DELETE CASCADE.
    project_id: Mapped[str] = mapped_column(ForeignKey("projects.id", ondelete="CASCADE"))
    csv_path: Mapped[str] = mapped_column(Text, nullable=False)
    audio_dir: Mapped[str] = mapped_column(Text, nullable=False)
    total_segments: Mapped[int] = mapped_column(nullable=False)
    # NOTE(review): the unit (presumably seconds) is not stated here — confirm.
    total_duration: Mapped[float] = mapped_column(nullable=False)
    config: Mapped[str | None] = mapped_column(Text, nullable=True)  # JSON stored as text
    # Defaults to datetime.utcnow, i.e. a naive (tz-unaware) UTC timestamp.
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)

    project = relationship("Project", back_populates="datasets")
    # No ORM cascade is configured on this relationship.
    training_runs = relationship("TrainingRun", back_populates="dataset")
ExportedModel class · python · L10-L22 (13 LOC)
backend/app/models/exported_model.py
class ExportedModel(Base):
    """ORM model for a row of the ``exported_models`` table.

    Every row references a project and a checkpoint and stores the paths of
    the exported ONNX file and its config file.
    """

    __tablename__ = "exported_models"

    # Primary key: a UUID4 rendered as a 36-character string, produced by the
    # Python-side default.
    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    # Owning project; the foreign key is declared with ON DELETE CASCADE.
    project_id: Mapped[str] = mapped_column(ForeignKey("projects.id", ondelete="CASCADE"))
    # Source checkpoint. Unlike project_id, this foreign key declares no
    # ON DELETE action.
    checkpoint_id: Mapped[str] = mapped_column(ForeignKey("checkpoints.id"))
    onnx_path: Mapped[str] = mapped_column(Text, nullable=False)
    config_path: Mapped[str] = mapped_column(Text, nullable=False)
    file_size_bytes: Mapped[int | None] = mapped_column(nullable=True)
    # Defaults to datetime.utcnow, i.e. a naive (tz-unaware) UTC timestamp.
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)

    project = relationship("Project", back_populates="exported_models")
    # One-directional relationship: no back_populates is declared.
    checkpoint = relationship("Checkpoint")
Project class · python · L10-L32 (23 LOC)
backend/app/models/project.py
class Project(Base):
    __tablename__ = "projects"

    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    name: Mapped[str] = mapped_column(String(255), nullable=False)
    language: Mapped[str] = mapped_column(String(10), nullable=False, default="uk")
    espeak_voice: Mapped[str] = mapped_column(String(20), nullable=False, default="uk")
    sample_rate: Mapped[int] = mapped_column(nullable=False, default=22050)
    status: Mapped[str] = mapped_column(String(50), nullable=False, default="created")
    description: Mapped[str | None] = mapped_column(Text, nullable=True)
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    audio_files = relationship("AudioFile", back_populates="project", cascade="all, delete-orphan")
    segments = relationship("Segment", back_populates="project", cascade="al
Segment class · python · L10-L29 (20 LOC)
backend/app/models/segment.py
class Segment(Base):
    __tablename__ = "segments"

    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    project_id: Mapped[str] = mapped_column(ForeignKey("projects.id", ondelete="CASCADE"))
    audio_file_id: Mapped[str] = mapped_column(ForeignKey("audio_files.id", ondelete="CASCADE"))
    start_time: Mapped[float] = mapped_column(nullable=False)
    end_time: Mapped[float] = mapped_column(nullable=False)
    text: Mapped[str] = mapped_column(Text, nullable=False)
    text_edited: Mapped[bool] = mapped_column(Boolean, default=False)
    segment_path: Mapped[str | None] = mapped_column(Text, nullable=True)
    included: Mapped[bool] = mapped_column(Boolean, default=True)
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)

    project = relationship("Project", back_populates="segments")
    audio_file = relationship("AudioFile", back_populates="segments")

    @property
    def duration_sec(self) -> float:
    
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
TrainingRun class · python · L10-L32 (23 LOC)
backend/app/models/training_run.py
class TrainingRun(Base):
    __tablename__ = "training_runs"

    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    project_id: Mapped[str] = mapped_column(ForeignKey("projects.id", ondelete="CASCADE"))
    dataset_id: Mapped[str] = mapped_column(ForeignKey("datasets.id"))
    mode: Mapped[str] = mapped_column(String(20), nullable=False)  # scratch | finetune
    base_checkpoint: Mapped[str | None] = mapped_column(Text, nullable=True)
    config: Mapped[str] = mapped_column(Text, nullable=False)  # JSON
    status: Mapped[str] = mapped_column(String(50), nullable=False, default="pending")
    celery_task_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
    current_epoch: Mapped[int] = mapped_column(default=0)
    current_step: Mapped[int] = mapped_column(default=0)
    last_loss_g: Mapped[float | None] = mapped_column(nullable=True)
    last_loss_d: Mapped[float | None] = mapped_column(nullable=True)
    error_message
prepare_dataset function · python · L19-L39 (21 LOC)
backend/app/routers/datasets.py
async def prepare_dataset(
    data: DatasetPrepareRequest,
    db: AsyncSession = Depends(get_db),
):
    """Prepare a dataset from a project's transcribed segments.

    Calls ``DatasetService.prepare`` with the duration bounds and sample rate
    from the request, then sets the project status to ``"dataset_ready"``.

    Args:
        data: Request carrying ``project_id``, ``min_duration``,
            ``max_duration`` and ``sample_rate``.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The dataset object returned by ``DatasetService.prepare``.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised, 500 for any other
            failure. An ``HTTPException`` raised inside the block is passed
            through unchanged.
    """
    service = DatasetService(db)
    try:
        dataset = await service.prepare(
            project_id=data.project_id,
            min_duration=data.min_duration,
            max_duration=data.max_duration,
            sample_rate=data.sample_rate,
        )
        # Update project status
        from app.services.project_service import ProjectService
        await ProjectService(db).update_status(data.project_id, "dataset_ready")
        return dataset
    except HTTPException:
        # A deliberate HTTP error must keep its status code instead of being
        # re-wrapped as a generic 500 by the broad handler below.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Помилка підготовки датасету: {str(e)}") from e
list_datasets function · python · L43-L46 (4 LOC)
backend/app/routers/datasets.py
async def list_datasets(project_id: str, db: AsyncSession = Depends(get_db)):
    """List the datasets that belong to a project."""
    datasets = await DatasetService(db).get_by_project(project_id)
    return datasets
dataset_stats function · python · L50-L56 (7 LOC)
backend/app/routers/datasets.py
async def dataset_stats(dataset_id: str, db: AsyncSession = Depends(get_db)):
    """Return the statistics of a dataset, or respond 404 when there are none."""
    found = await DatasetService(db).get_stats(dataset_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Датасет не знайдено")
validate_dataset function · python · L60-L63 (4 LOC)
backend/app/routers/datasets.py
async def validate_dataset(dataset_id: str, db: AsyncSession = Depends(get_db)):
    """Run the dataset quality validation and return its result."""
    report = await DatasetService(db).validate(dataset_id)
    return report
preview_csv function · python · L67-L86 (20 LOC)
backend/app/routers/datasets.py
async def preview_csv(dataset_id: str, limit: int = 20, db: AsyncSession = Depends(get_db)):
    """Preview the first ``limit`` lines of a dataset's metadata CSV.

    Lines are split on the first ``|`` into ``filename`` and ``text``; lines
    without a ``|`` are dropped from the result but still count towards
    ``limit``.

    Args:
        dataset_id: Identifier of the dataset to preview.
        limit: Maximum number of lines to read. Negative values are treated
            as 0 (previously a negative value sliced from the end of the file).
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        ``{"total": <dataset.total_segments>, "rows": [{"filename", "text"}, ...]}``.

    Raises:
        HTTPException: 404 when the dataset or its CSV file does not exist.
    """
    from itertools import dropwhile, islice

    from app.config import settings

    service = DatasetService(db)
    dataset = await service.get_by_id(dataset_id)
    if not dataset:
        raise HTTPException(status_code=404, detail="Датасет не знайдено")

    csv_path = settings.storage_path / dataset.csv_path
    if not csv_path.exists():
        raise HTTPException(status_code=404, detail="CSV не знайдено")

    rows = []
    # Read lazily: only the previewed lines are pulled from disk, instead of
    # loading the whole file to keep a handful of lines.
    with csv_path.open(encoding="utf-8") as handle:
        # Skip leading blank lines so they do not consume the limit.
        content_lines = dropwhile(lambda raw: not raw.strip(), handle)
        for raw in islice(content_lines, max(limit, 0)):
            parts = raw.rstrip("\n").split("|", 1)
            if len(parts) == 2:
                rows.append({"filename": parts[0], "text": parts[1]})
    return {"total": dataset.total_segments, "rows": rows}
export_model function · python · L20-L38 (19 LOC)
backend/app/routers/models.py
async def export_model(
    data: ExportRequest,
    db: AsyncSession = Depends(get_db),
):
    """Export a checkpoint to ONNX format and mark the project as exported.

    A ``FileNotFoundError`` is answered with 404 and a ``RuntimeError`` with
    500; the exception text becomes the response detail.
    """
    exporter = ExportService(db)
    try:
        exported = await exporter.export_onnx(
            project_id=data.project_id,
            checkpoint_path=data.checkpoint_path,
        )
        # Record the new project status once the export has succeeded.
        from app.services.project_service import ProjectService

        status_updater = ProjectService(db)
        await status_updater.update_status(data.project_id, "exported")
    except FileNotFoundError as missing:
        raise HTTPException(status_code=404, detail=str(missing))
    except RuntimeError as failure:
        raise HTTPException(status_code=500, detail=str(failure))
    return exported
list_models function · python · L42-L45 (4 LOC)
backend/app/routers/models.py
async def list_models(project_id: str, db: AsyncSession = Depends(get_db)):
    """List the exported models that belong to a project."""
    exported_models = await ExportService(db).get_by_project(project_id)
    return exported_models
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
synthesize function · python · L49-L74 (26 LOC)
backend/app/routers/models.py
async def synthesize(
    data: SynthesizeRequest,
    db: AsyncSession = Depends(get_db),
):
    """Synthesize speech with an exported ONNX model and return it as WAV.

    Responds 404 when the model id is unknown and 500 when anything raises
    while synthesizing. The synthesis call runs via ``asyncio.to_thread``.
    """
    # Look the exported model up first.
    exported = await ExportService(db).get_by_id(data.model_id)
    if not exported:
        raise HTTPException(status_code=404, detail="Модель не знайдено")

    try:
        engine = SynthesisService()
        voice_options = {
            "speaker_id": data.speaker_id,
            "length_scale": data.length_scale,
            "noise_scale": data.noise_scale,
            "noise_w": data.noise_w,
        }
        audio = await asyncio.to_thread(
            engine.synthesize,
            exported.onnx_path,
            exported.config_path,
            data.text,
            **voice_options,
        )
        return Response(content=audio, media_type="audio/wav")
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Помилка синтезу: {str(exc)}")
create_project function · python · L17-L20 (4 LOC)
backend/app/routers/projects.py
async def create_project(data: ProjectCreate, db: AsyncSession = Depends(get_db)):
    """Create a project from the request data and return it."""
    return await ProjectService(db).create(data)
list_projects function · python · L24-L26 (3 LOC)
backend/app/routers/projects.py
async def list_projects(db: AsyncSession = Depends(get_db)):
    """Return all projects known to ``ProjectService``."""
    projects = await ProjectService(db).get_all()
    return projects
get_project function · python · L30-L35 (6 LOC)
backend/app/routers/projects.py
async def get_project(project_id: str, db: AsyncSession = Depends(get_db)):
    """Return one project by id, or respond 404 when it is not found."""
    found = await ProjectService(db).get_by_id(project_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Проєкт не знайдено")
update_project function · python · L39-L46 (8 LOC)
backend/app/routers/projects.py
async def update_project(
    project_id: str, data: ProjectUpdate, db: AsyncSession = Depends(get_db)
):
    """Apply an update to a project, or respond 404 when it is not found."""
    updated = await ProjectService(db).update(project_id, data)
    if updated:
        return updated
    raise HTTPException(status_code=404, detail="Проєкт не знайдено")
delete_project function · python · L50-L55 (6 LOC)
backend/app/routers/projects.py
async def delete_project(project_id: str, db: AsyncSession = Depends(get_db)):
    """Delete a project, or respond 404 when the service reports no deletion."""
    was_deleted = await ProjectService(db).delete(project_id)
    if was_deleted:
        return {"success": True, "message": "Проєкт видалено"}
    raise HTTPException(status_code=404, detail="Проєкт не знайдено")
start_training function · python · L20-L61 (42 LOC)
backend/app/routers/training.py
async def start_training(
    data: TrainingStartRequest,
    db: AsyncSession = Depends(get_db),
):
    """Запустити тренування VITS моделі."""
    from app.services.dataset_service import DatasetService

    # Get dataset
    ds_service = DatasetService(db)
    dataset = await ds_service.get_by_id(data.dataset_id)
    if not dataset:
        raise HTTPException(status_code=404, detail="Датасет не знайдено")

    run_id = str(uuid.uuid4())

    try:
        await asyncio.to_thread(
            _training_service.start_training,
            run_id=run_id,
            project_id=data.project_id,
            dataset_id=data.dataset_id,
            csv_path=dataset.csv_path,
            audio_dir=dataset.audio_dir,
            mode=data.mode,
            base_checkpoint=data.base_checkpoint,
            batch_size=data.batch_size,
            max_epochs=data.max_epochs,
            precision=data.precision,
            accumulate_grad_batches=data.accumulate_grad_batches,
            sampl
stop_training function · python · L65-L75 (11 LOC)
backend/app/routers/training.py
async def stop_training(run_id: str | None = None):
    """Stop the training run.

    When ``run_id`` is not given, the ``run_id`` from the current status is
    used. Responds 404 when no training is active or when the service
    reports that nothing was stopped.
    """
    current = _training_service.get_status()
    if not current["active"]:
        raise HTTPException(status_code=404, detail="Немає активного тренування")

    target_run = run_id if run_id else current.get("run_id")
    stopped = await asyncio.to_thread(_training_service.stop_training, target_run)
    if stopped:
        return {"status": "stopped"}
    raise HTTPException(status_code=404, detail="Тренування не знайдено")
About: code-quality intelligence by Repobility · https://repobility.com
training_status function · python · L79-L81 (3 LOC)
backend/app/routers/training.py
async def training_status():
    """Return the current training status from the training service."""
    current_status = _training_service.get_status()
    return current_status
training_logs function · python · L85-L87 (3 LOC)
backend/app/routers/training.py
async def training_logs(last_n: int = 100):
    """Return the training log lines for ``last_n`` under the ``lines`` key."""
    recent_lines = _training_service.get_logs(last_n)
    return {"lines": recent_lines}
list_checkpoints function · python · L91-L93 (3 LOC)
backend/app/routers/training.py
async def list_checkpoints(project_id: str):
    """List the checkpoints of a project."""
    checkpoints = _training_service.list_checkpoints(project_id)
    return checkpoints
list_pretrained function · python · L97-L99 (3 LOC)
backend/app/routers/training.py
async def list_pretrained():
    """List the pretrained models reported by the training service."""
    pretrained = _training_service.list_pretrained()
    return pretrained
start_transcription function · python · L21-L66 (46 LOC)
backend/app/routers/transcription.py
async def start_transcription(
    data: TranscriptionRequest,
    db: AsyncSession = Depends(get_db),
):
    """Запустити транскрипцію аудіо файлу."""
    from app.services.audio_file_service import AudioFileService

    # Verify audio file exists
    af_service = AudioFileService(db)
    audio_file = await af_service.get_by_id(data.audio_file_id)
    if not audio_file:
        raise HTTPException(status_code=404, detail="Аудіо файл не знайдено")

    # Clear existing segments for this audio file
    seg_service = SegmentService(db)
    existing = await seg_service.get_by_audio_file(data.audio_file_id)
    for seg in existing:
        await seg_service.delete(seg.id)

    # Transcribe
    try:
        ts = TranscriptionService()
        raw_segments = await asyncio.to_thread(
            ts.transcribe,
            audio_file.file_path,
            language=data.language,
            model_size=data.model_size,
        )
    except Exception as e:
        raise HTTPException(status_cod
list_segments function · python · L70-L73 (4 LOC)
backend/app/routers/transcription.py
async def list_segments(project_id: str, db: AsyncSession = Depends(get_db)):
    """Return all segments of a project."""
    segments = await SegmentService(db).get_by_project(project_id)
    return segments
update_segment function · python · L77-L87 (11 LOC)
backend/app/routers/transcription.py
async def update_segment(
    segment_id: str,
    data: SegmentUpdate,
    db: AsyncSession = Depends(get_db),
):
    """Update a segment's text or inclusion flag; respond 404 if not found."""
    updated = await SegmentService(db).update(
        segment_id,
        text=data.text,
        included=data.included,
    )
    if updated:
        return updated
    raise HTTPException(status_code=404, detail="Сегмент не знайдено")
delete_segment function · python · L91-L97 (7 LOC)
backend/app/routers/transcription.py
async def delete_segment(segment_id: str, db: AsyncSession = Depends(get_db)):
    """Delete a segment, or respond 404 when the service reports no deletion."""
    was_deleted = await SegmentService(db).delete(segment_id)
    if was_deleted:
        return {"success": True}
    raise HTTPException(status_code=404, detail="Сегмент не знайдено")
Open data scored by Repobility · https://repobility.com
merge_segments function · python · L101-L107 (7 LOC)
backend/app/routers/transcription.py
async def merge_segments(data: MergeRequest, db: AsyncSession = Depends(get_db)):
    """Merge several segments; respond 400 when the merge yields nothing."""
    combined = await SegmentService(db).merge(data.segment_ids)
    if combined:
        return combined
    raise HTTPException(status_code=400, detail="Не вдалося об'єднати сегменти")
split_segment function · python · L111-L121 (11 LOC)
backend/app/routers/transcription.py
async def split_segment(
    segment_id: str,
    data: SplitRequest,
    db: AsyncSession = Depends(get_db),
):
    """Split a segment at ``data.split_time``; respond 400 when that fails.

    The service result is returned as a list.
    """
    pieces = await SegmentService(db).split(segment_id, data.split_time)
    if pieces:
        return [*pieces]
    raise HTTPException(status_code=400, detail="Не вдалося розділити сегмент")
available_models function · python · L125-L137 (13 LOC)
backend/app/routers/transcription.py
async def available_models():
    """Return the available Whisper models and the GPU recommendation."""
    columns = ("id", "size_mb", "vram_mb", "speed", "quality")
    # One tuple per model, in the same order as ``columns``.
    catalog = (
        ("tiny", 75, 400, "fastest", "low"),
        ("base", 145, 500, "fast", "medium"),
        ("small", 488, 1000, "medium", "good"),
        ("medium", 1500, 2500, "slow", "high"),
        ("large-v3", 3000, 4500, "slowest", "best"),
    )
    return {
        "models": [dict(zip(columns, entry)) for entry in catalog],
        "recommended": "small",
        "note": "RTX 3050 (4GB): small рекомендовано, medium можливо",
    }
websocket_endpoint function · python · L9-L18 (10 LOC)
backend/app/routers/ws.py
async def websocket_endpoint(websocket: WebSocket, project_id: str):
    """Keep a project WebSocket registered and answer ``ping`` with ``pong``.

    The socket is registered with ``ws_manager`` under ``project_id`` and
    unregistered again whenever the receive loop ends — on a normal client
    disconnect and on any other error alike, so no stale socket is left
    behind in the manager.

    Args:
        websocket: The client connection.
        project_id: Key under which the connection is registered.
    """
    await ws_manager.connect(project_id, websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # Client can send ping/pong or commands
            if data == "ping":
                await websocket.send_text("pong")
    except WebSocketDisconnect:
        # Normal client disconnect: nothing to report, cleanup happens below.
        pass
    finally:
        # Runs for every exit path; other exceptions still propagate.
        ws_manager.disconnect(project_id, websocket)
get_video_info function · python · L19-L26 (8 LOC)
backend/app/routers/youtube.py
async def get_video_info(url: str):
    """Fetch information about a YouTube video; respond 400 on any failure."""
    try:
        fetcher = YoutubeService()
        return await asyncio.to_thread(fetcher.get_video_info, url)
    except Exception as exc:
        raise HTTPException(status_code=400, detail=f"Не вдалося отримати інформацію: {str(exc)}")
download_youtube function · python · L30-L70 (41 LOC)
backend/app/routers/youtube.py
async def download_youtube(
    data: YoutubeDownloadRequest,
    db: AsyncSession = Depends(get_db),
):
    """Завантажити аудіо з YouTube."""
    try:
        import os
        # Ensure GNOME env for Chrome keyring access in worker thread
        os.environ.setdefault("XDG_CURRENT_DESKTOP", "GNOME")
        os.environ.setdefault("DESKTOP_SESSION", "gnome")

        yt_service = YoutubeService()

        # Download audio
        result = await asyncio.to_thread(
            yt_service.download_audio,
            data.url,
            data.project_id,
            data.audio_format,
        )

        # Save to database
        af_service = AudioFileService(db)
        audio_file = await af_service.create(
            project_id=data.project_id,
            filename=result["filename"],
            file_path=result["file_path"],
            source_url=result["source_url"],
            duration_sec=result["duration_sec"],
            file_size_bytes=result["file_size_bytes"],
        )

 
upload_audio function · python · L74-L98 (25 LOC)
backend/app/routers/youtube.py
async def upload_audio(
    project_id: str = Form(...),
    file: UploadFile = File(...),
    db: AsyncSession = Depends(get_db),
):
    """Upload an audio file directly into a project.

    Args:
        project_id: Project the file is stored under.
        file: The uploaded file; its extension must be one of
            ``.wav``, ``.mp3``, ``.flac``, ``.ogg``, ``.m4a``.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The object returned by ``AudioFileService.save_uploaded_file``.

    Raises:
        HTTPException: 400 for a missing filename or unsupported extension,
            and 400 when the file is larger than 500 MB.
    """
    allowed_extensions = {".wav", ".mp3", ".flac", ".ogg", ".m4a"}
    max_bytes = 500 * 1024 * 1024  # 500MB limit

    # An upload part may arrive without a filename; treat it as "no extension"
    # so the client gets a 400 instead of a TypeError-driven 500.
    filename = file.filename or ""
    ext = "." + filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    if ext not in allowed_extensions:
        raise HTTPException(
            status_code=400,
            # sorted(): a set has no stable order, so the message would vary.
            detail=f"Непідтримуваний формат. Дозволено: {', '.join(sorted(allowed_extensions))}",
        )

    # Reject early when the upload object already reports its size, before
    # the whole body is pulled into memory. getattr keeps this safe if the
    # attribute is absent.
    declared_size = getattr(file, "size", None)
    if declared_size is not None and declared_size > max_bytes:
        raise HTTPException(status_code=400, detail="Файл занадто великий (макс. 500MB)")

    content = await file.read()
    if len(content) > max_bytes:
        raise HTTPException(status_code=400, detail="Файл занадто великий (макс. 500MB)")

    # NOTE(review): the client-supplied filename is passed on unchanged —
    # confirm that save_uploaded_file strips directory components.
    service = AudioFileService(db)
    audio_file = await service.save_uploaded_file(
        project_id=project_id,
        filename=filename,
        content=content,
    )
    return audio_file
list_audio_files function · python · L102-L105 (4 LOC)
backend/app/routers/youtube.py
async def list_audio_files(project_id: str, db: AsyncSession = Depends(get_db)):
    """List the audio files that belong to a project."""
    audio_files = await AudioFileService(db).get_by_project(project_id)
    return audio_files
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
delete_audio_file function · python · L109-L115 (7 LOC)
backend/app/routers/youtube.py
async def delete_audio_file(audio_file_id: str, db: AsyncSession = Depends(get_db)):
    """Видалити аудіо файл."""
    service = AudioFileService(db)
    deleted = await service.delete(audio_file_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="Файл не знайдено")
    return {"success": True, "message": "Файл видалено"}
upload_cookies function · python · L119-L126 (8 LOC)
backend/app/routers/youtube.py
async def upload_cookies(file: UploadFile = File(...)):
    """Store an uploaded ``cookies.txt`` for access to protected YouTube videos.

    The upload's bytes are written to ``cookies.txt`` in the storage
    directory, replacing any existing file.

    Args:
        file: The uploaded file; its name must end with ``.txt``.

    Returns:
        A success payload with a confirmation message.

    Raises:
        HTTPException: 400 when the filename is missing or does not end
            with ``.txt``.
    """
    # The filename may be absent; fall back to "" so the check below answers
    # 400 instead of failing with AttributeError.
    filename = file.filename or ""
    if not filename.endswith(".txt"):
        raise HTTPException(status_code=400, detail="Потрібен файл cookies.txt")
    content = await file.read()
    cookies_path = settings.storage_path / "cookies.txt"
    cookies_path.write_bytes(content)
    return {"success": True, "message": "Cookies збережено"}
refresh_cookies function · python · L130-L136 (7 LOC)
backend/app/routers/youtube.py
async def refresh_cookies():
    """Refresh the cookies from the Chrome browser; respond 500 on failure."""
    youtube = YoutubeService()
    refreshed = await asyncio.to_thread(youtube.ensure_cookies)
    if not refreshed:
        raise HTTPException(status_code=500, detail="Не вдалося оновити cookies. Переконайтесь що Chrome авторизований у YouTube.")
    return {"success": True, "message": "Cookies оновлено з Chrome"}
page 1 / 4 · next ›