← back to drzachconner__bizbrain-os-plugin

Function bodies 108 total

All specs Real LLM only Function bodies
get_char_and_color function · python · L66-L86 (21 LOC)
assets/gen_brain.py
def get_char_and_color(brightness):
    """Choose a random character and colour for a brightness value.

    The value is sorted into one of seven bands (cut-offs at 20, 50, 90,
    130, 170 and 210).  Each band has its own pool of characters and
    colours, built from the module-level character lists and colour
    constants.

    Args:
        brightness: Numeric brightness, compared against the cut-offs above.

    Returns:
        A ``(character, color)`` tuple.  Below 20 this is always a space
        paired with an empty colour string.
    """
    # Darkest band: blank cell, no randomness involved.
    if brightness < 20:
        return (' ', '')

    # Second band: random punctuation, but the colour is always DIM.
    if brightness < 50:
        return (random.choice(['.', ',', ':', ';']), DIM)

    # Remaining bands: pick both pools first, then draw from each.
    if brightness < 90:
        glyphs = ['-', '~', '=', '+', '*']
        colors = [DIM, BLUE]
    elif brightness < 130:
        glyphs = slash_chars + operator_chars
        colors = [BLUE, ORANGE, PURPLE]
    elif brightness < 170:
        glyphs = bracket_chars + brace_chars + ['*', '#']
        colors = [GREEN, AMBER, PURPLE]
    elif brightness < 210:
        glyphs = brace_chars + bracket_chars + operator_chars + ['#', '@']
        colors = [GREEN, AMBER, WHITE, BLUE]
    else:
        glyphs = special_chars + brace_chars + bracket_chars
        colors = [GREEN, AMBER, WHITE, ORANGE]

    # Draw the character before the colour so the random stream is
    # consumed in a fixed order.
    glyph = random.choice(glyphs)
    return (glyph, random.choice(colors))
loadEntitySummary function · javascript · L57-L62 (6 LOC)
scripts/generate-context.js
/**
 * Return the leading lines of the people entity index.
 *
 * Reads `<brainPath>/Entities/People/ENTITY-INDEX.md` and keeps at most
 * `maxLines` lines from the top of the file.
 *
 * @param {number} [maxLines=60] Upper bound on the number of lines returned.
 * @returns {string} The truncated file content, or '' when the file is absent.
 */
function loadEntitySummary(maxLines = 60) {
  const indexFile = path.join(brainPath, 'Entities', 'People', 'ENTITY-INDEX.md');
  if (fs.existsSync(indexFile)) {
    const allLines = fs.readFileSync(indexFile, 'utf8').split('\n');
    return allLines.slice(0, maxLines).join('\n');
  }
  return '';
}
loadActionItems function · javascript · L64-L71 (8 LOC)
scripts/generate-context.js
/**
 * Load the first open todos from the aggregated todo file.
 *
 * Reads `<brainPath>/Operations/todos/aggregated-todos.json`, drops entries
 * whose `completed` flag is truthy and keeps at most `max` of the rest.
 *
 * @param {number} [max=10] Maximum number of items to return.
 * @returns {Array} Open items; empty when the file is missing or any error
 *   occurs while reading or parsing it.
 */
function loadActionItems(max = 10) {
  const todosFile = path.join(brainPath, 'Operations', 'todos', 'aggregated-todos.json');
  if (!fs.existsSync(todosFile)) {
    return [];
  }

  let openItems;
  try {
    const parsed = JSON.parse(fs.readFileSync(todosFile, 'utf8'));
    // A file without an `items` property is treated as having no todos.
    const everything = parsed.items || [];
    openItems = everything.filter((todo) => !todo.completed).slice(0, max);
  } catch (err) {
    // Best effort: unreadable or malformed files yield no action items.
    openItems = [];
  }
  return openItems;
}
loadProjects function · javascript · L73-L96 (24 LOC)
scripts/generate-context.js
/**
 * List the project folders under `<brainPath>/Projects`.
 *
 * Every sub-directory whose name does not start with `_` or `.` becomes one
 * record. When the folder holds a parseable `_meta.json`, its fields are
 * merged over `{ name }`; otherwise the record is `{ name, status: 'unknown' }`.
 *
 * @returns {Array<Object>} Project records collected so far; empty when the
 *   Projects directory does not exist. Errors while listing the directory
 *   are swallowed.
 */
