Function bodies 355 total
get_leadership_status function · python · L2235-L2255 (21 LOC)config/ansible/admin/files/app.py
def get_leadership_status():
    """Return the leader values currently recorded in etcd.

    Reads ``/cluster/leader/app`` (reported as ``storage_leader``) and
    ``/cluster/leader/dhcp`` (reported as ``dhcp_leader``). A key is left
    out of the result when etcd holds no value for it.

    Returns:
        dict: Decoded leader values keyed by role, or an empty dict when
        the etcd lookup raises.
    """
    leader_keys = (
        ('storage_leader', '/cluster/leader/app'),
        ('dhcp_leader', '/cluster/leader/dhcp'),
    )
    try:
        client = get_etcd_client()
        leadership = {}
        for field, etcd_key in leader_keys:
            value = client.get(etcd_key)[0]
            if value:
                leadership[field] = value.decode()
        return leadership
    except Exception:
        # Best-effort lookup: report "no leadership data" instead of failing.
        # Catching Exception rather than using a bare except lets
        # KeyboardInterrupt and SystemExit propagate.
        return {}

# cluster_status_api function · python · L2258-L2283 (26 LOC)config/ansible/admin/files/app.py
def cluster_status_api():
    """Assemble the cluster status and return it via ``jsonify``.

    The payload carries the host list, per-host health (disabled hosts get
    a fixed ``'disabled'`` entry instead of a health probe), leadership,
    VIP status, certificate status, the responding node name and a timestamp.
    """
    hosts = get_all_hosts()
    leadership = get_leadership_status()
    certificate_status = check_certificate_expiry()

    # Disabled hosts are not probed; they get a placeholder entry instead.
    host_health = {}
    for host in hosts:
        if host.get('disabled', False):
            health = {'status': 'disabled', 'services': []}
        else:
            health = get_host_health(host['ip'])
        host_health[host['hostname']] = health

    # VIP status is derived from the health data gathered above.
    vip_status = get_cluster_vip_status(host_health)

    payload = {
        'hosts': hosts,
        'hostHealth': host_health,
        'leadership': leadership,
        'vipStatus': vip_status,
        'certificateStatus': certificate_status,
        'respondingHostname': platform.node(),
        'timestamp': datetime.now().isoformat()
    }
    return jsonify(payload)

# static_files function · python · L2286-L2301 (16 LOC)config/ansible/admin/files/app.py
def static_files(filename):
    """Serve a file from the ``static`` directory next to this script.

    Args:
        filename: Name of the requested file, relative to the static directory.

    Returns:
        The response built by ``send_from_directory``. For names ending in
        ``.js`` the Content-Type header is forced to
        ``application/javascript``.
    """
    import os

    # Resolve the static directory relative to this file, not the working dir.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    static_dir = os.path.join(script_dir, 'static')

    response = send_from_directory(static_dir, filename)
    if filename.endswith('.js'):
        response.headers['Content-Type'] = 'application/javascript'
    return response

