Function bodies 355 total
frontend_delete function · python · L45-L47 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/frontend.py
def frontend_delete(args):
    """Remove the frontend node named by ``args.name``.

    Delegates to ``frontend_manager.delete_frontend_node``.
    """
    node_name = args.name
    frontend_manager.delete_frontend_node(node_name)


# frontend_show function · python · L50-L52 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/frontend.py
def frontend_show(args):
    """Display the frontend node named by ``args.name``.

    Delegates to ``frontend_manager.show_frontend_node``.
    """
    node_name = args.name
    frontend_manager.show_frontend_node(node_name)


# register_healthchecks_commands function · python · L11-L53 (43 LOC)config/ansible/admin/files/ycluster/ycluster/cli/healthchecks.py
def register_healthchecks_commands(subparsers):
"""Register healthchecks subcommands"""
healthchecks_parser = subparsers.add_parser(
'healthchecks',
help='Manage healthchecks.io configuration'
)
healthchecks_subparsers = healthchecks_parser.add_subparsers(
dest='healthchecks_command',
help='Healthchecks commands'
)
# Set URL
set_parser = healthchecks_subparsers.add_parser(
'set-url',
help='Set healthchecks.io ping URL'
)
set_parser.add_argument(
'url',
help='Healthchecks.io ping URL (e.g., https://hc-ping.com/YOUR-UUID-HERE)'
)
set_parser.set_defaults(func=healthchecks_set_url)
# Get URL
get_parser = healthchecks_subparsers.add_parser(
'get-url',
help='Get current healthchecks.io ping URL'
)
get_parser.set_defaults(func=healthchecks_get_url)
# Delete URL
delete_parser = healthchecks_subparsers.add_parser(
'deletehealthchecks_set_url function · python · L55-L72 (18 LOC)config/ansible/admin/files/ycluster/ycluster/cli/healthchecks.py
def healthchecks_set_url(args):
    """Set healthchecks.io ping URL in etcd.

    ``args.url`` is stripped of surrounding whitespace and must start with
    ``http://`` or ``https://`` and contain a host part. On success the URL
    is stored under ``ETCD_KEY`` and a confirmation is printed.

    Exits with status 1 when the URL is rejected or when talking to etcd
    raises.
    """
    from urllib.parse import urlparse

    url = args.url.strip()

    # Validate before touching etcd so a malformed URL is reported as such
    # even when the etcd client cannot be created.
    if not url.startswith(('http://', 'https://')):
        print("Error: URL must start with http:// or https://", file=sys.stderr)
        sys.exit(1)

    try:
        host = urlparse(url).netloc
    except ValueError:
        # urlparse raises ValueError for e.g. malformed IPv6 literals.
        host = ''
    if not host:
        print("Error: URL must include a host name", file=sys.stderr)
        sys.exit(1)

    try:
        client = get_etcd_client()
        client.put(ETCD_KEY, url)
    except Exception as e:
        print(f"Error setting healthchecks URL: {e}", file=sys.stderr)
        sys.exit(1)

    print(f"Healthchecks URL set to: {url}")


# healthchecks_get_url function · python · L74-L89 (16 LOC)config/ansible/admin/files/ycluster/ycluster/cli/healthchecks.py
def healthchecks_get_url(args):
    """Print the healthchecks.io ping URL stored in etcd.

    Exits with status 1 when no value is stored under ``ETCD_KEY`` or when
    the lookup raises.
    """
    try:
        etcd = get_etcd_client()
        stored = etcd.get(ETCD_KEY)
        raw_value = stored[0]
        if not raw_value:
            print("No healthchecks URL configured")
            sys.exit(1)
        url = raw_value.decode()
        print(f"Healthchecks URL: {url}")
    except Exception as err:
        print(f"Error getting healthchecks URL: {err}", file=sys.stderr)
        sys.exit(1)


# healthchecks_delete_url function · python · L91-L108 (18 LOC)config/ansible/admin/files/ycluster/ycluster/cli/healthchecks.py
def healthchecks_delete_url(args):
    """Remove the healthchecks.io ping URL from etcd.

    Exits with status 1 when nothing is stored under ``ETCD_KEY`` or when
    an etcd call raises.
    """
    try:
        etcd = get_etcd_client()

        # Only delete when a value is actually present.
        stored = etcd.get(ETCD_KEY)
        if not stored[0]:
            print("No healthchecks URL configured")
            sys.exit(1)

        etcd.delete(ETCD_KEY)
        print("Healthchecks URL deleted")
    except Exception as err:
        print(f"Error deleting healthchecks URL: {err}", file=sys.stderr)
        sys.exit(1)


# healthchecks_test function · python · L110-L146 (37 LOC)config/ansible/admin/files/ycluster/ycluster/cli/healthchecks.py
def healthchecks_test(args):
"""Send a test ping to healthchecks.io"""
import requests
import socket
from datetime import datetime
try:
client = get_etcd_client()
# Get URL from etcd
result = client.get(ETCD_KEY)
if not result[0]:
print("No healthchecks URL configured", file=sys.stderr)
sys.exit(1)
url = result[0].decode()
# Prepare test message
hostname = socket.gethostname()
message = f"Test ping from {hostname} at {datetime.now().isoformat()}"
# Send ping
print(f"Sending test ping to: {url}")
response = requests.post(url, data=message, timeout=10)
if response.status_code == 200:
print("Test ping sent successfully")
else:
print(f"Test ping failed with HTTP {response.status_code}", file=sys.stderr)
sys.exit(1)
except requests.exceptions.RGenerated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
register_https_commands function · python · L7-L43 (37 LOC)config/ansible/admin/files/ycluster/ycluster/cli/https.py
def register_https_commands(subparsers):
"""Register HTTPS management commands"""
https_parser = subparsers.add_parser('https', help='HTTPS domain configuration')
https_parser.set_defaults(func=lambda args: args.parser.print_help(), parser=https_parser)
https_subparsers = https_parser.add_subparsers(dest='https_command', help='HTTPS commands')
# Set domain command
domain_parser = https_subparsers.add_parser('set-domain', help='Set primary domain')
domain_parser.add_argument('domain', help='Primary domain name')
domain_parser.set_defaults(func=https_set_domain)
# Add alias command
alias_parser = https_subparsers.add_parser('add-alias', help='Add domain alias')
alias_parser.add_argument('alias', help='Domain alias to add')
alias_parser.set_defaults(func=https_add_alias)
# Remove alias command
remove_parser = https_subparsers.add_parser('remove-alias', help='Remove domain alias')
remove_parser.add_argument('alias',https_set_domain function · python · L46-L48 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/https.py
def https_set_domain(args):
    """Store ``args.domain`` as the primary domain via ``https_config``."""
    primary_domain = args.domain
    https_config.set_domain(primary_domain)


# https_add_alias function · python · L51-L53 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/https.py
def https_add_alias(args):
    """Register ``args.alias`` as a domain alias via ``https_config``."""
    new_alias = args.alias
    https_config.add_alias(new_alias)


# https_remove_alias function · python · L56-L58 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/https.py
def https_remove_alias(args):
    """Drop the domain alias ``args.alias`` via ``https_config``."""
    old_alias = args.alias
    https_config.remove_alias(old_alias)


# https_set_email function · python · L61-L63 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/https.py
def https_set_email(args):
    """Store ``args.email`` via ``https_config.set_email``."""
    address = args.email
    https_config.set_email(address)


# https_get function · python · L66-L82 (17 LOC)config/ansible/admin/files/ycluster/ycluster/cli/https.py
def https_get(args):
    """Print the stored HTTPS configuration.

    Shows the primary domain, aliases and email when present, followed by
    the combined domain list from ``https_config.get_all_domains``. Prints a
    notice instead when no configuration is found.
    """
    config = https_config.get_https_config()
    if not config:
        print("No HTTPS configuration found")
        return

    print("HTTPS Configuration:")
    if 'domain' in config:
        print(f"Primary domain: {config['domain']}")

    aliases = config.get('aliases')
    if aliases:
        print(f"Aliases: {', '.join(aliases)}")

    if 'email' in config:
        print(f"Email: {config['email']}")

    every_domain = https_config.get_all_domains()
    if every_domain:
        print(f"All domains: {', '.join(every_domain)}")


# https_list_domains function · python · L85-L92 (8 LOC)config/ansible/admin/files/ycluster/ycluster/cli/https.py
def https_list_domains(args):
    """Print every configured domain, one per line.

    Prints a notice when ``https_config.get_all_domains`` returns nothing.
    """
    configured = https_config.get_all_domains()
    if not configured:
        print("No domains configured")
        return

    for name in configured:
        print(name)


# https_delete function · python · L95-L97 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/https.py
def https_delete(args):
    """Remove the HTTPS configuration via ``https_config``.

    ``args`` is accepted for the common handler signature and is not read.
    """
    https_config.delete_https_config()


# Repobility · code-quality intelligence · https://repobility.com
register_inference_commands function · python · L10-L44 (35 LOC)config/ansible/admin/files/ycluster/ycluster/cli/inference.py
def register_inference_commands(subparsers):
"""Register inference management commands"""
parser = subparsers.add_parser('inference', help='Inference gateway management (LiteLLM)')
parser.set_defaults(func=lambda args: args.parser.print_help(), parser=parser)
sub = parser.add_subparsers(dest='inference_command', help='Inference commands')
# ycluster inference models
models_parser = sub.add_parser('models', help='List configured models with backends')
models_parser.set_defaults(func=inference_models)
# ycluster inference ls (alias for models)
ls_parser = sub.add_parser('ls', help='List configured models with backends (alias for models)')
ls_parser.set_defaults(func=inference_models)
# ycluster inference add <api-base> [model-name] [--backend-model <name>] [key=value ...]
add_parser = sub.add_parser('add', help='Add model(s) from a backend')
add_parser.add_argument('api_base', help='Backend URL — shorthand allowed (e.g. nv1.xc -> httinference_models function · python · L47-L49 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/inference.py
def inference_models(args):
    """List the configured models via ``inference_manager.list_models``.

    ``args`` is accepted for the common handler signature and is not read.
    """
    inference_manager.list_models()


# _parse_extra_params function · python · L53-L70 (18 LOC)config/ansible/admin/files/ycluster/ycluster/cli/inference.py
def _parse_extra_params(extra_params):
"""Parse key=value pairs into a dict, auto-converting numeric values."""
params = {}
for item in extra_params:
if '=' not in item:
print(f"Invalid parameter (expected key=value): {item}")
sys.exit(1)
key, value = item.split('=', 1)
# Auto-convert numeric values
try:
value = int(value)
except ValueError:
try:
value = float(value)
except ValueError:
pass
params[key] = value
return paramsinference_add function · python · L73-L76 (4 LOC)config/ansible/admin/files/ycluster/ycluster/cli/inference.py
def inference_add(args):
    """Add a model backend via ``inference_manager.add_model``.

    ``args.extra_params`` is parsed with ``_parse_extra_params`` and passed
    along with the model name, API base and backend model.
    """
    extra_options = _parse_extra_params(args.extra_params)
    inference_manager.add_model(
        args.model_name,
        args.api_base,
        args.backend_model,
        extra_options,
    )


# inference_remove function · python · L79-L81 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/inference.py
def inference_remove(args):
    """Remove a model via ``inference_manager.remove_model``.

    Passes ``args.model_name`` and ``args.api_base`` through unchanged.
    """
    model_name = args.model_name
    api_base = args.api_base
    inference_manager.remove_model(model_name, api_base)


# inference_key function · python · L84-L86 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/inference.py
def inference_key(args):
    """Print the LiteLLM master API key via ``inference_manager``.

    ``args`` is accepted for the common handler signature and is not read.
    """
    inference_manager.print_master_key()


# inference_reload function · python · L89-L91 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/inference.py
def inference_reload(args):
    """Reload the LiteLLM configuration via ``inference_manager``.

    ``args`` is accepted for the common handler signature and is not read.
    """
    inference_manager.reload_litellm()


# create_parser function · python · L11-L63 (53 LOC)config/ansible/admin/files/ycluster/ycluster/cli/main.py
def create_parser():
"""Create the main argument parser"""
parser = argparse.ArgumentParser(
prog='ycluster',
description='YCluster Infrastructure Management Tools',
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
'--version',
action='version',
version=f'ycluster {__version__}'
)
parser.add_argument(
'--completion',
action='store_true',
help='Generate bash completion script'
)
subparsers = parser.add_subparsers(
dest='command',
help='Available commands',
metavar='COMMAND'
)
# Import and register subcommands
from .cluster import register_cluster_commands, register_health_alias
from .dhcp import register_dhcp_commands
from .tls import register_tls_commands
from .https import register_https_commands
from .certbot import register_certbot_commands
from .rathole import register_ratholRepobility · severity-and-effort ranking · https://repobility.com
extract_parser_structure function · python · L66-L84 (19 LOC)config/ansible/admin/files/ycluster/ycluster/cli/main.py
def extract_parser_structure(parser):
    """Describe the top-level parser as a dict of commands and options.

    Returns ``{'commands': {...}, 'options': [...]}`` where ``options`` holds
    every option string registered on ``parser`` and ``commands`` maps each
    subcommand name to the result of ``extract_subparser_structure``.
    """
    option_flags = []
    subcommands = {}

    # One pass over the parser's registered actions collects both pieces.
    for act in parser._actions:
        if act.option_strings:
            option_flags.extend(act.option_strings)
        if isinstance(act, argparse._SubParsersAction):
            for name, child in act.choices.items():
                subcommands[name] = extract_subparser_structure(child)

    return {
        'commands': subcommands,
        'options': option_flags,
    }


# extract_subparser_structure function · python · L86-L108 (23 LOC)config/ansible/admin/files/ycluster/ycluster/cli/main.py
def extract_subparser_structure(parser):
    """Extract structure from a subparser.

    Returns a dict with three keys:

    * ``commands``: nested subcommand name -> structure (recursive).
    * ``options``: every option string registered on ``parser``.
    * ``positional``: ``dest`` of each positional argument, excluding the
      bookkeeping names ``help``/``func``/``parser`` and any dest ending in
      ``_command``.

    A subparsers action is never reported as a positional argument, whatever
    its ``dest`` is (including the default suppressed dest).
    """
    structure = {
        'commands': {},
        'options': [],
        'positional': []
    }
    reserved_dests = ('help', 'func', 'parser')

    for action in parser._actions:
        is_subparsers = isinstance(action, argparse._SubParsersAction)
        if is_subparsers:
            for choice, subparser in action.choices.items():
                structure['commands'][choice] = extract_subparser_structure(subparser)

        if action.option_strings:
            structure['options'].extend(action.option_strings)
        elif (
            not is_subparsers
            and action.dest not in reserved_dests
            and not action.dest.endswith('_command')
        ):
            # Positional arguments (excluding special ones)
            structure['positional'].append(action.dest)

    return structure


# generate_bash_completion_from_structure function · python · L110-L131 (22 LOC)config/ansible/admin/files/ycluster/ycluster/cli/main.py
def generate_bash_completion_from_structure(structure, cmd_path=[]):
"""Generate bash completion logic from parser structure"""
lines = []
if not cmd_path: # Top level
# Generate main command completion
main_commands = list(structure['commands'].keys()) + structure['options']
lines.append(f' COMPREPLY=($(compgen -W "{" ".join(main_commands)}" -- ${{cur}}))')
lines.append(' return 0')
else:
# Generate subcommand completion
if structure['commands']:
subcommands = list(structure['commands'].keys())
lines.append(f' COMPREPLY=($(compgen -W "{" ".join(subcommands)}" -- ${{cur}}))')
elif structure['options']:
options = structure['options']
lines.append(f' COMPREPLY=($(compgen -W "{" ".join(options)}" -- ${{cur}}))')
else:
lines.append(' COMPREPLY=()')
lines.appendgenerate_completion_script function · python · L133-L232 (100 LOC)config/ansible/admin/files/ycluster/ycluster/cli/main.py
def generate_completion_script():
"""Generate bash completion script dynamically from argparse structure"""
parser = create_parser()
structure = extract_parser_structure(parser)
# Build the completion script
script_lines = [
'#!/bin/bash',
'# Bash completion for ycluster command (auto-generated from argparse)',
'',
'_ycluster_completions() {',
' local cur prev opts',
' COMPREPLY=()',
' cur="${COMP_WORDS[COMP_CWORD]}"',
' prev="${COMP_WORDS[COMP_CWORD-1]}"',
' ',
' case ${COMP_CWORD} in',
' 1)',
' # Complete main commands'
]
# Add main command completion
main_completion = generate_bash_completion_from_structure(structure)
script_lines.extend([' ' + line for line in main_completion])
script_lines.extend([
' ;;',
' 2)',
' # Complete subcommanmain function · python · L235-L257 (23 LOC)config/ansible/admin/files/ycluster/ycluster/cli/main.py
def main():
    """Main CLI entry point.

    Parses the command line, then either prints the bash completion script
    (``--completion``), prints help and exits 1 when no command was given,
    or dispatches to the handler stored in ``args.func``.

    Exit codes: 1 when no command or no handler is available or when the
    handler raises; 130 when interrupted with Ctrl-C.
    """
    parser = create_parser()
    args = parser.parse_args()

    # Handle completion generation
    if args.completion:
        print(generate_completion_script())
        return

    if not args.command:
        parser.print_help()
        sys.exit(1)

    # A command group invoked without a subcommand may carry no ``func``
    # default; report that clearly instead of leaking an AttributeError.
    handler = getattr(args, 'func', None)
    if handler is None:
        print(
            f"Error: '{args.command}' requires a subcommand "
            f"(see 'ycluster {args.command} --help')",
            file=sys.stderr,
        )
        sys.exit(1)

    # Execute the command
    try:
        handler(args)
    except KeyboardInterrupt:
        print("\nInterrupted by user", file=sys.stderr)
        sys.exit(130)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


# register_rathole_commands function · python · L7-L33 (27 LOC)config/ansible/admin/files/ycluster/ycluster/cli/rathole.py
def register_rathole_commands(subparsers):
"""Register rathole management commands"""
rathole_parser = subparsers.add_parser('rathole', help='Rathole tunnel configuration')
rathole_parser.set_defaults(func=lambda args: args.parser.print_help(), parser=rathole_parser)
rathole_subparsers = rathole_parser.add_subparsers(dest='rathole_command', help='Rathole commands')
# Set command
set_parser = rathole_subparsers.add_parser('set', help='Set rathole configuration')
set_parser.add_argument('--remote-addr', help='Remote server address (e.g., server.com:2333)')
set_parser.add_argument('--token', help='Authentication token')
set_parser.set_defaults(func=rathole_set)
# Get command
get_parser = rathole_subparsers.add_parser('get', help='Get current rathole configuration')
get_parser.set_defaults(func=rathole_get)
# Generate client config command
gen_client_parser = rathole_subparsers.add_parser('generate-client', help='Generarathole_set function · python · L36-L38 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/rathole.py
def rathole_set(args):
    """Store the rathole settings via ``rathole_config.set_rathole_config``.

    Passes ``args.remote_addr`` and ``args.token`` through unchanged.
    """
    remote_addr = args.remote_addr
    token = args.token
    rathole_config.set_rathole_config(remote_addr, token)


# rathole_get function · python · L41-L43 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/rathole.py
def rathole_get(args):
    """Invoke ``rathole_config.get_rathole_config``.

    ``args`` is accepted for the common handler signature and is not read.
    """
    rathole_config.get_rathole_config()


# If a scraper extracted this row, it came from Repobility (https://repobility.com)
rathole_generate_client function · python · L46-L48 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/rathole.py
def rathole_generate_client(args):
    """Invoke ``rathole_config.generate_client_config``.

    ``args`` is accepted for the common handler signature and is not read.
    """
    rathole_config.generate_client_config()


# rathole_generate_ssh_client function · python · L51-L53 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/rathole.py
def rathole_generate_ssh_client(args):
    """Invoke ``rathole_config.generate_ssh_client_config``.

    ``args`` is accepted for the common handler signature and is not read.
    """
    rathole_config.generate_ssh_client_config()


# rathole_delete function · python · L56-L58 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/rathole.py
def rathole_delete(args):
    """Invoke ``rathole_config.delete_rathole_config``.

    ``args`` is accepted for the common handler signature and is not read.
    """
    rathole_config.delete_rathole_config()


# register_storage_commands function · python · L8-L40 (33 LOC)config/ansible/admin/files/ycluster/ycluster/cli/storage.py
def register_storage_commands(subparsers):
"""Register storage management commands"""
storage_parser = subparsers.add_parser('storage', help='Storage and RBD management')
storage_parser.set_defaults(func=lambda args: args.parser.print_help(), parser=storage_parser)
storage_subparsers = storage_parser.add_subparsers(dest='storage_command', help='Storage commands')
# User RBD commands
rbd_parser = storage_subparsers.add_parser('rbd', help='User RBD volume management')
rbd_parser.set_defaults(func=lambda args: args.parser.print_help(), parser=rbd_parser)
rbd_subparsers = rbd_parser.add_subparsers(dest='rbd_command', help='RBD commands')
# RBD start command
start_parser = rbd_subparsers.add_parser('start', help='Acquire RBD lock and mount volume')
start_parser.add_argument('-K', action='store_true', help='Use LUKS passphrase instead of Clevis')
start_parser.set_defaults(func=storage_rbd_start)
# RBD stop command
stop_pastorage_rbd_start function · python · L43-L51 (9 LOC)config/ansible/admin/files/ycluster/ycluster/cli/storage.py
def storage_rbd_start(args):
    """Start the user RBD and exit with a status reflecting the outcome.

    Calls ``user_rbd.rbd_start`` with ``use_passphrase=args.K``; exits 0 on
    a truthy result and 1 otherwise, printing a matching message.
    """
    started = user_rbd.rbd_start(use_passphrase=args.K)
    if not started:
        print("Failed to acquire User RBD lock")
        sys.exit(1)

    print("User RBD successfully acquired and mounted")
    sys.exit(0)


# storage_rbd_stop function · python · L54-L56 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/storage.py
def storage_rbd_stop(args):
    """Invoke ``user_rbd.rbd_stop``.

    ``args`` is accepted for the common handler signature and is not read.
    """
    user_rbd.rbd_stop()


# storage_rbd_status function · python · L59-L61 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/storage.py
def storage_rbd_status(args):
    """Invoke ``user_rbd.rbd_status``.

    ``args`` is accepted for the common handler signature and is not read.
    """
    user_rbd.rbd_status()


# storage_rbd_check function · python · L64-L67 (4 LOC)config/ansible/admin/files/ycluster/ycluster/cli/storage.py
def storage_rbd_check(args):
    """Run ``user_rbd.rbd_check`` and exit 0 on a truthy result, else 1.

    ``args.K`` is forwarded as ``use_passphrase``.
    """
    can_decrypt = user_rbd.rbd_check(use_passphrase=args.K)
    if can_decrypt:
        sys.exit(0)
    sys.exit(1)


# Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
storage_rbd_bind function · python · L70-L81 (12 LOC)config/ansible/admin/files/ycluster/ycluster/cli/storage.py
def storage_rbd_bind(args):
    """Bind the RBD to Clevis and exit with a status derived from the result.

    ``user_rbd.rbd_bind(args.key_file)`` returning 0 exits 0, returning 1
    exits 1, and anything else exits 2; a message is printed in each case.
    """
    outcome = user_rbd.rbd_bind(args.key_file)

    if outcome == 0:
        message, status = "Clevis binding already correct", 0
    elif outcome == 1:
        # NOTE(review): a non-zero exit accompanies the "updated successfully"
        # message - presumably it signals "changed" to the caller; confirm.
        message, status = "Clevis binding updated successfully", 1
    else:
        message, status = "Clevis binding failed", 2

    print(message)
    sys.exit(status)


# register_tls_commands function · python · L8-L73 (66 LOC)config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def register_tls_commands(subparsers):
"""Register TLS management commands"""
tls_parser = subparsers.add_parser('tls', help='TLS certificate management')
tls_parser.set_defaults(func=lambda args: args.parser.print_help(), parser=tls_parser)
tls_subparsers = tls_parser.add_subparsers(dest='tls_command', help='TLS commands')
# Generate command
gen_parser = tls_subparsers.add_parser('generate', help='Generate and store a new self-signed certificate')
gen_parser.add_argument('--common-name', help='Common name for the certificate')
gen_parser.add_argument('--san', action='append', help='Subject Alternative Name (can be used multiple times)')
gen_parser.set_defaults(func=tls_generate)
# Set common name command
cn_parser = tls_subparsers.add_parser('set-common-name', help='Set common name in etcd')
cn_parser.add_argument('common_name', help='Common name to store in etcd')
cn_parser.set_defaults(func=tls_set_common_name)
#tls_generate function · python · L76-L79 (4 LOC)config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def tls_generate(args):
    """Generate and store a certificate via ``tls_config``.

    Uses ``args.san`` as the SAN list when it is non-empty, otherwise a
    built-in default list; ``args.common_name`` is passed through.
    """
    default_sans = ["10.0.0.254", "cluster.local", "localhost"]
    san_list = args.san if args.san else default_sans
    tls_config.generate_and_store_cert(args.common_name, san_list)


# tls_set_common_name function · python · L82-L84 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def tls_set_common_name(args):
    """Store ``args.common_name`` via ``tls_config.set_common_name_in_etcd``."""
    name = args.common_name
    tls_config.set_common_name_in_etcd(name)


# tls_get_common_name function · python · L87-L93 (7 LOC)config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def tls_get_common_name(args):
    """Print the common name returned by ``tls_config``, or a notice if none."""
    stored_name = tls_config.get_common_name_from_etcd()
    if not stored_name:
        print("No common name set in etcd")
        return
    print(f"Common name: {stored_name}")


# tls_set function · python · L96-L102 (7 LOC)config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def tls_set(args):
    """Read a certificate and key from files and hand them to ``tls_config``.

    ``args.cert_file`` and ``args.key_file`` are read as text, in that
    order, and passed to ``tls_config.set_tls_cert``.
    """
    pem_texts = []
    for path in (args.cert_file, args.key_file):
        with open(path, 'r') as handle:
            pem_texts.append(handle.read())

    cert_pem, key_pem = pem_texts
    tls_config.set_tls_cert(cert_pem, key_pem)


# tls_get function · python · L105-L108 (4 LOC)config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def tls_get(args):
    """Call ``tls_config.get_tls_cert`` and exit 1 when it returns a falsy value."""
    found = tls_config.get_tls_cert()
    if found:
        return
    sys.exit(1)


# tls_delete function · python · L111-L113 (3 LOC)config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def tls_delete(args):
    """Invoke ``tls_config.delete_tls_cert``.

    ``args`` is accepted for the common handler signature and is not read.
    """
    tls_config.delete_tls_cert()


# Repobility · code-quality intelligence · https://repobility.com
tls_fetch_certs function · python · L116-L119 (4 LOC)config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def tls_fetch_certs(args):
    """Run the ``fetch_tls_certs`` utility's ``main`` entry point.

    The utility module is imported at call time. ``args`` is not read.
    """
    from ..utils import fetch_tls_certs as fetcher
    fetcher.main()


# ca_generate function · python · L122-L129 (8 LOC)config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def ca_generate(args):
    """Generate the CA via ``ca_manager.generate_ca``.

    Any exception raised by the call is reported on stderr and turned into
    exit status 1. ``args`` is not read.
    """
    from ..utils import ca_manager as manager
    try:
        manager.generate_ca()
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)


# ca_info function · python · L132-L136 (5 LOC)config/ansible/admin/files/ycluster/ycluster/cli/tls.py
def ca_info(args):
    """Call ``ca_manager.get_ca_info`` and exit 1 when it returns a falsy value.

    ``args`` is not read.
    """
    from ..utils import ca_manager as manager
    info_available = manager.get_ca_info()
    if info_available:
        return
    sys.exit(1)