Function bodies 355 total
ca_generate_server function · python · L139-L146 (8 LOC)config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def ca_generate_server(args):
"""Generate server certificate"""
from ..utils import ca_manager
try:
ca_manager.generate_server_cert(args.hostname, args.san)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)ca_list function · python · L149-L152 (4 LOC)config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def ca_list(args):
"""List certificates"""
from ..utils import ca_manager
ca_manager.list_certificates()ca_revoke function · python · L155-L162 (8 LOC)config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def ca_revoke(args):
"""Revoke certificate"""
from ..utils import ca_manager
try:
ca_manager.revoke_certificate(args.hostname)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)register_wg_commands function · python · L11-L51 (41 LOC)config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def register_wg_commands(subparsers):
p = subparsers.add_parser('wg', help='WireGuard overlay management')
p.set_defaults(func=lambda args: args.parser.print_help(), parser=p)
sub = p.add_subparsers(dest='wg_command', help='wg commands')
init_p = sub.add_parser('init', help='Initialize wg server (generate keys, set endpoints)')
init_p.add_argument('endpoints', nargs='+', metavar='HOST[:PORT]',
help='Public endpoint(s) clients will dial. Port defaults to 51820.')
init_p.add_argument('--port', type=int, help='Listen port (default: 51820)')
init_p.add_argument('--rotate', action='store_true', help='Rotate server keypair')
init_p.set_defaults(func=_init)
show_p = sub.add_parser('show', help='Show server configuration')
show_p.set_defaults(func=_show)
list_p = sub.add_parser('list', help='List peers')
list_p.add_argument('--pending', action='store_true', help='Only show pending')
list_p.add_argument('--approved',_init function · python · L54-L60 (7 LOC)config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def _init(args):
server = wg_config.init_server(args.endpoints, port=args.port, rotate=args.rotate)
print(f"wg server initialized")
print(f" pubkey: {server['pubkey']}")
print(f" port: {server['port']}")
print(f" endpoints: {', '.join(server['endpoints'])}")
print(f" server_ip: {server['server_ip']}")_show function · python · L63-L70 (8 LOC)config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def _show(args):
server = wg_config.get_server()
if not server:
print("not initialized", file=sys.stderr)
sys.exit(1)
redacted = {k: v for k, v in server.items() if k != 'privkey'}
redacted['privkey'] = '***'
print(json.dumps(redacted, indent=2))_list function · python · L73-L103 (31 LOC)config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def _list(args):
peers = wg_config.list_peers()
if args.pending:
peers = [p for p in peers if p[1]['status'] == 'pending']
if args.approved:
peers = [p for p in peers if p[1]['status'] == 'approved']
if args.as_json:
out = []
for hostname, peer, node in peers:
out.append({
'hostname': hostname,
'status': peer['status'],
'ip': node['ip'] if node else None,
'type': node['type'] if node else None,
'pubkey_sha256': peer.get('pubkey_sha256'),
'created_at': peer.get('created_at'),
'approved_at': peer.get('approved_at'),
})
print(json.dumps(out, indent=2))
return
if not peers:
print("no peers")
return
print(f"{'HOSTNAME':<10} {'STATUS':<10} {'TYPE':<10} {'IP':<16} {'FP':<18} CREATED")
for hostname, peer, node in peers:
ip = node['ip'] if node else '-'
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
_approve function · python · L106-L112 (7 LOC)config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def _approve(args):
peer = wg_config.set_peer_status(args.hostname, 'approved')
print(f"{args.hostname}: approved")
try:
wg_config.reconcile()
except Exception as e:
print(f"warning: reconcile failed: {e}", file=sys.stderr)_revoke function · python · L115-L121 (7 LOC)config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def _revoke(args):
wg_config.set_peer_status(args.hostname, 'revoked')
print(f"{args.hostname}: revoked")
try:
wg_config.reconcile()
except Exception as e:
print(f"warning: reconcile failed: {e}", file=sys.stderr)_delete function · python · L124-L130 (7 LOC)config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def _delete(args):
wg_config.delete_peer(args.hostname)
print(f"{args.hostname}: deleted")
try:
wg_config.reconcile()
except Exception as e:
print(f"warning: reconcile failed: {e}", file=sys.stderr)_render function · python · L137-L141 (5 LOC)config/ansible/admin/files/ycluster/ycluster/cli/wg.py
def _render(args):
if args.client:
print(wg_config.render_client_config(args.client))
else:
print(wg_config.render_server_config())get_etcd_hosts function · python · L6-L20 (15 LOC)config/ansible/admin/files/ycluster/ycluster/common/etcd_utils.py
def get_etcd_hosts():
"""Return etcd hosts from environment.
Checks ETCD_HOSTS (comma separated) then ETCD_HOST.
Falls back to localhost if nothing is set.
"""
hosts = os.environ.get('ETCD_HOSTS')
if hosts:
host_list = [h.strip() for h in hosts.split(',') if h.strip()]
if host_list:
return host_list
host = os.environ.get('ETCD_HOST')
if host:
return [host.strip()]
return ['localhost:2379']connect_with_retry function · python · L23-L45 (23 LOC)config/ansible/admin/files/ycluster/ycluster/common/etcd_utils.py
def connect_with_retry(hosts, max_retries=3, retry_delay=1, grpc_options=None):
"""Attempt to connect to etcd using host list with retries."""
grpc_options = grpc_options or [('grpc.enable_http_proxy', 0)]
last_errors = []
for attempt in range(max_retries):
attempt_errors = []
for host_port in hosts:
try:
host, port = host_port.split(':')
client = etcd3.client(host=host, port=int(port), grpc_options=grpc_options)
client.status()
return client
except Exception as e:
attempt_errors.append(f"{host_port}: {str(e)}")
continue
last_errors = attempt_errors
if attempt < max_retries - 1:
time.sleep(retry_delay)
error_details = "; ".join(last_errors)
raise ConnectionError(f"Could not connect to any etcd host after {max_retries} attempts. Errors: {error_details}")get_etcd_client function · python · L51-L70 (20 LOC)config/ansible/admin/files/ycluster/ycluster/common/etcd_utils.py
def get_etcd_client(hosts=None, max_retries=3, retry_delay=1, grpc_options=None):
"""Return a cached etcd client, creating it with retries if needed."""
global _CACHED_CLIENT
if _CACHED_CLIENT:
try:
_CACHED_CLIENT.status()
return _CACHED_CLIENT
except Exception as e:
print(f"Cached etcd client failed health check: {e}")
_CACHED_CLIENT = None
hosts = hosts or get_etcd_hosts()
try:
_CACHED_CLIENT = connect_with_retry(
hosts, max_retries=max_retries, retry_delay=retry_delay, grpc_options=grpc_options
)
return _CACHED_CLIENT
except ConnectionError as e:
print(f"Failed to establish etcd connection: {e}")
raisemain function · python · L8-L11 (4 LOC)config/ansible/admin/files/ycluster/ycluster/services/dhcp_server.py
def main():
"""Main entry point for DHCP server service"""
# Import and run the original DHCP server
dhcp_main()Repobility (the analyzer behind this table) · https://repobility.com
ensure_ca_directory function · python · L23-L26 (4 LOC)config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def ensure_ca_directory():
"""Ensure CA directory structure exists"""
os.makedirs(CA_BASE_PATH, mode=0o700, exist_ok=True)
os.makedirs(CERTS_PATH, mode=0o755, exist_ok=True)is_storage_leader function · python · L28-L41 (14 LOC)config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def is_storage_leader():
"""Check if this node is the storage leader"""
try:
client = get_etcd_client()
leader_key = '/cluster/leader/app'
leader_value, _ = client.get(leader_key)
if leader_value:
leader_hostname = leader_value.decode().strip()
current_hostname = os.uname().nodename
return leader_hostname == current_hostname
except Exception as e:
print(f"Error checking storage leader status: {e}")
return False
return Falsegenerate_ca function · python · L43-L112 (70 LOC)config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def generate_ca():
"""Generate a new Certificate Authority"""
if not is_storage_leader():
raise Exception("CA operations can only be performed on the storage leader")
ensure_ca_directory()
# Generate CA private key
ca_key = rsa.generate_private_key(
public_exponent=65537,
key_size=4096,
)
# Create CA certificate
subject = issuer = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Local"),
x509.NameAttribute(NameOID.LOCALITY_NAME, "Local"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "YCluster"),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, "Certificate Authority"),
x509.NameAttribute(NameOID.COMMON_NAME, "YCluster Root CA"),
])
ca_cert = x509.CertificateBuilder().subject_name(
subject
).issuer_name(
issuer
).public_key(
ca_key.public_key()
).serial_nload_ca function · python · L114-L125 (12 LOC)config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def load_ca():
"""Load existing CA certificate and key"""
if not os.path.exists(CA_CERT_PATH) or not os.path.exists(CA_KEY_PATH):
raise Exception("CA certificate or key not found. Generate CA first.")
with open(CA_CERT_PATH, 'rb') as f:
ca_cert = x509.load_pem_x509_certificate(f.read())
with open(CA_KEY_PATH, 'rb') as f:
ca_key = serialization.load_pem_private_key(f.read(), password=None)
return ca_cert, ca_keygenerate_server_cert function · python · L127-L222 (96 LOC)config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def generate_server_cert(hostname, san_list=None):
"""Generate a server certificate signed by the CA"""
if not is_storage_leader():
raise Exception("Certificate generation can only be performed on the storage leader")
ensure_ca_directory()
ca_cert, ca_key = load_ca()
if san_list is None:
san_list = [hostname]
# Generate server private key
server_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
)
# Create server certificate
subject = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Local"),
x509.NameAttribute(NameOID.LOCALITY_NAME, "Local"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "YCluster"),
x509.NameAttribute(NameOID.COMMON_NAME, hostname),
])
# Build SAN list
san_names = []
for san in san_list:
try:
# Try to parse as IP addrlist_certificates function · python · L224-L262 (39 LOC)config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def list_certificates():
"""List all generated certificates"""
if not os.path.exists(CERTS_PATH):
print("No certificates directory found")
return
cert_files = list(Path(CERTS_PATH).glob("*.crt"))
if not cert_files:
print("No certificates found")
return
print("Generated certificates:")
for cert_file in sorted(cert_files):
try:
with open(cert_file, 'rb') as f:
cert = x509.load_pem_x509_certificate(f.read())
hostname = cert_file.stem
subject_cn = None
for attr in cert.subject:
if attr.oid == NameOID.COMMON_NAME:
subject_cn = attr.value
break
print(f" {hostname}:")
print(f" Subject: {subject_cn}")
print(f" Valid from: {cert.not_valid_before}")
print(f" Valid until: {cert.not_valid_after}")
# get_ca_info function · python · L264-L276 (13 LOC)config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def get_ca_info():
"""Get CA certificate information"""
try:
ca_cert, _ = load_ca()
print("CA Certificate Information:")
print(f"Subject: {ca_cert.subject}")
print(f"Valid from: {ca_cert.not_valid_before}")
print(f"Valid until: {ca_cert.not_valid_after}")
print(f"Serial number: {ca_cert.serial_number}")
return True
except Exception as e:
print(f"Error loading CA: {e}")
return Falserevoke_certificate function · python · L278-L298 (21 LOC)config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def revoke_certificate(hostname):
"""Revoke a certificate (remove files)"""
if not is_storage_leader():
raise Exception("Certificate revocation can only be performed on the storage leader")
cert_file = f"{CERTS_PATH}/{hostname}.crt"
key_file = f"{CERTS_PATH}/{hostname}.key"
removed = False
if os.path.exists(cert_file):
os.remove(cert_file)
print(f"Removed certificate: {cert_file}")
removed = True
if os.path.exists(key_file):
os.remove(key_file)
print(f"Removed private key: {key_file}")
removed = True
if not removed:
print(f"No certificate found for {hostname}")Repobility — same analyzer, your code, free for public repos · /scan/
get_ca_cert_path function · python · L300-L302 (3 LOC)config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def get_ca_cert_path():
"""Get the path to the CA certificate"""
return CA_CERT_PATHget_server_cert_paths function · python · L304-L308 (5 LOC)config/ansible/admin/files/ycluster/ycluster/utils/ca_manager.py
def get_server_cert_paths(hostname):
"""Get paths to server certificate and key"""
cert_file = f"{CERTS_PATH}/{hostname}.crt"
key_file = f"{CERTS_PATH}/{hostname}.key"
return cert_file, key_filewrite_tls_key_to_temp function · python · L15-L36 (22 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def write_tls_key_to_temp():
"""Write TLS private key from etcd to temporary file for CSR generation"""
client = get_etcd_client()
# Get key from etcd
key_value, _ = client.get('/cluster/tls/key')
if not key_value:
print("No TLS private key found in etcd. Generate one with 'tls-config generate' first.")
return None
# Create temporary file for the key
temp_key = tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False)
temp_key.write(key_value.decode())
temp_key.close()
# Set proper permissions
os.chmod(temp_key.name, 0o600)
print(f"TLS key written to temporary file: {temp_key.name}")
return temp_key.namewrite_tls_key_to_nginx function · python · L38-L61 (24 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def write_tls_key_to_nginx():
"""Write TLS private key from etcd to nginx path"""
client = get_etcd_client()
# Get key from etcd
key_value, _ = client.get('/cluster/tls/key')
if not key_value:
print("No TLS private key found in etcd. Generate one with 'tls-config generate' first.")
return None
# Nginx SSL paths (matching the template)
ssl_dir = Path('/etc/nginx/ssl')
ssl_dir.mkdir(parents=True, exist_ok=True)
key_path = ssl_dir / 'key.pem'
# Write key only
key_path.write_text(key_value.decode())
key_path.chmod(0o600)
print(f"TLS key written to: {key_path}")
return str(key_path)generate_csr function · python · L63-L109 (47 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def generate_csr(key_path, domains):
"""Generate a Certificate Signing Request using our existing private key"""
primary_domain = domains[0]
# Create temporary CSR file
temp_csr = tempfile.NamedTemporaryFile(mode='w', suffix='.csr', delete=False)
temp_csr.close()
# Build openssl command to generate CSR
cmd = [
'openssl', 'req', '-new',
'-key', key_path,
'-out', temp_csr.name,
'-subj', f'/CN={primary_domain}',
'-config', '/dev/stdin'
]
# Create config for SAN (Subject Alternative Names)
config = f"""[req]
distinguished_name = req_distinguished_name
req_extensions = v3_req
[req_distinguished_name]
[v3_req]
subjectAltName = @alt_names
[alt_names]
"""
# Add all domains as SAN entries
for i, domain in enumerate(domains, 1):
config += f"DNS.{i} = {domain}\n"
try:
print(f"Generating CSR for domains: {', '.join(domains)}")
result = subprocess.run(cmd, discover_nginx_instance_subdomains function · python · L111-L125 (15 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def discover_nginx_instance_subdomains():
"""Discover instance subdomains from nginx template files"""
sites = discover_nginx_templates()
# Extract instance names from template files that have underscores
instances = []
for site_name, _ in sites:
# Check if this is an instance template (has an underscore in the name)
if '_' in site_name:
# Extract instance name (everything after the last underscore)
instance = site_name.split('_')[-1]
if instance not in instances:
instances.append(instance)
return instancesdiscover_nginx_templates function · python · L127-L144 (18 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def discover_nginx_templates():
"""Discover all nginx template files in /etc/nginx/templates"""
templates_dir = Path('/etc/nginx/templates')
if not templates_dir.exists():
print(f"Templates directory not found: {templates_dir}")
return []
# Find all .j2 template files
template_files = list(templates_dir.glob('*.conf.j2'))
# Convert to site names (remove .conf.j2 suffix)
sites = []
for template_file in template_files:
site_name = template_file.name.removesuffix('.conf.j2')
sites.append((site_name, template_file))
return sitesupdate_nginx_configs function · python · L146-L227 (82 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def update_nginx_configs(local_mode=False):
"""Update nginx configurations from templates with the primary domain from etcd or localhost"""
if local_mode:
# Local mode: use localhost and skip etcd
primary_domain = "localhost"
use_tls = False
print("Using local mode - domain: localhost, TLS: disabled")
else:
# Normal mode: get config from etcd
config = get_https_config()
if 'domain' not in config:
print("No primary domain configured - cannot generate nginx configs")
return False
primary_domain = config['domain']
use_tls = True
sites = discover_nginx_templates()
if not sites:
print("No nginx templates found")
return False
updated_count = 0
for site_name, template_path in sites:
nginx_config_path = Path(f'/etc/nginx/sites-available/{site_name}')
print(f"Processing template: {template_path}Powered by Repobility — scan your code at https://repobility.com
ensure_nginx_cert_from_etcd function · python · L229-L246 (18 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def ensure_nginx_cert_from_etcd():
"""Ensure nginx has a certificate from etcd (fallback for when no Let's Encrypt cert exists)"""
client = get_etcd_client()
cert_value, _ = client.get('/cluster/tls/cert')
if not cert_value:
return False
cert_path = Path('/etc/nginx/ssl/cert.pem')
cert_path.parent.mkdir(parents=True, exist_ok=True)
# Only write if it doesn't exist (don't overwrite Let's Encrypt certs)
if not cert_path.exists():
cert_path.write_text(cert_value.decode())
cert_path.chmod(0o644)
print(f"TLS certificate written to: {cert_path}")
return Trueget_https_config function · python · L248-L277 (30 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def get_https_config():
"""Get HTTPS configuration from etcd"""
client = get_etcd_client()
domain_key = '/cluster/https/domain'
aliases_key = '/cluster/https/aliases'
email_key = '/cluster/https/email'
domain_value, _ = client.get(domain_key)
aliases_value, _ = client.get(aliases_key)
email_value, _ = client.get(email_key)
config = {}
if domain_value:
config['domain'] = domain_value.decode().strip()
if aliases_value:
try:
config['aliases'] = json.loads(aliases_value.decode())
except json.JSONDecodeError:
config['aliases'] = []
else:
config['aliases'] = []
if email_value:
config['email'] = email_value.decode().strip()
# No default email - will use --register-unsafely-without-email if not set
return configget_all_domains function · python · L279-L298 (20 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def get_all_domains(config):
"""Get all domains (primary + aliases + admin subdomain + instance subdomains) as a list"""
domains = []
if 'domain' in config:
primary_domain = config['domain']
domains.append(primary_domain)
# Always add admin subdomain
admin_domain = f"admin.{primary_domain}"
domains.append(admin_domain)
# Add subdomains for any nginx templates that have instance suffixes
instances = discover_nginx_instance_subdomains()
for instance in instances:
instance_domain = f"{instance}.{primary_domain}"
if instance_domain not in domains:
domains.append(instance_domain)
domains.extend(config.get('aliases', []))
return domainsrun_certbot_command function · python · L300-L309 (10 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def run_certbot_command(cmd, check=True):
"""Run certbot command and return result"""
print(f"Running: {' '.join(cmd)}")
try:
result = subprocess.run(cmd, check=check)
return result
except subprocess.CalledProcessError as e:
print(f"Command failed with exit code {e.returncode}")
raisestop_fetch_tls_timer function · python · L311-L317 (7 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def stop_fetch_tls_timer():
"""Stop the fetch-tls-certs timer to prevent race conditions"""
try:
subprocess.run(['systemctl', 'stop', 'fetch-tls-certs.timer'], check=False)
print("Stopped fetch-tls-certs timer")
except Exception as e:
print(f"Warning: Could not stop fetch-tls-certs timer: {e}")start_fetch_tls_timer function · python · L319-L325 (7 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def start_fetch_tls_timer():
"""Start the fetch-tls-certs timer"""
try:
subprocess.run(['systemctl', 'start', 'fetch-tls-certs.timer'], check=False)
print("Started fetch-tls-certs timer")
except Exception as e:
print(f"Warning: Could not start fetch-tls-certs timer: {e}")obtain_certificate function · python · L327-L461 (135 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def obtain_certificate(test_cert=False, non_interactive=False):
"""Obtain a new certificate from Let's Encrypt using CSR mode"""
config = get_https_config()
domains = get_all_domains(config)
if not domains:
print("No domains configured. Use 'https-config set-domain <domain>' first.")
return False
if 'domain' not in config:
print("No primary domain configured. Use 'https-config set-domain <domain>' first.")
return False
primary_domain = config['domain']
email = config.get('email')
print(f"Obtaining certificate for domains: {', '.join(domains)}")
print(f"Primary domain: {primary_domain}")
if email:
print(f"Email: {email}")
else:
print("No email configured - using --register-unsafely-without-email")
# Write TLS key to temporary file for CSR generation
temp_key_path = write_tls_key_to_temp()
if not temp_key_path:
return False
# Generate CSR with renew_certificates function · python · L463-L581 (119 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def renew_certificates(non_interactive=False):
"""Renew existing certificates using CSR mode"""
print("Renewing certificates...")
config = get_https_config()
if 'domain' not in config:
print("No primary domain configured for renewal")
return False
primary_domain = config['domain']
domains = get_all_domains(config)
# Check if we have a certificate in etcd (our source of truth)
client = get_etcd_client()
cert_value, _ = client.get('/cluster/tls/cert')
if not cert_value:
print("No existing certificate found in etcd")
return False
# Write TLS key to temporary file for CSR generation
temp_key_path = write_tls_key_to_temp()
if not temp_key_path:
return False
# Generate CSR with our existing private key
csr_path = generate_csr(temp_key_path, domains)
if not csr_path:
# Clean up temp key file
try:
os.unlink(temp_key_path)
except:
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
store_cert_info function · python · L583-L602 (20 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def store_cert_info(primary_domain, domains):
"""Store certificate information in etcd"""
try:
client = get_etcd_client()
cert_info = {
'primary_domain': primary_domain,
'domains': domains,
'cert_path': f'/etc/letsencrypt/live/{primary_domain}/fullchain.pem',
'key_path': f'/etc/letsencrypt/live/{primary_domain}/privkey.pem',
'nginx_cert_path': '/etc/nginx/ssl/cert.pem',
'nginx_key_path': '/etc/nginx/ssl/key.pem',
'updated_at': subprocess.run(['date', '-Iseconds'], capture_output=True, text=True).stdout.strip()
}
key = '/cluster/https/certificate_info'
client.put(key, json.dumps(cert_info))
print("Certificate information stored in etcd")
except Exception as e:
print(f"Warning: Could not store certificate info in etcd: {e}")list_certificates function · python · L605-L647 (43 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def list_certificates():
"""List certificate status from etcd"""
print("Certificate status:")
try:
config = get_https_config()
domains = get_all_domains(config)
if not domains:
print("No domains configured")
return True
print(f"Configured domains: {', '.join(domains)}")
# Check etcd for certificate
client = get_etcd_client()
cert_value, _ = client.get('/cluster/tls/cert')
key_value, _ = client.get('/cluster/tls/key')
if cert_value and key_value:
print("Certificate: Present in etcd")
# Try to get certificate info
try:
import subprocess
result = subprocess.run(['openssl', 'x509', '-noout', '-text'],
input=cert_value.decode(), text=True,
capture_output=True)
if result.rerevoke_certificate function · python · L649-L689 (41 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def revoke_certificate(domain):
"""Revoke a certificate using the certificate from etcd"""
print(f"Revoking certificate for {domain}...")
try:
client = get_etcd_client()
cert_value, _ = client.get('/cluster/tls/cert')
if not cert_value:
print("No certificate found in etcd")
return False
# Write certificate to temporary file for revocation
temp_cert = tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False)
temp_cert.write(cert_value.decode())
temp_cert.close()
cmd = ['certbot', 'revoke', '--cert-path', temp_cert.name]
try:
run_certbot_command(cmd)
print("Certificate revoked successfully!")
# Remove from etcd
client.delete('/cluster/tls/cert')
print("Certificate removed from etcd")
return True
finally:
# Clean up tdelete_certificate function · python · L691-L712 (22 LOC)config/ansible/admin/files/ycluster/ycluster/utils/certbot_manager.py
def delete_certificate(domain):
"""Delete certificate from etcd and nginx"""
print(f"Deleting certificate for {domain}...")
try:
client = get_etcd_client()
# Remove certificate from etcd (keep the key)
client.delete('/cluster/tls/cert')
print("Certificate removed from etcd")
# Remove nginx certificate file
nginx_cert_path = Path('/etc/nginx/ssl/cert.pem')
if nginx_cert_path.exists():
nginx_cert_path.unlink()
print("Certificate removed from nginx")
print("Certificate deleted successfully!")
return True
except Exception as e:
print(f"Error deleting certificate: {e}")
return Falseget_cluster_members function · python · L31-L52 (22 LOC)config/ansible/admin/files/ycluster/ycluster/utils/check_cluster.py
def get_cluster_members(initial_host):
"""Get all etcd cluster members using etcdctl"""
try:
import subprocess
result = subprocess.run(['etcdctl', 'member', 'list', '--write-out=json'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
members_data = json.loads(result.stdout)
members = []
for member in members_data.get('members', []):
for url in member.get('clientURLs', []):
if url.startswith('http://'):
ip = url.replace('http://', '').split(':')[0]
members.append(f"{ip}:2379")
break
if members:
return members
except Exception as e:
print(f"Error using etcdctl: {e}")
# Fallback to initial host
return [initial_host]check_etcd_host function · python · L54-L94 (41 LOC)config/ansible/admin/files/ycluster/ycluster/utils/check_cluster.py
def check_etcd_host(host_port):
"""Check health of a single etcd host"""
try:
host, port = host_port.split(':')
client = etcd3.client(host=host, port=int(port), grpc_options=GRPC_OPTIONS)
# Get status
status = client.status()
# Get member list and find current member
members = list(client.members)
current_member_id = None
# Find the member ID for the current host we're connected to
for member in members:
for url in member.client_urls:
if host in url:
current_member_id = member.id
break
if current_member_id:
break
# Check if this specific member is the leader
is_leader = current_member_id == status.leader.id if current_member_id else False
return {
'host': host_port,
'healthy': True,
'version': status.version,
get_all_nodes function · python · L96-L110 (15 LOC)config/ansible/admin/files/ycluster/ycluster/utils/check_cluster.py
def get_all_nodes(client):
"""Get all nodes from etcd"""
nodes = []
try:
# Get all keys with prefix
for value, metadata in client.get_prefix(f"{ETCD_PREFIX}/by-hostname/"):
try:
node_data = json.loads(value.decode('utf-8'))
nodes.append(node_data)
except json.JSONDecodeError:
continue
except Exception as e:
print(f"Error getting nodes: {e}")
return nodescheck_dhcp_service_on_host function · python · L112-L140 (29 LOC)config/ansible/admin/files/ycluster/ycluster/utils/check_cluster.py
def check_dhcp_service_on_host(host_ip):
"""Check if DHCP server is running on a specific host using health monitoring endpoint"""
try:
url = f"http://{host_ip}:{DHCP_HEALTH_PORT}/health"
with urllib.request.urlopen(url, timeout=5) as response:
health_data = json.loads(response.read().decode())
if response.status == 200 and health_data.get('status') == 'healthy':
return {
'host': host_ip, 'running': True, 'method': 'health_endpoint',
'etcd_connected': health_data.get('etcd_connected', False),
'server_ip': health_data.get('server_ip', host_ip)
}
else:
status = health_data.get('status', f'HTTP {response.status}')
return {
'host': host_ip, 'running': False, 'method': 'health_endpoint',
'error': f'DHCP server unhealthy: {status}'
}
Repobility (the analyzer behind this table) · https://repobility.com
check_dnsmasq_service_on_host function · python · L142-L216 (75 LOC)config/ansible/admin/files/ycluster/ycluster/utils/check_cluster.py
def check_dnsmasq_service_on_host(host_ip, hostname):
"""Check if DNS service is running on a specific host and can resolve hostname"""
try:
import dns.resolver
resolver = dns.resolver.Resolver()
resolver.nameservers = [host_ip]
resolver.timeout = 3
resolver.lifetime = 3
# Try to resolve the hostname and its AMT counterpart
results = {'host': host_ip, 'hostname': hostname, 'running': True, 'dns_working': True, 'method': 'dns_query'}
try:
# Test main hostname
answer = resolver.resolve(hostname, 'A')
resolved_ip = str(answer[0])
results['resolved_ip'] = resolved_ip
# Test AMT hostname if this is a regular node (not already AMT)
if not hostname.endswith('a'):
amt_hostname = f"{hostname}a"
try:
amt_answer = resolver.resolve(amt_hostname, 'A')
amt_fetch_cluster_status function · python · L219-L227 (9 LOC)config/ansible/admin/files/ycluster/ycluster/utils/check_cluster.py
def fetch_cluster_status() -> dict[str, Any]:
"""Fetch aggregated cluster status from the storage VIP admin API."""
try:
url = f"http://{STORAGE_VIP}:{ADMIN_API_PORT}/api/cluster-status"
with urllib.request.urlopen(url, timeout=5) as response:
data: dict[str, Any] = json.loads(response.read().decode())
return data
except Exception as e:
return {'_error': str(e)}_svc_status_str function · python · L230-L244 (15 LOC)config/ansible/admin/files/ycluster/ycluster/utils/check_cluster.py
def _svc_status_str(details: Any) -> str:
"""Extract a human-readable string from a service details field."""
if isinstance(details, dict):
if 'message' in details:
return details['message']
# Construct summary from common service detail fields
parts = []
if details.get('service_active'):
parts.append('active')
if details.get('port_open'):
parts.append('port open')
if parts:
return ', '.join(parts)
return str(details)
return str(details) if details else ''