← back to internet-dot__multi-agent-rockman

Function bodies 56 total

All specs Real LLM only Function bodies
Screen class · kotlin · L49-L54 (6 LOC)
android/app/src/main/java/com/shogun/android/MainActivity.kt
/**
 * Closed set of app screens.
 *
 * @property route string identifier of the screen
 * @property label text displayed for the screen
 * @property icon vector icon displayed for the screen
 */
sealed class Screen(
    val route: String,
    val label: String,
    val icon: ImageVector
) {
    object Shogun : Screen(route = "shogun", label = "将軍", icon = Icons.Default.Star)

    object Agents : Screen(route = "agents", label = "エージェント", icon = Icons.Default.List)

    object Dashboard : Screen(route = "dashboard", label = "戦況", icon = Icons.Default.Home)

    object Settings : Screen(route = "settings", label = "設定", icon = Icons.Default.Settings)
}
MainActivity class · kotlin · L63-L144 (82 LOC)
android/app/src/main/java/com/shogun/android/MainActivity.kt
class MainActivity : ComponentActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        enableEdgeToEdge()
        NotificationHelper.initChannels(this)
        setContent {
            ShogunTheme {
                ShogunApp()
            }
        }
        handleShareIntent(intent)
        // Only start NtfyService if notification permission is granted (Android 13+)
        val hasNotifPerm = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) {
            ContextCompat.checkSelfPermission(this, Manifest.permission.POST_NOTIFICATIONS) ==
                PackageManager.PERMISSION_GRANTED
        } else true
        if (hasNotifPerm && getSharedPreferences(PrefsKeys.PREFS_NAME, MODE_PRIVATE)
                .getBoolean(PrefsKeys.NOTIFICATION_ENABLED, true)) {
            try {
                startForegroundService(Intent(this, NtfyService::class.java))
            } catch (_: Exception) {
                // Foregr
NotificationHelper class · kotlin · L11-L102 (92 LOC)
android/app/src/main/java/com/shogun/android/NotificationHelper.kt
object NotificationHelper {

    private const val CH_CMD_COMPLETE = "cmd_complete"
    private const val CH_CMD_FAILURE = "cmd_failure"
    private const val CH_ACTION_REQUIRED = "action_required"
    private const val CH_DASHBOARD_UPDATE = "dashboard_update"
    private const val CH_STREAK_UPDATE = "streak_update"
    private const val CH_AGENT_RESPONSE = "agent_response"

    fun initChannels(context: Context) {
        val nm = context.getSystemService(NotificationManager::class.java)
        val channels = listOf(
            NotificationChannel(CH_CMD_COMPLETE, "タスク完了", NotificationManager.IMPORTANCE_DEFAULT).apply {
                enableVibration(true)
                vibrationPattern = longArrayOf(0, 200)
            },
            NotificationChannel(CH_CMD_FAILURE, "タスク失敗", NotificationManager.IMPORTANCE_HIGH).apply {
                enableVibration(true)
                vibrationPattern = longArrayOf(0, 300, 100, 300)
            },
            NotificationChannel(CH_ACTION
NtfyService class · kotlin · L27-L162 (136 LOC)
android/app/src/main/java/com/shogun/android/NtfyService.kt
class NtfyService : Service() {

    private val client = OkHttpClient.Builder()
        .pingInterval(30, TimeUnit.SECONDS)
        .build()

    private var webSocket: WebSocket? = null
    private var lastReceivedId: String = ""
    private var backoffIndex = 0
    private val reconnectHandler = Handler(Looper.getMainLooper())
    private var reconnectRunnable: Runnable? = null
    private lateinit var connectivityManager: ConnectivityManager
    private var networkCallback: ConnectivityManager.NetworkCallback? = null

    companion object {
        private const val CHANNEL_ID = "ntfy_service"
        private const val NOTIFICATION_ID = 2
        private val TOPIC = Defaults.NTFY_TOPIC
        private val BACKOFF_DELAYS = longArrayOf(5_000L, 10_000L, 30_000L, 60_000L)
    }

    override fun onCreate() {
        super.onCreate()
        val prefs = getSharedPreferences(PrefsKeys.PREFS_NAME, Context.MODE_PRIVATE)
        if (!prefs.getBoolean(PrefsKeys.NOTIFICATION_ENABLED, true)) {
Companion class · kotlin · L41-L46 (6 LOC)
android/app/src/main/java/com/shogun/android/NtfyService.kt
    companion object {
        // Identifiers used for this service's own notification.
        private const val NOTIFICATION_ID: Int = 2
        private const val CHANNEL_ID: String = "ntfy_service"

        // Topic taken from the app-wide defaults.
        private val TOPIC: String = Defaults.NTFY_TOPIC

        // Increasing delay steps; presumably milliseconds between reconnect attempts — confirm at the use site.
        private val BACKOFF_DELAYS: LongArray = longArrayOf(5_000L, 10_000L, 30_000L, 60_000L)
    }
NtfyWebSocketListener class · kotlin · L131-L161 (31 LOC)
android/app/src/main/java/com/shogun/android/NtfyService.kt
    inner class NtfyWebSocketListener : WebSocketListener() {
        override fun onMessage(webSocket: WebSocket, text: String) {
            try {
                val json = JSONObject(text)
                if (json.optString("event") != "message") return

                lastReceivedId = json.optString("id", lastReceivedId)
                backoffIndex = 0

                val title = json.optString("title", "")
                val message = json.optString("message", "")
                val tagsArray = json.optJSONArray("tags")
                val tags = buildList {
                    if (tagsArray != null) {
                        for (i in 0 until tagsArray.length()) add(tagsArray.getString(i))
                    }
                }
                NotificationHelper.showNotification(this@NtfyService, message, tags, title)
            } catch (_: Exception) {
                // Malformed JSON — ignore
            }
        }

        override fun onFailure(webSocket: WebSocket,
SshForegroundService class · kotlin · L12-L66 (55 LOC)
android/app/src/main/java/com/shogun/android/SshForegroundService.kt
class SshForegroundService : Service() {

    private val binder = SshBinder()

    inner class SshBinder : Binder() {
        fun getService(): SshForegroundService = this@SshForegroundService
    }

    override fun onCreate() {
        super.onCreate()
        createNotificationChannel()
        startForeground(NOTIFICATION_ID, buildNotification())
    }

    override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
        return START_STICKY
    }

    override fun onBind(intent: Intent): IBinder = binder

    override fun onDestroy() {
        super.onDestroy()
    }

    private fun createNotificationChannel() {
        val channel = NotificationChannel(
            CHANNEL_ID,
            "SSH接続",
            NotificationManager.IMPORTANCE_LOW
        ).apply {
            description = "将軍セッションのSSH接続を維持します"
        }
        getSystemService(NotificationManager::class.java).createNotificationChannel(channel)
    }

    private fun buildNotification(): Notif
Repobility · open methodology · https://repobility.com/research/
SshBinder class · kotlin · L16-L18 (3 LOC)
android/app/src/main/java/com/shogun/android/SshForegroundService.kt
    /** Binder whose [getService] returns the enclosing [SshForegroundService] instance. */
    inner class SshBinder : Binder() {
        fun getService(): SshForegroundService {
            return this@SshForegroundService
        }
    }
Companion class · kotlin · L62-L65 (4 LOC)
android/app/src/main/java/com/shogun/android/SshForegroundService.kt
    companion object {
        // Notification id and channel id constants for this service.
        private const val NOTIFICATION_ID: Int = 1
        private const val CHANNEL_ID: String = "ssh_connection"
    }
SshManager class · kotlin · L21-L295 (275 LOC)
android/app/src/main/java/com/shogun/android/ssh/SshManager.kt
class SshManager private constructor() {

    companion object {
        @Volatile private var INSTANCE: SshManager? = null
        fun getInstance(): SshManager = INSTANCE ?: synchronized(this) {
            INSTANCE ?: SshManager().also { INSTANCE = it }
        }
    }

    @Volatile private var session: Session? = null

    // Mutex serializes ALL SSH operations (exec, reconnect, connect).
    // Prevents race condition where multiple ViewModels' concurrent reconnects
    // kill each other's newly-created sessions.
    private val sshMutex = Mutex()

    // Stored for reconnect
    private var lastHost = ""
    private var lastPort = 22
    private var lastUser = ""
    private var lastKeyPath = ""
    private var lastPassword = ""

    var disconnectCallback: (() -> Unit)? = null

    suspend fun connect(
        host: String,
        port: Int,
        user: String,
        privateKeyPath: String,
        password: String = "",
        onOutput: ((String) -> Unit)? = null,
     
Companion class · kotlin · L23-L28 (6 LOC)
android/app/src/main/java/com/shogun/android/ssh/SshManager.kt
    companion object {
        @Volatile
        private var INSTANCE: SshManager? = null

        /**
         * Returns the single shared [SshManager], creating it on first use.
         *
         * The field is read once without the lock; only when it is still null is the
         * companion's monitor taken and the field checked again before constructing.
         */
        fun getInstance(): SshManager {
            INSTANCE?.let { return it }
            return synchronized(this) {
                val existing = INSTANCE
                if (existing != null) {
                    existing
                } else {
                    val created = SshManager()
                    INSTANCE = created
                    created
                }
            }
        }
    }
AppLogger class · kotlin · L8-L25 (18 LOC)
android/app/src/main/java/com/shogun/android/util/AppLogger.kt
/**
 * In-memory log buffer that keeps at most [MAX_ENTRIES] formatted lines.
 *
 * Writers ([log], [clear]) are serialized on a private lock because the shared
 * [SimpleDateFormat] is not safe for concurrent use, and because trimming the
 * oldest entries must not interleave with [clear]. Readers take a snapshot of the
 * copy-on-write list and need no lock.
 */
object AppLogger {
    private const val MAX_ENTRIES = 200
    private val entries = CopyOnWriteArrayList<String>()
    private val fmt = SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault())

    // Guards fmt and the add-then-trim sequence.
    private val lock = Any()

    /**
     * Appends a line of the form `"<HH:mm:ss.SSS> [<tag>] <message>"` and drops the
     * oldest lines while more than [MAX_ENTRIES] are stored.
     *
     * @param tag short label placed in square brackets
     * @param message text of the log line
     */
    fun log(tag: String, message: String) {
        synchronized(lock) {
            val ts = fmt.format(Date())
            entries.add("$ts [$tag] $message")
            while (entries.size > MAX_ENTRIES) {
                entries.removeAt(0)
            }
        }
    }

    /** Returns a copy of the stored lines, oldest first. */
    fun getEntries(): List<String> = entries.toList()

    /** Removes every stored line. */
    fun clear() = synchronized(lock) { entries.clear() }
}
PrefsKeys class · kotlin · L4-L22 (19 LOC)
android/app/src/main/java/com/shogun/android/util/Constants.kt
/** String constants naming the preferences file and the keys stored in it. */
object PrefsKeys {
    // Name of the preferences file itself.
    const val PREFS_NAME = "shogun_prefs"

    // SSH connection settings.
    const val SSH_HOST = "ssh_host"
    const val SSH_PORT = "ssh_port"
    const val SSH_USER = "ssh_user"
    const val SSH_KEY_PATH = "ssh_key_path"
    const val SSH_PASSWORD = "ssh_password"

    // Project path and session names.
    const val PROJECT_PATH = "project_path"
    const val SHOGUN_SESSION = "shogun_session"
    const val AGENTS_SESSION = "agents_session"

    // Notification switch and ntfy topic.
    const val NOTIFICATION_ENABLED = "notification_enabled"
    const val NTFY_TOPIC = "ntfy_topic"

    // Per-category notification toggles.
    const val NOTIFY_CMD_COMPLETE = "notify_cmd_complete"
    const val NOTIFY_CMD_FAILURE = "notify_cmd_failure"
    const val NOTIFY_ACTION_REQUIRED = "notify_action_required"
    const val NOTIFY_DASHBOARD_UPDATE = "notify_dashboard_update"
    const val NOTIFY_STREAK_UPDATE = "notify_streak_update"
    const val NOTIFY_AGENT_RESPONSE = "notify_agent_response"
}
Defaults class · kotlin · L24-L32 (9 LOC)
android/app/src/main/java/com/shogun/android/util/Constants.kt
/** Fallback values used when no stored setting is available. */
object Defaults {
    // SSH endpoint; the port is provided both as a number and as text.
    const val SSH_HOST = "192.168.1.1"
    const val SSH_PORT = 22
    const val SSH_PORT_STR = "22"

    // Session names.
    const val SHOGUN_SESSION = "shogun"
    const val AGENTS_SESSION = "multiagent"

    // ntfy topic.
    const val NTFY_TOPIC = "sho-y0uhey"

    // Path of the tmux executable.
    const val TMUX = "/usr/bin/tmux"
}
AgentsViewModel class · kotlin · L24-L198 (175 LOC)
android/app/src/main/java/com/shogun/android/viewmodel/AgentsViewModel.kt
class AgentsViewModel(application: Application) : AndroidViewModel(application) {

    private val sshManager = SshManager.getInstance()
    private val prefs = application.getSharedPreferences(PrefsKeys.PREFS_NAME, Context.MODE_PRIVATE)

    private val _panes = MutableStateFlow<List<PaneInfo>>(emptyList())
    val panes: StateFlow<List<PaneInfo>> = _panes

    private val _isConnected = MutableStateFlow(false)
    val isConnected: StateFlow<Boolean> = _isConnected

    private val _errorMessage = MutableStateFlow<String?>(null)
    val errorMessage: StateFlow<String?> = _errorMessage

    private val _rateLimitResult = MutableStateFlow<String?>(null)
    val rateLimitResult: StateFlow<String?> = _rateLimitResult

    private val _rateLimitLoading = MutableStateFlow(false)
    val rateLimitLoading: StateFlow<Boolean> = _rateLimitLoading

    private var refreshJob: Job? = null
    @Volatile private var paused = false
    @Volatile private var isRefreshing = false

    private fun agen
Want this analysis on your repo? https://repobility.com/scan/
DashboardViewModel class · kotlin · L13-L67 (55 LOC)
android/app/src/main/java/com/shogun/android/viewmodel/DashboardViewModel.kt
class DashboardViewModel(application: Application) : AndroidViewModel(application) {

    private val sshManager = SshManager.getInstance()
    private val prefs = application.getSharedPreferences(PrefsKeys.PREFS_NAME, Context.MODE_PRIVATE)

    private val _markdownContent = MutableStateFlow("")
    val markdownContent: StateFlow<String> = _markdownContent

    private val _isConnected = MutableStateFlow(false)
    val isConnected: StateFlow<Boolean> = _isConnected

    private val _isLoading = MutableStateFlow(false)
    val isLoading: StateFlow<Boolean> = _isLoading

    private val _errorMessage = MutableStateFlow<String?>(null)
    val errorMessage: StateFlow<String?> = _errorMessage

    fun connect(host: String, port: Int, user: String, keyPath: String, password: String = "") {
        viewModelScope.launch {
            val result = sshManager.connect(host, port, user, keyPath, password)
            if (result.isSuccess) {
                _isConnected.value = true
             
SettingsViewModel class · kotlin · L11-L78 (68 LOC)
android/app/src/main/java/com/shogun/android/viewmodel/SettingsViewModel.kt
class SettingsViewModel(application: Application) : AndroidViewModel(application) {

    private val prefs = application.getSharedPreferences(PrefsKeys.PREFS_NAME, Context.MODE_PRIVATE)

    private val _notificationEnabled = MutableStateFlow(prefs.getBoolean(PrefsKeys.NOTIFICATION_ENABLED, true))
    val notificationEnabled: StateFlow<Boolean> = _notificationEnabled

    private val _ntfyTopic = MutableStateFlow(prefs.getString(PrefsKeys.NTFY_TOPIC, Defaults.NTFY_TOPIC) ?: Defaults.NTFY_TOPIC)
    val ntfyTopic: StateFlow<String> = _ntfyTopic

    private val _notifyCmdComplete = MutableStateFlow(prefs.getBoolean(PrefsKeys.NOTIFY_CMD_COMPLETE, true))
    val notifyCmdComplete: StateFlow<Boolean> = _notifyCmdComplete

    private val _notifyCmdFailure = MutableStateFlow(prefs.getBoolean(PrefsKeys.NOTIFY_CMD_FAILURE, true))
    val notifyCmdFailure: StateFlow<Boolean> = _notifyCmdFailure

    private val _notifyActionRequired = MutableStateFlow(prefs.getBoolean(PrefsKeys.NOTIFY_ACTION_R
ShogunViewModel class · kotlin · L19-L153 (135 LOC)
android/app/src/main/java/com/shogun/android/viewmodel/ShogunViewModel.kt
class ShogunViewModel(application: Application) : AndroidViewModel(application) {

    private val sshManager = SshManager.getInstance()
    private val prefs = application.getSharedPreferences(PrefsKeys.PREFS_NAME, Context.MODE_PRIVATE)

    private val _paneContent = MutableStateFlow("")
    val paneContent: StateFlow<String> = _paneContent

    private val _isConnected = MutableStateFlow(false)
    val isConnected: StateFlow<Boolean> = _isConnected

    private val _errorMessage = MutableStateFlow<String?>(null)
    val errorMessage: StateFlow<String?> = _errorMessage

    private var refreshJob: Job? = null
    private var reconnectJob: Job? = null
    @Volatile private var paused = false

    private fun tmuxTarget(): String {
        val session = prefs.getString(PrefsKeys.SHOGUN_SESSION, Defaults.SHOGUN_SESSION) ?: Defaults.SHOGUN_SESSION
        return "$session:main"
    }

    fun pauseRefresh() { paused = true }
    fun resumeRefresh() {
        paused = false
        viewMo
parse_frontmatter function · python · L40-L59 (20 LOC)
scripts/seo_qc.py
def parse_frontmatter(content: str) -> tuple[dict, str]:
    """Split markdown content into its YAML frontmatter and body.

    Args:
        content: Full text of the article file.

    Returns:
        ``(frontmatter, body)``. ``frontmatter`` is ``{}`` when the content does
        not start with ``---``, when no closing ``---`` exists, when the YAML is
        invalid, or when it does not parse to a mapping. In the first two cases
        ``body`` is the unmodified ``content``; otherwise it is the stripped text
        after the closing delimiter.
    """
    if not content.startswith("---"):
        return {}, content

    # Prefer a closing delimiter at the start of a line, so that a "---" embedded
    # in a frontmatter value (e.g. `title: a --- b`) does not end the block early.
    # With an explicit start position, "^" still only matches at real line starts.
    closing = re.compile(r"^---", re.MULTILINE).search(content, 3)
    if closing is not None:
        end = closing.start()
    else:
        # Fall back to the first occurrence anywhere, as before.
        end = content.find("---", 3)
        if end == -1:
            return {}, content

    fm_str = content[3:end].strip()
    body = content[end + 3:].strip()

    try:
        fm = yaml.safe_load(fm_str)
        if not isinstance(fm, dict):
            fm = {}
    except yaml.YAMLError:
        fm = {}

    return fm, body
strip_html_tags function · python · L62-L64 (3 LOC)
scripts/seo_qc.py
def strip_html_tags(text: str) -> str:
    """Return ``text`` with every ``<...>`` sequence deleted."""
    tag_pattern = re.compile(r"<[^>]+>")
    return tag_pattern.sub("", text)
count_japanese_chars function · python · L67-L77 (11 LOC)
scripts/seo_qc.py
def count_japanese_chars(text: str) -> int:
    """Count the characters left once markup and whitespace are removed.

    HTML tags are stripped first; then markdown links are reduced to their label,
    markdown punctuation characters are dropped, and all whitespace is deleted.
    The length of what remains is returned.
    """
    substitutions = (
        (r"\[([^\]]*)\]\([^)]*\)", r"\1"),  # [label](target) -> label
        (r"[#*_`|>-]", ""),                 # markdown formatting characters
        (r"\s+", ""),                       # spaces, tabs and newlines
    )
    remaining = strip_html_tags(text)
    for pattern, replacement in substitutions:
        remaining = re.sub(pattern, replacement, remaining)
    return len(remaining)
find_h2_sections function · python · L80-L99 (20 LOC)
scripts/seo_qc.py
def find_h2_sections(body: str) -> list[tuple[str, str]]:
    """Split ``body`` into its H2 sections.

    Returns one ``(heading_line, text)`` pair per line starting with ``"## "``,
    in document order; ``text`` is every following line up to the next such
    heading, joined with newlines. Lines before the first heading are ignored.
    """
    collected: list[tuple[str, list[str]]] = []
    for raw in body.split("\n"):
        if raw.startswith("## "):
            collected.append((raw, []))
        elif collected:
            # Belongs to the most recently opened section.
            collected[-1][1].append(raw)
    return [(heading, "\n".join(chunk)) for heading, chunk in collected]
check_001_frontmatter_fields function · python · L102-L107 (6 LOC)
scripts/seo_qc.py
def check_001_frontmatter_fields(fm: dict) -> tuple[bool, str]:
    """Verify that every name in ``REQUIRED_FIELDS`` is a key of ``fm``.

    Returns ``(True, "")`` on success, otherwise ``(False, "missing: ...")``
    listing the absent names.
    """
    absent = []
    for name in REQUIRED_FIELDS:
        if name not in fm:
            absent.append(name)
    if not absent:
        return True, ""
    return False, f"missing: {', '.join(absent)}"
Repobility (the analyzer behind this table) · https://repobility.com
check_002_frontmatter_types function · python · L110-L144 (35 LOC)
scripts/seo_qc.py
def check_002_frontmatter_types(fm: dict) -> tuple[bool, str]:
    """Check frontmatter field types and values."""
    issues = []

    if "title" in fm and (not isinstance(fm["title"], str) or not fm["title"].strip()):
        issues.append("title empty or not string")

    if "description" in fm and (not isinstance(fm["description"], str) or not fm["description"].strip()):
        issues.append("description empty or not string")

    if "publishedAt" in fm:
        val = str(fm["publishedAt"])
        if not DATE_PATTERN.match(val):
            issues.append(f"publishedAt format: {val}")

    if "category" in fm and fm["category"] != "area":
        # Allow other categories like "ranking" etc.
        pass

    if "area" in fm and (not isinstance(fm["area"], str) or not fm["area"].strip()):
        issues.append("area empty or not string")

    if "keyword" in fm:
        if not isinstance(fm["keyword"], str) or not fm["keyword"].strip():
            issues.append("keyword empty or n
check_003_pr_notation function · python · L147-L153 (7 LOC)
scripts/seo_qc.py
def check_003_pr_notation(body: str) -> tuple[bool, str]:
    """Check for a PR/affiliate disclosure within the first 50 lines.

    The check passes when those lines contain "アフィリエイト広告", or the token
    "PR" not directly adjacent to another ASCII letter. The letter guard keeps
    words that merely contain the two letters (e.g. "SUPREME", "PRO") from
    counting as a disclosure, while forms such as "【PR】" or "はPRを" still match.

    Returns:
        ``(True, "")`` when a disclosure is found, otherwise ``(False, reason)``.
    """
    head = "\n".join(body.split("\n")[:50])
    if "アフィリエイト広告" in head:
        return True, ""
    if re.search(r"(?<![A-Za-z])PR(?![A-Za-z])", head):
        return True, ""
    return False, "PR notation not found in first 50 lines"
check_004_cta_count function · python · L156-L164 (9 LOC)
scripts/seo_qc.py
def check_004_cta_count(body: str) -> tuple[bool, str]:
    """Require exactly three CTAs, counting both supported notations.

    A CTA is either a literal ``<div class="cta-box">`` or a ``<!-- CTA:``
    comment. On failure the detail reports the total and both counts.
    """
    div_total = body.count('<div class="cta-box">')
    comment_total = sum(1 for _ in re.finditer(r"<!--\s*CTA:", body))
    total = div_total + comment_total
    if total != 3:
        return False, f"CTA count: {total} (div: {div_total}, comment: {comment_total})"
    return True, ""
check_005_cta_structure function · python · L167-L191 (25 LOC)
scripts/seo_qc.py
def check_005_cta_structure(body: str) -> tuple[bool, str]:
    """Inspect the markup of each CTA box found in ``body``.

    Fails with "no CTA found" when there is neither a matching cta-box block nor
    a ``<!-- CTA:`` comment. Otherwise each matched block is flagged when it
    contains neither ``cta-badge`` nor ``cta-button``, and when it lacks
    ``nofollow`` or ``sponsored``. Problems are joined with "; ".
    """
    box_pattern = re.compile(r'<div class="cta-box">.*?</div>\s*</div>', re.DOTALL)
    boxes = box_pattern.findall(body)

    # CTA comments are counted as present but have no structure to inspect.
    has_comment_cta = len(re.findall(r"<!--\s*CTA:", body)) != 0

    if not boxes and not has_comment_cta:
        return False, "no CTA found"

    problems = []
    for number, box in enumerate(boxes, start=1):
        has_badge = 'cta-badge' in box
        has_button = 'cta-button' in box
        if not (has_badge or has_button):
            problems.append(f"CTA#{number}: missing badge or button")
        if not ('nofollow' in box and 'sponsored' in box):
            problems.append(f"CTA#{number}: missing nofollow/sponsored")

    if not problems:
        return True, ""
    return False, "; ".join(problems)
check_006_h2_count function · python · L194-L200 (7 LOC)
scripts/seo_qc.py
def check_006_h2_count(body: str) -> tuple[bool, str]:
    """Require exactly five lines that start with ``"## "`` followed by text."""
    h2_pattern = re.compile(r"^## .+", re.MULTILINE)
    found = len(h2_pattern.findall(body))
    if found != 5:
        return False, f"H2 count: {found}"
    return True, ""
check_007_faq_questions function · python · L203-L225 (23 LOC)
scripts/seo_qc.py
def check_007_faq_questions(body: str) -> tuple[bool, str]:
    """Require at least five question headings in the FAQ section.

    The FAQ section is the first H2 section whose heading contains "FAQ" or
    "よくある質問". If that yields no text and there are four or more H2
    sections, the fourth section is used instead. Questions are lines that start
    with ``###`` or ``####`` followed by a space and text.
    """
    sections = find_h2_sections(body)

    # Text of the first explicitly named FAQ section, or "" if there is none.
    faq_text = next(
        (text for heading, text in sections if "FAQ" in heading or "よくある質問" in heading),
        "",
    )

    # Positional fallback: the fourth H2 section.
    if not faq_text and len(sections) >= 4:
        faq_text = sections[3][1]

    if not faq_text:
        return False, "FAQ section not found"

    question_total = len(re.findall(r"^#{3,4} .+", faq_text, re.MULTILINE))
    if question_total < 5:
        return False, f"FAQ questions: {question_total}"
    return True, ""
check_008_char_count function · python · L228-L233 (6 LOC)
scripts/seo_qc.py
def check_008_char_count(body: str) -> tuple[bool, str]:
    """Require ``count_japanese_chars(body)`` to be at least 2500."""
    total = count_japanese_chars(body)
    if total < 2500:
        return False, f"chars: {total}"
    return True, ""
check_009_forbidden_words function · python · L236-L246 (11 LOC)
scripts/seo_qc.py
def check_009_forbidden_words(body: str) -> tuple[bool, list]:
    """Report every match of ``FORBIDDEN_PATTERN`` in ``body``.

    Returns ``(True, [])`` when nothing matches; otherwise ``(False, details)``
    where each detail is ``"word(count)"`` in order of first appearance.
    """
    hits = FORBIDDEN_PATTERN.findall(body)
    if hits:
        tally = {}
        for word in hits:
            tally[word] = tally.get(word, 0) + 1
        return False, [f"{word}({times})" for word, times in tally.items()]
    return True, []
Repobility · severity-and-effort ranking · https://repobility.com
check_010_cost_table function · python · L249-L259 (11 LOC)
scripts/seo_qc.py
def check_010_cost_table(body: str) -> tuple[bool, str]:
    """Require a markdown table of at least four rows in the first H2 section.

    A table row is any line of that section whose stripped text starts with "|".
    """
    sections = find_h2_sections(body)
    if len(sections) == 0:
        return False, "no H2 sections"

    _, first_text = sections[0]
    row_total = sum(
        1 for row in first_text.split("\n") if row.strip().startswith("|")
    )
    if row_total < 4:
        return False, f"table lines in H2-1: {row_total}"
    return True, ""
check_011_markdown_table_syntax function · python · L262-L294 (33 LOC)
scripts/seo_qc.py
def check_011_markdown_table_syntax(body: str) -> tuple[bool, str]:
    """Check markdown table syntax: header -> separator -> data rows."""
    lines = body.split("\n")
    issues = []
    i = 0
    in_table = False
    has_separator = False

    while i < len(lines):
        line = lines[i].strip()
        if line.startswith("|"):
            if not in_table:
                # Start of a new table - this should be the header row
                in_table = True
                has_separator = False
                # Next line must be separator (|---|---|)
                if i + 1 < len(lines):
                    next_line = lines[i + 1].strip()
                    if next_line.startswith("|") and re.search(r"-{2,}", next_line):
                        has_separator = True
                    else:
                        issues.append(f"line {i+1}: table header not followed by separator")
            # else: continuation of table (data rows) - OK
        else:
            if in_table
check_012_writing_style function · python · L297-L304 (8 LOC)
scripts/seo_qc.py
def check_012_writing_style(body: str) -> tuple[bool, str]:
    """Fail when ``JOTAI_PATTERN`` matches anywhere in the tag-stripped body.

    On failure the detail shows at most the first five matches.
    """
    # HTML tags are removed before matching.
    plain_endings = JOTAI_PATTERN.findall(strip_html_tags(body))
    if plain_endings:
        return False, f"常体 found: {plain_endings[:5]}"
    return True, ""
check_013_area_frequency function · python · L307-L316 (10 LOC)
scripts/seo_qc.py
def check_013_area_frequency(fm: dict, body: str) -> tuple[bool, str]:
    """Check that the frontmatter's area name appears at least 3 times in body.

    Args:
        fm: Parsed frontmatter; the ``area`` key is read.
        body: Article text that is searched for the area name.

    Returns:
        ``(True, "")`` when the count is 3 or more, otherwise ``(False, reason)``.
        A missing or empty area and a non-string area are both reported as
        failures rather than raising.
    """
    area = fm.get("area", "")
    if not area:
        return False, "no area in frontmatter"
    if not isinstance(area, str):
        # str.count() raises TypeError for non-string needles (e.g. a numeric
        # YAML scalar); report it as a failed check instead.
        return False, f"area is not a string: {area!r}"

    count = body.count(area)
    if count >= 3:
        return True, ""
    return False, f"area '{area}' count: {count}"
check_014_image_exists function · python · L319-L333 (15 LOC)
scripts/seo_qc.py
def check_014_image_exists(slug: str, site_dir: str) -> tuple[bool, str]:
    """Check that both article images exist on disk.

    Looks for ``<slug>-ogp.png`` and ``<slug>-thumb.png`` under
    ``<site_dir>/public/images/articles`` and names the ones that are absent.
    """
    images_dir = os.path.join(site_dir, "public", "images", "articles")
    absent = [
        kind
        for kind in ("ogp", "thumb")
        if not os.path.exists(os.path.join(images_dir, f"{slug}-{kind}.png"))
    ]
    if absent:
        return False, f"missing: {', '.join(absent)}"
    return True, ""
run_checks function · python · L336-L403 (68 LOC)
scripts/seo_qc.py
def run_checks(filepath: str, site_dir: str) -> dict:
    """Run all 14 checks on a single article file."""
    with open(filepath, "r", encoding="utf-8") as f:
        content = f.read()

    fm, body = parse_frontmatter(content)
    slug = Path(filepath).stem  # filename without extension
    filename = Path(filepath).name

    results = {}

    # Check 001: Frontmatter fields
    passed, detail = check_001_frontmatter_fields(fm)
    results["check_001"] = {"pass": passed, "detail": detail}

    # Check 002: Frontmatter types
    passed, detail = check_002_frontmatter_types(fm)
    results["check_002"] = {"pass": passed, "detail": detail}

    # Check 003: PR notation
    passed, detail = check_003_pr_notation(body)
    results["check_003"] = {"pass": passed, "detail": detail}

    # Check 004: CTA count
    passed, detail = check_004_cta_count(body)
    results["check_004"] = {"pass": passed, "detail": detail}

    # Check 005: CTA structure
    passed, detail = check_005_cta_struct
aggregate_results function · python · L406-L435 (30 LOC)
scripts/seo_qc.py
def aggregate_results(all_results: dict[str, dict]) -> dict:
    """Roll per-file check results up into one entry per check id.

    For each of ``check_001`` .. ``check_014`` the entry holds the pass and fail
    counts, the pass rate as a whole-number percentage (``"N/A"`` when no file
    reported that check), up to 20 failing filenames and the total number of
    failing files.
    """
    summary = {}

    for number in range(1, 15):
        cid = f"check_{number:03d}"

        # (filename, passed) for every file that reported this check.
        outcomes = [
            (filename, checks[cid]["pass"])
            for filename, checks in all_results.items()
            if cid in checks
        ]
        failing = [filename for filename, passed in outcomes if not passed]

        total = len(outcomes)
        fail_count = len(failing)
        pass_count = total - fail_count

        if total > 0:
            rate = f"{pass_count / total * 100:.0f}%"
        else:
            rate = "N/A"

        summary[cid] = {
            "pass": pass_count,
            "fail": fail_count,
            "pass_rate": rate,
            "fail_files": failing[:20],  # sample is capped at 20 names
            "fail_files_total": fail_count,
        }

    return summary
run_site function · python · L438-L487 (50 LOC)
scripts/seo_qc.py
def run_site(site: str, base_dir: str, output_format: str = "yaml") -> dict:
    """Run QC checks on all articles for a site."""
    site_dir = os.path.join(base_dir, site)
    area_dir = os.path.join(site_dir, "src", "content", "area")

    if not os.path.isdir(area_dir):
        print(f"ERROR: {area_dir} not found", file=sys.stderr)
        return {}

    # Find all .md and .mdx files
    md_files = sorted(glob.glob(os.path.join(area_dir, "*.md")))
    mdx_files = sorted(glob.glob(os.path.join(area_dir, "*.mdx")))
    all_files = md_files + mdx_files

    if not all_files:
        print(f"WARNING: No articles found in {area_dir}", file=sys.stderr)
        return {}

    print(f"Checking {site}: {len(all_files)} articles...", file=sys.stderr)

    all_results = {}
    for filepath in all_files:
        filename = Path(filepath).name
        try:
            results = run_checks(filepath, site_dir)
            all_results[filename] = results
        except Exception as e:
            p
Repobility · open methodology · https://repobility.com/research/
print_summary function · python · L490-L525 (36 LOC)
scripts/seo_qc.py
def print_summary(report: dict):
    """Print a concise summary table."""
    site = report["site"]
    total = report["total_articles"]
    results = report["results"]

    check_names = {
        "check_001": "frontmatter存在",
        "check_002": "frontmatter型",
        "check_003": "PR表記",
        "check_004": "CTA×3",
        "check_005": "CTA構造",
        "check_006": "H2×5",
        "check_007": "FAQ 5問",
        "check_008": "2500文字↑",
        "check_009": "禁止語なし",
        "check_010": "費用テーブル",
        "check_011": "md構文",
        "check_012": "です・ます",
        "check_013": "地域名出現",
        "check_014": "画像存在",
    }

    print(f"\n{'='*60}")
    print(f"  {site} ({total} articles)  —  Overall: {report['overall']['pass_rate']}")
    print(f"{'='*60}")
    print(f"  {'Check':<20} {'Pass':>6} {'Fail':>6} {'Rate':>6}")
    print(f"  {'-'*40}")

    for cid in sorted(results.keys()):
        r = results[cid]
        name = check_names.get(cid, cid)
        marker = " !!" if r["fail"]
main function · python · L528-L573 (46 LOC)
scripts/seo_qc.py
def main():
    parser = argparse.ArgumentParser(description="SEO Article Quality Check")
    parser.add_argument("site", help="Site name (e.g., gaichuu) or 'all' for all sites")
    parser.add_argument("--base-dir", default=DEFAULT_BASE_DIR, help="Base directory for sites")
    parser.add_argument("--output", choices=["yaml", "summary", "both"], default="both",
                        help="Output format")
    parser.add_argument("--report-dir", default=None,
                        help="Directory to write YAML reports (default: queue/reports/)")
    args = parser.parse_args()

    sites = ALL_SITES if args.site == "all" else [args.site]

    report_dir = args.report_dir
    if report_dir is None:
        # Default to queue/reports/ relative to this script
        script_dir = Path(__file__).resolve().parent.parent
        report_dir = str(script_dir / "queue" / "reports")

    all_reports = []

    for site in sites:
        report = run_site(site, args.base_dir, args.output)
      
load_yaml function · python · L23-L32 (10 LOC)
scripts/slim_yaml.py
def load_yaml(filepath):
    """Read a YAML file and return its parsed content.

    Returns ``{}`` when the file does not exist, when it cannot be parsed (the
    error is printed to stderr), or when the parsed value is falsy.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as handle:
            parsed = yaml.safe_load(handle)
    except FileNotFoundError:
        return {}
    except yaml.YAMLError as e:
        print(f"Error parsing {filepath}: {e}", file=sys.stderr)
        return {}
    if parsed:
        return parsed
    return {}
save_yaml function · python · L35-L43 (9 LOC)
scripts/slim_yaml.py
def save_yaml(filepath, data):
    """Write ``data`` to ``filepath`` as block-style YAML.

    Unicode is written unescaped and key order is kept. Returns True on success;
    on any exception the error is printed to stderr and False is returned.
    """
    dump_options = {
        "allow_unicode": True,
        "sort_keys": False,
        "default_flow_style": False,
    }
    try:
        with open(filepath, 'w', encoding='utf-8') as out:
            yaml.dump(data, out, **dump_options)
    except Exception as e:
        print(f"Error writing {filepath}: {e}", file=sys.stderr)
        return False
    return True
get_timestamp function · python · L46-L48 (3 LOC)
scripts/slim_yaml.py
def get_timestamp():
    """Return the current time as a ``YYYYMMDDHHMMSS`` string for archive file names."""
    now = datetime.now()
    return f"{now:%Y%m%d%H%M%S}"
get_active_cmd_ids function · python · L55-L75 (21 LOC)
scripts/slim_yaml.py
def get_active_cmd_ids():
    """Return the IDs of commands in rockman_to_blues.yaml that are not done.

    The queue file is ``get_queue_dir() / 'rockman_to_blues.yaml'``. Commands
    are read from the top-level ``commands`` key when present, otherwise from
    ``queue``.

    Returns:
        set: The ``id`` of every mapping entry that has a non-None ``id`` and
        whose ``status`` is not ``'done'``. Empty when the document is not a
        mapping, or the command list is missing or not a list.
    """
    queue_dir = get_queue_dir()
    cmd_queue_file = queue_dir / 'rockman_to_blues.yaml'
    data = load_yaml(cmd_queue_file)

    # Check the type before any membership test: the loaded document may be a
    # scalar (e.g. a bare number), and `'commands' in <int>` raises TypeError.
    if not isinstance(data, dict):
        return set()

    key = 'commands' if 'commands' in data else 'queue'
    commands = data.get(key, [])
    if not isinstance(commands, list):
        return set()

    # Non-mapping entries and entries without an id are ignored.
    return {
        cmd['id']
        for cmd in commands
        if isinstance(cmd, dict)
        and cmd.get('id') is not None
        and cmd.get('status') != 'done'
    }
archive_taskspec function · python · L82-L94 (13 LOC)
scripts/slim_yaml.py
def archive_taskspec(filepath, archive_path, data, dry_run=False):
    """Archive a task spec file and report success as a boolean.

    Args:
        filepath: ``pathlib.Path`` of the task file being archived.
        archive_path: ``pathlib.Path`` of the archive destination.
        data: Parsed task content, written to ``archive_path`` via ``save_yaml``.
        dry_run: When True, only print what would be done and change nothing.

    Returns:
        bool: True on success or dry run; False if writing the archive or
        moving the source file failed.
    """
    if dry_run:
        print(f"[DRY-RUN] would archive: {filepath}")
        print(f"[DRY-RUN] would write: {archive_path}")
        return True

    ensure_parent_dir(archive_path)
    if not save_yaml(archive_path, data):
        return False

    # NOTE(review): when the source file name is contained in the archive
    # file name the source is left in place; presumably the caller deals
    # with it -- confirm against callers.
    if filepath.name in archive_path.name:
        return True

    try:
        # replace() instead of rename(): the destination was just created by
        # save_yaml, and rename() onto an existing file fails on Windows.
        # The return value (Path on 3.8+, None before) is ignored so that the
        # function always returns a real bool.
        filepath.replace(archive_path)
    except OSError as e:
        print(f"Error archiving {filepath}: {e}", file=sys.stderr)
        return False
    return True
slim_tasks function · python · L97-L150 (54 LOC)
scripts/slim_yaml.py
def slim_tasks(dry_run=False):
    queue_dir = get_queue_dir()
    tasks_dir = queue_dir / 'tasks'
    archive_dir = queue_dir / 'archive' / 'tasks'

    if not tasks_dir.exists():
        return True

    timestamp = get_timestamp()
    done_statuses = {'done', 'completed', 'cancelled'}

    for filepath in sorted(tasks_dir.glob('*.yaml')):
        data = load_yaml(filepath)
        if not isinstance(data, dict):
            continue

        task = data.get('task', {}) if isinstance(data.get('task', {}), dict) else {}
        status = task.get('status', '') if isinstance(task, dict) else ''
        if not status:
            continue

        stem = filepath.stem
        if stem in CANONICAL_TASKS:
            if status not in done_statuses:
                continue

            archive_path = archive_dir / f'{stem}_{timestamp}.yaml'
            if not archive_taskspec(filepath, archive_path, data, dry_run=dry_run):
                return False

            if dry_run:
              
Want this analysis on your repo? https://repobility.com/scan/
slim_reports function · python · L153-L192 (40 LOC)
scripts/slim_yaml.py
def slim_reports(dry_run=False):
    queue_dir = get_queue_dir()
    reports_dir = queue_dir / 'reports'
    archive_dir = queue_dir / 'archive' / 'reports'

    if not reports_dir.exists():
        return True

    active_cmd_ids = get_active_cmd_ids()
    timestamp = get_timestamp()

    for filepath in sorted(reports_dir.glob('*.yaml')):
        if filepath.stem in CANONICAL_REPORTS:
            continue

        is_stale = (time.time() - filepath.stat().st_mtime) >= 86400

        if not is_stale:
            continue

        data = load_yaml(filepath)
        parent_cmd = data.get('parent_cmd') if isinstance(data, dict) else None
        is_active = parent_cmd in active_cmd_ids

        if is_active:
            continue

        archive_path = archive_dir / filepath.name
        if archive_path.exists():
            archive_path = archive_dir / f'{filepath.stem}_{timestamp}{filepath.suffix}'

        if dry_run:
            print(f"[DRY-RUN] would archive: {filepath}")
         
slim_inbox function · python · L195-L250 (56 LOC)
scripts/slim_yaml.py
def slim_inbox(agent_id, dry_run=False):
    """Archive read: true messages from inbox file."""
    queue_dir = get_queue_dir()
    archive_dir = queue_dir / 'archive'
    inbox_file = queue_dir / 'inbox' / f'{agent_id}.yaml'

    if not inbox_file.exists():
        # Inbox doesn't exist yet - that's fine
        return True

    data = load_yaml(inbox_file)
    if not data or 'messages' not in data:
        return True

    messages = data.get('messages', [])
    if not isinstance(messages, list):
        print("Error: messages is not a list", file=sys.stderr)
        return False

    # Separate unread and archived messages
    unread = []
    archived = []

    for msg in messages:
        is_read = msg.get('read', False)
        if is_read:
            archived.append(msg)
        else:
            unread.append(msg)

    # If nothing to archive, return success without writing
    if not archived:
        return True

    archive_timestamp = get_timestamp()
    archive_file = archi
slim_cmd_queue function · python · L253-L304 (52 LOC)
scripts/slim_yaml.py
def slim_cmd_queue():
    """Archive done/cancelled commands from rockman_to_blues.yaml."""
    queue_dir = get_queue_dir()
    archive_dir = queue_dir / 'archive'
    cmd_queue_file = queue_dir / 'rockman_to_blues.yaml'

    if not cmd_queue_file.exists():
        print(f"Warning: {cmd_queue_file} not found", file=sys.stderr)
        return True

    data = load_yaml(cmd_queue_file)
    # Support both 'commands' and 'queue' keys for backwards compatibility
    key = 'commands' if isinstance(data, dict) and 'commands' in data else 'queue'
    if not data or key not in data:
        return True

    queue = data.get(key, [])
    if not isinstance(queue, list):
        print("Error: queue is not a list", file=sys.stderr)
        return False

    # Separate active and archived commands
    active = []
    archived = []

    for cmd in queue:
        status = cmd.get('status', 'unknown')
        if status in ['done', 'cancelled']:
            archived.append(cmd)
        else:
           
page 1 / 2next ›