Function bodies 10 total
extract_command function · python · L63-L95 (33 LOC)src/aumai_policyminer/cli.py
def extract_command(
logs_path: Path,
output_path: Path | None,
min_support: float,
min_confidence: float,
min_lift: float,
name: str,
) -> None:
"""Extract governance policies from a JSONL behavior log file.
Example:
aumai-policyminer extract --logs behavior.jsonl --min-confidence 0.7
"""
parser = LogParser()
logs = parser.parse_file(logs_path)
click.echo(f"Parsed {len(logs)} valid log entries.")
extractor = PolicyExtractor(
min_support=min_support,
min_confidence=min_confidence,
min_lift=min_lift,
)
policy_set = extractor.extract(logs, name=name)
click.echo(f"Mined {len(policy_set.policies)} policies.")
formatter = PolicyFormatter()
dest = output_path or logs_path.parent / "policies.json"
formatter.to_json_file(policy_set, dest)
click.echo(f"Saved policy set to {dest}")
# Print a brief summary
click.echo(formatter.to_text(policy_set, max_policies=10))format_command function · python · L121-L144 (24 LOC)src/aumai_policyminer/cli.py
def format_command(
policies_path: Path, output_format: str, max_policies: int
) -> None:
"""Render a JSON policy set as text, Markdown, or JSON.
Example:
aumai-policyminer format --policies policies.json --output-format markdown
"""
try:
data = json.loads(policies_path.read_text(encoding="utf-8"))
policy_set = PolicySet.model_validate(data)
except Exception as exc:
click.echo(f"ERROR loading policy set: {exc}", err=True)
sys.exit(1)
formatter = PolicyFormatter()
if output_format == "text":
click.echo(formatter.to_text(policy_set, max_policies=max_policies))
elif output_format == "markdown":
click.echo(formatter.to_markdown(policy_set, max_policies=max_policies))
else:
click.echo(policy_set.model_dump_json(indent=2))LogParser.parse_file method · python · L35-L57 (23 LOC)src/aumai_policyminer/core.py
def parse_file(self, path: Path) -> list[BehaviorLog]:
"""Parse a JSONL file and return validated BehaviorLog objects.
Args:
path: Path to a JSONL file where each line is a BehaviorLog.
Returns:
List of BehaviorLog instances (invalid lines skipped).
"""
logs: list[BehaviorLog] = []
self.skipped_count = 0
with path.open(encoding="utf-8") as fh:
for line_number, line in enumerate(fh, start=1):
line = line.strip()
if not line:
continue
try:
data = json.loads(line)
logs.append(BehaviorLog.model_validate(data))
except Exception:
self.skipped_count += 1
continue
return logsLogParser.parse_list method · python · L59-L74 (16 LOC)src/aumai_policyminer/core.py
def parse_list(self, records: list[dict[str, Any]]) -> list[BehaviorLog]:
"""Parse a list of raw dicts into BehaviorLog objects.
Args:
records: List of raw dictionaries.
Returns:
List of validated BehaviorLog instances.
"""
logs: list[BehaviorLog] = []
for record in records:
try:
logs.append(BehaviorLog.model_validate(record))
except Exception:
continue
return logsPolicyExtractor.__init__ method · python · L98-L113 (16 LOC)src/aumai_policyminer/core.py
def __init__(
self,
min_support: float = 0.05,
min_confidence: float = 0.6,
min_lift: float = 1.0,
) -> None:
"""Initialise the extractor with threshold parameters.
Args:
min_support: Minimum support fraction for a rule to be returned.
min_confidence: Minimum confidence fraction.
min_lift: Minimum lift value (1.0 means no filtering by lift).
"""
self.min_support = min_support
self.min_confidence = min_confidence
self.min_lift = min_liftPolicyExtractor.extract method · python · L115-L182 (68 LOC)src/aumai_policyminer/core.py
def extract(self, logs: list[BehaviorLog], name: str = "Mined Policy Set") -> PolicySet:
"""Mine association rules from a list of behavior logs.
Args:
logs: List of BehaviorLog objects to analyse.
name: Name for the resulting PolicySet.
Returns:
PolicySet populated with discovered policies.
"""
total = len(logs)
if total == 0:
return PolicySet(name=name, source_logs=0)
# Count action frequencies
action_counts: Counter[str] = Counter(log.action for log in logs)
# Build (context_key, context_value, action) co-occurrence counts
# antecedent = (key, value) pair from context
# consequent = action
antecedent_counts: Counter[tuple[str, str]] = Counter()
cooccurrence_counts: Counter[tuple[str, str, str]] = Counter()
for log in logs:
for key, value in log.context.items():
str_val = str(value)
PolicyFormatter.to_text method · python · L199-L223 (25 LOC)src/aumai_policyminer/core.py
def to_text(self, policy_set: PolicySet, max_policies: int = 50) -> str:
"""Render policies as a plain-text report.
Args:
policy_set: The PolicySet to render.
max_policies: Maximum number of policies to include.
Returns:
Formatted string report.
"""
lines: list[str] = [
f"Policy Set: {policy_set.name}",
f"Source logs: {policy_set.source_logs}",
f"Generated at: {policy_set.generated_at}",
f"Total policies: {len(policy_set.policies)}",
"-" * 60,
]
for policy in policy_set.policies[:max_policies]:
lines.append(f"[{policy.policy_id}] {policy.description}")
lines.append(
f" support={policy.support:.4f} "
f"confidence={policy.confidence:.4f} "
f"lift={policy.lift:.4f}"
)
return "\n".join(lines)Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
PolicyFormatter.to_markdown method · python · L225-L251 (27 LOC)src/aumai_policyminer/core.py
def to_markdown(self, policy_set: PolicySet, max_policies: int = 50) -> str:
"""Render policies as a Markdown table.
Args:
policy_set: The PolicySet to render.
max_policies: Maximum number of policies to include.
Returns:
Markdown-formatted string.
"""
lines: list[str] = [
f"# {policy_set.name}",
"",
f"- **Source logs:** {policy_set.source_logs}",
f"- **Generated at:** {policy_set.generated_at}",
f"- **Total policies:** {len(policy_set.policies)}",
"",
"| ID | Antecedent | Consequent | Support | Confidence | Lift |",
"|----|-----------|-----------|---------|------------|------|",
]
for policy in policy_set.policies[:max_policies]:
antecedent_str = ", ".join(f"{k}={v}" for k, v in policy.antecedent.items())
lines.append(
f"| {policy.policy_id} | {antecedentPolicyFormatter.to_json_file method · python · L253-L260 (8 LOC)src/aumai_policyminer/core.py
def to_json_file(self, policy_set: PolicySet, path: Path) -> None:
"""Serialise a PolicySet to a JSON file.
Args:
policy_set: The PolicySet to serialise.
path: Destination file path.
"""
path.write_text(policy_set.model_dump_json(indent=2), encoding="utf-8")PolicySet.top_policies method · python · L76-L85 (10 LOC)src/aumai_policyminer/models.py
def top_policies(self, n: int = 10) -> list[MinedPolicy]:
"""Return the top-n policies sorted by confidence descending.
Args:
n: Maximum number of policies to return.
Returns:
List of MinedPolicy objects.
"""
return sorted(self.policies, key=lambda p: p.confidence, reverse=True)[:n]