← back to TommyKammy__openclaw-n8n-blueprint

Function bodies 109 total

All specs Real LLM only Function bodies
lifespan function · python · L38-L43 (6 LOC)
docker/knowledge-gateway/main.py
async def lifespan(app: FastAPI):
    """Manage the database connection pool around the application's run.

    Creates an asyncpg pool from ``DATABASE_URL``, publishes it through the
    module-level ``db_pool`` and yields. The pool is closed when control
    returns to this generator.

    Args:
        app: The FastAPI application instance (not used by the body).
    """
    global db_pool
    db_pool = await asyncpg.create_pool(DATABASE_URL)
    try:
        yield
    finally:
        # Close the pool even when an exception (or cancellation) is thrown
        # into the generator at the yield point, so connections never leak.
        await db_pool.close()
OAuthStartRequest class · python · L54-L57 (4 LOC)
docker/knowledge-gateway/main.py
class OAuthStartRequest(BaseModel):
    # Request body accepted by oauth_google_start.
    # guest_id / slack_user_id / redirect_hint are persisted in the
    # oauth_states row next to the generated state value.
    guest_id: str
    slack_user_id: str
    # Optional hint stored with the state; defaults to "slack".
    redirect_hint: Optional[str] = "slack"
OAuthStartResponse class · python · L60-L62 (3 LOC)
docker/knowledge-gateway/main.py
class OAuthStartResponse(BaseModel):
    # Response model for the OAuth start step.
    # NOTE(review): presumably returned by oauth_google_start; confirm
    # against the route decorator, which is not visible here.
    auth_url: str  # URL the guest should be sent to
    state: str  # OAuth state value tied to this request
SearchRequest class · python · L65-L68 (4 LOC)
docker/knowledge-gateway/main.py
class SearchRequest(BaseModel):
    # Request body accepted by knowledge_search.
    guest_id: str  # used to look up the guest's active knowledge connection
    query: str  # search text
    # Number of results requested; validated to the range 1..20, default 5.
    top_k: int = Field(default=5, ge=1, le=20)
SearchResult class · python · L71-L75 (5 LOC)
docker/knowledge-gateway/main.py
class SearchResult(BaseModel):
    # One search hit.
    # NOTE(review): presumably built from the chunk/source rows selected in
    # knowledge_search; confirm against the full function body.
    snippet: str
    source_id: str
    title: str
    # May be None, but has no default value declared.
    source_url: Optional[str]
DisconnectRequest class · python · L86-L88 (3 LOC)
docker/knowledge-gateway/main.py
class DisconnectRequest(BaseModel):
    # Request body accepted by knowledge_disconnect.
    connection_id: str  # id looked up in knowledge_connections
    # When true, knowledge_disconnect also deletes the connection's chunks,
    # sources and sync cursors. Defaults to False.
    purge_index: bool = False
