Function bodies 223 total
get_draft_endpoint function · python · L129-L137 (9 LOC)app/routes/notes.py
async def get_draft_endpoint(
draft_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> NoteDraftOut:
try:
return await get_draft(db, draft_id, current_user.id)
except NoteError as e:
raise HTTPException(status_code=e.status_code, detail=e.message)export_draft_endpoint function · python · L149-L199 (51 LOC)app/routes/notes.py
async def export_draft_endpoint(
draft_id: UUID,
format: str = Query("text"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if format == "pdf":
try:
pdf_bytes, filename = await export_draft_pdf(db, draft_id, current_user.id)
except NoteError as e:
raise HTTPException(status_code=e.status_code, detail=e.message)
if not pdf_bytes:
raise HTTPException(
status_code=400,
detail="Cannot export empty note. Generate or edit note content first."
)
return StreamingResponse(
iter([pdf_bytes]),
media_type="application/pdf",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
if format == "meditext":
try:
content, filename = await export_draft_meditext(db, draft_id, current_user.id)
except NoteError as e:
raise HTTPExceptupdate_draft_endpoint function · python · L212-L221 (10 LOC)app/routes/notes.py
async def update_draft_endpoint(
draft_id: UUID,
data: NoteDraftUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> NoteDraftOut:
try:
return await update_draft(db, draft_id, current_user.id, data)
except NoteError as e:
raise HTTPException(status_code=e.status_code, detail=e.message)list_drafts function · python · L230-L238 (9 LOC)app/routes/notes.py
async def list_drafts(
session_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> list[NoteDraftOut]:
try:
return await list_session_drafts(db, session_id, current_user.id)
except NoteError as e:
raise HTTPException(status_code=e.status_code, detail=e.message)delete_session function · python · L248-L257 (10 LOC)app/routes/notes.py
async def delete_session(
session_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
try:
await soft_delete_session(db, session_id, current_user.id)
except NoteError as e:
raise HTTPException(status_code=e.status_code, detail=e.message)
return Nonedelete_draft function · python · L266-L275 (10 LOC)app/routes/notes.py
async def delete_draft(
draft_id: UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
try:
await soft_delete_draft(db, draft_id, current_user.id)
except NoteError as e:
raise HTTPException(status_code=e.status_code, detail=e.message)
return Nonestripe_webhook function · python · L42-L87 (46 LOC)app/routes/stripe_webhook.py
async def stripe_webhook(
request: Request,
db: AsyncSession = Depends(get_db),
):
"""
POST /stripe/webhook — Stripe sends events here.
Signature verification ensures only Stripe can call this endpoint.
If STRIPE_WEBHOOK_SECRET is not configured, signature verification
is skipped (development only).
"""
payload = await request.body()
sig_header = request.headers.get("stripe-signature", "")
# ── Verify signature ─────────────────────────────────────────
if not settings.stripe_webhook_secret or settings.stripe_webhook_secret == "whsec_REPLACE":
log.error("stripe_webhook_not_configured")
raise HTTPException(
status_code=503,
detail="Stripe webhooks not configured"
)
try:
import stripe
event = stripe.Webhook.construct_event(
payload, sig_header, settings.stripe_webhook_secret
)
except ValueError:
log.warning("stripe_webhook_invalid_payload")Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
create_checkout_session_endpoint function · python · L91-L117 (27 LOC)app/routes/stripe_webhook.py
async def create_checkout_session_endpoint(
request: CheckoutRequest,
user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> CheckoutResponse:
"""
POST /stripe/create-checkout-session — Create a Stripe checkout session.
Allows authenticated users to upgrade from free to pro tier.
If the user doesn't have a Stripe customer ID, one is created.
Request body:
price_id: str — Stripe price ID for the plan
success_url: str — URL to redirect on successful checkout
cancel_url: str — URL to redirect if user cancels
Returns:
CheckoutResponse with checkout_url
"""
checkout_url = await create_checkout_session(
db,
user,
request.price_id,
request.success_url,
request.cancel_url,
)
return CheckoutResponse(checkout_url=checkout_url)get_subscription_status function · python · L121-L150 (30 LOC)app/routes/stripe_webhook.py
async def get_subscription_status(
user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> SubscriptionStatusOut:
"""
GET /stripe/subscription — Get current user's subscription status.
Returns tier, status, billing period info, and note generation limit.
"""
sub = await get_user_subscription(db, user.id)
if not sub:
log.warning("get_subscription_no_sub", user_id=str(user.id))
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Subscription not found",
)
tier = sub.tier
note_limit = get_note_limit_for_tier(tier)
is_active = is_subscription_active(sub)
return SubscriptionStatusOut(
tier=tier,
status=sub.status,
cancel_at_period_end=sub.cancel_at_period_end,
current_period_end=sub.current_period_end,
note_limit=note_limit,
is_active=is_active,
)create_customer_portal_endpoint function · python · L154-L175 (22 LOC)app/routes/stripe_webhook.py
async def create_customer_portal_endpoint(
request: PortalRequest,
user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
) -> PortalResponse:
"""
POST /stripe/customer-portal — Create a Stripe billing portal session.
Allows users to manage or cancel their subscription.
Request body:
return_url: str — URL to return to after portal session
Returns:
PortalResponse with portal_url
"""
portal_url = await create_portal_session(
db,
user,
request.return_url,
)
return PortalResponse(portal_url=portal_url)transcribe function · python · L46-L136 (91 LOC)app/routes/transcribe.py
async def transcribe(
request: Request,
file: UploadFile = File(..., description="Audio file to transcribe"),
language: str = Query(
"en",
description="Language hint for transcription (ISO 639-1 code: en, es, fr, etc.). Whisper will auto-detect if unsure.",
pattern="^[a-z]{2}$",
),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> dict:
"""
Transcribe audio file to text using OpenAI Whisper.
Returns JSON with:
- transcription: the transcribed text
- language: detected language (ISO 639-1 code)
- model: the Whisper model used
- warning: transcription accuracy warning (always included)
Audio data is processed transiently and never persisted or logged.
"""
# ── Validate filename extension ──────────────────────────────────────
if not file.filename:
raise HTTPException(status_code=422, detail="File must have a name")
ext = Path(file.filename).suUserRegister class · python · L8-L44 (37 LOC)app/schemas/auth.py
class UserRegister(BaseModel):
email: EmailStr
username: str
password: str
full_name: str = ""
specialty: str = "internal_medicine"
@field_validator("username")
@classmethod
def username_alphanumeric(cls, v: str) -> str:
v = v.strip()
if not v.replace("_", "").isalnum():
raise ValueError("Username must be alphanumeric (underscores only, no spaces or hyphens)")
if len(v) < 3 or len(v) > 30:
raise ValueError("Username must be between 3 and 30 characters")
return v.lower()
@field_validator("full_name")
@classmethod
def full_name_length(cls, v: str) -> str:
v = v.strip()
if len(v) > 100:
raise ValueError("Full name must not exceed 100 characters")
return v
@field_validator("password")
@classmethod
def password_strength(cls, v: str) -> str:
if len(v) < 8:
raise ValueError("Password must be at least 8 characters")
iusername_alphanumeric method · python · L17-L23 (7 LOC)app/schemas/auth.py
def username_alphanumeric(cls, v: str) -> str:
v = v.strip()
if not v.replace("_", "").isalnum():
raise ValueError("Username must be alphanumeric (underscores only, no spaces or hyphens)")
if len(v) < 3 or len(v) > 30:
raise ValueError("Username must be between 3 and 30 characters")
return v.lower()full_name_length method · python · L27-L31 (5 LOC)app/schemas/auth.py
def full_name_length(cls, v: str) -> str:
v = v.strip()
if len(v) > 100:
raise ValueError("Full name must not exceed 100 characters")
return vpassword_strength method · python · L35-L44 (10 LOC)app/schemas/auth.py
def password_strength(cls, v: str) -> str:
if len(v) < 8:
raise ValueError("Password must be at least 8 characters")
if not any(c.isupper() for c in v):
raise ValueError("Password must contain at least one uppercase letter")
if not any(c.islower() for c in v):
raise ValueError("Password must contain at least one lowercase letter")
if not any(c.isdigit() for c in v):
raise ValueError("Password must contain at least one digit")
return vRepobility · open methodology · https://repobility.com/research/
UserLogin class · python · L47-L49 (3 LOC)app/schemas/auth.py
class UserLogin(BaseModel):
username: str
password: strToken class · python · L52-L54 (3 LOC)app/schemas/auth.py
class Token(BaseModel):
access_token: str
token_type: str = "bearer"TokenData class · python · L57-L59 (3 LOC)app/schemas/auth.py
class TokenData(BaseModel):
user_id: str
username: strUserUpdate class · python · L62-L103 (42 LOC)app/schemas/auth.py
class UserUpdate(BaseModel):
"""Fields that can be updated via PATCH /auth/me."""
full_name: str | None = None
specialty: str | None = None
current_password: str | None = None
new_password: str | None = None
@field_validator("full_name")
@classmethod
def full_name_length(cls, v: str | None) -> str | None:
if v is not None:
v = v.strip()
if len(v) < 2:
raise ValueError("Full name must be at least 2 characters")
if len(v) > 100:
raise ValueError("Full name must not exceed 100 characters")
return v
@field_validator("specialty")
@classmethod
def valid_specialty(cls, v: str | None) -> str | None:
allowed = {
"internal_medicine", "hospitalist", "family_medicine",
"emergency_medicine", "other",
}
if v is not None and v not in allowed:
raise ValueError(f"Specialty must be one of: {', '.join(sorted(allowed))full_name_length method · python · L71-L78 (8 LOC)app/schemas/auth.py
def full_name_length(cls, v: str | None) -> str | None:
if v is not None:
v = v.strip()
if len(v) < 2:
raise ValueError("Full name must be at least 2 characters")
if len(v) > 100:
raise ValueError("Full name must not exceed 100 characters")
return vvalid_specialty method · python · L82-L89 (8 LOC)app/schemas/auth.py
def valid_specialty(cls, v: str | None) -> str | None:
allowed = {
"internal_medicine", "hospitalist", "family_medicine",
"emergency_medicine", "other",
}
if v is not None and v not in allowed:
raise ValueError(f"Specialty must be one of: {', '.join(sorted(allowed))}")
return vnew_password_strength method · python · L93-L103 (11 LOC)app/schemas/auth.py
def new_password_strength(cls, v: str | None) -> str | None:
if v is not None:
if len(v) < 8:
raise ValueError("Password must be at least 8 characters")
if not any(c.isupper() for c in v):
raise ValueError("Password must contain at least one uppercase letter")
if not any(c.islower() for c in v):
raise ValueError("Password must contain at least one lowercase letter")
if not any(c.isdigit() for c in v):
raise ValueError("Password must contain at least one digit")
return vUserOut class · python · L106-L116 (11 LOC)app/schemas/auth.py
class UserOut(BaseModel):
id: UUID
email: str
username: str
full_name: str
specialty: str
is_active: bool
created_at: datetime | None = None
updated_at: datetime | None = None
model_config = {"from_attributes": True}Open data scored by Repobility · https://repobility.com
CheckoutRequest class · python · L13-L35 (23 LOC)app/schemas/billing.py
class CheckoutRequest(BaseModel):
"""Request to create a Stripe checkout session for subscription upgrade."""
price_id: str
success_url: str
cancel_url: str
@field_validator("price_id")
@classmethod
def price_id_required(cls, v: str) -> str:
v = v.strip()
if not v:
raise ValueError("price_id cannot be empty")
return v
@field_validator("success_url", "cancel_url")
@classmethod
def url_required(cls, v: str) -> str:
v = v.strip()
if not v:
raise ValueError("URL cannot be empty")
if not (v.startswith("http://") or v.startswith("https://")):
raise ValueError("URL must start with http:// or https://")
return vprice_id_required method · python · L21-L25 (5 LOC)app/schemas/billing.py
def price_id_required(cls, v: str) -> str:
v = v.strip()
if not v:
raise ValueError("price_id cannot be empty")
return vurl_required method · python · L29-L35 (7 LOC)app/schemas/billing.py
def url_required(cls, v: str) -> str:
v = v.strip()
if not v:
raise ValueError("URL cannot be empty")
if not (v.startswith("http://") or v.startswith("https://")):
raise ValueError("URL must start with http:// or https://")
return vCheckoutResponse class · python · L38-L40 (3 LOC)app/schemas/billing.py
class CheckoutResponse(BaseModel):
"""Response containing the Stripe checkout session URL."""
checkout_url: strPortalRequest class · python · L43-L55 (13 LOC)app/schemas/billing.py
class PortalRequest(BaseModel):
"""Request to create a Stripe billing portal session."""
return_url: str
@field_validator("return_url")
@classmethod
def return_url_required(cls, v: str) -> str:
v = v.strip()
if not v:
raise ValueError("return_url cannot be empty")
if not (v.startswith("http://") or v.startswith("https://")):
raise ValueError("return_url must start with http:// or https://")
return vreturn_url_required method · python · L49-L55 (7 LOC)app/schemas/billing.py
def return_url_required(cls, v: str) -> str:
v = v.strip()
if not v:
raise ValueError("return_url cannot be empty")
if not (v.startswith("http://") or v.startswith("https://")):
raise ValueError("return_url must start with http:// or https://")
return vPortalResponse class · python · L58-L60 (3 LOC)app/schemas/billing.py
class PortalResponse(BaseModel):
"""Response containing the Stripe customer portal session URL."""
portal_url: strSubscriptionStatusOut class · python · L63-L72 (10 LOC)app/schemas/billing.py
class SubscriptionStatusOut(BaseModel):
"""Current subscription status for the authenticated user."""
tier: str # free | pro
status: str # active | past_due | canceled | trialing | ...
cancel_at_period_end: bool
current_period_end: datetime | None
note_limit: int
is_active: bool
model_config = {"from_attributes": True}Repobility · MCP-ready · https://repobility.com
DeIdRequest class · python · L6-L32 (27 LOC)app/schemas/deid.py
class DeIdRequest(BaseModel):
"""Request for text de-identification."""
text: str
model_config = {
"json_schema_extra": {
"examples": [
{
"text": "Patient Jane Doe, MRN 000000, DOB 01/01/1970, "
"called at 555-000-0000. Lives at 123 Example Street, Anytown, PR. "
"Email: [email protected]. Patient is 75 years old. "
"SSN 000-00-0000."
}
]
}
}
@field_validator("text")
@classmethod
def validate_text_length(cls, v: str) -> str:
"""Ensure text is between 1 and 50,000 characters."""
v = v.strip()
if len(v) < 1:
raise ValueError("text must be at least 1 character")
if len(v) > 50_000:
raise ValueError("text must not exceed 50,000 characters")
return vvalidate_text_length method · python · L25-L32 (8 LOC)app/schemas/deid.py
def validate_text_length(cls, v: str) -> str:
"""Ensure text is between 1 and 50,000 characters."""
v = v.strip()
if len(v) < 1:
raise ValueError("text must be at least 1 character")
if len(v) > 50_000:
raise ValueError("text must not exceed 50,000 characters")
return vPhiEntityOut class · python · L35-L42 (8 LOC)app/schemas/deid.py
class PhiEntityOut(BaseModel):
"""A detected PHI entity in the response."""
type: str # 'name', 'date', 'mrn', 'phone', 'ssn', 'email', 'address', 'age'
original: str
start: int
end: int
model_config = {"from_attributes": True}DeIdResult class · python · L45-L59 (15 LOC)app/schemas/deid.py
class DeIdResult(BaseModel):
"""Result of de-identification operation."""
original_text: str
redacted_text: str
entities_found: list[PhiEntityOut]
warning: str
model_config = {
"json_schema_extra": {
"description": (
"De-identification result with original text, redacted text, detected entities, "
"and a disclaimer that de-identification is incomplete."
)
}
}PlanItem class · python · L54-L59 (6 LOC)app/schemas/note_generation.py
class PlanItem(BaseModel):
"""A single problem-based plan entry."""
problem: str
assessment: str
interventions: list[str]StructuredNote class · python · L62-L116 (55 LOC)app/schemas/note_generation.py
class StructuredNote(BaseModel):
"""
Validated structured output for hospitalist clinical notes.
All note types share this schema. Per-note-type required section
validation is applied via model_validator after construction.
"""
note_type: str
language: str = "en"
# ── Core SOAP sections ───────────────────────────────────────────
chief_complaint: str | None = None
subjective: str | None = None # HPI (admission) or interval hx (progress)
objective: str | None = None # Vitals, exam, labs, imaging
assessment: str | None = None # One-liner or summary assessment
plan: list[PlanItem] = [] # Problem-based plan
# ── Clinical quality ─────────────────────────────────────────────
diagnostic_uncertainty: list[str] = [] # Differentials, rule-outs
billing_keywords: list[str] = [] # Billing-grade clinical phrases
# ── Discharge-specific ───────────────────────────────────────────
hospital_validate_required_sections method · python · L94-L112 (19 LOC)app/schemas/note_generation.py
def validate_required_sections(self) -> StructuredNote:
"""Enforce per-note-type required sections."""
required = NOTE_TYPE_REQUIRED_SECTIONS.get(self.note_type, set())
missing: list[str] = []
for section in required:
value = getattr(self, section, None)
# List fields: must be non-empty
if section in ("plan", "follow_up"):
if not getattr(self, section):
missing.append(section)
# String fields: must be non-empty
elif not value or (isinstance(value, str) and not value.strip()):
missing.append(section)
if missing:
raise ValueError(
f"Note type '{self.note_type}' requires non-empty sections: "
f"{', '.join(sorted(missing))}"
)
return selfto_json method · python · L114-L116 (3 LOC)app/schemas/note_generation.py
def to_json(self) -> dict:
"""Serialize to JSON-safe dict for DB persistence."""
return self.model_dump(mode="json")Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
get_structured_note_json_instruction function · python · L119-L147 (29 LOC)app/schemas/note_generation.py
def get_structured_note_json_instruction() -> str:
"""
Return a human-readable JSON schema description for inclusion in
the system prompt. This tells the LLM exactly what shape to output.
"""
return """{
"note_type": "<string: the note type being generated>",
"language": "<string: 'en' or 'es'>",
"chief_complaint": "<string or null: one-line chief complaint>",
"subjective": "<string or null: HPI narrative or interval history — full clinical prose>",
"objective": "<string or null: vitals, physical exam, labs, imaging — clinical detail>",
"assessment": "<string or null: clinical assessment summary>",
"plan": [
{
"problem": "<string: problem name>",
"assessment": "<string: brief assessment for this problem>",
"interventions": ["<string: intervention 1>", "<string: intervention 2>"]
}
],
"diagnostic_uncertainty": ["<string: differential or rule-out>"],
"billing_keywords": ["<string: billing-grade clinical phrase>"],
"hospiNoteSessionCreate class · python · L19-L59 (41 LOC)app/schemas/notes.py
class NoteSessionCreate(BaseModel):
note_type: str = "soap_progress"
patient_context: str | None = None
language: str = "en"
model_config = {
"json_schema_extra": {
"examples": [
{
"note_type": "admission_hp",
"patient_context": "68 y/o male, CHF exacerbation, presenting with dyspnea and lower extremity edema",
"language": "en",
}
]
}
}
@field_validator("note_type")
@classmethod
def valid_note_type(cls, v: str) -> str:
if v not in NOTE_TYPES:
raise ValueError(f"note_type must be one of: {', '.join(sorted(NOTE_TYPES))}")
return v
@field_validator("patient_context")
@classmethod
def patient_context_length(cls, v: str | None) -> str | None:
if v is not None:
v = v.strip()
if len(v) > 500:
raise ValueError("patient_context must not exceed valid_note_type method · python · L38-L41 (4 LOC)app/schemas/notes.py
def valid_note_type(cls, v: str) -> str:
if v not in NOTE_TYPES:
raise ValueError(f"note_type must be one of: {', '.join(sorted(NOTE_TYPES))}")
return vpatient_context_length method · python · L45-L52 (8 LOC)app/schemas/notes.py
def patient_context_length(cls, v: str | None) -> str | None:
if v is not None:
v = v.strip()
if len(v) > 500:
raise ValueError("patient_context must not exceed 500 characters")
if len(v) == 0:
return None
return vvalid_language method · python · L56-L59 (4 LOC)app/schemas/notes.py
def valid_language(cls, v: str) -> str:
if v not in SUPPORTED_LANGUAGES:
raise ValueError(f"language must be one of: {', '.join(sorted(SUPPORTED_LANGUAGES))}")
return vNoteSessionOut class · python · L62-L72 (11 LOC)app/schemas/notes.py
class NoteSessionOut(BaseModel):
id: UUID
note_type: str
status: str
patient_context: str | None
language: str = "en"
created_at: datetime | None = None
updated_at: datetime | None = None
deleted_at: datetime | None = None
model_config = {"from_attributes": True}NoteSessionDetailOut class · python · L75-L77 (3 LOC)app/schemas/notes.py
class NoteSessionDetailOut(NoteSessionOut):
"""Extended session response that includes drafts — used for single-session fetch."""
drafts: list["NoteDraftOut"] = []NoteGenerateRequest class · python · L80-L103 (24 LOC)app/schemas/notes.py
class NoteGenerateRequest(BaseModel):
session_id: UUID
input_text: str
model_config = {
"json_schema_extra": {
"examples": [
{
"session_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"input_text": "Patient is a 72 year old female presenting with acute onset chest pain radiating to the left arm. History of hypertension and type 2 diabetes. Vitals stable, EKG shows ST changes in leads II, III, aVF. Troponin pending.",
}
]
}
}
@field_validator("input_text")
@classmethod
def input_not_empty(cls, v: str) -> str:
v = v.strip()
if len(v) < 20:
raise ValueError("input_text must be at least 20 characters")
if len(v) > 10_000:
raise ValueError("input_text must not exceed 10,000 characters")
return vRepobility · open methodology · https://repobility.com/research/
input_not_empty method · python · L97-L103 (7 LOC)app/schemas/notes.py
def input_not_empty(cls, v: str) -> str:
v = v.strip()
if len(v) < 20:
raise ValueError("input_text must be at least 20 characters")
if len(v) > 10_000:
raise ValueError("input_text must not exceed 10,000 characters")
return vNoteGenerateResponse class · python · L106-L119 (14 LOC)app/schemas/notes.py
class NoteGenerateResponse(BaseModel):
draft_id: UUID
session_id: UUID
output_text: str
structured_payload: dict | None = None
version: int
model_used: str
prompt_version: str
schema_version: str | None = None
tokens_used: int | None
generation_time_ms: int | None
billing_hints: list[str] = []
model_config = {"from_attributes": True, "protected_namespaces": ()}NoteDraftOut class · python · L122-L137 (16 LOC)app/schemas/notes.py
class NoteDraftOut(BaseModel):
id: UUID
session_id: UUID
version: int
input_text: str
output_text: str | None
structured_payload: dict | None = None
schema_version: str | None = None
model_used: str | None
prompt_version: str | None
tokens_used: int | None
generation_time_ms: int | None = None
created_at: datetime | None = None
deleted_at: datetime | None = None
model_config = {"from_attributes": True, "protected_namespaces": ()}