← back to drhiramrodriguez__medscribe-ai

Function bodies 223 total

All specs Real LLM only Function bodies
NoteDraftUpdate class · python · L140-L148 (9 LOC)
app/schemas/notes.py
class NoteDraftUpdate(BaseModel):
    """Request payload carrying replacement text for an existing note draft."""

    output_text: str

    @field_validator("output_text")
    @classmethod
    def not_empty(cls, v: str) -> str:
        """Accept ``output_text`` only if it has non-whitespace content.

        The value is returned exactly as received (it is not stripped).

        Raises:
            ValueError: If the text is empty or whitespace-only.
        """
        if v.strip():
            return v
        raise ValueError("output_text cannot be empty")
not_empty method · python · L145-L148 (4 LOC)
app/schemas/notes.py
    def not_empty(cls, v: str) -> str:
        """Accept ``v`` only if it has at least one non-whitespace character.

        The value is returned exactly as received (it is not stripped).

        Raises:
            ValueError: If ``v`` is empty or whitespace-only.
        """
        if v.strip():
            return v
        raise ValueError("output_text cannot be empty")
AuthError class · python · L17-L21 (5 LOC)
app/services/auth.py
class AuthError(Exception):
    """Authentication/registration failure with a message and a status code.

    Attributes:
        message: Human-readable description; also passed to ``Exception``.
        status_code: Numeric code for the failure (defaults to 400).
    """

    def __init__(self, message: str, status_code: int = 400):
        super().__init__(message)
        self.status_code = status_code
        self.message = message
__init__ method · python · L18-L21 (4 LOC)
app/services/auth.py
    def __init__(self, message: str, status_code: int = 400):
        self.message = message
        self.status_code = status_code
        super().__init__(message)
register_user function · python · L24-L69 (46 LOC)
app/services/auth.py
async def register_user(db: AsyncSession, data: UserRegister) -> UserOut:
    # Normalize inputs before storage
    normalized_email = data.email.lower().strip()
    normalized_username = data.username.strip()
    normalized_full_name = data.full_name.strip() if data.full_name else ""

    existing = await db.scalar(
        select(User).where(
            (User.email == normalized_email) | (User.username == normalized_username)
        )
    )
    if existing:
        if existing.email == normalized_email:
            raise AuthError("Email already registered", 409)
        raise AuthError("Username already taken", 409)

    user = User(
        id=uuid.uuid4(),
        email=normalized_email,
        username=normalized_username,
        password_hash=hash_password(data.password),
        full_name=normalized_full_name,
        specialty=data.specialty,
    )
    db.add(user)

    try:
        await db.flush([user])
    except IntegrityError:
        await db.rollback()
        raise
login_user function · python · L72-L112 (41 LOC)
app/services/auth.py
async def login_user(db: AsyncSession, data: UserLogin, ip_address: str | None = None) -> Token:
    user = await db.scalar(
        select(User).where(User.username == data.username)
    )

    # Constant-time: always run verify even if user not found
    dummy_hash = "$argon2id$v=19$m=65536,t=3,p=4$dummy"
    password_ok = verify_password(data.password, user.password_hash if user else dummy_hash)

    if not user or not password_ok:
        await write_audit_event(
            db,
            action="login_failed",
            metadata={"username": data.username},
            ip_address=ip_address,
        )
        raise AuthError("Invalid username or password", 401)

    if not user.is_active:
        raise AuthError("Account is inactive", 403)

    # Rehash if needed (argon2 params updated)
    if needs_rehash(user.password_hash):
        user.password_hash = hash_password(data.password)
        await db.flush()

    user.last_login = datetime.now(timezone.utc)

    token = create
PhiEntity class · python · L136-L141 (6 LOC)
app/services/deid.py
class PhiEntity:
    """A detected PHI entity.

    One instance describes a single span of the scanned text that a detector
    flagged as protected health information.
    """
    # Category label set by the detector that produced this entity.
    entity_type: str  # 'name', 'date', 'mrn', 'phone', 'ssn', 'email', 'address', 'age'
    # The matched substring (detectors in this file pass ``match.group(0)``).
    original_value: str
    # Offset where the match begins (detectors pass ``match.start()``).
    start_position: int
    # Offset just past the match (detectors pass ``match.end()``, so exclusive).
    end_position: int
Want this analysis on your repo? https://repobility.com/scan/
DeIdResult class · python · L145-L150 (6 LOC)
app/services/deid.py
class DeIdResult:
    """Result of de-identification."""
    # The input text exactly as it was handed to the redactor.
    original_text: str
    # The text with detected PHI replaced by [REDACTED_TYPE] markers.
    redacted_text: str
    # One dict per detected entity: type, original value, start and end positions.
    entities_found: list[dict]
    # HIPAA de-identification disclaimer accompanying the result.
    warning: str
