← back to jeremydpotts__med-gemma-hackathon

Function bodies 229 total

All specs Real LLM only Function bodies
ThinkTankHandler.end_headers method · python · L36-L41 (6 LOC)
brainstorming/think-tank-server.py
    def end_headers(self):
        """Send the CORS headers, then finish the header section.

        Three ``Access-Control-Allow-*`` headers are written before control
        is handed to the parent class's ``end_headers``.
        """
        cors_headers = (
            ('Access-Control-Allow-Origin', '*'),
            ('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'),
            ('Access-Control-Allow-Headers', 'Content-Type'),
        )
        for header_name, header_value in cors_headers:
            self.send_header(header_name, header_value)
        super().end_headers()
ThinkTankHandler.do_GET method · python · L48-L62 (15 LOC)
brainstorming/think-tank-server.py
    def do_GET(self):
        """Route a GET request to an API endpoint or to static file serving.

        ``/api/session`` is delegated to ``send_session_content`` (the first
        ``file`` query value selects the session, defaulting to
        ``SESSION_FILE``); ``/api/sessions`` is delegated to
        ``list_sessions``; any other path falls through to the parent class's
        ``do_GET``.
        """
        url = urlparse(self.path)
        params = parse_qs(url.query)
        route = url.path

        # API endpoint to get session content
        if route == '/api/session':
            # parse_qs maps each key to a list of values; only the first is used.
            requested = params.get('file', [SESSION_FILE])
            self.send_session_content(requested[0])
            return

        # API endpoint to list all sessions
        if route == '/api/sessions':
            self.list_sessions()
            return

        # Serve static files (HTML, JS, etc.)
        super().do_GET()
ThinkTankHandler.do_POST method · python · L64-L72 (9 LOC)
brainstorming/think-tank-server.py
    def do_POST(self):
        """Route a POST request; only ``/api/message`` is served.

        A POST to ``/api/message`` is handed to ``append_message``; every
        other path is answered with a 404 error.
        """
        route = urlparse(self.path).path
        if route != '/api/message':
            self.send_error(404, 'Endpoint not found')
            return

        # API endpoint to append message
        self.append_message()
ThinkTankHandler.json_to_markdown method · python · L74-L205 (132 LOC)
brainstorming/think-tank-server.py
    def json_to_markdown(self, data):
        """Convert a JSON think tank session to readable markdown format."""
        md_lines = []
        
        # Session metadata
        if 'session_metadata' in data:
            sm = data['session_metadata']
            md_lines.append(f"# {sm.get('title', 'Think Tank Session')}")
            md_lines.append("")
            if 'date' in sm:
                md_lines.append(f"**Date:** {sm['date']}")
            if 'participant' in sm:
                md_lines.append(f"**Participant:** {sm['participant']}")
            if 'format' in sm:
                md_lines.append(f"**Format:** {sm['format']}")
            if 'duration' in sm:
                md_lines.append(f"**Duration:** {sm['duration']}")
            if 'outcome' in sm:
                md_lines.append(f"**Outcome:** {sm['outcome']}")
            md_lines.append("")
            md_lines.append("---")
            md_lines.append("")
        
        # Expert panel
        if 'expert_pa
ThinkTankHandler.send_session_content method · python · L207-L245 (39 LOC)
brainstorming/think-tank-server.py
    def send_session_content(self, session_file=None):
        """Send the current session file content."""
        try:
            file_path = session_file or SESSION_FILE

            # Security: Ensure file is within allowed directories
            file_path = os.path.normpath(file_path)
            allowed_prefixes = ['docs/', 'project-management/']
            is_allowed = any(file_path.startswith(prefix) for prefix in allowed_prefixes)
            
            if not is_allowed:
                # Try to find it in docs first
                potential_path = f'docs/{os.path.basename(file_path)}'
                if os.path.exists(potential_path):
                    file_path = potential_path
                else:
                    self.send_error(403, f'Access denied: {file_path}')
                    return

            if os.path.exists(file_path):
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                
       
ThinkTankHandler.extract_session_metadata method · python · L247-L332 (86 LOC)
brainstorming/think-tank-server.py
    def extract_session_metadata(self, file_path, file_format):
        """Extract metadata from a session file."""
        filename = os.path.basename(file_path)
        metadata = {
            'file': file_path,
            'filename': filename,
            'format': file_format,
            'date': 'Unknown',
            'display_name': filename.replace('_', ' ').replace('-', ' ').replace('.md', '').replace('.json', ''),
            'title': None,
            'session_type': 'General'
        }
        
        try:
            # Try to extract date from filename
            date_match = re.search(r'(\d{4})-(\d{2})-(\d{2})', filename)
            if date_match:
                year, month, day = date_match.groups()
                metadata['date'] = f"{year}-{month}-{day}"
                metadata['display_name'] = f"Think Tank Session - {month}/{day}/{year}"
            
            # Read file to extract more metadata
            with open(file_path, 'r', encoding='utf-8') as f:
ThinkTankHandler.list_sessions method · python · L334-L379 (46 LOC)
brainstorming/think-tank-server.py
    def list_sessions(self):
        """List all available Think Tank session files from all sources."""
        try:
            sessions = []
            seen_files = set()  # Avoid duplicates
            
            # Scan all session sources
            for source in SESSION_SOURCES:
                source_dir = source['dir']
                pattern = source['pattern']
                file_format = source['format']
                
                if not os.path.exists(source_dir):
                    continue
                
                full_pattern = os.path.join(source_dir, pattern)
                found_files = glob.glob(full_pattern)
                
                for file_path in found_files:
                    # Skip if already processed
                    abs_path = os.path.abspath(file_path)
                    if abs_path in seen_files:
                        continue
                    seen_files.add(abs_path)
                    
                    # Extrac
