Function bodies 21 total
initialize_gnss_uart_f function · c · L48-L105 (58 LOC)buoy_combo.h
void initialize_gnss_uart_f() {
Serial.println(F("=== Initializing ZED-F9P via UART ==="));
SerialBT.println(F("=== Initializing ZED-F9P via UART ==="));
Serial.print(F("TX_GPS pin: "));
SerialBT.print(F("TX_GPS pin: "));
Serial.println(TX_GPS);
SerialBT.println(TX_GPS);
Serial.print(F("RX_GPS pin: "));
SerialBT.print(F("RX_GPS pin: "));
Serial.println(RX_GPS);
SerialBT.println(RX_GPS);
const long baudRates[] = {115200, 115200, 115200, 115200, 115200};
const int numRates = 5;
for (int i = 0; i < numRates; i++) {
Serial.print(F("Trying "));
SerialBT.print(F("Trying "));
Serial.print(baudRates[i]);
SerialBT.print(baudRates[i]);
Serial.println(F(" baud..."));
SerialBT.println(F(" baud..."));
gpsSerial.begin(baudRates[i], SERIAL_8N1, RX_GPS, TX_GPS);
delay(1000); // Give more time
// Try to get any response
Serial.println(F(" Attempting myGNSS.begin()..."));
SerialBT.println(F(" Attempting myGNSS.begnetwork_status_check_f function · c · L106-L207 (102 LOC)buoy_combo.h
void network_status_check_f() {
// Serial.println("START network_status_check");
if (!networkConnected) {
Serial.println(F("=== Checking Network Status ==="));
SerialBT.println(F("=== Checking Network Status ==="));
// Check signal strength first
uint8_t rssi = modem.getRSSI();
Serial.print(F("Signal strength (RSSI): "));
SerialBT.print(F("Signal strength (RSSI): "));
Serial.print(rssi);
SerialBT.print(rssi);
if (rssi == 0) {
Serial.println(F(" (No signal)"));
SerialBT.println(F(" (No signal)"));
} else if (rssi == 99) {
Serial.println(F(" (Unknown)"));
SerialBT.println(F(" (Unknown)"));
} else if (rssi < 10) {
Serial.println(F(" (Poor)"));
SerialBT.println(F(" (Poor)"));
} else if (rssi < 15) {
Serial.println(F(" (Fair)"));
SerialBT.println(F(" (Fair)"));
} else if (rssi < 20) {
Serial.println(F(" (Good)"));
SerialBT.println(F(" (Good)"));
} else {
Seriaenable_gprs_f function · c · L210-L301 (92 LOC)buoy_combo.h
void enable_gprs_f() {
// Serial.println("START enable_gprs_f");
if (networkConnected && !gprsEnabled) {
Serial.println(F("=== Starting GPRS Enable Process ==="));
SerialBT.println(F("=== Starting GPRS Enable Process ==="));
// Step 1: Check signal strength first
uint8_t rssi = modem.getRSSI();
Serial.print(F("Signal strength (RSSI): "));
SerialBT.print(F("Signal strength (RSSI): "));
Serial.println(rssi);
SerialBT.println(rssi);
if (rssi == 0 || rssi == 99) {
Serial.println(F("Poor/no signal - waiting before GPRS attempt"));
SerialBT.println(F("Poor/no signal - waiting before GPRS attempt"));
delay(5000);
return; // Try again next loop
}
// Step 2: Ensure we're really registered (sometimes status lies)
Serial.println(F("Double-checking network registration..."));
SerialBT.println(F("Double-checking network registration..."));
uint8_t n = modem.getNetworkStatus();
if (n != 1 && n != 5) {
handleNTRIPData function · c · L511-L560 (50 LOC)buoy_combo.h
void handleNTRIPData() {
// if (!modem.TCPconnected()) {
// Serial.println(F("TCP lost - closing socket"));
// modem.sendCheckReply(F("AT+CIPCLOSE"), F("CLOSE OK"), 5000);
// ntripConnected = false;
// return;
// }
uint16_t available = modem.TCPavailable();
if (available == 0) {
// Only disconnect after real timeout, not a single bad AT response
if (millis() - lastReceivedRTCM_ms > maxTimeBeforeHangup_ms) {
Serial.println(F("RTCM timeout. Disconnecting..."));
SerialBT.println(F("RTCM timeout. Disconnecting..."));
modem.sendCheckReply(F("AT+CIPCLOSE"), F("CLOSE OK"), 5000);
ntripConnected = false;
}
return;
}
uint8_t rtcmBuffer[256];
uint16_t totalSent = 0;
int readCount = 0;
while (modem.TCPavailable() > 0 && readCount < 40) {
uint16_t bytesRead = modem.TCPread(rtcmBuffer, 250);
if (bytesRead > 0 && gpsUARTOnline) {
gpsSerial.write(rtcmBuffer, bytesRead);
totalSent += bytesRead;
monitor_connection_health function · c · L561-L600 (40 LOC)buoy_combo.h
void monitor_connection_health() {
if (ntripConnected) return;
// Serial.println("START monitor_connection_health");
static unsigned long lastHealthCheck = 0;
if (millis() - lastHealthCheck > 30000) { // Check every 30 seconds
lastHealthCheck = millis();
Serial.println(F("=== Connection Health Check ==="));
SerialBT.println(F("=== Connection Health Check ==="));
if (networkConnected) {
uint8_t status = modem.getNetworkStatus();
uint8_t rssi = modem.getRSSI();
Serial.print(F("Network: "));
SerialBT.print(F("Network: "));
Serial.print(status);
SerialBT.print(status);
Serial.print(F(", Signal: "));
SerialBT.print(F(", Signal: "));
Serial.println(rssi);
SerialBT.println(rssi);
// Reset flags if network is lost
if (status != 1 && status != 5) {
Serial.println(F("Network lost - resetting connection flags"));
SerialBT.println(F("Network lost - resetting cprintDebugStatus function · c · L603-L636 (34 LOC)buoy_combo.h
void printDebugStatus() {
Serial.println(F("=== DEBUG STATUS ==="));
SerialBT.println(F("=== DEBUG STATUS ==="));
Serial.print(F("networkConnected = "));
SerialBT.print(F("networkConnected = "));
Serial.println(networkConnected ? "true" : "false");
SerialBT.println(networkConnected ? "true" : "false");
Serial.print(F("gprsEnabled = "));
SerialBT.print(F("gprsEnabled = "));
Serial.println(gprsEnabled ? "true" : "false");
SerialBT.println(gprsEnabled ? "true" : "false");
Serial.print(F("gpsEnabled = "));
SerialBT.print(F("gpsEnabled = "));
Serial.println(gpsEnabled ? "true" : "false");
SerialBT.println(gpsEnabled ? "true" : "false");
Serial.print(F("ntripConnected = "));
SerialBT.print(F("ntripConnected = "));
Serial.println(ntripConnected ? "true" : "false");
SerialBT.println(ntripConnected ? "true" : "false");
Serial.print(F("lastNTRIPAttempt = "));
SerialBT.print(F("lastNTRIPAttempt = "));
Serial.println(lastNTRIPAttempt);
SerialBT.println(lupdateStatusLED function · c · L637-L646 (10 LOC)buoy_combo.h
void updateStatusLED() {
if (ntripConnected) {
digitalWrite(STATUS_LED, HIGH); // Solid = NTRIP active
return;
}
digitalWrite(STATUS_LED, LOW);
}Repobility — the code-quality scanner for AI-generated software · https://repobility.com
shutdownISR function · c · L647-L650 (4 LOC)buoy_combo.h
void IRAM_ATTR shutdownISR() {
shutdownRequested = true;
}gracefulShutdown function · c · L651-L706 (56 LOC)buoy_combo.h
void gracefulShutdown() {
Serial.println(F("\n=== SHUTDOWN REQUESTED ==="));
SerialBT.println(F("\n=== SHUTDOWN REQUESTED ==="));
// Blink LED 3 times to confirm shutdown
for (int i = 0; i < 3; i++) {
digitalWrite(STATUS_LED, HIGH);
delay(100);
digitalWrite(STATUS_LED, LOW);
delay(100);
}
// Close NTRIP/TCP connection
if (ntripConnected) {
Serial.println(F("Closing NTRIP connection..."));
SerialBT.println(F("Closing NTRIP connection..."));
modem.sendCheckReply(F("AT+CIPCLOSE"), F("CLOSE OK"), 5000);
ntripConnected = false;
delay(1000);
}
// Disable GPRS
if (gprsEnabled) {
Serial.println(F("Disabling GPRS..."));
SerialBT.println(F("Disabling GPRS..."));
modem.enableGPRS(false);
gprsEnabled = false;
delay(1000);
}
// Power down modem
Serial.println(F("Powering down modem..."));
SerialBT.println(F("Powering down modem..."));
modem.sendCheckReply(F("AT+CPOWD=1"), F("NORMAL POWER DOWN"), 5000)parse_ubx_file function · python · L11-L75 (65 LOC)ubx_parser.py
def parse_ubx_file(filename, output_csv=None):
"""
Parse UBX file and extract position data
Args:
filename (str): Path to UBX file
output_csv (str): Optional CSV output filename
"""
positions = []
with open(filename, 'rb') as stream:
ubr = UBXReader(stream)
print("Parsing UBX file...")
print("-" * 50)
for (raw_data, parsed_data) in ubr:
# Focus on NAV-PVT messages (Position, Velocity, Time)
if hasattr(parsed_data, 'identity') and parsed_data.identity == 'NAV-PVT':
# Extract key position data with error handling for different UBX versions
position_data = {
'timestamp': getattr(parsed_data, 'iTOW', 0) / 1000.0, # GPS time of week in seconds
'year': getattr(parsed_data, 'year', 0),
'month': getattr(parsed_data, 'month', 0),
'day': getget_fix_type_description function · python · L77-L87 (11 LOC)ubx_parser.py
def get_fix_type_description(fix_type):
"""Convert fix type number to description"""
fix_types = {
0: "No Fix",
1: "Dead Reckoning",
2: "2D Fix",
3: "3D Fix",
4: "GNSS + Dead Reckoning",
5: "Time Only"
}
return fix_types.get(fix_type, f"Unknown ({fix_type})")save_to_csv function · python · L89-L102 (14 LOC)ubx_parser.py
def save_to_csv(positions, filename):
"""Save position data to CSV file"""
fieldnames = [
'timestamp', 'year', 'month', 'day', 'hour', 'minute', 'second',
'latitude', 'longitude', 'altitude_msl', 'altitude_ellipsoid',
'horizontal_accuracy', 'vertical_accuracy', 'fix_type', 'num_satellites',
'speed_2d', 'heading'
]
with open(filename, 'w', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(positions)analyze_accuracy function · python · L104-L138 (35 LOC)ubx_parser.py
def analyze_accuracy(positions):
"""Analyze the accuracy statistics of the position data"""
if not positions:
print("No position data to analyze")
return
h_accuracies = [p['horizontal_accuracy'] for p in positions if p['horizontal_accuracy'] > 0]
v_accuracies = [p['vertical_accuracy'] for p in positions if p['vertical_accuracy'] > 0]
print("Accuracy Analysis:")
print("-" * 30)
if h_accuracies:
print(f"Horizontal Accuracy:")
print(f" Mean: {sum(h_accuracies)/len(h_accuracies):.4f} m")
print(f" Min: {min(h_accuracies):.4f} m")
print(f" Max: {max(h_accuracies):.4f} m")
if v_accuracies:
print(f"Vertical Accuracy:")
print(f" Mean: {sum(v_accuracies)/len(v_accuracies):.4f} m")
print(f" Min: {min(v_accuracies):.4f} m")
print(f" Max: {max(v_accuracies):.4f} m")
# Count fix types
fix_counts = {}
for pos in positions:
fix_typeparse_ubx_file function · python · L13-L116 (104 LOC)v2_ubx_parser.py
def parse_ubx_file(filename, output_csv=None):
"""
Parse UBX file and extract position data
Args:
filename (str): Path to UBX file
output_csv (str): Optional CSV output filename
"""
positions = []
with open(filename, 'rb') as stream:
ubr = UBXReader(stream)
print("Parsing UBX file...")
print("-" * 50)
for (raw_data, parsed_data) in ubr:
# Focus on NAV-PVT messages (Position, Velocity, Time)
if hasattr(parsed_data, 'identity') and parsed_data.identity == 'NAV-PVT':
# Extract key position data
# NOTE: pyubx2 already converts lat/lon to degrees and heights to meters!
# Check the raw values to understand what pyubx2 gives us
raw_lat = getattr(parsed_data, 'lat', 0)
raw_lon = getattr(parsed_data, 'lon', 0)
raw_hMSL = getattr(parsed_data, 'hMSL', 0)
get_fix_type_description function · python · L118-L128 (11 LOC)v2_ubx_parser.py
def get_fix_type_description(fix_type):
"""Convert fix type number to description"""
fix_types = {
0: "No Fix",
1: "Dead Reckoning",
2: "2D Fix",
3: "3D Fix",
4: "GNSS + Dead Reckoning",
5: "Time Only"
}
return fix_types.get(fix_type, f"Unknown ({fix_type})")Repobility · severity-and-effort ranking · https://repobility.com
get_carrier_solution_description function · python · L130-L137 (8 LOC)v2_ubx_parser.py
def get_carrier_solution_description(carr_soln):
"""Convert carrier solution number to description"""
solutions = {
0: "No RTK",
1: "RTK Float",
2: "RTK Fix"
}
return solutions.get(carr_soln, f"Unknown ({carr_soln})")save_to_csv function · python · L139-L153 (15 LOC)v2_ubx_parser.py
def save_to_csv(positions, filename):
"""Save position data to CSV file"""
fieldnames = [
'timestamp', 'year', 'month', 'day', 'hour', 'minute', 'second',
'latitude', 'longitude', 'altitude_msl', 'altitude_ellipsoid',
'horizontal_accuracy', 'vertical_accuracy', 'fix_type',
'carrier_solution', 'num_satellites', 'pDOP',
'speed_2d', 'heading', 'flags'
]
with open(filename, 'w', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(positions)analyze_accuracy function · python · L155-L214 (60 LOC)v2_ubx_parser.py
def analyze_accuracy(positions):
"""Analyze the accuracy statistics of the position data"""
if not positions:
print("No position data to analyze")
return
h_accuracies = [p['horizontal_accuracy'] for p in positions if p['horizontal_accuracy'] > 0]
v_accuracies = [p['vertical_accuracy'] for p in positions if p['vertical_accuracy'] > 0]
print("\n" + "=" * 50)
print("ACCURACY ANALYSIS")
print("=" * 50)
if h_accuracies:
print(f"\nHorizontal Accuracy:")
print(f" Mean: {sum(h_accuracies)/len(h_accuracies):.4f} m")
print(f" Min: {min(h_accuracies):.4f} m")
print(f" Max: {max(h_accuracies):.4f} m")
if v_accuracies:
print(f"\nVertical Accuracy:")
print(f" Mean: {sum(v_accuracies)/len(v_accuracies):.4f} m")
print(f" Min: {min(v_accuracies):.4f} m")
print(f" Max: {max(v_accuracies):.4f} m")
# Count fix types
print(f"\nFix Type Distributparse_ubx_file function · python · L11-L157 (147 LOC)v3_ubx_parser.py
def parse_ubx_file(filename, output_csv=None):
# store messages by timestamp so we can match them up later
pvt_msgs = {}
hppos_msgs = {}
with open(filename, 'rb') as f:
ubr = UBXReader(f)
print("Parsing...")
counts = {'NAV-PVT': 0, 'NAV-HPPOSLLH': 0}
for (raw, parsed) in ubr:
if not hasattr(parsed, 'identity'):
continue
ident = parsed.identity
iTOW = getattr(parsed, 'iTOW', None)
if iTOW is None:
continue
if ident == 'NAV-PVT':
counts['NAV-PVT'] += 1
pvt_msgs[iTOW] = {
'iTOW': iTOW,
'year': getattr(parsed, 'year', 0),
'month': getattr(parsed, 'month', 0),
'day': getattr(parsed, 'day', 0),
'hour': getattr(parsed, 'hour', 0),
'minute': getattr(save_csv function · python · L160-L173 (14 LOC)v3_ubx_parser.py
def save_csv(positions, filename):
fields = [
'timestamp', 'year', 'month', 'day', 'hour', 'minute', 'second',
'latitude', 'longitude', 'altitude_msl', 'altitude_ellipsoid',
'horizontal_accuracy', 'vertical_accuracy', 'fix_type',
'carrier_solution', 'num_satellites', 'pDOP',
'speed_2d', 'heading', 'vel_north', 'vel_east', 'vel_down',
'flags', 'source'
]
with open(filename, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=fields)
writer.writeheader()
writer.writerows(positions)analyze function · python · L176-L222 (47 LOC)v3_ubx_parser.py
def analyze(positions):
if not positions:
print("No data")
return
print("\n" + "="*50)
print("ANALYSIS")
print("="*50)
# fix type breakdown
print("\nFix types:")
fix_names = {0: "No Fix", 1: "DR", 2: "2D", 3: "3D", 4: "GNSS+DR", 5: "Time"}
fix_counts = {}
for p in positions:
ft = fix_names.get(p['fix_type'], str(p['fix_type']))
fix_counts[ft] = fix_counts.get(ft, 0) + 1
for ft, cnt in sorted(fix_counts.items()):
print(f" {ft}: {cnt} ({cnt/len(positions)*100:.1f}%)")
# RTK status
print("\nRTK status:")
rtk_names = {0: "None", 1: "Float", 2: "Fix"}
rtk_counts = {}
for p in positions:
rs = rtk_names.get(p['carrier_solution'], str(p['carrier_solution']))
rtk_counts[rs] = rtk_counts.get(rs, 0) + 1
for rs, cnt in sorted(rtk_counts.items()):
print(f" {rs}: {cnt} ({cnt/len(positions)*100:.1f}%)")
# position stats for RTK fix only
rtk_fix