Function bodies 178 total
cookies_status function · python · L140-L143 (4 LOC)backend/app/routers/youtube.py
async def cookies_status():
    """Report whether a cookies.txt file exists under the storage path."""
    has_cookies = (settings.storage_path / "cookies.txt").exists()
    return {"has_cookies": has_cookies}
# TaskResponse class · python · L4-L7 (4 LOC)backend/app/schemas/common.py
class TaskResponse(BaseModel):
    # Response schema carrying a task identifier plus a status string
    # (defaults to "queued") and a free-form message (defaults to empty).
    task_id: str
    status: str = "queued"
    message: str = ""
# SuccessResponse class · python · L14-L16 (3 LOC)backend/app/schemas/common.py
class SuccessResponse(BaseModel):
    # Generic result schema: a success flag (defaults to True) and an
    # optional message (defaults to empty).
    success: bool = True
    message: str = ""
# DatasetPrepareRequest class · python · L6-L10 (5 LOC)backend/app/schemas/dataset.py
class DatasetPrepareRequest(BaseModel):
    # Request schema for preparing a dataset for a project.
    project_id: str
    # Lower duration bound: default 1.0, accepted range 0.5..10.0.
    min_duration: float = Field(default=1.0, ge=0.5, le=10.0)
    # Upper duration bound: default 15.0, accepted range 2.0..30.0.
    # NOTE(review): the two ranges overlap, so min_duration > max_duration
    # passes validation here — confirm whether a cross-field check is wanted.
    max_duration: float = Field(default=15.0, ge=2.0, le=30.0)
    # Sample rate, default 22050; no bounds are enforced by this schema.
    sample_rate: int = Field(default=22050)
# DatasetResponse class · python · L13-L23 (11 LOC)backend/app/schemas/dataset.py
class DatasetResponse(BaseModel):
    # Response schema describing a stored dataset record.
    id: str
    project_id: str
    csv_path: str
    audio_dir: str
    total_segments: int
    total_duration: float
    # Required field that may be None (no default is declared).
    config: str | None
    created_at: datetime
    # pydantic config: allow building the model from object attributes.
    model_config = {"from_attributes": True}
# DatasetStatsResponse class · python · L26-L33 (8 LOC)backend/app/schemas/dataset.py
class DatasetStatsResponse(BaseModel):
    # Aggregate statistics for a dataset; only total_segments is required,
    # every other figure defaults to 0 (or an empty histogram).
    total_segments: int
    total_duration_sec: float = 0
    avg_duration_sec: float = 0
    min_duration_sec: float = 0
    max_duration_sec: float = 0
    avg_text_length: float = 0
    # List of histogram bucket dicts; the bucket schema is not fixed here.
    duration_histogram: list[dict] = []
# ValidationIssue class · python · L36-L38 (3 LOC)backend/app/schemas/dataset.py
class ValidationIssue(BaseModel):
    # One validation finding: a level string and a human-readable message.
    # The set of allowed level values is not constrained by this schema.
    level: str
    message: str
ProjectCreate class · python · L6-L11 (6 LOC)backend/app/schemas/project.py
class ProjectCreate(BaseModel):
    # Payload for creating a project.
    # name is required, 1..255 characters.
    name: str = Field(..., min_length=1, max_length=255)
    # language defaults to "uk", at most 10 characters.
    language: str = Field(default="uk", max_length=10)
    # espeak_voice defaults to "uk", at most 20 characters.
    espeak_voice: str = Field(default="uk", max_length=20)
    sample_rate: int = Field(default=22050)
    description: str | None = None
# ProjectUpdate class · python · L14-L19 (6 LOC)backend/app/schemas/project.py
class ProjectUpdate(BaseModel):
    # Payload for updating a project: every field is optional and defaults
    # to None; length limits mirror those declared on ProjectCreate.
    name: str | None = Field(None, min_length=1, max_length=255)
    language: str | None = Field(None, max_length=10)
    espeak_voice: str | None = Field(None, max_length=20)
    sample_rate: int | None = None
    description: str | None = None
