Function bodies 223 total
run_migrations_offline function · python · L25-L34 (10 LOC)alembic/env.py
def run_migrations_offline() -> None:
    """Run migrations using only the configured database URL.

    Reads ``sqlalchemy.url`` from the Alembic config, configures the
    migration context with literal binds and the ``named`` paramstyle,
    and runs the migrations inside ``context.begin_transaction()``.
    """
    offline_options = {
        "url": config.get_main_option("sqlalchemy.url"),
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
    }
    context.configure(**offline_options)
    with context.begin_transaction():
        context.run_migrations()
# do_run_migrations function · python · L37-L40 (4 LOC)alembic/env.py
def do_run_migrations(connection: Connection) -> None:
    """Bind the migration context to ``connection`` and run the migrations.

    Args:
        connection: The connection handed to ``context.configure``.
    """
    context.configure(
        target_metadata=target_metadata,
        connection=connection,
    )
    migration_transaction = context.begin_transaction()
    with migration_transaction:
        context.run_migrations()
# run_async_migrations function · python · L43-L51 (9 LOC)alembic/env.py
async def run_async_migrations() -> None:
    """Create an async engine from the Alembic config and run the migrations.

    The engine is built from the config's main ini section (``sqlalchemy.``
    prefix, ``NullPool``). ``do_run_migrations`` is executed on a connection
    via ``run_sync``. The engine is disposed afterwards, including when the
    migration raises.
    """
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    try:
        async with connectable.connect() as connection:
            await connection.run_sync(do_run_migrations)
    finally:
        # Dispose on the failure path too, so a failed migration does not
        # leave the engine undisposed.
        await connectable.dispose()
# upgrade function · python · L21-L148 (128 LOC)alembic/versions/001_initial_schema.py
def upgrade() -> None:
# --- users ---
op.create_table(
"users",
sa.Column("id", sa.Uuid(), primary_key=True),
sa.Column("email", sa.String(255), nullable=False),
sa.Column("username", sa.String(100), nullable=False),
sa.Column("password_hash", sa.String(255), nullable=False),
sa.Column("full_name", sa.String(255), nullable=False),
sa.Column("specialty", sa.String(100), nullable=False),
sa.Column("is_active", sa.Boolean(), nullable=False),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
sa.Column("last_login", sa.DateTime(timezone=True), nullable=True),
)
op.create_index("ix_users_email", "users", ["email"], unique=True)downgrade function · python · L151-L156 (6 LOC)alembic/versions/001_initial_schema.py
def downgrade() -> None:
    """Drop the tables of the initial schema, ending with ``users``."""
    # Order is significant: the tuple is dropped front to back.
    drop_order = (
        "usage_records",
        "audit_events",
        "note_drafts",
        "note_sessions",
        "users",
    )
    for table_name in drop_order:
        op.drop_table(table_name)
