Function bodies 212 total
unique_path function · python · L10-L31 (22 LOC)archiving_app.py
def unique_path(base_path: str) -> str:
    """Return a path that does not exist yet, derived from ``base_path``.

    If ``base_path`` itself is free it is returned unchanged. Otherwise a
    numeric suffix (``_1``, ``_2``, ...) is appended until an unused name is
    found. For existing files the suffix goes before the extension
    (``a.txt`` -> ``a_1.txt``); for directories it goes at the end of the name.

    Args:
        base_path: Desired file or directory path.

    Returns:
        ``base_path`` or the first suffixed sibling path that does not exist.

    Note:
        The check is not atomic: another process may create the returned
        path between this call and its use.
    """
    if not os.path.exists(base_path):
        return base_path

    # A trailing separator ("dir/") makes basename() empty, which would put
    # the candidate *inside* the directory ("dir/_1") instead of next to it.
    # Drop it unless that would leave no name at all (filesystem root).
    separators = os.sep + (os.altsep or "")
    trimmed = base_path.rstrip(separators)
    if os.path.basename(trimmed):
        base_path = trimmed

    parent, name = os.path.split(base_path)
    if os.path.isfile(base_path):
        stem, ext = os.path.splitext(name)
    else:
        # Directory names may contain dots; never treat them as extensions.
        stem, ext = name, ""

    i = 1
    candidate = os.path.join(parent, f"{stem}_{i}{ext}")
    while os.path.exists(candidate):
        i += 1
        candidate = os.path.join(parent, f"{stem}_{i}{ext}")
    return candidate
def copy_item(src: str, dst: str):
    """Copy a file or directory from ``src`` to ``dst`` without overwriting.

    - Directories are copied with ``shutil.copytree``.
    - Files are copied with ``shutil.copy2`` (preserves metadata); the parent
      directory of ``dst`` is created when missing.

    In both cases ``dst`` is passed through ``unique_path`` first, so an
    existing destination gets a numbered sibling instead of being replaced.

    Args:
        src: Existing file or directory to copy.
        dst: Desired destination path.
    """
    if os.path.isdir(src):
        # copytree requires dst to not exist; ensure unique dst
        dst = unique_path(dst)
        shutil.copytree(src, dst)
    else:
        parent = os.path.dirname(dst)
        # dirname() is "" for a bare filename, and os.makedirs("") raises.
        if parent:
            os.makedirs(parent, exist_ok=True)
        dst = unique_path(dst)
        shutil.copy2(src, dst)