# ProjectResponse class · python · L22-L33 (12 LOC)backend/app/schemas/project.py
class ProjectResponse(BaseModel):
    # Full project representation returned to clients.
    id: str
    name: str
    language: str
    espeak_voice: str
    sample_rate: int
    status: str
    # Required field that may be None (no default is declared).
    description: str | None
    created_at: datetime
    updated_at: datetime
    # pydantic config: allow building the model from object attributes.
    model_config = {"from_attributes": True}
# ProjectListResponse class · python · L36-L43 (8 LOC)backend/app/schemas/project.py
class ProjectListResponse(BaseModel):
    # Reduced project representation: a subset of the ProjectResponse fields.
    id: str
    name: str
    language: str
    status: str
    created_at: datetime
    # pydantic config: allow building the model from object attributes.
    model_config = {"from_attributes": True}
# ExportRequest class · python · L6-L8 (3 LOC)backend/app/schemas/synthesis.py
class ExportRequest(BaseModel):
    # Request schema naming a project and the checkpoint path to export.
    project_id: str
    checkpoint_path: str
# ExportedModelResponse class · python · L11-L20 (10 LOC)backend/app/schemas/synthesis.py
class ExportedModelResponse(BaseModel):
    # Response schema describing an exported model record.
    id: str
    project_id: str
    checkpoint_id: str
    onnx_path: str
    config_path: str
    # Required field that may be None (no default is declared).
    file_size_bytes: int | None
    created_at: datetime
    # pydantic config: allow building the model from object attributes.
    model_config = {"from_attributes": True}
# SynthesizeRequest class · python · L23-L29 (7 LOC)backend/app/schemas/synthesis.py
class SynthesizeRequest(BaseModel):
    # Request schema for synthesizing text with a given model.
    model_id: str
    # Required text, 1..5000 characters.
    text: str = Field(..., min_length=1, max_length=5000)
    speaker_id: int | None = None
    # Default 1.0, accepted range 0.1..5.0.
    length_scale: float = Field(default=1.0, ge=0.1, le=5.0)
    # Default 0.667, accepted range 0.0..1.0.
    noise_scale: float = Field(default=0.667, ge=0.0, le=1.0)
    # Default 0.8, accepted range 0.0..1.0.
    noise_w: float = Field(default=0.8, ge=0.0, le=1.0)
# TrainingStartRequest class · python · L4-L14 (11 LOC)backend/app/schemas/training.py
class TrainingStartRequest(BaseModel):
    # Request schema for starting a training run on a project's dataset.
    project_id: str
    dataset_id: str
    # Either "scratch" (default) or "finetune"; anything else is rejected.
    mode: str = Field(default="scratch", pattern="^(scratch|finetune)$")
    # NOTE(review): presumably required when mode is "finetune"; this schema
    # does not enforce that — confirm against the training service.
    base_checkpoint: str | None = None
    # Default 4, accepted range 1..64.
    batch_size: int = Field(default=4, ge=1, le=64)
    # Default 10000, accepted range 10..100000.
    max_epochs: int = Field(default=10000, ge=10, le=100000)
    # One of "16-mixed", "32" (default) or "bf16-mixed".
    precision: str = Field(default="32", pattern="^(16-mixed|32|bf16-mixed)$")
    # Default 8, accepted range 1..32.
    accumulate_grad_batches: int = Field(default=8, ge=1, le=32)
    espeak_voice: str = Field(default="uk")
    sample_rate: int = Field(default=22050)
TrainingStatusResponse class · python · L17-L24 (8 LOC)backend/app/schemas/training.py
class TrainingStatusResponse(BaseModel):
    # Status snapshot of a training run; only `active` is required.
    active: bool
    run_id: str | None = None
    pid: int | None = None
    # Free-form metrics mapping; its keys are not fixed by this schema.
    metrics: dict = {}
    log_lines: list[str] = []
    # Start time carried as a string; the exact format is not defined here.
    started_at: str | None = None
    elapsed_seconds: float = 0
# CheckpointInfo class · python · L27-L31 (5 LOC)backend/app/schemas/training.py
class CheckpointInfo(BaseModel):
    # Description of one checkpoint file: location, name and size.
    path: str
    filename: str
    size_mb: float
    # Modification time carried as a string; format is not defined here.
    modified: str | None = None
