← back to drewscreations__archiving-app

Function bodies 212 total

All specs Real LLM only Function bodies
reorder function · python · L86-L94 (9 LOC)
services/queue.py
def reorder(ordered_ids: list[int]):
    """Reorder queue items by providing IDs in desired order."""
    with _get_conn() as conn:
        for pos, entry_id in enumerate(ordered_ids, start=1):
            conn.execute(
                "UPDATE transfer_queue SET position = ? WHERE id = ?",
                (pos, entry_id),
            )
        conn.commit()
clear function · python · L97-L101 (5 LOC)
services/queue.py
def clear():
    """Remove all items from the queue."""
    with _get_conn() as conn:
        conn.execute("DELETE FROM transfer_queue")
        conn.commit()
count function · python · L104-L107 (4 LOC)
services/queue.py
def count() -> int:
    with _get_conn() as conn:
        row = conn.execute("SELECT COUNT(*) AS cnt FROM transfer_queue").fetchone()
        return row["cnt"]
_reindex function · python · L110-L120 (11 LOC)
services/queue.py
def _reindex(conn):
    """Re-number positions to be sequential after deletions."""
    rows = conn.execute(
        "SELECT id FROM transfer_queue ORDER BY position ASC"
    ).fetchall()
    for pos, row in enumerate(rows, start=1):
        conn.execute(
            "UPDATE transfer_queue SET position = ? WHERE id = ?",
            (pos, row["id"]),
        )
    conn.commit()
create_search_session function · python · L4-L26 (23 LOC)
services/search.py
def create_search_session(query: str, category: str = "movies") -> dict:
    """Create a search session: store in DB, search Torznab, return results."""
    search_id = torrent_db.add_search(query, category)

    quality_pref = settings.get("quality_pref", "")
    min_seeders = int(settings.get("min_seeders", "3"))

    raw_results = torznab.search(query, category)
    ranked = torznab.rank_results(raw_results, quality_pref, min_seeders)

    torrent_db.update_search(search_id, result_count=len(ranked), status="done")

    # Add formatted size to each result
    for r in ranked:
        r["size_formatted"] = torznab.format_size(r["size"])

    return {
        "search_id": search_id,
        "query": query,
        "category": category,
        "results": ranked,
        "total": len(ranked),
    }
get_search_history function · python · L29-L31 (3 LOC)
services/search.py
def get_search_history(limit: int = 20) -> list[dict]:
    """Get recent search sessions."""
    return torrent_db.get_searches(limit)
