← back to TommyKammy__openclaw-n8n-blueprint

Function bodies 109 total

All specs Real LLM only Function bodies
worker_loop function · python · L720-L731 (12 LOC)
slack_n8n_provisioner.py
def worker_loop():
    """Poll the ``events`` table forever and process queued events.

    Each pass selects up to 10 rows whose status is ``pending`` or ``failed``
    and whose ``attempts`` is below 5, oldest ``received_at`` first, and hands
    every row to ``process_event``.  When processing raises, the row is marked
    ``failed``, its ``attempts`` counter is incremented and the first 500
    characters of the error text are stored in ``reason``.

    The loop never returns.  A database error while polling, or while
    recording a failure, is logged instead of propagating: an uncaught
    exception here would end the loop for good and queued events would
    silently stop being processed.
    """
    import logging  # function-scope import keeps this block self-contained

    log = logging.getLogger(__name__)
    while True:
        try:
            rows = db_exec(
                "SELECT id, provider, payload FROM events WHERE status IN ('pending','failed') AND attempts < 5 ORDER BY received_at ASC LIMIT 10",
                fetch=True,
            ) or []
        except Exception:
            # Treat a failed poll as an empty batch and retry after the
            # usual pause.
            log.exception("worker_loop: polling events failed; retrying")
            rows = []
        for row in rows:
            try:
                process_event(row)
            except Exception as exc:
                try:
                    db_exec(
                        "UPDATE events SET status='failed', attempts=attempts+1, reason=? WHERE id=?",
                        (str(exc)[:500], row["id"]),
                    )
                except Exception:
                    # Could not record the failure; keep going so the rest
                    # of the batch (and later passes) still run.
                    log.exception("worker_loop: could not record event failure")
        # Fixed 2-second pause between passes, also after a failed poll, so
        # a broken database does not turn this into a busy loop.
        time.sleep(2)
