Function bodies 212 total
send_to_archiver function · python · L214-L232 (19 LOC)routes/torrent_api.py
def send_to_archiver(download_id):
    """Flag a download as archived and return what is needed to archive it.

    The download is fetched with ``torrent_db.get_download``; a falsy result
    yields a 404. The destination name is read from the optional JSON body key
    ``dest_name``; when it is missing, blank, or not a string, the name is
    derived with ``media.suggest_jellyfin_name`` from the download's
    ``expected_title``. The record is then updated with ``archived=1``.

    Returns:
        A JSON response carrying ``status``, ``download_id``,
        ``download_path`` and ``dest_name``, or ``(response, 404)``.
    """
    dl = torrent_db.get_download(download_id)
    if not dl:
        return jsonify({"error": "Not found"}), 404

    # The body is optional here, so a missing or malformed JSON payload must
    # not abort the request before the fallback name can be used.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # An explicit JSON null (or any non-string) must not reach .strip().
    dest_name = data.get("dest_name")
    dest_name = dest_name.strip() if isinstance(dest_name, str) else ""
    if not dest_name:
        # Auto-suggest using Jellyfin naming
        dest_name = media.suggest_jellyfin_name(dl["expected_title"])

    torrent_db.update_download(download_id, archived=1)
    return jsonify({
        "status": "ready_to_archive",
        "download_id": download_id,
        "download_path": dl["download_path"],
        "dest_name": dest_name,
    })

# suggest_archive_name function · python · L236-L241 (6 LOC)routes/torrent_api.py
def suggest_archive_name(download_id):
    """Return a Jellyfin-style name suggestion for a download.

    Responds with ``{"suggestion": ...}`` built from the download's
    ``expected_title``, or a 404 when the download lookup is falsy.
    """
    record = torrent_db.get_download(download_id)
    if record:
        proposed = media.suggest_jellyfin_name(record["expected_title"])
        return jsonify({"suggestion": proposed})
    return jsonify({"error": "Not found"}), 404

# lookup_show function · python · L16-L22 (7 LOC)routes/tv_api.py
def lookup_show():
    """Search for shows matching the ``query`` field of the JSON body.

    Returns:
        ``{"results": ...}`` from ``tv_search.lookup_show``, or a 400 when the
        query is missing, blank, or not a string.
    """
    # A missing, malformed or non-object body is treated as "no fields" so
    # the caller gets the 400 below instead of an unhandled AttributeError.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    query = data.get("query")
    query = query.strip() if isinstance(query, str) else ""
    if not query:
        return jsonify({"error": "Query is required"}), 400

    results = tv_search.lookup_show(query)
    return jsonify({"results": results})

# select_show function · python · L26-L34 (9 LOC)routes/tv_api.py
def select_show():
    """Select a show by the ``tmdb_id`` field of the JSON body.

    Returns:
        The value of ``tv_search.select_show`` as JSON; a 400 when ``tmdb_id``
        is missing or not convertible to an integer; a 404 when the lookup
        result is falsy.
    """
    # Tolerate a missing/malformed/non-object body; validation below reports it.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    tmdb_id = data.get("tmdb_id")
    if not tmdb_id:
        return jsonify({"error": "tmdb_id is required"}), 400
    try:
        tmdb_id = int(tmdb_id)
    except (TypeError, ValueError):
        # Non-numeric input is a client error, not a server fault.
        return jsonify({"error": "tmdb_id must be an integer"}), 400

    show = tv_search.select_show(tmdb_id)
    if not show:
        return jsonify({"error": "Show not found on TMDB"}), 404
    return jsonify(show)

