← back to drhiramrodriguez__medscribe-ai

Function bodies 223 total

All specs Real LLM only Function bodies
run_migrations_offline function · python · L25-L34 (10 LOC)
alembic/env.py
def run_migrations_offline() -> None:
    """Run the migrations with a URL-only Alembic context (no connection).

    The context is configured from the ``sqlalchemy.url`` main option instead
    of a live connection, with ``literal_binds=True`` and the ``named``
    paramstyle, and the migrations are then run inside
    ``context.begin_transaction()``.
    """
    offline_options = {
        "url": config.get_main_option("sqlalchemy.url"),
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()
do_run_migrations function · python · L37-L40 (4 LOC)
alembic/env.py
def do_run_migrations(connection: Connection) -> None:
    """Run the migrations over an already-open connection.

    Args:
        connection: Connection the Alembic context is bound to.
    """
    online_options = {
        "connection": connection,
        "target_metadata": target_metadata,
    }
    context.configure(**online_options)

    with context.begin_transaction():
        context.run_migrations()
run_async_migrations function · python · L43-L51 (9 LOC)
alembic/env.py
async def run_async_migrations() -> None:
    """Build an async engine from the Alembic config and run the migrations.

    The engine is created from the ``sqlalchemy.``-prefixed keys of the
    active config section and uses ``NullPool``. The migrations themselves
    run synchronously through ``connection.run_sync(do_run_migrations)``.

    Raises:
        Whatever connecting or migrating raises; the engine is disposed
        before the exception propagates.
    """
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    try:
        async with connectable.connect() as connection:
            await connection.run_sync(do_run_migrations)
    finally:
        # Dispose on the failure path too, so a failed connect/migration
        # does not leave the engine undisposed.
        await connectable.dispose()
upgrade function · python · L21-L148 (128 LOC)
alembic/versions/001_initial_schema.py
def upgrade() -> None:
    # --- users ---
    op.create_table(
        "users",
        sa.Column("id", sa.Uuid(), primary_key=True),
        sa.Column("email", sa.String(255), nullable=False),
        sa.Column("username", sa.String(100), nullable=False),
        sa.Column("password_hash", sa.String(255), nullable=False),
        sa.Column("full_name", sa.String(255), nullable=False),
        sa.Column("specialty", sa.String(100), nullable=False),
        sa.Column("is_active", sa.Boolean(), nullable=False),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            server_default=sa.func.now(),
            nullable=False,
        ),
        sa.Column(
            "updated_at",
            sa.DateTime(timezone=True),
            server_default=sa.func.now(),
            nullable=False,
        ),
        sa.Column("last_login", sa.DateTime(timezone=True), nullable=True),
    )
    op.create_index("ix_users_email", "users", ["email"], unique=True)
downgrade function · python · L151-L156 (6 LOC)
alembic/versions/001_initial_schema.py
def downgrade() -> None:
    """Drop the five tables of the initial schema, ending with ``users``."""
    # The drop order is significant and is kept exactly as listed.
    tables_in_drop_order = (
        "usage_records",
        "audit_events",
        "note_drafts",
        "note_sessions",
        "users",
    )
    for table_name in tables_in_drop_order:
        op.drop_table(table_name)
