← back to drbhatiasanjay__sajaag

Function bodies 140 total

All specs Real LLM only Function bodies
store method · python · L116-L121 (6 LOC)
backend/scrapers/aqicn_aqi.py
    async def store(self, data: list[dict]) -> None:
        """Insert every reading in ``data`` as its own row of ``aqi_readings``.

        Nothing is written when ``data`` is empty or the module-level
        ``supabase`` client is ``None``. Insert errors are not caught here,
        so they propagate to the caller.
        """
        if data and supabase is not None:
            for row in data:
                insert_request = supabase.table("aqi_readings").insert(row)
                insert_request.execute()
            logger.info("aqicn_aqi: stored %d readings", len(data))
BaseScraper class · python · L14-L109 (96 LOC)
backend/scrapers/base.py
class BaseScraper(ABC):
    """Provides retry logic, exponential backoff, and run-logging.

    Subclasses must implement:
        - fetch()   -- retrieve raw data from the source
        - validate() -- clean / validate the raw data
        - store()   -- persist validated data to Supabase
    """

    name: str = "base"
    max_retries: int = 3

    async def run(self) -> bool:
        """Execute the full scrape cycle with retries.

        Returns True on success, False on exhausted retries.
        """
        last_error: Exception | None = None

        for attempt in range(1, self.max_retries + 1):
            try:
                raw = await self.fetch()
                validated = await self.validate(raw)
                await self.store(validated)
                await self.log_success()
                logger.info("%s: completed on attempt %d", self.name, attempt)
                return True
            except Exception as exc:
                last_error = exc
               
