← back to devrandom__YCluster

Function bodies 355 total

All specs Real LLM only Function bodies
get_leadership_status function · python · L2235-L2255 (21 LOC)
config/ansible/admin/files/app.py
def get_leadership_status():
    """Get current leadership status from etcd"""
    try:
        client = get_etcd_client()
        leadership = {}
        
        # Get storage leader
        result = client.get('/cluster/leader/app')
        if result[0]:
            storage_leader = result[0].decode()
            leadership['storage_leader'] = storage_leader
        
        # Get DHCP leader
        result = client.get('/cluster/leader/dhcp')
        if result[0]:
            dhcp_leader = result[0].decode()
            leadership['dhcp_leader'] = dhcp_leader
            
        return leadership
    except:
        return {}
cluster_status_api function · python · L2258-L2283 (26 LOC)
config/ansible/admin/files/app.py
def cluster_status_api():
    """API endpoint returning cluster status as JSON"""
    hosts = get_all_hosts()
    host_health = {}
    leadership = get_leadership_status()
    certificate_status = check_certificate_expiry()

    # Get health status for each host (skip disabled hosts)
    for host in hosts:
        if host.get('disabled', False):
            host_health[host['hostname']] = {'status': 'disabled', 'services': []}
        else:
            host_health[host['hostname']] = get_host_health(host['ip'])
    
    # Extract VIP status from existing health data
    vip_status = get_cluster_vip_status(host_health)
    
    return jsonify({
        'hosts': hosts,
        'hostHealth': host_health,
        'leadership': leadership,
        'vipStatus': vip_status,
        'certificateStatus': certificate_status,
        'respondingHostname': platform.node(),
        'timestamp': datetime.now().isoformat()
    })
static_files function · python · L2286-L2301 (16 LOC)
config/ansible/admin/files/app.py
def static_files(filename):
    """Serve static files"""
    import os
    from flask import Response
    # Get the directory where this script is located
    script_dir = os.path.dirname(os.path.abspath(__file__))
    static_dir = os.path.join(script_dir, 'static')
    
    # Get the response first
    response = send_from_directory(static_dir, filename)
    
    # Set correct MIME type for JavaScript files
    if filename.endswith('.js'):
        response.headers['Content-Type'] = 'application/javascript'
    
    return response