# TranscriptionRequest class · python · L6-L10 (5 LOC)backend/app/schemas/transcription.py
class TranscriptionRequest(BaseModel):
    # Request schema for transcribing one audio file of a project.
    project_id: str
    audio_file_id: str
    # One of tiny, base, small (default), medium, large-v3.
    model_size: str = Field(default="small", pattern="^(tiny|base|small|medium|large-v3)$")
    # Optional language; None is the default.
    language: str | None = None
# SegmentResponse class · python · L13-L28 (16 LOC)backend/app/schemas/transcription.py
class SegmentResponse(BaseModel):
    # Response schema describing one transcribed segment of an audio file.
    id: str
    project_id: str
    audio_file_id: str
    start_time: float
    end_time: float
    text: str
    text_edited: bool
    included: bool
    created_at: datetime
    # pydantic config: allow building the model from object attributes.
    model_config = {"from_attributes": True}

    # Segment length: end_time minus start_time.
    # NOTE(review): this is a plain property, so it is presumably not part of
    # the serialized response — confirm whether a computed field was intended.
    @property
    def duration_sec(self) -> float:
        return self.end_time - self.start_time
# SegmentUpdate class · python · L31-L33 (3 LOC)backend/app/schemas/transcription.py
class SegmentUpdate(BaseModel):
    # Partial update for a segment; both fields are optional (default None).
    text: str | None = None
    included: bool | None = None
# YoutubeDownloadRequest class · python · L6-L9 (4 LOC)backend/app/schemas/youtube.py
class YoutubeDownloadRequest(BaseModel):
    # Request schema for downloading audio from a URL into a project.
    project_id: str
    # Required, non-empty; the URL format itself is not validated here.
    url: str = Field(..., min_length=1)
    # Defaults to "wav"; allowed values are not constrained by this schema.
    audio_format: str = Field(default="wav")
# YoutubeInfoResponse class · python · L12-L17 (6 LOC)backend/app/schemas/youtube.py
class YoutubeInfoResponse(BaseModel):
    # Metadata about a video; all fields are required.
    title: str
    # Integer duration; the unit is not stated in this schema.
    duration: int
    uploader: str
    thumbnail: str
    description: str
# AudioFileResponse class · python · L20-L30 (11 LOC)backend/app/schemas/youtube.py
class AudioFileResponse(BaseModel):
    # Response schema describing a stored audio file record.
    id: str
    project_id: str
    filename: str
    # The three `| None` fields below are required but may be None.
    source_url: str | None
    duration_sec: float | None
    file_path: str
    file_size_bytes: int | None
    created_at: datetime
    # pydantic config: allow building the model from object attributes.
    model_config = {"from_attributes": True}
AudioFileService class · python · L11-L103 (93 LOC)backend/app/services/audio_file_service.py
class AudioFileService:
def __init__(self, db: AsyncSession):
self.db = db
async def create(
self,
project_id: str,
filename: str,
file_path: str,
source_url: str | None = None,
duration_sec: float | None = None,
file_size_bytes: int | None = None,
) -> AudioFile:
audio_file = AudioFile(
project_id=project_id,
filename=filename,
file_path=file_path,
source_url=source_url,
duration_sec=duration_sec,
file_size_bytes=file_size_bytes,
)
self.db.add(audio_file)
await self.db.commit()
await self.db.refresh(audio_file)
return audio_file
async def get_by_project(self, project_id: str) -> list[AudioFile]:
result = await self.db.execute(
select(AudioFile)
.where(AudioFile.project_id == project_id)
.order_by(AudioFile.created_at.desc())
)
create method · python · L15-L35 (21 LOC)backend/app/services/audio_file_service.py
async def create(
    self,
    project_id: str,
    filename: str,
    file_path: str,
    source_url: str | None = None,
    duration_sec: float | None = None,
    file_size_bytes: int | None = None,
) -> AudioFile:
    """Insert a new AudioFile row, commit it, and return the refreshed object."""
    columns = {
        "project_id": project_id,
        "filename": filename,
        "file_path": file_path,
        "source_url": source_url,
        "duration_sec": duration_sec,
        "file_size_bytes": file_size_bytes,
    }
    record = AudioFile(**columns)
    session = self.db
    session.add(record)
    await session.commit()
    # Reload the row after the commit before handing it back.
    await session.refresh(record)
    return record
