Function bodies 1,000 total
RustParser._extract_py_name method · python · L126-L133 (8 LOC)kglite/code_tree/parsers/rust.py
def _extract_py_name(self, attrs: list[str], keyword: str = "#[pyclass") -> str | None:
"""Extract name="X" from #[pyclass(name = "X")] or similar."""
for a in attrs:
if keyword in a:
m = self._PY_NAME_RE.search(a)
if m:
return m.group(1)
return NoneRustParser._get_return_type method · python · L143-L150 (8 LOC)kglite/code_tree/parsers/rust.py
def _get_return_type(self, node, source: bytes) -> str | None:
saw_arrow = False
for child in node.children:
if not child.is_named and node_text(child, source) == "->":
saw_arrow = True
elif saw_arrow and child.type != "block":
return node_text(child, source)
return NoneRustParser._get_signature method · python · L152-L158 (7 LOC)kglite/code_tree/parsers/rust.py
def _get_signature(self, node, source: bytes) -> str:
parts = []
for child in node.children:
if child.type == "block":
break
parts.append(node_text(child, source))
return " ".join(parts)RustParser._is_async_fn method · python · L160-L166 (7 LOC)kglite/code_tree/parsers/rust.py
def _is_async_fn(self, node, source: bytes) -> bool:
for child in node.children:
if not child.is_named and node_text(child, source) == "async":
return True
if child.type == "identifier" or node_text(child, source) == "fn":
break
return FalseRustParser._extract_type_name_from_node method · python · L174-L192 (19 LOC)kglite/code_tree/parsers/rust.py
def _extract_type_name_from_node(self, node, source: bytes) -> str | None:
"""Extract the base type name from a type node.
Handles: type_identifier, generic_type (e.g. Foo<T>),
and scoped_type_identifier (e.g. crate::module::Foo).
"""
if node.type == "type_identifier":
return node_text(node, source)
elif node.type == "generic_type":
for child in node.children:
if child.type == "type_identifier":
return node_text(child, source)
elif child.type == "scoped_type_identifier":
return self._extract_type_name_from_node(child, source)
elif node.type == "scoped_type_identifier":
for child in reversed(node.children):
if child.type == "type_identifier":
return node_text(child, source)
return NoneRustParser._extract_calls method · python · L198-L257 (60 LOC)kglite/code_tree/parsers/rust.py
def _extract_calls(self, body_node, source: bytes) -> list[tuple[str, int]]:
"""Extract function/method names called directly within a block.
Emits qualified calls where possible: "Receiver.method" for
field expressions and "Type.method" for scoped identifiers.
Returns list of (call_name, line_number) tuples.
Scope-aware: does not descend into nested functions or closures —
their calls belong to them, not the parent.
Receiver hints use the syntactic field name, not the resolved type.
For example, ``self.inner.has_index()`` emits ``("inner.has_index", line)``
because the parser sees the field name ``inner``, not its type
``DirGraph``. Downstream call-edge resolution therefore cannot match
this to ``DirGraph::has_index`` — resolving field names to types would
require full type inference, which is out of scope for a tree-sitter
based parser.
"""
calls: list[tupleRustParser._file_to_module_path method · python · L259-L267 (9 LOC)kglite/code_tree/parsers/rust.py
def _file_to_module_path(self, filepath: Path, src_root: Path) -> str:
rel = filepath.relative_to(src_root)
parts = list(rel.parts)
parts[-1] = parts[-1].replace(".rs", "")
if parts[-1] in ("mod", "lib"):
parts = parts[:-1]
if not parts:
return "crate"
return "crate::" + "::".join(parts)Repobility · open methodology · https://repobility.com/research/
RustParser._extract_struct_fields method · python · L269-L301 (33 LOC)kglite/code_tree/parsers/rust.py
def _extract_struct_fields(self, node, source: bytes,
owner_qname: str, rel_path: str) -> list:
"""Extract fields from a struct's field_declaration_list."""
attrs = []
for child in node.children:
if child.type == "field_declaration_list":
for field in child.children:
if field.type == "field_declaration":
name = None
type_ann = None
vis = "private"
saw_colon = False
for fc in field.children:
if fc.type == "visibility_modifier":
text = node_text(fc, source)
vis = "pub(crate)" if "crate" in text else "pub"
elif fc.type in ("field_identifier", "identifier") and not saw_colon:
name = node_text(fc, source)
RustParser._get_enum_variants method · python · L303-L326 (24 LOC)kglite/code_tree/parsers/rust.py
def _get_enum_variants(self, node, source: bytes) -> tuple[list[str], list[dict]]:
"""Extract variant names and structured details from an enum_variant_list."""
names = []
details = []
for child in node.children:
if child.type == "enum_variant_list":
for variant in child.children:
if variant.type == "enum_variant":
name = self._get_name(variant, source, "identifier")
if not name:
continue
names.append(name)
detail: dict = {"name": name, "kind": "unit"}
for vc in variant.children:
if vc.type == "field_declaration_list":
# Struct variant: Variant { field: Type }
detail["kind"] = "struct"
detail["fields"] = self._extract_variant_struct_fieRustParser._extract_variant_struct_fields method · python · L328-L348 (21 LOC)kglite/code_tree/parsers/rust.py
def _extract_variant_struct_fields(self, field_list, source: bytes) -> list[dict]:
"""Extract named fields from a struct enum variant."""
fields = []
for child in field_list.children:
if child.type == "field_declaration":
name = None
type_ann = None
saw_colon = False
for fc in child.children:
if fc.type in ("field_identifier", "identifier") and not saw_colon:
name = node_text(fc, source)
elif not fc.is_named and node_text(fc, source) == ":":
saw_colon = True
elif saw_colon and type_ann is None and fc.is_named:
type_ann = node_text(fc, source)
if name:
entry: dict = {"name": name}
if type_ann:
entry["type"] = type_ann
fields.append(entry)
return fieRustParser._extract_variant_tuple_fields method · python · L350-L356 (7 LOC)kglite/code_tree/parsers/rust.py
def _extract_variant_tuple_fields(self, field_list, source: bytes) -> list[dict]:
"""Extract positional types from a tuple enum variant."""
fields = []
for child in field_list.children:
if child.is_named and child.type != "visibility_modifier":
fields.append({"type": node_text(child, source)})
return fieldsRustParser._parse_function method · python · L360-L420 (61 LOC)kglite/code_tree/parsers/rust.py
def _parse_function(self, node, source: bytes, module_path: str,
file_path: str, is_method: bool = False,
owner: str | None = None,
impl_is_pymethods: bool = False) -> FunctionInfo:
name = self._get_name(node, source, "identifier") or "unknown"
prefix = f"{module_path}::{owner}" if owner else module_path
qualified_name = f"{prefix}::{name}"
attrs = self._get_attributes(node, source)
body = None
for child in node.children:
if child.type == "block":
body = child
break
is_pymethod = self._is_pymethod_fn(attrs, impl_is_pymethods)
is_ffi = any("#[no_mangle]" in a for a in attrs)
is_test = any(a in ("#[test]", "#[bench]") or
"#[tokio::test" in a or "#[rstest" in a
for a in attrs)
ffi_kind = None
if is_pymethod:
ffi_kind = "pyo3"RustParser.parse_file method · python · L648-L675 (28 LOC)kglite/code_tree/parsers/rust.py
def parse_file(self, filepath: Path, src_root: Path) -> ParseResult:
source = filepath.read_bytes()
tree = self._parser.parse(source)
root = tree.root_node
rel_path = str(filepath.relative_to(src_root))
module_path = self._file_to_module_path(filepath, src_root)
loc = count_lines(source)
file_info = FileInfo(
path=rel_path,
filename=filepath.name,
loc=loc,
module_path=module_path,
language="rust",
)
stem = filepath.stem
if (stem.endswith("_test") or stem.endswith("_tests")
or stem.startswith("test_")
or "/tests/" in rel_path or rel_path.startswith("tests/")
or "/benches/" in rel_path or rel_path.startswith("benches/")):
file_info.is_test = True
result = ParseResult()
result.files.append(file_info)
self._parse_items(root, source, module_path, rel_path, file_in_BaseJSTSParser._get_name method · python · L63-L70 (8 LOC)kglite/code_tree/parsers/typescript.py
def _get_name(self, node, source: bytes,
name_type: str = "identifier") -> str:
"""Get the first identifier-like child as the item name."""
for child in node.children:
if child.type in (name_type, "type_identifier",
"property_identifier"):
return node_text(child, source)
return "unknown"_BaseJSTSParser._get_visibility method · python · L72-L87 (16 LOC)kglite/code_tree/parsers/typescript.py
def _get_visibility(self, node, source: bytes) -> str:
"""Determine visibility: 'export' if exported, else 'private'."""
# Check if this node or its parent is an export_statement
parent = node.parent
if parent and parent.type == "export_statement":
return "export"
# Check for accessibility modifiers inside the node
for child in node.children:
if child.type == "accessibility_modifier":
text = node_text(child, source)
if text == "private":
return "private"
elif text == "protected":
return "protected"
return "public"
return "private"All rows above produced by Repobility · https://repobility.com
_BaseJSTSParser._get_block method · python · L89-L94 (6 LOC)kglite/code_tree/parsers/typescript.py
def _get_block(self, node):
"""Get the body/block of a definition."""
for child in node.children:
if child.type in ("statement_block", "class_body"):
return child
return None_BaseJSTSParser._get_docstring method · python · L96-L115 (20 LOC)kglite/code_tree/parsers/typescript.py
def _get_docstring(self, node, source: bytes) -> str | None:
"""Extract JSDoc comment before a node."""
sibling = node.prev_named_sibling
if sibling and sibling.type == "comment":
text = node_text(sibling, source).strip()
if text.startswith("/**"):
# Strip /** ... */ delimiters and leading * on each line
text = text[3:]
if text.endswith("*/"):
text = text[:-2]
lines = []
for line in text.split("\n"):
line = line.strip()
if line.startswith("* "):
line = line[2:]
elif line.startswith("*"):
line = line[1:]
lines.append(line)
return "\n".join(lines).strip()
return None_BaseJSTSParser._get_heritage method · python · L117-L148 (32 LOC)kglite/code_tree/parsers/typescript.py
def _get_heritage(self, node, source: bytes) -> tuple[list[str], list[str]]:
"""Extract extends and implements from a class/interface declaration.
Returns (extends_list, implements_list)
"""
extends = []
implements = []
current_list = None
for child in node.children:
if child.type == "extends_clause":
current_list = extends
for sub in child.children:
if sub.type in ("identifier", "type_identifier"):
current_list.append(node_text(sub, source))
elif sub.type == "member_expression":
current_list.append(node_text(sub, source))
elif child.type == "implements_clause":
current_list = implements
for sub in child.children:
if sub.type in ("identifier", "type_identifier"):
current_list.append(node_text(sub, source))
_BaseJSTSParser._get_signature method · python · L150-L157 (8 LOC)kglite/code_tree/parsers/typescript.py
def _get_signature(self, node, source: bytes) -> str:
"""Extract function/method signature (everything before the body)."""
parts = []
for child in node.children:
if child.type in ("statement_block", "class_body"):
break
parts.append(node_text(child, source))
return " ".join(parts)_BaseJSTSParser._get_return_type method · python · L159-L168 (10 LOC)kglite/code_tree/parsers/typescript.py
def _get_return_type(self, node, source: bytes) -> str | None:
"""Extract return type annotation from a function."""
for child in node.children:
if child.type == "type_annotation":
# Skip the ":" prefix
text = node_text(child, source)
if text.startswith(":"):
text = text[1:].strip()
return text
return None_BaseJSTSParser._is_async method · python · L170-L176 (7 LOC)kglite/code_tree/parsers/typescript.py
def _is_async(self, node, source: bytes) -> bool:
for child in node.children:
if not child.is_named and node_text(child, source) == "async":
return True
if not child.is_named and node_text(child, source) in ("function", "("):
break
return False_BaseJSTSParser._extract_calls method · python · L185-L235 (51 LOC)kglite/code_tree/parsers/typescript.py
def _extract_calls(self, body_node, source: bytes) -> list[tuple[str, int]]:
"""Extract function/method names called directly within a block.
Emits qualified calls where possible: "receiver.method" for
member expressions, bare names for this/super and plain calls.
Returns list of (call_name, line_number) tuples.
Scope-aware: does not descend into nested functions, arrow functions,
or method definitions — their calls belong to them, not the parent.
"""
calls: list[tuple[str, int]] = []
def walk(node):
if node.type == "call_expression":
line = node.start_point[0] + 1
func = node.children[0] if node.children else None
if func:
if func.type == "identifier":
calls.append((node_text(func, source), line))
elif func.type == "member_expression":
prop = func.child_by_field_BaseJSTSParser._file_to_module_path method · python · L237-L250 (14 LOC)kglite/code_tree/parsers/typescript.py
def _file_to_module_path(self, filepath: Path, src_root: Path) -> str:
rel = filepath.relative_to(src_root)
parts = list(rel.parts)
# Strip extension from last part
for ext in (".ts", ".tsx", ".js", ".jsx", ".mjs"):
if parts[-1].endswith(ext):
parts[-1] = parts[-1][:-len(ext)]
break
# index files represent the directory
if parts[-1] == "index":
parts = parts[:-1]
if not parts:
return src_root.name
return "/".join(parts)Repobility · severity-and-effort ranking · https://repobility.com
_BaseJSTSParser._get_decorators method · python · L252-L268 (17 LOC)kglite/code_tree/parsers/typescript.py
def _get_decorators(self, node, source: bytes) -> list[str]:
"""Walk backward through siblings to collect decorator nodes."""
decorators = []
sibling = node.prev_named_sibling
while sibling is not None:
if sibling.type == "decorator":
text = node_text(sibling, source).strip()
if text.startswith("@"):
text = text[1:]
decorators.insert(0, text)
sibling = sibling.prev_named_sibling
continue
elif sibling.type == "comment":
sibling = sibling.prev_named_sibling
continue
break
return decorators_BaseJSTSParser._extract_class_fields method · python · L270-L316 (47 LOC)kglite/code_tree/parsers/typescript.py
def _extract_class_fields(self, body, source: bytes,
rel_path: str, owner_qualified: str,
result: ParseResult):
"""Extract class field/property declarations as AttributeInfo."""
if body is None:
return
for child in body.children:
if child.type not in ("public_field_definition",
"property_declaration",
"field_definition"):
continue
# Get field name
name = None
type_ann = None
default_val = None
visibility = "private"
for sub in child.children:
if sub.type in ("property_identifier", "identifier"):
if name is None:
name = node_text(sub, source)
elif sub.type == "type_annotation":
text = node_text(sub, source)
if_BaseJSTSParser._get_enum_members method · python · L318-L334 (17 LOC)kglite/code_tree/parsers/typescript.py
def _get_enum_members(self, node, source: bytes) -> list[str]:
"""Extract member names from an enum body."""
members = []
body = self._get_block(node)
if body is None:
# Try enum_body specifically
for child in node.children:
if child.type == "enum_body":
body = child
break
if body:
for child in body.children:
if child.type in ("enum_assignment", "property_identifier"):
members.append(node_text(child, source).split("=")[0].strip())
elif child.type == "identifier":
members.append(node_text(child, source))
return members_BaseJSTSParser._parse_function method · python · L338-L373 (36 LOC)kglite/code_tree/parsers/typescript.py
def _parse_function(self, node, source: bytes, module_path: str,
rel_path: str, is_method: bool = False,
owner: str | None = None) -> FunctionInfo:
name = self._get_name(node, source)
if owner:
prefix = f"{module_path}.{owner}"
else:
prefix = module_path
qualified_name = f"{prefix}.{name}"
block = self._get_block(node)
# Detect static methods
metadata: dict = {}
for child in node.children:
if not child.is_named and node_text(child, source) == "static":
metadata["is_static"] = True
break
return FunctionInfo(
name=name,
qualified_name=qualified_name,
visibility=self._get_visibility(node, source),
is_async=self._is_async(node, source),
is_method=is_method,
signature=self._get_signature(node, source),
file_path=rel__BaseJSTSParser._parse_class method · python · L375-L448 (74 LOC)kglite/code_tree/parsers/typescript.py
def _parse_class(self, node, source: bytes, module_path: str,
rel_path: str, result: ParseResult):
"""Parse a class_declaration node."""
name = self._get_name(node, source)
qualified_name = f"{module_path}.{name}"
extends, implements = self._get_heritage(node, source)
docstring = self._get_docstring(node, source)
decorators = self._get_decorators(node, source)
metadata: dict = {}
if decorators:
metadata["decorators"] = decorators
result.classes.append(ClassInfo(
name=name,
qualified_name=qualified_name,
kind="class",
visibility=self._get_visibility(node, source),
file_path=rel_path,
line_number=node.start_point[0] + 1,
docstring=docstring,
end_line=node.end_point[0] + 1,
bases=extends,
type_parameters=get_type_parameters(node, source),
metadata=me_BaseJSTSParser._parse_interface method · python · L450-L496 (47 LOC)kglite/code_tree/parsers/typescript.py
def _parse_interface(self, node, source: bytes, module_path: str,
rel_path: str, result: ParseResult):
"""Parse an interface_declaration node (TypeScript only)."""
name = self._get_name(node, source)
qualified_name = f"{module_path}.{name}"
extends, _ = self._get_heritage(node, source)
docstring = self._get_docstring(node, source)
result.interfaces.append(InterfaceInfo(
name=name,
qualified_name=qualified_name,
kind="interface",
visibility=self._get_visibility(node, source),
file_path=rel_path,
line_number=node.start_point[0] + 1,
docstring=docstring,
end_line=node.end_point[0] + 1,
type_parameters=get_type_parameters(node, source),
))
# Interface extends edges (interface extending another interface)
for base in extends:
result.type_relationships.append(TypeRelations_BaseJSTSParser._parse_enum method · python · L498-L513 (16 LOC)kglite/code_tree/parsers/typescript.py
def _parse_enum(self, node, source: bytes, module_path: str,
rel_path: str, result: ParseResult):
"""Parse an enum_declaration node."""
name = self._get_name(node, source)
qualified_name = f"{module_path}.{name}"
result.enums.append(EnumInfo(
name=name,
qualified_name=qualified_name,
visibility=self._get_visibility(node, source),
file_path=rel_path,
line_number=node.start_point[0] + 1,
docstring=self._get_docstring(node, source),
end_line=node.end_point[0] + 1,
variants=self._get_enum_members(node, source),
))_BaseJSTSParser._parse_top_level method · python · L515-L629 (115 LOC)kglite/code_tree/parsers/typescript.py
def _parse_top_level(self, node, source: bytes, module_path: str,
rel_path: str, result: ParseResult,
file_info: FileInfo):
"""Parse a top-level AST node."""
if node.type in ("function_declaration", "function"):
result.functions.append(self._parse_function(
node, source, module_path, rel_path,
))
elif node.type == "class_declaration":
self._parse_class(node, source, module_path, rel_path, result)
elif node.type == "interface_declaration":
self._parse_interface(node, source, module_path, rel_path, result)
elif node.type == "enum_declaration":
self._parse_enum(node, source, module_path, rel_path, result)
elif node.type == "export_statement":
# Track exported names
for child in node.children:
if child.type in ("function_declaration", "class_declaration",
Repobility · MCP-ready · https://repobility.com
_BaseJSTSParser.parse_file method · python · L631-L666 (36 LOC)kglite/code_tree/parsers/typescript.py
def parse_file(self, filepath: Path, src_root: Path) -> ParseResult:
source = filepath.read_bytes()
tree = self._parser.parse(source)
root = tree.root_node
rel_path = str(filepath.relative_to(src_root))
module_path = self._file_to_module_path(filepath, src_root)
loc = count_lines(source)
file_info = FileInfo(
path=rel_path,
filename=filepath.name,
loc=loc,
module_path=module_path,
language=self.language_name,
)
fname = filepath.name
if (any(fname.endswith(ext) for ext in (
".test.ts", ".spec.ts", ".test.tsx", ".spec.tsx",
".test.js", ".spec.js", ".test.jsx", ".spec.jsx",
".test.mjs", ".spec.mjs"))
or "/__tests__/" in rel_path
or rel_path.startswith("__tests__/")):
file_info.is_test = True
result = ParseResult()
result.files.append(fileEmbeddingModel.load method · python · L69-L75 (7 LOC)kglite/__init__.pyi
def load(self) -> None:
"""(Optional) Load model weights / allocate resources.
Called automatically before ``embed()`` in ``embed_texts()`` and
``search_text()``. If not defined, this step is skipped.
"""
...EmbeddingModel.unload method · python · L77-L84 (8 LOC)kglite/__init__.pyi
def unload(self) -> None:
"""(Optional) Release model weights / free resources.
Called automatically after ``embed_texts()`` and ``search_text()``
complete (including on error). A common pattern is to start a
cooldown timer here instead of releasing immediately.
"""
...ResultView.to_gdf method · python · L154-L175 (22 LOC)kglite/__init__.pyi
def to_gdf(
self,
geometry_column: str = "geometry",
crs: Optional[str] = None,
) -> Any:
"""Convert to a GeoDataFrame with a geometry column parsed from WKT.
Materializes the data as a DataFrame, then converts the specified
WKT string column into shapely geometries and returns a
``geopandas.GeoDataFrame``.
Args:
geometry_column: Column containing WKT strings. Default ``'geometry'``.
crs: Coordinate reference system (e.g. ``'EPSG:4326'``), or ``None``.
Returns:
A ``geopandas.GeoDataFrame``.
Raises:
ImportError: If geopandas is not installed.
"""
...load function · python · L184-L193 (10 LOC)kglite/__init__.pyi
def load(path: str) -> KnowledgeGraph:
"""Load a graph from a binary file previously saved with ``save()``.
Args:
path: Path to the ``.kgl`` file.
Returns:
A new KnowledgeGraph with the loaded data.
"""
...from_blueprint function · python · L196-L227 (32 LOC)kglite/__init__.pyi
def from_blueprint(
blueprint_path: Union[str, Path],
*,
verbose: bool = False,
save: bool = True,
) -> KnowledgeGraph:
"""Build a KnowledgeGraph from a JSON blueprint and CSV files.
The blueprint JSON describes all node types, properties, connections,
timeseries, and data sources. CSV paths in the blueprint are resolved
relative to ``settings.root``.
Args:
blueprint_path: Path to the blueprint JSON file.
verbose: If True, print progress information during loading.
save: If True and the blueprint specifies an ``output`` path,
save the graph to that path after building.
Returns:
A new KnowledgeGraph populated from the blueprint.
Raises:
FileNotFoundError: If the blueprint file is missing.
ValueError: If the blueprint JSON is malformed.
Example::
import kglite
graph = kglite.from_blueprint("blueprint.json", verbose=True)
"""
...KnowledgeGraph.add_nodes method · python · L266-L327 (62 LOC)kglite/__init__.pyi
def add_nodes(
self,
data: pd.DataFrame,
node_type: str,
unique_id_field: str,
node_title_field: Optional[str] = None,
columns: Optional[list[str]] = None,
conflict_handling: Optional[str] = None,
skip_columns: Optional[list[str]] = None,
column_types: Optional[dict[str, str]] = None,
timeseries: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
"""Add nodes from a DataFrame.
String and integer IDs are auto-detected from the DataFrame dtype.
Non-contiguous DataFrame indexes (e.g. from filtering) are handled
automatically.
When ``timeseries`` is provided, the DataFrame may contain multiple
rows per unique ID (one per time step). Rows are deduplicated
automatically — the first occurrence per ID provides static node
properties, and all rows contribute to the timeseries channels.
Args:
data: DataFrame containing nKnowledgeGraph.add_connections method · python · L329-L363 (35 LOC)kglite/__init__.pyi
def add_connections(
self,
data: pd.DataFrame,
connection_type: str,
source_type: str,
source_id_field: str,
target_type: str,
target_id_field: str,
source_title_field: Optional[str] = None,
target_title_field: Optional[str] = None,
columns: Optional[list[str]] = None,
skip_columns: Optional[list[str]] = None,
conflict_handling: Optional[str] = None,
column_types: Optional[dict[str, str]] = None,
) -> dict[str, Any]:
"""Add connections (edges) between existing nodes.
Args:
data: DataFrame containing edge data.
connection_type: Label for this edge type (e.g. ``'KNOWS'``).
source_type: Node type of source nodes.
source_id_field: Column with source node IDs.
target_type: Node type of target nodes.
target_id_field: Column with target node IDs.
source_title_field: Optional title coRepobility · open methodology · https://repobility.com/research/
KnowledgeGraph.add_nodes_bulk method · python · L365-L374 (10 LOC)kglite/__init__.pyi
def add_nodes_bulk(self, nodes: list[dict[str, Any]]) -> dict[str, int]:
"""Add multiple node types at once.
Each dict in *nodes* must contain ``node_type``, ``unique_id_field``,
``node_title_field``, and ``data`` (a DataFrame).
Returns:
Mapping of ``node_type`` to count of nodes added.
"""
...KnowledgeGraph.add_connections_bulk method · python · L376-L385 (10 LOC)kglite/__init__.pyi
def add_connections_bulk(self, connections: list[dict[str, Any]]) -> dict[str, int]:
"""Add multiple connection types at once.
Each dict must contain ``source_type``, ``target_type``,
``connection_name``, and ``data`` (DataFrame with ``source_id``/``target_id`` columns).
Returns:
Mapping of ``connection_name`` to count of connections created.
"""
...KnowledgeGraph.add_connections_from_source method · python · L387-L396 (10 LOC)kglite/__init__.pyi
def add_connections_from_source(self, connections: list[dict[str, Any]]) -> dict[str, int]:
"""Add connections, auto-filtering to types already loaded in the graph.
Same spec format as :meth:`add_connections_bulk`, but silently skips
connection specs whose source or target type is not in the graph.
Returns:
Mapping of ``connection_name`` to count of connections created.
"""
...KnowledgeGraph.type_filter method · python · L402-L418 (17 LOC)kglite/__init__.pyi
def type_filter(
self,
node_type: str,
sort: Optional[Union[str, list[tuple[str, bool]]]] = None,
max_nodes: Optional[int] = None,
) -> KnowledgeGraph:
"""Select all nodes of a given type.
Args:
node_type: The node type to select (e.g. ``'Person'``).
sort: Optional sort spec — a property name or list of ``(field, ascending)`` tuples.
max_nodes: Limit the number of selected nodes.
Returns:
A new KnowledgeGraph with the filtered selection.
"""
...KnowledgeGraph.filter method · python · L420-L446 (27 LOC)kglite/__init__.pyi
def filter(
self,
conditions: dict[str, Any],
sort: Optional[Union[str, list[tuple[str, bool]]]] = None,
max_nodes: Optional[int] = None,
) -> KnowledgeGraph:
"""Filter the current selection by property conditions.
Conditions support exact match, comparison operators
(``'>'``, ``'<'``, ``'>='``, ``'<='``), ``'in'``, ``'is_null'``,
``'is_not_null'``, ``'contains'``, ``'starts_with'``, ``'ends_with'``,
``'regex'`` (or ``'=~'``), and negated variants: ``'not_contains'``,
``'not_starts_with'``, ``'not_ends_with'``, ``'not_in'``, ``'not_regex'``.
Example::
graph.type_filter('Person').filter({
'age': {'>': 25},
'city': 'Oslo',
'name': {'regex': '^A.*'},
'status': {'not_in': ['inactive', 'banned']},
})
Returns:
A new KnowledgeGraph with the filtered selection.
"""
...KnowledgeGraph.filter_any method · python · L448-L477 (30 LOC)kglite/__init__.pyi
def filter_any(
self,
conditions: list[dict[str, Any]],
sort: Optional[Union[str, list[tuple[str, bool]]]] = None,
max_nodes: Optional[int] = None,
) -> KnowledgeGraph:
"""Filter the current selection with OR logic across multiple condition sets.
Each dict in *conditions* is a set of AND conditions (same as ``filter()``).
A node is kept if it matches **any** of the condition sets.
Args:
conditions: List of condition dicts. Must contain at least one.
sort: Optional sort spec.
max_nodes: Limit the number of selected nodes.
Returns:
A new KnowledgeGraph with the filtered selection.
Example::
graph.type_filter('Person').filter_any([
{'city': 'Oslo'},
{'city': 'Bergen'},
])
Raises:
ValueError: If *conditions* is empty.
"""
...KnowledgeGraph.filter_orphans method · python · L479-L496 (18 LOC)kglite/__init__.pyi
def filter_orphans(
self,
include_orphans: Optional[bool] = None,
sort: Optional[Union[str, list[tuple[str, bool]]]] = None,
max_nodes: Optional[int] = None,
) -> KnowledgeGraph:
"""Filter nodes based on whether they have connections.
Args:
include_orphans: If ``True``, keep only orphan (disconnected) nodes.
If ``False``, keep only connected nodes. Default ``True``.
sort: Optional sort spec.
max_nodes: Limit the number of selected nodes.
Returns:
A new KnowledgeGraph with the filtered selection.
"""
...KnowledgeGraph.sort method · python · L498-L512 (15 LOC)kglite/__init__.pyi
def sort(
self,
sort: Union[str, list[tuple[str, bool]]],
ascending: Optional[bool] = None,
) -> KnowledgeGraph:
"""Sort the current selection.
Args:
sort: Property name (string) or list of ``(field, ascending)`` tuples.
ascending: Direction when *sort* is a single string. Default ``True``.
Returns:
A new KnowledgeGraph with the sorted selection.
"""
...All rows above produced by Repobility · https://repobility.com
KnowledgeGraph.max_nodes method · python · L514-L523 (10 LOC)kglite/__init__.pyi
def max_nodes(self, max_per_group: int) -> KnowledgeGraph:
"""Limit the number of nodes per parent group.
Args:
max_per_group: Maximum number of nodes to keep per group.
Returns:
A new KnowledgeGraph with the limited selection.
"""
...KnowledgeGraph.offset method · python · L525-L537 (13 LOC)kglite/__init__.pyi
def offset(self, n: int) -> KnowledgeGraph:
"""Skip the first *n* nodes per parent group (pagination).
Combine with ``max_nodes()`` for pagination:
``graph.sort('name').offset(20).max_nodes(10)``
Args:
n: Number of nodes to skip.
Returns:
A new KnowledgeGraph with the offset selection.
"""
...KnowledgeGraph.has_connection method · python · L539-L559 (21 LOC)kglite/__init__.pyi
def has_connection(
self,
connection_type: str,
direction: Optional[str] = None,
) -> KnowledgeGraph:
"""Filter nodes that have at least one connection of the given type.
Keeps only nodes from the current selection that participate in
edges of the specified type and direction.
Args:
connection_type: Edge type to check (e.g. ``'KNOWS'``).
direction: ``'outgoing'``, ``'incoming'``, or ``'any'`` (default).
Returns:
A new KnowledgeGraph with only connected nodes.
Raises:
ValueError: If *direction* is not one of the valid values.
"""
...