← back to kornia__bubbaloop

Function bodies 699 total

All specs Real LLM only Function bodies
test_list_filter_base_only function · rust · L2261-L2312 (52 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_list_filter_base_only() {
        let nodes = [
            NodeState {
                name: "rtsp-camera".to_string(),
                path: "/path".to_string(),
                status: "running".to_string(),
                installed: true,
                autostart_enabled: false,
                version: "1.0.0".to_string(),
                description: "Camera".to_string(),
                node_type: "rust".to_string(),
                is_built: true,
                base_node: String::new(),
            },
            NodeState {
                name: "rtsp-camera-terrace".to_string(),
                path: "/path".to_string(),
                status: "running".to_string(),
                installed: true,
                autostart_enabled: false,
                version: "1.0.0".to_string(),
                description: "Terrace cam".to_string(),
                node_type: "rust".to_string(),
                is_built: true,
                base_node: "rtsp-camera".t
test_instance_name_validation function · rust · L2315-L2328 (14 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_instance_name_validation() {
        // Names that the validator must accept.
        for accepted in ["rtsp-camera-terrace", "rtsp-camera_1", "cam01"] {
            assert!(is_valid_node_name(accepted));
        }

        // Names that the validator must reject: empty, leading dash,
        // whitespace, shell metacharacter, path traversal, leading dot.
        for rejected in [
            "",
            "-starts-with-dash",
            "has space",
            "has;semicolon",
            "../traversal",
            ".hidden",
        ] {
            assert!(!is_valid_node_name(rejected));
        }
    }
test_discover_nodes_ignores_files function · rust · L2331-L2340 (10 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_discover_nodes_ignores_files() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        // Populate the root with plain files only: a manifest-looking
        // "node.yaml" placed directly at the root, plus an unrelated file.
        for (file_name, contents) in [("node.yaml", "name: root"), ("README.md", "hello")] {
            std::fs::write(root.join(file_name), contents).unwrap();
        }

        // Nothing may be discovered: only directories are scanned, not root.
        let discovered = discover_nodes_in_subdirs(root);
        assert_eq!(discovered.len(), 0);
    }
test_instance_suffix_validation_valid function · rust · L2346-L2363 (18 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_instance_suffix_validation_valid() {
        // Each suffix must consist solely of alphanumerics, '-' or '_', and
        // must not begin with '-' or '_'.
        let allowed = |c: char| c.is_alphanumeric() || c == '-' || c == '_';

        for suffix in ["terrace", "entrance", "cam01", "garden_1", "my-camera"] {
            let charset_ok = suffix.chars().all(allowed);
            assert!(charset_ok, "Expected '{}' to be valid", suffix);

            let bad_start = suffix.starts_with('-') || suffix.starts_with('_');
            assert!(!bad_start, "Expected '{}' to not start with - or _", suffix);
        }
    }
test_instance_suffix_validation_invalid function · rust · L2366-L2387 (22 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_instance_suffix_validation_invalid() {
        // Every entry breaks at least one rule: non-empty, at most 64 bytes,
        // only alphanumerics / '-' / '_', and no leading '-' or '_'.
        let rejected = [
            "",              // empty
            "-terrace",      // starts with dash
            "_terrace",      // starts with underscore
            "terrace space", // contains space
            "terrace/path",  // contains slash
            "../traversal",  // path traversal
            "terrace;cmd",   // shell metacharacter
        ];

        let passes_rules = |s: &str| {
            if s.is_empty() || s.len() > 64 {
                return false;
            }
            if s.starts_with('-') || s.starts_with('_') {
                return false;
            }
            s.chars()
                .all(|c| c.is_alphanumeric() || c == '-' || c == '_')
        };

        for suffix in rejected {
            assert!(!passes_rules(suffix), "Expected '{}' to be invalid", suffix);
        }
    }
test_instance_name_construction function · rust · L2391-L2401 (11 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_instance_name_construction() {
        // An instance name is the base name and the suffix joined by a dash.
        let cases = [
            ("rtsp-camera", "terrace", "rtsp-camera-terrace"),
            ("weather-node", "station_1", "weather-node-station_1"),
        ];

        for (base, suffix, expected) in cases {
            let instance_name = format!("{}-{}", base, suffix);
            assert_eq!(instance_name, expected);
        }
    }
test_find_example_config_finds_yaml function · rust · L2405-L2414 (10 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_find_example_config_finds_yaml() {
        let tmp = tempfile::tempdir().unwrap();
        let configs_dir = tmp.path().join("configs");
        std::fs::create_dir(&configs_dir).unwrap();

        // A single .yaml file is the only entry in the directory.
        let yaml_file = configs_dir.join("example.yaml");
        std::fs::write(yaml_file, "name: test").unwrap();

        let found = find_example_config(&configs_dir);
        assert!(found.is_ok());
        let found = found.unwrap();
        assert!(found.to_string_lossy().contains("example.yaml"));
    }
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
test_find_example_config_finds_yml function · rust · L2417-L2425 (9 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_find_example_config_finds_yml() {
        let tmp = tempfile::tempdir().unwrap();
        let configs_dir = tmp.path().join("configs");
        std::fs::create_dir(&configs_dir).unwrap();

        // The only file present uses the short `.yml` extension.
        let yml_file = configs_dir.join("example.yml");
        std::fs::write(yml_file, "name: test").unwrap();

        assert!(find_example_config(&configs_dir).is_ok());
    }
test_find_example_config_empty_dir function · rust · L2428-L2436 (9 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_find_example_config_empty_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let configs_dir = tmp.path().join("configs");
        // The directory exists but is left empty on purpose.
        std::fs::create_dir(&configs_dir).unwrap();

        let outcome = find_example_config(&configs_dir);
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("No .yaml config"));
    }
test_find_example_config_ignores_non_yaml function · rust · L2439-L2448 (10 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_find_example_config_ignores_non_yaml() {
        let tmp = tempfile::tempdir().unwrap();
        let configs_dir = tmp.path().join("configs");
        std::fs::create_dir(&configs_dir).unwrap();

        // Neither file carries a .yaml/.yml extension.
        for (file_name, contents) in [("readme.txt", "not a config"), ("config.json", "{}")] {
            std::fs::write(configs_dir.join(file_name), contents).unwrap();
        }

        // With no YAML candidate present the lookup must fail.
        assert!(find_example_config(&configs_dir).is_err());
    }
test_detect_arch function · rust · L2453-L2461 (9 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_detect_arch() {
        let detected = crate::marketplace::detect_arch();
        assert!(detected.is_ok());
        let arch = detected.unwrap();

        // Only the two architectures below have a pinned expectation; on any
        // other target the test just requires detection to succeed.
        let expected = if cfg!(target_arch = "x86_64") {
            Some("amd64")
        } else if cfg!(target_arch = "aarch64") {
            Some("arm64")
        } else {
            None
        };
        if let Some(expected) = expected {
            assert_eq!(arch, expected);
        }
    }
test_precompiled_url_python_returns_none function · rust · L2485-L2499 (15 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_precompiled_url_python_returns_none() {
        // Registry entry for a Python node that declares no binary.
        let python_entry = registry::RegistryNode {
            name: "network-monitor".into(),
            version: "0.1.0".into(),
            node_type: "python".into(),
            description: "Network monitor".into(),
            category: "monitoring".into(),
            tags: Vec::new(),
            repo: "kornia/bubbaloop-nodes-official".into(),
            subdir: "network-monitor".into(),
            binary: None,
        };

        let url = registry::precompiled_url(&python_entry, "arm64");
        assert!(url.is_none());
    }
test_verify_sha256_valid function · rust · L2502-L2513 (12 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_verify_sha256_valid() {
        let tmp = tempfile::tempdir().unwrap();
        let file_path = tmp.path().join("test_binary");
        std::fs::write(&file_path, b"hello world\n").unwrap();

        // Derive the reference checksum line from the system `sha256sum` tool.
        let sha_output = Command::new("sha256sum").arg(&file_path).output().unwrap();
        let expected = String::from_utf8_lossy(&sha_output.stdout).into_owned();

        assert!(crate::marketplace::verify_sha256(&file_path, &expected).is_ok());
    }
test_verify_sha256_mismatch function · rust · L2516-L2529 (14 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_verify_sha256_mismatch() {
        let tmp = tempfile::tempdir().unwrap();
        let file_path = tmp.path().join("test_binary");
        std::fs::write(&file_path, b"hello world\n").unwrap();

        // An all-zero digest, laid out like a `sha256sum` output line.
        let wrong_checksum =
            "0000000000000000000000000000000000000000000000000000000000000000  test_binary";

        let outcome = crate::marketplace::verify_sha256(&file_path, wrong_checksum);
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Checksum mismatch"));
    }
test_set_executable function · rust · L2532-L2541 (10 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_set_executable() {
        use std::os::unix::fs::PermissionsExt;

        let tmp = tempfile::tempdir().unwrap();
        let script = tmp.path().join("test_bin");
        std::fs::write(&script, b"#!/bin/sh\necho hi").unwrap();

        crate::marketplace::set_executable(&script).unwrap();

        // Compare only the low nine permission bits of the resulting mode.
        let mode = std::fs::metadata(&script).unwrap().permissions().mode();
        assert_eq!(mode & 0o777, 0o755);
    }
Repobility — same analyzer, your code, free for public repos · /scan/
test_try_download_precompiled_no_binary_field function · rust · L2544-L2563 (20 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_try_download_precompiled_no_binary_field() {
        // A Rust node whose registry entry has no `binary` set.
        let binaryless_entry = registry::RegistryNode {
            name: "test".into(),
            version: "0.1.0".into(),
            node_type: "rust".into(),
            description: "test".into(),
            category: "test".into(),
            tags: Vec::new(),
            repo: "user/repo".into(),
            subdir: "test".into(),
            binary: None,
        };

        let outcome = try_download_precompiled(&binaryless_entry);
        assert!(outcome.is_err());

        // MarketplaceError::NoBinary: "No precompiled binary available for 'test'"
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("No precompiled binary"));
    }
test_try_download_precompiled_python_node function · rust · L2566-L2585 (20 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_try_download_precompiled_python_node() {
        // A Python node that nevertheless names a binary.
        let python_entry = registry::RegistryNode {
            name: "test".into(),
            version: "0.1.0".into(),
            node_type: "python".into(),
            description: "test".into(),
            category: "test".into(),
            tags: Vec::new(),
            repo: "user/repo".into(),
            subdir: "test".into(),
            binary: Some("test_bin".into()),
        };

        let outcome = try_download_precompiled(&python_entry);
        assert!(outcome.is_err());

        // MarketplaceError::NoBinary includes type info: "only rust nodes have precompiled binaries"
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("precompiled binary"));
    }
test_copy_canonical_header_proto_creates_file function · rust · L2588-L2610 (23 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_copy_canonical_header_proto_creates_file() {
        use tempfile::tempdir;

        let tmp = tempdir().unwrap();
        let node_path = tmp.path();
        let protos_dir = node_path.join("protos");
        std::fs::create_dir_all(&protos_dir).unwrap();

        // Call the function
        copy_canonical_header_proto(node_path);

        // Verify header.proto was created
        let header_path = protos_dir.join("header.proto");
        assert!(header_path.exists(), "header.proto should be created");

        // Verify content matches the embedded canonical version
        let written = std::fs::read(&header_path).unwrap();
        assert_eq!(
            written,
            crate::HEADER_PROTO,
            "Written header.proto should match canonical version"
        );
    }
test_copy_canonical_header_proto_no_protos_dir function · rust · L2613-L2628 (16 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_copy_canonical_header_proto_no_protos_dir() {
        use tempfile::tempdir;

        let tmp = tempdir().unwrap();
        let node_path = tmp.path();

        // Don't create protos/ dir
        copy_canonical_header_proto(node_path);

        // Should not panic, just silently skip
        let header_path = node_path.join("protos").join("header.proto");
        assert!(
            !header_path.exists(),
            "Should not create header.proto without protos/ dir"
        );
    }
test_copy_canonical_header_proto_overwrites_existing function · rust · L2631-L2654 (24 LOC)
crates/bubbaloop/src/cli/node.rs
    fn test_copy_canonical_header_proto_overwrites_existing() {
        use tempfile::tempdir;

        let tmp = tempdir().unwrap();
        let node_path = tmp.path();
        let protos_dir = node_path.join("protos");
        std::fs::create_dir_all(&protos_dir).unwrap();

        // Create an old/different header.proto
        let header_path = protos_dir.join("header.proto");
        std::fs::write(&header_path, b"old header content").unwrap();

        // Call the function
        copy_canonical_header_proto(node_path);

        // Verify it was overwritten with canonical version
        let written = std::fs::read(&header_path).unwrap();
        assert_eq!(
            written,
            crate::HEADER_PROTO,
            "Should overwrite old header.proto with canonical version"
        );
        assert_ne!(written, b"old header content");
    }
check_systemd_service function · rust · L85-L103 (19 LOC)
crates/bubbaloop/src/cli/status.rs
/// Runs `systemctl --user is-active <service_name>` and reports the result.
///
/// Returns `"running"` when systemctl prints `active`, the trimmed stdout
/// verbatim for any other output, and `"unknown"` when the command could not
/// be executed. Note that the literal `"active"` is therefore never returned:
/// callers have to compare against `"running"`.
async fn check_systemd_service(service_name: &str) -> String {
    let result = Command::new("systemctl")
        .args(["--user", "is-active", service_name])
        .output()
        .await;

    let out = match result {
        Ok(out) => out,
        Err(_) => return "unknown".to_string(),
    };

    let state = String::from_utf8_lossy(&out.stdout);
    match state.trim() {
        "active" => "running".to_string(),
        other => other.to_string(),
    }
}
query_nodes function · rust · L104-L130 (27 LOC)
crates/bubbaloop/src/cli/status.rs
/// Queries the daemon's node list over Zenoh and decodes the first usable reply.
///
/// Sends a `BestMatching` query to `API_NODES` and waits up to five seconds
/// for replies. The first reply whose result is `Ok` is parsed as JSON into a
/// [`NodeListResponse`]; replies carrying an error result are skipped.
///
/// # Errors
///
/// Fails when the query cannot be issued, when the payload of the first
/// usable reply is not valid JSON for `NodeListResponse`, or when no usable
/// reply arrives before the deadline (or the reply channel closes).
async fn query_nodes(session: &Session) -> Result<NodeListResponse> {
    let timeout_duration = Duration::from_secs(5);

    let replies = session
        .get(API_NODES)
        .target(QueryTarget::BestMatching)
        .timeout(timeout_duration)
        .await
        .map_err(|e| anyhow!("Zenoh query failed: {}", e))?;

    let start = std::time::Instant::now();

    loop {
        // Compute the remaining budget from a single clock reading.
        // `Duration - Duration` panics on underflow, so a plain subtraction
        // after a separate `elapsed() < timeout` check can panic if the
        // deadline passes between the two readings.
        let remaining = match timeout_duration.checked_sub(start.elapsed()) {
            Some(left) if !left.is_zero() => left,
            _ => break,
        };

        match tokio::time::timeout(remaining, replies.recv_async()).await {
            Ok(Ok(reply)) => {
                if let Ok(sample) = reply.result() {
                    let bytes = sample.payload().to_bytes();
                    let result: NodeListResponse = serde_json::from_slice(&bytes)?;
                    return Ok(result);
                }
                // Error replies are ignored; keep waiting for a usable one.
            }
            // Channel closed or the remaining budget ran out.
            Ok(Err(_)) | Err(_) => break,
        }
    }

    Err(anyhow!("No reply received from daemon (timeout after 5s)"))
}
print_table function · rust · L131-L178 (48 LOC)
crates/bubbaloop/src/cli/status.rs
fn print_table(status: &StatusOutput) {
    println!("bubbaloop status");
    println!("================");

    // Zenoh Router
    let zenoh_symbol = if status.zenoh.running { "✓" } else { "✗" };
    let zenoh_state = if status.zenoh.running {
        format!("running (port {})", status.zenoh.port)
    } else {
        "stopped".to_string()
    };
    println!("Zenoh Router:  {} {}", zenoh_symbol, zenoh_state);

    // Daemon
    let daemon_symbol = if status.daemon.running { "✓" } else { "✗" };
    let daemon_state = if status.daemon.running {
        if let Some(count) = status.daemon.nodes {
            format!("running ({} nodes)", count)
        } else {
            "running".to_string()
        }
    } else {
        "stopped".to_string()
    };
    println!("Daemon:        {} {}", daemon_symbol, daemon_state);

    // Bridge
    let bridge_symbol = if status.bridge.running { "✓" } else { "✗" };
    let bridge_state = if status.bridge.running {
        "running".to_string()
  
Repobility · MCP-ready · https://repobility.com
print_json function · rust · L179-L184 (6 LOC)
crates/bubbaloop/src/cli/status.rs
/// Prints `status` to stdout as pretty-printed JSON.
///
/// # Errors
///
/// Propagates the serialization error if `status` cannot be rendered.
fn print_json(status: &StatusOutput) -> Result<()> {
    println!("{}", serde_json::to_string_pretty(status)?);
    Ok(())
}
run function · rust · L185-L198 (14 LOC)
crates/bubbaloop/src/cli/status.rs
/// Collects the current status and prints it in the requested `format`.
///
/// `"json"` selects JSON output; `"table"` and every other value print the
/// table.
///
/// # Errors
///
/// Propagates failures from status collection and from JSON rendering.
pub async fn run(format: &str) -> Result<()> {
    let status = collect_status().await?;

    // "json" is the only format with its own renderer; everything else,
    // including unrecognized values, is shown as a table.
    if format == "json" {
        print_json(&status)?;
    } else {
        print_table(&status);
    }

    Ok(())
}
collect_status function · rust · L199-L257 (59 LOC)
crates/bubbaloop/src/cli/status.rs
async fn collect_status() -> Result<StatusOutput> {
    // Check Zenoh router
    let zenoh_running = is_process_running("zenohd").await;
    let zenoh = ZenohStatus {
        running: zenoh_running,
        port: 7447,
    };

    // Check daemon
    let daemon_service = check_systemd_service("bubbaloop-daemon.service").await;
    let daemon_running = daemon_service == "active";

    // Try to get node count from daemon if it's running
    let mut node_count = None;
    let mut node_summary = NodeSummary {
        running: 0,
        stopped: 0,
        not_installed: 0,
    };

    if daemon_running && zenoh_running {
        if let Ok(session) = get_zenoh_session().await {
            if let Ok(response) = query_nodes(&session).await {
                node_count = Some(response.nodes.len());

                // Count node states
                for node in response.nodes {
                    if !node.installed {
                        node_summary.not_installed += 1;
            
is_process_running function · rust · L258-L263 (6 LOC)
crates/bubbaloop/src/cli/status.rs
/// Reports whether `pgrep -x <name>` exits successfully.
///
/// Returns `false` both when `pgrep` exits with a failure status and when it
/// could not be executed at all.
async fn is_process_running(name: &str) -> bool {
    match Command::new("pgrep").args(["-x", name]).output().await {
        Ok(out) => out.status.success(),
        Err(_) => false,
    }
}
get_zenoh_session function · rust · L264-L286 (23 LOC)
crates/bubbaloop/src/cli/status.rs
/// Opens a Zenoh session in client mode.
///
/// The endpoint is read from `BUBBALOOP_ZENOH_ENDPOINT`, falling back to
/// `tcp/127.0.0.1:7447` when the variable cannot be read. Multicast and
/// gossip scouting are switched off on a best-effort basis.
///
/// # Errors
///
/// Fails when the mode or endpoint cannot be applied to the configuration,
/// or when the session cannot be opened.
async fn get_zenoh_session() -> Result<Session> {
    let endpoint = match std::env::var("BUBBALOOP_ZENOH_ENDPOINT") {
        Ok(value) => value,
        Err(_) => "tcp/127.0.0.1:7447".to_string(),
    };
    let endpoints_json = format!("[\"{}\"]", endpoint);

    let mut config = zenoh::Config::default();

    config
        .insert_json5("mode", "\"client\"")
        .map_err(|e| anyhow!("Failed to configure client mode: {}", e))?;
    config
        .insert_json5("connect/endpoints", &endpoints_json)
        .map_err(|e| anyhow!("Failed to configure endpoint: {}", e))?;

    // Disable scouting; a failure to apply either setting is ignored.
    for key in ["scouting/multicast/enabled", "scouting/gossip/enabled"] {
        let _ = config.insert_json5(key, "false");
    }

    let opened = zenoh::open(config).await;
    opened.map_err(|e| anyhow!("Failed to open Zenoh session: {}", e))
}
test_status_output_serialization function · rust · L293-L314 (22 LOC)
crates/bubbaloop/src/cli/status.rs
    fn test_status_output_serialization() {
        // Assemble the report piece by piece, then serialize it as a whole.
        let zenoh = ZenohStatus {
            running: true,
            port: 7447,
        };
        let daemon = DaemonStatus {
            running: true,
            nodes: Some(3),
        };
        let bridge = BridgeStatus { running: false };
        let nodes = NodeSummary {
            running: 1,
            stopped: 2,
            not_installed: 0,
        };
        let status = StatusOutput {
            zenoh,
            daemon,
            bridge,
            nodes,
        };

        let json = serde_json::to_string(&status).unwrap();
        for fragment in ["\"running\":true", "\"port\":7447"] {
            assert!(json.contains(fragment));
        }
    }
test_zenoh_status_serialization function · rust · L317-L325 (9 LOC)
crates/bubbaloop/src/cli/status.rs
    fn test_zenoh_status_serialization() {
        let zenoh = ZenohStatus {
            running: true,
            port: 7447,
        };

        // Both field values must appear somewhere in the serialized text.
        let json = serde_json::to_string(&zenoh).unwrap();
        for needle in ["true", "7447"] {
            assert!(json.contains(needle));
        }
    }
test_daemon_status_serialization function · rust · L328-L336 (9 LOC)
crates/bubbaloop/src/cli/status.rs
    fn test_daemon_status_serialization() {
        let daemon = DaemonStatus {
            running: true,
            nodes: Some(5),
        };

        // Both field values must appear somewhere in the serialized text.
        let json = serde_json::to_string(&daemon).unwrap();
        for needle in ["true", "5"] {
            assert!(json.contains(needle));
        }
    }
Same scanner, your repo: https://repobility.com — Repobility
test_node_summary_serialization function · rust · L346-L356 (11 LOC)
crates/bubbaloop/src/cli/status.rs
    fn test_node_summary_serialization() {
        // Distinct counts make each key/value pair unambiguous in the output.
        let summary = NodeSummary {
            running: 1,
            stopped: 2,
            not_installed: 3,
        };

        let json = serde_json::to_string(&summary).unwrap();
        for pair in ["\"running\":1", "\"stopped\":2", "\"not_installed\":3"] {
            assert!(json.contains(pair));
        }
    }
test_node_state_deserialization function · rust · L359-L379 (21 LOC)
crates/bubbaloop/src/cli/status.rs
    fn test_node_state_deserialization() {
        // A single node object, parsed straight from its JSON text.
        let json = r#"{
            "name": "rtsp-camera",
            "path": "/path/to/node",
            "status": "running",
            "installed": true,
            "autostart_enabled": true,
            "version": "0.1.0",
            "description": "RTSP Camera Node",
            "node_type": "rust",
            "is_built": true,
            "build_output": ["Building...", "Done"]
        }"#;

        let parsed = serde_json::from_str::<NodeState>(json).unwrap();

        // Boolean flags.
        assert!(parsed.installed);
        assert!(parsed.is_built);
        // String fields.
        assert_eq!(parsed.name, "rtsp-camera");
        assert_eq!(parsed.status, "running");
        assert_eq!(parsed.node_type, "rust");
    }
test_node_list_response_deserialization function · rust · L382-L405 (24 LOC)
crates/bubbaloop/src/cli/status.rs
    fn test_node_list_response_deserialization() {
        // A response wrapping exactly one node plus a timestamp.
        let json = r#"{
            "nodes": [
                {
                    "name": "node1",
                    "path": "/path1",
                    "status": "running",
                    "installed": true,
                    "autostart_enabled": false,
                    "version": "1.0.0",
                    "description": "First node",
                    "node_type": "rust",
                    "is_built": true,
                    "build_output": []
                }
            ],
            "timestamp_ms": 1234567890
        }"#;

        let parsed = serde_json::from_str::<NodeListResponse>(json).unwrap();

        assert_eq!(parsed.timestamp_ms, 1234567890);
        assert_eq!(parsed.nodes.len(), 1);
        assert_eq!(parsed.nodes[0].name, "node1");
    }
create_session function · rust · L34-L103 (70 LOC)
crates/bubbaloop/src/daemon/mod.rs
pub async fn create_session(endpoint: Option<&str>) -> Result<Arc<Session>, zenoh::Error> {
    // Configure Zenoh session
    let mut config = zenoh::Config::default();

    // Peer mode — allows direct connections from co-located nodes
    config
        .insert_json5("mode", "\"peer\"")
        .expect("Failed to set Zenoh mode");

    // Resolve endpoint: parameter > env var > default
    let env_endpoint = std::env::var("BUBBALOOP_ZENOH_ENDPOINT").ok();
    let ep = endpoint
        .or(env_endpoint.as_deref())
        .unwrap_or("tcp/127.0.0.1:7447");

    log::info!("Connecting to Zenoh router at: {}", ep);

    config
        .insert_json5("connect/endpoints", &format!("[\"{}\"]", ep))
        .expect("Failed to set Zenoh endpoint");

    // Disable all scouting to prevent connecting to remote peers via Tailscale/VPN
    config
        .insert_json5("scouting/multicast/enabled", "false")
        .expect("Failed to disable multicast scouting");
    config
        .insert_json5("
default function · rust · L69-L74 (6 LOC)
crates/bubbaloop/src/daemon/node_manager.rs
    /// Builds the initial value: `BuildStatus::Idle` with an empty `output`.
    fn default() -> Self {
        let status = BuildStatus::Idle;
        let output = Vec::new();
        Self { status, output }
    }
get_machine_ips function · rust · L78-L89 (12 LOC)
crates/bubbaloop/src/daemon/node_manager.rs
/// Lists this machine's IP addresses as printed by `hostname -I`.
///
/// The command's stdout is split on whitespace, one entry per token. An
/// empty vector is returned when the command cannot be run or exits with a
/// failure status.
fn get_machine_ips() -> Vec<String> {
    // `hostname -I` returns all IPs (works on Linux).
    match std::process::Command::new("hostname").arg("-I").output() {
        Ok(output) if output.status.success() => {
            let stdout = String::from_utf8_lossy(&output.stdout);
            stdout.split_whitespace().map(str::to_owned).collect()
        }
        _ => Vec::new(),
    }
}
effective_name function · rust · L114-L122 (9 LOC)
crates/bubbaloop/src/daemon/node_manager.rs
    pub fn effective_name(&self) -> String {
        if let Some(ref ov) = self.name_override {
            return ov.clone();
        }
        self.manifest
            .as_ref()
            .map(|m| m.name.clone())
            .unwrap_or_else(|| "unknown".to_string())
    }
to_proto function · rust · L125-L161 (37 LOC)
crates/bubbaloop/src/daemon/node_manager.rs
    pub fn to_proto(
        &self,
        machine_id: &str,
        machine_hostname: &str,
        machine_ips: &[String],
    ) -> NodeState {
        let manifest = self.manifest.as_ref();
        let base_name = manifest.map(|m| m.name.clone()).unwrap_or_default();
        NodeState {
            name: self.effective_name(),
            path: self.path.clone(),
            status: self.status as i32,
            installed: self.installed,
            autostart_enabled: self.autostart_enabled,
            version: manifest
                .map(|m| m.version.clone())
                .unwrap_or_else(|| "0.0.0".to_string()),
            description: manifest.map(|m| m.description.clone()).unwrap_or_default(),
            node_type: manifest
                .map(|m| m.node_type.clone())
                .unwrap_or_else(|| "unknown".to_string()),
            is_built: self.is_built,
            last_updated_ms: self.last_updated_ms,
            build_output: self.build_state.output.clon
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
new function · rust · L184-L218 (35 LOC)
crates/bubbaloop/src/daemon/node_manager.rs
    /// Creates the manager and performs the initial node refresh.
    ///
    /// Records the machine id, hostname (`"unknown"` if it cannot be read)
    /// and IP list, then calls `refresh_all` once before returning.
    ///
    /// # Errors
    ///
    /// Fails when the systemd client cannot be created or when the initial
    /// refresh fails.
    pub async fn new() -> Result<Arc<Self>> {
        let systemd = SystemdClient::new().await?;
        // Only the sender is kept; the paired receiver is dropped right here.
        let (event_tx, _) = broadcast::channel(100);

        let machine_id = super::util::get_machine_id();
        let machine_hostname = match hostname::get() {
            Ok(name) => name.to_string_lossy().to_string(),
            Err(_) => "unknown".to_string(),
        };
        let machine_ips = get_machine_ips();

        log::info!(
            "NodeManager using machine_id: {}, hostname: {}, ips: {:?}",
            machine_id,
            machine_hostname,
            machine_ips
        );

        let nodes = RwLock::new(HashMap::new());
        let building_nodes = Mutex::new(HashSet::new());
        let manager = Arc::new(Self {
            nodes,
            systemd,
            event_tx,
            building_nodes,
            machine_id,
            machine_hostname,
            machine_ips,
        });

        // Populate the node map once before handing the manager out.
        manager.refresh_all().await?;

        Ok(manager)
    }
start_signal_listener function · rust · L234-L292 (59 LOC)
crates/bubbaloop/src/daemon/node_manager.rs
    pub async fn start_signal_listener(self: Arc<Self>) -> Result<()> {
        let mut signal_rx = self.systemd.subscribe_to_signals().await?;

        tokio::spawn(async move {
            log::info!("Signal listener started");
            // Debounce: collect signals for up to 500ms, then refresh once
            let debounce = Duration::from_millis(500);

            loop {
                // Wait for at least one signal
                let first = match signal_rx.recv().await {
                    Some(ev) => ev,
                    None => {
                        log::warn!("Signal listener ended (channel closed)");
                        break;
                    }
                };

                // Collect node names and event types from this burst
                let mut pending: Vec<(String, String)> = Vec::new();
                Self::push_signal_event(&first, &mut pending);

                // Drain any additional signals within the debounce window
                l
refresh_node function · rust · L318-L367 (50 LOC)
crates/bubbaloop/src/daemon/node_manager.rs
    pub async fn refresh_node(&self, name: &str) -> Result<()> {
        let service_name = systemd::get_service_name(name);

        // Get systemd state
        let active_state = self.systemd.get_active_state(&service_name).await?;
        let installed = systemd::is_service_installed(name);
        let autostart_enabled = if installed {
            self.systemd
                .is_enabled(&service_name)
                .await
                .unwrap_or(false)
        } else {
            false
        };

        let status = if !installed {
            NodeStatus::NotInstalled
        } else {
            match active_state {
                ActiveState::Active => NodeStatus::Running,
                ActiveState::Failed => NodeStatus::Failed,
                ActiveState::Inactive => NodeStatus::Stopped,
                ActiveState::Activating => NodeStatus::Running,
                ActiveState::Deactivating => NodeStatus::Stopped,
                _ => NodeStatus::Stopped,
        
now_ms function · rust · L490-L495 (6 LOC)
crates/bubbaloop/src/daemon/node_manager.rs
    /// Returns the current wall-clock time as milliseconds since the Unix
    /// epoch, or `0` if the system clock reads earlier than the epoch.
    fn now_ms() -> i64 {
        match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_millis() as i64,
            Err(_) => 0,
        }
    }
refresh_all function · rust · L498-L597 (100 LOC)
crates/bubbaloop/src/daemon/node_manager.rs
    pub async fn refresh_all(&self) -> Result<()> {
        let registered = registry::list_nodes()?;
        let mut nodes = self.nodes.write().await;

        // Track which keys we've seen (keyed by effective_name)
        let mut seen = std::collections::HashSet::new();

        for (entry, manifest) in registered {
            // Compute effective name: name_override if present, otherwise manifest name
            let eff_name = manifest
                .as_ref()
                .map(|m| registry::effective_name(&entry, m))
                .unwrap_or_else(|| {
                    entry
                        .name_override
                        .clone()
                        .unwrap_or_else(|| "unknown".to_string())
                });

            // Use effective name as the HashMap key so each instance is distinct
            let key = eff_name.clone();
            seen.insert(key.clone());

            let service_name = systemd::get_service_name(&eff_name);

            
get_node_list function · rust · L600-L611 (12 LOC)
crates/bubbaloop/src/daemon/node_manager.rs
    /// Snapshots every known node into a `NodeList`, stamped with the current
    /// time and this machine's id.
    pub async fn get_node_list(&self) -> NodeList {
        let guard = self.nodes.read().await;
        log::debug!("get_node_list: nodes HashMap has {} entries", guard.len());

        let (id, host, ips) = (&self.machine_id, &self.machine_hostname, &self.machine_ips);
        let protos = guard
            .values()
            .map(|node| node.to_proto(id, host, ips))
            .collect();

        NodeList {
            nodes: protos,
            timestamp_ms: Self::now_ms(),
            machine_id: self.machine_id.clone(),
        }
    }
get_cached_manifests function · rust · L617-L623 (7 LOC)
crates/bubbaloop/src/daemon/node_manager.rs
    /// Returns `(effective name, manifest)` pairs for every node that has a
    /// manifest; nodes without one are left out.
    pub async fn get_cached_manifests(&self) -> Vec<(String, NodeManifest)> {
        let guard = self.nodes.read().await;

        let mut cached = Vec::new();
        for node in guard.values() {
            if let Some(manifest) = node.manifest.as_ref() {
                cached.push((node.effective_name(), manifest.clone()));
            }
        }
        cached
    }
get_node function · rust · L626-L632 (7 LOC)
crates/bubbaloop/src/daemon/node_manager.rs
    /// Looks up the node whose effective name equals `name` and returns its
    /// state, or `None` when no such node is known.
    pub async fn get_node(&self, name: &str) -> Option<NodeState> {
        let guard = self.nodes.read().await;

        for node in guard.values() {
            if node.effective_name() == name {
                let state =
                    node.to_proto(&self.machine_id, &self.machine_hostname, &self.machine_ips);
                return Some(state);
            }
        }
        None
    }
Repobility — same analyzer, your code, free for public repos · /scan/
execute_command function · rust · L635-L719 (85 LOC)
crates/bubbaloop/src/daemon/node_manager.rs
    pub async fn execute_command(self: &Arc<Self>, cmd: NodeCommand) -> CommandResult {
        let command_type = CommandType::try_from(cmd.command).unwrap_or(CommandType::Refresh);
        log::debug!(
            "execute_command: type={:?} node={}",
            command_type,
            cmd.node_name
        );

        // Special handling for GET_LOGS since it returns data in output field
        if command_type == CommandType::GetLogs {
            let node_state = self.get_node(&cmd.node_name).await;
            return match self.get_logs(&cmd.node_name).await {
                Ok(logs) => CommandResult {
                    request_id: cmd.request_id,
                    success: true,
                    message: "Logs retrieved".to_string(),
                    output: logs,
                    node_state,
                    timestamp_ms: Self::now_ms(),
                    responding_machine: self.machine_id.clone(),
                },
                Err(e) => CommandResul
find_node_path function · rust · L722-L729 (8 LOC)
crates/bubbaloop/src/daemon/node_manager.rs
    /// Returns the path of the node whose effective name equals `name`.
    ///
    /// # Errors
    ///
    /// Returns `NodeManagerError::NodeNotFound` when no node matches.
    async fn find_node_path(&self, name: &str) -> Result<String> {
        let guard = self.nodes.read().await;

        match guard.values().find(|node| node.effective_name() == name) {
            Some(node) => Ok(node.path.clone()),
            None => Err(NodeManagerError::NodeNotFound(name.to_string())),
        }
    }
get_logs function · rust · L732-L754 (23 LOC)
crates/bubbaloop/src/daemon/node_manager.rs
    /// Fetches the last 50 journal lines for the node's service.
    ///
    /// Returns the lines joined with `\n`, or `"No logs available"` when
    /// journalctl printed nothing.
    ///
    /// # Errors
    ///
    /// Fails when the node is unknown or when journalctl cannot be executed.
    async fn get_logs(&self, name: &str) -> Result<String> {
        // Existence check only: the lookup error is what matters, not the path.
        self.find_node_path(name).await?;

        let service_name = systemd::get_service_name(name);

        // Use _SYSTEMD_USER_UNIT filter for user services (logs are in system journal)
        // This works on systems where --user journal doesn't exist
        let unit_filter = format!("_SYSTEMD_USER_UNIT={}", service_name);
        let journal_output = Command::new(JOURNALCTL_PATH)
            .arg(&unit_filter)
            .args(["-n", "50", "--no-pager", "-o", "cat"])
            .output()
            .await?;

        let stdout = String::from_utf8_lossy(&journal_output.stdout);
        let mut lines = stdout.lines().peekable();
        if lines.peek().is_none() {
            return Ok("No logs available".to_string());
        }

        Ok(lines.collect::<Vec<&str>>().join("\n"))
    }
‹ prevpage 4 / 14next ›