Function bodies 229 total
ThinkTankHandler.end_headers method · python · L36-L41 (6 LOC)brainstorming/think-tank-server.py
def end_headers(self):
    """Send the CORS response headers, then finish the header section.

    Three ``Access-Control-Allow-*`` headers are emitted on every response
    before control is handed to the parent class's ``end_headers``.
    """
    cors_headers = (
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'),
        ('Access-Control-Allow-Headers', 'Content-Type'),
    )
    for header_name, header_value in cors_headers:
        self.send_header(header_name, header_value)
    super().end_headers()
def do_GET(self):
    """Dispatch a GET request to an API endpoint or to static file serving.

    ``/api/session`` sends the content of one session file (the ``file``
    query parameter, falling back to ``SESSION_FILE``), ``/api/sessions``
    lists the sessions, and every other path is delegated to the parent
    class's ``do_GET``.
    """
    url = urlparse(self.path)
    params = parse_qs(url.query)
    route = url.path

    # Single-session endpoint: first value of ?file=..., else the default.
    if route == '/api/session':
        requested = params.get('file', [SESSION_FILE])[0]
        self.send_session_content(requested)
        return

    # Session index endpoint.
    if route == '/api/sessions':
        self.list_sessions()
        return

    # Anything else is treated as a static file request (HTML, JS, etc.).
    super().do_GET()
def do_POST(self):
    """Dispatch a POST request.

    Only ``/api/message`` is served (it appends a message via
    ``append_message``); any other path is answered with a 404 error.
    """
    route = urlparse(self.path).path
    if route != '/api/message':
        self.send_error(404, 'Endpoint not found')
        return
    # API endpoint to append a message.
    self.append_message()
