Function bodies 11 total
keygen_command function · python · L32-L37 (6 LOC) · src/aumai_confidentialrag/cli.py
def keygen_command(key_path: str) -> None:
    """Generate a new Fernet encryption key and save it to *key_path*.

    The key comes from ``DocumentEncryptor.generate_key()`` and is written
    as raw bytes. A newly created key file is given mode ``0o600`` so other
    local users cannot read the secret. If the file already exists its
    contents are overwritten and its existing permissions are left as-is.

    Args:
        key_path: Destination path for the key file.
    """
    import os  # local import keeps this block self-contained

    key = DocumentEncryptor.generate_key()
    # Path.write_bytes() would create the file with the default umask-derived
    # mode (commonly world-readable). Request owner-only permissions at
    # creation time so the secret is never exposed to other users.
    # O_BINARY only exists on Windows; it prevents newline translation there.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(key_path, flags, 0o600)
    with os.fdopen(fd, "wb") as key_file:
        key_file.write(key)
    click.echo(f"Key written to {key_path}")
    click.echo("IMPORTANT: Keep this key secret and backed up.")
# encrypt_command function · python · L70-L101 (32 LOC) · src/aumai_confidentialrag/cli.py
def encrypt_command(
input_dir: str, output_dir: str, key_path: str, key_id: str
) -> None:
"""Encrypt all text files in a directory."""
key = Path(key_path).read_bytes()
encryptor = DocumentEncryptor()
out_path = Path(output_dir)
out_path.mkdir(parents=True, exist_ok=True)
config = EncryptionConfig(algorithm="fernet", key_id=key_id)
count = 0
for txt_file in sorted(Path(input_dir).rglob("*")):
if not txt_file.is_file():
continue
content = txt_file.read_text(encoding="utf-8", errors="replace")
doc_id = str(uuid.uuid4())
ciphertext = encryptor.encrypt(content, key)
doc = ConfidentialDocument(
doc_id=doc_id,
encrypted_content=ciphertext,
metadata={
"original_filename": txt_file.name,
"key_id": config.key_id,
"algorithm": config.algorithm,
},
)
out_file = out_path / f"{doc_id}.json"
search_command function · python · L128-L176 (49 LOC) · src/aumai_confidentialrag/cli.py
def search_command(
query: str,
index_dir: str,
key_path: str,
top_k: int,
context_json: str,
) -> None:
"""Search an encrypted document index."""
key = Path(key_path).read_bytes()
try:
requester_context = json.loads(context_json)
except json.JSONDecodeError as exc:
click.echo(f"Error: invalid JSON for --context: {exc}", err=True)
sys.exit(1)
index = ConfidentialIndex()
doc_files = list(Path(index_dir).glob("*.json"))
if not doc_files:
click.echo("No encrypted documents found.", err=True)
sys.exit(1)
for doc_file in doc_files:
doc = ConfidentialDocument.model_validate(
json.loads(doc_file.read_text())
)
index.add_document(doc)
click.echo(f"Loaded {index.document_count()} document(s).", err=True)
results = index.search(
query=query,
key=key,
top_k=top_k,
requester_context=requester_context,
)
if not results:DocumentEncryptor.encrypt method · python · L35-L42 (8 LOC)src/aumai_confidentialrag/core.py
def encrypt(self, content: str, key: bytes) -> str:
    """
    Encrypt *content* under the Fernet *key*.

    The text is UTF-8 encoded before encryption; the resulting URL-safe
    base64 token is handed back as an ASCII string.
    """
    cipher = Fernet(key)
    plaintext_bytes = content.encode("utf-8")
    token = cipher.encrypt(plaintext_bytes)
    return token.decode("ascii")
# DocumentEncryptor.decrypt method · python · L44-L51 (8 LOC) · src/aumai_confidentialrag/core.py
def decrypt(self, ciphertext: str, key: bytes) -> str:
    """
    Recover the plain text from *ciphertext* using the Fernet *key*.

    The token is ASCII-encoded before decryption and the decrypted bytes are
    decoded as UTF-8. ``InvalidToken`` is raised when the key does not match
    or the token has been corrupted.
    """
    cipher = Fernet(key)
    token = ciphertext.encode("ascii")
    plaintext_bytes = cipher.decrypt(token)
    return plaintext_bytes.decode("utf-8")
