Function bodies 110 total
Tray function · typescript · L12-L65 (54 LOC)ags/widget/Tray.ts
export default function Tray() {
const tray = AstalTray.get_default()
const children = bind(tray, "items").as(items => {
return items.map(item => {
if (item.icon_theme_path !== null) {
App.add_icons(item.icon_theme_path)
}
let menu: Gtk.PopoverMenu;
const entryBinding = Variable.derive(
[bind(item, 'menuModel'), bind(item, 'actionGroup')],
(menuModel, actionGroup) => {
if (!menuModel) {
return console.error(`Menu Model not found for ${item.id}`);
}
if (!actionGroup) {
return console.error(`Action Group not found for ${item.id}`);
}
menu = createMenu(menuModel, actionGroup);
},
);
const button = Widget.Button({
tooltipMarkup: bind(item, "tooltipMarkup"),
onClicked: () => {
item.activate(0, 0);
},
}, Widget.Image(
{
gicon: bind(item, "gicon"),
}
))
// Add rigVolume function · typescript · L6-L42 (37 LOC)ags/widget/Volume.ts
export default function Volume() {
const speaker = Wp.get_default()?.audio.defaultSpeaker!
const muted = bind(speaker, "mute")
return Widget.Box({
css_classes: ["volume-widget"],
children: [
Widget.Button({
css_classes: bind(muted).as(m => m ? ["muted"] : []),
child: Widget.Image({
iconName: bind(muted).as(m =>
m ? "audio-volume-muted-symbolic" : "audio-volume-high-symbolic"
),
}),
onClicked: () => {
speaker.mute = !speaker.mute
},
}),
Widget.Button({
className: "volume-button-minus",
child: Widget.Image({ iconName: "list-remove-symbolic" }), // Use minus icon
onClicked: () => {
speaker.volume = Math.max(0, speaker.volume - 0.05); // Decrease by 5%
},
}),
Widget.Button({
className: "volume-button-plus",
child: Widget.Image({ iconName: "list-add-symbolic" }), // Use plus icon
onClicked: () => {
question function · javascript · L14-L18 (5 LOC)kitty/generate-kitty-session.js
function question(prompt) {
return new Promise((resolve) => {
rl.question(prompt, resolve);
});
}expandPath function · javascript · L20-L26 (7 LOC)kitty/generate-kitty-session.js
function expandPath(filePath) {
if (filePath.startsWith('~/')) {
return path.join(os.homedir(), filePath.slice(2));
}
// Resolve relative paths (like . or ..) to absolute
return path.resolve(filePath);
}validateDirectory function · javascript · L28-L42 (15 LOC)kitty/generate-kitty-session.js
function validateDirectory(dirPath) {
const expanded = expandPath(dirPath);
try {
const stats = fs.statSync(expanded);
if (!stats.isDirectory()) {
return { valid: false, error: `Path exists but is not a directory: ${dirPath}` };
}
return { valid: true, expanded };
} catch (error) {
if (error.code === 'ENOENT') {
return { valid: false, error: `Directory does not exist: ${dirPath}` };
}
return { valid: false, error: `Cannot access directory: ${error.message}` };
}
}generateSession function · javascript · L44-L81 (38 LOC)kitty/generate-kitty-session.js
function generateSession(projectPath) {
// Extract directory name for session name
const dirName = path.basename(projectPath);
// Generate the session content based on the silence template
const sessionContent = `
new_tab
layout fat
enabled_layouts fat,grid,horizontal,splits,stack,tall,vertical
set_layout_state {"main_bias": [0.5, 0.5], "biased_map": {}, "opts": {"full_size": 1, "bias": 50, "mirrored": "n"}, "class": "Fat", "all_windows": {"active_group_idx": 0, "active_group_history": [1], "window_groups": [{"id": 1, "window_ids": [1]}]}}
cd ${projectPath}
launch 'kitty-unserialize-data={"id": 1, "cmd_at_shell_startup": "nvim"}'
focus
new_tab
layout fat
enabled_layouts fat,grid,horizontal,splits,stack,tall,vertical
set_layout_state {"main_bias": [0.5, 0.5], "biased_map": {}, "opts": {"full_size": 1, "bias": 50, "mirrored": "n"}, "class": "Fat", "all_windows": {"active_group_idx": 0, "active_group_history": [2], "window_groups": [{"id": 2, "window_ids": [2]}]}}
cd ${projectPmain function · javascript · L83-L162 (80 LOC)kitty/generate-kitty-session.js
async function main() {
// Parse command-line arguments
const args = process.argv.slice(2);
const spawnFlag = args.includes('--spawn');
const localFlag = args.includes('--local');
const pathArgs = args.filter(arg => !arg.startsWith('--'));
let projectPath;
if (pathArgs.length > 0) {
projectPath = pathArgs[0];
} else {
console.log('Kitty Session Generator\n');
projectPath = await question('Enter project path (e.g., ~/projects/myproject): ');
}
if (!projectPath.trim()) {
console.error('Error: Project path cannot be empty');
rl.close();
process.exit(1);
}
// Validate that the directory exists
const validation = validateDirectory(projectPath.trim());
if (!validation.valid) {
console.error(`\nError: ${validation.error}`);
rl.close();
process.exit(1);
}
// Use the expanded (absolute) path for session generation
const { dirName, sessionContent } = generateSession(validation.expanded);
const configKittyDir = path.joinRepobility · severity-and-effort ranking · https://repobility.com
expandPath function · go · L43-L52 (10 LOC)kitty/session-generator/main.go
func expandPath(p string) (string, error) {
if strings.HasPrefix(p, "~/") {
home, err := os.UserHomeDir()
if err != nil {
return "", err
}
return filepath.Join(home, p[2:]), nil
}
return filepath.Abs(p)
}validateDirectory function · go · L54-L73 (20 LOC)kitty/session-generator/main.go
func validateDirectory(p string) (string, error) {
expanded, err := expandPath(p)
if err != nil {
return "", fmt.Errorf("cannot expand path: %w", err)
}
info, err := os.Stat(expanded)
if err != nil {
if os.IsNotExist(err) {
return "", fmt.Errorf("directory does not exist: %s", p)
}
return "", fmt.Errorf("cannot access directory: %w", err)
}
if !info.IsDir() {
return "", fmt.Errorf("path exists but is not a directory: %s", p)
}
return expanded, nil
}generateSession function · go · L75-L77 (3 LOC)kitty/session-generator/main.go
func generateSession(projectPath string) string {
return fmt.Sprintf(sessionTemplate, projectPath, projectPath, projectPath)
}prompt function · go · L79-L84 (6 LOC)kitty/session-generator/main.go
func prompt(message string) string {
fmt.Print(message)
reader := bufio.NewReader(os.Stdin)
input, _ := reader.ReadString('\n')
return strings.TrimSpace(input)
}main function · go · L86-L186 (101 LOC)kitty/session-generator/main.go
func main() {
spawnFlag := flag.Bool("spawn", false, "Spawn kitty with the session after creation")
localFlag := flag.Bool("local", false, "Write directly to ~/.config/kitty/ instead of dotfiles")
flag.Parse()
args := flag.Args()
var projectPath string
if len(args) > 0 {
projectPath = args[0]
} else {
fmt.Println("Kitty Session Generator\n")
projectPath = prompt("Enter project path (e.g., ~/projects/myproject): ")
}
if projectPath == "" {
fmt.Fprintln(os.Stderr, "Error: Project path cannot be empty")
os.Exit(1)
}
expanded, err := validateDirectory(projectPath)
if err != nil {
fmt.Fprintf(os.Stderr, "\nError: %v\n", err)
os.Exit(1)
}
dirName := filepath.Base(expanded)
sessionContent := generateSession(expanded)
home, err := os.UserHomeDir()
if err != nil {
fmt.Fprintf(os.Stderr, "Error: cannot get home directory: %v\n", err)
os.Exit(1)
}
configKittyDir := filepath.Join(home, ".config", "kitty")
sessionPath := filepath.Join(configKittyDir, dirColors class · python · L14-L18 (5 LOC)scripts/adb-auto-connect.py
class Colors:
RED = '\033[0;31m'
GREEN = '\033[0;32m'
BLUE = '\033[0;34m'
NC = '\033[0m'run_command function · python · L21-L33 (13 LOC)scripts/adb-auto-connect.py
def run_command(cmd: List[str], capture=True) -> tuple[bool, str]:
"""Run a command and return success status and output"""
try:
if capture:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
return result.returncode == 0, result.stdout + result.stderr
else:
result = subprocess.run(cmd, timeout=10)
return result.returncode == 0, ""
except subprocess.TimeoutExpired:
return False, "Command timed out"
except Exception as e:
return False, str(e)get_mdns_devices function · python · L36-L56 (21 LOC)scripts/adb-auto-connect.py
def get_mdns_devices() -> List[Dict[str, str]]:
"""Get list of devices from mDNS discovery"""
success, output = run_command(["adb", "mdns", "services"])
if not success:
print(f"{Colors.RED}Failed to query mDNS services{Colors.NC}")
sys.exit(1)
devices = []
for line in output.strip().split('\n'):
if '_adb-tls-connect._tcp' in line:
parts = line.split('\t')
if len(parts) >= 3:
service_name = parts[0].strip()
address = parts[2].strip()
devices.append({
'service_name': service_name,
'address': address
})
return devicesSame scanner, your repo: https://repobility.com — Repobility
deduplicate_devices function · python · L59-L89 (31 LOC)scripts/adb-auto-connect.py
def deduplicate_devices(devices: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Deduplicate devices by service name, always selecting the address with the smallest port"""
seen = {}
all_addresses = {}
for device in devices:
service = device['service_name']
# Remove (n) suffix if present
clean_service = re.sub(r' \(\d+\)$', '', service)
# Collect all addresses for this service
if clean_service not in all_addresses:
all_addresses[clean_service] = []
all_addresses[clean_service].append(device['address'])
# For each unique service, select the address with the smallest port
result = []
for clean_service, addresses in all_addresses.items():
# Sort addresses by port number (extract port from IP:PORT)
sorted_addresses = sorted(addresses, key=lambda addr: int(addr.split(':')[1]))
# Select the one with the smallest port
selected_address = sorted_addresses[0]
resget_device_info function · python · L92-L119 (28 LOC)scripts/adb-auto-connect.py
def get_device_info(address: str) -> Optional[Dict[str, str]]:
"""Try to get device information after connecting"""
# Try to connect
success, _ = run_command(["adb", "connect", address])
if not success:
return None
# Give it a moment to establish connection
import time
time.sleep(1)
# Try to get device properties
info = {}
success, model = run_command(["adb", "-s", address, "shell", "getprop", "ro.product.model"])
if success:
info['model'] = model.strip()
success, manufacturer = run_command(["adb", "-s", address, "shell", "getprop", "ro.product.manufacturer"])
if success:
info['manufacturer'] = manufacturer.strip()
success, device_name = run_command(["adb", "-s", address, "shell", "settings", "get", "global", "device_name"])
if success and device_name.strip() and device_name.strip() != "null":
info['device_name'] = device_name.strip()
return info if info else Noneformat_device_name function · python · L122-L130 (9 LOC)scripts/adb-auto-connect.py
def format_device_name(device: Dict[str, str], info: Optional[Dict[str, str]]) -> str:
"""Format a display name for the device"""
if info:
if 'device_name' in info:
return f"{info['device_name']} ({info.get('manufacturer', '')} {info.get('model', '')})"
elif 'model' in info:
return f"{info.get('manufacturer', '')} {info['model']}"
return device['service_name']connect_device function · python · L133-L138 (6 LOC)scripts/adb-auto-connect.py
def connect_device(address: str) -> bool:
"""Connect to a device"""
success, output = run_command(["adb", "connect", address])
print(output)
return "connected" in output.lower()pair_device function · python · L141-L146 (6 LOC)scripts/adb-auto-connect.py
def pair_device(ip: str, port: str, code: str) -> bool:
"""Pair with a device using pairing code"""
print(f"Pairing with {ip}:{port}...")
success, output = run_command(["adb", "pair", f"{ip}:{port}", code])
print(output)
return successmain function · python · L149-L234 (86 LOC)scripts/adb-auto-connect.py
def main():
print(f"{Colors.BLUE}Discovering Android devices via mDNS...{Colors.NC}")
devices = get_mdns_devices()
if not devices:
print(f"{Colors.RED}No devices found via mDNS.{Colors.NC}")
print("Make sure:")
print(" 1. Wireless Debugging is enabled on your Android device")
print(" 2. Your device and computer are on the same network")
print(" 3. Your device is Android 11+ with mDNS support")
sys.exit(1)
# Deduplicate
unique_devices = deduplicate_devices(devices)
print(f"{Colors.GREEN}Found {len(unique_devices)} unique device(s){Colors.NC}\n")
# Select device
if len(unique_devices) == 1:
selected = unique_devices[0]
print(f"{Colors.GREEN}Device:{Colors.NC} {Colors.BLUE}{selected['service_name']}{Colors.NC}")
print(f"{Colors.GREEN}Address:{Colors.NC} {Colors.BLUE}{selected['address']}{Colors.NC}")
# Show all addresses if there are duplicates
if len(selected.gmain function · python · L15-L66 (52 LOC)scripts/ai.py
def main():
parser = argparse.ArgumentParser(description="Query ai.dimalip.in")
parser.add_argument("text", nargs="*", help="Prompt text (or pipe via stdin)")
parser.add_argument("-m", "--model", default="default", help="Model name (default: default)")
parser.add_argument("-v", "--verbose", action="store_true", help="Show network logs")
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("httpx").setLevel(logging.DEBUG)
logging.getLogger("httpcore").setLevel(logging.DEBUG)
api_key = os.environ.get("CLIPROXYAPI_API_KEY") or (os.environ.get("CLIPROXYAPI_API_KEYS") or "").split(",")[0]
if not api_key:
print("Error: set CLIPROXYAPI_API_KEY or CLIPROXYAPI_API_KEYS env var", file=sys.stderr)
sys.exit(1)
if args.text:
text = " ".join(args.text)
elif not sys.stdin.isatty():
text = sys.stdin.read().strip()
else:
print("Error: provide tlog function · typescript · L10-L12 (3 LOC)scripts/bt-capture-setup.ts
function log(msg: string): void {
console.log(`[bt-capture] ${msg}`);
}Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
logVerbose function · typescript · L14-L16 (3 LOC)scripts/bt-capture-setup.ts
function logVerbose(msg: string): void {
if (VERBOSE) console.log(`[bt-capture] ${msg}`);
}logError function · typescript · L18-L20 (3 LOC)scripts/bt-capture-setup.ts
function logError(msg: string): void {
console.error(`[bt-capture] ERROR: ${msg}`);
}run function · typescript · L22-L28 (7 LOC)scripts/bt-capture-setup.ts
function run(cmd: string, { quiet = false } = {}): string {
logVerbose(`exec: ${cmd}`);
const result = execSync(cmd, { encoding: "utf-8" }).trim();
if (result && !quiet) log(`output: ${result}`);
if (result && quiet) logVerbose(`output: ${result}`);
return result;
}tryRun function · typescript · L30-L37 (8 LOC)scripts/bt-capture-setup.ts
function tryRun(cmd: string, opts?: { quiet?: boolean }): string | null {
try {
return run(cmd, opts);
} catch (e) {
logError(`command failed: ${cmd} — ${e instanceof Error ? e.message : e}`);
return null;
}
}promptChoice function · typescript · L39-L60 (22 LOC)scripts/bt-capture-setup.ts
function promptChoice(addrs: string[]): string {
console.log("\nMultiple BT devices found:");
for (let i = 0; i < addrs.length; i++) {
console.log(` ${i + 1}) ${addrs[i]}`);
}
process.stdout.write("\nSelect device [1-9]: ");
const buf = Buffer.alloc(1);
const fd = openSync("/dev/tty", "r");
readSync(fd, buf, 0, 1, null);
closeSync(fd);
const ch = buf.toString("utf-8").trim();
const idx = parseInt(ch, 10) - 1;
if (isNaN(idx) || idx < 0 || idx >= addrs.length) {
logError(`Invalid selection: "${ch}"`);
process.exit(1);
}
log(`Selected device ${idx + 1}: ${addrs[idx]}`);
return addrs[idx];
}sinkExists function · typescript · L86-L92 (7 LOC)scripts/bt-capture-setup.ts
function sinkExists(): boolean {
log("Checking if sink already exists...");
const out = tryRun("pactl list sinks short");
const exists = out?.includes(SINK_NAME) ?? false;
log(`Sink "${SINK_NAME}" exists: ${exists}`);
return exists;
}createSink function · typescript · L94-L104 (11 LOC)scripts/bt-capture-setup.ts
function createSink(): void {
if (sinkExists()) {
log(`Sink "${SINK_NAME}" already exists, reusing.`);
return;
}
log(`Creating null sink "${SINK_NAME}"...`);
run(
`pactl load-module module-null-sink sink_name=${SINK_NAME} sink_properties=device.description=${SINK_DESC}`
);
log(`Created null sink "${SINK_NAME}".`);
}waitForPorts function · typescript · L106-L126 (21 LOC)scripts/bt-capture-setup.ts
function waitForPorts(
nodeNames: string[],
{ timeout = 5000, interval = 200 } = {}
): boolean {
log(`Waiting for ports to appear for: ${nodeNames.join(", ")} (timeout ${timeout}ms)...`);
const deadline = Date.now() + timeout;
while (Date.now() < deadline) {
const lines = tryRun("pw-link -o", { quiet: true }) ?? "";
const allFound = nodeNames.every((name) =>
lines.split("\n").some((l) => l.startsWith(`${name}:`))
);
if (allFound) {
log("All ports registered.");
return true;
}
log(`Ports not ready yet, retrying in ${interval}ms...`);
execSync(`sleep ${interval / 1000}`);
}
logError(`Timed out waiting for ports after ${timeout}ms.`);
return false;
}Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
link function · typescript · L128-L134 (7 LOC)scripts/bt-capture-setup.ts
function link(src: string, dst: string): void {
log(`Linking: ${src} -> ${dst}`);
const result = tryRun(`pw-link "${src}" "${dst}"`);
if (result === null) {
log(`Link may already exist (pw-link returned non-zero), continuing.`);
}
}getOutputPorts function · typescript · L136-L142 (7 LOC)scripts/bt-capture-setup.ts
function getOutputPorts(nodeName: string): string[] {
log(`Getting output ports for "${nodeName}"...`);
const lines = run("pw-link -o", { quiet: true });
const ports = lines.split("\n").filter((l) => l.startsWith(`${nodeName}:`));
log(`Found ${ports.length} port(s) for "${nodeName}": ${ports.join(", ") || "none"}`);
return ports;
}setupLinks function · typescript · L144-L175 (32 LOC)scripts/bt-capture-setup.ts
function setupLinks(device: { input: string; output: string }): void {
log("Setting up PipeWire links...");
const inputPorts = getOutputPorts(device.input);
const allOutputPorts = getOutputPorts(device.output);
const monitorPorts = allOutputPorts.filter((p) => p.includes("monitor_"));
log(`Input ports (mic): ${inputPorts.length}`);
log(`Output ports (headphone): ${allOutputPorts.length} total, ${monitorPorts.length} monitor`);
if (inputPorts.length === 0) {
logError("No input (mic) ports found — mic capture won't work.");
}
if (monitorPorts.length === 0) {
logError("No monitor ports found — headphone capture won't work.");
}
// Link mic -> combined sink (mono mic goes to both channels)
for (const port of inputPorts) {
link(port, `${SINK_NAME}:playback_FL`);
link(port, `${SINK_NAME}:playback_FR`);
log(`Linked mic ${port} -> ${SINK_NAME}:playback_FL/FR`);
}
// Link headphone monitor -> combined sink
for (const port of monitorPorts) {
startRecording function · typescript · L177-L202 (26 LOC)scripts/bt-capture-setup.ts
function startRecording(outFile: string): void {
log(`Starting recording to ${outFile}...`);
log(`Target: ${SINK_NAME}.monitor`);
console.log("Press Ctrl+C to stop.\n");
const proc = spawn(
"pw-record",
["--target", `${SINK_NAME}.monitor`, outFile],
{ stdio: "inherit" }
);
proc.on("error", (err) => {
logError(`Failed to start pw-record: ${err.message}`);
});
process.on("SIGINT", () => {
log("Received SIGINT, stopping recording...");
proc.kill("SIGINT");
});
proc.on("exit", (code, signal) => {
log(`pw-record exited with code=${code}, signal=${signal}`);
console.log(`\nRecording saved to ${outFile}`);
process.exit(code ?? 0);
});
}parseArgs function · typescript · L227-L252 (26 LOC)scripts/bt-capture-setup.ts
function parseArgs(argv: string[]): Command {
if (argv.includes("-h") || argv.includes("--help")) {
return { kind: "help" };
}
if (argv.includes("teardown") || argv.includes("--teardown")) {
return { kind: "teardown" };
}
if (argv.includes("status") || argv.includes("--status")) {
return { kind: "status" };
}
const cmd: Command = { kind: "setup" };
const recordIdx = argv.indexOf("--record");
if (recordIdx !== -1) {
const outFile = argv[recordIdx + 1];
if (!outFile || outFile.startsWith("-")) {
logError("--record requires an output file path");
process.exit(1);
}
cmd.record = outFile;
}
return cmd;
}status function · typescript · L272-L302 (31 LOC)scripts/bt-capture-setup.ts
function status(): void {
log("Checking status...");
// Sink
const hasSink = sinkExists();
console.log(`\nSink "${SINK_NAME}": ${hasSink ? "ACTIVE" : "NOT FOUND"}`);
// BT device
const device = findBluezDevice();
if (device) {
console.log(`BT device input: ${device.input}`);
console.log(`BT device output: ${device.output}`);
} else {
console.log("BT device: NOT FOUND");
}
// Links involving our sink
const allLinks = getLinks();
const sinkLinks = allLinks.filter(
(l) => l.src.includes(SINK_NAME) || l.dst.includes(SINK_NAME)
);
if (sinkLinks.length > 0) {
console.log(`\nActive links (${sinkLinks.length}):`);
for (const l of sinkLinks) {
console.log(` ${l.src} -> ${l.dst}`);
}
} else {
console.log("\nNo active links to sink.");
}
}teardown function · typescript · L304-L316 (13 LOC)scripts/bt-capture-setup.ts
function teardown(): void {
log("Teardown requested...");
const out = tryRun("pactl list modules short");
const line = out?.split("\n").find((l) => l.includes(SINK_NAME));
if (line) {
const moduleId = line.split("\t")[0];
log(`Found module ${moduleId}, unloading...`);
run(`pactl unload-module ${moduleId}`);
log(`Removed sink "${SINK_NAME}".`);
} else {
log(`Sink "${SINK_NAME}" not found, nothing to remove.`);
}
}setup function · typescript · L318-L345 (28 LOC)scripts/bt-capture-setup.ts
function setup(record?: string): void {
log("Starting BT capture setup...");
const device = findBluezDevice();
if (!device) {
logError("No Bluetooth audio device found. Is it connected?");
process.exit(1);
}
log(`Found BT device — input: ${device.input}, output: ${device.output}`);
createSink();
if (!waitForPorts([SINK_NAME, device.input, device.output])) {
logError("Required ports did not appear in time. Aborting.");
process.exit(1);
}
setupLinks(device);
log("Setup complete.");
console.log(`\nDone. Select "${SINK_DESC}" monitor as recording source in Audacity.`);
console.log(`Or record from CLI: parecord -d ${SINK_NAME}.monitor --file-format=wav output.wav`);
if (record) {
startRecording(record);
}
}Repobility · severity-and-effort ranking · https://repobility.com
main function · typescript · L347-L367 (21 LOC)scripts/bt-capture-setup.ts
function main(): void {
const argv = process.argv.slice(2);
VERBOSE = argv.includes("-v") || argv.includes("--verbose");
const cmd = parseArgs(argv);
log(`Command: ${cmd.kind}`);
switch (cmd.kind) {
case "help":
console.log(USAGE);
break;
case "teardown":
teardown();
break;
case "status":
status();
break;
case "setup":
setup(cmd.record);
break;
}
}_CallbackHandler class · python · L21-L32 (12 LOC)spotify/spotify.py
class _CallbackHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
global auth_code
params = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
auth_code = params.get("code", [None])[0]
self.send_response(200 if auth_code else 400)
self.send_header("Content-Type", "text/html")
self.end_headers()
self.wfile.write(b"<h1>Success! You can close this tab.</h1>" if auth_code else b"Authorization failed.")
def log_message(self, *_):
passdo_GET method · python · L22-L29 (8 LOC)spotify/spotify.py
def do_GET(self):
global auth_code
params = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
auth_code = params.get("code", [None])[0]
self.send_response(200 if auth_code else 400)
self.send_header("Content-Type", "text/html")
self.end_headers()
self.wfile.write(b"<h1>Success! You can close this tab.</h1>" if auth_code else b"Authorization failed.")_token_request function · python · L35-L38 (4 LOC)spotify/spotify.py
def _token_request(data: dict) -> dict:
resp = httpx.post("https://accounts.spotify.com/api/token", data=data, auth=(CLIENT_ID, CLIENT_SECRET))
resp.raise_for_status()
return resp.json()_save_token function · python · L41-L43 (3 LOC)spotify/spotify.py
def _save_token(data):
with open(TOKEN_FILE, "w") as f:
json.dump(data, f)_load_token function · python · L46-L51 (6 LOC)spotify/spotify.py
def _load_token():
try:
with open(TOKEN_FILE) as f:
return json.load(f)
except FileNotFoundError:
return Noneauthenticate function · python · L54-L88 (35 LOC)spotify/spotify.py
def authenticate() -> httpx.Client:
"""Authenticate and return an httpx.Client with auth headers set."""
global auth_code
saved = _load_token()
if saved and "refresh_token" in saved:
print("Refreshing saved token...")
try:
token_data = _token_request({"grant_type": "refresh_token", "refresh_token": saved["refresh_token"]})
token_data.setdefault("refresh_token", saved["refresh_token"])
_save_token(token_data)
print(f"Authorized (cached). Scopes: {token_data.get('scope', 'none')}")
return _client(token_data["access_token"])
except httpx.HTTPStatusError:
print("Refresh failed, re-authenticating...")
auth_url = (
f"https://accounts.spotify.com/authorize?"
f"client_id={CLIENT_ID}&response_type=code&redirect_uri={urllib.parse.quote(REDIRECT_URI)}"
f"&scope={urllib.parse.quote(SCOPES)}&show_dialog=true"
)
print("Opening browser for Spotify autho_client function · python · L91-L95 (5 LOC)spotify/spotify.py
def _client(token: str) -> httpx.Client:
return httpx.Client(
base_url="https://api.spotify.com/v1",
headers={"Authorization": f"Bearer {token}"},
)Same scanner, your repo: https://repobility.com — Repobility
search_track function · python · L98-L106 (9 LOC)spotify/spotify.py
def search_track(client: httpx.Client, artist: str, title: str):
for q in [f"artist:{artist} track:{title}", f"{artist} {title}"]:
resp = client.get("/search", params={"q": q, "type": "track", "limit": 5})
resp.raise_for_status()
tracks = resp.json().get("tracks", {}).get("items", [])
if tracks:
t = tracks[0]
return t["uri"], t["name"], t["artists"][0]["name"]
return None, None, Nonecreate_playlist function · python · L109-L113 (5 LOC)spotify/spotify.py
def create_playlist(client: httpx.Client, name: str, description: str = "", public: bool = False):
resp = client.post("/me/playlists", json={"name": name, "description": description, "public": public})
resp.raise_for_status()
playlist = resp.json()
return playlist["id"], playlist["external_urls"]["spotify"]add_tracks function · python · L116-L118 (3 LOC)spotify/spotify.py
def add_tracks(client: httpx.Client, playlist_id: str, uris: list[str]):
resp = client.post(f"/playlists/{playlist_id}/items", json={"uris": uris})
resp.raise_for_status()