← back to drewscreations__archiving-app

Function bodies 212 total

All specs Real LLM only Function bodies
send_to_archiver function · python · L214-L232 (19 LOC)
routes/torrent_api.py
def send_to_archiver(download_id):
    """Flag a download as archived and report what should be archived where.

    An optional JSON object body may carry ``dest_name``. When the body is
    absent, not a JSON object, or ``dest_name`` is blank or not a string, the
    name is derived from the download's ``expected_title`` via
    ``media.suggest_jellyfin_name``.

    Returns:
        A 404 JSON error when ``torrent_db.get_download`` finds nothing;
        otherwise a JSON payload with ``status``, ``download_id``,
        ``download_path`` and ``dest_name``.
    """
    dl = torrent_db.get_download(download_id)
    if not dl:
        return jsonify({"error": "Not found"}), 404

    # The body is optional, so a missing or malformed payload must not abort
    # the request; it simply means "use the suggested name".
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}

    # JSON null or a non-string value would crash on .strip(); treat as unset.
    raw_name = data.get("dest_name")
    dest_name = raw_name.strip() if isinstance(raw_name, str) else ""
    if not dest_name:
        # Auto-suggest using Jellyfin naming
        dest_name = media.suggest_jellyfin_name(dl["expected_title"])

    torrent_db.update_download(download_id, archived=1)

    return jsonify({
        "status": "ready_to_archive",
        "download_id": download_id,
        "download_path": dl["download_path"],
        "dest_name": dest_name,
    })
suggest_archive_name function · python · L236-L241 (6 LOC)
routes/torrent_api.py
def suggest_archive_name(download_id):
    """Return a suggested archive name for a download as JSON.

    Responds with a 404 JSON error when the download is unknown; otherwise
    with ``{"suggestion": ...}`` built from the download's
    ``expected_title`` by ``media.suggest_jellyfin_name``.
    """
    download = torrent_db.get_download(download_id)
    if download:
        title = download["expected_title"]
        return jsonify({"suggestion": media.suggest_jellyfin_name(title)})
    return jsonify({"error": "Not found"}), 404
lookup_show function · python · L16-L22 (7 LOC)
routes/tv_api.py
def lookup_show():
    """Search for shows matching the ``query`` field of the JSON body.

    Returns:
        A 400 JSON error when the body is missing, is not a JSON object, or
        ``query`` is blank or not a string; otherwise ``{"results": ...}``
        with whatever ``tv_search.lookup_show`` returns for the trimmed query.
    """
    # get_json() may yield None (or a non-dict); normalise so validation
    # answers with 400 instead of crashing on .get().
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}

    query = data.get("query", "")
    if not isinstance(query, str) or not query.strip():
        return jsonify({"error": "Query is required"}), 400

    results = tv_search.lookup_show(query.strip())
    return jsonify({"results": results})
select_show function · python · L26-L34 (9 LOC)
routes/tv_api.py
def select_show():
    """Select a show by the ``tmdb_id`` given in the JSON body.

    Returns:
        A 400 JSON error when ``tmdb_id`` is missing or cannot be converted
        to an integer, a 404 JSON error when ``tv_search.select_show``
        returns nothing, otherwise the show serialised as JSON.
    """
    # get_json() may yield None (or a non-dict); normalise so validation
    # answers with 400 instead of crashing on .get().
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}

    tmdb_id = data.get("tmdb_id")
    if not tmdb_id:
        return jsonify({"error": "tmdb_id is required"}), 400
    try:
        tmdb_id = int(tmdb_id)
    except (TypeError, ValueError):
        # e.g. "abc" or a nested object: a client error, not a server error
        return jsonify({"error": "tmdb_id must be an integer"}), 400

    show = tv_search.select_show(tmdb_id)
    if not show:
        return jsonify({"error": "Show not found on TMDB"}), 404
    return jsonify(show)
