← back to drewscreations__archiving-app

Function bodies 212 total

All specs Real LLM only Function bodies
unique_path function · python · L10-L31 (22 LOC)
archiving_app.py
def unique_path(base_path: str) -> str:
    """
    Return a path that does not exist yet, derived from base_path.

    If base_path does not exist it is returned unchanged. Otherwise
    _1, _2, ... is appended to the final path component until an unused
    name is found. For existing files the counter goes before the
    extension ("a.txt" -> "a_1.txt"); for folders it is appended to the
    whole name ("v1.0" -> "v1.0_1").

    A trailing path separator on an existing folder ("dir/") is ignored, so
    the candidate is a sibling of the folder ("dir_1") rather than an entry
    inside it ("dir/_1").

    Note: the check is not atomic; another process may create the returned
    path before the caller uses it.
    """
    if not os.path.exists(base_path):
        return base_path

    # Drop trailing separators so basename() is not empty for "dir/".
    # Roots ("/" or "C:\\") have no basename even when trimmed, so they are
    # left untouched and keep their previous behaviour.
    separators = os.sep + (os.altsep or "")
    trimmed = base_path.rstrip(separators)
    if os.path.basename(trimmed):
        base_path = trimmed

    parent, name = os.path.split(base_path)

    # Only files get their extension split off; a dot in a folder name is
    # part of the name.
    stem, ext = name, ""
    if os.path.isfile(base_path):
        stem, ext = os.path.splitext(name)

    i = 1
    while True:
        candidate = os.path.join(parent, f"{stem}_{i}{ext}")
        if not os.path.exists(candidate):
            return candidate
        i += 1
copy_item function · python · L34-L47 (14 LOC)
archiving_app.py
def copy_item(src: str, dst: str):
    """
    Copy a file or directory from src to dst without overwriting.

    - Directories use shutil.copytree.
    - Files use shutil.copy2 (preserves metadata); the parent directory of
      dst is created first when dst has one.

    In both cases the final destination is passed through unique_path(), so
    an existing dst results in a suffixed copy (dst_1, dst_2, ...) instead of
    an overwrite.
    """
    if os.path.isdir(src):
        # copytree requires its destination to not exist; ensure unique dst
        shutil.copytree(src, unique_path(dst))
        return

    # dirname() is "" for a bare file name such as "out.txt"; makedirs("")
    # raises FileNotFoundError, so only create a parent when there is one.
    parent = os.path.dirname(dst)
    if parent:
        os.makedirs(parent, exist_ok=True)
    shutil.copy2(src, unique_path(dst))
