← back to missioncreep11__rtk-wave-buoy

Function bodies 21 total

All specs Real LLM only Function bodies
initialize_gnss_uart_f function · c · L48-L105 (58 LOC)
buoy_combo.h

void initialize_gnss_uart_f() {
  Serial.println(F("=== Initializing ZED-F9P via UART ==="));
  SerialBT.println(F("=== Initializing ZED-F9P via UART ==="));
  Serial.print(F("TX_GPS pin: "));
  SerialBT.print(F("TX_GPS pin: "));
  Serial.println(TX_GPS);
  SerialBT.println(TX_GPS);
  Serial.print(F("RX_GPS pin: "));
  SerialBT.print(F("RX_GPS pin: "));
  Serial.println(RX_GPS);
  SerialBT.println(RX_GPS);
  
  const long baudRates[] = {115200, 115200, 115200, 115200, 115200};
  const int numRates = 5;
  
  for (int i = 0; i < numRates; i++) {
    Serial.print(F("Trying "));
    SerialBT.print(F("Trying "));
    Serial.print(baudRates[i]);
    SerialBT.print(baudRates[i]);
    Serial.println(F(" baud..."));
    SerialBT.println(F(" baud..."));
    
    gpsSerial.begin(baudRates[i], SERIAL_8N1, RX_GPS, TX_GPS);
    delay(1000);  // Give more time
    
    // Try to get any response
    Serial.println(F("  Attempting myGNSS.begin()..."));
    SerialBT.println(F("  Attempting myGNSS.beg
network_status_check_f function · c · L106-L207 (102 LOC)
buoy_combo.h

void network_status_check_f() {
  // Serial.println("START network_status_check");
  if (!networkConnected) {
    Serial.println(F("=== Checking Network Status ==="));
    SerialBT.println(F("=== Checking Network Status ==="));
    
    // Check signal strength first
    uint8_t rssi = modem.getRSSI();
    Serial.print(F("Signal strength (RSSI): "));
    SerialBT.print(F("Signal strength (RSSI): "));
    Serial.print(rssi);
    SerialBT.print(rssi);
    if (rssi == 0) {
      Serial.println(F(" (No signal)"));
      SerialBT.println(F(" (No signal)"));
    } else if (rssi == 99) {
      Serial.println(F(" (Unknown)"));
      SerialBT.println(F(" (Unknown)"));
    } else if (rssi < 10) {
      Serial.println(F(" (Poor)"));
      SerialBT.println(F(" (Poor)"));
    } else if (rssi < 15) {
      Serial.println(F(" (Fair)"));
      SerialBT.println(F(" (Fair)"));
    } else if (rssi < 20) {
      Serial.println(F(" (Good)"));
      SerialBT.println(F(" (Good)"));
    } else {
      Seria
enable_gprs_f function · c · L210-L301 (92 LOC)
buoy_combo.h
void enable_gprs_f() {
  // Serial.println("START enable_gprs_f");
  if (networkConnected && !gprsEnabled) {
    Serial.println(F("=== Starting GPRS Enable Process ==="));
    SerialBT.println(F("=== Starting GPRS Enable Process ==="));
    // Step 1: Check signal strength first
    uint8_t rssi = modem.getRSSI();
    Serial.print(F("Signal strength (RSSI): "));
    SerialBT.print(F("Signal strength (RSSI): "));
    Serial.println(rssi);
    SerialBT.println(rssi);
    
    if (rssi == 0 || rssi == 99) {
      Serial.println(F("Poor/no signal - waiting before GPRS attempt"));
      SerialBT.println(F("Poor/no signal - waiting before GPRS attempt"));
      delay(5000);
      return; // Try again next loop
    }
    
    // Step 2: Ensure we're really registered (sometimes status lies)
    Serial.println(F("Double-checking network registration..."));
    SerialBT.println(F("Double-checking network registration..."));
    uint8_t n = modem.getNetworkStatus();
    if (n != 1 && n != 5) {
 
handleNTRIPData function · c · L511-L560 (50 LOC)
buoy_combo.h


void handleNTRIPData() {
  // if (!modem.TCPconnected()) {
  //   Serial.println(F("TCP lost - closing socket"));
  //   modem.sendCheckReply(F("AT+CIPCLOSE"), F("CLOSE OK"), 5000);
  //   ntripConnected = false;
  //   return;
  // }
  
  uint16_t available = modem.TCPavailable();
  
  if (available == 0) {
    // Only disconnect after real timeout, not a single bad AT response
    if (millis() - lastReceivedRTCM_ms > maxTimeBeforeHangup_ms) {
      Serial.println(F("RTCM timeout. Disconnecting..."));
      SerialBT.println(F("RTCM timeout. Disconnecting..."));
      modem.sendCheckReply(F("AT+CIPCLOSE"), F("CLOSE OK"), 5000);
      ntripConnected = false;
    }
    return;
  }  
  uint8_t rtcmBuffer[256];
  uint16_t totalSent = 0;
  int readCount = 0;
  
  while (modem.TCPavailable() > 0 && readCount < 40) {
    uint16_t bytesRead = modem.TCPread(rtcmBuffer, 250);
    if (bytesRead > 0 && gpsUARTOnline) {
      gpsSerial.write(rtcmBuffer, bytesRead);
      totalSent += bytesRead;
monitor_connection_health function · c · L561-L600 (40 LOC)
buoy_combo.h
void monitor_connection_health() {
  if (ntripConnected) return;
  // Serial.println("START monitor_connection_health");
  static unsigned long lastHealthCheck = 0;
  
  if (millis() - lastHealthCheck > 30000) { // Check every 30 seconds
    lastHealthCheck = millis();
    
    Serial.println(F("=== Connection Health Check ==="));
    SerialBT.println(F("=== Connection Health Check ==="));
    
    if (networkConnected) {
      uint8_t status = modem.getNetworkStatus();
      uint8_t rssi = modem.getRSSI();
      
      Serial.print(F("Network: "));
      SerialBT.print(F("Network: "));
      Serial.print(status);
      SerialBT.print(status);
      Serial.print(F(", Signal: "));
      SerialBT.print(F(", Signal: "));
      Serial.println(rssi);
      SerialBT.println(rssi);
      
      // Reset flags if network is lost
      if (status != 1 && status != 5) {
        Serial.println(F("Network lost - resetting connection flags"));
        SerialBT.println(F("Network lost - resetting c
printDebugStatus function · c · L603-L636 (34 LOC)
buoy_combo.h
void printDebugStatus() {
  Serial.println(F("=== DEBUG STATUS ==="));
  SerialBT.println(F("=== DEBUG STATUS ==="));
  Serial.print(F("networkConnected = "));
  SerialBT.print(F("networkConnected = "));
  Serial.println(networkConnected ? "true" : "false");
  SerialBT.println(networkConnected ? "true" : "false");
  Serial.print(F("gprsEnabled = "));
  SerialBT.print(F("gprsEnabled = "));
  Serial.println(gprsEnabled ? "true" : "false");
  SerialBT.println(gprsEnabled ? "true" : "false");
  Serial.print(F("gpsEnabled = "));
  SerialBT.print(F("gpsEnabled = "));
  Serial.println(gpsEnabled ? "true" : "false");
  SerialBT.println(gpsEnabled ? "true" : "false");
  Serial.print(F("ntripConnected = "));
  SerialBT.print(F("ntripConnected = "));
  Serial.println(ntripConnected ? "true" : "false");
  SerialBT.println(ntripConnected ? "true" : "false");
  Serial.print(F("lastNTRIPAttempt = "));
  SerialBT.print(F("lastNTRIPAttempt = "));
  Serial.println(lastNTRIPAttempt);
  SerialBT.println(l
updateStatusLED function · c · L637-L646 (10 LOC)
buoy_combo.h
void updateStatusLED() {

  if (ntripConnected) {
    digitalWrite(STATUS_LED, HIGH);  // Solid = NTRIP active
    return;
  }
  
  digitalWrite(STATUS_LED, LOW);
}
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
shutdownISR function · c · L647-L650 (4 LOC)
buoy_combo.h
void IRAM_ATTR shutdownISR() {
  shutdownRequested = true;
}
gracefulShutdown function · c · L651-L706 (56 LOC)
buoy_combo.h
void gracefulShutdown() {
  Serial.println(F("\n=== SHUTDOWN REQUESTED ==="));
  SerialBT.println(F("\n=== SHUTDOWN REQUESTED ==="));
  
  // Blink LED 3 times to confirm shutdown
  for (int i = 0; i < 3; i++) {
    digitalWrite(STATUS_LED, HIGH);
    delay(100);
    digitalWrite(STATUS_LED, LOW);
    delay(100);
  }
  
  // Close NTRIP/TCP connection
  if (ntripConnected) {
    Serial.println(F("Closing NTRIP connection..."));
    SerialBT.println(F("Closing NTRIP connection..."));
    modem.sendCheckReply(F("AT+CIPCLOSE"), F("CLOSE OK"), 5000);
    ntripConnected = false;
    delay(1000);
  }
  
  // Disable GPRS
  if (gprsEnabled) {
    Serial.println(F("Disabling GPRS..."));
    SerialBT.println(F("Disabling GPRS..."));
    modem.enableGPRS(false);
    gprsEnabled = false;
    delay(1000);
  }
  
  // Power down modem
  Serial.println(F("Powering down modem..."));
  SerialBT.println(F("Powering down modem..."));
  modem.sendCheckReply(F("AT+CPOWD=1"), F("NORMAL POWER DOWN"), 5000)
parse_ubx_file function · python · L11-L75 (65 LOC)
ubx_parser.py
def parse_ubx_file(filename, output_csv=None):
    """
    Parse UBX file and extract position data
    
    Args:
        filename (str): Path to UBX file
        output_csv (str): Optional CSV output filename
    """
    
    positions = []
    
    with open(filename, 'rb') as stream:
        ubr = UBXReader(stream)
        
        print("Parsing UBX file...")
        print("-" * 50)
        
        for (raw_data, parsed_data) in ubr:
            # Focus on NAV-PVT messages (Position, Velocity, Time)
            if hasattr(parsed_data, 'identity') and parsed_data.identity == 'NAV-PVT':
                
                # Extract key position data with error handling for different UBX versions
                position_data = {
                    'timestamp': getattr(parsed_data, 'iTOW', 0) / 1000.0,  # GPS time of week in seconds
                    'year': getattr(parsed_data, 'year', 0),
                    'month': getattr(parsed_data, 'month', 0),
                    'day': get
get_fix_type_description function · python · L77-L87 (11 LOC)
ubx_parser.py
def get_fix_type_description(fix_type):
    """Convert fix type number to description"""
    fix_types = {
        0: "No Fix",
        1: "Dead Reckoning",
        2: "2D Fix",
        3: "3D Fix", 
        4: "GNSS + Dead Reckoning",
        5: "Time Only"
    }
    return fix_types.get(fix_type, f"Unknown ({fix_type})")
save_to_csv function · python · L89-L102 (14 LOC)
ubx_parser.py
def save_to_csv(positions, filename):
    """Save position data to CSV file"""
    
    fieldnames = [
        'timestamp', 'year', 'month', 'day', 'hour', 'minute', 'second',
        'latitude', 'longitude', 'altitude_msl', 'altitude_ellipsoid',
        'horizontal_accuracy', 'vertical_accuracy', 'fix_type', 'num_satellites',
        'speed_2d', 'heading'
    ]
    
    with open(filename, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(positions)
analyze_accuracy function · python · L104-L138 (35 LOC)
ubx_parser.py
def analyze_accuracy(positions):
    """Analyze the accuracy statistics of the position data"""
    
    if not positions:
        print("No position data to analyze")
        return
    
    h_accuracies = [p['horizontal_accuracy'] for p in positions if p['horizontal_accuracy'] > 0]
    v_accuracies = [p['vertical_accuracy'] for p in positions if p['vertical_accuracy'] > 0]
    
    print("Accuracy Analysis:")
    print("-" * 30)
    
    if h_accuracies:
        print(f"Horizontal Accuracy:")
        print(f"  Mean: {sum(h_accuracies)/len(h_accuracies):.4f} m")
        print(f"  Min:  {min(h_accuracies):.4f} m")
        print(f"  Max:  {max(h_accuracies):.4f} m")
    
    if v_accuracies:
        print(f"Vertical Accuracy:")
        print(f"  Mean: {sum(v_accuracies)/len(v_accuracies):.4f} m")
        print(f"  Min:  {min(v_accuracies):.4f} m")
        print(f"  Max:  {max(v_accuracies):.4f} m")
    
    # Count fix types
    fix_counts = {}
    for pos in positions:
        fix_type
parse_ubx_file function · python · L13-L116 (104 LOC)
v2_ubx_parser.py
def parse_ubx_file(filename, output_csv=None):
    """
    Parse UBX file and extract position data
    
    Args:
        filename (str): Path to UBX file
        output_csv (str): Optional CSV output filename
    """
    
    positions = []
    
    with open(filename, 'rb') as stream:
        ubr = UBXReader(stream)
        
        print("Parsing UBX file...")
        print("-" * 50)
        
        for (raw_data, parsed_data) in ubr:
            # Focus on NAV-PVT messages (Position, Velocity, Time)
            if hasattr(parsed_data, 'identity') and parsed_data.identity == 'NAV-PVT':
                
                # Extract key position data
                # NOTE: pyubx2 already converts lat/lon to degrees and heights to meters!
                # Check the raw values to understand what pyubx2 gives us
                raw_lat = getattr(parsed_data, 'lat', 0)
                raw_lon = getattr(parsed_data, 'lon', 0)
                raw_hMSL = getattr(parsed_data, 'hMSL', 0)
    
get_fix_type_description function · python · L118-L128 (11 LOC)
v2_ubx_parser.py
def get_fix_type_description(fix_type):
    """Convert fix type number to description"""
    fix_types = {
        0: "No Fix",
        1: "Dead Reckoning",
        2: "2D Fix",
        3: "3D Fix", 
        4: "GNSS + Dead Reckoning",
        5: "Time Only"
    }
    return fix_types.get(fix_type, f"Unknown ({fix_type})")
Repobility · severity-and-effort ranking · https://repobility.com
get_carrier_solution_description function · python · L130-L137 (8 LOC)
v2_ubx_parser.py
def get_carrier_solution_description(carr_soln):
    """Convert carrier solution number to description"""
    solutions = {
        0: "No RTK",
        1: "RTK Float",
        2: "RTK Fix"
    }
    return solutions.get(carr_soln, f"Unknown ({carr_soln})")
save_to_csv function · python · L139-L153 (15 LOC)
v2_ubx_parser.py
def save_to_csv(positions, filename):
    """Save position data to CSV file"""
    
    fieldnames = [
        'timestamp', 'year', 'month', 'day', 'hour', 'minute', 'second',
        'latitude', 'longitude', 'altitude_msl', 'altitude_ellipsoid',
        'horizontal_accuracy', 'vertical_accuracy', 'fix_type', 
        'carrier_solution', 'num_satellites', 'pDOP',
        'speed_2d', 'heading', 'flags'
    ]
    
    with open(filename, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(positions)
analyze_accuracy function · python · L155-L214 (60 LOC)
v2_ubx_parser.py
def analyze_accuracy(positions):
    """Analyze the accuracy statistics of the position data"""
    
    if not positions:
        print("No position data to analyze")
        return
    
    h_accuracies = [p['horizontal_accuracy'] for p in positions if p['horizontal_accuracy'] > 0]
    v_accuracies = [p['vertical_accuracy'] for p in positions if p['vertical_accuracy'] > 0]
    
    print("\n" + "=" * 50)
    print("ACCURACY ANALYSIS")
    print("=" * 50)
    
    if h_accuracies:
        print(f"\nHorizontal Accuracy:")
        print(f"  Mean: {sum(h_accuracies)/len(h_accuracies):.4f} m")
        print(f"  Min:  {min(h_accuracies):.4f} m")
        print(f"  Max:  {max(h_accuracies):.4f} m")
    
    if v_accuracies:
        print(f"\nVertical Accuracy:")
        print(f"  Mean: {sum(v_accuracies)/len(v_accuracies):.4f} m")
        print(f"  Min:  {min(v_accuracies):.4f} m")
        print(f"  Max:  {max(v_accuracies):.4f} m")
    
    # Count fix types
    print(f"\nFix Type Distribut
parse_ubx_file function · python · L11-L157 (147 LOC)
v3_ubx_parser.py
def parse_ubx_file(filename, output_csv=None):
    
    # store messages by timestamp so we can match them up later
    pvt_msgs = {}
    hppos_msgs = {}
    
    with open(filename, 'rb') as f:
        ubr = UBXReader(f)
        
        print("Parsing...")
        counts = {'NAV-PVT': 0, 'NAV-HPPOSLLH': 0}
        
        for (raw, parsed) in ubr:
            if not hasattr(parsed, 'identity'):
                continue
                
            ident = parsed.identity
            iTOW = getattr(parsed, 'iTOW', None)
            if iTOW is None:
                continue
            
            if ident == 'NAV-PVT':
                counts['NAV-PVT'] += 1
                pvt_msgs[iTOW] = {
                    'iTOW': iTOW,
                    'year': getattr(parsed, 'year', 0),
                    'month': getattr(parsed, 'month', 0),
                    'day': getattr(parsed, 'day', 0),
                    'hour': getattr(parsed, 'hour', 0),
                    'minute': getattr(
save_csv function · python · L160-L173 (14 LOC)
v3_ubx_parser.py
def save_csv(positions, filename):
    fields = [
        'timestamp', 'year', 'month', 'day', 'hour', 'minute', 'second',
        'latitude', 'longitude', 'altitude_msl', 'altitude_ellipsoid',
        'horizontal_accuracy', 'vertical_accuracy', 'fix_type', 
        'carrier_solution', 'num_satellites', 'pDOP',
        'speed_2d', 'heading', 'vel_north', 'vel_east', 'vel_down',
        'flags', 'source'
    ]
    
    with open(filename, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fields)
        writer.writeheader()
        writer.writerows(positions)
analyze function · python · L176-L222 (47 LOC)
v3_ubx_parser.py
def analyze(positions):
    if not positions:
        print("No data")
        return
    
    print("\n" + "="*50)
    print("ANALYSIS")
    print("="*50)
    
    # fix type breakdown
    print("\nFix types:")
    fix_names = {0: "No Fix", 1: "DR", 2: "2D", 3: "3D", 4: "GNSS+DR", 5: "Time"}
    fix_counts = {}
    for p in positions:
        ft = fix_names.get(p['fix_type'], str(p['fix_type']))
        fix_counts[ft] = fix_counts.get(ft, 0) + 1
    for ft, cnt in sorted(fix_counts.items()):
        print(f"  {ft}: {cnt} ({cnt/len(positions)*100:.1f}%)")
    
    # RTK status
    print("\nRTK status:")
    rtk_names = {0: "None", 1: "Float", 2: "Fix"}
    rtk_counts = {}
    for p in positions:
        rs = rtk_names.get(p['carrier_solution'], str(p['carrier_solution']))
        rtk_counts[rs] = rtk_counts.get(rs, 0) + 1
    for rs, cnt in sorted(rtk_counts.items()):
        print(f"  {rs}: {cnt} ({cnt/len(positions)*100:.1f}%)")
    
    # position stats for RTK fix only
    rtk_fix