← back to drewscreations__archiving-app

Function bodies 212 total

All specs Real LLM only Function bodies
format_size function · python · L215-L225 (11 LOC)
services/torznab.py
def format_size(size_bytes: int) -> str:
    """Format bytes as human-readable string."""
    if size_bytes < 1024:
        return f"{size_bytes} B"
    if size_bytes < 1024 ** 2:
        return f"{size_bytes / 1024:.1f} KB"
    if size_bytes < 1024 ** 3:
        return f"{size_bytes / 1024 ** 2:.1f} MB"
    if size_bytes < 1024 ** 4:
        return f"{size_bytes / 1024 ** 3:.2f} GB"
    return f"{size_bytes / 1024 ** 4:.2f} TB"
TransferManager class · python · L12-L319 (308 LOC)
services/transfer.py
class TransferManager:
    """Manages file transfer operations with progress tracking and cancellation."""

    def __init__(self, socketio):
        self.socketio = socketio
        self.active_transfer = None
        self._cancel_event = threading.Event()
        self._lock = threading.Lock()
        self._processing_queue = False

    @property
    def is_active(self):
        return self.active_transfer is not None

    def cancel(self):
        self._cancel_event.set()

    def start_transfer(self, src_dir, dst_dir, archive_dir, folder_name,
                       dest_name, selected_items, history_id=None):
        """Start a copy+move transfer in a background thread."""
        with self._lock:
            if self.is_active:
                return {"error": "A transfer is already in progress"}

            self._cancel_event.clear()
            transfer_id = f"transfer_{int(time.time())}"
            self.active_transfer = {
                "id": transfer_id,
                "fo
__init__ method · python · L15-L20 (6 LOC)
services/transfer.py
    def __init__(self, socketio):
        self.socketio = socketio
        self.active_transfer = None
        self._cancel_event = threading.Event()
        self._lock = threading.Lock()
        self._processing_queue = False
start_transfer method · python · L29-L54 (26 LOC)
services/transfer.py
    def start_transfer(self, src_dir, dst_dir, archive_dir, folder_name,
                       dest_name, selected_items, history_id=None):
        """Start a copy+move transfer in a background thread."""
        with self._lock:
            if self.is_active:
                return {"error": "A transfer is already in progress"}

            self._cancel_event.clear()
            transfer_id = f"transfer_{int(time.time())}"
            self.active_transfer = {
                "id": transfer_id,
                "folder": folder_name,
                "dest_name": dest_name,
                "items": selected_items,
                "started_at": time.time(),
                "history_id": history_id,
            }

        thread = threading.Thread(
            target=self._run_transfer,
            args=(transfer_id, src_dir, dst_dir, archive_dir,
                  folder_name, dest_name, selected_items, history_id),
            daemon=True,
        )
        thread.start()
        return
process_queue method · python · L56-L71 (16 LOC)
services/transfer.py
    def process_queue(self, src_dir, dst_dir, archive_dir):
        """Process all items in the transfer queue sequentially."""
        from services import queue as queue_svc

        if self._processing_queue:
            return {"error": "Queue is already being processed"}

        self._processing_queue = True

        thread = threading.Thread(
            target=self._run_queue,
            args=(src_dir, dst_dir, archive_dir),
            daemon=True,
        )
        thread.start()
        return {"status": "queue_processing_started"}
_run_queue method · python · L73-L117 (45 LOC)
services/transfer.py
    def _run_queue(self, src_dir, dst_dir, archive_dir):
        """Process queue items one by one."""
        from services import queue as queue_svc

        try:
            while True:
                item = queue_svc.pop_first()
                if item is None:
                    break

                # Wait for any active transfer to finish
                while self.is_active:
                    time.sleep(0.5)

                if self._cancel_event.is_set():
                    self._cancel_event.clear()
                    self.socketio.emit("queue:stopped", {"message": "Queue processing stopped"})
                    break

                # Calculate total bytes for history
                total_bytes = 0
                src_folder = os.path.join(src_dir, item["source_folder"])
                for name in item["items"]:
                    item_path = os.path.join(src_folder, name)
                    if os.path.exists(item_path):
                        total_bytes += ge
_emit_log method · python · L119-L126 (8 LOC)
services/transfer.py
    def _emit_log(self, transfer_id, message, level="info"):
        ts = time.strftime("%H:%M:%S")
        self.socketio.emit("transfer:log", {
            "id": transfer_id,
            "timestamp": ts,
            "message": message,
            "level": level,
        })
All rows scored by the Repobility analyzer (https://repobility.com)
_emit_progress method · python · L128-L138 (11 LOC)
services/transfer.py
    def _emit_progress(self, transfer_id, current_file, bytes_copied,
                       bytes_total, speed_bps, eta_seconds, percent):
        self.socketio.emit("transfer:progress", {
            "id": transfer_id,
            "current_file": current_file,
            "bytes_copied": bytes_copied,
            "bytes_total": bytes_total,
            "speed_bps": speed_bps,
            "eta_seconds": eta_seconds,
            "percent": percent,
        })
_emit_complete method · python · L140-L162 (23 LOC)
services/transfer.py
    def _emit_complete(self, transfer_id, success, message, duration,
                       history_id=None, archive_folder=None):
        self.socketio.emit("transfer:complete", {
            "id": transfer_id,
            "success": success,
            "message": message,
            "duration": round(duration, 1),
        })

        # Update history entry
        if history_id:
            try:
                history.complete_entry(
                    history_id, success,
                    archive_folder=archive_folder,
                    duration_sec=duration,
                    error_message=message if not success else None,
                )
            except Exception:
                pass

        with self._lock:
            self.active_transfer = None
_run_transfer method · python · L164-L298 (135 LOC)
services/transfer.py
    def _run_transfer(self, transfer_id, src_dir, dst_dir, archive_dir,
                      folder_name, dest_name, selected_items, history_id=None):
        """Worker thread: copy selected items then move source to archive."""
        start_time = time.time()
        errors = []

        src_folder = os.path.join(src_dir, folder_name)
        dst_folder = unique_path(os.path.join(dst_dir, dest_name))
        arc_folder = unique_path(os.path.join(archive_dir, folder_name))

        self._emit_log(transfer_id, "---- START ----")

        # Create destination folder
        try:
            os.makedirs(dst_folder, exist_ok=False)
        except Exception as e:
            self._emit_log(transfer_id,
                           f"Failed to create destination folder: {dst_folder}\n{e}",
                           "error")
            self._emit_complete(transfer_id, False,
                                "Failed to create destination folder.",
                                time.time() -
_copy_dir_chunked method · python · L300-L319 (20 LOC)
services/transfer.py
    def _copy_dir_chunked(self, src_dir, dst_dir, on_chunk):
        """Copy a directory tree with chunked file copies for progress tracking."""
        dst_dir = unique_path(dst_dir)
        for dirpath, dirnames, filenames in os.walk(src_dir):
            rel = os.path.relpath(dirpath, src_dir)
            dst_subdir = os.path.join(dst_dir, rel)
            os.makedirs(dst_subdir, exist_ok=True)

            try:
                shutil.copystat(dirpath, dst_subdir)
            except OSError:
                pass

            for filename in filenames:
                if self._cancel_event.is_set():
                    raise InterruptedError("Transfer cancelled")

                src_file = os.path.join(dirpath, filename)
                dst_file = os.path.join(dst_subdir, filename)
                copy_file_chunked(src_file, dst_file, on_chunk)
_get_conn function · python · L6-L9 (4 LOC)
services/tv_db.py
def _get_conn():
    conn = sqlite3.connect(config.DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn
init_db function · python · L12-L78 (67 LOC)
services/tv_db.py
def init_db():
    """Create TV-related tables if they don't exist."""
    with _get_conn() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tv_shows (
                id              INTEGER PRIMARY KEY AUTOINCREMENT,
                tmdb_id         INTEGER NOT NULL UNIQUE,
                imdb_id         TEXT,
                title           TEXT NOT NULL,
                original_title  TEXT,
                overview        TEXT,
                poster_path     TEXT,
                backdrop_path   TEXT,
                first_air_date  TEXT,
                status          TEXT,
                total_seasons   INTEGER NOT NULL DEFAULT 0,
                created_at      TEXT NOT NULL
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS tv_seasons (
                id              INTEGER PRIMARY KEY AUTOINCREMENT,
                show_id         INTEGER NOT NULL,
                tmdb_season_id  INTEGER,
                sea
add_show function · python · L83-L100 (18 LOC)
services/tv_db.py
def add_show(tmdb_id: int, imdb_id: str = "", title: str = "",
             original_title: str = "", overview: str = "",
             poster_path: str = "", backdrop_path: str = "",
             first_air_date: str = "", status: str = "",
             total_seasons: int = 0) -> int:
    with _get_conn() as conn:
        cursor = conn.execute(
            """INSERT INTO tv_shows
               (tmdb_id, imdb_id, title, original_title, overview,
                poster_path, backdrop_path, first_air_date, status,
                total_seasons, created_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (tmdb_id, imdb_id, title, original_title, overview,
             poster_path, backdrop_path, first_air_date, status,
             total_seasons, time.strftime("%Y-%m-%d %H:%M:%S")),
        )
        conn.commit()
        return cursor.lastrowid
get_show function · python · L103-L107 (5 LOC)
services/tv_db.py
def get_show(show_id: int) -> dict | None:
    with _get_conn() as conn:
        row = conn.execute("SELECT * FROM tv_shows WHERE id = ?",
                           (show_id,)).fetchone()
        return dict(row) if row else None
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
get_show_by_tmdb_id function · python · L110-L114 (5 LOC)
services/tv_db.py
def get_show_by_tmdb_id(tmdb_id: int) -> dict | None:
    with _get_conn() as conn:
        row = conn.execute("SELECT * FROM tv_shows WHERE tmdb_id = ?",
                           (tmdb_id,)).fetchone()
        return dict(row) if row else None
get_shows function · python · L117-L121 (5 LOC)
services/tv_db.py
def get_shows() -> list[dict]:
    with _get_conn() as conn:
        rows = conn.execute(
            "SELECT * FROM tv_shows ORDER BY id DESC").fetchall()
        return [dict(row) for row in rows]
add_season function · python · L126-L141 (16 LOC)
services/tv_db.py
def add_season(show_id: int, season_number: int, episode_count: int = 0,
               tmdb_season_id: int = None, name: str = "",
               overview: str = "", air_date: str = "",
               poster_path: str = "") -> int:
    with _get_conn() as conn:
        cursor = conn.execute(
            """INSERT INTO tv_seasons
               (show_id, tmdb_season_id, season_number, episode_count,
                name, overview, air_date, poster_path, created_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (show_id, tmdb_season_id, season_number, episode_count,
             name, overview, air_date, poster_path,
             time.strftime("%Y-%m-%d %H:%M:%S")),
        )
        conn.commit()
        return cursor.lastrowid
update_season function · python · L144-L151 (8 LOC)
services/tv_db.py
def update_season(season_id: int, **kwargs):
    if not kwargs:
        return
    cols = ", ".join(f"{k} = ?" for k in kwargs)
    vals = list(kwargs.values()) + [season_id]
    with _get_conn() as conn:
        conn.execute(f"UPDATE tv_seasons SET {cols} WHERE id = ?", vals)
        conn.commit()
get_season function · python · L154-L158 (5 LOC)
services/tv_db.py
def get_season(season_id: int) -> dict | None:
    with _get_conn() as conn:
        row = conn.execute("SELECT * FROM tv_seasons WHERE id = ?",
                           (season_id,)).fetchone()
        return dict(row) if row else None
get_seasons function · python · L161-L166 (6 LOC)
services/tv_db.py
def get_seasons(show_id: int) -> list[dict]:
    with _get_conn() as conn:
        rows = conn.execute(
            "SELECT * FROM tv_seasons WHERE show_id = ? ORDER BY season_number",
            (show_id,)).fetchall()
        return [dict(row) for row in rows]
add_episode function · python · L171-L188 (18 LOC)
services/tv_db.py
def add_episode(season_id: int, show_id: int, season_number: int,
                episode_number: int, tmdb_episode_id: int = None,
                title: str = "", overview: str = "", air_date: str = "",
                runtime: int = None, still_path: str = "") -> int:
    max_retries = int(settings_get("tv_max_retries", "10"))
    with _get_conn() as conn:
        cursor = conn.execute(
            """INSERT INTO tv_episodes
               (season_id, show_id, tmdb_episode_id, season_number,
                episode_number, title, overview, air_date, runtime,
                still_path, max_retries, created_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (season_id, show_id, tmdb_episode_id, season_number,
             episode_number, title, overview, air_date, runtime,
             still_path, max_retries, time.strftime("%Y-%m-%d %H:%M:%S")),
        )
        conn.commit()
        return cursor.lastrowid
update_episode function · python · L191-L198 (8 LOC)
services/tv_db.py
def update_episode(episode_id: int, **kwargs):
    if not kwargs:
        return
    cols = ", ".join(f"{k} = ?" for k in kwargs)
    vals = list(kwargs.values()) + [episode_id]
    with _get_conn() as conn:
        conn.execute(f"UPDATE tv_episodes SET {cols} WHERE id = ?", vals)
        conn.commit()
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
get_episode function · python · L201-L205 (5 LOC)
services/tv_db.py
def get_episode(episode_id: int) -> dict | None:
    with _get_conn() as conn:
        row = conn.execute("SELECT * FROM tv_episodes WHERE id = ?",
                           (episode_id,)).fetchone()
        return dict(row) if row else None
get_episodes function · python · L208-L214 (7 LOC)
services/tv_db.py
def get_episodes(season_id: int) -> list[dict]:
    with _get_conn() as conn:
        rows = conn.execute(
            """SELECT * FROM tv_episodes
               WHERE season_id = ? ORDER BY episode_number""",
            (season_id,)).fetchall()
        return [dict(row) for row in rows]
get_episodes_by_status function · python · L217-L230 (14 LOC)
services/tv_db.py
def get_episodes_by_status(*statuses) -> list[dict]:
    """Get episodes matching any of the given statuses with retry budget."""
    placeholders = ",".join("?" for _ in statuses)
    with _get_conn() as conn:
        rows = conn.execute(
            f"""SELECT e.*, s.title as show_title
                FROM tv_episodes e
                JOIN tv_shows s ON e.show_id = s.id
                WHERE e.status IN ({placeholders})
                AND e.retry_count < e.max_retries
                ORDER BY e.id""",
            statuses,
        ).fetchall()
        return [dict(row) for row in rows]
get_episodes_by_download_id function · python · L233-L238 (6 LOC)
services/tv_db.py
def get_episodes_by_download_id(download_id: int) -> list[dict]:
    with _get_conn() as conn:
        rows = conn.execute(
            "SELECT * FROM tv_episodes WHERE download_id = ?",
            (download_id,)).fetchall()
        return [dict(row) for row in rows]
get_seasons_by_pack_download_id function · python · L241-L246 (6 LOC)
services/tv_db.py
def get_seasons_by_pack_download_id(download_id: int) -> list[dict]:
    with _get_conn() as conn:
        rows = conn.execute(
            "SELECT * FROM tv_seasons WHERE pack_download_id = ?",
            (download_id,)).fetchall()
        return [dict(row) for row in rows]
compute_season_completion function · python · L249-L260 (12 LOC)
services/tv_db.py
def compute_season_completion(season_id: int) -> float:
    """Compute completion percentage for a season."""
    with _get_conn() as conn:
        row = conn.execute(
            """SELECT COUNT(*) as total,
                      SUM(CASE WHEN status IN ('completed', 'verified')
                          THEN 1 ELSE 0 END) as done
               FROM tv_episodes WHERE season_id = ?""",
            (season_id,)).fetchone()
        total = row["total"] if row else 0
        done = row["done"] if row else 0
        return done / total if total > 0 else 0
settings_get function · python · L264-L266 (3 LOC)
services/tv_db.py
def settings_get(key, default=""):
    from services import settings
    return settings.get(key, default)
TVRetryScheduler class · python · L8-L160 (153 LOC)
services/tv_scheduler.py
class TVRetryScheduler:
    """Background scheduler that retries failed/stalled TV episode downloads."""

    def __init__(self, socketio, torrent_manager):
        self._socketio = socketio
        self._torrent_manager = torrent_manager
        self._running = False
        self._thread = None
        self._stop_event = threading.Event()
        self._run_now_flag = threading.Event()
        self.last_run_at = None
        self.next_run_at = None

    def start(self):
        if self._running:
            return
        self._stop_event.clear()
        self._running = True
        self._thread = threading.Thread(target=self._run_loop, daemon=True)
        self._thread.start()

    def stop(self):
        self._stop_event.set()
        self._running = False

    def run_now(self):
        """Trigger an immediate scheduler cycle."""
        self._run_now_flag.set()

    def get_status(self) -> dict:
        return {
            "running": self._running,
            "last_run_at": self.
Repobility — same analyzer, your code, free for public repos · /scan/
__init__ method · python · L11-L19 (9 LOC)
services/tv_scheduler.py
    def __init__(self, socketio, torrent_manager):
        self._socketio = socketio
        self._torrent_manager = torrent_manager
        self._running = False
        self._thread = None
        self._stop_event = threading.Event()
        self._run_now_flag = threading.Event()
        self.last_run_at = None
        self.next_run_at = None
start method · python · L21-L27 (7 LOC)
services/tv_scheduler.py
    def start(self):
        if self._running:
            return
        self._stop_event.clear()
        self._running = True
        self._thread = threading.Thread(target=self._run_loop, daemon=True)
        self._thread.start()
stop method · python · L29-L31 (3 LOC)
services/tv_scheduler.py
    def stop(self):
        self._stop_event.set()
        self._running = False
run_now method · python · L33-L35 (3 LOC)
services/tv_scheduler.py
    def run_now(self):
        """Trigger an immediate scheduler cycle."""
        self._run_now_flag.set()
get_status method · python · L37-L42 (6 LOC)
services/tv_scheduler.py
    def get_status(self) -> dict:
        return {
            "running": self._running,
            "last_run_at": self.last_run_at,
            "next_run_at": self.next_run_at,
        }
_run_loop method · python · L44-L63 (20 LOC)
services/tv_scheduler.py
    def _run_loop(self):
        # Initial delay to let the app fully start
        self._stop_event.wait(10)

        while not self._stop_event.is_set():
            interval_hours = float(settings.get("tv_retry_interval_hours", "6"))
            interval_secs = max(interval_hours * 3600, 60)

            self.next_run_at = time.strftime(
                "%Y-%m-%d %H:%M:%S",
                time.localtime(time.time() + interval_secs))

            # Wait for interval or run_now trigger
            triggered = self._run_now_flag.wait(timeout=interval_secs)
            if self._stop_event.is_set():
                break
            if triggered:
                self._run_now_flag.clear()

            self._run_cycle()
_run_cycle method · python · L65-L77 (13 LOC)
services/tv_scheduler.py
    def _run_cycle(self):
        """Execute one retry cycle."""
        self.last_run_at = time.strftime("%Y-%m-%d %H:%M:%S")

        try:
            self._check_stalled_downloads()
        except Exception:
            pass

        try:
            self._retry_failed_episodes()
        except Exception:
            pass
_check_stalled_downloads method · python · L79-L138 (60 LOC)
services/tv_scheduler.py
    def _check_stalled_downloads(self):
        """Detect and handle stalled downloads."""
        stall_timeout = int(settings.get("tv_stall_timeout_minutes", "30")) * 60

        downloading_eps = tv_db.get_episodes_by_status("downloading")
        if not downloading_eps:
            return

        try:
            client = qbittorrent.get_client()
        except Exception:
            return

        for ep in downloading_eps:
            if not ep.get("download_id"):
                continue

            from services import torrent_db
            dl = torrent_db.get_download(ep["download_id"])
            if not dl or not dl.get("info_hash"):
                continue

            try:
                torrent_info = client.get_torrent(dl["info_hash"])
            except Exception:
                continue

            if torrent_info is None:
                continue

            progress = torrent_info.get("progress", 0)
            state = torrent_info.get("state", "")

        
All rows scored by the Repobility analyzer (https://repobility.com)
_retry_failed_episodes method · python · L140-L160 (21 LOC)
services/tv_scheduler.py
    def _retry_failed_episodes(self):
        """Re-search and download failed/retrying episodes."""
        retryable = tv_db.get_episodes_by_status("failed", "retrying")
        if not retryable:
            return

        for ep in retryable:
            if self._stop_event.is_set():
                break

            tv_search.retry_episode(ep["id"], self._torrent_manager, self._socketio)

            # Update season completion
            completion = tv_db.compute_season_completion(ep["season_id"])
            tv_db.update_season(ep["season_id"], completion_pct=completion)
            self._socketio.emit("tv:season_completion", {
                "season_id": ep["season_id"],
                "completion_pct": completion,
            })

            time.sleep(config.TV_SEARCH_DELAY)
lookup_show function · python · L8-L10 (3 LOC)
services/tv_search.py
def lookup_show(query: str) -> list[dict]:
    """Search TMDB for TV shows matching query."""
    return tmdb.search_shows(query)
select_show function · python · L13-L44 (32 LOC)
services/tv_search.py
def select_show(tmdb_id: int) -> dict | None:
    """Fetch show details from TMDB and store in DB. Returns existing if already tracked."""
    existing = tv_db.get_show_by_tmdb_id(tmdb_id)
    if existing:
        # Refresh season list
        details = tmdb.get_show_details(tmdb_id)
        if details:
            existing["seasons"] = details["seasons"]
        else:
            existing["seasons"] = []
        return existing

    details = tmdb.get_show_details(tmdb_id)
    if not details:
        return None

    show_id = tv_db.add_show(
        tmdb_id=details["tmdb_id"],
        imdb_id=details.get("imdb_id", ""),
        title=details["title"],
        original_title=details.get("original_title", ""),
        overview=details.get("overview", ""),
        poster_path=details.get("poster_path", ""),
        backdrop_path=details.get("backdrop_path", ""),
        first_air_date=details.get("first_air_date", ""),
        status=details.get("status", ""),
        total_seasons=deta
select_seasons function · python · L47-L81 (35 LOC)
services/tv_search.py
def select_seasons(show_id: int, season_numbers: list[int]) -> list[dict]:
    """Fetch episode metadata from TMDB and store seasons + episodes in DB."""
    show = tv_db.get_show(show_id)
    if not show:
        return []

    results = []
    for sn in season_numbers:
        episodes = tmdb.get_season_episodes(show["tmdb_id"], sn)

        season_id = tv_db.add_season(
            show_id=show_id,
            season_number=sn,
            episode_count=len(episodes),
        )

        for ep in episodes:
            tv_db.add_episode(
                season_id=season_id,
                show_id=show_id,
                season_number=sn,
                episode_number=ep["episode_number"],
                tmdb_episode_id=ep.get("tmdb_episode_id"),
                title=ep.get("title", ""),
                overview=ep.get("overview", ""),
                air_date=ep.get("air_date", ""),
                runtime=ep.get("runtime"),
                still_path=ep.get("still_path", ""),
 
start_season_search function · python · L84-L242 (159 LOC)
services/tv_search.py
def start_season_search(season_id: int, torrent_manager, socketio=None):
    """Main season search with pack-vs-individual strategy. Run in background thread."""
    season = tv_db.get_season(season_id)
    if not season:
        return {"error": "Season not found"}

    show = tv_db.get_show(season["show_id"])
    if not show:
        return {"error": "Show not found"}

    episodes = tv_db.get_episodes(season_id)
    show_title = show["title"]
    sn = season["season_number"]
    preference = season["pack_preference"]

    quality_pref = settings.get("quality_pref", "")
    min_seeders = int(settings.get("min_seeders", "3"))
    pack_multiplier = int(settings.get("tv_pack_seeder_multiplier", "2"))

    # Determine category
    category = "tv"
    if quality_pref:
        prefs_lower = quality_pref.lower()
        if "2160p" in prefs_lower or "4k" in prefs_lower:
            category = "tv_uhd"
        elif "1080p" in prefs_lower:
            category = "tv_hd"

    tv_db.update_seaso
_search_pack function · python · L245-L269 (25 LOC)
services/tv_search.py
def _search_pack(show_title: str, season_number: int, category: str) -> list[dict]:
    """Search for season pack torrents."""
    queries = [
        f"{show_title} S{season_number:02d}",
        f"{show_title} Season {season_number}",
    ]
    # Also try without apostrophes
    clean_title = show_title.replace("'", "").replace("'", "")
    if clean_title != show_title:
        queries.append(f"{clean_title} S{season_number:02d}")

    all_results = []
    seen_hashes = set()
    for q in queries:
        results = torznab.search(q, category)
        for r in results:
            h = r.get("info_hash", "")
            if h and h in seen_hashes:
                continue
            if h:
                seen_hashes.add(h)
            all_results.append(r)
        time.sleep(config.TV_SEARCH_DELAY)

    return all_results
_search_episode function · python · L272-L284 (13 LOC)
services/tv_search.py
def _search_episode(show_title: str, season_number: int,
                    episode_number: int, category: str) -> list[dict]:
    """Search for a single episode torrent."""
    query = f"{show_title} S{season_number:02d}E{episode_number:02d}"
    results = torznab.search(query, category)

    # Fallback: try without special characters
    clean_title = show_title.replace("'", "").replace("'", "")
    if clean_title != show_title and not results:
        results = torznab.search(
            f"{clean_title} S{season_number:02d}E{episode_number:02d}", category)

    return results
get_season_status function · python · L287-L294 (8 LOC)
services/tv_search.py
def get_season_status(season_id: int) -> dict | None:
    """Get season with episodes and computed completion."""
    season = tv_db.get_season(season_id)
    if not season:
        return None
    season["episodes"] = tv_db.get_episodes(season_id)
    season["completion_pct"] = tv_db.compute_season_completion(season_id)
    return season
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
get_show_status function · python · L297-L307 (11 LOC)
services/tv_search.py
def get_show_status(show_id: int) -> dict | None:
    """Get show with all seasons and their statuses."""
    show = tv_db.get_show(show_id)
    if not show:
        return None
    seasons = tv_db.get_seasons(show_id)
    for s in seasons:
        s["episodes"] = tv_db.get_episodes(s["id"])
        s["completion_pct"] = tv_db.compute_season_completion(s["id"])
    show["seasons"] = seasons
    return show
retry_episode function · python · L310-L374 (65 LOC)
services/tv_search.py
def retry_episode(episode_id: int, torrent_manager, socketio=None) -> dict:
    """Retry a single failed episode."""
    ep = tv_db.get_episode(episode_id)
    if not ep:
        return {"error": "Episode not found"}

    show = tv_db.get_show(ep["show_id"])
    if not show:
        return {"error": "Show not found"}

    quality_pref = settings.get("quality_pref", "")
    min_seeders = int(settings.get("min_seeders", "3"))
    category = "tv"
    if quality_pref:
        prefs_lower = quality_pref.lower()
        if "2160p" in prefs_lower:
            category = "tv_uhd"
        elif "1080p" in prefs_lower:
            category = "tv_hd"

    results = _search_episode(show["title"], ep["season_number"],
                              ep["episode_number"], category)
    ranked = torznab.rank_results(results, quality_pref, min_seeders)

    tv_db.update_episode(episode_id,
                         retry_count=ep["retry_count"] + 1,
                         last_retry_at=time.strftime("%Y
_get_ollama_config function · python · L16-L21 (6 LOC)
services/verify.py
def _get_ollama_config() -> dict:
    return {
        "url": settings.get("ollama_url",
                            "http://ollama.ollamaserver-shared.svc.cluster.local:11434"),
        "model": settings.get("ollama_model", "gemma3:12b"),
    }
‹ prevpage 4 / 5next ›