Function bodies 170 total
api_call_tool function · python · L727-L734 (8 LOC)server/mcp_server.py
async def api_call_tool(request: Request):
    """Call a tool by name through the REST endpoint (for stdio proxy).

    The JSON request body supplies ``name`` (default ``""``) and
    ``arguments`` (default ``{}``); both are forwarded to ``call_tool``.

    Returns:
        A dict with a single ``"text"`` key holding the ``.text`` of every
        result item, joined with newlines.
    """
    payload = await request.json()
    tool_name = payload.get("name", "")
    tool_args = payload.get("arguments", {})
    contents = await call_tool(tool_name, tool_args)
    # call_tool hands back a list of TextContent items; gather their text.
    texts = [item.text for item in contents]
    return {"text": "\n".join(texts)}


# health function · python · L740-L746 (7 LOC) server/mcp_server.py
async def health():
    """Report basic server status.

    Returns:
        A dict with a constant ``"status"`` of ``"ok"``, whether the
        module-level ``engine`` is set, how many entries ``classifiers``
        holds, and whether torch reports CUDA as available.
    """
    model_loaded = engine is not None
    n_classifiers = len(classifiers)
    gpu_available = torch.cuda.is_available()
    return {
        "status": "ok",
        "model_loaded": model_loaded,
        "n_classifiers": n_classifiers,
        "gpu_available": gpu_available,
    }


# load_classifiers function · python · L751-L774 (24 LOC) server/mcp_server.py
def load_classifiers(classifier_dir: str, device: str = "cpu"):
    """Load every pickled classifier found in *classifier_dir*.

    Each ``*.pkl`` file becomes one entry keyed by task name, where the task
    name is the file stem with ``__`` replaced by ``/``. Entries whose
    ``method`` is ``'tabpfn'`` have their ``model`` moved to *device* when
    *device* is not ``'cpu'``. A failed move is reported and the classifier
    is still kept as loaded. Files that cannot be loaded are reported and
    skipped.

    Args:
        classifier_dir: Directory containing the ``*.pkl`` files.
        device: Target device for TabPFN models.

    Returns:
        Dict mapping task name to the unpickled classifier data; empty when
        the directory does not exist.
    """
    clf_dir = Path(classifier_dir)
    loaded = {}
    if not clf_dir.exists():
        print(f"WARNING: classifier directory {clf_dir} not found")
        return loaded
    # Sorted so load order (and log output) is deterministic.
    for pkl_path in sorted(clf_dir.glob('*.pkl')):
        task = pkl_path.stem.replace('__', '/')
        try:
            # SECURITY: pickle.load can execute arbitrary code embedded in
            # the file. Only point this at classifier directories you trust.
            with open(pkl_path, 'rb') as f:
                clf_data = pickle.load(f)
            # Move TabPFN models to GPU for faster inference
            if clf_data.get('method') == 'tabpfn' and device != 'cpu':
                try:
                    clf_data['model'].to(device)
                except Exception as e:
                    # Best effort: keep the classifier where it was loaded,
                    # but say so instead of hiding the failure.
                    print(f"  WARNING: could not move {task} to {device}: {e}")
            loaded[task] = clf_data
        except Exception as e:
            print(f"  Failed to load {pkl_path.name}: {e}")
    print(f"Loaded {len(loaded)} classifiers")
    return loaded