verify_token function · python · L92-L100 (9 LOC)
docker/knowledge-gateway/main.py
async def verify_token(authorization: Optional[str] = Header(None)):
    """Validate the internal API token carried in the Authorization header.

    The header may contain either ``Bearer <token>`` or the bare token.

    Args:
        authorization: Raw value of the ``Authorization`` request header.

    Returns:
        True when the request is authorized.

    Raises:
        HTTPException: 401 when the header is missing or the token does not
            match ``INTERNAL_API_TOKEN``.
    """
    if not INTERNAL_API_TOKEN:
        # No token configured: every request is accepted.
        return True
    if not authorization:
        raise HTTPException(status_code=401, detail="Missing authorization header")
    scheme = "Bearer "
    # Strip only the leading scheme. str.replace() would also remove any
    # later "Bearer " occurrence inside the token itself.
    token = authorization[len(scheme):] if authorization.startswith(scheme) else authorization
    # Constant-time comparison so response timing does not leak how much of
    # the token matched. Compare bytes: compare_digest() rejects non-ASCII str.
    if not secrets.compare_digest(token.encode("utf-8"), INTERNAL_API_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid token")
    return True
Repobility · severity-and-effort ranking · https://repobility.com
oauth_google_start function · python · L105-L150 (46 LOC)
docker/knowledge-gateway/main.py
async def oauth_google_start(
    req: OAuthStartRequest,
    authorized: bool = Depends(verify_token)
):
    """Generate Google OAuth URL for guest connection"""
    state = secrets.token_urlsafe(32)
    
    # Store state in temporary table/cache for validation
    async with db_pool.acquire() as conn:
        await conn.execute(
            """
            CREATE TABLE IF NOT EXISTS oauth_states (
                state text PRIMARY KEY,
                guest_id text NOT NULL,
                slack_user_id text NOT NULL,
                redirect_hint text,
                created_at timestamptz DEFAULT now(),
                expires_at timestamptz DEFAULT now() + interval '10 minutes'
            )
            """
        )
        await conn.execute(
            """
            INSERT INTO oauth_states (state, guest_id, slack_user_id, redirect_hint, expires_at)
            VALUES ($1, $2, $3, $4, now() + interval '10 minutes')
            """,
            state, req.guest_id, req.sl
oauth_google_callback function · python · L154-L251 (98 LOC)
docker/knowledge-gateway/main.py
async def oauth_google_callback(
    code: str = Query(...),
    state: str = Query(...),
    error: Optional[str] = Query(None)
):
    """Handle Google OAuth callback"""
    if error:
        raise HTTPException(status_code=400, detail=f"OAuth error: {error}")
    
    # Validate state
    async with db_pool.acquire() as conn:
        row = await conn.fetchrow(
            "SELECT guest_id, slack_user_id, redirect_hint FROM oauth_states WHERE state = $1 AND expires_at > now()",
            state
        )
        if not row:
            raise HTTPException(status_code=400, detail="Invalid or expired state")
        
        await conn.execute("DELETE FROM oauth_states WHERE state = $1", state)
        
        guest_id = row["guest_id"]
        slack_user_id = row["slack_user_id"]
        redirect_hint = row["redirect_hint"]
        
        # Exchange code for tokens
        async with httpx.AsyncClient() as client:
            token_resp = await client.post(
                "https:/
knowledge_search function · python · L256-L312 (57 LOC)
docker/knowledge-gateway/main.py
async def knowledge_search(
    req: SearchRequest,
    authorized: bool = Depends(verify_token)
):
    """Search guest knowledge with vector similarity"""
    # TODO: Integrate with embedding service (OpenAI or local)
    # For now, return simple text search results
    
    async with db_pool.acquire() as conn:
        # Get connection for guest
        conn_row = await conn.fetchrow(
            "SELECT id FROM knowledge_connections WHERE guest_id = $1 AND status = 'active' LIMIT 1",
            req.guest_id
        )
        
        if not conn_row:
            return SearchResponse(answers=[])
        
        connection_id = conn_row["id"]
        
        # Simple text search on chunks (placeholder for vector search)
        # In production, this would use embedding + vector similarity
        chunks = await conn.fetch(
            """
            SELECT kc.id, kc.content, kc.chunk_index, ks.title, ks.source_url, ks.id as source_id
            FROM knowledge_chunks kc
         
knowledge_sync_run function · python · L317-L344 (28 LOC)
docker/knowledge-gateway/main.py
async def knowledge_sync_run(
    req: SyncRequest,
    authorized: bool = Depends(verify_token)
):
    """Trigger a background sync for a connection.

    Verifies that the connection exists, marks its sync cursor as
    ``'syncing'`` and schedules ``sync_connection`` on the running event loop.

    Args:
        req: Request carrying the ``connection_id`` to sync.
        authorized: Result of the ``verify_token`` dependency.

    Returns:
        A dict acknowledging that the sync job was queued.

    Raises:
        HTTPException: 404 when the connection does not exist.
    """
    async with db_pool.acquire() as conn:
        # Verify connection exists
        row = await conn.fetchrow(
            "SELECT id, guest_id FROM knowledge_connections WHERE id = $1",
            req.connection_id
        )
        if not row:
            raise HTTPException(status_code=404, detail="Connection not found")
        
        # Update sync cursor to mark as syncing
        await conn.execute(
            """
            UPDATE sync_cursors 
            SET last_status = 'syncing', updated_at = now()
            WHERE connection_id = $1
            """,
            req.connection_id
        )
    
    # Trigger background sync (in production, use Celery/Redis).
    # The event loop only keeps weak references to tasks, so hold a strong
    # reference until the task finishes; otherwise it may be garbage-collected
    # in the middle of the sync.
    task = asyncio.create_task(sync_connection(req.connection_id))
    _BACKGROUND_SYNC_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_SYNC_TASKS.discard)
    
    return {"status": "accepted", "message": "Sync job queued"}


# Strong references to in-flight sync tasks started by knowledge_sync_run.
_BACKGROUND_SYNC_TASKS = set()
sync_connection function · python · L347-L420 (74 LOC)
docker/knowledge-gateway/main.py
async def sync_connection(connection_id: str):
    """Background task to sync Google Drive files"""
    try:
        async with db_pool.acquire() as conn:
            # Get tokens
            token_row = await conn.fetchrow(
                """
                SELECT access_token_enc, refresh_token_enc, token_expiry
                FROM oauth_tokens WHERE connection_id = $1
                """,
                connection_id
            )
            
            if not token_row:
                await update_sync_status(connection_id, "error", "No tokens found")
                return
            
            # Decrypt access token
            access_token = fernet.decrypt(token_row["access_token_enc"].encode()).decode()
            
            # Fetch files from Google Drive
            async with httpx.AsyncClient() as client:
                files_resp = await client.get(
                    "https://www.googleapis.com/drive/v3/files",
                    headers={"Authorization": 
update_sync_status function · python · L423-L436 (14 LOC)
docker/knowledge-gateway/main.py
async def update_sync_status(connection_id: str, status: str, error: Optional[str]):
    """Write the latest sync outcome to the connection's sync cursor row.

    ``last_sync_at`` is only moved forward when ``status`` is ``'success'``;
    ``last_status``, ``last_error`` and ``updated_at`` are always overwritten.
    """
    statement = """
            UPDATE sync_cursors 
            SET last_sync_at = CASE WHEN $2 = 'success' THEN now() ELSE last_sync_at END,
                last_status = $2,
                last_error = $3,
                updated_at = now()
            WHERE connection_id = $1
            """
    async with db_pool.acquire() as connection:
        await connection.execute(statement, connection_id, status, error)
knowledge_disconnect function · python · L441-L472 (32 LOC)
docker/knowledge-gateway/main.py
async def knowledge_disconnect(
    req: DisconnectRequest,
    authorized: bool = Depends(verify_token)
):
    """Disconnect provider and optionally purge data"""
    async with db_pool.acquire() as conn:
        # Verify connection exists
        row = await conn.fetchrow(
            "SELECT id, guest_id FROM knowledge_connections WHERE id = $1",
            req.connection_id
        )
        if not row:
            raise HTTPException(status_code=404, detail="Connection not found")
        
        if req.purge_index:
            # Delete all related data
            await conn.execute(
                "DELETE FROM knowledge_chunks WHERE source_id IN (SELECT id FROM knowledge_sources WHERE connection_id = $1)",
                req.connection_id
            )
            await conn.execute("DELETE FROM knowledge_sources WHERE connection_id = $1", req.connection_id)
            await conn.execute("DELETE FROM sync_cursors WHERE connection_id = $1", req.connection_id)
            awa
health_check function · python · L477-L479 (3 LOC)
docker/knowledge-gateway/main.py
async def health_check():
    """Report that the service is up, together with its version string."""
    payload = {"status": "healthy", "version": "1.0.0"}
    return payload
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
run function · python · L29-L40 (12 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def run(cmd):
    """Run *cmd* and return its stdout with leading non-JSON text removed.

    Args:
        cmd: Argument list handed to ``subprocess.run`` (no shell involved).

    Returns:
        The captured stdout. If some ``{`` or ``[`` starts a JSON document
        that runs to the end of the output, everything before it is dropped.
        Otherwise the output is cut at the first ``{`` (or, failing that, the
        first ``[``); output containing neither is returned unchanged.

    Raises:
        RuntimeError: If the command exits with a non-zero status.
    """
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(f"command failed: {' '.join(cmd)}\n{proc.stderr.strip()}")
    stdout = proc.stdout

    # Prefer the earliest bracket from which the remainder parses as exactly
    # one JSON document. This keeps a top-level array of objects intact
    # ('[{...}]' used to be cut at the inner '{') and skips bracketed log
    # prefixes such as '[info] ...' that precede the real payload.
    decoder = json.JSONDecoder()
    for pos, char in enumerate(stdout):
        if char not in "{[":
            continue
        try:
            _, end = decoder.raw_decode(stdout, pos)
        except ValueError:
            continue
        if not stdout[end:].strip():
            return stdout[pos:]

    # Nothing parsed cleanly to the end: keep the historical heuristic.
    json_start = stdout.find("{")
    if json_start == -1:
        json_start = stdout.find("[")
    if json_start != -1:
        stdout = stdout[json_start:]
    return stdout
env_bool function · python · L43-L47 (5 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def env_bool(name, default=False):
    """Interpret environment variable *name* as a boolean flag.

    Returns *default* when the variable is unset. Otherwise the result is
    True only for ``1``, ``true``, ``yes`` or ``on`` (case-insensitive,
    surrounding whitespace ignored).
    """
    raw = os.environ.get(name)
    if raw is not None:
        return raw.strip().lower() in {"1", "true", "yes", "on"}
    return default
csv_set function · python · L50-L52 (3 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def csv_set(name):
    """Return env var *name*, split on commas, as a set of lowercased tokens.

    Tokens are stripped of surrounding whitespace and empty tokens are
    dropped. An unset variable yields an empty set.
    """
    tokens = set()
    for piece in os.environ.get(name, "").split(","):
        cleaned = piece.strip()
        if cleaned:
            tokens.add(cleaned.lower())
    return tokens
csv_set_raw function · python · L55-L57 (3 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def csv_set_raw(name):
    """Return env var *name*, split on commas, as a set of case-preserved tokens.

    Tokens are stripped of surrounding whitespace and empty tokens are
    dropped. An unset variable yields an empty set.
    """
    pieces = os.environ.get(name, "").split(",")
    return set(filter(None, map(str.strip, pieces)))
parse_email function · python · L60-L62 (3 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def parse_email(text):
    """Return the first ``EMAIL_RE`` match in *text*, lowercased, or None.

    A falsy *text* (``None`` or empty) is searched as the empty string.
    """
    match = EMAIL_RE.search(text or "")
    if match is None:
        return None
    return match.group(0).lower()
parse_slack_user_id function · python · L65-L67 (3 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def parse_slack_user_id(text):
    """Return capture group 1 of the first ``SLACK_FROM_RE`` match, or None.

    A falsy *text* (``None`` or empty) is searched as the empty string.
    """
    match = SLACK_FROM_RE.search(text or "")
    if match is None:
        return None
    return match.group(1)
content_text function · python · L70-L77 (8 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def content_text(content):
    """Join the text parts of a message content list with newlines.

    Only dict items whose ``"type"`` is ``"text"`` contribute; a missing
    ``"text"`` key contributes an empty string. Anything that is not a list
    yields ``""``.
    """
    if not isinstance(content, list):
        return ""
    return "\n".join(
        part.get("text", "")
        for part in content
        if isinstance(part, dict) and part.get("type") == "text"
    )
discover_requesters function · python · L80-L109 (30 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def discover_requesters(session_glob):
    requesters = {}
    for fp in sorted(glob.glob(session_glob), key=os.path.getmtime):
        last = {"slack_user_id": None, "email": None}
        with open(fp, "r", encoding="utf-8") as f:
            for raw in f:
                line = raw.strip()
                if not line:
                    continue
                try:
                    row = json.loads(line)
                except json.JSONDecodeError:
                    continue
                msg = row.get("message", {})
                role = msg.get("role")
                if role == "user":
                    text = content_text(msg.get("content", []))
                    sid = parse_slack_user_id(text)
                    if sid:
                        last = {"slack_user_id": sid, "email": parse_email(text)}
                    continue
                if role != "toolResult" or msg.get("toolName") != "cron":
                    continue
                details = msg.get
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
fetch_slack_email function · python · L112-L132 (21 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def fetch_slack_email(bot_token, slack_user_id):
    """Look up a Slack user's profile email through ``users.info``.

    The request is made with ``curl`` via ``run``. This is best-effort: a
    missing token, a response whose ``ok`` is falsy, an empty email or any
    exception all result in ``None``. The email is returned lowercased.
    """
    if not bot_token:
        return None
    try:
        curl_cmd = [
            "curl",
            "-sS",
            "--max-time",
            "8",
            "https://slack.com/api/users.info",
            "-H",
            f"Authorization: Bearer {bot_token}",
            "--data-urlencode",
            f"user={slack_user_id}",
        ]
        response = json.loads(run(curl_cmd))
        if not response.get("ok"):
            return None
        profile = response.get("user", {}).get("profile", {})
        email = (profile.get("email") or "").lower()
        return email or None
    except Exception:
        # Deliberate best-effort: any failure means "email unknown".
        return None
schedule_params function · python · L135-L143 (9 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def schedule_params(job):
    """Translate a job's ``schedule`` into schedule-trigger rule parameters.

    ``cron`` schedules with an ``expr`` map to a ``cronExpression`` interval.
    ``every`` schedules with an ``everyMs`` map to a minutes interval,
    truncated to whole minutes with a floor of one. Anything else gives None.
    """
    schedule = job.get("schedule", {})
    kind = schedule.get("kind")
    if kind == "cron":
        expression = schedule.get("expr")
        if expression:
            return {"rule": {"interval": [{"field": "cronExpression", "expression": expression}]}}
    elif kind == "every":
        every_ms = schedule.get("everyMs")
        if every_ms:
            minutes = max(1, int(every_ms / 60000))
            return {"rule": {"interval": [{"field": "minutes", "minutesInterval": minutes}]}}
    return None
isolation_message function · python · L150-L166 (17 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def isolation_message(original, slack_user_id, requester_email):
    """Prefix *original* with the per-user memory isolation policy header.

    The header names the requester (the email when given, otherwise the
    Slack user id), the Slack user id and the only memory root the job may
    use. A falsy *original* is treated as an empty message.
    """
    memory_root = f"~/.openclaw/workspace/memory/users/{slack_user_id}"
    requester = requester_email if requester_email else slack_user_id
    header = "".join((
        "[Memory Isolation Policy]\n",
        f"Requester: {requester}\n",
        f"Slack User ID: {slack_user_id}\n",
        f"Allowed memory root: {memory_root}\n\n",
        "Rules:\n",
        "1. Read/write persistent memory only under the allowed memory root.\n",
        "2. Never read/write any other user's memory directory.\n",
        "3. If path is missing, create it only under the allowed root.\n",
        "4. Refuse requests for other users' memory.\n",
        "5. Rewrite hardcoded memory paths to the allowed root.\n",
        "----------------------------------------\n",
    ))
    if original:
        return header + original
    return header
make_workflow function · python · L169-L214 (46 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def make_workflow(job, requester_email, slack_user_id, hook_url, hook_token):
    sid = hashlib.sha1(job["id"].encode("utf-8")).hexdigest()[:8]
    params = schedule_params(job)
    if not params:
        return None
    body = {
        "message": isolation_message(job.get("payload", {}).get("message", ""), slack_user_id, requester_email),
        "name": f"openclaw-sync:{job.get('name', 'job')}",
        "sessionKey": f"openclaw-sync:{slack_user_id}:{job['id']}",
        "wakeMode": job.get("wakeMode", "next-heartbeat"),
        "deliver": False,
    }
    return {
        "id": workflow_id(job["id"]),
        "name": f"OpenClaw Sync | {job.get('name','job')} | {requester_email or 'unknown'}",
        "nodes": [
            {
                "id": f"n1{sid}",
                "name": "Schedule Trigger",
                "type": "n8n-nodes-base.scheduleTrigger",
                "typeVersion": 1.2,
                "position": [240, 300],
                "parameters": params,
            
hash_item function · python · L217-L219 (3 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def hash_item(job, hook_url, hook_token):
    """Return a SHA-256 hex fingerprint of a job plus its hook settings.

    The job is serialized with sorted keys, so key order does not affect the
    result. The literal ``v1`` suffix is part of the hashed text.
    """
    serialized_job = json.dumps(job, sort_keys=True, ensure_ascii=True)
    fingerprint_source = "|".join((serialized_job, hook_url, hook_token, "v1"))
    return hashlib.sha256(fingerprint_source.encode("utf-8")).hexdigest()
project_id_for_email function · python · L226-L234 (9 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def project_id_for_email(db_container, email):
    """Return the id of the personal project created by the user with *email*.

    The lookup runs ``psql`` inside *db_container* through ``docker exec``.
    The email is lowercased and passed through ``sql_escape`` before being
    placed in the query. Returns None for a falsy *email* or empty output.
    """
    if not email:
        return None
    escaped_email = sql_escape(email.lower())
    query = (
        "SELECT p.id FROM project p JOIN \"user\" u ON u.id=p.\"creatorId\" "
        f"WHERE p.type='personal' AND lower(u.email)='{escaped_email}' LIMIT 1;"
    )
    psql_cmd = ["docker", "exec", "-i", db_container, "psql", "-U", "n8n", "-d", "n8n", "-At", "-c", query]
    project_id = run(psql_cmd).strip()
    return project_id if project_id else None
set_workflow_owners function · python · L237-L247 (11 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def set_workflow_owners(db_container, assignments):
    """Make each workflow in *assignments* owned by its mapped project.

    *assignments* maps workflow id to project id. For every pair the
    existing ``shared_workflow`` rows of the workflow are deleted and a
    single ``workflow:owner`` row is inserted. All statements are sent in one
    ``psql`` invocation inside *db_container*. Does nothing when
    *assignments* is empty.
    """
    if not assignments:
        return
    statements = []
    for wf_id, project_id in assignments.items():
        wf_sql = sql_escape(wf_id)
        project_sql = sql_escape(project_id)
        delete_stmt = f"DELETE FROM shared_workflow WHERE \"workflowId\"='{wf_sql}';"
        insert_stmt = (
            "INSERT INTO shared_workflow (\"workflowId\",\"projectId\",role,\"createdAt\",\"updatedAt\") "
            f"VALUES ('{wf_sql}','{project_sql}','workflow:owner',CURRENT_TIMESTAMP(3),CURRENT_TIMESTAMP(3));"
        )
        statements.extend((delete_stmt, insert_stmt))
    script = "\n".join(statements)
    run(["docker", "exec", "-i", db_container, "psql", "-U", "n8n", "-d", "n8n", "-c", script])
set_active function · python · L250-L254 (5 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def set_active(db_container, state):
    """Set the ``active`` flag of workflows according to *state*.

    *state* maps workflow id to a truthy or falsy value. One UPDATE per
    workflow is generated and all of them are sent in a single ``psql``
    invocation inside *db_container*. Does nothing when *state* is empty.
    """
    if not state:
        return
    statements = []
    for wf_id, is_active in state.items():
        flag = "true" if is_active else "false"
        statements.append(f"UPDATE workflow_entity SET active={flag} WHERE id='{sql_escape(wf_id)}';")
    script = "\n".join(statements)
    run(["docker", "exec", "-i", db_container, "psql", "-U", "n8n", "-d", "n8n", "-c", script])
Same scanner, your repo: https://repobility.com — Repobility
import_workflows function · python · L257-L266 (10 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def import_workflows(n8n_container, import_dir, files_dir):
    """Copy workflow files into the n8n container and import them.

    The container-side *import_dir* is emptied and recreated, every regular
    file in the host-side *files_dir* is written into it, and
    ``n8n import:workflow --separate`` is run on the directory.

    Args:
        n8n_container: Name of the container that runs n8n.
        import_dir: Directory inside the container that receives the files.
        files_dir: Host directory holding the workflow files to import.

    Raises:
        ValueError: If *import_dir* is empty or resolves to the filesystem
            root, which would turn the cleanup step into ``rm -rf /*``.
        RuntimeError: Propagated from ``run`` when a command fails.
    """
    import shlex  # local import: only needed to quote the shell snippets below

    if not import_dir or not import_dir.strip("/"):
        raise ValueError(f"refusing to use unsafe import directory: {import_dir!r}")

    # Quote every interpolated path so spaces and shell metacharacters cannot
    # change the command. The glob has to stay outside the quotes.
    quoted_dir = shlex.quote(import_dir)
    run(["docker", "exec", n8n_container, "sh", "-lc", f"rm -rf {quoted_dir}/* && mkdir -p {quoted_dir}"])
    for fname in os.listdir(files_dir):
        fpath = os.path.join(files_dir, fname)
        if not os.path.isfile(fpath):
            continue
        with open(fpath, "r", encoding="utf-8") as f:
            content = f.read()
        # Base64 keeps the payload free of quotes and other shell-special
        # characters while it travels through `sh -lc`.
        encoded = base64.b64encode(content.encode("utf-8")).decode("ascii")
        quoted_dest = shlex.quote(f"{import_dir}/{fname}")
        run(["docker", "exec", n8n_container, "sh", "-lc", f"echo '{encoded}' | base64 -d > {quoted_dest}"])
    run(["docker", "exec", n8n_container, "n8n", "import:workflow", "--separate", "--input", import_dir])
delete_workflows function · python · L269-L273 (5 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def delete_workflows(db_container, workflow_ids):
    """Delete the given workflows from ``workflow_entity``.

    Each id is passed through ``sql_escape`` and single-quoted, then a single
    ``DELETE ... WHERE id IN (...)`` is run with ``psql`` inside
    *db_container*. Does nothing when *workflow_ids* is empty.
    """
    if not workflow_ids:
        return
    id_list = ",".join(f"'{sql_escape(wf_id)}'" for wf_id in workflow_ids)
    statement = f"DELETE FROM workflow_entity WHERE id IN ({id_list});"
    run(["docker", "exec", "-i", db_container, "psql", "-U", "n8n", "-d", "n8n", "-c", statement])
load_state function · python · L276-L280 (5 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def load_state(path):
    """Load the JSON state file at *path*.

    Returns a fresh empty state (no jobs, no managed workflow ids) when the
    file does not exist.
    """
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    return {"jobs": {}, "managed_workflow_ids": []}
save_state function · python · L283-L288 (6 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def save_state(path, data):
    """Write *data* as JSON to *path*, replacing any existing file.

    The JSON is written to ``<path>.tmp`` first and then moved over *path*
    with ``os.replace``, so readers never see a half-written file. Missing
    parent directories are created.

    Args:
        path: Destination file; may be a bare filename in the current directory.
        data: JSON-serializable state.
    """
    parent = os.path.dirname(path)
    # dirname() is "" for a bare filename and os.makedirs("") raises
    # FileNotFoundError, so only create the directory when there is one.
    if parent:
        os.makedirs(parent, exist_ok=True)
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=True, indent=2)
    os.replace(tmp, path)
get_fallback_requester function · python · L291-L298 (8 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def get_fallback_requester():
    """Parse ``SYNC_FALLBACK_REQUESTER`` (``<slack_user_id>:<email>``).

    Returns a dict with the stripped Slack user id and the stripped,
    lowercased email, split on the first colon. Returns None when the
    variable is unset, blank or contains no colon.
    """
    spec = os.environ.get("SYNC_FALLBACK_REQUESTER", "").strip()
    slack_id, colon, email = spec.partition(":")
    if not colon:
        return None
    return {"slack_user_id": slack_id.strip(), "email": email.strip().lower()}
sync_once function · python · L301-L408 (108 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def sync_once():
    state_file = os.environ.get("SYNC_STATE_FILE", "/state/state.json")
    session_glob = os.environ.get("OPENCLAW_SESSION_GLOB", "/home/openclaw/.openclaw/agents/main/sessions/*.jsonl")
    hook_url = os.environ["OPENCLAW_HOOK_URL"]
    hook_token = os.environ["OPENCLAW_HOOK_TOKEN"]
    n8n_container = os.environ.get("N8N_CONTAINER", "n8n-app")
    db_container = os.environ.get("N8N_DB_CONTAINER", "n8n-postgres")
    n8n_import_dir = os.environ.get("N8N_IMPORT_DIR", "/tmp/openclaw-sync")
    allowed_ids = csv_set_raw("SYNC_ALLOWED_SLACK_USER_IDS")
    allowed_emails = csv_set("SYNC_ALLOWED_EMAILS")
    require_slack_email = env_bool("SYNC_REQUIRE_SLACK_EMAIL_VERIFICATION", True)
    slack_bot_token = os.environ.get("SLACK_BOT_TOKEN", "")
    allow_cli_jobs = env_bool("SYNC_ALLOW_CLI_JOBS", True)
    fallback_requester = get_fallback_requester()

    user_email_map = {}
    mapping_raw = os.environ.get("SYNC_USER_EMAIL_MAP", "")
    if mapping_raw:
        for item in
Handler class · python · L411-L456 (46 LOC)
docker/sync/openclaw_n8n_sync_worker.py
class Handler(BaseHTTPRequestHandler):
    def _send(self, code, content_type, body_bytes):
        self.send_response(code)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(body_bytes)))
        self.end_headers()
        self.wfile.write(body_bytes)

    def do_GET(self):
        if self.path == "/healthz":
            with LOCK:
                ok = bool(SYNC_STATUS.get("ok", False))
                payload = {
                    "ok": ok,
                    "last_run_at": SYNC_STATUS.get("last_run_at"),
                    "last_success_at": SYNC_STATUS.get("last_success_at"),
                    "last_error": SYNC_STATUS.get("last_error", ""),
                }
            self._send(200 if ok else 503, "application/json", json.dumps(payload, ensure_ascii=True).encode("utf-8"))
            return

        if self.path == "/metrics":
            with LOCK:
                result = SYNC_STATUS.get("last_result", {})
        
_send method · python · L412-L417 (6 LOC)
docker/sync/openclaw_n8n_sync_worker.py
    def _send(self, code, content_type, body_bytes):
        """Send a full HTTP response: status line, two headers, then the body.

        ``Content-Length`` is derived from ``len(body_bytes)``.
        """
        self.send_response(code)
        response_headers = (
            ("Content-Type", content_type),
            ("Content-Length", str(len(body_bytes))),
        )
        for header_name, header_value in response_headers:
            self.send_header(header_name, header_value)
        self.end_headers()
        self.wfile.write(body_bytes)
Repobility · severity-and-effort ranking · https://repobility.com
do_GET method · python · L419-L453 (35 LOC)
docker/sync/openclaw_n8n_sync_worker.py
    def do_GET(self):
        if self.path == "/healthz":
            with LOCK:
                ok = bool(SYNC_STATUS.get("ok", False))
                payload = {
                    "ok": ok,
                    "last_run_at": SYNC_STATUS.get("last_run_at"),
                    "last_success_at": SYNC_STATUS.get("last_success_at"),
                    "last_error": SYNC_STATUS.get("last_error", ""),
                }
            self._send(200 if ok else 503, "application/json", json.dumps(payload, ensure_ascii=True).encode("utf-8"))
            return

        if self.path == "/metrics":
            with LOCK:
                result = SYNC_STATUS.get("last_result", {})
                ok_num = 1 if SYNC_STATUS.get("ok", False) else 0
                lines = [
                    "# HELP openclaw_sync_ok 1 if last run succeeded",
                    "# TYPE openclaw_sync_ok gauge",
                    f"openclaw_sync_ok {ok_num}",
                    "# HELP openclaw_sync_synced_las
sync_loop function · python · L459-L478 (20 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def sync_loop():
    """Run ``sync_once`` forever, recording each outcome in ``SYNC_STATUS``.

    The pause between runs comes from ``SYNC_INTERVAL_SECONDS`` (default 60)
    and is never shorter than five seconds. Every run prints one JSON log
    line: ``sync_ok`` with the result fields, or ``sync_error`` with the
    error text. Failures never stop the loop.
    """
    interval = int(os.environ.get("SYNC_INTERVAL_SECONDS", "60"))
    pause_seconds = max(5, interval)
    while True:
        started_at = int(time.time())
        try:
            result = sync_once()
            with LOCK:
                SYNC_STATUS.update({
                    "ok": True,
                    "last_run_at": started_at,
                    "last_success_at": started_at,
                    "last_error": "",
                    "last_result": result,
                })
            print(json.dumps({"event": "sync_ok", **result}, ensure_ascii=True), flush=True)
        except Exception as exc:
            error_text = str(exc)
            with LOCK:
                SYNC_STATUS.update({
                    "ok": False,
                    "last_run_at": started_at,
                    "last_error": error_text,
                })
            print(json.dumps({"event": "sync_error", "error": error_text}, ensure_ascii=True), flush=True)
        time.sleep(pause_seconds)
main function · python · L481-L487 (7 LOC)
docker/sync/openclaw_n8n_sync_worker.py
def main():
    """Start the background sync thread and serve HTTP until interrupted.

    The HTTP server listens on all interfaces on ``SYNC_METRICS_PORT``
    (default 18090) and dispatches requests to ``Handler``. The sync loop
    runs in a daemon thread.
    """
    port = int(os.environ.get("SYNC_METRICS_PORT", "18090"))
    threading.Thread(target=sync_loop, daemon=True).start()
    server = ThreadingHTTPServer(("0.0.0.0", port), Handler)
    startup_event = {"event": "metrics_server_started", "port": port}
    print(json.dumps(startup_event, ensure_ascii=True), flush=True)
    server.serve_forever()
env_bool function · python · L17-L21 (5 LOC)
guest_automation_service.py
def env_bool(name, default=False):
    """Read environment variable *name* as an on/off switch.

    An unset variable gives *default*. A set variable is True only when its
    stripped, lowercased value is ``1``, ``true``, ``yes`` or ``on``.
    """
    try:
        raw_value = os.environ[name]
    except KeyError:
        return default
    normalized = raw_value.strip().lower()
    return normalized in {"1", "true", "yes", "on"}
Config class · python · L24-L39 (16 LOC)
guest_automation_service.py
class Config:
    """Service settings, read from environment variables at class creation."""

    # HTTP listener and the token callers must present.
    HOST = os.getenv("GUEST_AUTOMATION_HOST", "0.0.0.0")
    PORT = int(os.getenv("GUEST_AUTOMATION_PORT", "18111"))
    TOKEN = os.getenv("GUEST_AUTOMATION_TOKEN", "")

    # GitHub: owner of generated repos, API token and the template repo.
    GITHUB_OWNER = os.getenv("GITHUB_OWNER", "TommyKammy")
    GITHUB_TOKEN = os.getenv("GITHUB_TOKEN", "")
    GUEST_TEMPLATE_REPO = os.getenv("GUEST_TEMPLATE_REPO", "TommyKammy/guest-app-template")

    # Slack: bot token and the bot's user id.
    SLACK_BOT_TOKEN = os.getenv("SLACK_BOT_TOKEN", "")
    CLAW_BOT_USER_ID = os.getenv("CLAW_BOT_USER_ID", "U0ADFF3483E")

    # Vercel credentials.
    VERCEL_TOKEN = os.getenv("VERCEL_TOKEN", "")
    VERCEL_ORG_ID = os.getenv("VERCEL_ORG_ID", "")

    # Location of the SQLite database file.
    DB_PATH = os.getenv("GUEST_AUTOMATION_DB_PATH", "/var/lib/guest-automation/automation.db")
init_db function · python · L45-L60 (16 LOC)
guest_automation_service.py
def init_db(path):
    """Create the SQLite database at *path* with the ``slack_channels`` table.

    Missing parent directories are created. The table is created only if it
    does not already exist, so calling this repeatedly is safe.

    Args:
        path: Database file location; may be a bare filename in the current
            directory.
    """
    parent = os.path.dirname(path)
    # dirname() is "" for a bare filename and os.makedirs("") raises
    # FileNotFoundError, so only create the directory when there is one.
    if parent:
        os.makedirs(parent, exist_ok=True)
    con = sqlite3.connect(path)
    try:
        con.execute(
            """
            CREATE TABLE IF NOT EXISTS slack_channels (
              channel_name TEXT PRIMARY KEY,
              channel_id TEXT NOT NULL,
              updated_at INTEGER NOT NULL
            )
            """
        )
        con.commit()
    finally:
        # Always release the connection, even if table creation fails.
        con.close()
cache_slack_channel function · python · L63-L80 (18 LOC)
guest_automation_service.py
def cache_slack_channel(channel_name, channel_id):
    """Remember the Slack channel id for *channel_name* in the local database.

    Inserts a row or, when the name is already cached, overwrites its id and
    timestamp. Does nothing when either argument is falsy.
    """
    if not (channel_name and channel_id):
        return
    upsert_sql = """
            INSERT INTO slack_channels (channel_name, channel_id, updated_at)
            VALUES (?, ?, ?)
            ON CONFLICT(channel_name) DO UPDATE SET
              channel_id=excluded.channel_id,
              updated_at=excluded.updated_at
            """
    connection = sqlite3.connect(CFG.DB_PATH)
    try:
        row_values = (channel_name, channel_id, int(time.time()))
        connection.execute(upsert_sql, row_values)
        connection.commit()
    finally:
        connection.close()
get_cached_slack_channel_id function · python · L83-L93 (11 LOC)
guest_automation_service.py
def get_cached_slack_channel_id(channel_name):
    """Return the cached Slack channel id for *channel_name*, or None."""
    lookup_sql = "SELECT channel_id FROM slack_channels WHERE channel_name=? LIMIT 1"
    connection = sqlite3.connect(CFG.DB_PATH)
    connection.row_factory = sqlite3.Row
    try:
        cached = connection.execute(lookup_sql, (channel_name,)).fetchone()
        if cached is None:
            return None
        return cached["channel_id"]
    finally:
        connection.close()
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
verify_token function · python · L96-L102 (7 LOC)
guest_automation_service.py
def verify_token(auth_header):
    """Check an ``Authorization`` header value against the configured token.

    Args:
        auth_header: Raw header value, expected to be ``Bearer <token>``.

    Returns:
        True when no token is configured (authentication disabled) or when
        the header matches exactly; False otherwise.
    """
    if not CFG.TOKEN:
        return True
    if not auth_header:
        return False
    expected = f"Bearer {CFG.TOKEN}"
    # compare_digest() raises TypeError for str arguments containing
    # non-ASCII characters, which a client can put in a header. Compare the
    # encoded bytes so such input is rejected instead of crashing.
    supplied = auth_header.encode("utf-8") if isinstance(auth_header, str) else auth_header
    return hmac.compare_digest(supplied, expected.encode("utf-8"))
create_github_repo function · python · L105-L133 (29 LOC)
guest_automation_service.py
def create_github_repo(repo_name, description=""):
    if not CFG.GITHUB_TOKEN:
        return {"ok": False, "error": "missing GITHUB_TOKEN"}
    
    url = f"https://api.github.com/repos/{CFG.GUEST_TEMPLATE_REPO}/generate"
    headers = {
        "Authorization": f"Bearer {CFG.GITHUB_TOKEN}",
        "Accept": "application/vnd.github.v3+json",
        "Content-Type": "application/json"
    }
    data = {
        "owner": CFG.GITHUB_OWNER,
        "name": repo_name,
        "description": description or f"Guest app for {repo_name}",
        "private": True,
        "include_all_branches": False
    }
    
    try:
        req = urllib.request.Request(url, data=json.dumps(data).encode(), headers=headers, method="POST")
        with urllib.request.urlopen(req, timeout=30) as resp:
            result = json.loads(resp.read().decode())
            return {"ok": True, "repo": f"{CFG.GITHUB_OWNER}/{repo_name}", "html_url": result.get("html_url")}
    except urllib.error.HTTPError as e:
     
create_slack_channel function · python · L136-L198 (63 LOC)
guest_automation_service.py
def create_slack_channel(channel_name, guest_user_id):
    if not CFG.SLACK_BOT_TOKEN:
        return {"ok": False, "error": "missing SLACK_BOT_TOKEN"}
    
    # Create channel
    url = "https://slack.com/api/conversations.create"
    headers = {
        "Authorization": f"Bearer {CFG.SLACK_BOT_TOKEN}",
        "Content-Type": "application/json"
    }
    data = {"name": channel_name, "is_private": False}
    
    try:
        def invite_users(channel_id, user_ids):
            # Invite each user individually to avoid partial failures.
            for uid in [str(u).strip() for u in (user_ids or []) if str(u).strip()]:
                invite_url = "https://slack.com/api/conversations.invite"
                invite_req = urllib.request.Request(
                    invite_url,
                    data=json.dumps({"channel": channel_id, "users": uid}).encode(),
                    headers=headers,
                    method="POST",
                )
                with urllib.request.u
page 1 / 3next ›