← back to jowtron__internet-monitor

Function bodies 48 total

All specs Real LLM only Function bodies
check_speedtest_cli function · python · L30-L36 (7 LOC)
nas-monitor/monitor.py
def check_speedtest_cli() -> bool:
    """Check if the official Ookla speedtest CLI is available.

    Runs ``speedtest --version`` and reports whether it exited with status 0.

    Returns:
        True when the command ran and returned exit code 0; False when the
        binary is missing, cannot be executed (for example it is not marked
        executable), or does not finish within 5 seconds.
    """
    try:
        result = subprocess.run(['speedtest', '--version'], capture_output=True, timeout=5)
    except (OSError, subprocess.TimeoutExpired):
        # OSError covers FileNotFoundError as well as PermissionError and
        # similar launch failures, so a broken install reads as
        # "not available" instead of raising out of the probe.
        return False
    return result.returncode == 0
Config.load method · python · L72-L109 (38 LOC)
nas-monitor/monitor.py
    def load(cls, config_path: str = 'config.yaml') -> 'Config':
        """Load configuration from YAML file and environment variables."""
        config = cls()

        # Load from YAML if exists
        if os.path.exists(config_path):
            with open(config_path, 'r') as f:
                yaml_config = yaml.safe_load(f) or {}
                for key, value in yaml_config.items():
                    if hasattr(config, key):
                        setattr(config, key, value)

        # Override with environment variables
        env_mappings = {
            'PING_TARGETS': ('ping_targets', lambda x: x.split(',')),
            'PING_INTERVAL': ('ping_interval_seconds', int),
            'PING_TIMEOUT': ('ping_timeout_seconds', int),
            'HEARTBEAT_INTERVAL': ('heartbeat_interval_seconds', int),
            'VPS_URL': ('vps_url', str),
            'LOG_DIRECTORY': ('log_directory', str),
            'LOG_RETENTION_DAYS': ('log_retention_days', int),
            'NTFY_S
PingMonitor.ping method · python · L151-L178 (28 LOC)
nas-monitor/monitor.py
    def ping(self, target: str) -> tuple[bool, Optional[float]]:
        """Ping a target and return (success, latency_ms)."""
        try:
            if sys.platform == 'darwin':  # macOS
                cmd = ['ping', '-c', str(self.config.ping_count), '-t',
                       str(self.config.ping_timeout_seconds), target]
            elif sys.platform == 'win32':
                cmd = ['ping', '-n', str(self.config.ping_count), '-w',
                       str(self.config.ping_timeout_seconds * 1000), target]
            else:  # Linux
                cmd = ['ping', '-c', str(self.config.ping_count), '-W',
                       str(self.config.ping_timeout_seconds), target]

            result = subprocess.run(
                cmd, capture_output=True, text=True,
                timeout=self.config.ping_timeout_seconds * self.config.ping_count + 5
            )

            if result.returncode == 0:
                latency = self._parse_ping_latency(result.stdout)
           
PingMonitor._parse_ping_latency method · python · L180-L192 (13 LOC)
nas-monitor/monitor.py
    def _parse_ping_latency(self, output: str) -> Optional[float]:
        """Extract average latency from ping output."""
        try:
            for line in output.split('\n'):
                if 'avg' in line.lower() or 'average' in line.lower():
                    if '=' in line:
                        numbers_part = line.split('=')[1].strip()
                        parts = numbers_part.split('/')
                        if len(parts) >= 2:
                            return float(parts[1])
            return None
        except (ValueError, IndexError):
            return None
PingMonitor.check_connectivity method · python · L194-L218 (25 LOC)
nas-monitor/monitor.py
    def check_connectivity(self) -> PingResult:
        """Probe the configured ping targets and summarise connectivity.

        Targets are tried in order and probing stops at the first one that
        answers. The result is 'online' when a target answered and 'outage'
        otherwise; when nothing answered, the first configured target is
        reported as the target.
        """
        timestamp = time.time()
        datetime_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        reachable_target = None
        measured_latency = None
        is_online = False

        for candidate in self.config.ping_targets:
            reachable, latency = self.ping(candidate)
            if not reachable:
                continue
            # First responder wins; its latency (possibly None) is reported.
            is_online = True
            reachable_target = candidate
            measured_latency = latency
            break

        if is_online:
            status = 'online'
        else:
            status = 'outage'

        return PingResult(
            timestamp=timestamp,
            datetime_str=datetime_str,
            status=status,
            ping_ms=measured_latency,
            target=reachable_target or self.config.ping_targets[0]
        )
SpeedTester.run_test method · python · L228-L270 (43 LOC)
nas-monitor/monitor.py
    def run_test(self, trigger: str = 'manual') -> Optional[SpeedTestResult]:
        """Run a speed test by downloading file from VPS."""
        url = f"{self.config.vps_url}/speedtest"
        timestamp = time.time()
        datetime_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        try:
            start_time = time.time()
            response = self.session.get(url, timeout=60, stream=True)

            if response.status_code != 200:
                logger.error(f"Speed test failed: HTTP {response.status_code}")
                return None

            # Download and measure
            total_bytes = 0
            for chunk in response.iter_content(chunk_size=8192):
                total_bytes += len(chunk)

            end_time = time.time()
            duration = end_time - start_time

            if duration > 0:
                speed_mbps = (total_bytes * 8) / (duration * 1_000_000)
            else:
                speed_mbps = 0

            result = SpeedTestResu
SpeedTester.run_ookla_test method · python · L272-L329 (58 LOC)
nas-monitor/monitor.py
    def run_ookla_test(self, trigger: str = 'manual') -> Optional[SpeedTestResult]:
        """Run a full Ookla speed test using official CLI (slower but more accurate)."""
        if not SPEEDTEST_AVAILABLE:
            logger.error("Ookla speedtest CLI not installed")
            return None

        timestamp = time.time()
        datetime_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        try:
            logger.info("Running Ookla speed test (this may take 30-60 seconds)...")
            start_time = time.time()

            # Run official Ookla CLI with JSON output
            result = subprocess.run(
                ['speedtest', '--accept-license', '--accept-gdpr', '--format=json'],
                capture_output=True,
                text=True,
                timeout=120
            )

            end_time = time.time()
            duration = end_time - start_time

            if result.returncode != 0:
                logger.error(f"Ookla CLI failed: {result.stderr}
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
EventLogger._ensure_log_directory method · python · L341-L347 (7 LOC)
nas-monitor/monitor.py
    def _ensure_log_directory(self):
        """Create log directory if it doesn't exist."""
        try:
            self.log_dir.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            logger.error(f"Failed to create log directory: {e}")
            raise
EventLogger._cleanup_old_logs method · python · L349-L368 (20 LOC)
nas-monitor/monitor.py
    def _cleanup_old_logs(self):
        """Delete log files older than retention period."""
        today = datetime.now().strftime('%Y-%m-%d')
        if self._last_cleanup_date == today:
            return

        self._last_cleanup_date = today
        cutoff_date = datetime.now() - timedelta(days=self.config.log_retention_days)

        try:
            for log_file in self.log_dir.glob('*.csv'):
                try:
                    file_date = datetime.strptime(log_file.stem, '%Y-%m-%d')
                    if file_date < cutoff_date:
                        log_file.unlink()
                        logger.info(f"Deleted old log: {log_file.name}")
                except (ValueError, OSError):
                    continue
        except OSError as e:
            logger.warning(f"Error during log cleanup: {e}")
EventLogger.log_event method · python · L375-L393 (19 LOC)
nas-monitor/monitor.py
    def log_event(self, event_type: str, data: dict):
        """Log an event to the CSV file.

        Appends one row ``[timestamp, datetime, event_type, details]`` to the
        file returned by ``_get_log_filename()``, writing the column header
        first whenever the file is still empty. ``_cleanup_old_logs()`` is
        invoked before writing.

        Args:
            event_type: Label stored in the ``event_type`` column.
            data: Event payload. ``timestamp`` and ``datetime_str`` are lifted
                into their own columns (defaulting to the current time); the
                remaining keys are stored as ``str(dict)`` in ``details``.

        Write failures (OSError) are logged and not raised.
        """
        self._cleanup_old_logs()
        log_file = self._get_log_filename()

        try:
            with open(log_file, 'a', newline='') as f:
                writer = csv.writer(f)
                # Append mode starts at end-of-file, so offset 0 means the
                # file is empty. This also covers a file that already exists
                # but never received its header.
                if f.tell() == 0:
                    writer.writerow(['timestamp', 'datetime', 'event_type', 'details'])

                timestamp = data.get('timestamp', time.time())
                datetime_str = data.get('datetime_str', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                details = {k: v for k, v in data.items() if k not in ['timestamp', 'datetime_str']}

                writer.writerow([timestamp, datetime_str, event_type, str(details)])
        except OSError as e:
            logger.error(f"Failed to write to log file: {e}")
NtfyNotifier.send method · python · L403-L418 (16 LOC)
nas-monitor/monitor.py
    def send(self, title: str, message: str, priority: str = 'default', tags: list = None) -> bool:
        """Publish a notification to the configured ntfy topic.

        Args:
            title: Value of the ``Title`` header.
            message: Body text, sent UTF-8 encoded.
            priority: Value of the ``Priority`` header.
            tags: Optional tag names, joined with commas into ``Tags``.

        Returns:
            True when the server replied with HTTP 200; False when no topic
            is configured, the reply was anything else, or the request
            raised ``requests.RequestException`` (which is logged).
        """
        topic = self.config.ntfy_topic
        if not topic:
            return False

        try:
            endpoint = f"{self.config.ntfy_server_url}/{topic}"
            request_headers = {'Title': title, 'Priority': priority}
            if tags:
                request_headers['Tags'] = ','.join(tags)

            payload = message.encode('utf-8')
            reply = self.session.post(endpoint, data=payload, headers=request_headers, timeout=10)
        except requests.RequestException as exc:
            logger.error(f"Failed to send notification: {exc}")
            return False
        return reply.status_code == 200
NtfyNotifier.notify_speed_test method · python · L420-L436 (17 LOC)
nas-monitor/monitor.py
    def notify_speed_test(self, result: SpeedTestResult):
        """Announce a speed test result through ``send``.

        A download speed below ``config.slow_speed_threshold_mbps`` is flagged
        as SLOW and sent with high priority and a warning tag; otherwise the
        notification goes out with default priority. The body lists test
        type, trigger, download speed, upload speed (when present) and the
        test duration.
        """
        speed = result.speed_mbps
        slow = speed < self.config.slow_speed_threshold_mbps

        title = f"Speed Test: {speed:.1f} Mbps (SLOW)" if slow else f"Speed Test: {speed:.1f} Mbps"
        priority = 'high' if slow else 'default'
        tags = ['warning', 'speedboat'] if slow else ['white_check_mark', 'speedboat']

        message = f"Type: {result.test_type}\nTrigger: {result.trigger}\nDownload: {speed:.1f} Mbps"
        upload = result.upload_mbps
        if upload is not None:
            message += f"\nUpload: {result.upload_mbps:.1f} Mbps"
        message += f"\nDuration: {result.duration_seconds:.1f}s"

        self.send(title, message, priority, tags)
VPSClient._get_boot_id method · python · L446-L452 (7 LOC)
nas-monitor/monitor.py
    def _get_boot_id(self) -> str:
        """Get the Linux boot ID (changes on every reboot)."""
        try:
            with open('/proc/sys/kernel/random/boot_id', 'r') as f:
                return f.read().strip()
        except Exception:
            return ''
VPSClient._get_uptime_seconds method · python · L454-L460 (7 LOC)
nas-monitor/monitor.py
    def _get_uptime_seconds(self) -> float:
        """Get system uptime in seconds."""
        try:
            with open('/proc/uptime', 'r') as f:
                return float(f.read().split()[0])
        except Exception:
            return 0.0
VPSClient.send_heartbeat method · python · L462-L478 (17 LOC)
nas-monitor/monitor.py
    def send_heartbeat(self) -> bool:
        """POST a heartbeat to ``<vps_url>/heartbeat``.

        The JSON body carries the current timestamp, an ISO datetime, the
        source name, the boot ID and the uptime in seconds.

        Returns:
            True when the VPS answered with HTTP 200; False for any other
            status or when the request raised ``requests.RequestException``.
        """
        try:
            endpoint = f"{self.config.vps_url}/heartbeat"
            payload = {
                'timestamp': time.time(),
                'datetime': datetime.now().isoformat(),
                'source': 'nas-monitor',
                'boot_id': self._get_boot_id(),
                'uptime_seconds': self._get_uptime_seconds(),
            }
            reply = self.session.post(endpoint, json=payload, timeout=10)
            return reply.status_code == 200
        except requests.RequestException:
            return False
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
VPSClient.send_outage_report method · python · L480-L497 (18 LOC)
nas-monitor/monitor.py
    def send_outage_report(self, outage: OutageEvent) -> bool:
        """POST a finished outage to ``<vps_url>/outage``.

        Args:
            outage: Event whose start/end times, datetimes and duration are
                copied into the JSON body.

        Returns:
            True when the VPS answered with HTTP 200; False for any other
            status or when the request raised ``requests.RequestException``.
        """
        try:
            endpoint = f"{self.config.vps_url}/outage"
            report = {
                'start_time': outage.start_time,
                'start_datetime': outage.start_datetime,
                'end_time': outage.end_time,
                'end_datetime': outage.end_datetime,
                'duration_seconds': outage.duration_seconds,
                'source': 'nas-monitor'
            }
            reply = self.session.post(endpoint, json=report, timeout=10)
            return reply.status_code == 200
        except requests.RequestException:
            return False
VPSClient.send_event method · python · L499-L512 (14 LOC)
nas-monitor/monitor.py
    def send_event(self, event_type: str, data: dict) -> bool:
        """POST an event to ``<vps_url>/api/events`` for the dashboard history.

        Args:
            event_type: Sent as ``event_type`` in the JSON body.
            data: Sent unchanged as ``data`` in the JSON body.

        Returns:
            True when the VPS answered with HTTP 200; False for any other
            status or when the request raised ``requests.RequestException``.
        """
        try:
            endpoint = f"{self.config.vps_url}/api/events"
            envelope = {
                'event_type': event_type,
                'data': data
            }
            reply = self.session.post(endpoint, json=envelope, timeout=10)
            return reply.status_code == 200
        except requests.RequestException:
            return False
InternetMonitor.__init__ method · python · L518-L536 (19 LOC)
nas-monitor/monitor.py
    def __init__(self, config: Config):
        """Build the monitor's components and reset its runtime state.

        Args:
            config: Configuration object handed to every component.
        """
        self.config = config

        # Components, each constructed from the same configuration.
        self.ping_monitor = PingMonitor(config)
        self.speed_tester = SpeedTester(config)
        self.event_logger = EventLogger(config)
        self.notifier = NtfyNotifier(config)
        self.vps_client = VPSClient(config)

        # Lifecycle flags.
        self.running = False
        self._shutdown_event = threading.Event()

        # Connectivity / outage tracking; the monitor starts out 'online'.
        self.last_status = 'online'
        self.current_outage: Optional[OutageEvent] = None
        self.pending_outages: Queue[OutageEvent] = Queue()
        self.last_heartbeat_time = 0

        # Speed test bookkeeping.
        self.speed_test_requested = threading.Event()
        self.last_speed_test_time = 0
        self.last_scheduled_test_time = 0
        self.in_slow_speed_mode = False
InternetMonitor.request_speed_test method · python · L543-L563 (21 LOC)
nas-monitor/monitor.py
    def request_speed_test(self, use_ookla: bool = False) -> Optional[SpeedTestResult]:
        """Run a manually triggered speed test and publish its outcome.

        Args:
            use_ookla: When True, run the Ookla test; otherwise run the
                VPS download test.

        Returns:
            The result produced by the speed tester. A falsy result is
            returned as-is without logging or notifying; a truthy one is
            logged and forwarded, announced via the notifier, and checked
            against the slow-speed threshold first.
        """
        tester = self.speed_tester
        run = tester.run_ookla_test if use_ookla else tester.run_test
        result = run(trigger='manual')

        if not result:
            return result

        log_data = {
            'timestamp': result.timestamp,
            'datetime_str': result.datetime_str,
            'speed_mbps': result.speed_mbps,
            'trigger': result.trigger,
            'test_type': result.test_type
        }
        # Upload is only recorded when the test actually measured it.
        if result.upload_mbps is not None:
            log_data['upload_mbps'] = result.upload_mbps

        self._log_and_forward('speed_test', log_data)
        self.notifier.notify_speed_test(result)
        self._check_slow_speed(result)
        return result
InternetMonitor.request_full_speed_test method · python · L565-L600 (36 LOC)
nas-monitor/monitor.py
    def request_full_speed_test(self) -> dict:
        """Run all speed tests (VPS + Ookla) and send combined notification."""
        results = {}

        # Run VPS download test
        logger.info("Running full speed test suite...")
        vps_result = self.speed_tester.run_test(trigger='manual_full')
        if vps_result:
            results['vps_download_mbps'] = vps_result.speed_mbps
            self._log_and_forward('speed_test', {
                'timestamp': vps_result.timestamp,
                'datetime_str': vps_result.datetime_str,
                'speed_mbps': vps_result.speed_mbps,
                'trigger': 'manual_full',
                'test_type': 'vps'
            })

        # Run Ookla test (download + upload)
        ookla_result = self.speed_tester.run_ookla_test(trigger='manual_full')
        if ookla_result:
            results['ookla_download_mbps'] = ookla_result.speed_mbps
            results['ookla_upload_mbps'] = ookla_result.upload_mbps
            se
InternetMonitor._notify_full_speed_test method · python · L602-L633 (32 LOC)
nas-monitor/monitor.py
    def _notify_full_speed_test(self, results: dict):
        """Send a combined notification for full speed test."""
        lines = ["Full Speed Test Results:", ""]

        vps_dl = results.get('vps_download_mbps')
        ookla_dl = results.get('ookla_download_mbps')
        ookla_ul = results.get('ookla_upload_mbps')

        if vps_dl is not None:
            lines.append(f"VPS Download: {vps_dl:.1f} Mbps")
        if ookla_dl is not None:
            lines.append(f"Ookla Download: {ookla_dl:.1f} Mbps")
        if ookla_ul is not None:
            lines.append(f"Ookla Upload: {ookla_ul:.1f} Mbps")

        # Check if any download speed is slow (upload has different expectations)
        threshold = self.config.slow_speed_threshold_mbps
        is_slow = any(
            v is not None and v < threshold
            for v in [vps_dl, ookla_dl]
        )

        if is_slow:
            title = "Full Speed Test: SLOW"
            priority = 'high'
            tags = ['warning', 'spee
InternetMonitor._check_slow_speed method · python · L635-L644 (10 LOC)
nas-monitor/monitor.py
    def _check_slow_speed(self, result: SpeedTestResult):
        """Enter or leave slow speed mode based on a speed test result.

        The mode is switched on (with a warning) when the measured speed is
        below ``config.slow_speed_threshold_mbps`` and switched off (with an
        info message) once a result is at or above it. Nothing is logged
        when the mode does not change.
        """
        below_threshold = result.speed_mbps < self.config.slow_speed_threshold_mbps

        if below_threshold:
            if self.in_slow_speed_mode:
                return
            logger.warning(f"Entering slow speed mode ({result.speed_mbps:.1f} Mbps < {self.config.slow_speed_threshold_mbps} Mbps)")
            self.in_slow_speed_mode = True
            return

        if self.in_slow_speed_mode:
            logger.info(f"Exiting slow speed mode ({result.speed_mbps:.1f} Mbps)")
            self.in_slow_speed_mode = False
InternetMonitor._maybe_run_speed_test method · python · L646-L704 (59 LOC)
nas-monitor/monitor.py
    def _maybe_run_speed_test(self, trigger: str, only_log_if_slow: bool = False) -> Optional[SpeedTestResult]:
        """Run speed test with triage: if VPS test shows slow, confirm with Ookla."""
        result = self.speed_tester.run_test(trigger=trigger)
        if not result:
            return None

        self.last_speed_test_time = time.time()
        vps_speed = result.speed_mbps
        is_slow = vps_speed < self.config.slow_speed_threshold_mbps

        # If VPS test shows slow speed, run Ookla to confirm
        if is_slow and SPEEDTEST_AVAILABLE:
            logger.info(f"VPS test shows {vps_speed:.1f} Mbps - running Ookla to confirm...")
            ookla_result = self.speed_tester.run_ookla_test(trigger=f"{trigger}_confirm")

            if ookla_result:
                # Use Ookla result as the authoritative measurement
                confirmed_slow = ookla_result.speed_mbps < self.config.slow_speed_threshold_mbps

                # Log both results
                se
Repobility (the analyzer behind this table) · https://repobility.com
InternetMonitor._handle_status_change method · python · L706-L740 (35 LOC)
nas-monitor/monitor.py
    def _handle_status_change(self, result: PingResult) -> bool:
        """Handle transitions between online and outage states. Returns True if status changed."""
        status_changed = False

        if result.status == 'outage' and self.last_status == 'online':
            # Outage started
            self.current_outage = OutageEvent(start_time=result.timestamp, start_datetime=result.datetime_str)
            logger.warning(f"OUTAGE DETECTED at {result.datetime_str}")
            self._log_and_forward('outage_start', {
                'timestamp': result.timestamp,
                'datetime_str': result.datetime_str
            })
            status_changed = True

        elif result.status == 'online' and self.last_status == 'outage':
            # Outage ended
            if self.current_outage:
                self.current_outage.end_time = result.timestamp
                self.current_outage.end_datetime = result.datetime_str
                self.current_outage.duration_seco
InternetMonitor._send_pending_outages method · python · L742-L753 (12 LOC)
nas-monitor/monitor.py
    def _send_pending_outages(self):
        """Send any pending outage reports to the VPS."""
        while not self.pending_outages.empty():
            try:
                outage = self.pending_outages.get_nowait()
                if self.vps_client.send_outage_report(outage):
                    logger.info("Outage report sent to VPS")
                else:
                    self.pending_outages.put(outage)
                    break
            except Empty:
                break
InternetMonitor._maybe_send_heartbeat method · python · L755-L763 (9 LOC)
nas-monitor/monitor.py
    def _maybe_send_heartbeat(self):
        """Send heartbeat if enough time has passed and we're online."""
        current_time = time.time()
        if (self.last_status == 'online' and
            current_time - self.last_heartbeat_time >= self.config.heartbeat_interval_seconds):
            if self.vps_client.send_heartbeat():
                logger.debug("Heartbeat sent to VPS")
                self.last_heartbeat_time = current_time
                self._send_pending_outages()
InternetMonitor.run method · python · L765-L841 (77 LOC)
nas-monitor/monitor.py
    def run(self):
        """Main monitoring loop."""
        self.running = True
        logger.info("NAS Internet Monitor started")
        logger.info(f"Ping targets: {self.config.ping_targets}")
        logger.info(f"Ping interval: {self.config.ping_interval_seconds}s")
        logger.info(f"VPS URL: {self.config.vps_url}")
        logger.info(f"High latency threshold: {self.config.high_latency_threshold_ms}ms")
        logger.info(f"Slow speed threshold: {self.config.slow_speed_threshold_mbps} Mbps")
        logger.info(f"Scheduled speed test: every {self.config.scheduled_speed_test_interval_seconds}s (always logged)")
        logger.info(f"HTTP server port: {self.config.http_port}")

        self._maybe_send_heartbeat()

        while self.running and not self._shutdown_event.is_set():
            try:
                # Check for manual speed test request
                if self.speed_test_requested.is_set():
                    self.request_speed_test()
                    self
RequestHandler.do_GET method · python · L856-L908 (53 LOC)
nas-monitor/monitor.py
    def do_GET(self):
        if self.path == '/speedtest':
            result = self.monitor.request_speed_test(use_ookla=False)
            if result:
                response = f'{{"speed_mbps": {result.speed_mbps:.1f}, "trigger": "{result.trigger}", "test_type": "{result.test_type}"}}'
                self.send_response(200)
            else:
                response = '{"error": "Speed test failed"}'
                self.send_response(500)
            self.send_header('Content-Type', 'application/json')
            self.end_headers()
            self.wfile.write(response.encode())

        elif self.path == '/speedtest/ookla':
            result = self.monitor.request_speed_test(use_ookla=True)
            if result:
                upload = result.upload_mbps if result.upload_mbps else 0
                response = f'{{"download_mbps": {result.speed_mbps:.1f}, "upload_mbps": {upload:.1f}, "test_type": "{result.test_type}"}}'
                self.send_response(200)
            else
run_http_server function · python · L911-L916 (6 LOC)
nas-monitor/monitor.py
def run_http_server(monitor: InternetMonitor, port: int):
    """Serve HTTP requests for the monitor until the server stops.

    The monitor is attached to ``RequestHandler`` as a class attribute and
    the server listens on all interfaces (``0.0.0.0``) at ``port``. This
    call blocks in ``serve_forever()``.
    """
    RequestHandler.monitor = monitor
    bind_address = ('0.0.0.0', port)
    httpd = HTTPServer(bind_address, RequestHandler)
    logger.info(f"HTTP server listening on port {port}")
    httpd.serve_forever()
main function · python · L919-L941 (23 LOC)
nas-monitor/monitor.py
def main():
    """Program entry point: load config, start the HTTP server, run the monitor.

    The configuration path comes from the ``CONFIG_PATH`` environment
    variable (default ``config.yaml``). SIGTERM and SIGINT both ask the
    monitor to stop.
    """
    config = Config.load(os.environ.get('CONFIG_PATH', 'config.yaml'))
    monitor = InternetMonitor(config)

    # The HTTP server runs on a daemon thread alongside the monitoring loop.
    server_thread = threading.Thread(
        target=run_http_server,
        args=(monitor, config.http_port),
        daemon=True,
    )
    server_thread.start()

    def signal_handler(signum, frame):
        logger.info(f"Received signal {signum}, shutting down...")
        monitor.stop()

    for handled_signal in (signal.SIGTERM, signal.SIGINT):
        signal.signal(handled_signal, signal_handler)

    try:
        monitor.run()
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
        monitor.stop()
Config.load method · python · L564-L597 (34 LOC)
vps-monitor/monitor.py
    def load(cls, config_path: str = 'config.yaml') -> 'Config':
        """Load configuration from YAML file and environment variables."""
        config = cls()

        # Load from YAML if exists
        if os.path.exists(config_path):
            with open(config_path, 'r') as f:
                yaml_config = yaml.safe_load(f) or {}
                for key, value in yaml_config.items():
                    if hasattr(config, key):
                        setattr(config, key, value)

        # Override with environment variables
        env_mappings = {
            'LISTEN_HOST': ('listen_host', str),
            'LISTEN_PORT': ('listen_port', int),
            'HEARTBEAT_TIMEOUT': ('heartbeat_timeout_seconds', int),
            'CHECK_INTERVAL': ('check_interval_seconds', int),
            'NTFY_SERVER_URL': ('ntfy_server_url', str),
            'NTFY_TOPIC': ('ntfy_topic', str),
            'OUTAGE_LOG_FILE': ('outage_log_file', str),
            'STARTUP_GRACE': ('startup_grace_s
Repobility · severity-and-effort ranking · https://repobility.com
EventStore._init_db method · python · L613-L628 (16 LOC)
vps-monitor/monitor.py
    def _init_db(self):
        """Initialize database tables."""
        with self._get_connection() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp REAL NOT NULL,
                    datetime TEXT NOT NULL,
                    event_type TEXT NOT NULL,
                    data TEXT NOT NULL,
                    source TEXT DEFAULT 'vps'
                )
            ''')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_timestamp ON events(timestamp)')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_event_type ON events(event_type)')
            conn.commit()
EventStore.add_event method · python · L630-L639 (10 LOC)
vps-monitor/monitor.py
    def add_event(self, event_type: str, data: dict, source: str = 'vps'):
        """Insert one event row into the ``events`` table.

        Args:
            event_type: Stored in the ``event_type`` column.
            data: Event payload, stored as JSON. Its ``timestamp`` and
                ``datetime_str`` entries, when present, fill the timestamp
                and datetime columns; otherwise the current time is used.
            source: Stored in the ``source`` column.
        """
        event_time = data.get('timestamp', time.time())
        event_datetime = data.get('datetime_str', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        insert_sql = 'INSERT INTO events (timestamp, datetime, event_type, data, source) VALUES (?, ?, ?, ?, ?)'

        with self._get_connection() as conn:
            row = (event_time, event_datetime, event_type, json.dumps(data), source)
            conn.execute(insert_sql, row)
            conn.commit()
EventStore.get_events method · python · L641-L666 (26 LOC)
vps-monitor/monitor.py
    def get_events(self, event_type: str = None, hours: int = 24, limit: int = 1000) -> list:
        """Get events from the store."""
        cutoff = time.time() - (hours * 3600)
        with self._get_connection() as conn:
            if event_type:
                cursor = conn.execute(
                    'SELECT * FROM events WHERE event_type = ? AND timestamp > ? ORDER BY timestamp DESC LIMIT ?',
                    (event_type, cutoff, limit)
                )
            else:
                cursor = conn.execute(
                    'SELECT * FROM events WHERE timestamp > ? ORDER BY timestamp DESC LIMIT ?',
                    (cutoff, limit)
                )
            rows = cursor.fetchall()
            return [
                {
                    'id': row['id'],
                    'timestamp': row['timestamp'],
                    'datetime': row['datetime'],
                    'event_type': row['event_type'],
                    'data': json.loads(row['data']),
 
EventStore.get_uptime_periods method · python · L668-L674 (7 LOC)
vps-monitor/monitor.py
    def get_uptime_periods(self, hours: int = 24) -> list:
        """Return status-change events for the timeline, oldest first.

        Fetches up to 5000 events from the last ``hours`` hours and keeps
        only the 'down', 'restored' and 'heartbeat_first' ones, sorted by
        ascending timestamp.
        """
        status_types = ('down', 'restored', 'heartbeat_first')
        recent_events = self.get_events(hours=hours, limit=5000)
        return sorted(
            (event for event in recent_events if event['event_type'] in status_types),
            key=lambda event: event['timestamp'],
        )
EventStore.cleanup_old_events method · python · L689-L694 (6 LOC)
vps-monitor/monitor.py
    def cleanup_old_events(self, days: int = 30):
        """Remove events whose timestamp is more than ``days`` days old.

        Args:
            days: Maximum age, in days, of events to keep.
        """
        oldest_kept = time.time() - (days * 86400)
        delete_sql = 'DELETE FROM events WHERE timestamp < ?'
        with self._get_connection() as conn:
            conn.execute(delete_sql, (oldest_kept,))
            conn.commit()
HeartbeatTracker.__init__ method · python · L700-L710 (11 LOC)
vps-monitor/monitor.py
    def __init__(self, config: Config):
        """Set up heartbeat tracking state.

        Args:
            config: Configuration object kept on the instance.
        """
        self.config = config
        self._lock = threading.Lock()
        self.startup_time = time.time()

        # Most recent heartbeat; both stay None until one is recorded.
        self.last_heartbeat_time: Optional[float] = None
        self.last_heartbeat_data: Optional[dict] = None

        # Online/outage state; the tracker starts out marked online.
        self.is_online = True
        self.outage_start_time: Optional[float] = None

        # Track boot_id to detect NAS reboots (power cuts)
        self.last_boot_id: Optional[str] = None
        self.boot_id_before_outage: Optional[str] = None
HeartbeatTracker.record_heartbeat method · python · L712-L747 (36 LOC)
vps-monitor/monitor.py
    def record_heartbeat(self, data: dict):
        """Record a received heartbeat."""
        with self._lock:
            now = time.time()
            was_offline = not self.is_online
            new_boot_id = data.get('boot_id', '')
            new_uptime = data.get('uptime_seconds', 0)

            self.last_heartbeat_time = now
            self.last_heartbeat_data = data
            self.is_online = True

            if was_offline and self.outage_start_time:
                # Calculate outage duration
                duration = now - self.outage_start_time
                self.outage_start_time = None

                # Detect if NAS rebooted during outage
                nas_rebooted = False
                if self.boot_id_before_outage and new_boot_id:
                    nas_rebooted = (new_boot_id != self.boot_id_before_outage)

                self.last_boot_id = new_boot_id
                self.boot_id_before_outage = None

                return {
                    'eve
HeartbeatTracker.check_status method · python · L749-L780 (32 LOC)
vps-monitor/monitor.py
    def check_status(self) -> Optional[dict]:
        """Check if we've missed heartbeats (called periodically)."""
        with self._lock:
            now = time.time()

            # Don't check during startup grace period
            if now - self.startup_time < self.config.startup_grace_seconds:
                return None

            # If we've never received a heartbeat, start tracking
            if self.last_heartbeat_time is None:
                if self.is_online:
                    self.is_online = False
                    self.outage_start_time = now
                    self.boot_id_before_outage = self.last_boot_id
                    return {'event': 'down', 'reason': 'no_heartbeat_received'}
                return None

            # Check if heartbeat is overdue
            time_since_heartbeat = now - self.last_heartbeat_time
            if time_since_heartbeat > self.config.heartbeat_timeout_seconds:
                if self.is_online:
                    self.is_o
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
HeartbeatTracker.get_status method · python · L782-L798 (17 LOC)
vps-monitor/monitor.py
    def get_status(self) -> dict:
        """Get current status for API endpoint."""
        with self._lock:
            now = time.time()
            return {
                'is_online': self.is_online,
                'last_heartbeat_time': self.last_heartbeat_time,
                'last_heartbeat_age_seconds': (
                    now - self.last_heartbeat_time
                    if self.last_heartbeat_time else None
                ),
                'outage_start_time': self.outage_start_time,
                'current_outage_duration_seconds': (
                    now - self.outage_start_time
                    if self.outage_start_time else None
                )
            }
NtfyNotifier.send method · python · L808-L840 (33 LOC)
vps-monitor/monitor.py
    def send(self, title: str, message: str, priority: str = 'default',
             tags: list = None) -> bool:
        """Send a notification to ntfy."""
        if not self.config.ntfy_topic:
            logger.warning("No ntfy topic configured, skipping notification")
            return False

        try:
            url = f"{self.config.ntfy_server_url}/{self.config.ntfy_topic}"
            headers = {
                'Title': title,
                'Priority': priority,
            }
            if tags:
                headers['Tags'] = ','.join(tags)

            response = self.session.post(
                url,
                data=message.encode('utf-8'),
                headers=headers,
                timeout=10
            )

            if response.status_code == 200:
                logger.info(f"Notification sent: {title}")
                return True
            else:
                logger.error(f"Failed to send notification: {response.status_code}")
               
NtfyNotifier.notify_down method · python · L842-L850 (9 LOC)
vps-monitor/monitor.py
    def notify_down(self, reason: str = None):
        """Send notification that home network is down."""
        extra = f" ({reason})" if reason else ""
        self.send(
            title="Home Network DOWN",
            message=f"No heartbeat received from NAS monitor{extra}",
            priority='high',
            tags=['warning', 'house']
        )
NtfyNotifier.notify_restored method · python · L852-L868 (17 LOC)
vps-monitor/monitor.py
    def notify_restored(self, duration_seconds: float):
        """Send notification that home network is restored."""
        duration_min = duration_seconds / 60
        if duration_min < 1:
            duration_str = f"{duration_seconds:.0f} seconds"
        elif duration_min < 60:
            duration_str = f"{duration_min:.1f} minutes"
        else:
            hours = duration_min / 60
            duration_str = f"{hours:.1f} hours"

        self.send(
            title="Home Network RESTORED",
            message=f"Connection restored after {duration_str}",
            priority='default',
            tags=['white_check_mark', 'house']
        )
OutageLogger.log_outage method · python · L883-L890 (8 LOC)
vps-monitor/monitor.py
    def log_outage(self, outage_data: dict):
        """Log an outage event."""
        try:
            with open(self.log_path, 'a') as f:
                timestamp = datetime.now().isoformat()
                f.write(f"{timestamp} | {outage_data}\n")
        except OSError as e:
            logger.error(f"Failed to log outage: {e}")
VPSMonitor.__init__ method · python · L896-L907 (12 LOC)
vps-monitor/monitor.py
    def __init__(self, config: Config):
        """Build the monitor's collaborators and its Flask application.

        Args:
            config: Configuration object; stored on the instance and passed
                to every collaborator constructed here.
        """
        self.config = config
        # Collaborators, each constructed from the same config object.
        self.tracker = HeartbeatTracker(config)
        self.notifier = NtfyNotifier(config)
        self.outage_logger = OutageLogger(config)
        self.event_store = EventStore(config)

        # The Flask app is created before _setup_routes() is called.
        # NOTE(review): _setup_routes is defined elsewhere in the class and
        # presumably registers the HTTP endpoints on self.app — confirm.
        self.app = Flask(__name__)
        self._setup_routes()

        # Background-check state: `running` is the flag read by the check
        # loop; the thread itself is only created later, in run().
        self.running = False
        self._check_thread: Optional[threading.Thread] = None
VPSMonitor._check_loop method · python · L1258-L1280 (23 LOC)
vps-monitor/monitor.py
    def _check_loop(self):
        """Background loop to check for missed heartbeats."""
        while self.running:
            try:
                result = self.tracker.check_status()
                if result:
                    if result.get('event') == 'down':
                        reason = result.get('reason', 'unknown')
                        logger.warning(f"Home network DOWN: {reason}")
                        self.notifier.notify_down(reason)
                        self.outage_logger.log_outage({
                            'type': 'down_detected',
                            'reason': reason
                        })
                        self.event_store.add_event('down', {
                            'timestamp': time.time(),
                            'datetime_str': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                            'reason': reason
                        }, source='vps')
            except Exception as e:
                logger.error(f"
VPSMonitor.run method · python · L1282-L1303 (22 LOC)
vps-monitor/monitor.py
    def run(self):
        """Start the monitor: launch the heartbeat checker, then serve HTTP.

        Sets ``self.running``, logs the effective settings, starts
        ``_check_loop`` on a daemon thread and finally calls the Flask app's
        ``run`` with the configured host and port.
        """
        self.running = True
        cfg = self.config

        logger.info("VPS Internet Monitor started")
        logger.info(f"Listening on {cfg.listen_host}:{cfg.listen_port}")
        logger.info(f"Heartbeat timeout: {cfg.heartbeat_timeout_seconds}s")
        logger.info(f"ntfy topic: {cfg.ntfy_topic or '(not configured)'}")
        logger.info(f"Startup grace period: {cfg.startup_grace_seconds}s")

        # The checker is a daemon thread, so it does not keep the process
        # alive on its own.
        checker = threading.Thread(target=self._check_loop, daemon=True)
        self._check_thread = checker
        checker.start()

        # threaded=True lets the server handle concurrent requests;
        # the reloader is disabled for production use.
        self.app.run(
            host=cfg.listen_host,
            port=cfg.listen_port,
            threaded=True,
            use_reloader=False,
        )
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
main function · python · L1310-L1339 (30 LOC)
vps-monitor/monitor.py
def main():
    """Program entry point: load config, wire signal handlers, run the monitor.

    The config file path comes from the ``CONFIG_PATH`` environment variable
    and defaults to ``config.yaml``. A missing ntfy topic only produces a
    warning; the monitor is still started.
    """
    config = Config.load(os.environ.get('CONFIG_PATH', 'config.yaml'))

    # Notifications are optional, so a missing topic is a warning only.
    if not config.ntfy_topic:
        logger.warning(
            "NTFY_TOPIC not configured. Notifications will be disabled. "
            "Set via config.yaml or NTFY_TOPIC environment variable."
        )

    monitor = VPSMonitor(config)

    def _shutdown(signum, frame):
        # Stop the monitor, then exit the process with status 0.
        logger.info(f"Received signal {signum}, shutting down...")
        monitor.stop()
        sys.exit(0)

    for signo in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signo, _shutdown)

    try:
        monitor.run()
    except KeyboardInterrupt:
        logger.info("Keyboard interrupt received")
        monitor.stop()