← back to devrandom__YCluster

Function bodies 355 total

All specs Real LLM only Function bodies
determine_ip_from_hostname function · python · L119-L167 (49 LOC)
config/ansible/admin/files/app.py
def determine_ip_from_hostname(hostname, via_wg=False):
    """Generate deterministic IP based on hostname.

    via_wg=True puts the IP in the wg peer subnet (10.0.1.0/24) while
    preserving the per-type hostname numbering. A physical compute c3
    lands at 10.0.0.53; a wg-bootstrapped compute c3 lands at 10.0.1.53.
    """
    if not hostname:
        return None

    # Check if this is an AMT hostname (ends with 'a')
    is_amt = hostname.endswith('a')
    if is_amt:
        hostname = hostname[:-1]  # Strip trailing 'a' for parsing

    # Try multi-char prefixes first (e.g., 'nas'), then single-char
    prefix = None
    num = None
    for p in IP_RANGES:
        if hostname.startswith(p):
            try:
                num = int(hostname[len(p):])
                prefix = p
                break
            except ValueError:
                continue

    if prefix is None:
        return None

    config = IP_RANGES[prefix]

    # Validate number is within range
    if num <
determine_type_from_mac function · python · L169-L186 (18 LOC)
config/ansible/admin/files/app.py
def determine_type_from_mac(mac_address):
    """Classify a machine as 'storage' or 'compute' from its MAC address.

    The MAC is lowercased and stripped of ':' and '-' separators, then its
    leading characters are compared against the storage prefix (58:47:ca).
    An empty/missing MAC, or any other prefix, yields 'compute'.

    Used for PXE boot allocation. macOS nodes use explicit type parameter
    via /macos/bootstrap endpoint.
    """
    storage_prefix = '5847ca'  # 58:47:ca with separators removed

    if not mac_address:
        return 'compute'

    compact = mac_address.lower()
    for separator in (':', '-'):
        compact = compact.replace(separator, '')

    return 'storage' if compact.startswith(storage_prefix) else 'compute'
get_next_hostname function · python · L188-L224 (37 LOC)
config/ansible/admin/files/app.py
def get_next_hostname(client, node_type):
    """Get the next available hostname for a node type"""
    prefixes = {
        'storage': 's',
        'compute': 'c',
        'macos': 'm',
        'nvidia': 'nv',
        'nas': 'nas',
        'dev': 'd',
    }
    
    prefix = prefixes.get(node_type, 'c')
    
    # Get all existing hostnames of this type
    existing_numbers = []
    for value, metadata in client.get_prefix(f"{ETCD_PREFIX}/by-hostname/{prefix}"):
        if value:
            hostname = metadata.key.decode().split('/')[-1]
            try:
                num = int(hostname[len(prefix):])
                existing_numbers.append(num)
            except:
                pass
    
    # Find the next available number
    next_num = 1
    if existing_numbers:
        existing_numbers.sort()
        # Find first gap or use max+1
        for i, num in enumerate(existing_numbers):
            if num != i + 1:
                next_num = i + 1
                break
        else
get_or_create_allocation function · python · L226-L312 (87 LOC)
config/ansible/admin/files/app.py
def get_or_create_allocation(mac_address, node_type=None, via_wg=False):
    """Get existing allocation or create new one for non-normalized MAC address.

    Args:
        mac_address: MAC address (any format)
        node_type: Optional type override ('storage', 'compute', 'macos').
                   If not provided, type is detected from MAC address.
                   If provided and different from existing, the allocation is updated.
        via_wg:    Transport marker — WG-bootstrapped nodes get their IP
                   from 10.0.1.0/24 instead of 10.0.0.0/24. Preserved in
                   the allocation record so list/lookup stays consistent.
    """
    client = get_etcd_client()
    normalized_mac = mac_address.lower().replace(':', '').replace('-', '')

    # Check if allocation already exists
    existing_data = client.get(f"{ETCD_PREFIX}/by-mac/{normalized_mac}")
    if existing_data[0]:
        data = json.loads(existing_data[0].decode())
        existing_via_wg = boo
allocate_hostname function · python · L315-L343 (29 LOC)
config/ansible/admin/files/app.py
def allocate_hostname():
    """Allocate a new hostname based on MAC address.

    Query parameters:
        mac: MAC address (required)
        type: Optional type override ('storage', 'compute', 'macos')
    """
    mac_address = request.args.get('mac')
    node_type = request.args.get('type')

    if not mac_address:
        return jsonify({'error': 'MAC address is required'}), 400

    if node_type and node_type not in ('storage', 'compute', 'macos', 'nas', 'nvidia', 'dev'):
        return jsonify({'error': f'Invalid type: {node_type}'}), 400

    try:
        allocation = get_or_create_allocation(mac_address, node_type=node_type)

        return jsonify({
            'hostname': allocation['hostname'],
            'type': allocation['type'],
            'ip': allocation['ip'],
            'amt_ip': allocation['amt_ip'],
            'mac': mac_address,
            'existing': True  # Always true since we return existing or newly created
        })
    except Exception as e:
       
