← back to kornia__bubbaloop

Function bodies 699 total

All specs Real LLM only Function bodies
find_proto_files function · rust · L4-L17 (14 LOC)
crates/bubbaloop/build.rs
/// Recursively collects every file with a `.proto` extension found beneath `dir`.
///
/// Directories that cannot be read are skipped silently, so a missing `dir`
/// yields an empty list. Each returned path is rendered with
/// `to_string_lossy`, which means a non-UTF-8 path component is substituted
/// rather than causing a panic.
fn find_proto_files(dir: &str) -> Vec<String> {
    // Depth-first walk that appends matches to one shared accumulator.
    // Recursing on `Path` (instead of converting each sub-directory back to
    // `&str`) keeps the walk panic-free for non-UTF-8 directory names.
    fn walk(dir: &std::path::Path, found: &mut Vec<String>) {
        let entries = match std::fs::read_dir(dir) {
            Ok(entries) => entries,
            Err(_) => return,
        };
        for entry in entries.flatten() {
            let path = entry.path();
            if path.is_dir() {
                walk(&path, found);
            } else if path.extension().and_then(|ext| ext.to_str()) == Some("proto") {
                found.push(path.to_string_lossy().into_owned());
            }
        }
    }

    let mut proto_files = Vec::new();
    walk(std::path::Path::new(dir), &mut proto_files);
    proto_files
}
main function · rust · L18-L54 (37 LOC)
crates/bubbaloop/build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let out_dir = PathBuf::from(std::env::var("OUT_DIR")?);
    let protos_dir = PathBuf::from("../bubbaloop-schemas/protos");

    // Bubbaloop protobuf files
    let proto_files = find_proto_files(protos_dir.to_str().unwrap());

    // Compile all proto files
    let mut prost_build = prost_build::Config::new();
    prost_build.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]");
    prost_build.file_descriptor_set_path(out_dir.join("descriptor.bin"));
    prost_build.compile_protos(&proto_files, &[protos_dir.to_string_lossy().as_ref()])?;

    // Rerun if any proto file changes
    for proto_file in &proto_files {
        println!("cargo:rerun-if-changed={}", proto_file);
    }
    println!("cargo:rerun-if-changed={}", protos_dir.to_string_lossy());

    // Rebuild if any dashboard dist file changes (for rust-embed)
    fn watch_dir(dir: &str) {
        if let Ok(entries) = std::fs::read_dir(dir) {
           
watch_dir function · rust · L39-L50 (12 LOC)
crates/bubbaloop/build.rs
    // Prints a `cargo:rerun-if-changed=<path>` line for every file beneath
    // `dir`, descending into sub-directories. Directories that cannot be read
    // are skipped silently.
    fn watch_dir(dir: &str) {
        // The recursion works on `Path` values so that a sub-directory whose
        // name is not valid UTF-8 is still visited instead of panicking on a
        // `to_str().unwrap()` round-trip.
        fn visit(dir: &std::path::Path) {
            let entries = match std::fs::read_dir(dir) {
                Ok(entries) => entries,
                Err(_) => return,
            };
            for entry in entries.flatten() {
                let path = entry.path();
                if path.is_dir() {
                    visit(&path);
                } else {
                    println!("cargo:rerun-if-changed={}", path.display());
                }
            }
        }

        visit(std::path::Path::new(dir));
    }
load_config function · rust · L4-L10 (7 LOC)
crates/bubbaloop-node-sdk/src/config.rs
/// Reads the file at `path` and deserializes its YAML content into `C`.
///
/// # Errors
/// Returns an error that names `path` when the file cannot be read, or when
/// its content does not deserialize into `C`.
pub fn load_config<C: serde::de::DeserializeOwned>(path: &Path) -> anyhow::Result<C> {
    let raw = match std::fs::read_to_string(path) {
        Ok(text) => text,
        Err(e) => {
            return Err(anyhow::anyhow!(
                "Failed to read config '{}': {}",
                path.display(),
                e
            ));
        }
    };

    match serde_yaml::from_str::<C>(&raw) {
        Ok(parsed) => Ok(parsed),
        Err(e) => Err(anyhow::anyhow!(
            "Failed to parse config '{}': {}",
            path.display(),
            e
        )),
    }
}
test_load_valid_config function · rust · L24-L31 (8 LOC)
crates/bubbaloop-node-sdk/src/config.rs
    // A well-formed YAML file deserializes into the expected field values.
    fn test_load_valid_config() {
        let tmp = tempfile::tempdir().unwrap();
        let config_path = tmp.path().join("config.yaml");
        std::fs::write(&config_path, "name: test\nrate_hz: 10.0\n").unwrap();

        let loaded: TestConfig = load_config(&config_path).unwrap();

        assert_eq!(loaded.name, "test");
        let rate_error = (loaded.rate_hz - 10.0).abs();
        assert!(rate_error < f64::EPSILON);
    }
test_load_invalid_yaml function · rust · L40-L46 (7 LOC)
crates/bubbaloop-node-sdk/src/config.rs
    // Malformed YAML must surface as an error rather than a value.
    fn test_load_invalid_yaml() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("bad.yaml");
        std::fs::write(&bad_path, "not: [valid: yaml: {{").unwrap();

        let outcome = load_config::<TestConfig>(&bad_path);
        assert!(outcome.is_err());
    }
init function · rust · L67-L87 (21 LOC)
crates/bubbaloop-node-sdk/src/lib.rs
    /// Asynchronously builds the node from the given [`NodeContext`] and the
    /// node's own configuration value.
    ///
    /// Returns an error when the node cannot be constructed.
    async fn init(ctx: &NodeContext, config: &Self::Config) -> anyhow::Result<Self>
    where
        Self: Sized;

    /// Main loop. Called after `init()`. Must respect `ctx.shutdown_rx` for graceful exit.
    /// When the shutdown signal fires, this method should return `Ok(())`.
    ///
    /// Consumes the node (`self`) and takes ownership of `ctx`.
    async fn run(self, ctx: NodeContext) -> anyhow::Result<()>;
}

