← back to kkollsga__kglite

Function bodies 1,000 total

All specs Real LLM only Function bodies
graph_overview function · python · L81-L99 (19 LOC)
examples/mcp_server.py
def graph_overview(
    types: list[str] | None = None,
    connections: bool | list[str] | None = None,
    cypher: bool | list[str] | None = None,
) -> str:
    """Get graph schema, connection details, or Cypher language reference.

    Three independent axes — call with no args first for the overview:
      graph_overview()                            — inventory of node types
      graph_overview(types=["Type"])              — property schemas, samples
      graph_overview(connections=True)            — all connection types
      graph_overview(connections=["CITES"])        — deep-dive with properties
      graph_overview(cypher=True)                 — Cypher reference
      graph_overview(cypher=["temporal","MATCH"])  — detailed docs with examples
    """
    # NOTE(review): the docstring above is kept verbatim; it presumably doubles
    # as the tool description shown to clients — confirm before rewording.
    try:
        description = graph.describe(
            types=types,
            connections=connections,
            cypher=cypher,
        )
    except Exception as exc:
        # Failures are reported as text rather than raised to the caller.
        return f"Error: {exc}"
    return description
cypher_query function · python · L103-L120 (18 LOC)
examples/mcp_server.py
def cypher_query(query: str) -> str:
    """Run a Cypher query against the knowledge graph. Returns up to 200 rows.

    Supports MATCH, WHERE, RETURN, ORDER BY, LIMIT, WITH, OPTIONAL MATCH,
    UNWIND, UNION, CREATE, SET, DELETE, MERGE, aggregations, path patterns,
    CALL procedures (pagerank, louvain, etc.), and spatial/temporal functions.
    Call graph_overview() first if you need the schema."""
    # NOTE(review): the docstring above is kept verbatim; it presumably doubles
    # as the tool description shown to clients — confirm before rewording.
    try:
        result = graph.cypher(query)
        total = len(result)
        if total == 0:
            return "No results."

        # Only the first 200 rows are rendered, one dict repr per line.
        rendered = "\n".join(str(dict(row)) for row in result[:200])

        # The header always reports the full row count, noting any truncation.
        truncated_note = " (showing first 200)" if total > 200 else ""
        return f"{total} row(s)" + truncated_note + ":\n" + rendered
    except Exception as exc:
        # Any failure (query, len, slicing, row conversion) becomes a message.
        return f"Cypher error: {exc}"
bug_report function · python · L124-L129 (6 LOC)
examples/mcp_server.py
def bug_report(query: str, result: str, expected: str, description: str) -> str:
    """File a Cypher bug report to reported_bugs.md."""
    # NOTE(review): the docstring above is kept verbatim; it presumably doubles
    # as the tool description shown to clients — confirm before rewording.
    try:
        outcome = graph.bug_report(query, result, expected, description)
    except Exception as exc:
        # Failures are reported as text rather than raised to the caller.
        return f"Error: {exc}"
    return outcome