_get_conn function · python · L5-L8 (4 LOC)
services/settings.py
def _get_conn():
    conn = sqlite3.connect(config.DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn
Repobility (the analyzer behind this table) · https://repobility.com
init_db function · python · L11-L20 (10 LOC)
services/settings.py
def init_db():
    """Create settings table if it doesn't exist."""
    with _get_conn() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS settings (
                key   TEXT PRIMARY KEY,
                value TEXT NOT NULL
            )
        """)
        conn.commit()
get function · python · L23-L28 (6 LOC)
services/settings.py
def get(key: str, default: str = "") -> str:
    with _get_conn() as conn:
        row = conn.execute(
            "SELECT value FROM settings WHERE key = ?", (key,)
        ).fetchone()
        return row["value"] if row else default
set function · python · L31-L37 (7 LOC)
services/settings.py
def set(key: str, value: str):
    with _get_conn() as conn:
        conn.execute(
            "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
            (key, value),
        )
        conn.commit()
get_all function · python · L40-L43 (4 LOC)
services/settings.py
def get_all() -> dict:
    with _get_conn() as conn:
        rows = conn.execute("SELECT key, value FROM settings").fetchall()
        return {row["key"]: row["value"] for row in rows}
get_directories function · python · L46-L52 (7 LOC)
services/settings.py
def get_directories() -> dict:
    """Get saved directory paths."""
    return {
        "src_dir": get("src_dir"),
        "dst_dir": get("dst_dir"),
        "archive_dir": get("archive_dir"),
    }
save_directories function · python · L55-L59 (5 LOC)
services/settings.py
def save_directories(src_dir: str, dst_dir: str, archive_dir: str):
    """Save directory paths."""
    set("src_dir", src_dir)
    set("dst_dir", dst_dir)
    set("archive_dir", archive_dir)
_poster_url function · python · L18-L21 (4 LOC)
services/tmdb.py
def _poster_url(path: str, size: str = "w200") -> str:
    if not path:
        return ""
    return f"{TMDB_IMG_BASE}/{size}{path}"
test_tmdb_connection function · python · L24-L34 (11 LOC)
services/tmdb.py
def test_tmdb_connection() -> dict:
    key = _get_tmdb_key()
    if not key:
        return {"ok": False, "message": "TMDB API key not configured"}
    try:
        resp = requests.get(f"{TMDB_BASE}/configuration",
                            params={"api_key": key}, timeout=10)
        resp.raise_for_status()
        return {"ok": True, "message": "Connected to TMDB"}
    except requests.RequestException as e:
        return {"ok": False, "message": str(e)}
Powered by Repobility — scan your code at https://repobility.com
search_shows function · python · L37-L63 (27 LOC)
services/tmdb.py
def search_shows(query: str) -> list[dict]:
    """Search TMDB for TV shows matching query."""
    key = _get_tmdb_key()
    if not key:
        return _omdb_search_fallback(query)

    try:
        resp = requests.get(f"{TMDB_BASE}/search/tv",
                            params={"api_key": key, "query": query, "page": 1},
                            timeout=15)
        resp.raise_for_status()
        data = resp.json()
    except requests.RequestException:
        return _omdb_search_fallback(query)

    results = []
    for show in data.get("results", [])[:20]:
        results.append({
            "tmdb_id": show["id"],
            "title": show.get("name", ""),
            "original_title": show.get("original_name", ""),
            "overview": (show.get("overview", "") or "")[:200],
            "poster_path": _poster_url(show.get("poster_path")),
            "first_air_date": show.get("first_air_date", ""),
            "vote_average": show.get("vote_average", 0),
        })
    ret
get_show_details function · python · L66-L114 (49 LOC)
services/tmdb.py
def get_show_details(tmdb_id: int) -> dict | None:
    """Get full show details including season list."""
    key = _get_tmdb_key()
    if not key:
        return None

    try:
        resp = requests.get(
            f"{TMDB_BASE}/tv/{tmdb_id}",
            params={"api_key": key, "append_to_response": "external_ids"},
            timeout=15,
        )
        resp.raise_for_status()
        data = resp.json()
    except requests.RequestException:
        return None

    imdb_id = ""
    ext = data.get("external_ids", {})
    if ext:
        imdb_id = ext.get("imdb_id", "") or ""

    seasons = []
    for s in data.get("seasons", []):
        if s.get("season_number", 0) == 0:
            continue  # Skip specials
        seasons.append({
            "tmdb_season_id": s.get("id"),
            "season_number": s["season_number"],
            "episode_count": s.get("episode_count", 0),
            "name": s.get("name", ""),
            "overview": (s.get("overview", "") or "")[:200],
get_season_episodes function · python · L117-L145 (29 LOC)
services/tmdb.py
def get_season_episodes(tmdb_id: int, season_number: int) -> list[dict]:
    """Get all episodes for a specific season."""
    key = _get_tmdb_key()
    if not key:
        return _omdb_episodes_fallback(tmdb_id, season_number)

    try:
        resp = requests.get(
            f"{TMDB_BASE}/tv/{tmdb_id}/season/{season_number}",
            params={"api_key": key},
            timeout=15,
        )
        resp.raise_for_status()
        data = resp.json()
    except requests.RequestException:
        return []

    episodes = []
    for ep in data.get("episodes", []):
        episodes.append({
            "tmdb_episode_id": ep.get("id"),
            "episode_number": ep["episode_number"],
            "title": ep.get("name", ""),
            "overview": (ep.get("overview", "") or "")[:200],
            "air_date": ep.get("air_date", ""),
            "runtime": ep.get("runtime"),
            "still_path": _poster_url(ep.get("still_path"), "w300"),
        })
    return episodes
_omdb_search_fallback function · python · L150-L177 (28 LOC)
services/tmdb.py
def _omdb_search_fallback(query: str) -> list[dict]:
    """Fallback to OMDb for show search."""
    key = _get_omdb_key()
    if not key:
        return []
    try:
        resp = requests.get(OMDB_BASE,
                            params={"apikey": key, "s": query, "type": "series"},
                            timeout=10)
        resp.raise_for_status()
        data = resp.json()
        if data.get("Response") != "True":
            return []
        results = []
        for item in data.get("Search", [])[:20]:
            results.append({
                "tmdb_id": 0,
                "imdb_id": item.get("imdbID", ""),
                "title": item.get("Title", ""),
                "original_title": item.get("Title", ""),
                "overview": "",
                "poster_path": item.get("Poster", "") if item.get("Poster") != "N/A" else "",
                "first_air_date": item.get("Year", ""),
                "vote_average": 0,
            })
        return results
    excep
_omdb_episodes_fallback function · python · L180-L182 (3 LOC)
services/tmdb.py
def _omdb_episodes_fallback(tmdb_id: int, season_number: int) -> list[dict]:
    """OMDb can't provide episode details without an IMDB ID, return empty."""
    return []
_get_conn function · python · L7-L10 (4 LOC)
services/torrent_db.py
def _get_conn():
    conn = sqlite3.connect(config.DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn
init_db function · python · L13-L64 (52 LOC)
services/torrent_db.py
def init_db():
    """Create torrent-related tables if they don't exist."""
    with _get_conn() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS torrent_searches (
                id           INTEGER PRIMARY KEY AUTOINCREMENT,
                query        TEXT NOT NULL,
                category     TEXT NOT NULL DEFAULT 'movies',
                created_at   TEXT NOT NULL,
                result_count INTEGER NOT NULL DEFAULT 0,
                status       TEXT NOT NULL DEFAULT 'pending'
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS torrent_downloads (
                id                  INTEGER PRIMARY KEY AUTOINCREMENT,
                search_id           INTEGER,
                torrent_title       TEXT NOT NULL,
                expected_title      TEXT NOT NULL,
                magnet_or_url       TEXT NOT NULL,
                info_hash           TEXT,
                size_bytes          INTEGER NOT NULL DEFAU
add_search function · python · L69-L77 (9 LOC)
services/torrent_db.py
def add_search(query: str, category: str = "movies") -> int:
    with _get_conn() as conn:
        cursor = conn.execute(
            """INSERT INTO torrent_searches (query, category, created_at, status)
               VALUES (?, ?, ?, 'pending')""",
            (query, category, time.strftime("%Y-%m-%d %H:%M:%S")),
        )
        conn.commit()
        return cursor.lastrowid
Repobility · MCP-ready · https://repobility.com
update_search function · python · L80-L87 (8 LOC)
services/torrent_db.py
def update_search(search_id: int, **kwargs):
    if not kwargs:
        return
    cols = ", ".join(f"{k} = ?" for k in kwargs)
    vals = list(kwargs.values()) + [search_id]
    with _get_conn() as conn:
        conn.execute(f"UPDATE torrent_searches SET {cols} WHERE id = ?", vals)
        conn.commit()
get_searches function · python · L90-L95 (6 LOC)
services/torrent_db.py
def get_searches(limit: int = 20) -> list[dict]:
    with _get_conn() as conn:
        rows = conn.execute(
            "SELECT * FROM torrent_searches ORDER BY id DESC LIMIT ?", (limit,)
        ).fetchall()
        return [dict(row) for row in rows]
add_download function · python · L100-L116 (17 LOC)
services/torrent_db.py
def add_download(search_id: int, torrent_title: str, expected_title: str,
                 magnet_or_url: str, info_hash: str = None,
                 size_bytes: int = 0, seeders: int = 0, leechers: int = 0,
                 indexer: str = None, quality: str = None) -> int:
    with _get_conn() as conn:
        cursor = conn.execute(
            """INSERT INTO torrent_downloads
               (search_id, torrent_title, expected_title, magnet_or_url,
                info_hash, size_bytes, seeders, leechers, indexer, quality,
                status, added_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'queued', ?)""",
            (search_id, torrent_title, expected_title, magnet_or_url,
             info_hash, size_bytes, seeders, leechers, indexer, quality,
             time.strftime("%Y-%m-%d %H:%M:%S")),
        )
        conn.commit()
        return cursor.lastrowid
update_download function · python · L119-L126 (8 LOC)
services/torrent_db.py
def update_download(download_id: int, **kwargs):
    if not kwargs:
        return
    cols = ", ".join(f"{k} = ?" for k in kwargs)
    vals = list(kwargs.values()) + [download_id]
    with _get_conn() as conn:
        conn.execute(f"UPDATE torrent_downloads SET {cols} WHERE id = ?", vals)
        conn.commit()
get_download function · python · L129-L134 (6 LOC)
services/torrent_db.py
def get_download(download_id: int) -> dict | None:
    with _get_conn() as conn:
        row = conn.execute(
            "SELECT * FROM torrent_downloads WHERE id = ?", (download_id,)
        ).fetchone()
        return dict(row) if row else None
get_downloads function · python · L137-L151 (15 LOC)
services/torrent_db.py
def get_downloads(limit: int = 50, status: str = None,
                  search_id: int = None) -> list[dict]:
    query = "SELECT * FROM torrent_downloads WHERE 1=1"
    params = []
    if status:
        query += " AND status = ?"
        params.append(status)
    if search_id:
        query += " AND search_id = ?"
        params.append(search_id)
    query += " ORDER BY id DESC LIMIT ?"
    params.append(limit)
    with _get_conn() as conn:
        rows = conn.execute(query, params).fetchall()
        return [dict(row) for row in rows]
get_active_downloads function · python · L154-L162 (9 LOC)
services/torrent_db.py
def get_active_downloads() -> list[dict]:
    """Get downloads that are queued or downloading."""
    with _get_conn() as conn:
        rows = conn.execute(
            """SELECT * FROM torrent_downloads
               WHERE status IN ('queued', 'downloading')
               ORDER BY id"""
        ).fetchall()
        return [dict(row) for row in rows]
add_verification function · python · L167-L178 (12 LOC)
services/torrent_db.py
def add_verification(download_id: int, thumbnail_path: str,
                     timestamp_sec: float = 0) -> int:
    with _get_conn() as conn:
        cursor = conn.execute(
            """INSERT INTO verification_results
               (download_id, thumbnail_path, timestamp_sec, created_at)
               VALUES (?, ?, ?, ?)""",
            (download_id, thumbnail_path, timestamp_sec,
             time.strftime("%Y-%m-%d %H:%M:%S")),
        )
        conn.commit()
        return cursor.lastrowid
If a scraper extracted this row, it came from Repobility (https://repobility.com)
update_verification function · python · L181-L188 (8 LOC)
services/torrent_db.py
def update_verification(verification_id: int, **kwargs):
    if not kwargs:
        return
    cols = ", ".join(f"{k} = ?" for k in kwargs)
    vals = list(kwargs.values()) + [verification_id]
    with _get_conn() as conn:
        conn.execute(f"UPDATE verification_results SET {cols} WHERE id = ?", vals)
        conn.commit()
get_verifications function · python · L191-L198 (8 LOC)
services/torrent_db.py
def get_verifications(download_id: int) -> list[dict]:
    with _get_conn() as conn:
        rows = conn.execute(
            """SELECT * FROM verification_results
               WHERE download_id = ? ORDER BY timestamp_sec""",
            (download_id,)
        ).fetchall()
        return [dict(row) for row in rows]
TorrentManager class · python · L9-L253 (245 LOC)
services/torrent_manager.py
class TorrentManager:
    """Orchestrates torrent downloads and verification via background polling."""

    def __init__(self, socketio):
        self._socketio = socketio
        self._polling = False
        self._poll_thread = None
        self._stop_event = threading.Event()

    def start_polling(self):
        """Start the background polling thread."""
        if self._polling:
            return
        self._stop_event.clear()
        self._polling = True
        self._poll_thread = threading.Thread(target=self._poll_loop, daemon=True)
        self._poll_thread.start()

    def stop_polling(self):
        """Stop the polling thread."""
        self._stop_event.set()
        self._polling = False

    def add_download(self, search_id: int, torrent_title: str,
                     expected_title: str, magnet_or_url: str,
                     info_hash: str = None, size_bytes: int = 0,
                     seeders: int = 0, leechers: int = 0,
                     indexer: str = N
__init__ method · python · L12-L16 (5 LOC)
services/torrent_manager.py
    def __init__(self, socketio):
        self._socketio = socketio
        self._polling = False
        self._poll_thread = None
        self._stop_event = threading.Event()
start_polling method · python · L18-L25 (8 LOC)
services/torrent_manager.py
    def start_polling(self):
        """Start the background polling thread."""
        if self._polling:
            return
        self._stop_event.clear()
        self._polling = True
        self._poll_thread = threading.Thread(target=self._poll_loop, daemon=True)
        self._poll_thread.start()
stop_polling method · python · L27-L30 (4 LOC)
services/torrent_manager.py
    def stop_polling(self):
        """Stop the polling thread."""
        self._stop_event.set()
        self._polling = False
add_download method · python · L32-L69 (38 LOC)
services/torrent_manager.py
    def add_download(self, search_id: int, torrent_title: str,
                     expected_title: str, magnet_or_url: str,
                     info_hash: str = None, size_bytes: int = 0,
                     seeders: int = 0, leechers: int = 0,
                     indexer: str = None, quality: str = None) -> dict:
        """Add a torrent to qBittorrent and track it in the DB."""
        download_dir = settings.get("torrent_download_dir", "")

        try:
            client = qbittorrent.get_client()
            success = client.add_torrent(magnet_or_url, save_path=download_dir or None)
            if not success:
                return {"error": "qBittorrent rejected the torrent"}
        except Exception as e:
            return {"error": f"Failed to add torrent: {e}"}

        # Extract info_hash from magnet if not provided
        if not info_hash and magnet_or_url.startswith("magnet:"):
            match = re.search(r'btih:([a-fA-F0-9]{40})', magnet_or_url)
            if mat
cancel_download method · python · L71-L85 (15 LOC)
services/torrent_manager.py
    def cancel_download(self, download_id: int, delete_files: bool = False) -> dict:
        """Cancel a download and optionally delete files."""
        dl = torrent_db.get_download(download_id)
        if not dl:
            return {"error": "Download not found"}

        if dl["info_hash"]:
            try:
                client = qbittorrent.get_client()
                client.delete_torrent(dl["info_hash"], delete_files=delete_files)
            except Exception:
                pass

        torrent_db.update_download(download_id, status="cancelled")
        return {"status": "cancelled"}
Repobility (the analyzer behind this table) · https://repobility.com
_poll_loop method · python · L87-L160 (74 LOC)
services/torrent_manager.py
    def _poll_loop(self):
        """Background loop to poll qBittorrent for progress updates."""
        backoff = config.TORRENT_POLL_INTERVAL

        while not self._stop_event.is_set():
            try:
                active = torrent_db.get_active_downloads()
                if not active:
                    backoff = config.TORRENT_IDLE_POLL_INTERVAL
                    self._stop_event.wait(backoff)
                    continue

                backoff = config.TORRENT_POLL_INTERVAL
                client = qbittorrent.get_client()

                for dl in active:
                    if not dl["info_hash"]:
                        continue

                    try:
                        torrent_info = client.get_torrent(dl["info_hash"])
                    except Exception:
                        continue

                    if torrent_info is None:
                        continue

                    progress = torrent_info.get("progress", 0)
                    state
_on_download_complete method · python · L162-L198 (37 LOC)
services/torrent_manager.py
    def _on_download_complete(self, download_id: int, torrent_info: dict):
        """Handle a completed download."""
        dl = torrent_db.get_download(download_id)
        if not dl or dl["status"] == "completed":
            return

        download_path = torrent_info.get("content_path", "")
        if not download_path:
            download_path = torrent_info.get("save_path", "")

        torrent_db.update_download(
            download_id,
            status="completed",
            download_progress=1.0,
            download_path=download_path,
            completed_at=time.strftime("%Y-%m-%d %H:%M:%S"),
        )

        self._socketio.emit("torrent:download_complete", {
            "download_id": download_id,
            "success": True,
            "message": "Download completed",
            "download_path": download_path,
        })

        # Update linked TV episodes
        self._update_tv_episodes_on_complete(download_id)

        # Auto-verify if enabled
        au
_update_tv_episodes_on_complete method · python · L200-L229 (30 LOC)
services/torrent_manager.py
    def _update_tv_episodes_on_complete(self, download_id: int):
        """Update linked TV episodes when a download completes."""
        try:
            # Individual episodes linked by download_id
            episodes = tv_db.get_episodes_by_download_id(download_id)
            for ep in episodes:
                tv_db.update_episode(ep["id"], status="completed")
                self._socketio.emit("tv:episode_status", {
                    "episode_id": ep["id"],
                    "season_id": ep["season_id"],
                    "status": "completed",
                    "download_id": download_id,
                })

            # Season packs linked by pack_download_id
            seasons = tv_db.get_seasons_by_pack_download_id(download_id)
            for season in seasons:
                for ep in tv_db.get_episodes(season["id"]):
                    if ep["covered_by_pack"]:
                        tv_db.update_episode(ep["id"], status="completed")
                complet
_update_tv_episodes_on_fail method · python · L231-L253 (23 LOC)
services/torrent_manager.py
    def _update_tv_episodes_on_fail(self, download_id: int):
        """Update linked TV episodes when a download fails."""
        try:
            episodes = tv_db.get_episodes_by_download_id(download_id)
            for ep in episodes:
                tv_db.update_episode(ep["id"], status="failed",
                                     error_message="Torrent download failed")
                self._socketio.emit("tv:episode_status", {
                    "episode_id": ep["id"],
                    "season_id": ep["season_id"],
                    "status": "failed",
                })

            seasons = tv_db.get_seasons_by_pack_download_id(download_id)
            for season in seasons:
                for ep in tv_db.get_episodes(season["id"]):
                    if ep["covered_by_pack"]:
                        tv_db.update_episode(ep["id"], status="failed",
                                             error_message="Season pack download failed")
                completion = t
_get_config function · python · L22-L26 (5 LOC)
services/torznab.py
def _get_config() -> dict:
    return {
        "url": settings.get("torznab_url", ""),
        "api_key": settings.get("torznab_api_key", ""),
    }
test_connection function · python · L29-L42 (14 LOC)
services/torznab.py
def test_connection() -> dict:
    """Test Torznab endpoint with a caps request."""
    cfg = _get_config()
    if not cfg["url"]:
        return {"ok": False, "message": "Torznab URL not configured"}
    try:
        params = {"t": "caps", "apikey": cfg["api_key"]}
        resp = requests.get(cfg["url"], params=params, timeout=10)
        resp.raise_for_status()
        if "<caps>" in resp.text or "<server" in resp.text:
            return {"ok": True, "message": "Connected successfully"}
        return {"ok": False, "message": "Unexpected response format"}
    except requests.RequestException as e:
        return {"ok": False, "message": str(e)}
search function · python · L45-L66 (22 LOC)
services/torznab.py
def search(query: str, category: str = "movies", limit: int = 100) -> list[dict]:
    """Search Torznab for torrents matching query."""
    cfg = _get_config()
    if not cfg["url"] or not cfg["api_key"]:
        return []

    cat_code = CATEGORIES.get(category, CATEGORIES["movies"])

    params = {
        "t": "search",
        "apikey": cfg["api_key"],
        "q": query,
        "cat": cat_code,
        "limit": str(limit),
    }

    try:
        resp = requests.get(cfg["url"], params=params, timeout=30)
        resp.raise_for_status()
        return _parse_results(resp.text)
    except requests.RequestException:
        return []
_parse_results function · python · L69-L162 (94 LOC)
services/torznab.py
def _parse_results(xml_text: str) -> list[dict]:
    """Parse Torznab XML response into a list of result dicts."""
    results = []
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError:
        return []

    # Find <item> elements in the RSS channel
    channel = root.find("channel")
    if channel is None:
        return []

    for item in channel.findall("item"):
        result = {
            "title": _text(item, "title"),
            "link": _text(item, "link"),
            "size": 0,
            "seeders": 0,
            "leechers": 0,
            "magnet_url": "",
            "info_hash": "",
            "indexer": _text(item, "jackettindexer") or _text(item, "prowlarrindexer") or "",
            "pub_date": _text(item, "pubDate"),
            "category": "",
        }

        # Parse size from <size> or enclosure
        size_text = _text(item, "size")
        if size_text:
            try:
                result["size"] = int(size_text)
            except
Powered by Repobility — scan your code at https://repobility.com
_text function · python · L165-L168 (4 LOC)
services/torznab.py
def _text(element, tag: str) -> str:
    """Get text content of a child element."""
    child = element.find(tag)
    return child.text.strip() if child is not None and child.text else ""
_extract_quality function · python · L171-L182 (12 LOC)
services/torznab.py
def _extract_quality(title: str) -> str:
    """Extract quality indicator from torrent title."""
    title_upper = title.upper()
    if "2160P" in title_upper or "4K" in title_upper or "UHD" in title_upper:
        return "2160p"
    if "1080P" in title_upper:
        return "1080p"
    if "720P" in title_upper:
        return "720p"
    if "480P" in title_upper:
        return "480p"
    return "unknown"
rank_results function · python · L185-L212 (28 LOC)
services/torznab.py
def rank_results(results: list[dict], quality_pref: str = None,
                 min_seeders: int = 3) -> list[dict]:
    """Filter and rank search results by quality and seed ratio."""
    # Filter by minimum seeders
    filtered = [r for r in results if r["seeders"] >= min_seeders]

    # Parse quality preferences
    preferred_qualities = []
    if quality_pref:
        preferred_qualities = [q.strip().lower() for q in quality_pref.split(",")]

    def score(r):
        s = 0
        # Quality match bonus
        if preferred_qualities and r["quality"].lower() in preferred_qualities:
            s += 100
        # Seed ratio score
        leechers = max(r["leechers"], 1)
        s += min(r["seeders"] / leechers, 50) * 2
        # Raw seeder count (diminishing returns)
        s += min(r["seeders"], 200) * 0.1
        # Penalize very large files (> 50GB)
        if r["size"] > 50 * 1024 ** 3:
            s -= 20
        return s

    filtered.sort(key=score, reverse=True)
    return 
‹ prevpage 3 / 5next ›