select_seasons function · python · L38-L45 (8 LOC)
routes/tv_api.py
def select_seasons():
    """Select seasons of a show from the JSON body.

    Reads ``show_id`` and ``season_numbers`` from the body and passes them to
    ``tv_search.select_seasons``.

    Returns:
        A 400 JSON error when either field is missing/empty or ``show_id``
        cannot be converted to an integer; otherwise ``{"seasons": ...}``.
    """
    # get_json() may yield None (or a non-dict); normalise so validation
    # answers with 400 instead of crashing on .get().
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}

    show_id = data.get("show_id")
    season_numbers = data.get("season_numbers", [])
    if not show_id or not season_numbers:
        return jsonify({"error": "show_id and season_numbers are required"}), 400
    try:
        show_id = int(show_id)
    except (TypeError, ValueError):
        # A non-numeric id is a client error, not a server error
        return jsonify({"error": "show_id must be an integer"}), 400

    seasons = tv_search.select_seasons(show_id, season_numbers)
    return jsonify({"seasons": seasons})
search_season function · python · L51-L66 (16 LOC)
routes/tv_api.py
def search_season(season_id):
    """Start a background search for a season and return immediately.

    Responds with a 500 JSON error when the module-level ``torrent_manager``
    is unset and a 404 JSON error when the season is unknown. Otherwise a
    daemon thread running ``tv_search.start_season_search`` is started and a
    ``search_started`` JSON status is returned.
    """
    if torrent_manager is None:
        return jsonify({"error": "Torrent manager not initialized"}), 500
    if not tv_db.get_season(season_id):
        return jsonify({"error": "Season not found"}), 404

    from flask import current_app

    # Look the extension up here, inside the request, and hand it to the
    # worker thread as an argument.
    socketio = current_app.extensions.get("socketio")
    worker_args = (season_id, torrent_manager, socketio)
    threading.Thread(
        target=tv_search.start_season_search,
        args=worker_args,
        daemon=True,
    ).start()

    return jsonify({"status": "search_started", "season_id": season_id})
set_preference function · python · L70-L76 (7 LOC)
routes/tv_api.py
def set_preference(season_id):
    """Store the pack preference for a season.

    Reads ``preference`` from the JSON body (default ``"auto"``), which must
    be one of ``"auto"``, ``"pack"`` or ``"individual"``.

    Returns:
        A 400 JSON error for any other value; otherwise ``{"status": "ok"}``
        after ``tv_db.update_season`` has been called.
    """
    # get_json() may yield None (or a non-dict); fall back to an empty body so
    # the documented default applies instead of crashing on .get().
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) else {}

    preference = data.get("preference", "auto")
    if preference not in ("auto", "pack", "individual"):
        return jsonify({"error": "Invalid preference"}), 400
    tv_db.update_season(season_id, pack_preference=preference)
    return jsonify({"status": "ok"})
Repobility · code-quality intelligence platform · https://repobility.com
list_shows function · python · L82-L89 (8 LOC)
routes/tv_api.py
def list_shows():
    """Return every show as JSON, annotated with season and episode totals.

    Each show from ``tv_db.get_shows`` gains ``season_count`` and
    ``total_episodes``, computed from ``tv_db.get_seasons`` for that show.
    """
    all_shows = tv_db.get_shows()
    for entry in all_shows:
        show_seasons = tv_db.get_seasons(entry["id"])
        entry["season_count"] = len(show_seasons)
        entry["total_episodes"] = sum(
            season["episode_count"] for season in show_seasons
        )
    return jsonify(all_shows)
get_show function · python · L93-L97 (5 LOC)
routes/tv_api.py
def get_show(show_id):
    """Return the status of one show as JSON, or a 404 JSON error."""
    status = tv_search.get_show_status(show_id)
    if status:
        return jsonify(status)
    return jsonify({"error": "Not found"}), 404
get_season function · python · L101-L105 (5 LOC)
routes/tv_api.py
def get_season(season_id):
    """Return the status of one season as JSON, or a 404 JSON error."""
    status = tv_search.get_season_status(season_id)
    if status:
        return jsonify(status)
    return jsonify({"error": "Not found"}), 404
retry_episode function · python · L111-L118 (8 LOC)
routes/tv_api.py
def retry_episode(episode_id):
    """Retry a single episode and return the result as JSON.

    Responds with a 500 JSON error when the module-level ``torrent_manager``
    is unset; otherwise returns whatever ``tv_search.retry_episode`` yields.
    """
    if torrent_manager is None:
        return jsonify({"error": "Torrent manager not initialized"}), 500

    from flask import current_app

    outcome = tv_search.retry_episode(
        episode_id,
        torrent_manager,
        current_app.extensions.get("socketio"),
    )
    return jsonify(outcome)