wg_register function · python · L346-L398 (53 LOC)
config/ansible/admin/files/app.py
def wg_register():
    """Allocate a hostname+IP (as a cluster node of the requested type)
    and register a WireGuard peer pubkey for it in one atomic call.

    This endpoint is public (exposed via the admin-api reverse proxy for
    remote-node onboarding), so it validates input carefully and does not
    leak details of unrelated allocations.

    Body (JSON): {mac, type, pubkey}
      mac:    client MAC (any format)
      type:   one of storage|compute|macos|nas|nvidia (default: compute)
      pubkey: base64-encoded 32-byte WireGuard public key
    Returns: {hostname, ip, status, pubkey_sha256}
    """
    import base64
    from ycluster.utils import wg_config

    data = request.get_json(silent=True) or {}
    mac = data.get('mac')
    node_type = data.get('type') or 'compute'
    pubkey = data.get('pubkey')

    if not mac or not pubkey:
        return jsonify({'error': 'mac and pubkey are required'}), 400
    if node_type not in ('storage', 'compute', 'macos', 'nas', 'nvidia',
wg_poll function · python · L402-L423 (22 LOC)
config/ansible/admin/files/app.py
def wg_poll(hostname):
    """Poll for approval status of a WireGuard peer.

    Requires fp query param (pubkey_sha256) to prevent unauthenticated
    enumeration of peer configs.

    Args:
        hostname: peer name passed to wg_config.get_peer().

    Returns (Flask responses):
        400 if the fp query parameter is missing,
        404 if wg_config has no peer for hostname,
        403 if fp does not match the peer's stored pubkey_sha256,
        500 if rendering the client config raises,
        otherwise JSON {'status': ...}, with 'config' added once the
        peer's status is 'approved'.
    """
    import hmac
    from ycluster.utils import wg_config

    fp = request.args.get('fp')
    if not fp:
        return jsonify({'error': 'fp query parameter is required'}), 400

    peer = wg_config.get_peer(hostname)
    if not peer:
        return jsonify({'error': 'no such peer'}), 404

    # The fingerprint is the caller's only credential here, so compare it
    # in constant time. Both sides are encoded to bytes because
    # hmac.compare_digest rejects non-ASCII str; a missing or non-string
    # stored fingerprint can never match.
    expected_fp = peer.get('pubkey_sha256')
    if not isinstance(expected_fp, str) or not hmac.compare_digest(
            expected_fp.encode(), fp.encode()):
        return jsonify({'error': 'fingerprint mismatch'}), 403

    resp = {'status': peer['status']}
    if peer['status'] == 'approved':
        try:
            resp['config'] = wg_config.render_client_config(hostname)
        except Exception as e:
            return jsonify({'error': f'render failed: {e}'}), 500
    return jsonify(resp)
Open data scored by Repobility · https://repobility.com
status function · python · L427-L446 (20 LOC)
config/ansible/admin/files/app.py
def status():
    """Get current allocation counts by type.

    Walks every record under {ETCD_PREFIX}/by-hostname/ and tallies the
    record's 'type' field (defaulting to 'compute' when absent). The
    'storage', 'compute' and 'macos' keys are always present in the
    result; other types appear once at least one record has them.

    Returns:
        JSON object mapping type name -> count, or a 503 JSON error if
        the etcd client cannot be obtained.
    """
    try:
        client = get_etcd_client()
    except Exception as e:
        return jsonify({'error': f'etcd connection failed: {str(e)}'}), 503

    counts = {'storage': 0, 'compute': 0, 'macos': 0}

    # Count allocations by type
    for value, _metadata in client.get_prefix(f"{ETCD_PREFIX}/by-hostname/"):
        if not value:
            continue
        try:
            allocation = json.loads(value.decode())
            node_type = allocation.get('type', 'compute')
        except (ValueError, AttributeError):
            # Best effort: skip records that are not valid JSON objects
            # (bad encoding, malformed JSON, or a non-dict payload).
            continue
        if not isinstance(node_type, str):
            # A null/numeric/unhashable 'type' must not become a JSON
            # object key alongside the string keys; skip such records.
            continue
        counts[node_type] = counts.get(node_type, 0) + 1

    return jsonify(counts)
allocations function · python · L449-L477 (29 LOC)
config/ansible/admin/files/app.py
def allocations():
    """Get all current allocations"""
    try:
        client = get_etcd_client()
    except Exception as e:
        return jsonify({'error': f'etcd connection failed: {str(e)}'}), 503
    
    allocations = []
    
    # Get all allocations from by-hostname (to avoid duplicates)
    for value, metadata in client.get_prefix(f"{ETCD_PREFIX}/by-hostname/"):
        if value:
            try:
                allocation = json.loads(value.decode())
                allocations.append({
                    'mac': allocation['mac'],
                    'hostname': allocation['hostname'],
                    'type': allocation['type'],
                    'ip': allocation['ip'],
                    'allocated_at': allocation.get('allocated_at', 0),
                    'disabled': allocation.get('disabled', False)
                })
            except:
                pass
    
    # Sort by hostname
    allocations.sort(key=lambda x: (x['type'], int(x['hostname'][1:]) if x['
disable_host function · python · L481-L501 (21 LOC)
config/ansible/admin/files/app.py
def disable_host(hostname):
    """Flag a host's allocation as disabled so it doesn't appear in status page.

    Loads the record stored at {ETCD_PREFIX}/by-hostname/<hostname>, sets
    its 'disabled' field to True and writes it back under both the
    by-hostname key and the by-mac key (taken from the record's 'mac').

    Returns:
        JSON {'status': 'ok', ...} on success, 404 if no record exists,
        503 if the etcd client cannot be obtained, 500 on any other error.
    """
    try:
        etcd = get_etcd_client()
    except Exception as e:
        return jsonify({'error': f'etcd connection failed: {str(e)}'}), 503

    try:
        host_key = f"{ETCD_PREFIX}/by-hostname/{hostname}"
        stored = etcd.get(host_key)[0]
        if not stored:
            return jsonify({'error': 'Host not found'}), 404

        record = json.loads(stored.decode())
        record['disabled'] = True
        serialized = json.dumps(record)

        # by-hostname is written first; the by-mac key is only resolved
        # afterwards, matching the write order callers have always seen.
        etcd.put(host_key, serialized)
        etcd.put(f"{ETCD_PREFIX}/by-mac/{record['mac']}", serialized)

        return jsonify({'status': 'ok', 'hostname': hostname, 'disabled': True})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
enable_host function · python · L505-L525 (21 LOC)
config/ansible/admin/files/app.py
def enable_host(hostname):
    """Clear a host's disabled flag so it appears in status page again.

    Loads the record stored at {ETCD_PREFIX}/by-hostname/<hostname>, sets
    its 'disabled' field to False and writes it back under both the
    by-hostname key and the by-mac key (taken from the record's 'mac').

    Returns:
        JSON {'status': 'ok', ...} on success, 404 if no record exists,
        503 if the etcd client cannot be obtained, 500 on any other error.
    """
    try:
        etcd = get_etcd_client()
    except Exception as e:
        return jsonify({'error': f'etcd connection failed: {str(e)}'}), 503

    try:
        host_key = f"{ETCD_PREFIX}/by-hostname/{hostname}"
        stored = etcd.get(host_key)[0]
        if not stored:
            return jsonify({'error': 'Host not found'}), 404

        record = json.loads(stored.decode())
        record['disabled'] = False
        serialized = json.dumps(record)

        # by-hostname is written first; the by-mac key is only resolved
        # afterwards, matching the write order callers have always seen.
        etcd.put(host_key, serialized)
        etcd.put(f"{ETCD_PREFIX}/by-mac/{record['mac']}", serialized)

        return jsonify({'status': 'ok', 'hostname': hostname, 'disabled': False})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
get_dhcp_config function · python · L529-L556 (28 LOC)
config/ansible/admin/files/app.py
def get_dhcp_config():
    """Generate DHCP configuration from etcd allocations"""
    try:
        client = get_etcd_client()
    except Exception as e:
        return f"# etcd connection failed: {str(e)}\n", 503
    
    dhcp_config = []
    
    # Get all allocations
    for value, metadata in client.get_prefix(f"{ETCD_PREFIX}/by-hostname/"):
        if value:
            try:
                allocation = json.loads(value.decode())
                mac = allocation['mac']
                hostname = allocation['hostname']
                ip = allocation['ip']
                
                # Convert normalized MAC back to colon format
                mac_formatted = ':'.join(mac[i:i+2] for i in range(0, 12, 2))
                dhcp_config.append(f"dhcp-host={mac_formatted},{hostname},{ip},infinite")
            except:
                pass
    
    if dhcp_config:
        return '\n'.join(sorted(dhcp_config)) + '\n', 200, {'Content-Type': 'text/plain'}
    else:
        return "# No
get_hosts function · python · L559-L601 (43 LOC)
config/ansible/admin/files/app.py
def get_hosts():
    """Generate hosts file format from etcd allocations"""
    try:
        client = get_etcd_client()
    except Exception as e:
        return f"# etcd connection failed: {str(e)}\n", 503
    
    hosts_entries = []
    
    # Get all allocations
    for value, metadata in client.get_prefix(f"{ETCD_PREFIX}/by-hostname/"):
        if value:
            try:
                allocation = json.loads(value.decode())
                hostname = allocation['hostname']
                ip = allocation['ip']
                
                # Skip AMT hostnames registered as nodes - they get correct
                # entries auto-generated from the base node below
                if hostname.endswith('a') and determine_ip_from_hostname(hostname):
                    continue
                
                # Add main hostname entry
                hosts_entries.append(f"{ip} {hostname} {hostname}.xc")
                
                # Add AMT hostname entry if this is a regul
check_service_status function · python · L603-L610 (8 LOC)
config/ansible/admin/files/app.py
def check_service_status(service_name):
    """Check if a systemd service is active.

    Runs `systemctl is-active <service_name>` with a 5 second timeout.

    Args:
        service_name: unit name passed to systemctl.

    Returns:
        True only when systemctl prints exactly 'active'; False for any
        other output or if the command cannot be run (missing binary,
        timeout, invalid argument).
    """
    try:
        result = subprocess.run(['systemctl', 'is-active', service_name],
                                capture_output=True, text=True, timeout=5)
    except Exception:
        # Best-effort probe: any failure to run systemctl means "not
        # active". Exception (not a bare except) so that
        # KeyboardInterrupt/SystemExit still propagate.
        return False
    return result.stdout.strip() == 'active'
check_port_open function · python · L612-L621 (10 LOC)
config/ansible/admin/files/app.py
def check_port_open(host, port, timeout=3):
    """Check if a TCP port is open on a host.

    Args:
        host: hostname or IPv4 address to connect to (AF_INET only).
        port: TCP port number.
        timeout: socket timeout in seconds for the connection attempt.

    Returns:
        True if connect_ex() reports success (0), False otherwise —
        including when the address cannot be resolved or the arguments
        are invalid.
    """
    try:
        # The context manager closes the socket even when connect_ex()
        # raises (e.g. name resolution failure), which the previous
        # explicit close() did not.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            return sock.connect_ex((host, port)) == 0
    except Exception:
        # Best-effort probe: treat any failure as "port not open", but
        # let KeyboardInterrupt/SystemExit propagate.
        return False
Repobility analyzer · published findings · https://repobility.com
check_ceph_status function · python · L623-L643 (21 LOC)
config/ansible/admin/files/app.py
def check_ceph_status():
    """Check Ceph cluster health.

    Runs `ceph health` with a 10 second timeout and maps its output to a
    status string.

    Returns:
        dict with keys 'status' and 'details':
          - 'healthy'     when the output contains HEALTH_OK
          - 'unhealthy'   when the output contains HEALTH_ERR
          - 'degraded'    for any other successful output
          - 'error'       when the command exits non-zero (details = stderr)
          - 'unavailable' when the command could not be run at all
    """
    try:
        result = subprocess.run(['ceph', 'health'],
                                capture_output=True, text=True, timeout=10)
    except Exception:
        # Missing binary, timeout, etc. Exception (not a bare except) so
        # KeyboardInterrupt/SystemExit are not swallowed.
        return {'status': 'unavailable', 'details': 'ceph command failed'}

    if result.returncode != 0:
        return {'status': 'error', 'details': result.stderr.strip()}

    health_output = result.stdout.strip()
    if 'HEALTH_OK' in health_output:
        status = 'healthy'
    elif 'HEALTH_ERR' in health_output:
        status = 'unhealthy'
    else:
        status = 'degraded'
    return {'status': status, 'details': health_output}
check_dns_status function · python · L645-L701 (57 LOC)
config/ansible/admin/files/app.py
def check_dns_status():
    """Check DNS (dnsmasq) service and functionality"""
    try:
        # Check if dnsmasq service is running
        service_running = check_service_status('dnsmasq')
        
        # Test local DNS server directly using dnspython
        dns_working = False
        dns_details = "DNS query failed"
        
        try:
            # Create a resolver that queries the local DNS server directly
            resolver = dns.resolver.Resolver()
            resolver.nameservers = ['127.0.0.1']
            resolver.timeout = 3
            resolver.lifetime = 5
            
            # Query local hostname A record
            local_hostname = platform.node()
            answer = resolver.resolve(local_hostname, 'A')
            if answer:
                resolved_ips = [str(rdata) for rdata in answer]
                dns_working = True
                dns_details = f"Local DNS server responding ({local_hostname} -> {', '.join(resolved_ips)})"
            else:
  
check_certificate_expiry function · python · L703-L764 (62 LOC)
config/ansible/admin/files/app.py
def check_certificate_expiry():
    """Check TLS certificate expiry from etcd"""
    try:
        client = get_etcd_client()
        cert_value, _ = client.get('/cluster/tls/cert')
        
        if not cert_value:
            return {
                'status': 'not_configured',
                'details': {
                    'message': 'No certificate found in etcd',
                    'days_until_expiry': None,
                    'expires_at': None
                }
            }
        
        # Parse the certificate
        cert_pem = cert_value.decode()
        cert = x509.load_pem_x509_certificate(cert_pem.encode(), default_backend())
        
        # Get expiry date
        expires_at = cert.not_valid_after
        now = datetime.now(UTC).replace(tzinfo=None)  # Remove timezone for comparison
        
        # Calculate days until expiry
        time_until_expiry = expires_at - now
        days_until_expiry = time_until_expiry.days
        
        # Determine status b
check_clock_skew function · python · L766-L816 (51 LOC)
config/ansible/admin/files/app.py
def check_clock_skew():
    """Check clock skew using NTP protocol to VIP"""

    # NTP server to check against (VIP)
    ntp_server = '10.0.0.254'

    try:
        # Create NTP client
        client = ntplib.NTPClient()
        
        # Make NTP request (this is a lightweight UDP request)
        response = client.request(ntp_server, version=3, timeout=2)
        
        # Get offset in milliseconds
        offset_ms = response.offset * 1000
        
        # Determine status based on offset
        if abs(offset_ms) > 1000:  # More than 1 second
            status = 'critical'
        elif abs(offset_ms) > 100:  # More than 100ms
            status = 'warning'
        else:
            status = 'healthy'
        
        return {
            'status': status,
            'details': {
                'offset_ms': round(offset_ms, 3),
                'ntp_server': ntp_server,
                'stratum': response.stratum,
                'precision': response.precision,
            
check_docker_daemon function · python · L818-L870 (53 LOC)
config/ansible/admin/files/app.py
def check_docker_daemon():
    """Check Docker daemon status and functionality"""
    try:
        # Check if Docker service is running
        docker_service_running = check_service_status('docker')
        
        # Check if Docker socket is accessible
        docker_socket_accessible = False
        docker_version = None
        docker_error = None
        
        if docker_service_running:
            try:
                # Test Docker daemon connectivity
                result = subprocess.run(['docker', 'version', '--format', '{{.Server.Version}}'], 
                                      capture_output=True, text=True, timeout=10)
                if result.returncode == 0:
                    docker_socket_accessible = True
                    docker_version = result.stdout.strip()
                else:
                    docker_error = result.stderr.strip()
            except subprocess.TimeoutExpired:
                docker_error = 'Docker command timeout'
            except
check_docker_registry function · python · L872-L956 (85 LOC)
config/ansible/admin/files/app.py
def check_docker_registry():
    """Check Docker registry status and functionality"""
    try:
        # Check if registry service is running
        registry_service_running = check_service_status('docker-registry')
        
        # Also check if registry container is running directly
        registry_container_running = False

        # Check if registry port is open (try both localhost and storage VIP)
        registry_port_open = check_port_open('localhost', 5000)

        # Test registry health endpoint
        registry_healthy = False
        registry_error = None
        registry_version = None
        
        if registry_port_open:
            # Try both localhost and VIP endpoints
            test_urls = ['http://localhost:5000/v2/', 'http://10.0.0.100:5000/v2/']
            for url in test_urls:
                try:
                    health_response = requests.get(url, timeout=5)
                    if health_response.status_code == 200:
                        registry_
check_tang_service function · python · L958-L1025 (68 LOC)
config/ansible/admin/files/app.py
def check_tang_service():
    """Check Tang server status and functionality"""
    try:
        # Check if Tang service is running
        tang_service_running = check_service_status('tang-server.service')
        
        # Check if Tang port is open
        tang_port_open = check_port_open('localhost', 8777)
        
        # Test Tang advertisement endpoint
        tang_healthy = False
        tang_error = None
        tang_keys = None
        
        if tang_port_open:
            try:
                adv_response = requests.get('http://localhost:8777/adv', timeout=5)
                if adv_response.status_code == 200:
                    tang_healthy = True
                    # Try to parse the advertisement to count keys
                    try:
                        adv_data = adv_response.json()
                        if isinstance(adv_data, dict) and 'keys' in adv_data:
                            tang_keys = len(adv_data['keys'])
                        else:
          
check_secrets_mount function · python · L1027-L1087 (61 LOC)
config/ansible/admin/files/app.py
def check_secrets_mount():
    """Check if /secrets is mounted"""
    try:
        # Check if /secrets is mounted
        result = subprocess.run(['mountpoint', '-q', '/secrets'], 
                              capture_output=True, text=True, timeout=5)
        is_mounted = result.returncode == 0
        
        # Get mount details if mounted
        mount_details = None
        if is_mounted:
            try:
                mount_result = subprocess.run(['findmnt', '-n', '-o', 'SOURCE,FSTYPE,OPTIONS', '/secrets'], 
                                            capture_output=True, text=True, timeout=5)
                if mount_result.returncode == 0:
                    mount_details = mount_result.stdout.strip()
            except:
                pass
        
        # Check if secrets directory exists and is accessible
        secrets_accessible = False
        secrets_error = None
        try:
            if os.path.exists('/secrets') and os.path.isdir('/secrets'):
              
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
check_open_webui function · python · L1089-L1167 (79 LOC)
config/ansible/admin/files/app.py
def check_open_webui():
    """Check Open-WebUI service status and functionality"""
    try:
        # Check if Open-WebUI service is running
        webui_service_running = check_service_status('open-webui')
        
        # Check if Open-WebUI port is open
        webui_port_open = check_port_open('localhost', 8380)
        
        # Test Open-WebUI health endpoint
        webui_healthy = False
        webui_error = None
        webui_version = None
        
        if webui_port_open:
            try:
                # Try health check endpoint
                health_response = requests.get('http://localhost:8380/health', timeout=5)
                if health_response.status_code == 200:
                    webui_healthy = True
                    try:
                        health_data = health_response.json()
                        webui_version = health_data.get('version', 'unknown')
                    except:
                        webui_version = 'unknown'
               
is_storage_leader function · python · L1169-L1179 (11 LOC)
config/ansible/admin/files/app.py
def is_storage_leader():
    """Check if this node is the current storage leader.

    Reads the etcd key /cluster/leader/app and compares its decoded value
    with this machine's node name (platform.node()).

    Returns:
        True if the key holds this node's name; False if the key is
        empty/missing or the lookup fails for any reason.
    """
    try:
        leader_value = get_etcd_client().get('/cluster/leader/app')[0]
        if not leader_value:
            return False
        return leader_value.decode() == platform.node()
    except Exception:
        # Best effort: an unreachable etcd means "not leader". Exception
        # (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
        return False
is_dhcp_leader function · python · L1181-L1191 (11 LOC)
config/ansible/admin/files/app.py
def is_dhcp_leader():
    """Check if this node is the current DHCP leader.

    Reads the etcd key /cluster/leader/dhcp and compares its decoded value
    with this machine's node name (platform.node()).

    Returns:
        True if the key holds this node's name; False if the key is
        empty/missing or the lookup fails for any reason.
    """
    try:
        leader_value = get_etcd_client().get('/cluster/leader/dhcp')[0]
        if not leader_value:
            return False
        return leader_value.decode() == platform.node()
    except Exception:
        # Best effort: an unreachable etcd means "not leader". Exception
        # (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
        return False
is_node_drained function · python · L1193-L1201 (9 LOC)
config/ansible/admin/files/app.py
def is_node_drained():
    """Check if this node is drained.

    Reads the etcd key /cluster/nodes/<this node>/drain, where the node
    name comes from platform.node().

    Returns:
        True only when the key exists and its decoded value is exactly
        'true'; False otherwise, including when the lookup fails.
    """
    try:
        hostname = platform.node()
        flag = get_etcd_client().get(f'/cluster/nodes/{hostname}/drain')[0]
        return flag is not None and flag.decode() == 'true'
    except Exception:
        # Best effort: treat lookup failures as "not drained". Exception
        # (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
        return False
get_current_node_type function · python · L1203-L1214 (12 LOC)
config/ansible/admin/files/app.py
def get_current_node_type():
    """Map the first character of this machine's node name to a node type.

    Returns 'storage' for 's', 'compute' for 'c', 'macos' for 'm', and
    'unknown' for an empty node name or any other leading character.
    """
    node_name = platform.node()
    if not node_name:
        return 'unknown'

    type_by_initial = {
        's': 'storage',
        'c': 'compute',
        'm': 'macos',
    }
    return type_by_initial.get(node_name[0], 'unknown')
check_service_conditionally function · python · L1216-L1241 (26 LOC)
config/ansible/admin/files/app.py
def check_service_conditionally(health_status, service_name, check_func, required_on_storage_only=True):
    """
    Helper function to conditionally check a service based on node type.
    
    Args:
        health_status: The health status dict to update
        service_name: Name of the service being checked
        check_func: Function to call to check the service (should return a dict with 'status' key)
        required_on_storage_only: If True, service is only checked on storage nodes
    """
    is_storage_node = get_current_node_type() == 'storage'
    
    if not required_on_storage_only or is_storage_node:
        service_result = check_func()
        health_status['services'][service_name] = service_result
        
        # Update overall health based on service status
        if service_result['status'] in ['unhealthy', 'error']:
            health_status['overall'] = 'unhealthy'
        elif service_result['status'] == 'degraded' and health_status['overall'] == 'healthy':
drain_node function · python · L1244-L1252 (9 LOC)
config/ansible/admin/files/app.py
def drain_node():
    """Drain this node - disable leader election.

    Writes 'true' to the etcd key /cluster/nodes/<this node>/drain and
    returns a JSON confirmation; any failure becomes a 500 JSON error.
    """
    try:
        this_host = platform.node()
        drain_key = f'/cluster/nodes/{this_host}/drain'
        get_etcd_client().put(drain_key, 'true')
    except Exception as e:
        return jsonify({'error': f'Failed to drain node: {str(e)}'}), 500
    else:
        return jsonify({'status': 'drained', 'hostname': this_host})
undrain_node function · python · L1255-L1263 (9 LOC)
config/ansible/admin/files/app.py
def undrain_node():
    """Undrain this node - re-enable leader election.

    Deletes the etcd key /cluster/nodes/<this node>/drain and returns a
    JSON confirmation; any failure becomes a 500 JSON error.
    """
    try:
        this_host = platform.node()
        drain_key = f'/cluster/nodes/{this_host}/drain'
        get_etcd_client().delete(drain_key)
    except Exception as e:
        return jsonify({'error': f'Failed to undrain node: {str(e)}'}), 500
    else:
        return jsonify({'status': 'active', 'hostname': this_host})
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
drain_target_node function · python · L1266-L1273 (8 LOC)
config/ansible/admin/files/app.py
def drain_target_node(target_hostname):
    """Drain a specific node - disable leader election.

    Writes 'true' to the etcd key /cluster/nodes/<target_hostname>/drain
    and returns a JSON confirmation; any failure becomes a 500 JSON error.
    """
    try:
        drain_key = f'/cluster/nodes/{target_hostname}/drain'
        get_etcd_client().put(drain_key, 'true')
    except Exception as e:
        return jsonify({'error': f'Failed to drain node {target_hostname}: {str(e)}'}), 500
    else:
        return jsonify({'status': 'drained', 'hostname': target_hostname})
undrain_target_node function · python · L1276-L1283 (8 LOC)
config/ansible/admin/files/app.py
def undrain_target_node(target_hostname):
    """Undrain a specific node - re-enable leader election.

    Deletes the etcd key /cluster/nodes/<target_hostname>/drain and
    returns a JSON confirmation; any failure becomes a 500 JSON error.
    """
    try:
        drain_key = f'/cluster/nodes/{target_hostname}/drain'
        get_etcd_client().delete(drain_key)
    except Exception as e:
        return jsonify({'error': f'Failed to undrain node {target_hostname}: {str(e)}'}), 500
    else:
        return jsonify({'status': 'active', 'hostname': target_hostname})
drain_status function · python · L1286-L1295 (10 LOC)
config/ansible/admin/files/app.py
def drain_status():
    """Report whether this node is drained.

    Reads the etcd key /cluster/nodes/<this node>/drain; the node counts
    as drained only when the key exists and decodes to exactly 'true'.
    Any failure becomes a 500 JSON error.
    """
    try:
        this_host = platform.node()
        raw_flag = get_etcd_client().get(f'/cluster/nodes/{this_host}/drain')[0]
        if raw_flag is None:
            drained = False
        else:
            drained = raw_flag.decode() == 'true'
        return jsonify({'hostname': this_host, 'drained': drained})
    except Exception as e:
        return jsonify({'error': f'Failed to check drain status: {str(e)}'}), 500
drain_status_target function · python · L1298-L1306 (9 LOC)
config/ansible/admin/files/app.py
def drain_status_target(target_hostname):
    """Report whether a specific node is drained.

    Reads the etcd key /cluster/nodes/<target_hostname>/drain; the node
    counts as drained only when the key exists and decodes to exactly
    'true'. Any failure becomes a 500 JSON error.
    """
    try:
        raw_flag = get_etcd_client().get(f'/cluster/nodes/{target_hostname}/drain')[0]
        if raw_flag is None:
            drained = False
        else:
            drained = raw_flag.decode() == 'true'
        return jsonify({'hostname': target_hostname, 'drained': drained})
    except Exception as e:
        return jsonify({'error': f'Failed to check drain status for {target_hostname}: {str(e)}'}), 500
ping function · python · L1309-L1311 (3 LOC)
config/ansible/admin/files/app.py
def ping():
    """Connectivity check: reply with status 'ok' and the current UTC time
    as an ISO-8601 string."""
    now_iso = datetime.now(UTC).isoformat()
    payload = {'status': 'ok', 'timestamp': now_iso}
    return jsonify(payload)
get_time function · python · L1314-L1316 (3 LOC)
config/ansible/admin/files/app.py
def get_time():
    """Return the server's current time.time() value as JSON, for clock
    synchronization checks."""
    current_epoch = time.time()
    return jsonify({'timestamp': current_epoch})
get_mac_from_ip function · python · L1318-L1363 (46 LOC)
config/ansible/admin/files/app.py
def get_mac_from_ip(client_ip):
    """
    Look up MAC address from IP address using DHCP leases in etcd or neighbor table.
    Return value is non-normalized MAC address (xx:xx:xx:xx:xx:xx) or None if not found.
    """
    if not client_ip:
        return None
    
    client = get_etcd_client()

    # Look through DHCP leases in etcd
    for value, metadata in client.get_prefix('/cluster/dhcp/leases/'):
        if value:
            try:
                lease_data = json.loads(value.decode())
                if lease_data.get('ip') == client_ip:
                    # Return non-normalized MAC (with colons) from lease data
                    return lease_data.get('mac')
            except json.JSONDecodeError:
                # Skip non-JSON entries in the dhcp prefix
                continue

    print("fallback to neighbor table", client_ip, file=sys.stderr)

    # Fallback to neighbor table lookup using ip --json neigh
    result = None
    try:
        result = subprocess.run([
serve_meta_data function · python · L1366-L1368 (3 LOC)
config/ansible/admin/files/app.py
def serve_meta_data():
    """Serve empty meta-data for autoinstall.

    Returns a (body, status, headers) tuple: an empty plain-text body
    with HTTP status 200.
    """
    headers = {'Content-Type': 'text/plain'}
    empty_body = ""
    return empty_body, 200, headers
Open data scored by Repobility · https://repobility.com
serve_user_data function · python · L1371-L1452 (82 LOC)
config/ansible/admin/files/app.py
def serve_user_data():
    """Serve dynamically rendered user-data based on client MAC address"""
    # Get MAC from query param (for Docker/dev) or look up from client IP (prod)
    mac_address = request.args.get('mac')
    client_ip = request.environ.get('REMOTE_ADDR') or request.remote_addr

    if mac_address:
        # Normalize MAC format (GRUB uses xx:xx:xx:xx:xx:xx)
        mac_address = mac_address.lower().replace('-', ':')
        print(f"MAC from query param: {mac_address}", file=sys.stderr)
    else:
        # Fall back to IP lookup
        mac_address = get_mac_from_ip(client_ip)
        if not mac_address:
            return f"MAC address not found for client IP {client_ip}", 400
        print(f"MAC from IP lookup: {client_ip} -> {mac_address}", file=sys.stderr)
    
    # Get or create allocation for this MAC address
    allocation_data = get_or_create_allocation(mac_address)
    
    # Use allocation data
    node_type = allocation_data['type']
    hostname = allocation
serve_bootstrap_index function · python · L1463-L1484 (22 LOC)
config/ansible/admin/files/app.py
def serve_bootstrap_index():
    """Serve a plain-text help page describing the bootstrap endpoints.

    The example commands are rendered against the host the request was
    addressed to (``request.host``), always with an ``http://`` scheme.

    Returns a ``(body, status, headers)`` tuple with HTTP 200 and a
    ``text/plain`` content type.
    """
    plain_text = {'Content-Type': 'text/plain'}

    # Base URL echoed back into every example command below.
    api_server = f"http://{request.host}"

    usage = f"""YCluster Bootstrap

Available types:
  macos  - macOS compute nodes
  nas    - Ubuntu-based NAS devices
  nvidia - Ubuntu-based Nvidia GPU servers
  wg     - WireGuard overlay client (remote node joining over the internet)

Usage:
  curl {api_server}/bootstrap/<type> | sudo bash

Examples:
  curl {api_server}/bootstrap/macos | sudo bash
  curl {api_server}/bootstrap/nas | sudo bash
  curl {api_server}/bootstrap/nvidia | sudo bash
  curl {api_server}/bootstrap/wg | sudo bash -s -- --type compute
  curl {api_server}/bootstrap/wg | sudo bash -s -- --dev
"""
    return usage, 200, plain_text
serve_bootstrap function · python · L1488-L1532 (45 LOC)
config/ansible/admin/files/app.py
def serve_bootstrap(node_type):
    """Serve bootstrap script for a given node type"""
    if node_type not in BOOTSTRAP_TEMPLATES:
        return jsonify({'error': f'Unknown bootstrap type: {node_type}', 'available': list(BOOTSTRAP_TEMPLATES.keys())}), 404

    # Determine the API server URL. Remote (wg) bootstrap must use the
    # public admin subdomain (reverse proxy exposes only a whitelist of
    # paths there). The WG tunnel endpoint is separate and rendered into
    # the client wg config itself. Local node bootstraps (macos/nas/nvidia)
    # keep using the host header.
    if node_type == 'wg':
        try:
            etcd = get_etcd_client()
            domain_value, _ = etcd.get('/cluster/https/domain')
        except Exception as e:
            return jsonify({'error': f'etcd lookup failed: {e}'}), 503
        if not domain_value:
            return jsonify({
                'error': 'wg bootstrap requires /cluster/https/domain to be set '
                         '(run `y
prometheus_metrics function · python · L1536-L1602 (67 LOC)
config/ansible/admin/files/app.py
def prometheus_metrics():
    """Prometheus metrics endpoint"""
    try:
        # Get health data
        health_data = get_comprehensive_health()
        
        metrics = []
        
        # Overall health metric
        overall_value = 1 if health_data['overall'] == 'healthy' else 0
        metrics.append(f'ycluster_node_healthy{{node="{platform.node()}"}} {overall_value}')
        
        # Service health metrics
        for service, details in health_data.get('services', {}).items():
            status = details.get('status')
            if status == 'healthy':
                service_value = 0
            elif status == 'degraded':
                service_value = 1
            elif status == 'unhealthy':
                service_value = 2
            elif status == 'standby':
                service_value = 3
            else:
                service_value = 2
            metrics.append(f'ycluster_service_health{{node="{platform.node()}",service="{service}"}} {service_value}'
get_comprehensive_health function · python · L1604-L1985 (382 LOC)
config/ansible/admin/files/app.py
def get_comprehensive_health():
    """Get comprehensive health data (extracted from health() function)"""
    health_status = {
        'overall': 'healthy',
        'services': {}
    }
    
    # Check etcd
    try:
        client = get_etcd_client()
        client.get('/test')
        health_status['services']['etcd'] = {'status': 'healthy', 'details': 'connected'}
    except Exception as e:
        health_status['services']['etcd'] = {'status': 'unhealthy', 'details': str(e)}
        health_status['overall'] = 'unhealthy'
    
    # Determine current node type
    current_node_type = get_current_node_type()
    is_storage_node = current_node_type == 'storage'
    
    # Check Ceph storage (only on storage nodes)
    if is_storage_node:
        ceph_health = check_ceph_status()
        health_status['services']['ceph'] = ceph_health
        if ceph_health['status'] not in ['healthy', 'degraded']:
            health_status['overall'] = 'unhealthy'
    else:
        health_status['se
health function · python · L1988-L1994 (7 LOC)
config/ansible/admin/files/app.py
def health():
    """Health check endpoint covering all services.

    Returns the report built by ``get_comprehensive_health()`` as JSON.
    The HTTP status is 200 when the report's ``overall`` field is
    ``'healthy'`` and 503 for any other value.
    """
    report = get_comprehensive_health()

    # Anything other than an explicit 'healthy' is surfaced as unavailable.
    if report['overall'] == 'healthy':
        http_status = 200
    else:
        http_status = 503

    return jsonify(report), http_status
alert_webhook function · python · L1997-L2017 (21 LOC)
config/ansible/admin/files/app.py
def alert_webhook():
    """Webhook endpoint for Alertmanager notifications.

    Reads the JSON request body and prints one line per entry of its
    ``alerts`` list: the alert ``status`` plus the ``alertname``,
    ``severity`` and ``node`` labels. Any missing field is logged as
    ``'unknown'``.

    Returns:
        ``{'status': 'received'}`` with HTTP 200 once the payload has been
        logged; ``{'error': ...}`` with HTTP 400 when the body is missing,
        is not valid JSON, or is not a JSON object; ``{'error': ...}`` with
        HTTP 500 on any unexpected failure.
    """
    try:
        # silent=True yields None instead of raising on a missing or
        # malformed JSON body, so a bad client request is answered with 400
        # rather than falling into the generic handler below as a 500.
        alert_data = request.get_json(silent=True)
        if not isinstance(alert_data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        # Log the alert
        for alert in alert_data.get('alerts') or []:
            if not isinstance(alert, dict):
                # Skip malformed entries instead of failing the whole batch.
                continue

            labels = alert.get('labels')
            if not isinstance(labels, dict):
                # Covers both an absent and an explicit null "labels" field.
                labels = {}

            status = alert.get('status', 'unknown')
            alertname = labels.get('alertname', 'unknown')
            severity = labels.get('severity', 'unknown')
            node = labels.get('node', 'unknown')

            print(f"Alert {status}: {alertname} (severity: {severity}, node: {node})")

        # Here you could integrate with external notification systems
        # For now, just acknowledge receipt
        return jsonify({'status': 'received'}), 200

    except Exception as e:
        # Last-resort boundary: report the failure to the caller as a 500.
        print(f"Error processing alert webhook: {e}")
        return jsonify({'error': str(e)}), 500
get_all_hosts function · python · L2019-L2056 (38 LOC)
config/ansible/admin/files/app.py
def get_all_hosts():
    """Get all hosts from etcd allocations"""
    try:
        client = get_etcd_client()
        hosts = []
        
        # Get all allocations from by-hostname
        for value, metadata in client.get_prefix(f"{ETCD_PREFIX}/by-hostname/"):
            if value:
                try:
                    allocation = json.loads(value.decode())
                    hostname = allocation['hostname']
                    
                    # Skip AMT interfaces (hostnames ending with 'a')
                    if hostname.endswith('a'):
                        continue
                    
                    # Skip dynamic IP allocations (hostnames that don't match expected patterns)
                    # Expected patterns: s1-s20, c1-c20, m1-m20
                    if not (hostname.startswith(('s', 'c', 'm')) and 
                            len(hostname) > 1 and 
                            hostname[1:].isdigit()):
                        continue

               
Repobility analyzer · published findings · https://repobility.com
get_host_health function · python · L2058-L2072 (15 LOC)
config/ansible/admin/files/app.py
def get_host_health(host_ip, timeout=10):
    """Fetch the health report of one host over HTTP.

    Issues a GET to ``http://<host_ip>:12723/api/health``.

    Args:
        host_ip: Address of the host to query.
        timeout: Timeout passed through to ``requests.get``.

    Returns:
        The decoded JSON body when the host answers 200 or 503. Otherwise a
        dict with ``overall`` set to ``'error'``, ``'timeout'`` or
        ``'unreachable'``, an empty ``services`` dict and an ``error``
        message.
    """
    def _failure(overall, message):
        # Uniform shape for every non-success outcome.
        return {'overall': overall, 'services': {}, 'error': message}

    try:
        reply = requests.get(f"http://{host_ip}:12723/api/health", timeout=timeout)
        # Both 200 (healthy) and 503 (unhealthy) carry valid health data;
        # any other status is reported as an error.
        if reply.status_code not in [200, 503]:
            return _failure('error', f'HTTP {reply.status_code}')
        return reply.json()
    except requests.exceptions.Timeout:
        return _failure('timeout', 'Request timeout')
    except requests.exceptions.ConnectionError:
        return _failure('unreachable', 'Connection failed')
    except Exception as exc:
        return _failure('error', str(exc))
check_vip_status function · python · L2074-L2148 (75 LOC)
config/ansible/admin/files/app.py
def check_vip_status():
    """Check VIP status using keepalived and ip commands"""
    gateway_vip_ip = '10.0.0.254'
    storage_vip_ip = '10.0.0.100'
    vip_status = {
        'gateway_vip': {
            'ip': gateway_vip_ip,
            'active': False,
            'master': None,
            'interface': None
        },
        'storage_vip': {
            'ip': storage_vip_ip,
            'active': False,
            'master': None,
            'interface': None
        }
    }
    
    # Check gateway VIP
    try:
        # Use 'ip -j addr show to <vip>' to get JSON output for reliable parsing
        result = subprocess.run(['ip', '-j', 'addr', 'show', 'to', gateway_vip_ip], 
                              capture_output=True, text=True, timeout=5)
        if result.returncode == 0 and result.stdout.strip():
            # Parse JSON output - if there's any output, VIP is active on this node
            interfaces = json.loads(result.stdout)
            if interfaces:
          
get_cluster_vip_status function · python · L2150-L2232 (83 LOC)
config/ansible/admin/files/app.py
def get_cluster_vip_status(host_health):
    """Get VIP status across all cluster nodes from existing health data"""
    vip_info = {
        'gateway_vip': {
            'ip': '10.0.0.254',
            'active_on': None,
            'master_hostname': None,
            'interface': None
        },
        'storage_vip': {
            'ip': '10.0.0.100',
            'active_on': None,
            'master_hostname': None,
            'interface': None
        },
        'keepalived_nodes': []  # Single list for keepalived status across all nodes
    }
    
    # Get all hosts to find core nodes
    all_hosts = get_all_hosts()
    
    # Process only core nodes (where keepalived runs and VIPs can be active)
    for core_node in CORE_NODES:
        # Find the core node in all_hosts to get its IP
        core_host = next((host for host in all_hosts if host['hostname'] == core_node), None)
        if not core_host:
            # Core node not found in allocations - add as missing
          
page 1 / 8next ›