Function bodies 355 total
determine_ip_from_hostname function · python · L119-L167 (49 LOC)config/ansible/admin/files/app.py
def determine_ip_from_hostname(hostname, via_wg=False):
"""Generate deterministic IP based on hostname.
via_wg=True puts the IP in the wg peer subnet (10.0.1.0/24) while
preserving the per-type hostname numbering. A physical compute c3
lands at 10.0.0.53; a wg-bootstrapped compute c3 lands at 10.0.1.53.
"""
if not hostname:
return None
# Check if this is an AMT hostname (ends with 'a')
is_amt = hostname.endswith('a')
if is_amt:
hostname = hostname[:-1] # Strip trailing 'a' for parsing
# Try multi-char prefixes first (e.g., 'nas'), then single-char
prefix = None
num = None
for p in IP_RANGES:
if hostname.startswith(p):
try:
num = int(hostname[len(p):])
prefix = p
break
except ValueError:
continue
if prefix is None:
return None
config = IP_RANGES[prefix]
# Validate number is within range
if num <determine_type_from_mac function · python · L169-L186 (18 LOC)config/ansible/admin/files/app.py
def determine_type_from_mac(mac_address):
    """Classify a machine as 'storage' or 'compute' from its MAC address.

    Used for PXE boot allocation. macOS nodes pass an explicit type through
    the /macos/bootstrap endpoint instead of relying on this detection.

    The MAC may use ':' or '-' separators (or none) and any letter case.
    A missing or empty MAC is classified as 'compute'.
    """
    storage_prefix = '5847ca'  # 58:47:ca with the separators stripped
    if mac_address:
        compact = mac_address.lower().replace(':', '').replace('-', '')
        if compact.startswith(storage_prefix):
            return 'storage'
    # Everything else, including a missing MAC, is a compute node.
    return 'compute'
