Function bodies 699 total
find_proto_files function · rust · L4-L17 (14 LOC)crates/bubbaloop/build.rs
/// Recursively collects every `.proto` file found under `dir`.
///
/// Directories that cannot be read (including a missing `dir`) and entries
/// that fail to enumerate are skipped, so the worst case is an empty list.
/// Each returned path is rendered with `to_string_lossy`.
///
/// The walk recurses on `Path` values instead of round-tripping through
/// `&str`, so a sub-directory whose name is not valid UTF-8 is traversed
/// rather than panicking the build script.
fn find_proto_files(dir: &str) -> Vec<String> {
    // Inner walker that works on `Path` so no lossy/fallible string
    // conversion is needed just to descend into a directory.
    fn walk(dir: &std::path::Path, found: &mut Vec<String>) {
        if let Ok(entries) = std::fs::read_dir(dir) {
            for entry in entries.flatten() {
                let path = entry.path();
                if path.is_dir() {
                    walk(&path, found);
                } else if path.extension().and_then(|ext| ext.to_str()) == Some("proto") {
                    found.push(path.to_string_lossy().into_owned());
                }
            }
        }
    }

    let mut proto_files = Vec::new();
    walk(std::path::Path::new(dir), &mut proto_files);
    proto_files
}
// main function · rust · L18-L54 (37 LOC)crates/bubbaloop/build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
let out_dir = PathBuf::from(std::env::var("OUT_DIR")?);
let protos_dir = PathBuf::from("../bubbaloop-schemas/protos");
// Bubbaloop protobuf files
let proto_files = find_proto_files(protos_dir.to_str().unwrap());
// Compile all proto files
let mut prost_build = prost_build::Config::new();
prost_build.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]");
prost_build.file_descriptor_set_path(out_dir.join("descriptor.bin"));
prost_build.compile_protos(&proto_files, &[protos_dir.to_string_lossy().as_ref()])?;
// Rerun if any proto file changes
for proto_file in &proto_files {
println!("cargo:rerun-if-changed={}", proto_file);
}
println!("cargo:rerun-if-changed={}", protos_dir.to_string_lossy());
// Rebuild if any dashboard dist file changes (for rust-embed)
fn watch_dir(dir: &str) {
if let Ok(entries) = std::fs::read_dir(dir) {
watch_dir function · rust · L39-L50 (12 LOC)crates/bubbaloop/build.rs
/// Prints a `cargo:rerun-if-changed=<path>` line for every file below `dir`.
///
/// Unreadable directories (including a missing `dir`) are silently skipped.
/// The recursion works on `Path` values, so a sub-directory whose name is not
/// valid UTF-8 is walked instead of panicking on `to_str().unwrap()`.
fn watch_dir(dir: &str) {
    // Inner walker operating on `Path` to avoid any string conversion when
    // descending into sub-directories.
    fn visit(dir: &std::path::Path) {
        if let Ok(entries) = std::fs::read_dir(dir) {
            for entry in entries.flatten() {
                let path = entry.path();
                if path.is_dir() {
                    visit(&path);
                } else {
                    println!("cargo:rerun-if-changed={}", path.display());
                }
            }
        }
    }

    visit(std::path::Path::new(dir));
}
// load_config function · rust · L4-L10 (7 LOC)crates/bubbaloop-node-sdk/src/config.rs
pub fn load_config<C: serde::de::DeserializeOwned>(path: &Path) -> anyhow::Result<C> {
let content = std::fs::read_to_string(path)
.map_err(|e| anyhow::anyhow!("Failed to read config '{}': {}", path.display(), e))?;
let config: C = serde_yaml::from_str(&content)
.map_err(|e| anyhow::anyhow!("Failed to parse config '{}': {}", path.display(), e))?;
Ok(config)
}test_load_valid_config function · rust · L24-L31 (8 LOC)crates/bubbaloop-node-sdk/src/config.rs
fn test_load_valid_config() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("config.yaml");
std::fs::write(&path, "name: test\nrate_hz: 10.0\n").unwrap();
let config: TestConfig = load_config(&path).unwrap();
assert_eq!(config.name, "test");
assert!((config.rate_hz - 10.0).abs() < f64::EPSILON);
}test_load_invalid_yaml function · rust · L40-L46 (7 LOC)crates/bubbaloop-node-sdk/src/config.rs
fn test_load_invalid_yaml() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("bad.yaml");
std::fs::write(&path, "not: [valid: yaml: {{").unwrap();
let result: anyhow::Result<TestConfig> = load_config(&path);
assert!(result.is_err());
}init function · rust · L67-L87 (21 LOC)crates/bubbaloop-node-sdk/src/lib.rs
async fn init(ctx: &NodeContext, config: &Self::Config) -> anyhow::Result<Self>
where
Self: Sized;
/// Main loop. Called after init(). Must respect ctx.shutdown_rx for graceful exit.
/// When the shutdown signal fires, this method should return Ok(()).
async fn run(self, ctx: NodeContext) -> anyhow::Result<()>;
}
/// Built-in CLI arguments handled by the SDK.
// NOTE(review): with `argh::FromArgs`, the `///` comments on the fields below
// presumably double as the generated help text — confirm before rewording them.
#[derive(argh::FromArgs)]
#[argh(description = "Bubbaloop node")]
struct SdkArgs {
/// path to configuration file
// Option with short flag `-c`; when the flag is omitted the value comes from
// the `default_config_path()` expression named in the attribute.
#[argh(option, short = 'c', default = "default_config_path()")]
config: PathBuf,
/// zenoh endpoint to connect to
// Option with short flag `-e`; typed `Option<String>`, so it stays `None`
// when the flag is not supplied.
#[argh(option, short = 'e')]
endpoint: Option<String>,
}Powered by Repobility — scan your code at https://repobility.com
run_node function · rust · L98-L162 (65 LOC)crates/bubbaloop-node-sdk/src/lib.rs
pub async fn run_node<N: Node>() -> anyhow::Result<()> {
// 1. Init logging
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
// 2. Parse CLI args
let args: SdkArgs = argh::from_env();
// 3. Load config
let node_config: N::Config = config::load_config(&args.config)?;
log::info!(
"{}: config loaded from {}",
N::name(),
args.config.display()
);
// 4. Resolve scope + machine_id
let scope = std::env::var("BUBBALOOP_SCOPE").unwrap_or_else(|_| "local".to_string());
let machine_id = std::env::var("BUBBALOOP_MACHINE_ID")
.unwrap_or_else(|_| {
hostname::get()
.map(|h| h.to_string_lossy().to_string())
.unwrap_or_else(|_| "unknown".to_string())
})
.replace('-', "_");
log::info!("Scope: {}, Machine ID: {}", scope, machine_id);
// 5. Setup shutdown channel
let (shutdown_tx, _) = shutdown::setup_shutdown()?;
declare_schema_queryable function · rust · L8-L33 (26 LOC)crates/bubbaloop-node-sdk/src/schema.rs
pub async fn declare_schema_queryable(
session: &Arc<zenoh::Session>,
scope: &str,
machine_id: &str,
node_name: &str,
descriptor: &'static [u8],
) -> anyhow::Result<zenoh::query::Queryable<()>> {
let schema_key = format!("bubbaloop/{}/{}/{}/schema", scope, machine_id, node_name);
let queryable = session
.declare_queryable(&schema_key)
.callback({
let descriptor = descriptor.to_vec();
move |query| {
log::debug!("Schema query received");
if let Err(e) = query.reply(&query.key_expr().clone(), descriptor.as_slice()) {
log::warn!("Failed to reply to schema query: {}", e);
}
}
})
.await
.map_err(|e| anyhow::anyhow!("Failed to create schema queryable: {}", e))?;
log::info!("Schema queryable: {}", schema_key);
Ok(queryable)
}setup_shutdown function · rust · L6-L14 (9 LOC)crates/bubbaloop-node-sdk/src/shutdown.rs
pub fn setup_shutdown() -> anyhow::Result<(watch::Sender<()>, watch::Receiver<()>)> {
let (tx, rx) = watch::channel(());
let shutdown_tx = tx.clone();
ctrlc::set_handler(move || {
log::info!("Shutdown signal received");
let _ = shutdown_tx.send(());
})?;
Ok((tx, rx))
}open_zenoh_session function · rust · L10-L41 (32 LOC)crates/bubbaloop-node-sdk/src/zenoh_session.rs
pub async fn open_zenoh_session(endpoint: &Option<String>) -> anyhow::Result<Arc<zenoh::Session>> {
let endpoint = std::env::var("ZENOH_ENDPOINT")
.or_else(|_| std::env::var("BUBBALOOP_ZENOH_ENDPOINT"))
.ok()
.or_else(|| endpoint.clone())
.unwrap_or_else(|| "tcp/127.0.0.1:7447".to_string());
log::info!("Connecting to Zenoh at: {}", endpoint);
let mut config = zenoh::Config::default();
// Client mode is mandatory — peer mode doesn't route through zenohd router
config
.insert_json5("mode", r#""client""#)
.map_err(|e| anyhow::anyhow!("Failed to set Zenoh mode: {}", e))?;
config
.insert_json5("connect/endpoints", &format!(r#"["{}"]"#, endpoint))
.map_err(|e| anyhow::anyhow!("Failed to set Zenoh endpoint: {}", e))?;
// Disable scouting to prevent connecting to remote peers via Tailscale/VPN
config
.insert_json5("scouting/multicast/enabled", "false")
.map_err(|e| anyhow::anyhow!(main function · rust · L2-L31 (30 LOC)crates/bubbaloop-schemas/build.rs
fn main() -> Result<(), Box<dyn std::error::Error>> {
let out_dir = PathBuf::from(std::env::var("OUT_DIR")?);
let protos_dir = PathBuf::from("protos");
let mut proto_files = Vec::new();
for entry in std::fs::read_dir(&protos_dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("proto") {
proto_files.push(path.to_string_lossy().into_owned());
}
}
if proto_files.is_empty() {
return Ok(());
}
let mut config = prost_build::Config::new();
config.type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]");
config.file_descriptor_set_path(out_dir.join("descriptor.bin"));
config.compile_protos(&proto_files, &[protos_dir.to_string_lossy().as_ref()])?;
for proto_file in &proto_files {
println!("cargo:rerun-if-changed={}", proto_file);
}
println!("cargo:rerun-if-changed=protos");
Ok(())
}test_parse_valid_yaml function · rust · L39-L45 (7 LOC)crates/bubbaloop-schemas/src/config.rs
fn test_parse_valid_yaml() {
let yaml = "topics:\n - /camera/cam0/compressed\n - /weather/current\n";
let config = TopicsConfig::parse(yaml).unwrap();
assert_eq!(config.topics.len(), 2);
assert_eq!(config.topics[0], "/camera/cam0/compressed");
assert_eq!(config.topics[1], "/weather/current");
}test_parse_invalid_yaml function · rust · L55-L60 (6 LOC)crates/bubbaloop-schemas/src/config.rs
fn test_parse_invalid_yaml() {
let result = TopicsConfig::parse("not: valid: yaml: [");
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err, ConfigError::ParseError(_)));
}test_from_file_nonexistent function · rust · L69-L74 (6 LOC)crates/bubbaloop-schemas/src/config.rs
fn test_from_file_nonexistent() {
let result = TopicsConfig::from_file("/nonexistent/path.yaml");
assert!(result.is_err());
let err = result.unwrap_err();
assert!(matches!(err, ConfigError::IoError(_)));
}Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
new function · rust · L60-L65 (6 LOC)crates/bubbaloop-schemas/src/lib.rs
pub fn new(descriptor_bytes: Vec<u8>, schema_name: &str) -> Self {
Self {
descriptor_bytes,
schema_name: schema_name.to_string(),
}
}get_descriptor_pool function · rust · L70-L76 (7 LOC)crates/bubbaloop-schemas/src/lib.rs
fn get_descriptor_pool() -> &'static DescriptorPool {
DESCRIPTOR_POOL.get_or_init(|| {
DescriptorPool::decode(DESCRIPTOR)
.expect("Failed to decode FileDescriptorSet into DescriptorPool")
})
}extract_message_descriptor function · rust · L79-L128 (50 LOC)crates/bubbaloop-schemas/src/lib.rs
fn extract_message_descriptor(type_name: &str) -> Result<Vec<u8>, prost::DecodeError> {
let pool = get_descriptor_pool();
let message_descriptor = pool.get_message_by_name(type_name).ok_or_else(|| {
prost::DecodeError::new(format!(
"Message type '{}' not found in descriptor pool",
type_name
))
})?;
let file_descriptor = message_descriptor.parent_file();
let mut collected_files = std::collections::HashSet::new();
let mut files_to_include = Vec::new();
fn collect_file_and_deps(
file: &FileDescriptor,
collected: &mut std::collections::HashSet<String>,
result: &mut Vec<prost_types::FileDescriptorProto>,
) {
let file_name = file.name();
if collected.contains(file_name) {
return;
}
collected.insert(file_name.to_string());
result.push(file.file_descriptor_collect_file_and_deps function · rust · L93-L110 (18 LOC)crates/bubbaloop-schemas/src/lib.rs
fn collect_file_and_deps(
file: &FileDescriptor,
collected: &mut std::collections::HashSet<String>,
result: &mut Vec<prost_types::FileDescriptorProto>,
) {
let file_name = file.name();
if collected.contains(file_name) {
return;
}
collected.insert(file_name.to_string());
result.push(file.file_descriptor_proto().clone());
for dep_file in file.dependencies() {
collect_file_and_deps(&dep_file, collected, result);
}
}get_descriptor_for_message function · rust · L134-L139 (6 LOC)crates/bubbaloop-schemas/src/lib.rs
pub fn get_descriptor_for_message<T: MessageTypeName>(
) -> Result<MessageDescriptor, prost::DecodeError> {
let type_name = T::type_name();
let descriptor_bytes = extract_message_descriptor(type_name)?;
Ok(MessageDescriptor::new(descriptor_bytes, type_name))
}test_header_roundtrip function · rust · L163-L177 (15 LOC)crates/bubbaloop-schemas/src/lib.rs
fn test_header_roundtrip() {
let header = Header {
acq_time: 1000,
pub_time: 2000,
sequence: 42,
frame_id: "cam0".into(),
machine_id: "jetson1".into(),
scope: "default".into(),
};
let bytes = header.encode_to_vec();
let decoded = Header::decode(bytes.as_slice()).unwrap();
assert_eq!(decoded.sequence, 42);
assert_eq!(decoded.frame_id, "cam0");
assert_eq!(decoded.scope, "default");
}test_header_serde_json_roundtrip function · rust · L188-L198 (11 LOC)crates/bubbaloop-schemas/src/lib.rs
fn test_header_serde_json_roundtrip() {
let header = Header {
sequence: 99,
frame_id: "test".into(),
..Default::default()
};
let json = serde_json::to_string(&header).unwrap();
let decoded: Header = serde_json::from_str(&json).unwrap();
assert_eq!(decoded.sequence, 99);
assert_eq!(decoded.frame_id, "test");
}print_json_envelope function · rust · L124-L143 (20 LOC)crates/bubbaloop/src/cli/debug.rs
fn print_json_envelope(
envelope: &serde_json::Value,
payload_key: &str,
payload: &[u8],
) -> Result<()> {
let mut obj = envelope.as_object().cloned().unwrap_or_default();
if let Ok(json_val) = serde_json::from_slice::<serde_json::Value>(payload) {
obj.insert(payload_key.to_string(), json_val);
} else {
obj.insert(
payload_key.to_string(),
serde_json::Value::String(String::from_utf8_lossy(payload).into_owned()),
);
}
println!(
"{}",
serde_json::to_string_pretty(&serde_json::Value::Object(obj))?
);
Ok(())
}Repobility · code-quality intelligence platform · https://repobility.com
run function · rust · L146-L167 (22 LOC)crates/bubbaloop/src/cli/debug.rs
pub async fn run(self) -> Result<()> {
match self.action {
None => {
Self::print_help();
Ok(())
}
Some(DebugAction::Topics(args)) => {
log::warn!("Note: 'bubbaloop debug topics' is deprecated. Use MCP tool 'list_topics' instead.");
list_topics(args).await
}
Some(DebugAction::Subscribe(args)) => {
log::warn!("Note: 'bubbaloop debug subscribe' is deprecated. Use MCP tool 'subscribe_topic' instead.");
subscribe_topic(args).await
}
Some(DebugAction::Query(args)) => {
log::warn!("Note: 'bubbaloop debug query' is deprecated. Use MCP tool 'query_zenoh' instead.");
query_endpoint(args).await
}
Some(DebugAction::Info(args)) => show_info(args).await,
Some(DebugAction::Liveliness(args)) => query_liveliness(args).await,
}
}print_help function · rust · L168-L179 (12 LOC)crates/bubbaloop/src/cli/debug.rs
/// Writes the overview of the `debug` sub-commands to stderr.
fn print_help() {
    // Each entry is emitted as one `eprintln!`, so a trailing "\n" inside an
    // entry produces a blank line after it.
    let lines = [
        "Debug commands for Zenoh network inspection\n",
        "Usage: bubbaloop debug <command>\n",
        "Commands:",
        " info Show Zenoh connection information",
        " topics List all active Zenoh topics",
        " query Query a Zenoh queryable endpoint",
        " subscribe Subscribe to a Zenoh topic and watch messages",
        " liveliness Query liveliness tokens (entity discovery)",
        "\nRun 'bubbaloop debug <command> --help' for more information.",
    ];
    for line in lines.iter() {
        eprintln!("{}", line);
    }
}
// get_zenoh_session function · rust · L181-L208 (28 LOC)crates/bubbaloop/src/cli/debug.rs
async fn get_zenoh_session() -> Result<zenoh::Session> {
let mut config = zenoh::Config::default();
// Run as client mode
config
.insert_json5("mode", "\"client\"")
.map_err(|e| DebugError::Zenoh(e.to_string()))?;
let endpoint = std::env::var("BUBBALOOP_ZENOH_ENDPOINT")
.unwrap_or_else(|_| "tcp/127.0.0.1:7447".to_string());
config
.insert_json5("connect/endpoints", &format!("[\"{}\"]", endpoint))
.map_err(|e| DebugError::Zenoh(e.to_string()))?;
// Disable scouting
config
.insert_json5("scouting/multicast/enabled", "false")
.map_err(|e| DebugError::Zenoh(e.to_string()))?;
config
.insert_json5("scouting/gossip/enabled", "false")
.map_err(|e| DebugError::Zenoh(e.to_string()))?;
let session = zenoh::open(config)
.await
.map_err(|e| DebugError::Zenoh(e.to_string()))?;
Ok(session)
}subscribe_topic function · rust · L274-L331 (58 LOC)crates/bubbaloop/src/cli/debug.rs
async fn subscribe_topic(args: SubscribeArgs) -> Result<()> {
let session = get_zenoh_session().await?;
println!("Subscribing to: {}", args.topic);
println!("Press Ctrl+C to stop...\n");
let subscriber = session
.declare_subscriber(&args.topic)
.await
.map_err(|e| DebugError::Zenoh(e.to_string()))?;
loop {
let sample = subscriber
.recv_async()
.await
.map_err(|e| DebugError::Zenoh(e.to_string()))?;
let key = sample.key_expr().to_string();
let payload = sample.payload().to_bytes();
let timestamp = sample
.timestamp()
.map(|t| t.to_string())
.unwrap_or_else(|| "N/A".to_string());
if args.json {
print_json_envelope(
&json!({"key": key, "timestamp": timestamp}),
"payload",
&payload,
)?;
} else {
println!("[{}] {}", timestamp, key);
query_endpoint function · rust · L332-L423 (92 LOC)crates/bubbaloop/src/cli/debug.rs
async fn query_endpoint(args: QueryArgs) -> Result<()> {
let session = get_zenoh_session().await?;
if !args.json {
println!("Querying: {}", args.key);
println!("Timeout: {}s\n", args.timeout);
}
let mut get_builder = session
.get(&args.key)
.target(QueryTarget::All)
.timeout(Duration::from_secs(args.timeout));
if let Some(ref payload) = args.payload {
get_builder = get_builder.payload(payload.clone());
}
let replies: Vec<_> = get_builder
.await
.map_err(|e| DebugError::Zenoh(e.to_string()))?
.into_iter()
.collect();
if replies.is_empty() {
if args.json {
println!(
"{}",
serde_json::to_string_pretty(&json!({
"error": "No replies received",
"key": args.key
}))?
);
} else {
println!("No replies received. Endpoint may not exist or is noshow_info function · rust · L424-L467 (44 LOC)crates/bubbaloop/src/cli/debug.rs
async fn show_info(args: InfoArgs) -> Result<()> {
let session = get_zenoh_session().await?;
let endpoint = std::env::var("BUBBALOOP_ZENOH_ENDPOINT")
.unwrap_or_else(|_| "tcp/127.0.0.1:7447".to_string());
let session_id = session.zid().to_string();
if args.json {
println!(
"{}",
serde_json::to_string_pretty(&json!({
"endpoint": endpoint,
"session_id": session_id,
"mode": "client",
"status": "connected"
}))?
);
} else {
println!("Zenoh Connection Info");
println!("{}", "=".repeat(80));
println!("Endpoint: {}", endpoint);
println!("Session ID: {}", session_id);
println!("Mode: client");
println!("Status: connected");
println!("\nEnvironment:");
println!(
" BUBBALOOP_ZENOH_ENDPOINT: {}",
std::env::var("BUBBALOOP_ZENOH_ENDPOINT")
query_liveliness function · rust · L468-L570 (103 LOC)crates/bubbaloop/src/cli/debug.rs
async fn query_liveliness(args: LivelinessArgs) -> Result<()> {
let session = get_zenoh_session().await?;
if !args.json {
println!("Querying liveliness tokens: {}", args.pattern);
println!("Timeout: {}s\n", args.timeout);
}
let replies: Vec<_> = session
.liveliness()
.get(&args.pattern)
.timeout(Duration::from_secs(args.timeout))
.await
.map_err(|e| DebugError::Zenoh(e.to_string()))?
.into_iter()
.collect();
let mut tokens: Vec<String> = Vec::new();
for reply in &replies {
if let Ok(sample) = reply.result() {
tokens.push(sample.key_expr().to_string());
}
}
tokens.sort();
if args.json {
// Parse liveliness tokens into structured data
let parsed: Vec<serde_json::Value> = tokens
.iter()
.map(|t| {
let parts: Vec<&str> = t.split('/').collect();
// @ros2_lv/<domain>/<zid>/<nid>/<eitest_debug_error_display function · rust · L577-L583 (7 LOC)crates/bubbaloop/src/cli/debug.rs
fn test_debug_error_display() {
let err = DebugError::Timeout;
assert_eq!(err.to_string(), "Timeout waiting for response");
let err = DebugError::Zenoh("connection failed".to_string());
assert_eq!(err.to_string(), "Zenoh error: connection failed");
}Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
description function · rust · L32-L43 (12 LOC)crates/bubbaloop/src/cli/doctor.rs
fn description(&self) -> &'static str {
match self {
FixAction::StartZenohd => "Start zenohd router",
FixAction::StartDaemonService => "Start bubbaloop-daemon service",
FixAction::RestartDaemonService => "Restart bubbaloop-daemon service",
FixAction::StartBridgeService => "Start zenoh-bridge service",
FixAction::CreateZenohConfig => "Create Zenoh config file",
FixAction::CreateMarketplaceSources => {
"Create marketplace sources with official registry"
}
}
}execute function · rust · L44-L156 (113 LOC)crates/bubbaloop/src/cli/doctor.rs
async fn execute(&self) -> Result<String> {
match self {
FixAction::StartZenohd => {
// Start zenohd in background
let mut cmd = Command::new("zenohd");
cmd.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null());
let child = cmd.spawn()?;
// Detach the process
std::mem::forget(child);
// Wait a moment for it to start
tokio::time::sleep(Duration::from_millis(500)).await;
// Verify it started
if is_process_running("zenohd").await {
Ok("zenohd started successfully".to_string())
} else {
Err(anyhow!("Failed to start zenohd"))
}
}
FixAction::StartDaemonService => {
let output = Command::new("systemctl")
.args(["--user", "start", "bubbalooppass function · rust · L173-L182 (10 LOC)crates/bubbaloop/src/cli/doctor.rs
fn pass(check: &str, message: &str) -> Self {
Self {
check: check.to_string(),
passed: true,
message: message.to_string(),
fix: None,
fix_action: None,
details: None,
}
}pass_with_details function · rust · L183-L193 (11 LOC)crates/bubbaloop/src/cli/doctor.rs
fn pass_with_details(check: &str, message: &str, details: serde_json::Value) -> Self {
Self {
check: check.to_string(),
passed: true,
message: message.to_string(),
fix: None,
fix_action: None,
details: Some(details),
}
}fail function · rust · L194-L204 (11 LOC)crates/bubbaloop/src/cli/doctor.rs
fn fail(check: &str, message: &str, fix: &str) -> Self {
Self {
check: check.to_string(),
passed: false,
message: message.to_string(),
fix: Some(fix.to_string()),
fix_action: None,
details: None,
}
}fail_with_action function · rust · L205-L215 (11 LOC)crates/bubbaloop/src/cli/doctor.rs
fn fail_with_action(check: &str, message: &str, fix: &str, action: FixAction) -> Self {
Self {
check: check.to_string(),
passed: false,
message: message.to_string(),
fix: Some(fix.to_string()),
fix_action: Some(action),
details: None,
}
}fail_with_details function · rust · L216-L231 (16 LOC)crates/bubbaloop/src/cli/doctor.rs
fn fail_with_details(
check: &str,
message: &str,
fix: &str,
details: serde_json::Value,
) -> Self {
Self {
check: check.to_string(),
passed: false,
message: message.to_string(),
fix: Some(fix.to_string()),
fix_action: None,
details: Some(details),
}
}run function · rust · L269-L423 (155 LOC)crates/bubbaloop/src/cli/doctor.rs
pub async fn run(fix: bool, json: bool, check: &str) -> Result<()> {
// Normalize check name
let check_type = check.to_lowercase();
if !json {
if fix {
println!("bubbaloop doctor --fix");
println!("=====================");
} else {
println!("bubbaloop doctor");
println!("================");
}
println!();
}
let mut results = Vec::new();
let mut fixes_applied = 0;
// Determine which checks to run
let run_config = check_type == "all" || check_type == "config";
let run_services = check_type == "all" || check_type == "zenoh";
let run_zenoh = check_type == "all" || check_type == "zenoh";
let run_daemon = check_type == "all" || check_type == "daemon";
let run_security = check_type == "all" || check_type == "security";
let mut session: Option<Session> = None;
// 0. Check configuration files
if run_config {
if !json {
println!("[0/Powered by Repobility — scan your code at https://repobility.com
print_json_results function · rust · L424-L441 (18 LOC)crates/bubbaloop/src/cli/doctor.rs
fn print_json_results(results: &[DiagnosticResult], fixes_applied: usize) -> Result<()> {
let passed = results.iter().filter(|r| r.passed).count();
let failed = results.iter().filter(|r| !r.passed).count();
let output = serde_json::json!({
"summary": {
"total": results.len(),
"passed": passed,
"failed": failed,
"fixes_applied": fixes_applied,
},
"checks": results,
});
println!("{}", serde_json::to_string_pretty(&output)?);
Ok(())
}print_human_results function · rust · L442-L512 (71 LOC)crates/bubbaloop/src/cli/doctor.rs
fn print_human_results(
results: &[DiagnosticResult],
fixes_applied: usize,
fix: bool,
) -> Result<()> {
println!("Summary");
println!("=======");
println!();
let mut issues_found = 0;
for result in results {
let symbol = if result.passed { "✓" } else { "✗" };
println!("[{}] {}: {}", symbol, result.check, result.message);
if !result.passed {
issues_found += 1;
if let Some(fix_hint) = &result.fix {
if result.fix_action.is_some() {
println!(" → Auto-fixable: {}", fix_hint);
} else {
println!(" → Fix: {}", fix_hint);
}
}
}
// Print details if available
if let Some(details) = &result.details {
if let Some(obj) = details.as_object() {
for (key, value) in obj {
println!(" {} = {}", key, value);
}
}
apply_fixes function · rust · L515-L540 (26 LOC)crates/bubbaloop/src/cli/doctor.rs
async fn apply_fixes(results: &mut [DiagnosticResult]) -> usize {
let mut fixes_applied = 0;
for result in results.iter_mut() {
if result.passed || result.fix_action.is_none() {
continue;
}
let action = result.fix_action.clone().unwrap();
println!(" → Fixing: {}", action.description());
match action.execute().await {
Ok(msg) => {
println!(" ✓ {}", msg);
result.passed = true;
result.message = format!("{} (fixed)", result.message);
fixes_applied += 1;
}
Err(e) => {
println!(" ✗ Failed: {}", e);
}
}
}
fixes_applied
}check_system_services function · rust · L646-L713 (68 LOC)crates/bubbaloop/src/cli/doctor.rs
async fn check_system_services() -> Vec<DiagnosticResult> {
let mut results = Vec::new();
// Check zenohd
let zenohd_running = is_process_running("zenohd").await;
if zenohd_running {
// Additional check: is port 7447 listening?
let port_check = check_port(7447).await;
if port_check {
results.push(DiagnosticResult::pass("zenohd", "running on port 7447"));
} else {
results.push(DiagnosticResult::fail(
"zenohd",
"running but port 7447 not accessible",
"Check if zenohd is configured to listen on tcp/127.0.0.1:7447",
));
}
} else {
results.push(DiagnosticResult::fail_with_action(
"zenohd",
"not running",
"Run: zenohd &",
FixAction::StartZenohd,
));
}
// Check bubbaloop-daemon
let daemon_service = check_systemd_service("bubbaloop-daemon.service").await;
if daemon_serviis_process_running function · rust · L714-L719 (6 LOC)crates/bubbaloop/src/cli/doctor.rs
async fn is_process_running(name: &str) -> bool {
let output = Command::new("pgrep").arg("-x").arg(name).output().await;
matches!(output, Ok(out) if out.status.success())
}check_port function · rust · L720-L726 (7 LOC)crates/bubbaloop/src/cli/doctor.rs
async fn check_port(port: u16) -> bool {
// Try to connect to the port
tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
.await
.is_ok()
}check_systemd_service function · rust · L727-L738 (12 LOC)crates/bubbaloop/src/cli/doctor.rs
async fn check_systemd_service(service_name: &str) -> String {
let output = Command::new("systemctl")
.args(["--user", "is-active", service_name])
.output()
.await;
match output {
Ok(out) => String::from_utf8_lossy(&out.stdout).trim().to_string(),
Err(_) => "unknown".to_string(),
}
}get_zenoh_session function · rust · L739-L761 (23 LOC)crates/bubbaloop/src/cli/doctor.rs
async fn get_zenoh_session() -> Result<Session> {
let mut config = zenoh::Config::default();
config
.insert_json5("mode", "\"client\"")
.map_err(|e| anyhow!("Failed to configure client mode: {}", e))?;
let endpoint = std::env::var("BUBBALOOP_ZENOH_ENDPOINT")
.unwrap_or_else(|_| "tcp/127.0.0.1:7447".to_string());
config
.insert_json5("connect/endpoints", &format!("[\"{}\"]", endpoint))
.map_err(|e| anyhow!("Failed to configure endpoint: {}", e))?;
// Disable scouting
let _ = config.insert_json5("scouting/multicast/enabled", "false");
let _ = config.insert_json5("scouting/gossip/enabled", "false");
zenoh::open(config)
.await
.map_err(|e| anyhow!("Failed to open Zenoh session: {}", e))
}Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
check_zenoh_comprehensive function · rust · L764-L912 (149 LOC)crates/bubbaloop/src/cli/doctor.rs
async fn check_zenoh_comprehensive() -> Vec<DiagnosticResult> {
let mut results = Vec::new();
let endpoint = std::env::var("BUBBALOOP_ZENOH_ENDPOINT")
.unwrap_or_else(|_| "tcp/127.0.0.1:7447".to_string());
// Parse endpoint to get host and port
let (host, port) = parse_zenoh_endpoint(&endpoint);
// Check 1: Can we create a session?
let session_result = get_zenoh_session().await;
match &session_result {
Ok(session) => {
let session_id = session.zid().to_string();
results.push(DiagnosticResult::pass_with_details(
"Zenoh connection",
&format!("connected to {}", endpoint),
serde_json::json!({
"endpoint": endpoint,
"session_id": session_id,
"mode": "client",
}),
));
// Check 2: Can we declare a test queryable?
match session.declare_queryable("bubbaloop/doctor/tesparse_zenoh_endpoint function · rust · L913-L925 (13 LOC)crates/bubbaloop/src/cli/doctor.rs
/// Splits a `tcp/<host>:<port>` endpoint into its host and port.
///
/// The port is taken after the last `:`. Anything that does not match that
/// shape (other prefix, no `:`, or a port that is not a valid `u16`) yields
/// the default `("127.0.0.1", 7447)`.
fn parse_zenoh_endpoint(endpoint: &str) -> (String, u16) {
    endpoint
        .strip_prefix("tcp/")
        .and_then(|address| address.rsplit_once(':'))
        .and_then(|(host, port_text)| {
            port_text
                .parse::<u16>()
                .ok()
                .map(|port| (host.to_string(), port))
        })
        // Default
        .unwrap_or_else(|| ("127.0.0.1".to_string(), 7447))
}
// check_daemon_health function · rust · L926-L970 (45 LOC)crates/bubbaloop/src/cli/doctor.rs
async fn check_daemon_health(session: &Session) -> Vec<DiagnosticResult> {
let mut results = Vec::new();
// Query health endpoint
match query_with_timeout::<HealthResponse>(session, "bubbaloop/daemon/api/health").await {
Ok(response) => {
if response.status == "ok" {
results.push(DiagnosticResult::pass("Daemon health", "ok"));
} else {
results.push(DiagnosticResult::fail(
"Daemon health",
&format!("unexpected status: {}", response.status),
"Run: systemctl --user restart bubbaloop-daemon",
));
}
}
Err(e) => {
results.push(DiagnosticResult::fail(
"Daemon health",
&format!("query failed: {}", e),
"Check if daemon is connected to the same Zenoh router. Run: systemctl --user status bubbaloop-daemon",
));
}
}
// Query nodes epage 1 / 14next ›