Function bodies 178 total
run_migrations_offline function · python · L18-L27 (10 LOC)backend/alembic/env.py
def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    The Alembic context is configured from the ``sqlalchemy.url`` option alone
    (no connection is handed to ``context.configure``), with
    ``literal_binds=True`` and the ``named`` paramstyle.
    """
    offline_options = {
        "url": config.get_main_option("sqlalchemy.url"),
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()


# do_run_migrations function · python · L30-L33 (4 LOC) backend/alembic/env.py
def do_run_migrations(connection):
    """Configure the Alembic context on *connection* and run the migrations."""
    configure_kwargs = dict(connection=connection, target_metadata=target_metadata)
    context.configure(**configure_kwargs)

    with context.begin_transaction():
        context.run_migrations()


# run_async_migrations function · python · L36-L44 (9 LOC) backend/alembic/env.py
async def run_async_migrations() -> None:
    """Build an async engine from the Alembic config and run the migrations.

    The engine is created from the ini section named by
    ``config.config_ini_section`` (keys prefixed ``sqlalchemy.``) with
    ``NullPool``. ``do_run_migrations`` is executed through
    ``connection.run_sync``.
    """
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    try:
        async with connectable.connect() as connection:
            await connection.run_sync(do_run_migrations)
    finally:
        # Dispose the engine even when a migration raises, so a failed run
        # does not leave the engine's resources behind.
        await connectable.dispose()


# Settings class · python · L6-L46 (41 LOC) backend/app/config.py
class Settings(BaseSettings):
# App
app_name: str = "Piper Master Trainer"
debug: bool = True
# Database
database_url: str = "sqlite+aiosqlite:///./storage/db.sqlite3"
# Redis
redis_url: str = "redis://localhost:6379/0"
# Storage
storage_path: Path = Path("./storage")
# GPU
cuda_visible_devices: str = "0"
# Whisper
whisper_model_size: str = "small"
whisper_device: str = "cuda"
whisper_compute_type: str = "float16"
# Training defaults
default_batch_size: int = 4
default_precision: str = "16-mixed"
default_accumulate_grad_batches: int = 8
default_max_epochs: int = 10000
default_sample_rate: int = 22050
# Piper
piper_gpl_path: str = "/opt/piper1-gpl"
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
@property
def projects_path(self) -> Path:
return self.storage_path / "projects"
@property
def pretrained_path(self) -> Path:
return self.stget_db function · python · L19-L24 (6 LOC)backend/app/database.py
async def get_db() -> AsyncSession:
    """Yield a session obtained from ``async_session()`` and close it afterwards.

    NOTE(review): the body contains ``yield``, so this is an async generator.
    The ``AsyncSession`` annotation describes the yielded value, not the
    function's real return type; ``AsyncIterator[AsyncSession]`` would be the
    accurate annotation.
    """
    async with async_session() as session:
        try:
            yield session
        finally:
            # Explicit close, in addition to leaving the ``async with`` block.
            await session.close()


# init_db function · python · L27-L29 (3 LOC) backend/app/database.py
async def init_db():
    """Run ``Base.metadata.create_all`` through the async engine."""
    create_tables = Base.metadata.create_all
    async with engine.begin() as connection:
        await connection.run_sync(create_tables)


# lifespan function · python · L14-L30 (17 LOC) backend/app/main.py
async def lifespan(app: FastAPI):
    """Prepare storage and the database, then hand control to the application.

    Before ``yield`` this creates the storage, projects and pretrained
    directories, runs ``init_db()`` and, when ``cookies.txt`` is absent from
    the storage directory, makes a best-effort attempt to export cookies.
    Nothing runs after ``yield``.

    Args:
        app: The FastAPI application (unused here).
    """
    import logging

    # Startup
    for directory in (
        settings.storage_path,
        settings.projects_path,
        settings.pretrained_path,
    ):
        directory.mkdir(parents=True, exist_ok=True)
    await init_db()

    # Auto-export Chrome cookies for YouTube age-restricted videos
    cookies_file = settings.storage_path / "cookies.txt"
    if not cookies_file.exists():
        try:
            from app.services.youtube_service import YoutubeService

            YoutubeService().ensure_cookies()
        except Exception:
            # Cookies are optional, so startup must not fail because of them,
            # but the reason is logged instead of being silently discarded.
            logging.getLogger(__name__).warning(
                "Automatic cookie export failed; continuing without cookies",
                exc_info=True,
            )
    yield


# Open data scored by Repobility · https://repobility.com
gpu_status function · python · L65-L69 (5 LOC)backend/app/main.py
async def gpu_status():
    """Return whatever ``GpuManager().get_status()`` reports."""
    from app.services.gpu_manager import GpuManager

    return GpuManager().get_status()


# serve_audio function · python · L73-L84 (12 LOC) backend/app/main.py
async def serve_audio(file_path: str):
    """Stream an audio file from the storage directory for the player.

    Args:
        file_path: Path of the requested file, relative to
            ``settings.storage_path``.

    Returns:
        A ``FileResponse`` for the file, with an ``audio/<suffix>`` media type.

    Raises:
        HTTPException: 403 when the resolved path falls outside the storage
            directory; 404 when it does not point to an existing file.
    """
    storage_root = settings.storage_path.resolve()
    abs_path = (storage_root / file_path).resolve()

    # Security: ensure the path is within storage *before* probing the
    # filesystem. Checking existence first would let a caller tell, from
    # 404 vs 403, whether an arbitrary path outside storage exists.
    try:
        abs_path.relative_to(storage_root)
    except ValueError:
        raise HTTPException(status_code=403, detail="Доступ заборонено") from None

    if not abs_path.is_file():
        raise HTTPException(status_code=404, detail="Файл не знайдено")

    # Compare the suffix case-insensitively so ".WAV" is treated like ".wav".
    suffix = abs_path.suffix.lower()
    media_type = "audio/wav" if suffix == ".wav" else f"audio/{suffix[1:]}"
    return FileResponse(abs_path, media_type=media_type)


# AudioFile class · python · L10-L23 (14 LOC) backend/app/models/audio_file.py
class AudioFile(Base):
    """ORM model mapped to the ``audio_files`` table."""

    __tablename__ = "audio_files"
    # Primary key: a UUID4 string generated in Python when no id is supplied.
    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    # Owning project; the foreign key is declared with ON DELETE CASCADE.
    project_id: Mapped[str] = mapped_column(ForeignKey("projects.id", ondelete="CASCADE"))
    filename: Mapped[str] = mapped_column(String(255), nullable=False)
    # Optional columns below are nullable.
    source_url: Mapped[str | None] = mapped_column(Text, nullable=True)
    duration_sec: Mapped[float | None] = mapped_column(nullable=True)
    file_path: Mapped[str] = mapped_column(Text, nullable=False)
    file_size_bytes: Mapped[int | None] = mapped_column(nullable=True)
    # Defaults to datetime.utcnow(), i.e. a naive timestamp.
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12.
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    project = relationship("Project", back_populates="audio_files")
    # Segments are cascaded with "all, delete-orphan".
    segments = relationship("Segment", back_populates="audio_file", cascade="all, delete-orphan")


# Checkpoint class · python · L10-L23 (14 LOC) backend/app/models/checkpoint.py
class Checkpoint(Base):
    """ORM model mapped to the ``checkpoints`` table."""

    __tablename__ = "checkpoints"
    # Primary key: a UUID4 string generated in Python when no id is supplied.
    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    # Owning training run; the foreign key is declared with ON DELETE CASCADE.
    run_id: Mapped[str] = mapped_column(ForeignKey("training_runs.id", ondelete="CASCADE"))
    epoch: Mapped[int] = mapped_column(nullable=False)
    step: Mapped[int] = mapped_column(nullable=False)
    file_path: Mapped[str] = mapped_column(Text, nullable=False)
    file_size_bytes: Mapped[int | None] = mapped_column(nullable=True)
    # Optional loss values (nullable); presumably generator / discriminator
    # losses — confirm against the code that writes them.
    loss_g: Mapped[float | None] = mapped_column(nullable=True)
    loss_d: Mapped[float | None] = mapped_column(nullable=True)
    # Defaults to datetime.utcnow(), i.e. a naive timestamp.
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12.
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    training_run = relationship("TrainingRun", back_populates="checkpoints")


# Dataset class · python · L10-L23 (14 LOC) backend/app/models/dataset.py
class Dataset(Base):
    """ORM model mapped to the ``datasets`` table."""

    __tablename__ = "datasets"
    # Primary key: a UUID4 string generated in Python when no id is supplied.
    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    # Owning project; the foreign key is declared with ON DELETE CASCADE.
    project_id: Mapped[str] = mapped_column(ForeignKey("projects.id", ondelete="CASCADE"))
    csv_path: Mapped[str] = mapped_column(Text, nullable=False)
    audio_dir: Mapped[str] = mapped_column(Text, nullable=False)
    total_segments: Mapped[int] = mapped_column(nullable=False)
    total_duration: Mapped[float] = mapped_column(nullable=False)
    # Stored as text; holds JSON.
    config: Mapped[str | None] = mapped_column(Text, nullable=True)  # JSON
    # Defaults to datetime.utcnow(), i.e. a naive timestamp.
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12.
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    project = relationship("Project", back_populates="datasets")
    training_runs = relationship("TrainingRun", back_populates="dataset")


# ExportedModel class · python · L10-L22 (13 LOC) backend/app/models/exported_model.py
class ExportedModel(Base):
    """ORM model mapped to the ``exported_models`` table."""

    __tablename__ = "exported_models"
    # Primary key: a UUID4 string generated in Python when no id is supplied.
    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    # Owning project; the foreign key is declared with ON DELETE CASCADE.
    project_id: Mapped[str] = mapped_column(ForeignKey("projects.id", ondelete="CASCADE"))
    # Foreign key to checkpoints.id; unlike project_id, no ondelete rule is declared.
    checkpoint_id: Mapped[str] = mapped_column(ForeignKey("checkpoints.id"))
    onnx_path: Mapped[str] = mapped_column(Text, nullable=False)
    config_path: Mapped[str] = mapped_column(Text, nullable=False)
    file_size_bytes: Mapped[int | None] = mapped_column(nullable=True)
    # Defaults to datetime.utcnow(), i.e. a naive timestamp.
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12.
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    project = relationship("Project", back_populates="exported_models")
    # One-directional relationship: no back_populates is declared.
    checkpoint = relationship("Checkpoint")


# Project class · python · L10-L32 (23 LOC) backend/app/models/project.py
class Project(Base):
__tablename__ = "projects"
id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
name: Mapped[str] = mapped_column(String(255), nullable=False)
language: Mapped[str] = mapped_column(String(10), nullable=False, default="uk")
espeak_voice: Mapped[str] = mapped_column(String(20), nullable=False, default="uk")
sample_rate: Mapped[int] = mapped_column(nullable=False, default=22050)
status: Mapped[str] = mapped_column(String(50), nullable=False, default="created")
description: Mapped[str | None] = mapped_column(Text, nullable=True)
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
updated_at: Mapped[datetime] = mapped_column(default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
audio_files = relationship("AudioFile", back_populates="project", cascade="all, delete-orphan")
segments = relationship("Segment", back_populates="project", cascade="alSegment class · python · L10-L29 (20 LOC)backend/app/models/segment.py
class Segment(Base):
__tablename__ = "segments"
id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
project_id: Mapped[str] = mapped_column(ForeignKey("projects.id", ondelete="CASCADE"))
audio_file_id: Mapped[str] = mapped_column(ForeignKey("audio_files.id", ondelete="CASCADE"))
start_time: Mapped[float] = mapped_column(nullable=False)
end_time: Mapped[float] = mapped_column(nullable=False)
text: Mapped[str] = mapped_column(Text, nullable=False)
text_edited: Mapped[bool] = mapped_column(Boolean, default=False)
segment_path: Mapped[str | None] = mapped_column(Text, nullable=True)
included: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
project = relationship("Project", back_populates="segments")
audio_file = relationship("AudioFile", back_populates="segments")
@property
def duration_sec(self) -> float:
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
TrainingRun class · python · L10-L32 (23 LOC)backend/app/models/training_run.py
class TrainingRun(Base):
__tablename__ = "training_runs"
id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
project_id: Mapped[str] = mapped_column(ForeignKey("projects.id", ondelete="CASCADE"))
dataset_id: Mapped[str] = mapped_column(ForeignKey("datasets.id"))
mode: Mapped[str] = mapped_column(String(20), nullable=False) # scratch | finetune
base_checkpoint: Mapped[str | None] = mapped_column(Text, nullable=True)
config: Mapped[str] = mapped_column(Text, nullable=False) # JSON
status: Mapped[str] = mapped_column(String(50), nullable=False, default="pending")
celery_task_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
current_epoch: Mapped[int] = mapped_column(default=0)
current_step: Mapped[int] = mapped_column(default=0)
last_loss_g: Mapped[float | None] = mapped_column(nullable=True)
last_loss_d: Mapped[float | None] = mapped_column(nullable=True)
error_messageprepare_dataset function · python · L19-L39 (21 LOC)backend/app/routers/datasets.py
async def prepare_dataset(
    data: DatasetPrepareRequest,
    db: AsyncSession = Depends(get_db),
):
    """Prepare a dataset from the project's transcribed segments.

    On success the project's status is set to ``"dataset_ready"`` and the
    prepared dataset is returned.

    Raises:
        HTTPException: 400 when the service raises ``ValueError``; 500 for any
            other failure. An ``HTTPException`` raised further down is passed
            through unchanged.
    """
    service = DatasetService(db)
    try:
        dataset = await service.prepare(
            project_id=data.project_id,
            min_duration=data.min_duration,
            max_duration=data.max_duration,
            sample_rate=data.sample_rate,
        )
        # Update project status
        from app.services.project_service import ProjectService

        await ProjectService(db).update_status(data.project_id, "dataset_ready")
        return dataset
    except HTTPException:
        # Do not let the blanket handler below rewrite a deliberate HTTP error
        # into a generic 500.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Помилка підготовки датасету: {str(e)}"
        ) from e


# list_datasets function · python · L43-L46 (4 LOC) backend/app/routers/datasets.py
async def list_datasets(project_id: str, db: AsyncSession = Depends(get_db)):
    """List the datasets of a project."""
    datasets = await DatasetService(db).get_by_project(project_id)
    return datasets


# dataset_stats function · python · L50-L56 (7 LOC) backend/app/routers/datasets.py
async def dataset_stats(dataset_id: str, db: AsyncSession = Depends(get_db)):
    """Return dataset statistics, or respond 404 when the service has none."""
    found = await DatasetService(db).get_stats(dataset_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Датасет не знайдено")


# validate_dataset function · python · L60-L63 (4 LOC) backend/app/routers/datasets.py
async def validate_dataset(dataset_id: str, db: AsyncSession = Depends(get_db)):
    """Validate dataset quality and return the service's report."""
    report = await DatasetService(db).validate(dataset_id)
    return report