# AccessController.check method · python · L68-L83 (16 LOC) · src/aumai_confidentialrag/core.py
def check(
    self,
    policy: dict[str, Any],
    requester_context: dict[str, Any],
) -> bool:
    """
    Decide whether *requester_context* meets every requirement in *policy*.

    Each policy entry names an attribute and the exact value it must have.
    An attribute absent from the context is looked up as ``None``.
    A policy with no entries lets everyone through.
    """
    # Short-circuits on an empty policy, otherwise denies on the first
    # attribute whose value differs from the required one.
    return not policy or not any(
        requester_context.get(attribute) != expected
        for attribute, expected in policy.items()
    )
# ConfidentialIndex.add_document method · python · L102-L107 (6 LOC) · src/aumai_confidentialrag/core.py
def add_document(
    self,
    doc: ConfidentialDocument,
) -> None:
    """Register *doc* in the index under its ``doc_id``.

    Storing a document whose id is already present replaces the earlier
    entry.
    """
    document_id = doc.doc_id
    self._documents[document_id] = doc
# Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
ConfidentialIndex.search method · python · L113-L167 (55 LOC) · src/aumai_confidentialrag/core.py
def search(
self,
query: str,
key: bytes,
top_k: int = 5,
requester_context: dict[str, Any] | None = None,
) -> list[QueryResult]:
"""
Search the index for documents relevant to *query*.
Decrypts each document using *key*, scores it, then returns the
top-k results with a decrypted snippet. Access policy is enforced
per document via *requester_context*.
Args:
query: Plain-text search query.
key: Fernet key for decryption.
top_k: Maximum number of results to return.
requester_context: Attributes used to evaluate access policies.
Returns:
List of ``QueryResult`` sorted by relevance descending.
"""
encryptor = DocumentEncryptor()
context = requester_context or {}
query_terms = _tokenize(query)
scored: list[tuple[str, float, str]] = []
for doc_id, doc in self._documents.items()ConfidentialIndex.get_document method · python · L173-L178 (6 LOC)src/aumai_confidentialrag/core.py
def get_document(self, doc_id: str) -> ConfidentialDocument:
    """Look up the document stored under *doc_id*.

    Raises:
        KeyError: If nothing is stored under *doc_id*.
    """
    found = self._documents.get(doc_id)
    if found is not None:
        return found
    raise KeyError(f"No document with id {doc_id!r}.")
# _tfidf_score function · python · L195-L214 (20 LOC) · src/aumai_confidentialrag/core.py
def _tfidf_score(query_terms: list[str], document: str) -> float:
"""
Compute a simple TF-based relevance score.
For each query term, TF = count(term, doc) / len(doc_terms).
Score = sum of TF values across query terms, normalized to [0, 1]
by dividing by the maximum possible score.
"""
if not query_terms:
return 0.0
doc_terms = _tokenize(document)
if not doc_terms:
return 0.0
term_counts = Counter(doc_terms)
doc_len = len(doc_terms)
total_tf = sum(
term_counts.get(term, 0) / doc_len for term in query_terms
)
# Normalize: max possible tf per term is 1.0, so max total = len(query_terms)
return total_tf / len(query_terms)_extract_snippet function · python · L217-L240 (24 LOC)src/aumai_confidentialrag/core.py
def _extract_snippet(
text: str, query_terms: list[str], max_len: int
) -> str:
"""
Extract a snippet from *text* centered around the first query term hit.
Returns a substring of at most *max_len* characters.
"""
lower = text.lower()
best_pos = len(text)
for term in query_terms:
pos = lower.find(term)
if pos != -1 and pos < best_pos:
best_pos = pos
if best_pos == len(text):
return text[:max_len]
start = max(0, best_pos - max_len // 4)
end = min(len(text), start + max_len)
snippet = text[start:end]
if start > 0:
snippet = "..." + snippet
if end < len(text):
snippet = snippet + "..."
return snippet