ScrollableFrame class · python · L50-L89 (40 LOC)
archiving_app.py
class ScrollableFrame(ttk.Frame):
    def __init__(self, parent):
        super().__init__(parent)

        self.canvas = tk.Canvas(self, highlightthickness=0)
        self.scrollbar = ttk.Scrollbar(self, orient="vertical", command=self.canvas.yview)
        self.inner = ttk.Frame(self.canvas)

        self.inner.bind(
            "<Configure>",
            lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all"))
        )

        self.window_id = self.canvas.create_window((0, 0), window=self.inner, anchor="nw")
        self.canvas.configure(yscrollcommand=self.scrollbar.set)

        self.canvas.pack(side="left", fill="both", expand=True)
        self.scrollbar.pack(side="right", fill="y")

        self.canvas.bind("<Configure>", self._on_canvas_configure)

        # Mouse wheel support (Windows/macOS/Linux variations)
        self.canvas.bind_all("<MouseWheel>", self._on_mousewheel)      # Windows/macOS
        self.canvas.bind_all("<Button-4>", self._on_mousewheel_linu
__init__ method · python · L51-L74 (24 LOC)
archiving_app.py
    def __init__(self, parent):
        super().__init__(parent)

        self.canvas = tk.Canvas(self, highlightthickness=0)
        self.scrollbar = ttk.Scrollbar(self, orient="vertical", command=self.canvas.yview)
        self.inner = ttk.Frame(self.canvas)

        self.inner.bind(
            "<Configure>",
            lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all"))
        )

        self.window_id = self.canvas.create_window((0, 0), window=self.inner, anchor="nw")
        self.canvas.configure(yscrollcommand=self.scrollbar.set)

        self.canvas.pack(side="left", fill="both", expand=True)
        self.scrollbar.pack(side="right", fill="y")

        self.canvas.bind("<Configure>", self._on_canvas_configure)

        # Mouse wheel support (Windows/macOS/Linux variations)
        self.canvas.bind_all("<MouseWheel>", self._on_mousewheel)      # Windows/macOS
        self.canvas.bind_all("<Button-4>", self._on_mousewheel_linux)  # Linux up
        self.canvas
_on_canvas_configure method · python · L76-L78 (3 LOC)
archiving_app.py
    def _on_canvas_configure(self, event):
        # Keep inner frame width in sync with canvas width
        self.canvas.itemconfigure(self.window_id, width=event.width)
_on_mousewheel method · python · L80-L83 (4 LOC)
archiving_app.py
    def _on_mousewheel(self, event):
        # Windows: event.delta is multiples of 120. macOS uses different scaling sometimes.
        delta = -1 * int(event.delta / 120) if event.delta else 0
        self.canvas.yview_scroll(delta, "units")
_on_mousewheel_linux method · python · L85-L89 (5 LOC)
archiving_app.py
    def _on_mousewheel_linux(self, event):
        if event.num == 4:
            self.canvas.yview_scroll(-1, "units")
        elif event.num == 5:
            self.canvas.yview_scroll(1, "units")
Repobility · open methodology · https://repobility.com/research/
CopyMoveApp class · python · L92-L430 (339 LOC)
archiving_app.py
class CopyMoveApp(tk.Tk):
    def __init__(self):
        super().__init__()
        self.title("Folder Copy + Move Tool")
        self.geometry("980x640")

        self.src_dir = tk.StringVar()
        self.dst_dir = tk.StringVar()
        self.archive_dir = tk.StringVar()

        self.selected_folder = tk.StringVar()
        self.folder_name_entry = tk.StringVar()

        self.item_vars = {}  # name -> BooleanVar
        self.item_widgets = []  # checkbuttons to destroy when reloading

        self.worker_q = queue.Queue()
        self.worker_thread = None
        self.is_working = False

        self._build_ui()
        self._poll_worker_queue()

    def _build_ui(self):
        top = ttk.Frame(self, padding=10)
        top.pack(fill="x")

        # Directory pickers
        ttk.Label(top, text="Source directory:").grid(row=0, column=0, sticky="w")
        ttk.Entry(top, textvariable=self.src_dir, width=90).grid(row=0, column=1, sticky="we", padx=(8, 8))
        ttk.Button(top, te
__init__ method · python · L93-L113 (21 LOC)
archiving_app.py
    def __init__(self):
        """Create the main window, set up UI state and start polling the worker queue."""
        super().__init__()
        self.title("Folder Copy + Move Tool")
        self.geometry("980x640")

        # Paths shown in the source / destination / move-to directory entries.
        self.src_dir = tk.StringVar()
        self.dst_dir = tk.StringVar()
        self.archive_dir = tk.StringVar()

        # Name of the folder picked under the source directory, and the
        # editable text field that is pre-filled with that name.
        self.selected_folder = tk.StringVar()
        self.folder_name_entry = tk.StringVar()

        self.item_vars = {}  # name -> BooleanVar
        self.item_widgets = []  # checkbuttons to destroy when reloading

        # Background copy state: messages from the worker thread arrive on
        # worker_q; is_working guards against starting a second run.
        self.worker_q = queue.Queue()
        self.worker_thread = None
        self.is_working = False

        self._build_ui()
        self._poll_worker_queue()
_build_ui method · python · L115-L197 (83 LOC)
archiving_app.py
    def _build_ui(self):
        top = ttk.Frame(self, padding=10)
        top.pack(fill="x")

        # Directory pickers
        ttk.Label(top, text="Source directory:").grid(row=0, column=0, sticky="w")
        ttk.Entry(top, textvariable=self.src_dir, width=90).grid(row=0, column=1, sticky="we", padx=(8, 8))
        ttk.Button(top, text="Browse…", command=self._browse_src).grid(row=0, column=2)

        ttk.Label(top, text="Destination directory:").grid(row=1, column=0, sticky="w", pady=(6, 0))
        ttk.Entry(top, textvariable=self.dst_dir, width=90).grid(row=1, column=1, sticky="we", padx=(8, 8), pady=(6, 0))
        ttk.Button(top, text="Browse…", command=self._browse_dst).grid(row=1, column=2, pady=(6, 0))

        ttk.Label(top, text="Move-to directory:").grid(row=2, column=0, sticky="w", pady=(6, 0))
        ttk.Entry(top, textvariable=self.archive_dir, width=90).grid(row=2, column=1, sticky="we", padx=(8, 8), pady=(6, 0))
        ttk.Button(top, text="Browse…", command=sel
_browse_src method · python · L199-L203 (5 LOC)
archiving_app.py
    def _browse_src(self):
        """Ask for a source directory; on a choice, store it and reload the folder list."""
        chosen = filedialog.askdirectory(title="Choose source directory")
        if not chosen:
            # Dialog returned nothing: keep the current value.
            return
        self.src_dir.set(chosen)
        self._refresh_folders()
_browse_dst method · python · L205-L208 (4 LOC)
archiving_app.py
    def _browse_dst(self):
        """Ask for a destination directory and store it if one was chosen."""
        chosen = filedialog.askdirectory(title="Choose destination directory")
        if not chosen:
            return
        self.dst_dir.set(chosen)
_browse_archive method · python · L210-L213 (4 LOC)
archiving_app.py
    def _browse_archive(self):
        """Ask for a move-to directory and store it if one was chosen."""
        chosen = filedialog.askdirectory(title="Choose move-to directory")
        if not chosen:
            return
        self.archive_dir.set(chosen)
_refresh_folders method · python · L215-L236 (22 LOC)
archiving_app.py
    def _refresh_folders(self):
        src = self.src_dir.get().strip()
        folders = []
        if os.path.isdir(src):
            try:
                folders = sorted([d for d in os.listdir(src) if os.path.isdir(os.path.join(src, d))])
            except Exception as e:
                self._log(f"Error reading source directory: {e}")
        else:
            if src:
                self._log("Source directory does not exist (yet).")

        self.folder_combo["values"] = folders

        # keep selection if still exists
        current = self.selected_folder.get()
        if current in folders:
            self.selected_folder.set(current)
        else:
            self.selected_folder.set(folders[0] if folders else "")

        self._load_folder_contents()
_load_folder_contents method · python · L238-L283 (46 LOC)
archiving_app.py
    def _load_folder_contents(self):
        # Clear old
        for w in self.item_widgets:
            w.destroy()
        self.item_widgets.clear()
        self.item_vars.clear()

        folder = self.selected_folder.get().strip()
        src = self.src_dir.get().strip()

        if not folder or not os.path.isdir(src):
            self.folder_name_entry.set("")
            return

        src_folder = os.path.join(src, folder)
        if not os.path.isdir(src_folder):
            self.folder_name_entry.set("")
            return

        # Put folder name into textbox
        self.folder_name_entry.set(folder)

        try:
            items = sorted(os.listdir(src_folder))
        except Exception as e:
            self._log(f"Error listing folder '{folder}': {e}")
            return

        if not items:
            lbl = ttk.Label(self.scroll.inner, text="(Folder is empty)")
            lbl.pack(anchor="w", pady=4)
            self.item_widgets.append(lbl)
            return

Repobility — same analyzer, your code, free for public repos · /scan/
_set_all_checks method · python · L285-L287 (3 LOC)
archiving_app.py
    def _set_all_checks(self, value: bool):
        for v in self.item_vars.values():
            v.set(value)
_validate_before_copy method · python · L289-L319 (31 LOC)
archiving_app.py
    def _validate_before_copy(self):
        src = self.src_dir.get().strip()
        dst = self.dst_dir.get().strip()
        arc = self.archive_dir.get().strip()
        folder = self.selected_folder.get().strip()

        if not os.path.isdir(src):
            messagebox.showerror("Missing source", "Please select a valid source directory.")
            return None
        if not os.path.isdir(dst):
            messagebox.showerror("Missing destination", "Please select a valid destination directory.")
            return None
        if not os.path.isdir(arc):
            messagebox.showerror("Missing move-to", "Please select a valid move-to directory.")
            return None
        if not folder:
            messagebox.showerror("No folder selected", "Please choose a folder from the source directory.")
            return None

        src_folder = os.path.join(src, folder)
        if not os.path.isdir(src_folder):
            messagebox.showerror("Folder missing", f"Folder no long
_start_copy method · python · L321-L340 (20 LOC)
archiving_app.py
    def _start_copy(self):
        if self.is_working:
            return

        validated = self._validate_before_copy()
        if not validated:
            return

        self.is_working = True
        self.copy_btn.configure(state="disabled")
        self.progress.start(10)
        self.status.set("Copy in progress…")
        self._log("---- START ----")

        self.worker_thread = threading.Thread(
            target=self._worker_copy_move,
            args=validated,
            daemon=True
        )
        self.worker_thread.start()
_worker_copy_move method · python · L342-L394 (53 LOC)
archiving_app.py
    def _worker_copy_move(self, src, dst, arc, folder, new_name, checked_items):
        """
        Worker thread:
        - Create destination folder (unique if needed)
        - Copy checked items into destination folder
        - If all copies succeed, move entire source folder into arc (unique if needed)
        """
        errors = []

        src_folder = os.path.join(src, folder)
        dst_folder = unique_path(os.path.join(dst, new_name))
        arc_folder = unique_path(os.path.join(arc, folder))

        try:
            os.makedirs(dst_folder, exist_ok=False)
        except Exception as e:
            self.worker_q.put(("log", f"Failed to create destination folder: {dst_folder}\n{e}"))
            self.worker_q.put(("done", False, "Failed to create destination folder."))
            return

        self.worker_q.put(("log", f"Destination folder created: {dst_folder}"))

        # Copy checked items
        for name in checked_items:
            src_item = os.path.join(src_
_poll_worker_queue method · python · L396-L425 (30 LOC)
archiving_app.py
    def _poll_worker_queue(self):
        try:
            while True:
                msg = self.worker_q.get_nowait()
                kind = msg[0]

                if kind == "log":
                    self._log(msg[1])

                elif kind == "done":
                    success, final_msg = msg[1], msg[2]
                    self.progress.stop()
                    self.is_working = False
                    self.copy_btn.configure(state="normal")
                    self.status.set("Idle." if success else "Idle (last run had issues).")

                    if success:
                        self._log("---- DONE (SUCCESS) ----")
                        #messagebox.showinfo("Success", final_msg)
                    else:
                        self._log("---- DONE (FAILED) ----")
                        messagebox.showerror("Finished with issues", final_msg)

                    # Refresh folder list after run (folder may have moved)
                    self._refresh_folders
_log method · python · L427-L430 (4 LOC)
archiving_app.py
    def _log(self, text: str):
        ts = time.strftime("%H:%M:%S")
        self.log.insert("end", f"[{ts}] {text}\n")
        self.log.see("end")
get_settings function · python · L12-L14 (3 LOC)
routes/api.py
def get_settings():
    """Return the result of settings.get_directories() as a JSON response."""
    return jsonify(settings.get_directories())
save_settings function · python · L18-L24 (7 LOC)
routes/api.py
def save_settings():
    """Store src_dir, dst_dir and archive_dir from the JSON request body.

    Each value is stripped of surrounding whitespace; a missing key is
    treated as an empty string.
    """
    payload = request.get_json()
    src, dst, arc = (
        payload.get(key, "").strip()
        for key in ("src_dir", "dst_dir", "archive_dir")
    )
    settings.save_directories(src, dst, arc)
    return jsonify({"status": "ok"})
Same scanner, your repo: https://repobility.com — Repobility
browse_directory function · python · L28-L67 (40 LOC)
routes/api.py
def browse_directory():
    """List subdirectories at a given path for the directory browser."""
    path = request.args.get("path", "").strip()

    # Default to showing drive roots on Windows or / on Unix
    if not path:
        if os.name == "nt":
            import string
            drives = []
            for letter in string.ascii_uppercase:
                drive = f"{letter}:\\"
                if os.path.exists(drive):
                    drives.append(drive)
            return jsonify({"path": "", "parent": "", "dirs": drives, "is_root": True})
        else:
            path = "/"

    resolved = os.path.realpath(path)
    if not os.path.isdir(resolved):
        return jsonify({"error": "Not a valid directory"}), 400

    parent = os.path.dirname(resolved)
    if parent == resolved:
        parent = ""  # At root

    try:
        dirs = sorted(
            d for d in os.listdir(resolved)
            if os.path.isdir(os.path.join(resolved, d))
            and not d.startswit
list_folders function · python · L71-L74 (4 LOC)
routes/api.py
def list_folders():
    """Return the configured source directory and its sub-directory listing as JSON."""
    source_root = settings.get("src_dir")
    return jsonify({
        "src_dir": source_root,
        "folders": file_ops.list_subdirs(source_root),
    })
list_folder_items function · python · L78-L105 (28 LOC)
routes/api.py
def list_folder_items(name):
    """
    Return the items of folder `name` under the source directory as JSON.

    Responds with 400 and {"error": "Invalid path"} when `name` resolves to a
    location outside the configured source directory. Otherwise each item is
    annotated with "media_type" and "icon", and the response carries the
    item list plus the raw and formatted total size.
    """
    src = settings.get("src_dir")
    folder_path = os.path.join(src, name)

    # Path traversal protection. Compare whole path components: a plain
    # string-prefix test would accept a sibling such as "/data/src_evil"
    # for the root "/data/src".
    real_src = os.path.realpath(src)
    real_folder = os.path.realpath(folder_path)
    try:
        inside_src = os.path.commonpath([real_src, real_folder]) == real_src
    except ValueError:
        # commonpath() refuses paths it cannot compare (e.g. different
        # Windows drives); such a target is outside the source root.
        inside_src = False
    if not inside_src:
        return jsonify({"error": "Invalid path"}), 400

    items = file_ops.list_items(folder_path)

    # Add media type info
    for item in items:
        if item["is_dir"]:
            item["media_type"] = "folder"
            item["icon"] = media.MEDIA_ICONS["folder"]
        else:
            mt = media.get_media_type(item["name"])
            item["media_type"] = mt
            # Unknown media types fall back to the generic "other" icon.
            item["icon"] = media.MEDIA_ICONS.get(mt, media.MEDIA_ICONS["other"])

    total_size = sum(i["size"] for i in items)
    return jsonify({
        "items": items,
        "total_size": total_size,
        "total_size_formatted": file_ops.format_size(total_size),
    })
start_transfer function · python · L109-L155 (47 LOC)
routes/api.py
def start_transfer():
    if transfer_manager is None:
        return jsonify({"error": "Transfer manager not initialized"}), 500

    data = request.get_json()
    src_dir = settings.get("src_dir")
    dst_dir = settings.get("dst_dir")
    archive_dir = settings.get("archive_dir")

    folder_name = data.get("folder", "").strip()
    dest_name = data.get("dest_name", "").strip() or folder_name
    selected_items = data.get("items", [])

    if not all([src_dir, dst_dir, archive_dir, folder_name, selected_items]):
        return jsonify({"error": "Missing required fields"}), 400

    # Validate directories exist
    for label, path in [("Source", src_dir), ("Destination", dst_dir),
                        ("Archive", archive_dir)]:
        if not os.path.isdir(path):
            return jsonify({"error": f"{label} directory does not exist: {path}"}), 400

    src_folder = os.path.join(src_dir, folder_name)
    if not os.path.isdir(src_folder):
        return jsonify({"error": f"Source f
get_active_transfer function · python · L159-L162 (4 LOC)
routes/api.py
def get_active_transfer():
    """Report whether a transfer is active, including its details when one is."""
    if transfer_manager is not None and transfer_manager.is_active:
        return jsonify({"active": True, **transfer_manager.active_transfer})
    return jsonify({"active": False})
cancel_transfer function · python · L166-L170 (5 LOC)
routes/api.py
def cancel_transfer(transfer_id):
    """Ask the transfer manager to cancel; respond 500 when no manager is set.

    `transfer_id` is accepted but not used by this handler.
    """
    if transfer_manager is not None:
        transfer_manager.cancel()
        return jsonify({"status": "cancelling"})
    return jsonify({"error": "No transfer manager"}), 500
get_history function · python · L174-L182 (9 LOC)
routes/api.py
def get_history():
    """Return all history entries as JSON, with display-formatted size and duration added."""
    entries = history.get_all()
    for entry in entries:
        total_bytes = entry.get("bytes_total", 0)
        entry["bytes_total_formatted"] = file_ops.format_size(total_bytes)
        # The formatted duration is only added for a truthy duration_sec.
        duration = entry.get("duration_sec")
        if duration:
            entry["duration_formatted"] = f"{duration:.1f}s"
    return jsonify(entries)
delete_history_entry function · python · L186-L188 (3 LOC)
routes/api.py
def delete_history_entry(entry_id):
    """Delete the history entry `entry_id` and acknowledge with {"status": "ok"}."""
    history.delete_entry(entry_id)
    outcome = {"status": "ok"}
    return jsonify(outcome)
Want this analysis on your repo? https://repobility.com/scan/
get_disk_usage function · python · L192-L198 (7 LOC)
routes/api.py
def get_disk_usage():
    """Return disk usage for the configured destination and archive directories as JSON."""
    dst_path, arc_path = settings.get("dst_dir"), settings.get("archive_dir")
    usage = {"destination": disk.get_disk_usage(dst_path)}
    usage["archive"] = disk.get_disk_usage(arc_path)
    return jsonify(usage)
suggest_name function · python · L202-L204 (3 LOC)
routes/api.py
def suggest_name(folder):
    """Return `folder` together with the name from media.suggest_jellyfin_name()."""
    return jsonify({
        "original": folder,
        "suggestion": media.suggest_jellyfin_name(folder),
    })
get_queue function · python · L210-L212 (3 LOC)
routes/api.py
def get_queue():
    """Return the result of queue.get_all() as a JSON response."""
    return jsonify(queue.get_all())
add_to_queue function · python · L216-L226 (11 LOC)
routes/api.py
def add_to_queue():
    """Queue a folder transfer described by the JSON request body.

    Requires a non-empty "source_folder" and a non-empty "items" list,
    otherwise responds with 400. "dest_name" defaults to the source folder
    name when missing or blank.
    """
    payload = request.get_json()
    source_folder = payload.get("source_folder", "").strip()
    dest_name = payload.get("dest_name", "").strip()
    if not dest_name:
        dest_name = source_folder
    items = payload.get("items", [])

    if not (source_folder and items):
        return jsonify({"error": "Missing source_folder or items"}), 400

    new_id = queue.add(source_folder, dest_name, items)
    return jsonify({"id": new_id, "status": "queued"})
remove_from_queue function · python · L230-L232 (3 LOC)
routes/api.py
def remove_from_queue(entry_id):
    """Remove queue entry `entry_id` and acknowledge with {"status": "ok"}."""
    queue.remove(entry_id)
    outcome = {"status": "ok"}
    return jsonify(outcome)
clear_queue function · python · L236-L238 (3 LOC)
routes/api.py
def clear_queue():
    """Call queue.clear() and acknowledge with {"status": "ok"}."""
    queue.clear()
    outcome = {"status": "ok"}
    return jsonify(outcome)
reorder_queue function · python · L242-L247 (6 LOC)
routes/api.py
def reorder_queue():
    """Reorder the queue to the "ids" list in the JSON body; an empty list is a no-op."""
    ordered_ids = request.get_json().get("ids", [])
    if ordered_ids:
        queue.reorder(ordered_ids)
    return jsonify({"status": "ok"})
process_queue function · python · L251-L263 (13 LOC)
routes/api.py
def process_queue():
    """Hand the configured directories to the transfer manager's queue processor.

    Responds 500 when no transfer manager is set and 400 when any of the
    source, destination or archive directories is not configured.
    """
    if transfer_manager is None:
        return jsonify({"error": "Transfer manager not initialized"}), 500

    directories = [
        settings.get(key) for key in ("src_dir", "dst_dir", "archive_dir")
    ]
    if not all(directories):
        return jsonify({"error": "Directories not configured"}), 400

    return jsonify(transfer_manager.process_queue(*directories))
Repobility · open methodology · https://repobility.com/research/
get_torrent_settings function · python · L17-L41 (25 LOC)
routes/torrent_api.py
def get_torrent_settings():
    """Return the torrent-related settings as JSON.

    A key with no stored value falls back to its entry in `fallbacks`, or to
    an empty string when it has none.
    """
    fallbacks = {
        "ollama_url": "http://ollama.ollamaserver-shared.svc.cluster.local:11434",
        "ollama_model": "gemma3:12b",
        "auto_verify": "true",
        "min_seeders": "3",
        "tv_retry_interval_hours": "6",
        "tv_max_retries": "10",
        "tv_stall_timeout_minutes": "30",
        "tv_pack_seeder_multiplier": "2",
    }
    exposed = (
        "torznab_url", "torznab_api_key",
        "qbit_url", "qbit_username", "qbit_password",
        "ollama_url", "ollama_model",
        "torrent_download_dir", "auto_verify",
        "quality_pref", "min_seeders",
        "tmdb_api_key", "omdb_api_key",
        "tv_retry_interval_hours", "tv_max_retries",
        "tv_stall_timeout_minutes", "tv_pack_seeder_multiplier",
    )
    return jsonify({
        key: settings.get(key, fallbacks.get(key, "")) for key in exposed
    })
save_torrent_settings function · python · L45-L62 (18 LOC)
routes/torrent_api.py
def save_torrent_settings():
    """Store the allowed torrent settings from the JSON body, ignoring unknown keys.

    Each value is saved as a stripped string. The qBittorrent client is
    reset afterwards.
    """
    allowed_keys = frozenset((
        "torznab_url", "torznab_api_key",
        "qbit_url", "qbit_username", "qbit_password",
        "ollama_url", "ollama_model",
        "torrent_download_dir", "auto_verify",
        "quality_pref", "min_seeders",
        "tmdb_api_key", "omdb_api_key",
        "tv_retry_interval_hours", "tv_max_retries",
        "tv_stall_timeout_minutes", "tv_pack_seeder_multiplier",
    ))
    submitted = request.get_json()
    for key, value in submitted.items():
        if key not in allowed_keys:
            continue
        settings.set(key, str(value).strip())
    # Reset qBit client so it picks up new credentials
    qbittorrent.reset_client()
    return jsonify({"status": "ok"})
search_torrents function · python · L85-L93 (9 LOC)
routes/torrent_api.py
def search_torrents():
    """Create a search session for the "query" in the JSON body.

    "category" defaults to "movies". A missing or blank query yields 400.
    """
    payload = request.get_json()
    query = payload.get("query", "").strip()
    category = payload.get("category", "movies")
    if query:
        return jsonify(search.create_search_session(query, category))
    return jsonify({"error": "Query is required"}), 400
start_download function · python · L104-L130 (27 LOC)
routes/torrent_api.py
def start_download():
    """Register a new download from the JSON request body.

    Responds 500 when no torrent manager is set, 400 when a required field
    is missing or empty, and 500 when the manager's result has an "error" key.
    """
    if torrent_manager is None:
        return jsonify({"error": "Torrent manager not initialized"}), 500

    data = request.get_json()
    mandatory = ("torrent_title", "expected_title", "magnet_or_url")
    for field in mandatory:
        if not data.get(field):
            return jsonify({"error": f"Missing required field: {field}"}), 400

    # Mandatory fields are known to be present; the rest are optional.
    download_args = {field: data[field] for field in mandatory}
    download_args.update(
        search_id=data.get("search_id"),
        info_hash=data.get("info_hash"),
        size_bytes=data.get("size_bytes", 0),
        seeders=data.get("seeders", 0),
        leechers=data.get("leechers", 0),
        indexer=data.get("indexer"),
        quality=data.get("quality"),
    )
    result = torrent_manager.add_download(**download_args)

    if "error" in result:
        return jsonify(result), 500

    return jsonify(result)
list_downloads function · python · L134-L139 (6 LOC)
routes/torrent_api.py
def list_downloads():
    """Return up to 100 downloads as JSON, passing the "status" query argument through."""
    downloads = torrent_db.get_downloads(
        limit=100,
        status=request.args.get("status"),
    )
    for record in downloads:
        record["size_formatted"] = format_size(record.get("size_bytes", 0))
    return jsonify(downloads)
get_download function · python · L143-L148 (6 LOC)
routes/torrent_api.py
def get_download(download_id):
    """Return one download with a formatted size, or 404 when it is not found."""
    record = torrent_db.get_download(download_id)
    if record:
        record["size_formatted"] = format_size(record.get("size_bytes", 0))
        return jsonify(record)
    return jsonify({"error": "Not found"}), 404
cancel_download function · python · L152-L157 (6 LOC)
routes/torrent_api.py
def cancel_download(download_id):
    """Cancel a download through the torrent manager.

    Files are deleted only when the query argument delete_files is exactly
    "true". Responds 500 when no torrent manager is set.
    """
    if torrent_manager is None:
        return jsonify({"error": "Torrent manager not initialized"}), 500
    wants_delete = request.args.get("delete_files", "false") == "true"
    return jsonify(torrent_manager.cancel_download(download_id, wants_delete))
trigger_verification function · python · L163-L177 (15 LOC)
routes/torrent_api.py
def trigger_verification(download_id):
    """Start verify.run_verification for a download in a daemon thread.

    Responds 404 when the download is not found; otherwise returns at once
    without waiting for the thread.
    """
    import threading
    from flask import current_app

    if not torrent_db.get_download(download_id):
        return jsonify({"error": "Not found"}), 404

    # The socketio extension is passed to the worker; it may be None when
    # that extension is not registered.
    socketio = current_app.extensions.get("socketio")
    worker = threading.Thread(
        target=verify.run_verification,
        args=(download_id, socketio),
        daemon=True,
    )
    worker.start()
    return jsonify({"status": "verification_started", "download_id": download_id})
Repobility — same analyzer, your code, free for public repos · /scan/
get_verification function · python · L181-L190 (10 LOC)
routes/torrent_api.py
def get_verification(download_id):
    """Return the verification results and summary fields for a download.

    The three summary fields are None when the download is not found.
    """
    results = torrent_db.get_verifications(download_id)
    dl = torrent_db.get_download(download_id)

    payload = {"download_id": download_id}
    for field in ("verification_status", "verification_score", "verification_notes"):
        payload[field] = dl[field] if dl else None
    payload["results"] = results
    return jsonify(payload)
approve_download function · python · L194-L198 (5 LOC)
routes/torrent_api.py
def approve_download(download_id):
    """Record a manual decision for a download; "approved" defaults to True."""
    approved = request.get_json().get("approved", True)
    verify.approve_manually(download_id, approved)
    outcome = "approved" if approved else "rejected"
    return jsonify({"status": outcome})
serve_thumbnail function · python · L204-L208 (5 LOC)
routes/torrent_api.py
def serve_thumbnail(download_id, filename):
    """Serve `filename` from the download's thumbnail directory, or 404 if that directory is missing."""
    thumb_dir = os.path.join(config.DATA_DIR, "thumbnails", str(download_id))
    if os.path.isdir(thumb_dir):
        return send_from_directory(thumb_dir, filename)
    return jsonify({"error": "Not found"}), 404
page 1 / 5next ›