Function bodies 1,000 total
graph_overview function · python · L81-L99 (19 LOC)examples/mcp_server.py
def graph_overview(
    types: list[str] | None = None,
    connections: bool | list[str] | None = None,
    cypher: bool | list[str] | None = None,
) -> str:
    """Get graph schema, connection details, or Cypher language reference.

    Three independent axes — call with no args first for the overview:
      graph_overview() — inventory of node types
      graph_overview(types=["Type"]) — property schemas, samples
      graph_overview(connections=True) — all connection types
      graph_overview(connections=["CITES"]) — deep-dive with properties
      graph_overview(cypher=True) — Cypher reference
      graph_overview(cypher=["temporal","MATCH"]) — detailed docs with examples
    """
    # NOTE(review): the docstring wording is kept as-is because it may be
    # surfaced to callers as the tool description — confirm before rewording.
    selectors = {"types": types, "connections": connections, "cypher": cypher}
    try:
        overview = graph.describe(**selectors)
    except Exception as exc:
        # Any failure is reported as text instead of being propagated.
        return f"Error: {exc}"
    return overview
# Next listing: cypher_query function · python · L103-L120 (18 LOC) · examples/mcp_server.py
def cypher_query(query: str) -> str:
    """Run a Cypher query against the knowledge graph. Returns up to 200 rows.

    Supports MATCH, WHERE, RETURN, ORDER BY, LIMIT, WITH, OPTIONAL MATCH,
    UNWIND, UNION, CREATE, SET, DELETE, MERGE, aggregations, path patterns,
    CALL procedures (pagerank, louvain, etc.), and spatial/temporal functions.
    Call graph_overview() first if you need the schema."""
    row_cap = 200
    try:
        result = graph.cypher(query)
        total = len(result)
        if total == 0:
            return "No results."
        # One line per row, each rendered as the str() of a plain dict.
        body = "\n".join(str(dict(row)) for row in result[:row_cap])
        # The header always reports the full row count, even when truncated.
        suffix = " (showing first 200)" if total > row_cap else ""
        return f"{total} row(s){suffix}:\n{body}"
    except Exception as exc:
        # Query and formatting failures alike are returned as text.
        return f"Cypher error: {exc}"
# Next listing: bug_report function · python · L124-L129 (6 LOC) · examples/mcp_server.py
def bug_report(query: str, result: str, expected: str, description: str) -> str:
    """File a Cypher bug report to reported_bugs.md."""
    report_fields = (query, result, expected, description)
    try:
        outcome = graph.bug_report(*report_fields)
    except Exception as exc:
        # Failures are returned as text rather than raised.
        return f"Error: {exc}"
    return outcome