# select_seasons function · python · L38-L45 (8 LOC)routes/tv_api.py
def select_seasons():
    """Select seasons of a show from ``show_id`` and ``season_numbers``.

    Both fields are read from the JSON body. ``season_numbers`` is forwarded
    unchanged to ``tv_search.select_seasons``.

    Returns:
        ``{"seasons": ...}``, or a 400 when either field is missing/empty or
        ``show_id`` is not convertible to an integer.
    """
    # Tolerate a missing/malformed/non-object body; validation below reports it.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    show_id = data.get("show_id")
    season_numbers = data.get("season_numbers", [])
    if not show_id or not season_numbers:
        return jsonify({"error": "show_id and season_numbers are required"}), 400
    try:
        show_id = int(show_id)
    except (TypeError, ValueError):
        # Non-numeric input is a client error, not a server fault.
        return jsonify({"error": "show_id must be an integer"}), 400

    seasons = tv_search.select_seasons(show_id, season_numbers)
    return jsonify({"seasons": seasons})

# search_season function · python · L51-L66 (16 LOC)routes/tv_api.py
def search_season(season_id):
    """Start a season search on a daemon thread and answer immediately.

    Responds 500 when ``torrent_manager`` is ``None`` and 404 when
    ``tv_db.get_season`` returns a falsy value. Otherwise
    ``tv_search.start_season_search`` is launched in the background with the
    season id, the torrent manager and the app's ``socketio`` extension.
    """
    if torrent_manager is None:
        return jsonify({"error": "Torrent manager not initialized"}), 500
    if not tv_db.get_season(season_id):
        return jsonify({"error": "Season not found"}), 404

    from flask import current_app

    sio = current_app.extensions.get("socketio")
    worker = threading.Thread(
        target=tv_search.start_season_search,
        args=(season_id, torrent_manager, sio),
        daemon=True,
    )
    worker.start()
    return jsonify({"status": "search_started", "season_id": season_id})

# set_preference function · python · L70-L76 (7 LOC)routes/tv_api.py
def set_preference(season_id):
    """Store the pack preference for a season.

    The JSON body field ``preference`` defaults to ``"auto"`` and must be one
    of ``"auto"``, ``"pack"`` or ``"individual"``; anything else yields a 400.
    The value is written with ``tv_db.update_season(pack_preference=...)``.
    """
    # A missing, malformed or non-object body falls back to the default
    # preference instead of raising on ``None.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    preference = data.get("preference", "auto")
    if preference not in ("auto", "pack", "individual"):
        return jsonify({"error": "Invalid preference"}), 400

    tv_db.update_season(season_id, pack_preference=preference)
    return jsonify({"status": "ok"})
list_shows function · python · L82-L89 (8 LOC)routes/tv_api.py
def list_shows():
    """Return every show, each annotated with season and episode totals.

    For each show from ``tv_db.get_shows`` the seasons are fetched and two
    keys are added: ``season_count`` and ``total_episodes`` (the sum of each
    season's ``episode_count``).
    """
    catalog = tv_db.get_shows()
    for entry in catalog:
        entry_seasons = tv_db.get_seasons(entry["id"])
        entry["season_count"] = len(entry_seasons)
        entry["total_episodes"] = sum(
            season["episode_count"] for season in entry_seasons
        )
    return jsonify(catalog)

# get_show function · python · L93-L97 (5 LOC)routes/tv_api.py
def get_show(show_id):
    """Return ``tv_search.get_show_status`` for a show, or a 404 if falsy."""
    status = tv_search.get_show_status(show_id)
    if status:
        return jsonify(status)
    return jsonify({"error": "Not found"}), 404

# get_season function · python · L101-L105 (5 LOC)routes/tv_api.py
def get_season(season_id):
    """Return ``tv_search.get_season_status`` for a season, or a 404 if falsy."""
    status = tv_search.get_season_status(season_id)
    if status:
        return jsonify(status)
    return jsonify({"error": "Not found"}), 404

# retry_episode function · python · L111-L118 (8 LOC)routes/tv_api.py
def retry_episode(episode_id):
    """Retry one episode and return the result of ``tv_search.retry_episode``.

    Responds 500 when ``torrent_manager`` is ``None``.
    """
    if torrent_manager is None:
        return jsonify({"error": "Torrent manager not initialized"}), 500

    from flask import current_app

    sio = current_app.extensions.get("socketio")
    return jsonify(tv_search.retry_episode(episode_id, torrent_manager, sio))

