← back to davicavalcanti1__replayproject

Function bodies 126 total

All specs Real LLM only Function bodies
_make_cam function · python · L139-L153 (15 LOC)
backend/app.py
def _make_cam(index, backend: int = 0, ip: str = None, channel: int = 1, name: str = None) -> dict:
    return {
        "index":       index,
        "backend":     backend,
        "ip":          ip,            # None = USB camera, "10.20.100.126" = IP camera
        "channel":     channel,       # RTSP channel (1 = default)
        "name":        name,          # custom name for IP cameras
        "buffer":      collections.deque(),
        "buffer_lock": threading.Lock(),
        "latest_jpeg": None,
        "latest_lock": threading.Lock(),
        "active":      False,
        "error":       None,
        "stop":        False,
    }
_build_rtsp_url function · python · L156-L160 (5 LOC)
backend/app.py
def _build_rtsp_url(ip: str, channel: int = 1, subtype: int = 0) -> str:
    """Build the RTSP URL for an Intelbras camera.

    The credentials (``CAMERA_USER`` / ``CAMERA_PASSWORD``) are URL-encoded
    with no "safe" characters, so reserved characters in them cannot break
    the URL.
    """
    credentials = ":".join(
        quote(secret, safe='') for secret in (CAMERA_USER, CAMERA_PASSWORD)
    )
    return f"rtsp://{credentials}@{ip}:554/cam/realmonitor?channel={channel}&subtype={subtype}"
_try_open_ip function · python · L163-L196 (34 LOC)
backend/app.py
def _try_open_ip(ip: str, channel: int = 1) -> cv2.VideoCapture | None:
    """Tenta conectar a uma câmera IP via RTSP.

    Por padrão prefere MAINSTREAM (FPS completo, 25-30 fps) — o substream das
    Intelbras vem tipicamente em 10-15fps, o que gera o efeito 'câmera lenta'
    no replay. Se a largura de banda/CPU não suportarem mainstream, defina
    PREFER_SUBSTREAM=1 pra inverter a preferência.
    """
    # subtype=0 = mainstream (H.264/H.265, resolução e FPS cheios)
    # subtype=1 = substream (H.264, ~704x576 ou 640x480, FPS reduzido)
    prefer_sub = os.environ.get("PREFER_SUBSTREAM", "0") in ("1", "true", "yes")
    order = [1, 0] if prefer_sub else [0, 1]
    for subtype in order:
        url = _build_rtsp_url(ip, channel, subtype=subtype)
        label = "substream" if subtype == 1 else "mainstream"
        print(f"  Conectando câmera IP {ip} canal {channel} ({label})...", flush=True)
        try:
            cap = cv2.VideoCapture(url, cv2.CAP_FFMPEG)
        except Except
_get_candidates function · python · L199-L202 (4 LOC)
backend/app.py
def _get_candidates() -> list[int]:
    """Return the camera indices to probe.

    When ``CAMERA_INDICES`` is set it is parsed as a comma-separated list of
    integers (blank entries are ignored); otherwise indices 0-9 are returned.
    """
    if not CAMERA_INDICES:
        return list(range(10))
    tokens = (piece.strip() for piece in CAMERA_INDICES.split(","))
    return [int(token) for token in tokens if token]
_try_open function · python · L205-L225 (21 LOC)
backend/app.py
def _try_open(idx: int):
    """Try to open camera ``idx`` with each backend in ``_BACKENDS``.

    Returns:
        ``(cap, backend)`` for the first backend that opens, reports a
        non-zero frame width and delivers a frame; ``(None, None)`` if none do.
    """
    for backend in _BACKENDS:
        try:
            cap = cv2.VideoCapture(idx, backend)
        except Exception:
            continue

        # Only attempt a read when the device opened and reports a real width.
        if cap.isOpened() and cap.get(cv2.CAP_PROP_FRAME_WIDTH) != 0:
            time.sleep(0.3)
            got_frame, _ = cap.read()
            if got_frame:
                bname = _BACKEND_NAMES.get(backend, str(backend))
                print(f"  Câmera {idx} OK (backend {bname})", flush=True)
                return cap, backend

        # Any failure along the way: free the handle and try the next backend.
        cap.release()
    return None, None
_grab_test_frames function · python · L228-L236 (9 LOC)
backend/app.py
def _grab_test_frames(cap, n: int = 6):
    """Captura N frames de teste para validação."""
    frames = []
    for _ in range(n):
        ret, frame = cap.read()
        if ret and frame is not None and frame.size > 0:
            frames.append(frame)
        time.sleep(0.05)
    return frames
_is_mostly_black function · python · L239-L244 (6 LOC)
backend/app.py
def _is_mostly_black(frames) -> bool:
    """Verifica se a maioria dos frames é preta (câmera ghost)."""
    if not frames:
        return True
    black = sum(1 for f in frames if np.mean(f) < 8)
    return black > len(frames) // 2
Open data scored by Repobility · https://repobility.com
_make_thumb function · python · L247-L249 (3 LOC)
backend/app.py
def _make_thumb(frame):
    """Return a 64x64 float32 thumbnail of ``frame`` for quick comparison."""
    shrunk = cv2.resize(frame, (64, 64))
    return shrunk.astype(np.float32)