retry_all function · python · L122-L126 (5 LOC)
routes/tv_api.py
def retry_all():
    """Ask the scheduler to run now.

    Responds with a 500 JSON error when the module-level ``tv_scheduler`` is
    unset; otherwise calls ``run_now()`` and reports ``retry_triggered``.
    """
    if tv_scheduler is not None:
        tv_scheduler.run_now()
        return jsonify({"status": "retry_triggered"})
    return jsonify({"error": "Scheduler not initialized"}), 500
scheduler_status function · python · L132-L135 (4 LOC)
routes/tv_api.py
def scheduler_status():
    """Return the scheduler status as JSON.

    Reports ``{"running": False}`` when the module-level ``tv_scheduler`` is
    unset; otherwise returns ``tv_scheduler.get_status()``.
    """
    if tv_scheduler is not None:
        return jsonify(tv_scheduler.get_status())
    return jsonify({"running": False})
get_disk_usage function · python · L6-L24 (19 LOC)
services/disk.py
def get_disk_usage(path: str) -> dict | None:
    """Get disk usage info for the volume containing path."""
    if not path or not os.path.exists(path):
        return None
    try:
        usage = shutil.disk_usage(path)
        used_percent = (usage.used / usage.total * 100) if usage.total > 0 else 0
        return {
            "path": path,
            "total": usage.total,
            "used": usage.used,
            "free": usage.free,
            "total_formatted": format_size(usage.total),
            "used_formatted": format_size(usage.used),
            "free_formatted": format_size(usage.free),
            "used_percent": round(used_percent, 1),
        }
    except OSError:
        return None
unique_path function · python · L6-L24 (19 LOC)
services/file_ops.py
def unique_path(base_path: str) -> str:
    """Return ``base_path``, or a numbered variant of it that does not exist.

    When ``base_path`` already exists, ``_1``, ``_2``, ... is appended until
    an unused path is found. For an existing regular file the counter goes
    before the extension; for anything else it goes at the end of the name.
    """
    if not os.path.exists(base_path):
        return base_path

    directory, filename = os.path.split(base_path)
    # Only regular files have their extension split off
    if os.path.isfile(base_path):
        stem, ext = os.path.splitext(filename)
    else:
        stem, ext = filename, ""

    counter = 1
    candidate = os.path.join(directory, f"{stem}_{counter}{ext}")
    while os.path.exists(candidate):
        counter += 1
        candidate = os.path.join(directory, f"{stem}_{counter}{ext}")
    return candidate
Repobility (the analyzer behind this table) · https://repobility.com
copy_file_chunked function · python · L27-L49 (23 LOC)
services/file_ops.py
def copy_file_chunked(src: str, dst: str, on_progress=None):
    """
    Copy a single file in chunks of ``config.CHUNK_SIZE`` bytes.

    The destination's parent directory is created when needed, and the
    destination is passed through ``unique_path`` so an existing path is not
    overwritten. File metadata is copied with ``shutil.copystat``.

    Args:
        src: Path of the file to copy.
        dst: Desired destination path.
        on_progress: Optional callable invoked after each chunk with the
            number of bytes in that chunk (not a running total).

    Returns:
        The path actually written, which may differ from ``dst``.
    """
    parent = os.path.dirname(dst)
    if parent:
        # A bare filename has no directory part, and os.makedirs("") raises.
        os.makedirs(parent, exist_ok=True)
    dst = unique_path(dst)

    with open(src, "rb") as fsrc, open(dst, "wb") as fdst:
        while chunk := fsrc.read(config.CHUNK_SIZE):
            fdst.write(chunk)
            if on_progress:
                on_progress(len(chunk))

    shutil.copystat(src, dst)
    return dst
copy_item function · python · L52-L61 (10 LOC)
services/file_ops.py
def copy_item(src: str, dst: str):
    """Copy a file or directory from src to dst (no progress callbacks).

    The destination is passed through ``unique_path`` so an existing path is
    not overwritten. Directories are copied with ``shutil.copytree``; files
    with ``shutil.copy2``, after creating the parent directory when needed.

    Returns:
        The path actually written, which may differ from ``dst``.
    """
    if os.path.isdir(src):
        dst = unique_path(dst)
        shutil.copytree(src, dst)
        return dst

    parent = os.path.dirname(dst)
    if parent:
        # A bare filename has no directory part, and os.makedirs("") raises.
        os.makedirs(parent, exist_ok=True)
    dst = unique_path(dst)
    shutil.copy2(src, dst)
    return dst
