Function bodies 228 total
__init__ method · python · L20-L22 (3 LOC)backend/calculation_engine.py
def __init__(self, db_file: str = DB_FILE):
self.db_file = db_file
self.conn = Noneconnect_db method · python · L24-L28 (5 LOC)backend/calculation_engine.py
def connect_db(self):
"""Connect to database"""
self.conn = sqlite3.connect(self.db_file)
self.conn.row_factory = sqlite3.Row
print(f"✓ Connected to database")close_db method · python · L30-L33 (4 LOC)backend/calculation_engine.py
def close_db(self):
"""Close database connection"""
if self.conn:
self.conn.close()calculate_season_stats method · python · L35-L124 (90 LOC)backend/calculation_engine.py
def calculate_season_stats(self, season_year: int):
"""Aggregate game stats into season totals for all players"""
print(f"\n=== CALCULATING SEASON STATS FOR {season_year} ===")
cursor = self.conn.cursor()
# Get all players who played in this season
cursor.execute("""
SELECT DISTINCT
pgs.player_id,
p.position,
pgs.team_id
FROM player_game_stats pgs
JOIN players p ON pgs.player_id = p.player_id
JOIN games g ON pgs.game_id = g.game_id
WHERE g.season_year = ? AND g.season_type = 'REG'
""", (season_year,))
players = cursor.fetchall()
players_processed = 0
for player in players:
player_id = player['player_id']
position = player['position']
team_id = player['team_id']
# Aggregate stats for this player
cucalculate_position_rankings method · python · L126-L173 (48 LOC)backend/calculation_engine.py
def calculate_position_rankings(self, season_year: int):
"""Calculate position rankings for total points and FPPG"""
print(f"\n=== CALCULATING POSITION RANKINGS FOR {season_year} ===")
cursor = self.conn.cursor()
# Get all positions
positions = ['QB', 'RB', 'WR', 'TE']
for position in positions:
# Rank by total points
cursor.execute("""
SELECT player_id, total_fantasy_points
FROM player_season_stats
WHERE season_year = ? AND position = ? AND games_played > 0
ORDER BY total_fantasy_points DESC
""", (season_year, position))
players_total = cursor.fetchall()
for rank, player in enumerate(players_total, 1):
cursor.execute("""
UPDATE player_season_stats
SET position_rank_total_points = ?
WHEcalculate_weekly_position_rankings method · python · L175-L247 (73 LOC)backend/calculation_engine.py
def calculate_weekly_position_rankings(self, season_year: int):
"""Calculate position rankings for each week of the season"""
print(f"\n=== CALCULATING WEEKLY POSITION RANKINGS FOR {season_year} ===")
cursor = self.conn.cursor()
# First, check if position_rank column exists, if not add it
cursor.execute("PRAGMA table_info(player_game_stats)")
columns = [col[1] for col in cursor.fetchall()]
if 'position_rank' not in columns:
print("Adding position_rank column to player_game_stats...")
cursor.execute("""
ALTER TABLE player_game_stats
ADD COLUMN position_rank INTEGER
""")
self.conn.commit()
print("✓ Added position_rank column")
# Get all weeks in this season
cursor.execute("""
SELECT DISTINCT week
FROM games
WHERE season_year = ? AND season_type = 'REGcalculate_defensive_rankings method · python · L249-L379 (131 LOC)backend/calculation_engine.py
def calculate_defensive_rankings(self, season_year: int, through_week: int = None):
"""Calculate defensive rankings by position"""
print(f"\n=== CALCULATING DEFENSIVE RANKINGS FOR {season_year} ===")
cursor = self.conn.cursor()
positions = ['QB', 'RB', 'WR', 'TE']
# Calculate for each team and position
cursor.execute("SELECT team_id FROM teams")
teams = cursor.fetchall()
for team in teams:
team_id = team['team_id']
for position in positions:
# Get all points scored against this team by this position
week_clause = f"AND g.week <= {through_week}" if through_week else ""
cursor.execute(f"""
SELECT
COUNT(DISTINCT g.game_id) as games_played,
SUM(pgs.fantasy_points) as total_points_allowed
FROM player_game_stats pgs
Powered by Repobility — scan your code at https://repobility.com
calculate_projections method · python · L381-L413 (33 LOC)backend/calculation_engine.py
def calculate_projections(self, season_year: int, week: int):
"""Calculate projections for upcoming week"""
print(f"\n=== CALCULATING PROJECTIONS FOR {season_year} WEEK {week} ===")
cursor = self.conn.cursor()
# Get all scheduled games for this week
cursor.execute("""
SELECT game_id, home_team_id, away_team_id
FROM games
WHERE season_year = ? AND week = ? AND season_type = 'REG'
""", (season_year, week))
games = cursor.fetchall()
projections_created = 0
for game in games:
# Process home team players
self._project_team_players(
game['home_team_id'], game['away_team_id'],
season_year, week, cursor
)
projections_created += 1
# Process away team players
self._project_team_players(
game['away_team_id'], game['home__project_team_players method · python · L415-L488 (74 LOC)backend/calculation_engine.py
def _project_team_players(self, team_id: str, opponent_id: str,
season_year: int, week: int, cursor):
"""Project fantasy points for all players on a team"""
# Get all active players on this team
cursor.execute("""
SELECT DISTINCT player_id, position
FROM players
WHERE team_id = ? AND position IN ('QB', 'RB', 'WR', 'TE')
""", (team_id,))
players = cursor.fetchall()
for player in players:
player_id = player['player_id']
position = player['position']
# Calculate last 3 games FPPG
if week <= 3:
# Use previous season's FPPG
cursor.execute("""
SELECT fantasy_points_per_game
FROM player_season_stats
WHERE player_id = ? AND season_year = ?
""", (player_id, season_year - 1))
run_all_calculations method · python · L490-L527 (38 LOC)backend/calculation_engine.py
def run_all_calculations(self):
"""Run all calculations for all seasons"""
print("\n" + "="*60)
print("DYNASTY FANTASY FOOTBALL - CALCULATIONS")
print("="*60)
self.connect_db()
seasons = [2021, 2022, 2023, 2024, 2025]
for season_year in seasons:
# Calculate season stats
self.calculate_season_stats(season_year)
# Calculate weekly position rankings
self.calculate_weekly_position_rankings(season_year)
# Calculate season-level position rankings
self.calculate_position_rankings(season_year)
# Calculate defensive rankings (full season)
self.calculate_defensive_rankings(season_year)
# Calculate week-by-week defensive rankings for current season
if season_year == 2025:
for week in range(1, 18):
self.calcapi_call function · python · L27-L47 (21 LOC)backend/pull_2025_schedule_only.py
def api_call(endpoint: str) -> Optional[Dict]:
"""Make API call with rate limiting"""
url = f"{BASE_URL}/{endpoint}?api_key={SPORTRADAR_API_KEY}"
try:
time.sleep(API_CALL_DELAY)
response = requests.get(url, timeout=30)
if response.status_code == 200:
return response.json()
elif response.status_code == 429:
print("Rate limit hit, waiting 60 seconds...")
time.sleep(60)
return api_call(endpoint)
else:
print(f"API Error {response.status_code}: {endpoint}")
return None
except Exception as e:
print(f"Exception: {e}")
return Nonepull_schedule function · python · L50-L157 (108 LOC)backend/pull_2025_schedule_only.py
def pull_schedule():
"""Pull and insert schedule for weeks 4-18"""
print("\n" + "="*60)
print("PULL 2025 SCHEDULE - WEEKS 4-18")
print("="*60)
print(f"\nThis will add weeks {WEEKS_TO_ADD[0]}-{WEEKS_TO_ADD[-1]} as 'scheduled' games")
print("No stats will be pulled - just the schedule for testing future matchups")
print("="*60)
# Connect to database
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
# Fetch the schedule
print(f"\nFetching {SEASON_YEAR} {SEASON_TYPE} schedule...")
endpoint = f"games/{SEASON_YEAR}/{SEASON_TYPE}/schedule.json"
data = api_call(endpoint)
if not data:
print("Failed to get schedule")
conn.close()
return
games_added = 0
games_skipped = 0
for week in data.get('weeks', []):
week_num = week.get('sequence', 0)
# Only process specified weeks
if week_num not in WEEKS_TO_ADD:
continue
print(f"\nProcessing Week {week_num}api_call function · python · L23-L47 (25 LOC)backend/pull_full_rosters.py
def api_call(endpoint: str) -> Optional[Dict]:
"""Make API call with rate limiting and error handling"""
url = f"{BASE_URL}/{endpoint}"
headers = {
'accept': 'application/json',
'x-api-key': SPORTRADAR_API_KEY
}
try:
time.sleep(API_CALL_DELAY)
response = requests.get(url, headers=headers, timeout=30)
if response.status_code == 200:
return response.json()
elif response.status_code == 429:
print(f" Rate limit hit, waiting 60 seconds...")
time.sleep(60)
return api_call(endpoint) # Retry
else:
print(f" API Error {response.status_code}: {endpoint}")
return None
except Exception as e:
print(f" Exception calling API: {e}")
return Noneget_all_team_ids function · python · L50-L54 (5 LOC)backend/pull_full_rosters.py
def get_all_team_ids(conn) -> list:
"""Get all team IDs from the database"""
cursor = conn.cursor()
cursor.execute("SELECT team_id, team_alias FROM teams ORDER BY team_alias")
return cursor.fetchall()pull_team_roster function · python · L57-L125 (69 LOC)backend/pull_full_rosters.py
def pull_team_roster(team_id: str, team_alias: str, conn) -> int:
"""Pull full roster for a single team and update database"""
print(f" Pulling roster for {team_alias}...")
data = api_call(f"teams/{team_id}/full_roster.json")
if not data:
print(f" Failed to get roster for {team_alias}")
return 0
cursor = conn.cursor()
players_updated = 0
players = data.get('players', [])
for player in players:
player_id = player.get('id')
if not player_id:
continue
# Only include fantasy-relevant positions
position = player.get('position', '')
if position not in ['QB', 'RB', 'WR', 'TE', 'K']:
continue
# Build full name
first_name = player.get('first_name', '')
last_name = player.get('last_name', '')
full_name = player.get('name', f"{first_name} {last_name}")
# Get other fields
jersey = player.get('jersey')
status = player.get(Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
main function · python · L128-L190 (63 LOC)backend/pull_full_rosters.py
def main():
print("=" * 60)
print("PULL FULL NFL ROSTERS")
print("=" * 60)
print()
print("This will pull complete rosters for all 32 NFL teams")
print("and update the players table with any missing players.")
print("=" * 60)
print()
response = input("Pull full rosters for all teams? (yes/no): ")
if response.lower() != 'yes':
print("Cancelled.")
return
# Connect to database
conn = sqlite3.connect(DB_FILE)
conn.row_factory = sqlite3.Row
# Get player count before
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM players")
players_before = cursor.fetchone()[0]
print(f"\nPlayers in database before: {players_before}")
# Get all teams
teams = get_all_team_ids(conn)
print(f"Teams to process: {len(teams)}")
print()
total_players = 0
for team_id, team_alias in teams:
count = pull_team_roster(team_id, team_alias, conn)
total_players += count
print__init__ method · python · L40-L44 (5 LOC)backend/rapidapi_data_puller.py
def __init__(self, db_file: str = DB_FILE):
self.db_file = db_file
self.conn = None
self.api_calls_made = 0
self.position_lookup = {} # Cache for player positions from backupload_position_lookup method · python · L46-L64 (19 LOC)backend/rapidapi_data_puller.py
def load_position_lookup(self):
"""Load position data from Sportradar backup for hybrid approach"""
backup_path = os.path.join(os.path.dirname(__file__), 'dynasty_football_sportradar_backup.db')
if not os.path.exists(backup_path):
print("[WARN] No backup database found for position lookup")
return
try:
backup_conn = sqlite3.connect(backup_path)
cursor = backup_conn.cursor()
cursor.execute('SELECT full_name, position FROM players WHERE position IS NOT NULL')
for row in cursor.fetchall():
# Normalize name for lookup
name = row[0].lower().strip()
self.position_lookup[name] = row[1]
backup_conn.close()
print(f"[OK] Loaded {len(self.position_lookup)} player positions from backup")
except Exception as e:
print(f"[WARN] Could not load position lookup: {e}")connect_db method · python · L66-L70 (5 LOC)backend/rapidapi_data_puller.py
def connect_db(self):
"""Connect to SQLite database"""
self.conn = sqlite3.connect(self.db_file)
self.conn.row_factory = sqlite3.Row
print(f"✓ Connected to database: {self.db_file}")close_db method · python · L72-L76 (5 LOC)backend/rapidapi_data_puller.py
def close_db(self):
"""Close database connection"""
if self.conn:
self.conn.close()
print(f"✓ Closed database connection")init_database method · python · L78-L98 (21 LOC)backend/rapidapi_data_puller.py
def init_database(self):
"""Initialize database with schema (skip if tables exist)"""
print("\n=== INITIALIZING DATABASE ===")
cursor = self.conn.cursor()
# Check if tables already exist
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='teams'")
if cursor.fetchone():
print("[OK] Database tables already exist, skipping schema creation")
return
# Read and execute schema only if tables don't exist
schema_path = os.path.join(os.path.dirname(__file__), 'dynasty_schema.sql')
with open(schema_path, 'r') as f:
schema = f.read()
cursor.executescript(schema)
self.conn.commit()
print("[OK] Database schema created")api_call method · python · L100-L124 (25 LOC)backend/rapidapi_data_puller.py
def api_call(self, endpoint: str, params: Dict = None) -> Optional[Dict]:
"""Make API call with rate limiting and error handling"""
url = f"{BASE_URL}/{endpoint}"
try:
# Rate limiting
time.sleep(API_CALL_DELAY)
response = requests.get(url, headers=HEADERS, params=params, timeout=30)
self.api_calls_made += 1
if response.status_code == 200:
return response.json()
elif response.status_code == 429:
print(f"⚠ Rate limit hit, waiting 60 seconds...")
time.sleep(60)
return self.api_call(endpoint, params) # Retry
else:
print(f"✗ API Error {response.status_code}: {endpoint}")
print(f" Response: {response.text[:200]}")
return None
except Exception as e:
print(f"✗ Exception calling API: {e}")
return Nonepull_teams method · python · L130-L182 (53 LOC)backend/rapidapi_data_puller.py
def pull_teams(self):
"""Pull all NFL teams from /nfl-team-listing/v1/data"""
print("\n=== PULLING TEAMS ===")
data = self.api_call("nfl-team-listing/v1/data")
if not data:
print("✗ Failed to get teams")
return []
cursor = self.conn.cursor()
teams_added = 0
team_ids = []
# Response is an array of {'team': {...}} objects
teams_list = data if isinstance(data, list) else data.get('teams', [])
for item in teams_list:
try:
# Unwrap the 'team' key if present
team = item.get('team', item) if isinstance(item, dict) else item
team_id = team.get('id')
if not team_id:
continue
team_ids.append(team_id)
# Extract logo URL
logos = team.get('logos', [])
logo_url = logos[0].get('href') if logos else None
cursor.exeOpen data scored by Repobility · https://repobility.com
pull_team_roster method · python · L188-L270 (83 LOC)backend/rapidapi_data_puller.py
def pull_team_roster(self, team_id: str):
"""Pull roster for a specific team from /nfl-team-roster"""
data = self.api_call("nfl-team-roster", params={"id": team_id})
if not data:
return 0
cursor = self.conn.cursor()
players_added = 0
# Athletes is a direct array of player objects
athletes = data.get('athletes', [])
for player in athletes:
if isinstance(player, dict):
try:
player_id = player.get('id')
if not player_id:
continue
# Extract position - try lookup from backup first
full_name = player.get('fullName', player.get('displayName', ''))
position = self.position_lookup.get(full_name.lower().strip(), '')
# If not found in lookup, use generic position or empty
if not position:
position = ppull_all_rosters method · python · L272-L286 (15 LOC)backend/rapidapi_data_puller.py
def pull_all_rosters(self, team_ids: List[str]):
"""Pull rosters for all teams"""
print("\n=== PULLING ROSTERS ===")
# Load position lookup from backup for hybrid approach
self.load_position_lookup()
total_players = 0
for idx, team_id in enumerate(team_ids, 1):
print(f" Team {idx}/{len(team_ids)}", end='\r')
players = self.pull_team_roster(team_id)
total_players += players
print(f"\n✓ Added {total_players} players from {len(team_ids)} teams")pull_week_schedule method · python · L292-L306 (15 LOC)backend/rapidapi_data_puller.py
def pull_week_schedule(self, year: int, week: int) -> List[str]:
"""Pull game IDs for a specific week from /nfl-weeks-events"""
data = self.api_call("nfl-weeks-events", params={
"year": year,
"week": week,
"type": 2 # Regular season
})
if not data:
return []
items = data.get('items', [])
game_ids = [item.get('eventid') for item in items if item.get('eventid')]
return game_idspull_season_schedule method · python · L308-L320 (13 LOC)backend/rapidapi_data_puller.py
def pull_season_schedule(self, year: int, weeks: range = range(1, 19)):
"""Pull schedule for an entire season"""
print(f"\n=== PULLING {year} SEASON SCHEDULE ===")
all_game_ids = []
for week in weeks:
game_ids = self.pull_week_schedule(year, week)
all_game_ids.extend([(gid, week, year) for gid in game_ids])
print(f" Week {week}: {len(game_ids)} games")
print(f"✓ Found {len(all_game_ids)} total games for {year}")
return all_game_idspull_boxscore method · python · L326-L527 (202 LOC)backend/rapidapi_data_puller.py
def pull_boxscore(self, game_id: str, week: int, season_year: int):
"""Pull boxscore for a specific game from /nfl-boxscore"""
data = self.api_call("nfl-boxscore", params={"id": game_id})
if not data:
return
cursor = self.conn.cursor()
# Extract game info
boxscore = data.get('boxscore', {})
# Get teams info for the games table
teams_data = boxscore.get('teams', [])
home_team = None
away_team = None
for team_data in teams_data:
if team_data.get('homeAway') == 'home':
home_team = team_data.get('team', {})
elif team_data.get('homeAway') == 'away':
away_team = team_data.get('team', {})
# Insert/update game record
if home_team and away_team:
# Get game header info
header = data.get('header', {})
competitions = header.get('competitions', [{}])
competition = competit_parse_stat method · python · L529-L536 (8 LOC)backend/rapidapi_data_puller.py
def _parse_stat(self, value) -> int:
"""Parse a stat value to integer, handling strings and None"""
if value is None or value == '--' or value == '-':
return 0
try:
return int(float(str(value).replace(',', '')))
except (ValueError, TypeError):
return 0calculate_fantasy_points method · python · L538-L564 (27 LOC)backend/rapidapi_data_puller.py
def calculate_fantasy_points(self, pass_yards, pass_td, pass_int,
rush_yards, rush_td,
receptions, rec_yards, rec_td,
two_pt_conversions=0, fumbles_lost=0, return_td=0):
"""Calculate fantasy points based on league scoring rules (Half-PPR)"""
points = 0.0
# Passing
points += (pass_yards / 15.0) # 1 point per 15 yards
points += (pass_td * 4) # 4 points per TD
points += (pass_int * -2) # -2 per INT
# Rushing
points += (rush_yards / 10.0) # 1 point per 10 yards
points += (rush_td * 6) # 6 points per TD
# Receiving
points += (receptions * 0.5) # 0.5 PPR
points += (rec_yards / 10.0) # 1 point per 10 yards
points += (rec_td * 6) # 6 points per TD
# Other
points += (two_pt_conversions * 2) # 2 points per 2pt conversion
points += (fumbles_lost * -2) # -2pull_game_scores method · python · L570-L638 (69 LOC)backend/rapidapi_data_puller.py
def pull_game_scores(self, season_year: int, weeks: range = None):
"""Pull game scores, dates, and status from the scoreboard endpoint.
The boxscore endpoint doesn't include final scores, but scoreboard does."""
if weeks is None:
weeks = range(1, 19)
print(f"\n=== PULLING GAME SCORES FOR {season_year} ===")
cursor = self.conn.cursor()
games_updated = 0
for week in weeks:
data = self.api_call(f"nfl-scoreboard?year={season_year}&seasonType=2&week={week}")
if not data:
continue
events = data.get('events', [])
week_updated = 0
for event in events:
game_id = event.get('id')
if not game_id:
continue
# Extract game date and status
game_date = event.get('date') # ISO format e.g. "2025-01-04T21:30Z"
status_info = event.get('status', {}).get('typMethodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
pull_all_data method · python · L644-L688 (45 LOC)backend/rapidapi_data_puller.py
def pull_all_data(self, seasons: List[int] = None, weeks: range = None):
"""Main method to pull all data"""
if seasons is None:
seasons = [2025]
if weeks is None:
weeks = range(1, 19)
print("\n" + "="*60)
print("DYNASTY FANTASY FOOTBALL - RAPIDAPI DATA PULL")
print("="*60)
self.connect_db()
self.init_database()
# Step 1: Pull teams
team_ids = self.pull_teams()
# Step 2: Pull rosters for all teams
self.pull_all_rosters(team_ids)
# Step 3: Pull schedules and boxscores for each season
for season_year in seasons:
# Get schedule (game IDs)
game_data = self.pull_season_schedule(season_year, weeks)
# Pull boxscores for each game
print(f"\n=== PULLING BOXSCORES FOR {season_year} ===")
total_games = len(game_data)
for idx, (game_id, week, year) in enumerate(game_data, 1):
main function · python · L691-L734 (44 LOC)backend/rapidapi_data_puller.py
def main():
"""Main entry point"""
import argparse
parser = argparse.ArgumentParser(description='Pull NFL data from RapidAPI')
parser.add_argument('--seasons', nargs='+', type=int, default=[2025],
help='Seasons to pull (default: 2025)')
parser.add_argument('--weeks', nargs=2, type=int, default=[1, 18],
help='Week range to pull (default: 1 18)')
parser.add_argument('--teams-only', action='store_true',
help='Only pull teams data')
parser.add_argument('--rosters-only', action='store_true',
help='Only pull rosters (requires teams)')
parser.add_argument('--scores-only', action='store_true',
help='Only pull game scores from scoreboard (fast, 18 API calls)')
parser.add_argument('--test', action='store_true',
help='Use test database (nfl_data_test.db)')
args = parser.parse_args()
db_file = 'nfl_data_test.dexecute_query function · python · L14-L22 (9 LOC)backend/sample_queries.py
def execute_query(query, params=()):
"""Execute query and return results"""
conn = sqlite3.connect(DB_FILE)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(query, params)
results = cursor.fetchall()
conn.close()
return resultsdict_from_row function · python · L25-L27 (3 LOC)backend/sample_queries.py
def dict_from_row(row):
"""Convert sqlite3.Row to dict"""
return {key: row[key] for key in row.keys()}run function · javascript · L30-L37 (8 LOC)backend/scripts/migrate-games-live-scoring.js
function run(sql) {
return new Promise((resolve, reject) => {
db.run(sql, (err) => {
if (err) reject(err);
else resolve();
});
});
}get function · javascript · L39-L46 (8 LOC)backend/scripts/migrate-games-live-scoring.js
function get(sql) {
return new Promise((resolve, reject) => {
db.get(sql, (err, row) => {
if (err) reject(err);
else resolve(row);
});
});
}migrate function · javascript · L48-L81 (34 LOC)backend/scripts/migrate-games-live-scoring.js
async function migrate() {
// Check if columns already exist
const tableInfo = await new Promise((resolve, reject) => {
db.all('PRAGMA table_info(games)', (err, rows) => {
if (err) reject(err);
else resolve(rows);
});
});
const columns = tableInfo.map(r => r.name);
if (!columns.includes('is_finalized')) {
await run('ALTER TABLE games ADD COLUMN is_finalized INTEGER DEFAULT 0');
console.log(' Added is_finalized column');
} else {
console.log(' is_finalized column already exists');
}
if (!columns.includes('last_polled_at')) {
await run('ALTER TABLE games ADD COLUMN last_polled_at TEXT');
console.log(' Added last_polled_at column');
} else {
console.log(' last_polled_at column already exists');
}
// Backfill: mark closed games as finalized
await run("UPDATE games SET is_finalized = 1 WHERE status = 'closed' AND is_finalized = 0");
const count = await get("SELECT COUNT(*) as cnt FROM games WHERE is_finalized = 1"migrateDb function · javascript · L42-L60 (19 LOC)backend/scripts/migrate-league-matchups.js
function migrateDb(dbPath) {
return new Promise((resolve, reject) => {
const db = new sqlite3.Database(dbPath, sqlite3.OPEN_READWRITE, (err) => {
if (err) { reject(err); return; }
});
db.exec(MATCHUPS_SQL, (err) => {
if (err) {
console.error(` Failed ${path.basename(dbPath)}: ${err.message}`);
db.close();
reject(err);
} else {
console.log(` Migrated ${path.basename(dbPath)}`);
db.close();
resolve();
}
});
});
}Powered by Repobility — scan your code at https://repobility.com
main function · javascript · L62-L75 (14 LOC)backend/scripts/migrate-league-matchups.js
async function main() {
const files = fs.readdirSync(leaguesDir).filter(f => f.match(/^league_\d+\.db$/));
if (files.length === 0) {
console.log('No league DB files found.');
return;
}
for (const file of files) {
await migrateDb(path.join(leaguesDir, file));
}
console.log(`\nMigrated ${files.length} league DB(s).`);
}run function · javascript · L30-L37 (8 LOC)backend/scripts/seed-test-leagues.js
function run(sql, params = []) {
return new Promise((resolve, reject) => {
db.run(sql, params, function (err) {
if (err) reject(err);
else resolve({ lastID: this.lastID, changes: this.changes });
});
});
}get function · javascript · L39-L46 (8 LOC)backend/scripts/seed-test-leagues.js
function get(sql, params = []) {
return new Promise((resolve, reject) => {
db.get(sql, params, (err, row) => {
if (err) reject(err);
else resolve(row);
});
});
}all function · javascript · L48-L55 (8 LOC)backend/scripts/seed-test-leagues.js
function all(sql, params = []) {
return new Promise((resolve, reject) => {
db.all(sql, params, (err, rows) => {
if (err) reject(err);
else resolve(rows);
});
});
}seed function · javascript · L57-L208 (152 LOC)backend/scripts/seed-test-leagues.js
async function seed() {
console.log('=== Seed Test Leagues ===\n');
// ── Step 1: Disband League 5 ──
console.log('Step 1: Disband league 5...');
const league5 = await get('SELECT id FROM leagues WHERE id = 5');
if (league5) {
const memberDel = await run('DELETE FROM league_members WHERE league_id = 5');
console.log(` Deleted ${memberDel.changes} league_members rows`);
const leagueDel = await run('DELETE FROM leagues WHERE id = 5');
console.log(` Deleted ${leagueDel.changes} league row`);
} else {
console.log(' League 5 not found, already disbanded.');
}
const league5DbPath = path.join(leaguesTestDir, 'league_5.db');
if (fs.existsSync(league5DbPath)) {
fs.unlinkSync(league5DbPath);
console.log(' Deleted league_5.db');
} else {
console.log(' league_5.db not found, already deleted.');
}
// ── Step 2: Create new users ──
console.log('\nStep 2: Create new users...');
const newUsers = [
// League 6 (Snake) — teams 4-10openDb function · javascript · L60-L67 (8 LOC)backend/scripts/split-database.js
function openDb(dbPath, mode) {
return new Promise((resolve, reject) => {
const db = new sqlite3.Database(dbPath, mode, (err) => {
if (err) reject(err);
else resolve(db);
});
});
}run function · javascript · L69-L76 (8 LOC)backend/scripts/split-database.js
function run(db, sql, params = []) {
return new Promise((resolve, reject) => {
db.run(sql, params, function (err) {
if (err) reject(err);
else resolve({ lastID: this.lastID, changes: this.changes });
});
});
}all function · javascript · L78-L85 (8 LOC)backend/scripts/split-database.js
function all(db, sql, params = []) {
return new Promise((resolve, reject) => {
db.all(sql, params, (err, rows) => {
if (err) reject(err);
else resolve(rows);
});
});
}Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
get function · javascript · L87-L94 (8 LOC)backend/scripts/split-database.js
function get(db, sql, params = []) {
return new Promise((resolve, reject) => {
db.get(sql, params, (err, row) => {
if (err) reject(err);
else resolve(row);
});
});
}closeDb function · javascript · L96-L100 (5 LOC)backend/scripts/split-database.js
function closeDb(db) {
return new Promise((resolve) => {
db.close(() => resolve());
});
}getCreateStatement function · javascript · L105-L112 (8 LOC)backend/scripts/split-database.js
async function getCreateStatement(sourceDb, tableName) {
const row = await get(
sourceDb,
"SELECT sql FROM sqlite_master WHERE type='table' AND name=?",
[tableName]
);
return row ? row.sql : null;
}page 1 / 5next ›