_is_duplicate function · python · L252-L254 (3 LOC)
backend/app.py
def _is_duplicate(thumb, other_thumb, threshold: float = 12.0) -> bool:
    """Compara dois thumbnails — True se forem muito parecidos."""
    return np.mean(np.abs(thumb - other_thumb)) < threshold
detect_cameras function · python · L257-L305 (49 LOC)
backend/app.py
def detect_cameras() -> dict:
    """Retorna {idx: (cap, backend)} filtrando ghosts e duplicatas."""
    found = {}
    validated_thumbs = {}
    consecutive_misses = 0

    for idx in _get_candidates():
        print(f"  Testando câmera {idx}...", flush=True)
        cap, backend = _try_open(idx)
        if cap is None:
            consecutive_misses += 1
            if not CAMERA_INDICES and consecutive_misses >= 2 and found:
                print(f"  Parando busca (2 índices vazios seguidos)", flush=True)
                break
            continue
        consecutive_misses = 0

        # Gravar frames de teste
        frames = _grab_test_frames(cap)
        if len(frames) < 2:
            print(f"  Câmera {idx}: descartada (sem frames válidos)", flush=True)
            cap.release()
            continue

        # Ghost — imagem preta
        if _is_mostly_black(frames):
            print(f"  Câmera {idx}: descartada (ghost/preta)", flush=True)
            cap.release()
           
_validate_new_camera function · python · L308-L334 (27 LOC)
backend/app.py
def _validate_new_camera(cap, idx: int) -> bool:
    """Validate a newly found camera for the scanner.

    Rejects the camera when it yields fewer than two usable test frames, when
    it is a "ghost" (mostly black frames), or when its brightest test frame
    looks like a duplicate of the latest frame of an already registered camera.

    Args:
        cap: Opened capture object for the candidate camera.
        idx: Camera index, used only in log messages.

    Returns:
        ``True`` when the camera passes every check, ``False`` otherwise.
        The caller stays responsible for releasing ``cap``.
    """
    frames = _grab_test_frames(cap)
    if len(frames) < 2:
        return False

    if _is_mostly_black(frames):
        print(f"  [scanner] Câmera {idx}: ghost (preta)", flush=True)
        return False

    # Compare using the brightest frame (first one on ties).
    best = max(frames, key=np.mean)
    thumb = _make_thumb(best)

    # Iterate over a snapshot: scan threads run concurrently and register new
    # cameras, and mutating a dict while iterating it raises RuntimeError.
    for cam_id, cam in list(cameras.items()):
        with cam["latest_lock"]:
            jpeg = cam["latest_jpeg"]
        if jpeg is None:
            continue
        existing = cv2.imdecode(np.frombuffer(jpeg, np.uint8), cv2.IMREAD_COLOR)
        if existing is None:
            continue
        if _is_duplicate(thumb, _make_thumb(existing)):
            print(f"  [scanner] Câmera {idx}: duplicata de cam{cam_id}", flush=True)
            return False

    return True
capture_loop function · python · L337-L359 (23 LOC)
backend/app.py
def capture_loop(cam_id: str, cap: cv2.VideoCapture):
    """Thread entry point that runs the capture loop for one camera.

    Releases ``cap`` and returns immediately when ``cam_id`` is no longer
    registered. Otherwise marks the camera active and delegates to
    ``_capture_loop_inner``; any exception escaping it is logged, the camera
    is marked inactive and ``cap`` is released on a best-effort basis.
    """
    cam = cameras.get(cam_id)
    if cam is None:
        cap.release()
        return

    idx = cam["index"]
    backend = cam["backend"]
    cam["active"] = True
    is_ip = cam.get("ip") is not None

    # IP cameras use the same FPS as USB ones so the replay stays smooth.
    effective_fps = CAPTURE_FPS
    source_label = 'IP ' + cam['ip'] if is_ip else 'USB'
    print(f"[cam{cam_id}] Captura iniciada ({source_label}, {effective_fps}fps)", flush=True)
    interval = 1.0 / effective_fps

    try:
        _capture_loop_inner(cam_id, cam, cap, idx, backend, is_ip, interval)
    except Exception as exc:
        print(f"[cam{cam_id}] ERRO FATAL na thread: {exc}", flush=True)
        cam["active"] = False
        # Best-effort cleanup: a failing release must not mask the real error.
        try:
            cap.release()
        except Exception:
            pass
_capture_loop_inner function · python · L362-L460 (99 LOC)
backend/app.py
def _capture_loop_inner(cam_id, cam, cap, idx, backend, is_ip, interval):
    # Câmeras IP: minimizar buffer interno do OpenCV para evitar delay
    if is_ip:
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

    while not cam["stop"]:
        t0 = time.time()
        try:
            if is_ip:
                # Drenar agressivamente o buffer RTSP: grab() várias vezes pra
                # descartar frames antigos e ficar SÓ com o mais recente.
                # Isso elimina o delay acumulado (câmera IP passa a ser quase 0).
                # Timeout implícito: 2 grabs suficientes pra acompanhar 30fps
                # num loop que rota em ~interval; grabs extras simplesmente
                # não farão nada porque o socket não terá frame novo.
                grabbed_any = False
                for _ in range(4):
                    if cap.grab():
                        grabbed_any = True
                    else:
                        break
                if grabbed_any:
             