Repobility · code-quality intelligence · https://repobility.com
ThinkTankHandler.append_message method · python · L381-L419 (39 LOC)
brainstorming/think-tank-server.py
    def append_message(self):
        """Append a new message to the session file."""
        try:
            # Read POST data
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length)
            data = json.loads(post_data.decode('utf-8'))

            message = data.get('message', '').strip()
            speaker = data.get('speaker', 'Jeremy Potts')
            role = data.get('role', 'CEO')

            if not message:
                self.send_error(400, 'Message is required')
                return

            # Format the message
            timestamp = datetime.now().strftime('%B %d, %Y at %I:%M %p')
            formatted_message = f"\n\n### {speaker} ({role})\n*{timestamp}*\n\n{message}\n"

            # Append to file
            with open(SESSION_FILE, 'a', encoding='utf-8') as f:
                f.write(formatted_message)

            # Send success response
            self.send_response(200)
            self
run_server function · python · L426-L474 (49 LOC)
brainstorming/think-tank-server.py
def run_server(port=8000):
    """Start the Think Tank session server."""
    server_address = ('', port)
    httpd = HTTPServer(server_address, ThinkTankHandler)

    # Count available sessions
    session_count = 0
    for source in SESSION_SOURCES:
        if os.path.exists(source['dir']):
            pattern = os.path.join(source['dir'], source['pattern'])
            session_count += len(glob.glob(pattern))

    print(f"""
╔══════════════════════════════════════════════════════════════╗
║          KNOVA Quest Think Tank Session Server               ║
║            Session History Browser Enabled                   ║
╚══════════════════════════════════════════════════════════════╝

🚀 Server running at: http://localhost:{port}
📝 Active session: {SESSION_FILE}
📚 Sessions available: {session_count} (MD + JSON formats)
🌐 Open viewer: http://localhost:{port}/think-tank-viewer.html

Session sources:
  - docs/THINK_TANK_SESSION_*.md
  - project-management/think-tank-sessions/*.json
  - proj
get_confidence_class function · python · L296-L303 (8 LOC)
src/app/streamlit_app.py
def get_confidence_class(confidence: float) -> str:
    """Return the CSS class name for a confidence score.

    Scores of at least 0.8 give "confidence-high", scores of at least 0.6
    give "confidence-medium", and anything else gives "confidence-low".
    """
    # Tiers are checked from highest threshold to lowest; first match wins.
    tiers = (
        (0.8, "confidence-high"),
        (0.6, "confidence-medium"),
    )
    for threshold, css_class in tiers:
        if confidence >= threshold:
            return css_class
    return "confidence-low"
format_findings_html function · python · L306-L311 (6 LOC)
src/app/streamlit_app.py
def format_findings_html(findings: list) -> str:
    """Render findings as a run of ``finding-item`` ``<div>`` elements.

    Args:
        findings: Items to render, in order. Each item is formatted with an
            f-string, so non-string items are rendered via their ``str`` form.

    Returns:
        The concatenated ``<div class="finding-item">…</div>`` markup, or an
        empty string when ``findings`` is empty.
    """
    # NOTE(review): each finding is interpolated verbatim (no HTML escaping);
    # confirm callers never pass untrusted text.
    # A single join avoids rebuilding the string on every iteration.
    return "".join(
        f'<div class="finding-item">{finding}</div>' for finding in findings
    )
initialize_pipeline function · python · L314-L323 (10 LOC)
src/app/streamlit_app.py
def initialize_pipeline():
    """Populate ``st.session_state.pipeline`` if it is still ``None``.

    When ``MODELS_AVAILABLE`` is true, an ``InferencePipeline`` wrapping a
    ``MockMedGemmaModel`` is stored; otherwise a warning is shown and the
    slot stays ``None``. Nothing happens if a pipeline is already stored.
    """
    state = st.session_state
    if state.pipeline is not None:
        return

    if not MODELS_AVAILABLE:
        st.warning("Model modules not available. Using demo mode.")
        state.pipeline = None
        return

    # Use mock model for demo (replace with real model in production)
    state.pipeline = InferencePipeline(model=MockMedGemmaModel())
initialize_image_pipeline function · python · L326-L334 (9 LOC)
src/app/streamlit_app.py
def initialize_image_pipeline():
    """Populate ``st.session_state.image_pipeline`` if it is still ``None``.

    When ``PIPELINE_AVAILABLE`` is true, an ``ImageAnalysisPipeline`` built
    from ``AnalysisPipelineConfig(mock_mode=True)`` is stored; otherwise a
    warning is shown and the slot stays ``None``.
    """
    state = st.session_state
    if state.image_pipeline is not None:
        return

    if not PIPELINE_AVAILABLE:
        st.warning("Image pipeline not available. Using demo mode.")
        state.image_pipeline = None
        return

    mock_config = AnalysisPipelineConfig(mock_mode=True)
    state.image_pipeline = ImageAnalysisPipeline(mock_config)
render_sidebar function · python · L341-L391 (51 LOC)
src/app/streamlit_app.py
def render_sidebar():
    """Render the sidebar."""
    with st.sidebar:
        st.image("https://via.placeholder.com/200x80?text=RadAssist+Pro", width=200)

        st.markdown("---")

        st.markdown("### Analysis Mode")
        analysis_mode = st.radio(
            "Select analysis type:",
            ["2D Image Analysis", "3D Volume Analysis", "Longitudinal Comparison", "CT Scan Series Upload"],
            help="Choose the type of analysis to perform"
        )

        st.markdown("---")

        st.markdown("### Settings")

        show_confidence = st.checkbox("Show confidence scores", value=True)
        show_processing_time = st.checkbox("Show processing time", value=True)
        generate_report = st.checkbox("Generate text report", value=True)

        st.markdown("---")

        st.markdown("### Model Info")
        st.info("""
        **Model:** MedGemma 1.5 4B
        **Provider:** Google
        **Unique Capabilities:**
        - 3D volumetric analysis
        - Lo
render_header function · python · L398-L417 (20 LOC)
src/app/streamlit_app.py
def render_header():
    """Write the page title, tagline, sub-header and disclaimer box.

    Each fragment is raw HTML passed to ``st.markdown`` with
    ``unsafe_allow_html=True``, in the order title, tagline, sub-header,
    disclaimer.
    """
    title_html = '<h1 class="main-header">🏥 RadAssist Pro</h1>'
    tagline_html = '<p class="tagline">"AI That Remembers"</p>'
    sub_header_html = '<p class="sub-header">Longitudinal Change Detection with Clinical Decision Support • Powered by MedGemma 1.5</p>'

    # Disclaimer
    disclaimer_html = """
    <div class="disclaimer-box">
        <h4>⚠️ Research Prototype - Not for Clinical Use</h4>
        <p>
            This is a demonstration system for the Med-Gemma Impact Challenge.
            All findings must be verified by a qualified radiologist.
            Not FDA-cleared. For research and educational purposes only.
        </p>
    </div>
    """

    for fragment in (title_html, tagline_html, sub_header_html, disclaimer_html):
        st.markdown(fragment, unsafe_allow_html=True)
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
render_2d_analysis function · python · L420-L485 (66 LOC)
src/app/streamlit_app.py
def render_2d_analysis(settings: dict):
    """Render 2D image analysis interface."""
    st.markdown("## 📷 2D Medical Image Analysis")
    st.markdown("Upload a chest X-ray or other 2D medical image for AI analysis.")

    col1, col2 = st.columns([1, 1])

    with col1:
        st.markdown("### Upload Image")

        uploaded_file = st.file_uploader(
            "Choose a medical image",
            type=['png', 'jpg', 'jpeg', 'dcm'],
            help="Supported formats: PNG, JPEG, DICOM"
        )

        clinical_context = st.text_area(
            "Clinical Context (optional)",
            placeholder="Enter relevant clinical history...",
            help="Adding clinical context can improve analysis accuracy"
        )

        study_type = st.selectbox(
            "Study Type",
            ["Chest X-ray", "Abdominal X-ray", "Bone X-ray", "Other"],
            index=0
        )

        analyze_button = st.button("🔍 Analyze Image", type="primary", use_container_width=True)

   
render_3d_analysis function · python · L488-L543 (56 LOC)
src/app/streamlit_app.py
def render_3d_analysis(settings: dict):
    """Render 3D volume analysis interface."""
    st.markdown("## 🧊 3D Volumetric Analysis")
    st.markdown("""
    **MedGemma 1.5 Unique Capability**: Analyze complete CT/MRI volumes.
    This is a capability unique to MedGemma 1.5 - no other open-source model can do this.
    """)

    st.info("📁 Upload a folder of DICOM files or a single 3D volume file")

    uploaded_files = st.file_uploader(
        "Upload DICOM slices",
        type=['dcm'],
        accept_multiple_files=True,
        help="Upload multiple DICOM files from a CT/MRI scan"
    )

    if uploaded_files:
        st.success(f"Uploaded {len(uploaded_files)} DICOM files")

        col1, col2 = st.columns(2)

        with col1:
            st.markdown("### Volume Information")
            st.write(f"**Slices:** {len(uploaded_files)}")
            st.write("**Modality:** CT (estimated)")

        with col2:
            clinical_context = st.text_area(
                "Clinical Co
render_longitudinal_analysis function · python · L546-L561 (16 LOC)
src/app/streamlit_app.py
def render_longitudinal_analysis(settings: dict):
    """Show the longitudinal comparison page and dispatch to demo or custom mode.

    Args:
        settings: Settings dict; it is only forwarded to
            ``render_custom_longitudinal`` when the demo checkbox is cleared.
    """
    intro_text = """
    **The Innovation:** RadAssist Pro doesn't just detect findings - it **remembers**.
    Track disease progression with clinical decision support, Lung-RADS integration,
    and differential diagnosis evolution.
    """
    st.markdown("## 📈 Longitudinal Change Detection")
    st.markdown(intro_text)

    # Demo mode toggle (checked by default)
    if st.checkbox("🎬 Run Demo Scenario (The Missed Progression Save)", value=True):
        render_demo_longitudinal()
        return

    render_custom_longitudinal(settings)
render_demo_longitudinal function · python · L564-L586 (23 LOC)
src/app/streamlit_app.py
def render_demo_longitudinal():
    """Render the demo scenario for the competition video."""
    st.markdown("---")
    st.markdown("### 📋 Demo Scenario: The Missed Progression Save")
    st.markdown("""
    > *"A 58-year-old patient was told 'stable' four times. She had cancer."*

    This demo shows how RadAssist Pro would have caught what traditional reads missed.
    """)

    # Patient context
    with st.expander("👤 Patient Context", expanded=True):
        st.markdown("""
        - **Age:** 58 years old
        - **Gender:** Female
        - **History:** Former smoker (30 pack-years, quit 5 years ago)
        - **Finding:** Incidental 6mm lung nodule on screening CT (January 2024)
        - **Prior Reports:** "Stable nodule, continued surveillance" x4
        """)

    if st.button("🔍 Run Longitudinal Analysis", type="primary", use_container_width=True):
        with st.spinner("Analyzing 18 months of sequential scans with MedGemma..."):
            render_demo_results_longitud
render_demo_results_longitudinal function · python · L589-L768 (180 LOC)
src/app/streamlit_app.py
def render_demo_results_longitudinal():
    """Render the demo longitudinal analysis results."""

    if not LONGITUDINAL_AVAILABLE:
        st.error("Longitudinal analyzer module not available")
        return

    # Create the demo measurements
    measurements = [
        NoduleMeasurement(datetime(2024, 1, 15), 6.0, "right upper lobe", "solid"),
        NoduleMeasurement(datetime(2024, 7, 20), 6.2, "right upper lobe", "solid"),
        NoduleMeasurement(datetime(2025, 1, 18), 6.8, "right upper lobe", "solid"),
        NoduleMeasurement(datetime(2025, 7, 15), 8.3, "right upper lobe", "solid"),
    ]

    clinical_context = "58-year-old female, former smoker (30 pack-years), incidental lung nodule on screening CT"

    # Create the report
    report = create_longitudinal_report(measurements, clinical_context)
    analysis = report.analysis

    st.success("✅ Analysis Complete")

    # Timeline
    st.markdown("### 📅 Measurement Timeline")
    timeline_html = ""
    for m in measureme
render_custom_longitudinal function · python · L771-L831 (61 LOC)
src/app/streamlit_app.py
def render_custom_longitudinal(settings: dict):
    """Render custom longitudinal comparison interface."""
    st.info("Enter nodule measurements from sequential scans for analysis")

    # Input form for measurements
    st.markdown("### Enter Measurements")

    num_timepoints = st.slider("Number of timepoints", 2, 6, 4)

    measurements = []
    cols = st.columns(num_timepoints)

    for i, col in enumerate(cols):
        with col:
            st.markdown(f"**Scan {i+1}**")
            date = st.date_input(
                "Date",
                datetime.now() - timedelta(days=180 * (num_timepoints - 1 - i)),
                key=f"date_{i}"
            )
            size = st.number_input(
                "Size (mm)",
                min_value=1.0,
                max_value=50.0,
                value=6.0 + i * 0.5,
                step=0.1,
                key=f"size_{i}"
            )
            measurements.append((date, size))

    clinical_context = st.text_area(
        "Cl
render_analysis_results function · python · L834-L892 (59 LOC)
src/app/streamlit_app.py
def render_analysis_results(report: 'AnalysisReport', settings: dict):
    """Render analysis results."""
    st.markdown("---")
    st.markdown("## 📋 Analysis Results")

    # Status indicator
    if report.abnormalities:
        st.markdown(
            '<span class="status-abnormal">⚠️ ABNORMALITIES DETECTED</span>',
            unsafe_allow_html=True
        )
    else:
        st.markdown(
            '<span class="status-normal">✓ NO ACUTE ABNORMALITY</span>',
            unsafe_allow_html=True
        )

    col1, col2 = st.columns([2, 1])

    with col1:
        # Findings
        st.markdown("### Findings")
        for i, finding in enumerate(report.findings, 1):
            st.markdown(f'<div class="finding-item">{i}. {finding}</div>', unsafe_allow_html=True)

        # Impression
        st.markdown("### Impression")
        st.info(report.impression)

    with col2:
        # Metrics
        st.markdown("### Metrics")

        if settings.get("show_confidence", True):
     
render_ct_series_upload function · python · L895-L1062 (168 LOC)
src/app/streamlit_app.py
def render_ct_series_upload(settings: dict):
    """Render CT scan series upload interface for longitudinal analysis."""
    st.markdown("## 📂 CT Scan Series Upload")
    st.markdown("""
    **Upload sequential CT scans** for automated nodule detection and longitudinal analysis.
    RadAssist Pro will detect nodules, measure them, and track changes over time.
    """)

    # Initialize pipeline
    initialize_image_pipeline()

    # Upload section
    st.markdown("### 📤 Upload CT Scans")

    col1, col2 = st.columns([2, 1])

    with col1:
        # Multiple scan upload
        num_scans = st.slider("Number of scans to upload", 2, 6, 4, key="num_ct_scans")

        scans_data = []
        for i in range(num_scans):
            with st.expander(f"📁 Scan {i+1}", expanded=(i == 0)):
                scan_date = st.date_input(
                    f"Scan Date",
                    datetime.now() - timedelta(days=180 * (num_scans - 1 - i)),
                    key=f"ct_date_{i}"
             
Powered by Repobility — scan your code at https://repobility.com
render_pipeline_results function · python · L1065-L1121 (57 LOC)
src/app/streamlit_app.py
def render_pipeline_results(result: 'PipelineResult', settings: dict):
    """Render results from the image analysis pipeline."""
    st.markdown("---")
    st.success("✅ Analysis Complete")

    # Detection summary
    st.markdown("### 🔍 Detection Summary")
    col1, col2, col3 = st.columns(3)

    with col1:
        st.metric("Scans Processed", result.scans_processed)

    with col2:
        nodule_count = len(result.detection_results[0].nodules) if result.detection_results else 0
        st.metric("Nodules Detected", nodule_count)

    with col3:
        st.metric("Longitudinal Data", "Yes" if result.has_longitudinal else "No")

    # Detection details
    if result.detection_results:
        st.markdown("### 📋 Detection Details")
        for i, detection in enumerate(result.detection_results):
            with st.expander(f"Scan {i+1}: {detection.scan_metadata.scan_date.strftime('%Y-%m-%d')}"):
                st.write(f"**Modality:** {detection.scan_metadata.modality}")
          
render_longitudinal_report function · python · L1124-L1229 (106 LOC)
src/app/streamlit_app.py
def render_longitudinal_report(report, settings: dict):
    """Render a longitudinal report (shared between modes)."""
    analysis = report.analysis

    st.markdown("### 📈 Longitudinal Change Analysis")

    # Timeline
    st.markdown("#### 📅 Measurement Timeline")
    timeline_html = ""
    for m in report.measurements:
        date_str = m.date.strftime("%Y-%m-%d")
        timeline_html += f'''
        <div class="timeline-item">
            <span class="timeline-date">{date_str}</span>
            <span class="timeline-value">{m.size_mm}mm {m.nodule_type} nodule</span>
        </div>
        '''
    st.markdown(timeline_html, unsafe_allow_html=True)

    # Key metrics
    col1, col2, col3 = st.columns(3)

    with col1:
        st.metric(
            "Size Change",
            f"+{analysis.size_change_mm:.1f}mm",
            f"{analysis.size_change_percent:.1f}%"
        )

    with col2:
        vdt_display = f"{analysis.volume_doubling_time_days:.0f} days" if analysis.volume_dou
render_demo_results function · python · L1232-L1265 (34 LOC)
src/app/streamlit_app.py
def render_demo_results(settings: dict):
    """Render demo results when pipeline not available."""
    st.markdown("---")
    st.markdown("## 📋 Analysis Results (Demo Mode)")

    st.markdown(
        '<span class="status-normal">✓ NO ACUTE ABNORMALITY</span>',
        unsafe_allow_html=True
    )

    findings = [
        "Heart size is normal",
        "Lungs are clear bilaterally",
        "No pleural effusion",
        "No pneumothorax",
        "Mediastinal contours are unremarkable",
        "Osseous structures are intact"
    ]

    col1, col2 = st.columns([2, 1])

    with col1:
        st.markdown("### Findings")
        for i, finding in enumerate(findings, 1):
            st.markdown(f'<div class="finding-item">{i}. {finding}</div>', unsafe_allow_html=True)

        st.markdown("### Impression")
        st.info("No acute cardiopulmonary abnormality")

    with col2:
        st.markdown("### Metrics")
        st.markdown('**Confidence:** <span class="confidence-high">87%</sp
main function · python · L1272-L1298 (27 LOC)
src/app/streamlit_app.py
def main():
    """Render the page: header, sidebar, the selected analysis view, footer.

    The view is chosen by ``settings["mode"]`` as returned from
    ``render_sidebar``; an unrecognised mode renders no analysis view.
    """
    render_header()

    # The sidebar returns the settings dict that every view receives.
    settings = render_sidebar()

    # Dispatch table keyed by the value of settings["mode"].
    view_by_mode = {
        "2D Image Analysis": render_2d_analysis,
        "3D Volume Analysis": render_3d_analysis,
        "Longitudinal Comparison": render_longitudinal_analysis,
        "CT Scan Series Upload": render_ct_series_upload,
    }
    render_view = view_by_mode.get(settings["mode"])
    if render_view is not None:
        render_view(settings)

    # Footer
    footer_html = """
    <div style="text-align: center; color: #718096; font-size: 0.8rem;">
        <p>RadAssist Pro • Med-Gemma Impact Challenge 2026</p>
        <p>Powered by Google MedGemma 1.5 • Built with Streamlit</p>
        <p>⚠️ FOR RESEARCH PURPOSES ONLY - NOT FOR CLINICAL USE</p>
    </div>
    """
    st.markdown("---")
    st.markdown(footer_html, unsafe_allow_html=True)
PipelineResult.generate_summary method · python · L85-L114 (30 LOC)
src/core/image_analysis_pipeline.py
    def generate_summary(self) -> Dict[str, Any]:
        """Generate a summary dictionary of the analysis."""
        summary = {
            "scans_analyzed": self.scans_processed,
            "nodules_found": self.nodule_count,
            "has_longitudinal_data": self.has_longitudinal,
            "requires_action": self.requires_action,
        }

        if self.has_longitudinal and self.longitudinal_report:
            analysis = self.longitudinal_report.analysis
            summary.update({
                "risk_level": analysis.risk_level.value,
                "trajectory": analysis.trajectory.value,
                "size_change_percent": f"{analysis.size_change_percent:.1f}%",
                "volume_doubling_time": (
                    f"{analysis.volume_doubling_time_days:.0f} days"
                    if analysis.volume_doubling_time_days else "N/A"
                ),
                "lung_rads": (
                    analysis.lung_rads_current.value
                    
ImageAnalysisPipeline.__init__ method · python · L144-L156 (13 LOC)
src/core/image_analysis_pipeline.py
    def __init__(self, config: Optional[AnalysisPipelineConfig] = None, model=None):
        """
        Set up the pipeline's configuration, nodule detector and model handle.

        Args:
            config: Pipeline configuration; a default ``AnalysisPipelineConfig``
                is created when this is omitted (or otherwise falsy)
            model: Optional MedGemma model instance, passed to ``NoduleDetector``
                and kept on ``self._model``
        """
        default_needed = not config
        self.config = AnalysisPipelineConfig() if default_needed else config
        self._model = model
        self.detector = NoduleDetector(model=self._model)

        logger.info(f"ImageAnalysisPipeline initialized (mock_mode={self.config.mock_mode})")
ImageAnalysisPipeline.analyze_single method · python · L158-L194 (37 LOC)
src/core/image_analysis_pipeline.py
    def analyze_single(
        self,
        image_path: Union[str, Path],
        scan_date: Optional[datetime] = None,
        clinical_context: Optional[str] = None
    ) -> PipelineResult:
        """
        Analyze a single scan for nodules.

        Args:
            image_path: Path to image/DICOM file
            scan_date: Date of scan
            clinical_context: Clinical history

        Returns:
            PipelineResult with detection findings
        """
        import time
        start_time = time.time()

        # Run detection
        detection_result = self.detector.detect(
            image_path,
            scan_date=scan_date,
            clinical_context=clinical_context
        )

        processing_time = (time.time() - start_time) * 1000

        return PipelineResult(
            detection_results=[detection_result],
            nodule_count=detection_result.nodule_count,
            scans_processed=1,
            processing_time_ms=processing_time,
     
ImageAnalysisPipeline.analyze_longitudinal method · python · L196-L276 (81 LOC)
src/core/image_analysis_pipeline.py
    def analyze_longitudinal(
        self,
        scans: List[Tuple[Union[str, Path], datetime]],
        clinical_context: Optional[str] = None,
        nodule_location: Optional[str] = None
    ) -> PipelineResult:
        """
        Perform longitudinal analysis across multiple scans.

        This is the PRIMARY FEATURE of RadAssist Pro.

        Args:
            scans: List of (image_path, scan_date) tuples
            clinical_context: Clinical history
            nodule_location: Optional specific nodule to track

        Returns:
            PipelineResult with longitudinal analysis
        """
        import time
        start_time = time.time()

        warnings = []
        errors = []

        # Sort scans by date
        scans_sorted = sorted(scans, key=lambda x: x[1])

        # Detect nodules in each scan
        detection_results = []
        for image_path, scan_date in scans_sorted:
            result = self.detector.detect(
                image_path,
           
Repobility — same analyzer, your code, free for public repos · /scan/
ImageAnalysisPipeline.analyze_with_manual_measurements method · python · L278-L335 (58 LOC)
src/core/image_analysis_pipeline.py
    def analyze_with_manual_measurements(
        self,
        measurements: List[Dict[str, Any]],
        clinical_context: Optional[str] = None
    ) -> PipelineResult:
        """
        Run longitudinal analysis with manually entered measurements.

        This is useful when nodule measurements are already known
        (e.g., from prior reports or manual measurement).

        Args:
            measurements: List of dicts with 'date', 'size_mm', 'location'
            clinical_context: Clinical history

        Returns:
            PipelineResult with longitudinal analysis
        """
        import time
        start_time = time.time()

        # Convert to NoduleMeasurement objects
        nodule_measurements = []
        for m in measurements:
            date = m.get('date')
            if isinstance(date, str):
                date = datetime.fromisoformat(date)

            nodule_measurements.append(NoduleMeasurement(
                date=date,
                size_mm=m.
ImageAnalysisPipeline._extract_measurements_for_tracking method · python · L337-L378 (42 LOC)
src/core/image_analysis_pipeline.py
    def _extract_measurements_for_tracking(
        self,
        detection_results: List[DetectionResult],
        target_location: Optional[str] = None
    ) -> List[NoduleMeasurement]:
        """
        Extract nodule measurements for longitudinal tracking.

        Matches nodules across scans by location.
        """
        measurements = []

        for detection in detection_results:
            if not detection.has_nodules:
                continue

            # Find nodule to track
            if target_location:
                # Look for specific location
                matching_nodules = [
                    n for n in detection.nodules
                    if target_location.lower() in n.location.lower()
                ]
                if matching_nodules:
                    nodule = max(matching_nodules, key=lambda n: n.size_mm)
                else:
                    continue
            else:
                # Track largest nodule
                nodule = dete
ImageAnalysisPipeline.generate_summary method · python · L380-L418 (39 LOC)
src/core/image_analysis_pipeline.py
    def generate_summary(self, result: PipelineResult) -> Dict[str, Any]:
        """
        Generate a human-readable summary of the analysis.

        Args:
            result: PipelineResult from analysis

        Returns:
            Dictionary with summary information
        """
        summary = {
            "scans_analyzed": result.scans_processed,
            "nodules_found": result.nodule_count,
            "has_longitudinal_data": result.has_longitudinal,
            "requires_action": result.requires_action,
        }

        if result.has_longitudinal:
            analysis = result.longitudinal_report.analysis
            summary.update({
                "risk_level": analysis.risk_level.value,
                "trajectory": analysis.trajectory.value,
                "size_change_percent": f"{analysis.size_change_percent:.1f}%",
                "volume_doubling_time": (
                    f"{analysis.volume_doubling_time_days:.0f} days"
                    if analysis.v
demo_pipeline function · python · L421-L473 (53 LOC)
src/core/image_analysis_pipeline.py
def demo_pipeline():
    """Demo the full analysis pipeline."""
    print("=" * 70)
    print("RadAssist Pro - Image Analysis Pipeline Demo")
    print("\"AI That Remembers\"")
    print("=" * 70)
    print()

    # Create pipeline
    pipeline = ImageAnalysisPipeline()

    # Demo with manual measurements (simulating detected nodules)
    measurements = [
        {"date": datetime(2024, 1, 15), "size_mm": 6.0, "location": "right upper lobe"},
        {"date": datetime(2024, 7, 20), "size_mm": 6.2, "location": "right upper lobe"},
        {"date": datetime(2025, 1, 18), "size_mm": 6.8, "location": "right upper lobe"},
        {"date": datetime(2025, 7, 15), "size_mm": 8.3, "location": "right upper lobe"},
    ]

    result = pipeline.analyze_with_manual_measurements(
        measurements,
        clinical_context="58-year-old female, former smoker (30 pack-years)"
    )

    # Print summary
    summary = pipeline.generate_summary(result)

    print("ANALYSIS SUMMARY")
    print("-" * 4
calculate_volume_doubling_time function · python · L152-L189 (38 LOC)
src/core/longitudinal_analyzer.py
def calculate_volume_doubling_time(
    measurement1: NoduleMeasurement,
    measurement2: NoduleMeasurement
) -> Optional[float]:
    """
    Calculate volume doubling time (VDT) between two measurements.

    VDT = (days_between * ln(2)) / ln(V2/V1)

    Args:
        measurement1: Earlier measurement
        measurement2: Later measurement

    Returns:
        VDT in days, or None if calculation not possible
    """
    v1 = measurement1.volume_calculated
    v2 = measurement2.volume_calculated

    # Ensure measurements are in correct order
    if measurement2.date < measurement1.date:
        measurement1, measurement2 = measurement2, measurement1
        v1, v2 = v2, v1

    days_between = (measurement2.date - measurement1.date).days

    if days_between <= 0:
        return None

    if v2 <= v1:
        # Nodule shrinking or stable - VDT not applicable
        return None

    try:
        vdt = (days_between * math.log(2)) / math.log(v2 / v1)
        return vdt
    except (Va
classify_lung_rads function · python · L192-L243 (52 LOC)
src/core/longitudinal_analyzer.py
def classify_lung_rads(
    size_mm: float,
    nodule_type: str = "solid",
    growth_detected: bool = False,
    vdt_days: Optional[float] = None
) -> LungRADSCategory:
    """
    Classify nodule according to Lung-RADS guidelines.

    Simplified implementation based on ACR Lung-RADS v1.1.

    Args:
        size_mm: Nodule size in mm
        nodule_type: solid, ground-glass, or part-solid
        growth_detected: Whether growth was detected
        vdt_days: Volume doubling time if calculated

    Returns:
        Lung-RADS category
    """
    if nodule_type == "solid":
        if size_mm < 6:
            return LungRADSCategory.CATEGORY_2
        elif size_mm < 8:
            if growth_detected:
                return LungRADSCategory.CATEGORY_4A
            return LungRADSCategory.CATEGORY_3
        elif size_mm < 15:
            if growth_detected and vdt_days and vdt_days < VDT_HIGH_RISK:
                return LungRADSCategory.CATEGORY_4B
            return LungRADSCategory.C
assess_risk_level function · python · L246-L283 (38 LOC)
src/core/longitudinal_analyzer.py
def assess_risk_level(
    vdt_days: Optional[float],
    lung_rads: LungRADSCategory,
    trajectory: ChangeTrajectory
) -> RiskLevel:
    """
    Assess overall risk level based on multiple factors.

    Args:
        vdt_days: Volume doubling time
        lung_rads: Current Lung-RADS category
        trajectory: Change trajectory

    Returns:
        Risk level assessment
    """
    # VDT-based risk
    if vdt_days:
        if vdt_days < 200:
            return RiskLevel.VERY_HIGH
        elif vdt_days < VDT_HIGH_RISK:
            return RiskLevel.HIGH
        elif vdt_days < VDT_INTERMEDIATE:
            return RiskLevel.INTERMEDIATE

    # Lung-RADS based risk
    if lung_rads == LungRADSCategory.CATEGORY_4B:
        return RiskLevel.VERY_HIGH
    elif lung_rads == LungRADSCategory.CATEGORY_4A:
        return RiskLevel.HIGH
    elif lung_rads == LungRADSCategory.CATEGORY_3:
        return RiskLevel.INTERMEDIATE

    # Trajectory based
    if trajectory == ChangeTrajectory.WORSEN
determine_trajectory function · python · L286-L308 (23 LOC)
src/core/longitudinal_analyzer.py
def determine_trajectory(
    size_change_percent: float,
    volume_change_percent: float
) -> ChangeTrajectory:
    """
    Classify the interval change as worsening, improving, or stable.

    Only the volume change drives the classification;
    ``size_change_percent`` is accepted but not consulted here.

    Args:
        size_change_percent: Percentage change in linear size (unused)
        volume_change_percent: Percentage change in volume

    Returns:
        WORSENING above +25%, IMPROVING below -25%, STABLE anywhere in
        [-25%, +25%], and INDETERMINATE when none of those comparisons
        holds (e.g. a NaN input)
    """
    threshold = 25

    if volume_change_percent > threshold:
        return ChangeTrajectory.WORSENING
    if volume_change_percent < -threshold:
        return ChangeTrajectory.IMPROVING
    if -threshold <= volume_change_percent <= threshold:
        return ChangeTrajectory.STABLE

    # Reached only when every comparison above is false (e.g. NaN).
    return ChangeTrajectory.INDETERMINATE
Repobility · code-quality intelligence · https://repobility.com
generate_change_summary function · python · L315-L354 (40 LOC)
src/core/longitudinal_analyzer.py
def generate_change_summary(
    prior: NoduleMeasurement,
    current: NoduleMeasurement,
    analysis: ChangeAnalysis
) -> str:
    """
    Generate natural language change summary.

    This is a key innovation - converting measurements to clinical language.
    """
    location = current.location or "the identified nodule"

    if analysis.trajectory == ChangeTrajectory.STABLE:
        return (
            f"The {current.nodule_type} nodule in {location} remains stable, "
            f"measuring {current.size_mm}mm (previously {prior.size_mm}mm, "
            f"{analysis.size_change_percent:+.1f}% change over {analysis.days_between} days)."
        )

    elif analysis.trajectory == ChangeTrajectory.IMPROVING:
        return (
            f"The {current.nodule_type} nodule in {location} has decreased in size, "
            f"now measuring {current.size_mm}mm (previously {prior.size_mm}mm, "
            f"{abs(analysis.size_change_percent):.1f}% reduction over {analysis.days_between
generate_clinical_interpretation function · python · L357-L406 (50 LOC)
src/core/longitudinal_analyzer.py
def generate_clinical_interpretation(analysis: ChangeAnalysis) -> str:
    """Generate clinical interpretation of findings."""

    interpretations = []

    # VDT interpretation
    if analysis.volume_doubling_time_days:
        vdt = analysis.volume_doubling_time_days
        if vdt < 200:
            interpretations.append(
                f"Volume doubling time of {vdt:.0f} days is concerning for "
                "rapid growth, highly suggestive of malignancy."
            )
        elif vdt < VDT_HIGH_RISK:
            interpretations.append(
                f"Volume doubling time of {vdt:.0f} days (<400 days) "
                "raises concern for malignancy."
            )
        elif vdt < VDT_INTERMEDIATE:
            interpretations.append(
                f"Volume doubling time of {vdt:.0f} days suggests "
                "intermediate growth rate."
            )
        else:
            interpretations.append(
                f"Volume doubling time of {vdt:.0f} days sugge
generate_recommendations function · python · L409-L441 (33 LOC)
src/core/longitudinal_analyzer.py
def generate_recommendations(analysis: ChangeAnalysis) -> List[str]:
    """Generate clinical recommendations based on analysis."""

    recommendations = []

    if analysis.risk_level == RiskLevel.VERY_HIGH:
        recommendations.append("Urgent tissue sampling or PET-CT recommended.")
        recommendations.append("Consider multidisciplinary tumor board discussion.")

    elif analysis.risk_level == RiskLevel.HIGH:
        recommendations.append("Consider PET-CT for further characterization.")
        recommendations.append("If PET positive, tissue sampling recommended.")
        recommendations.append("Short-interval follow-up CT if PET not performed (3 months).")

    elif analysis.risk_level == RiskLevel.INTERMEDIATE:
        if analysis.trajectory == ChangeTrajectory.STABLE:
            recommendations.append("Continue surveillance with follow-up CT in 6 months.")
        else:
            recommendations.append("Consider follow-up CT in 3 months to assess trajectory.")

    e
generate_differential_evolution function · python · L444-L509 (66 LOC)
src/core/longitudinal_analyzer.py
def generate_differential_evolution(
    analysis: ChangeAnalysis
) -> List[DifferentialDiagnosis]:
    """
    Generate differential diagnosis evolution.

    This is the "judge-wowing" feature - showing how differentials
    should change based on the observed interval changes.
    """
    differentials = []

    if analysis.trajectory == ChangeTrajectory.WORSENING:
        differentials.append(DifferentialDiagnosis(
            diagnosis="Primary lung malignancy",
            prior_probability="moderate",
            current_probability="high",
            change_rationale="Interval growth with VDT consistent with malignancy"
        ))
        differentials.append(DifferentialDiagnosis(
            diagnosis="Inflammatory/infectious",
            prior_probability="moderate",
            current_probability="low",
            change_rationale="Would expect stability or resolution if infectious"
        ))
        differentials.append(DifferentialDiagnosis(
            diagnosis="Sl
generate_comparison_paragraph function · python · L512-L560 (49 LOC)
src/core/longitudinal_analyzer.py
def generate_comparison_paragraph(
    prior: NoduleMeasurement,
    current: NoduleMeasurement,
    analysis: ChangeAnalysis
) -> str:
    """
    Generate draft comparison paragraph for radiology report.

    This saves radiologist time by providing ready-to-use language.
    """
    prior_date = prior.date.strftime("%Y-%m-%d")

    if analysis.trajectory == ChangeTrajectory.STABLE:
        return (
            f"COMPARISON: CT Chest dated {prior_date}\n\n"
            f"The previously identified {prior.size_mm}mm {prior.nodule_type} nodule "
            f"in the {current.location} remains stable, now measuring {current.size_mm}mm. "
            f"No significant interval change ({analysis.size_change_percent:+.1f}% over "
            f"{analysis.days_between} days). Continued surveillance recommended per "
            f"Lung-RADS {analysis.lung_rads_current.value if analysis.lung_rads_current else 'guidelines'}."
        )

    elif analysis.trajectory == ChangeTrajectory.WORSENING:
generate_patient_summary function · python · L563-L596 (34 LOC)
src/core/longitudinal_analyzer.py
def generate_patient_summary(
    analysis: ChangeAnalysis,
    current: NoduleMeasurement
) -> str:
    """
    Generate patient-friendly summary of findings.

    This supports patient engagement and understanding.
    """
    if analysis.trajectory == ChangeTrajectory.STABLE:
        return (
            f"Your lung scan shows a small spot ({current.size_mm}mm) that has not changed "
            f"since your last scan. This is reassuring, as it suggests the spot is unlikely "
            f"to be harmful. Your doctor recommends continuing to monitor it with regular scans."
        )

    elif analysis.trajectory == ChangeTrajectory.WORSENING:
        return (
            f"Your lung scan shows a spot that has grown since your last scan "
            f"(from {current.size_mm - analysis.size_change_mm:.1f}mm to {current.size_mm}mm). "
            f"While this doesn't definitely mean it's cancer, your doctor will likely recommend "
            f"additional tests to learn more about it. 
analyze_longitudinal_change function · python · L603-L685 (83 LOC)
src/core/longitudinal_analyzer.py
def analyze_longitudinal_change(
    prior_measurement: NoduleMeasurement,
    current_measurement: NoduleMeasurement,
    clinical_context: Optional[str] = None
) -> ChangeAnalysis:
    """
    Perform comprehensive longitudinal change analysis.

    This is the core function of RadAssist Pro.

    Args:
        prior_measurement: Earlier nodule measurement
        current_measurement: Later nodule measurement
        clinical_context: Optional clinical context

    Returns:
        Complete change analysis with clinical decision support
    """
    # Ensure correct ordering
    if current_measurement.date < prior_measurement.date:
        prior_measurement, current_measurement = current_measurement, prior_measurement

    # Calculate basic metrics
    size_change_mm = current_measurement.size_mm - prior_measurement.size_mm
    size_change_percent = (size_change_mm / prior_measurement.size_mm) * 100 if prior_measurement.size_mm > 0 else 0

    v_prior = prior_measurement.volume_calcul
create_longitudinal_report function · python · L688-L729 (42 LOC)
src/core/longitudinal_analyzer.py
def create_longitudinal_report(
    measurements: List[NoduleMeasurement],
    clinical_context: str = ""
) -> LongitudinalReport:
    """
    Create a complete longitudinal analysis report.

    Args:
        measurements: List of measurements (oldest to newest)
        clinical_context: Clinical context string

    Returns:
        Complete longitudinal report
    """
    if len(measurements) < 2:
        raise ValueError("At least 2 measurements required for longitudinal analysis")

    # Sort by date
    measurements = sorted(measurements, key=lambda m: m.date)

    # Analyze most recent change
    prior = measurements[-2]
    current = measurements[-1]
    analysis = analyze_longitudinal_change(prior, current, clinical_context)

    # Generate differentials
    differentials = generate_differential_evolution(analysis)

    # Build timeline summary
    timeline_lines = ["TIMELINE:"]
    for m in measurements:
        date_str = m.date.strftime("%Y-%m-%d")
        timeline_lines.app
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
demo_analysis function · python · L736-L844 (109 LOC)
src/core/longitudinal_analyzer.py
def demo_analysis():
    """
    Demonstrate the longitudinal analysis capability.

    This creates the demo scenario for the competition.
    """
    print("=" * 70)
    print("RadAssist Pro - Longitudinal Analysis Demo")
    print("'AI That Remembers'")
    print("=" * 70)
    print()

    # Create demo measurements (the "missed progression" scenario)
    measurements = [
        NoduleMeasurement(
            date=datetime(2024, 1, 15),
            size_mm=6.0,
            location="right upper lobe",
            nodule_type="solid",
            morphology="spiculated"
        ),
        NoduleMeasurement(
            date=datetime(2024, 7, 20),
            size_mm=6.2,
            location="right upper lobe",
            nodule_type="solid",
            morphology="spiculated"
        ),
        NoduleMeasurement(
            date=datetime(2025, 1, 18),
            size_mm=6.8,
            location="right upper lobe",
            nodule_type="solid",
            morphology="spicul
DetectedNodule.to_dict method · python · L88-L102 (15 LOC)
src/core/nodule_detector.py
    def to_dict(self) -> Dict[str, Any]:
        """Return the nodule's fields as a plain dict keyed by attribute name."""
        # Order here fixes the key order of the resulting dictionary.
        field_names = (
            "nodule_id",
            "location",
            "nodule_type",
            "size_mm",
            "size_perpendicular_mm",
            "volume_mm3",
            "confidence",
            "detection_method",
            "morphology",
            "calcification",
            "cavitation",
        )
        return {field: getattr(self, field) for field in field_names}
ScanMetadata.to_dict method · python · L116-L123 (8 LOC)
src/core/nodule_detector.py
    def to_dict(self) -> Dict[str, Any]:
        """Return the scan metadata as a plain dict, with the date as an ISO string."""
        # scan_date is the only field that is transformed; it goes first
        # so the key order matches the attribute order below.
        payload: Dict[str, Any] = {"scan_date": self.scan_date.isoformat()}
        for field in ("modality", "series_description", "slice_thickness_mm"):
            payload[field] = getattr(self, field)
        return payload
page 1 / 5next ›