← back to drbhatiasanjay__sajaag

Function bodies 140 total

All specs Real LLM only Function bodies
TrafficScraper class · python · L103-L191 (89 LOC)
backend/scrapers/traffic.py
class TrafficScraper(BaseScraper):
    name = "traffic"
    max_retries = 3

    async def fetch(self) -> Any:
        """Fetch traffic feeds."""
        results = []

        async with aiohttp.ClientSession() as session:
            for source in TRAFFIC_SOURCES:
                try:
                    async with session.get(
                        source["url"],
                        headers={"User-Agent": "Sajaag/0.1"},
                        timeout=aiohttp.ClientTimeout(total=15),
                    ) as resp:
                        if resp.status == 200:
                            text = await resp.text()
                            results.append({"xml": text, "config": source})
                except Exception as e:
                    logger.warning("Traffic source %s failed: %s", source["source"], e)

        return results

    async def validate(self, raw: Any) -> list[dict]:
        """Parse traffic feeds into incident records."""
        incidents = []
        se
fetch method · python · L107-L125 (19 LOC)
backend/scrapers/traffic.py
    async def fetch(self) -> Any:
        """Fetch the raw body of every configured traffic feed.

        Each entry of ``TRAFFIC_SOURCES`` is requested in turn over one shared
        ``aiohttp`` session. Fetching is best-effort: a source that raises or
        answers with a non-200 status is logged and skipped so the remaining
        sources are still tried.

        Returns:
            A list of ``{"xml": <response text>, "config": <source entry>}``
            dicts, one per source that answered with HTTP 200. May be empty.
        """
        results = []
        # Built once: the same headers and 15 s total budget apply to every source.
        headers = {"User-Agent": "Sajaag/0.1"}
        timeout = aiohttp.ClientTimeout(total=15)

        async with aiohttp.ClientSession() as session:
            for source in TRAFFIC_SOURCES:
                try:
                    async with session.get(
                        source["url"],
                        headers=headers,
                        timeout=timeout,
                    ) as resp:
                        if resp.status != 200:
                            # Previously dropped without a trace, which hid broken feeds.
                            logger.warning(
                                "Traffic source %s returned HTTP %s",
                                source["source"],
                                resp.status,
                            )
                            continue
                        text = await resp.text()
                        results.append({"xml": text, "config": source})
                except Exception as e:
                    # Deliberately broad: one bad feed must not abort the others.
                    logger.warning("Traffic source %s failed: %s", source["source"], e)

        return results
validate method · python · L127-L178 (52 LOC)
backend/scrapers/traffic.py
    async def validate(self, raw: Any) -> list[dict]:
        """Parse traffic feeds into incident records."""
        incidents = []
        seen: set[str] = set()
        now = datetime.now(timezone.utc).isoformat()

        for feed_data in raw:
            config = feed_data["config"]
            parsed = feedparser.parse(feed_data["xml"])

            for entry in parsed.entries[:15]:
                title = entry.get("title", "").strip()
                if not title:
                    continue

                summary = entry.get("summary", "")
                full_text = f"{title} {summary}"

                # Must mention Pune
                if "pune" not in full_text.lower():
                    continue

                area_id = match_area(full_text)
                if not area_id:
                    continue

                incident_id = md5(f"{title}:{config['source']}".encode()).hexdigest()[:16]
                if incident_id in seen:
                    continue
   
store method · python · L180-L191 (12 LOC)
backend/scrapers/traffic.py
    async def store(self, data: list[dict]) -> None:
        """Upsert traffic incidents into Supabase, one row at a time.

        Does nothing when ``data`` is empty or the module-level ``supabase``
        client is ``None``. Rows are upserted on the ``id`` column; a row that
        fails is logged and skipped so the rest are still attempted.

        Args:
            data: Incident records to write to the ``traffic_incidents`` table.
        """
        if not data or supabase is None:
            return

        stored = 0
        for incident in data:
            try:
                supabase.table("traffic_incidents").upsert(incident, on_conflict="id").execute()
            except Exception as e:
                logger.warning("Failed to upsert traffic incident: %s", e)
            else:
                stored += 1

        # Report rows actually written; len(data) overstated it after failures.
        logger.info("traffic: stored %d incidents", stored)
