← back to invincible-jha__aumai-confidentialrag

Function bodies 11 total

All specs Real LLM only Function bodies
keygen_command function · python · L32-L37 (6 LOC)
src/aumai_confidentialrag/cli.py
def keygen_command(key_path: str) -> None:
    """Generate a new Fernet encryption key."""
    # NOTE: the docstring above is kept short on purpose; it is presumably
    # surfaced as the CLI help text for this command.
    key = DocumentEncryptor.generate_key()
    key_file = Path(key_path)
    # The key is a secret: make sure the file is owner-read/write only
    # *before* any key material lands in it. ``touch`` creates a missing file
    # with a restrictive mode (subject to the umask), and ``chmod`` then pins
    # the exact mode, which also tightens a pre-existing file that is about
    # to be overwritten. On platforms without POSIX permission bits this is
    # best-effort only.
    key_file.touch(mode=0o600, exist_ok=True)
    key_file.chmod(0o600)
    key_file.write_bytes(key)
    click.echo(f"Key written to {key_path}")
    click.echo("IMPORTANT: Keep this key secret and backed up.")
encrypt_command function · python · L70-L101 (32 LOC)
src/aumai_confidentialrag/cli.py
def encrypt_command(
    input_dir: str, output_dir: str, key_path: str, key_id: str
) -> None:
    """Encrypt all text files in a directory."""
    key = Path(key_path).read_bytes()
    encryptor = DocumentEncryptor()
    out_path = Path(output_dir)
    out_path.mkdir(parents=True, exist_ok=True)

    config = EncryptionConfig(algorithm="fernet", key_id=key_id)
    count = 0

    for txt_file in sorted(Path(input_dir).rglob("*")):
        if not txt_file.is_file():
            continue
        content = txt_file.read_text(encoding="utf-8", errors="replace")
        doc_id = str(uuid.uuid4())
        ciphertext = encryptor.encrypt(content, key)
        doc = ConfidentialDocument(
            doc_id=doc_id,
            encrypted_content=ciphertext,
            metadata={
                "original_filename": txt_file.name,
                "key_id": config.key_id,
                "algorithm": config.algorithm,
            },
        )
        out_file = out_path / f"{doc_id}.json"
       
search_command function · python · L128-L176 (49 LOC)
src/aumai_confidentialrag/cli.py
def search_command(
    query: str,
    index_dir: str,
    key_path: str,
    top_k: int,
    context_json: str,
) -> None:
    """Search an encrypted document index."""
    key = Path(key_path).read_bytes()
    try:
        requester_context = json.loads(context_json)
    except json.JSONDecodeError as exc:
        click.echo(f"Error: invalid JSON for --context: {exc}", err=True)
        sys.exit(1)

    index = ConfidentialIndex()

    doc_files = list(Path(index_dir).glob("*.json"))
    if not doc_files:
        click.echo("No encrypted documents found.", err=True)
        sys.exit(1)

    for doc_file in doc_files:
        doc = ConfidentialDocument.model_validate(
            json.loads(doc_file.read_text())
        )
        index.add_document(doc)

    click.echo(f"Loaded {index.document_count()} document(s).", err=True)

    results = index.search(
        query=query,
        key=key,
        top_k=top_k,
        requester_context=requester_context,
    )

    if not results:
DocumentEncryptor.encrypt method · python · L35-L42 (8 LOC)
src/aumai_confidentialrag/core.py
    def encrypt(self, content: str, key: bytes) -> str:
        """
        Encrypt the plain-text *content* using the Fernet *key*.

        The text is UTF-8 encoded before encryption, and the resulting
        Fernet token is returned as an ASCII string.
        """
        # Build the cipher first so an unusable key is reported before the
        # content is touched.
        cipher = Fernet(key)
        plaintext_bytes = content.encode("utf-8")
        token = cipher.encrypt(plaintext_bytes)
        return token.decode("ascii")
DocumentEncryptor.decrypt method · python · L44-L51 (8 LOC)
src/aumai_confidentialrag/core.py
    def decrypt(self, ciphertext: str, key: bytes) -> str:
        """
        Recover the plain text from the Fernet token *ciphertext* using *key*.

        The token is ASCII-encoded before decryption and the decrypted bytes
        are decoded as UTF-8.

        Raises ``InvalidToken`` if the key is wrong or the token is corrupted.
        """
        cipher = Fernet(key)
        token = ciphertext.encode("ascii")
        plaintext_bytes = cipher.decrypt(token)
        return plaintext_bytes.decode("utf-8")