# preview_csv function · python · L67-L86 (20 LOC) backend/app/routers/datasets.py
async def preview_csv(dataset_id: str, limit: int = 20, db: AsyncSession = Depends(get_db)):
    """Preview the first N rows of the dataset's metadata.csv.

    Each line is split on the first ``|`` into ``filename`` and ``text``;
    lines without a ``|`` are dropped.

    Args:
        dataset_id: Id of the dataset to preview.
        limit: Maximum number of lines to read; negative values are treated
            as 0.

    Returns:
        ``{"total": <dataset.total_segments>, "rows": [...]}``.

    Raises:
        HTTPException: 404 when the dataset or its CSV file is missing.
    """
    from itertools import islice

    from app.config import settings

    dataset = await DatasetService(db).get_by_id(dataset_id)
    if not dataset:
        raise HTTPException(status_code=404, detail="Датасет не знайдено")
    csv_path = settings.storage_path / dataset.csv_path
    if not csv_path.exists():
        raise HTTPException(status_code=404, detail="CSV не знайдено")

    rows = []
    with csv_path.open(encoding="utf-8") as handle:
        # Read only the requested prefix instead of loading the whole file.
        # Clamping keeps a negative limit from slicing "all but the last N".
        for raw_line in islice(handle, max(limit, 0)):
            filename, separator, text = raw_line.rstrip("\r\n").partition("|")
            if separator:
                rows.append({"filename": filename, "text": text})
    return {"total": dataset.total_segments, "rows": rows}


