Function bodies 223 total
NoteDraftUpdate class · python · L140-L148 (9 LOC)app/schemas/notes.py
class NoteDraftUpdate(BaseModel):
output_text: str
@field_validator("output_text")
@classmethod
def not_empty(cls, v: str) -> str:
if not v.strip():
raise ValueError("output_text cannot be empty")
return vnot_empty method · python · L145-L148 (4 LOC)app/schemas/notes.py
def not_empty(cls, v: str) -> str:
if not v.strip():
raise ValueError("output_text cannot be empty")
return vAuthError class · python · L17-L21 (5 LOC)app/services/auth.py
class AuthError(Exception):
def __init__(self, message: str, status_code: int = 400):
self.message = message
self.status_code = status_code
super().__init__(message)__init__ method · python · L18-L21 (4 LOC)app/services/auth.py
def __init__(self, message: str, status_code: int = 400):
self.message = message
self.status_code = status_code
super().__init__(message)register_user function · python · L24-L69 (46 LOC)app/services/auth.py
async def register_user(db: AsyncSession, data: UserRegister) -> UserOut:
# Normalize inputs before storage
normalized_email = data.email.lower().strip()
normalized_username = data.username.strip()
normalized_full_name = data.full_name.strip() if data.full_name else ""
existing = await db.scalar(
select(User).where(
(User.email == normalized_email) | (User.username == normalized_username)
)
)
if existing:
if existing.email == normalized_email:
raise AuthError("Email already registered", 409)
raise AuthError("Username already taken", 409)
user = User(
id=uuid.uuid4(),
email=normalized_email,
username=normalized_username,
password_hash=hash_password(data.password),
full_name=normalized_full_name,
specialty=data.specialty,
)
db.add(user)
try:
await db.flush([user])
except IntegrityError:
await db.rollback()
raiselogin_user function · python · L72-L112 (41 LOC)app/services/auth.py
async def login_user(db: AsyncSession, data: UserLogin, ip_address: str | None = None) -> Token:
user = await db.scalar(
select(User).where(User.username == data.username)
)
# Constant-time: always run verify even if user not found
dummy_hash = "$argon2id$v=19$m=65536,t=3,p=4$dummy"
password_ok = verify_password(data.password, user.password_hash if user else dummy_hash)
if not user or not password_ok:
await write_audit_event(
db,
action="login_failed",
metadata={"username": data.username},
ip_address=ip_address,
)
raise AuthError("Invalid username or password", 401)
if not user.is_active:
raise AuthError("Account is inactive", 403)
# Rehash if needed (argon2 params updated)
if needs_rehash(user.password_hash):
user.password_hash = hash_password(data.password)
await db.flush()
user.last_login = datetime.now(timezone.utc)
token = createPhiEntity class · python · L136-L141 (6 LOC)app/services/deid.py
class PhiEntity:
"""A detected PHI entity."""
entity_type: str # 'name', 'date', 'mrn', 'phone', 'ssn', 'email', 'address', 'age'
original_value: str
start_position: int
end_position: intWant this analysis on your repo? https://repobility.com/scan/
DeIdResult class · python · L145-L150 (6 LOC)app/services/deid.py
class DeIdResult:
"""Result of de-identification."""
original_text: str
redacted_text: str
entities_found: list[dict]
warning: str_detect_names function · python · L153-L179 (27 LOC)app/services/deid.py
def _detect_names(text: str) -> list[PhiEntity]:
"""Detect potential person names (first + last name combinations).
Looks for capitalized word sequences where both first and last names are in
the common names list, or where one is common and the other is capitalized
and followed by common surname indicators.
"""
entities = []
# Simple pattern: Capitalized word followed by another Capitalized word
pattern = re.compile(r'\b([A-Z][a-z]+)\s+([A-Z][a-z]+)\b')
for match in pattern.finditer(text):
first, last = match.group(1), match.group(2)
first_lower = first.lower()
last_lower = last.lower()
# Match if both are in common names, or first is common and last is capitalized
if (first_lower in COMMON_FIRST_NAMES and last_lower in COMMON_SURNAMES) or \
(first_lower in COMMON_FIRST_NAMES and last_lower not in {"the", "and", "or", "of"}):
entities.append(PhiEntity(
entity_type='nam_detect_dates function · python · L182-L192 (11 LOC)app/services/deid.py
def _detect_dates(text: str) -> list[PhiEntity]:
"""Detect date patterns in various formats."""
entities = []
for match in _DATE_PATTERN.finditer(text):
entities.append(PhiEntity(
entity_type='date',
original_value=match.group(0),
start_position=match.start(),
end_position=match.end(),
))
return entities_detect_mrn function · python · L195-L217 (23 LOC)app/services/deid.py
def _detect_mrn(text: str) -> list[PhiEntity]:
"""Detect MRN/account numbers (6-10 digit sequences).
Conservative: looks for sequences of 6-10 consecutive digits, possibly separated
by hyphens or spaces.
"""
entities = []
# More sophisticated MRN pattern: digits with optional separators
mrn_pattern = re.compile(r'\b\d{6,10}\b')
for match in mrn_pattern.finditer(text):
# Filter out common non-PHI numbers (years, etc.)
value = match.group(0)
num = int(value)
# If it looks like a year or century, skip it
if (num >= 1900 and num <= 2100):
continue
entities.append(PhiEntity(
entity_type='mrn',
original_value=value,
start_position=match.start(),
end_position=match.end(),
))
return entities_detect_phones function · python · L220-L230 (11 LOC)app/services/deid.py
def _detect_phones(text: str) -> list[PhiEntity]:
"""Detect phone numbers."""
entities = []
for match in _PHONE_PATTERN.finditer(text):
entities.append(PhiEntity(
entity_type='phone',
original_value=match.group(0),
start_position=match.start(),
end_position=match.end(),
))
return entities_detect_ssn function · python · L233-L243 (11 LOC)app/services/deid.py
def _detect_ssn(text: str) -> list[PhiEntity]:
"""Detect Social Security Numbers."""
entities = []
for match in _SSN_PATTERN.finditer(text):
entities.append(PhiEntity(
entity_type='ssn',
original_value=match.group(0),
start_position=match.start(),
end_position=match.end(),
))
return entities_detect_emails function · python · L246-L256 (11 LOC)app/services/deid.py
def _detect_emails(text: str) -> list[PhiEntity]:
"""Detect email addresses."""
entities = []
for match in _EMAIL_PATTERN.finditer(text):
entities.append(PhiEntity(
entity_type='email',
original_value=match.group(0),
start_position=match.start(),
end_position=match.end(),
))
return entities_detect_addresses function · python · L259-L269 (11 LOC)app/services/deid.py
def _detect_addresses(text: str) -> list[PhiEntity]:
"""Detect street addresses."""
entities = []
for match in _ADDRESS_PATTERN.finditer(text):
entities.append(PhiEntity(
entity_type='address',
original_value=match.group(0),
start_position=match.start(),
end_position=match.end(),
))
return entitiesRepobility · open methodology · https://repobility.com/research/
_detect_ages_over_89 function · python · L272-L290 (19 LOC)app/services/deid.py
def _detect_ages_over_89(text: str) -> list[PhiEntity]:
"""Detect ages explicitly mentioned as over 89 (HIPAA safe harbor)."""
entities = []
for match in _AGE_PATTERN.finditer(text):
# Extract the age number from either group 1 or group 2
age_str = match.group(1) or match.group(2)
try:
age = int(age_str)
# Only flag ages >= 90 (safe harbor in HIPAA)
if age >= 90:
entities.append(PhiEntity(
entity_type='age',
original_value=match.group(0),
start_position=match.start(),
end_position=match.end(),
))
except (ValueError, TypeError):
pass
return entities_merge_overlapping_entities function · python · L293-L312 (20 LOC)app/services/deid.py
def _merge_overlapping_entities(entities: list[PhiEntity]) -> list[PhiEntity]:
"""Remove overlapping entities, keeping the longest match."""
if not entities:
return []
# Sort by start position, then by length (descending)
sorted_ents = sorted(entities, key=lambda e: (e.start_position, -(e.end_position - e.start_position)))
merged = []
for ent in sorted_ents:
# Check if this entity overlaps with any already added
overlap = False
for added in merged:
if not (ent.end_position <= added.start_position or ent.start_position >= added.end_position):
overlap = True
break
if not overlap:
merged.append(ent)
return sorted(merged, key=lambda e: e.start_position)redact_phi function · python · L315-L378 (64 LOC)app/services/deid.py
def redact_phi(text: str) -> DeIdResult:
"""Detect and redact PHI from clinical text.
Returns a DeIdResult with:
- original_text: the input text
- redacted_text: text with all detected PHI replaced with [REDACTED_TYPE] markers
- entities_found: list of dicts with entity type, original value, start, end positions
- warning: HIPAA de-identification disclaimer
"""
# Detect all PHI entities
all_entities = []
all_entities.extend(_detect_names(text))
all_entities.extend(_detect_dates(text))
all_entities.extend(_detect_phones(text))
all_entities.extend(_detect_ssn(text))
all_entities.extend(_detect_emails(text))
all_entities.extend(_detect_addresses(text))
all_entities.extend(_detect_mrn(text))
all_entities.extend(_detect_ages_over_89(text))
# Merge overlapping entities
entities = _merge_overlapping_entities(all_entities)
# Build redacted text by replacing entities
redaction_map = {
'name': '[REDACEHRExporter class · python · L19-L33 (15 LOC)app/services/ehr_export.py
class EHRExporter(ABC):
"""Base class for EHR-specific export formats."""
@abstractmethod
def render(self, note: StructuredNote, metadata: dict) -> str:
"""Render a StructuredNote to EHR-specific format string."""
...
@abstractmethod
def file_extension(self) -> str:
...
@abstractmethod
def content_type(self) -> str:
...render method · python · L23-L25 (3 LOC)app/services/ehr_export.py
def render(self, note: StructuredNote, metadata: dict) -> str:
"""Render a StructuredNote to EHR-specific format string."""
...MeditechExporter class · python · L36-L356 (321 LOC)app/services/ehr_export.py
class MeditechExporter(EHRExporter):
"""
MEDITECH-compatible structured text format.
Uses section markers (^^^SECTION_NAME^^^) that align with MEDITECH's
text import conventions. Produces a flat ASCII text file that can be
copy/pasted or imported via MEDITECH's document intake.
Format conventions:
- Section markers: ^^^SECTION_NAME^^^
- Problem-based plan uses numbered items with indented sub-items
- Metadata header with note type, date, language, draft status
- Medication changes formatted with dose/route/frequency on indented lines
- Follow-up items include timeframe
- Discharge summaries include dedicated ^^^DISCHARGE_MEDICATIONS^^^ section
- ^^^PENDING_RESULTS^^^ section for any pending items
- Footer: ^^^END_OF_NOTE^^^
- Output is clean ASCII (no unicode that MEDITECH terminals cannot render)
- Bilingual section labels (EN/ES) for Puerto Rico hospital environments
"""
# ── Section label text by language ───_to_ascii method · python · L148-L177 (30 LOC)app/services/ehr_export.py
def _to_ascii(text: str) -> str:
"""Convert text to clean ASCII for MEDITECH terminal compatibility.
Replaces accented characters with their unaccented equivalents,
em/en dashes with --, smart quotes with straight quotes, and
strips any remaining non-ASCII characters.
"""
# Normalize unicode to decomposed form, then drop combining marks
normalized = unicodedata.normalize("NFKD", text)
ascii_chars: list[str] = []
for ch in normalized:
if ord(ch) < 128:
ascii_chars.append(ch)
elif unicodedata.category(ch) == "Mn":
# Combining mark (accent) -- skip it, base char already added
continue
elif ch in ("\u2014", "\u2013"): # em dash, en dash
ascii_chars.append("--")
elif ch in ("\u2018", "\u2019"): # smart single quotes
ascii_chars.append("'")
elif ch in ("\u201c", "\u201d"): # _marker method · python · L179-L184 (6 LOC)app/services/ehr_export.py
def _marker(self, section_key: str, lang: str) -> str:
"""Build a ^^^SECTION_LABEL^^^ marker using localized label text."""
labels = self._SECTION_LABELS.get(lang, self._SECTION_LABELS["en"])
label = labels.get(section_key, section_key.upper())
# Ensure marker text is ASCII-safe (no accents)
return f"^^^{self._to_ascii(label).replace(' ', '_')}^^^"Same scanner, your repo: https://repobility.com — Repobility
_note_type_display method · python · L186-L190 (5 LOC)app/services/ehr_export.py
def _note_type_display(self, note_type: str, lang: str) -> str:
"""Return a display-friendly note type name."""
type_map = self._NOTE_TYPE_DISPLAY.get(lang, self._NOTE_TYPE_DISPLAY["en"])
display = type_map.get(note_type, note_type.upper().replace("_", " "))
return self._to_ascii(display)_format_medication_block method · python · L192-L219 (28 LOC)app/services/ehr_export.py
def _format_medication_block(self, med_text: str) -> list[str]:
"""Format medication text with dose/route/frequency on indented lines.
Splits medication entries and formats structured information
(dose, route, frequency) on separate indented lines when present.
"""
out: list[str] = []
# Split on newlines -- each line may be a medication entry
entries = [ln.strip() for ln in med_text.strip().splitlines() if ln.strip()]
for idx, entry in enumerate(entries, 1):
# Strip leading bullet/number markers
clean = re.sub(r"^[\d]+[.)]\s*", "", entry)
clean = re.sub(r"^[-*]\s*", "", clean)
if not clean:
continue
# Try to parse structured medication info
# Pattern: "Drug Name dose route frequency" or comma/semicolon separated
parts = re.split(r"\s*[,;]\s*", clean)
if len(parts) >= 3:
# Structured: name, d_format_follow_up_items method · python · L221-L228 (8 LOC)app/services/ehr_export.py
def _format_follow_up_items(self, items: list[str]) -> list[str]:
"""Format follow-up items ensuring timeframe is visible."""
out: list[str] = []
for idx, item in enumerate(items, 1):
text = self._to_ascii(item)
# If no obvious timeframe, keep as-is -- the LLM should provide it
out.append(f" {idx}. {text}")
return outrender method · python · L230-L356 (127 LOC)app/services/ehr_export.py
def render(self, note: StructuredNote, metadata: dict) -> str:
lines: list[str] = []
lang = note.language if note.language in self._SECTION_LABELS else "en"
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
note_type_display = self._note_type_display(note.note_type, lang)
# ── Header block ────────────────────────────────────────────
lines.append("^^^HEADER^^^")
lines.append(f"NOTE TYPE: {note_type_display}")
lines.append(f"DATE: {now}")
lines.append(f"LANGUAGE: {lang.upper()}")
lines.append(f"DRAFT STATUS: {self._DRAFT_STATUS.get(lang, self._DRAFT_STATUS['en'])}")
lines.append(f"MODEL: {metadata.get('model_used', 'N/A')}")
lines.append(f"PROMPT VERSION: {metadata.get('prompt_version', 'N/A')}")
lines.append("")
# ── Chief complaint ─────────────────────────────────────────
if note.chief_complaint:
lines.append(self._marker("chief_complget_exporter function · python · L359-L367 (9 LOC)app/services/ehr_export.py
def get_exporter(format_name: str) -> EHRExporter:
"""Factory function to get an EHR exporter by format name."""
exporters = {
"meditext": MeditechExporter(),
}
exporter = exporters.get(format_name)
if not exporter:
raise ValueError(f"Unsupported EHR export format: {format_name}. Available: {', '.join(exporters.keys())}")
return exporter_char_entropy function · python · L38-L44 (7 LOC)app/services/notes.py
def _char_entropy(text: str) -> float:
"""Calculate Shannon entropy per character. Low entropy = repeated chars."""
if not text:
return 0.0
freq = Counter(text)
length = len(text)
return -sum((c / length) * math.log2(c / length) for c in freq.values())NoteError class · python · L47-L51 (5 LOC)app/services/notes.py
class NoteError(Exception):
def __init__(self, message: str, status_code: int = 400):
self.message = message
self.status_code = status_code
super().__init__(message)__init__ method · python · L48-L51 (4 LOC)app/services/notes.py
def __init__(self, message: str, status_code: int = 400):
self.message = message
self.status_code = status_code
super().__init__(message)Repobility · severity-and-effort ranking · https://repobility.com
create_note_session function · python · L54-L66 (13 LOC)app/services/notes.py
async def create_note_session(
db: AsyncSession, user_id: uuid.UUID, data: NoteSessionCreate
) -> NoteSessionOut:
session = NoteSession(
id=uuid.uuid4(),
user_id=user_id,
note_type=data.note_type,
patient_context=data.patient_context,
language=data.language,
)
db.add(session)
await db.flush([session])
return NoteSessionOut.model_validate(session)generate_note function · python · L69-L215 (147 LOC)app/services/notes.py
async def generate_note(
db: AsyncSession, user_id: uuid.UUID, request: NoteGenerateRequest
) -> NoteGenerateResponse:
# Verify session ownership
session = await db.scalar(
select(NoteSession).where(
NoteSession.id == request.session_id,
NoteSession.user_id == user_id,
NoteSession.deleted_at.is_(None),
)
)
if not session:
raise NoteError("Session not found or access denied", 404)
# ── Input quality checks ────────────────────────────────────────
if _char_entropy(request.input_text) < _MIN_INPUT_ENTROPY:
raise NoteError(
"Input text appears to be repeated or non-meaningful characters. "
"Please provide actual clinical content.",
422,
)
if session.patient_context and len(session.patient_context) > _MAX_PATIENT_CONTEXT_LEN:
raise NoteError(
f"Patient context must not exceed {_MAX_PATIENT_CONTEXT_LEN} characters.",
get_draft function · python · L218-L233 (16 LOC)app/services/notes.py
async def get_draft(
db: AsyncSession, draft_id: uuid.UUID, user_id: uuid.UUID
) -> NoteDraftOut:
draft = await db.scalar(
select(NoteDraft)
.join(NoteSession, NoteDraft.session_id == NoteSession.id)
.where(
NoteDraft.id == draft_id,
NoteSession.user_id == user_id,
NoteDraft.deleted_at.is_(None),
NoteSession.deleted_at.is_(None),
)
)
if not draft:
raise NoteError("Draft not found", 404)
return NoteDraftOut.model_validate(draft)update_draft function · python · L236-L268 (33 LOC)app/services/notes.py
async def update_draft(
db: AsyncSession,
draft_id: uuid.UUID,
user_id: uuid.UUID,
data: NoteDraftUpdate,
) -> NoteDraftOut:
draft = await db.scalar(
select(NoteDraft)
.join(NoteSession, NoteDraft.session_id == NoteSession.id)
.where(
NoteDraft.id == draft_id,
NoteSession.user_id == user_id,
NoteDraft.deleted_at.is_(None),
NoteSession.deleted_at.is_(None),
)
)
if not draft:
raise NoteError("Draft not found", 404)
draft.output_text = data.output_text
draft.version += 1
await db.flush([draft])
await write_audit_event(
db,
action="draft_edited",
user_id=user_id,
resource_type="note_draft",
resource_id=draft.id,
metadata={"version": draft.version},
)
return NoteDraftOut.model_validate(draft)export_draft_text function · python · L271-L297 (27 LOC)app/services/notes.py
async def export_draft_text(
db: AsyncSession, draft_id: uuid.UUID, user_id: uuid.UUID
) -> tuple[str, str]:
"""Returns (output_text, draft_id_str) for text export. Emits audit event."""
draft = await db.scalar(
select(NoteDraft)
.join(NoteSession, NoteDraft.session_id == NoteSession.id)
.where(
NoteDraft.id == draft_id,
NoteSession.user_id == user_id,
NoteDraft.deleted_at.is_(None),
NoteSession.deleted_at.is_(None),
)
)
if not draft:
raise NoteError("Draft not found", 404)
await write_audit_event(
db,
action="note_exported",
user_id=user_id,
resource_type="note_draft",
resource_id=draft.id,
metadata={"format": "text"},
)
return draft.output_text or "", str(draft.id)export_draft_pdf function · python · L300-L346 (47 LOC)app/services/notes.py
async def export_draft_pdf(
db: AsyncSession, draft_id: uuid.UUID, user_id: uuid.UUID
) -> tuple[bytes, str]:
"""Returns (pdf_bytes, filename) for PDF export. Emits audit event."""
from app.utils.pdf_export import render_note_pdf
draft = await db.scalar(
select(NoteDraft)
.join(NoteSession, NoteDraft.session_id == NoteSession.id)
.where(
NoteDraft.id == draft_id,
NoteSession.user_id == user_id,
NoteDraft.deleted_at.is_(None),
NoteSession.deleted_at.is_(None),
)
)
if not draft:
raise NoteError("Draft not found", 404)
# Get the note_type, patient_context, language from the parent session
session = await db.scalar(
select(NoteSession).where(NoteSession.id == draft.session_id)
)
note_type = session.note_type if session else ""
patient_context = (session.patient_context or "") if session else ""
language = (session.language or "en") if session elsexport_draft_meditext function · python · L349-L393 (45 LOC)app/services/notes.py
async def export_draft_meditext(
db: AsyncSession, draft_id: uuid.UUID, user_id: uuid.UUID
) -> tuple[str, str]:
"""Returns (meditext_content, filename) for MEDITECH export. Emits audit event."""
from app.services.ehr_export import MeditechExporter
from app.schemas.note_generation import StructuredNote
draft = await db.scalar(
select(NoteDraft)
.join(NoteSession, NoteDraft.session_id == NoteSession.id)
.where(
NoteDraft.id == draft_id,
NoteSession.user_id == user_id,
NoteDraft.deleted_at.is_(None),
NoteSession.deleted_at.is_(None),
)
)
if not draft:
raise NoteError("Draft not found", 404)
# Need structured payload to render MEDITEXT
if not draft.structured_payload:
raise NoteError("Draft has no structured payload — cannot export to MEDITEXT format", 400)
structured_note = StructuredNote.model_validate(draft.structured_payload)
metadata = {
get_session function · python · L396-L422 (27 LOC)app/services/notes.py
async def get_session(
db: AsyncSession, session_id: uuid.UUID, user_id: uuid.UUID
) -> NoteSessionDetailOut:
"""Return a single note session with its drafts, owned by the given user."""
session = await db.scalar(
select(NoteSession).where(
NoteSession.id == session_id,
NoteSession.user_id == user_id,
NoteSession.deleted_at.is_(None),
)
)
if not session:
raise NoteError("Session not found or access denied", 404)
# Fetch non-deleted drafts for this session
drafts_result = await db.scalars(
select(NoteDraft)
.where(
NoteDraft.session_id == session_id,
NoteDraft.deleted_at.is_(None),
)
.order_by(NoteDraft.version)
)
drafts = [NoteDraftOut.model_validate(d) for d in drafts_result.all()]
session_data = NoteSessionOut.model_validate(session)
return NoteSessionDetailOut(**session_data.model_dump(), drafts=drafts)Want this analysis on your repo? https://repobility.com/scan/
list_sessions function · python · L425-L437 (13 LOC)app/services/notes.py
async def list_sessions(
db: AsyncSession, user_id: uuid.UUID
) -> list[NoteSessionOut]:
result = await db.scalars(
select(NoteSession)
.where(
NoteSession.user_id == user_id,
NoteSession.deleted_at.is_(None),
)
.order_by(NoteSession.created_at.desc())
.limit(20)
)
return [NoteSessionOut.model_validate(s) for s in result.all()]list_session_drafts function · python · L440-L461 (22 LOC)app/services/notes.py
async def list_session_drafts(
db: AsyncSession, session_id: uuid.UUID, user_id: uuid.UUID
) -> list[NoteDraftOut]:
session = await db.scalar(
select(NoteSession).where(
NoteSession.id == session_id,
NoteSession.user_id == user_id,
NoteSession.deleted_at.is_(None),
)
)
if not session:
raise NoteError("Session not found or access denied", 404)
result = await db.scalars(
select(NoteDraft)
.where(
NoteDraft.session_id == session_id,
NoteDraft.deleted_at.is_(None),
)
.order_by(NoteDraft.version)
)
return [NoteDraftOut.model_validate(d) for d in result.all()]soft_delete_session function · python · L464-L499 (36 LOC)app/services/notes.py
async def soft_delete_session(
db: AsyncSession, session_id: uuid.UUID, user_id: uuid.UUID
) -> None:
"""Soft-delete a session and all its drafts."""
session = await db.scalar(
select(NoteSession).where(
NoteSession.id == session_id,
NoteSession.user_id == user_id,
NoteSession.deleted_at.is_(None),
)
)
if not session:
raise NoteError("Session not found or access denied", 404)
now = datetime.now(timezone.utc)
session.deleted_at = now
# Soft-delete all drafts in this session
drafts = await db.scalars(
select(NoteDraft).where(
NoteDraft.session_id == session_id,
NoteDraft.deleted_at.is_(None),
)
)
for draft in drafts.all():
draft.deleted_at = now
await db.flush()
await write_audit_event(
db,
action="session_deleted",
user_id=user_id,
resource_type="note_session",
resource_id=session_id,
soft_delete_draft function · python · L502-L527 (26 LOC)app/services/notes.py
async def soft_delete_draft(
db: AsyncSession, draft_id: uuid.UUID, user_id: uuid.UUID
) -> None:
"""Soft-delete a single draft."""
draft = await db.scalar(
select(NoteDraft)
.join(NoteSession, NoteDraft.session_id == NoteSession.id)
.where(
NoteDraft.id == draft_id,
NoteSession.user_id == user_id,
NoteDraft.deleted_at.is_(None),
)
)
if not draft:
raise NoteError("Draft not found", 404)
draft.deleted_at = datetime.now(timezone.utc)
await db.flush()
await write_audit_event(
db,
action="draft_deleted",
user_id=user_id,
resource_type="note_draft",
resource_id=draft_id,
)render_note_text function · python · L96-L207 (112 LOC)app/services/render_note.py
def render_note_text(note: StructuredNote) -> str:
"""
Render a StructuredNote into formatted clinical note text.
Returns:
Formatted clinical note string ready for physician review.
"""
lang = note.language if note.language in SECTION_LABELS else "en"
labels = SECTION_LABELS[lang]
type_display = NOTE_TYPE_DISPLAY.get(lang, NOTE_TYPE_DISPLAY["en"])
sections: list[str] = []
# ── Header ───────────────────────────────────────────────────────
header = type_display.get(
note.note_type, note.note_type.upper().replace("_", " ")
)
sections.append(header)
sections.append("")
# ── Chief Complaint ──────────────────────────────────────────────
if note.chief_complaint:
sections.append(f"{labels['chief_complaint']}:")
sections.append(note.chief_complaint)
sections.append("")
# ── Subjective / HPI ─────────────────────────────────────────────
if note.subjective:
label = (
ensure_free_subscription function · python · L36-L55 (20 LOC)app/services/stripe_billing.py
async def ensure_free_subscription(db: AsyncSession, user_id: uuid.UUID) -> Subscription:
"""
Create a free-tier subscription for a newly registered user.
Called during registration. Idempotent — skips if subscription exists.
"""
existing = await db.scalar(
select(Subscription).where(Subscription.user_id == user_id)
)
if existing:
return existing
sub = Subscription(
id=uuid.uuid4(),
user_id=user_id,
tier="free",
status="active",
)
db.add(sub)
await db.flush([sub])
return subget_user_subscription function · python · L58-L62 (5 LOC)app/services/stripe_billing.py
async def get_user_subscription(db: AsyncSession, user_id: uuid.UUID) -> Subscription | None:
"""Get the current subscription for a user."""
return await db.scalar(
select(Subscription).where(Subscription.user_id == user_id)
)get_note_limit_for_tier function · python · L65-L72 (8 LOC)app/services/stripe_billing.py
def get_note_limit_for_tier(tier: str) -> int:
"""Return the daily note generation limit for a subscription tier."""
if tier == "free":
return settings.stripe_free_tier_note_limit
if tier == "pro":
return settings.note_daily_limit
# Unknown tier — default to free limits
return settings.stripe_free_tier_note_limitRepobility · open methodology · https://repobility.com/research/
is_subscription_active function · python · L75-L79 (5 LOC)app/services/stripe_billing.py
def is_subscription_active(sub: Subscription | None) -> bool:
"""Check whether a subscription allows note generation."""
if sub is None:
return False
return sub.status in ACTIVE_STATUSES or sub.status in GRACE_STATUSEShandle_checkout_completed function · python · L85-L136 (52 LOC)app/services/stripe_billing.py
async def handle_checkout_completed(db: AsyncSession, event_data: dict) -> None:
"""
Handle checkout.session.completed — user just subscribed.
Links Stripe customer to our User and creates/upgrades Subscription.
"""
session_obj = event_data.get("object", {})
stripe_customer_id = session_obj.get("customer")
stripe_subscription_id = session_obj.get("subscription")
customer_email = session_obj.get("customer_email") or session_obj.get("customer_details", {}).get("email")
if not customer_email:
log.warning("stripe_checkout_no_email", customer_id=stripe_customer_id)
return
# Find user by email
user = await db.scalar(select(User).where(User.email == customer_email))
if not user:
log.warning("stripe_checkout_user_not_found", email="[REDACTED]")
return
# Link Stripe customer ID
if not user.stripe_customer_id:
user.stripe_customer_id = stripe_customer_id
# Create or update subscription
subhandle_subscription_updated function · python · L139-L183 (45 LOC)app/services/stripe_billing.py
async def handle_subscription_updated(db: AsyncSession, event_data: dict) -> None:
"""
Handle customer.subscription.updated — status change, renewal, plan change.
"""
sub_obj = event_data.get("object", {})
stripe_sub_id = sub_obj.get("id")
new_status = sub_obj.get("status", "active")
cancel_at_end = sub_obj.get("cancel_at_period_end", False)
# Period timestamps from Stripe (Unix epoch)
period_start_ts = sub_obj.get("current_period_start")
period_end_ts = sub_obj.get("current_period_end")
sub = await db.scalar(
select(Subscription).where(Subscription.stripe_subscription_id == stripe_sub_id)
)
if not sub:
log.warning("stripe_sub_update_not_found", stripe_sub_id=stripe_sub_id)
return
sub.status = new_status
sub.cancel_at_period_end = cancel_at_end
if period_start_ts:
sub.current_period_start = datetime.fromtimestamp(period_start_ts, tz=timezone.utc)
if period_end_ts:
sub.current