Function bodies 109 total
get_channel_id function · python · L201-L215 (15 LOC)guest_automation_service.py
def get_channel_id(channel_name):
    """Look up a Slack channel ID by exact channel name.

    Pages through ``conversations.list`` (public and private channels)
    following ``response_metadata.next_cursor``; a single request only
    returns one page, so channels beyond it would otherwise be missed.

    Args:
        channel_name: Channel name to match exactly against each entry's
            ``name`` field.

    Returns:
        ``{"ok": True, "channel_id", "channel_name", "note": "existing"}`` on a
        match; otherwise ``{"ok": False, "error": ...}`` where the error is
        Slack's error code, ``"channel_not_found"``, or the text of any
        exception raised while calling the API.
    """
    url = "https://slack.com/api/conversations.list"
    headers = {"Authorization": f"Bearer {CFG.SLACK_BOT_TOKEN}"}
    cursor = ""
    try:
        # Bounded walk so a misbehaving cursor can never loop forever.
        for _ in range(50):
            params = {"types": "public_channel,private_channel", "limit": "200"}
            if cursor:
                params["cursor"] = cursor
            req = urllib.request.Request(f"{url}?{urllib.parse.urlencode(params)}", headers=headers)
            with urllib.request.urlopen(req, timeout=10) as resp:
                result = json.loads(resp.read().decode())
            if not result.get("ok"):
                return {"ok": False, "error": result.get("error") or "slack_error"}
            for channel in result.get("channels") or []:
                if channel.get("name") == channel_name:
                    return {"ok": True, "channel_id": channel.get("id"), "channel_name": channel_name, "note": "existing"}
            cursor = ((result.get("response_metadata") or {}).get("next_cursor") or "").strip()
            if not cursor:
                break
        return {"ok": False, "error": "channel_not_found"}
    except Exception as exc:
        # Best-effort lookup: callers receive the failure as data, not an exception.
        return {"ok": False, "error": str(exc)}