run method · python · L26-L55 (30 LOC)
backend/scrapers/base.py
    async def run(self) -> bool:
        """Execute the full scrape cycle with retries.

        Returns True on success, False on exhausted retries.
        """
        last_error: Exception | None = None

        for attempt in range(1, self.max_retries + 1):
            try:
                raw = await self.fetch()
                validated = await self.validate(raw)
                await self.store(validated)
                await self.log_success()
                logger.info("%s: completed on attempt %d", self.name, attempt)
                return True
            except Exception as exc:
                last_error = exc
                wait = 2 ** attempt  # 2, 4, 8 seconds
                logger.warning(
                    "%s: attempt %d failed (%s), retrying in %ds",
                    self.name,
                    attempt,
                    exc,
                    wait,
                )
                await asyncio.sleep(wait)

        await self.log_failure(str(last
fetch method · python · L62-L64 (3 LOC)
backend/scrapers/base.py
    async def fetch(self) -> Any:
        """Retrieve raw data from the external source.

        Placeholder body: subclasses supply the real implementation.
        ``run()`` awaits this first on every attempt and passes whatever it
        returns straight to ``validate()``.
        """
        ...
validate method · python · L67-L69 (3 LOC)
backend/scrapers/base.py
    async def validate(self, raw: Any) -> Any:
        """Clean and validate the raw data.

        Placeholder body: subclasses supply the real implementation.
        ``raw`` is the value returned by ``fetch()``; ``run()`` forwards this
        method's return value to ``store()``.
        """
        ...
store method · python · L72-L74 (3 LOC)
backend/scrapers/base.py
    async def store(self, data: Any) -> None:
        """Persist validated data to Supabase.

        Placeholder body: subclasses supply the real implementation.
        ``data`` is the value returned by ``validate()``; ``run()`` awaits
        this call and does not use its result.
        """
        ...
log_success method · python · L80-L93 (14 LOC)
backend/scrapers/base.py
    async def log_success(self) -> None:
        """Write a ``success`` row for this scraper to ``scraper_runs``.

        Best-effort bookkeeping: it is skipped when the ``supabase`` client
        is ``None``, and any exception raised while inserting is logged as a
        warning instead of being propagated.
        """
        if supabase is not None:
            try:
                run_row = {
                    "scraper_name": self.name,
                    "status": "success",
                    # Timezone-aware UTC timestamp in ISO-8601 form.
                    "run_at": datetime.now(timezone.utc).isoformat(),
                }
                supabase.table("scraper_runs").insert(run_row).execute()
            except Exception as err:
                logger.warning("Failed to log success for %s: %s", self.name, err)
Open data scored by Repobility · https://repobility.com
log_failure method · python · L95-L109 (15 LOC)
backend/scrapers/base.py
    async def log_failure(self, error_message: str) -> None:
        """Write a ``failure`` row for this scraper to ``scraper_runs``.

        ``error_message`` is stored in the row's ``error_message`` field.
        Best-effort bookkeeping: it is skipped when the ``supabase`` client
        is ``None``, and any exception raised while inserting is logged as a
        warning instead of being propagated.
        """
        if supabase is not None:
            try:
                run_row = {
                    "scraper_name": self.name,
                    "status": "failure",
                    "error_message": error_message,
                    # Timezone-aware UTC timestamp in ISO-8601 form.
                    "run_at": datetime.now(timezone.utc).isoformat(),
                }
                supabase.table("scraper_runs").insert(run_row).execute()
            except Exception as err:
                logger.warning("Failed to log failure for %s: %s", self.name, err)
tag_areas function · python · L41-L44 (4 LOC)
backend/scrapers/community.py
def tag_areas(text):
    """Return the area IDs whose keywords occur in ``text``.

    Matching is a substring test against the lower-cased text, in
    ``AREA_KEYWORDS`` order. When no area matches, the result is ``["pune"]``.
    """
    lowered = text.lower()
    matches = []
    for area_id, keywords in AREA_KEYWORDS.items():
        for keyword in keywords:
            if keyword in lowered:
                matches.append(area_id)
                break
    if not matches:
        return ["pune"]
    return matches
categorize function · python · L47-L53 (7 LOC)
backend/scrapers/community.py
def categorize(text):
    """Pick a category label for ``text`` from a fixed keyword table.

    Keywords are matched as substrings of the lower-cased text. The rule
    groups are tried in order and the first group with a hit wins; text
    with no hit at all is labelled ``"Community"``.
    """
    lowered = text.lower()
    rules = (
        ("Transport", ("traffic", "road", "metro", "bus")),
        ("Essential", ("water", "power", "pmc", "msedcl")),
        ("Food", ("food", "restaurant", "cafe")),
        ("Safety", ("crime", "police", "theft")),
    )
    for label, keywords in rules:
        for keyword in keywords:
            if keyword in lowered:
                return label
    return "Community"
CommunityScraper class · python · L56-L128 (73 LOC)
backend/scrapers/community.py
class CommunityScraper(BaseScraper):
    name = "community"
    max_retries = 3

    async def fetch(self):
        results = []
        async with aiohttp.ClientSession() as session:
            for feed in COMMUNITY_FEEDS:
                url = feed["url"]
                try:
                    async with session.get(url, headers={"User-Agent": "Sajaag/0.1"}, timeout=aiohttp.ClientTimeout(total=15)) as resp:
                        if resp.status == 200:
                            results.append({"xml": await resp.text(), "config": feed})
                        elif feed["platform"] == "twitter" and "rsshub.app" in url:
                            for alt in RSSHUB_FALLBACKS:
                                try:
                                    alt_url = url.replace("https://rsshub.app", alt)
                                    async with session.get(alt_url, timeout=aiohttp.ClientTimeout(total=10)) as r:
                                        if r.status == 200:
            
fetch method · python · L60-L82 (23 LOC)
backend/scrapers/community.py
    async def fetch(self):
        results = []
        async with aiohttp.ClientSession() as session:
            for feed in COMMUNITY_FEEDS:
                url = feed["url"]
                try:
                    async with session.get(url, headers={"User-Agent": "Sajaag/0.1"}, timeout=aiohttp.ClientTimeout(total=15)) as resp:
                        if resp.status == 200:
                            results.append({"xml": await resp.text(), "config": feed})
                        elif feed["platform"] == "twitter" and "rsshub.app" in url:
                            for alt in RSSHUB_FALLBACKS:
                                try:
                                    alt_url = url.replace("https://rsshub.app", alt)
                                    async with session.get(alt_url, timeout=aiohttp.ClientTimeout(total=10)) as r:
                                        if r.status == 200:
                                            results.append({"xml": await r.text(), "config": 
validate method · python · L84-L118 (35 LOC)
backend/scrapers/community.py
    async def validate(self, raw):
        posts = []
        seen = set()
        now = datetime.now(timezone.utc).isoformat()
        for feed_data in raw:
            config = feed_data["config"]
            parsed = feedparser.parse(feed_data["xml"])
            for entry in parsed.entries[:15]:
                title = entry.get("title", "").strip()
                if not title:
                    continue
                pid = md5(f"{title}:{config['source']}".encode()).hexdigest()[:16]
                if pid in seen:
                    continue
                seen.add(pid)
                body = re.sub(r'<[^>]+>', '', entry.get("summary", "")).strip()[:500]
                pub = now
                if entry.get("published"):
                    try:
                        import email.utils
                        pub = email.utils.parsedate_to_datetime(entry["published"]).isoformat()
                    except Exception:
                        pass
                posts.app
store method · python · L120-L128 (9 LOC)
backend/scrapers/community.py
    async def store(self, data):
        """Upsert community posts into ``community_posts``, keyed on ``id``.

        Each post is upserted on its own so that one failing row does not
        block the rest; a failure is logged as a warning and skipped.
        Nothing is written when ``data`` is empty or the ``supabase`` client
        is ``None``.

        The closing log line reports the number of posts whose upsert
        actually succeeded, not the number of posts attempted.
        """
        if not data or supabase is None:
            return
        stored = 0
        for post in data:
            try:
                supabase.table("community_posts").upsert(post, on_conflict="id").execute()
            except Exception as e:
                logger.warning("Failed to upsert community post: %s", e)
            else:
                # Count only rows that were written without an exception.
                stored += 1
        logger.info("community: stored %d posts", stored)
FuelPriceScraper class · python · L19-L97 (79 LOC)
backend/scrapers/fuel_prices.py
class FuelPriceScraper(BaseScraper):
    name = "fuel_prices"
    max_retries = 3

    async def fetch(self) -> Any:
        async with aiohttp.ClientSession() as session:
            headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
            async with session.get(FUEL_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=15)) as resp:
                resp.raise_for_status()
                return await resp.text()

    async def validate(self, raw: Any) -> list[dict]:
        soup = BeautifulSoup(raw, "lxml")
        now = datetime.now(timezone.utc).isoformat()
        today = date.today().isoformat()

        petrol = None
        diesel = None

        # Look for price values in the page
        # GoodReturns typically shows prices in a table or prominent divs
        price_pattern = re.compile(r'₹?\s*(\d+\.\d{2})')

        # Try table-based extraction
        tables = soup.find_all("table")
        for table in tables:
            tex
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
fetch method · python · L23-L28 (6 LOC)
backend/scrapers/fuel_prices.py
    async def fetch(self) -> Any:
        """Download the page at ``FUEL_URL`` and return its body as text.

        The request carries a browser-like ``User-Agent`` and a 15-second
        total timeout. ``raise_for_status()`` is called on the response
        before the body is read, so an error status surfaces as an exception.
        """
        request_headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
        time_limit = aiohttp.ClientTimeout(total=15)
        async with aiohttp.ClientSession() as session:
            async with session.get(FUEL_URL, headers=request_headers, timeout=time_limit) as response:
                response.raise_for_status()
                page_text = await response.text()
        return page_text
validate method · python · L30-L90 (61 LOC)
backend/scrapers/fuel_prices.py
    async def validate(self, raw: Any) -> list[dict]:
        soup = BeautifulSoup(raw, "lxml")
        now = datetime.now(timezone.utc).isoformat()
        today = date.today().isoformat()

        petrol = None
        diesel = None

        # Look for price values in the page
        # GoodReturns typically shows prices in a table or prominent divs
        price_pattern = re.compile(r'₹?\s*(\d+\.\d{2})')

        # Try table-based extraction
        tables = soup.find_all("table")
        for table in tables:
            text = table.get_text().lower()
            if "petrol" in text or "diesel" in text:
                rows = table.find_all("tr")
                for row in rows:
                    cells = [c.get_text(strip=True) for c in row.find_all(["td", "th"])]
                    for i, cell in enumerate(cells):
                        if "petrol" in cell.lower():
                            for c in cells[i+1:]:
                                m = price_pattern.search(c)
   
store method · python · L92-L97 (6 LOC)
backend/scrapers/fuel_prices.py
    async def store(self, data: list[dict]) -> None:
        """Insert every record in ``data`` as its own row of ``fuel_prices``.

        Nothing is written when ``data`` is empty or the module-level
        ``supabase`` client is ``None``. Insert errors are not caught here,
        so they propagate to the caller.
        """
        if data and supabase is not None:
            for row in data:
                insert_request = supabase.table("fuel_prices").insert(row)
                insert_request.execute()
            logger.info("fuel_prices: stored %d records", len(data))
match_area function · python · L43-L50 (8 LOC)
backend/scrapers/msedcl_power.py
def match_area(text: str) -> str | None:
    """Return the first area ID whose keyword appears in ``text``.

    Keywords from ``SUBSTATION_AREA_MAP`` are matched as substrings of the
    lower-cased text, in the map's iteration order. ``None`` means no
    keyword matched.
    """
    haystack = text.lower()
    return next(
        (
            area_id
            for area_id, keywords in SUBSTATION_AREA_MAP.items()
            if any(kw in haystack for kw in keywords)
        ),
        None,
    )
parse_time function · python · L53-L72 (20 LOC)
backend/scrapers/msedcl_power.py
def parse_time(time_str: str) -> str | None:
    """Try to parse various time formats from MSEDCL pages."""
    time_str = time_str.strip()

    patterns = [
        (r'(\d{1,2}):(\d{2})\s*(am|pm|AM|PM)', '%I:%M %p'),
        (r'(\d{1,2})\s*(am|pm|AM|PM)', '%I %p'),
        (r'(\d{1,2}):(\d{2})', '%H:%M'),
    ]

    for pattern, fmt in patterns:
        match = re.search(pattern, time_str)
        if match:
            try:
                t = datetime.strptime(match.group(0), fmt)
                return t.strftime('%H:%M')
            except ValueError:
                continue

    return None
MsedclPowerScraper class · python · L75-L291 (217 LOC)
backend/scrapers/msedcl_power.py
class MsedclPowerScraper(BaseScraper):
    name = "msedcl_power"
    max_retries = 3

    async def fetch(self) -> Any:
        """Fetch outage pages from MSEDCL."""
        results = {}

        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml",
        }

        async with aiohttp.ClientSession() as session:
            # Try planned outages page
            for label, url in [("planned", MSEDCL_PLANNED_URL), ("outage", MSEDCL_OUTAGE_URL)]:
                try:
                    async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                        if resp.status == 200:
                            results[label] = await resp.text()
                        else:
                            logger.warning("MSEDCL %s page returned %d", label, resp.status)
                except Exception as e:
     
fetch method · python · L79-L123 (45 LOC)
backend/scrapers/msedcl_power.py
    async def fetch(self) -> Any:
        """Fetch outage pages from MSEDCL."""
        results = {}

        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml",
        }

        async with aiohttp.ClientSession() as session:
            # Try planned outages page
            for label, url in [("planned", MSEDCL_PLANNED_URL), ("outage", MSEDCL_OUTAGE_URL)]:
                try:
                    async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                        if resp.status == 200:
                            results[label] = await resp.text()
                        else:
                            logger.warning("MSEDCL %s page returned %d", label, resp.status)
                except Exception as e:
                    logger.warning("Failed to fetch MSEDCL %s: %s", label, e)

            
validate method · python · L125-L281 (157 LOC)
backend/scrapers/msedcl_power.py
    async def validate(self, raw: Any) -> list[dict]:
        """Parse MSEDCL HTML/JSON into power outage records."""
        outages = []
        now = datetime.now(timezone.utc).isoformat()
        today = date.today().isoformat()
        seen = set()

        # Parse JSON API response if available
        if "api" in raw and isinstance(raw["api"], (list, dict)):
            api_data = raw["api"] if isinstance(raw["api"], list) else raw["api"].get("data", [])
            for item in api_data:
                area_text = item.get("area", item.get("location", ""))
                area_id = match_area(area_text)
                if not area_id:
                    continue

                key = f"{area_id}:{item.get('start', '')}:{item.get('end', '')}"
                if key in seen:
                    continue
                seen.add(key)

                outages.append({
                    "area_id": area_id,
                    "date": item.get("date", today),
                    
All rows above produced by Repobility · https://repobility.com
store method · python · L283-L291 (9 LOC)
backend/scrapers/msedcl_power.py
    async def store(self, data: list[dict]) -> None:
        """Write each outage record as its own row of ``power_outages``.

        Nothing is written when ``data`` is empty or the module-level
        ``supabase`` client is ``None``. Insert errors are not caught here,
        so they propagate to the caller.
        """
        if data and supabase is not None:
            for row in data:
                insert_request = supabase.table("power_outages").insert(row)
                insert_request.execute()

            logger.info("msedcl_power: stored %d records", len(data))
detect_severity function · python · L48-L53 (6 LOC)
backend/scrapers/ndma_alerts.py
def detect_severity(text: str) -> str:
    """Return the first severity level whose keyword occurs in ``text``.

    Levels are checked in ``SEVERITY_KEYWORDS`` iteration order, using
    substring tests on the lower-cased text. With no hit the level is
    ``"info"``.
    """
    lowered = text.lower()
    for level in SEVERITY_KEYWORDS:
        for keyword in SEVERITY_KEYWORDS[level]:
            if keyword in lowered:
                return level
    return "info"
tag_areas function · python · L56-L62 (7 LOC)
backend/scrapers/ndma_alerts.py
def tag_areas(text: str) -> list[str]:
    """List the area IDs whose keywords occur in ``text``.

    Keywords are matched as substrings of the lower-cased text, in
    ``AREA_KEYWORDS`` order. With no match the result is ``["pune"]``.
    """
    haystack = text.lower()
    hits = [
        area_id
        for area_id, keywords in AREA_KEYWORDS.items()
        if any(kw in haystack for kw in keywords)
    ]
    if hits:
        return hits
    return ["pune"]
NdmaAlertsScraper class · python · L65-L134 (70 LOC)
backend/scrapers/ndma_alerts.py
class NdmaAlertsScraper(BaseScraper):
    name = "ndma_alerts"
    max_retries = 3

    async def fetch(self) -> Any:
        results = []
        async with aiohttp.ClientSession() as session:
            for feed in NDMA_FEEDS:
                try:
                    async with session.get(
                        feed["url"],
                        headers={"User-Agent": "Sajaag/0.1"},
                        timeout=aiohttp.ClientTimeout(total=15),
                    ) as resp:
                        if resp.status == 200:
                            results.append({"xml": await resp.text(), "source": feed["source"]})
                except Exception as e:
                    logger.warning("NDMA feed %s failed: %s", feed["source"], e)
        return results

    async def validate(self, raw: Any) -> list[dict]:
        alerts = []
        seen = set()
        now = datetime.now(timezone.utc).isoformat()

        for feed_data in raw:
            parsed = feedparser.parse(feed_
fetch method · python · L69-L83 (15 LOC)
backend/scrapers/ndma_alerts.py
    async def fetch(self) -> Any:
        """Download each feed in ``NDMA_FEEDS`` and collect the successful bodies.

        Returns a list of ``{"xml": <body text>, "source": <feed source>}``
        dicts, one per feed that answered with HTTP 200. A feed whose request
        raises is logged as a warning and skipped; a feed with any other
        status is skipped without logging.
        """
        collected = []
        async with aiohttp.ClientSession() as session:
            for feed in NDMA_FEEDS:
                try:
                    request = session.get(
                        feed["url"],
                        headers={"User-Agent": "Sajaag/0.1"},
                        timeout=aiohttp.ClientTimeout(total=15),
                    )
                    async with request as resp:
                        if resp.status != 200:
                            continue
                        body = await resp.text()
                        collected.append({"xml": body, "source": feed["source"]})
                except Exception as err:
                    logger.warning("NDMA feed %s failed: %s", feed["source"], err)
        return collected
validate method · python · L85-L124 (40 LOC)
backend/scrapers/ndma_alerts.py
    async def validate(self, raw: Any) -> list[dict]:
        alerts = []
        seen = set()
        now = datetime.now(timezone.utc).isoformat()

        for feed_data in raw:
            parsed = feedparser.parse(feed_data["xml"])
            for entry in parsed.entries[:20]:
                title = entry.get("title", "").strip()
                if not title:
                    continue

                # Filter for Pune/Maharashtra relevance
                full_text = f"{title} {entry.get('summary', '')}"
                if not any(kw in full_text.lower() for kw in ["pune", "maharashtra", "western maharashtra", "deccan"]):
                    continue

                alert_id = md5(f"{title}:{feed_data['source']}".encode()).hexdigest()[:16]
                if alert_id in seen:
                    continue
                seen.add(alert_id)

                import re
                summary = re.sub(r'<[^>]+>', '', entry.get("summary", "")).strip()[:300]

                alerts.
store method · python · L126-L134 (9 LOC)
backend/scrapers/ndma_alerts.py
    async def store(self, data: list[dict]) -> None:
        """Upsert alerts into ``emergency_alerts``, keyed on ``id``.

        Each alert is upserted on its own so that one failing row does not
        block the rest; a failure is logged as a warning and skipped.
        Nothing is written when ``data`` is empty or the ``supabase`` client
        is ``None``.

        The closing log line reports the number of alerts whose upsert
        actually succeeded, not the number of alerts attempted.
        """
        if not data or supabase is None:
            return
        stored = 0
        for alert in data:
            try:
                supabase.table("emergency_alerts").upsert(alert, on_conflict="id").execute()
            except Exception as e:
                logger.warning("Failed to upsert alert: %s", e)
            else:
                # Count only rows that were written without an exception.
                stored += 1
        logger.info("ndma_alerts: stored %d alerts", stored)
generate_news_id function · python · L207-L209 (3 LOC)
backend/scrapers/news_rss.py
def generate_news_id(title: str, source: str) -> str:
    """Build a stable 16-character ID from a title and its source.

    The ID is the first 16 hex digits of the MD5 digest of
    ``"<title>:<source>"``, so the same pair always yields the same ID.
    """
    key = "{}:{}".format(title, source)
    digest = md5(key.encode())
    return digest.hexdigest()[:16]
Repobility · code-quality intelligence · https://repobility.com
tag_areas function · python · L212-L222 (11 LOC)
backend/scrapers/news_rss.py
def tag_areas(text: str) -> list[str]:
    """Return the area IDs whose keywords appear in ``text``.

    Both the text and each keyword are lower-cased before the substring
    test. Areas come back in ``AREA_KEYWORDS`` order, each at most once.
    Text that matches no area is tagged city-wide as ``["pune"]``.
    """
    haystack = text.lower()
    hits = [
        area_id
        for area_id, keywords in AREA_KEYWORDS.items()
        if any(kw.lower() in haystack for kw in keywords)
    ]
    if not hits:
        # Nothing specific matched: fall back to the city-wide tag.
        return ["pune"]
    return hits
tag_category function · python · L225-L239 (15 LOC)
backend/scrapers/news_rss.py
def tag_category(text: str, bias: str | None = None) -> str:
    """Categorize news item based on keywords."""
    if bias:
        return bias

    text_lower = text.lower()
    scores: dict[str, int] = {}
    for cat, keywords in CATEGORY_KEYWORDS.items():
        score = sum(1 for kw in keywords if kw in text_lower)
        if score > 0:
            scores[cat] = score

    if scores:
        return max(scores, key=scores.get)
    return "Essential"  # Default category
NewsRssScraper class · python · L242-L345 (104 LOC)
backend/scrapers/news_rss.py
class NewsRssScraper(BaseScraper):
    name = "news_rss"
    max_retries = 3

    async def fetch(self) -> Any:
        """Fetch all RSS feeds concurrently."""
        results = []

        async with aiohttp.ClientSession() as session:
            for feed_config in RSS_FEEDS:
                try:
                    async with session.get(
                        feed_config["url"],
                        headers={"User-Agent": "Sajaag/0.1 RSS Reader"},
                        timeout=aiohttp.ClientTimeout(total=15),
                    ) as resp:
                        if resp.status == 200:
                            text = await resp.text()
                            results.append({
                                "xml": text,
                                "config": feed_config,
                            })
                        else:
                            logger.warning("RSS feed %s returned %d", feed_config["source"], resp.status)
                except Exceptio
fetch method · python · L246-L270 (25 LOC)
backend/scrapers/news_rss.py
    async def fetch(self) -> Any:
        """Fetch all RSS feeds concurrently."""
        results = []

        async with aiohttp.ClientSession() as session:
            for feed_config in RSS_FEEDS:
                try:
                    async with session.get(
                        feed_config["url"],
                        headers={"User-Agent": "Sajaag/0.1 RSS Reader"},
                        timeout=aiohttp.ClientTimeout(total=15),
                    ) as resp:
                        if resp.status == 200:
                            text = await resp.text()
                            results.append({
                                "xml": text,
                                "config": feed_config,
                            })
                        else:
                            logger.warning("RSS feed %s returned %d", feed_config["source"], resp.status)
                except Exception as e:
                    logger.warning("Failed to fetch RSS from %s: %s", 
validate method · python · L272-L332 (61 LOC)
backend/scrapers/news_rss.py
    async def validate(self, raw: Any) -> list[dict]:
        """Parse RSS XML into news items."""
        if not raw:
            return []

        items = []
        seen_titles: set[str] = set()
        now = datetime.now(timezone.utc).isoformat()

        for feed_data in raw:
            config = feed_data["config"]
            parsed = feedparser.parse(feed_data["xml"])

            for entry in parsed.entries[:20]:  # Max 20 per feed
                title = entry.get("title", "").strip()
                if not title or title in seen_titles:
                    continue
                seen_titles.add(title)

                summary = entry.get("summary", entry.get("description", ""))
                # Strip HTML tags from summary
                summary = re.sub(r'<[^>]+>', '', summary).strip()
                # Truncate to 300 chars
                if len(summary) > 300:
                    summary = summary[:297] + "..."

                link = entry.get("link", "")
         
store method · python · L334-L345 (12 LOC)
backend/scrapers/news_rss.py
    async def store(self, data: list[dict]) -> None:
        """Upsert news items into ``news_items``, keyed on ``id``.

        Each item is upserted on its own so that one failing row does not
        block the rest; a failure is logged as a warning, with the title cut
        to 50 characters, and skipped. Nothing is written when ``data`` is
        empty or the ``supabase`` client is ``None``.

        The closing log line reports the number of items whose upsert
        actually succeeded, not the number of items attempted.
        """
        if not data or supabase is None:
            return

        stored = 0
        for item in data:
            try:
                supabase.table("news_items").upsert(item, on_conflict="id").execute()
            except Exception as e:
                logger.warning("Failed to upsert news item '%s': %s", item.get("title", "?")[:50], e)
            else:
                # Count only rows that were written without an exception.
                stored += 1

        logger.info("news_rss: stored %d items", stored)
match_area function · python · L107-L125 (19 LOC)
backend/scrapers/pmc_water.py
def match_area(text: str) -> list[str]:
    """Collect the area IDs that ``text`` refers to.

    Two passes run over the lower-cased text. Every zone name from
    ``ZONE_AREA_MAP`` found in the text contributes all of its areas. Then
    each area in ``WATER_AREA_KEYWORDS`` with a keyword in the text is
    added, unless it is already in the result.
    """
    haystack = text.lower()
    hits: list[str] = []

    # Pass 1: zone names expand to their areas.
    for zone_name, zone_areas in ZONE_AREA_MAP.items():
        if zone_name in haystack:
            hits += zone_areas

    # Pass 2: per-area keywords, skipping areas already collected.
    for area_id, keywords in WATER_AREA_KEYWORDS.items():
        if any(kw in haystack for kw in keywords) and area_id not in hits:
            hits.append(area_id)

    return hits
extract_times function · python · L128-L154 (27 LOC)
backend/scrapers/pmc_water.py
def extract_times(text: str) -> tuple[str | None, str | None]:
    """Extract start and end times from text."""
    times = re.findall(r'(\d{1,2}[:.]\d{2}\s*(?:am|pm|AM|PM)?)', text)
    if not times:
        times = re.findall(r'(\d{1,2}\s*(?:am|pm|AM|PM))', text)

    start = None
    end = None

    for t in times[:2]:
        t = t.strip().replace('.', ':')
        try:
            # Try 12-hour format
            if re.search(r'(am|pm)', t, re.I):
                parsed = datetime.strptime(t.upper().replace(' ', ''), '%I:%M%p' if ':' in t else '%I%p')
            else:
                parsed = datetime.strptime(t, '%H:%M')

            formatted = parsed.strftime('%H:%M')
            if start is None:
                start = formatted
            else:
                end = formatted
        except ValueError:
            continue

    return start, end
Open data scored by Repobility · https://repobility.com
PmcWaterScraper class · python · L157-L303 (147 LOC)
backend/scrapers/pmc_water.py
class PmcWaterScraper(BaseScraper):
    name = "pmc_water"
    max_retries = 3

    async def fetch(self) -> Any:
        """Fetch water schedule pages from PMC."""
        results = {}

        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml",
        }

        async with aiohttp.ClientSession() as session:
            for url in PMC_WATER_URLS:
                try:
                    async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30), allow_redirects=True) as resp:
                        if resp.status == 200:
                            results[url] = await resp.text()
                except Exception as e:
                    logger.warning("Failed to fetch PMC page %s: %s", url, e)

            # Also try Google search for recent PMC water notices
            try:
                google_url = "https://news.
fetch method · python · L161-L188 (28 LOC)
backend/scrapers/pmc_water.py
    async def fetch(self) -> Any:
        """Fetch water schedule pages from PMC."""
        results = {}

        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml",
        }

        async with aiohttp.ClientSession() as session:
            for url in PMC_WATER_URLS:
                try:
                    async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30), allow_redirects=True) as resp:
                        if resp.status == 200:
                            results[url] = await resp.text()
                except Exception as e:
                    logger.warning("Failed to fetch PMC page %s: %s", url, e)

            # Also try Google search for recent PMC water notices
            try:
                google_url = "https://news.google.com/rss/search?q=pune+pmc+water+supply+schedule+today&hl=en-IN&gl=IN"
   