get_size function · python · L64-L76 (13 LOC)
services/file_ops.py
def get_size(path: str) -> int:
    """Return the size in bytes of a file, or the summed size of a tree.

    For a regular file this is its own size. For anything else the path is
    walked and the sizes of all files found are added up; files whose size
    cannot be read are skipped.
    """
    if os.path.isfile(path):
        return os.path.getsize(path)

    size = 0
    for root, _subdirs, names in os.walk(path):
        for entry in names:
            try:
                size += os.path.getsize(os.path.join(root, entry))
            except OSError:
                # Best effort: unreadable or vanished files count as zero
                continue
    return size
format_size function · python · L79-L90 (12 LOC)
services/file_ops.py
def format_size(size_bytes: int) -> str:
    """Render a byte count as a human-readable string.

    Values below 1024 are shown as whole bytes; KB and MB use one decimal
    place, GB and TB use two. Everything at or above 1024**4 is shown in TB.
    """
    if size_bytes < 1024:
        return f"{size_bytes} B"

    # (power of 1024, unit label, decimal places)
    for power, unit, digits in ((1, "KB", 1), (2, "MB", 1), (3, "GB", 2)):
        if size_bytes < 1024 ** (power + 1):
            return f"{size_bytes / 1024 ** power:.{digits}f} {unit}"

    return f"{size_bytes / 1024 ** 4:.2f} TB"
list_subdirs function · python · L93-L103 (11 LOC)
services/file_ops.py
def list_subdirs(path: str) -> list[str]:
    """Return the names of the directories directly inside ``path``, sorted.

    An empty list is returned when ``path`` is empty, is not a directory, or
    listing it raises OSError.
    """
    if not (path and os.path.isdir(path)):
        return []
    try:
        names = [
            entry for entry in os.listdir(path)
            if os.path.isdir(os.path.join(path, entry))
        ]
    except OSError:
        return []
    names.sort()
    return names
list_items function · python · L106-L126 (21 LOC)
services/file_ops.py
def list_items(folder_path: str) -> list[dict]:
    """Describe the entries of a folder, sorted by name.

    Returns:
        One dict per entry with ``name``, ``is_dir``, ``size`` (from
        ``get_size``) and ``size_formatted`` (from ``format_size``). An empty
        list when ``folder_path`` is not a directory or cannot be listed.
    """
    if not os.path.isdir(folder_path):
        return []
    try:
        names = sorted(os.listdir(folder_path))
    except OSError:
        return []

    def describe(entry_name):
        entry_path = os.path.join(folder_path, entry_name)
        is_directory = os.path.isdir(entry_path)
        entry_size = get_size(entry_path)
        return {
            "name": entry_name,
            "is_dir": is_directory,
            "size": entry_size,
            "size_formatted": format_size(entry_size),
        }

    return [describe(entry_name) for entry_name in names]
safe_resolve function · python · L129-L137 (9 LOC)
services/file_ops.py
def safe_resolve(path: str) -> str | None:
    """Resolve a path and ensure it doesn't contain traversal attacks."""
    try:
        resolved = os.path.realpath(path)
        if ".." in os.path.relpath(resolved, path):
            return None
        return resolved
    except (ValueError, OSError):
        return None
_get_conn function · python · L7-L10 (4 LOC)
services/history.py
def _get_conn():
    """Open a connection to ``config.DB_PATH`` whose rows are ``sqlite3.Row``."""
    connection = sqlite3.connect(config.DB_PATH)
    # sqlite3.Row lets callers read columns by name and convert rows to dicts
    connection.row_factory = sqlite3.Row
    return connection
