← back to drPod__tracelight

Function bodies 33 total

All specs Real LLM only Function bodies
_get_conn function · python · L14-L22 (9 LOC)
db/icij_db.py
def _get_conn() -> sqlite3.Connection:
    """Return the thread-local SQLite connection, opening it on first use.

    The connection is kept on the module-level ``_local`` object and reused
    by later calls. A freshly opened connection gets ``sqlite3.Row`` as its
    row factory and is configured for WAL journaling, a large page cache and
    ``query_only`` mode.
    """
    existing = getattr(_local, "conn", None)
    if existing is not None:
        return existing

    # Publish the connection before configuring it, then apply the PRAGMAs
    # in a fixed order.
    fresh = sqlite3.connect(DB_PATH)
    _local.conn = fresh
    fresh.row_factory = sqlite3.Row
    for pragma in (
        "PRAGMA journal_mode=WAL",
        "PRAGMA cache_size=-128000",  # 128MB cache
        "PRAGMA query_only=ON",
    ):
        fresh.execute(pragma)
    return fresh
search_nodes function · python · L25-L78 (54 LOC)
db/icij_db.py
def search_nodes(query: str, limit: int = 30) -> list:
    """FTS5 name search with prefix matching. Returns list of node dicts."""
    limit = int(limit)
    conn = _get_conn()

    # Sanitize: remove FTS5 special characters, add prefix matching
    sanitized = "".join(c for c in query if c.isalnum() or c.isspace() or c == "-")
    sanitized = sanitized.strip()
    if not sanitized:
        return []

    # Add prefix wildcard for partial matching
    terms = sanitized.split()
    fts_query = " ".join(f'"{t}"*' for t in terms if t)

    rows = conn.execute(
        """SELECT n.node_id, n.name, n.node_type, n.countries, n.country_codes,
                  n.source_id, n.jurisdiction, n.address, n.company_type,
                  pp.node_id IS NOT NULL AS is_power_player,
                  pp.title AS power_player_title,
                  pp.release AS power_player_release
           FROM nodes_fts fts
           JOIN nodes n ON n.rowid = fts.rowid
           LEFT JOIN power_players pp ON
get_node function · python · L81-L112 (32 LOC)
db/icij_db.py
def get_node(node_id: str) -> Optional[dict]:
    """Fetch a single node by ID with power player status."""
    conn = _get_conn()
    r = conn.execute(
        """SELECT n.*, pp.node_id IS NOT NULL AS is_power_player,
                  pp.title AS power_player_title,
                  pp.release AS power_player_release
           FROM nodes n
           LEFT JOIN power_players pp ON pp.node_id = n.node_id
           WHERE n.node_id = ?""",
        (node_id,),
    ).fetchone()

    if not r:
        return None

    return {
        "node_id": r["node_id"],
        "name": r["name"],
        "node_type": r["node_type"],
        "countries": r["countries"],
        "country_codes": r["country_codes"],
        "source_id": r["source_id"],
        "jurisdiction": r["jurisdiction"],
        "address": r["address"],
        "company_type": r["company_type"],
        "incorporation_date": r["incorporation_date"],
        "status": r["status"],
        "is_power_player": bool(r["is_power_play
_batch_query function · python · L115-L125 (11 LOC)
db/icij_db.py
def _batch_query(conn, sql_template: str, ids: list, extra_params: Optional[list] = None) -> list:
    """Run one IN-clause query over ``ids`` in batches and collect all rows.

    SQLite caps the number of bound variables per statement (999 by
    default), so ``ids`` is processed in slices of 900, leaving headroom for
    ``extra_params``.

    Args:
        conn: Object exposing ``execute(sql, params)`` whose result has
            ``fetchall()`` (e.g. a ``sqlite3.Connection``).
        sql_template: SQL containing the ``__PLACEHOLDERS__`` marker, which
            is replaced by a comma-separated run of ``?`` for each batch.
        ids: Values to bind into the IN clause. Any iterable is accepted
            (list, tuple, set, ...); the argument itself is not modified.
        extra_params: Parameters bound *before* the ids in every batch, so
            their ``?`` markers must precede ``__PLACEHOLDERS__`` in the SQL.

    Returns:
        The concatenated ``fetchall()`` results of every batch; ``[]`` when
        ``ids`` is empty (no query is executed in that case).
    """
    # Normalise both inputs to lists so that tuples/sets work too; plain
    # ``list + tuple`` concatenation would raise TypeError.
    id_list = list(ids)
    leading = list(extra_params or ())

    batch_size = 900  # Leave room for extra params
    results = []
    for offset in range(0, len(id_list), batch_size):
        batch = id_list[offset : offset + batch_size]
        placeholders = ",".join("?" * len(batch))
        sql = sql_template.replace("__PLACEHOLDERS__", placeholders)
        results.extend(conn.execute(sql, leading + batch).fetchall())
    return results
get_subgraph function · python · L128-L344 (217 LOC)
db/icij_db.py
def get_subgraph(
    center_node_id: str,
    max_depth: int = 3,
    max_nodes: int = 500,
) -> dict:
    max_depth = int(max_depth)
    max_nodes = int(max_nodes)
    """BFS from center node, returning nodes with hop distance and edges."""
    conn = _get_conn()

    # Verify center node exists
    center = get_node(center_node_id)
    if not center:
        return {
            "nodes": [],
            "edges": [],
            "power_players_found": [],
            "truncated": False,
            "stats": {"total_nodes": 0, "total_edges": 0, "max_depth_reached": 0},
        }

    visited = set()
    seen_names = set()  # Deduplicate same person appearing as multiple records
    all_nodes = []
    all_edges = []
    all_edge_keys = set()
    power_players_found = []
    truncated = False
    max_depth_reached = 0

    # Add center node at hop 0
    center["hop"] = 0
    all_nodes.append(center)
    visited.add(center_node_id)
    seen_names.add((center["name"].lower(), center["node
get_node_detail function · python · L347-L387 (41 LOC)
db/icij_db.py
def get_node_detail(node_id: str) -> Optional[dict]:
    """Fetch full node details with immediate connections and ICIJ URL."""
    conn = _get_conn()
    node = get_node(node_id)
    if not node:
        return None

    # Get immediate connections
    connections = conn.execute(
        """SELECT r.rel_type, r.link, r.source_id, r.start_date, r.end_date,
                  n.node_id, n.name, n.node_type
           FROM relationships r
           JOIN nodes n ON n.node_id = CASE
               WHEN r.node_id_start = ? THEN r.node_id_end
               ELSE r.node_id_start
           END
           WHERE (r.node_id_start = ? OR r.node_id_end = ?)
           AND r.rel_type NOT IN ('same_name_as', 'similar', 'same_company_as',
                                   'same_as', 'same_id_as')
           LIMIT 100""",
        (node_id, node_id, node_id),
    ).fetchall()

    node["connections"] = [
        {
            "rel_type": c["rel_type"],
            "link": c["link"],
            "sourc
get_leak_name function · python · L390-L401 (12 LOC)
db/icij_db.py
def get_leak_name(source_id: str) -> str:
    """Translate a source_id into its human-readable leak name.

    Falsy input yields "Unknown". Otherwise the ``sources`` table is
    consulted; when it has no row for the id, the text before the first
    " - " separator is returned (the whole id when there is no separator).
    """
    if not source_id:
        return "Unknown"

    match = _get_conn().execute(
        "SELECT leak_name FROM sources WHERE source_id = ?", (source_id,)
    ).fetchone()
    if match:
        return match["leak_name"]

    # Fallback: keep only the prefix in front of " - " (if any).
    prefix, _separator, _remainder = source_id.partition(" - ")
    return prefix
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
get_node_url function · python · L404-L406 (3 LOC)
db/icij_db.py
def get_node_url(node_id: str) -> str:
    """Return the ICIJ Offshore Leaks page URL for the given node id."""
    base = "https://offshoreleaks.icij.org/nodes/"
    return base + format(node_id)
get_stats function · python · L409-L439 (31 LOC)
db/icij_db.py
def get_stats() -> dict:
    """Return database statistics."""
    conn = _get_conn()

    total = conn.execute("SELECT COUNT(*) FROM nodes").fetchone()[0]

    type_counts = {}
    for row in conn.execute(
        "SELECT node_type, COUNT(*) as cnt FROM nodes GROUP BY node_type"
    ):
        type_counts[row["node_type"]] = row["cnt"]

    rel_count = conn.execute("SELECT COUNT(*) FROM relationships").fetchone()[0]
    pp_count = conn.execute("SELECT COUNT(*) FROM power_players").fetchone()[0]

    leaks = [
        row[0]
        for row in conn.execute("SELECT DISTINCT leak_name FROM sources ORDER BY leak_name")
    ]

    return {
        "total_nodes": total,
        "total_entities": type_counts.get("entity", 0),
        "total_officers": type_counts.get("officer", 0),
        "total_addresses": type_counts.get("address", 0),
        "total_intermediaries": type_counts.get("intermediary", 0),
        "total_others": type_counts.get("other", 0),
        "total_relationships": rel
_load_pp_json function · python · L447-L453 (7 LOC)
db/icij_db.py
def _load_pp_json():
    """Load ``data/power_players.json`` once and return it indexed by slug.

    The file is located relative to this module (``../data``). Its top-level
    value is iterated and every entry is stored under its ``"slug"`` key. The
    resulting dict is memoised in the module-level ``_pp_json_cache``, so the
    file is read at most once per process.

    Returns:
        dict mapping slug -> the entry loaded from the JSON file.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if the file is not valid UTF-8 encoded JSON.
        KeyError: if an entry has no ``"slug"`` key.
    """
    global _pp_json_cache
    if _pp_json_cache is None:
        pp_path = os.path.join(os.path.dirname(__file__), "..", "data", "power_players.json")
        # JSON is UTF-8 by specification; do not rely on the platform's
        # locale encoding (e.g. cp1252 on Windows) to decode it.
        with open(pp_path, encoding="utf-8") as f:
            _pp_json_cache = {p["slug"]: p for p in json.load(f)}
    return _pp_json_cache
_compute_pp_connectivity function · python · L456-L486 (31 LOC)
db/icij_db.py
def _compute_pp_connectivity(pps: list) -> list:
    """Pre-compute which PP pairs are connected. Cached after first call."""
    global _pp_connectivity_cache
    if _pp_connectivity_cache is not None:
        return _pp_connectivity_cache

    # Check for cached file first
    cache_path = os.path.join(os.path.dirname(__file__), "..", "data", "pp_connections.json")
    if os.path.exists(cache_path):
        with open(cache_path) as f:
            _pp_connectivity_cache = json.load(f)
        return _pp_connectivity_cache

    # Compute all pairs
    connected = []
    for i in range(len(pps)):
        for j in range(i + 1, len(pps)):
            r = find_path(pps[i]["node_ids"][0], pps[j]["node_ids"][0], max_depth=8)
            if r["found"]:
                connected.append({
                    "slug_a": pps[i]["slug"],
                    "slug_b": pps[j]["slug"],
                    "hops": r["hops"],
                })

    # Cache to file for next time
    with open(cache_path
get_power_players function · python · L489-L529 (41 LOC)
db/icij_db.py
def get_power_players() -> list:
    """Return all Power Players that have DB entries, with image URLs from JSON."""
    conn = _get_conn()
    pp_json = _load_pp_json()

    rows = conn.execute(
        """SELECT slug, GROUP_CONCAT(node_id) as node_ids, name, title, release
           FROM power_players GROUP BY slug ORDER BY name"""
    ).fetchall()

    players = []
    for r in rows:
        slug = r["slug"]
        json_entry = pp_json.get(slug, {})
        players.append({
            "slug": slug,
            "name": r["name"],
            "title": r["title"],
            "release": r["release"],
            "image": json_entry.get("image", ""),
            "node_ids": r["node_ids"].split(","),
            "icij_link": json_entry.get("link", f"/power-players/{slug}"),
        })

    # Compute connectivity (cached after first call)
    connected_pairs = _compute_pp_connectivity(players)

    # Mark which PPs have connections and build adjacency
    connectable_slugs = set()
    
find_path function · python · L543-L700 (158 LOC)
db/icij_db.py
def find_path(start_id: str, end_id: str, max_depth: int = 6) -> dict:
    """Bidirectional BFS between two node IDs. Returns shortest path."""
    max_depth = int(max_depth)
    start_id = str(start_id)
    end_id = str(end_id)
    conn = _get_conn()

    if start_id == end_id:
        node = get_node(start_id)
        if node:
            node["hop"] = 0
            return {"found": True, "path_nodes": [node], "path_edges": [], "hops": 0}
        return {"found": False, "path_nodes": [], "path_edges": [], "hops": 0}

    # Forward BFS state (from start)
    fwd_visited = {start_id: None}
    fwd_frontier = [start_id]
    fwd_edge_map = {}  # node_id -> (parent_id, rel_type, link, source_id)

    # Backward BFS state (from end)
    bwd_visited = {end_id: None}
    bwd_frontier = [end_id]
    bwd_edge_map = {}

    meeting_node = None

    for depth in range(1, max_depth + 1):
        if not fwd_frontier and not bwd_frontier:
            break

        # Expand the smaller frontier (or
find_connections function · python · L703-L752 (50 LOC)
db/icij_db.py
def find_connections(node_ids: list, max_depth: int = 6) -> dict:
    """Find paths between all pairs of node IDs, merge into one subgraph."""
    max_depth = int(max_depth)
    node_ids = [str(nid) for nid in node_ids]

    all_nodes = {}  # node_id -> node dict
    all_edges = []
    all_edge_keys = set()
    paths_found = []
    paths_not_found = []

    # All unique pairs
    for i in range(len(node_ids)):
        for j in range(i + 1, len(node_ids)):
            result = find_path(node_ids[i], node_ids[j], max_depth=max_depth)
            if result["found"]:
                paths_found.append({
                    "start_id": node_ids[i],
                    "end_id": node_ids[j],
                    "hops": result["hops"],
                    "path_node_ids": [n["node_id"] for n in result["path_nodes"]],
                })
                for node in result["path_nodes"]:
                    all_nodes[node["node_id"]] = node
                for edge in result["path_edges"]:
     
get_cached_summary function · python · L759-L770 (12 LOC)
db/icij_db.py
def get_cached_summary(start_id: str, end_id: str) -> str:
    """Return the cached LLM summary for a node pair, or "" if not cached.

    The pair is looked up in both orders: ``"start:end"`` first, then
    ``"end:start"``. The lookup is best-effort: a missing, unreadable or
    malformed cache file is treated as an empty cache.

    Args:
        start_id: First node id of the pair.
        end_id: Second node id of the pair.

    Returns:
        The stored summary, or an empty string when nothing usable is cached.
    """
    try:
        # Open directly instead of checking os.path.exists() first: that
        # avoids a check-then-use race and a missing file is just an OSError.
        with open(_SUMMARY_CACHE_PATH, encoding="utf-8") as f:
            cache = json.load(f)
    except (OSError, ValueError):
        # OSError: file missing/unreadable. ValueError: invalid JSON or
        # undecodable bytes. Anything else is a real bug and should surface.
        return ""

    if not isinstance(cache, dict):
        # A cache file holding e.g. a JSON list has no usable entries.
        return ""

    key = f"{start_id}:{end_id}"
    alt_key = f"{end_id}:{start_id}"
    return cache.get(key, cache.get(alt_key, ""))
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
save_cached_summary function · python · L773-L785 (13 LOC)
db/icij_db.py
def save_cached_summary(start_id: str, end_id: str, summary: str) -> None:
    """Persist an LLM summary so we never pay for the same call twice.

    The summary is stored under the key ``"start_id:end_id"`` in the JSON
    cache file. An existing cache that is missing, unreadable or not a JSON
    object is discarded and replaced by a fresh one holding only this entry.

    Args:
        start_id: First node id of the pair.
        end_id: Second node id of the pair.
        summary: Text to store for the pair.

    Raises:
        OSError: if the cache file cannot be written.
        TypeError: if ``summary`` is not JSON-serialisable.
    """
    key = f"{start_id}:{end_id}"

    cache = {}
    try:
        with open(_SUMMARY_CACHE_PATH, encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or invalid JSON: start from an empty cache.
        loaded = None
    if isinstance(loaded, dict):
        cache = loaded

    cache[key] = summary

    # Serialise before touching the disk so a failure here cannot damage the
    # existing cache, then swap the new file in with a single rename so that
    # readers never observe a half-written file.
    payload = json.dumps(cache, indent=2)
    tmp_path = f"{_SUMMARY_CACHE_PATH}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        f.write(payload)
    os.replace(tmp_path, _SUMMARY_CACHE_PATH)
get_neighbor_options function · python · L788-L824 (37 LOC)
db/icij_db.py
def get_neighbor_options(node_id: str) -> list:
    """Get all meaningful neighbors of a node with metadata for agent decision-making."""
    conn = _get_conn()
    node_id = str(node_id)

    rows = conn.execute(
        """SELECT
               CASE WHEN r.node_id_start = ? THEN r.node_id_end ELSE r.node_id_start END as neighbor_id,
               r.rel_type,
               n.name, n.node_type, n.countries, n.jurisdiction,
               pp.node_id IS NOT NULL as is_pp,
               pp.title as pp_title
           FROM relationships r
           JOIN nodes n ON n.node_id = CASE WHEN r.node_id_start = ? THEN r.node_id_end ELSE r.node_id_start END
           LEFT JOIN power_players pp ON pp.node_id = n.node_id
           WHERE (r.node_id_start = ? OR r.node_id_end = ?)
           AND r.rel_type NOT IN ('same_name_as','similar','same_company_as','same_as',
                                   'same_id_as','similar_company_as','probably_same_officer_as',
                                 
generate_investigation_log function · python · L827-L915 (89 LOC)
db/icij_db.py
def generate_investigation_log(path_result: dict) -> list:
    """Generate a step-by-step decision log for a path, showing what the agent
    considered and why it chose each direction."""
    log = []

    for path_info in path_result.get("paths_found", []):
        path_ids = path_info.get("path_node_ids", [])
        if len(path_ids) < 2:
            continue

        # Get full node info for path
        path_nodes_map = {n["node_id"]: n for n in path_result["nodes"] if n["node_id"] in path_ids}

        start = path_nodes_map.get(path_ids[0], {})
        end = path_nodes_map.get(path_ids[-1], {})

        log.append({
            "type": "start",
            "icon": "search",
            "text": f"Starting investigation from {start.get('name', '?')}",
            "detail": f"{start.get('power_player_title') or start.get('node_type', '')} — searching for path to {end.get('name', '?')}",
        })

        for i in range(len(path_ids) - 1):
            current_id = path_ids[i]
    
expand_subgraph function · python · L918-L962 (45 LOC)
db/icij_db.py
def expand_subgraph(result: dict, expand_depth: int = 1, max_extra: int = 8) -> dict:
    """Expand a path result by adding neighbors of each path node.
    Turns a thin chain into a rich network."""
    conn = _get_conn()
    existing_ids = {n["node_id"] for n in result["nodes"]}
    extra_nodes = {}
    extra_edges = []
    extra_edge_keys = {(e["source"], e["target"], e["rel_type"]) for e in result["edges"]}

    for node in result["nodes"]:
        nid = node["node_id"]
        # Get neighbors not already in the graph
        neighbors = get_neighbor_options(nid)
        added = 0
        for nb in neighbors:
            if nb["node_id"] in existing_ids or nb["node_id"] in extra_nodes:
                continue
            if added >= max_extra:
                break
            # Fetch full node details
            full_node = get_node(nb["node_id"])
            if not full_node:
                continue
            full_node["hop"] = node.get("hop", 0) + 1
            extra_nodes[
build_path_description function · python · L965-L985 (21 LOC)
db/icij_db.py
def build_path_description(path_nodes: list, path_edges: list) -> str:
    """Render a path as one line of text for LLM consumption.

    Each node becomes ``name (node_type[, place])`` where ``place`` is the
    node's jurisdiction or, failing that, its countries. Consecutive nodes
    are joined by ``--(relation)-->`` arrows, with underscores in the
    relation type shown as spaces. Nodes that have no preceding edge in
    ``path_edges`` are left out. An empty ``path_nodes`` gives "".
    """
    if not path_nodes:
        return ""

    edge_count = len(path_edges)
    pieces = []
    for position, node in enumerate(path_nodes):
        place = node.get("jurisdiction", "") or node.get("countries", "")
        label = "{} ({}".format(node["name"], node["node_type"])
        if place:
            label = f"{label}, {place}"
        label += ")"

        if position == 0:
            pieces.append(label)
            continue
        if position > edge_count:
            # No edge leads into this node, so it cannot be rendered.
            continue

        relation = path_edges[position - 1]["rel_type"].replace("_", " ")
        pieces.append(f"--({relation})--> {label}")

    return " ".join(pieces)
download_file function · python · L19-L47 (29 LOC)
scripts/download_icij.py
def download_file(url: str, dest: str, label: str) -> None:
    """Download a file with progress reporting."""
    if os.path.exists(dest) and os.path.getsize(dest) > 1000:
        print(f"  {label}: already exists at {dest}, skipping download")
        return

    print(f"  {label}: downloading from {url}")

    req = urllib.request.Request(url, headers={"User-Agent": "ICIJ-Explorer/1.0"})
    response = urllib.request.urlopen(req)
    total = int(response.headers.get("Content-Length", 0))

    downloaded = 0
    chunk_size = 1024 * 256  # 256KB chunks

    with open(dest, "wb") as f:
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            f.write(chunk)
            downloaded += len(chunk)
            if total > 0:
                pct = downloaded * 100 // total
                mb = downloaded / (1024 * 1024)
                total_mb = total / (1024 * 1024)
                print(f"\r    {mb:.1f}/{total_mb:.1f} MB (
download_csv_data function · python · L50-L67 (18 LOC)
scripts/download_icij.py
def download_csv_data() -> str:
    """Fetch the ICIJ CSV archive and unpack it; return the extraction dir.

    The data directory is created if needed and the zip is downloaded via
    ``download_file``. Extraction is skipped when the CSV directory already
    exists and holds at least six entries.
    """
    os.makedirs(DATA_DIR, exist_ok=True)
    download_file(CSV_URL, ZIP_PATH, "ICIJ CSV data")

    needs_extract = not os.path.exists(CSV_DIR) or len(os.listdir(CSV_DIR)) < 6
    if needs_extract:
        print(f"  Extracting to {CSV_DIR}...")
        os.makedirs(CSV_DIR, exist_ok=True)
        with zipfile.ZipFile(ZIP_PATH, "r") as archive:
            archive.extractall(CSV_DIR)

        extracted = sorted(os.listdir(CSV_DIR))
        print(f"  Extracted {len(extracted)} files: {', '.join(extracted)}")
    else:
        print(f"  CSV files already extracted to {CSV_DIR}")

    return CSV_DIR
fetch_power_players function · python · L70-L99 (30 LOC)
scripts/download_icij.py
def fetch_power_players() -> list:
    """Fetch Power Players JSON list. Returns list of power player dicts."""
    os.makedirs(DATA_DIR, exist_ok=True)

    if os.path.exists(POWER_PLAYERS_PATH):
        print(f"  Power Players: already exists at {POWER_PLAYERS_PATH}")
        with open(POWER_PLAYERS_PATH) as f:
            data = json.load(f)
        print(f"  {len(data)} power players loaded from cache")
        return data

    print(f"  Power Players: fetching from {POWER_PLAYERS_URL}")
    req = urllib.request.Request(
        POWER_PLAYERS_URL, headers={"User-Agent": "ICIJ-Explorer/1.0"}
    )
    response = urllib.request.urlopen(req)
    raw = response.read().decode("utf-8")
    data = json.loads(raw)

    with open(POWER_PLAYERS_PATH, "w") as f:
        json.dump(data, f, indent=2)

    print(f"  {len(data)} power players saved to {POWER_PLAYERS_PATH}")

    # Show a sample entry
    if data:
        sample = data[0]
        print(f"  Sample entry: {json.dumps(sample, indent=
Repobility · severity-and-effort ranking · https://repobility.com
main function · python · L102-L117 (16 LOC)
scripts/download_icij.py
def main():
    """Run both download steps and print a short summary of the results."""
    rule = "=" * 60

    print(rule)
    print("ICIJ Offshore Leaks Data Download")
    print(rule)

    print("\n[1/2] Downloading CSV data (~70 MB)...")
    extracted_dir = download_csv_data()

    print("\n[2/2] Fetching Power Players list...")
    power_players = fetch_power_players()

    print("\n" + rule)
    summary_lines = (
        "Download complete!",
        f"  CSV directory: {extracted_dir}",
        f"  Power Players: {len(power_players)} entries",
        rule,
    )
    for line in summary_lines:
        print(line)
create_schema function · python · L56-L104 (49 LOC)
scripts/ingest_icij.py
def create_schema(conn: sqlite3.Connection) -> None:
    """Create all tables, indexes, and FTS5."""
    conn.executescript(
        """
        DROP TABLE IF EXISTS nodes;
        DROP TABLE IF EXISTS relationships;
        DROP TABLE IF EXISTS power_players;
        DROP TABLE IF EXISTS sources;

        CREATE TABLE nodes (
            node_id     TEXT PRIMARY KEY,
            node_type   TEXT NOT NULL,
            name        TEXT NOT NULL DEFAULT '',
            countries   TEXT DEFAULT '',
            country_codes TEXT DEFAULT '',
            jurisdiction TEXT DEFAULT '',
            source_id   TEXT DEFAULT '',
            address     TEXT DEFAULT '',
            company_type TEXT DEFAULT '',
            incorporation_date TEXT DEFAULT '',
            status      TEXT DEFAULT ''
        );

        CREATE TABLE relationships (
            id            INTEGER PRIMARY KEY AUTOINCREMENT,
            node_id_start TEXT NOT NULL,
            node_id_end   TEXT NOT NULL,
          
create_indexes function · python · L107-L120 (14 LOC)
scripts/ingest_icij.py
def create_indexes(conn: sqlite3.Connection) -> None:
    """Add the lookup indexes on ``nodes`` and ``relationships``.

    Intended to run after bulk loading, since building indexes once at the
    end is faster than maintaining them during inserts. Every statement uses
    ``IF NOT EXISTS``, so calling this again is harmless.
    """
    # Three indexes on nodes (name, type, source) and three on relationships
    # (start node, end node, relationship type).
    index_ddl = """
        CREATE INDEX IF NOT EXISTS idx_nodes_name ON nodes(name);
        CREATE INDEX IF NOT EXISTS idx_nodes_type ON nodes(node_type);
        CREATE INDEX IF NOT EXISTS idx_nodes_source ON nodes(source_id);
        CREATE INDEX IF NOT EXISTS idx_rel_start ON relationships(node_id_start);
        CREATE INDEX IF NOT EXISTS idx_rel_end ON relationships(node_id_end);
        CREATE INDEX IF NOT EXISTS idx_rel_type ON relationships(rel_type);
    """

    print("  Creating indexes...")
    conn.executescript(index_ddl)
    print("  Indexes created")
create_fts function · python · L123-L139 (17 LOC)
scripts/ingest_icij.py
def create_fts(conn: sqlite3.Connection) -> None:
    """Rebuild the ``nodes_fts`` FTS5 table used for name search.

    Any existing ``nodes_fts`` table is dropped, then recreated as an
    external-content FTS5 table over ``nodes`` (keyed by ``rowid``) with the
    ``porter unicode61`` tokenizer, and filled with every node's name. The
    number of indexed entries is printed when done.
    """
    fts_script = """
        DROP TABLE IF EXISTS nodes_fts;
        CREATE VIRTUAL TABLE nodes_fts USING fts5(
            name,
            content='nodes',
            content_rowid='rowid',
            tokenize='porter unicode61'
        );
        INSERT INTO nodes_fts(rowid, name) SELECT rowid, name FROM nodes;
    """

    print("  Building FTS5 index...")
    conn.executescript(fts_script)

    (indexed,) = conn.execute("SELECT COUNT(*) FROM nodes_fts").fetchone()
    print(f"  FTS5 index built: {indexed:,} entries")
ingest_node_csv function · python · L142-L209 (68 LOC)
scripts/ingest_icij.py
def ingest_node_csv(
    conn: sqlite3.Connection, csv_path: str, node_type: str
) -> int:
    """Ingest a single node CSV file. Returns row count."""
    if not os.path.exists(csv_path):
        print(f"    WARNING: {csv_path} not found, skipping")
        return 0

    count = 0
    batch = []

    with open(csv_path, "r", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f, delimiter=",")

        for row in reader:
            node_id = row.get("node_id", "").strip()
            if not node_id:
                continue

            name = row.get("name", "").strip()
            if not name and node_type == "address":
                name = row.get("address", "").strip()
            if not name:
                name = row.get("original_name", "").strip()

            batch.append(
                (
                    node_id,
                    node_type,
                    name,
                    row.get("countries", "").strip(),
                    row.get("country_
ingest_nodes function · python · L212-L229 (18 LOC)
scripts/ingest_icij.py
def ingest_nodes(conn: sqlite3.Connection) -> int:
    """Load the five node CSV files and return the combined row count.

    Each file under ``CSV_DIR`` is handed to ``ingest_node_csv`` together
    with the node type it represents; the per-file counts are summed.
    """
    # (file name, node type) pairs, processed in this order.
    sources = (
        ("nodes-entities.csv", "entity"),
        ("nodes-officers.csv", "officer"),
        ("nodes-addresses.csv", "address"),
        ("nodes-intermediaries.csv", "intermediary"),
        ("nodes-others.csv", "other"),
    )

    row_total = 0
    for csv_name, kind in sources:
        csv_path = os.path.join(CSV_DIR, csv_name)
        print(f"  Ingesting {csv_name} (type={kind})...")
        row_total += ingest_node_csv(conn, csv_path, kind)
    return row_total
ingest_relationships function · python · L232-L287 (56 LOC)
scripts/ingest_icij.py
def ingest_relationships(conn: sqlite3.Connection) -> int:
    """Ingest relationships.csv."""
    csv_path = os.path.join(CSV_DIR, "relationships.csv")
    if not os.path.exists(csv_path):
        print(f"  WARNING: {csv_path} not found")
        return 0

    count = 0
    batch = []

    with open(csv_path, "r", encoding="utf-8-sig") as f:
        reader = csv.DictReader(f, delimiter=",")

        for row in reader:
            start = row.get("node_id_start", "").strip()
            end = row.get("node_id_end", "").strip()
            if not start or not end:
                continue

            batch.append(
                (
                    start,
                    end,
                    row.get("rel_type", "").strip(),
                    row.get("link", "").strip(),
                    row.get("sourceID", "").strip(),
                    row.get("start_date", "").strip(),
                    row.get("end_date", "").strip(),
                )
            )

            
tag_power_players function · python · L290-L357 (68 LOC)
scripts/ingest_icij.py
def tag_power_players(conn: sqlite3.Connection) -> int:
    """Match Power Players by name against officer nodes and tag them."""
    if not os.path.exists(POWER_PLAYERS_PATH):
        print(f"  WARNING: {POWER_PLAYERS_PATH} not found, skipping Power Player tagging")
        return 0

    with open(POWER_PLAYERS_PATH) as f:
        players = json.load(f)

    tagged = 0
    for pp in players:
        name = pp.get("subtitle", "").strip()
        title = pp.get("title", "").strip()
        slug = pp.get("slug", "").strip()
        release = pp.get("release", "").strip()

        if not name:
            continue

        # Get source_id patterns for this release
        source_patterns = RELEASE_TO_SOURCE.get(release, [])

        # Try exact name match first, filtered by source
        matched_ids = []
        if source_patterns:
            placeholders = ",".join("?" for _ in source_patterns)
            rows = conn.execute(
                f"""SELECT node_id FROM nodes
             
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
populate_sources function · python · L360-L385 (26 LOC)
scripts/ingest_icij.py
def populate_sources(conn: sqlite3.Connection) -> None:
    """Populate the sources table mapping source_id to leak names."""
    for source_id, leak_name in SOURCE_TO_LEAK.items():
        conn.execute(
            "INSERT OR IGNORE INTO sources (source_id, leak_name) VALUES (?, ?)",
            (source_id, leak_name),
        )
    conn.commit()

    # Also discover any source_ids in the data not in our map
    unknown = conn.execute(
        """SELECT DISTINCT source_id FROM nodes
           WHERE source_id != '' AND source_id NOT IN (SELECT source_id FROM sources)"""
    ).fetchall()
    if unknown:
        print(f"  Note: {len(unknown)} unmapped source_ids found in data:")
        for row in unknown[:10]:
            print(f"    - {row[0]}")
            # Auto-map based on prefix
            sid = row[0]
            leak_name = sid.split(" - ")[0] if " - " in sid else sid
            conn.execute(
                "INSERT OR IGNORE INTO sources (source_id, leak_name) VALUES (?, ?)"
main function · python · L388-L451 (64 LOC)
scripts/ingest_icij.py
def main():
    print("=" * 60)
    print("ICIJ Offshore Leaks Data Ingestion")
    print("=" * 60)

    # Check CSVs exist
    if not os.path.exists(CSV_DIR):
        print(f"\nERROR: CSV directory not found at {CSV_DIR}")
        print("Run download_icij.py first.")
        sys.exit(1)

    # Remove existing DB
    if os.path.exists(DB_PATH):
        os.remove(DB_PATH)
        print(f"\nRemoved existing database at {DB_PATH}")

    conn = sqlite3.connect(DB_PATH)
    # Performance settings for bulk loading
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA synchronous=OFF")
    conn.execute("PRAGMA cache_size=-512000")  # 512MB cache
    conn.execute("PRAGMA temp_store=MEMORY")

    start = time.time()

    print("\n[1/7] Creating schema...")
    create_schema(conn)

    print("\n[2/7] Ingesting nodes...")
    node_count = ingest_nodes(conn)

    print("\n[3/7] Ingesting relationships...")
    rel_count = ingest_relationships(conn)

    print("\n[4/7] Creating index