validate method · python · L190-L293 (104 LOC)
backend/scrapers/pmc_water.py
    async def validate(self, raw: Any) -> list[dict]:
        """Parse water schedule data."""
        schedules = []
        now = datetime.now(timezone.utc).isoformat()
        today = date.today().isoformat()
        found_areas: set[str] = set()

        for source_url, html in raw.items():
            if source_url == "google_news":
                # Parse RSS for water disruption news
                import feedparser
                feed = feedparser.parse(html)
                for entry in feed.entries[:5]:
                    title = entry.get("title", "")
                    areas = match_area(title)
                    for area_id in areas:
                        if area_id in found_areas:
                            continue
                        found_areas.add(area_id)
                        # If news mentions disruption, mark with altered schedule
                        if any(kw in title.lower() for kw in ["cut", "disrupt", "no water", "shortage"]):
               
store method · python · L295-L303 (9 LOC)
backend/scrapers/pmc_water.py
    async def store(self, data: list[dict]) -> None:
        """Write each schedule record as its own row of ``water_schedules``.

        Nothing is written when ``data`` is empty or the module-level
        ``supabase`` client is ``None``. Insert errors are not caught here,
        so they propagate to the caller.
        """
        if data and supabase is not None:
            for row in data:
                insert_request = supabase.table("water_schedules").insert(row)
                insert_request.execute()

            logger.info("pmc_water: stored %d schedules", len(data))
