Function bodies 109 total
lifespan function · python · L38-L43 (6 LOC)docker/knowledge-gateway/main.py
async def lifespan(app: FastAPI):
    """Manage the database connection pool for the application's lifetime.

    Creates the asyncpg pool from ``DATABASE_URL``, publishes it through the
    module-level ``db_pool``, yields control, and closes the pool afterwards.

    Args:
        app: The FastAPI application instance (not used by the body).
    """
    global db_pool
    db_pool = await asyncpg.create_pool(DATABASE_URL)
    try:
        yield
    finally:
        # Close the pool even when an exception is thrown into the generator
        # at the yield point, so connections are not leaked on a failed run.
        await db_pool.close()


# OAuthStartRequest class · python · L54-L57 (4 LOC)docker/knowledge-gateway/main.py
class OAuthStartRequest(BaseModel):
    """Request body consumed by ``oauth_google_start``."""

    # Guest identifier; inserted into oauth_states next to the generated state.
    guest_id: str
    # Slack user id; oauth_states has a matching NOT NULL column.
    slack_user_id: str
    # Optional hint, "slack" by default — semantics are not visible here; TODO confirm.
    redirect_hint: Optional[str] = "slack"


# OAuthStartResponse class · python · L60-L62 (3 LOC)docker/knowledge-gateway/main.py
class OAuthStartResponse(BaseModel):
    """Response model with an authorization URL and its OAuth state.

    NOTE(review): presumably returned by ``oauth_google_start`` — confirm.
    """

    # URL the user should visit to authorize — presumably Google's consent page.
    auth_url: str
    # OAuth ``state`` value tied to this authorization attempt.
    state: str


# SearchRequest class · python · L65-L68 (4 LOC)docker/knowledge-gateway/main.py
class SearchRequest(BaseModel):
    """Request body consumed by ``knowledge_search``."""

    # Guest whose active knowledge connection is looked up.
    guest_id: str
    # Search text supplied by the caller.
    query: str
    # Result-count setting; validated to 1..20 inclusive, defaults to 5.
    top_k: int = Field(default=5, ge=1, le=20)


# SearchResult class · python · L71-L75 (5 LOC)docker/knowledge-gateway/main.py
class SearchResult(BaseModel):
    """One search hit: a text snippet plus details of its source."""

    # Text excerpt for this hit.
    snippet: str
    # Identifier of the source the snippet came from.
    source_id: str
    # Title of the source.
    title: str
    # NOTE(review): no default is given. Pydantic v1 treats this as optional
    # (default None); pydantic v2 treats it as required-but-nullable. Confirm
    # which behavior is intended.
    source_url: Optional[str]


# DisconnectRequest class · python · L86-L88 (3 LOC)docker/knowledge-gateway/main.py
class DisconnectRequest(BaseModel):
    """Request body consumed by ``knowledge_disconnect``."""

    # Id of the row in knowledge_connections to disconnect.
    connection_id: str
    # When true, the handler also deletes the connection's chunks, sources and
    # sync cursors; defaults to False.
    purge_index: bool = False


# verify_token function · python · L92-L100 (9 LOC)docker/knowledge-gateway/main.py
async def verify_token(authorization: Optional[str] = Header(None)):
    """Validate the internal API token carried in the ``Authorization`` header.

    The header may hold either ``Bearer <token>`` or the bare token.

    Args:
        authorization: Raw ``Authorization`` header value, or ``None`` when the
            header is absent.

    Returns:
        ``True`` when the token matches, or when ``INTERNAL_API_TOKEN`` is not
        configured (authentication is then disabled).

    Raises:
        HTTPException: 401 when the header is missing or the token differs.
    """
    if not INTERNAL_API_TOKEN:
        # NOTE(review): fails open when no token is configured — confirm this
        # is intended outside local development.
        return True
    if not authorization:
        raise HTTPException(status_code=401, detail="Missing authorization header")
    prefix = "Bearer "
    # Strip only a leading scheme prefix; str.replace would also remove any
    # later "Bearer " occurrence inside the token itself.
    token = authorization[len(prefix):] if authorization.startswith(prefix) else authorization
    # Constant-time comparison on bytes: avoids a timing side channel, and
    # compare_digest raises TypeError for non-ASCII str arguments.
    supplied = token.encode("utf-8")
    expected = INTERNAL_API_TOKEN.encode("utf-8")
    if not secrets.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Invalid token")
    return True


