Function bodies 212 total
format_size function · python · L215-L225 (11 LOC)services/torznab.py
def format_size(size_bytes: int) -> str:
"""Format bytes as human-readable string."""
if size_bytes < 1024:
return f"{size_bytes} B"
if size_bytes < 1024 ** 2:
return f"{size_bytes / 1024:.1f} KB"
if size_bytes < 1024 ** 3:
return f"{size_bytes / 1024 ** 2:.1f} MB"
if size_bytes < 1024 ** 4:
return f"{size_bytes / 1024 ** 3:.2f} GB"
return f"{size_bytes / 1024 ** 4:.2f} TB"TransferManager class · python · L12-L319 (308 LOC)services/transfer.py
class TransferManager:
"""Manages file transfer operations with progress tracking and cancellation."""
def __init__(self, socketio):
self.socketio = socketio
self.active_transfer = None
self._cancel_event = threading.Event()
self._lock = threading.Lock()
self._processing_queue = False
@property
def is_active(self):
return self.active_transfer is not None
def cancel(self):
self._cancel_event.set()
def start_transfer(self, src_dir, dst_dir, archive_dir, folder_name,
dest_name, selected_items, history_id=None):
"""Start a copy+move transfer in a background thread."""
with self._lock:
if self.is_active:
return {"error": "A transfer is already in progress"}
self._cancel_event.clear()
transfer_id = f"transfer_{int(time.time())}"
self.active_transfer = {
"id": transfer_id,
"fo__init__ method · python · L15-L20 (6 LOC)services/transfer.py
def __init__(self, socketio):
self.socketio = socketio
self.active_transfer = None
self._cancel_event = threading.Event()
self._lock = threading.Lock()
self._processing_queue = Falsestart_transfer method · python · L29-L54 (26 LOC)services/transfer.py
def start_transfer(self, src_dir, dst_dir, archive_dir, folder_name,
dest_name, selected_items, history_id=None):
"""Start a copy+move transfer in a background thread."""
with self._lock:
if self.is_active:
return {"error": "A transfer is already in progress"}
self._cancel_event.clear()
transfer_id = f"transfer_{int(time.time())}"
self.active_transfer = {
"id": transfer_id,
"folder": folder_name,
"dest_name": dest_name,
"items": selected_items,
"started_at": time.time(),
"history_id": history_id,
}
thread = threading.Thread(
target=self._run_transfer,
args=(transfer_id, src_dir, dst_dir, archive_dir,
folder_name, dest_name, selected_items, history_id),
daemon=True,
)
thread.start()
returnprocess_queue method · python · L56-L71 (16 LOC)services/transfer.py
def process_queue(self, src_dir, dst_dir, archive_dir):
"""Process all items in the transfer queue sequentially."""
from services import queue as queue_svc
if self._processing_queue:
return {"error": "Queue is already being processed"}
self._processing_queue = True
thread = threading.Thread(
target=self._run_queue,
args=(src_dir, dst_dir, archive_dir),
daemon=True,
)
thread.start()
return {"status": "queue_processing_started"}_run_queue method · python · L73-L117 (45 LOC)services/transfer.py
def _run_queue(self, src_dir, dst_dir, archive_dir):
"""Process queue items one by one."""
from services import queue as queue_svc
try:
while True:
item = queue_svc.pop_first()
if item is None:
break
# Wait for any active transfer to finish
while self.is_active:
time.sleep(0.5)
if self._cancel_event.is_set():
self._cancel_event.clear()
self.socketio.emit("queue:stopped", {"message": "Queue processing stopped"})
break
# Calculate total bytes for history
total_bytes = 0
src_folder = os.path.join(src_dir, item["source_folder"])
for name in item["items"]:
item_path = os.path.join(src_folder, name)
if os.path.exists(item_path):
total_bytes += ge_emit_log method · python · L119-L126 (8 LOC)services/transfer.py
def _emit_log(self, transfer_id, message, level="info"):
ts = time.strftime("%H:%M:%S")
self.socketio.emit("transfer:log", {
"id": transfer_id,
"timestamp": ts,
"message": message,
"level": level,
})All rows scored by the Repobility analyzer (https://repobility.com)
_emit_progress method · python · L128-L138 (11 LOC)services/transfer.py
def _emit_progress(self, transfer_id, current_file, bytes_copied,
bytes_total, speed_bps, eta_seconds, percent):
self.socketio.emit("transfer:progress", {
"id": transfer_id,
"current_file": current_file,
"bytes_copied": bytes_copied,
"bytes_total": bytes_total,
"speed_bps": speed_bps,
"eta_seconds": eta_seconds,
"percent": percent,
})_emit_complete method · python · L140-L162 (23 LOC)services/transfer.py
def _emit_complete(self, transfer_id, success, message, duration,
history_id=None, archive_folder=None):
self.socketio.emit("transfer:complete", {
"id": transfer_id,
"success": success,
"message": message,
"duration": round(duration, 1),
})
# Update history entry
if history_id:
try:
history.complete_entry(
history_id, success,
archive_folder=archive_folder,
duration_sec=duration,
error_message=message if not success else None,
)
except Exception:
pass
with self._lock:
self.active_transfer = None_run_transfer method · python · L164-L298 (135 LOC)services/transfer.py
def _run_transfer(self, transfer_id, src_dir, dst_dir, archive_dir,
folder_name, dest_name, selected_items, history_id=None):
"""Worker thread: copy selected items then move source to archive."""
start_time = time.time()
errors = []
src_folder = os.path.join(src_dir, folder_name)
dst_folder = unique_path(os.path.join(dst_dir, dest_name))
arc_folder = unique_path(os.path.join(archive_dir, folder_name))
self._emit_log(transfer_id, "---- START ----")
# Create destination folder
try:
os.makedirs(dst_folder, exist_ok=False)
except Exception as e:
self._emit_log(transfer_id,
f"Failed to create destination folder: {dst_folder}\n{e}",
"error")
self._emit_complete(transfer_id, False,
"Failed to create destination folder.",
time.time() -_copy_dir_chunked method · python · L300-L319 (20 LOC)services/transfer.py
def _copy_dir_chunked(self, src_dir, dst_dir, on_chunk):
"""Copy a directory tree with chunked file copies for progress tracking."""
dst_dir = unique_path(dst_dir)
for dirpath, dirnames, filenames in os.walk(src_dir):
rel = os.path.relpath(dirpath, src_dir)
dst_subdir = os.path.join(dst_dir, rel)
os.makedirs(dst_subdir, exist_ok=True)
try:
shutil.copystat(dirpath, dst_subdir)
except OSError:
pass
for filename in filenames:
if self._cancel_event.is_set():
raise InterruptedError("Transfer cancelled")
src_file = os.path.join(dirpath, filename)
dst_file = os.path.join(dst_subdir, filename)
copy_file_chunked(src_file, dst_file, on_chunk)_get_conn function · python · L6-L9 (4 LOC)services/tv_db.py
def _get_conn():
conn = sqlite3.connect(config.DB_PATH)
conn.row_factory = sqlite3.Row
return conninit_db function · python · L12-L78 (67 LOC)services/tv_db.py
def init_db():
"""Create TV-related tables if they don't exist."""
with _get_conn() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS tv_shows (
id INTEGER PRIMARY KEY AUTOINCREMENT,
tmdb_id INTEGER NOT NULL UNIQUE,
imdb_id TEXT,
title TEXT NOT NULL,
original_title TEXT,
overview TEXT,
poster_path TEXT,
backdrop_path TEXT,
first_air_date TEXT,
status TEXT,
total_seasons INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS tv_seasons (
id INTEGER PRIMARY KEY AUTOINCREMENT,
show_id INTEGER NOT NULL,
tmdb_season_id INTEGER,
seaadd_show function · python · L83-L100 (18 LOC)services/tv_db.py
def add_show(tmdb_id: int, imdb_id: str = "", title: str = "",
original_title: str = "", overview: str = "",
poster_path: str = "", backdrop_path: str = "",
first_air_date: str = "", status: str = "",
total_seasons: int = 0) -> int:
with _get_conn() as conn:
cursor = conn.execute(
"""INSERT INTO tv_shows
(tmdb_id, imdb_id, title, original_title, overview,
poster_path, backdrop_path, first_air_date, status,
total_seasons, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(tmdb_id, imdb_id, title, original_title, overview,
poster_path, backdrop_path, first_air_date, status,
total_seasons, time.strftime("%Y-%m-%d %H:%M:%S")),
)
conn.commit()
return cursor.lastrowidget_show function · python · L103-L107 (5 LOC)services/tv_db.py
def get_show(show_id: int) -> dict | None:
with _get_conn() as conn:
row = conn.execute("SELECT * FROM tv_shows WHERE id = ?",
(show_id,)).fetchone()
return dict(row) if row else NoneCitation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
get_show_by_tmdb_id function · python · L110-L114 (5 LOC)services/tv_db.py
def get_show_by_tmdb_id(tmdb_id: int) -> dict | None:
with _get_conn() as conn:
row = conn.execute("SELECT * FROM tv_shows WHERE tmdb_id = ?",
(tmdb_id,)).fetchone()
return dict(row) if row else Noneget_shows function · python · L117-L121 (5 LOC)services/tv_db.py
def get_shows() -> list[dict]:
with _get_conn() as conn:
rows = conn.execute(
"SELECT * FROM tv_shows ORDER BY id DESC").fetchall()
return [dict(row) for row in rows]add_season function · python · L126-L141 (16 LOC)services/tv_db.py
def add_season(show_id: int, season_number: int, episode_count: int = 0,
tmdb_season_id: int = None, name: str = "",
overview: str = "", air_date: str = "",
poster_path: str = "") -> int:
with _get_conn() as conn:
cursor = conn.execute(
"""INSERT INTO tv_seasons
(show_id, tmdb_season_id, season_number, episode_count,
name, overview, air_date, poster_path, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(show_id, tmdb_season_id, season_number, episode_count,
name, overview, air_date, poster_path,
time.strftime("%Y-%m-%d %H:%M:%S")),
)
conn.commit()
return cursor.lastrowidupdate_season function · python · L144-L151 (8 LOC)services/tv_db.py
def update_season(season_id: int, **kwargs):
if not kwargs:
return
cols = ", ".join(f"{k} = ?" for k in kwargs)
vals = list(kwargs.values()) + [season_id]
with _get_conn() as conn:
conn.execute(f"UPDATE tv_seasons SET {cols} WHERE id = ?", vals)
conn.commit()get_season function · python · L154-L158 (5 LOC)services/tv_db.py
def get_season(season_id: int) -> dict | None:
with _get_conn() as conn:
row = conn.execute("SELECT * FROM tv_seasons WHERE id = ?",
(season_id,)).fetchone()
return dict(row) if row else Noneget_seasons function · python · L161-L166 (6 LOC)services/tv_db.py
def get_seasons(show_id: int) -> list[dict]:
with _get_conn() as conn:
rows = conn.execute(
"SELECT * FROM tv_seasons WHERE show_id = ? ORDER BY season_number",
(show_id,)).fetchall()
return [dict(row) for row in rows]add_episode function · python · L171-L188 (18 LOC)services/tv_db.py
def add_episode(season_id: int, show_id: int, season_number: int,
episode_number: int, tmdb_episode_id: int = None,
title: str = "", overview: str = "", air_date: str = "",
runtime: int = None, still_path: str = "") -> int:
max_retries = int(settings_get("tv_max_retries", "10"))
with _get_conn() as conn:
cursor = conn.execute(
"""INSERT INTO tv_episodes
(season_id, show_id, tmdb_episode_id, season_number,
episode_number, title, overview, air_date, runtime,
still_path, max_retries, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(season_id, show_id, tmdb_episode_id, season_number,
episode_number, title, overview, air_date, runtime,
still_path, max_retries, time.strftime("%Y-%m-%d %H:%M:%S")),
)
conn.commit()
return cursor.lastrowidupdate_episode function · python · L191-L198 (8 LOC)services/tv_db.py
def update_episode(episode_id: int, **kwargs):
if not kwargs:
return
cols = ", ".join(f"{k} = ?" for k in kwargs)
vals = list(kwargs.values()) + [episode_id]
with _get_conn() as conn:
conn.execute(f"UPDATE tv_episodes SET {cols} WHERE id = ?", vals)
conn.commit()Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
get_episode function · python · L201-L205 (5 LOC)services/tv_db.py
def get_episode(episode_id: int) -> dict | None:
with _get_conn() as conn:
row = conn.execute("SELECT * FROM tv_episodes WHERE id = ?",
(episode_id,)).fetchone()
return dict(row) if row else Noneget_episodes function · python · L208-L214 (7 LOC)services/tv_db.py
def get_episodes(season_id: int) -> list[dict]:
with _get_conn() as conn:
rows = conn.execute(
"""SELECT * FROM tv_episodes
WHERE season_id = ? ORDER BY episode_number""",
(season_id,)).fetchall()
return [dict(row) for row in rows]get_episodes_by_status function · python · L217-L230 (14 LOC)services/tv_db.py
def get_episodes_by_status(*statuses) -> list[dict]:
"""Get episodes matching any of the given statuses with retry budget."""
placeholders = ",".join("?" for _ in statuses)
with _get_conn() as conn:
rows = conn.execute(
f"""SELECT e.*, s.title as show_title
FROM tv_episodes e
JOIN tv_shows s ON e.show_id = s.id
WHERE e.status IN ({placeholders})
AND e.retry_count < e.max_retries
ORDER BY e.id""",
statuses,
).fetchall()
return [dict(row) for row in rows]get_episodes_by_download_id function · python · L233-L238 (6 LOC)services/tv_db.py
def get_episodes_by_download_id(download_id: int) -> list[dict]:
with _get_conn() as conn:
rows = conn.execute(
"SELECT * FROM tv_episodes WHERE download_id = ?",
(download_id,)).fetchall()
return [dict(row) for row in rows]get_seasons_by_pack_download_id function · python · L241-L246 (6 LOC)services/tv_db.py
def get_seasons_by_pack_download_id(download_id: int) -> list[dict]:
with _get_conn() as conn:
rows = conn.execute(
"SELECT * FROM tv_seasons WHERE pack_download_id = ?",
(download_id,)).fetchall()
return [dict(row) for row in rows]compute_season_completion function · python · L249-L260 (12 LOC)services/tv_db.py
def compute_season_completion(season_id: int) -> float:
"""Compute completion percentage for a season."""
with _get_conn() as conn:
row = conn.execute(
"""SELECT COUNT(*) as total,
SUM(CASE WHEN status IN ('completed', 'verified')
THEN 1 ELSE 0 END) as done
FROM tv_episodes WHERE season_id = ?""",
(season_id,)).fetchone()
total = row["total"] if row else 0
done = row["done"] if row else 0
return done / total if total > 0 else 0settings_get function · python · L264-L266 (3 LOC)services/tv_db.py
def settings_get(key, default=""):
from services import settings
return settings.get(key, default)TVRetryScheduler class · python · L8-L160 (153 LOC)services/tv_scheduler.py
class TVRetryScheduler:
"""Background scheduler that retries failed/stalled TV episode downloads."""
def __init__(self, socketio, torrent_manager):
self._socketio = socketio
self._torrent_manager = torrent_manager
self._running = False
self._thread = None
self._stop_event = threading.Event()
self._run_now_flag = threading.Event()
self.last_run_at = None
self.next_run_at = None
def start(self):
if self._running:
return
self._stop_event.clear()
self._running = True
self._thread = threading.Thread(target=self._run_loop, daemon=True)
self._thread.start()
def stop(self):
self._stop_event.set()
self._running = False
def run_now(self):
"""Trigger an immediate scheduler cycle."""
self._run_now_flag.set()
def get_status(self) -> dict:
return {
"running": self._running,
"last_run_at": self.Repobility — same analyzer, your code, free for public repos · /scan/
__init__ method · python · L11-L19 (9 LOC)services/tv_scheduler.py
def __init__(self, socketio, torrent_manager):
self._socketio = socketio
self._torrent_manager = torrent_manager
self._running = False
self._thread = None
self._stop_event = threading.Event()
self._run_now_flag = threading.Event()
self.last_run_at = None
self.next_run_at = Nonestart method · python · L21-L27 (7 LOC)services/tv_scheduler.py
def start(self):
if self._running:
return
self._stop_event.clear()
self._running = True
self._thread = threading.Thread(target=self._run_loop, daemon=True)
self._thread.start()stop method · python · L29-L31 (3 LOC)services/tv_scheduler.py
def stop(self):
self._stop_event.set()
self._running = Falserun_now method · python · L33-L35 (3 LOC)services/tv_scheduler.py
def run_now(self):
"""Trigger an immediate scheduler cycle."""
self._run_now_flag.set()get_status method · python · L37-L42 (6 LOC)services/tv_scheduler.py
def get_status(self) -> dict:
return {
"running": self._running,
"last_run_at": self.last_run_at,
"next_run_at": self.next_run_at,
}_run_loop method · python · L44-L63 (20 LOC)services/tv_scheduler.py
def _run_loop(self):
# Initial delay to let the app fully start
self._stop_event.wait(10)
while not self._stop_event.is_set():
interval_hours = float(settings.get("tv_retry_interval_hours", "6"))
interval_secs = max(interval_hours * 3600, 60)
self.next_run_at = time.strftime(
"%Y-%m-%d %H:%M:%S",
time.localtime(time.time() + interval_secs))
# Wait for interval or run_now trigger
triggered = self._run_now_flag.wait(timeout=interval_secs)
if self._stop_event.is_set():
break
if triggered:
self._run_now_flag.clear()
self._run_cycle()_run_cycle method · python · L65-L77 (13 LOC)services/tv_scheduler.py
def _run_cycle(self):
"""Execute one retry cycle."""
self.last_run_at = time.strftime("%Y-%m-%d %H:%M:%S")
try:
self._check_stalled_downloads()
except Exception:
pass
try:
self._retry_failed_episodes()
except Exception:
pass_check_stalled_downloads method · python · L79-L138 (60 LOC)services/tv_scheduler.py
def _check_stalled_downloads(self):
"""Detect and handle stalled downloads."""
stall_timeout = int(settings.get("tv_stall_timeout_minutes", "30")) * 60
downloading_eps = tv_db.get_episodes_by_status("downloading")
if not downloading_eps:
return
try:
client = qbittorrent.get_client()
except Exception:
return
for ep in downloading_eps:
if not ep.get("download_id"):
continue
from services import torrent_db
dl = torrent_db.get_download(ep["download_id"])
if not dl or not dl.get("info_hash"):
continue
try:
torrent_info = client.get_torrent(dl["info_hash"])
except Exception:
continue
if torrent_info is None:
continue
progress = torrent_info.get("progress", 0)
state = torrent_info.get("state", "")
All rows scored by the Repobility analyzer (https://repobility.com)
_retry_failed_episodes method · python · L140-L160 (21 LOC)services/tv_scheduler.py
def _retry_failed_episodes(self):
"""Re-search and download failed/retrying episodes."""
retryable = tv_db.get_episodes_by_status("failed", "retrying")
if not retryable:
return
for ep in retryable:
if self._stop_event.is_set():
break
tv_search.retry_episode(ep["id"], self._torrent_manager, self._socketio)
# Update season completion
completion = tv_db.compute_season_completion(ep["season_id"])
tv_db.update_season(ep["season_id"], completion_pct=completion)
self._socketio.emit("tv:season_completion", {
"season_id": ep["season_id"],
"completion_pct": completion,
})
time.sleep(config.TV_SEARCH_DELAY)lookup_show function · python · L8-L10 (3 LOC)services/tv_search.py
def lookup_show(query: str) -> list[dict]:
"""Search TMDB for TV shows matching query."""
return tmdb.search_shows(query)select_show function · python · L13-L44 (32 LOC)services/tv_search.py
def select_show(tmdb_id: int) -> dict | None:
"""Fetch show details from TMDB and store in DB. Returns existing if already tracked."""
existing = tv_db.get_show_by_tmdb_id(tmdb_id)
if existing:
# Refresh season list
details = tmdb.get_show_details(tmdb_id)
if details:
existing["seasons"] = details["seasons"]
else:
existing["seasons"] = []
return existing
details = tmdb.get_show_details(tmdb_id)
if not details:
return None
show_id = tv_db.add_show(
tmdb_id=details["tmdb_id"],
imdb_id=details.get("imdb_id", ""),
title=details["title"],
original_title=details.get("original_title", ""),
overview=details.get("overview", ""),
poster_path=details.get("poster_path", ""),
backdrop_path=details.get("backdrop_path", ""),
first_air_date=details.get("first_air_date", ""),
status=details.get("status", ""),
total_seasons=detaselect_seasons function · python · L47-L81 (35 LOC)services/tv_search.py
def select_seasons(show_id: int, season_numbers: list[int]) -> list[dict]:
"""Fetch episode metadata from TMDB and store seasons + episodes in DB."""
show = tv_db.get_show(show_id)
if not show:
return []
results = []
for sn in season_numbers:
episodes = tmdb.get_season_episodes(show["tmdb_id"], sn)
season_id = tv_db.add_season(
show_id=show_id,
season_number=sn,
episode_count=len(episodes),
)
for ep in episodes:
tv_db.add_episode(
season_id=season_id,
show_id=show_id,
season_number=sn,
episode_number=ep["episode_number"],
tmdb_episode_id=ep.get("tmdb_episode_id"),
title=ep.get("title", ""),
overview=ep.get("overview", ""),
air_date=ep.get("air_date", ""),
runtime=ep.get("runtime"),
still_path=ep.get("still_path", ""),
start_season_search function · python · L84-L242 (159 LOC)services/tv_search.py
def start_season_search(season_id: int, torrent_manager, socketio=None):
"""Main season search with pack-vs-individual strategy. Run in background thread."""
season = tv_db.get_season(season_id)
if not season:
return {"error": "Season not found"}
show = tv_db.get_show(season["show_id"])
if not show:
return {"error": "Show not found"}
episodes = tv_db.get_episodes(season_id)
show_title = show["title"]
sn = season["season_number"]
preference = season["pack_preference"]
quality_pref = settings.get("quality_pref", "")
min_seeders = int(settings.get("min_seeders", "3"))
pack_multiplier = int(settings.get("tv_pack_seeder_multiplier", "2"))
# Determine category
category = "tv"
if quality_pref:
prefs_lower = quality_pref.lower()
if "2160p" in prefs_lower or "4k" in prefs_lower:
category = "tv_uhd"
elif "1080p" in prefs_lower:
category = "tv_hd"
tv_db.update_seaso_search_pack function · python · L245-L269 (25 LOC)services/tv_search.py
def _search_pack(show_title: str, season_number: int, category: str) -> list[dict]:
"""Search for season pack torrents."""
queries = [
f"{show_title} S{season_number:02d}",
f"{show_title} Season {season_number}",
]
# Also try without apostrophes
clean_title = show_title.replace("'", "").replace("'", "")
if clean_title != show_title:
queries.append(f"{clean_title} S{season_number:02d}")
all_results = []
seen_hashes = set()
for q in queries:
results = torznab.search(q, category)
for r in results:
h = r.get("info_hash", "")
if h and h in seen_hashes:
continue
if h:
seen_hashes.add(h)
all_results.append(r)
time.sleep(config.TV_SEARCH_DELAY)
return all_results_search_episode function · python · L272-L284 (13 LOC)services/tv_search.py
def _search_episode(show_title: str, season_number: int,
episode_number: int, category: str) -> list[dict]:
"""Search for a single episode torrent."""
query = f"{show_title} S{season_number:02d}E{episode_number:02d}"
results = torznab.search(query, category)
# Fallback: try without special characters
clean_title = show_title.replace("'", "").replace("'", "")
if clean_title != show_title and not results:
results = torznab.search(
f"{clean_title} S{season_number:02d}E{episode_number:02d}", category)
return resultsget_season_status function · python · L287-L294 (8 LOC)services/tv_search.py
def get_season_status(season_id: int) -> dict | None:
"""Get season with episodes and computed completion."""
season = tv_db.get_season(season_id)
if not season:
return None
season["episodes"] = tv_db.get_episodes(season_id)
season["completion_pct"] = tv_db.compute_season_completion(season_id)
return seasonCitation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
get_show_status function · python · L297-L307 (11 LOC)services/tv_search.py
def get_show_status(show_id: int) -> dict | None:
"""Get show with all seasons and their statuses."""
show = tv_db.get_show(show_id)
if not show:
return None
seasons = tv_db.get_seasons(show_id)
for s in seasons:
s["episodes"] = tv_db.get_episodes(s["id"])
s["completion_pct"] = tv_db.compute_season_completion(s["id"])
show["seasons"] = seasons
return showretry_episode function · python · L310-L374 (65 LOC)services/tv_search.py
def retry_episode(episode_id: int, torrent_manager, socketio=None) -> dict:
"""Retry a single failed episode."""
ep = tv_db.get_episode(episode_id)
if not ep:
return {"error": "Episode not found"}
show = tv_db.get_show(ep["show_id"])
if not show:
return {"error": "Show not found"}
quality_pref = settings.get("quality_pref", "")
min_seeders = int(settings.get("min_seeders", "3"))
category = "tv"
if quality_pref:
prefs_lower = quality_pref.lower()
if "2160p" in prefs_lower:
category = "tv_uhd"
elif "1080p" in prefs_lower:
category = "tv_hd"
results = _search_episode(show["title"], ep["season_number"],
ep["episode_number"], category)
ranked = torznab.rank_results(results, quality_pref, min_seeders)
tv_db.update_episode(episode_id,
retry_count=ep["retry_count"] + 1,
last_retry_at=time.strftime("%Y_get_ollama_config function · python · L16-L21 (6 LOC)services/verify.py
def _get_ollama_config() -> dict:
return {
"url": settings.get("ollama_url",
"http://ollama.ollamaserver-shared.svc.cluster.local:11434"),
"model": settings.get("ollama_model", "gemma3:12b"),
}