categorize_aqi function · python · L48-L54 (7 LOC)
backend/scrapers/safar_aqi.py
def categorize_aqi(aqi: int) -> str:
    """Map an AQI value to its band label.

    Each band is identified by its inclusive upper bound: 50, 100, 200, 300
    and 400. Any value above 400 is ``"Severe"``.
    """
    bands = (
        (50, "Good"),
        (100, "Satisfactory"),
        (200, "Moderate"),
        (300, "Poor"),
        (400, "Very Poor"),
    )
    for upper_bound, label in bands:
        if aqi <= upper_bound:
            return label
    return "Severe"
SafarAqiScraper class · python · L57-L181 (125 LOC)
backend/scrapers/safar_aqi.py
class SafarAqiScraper(BaseScraper):
    name = "safar_aqi"
    max_retries = 3

    async def fetch(self) -> Any:
        """Fetch AQI data from SAFAR.

        Tries the JSON API first, falls back to scraping the HTML page.
        """
        url = f"{settings.SAFAR_BASE_URL}/api/aqi-data"

        ssl_ctx = ssl.create_default_context(cafile=certifi.where())
        ssl_ctx.check_hostname = False
        ssl_ctx.verify_mode = ssl.CERT_NONE

        conn = aiohttp.TCPConnector(ssl=ssl_ctx)
        async with aiohttp.ClientSession(connector=conn) as session:
            # Try JSON API endpoint
            try:
                headers = {
                    "Accept": "application/json",
                    "User-Agent": "Sajaag/0.1 (City Intelligence Dashboard)",
                    "Referer": settings.SAFAR_BASE_URL,
                }
                async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                    if resp.status == 200