class ScrollableFrame(ttk.Frame):
def __init__(self, parent):
super().__init__(parent)
self.canvas = tk.Canvas(self, highlightthickness=0)
self.scrollbar = ttk.Scrollbar(self, orient="vertical", command=self.canvas.yview)
self.inner = ttk.Frame(self.canvas)
self.inner.bind(
"<Configure>",
lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all"))
)
self.window_id = self.canvas.create_window((0, 0), window=self.inner, anchor="nw")
self.canvas.configure(yscrollcommand=self.scrollbar.set)
self.canvas.pack(side="left", fill="both", expand=True)
self.scrollbar.pack(side="right", fill="y")
self.canvas.bind("<Configure>", self._on_canvas_configure)
# Mouse wheel support (Windows/macOS/Linux variations)
self.canvas.bind_all("<MouseWheel>", self._on_mousewheel) # Windows/macOS
self.canvas.bind_all("<Button-4>", self._on_mousewheel_linu__init__ method · python · L51-L74 (24 LOC)archiving_app.py
def __init__(self, parent):
super().__init__(parent)
self.canvas = tk.Canvas(self, highlightthickness=0)
self.scrollbar = ttk.Scrollbar(self, orient="vertical", command=self.canvas.yview)
self.inner = ttk.Frame(self.canvas)
self.inner.bind(
"<Configure>",
lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all"))
)
self.window_id = self.canvas.create_window((0, 0), window=self.inner, anchor="nw")
self.canvas.configure(yscrollcommand=self.scrollbar.set)
self.canvas.pack(side="left", fill="both", expand=True)
self.scrollbar.pack(side="right", fill="y")
self.canvas.bind("<Configure>", self._on_canvas_configure)
# Mouse wheel support (Windows/macOS/Linux variations)
self.canvas.bind_all("<MouseWheel>", self._on_mousewheel) # Windows/macOS
self.canvas.bind_all("<Button-4>", self._on_mousewheel_linux) # Linux up
self.canvas_on_canvas_configure method · python · L76-L78 (3 LOC)archiving_app.py
def _on_canvas_configure(self, event):
# Keep inner frame width in sync with canvas width
self.canvas.itemconfigure(self.window_id, width=event.width)_on_mousewheel method · python · L80-L83 (4 LOC)archiving_app.py
def _on_mousewheel(self, event):
# Windows: event.delta is multiples of 120. macOS uses different scaling sometimes.
delta = -1 * int(event.delta / 120) if event.delta else 0
self.canvas.yview_scroll(delta, "units")_on_mousewheel_linux method · python · L85-L89 (5 LOC)archiving_app.py
def _on_mousewheel_linux(self, event):
if event.num == 4:
self.canvas.yview_scroll(-1, "units")
elif event.num == 5:
self.canvas.yview_scroll(1, "units")Repobility · open methodology · https://repobility.com/research/
CopyMoveApp class · python · L92-L430 (339 LOC)archiving_app.py
class CopyMoveApp(tk.Tk):
def __init__(self):
super().__init__()
self.title("Folder Copy + Move Tool")
self.geometry("980x640")
self.src_dir = tk.StringVar()
self.dst_dir = tk.StringVar()
self.archive_dir = tk.StringVar()
self.selected_folder = tk.StringVar()
self.folder_name_entry = tk.StringVar()
self.item_vars = {} # name -> BooleanVar
self.item_widgets = [] # checkbuttons to destroy when reloading
self.worker_q = queue.Queue()
self.worker_thread = None
self.is_working = False
self._build_ui()
self._poll_worker_queue()
def _build_ui(self):
top = ttk.Frame(self, padding=10)
top.pack(fill="x")
# Directory pickers
ttk.Label(top, text="Source directory:").grid(row=0, column=0, sticky="w")
ttk.Entry(top, textvariable=self.src_dir, width=90).grid(row=0, column=1, sticky="we", padx=(8, 8))
ttk.Button(top, te__init__ method · python · L93-L113 (21 LOC)archiving_app.py
def __init__(self):
    """Create the main window, its state variables and the worker plumbing."""
    super().__init__()

    # Window chrome.
    self.title("Folder Copy + Move Tool")
    self.geometry("980x640")

    # Tk variables backing the entry widgets and the folder selection.
    self.src_dir = tk.StringVar()
    self.dst_dir = tk.StringVar()
    self.archive_dir = tk.StringVar()
    self.selected_folder = tk.StringVar()
    self.folder_name_entry = tk.StringVar()

    # Per-item checkbox state.
    self.item_vars = {}  # name -> BooleanVar
    self.item_widgets = []  # checkbuttons to destroy when reloading

    # Background worker bookkeeping.
    self.worker_q = queue.Queue()
    self.worker_thread = None
    self.is_working = False

    # Build the widgets first, then start draining the worker queue.
    self._build_ui()
    self._poll_worker_queue()
def _build_ui(self):
top = ttk.Frame(self, padding=10)
top.pack(fill="x")
# Directory pickers
ttk.Label(top, text="Source directory:").grid(row=0, column=0, sticky="w")
ttk.Entry(top, textvariable=self.src_dir, width=90).grid(row=0, column=1, sticky="we", padx=(8, 8))
ttk.Button(top, text="Browse…", command=self._browse_src).grid(row=0, column=2)
ttk.Label(top, text="Destination directory:").grid(row=1, column=0, sticky="w", pady=(6, 0))
ttk.Entry(top, textvariable=self.dst_dir, width=90).grid(row=1, column=1, sticky="we", padx=(8, 8), pady=(6, 0))
ttk.Button(top, text="Browse…", command=self._browse_dst).grid(row=1, column=2, pady=(6, 0))
ttk.Label(top, text="Move-to directory:").grid(row=2, column=0, sticky="w", pady=(6, 0))
ttk.Entry(top, textvariable=self.archive_dir, width=90).grid(row=2, column=1, sticky="we", padx=(8, 8), pady=(6, 0))
ttk.Button(top, text="Browse…", command=sel_browse_src method · python · L199-L203 (5 LOC)archiving_app.py
def _browse_src(self):
    """Ask for a source directory; on selection store it and reload folders."""
    chosen = filedialog.askdirectory(title="Choose source directory")
    if not chosen:
        # Nothing was returned by the dialog: leave the current value alone.
        return
    self.src_dir.set(chosen)
    self._refresh_folders()
def _browse_dst(self):
    """Ask for a destination directory and store it when one is chosen."""
    chosen = filedialog.askdirectory(title="Choose destination directory")
    if not chosen:
        return
    self.dst_dir.set(chosen)
def _browse_archive(self):
    """Ask for the move-to directory and store it when one is chosen."""
    chosen = filedialog.askdirectory(title="Choose move-to directory")
    if not chosen:
        return
    self.archive_dir.set(chosen)
def _refresh_folders(self):
src = self.src_dir.get().strip()
folders = []
if os.path.isdir(src):
try:
folders = sorted([d for d in os.listdir(src) if os.path.isdir(os.path.join(src, d))])
except Exception as e:
self._log(f"Error reading source directory: {e}")
else:
if src:
self._log("Source directory does not exist (yet).")
self.folder_combo["values"] = folders
# keep selection if still exists
current = self.selected_folder.get()
if current in folders:
self.selected_folder.set(current)
else:
self.selected_folder.set(folders[0] if folders else "")
self._load_folder_contents()_load_folder_contents method · python · L238-L283 (46 LOC)archiving_app.py
def _load_folder_contents(self):
# Clear old
for w in self.item_widgets:
w.destroy()
self.item_widgets.clear()
self.item_vars.clear()
folder = self.selected_folder.get().strip()
src = self.src_dir.get().strip()
if not folder or not os.path.isdir(src):
self.folder_name_entry.set("")
return
src_folder = os.path.join(src, folder)
if not os.path.isdir(src_folder):
self.folder_name_entry.set("")
return
# Put folder name into textbox
self.folder_name_entry.set(folder)
try:
items = sorted(os.listdir(src_folder))
except Exception as e:
self._log(f"Error listing folder '{folder}': {e}")
return
if not items:
lbl = ttk.Label(self.scroll.inner, text="(Folder is empty)")
lbl.pack(anchor="w", pady=4)
self.item_widgets.append(lbl)
return
Repobility — same analyzer, your code, free for public repos · /scan/
_set_all_checks method · python · L285-L287 (3 LOC)archiving_app.py
def _set_all_checks(self, value: bool):
for v in self.item_vars.values():
v.set(value)_validate_before_copy method · python · L289-L319 (31 LOC)archiving_app.py
def _validate_before_copy(self):
src = self.src_dir.get().strip()
dst = self.dst_dir.get().strip()
arc = self.archive_dir.get().strip()
folder = self.selected_folder.get().strip()
if not os.path.isdir(src):
messagebox.showerror("Missing source", "Please select a valid source directory.")
return None
if not os.path.isdir(dst):
messagebox.showerror("Missing destination", "Please select a valid destination directory.")
return None
if not os.path.isdir(arc):
messagebox.showerror("Missing move-to", "Please select a valid move-to directory.")
return None
if not folder:
messagebox.showerror("No folder selected", "Please choose a folder from the source directory.")
return None
src_folder = os.path.join(src, folder)
if not os.path.isdir(src_folder):
messagebox.showerror("Folder missing", f"Folder no long_start_copy method · python · L321-L340 (20 LOC)archiving_app.py
def _start_copy(self):
if self.is_working:
return
validated = self._validate_before_copy()
if not validated:
return
self.is_working = True
self.copy_btn.configure(state="disabled")
self.progress.start(10)
self.status.set("Copy in progress…")
self._log("---- START ----")
self.worker_thread = threading.Thread(
target=self._worker_copy_move,
args=validated,
daemon=True
)
self.worker_thread.start()_worker_copy_move method · python · L342-L394 (53 LOC)archiving_app.py
def _worker_copy_move(self, src, dst, arc, folder, new_name, checked_items):
"""
Worker thread:
- Create destination folder (unique if needed)
- Copy checked items into destination folder
- If all copies succeed, move entire source folder into arc (unique if needed)
"""
errors = []
src_folder = os.path.join(src, folder)
dst_folder = unique_path(os.path.join(dst, new_name))
arc_folder = unique_path(os.path.join(arc, folder))
try:
os.makedirs(dst_folder, exist_ok=False)
except Exception as e:
self.worker_q.put(("log", f"Failed to create destination folder: {dst_folder}\n{e}"))
self.worker_q.put(("done", False, "Failed to create destination folder."))
return
self.worker_q.put(("log", f"Destination folder created: {dst_folder}"))
# Copy checked items
for name in checked_items:
src_item = os.path.join(src__poll_worker_queue method · python · L396-L425 (30 LOC)archiving_app.py
def _poll_worker_queue(self):
try:
while True:
msg = self.worker_q.get_nowait()
kind = msg[0]
if kind == "log":
self._log(msg[1])
elif kind == "done":
success, final_msg = msg[1], msg[2]
self.progress.stop()
self.is_working = False
self.copy_btn.configure(state="normal")
self.status.set("Idle." if success else "Idle (last run had issues).")
if success:
self._log("---- DONE (SUCCESS) ----")
#messagebox.showinfo("Success", final_msg)
else:
self._log("---- DONE (FAILED) ----")
messagebox.showerror("Finished with issues", final_msg)
# Refresh folder list after run (folder may have moved)
self._refresh_folders_log method · python · L427-L430 (4 LOC)archiving_app.py
def _log(self, text: str):
ts = time.strftime("%H:%M:%S")
self.log.insert("end", f"[{ts}] {text}\n")
self.log.see("end")get_settings function · python · L12-L14 (3 LOC)routes/api.py
def get_settings():
    """Return the result of ``settings.get_directories()`` as JSON."""
    return jsonify(settings.get_directories())
def save_settings():
    """Persist the source, destination and archive directories.

    Expects a JSON object with optional ``src_dir``, ``dst_dir`` and
    ``archive_dir`` keys; missing or null values are stored as empty
    strings and surrounding whitespace is stripped.

    Returns:
        ``{"status": "ok"}`` on success, or a 400 error when the request
        body is not a JSON object.
    """
    data = request.get_json()
    # get_json() can yield None or a non-dict (list, string, number); the
    # .get() calls below would then raise and surface as a 500.
    if not isinstance(data, dict):
        return jsonify({"error": "Expected a JSON object"}), 400

    # `or ""` covers explicit nulls; str() tolerates non-string values.
    src, dst, arc = (
        str(data.get(key) or "").strip()
        for key in ("src_dir", "dst_dir", "archive_dir")
    )
    settings.save_directories(src, dst, arc)
    return jsonify({"status": "ok"})
browse_directory function · python · L28-L67 (40 LOC)routes/api.py
def browse_directory():
"""List subdirectories at a given path for the directory browser."""
path = request.args.get("path", "").strip()
# Default to showing drive roots on Windows or / on Unix
if not path:
if os.name == "nt":
import string
drives = []
for letter in string.ascii_uppercase:
drive = f"{letter}:\\"
if os.path.exists(drive):
drives.append(drive)
return jsonify({"path": "", "parent": "", "dirs": drives, "is_root": True})
else:
path = "/"
resolved = os.path.realpath(path)
if not os.path.isdir(resolved):
return jsonify({"error": "Not a valid directory"}), 400
parent = os.path.dirname(resolved)
if parent == resolved:
parent = "" # At root
try:
dirs = sorted(
d for d in os.listdir(resolved)
if os.path.isdir(os.path.join(resolved, d))
and not d.startswitlist_folders function · python · L71-L74 (4 LOC)routes/api.py
def list_folders():
    """Return the configured source directory and its sub-folder listing."""
    source_root = settings.get("src_dir")
    payload = {
        "src_dir": source_root,
        "folders": file_ops.list_subdirs(source_root),
    }
    return jsonify(payload)
def list_folder_items(name):
    """List the items of folder ``name`` inside the configured source dir.

    Each item is annotated with ``media_type`` and ``icon``. The response
    also carries the summed ``size`` of all items, raw and formatted.

    Args:
        name: Folder name (relative to the source directory) from the URL.

    Returns:
        JSON with ``items``, ``total_size`` and ``total_size_formatted``, or
        a 400 error when the source directory is not configured or ``name``
        resolves outside of it.
    """
    src = settings.get("src_dir")
    if not src:
        # os.path.join(None, name) would raise; report it as a client error.
        return jsonify({"error": "Source directory not configured"}), 400

    folder_path = os.path.join(src, name)

    # Path traversal protection. A plain startswith() prefix test would
    # accept siblings such as "/data/src_evil" for "/data/src", so compare
    # whole path components instead.
    real_src = os.path.realpath(src)
    real_folder = os.path.realpath(folder_path)
    try:
        inside_src = os.path.commonpath([real_src, real_folder]) == real_src
    except ValueError:
        # commonpath() raises for paths on different drives (Windows).
        inside_src = False
    if not inside_src:
        return jsonify({"error": "Invalid path"}), 400

    items = file_ops.list_items(folder_path)

    # Add media type info
    for item in items:
        if item["is_dir"]:
            item["media_type"] = "folder"
            item["icon"] = media.MEDIA_ICONS["folder"]
        else:
            mt = media.get_media_type(item["name"])
            item["media_type"] = mt
            # Unknown media types fall back to the generic icon.
            item["icon"] = media.MEDIA_ICONS.get(mt, media.MEDIA_ICONS["other"])

    total_size = sum(i["size"] for i in items)
    return jsonify({
        "items": items,
        "total_size": total_size,
        "total_size_formatted": file_ops.format_size(total_size),
    })
def start_transfer():
if transfer_manager is None:
return jsonify({"error": "Transfer manager not initialized"}), 500
data = request.get_json()
src_dir = settings.get("src_dir")
dst_dir = settings.get("dst_dir")
archive_dir = settings.get("archive_dir")
folder_name = data.get("folder", "").strip()
dest_name = data.get("dest_name", "").strip() or folder_name
selected_items = data.get("items", [])
if not all([src_dir, dst_dir, archive_dir, folder_name, selected_items]):
return jsonify({"error": "Missing required fields"}), 400
# Validate directories exist
for label, path in [("Source", src_dir), ("Destination", dst_dir),
("Archive", archive_dir)]:
if not os.path.isdir(path):
return jsonify({"error": f"{label} directory does not exist: {path}"}), 400
src_folder = os.path.join(src_dir, folder_name)
if not os.path.isdir(src_folder):
return jsonify({"error": f"Source fget_active_transfer function · python · L159-L162 (4 LOC)routes/api.py
def get_active_transfer():
    """Report whether a transfer is active, merging in its details if so."""
    if transfer_manager is not None and transfer_manager.is_active:
        return jsonify({"active": True, **transfer_manager.active_transfer})
    # No manager, or nothing running.
    return jsonify({"active": False})
def cancel_transfer(transfer_id):
    """Ask the transfer manager to cancel.

    ``transfer_id`` is accepted but not used by this function: ``cancel()``
    is called without arguments.
    """
    if transfer_manager is not None:
        transfer_manager.cancel()
        return jsonify({"status": "cancelling"})
    return jsonify({"error": "No transfer manager"}), 500
def get_history():
    """Return all history entries with human-readable size and duration."""
    entries = history.get_all()
    for record in entries:
        total_bytes = record.get("bytes_total", 0)
        record["bytes_total_formatted"] = file_ops.format_size(total_bytes)
        duration = record.get("duration_sec")
        if duration:
            # Only added when a non-zero duration was recorded.
            record["duration_formatted"] = f"{duration:.1f}s"
    return jsonify(entries)
def delete_history_entry(entry_id):
    """Delete the history entry ``entry_id`` and acknowledge with ``ok``."""
    history.delete_entry(entry_id)
    acknowledgement = {"status": "ok"}
    return jsonify(acknowledgement)
get_disk_usage function · python · L192-L198 (7 LOC)routes/api.py
def get_disk_usage():
    """Return disk usage for the destination and archive directories."""
    dst_path, arc_path = settings.get("dst_dir"), settings.get("archive_dir")
    usage = {
        "destination": disk.get_disk_usage(dst_path),
        "archive": disk.get_disk_usage(arc_path),
    }
    return jsonify(usage)
def suggest_name(folder):
    """Return ``folder`` together with ``media.suggest_jellyfin_name``'s result."""
    payload = {
        "original": folder,
        "suggestion": media.suggest_jellyfin_name(folder),
    }
    return jsonify(payload)
def get_queue():
    """Return everything ``queue.get_all()`` yields as JSON."""
    return jsonify(queue.get_all())
def add_to_queue():
    """Queue a folder transfer.

    Expects a JSON object with ``source_folder``, ``items`` and an optional
    ``dest_name`` (defaults to ``source_folder``).

    Returns:
        ``{"id": ..., "status": "queued"}`` on success, or a 400 error when
        the body is not a JSON object or required fields are missing.
    """
    data = request.get_json()
    # A missing/non-object body would make the .get() calls below raise.
    if not isinstance(data, dict):
        return jsonify({"error": "Expected a JSON object"}), 400

    # `or ""` covers explicit nulls; str() tolerates non-string values.
    source_folder = str(data.get("source_folder") or "").strip()
    dest_name = str(data.get("dest_name") or "").strip() or source_folder
    items = data.get("items") or []
    if not source_folder or not items:
        return jsonify({"error": "Missing source_folder or items"}), 400

    entry_id = queue.add(source_folder, dest_name, items)
    return jsonify({"id": entry_id, "status": "queued"})
def remove_from_queue(entry_id):
    """Remove queue entry ``entry_id`` and acknowledge with ``ok``."""
    queue.remove(entry_id)
    acknowledgement = {"status": "ok"}
    return jsonify(acknowledgement)
def clear_queue():
    """Clear the queue and acknowledge with ``ok``."""
    queue.clear()
    acknowledgement = {"status": "ok"}
    return jsonify(acknowledgement)
def reorder_queue():
    """Reorder the queue according to the ``ids`` list in the JSON body.

    An empty or missing ``ids`` list leaves the queue untouched.

    Returns:
        ``{"status": "ok"}``, or a 400 error when the body is not a JSON
        object.
    """
    data = request.get_json()
    # A missing/non-object body would make data.get() raise.
    if not isinstance(data, dict):
        return jsonify({"error": "Expected a JSON object"}), 400

    ordered_ids = data.get("ids") or []
    if ordered_ids:
        queue.reorder(ordered_ids)
    return jsonify({"status": "ok"})
def process_queue():
    """Hand the configured directories to the transfer manager's queue run.

    Returns a 500 error when no transfer manager exists and a 400 error when
    any of the three directories is unset.
    """
    if transfer_manager is None:
        return jsonify({"error": "Transfer manager not initialized"}), 500

    directories = [
        settings.get(key) for key in ("src_dir", "dst_dir", "archive_dir")
    ]
    if not all(directories):
        return jsonify({"error": "Directories not configured"}), 400

    outcome = transfer_manager.process_queue(*directories)
    return jsonify(outcome)
get_torrent_settings function · python · L17-L41 (25 LOC)routes/torrent_api.py
def get_torrent_settings():
    """Return every torrent-related setting, falling back to defaults.

    Keys without a stored value use the entry in ``fallbacks`` when there is
    one, otherwise an empty string.
    """
    fallbacks = {
        "ollama_url": "http://ollama.ollamaserver-shared.svc.cluster.local:11434",
        "ollama_model": "gemma3:12b",
        "auto_verify": "true",
        "min_seeders": "3",
        "tv_retry_interval_hours": "6",
        "tv_max_retries": "10",
        "tv_stall_timeout_minutes": "30",
        "tv_pack_seeder_multiplier": "2",
    }
    setting_names = (
        "torznab_url", "torznab_api_key",
        "qbit_url", "qbit_username", "qbit_password",
        "ollama_url", "ollama_model",
        "torrent_download_dir", "auto_verify",
        "quality_pref", "min_seeders",
        "tmdb_api_key", "omdb_api_key",
        "tv_retry_interval_hours", "tv_max_retries",
        "tv_stall_timeout_minutes", "tv_pack_seeder_multiplier",
    )
    return jsonify({
        name: settings.get(name, fallbacks.get(name, ""))
        for name in setting_names
    })
def save_torrent_settings():
    """Store the torrent-related settings found in the JSON body.

    Only keys in ``allowed_keys`` are written; everything else is ignored.
    Values are stored as stripped strings, with JSON ``null`` stored as an
    empty string. The qBittorrent client is reset afterwards.

    Returns:
        ``{"status": "ok"}``, or a 400 error when the body is not a JSON
        object.
    """
    data = request.get_json()
    # A missing/non-object body would make data.items() raise.
    if not isinstance(data, dict):
        return jsonify({"error": "Expected a JSON object"}), 400

    allowed_keys = {
        "torznab_url", "torznab_api_key",
        "qbit_url", "qbit_username", "qbit_password",
        "ollama_url", "ollama_model",
        "torrent_download_dir", "auto_verify",
        "quality_pref", "min_seeders",
        "tmdb_api_key", "omdb_api_key",
        "tv_retry_interval_hours", "tv_max_retries",
        "tv_stall_timeout_minutes", "tv_pack_seeder_multiplier",
    }
    for k, v in data.items():
        if k in allowed_keys:
            # str(None) would persist the literal text "None".
            # NOTE(review): JSON booleans are stored as "True"/"False" while
            # the default for auto_verify is lowercase "true" — confirm that
            # readers compare case-insensitively.
            settings.set(k, "" if v is None else str(v).strip())

    # Reset qBit client so it picks up new credentials
    qbittorrent.reset_client()
    return jsonify({"status": "ok"})
def search_torrents():
    """Start a torrent search session.

    Expects a JSON object with ``query`` and an optional ``category``
    (defaults to ``"movies"``).

    Returns:
        The result of ``search.create_search_session`` as JSON, or a 400
        error when the body is not a JSON object or the query is empty.
    """
    data = request.get_json()
    # A missing/non-object body would make data.get() raise.
    if not isinstance(data, dict):
        return jsonify({"error": "Expected a JSON object"}), 400

    # `or ""` covers an explicit null; str() tolerates non-string values.
    query = str(data.get("query") or "").strip()
    category = data.get("category", "movies")
    if not query:
        return jsonify({"error": "Query is required"}), 400

    result = search.create_search_session(query, category)
    return jsonify(result)
def start_download():
    """Register a new torrent download with the torrent manager.

    Requires ``torrent_title``, ``expected_title`` and ``magnet_or_url`` in
    the JSON body; the remaining fields are optional.

    Returns:
        The manager's result as JSON. 500 when the manager is missing or the
        result contains an ``error`` key; 400 when the body is not a JSON
        object or a required field is missing/empty.
    """
    if torrent_manager is None:
        return jsonify({"error": "Torrent manager not initialized"}), 500

    data = request.get_json()
    # A missing/non-object body would make data.get() raise.
    if not isinstance(data, dict):
        return jsonify({"error": "Expected a JSON object"}), 400

    required = ["torrent_title", "expected_title", "magnet_or_url"]
    for field in required:
        if not data.get(field):
            return jsonify({"error": f"Missing required field: {field}"}), 400

    result = torrent_manager.add_download(
        search_id=data.get("search_id"),
        torrent_title=data["torrent_title"],
        expected_title=data["expected_title"],
        magnet_or_url=data["magnet_or_url"],
        info_hash=data.get("info_hash"),
        size_bytes=data.get("size_bytes", 0),
        seeders=data.get("seeders", 0),
        leechers=data.get("leechers", 0),
        indexer=data.get("indexer"),
        quality=data.get("quality"),
    )
    if "error" in result:
        return jsonify(result), 500
    return jsonify(result)
def list_downloads():
    """Return up to 100 downloads, optionally filtered by ``status``."""
    status_filter = request.args.get("status")
    downloads = torrent_db.get_downloads(limit=100, status=status_filter)
    for record in downloads:
        raw_size = record.get("size_bytes", 0)
        record["size_formatted"] = format_size(raw_size)
    return jsonify(downloads)
def get_download(download_id):
    """Return one download with a formatted size, or 404 when not found."""
    record = torrent_db.get_download(download_id)
    if record:
        record["size_formatted"] = format_size(record.get("size_bytes", 0))
        return jsonify(record)
    return jsonify({"error": "Not found"}), 404
def cancel_download(download_id):
    """Cancel a download; ``?delete_files=true`` is forwarded as a flag."""
    if torrent_manager is None:
        return jsonify({"error": "Torrent manager not initialized"}), 500

    # Only the exact string "true" turns the flag on.
    flag_text = request.args.get("delete_files", "false")
    delete_files = flag_text == "true"
    outcome = torrent_manager.cancel_download(download_id, delete_files)
    return jsonify(outcome)
def trigger_verification(download_id):
    """Start ``verify.run_verification`` for a download on a daemon thread.

    Returns 404 when the download does not exist; otherwise responds
    immediately while the verification runs on its own thread.
    """
    import threading

    if not torrent_db.get_download(download_id):
        return jsonify({"error": "Not found"}), 404

    from flask import current_app

    # Looked up here and passed to the worker as its second argument.
    socketio = current_app.extensions.get("socketio")
    worker = threading.Thread(
        target=verify.run_verification,
        args=(download_id, socketio),
        daemon=True,
    )
    worker.start()
    return jsonify({
        "status": "verification_started",
        "download_id": download_id,
    })
get_verification function · python · L181-L190 (10 LOC)routes/torrent_api.py
def get_verification(download_id):
    """Return the verification summary and detailed results for a download.

    The three summary fields are ``None`` when the download is not found.
    """
    results = torrent_db.get_verifications(download_id)
    record = torrent_db.get_download(download_id)

    payload = {"download_id": download_id}
    for field in (
        "verification_status",
        "verification_score",
        "verification_notes",
    ):
        payload[field] = record[field] if record else None
    payload["results"] = results
    return jsonify(payload)
def approve_download(download_id):
    """Manually approve or reject a download.

    Expects a JSON object; ``approved`` defaults to ``True`` when the key is
    absent.

    Returns:
        ``{"status": "approved"}`` or ``{"status": "rejected"}``, or a 400
        error when the body is not a JSON object.
    """
    data = request.get_json()
    # A missing/non-object body would make data.get() raise. Rejecting it is
    # safer than silently falling back to the approving default.
    if not isinstance(data, dict):
        return jsonify({"error": "Expected a JSON object"}), 400

    approved = data.get("approved", True)
    verify.approve_manually(download_id, approved)
    return jsonify({"status": "approved" if approved else "rejected"})
def serve_thumbnail(download_id, filename):
    """Serve ``filename`` from the download's thumbnail directory, or 404."""
    thumb_dir = os.path.join(config.DATA_DIR, "thumbnails", str(download_id))
    if os.path.isdir(thumb_dir):
        return send_from_directory(thumb_dir, filename)
    # No thumbnail directory exists for this download.
    return jsonify({"error": "Not found"}), 404