Function bodies 56 total
Screen class · kotlin · L49-L54 (6 LOC) · android/app/src/main/java/com/shogun/android/MainActivity.kt
/**
 * A screen of the app, described by its [route] string, a display [label]
 * and an [icon].
 *
 * Four singleton screens are declared here: [Shogun], [Agents], [Dashboard]
 * and [Settings].
 */
sealed class Screen(val route: String, val label: String, val icon: ImageVector) {
    object Shogun : Screen(route = "shogun", label = "将軍", icon = Icons.Default.Star)

    object Agents : Screen(route = "agents", label = "エージェント", icon = Icons.Default.List)

    object Dashboard : Screen(route = "dashboard", label = "戦況", icon = Icons.Default.Home)

    object Settings : Screen(route = "settings", label = "設定", icon = Icons.Default.Settings)
}

// MainActivity class · kotlin · L63-L144 (82 LOC) · android/app/src/main/java/com/shogun/android/MainActivity.kt
class MainActivity : ComponentActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
enableEdgeToEdge()
NotificationHelper.initChannels(this)
setContent {
ShogunTheme {
ShogunApp()
}
}
handleShareIntent(intent)
// Only start NtfyService if notification permission is granted (Android 13+)
val hasNotifPerm = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.TIRAMISU) {
ContextCompat.checkSelfPermission(this, Manifest.permission.POST_NOTIFICATIONS) ==
PackageManager.PERMISSION_GRANTED
} else true
if (hasNotifPerm && getSharedPreferences(PrefsKeys.PREFS_NAME, MODE_PRIVATE)
.getBoolean(PrefsKeys.NOTIFICATION_ENABLED, true)) {
try {
startForegroundService(Intent(this, NtfyService::class.java))
} catch (_: Exception) {
// ForegrNotificationHelper class · kotlin · L11-L102 (92 LOC)android/app/src/main/java/com/shogun/android/NotificationHelper.kt
object NotificationHelper {
private const val CH_CMD_COMPLETE = "cmd_complete"
private const val CH_CMD_FAILURE = "cmd_failure"
private const val CH_ACTION_REQUIRED = "action_required"
private const val CH_DASHBOARD_UPDATE = "dashboard_update"
private const val CH_STREAK_UPDATE = "streak_update"
private const val CH_AGENT_RESPONSE = "agent_response"
fun initChannels(context: Context) {
val nm = context.getSystemService(NotificationManager::class.java)
val channels = listOf(
NotificationChannel(CH_CMD_COMPLETE, "タスク完了", NotificationManager.IMPORTANCE_DEFAULT).apply {
enableVibration(true)
vibrationPattern = longArrayOf(0, 200)
},
NotificationChannel(CH_CMD_FAILURE, "タスク失敗", NotificationManager.IMPORTANCE_HIGH).apply {
enableVibration(true)
vibrationPattern = longArrayOf(0, 300, 100, 300)
},
NotificationChannel(CH_ACTIONNtfyService class · kotlin · L27-L162 (136 LOC)android/app/src/main/java/com/shogun/android/NtfyService.kt
class NtfyService : Service() {
private val client = OkHttpClient.Builder()
.pingInterval(30, TimeUnit.SECONDS)
.build()
private var webSocket: WebSocket? = null
private var lastReceivedId: String = ""
private var backoffIndex = 0
private val reconnectHandler = Handler(Looper.getMainLooper())
private var reconnectRunnable: Runnable? = null
private lateinit var connectivityManager: ConnectivityManager
private var networkCallback: ConnectivityManager.NetworkCallback? = null
companion object {
private const val CHANNEL_ID = "ntfy_service"
private const val NOTIFICATION_ID = 2
private val TOPIC = Defaults.NTFY_TOPIC
private val BACKOFF_DELAYS = longArrayOf(5_000L, 10_000L, 30_000L, 60_000L)
}
override fun onCreate() {
super.onCreate()
val prefs = getSharedPreferences(PrefsKeys.PREFS_NAME, Context.MODE_PRIVATE)
if (!prefs.getBoolean(PrefsKeys.NOTIFICATION_ENABLED, true)) {Companion class · kotlin · L41-L46 (6 LOC)android/app/src/main/java/com/shogun/android/NtfyService.kt
companion object {
    // Identifiers for the service's own notification; their use sites are
    // outside this excerpt.
    private const val CHANNEL_ID = "ntfy_service"
    private const val NOTIFICATION_ID = 2

    // Defaults.NTFY_TOPIC is itself a `const val`, so this alias can be a
    // compile-time constant as well instead of a runtime-initialised field.
    // NOTE(review): the topic is taken from Defaults, not from the
    // PrefsKeys.NTFY_TOPIC preference — confirm that is intended.
    private const val TOPIC = Defaults.NTFY_TOPIC

    // Reconnect delays, indexed by the service's backoffIndex.
    // Presumably milliseconds (5 s, 10 s, 30 s, 60 s) — confirm at the use site.
    private val BACKOFF_DELAYS = longArrayOf(5_000L, 10_000L, 30_000L, 60_000L)
}

// NtfyWebSocketListener class · kotlin · L131-L161 (31 LOC) · android/app/src/main/java/com/shogun/android/NtfyService.kt
inner class NtfyWebSocketListener : WebSocketListener() {
override fun onMessage(webSocket: WebSocket, text: String) {
try {
val json = JSONObject(text)
if (json.optString("event") != "message") return
lastReceivedId = json.optString("id", lastReceivedId)
backoffIndex = 0
val title = json.optString("title", "")
val message = json.optString("message", "")
val tagsArray = json.optJSONArray("tags")
val tags = buildList {
if (tagsArray != null) {
for (i in 0 until tagsArray.length()) add(tagsArray.getString(i))
}
}
NotificationHelper.showNotification(this@NtfyService, message, tags, title)
} catch (_: Exception) {
// Malformed JSON — ignore
}
}
override fun onFailure(webSocket: WebSocket,SshForegroundService class · kotlin · L12-L66 (55 LOC)android/app/src/main/java/com/shogun/android/SshForegroundService.kt
class SshForegroundService : Service() {
private val binder = SshBinder()
inner class SshBinder : Binder() {
fun getService(): SshForegroundService = this@SshForegroundService
}
override fun onCreate() {
super.onCreate()
createNotificationChannel()
startForeground(NOTIFICATION_ID, buildNotification())
}
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
return START_STICKY
}
override fun onBind(intent: Intent): IBinder = binder
override fun onDestroy() {
super.onDestroy()
}
private fun createNotificationChannel() {
val channel = NotificationChannel(
CHANNEL_ID,
"SSH接続",
NotificationManager.IMPORTANCE_LOW
).apply {
description = "将軍セッションのSSH接続を維持します"
}
getSystemService(NotificationManager::class.java).createNotificationChannel(channel)
}
private fun buildNotification(): NotifRepobility · open methodology · https://repobility.com/research/
SshBinder class · kotlin · L16-L18 (3 LOC) · android/app/src/main/java/com/shogun/android/SshForegroundService.kt
/** Binder whose [getService] hands back the enclosing [SshForegroundService] instance. */
inner class SshBinder : Binder() {
    fun getService(): SshForegroundService {
        return this@SshForegroundService
    }
}

// Companion class · kotlin · L62-L65 (4 LOC) · android/app/src/main/java/com/shogun/android/SshForegroundService.kt
companion object {
    /** Id of the notification channel registered by `createNotificationChannel()`. */
    private const val CHANNEL_ID = "ssh_connection"

    /** Id passed to `startForeground(...)` for the service's notification. */
    private const val NOTIFICATION_ID = 1
}

// SshManager class · kotlin · L21-L295 (275 LOC) · android/app/src/main/java/com/shogun/android/ssh/SshManager.kt
class SshManager private constructor() {
companion object {
@Volatile private var INSTANCE: SshManager? = null
fun getInstance(): SshManager = INSTANCE ?: synchronized(this) {
INSTANCE ?: SshManager().also { INSTANCE = it }
}
}
@Volatile private var session: Session? = null
// Mutex serializes ALL SSH operations (exec, reconnect, connect).
// Prevents race condition where multiple ViewModels' concurrent reconnects
// kill each other's newly-created sessions.
private val sshMutex = Mutex()
// Stored for reconnect
private var lastHost = ""
private var lastPort = 22
private var lastUser = ""
private var lastKeyPath = ""
private var lastPassword = ""
var disconnectCallback: (() -> Unit)? = null
suspend fun connect(
host: String,
port: Int,
user: String,
privateKeyPath: String,
password: String = "",
onOutput: ((String) -> Unit)? = null,
Companion class · kotlin · L23-L28 (6 LOC)android/app/src/main/java/com/shogun/android/ssh/SshManager.kt
companion object {
    @Volatile private var INSTANCE: SshManager? = null

    /**
     * Returns the process-wide [SshManager], creating it on first use.
     *
     * The unsynchronized read is the fast path; creation happens under the
     * companion's monitor and re-checks [INSTANCE] so only one manager is built.
     */
    fun getInstance(): SshManager {
        INSTANCE?.let { existing -> return existing }
        return synchronized(this) {
            INSTANCE ?: SshManager().also { created -> INSTANCE = created }
        }
    }
}

// AppLogger class · kotlin · L8-L25 (18 LOC) · android/app/src/main/java/com/shogun/android/util/AppLogger.kt
/**
 * In-memory log buffer that keeps the most recent [MAX_ENTRIES] lines.
 *
 * Each line has the form `HH:mm:ss.SSS [tag] message`.
 */
object AppLogger {
    private const val MAX_ENTRIES = 200

    private val entries = CopyOnWriteArrayList<String>()

    // SimpleDateFormat is not thread-safe, so it is only touched while holding [lock].
    private val fmt = SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault())

    // Serializes formatting, append + trim, and clear().
    private val lock = Any()

    /** Appends a timestamped line and drops the oldest lines beyond [MAX_ENTRIES]. */
    fun log(tag: String, message: String) {
        synchronized(lock) {
            val ts = fmt.format(Date())
            entries.add("$ts [$tag] $message")
            // Trim under the same lock so concurrent callers cannot over-trim
            // or hit removeAt(0) on a list that clear() just emptied.
            while (entries.size > MAX_ENTRIES) {
                entries.removeAt(0)
            }
        }
    }

    /** Returns a snapshot copy of the buffered lines, oldest first. */
    fun getEntries(): List<String> = entries.toList()

    /** Removes all buffered lines. */
    fun clear() {
        synchronized(lock) {
            entries.clear()
        }
    }
}

// PrefsKeys class · kotlin · L4-L22 (19 LOC) · android/app/src/main/java/com/shogun/android/util/Constants.kt
/** Names of the SharedPreferences file and of the keys stored in it. */
object PrefsKeys {
    /** Name of the preferences file passed to `getSharedPreferences`. */
    const val PREFS_NAME = "shogun_prefs"

    // SSH connection settings
    const val SSH_HOST = "ssh_host"
    const val SSH_PORT = "ssh_port"
    const val SSH_USER = "ssh_user"
    const val SSH_KEY_PATH = "ssh_key_path"
    const val SSH_PASSWORD = "ssh_password"

    // Remote project / tmux session settings
    const val PROJECT_PATH = "project_path"
    const val SHOGUN_SESSION = "shogun_session"
    const val AGENTS_SESSION = "agents_session"

    // Notification master switch and ntfy topic
    const val NOTIFICATION_ENABLED = "notification_enabled"
    const val NTFY_TOPIC = "ntfy_topic"

    // Per-category notification switches
    const val NOTIFY_CMD_COMPLETE = "notify_cmd_complete"
    const val NOTIFY_CMD_FAILURE = "notify_cmd_failure"
    const val NOTIFY_ACTION_REQUIRED = "notify_action_required"
    const val NOTIFY_DASHBOARD_UPDATE = "notify_dashboard_update"
    const val NOTIFY_STREAK_UPDATE = "notify_streak_update"
    const val NOTIFY_AGENT_RESPONSE = "notify_agent_response"
}

// Defaults class · kotlin · L24-L32 (9 LOC) · android/app/src/main/java/com/shogun/android/util/Constants.kt
/** Fallback values used when a setting has no stored preference. */
object Defaults {
    const val SSH_HOST = "192.168.1.1"
    const val SSH_PORT = 22

    // Derived from SSH_PORT so the numeric and string defaults cannot drift apart.
    const val SSH_PORT_STR = "$SSH_PORT"

    const val SHOGUN_SESSION = "shogun"
    const val AGENTS_SESSION = "multiagent"
    const val NTFY_TOPIC = "sho-y0uhey"

    /** Absolute path of the tmux binary (presumably on the SSH host — confirm at the use site). */
    const val TMUX = "/usr/bin/tmux"
}

// AgentsViewModel class · kotlin · L24-L198 (175 LOC) · android/app/src/main/java/com/shogun/android/viewmodel/AgentsViewModel.kt
class AgentsViewModel(application: Application) : AndroidViewModel(application) {
private val sshManager = SshManager.getInstance()
private val prefs = application.getSharedPreferences(PrefsKeys.PREFS_NAME, Context.MODE_PRIVATE)
private val _panes = MutableStateFlow<List<PaneInfo>>(emptyList())
val panes: StateFlow<List<PaneInfo>> = _panes
private val _isConnected = MutableStateFlow(false)
val isConnected: StateFlow<Boolean> = _isConnected
private val _errorMessage = MutableStateFlow<String?>(null)
val errorMessage: StateFlow<String?> = _errorMessage
private val _rateLimitResult = MutableStateFlow<String?>(null)
val rateLimitResult: StateFlow<String?> = _rateLimitResult
private val _rateLimitLoading = MutableStateFlow(false)
val rateLimitLoading: StateFlow<Boolean> = _rateLimitLoading
private var refreshJob: Job? = null
@Volatile private var paused = false
@Volatile private var isRefreshing = false
private fun agenWant this analysis on your repo? https://repobility.com/scan/
DashboardViewModel class · kotlin · L13-L67 (55 LOC) · android/app/src/main/java/com/shogun/android/viewmodel/DashboardViewModel.kt
class DashboardViewModel(application: Application) : AndroidViewModel(application) {
private val sshManager = SshManager.getInstance()
private val prefs = application.getSharedPreferences(PrefsKeys.PREFS_NAME, Context.MODE_PRIVATE)
private val _markdownContent = MutableStateFlow("")
val markdownContent: StateFlow<String> = _markdownContent
private val _isConnected = MutableStateFlow(false)
val isConnected: StateFlow<Boolean> = _isConnected
private val _isLoading = MutableStateFlow(false)
val isLoading: StateFlow<Boolean> = _isLoading
private val _errorMessage = MutableStateFlow<String?>(null)
val errorMessage: StateFlow<String?> = _errorMessage
fun connect(host: String, port: Int, user: String, keyPath: String, password: String = "") {
viewModelScope.launch {
val result = sshManager.connect(host, port, user, keyPath, password)
if (result.isSuccess) {
_isConnected.value = true
SettingsViewModel class · kotlin · L11-L78 (68 LOC)android/app/src/main/java/com/shogun/android/viewmodel/SettingsViewModel.kt
class SettingsViewModel(application: Application) : AndroidViewModel(application) {
private val prefs = application.getSharedPreferences(PrefsKeys.PREFS_NAME, Context.MODE_PRIVATE)
private val _notificationEnabled = MutableStateFlow(prefs.getBoolean(PrefsKeys.NOTIFICATION_ENABLED, true))
val notificationEnabled: StateFlow<Boolean> = _notificationEnabled
private val _ntfyTopic = MutableStateFlow(prefs.getString(PrefsKeys.NTFY_TOPIC, Defaults.NTFY_TOPIC) ?: Defaults.NTFY_TOPIC)
val ntfyTopic: StateFlow<String> = _ntfyTopic
private val _notifyCmdComplete = MutableStateFlow(prefs.getBoolean(PrefsKeys.NOTIFY_CMD_COMPLETE, true))
val notifyCmdComplete: StateFlow<Boolean> = _notifyCmdComplete
private val _notifyCmdFailure = MutableStateFlow(prefs.getBoolean(PrefsKeys.NOTIFY_CMD_FAILURE, true))
val notifyCmdFailure: StateFlow<Boolean> = _notifyCmdFailure
private val _notifyActionRequired = MutableStateFlow(prefs.getBoolean(PrefsKeys.NOTIFY_ACTION_RShogunViewModel class · kotlin · L19-L153 (135 LOC)android/app/src/main/java/com/shogun/android/viewmodel/ShogunViewModel.kt
class ShogunViewModel(application: Application) : AndroidViewModel(application) {
private val sshManager = SshManager.getInstance()
private val prefs = application.getSharedPreferences(PrefsKeys.PREFS_NAME, Context.MODE_PRIVATE)
private val _paneContent = MutableStateFlow("")
val paneContent: StateFlow<String> = _paneContent
private val _isConnected = MutableStateFlow(false)
val isConnected: StateFlow<Boolean> = _isConnected
private val _errorMessage = MutableStateFlow<String?>(null)
val errorMessage: StateFlow<String?> = _errorMessage
private var refreshJob: Job? = null
private var reconnectJob: Job? = null
@Volatile private var paused = false
private fun tmuxTarget(): String {
val session = prefs.getString(PrefsKeys.SHOGUN_SESSION, Defaults.SHOGUN_SESSION) ?: Defaults.SHOGUN_SESSION
return "$session:main"
}
fun pauseRefresh() { paused = true }
fun resumeRefresh() {
paused = false
viewMoparse_frontmatter function · python · L40-L59 (20 LOC)scripts/seo_qc.py
def parse_frontmatter(content: str) -> tuple[dict, str]:
    """Split markdown into its YAML frontmatter and body.

    Args:
        content: Full text of a markdown file.

    Returns:
        ``(frontmatter, body)``. ``frontmatter`` is ``{}`` when the text does
        not start with ``---``, has no closing ``---`` line, fails to parse,
        or parses to something other than a mapping. In the first two cases
        ``body`` is ``content`` unchanged; otherwise it is the text after the
        closing delimiter, stripped.
    """
    if not content.startswith("---"):
        return {}, content
    # The closing delimiter must occupy a line of its own. A plain substring
    # search would also stop at "---" embedded in a value (e.g. "a---b").
    closing = re.search(r"^---[ \t\r]*$", content[3:], re.MULTILINE)
    if closing is None:
        return {}, content
    fm_str = content[3:3 + closing.start()].strip()
    body = content[3 + closing.end():].strip()
    try:
        fm = yaml.safe_load(fm_str)
    except yaml.YAMLError:
        fm = {}
    # Empty frontmatter loads as None; scalars and lists are not usable either.
    if not isinstance(fm, dict):
        fm = {}
    return fm, body


# strip_html_tags function · python · L62-L64 (3 LOC) · scripts/seo_qc.py
def strip_html_tags(text: str) -> str:
    """Return ``text`` with every ``<...>`` span removed.

    A span is ``<``, one or more characters other than ``>``, then ``>``.
    """
    return re.sub(r"<[^>]+>", "", text)


# count_japanese_chars function · python · L67-L77 (11 LOC) · scripts/seo_qc.py
def count_japanese_chars(text: str) -> int:
    """Count the characters left after removing markup and whitespace.

    Every remaining character is counted, whatever its script. Removed, in
    order: HTML tags, the URL part of markdown links (the link text stays),
    the characters ``# * _ ` | > -``, and all whitespace.
    """
    # Remove HTML tags
    text = strip_html_tags(text)
    # Keep only the visible text of [text](url) links
    text = re.sub(r"\[([^\]]*)\]\([^)]*\)", r"\1", text)
    # Remove markdown formatting characters
    text = re.sub(r"[#*_`|>-]", "", text)
    # Remove whitespace and newlines
    text = re.sub(r"\s+", "", text)
    return len(text)


# find_h2_sections function · python · L80-L99 (20 LOC) · scripts/seo_qc.py
def find_h2_sections(body: str) -> list[tuple[str, str]]:
    """Split ``body`` into its H2 sections.

    A section starts at a line beginning with ``"## "`` and runs up to the
    next such line or the end of the text. Lines before the first H2 are
    dropped.

    Returns:
        List of ``(heading_line, section_content)`` in document order, where
        ``section_content`` is the section's lines joined with newlines.
    """
    sections = []
    current_heading = None
    current_lines = []
    for line in body.split("\n"):
        if line.startswith("## "):
            # Close the section in progress before opening the next one.
            if current_heading is not None:
                sections.append((current_heading, "\n".join(current_lines)))
            current_heading = line
            current_lines = []
        elif current_heading is not None:
            current_lines.append(line)
    if current_heading is not None:
        sections.append((current_heading, "\n".join(current_lines)))
    return sections


# check_001_frontmatter_fields function · python · L102-L107 (6 LOC) · scripts/seo_qc.py
def check_001_frontmatter_fields(fm: dict) -> tuple[bool, str]:
    """Check that every name in ``REQUIRED_FIELDS`` is a key of ``fm``.

    Only key presence is tested; the value may be empty or ``None``.

    Returns:
        ``(True, "")`` on success, otherwise ``(False, "missing: a, b")``.
    """
    missing = [field for field in REQUIRED_FIELDS if field not in fm]
    if missing:
        return False, f"missing: {', '.join(missing)}"
    return True, ""


# Repobility (the analyzer behind this table) · https://repobility.com
check_002_frontmatter_types function · python · L110-L144 (35 LOC) · scripts/seo_qc.py
def check_002_frontmatter_types(fm: dict) -> tuple[bool, str]:
"""Check frontmatter field types and values."""
issues = []
if "title" in fm and (not isinstance(fm["title"], str) or not fm["title"].strip()):
issues.append("title empty or not string")
if "description" in fm and (not isinstance(fm["description"], str) or not fm["description"].strip()):
issues.append("description empty or not string")
if "publishedAt" in fm:
val = str(fm["publishedAt"])
if not DATE_PATTERN.match(val):
issues.append(f"publishedAt format: {val}")
if "category" in fm and fm["category"] != "area":
# Allow other categories like "ranking" etc.
pass
if "area" in fm and (not isinstance(fm["area"], str) or not fm["area"].strip()):
issues.append("area empty or not string")
if "keyword" in fm:
if not isinstance(fm["keyword"], str) or not fm["keyword"].strip():
issues.append("keyword empty or ncheck_003_pr_notation function · python · L147-L153 (7 LOC)scripts/seo_qc.py
def check_003_pr_notation(body: str) -> tuple[bool, str]:
    """Check for a PR/affiliate disclosure within the first 50 lines.

    Passes when those lines contain ``アフィリエイト広告`` or a standalone
    ``PR`` token, i.e. ``PR`` with no ASCII letter directly before or after.

    Returns:
        ``(True, "")`` when a disclosure is found, otherwise ``(False, reason)``.
    """
    text = "\n".join(body.split("\n")[:50])
    if "アフィリエイト広告" in text:
        return True, ""
    # A bare substring test would also accept words such as "PRO" or
    # "APPROVED", letting an article without a disclosure pass.
    if re.search(r"(?<![A-Za-z])PR(?![A-Za-z])", text):
        return True, ""
    return False, "PR notation not found in first 50 lines"


# check_004_cta_count function · python · L156-L164 (9 LOC) · scripts/seo_qc.py
def check_004_cta_count(body: str) -> tuple[bool, str]:
    """Check that the body contains exactly 3 CTAs.

    A CTA is either a literal ``<div class="cta-box">`` or a ``<!-- CTA:``
    comment (rehype-affiliate-cta pattern); both kinds count toward the total.

    Returns:
        ``(True, "")`` when the total is 3, otherwise ``(False, detail)``
        with the per-kind counts.
    """
    count = body.count('<div class="cta-box">')
    cta_comments = len(re.findall(r"<!--\s*CTA:", body))
    total = count + cta_comments
    if total == 3:
        return True, ""
    return False, f"CTA count: {total} (div: {count}, comment: {cta_comments})"


# check_005_cta_structure function · python · L167-L191 (25 LOC) · scripts/seo_qc.py
def check_005_cta_structure(body: str) -> tuple[bool, str]:
    """Check the HTML structure of each CTA box.

    A CTA box is the shortest span from ``<div class="cta-box">`` to two
    consecutive ``</div>`` tags. ``<!-- CTA:`` comments count as valid CTAs
    (they are replaced at build time) and are not inspected.

    Returns:
        ``(True, "")`` when every matched box passes, otherwise
        ``(False, detail)``; ``"no CTA found"`` when there is neither a
        matched box nor a CTA comment.
    """
    cta_blocks = re.findall(
        r'<div class="cta-box">.*?</div>\s*</div>',
        body,
        re.DOTALL,
    )
    cta_comments = len(re.findall(r"<!--\s*CTA:", body))
    if not cta_blocks and cta_comments == 0:
        return False, "no CTA found"
    issues = []
    for i, block in enumerate(cta_blocks):
        # NOTE(review): this flags a box only when BOTH markers are absent,
        # while the message reads as if either one is required — confirm
        # whether `or` was intended.
        if 'cta-badge' not in block and 'cta-button' not in block:
            issues.append(f"CTA#{i+1}: missing badge or button")
        if 'nofollow' not in block or 'sponsored' not in block:
            issues.append(f"CTA#{i+1}: missing nofollow/sponsored")
    if issues:
        return False, "; ".join(issues)
    return True, ""


# check_006_h2_count function · python · L194-L200 (7 LOC) · scripts/seo_qc.py
def check_006_h2_count(body: str) -> tuple[bool, str]:
    """Check that the body has exactly 5 H2 headings.

    An H2 heading is a line starting with ``"## "`` followed by at least one
    character.

    Returns:
        ``(True, "")`` for exactly 5, otherwise ``(False, "H2 count: N")``.
    """
    count = len(re.findall(r"^## .+", body, re.MULTILINE))
    if count == 5:
        return True, ""
    return False, f"H2 count: {count}"


# check_007_faq_questions function · python · L203-L225 (23 LOC) · scripts/seo_qc.py
def check_007_faq_questions(body: str) -> tuple[bool, str]:
    """Check that the FAQ section has at least 5 question headings.

    The FAQ section is the first H2 section whose heading contains ``FAQ`` or
    ``よくある質問``; when no heading matches and there are at least 4 H2
    sections, the 4th one is used. Questions are ``###`` or ``####`` headings
    inside that section.

    Returns:
        ``(True, "")`` for 5 or more questions, otherwise ``(False, detail)``.
    """
    sections = find_h2_sections(body)
    # None means "no FAQ heading matched". An empty string is a matched but
    # empty section and must not fall through to the positional fallback.
    faq_content = None
    for heading, content in sections:
        if "FAQ" in heading or "よくある質問" in heading:
            faq_content = content
            break
    # If no explicit FAQ heading, use the 4th section
    if faq_content is None and len(sections) >= 4:
        faq_content = sections[3][1]
    if faq_content is None:
        return False, "FAQ section not found"
    questions = re.findall(r"^#{3,4} .+", faq_content, re.MULTILINE)
    count = len(questions)
    if count >= 5:
        return True, ""
    return False, f"FAQ questions: {count}"


# check_008_char_count function · python · L228-L233 (6 LOC) · scripts/seo_qc.py
def check_008_char_count(body: str) -> tuple[bool, str]:
    """Check that the body has at least 2500 counted characters.

    The count comes from ``count_japanese_chars``.

    Returns:
        ``(True, "")`` when the count is >= 2500, otherwise
        ``(False, "chars: N")``.
    """
    char_count = count_japanese_chars(body)
    if char_count >= 2500:
        return True, ""
    return False, f"chars: {char_count}"


# check_009_forbidden_words function · python · L236-L246 (11 LOC) · scripts/seo_qc.py
def check_009_forbidden_words(body: str) -> tuple[bool, list]:
    """Check the body for matches of ``FORBIDDEN_PATTERN``.

    Returns:
        ``(True, [])`` when nothing matches, otherwise ``(False, details)``
        where each entry is ``"word(count)"``, in order of first appearance.
    """
    matches = FORBIDDEN_PATTERN.findall(body)
    if not matches:
        return True, []
    # Tally how often each distinct match occurs
    word_counts = defaultdict(int)
    for word in matches:
        word_counts[word] += 1
    details = [f"{word}({count})" for word, count in word_counts.items()]
    return False, details


# Repobility · severity-and-effort ranking · https://repobility.com
check_010_cost_table function · python · L249-L259 (11 LOC) · scripts/seo_qc.py
def check_010_cost_table(body: str) -> tuple[bool, str]:
    """Check that the first H2 section contains a markdown table.

    A table line is any line whose stripped text starts with ``|``; at least
    4 such lines are required.

    Returns:
        ``(True, "")`` on success, otherwise ``(False, detail)``.
    """
    sections = find_h2_sections(body)
    if not sections:
        return False, "no H2 sections"
    first_section = sections[0][1]
    table_lines = [
        row for row in first_section.split("\n") if row.strip().startswith("|")
    ]
    if len(table_lines) >= 4:
        return True, ""
    return False, f"table lines in H2-1: {len(table_lines)}"


# check_011_markdown_table_syntax function · python · L262-L294 (33 LOC) · scripts/seo_qc.py
def check_011_markdown_table_syntax(body: str) -> tuple[bool, str]:
"""Check markdown table syntax: header -> separator -> data rows."""
lines = body.split("\n")
issues = []
i = 0
in_table = False
has_separator = False
while i < len(lines):
line = lines[i].strip()
if line.startswith("|"):
if not in_table:
# Start of a new table - this should be the header row
in_table = True
has_separator = False
# Next line must be separator (|---|---|)
if i + 1 < len(lines):
next_line = lines[i + 1].strip()
if next_line.startswith("|") and re.search(r"-{2,}", next_line):
has_separator = True
else:
issues.append(f"line {i+1}: table header not followed by separator")
# else: continuation of table (data rows) - OK
else:
if in_tablecheck_012_writing_style function · python · L297-L304 (8 LOC)scripts/seo_qc.py
def check_012_writing_style(body: str) -> tuple[bool, str]:
    """Check for consistent です・ます style (no 常体 endings).

    HTML tags are stripped first, then ``JOTAI_PATTERN`` is searched.

    Returns:
        ``(True, "")`` when nothing matches, otherwise ``(False, detail)``
        listing at most the first 5 matches.
    """
    # Tags are removed so markup is not scanned
    clean = strip_html_tags(body)
    matches = JOTAI_PATTERN.findall(clean)
    if not matches:
        return True, ""
    return False, f"常体 found: {matches[:5]}"


# check_013_area_frequency function · python · L307-L316 (10 LOC) · scripts/seo_qc.py
def check_013_area_frequency(fm: dict, body: str) -> tuple[bool, str]:
    """Check that the frontmatter ``area`` value appears at least 3 times in ``body``.

    Returns:
        ``(True, "")`` on success, otherwise ``(False, detail)``. A missing,
        empty or non-string ``area`` fails the check.
    """
    area = fm.get("area", "")
    if not area:
        return False, "no area in frontmatter"
    # YAML can yield numbers or lists here; str.count() would raise TypeError.
    if not isinstance(area, str):
        return False, f"area is not a string: {area!r}"
    count = body.count(area)
    if count >= 3:
        return True, ""
    return False, f"area '{area}' count: {count}"


# check_014_image_exists function · python · L319-L333 (15 LOC) · scripts/seo_qc.py
def check_014_image_exists(slug: str, site_dir: str) -> tuple[bool, str]:
    """Check that the article's OGP and thumbnail images exist.

    Looks for ``<slug>-ogp.png`` and ``<slug>-thumb.png`` under
    ``<site_dir>/public/images/articles``.

    Returns:
        ``(True, "")`` when both files exist, otherwise
        ``(False, "missing: ...")`` naming ``ogp`` and/or ``thumb``.
    """
    images_dir = os.path.join(site_dir, "public", "images", "articles")
    expected = (
        ("ogp", os.path.join(images_dir, f"{slug}-ogp.png")),
        ("thumb", os.path.join(images_dir, f"{slug}-thumb.png")),
    )
    # isfile() rather than exists(): a directory with the image's name is not an image.
    missing = [label for label, path in expected if not os.path.isfile(path)]
    if not missing:
        return True, ""
    return False, f"missing: {', '.join(missing)}"


# run_checks function · python · L336-L403 (68 LOC) · scripts/seo_qc.py
def run_checks(filepath: str, site_dir: str) -> dict:
"""Run all 14 checks on a single article file."""
with open(filepath, "r", encoding="utf-8") as f:
content = f.read()
fm, body = parse_frontmatter(content)
slug = Path(filepath).stem # filename without extension
filename = Path(filepath).name
results = {}
# Check 001: Frontmatter fields
passed, detail = check_001_frontmatter_fields(fm)
results["check_001"] = {"pass": passed, "detail": detail}
# Check 002: Frontmatter types
passed, detail = check_002_frontmatter_types(fm)
results["check_002"] = {"pass": passed, "detail": detail}
# Check 003: PR notation
passed, detail = check_003_pr_notation(body)
results["check_003"] = {"pass": passed, "detail": detail}
# Check 004: CTA count
passed, detail = check_004_cta_count(body)
results["check_004"] = {"pass": passed, "detail": detail}
# Check 005: CTA structure
passed, detail = check_005_cta_structaggregate_results function · python · L406-L435 (30 LOC)scripts/seo_qc.py
def aggregate_results(all_results: dict[str, dict]) -> dict:
    """Aggregate per-file results into a per-check summary.

    Args:
        all_results: Maps filename to that file's results, which in turn map
            check ids (``check_001`` ... ``check_014``) to a dict with a
            ``"pass"`` entry.

    Returns:
        Dict keyed by check id with ``pass`` and ``fail`` counts,
        ``pass_rate`` (a whole-number percentage, or ``"N/A"`` when no file
        ran the check), ``fail_files`` (at most 20 names) and
        ``fail_files_total``.
    """
    summary = {}
    for cid in (f"check_{i:03d}" for i in range(1, 15)):
        # Files that did not run this check are left out of both counts.
        outcomes = [
            (filename, checks[cid]["pass"])
            for filename, checks in all_results.items()
            if cid in checks
        ]
        fail_files = [filename for filename, passed in outcomes if not passed]
        total = len(outcomes)
        pass_count = total - len(fail_files)
        rate = f"{pass_count / total * 100:.0f}%" if total > 0 else "N/A"
        summary[cid] = {
            "pass": pass_count,
            "fail": len(fail_files),
            "pass_rate": rate,
            "fail_files": fail_files[:20],  # Limit to 20 samples
            "fail_files_total": len(fail_files),
        }
    return summary


# run_site function · python · L438-L487 (50 LOC) · scripts/seo_qc.py
def run_site(site: str, base_dir: str, output_format: str = "yaml") -> dict:
"""Run QC checks on all articles for a site."""
site_dir = os.path.join(base_dir, site)
area_dir = os.path.join(site_dir, "src", "content", "area")
if not os.path.isdir(area_dir):
print(f"ERROR: {area_dir} not found", file=sys.stderr)
return {}
# Find all .md and .mdx files
md_files = sorted(glob.glob(os.path.join(area_dir, "*.md")))
mdx_files = sorted(glob.glob(os.path.join(area_dir, "*.mdx")))
all_files = md_files + mdx_files
if not all_files:
print(f"WARNING: No articles found in {area_dir}", file=sys.stderr)
return {}
print(f"Checking {site}: {len(all_files)} articles...", file=sys.stderr)
all_results = {}
for filepath in all_files:
filename = Path(filepath).name
try:
results = run_checks(filepath, site_dir)
all_results[filename] = results
except Exception as e:
pRepobility · open methodology · https://repobility.com/research/
print_summary function · python · L490-L525 (36 LOC) · scripts/seo_qc.py
def print_summary(report: dict):
"""Print a concise summary table."""
site = report["site"]
total = report["total_articles"]
results = report["results"]
check_names = {
"check_001": "frontmatter存在",
"check_002": "frontmatter型",
"check_003": "PR表記",
"check_004": "CTA×3",
"check_005": "CTA構造",
"check_006": "H2×5",
"check_007": "FAQ 5問",
"check_008": "2500文字↑",
"check_009": "禁止語なし",
"check_010": "費用テーブル",
"check_011": "md構文",
"check_012": "です・ます",
"check_013": "地域名出現",
"check_014": "画像存在",
}
print(f"\n{'='*60}")
print(f" {site} ({total} articles) — Overall: {report['overall']['pass_rate']}")
print(f"{'='*60}")
print(f" {'Check':<20} {'Pass':>6} {'Fail':>6} {'Rate':>6}")
print(f" {'-'*40}")
for cid in sorted(results.keys()):
r = results[cid]
name = check_names.get(cid, cid)
marker = " !!" if r["fail"]main function · python · L528-L573 (46 LOC)scripts/seo_qc.py
def main():
parser = argparse.ArgumentParser(description="SEO Article Quality Check")
parser.add_argument("site", help="Site name (e.g., gaichuu) or 'all' for all sites")
parser.add_argument("--base-dir", default=DEFAULT_BASE_DIR, help="Base directory for sites")
parser.add_argument("--output", choices=["yaml", "summary", "both"], default="both",
help="Output format")
parser.add_argument("--report-dir", default=None,
help="Directory to write YAML reports (default: queue/reports/)")
args = parser.parse_args()
sites = ALL_SITES if args.site == "all" else [args.site]
report_dir = args.report_dir
if report_dir is None:
# Default to queue/reports/ relative to this script
script_dir = Path(__file__).resolve().parent.parent
report_dir = str(script_dir / "queue" / "reports")
all_reports = []
for site in sites:
report = run_site(site, args.base_dir, args.output)
load_yaml function · python · L23-L32 (10 LOC)scripts/slim_yaml.py
def load_yaml(filepath):
    """Load a YAML file, falling back to ``{}``.

    Returns:
        The parsed document, or ``{}`` when the file does not exist, is
        empty (or otherwise parses to a falsy value), or is not valid YAML.
        Parse errors are reported on stderr.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f) or {}
    except FileNotFoundError:
        return {}
    except yaml.YAMLError as e:
        print(f"Error parsing {filepath}: {e}", file=sys.stderr)
        return {}


# save_yaml function · python · L35-L43 (9 LOC) · scripts/slim_yaml.py
def save_yaml(filepath, data):
    """Write ``data`` to ``filepath`` as block-style YAML.

    Unicode is written unescaped and key order is preserved.

    Returns:
        ``True`` on success. Any exception is reported on stderr and turned
        into ``False``.
    """
    # NOTE(review): the target is opened with 'w' and written in place, so a
    # failure during dump can leave a truncated file — confirm that is acceptable.
    try:
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, allow_unicode=True, sort_keys=False, default_flow_style=False)
        return True
    except Exception as e:
        print(f"Error writing {filepath}: {e}", file=sys.stderr)
        return False


# get_timestamp function · python · L46-L48 (3 LOC) · scripts/slim_yaml.py
def get_timestamp():
    """Return the current local time as ``YYYYMMDDHHMMSS``, for archive filenames."""
    return datetime.now().strftime('%Y%m%d%H%M%S')


# get_active_cmd_ids function · python · L55-L75 (21 LOC) · scripts/slim_yaml.py
def get_active_cmd_ids():
    """Return the ids of commands in rockman_to_blues.yaml that are not done.

    Commands are read from the ``commands`` key, or from ``queue`` when
    ``commands`` is absent.

    Returns:
        Set of ``id`` values of dict entries that have an id and whose
        ``status`` is not ``'done'``. Empty when the document is not a
        mapping or the command list is not a list.
    """
    queue_dir = get_queue_dir()
    cmd_queue_file = queue_dir / 'rockman_to_blues.yaml'
    data = load_yaml(cmd_queue_file)
    # Check the type before any `in` test: a scalar document (e.g. an int)
    # would raise TypeError on `'commands' in data`.
    if not isinstance(data, dict):
        return set()
    key = 'commands' if 'commands' in data else 'queue'
    commands = data.get(key, [])
    if not isinstance(commands, list):
        return set()
    active = set()
    for cmd in commands:
        if not isinstance(cmd, dict):
            continue
        cmd_id = cmd.get('id')
        if cmd_id is None or cmd.get('status') == 'done':
            continue
        active.add(cmd_id)
    return active


# archive_taskspec function · python · L82-L94 (13 LOC) · scripts/slim_yaml.py
def archive_taskspec(filepath, archive_path, data, dry_run=False):
    """Archive a task spec file.

    Args:
        filepath: ``Path`` of the task file.
        archive_path: ``Path`` of the archive destination.
        data: Parsed content, written to ``archive_path`` first.
        dry_run: When true, only print what would happen.

    Returns:
        ``True`` on success (and always for a dry run), ``False`` when the
        archive could not be written or the original could not be moved.
    """
    if dry_run:
        print(f"[DRY-RUN] would archive: {filepath}")
        print(f"[DRY-RUN] would write: {archive_path}")
        return True
    ensure_parent_dir(archive_path)
    if not save_yaml(archive_path, data):
        return False
    # NOTE(review): when the archive name contains the original file name the
    # original is left in place — confirm this is intended.
    if filepath.name in archive_path.name:
        return True
    try:
        filepath.rename(archive_path)
    except OSError as e:
        print(f"Error archiving {filepath}: {e}", file=sys.stderr)
        return False
    # Return a real bool: Path.rename() returns the new Path on Python 3.8+
    # and None before that, which callers would read as a failure.
    return True


# slim_tasks function · python · L97-L150 (54 LOC) · scripts/slim_yaml.py
def slim_tasks(dry_run=False):
queue_dir = get_queue_dir()
tasks_dir = queue_dir / 'tasks'
archive_dir = queue_dir / 'archive' / 'tasks'
if not tasks_dir.exists():
return True
timestamp = get_timestamp()
done_statuses = {'done', 'completed', 'cancelled'}
for filepath in sorted(tasks_dir.glob('*.yaml')):
data = load_yaml(filepath)
if not isinstance(data, dict):
continue
task = data.get('task', {}) if isinstance(data.get('task', {}), dict) else {}
status = task.get('status', '') if isinstance(task, dict) else ''
if not status:
continue
stem = filepath.stem
if stem in CANONICAL_TASKS:
if status not in done_statuses:
continue
archive_path = archive_dir / f'{stem}_{timestamp}.yaml'
if not archive_taskspec(filepath, archive_path, data, dry_run=dry_run):
return False
if dry_run:
Want this analysis on your repo? https://repobility.com/scan/
slim_reports function · python · L153-L192 (40 LOC) · scripts/slim_yaml.py
def slim_reports(dry_run=False):
queue_dir = get_queue_dir()
reports_dir = queue_dir / 'reports'
archive_dir = queue_dir / 'archive' / 'reports'
if not reports_dir.exists():
return True
active_cmd_ids = get_active_cmd_ids()
timestamp = get_timestamp()
for filepath in sorted(reports_dir.glob('*.yaml')):
if filepath.stem in CANONICAL_REPORTS:
continue
is_stale = (time.time() - filepath.stat().st_mtime) >= 86400
if not is_stale:
continue
data = load_yaml(filepath)
parent_cmd = data.get('parent_cmd') if isinstance(data, dict) else None
is_active = parent_cmd in active_cmd_ids
if is_active:
continue
archive_path = archive_dir / filepath.name
if archive_path.exists():
archive_path = archive_dir / f'{filepath.stem}_{timestamp}{filepath.suffix}'
if dry_run:
print(f"[DRY-RUN] would archive: {filepath}")
slim_inbox function · python · L195-L250 (56 LOC)scripts/slim_yaml.py
def slim_inbox(agent_id, dry_run=False):
"""Archive read: true messages from inbox file."""
queue_dir = get_queue_dir()
archive_dir = queue_dir / 'archive'
inbox_file = queue_dir / 'inbox' / f'{agent_id}.yaml'
if not inbox_file.exists():
# Inbox doesn't exist yet - that's fine
return True
data = load_yaml(inbox_file)
if not data or 'messages' not in data:
return True
messages = data.get('messages', [])
if not isinstance(messages, list):
print("Error: messages is not a list", file=sys.stderr)
return False
# Separate unread and archived messages
unread = []
archived = []
for msg in messages:
is_read = msg.get('read', False)
if is_read:
archived.append(msg)
else:
unread.append(msg)
# If nothing to archive, return success without writing
if not archived:
return True
archive_timestamp = get_timestamp()
archive_file = archislim_cmd_queue function · python · L253-L304 (52 LOC)scripts/slim_yaml.py
def slim_cmd_queue():
"""Archive done/cancelled commands from rockman_to_blues.yaml."""
queue_dir = get_queue_dir()
archive_dir = queue_dir / 'archive'
cmd_queue_file = queue_dir / 'rockman_to_blues.yaml'
if not cmd_queue_file.exists():
print(f"Warning: {cmd_queue_file} not found", file=sys.stderr)
return True
data = load_yaml(cmd_queue_file)
# Support both 'commands' and 'queue' keys for backwards compatibility
key = 'commands' if isinstance(data, dict) and 'commands' in data else 'queue'
if not data or key not in data:
return True
queue = data.get(key, [])
if not isinstance(queue, list):
print("Error: queue is not a list", file=sys.stderr)
return False
# Separate active and archived commands
active = []
archived = []
for cmd in queue:
status = cmd.get('status', 'unknown')
if status in ['done', 'cancelled']:
archived.append(cmd)
else:
page 1 / 2next ›