_detect_names function · python · L153-L179 (27 LOC)
app/services/deid.py
def _detect_names(text: str) -> list[PhiEntity]:
    """Detect potential person names (first + last name combinations).

    Looks for capitalized word sequences where both first and last names are in
    the common names list, or where one is common and the other is capitalized
    and followed by common surname indicators.
    """
    entities = []
    # Simple pattern: Capitalized word followed by another Capitalized word
    pattern = re.compile(r'\b([A-Z][a-z]+)\s+([A-Z][a-z]+)\b')

    for match in pattern.finditer(text):
        first, last = match.group(1), match.group(2)
        first_lower = first.lower()
        last_lower = last.lower()

        # Match if both are in common names, or first is common and last is capitalized
        if (first_lower in COMMON_FIRST_NAMES and last_lower in COMMON_SURNAMES) or \
           (first_lower in COMMON_FIRST_NAMES and last_lower not in {"the", "and", "or", "of"}):
            entities.append(PhiEntity(
                entity_type='nam
_detect_dates function · python · L182-L192 (11 LOC)
app/services/deid.py
def _detect_dates(text: str) -> list[PhiEntity]:
    """Find date-like substrings in ``text``.

    Every match of the module-level ``_DATE_PATTERN`` becomes one
    ``PhiEntity`` tagged ``'date'`` carrying the matched text and its span.
    """
    return [
        PhiEntity(
            entity_type='date',
            original_value=hit.group(0),
            start_position=hit.start(),
            end_position=hit.end(),
        )
        for hit in _DATE_PATTERN.finditer(text)
    ]
_detect_mrn function · python · L195-L217 (23 LOC)
app/services/deid.py
def _detect_mrn(text: str) -> list[PhiEntity]:
    """Detect MRN/account numbers (6-10 digit sequences).

    Conservative: looks for sequences of 6-10 consecutive digits, possibly separated
    by hyphens or spaces.
    """
    entities = []
    # More sophisticated MRN pattern: digits with optional separators
    mrn_pattern = re.compile(r'\b\d{6,10}\b')
    for match in mrn_pattern.finditer(text):
        # Filter out common non-PHI numbers (years, etc.)
        value = match.group(0)
        num = int(value)
        # If it looks like a year or century, skip it
        if (num >= 1900 and num <= 2100):
            continue
        entities.append(PhiEntity(
            entity_type='mrn',
            original_value=value,
            start_position=match.start(),
            end_position=match.end(),
        ))
    return entities
_detect_phones function · python · L220-L230 (11 LOC)
app/services/deid.py
def _detect_phones(text: str) -> list[PhiEntity]:
    """Find phone numbers in ``text``.

    Every match of the module-level ``_PHONE_PATTERN`` becomes one
    ``PhiEntity`` tagged ``'phone'`` carrying the matched text and its span.
    """
    return [
        PhiEntity(
            entity_type='phone',
            original_value=hit.group(0),
            start_position=hit.start(),
            end_position=hit.end(),
        )
        for hit in _PHONE_PATTERN.finditer(text)
    ]
_detect_ssn function · python · L233-L243 (11 LOC)
app/services/deid.py
def _detect_ssn(text: str) -> list[PhiEntity]:
    """Find Social Security Numbers in ``text``.

    Every match of the module-level ``_SSN_PATTERN`` becomes one
    ``PhiEntity`` tagged ``'ssn'`` carrying the matched text and its span.
    """
    return [
        PhiEntity(
            entity_type='ssn',
            original_value=hit.group(0),
            start_position=hit.start(),
            end_position=hit.end(),
        )
        for hit in _SSN_PATTERN.finditer(text)
    ]
_detect_emails function · python · L246-L256 (11 LOC)
app/services/deid.py
def _detect_emails(text: str) -> list[PhiEntity]:
    """Find email addresses in ``text``.

    Every match of the module-level ``_EMAIL_PATTERN`` becomes one
    ``PhiEntity`` tagged ``'email'`` carrying the matched text and its span.
    """
    return [
        PhiEntity(
            entity_type='email',
            original_value=hit.group(0),
            start_position=hit.start(),
            end_position=hit.end(),
        )
        for hit in _EMAIL_PATTERN.finditer(text)
    ]
_detect_addresses function · python · L259-L269 (11 LOC)
app/services/deid.py
def _detect_addresses(text: str) -> list[PhiEntity]:
    """Find street addresses in ``text``.

    Every match of the module-level ``_ADDRESS_PATTERN`` becomes one
    ``PhiEntity`` tagged ``'address'`` carrying the matched text and its span.
    """
    return [
        PhiEntity(
            entity_type='address',
            original_value=hit.group(0),
            start_position=hit.start(),
            end_position=hit.end(),
        )
        for hit in _ADDRESS_PATTERN.finditer(text)
    ]
Repobility · open methodology · https://repobility.com/research/
_detect_ages_over_89 function · python · L272-L290 (19 LOC)
app/services/deid.py
def _detect_ages_over_89(text: str) -> list[PhiEntity]:
    """Flag age mentions of 90 or more (ages over 89, per HIPAA safe harbor).

    For each ``_AGE_PATTERN`` match the number is taken from capture group 1,
    falling back to group 2. Matches whose number cannot be parsed are
    silently ignored; ages below 90 are not reported.
    """
    flagged = []
    for hit in _AGE_PATTERN.finditer(text):
        raw_age = hit.group(1) or hit.group(2)
        try:
            if int(raw_age) < 90:
                # Below the safe-harbor threshold: not PHI by this rule.
                continue
            flagged.append(PhiEntity(
                entity_type='age',
                original_value=hit.group(0),
                start_position=hit.start(),
                end_position=hit.end(),
            ))
        except (ValueError, TypeError):
            # Neither group held a parseable integer; skip this match.
            continue
    return flagged
_merge_overlapping_entities function · python · L293-L312 (20 LOC)
app/services/deid.py
def _merge_overlapping_entities(entities: list[PhiEntity]) -> list[PhiEntity]:
    """Remove overlapping entities, keeping the longest match."""
    if not entities:
        return []

    # Sort by start position, then by length (descending)
    sorted_ents = sorted(entities, key=lambda e: (e.start_position, -(e.end_position - e.start_position)))

    merged = []
    for ent in sorted_ents:
        # Check if this entity overlaps with any already added
        overlap = False
        for added in merged:
            if not (ent.end_position <= added.start_position or ent.start_position >= added.end_position):
                overlap = True
                break
        if not overlap:
            merged.append(ent)

    return sorted(merged, key=lambda e: e.start_position)
redact_phi function · python · L315-L378 (64 LOC)
app/services/deid.py
def redact_phi(text: str) -> DeIdResult:
    """Detect and redact PHI from clinical text.

    Returns a DeIdResult with:
    - original_text: the input text
    - redacted_text: text with all detected PHI replaced with [REDACTED_TYPE] markers
    - entities_found: list of dicts with entity type, original value, start, end positions
    - warning: HIPAA de-identification disclaimer
    """

    # Detect all PHI entities
    all_entities = []
    all_entities.extend(_detect_names(text))
    all_entities.extend(_detect_dates(text))
    all_entities.extend(_detect_phones(text))
    all_entities.extend(_detect_ssn(text))
    all_entities.extend(_detect_emails(text))
    all_entities.extend(_detect_addresses(text))
    all_entities.extend(_detect_mrn(text))
    all_entities.extend(_detect_ages_over_89(text))

    # Merge overlapping entities
    entities = _merge_overlapping_entities(all_entities)

    # Build redacted text by replacing entities
    redaction_map = {
        'name': '[REDAC
EHRExporter class · python · L19-L33 (15 LOC)
app/services/ehr_export.py
class EHRExporter(ABC):
    """Base class for EHR-specific export formats.

    Concrete exporters must implement all three abstract methods below.
    """

    @abstractmethod
    def render(self, note: StructuredNote, metadata: dict) -> str:
        """Render a StructuredNote to EHR-specific format string.

        Args:
            note: The structured note to serialize.
            metadata: Extra values for the export. The MEDITECH exporter in
                this file reads the keys 'model_used' and 'prompt_version'.

        Returns:
            The rendered document as a single string.
        """
        ...

    @abstractmethod
    def file_extension(self) -> str:
        # NOTE(review): presumably the filename extension for the rendered
        # export -- confirm whether a leading dot is expected.
        ...

    @abstractmethod
    def content_type(self) -> str:
        # NOTE(review): presumably the MIME type to serve the export with --
        # confirm against the route that calls it.
        ...
render method · python · L23-L25 (3 LOC)
app/services/ehr_export.py
    def render(self, note: StructuredNote, metadata: dict) -> str:
        """Render a StructuredNote to EHR-specific format string.

        Args:
            note: The structured note to serialize.
            metadata: Extra values for the export. The MEDITECH exporter in
                this file reads the keys 'model_used' and 'prompt_version'.

        Returns:
            The rendered document as a single string.
        """
        ...
MeditechExporter class · python · L36-L356 (321 LOC)
app/services/ehr_export.py
class MeditechExporter(EHRExporter):
    """
    MEDITECH-compatible structured text format.

    Uses section markers (^^^SECTION_NAME^^^) that align with MEDITECH's
    text import conventions. Produces a flat ASCII text file that can be
    copy/pasted or imported via MEDITECH's document intake.

    Format conventions:
    - Section markers: ^^^SECTION_NAME^^^
    - Problem-based plan uses numbered items with indented sub-items
    - Metadata header with note type, date, language, draft status
    - Medication changes formatted with dose/route/frequency on indented lines
    - Follow-up items include timeframe
    - Discharge summaries include dedicated ^^^DISCHARGE_MEDICATIONS^^^ section
    - ^^^PENDING_RESULTS^^^ section for any pending items
    - Footer: ^^^END_OF_NOTE^^^
    - Output is clean ASCII (no unicode that MEDITECH terminals cannot render)
    - Bilingual section labels (EN/ES) for Puerto Rico hospital environments
    """

    # ── Section label text by language ───
_to_ascii method · python · L148-L177 (30 LOC)
app/services/ehr_export.py
    def _to_ascii(text: str) -> str:
        """Convert text to clean ASCII for MEDITECH terminal compatibility.

        Replaces accented characters with their unaccented equivalents,
        em/en dashes with --, smart quotes with straight quotes, and
        strips any remaining non-ASCII characters.
        """
        # Normalize unicode to decomposed form, then drop combining marks
        normalized = unicodedata.normalize("NFKD", text)
        ascii_chars: list[str] = []
        for ch in normalized:
            if ord(ch) < 128:
                ascii_chars.append(ch)
            elif unicodedata.category(ch) == "Mn":
                # Combining mark (accent) -- skip it, base char already added
                continue
            elif ch in ("\u2014", "\u2013"):  # em dash, en dash
                ascii_chars.append("--")
            elif ch in ("\u2018", "\u2019"):  # smart single quotes
                ascii_chars.append("'")
            elif ch in ("\u201c", "\u201d"):  # 
_marker method · python · L179-L184 (6 LOC)
app/services/ehr_export.py
    def _marker(self, section_key: str, lang: str) -> str:
        """Build a ^^^SECTION_LABEL^^^ marker using localized label text."""
        labels = self._SECTION_LABELS.get(lang, self._SECTION_LABELS["en"])
        label = labels.get(section_key, section_key.upper())
        # Ensure marker text is ASCII-safe (no accents)
        return f"^^^{self._to_ascii(label).replace(' ', '_')}^^^"
Same scanner, your repo: https://repobility.com — Repobility
_note_type_display method · python · L186-L190 (5 LOC)
app/services/ehr_export.py
    def _note_type_display(self, note_type: str, lang: str) -> str:
        """Return a display-friendly note type name."""
        type_map = self._NOTE_TYPE_DISPLAY.get(lang, self._NOTE_TYPE_DISPLAY["en"])
        display = type_map.get(note_type, note_type.upper().replace("_", " "))
        return self._to_ascii(display)
_format_medication_block method · python · L192-L219 (28 LOC)
app/services/ehr_export.py
    def _format_medication_block(self, med_text: str) -> list[str]:
        """Format medication text with dose/route/frequency on indented lines.

        Splits medication entries and formats structured information
        (dose, route, frequency) on separate indented lines when present.
        """
        out: list[str] = []
        # Split on newlines -- each line may be a medication entry
        entries = [ln.strip() for ln in med_text.strip().splitlines() if ln.strip()]
        for idx, entry in enumerate(entries, 1):
            # Strip leading bullet/number markers
            clean = re.sub(r"^[\d]+[.)]\s*", "", entry)
            clean = re.sub(r"^[-*]\s*", "", clean)
            if not clean:
                continue

            # Try to parse structured medication info
            # Pattern: "Drug Name dose route frequency" or comma/semicolon separated
            parts = re.split(r"\s*[,;]\s*", clean)
            if len(parts) >= 3:
                # Structured: name, d
_format_follow_up_items method · python · L221-L228 (8 LOC)
app/services/ehr_export.py
    def _format_follow_up_items(self, items: list[str]) -> list[str]:
        """Format follow-up items ensuring timeframe is visible."""
        out: list[str] = []
        for idx, item in enumerate(items, 1):
            text = self._to_ascii(item)
            # If no obvious timeframe, keep as-is -- the LLM should provide it
            out.append(f"  {idx}. {text}")
        return out
render method · python · L230-L356 (127 LOC)
app/services/ehr_export.py
    def render(self, note: StructuredNote, metadata: dict) -> str:
        lines: list[str] = []
        lang = note.language if note.language in self._SECTION_LABELS else "en"
        now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
        note_type_display = self._note_type_display(note.note_type, lang)

        # ── Header block ────────────────────────────────────────────
        lines.append("^^^HEADER^^^")
        lines.append(f"NOTE TYPE: {note_type_display}")
        lines.append(f"DATE: {now}")
        lines.append(f"LANGUAGE: {lang.upper()}")
        lines.append(f"DRAFT STATUS: {self._DRAFT_STATUS.get(lang, self._DRAFT_STATUS['en'])}")
        lines.append(f"MODEL: {metadata.get('model_used', 'N/A')}")
        lines.append(f"PROMPT VERSION: {metadata.get('prompt_version', 'N/A')}")
        lines.append("")

        # ── Chief complaint ─────────────────────────────────────────
        if note.chief_complaint:
            lines.append(self._marker("chief_compl
get_exporter function · python · L359-L367 (9 LOC)
app/services/ehr_export.py
def get_exporter(format_name: str) -> EHRExporter:
    """Look up the EHR exporter registered under ``format_name``.

    Raises:
        ValueError: If no exporter is registered for that name; the message
            lists the available format names.
    """
    registry = {
        "meditext": MeditechExporter(),
    }
    chosen = registry.get(format_name)
    if chosen:
        return chosen
    available = ', '.join(registry.keys())
    raise ValueError(f"Unsupported EHR export format: {format_name}. Available: {available}")
_char_entropy function · python · L38-L44 (7 LOC)
app/services/notes.py
def _char_entropy(text: str) -> float:
    """Calculate Shannon entropy per character. Low entropy = repeated chars."""
    if not text:
        return 0.0
    freq = Counter(text)
    length = len(text)
    return -sum((c / length) * math.log2(c / length) for c in freq.values())
NoteError class · python · L47-L51 (5 LOC)
app/services/notes.py
class NoteError(Exception):
    """Note-service failure with a message and a status code.

    Attributes:
        message: Human-readable description; also passed to ``Exception``.
        status_code: Numeric code for the failure (defaults to 400).
    """

    def __init__(self, message: str, status_code: int = 400):
        super().__init__(message)
        self.status_code = status_code
        self.message = message
__init__ method · python · L48-L51 (4 LOC)
app/services/notes.py
    def __init__(self, message: str, status_code: int = 400):
        self.message = message
        self.status_code = status_code
        super().__init__(message)
Repobility · severity-and-effort ranking · https://repobility.com
create_note_session function · python · L54-L66 (13 LOC)
app/services/notes.py
async def create_note_session(
    db: AsyncSession, user_id: uuid.UUID, data: NoteSessionCreate
) -> NoteSessionOut:
    """Create a note session owned by ``user_id`` and return its output schema.

    The new row gets a fresh UUID, is added to ``db`` and flushed; this
    function does not commit.
    """
    fields = {
        "id": uuid.uuid4(),
        "user_id": user_id,
        "note_type": data.note_type,
        "patient_context": data.patient_context,
        "language": data.language,
    }
    new_session = NoteSession(**fields)
    db.add(new_session)
    await db.flush([new_session])
    return NoteSessionOut.model_validate(new_session)
generate_note function · python · L69-L215 (147 LOC)
app/services/notes.py
async def generate_note(
    db: AsyncSession, user_id: uuid.UUID, request: NoteGenerateRequest
) -> NoteGenerateResponse:
    # Verify session ownership
    session = await db.scalar(
        select(NoteSession).where(
            NoteSession.id == request.session_id,
            NoteSession.user_id == user_id,
            NoteSession.deleted_at.is_(None),
        )
    )
    if not session:
        raise NoteError("Session not found or access denied", 404)

    # ── Input quality checks ────────────────────────────────────────
    if _char_entropy(request.input_text) < _MIN_INPUT_ENTROPY:
        raise NoteError(
            "Input text appears to be repeated or non-meaningful characters. "
            "Please provide actual clinical content.",
            422,
        )

    if session.patient_context and len(session.patient_context) > _MAX_PATIENT_CONTEXT_LEN:
        raise NoteError(
            f"Patient context must not exceed {_MAX_PATIENT_CONTEXT_LEN} characters.",
           
get_draft function · python · L218-L233 (16 LOC)
app/services/notes.py
async def get_draft(
    db: AsyncSession, draft_id: uuid.UUID, user_id: uuid.UUID
) -> NoteDraftOut:
    """Fetch one draft that belongs to a session owned by ``user_id``.

    Soft-deleted drafts and drafts of soft-deleted sessions are treated as
    missing.

    Raises:
        NoteError: 404 when no matching draft exists.
    """
    owned_draft_query = (
        select(NoteDraft)
        .join(NoteSession, NoteDraft.session_id == NoteSession.id)
        .where(
            NoteDraft.id == draft_id,
            NoteSession.user_id == user_id,
            NoteDraft.deleted_at.is_(None),
            NoteSession.deleted_at.is_(None),
        )
    )
    found = await db.scalar(owned_draft_query)
    if found:
        return NoteDraftOut.model_validate(found)
    raise NoteError("Draft not found", 404)
update_draft function · python · L236-L268 (33 LOC)
app/services/notes.py
async def update_draft(
    db: AsyncSession,
    draft_id: uuid.UUID,
    user_id: uuid.UUID,
    data: NoteDraftUpdate,
) -> NoteDraftOut:
    """Replace a draft's text, bump its version, and record an audit event.

    The draft must belong to a session owned by ``user_id``, and neither the
    draft nor the session may be soft-deleted. The change is flushed, not
    committed.

    Raises:
        NoteError: 404 when no matching draft exists.
    """
    ownership_filters = (
        NoteDraft.id == draft_id,
        NoteSession.user_id == user_id,
        NoteDraft.deleted_at.is_(None),
        NoteSession.deleted_at.is_(None),
    )
    lookup = (
        select(NoteDraft)
        .join(NoteSession, NoteDraft.session_id == NoteSession.id)
        .where(*ownership_filters)
    )
    target = await db.scalar(lookup)
    if not target:
        raise NoteError("Draft not found", 404)

    target.output_text = data.output_text
    target.version += 1
    await db.flush([target])

    # The audit entry records the version number after the increment.
    audit_fields = {
        "action": "draft_edited",
        "user_id": user_id,
        "resource_type": "note_draft",
        "resource_id": target.id,
        "metadata": {"version": target.version},
    }
    await write_audit_event(db, **audit_fields)

    return NoteDraftOut.model_validate(target)
export_draft_text function · python · L271-L297 (27 LOC)
app/services/notes.py
async def export_draft_text(
    db: AsyncSession, draft_id: uuid.UUID, user_id: uuid.UUID
) -> tuple[str, str]:
    """Return ``(output_text, draft_id_str)`` for a plain-text export.

    Emits a ``note_exported`` audit event with format ``text``. A draft with
    no text exports as the empty string.

    Raises:
        NoteError: 404 when the draft is missing, soft-deleted, or not owned
            by ``user_id``.
    """
    lookup = (
        select(NoteDraft)
        .join(NoteSession, NoteDraft.session_id == NoteSession.id)
        .where(
            NoteDraft.id == draft_id,
            NoteSession.user_id == user_id,
            NoteDraft.deleted_at.is_(None),
            NoteSession.deleted_at.is_(None),
        )
    )
    found = await db.scalar(lookup)
    if not found:
        raise NoteError("Draft not found", 404)

    audit_fields = {
        "action": "note_exported",
        "user_id": user_id,
        "resource_type": "note_draft",
        "resource_id": found.id,
        "metadata": {"format": "text"},
    }
    await write_audit_event(db, **audit_fields)

    exported_text = found.output_text or ""
    return exported_text, str(found.id)
export_draft_pdf function · python · L300-L346 (47 LOC)
app/services/notes.py
async def export_draft_pdf(
    db: AsyncSession, draft_id: uuid.UUID, user_id: uuid.UUID
) -> tuple[bytes, str]:
    """Returns (pdf_bytes, filename) for PDF export. Emits audit event."""
    from app.utils.pdf_export import render_note_pdf

    draft = await db.scalar(
        select(NoteDraft)
        .join(NoteSession, NoteDraft.session_id == NoteSession.id)
        .where(
            NoteDraft.id == draft_id,
            NoteSession.user_id == user_id,
            NoteDraft.deleted_at.is_(None),
            NoteSession.deleted_at.is_(None),
        )
    )
    if not draft:
        raise NoteError("Draft not found", 404)

    # Get the note_type, patient_context, language from the parent session
    session = await db.scalar(
        select(NoteSession).where(NoteSession.id == draft.session_id)
    )
    note_type = session.note_type if session else ""
    patient_context = (session.patient_context or "") if session else ""
    language = (session.language or "en") if session els
export_draft_meditext function · python · L349-L393 (45 LOC)
app/services/notes.py
async def export_draft_meditext(
    db: AsyncSession, draft_id: uuid.UUID, user_id: uuid.UUID
) -> tuple[str, str]:
    """Returns (meditext_content, filename) for MEDITECH export. Emits audit event."""
    from app.services.ehr_export import MeditechExporter
    from app.schemas.note_generation import StructuredNote

    draft = await db.scalar(
        select(NoteDraft)
        .join(NoteSession, NoteDraft.session_id == NoteSession.id)
        .where(
            NoteDraft.id == draft_id,
            NoteSession.user_id == user_id,
            NoteDraft.deleted_at.is_(None),
            NoteSession.deleted_at.is_(None),
        )
    )
    if not draft:
        raise NoteError("Draft not found", 404)

    # Need structured payload to render MEDITEXT
    if not draft.structured_payload:
        raise NoteError("Draft has no structured payload — cannot export to MEDITEXT format", 400)

    structured_note = StructuredNote.model_validate(draft.structured_payload)

    metadata = {
    
get_session function · python · L396-L422 (27 LOC)
app/services/notes.py
async def get_session(
    db: AsyncSession, session_id: uuid.UUID, user_id: uuid.UUID
) -> NoteSessionDetailOut:
    """Return one non-deleted session owned by ``user_id``, with its drafts.

    Only non-deleted drafts are included, ordered by version.

    Raises:
        NoteError: 404 when the session is missing, deleted, or not owned by
            ``user_id``.
    """
    owner_query = select(NoteSession).where(
        NoteSession.id == session_id,
        NoteSession.user_id == user_id,
        NoteSession.deleted_at.is_(None),
    )
    owned = await db.scalar(owner_query)
    if not owned:
        raise NoteError("Session not found or access denied", 404)

    live_drafts_query = (
        select(NoteDraft)
        .where(
            NoteDraft.session_id == session_id,
            NoteDraft.deleted_at.is_(None),
        )
        .order_by(NoteDraft.version)
    )
    draft_rows = await db.scalars(live_drafts_query)
    drafts = []
    for row in draft_rows.all():
        drafts.append(NoteDraftOut.model_validate(row))

    base_fields = NoteSessionOut.model_validate(owned).model_dump()
    return NoteSessionDetailOut(**base_fields, drafts=drafts)
Want this analysis on your repo? https://repobility.com/scan/
list_sessions function · python · L425-L437 (13 LOC)
app/services/notes.py
async def list_sessions(
    db: AsyncSession, user_id: uuid.UUID, *, limit: int = 20
) -> list[NoteSessionOut]:
    """List a user's most recent non-deleted note sessions, newest first.

    Args:
        db: Database session used for the query.
        user_id: Owner whose sessions are listed.
        limit: Maximum number of sessions to return. Defaults to 20, the
            previously hard-coded page size, so existing callers are
            unaffected.

    Returns:
        Up to ``limit`` sessions ordered by ``created_at`` descending.
    """
    query = (
        select(NoteSession)
        .where(
            NoteSession.user_id == user_id,
            NoteSession.deleted_at.is_(None),
        )
        .order_by(NoteSession.created_at.desc())
        .limit(limit)
    )
    rows = await db.scalars(query)
    return [NoteSessionOut.model_validate(row) for row in rows.all()]
list_session_drafts function · python · L440-L461 (22 LOC)
app/services/notes.py
async def list_session_drafts(
    db: AsyncSession, session_id: uuid.UUID, user_id: uuid.UUID
) -> list[NoteDraftOut]:
    """List the non-deleted drafts of a session, ordered by version.

    The session itself must exist, be non-deleted, and belong to ``user_id``.

    Raises:
        NoteError: 404 when the session is missing, deleted, or not owned by
            ``user_id``.
    """
    owner_query = select(NoteSession).where(
        NoteSession.id == session_id,
        NoteSession.user_id == user_id,
        NoteSession.deleted_at.is_(None),
    )
    owned = await db.scalar(owner_query)
    if not owned:
        raise NoteError("Session not found or access denied", 404)

    live_drafts_query = (
        select(NoteDraft)
        .where(
            NoteDraft.session_id == session_id,
            NoteDraft.deleted_at.is_(None),
        )
        .order_by(NoteDraft.version)
    )
    rows = await db.scalars(live_drafts_query)
    drafts = []
    for row in rows.all():
        drafts.append(NoteDraftOut.model_validate(row))
    return drafts
soft_delete_session function · python · L464-L499 (36 LOC)
app/services/notes.py
async def soft_delete_session(
    db: AsyncSession, session_id: uuid.UUID, user_id: uuid.UUID
) -> None:
    """Soft-delete a session and all its drafts."""
    session = await db.scalar(
        select(NoteSession).where(
            NoteSession.id == session_id,
            NoteSession.user_id == user_id,
            NoteSession.deleted_at.is_(None),
        )
    )
    if not session:
        raise NoteError("Session not found or access denied", 404)

    now = datetime.now(timezone.utc)
    session.deleted_at = now

    # Soft-delete all drafts in this session
    drafts = await db.scalars(
        select(NoteDraft).where(
            NoteDraft.session_id == session_id,
            NoteDraft.deleted_at.is_(None),
        )
    )
    for draft in drafts.all():
        draft.deleted_at = now

    await db.flush()

    await write_audit_event(
        db,
        action="session_deleted",
        user_id=user_id,
        resource_type="note_session",
        resource_id=session_id,
 
soft_delete_draft function · python · L502-L527 (26 LOC)
app/services/notes.py
async def soft_delete_draft(
    db: AsyncSession, draft_id: uuid.UUID, user_id: uuid.UUID
) -> None:
    """Soft-delete a single draft and record a ``draft_deleted`` audit event.

    The draft must belong to a session owned by ``user_id``. Neither the draft
    nor its session may already be soft-deleted; this is the same ownership
    filter the other draft lookups in this module apply. The change is
    flushed, not committed.

    Raises:
        NoteError: 404 when no matching draft exists.
    """
    draft = await db.scalar(
        select(NoteDraft)
        .join(NoteSession, NoteDraft.session_id == NoteSession.id)
        .where(
            NoteDraft.id == draft_id,
            NoteSession.user_id == user_id,
            NoteDraft.deleted_at.is_(None),
            # Treat drafts of a deleted session as gone, like get/update/export do.
            NoteSession.deleted_at.is_(None),
        )
    )
    if not draft:
        raise NoteError("Draft not found", 404)

    draft.deleted_at = datetime.now(timezone.utc)
    await db.flush()

    await write_audit_event(
        db,
        action="draft_deleted",
        user_id=user_id,
        resource_type="note_draft",
        resource_id=draft_id,
    )
render_note_text function · python · L96-L207 (112 LOC)
app/services/render_note.py
def render_note_text(note: StructuredNote) -> str:
    """
    Render a StructuredNote into formatted clinical note text.

    Returns:
        Formatted clinical note string ready for physician review.
    """
    lang = note.language if note.language in SECTION_LABELS else "en"
    labels = SECTION_LABELS[lang]
    type_display = NOTE_TYPE_DISPLAY.get(lang, NOTE_TYPE_DISPLAY["en"])

    sections: list[str] = []

    # ── Header ───────────────────────────────────────────────────────
    header = type_display.get(
        note.note_type, note.note_type.upper().replace("_", " ")
    )
    sections.append(header)
    sections.append("")

    # ── Chief Complaint ──────────────────────────────────────────────
    if note.chief_complaint:
        sections.append(f"{labels['chief_complaint']}:")
        sections.append(note.chief_complaint)
        sections.append("")

    # ── Subjective / HPI ─────────────────────────────────────────────
    if note.subjective:
        label = (
        
ensure_free_subscription function · python · L36-L55 (20 LOC)
app/services/stripe_billing.py
async def ensure_free_subscription(db: AsyncSession, user_id: uuid.UUID) -> Subscription:
    """
    Return the user's subscription, creating an active free-tier one if absent.

    Called during registration. Idempotent: when any subscription already
    exists for the user it is returned unchanged. A newly created row is
    added and flushed, not committed.
    """
    by_user = select(Subscription).where(Subscription.user_id == user_id)
    current = await db.scalar(by_user)
    if current:
        return current

    free_sub = Subscription(
        id=uuid.uuid4(),
        user_id=user_id,
        tier="free",
        status="active",
    )
    db.add(free_sub)
    await db.flush([free_sub])
    return free_sub
get_user_subscription function · python · L58-L62 (5 LOC)
app/services/stripe_billing.py
async def get_user_subscription(db: AsyncSession, user_id: uuid.UUID) -> Subscription | None:
    """Look up the subscription row for ``user_id``; ``None`` if there is none."""
    by_user = select(Subscription).where(Subscription.user_id == user_id)
    return await db.scalar(by_user)
get_note_limit_for_tier function · python · L65-L72 (8 LOC)
app/services/stripe_billing.py
def get_note_limit_for_tier(tier: str) -> int:
    """Return the daily note generation limit for a subscription tier.

    ``"pro"`` uses ``settings.note_daily_limit``. ``"free"`` and any
    unrecognized tier use ``settings.stripe_free_tier_note_limit``.
    """
    if tier == "pro":
        return settings.note_daily_limit
    # "free" and unknown tiers share the free-tier cap.
    return settings.stripe_free_tier_note_limit
Repobility · open methodology · https://repobility.com/research/
is_subscription_active function · python · L75-L79 (5 LOC)
app/services/stripe_billing.py
def is_subscription_active(sub: Subscription | None) -> bool:
    """Tell whether ``sub`` currently permits note generation.

    A missing subscription never does. Otherwise the status must be listed in
    ``ACTIVE_STATUSES`` or ``GRACE_STATUSES``.
    """
    return sub is not None and (
        sub.status in ACTIVE_STATUSES or sub.status in GRACE_STATUSES
    )
handle_checkout_completed function · python · L85-L136 (52 LOC)
app/services/stripe_billing.py
async def handle_checkout_completed(db: AsyncSession, event_data: dict) -> None:
    """
    Handle checkout.session.completed — user just subscribed.
    Links Stripe customer to our User and creates/upgrades Subscription.
    """
    session_obj = event_data.get("object", {})
    stripe_customer_id = session_obj.get("customer")
    stripe_subscription_id = session_obj.get("subscription")
    customer_email = session_obj.get("customer_email") or session_obj.get("customer_details", {}).get("email")

    if not customer_email:
        log.warning("stripe_checkout_no_email", customer_id=stripe_customer_id)
        return

    # Find user by email
    user = await db.scalar(select(User).where(User.email == customer_email))
    if not user:
        log.warning("stripe_checkout_user_not_found", email="[REDACTED]")
        return

    # Link Stripe customer ID
    if not user.stripe_customer_id:
        user.stripe_customer_id = stripe_customer_id

    # Create or update subscription
    sub
handle_subscription_updated function · python · L139-L183 (45 LOC)
app/services/stripe_billing.py
async def handle_subscription_updated(db: AsyncSession, event_data: dict) -> None:
    """
    Handle customer.subscription.updated — status change, renewal, plan change.
    """
    sub_obj = event_data.get("object", {})
    stripe_sub_id = sub_obj.get("id")
    new_status = sub_obj.get("status", "active")
    cancel_at_end = sub_obj.get("cancel_at_period_end", False)

    # Period timestamps from Stripe (Unix epoch)
    period_start_ts = sub_obj.get("current_period_start")
    period_end_ts = sub_obj.get("current_period_end")

    sub = await db.scalar(
        select(Subscription).where(Subscription.stripe_subscription_id == stripe_sub_id)
    )
    if not sub:
        log.warning("stripe_sub_update_not_found", stripe_sub_id=stripe_sub_id)
        return

    sub.status = new_status
    sub.cancel_at_period_end = cancel_at_end

    if period_start_ts:
        sub.current_period_start = datetime.fromtimestamp(period_start_ts, tz=timezone.utc)
    if period_end_ts:
        sub.current
‹ prevpage 3 / 5next ›