status_page function · python · L2304-L2332 (29 LOC)
config/ansible/admin/files/app.py
def status_page():
    """Web page showing cluster-wide health status"""
    hosts = get_all_hosts()
    host_health = {}
    leadership = get_leadership_status()
    certificate_status = check_certificate_expiry()

    # Get health status for each host (skip disabled hosts)
    for host in hosts:
        if host.get('disabled', False):
            host_health[host['hostname']] = {'status': 'disabled', 'services': []}
        else:
            host_health[host['hostname']] = get_host_health(host['ip'])
    
    # Extract VIP status from existing health data
    vip_status = get_cluster_vip_status(host_health)
    
    # Get drain status for this node
    current_node_drained = is_node_drained()
    
    return render_template('status.html', 
                         hosts=hosts, 
                         host_health=host_health,
                         leadership=leadership,
                         vip_status=vip_status,
                         certificate_status=certificate_status,
get_healthchecks_url function · python · L19-L29 (11 LOC)
config/ansible/admin/files/scripts/cluster-heartbeat.py
def get_healthchecks_url():
    """Get healthchecks URL from etcd"""
    try:
        client = get_etcd_client()
        result = client.get(HEALTHCHECKS_ETCD_KEY)
        if result[0]:
            return result[0].decode()
        return None
    except Exception as e:
        print(f"Failed to get healthchecks URL from etcd: {str(e)}")
        return None
get_cluster_health function · python · L32-L128 (97 LOC)
config/ansible/admin/files/scripts/cluster-heartbeat.py
def get_cluster_health():
    """Get comprehensive cluster health status"""
    health_data = {
        'timestamp': datetime.now().isoformat(),
        'hostname': socket.gethostname(),
        'services': {},
        'nodes': {},
        'overall': 'healthy'
    }
    
    try:
        # Check local health API
        response = requests.get('http://localhost:12723/api/health', timeout=10)
        if response.status_code in [200, 503]:
            local_health = response.json()
            health_data['services'] = local_health.get('services', {})
            health_data['overall'] = local_health.get('overall', 'unknown')
            health_data['storage_leader'] = local_health.get('storage_leader', False)
            health_data['dhcp_leader'] = local_health.get('dhcp_leader', False)
    except Exception as e:
        health_data['error'] = f"Failed to get local health: {str(e)}"
        health_data['overall'] = 'error'
    
    # Get cluster-wide status
    try:
        response = 
determine_health_status function · python · L130-L156 (27 LOC)
config/ansible/admin/files/scripts/cluster-heartbeat.py
def determine_health_status(health_data):
    """Determine overall health status and exit code"""
    # Critical failures (exit code 2)
    critical_conditions = [
        health_data.get('overall') == 'unhealthy',
        health_data.get('nodes', {}).get('unreachable', 0) > 1,
        health_data.get('certificate', {}).get('status') in ['expired', 'critical'],
        'etcd' in health_data.get('services', {}) and 
            health_data['services']['etcd'].get('status') == 'unhealthy',
        'ceph' in health_data.get('services', {}) and 
            health_data['services']['ceph'].get('status') == 'unhealthy'
    ]
    
    # Warning conditions (exit code 1)
    warning_conditions = [
        health_data.get('overall') == 'degraded',
        health_data.get('nodes', {}).get('unhealthy', 0) > 0,
        health_data.get('nodes', {}).get('unreachable', 0) == 1,
        health_data.get('certificate', {}).get('status') == 'warning'
    ]
    
    if any(critical_conditions):
        ret
Repobility — same analyzer, your code, free for public repos · /scan/
format_health_message function · python · L158-L228 (71 LOC)
config/ansible/admin/files/scripts/cluster-heartbeat.py
def format_health_message(health_data, status):
    """Format health data into a human-readable message"""
    lines = []
    lines.append(f"Cluster Status: {status.upper()}")
    lines.append(f"Time: {health_data['timestamp']}")
    lines.append(f"Reporting Node: {health_data['hostname']}")
    
    # Node status
    nodes = health_data.get('nodes', {})
    if nodes:
        lines.append(f"Nodes: {nodes.get('healthy', 0)}/{nodes.get('total', 0)} healthy")
        if nodes.get('unhealthy_names'):
            lines.append(f"  Unhealthy: {', '.join(nodes['unhealthy_names'])}")
        if nodes.get('unreachable_names'):
            lines.append(f"  Unreachable: {', '.join(nodes['unreachable_names'])}")
    
    # Unhealthy services by node
    unhealthy_by_node = health_data.get('unhealthy_services_by_node', {})
    if unhealthy_by_node:
        for hostname, services in unhealthy_by_node.items():
            lines.append(f"  {hostname}: {', '.join(services)}")
    
    # Leadership
    l
send_heartbeat function · python · L230-L282 (53 LOC)
config/ansible/admin/files/scripts/cluster-heartbeat.py
def send_heartbeat():
    """Send heartbeat to healthchecks.io"""
    # Get healthchecks URL from etcd
    healthchecks_url = get_healthchecks_url()
    if not healthchecks_url:
        print("No healthchecks URL configured in etcd")
        return False
    
    try:
        # Get health data
        health_data = get_cluster_health()
        
        # Determine status
        exit_code, status = determine_health_status(health_data)
        
        # Format message
        message = format_health_message(health_data, status)
        print(message)
        
        # Send appropriate ping
        if exit_code == 0:
            # Success ping
            response = requests.post(healthchecks_url, 
                                    data=message,
                                    timeout=10)
        elif exit_code == 1:
            # Warning - use exit code endpoint
            response = requests.post(f"{healthchecks_url}/1", 
                                    data=message,
     
main function · python · L284-L288 (5 LOC)
config/ansible/admin/files/scripts/cluster-heartbeat.py
def main():
    """Main function - single heartbeat execution"""
    # Send single heartbeat
    success = send_heartbeat()
    sys.exit(0 if success else 1)
get_version function · python · L7-L13 (7 LOC)
config/ansible/admin/files/ycluster/setup.py
def get_version():
    version_file = os.path.join(os.path.dirname(__file__), 'ycluster', '__init__.py')
    with open(version_file) as f:
        for line in f:
            if line.startswith('__version__'):
                return line.split('=')[1].strip().strip('"\'')
    return '0.1.0'
get_requirements function · python · L16-L21 (6 LOC)
config/ansible/admin/files/ycluster/setup.py
def get_requirements():
    requirements_file = os.path.join(os.path.dirname(__file__), 'requirements.txt')
    if os.path.exists(requirements_file):
        with open(requirements_file) as f:
            return [line.strip() for line in f if line.strip() and not line.startswith('#')]
    return []
register_backup_commands function · python · L8-L74 (67 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def register_backup_commands(subparsers):
    """Register backup-related commands."""
    backup_parser = subparsers.add_parser('backup', help='Backup encryption management')
    backup_parser.set_defaults(func=handle_backup_command)
    backup_subparsers = backup_parser.add_subparsers(dest='backup_command', help='Backup commands')
    
    # Recipients management
    recipients_parser = backup_subparsers.add_parser('recipients', help='Manage backup encryption recipients')
    recipients_subparsers = recipients_parser.add_subparsers(dest='recipients_command', help='Recipients commands')
    
    # List recipients
    list_parser = recipients_subparsers.add_parser('list', help='List backup encryption recipients')
    list_parser.add_argument('--json', action='store_true', help='Output in JSON format')
    list_parser.set_defaults(func=backup_recipients_list)
    
    # Add recipient
    add_parser = recipients_subparsers.add_parser('add', help='Add backup encryption recipient')
    add_
backup_recipients_list function · python · L77-L127 (51 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_recipients_list(args):
    """List all backup encryption recipients."""
    try:
        client = get_etcd_client()
        
        # Get all recipients from etcd
        result = client.get_prefix('/cluster/backup/recipients/')
        
        recipients = []
        
        for value, metadata in result:
            try:
                recipient_data = json.loads(value.decode('utf-8'))
                name = metadata.key.decode('utf-8').split('/')[-1]
                
                recipient_info = {
                    'name': name,
                    'public_key': recipient_data.get('public_key', 'N/A'),
                    'created_at': recipient_data.get('created_at', 'N/A')
                }
                
                if recipient_data.get('description'):
                    recipient_info['description'] = recipient_data['description']
                
                recipients.append(recipient_info)
                
            except (json.JSONDecodeEr
backup_recipients_add function · python · L130-L167 (38 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_recipients_add(args):
    """Add a backup encryption recipient."""
    try:
        # Validate public key format
        if not args.public_key.startswith('age1'):
            print("Error: Public key must start with 'age1'")
            sys.exit(1)
        
        client = get_etcd_client()
        key = f'/cluster/backup/recipients/{args.name}'
        
        # Check if recipient already exists
        existing = client.get(key)
        if existing[0] is not None:
            print(f"Error: Recipient '{args.name}' already exists")
            sys.exit(1)
        
        # Create recipient data
        from datetime import datetime
        recipient_data = {
            'public_key': args.public_key,
            'created_at': datetime.utcnow().isoformat() + 'Z'
        }
        
        if args.description:
            recipient_data['description'] = args.description
        
        # Store in etcd
        client.put(key, json.dumps(recipient_data))
        
        p
Repobility (the analyzer behind this table) · https://repobility.com
backup_recipients_remove function · python · L170-L189 (20 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_recipients_remove(args):
    """Remove a backup encryption recipient."""
    try:
        client = get_etcd_client()
        key = f'/cluster/backup/recipients/{args.name}'
        
        # Check if recipient exists
        existing = client.get(key)
        if existing[0] is None:
            print(f"Error: Recipient '{args.name}' not found")
            sys.exit(1)
        
        # Remove from etcd
        client.delete(key)
        
        print(f"Removed backup encryption recipient: {args.name}")
        
    except Exception as e:
        print(f"Error removing recipient: {e}")
        sys.exit(1)
backup_recipients_show function · python · L192-L219 (28 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_recipients_show(args):
    """Show details of a specific recipient."""
    try:
        client = get_etcd_client()
        key = f'/cluster/backup/recipients/{args.name}'
        
        result = client.get(key)
        if result[0] is None:
            print(f"Error: Recipient '{args.name}' not found")
            sys.exit(1)
        
        try:
            recipient_data = json.loads(result[0].decode('utf-8'))
            
            print(f"Recipient: {args.name}")
            print("-" * 30)
            print(f"Public Key: {recipient_data.get('public_key', 'N/A')}")
            if recipient_data.get('description'):
                print(f"Description: {recipient_data['description']}")
            print(f"Added: {recipient_data.get('created_at', 'N/A')}")
            
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"Error parsing recipient data: {e}")
            sys.exit(1)
            
    except Exception as e:
        print(f"Err
backup_destinations_list function · python · L222-L269 (48 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_destinations_list(args):
    """List all backup rsync destinations."""
    try:
        client = get_etcd_client()
        
        # Get all destinations from etcd
        result = client.get_prefix('/cluster/backup/destinations/')
        
        destinations = []
        
        for value, metadata in result:
            try:
                destination_data = json.loads(value.decode('utf-8'))
                name = metadata.key.decode('utf-8').split('/')[-1]
                
                destination_info = {
                    'name': name,
                    'url': destination_data.get('url', 'N/A'),
                    'enabled': destination_data.get('enabled', True),
                    'created_at': destination_data.get('created_at', 'N/A')
                }
                
                destinations.append(destination_info)
                
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                print(f"Error parsing destination
backup_destinations_add function · python · L272-L308 (37 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_destinations_add(args):
    """Add a backup rsync destination."""
    try:
        client = get_etcd_client()
        key = f'/cluster/backup/destinations/{args.name}'
        
        # Check if destination already exists
        existing = client.get(key)
        if existing[0] is not None:
            print(f"Error: Destination '{args.name}' already exists")
            sys.exit(1)
        
        # Determine enabled status
        enabled = True
        if args.disabled:
            enabled = False
        elif hasattr(args, 'enabled'):
            enabled = args.enabled
        
        # Create destination data
        from datetime import datetime
        destination_data = {
            'url': args.url,
            'enabled': enabled,
            'created_at': datetime.utcnow().isoformat() + 'Z'
        }
        
        # Store in etcd
        client.put(key, json.dumps(destination_data))
        
        status = "enabled" if enabled else "disabled"
        print
backup_destinations_remove function · python · L311-L330 (20 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_destinations_remove(args):
    """Remove a backup rsync destination."""
    try:
        client = get_etcd_client()
        key = f'/cluster/backup/destinations/{args.name}'
        
        # Check if destination exists
        existing = client.get(key)
        if existing[0] is None:
            print(f"Error: Destination '{args.name}' not found")
            sys.exit(1)
        
        # Remove from etcd
        client.delete(key)
        
        print(f"Removed backup rsync destination: {args.name}")
        
    except Exception as e:
        print(f"Error removing destination: {e}")
        sys.exit(1)
backup_destinations_show function · python · L333-L359 (27 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_destinations_show(args):
    """Show details of a specific destination."""
    try:
        client = get_etcd_client()
        key = f'/cluster/backup/destinations/{args.name}'
        
        result = client.get(key)
        if result[0] is None:
            print(f"Error: Destination '{args.name}' not found")
            sys.exit(1)
        
        try:
            destination_data = json.loads(result[0].decode('utf-8'))
            
            status = "ENABLED" if destination_data.get('enabled', True) else "DISABLED"
            print(f"Destination: {args.name} ({status})")
            print("-" * 30)
            print(f"URL: {destination_data.get('url', 'N/A')}")
            print(f"Added: {destination_data.get('created_at', 'N/A')}")
            
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"Error parsing destination data: {e}")
            sys.exit(1)
            
    except Exception as e:
        print(f"Error showing destina
backup_destinations_enable function · python · L362-L389 (28 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_destinations_enable(args):
    """Enable a backup rsync destination."""
    try:
        client = get_etcd_client()
        key = f'/cluster/backup/destinations/{args.name}'
        
        # Check if destination exists
        result = client.get(key)
        if result[0] is None:
            print(f"Error: Destination '{args.name}' not found")
            sys.exit(1)
        
        try:
            destination_data = json.loads(result[0].decode('utf-8'))
            destination_data['enabled'] = True
            
            # Update in etcd
            client.put(key, json.dumps(destination_data))
            
            print(f"Enabled backup rsync destination: {args.name}")
            
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"Error parsing destination data: {e}")
            sys.exit(1)
            
    except Exception as e:
        print(f"Error enabling destination: {e}")
        sys.exit(1)
backup_destinations_disable function · python · L392-L419 (28 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def backup_destinations_disable(args):
    """Disable a backup rsync destination."""
    try:
        client = get_etcd_client()
        key = f'/cluster/backup/destinations/{args.name}'
        
        # Check if destination exists
        result = client.get(key)
        if result[0] is None:
            print(f"Error: Destination '{args.name}' not found")
            sys.exit(1)
        
        try:
            destination_data = json.loads(result[0].decode('utf-8'))
            destination_data['enabled'] = False
            
            # Update in etcd
            client.put(key, json.dumps(destination_data))
            
            print(f"Disabled backup rsync destination: {args.name}")
            
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            print(f"Error parsing destination data: {e}")
            sys.exit(1)
            
    except Exception as e:
        print(f"Error disabling destination: {e}")
        sys.exit(1)
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
handle_backup_command function · python · L422-L431 (10 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/backup.py
def handle_backup_command(args):
    """Handle backup command dispatch."""
    if not hasattr(args, 'backup_command') or not args.backup_command:
        print("Error: No backup command specified")
        print("Available commands: recipients, destinations")
        sys.exit(1)
    
    # This should not be reached since subparsers handle the dispatch
    print(f"Error: Unknown backup command: {args.backup_command}")
    sys.exit(1)
register_certbot_commands function · python · L12-L50 (39 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def register_certbot_commands(subparsers):
    """Register certbot management commands"""
    certbot_parser = subparsers.add_parser('certbot', help='Let\'s Encrypt certificate management')
    certbot_parser.set_defaults(func=lambda args: args.parser.print_help(), parser=certbot_parser)
    certbot_subparsers = certbot_parser.add_subparsers(dest='certbot_command', help='Certbot commands')
    
    # Obtain command
    obtain_parser = certbot_subparsers.add_parser('obtain', help='Obtain a new certificate')
    obtain_parser.add_argument('--test', action='store_true', help='Use staging server (test certificate)')
    obtain_parser.add_argument('-n', '--non-interactive', action='store_true', help='Run in non-interactive mode')
    obtain_parser.set_defaults(func=certbot_obtain)
    
    # Renew command
    renew_parser = certbot_subparsers.add_parser('renew', help='Renew existing certificates')
    renew_parser.add_argument('-n', '--non-interactive', action='store_true', help='Run in non
certbot_obtain function · python · L53-L56 (4 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def certbot_obtain(args):
    """Obtain certificate"""
    success = certbot_manager.obtain_certificate(test_cert=args.test, non_interactive=args.non_interactive)
    sys.exit(0 if success else 1)
certbot_renew function · python · L59-L62 (4 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def certbot_renew(args):
    """Renew certificates"""
    success = certbot_manager.renew_certificates(non_interactive=args.non_interactive)
    sys.exit(0 if success else 1)
certbot_list function · python · L65-L68 (4 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def certbot_list(args):
    """List certificates"""
    success = certbot_manager.list_certificates()
    sys.exit(0 if success else 1)
certbot_revoke function · python · L71-L74 (4 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def certbot_revoke(args):
    """Revoke certificate"""
    success = certbot_manager.revoke_certificate(args.domain)
    sys.exit(0 if success else 1)
certbot_delete function · python · L77-L80 (4 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def certbot_delete(args):
    """Delete certificate"""
    success = certbot_manager.delete_certificate(args.domain)
    sys.exit(0 if success else 1)
certbot_update_nginx function · python · L83-L86 (4 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def certbot_update_nginx(args):
    """Update nginx configuration"""
    success = certbot_manager.update_nginx_configs(local_mode=getattr(args, 'local', False))
    sys.exit(0 if success else 1)
All rows scored by the Repobility analyzer (https://repobility.com)
certbot_status function · python · L89-L154 (66 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/certbot.py
def certbot_status(args):
    """Show certificate status"""
    config = certbot_manager.get_https_config()
    domains = certbot_manager.get_all_domains(config)
    
    print("HTTPS Configuration:")
    if 'domain' in config:
        print(f"Primary domain: {config['domain']}")
    if config.get('aliases'):
        print(f"Aliases: {', '.join(config['aliases'])}")
    if config.get('email'):
        print(f"Email: {config['email']}")
    else:
        print("Email: Not configured (will use --register-unsafely-without-email)")
    
    # Check if TLS materials exist
    try:
        client = get_etcd_client()
        key_value, _ = client.get('/cluster/tls/key')
        cert_value, _ = client.get('/cluster/tls/cert')
        
        if key_value and cert_value:
            print("TLS materials: Present in etcd")
            
            # Check nginx paths
            nginx_key = Path('/etc/nginx/ssl/key.pem')
            nginx_cert = Path('/etc/nginx/ssl/cert.pem')
            
    
register_cluster_commands function · python · L9-L39 (31 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def register_cluster_commands(subparsers):
    """Register cluster management commands"""
    cluster_parser = subparsers.add_parser('cluster', help='Cluster status and health management')
    cluster_parser.set_defaults(func=lambda args: args.parser.print_help(), parser=cluster_parser)
    cluster_subparsers = cluster_parser.add_subparsers(dest='cluster_command', help='Cluster commands')

    # Status command (verbose by default for backward compat)
    status_parser = cluster_subparsers.add_parser('status', help='Detailed cluster status (verbose)')
    status_parser.set_defaults(func=cluster_status_verbose)

    # Health command (compact by default)
    health_parser = cluster_subparsers.add_parser('health', help='Compact cluster health summary')
    health_parser.add_argument('-v', '--verbose', action='store_true', help='Show full detailed output')
    health_parser.set_defaults(func=cluster_health)

    # Top-level health alias is registered in register_health_alias()
    

    # P
register_health_alias function · python · L42-L46 (5 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def register_health_alias(subparsers):
    """Register top-level 'health' command as alias for 'cluster health'"""
    health_parser = subparsers.add_parser('health', help='Compact cluster health summary')
    health_parser.add_argument('-v', '--verbose', action='store_true', help='Show full detailed output')
    health_parser.set_defaults(func=cluster_health)
cluster_health function · python · L49-L51 (3 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def cluster_health(args):
    """Execute compact cluster health check"""
    check_cluster.main(verbose=getattr(args, 'verbose', False))
cluster_status_verbose function · python · L54-L56 (3 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def cluster_status_verbose(args):
    """Execute verbose cluster status check"""
    check_cluster.main(verbose=True)
cluster_populate_local_node function · python · L59-L62 (4 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def cluster_populate_local_node(args):
    """Populate local node information in etcd"""
    from ..utils import populate_local_node
    populate_local_node.populate_local_node()
_set_host_state function · python · L65-L86 (22 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def _set_host_state(hostname, action):
    """Helper to enable/disable a host via API"""
    import sys
    storage_leader = get_storage_leader_ip()
    if not storage_leader:
        print("Error: Could not determine storage leader", file=sys.stderr)
        sys.exit(1)

    url = f"http://{storage_leader}:12723/api/host/{hostname}/{action}"
    try:
        response = requests.post(url, timeout=10)
        if response.status_code == 200:
            print(f"{action.title()}d host: {hostname}")
        elif response.status_code == 404:
            print(f"Error: Host '{hostname}' not found", file=sys.stderr)
            sys.exit(1)
        else:
            print(f"Error: {response.json().get('error', 'Unknown error')}", file=sys.stderr)
            sys.exit(1)
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
cluster_disable_host function · python · L89-L91 (3 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def cluster_disable_host(args):
    """Disable a host so it doesn't appear in status page"""
    _set_host_state(args.hostname, 'disable')
Repobility — same analyzer, your code, free for public repos · /scan/
cluster_enable_host function · python · L94-L96 (3 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def cluster_enable_host(args):
    """Re-enable a host so it appears in status page"""
    _set_host_state(args.hostname, 'enable')
get_storage_leader_ip function · python · L99-L115 (17 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/cluster.py
def get_storage_leader_ip():
    """Get storage leader IP from etcd"""
    from ..common.etcd_utils import get_etcd_client
    import json
    try:
        client = get_etcd_client()
        result = client.get('/cluster/leader/app')
        if not result[0]:
            return None
        leader_hostname = result[0].decode()
        result = client.get(f'/cluster/nodes/by-hostname/{leader_hostname}')
        if result[0]:
            allocation = json.loads(result[0].decode())
            return allocation.get('ip')
    except:
        pass
    return None
register_dhcp_commands function · python · L8-L35 (28 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/dhcp.py
def register_dhcp_commands(subparsers):
    """Register DHCP management commands"""
    dhcp_parser = subparsers.add_parser('dhcp', help='DHCP lease and allocation management')
    dhcp_parser.set_defaults(func=lambda args: args.parser.print_help(), parser=dhcp_parser)
    dhcp_subparsers = dhcp_parser.add_subparsers(dest='dhcp_command', help='DHCP commands')
    
    # List commands
    list_parser = dhcp_subparsers.add_parser('list', help='List DHCP leases and allocations')
    list_parser.set_defaults(func=lambda args: args.parser.print_help(), parser=list_parser)
    list_subparsers = list_parser.add_subparsers(dest='list_type', help='What to list')
    
    allocations_parser = list_subparsers.add_parser('allocations', help='List node allocations')
    allocations_parser.set_defaults(func=dhcp_list_allocations)
    
    leases_parser = list_subparsers.add_parser('leases', help='List DHCP leases')
    leases_parser.set_defaults(func=dhcp_list_leases)
    
    all_parser = list_subp
dhcp_list_allocations function · python · L38-L40 (3 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/dhcp.py
def dhcp_list_allocations(args):
    """List node allocations"""
    lease_manager.list_allocations()
dhcp_list_leases function · python · L43-L45 (3 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/dhcp.py
def dhcp_list_leases(args):
    """List DHCP leases"""
    lease_manager.list_leases()
dhcp_list_all function · python · L48-L51 (4 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/dhcp.py
def dhcp_list_all(args):
    """List both allocations and leases"""
    lease_manager.list_allocations()
    lease_manager.list_leases()
dhcp_delete function · python · L54-L69 (16 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/dhcp.py
def dhcp_delete(args):
    """Delete DHCP entries"""
    target = args.target
    # Auto-detect if target is hostname or MAC address
    if ':' in target or len(target.replace(':', '').replace('-', '')) == 12:
        # Looks like a MAC address
        success = lease_manager.delete_by_mac(target)
    else:
        # Assume it's a hostname
        success = lease_manager.delete_by_hostname(target)
    
    if success:
        print("Deletion completed successfully")
    else:
        print("Deletion failed")
        sys.exit(1)
dhcp_update_hosts function · python · L72-L75 (4 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/dhcp.py
def dhcp_update_hosts(args):
    """Update static DNS hosts from etcd"""
    from ..utils import update_dhcp_hosts
    update_dhcp_hosts.main()
Repobility (the analyzer behind this table) · https://repobility.com
register_frontend_commands function · python · L7-L32 (26 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/frontend.py
def register_frontend_commands(subparsers):
    """Register frontend management commands"""
    frontend_parser = subparsers.add_parser('frontend', help='Frontend node management')
    frontend_parser.set_defaults(func=lambda args: args.parser.print_help(), parser=frontend_parser)
    frontend_subparsers = frontend_parser.add_subparsers(dest='frontend_command', help='Frontend commands')
    
    # List command
    list_parser = frontend_subparsers.add_parser('list', help='List all frontend nodes')
    list_parser.set_defaults(func=frontend_list)
    
    # Add command
    add_parser = frontend_subparsers.add_parser('add', help='Add a frontend node')
    add_parser.add_argument('name', help='Node name')
    add_parser.add_argument('address', help='IP address or hostname')
    add_parser.add_argument('--description', help='Optional description')
    add_parser.set_defaults(func=frontend_add)
    
    # Delete command
    del_parser = frontend_subparsers.add_parser('delete', help='Delete 
frontend_list function · python · L35-L37 (3 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/frontend.py
def frontend_list(args):
    """List frontend nodes"""
    frontend_manager.list_frontend_nodes()
frontend_add function · python · L40-L42 (3 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/frontend.py
def frontend_add(args):
    """Add frontend node"""
    frontend_manager.add_frontend_node(args.name, args.address, args.description)
‹ prevpage 2 / 8next ›