/// Built-in CLI arguments handled by the SDK.
// NOTE(review): the `///` comments on the fields below are presumably consumed
// by the `argh` derive as per-option help text — treat them as user-facing
// strings and confirm before rewording them.
#[derive(argh::FromArgs)]
#[argh(description = "Bubbaloop node")]
struct SdkArgs {
    /// path to configuration file
    // Falls back to `default_config_path()` when the option is omitted.
    #[argh(option, short = 'c', default = "default_config_path()")]
    config: PathBuf,

    /// zenoh endpoint to connect to
    // Optional: remains `None` when the option is omitted.
    #[argh(option, short = 'e')]
    endpoint: Option<String>,
}
Powered by Repobility — scan your code at https://repobility.com
run_node function · rust · L98-L162 (65 LOC)
crates/bubbaloop-node-sdk/src/lib.rs
pub async fn run_node<N: Node>() -> anyhow::Result<()> {
    // 1. Init logging
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

    // 2. Parse CLI args
    let args: SdkArgs = argh::from_env();

    // 3. Load config
    let node_config: N::Config = config::load_config(&args.config)?;
    log::info!(
        "{}: config loaded from {}",
        N::name(),
        args.config.display()
    );

    // 4. Resolve scope + machine_id
    let scope = std::env::var("BUBBALOOP_SCOPE").unwrap_or_else(|_| "local".to_string());
    let machine_id = std::env::var("BUBBALOOP_MACHINE_ID")
        .unwrap_or_else(|_| {
            hostname::get()
                .map(|h| h.to_string_lossy().to_string())
                .unwrap_or_else(|_| "unknown".to_string())
        })
        .replace('-', "_");
    log::info!("Scope: {}, Machine ID: {}", scope, machine_id);

    // 5. Setup shutdown channel
    let (shutdown_tx, _) = shutdown::setup_shutdown()?;
declare_schema_queryable function · rust · L8-L33 (26 LOC)
crates/bubbaloop-node-sdk/src/schema.rs
/// Declares a queryable at `bubbaloop/{scope}/{machine_id}/{node_name}/schema`
/// whose callback replies to each query with the descriptor bytes.
///
/// # Errors
/// Returns an error when the queryable cannot be declared on `session`.
/// A failed reply inside the callback is only logged as a warning.
pub async fn declare_schema_queryable(
    session: &Arc<zenoh::Session>,
    scope: &str,
    machine_id: &str,
    node_name: &str,
    descriptor: &'static [u8],
) -> anyhow::Result<zenoh::query::Queryable<()>> {
    let schema_key = format!("bubbaloop/{}/{}/{}/schema", scope, machine_id, node_name);
    // The callback is given its own owned copy of the descriptor bytes.
    let descriptor_bytes = descriptor.to_vec();

    let declared = session
        .declare_queryable(&schema_key)
        .callback(move |query| {
            log::debug!("Schema query received");
            if let Err(e) = query.reply(&query.key_expr().clone(), descriptor_bytes.as_slice()) {
                log::warn!("Failed to reply to schema query: {}", e);
            }
        })
        .await;

    match declared {
        Ok(queryable) => {
            log::info!("Schema queryable: {}", schema_key);
            Ok(queryable)
        }
        Err(e) => Err(anyhow::anyhow!(
            "Failed to create schema queryable: {}",
            e
        )),
    }
}
setup_shutdown function · rust · L6-L14 (9 LOC)
crates/bubbaloop-node-sdk/src/shutdown.rs
/// Creates a `watch` channel and installs a Ctrl-C handler that sends `()` on
/// it, returning both ends of the channel to the caller.
///
/// # Errors
/// Returns an error when the Ctrl-C handler cannot be installed.
pub fn setup_shutdown() -> anyhow::Result<(watch::Sender<()>, watch::Receiver<()>)> {
    let (sender, receiver) = watch::channel(());

    // The handler owns its own clone of the sender; a failed send is ignored.
    let handler_sender = sender.clone();
    let on_signal = move || {
        log::info!("Shutdown signal received");
        let _ = handler_sender.send(());
    };
    ctrlc::set_handler(on_signal)?;

    Ok((sender, receiver))
}
open_zenoh_session function · rust · L10-L41 (32 LOC)
crates/bubbaloop-node-sdk/src/zenoh_session.rs
pub async fn open_zenoh_session(endpoint: &Option<String>) -> anyhow::Result<Arc<zenoh::Session>> {
    let endpoint = std::env::var("ZENOH_ENDPOINT")
        .or_else(|_| std::env::var("BUBBALOOP_ZENOH_ENDPOINT"))
        .ok()
        .or_else(|| endpoint.clone())
        .unwrap_or_else(|| "tcp/127.0.0.1:7447".to_string());

    log::info!("Connecting to Zenoh at: {}", endpoint);

    let mut config = zenoh::Config::default();
    // Client mode is mandatory — peer mode doesn't route through zenohd router
    config
        .insert_json5("mode", r#""client""#)
        .map_err(|e| anyhow::anyhow!("Failed to set Zenoh mode: {}", e))?;
    config
        .insert_json5("connect/endpoints", &format!(r#"["{}"]"#, endpoint))
        .map_err(|e| anyhow::anyhow!("Failed to set Zenoh endpoint: {}", e))?;
    // Disable scouting to prevent connecting to remote peers via Tailscale/VPN
    config
        .insert_json5("scouting/multicast/enabled", "false")
        .map_err(|e| anyhow::anyhow!(
main function · rust · L2-L31 (30 LOC)
crates/bubbaloop-schemas/build.rs
/// Build script: compiles every `.proto` file directly inside `protos/`,
/// writes the descriptor set to `$OUT_DIR/descriptor.bin`, and prints
/// `cargo:rerun-if-changed` lines for the compiled files and the directory.
///
/// Returns early, printing nothing, when no `.proto` file is found.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let out_dir = PathBuf::from(std::env::var("OUT_DIR")?);
    let protos_dir = PathBuf::from("protos");

    // Keep only `.proto` entries; the first directory-read error aborts the build.
    let proto_files = std::fs::read_dir(&protos_dir)?
        .filter_map(|entry| match entry {
            Ok(entry) => {
                let path = entry.path();
                let is_proto = path.extension().and_then(|ext| ext.to_str()) == Some("proto");
                if is_proto {
                    Some(Ok(path.to_string_lossy().into_owned()))
                } else {
                    None
                }
            }
            Err(e) => Some(Err(e)),
        })
        .collect::<Result<Vec<String>, std::io::Error>>()?;

    if proto_files.is_empty() {
        return Ok(());
    }

    let include_dir = protos_dir.to_string_lossy();
    let mut prost_config = prost_build::Config::new();
    prost_config.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]");
    prost_config.file_descriptor_set_path(out_dir.join("descriptor.bin"));
    prost_config.compile_protos(&proto_files, &[include_dir.as_ref()])?;

    proto_files
        .iter()
        .for_each(|file| println!("cargo:rerun-if-changed={}", file));
    println!("cargo:rerun-if-changed=protos");

    Ok(())
}
test_parse_valid_yaml function · rust · L39-L45 (7 LOC)
crates/bubbaloop-schemas/src/config.rs
    // A two-entry topic list parses into the same two topics, in order.
    fn test_parse_valid_yaml() {
        let expected = ["/camera/cam0/compressed", "/weather/current"];
        let parsed =
            TopicsConfig::parse("topics:\n  - /camera/cam0/compressed\n  - /weather/current\n")
                .unwrap();

        assert_eq!(parsed.topics.len(), 2);
        for (idx, wanted) in expected.iter().enumerate() {
            assert_eq!(parsed.topics[idx], *wanted);
        }
    }
test_parse_invalid_yaml function · rust · L55-L60 (6 LOC)
crates/bubbaloop-schemas/src/config.rs
    // Malformed YAML is reported as a `ParseError`.
    fn test_parse_invalid_yaml() {
        let outcome = TopicsConfig::parse("not: valid: yaml: [");
        assert!(outcome.is_err());
        assert!(matches!(outcome.unwrap_err(), ConfigError::ParseError(_)));
    }
test_from_file_nonexistent function · rust · L69-L74 (6 LOC)
crates/bubbaloop-schemas/src/config.rs
    // A path that does not exist is reported as an `IoError`.
    fn test_from_file_nonexistent() {
        let outcome = TopicsConfig::from_file("/nonexistent/path.yaml");
        assert!(outcome.is_err());
        assert!(matches!(outcome.unwrap_err(), ConfigError::IoError(_)));
    }
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
new function · rust · L60-L65 (6 LOC)
crates/bubbaloop-schemas/src/lib.rs
        /// Creates a descriptor from its serialized bytes and an owned copy of
        /// the schema name.
        pub fn new(descriptor_bytes: Vec<u8>, schema_name: &str) -> Self {
            let schema_name = schema_name.to_owned();
            Self {
                descriptor_bytes,
                schema_name,
            }
        }
get_descriptor_pool function · rust · L70-L76 (7 LOC)
crates/bubbaloop-schemas/src/lib.rs
    /// Returns the shared descriptor pool, decoding `DESCRIPTOR` into it the
    /// first time it is requested.
    ///
    /// # Panics
    /// Panics if `DESCRIPTOR` cannot be decoded into a `DescriptorPool`.
    fn get_descriptor_pool() -> &'static DescriptorPool {
        let decode_embedded = || {
            DescriptorPool::decode(DESCRIPTOR)
                .expect("Failed to decode FileDescriptorSet into DescriptorPool")
        };
        DESCRIPTOR_POOL.get_or_init(decode_embedded)
    }
extract_message_descriptor function · rust · L79-L128 (50 LOC)
crates/bubbaloop-schemas/src/lib.rs
    fn extract_message_descriptor(type_name: &str) -> Result<Vec<u8>, prost::DecodeError> {
        let pool = get_descriptor_pool();

        let message_descriptor = pool.get_message_by_name(type_name).ok_or_else(|| {
            prost::DecodeError::new(format!(
                "Message type '{}' not found in descriptor pool",
                type_name
            ))
        })?;

        let file_descriptor = message_descriptor.parent_file();

        let mut collected_files = std::collections::HashSet::new();
        let mut files_to_include = Vec::new();

        fn collect_file_and_deps(
            file: &FileDescriptor,
            collected: &mut std::collections::HashSet<String>,
            result: &mut Vec<prost_types::FileDescriptorProto>,
        ) {
            let file_name = file.name();
            if collected.contains(file_name) {
                return;
            }
            collected.insert(file_name.to_string());

            result.push(file.file_descriptor_
collect_file_and_deps function · rust · L93-L110 (18 LOC)
crates/bubbaloop-schemas/src/lib.rs
        // Appends `file` and then, depth-first, each of its dependencies to
        // `result`; `collected` holds the file names already emitted so that
        // every file is added at most once.
        fn collect_file_and_deps(
            file: &FileDescriptor,
            collected: &mut std::collections::HashSet<String>,
            result: &mut Vec<prost_types::FileDescriptorProto>,
        ) {
            // `insert` returns `false` when the name was already recorded.
            let newly_seen = collected.insert(file.name().to_string());
            if !newly_seen {
                return;
            }

            let proto = file.file_descriptor_proto().clone();
            result.push(proto);

            for dependency in file.dependencies() {
                collect_file_and_deps(&dependency, collected, result);
            }
        }
get_descriptor_for_message function · rust · L134-L139 (6 LOC)
crates/bubbaloop-schemas/src/lib.rs
    /// Looks up the descriptor bytes for `T`'s type name and wraps them,
    /// together with that name, in a `MessageDescriptor`.
    ///
    /// # Errors
    /// Propagates the error from `extract_message_descriptor`.
    pub fn get_descriptor_for_message<T: MessageTypeName>(
    ) -> Result<MessageDescriptor, prost::DecodeError> {
        let type_name = T::type_name();
        extract_message_descriptor(type_name)
            .map(|bytes| MessageDescriptor::new(bytes, type_name))
    }
test_header_roundtrip function · rust · L163-L177 (15 LOC)
crates/bubbaloop-schemas/src/lib.rs
    // Encoding a `Header` and decoding the bytes preserves the checked fields.
    fn test_header_roundtrip() {
        let original = Header {
            acq_time: 1000,
            pub_time: 2000,
            sequence: 42,
            frame_id: "cam0".into(),
            machine_id: "jetson1".into(),
            scope: "default".into(),
        };

        let wire = original.encode_to_vec();
        let restored = Header::decode(wire.as_slice()).unwrap();

        assert_eq!(restored.sequence, 42);
        assert_eq!(restored.frame_id, "cam0");
        assert_eq!(restored.scope, "default");
    }
test_header_serde_json_roundtrip function · rust · L188-L198 (11 LOC)
crates/bubbaloop-schemas/src/lib.rs
    // Serializing a `Header` to JSON and back preserves the checked fields.
    fn test_header_serde_json_roundtrip() {
        let original = Header {
            sequence: 99,
            frame_id: "test".into(),
            ..Default::default()
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let restored: Header = serde_json::from_str(&encoded).unwrap();

        assert_eq!(restored.sequence, 99);
        assert_eq!(restored.frame_id, "test");
    }
print_json_envelope function · rust · L124-L143 (20 LOC)
crates/bubbaloop/src/cli/debug.rs
/// Pretty-prints `envelope` to stdout with the payload added under `payload_key`.
///
/// The payload is inserted as parsed JSON when it parses as JSON, otherwise as
/// a lossily decoded UTF-8 string. An `envelope` that is not a JSON object
/// contributes no fields.
fn print_json_envelope(
    envelope: &serde_json::Value,
    payload_key: &str,
    payload: &[u8],
) -> Result<()> {
    let mut fields = envelope.as_object().cloned().unwrap_or_default();

    let payload_value = match serde_json::from_slice::<serde_json::Value>(payload) {
        Ok(parsed) => parsed,
        Err(_) => serde_json::Value::String(String::from_utf8_lossy(payload).into_owned()),
    };
    fields.insert(payload_key.to_string(), payload_value);

    let rendered = serde_json::to_string_pretty(&serde_json::Value::Object(fields))?;
    println!("{}", rendered);
    Ok(())
}
Repobility · code-quality intelligence platform · https://repobility.com
run function · rust · L146-L167 (22 LOC)
crates/bubbaloop/src/cli/debug.rs
    /// Dispatches to the handler for the selected debug action, or prints the
    /// help text when no action was given.
    ///
    /// The `topics`, `subscribe` and `query` actions log a deprecation warning
    /// before running.
    pub async fn run(self) -> Result<()> {
        let action = match self.action {
            Some(action) => action,
            None => {
                Self::print_help();
                return Ok(());
            }
        };

        match action {
            DebugAction::Topics(args) => {
                log::warn!("Note: 'bubbaloop debug topics' is deprecated. Use MCP tool 'list_topics' instead.");
                list_topics(args).await
            }
            DebugAction::Subscribe(args) => {
                log::warn!("Note: 'bubbaloop debug subscribe' is deprecated. Use MCP tool 'subscribe_topic' instead.");
                subscribe_topic(args).await
            }
            DebugAction::Query(args) => {
                log::warn!("Note: 'bubbaloop debug query' is deprecated. Use MCP tool 'query_zenoh' instead.");
                query_endpoint(args).await
            }
            DebugAction::Info(args) => show_info(args).await,
            DebugAction::Liveliness(args) => query_liveliness(args).await,
        }
    }
print_help function · rust · L168-L179 (12 LOC)
crates/bubbaloop/src/cli/debug.rs
    /// Writes the `bubbaloop debug` usage summary to stderr.
    fn print_help() {
        // Each entry is emitted as its own stderr line; embedded `\n`
        // characters produce the blank separator lines.
        const HELP_LINES: [&str; 9] = [
            "Debug commands for Zenoh network inspection\n",
            "Usage: bubbaloop debug <command>\n",
            "Commands:",
            "  info        Show Zenoh connection information",
            "  topics      List all active Zenoh topics",
            "  query       Query a Zenoh queryable endpoint",
            "  subscribe   Subscribe to a Zenoh topic and watch messages",
            "  liveliness  Query liveliness tokens (entity discovery)",
            "\nRun 'bubbaloop debug <command> --help' for more information.",
        ];
        for line in HELP_LINES.iter() {
            eprintln!("{}", line);
        }
    }
get_zenoh_session function · rust · L181-L208 (28 LOC)
crates/bubbaloop/src/cli/debug.rs
async fn get_zenoh_session() -> Result<zenoh::Session> {
    let mut config = zenoh::Config::default();

    // Run as client mode
    config
        .insert_json5("mode", "\"client\"")
        .map_err(|e| DebugError::Zenoh(e.to_string()))?;

    let endpoint = std::env::var("BUBBALOOP_ZENOH_ENDPOINT")
        .unwrap_or_else(|_| "tcp/127.0.0.1:7447".to_string());
    config
        .insert_json5("connect/endpoints", &format!("[\"{}\"]", endpoint))
        .map_err(|e| DebugError::Zenoh(e.to_string()))?;

    // Disable scouting
    config
        .insert_json5("scouting/multicast/enabled", "false")
        .map_err(|e| DebugError::Zenoh(e.to_string()))?;
    config
        .insert_json5("scouting/gossip/enabled", "false")
        .map_err(|e| DebugError::Zenoh(e.to_string()))?;

    let session = zenoh::open(config)
        .await
        .map_err(|e| DebugError::Zenoh(e.to_string()))?;
    Ok(session)
}
subscribe_topic function · rust · L274-L331 (58 LOC)
crates/bubbaloop/src/cli/debug.rs
async fn subscribe_topic(args: SubscribeArgs) -> Result<()> {
    let session = get_zenoh_session().await?;

    println!("Subscribing to: {}", args.topic);
    println!("Press Ctrl+C to stop...\n");

    let subscriber = session
        .declare_subscriber(&args.topic)
        .await
        .map_err(|e| DebugError::Zenoh(e.to_string()))?;

    loop {
        let sample = subscriber
            .recv_async()
            .await
            .map_err(|e| DebugError::Zenoh(e.to_string()))?;

        let key = sample.key_expr().to_string();
        let payload = sample.payload().to_bytes();
        let timestamp = sample
            .timestamp()
            .map(|t| t.to_string())
            .unwrap_or_else(|| "N/A".to_string());

        if args.json {
            print_json_envelope(
                &json!({"key": key, "timestamp": timestamp}),
                "payload",
                &payload,
            )?;
        } else {
            println!("[{}] {}", timestamp, key);

       
query_endpoint function · rust · L332-L423 (92 LOC)
crates/bubbaloop/src/cli/debug.rs
async fn query_endpoint(args: QueryArgs) -> Result<()> {
    let session = get_zenoh_session().await?;

    if !args.json {
        println!("Querying: {}", args.key);
        println!("Timeout: {}s\n", args.timeout);
    }

    let mut get_builder = session
        .get(&args.key)
        .target(QueryTarget::All)
        .timeout(Duration::from_secs(args.timeout));

    if let Some(ref payload) = args.payload {
        get_builder = get_builder.payload(payload.clone());
    }

    let replies: Vec<_> = get_builder
        .await
        .map_err(|e| DebugError::Zenoh(e.to_string()))?
        .into_iter()
        .collect();

    if replies.is_empty() {
        if args.json {
            println!(
                "{}",
                serde_json::to_string_pretty(&json!({
                    "error": "No replies received",
                    "key": args.key
                }))?
            );
        } else {
            println!("No replies received. Endpoint may not exist or is no
show_info function · rust · L424-L467 (44 LOC)
crates/bubbaloop/src/cli/debug.rs
async fn show_info(args: InfoArgs) -> Result<()> {
    let session = get_zenoh_session().await?;

    let endpoint = std::env::var("BUBBALOOP_ZENOH_ENDPOINT")
        .unwrap_or_else(|_| "tcp/127.0.0.1:7447".to_string());

    let session_id = session.zid().to_string();

    if args.json {
        println!(
            "{}",
            serde_json::to_string_pretty(&json!({
                "endpoint": endpoint,
                "session_id": session_id,
                "mode": "client",
                "status": "connected"
            }))?
        );
    } else {
        println!("Zenoh Connection Info");
        println!("{}", "=".repeat(80));
        println!("Endpoint:     {}", endpoint);
        println!("Session ID:   {}", session_id);
        println!("Mode:         client");
        println!("Status:       connected");
        println!("\nEnvironment:");
        println!(
            "  BUBBALOOP_ZENOH_ENDPOINT: {}",
            std::env::var("BUBBALOOP_ZENOH_ENDPOINT")
       
query_liveliness function · rust · L468-L570 (103 LOC)
crates/bubbaloop/src/cli/debug.rs
async fn query_liveliness(args: LivelinessArgs) -> Result<()> {
    let session = get_zenoh_session().await?;

    if !args.json {
        println!("Querying liveliness tokens: {}", args.pattern);
        println!("Timeout: {}s\n", args.timeout);
    }

    let replies: Vec<_> = session
        .liveliness()
        .get(&args.pattern)
        .timeout(Duration::from_secs(args.timeout))
        .await
        .map_err(|e| DebugError::Zenoh(e.to_string()))?
        .into_iter()
        .collect();

    let mut tokens: Vec<String> = Vec::new();
    for reply in &replies {
        if let Ok(sample) = reply.result() {
            tokens.push(sample.key_expr().to_string());
        }
    }
    tokens.sort();

    if args.json {
        // Parse liveliness tokens into structured data
        let parsed: Vec<serde_json::Value> = tokens
            .iter()
            .map(|t| {
                let parts: Vec<&str> = t.split('/').collect();
                // @ros2_lv/<domain>/<zid>/<nid>/<ei
test_debug_error_display function · rust · L577-L583 (7 LOC)
crates/bubbaloop/src/cli/debug.rs
    // The `Display` text of each checked `DebugError` variant is stable.
    fn test_debug_error_display() {
        let timeout_text = DebugError::Timeout.to_string();
        assert_eq!(timeout_text, "Timeout waiting for response");

        let zenoh_text = DebugError::Zenoh("connection failed".to_string()).to_string();
        assert_eq!(zenoh_text, "Zenoh error: connection failed");
    }
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
description function · rust · L32-L43 (12 LOC)
crates/bubbaloop/src/cli/doctor.rs
    fn description(&self) -> &'static str {
        match self {
            FixAction::StartZenohd => "Start zenohd router",
            FixAction::StartDaemonService => "Start bubbaloop-daemon service",
            FixAction::RestartDaemonService => "Restart bubbaloop-daemon service",
            FixAction::StartBridgeService => "Start zenoh-bridge service",
            FixAction::CreateZenohConfig => "Create Zenoh config file",
            FixAction::CreateMarketplaceSources => {
                "Create marketplace sources with official registry"
            }
        }
    }
execute function · rust · L44-L156 (113 LOC)
crates/bubbaloop/src/cli/doctor.rs
    async fn execute(&self) -> Result<String> {
        match self {
            FixAction::StartZenohd => {
                // Start zenohd in background
                let mut cmd = Command::new("zenohd");
                cmd.stdout(std::process::Stdio::null())
                    .stderr(std::process::Stdio::null());

                let child = cmd.spawn()?;
                // Detach the process
                std::mem::forget(child);

                // Wait a moment for it to start
                tokio::time::sleep(Duration::from_millis(500)).await;

                // Verify it started
                if is_process_running("zenohd").await {
                    Ok("zenohd started successfully".to_string())
                } else {
                    Err(anyhow!("Failed to start zenohd"))
                }
            }
            FixAction::StartDaemonService => {
                let output = Command::new("systemctl")
                    .args(["--user", "start", "bubbaloop
pass function · rust · L173-L182 (10 LOC)
crates/bubbaloop/src/cli/doctor.rs
    /// Builds a passing result with no fix hint, fix action, or details.
    fn pass(check: &str, message: &str) -> Self {
        let check = check.to_owned();
        let message = message.to_owned();
        Self {
            check,
            passed: true,
            message,
            fix: None,
            fix_action: None,
            details: None,
        }
    }
pass_with_details function · rust · L183-L193 (11 LOC)
crates/bubbaloop/src/cli/doctor.rs
    fn pass_with_details(check: &str, message: &str, details: serde_json::Value) -> Self {
        Self {
            check: check.to_string(),
            passed: true,
            message: message.to_string(),
            fix: None,
            fix_action: None,
            details: Some(details),
        }
    }
fail function · rust · L194-L204 (11 LOC)
crates/bubbaloop/src/cli/doctor.rs
    /// Builds a failing result with a manual fix hint and no automatic action.
    fn fail(check: &str, message: &str, fix: &str) -> Self {
        let check = check.to_owned();
        let message = message.to_owned();
        let fix = Some(fix.to_owned());
        Self {
            check,
            passed: false,
            message,
            fix,
            fix_action: None,
            details: None,
        }
    }
fail_with_action function · rust · L205-L215 (11 LOC)
crates/bubbaloop/src/cli/doctor.rs
    fn fail_with_action(check: &str, message: &str, fix: &str, action: FixAction) -> Self {
        Self {
            check: check.to_string(),
            passed: false,
            message: message.to_string(),
            fix: Some(fix.to_string()),
            fix_action: Some(action),
            details: None,
        }
    }
fail_with_details function · rust · L216-L231 (16 LOC)
crates/bubbaloop/src/cli/doctor.rs
    fn fail_with_details(
        check: &str,
        message: &str,
        fix: &str,
        details: serde_json::Value,
    ) -> Self {
        Self {
            check: check.to_string(),
            passed: false,
            message: message.to_string(),
            fix: Some(fix.to_string()),
            fix_action: None,
            details: Some(details),
        }
    }
run function · rust · L269-L423 (155 LOC)
crates/bubbaloop/src/cli/doctor.rs
pub async fn run(fix: bool, json: bool, check: &str) -> Result<()> {
    // Normalize check name
    let check_type = check.to_lowercase();

    if !json {
        if fix {
            println!("bubbaloop doctor --fix");
            println!("=====================");
        } else {
            println!("bubbaloop doctor");
            println!("================");
        }
        println!();
    }

    let mut results = Vec::new();
    let mut fixes_applied = 0;

    // Determine which checks to run
    let run_config = check_type == "all" || check_type == "config";
    let run_services = check_type == "all" || check_type == "zenoh";
    let run_zenoh = check_type == "all" || check_type == "zenoh";
    let run_daemon = check_type == "all" || check_type == "daemon";
    let run_security = check_type == "all" || check_type == "security";

    let mut session: Option<Session> = None;

    // 0. Check configuration files
    if run_config {
        if !json {
            println!("[0/
Powered by Repobility — scan your code at https://repobility.com
print_json_results function · rust · L424-L441 (18 LOC)
crates/bubbaloop/src/cli/doctor.rs
/// Prints the diagnostic results to stdout as pretty-printed JSON, consisting
/// of a `summary` object (total, passed, failed, fixes applied) and the
/// `checks` list.
fn print_json_results(results: &[DiagnosticResult], fixes_applied: usize) -> Result<()> {
    let total = results.len();
    let passed = results.iter().filter(|r| r.passed).count();
    // Every result is either passed or failed, so the remainder is the failures.
    let failed = total - passed;

    let summary = serde_json::json!({
        "total": total,
        "passed": passed,
        "failed": failed,
        "fixes_applied": fixes_applied,
    });
    let output = serde_json::json!({
        "summary": summary,
        "checks": results,
    });

    let rendered = serde_json::to_string_pretty(&output)?;
    println!("{}", rendered);
    Ok(())
}
print_human_results function · rust · L442-L512 (71 LOC)
crates/bubbaloop/src/cli/doctor.rs
fn print_human_results(
    results: &[DiagnosticResult],
    fixes_applied: usize,
    fix: bool,
) -> Result<()> {
    println!("Summary");
    println!("=======");
    println!();

    let mut issues_found = 0;

    for result in results {
        let symbol = if result.passed { "✓" } else { "✗" };
        println!("[{}] {}: {}", symbol, result.check, result.message);

        if !result.passed {
            issues_found += 1;
            if let Some(fix_hint) = &result.fix {
                if result.fix_action.is_some() {
                    println!("    → Auto-fixable: {}", fix_hint);
                } else {
                    println!("    → Fix: {}", fix_hint);
                }
            }
        }

        // Print details if available
        if let Some(details) = &result.details {
            if let Some(obj) = details.as_object() {
                for (key, value) in obj {
                    println!("    {} = {}", key, value);
                }
            }
    
apply_fixes function · rust · L515-L540 (26 LOC)
crates/bubbaloop/src/cli/doctor.rs
/// Runs the fix action of every failed result that has one and returns how
/// many fixes succeeded.
///
/// A successful fix marks the result as passed and appends ` (fixed)` to its
/// message; a failed fix is only reported on stdout.
async fn apply_fixes(results: &mut [DiagnosticResult]) -> usize {
    let mut applied = 0;

    for entry in results.iter_mut() {
        if entry.passed {
            continue;
        }
        let action = match entry.fix_action.clone() {
            Some(action) => action,
            None => continue,
        };

        println!("    → Fixing: {}", action.description());

        match action.execute().await {
            Err(e) => {
                println!("      ✗ Failed: {}", e);
            }
            Ok(msg) => {
                println!("      ✓ {}", msg);
                entry.passed = true;
                entry.message.push_str(" (fixed)");
                applied += 1;
            }
        }
    }

    applied
}
check_system_services function · rust · L646-L713 (68 LOC)
crates/bubbaloop/src/cli/doctor.rs
async fn check_system_services() -> Vec<DiagnosticResult> {
    let mut results = Vec::new();

    // Check zenohd
    let zenohd_running = is_process_running("zenohd").await;
    if zenohd_running {
        // Additional check: is port 7447 listening?
        let port_check = check_port(7447).await;
        if port_check {
            results.push(DiagnosticResult::pass("zenohd", "running on port 7447"));
        } else {
            results.push(DiagnosticResult::fail(
                "zenohd",
                "running but port 7447 not accessible",
                "Check if zenohd is configured to listen on tcp/127.0.0.1:7447",
            ));
        }
    } else {
        results.push(DiagnosticResult::fail_with_action(
            "zenohd",
            "not running",
            "Run: zenohd &",
            FixAction::StartZenohd,
        ));
    }

    // Check bubbaloop-daemon
    let daemon_service = check_systemd_service("bubbaloop-daemon.service").await;
    if daemon_servi
is_process_running function · rust · L714-L719 (6 LOC)
crates/bubbaloop/src/cli/doctor.rs
/// Reports whether `pgrep -x <name>` ran and exited successfully.
///
/// Returns `false` when `pgrep` cannot be run at all.
async fn is_process_running(name: &str) -> bool {
    match Command::new("pgrep").arg("-x").arg(name).output().await {
        Ok(out) => out.status.success(),
        Err(_) => false,
    }
}
check_port function · rust · L720-L726 (7 LOC)
crates/bubbaloop/src/cli/doctor.rs
/// Reports whether a TCP connection to `127.0.0.1:<port>` can be established.
async fn check_port(port: u16) -> bool {
    let address = format!("127.0.0.1:{}", port);
    match tokio::net::TcpStream::connect(address).await {
        Ok(_stream) => true,
        Err(_) => false,
    }
}
check_systemd_service function · rust · L727-L738 (12 LOC)
crates/bubbaloop/src/cli/doctor.rs
/// Returns the trimmed stdout of `systemctl --user is-active <service_name>`,
/// or `"unknown"` when the command cannot be run.
async fn check_systemd_service(service_name: &str) -> String {
    let invocation = Command::new("systemctl")
        .args(["--user", "is-active", service_name])
        .output()
        .await;

    invocation
        .map(|out| String::from_utf8_lossy(&out.stdout).trim().to_string())
        .unwrap_or_else(|_| "unknown".to_string())
}
get_zenoh_session function · rust · L739-L761 (23 LOC)
crates/bubbaloop/src/cli/doctor.rs
/// Opens a Zenoh session in client mode against the endpoint named by
/// `BUBBALOOP_ZENOH_ENDPOINT` (default `tcp/127.0.0.1:7447`).
///
/// # Errors
/// Returns an error when the mode or endpoint cannot be configured, or when
/// the session cannot be opened. Failures while disabling scouting are
/// ignored.
async fn get_zenoh_session() -> Result<Session> {
    let endpoint = match std::env::var("BUBBALOOP_ZENOH_ENDPOINT") {
        Ok(value) => value,
        Err(_) => "tcp/127.0.0.1:7447".to_string(),
    };
    let endpoints_json = format!("[\"{}\"]", endpoint);

    let mut config = zenoh::Config::default();

    if let Err(e) = config.insert_json5("mode", "\"client\"") {
        return Err(anyhow!("Failed to configure client mode: {}", e));
    }
    if let Err(e) = config.insert_json5("connect/endpoints", &endpoints_json) {
        return Err(anyhow!("Failed to configure endpoint: {}", e));
    }

    // Scouting is switched off on a best-effort basis: failures are discarded.
    for key in ["scouting/multicast/enabled", "scouting/gossip/enabled"].iter() {
        let _ = config.insert_json5(key, "false");
    }

    match zenoh::open(config).await {
        Ok(session) => Ok(session),
        Err(e) => Err(anyhow!("Failed to open Zenoh session: {}", e)),
    }
}
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
check_zenoh_comprehensive function · rust · L764-L912 (149 LOC)
crates/bubbaloop/src/cli/doctor.rs
async fn check_zenoh_comprehensive() -> Vec<DiagnosticResult> {
    let mut results = Vec::new();

    let endpoint = std::env::var("BUBBALOOP_ZENOH_ENDPOINT")
        .unwrap_or_else(|_| "tcp/127.0.0.1:7447".to_string());

    // Parse endpoint to get host and port
    let (host, port) = parse_zenoh_endpoint(&endpoint);

    // Check 1: Can we create a session?
    let session_result = get_zenoh_session().await;
    match &session_result {
        Ok(session) => {
            let session_id = session.zid().to_string();
            results.push(DiagnosticResult::pass_with_details(
                "Zenoh connection",
                &format!("connected to {}", endpoint),
                serde_json::json!({
                    "endpoint": endpoint,
                    "session_id": session_id,
                    "mode": "client",
                }),
            ));

            // Check 2: Can we declare a test queryable?
            match session.declare_queryable("bubbaloop/doctor/tes
parse_zenoh_endpoint function · rust · L913-L925 (13 LOC)
crates/bubbaloop/src/cli/doctor.rs
/// Extracts the host and port from a Zenoh TCP endpoint such as
/// `tcp/127.0.0.1:7447`.
///
/// Anything after the locator — a `?metadata` or `#config` suffix, as in
/// `tcp/10.0.0.5:7447#iface=eth0` — is ignored, so such endpoints resolve to
/// their real host and port instead of the fallback.
///
/// Returns `("127.0.0.1", 7447)` when the endpoint does not start with
/// `tcp/`, has no `:` separator, or its port is not a valid `u16`.
fn parse_zenoh_endpoint(endpoint: &str) -> (String, u16) {
    // Keep only the `<proto>/<address>` part; `split` always yields a first item.
    let locator = endpoint
        .split(|c| c == '?' || c == '#')
        .next()
        .unwrap_or(endpoint);

    locator
        .strip_prefix("tcp/")
        // Split on the last ':' so bracketed IPv6 hosts keep their inner colons.
        .and_then(|address| address.rsplit_once(':'))
        .and_then(|(host, port)| {
            port.parse::<u16>()
                .ok()
                .map(|port| (host.to_string(), port))
        })
        .unwrap_or_else(|| ("127.0.0.1".to_string(), 7447))
}
check_daemon_health function · rust · L926-L970 (45 LOC)
crates/bubbaloop/src/cli/doctor.rs
async fn check_daemon_health(session: &Session) -> Vec<DiagnosticResult> {
    let mut results = Vec::new();

    // Query health endpoint
    match query_with_timeout::<HealthResponse>(session, "bubbaloop/daemon/api/health").await {
        Ok(response) => {
            if response.status == "ok" {
                results.push(DiagnosticResult::pass("Daemon health", "ok"));
            } else {
                results.push(DiagnosticResult::fail(
                    "Daemon health",
                    &format!("unexpected status: {}", response.status),
                    "Run: systemctl --user restart bubbaloop-daemon",
                ));
            }
        }
        Err(e) => {
            results.push(DiagnosticResult::fail(
                "Daemon health",
                &format!("query failed: {}", e),
                "Check if daemon is connected to the same Zenoh router. Run: systemctl --user status bubbaloop-daemon",
            ));
        }
    }

    // Query nodes e
page 1 / 14next ›