def json_to_markdown(self, data):
"""Convert a JSON think tank session to readable markdown format."""
md_lines = []
# Session metadata
if 'session_metadata' in data:
sm = data['session_metadata']
md_lines.append(f"# {sm.get('title', 'Think Tank Session')}")
md_lines.append("")
if 'date' in sm:
md_lines.append(f"**Date:** {sm['date']}")
if 'participant' in sm:
md_lines.append(f"**Participant:** {sm['participant']}")
if 'format' in sm:
md_lines.append(f"**Format:** {sm['format']}")
if 'duration' in sm:
md_lines.append(f"**Duration:** {sm['duration']}")
if 'outcome' in sm:
md_lines.append(f"**Outcome:** {sm['outcome']}")
md_lines.append("")
md_lines.append("---")
md_lines.append("")
# Expert panel
if 'expert_paThinkTankHandler.send_session_content method · python · L207-L245 (39 LOC)brainstorming/think-tank-server.py
def send_session_content(self, session_file=None):
"""Send the current session file content."""
try:
file_path = session_file or SESSION_FILE
# Security: Ensure file is within allowed directories
file_path = os.path.normpath(file_path)
allowed_prefixes = ['docs/', 'project-management/']
is_allowed = any(file_path.startswith(prefix) for prefix in allowed_prefixes)
if not is_allowed:
# Try to find it in docs first
potential_path = f'docs/{os.path.basename(file_path)}'
if os.path.exists(potential_path):
file_path = potential_path
else:
self.send_error(403, f'Access denied: {file_path}')
return
if os.path.exists(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
ThinkTankHandler.extract_session_metadata method · python · L247-L332 (86 LOC)brainstorming/think-tank-server.py
def extract_session_metadata(self, file_path, file_format):
"""Extract metadata from a session file."""
filename = os.path.basename(file_path)
metadata = {
'file': file_path,
'filename': filename,
'format': file_format,
'date': 'Unknown',
'display_name': filename.replace('_', ' ').replace('-', ' ').replace('.md', '').replace('.json', ''),
'title': None,
'session_type': 'General'
}
try:
# Try to extract date from filename
date_match = re.search(r'(\d{4})-(\d{2})-(\d{2})', filename)
if date_match:
year, month, day = date_match.groups()
metadata['date'] = f"{year}-{month}-{day}"
metadata['display_name'] = f"Think Tank Session - {month}/{day}/{year}"
# Read file to extract more metadata
with open(file_path, 'r', encoding='utf-8') as f:
ThinkTankHandler.list_sessions method · python · L334-L379 (46 LOC)brainstorming/think-tank-server.py
def list_sessions(self):
"""List all available Think Tank session files from all sources."""
try:
sessions = []
seen_files = set() # Avoid duplicates
# Scan all session sources
for source in SESSION_SOURCES:
source_dir = source['dir']
pattern = source['pattern']
file_format = source['format']
if not os.path.exists(source_dir):
continue
full_pattern = os.path.join(source_dir, pattern)
found_files = glob.glob(full_pattern)
for file_path in found_files:
# Skip if already processed
abs_path = os.path.abspath(file_path)
if abs_path in seen_files:
continue
seen_files.add(abs_path)
# ExtracRepobility · code-quality intelligence · https://repobility.com
ThinkTankHandler.append_message method · python · L381-L419 (39 LOC)brainstorming/think-tank-server.py
def append_message(self):
"""Append a new message to the session file."""
try:
# Read POST data
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
data = json.loads(post_data.decode('utf-8'))
message = data.get('message', '').strip()
speaker = data.get('speaker', 'Jeremy Potts')
role = data.get('role', 'CEO')
if not message:
self.send_error(400, 'Message is required')
return
# Format the message
timestamp = datetime.now().strftime('%B %d, %Y at %I:%M %p')
formatted_message = f"\n\n### {speaker} ({role})\n*{timestamp}*\n\n{message}\n"
# Append to file
with open(SESSION_FILE, 'a', encoding='utf-8') as f:
f.write(formatted_message)
# Send success response
self.send_response(200)
selfrun_server function · python · L426-L474 (49 LOC)brainstorming/think-tank-server.py
def run_server(port=8000):
"""Start the Think Tank session server."""
server_address = ('', port)
httpd = HTTPServer(server_address, ThinkTankHandler)
# Count available sessions
session_count = 0
for source in SESSION_SOURCES:
if os.path.exists(source['dir']):
pattern = os.path.join(source['dir'], source['pattern'])
session_count += len(glob.glob(pattern))
print(f"""
╔══════════════════════════════════════════════════════════════╗
║ KNOVA Quest Think Tank Session Server ║
║ Session History Browser Enabled ║
╚══════════════════════════════════════════════════════════════╝
🚀 Server running at: http://localhost:{port}
📝 Active session: {SESSION_FILE}
📚 Sessions available: {session_count} (MD + JSON formats)
🌐 Open viewer: http://localhost:{port}/think-tank-viewer.html
Session sources:
- docs/THINK_TANK_SESSION_*.md
- project-management/think-tank-sessions/*.json
- projget_confidence_class function · python · L296-L303 (8 LOC)src/app/streamlit_app.py
def get_confidence_class(confidence: float) -> str:
    """Map a confidence score to its CSS class name.

    Args:
        confidence: Score compared against the 0.8 and 0.6 cut-offs.

    Returns:
        "confidence-high" at or above 0.8, "confidence-medium" at or above
        0.6, otherwise "confidence-low".
    """
    # Ordered from the highest cut-off down; the first match wins.
    tiers = (
        (0.8, "confidence-high"),
        (0.6, "confidence-medium"),
    )
    for cutoff, css_class in tiers:
        if confidence >= cutoff:
            return css_class
    return "confidence-low"
def format_findings_html(findings: list) -> str:
    """Render each finding as a ``finding-item`` div and concatenate them.

    Args:
        findings: Items to render; each is interpolated as-is (no escaping).

    Returns:
        The divs joined with no separator, or "" for an empty list.
    """
    return "".join(
        f'<div class="finding-item">{entry}</div>' for entry in findings
    )
def initialize_pipeline():
    """Create the inference pipeline in session state if it is not set yet.

    Does nothing when ``st.session_state.pipeline`` already holds a value.
    When ``MODELS_AVAILABLE`` is false a warning is shown and the slot is
    left as ``None``.
    """
    state = st.session_state
    if state.pipeline is not None:
        return

    if not MODELS_AVAILABLE:
        st.warning("Model modules not available. Using demo mode.")
        state.pipeline = None
        return

    # Use mock model for demo (replace with real model in production)
    mock_model = MockMedGemmaModel()
    state.pipeline = InferencePipeline(model=mock_model)
def initialize_image_pipeline():
    """Create the image analysis pipeline in session state if it is not set.

    Does nothing when ``st.session_state.image_pipeline`` already holds a
    value. When ``PIPELINE_AVAILABLE`` is false a warning is shown and the
    slot is left as ``None``; otherwise the pipeline is built with
    ``mock_mode=True``.
    """
    state = st.session_state
    if state.image_pipeline is not None:
        return

    if not PIPELINE_AVAILABLE:
        st.warning("Image pipeline not available. Using demo mode.")
        state.image_pipeline = None
        return

    pipeline_config = AnalysisPipelineConfig(mock_mode=True)
    state.image_pipeline = ImageAnalysisPipeline(pipeline_config)
def render_sidebar():
"""Render the sidebar."""
with st.sidebar:
st.image("https://via.placeholder.com/200x80?text=RadAssist+Pro", width=200)
st.markdown("---")
st.markdown("### Analysis Mode")
analysis_mode = st.radio(
"Select analysis type:",
["2D Image Analysis", "3D Volume Analysis", "Longitudinal Comparison", "CT Scan Series Upload"],
help="Choose the type of analysis to perform"
)
st.markdown("---")
st.markdown("### Settings")
show_confidence = st.checkbox("Show confidence scores", value=True)
show_processing_time = st.checkbox("Show processing time", value=True)
generate_report = st.checkbox("Generate text report", value=True)
st.markdown("---")
st.markdown("### Model Info")
st.info("""
**Model:** MedGemma 1.5 4B
**Provider:** Google
**Unique Capabilities:**
- 3D volumetric analysis
- Lorender_header function · python · L398-L417 (20 LOC)src/app/streamlit_app.py
def render_header():
    """Render the page banner (title, tagline, sub-header) and disclaimer."""
    banner_markup = (
        '<h1 class="main-header">🏥 RadAssist Pro</h1>',
        '<p class="tagline">"AI That Remembers"</p>',
        '<p class="sub-header">Longitudinal Change Detection with Clinical Decision Support • Powered by MedGemma 1.5</p>',
    )
    for markup in banner_markup:
        st.markdown(markup, unsafe_allow_html=True)

    # Research-prototype disclaimer box shown under the banner.
    disclaimer_markup = """
<div class="disclaimer-box">
<h4>⚠️ Research Prototype - Not for Clinical Use</h4>
<p>
This is a demonstration system for the Med-Gemma Impact Challenge.
All findings must be verified by a qualified radiologist.
Not FDA-cleared. For research and educational purposes only.
</p>
</div>
"""
    st.markdown(disclaimer_markup, unsafe_allow_html=True)
render_2d_analysis function · python · L420-L485 (66 LOC)src/app/streamlit_app.py
def render_2d_analysis(settings: dict):
"""Render 2D image analysis interface."""
st.markdown("## 📷 2D Medical Image Analysis")
st.markdown("Upload a chest X-ray or other 2D medical image for AI analysis.")
col1, col2 = st.columns([1, 1])
with col1:
st.markdown("### Upload Image")
uploaded_file = st.file_uploader(
"Choose a medical image",
type=['png', 'jpg', 'jpeg', 'dcm'],
help="Supported formats: PNG, JPEG, DICOM"
)
clinical_context = st.text_area(
"Clinical Context (optional)",
placeholder="Enter relevant clinical history...",
help="Adding clinical context can improve analysis accuracy"
)
study_type = st.selectbox(
"Study Type",
["Chest X-ray", "Abdominal X-ray", "Bone X-ray", "Other"],
index=0
)
analyze_button = st.button("🔍 Analyze Image", type="primary", use_container_width=True)
render_3d_analysis function · python · L488-L543 (56 LOC)src/app/streamlit_app.py
def render_3d_analysis(settings: dict):
"""Render 3D volume analysis interface."""
st.markdown("## 🧊 3D Volumetric Analysis")
st.markdown("""
**MedGemma 1.5 Unique Capability**: Analyze complete CT/MRI volumes.
This is a capability unique to MedGemma 1.5 - no other open-source model can do this.
""")
st.info("📁 Upload a folder of DICOM files or a single 3D volume file")
uploaded_files = st.file_uploader(
"Upload DICOM slices",
type=['dcm'],
accept_multiple_files=True,
help="Upload multiple DICOM files from a CT/MRI scan"
)
if uploaded_files:
st.success(f"Uploaded {len(uploaded_files)} DICOM files")
col1, col2 = st.columns(2)
with col1:
st.markdown("### Volume Information")
st.write(f"**Slices:** {len(uploaded_files)}")
st.write("**Modality:** CT (estimated)")
with col2:
clinical_context = st.text_area(
"Clinical Corender_longitudinal_analysis function · python · L546-L561 (16 LOC)src/app/streamlit_app.py
def render_longitudinal_analysis(settings: dict):
    """Render the longitudinal comparison page.

    Shows the section heading and intro text, then a checkbox (checked by
    default) that selects between the scripted demo scenario and the custom
    measurement form.

    Args:
        settings: Passed through to ``render_custom_longitudinal`` when the
            demo checkbox is unticked.
    """
    st.markdown("## 📈 Longitudinal Change Detection")

    intro_text = """
**The Innovation:** RadAssist Pro doesn't just detect findings - it **remembers**.
Track disease progression with clinical decision support, Lung-RADS integration,
and differential diagnosis evolution.
"""
    st.markdown(intro_text)

    # Demo mode toggle
    use_demo = st.checkbox("🎬 Run Demo Scenario (The Missed Progression Save)", value=True)
    if not use_demo:
        render_custom_longitudinal(settings)
        return
    render_demo_longitudinal()
def render_demo_longitudinal():
"""Render the demo scenario for the competition video."""
st.markdown("---")
st.markdown("### 📋 Demo Scenario: The Missed Progression Save")
st.markdown("""
> *"A 58-year-old patient was told 'stable' four times. She had cancer."*
This demo shows how RadAssist Pro would have caught what traditional reads missed.
""")
# Patient context
with st.expander("👤 Patient Context", expanded=True):
st.markdown("""
- **Age:** 58 years old
- **Gender:** Female
- **History:** Former smoker (30 pack-years, quit 5 years ago)
- **Finding:** Incidental 6mm lung nodule on screening CT (January 2024)
- **Prior Reports:** "Stable nodule, continued surveillance" x4
""")
if st.button("🔍 Run Longitudinal Analysis", type="primary", use_container_width=True):
with st.spinner("Analyzing 18 months of sequential scans with MedGemma..."):
render_demo_results_longitudrender_demo_results_longitudinal function · python · L589-L768 (180 LOC)src/app/streamlit_app.py
def render_demo_results_longitudinal():
"""Render the demo longitudinal analysis results."""
if not LONGITUDINAL_AVAILABLE:
st.error("Longitudinal analyzer module not available")
return
# Create the demo measurements
measurements = [
NoduleMeasurement(datetime(2024, 1, 15), 6.0, "right upper lobe", "solid"),
NoduleMeasurement(datetime(2024, 7, 20), 6.2, "right upper lobe", "solid"),
NoduleMeasurement(datetime(2025, 1, 18), 6.8, "right upper lobe", "solid"),
NoduleMeasurement(datetime(2025, 7, 15), 8.3, "right upper lobe", "solid"),
]
clinical_context = "58-year-old female, former smoker (30 pack-years), incidental lung nodule on screening CT"
# Create the report
report = create_longitudinal_report(measurements, clinical_context)
analysis = report.analysis
st.success("✅ Analysis Complete")
# Timeline
st.markdown("### 📅 Measurement Timeline")
timeline_html = ""
for m in measuremerender_custom_longitudinal function · python · L771-L831 (61 LOC)src/app/streamlit_app.py
def render_custom_longitudinal(settings: dict):
"""Render custom longitudinal comparison interface."""
st.info("Enter nodule measurements from sequential scans for analysis")
# Input form for measurements
st.markdown("### Enter Measurements")
num_timepoints = st.slider("Number of timepoints", 2, 6, 4)
measurements = []
cols = st.columns(num_timepoints)
for i, col in enumerate(cols):
with col:
st.markdown(f"**Scan {i+1}**")
date = st.date_input(
"Date",
datetime.now() - timedelta(days=180 * (num_timepoints - 1 - i)),
key=f"date_{i}"
)
size = st.number_input(
"Size (mm)",
min_value=1.0,
max_value=50.0,
value=6.0 + i * 0.5,
step=0.1,
key=f"size_{i}"
)
measurements.append((date, size))
clinical_context = st.text_area(
"Clrender_analysis_results function · python · L834-L892 (59 LOC)src/app/streamlit_app.py
def render_analysis_results(report: 'AnalysisReport', settings: dict):
"""Render analysis results."""
st.markdown("---")
st.markdown("## 📋 Analysis Results")
# Status indicator
if report.abnormalities:
st.markdown(
'<span class="status-abnormal">⚠️ ABNORMALITIES DETECTED</span>',
unsafe_allow_html=True
)
else:
st.markdown(
'<span class="status-normal">✓ NO ACUTE ABNORMALITY</span>',
unsafe_allow_html=True
)
col1, col2 = st.columns([2, 1])
with col1:
# Findings
st.markdown("### Findings")
for i, finding in enumerate(report.findings, 1):
st.markdown(f'<div class="finding-item">{i}. {finding}</div>', unsafe_allow_html=True)
# Impression
st.markdown("### Impression")
st.info(report.impression)
with col2:
# Metrics
st.markdown("### Metrics")
if settings.get("show_confidence", True):
render_ct_series_upload function · python · L895-L1062 (168 LOC)src/app/streamlit_app.py
def render_ct_series_upload(settings: dict):
"""Render CT scan series upload interface for longitudinal analysis."""
st.markdown("## 📂 CT Scan Series Upload")
st.markdown("""
**Upload sequential CT scans** for automated nodule detection and longitudinal analysis.
RadAssist Pro will detect nodules, measure them, and track changes over time.
""")
# Initialize pipeline
initialize_image_pipeline()
# Upload section
st.markdown("### 📤 Upload CT Scans")
col1, col2 = st.columns([2, 1])
with col1:
# Multiple scan upload
num_scans = st.slider("Number of scans to upload", 2, 6, 4, key="num_ct_scans")
scans_data = []
for i in range(num_scans):
with st.expander(f"📁 Scan {i+1}", expanded=(i == 0)):
scan_date = st.date_input(
f"Scan Date",
datetime.now() - timedelta(days=180 * (num_scans - 1 - i)),
key=f"ct_date_{i}"
Powered by Repobility — scan your code at https://repobility.com
render_pipeline_results function · python · L1065-L1121 (57 LOC)src/app/streamlit_app.py
def render_pipeline_results(result: 'PipelineResult', settings: dict):
"""Render results from the image analysis pipeline."""
st.markdown("---")
st.success("✅ Analysis Complete")
# Detection summary
st.markdown("### 🔍 Detection Summary")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Scans Processed", result.scans_processed)
with col2:
nodule_count = len(result.detection_results[0].nodules) if result.detection_results else 0
st.metric("Nodules Detected", nodule_count)
with col3:
st.metric("Longitudinal Data", "Yes" if result.has_longitudinal else "No")
# Detection details
if result.detection_results:
st.markdown("### 📋 Detection Details")
for i, detection in enumerate(result.detection_results):
with st.expander(f"Scan {i+1}: {detection.scan_metadata.scan_date.strftime('%Y-%m-%d')}"):
st.write(f"**Modality:** {detection.scan_metadata.modality}")
render_longitudinal_report function · python · L1124-L1229 (106 LOC)src/app/streamlit_app.py
def render_longitudinal_report(report, settings: dict):
"""Render a longitudinal report (shared between modes)."""
analysis = report.analysis
st.markdown("### 📈 Longitudinal Change Analysis")
# Timeline
st.markdown("#### 📅 Measurement Timeline")
timeline_html = ""
for m in report.measurements:
date_str = m.date.strftime("%Y-%m-%d")
timeline_html += f'''
<div class="timeline-item">
<span class="timeline-date">{date_str}</span>
<span class="timeline-value">{m.size_mm}mm {m.nodule_type} nodule</span>
</div>
'''
st.markdown(timeline_html, unsafe_allow_html=True)
# Key metrics
col1, col2, col3 = st.columns(3)
with col1:
st.metric(
"Size Change",
f"+{analysis.size_change_mm:.1f}mm",
f"{analysis.size_change_percent:.1f}%"
)
with col2:
vdt_display = f"{analysis.volume_doubling_time_days:.0f} days" if analysis.volume_dourender_demo_results function · python · L1232-L1265 (34 LOC)src/app/streamlit_app.py
def render_demo_results(settings: dict):
"""Render demo results when pipeline not available."""
st.markdown("---")
st.markdown("## 📋 Analysis Results (Demo Mode)")
st.markdown(
'<span class="status-normal">✓ NO ACUTE ABNORMALITY</span>',
unsafe_allow_html=True
)
findings = [
"Heart size is normal",
"Lungs are clear bilaterally",
"No pleural effusion",
"No pneumothorax",
"Mediastinal contours are unremarkable",
"Osseous structures are intact"
]
col1, col2 = st.columns([2, 1])
with col1:
st.markdown("### Findings")
for i, finding in enumerate(findings, 1):
st.markdown(f'<div class="finding-item">{i}. {finding}</div>', unsafe_allow_html=True)
st.markdown("### Impression")
st.info("No acute cardiopulmonary abnormality")
with col2:
st.markdown("### Metrics")
st.markdown('**Confidence:** <span class="confidence-high">87%</spmain function · python · L1272-L1298 (27 LOC)src/app/streamlit_app.py
def main():
    """Application entry point: header, sidebar, selected page, footer."""
    render_header()

    # The sidebar returns the settings dict, including the chosen mode.
    settings = render_sidebar()

    # Ordered (mode label, renderer) pairs; the first label equal to the
    # selected mode is rendered, and an unknown mode renders nothing.
    routes = (
        ("2D Image Analysis", render_2d_analysis),
        ("3D Volume Analysis", render_3d_analysis),
        ("Longitudinal Comparison", render_longitudinal_analysis),
        ("CT Scan Series Upload", render_ct_series_upload),
    )
    for mode_label, render_page in routes:
        if settings["mode"] == mode_label:
            render_page(settings)
            break

    # Footer
    st.markdown("---")
    footer_markup = """
<div style="text-align: center; color: #718096; font-size: 0.8rem;">
<p>RadAssist Pro • Med-Gemma Impact Challenge 2026</p>
<p>Powered by Google MedGemma 1.5 • Built with Streamlit</p>
<p>⚠️ FOR RESEARCH PURPOSES ONLY - NOT FOR CLINICAL USE</p>
</div>
"""
    st.markdown(footer_markup, unsafe_allow_html=True)
def generate_summary(self) -> Dict[str, Any]:
"""Generate a summary dictionary of the analysis."""
summary = {
"scans_analyzed": self.scans_processed,
"nodules_found": self.nodule_count,
"has_longitudinal_data": self.has_longitudinal,
"requires_action": self.requires_action,
}
if self.has_longitudinal and self.longitudinal_report:
analysis = self.longitudinal_report.analysis
summary.update({
"risk_level": analysis.risk_level.value,
"trajectory": analysis.trajectory.value,
"size_change_percent": f"{analysis.size_change_percent:.1f}%",
"volume_doubling_time": (
f"{analysis.volume_doubling_time_days:.0f} days"
if analysis.volume_doubling_time_days else "N/A"
),
"lung_rads": (
analysis.lung_rads_current.value
ImageAnalysisPipeline.__init__ method · python · L144-L156 (13 LOC)src/core/image_analysis_pipeline.py
def __init__(self, config: Optional[AnalysisPipelineConfig] = None, model=None):
    """
    Set up the pipeline's configuration, model reference and detector.

    Args:
        config: Pipeline configuration; a default ``AnalysisPipelineConfig``
            is created when this is falsy.
        model: Optional model instance, stored and passed to
            ``NoduleDetector``.
    """
    self._model = model
    self.detector = NoduleDetector(model=model)
    self.config = config if config else AnalysisPipelineConfig()

    logger.info(f"ImageAnalysisPipeline initialized (mock_mode={self.config.mock_mode})")
def analyze_single(
self,
image_path: Union[str, Path],
scan_date: Optional[datetime] = None,
clinical_context: Optional[str] = None
) -> PipelineResult:
"""
Analyze a single scan for nodules.
Args:
image_path: Path to image/DICOM file
scan_date: Date of scan
clinical_context: Clinical history
Returns:
PipelineResult with detection findings
"""
import time
start_time = time.time()
# Run detection
detection_result = self.detector.detect(
image_path,
scan_date=scan_date,
clinical_context=clinical_context
)
processing_time = (time.time() - start_time) * 1000
return PipelineResult(
detection_results=[detection_result],
nodule_count=detection_result.nodule_count,
scans_processed=1,
processing_time_ms=processing_time,
ImageAnalysisPipeline.analyze_longitudinal method · python · L196-L276 (81 LOC)src/core/image_analysis_pipeline.py
def analyze_longitudinal(
self,
scans: List[Tuple[Union[str, Path], datetime]],
clinical_context: Optional[str] = None,
nodule_location: Optional[str] = None
) -> PipelineResult:
"""
Perform longitudinal analysis across multiple scans.
This is the PRIMARY FEATURE of RadAssist Pro.
Args:
scans: List of (image_path, scan_date) tuples
clinical_context: Clinical history
nodule_location: Optional specific nodule to track
Returns:
PipelineResult with longitudinal analysis
"""
import time
start_time = time.time()
warnings = []
errors = []
# Sort scans by date
scans_sorted = sorted(scans, key=lambda x: x[1])
# Detect nodules in each scan
detection_results = []
for image_path, scan_date in scans_sorted:
result = self.detector.detect(
image_path,
Repobility — same analyzer, your code, free for public repos · /scan/
ImageAnalysisPipeline.analyze_with_manual_measurements method · python · L278-L335 (58 LOC)src/core/image_analysis_pipeline.py
def analyze_with_manual_measurements(
self,
measurements: List[Dict[str, Any]],
clinical_context: Optional[str] = None
) -> PipelineResult:
"""
Run longitudinal analysis with manually entered measurements.
This is useful when nodule measurements are already known
(e.g., from prior reports or manual measurement).
Args:
measurements: List of dicts with 'date', 'size_mm', 'location'
clinical_context: Clinical history
Returns:
PipelineResult with longitudinal analysis
"""
import time
start_time = time.time()
# Convert to NoduleMeasurement objects
nodule_measurements = []
for m in measurements:
date = m.get('date')
if isinstance(date, str):
date = datetime.fromisoformat(date)
nodule_measurements.append(NoduleMeasurement(
date=date,
size_mm=m.ImageAnalysisPipeline._extract_measurements_for_tracking method · python · L337-L378 (42 LOC)src/core/image_analysis_pipeline.py
def _extract_measurements_for_tracking(
self,
detection_results: List[DetectionResult],
target_location: Optional[str] = None
) -> List[NoduleMeasurement]:
"""
Extract nodule measurements for longitudinal tracking.
Matches nodules across scans by location.
"""
measurements = []
for detection in detection_results:
if not detection.has_nodules:
continue
# Find nodule to track
if target_location:
# Look for specific location
matching_nodules = [
n for n in detection.nodules
if target_location.lower() in n.location.lower()
]
if matching_nodules:
nodule = max(matching_nodules, key=lambda n: n.size_mm)
else:
continue
else:
# Track largest nodule
nodule = deteImageAnalysisPipeline.generate_summary method · python · L380-L418 (39 LOC)src/core/image_analysis_pipeline.py
def generate_summary(self, result: PipelineResult) -> Dict[str, Any]:
"""
Generate a human-readable summary of the analysis.
Args:
result: PipelineResult from analysis
Returns:
Dictionary with summary information
"""
summary = {
"scans_analyzed": result.scans_processed,
"nodules_found": result.nodule_count,
"has_longitudinal_data": result.has_longitudinal,
"requires_action": result.requires_action,
}
if result.has_longitudinal:
analysis = result.longitudinal_report.analysis
summary.update({
"risk_level": analysis.risk_level.value,
"trajectory": analysis.trajectory.value,
"size_change_percent": f"{analysis.size_change_percent:.1f}%",
"volume_doubling_time": (
f"{analysis.volume_doubling_time_days:.0f} days"
if analysis.vdemo_pipeline function · python · L421-L473 (53 LOC)src/core/image_analysis_pipeline.py
def demo_pipeline():
"""Demo the full analysis pipeline."""
print("=" * 70)
print("RadAssist Pro - Image Analysis Pipeline Demo")
print("\"AI That Remembers\"")
print("=" * 70)
print()
# Create pipeline
pipeline = ImageAnalysisPipeline()
# Demo with manual measurements (simulating detected nodules)
measurements = [
{"date": datetime(2024, 1, 15), "size_mm": 6.0, "location": "right upper lobe"},
{"date": datetime(2024, 7, 20), "size_mm": 6.2, "location": "right upper lobe"},
{"date": datetime(2025, 1, 18), "size_mm": 6.8, "location": "right upper lobe"},
{"date": datetime(2025, 7, 15), "size_mm": 8.3, "location": "right upper lobe"},
]
result = pipeline.analyze_with_manual_measurements(
measurements,
clinical_context="58-year-old female, former smoker (30 pack-years)"
)
# Print summary
summary = pipeline.generate_summary(result)
print("ANALYSIS SUMMARY")
print("-" * 4calculate_volume_doubling_time function · python · L152-L189 (38 LOC)src/core/longitudinal_analyzer.py
def calculate_volume_doubling_time(
measurement1: NoduleMeasurement,
measurement2: NoduleMeasurement
) -> Optional[float]:
"""
Calculate volume doubling time (VDT) between two measurements.
VDT = (days_between * ln(2)) / ln(V2/V1)
Args:
measurement1: Earlier measurement
measurement2: Later measurement
Returns:
VDT in days, or None if calculation not possible
"""
v1 = measurement1.volume_calculated
v2 = measurement2.volume_calculated
# Ensure measurements are in correct order
if measurement2.date < measurement1.date:
measurement1, measurement2 = measurement2, measurement1
v1, v2 = v2, v1
days_between = (measurement2.date - measurement1.date).days
if days_between <= 0:
return None
if v2 <= v1:
# Nodule shrinking or stable - VDT not applicable
return None
try:
vdt = (days_between * math.log(2)) / math.log(v2 / v1)
return vdt
except (Vaclassify_lung_rads function · python · L192-L243 (52 LOC)src/core/longitudinal_analyzer.py
def classify_lung_rads(
size_mm: float,
nodule_type: str = "solid",
growth_detected: bool = False,
vdt_days: Optional[float] = None
) -> LungRADSCategory:
"""
Classify nodule according to Lung-RADS guidelines.
Simplified implementation based on ACR Lung-RADS v1.1.
Args:
size_mm: Nodule size in mm
nodule_type: solid, ground-glass, or part-solid
growth_detected: Whether growth was detected
vdt_days: Volume doubling time if calculated
Returns:
Lung-RADS category
"""
if nodule_type == "solid":
if size_mm < 6:
return LungRADSCategory.CATEGORY_2
elif size_mm < 8:
if growth_detected:
return LungRADSCategory.CATEGORY_4A
return LungRADSCategory.CATEGORY_3
elif size_mm < 15:
if growth_detected and vdt_days and vdt_days < VDT_HIGH_RISK:
return LungRADSCategory.CATEGORY_4B
return LungRADSCategory.Cassess_risk_level function · python · L246-L283 (38 LOC)src/core/longitudinal_analyzer.py
def assess_risk_level(
vdt_days: Optional[float],
lung_rads: LungRADSCategory,
trajectory: ChangeTrajectory
) -> RiskLevel:
"""
Assess overall risk level based on multiple factors.
Args:
vdt_days: Volume doubling time
lung_rads: Current Lung-RADS category
trajectory: Change trajectory
Returns:
Risk level assessment
"""
# VDT-based risk
if vdt_days:
if vdt_days < 200:
return RiskLevel.VERY_HIGH
elif vdt_days < VDT_HIGH_RISK:
return RiskLevel.HIGH
elif vdt_days < VDT_INTERMEDIATE:
return RiskLevel.INTERMEDIATE
# Lung-RADS based risk
if lung_rads == LungRADSCategory.CATEGORY_4B:
return RiskLevel.VERY_HIGH
elif lung_rads == LungRADSCategory.CATEGORY_4A:
return RiskLevel.HIGH
elif lung_rads == LungRADSCategory.CATEGORY_3:
return RiskLevel.INTERMEDIATE
# Trajectory based
if trajectory == ChangeTrajectory.WORSENdetermine_trajectory function · python · L286-L308 (23 LOC)src/core/longitudinal_analyzer.py
def determine_trajectory(
    size_change_percent: float,
    volume_change_percent: float
) -> ChangeTrajectory:
    """
    Classify the change trajectory from the percentage changes.

    Only the volume change drives the decision; the linear size change is
    accepted but not consulted.

    Args:
        size_change_percent: Percentage change in linear size
        volume_change_percent: Percentage change in volume

    Returns:
        WORSENING above +25, IMPROVING below -25, STABLE when the magnitude
        is at most 25, and INDETERMINATE when none of those comparisons
        holds.
    """
    # Use volume change as primary (more sensitive)
    if volume_change_percent > 25:
        return ChangeTrajectory.WORSENING
    if volume_change_percent < -25:
        return ChangeTrajectory.IMPROVING
    if not abs(volume_change_percent) <= 25:
        # Reached only when every comparison above is false.
        return ChangeTrajectory.INDETERMINATE
    return ChangeTrajectory.STABLE
generate_change_summary function · python · L315-L354 (40 LOC)src/core/longitudinal_analyzer.py
def generate_change_summary(
prior: NoduleMeasurement,
current: NoduleMeasurement,
analysis: ChangeAnalysis
) -> str:
"""
Generate natural language change summary.
This is a key innovation - converting measurements to clinical language.
"""
location = current.location or "the identified nodule"
if analysis.trajectory == ChangeTrajectory.STABLE:
return (
f"The {current.nodule_type} nodule in {location} remains stable, "
f"measuring {current.size_mm}mm (previously {prior.size_mm}mm, "
f"{analysis.size_change_percent:+.1f}% change over {analysis.days_between} days)."
)
elif analysis.trajectory == ChangeTrajectory.IMPROVING:
return (
f"The {current.nodule_type} nodule in {location} has decreased in size, "
f"now measuring {current.size_mm}mm (previously {prior.size_mm}mm, "
f"{abs(analysis.size_change_percent):.1f}% reduction over {analysis.days_betweengenerate_clinical_interpretation function · python · L357-L406 (50 LOC)src/core/longitudinal_analyzer.py
def generate_clinical_interpretation(analysis: ChangeAnalysis) -> str:
"""Generate clinical interpretation of findings."""
interpretations = []
# VDT interpretation
if analysis.volume_doubling_time_days:
vdt = analysis.volume_doubling_time_days
if vdt < 200:
interpretations.append(
f"Volume doubling time of {vdt:.0f} days is concerning for "
"rapid growth, highly suggestive of malignancy."
)
elif vdt < VDT_HIGH_RISK:
interpretations.append(
f"Volume doubling time of {vdt:.0f} days (<400 days) "
"raises concern for malignancy."
)
elif vdt < VDT_INTERMEDIATE:
interpretations.append(
f"Volume doubling time of {vdt:.0f} days suggests "
"intermediate growth rate."
)
else:
interpretations.append(
f"Volume doubling time of {vdt:.0f} days suggegenerate_recommendations function · python · L409-L441 (33 LOC)src/core/longitudinal_analyzer.py
def generate_recommendations(analysis: ChangeAnalysis) -> List[str]:
"""Generate clinical recommendations based on analysis."""
recommendations = []
if analysis.risk_level == RiskLevel.VERY_HIGH:
recommendations.append("Urgent tissue sampling or PET-CT recommended.")
recommendations.append("Consider multidisciplinary tumor board discussion.")
elif analysis.risk_level == RiskLevel.HIGH:
recommendations.append("Consider PET-CT for further characterization.")
recommendations.append("If PET positive, tissue sampling recommended.")
recommendations.append("Short-interval follow-up CT if PET not performed (3 months).")
elif analysis.risk_level == RiskLevel.INTERMEDIATE:
if analysis.trajectory == ChangeTrajectory.STABLE:
recommendations.append("Continue surveillance with follow-up CT in 6 months.")
else:
recommendations.append("Consider follow-up CT in 3 months to assess trajectory.")
egenerate_differential_evolution function · python · L444-L509 (66 LOC)src/core/longitudinal_analyzer.py
def generate_differential_evolution(
analysis: ChangeAnalysis
) -> List[DifferentialDiagnosis]:
"""
Generate differential diagnosis evolution.
This is the "judge-wowing" feature - showing how differentials
should change based on the observed interval changes.
"""
differentials = []
if analysis.trajectory == ChangeTrajectory.WORSENING:
differentials.append(DifferentialDiagnosis(
diagnosis="Primary lung malignancy",
prior_probability="moderate",
current_probability="high",
change_rationale="Interval growth with VDT consistent with malignancy"
))
differentials.append(DifferentialDiagnosis(
diagnosis="Inflammatory/infectious",
prior_probability="moderate",
current_probability="low",
change_rationale="Would expect stability or resolution if infectious"
))
differentials.append(DifferentialDiagnosis(
diagnosis="Slgenerate_comparison_paragraph function · python · L512-L560 (49 LOC)src/core/longitudinal_analyzer.py
def generate_comparison_paragraph(
prior: NoduleMeasurement,
current: NoduleMeasurement,
analysis: ChangeAnalysis
) -> str:
"""
Generate draft comparison paragraph for radiology report.
This saves radiologist time by providing ready-to-use language.
"""
prior_date = prior.date.strftime("%Y-%m-%d")
if analysis.trajectory == ChangeTrajectory.STABLE:
return (
f"COMPARISON: CT Chest dated {prior_date}\n\n"
f"The previously identified {prior.size_mm}mm {prior.nodule_type} nodule "
f"in the {current.location} remains stable, now measuring {current.size_mm}mm. "
f"No significant interval change ({analysis.size_change_percent:+.1f}% over "
f"{analysis.days_between} days). Continued surveillance recommended per "
f"Lung-RADS {analysis.lung_rads_current.value if analysis.lung_rads_current else 'guidelines'}."
)
elif analysis.trajectory == ChangeTrajectory.WORSENING:
generate_patient_summary function · python · L563-L596 (34 LOC)src/core/longitudinal_analyzer.py
def generate_patient_summary(
analysis: ChangeAnalysis,
current: NoduleMeasurement
) -> str:
"""
Generate patient-friendly summary of findings.
This supports patient engagement and understanding.
"""
if analysis.trajectory == ChangeTrajectory.STABLE:
return (
f"Your lung scan shows a small spot ({current.size_mm}mm) that has not changed "
f"since your last scan. This is reassuring, as it suggests the spot is unlikely "
f"to be harmful. Your doctor recommends continuing to monitor it with regular scans."
)
elif analysis.trajectory == ChangeTrajectory.WORSENING:
return (
f"Your lung scan shows a spot that has grown since your last scan "
f"(from {current.size_mm - analysis.size_change_mm:.1f}mm to {current.size_mm}mm). "
f"While this doesn't definitely mean it's cancer, your doctor will likely recommend "
f"additional tests to learn more about it. analyze_longitudinal_change function · python · L603-L685 (83 LOC)src/core/longitudinal_analyzer.py
def analyze_longitudinal_change(
prior_measurement: NoduleMeasurement,
current_measurement: NoduleMeasurement,
clinical_context: Optional[str] = None
) -> ChangeAnalysis:
"""
Perform comprehensive longitudinal change analysis.
This is the core function of RadAssist Pro.
Args:
prior_measurement: Earlier nodule measurement
current_measurement: Later nodule measurement
clinical_context: Optional clinical context
Returns:
Complete change analysis with clinical decision support
"""
# Ensure correct ordering
if current_measurement.date < prior_measurement.date:
prior_measurement, current_measurement = current_measurement, prior_measurement
# Calculate basic metrics
size_change_mm = current_measurement.size_mm - prior_measurement.size_mm
size_change_percent = (size_change_mm / prior_measurement.size_mm) * 100 if prior_measurement.size_mm > 0 else 0
v_prior = prior_measurement.volume_calculcreate_longitudinal_report function · python · L688-L729 (42 LOC)src/core/longitudinal_analyzer.py
def create_longitudinal_report(
measurements: List[NoduleMeasurement],
clinical_context: str = ""
) -> LongitudinalReport:
"""
Create a complete longitudinal analysis report.
Args:
measurements: List of measurements (oldest to newest)
clinical_context: Clinical context string
Returns:
Complete longitudinal report
"""
if len(measurements) < 2:
raise ValueError("At least 2 measurements required for longitudinal analysis")
# Sort by date
measurements = sorted(measurements, key=lambda m: m.date)
# Analyze most recent change
prior = measurements[-2]
current = measurements[-1]
analysis = analyze_longitudinal_change(prior, current, clinical_context)
# Generate differentials
differentials = generate_differential_evolution(analysis)
# Build timeline summary
timeline_lines = ["TIMELINE:"]
for m in measurements:
date_str = m.date.strftime("%Y-%m-%d")
timeline_lines.appGenerated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
demo_analysis function · python · L736-L844 (109 LOC)src/core/longitudinal_analyzer.py
def demo_analysis():
"""
Demonstrate the longitudinal analysis capability.
This creates the demo scenario for the competition.
"""
print("=" * 70)
print("RadAssist Pro - Longitudinal Analysis Demo")
print("'AI That Remembers'")
print("=" * 70)
print()
# Create demo measurements (the "missed progression" scenario)
measurements = [
NoduleMeasurement(
date=datetime(2024, 1, 15),
size_mm=6.0,
location="right upper lobe",
nodule_type="solid",
morphology="spiculated"
),
NoduleMeasurement(
date=datetime(2024, 7, 20),
size_mm=6.2,
location="right upper lobe",
nodule_type="solid",
morphology="spiculated"
),
NoduleMeasurement(
date=datetime(2025, 1, 18),
size_mm=6.8,
location="right upper lobe",
nodule_type="solid",
morphology="spiculDetectedNodule.to_dict method · python · L88-L102 (15 LOC)src/core/nodule_detector.py
def to_dict(self) -> Dict[str, Any]:
    """Serialize this nodule into a plain dictionary.

    Returns:
        A dict keyed by attribute name, in the order listed below, where
        each value is the instance attribute of the same name passed
        through unchanged.
    """
    # Every exported key is identical to the attribute it is read from,
    # so a single ordered tuple of names drives the whole mapping.
    exported_fields = (
        "nodule_id",
        "location",
        "nodule_type",
        "size_mm",
        "size_perpendicular_mm",
        "volume_mm3",
        "confidence",
        "detection_method",
        "morphology",
        "calcification",
        "cavitation",
    )
    return {field_name: getattr(self, field_name) for field_name in exported_fields}
# ScanMetadata.to_dict method · python · L116-L123 (8 LOC)src/core/nodule_detector.py
def to_dict(self) -> Dict[str, Any]:
    """Serialize the scan metadata into a plain dictionary.

    Returns:
        A dict with the keys ``scan_date``, ``modality``,
        ``series_description`` and ``slice_thickness_mm``. ``scan_date``
        holds the result of calling ``isoformat()`` on ``self.scan_date``;
        the other values are the instance attributes passed through
        unchanged.
    """
    payload: Dict[str, Any] = {}
    # The date is converted to its ISO 8601 string form; everything else
    # is copied as-is.
    payload["scan_date"] = self.scan_date.isoformat()
    payload["modality"] = self.modality
    payload["series_description"] = self.series_description
    payload["slice_thickness_mm"] = self.slice_thickness_mm
    return payload
# page 1 / 5next ›