Want this analysis on your repo? https://repobility.com/scan/
init_db function · python · L13-L31 (19 LOC)
services/history.py
def init_db():
    """Create transfer_history table if it doesn't exist."""
    from contextlib import closing

    # A sqlite3 connection used as a context manager only commits or rolls
    # back; closing() is what releases the connection afterwards.
    with closing(_get_conn()) as conn, conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS transfer_history (
                id             INTEGER PRIMARY KEY AUTOINCREMENT,
                started_at     TEXT NOT NULL,
                completed_at   TEXT,
                source_folder  TEXT NOT NULL,
                dest_folder    TEXT NOT NULL,
                archive_folder TEXT,
                items_count    INTEGER NOT NULL,
                bytes_total    INTEGER NOT NULL DEFAULT 0,
                duration_sec   REAL,
                status         TEXT NOT NULL DEFAULT 'in_progress',
                error_message  TEXT
            )
        """)
        conn.commit()
add_entry function · python · L34-L46 (13 LOC)
services/history.py
def add_entry(source_folder: str, dest_folder: str, items_count: int,
              bytes_total: int = 0) -> int:
    """Add a new transfer history entry. Returns the entry ID.

    The row is inserted with status ``'in_progress'`` and ``started_at`` set
    to the current local time formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    from contextlib import closing

    # A sqlite3 connection used as a context manager only commits or rolls
    # back; closing() is what releases the connection afterwards.
    with closing(_get_conn()) as conn, conn:
        cursor = conn.execute(
            """INSERT INTO transfer_history
               (started_at, source_folder, dest_folder, items_count, bytes_total, status)
               VALUES (?, ?, ?, ?, ?, 'in_progress')""",
            (time.strftime("%Y-%m-%d %H:%M:%S"), source_folder, dest_folder,
             items_count, bytes_total),
        )
        conn.commit()
        return cursor.lastrowid
complete_entry function · python · L49-L62 (14 LOC)
services/history.py
def complete_entry(entry_id: int, success: bool, archive_folder: str = None,
                   duration_sec: float = None, error_message: str = None):
    """Mark a transfer as completed.

    Sets ``completed_at`` to the current local time and ``status`` to
    ``"success"`` or ``"failed"`` depending on ``success``, and stores the
    given ``archive_folder``, ``duration_sec`` and ``error_message``.
    """
    from contextlib import closing

    status = "success" if success else "failed"
    # A sqlite3 connection used as a context manager only commits or rolls
    # back; closing() is what releases the connection afterwards.
    with closing(_get_conn()) as conn, conn:
        conn.execute(
            """UPDATE transfer_history
               SET completed_at = ?, status = ?, archive_folder = ?,
                   duration_sec = ?, error_message = ?
               WHERE id = ?""",
            (time.strftime("%Y-%m-%d %H:%M:%S"), status, archive_folder,
             duration_sec, error_message, entry_id),
        )
        conn.commit()
get_all function · python · L65-L73 (9 LOC)
services/history.py
def get_all(limit: int = 50) -> list[dict]:
    """Get transfer history, newest first.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        The rows as plain dicts, ordered by descending ``id``.
    """
    from contextlib import closing

    # A sqlite3 connection used as a context manager only commits or rolls
    # back; closing() is what releases the connection afterwards.
    with closing(_get_conn()) as conn, conn:
        rows = conn.execute(
            """SELECT * FROM transfer_history
               ORDER BY id DESC LIMIT ?""",
            (limit,),
        ).fetchall()
        return [dict(row) for row in rows]
delete_entry function · python · L76-L79 (4 LOC)
services/history.py
def delete_entry(entry_id: int):
    """Delete the transfer history row with the given id."""
    from contextlib import closing

    # A sqlite3 connection used as a context manager only commits or rolls
    # back; closing() is what releases the connection afterwards.
    with closing(_get_conn()) as conn, conn:
        conn.execute("DELETE FROM transfer_history WHERE id = ?", (entry_id,))
        conn.commit()
get_media_type function · python · L22-L25 (4 LOC)
services/media.py
def get_media_type(filename: str) -> str:
    """Map a filename to a media type via its lower-cased extension.

    The extension is looked up in ``_EXT_MAP``; unknown extensions yield
    ``"other"``.
    """
    _stem, extension = os.path.splitext(filename)
    return _EXT_MAP.get(extension.lower(), "other")