camera_scanner function · python · L472-L526 (55 LOC)
backend/app.py
def camera_scanner():
    """Detecta câmeras novas (só no modo auto-detect, sem CAMERA_INDICES)."""
    inactive_since: dict[str, float] = {}

    while True:
        time.sleep(SCAN_INTERVAL)

        # Com CAMERA_INDICES definido, cada capture_loop cuida da sua reconexão
        if CAMERA_INDICES:
            continue

        now = time.time()
        known_indices = {cam["index"] for cam in cameras.values()}
        candidates = _get_candidates()
        if known_indices:
            max_idx = max(known_indices) + 2
            candidates = [i for i in candidates if i <= max_idx]

        # ── Detectar novas câmeras ───────────────────────────────────────
        for idx in candidates:
            if idx in known_indices:
                continue
            rej_time = _rejected_indices.get(idx)
            if rej_time and now - rej_time < _REJECT_COOLDOWN:
                continue
            cap, backend = _try_open(idx)
            if cap is None:
                continue
      
_encode_ffmpeg function · python · L531-L550 (20 LOC)
backend/app.py
def _encode_ffmpeg(frames, fps, out_path) -> bool:
    cmd = [
        "ffmpeg", "-y",
        "-f", "image2pipe", "-vcodec", "mjpeg",
        "-framerate", str(round(fps, 3)),
        "-i", "pipe:0",
        "-vcodec", "libx264", "-preset", "fast", "-crf", "23",
        "-pix_fmt", "yuv420p", "-movflags", "+faststart",
        out_path,
    ]
    try:
        proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        for _, jpeg_bytes in frames:
            proc.stdin.write(jpeg_bytes)
        proc.stdin.close()
        proc.stderr.read()
        proc.wait()
        return proc.returncode == 0 and os.path.getsize(out_path) > 0
    except FileNotFoundError:
        return False
Repobility · code-quality intelligence platform · https://repobility.com
_encode_cv2 function · python · L553-L561 (9 LOC)
backend/app.py
def _encode_cv2(frames, fps, out_path) -> bool:
    first = cv2.imdecode(np.frombuffer(frames[0][1], np.uint8), cv2.IMREAD_COLOR)
    h, w = first.shape[:2]
    writer = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))
    for _, jpeg_bytes in frames:
        frame = cv2.imdecode(np.frombuffer(jpeg_bytes, np.uint8), cv2.IMREAD_COLOR)
        writer.write(frame)
    writer.release()
    return os.path.getsize(out_path) > 0
login_required function · python · L566-L573 (8 LOC)
backend/app.py
def login_required(f):
    """Decorator: run the view only for a session user that exists in ``users_db``.

    Otherwise the ``"user"`` session key is cleared and the request is
    redirected to the ``login`` endpoint.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        authenticated = "user" in session and session["user"] in users_db
        if authenticated:
            return f(*args, **kwargs)
        session.pop("user", None)
        return redirect(url_for("login"))
    return wrapper
admin_required function · python · L576-L585 (10 LOC)
backend/app.py
def admin_required(f):
    """Decorator: run the view only for a logged-in user flagged ``is_admin``.

    Requests without a session user are redirected to ``login``; logged-in
    users that are unknown or not admins are redirected to ``locations_page``.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        if "user" not in session:
            return redirect(url_for("login"))
        account = users_db.get(session["user"])
        if account and account.get("is_admin"):
            return f(*args, **kwargs)
        return redirect(url_for("locations_page"))
    return wrapper
add_cors_headers function · python · L589-L599 (11 LOC)
backend/app.py
def add_cors_headers(response):
    """Add Access-Control-* headers when the request Origin is allowed.

    The headers are echoed only if the request's ``Origin`` is in
    ``ALLOWED_ORIGINS`` (which includes FRONTEND_ORIGINS from the .env);
    otherwise the response is returned untouched.
    """
    origin = request.headers.get("Origin", "")
    if origin not in ALLOWED_ORIGINS:
        return response

    cors_headers = (
        ("Access-Control-Allow-Origin", origin),
        ("Access-Control-Allow-Credentials", "true"),
        ("Access-Control-Allow-Headers", "Content-Type, X-Requested-With, Authorization"),
        ("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"),
        ("Vary", "Origin"),
    )
    for header_name, header_value in cors_headers:
        response.headers[header_name] = header_value
    return response
inject_globals function · python · L603-L605 (3 LOC)
backend/app.py
def inject_globals():
    """Return template globals: the session's user record and username.

    ``current_user`` is ``None`` when nobody is logged in or the user is
    unknown to ``users_db``.
    """
    username = session.get("user")
    return {
        "current_user": users_db.get(username),
        "current_username": username,
    }