# upgrade function · python · L21-L75 (55 LOC)alembic/versions/002_add_subscriptions_and_stripe.py
def upgrade() -> None:
# --- Add stripe_customer_id to users ---
op.add_column(
"users",
sa.Column("stripe_customer_id", sa.String(255), nullable=True),
)
op.create_index(
"ix_users_stripe_customer_id",
"users",
["stripe_customer_id"],
unique=True,
)
# --- subscriptions ---
op.create_table(
"subscriptions",
sa.Column("id", sa.Uuid(), primary_key=True),
sa.Column(
"user_id",
sa.Uuid(),
sa.ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("stripe_subscription_id", sa.String(255), nullable=True),
sa.Column("stripe_price_id", sa.String(255), nullable=True),
sa.Column("tier", sa.String(20), nullable=False, server_default="free"),
sa.Column("status", sa.String(30), nullable=False, server_default="active"),
sa.Column("current_period_start", sa.DateTime(timezone=True), nullabledowngrade function · python · L78-L81 (4 LOC)alembic/versions/002_add_subscriptions_and_stripe.py
def downgrade() -> None:
    """Undo the subscriptions/Stripe migration.

    Drops the ``subscriptions`` table, then the
    ``ix_users_stripe_customer_id`` index and the ``stripe_customer_id``
    column on ``users``.
    """
    users_table = "users"
    op.drop_table("subscriptions")
    op.drop_index("ix_users_stripe_customer_id", table_name=users_table)
    op.drop_column(users_table, "stripe_customer_id")
Settings class · python · L8-L81 (74 LOC)app/config.py
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=False,
)
# App
app_env: str = "development"
allowed_origins: str = "http://localhost:3000,http://localhost:8000"
# Database
database_url: str = "postgresql+asyncpg://medscribe:medscribe@localhost:5432/medscribe"
# Redis
redis_url: str = "redis://localhost:6379/0"
# Auth
jwt_secret: str
jwt_expiry_minutes: int = 480 # 8 hours — typical hospital shift
refresh_token_expiry_days: int = 7
# OpenAI
openai_api_key: str
openai_model: str = "gpt-4o"
openai_max_tokens: int = 2000
openai_temperature: float = 0.3
# Transcription
whisper_model_size: str = "base"
# Stripe
stripe_secret_key: str = ""
stripe_webhook_secret: str = ""
stripe_free_tier_note_limit: int = 10
note_daily_limit: int = 50
# Observability
log_level: str = "INFO"
svalidate_openai_api_key method · python · L51-L56 (6 LOC)app/config.py
def validate_openai_api_key(cls, v: str) -> str:
    """Check the shape of the OpenAI API key and return it unchanged.

    Raises:
        ValueError: If the key does not start with ``sk-`` or is shorter
            than 20 characters.
    """
    has_expected_prefix = v.startswith("sk-")
    if not has_expected_prefix:
        raise ValueError("OpenAI API key must start with 'sk-'")
    is_long_enough = len(v) >= 20
    if not is_long_enough:
        raise ValueError("OpenAI API key must be at least 20 characters")
    return v
# validate_production_config method · python · L59-L73 (15 LOC)app/config.py
def validate_production_config(self) -> "Settings":
    """Enforce the production-only requirements and return ``self``.

    When ``self.is_production`` is falsy nothing is checked.

    Raises:
        ValueError: In production, if ``jwt_secret`` is shorter than 32
            characters, or ``stripe_secret_key`` / ``stripe_webhook_secret``
            is empty.
    """
    if not self.is_production:
        return self
    if len(self.jwt_secret) < 32:
        raise ValueError(
            "In production, jwt_secret must be at least 32 characters"
        )
    # Checked in this order; the first empty secret raises.
    required_secrets = (
        ("stripe_secret_key", "In production, stripe_secret_key must not be empty"),
        ("stripe_webhook_secret", "In production, stripe_webhook_secret must not be empty"),
    )
    for attribute, message in required_secrets:
        if not getattr(self, attribute):
            raise ValueError(message)
    return self
# get_db function · python · L30-L37 (8 LOC)app/database.py
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a session from ``AsyncSessionLocal`` for the duration of one use.

    After the consumer resumes the generator the session is committed.
    Any ``Exception`` raised while yielding or committing triggers a
    rollback and is re-raised.
    """
    async with AsyncSessionLocal() as db_session:
        try:
            yield db_session
            # Commit stays inside the try so a failing commit is rolled back too.
            await db_session.commit()
        except Exception:
            await db_session.rollback()
            raise
# get_current_user function · python · L22-L61 (40 LOC)app/dependencies.py
async def get_current_user(
creds: HTTPAuthorizationCredentials | None = Depends(_bearer),
db: AsyncSession = Depends(get_db),
) -> User:
if creds is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
try:
payload = decode_access_token(creds.credentials)
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired")
except (jwt.InvalidTokenError, jwt.DecodeError, KeyError, ValueError):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token")
# Check token blacklist (Redis). If Redis is unavailable, log and proceed.
jti = payload.get("jti")
if jti:
try:
from app.utils.redis import is_token_blacklisted
if await is_token_blacklisted(jti):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has been revokedcheck_note_quota function · python · L64-L132 (69 LOC)app/dependencies.py
async def check_note_quota(
user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> User:
"""Enforce per-user daily note generation limit with subscription-aware caps.
IMPORTANT: This dependency checks and **increments** the quota atomically
before the endpoint handler runs. This prevents race conditions where multiple
concurrent requests pass the check before any of them increment.
1. Looks up the user's subscription tier (free / pro).
2. Checks whether the subscription is active.
3. Applies the tier's daily note limit.
4. Uses SELECT ... FOR UPDATE to lock the usage record.
5. **Increments the counter atomically** before returning (prerequisite for the request).
6. Falls back to regular SELECT on SQLite (tests).
The quota check/increment is now atomic and happens BEFORE the OpenAI call.
If the API call fails after this, the quota was still consumed (by design).
"""
# ── Subscription check ────lifespan function · python · L28-L38 (11 LOC)app/main.py
async def lifespan(app: FastAPI):
    """Application lifespan: set up logging and tables, then clean up on exit.

    Startup configures logging, logs ``startup`` and, outside production,
    creates the tables from ``Base.metadata``. After the ``yield`` the
    engine is disposed and ``shutdown`` is logged. The cleanup runs in a
    ``finally`` block, so it also happens when startup or the running
    application raises.
    """
    configure_logging(log_level=settings.log_level, is_production=settings.is_production)
    log.info("startup", env=settings.app_env)
    try:
        # Tables created via Alembic in production.
        # In development/test, create if not exists for convenience.
        if not settings.is_production:
            async with engine.begin() as conn:
                await conn.run_sync(Base.metadata.create_all)
        yield
    finally:
        # Always release the engine, even on an error path.
        await engine.dispose()
        log.info("shutdown")
# validation_error_handler function · python · L108-L122 (15 LOC)app/main.py
async def validation_error_handler(request: Request, exc: RequestValidationError) -> JSONResponse:
    """Turn request validation errors into one readable ``detail`` string.

    Each error becomes ``"<field path>: <message>"`` (or just the message
    when no field path remains), and the entries are joined with ``"; "``.
    The response status is 422.
    """
    pydantic_prefix = "Value error, "
    readable: list[str] = []
    for error in exc.errors():
        # The literal "body" location segment is left out of the field path.
        path_parts = [str(part) for part in error.get("loc", []) if part != "body"]
        field = " -> ".join(path_parts)
        # Strip the Pydantic prefix for cleaner output.
        message = error.get("msg", "Invalid value").removeprefix(pydantic_prefix)
        readable.append(f"{field}: {message}" if field else message)
    if readable:
        detail = "; ".join(readable)
    else:
        detail = "Invalid request data."
    return JSONResponse(status_code=422, content={"detail": detail})
openai_auth_handler function · python · L126-L131 (6 LOC)app/main.py
async def openai_auth_handler(request: Request, exc: openai.AuthenticationError) -> JSONResponse:
    """Log an OpenAI authentication failure and return a generic 503 response."""
    log.error("openai_authentication_error", path=request.url.path)
    body = {"detail": "AI service configuration error. Please contact support."}
    return JSONResponse(status_code=503, content=body)
# openai_rate_limit_handler function · python · L135-L140 (6 LOC)app/main.py
async def openai_rate_limit_handler(request: Request, exc: openai.RateLimitError) -> JSONResponse:
    """Log an OpenAI rate-limit error (warning level) and return a 503 response."""
    log.warning("openai_rate_limit", path=request.url.path)
    body = {"detail": "AI service temporarily unavailable. Please try again in a few minutes."}
    return JSONResponse(status_code=503, content=body)
# openai_timeout_handler function · python · L144-L146 (3 LOC)app/main.py
async def openai_timeout_handler(request: Request, exc: openai.APITimeoutError) -> JSONResponse:
    """Log an OpenAI timeout and return a 504 response."""
    log.error("openai_timeout", path=request.url.path)
    body = {"detail": "AI service timed out. Please retry."}
    return JSONResponse(status_code=504, content=body)
# openai_connection_handler function · python · L150-L152 (3 LOC)app/main.py
async def openai_connection_handler(request: Request, exc: openai.APIConnectionError) -> JSONResponse:
    """Log an OpenAI connection error and return a 503 response."""
    log.error("openai_connection_error", path=request.url.path)
    body = {"detail": "AI service unavailable. Please retry."}
    return JSONResponse(status_code=503, content=body)
# generic_error_handler function · python · L156-L166 (11 LOC)app/main.py
async def generic_error_handler(request: Request, exc: Exception) -> JSONResponse:
    """Log an unhandled exception and return a generic 500 response.

    The log entry carries the request path, the exception class name and
    ``str(exc)``; the response body never includes exception details.
    """
    # NOTE(review): str(exc) is logged verbatim — confirm exception messages
    # cannot carry sensitive content.
    log_fields = {
        "path": request.url.path,
        "error_type": type(exc).__name__,
        "error_message": str(exc),
    }
    log.error("unhandled_exception", **log_fields)
    body = {"detail": "An unexpected error occurred. Please try again."}
    return JSONResponse(status_code=500, content=body)
# RequestIDMiddleware class · python · L15-L58 (44 LOC)app/middleware/request_id.py
class RequestIDMiddleware(BaseHTTPMiddleware):
"""Assigns a unique request_id to every request and binds it to structlog context.
- Accepts an incoming ``X-Request-ID`` header for distributed tracing.
- Falls back to a generated UUID4 if the header is absent.
- Adds ``X-Request-ID`` to the response headers.
- Logs request start and completion with method, path, status, and duration.
- Does NOT log request body, query params, or any content that could contain PHI.
"""
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
# Accept caller-provided request ID or generate one
request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
# Bind to structlog context so every log call in this request includes it
clear_contextvars()
bind_contextvars(request_id=request_id)
start = time.monotonic()
log.info("request_started", method=request.method, path=requestdispatch method · python · L25-L58 (34 LOC)app/middleware/request_id.py
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
# Accept caller-provided request ID or generate one
request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
# Bind to structlog context so every log call in this request includes it
clear_contextvars()
bind_contextvars(request_id=request_id)
start = time.monotonic()
log.info("request_started", method=request.method, path=request.url.path)
try:
response = await call_next(request)
except Exception:
duration_ms = round((time.monotonic() - start) * 1000, 1)
log.error(
"request_failed",
method=request.method,
path=request.url.path,
duration_ms=duration_ms,
)
raise
else:
duration_ms = round((time.monotonic() - start) * 1000, 1)
log.info(
"reUser class · python · L27-L51 (25 LOC)app/models/db.py
class User(Base):
__tablename__ = "users"
id: Mapped[uuid.UUID] = uuid_pk()
email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
username: Mapped[str] = mapped_column(String(100), unique=True, nullable=False, index=True)
password_hash: Mapped[str] = mapped_column(String(255), nullable=False)
full_name: Mapped[str] = mapped_column(String(255), nullable=False, default="")
specialty: Mapped[str] = mapped_column(String(100), nullable=False, default="internal_medicine")
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
created_at: Mapped[datetime] = now_utc()
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
nullable=False,
)
last_login: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
stripe_customer_id: Mapped[str | None] = mapped_column(StrinSource: Repobility analyzer · https://repobility.com
NoteSession class · python · L54-L84 (31 LOC)app/models/db.py
class NoteSession(Base):
__tablename__ = "note_sessions"
id: Mapped[uuid.UUID] = uuid_pk()
user_id: Mapped[uuid.UUID] = mapped_column(
Uuid, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
note_type: Mapped[str] = mapped_column(
String(50), nullable=False, default="soap_progress"
) # soap_admission | soap_progress | discharge_summary
status: Mapped[str] = mapped_column(
String(20), nullable=False, default="draft"
) # draft | complete
patient_context: Mapped[str | None] = mapped_column(Text, nullable=True)
language: Mapped[str] = mapped_column(String(5), nullable=False, default="en")
created_at: Mapped[datetime] = now_utc()
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
nullable=False,
)
deleted_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullabNoteDraft class · python · L87-L111 (25 LOC)app/models/db.py
class NoteDraft(Base):
__tablename__ = "note_drafts"
id: Mapped[uuid.UUID] = uuid_pk()
session_id: Mapped[uuid.UUID] = mapped_column(
Uuid, ForeignKey("note_sessions.id", ondelete="CASCADE"), nullable=False, index=True
)
version: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
input_text: Mapped[str] = mapped_column(Text, nullable=False)
output_text: Mapped[str | None] = mapped_column(Text, nullable=True)
structured_payload: Mapped[dict | None] = mapped_column(
"structured_payload", JSON, nullable=True
)
schema_version: Mapped[str | None] = mapped_column(String(20), nullable=True)
model_used: Mapped[str | None] = mapped_column(String(100), nullable=True)
prompt_version: Mapped[str | None] = mapped_column(String(50), nullable=True)
tokens_used: Mapped[int | None] = mapped_column(Integer, nullable=True)
generation_time_ms: Mapped[int | None] = mapped_column(Integer, nullable=True)
created_at: MapAuditEvent class · python · L114-L131 (18 LOC)app/models/db.py
class AuditEvent(Base):
    """ORM model for the ``audit_events`` table."""

    __tablename__ = "audit_events"

    id: Mapped[uuid.UUID] = uuid_pk()
    # Nullable reference to users.id; the FK is declared with ondelete="SET NULL".
    user_id: Mapped[uuid.UUID | None] = mapped_column(
        Uuid,
        ForeignKey("users.id", ondelete="SET NULL"),
        nullable=True,
        index=True,
    )
    # note_generated | login | logout | export | register
    action: Mapped[str] = mapped_column(
        String(100),
        nullable=False,
        index=True,
    )
    resource_type: Mapped[str | None] = mapped_column(String(50), nullable=True)
    resource_id: Mapped[uuid.UUID | None] = mapped_column(Uuid, nullable=True)
    # Python attribute is metadata_; the database column is named "metadata".
    metadata_: Mapped[dict | None] = mapped_column("metadata", JSON, nullable=True)
    ip_address: Mapped[str | None] = mapped_column(String(45), nullable=True)
    created_at: Mapped[datetime] = now_utc()

    # Relationships
    user: Mapped["User | None"] = relationship(back_populates="audit_events")
# UsageRecord class · python · L134-L146 (13 LOC)app/models/db.py
class UsageRecord(Base):
    """ORM model for the ``usage_records`` table: per-user usage counters."""

    __tablename__ = "usage_records"

    id: Mapped[uuid.UUID] = uuid_pk()
    # Required reference to users.id; the FK is declared with ondelete="CASCADE".
    user_id: Mapped[uuid.UUID] = mapped_column(
        Uuid,
        ForeignKey("users.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )
    period_start: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        index=True,
    )
    # Both counters default to 0 on insert.
    notes_generated: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    tokens_consumed: Mapped[int] = mapped_column(Integer, nullable=False, default=0)

    # Relationships
    user: Mapped["User"] = relationship(back_populates="usage_records")
# Subscription class · python · L149-L193 (45 LOC)app/models/db.py
class Subscription(Base):
"""
Stripe subscription record — synced via webhook events.
Tiers:
- free: default on registration, capped at stripe_free_tier_note_limit/day
- pro: paid monthly, full daily limit
- (future: team, enterprise)
Status values mirror Stripe's subscription.status:
active, past_due, canceled, trialing, unpaid, incomplete, incomplete_expired, paused
"""
__tablename__ = "subscriptions"
id: Mapped[uuid.UUID] = uuid_pk()
user_id: Mapped[uuid.UUID] = mapped_column(
Uuid, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, unique=True, index=True
)
stripe_subscription_id: Mapped[str | None] = mapped_column(
String(255), unique=True, nullable=True
)
stripe_price_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
tier: Mapped[str] = mapped_column(
String(20), nullable=False, default="free"
) # free | pro
status: Mapped[str] = mapped_column(get_templates_by_category function · python · L1412-L1424 (13 LOC)app/prompts/diagnosis_templates.py
def get_templates_by_category(category: str | None = None) -> dict[str, list[str]] | list[str]:
"""
Return templates grouped by category.
If category is specified, return list of template keys for that category.
If category is None, return {category: [template_key, ...]} for all.
"""
index: dict[str, list[str]] = {}
for key, template in DIAGNOSIS_TEMPLATES.items():
cat = template.get("category", "uncategorized")
index.setdefault(cat, []).append(key)
if category is not None:
return index.get(category.lower(), [])
return indexget_template function · python · L1427-L1429 (3 LOC)app/prompts/diagnosis_templates.py
def get_template(problem_key: str) -> dict[str, str] | None:
"""Lookup a single template by key (case-insensitive)."""
return DIAGNOSIS_TEMPLATES.get(problem_key.lower())get_system_prompt function · python · L310-L314 (5 LOC)app/prompts/hospitalist_prompts.py
def get_system_prompt(language: str = "en") -> str:
    """Return SYSTEM_PROMPT_ES when language is exactly "es", else SYSTEM_PROMPT."""
    return SYSTEM_PROMPT_ES if language == "es" else SYSTEM_PROMPT
_render_patient_state function · python · L644-L705 (62 LOC)app/prompts/hospitalist_prompts.py
def _render_patient_state(patient_state: dict) -> str:
"""
Render the longitudinal patient state object into prompt context.
Supports the full Production Brain state model (Section 3):
identity/encounter, problem list, diagnostics, therapies, consults,
transition state, disposition barriers.
"""
parts: list[str] = []
# Identity and encounter context
identity_fields = [
("encounter_id", "Encounter"),
("attending", "Attending"),
("service", "Service"),
("note_author", "Note author"),
("admission_date", "Admission date"),
("admission_source", "Admission source"),
("code_status", "Code status"),
("disposition_target", "Disposition target"),
]
identity = []
for key, label in identity_fields:
val = patient_state.get(key)
if val:
identity.append(f"{label}: {val}")
if identity:
parts.append("\n".join(identity))
# Active problem list
pbuild_prompt function · python · L708-L795 (88 LOC)app/prompts/hospitalist_prompts.py
def build_prompt(
input_text: str,
note_type: str = "progress_note",
specialty: str = "internal_medicine",
active_problems: list[str] | None = None,
style_compression: str = "high",
patient_state: dict | None = None,
language: str = "en",
) -> str:
"""
Build the user-facing prompt for note generation.
The system prompt (SYSTEM_PROMPT) is sent separately as the system message.
Args:
input_text: Raw dictation, transcript, or clinical summary from physician
note_type: One of admission_hp / ed_consult_admission / progress_note /
event_note / consult_note / discharge_summary
specialty: Clinical specialty context (default: internal_medicine)
active_problems: List of active problem keys to load templates for
style_compression: high (terse) / medium (standard) / low (detailed)
patient_state: Optional dict with encounter context (see Production Brain Section 3)
Returns:
Fget_discharge_prompt function · python · L798-L824 (27 LOC)app/prompts/hospitalist_prompts.py
def get_discharge_prompt(
    input_text: str,
    active_problems: list[str] | None = None,
    patient_state: dict | None = None,
) -> str:
    """
    Build a discharge-summary prompt that ends with the gold standard checklist.

    Calls build_prompt with note_type="discharge_summary" and
    style_compression="medium", then appends one "- item" line per entry
    of DISCHARGE_CHECKLIST plus the closing instructions.
    """
    checklist_lines = [f"- {item}" for item in DISCHARGE_CHECKLIST]
    checklist_block = "\n".join(checklist_lines)
    discharge_prompt = build_prompt(
        input_text=input_text,
        note_type="discharge_summary",
        active_problems=active_problems,
        style_compression="medium",
        patient_state=patient_state,
    )
    gold_standard_suffix = f"""
DISCHARGE GOLD STANDARD — YOUR OUTPUT MUST SATISFY ALL OF THESE:
{checklist_block}
If any item cannot be satisfied from the provided clinical input,
state explicitly: "[Item] — not available from provided information"
Do not omit. Do not fabricate."""
    return discharge_prompt + gold_standard_suffix
# reconstruct_dictation function · python · L827-L836 (10 LOC)app/prompts/hospitalist_prompts.py
def reconstruct_dictation(raw_input: str) -> str:
    """
    Expand shorthand dictation into clinical language before prompt building.

    Every key of VOICE_SHORTCUTS is replaced by its expansion, matched
    case-insensitively between word boundaries. Shortcuts are applied one
    after another in the mapping's iteration order.

    Args:
        raw_input: Raw shorthand dictation text.
    Returns:
        The text with all shortcuts expanded.
    """
    expanded = raw_input
    for shorthand, expansion in VOICE_SHORTCUTS.items():
        pattern = r"\b" + re.escape(shorthand) + r"\b"
        # A callable replacement inserts the expansion verbatim. A plain
        # string would be parsed as a template, so a backslash or "\g<...>"
        # in an expansion would be misread or raise re.error.
        expanded = re.sub(
            pattern,
            lambda _match, literal=expansion: literal,
            expanded,
            flags=re.IGNORECASE,
        )
    return expanded
# build_prompt function · python · L1567-L1648 (82 LOC)app/prompts/soap_hospitalist_3.py
def build_prompt(
input_text: str,
note_type: str = "progress_note",
specialty: str = "internal_medicine",
active_problems: Optional[list[str]] = None,
style_compression: str = "high",
patient_state: Optional[dict] = None,
) -> str:
"""
Build the user-facing prompt for note generation.
Send SYSTEM_PROMPT separately as the OpenAI system message.
Args:
input_text: Raw dictation, transcript, or clinical summary
note_type: admission_hp / progress_note / event_note /
consult_note / discharge_summary
specialty: Clinical specialty (default: internal_medicine)
active_problems: List of diagnosis template keys to load
style_compression: high / medium / low
patient_state: Optional encounter context dict
Returns:
Formatted prompt string for OpenAI API call
"""
note_instruction = NOTE_TYPE_INSTRUCTIONS.get(
note_type,
NOTE_TYPE_INSTRUCTIONS["progress_noteget_discharge_prompt function · python · L1651-L1676 (26 LOC)app/prompts/soap_hospitalist_3.py
def get_discharge_prompt(
    input_text: str,
    active_problems: Optional[list[str]] = None,
    patient_state: Optional[dict] = None,
) -> str:
    """
    Build a discharge-summary prompt followed by the gold standard checklist.

    Calls build_prompt with note_type="discharge_summary" and
    style_compression="medium", then appends one "- item" line per entry
    of DISCHARGE_CHECKLIST and the closing instructions.
    """
    checklist_lines = [f"- {item}" for item in DISCHARGE_CHECKLIST]
    discharge_prompt = build_prompt(
        input_text=input_text,
        note_type="discharge_summary",
        active_problems=active_problems,
        style_compression="medium",
        patient_state=patient_state,
    )
    pieces = [
        discharge_prompt,
        "\n\nDISCHARGE GOLD STANDARD — YOUR OUTPUT MUST SATISFY ALL:\n",
        "\n".join(checklist_lines),
        "\n\nIf any item cannot be satisfied from provided input, state explicitly: ",
        "'[Item] — not available from provided information'. ",
        "Do not omit. Do not fabricate.",
    ]
    return "".join(pieces)
# reconstruct_dictation function · python · L1679-L1688 (10 LOC)app/prompts/soap_hospitalist_3.py
def reconstruct_dictation(raw_input: str) -> str:
    """
    Expand compressed shorthand dictation into clinical language.

    Every key of VOICE_SHORTCUTS is replaced by its expansion, matched
    case-insensitively between word boundaries. Shortcuts are applied one
    after another in the mapping's iteration order.

    Args:
        raw_input: Raw shorthand dictation text.
    Returns:
        The text with all shortcuts expanded.
    """
    expanded = raw_input
    for shorthand, expansion in VOICE_SHORTCUTS.items():
        pattern = r"\b" + re.escape(shorthand) + r"\b"
        # Use a callable so the expansion is inserted literally. A string
        # replacement is treated as a template, where a backslash or
        # "\g<...>" in an expansion is misread or raises re.error.
        expanded = re.sub(
            pattern,
            lambda _match, literal=expansion: literal,
            expanded,
            flags=re.IGNORECASE,
        )
    return expanded
# get_qa_prompt function · python · L1691-L1712 (22 LOC)app/prompts/soap_hospitalist_3.py
def get_qa_prompt(draft_note: str) -> str:
    """
    Build the QA review prompt for a draft note.

    The prompt lists five review checks, the expected format for each
    finding, and ends with the draft note under a "DRAFT NOTE:" heading.
    """
    review_checks = [
        "1. Contradictions (AKI marked resolved with rising creatinine, "
        "discharge stability claimed with unresolved barriers, etc.)",
        "2. Missing sections (pending results, medication changes, "
        "follow-up ownership)",
        "3. Unsupported statements (severity claims without clinical backing)",
        "4. Weak billing specificity (vague diagnoses, missing cause linkages, "
        "missing treatment intensity)",
        "5. Omitted status label (every problem must close with "
        "improving/stable/worsening/resolved)",
    ]
    finding_format = [
        "- Flag the issue",
        "- Brief explanation",
        "- Auto-corrected version when possible",
    ]
    # Sections are separated by a blank line; items within a section by a newline.
    sections = [
        "QA MODE: Review the following clinical note draft.",
        "Check for:\n" + "\n".join(review_checks),
        "For each finding:\n" + "\n".join(finding_format),
        f"DRAFT NOTE:\n{draft_note}",
    ]
    return "\n\n".join(sections)
list_templates_by_system function · python · L1715-L1724 (10 LOC)app/prompts/soap_hospitalist_3.py
def list_templates_by_system() -> dict[str, list[str]]:
    """
    Group the keys of DIAGNOSIS_TEMPLATES by each template's "system".

    Templates without a "system" entry are listed under "general".
    """
    grouped: dict[str, list[str]] = {}
    for template_key, spec in DIAGNOSIS_TEMPLATES.items():
        system_name = spec.get("system", "general")
        try:
            grouped[system_name].append(template_key)
        except KeyError:
            # First template seen for this system.
            grouped[system_name] = [template_key]
    return grouped
# register function · python · L35-L43 (9 LOC)app/routes/auth.py
async def register(
    request: Request,
    data: UserRegister,
    db: AsyncSession = Depends(get_db),
) -> UserOut:
    """Register a user via ``register_user``.

    ``request`` is accepted but not used in the body.

    Raises:
        HTTPException: With the status code and message of the ``AuthError``
            raised by ``register_user``.
    """
    try:
        return await register_user(db, data)
    except AuthError as e:
        # Chain explicitly so the traceback names the AuthError as the cause.
        raise HTTPException(status_code=e.status_code, detail=e.message) from e
# login function · python · L57-L66 (10 LOC)app/routes/auth.py
async def login(
    request: Request,
    data: UserLogin,
    db: AsyncSession = Depends(get_db),
) -> Token:
    """Authenticate a user via ``login_user`` and return the token.

    The client host from ``request.client`` (or ``None`` when it is absent)
    is passed on as ``ip_address``.

    Raises:
        HTTPException: With the status code and message of the ``AuthError``
            raised by ``login_user``.
    """
    ip = request.client.host if request.client else None
    try:
        return await login_user(db, data, ip_address=ip)
    except AuthError as e:
        # Chain explicitly so the traceback names the AuthError as the cause.
        raise HTTPException(status_code=e.status_code, detail=e.message) from e
# update_me function · python · L90-L133 (44 LOC)app/routes/auth.py
async def update_me(
request: Request,
data: UserUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> UserOut:
ip = request.client.host if request.client else None
profile_changed = False
# Password change requires current_password
if data.new_password:
if not data.current_password:
raise HTTPException(status_code=400, detail="Current password is required to set a new password")
if not verify_password(data.current_password, current_user.password_hash):
raise HTTPException(status_code=400, detail="Current password is incorrect")
current_user.password_hash = hash_password(data.new_password)
await write_audit_event(
db,
action="password_changed",
user_id=current_user.id,
resource_type="user",
resource_id=current_user.id,
ip_address=ip,
)
if data.full_name is not None and datlogout function · python · L142-L167 (26 LOC)app/routes/auth.py
async def logout(
    creds: HTTPAuthorizationCredentials | None = Depends(_bearer),
) -> dict:
    """Blacklist the presented access token for its remaining lifetime.

    The token is decoded and its ``jti`` is passed to ``blacklist_token``
    with the number of seconds left until ``exp``. Blacklisting is
    best-effort: if it fails, the failure is logged and logout still
    succeeds.

    Raises:
        HTTPException: 401 when no credentials are supplied, the token
            cannot be decoded, or the payload lacks ``jti`` or ``exp``.
    """
    # Local stdlib import: no module-level logger is visible for this function.
    import logging

    if creds is None:
        raise HTTPException(status_code=401, detail="Not authenticated")
    try:
        payload = decode_access_token(creds.credentials)
    except Exception as exc:
        raise HTTPException(status_code=401, detail="Invalid or expired token") from exc
    jti = payload.get("jti")
    exp = payload.get("exp")
    if not jti or not exp:
        raise HTTPException(status_code=401, detail="Invalid token payload")
    # Calculate remaining TTL
    remaining = int(exp - datetime.now(timezone.utc).timestamp())
    if remaining > 0:
        try:
            await blacklist_token(jti, remaining)
        except Exception as exc:
            # Redis unavailable — token won't be blacklisted but logout
            # still succeeds. In production, Redis must be available.
            # Log it: a failed revocation must not go unnoticed. Only the
            # exception class is logged, never the token.
            logging.getLogger(__name__).warning(
                "logout_blacklist_failed error_type=%s", type(exc).__name__
            )
    return {"detail": "Successfully logged out"}
# de_identify function · python · L38-L101 (64 LOC)app/routes/deid.py
async def de_identify(
request: DeIdRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> DeIdResult:
"""
De-identify clinical text by detecting and redacting PHI.
This endpoint:
1. Analyzes the input text for common PHI patterns
2. Redacts all detected PHI with [REDACTED_TYPE] markers
3. Returns the redacted text and a list of detected entities
4. Emits an audit event with entity count (not actual values)
The redaction is conservative — it may flag non-PHI sequences if they match
common patterns. Physicians must manually verify the redacted output.
"""
# Run de-identification (service handles all PHI detection)
result = redact_phi(request.text)
# Convert entities to response model
entities_out = [
PhiEntityOut(
type=ent["type"],
original=ent["original"],
start=ent["start"],
end=ent["end"],
)
for ent health function · python · L21-L51 (31 LOC)app/routes/health.py
async def health() -> dict:
checks = {"status": "ok", "version": "1.0.0-beta"}
# Postgres
try:
async with AsyncSessionLocal() as session:
await session.execute(text("SELECT 1"))
checks["postgres"] = "ok"
except Exception as e:
checks["postgres"] = "error"
checks["status"] = "degraded"
log.error("health_check_postgres_failed", error_type=type(e).__name__)
# Redis
try:
r = aioredis.from_url(settings.redis_url)
await r.ping()
await r.aclose()
checks["redis"] = "ok"
except Exception as e:
checks["redis"] = "error"
checks["status"] = "degraded"
log.error("health_check_redis_failed", error_type=type(e).__name__)
# OpenAI (non-blocking — if it fails, status stays ok but openai_status is unavailable)
try:
import openai
client = openai.AsyncOpenAI(api_key=settings.openai_api_key)
await asyncio.wait_for(client.models.list(), timeocreate_session function · python · L51-L56 (6 LOC)app/routes/notes.py
async def create_session(
    data: NoteSessionCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> NoteSessionOut:
    """Create a note session for the authenticated user via ``create_note_session``."""
    owner_id = current_user.id
    created = await create_note_session(db, owner_id, data)
    return created
list_sessions_endpoint function · python · L64-L68 (5 LOC)app/routes/notes.py
async def list_sessions_endpoint(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> list[NoteSessionOut]:
    """Return the result of ``list_sessions`` for the authenticated user's id."""
    owner_id = current_user.id
    sessions = await list_sessions(db, owner_id)
    return sessions
# get_session_endpoint function · python · L77-L85 (9 LOC)app/routes/notes.py
async def get_session_endpoint(
    session_id: UUID,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> NoteSessionDetailOut:
    """Fetch one note session for the authenticated user via ``get_session``.

    Raises:
        HTTPException: With the status code and message of the ``NoteError``
            raised by ``get_session``.
    """
    try:
        return await get_session(db, session_id, current_user.id)
    except NoteError as e:
        # Chain explicitly so the traceback names the NoteError as the cause.
        raise HTTPException(status_code=e.status_code, detail=e.message) from e
# generate function · python · L100-L120 (21 LOC)app/routes/notes.py
async def generate(
    request: NoteGenerateRequest,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(check_note_quota),
) -> NoteGenerateResponse:
    """Generate a note for the current user via ``generate_note``.

    The user is resolved through the ``check_note_quota`` dependency.

    Raises:
        HTTPException: The ``NoteError`` status and message; 504 on an
            OpenAI timeout; 502 on an OpenAI connection error.
    """
    try:
        return await generate_note(db, current_user.id, request)
    except NoteError as e:
        raise HTTPException(status_code=e.status_code, detail=e.message) from e
    # NOTE(review): keep the timeout clause first — in the openai SDK
    # APITimeoutError appears to derive from APIConnectionError; confirm.
    except openai.APITimeoutError as e:
        log.error("openai_timeout_generate", user_id=str(current_user.id))
        raise HTTPException(
            status_code=504,
            detail="AI service timed out. Please retry.",
        ) from e
    except openai.APIConnectionError as e:
        log.error("openai_connection_error_generate", user_id=str(current_user.id))
        raise HTTPException(
            status_code=502,
            detail="AI service unavailable. Please retry.",
        ) from e