← back to flirtter__zzirit-docs

Function bodies 672 total

All specs Real LLM only Function bodies
load_json function · python · L10-L17 (8 LOC)
04. Resources/automation-scripts/automation/parallel-scheduler.py
def load_json(path: Path) -> dict:
    if not path.exists():
        return {}
    try:
        value = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return {}
    return value if isinstance(value, dict) else {}
parse_status_markdown function · python · L20-L31 (12 LOC)
04. Resources/automation-scripts/automation/parallel-scheduler.py
def parse_status_markdown(path: Path) -> dict[str, str]:
    values: dict[str, str] = {}
    if not path.exists():
        return values
    for line in path.read_text(encoding="utf-8").splitlines():
        if not line.startswith("- "):
            continue
        if ":" not in line:
            continue
        key, value = line[2:].split(":", 1)
        values[key.strip()] = value.strip()
    return values
parse_timestamp function · python · L34-L40 (7 LOC)
04. Resources/automation-scripts/automation/parallel-scheduler.py
def parse_timestamp(value: str) -> int | None:
    if not value:
        return None
    try:
        return int(datetime.strptime(value, "%Y-%m-%d %H:%M:%S").timestamp())
    except ValueError:
        return None
file_mtime_bucket function · python · L43-L46 (4 LOC)
04. Resources/automation-scripts/automation/parallel-scheduler.py
def file_mtime_bucket(path: Path, cooldown_seconds: int) -> str:
    if not path.exists():
        return "missing"
    return str(int(path.stat().st_mtime // max(cooldown_seconds, 1)))
effective_main_state function · python · L53-L72 (20 LOC)
04. Resources/automation-scripts/automation/parallel-scheduler.py
def effective_main_state(
    worker: dict,
    automation_status_file: Path,
    automation_status: dict[str, str],
) -> tuple[str, str]:
    state = automation_status.get("state", "")
    if state != "running":
        return state, "state-not-running"

    stale_seconds = int(worker.get("main_run_stale_seconds", 7200))
    started_at = parse_timestamp(automation_status.get("started_at", ""))
    if started_at is not None and now_timestamp() - started_at > stale_seconds:
        return "stale-running", "started_at-stale"

    if automation_status_file.exists():
        age_seconds = now_timestamp() - int(automation_status_file.stat().st_mtime)
        if age_seconds > stale_seconds:
            return "stale-running", "status-file-stale"

    return state, "fresh-running"
ready_gemini_review function · python · L75-L115 (41 LOC)
04. Resources/automation-scripts/automation/parallel-scheduler.py
def ready_gemini_review(
    worker: dict,
    artifact_root: Path,
    automation_status_file: Path,
    automation_status: dict[str, str],
) -> tuple[bool, str]:
    cooldown_seconds = int(worker.get("cooldown_seconds", 900))
    max_running_seconds = int(worker.get("max_running_seconds", max(cooldown_seconds, 900)))
    run_id = automation_status.get("run_id", "")
    state, _state_reason = effective_main_state(worker, automation_status_file, automation_status)
    control = load_json(artifact_root / worker["key"] / "control.json")
    worker_status = parse_status_markdown(artifact_root / worker["key"] / "status.md")

    if not run_id or not state:
        return False, "missing-main-run"

    if state == "running":
        return False, "wait-main-complete"

    trigger = f"{run_id}:{state}"

    worker_state = worker_status.get("state", "")
    worker_trigger = worker_status.get("trigger", "")
    worker_started_at = parse_timestamp(worker_status.get("started_at", ""))
    worker
ready_gemini_coder function · python · L118-L141 (24 LOC)
04. Resources/automation-scripts/automation/parallel-scheduler.py
def ready_gemini_coder(
    worker: dict,
    artifact_root: Path,
    automation_status_file: Path,
    automation_status: dict[str, str],
) -> tuple[bool, str]:
    cooldown_seconds = int(worker.get("cooldown_seconds", 1800))
    control = load_json(artifact_root / worker["key"] / "control.json")
    last_finished_at = int(control.get("last_finished_at_ts", 0) or 0)
    active_focus = automation_status.get("focus_section", "none")
    main_state, _state_reason = effective_main_state(worker, automation_status_file, automation_status)
    now_ts = now_timestamp()

    if main_state not in {"running", "success"}:
        return False, "main-not-active"

    if last_finished_at and now_ts - last_finished_at < cooldown_seconds:
        return False, "cooldown"

    trigger = f"{active_focus}:{now_ts // max(cooldown_seconds, 1)}"
    if control.get("last_trigger") == trigger:
        return False, "already-ran-bucket"

    return True, trigger
If a scraper extracted this row, it came from Repobility (https://repobility.com)
main function · python · L144-L193 (50 LOC)
04. Resources/automation-scripts/automation/parallel-scheduler.py
def main() -> int:
    if len(sys.argv) < 5:
        raise SystemExit(
            "usage: parallel-scheduler.py <spec-file> <artifact-root> <automation-status> <max-workers>"
        )

    spec_file = Path(sys.argv[1])
    artifact_root = Path(sys.argv[2])
    automation_status_file = Path(sys.argv[3])
    max_workers = int(sys.argv[4])

    data = json.loads(spec_file.read_text(encoding="utf-8"))
    workers = data.get("workers", []) if isinstance(data, dict) else data
    automation_status = parse_status_markdown(automation_status_file)

    ready_items: list[dict[str, str | int]] = []
    for worker in workers:
        if not worker.get("enabled", True):
            continue
        runner = worker.get("runner", "codex")
        if runner == "gemini-review":
            ready, trigger = ready_gemini_review(
                worker,
                artifact_root,
                automation_status_file,
                automation_status,
            )
        elif runner == "gemini-c
load_state function · python · L114-L121 (8 LOC)
04. Resources/automation-scripts/automation/select-focus-session.py
def load_state(path: Path) -> dict:
    if not path.exists():
        return {}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return {}
    return data if isinstance(data, dict) else {}
parse_time function · python · L124-L130 (7 LOC)
04. Resources/automation-scripts/automation/select-focus-session.py
def parse_time(value: str | None) -> datetime | None:
    if not value:
        return None
    try:
        return datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return None
choose_next_section function · python · L133-L137 (5 LOC)
04. Resources/automation-scripts/automation/select-focus-session.py
def choose_next_section(current: str | None, order: list[str]) -> str:
    if not current or current not in order:
        return order[0]
    index = order.index(current)
    return order[(index + 1) % len(order)]
parse_bool function · python · L140-L145 (6 LOC)
04. Resources/automation-scripts/automation/select-focus-session.py
def parse_bool(value: object) -> bool:
    if isinstance(value, bool):
        return value
    if value is None:
        return False
    return str(value).strip().lower() in {"1", "true", "yes", "y", "on"}
ensure_task_queue function · python · L148-L158 (11 LOC)
04. Resources/automation-scripts/automation/select-focus-session.py
def ensure_task_queue(path: Path) -> dict:
    if not path.exists():
        path.write_text(json.dumps(DEFAULT_TASK_QUEUE, ensure_ascii=True, indent=2) + "\n", encoding="utf-8")
        return json.loads(json.dumps(DEFAULT_TASK_QUEUE))

    data = load_state(path)
    tasks = data.get("tasks") if isinstance(data, dict) else None
    if not isinstance(tasks, list) or not tasks:
        path.write_text(json.dumps(DEFAULT_TASK_QUEUE, ensure_ascii=True, indent=2) + "\n", encoding="utf-8")
        return json.loads(json.dumps(DEFAULT_TASK_QUEUE))
    return data
normalize_queue_tasks function · python · L161-L177 (17 LOC)
04. Resources/automation-scripts/automation/select-focus-session.py
def normalize_queue_tasks(queue: dict, spec_map: dict[str, dict]) -> dict:
    tasks = queue.get("tasks", [])
    if not isinstance(tasks, list):
        return queue

    valid_sections = set(SECTION_METADATA)
    for task in tasks:
        if not isinstance(task, dict):
            continue
        source_surface = str(task.get("source_surface", "")).strip()
        task_id = str(task.get("id", "")).strip()
        if source_surface and source_surface in valid_sections:
            task["section"] = source_surface
            continue
        if task_id in spec_map and task_id in valid_sections:
            task["section"] = task_id
    return queue
choose_task_from_queue function · python · L180-L196 (17 LOC)
04. Resources/automation-scripts/automation/select-focus-session.py
def choose_task_from_queue(queue: dict) -> dict | None:
    tasks = [task for task in queue.get("tasks", []) if isinstance(task, dict)]
    if not tasks:
        return None

    active_task_id = queue.get("active_task_id")
    if active_task_id:
        for task in tasks:
            if task.get("id") == active_task_id and task.get("status") in {"in_progress", "pending", "queued"}:
                return task

    for status in ("in_progress", "pending", "queued"):
        for task in tasks:
            if task.get("status") == status:
                return task

    return None
Repobility analyzer · published findings · https://repobility.com
load_surface_specs function · python · L199-L211 (13 LOC)
04. Resources/automation-scripts/automation/select-focus-session.py
def load_surface_specs(path: Path) -> dict[str, dict]:
    data = load_state(path)
    surfaces = data.get("surfaces") if isinstance(data, dict) else None
    if not isinstance(surfaces, list):
        return {}
    spec_map: dict[str, dict] = {}
    for item in surfaces:
        if not isinstance(item, dict):
            continue
        surface_id = str(item.get("id", "")).strip()
        if surface_id:
            spec_map[surface_id] = item
    return spec_map
maybe_append_followup_tasks function · python · L214-L263 (50 LOC)
04. Resources/automation-scripts/automation/select-focus-session.py
def maybe_append_followup_tasks(queue: dict, spec_map: dict[str, dict], now: datetime) -> dict:
    tasks = [task for task in queue.get("tasks", []) if isinstance(task, dict)]
    if any(task.get("status") in {"in_progress", "pending", "queued"} for task in tasks):
        return queue

    existing_ids = {str(task.get("id", "")).strip() for task in tasks}
    appended = []

    for surface_id in FOLLOWUP_PRIORITY:
        spec = spec_map.get(surface_id, {})
        if not spec:
            continue
        if str(spec.get("current_state", "")).strip() == "blocked":
            continue
        next_step = str(spec.get("next_step", "")).strip()
        if not next_step:
            continue

        task_id = f"{surface_id}-followup"
        if task_id in existing_ids:
            continue

        section = surface_id if surface_id in SECTION_METADATA else surface_id
        title = f"{surface_id.replace('-', ' ').title()} follow-up"
        appended.append(
            {
            
build_surface_spec_payload function · python · L266-L277 (12 LOC)
04. Resources/automation-scripts/automation/select-focus-session.py
def build_surface_spec_payload(section: str, spec_map: dict[str, dict]) -> dict[str, object]:
    spec = spec_map.get(section, {})
    spec_path = SURFACE_SPEC_MANIFEST.parent / f"{section}.md"
    return {
        "surface_spec_manifest": str(SURFACE_SPEC_MANIFEST),
        "surface_spec_path": str(spec_path) if spec_path.exists() else "",
        "surface_spec_status": spec.get("spec_status", ""),
        "surface_qa_level": spec.get("qa_level", ""),
        "surface_automation_status": spec.get("automation_status", ""),
        "surface_current_state": spec.get("current_state", ""),
        "surface_next_step": spec.get("next_step", ""),
    }
main function · python · L280-L358 (79 LOC)
04. Resources/automation-scripts/automation/select-focus-session.py
def main() -> int:
    state_path = Path(sys.argv[1])
    duration_minutes = int(sys.argv[2])
    now = datetime.now()
    state = load_state(state_path)
    queue_path = state_path.parent / "task-queue.json"
    agent_state_path = state_path.parent / "agent-state.json"
    next_action_path = state_path.parent / "next-action.md"
    spec_map = load_surface_specs(SURFACE_SPEC_MANIFEST)
    queue = ensure_task_queue(queue_path)
    queue = normalize_queue_tasks(queue, spec_map)
    queue = maybe_append_followup_tasks(queue, spec_map, now)
    queue_path.write_text(json.dumps(queue, ensure_ascii=True, indent=2) + "\n", encoding="utf-8")
    order = DEFAULT_ORDER

    queued_task = choose_task_from_queue(queue)
    if queued_task and queued_task.get("section") in SECTION_METADATA:
        section = queued_task["section"]
        metadata = SECTION_METADATA[section]
        current_task_id = state.get("task_id")
        started_at = parse_time(state.get("started_at")) if current_task_id == 
load_state function · python · L64-L71 (8 LOC)
04. Resources/automation-scripts/automation/select-parallel-focus.py
def load_state(path: Path) -> dict:
    if not path.exists():
        return {}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return {}
    return data if isinstance(data, dict) else {}
choose_next_section function · python · L74-L81 (8 LOC)
04. Resources/automation-scripts/automation/select-parallel-focus.py
def choose_next_section(previous: str | None, exclude: str | None) -> str:
    candidates = [section for section in DEFAULT_ORDER if section != exclude]
    if not candidates:
        return DEFAULT_ORDER[0]
    if previous in candidates:
        index = candidates.index(previous)
        return candidates[(index + 1) % len(candidates)]
    return candidates[0]
load_surface_specs function · python · L84-L96 (13 LOC)
04. Resources/automation-scripts/automation/select-parallel-focus.py
def load_surface_specs(path: Path) -> dict[str, dict]:
    data = load_state(path)
    surfaces = data.get("surfaces") if isinstance(data, dict) else None
    if not isinstance(surfaces, list):
        return {}
    spec_map: dict[str, dict] = {}
    for item in surfaces:
        if not isinstance(item, dict):
            continue
        surface_id = str(item.get("id", "")).strip()
        if surface_id:
            spec_map[surface_id] = item
    return spec_map
build_surface_spec_payload function · python · L99-L110 (12 LOC)
04. Resources/automation-scripts/automation/select-parallel-focus.py
def build_surface_spec_payload(section: str, spec_map: dict[str, dict]) -> dict[str, object]:
    spec = spec_map.get(section, {})
    spec_path = SURFACE_SPEC_MANIFEST.parent / f"{section}.md"
    return {
        "surface_spec_manifest": str(SURFACE_SPEC_MANIFEST),
        "surface_spec_path": str(spec_path) if spec_path.exists() else "",
        "surface_spec_status": spec.get("spec_status", ""),
        "surface_qa_level": spec.get("qa_level", ""),
        "surface_automation_status": spec.get("automation_status", ""),
        "surface_current_state": spec.get("current_state", ""),
        "surface_next_step": spec.get("next_step", ""),
    }
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
main function · python · L113-L140 (28 LOC)
04. Resources/automation-scripts/automation/select-parallel-focus.py
def main() -> int:
    if len(sys.argv) < 3:
        raise SystemExit("usage: select-parallel-focus.py <state-path> <active-focus> [duration-minutes]")

    state_path = Path(sys.argv[1])
    active_focus = sys.argv[2] or None
    duration_minutes = int(sys.argv[3]) if len(sys.argv) > 3 else 45

    now = datetime.now()
    state = load_state(state_path)
    spec_map = load_surface_specs(SURFACE_SPEC_MANIFEST)
    previous_section = state.get("section")
    section = choose_next_section(previous_section, active_focus)
    metadata = SECTION_METADATA[section]

    payload = {
        "section": section,
        "label": metadata["label"],
        "goal": metadata["goal"],
        "target": "85%",
        "started_at": now.strftime("%Y-%m-%d %H:%M:%S"),
        "expires_at": (now + timedelta(minutes=duration_minutes)).strftime("%Y-%m-%d %H:%M:%S"),
        "active_codex_focus": active_focus or "none",
        **build_surface_spec_payload(section, spec_map),
    }
    state_path.write_tex
load_json function · python · L62-L69 (8 LOC)
04. Resources/automation-scripts/automation/update-agent-state.py
def load_json(path: Path, default: dict) -> dict:
    if not path.exists():
        return json.loads(json.dumps(default))
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return json.loads(json.dumps(default))
    return data if isinstance(data, dict) else json.loads(json.dumps(default))
find_task function · python · L72-L78 (7 LOC)
04. Resources/automation-scripts/automation/update-agent-state.py
def find_task(queue: dict, task_id: str | None) -> dict | None:
    if not task_id:
        return None
    for task in queue.get("tasks", []):
        if isinstance(task, dict) and task.get("id") == task_id:
            return task
    return None
choose_next_task function · python · L81-L87 (7 LOC)
04. Resources/automation-scripts/automation/update-agent-state.py
def choose_next_task(queue: dict) -> dict | None:
    for task in queue.get("tasks", []):
        if not isinstance(task, dict):
            continue
        if task.get("status") in {"pending", "queued", "in_progress"}:
            return task
    return None
load_surface_specs function · python · L90-L107 (18 LOC)
04. Resources/automation-scripts/automation/update-agent-state.py
def load_surface_specs(path: Path) -> dict[str, dict]:
    if not path.exists():
        return {}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return {}
    surfaces = data.get("surfaces") if isinstance(data, dict) else None
    if not isinstance(surfaces, list):
        return {}
    spec_map: dict[str, dict] = {}
    for item in surfaces:
        if not isinstance(item, dict):
            continue
        surface_id = str(item.get("id", "")).strip()
        if surface_id:
            spec_map[surface_id] = item
    return spec_map
normalize_queue_tasks function · python · L110-L125 (16 LOC)
04. Resources/automation-scripts/automation/update-agent-state.py
def normalize_queue_tasks(queue: dict, spec_map: dict[str, dict]) -> dict:
    tasks = queue.get("tasks", [])
    if not isinstance(tasks, list):
        return queue
    valid_sections = UI_SECTIONS | {"automation", "server"}
    for task in tasks:
        if not isinstance(task, dict):
            continue
        source_surface = str(task.get("source_surface", "")).strip()
        task_id = str(task.get("id", "")).strip()
        if source_surface and source_surface in valid_sections:
            task["section"] = source_surface
            continue
        if task_id in spec_map and task_id in valid_sections:
            task["section"] = task_id
    return queue
maybe_append_followup_tasks function · python · L128-L166 (39 LOC)
04. Resources/automation-scripts/automation/update-agent-state.py
def maybe_append_followup_tasks(queue: dict, spec_map: dict[str, dict], now: str) -> dict:
    tasks = [task for task in queue.get("tasks", []) if isinstance(task, dict)]
    if any(task.get("status") in {"in_progress", "pending", "queued"} for task in tasks):
        return queue

    existing_ids = {str(task.get("id", "")).strip() for task in tasks}
    appended = []

    for surface_id in FOLLOWUP_PRIORITY:
        spec = spec_map.get(surface_id, {})
        if not spec:
            continue
        if str(spec.get("current_state", "")).strip() == "blocked":
            continue
        next_step = str(spec.get("next_step", "")).strip()
        if not next_step:
            continue
        task_id = f"{surface_id}-followup"
        if task_id in existing_ids:
            continue
        section = surface_id
        appended.append(
            {
                "id": task_id,
                "title": f"{surface_id.replace('-', ' ').title()} follow-up",
                "section": s
ensure_agent_state function · python · L169-L184 (16 LOC)
04. Resources/automation-scripts/automation/update-agent-state.py
def ensure_agent_state(queue: dict, state: dict) -> dict:
    task_state = state.setdefault("tasks", {})
    for task in queue.get("tasks", []):
        if not isinstance(task, dict):
            continue
        task_state.setdefault(
            task["id"],
            {
                "success_streak": 0,
                "blocked_streak": 0,
                "last_status": "pending",
                "last_run_id": None,
            },
        )
    state.setdefault("history", [])
    return state
Powered by Repobility — scan your code at https://repobility.com
derive_result_status function · python · L187-L212 (26 LOC)
04. Resources/automation-scripts/automation/update-agent-state.py
def derive_result_status(result: dict, task: dict | None) -> str:
    run_state = str(result.get("state", "unknown"))
    exit_code = int(result.get("exit_code", 0) or 0)
    host_qa_status = str(result.get("host_qa_status", "skipped"))
    design_result_status = str(result.get("design_result_status", "skipped"))
    section = str(task.get("section", "")) if task else ""

    if run_state == "success" and exit_code == 0:
        if section in {"automation", "server"}:
            return "pass"
        if section in UI_SECTIONS:
            if design_result_status == "pass":
                return "pass"
            if design_result_status == "blocked":
                return "blocked"
            if host_qa_status in {"pass", "completed", "success"}:
                return "partial"
            return "blocked"
        if host_qa_status in {"pass", "completed", "success", "skipped"}:
            return "pass"
        return "partial"

    if host_qa_status in {"blocked", "failure"} or 
render_next_action function · python · L215-L260 (46 LOC)
04. Resources/automation-scripts/automation/update-agent-state.py
def render_next_action(queue: dict, state: dict) -> str:
    active_task = find_task(queue, queue.get("active_task_id"))
    pending = [
        task
        for task in queue.get("tasks", [])
        if isinstance(task, dict) and task.get("status") in {"pending", "queued", "in_progress"}
    ]
    next_pending = next(
        (
            task
            for task in pending
            if not active_task or task.get("id") != active_task.get("id")
        ),
        None,
    )
    lines = ["# Next Action", ""]

    if active_task:
        task_state = state.get("tasks", {}).get(active_task.get("id"), {})
        lines.extend(
            [
                f"- active_task_id: {active_task.get('id', 'none')}",
                f"- title: {active_task.get('title', 'none')}",
                f"- section: {active_task.get('section', 'none')}",
                f"- queue_status: {active_task.get('status', 'unknown')}",
                f"- success_streak: {task_state.get('success_streak', 0)
main function · python · L263-L379 (117 LOC)
04. Resources/automation-scripts/automation/update-agent-state.py
def main() -> int:
    if len(sys.argv) != 5:
        raise SystemExit(
            "usage: update-agent-state.py <task-queue.json> <agent-state.json> <next-action.md> <result.json>"
        )

    queue_path = Path(sys.argv[1])
    state_path = Path(sys.argv[2])
    next_action_path = Path(sys.argv[3])
    result_path = Path(sys.argv[4])

    queue = load_json(queue_path, DEFAULT_QUEUE)
    state = ensure_agent_state(queue, load_json(state_path, {}))
    result = load_json(result_path, {})
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    spec_map = load_surface_specs(SURFACE_SPEC_MANIFEST)
    queue = normalize_queue_tasks(queue, spec_map)

    has_result = result_path.exists() and bool(result)

    task = find_task(queue, result.get("task_id")) if has_result else None
    if has_result and not task and queue.get("active_task_id"):
        task = find_task(queue, queue.get("active_task_id"))

    if has_result and task:
        task_state = state["tasks"].setdefault(task["id
load_json function · python · L15-L22 (8 LOC)
04. Resources/automation-scripts/automation/write-design-result.py
def load_json(path: Path) -> dict:
    if not path.exists():
        return {}
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return {}
    return payload if isinstance(payload, dict) else {}
load_surface_spec function · python · L25-L31 (7 LOC)
04. Resources/automation-scripts/automation/write-design-result.py
def load_surface_spec(section: str) -> dict[str, object]:
    payload = load_json(SURFACE_SPEC_MANIFEST)
    surfaces = payload.get("surfaces") if isinstance(payload.get("surfaces"), list) else []
    for item in surfaces:
        if isinstance(item, dict) and str(item.get("id", "")).strip() == section:
            return item
    return {}
parse_summary_status function · python · L34-L41 (8 LOC)
04. Resources/automation-scripts/automation/write-design-result.py
def parse_summary_status(path: Path) -> str:
    if not path.exists():
        return "missing"
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if line.startswith("- status:"):
            return line.split(":", 1)[1].strip() or "missing"
    return "unknown"
collect_manual_reference_info function · python · L48-L64 (17 LOC)
04. Resources/automation-scripts/automation/write-design-result.py
def collect_manual_reference_info(manual_ref_root: Path, section: str) -> dict[str, object]:
    catalog_path = manual_ref_root / "catalog.md"
    section_name = normalize_section_name(section)
    section_dir = manual_ref_root / section_name if section_name else manual_ref_root / "__missing__"
    image_paths = sorted(
        path
        for path in section_dir.rglob("*")
        if path.is_file() and path.suffix.lower() in {".png", ".jpg", ".jpeg", ".webp", ".pdf"}
    ) if section_dir.exists() else []
    return {
        "catalog_path": str(catalog_path),
        "catalog_exists": catalog_path.exists(),
        "section_dir": str(section_dir),
        "section_exists": section_dir.exists(),
        "image_count": len(image_paths),
        "sample_images": [str(path) for path in image_paths[:8]],
    }
collect_host_qa_info function · python · L67-L98 (32 LOC)
04. Resources/automation-scripts/automation/write-design-result.py
def collect_host_qa_info(host_qa_result_path: Path, host_qa_summary_path: Path, run_dir: Path) -> dict[str, object]:
    payload = load_json(host_qa_result_path)
    host_status = str(payload.get("overall_status", "missing")) if payload else "missing"
    artifacts = payload.get("artifacts", {}) if isinstance(payload.get("artifacts"), dict) else {}
    existing_artifacts: dict[str, str] = {}
    for key, raw_value in artifacts.items():
        value = str(raw_value).strip()
        if not value:
            continue
        candidate = Path(value)
        if not candidate.is_absolute():
            candidate = (run_dir / candidate).resolve()
        if candidate.exists():
            existing_artifacts[str(key)] = str(candidate)
    release_clean_artifacts = {
        key: value
        for key, value in existing_artifacts.items()
        if "/artifacts/manual-review/" in value
    }
    return {
        "result_path": str(host_qa_result_path),
        "result_exists": host_qa_result_p
If a scraper extracted this row, it came from Repobility (https://repobility.com)
collect_figma_bundle_info function · python · L101-L106 (6 LOC)
04. Resources/automation-scripts/automation/write-design-result.py
def collect_figma_bundle_info(summary_path: Path) -> dict[str, object]:
    return {
        "summary_path": str(summary_path),
        "summary_exists": summary_path.exists(),
        "status": parse_summary_status(summary_path),
    }
requires_release_clean_capture function · python · L109-L112 (4 LOC)
04. Resources/automation-scripts/automation/write-design-result.py
def requires_release_clean_capture(surface_spec: dict[str, object]) -> bool:
    next_step = str(surface_spec.get("next_step", "")).lower()
    blockers = [str(item).lower() for item in surface_spec.get("blockers", []) if item]
    return "release" in next_step or any("release clean capture" in item for item in blockers)
derive_overall_status function · python · L115-L152 (38 LOC)
04. Resources/automation-scripts/automation/write-design-result.py
def derive_overall_status(
    section: str,
    surface_spec: dict[str, object],
    manual_info: dict[str, object],
    host_qa_info: dict[str, object],
    figma_info: dict[str, object],
) -> tuple[str, list[str]]:
    notes: list[str] = []
    if section not in UI_SECTIONS:
        notes.append("non-ui section; design gating skipped")
        return "skipped", notes

    references_present = bool(manual_info.get("section_exists")) or bool(figma_info.get("summary_exists"))
    if not references_present:
        notes.append("missing manual section references and figma bundle summary")
        return "blocked", notes

    host_status = str(host_qa_info.get("overall_status", "missing"))
    if host_status == "blocked":
        notes.append("host QA reported blocked")
        return "blocked", notes

    if host_status in PASS_HOST_QA:
        if int(host_qa_info.get("artifact_count", 0) or 0) > 0 or bool(host_qa_info.get("summary_exists")):
            if requires_release_clean_captur
main function · python · L155-L188 (34 LOC)
04. Resources/automation-scripts/automation/write-design-result.py
def main() -> int:
    if len(sys.argv) != 8:
        raise SystemExit(
            "usage: write-design-result.py <focus_section> <run_dir> <host_qa_result.json> <host_qa_summary.md> <figma_bundle_summary.md> <manual_ref_root> <output_path>"
        )

    focus_section = sys.argv[1].strip()
    run_dir = Path(sys.argv[2]).resolve()
    host_qa_result_path = Path(sys.argv[3]) if sys.argv[3] else Path()
    host_qa_summary_path = Path(sys.argv[4]) if sys.argv[4] else Path()
    figma_bundle_summary_path = Path(sys.argv[5]) if sys.argv[5] else Path()
    manual_ref_root = Path(sys.argv[6]).resolve()
    output_path = Path(sys.argv[7]).resolve()

    surface_spec = load_surface_spec(focus_section)
    manual_info = collect_manual_reference_info(manual_ref_root, focus_section)
    host_qa_info = collect_host_qa_info(host_qa_result_path, host_qa_summary_path, run_dir)
    figma_info = collect_figma_bundle_info(figma_bundle_summary_path)
    overall_status, notes = derive_overall_status(foc
load_json function · python · L9-L16 (8 LOC)
04. Resources/automation-scripts/automation/write-docs-note.py
def load_json(path: Path) -> dict:
    if not path.exists():
        return {}
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return {}
    return payload if isinstance(payload, dict) else {}
main function · python · L19-L92 (74 LOC)
04. Resources/automation-scripts/automation/write-docs-note.py
def main() -> int:
    if len(sys.argv) != 6:
        print(
            "usage: write-zzirit-docs-note.py <run_result.json> <run_summary.md> <run_dir> <out_md> <out_json>",
            file=sys.stderr,
        )
        return 2

    run_result_path = Path(sys.argv[1])
    run_summary_path = Path(sys.argv[2])
    run_dir = Path(sys.argv[3])
    out_md = Path(sys.argv[4])
    out_json = Path(sys.argv[5])

    run_result = load_json(run_result_path)
    design_result = load_json(run_dir / "design-result.json")
    host_qa_result = load_json(run_dir / "focus-host-qa-result.json")

    payload = {
        "run_id": run_result.get("run_id") or run_dir.name,
        "state": run_result.get("state", "unknown"),
        "task_id": run_result.get("task_id"),
        "task_title": run_result.get("task_title"),
        "focus_section": run_result.get("focus_section"),
        "focus_label": run_result.get("focus_label"),
        "host_qa_status": run_result.get("host_qa_status", "skipped"),
    
getElementId function · javascript · L22-L24 (3 LOC)
04. Resources/automation-scripts/e2e/ios-appium-onboarding.mjs
function getElementId(element) {
  return element.ELEMENT || element['element-6066-11e4-a52e-4f735466cecf'];
}
request function · javascript · L26-L40 (15 LOC)
04. Resources/automation-scripts/e2e/ios-appium-onboarding.mjs
async function request(method, pathname, body) {
  const response = await fetch(`${appiumServerUrl}${pathname}`, {
    method,
    headers: {
      'Content-Type': 'application/json',
    },
    body: body ? JSON.stringify(body) : undefined,
  });

  const payload = await response.json();
  if (!response.ok) {
    throw new Error(`${method} ${pathname} failed: ${JSON.stringify(payload)}`);
  }
  return payload;
}
Repobility analyzer · published findings · https://repobility.com
sleep function · javascript · L42-L44 (3 LOC)
04. Resources/automation-scripts/e2e/ios-appium-onboarding.mjs
async function sleep(ms) {
  await new Promise((resolve) => setTimeout(resolve, ms));
}
openSimulatorUrl function · javascript · L46-L53 (8 LOC)
04. Resources/automation-scripts/e2e/ios-appium-onboarding.mjs
async function openSimulatorUrl(url) {
  if (!udid) {
    throw new Error('Simulator UDID is required to open deeplinks during Appium onboarding');
  }

  await execFileAsync('xcrun', ['simctl', 'openurl', udid, url]);
  await sleep(4000);
}
isSignupSource function · javascript · L55-L63 (9 LOC)
04. Resources/automation-scripts/e2e/ios-appium-onboarding.mjs
function isSignupSource(source) {
  return (
    (source.includes('이메일 주소를') && source.includes('다음')) ||
    (source.includes('이미 계정이 있으신가요? 로그인') &&
      source.includes('다음') &&
      source.includes('XCUIElementTypeTextField') &&
      !source.includes('XCUIElementTypeSecureTextField'))
  );
}
page 1 / 14next ›