← back to drewlinsley__tmp_bs_mcp

Function bodies 170 total

All specs Real LLM only Function bodies
api_call_tool function · python · L727-L734 (8 LOC)
server/mcp_server.py
async def api_call_tool(request: Request):
    """Invoke a tool by name over REST (used by the stdio proxy).

    Reads ``name`` (default ``""``) and ``arguments`` (default ``{}``) from
    the JSON request body, forwards them to ``call_tool``, and returns the
    ``.text`` of every result item joined by newlines under the ``"text"``
    key.
    """
    payload = await request.json()
    tool_name = payload.get("name", "")
    tool_args = payload.get("arguments", {})
    contents = await call_tool(tool_name, tool_args)
    # Each returned item carries its payload in ``.text`` (a list of
    # TextContent objects, per the original note).
    lines = [item.text for item in contents]
    return {"text": "\n".join(lines)}
health function · python · L740-L746 (7 LOC)
server/mcp_server.py
async def health():
    """Return a status snapshot: encoder presence, classifier count, CUDA availability."""
    report = {"status": "ok"}
    # Module-level ``engine`` is treated as "not loaded" while it is None.
    report["model_loaded"] = engine is not None
    report["n_classifiers"] = len(classifiers)
    report["gpu_available"] = torch.cuda.is_available()
    return report
load_classifiers function · python · L751-L774 (24 LOC)
server/mcp_server.py
def load_classifiers(classifier_dir: str, device: str = "cpu"):
    """Load every pickled classifier bundle found in a directory.

    Each ``*.pkl`` file is unpickled and stored under a task name derived
    from the file stem, with ``__`` replaced by ``/``. Files are visited in
    sorted order. A file that cannot be loaded is reported on stdout and
    skipped; it never aborts the whole load.

    Args:
        classifier_dir: Directory scanned (non-recursively) for ``*.pkl``.
        device: Target device. Bundles whose ``'method'`` is ``'tabpfn'``
            have ``bundle['model'].to(device)`` attempted unless the device
            is ``'cpu'``.

    Returns:
        Dict mapping task name to the unpickled bundle. Empty when the
        directory does not exist.
    """
    clf_dir = Path(classifier_dir)
    loaded = {}
    if not clf_dir.exists():
        print(f"WARNING: classifier directory {clf_dir} not found")
        return loaded

    for pkl_path in sorted(clf_dir.glob('*.pkl')):
        task = pkl_path.stem.replace('__', '/')
        try:
            # SECURITY: pickle.load can execute arbitrary code embedded in the
            # file. Only point this at a directory whose contents are trusted.
            with open(pkl_path, 'rb') as f:
                clf_data = pickle.load(f)
            # Move TabPFN models to GPU for faster inference
            if clf_data.get('method') == 'tabpfn' and device != 'cpu':
                try:
                    clf_data['model'].to(device)
                except Exception as move_err:
                    # Best effort: the classifier is still kept and used as
                    # loaded, but the failure is surfaced instead of hidden.
                    print(f"  WARNING: could not move {pkl_path.name} "
                          f"to {device}: {move_err}")
            loaded[task] = clf_data
        except Exception as e:
            print(f"  Failed to load {pkl_path.name}: {e}")

    print(f"Loaded {len(loaded)} classifiers")
    return loaded
