← back to dripaidata__bronasty-league

Function bodies 228 total

All specs Real LLM only Function bodies
__init__ method · python · L20-L22 (3 LOC)
backend/calculation_engine.py
    def __init__(self, db_file: str = DB_FILE):
        """Store the database path; no connection is opened here.

        Args:
            db_file: Path of the SQLite database file (defaults to DB_FILE).
        """
        # Connection handle starts out unset.
        self.conn = None
        self.db_file = db_file
connect_db method · python · L24-L28 (5 LOC)
backend/calculation_engine.py
    def connect_db(self):
        """Open the SQLite database at ``self.db_file`` and keep it on ``self.conn``.

        Rows come back as ``sqlite3.Row`` so columns can be read by name.
        If a connection is already held it is closed first, so calling this
        method twice does not leak the earlier handle.
        """
        previous = getattr(self, "conn", None)
        if previous is not None:
            previous.close()

        self.conn = sqlite3.connect(self.db_file)
        self.conn.row_factory = sqlite3.Row
        print("✓ Connected to database")
close_db method · python · L30-L33 (4 LOC)
backend/calculation_engine.py
    def close_db(self):
        """Close the database connection, if one is held.

        The handle is cleared afterwards so a repeated call is a no-op and
        later code cannot pick up a connection that is already closed.
        """
        if self.conn is not None:
            self.conn.close()
            self.conn = None
calculate_season_stats method · python · L35-L124 (90 LOC)
backend/calculation_engine.py
    def calculate_season_stats(self, season_year: int):
        """Aggregate game stats into season totals for all players"""
        print(f"\n=== CALCULATING SEASON STATS FOR {season_year} ===")
        
        cursor = self.conn.cursor()
        
        # Get all players who played in this season
        cursor.execute("""
            SELECT DISTINCT 
                pgs.player_id,
                p.position,
                pgs.team_id
            FROM player_game_stats pgs
            JOIN players p ON pgs.player_id = p.player_id
            JOIN games g ON pgs.game_id = g.game_id
            WHERE g.season_year = ? AND g.season_type = 'REG'
        """, (season_year,))
        
        players = cursor.fetchall()
        players_processed = 0
        
        for player in players:
            player_id = player['player_id']
            position = player['position']
            team_id = player['team_id']
            
            # Aggregate stats for this player
            cu
