← back to invincible-jha__aumai-policyminer

Function bodies 10 total

All specs Real LLM only Function bodies
extract_command function · python · L63-L95 (33 LOC)
src/aumai_policyminer/cli.py
def extract_command(
    logs_path: Path,
    output_path: Path | None,
    min_support: float,
    min_confidence: float,
    min_lift: float,
    name: str,
) -> None:
    """Extract governance policies from a JSONL behavior log file.

    Example:

        aumai-policyminer extract --logs behavior.jsonl --min-confidence 0.7
    """
    # NOTE(review): the docstring above is presumably surfaced as the click
    # ``--help`` text for this command, so it is kept verbatim; implementation
    # notes live in comments instead.

    # 1. Parse the raw JSONL log into validated entries.
    parser = LogParser()
    logs = parser.parse_file(logs_path)
    click.echo(f"Parsed {len(logs)} valid log entries.")

    # parse_file drops malformed lines and only records how many it dropped.
    # Surface that count (on stderr, so stdout stays clean) instead of letting
    # bad input go unnoticed.
    skipped = parser.skipped_count
    if skipped:
        click.echo(f"WARNING: skipped {skipped} invalid log line(s).", err=True)

    # 2. Mine policies using the thresholds supplied on the command line.
    extractor = PolicyExtractor(
        min_support=min_support,
        min_confidence=min_confidence,
        min_lift=min_lift,
    )
    policy_set = extractor.extract(logs, name=name)
    click.echo(f"Mined {len(policy_set.policies)} policies.")

    # 3. Persist the result. Without an explicit output path the policy set is
    #    written as "policies.json" next to the input log file.
    formatter = PolicyFormatter()
    if output_path is not None:
        dest = output_path
    else:
        dest = logs_path.parent / "policies.json"
    formatter.to_json_file(policy_set, dest)
    click.echo(f"Saved policy set to {dest}")

    # 4. Print a brief summary (at most 10 policies).
    click.echo(formatter.to_text(policy_set, max_policies=10))
format_command function · python · L121-L144 (24 LOC)
src/aumai_policyminer/cli.py
def format_command(
    policies_path: Path, output_format: str, max_policies: int
) -> None:
    """Render a JSON policy set as text, Markdown, or JSON.

    Example:

        aumai-policyminer format --policies policies.json --output-format markdown
    """
    # NOTE(review): the docstring above is presumably the click ``--help``
    # text for this command, so it is reproduced verbatim.

    # Any failure while reading, decoding or validating the file is reported
    # on stderr and ends the process with exit status 1.
    try:
        raw_json = policies_path.read_text(encoding="utf-8")
        policy_set = PolicySet.model_validate(json.loads(raw_json))
    except Exception as exc:
        click.echo(f"ERROR loading policy set: {exc}", err=True)
        sys.exit(1)

    formatter = PolicyFormatter()

    # Pick the renderer; anything other than "text" or "markdown" falls back
    # to an indented JSON dump of the validated policy set.
    if output_format == "text":
        rendered = formatter.to_text(policy_set, max_policies=max_policies)
    elif output_format == "markdown":
        rendered = formatter.to_markdown(policy_set, max_policies=max_policies)
    else:
        rendered = policy_set.model_dump_json(indent=2)

    click.echo(rendered)
LogParser.parse_file method · python · L35-L57 (23 LOC)
src/aumai_policyminer/core.py
    def parse_file(self, path: Path) -> list[BehaviorLog]:
        """Parse a JSONL file and return validated BehaviorLog objects.

        Blank or whitespace-only lines are ignored. A line that fails JSON
        decoding or ``BehaviorLog`` validation is dropped, and the number of
        dropped lines is stored in ``self.skipped_count`` (reset to 0 at the
        start of every call).

        Args:
            path: Path to a UTF-8 JSONL file where each line is a BehaviorLog.

        Returns:
            List of BehaviorLog instances in file order (invalid lines skipped).
        """
        logs: list[BehaviorLog] = []
        self.skipped_count = 0
        with path.open(encoding="utf-8") as fh:
            for raw_line in fh:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    entry = BehaviorLog.model_validate(json.loads(line))
                except Exception:
                    # Deliberately best-effort: one malformed line must not
                    # abort the whole file. It is counted so callers can
                    # report how much input was discarded.
                    self.skipped_count += 1
                    continue
                logs.append(entry)
        return logs
LogParser.parse_list method · python · L59-L74 (16 LOC)
src/aumai_policyminer/core.py
    def parse_list(self, records: list[dict[str, Any]]) -> list[BehaviorLog]:
        """Parse a list of raw dicts into BehaviorLog objects.

        Records that fail ``BehaviorLog`` validation are dropped. As with
        ``parse_file``, the number of dropped records is stored in
        ``self.skipped_count`` (reset to 0 at the start of every call) so the
        caller can tell how much input was discarded.

        Args:
            records: List of raw dictionaries.

        Returns:
            List of validated BehaviorLog instances, in input order.
        """
        logs: list[BehaviorLog] = []
        self.skipped_count = 0
        for record in records:
            try:
                entry = BehaviorLog.model_validate(record)
            except Exception:
                # Best-effort parsing: count the bad record and keep going.
                self.skipped_count += 1
                continue
            logs.append(entry)
        return logs