# get_by_project method · python · L37-L43 (7 LOC)backend/app/services/audio_file_service.py
async def get_by_project(self, project_id: str) -> list[AudioFile]:
    """Return the project's audio files ordered by created_at, descending."""
    query = select(AudioFile).where(AudioFile.project_id == project_id)
    query = query.order_by(AudioFile.created_at.desc())
    rows = await self.db.execute(query)
    return list(rows.scalars().all())
# get_by_id method · python · L45-L49 (5 LOC)backend/app/services/audio_file_service.py
async def get_by_id(self, audio_file_id: str) -> AudioFile | None:
    """Return the AudioFile with the given id, or None when there is none."""
    query = select(AudioFile).where(AudioFile.id == audio_file_id)
    rows = await self.db.execute(query)
    return rows.scalar_one_or_none()
# delete method · python · L51-L63 (13 LOC)backend/app/services/audio_file_service.py
async def delete(self, audio_file_id: str) -> bool:
    """Delete an audio file record together with its file on disk.

    Returns:
        False when no record with ``audio_file_id`` exists, True once the
        file has been removed (if it was present) and the row deleted.
    """
    audio_file = await self.get_by_id(audio_file_id)
    if not audio_file:
        return False
    # Delete the physical file. missing_ok avoids the exists()/unlink() race:
    # a file that vanished in between must not abort the record deletion.
    abs_path = settings.storage_path / audio_file.file_path
    abs_path.unlink(missing_ok=True)
    await self.db.delete(audio_file)
    await self.db.commit()
    return True
# save_uploaded_file method · python · L65-L87 (23 LOC)backend/app/services/audio_file_service.py
async def save_uploaded_file(
    self,
    project_id: str,
    filename: str,
    content: bytes,
) -> AudioFile:
    """Save a file uploaded by the user into the project's raw_audio directory.

    The client-supplied ``filename`` is reduced to its final path component
    before use, so names such as ``../../x`` or absolute paths cannot place
    the file outside the target directory.

    Args:
        project_id: Project whose ``raw_audio`` directory receives the file.
        filename: Name supplied with the upload (treated as untrusted).
        content: Raw file bytes.

    Returns:
        The AudioFile record created for the stored file.

    Raises:
        ValueError: If ``filename`` has no usable base name.
    """
    # Normalize Windows separators too, then keep only the last component.
    safe_name = Path(filename.replace("\\", "/")).name
    if safe_name in {"", ".", ".."}:
        raise ValueError(f"Invalid upload filename: {filename!r}")

    output_dir = settings.projects_path / project_id / "raw_audio"
    output_dir.mkdir(parents=True, exist_ok=True)
    file_path = output_dir / safe_name
    file_path.write_bytes(content)
    # The record stores the path relative to the storage root.
    rel_path = str(file_path.relative_to(settings.storage_path))
    duration = self._get_duration(file_path)
    return await self.create(
        project_id=project_id,
        filename=safe_name,
        file_path=rel_path,
        duration_sec=duration,
        file_size_bytes=len(content),
    )