WeatherScraper class · python · L61-L150 (90 LOC)
backend/scrapers/weather.py
class WeatherScraper(BaseScraper):
    name = "weather"
    max_retries = 3

    async def fetch(self) -> Any:
        """Fetch current weather for Pune center (single request covers all areas).

        Pune is small enough (~20km across) that one weather station covers all areas.
        We use Pune center coordinates.
        """
        pune_center = {"lat": 18.5204, "lng": 73.8567}

        params = {
            "latitude": pune_center["lat"],
            "longitude": pune_center["lng"],
            "current": "temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m",
            "hourly": "temperature_2m,precipitation_probability,weather_code",
            "forecast_days": 2,
            "timezone": "Asia/Kolkata",
        }

        async with aiohttp.ClientSession() as session:
            async with session.get(
                settings.OPEN_METEO_URL,
                params=params,
                timeout=aiohttp.ClientTimeout(total=15),
            ) as resp:
      
fetch method · python · L65-L89 (25 LOC)
backend/scrapers/weather.py
    async def fetch(self) -> Any:
        """Request current conditions and a two-day hourly forecast for Pune.

        One query at fixed city-centre coordinates is sent to
        ``settings.OPEN_METEO_URL``; the same response is used for every area.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Whatever ``raise_for_status()`` raises on an HTTP error status, plus
            any connection or timeout error from ``aiohttp``.
        """
        current_fields = "temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m"
        hourly_fields = "temperature_2m,precipitation_probability,weather_code"
        query = dict(
            latitude=18.5204,
            longitude=73.8567,
            current=current_fields,
            hourly=hourly_fields,
            forecast_days=2,
            timezone="Asia/Kolkata",
        )
        request_timeout = aiohttp.ClientTimeout(total=15)

        async with aiohttp.ClientSession() as http:
            async with http.get(
                settings.OPEN_METEO_URL,
                params=query,
                timeout=request_timeout,
            ) as response:
                response.raise_for_status()
                payload = await response.json()
                return payload