# export_model function · python · L20-L38 (19 LOC) backend/app/routers/models.py
async def export_model(
    data: ExportRequest,
    db: AsyncSession = Depends(get_db),
):
    """Export a checkpoint to ONNX format.

    On success the project's status is set to ``"exported"`` and the exported
    model is returned.

    Raises:
        HTTPException: 404 when ``FileNotFoundError`` is raised during the
            export; 500 when ``RuntimeError`` is raised.
    """
    service = ExportService(db)
    try:
        model = await service.export_onnx(
            project_id=data.project_id,
            checkpoint_path=data.checkpoint_path,
        )
        # Update project status
        from app.services.project_service import ProjectService

        await ProjectService(db).update_status(data.project_id, "exported")
        return model
    except FileNotFoundError as e:
        # Chain the cause so the original traceback survives in the logs.
        raise HTTPException(status_code=404, detail=str(e)) from e
    except RuntimeError as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


# list_models function · python · L42-L45 (4 LOC) backend/app/routers/models.py
async def list_models(project_id: str, db: AsyncSession = Depends(get_db)):
    """List the exported models of a project."""
    exported = await ExportService(db).get_by_project(project_id)
    return exported


# Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
synthesize function · python · L49-L74 (26 LOC)backend/app/routers/models.py
async def synthesize(
    data: SynthesizeRequest,
    db: AsyncSession = Depends(get_db),
):
    """Synthesize speech with an exported ONNX model.

    Looks the model up by ``data.model_id`` and runs
    ``SynthesisService.synthesize`` in a worker thread via
    ``asyncio.to_thread``.

    Returns:
        A ``Response`` carrying the WAV bytes with media type ``audio/wav``.

    Raises:
        HTTPException: 404 when the model is not found; 500 when synthesis
            fails.
    """
    # Get model
    model = await ExportService(db).get_by_id(data.model_id)
    if not model:
        raise HTTPException(status_code=404, detail="Модель не знайдено")

    try:
        synth = SynthesisService()
        wav_bytes = await asyncio.to_thread(
            synth.synthesize,
            model.onnx_path,
            model.config_path,
            data.text,
            speaker_id=data.speaker_id,
            length_scale=data.length_scale,
            noise_scale=data.noise_scale,
            noise_w=data.noise_w,
        )
        return Response(content=wav_bytes, media_type="audio/wav")
    except Exception as e:
        # Chain the cause so the original traceback survives in the logs.
        raise HTTPException(status_code=500, detail=f"Помилка синтезу: {str(e)}") from e