# Next listing: from_blueprint function · python · L28-L69 (42 LOC) · kglite/blueprint/loader.py
def from_blueprint(
blueprint_path: Union[str, Path],
*,
verbose: bool = False,
save: bool = True,
) -> kglite.KnowledgeGraph:
"""Parse a JSON blueprint and build a KnowledgeGraph from CSV files.
Args:
blueprint_path: Path to the blueprint JSON file.
verbose: If True, print progress information.
save: If True and the blueprint specifies an output path, save the graph.
Returns:
A populated KnowledgeGraph.
Raises:
FileNotFoundError: If the blueprint file is missing.
ValueError: If the blueprint JSON is malformed.
"""
blueprint_path = Path(blueprint_path)
if not blueprint_path.exists():
raise FileNotFoundError(f"Blueprint file not found: {blueprint_path}")
with open(blueprint_path) as f:
raw = json.load(f)
loader = BlueprintLoader(raw, verbose=verbose)
graph = loader.build()
if loader.errors:
print(f"\nBlueprint loaded with {len(loader.errors)} warning(BlueprintLoader.__init__ method · python · L75-L100 (26 LOC)kglite/blueprint/loader.py
def __init__(self, raw: dict[str, Any], verbose: bool = False):
    """Set up loader state from an already-parsed blueprint mapping.

    Args:
        raw: Blueprint content. Only the optional ``"settings"`` and
            ``"nodes"`` keys are read here; the mapping itself is kept on
            ``self.raw``.
        verbose: Stored on ``self.verbose``.
    """
    self.raw, self.verbose = raw, verbose
    self.graph = kglite.KnowledgeGraph()
    self.errors: list[dict[str, str]] = []

    # Settings: "root" defaults to the current directory; "output", when
    # present and truthy, is resolved relative to that root.
    blueprint_settings = raw.get("settings", {})
    self.root = Path(blueprint_settings.get("root", "."))
    output_rel = blueprint_settings.get("output")
    if output_rel:
        self.output_path = self.root / output_rel
    else:
        self.output_path = None

    # Node specs keyed by type name.
    self.nodes: dict[str, dict[str, Any]] = raw.get("nodes", {})

    # CSV cache: relative path -> DataFrame.
    self._csv_cache: dict[str, pd.DataFrame] = {}

    # Node types that have been loaded so far (for edge creation).
    self._loaded_types: set[str] = set()

    # Per-type counters, filled in while loading.
    self.stats: dict[str, Any] = {"nodes_by_type": {}, "edges_by_type": {}}
# Next listing: BlueprintLoader.build method · python · L102-L140 (39 LOC) · kglite/blueprint/loader.py
def build(self) -> kglite.KnowledgeGraph:
"""Execute the full loading sequence."""
t0 = time.time()
if self.verbose:
print(f"Loading blueprint...")
print(f" Root: {self.root}")
# Collect all node specs (core + sub_nodes) with their parent info
core_specs, sub_specs = self._collect_specs()
# Phase 1: Manual nodes (no csv)
self._load_manual_nodes(core_specs, sub_specs)
# Phase 2: Core nodes (with csv)
self._load_nodes(core_specs, phase_name="core nodes")
# Phase 3: Sub-nodes
self._load_nodes(sub_specs, phase_name="sub-nodes")
# Phase 3b: Register parent types for supporting node tiers
for sub in sub_specs:
sub_type = sub["_node_type"]
parent = sub["_parent_type"]
if sub_type in self._loaded_types:
self.graph.set_parent_type(sub_type, parent)
# Phase 4: FK edges
self._load_fk_edges(corBlueprintLoader._collect_specs method · python · L144-L168 (25 LOC)kglite/blueprint/loader.py
def _collect_specs(
    self,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Split ``self.nodes`` into core specs and sub-node specs.

    Every returned spec is a shallow copy, so the blueprint mapping itself is
    not modified.

    Returns:
        ``(core, subs)`` where

        * ``core`` holds one copy per top-level node type, tagged with
          ``"_node_type"``. A spec whose ``"csv"`` is missing or None is also
          tagged ``"_is_manual": True``; one that already carries a truthy
          ``"_is_manual"`` and has no csv is left out of ``core``.
        * ``subs`` holds one copy per ``"sub_nodes"`` entry, tagged with
          ``"_node_type"``, ``"_parent_type"`` and ``"_parent_pk"`` (the
          parent's ``"pk"``, defaulting to ``"id"``).
    """
    core_specs = []
    sub_specs = []
    for type_name, raw_spec in self.nodes.items():
        annotated = {**raw_spec, "_node_type": type_name}

        csv_backed = annotated.get("csv") is not None
        premarked = bool(annotated.get("_is_manual"))
        if not csv_backed and not premarked:
            # Nodes without csv are manual — handled separately.
            annotated["_is_manual"] = True
        if csv_backed or not premarked:
            core_specs.append(annotated)

        # Sub-nodes are collected for every top-level spec, whether or not
        # the parent itself ended up in the core list.
        parent_pk = annotated.get("pk", "id")
        sub_specs.extend(
            {
                **child_spec,
                "_node_type": child_type,
                "_parent_type": type_name,
                "_parent_pk": parent_pk,
            }
            for child_type, child_spec in annotated.get("sub_nodes", {}).items()
        )
    return core_specs, sub_specs
BlueprintLoader._load_manual_nodes method · python · L172-L238 (67 LOC)kglite/blueprint/loader.py
def _load_manual_nodes(
self,
core_specs: list[dict[str, Any]],
sub_specs: list[dict[str, Any]],
) -> None:
"""Create nodes for types without a CSV, from distinct FK values."""
all_specs = core_specs + sub_specs
manual_types = {
s["_node_type"]: s for s in core_specs if s.get("_is_manual")
}
if not manual_types:
return
if self.verbose:
print(" Loading manual nodes...")
for manual_type, manual_spec in manual_types.items():
distinct_values: set[Any] = set()
# Scan all FK edges across all specs to find references to this type
for spec in all_specs:
for _edge_type, edge_def in (
spec.get("connections", {}).get("fk_edges", {}).items()
):
if edge_def.get("target") != manual_type:
continue
csv_path = spec.get("csv")
BlueprintLoader._load_nodes method · python · L242-L341 (100 LOC)kglite/blueprint/loader.py
def _load_nodes(
self, specs: list[dict[str, Any]], phase_name: str
) -> None:
"""Load nodes from CSV files."""
loadable = [s for s in specs if not s.get("_is_manual") and s.get("csv")]
if not loadable:
return
if self.verbose:
print(f" Loading {phase_name}...")
for spec in loadable:
node_type = spec["_node_type"]
csv_path = spec["csv"]
try:
df = self._read_csv(csv_path)
except FileNotFoundError:
self._report_error(node_type, f"CSV not found: {csv_path}")
continue
# Apply filter
filt = spec.get("filter")
if filt:
df = self._apply_filter(df, filt)
# Handle pk: "auto"
pk = spec.get("pk", "id")
if pk == "auto":
pk = f"_{node_type}_id"
df = df.copy()
df[pk] = range(1, len(df) + BlueprintLoader._load_fk_edges method · python · L345-L445 (101 LOC)kglite/blueprint/loader.py
def _load_fk_edges(self, all_specs: list[dict[str, Any]]) -> None:
"""Create edges from FK columns in source CSVs.
Also handles core nodes with a ``parent`` key — these get an
implicit FK edge to their parent type.
"""
has_edges = any(
spec.get("connections", {}).get("fk_edges")
or spec.get("parent")
for spec in all_specs
)
if not has_edges:
return
if self.verbose:
print(" Loading FK edges...")
for spec in all_specs:
node_type = spec["_node_type"]
csv_path = spec.get("csv")
fk_edges = spec.get("connections", {}).get("fk_edges", {})
# Core nodes with "parent" key get an implicit FK edge
parent_type = spec.get("parent")
parent_fk = spec.get("parent_fk")
if parent_type and parent_fk:
edge_type = f"OF_{parent_type.upper()}"
fk_edges = diBlueprintLoader._load_junction_edges method · python · L449-L527 (79 LOC)kglite/blueprint/loader.py
def _load_junction_edges(self, all_specs: list[dict[str, Any]]) -> None:
"""Create edges from junction (many-to-many) CSVs."""
has_junctions = any(
spec.get("connections", {}).get("junction_edges")
for spec in all_specs
)
if not has_junctions:
return
if self.verbose:
print(" Loading junction edges...")
for spec in all_specs:
node_type = spec["_node_type"]
junction_edges = (
spec.get("connections", {}).get("junction_edges", {})
)
for edge_type, junc_def in junction_edges.items():
csv_path = junc_def.get("csv")
if not csv_path:
self._report_error(
node_type,
f"Junction edge {edge_type} missing 'csv' key",
)
continue
try:
df = self._read_csv(csv_pathBlueprintLoader._read_csv method · python · L531-L542 (12 LOC)kglite/blueprint/loader.py
def _read_csv(self, relative_path: str) -> pd.DataFrame:
    """Load a CSV located under ``self.root``, reading each path only once.

    Args:
        relative_path: Path of the CSV relative to ``self.root``; also the
            cache key.

    Returns:
        A copy of the cached DataFrame, so callers can mutate the result
        without affecting later reads.

    Raises:
        FileNotFoundError: If the path is not cached and the file is missing.
    """
    cached = self._csv_cache.get(relative_path)
    if cached is None:
        csv_file = self.root / relative_path
        if not csv_file.exists():
            raise FileNotFoundError(f"CSV file not found: {csv_file}")
        cached = pd.read_csv(csv_file, low_memory=False)
        self._csv_cache[relative_path] = cached
    return cached.copy()
# Next listing: BlueprintLoader._apply_filter method · python · L544-L574 (31 LOC) · kglite/blueprint/loader.py
def _apply_filter(
self, df: pd.DataFrame, filt: dict[str, Any]
) -> pd.DataFrame:
"""Apply filters to a DataFrame.
Supports two forms:
- Simple equality: ``{"column": value}``
- Operator dict: ``{"column": {"!=": value}}``
Supported operators: ``=``, ``!=``, ``>``, ``<``, ``>=``, ``<=``.
"""
for col, val in filt.items():
if col not in df.columns:
continue
if isinstance(val, dict):
for op, operand in val.items():
if op == "!=":
df = df[df[col] != operand]
elif op == ">":
df = df[df[col] > operand]
elif op == "<":
df = df[df[col] < operand]
elif op == ">=":
df = df[df[col] >= operand]
elif op == "<=":
df = df[df[col] <= operand]
BlueprintLoader._build_column_types method · python · L576-L588 (13 LOC)kglite/blueprint/loader.py
def _build_column_types(
    self, properties: dict[str, str]
) -> dict[str, str]:
    """Translate blueprint property types into a KGLite ``column_types`` dict.

    Args:
        properties: Mapping of column name to blueprint type name.

    Returns:
        Column name -> mapped type for every column whose blueprint type has
        a truthy entry in ``_TYPE_MAP``. Columns typed with a member of
        ``_SPATIAL_TYPES`` are left out, as are types missing from
        ``_TYPE_MAP``.
    """
    # Spatial types are handled separately via set_spatial, so they are
    # filtered out before the type map is consulted.
    return {
        column: kglite_type
        for column, declared in properties.items()
        if declared not in _SPATIAL_TYPES
        and (kglite_type := _TYPE_MAP.get(declared))
    }
# Next listing: BlueprintLoader._convert_geometry method · python · L590-L645 (56 LOC) · kglite/blueprint/loader.py
def _convert_geometry(
self,
df: pd.DataFrame,
properties: dict[str, str],
node_type: str,
) -> pd.DataFrame:
"""Convert _geometry GeoJSON column to WKT + centroid lat/lon."""
try:
from shapely.geometry import shape as shapely_shape
except ImportError:
raise ImportError(
"Blueprint uses geometry/location types which require shapely. "
"Install with: pip install shapely"
) from None
if "_geometry" not in df.columns:
self._report_error(
node_type,
"Blueprint uses geometry types but CSV has no '_geometry' column",
)
return df
df = df.copy()
# Find which columns need which geometry-derived values
wkt_col = None
lat_col = None
lon_col = None
for col, typ in properties.items():
if typ == "geometry":
wkt_col = All rows above produced by Repobility · https://repobility.com
BlueprintLoader._apply_spatial_config method · python · L647-L668 (22 LOC)kglite/blueprint/loader.py
def _apply_spatial_config(
    self, node_type: str, properties: dict[str, str]
) -> None:
    """Call ``self.graph.set_spatial()`` from the blueprint property types.

    Args:
        node_type: Node type passed through to ``set_spatial``.
        properties: Mapping of column name to blueprint type name. Columns
            typed ``"location.lat"``, ``"location.lon"`` and ``"geometry"``
            are picked up; if several columns share a type, the last wins.

    ``location`` is passed as a ``(lat, lon)`` tuple only when both columns
    were found, otherwise as None. ``geometry`` is the geometry column or None.
    """
    columns_by_role = {
        "location.lat": None,
        "location.lon": None,
        "geometry": None,
    }
    for column, declared in properties.items():
        if declared in columns_by_role:
            columns_by_role[declared] = column

    latitude = columns_by_role["location.lat"]
    longitude = columns_by_role["location.lon"]
    if latitude and longitude:
        point = (latitude, longitude)
    else:
        point = None

    self.graph.set_spatial(
        node_type,
        location=point,
        geometry=columns_by_role["geometry"],
    )
# Next listing: BlueprintLoader._prepare_timeseries method · python · L670-L737 (68 LOC) · kglite/blueprint/loader.py
def _prepare_timeseries(
self,
df: pd.DataFrame,
ts_config: dict[str, Any],
skip: list[str],
) -> tuple[dict[str, Any], pd.DataFrame]:
"""Prepare timeseries parameter for add_nodes() inline loading.
Renames DataFrame columns from CSV names to channel names.
Returns the timeseries dict and the (possibly modified) DataFrame.
"""
df = df.copy()
time_key = ts_config["time_key"]
channels_map = ts_config.get("channels", {})
resolution = ts_config.get("resolution")
units = ts_config.get("units", {})
# time_key can be a string (single column) or dict (composite)
if isinstance(time_key, str):
ts_time = time_key
else:
# Dict: {"year": "col_y", "month": "col_m"} -> use as-is for add_nodes
ts_time = time_key
# Auto-skip rows where any time component is 0 (e.g. month=0 annual
# totals). A zero month/day/hour _parse_all function · python · L41-L57 (17 LOC)kglite/code_tree/builder.py
def _parse_all(src_root: Path, verbose: bool = False) -> tuple[ParseResult, frozenset[str]]:
    """Parse ``src_root`` with every parser that applies to it.

    Args:
        src_root: Directory handed to ``get_parsers_for_directory`` and to
            each parser's ``parse_directory``.
        verbose: If True, print one line per parser before it runs.

    Returns:
        The merged ``ParseResult`` and the union of all parsers'
        ``noise_names`` as a frozenset.

    Raises:
        FileNotFoundError: If no parser is returned for ``src_root``.
    """
    available = get_parsers_for_directory(src_root)
    if not available:
        raise FileNotFoundError(
            f"No supported source files found in {src_root}"
        )

    merged = ParseResult()
    noise: set[str] = set()
    for lang_parser in available:
        if verbose:
            print(f"Parsing {lang_parser.language_name} files...")
        merged.merge(lang_parser.parse_directory(src_root))
        noise.update(lang_parser.noise_names)
    return merged, frozenset(noise)
# Next listing: _reprefix function · python · L60-L70 (11 LOC) · kglite/code_tree/builder.py
def _reprefix(value: str, old_prefix: str, new_prefix: str, sep: str) -> str:
    """Swap the leading module path of a qualified name.

    ``value`` is rewritten only when it equals ``old_prefix`` or starts with
    ``old_prefix`` followed by ``sep``. Anything else (e.g. a short name such
    as a bare base-class name) is returned unchanged.
    """
    if value == old_prefix:
        return new_prefix
    cut = len(old_prefix)
    head, tail = value[:cut], value[cut:]
    # Requiring the separator right after the prefix keeps "pkgx.mod" from
    # matching the prefix "pkg".
    if head == old_prefix and tail.startswith(sep):
        return new_prefix + tail
    return value
# Next listing: _parse_all_roots function · python · L73-L206 (134 LOC) · kglite/code_tree/builder.py
def _parse_all_roots(
project_root: Path,
source_roots: list[SourceRoot],
verbose: bool = False,
) -> tuple[ParseResult, frozenset[str]]:
"""Parse source files from specific source roots (manifest-guided)."""
combined = ParseResult()
all_noise: set[str] = set()
parsed_dirs: list[Path] = [] # track parsed roots to detect overlaps
for root in source_roots:
if not root.path.is_dir():
if verbose:
print(f" Skipping missing source root: {root.path}")
continue
# Skip if this root is a subdirectory of an already-parsed root
# (e.g. xarray/tests/ is inside xarray/). Instead, just apply
# the is_test flag to matching entities already in combined.
already_covered = any(
root.path != d and root.path.is_relative_to(d)
for d in parsed_dirs
)
if already_covered:
if root.is_test:
rel_prefix = str(root.path.relative_to(p_get_separator function · python · L212-L217 (6 LOC)kglite/code_tree/builder.py
def _get_separator(language: str) -> str:
    """Return the qualified-name separator used for ``language``.

    ``"::"`` for rust/cpp, ``"."`` for python/java/csharp, and ``"/"`` for
    everything else (go, c, typescript, javascript).
    """
    separators = {
        "rust": "::",
        "cpp": "::",
        "python": ".",
        "java": ".",
        "csharp": ".",
    }
    return separators.get(language, "/")
# Next listing: _build_modules function · python · L220-L255 (36 LOC) · kglite/code_tree/builder.py
def _build_modules(files: list[FileInfo]) -> list[dict]:
"""Build module nodes from file module paths and submodule declarations."""
modules = {}
# Map module_path → file_path from parsed files
file_module_paths: dict[str, str] = {}
for f in files:
file_module_paths[f.module_path] = f.path
for f in files:
sep = _get_separator(f.language)
path = f.module_path
if path not in modules:
parts = path.split(sep)
modules[path] = {
"qualified_name": path,
"name": parts[-1] if parts else path,
"path": f.path,
"language": f.language,
}
for sub_name in f.submodule_declarations:
child_path = f"{path}{sep}{sub_name}"
if child_path not in modules:
# Try to resolve path from a parsed file with this module_path
child_file_path = file_module_paths.get(child_path, "")
if _build_contains_edges function · python · L258-L268 (11 LOC)kglite/code_tree/builder.py
def _build_contains_edges(files: list[FileInfo]) -> list[dict]:
    """Build Module CONTAINS Module edges from submodule declarations.

    Returns:
        One ``{"parent", "child"}`` dict per entry in each file's
        ``submodule_declarations``. The child path is the file's module path
        joined to the declared name with that file's language separator.
    """
    edges = []
    for info in files:
        sep = _get_separator(info.language)
        parent_path = info.module_path
        edges.extend(
            {"parent": parent_path, "child": f"{parent_path}{sep}{declared}"}
            for declared in info.submodule_declarations
        )
    return edges
_infer_lang_group function · python · L271-L277 (7 LOC)kglite/code_tree/builder.py
def _infer_lang_group(qualified_name: str) -> str:
    """Guess the language group from the separator found in a qualified name.

    ``"::"`` is checked before ``"/"``; a name containing neither falls into
    ``"python_java"``.
    """
    for marker, group in (("::", "rust_cpp"), ("/", "go_ts_js")):
        if marker in qualified_name:
            return group
    return "python_java"
# Next listing: _build_call_edges function · python · L280-L402 (123 LOC) · kglite/code_tree/builder.py
def _build_call_edges(all_functions: list[FunctionInfo],
max_targets: int = 5,
excluded_names: frozenset[str] = frozenset()) -> pd.DataFrame:
"""Resolve function calls using tiered scope-aware name matching.
Resolution priority (first match wins):
1. Receiver hint — "Receiver.method" narrows to targets whose owner matches
2. Same owner — caller and target share the same qualified_name prefix
3. Same file — caller and target are defined in the same source file
4. Same language — caller and target use the same separator convention
5. Global fallback — all targets with matching bare name
Names in excluded_names are skipped (common stdlib methods).
Names with more than max_targets definitions are skipped as too ambiguous.
Note: Receiver hints are syntactic (field names, not resolved types).
A call like ``self.inner.method()`` produces hint ``"inner"``, which
won't match a target owned b_build_type_relationship_edges function · python · L405-L475 (71 LOC)kglite/code_tree/builder.py
def _build_type_relationship_edges(
type_rels: list[TypeRelationship],
known_interfaces: set[str],
name_to_qname: dict[str, str],
) -> tuple[list[dict], list[dict], list[dict], list[dict]]:
"""Build implements, extends, and has_method edges from TypeRelationships.
Returns (implements_edges, extends_edges, has_method_edges, external_traits).
External traits are those referenced in impl blocks but not defined locally.
"""
implements_edges = []
extends_edges = []
has_method_edges = []
seen_impl = set()
seen_ext = set()
external_traits: dict[str, dict] = {} # name -> node dict
def resolve(name: str) -> str:
return name_to_qname.get(name, name)
for tr in type_rels:
if tr.relationship == "inherent":
for method in tr.methods:
for sep in ("::", "."):
if sep in method.qualified_name:
owner = method.qualified_name.rsplit(sep, 1)[0]
_build_import_edges function · python · L478-L493 (16 LOC)kglite/code_tree/builder.py
def _build_import_edges(files: list[FileInfo], known_modules: set[str]) -> list[dict]:
    """Build File IMPORTS Module edges from import declarations.

    Each import path is matched against ``known_modules`` by trying its
    longest prefix first and dropping one trailing segment at a time. The
    first hit yields a single ``{"file_path", "module"}`` edge; imports with
    no known prefix yield nothing.
    """
    edges = []
    for info in files:
        sep = _get_separator(info.language)
        for imported in info.imports:
            segments = imported.split(sep)
            prefixes = (
                sep.join(segments[:length])
                for length in range(len(segments), 0, -1)
            )
            resolved = next((p for p in prefixes if p in known_modules), None)
            if resolved is not None:
                edges.append({"file_path": info.path, "module": resolved})
    return edges
# Next listing: _build_defines_edges function · python · L496-L530 (35 LOC) · kglite/code_tree/builder.py
def _build_defines_edges(result: ParseResult) -> list[dict]:
"""Build File DEFINES item edges."""
edges = []
for fn in result.functions:
if not fn.is_method:
edges.append({
"file_path": fn.file_path,
"item_qname": fn.qualified_name,
"item_type": "Function",
})
for cls in result.classes:
edges.append({
"file_path": cls.file_path,
"item_qname": cls.qualified_name,
"item_type": NODE_TYPE_MAP[cls.kind],
})
for enum in result.enums:
edges.append({
"file_path": enum.file_path,
"item_qname": enum.qualified_name,
"item_type": "Enum",
})
for iface in result.interfaces:
edges.append({
"file_path": iface.file_path,
"item_qname": iface.qualified_name,
"item_type": NODE_TYPE_MAP[iface.kind],
})
for const in result.constants:
edges._add_typed_connections function · python · L806-L831 (26 LOC)kglite/code_tree/builder.py
def _add_typed_connections(graph, df, conn_type, source_field, target_field,
classes, interfaces):
"""Add connections where source/target types depend on the entity kind."""
src_type_map = {}
for c in classes:
src_type_map[c.name] = NODE_TYPE_MAP[c.kind]
src_type_map[c.qualified_name] = NODE_TYPE_MAP[c.kind]
tgt_type_map = {}
for i in interfaces:
tgt_type_map[i.name] = NODE_TYPE_MAP[i.kind]
tgt_type_map[i.qualified_name] = NODE_TYPE_MAP[i.kind]
groups: dict[tuple[str, str], list] = defaultdict(list)
for _, row in df.iterrows():
src_nt = src_type_map.get(row[source_field], "Struct")
tgt_nt = tgt_type_map.get(row[target_field], "Trait")
groups[(src_nt, tgt_nt)].append(row.to_dict())
for (src_nt, tgt_nt), rows in groups.items():
sub_df = pd.DataFrame(rows)
graph.add_connections(
data=sub_df, connection_type=conn_type,
source_type=src_add_typed_connections_same function · python · L834-L854 (21 LOC)kglite/code_tree/builder.py
def _add_typed_connections_same(graph, df, conn_type, source_field, target_field,
                                classes):
    """Add connections between same-type entities (e.g. Class EXTENDS Class).

    Rows of ``df`` are grouped by the (source node type, target node type)
    pair looked up from ``classes`` — by short name or qualified name, with
    ``"Class"`` as the fallback for unknown endpoints. ``graph.add_connections``
    is then called once per group.
    """
    kind_of = {}
    for cls in classes:
        node_type = NODE_TYPE_MAP[cls.kind]
        for key in (cls.name, cls.qualified_name):
            kind_of[key] = node_type

    buckets: dict[tuple[str, str], list] = {}
    for _, row in df.iterrows():
        endpoint_types = (
            kind_of.get(row[source_field], "Class"),
            kind_of.get(row[target_field], "Class"),
        )
        buckets.setdefault(endpoint_types, []).append(row.to_dict())

    for (source_type, target_type), records in buckets.items():
        graph.add_connections(
            data=pd.DataFrame(records), connection_type=conn_type,
            source_type=source_type, source_id_field=source_field,
            target_type=target_type, target_id_field=target_field,
        )
# Next listing: _add_has_method_connections function · python · L857-L898 (42 LOC) · kglite/code_tree/builder.py
def _add_has_method_connections(graph, df, classes, interfaces=None):
"""Add HAS_METHOD connections, routing through correct source node type."""
type_map = {}
for c in classes:
type_map[c.name] = NODE_TYPE_MAP[c.kind]
type_map[c.qualified_name] = NODE_TYPE_MAP[c.kind]
if interfaces:
for i in interfaces:
type_map[i.name] = NODE_TYPE_MAP[i.kind]
type_map[i.qualified_name] = NODE_TYPE_MAP[i.kind]
schema = graph.schema()
# Pick a default type from what's available
default_type = None
for candidate in ("Class", "Struct", "Trait", "Interface", "Protocol"):
if candidate in schema["node_types"]:
default_type = candidate
break
if default_type is None:
return
groups: dict[str, list] = defaultdict(list)
for _, row in df.iterrows():
owner = row["owner"]
for sep in ("::", "."):
if sep in owner:
name = owner.rsplit(sep, 1)[Repobility · open methodology · https://repobility.com/research/
_build_uses_type_edges function · python · L901-L956 (56 LOC)kglite/code_tree/builder.py
def _build_uses_type_edges(
functions: list[FunctionInfo],
classes: list,
enums: list,
interfaces: list,
) -> dict[str, list[dict]]:
"""Build USES_TYPE edges from functions to types referenced in signatures.
Scans each function's parameters and return_type for known type names,
producing (Function)-[:USES_TYPE]->(Struct|Class|Enum|Trait|...) edges.
"""
# Collect all known type names → (qualified_name, node_type)
type_lookup: dict[str, tuple[str, str]] = {}
for c in classes:
if len(c.name) > 1: # skip single-char generics
type_lookup[c.name] = (c.qualified_name, NODE_TYPE_MAP[c.kind])
for e in enums:
if len(e.name) > 1:
type_lookup[e.name] = (e.qualified_name, "Enum")
for i in interfaces:
if len(i.name) > 1:
type_lookup[i.name] = (i.qualified_name, NODE_TYPE_MAP[i.kind])
if not type_lookup:
return {}
# Build regex matching any known type name as a whole wor_build_ffi_exposes_edges function · python · L959-L1002 (44 LOC)kglite/code_tree/builder.py
def _build_ffi_exposes_edges(result: ParseResult) -> list[dict]:
"""Build EXPOSES edges from #[pymodule] functions to #[pyclass]/#[pyfunction] items.
Connects the FFI module entry point to the types and functions it registers,
showing the cross-language boundary for Maturin/PyO3 projects.
"""
# Find pymodule functions
pymodule_fns = [f for f in result.functions
if f.metadata.get("is_pymodule")]
if not pymodule_fns:
return []
# Find all pyclass structs and pyfunction functions
pyclass_qnames = {}
for c in result.classes:
if c.metadata.get("is_pyclass"):
py_name = c.metadata.get("py_name", c.name)
pyclass_qnames[c.name] = (c.qualified_name, "Struct", py_name)
pyfunction_qnames = {}
for f in result.functions:
if f.metadata.get("ffi_kind") == "pyo3" and not f.is_method and not f.metadata.get("is_pymodule"):
py_name = f.metadata.get("py_name", f.name)
build function · python · L1008-L1149 (142 LOC)kglite/code_tree/builder.py
def build(
src_dir: str | Path,
*,
save_to: str | Path | None = None,
verbose: bool = False,
include_tests: bool = True,
) -> kglite.KnowledgeGraph:
"""Parse a codebase and build a KGLite knowledge graph.
If a project manifest (pyproject.toml, Cargo.toml) is found, uses it
to discover source roots and extract project metadata. Otherwise
falls back to scanning the entire directory.
Args:
src_dir: Path to a source directory or manifest file.
save_to: Optional path to save the graph as a .kgl file.
verbose: If True, print progress information.
include_tests: If True, also parse test directories found in the
manifest. Has no effect when no manifest is detected.
Returns:
A KnowledgeGraph populated with code entities and relationships.
Raises:
FileNotFoundError: If src_dir doesn't exist or contains no
supported files.
Example::
from kglite.code_tree impoLanguageParser.parse_directory method · python · L38-L48 (11 LOC)kglite/code_tree/parsers/base.py
def parse_directory(self, src_root: Path) -> ParseResult:
    """Parse every file under ``src_root`` matching ``self.file_extensions``.

    Files are collected per extension with a recursive glob, each
    extension's matches sorted, and parsed in that order via
    ``self.parse_file``. A "Found N ... files" line is always printed.

    Returns:
        A single ``ParseResult`` with every per-file result merged in.
    """
    merged = ParseResult()
    matched = [
        path
        for ext in self.file_extensions
        for path in sorted(src_root.rglob(f"*{ext}"))
    ]
    print(f" Found {len(matched)} {self.language_name} files")
    for path in matched:
        merged.merge(self.parse_file(path, src_root))
    return merged
# Next listing: get_type_parameters function · python · L64-L81 (18 LOC) · kglite/code_tree/parsers/base.py
def get_type_parameters(node, source: bytes,
                        node_type: str = "type_parameters") -> str | None:
    """Return the generic/template parameter text of a declaration node.

    Args:
        node: Declaration node whose direct children are searched.
        source: Source bytes that the child's byte offsets index into.
        node_type: Child node type to look for (e.g. ``"type_parameters"``,
            ``"type_parameter_list"``).

    Returns:
        The text of the first matching child with one surrounding ``<...>``
        or ``[...]`` pair removed and the inside stripped, or None when there
        is no such child or the resulting text is empty.
    """
    match = next((c for c in node.children if c.type == node_type), None)
    if match is None:
        return None

    text = source[match.start_byte:match.end_byte].decode("utf8")
    for opener, closer in (("<", ">"), ("[", "]")):
        if text.startswith(opener) and text.endswith(closer):
            text = text[1:-1].strip()
            break
    return text or None
# Next listing: extract_parameters_from_signature function · python · L87-L111 (25 LOC) · kglite/code_tree/parsers/base.py
def extract_parameters_from_signature(signature: str) -> str | None:
    """Pull the parameter list out of a function signature string.

    The first ``(`` and its balancing ``)`` delimit the list. The text in
    between is split on commas, entries found in ``_SELF_PARAMS`` (and empty
    entries) are dropped, and the rest is re-joined with ``", "``.

    Returns:
        The cleaned parameter text, or None when there is no ``(``, the
        parentheses never balance, or nothing is left after filtering.
    """
    open_at = signature.find("(")
    if open_at == -1:
        return None

    # Scan forward for the ")" that closes the first "(". If it is never
    # found, close_at stays at open_at and the slice below is empty.
    close_at = open_at
    nesting = 0
    for pos, char in enumerate(signature[open_at:], start=open_at):
        if char == "(":
            nesting += 1
        elif char == ")":
            nesting -= 1
            if nesting == 0:
                close_at = pos
                break

    inner = signature[open_at + 1:close_at].strip()
    if not inner:
        return None

    kept = []
    for piece in inner.split(","):
        piece = piece.strip()
        if piece and piece not in _SELF_PARAMS:
            kept.append(piece)
    return ", ".join(kept) or None
# Next listing: extract_comment_annotations function · python · L120-L145 (26 LOC) · kglite/code_tree/parsers/base.py
def extract_comment_annotations(
    root_node,
    source: bytes,
    comment_types: tuple[str, ...] = ("line_comment", "block_comment", "comment"),
) -> list[dict] | None:
    """Scan all comment nodes under ``root_node`` for TODO/FIXME/etc annotations.

    Args:
        root_node: Root of the syntax tree to scan; every descendant is
            visited.
        source: Source bytes that node byte offsets index into.
        comment_types: Node types treated as comments.

    Returns:
        A list of dicts with keys ``kind`` (upper-cased first group of
        ``_ANNOTATION_PATTERN``), ``text`` (second group, stripped and capped
        at 200 characters) and ``line`` (1-based start line of the comment
        node), in pre-order. Returns None if no annotations are found.
    """
    annotations: list[dict] = []

    # Walk with an explicit stack rather than recursion: deeply nested trees
    # could otherwise exceed Python's recursion limit and raise
    # RecursionError. Pushing children in reverse keeps the visit order
    # identical to a recursive pre-order walk.
    pending = [root_node]
    while pending:
        node = pending.pop()
        if node.type in comment_types:
            text = source[node.start_byte:node.end_byte].decode("utf8")
            line = node.start_point[0] + 1
            for match in _ANNOTATION_PATTERN.finditer(text):
                annotations.append({
                    "kind": match.group(1).upper(),
                    "text": match.group(2).strip()[:200],
                    "line": line,
                })
        pending.extend(reversed(node.children))

    return annotations if annotations else None
# Next listing: _BaseCCppParser._get_name method · python · L58-L64 (7 LOC) · kglite/code_tree/parsers/cpp.py
def _get_name(self, node, source: bytes,
              name_type: str = "identifier") -> str | None:
    """Return the text of the first name-like direct child of ``node``.

    A child qualifies when its type is ``name_type``, ``"type_identifier"``
    or ``"field_identifier"``. Returns None when no child qualifies.
    """
    accepted = (name_type, "type_identifier", "field_identifier")
    hit = next((c for c in node.children if c.type in accepted), None)
    return None if hit is None else node_text(hit, source)
_BaseCCppParser._get_doc_comment method · python · L66-L99 (34 LOC)kglite/code_tree/parsers/cpp.py
def _get_doc_comment(self, node, source: bytes) -> str | None:
"""Walk backward to collect /** */ or /// doc comments."""
sibling = node.prev_named_sibling
if sibling and sibling.type == "comment":
text = node_text(sibling, source).strip()
# Block comment: /** ... */
if text.startswith("/**"):
text = text[3:]
if text.endswith("*/"):
text = text[:-2]
lines = []
for line in text.split("\n"):
line = line.strip()
if line.startswith("* "):
line = line[2:]
elif line.startswith("*"):
line = line[1:]
lines.append(line)
return "\n".join(lines).strip()
# Line comment: ///
if text.startswith("///"):
doc_lines = []
while sibling is not None and sibling.typ_BaseCCppParser._get_signature method · python · L101-L107 (7 LOC)kglite/code_tree/parsers/cpp.py
def _get_signature(self, node, source: bytes) -> str:
    """Join the text of ``node``'s children that precede its body.

    Children are taken in order up to, but excluding, the first
    ``compound_statement`` or ``field_declaration_list``; their texts are
    joined with single spaces.
    """
    body_types = ("compound_statement", "field_declaration_list")
    children = node.children
    cutoff = next(
        (idx for idx, child in enumerate(children) if child.type in body_types),
        len(children),
    )
    return " ".join(node_text(child, source) for child in children[:cutoff])
# Next listing: _BaseCCppParser._get_return_type method · python · L109-L120 (12 LOC) · kglite/code_tree/parsers/cpp.py
def _get_return_type(self, node, source: bytes) -> str | None:
    """Return the return type: the first type specifier before the declarator.

    Direct children of ``node`` are scanned in order until a
    ``function_declarator``, ``identifier`` or ``pointer_declarator`` is
    reached. ``type_qualifier`` children seen before the type specifier are
    prefixed to it, so a leading qualifier no longer hides the actual type
    name (previously the bare qualifier text was returned on its own).

    Returns:
        The type text, with any leading qualifiers joined by single spaces.
        If only qualifiers precede the declarator, the qualifiers alone;
        None if nothing type-like is found.
    """
    declarator_types = ("function_declarator", "identifier",
                        "pointer_declarator")
    specifier_types = ("primitive_type", "type_identifier",
                       "sized_type_specifier", "struct_specifier",
                       "enum_specifier", "union_specifier")
    qualifiers: list[str] = []
    for child in node.children:
        if child.type in declarator_types:
            break
        if child.type == "type_qualifier":
            # Remember the qualifier and keep looking for the type it applies to.
            qualifiers.append(node_text(child, source))
        elif child.type in specifier_types:
            return " ".join([*qualifiers, node_text(child, source)])
    # No type specifier before the declarator: fall back to what was seen.
    return " ".join(qualifiers) or None
# Next listing: _BaseCCppParser._has_storage_class method · python · L122-L128 (7 LOC) · kglite/code_tree/parsers/cpp.py
def _has_storage_class(self, node, source: bytes,
                       specifier: str) -> bool:
    """Tell whether ``node`` has a storage-class child equal to ``specifier``.

    Only direct children of type ``storage_class_specifier`` are compared,
    by their exact source text.
    """
    return any(
        child.type == "storage_class_specifier"
        and node_text(child, source) == specifier
        for child in node.children
    )
# Next listing: _BaseCCppParser._extract_calls method · python · L134-L179 (46 LOC) · kglite/code_tree/parsers/cpp.py
def _extract_calls(self, body_node, source: bytes) -> list[tuple[str, int]]:
"""Extract function/method names called directly within a block.
Emits qualified calls where possible: "Receiver.method" for
field/member access and "Type.method" for scoped identifiers.
Returns list of (call_name, line_number) tuples.
Scope-aware: does not descend into nested functions or lambdas —
their calls belong to them, not the parent.
"""
calls: list[tuple[str, int]] = []
def walk(node):
if node.type == "call_expression":
line = node.start_point[0] + 1
func = node.children[0] if node.children else None
if func:
if func.type == "identifier":
calls.append((node_text(func, source), line))
elif func.type == "field_expression":
field = func.child_by_field_name("field")
_BaseCCppParser._file_to_module_path method · python · L181-L191 (11 LOC)kglite/code_tree/parsers/cpp.py
def _file_to_module_path(self, filepath: Path, src_root: Path) -> str:
    """Turn a source file path into a ``/``-separated module path.

    The path is made relative to ``src_root`` and a recognised C/C++
    extension is removed from the final segment. Files with any other
    extension keep their full name.
    """
    segments = list(filepath.relative_to(src_root).parts)
    filename = segments[-1]
    known_suffixes = (".c", ".h", ".cpp", ".cc", ".cxx",
                      ".hpp", ".hh", ".hxx")
    matched = next((s for s in known_suffixes if filename.endswith(s)), None)
    if matched is not None:
        segments[-1] = filename[:-len(matched)]
    return "/".join(segments)
# Next listing: _BaseCCppParser._extract_struct_fields method · python · L193-L232 (40 LOC) · kglite/code_tree/parsers/cpp.py
def _extract_struct_fields(self, node, source: bytes,
owner_qname: str,
rel_path: str) -> list[AttributeInfo]:
"""Extract fields from a struct/union field_declaration_list."""
attrs: list[AttributeInfo] = []
for child in node.children:
if child.type == "field_declaration_list":
for field in child.children:
if field.type == "field_declaration":
type_ann = None
names: list[str] = []
for fc in field.children:
if fc.type in ("primitive_type",
"type_identifier",
"sized_type_specifier",
"struct_specifier",
"enum_specifier",
"union_specifier"):
_BaseCCppParser._get_enum_variants method · python · L234-L243 (10 LOC)kglite/code_tree/parsers/cpp.py
def _get_enum_variants(self, node, source: bytes) -> list[str]:
    """List the enumerator names declared in an enum node.

    Looks inside each ``enumerator_list`` child of ``node`` and resolves each
    ``enumerator`` entry through ``self._get_name``. Entries without a name
    are skipped.
    """
    return [
        label
        for container in node.children
        if container.type == "enumerator_list"
        for entry in container.children
        if entry.type == "enumerator"
        and (label := self._get_name(entry, source))
    ]
_BaseCCppParser._parse_function method · python · L247-L302 (56 LOC)kglite/code_tree/parsers/cpp.py
def _parse_function(self, node, source: bytes, module_path: str,
rel_path: str, is_method: bool = False,
owner: str | None = None) -> FunctionInfo:
"""Parse a function_definition or declaration node."""
# Extract name from the declarator
name = "unknown"
declarator = None
for child in node.children:
if child.type in ("function_declarator", "pointer_declarator"):
declarator = child
break
if declarator:
# Unwrap pointer_declarator
while declarator and declarator.type == "pointer_declarator":
for c in declarator.children:
if c.type == "function_declarator":
declarator = c
break
else:
break
if declarator:
fn_name = self._get_name(declarator, source)
if fn_name:
_BaseCCppParser._parse_struct method · python · L304-L324 (21 LOC)kglite/code_tree/parsers/cpp.py
def _parse_struct(self, node, source: bytes, module_path: str,
                  rel_path: str, result: ParseResult):
    """Record a struct_specifier (standalone or in type_definition).

    Appends a ``ClassInfo`` of kind ``"struct"`` to ``result.classes`` and the
    struct's fields to ``result.attributes``. Does nothing when the node has
    no name.
    """
    struct_name = self._get_name(node, source, "type_identifier")
    if struct_name:
        # Qualified names use "::" for C++ and "/" otherwise.
        if self.language_name == "cpp":
            scope_sep = "::"
        else:
            scope_sep = "/"
        qualified = f"{module_path}{scope_sep}{struct_name}"

        # Tree positions are 0-based rows; stored line numbers are 1-based.
        first_line = node.start_point[0] + 1
        last_line = node.end_point[0] + 1
        struct_info = ClassInfo(
            name=struct_name,
            qualified_name=qualified,
            kind="struct",
            visibility="public",
            file_path=rel_path,
            line_number=first_line,
            end_line=last_line,
            docstring=self._get_doc_comment(node, source),
        )
        result.classes.append(struct_info)

        fields = self._extract_struct_fields(node, source, qualified, rel_path)
        result.attributes.extend(fields)
# Next listing: _BaseCCppParser._parse_enum method · python · L326-L342 (17 LOC) · kglite/code_tree/parsers/cpp.py
def _parse_enum(self, node, source: bytes, module_path: str,
                rel_path: str, result: ParseResult):
    """Record an enum_specifier.

    Appends an ``EnumInfo`` (name, qualified name, location, doc comment and
    variant names) to ``result.enums``. Does nothing when the node has no
    name.
    """
    enum_name = self._get_name(node, source, "type_identifier")
    if enum_name:
        # Qualified names use "::" for C++ and "/" otherwise.
        if self.language_name == "cpp":
            scope_sep = "::"
        else:
            scope_sep = "/"

        # Tree positions are 0-based rows; stored line numbers are 1-based.
        first_line = node.start_point[0] + 1
        last_line = node.end_point[0] + 1
        enum_info = EnumInfo(
            name=enum_name,
            qualified_name=f"{module_path}{scope_sep}{enum_name}",
            visibility="public",
            file_path=rel_path,
            line_number=first_line,
            end_line=last_line,
            docstring=self._get_doc_comment(node, source),
            variants=self._get_enum_variants(node, source),
        )
        result.enums.append(enum_info)