PolicyExtractor.__init__ method · python · L98-L113 (16 LOC)
src/aumai_policyminer/core.py
    def __init__(
        self,
        min_support: float = 0.05,
        min_confidence: float = 0.6,
        min_lift: float = 1.0,
    ) -> None:
        """Store the rule thresholds on the new extractor.

        The values are kept exactly as passed; no range checking is done here.

        Args:
            min_support: Minimum support fraction for a rule to be returned.
            min_confidence: Minimum confidence fraction.
            min_lift: Minimum lift value.
        """
        self.min_support, self.min_confidence, self.min_lift = (
            min_support,
            min_confidence,
            min_lift,
        )
PolicyExtractor.extract method · python · L115-L182 (68 LOC)
src/aumai_policyminer/core.py
    def extract(self, logs: list[BehaviorLog], name: str = "Mined Policy Set") -> PolicySet:
        """Mine association rules from a list of behavior logs.

        Args:
            logs: List of BehaviorLog objects to analyse.
            name: Name for the resulting PolicySet.

        Returns:
            PolicySet populated with discovered policies.
        """
        total = len(logs)
        if total == 0:
            return PolicySet(name=name, source_logs=0)

        # Count action frequencies
        action_counts: Counter[str] = Counter(log.action for log in logs)

        # Build (context_key, context_value, action) co-occurrence counts
        # antecedent = (key, value) pair from context
        # consequent = action
        antecedent_counts: Counter[tuple[str, str]] = Counter()
        cooccurrence_counts: Counter[tuple[str, str, str]] = Counter()

        for log in logs:
            for key, value in log.context.items():
                str_val = str(value)
        
PolicyFormatter.to_text method · python · L199-L223 (25 LOC)
src/aumai_policyminer/core.py
    def to_text(self, policy_set: PolicySet, max_policies: int = 50) -> str:
        """Build a plain-text report for a policy set.

        The report starts with a five-line header (name, source log count,
        generation timestamp, total policy count, a 60-dash rule) followed by
        two lines per listed policy: its id and description, then its
        support, confidence and lift formatted to four decimal places.

        Args:
            policy_set: The PolicySet to render.
            max_policies: Upper bound on listed policies; applied as a slice
                ``policies[:max_policies]``.

        Returns:
            The report as a single newline-joined string.
        """
        total = len(policy_set.policies)
        report: list[str] = [
            f"Policy Set: {policy_set.name}",
            f"Source logs: {policy_set.source_logs}",
            f"Generated at: {policy_set.generated_at}",
            f"Total policies: {total}",
            "-" * 60,
        ]

        shown = policy_set.policies[:max_policies]
        for entry in shown:
            title = f"[{entry.policy_id}] {entry.description}"
            metrics = (
                f"  support={entry.support:.4f} "
                f"confidence={entry.confidence:.4f} "
                f"lift={entry.lift:.4f}"
            )
            report.extend((title, metrics))

        return "\n".join(report)
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
PolicyFormatter.to_markdown method · python · L225-L251 (27 LOC)
src/aumai_policyminer/core.py
    def to_markdown(self, policy_set: PolicySet, max_policies: int = 50) -> str:
        """Render policies as a Markdown table.

        Args:
            policy_set: The PolicySet to render.
            max_policies: Maximum number of policies to include.

        Returns:
            Markdown-formatted string.
        """
        lines: list[str] = [
            f"# {policy_set.name}",
            "",
            f"- **Source logs:** {policy_set.source_logs}",
            f"- **Generated at:** {policy_set.generated_at}",
            f"- **Total policies:** {len(policy_set.policies)}",
            "",
            "| ID | Antecedent | Consequent | Support | Confidence | Lift |",
            "|----|-----------|-----------|---------|------------|------|",
        ]
        for policy in policy_set.policies[:max_policies]:
            antecedent_str = ", ".join(f"{k}={v}" for k, v in policy.antecedent.items())
            lines.append(
                f"| {policy.policy_id} | {antecedent
PolicyFormatter.to_json_file method · python · L253-L260 (8 LOC)
src/aumai_policyminer/core.py
    def to_json_file(self, policy_set: PolicySet, path: Path) -> None:
        """Serialise a PolicySet to a JSON file.

        The policy set is dumped with ``model_dump_json(indent=2)`` and written
        as UTF-8, replacing any existing file. Missing parent directories of
        ``path`` are created first, so a destination inside a folder that does
        not exist yet no longer fails with ``FileNotFoundError``.

        Args:
            policy_set: The PolicySet to serialise.
            path: Destination file path.
        """
        # Serialise before touching the filesystem so a failing dump leaves no
        # stray directories behind.
        payload = policy_set.model_dump_json(indent=2)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(payload, encoding="utf-8")
PolicySet.top_policies method · python · L76-L85 (10 LOC)
src/aumai_policyminer/models.py
    def top_policies(self, n: int = 10) -> list[MinedPolicy]:
        """Return up to ``n`` policies, highest confidence first.

        Works on a copy, so ``self.policies`` keeps its order. The sort is
        stable: policies with equal confidence stay in their original
        relative order.

        Args:
            n: Maximum number of policies to return (applied as a slice
                ``[:n]``).

        Returns:
            List of MinedPolicy objects ordered by descending confidence.
        """
        ranked = list(self.policies)
        ranked.sort(key=lambda policy: policy.confidence, reverse=True)
        return ranked[:n]