# create_project function · python · L17-L20 (4 LOC) backend/app/routers/projects.py
async def create_project(data: ProjectCreate, db: AsyncSession = Depends(get_db)):
    """Create a project from the request payload and return it."""
    return await ProjectService(db).create(data)


# list_projects function · python · L24-L26 (3 LOC) backend/app/routers/projects.py
async def list_projects(db: AsyncSession = Depends(get_db)):
    """Return everything ``ProjectService.get_all()`` yields."""
    projects = await ProjectService(db).get_all()
    return projects


# get_project function · python · L30-L35 (6 LOC) backend/app/routers/projects.py
async def get_project(project_id: str, db: AsyncSession = Depends(get_db)):
    """Fetch one project by id, responding 404 when it does not exist."""
    found = await ProjectService(db).get_by_id(project_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Проєкт не знайдено")


# update_project function · python · L39-L46 (8 LOC) backend/app/routers/projects.py
async def update_project(
    project_id: str, data: ProjectUpdate, db: AsyncSession = Depends(get_db)
):
    """Apply an update to a project, responding 404 when it does not exist."""
    updated = await ProjectService(db).update(project_id, data)
    if updated:
        return updated
    raise HTTPException(status_code=404, detail="Проєкт не знайдено")


# delete_project function · python · L50-L55 (6 LOC) backend/app/routers/projects.py
async def delete_project(project_id: str, db: AsyncSession = Depends(get_db)):
    """Delete a project, responding 404 when the service reports nothing deleted."""
    was_deleted = await ProjectService(db).delete(project_id)
    if was_deleted:
        return {"success": True, "message": "Проєкт видалено"}
    raise HTTPException(status_code=404, detail="Проєкт не знайдено")


# start_training function · python · L20-L61 (42 LOC) backend/app/routers/training.py
async def start_training(
data: TrainingStartRequest,
db: AsyncSession = Depends(get_db),
):
"""Запустити тренування VITS моделі."""
from app.services.dataset_service import DatasetService
# Get dataset
ds_service = DatasetService(db)
dataset = await ds_service.get_by_id(data.dataset_id)
if not dataset:
raise HTTPException(status_code=404, detail="Датасет не знайдено")
run_id = str(uuid.uuid4())
try:
await asyncio.to_thread(
_training_service.start_training,
run_id=run_id,
project_id=data.project_id,
dataset_id=data.dataset_id,
csv_path=dataset.csv_path,
audio_dir=dataset.audio_dir,
mode=data.mode,
base_checkpoint=data.base_checkpoint,
batch_size=data.batch_size,
max_epochs=data.max_epochs,
precision=data.precision,
accumulate_grad_batches=data.accumulate_grad_batches,
samplstop_training function · python · L65-L75 (11 LOC)backend/app/routers/training.py
async def stop_training(run_id: str | None = None):
    """Stop training: the given run, or the one reported as active.

    Responds 404 when no training is active or when the service reports that
    the run could not be stopped.
    """
    current = _training_service.get_status()
    if not current["active"]:
        raise HTTPException(status_code=404, detail="Немає активного тренування")

    target_run = run_id if run_id else current.get("run_id")
    stopped = await asyncio.to_thread(_training_service.stop_training, target_run)
    if stopped:
        return {"status": "stopped"}
    raise HTTPException(status_code=404, detail="Тренування не знайдено")


# About: code-quality intelligence by Repobility · https://repobility.com
training_status function · python · L79-L81 (3 LOC)backend/app/routers/training.py
async def training_status():
    """Return the training service's current status."""
    current = _training_service.get_status()
    return current


# training_logs function · python · L85-L87 (3 LOC) backend/app/routers/training.py
async def training_logs(last_n: int = 100):
    """Return the last N training log lines under the ``lines`` key."""
    recent_lines = _training_service.get_logs(last_n)
    return {"lines": recent_lines}


# list_checkpoints function · python · L91-L93 (3 LOC) backend/app/routers/training.py
async def list_checkpoints(project_id: str):
    """List the checkpoints of a project."""
    checkpoints = _training_service.list_checkpoints(project_id)
    return checkpoints


# list_pretrained function · python · L97-L99 (3 LOC) backend/app/routers/training.py
async def list_pretrained():
    """List the pretrained models known to the training service."""
    pretrained = _training_service.list_pretrained()
    return pretrained


# start_transcription function · python · L21-L66 (46 LOC) backend/app/routers/transcription.py
async def start_transcription(
data: TranscriptionRequest,
db: AsyncSession = Depends(get_db),
):
"""Запустити транскрипцію аудіо файлу."""
from app.services.audio_file_service import AudioFileService
# Verify audio file exists
af_service = AudioFileService(db)
audio_file = await af_service.get_by_id(data.audio_file_id)
if not audio_file:
raise HTTPException(status_code=404, detail="Аудіо файл не знайдено")
# Clear existing segments for this audio file
seg_service = SegmentService(db)
existing = await seg_service.get_by_audio_file(data.audio_file_id)
for seg in existing:
await seg_service.delete(seg.id)
# Transcribe
try:
ts = TranscriptionService()
raw_segments = await asyncio.to_thread(
ts.transcribe,
audio_file.file_path,
language=data.language,
model_size=data.model_size,
)
except Exception as e:
raise HTTPException(status_codlist_segments function · python · L70-L73 (4 LOC)backend/app/routers/transcription.py
async def list_segments(project_id: str, db: AsyncSession = Depends(get_db)):
    """Return all segments of a project."""
    segments = await SegmentService(db).get_by_project(project_id)
    return segments


# update_segment function · python · L77-L87 (11 LOC) backend/app/routers/transcription.py
async def update_segment(
    segment_id: str,
    data: SegmentUpdate,
    db: AsyncSession = Depends(get_db),
):
    """Update a segment's text or its inclusion flag; 404 when it is missing."""
    changes = {"text": data.text, "included": data.included}
    updated = await SegmentService(db).update(segment_id, **changes)
    if updated:
        return updated
    raise HTTPException(status_code=404, detail="Сегмент не знайдено")


# delete_segment function · python · L91-L97 (7 LOC) backend/app/routers/transcription.py
async def delete_segment(segment_id: str, db: AsyncSession = Depends(get_db)):
    """Delete a segment, responding 404 when the service reports nothing deleted."""
    was_deleted = await SegmentService(db).delete(segment_id)
    if was_deleted:
        return {"success": True}
    raise HTTPException(status_code=404, detail="Сегмент не знайдено")


# Open data scored by Repobility · https://repobility.com
merge_segments function · python · L101-L107 (7 LOC)backend/app/routers/transcription.py
async def merge_segments(data: MergeRequest, db: AsyncSession = Depends(get_db)):
    """Merge several segments; responds 400 when the service returns nothing."""
    combined = await SegmentService(db).merge(data.segment_ids)
    if combined:
        return combined
    raise HTTPException(status_code=400, detail="Не вдалося об'єднати сегменти")


# split_segment function · python · L111-L121 (11 LOC) backend/app/routers/transcription.py
async def split_segment(
    segment_id: str,
    data: SplitRequest,
    db: AsyncSession = Depends(get_db),
):
    """Split a segment in two at ``data.split_time``; 400 when the split fails."""
    pieces = await SegmentService(db).split(segment_id, data.split_time)
    if pieces:
        return [*pieces]
    raise HTTPException(status_code=400, detail="Не вдалося розділити сегмент")


# available_models function · python · L125-L137 (13 LOC) backend/app/routers/transcription.py
async def available_models():
    """Available Whisper models and the GPU recommendation."""
    columns = ("id", "size_mb", "vram_mb", "speed", "quality")
    catalog = (
        ("tiny", 75, 400, "fastest", "low"),
        ("base", 145, 500, "fast", "medium"),
        ("small", 488, 1000, "medium", "good"),
        ("medium", 1500, 2500, "slow", "high"),
        ("large-v3", 3000, 4500, "slowest", "best"),
    )
    return {
        "models": [dict(zip(columns, entry)) for entry in catalog],
        "recommended": "small",
        "note": "RTX 3050 (4GB): small рекомендовано, medium можливо",
    }


# websocket_endpoint function · python · L9-L18 (10 LOC) backend/app/routers/ws.py
async def websocket_endpoint(websocket: WebSocket, project_id: str):
    """Serve one project WebSocket: register it, answer pings, unregister it.

    The socket is registered with ``ws_manager`` under ``project_id``. Each
    text message equal to ``"ping"`` is answered with ``"pong"``; anything
    else is ignored. A client disconnect ends the loop quietly; any other
    exception propagates after the socket has been unregistered.
    """
    await ws_manager.connect(project_id, websocket)
    try:
        while True:
            message = await websocket.receive_text()
            # Client can send ping/pong or commands
            if message == "ping":
                await websocket.send_text("pong")
    except WebSocketDisconnect:
        pass
    finally:
        # Unregister on every exit path, not only on a clean disconnect, so a
        # failing receive/send does not leave a dead socket in the manager.
        ws_manager.disconnect(project_id, websocket)


# get_video_info function · python · L19-L26 (8 LOC) backend/app/routers/youtube.py
async def get_video_info(url: str):
    """Fetch information about a YouTube video.

    Runs ``YoutubeService.get_video_info`` in a worker thread via
    ``asyncio.to_thread``.

    Raises:
        HTTPException: 400 when the lookup fails for any reason.
    """
    try:
        service = YoutubeService()
        return await asyncio.to_thread(service.get_video_info, url)
    except Exception as e:
        # Chain the cause so the original traceback survives in the logs.
        raise HTTPException(
            status_code=400, detail=f"Не вдалося отримати інформацію: {str(e)}"
        ) from e


# download_youtube function · python · L30-L70 (41 LOC) backend/app/routers/youtube.py
async def download_youtube(
data: YoutubeDownloadRequest,
db: AsyncSession = Depends(get_db),
):
"""Завантажити аудіо з YouTube."""
try:
import os
# Ensure GNOME env for Chrome keyring access in worker thread
os.environ.setdefault("XDG_CURRENT_DESKTOP", "GNOME")
os.environ.setdefault("DESKTOP_SESSION", "gnome")
yt_service = YoutubeService()
# Download audio
result = await asyncio.to_thread(
yt_service.download_audio,
data.url,
data.project_id,
data.audio_format,
)
# Save to database
af_service = AudioFileService(db)
audio_file = await af_service.create(
project_id=data.project_id,
filename=result["filename"],
file_path=result["file_path"],
source_url=result["source_url"],
duration_sec=result["duration_sec"],
file_size_bytes=result["file_size_bytes"],
)
upload_audio function · python · L74-L98 (25 LOC)backend/app/routers/youtube.py
async def upload_audio(
    project_id: str = Form(...),
    file: UploadFile = File(...),
    db: AsyncSession = Depends(get_db),
):
    """Upload an audio file directly into a project.

    Args:
        project_id: Id of the project the file is attached to.
        file: The uploaded file; its extension must be one of
            ``.wav``, ``.mp3``, ``.flac``, ``.ogg``, ``.m4a``.

    Returns:
        Whatever ``AudioFileService.save_uploaded_file`` returns.

    Raises:
        HTTPException: 400 for an unsupported (or missing) extension, or when
            the upload exceeds 500MB.
    """
    allowed_extensions = {".wav", ".mp3", ".flac", ".ogg", ".m4a"}
    max_bytes = 500 * 1024 * 1024  # 500MB limit

    # The filename is client-controlled: it may be absent and may carry
    # directory components, so keep only the final path segment.
    filename = (file.filename or "").replace("\\", "/").rsplit("/", 1)[-1]
    ext = "." + filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    if ext not in allowed_extensions:
        # sorted() keeps the message stable; set iteration order is arbitrary.
        raise HTTPException(
            status_code=400,
            detail=f"Непідтримуваний формат. Дозволено: {', '.join(sorted(allowed_extensions))}",
        )

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without buffering all of it in memory.
    content = await file.read(max_bytes + 1)
    if len(content) > max_bytes:
        raise HTTPException(status_code=400, detail="Файл занадто великий (макс. 500MB)")

    service = AudioFileService(db)
    audio_file = await service.save_uploaded_file(
        project_id=project_id,
        filename=filename,
        content=content,
    )
    return audio_file


# list_audio_files function · python · L102-L105 (4 LOC) backend/app/routers/youtube.py
async def list_audio_files(project_id: str, db: AsyncSession = Depends(get_db)):
    """List the audio files of a project."""
    audio_files = await AudioFileService(db).get_by_project(project_id)
    return audio_files


# Repobility — the code-quality scanner for AI-generated software · https://repobility.com
delete_audio_file function · python · L109-L115 (7 LOC)backend/app/routers/youtube.py
async def delete_audio_file(audio_file_id: str, db: AsyncSession = Depends(get_db)):
    """Delete an audio file, responding 404 when the service reports nothing deleted."""
    was_deleted = await AudioFileService(db).delete(audio_file_id)
    if was_deleted:
        return {"success": True, "message": "Файл видалено"}
    raise HTTPException(status_code=404, detail="Файл не знайдено")


# upload_cookies function · python · L119-L126 (8 LOC) backend/app/routers/youtube.py
async def upload_cookies(file: UploadFile = File(...)):
    """Upload a cookies.txt file used to access protected YouTube videos.

    The upload is written verbatim to ``cookies.txt`` in the storage directory.

    Raises:
        HTTPException: 400 when the upload has no filename or the filename
            does not end in ``.txt``.
    """
    # The filename is optional on an upload; treat a missing one as invalid
    # instead of failing with AttributeError on None.
    if not (file.filename or "").endswith(".txt"):
        raise HTTPException(status_code=400, detail="Потрібен файл cookies.txt")

    content = await file.read()
    cookies_path = settings.storage_path / "cookies.txt"
    cookies_path.write_bytes(content)
    return {"success": True, "message": "Cookies збережено"}


# refresh_cookies function · python · L130-L136 (7 LOC) backend/app/routers/youtube.py
async def refresh_cookies():
    """Refresh cookies from the Chrome browser; 500 when the refresh fails."""
    refreshed = await asyncio.to_thread(YoutubeService().ensure_cookies)
    if not refreshed:
        raise HTTPException(status_code=500, detail="Не вдалося оновити cookies. Переконайтесь що Chrome авторизований у YouTube.")
    return {"success": True, "message": "Cookies оновлено з Chrome"}


# page 1 / 4 next ›