Handler class · python · L734-L861 (128 LOC)
slack_n8n_provisioner.py
class Handler(BaseHTTPRequestHandler):
    def _json(self, code, payload):
        body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _text(self, code, text):
        body = text.encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        if self.path == "/healthz":
            # Keep payload small and non-sensitive. This endpoint is polled by
            # compose healthchecks and is also useful for quick ops triage.
            counts = {}
            try:
                rows = db_exec(
                    "SELECT status, COUNT(*) AS c FROM events GROUP
_json method · python · L735-L741 (7 LOC)
slack_n8n_provisioner.py
    def _json(self, code, payload):
        """Send *payload* as an ASCII-escaped JSON body with HTTP status *code*."""
        # Serialise before touching the response so a non-serialisable
        # payload raises before any status line has been emitted.
        encoded = json.dumps(payload, ensure_ascii=True).encode("utf-8")
        self.send_response(code)
        for header_name, header_value in (
            ("Content-Type", "application/json"),
            ("Content-Length", str(len(encoded))),
        ):
            self.send_header(header_name, header_value)
        self.end_headers()
        self.wfile.write(encoded)
_text method · python · L743-L749 (7 LOC)
slack_n8n_provisioner.py
    def _text(self, code, text):
        """Send *text* as a UTF-8 ``text/plain`` body with HTTP status *code*."""
        encoded = text.encode("utf-8")
        # Content-Length counts encoded bytes, not characters.
        byte_count = str(len(encoded))
        self.send_response(code)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", byte_count)
        self.end_headers()
        self.wfile.write(encoded)
do_GET method · python · L751-L790 (40 LOC)
slack_n8n_provisioner.py
    def do_GET(self):
        if self.path == "/healthz":
            # Keep payload small and non-sensitive. This endpoint is polled by
            # compose healthchecks and is also useful for quick ops triage.
            counts = {}
            try:
                rows = db_exec(
                    "SELECT status, COUNT(*) AS c FROM events GROUP BY status",
                    fetch=True,
                ) or []
                for r in rows:
                    counts[r["status"]] = int(r["c"]) if r["c"] is not None else 0
                last = db_exec(
                    "SELECT id, provider, event_type, status, reason, received_at FROM events ORDER BY received_at DESC LIMIT 1",
                    fetch=True,
                )
                last = last[0] if last else None
            except Exception:
                last = None
            self._json(
                200,
                {
                    "ok": True,
                    "time": now_iso(),
           
do_POST method · python · L792-L799 (8 LOC)
slack_n8n_provisioner.py
    def do_POST(self):
        """Route a POST request to the Slack or Teams handler, else reply 404.

        ``/slack/events`` must match the request path exactly, whereas any
        path beginning with ``/teams/events`` (for example one carrying a
        query string) goes to the Teams handler.
        """
        route = self.path
        if route == "/slack/events":
            self._handle_slack()
        elif route.startswith("/teams/events"):
            self._handle_teams()
        else:
            self._json(404, {"ok": False, "error": "not_found"})
_handle_slack method · python · L801-L826 (26 LOC)
slack_n8n_provisioner.py
    def _handle_slack(self):
        length = int(self.headers.get("Content-Length", "0"))
        raw = self.rfile.read(length)
        try:
            payload = json.loads(raw.decode("utf-8"))
        except Exception:
            self._json(400, {"ok": False, "error": "invalid_json"})
            return

        if payload.get("type") == "url_verification":
            self._text(200, payload.get("challenge", ""))
            return

        ts = self.headers.get("X-Slack-Request-Timestamp", "")
        sig = self.headers.get("X-Slack-Signature", "")
        if not verify_slack_signature(raw, ts, sig):
            self._json(401, {"ok": False, "error": "invalid_signature"})
            return

        event_type = payload.get("event", {}).get("type")
        if event_type not in {"team_join", "user_change", "user_deactivated"}:
            self._json(200, {"ok": True, "ignored": True})
            return
        event_id = payload.get("event_id") or secrets.token_hex(16)
        qu
Repobility — same analyzer, your code, free for public repos · /scan/
_handle_teams method · python · L828-L858 (31 LOC)
slack_n8n_provisioner.py
    def _handle_teams(self):
        if not CFG.TEAMS_ENABLED:
            self._json(403, {"ok": False, "error": "teams_disabled"})
            return

        qs = urllib.parse.urlparse(self.path).query
        params = urllib.parse.parse_qs(qs)
        token = (params.get("validationToken") or [""])[0]
        if token:
            self._text(200, token)
            return

        length = int(self.headers.get("Content-Length", "0"))
        raw = self.rfile.read(length)
        try:
            payload = json.loads(raw.decode("utf-8"))
        except Exception:
            self._json(400, {"ok": False, "error": "invalid_json"})
            return

        if CFG.TEAMS_CLIENT_STATE:
            values = payload.get("value", []) if isinstance(payload, dict) else []
            if values:
                cs = values[0].get("clientState", "")
                if cs != CFG.TEAMS_CLIENT_STATE:
                    self._json(401, {"ok": False, "error": "invalid_client_state"})
           
main function · python · L864-L869 (6 LOC)
slack_n8n_provisioner.py
def main():
    """Initialise the database, start the worker thread and serve HTTP forever.

    The event worker runs in a daemon thread so it never keeps the process
    alive by itself.  The HTTP server binds to ``CFG.HOST``/``CFG.PORT`` and
    blocks in ``serve_forever()``.
    """
    init_db(CFG.DB_PATH)
    worker = threading.Thread(target=worker_loop, daemon=True)
    worker.start()
    # The context manager calls server_close() on the way out, so the
    # listening socket is released even when serve_forever() is interrupted
    # by an exception such as KeyboardInterrupt.
    with ThreadingHTTPServer((CFG.HOST, CFG.PORT), Handler) as server:
        server.serve_forever()
‹ prev · page 3 / 3