calculate_position_rankings method · python · L126-L173 (48 LOC)
backend/calculation_engine.py
    def calculate_position_rankings(self, season_year: int):
        """Calculate position rankings for total points and FPPG"""
        print(f"\n=== CALCULATING POSITION RANKINGS FOR {season_year} ===")
        
        cursor = self.conn.cursor()
        
        # Get all positions
        positions = ['QB', 'RB', 'WR', 'TE']
        
        for position in positions:
            # Rank by total points
            cursor.execute("""
                SELECT player_id, total_fantasy_points
                FROM player_season_stats
                WHERE season_year = ? AND position = ? AND games_played > 0
                ORDER BY total_fantasy_points DESC
            """, (season_year, position))
            
            players_total = cursor.fetchall()
            
            for rank, player in enumerate(players_total, 1):
                cursor.execute("""
                    UPDATE player_season_stats
                    SET position_rank_total_points = ?
                    WHE
calculate_weekly_position_rankings method · python · L175-L247 (73 LOC)
backend/calculation_engine.py
    def calculate_weekly_position_rankings(self, season_year: int):
        """Calculate position rankings for each week of the season"""
        print(f"\n=== CALCULATING WEEKLY POSITION RANKINGS FOR {season_year} ===")
        
        cursor = self.conn.cursor()
        
        # First, check if position_rank column exists, if not add it
        cursor.execute("PRAGMA table_info(player_game_stats)")
        columns = [col[1] for col in cursor.fetchall()]
        
        if 'position_rank' not in columns:
            print("Adding position_rank column to player_game_stats...")
            cursor.execute("""
                ALTER TABLE player_game_stats 
                ADD COLUMN position_rank INTEGER
            """)
            self.conn.commit()
            print("✓ Added position_rank column")
        
        # Get all weeks in this season
        cursor.execute("""
            SELECT DISTINCT week
            FROM games
            WHERE season_year = ? AND season_type = 'REG
calculate_defensive_rankings method · python · L249-L379 (131 LOC)
backend/calculation_engine.py
    def calculate_defensive_rankings(self, season_year: int, through_week: int = None):
        """Calculate defensive rankings by position"""
        print(f"\n=== CALCULATING DEFENSIVE RANKINGS FOR {season_year} ===")
        
        cursor = self.conn.cursor()
        positions = ['QB', 'RB', 'WR', 'TE']

        # Calculate for each team and position
        cursor.execute("SELECT team_id FROM teams")
        teams = cursor.fetchall()
        
        for team in teams:
            team_id = team['team_id']
            
            for position in positions:
                # Get all points scored against this team by this position
                week_clause = f"AND g.week <= {through_week}" if through_week else ""
                
                cursor.execute(f"""
                    SELECT 
                        COUNT(DISTINCT g.game_id) as games_played,
                        SUM(pgs.fantasy_points) as total_points_allowed
                    FROM player_game_stats pgs
  
Powered by Repobility — scan your code at https://repobility.com
calculate_projections method · python · L381-L413 (33 LOC)
backend/calculation_engine.py
    def calculate_projections(self, season_year: int, week: int):
        """Calculate projections for upcoming week"""
        print(f"\n=== CALCULATING PROJECTIONS FOR {season_year} WEEK {week} ===")
        
        cursor = self.conn.cursor()
        
        # Get all scheduled games for this week
        cursor.execute("""
            SELECT game_id, home_team_id, away_team_id
            FROM games
            WHERE season_year = ? AND week = ? AND season_type = 'REG'
        """, (season_year, week))
        
        games = cursor.fetchall()
        projections_created = 0
        
        for game in games:
            # Process home team players
            self._project_team_players(
                game['home_team_id'], game['away_team_id'],
                season_year, week, cursor
            )
            projections_created += 1
            
            # Process away team players
            self._project_team_players(
                game['away_team_id'], game['home_
_project_team_players method · python · L415-L488 (74 LOC)
backend/calculation_engine.py
    def _project_team_players(self, team_id: str, opponent_id: str,
                              season_year: int, week: int, cursor):
        """Project fantasy points for all players on a team"""
        
        # Get all active players on this team
        cursor.execute("""
            SELECT DISTINCT player_id, position
            FROM players
            WHERE team_id = ? AND position IN ('QB', 'RB', 'WR', 'TE')
        """, (team_id,))
        
        players = cursor.fetchall()
        
        for player in players:
            player_id = player['player_id']
            position = player['position']
            
            # Calculate last 3 games FPPG
            if week <= 3:
                # Use previous season's FPPG
                cursor.execute("""
                    SELECT fantasy_points_per_game
                    FROM player_season_stats
                    WHERE player_id = ? AND season_year = ?
                """, (player_id, season_year - 1))
           
run_all_calculations method · python · L490-L527 (38 LOC)
backend/calculation_engine.py
    def run_all_calculations(self):
        """Run all calculations for all seasons"""
        print("\n" + "="*60)
        print("DYNASTY FANTASY FOOTBALL - CALCULATIONS")
        print("="*60)
        
        self.connect_db()
        
        seasons = [2021, 2022, 2023, 2024, 2025]
        
        for season_year in seasons:
            # Calculate season stats
            self.calculate_season_stats(season_year)
            
            # Calculate weekly position rankings
            self.calculate_weekly_position_rankings(season_year)
            
            # Calculate season-level position rankings
            self.calculate_position_rankings(season_year)
            
            # Calculate defensive rankings (full season)
            self.calculate_defensive_rankings(season_year)
            
            # Calculate week-by-week defensive rankings for current season
            if season_year == 2025:
                for week in range(1, 18):
                    self.calc
api_call function · python · L27-L47 (21 LOC)
backend/pull_2025_schedule_only.py
def api_call(endpoint: str, max_retries: int = 3) -> Optional[Dict]:
    """GET ``BASE_URL/endpoint`` and return the decoded JSON body.

    Every request is preceded by an ``API_CALL_DELAY`` pause. An HTTP 429
    answer triggers a 60 second wait and another attempt, but only up to
    ``max_retries`` times (the previous version recursed without a limit).

    Args:
        endpoint: Path appended to ``BASE_URL``.
        max_retries: How many times a rate-limited request is retried.

    Returns:
        The parsed JSON on HTTP 200, otherwise ``None`` (error status,
        network failure, undecodable body, or retries exhausted).
    """
    url = f"{BASE_URL}/{endpoint}"
    # Let requests build the query string instead of splicing the key in by hand.
    query = {"api_key": SPORTRADAR_API_KEY}

    for attempt in range(max(max_retries, 0) + 1):
        try:
            time.sleep(API_CALL_DELAY)
            response = requests.get(url, params=query, timeout=30)
            if response.status_code == 200:
                return response.json()
        except (requests.RequestException, ValueError) as e:
            # Network problems and invalid JSON are reported, not raised.
            print(f"Exception: {e}")
            return None

        if response.status_code != 429:
            print(f"API Error {response.status_code}: {endpoint}")
            return None

        if attempt < max_retries:
            print("Rate limit hit, waiting 60 seconds...")
            time.sleep(60)

    print(f"API Error 429: {endpoint} (gave up after {max_retries} retries)")
    return None
pull_schedule function · python · L50-L157 (108 LOC)
backend/pull_2025_schedule_only.py
def pull_schedule():
    """Pull and insert schedule for weeks 4-18"""
    print("\n" + "="*60)
    print("PULL 2025 SCHEDULE - WEEKS 4-18")
    print("="*60)
    print(f"\nThis will add weeks {WEEKS_TO_ADD[0]}-{WEEKS_TO_ADD[-1]} as 'scheduled' games")
    print("No stats will be pulled - just the schedule for testing future matchups")
    print("="*60)

    # Connect to database
    conn = sqlite3.connect(DB_FILE)
    cursor = conn.cursor()

    # Fetch the schedule
    print(f"\nFetching {SEASON_YEAR} {SEASON_TYPE} schedule...")
    endpoint = f"games/{SEASON_YEAR}/{SEASON_TYPE}/schedule.json"
    data = api_call(endpoint)

    if not data:
        print("Failed to get schedule")
        conn.close()
        return

    games_added = 0
    games_skipped = 0

    for week in data.get('weeks', []):
        week_num = week.get('sequence', 0)

        # Only process specified weeks
        if week_num not in WEEKS_TO_ADD:
            continue

        print(f"\nProcessing Week {week_num}
api_call function · python · L23-L47 (25 LOC)
backend/pull_full_rosters.py
def api_call(endpoint: str, max_retries: int = 3) -> Optional[Dict]:
    """GET ``BASE_URL/endpoint`` with the API key header and return the JSON body.

    Every request is preceded by an ``API_CALL_DELAY`` pause. An HTTP 429
    answer triggers a 60 second wait and another attempt, but only up to
    ``max_retries`` times (the previous version recursed without a limit).

    Args:
        endpoint: Path appended to ``BASE_URL``.
        max_retries: How many times a rate-limited request is retried.

    Returns:
        The parsed JSON on HTTP 200, otherwise ``None`` (error status,
        network failure, undecodable body, or retries exhausted).
    """
    url = f"{BASE_URL}/{endpoint}"
    headers = {
        'accept': 'application/json',
        'x-api-key': SPORTRADAR_API_KEY,
    }

    for attempt in range(max(max_retries, 0) + 1):
        try:
            time.sleep(API_CALL_DELAY)
            response = requests.get(url, headers=headers, timeout=30)
            if response.status_code == 200:
                return response.json()
        except (requests.RequestException, ValueError) as e:
            # Network problems and invalid JSON are reported, not raised.
            print(f"  Exception calling API: {e}")
            return None

        if response.status_code != 429:
            print(f"  API Error {response.status_code}: {endpoint}")
            return None

        if attempt < max_retries:
            print("  Rate limit hit, waiting 60 seconds...")
            time.sleep(60)

    print(f"  API Error 429: {endpoint} (gave up after {max_retries} retries)")
    return None
get_all_team_ids function · python · L50-L54 (5 LOC)
backend/pull_full_rosters.py
def get_all_team_ids(conn) -> list:
    """Return every ``(team_id, team_alias)`` row from ``teams``, sorted by alias.

    Args:
        conn: An open database connection.
    """
    query = "SELECT team_id, team_alias FROM teams ORDER BY team_alias"
    team_cursor = conn.cursor()
    team_cursor.execute(query)
    rows = team_cursor.fetchall()
    return rows
pull_team_roster function · python · L57-L125 (69 LOC)
backend/pull_full_rosters.py
def pull_team_roster(team_id: str, team_alias: str, conn) -> int:
    """Pull full roster for a single team and update database"""
    print(f"  Pulling roster for {team_alias}...")

    data = api_call(f"teams/{team_id}/full_roster.json")
    if not data:
        print(f"    Failed to get roster for {team_alias}")
        return 0

    cursor = conn.cursor()
    players_updated = 0

    players = data.get('players', [])

    for player in players:
        player_id = player.get('id')
        if not player_id:
            continue

        # Only include fantasy-relevant positions
        position = player.get('position', '')
        if position not in ['QB', 'RB', 'WR', 'TE', 'K']:
            continue

        # Build full name
        first_name = player.get('first_name', '')
        last_name = player.get('last_name', '')
        full_name = player.get('name', f"{first_name} {last_name}")

        # Get other fields
        jersey = player.get('jersey')
        status = player.get(
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
main function · python · L128-L190 (63 LOC)
backend/pull_full_rosters.py
def main():
    print("=" * 60)
    print("PULL FULL NFL ROSTERS")
    print("=" * 60)
    print()
    print("This will pull complete rosters for all 32 NFL teams")
    print("and update the players table with any missing players.")
    print("=" * 60)
    print()

    response = input("Pull full rosters for all teams? (yes/no): ")
    if response.lower() != 'yes':
        print("Cancelled.")
        return

    # Connect to database
    conn = sqlite3.connect(DB_FILE)
    conn.row_factory = sqlite3.Row

    # Get player count before
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM players")
    players_before = cursor.fetchone()[0]
    print(f"\nPlayers in database before: {players_before}")

    # Get all teams
    teams = get_all_team_ids(conn)
    print(f"Teams to process: {len(teams)}")
    print()

    total_players = 0

    for team_id, team_alias in teams:
        count = pull_team_roster(team_id, team_alias, conn)
        total_players += count
        print
__init__ method · python · L40-L44 (5 LOC)
backend/rapidapi_data_puller.py
    def __init__(self, db_file: str = DB_FILE):
        """Set up puller state; no database or network access happens here.

        Args:
            db_file: Path of the SQLite database file (defaults to DB_FILE).
        """
        # Name -> position cache, filled from the backup database.
        self.position_lookup = {}
        # Counter of API requests made.
        self.api_calls_made = 0
        self.conn = None
        self.db_file = db_file
load_position_lookup method · python · L46-L64 (19 LOC)
backend/rapidapi_data_puller.py
    def load_position_lookup(self):
        """Fill ``self.position_lookup`` from the Sportradar backup database.

        Reads ``full_name`` and ``position`` from the ``players`` table of
        ``dynasty_football_sportradar_backup.db`` (looked up next to this
        module) and stores ``lower-cased, stripped name -> position``.

        A missing backup file or a database error only produces a warning.
        Rows whose name is not a string (e.g. NULL) are skipped instead of
        aborting the whole load, and the backup connection is always closed.
        """
        backup_path = os.path.join(os.path.dirname(__file__), 'dynasty_football_sportradar_backup.db')
        if not os.path.exists(backup_path):
            print("[WARN] No backup database found for position lookup")
            return

        try:
            backup_conn = sqlite3.connect(backup_path)
            try:
                rows = backup_conn.execute(
                    'SELECT full_name, position FROM players WHERE position IS NOT NULL'
                ).fetchall()
            finally:
                # Release the handle even when the query fails.
                backup_conn.close()
        except sqlite3.Error as e:
            print(f"[WARN] Could not load position lookup: {e}")
            return

        for full_name, position in rows:
            if not isinstance(full_name, str):
                continue
            # Normalize name for lookup
            self.position_lookup[full_name.lower().strip()] = position

        print(f"[OK] Loaded {len(self.position_lookup)} player positions from backup")
connect_db method · python · L66-L70 (5 LOC)
backend/rapidapi_data_puller.py
    def connect_db(self):
        """Open the SQLite database at ``self.db_file`` and keep it on ``self.conn``.

        Rows come back as ``sqlite3.Row`` so columns can be read by name.
        If a connection is already held it is closed first, so calling this
        method twice does not leak the earlier handle.
        """
        previous = getattr(self, "conn", None)
        if previous is not None:
            previous.close()

        self.conn = sqlite3.connect(self.db_file)
        self.conn.row_factory = sqlite3.Row
        print(f"✓ Connected to database: {self.db_file}")
close_db method · python · L72-L76 (5 LOC)
backend/rapidapi_data_puller.py
    def close_db(self):
        """Close the database connection, if one is held.

        The handle is cleared afterwards so a repeated call is a silent
        no-op and later code cannot pick up an already-closed connection.
        """
        if self.conn is not None:
            self.conn.close()
            self.conn = None
            print("✓ Closed database connection")
init_database method · python · L78-L98 (21 LOC)
backend/rapidapi_data_puller.py
    def init_database(self):
        """Create the schema from ``dynasty_schema.sql`` unless it already exists.

        The presence of a ``teams`` table is used as the marker that the
        schema has been created; in that case nothing is executed.
        """
        print("\n=== INITIALIZING DATABASE ===")

        cursor = self.conn.cursor()

        # Probe sqlite_master for the 'teams' table.
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='teams'")
        schema_missing = not cursor.fetchone()

        if schema_missing:
            # The schema file is looked up next to this module.
            module_dir = os.path.dirname(__file__)
            schema_path = os.path.join(module_dir, 'dynasty_schema.sql')
            with open(schema_path, 'r') as schema_file:
                schema_sql = schema_file.read()

            cursor.executescript(schema_sql)
            self.conn.commit()

            print("[OK] Database schema created")
        else:
            print("[OK] Database tables already exist, skipping schema creation")
api_call method · python · L100-L124 (25 LOC)
backend/rapidapi_data_puller.py
    def api_call(self, endpoint: str, params: Optional[Dict] = None,
                 max_retries: int = 3) -> Optional[Dict]:
        """GET ``BASE_URL/endpoint`` and return the decoded JSON body.

        Every request is preceded by an ``API_CALL_DELAY`` pause and counted
        in ``self.api_calls_made``. An HTTP 429 answer triggers a 60 second
        wait and another attempt, but only up to ``max_retries`` times (the
        previous version recursed without a limit).

        Args:
            endpoint: Path appended to ``BASE_URL``.
            params: Optional query parameters passed to ``requests.get``.
            max_retries: How many times a rate-limited request is retried.

        Returns:
            The parsed JSON on HTTP 200, otherwise ``None`` (error status,
            network failure, undecodable body, or retries exhausted).
        """
        url = f"{BASE_URL}/{endpoint}"

        for attempt in range(max(max_retries, 0) + 1):
            try:
                # Rate limiting
                time.sleep(API_CALL_DELAY)

                response = requests.get(url, headers=HEADERS, params=params, timeout=30)
                self.api_calls_made += 1

                if response.status_code == 200:
                    return response.json()
            except (requests.RequestException, ValueError) as e:
                # Network problems and invalid JSON are reported, not raised.
                print(f"✗ Exception calling API: {e}")
                return None

            if response.status_code != 429:
                print(f"✗ API Error {response.status_code}: {endpoint}")
                print(f"  Response: {response.text[:200]}")
                return None

            if attempt < max_retries:
                print("⚠ Rate limit hit, waiting 60 seconds...")
                time.sleep(60)

        print(f"✗ API Error 429: {endpoint} (gave up after {max_retries} retries)")
        return None
pull_teams method · python · L130-L182 (53 LOC)
backend/rapidapi_data_puller.py
    def pull_teams(self):
        """Pull all NFL teams from /nfl-team-listing/v1/data"""
        print("\n=== PULLING TEAMS ===")

        data = self.api_call("nfl-team-listing/v1/data")
        if not data:
            print("✗ Failed to get teams")
            return []

        cursor = self.conn.cursor()
        teams_added = 0
        team_ids = []

        # Response is an array of {'team': {...}} objects
        teams_list = data if isinstance(data, list) else data.get('teams', [])

        for item in teams_list:
            try:
                # Unwrap the 'team' key if present
                team = item.get('team', item) if isinstance(item, dict) else item

                team_id = team.get('id')
                if not team_id:
                    continue

                team_ids.append(team_id)

                # Extract logo URL
                logos = team.get('logos', [])
                logo_url = logos[0].get('href') if logos else None

                cursor.exe
Open data scored by Repobility · https://repobility.com
pull_team_roster method · python · L188-L270 (83 LOC)
backend/rapidapi_data_puller.py
    def pull_team_roster(self, team_id: str):
        """Pull roster for a specific team from /nfl-team-roster"""
        data = self.api_call("nfl-team-roster", params={"id": team_id})
        if not data:
            return 0

        cursor = self.conn.cursor()
        players_added = 0

        # Athletes is a direct array of player objects
        athletes = data.get('athletes', [])

        for player in athletes:
            if isinstance(player, dict):
                try:
                    player_id = player.get('id')
                    if not player_id:
                        continue

                    # Extract position - try lookup from backup first
                    full_name = player.get('fullName', player.get('displayName', ''))
                    position = self.position_lookup.get(full_name.lower().strip(), '')

                    # If not found in lookup, use generic position or empty
                    if not position:
                        position = p
pull_all_rosters method · python · L272-L286 (15 LOC)
backend/rapidapi_data_puller.py
    def pull_all_rosters(self, team_ids: List[str]):
        """Pull the roster of every team in ``team_ids`` and print a summary.

        Args:
            team_ids: Team identifiers handed one by one to ``pull_team_roster``.
        """
        print("\n=== PULLING ROSTERS ===")

        # Load the position lookup before any roster is pulled.
        self.load_position_lookup()

        added = 0
        for number, team_id in enumerate(team_ids, start=1):
            # '\r' keeps the progress indicator on a single console line.
            print(f"  Team {number}/{len(team_ids)}", end='\r')
            added += self.pull_team_roster(team_id)

        print(f"\n✓ Added {added} players from {len(team_ids)} teams")
pull_week_schedule method · python · L292-L306 (15 LOC)
backend/rapidapi_data_puller.py
    def pull_week_schedule(self, year: int, week: int) -> List[str]:
        """Return the game IDs listed by ``nfl-weeks-events`` for one week.

        Args:
            year: Season year sent as the ``year`` parameter.
            week: Week number sent as the ``week`` parameter.

        Returns:
            The truthy ``eventid`` values of the response's ``items``; an
            empty list when the API call yields nothing.
        """
        query = {
            "year": year,
            "week": week,
            "type": 2  # Regular season
        }
        data = self.api_call("nfl-weeks-events", params=query)
        if not data:
            return []

        game_ids = []
        for entry in data.get('items', []):
            event_id = entry.get('eventid')
            if event_id:
                game_ids.append(event_id)
        return game_ids
pull_season_schedule method · python · L308-L320 (13 LOC)
backend/rapidapi_data_puller.py
    def pull_season_schedule(self, year: int, weeks: range = range(1, 19)):
        """Collect the game IDs of every requested week of a season.

        Args:
            year: Season year.
            weeks: Week numbers to query (defaults to 1 through 18).

        Returns:
            A list of ``(game_id, week, year)`` tuples in query order.
        """
        print(f"\n=== PULLING {year} SEASON SCHEDULE ===")

        season_games = []
        for week in weeks:
            week_ids = self.pull_week_schedule(year, week)
            for game_id in week_ids:
                season_games.append((game_id, week, year))
            print(f"  Week {week}: {len(week_ids)} games")

        print(f"✓ Found {len(season_games)} total games for {year}")
        return season_games
pull_boxscore method · python · L326-L527 (202 LOC)
backend/rapidapi_data_puller.py
    def pull_boxscore(self, game_id: str, week: int, season_year: int):
        """Pull boxscore for a specific game from /nfl-boxscore"""
        data = self.api_call("nfl-boxscore", params={"id": game_id})

        if not data:
            return

        cursor = self.conn.cursor()

        # Extract game info
        boxscore = data.get('boxscore', {})

        # Get teams info for the games table
        teams_data = boxscore.get('teams', [])
        home_team = None
        away_team = None

        for team_data in teams_data:
            if team_data.get('homeAway') == 'home':
                home_team = team_data.get('team', {})
            elif team_data.get('homeAway') == 'away':
                away_team = team_data.get('team', {})

        # Insert/update game record
        if home_team and away_team:
            # Get game header info
            header = data.get('header', {})
            competitions = header.get('competitions', [{}])
            competition = competit
_parse_stat method · python · L529-L536 (8 LOC)
backend/rapidapi_data_puller.py
    def _parse_stat(self, value) -> int:
        """Parse a stat value to integer, handling strings and None"""
        if value is None or value == '--' or value == '-':
            return 0
        try:
            return int(float(str(value).replace(',', '')))
        except (ValueError, TypeError):
            return 0
calculate_fantasy_points method · python · L538-L564 (27 LOC)
backend/rapidapi_data_puller.py
    def calculate_fantasy_points(self, pass_yards, pass_td, pass_int,
                                 rush_yards, rush_td,
                                 receptions, rec_yards, rec_td,
                                 two_pt_conversions=0, fumbles_lost=0, return_td=0):
        """Calculate fantasy points based on league scoring rules (Half-PPR)"""
        points = 0.0

        # Passing
        points += (pass_yards / 15.0)  # 1 point per 15 yards
        points += (pass_td * 4)  # 4 points per TD
        points += (pass_int * -2)  # -2 per INT

        # Rushing
        points += (rush_yards / 10.0)  # 1 point per 10 yards
        points += (rush_td * 6)  # 6 points per TD

        # Receiving
        points += (receptions * 0.5)  # 0.5 PPR
        points += (rec_yards / 10.0)  # 1 point per 10 yards
        points += (rec_td * 6)  # 6 points per TD

        # Other
        points += (two_pt_conversions * 2)  # 2 points per 2pt conversion
        points += (fumbles_lost * -2)  # -2
pull_game_scores method · python · L570-L638 (69 LOC)
backend/rapidapi_data_puller.py
    def pull_game_scores(self, season_year: int, weeks: range = None):
        """Pull game scores, dates, and status from the scoreboard endpoint.
        The boxscore endpoint doesn't include final scores, but scoreboard does."""
        if weeks is None:
            weeks = range(1, 19)

        print(f"\n=== PULLING GAME SCORES FOR {season_year} ===")
        cursor = self.conn.cursor()
        games_updated = 0

        for week in weeks:
            data = self.api_call(f"nfl-scoreboard?year={season_year}&seasonType=2&week={week}")
            if not data:
                continue

            events = data.get('events', [])
            week_updated = 0

            for event in events:
                game_id = event.get('id')
                if not game_id:
                    continue

                # Extract game date and status
                game_date = event.get('date')  # ISO format e.g. "2025-01-04T21:30Z"
                status_info = event.get('status', {}).get('typ
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
pull_all_data method · python · L644-L688 (45 LOC)
backend/rapidapi_data_puller.py
    def pull_all_data(self, seasons: List[int] = None, weeks: range = None):
        """Main method to pull all data"""
        if seasons is None:
            seasons = [2025]
        if weeks is None:
            weeks = range(1, 19)

        print("\n" + "="*60)
        print("DYNASTY FANTASY FOOTBALL - RAPIDAPI DATA PULL")
        print("="*60)

        self.connect_db()
        self.init_database()

        # Step 1: Pull teams
        team_ids = self.pull_teams()

        # Step 2: Pull rosters for all teams
        self.pull_all_rosters(team_ids)

        # Step 3: Pull schedules and boxscores for each season
        for season_year in seasons:
            # Get schedule (game IDs)
            game_data = self.pull_season_schedule(season_year, weeks)

            # Pull boxscores for each game
            print(f"\n=== PULLING BOXSCORES FOR {season_year} ===")
            total_games = len(game_data)

            for idx, (game_id, week, year) in enumerate(game_data, 1):
       
main function · python · L691-L734 (44 LOC)
backend/rapidapi_data_puller.py
def main():
    """Main entry point"""
    import argparse

    parser = argparse.ArgumentParser(description='Pull NFL data from RapidAPI')
    parser.add_argument('--seasons', nargs='+', type=int, default=[2025],
                        help='Seasons to pull (default: 2025)')
    parser.add_argument('--weeks', nargs=2, type=int, default=[1, 18],
                        help='Week range to pull (default: 1 18)')
    parser.add_argument('--teams-only', action='store_true',
                        help='Only pull teams data')
    parser.add_argument('--rosters-only', action='store_true',
                        help='Only pull rosters (requires teams)')
    parser.add_argument('--scores-only', action='store_true',
                        help='Only pull game scores from scoreboard (fast, 18 API calls)')
    parser.add_argument('--test', action='store_true',
                        help='Use test database (nfl_data_test.db)')

    args = parser.parse_args()

    db_file = 'nfl_data_test.d
execute_query function · python · L14-L22 (9 LOC)
backend/sample_queries.py
def execute_query(query, params=()):
    """Run ``query`` against ``DB_FILE`` and return all result rows.

    A fresh connection is opened for each call and is closed even when the
    query raises, so a failing statement no longer leaks the handle.

    Args:
        query: SQL text, optionally with placeholders.
        params: Values bound to the placeholders.

    Returns:
        A list of ``sqlite3.Row`` objects.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute(query, params)
        return cursor.fetchall()
    finally:
        conn.close()
dict_from_row function · python · L25-L27 (3 LOC)
backend/sample_queries.py
def dict_from_row(row):
    """Build a plain dict mapping each column name of ``row`` to its value."""
    as_dict = {}
    for column in row.keys():
        # Look the value up by name, exactly as a caller of the Row would.
        as_dict[column] = row[column]
    return as_dict
run function · javascript · L30-L37 (8 LOC)
backend/scripts/migrate-games-live-scoring.js
// Promise wrapper around db.run: resolves with no value once the statement
// has finished, rejects with the error reported to the callback.
function run(sql) {
  return new Promise((resolve, reject) => {
    const onFinished = (err) => {
      if (err) {
        reject(err);
        return;
      }
      resolve();
    };
    db.run(sql, onFinished);
  });
}
get function · javascript · L39-L46 (8 LOC)
backend/scripts/migrate-games-live-scoring.js
// Promise wrapper around db.get: resolves with the row handed to the
// callback, rejects with the error reported to it.
function get(sql) {
  return new Promise((resolve, reject) => {
    db.get(sql, (err, row) => (err ? reject(err) : resolve(row)));
  });
}
migrate function · javascript · L48-L81 (34 LOC)
backend/scripts/migrate-games-live-scoring.js
async function migrate() {
  // Check if columns already exist
  const tableInfo = await new Promise((resolve, reject) => {
    db.all('PRAGMA table_info(games)', (err, rows) => {
      if (err) reject(err);
      else resolve(rows);
    });
  });

  const columns = tableInfo.map(r => r.name);

  if (!columns.includes('is_finalized')) {
    await run('ALTER TABLE games ADD COLUMN is_finalized INTEGER DEFAULT 0');
    console.log('  Added is_finalized column');
  } else {
    console.log('  is_finalized column already exists');
  }

  if (!columns.includes('last_polled_at')) {
    await run('ALTER TABLE games ADD COLUMN last_polled_at TEXT');
    console.log('  Added last_polled_at column');
  } else {
    console.log('  last_polled_at column already exists');
  }

  // Backfill: mark closed games as finalized
  await run("UPDATE games SET is_finalized = 1 WHERE status = 'closed' AND is_finalized = 0");
  const count = await get("SELECT COUNT(*) as cnt FROM games WHERE is_finalized = 1"
migrateDb function · javascript · L42-L60 (19 LOC)
backend/scripts/migrate-league-matchups.js
// Apply MATCHUPS_SQL to the database at dbPath.
// Resolves once the script has run and the handle has been closed; rejects
// with the open error or the exec error. The script is only executed after
// the database opened successfully (previously exec was still issued against
// a handle whose open had failed), and the promise now settles only after
// close() has completed.
function migrateDb(dbPath) {
  const label = path.basename(dbPath);

  return new Promise((resolve, reject) => {
    const db = new sqlite3.Database(dbPath, sqlite3.OPEN_READWRITE, (openErr) => {
      if (openErr) {
        console.error(`  Failed ${label}: ${openErr.message}`);
        reject(openErr);
        return;
      }

      db.exec(MATCHUPS_SQL, (execErr) => {
        if (execErr) {
          console.error(`  Failed ${label}: ${execErr.message}`);
        } else {
          console.log(`  Migrated ${label}`);
        }

        // A close failure is ignored, as before; only the exec outcome
        // decides how the promise settles.
        db.close(() => {
          if (execErr) reject(execErr);
          else resolve();
        });
      });
    });
  });
}
Powered by Repobility — scan your code at https://repobility.com
main function · javascript · L62-L75 (14 LOC)
backend/scripts/migrate-league-matchups.js
// Migrate every league_<number>.db file found in leaguesDir, one at a time.
async function main() {
  const leagueDbPattern = /^league_\d+\.db$/;
  const files = fs
    .readdirSync(leaguesDir)
    .filter((name) => leagueDbPattern.test(name));

  if (!files.length) {
    console.log('No league DB files found.');
    return;
  }

  // Sequential on purpose: each migration is awaited before the next starts.
  for (let i = 0; i < files.length; i += 1) {
    const dbPath = path.join(leaguesDir, files[i]);
    await migrateDb(dbPath);
  }

  console.log(`\nMigrated ${files.length} league DB(s).`);
}
run function · javascript · L30-L37 (8 LOC)
backend/scripts/seed-test-leagues.js
// Promise wrapper around db.run with bound parameters.
// Resolves with { lastID, changes } read from the callback's `this`.
function run(sql, params = []) {
  return new Promise((resolve, reject) => {
    // Must stay a regular function: the result fields are read from `this`.
    db.run(sql, params, function onRun(err) {
      if (err) {
        reject(err);
        return;
      }
      const { lastID, changes } = this;
      resolve({ lastID, changes });
    });
  });
}
get function · javascript · L39-L46 (8 LOC)
backend/scripts/seed-test-leagues.js
// Promise wrapper around db.get with bound parameters; resolves with the
// row handed to the callback.
function get(sql, params = []) {
  return new Promise((resolve, reject) => {
    const onRow = (err, row) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(row);
    };
    db.get(sql, params, onRow);
  });
}
all function · javascript · L48-L55 (8 LOC)
backend/scripts/seed-test-leagues.js
// Promise wrapper around db.all with bound parameters; resolves with the
// rows handed to the callback.
function all(sql, params = []) {
  return new Promise((resolve, reject) => {
    db.all(sql, params, (err, rows) => (err ? reject(err) : resolve(rows)));
  });
}
seed function · javascript · L57-L208 (152 LOC)
backend/scripts/seed-test-leagues.js
async function seed() {
  console.log('=== Seed Test Leagues ===\n');

  // ── Step 1: Disband League 5 ──
  console.log('Step 1: Disband league 5...');
  const league5 = await get('SELECT id FROM leagues WHERE id = 5');
  if (league5) {
    const memberDel = await run('DELETE FROM league_members WHERE league_id = 5');
    console.log(`  Deleted ${memberDel.changes} league_members rows`);
    const leagueDel = await run('DELETE FROM leagues WHERE id = 5');
    console.log(`  Deleted ${leagueDel.changes} league row`);
  } else {
    console.log('  League 5 not found, already disbanded.');
  }

  const league5DbPath = path.join(leaguesTestDir, 'league_5.db');
  if (fs.existsSync(league5DbPath)) {
    fs.unlinkSync(league5DbPath);
    console.log('  Deleted league_5.db');
  } else {
    console.log('  league_5.db not found, already deleted.');
  }

  // ── Step 2: Create new users ──
  console.log('\nStep 2: Create new users...');

  const newUsers = [
    // League 6 (Snake) — teams 4-10
openDb function · javascript · L60-L67 (8 LOC)
backend/scripts/split-database.js
// Open the SQLite database at dbPath in the given mode.
// Resolves with the Database handle once the open callback reports success.
function openDb(dbPath, mode) {
  return new Promise((resolve, reject) => {
    const handle = new sqlite3.Database(dbPath, mode, (err) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(handle);
    });
  });
}
run function · javascript · L69-L76 (8 LOC)
backend/scripts/split-database.js
// Promise wrapper around db.run for the given handle.
// Resolves with { lastID, changes } read from the callback's `this`.
function run(db, sql, params = []) {
  return new Promise((resolve, reject) => {
    // Must stay a regular function: the result fields are read from `this`.
    db.run(sql, params, function onRun(err) {
      if (err) {
        reject(err);
        return;
      }
      const { lastID, changes } = this;
      resolve({ lastID, changes });
    });
  });
}
all function · javascript · L78-L85 (8 LOC)
backend/scripts/split-database.js
// Promise wrapper around db.all for the given handle; resolves with the
// rows handed to the callback.
function all(db, sql, params = []) {
  return new Promise((resolve, reject) => {
    const onRows = (err, rows) => {
      if (err) {
        reject(err);
        return;
      }
      resolve(rows);
    };
    db.all(sql, params, onRows);
  });
}
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
get function · javascript · L87-L94 (8 LOC)
backend/scripts/split-database.js
// Promise wrapper around db.get for the given handle; resolves with the
// row handed to the callback.
function get(db, sql, params = []) {
  return new Promise((resolve, reject) => {
    db.get(sql, params, (err, row) => (err ? reject(err) : resolve(row)));
  });
}
closeDb function · javascript · L96-L100 (5 LOC)
backend/scripts/split-database.js
// Close the given database handle.
// Always resolves (never rejects) so cleanup code can simply await it; a
// close error is no longer dropped silently but reported as a warning.
function closeDb(db) {
  return new Promise((resolve) => {
    db.close((err) => {
      if (err) {
        console.warn(`Failed to close database: ${err.message}`);
      }
      resolve();
    });
  });
}
getCreateStatement function · javascript · L105-L112 (8 LOC)
backend/scripts/split-database.js
// Look up the CREATE TABLE statement recorded in sqlite_master for
// tableName in sourceDb. Resolves to null when no such row is found.
async function getCreateStatement(sourceDb, tableName) {
  const query = "SELECT sql FROM sqlite_master WHERE type='table' AND name=?";
  const row = await get(sourceDb, query, [tableName]);
  if (!row) {
    return null;
  }
  return row.sql;
}
page 1 / 5next ›