index function · python · L611-L615 (5 LOC)
backend/app.py
def index():
    """Redirect to the landing page matching the session.

    Admins go to ``admin_page``, other known users to ``locations_page``,
    and everyone else to ``login``.
    """
    logged_in = "user" in session and session["user"] in users_db
    if not logged_in:
        return redirect(url_for("login"))
    account = users_db[session["user"]]
    endpoint = "admin_page" if account.get("is_admin") else "locations_page"
    return redirect(url_for(endpoint))
login function · python · L619-L628 (10 LOC)
backend/app.py
def login():
    """Render the login form and, on POST, authenticate the submitted user.

    A successful login stores the username in the session and redirects to
    ``admin_page`` or ``locations_page``; a failed one flashes an error and
    re-renders the form.
    """
    if request.method != "POST":
        return render_template("login.html")

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "")
    user = users_db.get(username)
    if user and user["password"] == password:
        session["user"] = username
        endpoint = "admin_page" if user.get("is_admin") else "locations_page"
        return redirect(url_for(endpoint))

    flash("Usuário ou senha incorretos.")
    return render_template("login.html")
register function · python · L632-L663 (32 LOC)
backend/app.py
def register():
    if request.method == "POST":
        name  = request.form.get("name", "").strip()
        phone = request.form.get("phone", "").strip()
        sex   = request.form.get("sex", "")
        dob   = request.form.get("dob", "")
        terms = request.form.get("terms")
        if not all([name, phone, sex, dob]):
            flash("Preencha todos os campos obrigatórios.")
            return render_template("register.html")
        if not terms:
            flash("Você deve aceitar os termos de uso.")
            return render_template("register.html")
        uid = phone.replace(" ", "").replace("-", "").replace("(", "").replace(")", "")
        if uid in users_db:
            flash("Este telefone já está cadastrado.")
            return render_template("register.html")
        pwd = uid[-4:]
        users_db[uid] = {
            "name": name,
            "phone": phone,
            "sex": sex,
            "dob": dob,
            "password": pwd,
            "credits": 
Repobility · open methodology · https://repobility.com/research/
logout function · python · L667-L669 (3 LOC)
backend/app.py
def logout():
    """Drop the user from the session and redirect to the login page."""
    session.pop("user", None)
    login_url = url_for("login")
    return redirect(login_url)
locations_page function · python · L676-L684 (9 LOC)
backend/app.py
def locations_page():
    """Render the list of locations, one per registered camera.

    Each entry carries the camera id, its display name and the number of
    clips stored for it.
    """
    locs = [
        {
            "id": cam_id,
            "name": _location_name(cam_id),
            "clips_count": len(clips_db.get(cam_id, [])),
        }
        for cam_id in cameras
    ]
    return render_template("locations.html", locations=locs)
location_detail function · python · L689-L699 (11 LOC)
backend/app.py
def location_detail(cam_id):
    """Render the detail page for one location (camera).

    Unknown camera ids flash an error and redirect to ``locations_page``.
    """
    if cam_id in cameras:
        context = {
            "cam_id": cam_id,
            "location_name": _location_name(cam_id),
            "clips": clips_db.get(cam_id, []),
            "buffer_seconds": BUFFER_SECONDS,
        }
        return render_template("location_detail.html", **context)

    flash("Localidade não encontrada.")
    return redirect(url_for("locations_page"))
_do_generate_clip function · python · L702-L756 (55 LOC)
backend/app.py
def _do_generate_clip(cam_id: str, triggered_by: str = "system") -> dict:
    """Generate a replay clip for *cam_id* and persist it in clips_db.

    Returns a dict with keys:
      - on success: {"ok": True, "clip": clip_info, "download_url": ..., "download_name": ...}
      - on failure: {"ok": False, "error": "<reason>"}
    """
    cam = cameras.get(cam_id)
    if cam is None:
        return {"ok": False, "error": "camera_not_found"}

    with cam["buffer_lock"]:
        snapshot_frames = list(cam["buffer"])
    if not snapshot_frames:
        return {"ok": False, "error": "buffer_empty"}

    duration = (snapshot_frames[-1][0] - snapshot_frames[0][0]) if len(snapshot_frames) > 1 else 0.0
    fps = len(snapshot_frames) / duration if duration > 0 else float(CAPTURE_FPS)

    clip_id = uuid.uuid4().hex[:12]
    cam_dir = os.path.join(CLIPS_DIR, cam_id)
    os.makedirs(cam_dir, exist_ok=True)
    clip_path  = os.path.join(cam_dir, f"{clip_id}.mp4")
    thumb_path = os.path.join(cam_di
generate_clip function · python · L761-L783 (23 LOC)
backend/app.py
def generate_clip(cam_id):
    """Generate a replay clip for ``cam_id`` on behalf of the session user.

    Non-admin users need at least one credit (otherwise 402) and are charged
    one credit on success. Failures reported by ``_do_generate_clip`` map to
    404 (camera not found), 503 (empty buffer) or 500 (anything else).
    """
    user = users_db[session["user"]]
    # Admins are never blocked; everyone else needs a positive balance.
    if not (user.get("is_admin") or user["credits"] > 0):
        return jsonify({"error": "no_credits", "credits": 0}), 402

    result = _do_generate_clip(cam_id, triggered_by=session["user"])

    if not result["ok"]:
        status_by_error = {"camera_not_found": 404, "buffer_empty": 503}
        status = status_by_error.get(result["error"], 500)
        return jsonify({"error": result["error"]}), status

    if not user.get("is_admin"):
        user["credits"] -= 1

    clip = result["clip"]
    payload = {
        "success":       True,
        "clip":          clip,
        "download_url":  url_for("serve_clip", cam_id=cam_id,
                                 filename=clip["filename"]),
        "download_name": result["download_name"],
        "credits":       user["credits"],
    }
    return jsonify(payload)
trigger_all function · python · L788-L825 (38 LOC)
backend/app.py
def trigger_all():
    """Hardware trigger endpoint — called by the ESP8266 button.

    Generates a replay clip for every active camera simultaneously.
    Authentication: if TRIGGER_TOKEN env var is set, the request must carry
    the header  X-Trigger-Token: <token>  (or Bearer token in Authorization).
    """
    if TRIGGER_TOKEN:
        provided = (
            request.headers.get("X-Trigger-Token") or
            request.headers.get("Authorization", "").removeprefix("Bearer ").strip()
        )
        if provided != TRIGGER_TOKEN:
            return jsonify({"error": "unauthorized"}), 401

    active_ids = [cam_id for cam_id, cam in cameras.items() if cam.get("active")]
    if not active_ids:
        return jsonify({"ok": False, "error": "no_active_cameras"}), 503

    results = {}
    threads = []

    def _worker(cam_id):
        results[cam_id] = _do_generate_clip(cam_id, triggered_by="hardware_button")

    for cam_id in active_ids:
        t = threading.Thread(target=_work
buy_credits function · python · L830-L833 (4 LOC)
backend/app.py
def buy_credits():
    """Add 5 credits to the session user and return the new balance as JSON."""
    account = users_db[session["user"]]
    account["credits"] = account["credits"] + 5
    return jsonify({"success": True, "credits": account["credits"]})
serve_clip function · python · L837-L843 (7 LOC)
backend/app.py
def serve_clip(cam_id, filename):
    """Serve a clip file directly (no authentication).

    The 12-char hex UUID in the file name already works as a token: only
    authenticated users who listed the clips via /api/clips/<cam_id> know the
    names. Keeping this public lets cross-origin <img src> and <video src>
    work without CORS + crossOrigin="use-credentials".

    Args:
        cam_id: Camera id; must be a single plain path component.
        filename: Clip file name inside that camera's clip directory.

    Returns:
        The file response, or an empty 404 response when ``cam_id`` is not a
        plain directory name.
    """
    # send_from_directory only guards `filename`; the directory argument must
    # never be client-controlled. Reject ids such as ".." or ones containing a
    # path separator so the lookup cannot escape CLIPS_DIR.
    if cam_id in ("", ".", "..") or os.path.basename(cam_id) != cam_id:
        return Response(status=404)
    return send_from_directory(os.path.join(CLIPS_DIR, cam_id), filename)
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
trim_clip function · python · L847-L967 (121 LOC)
backend/app.py
def trim_clip(cam_id, clip_id):
    """Return a trimmed portion of a previously generated clip.

    Query params:
        start     (float, default 0)  — seconds from the beginning of the clip
        duration  (float, default full) — seconds of trimmed output

    If both params omitted (or duration >= source length), the full clip is
    streamed without re-encoding.
    """
    # sanitize clip_id to avoid path traversal
    safe_id = os.path.basename(clip_id)
    if safe_id != clip_id or not safe_id.replace("-", "").replace("_", "").isalnum():
        return jsonify({"error": "invalid_clip_id"}), 400

    src_path = os.path.join(CLIPS_DIR, cam_id, f"{safe_id}.mp4")
    if not os.path.exists(src_path):
        return jsonify({"error": "clip_not_found"}), 404

    try:
        start    = max(0.0, float(request.args.get("start", 0)))
        duration = float(request.args.get("duration", 0))
    except ValueError:
        return jsonify({"error": "invalid_params"}), 400

    loc_name =
admin_page function · python · L974-L979 (6 LOC)
backend/app.py
def admin_page():
    """Render the admin dashboard.

    Passes the camera list plus the number of non-admin users and the total
    number of stored clips to the template.
    """
    cam_list = []
    for cam_id in cameras:
        cam_list.append({"id": cam_id, "label": _location_name(cam_id)})

    users_count = sum(1 for account in users_db.values() if not account.get("is_admin"))
    clips_count = sum(len(clips) for clips in clips_db.values())
    return render_template(
        "admin.html",
        cameras=cam_list,
        users_count=users_count,
        clips_count=clips_count,
    )
health function · python · L991-L1007 (17 LOC)
backend/app.py
def health():
    """Return a JSON status report keyed by camera id.

    For each camera: index, active flag, last error, number of buffered
    frames, buffered duration in seconds (one decimal), and IP details.
    """
    report = {}
    for cam_id, cam in cameras.items():
        with cam["buffer_lock"]:
            buffered = cam["buffer"]
            n = len(buffered)
            if n > 1:
                # Span between the first item of the oldest and newest entries.
                dur = round(buffered[-1][0] - buffered[0][0], 1)
            else:
                dur = 0.0
        ip = cam.get("ip")
        report[cam_id] = {
            "index":      cam["index"],
            "active":     cam["active"],
            "error":      cam["error"],
            "frames":     n,
            "duration_s": dur,
            "ip":         ip,
            "channel":    cam.get("channel"),
            "is_ip":      ip is not None,
        }
    return jsonify(report)
snapshot function · python · L1011-L1019 (9 LOC)
backend/app.py
def snapshot(cam_id):
    """Return the camera's latest JPEG frame.

    Responds 404 for an unknown camera and 503 when no frame is available
    yet; successful responses are marked ``Cache-Control: no-store``.
    """
    cam = cameras.get(cam_id)
    if cam is None:
        return Response(status=404)

    with cam["latest_lock"]:
        jpeg = cam["latest_jpeg"]

    if jpeg is not None:
        return Response(jpeg, mimetype="image/jpeg", headers={"Cache-Control": "no-store"})
    return Response(status=503)
_gen_mjpeg function · python · L1022-L1031 (10 LOC)
backend/app.py
def _gen_mjpeg(cam_id: str):
    """Yield an endless multipart MJPEG stream for ``cam_id``.

    Each part wraps the camera's latest JPEG; the generator paces itself at
    ``CAPTURE_FPS`` and polls every 50 ms while no frame is available.
    """
    part_header = b"--frame\r\nContent-Type: image/jpeg\r\n\r\n"
    cam = cameras[cam_id]
    while True:
        with cam["latest_lock"]:
            jpeg = cam["latest_jpeg"]
        if jpeg is not None:
            yield part_header + jpeg + b"\r\n"
            time.sleep(1.0 / CAPTURE_FPS)
        else:
            # Nothing captured yet: wait briefly and look again.
            time.sleep(0.05)
video_stream function · python · L1035-L1039 (5 LOC)
backend/app.py
def video_stream(cam_id):
    """Stream the camera as ``multipart/x-mixed-replace`` MJPEG (404 if unknown)."""
    if cam_id in cameras:
        stream = _gen_mjpeg(cam_id)
        return Response(stream, mimetype="multipart/x-mixed-replace; boundary=frame")
    return Response(status=404)
_scan_single function · python · L1061-L1112 (52 LOC)
backend/app.py
def _scan_single(idx: int):
    """
    Try to open camera at index `idx`, validate, and add to `cameras`.
    Updates _scan_state in place.  Runs in its own thread.
    """
    def _set(status, reason="", cam_id=None):
        with _scan_lock:
            for entry in _scan_state["indices"]:
                if entry["index"] == idx:
                    entry["status"] = status
                    entry["reason"] = reason
                    if cam_id:
                        entry["cam_id"] = cam_id
                    break

    _set("checking")

    # Skip index already active
    with _scan_lock:
        known = {cam["index"] for cam in cameras.values()}
    if idx in known:
        _set("skipped", "já ativa")
        return

    # Try to open
    cap, backend = _try_open(idx)
    if cap is None:
        _set("error", "não encontrada")
        return

    # Validate (ghost / duplicate check)
    if not _validate_new_camera(cap, idx):
        cap.release()
        _set("error", "gho
_run_scan function · python · L1115-L1144 (30 LOC)
backend/app.py
def _run_scan(candidates: list[int]):
    """Background orchestrator: launches one thread per candidate, waits up to 12s total."""
    with _scan_lock:
        _scan_state["running"]    = True
        _scan_state["started_at"] = time.time()
        _scan_state["added"]      = []
        _scan_state["indices"]    = [
            {"index": i, "status": "pending", "cam_id": None, "reason": ""}
            for i in candidates
        ]

    threads = [threading.Thread(target=_scan_single, args=(i,), daemon=True) for i in candidates]
    for t in threads:
        t.start()

    deadline = time.time() + 12   # 12 s total budget
    for t in threads:
        remaining = max(0.1, deadline - time.time())
        t.join(timeout=remaining)

    # Mark any still-pending as timed out
    with _scan_lock:
        for entry in _scan_state["indices"]:
            if entry["status"] in ("pending", "checking"):
                entry["status"] = "error"
                entry["reason"] = "timeout"
       
Open data scored by Repobility · https://repobility.com
api_scan_cameras function · python · L1149-L1169 (21 LOC)
backend/app.py
def api_scan_cameras():
    """Kick off a hot-plug scan.  Returns immediately; poll /api/scan-status.

    Optional JSON body: ``{"indices": [0, 1, 2, ...]}`` to override the
    default candidates (0-9). Indices already in use are always skipped.

    Responses:
        202 — scan started; the body lists the candidate indices.
        200 — nothing new to check.
        400 — body is not a JSON object, or ``indices`` is not a list of integers.
        409 — a scan is already running.
    """
    with _scan_lock:
        if _scan_state["running"]:
            return jsonify({"error": "scan_already_running"}), 409

    # Candidates: 0-9 minus indices already active
    known = {cam["index"] for cam in cameras.values()}
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        return jsonify({"error": "invalid_body"}), 400

    # Caller can pass {"indices": [0,1,2,...]} to override
    requested = data.get("indices")
    if requested:
        if not isinstance(requested, (list, tuple)):
            return jsonify({"error": "invalid_indices"}), 400
        try:
            parsed = [int(i) for i in requested]
        except (TypeError, ValueError):
            return jsonify({"error": "invalid_indices"}), 400
        # De-duplicate (order preserved) so each index gets one scan thread.
        candidates = [i for i in dict.fromkeys(parsed) if i not in known]
    else:
        candidates = [i for i in range(10) if i not in known]

    if not candidates:
        return jsonify({"message": "Nenhum índice novo para verificar.", "added": []}), 200

    # Claim the scan atomically: checking the flag and starting the thread in
    # separate steps would let two concurrent requests both launch a scan.
    with _scan_lock:
        if _scan_state["running"]:
            return jsonify({"error": "scan_already_running"}), 409
        _scan_state["running"] = True

    threading.Thread(target=_run_scan, args=(candidates,), daemon=True).start()
    return jsonify({"message": "Scan iniciado.", "candidates": candidates}), 202
api_scan_status function · python · L1174-L1183 (10 LOC)
backend/app.py
def api_scan_status():
    """Report live scan progress; meant to be polled by the client.

    The state is copied while holding ``_scan_lock`` and serialised afterwards.
    """
    with _scan_lock:
        running = _scan_state["running"]
        indices = list(_scan_state["indices"])
        added = list(_scan_state["added"])
        total = len(cameras)
    return jsonify({
        "running": running,
        "indices": indices,
        "added": added,
        "total_cameras": total,
    })
api_add_ip_camera function · python · L1193-L1233 (41 LOC)
backend/app.py
def api_add_ip_camera():
    """
    Add an IP camera by its IP address.
    Body: {"ip": "10.20.100.126", "channel": 1, "name": "Entrada"}
    """
    global _ip_cam_counter
    data = request.get_json(silent=True) or {}
    ip = data.get("ip", "").strip()
    channel = int(data.get("channel", 1))
    name = data.get("name", "").strip()

    if not ip:
        return jsonify({"error": "IP obrigatório"}), 400

    # Check if this IP+channel is already registered
    for cam_id, cam in cameras.items():
        if cam.get("ip") == ip and cam.get("channel", 1) == channel:
            return jsonify({"error": f"Câmera {ip} canal {channel} já cadastrada (ID {cam_id})"}), 409

    # Try to connect
    cap = _try_open_ip(ip, channel)
    if cap is None:
        return jsonify({"error": f"Não foi possível conectar em {ip}. Verifique IP, usuário e senha."}), 502

    cam_id = f"ip{_ip_cam_counter}"
    _ip_cam_counter += 1

    cam_name = name or f"Câmera IP {ip}"
    cameras[cam_id] = _make_ca
api_remove_camera function · python · L1238-L1249 (12 LOC)
backend/app.py
def api_remove_camera(cam_id):
    """Remove a camera (IP or USB) by its ID.

    Sets the camera's ``stop`` flag, then drops it from ``cameras`` and
    ``LOCATION_NAMES``. Responds 404 when the camera is unknown.
    """
    cam = cameras.get(cam_id)
    if cam is not None:
        cam["stop"] = True
        for registry in (cameras, LOCATION_NAMES):
            registry.pop(cam_id, None)
        print(f"[remove] Câmera {cam_id} removida", flush=True)
        return jsonify({"success": True, "cam_id": cam_id})

    return jsonify({"error": "Câmera não encontrada"}), 404
api_list_ip_cameras function · python · L1254-L1266 (13 LOC)
backend/app.py
def api_list_ip_cameras():
    """List all registered IP cameras (entries with a truthy ``ip``) as JSON."""
    ip_cameras = [
        {
            "cam_id": cam_id,
            "ip": cam["ip"],
            "channel": cam.get("channel", 1),
            "name": cam.get("name", f"Câmera IP {cam['ip']}"),
            "active": cam["active"],
        }
        for cam_id, cam in cameras.items()
        if cam.get("ip")
    ]
    return jsonify(ip_cameras)
_json_admin_only function · python · L1271-L1278 (8 LOC)
backend/app.py
def _json_admin_only():
    """Guard for /api/admin/* endpoints that answers in JSON, not redirects.

    Returns:
        ``None`` when the session user exists and is flagged ``is_admin``;
        otherwise a ``(response, status)`` pair with 401 (not authenticated)
        or 403 (authenticated but not admin).
    """
    logged_in = "user" in session and session["user"] in users_db
    if not logged_in:
        return jsonify({"error": "unauthorized"}), 401
    if users_db[session["user"]].get("is_admin"):
        return None
    return jsonify({"error": "forbidden"}), 403
api_admin_list_cameras function · python · L1282-L1306 (25 LOC)
backend/app.py
def api_admin_list_cameras():
    """List ALL cameras (USB + IP) with detailed status, for the admin UI."""
    err = _json_admin_only()
    if err:
        return err

    listing = []
    for cam_id, cam in cameras.items():
        with cam["buffer_lock"]:
            buffered = cam["buffer"]
            n = len(buffered)
            if n > 1:
                dur = round(buffered[-1][0] - buffered[0][0], 1)
            else:
                dur = 0.0
        entry = {
            "cam_id":     cam_id,
            "name":       cam.get("name", _location_name(cam_id)),
            "label":      _location_name(cam_id),
            "type":       "ip" if cam.get("ip") else "usb",
            "ip":         cam.get("ip"),
            "channel":    cam.get("channel"),
            "index":      cam.get("index"),
            "active":     bool(cam.get("active")),
            "paused":     bool(cam.get("paused")),
            "error":      cam.get("error"),
            "frames":     n,
            "duration_s": dur,
        }
        listing.append(entry)
    return jsonify({"cameras": listing})
api_admin_pause_camera function · python · L1310-L1322 (13 LOC)
backend/app.py
def api_admin_pause_camera(cam_id):
    """Pause capture for this camera.

    Sets the ``stop`` and ``paused`` flags and clears ``active``; the camera
    stays registered, so a later /resume can bring it back. Admin only;
    responds 404 for an unknown camera.
    """
    err = _json_admin_only()
    if err:
        return err

    cam = cameras.get(cam_id)
    if cam is None:
        return jsonify({"error": "camera_not_found"}), 404

    cam.update(stop=True, paused=True, active=False)
    print(f"[admin] Câmera {cam_id} pausada", flush=True)
    return jsonify({"ok": True, "cam_id": cam_id, "paused": True})
Repobility · code-quality intelligence platform · https://repobility.com
api_admin_resume_camera function · python · L1326-L1353 (28 LOC)
backend/app.py
def api_admin_resume_camera(cam_id):
    """Retoma uma câmera pausada — abre captura de novo e inicia thread."""
    err = _json_admin_only()
    if err: return err
    cam = cameras.get(cam_id)
    if cam is None:
        return jsonify({"error": "camera_not_found"}), 404
    if cam.get("active"):
        return jsonify({"ok": True, "cam_id": cam_id, "already_active": True})

    # Reabre o capture
    if cam.get("ip"):
        cap = _try_open_ip(cam["ip"], cam.get("channel", 1))
    else:
        cap, backend = _try_open(cam["index"])
        if cap is not None:
            cam["backend"] = backend
    if cap is None:
        cam["error"] = "reopen_failed"
        return jsonify({"error": "reopen_failed"}), 500

    cam["stop"]   = False
    cam["paused"] = False
    cam["error"]  = None
    t = threading.Thread(target=capture_loop, args=(cam_id, cap), daemon=True)
    t.start()
    print(f"[admin] Câmera {cam_id} retomada", flush=True)
    return jsonify({"ok": True, "cam_id": cam_i
api_admin_system_info function · python · L1359-L1387 (29 LOC)
backend/app.py
def api_admin_system_info():
    """Retorna status do sistema (CPU, RAM, uptime, disk) pra o dashboard."""
    err = _json_admin_only()
    if err: return err

    info = {
        "platform":  platform.system(),
        "node":      platform.node(),
        "release":   platform.release(),
        "python":    platform.python_version(),
        "cameras_total":  len(cameras),
        "cameras_active": sum(1 for c in cameras.values() if c.get("active")),
        "clips_total":    sum(len(v) for v in clips_db.values()),
        "buffer_seconds": BUFFER_SECONDS,
        "capture_fps":    CAPTURE_FPS,
    }
    # psutil é opcional — só usa se instalado
    try:
        import psutil
        info["cpu_percent"]    = psutil.cpu_percent(interval=0.1)
        info["ram_percent"]    = psutil.virtual_memory().percent
        info["ram_used_mb"]    = round(psutil.virtual_memory().used / 1024 / 1024)
        info["ram_total_mb"]   = round(psutil.virtual_memory().total / 1024 / 1024)
        info[
_run_system_command function · python · L1390-L1402 (13 LOC)
backend/app.py
def _run_system_command(cmd_windows, cmd_linux, delay_seconds=5):
    """Launch a system command in the background after a delay.

    The delay gives the HTTP response time to reach the client. The command
    is started without a shell from a daemon thread; ``cmd_windows`` is used
    when ``platform.system()`` reports Windows, ``cmd_linux`` otherwise.
    Launch failures are logged, not raised.
    """
    def _delayed():
        time.sleep(delay_seconds)
        try:
            command = cmd_windows if platform.system() == "Windows" else cmd_linux
            subprocess.Popen(command, shell=False)
        except Exception as exc:
            print(f"[admin] Falha ao executar comando: {exc}", flush=True)

    worker = threading.Thread(target=_delayed, daemon=True)
    worker.start()
page 1 / 3next ›