Function bodies 178 total
get_all method · python · L28-L32 (5 LOC)backend/app/services/project_service.py
async def get_all(self) -> list[Project]:
result = await self.db.execute(
select(Project).order_by(Project.created_at.desc())
)
return list(result.scalars().all())get_by_id method · python · L34-L38 (5 LOC)backend/app/services/project_service.py
async def get_by_id(self, project_id: str) -> Project | None:
result = await self.db.execute(
select(Project).where(Project.id == project_id)
)
return result.scalar_one_or_none()update method · python · L40-L51 (12 LOC)backend/app/services/project_service.py
async def update(self, project_id: str, data: ProjectUpdate) -> Project | None:
project = await self.get_by_id(project_id)
if not project:
return None
update_data = data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(project, field, value)
await self.db.commit()
await self.db.refresh(project)
return projectdelete method · python · L53-L65 (13 LOC)backend/app/services/project_service.py
async def delete(self, project_id: str) -> bool:
project = await self.get_by_id(project_id)
if not project:
return False
# Delete project files
project_dir = settings.projects_path / project_id
if project_dir.exists():
shutil.rmtree(project_dir)
await self.db.delete(project)
await self.db.commit()
return Trueupdate_status method · python · L67-L74 (8 LOC)backend/app/services/project_service.py
async def update_status(self, project_id: str, status: str) -> Project | None:
project = await self.get_by_id(project_id)
if not project:
return None
project.status = status
await self.db.commit()
await self.db.refresh(project)
return projectSegmentService class · python · L7-L148 (142 LOC)backend/app/services/segment_service.py
class SegmentService:
def __init__(self, db: AsyncSession):
self.db = db
async def create_batch(self, project_id: str, audio_file_id: str, segments: list[dict]) -> list[Segment]:
"""Створити сегменти з результатів транскрипції."""
db_segments = []
for seg_data in segments:
segment = Segment(
project_id=project_id,
audio_file_id=audio_file_id,
start_time=seg_data["start"],
end_time=seg_data["end"],
text=seg_data["text"],
)
self.db.add(segment)
db_segments.append(segment)
await self.db.commit()
for seg in db_segments:
await self.db.refresh(seg)
return db_segments
async def get_by_project(self, project_id: str) -> list[Segment]:
result = await self.db.execute(
select(Segment)
.where(Segment.project_id == project_id)
.order_by(Segment.starcreate_batch method · python · L11-L28 (18 LOC)backend/app/services/segment_service.py
async def create_batch(self, project_id: str, audio_file_id: str, segments: list[dict]) -> list[Segment]:
"""Створити сегменти з результатів транскрипції."""
db_segments = []
for seg_data in segments:
segment = Segment(
project_id=project_id,
audio_file_id=audio_file_id,
start_time=seg_data["start"],
end_time=seg_data["end"],
text=seg_data["text"],
)
self.db.add(segment)
db_segments.append(segment)
await self.db.commit()
for seg in db_segments:
await self.db.refresh(seg)
return db_segmentsWant this analysis on your repo? https://repobility.com/scan/
get_by_project method · python · L30-L36 (7 LOC)backend/app/services/segment_service.py
async def get_by_project(self, project_id: str) -> list[Segment]:
result = await self.db.execute(
select(Segment)
.where(Segment.project_id == project_id)
.order_by(Segment.start_time)
)
return list(result.scalars().all())get_by_audio_file method · python · L38-L44 (7 LOC)backend/app/services/segment_service.py
async def get_by_audio_file(self, audio_file_id: str) -> list[Segment]:
result = await self.db.execute(
select(Segment)
.where(Segment.audio_file_id == audio_file_id)
.order_by(Segment.start_time)
)
return list(result.scalars().all())get_by_id method · python · L46-L50 (5 LOC)backend/app/services/segment_service.py
async def get_by_id(self, segment_id: str) -> Segment | None:
result = await self.db.execute(
select(Segment).where(Segment.id == segment_id)
)
return result.scalar_one_or_none()update method · python · L52-L63 (12 LOC)backend/app/services/segment_service.py
async def update(self, segment_id: str, text: str | None = None, included: bool | None = None) -> Segment | None:
segment = await self.get_by_id(segment_id)
if not segment:
return None
if text is not None:
segment.text = text
segment.text_edited = True
if included is not None:
segment.included = included
await self.db.commit()
await self.db.refresh(segment)
return segmentdelete method · python · L65-L71 (7 LOC)backend/app/services/segment_service.py
async def delete(self, segment_id: str) -> bool:
segment = await self.get_by_id(segment_id)
if not segment:
return False
await self.db.delete(segment)
await self.db.commit()
return Truemerge method · python · L73-L106 (34 LOC)backend/app/services/segment_service.py
async def merge(self, segment_ids: list[str]) -> Segment | None:
"""Об'єднати кілька сегментів в один."""
if len(segment_ids) < 2:
return None
segments = []
for sid in segment_ids:
seg = await self.get_by_id(sid)
if seg:
segments.append(seg)
if len(segments) < 2:
return None
segments.sort(key=lambda s: s.start_time)
# Create merged segment
merged = Segment(
project_id=segments[0].project_id,
audio_file_id=segments[0].audio_file_id,
start_time=segments[0].start_time,
end_time=segments[-1].end_time,
text=" ".join(s.text for s in segments),
text_edited=True,
)
self.db.add(merged)
# Delete originals
for seg in segments:
await self.db.delete(seg)
await self.db.commit()
await self.db.refresh(merged)
return mergedsplit method · python · L108-L139 (32 LOC)backend/app/services/segment_service.py
async def split(self, segment_id: str, split_time: float) -> tuple[Segment, Segment] | None:
"""Розділити сегмент на два в заданій точці часу."""
segment = await self.get_by_id(segment_id)
if not segment:
return None
if split_time <= segment.start_time or split_time >= segment.end_time:
return None
# Create two new segments
seg1 = Segment(
project_id=segment.project_id,
audio_file_id=segment.audio_file_id,
start_time=segment.start_time,
end_time=split_time,
text=segment.text,
text_edited=True,
)
seg2 = Segment(
project_id=segment.project_id,
audio_file_id=segment.audio_file_id,
start_time=split_time,
end_time=segment.end_time,
text="",
text_edited=True,
)
self.db.add(seg1)
self.db.add(seg2)
await self.db.delete(segment)
delete_by_project method · python · L141-L148 (8 LOC)backend/app/services/segment_service.py
async def delete_by_project(self, project_id: str) -> int:
"""Видалити всі сегменти проєкту."""
segments = await self.get_by_project(project_id)
count = len(segments)
for seg in segments:
await self.db.delete(seg)
await self.db.commit()
return countWant fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
SynthesisService class · python · L11-L75 (65 LOC)backend/app/services/synthesis_service.py
class SynthesisService:
_voice = None
_model_path = None
@classmethod
def get_voice(cls, onnx_path: str, config_path: str | None = None):
"""Завантажити або повернути кешовану модель."""
if cls._voice is not None and cls._model_path == onnx_path:
return cls._voice
from piper import PiperVoice
abs_onnx = Path(onnx_path)
if not abs_onnx.is_absolute():
abs_onnx = settings.storage_path / onnx_path
abs_config = None
if config_path:
abs_config = Path(config_path)
if not abs_config.is_absolute():
abs_config = settings.storage_path / config_path
logger.info(f"Loading voice: {abs_onnx}")
cls._voice = PiperVoice.load(str(abs_onnx), str(abs_config) if abs_config else None)
cls._model_path = onnx_path
return cls._voice
@classmethod
def release(cls):
cls._voice = None
cls._model_path = None
def syntget_voice method · python · L16-L36 (21 LOC)backend/app/services/synthesis_service.py
def get_voice(cls, onnx_path: str, config_path: str | None = None):
"""Завантажити або повернути кешовану модель."""
if cls._voice is not None and cls._model_path == onnx_path:
return cls._voice
from piper import PiperVoice
abs_onnx = Path(onnx_path)
if not abs_onnx.is_absolute():
abs_onnx = settings.storage_path / onnx_path
abs_config = None
if config_path:
abs_config = Path(config_path)
if not abs_config.is_absolute():
abs_config = settings.storage_path / config_path
logger.info(f"Loading voice: {abs_onnx}")
cls._voice = PiperVoice.load(str(abs_onnx), str(abs_config) if abs_config else None)
cls._model_path = onnx_path
return cls._voicerelease method · python · L39-L41 (3 LOC)backend/app/services/synthesis_service.py
def release(cls):
cls._voice = None
cls._model_path = Nonesynthesize method · python · L43-L75 (33 LOC)backend/app/services/synthesis_service.py
def synthesize(
self,
onnx_path: str,
config_path: str | None,
text: str,
speaker_id: int | None = None,
length_scale: float = 1.0,
noise_scale: float = 0.667,
noise_w: float = 0.8,
) -> bytes:
"""Синтезувати мовлення з тексту."""
import tempfile
from piper import SynthesisConfig
voice = self.get_voice(onnx_path, config_path)
synth_config = SynthesisConfig(
speaker_id=speaker_id,
length_scale=length_scale,
noise_scale=noise_scale,
noise_w_scale=noise_w,
)
# Use temp file because synthesize_wav needs a real wave.Wave_write
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
tmp_path = tmp.name
try:
with wave.open(tmp_path, "wb") as wav_file:
voice.synthesize_wav(text, wav_file, syn_config=synth_config)
return Path(tmp_path).rTrainingProcess class · python · L16-L23 (8 LOC)backend/app/services/training_service.py
class TrainingProcess:
"""Контейнер для активного процесу тренування."""
def __init__(self, run_id: str, process: subprocess.Popen):
self.run_id = run_id
self.process = process
self.metrics: dict = {}
self.logs: list[str] = []
self.started_at = datetime.utcnow()__init__ method · python · L18-L23 (6 LOC)backend/app/services/training_service.py
def __init__(self, run_id: str, process: subprocess.Popen):
self.run_id = run_id
self.process = process
self.metrics: dict = {}
self.logs: list[str] = []
self.started_at = datetime.utcnow()TrainingService class · python · L31-L201 (171 LOC)backend/app/services/training_service.py
class TrainingService:
def start_training(
self,
run_id: str,
project_id: str,
dataset_id: str,
csv_path: str,
audio_dir: str,
mode: str = "scratch",
base_checkpoint: str | None = None,
batch_size: int = 4,
max_epochs: int = 10000,
precision: str = "32",
accumulate_grad_batches: int = 8,
sample_rate: int = 22050,
espeak_voice: str = "uk",
on_metrics=None,
on_log=None,
) -> subprocess.Popen:
"""Запустити тренування VITS моделі."""
global _active_training
with _lock:
if _active_training and _active_training.process.poll() is None:
raise RuntimeError("Тренування вже запущено. Зупиніть поточне перед запуском нового.")
# Release GPU memory from previous tasks (whisper, etc.)
self._cleanup_gpu()
project_dir = settings.projects_path / project_id
cache_dir = project_dir / "cstart_training method · python · L32-L112 (81 LOC)backend/app/services/training_service.py
def start_training(
self,
run_id: str,
project_id: str,
dataset_id: str,
csv_path: str,
audio_dir: str,
mode: str = "scratch",
base_checkpoint: str | None = None,
batch_size: int = 4,
max_epochs: int = 10000,
precision: str = "32",
accumulate_grad_batches: int = 8,
sample_rate: int = 22050,
espeak_voice: str = "uk",
on_metrics=None,
on_log=None,
) -> subprocess.Popen:
"""Запустити тренування VITS моделі."""
global _active_training
with _lock:
if _active_training and _active_training.process.poll() is None:
raise RuntimeError("Тренування вже запущено. Зупиніть поточне перед запуском нового.")
# Release GPU memory from previous tasks (whisper, etc.)
self._cleanup_gpu()
project_dir = settings.projects_path / project_id
cache_dir = project_dir / "cache"
log_dir =If a scraper extracted this row, it came from Repobility (https://repobility.com)
stop_training method · python · L114-L130 (17 LOC)backend/app/services/training_service.py
def stop_training(self, run_id: str) -> bool:
global _active_training
with _lock:
if not _active_training or _active_training.run_id != run_id:
return False
bridge = PiperBridge()
bridge.stop_training(_active_training.process)
_active_training = None
# Cleanup GPU
try:
import torch
if torch.cuda.is_available():
torch.cuda.empty_cache()
except ImportError:
pass
gc.collect()
return Trueget_status method · python · L132-L145 (14 LOC)backend/app/services/training_service.py
def get_status(self) -> dict:
if not _active_training:
return {"active": False}
is_running = _active_training.process.poll() is None
return {
"active": is_running,
"run_id": _active_training.run_id,
"pid": _active_training.process.pid,
"metrics": _active_training.metrics,
"log_lines": _active_training.logs[-50:],
"started_at": _active_training.started_at.isoformat(),
"elapsed_seconds": (datetime.utcnow() - _active_training.started_at).total_seconds(),
}get_logs method · python · L147-L150 (4 LOC)backend/app/services/training_service.py
def get_logs(self, last_n: int = 100) -> list[str]:
if not _active_training:
return []
return _active_training.logs[-last_n:]list_checkpoints method · python · L152-L170 (19 LOC)backend/app/services/training_service.py
def list_checkpoints(self, project_id: str) -> list[dict]:
"""Список чекпоінтів проєкту."""
ckpt_dirs = [
settings.projects_path / project_id / "checkpoints",
settings.projects_path / project_id / "logs",
]
checkpoints = []
for d in ckpt_dirs:
if not d.exists():
continue
for f in d.rglob("*.ckpt"):
checkpoints.append({
"path": str(f),
"filename": f.name,
"size_mb": f.stat().st_size / 1024 / 1024,
"modified": datetime.fromtimestamp(f.stat().st_mtime).isoformat(),
})
checkpoints.sort(key=lambda x: x["modified"], reverse=True)
return checkpoints_cleanup_gpu method · python · L172-L187 (16 LOC)backend/app/services/training_service.py
def _cleanup_gpu(self):
"""Звільнити GPU пам'ять від попередніх задач."""
try:
# Release cached whisper model
from app.services.transcription_service import TranscriptionService
TranscriptionService.release_model()
except Exception:
pass
try:
import torch
if torch.cuda.is_available():
torch.cuda.empty_cache()
except ImportError:
pass
gc.collect()
logger.info("GPU memory cleaned up")list_pretrained method · python · L189-L201 (13 LOC)backend/app/services/training_service.py
def list_pretrained(self) -> list[dict]:
"""Список претренованих чекпоінтів."""
pretrained_dir = settings.pretrained_path
if not pretrained_dir.exists():
return []
result = []
for f in pretrained_dir.glob("*.ckpt"):
result.append({
"path": str(f),
"filename": f.name,
"size_mb": f.stat().st_size / 1024 / 1024,
})
return resultTranscriptionService class · python · L10-L116 (107 LOC)backend/app/services/transcription_service.py
class TranscriptionService:
_model = None
_model_size = None
@classmethod
def get_model(cls, model_size: str = "small", device: str = "auto"):
"""Завантажити або повернути кешовану модель Whisper."""
if cls._model is not None and cls._model_size == model_size:
return cls._model
from faster_whisper import WhisperModel
# Auto-detect device
if device == "auto":
device, compute_type = cls._detect_device()
elif device == "cuda":
compute_type = "float16"
else:
compute_type = "int8"
logger.info(f"Loading Whisper model: {model_size} on {device} ({compute_type})")
cls._model = WhisperModel(
model_size,
device=device,
compute_type=compute_type,
)
cls._model_size = model_size
return cls._model
@classmethod
def _detect_device(cls) -> tuple[str, str]:
"""Визначити найкращий пристрій дget_model method · python · L15-L38 (24 LOC)backend/app/services/transcription_service.py
def get_model(cls, model_size: str = "small", device: str = "auto"):
"""Завантажити або повернути кешовану модель Whisper."""
if cls._model is not None and cls._model_size == model_size:
return cls._model
from faster_whisper import WhisperModel
# Auto-detect device
if device == "auto":
device, compute_type = cls._detect_device()
elif device == "cuda":
compute_type = "float16"
else:
compute_type = "int8"
logger.info(f"Loading Whisper model: {model_size} on {device} ({compute_type})")
cls._model = WhisperModel(
model_size,
device=device,
compute_type=compute_type,
)
cls._model_size = model_size
return cls._modelGenerated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
_detect_device method · python · L41-L57 (17 LOC)backend/app/services/transcription_service.py
def _detect_device(cls) -> tuple[str, str]:
"""Визначити найкращий пристрій для Whisper."""
try:
import ctranslate2
ctranslate2.get_supported_compute_types("cuda")
return "cuda", "float16"
except Exception:
pass
try:
import torch
if torch.cuda.is_available():
return "cuda", "float16"
except ImportError:
pass
return "cpu", "int8"release_model method · python · L60-L70 (11 LOC)backend/app/services/transcription_service.py
def release_model(cls):
"""Звільнити модель та GPU пам'ять."""
cls._model = None
cls._model_size = None
gc.collect()
try:
import torch
if torch.cuda.is_available():
torch.cuda.empty_cache()
except ImportError:
passtranscribe method · python · L72-L116 (45 LOC)backend/app/services/transcription_service.py
def transcribe(
self,
audio_path: str,
language: str | None = None,
model_size: str = "small",
progress_callback=None,
) -> list[dict]:
"""Транскрибувати аудіо файл.
Returns: список сегментів [{start, end, text}]
"""
model = self.get_model(model_size)
abs_path = Path(audio_path)
if not abs_path.is_absolute():
abs_path = settings.storage_path / audio_path
logger.info(f"Transcribing: {abs_path}")
segments_iter, info = model.transcribe(
str(abs_path),
language=language,
beam_size=5,
word_timestamps=True,
vad_filter=True,
vad_parameters=dict(
min_silence_duration_ms=500,
speech_pad_ms=200,
),
)
logger.info(f"Detected language: {info.language} (prob: {info.language_probability:.2f})")
segments = []
for i, seg in enumeYoutubeService class · python · L20-L171 (152 LOC)backend/app/services/youtube_service.py
class YoutubeService:
def _get_common_opts(self) -> dict:
"""Загальні опції для yt-dlp."""
opts: dict = {
"quiet": True,
"no_warnings": True,
"remote_components": {"ejs:github"},
}
# Use cookies.txt if available
cookies_file = settings.storage_path / "cookies.txt"
if cookies_file.exists():
opts["cookiefile"] = str(cookies_file)
else:
# Try browser cookies
opts["cookiesfrombrowser"] = ("chrome",)
return opts
def ensure_cookies(self) -> bool:
"""Експортувати Chrome cookies, якщо cookies.txt ще не існує.
Використовує yt-dlp напряму для коректного дешифрування."""
cookies_file = settings.storage_path / "cookies.txt"
if cookies_file.exists():
return True
import yt_dlp
# Use yt-dlp's own cookie extraction to create cookies.txt
ydl_opts = {
"quiet": True,
"co_get_common_opts method · python · L21-L35 (15 LOC)backend/app/services/youtube_service.py
def _get_common_opts(self) -> dict:
"""Загальні опції для yt-dlp."""
opts: dict = {
"quiet": True,
"no_warnings": True,
"remote_components": {"ejs:github"},
}
# Use cookies.txt if available
cookies_file = settings.storage_path / "cookies.txt"
if cookies_file.exists():
opts["cookiefile"] = str(cookies_file)
else:
# Try browser cookies
opts["cookiesfrombrowser"] = ("chrome",)
return optsensure_cookies method · python · L37-L59 (23 LOC)backend/app/services/youtube_service.py
def ensure_cookies(self) -> bool:
"""Експортувати Chrome cookies, якщо cookies.txt ще не існує.
Використовує yt-dlp напряму для коректного дешифрування."""
cookies_file = settings.storage_path / "cookies.txt"
if cookies_file.exists():
return True
import yt_dlp
# Use yt-dlp's own cookie extraction to create cookies.txt
ydl_opts = {
"quiet": True,
"cookiesfrombrowser": ("chrome",),
"remote_components": {"ejs:github"},
"cookiefile": str(cookies_file), # yt-dlp will save cookies here
"simulate": True,
}
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
# Just extract info from any video to trigger cookie export
ydl.extract_info("https://www.youtube.com/watch?v=dQw4w9WgXcQ", download=False)
return cookies_file.exists()
except Exception:
return cookies_file.exists()get_video_info method · python · L61-L77 (17 LOC)backend/app/services/youtube_service.py
def get_video_info(self, url: str) -> dict:
"""Отримати інформацію про відео без завантаження."""
import yt_dlp
ydl_opts = {
**self._get_common_opts(),
"extract_flat": False,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
return {
"title": info.get("title", "Unknown"),
"duration": info.get("duration", 0),
"uploader": info.get("uploader", "Unknown"),
"thumbnail": info.get("thumbnail", ""),
"description": info.get("description", "")[:500],
}download_audio method · python · L79-L152 (74 LOC)backend/app/services/youtube_service.py
def download_audio(
self,
url: str,
project_id: str,
audio_format: str = "wav",
progress_callback=None,
) -> dict:
"""Завантажити аудіо з YouTube та конвертувати в WAV."""
import yt_dlp
output_dir = settings.projects_path / project_id / "raw_audio"
output_dir.mkdir(parents=True, exist_ok=True)
output_template = str(output_dir / "%(title)s.%(ext)s")
ydl_opts = {
**self._get_common_opts(),
"format": "bestaudio/best",
"outtmpl": output_template,
"postprocessors": [
{
"key": "FFmpegExtractAudio",
"preferredcodec": audio_format,
"preferredquality": "0",
}
],
}
if progress_callback:
ydl_opts["progress_hooks"] = [progress_callback]
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(urWant this analysis on your repo? https://repobility.com/scan/
_get_audio_duration method · python · L154-L171 (18 LOC)backend/app/services/youtube_service.py
def _get_audio_duration(self, file_path: Path) -> float:
"""Отримати тривалість аудіо через ffprobe."""
try:
result = subprocess.run(
[
"ffprobe",
"-v", "quiet",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
str(file_path),
],
capture_output=True,
text=True,
timeout=10,
)
return float(result.stdout.strip())
except Exception:
return 0.0prepare_dataset function · python · L5-L7 (3 LOC)backend/app/tasks/dataset_tasks.py
def prepare_dataset(self, project_id: str, config: dict):
"""Підготовка датасету - буде реалізовано у Фазі 4."""
passexport_onnx function · python · L5-L7 (3 LOC)backend/app/tasks/export_tasks.py
def export_onnx(self, project_id: str, checkpoint_path: str, output_path: str):
"""Експорт моделі в ONNX формат - буде реалізовано у Фазі 6."""
passtrain_model function · python · L5-L7 (3 LOC)backend/app/tasks/training_tasks.py
def train_model(self, run_id: str, config: dict):
"""Тренування VITS моделі - буде реалізовано у Фазі 5."""
passtranscribe_audio function · python · L5-L7 (3 LOC)backend/app/tasks/transcription_tasks.py
def transcribe_audio(self, project_id: str, audio_file_id: str, model_size: str = "small"):
"""Транскрипція аудіо через faster-whisper - буде реалізовано у Фазі 3."""
passdownload_audio function · python · L5-L7 (3 LOC)backend/app/tasks/youtube_tasks.py
def download_audio(self, project_id: str, url: str, audio_format: str = "wav"):
"""Завантаження аудіо з YouTube - буде реалізовано у Фазі 2."""
passexport_chrome_cookies function · python · L15-L111 (97 LOC)backend/app/utils/chrome_cookies.py
def export_chrome_cookies(output_path: Path, domains: list[str] | None = None) -> int:
"""Експортувати Chrome cookies в Netscape format cookies.txt.
Returns: кількість експортованих cookies.
"""
try:
from Crypto.Cipher import AES
from Crypto.Protocol.KDF import PBKDF2
import secretstorage
except ImportError:
return 0
# Get Chrome Safe Storage key from GNOME keyring
try:
bus = secretstorage.dbus_init()
collection = secretstorage.get_default_collection(bus)
chrome_key = None
for item in collection.get_all_items():
if item.get_label() == "Chrome Safe Storage":
chrome_key = item.get_secret()
break
if not chrome_key:
return 0
except Exception:
return 0
derived_key = PBKDF2(chrome_key, b"saltysalt", dkLen=16, count=1)
def decrypt_v11(encrypted_value: bytes) -> str:
if not encrypted_value or len(encrypted_vaTrainingConfig class · python · L18-L49 (32 LOC)backend/app/utils/piper_bridge.py
class TrainingConfig:
"""Конфігурація тренування для RTX 3050."""
def __init__(
self,
csv_path: str,
audio_dir: str,
cache_dir: str,
config_path: str,
espeak_voice: str = "uk",
sample_rate: int = 22050,
batch_size: int = 4,
max_epochs: int = 10000,
precision: str = "32",
accumulate_grad_batches: int = 8,
checkpoint_path: str | None = None,
checkpoint_every_n_epochs: int = 500,
log_dir: str | None = None,
):
self.csv_path = csv_path
self.audio_dir = audio_dir
self.cache_dir = cache_dir
self.config_path = config_path
self.espeak_voice = espeak_voice
self.sample_rate = sample_rate
self.batch_size = batch_size
self.max_epochs = max_epochs
self.precision = precision
self.accumulate_grad_batches = accumulate_grad_batches
self.checkpoint_path = checkpoint_path
self.checkpointWant fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
__init__ method · python · L21-L49 (29 LOC)backend/app/utils/piper_bridge.py
def __init__(
self,
csv_path: str,
audio_dir: str,
cache_dir: str,
config_path: str,
espeak_voice: str = "uk",
sample_rate: int = 22050,
batch_size: int = 4,
max_epochs: int = 10000,
precision: str = "32",
accumulate_grad_batches: int = 8,
checkpoint_path: str | None = None,
checkpoint_every_n_epochs: int = 500,
log_dir: str | None = None,
):
self.csv_path = csv_path
self.audio_dir = audio_dir
self.cache_dir = cache_dir
self.config_path = config_path
self.espeak_voice = espeak_voice
self.sample_rate = sample_rate
self.batch_size = batch_size
self.max_epochs = max_epochs
self.precision = precision
self.accumulate_grad_batches = accumulate_grad_batches
self.checkpoint_path = checkpoint_path
self.checkpoint_every_n_epochs = checkpoint_every_n_epochs
self.log_dir = log_PiperBridge class · python · L52-L202 (151 LOC)backend/app/utils/piper_bridge.py
class PiperBridge:
def build_training_command(self, config: TrainingConfig) -> list[str]:
"""Побудувати CLI команду для piper.train."""
cmd = [
"python3", "-m", "piper.train", "fit",
"--data.voice_name", "custom",
"--data.csv_path", config.csv_path,
"--data.audio_dir", config.audio_dir,
"--data.cache_dir", config.cache_dir,
"--data.config_path", config.config_path,
"--data.espeak_voice", config.espeak_voice,
"--model.sample_rate", str(config.sample_rate),
"--data.batch_size", str(config.batch_size),
"--trainer.max_epochs", str(config.max_epochs),
"--trainer.precision", config.precision,
"--trainer.accelerator", "gpu",
"--trainer.devices", "1",
]
# Checkpoint for fine-tuning
if config.checkpoint_path:
cmd.extend(["--ckpt_path", config.checkpoint_path])
# Logging
build_training_command method · python · L53-L81 (29 LOC)backend/app/utils/piper_bridge.py
def build_training_command(self, config: TrainingConfig) -> list[str]:
"""Побудувати CLI команду для piper.train."""
cmd = [
"python3", "-m", "piper.train", "fit",
"--data.voice_name", "custom",
"--data.csv_path", config.csv_path,
"--data.audio_dir", config.audio_dir,
"--data.cache_dir", config.cache_dir,
"--data.config_path", config.config_path,
"--data.espeak_voice", config.espeak_voice,
"--model.sample_rate", str(config.sample_rate),
"--data.batch_size", str(config.batch_size),
"--trainer.max_epochs", str(config.max_epochs),
"--trainer.precision", config.precision,
"--trainer.accelerator", "gpu",
"--trainer.devices", "1",
]
# Checkpoint for fine-tuning
if config.checkpoint_path:
cmd.extend(["--ckpt_path", config.checkpoint_path])
# Logging
if config.log_dir