def deploy_to_vercel(repo_name):
if not CFG.VERCEL_TOKEN:
return {"ok": True, "note": "vercel_not_configured"}
if not CFG.VERCEL_ORG_ID:
return {"ok": False, "error": "missing VERCEL_ORG_ID"}
project_name = repo_name
repo_full = f"{CFG.GITHUB_OWNER}/{repo_name}"
base_qs = urllib.parse.urlencode({"teamId": CFG.VERCEL_ORG_ID})
url = f"https://api.vercel.com/v9/projects?{base_qs}"
headers = {
"Authorization": f"Bearer {CFG.VERCEL_TOKEN}",
"Content-Type": "application/json",
}
data = {
"name": project_name,
"gitRepository": {
"type": "github",
"repo": repo_full,
},
}
def get_project():
req = urllib.request.Request(
"https://api.vercel.com/v9/projects/" + urllib.parse.quote(project_name) + "?" + base_qs,
method="GET",
)
req.add_header("Authorization", f"Bearer {CFG.VERCEL_TOKEN}")
with urllib.request.urlopen(reHandler class · python · L388-L463 (76 LOC)guest_automation_service.py
class Handler(BaseHTTPRequestHandler):
def _json(self, code, payload):
body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_POST(self):
if self.path == "/guest-platform/full-onboard":
self._handle_onboard()
return
self._json(404, {"ok": False, "error": "not_found"})
def _handle_onboard(self):
length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(length)
auth = self.headers.get("Authorization", "")
if not verify_token(auth):
self._json(401, {"ok": False, "error": "unauthorized"})
return
try:
payload = json.loads(raw.decode("utf-8"))
except Exception:
self._json(400, {"o_json method · python · L389-L395 (7 LOC)guest_automation_service.py
def _json(self, code, payload):
body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)do_POST method · python · L397-L401 (5 LOC)guest_automation_service.py
def do_POST(self):
    """Dispatch POST requests: the onboarding path is handled, anything else gets a JSON 404."""
    if self.path != "/guest-platform/full-onboard":
        self._json(404, {"ok": False, "error": "not_found"})
        return
    self._handle_onboard()
def _handle_onboard(self):
length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(length)
auth = self.headers.get("Authorization", "")
if not verify_token(auth):
self._json(401, {"ok": False, "error": "unauthorized"})
return
try:
payload = json.loads(raw.decode("utf-8"))
except Exception:
self._json(400, {"ok": False, "error": "invalid_json"})
return
guest_name = payload.get("guest_name", "")
guest_email = payload.get("guest_email", "")
guest_slack_user_id = payload.get("guest_slack_user_id", "")
app_slug = payload.get("app_slug", "starter-app")
description = payload.get("description", f"Guest app for {guest_name}")
if not guest_name or not guest_email:
self._json(400, {"ok": False, "error": "missing_guest_name_or_email"})
return
# main function · python · L466-L470 (5 LOC)guest_automation_service.py
def main():
    """Initialise the database at ``CFG.DB_PATH`` and serve HTTP on ``CFG.HOST:CFG.PORT``.

    Blocks inside ``serve_forever()``.
    """
    init_db(CFG.DB_PATH)
    bind_address = (CFG.HOST, CFG.PORT)
    httpd = ThreadingHTTPServer(bind_address, Handler)
    print(f"Guest automation service running on {CFG.HOST}:{CFG.PORT}")
    httpd.serve_forever()
slugify function · python · L14-L16 (3 LOC)scripts/guest-platform/full_guest_automation.py
def slugify(value: str) -> str:
    """Turn ``value`` into a lowercase slug of ``a-z0-9`` runs joined by single hyphens.

    Surrounding whitespace is stripped first; every run of other characters
    becomes one hyphen, and leading/trailing hyphens are removed.
    """
    lowered = value.strip().lower()
    hyphenated = re.sub(r"[^a-z0-9]+", "-", lowered)
    trimmed = re.sub(r"^-+|-+$", "", hyphenated)
    return trimmed
def http_post_json(url: str, payload: dict, headers: dict | None = None) -> dict:
    """POST ``payload`` as a JSON body to ``url`` and return the JSON-decoded reply.

    Extra ``headers`` are added after the default Content-Type header.
    Uses a 30 second timeout; errors from ``urlopen`` or JSON decoding
    propagate to the caller.
    """
    body = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(url, data=body, method="POST")
    request.add_header("Content-Type", "application/json; charset=utf-8")
    for header_name, header_value in (headers or {}).items():
        request.add_header(header_name, header_value)
    with urllib.request.urlopen(request, timeout=30) as response:
        raw_reply = response.read()
        return json.loads(raw_reply.decode("utf-8"))
def slack_api(method: str, payload: dict, token: str) -> dict:
    """POST ``payload`` to the Slack Web API ``method`` using ``token`` as a bearer token."""
    auth_header = {"Authorization": f"Bearer {token}"}
    endpoint = f"https://slack.com/api/{method}"
    return http_post_json(endpoint, payload, auth_header)
def ensure_channel(channel_name: str, bot_token: str) -> tuple[str, bool]:
    """Create a public Slack channel, or find it if the name is already taken.

    Args:
        channel_name: Name of the channel to create or locate.
        bot_token: Slack token passed to ``slack_api``.

    Returns:
        ``(channel_id, created)`` where ``created`` is True when the channel
        was newly created and False when an existing one was found.

    Raises:
        RuntimeError: If creation fails for a reason other than
            ``name_taken``, or the existing channel cannot be found.
    """
    res = slack_api("conversations.create", {"name": channel_name, "is_private": False}, bot_token)
    if res.get("ok"):
        return res["channel"]["id"], True
    if res.get("error") == "name_taken":
        cursor = ""
        # Follow next_cursor: one conversations.list call returns a single page,
        # so the existing channel may sit on a later one. Bounded to avoid looping.
        for _ in range(20):
            query = {"types": "public_channel,private_channel", "exclude_archived": True, "limit": 1000}
            if cursor:
                query["cursor"] = cursor
            list_res = slack_api("conversations.list", query, bot_token)
            if not list_res.get("ok"):
                break
            for ch in list_res.get("channels") or []:
                if ch.get("name") == channel_name:
                    return ch["id"], False
            cursor = ((list_res.get("response_metadata") or {}).get("next_cursor") or "").strip()
            if not cursor:
                break
    raise RuntimeError(f"failed to create/find channel {channel_name}: {res}")
def lookup_user_id_by_email(email: str, token: str) -> str | None:
    """Return the Slack user ID for ``email`` via ``users.lookupByEmail``.

    Returns None when Slack does not answer with ``ok``.
    """
    response = slack_api("users.lookupByEmail", {"email": email}, token)
    return response["user"]["id"] if response.get("ok") else None
def invite_user_to_workspace(email: str, admin_token: str) -> None:
    """Invite ``email`` to the workspace through ``admin.users.invite``.

    "Already invited / already a member" errors are treated as success.

    Raises:
        RuntimeError: For any other non-ok response.
    """
    # Requires Slack admin-level token/scopes.
    tolerated_errors = {"already_invited", "already_in_team", "already_in_workspace"}
    result = slack_api("admin.users.invite", {"email": email}, admin_token)
    if result.get("ok"):
        return
    if result.get("error") in tolerated_errors:
        return
    raise RuntimeError(f"failed to invite user to workspace: {result}")
def invite_to_channel(channel_id: str, user_ids: list[str], bot_token: str) -> None:
    """Invite the non-empty entries of ``user_ids`` to ``channel_id``.

    Does nothing when no usable IDs remain. A fixed set of benign Slack
    errors (already in channel, self-invite, bot user, invalid users) is
    ignored.

    Raises:
        RuntimeError: For any other non-ok ``conversations.invite`` response.
    """
    members = list(filter(None, user_ids))
    if not members:
        return
    ignorable = {
        "already_in_channel",
        "cant_invite_self",
        "user_is_bot",
        "invalid_users",
    }
    request_body = {"channel": channel_id, "users": ",".join(members)}
    outcome = slack_api("conversations.invite", request_body, bot_token)
    if outcome.get("ok") or outcome.get("error") in ignorable:
        return
    raise RuntimeError(f"failed to invite users to channel: {outcome}")
def set_channel_context(channel_id: str, guest_slug: str, bot_token: str) -> None:
    """Set the topic, then the purpose, of ``channel_id`` for guest ``guest_slug``.

    Raises:
        RuntimeError: As soon as either Slack call returns a non-ok response;
            the purpose is not attempted if setting the topic fails.
    """
    topic = f"Guest app development for {guest_slug}. Repo naming: {guest_slug}-<app-slug>."
    purpose = (
        f"Build and iterate apps for guest {guest_slug}. "
        "Use the prompt templates from @Claw and keep app_slug lowercase-hyphen format."
    )
    steps = (
        ("conversations.setTopic", "topic", topic),
        ("conversations.setPurpose", "purpose", purpose),
    )
    for api_method, field, text in steps:
        outcome = slack_api(api_method, {"channel": channel_id, field: text}, bot_token)
        if not outcome.get("ok"):
            raise RuntimeError(f"failed to set channel {field}: {outcome}")
build_prompt_template_message function · python · L94-L111 (18 LOC)scripts/guest-platform/full_guest_automation.py
def build_prompt_template_message(guest_slug: str) -> str:
    """Build the newline-joined welcome message with prompt examples for ``guest_slug``."""
    intro = [
        "*Welcome to your app-dev channel*",
        f"Repository naming convention: `{guest_slug}-<app-slug>`",
        "Use lowercase letters, numbers, and hyphens only for `app_slug`.",
        "",
    ]
    examples = [
        "*Prompt examples*",
        "1) Create app",
        f"`Create app app_slug=booking for {guest_slug}. Use repo name {guest_slug}-booking. Include auth, responsive UI, and deployment-ready setup.`",
        "2) Add feature",
        f"`In repo {guest_slug}-booking, add calendar sync. Include tests and update README.`",
        "3) Fix bug",
        f"`In repo {guest_slug}-booking, fix timezone issue in booking confirmation and add regression test.`",
        "4) Deploy check",
        f"`Check latest deploy status for {guest_slug}-booking and summarize failures with next steps.`",
    ]
    return "\n".join(intro + examples)
def post_and_pin_prompt_templates(channel_id: str, guest_slug: str, bot_token: str) -> str:
    """Post the prompt-template message to ``channel_id`` and pin it.

    Returns:
        The ``ts`` of the posted message (``""`` if Slack returned none, in
        which case nothing is pinned).

    Raises:
        RuntimeError: If posting fails, or pinning fails for a reason other
            than ``already_pinned``.
    """
    text = build_prompt_template_message(guest_slug)
    posted = slack_api("chat.postMessage", {"channel": channel_id, "text": text}, bot_token)
    if not posted.get("ok"):
        raise RuntimeError(f"failed to post template message: {posted}")
    ts = posted.get("ts", "")
    if not ts:
        return ts
    pinned = slack_api("pins.add", {"channel": channel_id, "timestamp": ts}, bot_token)
    if pinned.get("ok") or pinned.get("error") == "already_pinned":
        return ts
    raise RuntimeError(f"failed to pin template message: {pinned}")
def set_repo_secret(repo: str, key: str, value: str) -> None:
    """Store ``value`` as secret ``key`` on GitHub repo ``repo`` using the ``gh`` CLI.

    Does nothing when ``value`` is empty.

    Raises:
        subprocess.CalledProcessError: If ``gh`` exits with a non-zero status.
    """
    if not value:
        return
    # Feed the secret on stdin rather than via --body: command-line arguments
    # are visible to other local users through the process list.
    subprocess.run(
        ["gh", "secret", "set", key, "--repo", repo],
        input=value.encode("utf-8"),
        check=True,
    )
def create_vercel_project(repo_name: str, team_id: str, vercel_token: str) -> str:
url = f"https://api.vercel.com/v10/projects?teamId={urllib.parse.quote(team_id)}"
payload = {
"name": repo_name,
"framework": "nextjs",
"buildCommand": "npm run build",
"devCommand": "npm run dev",
"installCommand": "npm install",
}
try:
res = http_post_json(url, payload, {"Authorization": f"Bearer {vercel_token}"})
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="ignore")
if e.code == 409:
query = f"https://api.vercel.com/v9/projects/{urllib.parse.quote(repo_name)}?teamId={urllib.parse.quote(team_id)}"
req = urllib.request.Request(query, headers={"Authorization": f"Bearer {vercel_token}"})
with urllib.request.urlopen(req, timeout=30) as resp:
existing = json.loads(resp.read().decode("utf-8"))
return existing["id"]
raise Runtrigger_author_linked_deploy function · python · L156-L165 (10 LOC)scripts/guest-platform/full_guest_automation.py
def trigger_author_linked_deploy(repo_https: str, branch: str, author_name: str, author_email: str) -> None:
    """Push an empty commit authored by the given identity to ``branch``.

    Clones ``repo_https`` into a temporary directory, checks out ``branch``,
    creates an empty commit with the message "Trigger initial deployment",
    and pushes it to ``origin``.

    Raises:
        subprocess.CalledProcessError: If any git command fails.
    """
    with tempfile.TemporaryDirectory(prefix="guest-auto-") as workdir:
        subprocess.run(["git", "clone", repo_https, workdir], check=True)
        author = f"{author_name} <{author_email}>"
        follow_up_commands = (
            ["git", "checkout", branch],
            ["git", "commit", "--allow-empty", "--author", author, "-m", "Trigger initial deployment"],
            ["git", "push", "origin", branch],
        )
        for argv in follow_up_commands:
            subprocess.run(argv, cwd=workdir, check=True)
def run_register_script(guest_slug: str, app_slug: str, description: str, env: dict) -> str:
    """Run ``register_guest_app.sh`` (next to this file) and return the repo's full name.

    Returns:
        ``"<owner>/<guest_slug>-<app_slug>"`` where owner comes from
        ``env["GITHUB_OWNER"]`` (default ``"TommyKammy"``).

    Raises:
        subprocess.CalledProcessError: If the script exits non-zero.
    """
    script_path = Path(__file__).parent / "register_guest_app.sh"
    argv = [str(script_path), guest_slug, app_slug, description]
    subprocess.run(argv, check=True, env=env)
    repo_owner = env.get("GITHUB_OWNER", "TommyKammy")
    return f"{repo_owner}/{guest_slug}-{app_slug}"
def main() -> None:
parser = argparse.ArgumentParser(description="Fully automate guest onboarding: Slack + GitHub + Vercel")
parser.add_argument("--guest-name", required=True)
parser.add_argument("--guest-email", required=False)
parser.add_argument("--guest-slack-user-id", required=False)
parser.add_argument("--app-slug", required=True)
parser.add_argument("--description", default="Guest app repository")
parser.add_argument("--skip-workspace-invite", action="store_true")
args = parser.parse_args()
env = os.environ.copy()
bot_token = env.get("SLACK_BOT_TOKEN", "")
claw_bot_user_id = env.get("CLAW_BOT_USER_ID", "")
admin_token = env.get("SLACK_ADMIN_USER_TOKEN", "")
vercel_token = env.get("VERCEL_TOKEN", "")
vercel_org_id = env.get("VERCEL_ORG_ID", "")
vercel_author_name = env.get("VERCEL_GIT_COMMIT_AUTHOR_NAME", "Your Name")
vercel_author_email = env.get("VERCEL_GIT_COMMIT_AUTHOR_EMAIL", "[email protected]")
post_Handler class · python · L15-L73 (59 LOC)scripts/guest-platform/full_guest_automation_service.py
class Handler(BaseHTTPRequestHandler):
def _json(self, code: int, payload: dict):
body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_POST(self):
if self.path != "/guest-platform/full-onboard":
self._json(404, {"ok": False, "error": "not_found"})
return
auth = self.headers.get("Authorization", "")
if not TOKEN or auth != f"Bearer {TOKEN}":
self._json(401, {"ok": False, "error": "unauthorized"})
return
content_length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(content_length) if content_length > 0 else b"{}"
try:
payload = json.loads(raw.decode("utf-8"))
except Exception:
self._jRepobility · severity-and-effort ranking · https://repobility.com
_json method · python · L16-L22 (7 LOC)scripts/guest-platform/full_guest_automation_service.py
def _json(self, code: int, payload: dict):
body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
self.send_response(code)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)do_POST method · python · L24-L73 (50 LOC)scripts/guest-platform/full_guest_automation_service.py
def do_POST(self):
if self.path != "/guest-platform/full-onboard":
self._json(404, {"ok": False, "error": "not_found"})
return
auth = self.headers.get("Authorization", "")
if not TOKEN or auth != f"Bearer {TOKEN}":
self._json(401, {"ok": False, "error": "unauthorized"})
return
content_length = int(self.headers.get("Content-Length", "0"))
raw = self.rfile.read(content_length) if content_length > 0 else b"{}"
try:
payload = json.loads(raw.decode("utf-8"))
except Exception:
self._json(400, {"ok": False, "error": "invalid_json"})
return
guest_name = payload.get("guest_name")
app_slug = payload.get("app_slug")
if not guest_name or not app_slug:
self._json(400, {"ok": False, "error": "missing_guest_or_app_slug"})
return
cmd = [
str(SCRIPT),
"--guest-name",
main function · python · L76-L79 (4 LOC)scripts/guest-platform/full_guest_automation_service.py
def main():
    """Start the HTTP server on ``HOST:PORT``, print a JSON startup banner, and serve.

    Blocks inside ``serve_forever()``.
    """
    httpd = ThreadingHTTPServer((HOST, PORT), Handler)
    banner = {"ok": True, "host": HOST, "port": PORT, "path": "/guest-platform/full-onboard"}
    print(json.dumps(banner, ensure_ascii=True), flush=True)
    httpd.serve_forever()
def env_bool(name, default=False):
    """Read environment variable ``name`` as a boolean.

    Returns ``default`` when the variable is unset; otherwise True only if
    its trimmed, lowercased value is one of ``1``, ``true``, ``yes``, ``on``.
    """
    raw = os.environ.get(name)
    if raw is not None:
        normalized = raw.strip().lower()
        return normalized in ("1", "true", "yes", "on")
    return default
class Config:
HOST = os.environ.get("PROVISIONER_HOST", "127.0.0.1")
PORT = int(os.environ.get("PROVISIONER_PORT", "18089"))
DB_PATH = os.environ.get("PROVISIONER_DB_PATH", "/var/lib/slack-n8n-provisioner/provisioner.db")
AUTO_PROVISION_ENABLED = env_bool("AUTO_PROVISION_ENABLED", False)
REQUIRE_SLACK_EMAIL_VERIFICATION = env_bool("REQUIRE_SLACK_EMAIL_VERIFICATION", True)
# If true, only Slack guests (restricted/ultra_restricted) are provisioned.
# If false, normal members are also eligible (still subject to team/domain checks).
SLACK_REQUIRE_GUEST_ONLY = env_bool("SLACK_REQUIRE_GUEST_ONLY", True)
ALLOWED_SLACK_TEAM_IDS = {x.strip() for x in os.environ.get("ALLOWED_SLACK_TEAM_IDS", "").split(",") if x.strip()}
ALLOWED_EMAIL_DOMAINS = {x.strip().lower() for x in os.environ.get("ALLOWED_EMAIL_DOMAINS", "").split(",") if x.strip()}
SLACK_SIGNING_SECRET = os.environ.get("SLACK_SIGNING_SECRET", "")
SLACK_BOT_TOKEN = os.environ.get("SLACK_BOinit_db function · python · L88-L147 (60 LOC)slack_n8n_provisioner.py
def init_db(path):
os.makedirs(os.path.dirname(path), exist_ok=True)
conn = sqlite3.connect(path)
try:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY,
provider TEXT NOT NULL,
received_at INTEGER NOT NULL,
event_type TEXT,
payload TEXT NOT NULL,
status TEXT NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0,
reason TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE IF NOT EXISTS mappings (
provider TEXT NOT NULL,
external_user_id TEXT NOT NULL,
tenant_or_team_id TEXT,
email TEXT,
n8n_user_id TEXT,
status TEXT NOT NULL,
updated_at INTEGER NOT NULL,
reason TEXT,
PRIMARY KEY(provider, external_user_id)
)
"""
db_exec function · python · L150-L159 (10 LOC)slack_n8n_provisioner.py
def db_exec(query, params=(), fetch=False):
    """Run one SQL statement against the SQLite file at ``CFG.DB_PATH``.

    Opens a fresh connection per call, commits, and always closes it.

    Args:
        query: SQL text, using ``?`` placeholders.
        params: Values bound to the placeholders.
        fetch: When true, return all result rows.

    Returns:
        A list of ``sqlite3.Row`` when ``fetch`` is true, otherwise None.
    """
    connection = sqlite3.connect(CFG.DB_PATH)
    connection.row_factory = sqlite3.Row
    try:
        cursor = connection.execute(query, params)
        result = None
        if fetch:
            result = cursor.fetchall()
        connection.commit()
        return result
    finally:
        connection.close()
def upsert_mapping(provider, external_user_id, tenant_or_team_id, email, n8n_user_id, status, reason=""):
    """Insert or update the ``mappings`` row keyed by ``(provider, external_user_id)``.

    On conflict every non-key column is overwritten with the new values;
    ``updated_at`` is set from ``now_epoch()``.
    """
    upsert_sql = """
        INSERT INTO mappings (provider, external_user_id, tenant_or_team_id, email, n8n_user_id, status, updated_at, reason)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ON CONFLICT(provider, external_user_id) DO UPDATE SET
        tenant_or_team_id=excluded.tenant_or_team_id,
        email=excluded.email,
        n8n_user_id=excluded.n8n_user_id,
        status=excluded.status,
        updated_at=excluded.updated_at,
        reason=excluded.reason
        """
    values = (
        provider,
        external_user_id,
        tenant_or_team_id,
        email,
        n8n_user_id,
        status,
        now_epoch(),
        reason,
    )
    db_exec(upsert_sql, values)
enqueue_event function · python · L179-L187 (9 LOC)slack_n8n_provisioner.py
def enqueue_event(event_id, provider, event_type, payload):
    """Insert a new ``pending`` row into ``events`` with ``payload`` stored as JSON.

    Returns:
        True if the row was inserted, False if the insert raised
        ``sqlite3.IntegrityError`` (``id`` is the table's primary key, so a
        repeated ``event_id`` lands here).
    """
    insert_sql = "INSERT INTO events (id, provider, received_at, event_type, payload, status) VALUES (?, ?, ?, ?, ?, 'pending')"
    try:
        values = (event_id, provider, now_epoch(), event_type, json.dumps(payload, ensure_ascii=True))
        db_exec(insert_sql, values)
    except sqlite3.IntegrityError:
        return False
    return True
def verify_slack_signature(raw_body, ts, sig):
    """Check a Slack request signature (``v0`` HMAC-SHA256 scheme).

    Args:
        raw_body: Request body as bytes, exactly as received.
        ts: Timestamp header value (string of epoch seconds).
        sig: Signature header value, expected as ``v0=<hex digest>``.

    Returns:
        True only when a signing secret is configured, ``ts`` is within 300
        seconds of ``now_epoch()``, and ``sig`` matches the HMAC of
        ``v0:<ts>:<body>``. Every failure mode returns False.
    """
    if not CFG.SLACK_SIGNING_SECRET:
        return False
    if not ts or not sig:
        return False
    try:
        ts_int = int(ts)
    except (TypeError, ValueError):
        return False
    # Reject stale or future-dated requests to limit replay.
    if abs(now_epoch() - ts_int) > 300:
        return False
    # Sign the raw bytes directly: decoding an untrusted body first would raise
    # UnicodeDecodeError on non-UTF-8 input instead of rejecting the request.
    base = b"v0:" + ts.encode("utf-8") + b":" + raw_body
    digest = hmac.new(CFG.SLACK_SIGNING_SECRET.encode("utf-8"), base, hashlib.sha256).hexdigest()
    expected = f"v0={digest}"
    # Compare as bytes: compare_digest raises TypeError for str arguments that
    # contain non-ASCII characters, which an attacker-supplied header could.
    return hmac.compare_digest(expected.encode("utf-8"), sig.encode("utf-8"))
def call_slack_users_info(user_id):
    """Fetch a Slack user's record through ``users.info``.

    Returns:
        The response's ``user`` object (``{}`` if that key is absent), or
        None when no bot token is configured, Slack does not answer ``ok``,
        or the request/decoding raises.
    """
    if not CFG.SLACK_BOT_TOKEN:
        return None
    form_body = urllib.parse.urlencode({"user": user_id}).encode("utf-8")
    request = urllib.request.Request("https://slack.com/api/users.info", data=form_body, method="POST")
    request.add_header("Authorization", f"Bearer {CFG.SLACK_BOT_TOKEN}")
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            reply = json.loads(response.read().decode("utf-8"))
        return reply.get("user", {}) if reply.get("ok") else None
    except Exception:
        # Best-effort lookup: any failure is reported as "no user info".
        return None
def n8n_create_user(email):
if not CFG.N8N_API_KEY:
raise RuntimeError("missing N8N_API_KEY")
payload = [{"email": email}]
url = CFG.N8N_BASE_URL.rstrip("/") + CFG.N8N_USER_CREATE_PATH
req = urllib.request.Request(url, data=json.dumps(payload).encode("utf-8"), method="POST")
req.add_header("Content-Type", "application/json")
req.add_header("X-N8N-API-KEY", CFG.N8N_API_KEY)
try:
with urllib.request.urlopen(req, timeout=15) as resp:
out = json.loads(resp.read().decode("utf-8"))
if isinstance(out, list) and out:
user = out[0].get("user", {})
return {
"id": user.get("id") or "created",
"inviteAcceptUrl": user.get("inviteAcceptUrl"),
"created": True,
}
return {"id": "created", "inviteAcceptUrl": None, "created": True}
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="isend_onboarding_email function · python · L248-L289 (42 LOC)slack_n8n_provisioner.py
def send_onboarding_email(email, invite_url=None):
setup_link = invite_url or CFG.ONBOARDING_SETUP_LINK
if CFG.ONBOARDING_MODE == "setup_link":
body = (
"Welcome to n8n.\n\n"
"Your enterprise chat guest account has been provisioned.\n"
f"Please set up/sign in here: {setup_link}\n\n"
"If you cannot sign in, contact the administrator."
)
else:
temp_password = secrets.token_urlsafe(12)
body = (
"Welcome to n8n.\n\n"
f"Temporary password: {temp_password}\n"
"Please change it immediately after first login."
)
cmd = [
"gog",
"--account",
CFG.GOG_ACCOUNT,
"--no-input",
"gmail",
"send",
"--to",
email,
"--subject",
"n8n account onboarding",
"--body",
body,
"--plain",
]
env = os.environ.copy()
if CFG.GOG_KEYRING_PASSWORD:
env["GOG_KEYRsend_slack_dm function · python · L292-L320 (29 LOC)slack_n8n_provisioner.py
def send_slack_dm(slack_user_id, text):
if not CFG.SLACK_BOT_TOKEN:
return {"ok": False, "error": "missing_SLACK_BOT_TOKEN"}
if not slack_user_id:
return {"ok": False, "error": "missing_slack_user_id"}
# open DM
open_req = urllib.request.Request(
"https://slack.com/api/conversations.open",
data=urllib.parse.urlencode({"users": slack_user_id}).encode(),
method="POST",
)
open_req.add_header("Authorization", f"Bearer {CFG.SLACK_BOT_TOKEN}")
open_req.add_header("Content-Type", "application/x-www-form-urlencoded")
with urllib.request.urlopen(open_req, timeout=10) as r:
out = json.loads(r.read().decode("utf-8"))
if not out.get("ok"):
return {"ok": False, "error": out.get("error") or "conversations_open_failed", "needed": out.get("needed"), "provided": out.get("provided")}
channel_id = (out.get("channel") or {}).get("id")
if not channel_id:
return
msg_req = urllib.request.Request(
post_slack_message function · python · L323-L335 (13 LOC)slack_n8n_provisioner.py
def post_slack_message(channel, text):
    """Post ``text`` to ``channel`` via ``chat.postMessage`` and return Slack's decoded reply.

    Returns ``{"ok": False}`` without calling Slack when the bot token or
    ``channel`` is missing. Network and decoding errors propagate.
    """
    if not (CFG.SLACK_BOT_TOKEN and channel):
        return {"ok": False}
    body = json.dumps({"channel": channel, "text": text}).encode("utf-8")
    request = urllib.request.Request("https://slack.com/api/chat.postMessage", data=body, method="POST")
    request_headers = (
        ("Authorization", f"Bearer {CFG.SLACK_BOT_TOKEN}"),
        ("Content-Type", "application/json"),
    )
    for header_name, header_value in request_headers:
        request.add_header(header_name, header_value)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))
def find_slack_channel_id(channel_name):
    """Return the ID of the Slack channel named ``channel_name``, or None.

    Scans at most five pages of 200 channels from ``conversations.list``,
    following ``next_cursor``. Returns None when the token or name is
    missing, Slack answers not-ok, or no channel matches in those pages.
    """
    # Requires channels:read (and groups:read for private channels)
    if not (CFG.SLACK_BOT_TOKEN and channel_name):
        return None
    next_cursor = ""
    pages_left = 5
    while pages_left > 0:
        pages_left -= 1
        params = {"types": "public_channel,private_channel", "limit": "200"}
        if next_cursor:
            params["cursor"] = next_cursor
        request = urllib.request.Request(
            "https://slack.com/api/conversations.list?" + urllib.parse.urlencode(params)
        )
        request.add_header("Authorization", f"Bearer {CFG.SLACK_BOT_TOKEN}")
        with urllib.request.urlopen(request, timeout=10) as response:
            page = json.loads(response.read().decode("utf-8"))
        if not page.get("ok"):
            return None
        for entry in page.get("channels", []) or []:
            if entry.get("name") == channel_name:
                return entry.get("id")
        metadata = page.get("response_metadata") or {}
        next_cursor = (metadata.get("next_cursor") or "").strip()
        if not next_cursor:
            break
    return None
normalize_guest_name function · python · L363-L390 (28 LOC)slack_n8n_provisioner.py
def normalize_guest_name(safe_user, email, fallback_user_id):
profile = safe_user.get("profile", {}) if isinstance(safe_user, dict) else {}
# Prefer email local-part when available. Slack `users.info` may return
# masked placeholder display names/emails in some workspaces.
local = (email.split("@")[0] if email and "@" in email else "").strip()
if local:
# If the local-part has separators, take the last token (e.g.
# oc.slack.demo2 -> demo2) to get a stable, short slug.
import re
parts = [p for p in re.split(r"[^a-zA-Z0-9]+", local) if p]
if parts:
return parts[-1]
candidates = [
(profile.get("display_name") or "").strip(),
(profile.get("real_name") or "").strip(),
(safe_user.get("real_name") or "").strip() if isinstance(safe_user, dict) else "",
]
if local:
candidates.append(local)
if fallback_user_id:
candidates.append(str(fallback_user_id))
for c in ccall_full_onboarding function · python · L393-L406 (14 LOC)slack_n8n_provisioner.py
def call_full_onboarding(payload):
    """POST ``payload`` as JSON to the full-onboarding webhook and return its JSON reply.

    A bearer ``Authorization`` header is sent when
    ``CFG.FULL_ONBOARDING_TOKEN`` is set. Uses a 60 second timeout.

    Returns:
        The decoded JSON response, or ``{}`` when the response body is empty.

    Raises:
        RuntimeError: If the webhook URL is not configured, or the webhook
            answers with an HTTP error (status and the first 300 characters
            of the body are included in the message).
    """
    if not CFG.FULL_ONBOARDING_WEBHOOK_URL:
        raise RuntimeError("missing FULL_ONBOARDING_WEBHOOK_URL")
    req = urllib.request.Request(CFG.FULL_ONBOARDING_WEBHOOK_URL, data=json.dumps(payload).encode("utf-8"), method="POST")
    req.add_header("Content-Type", "application/json")
    if CFG.FULL_ONBOARDING_TOKEN:
        req.add_header("Authorization", f"Bearer {CFG.FULL_ONBOARDING_TOKEN}")
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="ignore")
        raise RuntimeError(f"full_onboarding_failed: {e.code} {body[:300]}") from e
    # An HTTP response object always reports readable(), so checking it never
    # selected the {} fallback; test the body itself so an empty reply
    # (e.g. 204) yields {} instead of a JSON decode error.
    text = raw.decode("utf-8").strip()
    return json.loads(text) if text else {}
def full_onboarding_status(provider, external_user_id):
    """Return the ``status`` recorded in ``full_onboarding_runs`` for this user, or None if no row exists."""
    query = "SELECT status FROM full_onboarding_runs WHERE provider=? AND external_user_id=? LIMIT 1"
    matches = db_exec(query, (provider, external_user_id), fetch=True)
    if not matches:
        return None
    return matches[0]["status"]
def upsert_full_onboarding(provider, external_user_id, status, detail=""):
    """Insert or update the ``full_onboarding_runs`` row for ``(provider, external_user_id)``.

    ``requested_at`` is set from ``now_epoch()`` and ``detail`` is truncated
    to 500 characters before being stored.
    """
    upsert_sql = """
        INSERT INTO full_onboarding_runs (provider, external_user_id, requested_at, status, detail)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(provider, external_user_id) DO UPDATE SET
        requested_at=excluded.requested_at,
        status=excluded.status,
        detail=excluded.detail
        """
    values = (provider, external_user_id, now_epoch(), status, detail[:500])
    db_exec(upsert_sql, values)
def offboarding_status(provider, external_user_id):
    """Return the ``status`` recorded in ``offboarding_runs`` for this user, or None if no row exists."""
    query = "SELECT status FROM offboarding_runs WHERE provider=? AND external_user_id=? LIMIT 1"
    matches = db_exec(query, (provider, external_user_id), fetch=True)
    if not matches:
        return None
    return matches[0]["status"]
def upsert_offboarding(provider, external_user_id, status, detail=""):
    """Insert or update the ``offboarding_runs`` row for ``(provider, external_user_id)``.

    ``requested_at`` is set from ``now_epoch()`` and ``detail`` is truncated
    to 500 characters before being stored.
    """
    upsert_sql = """
        INSERT INTO offboarding_runs (provider, external_user_id, requested_at, status, detail)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(provider, external_user_id) DO UPDATE SET
        requested_at=excluded.requested_at,
        status=excluded.status,
        detail=excluded.detail
        """
    values = (provider, external_user_id, now_epoch(), status, detail[:500])
    db_exec(upsert_sql, values)
def call_offboarding(payload):
    """POST ``payload`` as JSON to the offboarding webhook and return its JSON reply.

    A bearer ``Authorization`` header is sent when ``CFG.OFFBOARDING_TOKEN``
    is set. Uses a 60 second timeout.

    Returns:
        The decoded JSON response, or ``{}`` when the response body is empty.

    Raises:
        RuntimeError: If the webhook URL is not configured, or the webhook
            answers with an HTTP error (status and the first 300 characters
            of the body are included in the message).
    """
    if not CFG.OFFBOARDING_WEBHOOK_URL:
        raise RuntimeError("missing OFFBOARDING_WEBHOOK_URL")
    req = urllib.request.Request(CFG.OFFBOARDING_WEBHOOK_URL, data=json.dumps(payload).encode("utf-8"), method="POST")
    req.add_header("Content-Type", "application/json")
    if CFG.OFFBOARDING_TOKEN:
        req.add_header("Authorization", f"Bearer {CFG.OFFBOARDING_TOKEN}")
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        body = e.read().decode("utf-8", errors="ignore")
        raise RuntimeError(f"offboarding_failed: {e.code} {body[:300]}") from e
    # An HTTP response object always reports readable(), so checking it never
    # selected the {} fallback; test the body itself so an empty reply
    # (e.g. 204) yields {} instead of a JSON decode error.
    text = raw.decode("utf-8").strip()
    return json.loads(text) if text else {}
def domain_allowed(email):
    """Return whether ``email``'s domain passes the ``CFG.ALLOWED_EMAIL_DOMAINS`` allow-list.

    An empty allow-list permits every address. Otherwise the text after the
    last ``@`` is trimmed and lowercased before the membership test, because
    email domains are case-insensitive.
    """
    if not CFG.ALLOWED_EMAIL_DOMAINS:
        return True
    # NOTE(review): assumes the allow-list entries are stored lowercased —
    # confirm against the Config definition.
    domain = email.rsplit("@", 1)[-1].strip().lower()
    return domain in CFG.ALLOWED_EMAIL_DOMAINS
process_slack_event function · python · L478-L648 (171 LOC)slack_n8n_provisioner.py
def process_slack_event(row):
payload = json.loads(row["payload"])
team_id = payload.get("team_id") or payload.get("team", {}).get("id")
event = payload.get("event", {})
user = event.get("user", event)
slack_user_id = user.get("id")
event_type = event.get("type")
# Some Slack event payloads include email in the embedded user profile.
# Prefer this value when present because `users.info` may return masked
# placeholder emails depending on workspace/app permissions.
payload_email = ((user.get("profile") or {}).get("email") or "").strip().lower()
if not slack_user_id:
db_exec("UPDATE events SET status='denied', reason=? WHERE id=?", ("missing_user_id", row["id"]))
return
if CFG.ALLOWED_SLACK_TEAM_IDS and team_id not in CFG.ALLOWED_SLACK_TEAM_IDS:
upsert_mapping("slack", slack_user_id, team_id, None, None, "denied", "team_not_allowed")
db_exec("UPDATE events SET status='denied', reason=? WHERE id=?", ("tprocess_teams_event function · python · L651-L707 (57 LOC)slack_n8n_provisioner.py
def process_teams_event(row):
payload = json.loads(row["payload"])
items = payload.get("value", []) if isinstance(payload, dict) else []
if not items:
db_exec("UPDATE events SET status='denied', reason=? WHERE id=?", ("missing_items", row["id"]))
return
# Process first valid event in this notification batch.
item = items[0]
tenant_id = item.get("tenantId") or item.get("organizationId") or ""
if CFG.ALLOWED_TEAMS_TENANT_IDS and tenant_id not in CFG.ALLOWED_TEAMS_TENANT_IDS:
db_exec("UPDATE events SET status='denied', reason=? WHERE id=?", ("tenant_not_allowed", row["id"]))
return
resource = item.get("resourceData", {}) if isinstance(item.get("resourceData"), dict) else {}
external_user_id = resource.get("id") or item.get("resource") or "unknown"
user_type = (resource.get("userType") or "").strip().lower()
if CFG.TEAMS_REQUIRE_GUEST_ONLY and user_type and user_type != "guest":
upsert_mapping("teams", exprocess_event function · python · L710-L717 (8 LOC)slack_n8n_provisioner.py
def process_event(row):
    """Route a queued event row to the handler for its ``provider``.

    ``slack`` and ``teams`` rows go to their processors; any other provider
    is marked ``denied`` with reason ``unknown_provider``.
    """
    source = row["provider"]
    if source == "slack":
        process_slack_event(row)
        return
    if source == "teams":
        process_teams_event(row)
        return
    db_exec("UPDATE events SET status='denied', reason=? WHERE id=?", ("unknown_provider", row["id"]))