function loadProjects() {
  const found = [];
  const projectsRoot = path.join(brainPath, 'Projects');
  if (!fs.existsSync(projectsRoot)) return found;

  try {
    for (const dirent of fs.readdirSync(projectsRoot, { withFileTypes: true })) {
      const isProjectDir = dirent.isDirectory()
        && !dirent.name.startsWith('_')
        && !dirent.name.startsWith('.');
      if (!isProjectDir) continue;

      const { name } = dirent;
      // Fallback used when metadata is missing or cannot be parsed.
      let record = { name, status: 'unknown' };

      const metaFile = path.join(projectsRoot, name, '_meta.json');
      if (fs.existsSync(metaFile)) {
        try {
          // Metadata fields win over the folder name if they collide.
          record = { name, ...JSON.parse(fs.readFileSync(metaFile, 'utf8')) };
        } catch (err) {
          // Keep the fallback record.
        }
      }
      found.push(record);
    }
  } catch (err) {
    // Listing failed part-way: return whatever was gathered.
  }
  return found;
}
buildCommandsTable function · javascript · L98-L130 (33 LOC)
scripts/generate-context.js
function buildCommandsTable(zone) {
  const cmds = [
    ['`/brain`', 'Brain status, scan, configure, profiles'],
    ['`/dashboard`', 'Visual dashboard — setup checklist, integrations, quick launch'],
    ['`/knowledge <topic>`', 'Load specific brain knowledge'],
    ['`/todo`', 'View and manage tasks'],
  ];
  if (features?.entity_management) {
    cmds.push(['`/entity <name>`', 'Look up or add an entity']);
  }
  if (features?.gsd_workflow) {
    cmds.push(['`/gsd`', 'Project management workflow']);
  }
  if (features?.time_tracking) {
    cmds.push(['`/hours`', 'Time tracking summary']);
  }
  if (features?.content_pipeline) {
    cmds.push(['`/content`', 'Content pipeline management']);
  }
  if (features?.communications) {
    cmds.push(['`/comms`', 'Unified communications']);
  }
  if (features?.intake_processing) {
    cmds.push(['`/intake`', 'Process intake files']);
  }
  if (features?.outreach_engine) {
    cmds.push(['`/outreach`', 'Lead pipeline management']);
  }
  if (fe
generateBrainContext function · javascript · L136-L269 (134 LOC)
scripts/generate-context.js
function generateBrainContext() {
  // Full brain context — maximum detail
  const lines = [];
  const projects = loadProjects();
  const actionItems = loadActionItems(10);
  const entitySummary = loadEntitySummary(60);

  lines.push(`# ${profile.businessName || 'My'} Brain — BizBrain OS`);
  lines.push('');
  lines.push(`> Owner: ${profile.userName || 'Unknown'}`);
  if (profile.businessType) lines.push(`> Type: ${profile.businessType}`);
  if (profile.industry) lines.push(`> Industry: ${profile.industry}`);
  lines.push(`> Brain: \`${brainPath}\``);
  if (mode === 'full') {
    lines.push(`> Mode: Full (three-zone) | Zone: brain`);
  }
  lines.push('');

  // Active features
  const activeFeatures = Object.entries(features || {})
    .filter(([k, v]) => v)
    .map(([k]) => k.replace(/_/g, ' '));
  if (activeFeatures.length > 0) {
    lines.push('## Active Features');
    lines.push(activeFeatures.map(f => `- ${f}`).join('\n'));
    lines.push('');
  }

  // Commands
  const cmds = b
generateWorkspacesContext function · javascript · L271-L341 (71 LOC)
scripts/generate-context.js
function generateWorkspacesContext() {
  // Lean context for code development — just what developers need
  const lines = [];
  const projects = loadProjects();

  lines.push(`# BizBrain OS — Workspaces`);
  lines.push('');
  lines.push(`> Owner: ${profile.userName || 'Unknown'} | ${profile.businessName || ''}`);
  lines.push(`> Brain: \`${brainPath}\` (use \`/knowledge\` to load brain data)`);
  lines.push('');

  // Compact commands
  lines.push('## Commands');
  lines.push('| Command | Description |');
  lines.push('|---------|-------------|');
  lines.push('| `/knowledge <topic>` | Load brain knowledge |');
  lines.push('| `/todo` | View and manage tasks |');
  lines.push('| `/todo add <task>` | Add task (auto-routes to project) |');
  lines.push('| `/gsd` | Project management workflow |');
  lines.push('| `/hours` | Time tracking |');
  lines.push('| `/entity <name>` | Look up entity |');
  lines.push('| `/dashboard` | Visual dashboard in browser |');
  lines.push('| `/brain statu
Source: Repobility analyzer · https://repobility.com
generateLaunchpadContext function · javascript · L343-L486 (144 LOC)
scripts/generate-context.js
function generateLaunchpadContext() {
  // Medium context — optimized starting point for all Claude Code sessions
  const lines = [];
  const projects = loadProjects();
  const actionItems = loadActionItems(10);
  const entitySummary = loadEntitySummary(40);

  lines.push(`# ${profile.businessName || 'My'} Brain — Launchpad`);
  lines.push('');
  lines.push(`> Owner: ${profile.userName || 'Unknown'}`);
  if (profile.businessType) lines.push(`> Type: ${profile.businessType}`);
  lines.push(`> Brain: \`${brainPath}\``);
  lines.push('');

  lines.push('**This is the launchpad.** Start all Claude Code sessions here for optimized context with auto-capture.');
  lines.push('All sessions are automatically captured to the brain for future reference.');
  lines.push('');

  // All commands available
  const cmds = buildCommandsTable('launchpad');
  lines.push('## Commands');
  lines.push('| Command | Description |');
  lines.push('|---------|-------------|');
  cmds.forEach(([cmd, desc]) => li
generateExternalContext function · javascript · L488-L522 (35 LOC)
scripts/generate-context.js
function generateExternalContext() {
  // Minimal context for sessions outside the brain entirely
  const lines = [];

  lines.push(`# BizBrain OS`);
  lines.push('');
  lines.push(`> Brain: \`${brainPath}\``);
  if (mode === 'full' && rootPath) {
    lines.push(`> Workspaces: \`${rootPath}/workspaces\``);
    lines.push(`> Launchpad: \`${rootPath}/launchpad\``);
  }
  lines.push('');

  // Key commands
  lines.push('## Commands');
  lines.push('| Command | Description |');
  lines.push('|---------|-------------|');
  lines.push('| `/brain status` | Brain dashboard |');
  lines.push('| `/dashboard` | Visual dashboard in browser |');
  lines.push('| `/knowledge <topic>` | Load brain knowledge |');
  lines.push('| `/todo` | View and manage tasks |');
  lines.push('| `/entity <name>` | Look up entity |');
  lines.push('| `/hours` | Time tracking |');
  lines.push('| `/gsd` | Project management |');
  lines.push('');

  // Active auto-behaviors (just mention they're on)
  lines.push('## Au
loadOrchestratorBrief function · javascript · L528-L573 (46 LOC)
scripts/generate-context.js
function loadOrchestratorBrief() {
  const brief = { events: 0, pending: 0, conflicts: 0, changelog: [] };

  // Count queued events
  const eventsDir = path.join(brainPath, '.bizbrain', 'events');
  if (fs.existsSync(eventsDir)) {
    try {
      brief.events = fs.readdirSync(eventsDir)
        .filter(f => f.endsWith('.json')).length;
    } catch(e) {}
  }

  // Count pending staged proposals
  const pendingDir = path.join(brainPath, '.bizbrain', 'staging', 'pending');
  if (fs.existsSync(pendingDir)) {
    try {
      brief.pending = fs.readdirSync(pendingDir)
        .filter(f => f.endsWith('.json')).length;
    } catch(e) {}
  }

  // Count conflicts
  const conflictsDir = path.join(brainPath, '.bizbrain', 'staging', 'conflicts');
  if (fs.existsSync(conflictsDir)) {
    try {
      brief.conflicts = fs.readdirSync(conflictsDir)
        .filter(f => f.endsWith('.json')).length;
    } catch(e) {}
  }

  // Recent changelog entries (today)
  const today = new Date().toISOString().sp
appendContinuousLearning function · javascript · L579-L835 (257 LOC)
scripts/generate-context.js
function appendContinuousLearning(lines) {
  const orchestrationEnabled = features?.orchestration === true;

  if (orchestrationEnabled) {
    // Orchestrator mode — Brain Swarm active, make it VERY visible
    const brief = loadOrchestratorBrief();

    // ── Swarm Header ──────────────────────────────────────────────
    lines.push('## BRAIN SWARM — ACTIVE');
    lines.push('');
    lines.push('```');
    lines.push('╔══════════════════════════════════════════════════════════════╗');
    lines.push('║                    BRAIN SWARM CONTROL                      ║');
    lines.push('╠══════════════════════════════════════════════════════════════╣');
    lines.push('║                                                              ║');
    lines.push(`║  EVENT QUEUE ···· ${String(brief.events).padStart(3)} waiting    STAGED ········ ${String(brief.pending).padStart(3)} pending  ║`);
    lines.push(`║  CONFLICTS ····· ${String(brief.conflicts).padStart(3)} flagged    CHANGELOG ····· ${brief
generateImage function · javascript · L53-L122 (70 LOC)
tools/dashboard/generate-icons.js
async function generateImage(item) {
  const outPath = path.join(OUT_DIR, `${item.id}.png`);
  if (fs.existsSync(outPath)) {
    console.log(`SKIP ${item.id} (exists)`);
    return;
  }

  return new Promise((resolve, reject) => {
    const payload = JSON.stringify({
      model: 'gpt-image-1',
      prompt: item.prompt,
      size: '1024x1024',
      quality: 'medium',
    });

    const req = https.request({
      hostname: 'api.openai.com',
      path: '/v1/images/generations',
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${API_KEY}`,
        'Content-Length': Buffer.byteLength(payload),
      },
    }, (res) => {
      let body = '';
      res.on('data', chunk => body += chunk);
      res.on('end', () => {
        try {
          const data = JSON.parse(body);
          if (data.error) {
            console.log(`FAIL ${item.id}: ${data.error.message}`);
            reject(data.error);
            return;
       
main function · javascript · L124-L140 (17 LOC)
tools/dashboard/generate-icons.js
/**
 * Generate every icon in ICONS, three at a time, then report the PNG count.
 *
 * Failures of individual `generateImage` calls are ignored so one bad icon
 * does not abort its batch. The final tally counts all `.png` files present
 * in OUT_DIR.
 */
async function main() {
  const BATCH_SIZE = 3; // Generate 3 at a time to stay within rate limits
  const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  console.log(`Generating ${ICONS.length} icons...`);
  console.log(`Output: ${OUT_DIR}\n`);

  for (let start = 0; start < ICONS.length; start += BATCH_SIZE) {
    const jobs = ICONS
      .slice(start, start + BATCH_SIZE)
      .map((icon) => generateImage(icon).catch(() => {}));
    await Promise.all(jobs);

    // Brief pause between batches, skipped after the final one.
    const moreBatchesLeft = start + BATCH_SIZE < ICONS.length;
    if (moreBatchesLeft) {
      await pause(1000);
    }
  }

  console.log('\nDone!');
  const pngCount = fs.readdirSync(OUT_DIR).filter((file) => file.endsWith('.png')).length;
  console.log(`Generated: ${pngCount}/${ICONS.length} icons`);
}
getItemsByCategory function · javascript · L1020-L1022 (3 LOC)
tools/dashboard/public/js/data.js
/**
 * Return all checklist items whose `category` strictly equals `category`.
 *
 * @param {*} category Category value to match.
 * @returns {Array} Matching entries of CHECKLIST_ITEMS (possibly empty).
 */
function getItemsByCategory(category) {
  const belongsToCategory = (entry) => entry.category === category;
  return CHECKLIST_ITEMS.filter(belongsToCategory);
}
getItemById function · javascript · L1025-L1027 (3 LOC)
tools/dashboard/public/js/data.js
/**
 * Look up a single checklist item by its `id`.
 *
 * @param {*} id Identifier to match with strict equality.
 * @returns {Object|undefined} The first matching item, or undefined.
 */
function getItemById(id) {
  const hasId = (entry) => entry.id === id;
  return CHECKLIST_ITEMS.find(hasId);
}
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
getCategoryColor function · javascript · L1030-L1032 (3 LOC)
tools/dashboard/public/js/data.js
/**
 * Resolve the display colour of a category.
 *
 * @param {string} categoryId Key into CATEGORIES.
 * @returns {string} The category's `color`, or '#888' when the category is
 *   unknown or has no truthy colour.
 */
function getCategoryColor(categoryId) {
  const category = CATEGORIES[categoryId];
  if (category && category.color) {
    return category.color;
  }
  return '#888';
}
getCategoryCompletion function · javascript · L1035-L1039 (5 LOC)
tools/dashboard/public/js/data.js
/**
 * Count completed versus total checklist items in one category.
 *
 * @param {*} categoryId Category passed to getItemsByCategory.
 * @param {Object} progress Map of item id to status; only the exact value
 *   'completed' counts as done.
 * @returns {{completed: number, total: number}}
 */
function getCategoryCompletion(categoryId, progress) {
  const inCategory = getItemsByCategory(categoryId);
  let done = 0;
  for (const entry of inCategory) {
    if (progress[entry.id] === 'completed') done += 1;
  }
  return { completed: done, total: inCategory.length };
}
getOverallCompletion function · javascript · L1042-L1045 (4 LOC)
tools/dashboard/public/js/data.js
/**
 * Count completed versus total items across the whole checklist.
 *
 * @param {Object} progress Map of item id to status; only the exact value
 *   'completed' counts as done.
 * @returns {{completed: number, total: number}}
 */
function getOverallCompletion(progress) {
  let done = 0;
  for (const entry of CHECKLIST_ITEMS) {
    if (progress[entry.id] === 'completed') done += 1;
  }
  return { completed: done, total: CHECKLIST_ITEMS.length };
}
getNextRecommended function · javascript · L1048-L1061 (14 LOC)
tools/dashboard/public/js/data.js
/**
 * Pick the next checklist item to work on.
 *
 * Priorities are scanned from 'critical' down to 'low'. Within a priority,
 * the first item in CHECKLIST_ITEMS order is returned that is not yet
 * completed and whose prerequisites are all completed.
 *
 * @param {Object} progress Map of item id to status.
 * @returns {Object|null} The recommended item, or null when nothing qualifies.
 */
function getNextRecommended(progress) {
  const isDone = (id) => progress[id] === 'completed';
  const tiers = ['critical', 'high', 'medium', 'low'];

  for (const tier of tiers) {
    const ready = CHECKLIST_ITEMS.filter((entry) => (
      !isDone(entry.id)
      && entry.priority === tier
      // Prerequisites are only inspected for open items of this tier.
      && entry.prerequisites.every((prereq) => isDone(prereq))
    ));
    if (ready.length > 0) {
      return ready[0];
    }
  }
  return null;
}
findBrainPath function · javascript · L15-L44 (30 LOC)
tools/dashboard/server.js
function findBrainPath() {
  if (process.env.BIZBRAIN_PATH && fs.existsSync(process.env.BIZBRAIN_PATH)) {
    return process.env.BIZBRAIN_PATH;
  }
  const home = os.homedir();
  const roots = [
    path.join(home, 'bizbrain-os'),
    path.join(home, 'Documents', 'bizbrain-os'),
  ];
  for (const root of roots) {
    if (!fs.existsSync(root)) continue;
    // Check for full mode (three-zone architecture)
    const rootMarker = path.join(root, '.bizbrain-root.json');
    if (fs.existsSync(rootMarker)) {
      try {
        const marker = JSON.parse(fs.readFileSync(rootMarker, 'utf8'));
        if (marker.mode === 'full') {
          const brainDir = path.join(root, marker.brainDir || 'brain');
          if (fs.existsSync(brainDir)) return brainDir;
        }
      } catch { /* fall through */ }
    }
    // Check for brain/ subdir with config (full mode without marker)
    const brainSubdir = path.join(root, 'brain');
    if (fs.existsSync(path.join(brainSubdir, 'config.json'))) return 
findBrainRoot function · javascript · L46-L53 (8 LOC)
tools/dashboard/server.js
/**
 * Locate the installation root that contains the brain folder.
 *
 * If the parent directory of the brain path holds a `.bizbrain-root.json`
 * marker, that parent is the root; otherwise the brain path itself is used.
 *
 * @returns {string|null} Root directory, or null when findBrainPath() finds
 *   nothing.
 */
function findBrainRoot() {
  const brainDir = findBrainPath();
  if (!brainDir) return null;

  const candidateRoot = path.dirname(brainDir);
  const markerFile = path.join(candidateRoot, '.bizbrain-root.json');
  return fs.existsSync(markerFile) ? candidateRoot : brainDir;
}
getBrainStats function · javascript · L55-L74 (20 LOC)
tools/dashboard/server.js
function getBrainStats(brainPath) {
  if (!brainPath) return null;
  const stats = { entities: 0, projects: 0, knowledge: 0, integrations: 0, sessions: 0 };
  try {
    const entitiesDir = path.join(brainPath, 'Entities');
    if (fs.existsSync(entitiesDir)) {
      for (const type of ['Clients', 'Partners', 'Vendors', 'People']) {
        const dir = path.join(entitiesDir, type);
        if (fs.existsSync(dir)) stats.entities += fs.readdirSync(dir).filter(f => !f.startsWith('.')).length;
      }
    }
    const projectsDir = path.join(brainPath, 'Projects');
    if (fs.existsSync(projectsDir)) stats.projects = fs.readdirSync(projectsDir).filter(f => !f.startsWith('.')).length;
    const knowledgeDir = path.join(brainPath, 'Knowledge');
    if (fs.existsSync(knowledgeDir)) stats.knowledge = countFiles(knowledgeDir);
    const credsDir = path.join(brainPath, 'Operations', 'credentials', 'vault');
    if (fs.existsSync(credsDir)) stats.integrations = fs.readdirSync(credsDir).filter(f => 
countFiles function · javascript · L76-L85 (10 LOC)
tools/dashboard/server.js
/**
 * Recursively count the non-hidden files beneath `dir`.
 *
 * Entries that are directories are always descended into, including those
 * whose name starts with a dot; other entries are counted unless their name
 * starts with a dot. Any error while processing a directory is ignored and
 * the count gathered so far for that directory is returned.
 *
 * @param {string} dir Directory to scan.
 * @returns {number} Number of counted files.
 */
function countFiles(dir) {
  let total = 0;
  try {
    const entries = fs.readdirSync(dir, { withFileTypes: true });
    entries.forEach((entry) => {
      if (entry.isDirectory()) {
        total += countFiles(path.join(dir, entry.name));
        return;
      }
      if (!entry.name.startsWith('.')) {
        total += 1;
      }
    });
  } catch (err) {
    /* ignore */
  }
  return total;
}
All rows scored by the Repobility analyzer (https://repobility.com)
getProgressPath function · javascript · L87-L93 (7 LOC)
tools/dashboard/server.js
/**
 * Return the path of the dashboard progress file, creating its folder.
 *
 * Progress lives in `<root>/.bizbrain/dashboard/progress.json`, where root
 * is findBrainRoot() when that yields a value and `brainPath` otherwise
 * (so it works for both compact and full mode).
 *
 * @param {string} brainPath Fallback base directory.
 * @returns {string} Absolute or relative path to progress.json.
 */
function getProgressPath(brainPath) {
  const baseDir = findBrainRoot() || brainPath;
  const dashboardDir = path.join(baseDir, '.bizbrain', 'dashboard');

  // Side effect: make sure the dashboard directory exists before returning.
  const dirMissing = !fs.existsSync(dashboardDir);
  if (dirMissing) {
    fs.mkdirSync(dashboardDir, { recursive: true });
  }
  return path.join(dashboardDir, 'progress.json');
}
readProgress function · javascript · L95-L100 (6 LOC)
tools/dashboard/server.js
/**
 * Read the saved dashboard progress.
 *
 * @param {string} brainPath Brain directory; a falsy value short-circuits.
 * @returns {Object} Parsed progress.json, or {} when there is no brain path,
 *   no progress file, or the file cannot be read or parsed.
 */
function readProgress(brainPath) {
  if (!brainPath) return {};

  const progressFile = getProgressPath(brainPath);
  if (!fs.existsSync(progressFile)) return {};

  try {
    const raw = fs.readFileSync(progressFile, 'utf8');
    return JSON.parse(raw);
  } catch {
    return {};
  }
}
writeProgress function · javascript · L102-L105 (4 LOC)
tools/dashboard/server.js
/**
 * Persist dashboard progress as pretty-printed JSON (2-space indent).
 *
 * @param {string} brainPath Brain directory; nothing is written when falsy.
 * @param {*} data Value to serialise into progress.json.
 */
function writeProgress(brainPath, data) {
  if (brainPath) {
    // Resolve the target first; this also creates the dashboard directory.
    const target = getProgressPath(brainPath);
    const serialized = JSON.stringify(data, null, 2);
    fs.writeFileSync(target, serialized);
  }
}
BrainUpdater class · python · L16-L233 (218 LOC)
tools/meeting-transcriber/meeting_transcriber/brain_updater.py
class BrainUpdater:
    """Updates BB1 entity records and todos after meeting transcription.

    Immediate (keyword-based, no LLM):
        - update_entity_histories() — appends meeting entry to entity history files

    Deferred (called by skill after AI processing):
        - write_action_items() — routes extracted action items to entity/todo files
    """

    def __init__(self, brain_path: Path):
        self.brain_path = brain_path
        self._entity_map: dict[str, dict] | None = None

    def _load_entity_map(self) -> dict[str, dict]:
        """Parse ENTITY-INDEX.md into keyword → entity info lookup.

        Returns dict mapping lowercase keywords to:
            {"name": str, "type": str, "folder": str}
        """
        if self._entity_map is not None:
            return self._entity_map

        self._entity_map = {}
        index_path = self.brain_path / "Operations" / "entity-watchdog" / "ENTITY-INDEX.md"
        if not index_path.exists():
            return self._en
__init__ method · python · L26-L28 (3 LOC)
tools/meeting-transcriber/meeting_transcriber/brain_updater.py
    def __init__(self, brain_path: Path):
        self.brain_path = brain_path
        self._entity_map: dict[str, dict] | None = None
_load_entity_map method · python · L30-L84 (55 LOC)
tools/meeting-transcriber/meeting_transcriber/brain_updater.py
    def _load_entity_map(self) -> dict[str, dict]:
        """Parse ENTITY-INDEX.md into keyword → entity info lookup.

        Returns dict mapping lowercase keywords to:
            {"name": str, "type": str, "folder": str}
        """
        if self._entity_map is not None:
            return self._entity_map

        self._entity_map = {}
        index_path = self.brain_path / "Operations" / "entity-watchdog" / "ENTITY-INDEX.md"
        if not index_path.exists():
            return self._entity_map

        try:
            content = index_path.read_text(encoding="utf-8")
        except Exception:
            return self._entity_map

        for line in content.splitlines():
            if not line.startswith("|") or "---" in line:
                continue
            parts = [p.strip() for p in line.split("|")]
            if len(parts) < 4:
                continue

            name = parts[1]
            entity_type = parts[2].lower() if len(parts) > 2 else ""
            alia
_detect_entity_slugs method · python · L86-L105 (20 LOC)
tools/meeting-transcriber/meeting_transcriber/brain_updater.py
    def _detect_entity_slugs(
        self,
        segments: list[TranscriptSegment | SpeakerSegment],
    ) -> list[dict]:
        """Find entities mentioned in transcript segments.

        Returns list of entity info dicts for matched entities.
        """
        entity_map = self._load_entity_map()
        if not entity_map:
            return []

        all_text = " ".join(seg.text for seg in segments).lower()

        found: dict[str, dict] = {}  # entity name → info (deduplicated)
        for keyword, info in entity_map.items():
            if re.search(r"\b" + re.escape(keyword) + r"\b", all_text):
                found[info["name"]] = info

        return list(found.values())
update_entity_histories method · python · L107-L170 (64 LOC)
tools/meeting-transcriber/meeting_transcriber/brain_updater.py
    def update_entity_histories(
        self,
        meeting: MeetingInfo,
        segments: list[TranscriptSegment | SpeakerSegment],
    ) -> list[str]:
        """Append meeting reference to each detected entity's history file.

        This is keyword-based (no LLM needed) and runs immediately after transcription.
        Returns list of entity names that were updated.
        """
        detected = self._detect_entity_slugs(segments)
        if not detected:
            return []

        date_str = meeting.started_at.strftime("%Y-%m-%d")
        time_str = meeting.started_at.strftime("%H:%M")
        duration = f"{meeting.duration_minutes:.0f}" if meeting.ended_at else "ongoing"
        transcript_ref = f"Operations/meetings/transcripts/{date_str}-{meeting.slug}.md"

        entry = (
            f"\n### {date_str} — Meeting ({meeting.platform})\n"
            f"- **Time:** {time_str}\n"
            f"- **Duration:** {duration} min\n"
            f"- **Title:** {meeting.title}\
If a scraper extracted this row, it came from Repobility (https://repobility.com)
write_action_items method · python · L172-L233 (62 LOC)
tools/meeting-transcriber/meeting_transcriber/brain_updater.py
    def write_action_items(
        self,
        meeting: MeetingInfo,
        action_items: list[dict],
    ) -> int:
        """Route AI-extracted action items to entity files or operational todos.

        Called by the skill after Claude processes the transcript and extracts items.

        Each action_item dict should have:
            - "text": str — the action item description
            - "owner": str | None — assignee name (matched against entities)
            - "entity": str | None — related entity name

        Returns count of items written.
        """
        entity_map = self._load_entity_map()
        date_str = meeting.started_at.strftime("%Y-%m-%d")
        written = 0

        for item in action_items:
            text = item.get("text", "")
            entity_name = item.get("entity")
            target_path = None

            # Try to route to entity action-items file
            if entity_name and entity_name.lower() in entity_map:
                info = entit
find_brain_path function · python · L12-L26 (15 LOC)
tools/meeting-transcriber/meeting_transcriber/cli.py
def find_brain_path() -> Path | None:
    """Locate the brain folder.

    The ``BIZBRAIN_PATH`` environment variable is tried first and used when
    it is non-empty and points at an existing path.  Otherwise
    ``~/bizbrain-os`` is used if it exists.

    Returns:
        The first existing candidate, or ``None`` when neither exists.
    """
    override = os.environ.get("BIZBRAIN_PATH")
    if override and (override_dir := Path(override)).exists():
        return override_dir

    # The home directory is only resolved when the override did not apply.
    default_dir = Path.home() / "bizbrain-os"
    return default_dir if default_dir.exists() else None
cmd_daemon function · python · L29-L80 (52 LOC)
tools/meeting-transcriber/meeting_transcriber/cli.py
def cmd_daemon(args: list[str]) -> None:
    """Start the meeting transcription daemon."""
    brain_path = find_brain_path()
    if not brain_path:
        print("Error: No brain folder found. Set BIZBRAIN_PATH or run /brain setup.")
        sys.exit(1)

    model = "base"
    language = None
    diarize = False
    hf_token = os.environ.get("HF_TOKEN")
    audio_retention_days = None  # Keep forever by default

    # Parse flags
    i = 0
    while i < len(args):
        if args[i] in ("--model", "-m") and i + 1 < len(args):
            model = args[i + 1]
            i += 2
        elif args[i] in ("--language", "-l") and i + 1 < len(args):
            language = args[i + 1]
            i += 2
        elif args[i] == "--diarize":
            diarize = True
            i += 1
        elif args[i] == "--hf-token" and i + 1 < len(args):
            hf_token = args[i + 1]
            i += 2
        elif args[i] == "--keep-audio":
            audio_retention_days = None
            i += 
cmd_transcribe function · python · L83-L109 (27 LOC)
tools/meeting-transcriber/meeting_transcriber/cli.py
def cmd_transcribe(args: list[str]) -> None:
    """Transcribe one audio file and print its segments with timestamps.

    Args:
        args: Command-line arguments.  ``args[0]`` is the audio file; an
            optional ``--model``/``-m`` flag followed by a value selects the
            Whisper model size (default ``"base"``).

    Exits with status 1 after printing a message when no arguments are given
    or the audio file does not exist.
    """
    if not args:
        print("Usage: bizbrain-meetings transcribe <audio-file> [--model base]")
        sys.exit(1)

    audio_path = Path(args[0])
    if not audio_path.exists():
        print(f"Error: File not found: {audio_path}")
        sys.exit(1)

    # Walk (flag, value) pairs after the file argument; a flag in the final
    # position has no value and is skipped, and the last occurrence wins.
    model = "base"
    for flag, value in zip(args[1:], args[2:]):
        if flag in ("--model", "-m"):
            model = value

    # Imported here so the argument checks above run before the transcriber
    # module is loaded.
    from .transcriber import WhisperTranscriber

    segments = WhisperTranscriber(model_size=model).transcribe(audio_path)

    for seg in segments:
        hours = int(seg.start // 3600)
        minutes = int((seg.start % 3600) // 60)
        seconds = int(seg.start % 60)
        # The hour field is only shown once the timestamp reaches an hour.
        if hours:
            stamp = f"{hours:02d}:{minutes:02d}:{seconds:02d}"
        else:
            stamp = f"{minutes:02d}:{seconds:02d}"
        print(f"[{stamp}] {seg.text}")
cmd_status function · python · L112-L137 (26 LOC)
tools/meeting-transcriber/meeting_transcriber/cli.py
def cmd_status(args: list[str]) -> None:
    """Show daemon status."""
    brain_path = find_brain_path()
    if not brain_path:
        print("No brain folder found.")
        return

    status_file = brain_path / ".bizbrain" / "meeting-daemon-status.json"
    if not status_file.exists():
        print("Meeting daemon has not been run yet.")
        return

    data = json.loads(status_file.read_text())
    print(f"Running: {data.get('running', False)}")
    print(f"PID: {data.get('pid', 'N/A')}")
    print(f"Meeting active: {data.get('meeting_active', False)}")
    print(f"Chunks recorded: {data.get('chunks_recorded', 0)}")
    print(f"Chunks transcribed: {data.get('chunks_transcribed', 0)}")
    print(f"Last check: {data.get('last_check', 'N/A')}")

    if data.get("current_meeting"):
        m = data["current_meeting"]
        print(f"\nCurrent meeting:")
        print(f"  Platform: {m.get('platform')}")
        print(f"  Title: {m.get('title')}")
        print(f"  Started: {m.get
cmd_stop function · python · L140-L162 (23 LOC)
tools/meeting-transcriber/meeting_transcriber/cli.py
def cmd_stop(args: list[str]) -> None:
    """Stop the running daemon using the PID stored in the brain folder.

    Reads ``.bizbrain/meeting-daemon.pid`` under the brain path, asks the
    process to terminate and waits up to 10 seconds for it to exit.  If any
    step fails, the error is printed and the PID file is removed.

    Args:
        args: Command-line arguments (not used here).
    """
    brain = find_brain_path()
    if not brain:
        print("No brain folder found.")
        return

    pid_path = brain / ".bizbrain" / "meeting-daemon.pid"
    if not pid_path.exists():
        print("No daemon PID file found.")
        return

    try:
        daemon_pid = int(pid_path.read_text().strip())
        # Imported after the PID is parsed, inside the try, so a failing
        # import is reported through the same handler below.
        import psutil
        daemon = psutil.Process(daemon_pid)
        daemon.terminate()
        daemon.wait(timeout=10)
        print(f"Daemon (PID {daemon_pid}) stopped.")
    except Exception as exc:
        print(f"Error stopping daemon: {exc}")
        # Drop the PID file so it does not linger after a failed stop.
        if pid_path.exists():
            pid_path.unlink()
cmd_install function · python · L165-L270 (106 LOC)
tools/meeting-transcriber/meeting_transcriber/cli.py
def cmd_install(args: list[str]) -> None:
    """Auto-install the meeting transcriber package with platform deps."""
    current_platform = platform.system()
    print(f"BizBrain Meetings — Auto-Install ({current_platform})\n")

    # 1. Find the package directory via __file__
    package_dir = Path(__file__).resolve().parent.parent
    pyproject = package_dir / "pyproject.toml"
    if not pyproject.exists():
        print(f"Error: Could not find pyproject.toml at {package_dir}")
        print("Expected the package directory to be the parent of meeting_transcriber/")
        sys.exit(1)

    print(f"Package: {package_dir}")

    # 2. Determine platform extras
    if current_platform == "Windows":
        extras = "windows"
    elif current_platform == "Darwin":
        extras = "macos"
    else:
        extras = ""

    # 3. Find installer (uv preferred, pip fallback)
    import shutil

    uv_path = shutil.which("uv")
    pip_path = shutil.which("pip") or shutil.which("pip3")

    if 
cmd_setup function · python · L273-L331 (59 LOC)
tools/meeting-transcriber/meeting_transcriber/cli.py
def cmd_setup(args: list[str]) -> None:
    """Check prerequisites and show platform-specific setup instructions."""
    current_platform = platform.system()
    print(f"BizBrain Meetings — Setup Check ({current_platform})\n")

    # Check brain
    brain = find_brain_path()
    print(f"Brain folder: {brain or 'NOT FOUND'}")

    # Core dependencies (cross-platform)
    print("\nCore dependencies:")
    core_deps = {
        "faster_whisper": "faster-whisper",
        "sounddevice": "sounddevice",
        "numpy": "numpy",
        "psutil": "psutil",
    }

    missing = []
    for module, package in core_deps.items():
        try:
            __import__(module)
            print(f"  {package}: installed")
        except ImportError:
            print(f"  {package}: MISSING")
            missing.append(package)

    # Platform-specific dependencies
    print(f"\nPlatform ({current_platform}):")
    if current_platform == "Windows":
        _check_windows_deps(missing)
    elif current_
Source: Repobility analyzer · https://repobility.com
_check_windows_deps function · python · L334-L343 (10 LOC)
tools/meeting-transcriber/meeting_transcriber/cli.py
def _check_windows_deps(missing: list[str]) -> None:
    """Report whether the Windows audio dependency can be imported.

    Args:
        missing: List that collects names of absent packages;
            ``"pyaudiowpatch"`` is appended in place when its import fails.
    """
    try:
        __import__("pyaudiowpatch")
    except ImportError:
        print("  pyaudiowpatch (WASAPI): MISSING")
        missing.append("pyaudiowpatch")
    else:
        print("  pyaudiowpatch (WASAPI): installed")

    print("  Audio setup: No additional setup needed (WASAPI built into Windows)")
_check_macos_deps function · python · L346-L404 (59 LOC)
tools/meeting-transcriber/meeting_transcriber/cli.py
def _check_macos_deps(missing: list[str]) -> None:
    """Check macOS-specific dependencies and audio configuration."""
    # Check pyobjc-framework-Quartz (optional but recommended)
    try:
        from Quartz import CGWindowListCopyWindowInfo
        print("  pyobjc-framework-Quartz: installed (window title detection)")
    except ImportError:
        print("  pyobjc-framework-Quartz: not installed (optional, improves meeting detection)")

    # Check BlackHole virtual audio device
    print("\n  Audio setup (BlackHole):")
    blackhole_found = False
    try:
        import sounddevice as sd
        devices = sd.query_devices()
        for dev in devices:
            if "blackhole" in dev["name"].lower() and dev["max_input_channels"] > 0:
                blackhole_found = True
                print(f"    BlackHole device: FOUND — {dev['name']}")
                break
    except Exception:
        pass

    if not blackhole_found:
        print("    BlackHole device: NOT FOUND")
    
main function · python · L417-L441 (25 LOC)
tools/meeting-transcriber/meeting_transcriber/cli.py
def main() -> None:
    if len(sys.argv) < 2 or sys.argv[1] in ("-h", "--help"):
        print("Usage: bizbrain-meetings <command> [args]")
        print("\nCommands:")
        print("  daemon      Start the meeting transcription daemon")
        print("  transcribe  Transcribe a specific audio file")
        print("  status      Show daemon status")
        print("  stop        Stop the running daemon")
        print("  setup       Check prerequisites and show setup info")
        print("  install     Auto-install package with platform dependencies")
        print("\nDaemon flags:")
        print("  --model tiny|base|small|medium|large-v3  Whisper model (default: base)")
        print("  --language en                            Force language (default: auto)")
        print("  --diarize                                Enable speaker diarization")
        print("  --keep-audio                             Keep recordings forever (default)")
        print("  --delete-audio-after N        
MeetingDaemon class · python · L21-L274 (254 LOC)
tools/meeting-transcriber/meeting_transcriber/daemon.py
class MeetingDaemon:
    """Main orchestrator: polls for meetings, records, transcribes, saves.

    Lifecycle:
        1. Poll detector every POLL_INTERVAL_SEC
        2. When meeting detected → start loopback recorder
        3. When meeting ends → stop recorder → transcribe chunks → save to brain
        4. Optionally clean up old audio files based on retention policy
    """

    def __init__(
        self,
        brain_path: Path,
        model_size: str = "base",
        language: str | None = None,
        diarize: bool = False,
        hf_token: str | None = None,
        audio_retention_days: int | None = None,
    ):
        self.brain_path = brain_path
        self.model_size = model_size
        self.language = language
        self.diarize = diarize
        self.hf_token = hf_token
        self.audio_retention_days = audio_retention_days  # None = keep forever

        self._bizbrain_dir = brain_path / ".bizbrain"
        self._pid_file = self._bizbrain_dir / "meeting-dae
__init__ method · python · L31-L54 (24 LOC)
tools/meeting-transcriber/meeting_transcriber/daemon.py
    def __init__(
        self,
        brain_path: Path,
        model_size: str = "base",
        language: str | None = None,
        diarize: bool = False,
        hf_token: str | None = None,
        audio_retention_days: int | None = None,
    ):
        self.brain_path = brain_path
        self.model_size = model_size
        self.language = language
        self.diarize = diarize
        self.hf_token = hf_token
        self.audio_retention_days = audio_retention_days  # None = keep forever

        self._bizbrain_dir = brain_path / ".bizbrain"
        self._pid_file = self._bizbrain_dir / "meeting-daemon.pid"
        self._status_file = self._bizbrain_dir / "meeting-daemon-status.json"
        self._audio_dir = brain_path / "Operations" / "meetings" / "_audio"
        self._recordings_dir = brain_path / "Operations" / "meetings" / "recordings"
        self._running = False
        self._current_meeting: MeetingInfo | None = None
        self._recorder: LoopbackRecorder | None 
start method · python · L56-L95 (40 LOC)
tools/meeting-transcriber/meeting_transcriber/daemon.py
    def start(self) -> None:
        """Start the daemon. Writes PID file and enters main loop."""
        self._bizbrain_dir.mkdir(parents=True, exist_ok=True)
        self._audio_dir.mkdir(parents=True, exist_ok=True)
        self._recordings_dir.mkdir(parents=True, exist_ok=True)

        # Check for existing daemon
        if self._pid_file.exists():
            try:
                existing_pid = int(self._pid_file.read_text().strip())
                import psutil
                if psutil.pid_exists(existing_pid):
                    print(f"Daemon already running (PID {existing_pid})")
                    sys.exit(1)
            except (ValueError, ImportError):
                pass

        # Write PID
        self._pid_file.write_text(str(os.getpid()))
        self._running = True

        # Handle graceful shutdown
        signal.signal(signal.SIGTERM, self._handle_signal)
        signal.signal(signal.SIGINT, self._handle_signal)

        retention_msg = (
            f"dele
stop method · python · L97-L99 (3 LOC)
tools/meeting-transcriber/meeting_transcriber/daemon.py
    def stop(self) -> None:
        """Request shutdown by clearing the ``_running`` flag.

        This only flips the flag; ``_main_loop`` checks it at the top of each
        polling pass, so the loop exits the next time it re-evaluates it.
        """
        self._running = False
_main_loop method · python · L101-L119 (19 LOC)
tools/meeting-transcriber/meeting_transcriber/daemon.py
    def _main_loop(self) -> None:
        while self._running:
            try:
                if self._current_meeting:
                    # Meeting in progress — check if it ended
                    detected = detect_meeting()
                    if detected is None:
                        self._on_meeting_end()
                    else:
                        self._update_status(meeting_active=True)
                else:
                    # No meeting — poll for one
                    detected = detect_meeting()
                    if detected is not None:
                        self._on_meeting_start(detected)
            except Exception as e:
                print(f"Error in main loop: {e}")

            time.sleep(POLL_INTERVAL_SEC)
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
_on_meeting_start method · python · L121-L139 (19 LOC)
tools/meeting-transcriber/meeting_transcriber/daemon.py
    def _on_meeting_start(self, detected: DetectedMeeting) -> None:
        """Begin tracking and recording a newly detected meeting.

        Builds the ``MeetingInfo`` record, starts a ``LoopbackRecorder`` that
        writes into a timestamped session directory under ``_audio_dir``, and
        then publishes the active state through ``_update_status``.

        The meeting and recorder are stored on the instance only after the
        recorder has started. ``_on_meeting_end`` returns early when either
        attribute is unset, so storing ``_current_meeting`` ahead of a failed
        recorder construction would leave the daemon treating a meeting as
        active without ever being able to end it.

        Args:
            detected: Detection result; its ``platform``, ``window_title`` and
                ``process_name`` are copied into the meeting record. The
                title falls back to the platform when no window title is set.

        Raises:
            Exception: Whatever constructing or starting the recorder raises.
                ``_current_meeting`` and ``_recorder`` are left untouched in
                that case, so a later poll can try again.
        """
        now = datetime.now()
        meeting = MeetingInfo(
            platform=detected.platform,
            title=detected.window_title or detected.platform,
            started_at=now,
            process_name=detected.process_name,
            window_title=detected.window_title,
        )

        # Start recording to a session-specific audio directory
        session_dir = self._audio_dir / now.strftime("%Y-%m-%d_%H%M%S")
        recorder = LoopbackRecorder(session_dir)
        recorder.start()

        # Commit state only once recording is actually under way.
        self._current_meeting = meeting
        self._recorder = recorder

        print(f"\nMeeting detected: {detected.platform} — {detected.window_title}")
        print(f"Recording to: {session_dir}")
        self._update_status(meeting_active=True)
_on_meeting_end method · python · L141-L209 (69 LOC)
tools/meeting-transcriber/meeting_transcriber/daemon.py
    def _on_meeting_end(self) -> None:
        """Called when the active meeting ends."""
        if not self._current_meeting or not self._recorder:
            return

        self._current_meeting.ended_at = datetime.now()
        print(f"\nMeeting ended ({self._current_meeting.duration_minutes:.0f} min)")

        # Stop recording
        chunk_paths = self._recorder.stop()
        self._current_meeting.audio_chunks = chunk_paths
        print(f"Recorded {len(chunk_paths)} audio chunk(s)")

        if not chunk_paths:
            print("No audio recorded — skipping transcription")
            self._current_meeting = None
            self._recorder = None
            self._update_status(meeting_active=False)
            return

        # Stitch chunks into a single permanent recording
        recording_path = self._stitch_recording(chunk_paths)
        if recording_path:
            self._current_meeting.recording_path = recording_path
            print(f"Recording saved: {recording
_stitch_recording method · python · L211-L231 (21 LOC)
tools/meeting-transcriber/meeting_transcriber/daemon.py
    def _stitch_recording(self, chunk_paths: list[Path]) -> Path | None:
        """Stitch audio chunks into a single clean WAV file for permanent storage."""
        if not chunk_paths:
            return None

        date_str = self._current_meeting.started_at.strftime("%Y-%m-%d")
        slug = self._current_meeting.slug
        recording_path = self._recordings_dir / f"{date_str}-{slug}.wav"

        try:
            from .diarizer import _stitch_wav_files
            _stitch_wav_files(chunk_paths, recording_path)
            return recording_path
        except Exception:
            # Fallback: manual stitching without diarizer import
            try:
                _stitch_wav_chunks(chunk_paths, recording_path)
                return recording_path
            except Exception as e:
                print(f"Warning: Could not stitch recording: {e}")
                return None
page 1 / 3 · next ›