# main function · python · L777-L814 (38 LOC) server/mcp_server.py
def main():
parser = argparse.ArgumentParser(description="NeoCog EEG MCP Server")
parser.add_argument("--checkpoint", required=True)
parser.add_argument("--classifier-dir", default="server/classifiers")
parser.add_argument("--feature-classifier-dir", default="server/feature_classifiers")
parser.add_argument("--port", type=int, default=8080)
parser.add_argument("--host", default="0.0.0.0")
parser.add_argument("--device", default="cuda")
parser.add_argument("--ssl-cert", default=None, help="Path to SSL certificate for HTTPS")
parser.add_argument("--ssl-key", default=None, help="Path to SSL private key for HTTPS")
args = parser.parse_args()
global engine, classifiers, feature_classifiers
print(f"Loading encoder from {args.checkpoint}...")
engine = EEGInferenceEngine(args.checkpoint, device=args.device)
print(f"Loading classifiers from {args.classifier_dir}...")
classifiers = load_classifiers(args.classifier_dir, device=args.de_robust_zscore function · python · L30-L38 (9 LOC)server/preprocess.py
def _robust_zscore(data: np.ndarray) -> np.ndarray:
"""Robust z-score normalization per channel (matching v5 pipeline)."""
for ch in range(data.shape[0]):
x = data[ch]
median = np.median(x)
mad = np.median(np.abs(x - median))
scale = max(mad * 1.4826, 1e-8) # MAD to pseudo-std
data[ch] = (x - median) / scale
return data_map_channels_to_coords function · python · L41-L56 (16 LOC)server/preprocess.py
def _map_channels_to_coords(ch_names):
    """Look up (x, y) scalp coordinates for a list of channel names.

    Each name is stripped of surrounding whitespace and looked up in
    ``EEGConfig.CHANNEL_COORDS``, first directly and then through
    ``CHANNEL_ALIASES``.

    Returns:
        ``(coords, mask)``: a float32 array of shape ``(len(ch_names), 2)``
        and a float32 array of length ``len(ch_names)``. Rows for names that
        could not be resolved stay zero in both arrays; resolved rows get
        their coordinates and a mask value of 1.0.
    """
    n_names = len(ch_names)
    coords = np.zeros((n_names, 2), dtype=np.float32)
    mask = np.zeros(n_names, dtype=np.float32)
    for idx, raw_name in enumerate(ch_names):
        key = raw_name.strip()
        if key not in EEGConfig.CHANNEL_COORDS:
            # No direct hit: fall back to the alias table, if it knows the name.
            key = CHANNEL_ALIASES.get(key, key)
            if key not in EEGConfig.CHANNEL_COORDS:
                continue
        coords[idx] = EEGConfig.CHANNEL_COORDS[key]
        mask[idx] = 1.0
    return coords, mask


# preprocess_edf function · python · L59-L112 (54 LOC) server/preprocess.py
def preprocess_edf(filepath: Path) -> Tuple[np.ndarray, np.ndarray, np.ndarray, Dict]:
"""Load and preprocess EDF/BDF file."""
import mne
raw = mne.io.read_raw(str(filepath), preload=True, verbose=False)
# Pick only EEG channels
eeg_picks = mne.pick_types(raw.info, eeg=True, exclude='bads')
if len(eeg_picks) == 0:
raise ValueError("No EEG channels found in file")
raw.pick(eeg_picks)
# Resample to 200 Hz
if raw.info['sfreq'] != TARGET_SRATE:
raw.resample(TARGET_SRATE, verbose=False)
# Get data and channel names
data = raw.get_data() # (n_channels, n_samples)
ch_names = raw.ch_names
# Check units — MNE returns SI (volts). Convert to microvolts.
# MNE FIFF unit code 107 = volts
for ch_info in raw.info['chs']:
if ch_info.get('unit', 0) == 107: # FIFF_UNIT_V
data *= 1e6 # V → µV
break
# Robust z-score
data = _robust_zscore(data)
# Map channels to coordinates
cMethodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
_detect_dataset_from_h5 function · python · L115-L146 (32 LOC)server/preprocess.py
def _detect_dataset_from_h5(filepath: Path, f) -> Tuple[str, str]:
"""Detect dataset name and recording state from H5 file attributes."""
# Check H5 attrs
dataset = 'unknown'
rec_state = 'resting eyes closed'
if 'metadata' in f and 'dataset' in f['metadata'].attrs:
dataset = f['metadata'].attrs['dataset']
elif 'dataset' in f.attrs:
dataset = f.attrs['dataset']
# Infer from filename/path
fname = filepath.name.lower()
path_str = str(filepath).lower()
if dataset == 'unknown':
for ds in ['AD_EEG', 'CAUEEG', 'DS004504', 'PEARL', 'TD_BRAIN',
'READTBI', 'DORTMUND', 'PARKINSON_UCSD', 'DEPRESSION', 'SRM', 'LEMON']:
if ds.lower() in path_str:
dataset = ds
break
# Infer recording state
if 'eyesopen' in fname or 'eyes_open' in fname or 'resteo' in fname:
rec_state = 'resting eyes open'
elif 'msit' in fname:
rec_state = 'MSIT task'
elif 'preprocess_h5 function · python · L149-L204 (56 LOC)server/preprocess.py
def preprocess_h5(filepath: Path) -> Tuple[np.ndarray, np.ndarray, np.ndarray, Dict]:
"""Load preprocessed H5 file (our internal format)."""
with h5py.File(filepath, 'r') as f:
segments = f['eeg/segments'][:]
ch_names = [x.decode() if isinstance(x, bytes) else x for x in f['eeg/channel_names'][:]]
file_coords = f['eeg/channel_coords'][:] if 'eeg/channel_coords' in f else None
file_mask = f['channel_mask'][:] if 'channel_mask' in f else None
dataset, rec_state = _detect_dataset_from_h5(filepath, f)
# Stitch segments
STRIDE = 12000
if segments.shape[0] == 1:
data = segments[0]
else:
parts = [segments[i, :, :STRIDE] for i in range(segments.shape[0] - 1)]
parts.append(segments[-1])
data = np.concatenate(parts, axis=1)
n_ch = data.shape[0]
# Channel coords
if file_coords is not None:
coords = np.zeros((MAX_CHANNELS, 2), dtype=np.float32)
coords[:min(n_ch, MAX_CHANNEpreprocess_eeg function · python · L207-L225 (19 LOC)server/preprocess.py
def preprocess_eeg(filepath: Path) -> Tuple[np.ndarray, np.ndarray, np.ndarray, Dict]:
    """
    Load and preprocess an EEG file, dispatching on its file extension.

    ``.edf``, ``.bdf`` and ``.set`` files go to ``preprocess_edf``; ``.h5``
    files go to ``preprocess_h5``. The extension check is case-insensitive.

    Returns:
        eeg_data: (max_channels, n_samples) float32
        channel_mask: (max_channels,) float32, 1=present 0=padding
        channel_coords: (max_channels, 2) float32, (x, y) scalp positions
        metadata: dict with recording info

    Raises:
        ValueError: if the extension is none of the above.
    """
    filepath = Path(filepath)
    ext = filepath.suffix.lower()
    if ext == '.h5':
        return preprocess_h5(filepath)
    if ext in ('.edf', '.bdf', '.set'):
        return preprocess_edf(filepath)
    raise ValueError(f"Unsupported format: {ext}. Supported: {SUPPORTED_EXTENSIONS}")


# _select_citations_for_results function · python · L215-L260 (46 LOC) server/response_generator.py
def _select_citations_for_results(results: List[dict]) -> List[dict]:
"""Select citations relevant to the actual screening results.
Prioritize citations for positive findings."""
seen_ids = set()
selected = []
# Collect citation keys, positive findings first
positive_keys = set()
all_keys = set()
for r in results:
if r['type'] != 'classification':
continue
cat = _get_task_category(r['task'])
cit_keys = _CATEGORY_CITATIONS.get(cat, [])
all_keys.update(cit_keys)
if r['prediction'] == 'POSITIVE':
positive_keys.update(cit_keys)
# Always include regression-related citations
for r in results:
if r['type'] == 'regression':
if 'age' in r['task']:
all_keys.add('brain_age')
if 'mmse' in r['task']:
all_keys.add('mmse')
if 'ravlt' in r['task']:
all_keys.add('cognitive')
# Add positive-finding citat_find_citation_num function · python · L263-L267 (5 LOC)server/response_generator.py
def _find_citation_num(cit_id: str, citations: List[dict]) -> Optional[int]:
for i, c in enumerate(citations, 1):
if c['id'] == cit_id:
return i
return None_citation_refs_for_category function · python · L270-L281 (12 LOC)server/response_generator.py
def _citation_refs_for_category(cat: str, citations: List[dict]) -> str:
    """Build the inline citation marker for a category.

    Collects the numbers (positions within *citations*) of every citation
    registered for *cat* and formats them, sorted and de-duplicated, as a
    string such as ``' [1,2]'`` (note the leading space). Returns ``''``
    when none of the category's citations appear in *citations*.
    """
    found = set()
    for key in _CATEGORY_CITATIONS.get(cat, []):
        for cit in CITATIONS.get(key, []):
            number = _find_citation_num(cit['id'], citations)
            if number is not None:
                found.add(number)
    if not found:
        return ''
    joined = ','.join(map(str, sorted(found)))
    return ' [' + joined + ']'


# generate_enriched_report function · python · L284-L472 (189 LOC) server/response_generator.py
def generate_enriched_report(
results: List[dict],
metadata: Optional[dict] = None,
) -> Tuple[str, str]:
"""
Generate an OpenEvidence-style clinical report with inline citations,
plus structured JSON data for visualization artifacts.
Args:
results: List of dicts from run_single_screen() — each has keys like
task, display_name, type, prediction, probability, confidence, auc, etc.
metadata: Optional recording metadata from preprocess_eeg().
Returns:
(narrative_markdown, json_string)
"""
citations = _select_citations_for_results(results)
cls_results = [r for r in results if r['type'] == 'classification']
reg_results = [r for r in results if r['type'] == 'regression']
positive = [r for r in cls_results if r['prediction'] == 'POSITIVE']
negative = [r for r in cls_results if r['prediction'] == 'NEGATIVE']
positive.sort(key=lambda r: r['probability'], reverse=True)
negative.sort(key=lambd_build_visualization_data function · python · L475-L521 (47 LOC)server/response_generator.py
def _build_visualization_data(results: List[dict], metadata: Optional[dict] = None) -> dict:
cls_data = []
reg_data = []
for r in results:
if r['type'] == 'classification':
cls_data.append({
'task': r['task'],
'display_name': r['display_name'],
'probability': round(r['probability'], 4),
'prediction': r['prediction'],
'confidence': r['confidence'],
'auc': round(r['auc'], 3),
'n_test': r['n_test'],
'category': _get_task_category(r['task']),
})
else:
cls_data_entry = {
'task': r['task'],
'display_name': r['display_name'],
'predicted_value': round(r['predicted_value'], 2),
'valid_range': [round(v, 1) for v in r['valid_range']],
'pearson_r': round(r['pearson_r'], 3),
'n_test': r['n_test'],
}
Repobility (the analyzer behind this table) · https://repobility.com
EEGConfig class · python · L5-L98 (94 LOC)utils/core.py
class EEGConfig:
"""Centralized configuration for all EEG processing"""
SAMPLING_RATE = 200
N_CHANNELS = 19 # Standard 10-20 system (all datasets padded to 19)
N_STANDARD_CHANNELS = 19 # Same as N_CHANNELS
# Updated for 2-minute segments with 160ms windows
SEGMENT_LENGTH_MS = 120000 # 2 minutes
SEGMENT_LENGTH_SAMPLES = 24000 # 120s @ 200Hz
WINDOW_SIZE_MS = 160 # Doubled from 80ms
WINDOW_SIZE_SAMPLES = 32 # 160ms @ 200Hz
TOKENS_PER_WINDOW = 4 # Will be mean-pooled to 1
STANDARD_CHANNELS = [
'Fp1', 'Fp2', 'F3', 'F4', 'C3', 'C4',
'P3', 'P4', 'O1', 'O2', 'F7', 'F8',
'T3', 'T4', 'T5', 'T6', 'Fz', 'Cz', 'Pz'
]
FILTER_LOW = 0.5
FILTER_HIGH = 70
# 2D coordinates for each channel on the scalp (10-20 system)
# Normalized to approximately [-1, 1] range
# y-axis: front (+) to back (-)
# x-axis: left (-) to right (+)
CHANNEL_COORDS = {
# === Standard 10-20 (19 channels) ===
# get_dataset_config function · python · L100-L218 (119 LOC)utils/core.py
def get_dataset_config(dataset_name: str) -> Dict:
"""Single source of truth for dataset configurations
Recording condition metadata for model conditioning:
- power_line_freq: 50 (Europe/Asia) or 60 (Americas)
- reference: Original reference scheme before re-referencing
- recording_condition: Default recording condition for this dataset
"""
configs = {
'AD_EEG': {
'original_sr': 500,
'format': 'eeglab',
'notch': [50, 100],
'reference': 'A1-A2', # Mastoid reference
'power_line_freq': 50, # European dataset
'recording_condition': 'eyes_closed'
},
'PEARL': {
'original_sr': 1000,
'format': 'brainvision',
'notch': [60, 120],
'reference': 'FCz',
'power_line_freq': 60, # Polish dataset uses 50Hz but recorded with US equipment
'recording_condition': 'task' # Various cognitive tasks
},
get_channel_mapping function · python · L220-L285 (66 LOC)utils/core.py
def get_channel_mapping(dataset_name: str) -> Dict[str, str]:
"""Get channel mapping for each dataset to standard 19 channels"""
mappings = {
'AD_EEG': {
'Fp1': 'Fp1', 'Fp2': 'Fp2', 'F3': 'F3', 'F4': 'F4',
'C3': 'C3', 'C4': 'C4', 'P3': 'P3', 'P4': 'P4',
'O1': 'O1', 'O2': 'O2', 'F7': 'F7', 'F8': 'F8',
'T3': 'T3', 'T4': 'T4', 'T5': 'T5', 'T6': 'T6',
'Fz': 'Fz', 'Cz': 'Cz', 'Pz': 'Pz'
},
'PEARL': { # PEARL uses modern naming (T7/T8/P7/P8)
'Fp1': 'Fp1', 'Fp2': 'Fp2', 'F3': 'F3', 'F4': 'F4',
'C3': 'C3', 'C4': 'C4', 'P3': 'P3', 'P4': 'P4',
'O1': 'O1', 'O2': 'O2', 'F7': 'F7', 'F8': 'F8',
'T3': 'T7', 'T4': 'T8', 'T5': 'P7', 'T6': 'P8',
'Fz': 'Fz', 'Cz': 'Cz', 'Pz': 'Pz'
},
'TD_BRAIN': { # Modern naming convention
'Fp1': 'Fp1', 'Fp2': 'Fp2', 'F3': 'F3', 'F4': 'F4',
'C3': 'C3', 'C4': 'C4', 'P3': 'P3', 'P4get_dataset_channels function · python · L288-L293 (6 LOC)utils/core.py
def get_dataset_channels(dataset_name: str) -> List[str]:
    """Return the channel list configured for *dataset_name*.

    Uses the ``'channels'`` entry of the dataset's configuration when there
    is one (it may list fewer than 19 channels); otherwise falls back to
    ``EEGConfig.STANDARD_CHANNELS``, the full 19-channel set.
    """
    dataset_cfg = get_dataset_config(dataset_name)
    return dataset_cfg.get('channels', EEGConfig.STANDARD_CHANNELS)


# create_channel_mask function · python · L296-L313 (18 LOC) utils/core.py
def create_channel_mask(dataset_channels: List[str], all_channels: List[str] = None) -> np.ndarray:
    """Create binary mask indicating which channels are present.

    Args:
        dataset_channels: List of channel names present in the dataset
        all_channels: Full list of possible channels (defaults to EXTENDED_CHANNELS)

    Returns:
        Binary float32 mask array with one entry per name in *all_channels*,
        where 1 = channel present, 0 = channel absent. Names in
        *dataset_channels* that do not occur in *all_channels* are ignored.
    """
    if all_channels is None:
        all_channels = EEGConfig.EXTENDED_CHANNELS
    # Build the name -> position table once rather than scanning
    # all_channels for every dataset channel. setdefault keeps the first
    # position of a repeated name, matching list.index().
    position_of = {}
    for pos, name in enumerate(all_channels):
        position_of.setdefault(name, pos)
    mask = np.zeros(len(all_channels), dtype=np.float32)
    for ch in dataset_channels:
        pos = position_of.get(ch)
        if pos is not None:
            mask[pos] = 1.0
    return mask