validate method · python · L91-L140 (50 LOC)
backend/scrapers/weather.py
    async def validate(self, raw: Any) -> list[dict]:
        """Parse Open-Meteo response into weather records per area."""
        if not raw or "current" not in raw:
            return []

        current = raw["current"]
        hourly = raw.get("hourly", {})
        now = datetime.now(timezone.utc).isoformat()

        temp = current.get("temperature_2m", 0)
        humidity = current.get("relative_humidity_2m", 0)
        weather_code = current.get("weather_code", 0)
        condition = WMO_CODES.get(weather_code, "Unknown")

        # Calculate rain probability from next 6 hours
        precip_probs = hourly.get("precipitation_probability", [])
        rain_prob = max(precip_probs[:6]) if precip_probs else 0

        # Build 48h forecast JSON from hourly data
        forecast = {}
        hourly_times = hourly.get("time", [])
        hourly_temps = hourly.get("temperature_2m", [])
        hourly_codes = hourly.get("weather_code", [])
        hourly_precip = hourly.get("precipita
All rows scored by the Repobility analyzer (https://repobility.com)
store method · python · L142-L150 (9 LOC)
backend/scrapers/weather.py
    async def store(self, data: list[dict]) -> None:
        """Write each weather record to the ``weather_data`` table in Supabase.

        Returns immediately when there is nothing to store or when the
        module-level ``supabase`` client is ``None``. Insert errors are not
        caught here, so the first failing row propagates to the caller.
        """
        if not (data and supabase is not None):
            return

        table_name = "weather_data"
        for row in data:
            supabase.table(table_name).insert(row).execute()

        logger.info("weather: stored %d records", len(data))
getAlerts function · typescript · L9-L11 (3 LOC)
packages/api-client/src/alerts.ts
/**
 * Fetch the alerts for one area from `/api/alerts/{areaId}`.
 *
 * @param areaId Area identifier; percent-encoded before it is placed in the path.
 * @returns Whatever `apiClient` resolves to for that path.
 */
export async function getAlerts(areaId: string): Promise<AlertsResponse> {
  // Encode so an id containing `/`, `?` or `#` cannot alter the request path.
  return apiClient<AlertsResponse>(`/api/alerts/${encodeURIComponent(areaId)}`)
}
getMorningBrief function · typescript · L4-L6 (3 LOC)
packages/api-client/src/brief.ts
/**
 * Fetch the morning brief for one area from `/api/brief/{areaId}?lang={lang}`.
 *
 * @param areaId Area identifier; percent-encoded before it is placed in the path.
 * @param lang Language passed as the `lang` query parameter (defaults to `'en'`).
 * @returns Whatever `apiClient` resolves to for that path.
 */
export async function getMorningBrief(areaId: string, lang: Language = 'en'): Promise<MorningBrief> {
  // Encode so an id containing `/`, `?` or `#` cannot alter the path or query string.
  return apiClient<MorningBrief>(`/api/brief/${encodeURIComponent(areaId)}?lang=${lang}`)
}
apiClient function · typescript · L10-L34 (25 LOC)
packages/api-client/src/client.ts
/**
 * Send a JSON request to the backend and return the decoded response body.
 *
 * @param path Path appended verbatim to `API_BASE_URL`.
 * @param options Optional `method` (defaults to `'GET'`), `body` (serialized with
 *   `JSON.stringify`), extra `headers`, and `adminKey` (sent as `X-Admin-Key`).
 * @returns The response body parsed as JSON, or `undefined` for a 204 response.
 * @throws Error when the response is not OK; the message carries the method,
 *   path, status code and response text.
 */
export async function apiClient<T>(path: string, options: FetchOptions = {}): Promise<T> {
  const { method = 'GET', body, headers = {}, adminKey } = options

  const allHeaders: Record<string, string> = {
    'Content-Type': 'application/json',
    // Spread last so caller-supplied headers override the default content type.
    ...headers,
  }

  if (adminKey) {
    allHeaders['X-Admin-Key'] = adminKey
  }

  // Test for absence rather than truthiness so falsy payloads (0, false, '')
  // are still serialized and sent.
  const hasBody = body !== undefined && body !== null

  const response = await fetch(`${API_BASE_URL}${path}`, {
    method,
    headers: allHeaders,
    body: hasBody ? JSON.stringify(body) : undefined,
  })

  if (!response.ok) {
    const error = await response.text().catch(() => 'Unknown error')
    throw new Error(`API ${method} ${path} failed (${response.status}): ${error}`)
  }

  // A 204 has no body; response.json() would reject with a SyntaxError.
  if (response.status === 204) {
    return undefined as unknown as T
  }

  return response.json()
}
submitContent function · typescript · L12-L14 (3 LOC)
packages/api-client/src/content.ts
/** POST a content submission to `/api/content/submit` and return the response. */
export async function submitContent(data: SubmitContentPayload): Promise<ContentSubmission> {
  const endpoint = '/api/content/submit'
  const submission = await apiClient<ContentSubmission>(endpoint, { method: 'POST', body: data })
  return submission
}
getPendingSubmissions function · typescript · L16-L18 (3 LOC)
packages/api-client/src/content.ts
/**
 * Fetch `/api/admin/pending`, passing `adminKey` through to `apiClient`.
 *
 * @param adminKey Admin key forwarded in the request options.
 */
export async function getPendingSubmissions(adminKey: string): Promise<ContentSubmission[]> {
  const endpoint = '/api/admin/pending'
  const pending = await apiClient<ContentSubmission[]>(endpoint, { adminKey })
  return pending
}
reviewSubmission function · typescript · L20-L31 (12 LOC)
packages/api-client/src/content.ts
/**
 * Approve or reject a submission via `POST /api/admin/review/{id}`.
 *
 * @param id Submission identifier; percent-encoded before it is placed in the path.
 * @param action Review decision sent in the request body.
 * @param note Reviewer note sent in the request body.
 * @param adminKey Admin key forwarded in the request options.
 * @returns Whatever `apiClient` resolves to for that request.
 */
export async function reviewSubmission(
  id: string,
  action: 'approve' | 'reject',
  note: string,
  adminKey: string
): Promise<ContentSubmission> {
  // Encode so an id containing `/`, `?` or `#` cannot redirect an admin request.
  return apiClient<ContentSubmission>(`/api/admin/review/${encodeURIComponent(id)}`, {
    method: 'POST',
    body: { action, note },
    adminKey,
  })
}
getDeals function · typescript · L4-L6 (3 LOC)
packages/api-client/src/deals.ts
/**
 * Fetch the deals for one area from `/api/deals/{areaId}`.
 *
 * @param areaId Area identifier; percent-encoded before it is placed in the path.
 * @returns Whatever `apiClient` resolves to for that path.
 */
export async function getDeals(areaId: string): Promise<Deal[]> {
  // Encode so an id containing `/`, `?` or `#` cannot alter the request path.
  return apiClient<Deal[]>(`/api/deals/${encodeURIComponent(areaId)}`)
}
Powered by Repobility — scan your code at https://repobility.com
submitDeal function · typescript · L8-L10 (3 LOC)
packages/api-client/src/deals.ts
/**
 * POST a new deal to `/api/deals`.
 *
 * @param deal Deal fields without `id`, `created_at` and `verified`.
 */
export async function submitDeal(deal: Omit<Deal, 'id' | 'created_at' | 'verified'>): Promise<Deal> {
  const endpoint = '/api/deals'
  const created = await apiClient<Deal>(endpoint, { method: 'POST', body: deal })
  return created
}
getLayerData function · typescript · L4-L6 (3 LOC)
packages/api-client/src/layers.ts
/**
 * Fetch the map markers of one layer for one area from
 * `/api/layers/{type}/{areaId}`.
 *
 * @param type Layer type, placed in the path as-is.
 * @param areaId Area identifier; percent-encoded before it is placed in the path.
 * @returns Whatever `apiClient` resolves to for that path.
 */
export async function getLayerData(type: LayerType, areaId: string): Promise<MapMarker[]> {
  // Encode so an id containing `/`, `?` or `#` cannot alter the request path.
  return apiClient<MapMarker[]>(`/api/layers/${type}/${encodeURIComponent(areaId)}`)
}
getNews function · typescript · L4-L6 (3 LOC)
packages/api-client/src/news.ts
/**
 * Fetch the news items for one area from `/api/news/{areaId}`.
 *
 * @param areaId Area identifier; percent-encoded before it is placed in the path.
 * @returns Whatever `apiClient` resolves to for that path.
 */
export async function getNews(areaId: string): Promise<NewsItem[]> {
  // Encode so an id containing `/`, `?` or `#` cannot alter the request path.
  return apiClient<NewsItem[]>(`/api/news/${encodeURIComponent(areaId)}`)
}
formatDistance function · typescript · L1-L4 (4 LOC)
packages/shared/lib/format.ts
/**
 * Format a distance in meters as a short label.
 *
 * Values that round to fewer than 1000 whole meters are shown as meters
 * (`"850m"`); everything else is shown in kilometers with one decimal
 * (`"1.2km"`).
 *
 * @param meters Distance in meters.
 */
export function formatDistance(meters: number): string {
  const wholeMeters = Math.round(meters)
  // Compare the rounded value: 999.6 used to slip through as "1000m".
  if (wholeMeters < 1000) return `${wholeMeters}m`
  return `${(meters / 1000).toFixed(1)}km`
}
formatTime function · typescript · L6-L11 (6 LOC)
packages/shared/lib/format.ts
/**
 * Convert a 24-hour `"HH:MM"` string to a 12-hour label such as `"9:05 AM"`.
 *
 * Minutes are omitted when they are zero (`"14:00"` -> `"2 PM"`). A missing
 * minutes part is treated as zero, hour 24 is treated as midnight, and input
 * whose hour or minute is not a number is returned unchanged.
 *
 * @param timeStr Time of day; parts after the minutes (e.g. seconds) are ignored.
 */
export function formatTime(timeStr: string): string {
  // Default the minutes so "14" no longer renders as "2:undefined PM".
  const [rawHour, m = 0] = timeStr.split(':').map(Number)
  if (!Number.isFinite(rawHour) || !Number.isFinite(m)) return timeStr

  // Wrap 24 to 0 so "24:00" reads as midnight instead of "12 PM".
  const h = rawHour % 24
  const period = h >= 12 ? 'PM' : 'AM'
  const hour = h % 12 || 12
  return m === 0 ? `${hour} ${period}` : `${hour}:${String(m).padStart(2, '0')} ${period}`
}
formatAqiCategory function · typescript · L13-L20 (8 LOC)
packages/shared/lib/format.ts
/**
 * Map an AQI value to its category label.
 *
 * Bands are inclusive at their upper bound; anything above 400 (or a value
 * that matches no band, such as NaN) is `'Severe'`.
 */
export function formatAqiCategory(aqi: number): string {
  const bands: ReadonlyArray<readonly [number, string]> = [
    [50, 'Good'],
    [100, 'Satisfactory'],
    [200, 'Moderate'],
    [300, 'Poor'],
    [400, 'Very Poor'],
  ]
  for (const [upperBound, label] of bands) {
    if (aqi <= upperBound) return label
  }
  return 'Severe'
}
formatAqiColor function · typescript · L22-L29 (8 LOC)
packages/shared/lib/format.ts
/**
 * Map an AQI value to the hex color of its band.
 *
 * Bands are inclusive at their upper bound; anything above 400 (or a value
 * that matches no band, such as NaN) gets `'#7f1d1d'`.
 */
export function formatAqiColor(aqi: number): string {
  const bands: ReadonlyArray<readonly [number, string]> = [
    [50, '#22c55e'],
    [100, '#84cc16'],
    [200, '#fb923c'],
    [300, '#ef4444'],
    [400, '#dc2626'],
  ]
  for (const [upperBound, color] of bands) {
    if (aqi <= upperBound) return color
  }
  return '#7f1d1d'
}
formatRelativeTime function · typescript · L31-L43 (13 LOC)
packages/shared/lib/format.ts
/**
 * Describe how long ago a timestamp was, e.g. `"5m ago"`, `"3h ago"`, `"2d ago"`.
 *
 * Anything less than one minute old, including timestamps in the future, is
 * `'just now'`. A string that `Date` cannot parse is returned unchanged.
 *
 * @param dateStr Timestamp in any format accepted by the `Date` constructor.
 */
export function formatRelativeTime(dateStr: string): string {
  const now = new Date()
  const date = new Date(dateStr)
  // An invalid date makes every comparison below false and used to fall
  // through to "NaNd ago".
  if (Number.isNaN(date.getTime())) return dateStr

  const diffMs = now.getTime() - date.getTime()
  const diffMin = Math.floor(diffMs / 60000)

  if (diffMin < 1) return 'just now'
  if (diffMin < 60) return `${diffMin}m ago`
  const diffHr = Math.floor(diffMin / 60)
  if (diffHr < 24) return `${diffHr}h ago`
  const diffDay = Math.floor(diffHr / 24)
  return `${diffDay}d ago`
}
All rows above produced by Repobility · https://repobility.com
formatReliability function · typescript · L45-L47 (3 LOC)
packages/shared/lib/format.ts
/** Render a percentage as a whole number followed by `%`, e.g. `87.5` -> `"88%"`. */
export function formatReliability(pct: number): string {
  const rounded = Math.round(pct)
  return rounded.toString() + '%'
}
createRadiusCircle function · typescript · L5-L8 (4 LOC)
packages/shared/lib/spatial.ts
/**
 * Build a turf circle around `center`.
 *
 * @param center Circle centre; passed to turf as `[lng, lat]`.
 * @param radiusMeters Radius in meters, converted to kilometers for turf.
 * @param steps Value forwarded as turf's `steps` option (defaults to 64).
 */
export function createRadiusCircle(center: GeoPosition, radiusMeters: number, steps = 64) {
  const radiusKm = radiusMeters / 1000
  const origin = turf.point([center.lng, center.lat])
  return turf.circle(origin, radiusKm, { steps, units: 'kilometers' })
}
getH3Index function · typescript · L10-L12 (3 LOC)
packages/shared/lib/spatial.ts
/** Return the H3 cell index for `position` at the given `resolution`. */
export function getH3Index(position: GeoPosition, resolution: number): string {
  const { lat, lng } = position
  return h3.latLngToCell(lat, lng, resolution)
}
getH3Ring function · typescript · L14-L17 (4 LOC)
packages/shared/lib/spatial.ts
/**
 * Return the result of `h3.gridDisk` around the cell containing `position`.
 *
 * @param position Point whose cell is the centre of the disk.
 * @param resolution H3 resolution used to find the centre cell.
 * @param ringSize Value forwarded to `h3.gridDisk` (defaults to 1).
 */
export function getH3Ring(position: GeoPosition, resolution: number, ringSize = 1): string[] {
  return h3.gridDisk(getH3Index(position, resolution), ringSize)
}
getH3CellBoundary function · typescript · L19-L21 (3 LOC)
packages/shared/lib/spatial.ts
/**
 * Return the boundary of an H3 cell with each vertex's two coordinates swapped,
 * so `[lat, lng]` pairs from `h3.cellToBoundary` come back as `[lng, lat]`.
 */
export function getH3CellBoundary(cellIndex: string): [number, number][] {
  const ring: [number, number][] = []
  for (const [lat, lng] of h3.cellToBoundary(cellIndex)) {
    ring.push([lng, lat])
  }
  return ring
}
distanceBetween function · typescript · L23-L27 (5 LOC)
packages/shared/lib/spatial.ts
/** Return the turf distance between two positions, requested in meters. */
export function distanceBetween(a: GeoPosition, b: GeoPosition): number {
  // turf points are built from [lng, lat] pairs.
  const toPoint = ({ lng, lat }: GeoPosition) => turf.point([lng, lat])
  return turf.distance(toPoint(a), toPoint(b), { units: 'meters' })
}
isWithinRadius function · typescript · L29-L31 (3 LOC)
packages/shared/lib/spatial.ts
/** Report whether `point` lies no farther than `radiusMeters` from `center`. */
export function isWithinRadius(center: GeoPosition, point: GeoPosition, radiusMeters: number): boolean {
  const separation = distanceBetween(center, point)
  return radiusMeters >= separation
}
findNearestArea function · typescript · L33-L44 (12 LOC)
packages/shared/lib/spatial.ts
/**
 * Return the `id` of the area closest to `position`.
 *
 * The first area wins a tie. Returns `null` when `areas` is empty or no
 * distance is smaller than Infinity.
 */
export function findNearestArea(position: GeoPosition, areas: { lat: number; lng: number; id: string }[]): string | null {
  let bestId: string | null = null
  let bestDistance = Number.POSITIVE_INFINITY
  for (const { id, lat, lng } of areas) {
    const candidate = distanceBetween(position, { lat, lng })
    // Strictly-closer only: keeps the earlier area on ties and skips NaN.
    if (!(candidate < bestDistance)) continue
    bestDistance = candidate
    bestId = id
  }
  return bestId
}
Want this analysis on your repo? https://repobility.com/scan/
api function · python · L35-L55 (21 LOC)
scripts/deploy.py
def api(method, path, body=None):
    """Call the Render API and return the decoded JSON response.

    Args:
        method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
        path: Path appended to ``RENDER_API``.
        body: Optional JSON-serializable payload. Anything other than ``None``
            is sent, including empty containers.

    Returns:
        The parsed JSON response, or ``None`` when the response body is empty
        or the request fails. Failures are printed rather than raised.
    """
    url = f"{RENDER_API}{path}"
    # `is not None` rather than truthiness so an empty dict/list is still sent.
    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(
        url, data=data, method=method,
        headers={
            "Authorization": f"Bearer {RENDER_API_KEY}",
            "Content-Type": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            # Separate name: the response bytes used to rebind the `body` parameter.
            payload = resp.read()
            return json.loads(payload) if payload else None
    except urllib.error.HTTPError as e:
        # errors="replace" keeps a non-UTF-8 error body from raising inside
        # this handler and escaping the function.
        detail = e.read().decode(errors="replace")[:200]
        print(f"  API error {e.code}: {detail}")
        return None
    except Exception as e:
        print(f"  API exception: {e}")
        return None
get_deploy_status function · python · L58-L63 (6 LOC)
scripts/deploy.py
def get_deploy_status(service_id):
    """Return the most recent deploy record for a service, or ``None``.

    Queries ``/services/{service_id}/deploys?limit=1`` through ``api`` and
    returns the ``"deploy"`` value of the first item in the response. ``None``
    is returned when ``api`` yields nothing (failure or empty list).
    """
    deploys = api("GET", f"/services/{service_id}/deploys?limit=1")
    if not deploys:
        return None
    latest = deploys[0]
    return latest["deploy"]
trigger_deploy function · python · L66-L93 (28 LOC)
scripts/deploy.py
def trigger_deploy(service_id, name, clear_cache=False):
    """Trigger a new deploy, or wait for existing one."""
    print(f"\n{'='*60}")
    print(f"  DEPLOYING: {name}")
    print(f"{'='*60}")

    # Check if there's already a deploy in progress
    current = get_deploy_status(service_id)
    if current and current["status"] in ("build_in_progress", "update_in_progress"):
        print(f"  Existing deploy in progress: {current['id']}")
        print(f"  Waiting for it to complete...")
        return current["id"]

    cache = "clear" if clear_cache else "do_not_clear"
    result = api("POST", f"/services/{service_id}/deploys", {"clearCache": cache})
    if not result:
        print(f"  FAILED to trigger deploy for {name}")
        return None

    deploy_id = result.get("id")
    if not deploy_id:
        print(f"  Unexpected response: {json.dumps(result)[:200]}")
        return None

    print(f"  Deploy ID: {deploy_id}")
    print(f"  Status: {result.get('status', 'unknown')}")
 
wait_for_deploy function · python · L96-L129 (34 LOC)
scripts/deploy.py
def wait_for_deploy(service_id, name, deploy_id):
    """Wait for deploy to complete with progress bar."""
    est = AVG_BUILD_TIME.get(name, 120)
    start = time.time()
    last_status = ""

    while True:
        elapsed = int(time.time() - start)
        dep = get_deploy_status(service_id)
        if not dep:
            time.sleep(POLL_INTERVAL)
            continue

        status = dep["status"]
        if status != last_status:
            print(f"  [{elapsed:3d}s] {status}")
            last_status = status

        if status == "live":
            print(f"  LIVE in {elapsed}s")
            # Update average build time
            AVG_BUILD_TIME[name] = int(elapsed * 0.7 + AVG_BUILD_TIME.get(name, elapsed) * 0.3)
            return True

        if "fail" in status:
            print(f"  FAILED after {elapsed}s")
            return False

        # Progress indicator
        pct = min(95, int(elapsed / est * 100))
        bar = "#" * (pct // 5) + "-" * (20 - pct // 5)
        
verify_endpoint function · python · L132-L149 (18 LOC)
scripts/deploy.py
def verify_endpoint(url, name, expected_key=None):
    """Check that ``url`` answers 200 with JSON, optionally containing a key.

    Progress and the outcome are printed. Any exception (bad URL, network
    error, HTTP error, invalid JSON) is reported as a failure, not raised.

    Args:
        url: Endpoint to request.
        name: Label for the endpoint; not used by the body.
        expected_key: If truthy, must be present in the decoded JSON body.

    Returns:
        ``True`` when the check passes, ``False`` otherwise.
    """
    print(f"  Checking {url}...")
    try:
        request = urllib.request.Request(url, headers={"User-Agent": "Sajaag-Deploy/1.0"})
        with urllib.request.urlopen(request, timeout=10) as response:
            if response.status != 200:
                print(f"  FAIL ({response.status})")
                return False

            payload = json.loads(response.read())
            if expected_key:
                if expected_key not in payload:
                    print(f"  WARN: missing key '{expected_key}'")
                    return False

            print("  OK (200)")
            return True
    except Exception as exc:
        print(f"  FAIL: {exc}")
        return False
git_push function · python · L152-L180 (29 LOC)
scripts/deploy.py
def git_push():
    """Commit any changes and push to origin."""
    print("\n── Git Push ──────────────────────────────────────")

    # Check for changes
    result = subprocess.run(["git", "status", "--porcelain"], capture_output=True, text=True)
    if result.stdout.strip():
        print(f"  Uncommitted changes detected:")
        for line in result.stdout.strip().split("\n")[:5]:
            print(f"    {line}")

        # Stage and commit
        subprocess.run(["git", "add", "-A"], check=True)
        subprocess.run([
            "git", "commit", "-m",
            f"deploy: auto-deploy {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\nCo-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>"
        ], check=True)
    else:
        print("  No changes to commit")

    # Push
    print("  Pushing to origin...")
    result = subprocess.run(["git", "push"], capture_output=True, text=True)
    if result.returncode == 0:
        print("  Pushed successfully")
        return
deploy_backend function · python · L183-L204 (22 LOC)
scripts/deploy.py
def deploy_backend():
    """Deploy the backend service and verify its endpoints.

    Triggers (or joins) a deploy, waits for it to finish, pauses 30 seconds,
    then runs every endpoint check. All checks run even if an earlier one
    fails, so each result is printed.

    Returns:
        ``True`` only if the deploy succeeded and every check passed.
    """
    service = "backend"
    deploy_id = trigger_deploy(BACKEND_SERVICE_ID, service)
    if not deploy_id or not wait_for_deploy(BACKEND_SERVICE_ID, service, deploy_id):
        return False

    # Give the scrapers time to initialize before probing the API.
    print("  Waiting 30s for scrapers to init...")
    time.sleep(30)

    print("\n── Backend Verification ───────────────────────────")
    probes = (
        ("/", "root", "app"),
        ("/api/health", "health", "db_connected"),
        ("/api/brief/magarpatta", "brief", "area_name"),
    )
    # Build the full list first so no check is skipped by short-circuiting.
    outcomes = [
        verify_endpoint(f"{BACKEND_URL}{route}", label, key)
        for route, label, key in probes
    ]
    return all(outcomes)
deploy_frontend function · python · L207-L231 (25 LOC)
scripts/deploy.py
def deploy_frontend():
    """Deploy the frontend site and verify that it serves HTML.

    Triggers (or joins) a deploy, waits for it to finish, then requests
    ``FRONTEND_URL``. The check passes when the response is 200 and the page
    contains ``"Sajaag"`` or ``"<div"``. Any exception is reported as a failure.

    Returns:
        ``True`` when the deploy succeeded and the page check passed.
    """
    service = "frontend"
    deploy_id = trigger_deploy(FRONTEND_SERVICE_ID, service)
    if not deploy_id:
        return False
    if not wait_for_deploy(FRONTEND_SERVICE_ID, service, deploy_id):
        return False

    print("\n── Frontend Verification ──────────────────────────")
    try:
        request = urllib.request.Request(FRONTEND_URL, headers={"User-Agent": "Sajaag-Deploy/1.0"})
        with urllib.request.urlopen(request, timeout=10) as response:
            served = False
            if response.status == 200:
                page = response.read().decode()
                served = any(marker in page for marker in ("Sajaag", "<div"))

            if served:
                print(f"  OK — HTML served at {FRONTEND_URL}")
                return True

            # Also reached for a 200 whose page has neither marker.
            print(f"  FAIL ({response.status})")
            return False
    except Exception as exc:
        print(f"  FAIL: {exc}")
        return False
All rows scored by the Repobility analyzer (https://repobility.com)
main function · python · L234-L277 (44 LOC)
scripts/deploy.py
def main():
    target = sys.argv[1] if len(sys.argv) > 1 else "all"
    skip_git = "--skip-git" in sys.argv

    print(f"\n{'='*60}")
    print(f"  SAJAAG DEPLOY PIPELINE")
    print(f"  Target: {target} | Git: {'skip' if skip_git else 'push'}")
    print(f"  Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'='*60}")

    # Step 1: Git push
    if not skip_git:
        if not git_push():
            print("\nGit push failed. Use --skip-git to deploy existing code.")
            sys.exit(1)

    results = {}

    # Step 2: Deploy backend
    if target in ("all", "backend"):
        results["backend"] = deploy_backend()

    # Step 3: Deploy frontend
    if target in ("all", "frontend"):
        results["frontend"] = deploy_frontend()

    # Summary
    print(f"\n{'='*60}")
    print(f"  DEPLOY SUMMARY")
    print(f"{'='*60}")
    for name, success in results.items():
        icon = "OK" if success else "FAIL"
        url = BACKEND_URL if name == "backend" else FRONTE
‹ prevpage 3 / 3