AccessController.check method · python · L68-L83 (16 LOC)
src/aumai_confidentialrag/core.py
    def check(
        self,
        policy: dict[str, Any],
        requester_context: dict[str, Any],
    ) -> bool:
        """
        Decide whether *requester_context* meets every requirement in *policy*.

        Each policy entry names a context attribute and the exact value it
        must have.  Attributes missing from the context are looked up with
        ``dict.get`` and therefore compare as ``None``.

        An empty policy grants access to everyone.

        Returns:
            ``True`` when no policy entry is violated, ``False`` otherwise.
        """
        if not policy:
            return True
        # Access is denied as soon as one required attribute differs.
        violated = any(
            requester_context.get(attribute) != expected
            for attribute, expected in policy.items()
        )
        return not violated
ConfidentialIndex.add_document method · python · L102-L107 (6 LOC)
src/aumai_confidentialrag/core.py
    def add_document(
        self,
        doc: ConfidentialDocument,
    ) -> None:
        """Register *doc* in the index under its ``doc_id``.

        A document already stored under the same ID is replaced.
        """
        registry = self._documents
        registry[doc.doc_id] = doc
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
ConfidentialIndex.search method · python · L113-L167 (55 LOC)
src/aumai_confidentialrag/core.py
    def search(
        self,
        query: str,
        key: bytes,
        top_k: int = 5,
        requester_context: dict[str, Any] | None = None,
    ) -> list[QueryResult]:
        """
        Search the index for documents relevant to *query*.

        Decrypts each document using *key*, scores it, then returns the
        top-k results with a decrypted snippet.  Access policy is enforced
        per document via *requester_context*.

        Args:
            query: Plain-text search query.
            key: Fernet key for decryption.
            top_k: Maximum number of results to return.
            requester_context: Attributes used to evaluate access policies.

        Returns:
            List of ``QueryResult`` sorted by relevance descending.
        """
        encryptor = DocumentEncryptor()
        context = requester_context or {}
        query_terms = _tokenize(query)
        scored: list[tuple[str, float, str]] = []

        for doc_id, doc in self._documents.items()
ConfidentialIndex.get_document method · python · L173-L178 (6 LOC)
src/aumai_confidentialrag/core.py
    def get_document(self, doc_id: str) -> ConfidentialDocument:
        """Look up the stored document whose ID is *doc_id*.

        Raises:
            KeyError: If nothing is stored under *doc_id*.
        """
        found = self._documents.get(doc_id)
        if found is not None:
            return found
        raise KeyError(f"No document with id {doc_id!r}.")
_tfidf_score function · python · L195-L214 (20 LOC)
src/aumai_confidentialrag/core.py
def _tfidf_score(query_terms: list[str], document: str) -> float:
    """
    Score *document* against *query_terms* using term frequency only.

    Each query term contributes ``count(term) / number_of_document_tokens``.
    The contributions are summed and divided by the number of query terms,
    which keeps the result within [0, 1].

    Returns 0.0 when there are no query terms or the document tokenizes to
    nothing.
    """
    if not query_terms:
        return 0.0
    tokens = _tokenize(document)
    if not tokens:
        return 0.0
    frequencies = Counter(tokens)
    token_total = len(tokens)
    # A Counter yields 0 for terms that never occur in the document.
    per_term_tf = [frequencies[term] / token_total for term in query_terms]
    # Each term's TF is at most 1.0, so dividing by the term count normalizes.
    return sum(per_term_tf) / len(query_terms)
_extract_snippet function · python · L217-L240 (24 LOC)
src/aumai_confidentialrag/core.py
def _extract_snippet(
    text: str, query_terms: list[str], max_len: int
) -> str:
    """
    Extract a snippet from *text* centered around the first query term hit.

    Returns a substring of at most *max_len* characters.
    """
    lower = text.lower()
    best_pos = len(text)
    for term in query_terms:
        pos = lower.find(term)
        if pos != -1 and pos < best_pos:
            best_pos = pos
    if best_pos == len(text):
        return text[:max_len]
    start = max(0, best_pos - max_len // 4)
    end = min(len(text), start + max_len)
    snippet = text[start:end]
    if start > 0:
        snippet = "..." + snippet
    if end < len(text):
        snippet = snippet + "..."
    return snippet