Function bodies 108 total
get_char_and_color function · python · L66-L86 (21 LOC)assets/gen_brain.py
def get_char_and_color(brightness):
if brightness < 20:
return (' ', '')
elif brightness < 50:
c = random.choice(['.', ',', ':', ';'])
return (c, DIM)
elif brightness < 90:
c = random.choice(['-', '~', '=', '+', '*'])
return (c, random.choice([DIM, BLUE]))
elif brightness < 130:
c = random.choice(slash_chars + operator_chars)
return (c, random.choice([BLUE, ORANGE, PURPLE]))
elif brightness < 170:
c = random.choice(bracket_chars + brace_chars + ['*', '#'])
return (c, random.choice([GREEN, AMBER, PURPLE]))
elif brightness < 210:
c = random.choice(brace_chars + bracket_chars + operator_chars + ['#', '@'])
return (c, random.choice([GREEN, AMBER, WHITE, BLUE]))
else:
c = random.choice(special_chars + brace_chars + bracket_chars)
return (c, random.choice([GREEN, AMBER, WHITE, ORANGE]))loadEntitySummary function · javascript · L57-L62 (6 LOC)scripts/generate-context.js
function loadEntitySummary(maxLines = 60) {
const entityIndexPath = path.join(brainPath, 'Entities', 'People', 'ENTITY-INDEX.md');
if (!fs.existsSync(entityIndexPath)) return '';
const content = fs.readFileSync(entityIndexPath, 'utf8');
return content.split('\n').slice(0, maxLines).join('\n');
}loadActionItems function · javascript · L64-L71 (8 LOC)scripts/generate-context.js
function loadActionItems(max = 10) {
const todosPath = path.join(brainPath, 'Operations', 'todos', 'aggregated-todos.json');
if (!fs.existsSync(todosPath)) return [];
try {
const todos = JSON.parse(fs.readFileSync(todosPath, 'utf8'));
return (todos.items || []).filter(t => !t.completed).slice(0, max);
} catch(e) { return []; }
}loadProjects function · javascript · L73-L96 (24 LOC)scripts/generate-context.js
function loadProjects() {
const projects = [];
const projectsDir = path.join(brainPath, 'Projects');
if (!fs.existsSync(projectsDir)) return projects;
try {
const entries = fs.readdirSync(projectsDir, { withFileTypes: true });
for (const entry of entries) {
if (entry.isDirectory() && !entry.name.startsWith('_') && !entry.name.startsWith('.')) {
const metaPath = path.join(projectsDir, entry.name, '_meta.json');
if (fs.existsSync(metaPath)) {
try {
const meta = JSON.parse(fs.readFileSync(metaPath, 'utf8'));
projects.push({ name: entry.name, ...meta });
} catch(e) {
projects.push({ name: entry.name, status: 'unknown' });
}
} else {
projects.push({ name: entry.name, status: 'unknown' });
}
}
}
} catch(e) {}
return projects;
}buildCommandsTable function · javascript · L98-L130 (33 LOC)scripts/generate-context.js
function buildCommandsTable(zone) {
const cmds = [
['`/brain`', 'Brain status, scan, configure, profiles'],
['`/dashboard`', 'Visual dashboard — setup checklist, integrations, quick launch'],
['`/knowledge <topic>`', 'Load specific brain knowledge'],
['`/todo`', 'View and manage tasks'],
];
if (features?.entity_management) {
cmds.push(['`/entity <name>`', 'Look up or add an entity']);
}
if (features?.gsd_workflow) {
cmds.push(['`/gsd`', 'Project management workflow']);
}
if (features?.time_tracking) {
cmds.push(['`/hours`', 'Time tracking summary']);
}
if (features?.content_pipeline) {
cmds.push(['`/content`', 'Content pipeline management']);
}
if (features?.communications) {
cmds.push(['`/comms`', 'Unified communications']);
}
if (features?.intake_processing) {
cmds.push(['`/intake`', 'Process intake files']);
}
if (features?.outreach_engine) {
cmds.push(['`/outreach`', 'Lead pipeline management']);
}
if (fegenerateBrainContext function · javascript · L136-L269 (134 LOC)scripts/generate-context.js
function generateBrainContext() {
// Full brain context — maximum detail
const lines = [];
const projects = loadProjects();
const actionItems = loadActionItems(10);
const entitySummary = loadEntitySummary(60);
lines.push(`# ${profile.businessName || 'My'} Brain — BizBrain OS`);
lines.push('');
lines.push(`> Owner: ${profile.userName || 'Unknown'}`);
if (profile.businessType) lines.push(`> Type: ${profile.businessType}`);
if (profile.industry) lines.push(`> Industry: ${profile.industry}`);
lines.push(`> Brain: \`${brainPath}\``);
if (mode === 'full') {
lines.push(`> Mode: Full (three-zone) | Zone: brain`);
}
lines.push('');
// Active features
const activeFeatures = Object.entries(features || {})
.filter(([k, v]) => v)
.map(([k]) => k.replace(/_/g, ' '));
if (activeFeatures.length > 0) {
lines.push('## Active Features');
lines.push(activeFeatures.map(f => `- ${f}`).join('\n'));
lines.push('');
}
// Commands
const cmds = bgenerateWorkspacesContext function · javascript · L271-L341 (71 LOC)scripts/generate-context.js
function generateWorkspacesContext() {
// Lean context for code development — just what developers need
const lines = [];
const projects = loadProjects();
lines.push(`# BizBrain OS — Workspaces`);
lines.push('');
lines.push(`> Owner: ${profile.userName || 'Unknown'} | ${profile.businessName || ''}`);
lines.push(`> Brain: \`${brainPath}\` (use \`/knowledge\` to load brain data)`);
lines.push('');
// Compact commands
lines.push('## Commands');
lines.push('| Command | Description |');
lines.push('|---------|-------------|');
lines.push('| `/knowledge <topic>` | Load brain knowledge |');
lines.push('| `/todo` | View and manage tasks |');
lines.push('| `/todo add <task>` | Add task (auto-routes to project) |');
lines.push('| `/gsd` | Project management workflow |');
lines.push('| `/hours` | Time tracking |');
lines.push('| `/entity <name>` | Look up entity |');
lines.push('| `/dashboard` | Visual dashboard in browser |');
lines.push('| `/brain statuSource: Repobility analyzer · https://repobility.com
generateLaunchpadContext function · javascript · L343-L486 (144 LOC)scripts/generate-context.js
function generateLaunchpadContext() {
// Medium context — optimized starting point for all Claude Code sessions
const lines = [];
const projects = loadProjects();
const actionItems = loadActionItems(10);
const entitySummary = loadEntitySummary(40);
lines.push(`# ${profile.businessName || 'My'} Brain — Launchpad`);
lines.push('');
lines.push(`> Owner: ${profile.userName || 'Unknown'}`);
if (profile.businessType) lines.push(`> Type: ${profile.businessType}`);
lines.push(`> Brain: \`${brainPath}\``);
lines.push('');
lines.push('**This is the launchpad.** Start all Claude Code sessions here for optimized context with auto-capture.');
lines.push('All sessions are automatically captured to the brain for future reference.');
lines.push('');
// All commands available
const cmds = buildCommandsTable('launchpad');
lines.push('## Commands');
lines.push('| Command | Description |');
lines.push('|---------|-------------|');
cmds.forEach(([cmd, desc]) => ligenerateExternalContext function · javascript · L488-L522 (35 LOC)scripts/generate-context.js
function generateExternalContext() {
// Minimal context for sessions outside the brain entirely
const lines = [];
lines.push(`# BizBrain OS`);
lines.push('');
lines.push(`> Brain: \`${brainPath}\``);
if (mode === 'full' && rootPath) {
lines.push(`> Workspaces: \`${rootPath}/workspaces\``);
lines.push(`> Launchpad: \`${rootPath}/launchpad\``);
}
lines.push('');
// Key commands
lines.push('## Commands');
lines.push('| Command | Description |');
lines.push('|---------|-------------|');
lines.push('| `/brain status` | Brain dashboard |');
lines.push('| `/dashboard` | Visual dashboard in browser |');
lines.push('| `/knowledge <topic>` | Load brain knowledge |');
lines.push('| `/todo` | View and manage tasks |');
lines.push('| `/entity <name>` | Look up entity |');
lines.push('| `/hours` | Time tracking |');
lines.push('| `/gsd` | Project management |');
lines.push('');
// Active auto-behaviors (just mention they're on)
lines.push('## AuloadOrchestratorBrief function · javascript · L528-L573 (46 LOC)scripts/generate-context.js
function loadOrchestratorBrief() {
const brief = { events: 0, pending: 0, conflicts: 0, changelog: [] };
// Count queued events
const eventsDir = path.join(brainPath, '.bizbrain', 'events');
if (fs.existsSync(eventsDir)) {
try {
brief.events = fs.readdirSync(eventsDir)
.filter(f => f.endsWith('.json')).length;
} catch(e) {}
}
// Count pending staged proposals
const pendingDir = path.join(brainPath, '.bizbrain', 'staging', 'pending');
if (fs.existsSync(pendingDir)) {
try {
brief.pending = fs.readdirSync(pendingDir)
.filter(f => f.endsWith('.json')).length;
} catch(e) {}
}
// Count conflicts
const conflictsDir = path.join(brainPath, '.bizbrain', 'staging', 'conflicts');
if (fs.existsSync(conflictsDir)) {
try {
brief.conflicts = fs.readdirSync(conflictsDir)
.filter(f => f.endsWith('.json')).length;
} catch(e) {}
}
// Recent changelog entries (today)
const today = new Date().toISOString().spappendContinuousLearning function · javascript · L579-L835 (257 LOC)scripts/generate-context.js
function appendContinuousLearning(lines) {
const orchestrationEnabled = features?.orchestration === true;
if (orchestrationEnabled) {
// Orchestrator mode — Brain Swarm active, make it VERY visible
const brief = loadOrchestratorBrief();
// ── Swarm Header ──────────────────────────────────────────────
lines.push('## BRAIN SWARM — ACTIVE');
lines.push('');
lines.push('```');
lines.push('╔══════════════════════════════════════════════════════════════╗');
lines.push('║ BRAIN SWARM CONTROL ║');
lines.push('╠══════════════════════════════════════════════════════════════╣');
lines.push('║ ║');
lines.push(`║ EVENT QUEUE ···· ${String(brief.events).padStart(3)} waiting STAGED ········ ${String(brief.pending).padStart(3)} pending ║`);
lines.push(`║ CONFLICTS ····· ${String(brief.conflicts).padStart(3)} flagged CHANGELOG ····· ${briefgenerateImage function · javascript · L53-L122 (70 LOC)tools/dashboard/generate-icons.js
async function generateImage(item) {
const outPath = path.join(OUT_DIR, `${item.id}.png`);
if (fs.existsSync(outPath)) {
console.log(`SKIP ${item.id} (exists)`);
return;
}
return new Promise((resolve, reject) => {
const payload = JSON.stringify({
model: 'gpt-image-1',
prompt: item.prompt,
size: '1024x1024',
quality: 'medium',
});
const req = https.request({
hostname: 'api.openai.com',
path: '/v1/images/generations',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${API_KEY}`,
'Content-Length': Buffer.byteLength(payload),
},
}, (res) => {
let body = '';
res.on('data', chunk => body += chunk);
res.on('end', () => {
try {
const data = JSON.parse(body);
if (data.error) {
console.log(`FAIL ${item.id}: ${data.error.message}`);
reject(data.error);
return;
main function · javascript · L124-L140 (17 LOC)tools/dashboard/generate-icons.js
async function main() {
console.log(`Generating ${ICONS.length} icons...`);
console.log(`Output: ${OUT_DIR}\n`);
// Generate 3 at a time to stay within rate limits
for (let i = 0; i < ICONS.length; i += 3) {
const batch = ICONS.slice(i, i + 3);
await Promise.all(batch.map(item => generateImage(item).catch(() => {})));
if (i + 3 < ICONS.length) {
await new Promise(r => setTimeout(r, 1000)); // Brief pause between batches
}
}
console.log('\nDone!');
const files = fs.readdirSync(OUT_DIR).filter(f => f.endsWith('.png'));
console.log(`Generated: ${files.length}/${ICONS.length} icons`);
}getItemsByCategory function · javascript · L1020-L1022 (3 LOC)tools/dashboard/public/js/data.js
function getItemsByCategory(category) {
return CHECKLIST_ITEMS.filter(item => item.category === category);
}getItemById function · javascript · L1025-L1027 (3 LOC)tools/dashboard/public/js/data.js
function getItemById(id) {
return CHECKLIST_ITEMS.find(item => item.id === id);
}Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
getCategoryColor function · javascript · L1030-L1032 (3 LOC)tools/dashboard/public/js/data.js
function getCategoryColor(categoryId) {
return CATEGORIES[categoryId]?.color || '#888';
}getCategoryCompletion function · javascript · L1035-L1039 (5 LOC)tools/dashboard/public/js/data.js
function getCategoryCompletion(categoryId, progress) {
const items = getItemsByCategory(categoryId);
const completed = items.filter(item => progress[item.id] === 'completed').length;
return { completed, total: items.length };
}getOverallCompletion function · javascript · L1042-L1045 (4 LOC)tools/dashboard/public/js/data.js
function getOverallCompletion(progress) {
const completed = CHECKLIST_ITEMS.filter(item => progress[item.id] === 'completed').length;
return { completed, total: CHECKLIST_ITEMS.length };
}getNextRecommended function · javascript · L1048-L1061 (14 LOC)tools/dashboard/public/js/data.js
function getNextRecommended(progress) {
// Priority order: critical > high > medium > low
const priorityOrder = ['critical', 'high', 'medium', 'low'];
for (const priority of priorityOrder) {
const items = CHECKLIST_ITEMS.filter(item => {
if (progress[item.id] === 'completed') return false;
if (item.priority !== priority) return false;
// Check prerequisites
return item.prerequisites.every(prereq => progress[prereq] === 'completed');
});
if (items.length > 0) return items[0];
}
return null;
}findBrainPath function · javascript · L15-L44 (30 LOC)tools/dashboard/server.js
function findBrainPath() {
if (process.env.BIZBRAIN_PATH && fs.existsSync(process.env.BIZBRAIN_PATH)) {
return process.env.BIZBRAIN_PATH;
}
const home = os.homedir();
const roots = [
path.join(home, 'bizbrain-os'),
path.join(home, 'Documents', 'bizbrain-os'),
];
for (const root of roots) {
if (!fs.existsSync(root)) continue;
// Check for full mode (three-zone architecture)
const rootMarker = path.join(root, '.bizbrain-root.json');
if (fs.existsSync(rootMarker)) {
try {
const marker = JSON.parse(fs.readFileSync(rootMarker, 'utf8'));
if (marker.mode === 'full') {
const brainDir = path.join(root, marker.brainDir || 'brain');
if (fs.existsSync(brainDir)) return brainDir;
}
} catch { /* fall through */ }
}
// Check for brain/ subdir with config (full mode without marker)
const brainSubdir = path.join(root, 'brain');
if (fs.existsSync(path.join(brainSubdir, 'config.json'))) return findBrainRoot function · javascript · L46-L53 (8 LOC)tools/dashboard/server.js
function findBrainRoot() {
const brainPath = findBrainPath();
if (!brainPath) return null;
// In full mode, root is parent of brain/
const parent = path.dirname(brainPath);
if (fs.existsSync(path.join(parent, '.bizbrain-root.json'))) return parent;
return brainPath;
}getBrainStats function · javascript · L55-L74 (20 LOC)tools/dashboard/server.js
function getBrainStats(brainPath) {
if (!brainPath) return null;
const stats = { entities: 0, projects: 0, knowledge: 0, integrations: 0, sessions: 0 };
try {
const entitiesDir = path.join(brainPath, 'Entities');
if (fs.existsSync(entitiesDir)) {
for (const type of ['Clients', 'Partners', 'Vendors', 'People']) {
const dir = path.join(entitiesDir, type);
if (fs.existsSync(dir)) stats.entities += fs.readdirSync(dir).filter(f => !f.startsWith('.')).length;
}
}
const projectsDir = path.join(brainPath, 'Projects');
if (fs.existsSync(projectsDir)) stats.projects = fs.readdirSync(projectsDir).filter(f => !f.startsWith('.')).length;
const knowledgeDir = path.join(brainPath, 'Knowledge');
if (fs.existsSync(knowledgeDir)) stats.knowledge = countFiles(knowledgeDir);
const credsDir = path.join(brainPath, 'Operations', 'credentials', 'vault');
if (fs.existsSync(credsDir)) stats.integrations = fs.readdirSync(credsDir).filter(f => countFiles function · javascript · L76-L85 (10 LOC)tools/dashboard/server.js
function countFiles(dir) {
let count = 0;
try {
for (const entry of fs.readdirSync(dir, { withFileTypes: true })) {
if (entry.isDirectory()) count += countFiles(path.join(dir, entry.name));
else if (!entry.name.startsWith('.')) count++;
}
} catch (e) { /* ignore */ }
return count;
}All rows scored by the Repobility analyzer (https://repobility.com)
getProgressPath function · javascript · L87-L93 (7 LOC)tools/dashboard/server.js
function getProgressPath(brainPath) {
// Store progress at root level .bizbrain/ (works for both compact and full mode)
const root = findBrainRoot() || brainPath;
const dashDir = path.join(root, '.bizbrain', 'dashboard');
if (!fs.existsSync(dashDir)) fs.mkdirSync(dashDir, { recursive: true });
return path.join(dashDir, 'progress.json');
}readProgress function · javascript · L95-L100 (6 LOC)tools/dashboard/server.js
function readProgress(brainPath) {
if (!brainPath) return {};
const p = getProgressPath(brainPath);
if (!fs.existsSync(p)) return {};
try { return JSON.parse(fs.readFileSync(p, 'utf8')); } catch { return {}; }
}writeProgress function · javascript · L102-L105 (4 LOC)tools/dashboard/server.js
function writeProgress(brainPath, data) {
if (!brainPath) return;
fs.writeFileSync(getProgressPath(brainPath), JSON.stringify(data, null, 2));
}BrainUpdater class · python · L16-L233 (218 LOC)tools/meeting-transcriber/meeting_transcriber/brain_updater.py
class BrainUpdater:
"""Updates BB1 entity records and todos after meeting transcription.
Immediate (keyword-based, no LLM):
- update_entity_histories() — appends meeting entry to entity history files
Deferred (called by skill after AI processing):
- write_action_items() — routes extracted action items to entity/todo files
"""
def __init__(self, brain_path: Path):
self.brain_path = brain_path
self._entity_map: dict[str, dict] | None = None
def _load_entity_map(self) -> dict[str, dict]:
"""Parse ENTITY-INDEX.md into keyword → entity info lookup.
Returns dict mapping lowercase keywords to:
{"name": str, "type": str, "folder": str}
"""
if self._entity_map is not None:
return self._entity_map
self._entity_map = {}
index_path = self.brain_path / "Operations" / "entity-watchdog" / "ENTITY-INDEX.md"
if not index_path.exists():
return self._en__init__ method · python · L26-L28 (3 LOC)tools/meeting-transcriber/meeting_transcriber/brain_updater.py
def __init__(self, brain_path: Path):
self.brain_path = brain_path
self._entity_map: dict[str, dict] | None = None_load_entity_map method · python · L30-L84 (55 LOC)tools/meeting-transcriber/meeting_transcriber/brain_updater.py
def _load_entity_map(self) -> dict[str, dict]:
"""Parse ENTITY-INDEX.md into keyword → entity info lookup.
Returns dict mapping lowercase keywords to:
{"name": str, "type": str, "folder": str}
"""
if self._entity_map is not None:
return self._entity_map
self._entity_map = {}
index_path = self.brain_path / "Operations" / "entity-watchdog" / "ENTITY-INDEX.md"
if not index_path.exists():
return self._entity_map
try:
content = index_path.read_text(encoding="utf-8")
except Exception:
return self._entity_map
for line in content.splitlines():
if not line.startswith("|") or "---" in line:
continue
parts = [p.strip() for p in line.split("|")]
if len(parts) < 4:
continue
name = parts[1]
entity_type = parts[2].lower() if len(parts) > 2 else ""
alia_detect_entity_slugs method · python · L86-L105 (20 LOC)tools/meeting-transcriber/meeting_transcriber/brain_updater.py
def _detect_entity_slugs(
self,
segments: list[TranscriptSegment | SpeakerSegment],
) -> list[dict]:
"""Find entities mentioned in transcript segments.
Returns list of entity info dicts for matched entities.
"""
entity_map = self._load_entity_map()
if not entity_map:
return []
all_text = " ".join(seg.text for seg in segments).lower()
found: dict[str, dict] = {} # entity name → info (deduplicated)
for keyword, info in entity_map.items():
if re.search(r"\b" + re.escape(keyword) + r"\b", all_text):
found[info["name"]] = info
return list(found.values())update_entity_histories method · python · L107-L170 (64 LOC)tools/meeting-transcriber/meeting_transcriber/brain_updater.py
def update_entity_histories(
self,
meeting: MeetingInfo,
segments: list[TranscriptSegment | SpeakerSegment],
) -> list[str]:
"""Append meeting reference to each detected entity's history file.
This is keyword-based (no LLM needed) and runs immediately after transcription.
Returns list of entity names that were updated.
"""
detected = self._detect_entity_slugs(segments)
if not detected:
return []
date_str = meeting.started_at.strftime("%Y-%m-%d")
time_str = meeting.started_at.strftime("%H:%M")
duration = f"{meeting.duration_minutes:.0f}" if meeting.ended_at else "ongoing"
transcript_ref = f"Operations/meetings/transcripts/{date_str}-{meeting.slug}.md"
entry = (
f"\n### {date_str} — Meeting ({meeting.platform})\n"
f"- **Time:** {time_str}\n"
f"- **Duration:** {duration} min\n"
f"- **Title:** {meeting.title}\If a scraper extracted this row, it came from Repobility (https://repobility.com)
write_action_items method · python · L172-L233 (62 LOC)tools/meeting-transcriber/meeting_transcriber/brain_updater.py
def write_action_items(
self,
meeting: MeetingInfo,
action_items: list[dict],
) -> int:
"""Route AI-extracted action items to entity files or operational todos.
Called by the skill after Claude processes the transcript and extracts items.
Each action_item dict should have:
- "text": str — the action item description
- "owner": str | None — assignee name (matched against entities)
- "entity": str | None — related entity name
Returns count of items written.
"""
entity_map = self._load_entity_map()
date_str = meeting.started_at.strftime("%Y-%m-%d")
written = 0
for item in action_items:
text = item.get("text", "")
entity_name = item.get("entity")
target_path = None
# Try to route to entity action-items file
if entity_name and entity_name.lower() in entity_map:
info = entitfind_brain_path function · python · L12-L26 (15 LOC)tools/meeting-transcriber/meeting_transcriber/cli.py
def find_brain_path() -> Path | None:
"""Locate the brain folder."""
# BIZBRAIN_PATH env
env_path = os.environ.get("BIZBRAIN_PATH")
if env_path:
p = Path(env_path)
if p.exists():
return p
# ~/bizbrain-os/
home = Path.home() / "bizbrain-os"
if home.exists():
return home
return Nonecmd_daemon function · python · L29-L80 (52 LOC)tools/meeting-transcriber/meeting_transcriber/cli.py
def cmd_daemon(args: list[str]) -> None:
"""Start the meeting transcription daemon."""
brain_path = find_brain_path()
if not brain_path:
print("Error: No brain folder found. Set BIZBRAIN_PATH or run /brain setup.")
sys.exit(1)
model = "base"
language = None
diarize = False
hf_token = os.environ.get("HF_TOKEN")
audio_retention_days = None # Keep forever by default
# Parse flags
i = 0
while i < len(args):
if args[i] in ("--model", "-m") and i + 1 < len(args):
model = args[i + 1]
i += 2
elif args[i] in ("--language", "-l") and i + 1 < len(args):
language = args[i + 1]
i += 2
elif args[i] == "--diarize":
diarize = True
i += 1
elif args[i] == "--hf-token" and i + 1 < len(args):
hf_token = args[i + 1]
i += 2
elif args[i] == "--keep-audio":
audio_retention_days = None
i += cmd_transcribe function · python · L83-L109 (27 LOC)tools/meeting-transcriber/meeting_transcriber/cli.py
def cmd_transcribe(args: list[str]) -> None:
"""Transcribe a specific audio file."""
if not args:
print("Usage: bizbrain-meetings transcribe <audio-file> [--model base]")
sys.exit(1)
audio_path = Path(args[0])
if not audio_path.exists():
print(f"Error: File not found: {audio_path}")
sys.exit(1)
model = "base"
for i, arg in enumerate(args[1:], 1):
if arg in ("--model", "-m") and i + 1 < len(args):
model = args[i + 1]
from .transcriber import WhisperTranscriber
transcriber = WhisperTranscriber(model_size=model)
segments = transcriber.transcribe(audio_path)
for seg in segments:
h = int(seg.start // 3600)
m = int((seg.start % 3600) // 60)
s = int(seg.start % 60)
ts = f"{h:02d}:{m:02d}:{s:02d}" if h else f"{m:02d}:{s:02d}"
print(f"[{ts}] {seg.text}")cmd_status function · python · L112-L137 (26 LOC)tools/meeting-transcriber/meeting_transcriber/cli.py
def cmd_status(args: list[str]) -> None:
"""Show daemon status."""
brain_path = find_brain_path()
if not brain_path:
print("No brain folder found.")
return
status_file = brain_path / ".bizbrain" / "meeting-daemon-status.json"
if not status_file.exists():
print("Meeting daemon has not been run yet.")
return
data = json.loads(status_file.read_text())
print(f"Running: {data.get('running', False)}")
print(f"PID: {data.get('pid', 'N/A')}")
print(f"Meeting active: {data.get('meeting_active', False)}")
print(f"Chunks recorded: {data.get('chunks_recorded', 0)}")
print(f"Chunks transcribed: {data.get('chunks_transcribed', 0)}")
print(f"Last check: {data.get('last_check', 'N/A')}")
if data.get("current_meeting"):
m = data["current_meeting"]
print(f"\nCurrent meeting:")
print(f" Platform: {m.get('platform')}")
print(f" Title: {m.get('title')}")
print(f" Started: {m.getcmd_stop function · python · L140-L162 (23 LOC)tools/meeting-transcriber/meeting_transcriber/cli.py
def cmd_stop(args: list[str]) -> None:
"""Stop the running daemon."""
brain_path = find_brain_path()
if not brain_path:
print("No brain folder found.")
return
pid_file = brain_path / ".bizbrain" / "meeting-daemon.pid"
if not pid_file.exists():
print("No daemon PID file found.")
return
try:
pid = int(pid_file.read_text().strip())
import psutil
proc = psutil.Process(pid)
proc.terminate()
proc.wait(timeout=10)
print(f"Daemon (PID {pid}) stopped.")
except Exception as e:
print(f"Error stopping daemon: {e}")
if pid_file.exists():
pid_file.unlink()cmd_install function · python · L165-L270 (106 LOC)tools/meeting-transcriber/meeting_transcriber/cli.py
def cmd_install(args: list[str]) -> None:
"""Auto-install the meeting transcriber package with platform deps."""
current_platform = platform.system()
print(f"BizBrain Meetings — Auto-Install ({current_platform})\n")
# 1. Find the package directory via __file__
package_dir = Path(__file__).resolve().parent.parent
pyproject = package_dir / "pyproject.toml"
if not pyproject.exists():
print(f"Error: Could not find pyproject.toml at {package_dir}")
print("Expected the package directory to be the parent of meeting_transcriber/")
sys.exit(1)
print(f"Package: {package_dir}")
# 2. Determine platform extras
if current_platform == "Windows":
extras = "windows"
elif current_platform == "Darwin":
extras = "macos"
else:
extras = ""
# 3. Find installer (uv preferred, pip fallback)
import shutil
uv_path = shutil.which("uv")
pip_path = shutil.which("pip") or shutil.which("pip3")
if cmd_setup function · python · L273-L331 (59 LOC)tools/meeting-transcriber/meeting_transcriber/cli.py
def cmd_setup(args: list[str]) -> None:
"""Check prerequisites and show platform-specific setup instructions."""
current_platform = platform.system()
print(f"BizBrain Meetings — Setup Check ({current_platform})\n")
# Check brain
brain = find_brain_path()
print(f"Brain folder: {brain or 'NOT FOUND'}")
# Core dependencies (cross-platform)
print("\nCore dependencies:")
core_deps = {
"faster_whisper": "faster-whisper",
"sounddevice": "sounddevice",
"numpy": "numpy",
"psutil": "psutil",
}
missing = []
for module, package in core_deps.items():
try:
__import__(module)
print(f" {package}: installed")
except ImportError:
print(f" {package}: MISSING")
missing.append(package)
# Platform-specific dependencies
print(f"\nPlatform ({current_platform}):")
if current_platform == "Windows":
_check_windows_deps(missing)
elif current_Source: Repobility analyzer · https://repobility.com
_check_windows_deps function · python · L334-L343 (10 LOC)tools/meeting-transcriber/meeting_transcriber/cli.py
def _check_windows_deps(missing: list[str]) -> None:
"""Check Windows-specific dependencies."""
try:
import pyaudiowpatch
print(" pyaudiowpatch (WASAPI): installed")
except ImportError:
print(" pyaudiowpatch (WASAPI): MISSING")
missing.append("pyaudiowpatch")
print(" Audio setup: No additional setup needed (WASAPI built into Windows)")_check_macos_deps function · python · L346-L404 (59 LOC)tools/meeting-transcriber/meeting_transcriber/cli.py
def _check_macos_deps(missing: list[str]) -> None:
"""Check macOS-specific dependencies and audio configuration."""
# Check pyobjc-framework-Quartz (optional but recommended)
try:
from Quartz import CGWindowListCopyWindowInfo
print(" pyobjc-framework-Quartz: installed (window title detection)")
except ImportError:
print(" pyobjc-framework-Quartz: not installed (optional, improves meeting detection)")
# Check BlackHole virtual audio device
print("\n Audio setup (BlackHole):")
blackhole_found = False
try:
import sounddevice as sd
devices = sd.query_devices()
for dev in devices:
if "blackhole" in dev["name"].lower() and dev["max_input_channels"] > 0:
blackhole_found = True
print(f" BlackHole device: FOUND — {dev['name']}")
break
except Exception:
pass
if not blackhole_found:
print(" BlackHole device: NOT FOUND")
main function · python · L417-L441 (25 LOC)tools/meeting-transcriber/meeting_transcriber/cli.py
def main() -> None:
if len(sys.argv) < 2 or sys.argv[1] in ("-h", "--help"):
print("Usage: bizbrain-meetings <command> [args]")
print("\nCommands:")
print(" daemon Start the meeting transcription daemon")
print(" transcribe Transcribe a specific audio file")
print(" status Show daemon status")
print(" stop Stop the running daemon")
print(" setup Check prerequisites and show setup info")
print(" install Auto-install package with platform dependencies")
print("\nDaemon flags:")
print(" --model tiny|base|small|medium|large-v3 Whisper model (default: base)")
print(" --language en Force language (default: auto)")
print(" --diarize Enable speaker diarization")
print(" --keep-audio Keep recordings forever (default)")
print(" --delete-audio-after N MeetingDaemon class · python · L21-L274 (254 LOC)tools/meeting-transcriber/meeting_transcriber/daemon.py
class MeetingDaemon:
"""Main orchestrator: polls for meetings, records, transcribes, saves.
Lifecycle:
1. Poll detector every POLL_INTERVAL_SEC
2. When meeting detected → start loopback recorder
3. When meeting ends → stop recorder → transcribe chunks → save to brain
4. Optionally clean up old audio files based on retention policy
"""
def __init__(
self,
brain_path: Path,
model_size: str = "base",
language: str | None = None,
diarize: bool = False,
hf_token: str | None = None,
audio_retention_days: int | None = None,
):
self.brain_path = brain_path
self.model_size = model_size
self.language = language
self.diarize = diarize
self.hf_token = hf_token
self.audio_retention_days = audio_retention_days # None = keep forever
self._bizbrain_dir = brain_path / ".bizbrain"
self._pid_file = self._bizbrain_dir / "meeting-dae__init__ method · python · L31-L54 (24 LOC)tools/meeting-transcriber/meeting_transcriber/daemon.py
def __init__(
self,
brain_path: Path,
model_size: str = "base",
language: str | None = None,
diarize: bool = False,
hf_token: str | None = None,
audio_retention_days: int | None = None,
):
self.brain_path = brain_path
self.model_size = model_size
self.language = language
self.diarize = diarize
self.hf_token = hf_token
self.audio_retention_days = audio_retention_days # None = keep forever
self._bizbrain_dir = brain_path / ".bizbrain"
self._pid_file = self._bizbrain_dir / "meeting-daemon.pid"
self._status_file = self._bizbrain_dir / "meeting-daemon-status.json"
self._audio_dir = brain_path / "Operations" / "meetings" / "_audio"
self._recordings_dir = brain_path / "Operations" / "meetings" / "recordings"
self._running = False
self._current_meeting: MeetingInfo | None = None
self._recorder: LoopbackRecorder | None start method · python · L56-L95 (40 LOC)tools/meeting-transcriber/meeting_transcriber/daemon.py
def start(self) -> None:
"""Start the daemon. Writes PID file and enters main loop."""
self._bizbrain_dir.mkdir(parents=True, exist_ok=True)
self._audio_dir.mkdir(parents=True, exist_ok=True)
self._recordings_dir.mkdir(parents=True, exist_ok=True)
# Check for existing daemon
if self._pid_file.exists():
try:
existing_pid = int(self._pid_file.read_text().strip())
import psutil
if psutil.pid_exists(existing_pid):
print(f"Daemon already running (PID {existing_pid})")
sys.exit(1)
except (ValueError, ImportError):
pass
# Write PID
self._pid_file.write_text(str(os.getpid()))
self._running = True
# Handle graceful shutdown
signal.signal(signal.SIGTERM, self._handle_signal)
signal.signal(signal.SIGINT, self._handle_signal)
retention_msg = (
f"delestop method · python · L97-L99 (3 LOC)tools/meeting-transcriber/meeting_transcriber/daemon.py
def stop(self) -> None:
"""Signal the daemon to stop."""
self._running = False_main_loop method · python · L101-L119 (19 LOC)tools/meeting-transcriber/meeting_transcriber/daemon.py
def _main_loop(self) -> None:
while self._running:
try:
if self._current_meeting:
# Meeting in progress — check if it ended
detected = detect_meeting()
if detected is None:
self._on_meeting_end()
else:
self._update_status(meeting_active=True)
else:
# No meeting — poll for one
detected = detect_meeting()
if detected is not None:
self._on_meeting_start(detected)
except Exception as e:
print(f"Error in main loop: {e}")
time.sleep(POLL_INTERVAL_SEC)Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
_on_meeting_start method · python · L121-L139 (19 LOC)tools/meeting-transcriber/meeting_transcriber/daemon.py
def _on_meeting_start(self, detected: DetectedMeeting) -> None:
"""Called when a new meeting is detected."""
now = datetime.now()
self._current_meeting = MeetingInfo(
platform=detected.platform,
title=detected.window_title or detected.platform,
started_at=now,
process_name=detected.process_name,
window_title=detected.window_title,
)
# Start recording to a session-specific audio directory
session_dir = self._audio_dir / now.strftime("%Y-%m-%d_%H%M%S")
self._recorder = LoopbackRecorder(session_dir)
self._recorder.start()
print(f"\nMeeting detected: {detected.platform} — {detected.window_title}")
print(f"Recording to: {session_dir}")
self._update_status(meeting_active=True)_on_meeting_end method · python · L141-L209 (69 LOC)tools/meeting-transcriber/meeting_transcriber/daemon.py
def _on_meeting_end(self) -> None:
"""Called when the active meeting ends."""
if not self._current_meeting or not self._recorder:
return
self._current_meeting.ended_at = datetime.now()
print(f"\nMeeting ended ({self._current_meeting.duration_minutes:.0f} min)")
# Stop recording
chunk_paths = self._recorder.stop()
self._current_meeting.audio_chunks = chunk_paths
print(f"Recorded {len(chunk_paths)} audio chunk(s)")
if not chunk_paths:
print("No audio recorded — skipping transcription")
self._current_meeting = None
self._recorder = None
self._update_status(meeting_active=False)
return
# Stitch chunks into a single permanent recording
recording_path = self._stitch_recording(chunk_paths)
if recording_path:
self._current_meeting.recording_path = recording_path
print(f"Recording saved: {recording_stitch_recording method · python · L211-L231 (21 LOC)tools/meeting-transcriber/meeting_transcriber/daemon.py
def _stitch_recording(self, chunk_paths: list[Path]) -> Path | None:
"""Stitch audio chunks into a single clean WAV file for permanent storage."""
if not chunk_paths:
return None
date_str = self._current_meeting.started_at.strftime("%Y-%m-%d")
slug = self._current_meeting.slug
recording_path = self._recordings_dir / f"{date_str}-{slug}.wav"
try:
from .diarizer import _stitch_wav_files
_stitch_wav_files(chunk_paths, recording_path)
return recording_path
except Exception:
# Fallback: manual stitching without diarizer import
try:
_stitch_wav_chunks(chunk_paths, recording_path)
return recording_path
except Exception as e:
print(f"Warning: Could not stitch recording: {e}")
return Nonepage 1 / 3next ›