# _get_duration method · python · L89-L103 (15 LOC)backend/app/services/audio_file_service.py
def _get_duration(self, file_path: Path) -> float:
import subprocess
try:
result = subprocess.run(
[
"ffprobe", "-v", "quiet",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
str(file_path),
],
capture_output=True, text=True, timeout=10,
)
return float(result.stdout.strip())
except Exception:
return 0.0DatasetService class · python · L16-L283 (268 LOC)backend/app/services/dataset_service.py
class DatasetService:
def __init__(self, db: AsyncSession):
self.db = db
async def prepare(
self,
project_id: str,
min_duration: float = 1.0,
max_duration: float = 15.0,
sample_rate: int = 22050,
) -> Dataset:
"""Підготувати датасет з сегментів: нарізка WAV, нормалізація, генерація CSV."""
# Get included segments
result = await self.db.execute(
select(Segment)
.where(Segment.project_id == project_id, Segment.included == True)
.order_by(Segment.start_time)
)
segments = list(result.scalars().all())
if not segments:
raise ValueError("Немає включених сегментів для датасету")
# Setup directories
dataset_dir = settings.projects_path / project_id / "dataset"
audio_dir = dataset_dir / "wavs"
audio_dir.mkdir(parents=True, exist_ok=True)
# Filter by duration
valid_segments = [
s Repobility · open methodology · https://repobility.com/research/
prepare method · python · L20-L159 (140 LOC)backend/app/services/dataset_service.py
async def prepare(
self,
project_id: str,
min_duration: float = 1.0,
max_duration: float = 15.0,
sample_rate: int = 22050,
) -> Dataset:
"""Підготувати датасет з сегментів: нарізка WAV, нормалізація, генерація CSV."""
# Get included segments
result = await self.db.execute(
select(Segment)
.where(Segment.project_id == project_id, Segment.included == True)
.order_by(Segment.start_time)
)
segments = list(result.scalars().all())
if not segments:
raise ValueError("Немає включених сегментів для датасету")
# Setup directories
dataset_dir = settings.projects_path / project_id / "dataset"
audio_dir = dataset_dir / "wavs"
audio_dir.mkdir(parents=True, exist_ok=True)
# Filter by duration
valid_segments = [
s for s in segments
if min_duration <= (s.end_time - s.start_time) <= max_duget_by_project method · python · L161-L167 (7 LOC)backend/app/services/dataset_service.py
async def get_by_project(self, project_id: str) -> list[Dataset]:
    """Return the project's datasets ordered by created_at, descending."""
    query = select(Dataset).where(Dataset.project_id == project_id)
    query = query.order_by(Dataset.created_at.desc())
    rows = await self.db.execute(query)
    return list(rows.scalars().all())
# get_by_id method · python · L169-L173 (5 LOC)backend/app/services/dataset_service.py
async def get_by_id(self, dataset_id: str) -> Dataset | None:
    """Return the Dataset with the given id, or None when there is none."""
    query = select(Dataset).where(Dataset.id == dataset_id)
    rows = await self.db.execute(query)
    return rows.scalar_one_or_none()
# get_stats method · python · L175-L214 (40 LOC)backend/app/services/dataset_service.py
async def get_stats(self, dataset_id: str) -> dict:
"""Статистика датасету."""
dataset = await self.get_by_id(dataset_id)
if not dataset:
return {}
csv_path = settings.storage_path / dataset.csv_path
if not csv_path.exists():
return {}
lines = csv_path.read_text(encoding="utf-8").strip().split("\n")
durations = []
text_lengths = []
audio_dir = settings.storage_path / dataset.audio_dir
for line in lines:
parts = line.split("|", 1)
if len(parts) != 2:
continue
wav_name, text = parts
text_lengths.append(len(text))
wav_path = audio_dir / wav_name
if wav_path.exists():
dur = self._get_duration(wav_path)
if dur > 0:
durations.append(dur)
if not durations:
return {"total_segments": 0}
return {
"total_seg_histogram method · python · L216-L229 (14 LOC)backend/app/services/dataset_service.py
def _histogram(self, values: list[float], bins: int = 10) -> list[dict]:
if not values:
return []
mn, mx = min(values), max(values)
if mn == mx:
return [{"min": mn, "max": mx, "count": len(values)}]
step = (mx - mn) / bins
result = []
for i in range(bins):
lo = mn + i * step
hi = mn + (i + 1) * step
count = sum(1 for v in values if lo <= v < hi) if i < bins - 1 else sum(1 for v in values if lo <= v <= hi)
result.append({"min": round(lo, 1), "max": round(hi, 1), "count": count})
return result_get_duration method · python · L231-L240 (10 LOC)backend/app/services/dataset_service.py
def _get_duration(self, path: Path) -> float:
try:
result = subprocess.run(
["ffprobe", "-v", "quiet", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", str(path)],
capture_output=True, text=True, timeout=5,
)
return float(result.stdout.strip())
except Exception:
return 0.0validate method · python · L242-L283 (42 LOC)backend/app/services/dataset_service.py
async def validate(self, dataset_id: str) -> list[dict]:
"""Валідація датасету — пошук проблем."""
dataset = await self.get_by_id(dataset_id)
if not dataset:
return [{"level": "error", "message": "Датасет не знайдено"}]
issues = []
csv_path = settings.storage_path / dataset.csv_path
audio_dir = settings.storage_path / dataset.audio_dir
if not csv_path.exists():
issues.append({"level": "error", "message": "metadata.csv не знайдено"})
return issues
lines = csv_path.read_text(encoding="utf-8").strip().split("\n")
missing_audio = 0
short_text = 0
for line in lines:
parts = line.split("|", 1)
if len(parts) != 2:
continue
wav_name, text = parts
if not (audio_dir / wav_name).exists():
missing_audio += 1
if len(text.strip()) < 3:
short_text += 1
if ExportService class · python · L16-L109 (94 LOC)backend/app/services/export_service.py
class ExportService:
def __init__(self, db: AsyncSession):
self.db = db
async def export_onnx(
self,
project_id: str,
checkpoint_path: str,
) -> ExportedModel:
"""Експортувати checkpoint в ONNX формат для Piper."""
ckpt = Path(checkpoint_path)
if not ckpt.exists():
raise FileNotFoundError(f"Checkpoint не знайдено: {checkpoint_path}")
exports_dir = settings.projects_path / project_id / "exports"
exports_dir.mkdir(parents=True, exist_ok=True)
# Output paths
model_name = ckpt.stem.replace("=", "_")
onnx_path = exports_dir / f"{model_name}.onnx"
config_path = exports_dir / f"{model_name}.onnx.json"
# Run piper export
cmd = [
"python3", "-m", "piper.train.export_onnx",
"--checkpoint", str(ckpt),
"--output-file", str(onnx_path),
]
logger.info(f"Exporting: {' '.join(cmd)}")
result = subpHi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
export_onnx method · python · L20-L95 (76 LOC)backend/app/services/export_service.py
async def export_onnx(
self,
project_id: str,
checkpoint_path: str,
) -> ExportedModel:
"""Експортувати checkpoint в ONNX формат для Piper."""
ckpt = Path(checkpoint_path)
if not ckpt.exists():
raise FileNotFoundError(f"Checkpoint не знайдено: {checkpoint_path}")
exports_dir = settings.projects_path / project_id / "exports"
exports_dir.mkdir(parents=True, exist_ok=True)
# Output paths
model_name = ckpt.stem.replace("=", "_")
onnx_path = exports_dir / f"{model_name}.onnx"
config_path = exports_dir / f"{model_name}.onnx.json"
# Run piper export
cmd = [
"python3", "-m", "piper.train.export_onnx",
"--checkpoint", str(ckpt),
"--output-file", str(onnx_path),
]
logger.info(f"Exporting: {' '.join(cmd)}")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
get_by_project method · python · L97-L103 (7 LOC)backend/app/services/export_service.py
async def get_by_project(self, project_id: str) -> list[ExportedModel]:
    """Return the project's exported models ordered by created_at, descending."""
    query = select(ExportedModel).where(ExportedModel.project_id == project_id)
    query = query.order_by(ExportedModel.created_at.desc())
    rows = await self.db.execute(query)
    return list(rows.scalars().all())
# get_by_id method · python · L105-L109 (5 LOC)backend/app/services/export_service.py
async def get_by_id(self, model_id: str) -> ExportedModel | None:
    """Return the ExportedModel with the given id, or None when there is none."""
    query = select(ExportedModel).where(ExportedModel.id == model_id)
    rows = await self.db.execute(query)
    return rows.scalar_one_or_none()
# GpuManager class · python · L5-L70 (66 LOC)backend/app/services/gpu_manager.py
class GpuManager:
_lock = threading.Lock()
_current_task: str | None = None
def get_status(self) -> dict:
try:
result = subprocess.run(
[
"nvidia-smi",
"--query-gpu=name,memory.used,memory.total,memory.free,temperature.gpu,utilization.gpu,power.draw",
"--format=csv,noheader,nounits",
],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode != 0:
return {"available": False, "error": "nvidia-smi failed"}
parts = [p.strip() for p in result.stdout.strip().split(",")]
return {
"available": True,
"name": parts[0],
"vram_used_mb": int(float(parts[1])),
"vram_total_mb": int(float(parts[2])),
"vram_free_mb": int(float(parts[3])),
"temperature_c": int(floaget_status method · python · L9-L39 (31 LOC)backend/app/services/gpu_manager.py
def get_status(self) -> dict:
try:
result = subprocess.run(
[
"nvidia-smi",
"--query-gpu=name,memory.used,memory.total,memory.free,temperature.gpu,utilization.gpu,power.draw",
"--format=csv,noheader,nounits",
],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode != 0:
return {"available": False, "error": "nvidia-smi failed"}
parts = [p.strip() for p in result.stdout.strip().split(",")]
return {
"available": True,
"name": parts[0],
"vram_used_mb": int(float(parts[1])),
"vram_total_mb": int(float(parts[2])),
"vram_free_mb": int(float(parts[3])),
"temperature_c": int(float(parts[4])),
"utilization_pct": int(float(parts[5])),
get_free_vram_mb method · python · L41-L43 (3 LOC)backend/app/services/gpu_manager.py
def get_free_vram_mb(self) -> int:
    """Return the "vram_free_mb" entry of get_status(), or 0 when it is absent."""
    return self.get_status().get("vram_free_mb", 0)
# acquire_gpu method · python · L48-L53 (6 LOC)backend/app/services/gpu_manager.py
def acquire_gpu(self, task_id: str) -> bool:
    """Claim the GPU for ``task_id``; return False if a task already holds it."""
    # NOTE(review): the assignment below binds _current_task on the instance
    # while the declaration is class-level — TODO confirm that a single
    # shared GpuManager instance is used.
    with self._lock:
        gpu_is_free = self._current_task is None
        if gpu_is_free:
            self._current_task = task_id
        return gpu_is_free
# release_gpu method · python · L55-L58 (4 LOC)backend/app/services/gpu_manager.py
def release_gpu(self, task_id: str) -> None:
    """Release the GPU, but only if ``task_id`` is the task currently holding it."""
    with self._lock:
        if self._current_task != task_id:
            return
        self._current_task = None
estimate_batch_size method · python · L60-L70 (11 LOC)backend/app/services/gpu_manager.py
def estimate_batch_size(self, available_vram_mb: int) -> int:
    """Map available VRAM (MB) to a batch size using fixed thresholds.

    The first tier whose minimum is met wins; below every tier the result is 2.
    """
    # (minimum VRAM in MB, batch size), highest tier first.
    tiers = ((20000, 32), (10000, 16), (6000, 8), (3500, 4))
    for min_vram_mb, batch_size in tiers:
        if available_vram_mb >= min_vram_mb:
            return batch_size
    return 2
# ProjectService class · python · L11-L74 (64 LOC)backend/app/services/project_service.py
class ProjectService:
def __init__(self, db: AsyncSession):
self.db = db
async def create(self, data: ProjectCreate) -> Project:
project = Project(**data.model_dump())
self.db.add(project)
await self.db.commit()
await self.db.refresh(project)
# Create project directory structure
project_dir = settings.projects_path / project.id
for subdir in ["raw_audio", "segments", "cache", "dataset", "checkpoints", "exports", "logs"]:
(project_dir / subdir).mkdir(parents=True, exist_ok=True)
return project
async def get_all(self) -> list[Project]:
result = await self.db.execute(
select(Project).order_by(Project.created_at.desc())
)
return list(result.scalars().all())
async def get_by_id(self, project_id: str) -> Project | None:
result = await self.db.execute(
select(Project).where(Project.id == project_id)
)
return result.scacreate method · python · L15-L26 (12 LOC)backend/app/services/project_service.py
async def create(self, data: ProjectCreate) -> Project:
    """Insert a project row, then create its directory tree on disk.

    The row is committed and refreshed before any directory is created.
    """
    project = Project(**data.model_dump())
    session = self.db
    session.add(project)
    await session.commit()
    await session.refresh(project)

    # Per-project working directories, created under projects_path/<id>.
    subdirs = ("raw_audio", "segments", "cache", "dataset", "checkpoints", "exports", "logs")
    root = settings.projects_path / project.id
    for name in subdirs:
        root.joinpath(name).mkdir(parents=True, exist_ok=True)
    return project