← back to devrandom__YCluster

Function bodies 355 total

All specs Real LLM only Function bodies
ca_generate_server function · python · L139-L146 (8 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def ca_generate_server(args):
    """Issue a server certificate for ``args.hostname``.

    Hands ``args.hostname`` and ``args.san`` to
    ``ca_manager.generate_server_cert``. Any exception is reported on stderr
    as ``Error: <message>`` and the process exits with status 1.
    """
    # The CA manager is imported inside the handler, not at module import time.
    from ..utils import ca_manager

    try:
        ca_manager.generate_server_cert(args.hostname, args.san)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
ca_list function · python · L149-L152 (4 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def ca_list(args):
    """Print the certificates reported by ``ca_manager.list_certificates``.

    ``args`` is accepted to fit the CLI handler signature; it is not read.
    """
    from ..utils import ca_manager as manager

    manager.list_certificates()
ca_revoke function · python · L155-L162 (8 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def ca_revoke(args):
    """Revoke the certificate issued for ``args.hostname``.

    Delegates to ``ca_manager.revoke_certificate``. Any exception is reported
    on stderr as ``Error: <message>`` and the process exits with status 1.
    """
    # The CA manager is imported inside the handler, not at module import time.
    from ..utils import ca_manager

    try:
        ca_manager.revoke_certificate(args.hostname)
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
register_wg_commands function · python · L11-L51 (41 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def register_wg_commands(subparsers):
    p = subparsers.add_parser('wg', help='WireGuard overlay management')
    p.set_defaults(func=lambda args: args.parser.print_help(), parser=p)
    sub = p.add_subparsers(dest='wg_command', help='wg commands')

    init_p = sub.add_parser('init', help='Initialize wg server (generate keys, set endpoints)')
    init_p.add_argument('endpoints', nargs='+', metavar='HOST[:PORT]',
                        help='Public endpoint(s) clients will dial. Port defaults to 51820.')
    init_p.add_argument('--port', type=int, help='Listen port (default: 51820)')
    init_p.add_argument('--rotate', action='store_true', help='Rotate server keypair')
    init_p.set_defaults(func=_init)

    show_p = sub.add_parser('show', help='Show server configuration')
    show_p.set_defaults(func=_show)

    list_p = sub.add_parser('list', help='List peers')
    list_p.add_argument('--pending', action='store_true', help='Only show pending')
    list_p.add_argument('--approved',
_init function · python · L54-L60 (7 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def _init(args):
    """Initialise the wg server and print a summary of the result.

    Calls ``wg_config.init_server`` with the endpoints, port and rotate flag
    taken from ``args``, then prints the ``pubkey``, ``port``, ``endpoints``
    and ``server_ip`` entries of the returned mapping.
    """
    info = wg_config.init_server(
        args.endpoints,
        port=args.port,
        rotate=args.rotate,
    )
    print("wg server initialized")
    print(f"  pubkey:    {info['pubkey']}")
    print(f"  port:      {info['port']}")
    print(f"  endpoints: {', '.join(info['endpoints'])}")
    print(f"  server_ip: {info['server_ip']}")
_show function · python · L63-L70 (8 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def _show(args):
    """Print the wg server configuration as JSON with the private key masked.

    Exits with status 1 (after writing ``not initialized`` to stderr) when
    ``wg_config.get_server`` returns nothing. The ``privkey`` entry is always
    emitted as ``'***'`` and always appears last in the output.
    """
    cfg = wg_config.get_server()
    if not cfg:
        print("not initialized", file=sys.stderr)
        sys.exit(1)

    # Copy first so the stored configuration is never mutated, then drop and
    # re-add the key so the placeholder lands at the end of the mapping.
    safe = dict(cfg.items())
    safe.pop('privkey', None)
    safe['privkey'] = '***'
    print(json.dumps(safe, indent=2))
_list function · python · L73-L103 (31 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def _list(args):
    peers = wg_config.list_peers()
    if args.pending:
        peers = [p for p in peers if p[1]['status'] == 'pending']
    if args.approved:
        peers = [p for p in peers if p[1]['status'] == 'approved']

    if args.as_json:
        out = []
        for hostname, peer, node in peers:
            out.append({
                'hostname': hostname,
                'status': peer['status'],
                'ip': node['ip'] if node else None,
                'type': node['type'] if node else None,
                'pubkey_sha256': peer.get('pubkey_sha256'),
                'created_at': peer.get('created_at'),
                'approved_at': peer.get('approved_at'),
            })
        print(json.dumps(out, indent=2))
        return

    if not peers:
        print("no peers")
        return
    print(f"{'HOSTNAME':<10} {'STATUS':<10} {'TYPE':<10} {'IP':<16} {'FP':<18} CREATED")
    for hostname, peer, node in peers:
        ip = node['ip'] if node else '-'
       
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
_approve function · python · L106-L112 (7 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def _approve(args):
    """Mark the peer ``args.hostname`` as approved, then reconcile.

    A failing ``wg_config.reconcile`` does not abort the command: it is
    reported as a warning on stderr.
    """
    # The value returned by set_peer_status is not needed here.
    wg_config.set_peer_status(args.hostname, 'approved')
    print(f"{args.hostname}: approved")
    try:
        wg_config.reconcile()
    except Exception as err:
        print(f"warning: reconcile failed: {err}", file=sys.stderr)
_revoke function · python · L115-L121 (7 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def _revoke(args):
    """Mark the peer ``args.hostname`` as revoked, then reconcile.

    A failing ``wg_config.reconcile`` does not abort the command: it is
    reported as a warning on stderr.
    """
    wg_config.set_peer_status(args.hostname, 'revoked')
    print(f"{args.hostname}: revoked")
    try:
        wg_config.reconcile()
    except Exception as err:
        print(f"warning: reconcile failed: {err}", file=sys.stderr)
_delete function · python · L124-L130 (7 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def _delete(args):
    """Delete the peer ``args.hostname``, then reconcile.

    A failing ``wg_config.reconcile`` does not abort the command: it is
    reported as a warning on stderr.
    """
    wg_config.delete_peer(args.hostname)
    print(f"{args.hostname}: deleted")
    try:
        wg_config.reconcile()
    except Exception as err:
        print(f"warning: reconcile failed: {err}", file=sys.stderr)
_render function · python · L137-L141 (5 LOC)
config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def _render(args):
    """Print a rendered wg configuration.

    Renders the client configuration for ``args.client`` when that value is
    truthy; otherwise renders the server configuration.
    """
    if not args.client:
        print(wg_config.render_server_config())
        return
    print(wg_config.render_client_config(args.client))
get_etcd_hosts function · python · L6-L20 (15 LOC)
config/ansible/admin/files/ycluster/ycluster/common/etcd_utils.py
def get_etcd_hosts():
    """Return the etcd host entries configured in the environment.

    Lookup order:
      1. ``ETCD_HOSTS`` - comma separated; surrounding whitespace is stripped
         and empty entries are dropped.
      2. ``ETCD_HOST`` - a single entry, whitespace stripped.
      3. ``['localhost:2379']`` when neither variable yields a usable entry.

    Returns:
        A non-empty list of non-empty strings.
    """
    entries = [
        item.strip()
        for item in os.environ.get('ETCD_HOSTS', '').split(',')
        if item.strip()
    ]
    if entries:
        return entries

    # Strip before testing so a whitespace-only ETCD_HOST falls through to
    # the default instead of producing an empty host entry.
    single = os.environ.get('ETCD_HOST', '').strip()
    if single:
        return [single]
    return ['localhost:2379']
connect_with_retry function · python · L23-L45 (23 LOC)
config/ansible/admin/files/ycluster/ycluster/common/etcd_utils.py
def _parse_etcd_endpoint(host_port, default_port=2379):
    """Split a ``host[:port]`` entry into ``(host, port)``, defaulting the port."""
    host, _, port = host_port.strip().partition(':')
    if not host or ':' in port:
        raise ValueError(f"invalid etcd host entry: {host_port!r}")
    return host, int(port) if port else default_port


def connect_with_retry(hosts, max_retries=3, retry_delay=1, grpc_options=None):
    """Connect to the first reachable etcd host, retrying the whole list.

    Each attempt walks ``hosts`` in order; a host counts as reachable when a
    client can be created and its ``status()`` call succeeds.

    Args:
        hosts: Iterable of ``host:port`` strings. An entry without a port
            uses 2379.
        max_retries: Number of passes over ``hosts``.
        retry_delay: Seconds to sleep between passes (not after the last one).
        grpc_options: Options passed to ``etcd3.client``; defaults to
            disabling the gRPC HTTP proxy.

    Returns:
        The connected etcd client.

    Raises:
        ConnectionError: No host was reachable. The message lists the
            per-host errors of the final pass.
    """
    grpc_options = grpc_options or [('grpc.enable_http_proxy', 0)]
    last_errors = []

    for attempt in range(max_retries):
        attempt_errors = []
        for host_port in hosts:
            try:
                host, port = _parse_etcd_endpoint(host_port)
                client = etcd3.client(host=host, port=port, grpc_options=grpc_options)
                # status() forces a round trip, so an unreachable host fails here.
                client.status()
                return client
            except Exception as e:
                attempt_errors.append(f"{host_port}: {e}")

        last_errors = attempt_errors
        if attempt < max_retries - 1:
            time.sleep(retry_delay)

    error_details = "; ".join(last_errors)
    raise ConnectionError(f"Could not connect to any etcd host after {max_retries} attempts. Errors: {error_details}")
get_etcd_client function · python · L51-L70 (20 LOC)
config/ansible/admin/files/ycluster/ycluster/common/etcd_utils.py
def get_etcd_client(hosts=None, max_retries=3, retry_delay=1, grpc_options=None):
    """Return the module-wide etcd client, connecting if necessary.

    A cached client is reused only after its ``status()`` call succeeds; a
    failing one is discarded and a new connection is made with
    ``connect_with_retry``. ``hosts`` defaults to ``get_etcd_hosts()``.

    Raises:
        ConnectionError: Re-raised from ``connect_with_retry`` after the
            failure has been printed.
    """
    global _CACHED_CLIENT

    cached = _CACHED_CLIENT
    if cached:
        try:
            cached.status()
        except Exception as err:
            print(f"Cached etcd client failed health check: {err}")
            _CACHED_CLIENT = None
        else:
            return cached

    target_hosts = hosts or get_etcd_hosts()
    try:
        fresh = connect_with_retry(
            target_hosts,
            max_retries=max_retries,
            retry_delay=retry_delay,
            grpc_options=grpc_options,
        )
    except ConnectionError as err:
        print(f"Failed to establish etcd connection: {err}")
        raise
    _CACHED_CLIENT = fresh
    return fresh
main function · python · L8-L11 (4 LOC)
config/ansible/admin/files/ycluster/ycluster/services/dhcp_server.py
def main():
    """Entry point of the DHCP server service."""
    # Thin wrapper: all of the work happens inside dhcp_main().
    dhcp_main()
Repobility (the analyzer behind this table) · https://repobility.com
ensure_ca_directory function · python · L23-L26 (4 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def ensure_ca_directory():
    """Create the CA base directory and the certificates directory if missing.

    ``CA_BASE_PATH`` is requested with mode ``0o700`` and ``CERTS_PATH`` with
    mode ``0o755``; directories that already exist are left alone.
    """
    wanted = (
        (CA_BASE_PATH, 0o700),
        (CERTS_PATH, 0o755),
    )
    for directory, mode in wanted:
        os.makedirs(directory, mode=mode, exist_ok=True)
is_storage_leader function · python · L28-L41 (14 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def is_storage_leader():
    """Report whether this node is the storage leader recorded in etcd.

    Compares the value stored under ``/cluster/leader/app`` (decoded and
    stripped) with ``os.uname().nodename``. Returns ``False`` when the key is
    empty or when anything goes wrong; errors are printed, never raised.
    """
    try:
        raw_leader, _meta = get_etcd_client().get('/cluster/leader/app')
        if not raw_leader:
            return False
        return raw_leader.decode().strip() == os.uname().nodename
    except Exception as err:
        print(f"Error checking storage leader status: {err}")
        return False
generate_ca function · python · L43-L112 (70 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def generate_ca():
    """Generate a new Certificate Authority"""
    if not is_storage_leader():
        raise Exception("CA operations can only be performed on the storage leader")
    
    ensure_ca_directory()
    
    # Generate CA private key
    ca_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=4096,
    )
    
    # Create CA certificate
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Local"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, "Local"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "YCluster"),
        x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "Certificate Authority"),
        x509.NameAttribute(NameOID.COMMON_NAME, "YCluster Root CA"),
    ])
    
    ca_cert = x509.CertificateBuilder().subject_name(
        subject
    ).issuer_name(
        issuer
    ).public_key(
        ca_key.public_key()
    ).serial_n
load_ca function · python · L114-L125 (12 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def load_ca():
    """Read the CA certificate and its unencrypted private key from disk.

    Returns:
        ``(ca_cert, ca_key)`` parsed from the PEM files at ``CA_CERT_PATH``
        and ``CA_KEY_PATH``.

    Raises:
        Exception: Either file is missing.
    """
    if not (os.path.exists(CA_CERT_PATH) and os.path.exists(CA_KEY_PATH)):
        raise Exception("CA certificate or key not found. Generate CA first.")

    with open(CA_CERT_PATH, 'rb') as cert_fh:
        cert_pem = cert_fh.read()
    certificate = x509.load_pem_x509_certificate(cert_pem)

    with open(CA_KEY_PATH, 'rb') as key_fh:
        key_pem = key_fh.read()
    # password=None: the key file is loaded as an unencrypted PEM.
    private_key = serialization.load_pem_private_key(key_pem, password=None)

    return certificate, private_key
generate_server_cert function · python · L127-L222 (96 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def generate_server_cert(hostname, san_list=None):
    """Generate a server certificate signed by the CA"""
    if not is_storage_leader():
        raise Exception("Certificate generation can only be performed on the storage leader")
    
    ensure_ca_directory()
    ca_cert, ca_key = load_ca()
    
    if san_list is None:
        san_list = [hostname]
    
    # Generate server private key
    server_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
    )
    
    # Create server certificate
    subject = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Local"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, "Local"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "YCluster"),
        x509.NameAttribute(NameOID.COMMON_NAME, hostname),
    ])
    
    # Build SAN list
    san_names = []
    for san in san_list:
        try:
            # Try to parse as IP addr
list_certificates function · python · L224-L262 (39 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def list_certificates():
    """List all generated certificates"""
    if not os.path.exists(CERTS_PATH):
        print("No certificates directory found")
        return
    
    cert_files = list(Path(CERTS_PATH).glob("*.crt"))
    if not cert_files:
        print("No certificates found")
        return
    
    print("Generated certificates:")
    for cert_file in sorted(cert_files):
        try:
            with open(cert_file, 'rb') as f:
                cert = x509.load_pem_x509_certificate(f.read())
            
            hostname = cert_file.stem
            subject_cn = None
            for attr in cert.subject:
                if attr.oid == NameOID.COMMON_NAME:
                    subject_cn = attr.value
                    break
            
            print(f"  {hostname}:")
            print(f"    Subject: {subject_cn}")
            print(f"    Valid from: {cert.not_valid_before}")
            print(f"    Valid until: {cert.not_valid_after}")
            
            # 
get_ca_info function · python · L264-L276 (13 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def get_ca_info():
    """Print the subject, validity window and serial number of the CA.

    Returns:
        ``True`` when the CA was loaded and printed, ``False`` when loading
        or printing failed (the error is printed, not raised).
    """
    try:
        cert, _unused_key = load_ca()
        print("CA Certificate Information:")
        print(f"Subject: {cert.subject}")
        print(f"Valid from: {cert.not_valid_before}")
        print(f"Valid until: {cert.not_valid_after}")
        print(f"Serial number: {cert.serial_number}")
    except Exception as err:
        print(f"Error loading CA: {err}")
        return False
    return True
revoke_certificate function · python · L278-L298 (21 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def revoke_certificate(hostname):
    """Remove the certificate and key files issued for ``hostname``.

    "Revocation" here means deleting ``<hostname>.crt`` and
    ``<hostname>.key`` from ``CERTS_PATH``.

    Raises:
        Exception: This node is not the storage leader.
    """
    if not is_storage_leader():
        raise Exception("Certificate revocation can only be performed on the storage leader")

    cert_path = f"{CERTS_PATH}/{hostname}.crt"
    key_path = f"{CERTS_PATH}/{hostname}.key"

    removed_any = False
    for path, prefix in ((cert_path, "Removed certificate: "),
                         (key_path, "Removed private key: ")):
        if not os.path.exists(path):
            continue
        os.remove(path)
        print(f"{prefix}{path}")
        removed_any = True

    if not removed_any:
        print(f"No certificate found for {hostname}")
Repobility — same analyzer, your code, free for public repos · /scan/
get_ca_cert_path function · python · L300-L302 (3 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def get_ca_cert_path():
    """Return ``CA_CERT_PATH``, the location of the CA certificate file."""
    ca_cert_location = CA_CERT_PATH
    return ca_cert_location
get_server_cert_paths function · python · L304-L308 (5 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def get_server_cert_paths(hostname):
    """Return ``(cert_file, key_file)`` paths for ``hostname`` under ``CERTS_PATH``.

    The paths are only computed; nothing checks that the files exist.
    """
    stem = f"{CERTS_PATH}/{hostname}"
    return f"{stem}.crt", f"{stem}.key"
write_tls_key_to_temp function · python · L15-L36 (22 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def write_tls_key_to_temp():
    """Write the TLS private key stored in etcd to a temporary ``.pem`` file.

    The key is read from ``/cluster/tls/key``. The file is created with
    ``delete=False``, so it is not removed automatically once closed.

    Returns:
        The path of the temporary file, or ``None`` when etcd holds no key.

    Raises:
        Exception: Writing or chmod-ing the file failed; the temporary file
            is removed before the error propagates.
    """
    client = get_etcd_client()

    # Get key from etcd
    key_value, _ = client.get('/cluster/tls/key')

    if not key_value:
        print("No TLS private key found in etcd. Generate one with 'tls-config generate' first.")
        return None

    temp_key = tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False)
    try:
        # The context manager closes the handle even if the write fails.
        with temp_key:
            temp_key.write(key_value.decode())
        os.chmod(temp_key.name, 0o600)
    except Exception:
        # Never leave partial private-key material behind on failure.
        try:
            os.unlink(temp_key.name)
        except OSError:
            pass
        raise

    print(f"TLS key written to temporary file: {temp_key.name}")

    return temp_key.name
write_tls_key_to_nginx function · python · L38-L61 (24 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def write_tls_key_to_nginx():
    """Write the TLS private key stored in etcd to ``/etc/nginx/ssl/key.pem``.

    The key is read from ``/cluster/tls/key``. The target file is opened with
    mode ``0o600`` and chmod-ed to ``0o600`` before any key material is
    written, so the key is never on disk with wider permissions.

    Returns:
        The key path as a string, or ``None`` when etcd holds no key.
    """
    import os

    client = get_etcd_client()

    # Get key from etcd
    key_value, _ = client.get('/cluster/tls/key')

    if not key_value:
        print("No TLS private key found in etcd. Generate one with 'tls-config generate' first.")
        return None

    # Nginx SSL paths (matching the template)
    ssl_dir = Path('/etc/nginx/ssl')
    ssl_dir.mkdir(parents=True, exist_ok=True)

    key_path = ssl_dir / 'key.pem'

    # Create (or truncate) the file with owner-only permissions up front.
    fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as key_file:
        # A pre-existing file keeps its old mode on open(), so tighten it
        # while the file is still empty.
        key_path.chmod(0o600)
        key_file.write(key_value.decode())

    print(f"TLS key written to: {key_path}")

    return str(key_path)
generate_csr function · python · L63-L109 (47 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def generate_csr(key_path, domains):
    """Generate a Certificate Signing Request using our existing private key"""
    primary_domain = domains[0]
    
    # Create temporary CSR file
    temp_csr = tempfile.NamedTemporaryFile(mode='w', suffix='.csr', delete=False)
    temp_csr.close()
    
    # Build openssl command to generate CSR
    cmd = [
        'openssl', 'req', '-new',
        '-key', key_path,
        '-out', temp_csr.name,
        '-subj', f'/CN={primary_domain}',
        '-config', '/dev/stdin'
    ]
    
    # Create config for SAN (Subject Alternative Names)
    config = f"""[req]
distinguished_name = req_distinguished_name
req_extensions = v3_req

[req_distinguished_name]

[v3_req]
subjectAltName = @alt_names

[alt_names]
"""
    
    # Add all domains as SAN entries
    for i, domain in enumerate(domains, 1):
        config += f"DNS.{i} = {domain}\n"
    
    try:
        print(f"Generating CSR for domains: {', '.join(domains)}")
        result = subprocess.run(cmd, 
discover_nginx_instance_subdomains function · python · L111-L125 (15 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def discover_nginx_instance_subdomains():
    """Return the instance names encoded in nginx template names.

    A template whose site name contains an underscore is treated as an
    instance template; the instance name is the text after the last
    underscore. Names are returned once each, in first-seen order.
    """
    suffixes = (
        site_name.rsplit('_', 1)[-1]
        for site_name, _template in discover_nginx_templates()
        if '_' in site_name
    )
    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(suffixes))
discover_nginx_templates function · python · L127-L144 (18 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def discover_nginx_templates():
    """List the ``*.conf.j2`` templates found in ``/etc/nginx/templates``.

    Returns:
        A list of ``(site_name, template_path)`` tuples, where ``site_name``
        is the file name without its ``.conf.j2`` suffix. The list is empty
        (and a message is printed) when the directory does not exist.
    """
    templates_dir = Path('/etc/nginx/templates')

    if not templates_dir.exists():
        print(f"Templates directory not found: {templates_dir}")
        return []

    return [
        (template.name.removesuffix('.conf.j2'), template)
        for template in templates_dir.glob('*.conf.j2')
    ]
update_nginx_configs function · python · L146-L227 (82 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def update_nginx_configs(local_mode=False):
    """Update nginx configurations from templates with the primary domain from etcd or localhost"""
    if local_mode:
        # Local mode: use localhost and skip etcd
        primary_domain = "localhost"
        use_tls = False
        print("Using local mode - domain: localhost, TLS: disabled")
    else:
        # Normal mode: get config from etcd
        config = get_https_config()
        
        if 'domain' not in config:
            print("No primary domain configured - cannot generate nginx configs")
            return False
        
        primary_domain = config['domain']
        use_tls = True
    
    sites = discover_nginx_templates()
    
    if not sites:
        print("No nginx templates found")
        return False
    
    updated_count = 0
    
    for site_name, template_path in sites:
        nginx_config_path = Path(f'/etc/nginx/sites-available/{site_name}')
        
        print(f"Processing template: {template_path}
Powered by Repobility — scan your code at https://repobility.com
ensure_nginx_cert_from_etcd function · python · L229-L246 (18 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def ensure_nginx_cert_from_etcd():
    """Install the etcd-stored certificate for nginx when none is on disk.

    Reads ``/cluster/tls/cert`` from etcd. An existing
    ``/etc/nginx/ssl/cert.pem`` is never overwritten.

    Returns:
        ``False`` when etcd holds no certificate, otherwise ``True`` (whether
        or not the file had to be written).
    """
    cert_value, _meta = get_etcd_client().get('/cluster/tls/cert')
    if not cert_value:
        return False

    cert_path = Path('/etc/nginx/ssl/cert.pem')
    cert_path.parent.mkdir(parents=True, exist_ok=True)

    # Leave an existing file alone (don't overwrite Let's Encrypt certs).
    if cert_path.exists():
        return True

    cert_path.write_text(cert_value.decode())
    cert_path.chmod(0o644)
    print(f"TLS certificate written to: {cert_path}")
    return True
get_https_config function · python · L248-L277 (30 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def get_https_config():
    """Read the HTTPS settings stored under ``/cluster/https/`` in etcd.

    Returns:
        A dict that always has ``'aliases'`` (a JSON-decoded value, or ``[]``
        when the key is unset or not valid JSON) and has ``'domain'`` and
        ``'email'`` only when the corresponding keys are set.
    """
    client = get_etcd_client()

    def _fetch(key):
        # client.get returns (value, metadata); only the value is needed.
        value, _meta = client.get(key)
        return value

    domain_raw = _fetch('/cluster/https/domain')
    aliases_raw = _fetch('/cluster/https/aliases')
    email_raw = _fetch('/cluster/https/email')

    settings = {}

    if domain_raw:
        settings['domain'] = domain_raw.decode().strip()

    aliases = []
    if aliases_raw:
        try:
            aliases = json.loads(aliases_raw.decode())
        except json.JSONDecodeError:
            aliases = []
    settings['aliases'] = aliases

    # No default email - callers fall back to
    # --register-unsafely-without-email when it is absent.
    if email_raw:
        settings['email'] = email_raw.decode().strip()

    return settings
get_all_domains function · python · L279-L298 (20 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def get_all_domains(config):
    """Build the full list of domain names for ``config``.

    When ``config`` has a ``'domain'``, the list starts with that domain and
    ``admin.<domain>``, followed by ``<instance>.<domain>`` for every
    instance reported by ``discover_nginx_instance_subdomains()`` that is not
    already present. The ``'aliases'`` entries, if any, are appended last.
    """
    names = []

    if 'domain' in config:
        primary = config['domain']
        # The admin subdomain is always included alongside the primary.
        names += [primary, f"admin.{primary}"]

        for instance in discover_nginx_instance_subdomains():
            candidate = f"{instance}.{primary}"
            if candidate not in names:
                names.append(candidate)

    return names + list(config.get('aliases', []))
run_certbot_command function · python · L300-L309 (10 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def run_certbot_command(cmd, check=True):
    """Echo ``cmd`` and run it with ``subprocess.run``.

    Args:
        cmd: Argument list; also printed space-joined before execution.
        check: Passed through to ``subprocess.run``.

    Returns:
        The ``subprocess.CompletedProcess`` of the command.

    Raises:
        subprocess.CalledProcessError: ``check`` is true and the command
            exited non-zero; the exit code is printed before re-raising.
    """
    print(f"Running: {' '.join(cmd)}")

    try:
        return subprocess.run(cmd, check=check)
    except subprocess.CalledProcessError as err:
        print(f"Command failed with exit code {err.returncode}")
        raise
stop_fetch_tls_timer function · python · L311-L317 (7 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def stop_fetch_tls_timer():
    """Ask systemd to stop ``fetch-tls-certs.timer`` (best effort).

    The exit status of ``systemctl`` is not checked; only a failure to run
    the command at all is reported, as a printed warning.
    """
    stop_cmd = ['systemctl', 'stop', 'fetch-tls-certs.timer']
    try:
        subprocess.run(stop_cmd, check=False)
        print("Stopped fetch-tls-certs timer")
    except Exception as err:
        print(f"Warning: Could not stop fetch-tls-certs timer: {err}")
start_fetch_tls_timer function · python · L319-L325 (7 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def start_fetch_tls_timer():
    """Ask systemd to start ``fetch-tls-certs.timer`` (best effort).

    The exit status of ``systemctl`` is not checked; only a failure to run
    the command at all is reported, as a printed warning.
    """
    start_cmd = ['systemctl', 'start', 'fetch-tls-certs.timer']
    try:
        subprocess.run(start_cmd, check=False)
        print("Started fetch-tls-certs timer")
    except Exception as err:
        print(f"Warning: Could not start fetch-tls-certs timer: {err}")
obtain_certificate function · python · L327-L461 (135 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def obtain_certificate(test_cert=False, non_interactive=False):
    """Obtain a new certificate from Let's Encrypt using CSR mode"""
    config = get_https_config()
    domains = get_all_domains(config)
    
    if not domains:
        print("No domains configured. Use 'https-config set-domain <domain>' first.")
        return False
    
    if 'domain' not in config:
        print("No primary domain configured. Use 'https-config set-domain <domain>' first.")
        return False
    
    primary_domain = config['domain']
    email = config.get('email')
    
    print(f"Obtaining certificate for domains: {', '.join(domains)}")
    print(f"Primary domain: {primary_domain}")
    if email:
        print(f"Email: {email}")
    else:
        print("No email configured - using --register-unsafely-without-email")
    
    # Write TLS key to temporary file for CSR generation
    temp_key_path = write_tls_key_to_temp()
    if not temp_key_path:
        return False
    
    # Generate CSR with 
renew_certificates function · python · L463-L581 (119 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def renew_certificates(non_interactive=False):
    """Renew existing certificates using CSR mode"""
    print("Renewing certificates...")
    
    config = get_https_config()
    if 'domain' not in config:
        print("No primary domain configured for renewal")
        return False
    
    primary_domain = config['domain']
    domains = get_all_domains(config)
    
    # Check if we have a certificate in etcd (our source of truth)
    client = get_etcd_client()
    cert_value, _ = client.get('/cluster/tls/cert')
    if not cert_value:
        print("No existing certificate found in etcd")
        return False
    
    # Write TLS key to temporary file for CSR generation
    temp_key_path = write_tls_key_to_temp()
    if not temp_key_path:
        return False
    
    # Generate CSR with our existing private key
    csr_path = generate_csr(temp_key_path, domains)
    if not csr_path:
        # Clean up temp key file
        try:
            os.unlink(temp_key_path)
        except:
 
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
store_cert_info function · python · L583-L602 (20 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def store_cert_info(primary_domain, domains):
    """Record certificate metadata in etcd (best effort).

    Writes a JSON document to ``/cluster/https/certificate_info`` containing
    the primary domain, the domain list, the Let's Encrypt and nginx file
    paths, and an ``updated_at`` timestamp. Failures are printed as a
    warning and never raised.

    Args:
        primary_domain: Domain used to build the Let's Encrypt ``live`` paths.
        domains: Domain names stored as-is under ``'domains'``.
    """
    from datetime import datetime

    try:
        client = get_etcd_client()

        # Local time with UTC offset at second precision - the same format
        # `date -Iseconds` prints, without spawning an external process.
        updated_at = datetime.now().astimezone().isoformat(timespec='seconds')

        cert_info = {
            'primary_domain': primary_domain,
            'domains': domains,
            'cert_path': f'/etc/letsencrypt/live/{primary_domain}/fullchain.pem',
            'key_path': f'/etc/letsencrypt/live/{primary_domain}/privkey.pem',
            'nginx_cert_path': '/etc/nginx/ssl/cert.pem',
            'nginx_key_path': '/etc/nginx/ssl/key.pem',
            'updated_at': updated_at,
        }

        key = '/cluster/https/certificate_info'
        client.put(key, json.dumps(cert_info))
        print("Certificate information stored in etcd")
    except Exception as e:
        print(f"Warning: Could not store certificate info in etcd: {e}")
list_certificates function · python · L605-L647 (43 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def list_certificates():
    """List certificate status from etcd"""
    print("Certificate status:")
    
    try:
        config = get_https_config()
        domains = get_all_domains(config)
        
        if not domains:
            print("No domains configured")
            return True
        
        print(f"Configured domains: {', '.join(domains)}")
        
        # Check etcd for certificate
        client = get_etcd_client()
        cert_value, _ = client.get('/cluster/tls/cert')
        key_value, _ = client.get('/cluster/tls/key')
        
        if cert_value and key_value:
            print("Certificate: Present in etcd")
            
            # Try to get certificate info
            try:
                import subprocess
                result = subprocess.run(['openssl', 'x509', '-noout', '-text'], 
                                      input=cert_value.decode(), text=True, 
                                      capture_output=True)
                if result.re
revoke_certificate function · python · L649-L689 (41 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def revoke_certificate(domain):
    """Revoke a certificate using the certificate from etcd"""
    print(f"Revoking certificate for {domain}...")
    
    try:
        client = get_etcd_client()
        cert_value, _ = client.get('/cluster/tls/cert')
        
        if not cert_value:
            print("No certificate found in etcd")
            return False
        
        # Write certificate to temporary file for revocation
        temp_cert = tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False)
        temp_cert.write(cert_value.decode())
        temp_cert.close()
        
        cmd = ['certbot', 'revoke', '--cert-path', temp_cert.name]
        
        try:
            run_certbot_command(cmd)
            print("Certificate revoked successfully!")
            
            # Remove from etcd
            client.delete('/cluster/tls/cert')
            print("Certificate removed from etcd")
            
            return True
        finally:
            # Clean up t
delete_certificate function · python · L691-L712 (22 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def delete_certificate(domain):
    """Remove the certificate from etcd and from the nginx SSL directory.

    Deletes ``/cluster/tls/cert`` in etcd (the key is kept) and, if present,
    ``/etc/nginx/ssl/cert.pem``. ``domain`` is used only in the progress
    message.

    Returns:
        ``True`` on success, ``False`` when any step raised (the error is
        printed).
    """
    print(f"Deleting certificate for {domain}...")

    try:
        # Only the certificate entry is removed; the key stays in etcd.
        get_etcd_client().delete('/cluster/tls/cert')
        print("Certificate removed from etcd")

        on_disk = Path('/etc/nginx/ssl/cert.pem')
        if on_disk.exists():
            on_disk.unlink()
            print("Certificate removed from nginx")

        print("Certificate deleted successfully!")
    except Exception as err:
        print(f"Error deleting certificate: {err}")
        return False
    return True
get_cluster_members function · python · L31-L52 (22 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/check_cluster.py
def get_cluster_members(initial_host):
    """Return the etcd members reported by ``etcdctl member list``.

    For every member, the first ``http://`` client URL contributes one
    ``<host>:2379`` entry. When ``etcdctl`` fails, errors out, or yields no
    usable member, the result is ``[initial_host]``.
    """
    try:
        import subprocess
        proc = subprocess.run(
            ['etcdctl', 'member', 'list', '--write-out=json'],
            capture_output=True,
            text=True,
            timeout=5,
        )
        if proc.returncode == 0:
            discovered = []
            for member in json.loads(proc.stdout).get('members', []):
                # Only the first plain-http client URL of a member is used.
                first_http = next(
                    (u for u in member.get('clientURLs', []) if u.startswith('http://')),
                    None,
                )
                if first_http is None:
                    continue
                address = first_http.replace('http://', '').split(':')[0]
                discovered.append(f"{address}:2379")
            if discovered:
                return discovered
    except Exception as err:
        print(f"Error using etcdctl: {err}")

    # Fallback to initial host
    return [initial_host]
check_etcd_host function · python · L54-L94 (41 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/check_cluster.py
def check_etcd_host(host_port):
    """Check health of a single etcd host"""
    try:
        host, port = host_port.split(':')
        client = etcd3.client(host=host, port=int(port), grpc_options=GRPC_OPTIONS)
        
        # Get status
        status = client.status()
        
        # Get member list and find current member
        members = list(client.members)
        current_member_id = None
        
        # Find the member ID for the current host we're connected to
        for member in members:
            for url in member.client_urls:
                if host in url:
                    current_member_id = member.id
                    break
            if current_member_id:
                break
        
        # Check if this specific member is the leader
        is_leader = current_member_id == status.leader.id if current_member_id else False
        
        return {
            'host': host_port,
            'healthy': True,
            'version': status.version,
get_all_nodes function · python · L96-L110 (15 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/check_cluster.py
def get_all_nodes(client):
    """Collect every node record stored under the by-hostname prefix in etcd.

    Args:
        client: etcd client exposing ``get_prefix(prefix)``, which yields
            ``(value, metadata)`` pairs where ``value`` is bytes.

    Returns:
        List of decoded JSON documents, one per readable entry. Entries
        whose value is not valid UTF-8 or not valid JSON are skipped. If
        the etcd query itself fails, the error is printed and whatever
        was collected so far is returned.
    """
    nodes = []
    try:
        for value, _metadata in client.get_prefix(f"{ETCD_PREFIX}/by-hostname/"):
            try:
                nodes.append(json.loads(value.decode('utf-8')))
            except (UnicodeDecodeError, json.JSONDecodeError):
                # A single corrupt value (bad bytes or bad JSON) must not
                # abort the scan and hide every node that follows it.
                continue
    except Exception as e:
        print(f"Error getting nodes: {e}")

    return nodes
check_dhcp_service_on_host function · python · L112-L140 (29 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/check_cluster.py
def check_dhcp_service_on_host(host_ip):
    """Check if DHCP server is running on a specific host using health monitoring endpoint"""
    try:
        url = f"http://{host_ip}:{DHCP_HEALTH_PORT}/health"
        with urllib.request.urlopen(url, timeout=5) as response:
            health_data = json.loads(response.read().decode())
            
            if response.status == 200 and health_data.get('status') == 'healthy':
                return {
                    'host': host_ip, 'running': True, 'method': 'health_endpoint',
                    'etcd_connected': health_data.get('etcd_connected', False),
                    'server_ip': health_data.get('server_ip', host_ip)
                }
            else:
                status = health_data.get('status', f'HTTP {response.status}')
                return {
                    'host': host_ip, 'running': False, 'method': 'health_endpoint',
                    'error': f'DHCP server unhealthy: {status}'
                }
         
Repobility (the analyzer behind this table) · https://repobility.com
check_dnsmasq_service_on_host function · python · L142-L216 (75 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/check_cluster.py
def check_dnsmasq_service_on_host(host_ip, hostname):
    """Check if DNS service is running on a specific host and can resolve hostname"""
    try:
        import dns.resolver
        resolver = dns.resolver.Resolver()
        resolver.nameservers = [host_ip]
        resolver.timeout = 3
        resolver.lifetime = 3
        
        # Try to resolve the hostname and its AMT counterpart
        results = {'host': host_ip, 'hostname': hostname, 'running': True, 'dns_working': True, 'method': 'dns_query'}
        
        try:
            # Test main hostname
            answer = resolver.resolve(hostname, 'A')
            resolved_ip = str(answer[0])
            results['resolved_ip'] = resolved_ip
            
            # Test AMT hostname if this is a regular node (not already AMT)
            if not hostname.endswith('a'):
                amt_hostname = f"{hostname}a"
                try:
                    amt_answer = resolver.resolve(amt_hostname, 'A')
                    amt_
fetch_cluster_status function · python · L219-L227 (9 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/check_cluster.py
def fetch_cluster_status() -> dict[str, Any]:
    """Query the admin API on the storage VIP for the cluster status.

    Returns:
        The parsed JSON body of ``/api/cluster-status``. On any failure
        (request, decoding, parsing) a dict with the single key
        ``'_error'`` holding the exception text is returned instead.
    """
    try:
        endpoint = f"http://{STORAGE_VIP}:{ADMIN_API_PORT}/api/cluster-status"
        with urllib.request.urlopen(endpoint, timeout=5) as resp:
            payload = resp.read()
        status: dict[str, Any] = json.loads(payload.decode())
    except Exception as exc:
        return {'_error': str(exc)}
    return status
_svc_status_str function · python · L230-L244 (15 LOC)
config/ansible/admin/files/ycluster/ycluster/utils/check_cluster.py
def _svc_status_str(details: Any) -> str:
    """Extract a human-readable string from a service details field."""
    if isinstance(details, dict):
        if 'message' in details:
            return details['message']
        # Construct summary from common service detail fields
        parts = []
        if details.get('service_active'):
            parts.append('active')
        if details.get('port_open'):
            parts.append('port open')
        if parts:
            return ', '.join(parts)
        return str(details)
    return str(details) if details else ''
‹ prevpage 4 / 8next ›