# Repobility · severity-and-effort ranking · https://repobility.com
oauth_google_start function · python · L105-L150 (46 LOC)docker/knowledge-gateway/main.py
async def oauth_google_start(
req: OAuthStartRequest,
authorized: bool = Depends(verify_token)
):
"""Generate Google OAuth URL for guest connection"""
state = secrets.token_urlsafe(32)
# Store state in temporary table/cache for validation
async with db_pool.acquire() as conn:
await conn.execute(
"""
CREATE TABLE IF NOT EXISTS oauth_states (
state text PRIMARY KEY,
guest_id text NOT NULL,
slack_user_id text NOT NULL,
redirect_hint text,
created_at timestamptz DEFAULT now(),
expires_at timestamptz DEFAULT now() + interval '10 minutes'
)
"""
)
await conn.execute(
"""
INSERT INTO oauth_states (state, guest_id, slack_user_id, redirect_hint, expires_at)
VALUES ($1, $2, $3, $4, now() + interval '10 minutes')
""",
state, req.guest_id, req.sl
oauth_google_callback function · python · L154-L251 (98 LOC)docker/knowledge-gateway/main.py
async def oauth_google_callback(
code: str = Query(...),
state: str = Query(...),
error: Optional[str] = Query(None)
):
"""Handle Google OAuth callback"""
if error:
raise HTTPException(status_code=400, detail=f"OAuth error: {error}")
# Validate state
async with db_pool.acquire() as conn:
row = await conn.fetchrow(
"SELECT guest_id, slack_user_id, redirect_hint FROM oauth_states WHERE state = $1 AND expires_at > now()",
state
)
if not row:
raise HTTPException(status_code=400, detail="Invalid or expired state")
await conn.execute("DELETE FROM oauth_states WHERE state = $1", state)
guest_id = row["guest_id"]
slack_user_id = row["slack_user_id"]
redirect_hint = row["redirect_hint"]
# Exchange code for tokens
async with httpx.AsyncClient() as client:
token_resp = await client.post(
"https:/
knowledge_search function · python · L256-L312 (57 LOC)docker/knowledge-gateway/main.py
async def knowledge_search(
req: SearchRequest,
authorized: bool = Depends(verify_token)
):
"""Search guest knowledge with vector similarity"""
# TODO: Integrate with embedding service (OpenAI or local)
# For now, return simple text search results
async with db_pool.acquire() as conn:
# Get connection for guest
conn_row = await conn.fetchrow(
"SELECT id FROM knowledge_connections WHERE guest_id = $1 AND status = 'active' LIMIT 1",
req.guest_id
)
if not conn_row:
return SearchResponse(answers=[])
connection_id = conn_row["id"]
# Simple text search on chunks (placeholder for vector search)
# In production, this would use embedding + vector similarity
chunks = await conn.fetch(
"""
SELECT kc.id, kc.content, kc.chunk_index, ks.title, ks.source_url, ks.id as source_id
FROM knowledge_chunks kc
knowledge_sync_run function · python · L317-L344 (28 LOC)docker/knowledge-gateway/main.py
async def knowledge_sync_run(
    req: SyncRequest,
    authorized: bool = Depends(verify_token)
):
    """Trigger a background sync for one knowledge connection.

    Args:
        req: Request carrying the ``connection_id`` to sync.
        authorized: Result of the ``verify_token`` dependency (unused here).

    Returns:
        ``{"status": "accepted", "message": "Sync job queued"}`` once the
        background task has been scheduled.

    Raises:
        HTTPException: 404 when no ``knowledge_connections`` row has that id.
    """
    async with db_pool.acquire() as conn:
        # Verify connection exists
        row = await conn.fetchrow(
            "SELECT id, guest_id FROM knowledge_connections WHERE id = $1",
            req.connection_id
        )
        if not row:
            raise HTTPException(status_code=404, detail="Connection not found")

        # Update sync cursor to mark as syncing
        await conn.execute(
            """
            UPDATE sync_cursors
            SET last_status = 'syncing', updated_at = now()
            WHERE connection_id = $1
            """,
            req.connection_id
        )

    # Trigger background sync (in production, use Celery/Redis).
    # The event loop keeps only weak references to tasks, so hold a strong
    # reference until the task finishes; otherwise it may be garbage collected
    # mid-run.
    task = asyncio.create_task(sync_connection(req.connection_id))
    _BACKGROUND_SYNC_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_SYNC_TASKS.discard)
    return {"status": "accepted", "message": "Sync job queued"}


# In-flight sync tasks started by knowledge_sync_run; entries remove
# themselves through the done-callback registered above.
_BACKGROUND_SYNC_TASKS = set()


# sync_connection function · python · L347-L420 (74 LOC)docker/knowledge-gateway/main.py
async def sync_connection(connection_id: str):
"""Background task to sync Google Drive files"""
try:
async with db_pool.acquire() as conn:
# Get tokens
token_row = await conn.fetchrow(
"""
SELECT access_token_enc, refresh_token_enc, token_expiry
FROM oauth_tokens WHERE connection_id = $1
""",
connection_id
)
if not token_row:
await update_sync_status(connection_id, "error", "No tokens found")
return
# Decrypt access token
access_token = fernet.decrypt(token_row["access_token_enc"].encode()).decode()
# Fetch files from Google Drive
async with httpx.AsyncClient() as client:
files_resp = await client.get(
"https://www.googleapis.com/drive/v3/files",
headers={"Authorization": 
update_sync_status function · python · L423-L436 (14 LOC)docker/knowledge-gateway/main.py
async def update_sync_status(connection_id: str, status: str, error: Optional[str]):
    """Record the outcome of a sync attempt on the connection's sync cursor.

    Args:
        connection_id: Connection whose ``sync_cursors`` row is updated.
        status: New ``last_status`` value; ``last_sync_at`` is advanced only
            when this equals ``'success'``.
        error: New ``last_error`` value (may be ``None``).
    """
    statement = (
        "\n"
        "UPDATE sync_cursors\n"
        "SET last_sync_at = CASE WHEN $2 = 'success' THEN now() ELSE last_sync_at END,\n"
        "last_status = $2,\n"
        "last_error = $3,\n"
        "updated_at = now()\n"
        "WHERE connection_id = $1\n"
    )
    async with db_pool.acquire() as connection:
        await connection.execute(statement, connection_id, status, error)


# knowledge_disconnect function · python · L441-L472 (32 LOC)docker/knowledge-gateway/main.py
async def knowledge_disconnect(
req: DisconnectRequest,
authorized: bool = Depends(verify_token)
):
"""Disconnect provider and optionally purge data"""
async with db_pool.acquire() as conn:
# Verify connection exists
row = await conn.fetchrow(
"SELECT id, guest_id FROM knowledge_connections WHERE id = $1",
req.connection_id
)
if not row:
raise HTTPException(status_code=404, detail="Connection not found")
if req.purge_index:
# Delete all related data
await conn.execute(
"DELETE FROM knowledge_chunks WHERE source_id IN (SELECT id FROM knowledge_sources WHERE connection_id = $1)",
req.connection_id
)
await conn.execute("DELETE FROM knowledge_sources WHERE connection_id = $1", req.connection_id)
await conn.execute("DELETE FROM sync_cursors WHERE connection_id = $1", req.connection_id)
awa
health_check function · python · L477-L479 (3 LOC)docker/knowledge-gateway/main.py
async def health_check():
    """Return a static payload with the service status and version."""
    payload = {"status": "healthy", "version": "1.0.0"}
    return payload


# Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
run function · python · L29-L40 (12 LOC)docker/sync/openclaw_n8n_sync_worker.py
def run(cmd):
    """Run *cmd* and return its stdout, trimmed to where JSON output starts.

    Args:
        cmd: Argument list handed to ``subprocess.run`` (no shell involved).

    Returns:
        The captured stdout text. If some ``{`` or ``[`` starts a JSON value
        that runs to the end of the output (ignoring trailing whitespace),
        everything before the earliest such position is dropped. Otherwise
        the text is cut at the first ``{``, or failing that the first ``[``.
        Output containing neither character is returned unchanged.

    Raises:
        RuntimeError: If the command exits with a non-zero status. The message
            contains the joined command line and the stripped stderr.
    """
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(f"command failed: {' '.join(cmd)}\n{proc.stderr.strip()}")
    stdout = proc.stdout

    # Prefer a start position that actually decodes as JSON. Cutting at the
    # first '{' alone corrupts array output such as '[{"a": 1}]'.
    decoder = json.JSONDecoder()
    for idx, ch in enumerate(stdout):
        if ch not in "{[":
            continue
        try:
            _, end = decoder.raw_decode(stdout, idx)
        except ValueError:
            continue
        if not stdout[end:].strip():
            return stdout[idx:]

    # No position decodes cleanly (e.g. JSON followed by trailing text):
    # fall back to stripping any UI/text output before the first brace.
    json_start = stdout.find('{')
    if json_start == -1:
        json_start = stdout.find('[')
    if json_start != -1:
        stdout = stdout[json_start:]
    return stdout


# env_bool function · python · L43-L47 (5 LOC)docker/sync/openclaw_n8n_sync_worker.py
def env_bool(name, default=False):
    """Interpret the environment variable *name* as a boolean flag.

    Returns *default* when the variable is unset. Otherwise returns True only
    if the trimmed, lower-cased value is one of ``1``, ``true``, ``yes``, ``on``.
    """
    try:
        raw = os.environ[name]
    except KeyError:
        return default
    normalized = raw.strip().lower()
    return normalized in ("1", "true", "yes", "on")


# csv_set function · python · L50-L52 (3 LOC)docker/sync/openclaw_n8n_sync_worker.py
def csv_set(name):
    """Return the comma-separated env var *name* as a set of lower-cased items.

    Items are trimmed; empty items are dropped. An unset variable gives an
    empty set.
    """
    items = set()
    for piece in os.environ.get(name, "").split(","):
        token = piece.strip()
        if token:
            items.add(token.lower())
    return items


# csv_set_raw function · python · L55-L57 (3 LOC)docker/sync/openclaw_n8n_sync_worker.py
def csv_set_raw(name):
    """Return the comma-separated env var *name* as a set, preserving case.

    Items are trimmed; empty items are dropped. An unset variable gives an
    empty set.
    """
    trimmed = (piece.strip() for piece in os.environ.get(name, "").split(","))
    return set(filter(None, trimmed))


# parse_email function · python · L60-L62 (3 LOC)docker/sync/openclaw_n8n_sync_worker.py
def parse_email(text):
    """Return the first ``EMAIL_RE`` match in *text*, lower-cased, or None.

    A falsy *text* (``None`` or empty) is searched as the empty string.
    """
    found = EMAIL_RE.search(text if text else "")
    if not found:
        return None
    return found.group(0).lower()


# parse_slack_user_id function · python · L65-L67 (3 LOC)docker/sync/openclaw_n8n_sync_worker.py
def parse_slack_user_id(text):
    """Return capture group 1 of the first ``SLACK_FROM_RE`` match, or None.

    A falsy *text* (``None`` or empty) is searched as the empty string.
    """
    found = SLACK_FROM_RE.search(text if text else "")
    if not found:
        return None
    return found.group(1)


# content_text function · python · L70-L77 (8 LOC)docker/sync/openclaw_n8n_sync_worker.py
def content_text(content):
    """Join the text of every ``{"type": "text"}`` item in *content*.

    Returns an empty string when *content* is not a list. Items that are not
    dicts, or whose ``type`` is not ``"text"``, are ignored. A text item
    without a ``text`` key contributes an empty line.
    """
    if not isinstance(content, list):
        return ""
    return "\n".join(
        part.get("text", "")
        for part in content
        if isinstance(part, dict) and part.get("type") == "text"
    )


# discover_requesters function · python · L80-L109 (30 LOC)docker/sync/openclaw_n8n_sync_worker.py
def discover_requesters(session_glob):
requesters = {}
for fp in sorted(glob.glob(session_glob), key=os.path.getmtime):
last = {"slack_user_id": None, "email": None}
with open(fp, "r", encoding="utf-8") as f:
for raw in f:
line = raw.strip()
if not line:
continue
try:
row = json.loads(line)
except json.JSONDecodeError:
continue
msg = row.get("message", {})
role = msg.get("role")
if role == "user":
text = content_text(msg.get("content", []))
sid = parse_slack_user_id(text)
if sid:
last = {"slack_user_id": sid, "email": parse_email(text)}
continue
if role != "toolResult" or msg.get("toolName") != "cron":
continue
details = msg.get
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
fetch_slack_email function · python · L112-L132 (21 LOC)docker/sync/openclaw_n8n_sync_worker.py
def fetch_slack_email(bot_token, slack_user_id):
    """Look up a Slack user's email through ``users.info``, best effort.

    Args:
        bot_token: Slack bot token; when falsy no request is made.
        slack_user_id: Slack user id to look up.

    Returns:
        The profile email lower-cased, or ``None`` when the token is missing,
        the response is not ``ok``, the email is empty, or anything fails.
    """
    if not bot_token:
        return None
    try:
        # NOTE(review): the token is passed on curl's command line, where it
        # is visible to other local processes.
        response = run([
            "curl",
            "-sS",
            "--max-time",
            "8",
            "https://slack.com/api/users.info",
            "-H",
            f"Authorization: Bearer {bot_token}",
            "--data-urlencode",
            f"user={slack_user_id}",
        ])
        payload = json.loads(response)
        if not payload.get("ok"):
            return None
        profile = payload.get("user", {}).get("profile", {})
        email = (profile.get("email") or "").lower()
        return email if email else None
    except Exception:
        # Deliberate best-effort: any failure means "email unknown".
        return None


# schedule_params function · python · L135-L143 (9 LOC)docker/sync/openclaw_n8n_sync_worker.py
def schedule_params(job):
    """Translate a job's ``schedule`` into schedule-trigger parameters.

    Returns:
        A ``{"rule": {"interval": [...]}}`` dict for a ``cron`` schedule with
        an ``expr``, or for an ``every`` schedule with ``everyMs`` (converted
        to whole minutes, minimum 1). ``None`` for anything else.
    """
    schedule = job.get("schedule", {})
    kind = schedule.get("kind")
    interval = None
    if kind == "cron":
        expression = schedule.get("expr")
        if expression:
            interval = {"field": "cronExpression", "expression": expression}
    elif kind == "every":
        every_ms = schedule.get("everyMs")
        if every_ms:
            minutes = max(1, int(every_ms / 60000))
            interval = {"field": "minutes", "minutesInterval": minutes}
    if interval is None:
        return None
    return {"rule": {"interval": [interval]}}


# isolation_message function · python · L150-L166 (17 LOC)docker/sync/openclaw_n8n_sync_worker.py
def isolation_message(original, slack_user_id, requester_email):
    """Prefix *original* with the per-user memory isolation policy text.

    Args:
        original: Message to append after the policy; falsy means empty.
        slack_user_id: Slack user id; determines the allowed memory root.
        requester_email: Shown as the requester; falls back to the Slack id
            when falsy.

    Returns:
        The policy header followed by the original message.
    """
    memory_root = f"~/.openclaw/workspace/memory/users/{slack_user_id}"
    requester = requester_email if requester_email else slack_user_id
    policy_lines = [
        "[Memory Isolation Policy]",
        f"Requester: {requester}",
        f"Slack User ID: {slack_user_id}",
        f"Allowed memory root: {memory_root}",
        "",
        "Rules:",
        "1. Read/write persistent memory only under the allowed memory root.",
        "2. Never read/write any other user's memory directory.",
        "3. If path is missing, create it only under the allowed root.",
        "4. Refuse requests for other users' memory.",
        "5. Rewrite hardcoded memory paths to the allowed root.",
        "----------------------------------------",
    ]
    header = "\n".join(policy_lines) + "\n"
    body = original if original else ""
    return header + body


# make_workflow function · python · L169-L214 (46 LOC)docker/sync/openclaw_n8n_sync_worker.py
def make_workflow(job, requester_email, slack_user_id, hook_url, hook_token):
sid = hashlib.sha1(job["id"].encode("utf-8")).hexdigest()[:8]
params = schedule_params(job)
if not params:
return None
body = {
"message": isolation_message(job.get("payload", {}).get("message", ""), slack_user_id, requester_email),
"name": f"openclaw-sync:{job.get('name', 'job')}",
"sessionKey": f"openclaw-sync:{slack_user_id}:{job['id']}",
"wakeMode": job.get("wakeMode", "next-heartbeat"),
"deliver": False,
}
return {
"id": workflow_id(job["id"]),
"name": f"OpenClaw Sync | {job.get('name','job')} | {requester_email or 'unknown'}",
"nodes": [
{
"id": f"n1{sid}",
"name": "Schedule Trigger",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1.2,
"position": [240, 300],
"parameters": params,
hash_item function · python · L217-L219 (3 LOC)docker/sync/openclaw_n8n_sync_worker.py
def hash_item(job, hook_url, hook_token):
    """Return a SHA-256 hex digest fingerprinting a job and its hook settings.

    The digest covers the job serialized with sorted keys, the hook URL, the
    hook token and a fixed ``v1`` marker, joined with ``|``.
    """
    parts = [
        json.dumps(job, sort_keys=True, ensure_ascii=True),
        hook_url,
        hook_token,
        "v1",
    ]
    digest = hashlib.sha256("|".join(parts).encode("utf-8"))
    return digest.hexdigest()


# project_id_for_email function · python · L226-L234 (9 LOC)docker/sync/openclaw_n8n_sync_worker.py
def project_id_for_email(db_container, email):
    """Look up the personal project id owned by the user with *email*.

    Runs ``psql`` inside *db_container* through ``docker exec``.

    Returns:
        The stripped psql output (the project id), or ``None`` when *email* is
        falsy or the query prints nothing.
    """
    if not email:
        return None
    needle = sql_escape(email.lower())
    query = (
        'SELECT p.id FROM project p JOIN "user" u ON u.id=p."creatorId" '
        f"WHERE p.type='personal' AND lower(u.email)='{needle}' LIMIT 1;"
    )
    command = ["docker", "exec", "-i", db_container, "psql", "-U", "n8n", "-d", "n8n", "-At", "-c", query]
    project_id = run(command).strip()
    return project_id if project_id else None


# set_workflow_owners function · python · L237-L247 (11 LOC)docker/sync/openclaw_n8n_sync_worker.py
def set_workflow_owners(db_container, assignments):
    """Make each workflow owned by exactly the given project.

    For every ``workflow id -> project id`` pair, the existing
    ``shared_workflow`` rows are deleted and one ``workflow:owner`` row is
    inserted. All statements go to ``psql`` in a single ``docker exec`` call.
    Does nothing when *assignments* is empty.
    """
    if not assignments:
        return
    statements = []
    for workflow_id, project_id in assignments.items():
        delete_sql = f"DELETE FROM shared_workflow WHERE \"workflowId\"='{sql_escape(workflow_id)}';"
        insert_sql = (
            'INSERT INTO shared_workflow ("workflowId","projectId",role,"createdAt","updatedAt") '
            f"VALUES ('{sql_escape(workflow_id)}','{sql_escape(project_id)}','workflow:owner',CURRENT_TIMESTAMP(3),CURRENT_TIMESTAMP(3));"
        )
        statements.extend((delete_sql, insert_sql))
    script = "\n".join(statements)
    run(["docker", "exec", "-i", db_container, "psql", "-U", "n8n", "-d", "n8n", "-c", script])


# set_active function · python · L250-L254 (5 LOC)docker/sync/openclaw_n8n_sync_worker.py
def set_active(db_container, state):
    """Set the ``active`` flag of workflows according to *state*.

    *state* maps workflow id to a truthy/falsy flag. One UPDATE per workflow
    is sent to ``psql`` in a single ``docker exec`` call. Does nothing when
    *state* is empty.
    """
    if not state:
        return
    statements = []
    for workflow_id, active in state.items():
        flag = "true" if active else "false"
        statements.append(
            f"UPDATE workflow_entity SET active={flag} WHERE id='{sql_escape(workflow_id)}';"
        )
    script = "\n".join(statements)
    run(["docker", "exec", "-i", db_container, "psql", "-U", "n8n", "-d", "n8n", "-c", script])


# Same scanner, your repo: https://repobility.com — Repobility
import_workflows function · python · L257-L266 (10 LOC)docker/sync/openclaw_n8n_sync_worker.py
def import_workflows(n8n_container, import_dir, files_dir):
    """Copy workflow files into the n8n container and import them.

    The import directory inside the container is emptied and recreated, every
    regular file in *files_dir* is written into it (base64 transfer through
    ``docker exec``), and ``n8n import:workflow --separate`` is run on it.

    Args:
        n8n_container: Name of the n8n container.
        import_dir: Directory inside the container to stage files in.
        files_dir: Local directory holding the workflow files.

    Raises:
        ValueError: If *import_dir* is empty or resolves to the filesystem
            root, which would turn the cleanup into ``rm -rf /*``.
        RuntimeError: Propagated from ``run`` when a command fails.
    """
    import shlex

    if not import_dir or not import_dir.strip().strip("/"):
        raise ValueError(f"refusing to use import_dir {import_dir!r}")

    # Quote everything interpolated into the shell command line so spaces or
    # metacharacters in paths cannot change the command.
    target = shlex.quote(import_dir)
    run(["docker", "exec", n8n_container, "sh", "-lc", f"rm -rf {target}/* && mkdir -p {target}"])
    for fname in os.listdir(files_dir):
        fpath = os.path.join(files_dir, fname)
        if not os.path.isfile(fpath):
            continue
        with open(fpath, "r", encoding="utf-8") as f:
            content = f.read()
        # Base64 output contains only [A-Za-z0-9+/=], safe inside single quotes.
        # NOTE(review): the payload travels in argv; very large files may
        # exceed the OS argument-length limit.
        encoded = base64.b64encode(content.encode("utf-8")).decode("ascii")
        dest = shlex.quote(f"{import_dir}/{fname}")
        run(["docker", "exec", n8n_container, "sh", "-lc", f"echo '{encoded}' | base64 -d > {dest}"])
    run(["docker", "exec", n8n_container, "n8n", "import:workflow", "--separate", "--input", import_dir])


# delete_workflows function · python · L269-L273 (5 LOC)docker/sync/openclaw_n8n_sync_worker.py
def delete_workflows(db_container, workflow_ids):
    """Delete the given workflows from ``workflow_entity``.

    Issues one ``DELETE ... WHERE id IN (...)`` through ``psql`` in
    *db_container*. Does nothing when *workflow_ids* is empty.
    """
    if not workflow_ids:
        return
    id_list = ",".join(f"'{sql_escape(workflow_id)}'" for workflow_id in workflow_ids)
    statement = f"DELETE FROM workflow_entity WHERE id IN ({id_list});"
    run(["docker", "exec", "-i", db_container, "psql", "-U", "n8n", "-d", "n8n", "-c", statement])


# load_state function · python · L276-L280 (5 LOC)docker/sync/openclaw_n8n_sync_worker.py
def load_state(path):
    """Load the persisted sync state from *path*.

    Returns:
        The parsed JSON document, or a fresh
        ``{"jobs": {}, "managed_workflow_ids": []}`` when the file is absent.
    """
    if os.path.exists(path):
        with open(path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    return {"jobs": {}, "managed_workflow_ids": []}


# save_state function · python · L283-L288 (6 LOC)docker/sync/openclaw_n8n_sync_worker.py
def save_state(path, data):
    """Atomically write *data* as JSON to *path*.

    The document is written to ``<path>.tmp`` and then moved over *path* with
    ``os.replace``, so readers never observe a half-written file. Missing
    parent directories are created.

    Args:
        path: Destination file; may be a bare filename in the current directory.
        data: JSON-serializable state.
    """
    parent = os.path.dirname(path)
    if parent:
        # os.makedirs("") raises FileNotFoundError, so skip it for bare
        # filenames that have no directory component.
        os.makedirs(parent, exist_ok=True)
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=True, indent=2)
    os.replace(tmp, path)


# get_fallback_requester function · python · L291-L298 (8 LOC)docker/sync/openclaw_n8n_sync_worker.py
def get_fallback_requester():
    """Parse ``SYNC_FALLBACK_REQUESTER`` as ``<slack_user_id>:<email>``.

    Returns:
        ``{"slack_user_id": ..., "email": ...}`` with both parts trimmed and
        the email lower-cased, or ``None`` when the variable is unset, blank,
        or contains no colon. Only the first colon separates the two parts.
    """
    raw = os.environ.get("SYNC_FALLBACK_REQUESTER", "").strip()
    slack_id, separator, email = raw.partition(":")
    if not separator:
        return None
    return {"slack_user_id": slack_id.strip(), "email": email.strip().lower()}


# sync_once function · python · L301-L408 (108 LOC)docker/sync/openclaw_n8n_sync_worker.py
def sync_once():
state_file = os.environ.get("SYNC_STATE_FILE", "/state/state.json")
session_glob = os.environ.get("OPENCLAW_SESSION_GLOB", "/home/openclaw/.openclaw/agents/main/sessions/*.jsonl")
hook_url = os.environ["OPENCLAW_HOOK_URL"]
hook_token = os.environ["OPENCLAW_HOOK_TOKEN"]
n8n_container = os.environ.get("N8N_CONTAINER", "n8n-app")
db_container = os.environ.get("N8N_DB_CONTAINER", "n8n-postgres")
n8n_import_dir = os.environ.get("N8N_IMPORT_DIR", "/tmp/openclaw-sync")
allowed_ids = csv_set_raw("SYNC_ALLOWED_SLACK_USER_IDS")
allowed_emails = csv_set("SYNC_ALLOWED_EMAILS")
require_slack_email = env_bool("SYNC_REQUIRE_SLACK_EMAIL_VERIFICATION", True)
slack_bot_token = os.environ.get("SLACK_BOT_TOKEN", "")
allow_cli_jobs = env_bool("SYNC_ALLOW_CLI_JOBS", True)
fallback_requester = get_fallback_requester()
user_email_map = {}
mapping_raw = os.environ.get("SYNC_USER_EMAIL_MAP", "")
if mapping_raw:
for item in
Handler class · python · L411-L456 (46 LOC)docker/sync/openclaw_n8n_sync_worker.py
class Handler(BaseHTTPRequestHandler):
def _send(self, code, content_type, body_bytes):
self.send_response(code)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(body_bytes)))
self.end_headers()
self.wfile.write(body_bytes)
def do_GET(self):
if self.path == "/healthz":
with LOCK:
ok = bool(SYNC_STATUS.get("ok", False))
payload = {
"ok": ok,
"last_run_at": SYNC_STATUS.get("last_run_at"),
"last_success_at": SYNC_STATUS.get("last_success_at"),
"last_error": SYNC_STATUS.get("last_error", ""),
}
self._send(200 if ok else 503, "application/json", json.dumps(payload, ensure_ascii=True).encode("utf-8"))
return
if self.path == "/metrics":
with LOCK:
result = SYNC_STATUS.get("last_result", {})
_send method · python · L412-L417 (6 LOC)docker/sync/openclaw_n8n_sync_worker.py
def _send(self, code, content_type, body_bytes):
self.send_response(code)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(body_bytes)))
self.end_headers()
self.wfile.write(body_bytes)Repobility · severity-and-effort ranking · https://repobility.com
do_GET method · python · L419-L453 (35 LOC)docker/sync/openclaw_n8n_sync_worker.py
def do_GET(self):
if self.path == "/healthz":
with LOCK:
ok = bool(SYNC_STATUS.get("ok", False))
payload = {
"ok": ok,
"last_run_at": SYNC_STATUS.get("last_run_at"),
"last_success_at": SYNC_STATUS.get("last_success_at"),
"last_error": SYNC_STATUS.get("last_error", ""),
}
self._send(200 if ok else 503, "application/json", json.dumps(payload, ensure_ascii=True).encode("utf-8"))
return
if self.path == "/metrics":
with LOCK:
result = SYNC_STATUS.get("last_result", {})
ok_num = 1 if SYNC_STATUS.get("ok", False) else 0
lines = [
"# HELP openclaw_sync_ok 1 if last run succeeded",
"# TYPE openclaw_sync_ok gauge",
f"openclaw_sync_ok {ok_num}",
"# HELP openclaw_sync_synced_las
sync_loop function · python · L459-L478 (20 LOC)docker/sync/openclaw_n8n_sync_worker.py
def sync_loop():
    """Run ``sync_once`` forever, recording each outcome in ``SYNC_STATUS``.

    The pause between runs comes from ``SYNC_INTERVAL_SECONDS`` (default 60)
    and is never shorter than 5 seconds. Every run prints one JSON event line
    (``sync_ok`` or ``sync_error``). Failures are recorded, not raised, so the
    loop keeps going.
    """
    interval = int(os.environ.get("SYNC_INTERVAL_SECONDS", "60"))
    pause_seconds = max(5, interval)
    while True:
        # Timestamp taken before the run: both last_run_at and
        # last_success_at refer to when the run started.
        started_at = int(time.time())
        try:
            outcome = sync_once()
            with LOCK:
                SYNC_STATUS.update(
                    ok=True,
                    last_run_at=started_at,
                    last_success_at=started_at,
                    last_error="",
                    last_result=outcome,
                )
            print(json.dumps({"event": "sync_ok", **outcome}, ensure_ascii=True), flush=True)
        except Exception as exc:
            failure = str(exc)
            with LOCK:
                SYNC_STATUS.update(ok=False, last_run_at=started_at, last_error=failure)
            print(json.dumps({"event": "sync_error", "error": failure}, ensure_ascii=True), flush=True)
        time.sleep(pause_seconds)


# main function · python · L481-L487 (7 LOC)docker/sync/openclaw_n8n_sync_worker.py
def main():
    """Start the background sync thread and serve HTTP requests forever.

    The server uses ``Handler`` and listens on all interfaces at
    ``SYNC_METRICS_PORT`` (default 18090). The sync thread is a daemon, so it
    does not keep the process alive on its own.
    """
    port = int(os.environ.get("SYNC_METRICS_PORT", "18090"))
    threading.Thread(target=sync_loop, daemon=True).start()
    server = ThreadingHTTPServer(("0.0.0.0", port), Handler)
    started_event = {"event": "metrics_server_started", "port": port}
    print(json.dumps(started_event, ensure_ascii=True), flush=True)
    server.serve_forever()


# env_bool function · python · L17-L21 (5 LOC)guest_automation_service.py
def env_bool(name, default=False):
    """Read the environment variable *name* as a boolean.

    An unset variable yields *default*. A set variable is True only when its
    trimmed, lower-cased value is ``1``, ``true``, ``yes`` or ``on``.
    """
    raw = os.environ.get(name)
    return default if raw is None else raw.strip().lower() in ("1", "true", "yes", "on")


# Config class · python · L24-L39 (16 LOC)guest_automation_service.py
class Config:
    """Service settings, read from environment variables when the class body runs.

    NOTE(review): other functions read these through ``CFG`` — presumably this
    class or an instance of it; confirm at the definition of ``CFG``.
    """

    # Host and port settings; PORT must parse as an integer or class creation fails.
    HOST = os.environ.get("GUEST_AUTOMATION_HOST", "0.0.0.0")
    PORT = int(os.environ.get("GUEST_AUTOMATION_PORT", "18111"))
    # Shared bearer token; verify_token accepts every request while this is empty.
    TOKEN = os.environ.get("GUEST_AUTOMATION_TOKEN", "")
    # Owner under which create_github_repo generates repositories.
    GITHUB_OWNER = os.environ.get("GITHUB_OWNER", "TommyKammy")
    # GitHub API token; create_github_repo reports an error while it is empty.
    GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN", "")
    # "<owner>/<repo>" of the template used by create_github_repo.
    GUEST_TEMPLATE_REPO = os.environ.get("GUEST_TEMPLATE_REPO", "TommyKammy/guest-app-template")
    # Slack bot token; create_slack_channel reports an error while it is empty.
    SLACK_BOT_TOKEN = os.environ.get("SLACK_BOT_TOKEN", "")
    # NOTE(review): hard-coded default Slack user id — confirm it is deployment-independent.
    CLAW_BOT_USER_ID = os.environ.get("CLAW_BOT_USER_ID", "U0ADFF3483E")
    # Vercel credentials; their use is not visible in this part of the file.
    VERCEL_TOKEN = os.environ.get("VERCEL_TOKEN", "")
    VERCEL_ORG_ID = os.environ.get("VERCEL_ORG_ID", "")
    # SQLite file used by the Slack channel cache helpers.
    DB_PATH = os.environ.get("GUEST_AUTOMATION_DB_PATH", "/var/lib/guest-automation/automation.db")


# init_db function · python · L45-L60 (16 LOC)guest_automation_service.py
def init_db(path):
    """Create the SQLite database at *path* with the ``slack_channels`` table.

    Missing parent directories are created. The table is created only if it
    does not exist yet, so calling this repeatedly is safe.

    Args:
        path: Database file; may be a bare filename in the current directory.
    """
    parent = os.path.dirname(path)
    if parent:
        # os.makedirs("") raises FileNotFoundError, so skip it for bare
        # filenames that have no directory component.
        os.makedirs(parent, exist_ok=True)
    con = sqlite3.connect(path)
    try:
        con.execute(
            """
            CREATE TABLE IF NOT EXISTS slack_channels (
                channel_name TEXT PRIMARY KEY,
                channel_id TEXT NOT NULL,
                updated_at INTEGER NOT NULL
            )
            """
        )
        con.commit()
    finally:
        con.close()


# cache_slack_channel function · python · L63-L80 (18 LOC)guest_automation_service.py
def cache_slack_channel(channel_name, channel_id):
    """Store or refresh the id of a Slack channel in the local cache.

    Upserts a ``slack_channels`` row keyed by *channel_name* in the database
    at ``CFG.DB_PATH``, stamping it with the current Unix time. Does nothing
    when either argument is falsy.
    """
    if not (channel_name and channel_id):
        return
    upsert = (
        "\n"
        "INSERT INTO slack_channels (channel_name, channel_id, updated_at)\n"
        "VALUES (?, ?, ?)\n"
        "ON CONFLICT(channel_name) DO UPDATE SET\n"
        "channel_id=excluded.channel_id,\n"
        "updated_at=excluded.updated_at\n"
    )
    con = sqlite3.connect(CFG.DB_PATH)
    try:
        con.execute(upsert, (channel_name, channel_id, int(time.time())))
        con.commit()
    finally:
        con.close()


# get_cached_slack_channel_id function · python · L83-L93 (11 LOC)guest_automation_service.py
def get_cached_slack_channel_id(channel_name):
    """Return the cached Slack channel id for *channel_name*, or None.

    Reads the ``slack_channels`` table of the database at ``CFG.DB_PATH``.
    """
    query = "SELECT channel_id FROM slack_channels WHERE channel_name=? LIMIT 1"
    con = sqlite3.connect(CFG.DB_PATH)
    con.row_factory = sqlite3.Row
    try:
        hit = con.execute(query, (channel_name,)).fetchone()
        if not hit:
            return None
        return hit["channel_id"]
    finally:
        con.close()


# Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
verify_token function · python · L96-L102 (7 LOC)guest_automation_service.py
def verify_token(auth_header):
    """Check an ``Authorization`` header value against the configured token.

    Args:
        auth_header: Raw header value, expected as ``Bearer <token>``; may be
            ``None`` or empty.

    Returns:
        ``True`` when ``CFG.TOKEN`` is empty (authentication disabled) or the
        header equals ``Bearer <CFG.TOKEN>`` exactly; ``False`` otherwise.
    """
    if not CFG.TOKEN:
        return True
    if not auth_header:
        return False
    expected = f"Bearer {CFG.TOKEN}"
    # compare_digest raises TypeError for str arguments containing non-ASCII
    # characters, which a client-supplied header can include. Compare bytes.
    if isinstance(auth_header, str):
        auth_header = auth_header.encode("utf-8")
    return hmac.compare_digest(auth_header, expected.encode("utf-8"))


# create_github_repo function · python · L105-L133 (29 LOC)guest_automation_service.py
def create_github_repo(repo_name, description=""):
if not CFG.GITHUB_TOKEN:
return {"ok": False, "error": "missing GITHUB_TOKEN"}
url = f"https://api.github.com/repos/{CFG.GUEST_TEMPLATE_REPO}/generate"
headers = {
"Authorization": f"Bearer {CFG.GITHUB_TOKEN}",
"Accept": "application/vnd.github.v3+json",
"Content-Type": "application/json"
}
data = {
"owner": CFG.GITHUB_OWNER,
"name": repo_name,
"description": description or f"Guest app for {repo_name}",
"private": True,
"include_all_branches": False
}
try:
req = urllib.request.Request(url, data=json.dumps(data).encode(), headers=headers, method="POST")
with urllib.request.urlopen(req, timeout=30) as resp:
result = json.loads(resp.read().decode())
return {"ok": True, "repo": f"{CFG.GITHUB_OWNER}/{repo_name}", "html_url": result.get("html_url")}
except urllib.error.HTTPError as e:
create_slack_channel function · python · L136-L198 (63 LOC)guest_automation_service.py
def create_slack_channel(channel_name, guest_user_id):
if not CFG.SLACK_BOT_TOKEN:
return {"ok": False, "error": "missing SLACK_BOT_TOKEN"}
# Create channel
url = "https://slack.com/api/conversations.create"
headers = {
"Authorization": f"Bearer {CFG.SLACK_BOT_TOKEN}",
"Content-Type": "application/json"
}
data = {"name": channel_name, "is_private": False}
try:
def invite_users(channel_id, user_ids):
# Invite each user individually to avoid partial failures.
for uid in [str(u).strip() for u in (user_ids or []) if str(u).strip()]:
invite_url = "https://slack.com/api/conversations.invite"
invite_req = urllib.request.Request(
invite_url,
data=json.dumps({"channel": channel_id, "users": uid}).encode(),
headers=headers,
method="POST",
)
with urllib.request.u
page 1 / 3next ›