← back to festeh__dotfiles

Function bodies 110 total

All specs Real LLM only Function bodies
Tray function · typescript · L12-L65 (54 LOC)
ags/widget/Tray.ts
export default function Tray() {
  const tray = AstalTray.get_default()
  const children = bind(tray, "items").as(items => {
    return items.map(item => {
      if (item.icon_theme_path !== null) {
        App.add_icons(item.icon_theme_path)
      }
      let menu: Gtk.PopoverMenu;

      const entryBinding = Variable.derive(
        [bind(item, 'menuModel'), bind(item, 'actionGroup')],
        (menuModel, actionGroup) => {
          if (!menuModel) {
            return console.error(`Menu Model not found for ${item.id}`);
          }
          if (!actionGroup) {
            return console.error(`Action Group not found for ${item.id}`);
          }

          menu = createMenu(menuModel, actionGroup);
        },
      );

      const button = Widget.Button({
        tooltipMarkup: bind(item, "tooltipMarkup"),
        onClicked: () => {
          item.activate(0, 0);
        },

      }, Widget.Image(
        {
          gicon: bind(item, "gicon"),
        }
      ))

      // Add rig
Volume function · typescript · L6-L42 (37 LOC)
ags/widget/Volume.ts
export default function Volume() {
  const speaker = Wp.get_default()?.audio.defaultSpeaker!
  const muted = bind(speaker, "mute")
  return Widget.Box({
    css_classes: ["volume-widget"],
    children: [
      Widget.Button({
        css_classes: bind(muted).as(m => m ? ["muted"] : []),
        child: Widget.Image({
          iconName: bind(muted).as(m =>
            m ? "audio-volume-muted-symbolic" : "audio-volume-high-symbolic"
          ),
        }),
        onClicked: () => {
          speaker.mute = !speaker.mute
        },
      }),
      Widget.Button({
        className: "volume-button-minus",
        child: Widget.Image({ iconName: "list-remove-symbolic" }), // Use minus icon
        onClicked: () => {
          speaker.volume = Math.max(0, speaker.volume - 0.05); // Decrease by 5%
        },
      }),
      Widget.Button({
        className: "volume-button-plus",
        child: Widget.Image({ iconName: "list-add-symbolic" }), // Use plus icon
        onClicked: () => {
   
question function · javascript · L14-L18 (5 LOC)
kitty/generate-kitty-session.js
function question(prompt) {
  return new Promise((resolve) => {
    rl.question(prompt, resolve);
  });
}
expandPath function · javascript · L20-L26 (7 LOC)
kitty/generate-kitty-session.js
function expandPath(filePath) {
  if (filePath.startsWith('~/')) {
    return path.join(os.homedir(), filePath.slice(2));
  }
  // Resolve relative paths (like . or ..) to absolute
  return path.resolve(filePath);
}
validateDirectory function · javascript · L28-L42 (15 LOC)
kitty/generate-kitty-session.js
function validateDirectory(dirPath) {
  const expanded = expandPath(dirPath);
  try {
    const stats = fs.statSync(expanded);
    if (!stats.isDirectory()) {
      return { valid: false, error: `Path exists but is not a directory: ${dirPath}` };
    }
    return { valid: true, expanded };
  } catch (error) {
    if (error.code === 'ENOENT') {
      return { valid: false, error: `Directory does not exist: ${dirPath}` };
    }
    return { valid: false, error: `Cannot access directory: ${error.message}` };
  }
}
generateSession function · javascript · L44-L81 (38 LOC)
kitty/generate-kitty-session.js
function generateSession(projectPath) {
  // Extract directory name for session name
  const dirName = path.basename(projectPath);

  // Generate the session content based on the silence template
  const sessionContent = `
new_tab
layout fat
enabled_layouts fat,grid,horizontal,splits,stack,tall,vertical
set_layout_state {"main_bias": [0.5, 0.5], "biased_map": {}, "opts": {"full_size": 1, "bias": 50, "mirrored": "n"}, "class": "Fat", "all_windows": {"active_group_idx": 0, "active_group_history": [1], "window_groups": [{"id": 1, "window_ids": [1]}]}}
cd ${projectPath}

launch 'kitty-unserialize-data={"id": 1, "cmd_at_shell_startup": "nvim"}'
focus

new_tab
layout fat
enabled_layouts fat,grid,horizontal,splits,stack,tall,vertical
set_layout_state {"main_bias": [0.5, 0.5], "biased_map": {}, "opts": {"full_size": 1, "bias": 50, "mirrored": "n"}, "class": "Fat", "all_windows": {"active_group_idx": 0, "active_group_history": [2], "window_groups": [{"id": 2, "window_ids": [2]}]}}
cd ${projectP
main function · javascript · L83-L162 (80 LOC)
kitty/generate-kitty-session.js
async function main() {
  // Parse command-line arguments
  const args = process.argv.slice(2);
  const spawnFlag = args.includes('--spawn');
  const localFlag = args.includes('--local');
  const pathArgs = args.filter(arg => !arg.startsWith('--'));
  let projectPath;

  if (pathArgs.length > 0) {
    projectPath = pathArgs[0];
  } else {
    console.log('Kitty Session Generator\n');
    projectPath = await question('Enter project path (e.g., ~/projects/myproject): ');
  }

  if (!projectPath.trim()) {
    console.error('Error: Project path cannot be empty');
    rl.close();
    process.exit(1);
  }

  // Validate that the directory exists
  const validation = validateDirectory(projectPath.trim());
  if (!validation.valid) {
    console.error(`\nError: ${validation.error}`);
    rl.close();
    process.exit(1);
  }

  // Use the expanded (absolute) path for session generation
  const { dirName, sessionContent } = generateSession(validation.expanded);

  const configKittyDir = path.join
Repobility · severity-and-effort ranking · https://repobility.com
expandPath function · go · L43-L52 (10 LOC)
kitty/session-generator/main.go
func expandPath(p string) (string, error) {
	if strings.HasPrefix(p, "~/") {
		home, err := os.UserHomeDir()
		if err != nil {
			return "", err
		}
		return filepath.Join(home, p[2:]), nil
	}
	return filepath.Abs(p)
}
validateDirectory function · go · L54-L73 (20 LOC)
kitty/session-generator/main.go
func validateDirectory(p string) (string, error) {
	expanded, err := expandPath(p)
	if err != nil {
		return "", fmt.Errorf("cannot expand path: %w", err)
	}

	info, err := os.Stat(expanded)
	if err != nil {
		if os.IsNotExist(err) {
			return "", fmt.Errorf("directory does not exist: %s", p)
		}
		return "", fmt.Errorf("cannot access directory: %w", err)
	}

	if !info.IsDir() {
		return "", fmt.Errorf("path exists but is not a directory: %s", p)
	}

	return expanded, nil
}
generateSession function · go · L75-L77 (3 LOC)
kitty/session-generator/main.go
func generateSession(projectPath string) string {
	return fmt.Sprintf(sessionTemplate, projectPath, projectPath, projectPath)
}
prompt function · go · L79-L84 (6 LOC)
kitty/session-generator/main.go
func prompt(message string) string {
	fmt.Print(message)
	reader := bufio.NewReader(os.Stdin)
	input, _ := reader.ReadString('\n')
	return strings.TrimSpace(input)
}
main function · go · L86-L186 (101 LOC)
kitty/session-generator/main.go
func main() {
	spawnFlag := flag.Bool("spawn", false, "Spawn kitty with the session after creation")
	localFlag := flag.Bool("local", false, "Write directly to ~/.config/kitty/ instead of dotfiles")
	flag.Parse()

	args := flag.Args()
	var projectPath string

	if len(args) > 0 {
		projectPath = args[0]
	} else {
		fmt.Println("Kitty Session Generator\n")
		projectPath = prompt("Enter project path (e.g., ~/projects/myproject): ")
	}

	if projectPath == "" {
		fmt.Fprintln(os.Stderr, "Error: Project path cannot be empty")
		os.Exit(1)
	}

	expanded, err := validateDirectory(projectPath)
	if err != nil {
		fmt.Fprintf(os.Stderr, "\nError: %v\n", err)
		os.Exit(1)
	}

	dirName := filepath.Base(expanded)
	sessionContent := generateSession(expanded)

	home, err := os.UserHomeDir()
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error: cannot get home directory: %v\n", err)
		os.Exit(1)
	}

	configKittyDir := filepath.Join(home, ".config", "kitty")
	sessionPath := filepath.Join(configKittyDir, dir
Colors class · python · L14-L18 (5 LOC)
scripts/adb-auto-connect.py
class Colors:
    RED = '\033[0;31m'
    GREEN = '\033[0;32m'
    BLUE = '\033[0;34m'
    NC = '\033[0m'
run_command function · python · L21-L33 (13 LOC)
scripts/adb-auto-connect.py
def run_command(cmd: List[str], capture=True) -> tuple[bool, str]:
    """Run a command and return success status and output"""
    try:
        if capture:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
            return result.returncode == 0, result.stdout + result.stderr
        else:
            result = subprocess.run(cmd, timeout=10)
            return result.returncode == 0, ""
    except subprocess.TimeoutExpired:
        return False, "Command timed out"
    except Exception as e:
        return False, str(e)
get_mdns_devices function · python · L36-L56 (21 LOC)
scripts/adb-auto-connect.py
def get_mdns_devices() -> List[Dict[str, str]]:
    """Get list of devices from mDNS discovery"""
    success, output = run_command(["adb", "mdns", "services"])

    if not success:
        print(f"{Colors.RED}Failed to query mDNS services{Colors.NC}")
        sys.exit(1)

    devices = []
    for line in output.strip().split('\n'):
        if '_adb-tls-connect._tcp' in line:
            parts = line.split('\t')
            if len(parts) >= 3:
                service_name = parts[0].strip()
                address = parts[2].strip()
                devices.append({
                    'service_name': service_name,
                    'address': address
                })

    return devices
Same scanner, your repo: https://repobility.com — Repobility
deduplicate_devices function · python · L59-L89 (31 LOC)
scripts/adb-auto-connect.py
def deduplicate_devices(devices: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Deduplicate devices by service name, always selecting the address with the smallest port"""
    seen = {}
    all_addresses = {}

    for device in devices:
        service = device['service_name']
        # Remove (n) suffix if present
        clean_service = re.sub(r' \(\d+\)$', '', service)

        # Collect all addresses for this service
        if clean_service not in all_addresses:
            all_addresses[clean_service] = []
        all_addresses[clean_service].append(device['address'])

    # For each unique service, select the address with the smallest port
    result = []
    for clean_service, addresses in all_addresses.items():
        # Sort addresses by port number (extract port from IP:PORT)
        sorted_addresses = sorted(addresses, key=lambda addr: int(addr.split(':')[1]))

        # Select the one with the smallest port
        selected_address = sorted_addresses[0]

        res
get_device_info function · python · L92-L119 (28 LOC)
scripts/adb-auto-connect.py
def get_device_info(address: str) -> Optional[Dict[str, str]]:
    """Try to get device information after connecting"""
    # Try to connect
    success, _ = run_command(["adb", "connect", address])

    if not success:
        return None

    # Give it a moment to establish connection
    import time
    time.sleep(1)

    # Try to get device properties
    info = {}

    success, model = run_command(["adb", "-s", address, "shell", "getprop", "ro.product.model"])
    if success:
        info['model'] = model.strip()

    success, manufacturer = run_command(["adb", "-s", address, "shell", "getprop", "ro.product.manufacturer"])
    if success:
        info['manufacturer'] = manufacturer.strip()

    success, device_name = run_command(["adb", "-s", address, "shell", "settings", "get", "global", "device_name"])
    if success and device_name.strip() and device_name.strip() != "null":
        info['device_name'] = device_name.strip()

    return info if info else None
format_device_name function · python · L122-L130 (9 LOC)
scripts/adb-auto-connect.py
def format_device_name(device: Dict[str, str], info: Optional[Dict[str, str]]) -> str:
    """Format a display name for the device"""
    if info:
        if 'device_name' in info:
            return f"{info['device_name']} ({info.get('manufacturer', '')} {info.get('model', '')})"
        elif 'model' in info:
            return f"{info.get('manufacturer', '')} {info['model']}"

    return device['service_name']
connect_device function · python · L133-L138 (6 LOC)
scripts/adb-auto-connect.py
def connect_device(address: str) -> bool:
    """Connect to a device"""
    success, output = run_command(["adb", "connect", address])
    print(output)

    return "connected" in output.lower()
pair_device function · python · L141-L146 (6 LOC)
scripts/adb-auto-connect.py
def pair_device(ip: str, port: str, code: str) -> bool:
    """Pair with a device using pairing code"""
    print(f"Pairing with {ip}:{port}...")
    success, output = run_command(["adb", "pair", f"{ip}:{port}", code])
    print(output)
    return success
main function · python · L149-L234 (86 LOC)
scripts/adb-auto-connect.py
def main():
    print(f"{Colors.BLUE}Discovering Android devices via mDNS...{Colors.NC}")

    devices = get_mdns_devices()

    if not devices:
        print(f"{Colors.RED}No devices found via mDNS.{Colors.NC}")
        print("Make sure:")
        print("  1. Wireless Debugging is enabled on your Android device")
        print("  2. Your device and computer are on the same network")
        print("  3. Your device is Android 11+ with mDNS support")
        sys.exit(1)

    # Deduplicate
    unique_devices = deduplicate_devices(devices)

    print(f"{Colors.GREEN}Found {len(unique_devices)} unique device(s){Colors.NC}\n")

    # Select device
    if len(unique_devices) == 1:
        selected = unique_devices[0]
        print(f"{Colors.GREEN}Device:{Colors.NC} {Colors.BLUE}{selected['service_name']}{Colors.NC}")
        print(f"{Colors.GREEN}Address:{Colors.NC} {Colors.BLUE}{selected['address']}{Colors.NC}")

        # Show all addresses if there are duplicates
        if len(selected.g
main function · python · L15-L66 (52 LOC)
scripts/ai.py
def main():
    parser = argparse.ArgumentParser(description="Query ai.dimalip.in")
    parser.add_argument("text", nargs="*", help="Prompt text (or pipe via stdin)")
    parser.add_argument("-m", "--model", default="default", help="Model name (default: default)")
    parser.add_argument("-v", "--verbose", action="store_true", help="Show network logs")
    args = parser.parse_args()

    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)
        logging.getLogger("httpx").setLevel(logging.DEBUG)
        logging.getLogger("httpcore").setLevel(logging.DEBUG)

    api_key = os.environ.get("CLIPROXYAPI_API_KEY") or (os.environ.get("CLIPROXYAPI_API_KEYS") or "").split(",")[0]
    if not api_key:
        print("Error: set CLIPROXYAPI_API_KEY or CLIPROXYAPI_API_KEYS env var", file=sys.stderr)
        sys.exit(1)

    if args.text:
        text = " ".join(args.text)
    elif not sys.stdin.isatty():
        text = sys.stdin.read().strip()
    else:
        print("Error: provide t
log function · typescript · L10-L12 (3 LOC)
scripts/bt-capture-setup.ts
function log(msg: string): void {
  console.log(`[bt-capture] ${msg}`);
}
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
logVerbose function · typescript · L14-L16 (3 LOC)
scripts/bt-capture-setup.ts
function logVerbose(msg: string): void {
  if (VERBOSE) console.log(`[bt-capture] ${msg}`);
}
logError function · typescript · L18-L20 (3 LOC)
scripts/bt-capture-setup.ts
function logError(msg: string): void {
  console.error(`[bt-capture] ERROR: ${msg}`);
}
run function · typescript · L22-L28 (7 LOC)
scripts/bt-capture-setup.ts
function run(cmd: string, { quiet = false } = {}): string {
  logVerbose(`exec: ${cmd}`);
  const result = execSync(cmd, { encoding: "utf-8" }).trim();
  if (result && !quiet) log(`output: ${result}`);
  if (result && quiet) logVerbose(`output: ${result}`);
  return result;
}
tryRun function · typescript · L30-L37 (8 LOC)
scripts/bt-capture-setup.ts
function tryRun(cmd: string, opts?: { quiet?: boolean }): string | null {
  try {
    return run(cmd, opts);
  } catch (e) {
    logError(`command failed: ${cmd} — ${e instanceof Error ? e.message : e}`);
    return null;
  }
}
promptChoice function · typescript · L39-L60 (22 LOC)
scripts/bt-capture-setup.ts
function promptChoice(addrs: string[]): string {
  console.log("\nMultiple BT devices found:");
  for (let i = 0; i < addrs.length; i++) {
    console.log(`  ${i + 1}) ${addrs[i]}`);
  }
  process.stdout.write("\nSelect device [1-9]: ");

  const buf = Buffer.alloc(1);
  const fd = openSync("/dev/tty", "r");
  readSync(fd, buf, 0, 1, null);
  closeSync(fd);

  const ch = buf.toString("utf-8").trim();
  const idx = parseInt(ch, 10) - 1;
  if (isNaN(idx) || idx < 0 || idx >= addrs.length) {
    logError(`Invalid selection: "${ch}"`);
    process.exit(1);
  }

  log(`Selected device ${idx + 1}: ${addrs[idx]}`);
  return addrs[idx];
}
sinkExists function · typescript · L86-L92 (7 LOC)
scripts/bt-capture-setup.ts
function sinkExists(): boolean {
  log("Checking if sink already exists...");
  const out = tryRun("pactl list sinks short");
  const exists = out?.includes(SINK_NAME) ?? false;
  log(`Sink "${SINK_NAME}" exists: ${exists}`);
  return exists;
}
createSink function · typescript · L94-L104 (11 LOC)
scripts/bt-capture-setup.ts
function createSink(): void {
  if (sinkExists()) {
    log(`Sink "${SINK_NAME}" already exists, reusing.`);
    return;
  }
  log(`Creating null sink "${SINK_NAME}"...`);
  run(
    `pactl load-module module-null-sink sink_name=${SINK_NAME} sink_properties=device.description=${SINK_DESC}`
  );
  log(`Created null sink "${SINK_NAME}".`);
}
waitForPorts function · typescript · L106-L126 (21 LOC)
scripts/bt-capture-setup.ts
function waitForPorts(
  nodeNames: string[],
  { timeout = 5000, interval = 200 } = {}
): boolean {
  log(`Waiting for ports to appear for: ${nodeNames.join(", ")} (timeout ${timeout}ms)...`);
  const deadline = Date.now() + timeout;
  while (Date.now() < deadline) {
    const lines = tryRun("pw-link -o", { quiet: true }) ?? "";
    const allFound = nodeNames.every((name) =>
      lines.split("\n").some((l) => l.startsWith(`${name}:`))
    );
    if (allFound) {
      log("All ports registered.");
      return true;
    }
    log(`Ports not ready yet, retrying in ${interval}ms...`);
    execSync(`sleep ${interval / 1000}`);
  }
  logError(`Timed out waiting for ports after ${timeout}ms.`);
  return false;
}
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
link function · typescript · L128-L134 (7 LOC)
scripts/bt-capture-setup.ts
function link(src: string, dst: string): void {
  log(`Linking: ${src} -> ${dst}`);
  const result = tryRun(`pw-link "${src}" "${dst}"`);
  if (result === null) {
    log(`Link may already exist (pw-link returned non-zero), continuing.`);
  }
}
getOutputPorts function · typescript · L136-L142 (7 LOC)
scripts/bt-capture-setup.ts
function getOutputPorts(nodeName: string): string[] {
  log(`Getting output ports for "${nodeName}"...`);
  const lines = run("pw-link -o", { quiet: true });
  const ports = lines.split("\n").filter((l) => l.startsWith(`${nodeName}:`));
  log(`Found ${ports.length} port(s) for "${nodeName}": ${ports.join(", ") || "none"}`);
  return ports;
}
setupLinks function · typescript · L144-L175 (32 LOC)
scripts/bt-capture-setup.ts
function setupLinks(device: { input: string; output: string }): void {
  log("Setting up PipeWire links...");
  const inputPorts = getOutputPorts(device.input);
  const allOutputPorts = getOutputPorts(device.output);
  const monitorPorts = allOutputPorts.filter((p) => p.includes("monitor_"));

  log(`Input ports (mic): ${inputPorts.length}`);
  log(`Output ports (headphone): ${allOutputPorts.length} total, ${monitorPorts.length} monitor`);

  if (inputPorts.length === 0) {
    logError("No input (mic) ports found — mic capture won't work.");
  }
  if (monitorPorts.length === 0) {
    logError("No monitor ports found — headphone capture won't work.");
  }

  // Link mic -> combined sink (mono mic goes to both channels)
  for (const port of inputPorts) {
    link(port, `${SINK_NAME}:playback_FL`);
    link(port, `${SINK_NAME}:playback_FR`);
    log(`Linked mic ${port} -> ${SINK_NAME}:playback_FL/FR`);
  }

  // Link headphone monitor -> combined sink
  for (const port of monitorPorts) {
startRecording function · typescript · L177-L202 (26 LOC)
scripts/bt-capture-setup.ts
function startRecording(outFile: string): void {
  log(`Starting recording to ${outFile}...`);
  log(`Target: ${SINK_NAME}.monitor`);
  console.log("Press Ctrl+C to stop.\n");

  const proc = spawn(
    "pw-record",
    ["--target", `${SINK_NAME}.monitor`, outFile],
    { stdio: "inherit" }
  );

  proc.on("error", (err) => {
    logError(`Failed to start pw-record: ${err.message}`);
  });

  process.on("SIGINT", () => {
    log("Received SIGINT, stopping recording...");
    proc.kill("SIGINT");
  });

  proc.on("exit", (code, signal) => {
    log(`pw-record exited with code=${code}, signal=${signal}`);
    console.log(`\nRecording saved to ${outFile}`);
    process.exit(code ?? 0);
  });
}
parseArgs function · typescript · L227-L252 (26 LOC)
scripts/bt-capture-setup.ts
function parseArgs(argv: string[]): Command {
  if (argv.includes("-h") || argv.includes("--help")) {
    return { kind: "help" };
  }

  if (argv.includes("teardown") || argv.includes("--teardown")) {
    return { kind: "teardown" };
  }

  if (argv.includes("status") || argv.includes("--status")) {
    return { kind: "status" };
  }

  const cmd: Command = { kind: "setup" };
  const recordIdx = argv.indexOf("--record");
  if (recordIdx !== -1) {
    const outFile = argv[recordIdx + 1];
    if (!outFile || outFile.startsWith("-")) {
      logError("--record requires an output file path");
      process.exit(1);
    }
    cmd.record = outFile;
  }

  return cmd;
}
status function · typescript · L272-L302 (31 LOC)
scripts/bt-capture-setup.ts
function status(): void {
  log("Checking status...");

  // Sink
  const hasSink = sinkExists();
  console.log(`\nSink "${SINK_NAME}": ${hasSink ? "ACTIVE" : "NOT FOUND"}`);

  // BT device
  const device = findBluezDevice();
  if (device) {
    console.log(`BT device input:  ${device.input}`);
    console.log(`BT device output: ${device.output}`);
  } else {
    console.log("BT device: NOT FOUND");
  }

  // Links involving our sink
  const allLinks = getLinks();
  const sinkLinks = allLinks.filter(
    (l) => l.src.includes(SINK_NAME) || l.dst.includes(SINK_NAME)
  );

  if (sinkLinks.length > 0) {
    console.log(`\nActive links (${sinkLinks.length}):`);
    for (const l of sinkLinks) {
      console.log(`  ${l.src} -> ${l.dst}`);
    }
  } else {
    console.log("\nNo active links to sink.");
  }
}
teardown function · typescript · L304-L316 (13 LOC)
scripts/bt-capture-setup.ts
function teardown(): void {
  log("Teardown requested...");
  const out = tryRun("pactl list modules short");
  const line = out?.split("\n").find((l) => l.includes(SINK_NAME));
  if (line) {
    const moduleId = line.split("\t")[0];
    log(`Found module ${moduleId}, unloading...`);
    run(`pactl unload-module ${moduleId}`);
    log(`Removed sink "${SINK_NAME}".`);
  } else {
    log(`Sink "${SINK_NAME}" not found, nothing to remove.`);
  }
}
setup function · typescript · L318-L345 (28 LOC)
scripts/bt-capture-setup.ts
function setup(record?: string): void {
  log("Starting BT capture setup...");

  const device = findBluezDevice();
  if (!device) {
    logError("No Bluetooth audio device found. Is it connected?");
    process.exit(1);
  }

  log(`Found BT device — input: ${device.input}, output: ${device.output}`);

  createSink();

  if (!waitForPorts([SINK_NAME, device.input, device.output])) {
    logError("Required ports did not appear in time. Aborting.");
    process.exit(1);
  }

  setupLinks(device);

  log("Setup complete.");
  console.log(`\nDone. Select "${SINK_DESC}" monitor as recording source in Audacity.`);
  console.log(`Or record from CLI: parecord -d ${SINK_NAME}.monitor --file-format=wav output.wav`);

  if (record) {
    startRecording(record);
  }
}
Repobility · severity-and-effort ranking · https://repobility.com
main function · typescript · L347-L367 (21 LOC)
scripts/bt-capture-setup.ts
function main(): void {
  const argv = process.argv.slice(2);
  VERBOSE = argv.includes("-v") || argv.includes("--verbose");
  const cmd = parseArgs(argv);
  log(`Command: ${cmd.kind}`);

  switch (cmd.kind) {
    case "help":
      console.log(USAGE);
      break;
    case "teardown":
      teardown();
      break;
    case "status":
      status();
      break;
    case "setup":
      setup(cmd.record);
      break;
  }
}
_CallbackHandler class · python · L21-L32 (12 LOC)
spotify/spotify.py
class _CallbackHandler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
        global auth_code
        params = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        auth_code = params.get("code", [None])[0]
        self.send_response(200 if auth_code else 400)
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        self.wfile.write(b"<h1>Success! You can close this tab.</h1>" if auth_code else b"Authorization failed.")

    def log_message(self, *_):
        pass
do_GET method · python · L22-L29 (8 LOC)
spotify/spotify.py
    def do_GET(self):
        global auth_code
        params = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
        auth_code = params.get("code", [None])[0]
        self.send_response(200 if auth_code else 400)
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        self.wfile.write(b"<h1>Success! You can close this tab.</h1>" if auth_code else b"Authorization failed.")
_token_request function · python · L35-L38 (4 LOC)
spotify/spotify.py
def _token_request(data: dict) -> dict:
    resp = httpx.post("https://accounts.spotify.com/api/token", data=data, auth=(CLIENT_ID, CLIENT_SECRET))
    resp.raise_for_status()
    return resp.json()
_save_token function · python · L41-L43 (3 LOC)
spotify/spotify.py
def _save_token(data):
    with open(TOKEN_FILE, "w") as f:
        json.dump(data, f)
_load_token function · python · L46-L51 (6 LOC)
spotify/spotify.py
def _load_token():
    try:
        with open(TOKEN_FILE) as f:
            return json.load(f)
    except FileNotFoundError:
        return None
authenticate function · python · L54-L88 (35 LOC)
spotify/spotify.py
def authenticate() -> httpx.Client:
    """Authenticate and return an httpx.Client with auth headers set."""
    global auth_code
    saved = _load_token()

    if saved and "refresh_token" in saved:
        print("Refreshing saved token...")
        try:
            token_data = _token_request({"grant_type": "refresh_token", "refresh_token": saved["refresh_token"]})
            token_data.setdefault("refresh_token", saved["refresh_token"])
            _save_token(token_data)
            print(f"Authorized (cached). Scopes: {token_data.get('scope', 'none')}")
            return _client(token_data["access_token"])
        except httpx.HTTPStatusError:
            print("Refresh failed, re-authenticating...")

    auth_url = (
        f"https://accounts.spotify.com/authorize?"
        f"client_id={CLIENT_ID}&response_type=code&redirect_uri={urllib.parse.quote(REDIRECT_URI)}"
        f"&scope={urllib.parse.quote(SCOPES)}&show_dialog=true"
    )
    print("Opening browser for Spotify autho
_client function · python · L91-L95 (5 LOC)
spotify/spotify.py
def _client(token: str) -> httpx.Client:
    return httpx.Client(
        base_url="https://api.spotify.com/v1",
        headers={"Authorization": f"Bearer {token}"},
    )
Same scanner, your repo: https://repobility.com — Repobility
search_track function · python · L98-L106 (9 LOC)
spotify/spotify.py
def search_track(client: httpx.Client, artist: str, title: str):
    for q in [f"artist:{artist} track:{title}", f"{artist} {title}"]:
        resp = client.get("/search", params={"q": q, "type": "track", "limit": 5})
        resp.raise_for_status()
        tracks = resp.json().get("tracks", {}).get("items", [])
        if tracks:
            t = tracks[0]
            return t["uri"], t["name"], t["artists"][0]["name"]
    return None, None, None
create_playlist function · python · L109-L113 (5 LOC)
spotify/spotify.py
def create_playlist(client: httpx.Client, name: str, description: str = "", public: bool = False):
    resp = client.post("/me/playlists", json={"name": name, "description": description, "public": public})
    resp.raise_for_status()
    playlist = resp.json()
    return playlist["id"], playlist["external_urls"]["spotify"]
add_tracks function · python · L116-L118 (3 LOC)
spotify/spotify.py
def add_tracks(client: httpx.Client, playlist_id: str, uris: list[str]):
    resp = client.post(f"/playlists/{playlist_id}/items", json={"uris": uris})
    resp.raise_for_status()
‹ prevpage 2 / 3next ›