from_blueprint function · python · L28-L69 (42 LOC)
kglite/blueprint/loader.py
def from_blueprint(
    blueprint_path: Union[str, Path],
    *,
    verbose: bool = False,
    save: bool = True,
) -> kglite.KnowledgeGraph:
    """Parse a JSON blueprint and build a KnowledgeGraph from CSV files.

    Args:
        blueprint_path: Path to the blueprint JSON file.
        verbose: If True, print progress information.
        save: If True and the blueprint specifies an output path, save the graph.

    Returns:
        A populated KnowledgeGraph.

    Raises:
        FileNotFoundError: If the blueprint file is missing.
        ValueError: If the blueprint JSON is malformed.
    """
    blueprint_path = Path(blueprint_path)
    if not blueprint_path.exists():
        raise FileNotFoundError(f"Blueprint file not found: {blueprint_path}")

    with open(blueprint_path) as f:
        raw = json.load(f)

    loader = BlueprintLoader(raw, verbose=verbose)
    graph = loader.build()

    if loader.errors:
        print(f"\nBlueprint loaded with {len(loader.errors)} warning(
BlueprintLoader.__init__ method · python · L75-L100 (26 LOC)
kglite/blueprint/loader.py
    def __init__(self, raw: dict[str, Any], verbose: bool = False):
        """Initialise loader state from an already-parsed blueprint mapping.

        Args:
            raw: Blueprint dict; its ``settings`` and ``nodes`` keys are read
                here (both default to empty when absent).
            verbose: Stored as ``self.verbose`` for later use.
        """
        self.raw = raw
        self.verbose = verbose
        self.graph = kglite.KnowledgeGraph()

        # Problems recorded during loading.
        self.errors: list[dict[str, str]] = []

        # Settings: the root directory, plus an optional output path that is
        # resolved against that root.
        settings = raw.get("settings", {})
        self.root = Path(settings.get("root", "."))
        output = settings.get("output")
        if output:
            self.output_path = self.root / output
        else:
            self.output_path = None

        # Node specs, keyed by node type name.
        self.nodes: dict[str, dict[str, Any]] = raw.get("nodes", {})

        # Cache of CSV contents: relative path -> DataFrame.
        self._csv_cache: dict[str, pd.DataFrame] = {}

        # Node types that have been loaded so far (consulted for edge creation).
        self._loaded_types: set[str] = set()

        # Per-type counters.
        self.stats: dict[str, Any] = {"nodes_by_type": {}, "edges_by_type": {}}
BlueprintLoader.build method · python · L102-L140 (39 LOC)
kglite/blueprint/loader.py
    def build(self) -> kglite.KnowledgeGraph:
        """Execute the full loading sequence."""
        t0 = time.time()
        if self.verbose:
            print(f"Loading blueprint...")
            print(f"  Root: {self.root}")

        # Collect all node specs (core + sub_nodes) with their parent info
        core_specs, sub_specs = self._collect_specs()

        # Phase 1: Manual nodes (no csv)
        self._load_manual_nodes(core_specs, sub_specs)

        # Phase 2: Core nodes (with csv)
        self._load_nodes(core_specs, phase_name="core nodes")

        # Phase 3: Sub-nodes
        self._load_nodes(sub_specs, phase_name="sub-nodes")

        # Phase 3b: Register parent types for supporting node tiers
        for sub in sub_specs:
            sub_type = sub["_node_type"]
            parent = sub["_parent_type"]
            if sub_type in self._loaded_types:
                self.graph.set_parent_type(sub_type, parent)

        # Phase 4: FK edges
        self._load_fk_edges(cor
BlueprintLoader._collect_specs method · python · L144-L168 (25 LOC)
kglite/blueprint/loader.py
    def _collect_specs(
        self,
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
        """Separate core nodes from sub_nodes, annotating each with node_type."""
        core = []
        subs = []
        for node_type, spec in self.nodes.items():
            spec = dict(spec)  # shallow copy
            spec["_node_type"] = node_type
            if spec.get("csv") is not None:
                core.append(spec)
            elif not spec.get("_is_manual"):
                # Nodes without csv are manual — handled separately
                spec["_is_manual"] = True
                core.append(spec)

            # Collect sub_nodes
            for sub_type, sub_spec in spec.get("sub_nodes", {}).items():
                sub = dict(sub_spec)
                sub["_node_type"] = sub_type
                sub["_parent_type"] = node_type
                sub["_parent_pk"] = spec.get("pk", "id")
                subs.append(sub)

        return core, subs
Source: Repobility analyzer · https://repobility.com
BlueprintLoader._load_manual_nodes method · python · L172-L238 (67 LOC)
kglite/blueprint/loader.py
    def _load_manual_nodes(
        self,
        core_specs: list[dict[str, Any]],
        sub_specs: list[dict[str, Any]],
    ) -> None:
        """Create nodes for types without a CSV, from distinct FK values."""
        all_specs = core_specs + sub_specs
        manual_types = {
            s["_node_type"]: s for s in core_specs if s.get("_is_manual")
        }
        if not manual_types:
            return

        if self.verbose:
            print("  Loading manual nodes...")

        for manual_type, manual_spec in manual_types.items():
            distinct_values: set[Any] = set()

            # Scan all FK edges across all specs to find references to this type
            for spec in all_specs:
                for _edge_type, edge_def in (
                    spec.get("connections", {}).get("fk_edges", {}).items()
                ):
                    if edge_def.get("target") != manual_type:
                        continue
                    csv_path = spec.get("csv")
 
BlueprintLoader._load_nodes method · python · L242-L341 (100 LOC)
kglite/blueprint/loader.py
    def _load_nodes(
        self, specs: list[dict[str, Any]], phase_name: str
    ) -> None:
        """Load nodes from CSV files."""
        loadable = [s for s in specs if not s.get("_is_manual") and s.get("csv")]
        if not loadable:
            return

        if self.verbose:
            print(f"  Loading {phase_name}...")

        for spec in loadable:
            node_type = spec["_node_type"]
            csv_path = spec["csv"]

            try:
                df = self._read_csv(csv_path)
            except FileNotFoundError:
                self._report_error(node_type, f"CSV not found: {csv_path}")
                continue

            # Apply filter
            filt = spec.get("filter")
            if filt:
                df = self._apply_filter(df, filt)

            # Handle pk: "auto"
            pk = spec.get("pk", "id")
            if pk == "auto":
                pk = f"_{node_type}_id"
                df = df.copy()
                df[pk] = range(1, len(df) + 
BlueprintLoader._load_fk_edges method · python · L345-L445 (101 LOC)
kglite/blueprint/loader.py
    def _load_fk_edges(self, all_specs: list[dict[str, Any]]) -> None:
        """Create edges from FK columns in source CSVs.

        Also handles core nodes with a ``parent`` key — these get an
        implicit FK edge to their parent type.
        """
        has_edges = any(
            spec.get("connections", {}).get("fk_edges")
            or spec.get("parent")
            for spec in all_specs
        )
        if not has_edges:
            return

        if self.verbose:
            print("  Loading FK edges...")

        for spec in all_specs:
            node_type = spec["_node_type"]
            csv_path = spec.get("csv")
            fk_edges = spec.get("connections", {}).get("fk_edges", {})

            # Core nodes with "parent" key get an implicit FK edge
            parent_type = spec.get("parent")
            parent_fk = spec.get("parent_fk")
            if parent_type and parent_fk:
                edge_type = f"OF_{parent_type.upper()}"
                fk_edges = di
BlueprintLoader._load_junction_edges method · python · L449-L527 (79 LOC)
kglite/blueprint/loader.py
    def _load_junction_edges(self, all_specs: list[dict[str, Any]]) -> None:
        """Create edges from junction (many-to-many) CSVs."""
        has_junctions = any(
            spec.get("connections", {}).get("junction_edges")
            for spec in all_specs
        )
        if not has_junctions:
            return

        if self.verbose:
            print("  Loading junction edges...")

        for spec in all_specs:
            node_type = spec["_node_type"]
            junction_edges = (
                spec.get("connections", {}).get("junction_edges", {})
            )

            for edge_type, junc_def in junction_edges.items():
                csv_path = junc_def.get("csv")
                if not csv_path:
                    self._report_error(
                        node_type,
                        f"Junction edge {edge_type} missing 'csv' key",
                    )
                    continue

                try:
                    df = self._read_csv(csv_path
BlueprintLoader._read_csv method · python · L531-L542 (12 LOC)
kglite/blueprint/loader.py
    def _read_csv(self, relative_path: str) -> pd.DataFrame:
        """Read a CSV, caching by path. Returns a copy to avoid mutation."""
        if relative_path in self._csv_cache:
            return self._csv_cache[relative_path].copy()

        full_path = self.root / relative_path
        if not full_path.exists():
            raise FileNotFoundError(f"CSV file not found: {full_path}")

        df = pd.read_csv(full_path, low_memory=False)
        self._csv_cache[relative_path] = df
        return df.copy()
BlueprintLoader._apply_filter method · python · L544-L574 (31 LOC)
kglite/blueprint/loader.py
    def _apply_filter(
        self, df: pd.DataFrame, filt: dict[str, Any]
    ) -> pd.DataFrame:
        """Apply filters to a DataFrame.

        Supports two forms:
            - Simple equality: ``{"column": value}``
            - Operator dict:   ``{"column": {"!=": value}}``

        Supported operators: ``=``, ``!=``, ``>``, ``<``, ``>=``, ``<=``.
        """
        for col, val in filt.items():
            if col not in df.columns:
                continue
            if isinstance(val, dict):
                for op, operand in val.items():
                    if op == "!=":
                        df = df[df[col] != operand]
                    elif op == ">":
                        df = df[df[col] > operand]
                    elif op == "<":
                        df = df[df[col] < operand]
                    elif op == ">=":
                        df = df[df[col] >= operand]
                    elif op == "<=":
                        df = df[df[col] <= operand]
    
BlueprintLoader._build_column_types method · python · L576-L588 (13 LOC)
kglite/blueprint/loader.py
    def _build_column_types(
        self, properties: dict[str, str]
    ) -> dict[str, str]:
        """Convert blueprint property types to KGLite column_types dict."""
        result = {}
        for col, typ in properties.items():
            if typ in _SPATIAL_TYPES:
                # Spatial types are handled separately via set_spatial
                continue
            mapped = _TYPE_MAP.get(typ)
            if mapped:
                result[col] = mapped
        return result
BlueprintLoader._convert_geometry method · python · L590-L645 (56 LOC)
kglite/blueprint/loader.py
    def _convert_geometry(
        self,
        df: pd.DataFrame,
        properties: dict[str, str],
        node_type: str,
    ) -> pd.DataFrame:
        """Convert _geometry GeoJSON column to WKT + centroid lat/lon."""
        try:
            from shapely.geometry import shape as shapely_shape
        except ImportError:
            raise ImportError(
                "Blueprint uses geometry/location types which require shapely. "
                "Install with: pip install shapely"
            ) from None

        if "_geometry" not in df.columns:
            self._report_error(
                node_type,
                "Blueprint uses geometry types but CSV has no '_geometry' column",
            )
            return df

        df = df.copy()

        # Find which columns need which geometry-derived values
        wkt_col = None
        lat_col = None
        lon_col = None
        for col, typ in properties.items():
            if typ == "geometry":
                wkt_col = 
All rows above produced by Repobility · https://repobility.com
BlueprintLoader._apply_spatial_config method · python · L647-L668 (22 LOC)
kglite/blueprint/loader.py
    def _apply_spatial_config(
        self, node_type: str, properties: dict[str, str]
    ) -> None:
        """Register spatial columns for *node_type* via ``graph.set_spatial``.

        Scans *properties* for the ``location.lat``, ``location.lon`` and
        ``geometry`` types.  If a type appears on several columns, the last
        one seen is used.  ``location`` is passed only when both a lat and a
        lon column were found; otherwise it is ``None``.
        """
        role_columns = {
            "location.lat": None,
            "location.lon": None,
            "geometry": None,
        }
        for column, declared in properties.items():
            if declared in role_columns:
                role_columns[declared] = column

        lat = role_columns["location.lat"]
        lon = role_columns["location.lon"]
        location = None
        if lat and lon:
            location = (lat, lon)

        self.graph.set_spatial(
            node_type,
            location=location,
            geometry=role_columns["geometry"],
        )
BlueprintLoader._prepare_timeseries method · python · L670-L737 (68 LOC)
kglite/blueprint/loader.py
    def _prepare_timeseries(
        self,
        df: pd.DataFrame,
        ts_config: dict[str, Any],
        skip: list[str],
    ) -> tuple[dict[str, Any], pd.DataFrame]:
        """Prepare timeseries parameter for add_nodes() inline loading.

        Renames DataFrame columns from CSV names to channel names.
        Returns the timeseries dict and the (possibly modified) DataFrame.
        """
        df = df.copy()

        time_key = ts_config["time_key"]
        channels_map = ts_config.get("channels", {})
        resolution = ts_config.get("resolution")
        units = ts_config.get("units", {})

        # time_key can be a string (single column) or dict (composite)
        if isinstance(time_key, str):
            ts_time = time_key
        else:
            # Dict: {"year": "col_y", "month": "col_m"} -> use as-is for add_nodes
            ts_time = time_key

        # Auto-skip rows where any time component is 0 (e.g. month=0 annual
        # totals).  A zero month/day/hour 
_parse_all function · python · L41-L57 (17 LOC)
kglite/code_tree/builder.py
def _parse_all(src_root: Path, verbose: bool = False) -> tuple[ParseResult, frozenset[str]]:
    """Parse *src_root* with every parser that applies to it.

    Parsers come from ``get_parsers_for_directory``; their results are merged
    into one ``ParseResult`` and their ``noise_names`` are unioned.

    Returns:
        ``(merged_result, noise_names)`` with the names as a frozenset.

    Raises:
        FileNotFoundError: If no parser applies to *src_root*.
    """
    parsers = get_parsers_for_directory(src_root)
    if not parsers:
        raise FileNotFoundError(f"No supported source files found in {src_root}")

    merged = ParseResult()
    noise: set[str] = set()
    for language_parser in parsers:
        if verbose:
            print(f"Parsing {language_parser.language_name} files...")
        merged.merge(language_parser.parse_directory(src_root))
        noise.update(language_parser.noise_names)

    return merged, frozenset(noise)
_reprefix function · python · L60-L70 (11 LOC)
kglite/code_tree/builder.py
def _reprefix(value: str, old_prefix: str, new_prefix: str, sep: str) -> str:
    """Replace a module path prefix in a qualified name.

    Returns value unchanged if it doesn't start with old_prefix
    (e.g. short names like base class names).
    """
    if value == old_prefix:
        return new_prefix
    if value.startswith(old_prefix + sep):
        return new_prefix + value[len(old_prefix):]
    return value
_parse_all_roots function · python · L73-L206 (134 LOC)
kglite/code_tree/builder.py
def _parse_all_roots(
    project_root: Path,
    source_roots: list[SourceRoot],
    verbose: bool = False,
) -> tuple[ParseResult, frozenset[str]]:
    """Parse source files from specific source roots (manifest-guided)."""
    combined = ParseResult()
    all_noise: set[str] = set()
    parsed_dirs: list[Path] = []  # track parsed roots to detect overlaps

    for root in source_roots:
        if not root.path.is_dir():
            if verbose:
                print(f"  Skipping missing source root: {root.path}")
            continue

        # Skip if this root is a subdirectory of an already-parsed root
        # (e.g. xarray/tests/ is inside xarray/).  Instead, just apply
        # the is_test flag to matching entities already in combined.
        already_covered = any(
            root.path != d and root.path.is_relative_to(d)
            for d in parsed_dirs
        )
        if already_covered:
            if root.is_test:
                rel_prefix = str(root.path.relative_to(p
_get_separator function · python · L212-L217 (6 LOC)
kglite/code_tree/builder.py
def _get_separator(language: str) -> str:
    if language in ("rust", "cpp"):
        return "::"
    elif language in ("python", "java", "csharp"):
        return "."
    return "/"  # go, c, typescript, javascript
_build_modules function · python · L220-L255 (36 LOC)
kglite/code_tree/builder.py
def _build_modules(files: list[FileInfo]) -> list[dict]:
    """Build module nodes from file module paths and submodule declarations."""
    modules = {}
    # Map module_path → file_path from parsed files
    file_module_paths: dict[str, str] = {}
    for f in files:
        file_module_paths[f.module_path] = f.path

    for f in files:
        sep = _get_separator(f.language)
        path = f.module_path
        if path not in modules:
            parts = path.split(sep)
            modules[path] = {
                "qualified_name": path,
                "name": parts[-1] if parts else path,
                "path": f.path,
                "language": f.language,
            }
        for sub_name in f.submodule_declarations:
            child_path = f"{path}{sep}{sub_name}"
            if child_path not in modules:
                # Try to resolve path from a parsed file with this module_path
                child_file_path = file_module_paths.get(child_path, "")
                if 
_build_contains_edges function · python · L258-L268 (11 LOC)
kglite/code_tree/builder.py
def _build_contains_edges(files: list[FileInfo]) -> list[dict]:
    """Build one parent/child edge dict per declared submodule.

    The child path is the file's module path joined to the submodule name
    with the separator for the file's language.
    """
    edges = []
    for info in files:
        sep = _get_separator(info.language)
        edges.extend(
            {
                "parent": info.module_path,
                "child": f"{info.module_path}{sep}{declared}",
            }
            for declared in info.submodule_declarations
        )
    return edges
Repobility analyzer · published findings · https://repobility.com
_infer_lang_group function · python · L271-L277 (7 LOC)
kglite/code_tree/builder.py
def _infer_lang_group(qualified_name: str) -> str:
    """Infer language group from qualified name separator convention."""
    if "::" in qualified_name:
        return "rust_cpp"
    elif "/" in qualified_name:
        return "go_ts_js"
    return "python_java"
_build_call_edges function · python · L280-L402 (123 LOC)
kglite/code_tree/builder.py
def _build_call_edges(all_functions: list[FunctionInfo],
                      max_targets: int = 5,
                      excluded_names: frozenset[str] = frozenset()) -> pd.DataFrame:
    """Resolve function calls using tiered scope-aware name matching.

    Resolution priority (first match wins):
      1. Receiver hint — "Receiver.method" narrows to targets whose owner matches
      2. Same owner — caller and target share the same qualified_name prefix
      3. Same file — caller and target are defined in the same source file
      4. Same language — caller and target use the same separator convention
      5. Global fallback — all targets with matching bare name

    Names in excluded_names are skipped (common stdlib methods).
    Names with more than max_targets definitions are skipped as too ambiguous.

    Note: Receiver hints are syntactic (field names, not resolved types).
    A call like ``self.inner.method()`` produces hint ``"inner"``, which
    won't match a target owned b
_build_type_relationship_edges function · python · L405-L475 (71 LOC)
kglite/code_tree/builder.py
def _build_type_relationship_edges(
    type_rels: list[TypeRelationship],
    known_interfaces: set[str],
    name_to_qname: dict[str, str],
) -> tuple[list[dict], list[dict], list[dict], list[dict]]:
    """Build implements, extends, and has_method edges from TypeRelationships.

    Returns (implements_edges, extends_edges, has_method_edges, external_traits).
    External traits are those referenced in impl blocks but not defined locally.
    """
    implements_edges = []
    extends_edges = []
    has_method_edges = []
    seen_impl = set()
    seen_ext = set()
    external_traits: dict[str, dict] = {}  # name -> node dict

    def resolve(name: str) -> str:
        return name_to_qname.get(name, name)

    for tr in type_rels:
        if tr.relationship == "inherent":
            for method in tr.methods:
                for sep in ("::", "."):
                    if sep in method.qualified_name:
                        owner = method.qualified_name.rsplit(sep, 1)[0]
              
_build_import_edges function · python · L478-L493 (16 LOC)
kglite/code_tree/builder.py
def _build_import_edges(files: list[FileInfo], known_modules: set[str]) -> list[dict]:
    """Build file-to-module edge dicts from each file's imports.

    Every import path is matched against *known_modules* by trying its
    longest prefix first and dropping trailing segments until one matches.
    Imports with no matching prefix produce no edge.
    """
    edges = []
    for info in files:
        sep = _get_separator(info.language)
        for imported in info.imports:
            segments = imported.split(sep)
            # Candidate prefixes, longest first.
            prefixes = (
                sep.join(segments[:length])
                for length in range(len(segments), 0, -1)
            )
            resolved = next(
                (prefix for prefix in prefixes if prefix in known_modules),
                None,
            )
            if resolved is not None:
                edges.append({"file_path": info.path, "module": resolved})
    return edges
_build_defines_edges function · python · L496-L530 (35 LOC)
kglite/code_tree/builder.py
def _build_defines_edges(result: ParseResult) -> list[dict]:
    """Build File DEFINES item edges."""
    edges = []
    for fn in result.functions:
        if not fn.is_method:
            edges.append({
                "file_path": fn.file_path,
                "item_qname": fn.qualified_name,
                "item_type": "Function",
            })
    for cls in result.classes:
        edges.append({
            "file_path": cls.file_path,
            "item_qname": cls.qualified_name,
            "item_type": NODE_TYPE_MAP[cls.kind],
        })
    for enum in result.enums:
        edges.append({
            "file_path": enum.file_path,
            "item_qname": enum.qualified_name,
            "item_type": "Enum",
        })
    for iface in result.interfaces:
        edges.append({
            "file_path": iface.file_path,
            "item_qname": iface.qualified_name,
            "item_type": NODE_TYPE_MAP[iface.kind],
        })
    for const in result.constants:
        edges.
_add_typed_connections function · python · L806-L831 (26 LOC)
kglite/code_tree/builder.py
def _add_typed_connections(graph, df, conn_type, source_field, target_field,
                           classes, interfaces):
    """Add connections where source/target types depend on the entity kind."""
    src_type_map = {}
    for c in classes:
        src_type_map[c.name] = NODE_TYPE_MAP[c.kind]
        src_type_map[c.qualified_name] = NODE_TYPE_MAP[c.kind]

    tgt_type_map = {}
    for i in interfaces:
        tgt_type_map[i.name] = NODE_TYPE_MAP[i.kind]
        tgt_type_map[i.qualified_name] = NODE_TYPE_MAP[i.kind]

    groups: dict[tuple[str, str], list] = defaultdict(list)
    for _, row in df.iterrows():
        src_nt = src_type_map.get(row[source_field], "Struct")
        tgt_nt = tgt_type_map.get(row[target_field], "Trait")
        groups[(src_nt, tgt_nt)].append(row.to_dict())

    for (src_nt, tgt_nt), rows in groups.items():
        sub_df = pd.DataFrame(rows)
        graph.add_connections(
            data=sub_df, connection_type=conn_type,
            source_type=src
_add_typed_connections_same function · python · L834-L854 (21 LOC)
kglite/code_tree/builder.py
def _add_typed_connections_same(graph, df, conn_type, source_field, target_field,
                                 classes):
    """Add *conn_type* connections whose endpoints are both class-like entities.

    Rows of *df* are bucketed by the (source node type, target node type)
    pair, looked up from *classes* by short or qualified name and defaulting
    to ``"Class"``.  ``graph.add_connections`` is called once per bucket.
    """
    # Both the short and the qualified name resolve to the entity's node type.
    node_type_of = {
        key: NODE_TYPE_MAP[entity.kind]
        for entity in classes
        for key in (entity.name, entity.qualified_name)
    }

    buckets: dict[tuple[str, str], list] = defaultdict(list)
    for _, row in df.iterrows():
        type_pair = (
            node_type_of.get(row[source_field], "Class"),
            node_type_of.get(row[target_field], "Class"),
        )
        buckets[type_pair].append(row.to_dict())

    for (source_type, target_type), records in buckets.items():
        graph.add_connections(
            data=pd.DataFrame(records),
            connection_type=conn_type,
            source_type=source_type,
            source_id_field=source_field,
            target_type=target_type,
            target_id_field=target_field,
        )
_add_has_method_connections function · python · L857-L898 (42 LOC)
kglite/code_tree/builder.py
def _add_has_method_connections(graph, df, classes, interfaces=None):
    """Add HAS_METHOD connections, routing through correct source node type."""
    type_map = {}
    for c in classes:
        type_map[c.name] = NODE_TYPE_MAP[c.kind]
        type_map[c.qualified_name] = NODE_TYPE_MAP[c.kind]
    if interfaces:
        for i in interfaces:
            type_map[i.name] = NODE_TYPE_MAP[i.kind]
            type_map[i.qualified_name] = NODE_TYPE_MAP[i.kind]

    schema = graph.schema()
    # Pick a default type from what's available
    default_type = None
    for candidate in ("Class", "Struct", "Trait", "Interface", "Protocol"):
        if candidate in schema["node_types"]:
            default_type = candidate
            break
    if default_type is None:
        return

    groups: dict[str, list] = defaultdict(list)
    for _, row in df.iterrows():
        owner = row["owner"]
        for sep in ("::", "."):
            if sep in owner:
                name = owner.rsplit(sep, 1)[
Repobility · open methodology · https://repobility.com/research/
_build_uses_type_edges function · python · L901-L956 (56 LOC)
kglite/code_tree/builder.py
def _build_uses_type_edges(
    functions: list[FunctionInfo],
    classes: list,
    enums: list,
    interfaces: list,
) -> dict[str, list[dict]]:
    """Build USES_TYPE edges from functions to types referenced in signatures.

    Scans each function's parameters and return_type for known type names,
    producing (Function)-[:USES_TYPE]->(Struct|Class|Enum|Trait|...) edges.
    """
    # Collect all known type names → (qualified_name, node_type)
    type_lookup: dict[str, tuple[str, str]] = {}
    for c in classes:
        if len(c.name) > 1:  # skip single-char generics
            type_lookup[c.name] = (c.qualified_name, NODE_TYPE_MAP[c.kind])
    for e in enums:
        if len(e.name) > 1:
            type_lookup[e.name] = (e.qualified_name, "Enum")
    for i in interfaces:
        if len(i.name) > 1:
            type_lookup[i.name] = (i.qualified_name, NODE_TYPE_MAP[i.kind])

    if not type_lookup:
        return {}

    # Build regex matching any known type name as a whole wor
_build_ffi_exposes_edges function · python · L959-L1002 (44 LOC)
kglite/code_tree/builder.py
def _build_ffi_exposes_edges(result: ParseResult) -> list[dict]:
    """Build EXPOSES edges from #[pymodule] functions to #[pyclass]/#[pyfunction] items.

    Connects the FFI module entry point to the types and functions it registers,
    showing the cross-language boundary for Maturin/PyO3 projects.
    """
    # Find pymodule functions
    pymodule_fns = [f for f in result.functions
                    if f.metadata.get("is_pymodule")]
    if not pymodule_fns:
        return []

    # Find all pyclass structs and pyfunction functions
    pyclass_qnames = {}
    for c in result.classes:
        if c.metadata.get("is_pyclass"):
            py_name = c.metadata.get("py_name", c.name)
            pyclass_qnames[c.name] = (c.qualified_name, "Struct", py_name)

    pyfunction_qnames = {}
    for f in result.functions:
        if f.metadata.get("ffi_kind") == "pyo3" and not f.is_method and not f.metadata.get("is_pymodule"):
            py_name = f.metadata.get("py_name", f.name)
          
build function · python · L1008-L1149 (142 LOC)
kglite/code_tree/builder.py
def build(
    src_dir: str | Path,
    *,
    save_to: str | Path | None = None,
    verbose: bool = False,
    include_tests: bool = True,
) -> kglite.KnowledgeGraph:
    """Parse a codebase and build a KGLite knowledge graph.

    If a project manifest (pyproject.toml, Cargo.toml) is found, uses it
    to discover source roots and extract project metadata.  Otherwise
    falls back to scanning the entire directory.

    Args:
        src_dir: Path to a source directory or manifest file.
        save_to: Optional path to save the graph as a .kgl file.
        verbose: If True, print progress information.
        include_tests: If True, also parse test directories found in the
            manifest.  Has no effect when no manifest is detected.

    Returns:
        A KnowledgeGraph populated with code entities and relationships.

    Raises:
        FileNotFoundError: If src_dir doesn't exist or contains no
            supported files.

    Example::

        from kglite.code_tree impo
LanguageParser.parse_directory method · python · L38-L48 (11 LOC)
kglite/code_tree/parsers/base.py
    def parse_directory(self, src_root: Path) -> ParseResult:
        """Parse every file under *src_root* that matches this parser.

        Files are found recursively, one ``self.file_extensions`` entry at a
        time, with the matches for each extension sorted.  The number of
        files found is always printed.  The per-file results are merged into
        a single ``ParseResult``.
        """
        matched: list[Path] = []
        for extension in self.file_extensions:
            matched += sorted(src_root.rglob(f"*{extension}"))
        print(f"  Found {len(matched)} {self.language_name} files")

        merged = ParseResult()
        for source_path in matched:
            merged.merge(self.parse_file(source_path, src_root))
        return merged
get_type_parameters function · python · L64-L81 (18 LOC)
kglite/code_tree/parsers/base.py
def get_type_parameters(node, source: bytes,
                        node_type: str = "type_parameters") -> str | None:
    """Extract generic/template type parameters from a declaration node.

    Looks for a child of the given node_type (e.g. "type_parameters",
    "type_parameter_list") and returns the inner text with angle brackets
    stripped.  Returns None if no type parameters are found.
    """
    for child in node.children:
        if child.type == node_type:
            text = source[child.start_byte:child.end_byte].decode("utf8")
            # Strip surrounding < > or [ ] if present
            if text.startswith("<") and text.endswith(">"):
                text = text[1:-1].strip()
            elif text.startswith("[") and text.endswith("]"):
                text = text[1:-1].strip()
            return text if text else None
    return None
extract_parameters_from_signature function · python · L87-L111 (25 LOC)
kglite/code_tree/parsers/base.py
def extract_parameters_from_signature(signature: str) -> str | None:
    """Extract the parameter list from a function signature string.

    Finds the first balanced (...) group, filters out self/cls, and
    returns the cleaned parameter text or None if empty.
    """
    start = signature.find("(")
    if start == -1:
        return None
    depth = 0
    end = start
    for i in range(start, len(signature)):
        if signature[i] == "(":
            depth += 1
        elif signature[i] == ")":
            depth -= 1
            if depth == 0:
                end = i
                break
    params_text = signature[start + 1:end].strip()
    if not params_text:
        return None
    parts = [p.strip() for p in params_text.split(",")]
    filtered = [p for p in parts if p and p.strip() not in _SELF_PARAMS]
    return ", ".join(filtered) if filtered else None
extract_comment_annotations function · python · L120-L145 (26 LOC)
kglite/code_tree/parsers/base.py
def extract_comment_annotations(
    root_node,
    source: bytes,
    comment_types: tuple[str, ...] = ("line_comment", "block_comment", "comment"),
) -> list[dict] | None:
    """Recursively scan all comment nodes for TODO/FIXME/etc annotations.

    Returns a list of dicts with keys: kind, text, line.
    Returns None if no annotations found.
    """
    annotations: list[dict] = []

    def walk(node):
        if node.type in comment_types:
            text = source[node.start_byte:node.end_byte].decode("utf8")
            for match in _ANNOTATION_PATTERN.finditer(text):
                annotations.append({
                    "kind": match.group(1).upper(),
                    "text": match.group(2).strip()[:200],
                    "line": node.start_point[0] + 1,
                })
        for child in node.children:
            walk(child)

    walk(root_node)
    return annotations if annotations else None
_BaseCCppParser._get_name method · python · L58-L64 (7 LOC)
kglite/code_tree/parsers/cpp.py
    def _get_name(self, node, source: bytes,
                  name_type: str = "identifier") -> str | None:
        for child in node.children:
            if child.type in (name_type, "type_identifier",
                              "field_identifier"):
                return node_text(child, source)
        return None
Source: Repobility analyzer · https://repobility.com
_BaseCCppParser._get_doc_comment method · python · L66-L99 (34 LOC)
kglite/code_tree/parsers/cpp.py
    def _get_doc_comment(self, node, source: bytes) -> str | None:
        """Walk backward to collect /** */ or /// doc comments."""
        sibling = node.prev_named_sibling
        if sibling and sibling.type == "comment":
            text = node_text(sibling, source).strip()
            # Block comment: /** ... */
            if text.startswith("/**"):
                text = text[3:]
                if text.endswith("*/"):
                    text = text[:-2]
                lines = []
                for line in text.split("\n"):
                    line = line.strip()
                    if line.startswith("* "):
                        line = line[2:]
                    elif line.startswith("*"):
                        line = line[1:]
                    lines.append(line)
                return "\n".join(lines).strip()
            # Line comment: ///
            if text.startswith("///"):
                doc_lines = []
                while sibling is not None and sibling.typ
_BaseCCppParser._get_signature method · python · L101-L107 (7 LOC)
kglite/code_tree/parsers/cpp.py
    def _get_signature(self, node, source: bytes) -> str:
        parts = []
        for child in node.children:
            if child.type in ("compound_statement", "field_declaration_list"):
                break
            parts.append(node_text(child, source))
        return " ".join(parts)
_BaseCCppParser._get_return_type method · python · L109-L120 (12 LOC)
kglite/code_tree/parsers/cpp.py
    def _get_return_type(self, node, source: bytes) -> str | None:
        """Return type is the first type-like child before the declarator."""
        for child in node.children:
            if child.type in ("function_declarator", "identifier",
                              "pointer_declarator"):
                break
            if child.type in ("primitive_type", "type_identifier",
                              "sized_type_specifier", "struct_specifier",
                              "enum_specifier", "union_specifier",
                              "type_qualifier"):
                return node_text(child, source)
        return None
_BaseCCppParser._has_storage_class method · python · L122-L128 (7 LOC)
kglite/code_tree/parsers/cpp.py
    def _has_storage_class(self, node, source: bytes,
                            specifier: str) -> bool:
        for child in node.children:
            if child.type == "storage_class_specifier":
                if node_text(child, source) == specifier:
                    return True
        return False
_BaseCCppParser._extract_calls method · python · L134-L179 (46 LOC)
kglite/code_tree/parsers/cpp.py
    def _extract_calls(self, body_node, source: bytes) -> list[tuple[str, int]]:
        """Extract function/method names called directly within a block.

        Emits qualified calls where possible: "Receiver.method" for
        field/member access and "Type.method" for scoped identifiers.
        Returns list of (call_name, line_number) tuples.

        Scope-aware: does not descend into nested functions or lambdas —
        their calls belong to them, not the parent.
        """
        calls: list[tuple[str, int]] = []

        def walk(node):
            if node.type == "call_expression":
                line = node.start_point[0] + 1
                func = node.children[0] if node.children else None
                if func:
                    if func.type == "identifier":
                        calls.append((node_text(func, source), line))
                    elif func.type == "field_expression":
                        field = func.child_by_field_name("field")
               
_BaseCCppParser._file_to_module_path method · python · L181-L191 (11 LOC)
kglite/code_tree/parsers/cpp.py
    def _file_to_module_path(self, filepath: Path, src_root: Path) -> str:
        rel = filepath.relative_to(src_root)
        parts = list(rel.parts)
        # Strip extension from last part
        name = parts[-1]
        for ext in (".c", ".h", ".cpp", ".cc", ".cxx",
                    ".hpp", ".hh", ".hxx"):
            if name.endswith(ext):
                parts[-1] = name[:-len(ext)]
                break
        return "/".join(parts)
_BaseCCppParser._extract_struct_fields method · python · L193-L232 (40 LOC)
kglite/code_tree/parsers/cpp.py
    def _extract_struct_fields(self, node, source: bytes,
                                owner_qname: str,
                                rel_path: str) -> list[AttributeInfo]:
        """Extract fields from a struct/union field_declaration_list."""
        attrs: list[AttributeInfo] = []
        for child in node.children:
            if child.type == "field_declaration_list":
                for field in child.children:
                    if field.type == "field_declaration":
                        type_ann = None
                        names: list[str] = []
                        for fc in field.children:
                            if fc.type in ("primitive_type",
                                           "type_identifier",
                                           "sized_type_specifier",
                                           "struct_specifier",
                                           "enum_specifier",
                                           "union_specifier"):
 
_BaseCCppParser._get_enum_variants method · python · L234-L243 (10 LOC)
kglite/code_tree/parsers/cpp.py
    def _get_enum_variants(self, node, source: bytes) -> list[str]:
        variants: list[str] = []
        for child in node.children:
            if child.type == "enumerator_list":
                for sub in child.children:
                    if sub.type == "enumerator":
                        name = self._get_name(sub, source)
                        if name:
                            variants.append(name)
        return variants
All rows above produced by Repobility · https://repobility.com
_BaseCCppParser._parse_function method · python · L247-L302 (56 LOC)
kglite/code_tree/parsers/cpp.py
    def _parse_function(self, node, source: bytes, module_path: str,
                        rel_path: str, is_method: bool = False,
                        owner: str | None = None) -> FunctionInfo:
        """Parse a function_definition or declaration node."""
        # Extract name from the declarator
        name = "unknown"
        declarator = None
        for child in node.children:
            if child.type in ("function_declarator", "pointer_declarator"):
                declarator = child
                break

        if declarator:
            # Unwrap pointer_declarator
            while declarator and declarator.type == "pointer_declarator":
                for c in declarator.children:
                    if c.type == "function_declarator":
                        declarator = c
                        break
                else:
                    break
            if declarator:
                fn_name = self._get_name(declarator, source)
                if fn_name:
 
_BaseCCppParser._parse_struct method · python · L304-L324 (21 LOC)
kglite/code_tree/parsers/cpp.py
    def _parse_struct(self, node, source: bytes, module_path: str,
                      rel_path: str, result: ParseResult):
        """Record a struct_specifier (standalone or in a type_definition).

        Appends a ClassInfo of kind "struct" to ``result.classes`` and the
        struct's fields to ``result.attributes``.  Nothing is recorded when
        no name can be found for the node (e.g. an anonymous struct).

        Args:
            node: The struct_specifier syntax node.
            source: Raw bytes of the file being parsed.
            module_path: Module path used as prefix of the qualified name.
            rel_path: File path stored on the emitted records.
            result: Accumulator that is extended in place.
        """
        struct_name = self._get_name(node, source, "type_identifier")
        if not struct_name:
            return

        # C++ qualifies with "::"; every other language uses "/".
        joiner = "/" if self.language_name != "cpp" else "::"
        qualified = f"{module_path}{joiner}{struct_name}"

        first_line = node.start_point[0] + 1
        last_line = node.end_point[0] + 1
        info = ClassInfo(
            name=struct_name,
            qualified_name=qualified,
            kind="struct",
            visibility="public",
            file_path=rel_path,
            line_number=first_line,
            end_line=last_line,
            docstring=self._get_doc_comment(node, source),
        )
        result.classes.append(info)

        fields = self._extract_struct_fields(node, source, qualified, rel_path)
        result.attributes.extend(fields)
_BaseCCppParser._parse_enum method · python · L326-L342 (17 LOC)
kglite/code_tree/parsers/cpp.py
    def _parse_enum(self, node, source: bytes, module_path: str,
                    rel_path: str, result: ParseResult):
        """Record an enum_specifier as an EnumInfo in ``result.enums``.

        Nothing is recorded when no name can be found for the node (e.g.
        an anonymous enum).

        Args:
            node: The enum_specifier syntax node.
            source: Raw bytes of the file being parsed.
            module_path: Module path used as prefix of the qualified name.
            rel_path: File path stored on the emitted record.
            result: Accumulator that is extended in place.
        """
        enum_name = self._get_name(node, source, "type_identifier")
        if not enum_name:
            return

        # C++ qualifies with "::"; every other language uses "/".
        joiner = "/" if self.language_name != "cpp" else "::"
        qualified = f"{module_path}{joiner}{enum_name}"

        first_line = node.start_point[0] + 1
        last_line = node.end_point[0] + 1
        info = EnumInfo(
            name=enum_name,
            qualified_name=qualified,
            visibility="public",
            file_path=rel_path,
            line_number=first_line,
            end_line=last_line,
            docstring=self._get_doc_comment(node, source),
            variants=self._get_enum_variants(node, source),
        )
        result.enums.append(info)
page 1 / 20next ›