fetch method · python · L61-L97 (37 LOC)
backend/scrapers/safar_aqi.py
    async def fetch(self) -> Any:
        """Fetch AQI data from SAFAR.

        Tries the JSON API first, falls back to scraping the HTML page.
        """
        url = f"{settings.SAFAR_BASE_URL}/api/aqi-data"

        ssl_ctx = ssl.create_default_context(cafile=certifi.where())
        ssl_ctx.check_hostname = False
        ssl_ctx.verify_mode = ssl.CERT_NONE

        conn = aiohttp.TCPConnector(ssl=ssl_ctx)
        async with aiohttp.ClientSession(connector=conn) as session:
            # Try JSON API endpoint
            try:
                headers = {
                    "Accept": "application/json",
                    "User-Agent": "Sajaag/0.1 (City Intelligence Dashboard)",
                    "Referer": settings.SAFAR_BASE_URL,
                }
                async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                    if resp.status == 200:
                        content_type = resp.headers.get("Content-Type", "")
  
validate method · python · L99-L171 (73 LOC)
backend/scrapers/safar_aqi.py
    async def validate(self, raw: Any) -> list[dict]:
        """Parse and validate AQI readings."""
        readings = []
        now = datetime.now(timezone.utc).isoformat()

        if isinstance(raw, dict) and raw.get("type") == "html_fallback":
            # HTML fallback: parse with BeautifulSoup
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(raw["html"], "lxml")

            # Look for AQI values in the page
            # SAFAR pages typically show station-wise AQI in card elements
            aqi_elements = soup.select("[class*='aqi'], [class*='station'], .city-data")

            if not aqi_elements:
                # If we can't parse specific elements, try to find AQI numbers
                # Look for patterns like "AQI: 142" or numeric values near station names
                import re
                text = soup.get_text()

                # Try to extract any AQI value for Pune
                aqi_match = re.search(r'(?:AQI|aqi)[:\s]*(\d+)', 
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
store method · python · L173-L181 (9 LOC)
backend/scrapers/safar_aqi.py
    async def store(self, data: list[dict]) -> None:
        """Insert AQI readings into Supabase as one batch.

        This is a plain insert, not an upsert: no conflict handling is
        requested, so existing rows are never updated.

        Args:
            data: Validated readings, one dict per row to add to the
                ``aqi_readings`` table.

        Returns immediately without writing anything when ``data`` is empty
        or the Supabase client is unavailable (``supabase is None``).
        """
        if not data or supabase is None:
            return

        # Send every reading in one bulk insert rather than one request per
        # reading. Per-row inserts that fail part-way leave the earlier rows
        # stored, and a retry of the scrape cycle would then insert them a
        # second time; a single request avoids that and saves round-trips.
        # NOTE(review): assumes all readings carry the same keys -- confirm
        # against validate().
        supabase.table("aqi_readings").insert(data).execute()

        logger.info("safar_aqi: stored %d readings", len(data))
match_area function · python · L83-L90 (8 LOC)
backend/scrapers/traffic.py
def match_area(text: str) -> str | None:
    """Return the first area whose keywords appear in the traffic text.

    The text is lower-cased once and tested by substring against each
    area's keywords, in ``ROAD_AREA_MAP`` iteration order. Returns ``None``
    when no keyword of any area is found.
    """
    lowered = text.lower()
    return next(
        (
            area_id
            for area_id, keywords in ROAD_AREA_MAP.items()
            if any(kw in lowered for kw in keywords)
        ),
        None,
    )
detect_severity function · python · L93-L100 (8 LOC)
backend/scrapers/traffic.py
def detect_severity(text: str) -> str:
    """Return the severity level of a traffic incident description.

    The text is lower-cased once and tested by substring against each
    level's keywords, in ``SEVERITY_MAP`` iteration order; the first level
    with a hit is returned. Falls back to ``"low"`` when nothing matches.
    """
    lowered = text.lower()
    hits = (
        level
        for level, keywords in SEVERITY_MAP.items()
        if any(kw in lowered for kw in keywords)
    )
    return next(hits, "low")
‹ prev · page 2 / 3 · next ›