def get_next_hostname(client, node_type):
"""Get the next available hostname for a node type"""
prefixes = {
'storage': 's',
'compute': 'c',
'macos': 'm',
'nvidia': 'nv',
'nas': 'nas',
'dev': 'd',
}
prefix = prefixes.get(node_type, 'c')
# Get all existing hostnames of this type
existing_numbers = []
for value, metadata in client.get_prefix(f"{ETCD_PREFIX}/by-hostname/{prefix}"):
if value:
hostname = metadata.key.decode().split('/')[-1]
try:
num = int(hostname[len(prefix):])
existing_numbers.append(num)
except:
pass
# Find the next available number
next_num = 1
if existing_numbers:
existing_numbers.sort()
# Find first gap or use max+1
for i, num in enumerate(existing_numbers):
if num != i + 1:
next_num = i + 1
break
elseget_or_create_allocation function · python · L226-L312 (87 LOC)config/ansible/admin/files/app.py
def get_or_create_allocation(mac_address, node_type=None, via_wg=False):
"""Get existing allocation or create new one for non-normalized MAC address.
Args:
mac_address: MAC address (any format)
node_type: Optional type override ('storage', 'compute', 'macos').
If not provided, type is detected from MAC address.
If provided and different from existing, the allocation is updated.
via_wg: Transport marker — WG-bootstrapped nodes get their IP
from 10.0.1.0/24 instead of 10.0.0.0/24. Preserved in
the allocation record so list/lookup stays consistent.
"""
client = get_etcd_client()
normalized_mac = mac_address.lower().replace(':', '').replace('-', '')
# Check if allocation already exists
existing_data = client.get(f"{ETCD_PREFIX}/by-mac/{normalized_mac}")
if existing_data[0]:
data = json.loads(existing_data[0].decode())
existing_via_wg = booallocate_hostname function · python · L315-L343 (29 LOC)config/ansible/admin/files/app.py
def allocate_hostname():
"""Allocate a new hostname based on MAC address.
Query parameters:
mac: MAC address (required)
type: Optional type override ('storage', 'compute', 'macos')
"""
mac_address = request.args.get('mac')
node_type = request.args.get('type')
if not mac_address:
return jsonify({'error': 'MAC address is required'}), 400
if node_type and node_type not in ('storage', 'compute', 'macos', 'nas', 'nvidia', 'dev'):
return jsonify({'error': f'Invalid type: {node_type}'}), 400
try:
allocation = get_or_create_allocation(mac_address, node_type=node_type)
return jsonify({
'hostname': allocation['hostname'],
'type': allocation['type'],
'ip': allocation['ip'],
'amt_ip': allocation['amt_ip'],
'mac': mac_address,
'existing': True # Always true since we return existing or newly created
})
except Exception as e:
wg_register function · python · L346-L398 (53 LOC)config/ansible/admin/files/app.py
def wg_register():
"""Allocate a hostname+IP (as a cluster node of the requested type)
and register a WireGuard peer pubkey for it in one atomic call.
This endpoint is public (exposed via the admin-api reverse proxy for
remote-node onboarding), so it validates input carefully and does not
leak details of unrelated allocations.
Body (JSON): {mac, type, pubkey}
mac: client MAC (any format)
type: one of storage|compute|macos|nas|nvidia (default: compute)
pubkey: base64-encoded 32-byte WireGuard public key
Returns: {hostname, ip, status, pubkey_sha256}
"""
import base64
from ycluster.utils import wg_config
data = request.get_json(silent=True) or {}
mac = data.get('mac')
node_type = data.get('type') or 'compute'
pubkey = data.get('pubkey')
if not mac or not pubkey:
return jsonify({'error': 'mac and pubkey are required'}), 400
if node_type not in ('storage', 'compute', 'macos', 'nas', 'nvidia',wg_poll function · python · L402-L423 (22 LOC)config/ansible/admin/files/app.py
def wg_poll(hostname):
    """Report a WireGuard peer's approval status, plus its config once approved.

    The caller must pass the peer's ``pubkey_sha256`` as the ``fp`` query
    parameter; this prevents unauthenticated enumeration of peer configs.

    Args:
        hostname: Peer hostname, looked up with ``wg_config.get_peer``.

    Returns:
        A Flask JSON response: 400 when ``fp`` is missing, 404 when no peer
        is registered under ``hostname``, 403 when the fingerprint does not
        match, 500 when rendering the client config fails, otherwise
        ``{'status': ...}`` with an added ``'config'`` entry for peers whose
        status is ``'approved'``.
    """
    import hmac

    from ycluster.utils import wg_config

    fp = request.args.get('fp')
    if not fp:
        return jsonify({'error': 'fp query parameter is required'}), 400

    peer = wg_config.get_peer(hostname)
    if not peer:
        return jsonify({'error': 'no such peer'}), 404

    expected_fp = peer.get('pubkey_sha256')
    # The fingerprint is the access token for this peer's config, so compare
    # it in constant time. Bytes are compared because compare_digest rejects
    # non-ASCII str arguments.
    if not isinstance(expected_fp, str) or not hmac.compare_digest(
            expected_fp.encode('utf-8'), fp.encode('utf-8')):
        return jsonify({'error': 'fingerprint mismatch'}), 403

    peer_status = peer['status']
    resp = {'status': peer_status}
    if peer_status == 'approved':
        try:
            resp['config'] = wg_config.render_client_config(hostname)
        except Exception as e:
            return jsonify({'error': f'render failed: {e}'}), 500
    return jsonify(resp)
status function · python · L427-L446 (20 LOC)config/ansible/admin/files/app.py
def status():
    """Return allocation counts grouped by node type as a JSON response.

    Every record under ``{ETCD_PREFIX}/by-hostname/`` is counted by its
    ``type`` field; records without one count as ``'compute'``. The keys
    ``'storage'``, ``'compute'`` and ``'macos'`` are always present, other
    types appear only when at least one allocation has them.

    Returns:
        The counts as JSON, or a 503 JSON error if the etcd client cannot be
        obtained.
    """
    try:
        client = get_etcd_client()
    except Exception as e:
        return jsonify({'error': f'etcd connection failed: {str(e)}'}), 503

    counts = {'storage': 0, 'compute': 0, 'macos': 0}
    for value, _metadata in client.get_prefix(f"{ETCD_PREFIX}/by-hostname/"):
        if not value:
            continue
        try:
            allocation = json.loads(value.decode())
            node_type = allocation.get('type', 'compute')
            counts[node_type] = counts.get(node_type, 0) + 1
        except (ValueError, AttributeError, TypeError):
            # Best effort: skip records that are not decodable JSON objects
            # (bad bytes, invalid JSON, non-dict payload, unhashable type)
            # without also swallowing KeyboardInterrupt/SystemExit.
            continue
    return jsonify(counts)
def allocations():
"""Get all current allocations"""
try:
client = get_etcd_client()
except Exception as e:
return jsonify({'error': f'etcd connection failed: {str(e)}'}), 503
allocations = []
# Get all allocations from by-hostname (to avoid duplicates)
for value, metadata in client.get_prefix(f"{ETCD_PREFIX}/by-hostname/"):
if value:
try:
allocation = json.loads(value.decode())
allocations.append({
'mac': allocation['mac'],
'hostname': allocation['hostname'],
'type': allocation['type'],
'ip': allocation['ip'],
'allocated_at': allocation.get('allocated_at', 0),
'disabled': allocation.get('disabled', False)
})
except:
pass
# Sort by hostname
allocations.sort(key=lambda x: (x['type'], int(x['hostname'][1:]) if x['disable_host function · python · L481-L501 (21 LOC)config/ansible/admin/files/app.py
def disable_host(hostname):
    """Flag a host's allocation as disabled so it is hidden from the status page.

    Sets ``disabled`` to True on the stored allocation and writes the record
    back under both its by-hostname key and its by-mac key.

    Returns:
        A JSON response: 503 if the etcd client cannot be obtained, 404 if no
        allocation exists for ``hostname``, 500 on any other failure.
    """
    try:
        etcd = get_etcd_client()
    except Exception as e:
        return jsonify({'error': f'etcd connection failed: {str(e)}'}), 503

    try:
        hostname_key = f"{ETCD_PREFIX}/by-hostname/{hostname}"
        stored = etcd.get(hostname_key)[0]
        if not stored:
            return jsonify({'error': 'Host not found'}), 404

        record = json.loads(stored.decode())
        record['disabled'] = True
        serialized = json.dumps(record)
        # Keep both indexes in step; the by-hostname entry is written first.
        etcd.put(hostname_key, serialized)
        etcd.put(f"{ETCD_PREFIX}/by-mac/{record['mac']}", serialized)
        return jsonify({'status': 'ok', 'hostname': hostname, 'disabled': True})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def enable_host(hostname):
    """Clear a host's disabled flag so it shows up on the status page again.

    Sets ``disabled`` to False on the stored allocation and writes the record
    back under both its by-hostname key and its by-mac key.

    Returns:
        A JSON response: 503 if the etcd client cannot be obtained, 404 if no
        allocation exists for ``hostname``, 500 on any other failure.
    """
    try:
        etcd = get_etcd_client()
    except Exception as e:
        return jsonify({'error': f'etcd connection failed: {str(e)}'}), 503

    try:
        hostname_key = f"{ETCD_PREFIX}/by-hostname/{hostname}"
        stored = etcd.get(hostname_key)[0]
        if not stored:
            return jsonify({'error': 'Host not found'}), 404

        record = json.loads(stored.decode())
        record['disabled'] = False
        serialized = json.dumps(record)
        # Keep both indexes in step; the by-hostname entry is written first.
        etcd.put(hostname_key, serialized)
        etcd.put(f"{ETCD_PREFIX}/by-mac/{record['mac']}", serialized)
        return jsonify({'status': 'ok', 'hostname': hostname, 'disabled': False})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def get_dhcp_config():
"""Generate DHCP configuration from etcd allocations"""
try:
client = get_etcd_client()
except Exception as e:
return f"# etcd connection failed: {str(e)}\n", 503
dhcp_config = []
# Get all allocations
for value, metadata in client.get_prefix(f"{ETCD_PREFIX}/by-hostname/"):
if value:
try:
allocation = json.loads(value.decode())
mac = allocation['mac']
hostname = allocation['hostname']
ip = allocation['ip']
# Convert normalized MAC back to colon format
mac_formatted = ':'.join(mac[i:i+2] for i in range(0, 12, 2))
dhcp_config.append(f"dhcp-host={mac_formatted},{hostname},{ip},infinite")
except:
pass
if dhcp_config:
return '\n'.join(sorted(dhcp_config)) + '\n', 200, {'Content-Type': 'text/plain'}
else:
return "# Noget_hosts function · python · L559-L601 (43 LOC)config/ansible/admin/files/app.py
def get_hosts():
"""Generate hosts file format from etcd allocations"""
try:
client = get_etcd_client()
except Exception as e:
return f"# etcd connection failed: {str(e)}\n", 503
hosts_entries = []
# Get all allocations
for value, metadata in client.get_prefix(f"{ETCD_PREFIX}/by-hostname/"):
if value:
try:
allocation = json.loads(value.decode())
hostname = allocation['hostname']
ip = allocation['ip']
# Skip AMT hostnames registered as nodes - they get correct
# entries auto-generated from the base node below
if hostname.endswith('a') and determine_ip_from_hostname(hostname):
continue
# Add main hostname entry
hosts_entries.append(f"{ip} {hostname} {hostname}.xc")
# Add AMT hostname entry if this is a regulcheck_service_status function · python · L603-L610 (8 LOC)config/ansible/admin/files/app.py
def check_service_status(service_name):
    """Return True when ``systemctl is-active`` reports the unit as active.

    Args:
        service_name: systemd unit name passed to ``systemctl is-active``.

    Returns:
        True only if the command's stdout is exactly ``active``. Any failure
        to run the command (missing binary, 5 second timeout, invalid
        argument) is reported as False.
    """
    try:
        result = subprocess.run(
            ['systemctl', 'is-active', service_name],
            capture_output=True, text=True, timeout=5,
        )
    except (OSError, subprocess.SubprocessError, ValueError, TypeError):
        # OSError: systemctl missing or not executable; SubprocessError:
        # timeout; ValueError/TypeError: unusable service_name or output.
        return False
    return result.stdout.strip() == 'active'
def check_port_open(host, port, timeout=3):
    """Return True if a TCP connection to ``host:port`` can be established.

    Args:
        host: IPv4 address or hostname to connect to.
        port: TCP port number.
        timeout: Socket timeout in seconds.

    Returns:
        True when the connection succeeds; False when it is refused, times
        out, the host cannot be resolved, or the arguments are invalid.
    """
    try:
        # The context manager closes the socket even when settimeout() or
        # connect_ex() raises (e.g. name resolution failure).
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            return sock.connect_ex((host, port)) == 0
    except (OSError, OverflowError, TypeError, ValueError):
        # OSError covers resolution/network errors; the others cover an
        # out-of-range port, a non-integer port, or a bad timeout/host value.
        return False
check_ceph_status function · python · L623-L643 (21 LOC)config/ansible/admin/files/app.py
def check_ceph_status():
    """Run ``ceph health`` and summarise the cluster state.

    Returns:
        A dict with ``status`` and ``details`` keys. ``status`` is:
        'healthy' when the output contains HEALTH_OK, 'unhealthy' when it
        contains HEALTH_ERR, 'degraded' for any other successful output,
        'error' when the command exits non-zero (``details`` is its stderr),
        and 'unavailable' when the command cannot be run or times out after
        10 seconds.
    """
    try:
        result = subprocess.run(['ceph', 'health'],
                                capture_output=True, text=True, timeout=10)
    except (OSError, subprocess.SubprocessError, ValueError):
        # OSError: ceph binary missing or not executable; SubprocessError:
        # timeout; ValueError: undecodable command output.
        return {'status': 'unavailable', 'details': 'ceph command failed'}

    if result.returncode != 0:
        return {'status': 'error', 'details': result.stderr.strip()}

    health_output = result.stdout.strip()
    if 'HEALTH_OK' in health_output:
        status = 'healthy'
    elif 'HEALTH_ERR' in health_output:
        status = 'unhealthy'
    else:
        status = 'degraded'
    return {
        'status': status,
        'details': health_output
    }
def check_dns_status():
"""Check DNS (dnsmasq) service and functionality"""
try:
# Check if dnsmasq service is running
service_running = check_service_status('dnsmasq')
# Test local DNS server directly using dnspython
dns_working = False
dns_details = "DNS query failed"
try:
# Create a resolver that queries the local DNS server directly
resolver = dns.resolver.Resolver()
resolver.nameservers = ['127.0.0.1']
resolver.timeout = 3
resolver.lifetime = 5
# Query local hostname A record
local_hostname = platform.node()
answer = resolver.resolve(local_hostname, 'A')
if answer:
resolved_ips = [str(rdata) for rdata in answer]
dns_working = True
dns_details = f"Local DNS server responding ({local_hostname} -> {', '.join(resolved_ips)})"
else:
check_certificate_expiry function · python · L703-L764 (62 LOC)config/ansible/admin/files/app.py
def check_certificate_expiry():
"""Check TLS certificate expiry from etcd"""
try:
client = get_etcd_client()
cert_value, _ = client.get('/cluster/tls/cert')
if not cert_value:
return {
'status': 'not_configured',
'details': {
'message': 'No certificate found in etcd',
'days_until_expiry': None,
'expires_at': None
}
}
# Parse the certificate
cert_pem = cert_value.decode()
cert = x509.load_pem_x509_certificate(cert_pem.encode(), default_backend())
# Get expiry date
expires_at = cert.not_valid_after
now = datetime.now(UTC).replace(tzinfo=None) # Remove timezone for comparison
# Calculate days until expiry
time_until_expiry = expires_at - now
days_until_expiry = time_until_expiry.days
# Determine status bcheck_clock_skew function · python · L766-L816 (51 LOC)config/ansible/admin/files/app.py
def check_clock_skew():
"""Check clock skew using NTP protocol to VIP"""
# NTP server to check against (VIP)
ntp_server = '10.0.0.254'
try:
# Create NTP client
client = ntplib.NTPClient()
# Make NTP request (this is a lightweight UDP request)
response = client.request(ntp_server, version=3, timeout=2)
# Get offset in milliseconds
offset_ms = response.offset * 1000
# Determine status based on offset
if abs(offset_ms) > 1000: # More than 1 second
status = 'critical'
elif abs(offset_ms) > 100: # More than 100ms
status = 'warning'
else:
status = 'healthy'
return {
'status': status,
'details': {
'offset_ms': round(offset_ms, 3),
'ntp_server': ntp_server,
'stratum': response.stratum,
'precision': response.precision,
check_docker_daemon function · python · L818-L870 (53 LOC)config/ansible/admin/files/app.py
def check_docker_daemon():
"""Check Docker daemon status and functionality"""
try:
# Check if Docker service is running
docker_service_running = check_service_status('docker')
# Check if Docker socket is accessible
docker_socket_accessible = False
docker_version = None
docker_error = None
if docker_service_running:
try:
# Test Docker daemon connectivity
result = subprocess.run(['docker', 'version', '--format', '{{.Server.Version}}'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
docker_socket_accessible = True
docker_version = result.stdout.strip()
else:
docker_error = result.stderr.strip()
except subprocess.TimeoutExpired:
docker_error = 'Docker command timeout'
exceptcheck_docker_registry function · python · L872-L956 (85 LOC)config/ansible/admin/files/app.py
def check_docker_registry():
"""Check Docker registry status and functionality"""
try:
# Check if registry service is running
registry_service_running = check_service_status('docker-registry')
# Also check if registry container is running directly
registry_container_running = False
# Check if registry port is open (try both localhost and storage VIP)
registry_port_open = check_port_open('localhost', 5000)
# Test registry health endpoint
registry_healthy = False
registry_error = None
registry_version = None
if registry_port_open:
# Try both localhost and VIP endpoints
test_urls = ['http://localhost:5000/v2/', 'http://10.0.0.100:5000/v2/']
for url in test_urls:
try:
health_response = requests.get(url, timeout=5)
if health_response.status_code == 200:
registry_check_tang_service function · python · L958-L1025 (68 LOC)config/ansible/admin/files/app.py
def check_tang_service():
"""Check Tang server status and functionality"""
try:
# Check if Tang service is running
tang_service_running = check_service_status('tang-server.service')
# Check if Tang port is open
tang_port_open = check_port_open('localhost', 8777)
# Test Tang advertisement endpoint
tang_healthy = False
tang_error = None
tang_keys = None
if tang_port_open:
try:
adv_response = requests.get('http://localhost:8777/adv', timeout=5)
if adv_response.status_code == 200:
tang_healthy = True
# Try to parse the advertisement to count keys
try:
adv_data = adv_response.json()
if isinstance(adv_data, dict) and 'keys' in adv_data:
tang_keys = len(adv_data['keys'])
else:
check_secrets_mount function · python · L1027-L1087 (61 LOC)config/ansible/admin/files/app.py
def check_secrets_mount():
"""Check if /secrets is mounted"""
try:
# Check if /secrets is mounted
result = subprocess.run(['mountpoint', '-q', '/secrets'],
capture_output=True, text=True, timeout=5)
is_mounted = result.returncode == 0
# Get mount details if mounted
mount_details = None
if is_mounted:
try:
mount_result = subprocess.run(['findmnt', '-n', '-o', 'SOURCE,FSTYPE,OPTIONS', '/secrets'],
capture_output=True, text=True, timeout=5)
if mount_result.returncode == 0:
mount_details = mount_result.stdout.strip()
except:
pass
# Check if secrets directory exists and is accessible
secrets_accessible = False
secrets_error = None
try:
if os.path.exists('/secrets') and os.path.isdir('/secrets'):
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
check_open_webui function · python · L1089-L1167 (79 LOC)config/ansible/admin/files/app.py
def check_open_webui():
"""Check Open-WebUI service status and functionality"""
try:
# Check if Open-WebUI service is running
webui_service_running = check_service_status('open-webui')
# Check if Open-WebUI port is open
webui_port_open = check_port_open('localhost', 8380)
# Test Open-WebUI health endpoint
webui_healthy = False
webui_error = None
webui_version = None
if webui_port_open:
try:
# Try health check endpoint
health_response = requests.get('http://localhost:8380/health', timeout=5)
if health_response.status_code == 200:
webui_healthy = True
try:
health_data = health_response.json()
webui_version = health_data.get('version', 'unknown')
except:
webui_version = 'unknown'
is_storage_leader function · python · L1169-L1179 (11 LOC)config/ansible/admin/files/app.py
def is_storage_leader():
    """Return True when etcd's ``/cluster/leader/app`` key names this node.

    The stored value is compared with ``platform.node()``. A missing key or
    any failure while reading it is reported as False.
    """
    try:
        client = get_etcd_client()
        leader_value = client.get('/cluster/leader/app')[0]
        if not leader_value:
            return False
        return leader_value.decode() == platform.node()
    except Exception:
        # Best-effort check: treat any failure as "not the leader", but let
        # KeyboardInterrupt/SystemExit propagate (a bare except would not).
        return False
def is_dhcp_leader():
    """Return True when etcd's ``/cluster/leader/dhcp`` key names this node.

    The stored value is compared with ``platform.node()``. A missing key or
    any failure while reading it is reported as False.
    """
    try:
        client = get_etcd_client()
        leader_value = client.get('/cluster/leader/dhcp')[0]
        if not leader_value:
            return False
        return leader_value.decode() == platform.node()
    except Exception:
        # Best-effort check: treat any failure as "not the leader", but let
        # KeyboardInterrupt/SystemExit propagate (a bare except would not).
        return False
def is_node_drained():
    """Return True when this node's drain flag in etcd is set to 'true'.

    Reads ``/cluster/nodes/<hostname>/drain`` for ``platform.node()``. A
    missing key, a different value, or any failure while reading it is
    reported as False.
    """
    try:
        hostname = platform.node()
        client = get_etcd_client()
        flag = client.get(f'/cluster/nodes/{hostname}/drain')[0]
        return flag is not None and flag.decode() == 'true'
    except Exception:
        # Best-effort check: treat any failure as "not drained", but let
        # KeyboardInterrupt/SystemExit propagate (a bare except would not).
        return False
def get_current_node_type():
    """Map this machine's hostname to a node type using its first character.

    's' maps to 'storage', 'c' to 'compute' and 'm' to 'macos'. Any other
    first character, or an empty hostname, yields 'unknown'.
    """
    type_by_initial = {'s': 'storage', 'c': 'compute', 'm': 'macos'}
    hostname = platform.node()
    if not hostname:
        return 'unknown'
    return type_by_initial.get(hostname[0], 'unknown')
def check_service_conditionally(health_status, service_name, check_func, required_on_storage_only=True):
"""
Helper function to conditionally check a service based on node type.
Args:
health_status: The health status dict to update
service_name: Name of the service being checked
check_func: Function to call to check the service (should return a dict with 'status' key)
required_on_storage_only: If True, service is only checked on storage nodes
"""
is_storage_node = get_current_node_type() == 'storage'
if not required_on_storage_only or is_storage_node:
service_result = check_func()
health_status['services'][service_name] = service_result
# Update overall health based on service status
if service_result['status'] in ['unhealthy', 'error']:
health_status['overall'] = 'unhealthy'
elif service_result['status'] == 'degraded' and health_status['overall'] == 'healthy':drain_node function · python · L1244-L1252 (9 LOC)config/ansible/admin/files/app.py
def drain_node():
    """Drain this node (disables leader election for it).

    Writes 'true' to ``/cluster/nodes/<hostname>/drain`` in etcd, where the
    hostname is ``platform.node()``.

    Returns:
        A JSON confirmation, or a 500 JSON error if anything fails.
    """
    try:
        this_host = platform.node()
        drain_key = f'/cluster/nodes/{this_host}/drain'
        get_etcd_client().put(drain_key, 'true')
        return jsonify({'status': 'drained', 'hostname': this_host})
    except Exception as e:
        return jsonify({'error': f'Failed to drain node: {str(e)}'}), 500
def undrain_node():
    """Undrain this node (re-enables leader election for it).

    Deletes ``/cluster/nodes/<hostname>/drain`` from etcd, where the hostname
    is ``platform.node()``.

    Returns:
        A JSON confirmation, or a 500 JSON error if anything fails.
    """
    try:
        this_host = platform.node()
        drain_key = f'/cluster/nodes/{this_host}/drain'
        get_etcd_client().delete(drain_key)
        return jsonify({'status': 'active', 'hostname': this_host})
    except Exception as e:
        return jsonify({'error': f'Failed to undrain node: {str(e)}'}), 500
drain_target_node function · python · L1266-L1273 (8 LOC)config/ansible/admin/files/app.py
def drain_target_node(target_hostname):
    """Drain the named node (disables leader election for it).

    Writes 'true' to ``/cluster/nodes/<target_hostname>/drain`` in etcd.

    Returns:
        A JSON confirmation, or a 500 JSON error if anything fails.
    """
    try:
        drain_key = f'/cluster/nodes/{target_hostname}/drain'
        get_etcd_client().put(drain_key, 'true')
        return jsonify({'status': 'drained', 'hostname': target_hostname})
    except Exception as e:
        return jsonify({'error': f'Failed to drain node {target_hostname}: {str(e)}'}), 500
def undrain_target_node(target_hostname):
    """Undrain the named node (re-enables leader election for it).

    Deletes ``/cluster/nodes/<target_hostname>/drain`` from etcd.

    Returns:
        A JSON confirmation, or a 500 JSON error if anything fails.
    """
    try:
        drain_key = f'/cluster/nodes/{target_hostname}/drain'
        get_etcd_client().delete(drain_key)
        return jsonify({'status': 'active', 'hostname': target_hostname})
    except Exception as e:
        return jsonify({'error': f'Failed to undrain node {target_hostname}: {str(e)}'}), 500
def drain_status():
    """Report whether this node is currently drained.

    Reads ``/cluster/nodes/<hostname>/drain`` for ``platform.node()``; the
    node counts as drained only when the stored value is 'true'.

    Returns:
        JSON with ``hostname`` and ``drained``, or a 500 JSON error if the
        lookup fails.
    """
    try:
        this_host = platform.node()
        etcd = get_etcd_client()
        flag = etcd.get(f'/cluster/nodes/{this_host}/drain')[0]
        drained = flag is not None and flag.decode() == 'true'
        return jsonify({'hostname': this_host, 'drained': drained})
    except Exception as e:
        return jsonify({'error': f'Failed to check drain status: {str(e)}'}), 500
def drain_status_target(target_hostname):
    """Report whether the named node is currently drained.

    Reads ``/cluster/nodes/<target_hostname>/drain``; the node counts as
    drained only when the stored value is 'true'.

    Returns:
        JSON with ``hostname`` and ``drained``, or a 500 JSON error if the
        lookup fails.
    """
    try:
        etcd = get_etcd_client()
        flag = etcd.get(f'/cluster/nodes/{target_hostname}/drain')[0]
        drained = flag is not None and flag.decode() == 'true'
        return jsonify({'hostname': target_hostname, 'drained': drained})
    except Exception as e:
        return jsonify({'error': f'Failed to check drain status for {target_hostname}: {str(e)}'}), 500
def ping():
    """Connectivity probe: reply with status 'ok' and the current UTC time.

    The timestamp is an ISO 8601 string produced from a timezone-aware
    ``datetime.now(UTC)``.
    """
    now_utc = datetime.now(UTC)
    payload = {'status': 'ok', 'timestamp': now_utc.isoformat()}
    return jsonify(payload)
def get_time():
    """Return this server's ``time.time()`` value for clock synchronization checks."""
    current_epoch = time.time()
    return jsonify({'timestamp': current_epoch})
def get_mac_from_ip(client_ip):
"""
Look up MAC address from IP address using DHCP leases in etcd or neighbor table.
Return value is non-normalized MAC address (xx:xx:xx:xx:xx:xx) or None if not found.
"""
if not client_ip:
return None
client = get_etcd_client()
# Look through DHCP leases in etcd
for value, metadata in client.get_prefix('/cluster/dhcp/leases/'):
if value:
try:
lease_data = json.loads(value.decode())
if lease_data.get('ip') == client_ip:
# Return non-normalized MAC (with colons) from lease data
return lease_data.get('mac')
except json.JSONDecodeError:
# Skip non-JSON entries in the dhcp prefix
continue
print("fallback to neighbor table", client_ip, file=sys.stderr)
# Fallback to neighbor table lookup using ip --json neigh
result = None
try:
result = subprocess.run([serve_meta_data function · python · L1366-L1368 (3 LOC)config/ansible/admin/files/app.py
def serve_meta_data():
    """Serve an empty plain-text meta-data document for autoinstall.

    Returns:
        A ``(body, status, headers)`` tuple: empty body, 200, ``text/plain``.
    """
    headers = {'Content-Type': 'text/plain'}
    return "", 200, headers
serve_user_data function · python · L1371-L1452 (82 LOC)config/ansible/admin/files/app.py
def serve_user_data():
"""Serve dynamically rendered user-data based on client MAC address"""
# Get MAC from query param (for Docker/dev) or look up from client IP (prod)
mac_address = request.args.get('mac')
client_ip = request.environ.get('REMOTE_ADDR') or request.remote_addr
if mac_address:
# Normalize MAC format (GRUB uses xx:xx:xx:xx:xx:xx)
mac_address = mac_address.lower().replace('-', ':')
print(f"MAC from query param: {mac_address}", file=sys.stderr)
else:
# Fall back to IP lookup
mac_address = get_mac_from_ip(client_ip)
if not mac_address:
return f"MAC address not found for client IP {client_ip}", 400
print(f"MAC from IP lookup: {client_ip} -> {mac_address}", file=sys.stderr)
# Get or create allocation for this MAC address
allocation_data = get_or_create_allocation(mac_address)
# Use allocation data
node_type = allocation_data['type']
hostname = allocationserve_bootstrap_index function · python · L1463-L1484 (22 LOC)config/ansible/admin/files/app.py
def serve_bootstrap_index():
    """Serve plain-text usage hints for the bootstrap endpoints.

    The curl examples in the text are built from the Host header of the
    incoming request (``request.host``).

    Returns:
        A ``(body, 200, headers)`` tuple with a ``text/plain`` content type.
    """
    # Base URL used in the examples; the scheme is always http.
    api_server = f"http://{request.host}"
    text = f"""YCluster Bootstrap
Available types:
macos - macOS compute nodes
nas - Ubuntu-based NAS devices
nvidia - Ubuntu-based Nvidia GPU servers
wg - WireGuard overlay client (remote node joining over the internet)
Usage:
curl {api_server}/bootstrap/<type> | sudo bash
Examples:
curl {api_server}/bootstrap/macos | sudo bash
curl {api_server}/bootstrap/nas | sudo bash
curl {api_server}/bootstrap/nvidia | sudo bash
curl {api_server}/bootstrap/wg | sudo bash -s -- --type compute
curl {api_server}/bootstrap/wg | sudo bash -s -- --dev
"""
    return text, 200, {'Content-Type': 'text/plain'}
def serve_bootstrap(node_type):
"""Serve bootstrap script for a given node type"""
if node_type not in BOOTSTRAP_TEMPLATES:
return jsonify({'error': f'Unknown bootstrap type: {node_type}', 'available': list(BOOTSTRAP_TEMPLATES.keys())}), 404
# Determine the API server URL. Remote (wg) bootstrap must use the
# public admin subdomain (reverse proxy exposes only a whitelist of
# paths there). The WG tunnel endpoint is separate and rendered into
# the client wg config itself. Local node bootstraps (macos/nas/nvidia)
# keep using the host header.
if node_type == 'wg':
try:
etcd = get_etcd_client()
domain_value, _ = etcd.get('/cluster/https/domain')
except Exception as e:
return jsonify({'error': f'etcd lookup failed: {e}'}), 503
if not domain_value:
return jsonify({
'error': 'wg bootstrap requires /cluster/https/domain to be set '
'(run `yprometheus_metrics function · python · L1536-L1602 (67 LOC)config/ansible/admin/files/app.py
def prometheus_metrics():
"""Prometheus metrics endpoint"""
try:
# Get health data
health_data = get_comprehensive_health()
metrics = []
# Overall health metric
overall_value = 1 if health_data['overall'] == 'healthy' else 0
metrics.append(f'ycluster_node_healthy{{node="{platform.node()}"}} {overall_value}')
# Service health metrics
for service, details in health_data.get('services', {}).items():
status = details.get('status')
if status == 'healthy':
service_value = 0
elif status == 'degraded':
service_value = 1
elif status == 'unhealthy':
service_value = 2
elif status == 'standby':
service_value = 3
else:
service_value = 2
metrics.append(f'ycluster_service_health{{node="{platform.node()}",service="{service}"}} {service_value}'get_comprehensive_health function · python · L1604-L1985 (382 LOC)config/ansible/admin/files/app.py
def get_comprehensive_health():
"""Get comprehensive health data (extracted from health() function)"""
health_status = {
'overall': 'healthy',
'services': {}
}
# Check etcd
try:
client = get_etcd_client()
client.get('/test')
health_status['services']['etcd'] = {'status': 'healthy', 'details': 'connected'}
except Exception as e:
health_status['services']['etcd'] = {'status': 'unhealthy', 'details': str(e)}
health_status['overall'] = 'unhealthy'
# Determine current node type
current_node_type = get_current_node_type()
is_storage_node = current_node_type == 'storage'
# Check Ceph storage (only on storage nodes)
if is_storage_node:
ceph_health = check_ceph_status()
health_status['services']['ceph'] = ceph_health
if ceph_health['status'] not in ['healthy', 'degraded']:
health_status['overall'] = 'unhealthy'
else:
health_status['sehealth function · python · L1988-L1994 (7 LOC)config/ansible/admin/files/app.py
def health():
    """Health check endpoint covering all services.

    Returns the report from ``get_comprehensive_health()`` as JSON, with
    HTTP 200 when its ``overall`` field is 'healthy' and 503 otherwise.
    """
    report = get_comprehensive_health()
    if report['overall'] == 'healthy':
        http_status = 200
    else:
        http_status = 503
    return jsonify(report), http_status
def alert_webhook():
    """Receive Alertmanager webhook notifications and log each alert.

    Each entry of the payload's ``alerts`` list is printed with its status
    and its ``alertname``, ``severity`` and ``node`` labels ('unknown' when
    absent).

    Returns:
        200 ``{'status': 'received'}`` once the alerts are logged; 400 when
        the body is not a JSON object; 500 on any unexpected failure.
    """
    try:
        # silent=True makes a missing or malformed JSON body come back as
        # None instead of raising, so it is reported as a client error (400)
        # rather than being caught below and returned as a 500.
        alert_data = request.get_json(silent=True)
        if not isinstance(alert_data, dict):
            return jsonify({'error': 'request body must be a JSON object'}), 400

        # `or []` tolerates an explicit null for "alerts".
        for alert in alert_data.get('alerts') or []:
            labels = alert.get('labels', {})
            status = alert.get('status', 'unknown')
            alertname = labels.get('alertname', 'unknown')
            severity = labels.get('severity', 'unknown')
            node = labels.get('node', 'unknown')
            print(f"Alert {status}: {alertname} (severity: {severity}, node: {node})")

        # Here you could integrate with external notification systems.
        # For now, just acknowledge receipt.
        return jsonify({'status': 'received'}), 200
    except Exception as e:
        print(f"Error processing alert webhook: {e}")
        return jsonify({'error': str(e)}), 500
def get_all_hosts():
"""Get all hosts from etcd allocations"""
try:
client = get_etcd_client()
hosts = []
# Get all allocations from by-hostname
for value, metadata in client.get_prefix(f"{ETCD_PREFIX}/by-hostname/"):
if value:
try:
allocation = json.loads(value.decode())
hostname = allocation['hostname']
# Skip AMT interfaces (hostnames ending with 'a')
if hostname.endswith('a'):
continue
# Skip dynamic IP allocations (hostnames that don't match expected patterns)
# Expected patterns: s1-s20, c1-c20, m1-m20
if not (hostname.startswith(('s', 'c', 'm')) and
len(hostname) > 1 and
hostname[1:].isdigit()):
continue
Repobility analyzer · published findings · https://repobility.com
get_host_health function · python · L2058-L2072 (15 LOC)config/ansible/admin/files/app.py
def get_host_health(host_ip, timeout=10):
    """Fetch the health report published by another host.

    Requests ``http://<host_ip>:12723/api/health``. Both 200 (healthy) and
    503 (unhealthy) responses carry a health report, which is returned as
    parsed JSON.

    Args:
        host_ip: Address of the host to query.
        timeout: Request timeout in seconds.

    Returns:
        The remote report, or a dict with ``overall`` set to 'error',
        'timeout' or 'unreachable', an empty ``services`` dict and an
        ``error`` message when no report could be obtained.
    """
    def _failure(overall, message):
        # Placeholder report with the same top-level keys as a failed check.
        return {'overall': overall, 'services': {}, 'error': message}

    try:
        reply = requests.get(f"http://{host_ip}:12723/api/health", timeout=timeout)
        if reply.status_code not in [200, 503]:
            return _failure('error', f'HTTP {reply.status_code}')
        return reply.json()
    except requests.exceptions.Timeout:
        return _failure('timeout', 'Request timeout')
    except requests.exceptions.ConnectionError:
        return _failure('unreachable', 'Connection failed')
    except Exception as e:
        return _failure('error', str(e))
def check_vip_status():
"""Check VIP status using keepalived and ip commands"""
gateway_vip_ip = '10.0.0.254'
storage_vip_ip = '10.0.0.100'
vip_status = {
'gateway_vip': {
'ip': gateway_vip_ip,
'active': False,
'master': None,
'interface': None
},
'storage_vip': {
'ip': storage_vip_ip,
'active': False,
'master': None,
'interface': None
}
}
# Check gateway VIP
try:
# Use 'ip -j addr show to <vip>' to get JSON output for reliable parsing
result = subprocess.run(['ip', '-j', 'addr', 'show', 'to', gateway_vip_ip],
capture_output=True, text=True, timeout=5)
if result.returncode == 0 and result.stdout.strip():
# Parse JSON output - if there's any output, VIP is active on this node
interfaces = json.loads(result.stdout)
if interfaces:
get_cluster_vip_status function · python · L2150-L2232 (83 LOC)config/ansible/admin/files/app.py
def get_cluster_vip_status(host_health):
"""Get VIP status across all cluster nodes from existing health data"""
vip_info = {
'gateway_vip': {
'ip': '10.0.0.254',
'active_on': None,
'master_hostname': None,
'interface': None
},
'storage_vip': {
'ip': '10.0.0.100',
'active_on': None,
'master_hostname': None,
'interface': None
},
'keepalived_nodes': [] # Single list for keepalived status across all nodes
}
# Get all hosts to find core nodes
all_hosts = get_all_hosts()
# Process only core nodes (where keepalived runs and VIPs can be active)
for core_node in CORE_NODES:
# Find the core node in all_hosts to get its IP
core_host = next((host for host in all_hosts if host['hostname'] == core_node), None)
if not core_host:
# Core node not found in allocations - add as missing
page 1 / 8next ›