# retry_all function · python · L122-L126 (5 LOC)routes/tv_api.py
def retry_all():
    """Call ``tv_scheduler.run_now()``; respond 500 when no scheduler is set."""
    if tv_scheduler is not None:
        tv_scheduler.run_now()
        return jsonify({"status": "retry_triggered"})
    return jsonify({"error": "Scheduler not initialized"}), 500

# scheduler_status function · python · L132-L135 (4 LOC)routes/tv_api.py
def scheduler_status():
    """Return the scheduler's status, or ``{"running": False}`` without one."""
    if tv_scheduler is None:
        payload = {"running": False}
    else:
        payload = tv_scheduler.get_status()
    return jsonify(payload)

# get_disk_usage function · python · L6-L24 (19 LOC)services/disk.py
def get_disk_usage(path: str) -> dict | None:
"""Get disk usage info for the volume containing path."""
if not path or not os.path.exists(path):
return None
try:
usage = shutil.disk_usage(path)
used_percent = (usage.used / usage.total * 100) if usage.total > 0 else 0
return {
"path": path,
"total": usage.total,
"used": usage.used,
"free": usage.free,
"total_formatted": format_size(usage.total),
"used_formatted": format_size(usage.used),
"free_formatted": format_size(usage.free),
"used_percent": round(used_percent, 1),
}
except OSError:
return Noneunique_path function · python · L6-L24 (19 LOC)services/file_ops.py
def unique_path(base_path: str) -> str:
    """Return ``base_path``, or a numbered variant of it that does not exist.

    When ``base_path`` is taken, ``_1``, ``_2``, ... is appended until a free
    name is found. For an existing regular file the counter goes before the
    extension; for anything else it goes at the end of the name.
    """
    if not os.path.exists(base_path):
        return base_path

    folder, leaf = os.path.split(base_path)
    if os.path.isfile(base_path):
        root, suffix = os.path.splitext(leaf)
    else:
        # Directory names are kept whole, even if they contain a dot.
        root, suffix = leaf, ""

    counter = 1
    candidate = os.path.join(folder, f"{root}_{counter}{suffix}")
    while os.path.exists(candidate):
        counter += 1
        candidate = os.path.join(folder, f"{root}_{counter}{suffix}")
    return candidate
copy_file_chunked function · python · L27-L49 (23 LOC)services/file_ops.py
def copy_file_chunked(src: str, dst: str, on_progress=None):
    """Copy a single file in chunks of ``config.CHUNK_SIZE`` bytes.

    Args:
        src: Path of the file to read.
        dst: Desired destination path. Missing parent directories are
            created, and the final name is passed through ``unique_path``
            before writing.
        on_progress: Optional callable invoked after every chunk with the
            size in bytes of the chunk just written (not a running total).

    Returns:
        The path actually written to.

    File metadata is copied afterwards with ``shutil.copystat``.
    """
    # dirname is "" for a bare filename, and os.makedirs("") raises.
    parent = os.path.dirname(dst)
    if parent:
        os.makedirs(parent, exist_ok=True)
    dst = unique_path(dst)

    with open(src, "rb") as fsrc, open(dst, "wb") as fdst:
        while chunk := fsrc.read(config.CHUNK_SIZE):
            fdst.write(chunk)
            if on_progress:
                on_progress(len(chunk))

    shutil.copystat(src, dst)
    return dst

# copy_item function · python · L52-L61 (10 LOC)services/file_ops.py
def copy_item(src: str, dst: str):
    """Copy a file or directory from ``src`` to ``dst`` (no progress callbacks).

    The destination is passed through ``unique_path`` first. Directories are
    copied with ``shutil.copytree``; files with ``shutil.copy2`` after their
    parent directory has been created.

    Returns:
        The path actually written to.
    """
    if os.path.isdir(src):
        dst = unique_path(dst)
        shutil.copytree(src, dst)
        return dst

    # dirname is "" for a bare filename, and os.makedirs("") raises.
    parent = os.path.dirname(dst)
    if parent:
        os.makedirs(parent, exist_ok=True)
    dst = unique_path(dst)
    shutil.copy2(src, dst)
    return dst