main function · python · L777-L814 (38 LOC)
server/mcp_server.py
def main():
    parser = argparse.ArgumentParser(description="NeoCog EEG MCP Server")
    parser.add_argument("--checkpoint", required=True)
    parser.add_argument("--classifier-dir", default="server/classifiers")
    parser.add_argument("--feature-classifier-dir", default="server/feature_classifiers")
    parser.add_argument("--port", type=int, default=8080)
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--device", default="cuda")
    parser.add_argument("--ssl-cert", default=None, help="Path to SSL certificate for HTTPS")
    parser.add_argument("--ssl-key", default=None, help="Path to SSL private key for HTTPS")
    args = parser.parse_args()

    global engine, classifiers, feature_classifiers

    print(f"Loading encoder from {args.checkpoint}...")
    engine = EEGInferenceEngine(args.checkpoint, device=args.device)

    print(f"Loading classifiers from {args.classifier_dir}...")
    classifiers = load_classifiers(args.classifier_dir, device=args.de
_robust_zscore function · python · L30-L38 (9 LOC)
server/preprocess.py
def _robust_zscore(data: np.ndarray) -> np.ndarray:
    """Robust z-score normalization per channel (matching v5 pipeline)."""
    for ch in range(data.shape[0]):
        x = data[ch]
        median = np.median(x)
        mad = np.median(np.abs(x - median))
        scale = max(mad * 1.4826, 1e-8)  # MAD to pseudo-std
        data[ch] = (x - median) / scale
    return data
_map_channels_to_coords function · python · L41-L56 (16 LOC)
server/preprocess.py
def _map_channels_to_coords(ch_names):
    """Look up an (x, y) scalp position for every channel name.

    A name (stripped of surrounding whitespace) is resolved against
    ``EEGConfig.CHANNEL_COORDS`` directly; if that misses, its entry in
    ``CHANNEL_ALIASES`` is tried. Unresolved channels keep zero coordinates
    and a zero mask entry.

    Returns:
        ``(coords, mask)``: a float32 array of shape ``(len(ch_names), 2)``
        and a float32 array of shape ``(len(ch_names),)`` holding 1.0 where
        a position was found.
    """
    n_channels = len(ch_names)
    coords = np.zeros((n_channels, 2), dtype=np.float32)
    mask = np.zeros(n_channels, dtype=np.float32)
    table = EEGConfig.CHANNEL_COORDS

    for idx, raw_name in enumerate(ch_names):
        key = raw_name.strip()
        if key not in table:
            # Direct lookup missed: fall back to the alias, if there is one.
            key = CHANNEL_ALIASES.get(key, key)
            if key not in table:
                continue
        coords[idx] = table[key]
        mask[idx] = 1.0

    return coords, mask
preprocess_edf function · python · L59-L112 (54 LOC)
server/preprocess.py
def preprocess_edf(filepath: Path) -> Tuple[np.ndarray, np.ndarray, np.ndarray, Dict]:
    """Load and preprocess EDF/BDF file."""
    import mne
    raw = mne.io.read_raw(str(filepath), preload=True, verbose=False)

    # Pick only EEG channels
    eeg_picks = mne.pick_types(raw.info, eeg=True, exclude='bads')
    if len(eeg_picks) == 0:
        raise ValueError("No EEG channels found in file")
    raw.pick(eeg_picks)

    # Resample to 200 Hz
    if raw.info['sfreq'] != TARGET_SRATE:
        raw.resample(TARGET_SRATE, verbose=False)

    # Get data and channel names
    data = raw.get_data()  # (n_channels, n_samples)
    ch_names = raw.ch_names

    # Check units — MNE returns SI (volts). Convert to microvolts.
    # MNE FIFF unit code 107 = volts
    for ch_info in raw.info['chs']:
        if ch_info.get('unit', 0) == 107:  # FIFF_UNIT_V
            data *= 1e6  # V → µV
            break

    # Robust z-score
    data = _robust_zscore(data)

    # Map channels to coordinates
    c
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
_detect_dataset_from_h5 function · python · L115-L146 (32 LOC)
server/preprocess.py
def _detect_dataset_from_h5(filepath: Path, f) -> Tuple[str, str]:
    """Detect dataset name and recording state from H5 file attributes."""
    # Check H5 attrs
    dataset = 'unknown'
    rec_state = 'resting eyes closed'

    if 'metadata' in f and 'dataset' in f['metadata'].attrs:
        dataset = f['metadata'].attrs['dataset']
    elif 'dataset' in f.attrs:
        dataset = f.attrs['dataset']

    # Infer from filename/path
    fname = filepath.name.lower()
    path_str = str(filepath).lower()
    if dataset == 'unknown':
        for ds in ['AD_EEG', 'CAUEEG', 'DS004504', 'PEARL', 'TD_BRAIN',
                    'READTBI', 'DORTMUND', 'PARKINSON_UCSD', 'DEPRESSION', 'SRM', 'LEMON']:
            if ds.lower() in path_str:
                dataset = ds
                break

    # Infer recording state
    if 'eyesopen' in fname or 'eyes_open' in fname or 'resteo' in fname:
        rec_state = 'resting eyes open'
    elif 'msit' in fname:
        rec_state = 'MSIT task'
    elif '
preprocess_h5 function · python · L149-L204 (56 LOC)
server/preprocess.py
def preprocess_h5(filepath: Path) -> Tuple[np.ndarray, np.ndarray, np.ndarray, Dict]:
    """Load preprocessed H5 file (our internal format)."""
    with h5py.File(filepath, 'r') as f:
        segments = f['eeg/segments'][:]
        ch_names = [x.decode() if isinstance(x, bytes) else x for x in f['eeg/channel_names'][:]]
        file_coords = f['eeg/channel_coords'][:] if 'eeg/channel_coords' in f else None
        file_mask = f['channel_mask'][:] if 'channel_mask' in f else None
        dataset, rec_state = _detect_dataset_from_h5(filepath, f)

    # Stitch segments
    STRIDE = 12000
    if segments.shape[0] == 1:
        data = segments[0]
    else:
        parts = [segments[i, :, :STRIDE] for i in range(segments.shape[0] - 1)]
        parts.append(segments[-1])
        data = np.concatenate(parts, axis=1)

    n_ch = data.shape[0]

    # Channel coords
    if file_coords is not None:
        coords = np.zeros((MAX_CHANNELS, 2), dtype=np.float32)
        coords[:min(n_ch, MAX_CHANNE
preprocess_eeg function · python · L207-L225 (19 LOC)
server/preprocess.py
def preprocess_eeg(filepath: Path) -> Tuple[np.ndarray, np.ndarray, np.ndarray, Dict]:
    """
    Dispatch an EEG file to the loader matching its (case-insensitive) suffix.

    ``.edf``, ``.bdf`` and ``.set`` go to ``preprocess_edf``; ``.h5`` goes to
    ``preprocess_h5``.

    Returns:
        eeg_data: (max_channels, n_samples) float32
        channel_mask: (max_channels,) float32, 1=present 0=padding
        channel_coords: (max_channels, 2) float32, (x, y) scalp positions
        metadata: dict with recording info

    Raises:
        ValueError: if the suffix is none of the above.
    """
    filepath = Path(filepath)
    ext = filepath.suffix.lower()

    if ext == '.h5':
        return preprocess_h5(filepath)
    if ext in {'.edf', '.bdf', '.set'}:
        return preprocess_edf(filepath)
    raise ValueError(f"Unsupported format: {ext}. Supported: {SUPPORTED_EXTENSIONS}")
_select_citations_for_results function · python · L215-L260 (46 LOC)
server/response_generator.py
def _select_citations_for_results(results: List[dict]) -> List[dict]:
    """Select citations relevant to the actual screening results.
    Prioritize citations for positive findings."""
    seen_ids = set()
    selected = []

    # Collect citation keys, positive findings first
    positive_keys = set()
    all_keys = set()
    for r in results:
        if r['type'] != 'classification':
            continue
        cat = _get_task_category(r['task'])
        cit_keys = _CATEGORY_CITATIONS.get(cat, [])
        all_keys.update(cit_keys)
        if r['prediction'] == 'POSITIVE':
            positive_keys.update(cit_keys)

    # Always include regression-related citations
    for r in results:
        if r['type'] == 'regression':
            if 'age' in r['task']:
                all_keys.add('brain_age')
            if 'mmse' in r['task']:
                all_keys.add('mmse')
            if 'ravlt' in r['task']:
                all_keys.add('cognitive')

    # Add positive-finding citat
_find_citation_num function · python · L263-L267 (5 LOC)
server/response_generator.py
def _find_citation_num(cit_id: str, citations: List[dict]) -> Optional[int]:
    for i, c in enumerate(citations, 1):
        if c['id'] == cit_id:
            return i
    return None
_citation_refs_for_category function · python · L270-L281 (12 LOC)
server/response_generator.py
def _citation_refs_for_category(cat: str, citations: List[dict]) -> str:
    """Build the inline reference marker for a category, e.g. ``' [1,2]'``.

    Collects the numbers (positions in ``citations``) of every citation
    registered for the category's keys, de-duplicated and in ascending
    order. Returns an empty string when none of them appear in
    ``citations``.
    """
    numbers = {
        _find_citation_num(entry['id'], citations)
        for key in _CATEGORY_CITATIONS.get(cat, [])
        for entry in CITATIONS.get(key, [])
    }
    # Citations that were not selected for this report resolve to None.
    numbers.discard(None)
    if not numbers:
        return ''
    return ' [' + ','.join(map(str, sorted(numbers))) + ']'
generate_enriched_report function · python · L284-L472 (189 LOC)
server/response_generator.py
def generate_enriched_report(
    results: List[dict],
    metadata: Optional[dict] = None,
) -> Tuple[str, str]:
    """
    Generate an OpenEvidence-style clinical report with inline citations,
    plus structured JSON data for visualization artifacts.

    Args:
        results: List of dicts from run_single_screen() — each has keys like
                 task, display_name, type, prediction, probability, confidence, auc, etc.
        metadata: Optional recording metadata from preprocess_eeg().

    Returns:
        (narrative_markdown, json_string)
    """
    citations = _select_citations_for_results(results)

    cls_results = [r for r in results if r['type'] == 'classification']
    reg_results = [r for r in results if r['type'] == 'regression']
    positive = [r for r in cls_results if r['prediction'] == 'POSITIVE']
    negative = [r for r in cls_results if r['prediction'] == 'NEGATIVE']

    positive.sort(key=lambda r: r['probability'], reverse=True)
    negative.sort(key=lambd
_build_visualization_data function · python · L475-L521 (47 LOC)
server/response_generator.py
def _build_visualization_data(results: List[dict], metadata: Optional[dict] = None) -> dict:
    cls_data = []
    reg_data = []

    for r in results:
        if r['type'] == 'classification':
            cls_data.append({
                'task': r['task'],
                'display_name': r['display_name'],
                'probability': round(r['probability'], 4),
                'prediction': r['prediction'],
                'confidence': r['confidence'],
                'auc': round(r['auc'], 3),
                'n_test': r['n_test'],
                'category': _get_task_category(r['task']),
            })
        else:
            cls_data_entry = {
                'task': r['task'],
                'display_name': r['display_name'],
                'predicted_value': round(r['predicted_value'], 2),
                'valid_range': [round(v, 1) for v in r['valid_range']],
                'pearson_r': round(r['pearson_r'], 3),
                'n_test': r['n_test'],
            }
   
Repobility (the analyzer behind this table) · https://repobility.com
EEGConfig class · python · L5-L98 (94 LOC)
utils/core.py
class EEGConfig:
    """Centralized configuration for all EEG processing"""
    SAMPLING_RATE = 200
    N_CHANNELS = 19  # Standard 10-20 system (all datasets padded to 19)
    N_STANDARD_CHANNELS = 19  # Same as N_CHANNELS
    # Updated for 2-minute segments with 160ms windows
    SEGMENT_LENGTH_MS = 120000  # 2 minutes
    SEGMENT_LENGTH_SAMPLES = 24000  # 120s @ 200Hz
    WINDOW_SIZE_MS = 160  # Doubled from 80ms
    WINDOW_SIZE_SAMPLES = 32  # 160ms @ 200Hz
    TOKENS_PER_WINDOW = 4  # Will be mean-pooled to 1
    STANDARD_CHANNELS = [
        'Fp1', 'Fp2', 'F3', 'F4', 'C3', 'C4',
        'P3', 'P4', 'O1', 'O2', 'F7', 'F8',
        'T3', 'T4', 'T5', 'T6', 'Fz', 'Cz', 'Pz'
    ]
    FILTER_LOW = 0.5
    FILTER_HIGH = 70

    # 2D coordinates for each channel on the scalp (10-20 system)
    # Normalized to approximately [-1, 1] range
    # y-axis: front (+) to back (-)
    # x-axis: left (-) to right (+)
    CHANNEL_COORDS = {
        # === Standard 10-20 (19 channels) ===
        # 
get_dataset_config function · python · L100-L218 (119 LOC)
utils/core.py
def get_dataset_config(dataset_name: str) -> Dict:
    """Single source of truth for dataset configurations

    Recording condition metadata for model conditioning:
    - power_line_freq: 50 (Europe/Asia) or 60 (Americas)
    - reference: Original reference scheme before re-referencing
    - recording_condition: Default recording condition for this dataset
    """
    configs = {
        'AD_EEG': {
            'original_sr': 500,
            'format': 'eeglab',
            'notch': [50, 100],
            'reference': 'A1-A2',  # Mastoid reference
            'power_line_freq': 50,  # European dataset
            'recording_condition': 'eyes_closed'
        },
        'PEARL': {
            'original_sr': 1000,
            'format': 'brainvision',
            'notch': [60, 120],
            'reference': 'FCz',
            'power_line_freq': 60,  # Polish dataset uses 50Hz but recorded with US equipment
            'recording_condition': 'task'  # Various cognitive tasks
        },
   
get_channel_mapping function · python · L220-L285 (66 LOC)
utils/core.py
def get_channel_mapping(dataset_name: str) -> Dict[str, str]:
    """Get channel mapping for each dataset to standard 19 channels"""
    mappings = {
        'AD_EEG': {
            'Fp1': 'Fp1', 'Fp2': 'Fp2', 'F3': 'F3', 'F4': 'F4',
            'C3': 'C3', 'C4': 'C4', 'P3': 'P3', 'P4': 'P4',
            'O1': 'O1', 'O2': 'O2', 'F7': 'F7', 'F8': 'F8',
            'T3': 'T3', 'T4': 'T4', 'T5': 'T5', 'T6': 'T6',
            'Fz': 'Fz', 'Cz': 'Cz', 'Pz': 'Pz'
        },
        'PEARL': {  # PEARL uses modern naming (T7/T8/P7/P8)
            'Fp1': 'Fp1', 'Fp2': 'Fp2', 'F3': 'F3', 'F4': 'F4',
            'C3': 'C3', 'C4': 'C4', 'P3': 'P3', 'P4': 'P4',
            'O1': 'O1', 'O2': 'O2', 'F7': 'F7', 'F8': 'F8',
            'T3': 'T7', 'T4': 'T8', 'T5': 'P7', 'T6': 'P8',
            'Fz': 'Fz', 'Cz': 'Cz', 'Pz': 'Pz'
        },
        'TD_BRAIN': {  # Modern naming convention
            'Fp1': 'Fp1', 'Fp2': 'Fp2', 'F3': 'F3', 'F4': 'F4',
            'C3': 'C3', 'C4': 'C4', 'P3': 'P3', 'P4
get_dataset_channels function · python · L288-L293 (6 LOC)
utils/core.py
def get_dataset_channels(dataset_name: str) -> List[str]:
    """Return the dataset's own ``'channels'`` list, or the standard set if it has none.

    The fallback is ``EEGConfig.STANDARD_CHANNELS``; a dataset-specific list
    may contain fewer than 19 channels.
    """
    dataset_cfg = get_dataset_config(dataset_name)
    return dataset_cfg.get('channels', EEGConfig.STANDARD_CHANNELS)
create_channel_mask function · python · L296-L313 (18 LOC)
utils/core.py
def create_channel_mask(dataset_channels: List[str], all_channels: List[str] = None) -> np.ndarray:
    """Build a presence mask over a reference channel list.

    Args:
        dataset_channels: Channel names that the dataset actually provides.
        all_channels: Reference list the mask is aligned to; when omitted,
            ``EEGConfig.EXTENDED_CHANNELS`` is used.

    Returns:
        float32 array of ``len(all_channels)`` entries: 1.0 at the position
        of each reference channel present in ``dataset_channels``, else 0.0.
        Names absent from the reference list are ignored.
    """
    reference = EEGConfig.EXTENDED_CHANNELS if all_channels is None else all_channels

    mask = np.zeros(len(reference), dtype=np.float32)
    for name in dataset_channels:
        try:
            slot = reference.index(name)
        except ValueError:
            # Not part of the reference layout: nothing to mark.
            continue
        mask[slot] = 1.0
    return mask
‹ prevpage 4 / 4