upgrade function · python · L21-L75 (55 LOC)
alembic/versions/002_add_subscriptions_and_stripe.py
def upgrade() -> None:
    # --- Add stripe_customer_id to users ---
    op.add_column(
        "users",
        sa.Column("stripe_customer_id", sa.String(255), nullable=True),
    )
    op.create_index(
        "ix_users_stripe_customer_id",
        "users",
        ["stripe_customer_id"],
        unique=True,
    )

    # --- subscriptions ---
    op.create_table(
        "subscriptions",
        sa.Column("id", sa.Uuid(), primary_key=True),
        sa.Column(
            "user_id",
            sa.Uuid(),
            sa.ForeignKey("users.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("stripe_subscription_id", sa.String(255), nullable=True),
        sa.Column("stripe_price_id", sa.String(255), nullable=True),
        sa.Column("tier", sa.String(20), nullable=False, server_default="free"),
        sa.Column("status", sa.String(30), nullable=False, server_default="active"),
        sa.Column("current_period_start", sa.DateTime(timezone=True), nullable
downgrade function · python · L78-L81 (4 LOC)
alembic/versions/002_add_subscriptions_and_stripe.py
def downgrade() -> None:
    """Remove the ``subscriptions`` table and the Stripe customer column.

    Undoes the ``users.stripe_customer_id`` column, its unique index and the
    ``subscriptions`` table that ``upgrade`` in this revision creates.
    """
    op.drop_table("subscriptions")
    # The index is dropped before the column it is built on.
    op.drop_index("ix_users_stripe_customer_id", table_name="users")
    op.drop_column("users", "stripe_customer_id")
Want this analysis on your repo? https://repobility.com/scan/
Settings class · python · L8-L81 (74 LOC)
app/config.py
class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )

    # App
    app_env: str = "development"
    allowed_origins: str = "http://localhost:3000,http://localhost:8000"

    # Database
    database_url: str = "postgresql+asyncpg://medscribe:medscribe@localhost:5432/medscribe"

    # Redis
    redis_url: str = "redis://localhost:6379/0"

    # Auth
    jwt_secret: str
    jwt_expiry_minutes: int = 480  # 8 hours — typical hospital shift
    refresh_token_expiry_days: int = 7

    # OpenAI
    openai_api_key: str
    openai_model: str = "gpt-4o"
    openai_max_tokens: int = 2000
    openai_temperature: float = 0.3

    # Transcription
    whisper_model_size: str = "base"

    # Stripe
    stripe_secret_key: str = ""
    stripe_webhook_secret: str = ""
    stripe_free_tier_note_limit: int = 10
    note_daily_limit: int = 50

    # Observability
    log_level: str = "INFO"
    s
validate_openai_api_key method · python · L51-L56 (6 LOC)
app/config.py
    def validate_openai_api_key(cls, v: str) -> str:
        """Check the shape of the OpenAI API key and return it unchanged.

        Args:
            v: Candidate API key.

        Returns:
            ``v`` itself when it passes both checks.

        Raises:
            ValueError: If ``v`` does not start with ``sk-`` or is shorter
                than 20 characters (the prefix check is reported first).
        """
        problem = None
        if not v.startswith("sk-"):
            problem = "OpenAI API key must start with 'sk-'"
        elif len(v) < 20:
            problem = "OpenAI API key must be at least 20 characters"

        if problem is not None:
            raise ValueError(problem)
        return v
validate_production_config method · python · L59-L73 (15 LOC)
app/config.py
    def validate_production_config(self) -> "Settings":
        """Enforce the stricter secret requirements that apply in production.

        Outside production (``self.is_production`` falsy) nothing is checked.

        Returns:
            ``self``, unchanged.

        Raises:
            ValueError: In production, when ``jwt_secret`` is shorter than 32
                characters, or ``stripe_secret_key`` / ``stripe_webhook_secret``
                is empty. Checks run in that order; the first failure wins.
        """
        if not self.is_production:
            return self

        if len(self.jwt_secret) < 32:
            raise ValueError(
                "In production, jwt_secret must be at least 32 characters"
            )

        required_secrets = (
            (
                "stripe_secret_key",
                "In production, stripe_secret_key must not be empty",
            ),
            (
                "stripe_webhook_secret",
                "In production, stripe_webhook_secret must not be empty",
            ),
        )
        for attr_name, message in required_secrets:
            if not getattr(self, attr_name):
                raise ValueError(message)
        return self
get_db function · python · L30-L37 (8 LOC)
app/database.py
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield one database session and settle its transaction afterwards.

    A session is opened from ``AsyncSessionLocal`` and handed to the consumer.
    When the consumer finishes without raising, the session is committed;
    when an ``Exception`` surfaces, the session is rolled back and the
    exception is re-raised.

    Yields:
        The open ``AsyncSession``.

    Raises:
        Exception: Any exception from the consumer or from the commit,
            re-raised after the rollback.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            # Reached only when the consumer did not raise at the yield.
            await session.commit()
        except Exception:
            # Covers errors thrown in at the yield as well as a failing commit.
            await session.rollback()
            raise
get_current_user function · python · L22-L61 (40 LOC)
app/dependencies.py
async def get_current_user(
    creds: HTTPAuthorizationCredentials | None = Depends(_bearer),
    db: AsyncSession = Depends(get_db),
) -> User:
    if creds is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")

    try:
        payload = decode_access_token(creds.credentials)
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired")
    except (jwt.InvalidTokenError, jwt.DecodeError, KeyError, ValueError):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token")

    # Check token blacklist (Redis). If Redis is unavailable, log and proceed.
    jti = payload.get("jti")
    if jti:
        try:
            from app.utils.redis import is_token_blacklisted
            if await is_token_blacklisted(jti):
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has been revoked
check_note_quota function · python · L64-L132 (69 LOC)
app/dependencies.py
async def check_note_quota(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Enforce per-user daily note generation limit with subscription-aware caps.

    IMPORTANT: This dependency checks and **increments** the quota atomically
    before the endpoint handler runs. This prevents race conditions where multiple
    concurrent requests pass the check before any of them increment.

    1. Looks up the user's subscription tier (free / pro).
    2. Checks whether the subscription is active.
    3. Applies the tier's daily note limit.
    4. Uses SELECT ... FOR UPDATE to lock the usage record.
    5. **Increments the counter atomically** before returning (prerequisite for the request).
    6. Falls back to regular SELECT on SQLite (tests).

    The quota check/increment is now atomic and happens BEFORE the OpenAI call.
    If the API call fails after this, the quota was still consumed (by design).
    """
    # ── Subscription check ────
lifespan function · python · L28-L38 (11 LOC)
app/main.py
async def lifespan(app: FastAPI):
    """Run application startup work, then release the engine on shutdown.

    Startup configures logging, logs ``startup`` and, outside production,
    creates the tables described by ``Base.metadata``. After the ``yield``
    the engine is disposed and ``shutdown`` is logged.

    Args:
        app: The application instance; not used by the body.
    """
    configure_logging(log_level=settings.log_level, is_production=settings.is_production)
    log.info("startup", env=settings.app_env)
    # Tables created via Alembic in production.
    # In development/test, create if not exists for convenience.
    if not settings.is_production:
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
    try:
        yield
    finally:
        # Also runs when an exception is thrown in at the yield, so the
        # engine is disposed on abnormal shutdown too.
        await engine.dispose()
        log.info("shutdown")
validation_error_handler function · python · L108-L122 (15 LOC)
app/main.py
async def validation_error_handler(request: Request, exc: RequestValidationError) -> JSONResponse:
    """Turn request-validation errors into one readable ``detail`` string.

    Each error becomes ``"<location>: <message>"`` (or just the message when
    no location remains), and the entries are joined with ``"; "``.

    Args:
        request: The incoming request; not used by the body.
        exc: The validation error whose ``errors()`` are rendered.

    Returns:
        A 422 ``JSONResponse`` with a single ``detail`` string.
    """
    pydantic_prefix = "Value error, "

    def _render(error) -> str:
        # "body" segments are left out of the location path.
        path = " -> ".join(str(part) for part in error.get("loc", []) if part != "body")
        text = error.get("msg", "Invalid value")
        # Strip the Pydantic prefix for cleaner output.
        if text.startswith(pydantic_prefix):
            text = text[len(pydantic_prefix):]
        return f"{path}: {text}" if path else text

    rendered = [_render(error) for error in exc.errors()]
    if rendered:
        detail = "; ".join(rendered)
    else:
        detail = "Invalid request data."
    return JSONResponse(status_code=422, content={"detail": detail})
Repobility · code-quality intelligence · https://repobility.com
openai_auth_handler function · python · L126-L131 (6 LOC)
app/main.py
async def openai_auth_handler(request: Request, exc: openai.AuthenticationError) -> JSONResponse:
    """Answer an OpenAI authentication failure with a generic 503.

    Only the request path is logged; the exception itself is not.
    """
    log.error("openai_authentication_error", path=request.url.path)
    payload = {"detail": "AI service configuration error. Please contact support."}
    return JSONResponse(status_code=503, content=payload)
openai_rate_limit_handler function · python · L135-L140 (6 LOC)
app/main.py
async def openai_rate_limit_handler(request: Request, exc: openai.RateLimitError) -> JSONResponse:
    """Answer an OpenAI rate-limit error with a 503 and a retry hint.

    The event is logged at warning level with the request path only.
    """
    log.warning("openai_rate_limit", path=request.url.path)
    payload = {"detail": "AI service temporarily unavailable. Please try again in a few minutes."}
    return JSONResponse(status_code=503, content=payload)
openai_timeout_handler function · python · L144-L146 (3 LOC)
app/main.py
async def openai_timeout_handler(request: Request, exc: openai.APITimeoutError) -> JSONResponse:
    """Answer an OpenAI timeout with a 504; logs the request path only."""
    log.error("openai_timeout", path=request.url.path)
    payload = {"detail": "AI service timed out. Please retry."}
    return JSONResponse(status_code=504, content=payload)
openai_connection_handler function · python · L150-L152 (3 LOC)
app/main.py
async def openai_connection_handler(request: Request, exc: openai.APIConnectionError) -> JSONResponse:
    """Answer an OpenAI connection error with a 503; logs the request path only."""
    log.error("openai_connection_error", path=request.url.path)
    payload = {"detail": "AI service unavailable. Please retry."}
    return JSONResponse(status_code=503, content=payload)
generic_error_handler function · python · L156-L166 (11 LOC)
app/main.py
async def generic_error_handler(request: Request, exc: Exception) -> JSONResponse:
    """Log an unhandled exception and answer with a generic 500.

    The client only ever sees the fixed message; the exception type and text
    go to the log.
    """
    # NOTE(review): error_message is str(exc) verbatim. Confirm exception
    # text can never carry request content before relying on these logs
    # being free of sensitive data.
    log_fields = {
        "path": request.url.path,
        "error_type": type(exc).__name__,
        "error_message": str(exc),
    }
    log.error("unhandled_exception", **log_fields)

    payload = {"detail": "An unexpected error occurred. Please try again."}
    return JSONResponse(status_code=500, content=payload)
RequestIDMiddleware class · python · L15-L58 (44 LOC)
app/middleware/request_id.py
class RequestIDMiddleware(BaseHTTPMiddleware):
    """Assigns a unique request_id to every request and binds it to structlog context.

    - Accepts an incoming ``X-Request-ID`` header for distributed tracing.
    - Falls back to a generated UUID4 if the header is absent.
    - Adds ``X-Request-ID`` to the response headers.
    - Logs request start and completion with method, path, status, and duration.
    - Does NOT log request body, query params, or any content that could contain PHI.
    """

    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
        # Accept caller-provided request ID or generate one
        request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())

        # Bind to structlog context so every log call in this request includes it
        clear_contextvars()
        bind_contextvars(request_id=request_id)

        start = time.monotonic()
        log.info("request_started", method=request.method, path=request
dispatch method · python · L25-L58 (34 LOC)
app/middleware/request_id.py
    async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
        # Accept caller-provided request ID or generate one
        request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())

        # Bind to structlog context so every log call in this request includes it
        clear_contextvars()
        bind_contextvars(request_id=request_id)

        start = time.monotonic()
        log.info("request_started", method=request.method, path=request.url.path)

        try:
            response = await call_next(request)
        except Exception:
            duration_ms = round((time.monotonic() - start) * 1000, 1)
            log.error(
                "request_failed",
                method=request.method,
                path=request.url.path,
                duration_ms=duration_ms,
            )
            raise
        else:
            duration_ms = round((time.monotonic() - start) * 1000, 1)
            log.info(
                "re
User class · python · L27-L51 (25 LOC)
app/models/db.py
class User(Base):
    __tablename__ = "users"

    id: Mapped[uuid.UUID] = uuid_pk()
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
    username: Mapped[str] = mapped_column(String(100), unique=True, nullable=False, index=True)
    password_hash: Mapped[str] = mapped_column(String(255), nullable=False)
    full_name: Mapped[str] = mapped_column(String(255), nullable=False, default="")
    specialty: Mapped[str] = mapped_column(String(100), nullable=False, default="internal_medicine")
    is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
    created_at: Mapped[datetime] = now_utc()
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False,
    )
    last_login: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
    stripe_customer_id: Mapped[str | None] = mapped_column(Strin
Source: Repobility analyzer · https://repobility.com
NoteSession class · python · L54-L84 (31 LOC)
app/models/db.py
class NoteSession(Base):
    __tablename__ = "note_sessions"

    id: Mapped[uuid.UUID] = uuid_pk()
    user_id: Mapped[uuid.UUID] = mapped_column(
        Uuid, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
    )
    note_type: Mapped[str] = mapped_column(
        String(50), nullable=False, default="soap_progress"
    )  # soap_admission | soap_progress | discharge_summary
    status: Mapped[str] = mapped_column(
        String(20), nullable=False, default="draft"
    )  # draft | complete
    patient_context: Mapped[str | None] = mapped_column(Text, nullable=True)
    language: Mapped[str] = mapped_column(String(5), nullable=False, default="en")
    created_at: Mapped[datetime] = now_utc()
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False,
    )
    deleted_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), nullab
NoteDraft class · python · L87-L111 (25 LOC)
app/models/db.py
class NoteDraft(Base):
    __tablename__ = "note_drafts"

    id: Mapped[uuid.UUID] = uuid_pk()
    session_id: Mapped[uuid.UUID] = mapped_column(
        Uuid, ForeignKey("note_sessions.id", ondelete="CASCADE"), nullable=False, index=True
    )
    version: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
    input_text: Mapped[str] = mapped_column(Text, nullable=False)
    output_text: Mapped[str | None] = mapped_column(Text, nullable=True)
    structured_payload: Mapped[dict | None] = mapped_column(
        "structured_payload", JSON, nullable=True
    )
    schema_version: Mapped[str | None] = mapped_column(String(20), nullable=True)
    model_used: Mapped[str | None] = mapped_column(String(100), nullable=True)
    prompt_version: Mapped[str | None] = mapped_column(String(50), nullable=True)
    tokens_used: Mapped[int | None] = mapped_column(Integer, nullable=True)
    generation_time_ms: Mapped[int | None] = mapped_column(Integer, nullable=True)
    created_at: Map
AuditEvent class · python · L114-L131 (18 LOC)
app/models/db.py
class AuditEvent(Base):
    """ORM mapping for the ``audit_events`` table: one row per recorded action.

    A row names the action, optionally the acting user and the affected
    resource (type + id), plus free-form JSON metadata and an IP address.
    """
    __tablename__ = "audit_events"

    id: Mapped[uuid.UUID] = uuid_pk()
    # Nullable with ON DELETE SET NULL: the row is kept when its user is deleted.
    user_id: Mapped[uuid.UUID | None] = mapped_column(
        Uuid, ForeignKey("users.id", ondelete="SET NULL"), nullable=True, index=True
    )
    action: Mapped[str] = mapped_column(
        String(100), nullable=False, index=True
    )  # note_generated | login | logout | export | register
    resource_type: Mapped[str | None] = mapped_column(String(50), nullable=True)
    # Plain UUID without a foreign key.
    resource_id: Mapped[uuid.UUID | None] = mapped_column(Uuid, nullable=True)
    # Python attribute is ``metadata_`` while the column is named "metadata";
    # presumably because ``metadata`` is reserved on declarative classes — confirm.
    metadata_: Mapped[dict | None] = mapped_column("metadata", JSON, nullable=True)
    # NOTE(review): 45 characters presumably fits the longest textual IPv6 form — confirm.
    ip_address: Mapped[str | None] = mapped_column(String(45), nullable=True)
    created_at: Mapped[datetime] = now_utc()

    # Relationships
    user: Mapped["User | None"] = relationship(back_populates="audit_events")
UsageRecord class · python · L134-L146 (13 LOC)
app/models/db.py
class UsageRecord(Base):
    """ORM mapping for the ``usage_records`` table: per-user usage counters.

    Each row belongs to one user and one ``period_start`` and carries two
    counters, notes generated and tokens consumed.
    """
    __tablename__ = "usage_records"

    id: Mapped[uuid.UUID] = uuid_pk()
    # ON DELETE CASCADE: rows are removed together with their user.
    user_id: Mapped[uuid.UUID] = mapped_column(
        Uuid, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
    )
    # NOTE(review): presumably the start of the day the counters cover — confirm against the writer.
    period_start: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, index=True)
    # Both counters use a Python-side default of 0.
    notes_generated: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    tokens_consumed: Mapped[int] = mapped_column(Integer, nullable=False, default=0)

    # Relationships
    user: Mapped["User"] = relationship(back_populates="usage_records")
Subscription class · python · L149-L193 (45 LOC)
app/models/db.py
class Subscription(Base):
    """
    Stripe subscription record — synced via webhook events.

    Tiers:
      - free: default on registration, capped at stripe_free_tier_note_limit/day
      - pro: paid monthly, full daily limit
      - (future: team, enterprise)

    Status values mirror Stripe's subscription.status:
      active, past_due, canceled, trialing, unpaid, incomplete, incomplete_expired, paused
    """
    __tablename__ = "subscriptions"

    id: Mapped[uuid.UUID] = uuid_pk()
    user_id: Mapped[uuid.UUID] = mapped_column(
        Uuid, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, unique=True, index=True
    )
    stripe_subscription_id: Mapped[str | None] = mapped_column(
        String(255), unique=True, nullable=True
    )
    stripe_price_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
    tier: Mapped[str] = mapped_column(
        String(20), nullable=False, default="free"
    )  # free | pro
    status: Mapped[str] = mapped_column(
get_templates_by_category function · python · L1412-L1424 (13 LOC)
app/prompts/diagnosis_templates.py
def get_templates_by_category(category: str | None = None) -> dict[str, list[str]] | list[str]:
    """
    Return templates grouped by category.
    If category is specified, return list of template keys for that category.
    If category is None, return {category: [template_key, ...]} for all.
    """
    index: dict[str, list[str]] = {}
    for key, template in DIAGNOSIS_TEMPLATES.items():
        cat = template.get("category", "uncategorized")
        index.setdefault(cat, []).append(key)
    if category is not None:
        return index.get(category.lower(), [])
    return index
get_template function · python · L1427-L1429 (3 LOC)
app/prompts/diagnosis_templates.py
def get_template(problem_key: str) -> dict[str, str] | None:
    """Lookup a single template by key (case-insensitive)."""
    return DIAGNOSIS_TEMPLATES.get(problem_key.lower())
get_system_prompt function · python · L310-L314 (5 LOC)
app/prompts/hospitalist_prompts.py
def get_system_prompt(language: str = "en") -> str:
    """Pick the system prompt: Spanish for exactly ``"es"``, default otherwise."""
    return SYSTEM_PROMPT_ES if language == "es" else SYSTEM_PROMPT
Repobility · MCP-ready · https://repobility.com
_render_patient_state function · python · L644-L705 (62 LOC)
app/prompts/hospitalist_prompts.py
def _render_patient_state(patient_state: dict) -> str:
    """
    Render the longitudinal patient state object into prompt context.
    Supports the full Production Brain state model (Section 3):
    identity/encounter, problem list, diagnostics, therapies, consults,
    transition state, disposition barriers.
    """
    parts: list[str] = []

    # Identity and encounter context
    identity_fields = [
        ("encounter_id", "Encounter"),
        ("attending", "Attending"),
        ("service", "Service"),
        ("note_author", "Note author"),
        ("admission_date", "Admission date"),
        ("admission_source", "Admission source"),
        ("code_status", "Code status"),
        ("disposition_target", "Disposition target"),
    ]
    identity = []
    for key, label in identity_fields:
        val = patient_state.get(key)
        if val:
            identity.append(f"{label}: {val}")
    if identity:
        parts.append("\n".join(identity))

    # Active problem list
    p
build_prompt function · python · L708-L795 (88 LOC)
app/prompts/hospitalist_prompts.py
def build_prompt(
    input_text: str,
    note_type: str = "progress_note",
    specialty: str = "internal_medicine",
    active_problems: list[str] | None = None,
    style_compression: str = "high",
    patient_state: dict | None = None,
    language: str = "en",
) -> str:
    """
    Build the user-facing prompt for note generation.
    The system prompt (SYSTEM_PROMPT) is sent separately as the system message.

    Args:
        input_text: Raw dictation, transcript, or clinical summary from physician
        note_type: One of admission_hp / ed_consult_admission / progress_note /
                   event_note / consult_note / discharge_summary
        specialty: Clinical specialty context (default: internal_medicine)
        active_problems: List of active problem keys to load templates for
        style_compression: high (terse) / medium (standard) / low (detailed)
        patient_state: Optional dict with encounter context (see Production Brain Section 3)

    Returns:
        F
get_discharge_prompt function · python · L798-L824 (27 LOC)
app/prompts/hospitalist_prompts.py
def get_discharge_prompt(
    input_text: str,
    active_problems: list[str] | None = None,
    patient_state: dict | None = None,
    language: str = "en",
) -> str:
    """
    Convenience wrapper for discharge summary generation.
    Enforces the discharge gold standard checklist.

    Args:
        input_text: Clinical input forwarded to ``build_prompt``.
        active_problems: Forwarded to ``build_prompt`` unchanged.
        patient_state: Forwarded to ``build_prompt`` unchanged.
        language: Forwarded to ``build_prompt``; defaults to ``"en"``, which
            is what ``build_prompt`` used when this wrapper passed nothing.
            The checklist block appended here is English regardless.

    Returns:
        The ``discharge_summary`` prompt built with medium style compression,
        followed by the checklist block (one ``- item`` line per entry of
        ``DISCHARGE_CHECKLIST``).
    """
    checklist_str = "\n".join(f"- {item}" for item in DISCHARGE_CHECKLIST)

    base_prompt = build_prompt(
        input_text=input_text,
        note_type="discharge_summary",
        active_problems=active_problems,
        style_compression="medium",
        patient_state=patient_state,
        language=language,
    )

    return base_prompt + f"""

DISCHARGE GOLD STANDARD — YOUR OUTPUT MUST SATISFY ALL OF THESE:
{checklist_str}

If any item cannot be satisfied from the provided clinical input,
state explicitly: "[Item] — not available from provided information"
Do not omit. Do not fabricate."""
reconstruct_dictation function · python · L827-L836 (10 LOC)
app/prompts/hospitalist_prompts.py
def reconstruct_dictation(raw_input: str) -> str:
    """
    Pre-process raw shorthand dictation into expanded clinical language
    before passing to the main prompt builder.

    Every key of ``VOICE_SHORTCUTS`` is replaced, case-insensitively and as a
    whole token, by its value. Shortcuts are applied one after another in
    dictionary order, so a later shortcut also sees text produced by an
    earlier expansion.

    Args:
        raw_input: Dictation text to expand.

    Returns:
        The text with all shortcuts expanded.
    """
    expanded = raw_input
    for shorthand, expansion in VOICE_SHORTCUTS.items():
        if not shorthand:
            # An empty key would match at every token edge; ignore it.
            continue
        # Guard an edge only where the shorthand itself has a word character
        # there. A plain \b after a key ending in e.g. "/" demands a word
        # character next, so the key followed by a space never matched.
        prefix = r"(?<!\w)" if re.match(r"\w", shorthand[0]) else ""
        suffix = r"(?!\w)" if re.match(r"\w", shorthand[-1]) else ""
        pattern = prefix + re.escape(shorthand) + suffix
        # A callable inserts the expansion literally; as a plain string it
        # would be parsed as a replacement template (backslashes, \1, \g<..>).
        expanded = re.sub(
            pattern,
            lambda _match, text=expansion: text,
            expanded,
            flags=re.IGNORECASE,
        )
    return expanded
build_prompt function · python · L1567-L1648 (82 LOC)
app/prompts/soap_hospitalist_3.py
def build_prompt(
    input_text: str,
    note_type: str = "progress_note",
    specialty: str = "internal_medicine",
    active_problems: Optional[list[str]] = None,
    style_compression: str = "high",
    patient_state: Optional[dict] = None,
) -> str:
    """
    Build the user-facing prompt for note generation.
    Send SYSTEM_PROMPT separately as the OpenAI system message.

    Args:
        input_text: Raw dictation, transcript, or clinical summary
        note_type: admission_hp / progress_note / event_note /
                   consult_note / discharge_summary
        specialty: Clinical specialty (default: internal_medicine)
        active_problems: List of diagnosis template keys to load
        style_compression: high / medium / low
        patient_state: Optional encounter context dict

    Returns:
        Formatted prompt string for OpenAI API call
    """

    note_instruction = NOTE_TYPE_INSTRUCTIONS.get(
        note_type,
        NOTE_TYPE_INSTRUCTIONS["progress_note
get_discharge_prompt function · python · L1651-L1676 (26 LOC)
app/prompts/soap_hospitalist_3.py
def get_discharge_prompt(
    input_text: str,
    active_problems: Optional[list[str]] = None,
    patient_state: Optional[dict] = None,
) -> str:
    """
    Build the discharge-summary prompt and append the gold standard checklist.

    The base prompt comes from ``build_prompt`` with note type
    ``discharge_summary`` and medium style compression; one ``- item`` line
    per ``DISCHARGE_CHECKLIST`` entry and a closing rule are added after it.
    """
    bullet_lines = [f"- {item}" for item in DISCHARGE_CHECKLIST]

    prompt_body = build_prompt(
        input_text=input_text,
        note_type="discharge_summary",
        active_problems=active_problems,
        style_compression="medium",
        patient_state=patient_state,
    )

    checklist_heading = "\n\nDISCHARGE GOLD STANDARD — YOUR OUTPUT MUST SATISFY ALL:\n"
    closing_rule = (
        "\n\nIf any item cannot be satisfied from provided input, state explicitly: "
        "'[Item] — not available from provided information'. "
        "Do not omit. Do not fabricate."
    )
    return prompt_body + checklist_heading + "\n".join(bullet_lines) + closing_rule
reconstruct_dictation function · python · L1679-L1688 (10 LOC)
app/prompts/soap_hospitalist_3.py
def reconstruct_dictation(raw_input: str) -> str:
    """
    Expand compressed shorthand dictation into clinical language
    before passing to the main prompt builder.

    Every key of ``VOICE_SHORTCUTS`` is replaced, case-insensitively and as a
    whole token, by its value. Shortcuts are applied one after another in
    dictionary order, so a later shortcut also sees text produced by an
    earlier expansion.

    Args:
        raw_input: Dictation text to expand.

    Returns:
        The text with all shortcuts expanded.
    """
    expanded = raw_input
    for shorthand, expansion in VOICE_SHORTCUTS.items():
        if not shorthand:
            # An empty key would match at every token edge; ignore it.
            continue
        # Guard an edge only where the shorthand itself has a word character
        # there. A plain \b after a key ending in e.g. "/" demands a word
        # character next, so the key followed by a space never matched.
        prefix = r"(?<!\w)" if re.match(r"\w", shorthand[0]) else ""
        suffix = r"(?!\w)" if re.match(r"\w", shorthand[-1]) else ""
        pattern = prefix + re.escape(shorthand) + suffix
        # A callable inserts the expansion literally; as a plain string it
        # would be parsed as a replacement template (backslashes, \1, \g<..>).
        expanded = re.sub(
            pattern,
            lambda _match, text=expansion: text,
            expanded,
            flags=re.IGNORECASE,
        )
    return expanded
get_qa_prompt function · python · L1691-L1712 (22 LOC)
app/prompts/soap_hospitalist_3.py
def get_qa_prompt(draft_note: str) -> str:
    """
    Build the QA review prompt for a draft note.

    The prompt lists five checks and the expected per-finding output format,
    then ends with the draft itself under ``DRAFT NOTE:``.
    """
    checks = (
        "1. Contradictions (AKI marked resolved with rising creatinine, "
        "discharge stability claimed with unresolved barriers, etc.)",
        "2. Missing sections (pending results, medication changes, "
        "follow-up ownership)",
        "3. Unsupported statements (severity claims without clinical backing)",
        "4. Weak billing specificity (vague diagnoses, missing cause linkages, "
        "missing treatment intensity)",
        "5. Omitted status label (every problem must close with "
        "improving/stable/worsening/resolved)",
    )
    finding_format = (
        "- Flag the issue",
        "- Brief explanation",
        "- Auto-corrected version when possible",
    )
    # Sections are separated by one blank line.
    sections = [
        "QA MODE: Review the following clinical note draft.",
        "Check for:\n" + "\n".join(checks),
        "For each finding:\n" + "\n".join(finding_format),
        f"DRAFT NOTE:\n{draft_note}",
    ]
    return "\n\n".join(sections)
Want this analysis on your repo? https://repobility.com/scan/
list_templates_by_system function · python · L1715-L1724 (10 LOC)
app/prompts/soap_hospitalist_3.py
def list_templates_by_system() -> dict[str, list[str]]:
    """
    Group diagnosis template keys by their ``system`` entry.

    Templates without a ``system`` entry are grouped under ``"general"``.
    Useful for UI problem selection menus.
    """
    grouped: dict[str, list[str]] = {}
    for template_key, template in DIAGNOSIS_TEMPLATES.items():
        system_name = template.get("system", "general")
        if system_name in grouped:
            grouped[system_name].append(template_key)
        else:
            grouped[system_name] = [template_key]
    return grouped
register function · python · L35-L43 (9 LOC)
app/routes/auth.py
async def register(
    request: Request,
    data: UserRegister,
    db: AsyncSession = Depends(get_db)
) -> UserOut:
    """Register a new user from the submitted data.

    Args:
        request: Incoming request; not read by the body.
            NOTE(review): presumably required by a decorator outside this
            block — confirm before removing.
        data: Registration payload passed to ``register_user``.
        db: Database session from ``get_db``.

    Returns:
        Whatever ``register_user`` returns for the new account.

    Raises:
        HTTPException: With the status code and message of the ``AuthError``
            raised by ``register_user``.
    """
    try:
        return await register_user(db, data)
    except AuthError as e:
        # Chain explicitly so the AuthError shows up as the direct cause.
        raise HTTPException(status_code=e.status_code, detail=e.message) from e
login function · python · L57-L66 (10 LOC)
app/routes/auth.py
async def login(
    request: Request,
    data: UserLogin,
    db: AsyncSession = Depends(get_db)
) -> Token:
    """Authenticate a user and return the result of ``login_user``.

    Args:
        request: Incoming request; its client host is passed on as the IP.
        data: Login payload passed to ``login_user``.
        db: Database session from ``get_db``.

    Returns:
        Whatever ``login_user`` returns for a successful login.

    Raises:
        HTTPException: With the status code and message of the ``AuthError``
            raised by ``login_user``.
    """
    # request.client can be None, in which case no IP is passed on.
    ip = request.client.host if request.client else None
    try:
        return await login_user(db, data, ip_address=ip)
    except AuthError as e:
        # Chain explicitly so the AuthError shows up as the direct cause.
        raise HTTPException(status_code=e.status_code, detail=e.message) from e
update_me function · python · L90-L133 (44 LOC)
app/routes/auth.py
async def update_me(
    request: Request,
    data: UserUpdate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> UserOut:
    ip = request.client.host if request.client else None
    profile_changed = False

    # Password change requires current_password
    if data.new_password:
        if not data.current_password:
            raise HTTPException(status_code=400, detail="Current password is required to set a new password")
        if not verify_password(data.current_password, current_user.password_hash):
            raise HTTPException(status_code=400, detail="Current password is incorrect")
        current_user.password_hash = hash_password(data.new_password)
        await write_audit_event(
            db,
            action="password_changed",
            user_id=current_user.id,
            resource_type="user",
            resource_id=current_user.id,
            ip_address=ip,
        )

    if data.full_name is not None and dat
logout function · python · L142-L167 (26 LOC)
app/routes/auth.py
async def logout(
    creds: HTTPAuthorizationCredentials | None = Depends(_bearer),
) -> dict:
    """Log out by blacklisting the presented access token until it expires.

    The token is decoded, and its ``jti`` is blacklisted for the number of
    seconds left until ``exp``. Blacklisting is best-effort: if it fails,
    the failure is logged and the logout still succeeds.

    Args:
        creds: Bearer credentials, or ``None`` when none were sent.

    Returns:
        ``{"detail": "Successfully logged out"}``.

    Raises:
        HTTPException: 401 when credentials are missing, the token cannot be
            decoded, or the payload lacks ``jti`` or ``exp``.
    """
    # Function-scope import: only the standard library is known to be
    # available from inside this block.
    import logging

    if creds is None:
        raise HTTPException(status_code=401, detail="Not authenticated")
    try:
        payload = decode_access_token(creds.credentials)
    except Exception:
        raise HTTPException(status_code=401, detail="Invalid or expired token")

    jti = payload.get("jti")
    exp = payload.get("exp")
    if not jti or not exp:
        raise HTTPException(status_code=401, detail="Invalid token payload")

    # Calculate remaining TTL
    remaining = int(exp - datetime.now(timezone.utc).timestamp())
    if remaining > 0:
        try:
            await blacklist_token(jti, remaining)
        except Exception as exc:
            # Blacklisting failed — the token stays usable until it expires,
            # but logout still succeeds. Record it instead of swallowing it
            # silently; only the error type is logged, never the token.
            logging.getLogger(__name__).warning(
                "logout_blacklist_failed error_type=%s", type(exc).__name__
            )

    return {"detail": "Successfully logged out"}
de_identify function · python · L38-L101 (64 LOC)
app/routes/deid.py
async def de_identify(
    request: DeIdRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> DeIdResult:
    """
    De-identify clinical text by detecting and redacting PHI.

    This endpoint:
    1. Analyzes the input text for common PHI patterns
    2. Redacts all detected PHI with [REDACTED_TYPE] markers
    3. Returns the redacted text and a list of detected entities
    4. Emits an audit event with entity count (not actual values)

    The redaction is conservative — it may flag non-PHI sequences if they match
    common patterns. Physicians must manually verify the redacted output.
    """

    # Run de-identification (service handles all PHI detection)
    result = redact_phi(request.text)

    # Convert entities to response model
    entities_out = [
        PhiEntityOut(
            type=ent["type"],
            original=ent["original"],
            start=ent["start"],
            end=ent["end"],
        )
        for ent 
health function · python · L21-L51 (31 LOC)
app/routes/health.py
async def health() -> dict:
    checks = {"status": "ok", "version": "1.0.0-beta"}
    # Postgres
    try:
        async with AsyncSessionLocal() as session:
            await session.execute(text("SELECT 1"))
        checks["postgres"] = "ok"
    except Exception as e:
        checks["postgres"] = "error"
        checks["status"] = "degraded"
        log.error("health_check_postgres_failed", error_type=type(e).__name__)
    # Redis
    try:
        r = aioredis.from_url(settings.redis_url)
        await r.ping()
        await r.aclose()
        checks["redis"] = "ok"
    except Exception as e:
        checks["redis"] = "error"
        checks["status"] = "degraded"
        log.error("health_check_redis_failed", error_type=type(e).__name__)
    # OpenAI (non-blocking — if it fails, status stays ok but openai_status is unavailable)
    try:
        import openai
        client = openai.AsyncOpenAI(api_key=settings.openai_api_key)
        await asyncio.wait_for(client.models.list(), timeo
create_session function · python · L51-L56 (6 LOC)
app/routes/notes.py
async def create_session(
    data: NoteSessionCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> NoteSessionOut:
    """Create a note session owned by the current user.

    Delegates to ``create_note_session`` with the database session, the
    current user's id and the request payload, and returns its result.
    """
    owner_id = current_user.id
    new_session = await create_note_session(db, owner_id, data)
    return new_session
Repobility · code-quality intelligence · https://repobility.com
list_sessions_endpoint function · python · L64-L68 (5 LOC)
app/routes/notes.py
async def list_sessions_endpoint(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> list[NoteSessionOut]:
    """Return the note sessions for the current user.

    Delegates to ``list_sessions`` with the database session and the
    current user's id.
    """
    owner_id = current_user.id
    sessions = await list_sessions(db, owner_id)
    return sessions
get_session_endpoint function · python · L77-L85 (9 LOC)
app/routes/notes.py
async def get_session_endpoint(
    session_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> NoteSessionDetailOut:
    """Fetch one note session for the current user.

    Delegates to ``get_session``; a ``NoteError`` raised by it is translated
    into an ``HTTPException`` carrying the error's own status code and
    message.
    """
    owner_id = current_user.id
    try:
        session_detail = await get_session(db, session_id, owner_id)
    except NoteError as note_err:
        # Surface the service-layer error with its own status and message.
        raise HTTPException(
            status_code=note_err.status_code,
            detail=note_err.message,
        )
    else:
        return session_detail
generate function · python · L100-L120 (21 LOC)
app/routes/notes.py
async def generate(
    request: NoteGenerateRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(check_note_quota),
) -> NoteGenerateResponse:
    """Generate a note for the user supplied by ``check_note_quota``.

    Delegates to ``generate_note`` and maps its failures to HTTP errors:

    * ``NoteError`` -> the error's own status code and message
    * ``openai.APITimeoutError`` -> 504
    * ``openai.APIConnectionError`` -> 502

    Both OpenAI failures are logged with the user's id before re-raising.
    """
    try:
        generated = await generate_note(db, current_user.id, request)
    except NoteError as note_err:
        raise HTTPException(
            status_code=note_err.status_code,
            detail=note_err.message,
        )
    # The timeout handler is listed before the connection handler, as in
    # the original ordering, so a timeout is reported as 504 rather than 502.
    except openai.APITimeoutError:
        uid = str(current_user.id)
        log.error("openai_timeout_generate", user_id=uid)
        raise HTTPException(
            status_code=504,
            detail="AI service timed out. Please retry.",
        )
    except openai.APIConnectionError:
        uid = str(current_user.id)
        log.error("openai_connection_error_generate", user_id=uid)
        raise HTTPException(
            status_code=502,
            detail="AI service unavailable. Please retry.",
        )
    else:
        return generated
page 1 / 5next ›