Function bodies 140 total
store method · python · L116-L121 (6 LOC)backend/scrapers/aqicn_aqi.py
async def store(self, data: list[dict]) -> None:
    """Insert AQICN readings into the ``aqi_readings`` table.

    Each reading is sent as its own insert request. Nothing happens when
    *data* is empty or no Supabase client is configured; insert errors are
    not caught here and propagate to the caller.
    """
    if supabase is None or not data:
        return
    # NOTE(review): if this scraper runs under BaseScraper.run (which retries
    # fetch/validate/store on any exception), a failure midway through this
    # loop leaves earlier rows written and the retry inserts them again —
    # confirm duplicates are acceptable for aqi_readings.
    for row in data:
        supabase.table("aqi_readings").insert(row).execute()
    logger.info("aqicn_aqi: stored %d readings", len(data))
# BaseScraper class · python · L14-L109 (96 LOC)backend/scrapers/base.py
class BaseScraper(ABC):
"""Provides retry logic, exponential backoff, and run-logging.
Subclasses must implement:
- fetch() -- retrieve raw data from the source
- validate() -- clean / validate the raw data
- store() -- persist validated data to Supabase
"""
name: str = "base"
max_retries: int = 3
async def run(self) -> bool:
"""Execute the full scrape cycle with retries.
Returns True on success, False on exhausted retries.
"""
last_error: Exception | None = None
for attempt in range(1, self.max_retries + 1):
try:
raw = await self.fetch()
validated = await self.validate(raw)
await self.store(validated)
await self.log_success()
logger.info("%s: completed on attempt %d", self.name, attempt)
return True
except Exception as exc:
last_error = exc
run method · python · L26-L55 (30 LOC)backend/scrapers/base.py
async def run(self) -> bool:
"""Execute the full scrape cycle with retries.
Returns True on success, False on exhausted retries.
"""
last_error: Exception | None = None
for attempt in range(1, self.max_retries + 1):
try:
raw = await self.fetch()
validated = await self.validate(raw)
await self.store(validated)
await self.log_success()
logger.info("%s: completed on attempt %d", self.name, attempt)
return True
except Exception as exc:
last_error = exc
wait = 2 ** attempt # 2, 4, 8 seconds
logger.warning(
"%s: attempt %d failed (%s), retrying in %ds",
self.name,
attempt,
exc,
wait,
)
await asyncio.sleep(wait)
await self.log_failure(str(lastfetch method · python · L62-L64 (3 LOC)backend/scrapers/base.py
async def fetch(self) -> Any:
    """Hook: return the raw payload from the scraper's external source.

    Subclasses provide the real implementation; this base version has an
    empty body and therefore returns ``None``.
    """
    pass
# validate method · python · L67-L69 (3 LOC)backend/scrapers/base.py
async def validate(self, raw: Any) -> Any:
    """Hook: turn the raw payload from ``fetch`` into clean records.

    Subclasses provide the real implementation; this base version has an
    empty body and therefore returns ``None``.
    """
    pass
# store method · python · L72-L74 (3 LOC)backend/scrapers/base.py
async def store(self, data: Any) -> None:
    """Hook: persist the validated records to Supabase.

    Subclasses provide the real implementation; this base version has an
    empty body and does nothing.
    """
    pass
# log_success method · python · L80-L93 (14 LOC)backend/scrapers/base.py
async def log_success(self) -> None:
    """Best-effort: record a ``success`` run for this scraper in ``scraper_runs``.

    The row carries the scraper name, the status and a UTC ISO-8601
    timestamp. Skipped when no Supabase client is configured. Any exception
    during the insert is logged as a warning and not re-raised.
    """
    if supabase is None:
        return
    try:
        success_row = {
            "scraper_name": self.name,
            "status": "success",
            "run_at": datetime.now(timezone.utc).isoformat(),
        }
        supabase.table("scraper_runs").insert(success_row).execute()
    except Exception as exc:
        logger.warning("Failed to log success for %s: %s", self.name, exc)
# Open data scored by Repobility · https://repobility.com
log_failure method · python · L95-L109 (15 LOC)backend/scrapers/base.py
async def log_failure(self, error_message: str) -> None:
    """Best-effort: record a ``failure`` run, with its error text, in ``scraper_runs``.

    Args:
        error_message: Text stored in the row's ``error_message`` column.

    Skipped when no Supabase client is configured. Any exception during the
    insert is logged as a warning and not re-raised.
    """
    if supabase is None:
        return
    try:
        failure_row = {
            "scraper_name": self.name,
            "status": "failure",
            "error_message": error_message,
            "run_at": datetime.now(timezone.utc).isoformat(),
        }
        supabase.table("scraper_runs").insert(failure_row).execute()
    except Exception as exc:
        logger.warning("Failed to log failure for %s: %s", self.name, exc)
# tag_areas function · python · L41-L44 (4 LOC)backend/scrapers/community.py
def tag_areas(text):
    """Return the area IDs from AREA_KEYWORDS mentioned in *text*.

    An area matches when any of its keywords is a substring of the
    lower-cased text (keywords are not lower-cased here — presumably stored
    lowercase; TODO confirm). Falls back to ``["pune"]`` when nothing matches.
    """
    haystack = text.lower()
    hits = []
    for area_id, keywords in AREA_KEYWORDS.items():
        if any(keyword in haystack for keyword in keywords):
            hits.append(area_id)
    if hits:
        return hits
    return ["pune"]
# categorize function · python · L47-L53 (7 LOC)backend/scrapers/community.py
def categorize(text):
    """Map free text to a coarse community-post category.

    Rules are checked in order; the first rule with any keyword occurring as
    a substring of the lower-cased text wins. Defaults to "Community".
    """
    lowered = text.lower()
    rules = (
        ("Transport", ("traffic", "road", "metro", "bus")),
        ("Essential", ("water", "power", "pmc", "msedcl")),
        ("Food", ("food", "restaurant", "cafe")),
        ("Safety", ("crime", "police", "theft")),
    )
    for label, words in rules:
        if any(word in lowered for word in words):
            return label
    return "Community"
# CommunityScraper class · python · L56-L128 (73 LOC)backend/scrapers/community.py
class CommunityScraper(BaseScraper):
name = "community"
max_retries = 3
async def fetch(self):
results = []
async with aiohttp.ClientSession() as session:
for feed in COMMUNITY_FEEDS:
url = feed["url"]
try:
async with session.get(url, headers={"User-Agent": "Sajaag/0.1"}, timeout=aiohttp.ClientTimeout(total=15)) as resp:
if resp.status == 200:
results.append({"xml": await resp.text(), "config": feed})
elif feed["platform"] == "twitter" and "rsshub.app" in url:
for alt in RSSHUB_FALLBACKS:
try:
alt_url = url.replace("https://rsshub.app", alt)
async with session.get(alt_url, timeout=aiohttp.ClientTimeout(total=10)) as r:
if r.status == 200:
fetch method · python · L60-L82 (23 LOC)backend/scrapers/community.py
async def fetch(self):
results = []
async with aiohttp.ClientSession() as session:
for feed in COMMUNITY_FEEDS:
url = feed["url"]
try:
async with session.get(url, headers={"User-Agent": "Sajaag/0.1"}, timeout=aiohttp.ClientTimeout(total=15)) as resp:
if resp.status == 200:
results.append({"xml": await resp.text(), "config": feed})
elif feed["platform"] == "twitter" and "rsshub.app" in url:
for alt in RSSHUB_FALLBACKS:
try:
alt_url = url.replace("https://rsshub.app", alt)
async with session.get(alt_url, timeout=aiohttp.ClientTimeout(total=10)) as r:
if r.status == 200:
results.append({"xml": await r.text(), "config": validate method · python · L84-L118 (35 LOC)backend/scrapers/community.py
async def validate(self, raw):
posts = []
seen = set()
now = datetime.now(timezone.utc).isoformat()
for feed_data in raw:
config = feed_data["config"]
parsed = feedparser.parse(feed_data["xml"])
for entry in parsed.entries[:15]:
title = entry.get("title", "").strip()
if not title:
continue
pid = md5(f"{title}:{config['source']}".encode()).hexdigest()[:16]
if pid in seen:
continue
seen.add(pid)
body = re.sub(r'<[^>]+>', '', entry.get("summary", "")).strip()[:500]
pub = now
if entry.get("published"):
try:
import email.utils
pub = email.utils.parsedate_to_datetime(entry["published"]).isoformat()
except Exception:
pass
posts.appstore method · python · L120-L128 (9 LOC)backend/scrapers/community.py
async def store(self, data):
    """Upsert community posts into ``community_posts``, conflicting on ``id``.

    Each post is upserted independently. A failed upsert is logged as a
    warning and the remaining posts are still attempted. The final info log
    reports how many upserts actually succeeded, not how many were attempted.
    Nothing happens when *data* is empty or no Supabase client is configured.
    """
    if not data or supabase is None:
        return
    stored = 0
    for post in data:
        try:
            supabase.table("community_posts").upsert(post, on_conflict="id").execute()
        except Exception as e:
            # Deliberate best-effort: one bad post must not block the others.
            logger.warning("Failed to upsert community post: %s", e)
        else:
            stored += 1
    logger.info("community: stored %d posts", stored)
# FuelPriceScraper class · python · L19-L97 (79 LOC)backend/scrapers/fuel_prices.py
class FuelPriceScraper(BaseScraper):
name = "fuel_prices"
max_retries = 3
async def fetch(self) -> Any:
async with aiohttp.ClientSession() as session:
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
async with session.get(FUEL_URL, headers=headers, timeout=aiohttp.ClientTimeout(total=15)) as resp:
resp.raise_for_status()
return await resp.text()
async def validate(self, raw: Any) -> list[dict]:
soup = BeautifulSoup(raw, "lxml")
now = datetime.now(timezone.utc).isoformat()
today = date.today().isoformat()
petrol = None
diesel = None
# Look for price values in the page
# GoodReturns typically shows prices in a table or prominent divs
price_pattern = re.compile(r'₹?\s*(\d+\.\d{2})')
# Try table-based extraction
tables = soup.find_all("table")
for table in tables:
texProvenance: Repobility (https://repobility.com) — every score reproducible from /scan/
fetch method · python · L23-L28 (6 LOC)backend/scrapers/fuel_prices.py
async def fetch(self) -> Any:
    """Download the fuel-price page at FUEL_URL and return its body as text.

    Sends a browser-like User-Agent with a 15-second total timeout. Error
    statuses raise via ``raise_for_status()``.
    """
    request_headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
    request_timeout = aiohttp.ClientTimeout(total=15)
    async with aiohttp.ClientSession() as session:
        async with session.get(FUEL_URL, headers=request_headers, timeout=request_timeout) as response:
            response.raise_for_status()
            html = await response.text()
    return html
# validate method · python · L30-L90 (61 LOC)backend/scrapers/fuel_prices.py
async def validate(self, raw: Any) -> list[dict]:
soup = BeautifulSoup(raw, "lxml")
now = datetime.now(timezone.utc).isoformat()
today = date.today().isoformat()
petrol = None
diesel = None
# Look for price values in the page
# GoodReturns typically shows prices in a table or prominent divs
price_pattern = re.compile(r'₹?\s*(\d+\.\d{2})')
# Try table-based extraction
tables = soup.find_all("table")
for table in tables:
text = table.get_text().lower()
if "petrol" in text or "diesel" in text:
rows = table.find_all("tr")
for row in rows:
cells = [c.get_text(strip=True) for c in row.find_all(["td", "th"])]
for i, cell in enumerate(cells):
if "petrol" in cell.lower():
for c in cells[i+1:]:
m = price_pattern.search(c)
store method · python · L92-L97 (6 LOC)backend/scrapers/fuel_prices.py
async def store(self, data: list[dict]) -> None:
    """Insert fuel-price records into ``fuel_prices``, one request per record.

    No-op for empty *data* or when no Supabase client is configured; insert
    errors propagate to the caller.
    """
    if supabase is None or not data:
        return
    # NOTE(review): BaseScraper.run retries the whole cycle on any exception,
    # so a failure after some inserts re-inserts those rows on retry.
    for fuel_row in data:
        supabase.table("fuel_prices").insert(fuel_row).execute()
    logger.info("fuel_prices: stored %d records", len(data))
# match_area function · python · L43-L50 (8 LOC)backend/scrapers/msedcl_power.py
def match_area(text: str) -> str | None:
    """Return the first area ID whose substation keyword occurs in *text*.

    Areas are tried in SUBSTATION_AREA_MAP's iteration order. Each keyword is
    tested as a substring of the lower-cased text. Returns None if nothing
    matches.
    """
    lowered = text.lower()
    return next(
        (
            area_id
            for area_id, keywords in SUBSTATION_AREA_MAP.items()
            if any(kw in lowered for kw in keywords)
        ),
        None,
    )
# parse_time function · python · L53-L72 (20 LOC)backend/scrapers/msedcl_power.py
def parse_time(time_str: str) -> str | None:
"""Try to parse various time formats from MSEDCL pages."""
time_str = time_str.strip()
patterns = [
(r'(\d{1,2}):(\d{2})\s*(am|pm|AM|PM)', '%I:%M %p'),
(r'(\d{1,2})\s*(am|pm|AM|PM)', '%I %p'),
(r'(\d{1,2}):(\d{2})', '%H:%M'),
]
for pattern, fmt in patterns:
match = re.search(pattern, time_str)
if match:
try:
t = datetime.strptime(match.group(0), fmt)
return t.strftime('%H:%M')
except ValueError:
continue
return NoneMsedclPowerScraper class · python · L75-L291 (217 LOC)backend/scrapers/msedcl_power.py
class MsedclPowerScraper(BaseScraper):
name = "msedcl_power"
max_retries = 3
async def fetch(self) -> Any:
"""Fetch outage pages from MSEDCL."""
results = {}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml",
}
async with aiohttp.ClientSession() as session:
# Try planned outages page
for label, url in [("planned", MSEDCL_PLANNED_URL), ("outage", MSEDCL_OUTAGE_URL)]:
try:
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp:
if resp.status == 200:
results[label] = await resp.text()
else:
logger.warning("MSEDCL %s page returned %d", label, resp.status)
except Exception as e:
fetch method · python · L79-L123 (45 LOC)backend/scrapers/msedcl_power.py
async def fetch(self) -> Any:
"""Fetch outage pages from MSEDCL."""
results = {}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml",
}
async with aiohttp.ClientSession() as session:
# Try planned outages page
for label, url in [("planned", MSEDCL_PLANNED_URL), ("outage", MSEDCL_OUTAGE_URL)]:
try:
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp:
if resp.status == 200:
results[label] = await resp.text()
else:
logger.warning("MSEDCL %s page returned %d", label, resp.status)
except Exception as e:
logger.warning("Failed to fetch MSEDCL %s: %s", label, e)
validate method · python · L125-L281 (157 LOC)backend/scrapers/msedcl_power.py
async def validate(self, raw: Any) -> list[dict]:
"""Parse MSEDCL HTML/JSON into power outage records."""
outages = []
now = datetime.now(timezone.utc).isoformat()
today = date.today().isoformat()
seen = set()
# Parse JSON API response if available
if "api" in raw and isinstance(raw["api"], (list, dict)):
api_data = raw["api"] if isinstance(raw["api"], list) else raw["api"].get("data", [])
for item in api_data:
area_text = item.get("area", item.get("location", ""))
area_id = match_area(area_text)
if not area_id:
continue
key = f"{area_id}:{item.get('start', '')}:{item.get('end', '')}"
if key in seen:
continue
seen.add(key)
outages.append({
"area_id": area_id,
"date": item.get("date", today),
All rows above produced by Repobility · https://repobility.com
store method · python · L283-L291 (9 LOC)backend/scrapers/msedcl_power.py
async def store(self, data: list[dict]) -> None:
    """Insert power-outage records into ``power_outages``, one request each.

    No-op for empty *data* or when no Supabase client is configured; insert
    errors are not caught and propagate to the caller.
    """
    if supabase is None or not data:
        return
    # NOTE(review): under BaseScraper.run's retry loop, rows inserted before a
    # failure are inserted again on the next attempt.
    for outage in data:
        supabase.table("power_outages").insert(outage).execute()
    logger.info("msedcl_power: stored %d records", len(data))
# detect_severity function · python · L48-L53 (6 LOC)backend/scrapers/ndma_alerts.py
def detect_severity(text: str) -> str:
    """Classify alert text by severity.

    Returns the first level in SEVERITY_KEYWORDS (iteration order) with a
    keyword occurring as a substring of the lower-cased text, or "info" if
    none does.
    """
    lowered = text.lower()
    matching_levels = (
        level
        for level, keywords in SEVERITY_KEYWORDS.items()
        if any(kw in lowered for kw in keywords)
    )
    return next(matching_levels, "info")
# tag_areas function · python · L56-L62 (7 LOC)backend/scrapers/ndma_alerts.py
def tag_areas(text: str) -> list[str]:
    """Return every AREA_KEYWORDS area whose keywords appear in *text*.

    Matching is a substring test against the lower-cased text. When no area
    matches, the alert is tagged ``["pune"]`` (city-wide).
    """
    lowered = text.lower()
    tagged = [
        area_id
        for area_id, keywords in AREA_KEYWORDS.items()
        if any(kw in lowered for kw in keywords)
    ]
    return tagged or ["pune"]
# NdmaAlertsScraper class · python · L65-L134 (70 LOC)backend/scrapers/ndma_alerts.py
class NdmaAlertsScraper(BaseScraper):
name = "ndma_alerts"
max_retries = 3
async def fetch(self) -> Any:
results = []
async with aiohttp.ClientSession() as session:
for feed in NDMA_FEEDS:
try:
async with session.get(
feed["url"],
headers={"User-Agent": "Sajaag/0.1"},
timeout=aiohttp.ClientTimeout(total=15),
) as resp:
if resp.status == 200:
results.append({"xml": await resp.text(), "source": feed["source"]})
except Exception as e:
logger.warning("NDMA feed %s failed: %s", feed["source"], e)
return results
async def validate(self, raw: Any) -> list[dict]:
alerts = []
seen = set()
now = datetime.now(timezone.utc).isoformat()
for feed_data in raw:
parsed = feedparser.parse(feed_fetch method · python · L69-L83 (15 LOC)backend/scrapers/ndma_alerts.py
async def fetch(self) -> Any:
    """Download every feed listed in NDMA_FEEDS.

    Returns:
        A list of ``{"xml": <response text>, "source": <feed source>}`` dicts,
        one per feed that answered HTTP 200.

    Non-200 responses and request errors are logged as warnings and skipped,
    so one bad feed does not abort the others.
    """
    results = []
    async with aiohttp.ClientSession() as session:
        for feed in NDMA_FEEDS:
            try:
                async with session.get(
                    feed["url"],
                    headers={"User-Agent": "Sajaag/0.1"},
                    timeout=aiohttp.ClientTimeout(total=15),
                ) as resp:
                    if resp.status == 200:
                        results.append({"xml": await resp.text(), "source": feed["source"]})
                    else:
                        # Surface bad statuses instead of dropping the feed
                        # silently (the news and MSEDCL fetchers log these too).
                        logger.warning("NDMA feed %s returned %d", feed["source"], resp.status)
            except Exception as e:
                logger.warning("NDMA feed %s failed: %s", feed["source"], e)
    return results
# validate method · python · L85-L124 (40 LOC)backend/scrapers/ndma_alerts.py
async def validate(self, raw: Any) -> list[dict]:
alerts = []
seen = set()
now = datetime.now(timezone.utc).isoformat()
for feed_data in raw:
parsed = feedparser.parse(feed_data["xml"])
for entry in parsed.entries[:20]:
title = entry.get("title", "").strip()
if not title:
continue
# Filter for Pune/Maharashtra relevance
full_text = f"{title} {entry.get('summary', '')}"
if not any(kw in full_text.lower() for kw in ["pune", "maharashtra", "western maharashtra", "deccan"]):
continue
alert_id = md5(f"{title}:{feed_data['source']}".encode()).hexdigest()[:16]
if alert_id in seen:
continue
seen.add(alert_id)
import re
summary = re.sub(r'<[^>]+>', '', entry.get("summary", "")).strip()[:300]
alerts.store method · python · L126-L134 (9 LOC)backend/scrapers/ndma_alerts.py
async def store(self, data: list[dict]) -> None:
    """Upsert emergency alerts into ``emergency_alerts``, conflicting on ``id``.

    Each alert is upserted independently. Failures are logged as warnings and
    do not stop the rest. The final info log reports the number of successful
    upserts. Nothing happens when *data* is empty or no Supabase client is
    configured.
    """
    if not data or supabase is None:
        return
    stored = 0
    for alert in data:
        try:
            supabase.table("emergency_alerts").upsert(alert, on_conflict="id").execute()
        except Exception as e:
            logger.warning("Failed to upsert alert: %s", e)
        else:
            stored += 1
    logger.info("ndma_alerts: stored %d alerts", stored)
# generate_news_id function · python · L207-L209 (3 LOC)backend/scrapers/news_rss.py
def generate_news_id(title: str, source: str) -> str:
    """Build a deterministic 16-hex-character ID for a news item.

    The ID is the first 16 hex digits of the MD5 digest of the UTF-8 string
    ``"{title}:{source}"``. Identical inputs always give the same ID, which
    is what lets repeated inserts be de-duplicated.
    """
    payload = f"{title}:{source}".encode("utf-8")
    digest = md5(payload).hexdigest()
    return digest[:16]
# Repobility · code-quality intelligence · https://repobility.com
tag_areas function · python · L212-L222 (11 LOC)backend/scrapers/news_rss.py
def tag_areas(text: str) -> list[str]:
    """Tag a news item with the AREA_KEYWORDS areas it mentions.

    Both the text and each keyword are lower-cased before the substring test,
    and each area is listed at most once. Returns ``["pune"]`` (city-wide)
    when no area matches.
    """
    lowered = text.lower()
    tagged = [
        area_id
        for area_id, keywords in AREA_KEYWORDS.items()
        if any(kw.lower() in lowered for kw in keywords)
    ]
    # Nothing specific matched: treat the item as city-wide.
    return tagged or ["pune"]
# tag_category function · python · L225-L239 (15 LOC)backend/scrapers/news_rss.py
def tag_category(text: str, bias: str | None = None) -> str:
"""Categorize news item based on keywords."""
if bias:
return bias
text_lower = text.lower()
scores: dict[str, int] = {}
for cat, keywords in CATEGORY_KEYWORDS.items():
score = sum(1 for kw in keywords if kw in text_lower)
if score > 0:
scores[cat] = score
if scores:
return max(scores, key=scores.get)
return "Essential" # Default categoryNewsRssScraper class · python · L242-L345 (104 LOC)backend/scrapers/news_rss.py
class NewsRssScraper(BaseScraper):
name = "news_rss"
max_retries = 3
async def fetch(self) -> Any:
"""Fetch all RSS feeds concurrently."""
results = []
async with aiohttp.ClientSession() as session:
for feed_config in RSS_FEEDS:
try:
async with session.get(
feed_config["url"],
headers={"User-Agent": "Sajaag/0.1 RSS Reader"},
timeout=aiohttp.ClientTimeout(total=15),
) as resp:
if resp.status == 200:
text = await resp.text()
results.append({
"xml": text,
"config": feed_config,
})
else:
logger.warning("RSS feed %s returned %d", feed_config["source"], resp.status)
except Exceptiofetch method · python · L246-L270 (25 LOC)backend/scrapers/news_rss.py
async def fetch(self) -> Any:
"""Fetch all RSS feeds concurrently."""
results = []
async with aiohttp.ClientSession() as session:
for feed_config in RSS_FEEDS:
try:
async with session.get(
feed_config["url"],
headers={"User-Agent": "Sajaag/0.1 RSS Reader"},
timeout=aiohttp.ClientTimeout(total=15),
) as resp:
if resp.status == 200:
text = await resp.text()
results.append({
"xml": text,
"config": feed_config,
})
else:
logger.warning("RSS feed %s returned %d", feed_config["source"], resp.status)
except Exception as e:
logger.warning("Failed to fetch RSS from %s: %s", validate method · python · L272-L332 (61 LOC)backend/scrapers/news_rss.py
async def validate(self, raw: Any) -> list[dict]:
"""Parse RSS XML into news items."""
if not raw:
return []
items = []
seen_titles: set[str] = set()
now = datetime.now(timezone.utc).isoformat()
for feed_data in raw:
config = feed_data["config"]
parsed = feedparser.parse(feed_data["xml"])
for entry in parsed.entries[:20]: # Max 20 per feed
title = entry.get("title", "").strip()
if not title or title in seen_titles:
continue
seen_titles.add(title)
summary = entry.get("summary", entry.get("description", ""))
# Strip HTML tags from summary
summary = re.sub(r'<[^>]+>', '', summary).strip()
# Truncate to 300 chars
if len(summary) > 300:
summary = summary[:297] + "..."
link = entry.get("link", "")
store method · python · L334-L345 (12 LOC)backend/scrapers/news_rss.py
async def store(self, data: list[dict]) -> None:
    """Upsert news items into ``news_items``, conflicting on ``id``.

    Each item is upserted independently. A failure is logged (with up to 50
    characters of the item's title) and the remaining items are still
    attempted. The final info log reports the number of successful upserts.
    Nothing happens when *data* is empty or no Supabase client is configured.
    """
    if not data or supabase is None:
        return
    stored = 0
    for item in data:
        try:
            supabase.table("news_items").upsert(item, on_conflict="id").execute()
        except Exception as e:
            # str() guards against a None/non-str title raising inside the handler.
            title = str(item.get("title", "?"))[:50]
            logger.warning("Failed to upsert news item '%s': %s", title, e)
        else:
            stored += 1
    logger.info("news_rss: stored %d items", stored)
# match_area function · python · L107-L125 (19 LOC)backend/scrapers/pmc_water.py
def match_area(text: str) -> list[str]:
    """Return the area IDs referenced by *text*, each at most once.

    Two lookups run against the lower-cased text:

    1. ZONE_AREA_MAP: when a zone key occurs as a substring, all of that
       zone's area IDs are added.
    2. WATER_AREA_KEYWORDS: an area is added when any of its keywords occurs
       as a substring.

    Results keep first-seen order (zone matches first). De-duplication
    applies to both lookups; previously only keyword matches were checked,
    so overlapping zones could repeat an area.
    """
    text_lower = text.lower()
    matched: list[str] = []
    seen: set[str] = set()

    def _add(area_id: str) -> None:
        # Append in first-seen order, skipping repeats in O(1).
        if area_id not in seen:
            seen.add(area_id)
            matched.append(area_id)

    for zone, areas in ZONE_AREA_MAP.items():
        if zone in text_lower:
            for area_id in areas:
                _add(area_id)

    for area_id, keywords in WATER_AREA_KEYWORDS.items():
        if any(kw in text_lower for kw in keywords):
            _add(area_id)

    return matched
# extract_times function · python · L128-L154 (27 LOC)backend/scrapers/pmc_water.py
def extract_times(text: str) -> tuple[str | None, str | None]:
"""Extract start and end times from text."""
times = re.findall(r'(\d{1,2}[:.]\d{2}\s*(?:am|pm|AM|PM)?)', text)
if not times:
times = re.findall(r'(\d{1,2}\s*(?:am|pm|AM|PM))', text)
start = None
end = None
for t in times[:2]:
t = t.strip().replace('.', ':')
try:
# Try 12-hour format
if re.search(r'(am|pm)', t, re.I):
parsed = datetime.strptime(t.upper().replace(' ', ''), '%I:%M%p' if ':' in t else '%I%p')
else:
parsed = datetime.strptime(t, '%H:%M')
formatted = parsed.strftime('%H:%M')
if start is None:
start = formatted
else:
end = formatted
except ValueError:
continue
return start, endOpen data scored by Repobility · https://repobility.com
PmcWaterScraper class · python · L157-L303 (147 LOC)backend/scrapers/pmc_water.py
class PmcWaterScraper(BaseScraper):
name = "pmc_water"
max_retries = 3
async def fetch(self) -> Any:
"""Fetch water schedule pages from PMC."""
results = {}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml",
}
async with aiohttp.ClientSession() as session:
for url in PMC_WATER_URLS:
try:
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30), allow_redirects=True) as resp:
if resp.status == 200:
results[url] = await resp.text()
except Exception as e:
logger.warning("Failed to fetch PMC page %s: %s", url, e)
# Also try Google search for recent PMC water notices
try:
google_url = "https://news.fetch method · python · L161-L188 (28 LOC)backend/scrapers/pmc_water.py
async def fetch(self) -> Any:
"""Fetch water schedule pages from PMC."""
results = {}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml",
}
async with aiohttp.ClientSession() as session:
for url in PMC_WATER_URLS:
try:
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30), allow_redirects=True) as resp:
if resp.status == 200:
results[url] = await resp.text()
except Exception as e:
logger.warning("Failed to fetch PMC page %s: %s", url, e)
# Also try Google search for recent PMC water notices
try:
google_url = "https://news.google.com/rss/search?q=pune+pmc+water+supply+schedule+today&hl=en-IN&gl=IN"
validate method · python · L190-L293 (104 LOC)backend/scrapers/pmc_water.py
async def validate(self, raw: Any) -> list[dict]:
"""Parse water schedule data."""
schedules = []
now = datetime.now(timezone.utc).isoformat()
today = date.today().isoformat()
found_areas: set[str] = set()
for source_url, html in raw.items():
if source_url == "google_news":
# Parse RSS for water disruption news
import feedparser
feed = feedparser.parse(html)
for entry in feed.entries[:5]:
title = entry.get("title", "")
areas = match_area(title)
for area_id in areas:
if area_id in found_areas:
continue
found_areas.add(area_id)
# If news mentions disruption, mark with altered schedule
if any(kw in title.lower() for kw in ["cut", "disrupt", "no water", "shortage"]):
store method · python · L295-L303 (9 LOC)backend/scrapers/pmc_water.py
async def store(self, data: list[dict]) -> None:
    """Insert water-schedule records into ``water_schedules``, one request each.

    No-op for empty *data* or when no Supabase client is configured; insert
    errors propagate to the caller.
    """
    if supabase is None or not data:
        return
    # NOTE(review): BaseScraper.run retries on any exception, so rows written
    # before a mid-loop failure are inserted again on the retry.
    for schedule in data:
        supabase.table("water_schedules").insert(schedule).execute()
    logger.info("pmc_water: stored %d schedules", len(data))
# categorize_aqi function · python · L48-L54 (7 LOC)backend/scrapers/safar_aqi.py
def categorize_aqi(aqi: int) -> str:
    """Map a numeric AQI value to its band label.

    Upper bounds are inclusive:

    - <=50 Good
    - <=100 Satisfactory
    - <=200 Moderate
    - <=300 Poor
    - <=400 Very Poor
    - anything higher: Severe
    """
    bands = (
        (50, "Good"),
        (100, "Satisfactory"),
        (200, "Moderate"),
        (300, "Poor"),
        (400, "Very Poor"),
    )
    for ceiling, label in bands:
        if aqi <= ceiling:
            return label
    return "Severe"
# SafarAqiScraper class · python · L57-L181 (125 LOC)backend/scrapers/safar_aqi.py
class SafarAqiScraper(BaseScraper):
name = "safar_aqi"
max_retries = 3
async def fetch(self) -> Any:
"""Fetch AQI data from SAFAR.
Tries the JSON API first, falls back to scraping the HTML page.
"""
url = f"{settings.SAFAR_BASE_URL}/api/aqi-data"
ssl_ctx = ssl.create_default_context(cafile=certifi.where())
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
conn = aiohttp.TCPConnector(ssl=ssl_ctx)
async with aiohttp.ClientSession(connector=conn) as session:
# Try JSON API endpoint
try:
headers = {
"Accept": "application/json",
"User-Agent": "Sajaag/0.1 (City Intelligence Dashboard)",
"Referer": settings.SAFAR_BASE_URL,
}
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp:
if resp.status == 200fetch method · python · L61-L97 (37 LOC)backend/scrapers/safar_aqi.py
async def fetch(self) -> Any:
"""Fetch AQI data from SAFAR.
Tries the JSON API first, falls back to scraping the HTML page.
"""
url = f"{settings.SAFAR_BASE_URL}/api/aqi-data"
ssl_ctx = ssl.create_default_context(cafile=certifi.where())
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
conn = aiohttp.TCPConnector(ssl=ssl_ctx)
async with aiohttp.ClientSession(connector=conn) as session:
# Try JSON API endpoint
try:
headers = {
"Accept": "application/json",
"User-Agent": "Sajaag/0.1 (City Intelligence Dashboard)",
"Referer": settings.SAFAR_BASE_URL,
}
async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=30)) as resp:
if resp.status == 200:
content_type = resp.headers.get("Content-Type", "")
validate method · python · L99-L171 (73 LOC)backend/scrapers/safar_aqi.py
async def validate(self, raw: Any) -> list[dict]:
"""Parse and validate AQI readings."""
readings = []
now = datetime.now(timezone.utc).isoformat()
if isinstance(raw, dict) and raw.get("type") == "html_fallback":
# HTML fallback: parse with BeautifulSoup
from bs4 import BeautifulSoup
soup = BeautifulSoup(raw["html"], "lxml")
# Look for AQI values in the page
# SAFAR pages typically show station-wise AQI in card elements
aqi_elements = soup.select("[class*='aqi'], [class*='station'], .city-data")
if not aqi_elements:
# If we can't parse specific elements, try to find AQI numbers
# Look for patterns like "AQI: 142" or numeric values near station names
import re
text = soup.get_text()
# Try to extract any AQI value for Pune
aqi_match = re.search(r'(?:AQI|aqi)[:\s]*(\d+)', Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
store method · python · L173-L181 (9 LOC)backend/scrapers/safar_aqi.py
async def store(self, data: list[dict]) -> None:
    """Insert AQI readings into ``aqi_readings``, one request per reading.

    This is a plain insert, not an upsert. No-op for empty *data* or when no
    Supabase client is configured; insert errors propagate to the caller.
    """
    if supabase is None or not data:
        return
    # NOTE(review): BaseScraper.run retries on any exception, so readings
    # inserted before a mid-loop failure are inserted again on the retry.
    for aqi_row in data:
        supabase.table("aqi_readings").insert(aqi_row).execute()
    logger.info("safar_aqi: stored %d readings", len(data))
# match_area function · python · L83-L90 (8 LOC)backend/scrapers/traffic.py
def match_area(text: str) -> str | None:
    """Return the first area ID whose road keyword occurs in *text*.

    Areas are tried in ROAD_AREA_MAP's iteration order. Each keyword is tested
    as a substring of the lower-cased text. Returns None when nothing
    matches.
    """
    lowered = text.lower()
    for area_id, road_keywords in ROAD_AREA_MAP.items():
        if any(keyword in lowered for keyword in road_keywords):
            return area_id
    return None
# detect_severity function · python · L93-L100 (8 LOC)backend/scrapers/traffic.py
def detect_severity(text: str) -> str:
    """Classify a traffic incident's severity from its text.

    Returns the first level in SEVERITY_MAP (iteration order) with a keyword
    occurring as a substring of the lower-cased text, or "low" when none
    does.
    """
    lowered = text.lower()
    matching_levels = (
        level
        for level, keywords in SEVERITY_MAP.items()
        if any(kw in lowered for kw in keywords)
    )
    return next(matching_levels, "low")