Function bodies 109 total
worker_loop function · python · L720-L731 (12 LOC)slack_n8n_provisioner.py
def worker_loop():
while True:
rows = db_exec(
"SELECT id, provider, payload FROM events WHERE status IN ('pending','failed') AND attempts < 5 ORDER BY received_at ASC LIMIT 10",
fetch=True,
) or []
for row in rows:
try:
process_event(row)
except Exception as exc:
db_exec("UPDATE events SET status='failed', attempts=attempts+1, reason=? WHERE id=?", (str(exc)[:500], row["id"]))
time.sleep(2)Handler class · python · L734-L861 (128 LOC)slack_n8n_provisioner.py
class Handler(BaseHTTPRequestHandler):
def _json(self, code, payload):
body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _text(self, code, text):
body = text.encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self):
if self.path == "/healthz":
# Keep payload small and non-sensitive. This endpoint is polled by
# compose healthchecks and is also useful for quick ops triage.
counts = {}
try:
rows = db_exec(
"SELECT status, COUNT(*) AS c FROM events GROUP_json method · python · L735-L741 (7 LOC)slack_n8n_provisioner.py
def _json(self, code, payload):
body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)_text method · python · L743-L749 (7 LOC)slack_n8n_provisioner.py
def _text(self, code, text):
body = text.encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)do_GET method · python · L751-L790 (40 LOC)slack_n8n_provisioner.py
def do_GET(self):
if self.path == "/healthz":
# Keep payload small and non-sensitive. This endpoint is polled by
# compose healthchecks and is also useful for quick ops triage.
counts = {}
try:
rows = db_exec(
"SELECT status, COUNT(*) AS c FROM events GROUP BY status",
fetch=True,
) or []
for r in rows:
counts[r["status"]] = int(r["c"]) if r["c"] is not None else 0
last = db_exec(
"SELECT id, provider, event_type, status, reason, received_at FROM events ORDER BY received_at DESC LIMIT 1",
fetch=True,
)
last = last[0] if last else None
except Exception:
last = None
self._json(
200,
{
"ok": True,
"time": now_iso(),
do_POST method · python · L792-L799 (8 LOC)slack_n8n_provisioner.py
def do_POST(self):
if self.path == "/slack/events":
self._handle_slack()
return
if self.path.startswith("/teams/events"):
self._handle_teams()
return
self._json(404, {"ok": False, "error": "not_found"})_handle_slack method · python · L801-L826 (26 LOC)slack_n8n_provisioner.py
def _handle_slack(self):
length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(length)
try:
payload = json.loads(raw.decode("utf-8"))
except Exception:
self._json(400, {"ok": False, "error": "invalid_json"})
return
if payload.get("type") == "url_verification":
self._text(200, payload.get("challenge", ""))
return
ts = self.headers.get("X-Slack-Request-Timestamp", "")
sig = self.headers.get("X-Slack-Signature", "")
if not verify_slack_signature(raw, ts, sig):
self._json(401, {"ok": False, "error": "invalid_signature"})
return
event_type = payload.get("event", {}).get("type")
if event_type not in {"team_join", "user_change", "user_deactivated"}:
self._json(200, {"ok": True, "ignored": True})
return
event_id = payload.get("event_id") or secrets.token_hex(16)
quRepobility — same analyzer, your code, free for public repos · /scan/
_handle_teams method · python · L828-L858 (31 LOC)slack_n8n_provisioner.py
def _handle_teams(self):
if not CFG.TEAMS_ENABLED:
self._json(403, {"ok": False, "error": "teams_disabled"})
return
qs = urllib.parse.urlparse(self.path).query
params = urllib.parse.parse_qs(qs)
token = (params.get("validationToken") or [""])[0]
if token:
self._text(200, token)
return
length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(length)
try:
payload = json.loads(raw.decode("utf-8"))
except Exception:
self._json(400, {"ok": False, "error": "invalid_json"})
return
if CFG.TEAMS_CLIENT_STATE:
values = payload.get("value", []) if isinstance(payload, dict) else []
if values:
cs = values[0].get("clientState", "")
if cs != CFG.TEAMS_CLIENT_STATE:
self._json(401, {"ok": False, "error": "invalid_client_state"})
main function · python · L864-L869 (6 LOC)slack_n8n_provisioner.py
def main():
init_db(CFG.DB_PATH)
t = threading.Thread(target=worker_loop, daemon=True)
t.start()
server = ThreadingHTTPServer((CFG.HOST, CFG.PORT), Handler)
server.serve_forever()‹ prevpage 3 / 3