Function bodies 33 total
_get_conn function · python · L14-L22 (9 LOC)db/icij_db.py
def _get_conn() -> sqlite3.Connection:
    """Return this thread's SQLite connection, creating it on first use.

    The connection is stored on the thread-local ``_local`` object, so each
    thread opens ``DB_PATH`` once and then reuses it. A fresh connection is
    configured with ``sqlite3.Row`` rows, WAL journaling, a ~128 MB page
    cache and ``query_only`` (statements may read but not modify data).
    """
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = sqlite3.connect(DB_PATH)
        # Publish immediately, before the PRAGMAs run, exactly as before.
        _local.conn = conn
        conn.row_factory = sqlite3.Row
        for pragma in (
            "PRAGMA journal_mode=WAL",
            "PRAGMA cache_size=-128000",  # negative value = size in KiB (~128 MB)
            "PRAGMA query_only=ON",
        ):
            conn.execute(pragma)
    return conn

# search_nodes function · python · L25-L78 (54 LOC)db/icij_db.py
def search_nodes(query: str, limit: int = 30) -> list:
"""FTS5 name search with prefix matching. Returns list of node dicts."""
limit = int(limit)
conn = _get_conn()
# Sanitize: remove FTS5 special characters, add prefix matching
sanitized = "".join(c for c in query if c.isalnum() or c.isspace() or c == "-")
sanitized = sanitized.strip()
if not sanitized:
return []
# Add prefix wildcard for partial matching
terms = sanitized.split()
fts_query = " ".join(f'"{t}"*' for t in terms if t)
rows = conn.execute(
"""SELECT n.node_id, n.name, n.node_type, n.countries, n.country_codes,
n.source_id, n.jurisdiction, n.address, n.company_type,
pp.node_id IS NOT NULL AS is_power_player,
pp.title AS power_player_title,
pp.release AS power_player_release
FROM nodes_fts fts
JOIN nodes n ON n.rowid = fts.rowid
LEFT JOIN power_players pp ONget_node function · python · L81-L112 (32 LOC)db/icij_db.py
def get_node(node_id: str) -> Optional[dict]:
"""Fetch a single node by ID with power player status."""
conn = _get_conn()
r = conn.execute(
"""SELECT n.*, pp.node_id IS NOT NULL AS is_power_player,
pp.title AS power_player_title,
pp.release AS power_player_release
FROM nodes n
LEFT JOIN power_players pp ON pp.node_id = n.node_id
WHERE n.node_id = ?""",
(node_id,),
).fetchone()
if not r:
return None
return {
"node_id": r["node_id"],
"name": r["name"],
"node_type": r["node_type"],
"countries": r["countries"],
"country_codes": r["country_codes"],
"source_id": r["source_id"],
"jurisdiction": r["jurisdiction"],
"address": r["address"],
"company_type": r["company_type"],
"incorporation_date": r["incorporation_date"],
"status": r["status"],
"is_power_player": bool(r["is_power_play_batch_query function · python · L115-L125 (11 LOC)db/icij_db.py
def _batch_query(
    conn,
    sql_template: str,
    ids: list,
    extra_params: list = None,
    batch_size: int = 900,
) -> list:
    """Run ``sql_template`` once per chunk of ``ids`` and concatenate the rows.

    SQLite limits the number of bound parameters per statement (999 by
    default in older builds), so a large ``IN (...)`` list is split into
    chunks and executed as several statements.

    Args:
        conn: An open ``sqlite3`` connection.
        sql_template: SQL containing the literal token ``__PLACEHOLDERS__``,
            which is replaced by ``?,?,...`` sized to each chunk. Because
            ``extra_params`` are bound first, their ``?`` markers must appear
            *before* ``__PLACEHOLDERS__`` in the statement.
        ids: Values bound into the ``IN`` list. Any iterable is accepted
            (list, tuple, set, generator).
        extra_params: Parameters bound ahead of every chunk (repeated for each
            statement). ``None`` or empty means no extra parameters.
        batch_size: Maximum number of ids per statement. The default of 900
            leaves headroom under 999 for ``extra_params``.

    Returns:
        All fetched rows, in chunk order.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    # Normalise to lists so tuples/sets/generators concatenate cleanly with
    # the per-chunk id list below.
    id_list = list(ids)
    prefix = list(extra_params) if extra_params else []
    results = []
    for start in range(0, len(id_list), batch_size):
        chunk = id_list[start : start + batch_size]
        placeholders = ",".join(["?"] * len(chunk))
        sql = sql_template.replace("__PLACEHOLDERS__", placeholders)
        results.extend(conn.execute(sql, prefix + chunk).fetchall())
    return results

# get_subgraph function · python · L128-L344 (217 LOC)db/icij_db.py
def get_subgraph(
center_node_id: str,
max_depth: int = 3,
max_nodes: int = 500,
) -> dict:
max_depth = int(max_depth)
max_nodes = int(max_nodes)
"""BFS from center node, returning nodes with hop distance and edges."""
conn = _get_conn()
# Verify center node exists
center = get_node(center_node_id)
if not center:
return {
"nodes": [],
"edges": [],
"power_players_found": [],
"truncated": False,
"stats": {"total_nodes": 0, "total_edges": 0, "max_depth_reached": 0},
}
visited = set()
seen_names = set() # Deduplicate same person appearing as multiple records
all_nodes = []
all_edges = []
all_edge_keys = set()
power_players_found = []
truncated = False
max_depth_reached = 0
# Add center node at hop 0
center["hop"] = 0
all_nodes.append(center)
visited.add(center_node_id)
seen_names.add((center["name"].lower(), center["nodeget_node_detail function · python · L347-L387 (41 LOC)db/icij_db.py
def get_node_detail(node_id: str) -> Optional[dict]:
"""Fetch full node details with immediate connections and ICIJ URL."""
conn = _get_conn()
node = get_node(node_id)
if not node:
return None
# Get immediate connections
connections = conn.execute(
"""SELECT r.rel_type, r.link, r.source_id, r.start_date, r.end_date,
n.node_id, n.name, n.node_type
FROM relationships r
JOIN nodes n ON n.node_id = CASE
WHEN r.node_id_start = ? THEN r.node_id_end
ELSE r.node_id_start
END
WHERE (r.node_id_start = ? OR r.node_id_end = ?)
AND r.rel_type NOT IN ('same_name_as', 'similar', 'same_company_as',
'same_as', 'same_id_as')
LIMIT 100""",
(node_id, node_id, node_id),
).fetchall()
node["connections"] = [
{
"rel_type": c["rel_type"],
"link": c["link"],
"sourcget_leak_name function · python · L390-L401 (12 LOC)db/icij_db.py
def get_leak_name(source_id: str) -> str:
    """Translate a ``source_id`` into the name of the leak it belongs to.

    The ``sources`` table is consulted first. When it has no row for the id,
    the name is derived from the text before the first ``" - "`` separator
    (or the whole id when there is no separator). Empty ids yield
    ``"Unknown"``.
    """
    if not source_id:
        return "Unknown"
    row = _get_conn().execute(
        "SELECT leak_name FROM sources WHERE source_id = ?", (source_id,)
    ).fetchone()
    if row:
        return row["leak_name"]
    # No stored mapping: fall back to the id's prefix.
    prefix, _sep, _rest = source_id.partition(" - ")
    return prefix

# Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
get_node_url function · python · L404-L406 (3 LOC)db/icij_db.py
def get_node_url(node_id: str) -> str:
    """Build the public ICIJ Offshore Leaks page URL for ``node_id``."""
    return "https://offshoreleaks.icij.org/nodes/{}".format(node_id)

# get_stats function · python · L409-L439 (31 LOC)db/icij_db.py
def get_stats() -> dict:
"""Return database statistics."""
conn = _get_conn()
total = conn.execute("SELECT COUNT(*) FROM nodes").fetchone()[0]
type_counts = {}
for row in conn.execute(
"SELECT node_type, COUNT(*) as cnt FROM nodes GROUP BY node_type"
):
type_counts[row["node_type"]] = row["cnt"]
rel_count = conn.execute("SELECT COUNT(*) FROM relationships").fetchone()[0]
pp_count = conn.execute("SELECT COUNT(*) FROM power_players").fetchone()[0]
leaks = [
row[0]
for row in conn.execute("SELECT DISTINCT leak_name FROM sources ORDER BY leak_name")
]
return {
"total_nodes": total,
"total_entities": type_counts.get("entity", 0),
"total_officers": type_counts.get("officer", 0),
"total_addresses": type_counts.get("address", 0),
"total_intermediaries": type_counts.get("intermediary", 0),
"total_others": type_counts.get("other", 0),
"total_relationships": rel_load_pp_json function · python · L447-L453 (7 LOC)db/icij_db.py
def _load_pp_json():
    """Return Power Players metadata keyed by slug, loading the file once.

    On first call, reads ``<module dir>/../data/power_players.json``, which
    is expected to be a JSON list of objects with a ``"slug"`` field. The
    resulting ``{slug: entry}`` mapping is memoised in the module-level
    ``_pp_json_cache``, and later calls return that same dict.

    Raises:
        FileNotFoundError: If the JSON file does not exist.
        KeyError: If an entry has no ``"slug"`` key.
    """
    global _pp_json_cache
    if _pp_json_cache is None:
        pp_path = os.path.join(os.path.dirname(__file__), "..", "data", "power_players.json")
        # JSON text is UTF-8 by spec (RFC 8259); don't depend on the locale's
        # default encoding, which differs across platforms.
        with open(pp_path, encoding="utf-8") as f:
            _pp_json_cache = {p["slug"]: p for p in json.load(f)}
    return _pp_json_cache

# _compute_pp_connectivity function · python · L456-L486 (31 LOC)db/icij_db.py
def _compute_pp_connectivity(pps: list) -> list:
"""Pre-compute which PP pairs are connected. Cached after first call."""
global _pp_connectivity_cache
if _pp_connectivity_cache is not None:
return _pp_connectivity_cache
# Check for cached file first
cache_path = os.path.join(os.path.dirname(__file__), "..", "data", "pp_connections.json")
if os.path.exists(cache_path):
with open(cache_path) as f:
_pp_connectivity_cache = json.load(f)
return _pp_connectivity_cache
# Compute all pairs
connected = []
for i in range(len(pps)):
for j in range(i + 1, len(pps)):
r = find_path(pps[i]["node_ids"][0], pps[j]["node_ids"][0], max_depth=8)
if r["found"]:
connected.append({
"slug_a": pps[i]["slug"],
"slug_b": pps[j]["slug"],
"hops": r["hops"],
})
# Cache to file for next time
with open(cache_pathget_power_players function · python · L489-L529 (41 LOC)db/icij_db.py
def get_power_players() -> list:
"""Return all Power Players that have DB entries, with image URLs from JSON."""
conn = _get_conn()
pp_json = _load_pp_json()
rows = conn.execute(
"""SELECT slug, GROUP_CONCAT(node_id) as node_ids, name, title, release
FROM power_players GROUP BY slug ORDER BY name"""
).fetchall()
players = []
for r in rows:
slug = r["slug"]
json_entry = pp_json.get(slug, {})
players.append({
"slug": slug,
"name": r["name"],
"title": r["title"],
"release": r["release"],
"image": json_entry.get("image", ""),
"node_ids": r["node_ids"].split(","),
"icij_link": json_entry.get("link", f"/power-players/{slug}"),
})
# Compute connectivity (cached after first call)
connected_pairs = _compute_pp_connectivity(players)
# Mark which PPs have connections and build adjacency
connectable_slugs = set()
find_path function · python · L543-L700 (158 LOC)db/icij_db.py
def find_path(start_id: str, end_id: str, max_depth: int = 6) -> dict:
"""Bidirectional BFS between two node IDs. Returns shortest path."""
max_depth = int(max_depth)
start_id = str(start_id)
end_id = str(end_id)
conn = _get_conn()
if start_id == end_id:
node = get_node(start_id)
if node:
node["hop"] = 0
return {"found": True, "path_nodes": [node], "path_edges": [], "hops": 0}
return {"found": False, "path_nodes": [], "path_edges": [], "hops": 0}
# Forward BFS state (from start)
fwd_visited = {start_id: None}
fwd_frontier = [start_id]
fwd_edge_map = {} # node_id -> (parent_id, rel_type, link, source_id)
# Backward BFS state (from end)
bwd_visited = {end_id: None}
bwd_frontier = [end_id]
bwd_edge_map = {}
meeting_node = None
for depth in range(1, max_depth + 1):
if not fwd_frontier and not bwd_frontier:
break
# Expand the smaller frontier (orfind_connections function · python · L703-L752 (50 LOC)db/icij_db.py
def find_connections(node_ids: list, max_depth: int = 6) -> dict:
"""Find paths between all pairs of node IDs, merge into one subgraph."""
max_depth = int(max_depth)
node_ids = [str(nid) for nid in node_ids]
all_nodes = {} # node_id -> node dict
all_edges = []
all_edge_keys = set()
paths_found = []
paths_not_found = []
# All unique pairs
for i in range(len(node_ids)):
for j in range(i + 1, len(node_ids)):
result = find_path(node_ids[i], node_ids[j], max_depth=max_depth)
if result["found"]:
paths_found.append({
"start_id": node_ids[i],
"end_id": node_ids[j],
"hops": result["hops"],
"path_node_ids": [n["node_id"] for n in result["path_nodes"]],
})
for node in result["path_nodes"]:
all_nodes[node["node_id"]] = node
for edge in result["path_edges"]:
get_cached_summary function · python · L759-L770 (12 LOC)db/icij_db.py
def get_cached_summary(start_id: str, end_id: str) -> str:
    """Return the cached LLM summary for a node pair, or ``""`` if none.

    The cache at ``_SUMMARY_CACHE_PATH`` is a JSON object keyed by
    ``"<start_id>:<end_id>"``. The pair is treated as unordered: when the
    forward key is absent, the reversed key is tried.

    A missing, unreadable or malformed cache file (including one whose top
    level is not a JSON object) is treated as a cache miss rather than an
    error, because callers can always regenerate the summary.
    """
    try:
        with open(_SUMMARY_CACHE_PATH, encoding="utf-8") as f:
            cache = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file. ValueError covers
        # json.JSONDecodeError and UnicodeDecodeError. Either way, a miss.
        return ""
    if not isinstance(cache, dict):
        return ""
    key = f"{start_id}:{end_id}"
    if key in cache:
        return cache[key]
    return cache.get(f"{end_id}:{start_id}", "")

# Repobility — the code-quality scanner for AI-generated software · https://repobility.com
save_cached_summary function · python · L773-L785 (13 LOC)db/icij_db.py
def save_cached_summary(start_id: str, end_id: str, summary: str) -> None:
    """Persist an LLM summary so we never pay for the same call twice.

    Stores ``summary`` under ``"<start_id>:<end_id>"`` in the JSON object at
    ``_SUMMARY_CACHE_PATH``, keeping all other entries. If the existing file
    is missing, unreadable, malformed, or not a JSON object, the cache is
    rebuilt from empty.

    The file is rewritten atomically: the data goes to a temp file in the
    same directory, which is then ``os.replace``-d over the cache. An
    interrupted write therefore can never leave truncated JSON behind. Such
    a file would make every cached summary unreadable, and the next save
    would wipe them all.

    Raises:
        OSError: If the updated cache cannot be written.
    """
    import tempfile

    key = f"{start_id}:{end_id}"
    try:
        with open(_SUMMARY_CACHE_PATH, encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # Missing/unreadable file or invalid JSON: start from an empty cache.
        loaded = None
    cache = loaded if isinstance(loaded, dict) else {}
    cache[key] = summary

    cache_dir = os.path.dirname(os.path.abspath(_SUMMARY_CACHE_PATH))
    fd, tmp_path = tempfile.mkstemp(dir=cache_dir, prefix=".summary-", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(cache, f, indent=2)
        os.replace(tmp_path, _SUMMARY_CACHE_PATH)
    except BaseException:
        # Don't leave an orphaned temp file next to the cache on failure.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

# get_neighbor_options function · python · L788-L824 (37 LOC)db/icij_db.py
def get_neighbor_options(node_id: str) -> list:
"""Get all meaningful neighbors of a node with metadata for agent decision-making."""
conn = _get_conn()
node_id = str(node_id)
rows = conn.execute(
"""SELECT
CASE WHEN r.node_id_start = ? THEN r.node_id_end ELSE r.node_id_start END as neighbor_id,
r.rel_type,
n.name, n.node_type, n.countries, n.jurisdiction,
pp.node_id IS NOT NULL as is_pp,
pp.title as pp_title
FROM relationships r
JOIN nodes n ON n.node_id = CASE WHEN r.node_id_start = ? THEN r.node_id_end ELSE r.node_id_start END
LEFT JOIN power_players pp ON pp.node_id = n.node_id
WHERE (r.node_id_start = ? OR r.node_id_end = ?)
AND r.rel_type NOT IN ('same_name_as','similar','same_company_as','same_as',
'same_id_as','similar_company_as','probably_same_officer_as',
generate_investigation_log function · python · L827-L915 (89 LOC)db/icij_db.py
def generate_investigation_log(path_result: dict) -> list:
"""Generate a step-by-step decision log for a path, showing what the agent
considered and why it chose each direction."""
log = []
for path_info in path_result.get("paths_found", []):
path_ids = path_info.get("path_node_ids", [])
if len(path_ids) < 2:
continue
# Get full node info for path
path_nodes_map = {n["node_id"]: n for n in path_result["nodes"] if n["node_id"] in path_ids}
start = path_nodes_map.get(path_ids[0], {})
end = path_nodes_map.get(path_ids[-1], {})
log.append({
"type": "start",
"icon": "search",
"text": f"Starting investigation from {start.get('name', '?')}",
"detail": f"{start.get('power_player_title') or start.get('node_type', '')} — searching for path to {end.get('name', '?')}",
})
for i in range(len(path_ids) - 1):
current_id = path_ids[i]
expand_subgraph function · python · L918-L962 (45 LOC)db/icij_db.py
def expand_subgraph(result: dict, expand_depth: int = 1, max_extra: int = 8) -> dict:
"""Expand a path result by adding neighbors of each path node.
Turns a thin chain into a rich network."""
conn = _get_conn()
existing_ids = {n["node_id"] for n in result["nodes"]}
extra_nodes = {}
extra_edges = []
extra_edge_keys = {(e["source"], e["target"], e["rel_type"]) for e in result["edges"]}
for node in result["nodes"]:
nid = node["node_id"]
# Get neighbors not already in the graph
neighbors = get_neighbor_options(nid)
added = 0
for nb in neighbors:
if nb["node_id"] in existing_ids or nb["node_id"] in extra_nodes:
continue
if added >= max_extra:
break
# Fetch full node details
full_node = get_node(nb["node_id"])
if not full_node:
continue
full_node["hop"] = node.get("hop", 0) + 1
extra_nodes[build_path_description function · python · L965-L985 (21 LOC)db/icij_db.py
def build_path_description(path_nodes: list, path_edges: list) -> str:
    """Render a path as one line of text suitable for an LLM prompt.

    The first node is written as ``Name (type, place)``. Each following node
    is prefixed with the relationship that leads into it, e.g.
    ``--(officer of)--> Name (type, place)``. ``place`` is the node's
    jurisdiction, falling back to its countries, and is omitted when both
    are empty. Nodes without a matching edge (when ``path_edges`` is shorter
    than ``len(path_nodes) - 1``) are left out. Returns ``""`` for an empty
    path.
    """
    if not path_nodes:
        return ""

    def describe(node: dict) -> str:
        # Jurisdiction wins; countries is the fallback when it is empty/absent.
        place = node.get("jurisdiction", "") or node.get("countries", "")
        extra = f", {place}" if place else ""
        return f"{node['name']} ({node['node_type']}{extra})"

    remaining = iter(path_nodes)
    segments = [describe(next(remaining))]
    # Pair each later node with the edge leading into it. Stop as soon as
    # either sequence runs out.
    for edge, node in zip(path_edges, remaining):
        target = describe(node)
        relation = edge["rel_type"].replace("_", " ")
        segments.append(f"--({relation})--> {target}")
    return " ".join(segments)

# download_file function · python · L19-L47 (29 LOC)scripts/download_icij.py
def download_file(url: str, dest: str, label: str) -> None:
"""Download a file with progress reporting."""
if os.path.exists(dest) and os.path.getsize(dest) > 1000:
print(f" {label}: already exists at {dest}, skipping download")
return
print(f" {label}: downloading from {url}")
req = urllib.request.Request(url, headers={"User-Agent": "ICIJ-Explorer/1.0"})
response = urllib.request.urlopen(req)
total = int(response.headers.get("Content-Length", 0))
downloaded = 0
chunk_size = 1024 * 256 # 256KB chunks
with open(dest, "wb") as f:
while True:
chunk = response.read(chunk_size)
if not chunk:
break
f.write(chunk)
downloaded += len(chunk)
if total > 0:
pct = downloaded * 100 // total
mb = downloaded / (1024 * 1024)
total_mb = total / (1024 * 1024)
print(f"\r {mb:.1f}/{total_mb:.1f} MB (download_csv_data function · python · L50-L67 (18 LOC)scripts/download_icij.py
def download_csv_data() -> str:
    """Fetch the ICIJ CSV archive and unpack it, reusing earlier work.

    Ensures ``DATA_DIR`` exists and downloads ``CSV_URL`` to ``ZIP_PATH``
    via ``download_file``, which skips the download when a sufficiently
    large file is already there. The zip is extracted into ``CSV_DIR``
    unless that directory already holds at least six entries. Always
    returns ``CSV_DIR``.
    """
    os.makedirs(DATA_DIR, exist_ok=True)
    download_file(CSV_URL, ZIP_PATH, "ICIJ CSV data")

    # NOTE(review): six is presumably the five node CSVs plus
    # relationships.csv; a partially extracted directory with >= 6 entries
    # would also be treated as complete. TODO confirm.
    if not (os.path.exists(CSV_DIR) and len(os.listdir(CSV_DIR)) >= 6):
        print(f" Extracting to {CSV_DIR}...")
        os.makedirs(CSV_DIR, exist_ok=True)
        with zipfile.ZipFile(ZIP_PATH, "r") as archive:
            archive.extractall(CSV_DIR)
        extracted = sorted(os.listdir(CSV_DIR))
        print(f" Extracted {len(extracted)} files: {', '.join(extracted)}")
    else:
        print(f" CSV files already extracted to {CSV_DIR}")
    return CSV_DIR

# fetch_power_players function · python · L70-L99 (30 LOC)scripts/download_icij.py
def fetch_power_players() -> list:
"""Fetch Power Players JSON list. Returns list of power player dicts."""
os.makedirs(DATA_DIR, exist_ok=True)
if os.path.exists(POWER_PLAYERS_PATH):
print(f" Power Players: already exists at {POWER_PLAYERS_PATH}")
with open(POWER_PLAYERS_PATH) as f:
data = json.load(f)
print(f" {len(data)} power players loaded from cache")
return data
print(f" Power Players: fetching from {POWER_PLAYERS_URL}")
req = urllib.request.Request(
POWER_PLAYERS_URL, headers={"User-Agent": "ICIJ-Explorer/1.0"}
)
response = urllib.request.urlopen(req)
raw = response.read().decode("utf-8")
data = json.loads(raw)
with open(POWER_PLAYERS_PATH, "w") as f:
json.dump(data, f, indent=2)
print(f" {len(data)} power players saved to {POWER_PLAYERS_PATH}")
# Show a sample entry
if data:
sample = data[0]
print(f" Sample entry: {json.dumps(sample, indent=Repobility · severity-and-effort ranking · https://repobility.com
main function · python · L102-L117 (16 LOC)scripts/download_icij.py
def main():
    """Script entry point: download the ICIJ CSV bundle and the Power Players list."""
    rule = "=" * 60

    print(rule)
    print("ICIJ Offshore Leaks Data Download")
    print(rule)

    print("\n[1/2] Downloading CSV data (~70 MB)...")
    csv_dir = download_csv_data()

    print("\n[2/2] Fetching Power Players list...")
    players = fetch_power_players()

    print("\n" + rule)
    print("Download complete!")
    print(f" CSV directory: {csv_dir}")
    print(f" Power Players: {len(players)} entries")
    print(rule)

# create_schema function · python · L56-L104 (49 LOC)scripts/ingest_icij.py
def create_schema(conn: sqlite3.Connection) -> None:
"""Create all tables, indexes, and FTS5."""
conn.executescript(
"""
DROP TABLE IF EXISTS nodes;
DROP TABLE IF EXISTS relationships;
DROP TABLE IF EXISTS power_players;
DROP TABLE IF EXISTS sources;
CREATE TABLE nodes (
node_id TEXT PRIMARY KEY,
node_type TEXT NOT NULL,
name TEXT NOT NULL DEFAULT '',
countries TEXT DEFAULT '',
country_codes TEXT DEFAULT '',
jurisdiction TEXT DEFAULT '',
source_id TEXT DEFAULT '',
address TEXT DEFAULT '',
company_type TEXT DEFAULT '',
incorporation_date TEXT DEFAULT '',
status TEXT DEFAULT ''
);
CREATE TABLE relationships (
id INTEGER PRIMARY KEY AUTOINCREMENT,
node_id_start TEXT NOT NULL,
node_id_end TEXT NOT NULL,
create_indexes function · python · L107-L120 (14 LOC)scripts/ingest_icij.py
def create_indexes(conn: sqlite3.Connection) -> None:
    """Add the lookup indexes used by queries once bulk loading is finished.

    Indexes cover ``nodes`` (name, node_type, source_id) and
    ``relationships`` (node_id_start, node_id_end, rel_type). Every
    statement uses ``IF NOT EXISTS``, so running this again is harmless.
    Building them after the inserts avoids maintaining them row by row.
    """
    index_ddl = """
CREATE INDEX IF NOT EXISTS idx_nodes_name ON nodes(name);
CREATE INDEX IF NOT EXISTS idx_nodes_type ON nodes(node_type);
CREATE INDEX IF NOT EXISTS idx_nodes_source ON nodes(source_id);
CREATE INDEX IF NOT EXISTS idx_rel_start ON relationships(node_id_start);
CREATE INDEX IF NOT EXISTS idx_rel_end ON relationships(node_id_end);
CREATE INDEX IF NOT EXISTS idx_rel_type ON relationships(rel_type);
"""
    print(" Creating indexes...")
    conn.executescript(index_ddl)
    print(" Indexes created")

# create_fts function · python · L123-L139 (17 LOC)scripts/ingest_icij.py
def create_fts(conn: sqlite3.Connection) -> None:
    """(Re)build the FTS5 full-text index over ``nodes.name``.

    Any existing ``nodes_fts`` table is dropped. It is then recreated as an
    external-content FTS5 table (``content='nodes'``, keyed by ``rowid``) and
    filled with every node's rowid and name. Tokenisation is
    ``porter unicode61``. The resulting row count is printed at the end.
    """
    fts_script = """
DROP TABLE IF EXISTS nodes_fts;
CREATE VIRTUAL TABLE nodes_fts USING fts5(
name,
content='nodes',
content_rowid='rowid',
tokenize='porter unicode61'
);
INSERT INTO nodes_fts(rowid, name) SELECT rowid, name FROM nodes;
"""
    print(" Building FTS5 index...")
    conn.executescript(fts_script)
    # NOTE(review): per the FTS5 docs, full-table scans of an external-content
    # table are answered from the content table, so this count likely mirrors
    # the row count of `nodes` rather than verifying the index itself.
    (entry_count,) = conn.execute("SELECT COUNT(*) FROM nodes_fts").fetchone()
    print(f" FTS5 index built: {entry_count:,} entries")

# ingest_node_csv function · python · L142-L209 (68 LOC)scripts/ingest_icij.py
def ingest_node_csv(
conn: sqlite3.Connection, csv_path: str, node_type: str
) -> int:
"""Ingest a single node CSV file. Returns row count."""
if not os.path.exists(csv_path):
print(f" WARNING: {csv_path} not found, skipping")
return 0
count = 0
batch = []
with open(csv_path, "r", encoding="utf-8-sig") as f:
reader = csv.DictReader(f, delimiter=",")
for row in reader:
node_id = row.get("node_id", "").strip()
if not node_id:
continue
name = row.get("name", "").strip()
if not name and node_type == "address":
name = row.get("address", "").strip()
if not name:
name = row.get("original_name", "").strip()
batch.append(
(
node_id,
node_type,
name,
row.get("countries", "").strip(),
row.get("country_ingest_nodes function · python · L212-L229 (18 LOC)scripts/ingest_icij.py
def ingest_nodes(conn: sqlite3.Connection) -> int:
    """Load the five ICIJ node CSVs from ``CSV_DIR``; return the total row count.

    Each file is passed to ``ingest_node_csv`` together with the
    ``node_type`` value its rows receive. Files are processed in a fixed
    order: entities, officers, addresses, intermediaries, others.
    """
    node_files = (
        ("nodes-entities.csv", "entity"),
        ("nodes-officers.csv", "officer"),
        ("nodes-addresses.csv", "address"),
        ("nodes-intermediaries.csv", "intermediary"),
        ("nodes-others.csv", "other"),
    )
    grand_total = 0
    for csv_name, kind in node_files:
        print(f" Ingesting {csv_name} (type={kind})...")
        grand_total += ingest_node_csv(conn, os.path.join(CSV_DIR, csv_name), kind)
    return grand_total

# ingest_relationships function · python · L232-L287 (56 LOC)scripts/ingest_icij.py
def ingest_relationships(conn: sqlite3.Connection) -> int:
"""Ingest relationships.csv."""
csv_path = os.path.join(CSV_DIR, "relationships.csv")
if not os.path.exists(csv_path):
print(f" WARNING: {csv_path} not found")
return 0
count = 0
batch = []
with open(csv_path, "r", encoding="utf-8-sig") as f:
reader = csv.DictReader(f, delimiter=",")
for row in reader:
start = row.get("node_id_start", "").strip()
end = row.get("node_id_end", "").strip()
if not start or not end:
continue
batch.append(
(
start,
end,
row.get("rel_type", "").strip(),
row.get("link", "").strip(),
row.get("sourceID", "").strip(),
row.get("start_date", "").strip(),
row.get("end_date", "").strip(),
)
)
tag_power_players function · python · L290-L357 (68 LOC)scripts/ingest_icij.py
def tag_power_players(conn: sqlite3.Connection) -> int:
"""Match Power Players by name against officer nodes and tag them."""
if not os.path.exists(POWER_PLAYERS_PATH):
print(f" WARNING: {POWER_PLAYERS_PATH} not found, skipping Power Player tagging")
return 0
with open(POWER_PLAYERS_PATH) as f:
players = json.load(f)
tagged = 0
for pp in players:
name = pp.get("subtitle", "").strip()
title = pp.get("title", "").strip()
slug = pp.get("slug", "").strip()
release = pp.get("release", "").strip()
if not name:
continue
# Get source_id patterns for this release
source_patterns = RELEASE_TO_SOURCE.get(release, [])
# Try exact name match first, filtered by source
matched_ids = []
if source_patterns:
placeholders = ",".join("?" for _ in source_patterns)
rows = conn.execute(
f"""SELECT node_id FROM nodes
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
populate_sources function · python · L360-L385 (26 LOC)scripts/ingest_icij.py
def populate_sources(conn: sqlite3.Connection) -> None:
"""Populate the sources table mapping source_id to leak names."""
for source_id, leak_name in SOURCE_TO_LEAK.items():
conn.execute(
"INSERT OR IGNORE INTO sources (source_id, leak_name) VALUES (?, ?)",
(source_id, leak_name),
)
conn.commit()
# Also discover any source_ids in the data not in our map
unknown = conn.execute(
"""SELECT DISTINCT source_id FROM nodes
WHERE source_id != '' AND source_id NOT IN (SELECT source_id FROM sources)"""
).fetchall()
if unknown:
print(f" Note: {len(unknown)} unmapped source_ids found in data:")
for row in unknown[:10]:
print(f" - {row[0]}")
# Auto-map based on prefix
sid = row[0]
leak_name = sid.split(" - ")[0] if " - " in sid else sid
conn.execute(
"INSERT OR IGNORE INTO sources (source_id, leak_name) VALUES (?, ?)"main function · python · L388-L451 (64 LOC)scripts/ingest_icij.py
def main():
print("=" * 60)
print("ICIJ Offshore Leaks Data Ingestion")
print("=" * 60)
# Check CSVs exist
if not os.path.exists(CSV_DIR):
print(f"\nERROR: CSV directory not found at {CSV_DIR}")
print("Run download_icij.py first.")
sys.exit(1)
# Remove existing DB
if os.path.exists(DB_PATH):
os.remove(DB_PATH)
print(f"\nRemoved existing database at {DB_PATH}")
conn = sqlite3.connect(DB_PATH)
# Performance settings for bulk loading
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=OFF")
conn.execute("PRAGMA cache_size=-512000") # 512MB cache
conn.execute("PRAGMA temp_store=MEMORY")
start = time.time()
print("\n[1/7] Creating schema...")
create_schema(conn)
print("\n[2/7] Ingesting nodes...")
node_count = ingest_nodes(conn)
print("\n[3/7] Ingesting relationships...")
rel_count = ingest_relationships(conn)
print("\n[4/7] Creating index