# get_size function · python · L64-L76 (13 LOC)services/file_ops.py
def get_size(path: str) -> int:
    """Return the size in bytes of a file, or the summed size of a tree.

    Files under a directory whose size cannot be read (``OSError``) are
    left out of the total.
    """
    if os.path.isfile(path):
        return os.path.getsize(path)

    size_sum = 0
    for root, _subdirs, names in os.walk(path):
        for entry in names:
            try:
                entry_size = os.path.getsize(os.path.join(root, entry))
            except OSError:
                continue
            size_sum += entry_size
    return size_sum

# format_size function · python · L79-L90 (12 LOC)services/file_ops.py
def format_size(size_bytes: int) -> str:
    """Render a byte count as a short human-readable string.

    Values below 1024 are shown as plain bytes. KB and MB use one decimal,
    GB and TB use two; TB is the largest unit.
    """
    if size_bytes < 1024:
        return f"{size_bytes} B"

    # (unit label, power of 1024, decimals) for every unit below TB.
    scales = (("KB", 1, 1), ("MB", 2, 1), ("GB", 3, 2))
    for unit, power, digits in scales:
        if size_bytes < 1024 ** (power + 1):
            return f"{size_bytes / 1024 ** power:.{digits}f} {unit}"
    return f"{size_bytes / 1024 ** 4:.2f} TB"

# list_subdirs function · python · L93-L103 (11 LOC)services/file_ops.py
def list_subdirs(path: str) -> list[str]:
    """Return the names of the directories directly inside ``path``, sorted.

    An empty list is returned when ``path`` is empty, is not a directory, or
    listing it raises ``OSError``.
    """
    if not path or not os.path.isdir(path):
        return []
    try:
        subdirs = [
            name
            for name in os.listdir(path)
            if os.path.isdir(os.path.join(path, name))
        ]
    except OSError:
        return []
    subdirs.sort()
    return subdirs

# list_items function · python · L106-L126 (21 LOC)services/file_ops.py
def list_items(folder_path: str) -> list[dict]:
    """Describe every entry of a folder, sorted by name.

    Each entry is a dict with ``name``, ``is_dir``, ``size`` (from
    ``get_size``) and ``size_formatted`` (from ``format_size``). An empty list
    is returned when the folder is not a directory or cannot be listed.
    """
    if not os.path.isdir(folder_path):
        return []
    try:
        names = os.listdir(folder_path)
    except OSError:
        return []
    names.sort()

    entries = []
    for name in names:
        target = os.path.join(folder_path, name)
        entry = {"name": name, "is_dir": os.path.isdir(target)}
        entry["size"] = get_size(target)
        entry["size_formatted"] = format_size(entry["size"])
        entries.append(entry)
    return entries