# status_page function · python · L2304-L2332 (29 LOC)config/ansible/admin/files/app.py
def status_page():
"""Web page showing cluster-wide health status"""
hosts = get_all_hosts()
host_health = {}
leadership = get_leadership_status()
certificate_status = check_certificate_expiry()
# Get health status for each host (skip disabled hosts)
for host in hosts:
if host.get('disabled', False):
host_health[host['hostname']] = {'status': 'disabled', 'services': []}
else:
host_health[host['hostname']] = get_host_health(host['ip'])
# Extract VIP status from existing health data
vip_status = get_cluster_vip_status(host_health)
# Get drain status for this node
current_node_drained = is_node_drained()
return render_template('status.html',
hosts=hosts,
host_health=host_health,
leadership=leadership,
vip_status=vip_status,
certificate_status=certificate_status,get_healthchecks_url function · python · L19-L29 (11 LOC)config/ansible/admin/files/scripts/cluster-heartbeat.py
def get_healthchecks_url():
    """Fetch the healthchecks URL stored in etcd.

    Returns:
        The decoded value found at ``HEALTHCHECKS_ETCD_KEY``, or ``None``
        when the key has no value or the lookup raises (the error is printed).
    """
    try:
        raw_value = get_etcd_client().get(HEALTHCHECKS_ETCD_KEY)[0]
        return raw_value.decode() if raw_value else None
    except Exception as e:
        print(f"Failed to get healthchecks URL from etcd: {str(e)}")
        return None

# get_cluster_health function · python · L32-L128 (97 LOC)config/ansible/admin/files/scripts/cluster-heartbeat.py
def get_cluster_health():
"""Get comprehensive cluster health status"""
health_data = {
'timestamp': datetime.now().isoformat(),
'hostname': socket.gethostname(),
'services': {},
'nodes': {},
'overall': 'healthy'
}
try:
# Check local health API
response = requests.get('http://localhost:12723/api/health', timeout=10)
if response.status_code in [200, 503]:
local_health = response.json()
health_data['services'] = local_health.get('services', {})
health_data['overall'] = local_health.get('overall', 'unknown')
health_data['storage_leader'] = local_health.get('storage_leader', False)
health_data['dhcp_leader'] = local_health.get('dhcp_leader', False)
except Exception as e:
health_data['error'] = f"Failed to get local health: {str(e)}"
health_data['overall'] = 'error'
# Get cluster-wide status
try:
response = determine_health_status function · python · L130-L156 (27 LOC)config/ansible/admin/files/scripts/cluster-heartbeat.py
def determine_health_status(health_data):
"""Determine overall health status and exit code"""
# Critical failures (exit code 2)
critical_conditions = [
health_data.get('overall') == 'unhealthy',
health_data.get('nodes', {}).get('unreachable', 0) > 1,
health_data.get('certificate', {}).get('status') in ['expired', 'critical'],
'etcd' in health_data.get('services', {}) and
health_data['services']['etcd'].get('status') == 'unhealthy',
'ceph' in health_data.get('services', {}) and
health_data['services']['ceph'].get('status') == 'unhealthy'
]
# Warning conditions (exit code 1)
warning_conditions = [
health_data.get('overall') == 'degraded',
health_data.get('nodes', {}).get('unhealthy', 0) > 0,
health_data.get('nodes', {}).get('unreachable', 0) == 1,
health_data.get('certificate', {}).get('status') == 'warning'
]
if any(critical_conditions):
retRepobility — same analyzer, your code, free for public repos · /scan/
format_health_message function · python · L158-L228 (71 LOC)config/ansible/admin/files/scripts/cluster-heartbeat.py
def format_health_message(health_data, status):
"""Format health data into a human-readable message"""
lines = []
lines.append(f"Cluster Status: {status.upper()}")
lines.append(f"Time: {health_data['timestamp']}")
lines.append(f"Reporting Node: {health_data['hostname']}")
# Node status
nodes = health_data.get('nodes', {})
if nodes:
lines.append(f"Nodes: {nodes.get('healthy', 0)}/{nodes.get('total', 0)} healthy")
if nodes.get('unhealthy_names'):
lines.append(f" Unhealthy: {', '.join(nodes['unhealthy_names'])}")
if nodes.get('unreachable_names'):
lines.append(f" Unreachable: {', '.join(nodes['unreachable_names'])}")
# Unhealthy services by node
unhealthy_by_node = health_data.get('unhealthy_services_by_node', {})
if unhealthy_by_node:
for hostname, services in unhealthy_by_node.items():
lines.append(f" {hostname}: {', '.join(services)}")
# Leadership
lsend_heartbeat function · python · L230-L282 (53 LOC)config/ansible/admin/files/scripts/cluster-heartbeat.py
def send_heartbeat():
"""Send heartbeat to healthchecks.io"""
# Get healthchecks URL from etcd
healthchecks_url = get_healthchecks_url()
if not healthchecks_url:
print("No healthchecks URL configured in etcd")
return False
try:
# Get health data
health_data = get_cluster_health()
# Determine status
exit_code, status = determine_health_status(health_data)
# Format message
message = format_health_message(health_data, status)
print(message)
# Send appropriate ping
if exit_code == 0:
# Success ping
response = requests.post(healthchecks_url,
data=message,
timeout=10)
elif exit_code == 1:
# Warning - use exit code endpoint
response = requests.post(f"{healthchecks_url}/1",
data=message,
main function · python · L284-L288 (5 LOC)config/ansible/admin/files/scripts/cluster-heartbeat.py
def main():
    """Run one heartbeat and exit with 0 on success, 1 on failure."""
    exit_status = 0 if send_heartbeat() else 1
    sys.exit(exit_status)

# get_version function · python · L7-L13 (7 LOC)config/ansible/admin/files/ycluster/setup.py
def get_version():
    """Read the package version from ``ycluster/__init__.py``.

    Scans the file beside this script for the first line that starts with
    ``__version__`` and contains ``=``.

    Returns:
        str: The text after the first ``=`` with whitespace and surrounding
        quotes removed, or ``'0.1.0'`` when the file cannot be read or has
        no such line.
    """
    default_version = '0.1.0'
    version_file = os.path.join(os.path.dirname(__file__), 'ycluster', '__init__.py')
    try:
        with open(version_file, encoding='utf-8') as f:
            for line in f:
                if not line.startswith('__version__'):
                    continue
                # partition keeps everything after the first '=', and a line
                # without '=' is skipped instead of raising IndexError.
                _, separator, value = line.partition('=')
                if separator:
                    return value.strip().strip('"\'')
    except OSError:
        # A missing or unreadable version file falls back to the default,
        # the same way get_requirements tolerates a missing requirements file.
        return default_version
    return default_version

# get_requirements function · python · L16-L21 (6 LOC)config/ansible/admin/files/ycluster/setup.py
def get_requirements():
    """Load install requirements from ``requirements.txt`` beside this file.

    Returns:
        list[str]: The stripped, non-empty, non-comment lines of the file,
        or an empty list when the file does not exist.
    """
    requirements_file = os.path.join(os.path.dirname(__file__), 'requirements.txt')
    if not os.path.exists(requirements_file):
        return []
    with open(requirements_file, encoding='utf-8') as f:
        stripped_lines = (line.strip() for line in f)
        # Test the stripped text so indented comment lines are skipped too.
        return [entry for entry in stripped_lines if entry and not entry.startswith('#')]

# register_backup_commands function · python · L8-L74 (67 LOC)config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def register_backup_commands(subparsers):
"""Register backup-related commands."""
backup_parser = subparsers.add_parser('backup', help='Backup encryption management')
backup_parser.set_defaults(func=handle_backup_command)
backup_subparsers = backup_parser.add_subparsers(dest='backup_command', help='Backup commands')
# Recipients management
recipients_parser = backup_subparsers.add_parser('recipients', help='Manage backup encryption recipients')
recipients_subparsers = recipients_parser.add_subparsers(dest='recipients_command', help='Recipients commands')
# List recipients
list_parser = recipients_subparsers.add_parser('list', help='List backup encryption recipients')
list_parser.add_argument('--json', action='store_true', help='Output in JSON format')
list_parser.set_defaults(func=backup_recipients_list)
# Add recipient
add_parser = recipients_subparsers.add_parser('add', help='Add backup encryption recipient')
add_backup_recipients_list function · python · L77-L127 (51 LOC)config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_recipients_list(args):
"""List all backup encryption recipients."""
try:
client = get_etcd_client()
# Get all recipients from etcd
result = client.get_prefix('/cluster/backup/recipients/')
recipients = []
for value, metadata in result:
try:
recipient_data = json.loads(value.decode('utf-8'))
name = metadata.key.decode('utf-8').split('/')[-1]
recipient_info = {
'name': name,
'public_key': recipient_data.get('public_key', 'N/A'),
'created_at': recipient_data.get('created_at', 'N/A')
}
if recipient_data.get('description'):
recipient_info['description'] = recipient_data['description']
recipients.append(recipient_info)
except (json.JSONDecodeErbackup_recipients_add function · python · L130-L167 (38 LOC)config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_recipients_add(args):
"""Add a backup encryption recipient."""
try:
# Validate public key format
if not args.public_key.startswith('age1'):
print("Error: Public key must start with 'age1'")
sys.exit(1)
client = get_etcd_client()
key = f'/cluster/backup/recipients/{args.name}'
# Check if recipient already exists
existing = client.get(key)
if existing[0] is not None:
print(f"Error: Recipient '{args.name}' already exists")
sys.exit(1)
# Create recipient data
from datetime import datetime
recipient_data = {
'public_key': args.public_key,
'created_at': datetime.utcnow().isoformat() + 'Z'
}
if args.description:
recipient_data['description'] = args.description
# Store in etcd
client.put(key, json.dumps(recipient_data))
pRepobility (the analyzer behind this table) · https://repobility.com
backup_recipients_remove function · python · L170-L189 (20 LOC)config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_recipients_remove(args):
    """Delete the backup encryption recipient named ``args.name`` from etcd.

    Prints an error and exits with status 1 when the recipient does not
    exist or when the etcd operation raises.
    """
    try:
        etcd = get_etcd_client()
        recipient_key = f'/cluster/backup/recipients/{args.name}'

        # Only delete a recipient that is actually stored.
        stored_value = etcd.get(recipient_key)[0]
        if stored_value is None:
            print(f"Error: Recipient '{args.name}' not found")
            sys.exit(1)

        etcd.delete(recipient_key)
        print(f"Removed backup encryption recipient: {args.name}")
    except Exception as e:
        print(f"Error removing recipient: {e}")
        sys.exit(1)

# backup_recipients_show function · python · L192-L219 (28 LOC)config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_recipients_show(args):
"""Show details of a specific recipient."""
try:
client = get_etcd_client()
key = f'/cluster/backup/recipients/{args.name}'
result = client.get(key)
if result[0] is None:
print(f"Error: Recipient '{args.name}' not found")
sys.exit(1)
try:
recipient_data = json.loads(result[0].decode('utf-8'))
print(f"Recipient: {args.name}")
print("-" * 30)
print(f"Public Key: {recipient_data.get('public_key', 'N/A')}")
if recipient_data.get('description'):
print(f"Description: {recipient_data['description']}")
print(f"Added: {recipient_data.get('created_at', 'N/A')}")
except (json.JSONDecodeError, UnicodeDecodeError) as e:
print(f"Error parsing recipient data: {e}")
sys.exit(1)
except Exception as e:
print(f"Errbackup_destinations_list function · python · L222-L269 (48 LOC)config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_destinations_list(args):
"""List all backup rsync destinations."""
try:
client = get_etcd_client()
# Get all destinations from etcd
result = client.get_prefix('/cluster/backup/destinations/')
destinations = []
for value, metadata in result:
try:
destination_data = json.loads(value.decode('utf-8'))
name = metadata.key.decode('utf-8').split('/')[-1]
destination_info = {
'name': name,
'url': destination_data.get('url', 'N/A'),
'enabled': destination_data.get('enabled', True),
'created_at': destination_data.get('created_at', 'N/A')
}
destinations.append(destination_info)
except (json.JSONDecodeError, UnicodeDecodeError) as e:
print(f"Error parsing destinationbackup_destinations_add function · python · L272-L308 (37 LOC)config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_destinations_add(args):
"""Add a backup rsync destination."""
try:
client = get_etcd_client()
key = f'/cluster/backup/destinations/{args.name}'
# Check if destination already exists
existing = client.get(key)
if existing[0] is not None:
print(f"Error: Destination '{args.name}' already exists")
sys.exit(1)
# Determine enabled status
enabled = True
if args.disabled:
enabled = False
elif hasattr(args, 'enabled'):
enabled = args.enabled
# Create destination data
from datetime import datetime
destination_data = {
'url': args.url,
'enabled': enabled,
'created_at': datetime.utcnow().isoformat() + 'Z'
}
# Store in etcd
client.put(key, json.dumps(destination_data))
status = "enabled" if enabled else "disabled"
printbackup_destinations_remove function · python · L311-L330 (20 LOC)config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_destinations_remove(args):
    """Delete the backup rsync destination named ``args.name`` from etcd.

    Prints an error and exits with status 1 when the destination does not
    exist or when the etcd operation raises.
    """
    try:
        etcd = get_etcd_client()
        destination_key = f'/cluster/backup/destinations/{args.name}'

        # Only delete a destination that is actually stored.
        stored_value = etcd.get(destination_key)[0]
        if stored_value is None:
            print(f"Error: Destination '{args.name}' not found")
            sys.exit(1)

        etcd.delete(destination_key)
        print(f"Removed backup rsync destination: {args.name}")
    except Exception as e:
        print(f"Error removing destination: {e}")
        sys.exit(1)

# backup_destinations_show function · python · L333-L359 (27 LOC)config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_destinations_show(args):
"""Show details of a specific destination."""
try:
client = get_etcd_client()
key = f'/cluster/backup/destinations/{args.name}'
result = client.get(key)
if result[0] is None:
print(f"Error: Destination '{args.name}' not found")
sys.exit(1)
try:
destination_data = json.loads(result[0].decode('utf-8'))
status = "ENABLED" if destination_data.get('enabled', True) else "DISABLED"
print(f"Destination: {args.name} ({status})")
print("-" * 30)
print(f"URL: {destination_data.get('url', 'N/A')}")
print(f"Added: {destination_data.get('created_at', 'N/A')}")
except (json.JSONDecodeError, UnicodeDecodeError) as e:
print(f"Error parsing destination data: {e}")
sys.exit(1)
except Exception as e:
print(f"Error showing destinabackup_destinations_enable function · python · L362-L389 (28 LOC)config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_destinations_enable(args):
    """Set ``enabled`` to True on the stored destination ``args.name``.

    The destination's JSON record is read from etcd, updated and written
    back. Prints an error and exits with status 1 when the destination is
    missing, its data cannot be parsed, or the etcd operation raises.
    """
    try:
        etcd = get_etcd_client()
        destination_key = f'/cluster/backup/destinations/{args.name}'

        raw_value = etcd.get(destination_key)[0]
        if raw_value is None:
            print(f"Error: Destination '{args.name}' not found")
            sys.exit(1)

        try:
            settings = json.loads(raw_value.decode('utf-8'))
            settings['enabled'] = True
            # Write the modified record back under the same key.
            etcd.put(destination_key, json.dumps(settings))
            print(f"Enabled backup rsync destination: {args.name}")
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"Error parsing destination data: {e}")
            sys.exit(1)
    except Exception as e:
        print(f"Error enabling destination: {e}")
        sys.exit(1)

# backup_destinations_disable function · python · L392-L419 (28 LOC)config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_destinations_disable(args):
    """Set ``enabled`` to False on the stored destination ``args.name``.

    The destination's JSON record is read from etcd, updated and written
    back. Prints an error and exits with status 1 when the destination is
    missing, its data cannot be parsed, or the etcd operation raises.
    """
    try:
        etcd = get_etcd_client()
        destination_key = f'/cluster/backup/destinations/{args.name}'

        raw_value = etcd.get(destination_key)[0]
        if raw_value is None:
            print(f"Error: Destination '{args.name}' not found")
            sys.exit(1)

        try:
            settings = json.loads(raw_value.decode('utf-8'))
            settings['enabled'] = False
            # Write the modified record back under the same key.
            etcd.put(destination_key, json.dumps(settings))
            print(f"Disabled backup rsync destination: {args.name}")
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"Error parsing destination data: {e}")
            sys.exit(1)
    except Exception as e:
        print(f"Error disabling destination: {e}")
        sys.exit(1)
handle_backup_command function · python · L422-L431 (10 LOC)config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def handle_backup_command(args):
    """Fallback handler for ``backup`` when no subcommand handler ran.

    Always exits with status 1: it reports a missing subcommand when
    ``args.backup_command`` is absent or empty, and an unknown one otherwise.
    """
    chosen = getattr(args, 'backup_command', None)
    if chosen:
        # Named subcommands are normally dispatched by their own subparsers,
        # so this branch is not expected to run.
        print(f"Error: Unknown backup command: {args.backup_command}")
        sys.exit(1)

    print("Error: No backup command specified")
    print("Available commands: recipients, destinations")
    sys.exit(1)

# register_certbot_commands function · python · L12-L50 (39 LOC)config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def register_certbot_commands(subparsers):
"""Register certbot management commands"""
certbot_parser = subparsers.add_parser('certbot', help='Let\'s Encrypt certificate management')
certbot_parser.set_defaults(func=lambda args: args.parser.print_help(), parser=certbot_parser)
certbot_subparsers = certbot_parser.add_subparsers(dest='certbot_command', help='Certbot commands')
# Obtain command
obtain_parser = certbot_subparsers.add_parser('obtain', help='Obtain a new certificate')
obtain_parser.add_argument('--test', action='store_true', help='Use staging server (test certificate)')
obtain_parser.add_argument('-n', '--non-interactive', action='store_true', help='Run in non-interactive mode')
obtain_parser.set_defaults(func=certbot_obtain)
# Renew command
renew_parser = certbot_subparsers.add_parser('renew', help='Renew existing certificates')
renew_parser.add_argument('-n', '--non-interactive', action='store_true', help='Run in noncertbot_obtain function · python · L53-L56 (4 LOC)config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def certbot_obtain(args):
    """Obtain a certificate via ``certbot_manager``; exit 0 on success, 1 otherwise.

    ``args.test`` is passed as ``test_cert`` and ``args.non_interactive``
    as ``non_interactive``.
    """
    obtained = certbot_manager.obtain_certificate(
        test_cert=args.test,
        non_interactive=args.non_interactive,
    )
    exit_status = 0 if obtained else 1
    sys.exit(exit_status)

# certbot_renew function · python · L59-L62 (4 LOC)config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def certbot_renew(args):
    """Renew certificates via ``certbot_manager``; exit 0 on success, 1 otherwise."""
    renewed = certbot_manager.renew_certificates(non_interactive=args.non_interactive)
    exit_status = 0 if renewed else 1
    sys.exit(exit_status)

# certbot_list function · python · L65-L68 (4 LOC)config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def certbot_list(args):
    """List certificates via ``certbot_manager``; exit 0 on success, 1 otherwise."""
    listed = certbot_manager.list_certificates()
    exit_status = 0 if listed else 1
    sys.exit(exit_status)

# certbot_revoke function · python · L71-L74 (4 LOC)config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def certbot_revoke(args):
    """Revoke the certificate for ``args.domain``; exit 0 on success, 1 otherwise."""
    domain = args.domain
    revoked = certbot_manager.revoke_certificate(domain)
    exit_status = 0 if revoked else 1
    sys.exit(exit_status)

# certbot_delete function · python · L77-L80 (4 LOC)config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def certbot_delete(args):
    """Delete the certificate for ``args.domain``; exit 0 on success, 1 otherwise."""
    domain = args.domain
    deleted = certbot_manager.delete_certificate(domain)
    exit_status = 0 if deleted else 1
    sys.exit(exit_status)

# certbot_update_nginx function · python · L83-L86 (4 LOC)config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def certbot_update_nginx(args):
    """Update the nginx configuration; exit 0 on success, 1 otherwise.

    ``local_mode`` is taken from ``args.local`` and defaults to False when
    the attribute is absent.
    """
    local_mode = getattr(args, 'local', False)
    updated = certbot_manager.update_nginx_configs(local_mode=local_mode)
    exit_status = 0 if updated else 1
    sys.exit(exit_status)
certbot_status function · python · L89-L154 (66 LOC)config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def certbot_status(args):
"""Show certificate status"""
config = certbot_manager.get_https_config()
domains = certbot_manager.get_all_domains(config)
print("HTTPS Configuration:")
if 'domain' in config:
print(f"Primary domain: {config['domain']}")
if config.get('aliases'):
print(f"Aliases: {', '.join(config['aliases'])}")
if config.get('email'):
print(f"Email: {config['email']}")
else:
print("Email: Not configured (will use --register-unsafely-without-email)")
# Check if TLS materials exist
try:
client = get_etcd_client()
key_value, _ = client.get('/cluster/tls/key')
cert_value, _ = client.get('/cluster/tls/cert')
if key_value and cert_value:
print("TLS materials: Present in etcd")
# Check nginx paths
nginx_key = Path('/etc/nginx/ssl/key.pem')
nginx_cert = Path('/etc/nginx/ssl/cert.pem')
register_cluster_commands function · python · L9-L39 (31 LOC)config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def register_cluster_commands(subparsers):
"""Register cluster management commands"""
cluster_parser = subparsers.add_parser('cluster', help='Cluster status and health management')
cluster_parser.set_defaults(func=lambda args: args.parser.print_help(), parser=cluster_parser)
cluster_subparsers = cluster_parser.add_subparsers(dest='cluster_command', help='Cluster commands')
# Status command (verbose by default for backward compat)
status_parser = cluster_subparsers.add_parser('status', help='Detailed cluster status (verbose)')
status_parser.set_defaults(func=cluster_status_verbose)
# Health command (compact by default)
health_parser = cluster_subparsers.add_parser('health', help='Compact cluster health summary')
health_parser.add_argument('-v', '--verbose', action='store_true', help='Show full detailed output')
health_parser.set_defaults(func=cluster_health)
# Top-level health alias is registered in register_health_alias()
# Pregister_health_alias function · python · L42-L46 (5 LOC)config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def register_health_alias(subparsers):
    """Add a top-level ``health`` command that dispatches to ``cluster_health``.

    The command accepts ``-v``/``--verbose``, like ``cluster health``.
    """
    alias_parser = subparsers.add_parser(
        'health',
        help='Compact cluster health summary',
    )
    alias_parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help='Show full detailed output',
    )
    alias_parser.set_defaults(func=cluster_health)

# cluster_health function · python · L49-L51 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def cluster_health(args):
    """Run ``check_cluster.main``, verbose only when ``args.verbose`` is set.

    A missing ``verbose`` attribute is treated as False.
    """
    verbose = getattr(args, 'verbose', False)
    check_cluster.main(verbose=verbose)

# cluster_status_verbose function · python · L54-L56 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def cluster_status_verbose(args):
    """Run ``check_cluster.main`` with verbose output always on.

    ``args`` is accepted for the command-handler signature but not used.
    """
    check_cluster.main(verbose=True)

# cluster_populate_local_node function · python · L59-L62 (4 LOC)config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def cluster_populate_local_node(args):
    """Call ``populate_local_node()`` from the ``..utils.populate_local_node`` module.

    The module is imported at call time; ``args`` is not used.
    """
    from ..utils import populate_local_node as node_populator
    node_populator.populate_local_node()

# _set_host_state function · python · L65-L86 (22 LOC)config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def _set_host_state(hostname, action):
    """Ask the storage leader's API to change a host's state.

    Sends ``POST http://<leader>:12723/api/host/<hostname>/<action>``.

    Args:
        hostname: Name of the host to change.
        action: URL path segment for the operation; callers in this module
            pass ``'enable'`` or ``'disable'``.

    Exits the process with status 1 (message on stderr) when the storage
    leader cannot be determined, the request fails, or the API answers
    with anything other than 200.
    """
    import sys

    storage_leader = get_storage_leader_ip()
    if not storage_leader:
        print("Error: Could not determine storage leader", file=sys.stderr)
        sys.exit(1)

    url = f"http://{storage_leader}:12723/api/host/{hostname}/{action}"
    try:
        response = requests.post(url, timeout=10)
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

    if response.status_code == 200:
        print(f"{action.title()}d host: {hostname}")
        return
    if response.status_code == 404:
        print(f"Error: Host '{hostname}' not found", file=sys.stderr)
        sys.exit(1)

    # An error reply is not guaranteed to be a JSON object; fall back to the
    # HTTP status instead of failing while decoding the body.
    try:
        detail = response.json().get('error', 'Unknown error')
    except (ValueError, AttributeError):
        detail = f"HTTP {response.status_code}"
    print(f"Error: {detail}", file=sys.stderr)
    sys.exit(1)

# cluster_disable_host function · python · L89-L91 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def cluster_disable_host(args):
    """Request the ``disable`` action for the host named ``args.hostname``."""
    target_host = args.hostname
    _set_host_state(target_host, 'disable')
cluster_enable_host function · python · L94-L96 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def cluster_enable_host(args):
    """Request the ``enable`` action for the host named ``args.hostname``."""
    target_host = args.hostname
    _set_host_state(target_host, 'enable')

# get_storage_leader_ip function · python · L99-L115 (17 LOC)config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def get_storage_leader_ip():
    """Resolve the storage leader's IP address through etcd.

    Reads the leader hostname from ``/cluster/leader/app``, then loads the
    JSON record at ``/cluster/nodes/by-hostname/<hostname>`` and returns its
    ``ip`` field.

    Returns:
        The ``ip`` value of the leader's node record, or ``None`` when no
        leader is recorded, the node record is missing, or the lookup raises.
    """
    from ..common.etcd_utils import get_etcd_client
    import json

    try:
        client = get_etcd_client()
        leader_raw = client.get('/cluster/leader/app')[0]
        if not leader_raw:
            return None
        leader_hostname = leader_raw.decode()
        node_raw = client.get(f'/cluster/nodes/by-hostname/{leader_hostname}')[0]
        if node_raw:
            allocation = json.loads(node_raw.decode())
            return allocation.get('ip')
    except Exception:
        # Best-effort lookup: any failure is reported as "no leader IP".
        # Catching Exception rather than using a bare except lets
        # KeyboardInterrupt and SystemExit propagate.
        return None
    return None

# register_dhcp_commands function · python · L8-L35 (28 LOC)config/ansible/admin/files/ycluster/ycluster/cli/dhcp.py
def register_dhcp_commands(subparsers):
"""Register DHCP management commands"""
dhcp_parser = subparsers.add_parser('dhcp', help='DHCP lease and allocation management')
dhcp_parser.set_defaults(func=lambda args: args.parser.print_help(), parser=dhcp_parser)
dhcp_subparsers = dhcp_parser.add_subparsers(dest='dhcp_command', help='DHCP commands')
# List commands
list_parser = dhcp_subparsers.add_parser('list', help='List DHCP leases and allocations')
list_parser.set_defaults(func=lambda args: args.parser.print_help(), parser=list_parser)
list_subparsers = list_parser.add_subparsers(dest='list_type', help='What to list')
allocations_parser = list_subparsers.add_parser('allocations', help='List node allocations')
allocations_parser.set_defaults(func=dhcp_list_allocations)
leases_parser = list_subparsers.add_parser('leases', help='List DHCP leases')
leases_parser.set_defaults(func=dhcp_list_leases)
all_parser = list_subpdhcp_list_allocations function · python · L38-L40 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/dhcp.py
def dhcp_list_allocations(args):
    """Call ``lease_manager.list_allocations()``; ``args`` is not used."""
    lease_manager.list_allocations()

# dhcp_list_leases function · python · L43-L45 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/dhcp.py
def dhcp_list_leases(args):
    """Call ``lease_manager.list_leases()``; ``args`` is not used."""
    lease_manager.list_leases()

# dhcp_list_all function · python · L48-L51 (4 LOC)config/ansible/admin/files/ycluster/ycluster/cli/dhcp.py
def dhcp_list_all(args):
    """List allocations first, then leases, through ``lease_manager``."""
    for list_entries in (lease_manager.list_allocations, lease_manager.list_leases):
        list_entries()

# dhcp_delete function · python · L54-L69 (16 LOC)config/ansible/admin/files/ycluster/ycluster/cli/dhcp.py
def dhcp_delete(args):
    """Delete DHCP entries identified by a MAC address or a hostname.

    ``args.target`` is treated as a MAC address when it contains ``:`` or
    when, with ``:`` and ``-`` removed, it is exactly 12 hexadecimal digits.
    Anything else is treated as a hostname.

    Prints the outcome and exits with status 1 when the deletion fails.
    """
    target = args.target

    # Length alone is not enough: a hostname such as "storage-node1" is also
    # 12 characters once '-' is removed, so the characters must be hex digits.
    hex_digits = '0123456789abcdefABCDEF'
    compact = target.replace(':', '').replace('-', '')
    is_bare_mac = len(compact) == 12 and all(ch in hex_digits for ch in compact)

    if ':' in target or is_bare_mac:
        success = lease_manager.delete_by_mac(target)
    else:
        success = lease_manager.delete_by_hostname(target)

    if success:
        print("Deletion completed successfully")
    else:
        print("Deletion failed")
        sys.exit(1)

# dhcp_update_hosts function · python · L72-L75 (4 LOC)config/ansible/admin/files/ycluster/ycluster/cli/dhcp.py
def dhcp_update_hosts(args):
    """Run ``main()`` of the ``..utils.update_dhcp_hosts`` module.

    The module is imported at call time; ``args`` is not used.
    """
    from ..utils import update_dhcp_hosts as hosts_updater
    hosts_updater.main()
register_frontend_commands function · python · L7-L32 (26 LOC)config/ansible/admin/files/ycluster/ycluster/cli/frontend.py
def register_frontend_commands(subparsers):
"""Register frontend management commands"""
frontend_parser = subparsers.add_parser('frontend', help='Frontend node management')
frontend_parser.set_defaults(func=lambda args: args.parser.print_help(), parser=frontend_parser)
frontend_subparsers = frontend_parser.add_subparsers(dest='frontend_command', help='Frontend commands')
# List command
list_parser = frontend_subparsers.add_parser('list', help='List all frontend nodes')
list_parser.set_defaults(func=frontend_list)
# Add command
add_parser = frontend_subparsers.add_parser('add', help='Add a frontend node')
add_parser.add_argument('name', help='Node name')
add_parser.add_argument('address', help='IP address or hostname')
add_parser.add_argument('--description', help='Optional description')
add_parser.set_defaults(func=frontend_add)
# Delete command
del_parser = frontend_subparsers.add_parser('delete', help='Delete frontend_list function · python · L35-L37 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/frontend.py
def frontend_list(args):
    """Call ``frontend_manager.list_frontend_nodes()``; ``args`` is not used."""
    frontend_manager.list_frontend_nodes()

# frontend_add function · python · L40-L42 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/frontend.py
def frontend_add(args):
    """Add a frontend node from ``args.name``, ``args.address`` and ``args.description``."""
    name, address, description = args.name, args.address, args.description
    frontend_manager.add_frontend_node(name, address, description)