suggest_jellyfin_name function · python · L41-L108 (68 LOC)
services/media.py
def suggest_jellyfin_name(folder_name: str) -> str:
    """
    Clean up a torrent-style folder name into Jellyfin-friendly format.
    e.g., "The.Movie.2024.1080p.BluRay.x264-GROUP" -> "The Movie (2024)"
    """
    name = folder_name

    # Remove common group tags at end: -GROUP or [GROUP]
    name = re.sub(r'[-\s]*\[?[A-Za-z0-9]+\]?$', '', name)
    # But be careful not to strip too aggressively; only strip if preceded by codec/source tags

    # Remove known tags (case-insensitive)
    tags = [
        # Resolution
        r'2160p', r'1080p', r'720p', r'480p', r'4K', r'UHD',
        # Source
        r'BluRay', r'Blu-Ray', r'BDRip', r'BRRip', r'WEB-?DL', r'WEBRip',
        r'WEB', r'HDTV', r'DVDRip', r'DVD', r'HDCAM', r'CAM', r'TS',
        r'HDRip', r'AMZN', r'NF', r'DSNP', r'HMAX', r'ATVP', r'PCOK',
        r'Remux',
        # Codec
        r'x264', r'x265', r'H\.?264', r'H\.?265', r'HEVC', r'AVC',
        r'XviD', r'DivX', r'AV1', r'10bit',
        # Audio
        r'AAC', r'AC3'
QBitClient class · python · L9-L140 (132 LOC)
services/qbittorrent.py
class QBitClient:
    """Client for qBittorrent Web API v2."""

    def __init__(self):
        self._session = requests.Session()
        self._authenticated = False
        self._last_auth = 0

    def _get_config(self) -> dict:
        return {
            "url": settings.get("qbit_url", "").rstrip("/"),
            "username": settings.get("qbit_username", ""),
            "password": settings.get("qbit_password", ""),
        }

    def _ensure_auth(self):
        """Authenticate if not already authenticated or session expired."""
        if self._authenticated and (time.time() - self._last_auth) < 1800:
            return
        self.login()

    def login(self) -> bool:
        """Login to qBittorrent. Returns True on success."""
        cfg = self._get_config()
        if not cfg["url"]:
            raise ConnectionError("qBittorrent URL not configured")

        try:
            resp = self._session.post(
                f"{cfg['url']}/api/v2/auth/login",
                data
Same scanner, your repo: https://repobility.com — Repobility
__init__ method · python · L12-L15 (4 LOC)
services/qbittorrent.py
    def __init__(self):
        """Set up an unauthenticated client with its own HTTP session."""
        # Auth bookkeeping: nothing has been authenticated yet
        self._authenticated = False
        self._last_auth = 0
        # One shared session for all requests made by this client
        self._session = requests.Session()
_get_config method · python · L17-L22 (6 LOC)
services/qbittorrent.py
    def _get_config(self) -> dict:
        """Read the qBittorrent connection settings.

        Returns a dict with ``url`` (trailing slashes removed), ``username``
        and ``password``; each defaults to an empty string when unset.
        """
        base_url = settings.get("qbit_url", "").rstrip("/")
        user = settings.get("qbit_username", "")
        secret = settings.get("qbit_password", "")
        return {"url": base_url, "username": user, "password": secret}
_ensure_auth method · python · L24-L28 (5 LOC)
services/qbittorrent.py
    def _ensure_auth(self):
        """Authenticate if not already authenticated or session expired."""
        if self._authenticated and (time.time() - self._last_auth) < 1800:
            return
        self.login()
login method · python · L30-L49 (20 LOC)
services/qbittorrent.py
    def login(self) -> bool:
        """Login to qBittorrent. Returns True on success."""
        cfg = self._get_config()
        if not cfg["url"]:
            raise ConnectionError("qBittorrent URL not configured")

        try:
            resp = self._session.post(
                f"{cfg['url']}/api/v2/auth/login",
                data={"username": cfg["username"], "password": cfg["password"]},
                timeout=10,
            )
            if resp.text == "Ok.":
                self._authenticated = True
                self._last_auth = time.time()
                return True
            raise ConnectionError(f"Login failed: {resp.text}")
        except requests.RequestException as e:
            self._authenticated = False
            raise ConnectionError(f"Cannot connect to qBittorrent: {e}")
add_torrent method · python · L51-L72 (22 LOC)
services/qbittorrent.py
    def add_torrent(self, magnet_or_url: str, save_path: str = None,
                    category: str = "archiver") -> bool:
        """Add a torrent by magnet link or .torrent URL."""
        self._ensure_auth()
        cfg = self._get_config()

        data = {"category": category}
        if save_path:
            data["savepath"] = save_path

        # Determine if it's a magnet link or a torrent URL
        if magnet_or_url.startswith("magnet:"):
            data["urls"] = magnet_or_url
        else:
            data["urls"] = magnet_or_url

        resp = self._session.post(
            f"{cfg['url']}/api/v2/torrents/add",
            data=data,
            timeout=15,
        )
        return resp.status_code == 200 and resp.text == "Ok."
get_torrent method · python · L74-L87 (14 LOC)
services/qbittorrent.py
    def get_torrent(self, info_hash: str) -> dict | None:
        """Get info for a specific torrent by hash."""
        self._ensure_auth()
        cfg = self._get_config()

        resp = self._session.get(
            f"{cfg['url']}/api/v2/torrents/info",
            params={"hashes": info_hash.lower()},
            timeout=10,
        )
        if resp.status_code != 200:
            return None
        torrents = resp.json()
        return torrents[0] if torrents else None
get_torrents method · python · L89-L101 (13 LOC)
services/qbittorrent.py
    def get_torrents(self, category: str = "archiver") -> list[dict]:
        """Get all torrents in a category."""
        self._ensure_auth()
        cfg = self._get_config()

        resp = self._session.get(
            f"{cfg['url']}/api/v2/torrents/info",
            params={"category": category},
            timeout=10,
        )
        if resp.status_code != 200:
            return []
        return resp.json()
get_torrent_files method · python · L103-L115 (13 LOC)
services/qbittorrent.py
    def get_torrent_files(self, info_hash: str) -> list[dict]:
        """Get file list for a torrent."""
        self._ensure_auth()
        cfg = self._get_config()

        resp = self._session.get(
            f"{cfg['url']}/api/v2/torrents/files",
            params={"hash": info_hash.lower()},
            timeout=10,
        )
        if resp.status_code != 200:
            return []
        return resp.json()
Repobility · code-quality intelligence platform · https://repobility.com
delete_torrent method · python · L117-L129 (13 LOC)
services/qbittorrent.py
    def delete_torrent(self, info_hash: str, delete_files: bool = False):
        """Delete a torrent."""
        self._ensure_auth()
        cfg = self._get_config()

        self._session.post(
            f"{cfg['url']}/api/v2/torrents/delete",
            data={
                "hashes": info_hash.lower(),
                "deleteFiles": "true" if delete_files else "false",
            },
            timeout=10,
        )
get_version method · python · L131-L140 (10 LOC)
services/qbittorrent.py
    def get_version(self) -> str:
        """Get qBittorrent version string."""
        self._ensure_auth()
        cfg = self._get_config()

        resp = self._session.get(
            f"{cfg['url']}/api/v2/app/version",
            timeout=10,
        )
        return resp.text if resp.status_code == 200 else "unknown"
test_connection function · python · L143-L151 (9 LOC)
services/qbittorrent.py
def test_connection() -> dict:
    """Try to log in with a fresh client and report the outcome.

    Returns:
        ``{"ok": True, "message": ...}`` including the server version on
        success, or ``{"ok": False, "message": ...}`` carrying the text of
        whatever exception was raised.
    """
    try:
        probe = QBitClient()
        probe.login()
        message = f"Connected - qBittorrent {probe.get_version()}"
    except Exception as exc:
        # Boundary helper: every failure is turned into a result dict
        return {"ok": False, "message": str(exc)}
    return {"ok": True, "message": message}
get_client function · python · L158-L162 (5 LOC)
services/qbittorrent.py
def get_client() -> QBitClient:
    """Return the module-level client, creating it on first use."""
    global _client
    if _client is not None:
        return _client
    _client = QBitClient()
    return _client
reset_client function · python · L165-L168 (4 LOC)
services/qbittorrent.py
def reset_client():
    """Reset the client (e.g., after settings change).

    Clears the module-level ``_client`` reference by setting it to None.
    """
    global _client
    _client = None
_get_conn function · python · L7-L10 (4 LOC)
services/queue.py
def _get_conn():
    """Open a connection to ``config.DB_PATH`` whose rows are ``sqlite3.Row``."""
    connection = sqlite3.connect(config.DB_PATH)
    # sqlite3.Row lets callers read columns by name and convert rows to dicts
    connection.row_factory = sqlite3.Row
    return connection
init_db function · python · L13-L26 (14 LOC)
services/queue.py
def init_db():
    """Create transfer_queue table if it doesn't exist."""
    from contextlib import closing

    # A sqlite3 connection used as a context manager only commits or rolls
    # back; closing() is what releases the connection afterwards.
    with closing(_get_conn()) as conn, conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS transfer_queue (
                id            INTEGER PRIMARY KEY AUTOINCREMENT,
                position      INTEGER NOT NULL,
                source_folder TEXT NOT NULL,
                dest_name     TEXT NOT NULL,
                items         TEXT NOT NULL,
                added_at      TEXT NOT NULL
            )
        """)
        conn.commit()
add function · python · L29-L45 (17 LOC)
services/queue.py
def add(source_folder: str, dest_name: str, items: list[str]) -> int:
    """Add a folder to the transfer queue. Returns the entry ID.

    The new row gets a ``position`` one past the current maximum (1 for an
    empty queue); ``items`` is stored as a JSON string and ``added_at`` is
    the current local time formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    from contextlib import closing

    # A sqlite3 connection used as a context manager only commits or rolls
    # back; closing() is what releases the connection afterwards.
    with closing(_get_conn()) as conn, conn:
        # Get next position
        row = conn.execute(
            "SELECT COALESCE(MAX(position), 0) + 1 AS next_pos FROM transfer_queue"
        ).fetchone()
        pos = row["next_pos"]

        cursor = conn.execute(
            """INSERT INTO transfer_queue (position, source_folder, dest_name, items, added_at)
               VALUES (?, ?, ?, ?, ?)""",
            (pos, source_folder, dest_name, json.dumps(items),
             time.strftime("%Y-%m-%d %H:%M:%S")),
        )
        conn.commit()
        return cursor.lastrowid
Repobility (the analyzer behind this table) · https://repobility.com
get_all function · python · L48-L59 (12 LOC)
services/queue.py
def get_all() -> list[dict]:
    """Get all queued items ordered by position.

    Returns:
        The rows as plain dicts, with the stored ``items`` JSON string
        decoded back into a Python value.
    """
    from contextlib import closing

    # A sqlite3 connection used as a context manager only commits or rolls
    # back; closing() is what releases the connection afterwards.
    with closing(_get_conn()) as conn, conn:
        rows = conn.execute(
            "SELECT * FROM transfer_queue ORDER BY position ASC"
        ).fetchall()

    result = []
    for row in rows:
        d = dict(row)
        d["items"] = json.loads(d["items"])
        result.append(d)
    return result
remove function · python · L62-L67 (6 LOC)
services/queue.py
def remove(entry_id: int):
    """Remove an item from the queue, then call ``_reindex`` on the connection."""
    from contextlib import closing

    # A sqlite3 connection used as a context manager only commits or rolls
    # back; closing() is what releases the connection afterwards.
    with closing(_get_conn()) as conn, conn:
        conn.execute("DELETE FROM transfer_queue WHERE id = ?", (entry_id,))
        conn.commit()
        # Runs inside the transaction context so its changes are committed
        # when the block exits.
        _reindex(conn)
pop_first function · python · L70-L83 (14 LOC)
services/queue.py
def pop_first() -> dict | None:
    """Remove and return the first item in the queue.

    Returns:
        The row with the lowest ``position`` as a dict, with its ``items``
        JSON string decoded, or None when the queue is empty. After the row
        is deleted, ``_reindex`` is called on the connection.
    """
    from contextlib import closing

    # A sqlite3 connection used as a context manager only commits or rolls
    # back; closing() is what releases the connection afterwards.
    with closing(_get_conn()) as conn, conn:
        row = conn.execute(
            "SELECT * FROM transfer_queue ORDER BY position ASC LIMIT 1"
        ).fetchone()
        if not row:
            return None
        d = dict(row)
        d["items"] = json.loads(d["items"])
        conn.execute("DELETE FROM transfer_queue WHERE id = ?", (d["id"],))
        conn.commit()
        # Runs inside the transaction context so its changes are committed
        # when the block exits.
        _reindex(conn)
        return d
‹ prevpage 2 / 5next ›