# safe_resolve function · python · L129-L137 (9 LOC)services/file_ops.py
def safe_resolve(path: str) -> str | None:
"""Resolve a path and ensure it doesn't contain traversal attacks."""
try:
resolved = os.path.realpath(path)
if ".." in os.path.relpath(resolved, path):
return None
return resolved
except (ValueError, OSError):
return None_get_conn function · python · L7-L10 (4 LOC)services/history.py
def _get_conn():
    """Open a SQLite connection to ``config.DB_PATH`` that yields ``sqlite3.Row`` rows."""
    connection = sqlite3.connect(config.DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
init_db function · python · L13-L31 (19 LOC)services/history.py
def init_db():
    """Create the ``transfer_history`` table if it doesn't exist."""
    conn = _get_conn()
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does not close the connection; that is done in ``finally``.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS transfer_history (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    started_at TEXT NOT NULL,
                    completed_at TEXT,
                    source_folder TEXT NOT NULL,
                    dest_folder TEXT NOT NULL,
                    archive_folder TEXT,
                    items_count INTEGER NOT NULL,
                    bytes_total INTEGER NOT NULL DEFAULT 0,
                    duration_sec REAL,
                    status TEXT NOT NULL DEFAULT 'in_progress',
                    error_message TEXT
                )
            """)
    finally:
        conn.close()

# add_entry function · python · L34-L46 (13 LOC)services/history.py
def add_entry(source_folder: str, dest_folder: str, items_count: int,
              bytes_total: int = 0) -> int:
    """Add a new transfer history entry. Returns the entry ID.

    The row is inserted with status ``'in_progress'`` and ``started_at`` set
    to the current local time formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    conn = _get_conn()
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does not close the connection; that is done in ``finally``.
        with conn:
            cursor = conn.execute(
                """INSERT INTO transfer_history
                   (started_at, source_folder, dest_folder, items_count, bytes_total, status)
                   VALUES (?, ?, ?, ?, ?, 'in_progress')""",
                (time.strftime("%Y-%m-%d %H:%M:%S"), source_folder, dest_folder,
                 items_count, bytes_total),
            )
        return cursor.lastrowid
    finally:
        conn.close()

# complete_entry function · python · L49-L62 (14 LOC)services/history.py
def complete_entry(entry_id: int, success: bool, archive_folder: str | None = None,
                   duration_sec: float | None = None,
                   error_message: str | None = None):
    """Mark a transfer as completed.

    Sets ``completed_at`` to the current local time, ``status`` to
    ``"success"`` or ``"failed"`` depending on ``success``, and stores the
    optional archive folder, duration and error message.
    """
    status = "success" if success else "failed"
    conn = _get_conn()
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does not close the connection; that is done in ``finally``.
        with conn:
            conn.execute(
                """UPDATE transfer_history
                   SET completed_at = ?, status = ?, archive_folder = ?,
                       duration_sec = ?, error_message = ?
                   WHERE id = ?""",
                (time.strftime("%Y-%m-%d %H:%M:%S"), status, archive_folder,
                 duration_sec, error_message, entry_id),
            )
    finally:
        conn.close()

# get_all function · python · L65-L73 (9 LOC)services/history.py
def get_all(limit: int = 50) -> list[dict]:
    """Get transfer history, newest first.

    Args:
        limit: Maximum number of rows to return.

    Returns:
        The rows as plain dicts, ordered by descending ``id``.
    """
    conn = _get_conn()
    try:
        rows = conn.execute(
            """SELECT * FROM transfer_history
               ORDER BY id DESC LIMIT ?""",
            (limit,),
        ).fetchall()
        return [dict(row) for row in rows]
    finally:
        # The connection's context manager would not close it, so do it here.
        conn.close()

# delete_entry function · python · L76-L79 (4 LOC)services/history.py
def delete_entry(entry_id: int):
    """Delete the transfer history row whose ``id`` is ``entry_id``."""
    conn = _get_conn()
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does not close the connection; that is done in ``finally``.
        with conn:
            conn.execute("DELETE FROM transfer_history WHERE id = ?", (entry_id,))
    finally:
        conn.close()

# get_media_type function · python · L22-L25 (4 LOC)services/media.py
def get_media_type(filename: str) -> str:
    """Map a filename to its media category via ``_EXT_MAP``.

    The lookup key is the lower-cased extension; unknown extensions map to
    ``"other"``.
    """
    _, suffix = os.path.splitext(filename)
    return _EXT_MAP.get(suffix.lower(), "other")

# suggest_jellyfin_name function · python · L41-L108 (68 LOC)services/media.py
def suggest_jellyfin_name(folder_name: str) -> str:
"""
Clean up a torrent-style folder name into Jellyfin-friendly format.
e.g., "The.Movie.2024.1080p.BluRay.x264-GROUP" -> "The Movie (2024)"
"""
name = folder_name
# Remove common group tags at end: -GROUP or [GROUP]
name = re.sub(r'[-\s]*\[?[A-Za-z0-9]+\]?$', '', name)
# But be careful not to strip too aggressively; only strip if preceded by codec/source tags
# Remove known tags (case-insensitive)
tags = [
# Resolution
r'2160p', r'1080p', r'720p', r'480p', r'4K', r'UHD',
# Source
r'BluRay', r'Blu-Ray', r'BDRip', r'BRRip', r'WEB-?DL', r'WEBRip',
r'WEB', r'HDTV', r'DVDRip', r'DVD', r'HDCAM', r'CAM', r'TS',
r'HDRip', r'AMZN', r'NF', r'DSNP', r'HMAX', r'ATVP', r'PCOK',
r'Remux',
# Codec
r'x264', r'x265', r'H\.?264', r'H\.?265', r'HEVC', r'AVC',
r'XviD', r'DivX', r'AV1', r'10bit',
# Audio
r'AAC', r'AC3'QBitClient class · python · L9-L140 (132 LOC)services/qbittorrent.py
class QBitClient:
"""Client for qBittorrent Web API v2."""
def __init__(self):
self._session = requests.Session()
self._authenticated = False
self._last_auth = 0
def _get_config(self) -> dict:
return {
"url": settings.get("qbit_url", "").rstrip("/"),
"username": settings.get("qbit_username", ""),
"password": settings.get("qbit_password", ""),
}
def _ensure_auth(self):
"""Authenticate if not already authenticated or session expired."""
if self._authenticated and (time.time() - self._last_auth) < 1800:
return
self.login()
def login(self) -> bool:
"""Login to qBittorrent. Returns True on success."""
cfg = self._get_config()
if not cfg["url"]:
raise ConnectionError("qBittorrent URL not configured")
try:
resp = self._session.post(
f"{cfg['url']}/api/v2/auth/login",
dataSame scanner, your repo: https://repobility.com — Repobility
__init__ method · python · L12-L15 (4 LOC)services/qbittorrent.py
def __init__(self):
    """Set up a fresh HTTP session in the logged-out state."""
    # No login has happened yet, so the last-auth timestamp starts at zero.
    self._last_auth = 0
    self._authenticated = False
    self._session = requests.Session()

# _get_config method · python · L17-L22 (6 LOC)services/qbittorrent.py
def _get_config(self) -> dict:
    """Read the qBittorrent connection settings.

    Returns:
        A dict with ``url`` (trailing slashes removed), ``username`` and
        ``password``; each defaults to an empty string.
    """
    base_url = settings.get("qbit_url", "").rstrip("/")
    user = settings.get("qbit_username", "")
    secret = settings.get("qbit_password", "")
    return {"url": base_url, "username": user, "password": secret}

# _ensure_auth method · python · L24-L28 (5 LOC)services/qbittorrent.py
def _ensure_auth(self):
"""Authenticate if not already authenticated or session expired."""
if self._authenticated and (time.time() - self._last_auth) < 1800:
return
self.login()login method · python · L30-L49 (20 LOC)services/qbittorrent.py
def login(self) -> bool:
    """Login to qBittorrent. Returns True on success.

    Raises:
        ConnectionError: when no URL is configured, when the request fails,
            or when the server answers with anything other than ``"Ok."``.
    """
    cfg = self._get_config()

    # Forget any earlier login up front so that every failure path below
    # leaves the client logged out rather than with a stale True flag.
    self._authenticated = False

    if not cfg["url"]:
        raise ConnectionError("qBittorrent URL not configured")

    try:
        resp = self._session.post(
            f"{cfg['url']}/api/v2/auth/login",
            data={"username": cfg["username"], "password": cfg["password"]},
            timeout=10,
        )
    except requests.RequestException as e:
        # Chain the cause so the original transport error stays visible.
        raise ConnectionError(f"Cannot connect to qBittorrent: {e}") from e

    if resp.text != "Ok.":
        raise ConnectionError(f"Login failed: {resp.text}")

    self._authenticated = True
    self._last_auth = time.time()
    return True

# add_torrent method · python · L51-L72 (22 LOC)services/qbittorrent.py
def add_torrent(self, magnet_or_url: str, save_path: str | None = None,
                category: str = "archiver") -> bool:
    """Add a torrent by magnet link or .torrent URL.

    Args:
        magnet_or_url: Magnet link or URL of a ``.torrent`` file. Both are
            sent in the same ``urls`` form field, so no distinction is made.
        save_path: Optional download directory, sent as ``savepath``.
        category: Category to assign to the torrent.

    Returns:
        True when the server answers HTTP 200 with the body ``"Ok."``.
    """
    self._ensure_auth()
    cfg = self._get_config()

    data = {"category": category, "urls": magnet_or_url}
    if save_path:
        data["savepath"] = save_path

    resp = self._session.post(
        f"{cfg['url']}/api/v2/torrents/add",
        data=data,
        timeout=15,
    )
    return resp.status_code == 200 and resp.text == "Ok."

# get_torrent method · python · L74-L87 (14 LOC)services/qbittorrent.py
def get_torrent(self, info_hash: str) -> dict | None:
    """Fetch the info record of one torrent, looked up by its hash.

    Returns:
        The first entry of the server's JSON list, or ``None`` when the
        status is not 200 or the list is empty.
    """
    self._ensure_auth()
    endpoint = f"{self._get_config()['url']}/api/v2/torrents/info"
    resp = self._session.get(
        endpoint,
        params={"hashes": info_hash.lower()},
        timeout=10,
    )
    if resp.status_code == 200:
        matches = resp.json()
        return matches[0] if matches else None
    return None

# get_torrents method · python · L89-L101 (13 LOC)services/qbittorrent.py
def get_torrents(self, category: str = "archiver") -> list[dict]:
    """List the torrents of a category; an empty list on a non-200 status."""
    self._ensure_auth()
    endpoint = f"{self._get_config()['url']}/api/v2/torrents/info"
    resp = self._session.get(
        endpoint,
        params={"category": category},
        timeout=10,
    )
    return resp.json() if resp.status_code == 200 else []

# get_torrent_files method · python · L103-L115 (13 LOC)services/qbittorrent.py
def get_torrent_files(self, info_hash: str) -> list[dict]:
    """List the files of a torrent; an empty list on a non-200 status."""
    self._ensure_auth()
    endpoint = f"{self._get_config()['url']}/api/v2/torrents/files"
    resp = self._session.get(
        endpoint,
        params={"hash": info_hash.lower()},
        timeout=10,
    )
    return resp.json() if resp.status_code == 200 else []
delete_torrent method · python · L117-L129 (13 LOC)services/qbittorrent.py
def delete_torrent(self, info_hash: str, delete_files: bool = False):
    """Ask qBittorrent to delete a torrent, optionally with its files.

    The server's response is not inspected.
    """
    self._ensure_auth()
    endpoint = f"{self._get_config()['url']}/api/v2/torrents/delete"
    form = {
        "hashes": info_hash.lower(),
        "deleteFiles": "true" if delete_files else "false",
    }
    self._session.post(endpoint, data=form, timeout=10)

# get_version method · python · L131-L140 (10 LOC)services/qbittorrent.py
def get_version(self) -> str:
    """Return the server's version text, or ``"unknown"`` on a non-200 status."""
    self._ensure_auth()
    endpoint = f"{self._get_config()['url']}/api/v2/app/version"
    resp = self._session.get(endpoint, timeout=10)
    if resp.status_code != 200:
        return "unknown"
    return resp.text

# test_connection function · python · L143-L151 (9 LOC)services/qbittorrent.py
def test_connection() -> dict:
    """Try a login with a throwaway client and report the outcome.

    Returns:
        ``{"ok": True, "message": ...}`` including the server version, or
        ``{"ok": False, "message": ...}`` carrying the text of whatever
        exception was raised.
    """
    try:
        probe = QBitClient()
        probe.login()
        message = f"Connected - qBittorrent {probe.get_version()}"
    except Exception as err:
        return {"ok": False, "message": str(err)}
    return {"ok": True, "message": message}

# get_client function · python · L158-L162 (5 LOC)services/qbittorrent.py
def get_client() -> QBitClient:
    """Return the module-wide client, creating it on first use."""
    global _client
    if _client is not None:
        return _client
    _client = QBitClient()
    return _client

# reset_client function · python · L165-L168 (4 LOC)services/qbittorrent.py
def reset_client():
    """Drop the module-wide client (e.g. after a settings change).

    Only the module-level ``_client`` reference is cleared.
    """
    global _client
    _client = None

# _get_conn function · python · L7-L10 (4 LOC)services/queue.py
def _get_conn():
    """Open a SQLite connection to ``config.DB_PATH`` that yields ``sqlite3.Row`` rows."""
    connection = sqlite3.connect(config.DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection

# init_db function · python · L13-L26 (14 LOC)services/queue.py
def init_db():
    """Create the ``transfer_queue`` table if it doesn't exist."""
    conn = _get_conn()
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does not close the connection; that is done in ``finally``.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS transfer_queue (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    position INTEGER NOT NULL,
                    source_folder TEXT NOT NULL,
                    dest_name TEXT NOT NULL,
                    items TEXT NOT NULL,
                    added_at TEXT NOT NULL
                )
            """)
    finally:
        conn.close()

# add function · python · L29-L45 (17 LOC)services/queue.py
def add(source_folder: str, dest_name: str, items: list[str]) -> int:
    """Add a folder to the transfer queue. Returns the entry ID.

    The new row is placed after the current highest ``position`` (or at 1 in
    an empty queue); ``items`` is stored as a JSON string.
    """
    conn = _get_conn()
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does not close the connection; that is done in ``finally``.
        with conn:
            # Get next position
            row = conn.execute(
                "SELECT COALESCE(MAX(position), 0) + 1 AS next_pos FROM transfer_queue"
            ).fetchone()
            pos = row["next_pos"]
            cursor = conn.execute(
                """INSERT INTO transfer_queue (position, source_folder, dest_name, items, added_at)
                   VALUES (?, ?, ?, ?, ?)""",
                (pos, source_folder, dest_name, json.dumps(items),
                 time.strftime("%Y-%m-%d %H:%M:%S")),
            )
        return cursor.lastrowid
    finally:
        conn.close()
get_all function · python · L48-L59 (12 LOC)services/queue.py
def get_all() -> list[dict]:
    """Get all queued items ordered by position.

    Returns:
        The rows as plain dicts, with ``items`` decoded from its stored JSON.
    """
    conn = _get_conn()
    try:
        rows = conn.execute(
            "SELECT * FROM transfer_queue ORDER BY position ASC"
        ).fetchall()
        return [{**dict(row), "items": json.loads(row["items"])} for row in rows]
    finally:
        # The connection's context manager would not close it, so do it here.
        conn.close()

# remove function · python · L62-L67 (6 LOC)services/queue.py
def remove(entry_id: int):
    """Remove an item from the queue, then call ``_reindex`` on the connection."""
    conn = _get_conn()
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does not close the connection; that is done in ``finally``.
        with conn:
            conn.execute("DELETE FROM transfer_queue WHERE id = ?", (entry_id,))
            # Committed before re-indexing so the deletion is kept even if
            # _reindex raises.
            conn.commit()
            # NOTE(review): _reindex is defined elsewhere; presumably it
            # renumbers ``position`` - confirm against its definition.
            _reindex(conn)
    finally:
        conn.close()

# pop_first function · python · L70-L83 (14 LOC)services/queue.py
def pop_first() -> dict | None:
    """Remove and return the first item in the queue.

    Returns:
        The row with the lowest ``position`` as a dict (``items`` decoded
        from JSON), or ``None`` when the queue is empty.
    """
    conn = _get_conn()
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does not close the connection; that is done in ``finally``.
        with conn:
            row = conn.execute(
                "SELECT * FROM transfer_queue ORDER BY position ASC LIMIT 1"
            ).fetchone()
            if not row:
                return None
            d = dict(row)
            d["items"] = json.loads(d["items"])
            conn.execute("DELETE FROM transfer_queue WHERE id = ?", (d["id"],))
            # Committed before re-indexing so the deletion is kept even if
            # _reindex raises.
            conn.commit()
            # NOTE(review): _reindex is defined elsewhere; presumably it
            # renumbers ``position`` - confirm against its definition.
            _reindex(conn)
        return d
    finally:
        conn.close()