Function bodies 699 total
test_list_filter_base_only function · rust · L2261-L2312 (52 LOC)crates/bubbaloop/src/cli/node.rs
fn test_list_filter_base_only() {
let nodes = [
NodeState {
name: "rtsp-camera".to_string(),
path: "/path".to_string(),
status: "running".to_string(),
installed: true,
autostart_enabled: false,
version: "1.0.0".to_string(),
description: "Camera".to_string(),
node_type: "rust".to_string(),
is_built: true,
base_node: String::new(),
},
NodeState {
name: "rtsp-camera-terrace".to_string(),
path: "/path".to_string(),
status: "running".to_string(),
installed: true,
autostart_enabled: false,
version: "1.0.0".to_string(),
description: "Terrace cam".to_string(),
node_type: "rust".to_string(),
is_built: true,
base_node: "rtsp-camera".ttest_instance_name_validation function · rust · L2315-L2328 (14 LOC)crates/bubbaloop/src/cli/node.rs
fn test_instance_name_validation() {
// Valid instance names
assert!(is_valid_node_name("rtsp-camera-terrace"));
assert!(is_valid_node_name("rtsp-camera_1"));
assert!(is_valid_node_name("cam01"));
// Invalid: empty, starts with dash, special chars
assert!(!is_valid_node_name(""));
assert!(!is_valid_node_name("-starts-with-dash"));
assert!(!is_valid_node_name("has space"));
assert!(!is_valid_node_name("has;semicolon"));
assert!(!is_valid_node_name("../traversal"));
assert!(!is_valid_node_name(".hidden"));
}test_discover_nodes_ignores_files function · rust · L2331-L2340 (10 LOC)crates/bubbaloop/src/cli/node.rs
fn test_discover_nodes_ignores_files() {
let dir = tempfile::tempdir().unwrap();
// Create a file named "node.yaml" at root (not a subdir)
std::fs::write(dir.path().join("node.yaml"), "name: root").unwrap();
// Create a regular file (not a directory)
std::fs::write(dir.path().join("README.md"), "hello").unwrap();
let nodes = discover_nodes_in_subdirs(dir.path());
assert_eq!(nodes.len(), 0); // Only scans directories, not root
}test_instance_suffix_validation_valid function · rust · L2346-L2363 (18 LOC)crates/bubbaloop/src/cli/node.rs
fn test_instance_suffix_validation_valid() {
// Valid suffixes that should work
let valid_suffixes = ["terrace", "entrance", "cam01", "garden_1", "my-camera"];
for suffix in valid_suffixes {
assert!(
suffix
.chars()
.all(|c| c.is_alphanumeric() || c == '-' || c == '_'),
"Expected '{}' to be valid",
suffix
);
assert!(
!suffix.starts_with('-') && !suffix.starts_with('_'),
"Expected '{}' to not start with - or _",
suffix
);
}
}test_instance_suffix_validation_invalid function · rust · L2366-L2387 (22 LOC)crates/bubbaloop/src/cli/node.rs
fn test_instance_suffix_validation_invalid() {
// Invalid suffixes that should be rejected
let invalid_suffixes = [
"", // empty
"-terrace", // starts with dash
"_terrace", // starts with underscore
"terrace space", // contains space
"terrace/path", // contains slash
"../traversal", // path traversal
"terrace;cmd", // shell metacharacter
];
for suffix in invalid_suffixes {
let is_valid = !suffix.is_empty()
&& suffix.len() <= 64
&& suffix
.chars()
.all(|c| c.is_alphanumeric() || c == '-' || c == '_')
&& !suffix.starts_with('-')
&& !suffix.starts_with('_');
assert!(!is_valid, "Expected '{}' to be invalid", suffix);
}
}test_instance_name_construction function · rust · L2391-L2401 (11 LOC)crates/bubbaloop/src/cli/node.rs
fn test_instance_name_construction() {
let base = "rtsp-camera";
let suffix = "terrace";
let instance_name = format!("{}-{}", base, suffix);
assert_eq!(instance_name, "rtsp-camera-terrace");
let base2 = "weather-node";
let suffix2 = "station_1";
let instance_name2 = format!("{}-{}", base2, suffix2);
assert_eq!(instance_name2, "weather-node-station_1");
}test_find_example_config_finds_yaml function · rust · L2405-L2414 (10 LOC)crates/bubbaloop/src/cli/node.rs
fn test_find_example_config_finds_yaml() {
let dir = tempfile::tempdir().unwrap();
let configs_dir = dir.path().join("configs");
std::fs::create_dir(&configs_dir).unwrap();
std::fs::write(configs_dir.join("example.yaml"), "name: test").unwrap();
let result = find_example_config(&configs_dir);
assert!(result.is_ok());
assert!(result.unwrap().to_string_lossy().contains("example.yaml"));
}Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
test_find_example_config_finds_yml function · rust · L2417-L2425 (9 LOC)crates/bubbaloop/src/cli/node.rs
fn test_find_example_config_finds_yml() {
let dir = tempfile::tempdir().unwrap();
let configs_dir = dir.path().join("configs");
std::fs::create_dir(&configs_dir).unwrap();
std::fs::write(configs_dir.join("example.yml"), "name: test").unwrap();
let result = find_example_config(&configs_dir);
assert!(result.is_ok());
}test_find_example_config_empty_dir function · rust · L2428-L2436 (9 LOC)crates/bubbaloop/src/cli/node.rs
fn test_find_example_config_empty_dir() {
let dir = tempfile::tempdir().unwrap();
let configs_dir = dir.path().join("configs");
std::fs::create_dir(&configs_dir).unwrap();
let result = find_example_config(&configs_dir);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("No .yaml config"));
}test_find_example_config_ignores_non_yaml function · rust · L2439-L2448 (10 LOC)crates/bubbaloop/src/cli/node.rs
fn test_find_example_config_ignores_non_yaml() {
let dir = tempfile::tempdir().unwrap();
let configs_dir = dir.path().join("configs");
std::fs::create_dir(&configs_dir).unwrap();
std::fs::write(configs_dir.join("readme.txt"), "not a config").unwrap();
std::fs::write(configs_dir.join("config.json"), "{}").unwrap();
let result = find_example_config(&configs_dir);
assert!(result.is_err()); // Only .yaml/.yml files
}test_detect_arch function · rust · L2453-L2461 (9 LOC)crates/bubbaloop/src/cli/node.rs
fn test_detect_arch() {
let arch = crate::marketplace::detect_arch();
assert!(arch.is_ok());
let arch = arch.unwrap();
#[cfg(target_arch = "x86_64")]
assert_eq!(arch, "amd64");
#[cfg(target_arch = "aarch64")]
assert_eq!(arch, "arm64");
}test_precompiled_url_python_returns_none function · rust · L2485-L2499 (15 LOC)crates/bubbaloop/src/cli/node.rs
fn test_precompiled_url_python_returns_none() {
let entry = registry::RegistryNode {
name: "network-monitor".into(),
version: "0.1.0".into(),
node_type: "python".into(),
description: "Network monitor".into(),
category: "monitoring".into(),
tags: vec![],
repo: "kornia/bubbaloop-nodes-official".into(),
subdir: "network-monitor".into(),
binary: None,
};
assert!(registry::precompiled_url(&entry, "arm64").is_none());
}test_verify_sha256_valid function · rust · L2502-L2513 (12 LOC)crates/bubbaloop/src/cli/node.rs
fn test_verify_sha256_valid() {
let dir = tempfile::tempdir().unwrap();
let file_path = dir.path().join("test_binary");
std::fs::write(&file_path, b"hello world\n").unwrap();
// Compute expected checksum
let output = Command::new("sha256sum").arg(&file_path).output().unwrap();
let expected = String::from_utf8_lossy(&output.stdout).to_string();
let result = crate::marketplace::verify_sha256(&file_path, &expected);
assert!(result.is_ok());
}test_verify_sha256_mismatch function · rust · L2516-L2529 (14 LOC)crates/bubbaloop/src/cli/node.rs
fn test_verify_sha256_mismatch() {
let dir = tempfile::tempdir().unwrap();
let file_path = dir.path().join("test_binary");
std::fs::write(&file_path, b"hello world\n").unwrap();
let wrong_checksum =
"0000000000000000000000000000000000000000000000000000000000000000 test_binary";
let result = crate::marketplace::verify_sha256(&file_path, wrong_checksum);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Checksum mismatch"));
}test_set_executable function · rust · L2532-L2541 (10 LOC)crates/bubbaloop/src/cli/node.rs
fn test_set_executable() {
use std::os::unix::fs::PermissionsExt;
let dir = tempfile::tempdir().unwrap();
let file_path = dir.path().join("test_bin");
std::fs::write(&file_path, b"#!/bin/sh\necho hi").unwrap();
crate::marketplace::set_executable(&file_path).unwrap();
let perms = std::fs::metadata(&file_path).unwrap().permissions();
assert_eq!(perms.mode() & 0o777, 0o755);
}Repobility — same analyzer, your code, free for public repos · /scan/
test_try_download_precompiled_no_binary_field function · rust · L2544-L2563 (20 LOC)crates/bubbaloop/src/cli/node.rs
fn test_try_download_precompiled_no_binary_field() {
let entry = registry::RegistryNode {
name: "test".into(),
version: "0.1.0".into(),
node_type: "rust".into(),
description: "test".into(),
category: "test".into(),
tags: vec![],
repo: "user/repo".into(),
subdir: "test".into(),
binary: None,
};
let result = try_download_precompiled(&entry);
assert!(result.is_err());
// MarketplaceError::NoBinary: "No precompiled binary available for 'test'"
assert!(result
.unwrap_err()
.to_string()
.contains("No precompiled binary"));
}test_try_download_precompiled_python_node function · rust · L2566-L2585 (20 LOC)crates/bubbaloop/src/cli/node.rs
fn test_try_download_precompiled_python_node() {
let entry = registry::RegistryNode {
name: "test".into(),
version: "0.1.0".into(),
node_type: "python".into(),
description: "test".into(),
category: "test".into(),
tags: vec![],
repo: "user/repo".into(),
subdir: "test".into(),
binary: Some("test_bin".into()),
};
let result = try_download_precompiled(&entry);
assert!(result.is_err());
// MarketplaceError::NoBinary includes type info: "only rust nodes have precompiled binaries"
assert!(result
.unwrap_err()
.to_string()
.contains("precompiled binary"));
}test_copy_canonical_header_proto_creates_file function · rust · L2588-L2610 (23 LOC)crates/bubbaloop/src/cli/node.rs
fn test_copy_canonical_header_proto_creates_file() {
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let node_path = tmp.path();
let protos_dir = node_path.join("protos");
std::fs::create_dir_all(&protos_dir).unwrap();
// Call the function
copy_canonical_header_proto(node_path);
// Verify header.proto was created
let header_path = protos_dir.join("header.proto");
assert!(header_path.exists(), "header.proto should be created");
// Verify content matches the embedded canonical version
let written = std::fs::read(&header_path).unwrap();
assert_eq!(
written,
crate::HEADER_PROTO,
"Written header.proto should match canonical version"
);
}test_copy_canonical_header_proto_no_protos_dir function · rust · L2613-L2628 (16 LOC)crates/bubbaloop/src/cli/node.rs
fn test_copy_canonical_header_proto_no_protos_dir() {
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let node_path = tmp.path();
// Don't create protos/ dir
copy_canonical_header_proto(node_path);
// Should not panic, just silently skip
let header_path = node_path.join("protos").join("header.proto");
assert!(
!header_path.exists(),
"Should not create header.proto without protos/ dir"
);
}test_copy_canonical_header_proto_overwrites_existing function · rust · L2631-L2654 (24 LOC)crates/bubbaloop/src/cli/node.rs
fn test_copy_canonical_header_proto_overwrites_existing() {
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let node_path = tmp.path();
let protos_dir = node_path.join("protos");
std::fs::create_dir_all(&protos_dir).unwrap();
// Create an old/different header.proto
let header_path = protos_dir.join("header.proto");
std::fs::write(&header_path, b"old header content").unwrap();
// Call the function
copy_canonical_header_proto(node_path);
// Verify it was overwritten with canonical version
let written = std::fs::read(&header_path).unwrap();
assert_eq!(
written,
crate::HEADER_PROTO,
"Should overwrite old header.proto with canonical version"
);
assert_ne!(written, b"old header content");
}check_systemd_service function · rust · L85-L103 (19 LOC)crates/bubbaloop/src/cli/status.rs
async fn check_systemd_service(service_name: &str) -> String {
let output = Command::new("systemctl")
.args(["--user", "is-active", service_name])
.output()
.await;
match output {
Ok(out) => {
let status = String::from_utf8_lossy(&out.stdout).trim().to_string();
if status == "active" {
"running".to_string()
} else {
status
}
}
Err(_) => "unknown".to_string(),
}
}query_nodes function · rust · L104-L130 (27 LOC)crates/bubbaloop/src/cli/status.rs
async fn query_nodes(session: &Session) -> Result<NodeListResponse> {
let replies = session
.get(API_NODES)
.target(QueryTarget::BestMatching)
.timeout(Duration::from_secs(5))
.await
.map_err(|e| anyhow!("Zenoh query failed: {}", e))?;
let timeout_duration = Duration::from_secs(5);
let start = std::time::Instant::now();
while start.elapsed() < timeout_duration {
match tokio::time::timeout(timeout_duration - start.elapsed(), replies.recv_async()).await {
Ok(Ok(reply)) => {
if let Ok(sample) = reply.result() {
let bytes = sample.payload().to_bytes();
let result: NodeListResponse = serde_json::from_slice(&bytes)?;
return Ok(result);
}
}
Ok(Err(_)) | Err(_) => break,
}
}
Err(anyhow!("No reply received from daemon (timeout after 5s)"))
}print_table function · rust · L131-L178 (48 LOC)crates/bubbaloop/src/cli/status.rs
fn print_table(status: &StatusOutput) {
println!("bubbaloop status");
println!("================");
// Zenoh Router
let zenoh_symbol = if status.zenoh.running { "✓" } else { "✗" };
let zenoh_state = if status.zenoh.running {
format!("running (port {})", status.zenoh.port)
} else {
"stopped".to_string()
};
println!("Zenoh Router: {} {}", zenoh_symbol, zenoh_state);
// Daemon
let daemon_symbol = if status.daemon.running { "✓" } else { "✗" };
let daemon_state = if status.daemon.running {
if let Some(count) = status.daemon.nodes {
format!("running ({} nodes)", count)
} else {
"running".to_string()
}
} else {
"stopped".to_string()
};
println!("Daemon: {} {}", daemon_symbol, daemon_state);
// Bridge
let bridge_symbol = if status.bridge.running { "✓" } else { "✗" };
let bridge_state = if status.bridge.running {
"running".to_string()
Repobility · MCP-ready · https://repobility.com
print_json function · rust · L179-L184 (6 LOC)crates/bubbaloop/src/cli/status.rs
fn print_json(status: &StatusOutput) -> Result<()> {
let json = serde_json::to_string_pretty(status)?;
println!("{}", json);
Ok(())
}run function · rust · L185-L198 (14 LOC)crates/bubbaloop/src/cli/status.rs
pub async fn run(format: &str) -> Result<()> {
// Collect status information
let status = collect_status().await?;
// Output in requested format
match format {
"json" => print_json(&status)?,
"table" => print_table(&status),
_ => print_table(&status),
}
Ok(())
}collect_status function · rust · L199-L257 (59 LOC)crates/bubbaloop/src/cli/status.rs
async fn collect_status() -> Result<StatusOutput> {
// Check Zenoh router
let zenoh_running = is_process_running("zenohd").await;
let zenoh = ZenohStatus {
running: zenoh_running,
port: 7447,
};
// Check daemon
let daemon_service = check_systemd_service("bubbaloop-daemon.service").await;
let daemon_running = daemon_service == "active";
// Try to get node count from daemon if it's running
let mut node_count = None;
let mut node_summary = NodeSummary {
running: 0,
stopped: 0,
not_installed: 0,
};
if daemon_running && zenoh_running {
if let Ok(session) = get_zenoh_session().await {
if let Ok(response) = query_nodes(&session).await {
node_count = Some(response.nodes.len());
// Count node states
for node in response.nodes {
if !node.installed {
node_summary.not_installed += 1;
is_process_running function · rust · L258-L263 (6 LOC)crates/bubbaloop/src/cli/status.rs
async fn is_process_running(name: &str) -> bool {
let output = Command::new("pgrep").arg("-x").arg(name).output().await;
matches!(output, Ok(out) if out.status.success())
}get_zenoh_session function · rust · L264-L286 (23 LOC)crates/bubbaloop/src/cli/status.rs
async fn get_zenoh_session() -> Result<Session> {
let mut config = zenoh::Config::default();
config
.insert_json5("mode", "\"client\"")
.map_err(|e| anyhow!("Failed to configure client mode: {}", e))?;
let endpoint = std::env::var("BUBBALOOP_ZENOH_ENDPOINT")
.unwrap_or_else(|_| "tcp/127.0.0.1:7447".to_string());
config
.insert_json5("connect/endpoints", &format!("[\"{}\"]", endpoint))
.map_err(|e| anyhow!("Failed to configure endpoint: {}", e))?;
// Disable scouting
let _ = config.insert_json5("scouting/multicast/enabled", "false");
let _ = config.insert_json5("scouting/gossip/enabled", "false");
zenoh::open(config)
.await
.map_err(|e| anyhow!("Failed to open Zenoh session: {}", e))
}test_status_output_serialization function · rust · L293-L314 (22 LOC)crates/bubbaloop/src/cli/status.rs
fn test_status_output_serialization() {
let status = StatusOutput {
zenoh: ZenohStatus {
running: true,
port: 7447,
},
daemon: DaemonStatus {
running: true,
nodes: Some(3),
},
bridge: BridgeStatus { running: false },
nodes: NodeSummary {
running: 1,
stopped: 2,
not_installed: 0,
},
};
let json = serde_json::to_string(&status).unwrap();
assert!(json.contains("\"running\":true"));
assert!(json.contains("\"port\":7447"));
}test_zenoh_status_serialization function · rust · L317-L325 (9 LOC)crates/bubbaloop/src/cli/status.rs
fn test_zenoh_status_serialization() {
let status = ZenohStatus {
running: true,
port: 7447,
};
let json = serde_json::to_string(&status).unwrap();
assert!(json.contains("true"));
assert!(json.contains("7447"));
}test_daemon_status_serialization function · rust · L328-L336 (9 LOC)crates/bubbaloop/src/cli/status.rs
fn test_daemon_status_serialization() {
let status = DaemonStatus {
running: true,
nodes: Some(5),
};
let json = serde_json::to_string(&status).unwrap();
assert!(json.contains("true"));
assert!(json.contains("5"));
}Same scanner, your repo: https://repobility.com — Repobility
test_node_summary_serialization function · rust · L346-L356 (11 LOC)crates/bubbaloop/src/cli/status.rs
fn test_node_summary_serialization() {
let summary = NodeSummary {
running: 1,
stopped: 2,
not_installed: 3,
};
let json = serde_json::to_string(&summary).unwrap();
assert!(json.contains("\"running\":1"));
assert!(json.contains("\"stopped\":2"));
assert!(json.contains("\"not_installed\":3"));
}test_node_state_deserialization function · rust · L359-L379 (21 LOC)crates/bubbaloop/src/cli/status.rs
fn test_node_state_deserialization() {
let json = r#"{
"name": "rtsp-camera",
"path": "/path/to/node",
"status": "running",
"installed": true,
"autostart_enabled": true,
"version": "0.1.0",
"description": "RTSP Camera Node",
"node_type": "rust",
"is_built": true,
"build_output": ["Building...", "Done"]
}"#;
let node: NodeState = serde_json::from_str(json).unwrap();
assert_eq!(node.name, "rtsp-camera");
assert_eq!(node.status, "running");
assert_eq!(node.node_type, "rust");
assert!(node.installed);
assert!(node.is_built);
}test_node_list_response_deserialization function · rust · L382-L405 (24 LOC)crates/bubbaloop/src/cli/status.rs
fn test_node_list_response_deserialization() {
let json = r#"{
"nodes": [
{
"name": "node1",
"path": "/path1",
"status": "running",
"installed": true,
"autostart_enabled": false,
"version": "1.0.0",
"description": "First node",
"node_type": "rust",
"is_built": true,
"build_output": []
}
],
"timestamp_ms": 1234567890
}"#;
let response: NodeListResponse = serde_json::from_str(json).unwrap();
assert_eq!(response.nodes.len(), 1);
assert_eq!(response.nodes[0].name, "node1");
assert_eq!(response.timestamp_ms, 1234567890);
}create_session function · rust · L34-L103 (70 LOC)crates/bubbaloop/src/daemon/mod.rs
pub async fn create_session(endpoint: Option<&str>) -> Result<Arc<Session>, zenoh::Error> {
// Configure Zenoh session
let mut config = zenoh::Config::default();
// Peer mode — allows direct connections from co-located nodes
config
.insert_json5("mode", "\"peer\"")
.expect("Failed to set Zenoh mode");
// Resolve endpoint: parameter > env var > default
let env_endpoint = std::env::var("BUBBALOOP_ZENOH_ENDPOINT").ok();
let ep = endpoint
.or(env_endpoint.as_deref())
.unwrap_or("tcp/127.0.0.1:7447");
log::info!("Connecting to Zenoh router at: {}", ep);
config
.insert_json5("connect/endpoints", &format!("[\"{}\"]", ep))
.expect("Failed to set Zenoh endpoint");
// Disable all scouting to prevent connecting to remote peers via Tailscale/VPN
config
.insert_json5("scouting/multicast/enabled", "false")
.expect("Failed to disable multicast scouting");
config
.insert_json5("default function · rust · L69-L74 (6 LOC)crates/bubbaloop/src/daemon/node_manager.rs
fn default() -> Self {
Self {
status: BuildStatus::Idle,
output: Vec::new(),
}
}get_machine_ips function · rust · L78-L89 (12 LOC)crates/bubbaloop/src/daemon/node_manager.rs
fn get_machine_ips() -> Vec<String> {
// Use `hostname -I` which returns all IPs (works on Linux)
if let Ok(output) = std::process::Command::new("hostname").arg("-I").output() {
if output.status.success() {
return String::from_utf8_lossy(&output.stdout)
.split_whitespace()
.map(String::from)
.collect();
}
}
Vec::new()
}effective_name function · rust · L114-L122 (9 LOC)crates/bubbaloop/src/daemon/node_manager.rs
pub fn effective_name(&self) -> String {
if let Some(ref ov) = self.name_override {
return ov.clone();
}
self.manifest
.as_ref()
.map(|m| m.name.clone())
.unwrap_or_else(|| "unknown".to_string())
}to_proto function · rust · L125-L161 (37 LOC)crates/bubbaloop/src/daemon/node_manager.rs
pub fn to_proto(
&self,
machine_id: &str,
machine_hostname: &str,
machine_ips: &[String],
) -> NodeState {
let manifest = self.manifest.as_ref();
let base_name = manifest.map(|m| m.name.clone()).unwrap_or_default();
NodeState {
name: self.effective_name(),
path: self.path.clone(),
status: self.status as i32,
installed: self.installed,
autostart_enabled: self.autostart_enabled,
version: manifest
.map(|m| m.version.clone())
.unwrap_or_else(|| "0.0.0".to_string()),
description: manifest.map(|m| m.description.clone()).unwrap_or_default(),
node_type: manifest
.map(|m| m.node_type.clone())
.unwrap_or_else(|| "unknown".to_string()),
is_built: self.is_built,
last_updated_ms: self.last_updated_ms,
build_output: self.build_state.output.clonHi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
new function · rust · L184-L218 (35 LOC)crates/bubbaloop/src/daemon/node_manager.rs
pub async fn new() -> Result<Arc<Self>> {
let systemd = SystemdClient::new().await?;
let (event_tx, _) = broadcast::channel(100);
let machine_id = super::util::get_machine_id();
// Get machine hostname
let machine_hostname = hostname::get()
.map(|h| h.to_string_lossy().to_string())
.unwrap_or_else(|_| "unknown".to_string());
let machine_ips = get_machine_ips();
log::info!(
"NodeManager using machine_id: {}, hostname: {}, ips: {:?}",
machine_id,
machine_hostname,
machine_ips
);
let manager = Arc::new(Self {
nodes: RwLock::new(HashMap::new()),
systemd,
event_tx,
building_nodes: Mutex::new(HashSet::new()),
machine_id,
machine_hostname,
machine_ips,
});
// Initial load
manager.refresh_all().await?;
Ok(manager)
}start_signal_listener function · rust · L234-L292 (59 LOC)crates/bubbaloop/src/daemon/node_manager.rs
pub async fn start_signal_listener(self: Arc<Self>) -> Result<()> {
let mut signal_rx = self.systemd.subscribe_to_signals().await?;
tokio::spawn(async move {
log::info!("Signal listener started");
// Debounce: collect signals for up to 500ms, then refresh once
let debounce = Duration::from_millis(500);
loop {
// Wait for at least one signal
let first = match signal_rx.recv().await {
Some(ev) => ev,
None => {
log::warn!("Signal listener ended (channel closed)");
break;
}
};
// Collect node names and event types from this burst
let mut pending: Vec<(String, String)> = Vec::new();
Self::push_signal_event(&first, &mut pending);
// Drain any additional signals within the debounce window
lrefresh_node function · rust · L318-L367 (50 LOC)crates/bubbaloop/src/daemon/node_manager.rs
pub async fn refresh_node(&self, name: &str) -> Result<()> {
let service_name = systemd::get_service_name(name);
// Get systemd state
let active_state = self.systemd.get_active_state(&service_name).await?;
let installed = systemd::is_service_installed(name);
let autostart_enabled = if installed {
self.systemd
.is_enabled(&service_name)
.await
.unwrap_or(false)
} else {
false
};
let status = if !installed {
NodeStatus::NotInstalled
} else {
match active_state {
ActiveState::Active => NodeStatus::Running,
ActiveState::Failed => NodeStatus::Failed,
ActiveState::Inactive => NodeStatus::Stopped,
ActiveState::Activating => NodeStatus::Running,
ActiveState::Deactivating => NodeStatus::Stopped,
_ => NodeStatus::Stopped,
now_ms function · rust · L490-L495 (6 LOC)crates/bubbaloop/src/daemon/node_manager.rs
fn now_ms() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64
}refresh_all function · rust · L498-L597 (100 LOC)crates/bubbaloop/src/daemon/node_manager.rs
pub async fn refresh_all(&self) -> Result<()> {
let registered = registry::list_nodes()?;
let mut nodes = self.nodes.write().await;
// Track which keys we've seen (keyed by effective_name)
let mut seen = std::collections::HashSet::new();
for (entry, manifest) in registered {
// Compute effective name: name_override if present, otherwise manifest name
let eff_name = manifest
.as_ref()
.map(|m| registry::effective_name(&entry, m))
.unwrap_or_else(|| {
entry
.name_override
.clone()
.unwrap_or_else(|| "unknown".to_string())
});
// Use effective name as the HashMap key so each instance is distinct
let key = eff_name.clone();
seen.insert(key.clone());
let service_name = systemd::get_service_name(&eff_name);
get_node_list function · rust · L600-L611 (12 LOC)crates/bubbaloop/src/daemon/node_manager.rs
pub async fn get_node_list(&self) -> NodeList {
let nodes = self.nodes.read().await;
log::debug!("get_node_list: nodes HashMap has {} entries", nodes.len());
NodeList {
nodes: nodes
.values()
.map(|n| n.to_proto(&self.machine_id, &self.machine_hostname, &self.machine_ips))
.collect(),
timestamp_ms: Self::now_ms(),
machine_id: self.machine_id.clone(),
}
}get_cached_manifests function · rust · L617-L623 (7 LOC)crates/bubbaloop/src/daemon/node_manager.rs
pub async fn get_cached_manifests(&self) -> Vec<(String, NodeManifest)> {
let nodes = self.nodes.read().await;
nodes
.values()
.filter_map(|n| n.manifest.as_ref().map(|m| (n.effective_name(), m.clone())))
.collect()
}get_node function · rust · L626-L632 (7 LOC)crates/bubbaloop/src/daemon/node_manager.rs
pub async fn get_node(&self, name: &str) -> Option<NodeState> {
let nodes = self.nodes.read().await;
nodes
.values()
.find(|n| n.effective_name() == name)
.map(|n| n.to_proto(&self.machine_id, &self.machine_hostname, &self.machine_ips))
}Repobility — same analyzer, your code, free for public repos · /scan/
execute_command function · rust · L635-L719 (85 LOC)crates/bubbaloop/src/daemon/node_manager.rs
pub async fn execute_command(self: &Arc<Self>, cmd: NodeCommand) -> CommandResult {
let command_type = CommandType::try_from(cmd.command).unwrap_or(CommandType::Refresh);
log::debug!(
"execute_command: type={:?} node={}",
command_type,
cmd.node_name
);
// Special handling for GET_LOGS since it returns data in output field
if command_type == CommandType::GetLogs {
let node_state = self.get_node(&cmd.node_name).await;
return match self.get_logs(&cmd.node_name).await {
Ok(logs) => CommandResult {
request_id: cmd.request_id,
success: true,
message: "Logs retrieved".to_string(),
output: logs,
node_state,
timestamp_ms: Self::now_ms(),
responding_machine: self.machine_id.clone(),
},
Err(e) => CommandResulfind_node_path function · rust · L722-L729 (8 LOC)crates/bubbaloop/src/daemon/node_manager.rs
async fn find_node_path(&self, name: &str) -> Result<String> {
let nodes = self.nodes.read().await;
nodes
.values()
.find(|n| n.effective_name() == name)
.map(|n| n.path.clone())
.ok_or_else(|| NodeManagerError::NodeNotFound(name.to_string()))
}get_logs function · rust · L732-L754 (23 LOC)crates/bubbaloop/src/daemon/node_manager.rs
async fn get_logs(&self, name: &str) -> Result<String> {
// Check if node exists
let _path = self.find_node_path(name).await?;
let service_name = systemd::get_service_name(name);
// Use _SYSTEMD_USER_UNIT filter for user services (logs are in system journal)
// This works on systems where --user journal doesn't exist
let unit_filter = format!("_SYSTEMD_USER_UNIT={}", service_name);
let journal_output = Command::new(JOURNALCTL_PATH)
.args([&unit_filter, "-n", "50", "--no-pager", "-o", "cat"])
.output()
.await?;
let stdout = String::from_utf8_lossy(&journal_output.stdout);
let lines: Vec<&str> = stdout.lines().collect();
if lines.is_empty() {
Ok("No logs available".to_string())
} else {
Ok(lines.join("\n"))
}
}