Function bodies 126 total
_make_cam function · python · L139-L153 (15 LOC)backend/app.py
def _make_cam(index, backend: int = 0, ip: str = None, channel: int = 1, name: str = None) -> dict:
return {
"index": index,
"backend": backend,
"ip": ip, # None = USB camera, "10.20.100.126" = IP camera
"channel": channel, # RTSP channel (1 = default)
"name": name, # custom name for IP cameras
"buffer": collections.deque(),
"buffer_lock": threading.Lock(),
"latest_jpeg": None,
"latest_lock": threading.Lock(),
"active": False,
"error": None,
"stop": False,
}_build_rtsp_url function · python · L156-L160 (5 LOC)backend/app.py
def _build_rtsp_url(ip: str, channel: int = 1, subtype: int = 0) -> str:
"""Monta URL RTSP para câmeras Intelbras. Faz URL-encode das credenciais."""
user = quote(CAMERA_USER, safe='')
pwd = quote(CAMERA_PASSWORD, safe='')
return f"rtsp://{user}:{pwd}@{ip}:554/cam/realmonitor?channel={channel}&subtype={subtype}"_try_open_ip function · python · L163-L196 (34 LOC)backend/app.py
def _try_open_ip(ip: str, channel: int = 1) -> cv2.VideoCapture | None:
"""Tenta conectar a uma câmera IP via RTSP.
Por padrão prefere MAINSTREAM (FPS completo, 25-30 fps) — o substream das
Intelbras vem tipicamente em 10-15fps, o que gera o efeito 'câmera lenta'
no replay. Se a largura de banda/CPU não suportarem mainstream, defina
PREFER_SUBSTREAM=1 pra inverter a preferência.
"""
# subtype=0 = mainstream (H.264/H.265, resolução e FPS cheios)
# subtype=1 = substream (H.264, ~704x576 ou 640x480, FPS reduzido)
prefer_sub = os.environ.get("PREFER_SUBSTREAM", "0") in ("1", "true", "yes")
order = [1, 0] if prefer_sub else [0, 1]
for subtype in order:
url = _build_rtsp_url(ip, channel, subtype=subtype)
label = "substream" if subtype == 1 else "mainstream"
print(f" Conectando câmera IP {ip} canal {channel} ({label})...", flush=True)
try:
cap = cv2.VideoCapture(url, cv2.CAP_FFMPEG)
except Except_get_candidates function · python · L199-L202 (4 LOC)backend/app.py
def _get_candidates() -> list[int]:
if CAMERA_INDICES:
return [int(x.strip()) for x in CAMERA_INDICES.split(",") if x.strip()]
return list(range(10))_try_open function · python · L205-L225 (21 LOC)backend/app.py
def _try_open(idx: int):
"""Tenta abrir câmera com múltiplos backends. Retorna (cap, backend) ou (None, None)."""
for backend in _BACKENDS:
try:
cap = cv2.VideoCapture(idx, backend)
except Exception:
continue
if not cap.isOpened():
cap.release()
continue
if cap.get(cv2.CAP_PROP_FRAME_WIDTH) == 0:
cap.release()
continue
time.sleep(0.3)
ret, _ = cap.read()
if ret:
bname = _BACKEND_NAMES.get(backend, str(backend))
print(f" Câmera {idx} OK (backend {bname})", flush=True)
return cap, backend
cap.release()
return None, None_grab_test_frames function · python · L228-L236 (9 LOC)backend/app.py
def _grab_test_frames(cap, n: int = 6):
"""Captura N frames de teste para validação."""
frames = []
for _ in range(n):
ret, frame = cap.read()
if ret and frame is not None and frame.size > 0:
frames.append(frame)
time.sleep(0.05)
return frames_is_mostly_black function · python · L239-L244 (6 LOC)backend/app.py
def _is_mostly_black(frames) -> bool:
"""Verifica se a maioria dos frames é preta (câmera ghost)."""
if not frames:
return True
black = sum(1 for f in frames if np.mean(f) < 8)
return black > len(frames) // 2Open data scored by Repobility · https://repobility.com
_make_thumb function · python · L247-L249 (3 LOC)backend/app.py
def _make_thumb(frame):
"""Cria thumbnail 64x64 para comparação rápida."""
return cv2.resize(frame, (64, 64)).astype(np.float32)_is_duplicate function · python · L252-L254 (3 LOC)backend/app.py
def _is_duplicate(thumb, other_thumb, threshold: float = 12.0) -> bool:
"""Compara dois thumbnails — True se forem muito parecidos."""
return np.mean(np.abs(thumb - other_thumb)) < thresholddetect_cameras function · python · L257-L305 (49 LOC)backend/app.py
def detect_cameras() -> dict:
"""Retorna {idx: (cap, backend)} filtrando ghosts e duplicatas."""
found = {}
validated_thumbs = {}
consecutive_misses = 0
for idx in _get_candidates():
print(f" Testando câmera {idx}...", flush=True)
cap, backend = _try_open(idx)
if cap is None:
consecutive_misses += 1
if not CAMERA_INDICES and consecutive_misses >= 2 and found:
print(f" Parando busca (2 índices vazios seguidos)", flush=True)
break
continue
consecutive_misses = 0
# Gravar frames de teste
frames = _grab_test_frames(cap)
if len(frames) < 2:
print(f" Câmera {idx}: descartada (sem frames válidos)", flush=True)
cap.release()
continue
# Ghost — imagem preta
if _is_mostly_black(frames):
print(f" Câmera {idx}: descartada (ghost/preta)", flush=True)
cap.release()
_validate_new_camera function · python · L308-L334 (27 LOC)backend/app.py
def _validate_new_camera(cap, idx: int) -> bool:
"""Valida câmera nova no scanner — checa ghost e duplicata com câmeras ativas."""
frames = _grab_test_frames(cap)
if len(frames) < 2:
return False
if _is_mostly_black(frames):
print(f" [scanner] Câmera {idx}: ghost (preta)", flush=True)
return False
brightnesses = [np.mean(f) for f in frames]
best = frames[brightnesses.index(max(brightnesses))]
thumb = _make_thumb(best)
for cam_id, cam in cameras.items():
with cam["latest_lock"]:
jpeg = cam["latest_jpeg"]
if jpeg is None:
continue
existing = cv2.imdecode(np.frombuffer(jpeg, np.uint8), cv2.IMREAD_COLOR)
if existing is None:
continue
if _is_duplicate(thumb, _make_thumb(existing)):
print(f" [scanner] Câmera {idx}: duplicata de cam{cam_id}", flush=True)
return False
return Truecapture_loop function · python · L337-L359 (23 LOC)backend/app.py
def capture_loop(cam_id: str, cap: cv2.VideoCapture):
cam = cameras.get(cam_id)
if cam is None:
cap.release()
return
idx = cam["index"]
backend = cam["backend"]
cam["active"] = True
is_ip = cam.get("ip") is not None
# Câmeras IP: mesmo FPS das USB para replay fluido
effective_fps = CAPTURE_FPS
print(f"[cam{cam_id}] Captura iniciada ({'IP ' + cam['ip'] if is_ip else 'USB'}, {effective_fps}fps)", flush=True)
interval = 1.0 / effective_fps
try:
_capture_loop_inner(cam_id, cam, cap, idx, backend, is_ip, interval)
except Exception as e:
print(f"[cam{cam_id}] ERRO FATAL na thread: {e}", flush=True)
cam["active"] = False
try:
cap.release()
except Exception:
pass_capture_loop_inner function · python · L362-L460 (99 LOC)backend/app.py
def _capture_loop_inner(cam_id, cam, cap, idx, backend, is_ip, interval):
# Câmeras IP: minimizar buffer interno do OpenCV para evitar delay
if is_ip:
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
while not cam["stop"]:
t0 = time.time()
try:
if is_ip:
# Drenar agressivamente o buffer RTSP: grab() várias vezes pra
# descartar frames antigos e ficar SÓ com o mais recente.
# Isso elimina o delay acumulado (câmera IP passa a ser quase 0).
# Timeout implícito: 2 grabs suficientes pra acompanhar 30fps
# num loop que rota em ~interval; grabs extras simplesmente
# não farão nada porque o socket não terá frame novo.
grabbed_any = False
for _ in range(4):
if cap.grab():
grabbed_any = True
else:
break
if grabbed_any:
camera_scanner function · python · L472-L526 (55 LOC)backend/app.py
def camera_scanner():
"""Detecta câmeras novas (só no modo auto-detect, sem CAMERA_INDICES)."""
inactive_since: dict[str, float] = {}
while True:
time.sleep(SCAN_INTERVAL)
# Com CAMERA_INDICES definido, cada capture_loop cuida da sua reconexão
if CAMERA_INDICES:
continue
now = time.time()
known_indices = {cam["index"] for cam in cameras.values()}
candidates = _get_candidates()
if known_indices:
max_idx = max(known_indices) + 2
candidates = [i for i in candidates if i <= max_idx]
# ── Detectar novas câmeras ───────────────────────────────────────
for idx in candidates:
if idx in known_indices:
continue
rej_time = _rejected_indices.get(idx)
if rej_time and now - rej_time < _REJECT_COOLDOWN:
continue
cap, backend = _try_open(idx)
if cap is None:
continue
_encode_ffmpeg function · python · L531-L550 (20 LOC)backend/app.py
def _encode_ffmpeg(frames, fps, out_path) -> bool:
cmd = [
"ffmpeg", "-y",
"-f", "image2pipe", "-vcodec", "mjpeg",
"-framerate", str(round(fps, 3)),
"-i", "pipe:0",
"-vcodec", "libx264", "-preset", "fast", "-crf", "23",
"-pix_fmt", "yuv420p", "-movflags", "+faststart",
out_path,
]
try:
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
for _, jpeg_bytes in frames:
proc.stdin.write(jpeg_bytes)
proc.stdin.close()
proc.stderr.read()
proc.wait()
return proc.returncode == 0 and os.path.getsize(out_path) > 0
except FileNotFoundError:
return FalseRepobility · code-quality intelligence platform · https://repobility.com
_encode_cv2 function · python · L553-L561 (9 LOC)backend/app.py
def _encode_cv2(frames, fps, out_path) -> bool:
first = cv2.imdecode(np.frombuffer(frames[0][1], np.uint8), cv2.IMREAD_COLOR)
h, w = first.shape[:2]
writer = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))
for _, jpeg_bytes in frames:
frame = cv2.imdecode(np.frombuffer(jpeg_bytes, np.uint8), cv2.IMREAD_COLOR)
writer.write(frame)
writer.release()
return os.path.getsize(out_path) > 0login_required function · python · L566-L573 (8 LOC)backend/app.py
def login_required(f):
@wraps(f)
def wrapper(*a, **kw):
if "user" not in session or session["user"] not in users_db:
session.pop("user", None)
return redirect(url_for("login"))
return f(*a, **kw)
return wrapperadmin_required function · python · L576-L585 (10 LOC)backend/app.py
def admin_required(f):
@wraps(f)
def wrapper(*a, **kw):
if "user" not in session:
return redirect(url_for("login"))
u = users_db.get(session["user"])
if not u or not u.get("is_admin"):
return redirect(url_for("locations_page"))
return f(*a, **kw)
return wrapperadd_cors_headers function · python · L589-L599 (11 LOC)backend/app.py
def add_cors_headers(response):
"""Echo Access-Control-* headers quando o Origin da requisição está na
lista permitida (inclui FRONTEND_ORIGINS do .env)."""
origin = request.headers.get("Origin", "")
if origin in ALLOWED_ORIGINS:
response.headers["Access-Control-Allow-Origin"] = origin
response.headers["Access-Control-Allow-Credentials"] = "true"
response.headers["Access-Control-Allow-Headers"] = "Content-Type, X-Requested-With, Authorization"
response.headers["Access-Control-Allow-Methods"] = "GET, POST, PUT, DELETE, OPTIONS"
response.headers["Vary"] = "Origin"
return responseinject_globals function · python · L603-L605 (3 LOC)backend/app.py
def inject_globals():
u = users_db.get(session.get("user"))
return {"current_user": u, "current_username": session.get("user")}index function · python · L611-L615 (5 LOC)backend/app.py
def index():
if "user" in session and session["user"] in users_db:
u = users_db[session["user"]]
return redirect(url_for("admin_page" if u.get("is_admin") else "locations_page"))
return redirect(url_for("login"))login function · python · L619-L628 (10 LOC)backend/app.py
def login():
if request.method == "POST":
username = request.form.get("username", "").strip()
password = request.form.get("password", "")
user = users_db.get(username)
if user and user["password"] == password:
session["user"] = username
return redirect(url_for("admin_page" if user.get("is_admin") else "locations_page"))
flash("Usuário ou senha incorretos.")
return render_template("login.html")register function · python · L632-L663 (32 LOC)backend/app.py
def register():
if request.method == "POST":
name = request.form.get("name", "").strip()
phone = request.form.get("phone", "").strip()
sex = request.form.get("sex", "")
dob = request.form.get("dob", "")
terms = request.form.get("terms")
if not all([name, phone, sex, dob]):
flash("Preencha todos os campos obrigatórios.")
return render_template("register.html")
if not terms:
flash("Você deve aceitar os termos de uso.")
return render_template("register.html")
uid = phone.replace(" ", "").replace("-", "").replace("(", "").replace(")", "")
if uid in users_db:
flash("Este telefone já está cadastrado.")
return render_template("register.html")
pwd = uid[-4:]
users_db[uid] = {
"name": name,
"phone": phone,
"sex": sex,
"dob": dob,
"password": pwd,
"credits": Repobility · open methodology · https://repobility.com/research/
logout function · python · L667-L669 (3 LOC)backend/app.py
def logout():
session.pop("user", None)
return redirect(url_for("login"))locations_page function · python · L676-L684 (9 LOC)backend/app.py
def locations_page():
locs = []
for cam_id in cameras:
locs.append({
"id": cam_id,
"name": _location_name(cam_id),
"clips_count": len(clips_db.get(cam_id, [])),
})
return render_template("locations.html", locations=locs)location_detail function · python · L689-L699 (11 LOC)backend/app.py
def location_detail(cam_id):
if cam_id not in cameras:
flash("Localidade não encontrada.")
return redirect(url_for("locations_page"))
return render_template(
"location_detail.html",
cam_id=cam_id,
location_name=_location_name(cam_id),
clips=clips_db.get(cam_id, []),
buffer_seconds=BUFFER_SECONDS,
)_do_generate_clip function · python · L702-L756 (55 LOC)backend/app.py
def _do_generate_clip(cam_id: str, triggered_by: str = "system") -> dict:
"""Generate a replay clip for *cam_id* and persist it in clips_db.
Returns a dict with keys:
- on success: {"ok": True, "clip": clip_info, "download_url": ..., "download_name": ...}
- on failure: {"ok": False, "error": "<reason>"}
"""
cam = cameras.get(cam_id)
if cam is None:
return {"ok": False, "error": "camera_not_found"}
with cam["buffer_lock"]:
snapshot_frames = list(cam["buffer"])
if not snapshot_frames:
return {"ok": False, "error": "buffer_empty"}
duration = (snapshot_frames[-1][0] - snapshot_frames[0][0]) if len(snapshot_frames) > 1 else 0.0
fps = len(snapshot_frames) / duration if duration > 0 else float(CAPTURE_FPS)
clip_id = uuid.uuid4().hex[:12]
cam_dir = os.path.join(CLIPS_DIR, cam_id)
os.makedirs(cam_dir, exist_ok=True)
clip_path = os.path.join(cam_dir, f"{clip_id}.mp4")
thumb_path = os.path.join(cam_digenerate_clip function · python · L761-L783 (23 LOC)backend/app.py
def generate_clip(cam_id):
user = users_db[session["user"]]
if not user.get("is_admin") and user["credits"] <= 0:
return jsonify({"error": "no_credits", "credits": 0}), 402
result = _do_generate_clip(cam_id, triggered_by=session["user"])
if not result["ok"]:
status = 404 if result["error"] == "camera_not_found" else \
503 if result["error"] == "buffer_empty" else 500
return jsonify({"error": result["error"]}), status
if not user.get("is_admin"):
user["credits"] -= 1
return jsonify({
"success": True,
"clip": result["clip"],
"download_url": url_for("serve_clip", cam_id=cam_id,
filename=result["clip"]["filename"]),
"download_name": result["download_name"],
"credits": user["credits"],
})trigger_all function · python · L788-L825 (38 LOC)backend/app.py
def trigger_all():
"""Hardware trigger endpoint — called by the ESP8266 button.
Generates a replay clip for every active camera simultaneously.
Authentication: if TRIGGER_TOKEN env var is set, the request must carry
the header X-Trigger-Token: <token> (or Bearer token in Authorization).
"""
if TRIGGER_TOKEN:
provided = (
request.headers.get("X-Trigger-Token") or
request.headers.get("Authorization", "").removeprefix("Bearer ").strip()
)
if provided != TRIGGER_TOKEN:
return jsonify({"error": "unauthorized"}), 401
active_ids = [cam_id for cam_id, cam in cameras.items() if cam.get("active")]
if not active_ids:
return jsonify({"ok": False, "error": "no_active_cameras"}), 503
results = {}
threads = []
def _worker(cam_id):
results[cam_id] = _do_generate_clip(cam_id, triggered_by="hardware_button")
for cam_id in active_ids:
t = threading.Thread(target=_workbuy_credits function · python · L830-L833 (4 LOC)backend/app.py
def buy_credits():
user = users_db[session["user"]]
user["credits"] += 5
return jsonify({"success": True, "credits": user["credits"]})serve_clip function · python · L837-L843 (7 LOC)backend/app.py
def serve_clip(cam_id, filename):
"""Serve o arquivo do clipe diretamente (sem auth).
O UUID hex de 12 chars do nome já funciona como token — só quem está
autenticado e listou os clipes via /api/clips/<cam_id> tem acesso aos
nomes. Tornar público permite <img src> e <video src> cross-origin
funcionarem sem precisar de CORS+crossOrigin="use-credentials"."""
return send_from_directory(os.path.join(CLIPS_DIR, cam_id), filename)Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
trim_clip function · python · L847-L967 (121 LOC)backend/app.py
def trim_clip(cam_id, clip_id):
"""Return a trimmed portion of a previously generated clip.
Query params:
start (float, default 0) — seconds from the beginning of the clip
duration (float, default full) — seconds of trimmed output
If both params omitted (or duration >= source length), the full clip is
streamed without re-encoding.
"""
# sanitize clip_id to avoid path traversal
safe_id = os.path.basename(clip_id)
if safe_id != clip_id or not safe_id.replace("-", "").replace("_", "").isalnum():
return jsonify({"error": "invalid_clip_id"}), 400
src_path = os.path.join(CLIPS_DIR, cam_id, f"{safe_id}.mp4")
if not os.path.exists(src_path):
return jsonify({"error": "clip_not_found"}), 404
try:
start = max(0.0, float(request.args.get("start", 0)))
duration = float(request.args.get("duration", 0))
except ValueError:
return jsonify({"error": "invalid_params"}), 400
loc_name =admin_page function · python · L974-L979 (6 LOC)backend/app.py
def admin_page():
cam_list = [{"id": k, "label": _location_name(k)} for k in cameras]
users_count = len([u for u in users_db.values() if not u.get("is_admin")])
clips_count = sum(len(v) for v in clips_db.values())
return render_template("admin.html", cameras=cam_list,
users_count=users_count, clips_count=clips_count)health function · python · L991-L1007 (17 LOC)backend/app.py
def health():
result = {}
for cam_id, cam in cameras.items():
with cam["buffer_lock"]:
n = len(cam["buffer"])
dur = round(cam["buffer"][-1][0] - cam["buffer"][0][0], 1) if n > 1 else 0.0
result[cam_id] = {
"index": cam["index"],
"active": cam["active"],
"error": cam["error"],
"frames": n,
"duration_s": dur,
"ip": cam.get("ip"),
"channel": cam.get("channel"),
"is_ip": cam.get("ip") is not None,
}
return jsonify(result)snapshot function · python · L1011-L1019 (9 LOC)backend/app.py
def snapshot(cam_id):
cam = cameras.get(cam_id)
if cam is None:
return Response(status=404)
with cam["latest_lock"]:
frame = cam["latest_jpeg"]
if frame is None:
return Response(status=503)
return Response(frame, mimetype="image/jpeg", headers={"Cache-Control": "no-store"})_gen_mjpeg function · python · L1022-L1031 (10 LOC)backend/app.py
def _gen_mjpeg(cam_id: str):
cam = cameras[cam_id]
while True:
with cam["latest_lock"]:
frame = cam["latest_jpeg"]
if frame is None:
time.sleep(0.05)
continue
yield b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" + frame + b"\r\n"
time.sleep(1.0 / CAPTURE_FPS)video_stream function · python · L1035-L1039 (5 LOC)backend/app.py
def video_stream(cam_id):
if cam_id not in cameras:
return Response(status=404)
return Response(_gen_mjpeg(cam_id),
mimetype="multipart/x-mixed-replace; boundary=frame")_scan_single function · python · L1061-L1112 (52 LOC)backend/app.py
def _scan_single(idx: int):
"""
Try to open camera at index `idx`, validate, and add to `cameras`.
Updates _scan_state in place. Runs in its own thread.
"""
def _set(status, reason="", cam_id=None):
with _scan_lock:
for entry in _scan_state["indices"]:
if entry["index"] == idx:
entry["status"] = status
entry["reason"] = reason
if cam_id:
entry["cam_id"] = cam_id
break
_set("checking")
# Skip index already active
with _scan_lock:
known = {cam["index"] for cam in cameras.values()}
if idx in known:
_set("skipped", "já ativa")
return
# Try to open
cap, backend = _try_open(idx)
if cap is None:
_set("error", "não encontrada")
return
# Validate (ghost / duplicate check)
if not _validate_new_camera(cap, idx):
cap.release()
_set("error", "gho_run_scan function · python · L1115-L1144 (30 LOC)backend/app.py
def _run_scan(candidates: list[int]):
"""Background orchestrator: launches one thread per candidate, waits up to 12s total."""
with _scan_lock:
_scan_state["running"] = True
_scan_state["started_at"] = time.time()
_scan_state["added"] = []
_scan_state["indices"] = [
{"index": i, "status": "pending", "cam_id": None, "reason": ""}
for i in candidates
]
threads = [threading.Thread(target=_scan_single, args=(i,), daemon=True) for i in candidates]
for t in threads:
t.start()
deadline = time.time() + 12 # 12 s total budget
for t in threads:
remaining = max(0.1, deadline - time.time())
t.join(timeout=remaining)
# Mark any still-pending as timed out
with _scan_lock:
for entry in _scan_state["indices"]:
if entry["status"] in ("pending", "checking"):
entry["status"] = "error"
entry["reason"] = "timeout"
Open data scored by Repobility · https://repobility.com
api_scan_cameras function · python · L1149-L1169 (21 LOC)backend/app.py
def api_scan_cameras():
"""Kick off a hot-plug scan. Returns immediately; poll /api/scan-status."""
with _scan_lock:
if _scan_state["running"]:
return jsonify({"error": "scan_already_running"}), 409
# Candidates: 0-9 minus indices already active
known = {cam["index"] for cam in cameras.values()}
data = request.get_json(silent=True) or {}
# Caller can pass {"indices": [0,1,2,...]} to override
requested = data.get("indices")
if requested:
candidates = [int(i) for i in requested if int(i) not in known]
else:
candidates = [i for i in range(10) if i not in known]
if not candidates:
return jsonify({"message": "Nenhum índice novo para verificar.", "added": []}), 200
threading.Thread(target=_run_scan, args=(candidates,), daemon=True).start()
return jsonify({"message": "Scan iniciado.", "candidates": candidates}), 202api_scan_status function · python · L1174-L1183 (10 LOC)backend/app.py
def api_scan_status():
"""Poll this endpoint to get live scan progress."""
with _scan_lock:
state = {
"running": _scan_state["running"],
"indices": list(_scan_state["indices"]),
"added": list(_scan_state["added"]),
"total_cameras": len(cameras),
}
return jsonify(state)api_add_ip_camera function · python · L1193-L1233 (41 LOC)backend/app.py
def api_add_ip_camera():
"""
Add an IP camera by its IP address.
Body: {"ip": "10.20.100.126", "channel": 1, "name": "Entrada"}
"""
global _ip_cam_counter
data = request.get_json(silent=True) or {}
ip = data.get("ip", "").strip()
channel = int(data.get("channel", 1))
name = data.get("name", "").strip()
if not ip:
return jsonify({"error": "IP obrigatório"}), 400
# Check if this IP+channel is already registered
for cam_id, cam in cameras.items():
if cam.get("ip") == ip and cam.get("channel", 1) == channel:
return jsonify({"error": f"Câmera {ip} canal {channel} já cadastrada (ID {cam_id})"}), 409
# Try to connect
cap = _try_open_ip(ip, channel)
if cap is None:
return jsonify({"error": f"Não foi possível conectar em {ip}. Verifique IP, usuário e senha."}), 502
cam_id = f"ip{_ip_cam_counter}"
_ip_cam_counter += 1
cam_name = name or f"Câmera IP {ip}"
cameras[cam_id] = _make_caapi_remove_camera function · python · L1238-L1249 (12 LOC)backend/app.py
def api_remove_camera(cam_id):
"""Remove a camera (IP or USB) by its ID."""
cam = cameras.get(cam_id)
if cam is None:
return jsonify({"error": "Câmera não encontrada"}), 404
cam["stop"] = True
cameras.pop(cam_id, None)
LOCATION_NAMES.pop(cam_id, None)
print(f"[remove] Câmera {cam_id} removida", flush=True)
return jsonify({"success": True, "cam_id": cam_id})api_list_ip_cameras function · python · L1254-L1266 (13 LOC)backend/app.py
def api_list_ip_cameras():
"""List all registered IP cameras."""
result = []
for cam_id, cam in cameras.items():
if cam.get("ip"):
result.append({
"cam_id": cam_id,
"ip": cam["ip"],
"channel": cam.get("channel", 1),
"name": cam.get("name", f"Câmera IP {cam['ip']}"),
"active": cam["active"],
})
return jsonify(result)_json_admin_only function · python · L1271-L1278 (8 LOC)backend/app.py
def _json_admin_only():
"""Retorna 401/403 JSON em vez de redirect pros endpoints /api/admin/*.
Admin real precisa estar autenticado e marcado is_admin."""
if "user" not in session or session["user"] not in users_db:
return jsonify({"error": "unauthorized"}), 401
if not users_db[session["user"]].get("is_admin"):
return jsonify({"error": "forbidden"}), 403
return Noneapi_admin_list_cameras function · python · L1282-L1306 (25 LOC)backend/app.py
def api_admin_list_cameras():
"""Lista TODAS as câmeras (USB + IP) com status detalhado — pra UI admin."""
err = _json_admin_only()
if err: return err
result = []
for cam_id, cam in cameras.items():
with cam["buffer_lock"]:
n = len(cam["buffer"])
dur = round(cam["buffer"][-1][0] - cam["buffer"][0][0], 1) if n > 1 else 0.0
result.append({
"cam_id": cam_id,
"name": cam.get("name", _location_name(cam_id)),
"label": _location_name(cam_id),
"type": "ip" if cam.get("ip") else "usb",
"ip": cam.get("ip"),
"channel": cam.get("channel"),
"index": cam.get("index"),
"active": bool(cam.get("active")),
"paused": bool(cam.get("paused")),
"error": cam.get("error"),
"frames": n,
"duration_s": dur,
})
return jsonify({"cameras": result})api_admin_pause_camera function · python · L1310-L1322 (13 LOC)backend/app.py
def api_admin_pause_camera(cam_id):
"""Pausa a captura dessa câmera (thread encerra, buffer congelado).
A câmera permanece registrada — basta /resume pra voltar."""
err = _json_admin_only()
if err: return err
cam = cameras.get(cam_id)
if cam is None:
return jsonify({"error": "camera_not_found"}), 404
cam["stop"] = True
cam["paused"] = True
cam["active"] = False
print(f"[admin] Câmera {cam_id} pausada", flush=True)
return jsonify({"ok": True, "cam_id": cam_id, "paused": True})Repobility · code-quality intelligence platform · https://repobility.com
api_admin_resume_camera function · python · L1326-L1353 (28 LOC)backend/app.py
def api_admin_resume_camera(cam_id):
"""Retoma uma câmera pausada — abre captura de novo e inicia thread."""
err = _json_admin_only()
if err: return err
cam = cameras.get(cam_id)
if cam is None:
return jsonify({"error": "camera_not_found"}), 404
if cam.get("active"):
return jsonify({"ok": True, "cam_id": cam_id, "already_active": True})
# Reabre o capture
if cam.get("ip"):
cap = _try_open_ip(cam["ip"], cam.get("channel", 1))
else:
cap, backend = _try_open(cam["index"])
if cap is not None:
cam["backend"] = backend
if cap is None:
cam["error"] = "reopen_failed"
return jsonify({"error": "reopen_failed"}), 500
cam["stop"] = False
cam["paused"] = False
cam["error"] = None
t = threading.Thread(target=capture_loop, args=(cam_id, cap), daemon=True)
t.start()
print(f"[admin] Câmera {cam_id} retomada", flush=True)
return jsonify({"ok": True, "cam_id": cam_iapi_admin_system_info function · python · L1359-L1387 (29 LOC)backend/app.py
def api_admin_system_info():
"""Retorna status do sistema (CPU, RAM, uptime, disk) pra o dashboard."""
err = _json_admin_only()
if err: return err
info = {
"platform": platform.system(),
"node": platform.node(),
"release": platform.release(),
"python": platform.python_version(),
"cameras_total": len(cameras),
"cameras_active": sum(1 for c in cameras.values() if c.get("active")),
"clips_total": sum(len(v) for v in clips_db.values()),
"buffer_seconds": BUFFER_SECONDS,
"capture_fps": CAPTURE_FPS,
}
# psutil é opcional — só usa se instalado
try:
import psutil
info["cpu_percent"] = psutil.cpu_percent(interval=0.1)
info["ram_percent"] = psutil.virtual_memory().percent
info["ram_used_mb"] = round(psutil.virtual_memory().used / 1024 / 1024)
info["ram_total_mb"] = round(psutil.virtual_memory().total / 1024 / 1024)
info[_run_system_command function · python · L1390-L1402 (13 LOC)backend/app.py
def _run_system_command(cmd_windows, cmd_linux, delay_seconds=5):
"""Dispara um comando de sistema em background após delay (dá tempo da
resposta HTTP chegar no cliente)."""
def _delayed():
time.sleep(delay_seconds)
try:
if platform.system() == "Windows":
subprocess.Popen(cmd_windows, shell=False)
else:
subprocess.Popen(cmd_linux, shell=False)
except Exception as e:
print(f"[admin] Falha ao executar comando: {e}", flush=True)
threading.Thread(target=_delayed, daemon=True).start()page 1 / 3next ›