Function bodies 48 total
check_speedtest_cli function · python · L30-L36 (7 LOC)nas-monitor/monitor.py
def check_speedtest_cli() -> bool:
    """Check if the official Ookla speedtest CLI is available.

    Runs ``speedtest --version`` and reports whether it exited with status 0.

    Returns:
        True when the command ran and returned exit code 0. False when the
        executable is missing, cannot be executed, or did not finish within
        the 5 second timeout.
    """
    try:
        result = subprocess.run(['speedtest', '--version'], capture_output=True, timeout=5)
    except (OSError, subprocess.TimeoutExpired):
        # OSError covers FileNotFoundError as well as PermissionError (e.g. a
        # ``speedtest`` file on PATH without the execute bit), so a broken
        # install is reported as "not available" instead of raising.
        return False
    return result.returncode == 0
# Config.load method · python · L72-L109 (38 LOC)nas-monitor/monitor.py
def load(cls, config_path: str = 'config.yaml') -> 'Config':
"""Load configuration from YAML file and environment variables."""
config = cls()
# Load from YAML if exists
if os.path.exists(config_path):
with open(config_path, 'r') as f:
yaml_config = yaml.safe_load(f) or {}
for key, value in yaml_config.items():
if hasattr(config, key):
setattr(config, key, value)
# Override with environment variables
env_mappings = {
'PING_TARGETS': ('ping_targets', lambda x: x.split(',')),
'PING_INTERVAL': ('ping_interval_seconds', int),
'PING_TIMEOUT': ('ping_timeout_seconds', int),
'HEARTBEAT_INTERVAL': ('heartbeat_interval_seconds', int),
'VPS_URL': ('vps_url', str),
'LOG_DIRECTORY': ('log_directory', str),
'LOG_RETENTION_DAYS': ('log_retention_days', int),
'NTFY_SPingMonitor.ping method · python · L151-L178 (28 LOC)nas-monitor/monitor.py
def ping(self, target: str) -> tuple[bool, Optional[float]]:
"""Ping a target and return (success, latency_ms)."""
try:
if sys.platform == 'darwin': # macOS
cmd = ['ping', '-c', str(self.config.ping_count), '-t',
str(self.config.ping_timeout_seconds), target]
elif sys.platform == 'win32':
cmd = ['ping', '-n', str(self.config.ping_count), '-w',
str(self.config.ping_timeout_seconds * 1000), target]
else: # Linux
cmd = ['ping', '-c', str(self.config.ping_count), '-W',
str(self.config.ping_timeout_seconds), target]
result = subprocess.run(
cmd, capture_output=True, text=True,
timeout=self.config.ping_timeout_seconds * self.config.ping_count + 5
)
if result.returncode == 0:
latency = self._parse_ping_latency(result.stdout)
PingMonitor._parse_ping_latency method · python · L180-L192 (13 LOC)nas-monitor/monitor.py
def _parse_ping_latency(self, output: str) -> Optional[float]:
    """Extract average latency from ping output.

    Two summary formats are recognised:

    * ``... min/avg/max/... = a/b/c/d ms`` - the second slash-separated
      number after ``=`` is returned.
    * ``Minimum = 10ms, Maximum = 20ms, Average = 15ms`` - the number after
      ``Average =`` is returned.

    Args:
        output: Text captured from the ping command.

    Returns:
        The average latency as a float, or None when no summary line could
        be parsed.
    """
    try:
        for line in output.splitlines():
            if '=' not in line:
                continue
            lowered = line.lower()
            if 'avg' in lowered:
                stats = line.split('=')[1].strip().split('/')
                if len(stats) >= 2:
                    return float(stats[1])
            elif 'average' in lowered:
                # Values are separated by commas, not slashes, so take the
                # text after the last "average" and strip the "ms" unit.
                tail = lowered.rsplit('average', 1)[1]
                value = tail.split('=', 1)[1].strip()
                return float(value.removesuffix('ms').strip())
        return None
    except (ValueError, IndexError):
        return None
# PingMonitor.check_connectivity method · python · L194-L218 (25 LOC)nas-monitor/monitor.py
def check_connectivity(self) -> PingResult:
    """Check internet connectivity by pinging the configured targets.

    Targets are tried in the order given by ``self.config.ping_targets`` and
    probing stops at the first one that answers.

    Returns:
        A PingResult with status ``'online'`` when a target answered, else
        ``'outage'``. ``ping_ms`` is the answering target's latency (None if
        it could not be parsed or nothing answered). ``target`` is the
        answering target, falling back to the first configured target.
    """
    timestamp = time.time()
    datetime_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    reached = False
    responder = None
    latency_ms = None
    for candidate in self.config.ping_targets:
        ok, measured = self.ping(candidate)
        if not ok:
            continue
        # First responder wins; its latency (possibly None) is reported.
        reached, responder, latency_ms = True, candidate, measured
        break

    status = 'online' if reached else 'outage'
    return PingResult(
        timestamp=timestamp,
        datetime_str=datetime_str,
        status=status,
        ping_ms=latency_ms,
        target=responder or self.config.ping_targets[0],
    )
# SpeedTester.run_test method · python · L228-L270 (43 LOC)nas-monitor/monitor.py
def run_test(self, trigger: str = 'manual') -> Optional[SpeedTestResult]:
"""Run a speed test by downloading file from VPS."""
url = f"{self.config.vps_url}/speedtest"
timestamp = time.time()
datetime_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
try:
start_time = time.time()
response = self.session.get(url, timeout=60, stream=True)
if response.status_code != 200:
logger.error(f"Speed test failed: HTTP {response.status_code}")
return None
# Download and measure
total_bytes = 0
for chunk in response.iter_content(chunk_size=8192):
total_bytes += len(chunk)
end_time = time.time()
duration = end_time - start_time
if duration > 0:
speed_mbps = (total_bytes * 8) / (duration * 1_000_000)
else:
speed_mbps = 0
result = SpeedTestResuSpeedTester.run_ookla_test method · python · L272-L329 (58 LOC)nas-monitor/monitor.py
def run_ookla_test(self, trigger: str = 'manual') -> Optional[SpeedTestResult]:
"""Run a full Ookla speed test using official CLI (slower but more accurate)."""
if not SPEEDTEST_AVAILABLE:
logger.error("Ookla speedtest CLI not installed")
return None
timestamp = time.time()
datetime_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
try:
logger.info("Running Ookla speed test (this may take 30-60 seconds)...")
start_time = time.time()
# Run official Ookla CLI with JSON output
result = subprocess.run(
['speedtest', '--accept-license', '--accept-gdpr', '--format=json'],
capture_output=True,
text=True,
timeout=120
)
end_time = time.time()
duration = end_time - start_time
if result.returncode != 0:
logger.error(f"Ookla CLI failed: {result.stderr}Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
EventLogger._ensure_log_directory method · python · L341-L347 (7 LOC)nas-monitor/monitor.py
def _ensure_log_directory(self):
    """Make sure ``self.log_dir`` exists, creating missing parents too.

    Raises:
        OSError: Re-raised, after being logged, when the directory cannot
            be created.
    """
    target_dir = self.log_dir
    try:
        target_dir.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        logger.error(f"Failed to create log directory: {e}")
        raise
# EventLogger._cleanup_old_logs method · python · L349-L368 (20 LOC)nas-monitor/monitor.py
def _cleanup_old_logs(self):
    """Delete dated CSV logs that are older than the retention period.

    Runs at most once per calendar day: the current date is remembered in
    ``self._last_cleanup_date`` and later calls on the same day return
    immediately. Files whose stem is not a ``YYYY-MM-DD`` date, or that
    cannot be removed, are skipped.
    """
    current_day = datetime.now().strftime('%Y-%m-%d')
    if self._last_cleanup_date == current_day:
        return
    self._last_cleanup_date = current_day

    retention = timedelta(days=self.config.log_retention_days)
    cutoff_date = datetime.now() - retention
    try:
        for log_file in self.log_dir.glob('*.csv'):
            try:
                file_date = datetime.strptime(log_file.stem, '%Y-%m-%d')
                if file_date >= cutoff_date:
                    continue
                log_file.unlink()
                logger.info(f"Deleted old log: {log_file.name}")
            except (ValueError, OSError):
                continue
    except OSError as e:
        logger.warning(f"Error during log cleanup: {e}")
# EventLogger.log_event method · python · L375-L393 (19 LOC)nas-monitor/monitor.py
def log_event(self, event_type: str, data: dict):
    """Append one event row to the current CSV log file.

    Triggers the daily log cleanup first. A header row is written when the
    file did not exist before this call. ``timestamp`` and ``datetime_str``
    are taken from ``data`` (defaulting to the current time); every other
    key is stored as the string form of a dict in the ``details`` column.

    Args:
        event_type: Label written to the ``event_type`` column.
        data: Event payload.
    """
    self._cleanup_old_logs()
    log_file = self._get_log_filename()
    needs_header = not log_file.exists()
    try:
        with open(log_file, 'a', newline='') as f:
            writer = csv.writer(f)
            if needs_header:
                writer.writerow(['timestamp', 'datetime', 'event_type', 'details'])
            timestamp = data.get('timestamp', time.time())
            datetime_str = data.get('datetime_str', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
            details = {}
            for key, value in data.items():
                if key not in ('timestamp', 'datetime_str'):
                    details[key] = value
            writer.writerow([timestamp, datetime_str, event_type, str(details)])
    except OSError as e:
        logger.error(f"Failed to write to log file: {e}")
# NtfyNotifier.send method · python · L403-L418 (16 LOC)nas-monitor/monitor.py
def send(self, title: str, message: str, priority: str = 'default', tags: list = None) -> bool:
    """Send a notification to ntfy.

    Args:
        title: Value of the ``Title`` header.
        message: Body text, sent UTF-8 encoded.
        priority: Value of the ``Priority`` header.
        tags: Optional tag names, joined with commas into the ``Tags`` header.

    Returns:
        True when the server answered HTTP 200. False when no topic is
        configured, on any other status, or when the request failed.
    """
    if not self.config.ntfy_topic:
        return False
    try:
        endpoint = f"{self.config.ntfy_server_url}/{self.config.ntfy_topic}"
        request_headers = {'Title': title, 'Priority': priority}
        if tags:
            request_headers['Tags'] = ','.join(tags)
        payload = message.encode('utf-8')
        reply = self.session.post(endpoint, data=payload, headers=request_headers, timeout=10)
        delivered = reply.status_code == 200
        return delivered
    except requests.RequestException as e:
        logger.error(f"Failed to send notification: {e}")
        return False
# NtfyNotifier.notify_speed_test method · python · L420-L436 (17 LOC)nas-monitor/monitor.py
def notify_speed_test(self, result: SpeedTestResult):
    """Send a notification summarising one speed test.

    A download speed below ``self.config.slow_speed_threshold_mbps`` marks
    the title as SLOW and raises the priority to ``'high'``.

    Args:
        result: Finished speed test. Its speed, test type, trigger, optional
            upload speed and duration are included in the message.
    """
    speed = result.speed_mbps
    is_slow = speed < self.config.slow_speed_threshold_mbps

    if is_slow:
        title = f"Speed Test: {speed:.1f} Mbps (SLOW)"
    else:
        title = f"Speed Test: {speed:.1f} Mbps"
    priority = 'high' if is_slow else 'default'
    tags = ['warning' if is_slow else 'white_check_mark', 'speedboat']

    lines = [
        f"Type: {result.test_type}",
        f"Trigger: {result.trigger}",
        f"Download: {speed:.1f} Mbps",
    ]
    if result.upload_mbps is not None:
        lines.append(f"Upload: {result.upload_mbps:.1f} Mbps")
    lines.append(f"Duration: {result.duration_seconds:.1f}s")

    self.send(title, "\n".join(lines), priority, tags)
# VPSClient._get_boot_id method · python · L446-L452 (7 LOC)nas-monitor/monitor.py
def _get_boot_id(self) -> str:
    """Get the Linux boot ID (changes on every reboot).

    Returns:
        The stripped contents of ``/proc/sys/kernel/random/boot_id``, or an
        empty string when the file cannot be read.
    """
    boot_id = ''
    try:
        with open('/proc/sys/kernel/random/boot_id', 'r') as handle:
            boot_id = handle.read().strip()
    except Exception:
        # Best effort: any failure leaves the empty default in place.
        pass
    return boot_id
# VPSClient._get_uptime_seconds method · python · L454-L460 (7 LOC)nas-monitor/monitor.py
def _get_uptime_seconds(self) -> float:
    """Get system uptime in seconds.

    Returns:
        The first whitespace-separated field of ``/proc/uptime`` as a float,
        or 0.0 when the file cannot be read or parsed.
    """
    try:
        with open('/proc/uptime', 'r') as handle:
            fields = handle.read().split()
            return float(fields[0])
    except Exception:
        return 0.0
# VPSClient.send_heartbeat method · python · L462-L478 (17 LOC)nas-monitor/monitor.py
def send_heartbeat(self) -> bool:
    """Send a heartbeat to the VPS.

    The JSON payload carries the current timestamp, an ISO datetime, the
    fixed source name, the boot ID and the system uptime.

    Returns:
        True when the VPS answered HTTP 200. False on any other status or
        when the request raised ``requests.RequestException``.
    """
    try:
        endpoint = f"{self.config.vps_url}/heartbeat"
        payload = {
            'timestamp': time.time(),
            'datetime': datetime.now().isoformat(),
            'source': 'nas-monitor',
            'boot_id': self._get_boot_id(),
            'uptime_seconds': self._get_uptime_seconds(),
        }
        reply = self.session.post(endpoint, json=payload, timeout=10)
        return reply.status_code == 200
    except requests.RequestException:
        return False
# Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
VPSClient.send_outage_report method · python · L480-L497 (18 LOC)nas-monitor/monitor.py
def send_outage_report(self, outage: OutageEvent) -> bool:
    """Send an outage report to the VPS.

    Args:
        outage: Event whose start/end times, start/end datetimes and
            duration are posted.

    Returns:
        True when the VPS answered HTTP 200. False on any other status or
        when the request raised ``requests.RequestException``.
    """
    try:
        endpoint = f"{self.config.vps_url}/outage"
        report = {
            'start_time': outage.start_time,
            'start_datetime': outage.start_datetime,
            'end_time': outage.end_time,
            'end_datetime': outage.end_datetime,
            'duration_seconds': outage.duration_seconds,
            'source': 'nas-monitor',
        }
        reply = self.session.post(endpoint, json=report, timeout=10)
        return reply.status_code == 200
    except requests.RequestException:
        return False
# VPSClient.send_event method · python · L499-L512 (14 LOC)nas-monitor/monitor.py
def send_event(self, event_type: str, data: dict) -> bool:
    """Send an event to the VPS for dashboard history.

    Args:
        event_type: Event label, sent as ``event_type``.
        data: Event payload, sent unchanged as ``data``.

    Returns:
        True when the VPS answered HTTP 200. False on any other status or
        when the request raised ``requests.RequestException``.
    """
    try:
        endpoint = f"{self.config.vps_url}/api/events"
        envelope = {'event_type': event_type, 'data': data}
        reply = self.session.post(endpoint, json=envelope, timeout=10)
        return reply.status_code == 200
    except requests.RequestException:
        return False
# InternetMonitor.__init__ method · python · L518-L536 (19 LOC)nas-monitor/monitor.py
def __init__(self, config: Config):
    """Create the monitor's components and reset its runtime state.

    Args:
        config: Settings object stored on the instance and passed to every
            component created here.
    """
    self.config = config

    # Components, each built from the same config (construction order kept).
    self.ping_monitor = PingMonitor(config)
    self.speed_tester = SpeedTester(config)
    self.event_logger = EventLogger(config)
    self.notifier = NtfyNotifier(config)
    self.vps_client = VPSClient(config)

    # Lifecycle flag and signalling primitives.
    self.running = False
    self._shutdown_event = threading.Event()
    self.speed_test_requested = threading.Event()

    # Outage bookkeeping.
    self.last_status = 'online'
    self.current_outage: Optional[OutageEvent] = None
    self.pending_outages: Queue[OutageEvent] = Queue()

    # Timers start at 0 and slow-speed mode starts off.
    self.last_heartbeat_time = 0
    self.last_speed_test_time = 0
    self.last_scheduled_test_time = 0
    self.in_slow_speed_mode = False
# InternetMonitor.request_speed_test method · python · L543-L563 (21 LOC)nas-monitor/monitor.py
def request_speed_test(self, use_ookla: bool = False) -> Optional[SpeedTestResult]:
    """Trigger a manual speed test.

    On success the result is logged and forwarded as a ``speed_test``
    event, a notification is sent, and slow-speed mode is re-evaluated.

    Args:
        use_ookla: Run the Ookla CLI test instead of the VPS download test.

    Returns:
        The speed test result, or the falsy value returned by the tester
        when the test failed.
    """
    tester = self.speed_tester
    run = tester.run_ookla_test if use_ookla else tester.run_test
    result = run(trigger='manual')
    if not result:
        return result

    log_data = {
        'timestamp': result.timestamp,
        'datetime_str': result.datetime_str,
        'speed_mbps': result.speed_mbps,
        'trigger': result.trigger,
        'test_type': result.test_type,
    }
    if result.upload_mbps is not None:
        log_data['upload_mbps'] = result.upload_mbps

    self._log_and_forward('speed_test', log_data)
    self.notifier.notify_speed_test(result)
    self._check_slow_speed(result)
    return result
# InternetMonitor.request_full_speed_test method · python · L565-L600 (36 LOC)nas-monitor/monitor.py
def request_full_speed_test(self) -> dict:
"""Run all speed tests (VPS + Ookla) and send combined notification."""
results = {}
# Run VPS download test
logger.info("Running full speed test suite...")
vps_result = self.speed_tester.run_test(trigger='manual_full')
if vps_result:
results['vps_download_mbps'] = vps_result.speed_mbps
self._log_and_forward('speed_test', {
'timestamp': vps_result.timestamp,
'datetime_str': vps_result.datetime_str,
'speed_mbps': vps_result.speed_mbps,
'trigger': 'manual_full',
'test_type': 'vps'
})
# Run Ookla test (download + upload)
ookla_result = self.speed_tester.run_ookla_test(trigger='manual_full')
if ookla_result:
results['ookla_download_mbps'] = ookla_result.speed_mbps
results['ookla_upload_mbps'] = ookla_result.upload_mbps
seInternetMonitor._notify_full_speed_test method · python · L602-L633 (32 LOC)nas-monitor/monitor.py
def _notify_full_speed_test(self, results: dict):
"""Send a combined notification for full speed test."""
lines = ["Full Speed Test Results:", ""]
vps_dl = results.get('vps_download_mbps')
ookla_dl = results.get('ookla_download_mbps')
ookla_ul = results.get('ookla_upload_mbps')
if vps_dl is not None:
lines.append(f"VPS Download: {vps_dl:.1f} Mbps")
if ookla_dl is not None:
lines.append(f"Ookla Download: {ookla_dl:.1f} Mbps")
if ookla_ul is not None:
lines.append(f"Ookla Upload: {ookla_ul:.1f} Mbps")
# Check if any download speed is slow (upload has different expectations)
threshold = self.config.slow_speed_threshold_mbps
is_slow = any(
v is not None and v < threshold
for v in [vps_dl, ookla_dl]
)
if is_slow:
title = "Full Speed Test: SLOW"
priority = 'high'
tags = ['warning', 'speeInternetMonitor._check_slow_speed method · python · L635-L644 (10 LOC)nas-monitor/monitor.py
def _check_slow_speed(self, result: SpeedTestResult):
    """Enter or leave slow speed mode based on a test result.

    The mode is entered when the measured speed is below
    ``self.config.slow_speed_threshold_mbps`` and left otherwise. A log
    line is written only when the mode actually changes.

    Args:
        result: Speed test result whose ``speed_mbps`` is compared.
    """
    is_slow = result.speed_mbps < self.config.slow_speed_threshold_mbps
    if is_slow == self.in_slow_speed_mode:
        # Already in the matching mode; nothing to log or change.
        return
    if is_slow:
        logger.warning(f"Entering slow speed mode ({result.speed_mbps:.1f} Mbps < {self.config.slow_speed_threshold_mbps} Mbps)")
        self.in_slow_speed_mode = True
    else:
        logger.info(f"Exiting slow speed mode ({result.speed_mbps:.1f} Mbps)")
        self.in_slow_speed_mode = False
# InternetMonitor._maybe_run_speed_test method · python · L646-L704 (59 LOC)nas-monitor/monitor.py
def _maybe_run_speed_test(self, trigger: str, only_log_if_slow: bool = False) -> Optional[SpeedTestResult]:
"""Run speed test with triage: if VPS test shows slow, confirm with Ookla."""
result = self.speed_tester.run_test(trigger=trigger)
if not result:
return None
self.last_speed_test_time = time.time()
vps_speed = result.speed_mbps
is_slow = vps_speed < self.config.slow_speed_threshold_mbps
# If VPS test shows slow speed, run Ookla to confirm
if is_slow and SPEEDTEST_AVAILABLE:
logger.info(f"VPS test shows {vps_speed:.1f} Mbps - running Ookla to confirm...")
ookla_result = self.speed_tester.run_ookla_test(trigger=f"{trigger}_confirm")
if ookla_result:
# Use Ookla result as the authoritative measurement
confirmed_slow = ookla_result.speed_mbps < self.config.slow_speed_threshold_mbps
# Log both results
seRepobility (the analyzer behind this table) · https://repobility.com
InternetMonitor._handle_status_change method · python · L706-L740 (35 LOC)nas-monitor/monitor.py
def _handle_status_change(self, result: PingResult) -> bool:
"""Handle transitions between online and outage states. Returns True if status changed."""
status_changed = False
if result.status == 'outage' and self.last_status == 'online':
# Outage started
self.current_outage = OutageEvent(start_time=result.timestamp, start_datetime=result.datetime_str)
logger.warning(f"OUTAGE DETECTED at {result.datetime_str}")
self._log_and_forward('outage_start', {
'timestamp': result.timestamp,
'datetime_str': result.datetime_str
})
status_changed = True
elif result.status == 'online' and self.last_status == 'outage':
# Outage ended
if self.current_outage:
self.current_outage.end_time = result.timestamp
self.current_outage.end_datetime = result.datetime_str
self.current_outage.duration_secoInternetMonitor._send_pending_outages method · python · L742-L753 (12 LOC)nas-monitor/monitor.py
def _send_pending_outages(self):
    """Send any pending outage reports to the VPS.

    Reports are taken from ``self.pending_outages`` one at a time. The
    first report that fails to send is put back on the queue and the loop
    stops.
    """
    backlog = self.pending_outages
    while not backlog.empty():
        try:
            report = backlog.get_nowait()
            delivered = self.vps_client.send_outage_report(report)
            if not delivered:
                # Keep the report for a later attempt and stop for now.
                backlog.put(report)
                break
            logger.info("Outage report sent to VPS")
        except Empty:
            break
# InternetMonitor._maybe_send_heartbeat method · python · L755-L763 (9 LOC)nas-monitor/monitor.py
def _maybe_send_heartbeat(self):
    """Send heartbeat if enough time has passed and we're online.

    After a successful heartbeat the send time is recorded and any queued
    outage reports are flushed.
    """
    current_time = time.time()
    if self.last_status != 'online':
        return
    elapsed = current_time - self.last_heartbeat_time
    due = elapsed >= self.config.heartbeat_interval_seconds
    if not due:
        return
    if self.vps_client.send_heartbeat():
        logger.debug("Heartbeat sent to VPS")
        self.last_heartbeat_time = current_time
        self._send_pending_outages()
# InternetMonitor.run method · python · L765-L841 (77 LOC)nas-monitor/monitor.py
def run(self):
"""Main monitoring loop."""
self.running = True
logger.info("NAS Internet Monitor started")
logger.info(f"Ping targets: {self.config.ping_targets}")
logger.info(f"Ping interval: {self.config.ping_interval_seconds}s")
logger.info(f"VPS URL: {self.config.vps_url}")
logger.info(f"High latency threshold: {self.config.high_latency_threshold_ms}ms")
logger.info(f"Slow speed threshold: {self.config.slow_speed_threshold_mbps} Mbps")
logger.info(f"Scheduled speed test: every {self.config.scheduled_speed_test_interval_seconds}s (always logged)")
logger.info(f"HTTP server port: {self.config.http_port}")
self._maybe_send_heartbeat()
while self.running and not self._shutdown_event.is_set():
try:
# Check for manual speed test request
if self.speed_test_requested.is_set():
self.request_speed_test()
selfRequestHandler.do_GET method · python · L856-L908 (53 LOC)nas-monitor/monitor.py
def do_GET(self):
if self.path == '/speedtest':
result = self.monitor.request_speed_test(use_ookla=False)
if result:
response = f'{{"speed_mbps": {result.speed_mbps:.1f}, "trigger": "{result.trigger}", "test_type": "{result.test_type}"}}'
self.send_response(200)
else:
response = '{"error": "Speed test failed"}'
self.send_response(500)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(response.encode())
elif self.path == '/speedtest/ookla':
result = self.monitor.request_speed_test(use_ookla=True)
if result:
upload = result.upload_mbps if result.upload_mbps else 0
response = f'{{"download_mbps": {result.speed_mbps:.1f}, "upload_mbps": {upload:.1f}, "test_type": "{result.test_type}"}}'
self.send_response(200)
elserun_http_server function · python · L911-L916 (6 LOC)nas-monitor/monitor.py
def run_http_server(monitor: InternetMonitor, port: int):
    """Run the HTTP server in a separate thread.

    The monitor is attached to ``RequestHandler`` as a class attribute so
    request handlers can reach it. This call blocks in ``serve_forever``.

    Args:
        monitor: Monitor instance exposed to the request handler.
        port: TCP port to listen on; the server binds to all interfaces.
    """
    RequestHandler.monitor = monitor
    bind_address = ('0.0.0.0', port)
    server = HTTPServer(bind_address, RequestHandler)
    logger.info(f"HTTP server listening on port {port}")
    server.serve_forever()
# main function · python · L919-L941 (23 LOC)nas-monitor/monitor.py
def main():
    """Main entry point.

    Loads the configuration (path taken from ``CONFIG_PATH``, default
    ``config.yaml``), starts the HTTP server on a daemon thread, installs
    SIGTERM/SIGINT handlers that stop the monitor, then runs the monitor
    loop in the calling thread.
    """
    config = Config.load(os.environ.get('CONFIG_PATH', 'config.yaml'))
    monitor = InternetMonitor(config)

    # Start HTTP server thread
    http_thread = threading.Thread(
        target=run_http_server,
        args=(monitor, config.http_port),
        daemon=True,
    )
    http_thread.start()

    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, shutting down...")
        monitor.stop()

    for handled_signal in (signal.SIGTERM, signal.SIGINT):
        signal.signal(handled_signal, signal_handler)

    try:
        monitor.run()
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
        monitor.stop()
# Config.load method · python · L564-L597 (34 LOC)vps-monitor/monitor.py
def load(cls, config_path: str = 'config.yaml') -> 'Config':
"""Load configuration from YAML file and environment variables."""
config = cls()
# Load from YAML if exists
if os.path.exists(config_path):
with open(config_path, 'r') as f:
yaml_config = yaml.safe_load(f) or {}
for key, value in yaml_config.items():
if hasattr(config, key):
setattr(config, key, value)
# Override with environment variables
env_mappings = {
'LISTEN_HOST': ('listen_host', str),
'LISTEN_PORT': ('listen_port', int),
'HEARTBEAT_TIMEOUT': ('heartbeat_timeout_seconds', int),
'CHECK_INTERVAL': ('check_interval_seconds', int),
'NTFY_SERVER_URL': ('ntfy_server_url', str),
'NTFY_TOPIC': ('ntfy_topic', str),
'OUTAGE_LOG_FILE': ('outage_log_file', str),
'STARTUP_GRACE': ('startup_grace_sRepobility · severity-and-effort ranking · https://repobility.com
EventStore._init_db method · python · L613-L628 (16 LOC)vps-monitor/monitor.py
def _init_db(self):
    """Initialize database tables.

    Creates the ``events`` table and its two indexes (on ``timestamp`` and
    ``event_type``) if they do not exist yet, then commits.
    """
    schema_statements = (
        '''
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp REAL NOT NULL,
                datetime TEXT NOT NULL,
                event_type TEXT NOT NULL,
                data TEXT NOT NULL,
                source TEXT DEFAULT 'vps'
            )
        ''',
        'CREATE INDEX IF NOT EXISTS idx_timestamp ON events(timestamp)',
        'CREATE INDEX IF NOT EXISTS idx_event_type ON events(event_type)',
    )
    with self._get_connection() as conn:
        for statement in schema_statements:
            conn.execute(statement)
        conn.commit()
# EventStore.add_event method · python · L630-L639 (10 LOC)vps-monitor/monitor.py
def add_event(self, event_type: str, data: dict, source: str = 'vps'):
    """Add an event to the store.

    Args:
        event_type: Value for the ``event_type`` column.
        data: Event payload, stored as JSON. Its ``timestamp`` and
            ``datetime_str`` entries fill the matching columns and default
            to the current time when missing.
        source: Value for the ``source`` column.
    """
    timestamp = data.get('timestamp', time.time())
    datetime_str = data.get('datetime_str', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    insert_sql = 'INSERT INTO events (timestamp, datetime, event_type, data, source) VALUES (?, ?, ?, ?, ?)'
    with self._get_connection() as conn:
        row = (timestamp, datetime_str, event_type, json.dumps(data), source)
        conn.execute(insert_sql, row)
        conn.commit()
# EventStore.get_events method · python · L641-L666 (26 LOC)vps-monitor/monitor.py
def get_events(self, event_type: str = None, hours: int = 24, limit: int = 1000) -> list:
"""Get events from the store."""
cutoff = time.time() - (hours * 3600)
with self._get_connection() as conn:
if event_type:
cursor = conn.execute(
'SELECT * FROM events WHERE event_type = ? AND timestamp > ? ORDER BY timestamp DESC LIMIT ?',
(event_type, cutoff, limit)
)
else:
cursor = conn.execute(
'SELECT * FROM events WHERE timestamp > ? ORDER BY timestamp DESC LIMIT ?',
(cutoff, limit)
)
rows = cursor.fetchall()
return [
{
'id': row['id'],
'timestamp': row['timestamp'],
'datetime': row['datetime'],
'event_type': row['event_type'],
'data': json.loads(row['data']),
EventStore.get_uptime_periods method · python · L668-L674 (7 LOC)vps-monitor/monitor.py
def get_uptime_periods(self, hours: int = 24) -> list:
    """Get uptime/downtime periods for timeline visualization.

    Args:
        hours: Look-back window passed to ``get_events``.

    Returns:
        The ``down``, ``restored`` and ``heartbeat_first`` events from up to
        5000 fetched events, sorted by ascending timestamp.
    """
    status_kinds = ('down', 'restored', 'heartbeat_first')
    recent = self.get_events(hours=hours, limit=5000)
    return sorted(
        (event for event in recent if event['event_type'] in status_kinds),
        key=lambda event: event['timestamp'],
    )
# EventStore.cleanup_old_events method · python · L689-L694 (6 LOC)vps-monitor/monitor.py
def cleanup_old_events(self, days: int = 30):
    """Delete events older than specified days.

    Args:
        days: Age limit; events with a timestamp earlier than now minus
            this many days are removed.
    """
    max_age_seconds = days * 86400
    cutoff = time.time() - max_age_seconds
    delete_sql = 'DELETE FROM events WHERE timestamp < ?'
    with self._get_connection() as conn:
        conn.execute(delete_sql, (cutoff,))
        conn.commit()
# HeartbeatTracker.__init__ method · python · L700-L710 (11 LOC)vps-monitor/monitor.py
def __init__(self, config: Config):
    """Set up heartbeat tracking state.

    The tracker starts out online with no heartbeat seen yet and records
    its own start time.

    Args:
        config: Settings object stored on the instance.
    """
    self.config = config
    self._lock = threading.Lock()
    self.startup_time = time.time()

    # Heartbeat state.
    self.last_heartbeat_time: Optional[float] = None
    self.last_heartbeat_data: Optional[dict] = None

    # Outage state.
    self.is_online = True
    self.outage_start_time: Optional[float] = None

    # Track boot_id to detect NAS reboots (power cuts)
    self.last_boot_id: Optional[str] = None
    self.boot_id_before_outage: Optional[str] = None
# HeartbeatTracker.record_heartbeat method · python · L712-L747 (36 LOC)vps-monitor/monitor.py
def record_heartbeat(self, data: dict):
"""Record a received heartbeat."""
with self._lock:
now = time.time()
was_offline = not self.is_online
new_boot_id = data.get('boot_id', '')
new_uptime = data.get('uptime_seconds', 0)
self.last_heartbeat_time = now
self.last_heartbeat_data = data
self.is_online = True
if was_offline and self.outage_start_time:
# Calculate outage duration
duration = now - self.outage_start_time
self.outage_start_time = None
# Detect if NAS rebooted during outage
nas_rebooted = False
if self.boot_id_before_outage and new_boot_id:
nas_rebooted = (new_boot_id != self.boot_id_before_outage)
self.last_boot_id = new_boot_id
self.boot_id_before_outage = None
return {
'eveHeartbeatTracker.check_status method · python · L749-L780 (32 LOC)vps-monitor/monitor.py
def check_status(self) -> Optional[dict]:
"""Check if we've missed heartbeats (called periodically)."""
with self._lock:
now = time.time()
# Don't check during startup grace period
if now - self.startup_time < self.config.startup_grace_seconds:
return None
# If we've never received a heartbeat, start tracking
if self.last_heartbeat_time is None:
if self.is_online:
self.is_online = False
self.outage_start_time = now
self.boot_id_before_outage = self.last_boot_id
return {'event': 'down', 'reason': 'no_heartbeat_received'}
return None
# Check if heartbeat is overdue
time_since_heartbeat = now - self.last_heartbeat_time
if time_since_heartbeat > self.config.heartbeat_timeout_seconds:
if self.is_online:
self.is_oProvenance: Repobility (https://repobility.com) — every score reproducible from /scan/
HeartbeatTracker.get_status method · python · L782-L798 (17 LOC)vps-monitor/monitor.py
def get_status(self) -> dict:
    """Get current status for API endpoint.

    Returns:
        A dict, built while holding the lock, with the online flag, the last
        heartbeat time and its age in seconds, and the outage start time
        and its current duration in seconds. Age and duration are None when
        the corresponding time is unset.
    """
    with self._lock:
        now = time.time()
        last_seen = self.last_heartbeat_time
        outage_started = self.outage_start_time
        heartbeat_age = now - last_seen if last_seen else None
        outage_duration = now - outage_started if outage_started else None
        return {
            'is_online': self.is_online,
            'last_heartbeat_time': last_seen,
            'last_heartbeat_age_seconds': heartbeat_age,
            'outage_start_time': outage_started,
            'current_outage_duration_seconds': outage_duration,
        }
# NtfyNotifier.send method · python · L808-L840 (33 LOC)vps-monitor/monitor.py
def send(self, title: str, message: str, priority: str = 'default',
tags: list = None) -> bool:
"""Send a notification to ntfy."""
if not self.config.ntfy_topic:
logger.warning("No ntfy topic configured, skipping notification")
return False
try:
url = f"{self.config.ntfy_server_url}/{self.config.ntfy_topic}"
headers = {
'Title': title,
'Priority': priority,
}
if tags:
headers['Tags'] = ','.join(tags)
response = self.session.post(
url,
data=message.encode('utf-8'),
headers=headers,
timeout=10
)
if response.status_code == 200:
logger.info(f"Notification sent: {title}")
return True
else:
logger.error(f"Failed to send notification: {response.status_code}")
NtfyNotifier.notify_down method · python · L842-L850 (9 LOC)vps-monitor/monitor.py
def notify_down(self, reason: str = None):
    """Send notification that home network is down.

    Args:
        reason: Optional explanation, appended in parentheses to the
            message when given.
    """
    extra = ""
    if reason:
        extra = f" ({reason})"
    body = f"No heartbeat received from NAS monitor{extra}"
    self.send(
        title="Home Network DOWN",
        message=body,
        priority='high',
        tags=['warning', 'house'],
    )
# NtfyNotifier.notify_restored method · python · L852-L868 (17 LOC)vps-monitor/monitor.py
def notify_restored(self, duration_seconds: float):
    """Send notification that home network is restored.

    The outage length is shown in whole seconds below one minute, in
    minutes (one decimal) below one hour, and in hours (one decimal)
    otherwise.

    Args:
        duration_seconds: Length of the outage in seconds.
    """
    duration_min = duration_seconds / 60
    if duration_min < 1:
        amount, unit, precision = duration_seconds, "seconds", ".0f"
    elif duration_min < 60:
        amount, unit, precision = duration_min, "minutes", ".1f"
    else:
        amount, unit, precision = duration_min / 60, "hours", ".1f"
    duration_str = f"{amount:{precision}} {unit}"
    self.send(
        title="Home Network RESTORED",
        message=f"Connection restored after {duration_str}",
        priority='default',
        tags=['white_check_mark', 'house'],
    )
# OutageLogger.log_outage method · python · L883-L890 (8 LOC)vps-monitor/monitor.py
def log_outage(self, outage_data: dict):
    """Log an outage event.

    Appends one line to ``self.log_path``: the current ISO timestamp,
    `` | ``, and the string form of ``outage_data``. Write failures are
    logged and not raised.

    Args:
        outage_data: Details of the outage event.
    """
    try:
        with open(self.log_path, 'a') as f:
            timestamp = datetime.now().isoformat()
            entry = f"{timestamp} | {outage_data}\n"
            f.write(entry)
    except OSError as e:
        logger.error(f"Failed to log outage: {e}")
# VPSMonitor.__init__ method · python · L896-L907 (12 LOC)vps-monitor/monitor.py
def __init__(self, config: Config):
    """Build the VPS monitor and register its HTTP routes.

    Args:
        config: Settings object stored on the instance and passed to the
            tracker, notifier, outage logger and event store.
    """
    self.config = config

    # Background-check state; the thread itself is created elsewhere.
    self.running = False
    self._check_thread: Optional[threading.Thread] = None

    # Components, each built from the same config (construction order kept).
    self.tracker = HeartbeatTracker(config)
    self.notifier = NtfyNotifier(config)
    self.outage_logger = OutageLogger(config)
    self.event_store = EventStore(config)

    # The Flask app must exist before routes are registered on it.
    self.app = Flask(__name__)
    self._setup_routes()
# VPSMonitor._check_loop method · python · L1258-L1280 (23 LOC)vps-monitor/monitor.py
def _check_loop(self):
"""Background loop to check for missed heartbeats."""
while self.running:
try:
result = self.tracker.check_status()
if result:
if result.get('event') == 'down':
reason = result.get('reason', 'unknown')
logger.warning(f"Home network DOWN: {reason}")
self.notifier.notify_down(reason)
self.outage_logger.log_outage({
'type': 'down_detected',
'reason': reason
})
self.event_store.add_event('down', {
'timestamp': time.time(),
'datetime_str': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'reason': reason
}, source='vps')
except Exception as e:
logger.error(f"VPSMonitor.run method · python · L1282-L1303 (22 LOC)vps-monitor/monitor.py
def run(self):
    """Run the VPS monitor.

    Logs the effective settings, starts the heartbeat check loop on a
    daemon thread, then runs the Flask app in the calling thread.
    """
    self.running = True
    cfg = self.config
    logger.info("VPS Internet Monitor started")
    logger.info(f"Listening on {cfg.listen_host}:{cfg.listen_port}")
    logger.info(f"Heartbeat timeout: {cfg.heartbeat_timeout_seconds}s")
    logger.info(f"ntfy topic: {cfg.ntfy_topic or '(not configured)'}")
    logger.info(f"Startup grace period: {cfg.startup_grace_seconds}s")

    # Start background check thread
    checker = threading.Thread(target=self._check_loop, daemon=True)
    self._check_thread = checker
    checker.start()

    # Run Flask app; threaded=True handles concurrent requests, and the
    # reloader is disabled for production use.
    self.app.run(
        host=self.config.listen_host,
        port=self.config.listen_port,
        threaded=True,
        use_reloader=False,
    )
# Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
main function · python · L1310-L1339 (30 LOC)vps-monitor/monitor.py
def main():
    """Main entry point.

    Loads the configuration (path taken from ``CONFIG_PATH``, default
    ``config.yaml``), warns when no ntfy topic is set, installs
    SIGTERM/SIGINT handlers that stop the monitor and exit, then runs the
    monitor.
    """
    # Load configuration
    config = Config.load(os.environ.get('CONFIG_PATH', 'config.yaml'))

    # Notifications are optional, so a missing topic only produces a warning.
    if not config.ntfy_topic:
        logger.warning(
            "NTFY_TOPIC not configured. Notifications will be disabled. "
            "Set via config.yaml or NTFY_TOPIC environment variable."
        )

    # Create and run monitor
    monitor = VPSMonitor(config)

    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, shutting down...")
        monitor.stop()
        sys.exit(0)

    for handled_signal in (signal.SIGTERM, signal.SIGINT):
        signal.signal(handled_signal, signal_handler)

    try:
        monitor.run()
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
        monitor.stop()