Function bodies 672 total
load_json function · python · L10-L17 (8 LOC)04. Resources/automation-scripts/automation/parallel-scheduler.py
def load_json(path: Path) -> dict:
    """Load a JSON object from ``path``, falling back to an empty dict.

    Returns ``{}`` when the file is missing, cannot be decoded as UTF-8,
    is not valid JSON, or decodes to anything other than a JSON object.
    """
    if not path.exists():
        return {}
    try:
        value = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # The file was removed between the exists() check and the read.
        return {}
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; either way the file content is unusable.
        return {}
    return value if isinstance(value, dict) else {}

# parse_status_markdown function · python · L20-L31 (12 LOC)04. Resources/automation-scripts/automation/parallel-scheduler.py
def parse_status_markdown(path: Path) -> dict[str, str]:
    """Collect ``- key: value`` bullet lines from a status file into a dict.

    Lines that do not start with ``"- "`` or contain no colon are ignored.
    Keys and values are stripped of surrounding whitespace; a repeated key
    keeps the last value. A missing file yields an empty dict.
    """
    if not path.exists():
        return {}
    parsed: dict[str, str] = {}
    for raw in path.read_text(encoding="utf-8").splitlines():
        if raw.startswith("- "):
            # Split on the first colon only so values may contain colons.
            name, colon, rest = raw[2:].partition(":")
            if colon:
                parsed[name.strip()] = rest.strip()
    return parsed

# parse_timestamp function · python · L34-L40 (7 LOC)04. Resources/automation-scripts/automation/parallel-scheduler.py
def parse_timestamp(value: str) -> int | None:
if not value:
return None
try:
return int(datetime.strptime(value, "%Y-%m-%d %H:%M:%S").timestamp())
except ValueError:
return Nonefile_mtime_bucket function · python · L43-L46 (4 LOC)04. Resources/automation-scripts/automation/parallel-scheduler.py
def file_mtime_bucket(path: Path, cooldown_seconds: int) -> str:
    """Return the cooldown-sized time bucket of ``path``'s mtime as a string.

    The bucket is the modification time floor-divided by ``cooldown_seconds``
    (clamped to at least 1). A missing file yields ``"missing"``.
    """
    if path.exists():
        width = max(cooldown_seconds, 1)
        bucket = path.stat().st_mtime // width
        return str(int(bucket))
    return "missing"

# effective_main_state function · python · L53-L72 (20 LOC)04. Resources/automation-scripts/automation/parallel-scheduler.py
def effective_main_state(
    worker: dict,
    automation_status_file: Path,
    automation_status: dict[str, str],
) -> tuple[str, str]:
    """Return ``(state, reason)`` for the main run, flagging stale "running" runs.

    Any state other than ``"running"`` is passed through unchanged. A running
    state becomes ``"stale-running"`` when either the recorded ``started_at``
    or the status file's mtime is older than the worker's
    ``main_run_stale_seconds`` (default 7200).
    """
    reported = automation_status.get("state", "")
    if reported != "running":
        return reported, "state-not-running"

    stale_after = int(worker.get("main_run_stale_seconds", 7200))

    started_ts = parse_timestamp(automation_status.get("started_at", ""))
    if started_ts is not None:
        if now_timestamp() - started_ts > stale_after:
            return "stale-running", "started_at-stale"

    if automation_status_file.exists():
        file_age = now_timestamp() - int(automation_status_file.stat().st_mtime)
        if file_age > stale_after:
            return "stale-running", "status-file-stale"

    return reported, "fresh-running"

# ready_gemini_review function · python · L75-L115 (41 LOC)04. Resources/automation-scripts/automation/parallel-scheduler.py
def ready_gemini_review(
worker: dict,
artifact_root: Path,
automation_status_file: Path,
automation_status: dict[str, str],
) -> tuple[bool, str]:
cooldown_seconds = int(worker.get("cooldown_seconds", 900))
max_running_seconds = int(worker.get("max_running_seconds", max(cooldown_seconds, 900)))
run_id = automation_status.get("run_id", "")
state, _state_reason = effective_main_state(worker, automation_status_file, automation_status)
control = load_json(artifact_root / worker["key"] / "control.json")
worker_status = parse_status_markdown(artifact_root / worker["key"] / "status.md")
if not run_id or not state:
return False, "missing-main-run"
if state == "running":
return False, "wait-main-complete"
trigger = f"{run_id}:{state}"
worker_state = worker_status.get("state", "")
worker_trigger = worker_status.get("trigger", "")
worker_started_at = parse_timestamp(worker_status.get("started_at", ""))
workerready_gemini_coder function · python · L118-L141 (24 LOC)04. Resources/automation-scripts/automation/parallel-scheduler.py
def ready_gemini_coder(
    worker: dict,
    artifact_root: Path,
    automation_status_file: Path,
    automation_status: dict[str, str],
) -> tuple[bool, str]:
    """Decide whether the gemini-coder worker should run now.

    Returns ``(True, trigger)`` when ready, otherwise ``(False, reason)``.
    The worker runs only while the main run is ``running`` or ``success``,
    outside its cooldown window, and at most once per
    ``focus_section``/cooldown-bucket trigger.
    """
    cooldown = int(worker.get("cooldown_seconds", 1800))
    control_state = load_json(artifact_root / worker["key"] / "control.json")
    finished_ts = int(control_state.get("last_finished_at_ts", 0) or 0)
    focus = automation_status.get("focus_section", "none")
    main_state, _ = effective_main_state(worker, automation_status_file, automation_status)
    current_ts = now_timestamp()

    if main_state != "running" and main_state != "success":
        return False, "main-not-active"

    still_cooling = bool(finished_ts) and current_ts - finished_ts < cooldown
    if still_cooling:
        return False, "cooldown"

    # The trigger changes once per cooldown-sized bucket of wall-clock time.
    bucket = current_ts // max(cooldown, 1)
    candidate = f"{focus}:{bucket}"
    if control_state.get("last_trigger") == candidate:
        return False, "already-ran-bucket"
    return True, candidate

# If a scraper extracted this row, it came from Repobility (https://repobility.com)
main function · python · L144-L193 (50 LOC)04. Resources/automation-scripts/automation/parallel-scheduler.py
def main() -> int:
if len(sys.argv) < 5:
raise SystemExit(
"usage: parallel-scheduler.py <spec-file> <artifact-root> <automation-status> <max-workers>"
)
spec_file = Path(sys.argv[1])
artifact_root = Path(sys.argv[2])
automation_status_file = Path(sys.argv[3])
max_workers = int(sys.argv[4])
data = json.loads(spec_file.read_text(encoding="utf-8"))
workers = data.get("workers", []) if isinstance(data, dict) else data
automation_status = parse_status_markdown(automation_status_file)
ready_items: list[dict[str, str | int]] = []
for worker in workers:
if not worker.get("enabled", True):
continue
runner = worker.get("runner", "codex")
if runner == "gemini-review":
ready, trigger = ready_gemini_review(
worker,
artifact_root,
automation_status_file,
automation_status,
)
elif runner == "gemini-cload_state function · python · L114-L121 (8 LOC)04. Resources/automation-scripts/automation/select-focus-session.py
def load_state(path: Path) -> dict:
    """Read a JSON object from ``path``; return ``{}`` when it is unusable.

    "Unusable" means: the file is missing, is not valid UTF-8, is not valid
    JSON, or holds a JSON value that is not an object.
    """
    if not path.exists():
        return {}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # Removed after the exists() check; treat like a missing file.
        return {}
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return data if isinstance(data, dict) else {}

# parse_time function · python · L124-L130 (7 LOC)04. Resources/automation-scripts/automation/select-focus-session.py
def parse_time(value: str | None) -> datetime | None:
if not value:
return None
try:
return datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
except ValueError:
return Nonechoose_next_section function · python · L133-L137 (5 LOC)04. Resources/automation-scripts/automation/select-focus-session.py
def choose_next_section(current: str | None, order: list[str]) -> str:
if not current or current not in order:
return order[0]
index = order.index(current)
return order[(index + 1) % len(order)]parse_bool function · python · L140-L145 (6 LOC)04. Resources/automation-scripts/automation/select-focus-session.py
def parse_bool(value: object) -> bool:
    """Interpret ``value`` as a boolean flag.

    Real booleans pass through, ``None`` is false, and anything else is true
    only when its stripped, lower-cased string form is one of
    ``1``, ``true``, ``yes``, ``y`` or ``on``.
    """
    if value is None:
        return False
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    return text in {"1", "true", "yes", "y", "on"}

# ensure_task_queue function · python · L148-L158 (11 LOC)04. Resources/automation-scripts/automation/select-focus-session.py
def ensure_task_queue(path: Path) -> dict:
    """Return the task queue stored at ``path``, seeding it when unusable.

    When the file is missing, or its ``tasks`` entry is not a non-empty list,
    ``DEFAULT_TASK_QUEUE`` is written to ``path`` and a fresh copy of it is
    returned. Otherwise the loaded queue is returned as-is.
    """
    if path.exists():
        loaded = load_state(path)
        entries = loaded.get("tasks") if isinstance(loaded, dict) else None
        if isinstance(entries, list) and entries:
            return loaded

    # Missing file or no usable tasks: (re)write the default queue.
    serialized = json.dumps(DEFAULT_TASK_QUEUE, ensure_ascii=True, indent=2)
    path.write_text(serialized + "\n", encoding="utf-8")
    # Decoding the serialized text gives a copy independent of the constant.
    return json.loads(serialized)

# normalize_queue_tasks function · python · L161-L177 (17 LOC)04. Resources/automation-scripts/automation/select-focus-session.py
def normalize_queue_tasks(queue: dict, spec_map: dict[str, dict]) -> dict:
    """Set each task's ``section`` from its source surface or id, in place.

    A task's ``source_surface`` wins when it names a key of
    ``SECTION_METADATA``; otherwise the task ``id`` is used when it is both a
    key of ``spec_map`` and of ``SECTION_METADATA``. Tasks matching neither
    rule are left untouched. Returns the same ``queue`` object.
    """
    entries = queue.get("tasks", [])
    if isinstance(entries, list):
        known_sections = set(SECTION_METADATA)
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            surface = str(entry.get("source_surface", "")).strip()
            identifier = str(entry.get("id", "")).strip()
            if surface and surface in known_sections:
                entry["section"] = surface
            elif identifier in spec_map and identifier in known_sections:
                entry["section"] = identifier
    return queue

# choose_task_from_queue function · python · L180-L196 (17 LOC)04. Resources/automation-scripts/automation/select-focus-session.py
def choose_task_from_queue(queue: dict) -> dict | None:
tasks = [task for task in queue.get("tasks", []) if isinstance(task, dict)]
if not tasks:
return None
active_task_id = queue.get("active_task_id")
if active_task_id:
for task in tasks:
if task.get("id") == active_task_id and task.get("status") in {"in_progress", "pending", "queued"}:
return task
for status in ("in_progress", "pending", "queued"):
for task in tasks:
if task.get("status") == status:
return task
return NoneRepobility analyzer · published findings · https://repobility.com
load_surface_specs function · python · L199-L211 (13 LOC)04. Resources/automation-scripts/automation/select-focus-session.py
def load_surface_specs(path: Path) -> dict[str, dict]:
    """Index the manifest's ``surfaces`` list by each entry's stripped ``id``.

    Non-dict entries and entries with an empty id are skipped; a later entry
    with the same id replaces an earlier one. Returns ``{}`` when the manifest
    has no ``surfaces`` list.
    """
    manifest = load_state(path)
    listed = manifest.get("surfaces") if isinstance(manifest, dict) else None
    if not isinstance(listed, list):
        return {}
    keyed = (
        (str(entry.get("id", "")).strip(), entry)
        for entry in listed
        if isinstance(entry, dict)
    )
    return {surface_id: entry for surface_id, entry in keyed if surface_id}

# maybe_append_followup_tasks function · python · L214-L263 (50 LOC)04. Resources/automation-scripts/automation/select-focus-session.py
def maybe_append_followup_tasks(queue: dict, spec_map: dict[str, dict], now: datetime) -> dict:
tasks = [task for task in queue.get("tasks", []) if isinstance(task, dict)]
if any(task.get("status") in {"in_progress", "pending", "queued"} for task in tasks):
return queue
existing_ids = {str(task.get("id", "")).strip() for task in tasks}
appended = []
for surface_id in FOLLOWUP_PRIORITY:
spec = spec_map.get(surface_id, {})
if not spec:
continue
if str(spec.get("current_state", "")).strip() == "blocked":
continue
next_step = str(spec.get("next_step", "")).strip()
if not next_step:
continue
task_id = f"{surface_id}-followup"
if task_id in existing_ids:
continue
section = surface_id if surface_id in SECTION_METADATA else surface_id
title = f"{surface_id.replace('-', ' ').title()} follow-up"
appended.append(
{
build_surface_spec_payload function · python · L266-L277 (12 LOC)04. Resources/automation-scripts/automation/select-focus-session.py
def build_surface_spec_payload(section: str, spec_map: dict[str, dict]) -> dict[str, object]:
    """Build the ``surface_*`` payload fields for ``section``.

    Spec fields missing from ``spec_map`` default to ``""``. The spec path is
    ``<manifest dir>/<section>.md`` and is reported only when that file exists.
    """
    entry = spec_map.get(section, {})
    markdown_path = SURFACE_SPEC_MANIFEST.parent / f"{section}.md"
    payload: dict[str, object] = {
        "surface_spec_manifest": str(SURFACE_SPEC_MANIFEST),
        "surface_spec_path": str(markdown_path) if markdown_path.exists() else "",
    }
    # (payload key, spec key) pairs copied straight from the spec entry.
    copied_fields = (
        ("surface_spec_status", "spec_status"),
        ("surface_qa_level", "qa_level"),
        ("surface_automation_status", "automation_status"),
        ("surface_current_state", "current_state"),
        ("surface_next_step", "next_step"),
    )
    for payload_key, spec_key in copied_fields:
        payload[payload_key] = entry.get(spec_key, "")
    return payload

# main function · python · L280-L358 (79 LOC)04. Resources/automation-scripts/automation/select-focus-session.py
def main() -> int:
state_path = Path(sys.argv[1])
duration_minutes = int(sys.argv[2])
now = datetime.now()
state = load_state(state_path)
queue_path = state_path.parent / "task-queue.json"
agent_state_path = state_path.parent / "agent-state.json"
next_action_path = state_path.parent / "next-action.md"
spec_map = load_surface_specs(SURFACE_SPEC_MANIFEST)
queue = ensure_task_queue(queue_path)
queue = normalize_queue_tasks(queue, spec_map)
queue = maybe_append_followup_tasks(queue, spec_map, now)
queue_path.write_text(json.dumps(queue, ensure_ascii=True, indent=2) + "\n", encoding="utf-8")
order = DEFAULT_ORDER
queued_task = choose_task_from_queue(queue)
if queued_task and queued_task.get("section") in SECTION_METADATA:
section = queued_task["section"]
metadata = SECTION_METADATA[section]
current_task_id = state.get("task_id")
started_at = parse_time(state.get("started_at")) if current_task_id == load_state function · python · L64-L71 (8 LOC)04. Resources/automation-scripts/automation/select-parallel-focus.py
def load_state(path: Path) -> dict:
    """Load the JSON object stored at ``path``, or ``{}`` if there is none.

    An empty dict is returned when the file is missing, is not valid UTF-8,
    is not valid JSON, or contains a non-object JSON value.
    """
    if not path.exists():
        return {}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (FileNotFoundError, ValueError):
        # FileNotFoundError: file vanished after the exists() check.
        # ValueError: json.JSONDecodeError or UnicodeDecodeError.
        return {}
    return data if isinstance(data, dict) else {}

# choose_next_section function · python · L74-L81 (8 LOC)04. Resources/automation-scripts/automation/select-parallel-focus.py
def choose_next_section(previous: str | None, exclude: str | None) -> str:
    """Pick the section after ``previous`` in ``DEFAULT_ORDER``, skipping ``exclude``.

    Wraps around at the end of the candidate list. Falls back to the first
    candidate when ``previous`` is not a candidate, and to the first entry of
    ``DEFAULT_ORDER`` when excluding leaves no candidates.
    """
    pool = [name for name in DEFAULT_ORDER if name != exclude]
    if not pool:
        return DEFAULT_ORDER[0]
    try:
        position = pool.index(previous)
    except ValueError:
        return pool[0]
    return pool[(position + 1) % len(pool)]

# load_surface_specs function · python · L84-L96 (13 LOC)04. Resources/automation-scripts/automation/select-parallel-focus.py
def load_surface_specs(path: Path) -> dict[str, dict]:
    """Map each surface ``id`` in the manifest at ``path`` to its entry.

    Ids are stringified and stripped. Entries that are not dicts or have an
    empty id are dropped, and a duplicate id keeps the last entry. Returns
    ``{}`` when ``surfaces`` is absent or not a list.
    """
    manifest = load_state(path)
    listed = manifest.get("surfaces") if isinstance(manifest, dict) else None
    if not isinstance(listed, list):
        return {}

    by_id: dict[str, dict] = {}
    for entry in (candidate for candidate in listed if isinstance(candidate, dict)):
        key = str(entry.get("id", "")).strip()
        if not key:
            continue
        by_id[key] = entry
    return by_id

# build_surface_spec_payload function · python · L99-L110 (12 LOC)04. Resources/automation-scripts/automation/select-parallel-focus.py
def build_surface_spec_payload(section: str, spec_map: dict[str, dict]) -> dict[str, object]:
    """Return the ``surface_*`` fields describing ``section``'s spec.

    Values come from ``spec_map[section]`` and default to ``""``. The spec
    path ``<manifest dir>/<section>.md`` is included only if the file exists.
    """
    entry = spec_map.get(section, {})
    manifest_text = str(SURFACE_SPEC_MANIFEST)
    markdown_path = SURFACE_SPEC_MANIFEST.parent / f"{section}.md"
    path_text = str(markdown_path) if markdown_path.exists() else ""

    def field(name: str) -> object:
        # Missing spec fields are reported as empty strings.
        return entry.get(name, "")

    return {
        "surface_spec_manifest": manifest_text,
        "surface_spec_path": path_text,
        "surface_spec_status": field("spec_status"),
        "surface_qa_level": field("qa_level"),
        "surface_automation_status": field("automation_status"),
        "surface_current_state": field("current_state"),
        "surface_next_step": field("next_step"),
    }

# Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
main function · python · L113-L140 (28 LOC)04. Resources/automation-scripts/automation/select-parallel-focus.py
def main() -> int:
if len(sys.argv) < 3:
raise SystemExit("usage: select-parallel-focus.py <state-path> <active-focus> [duration-minutes]")
state_path = Path(sys.argv[1])
active_focus = sys.argv[2] or None
duration_minutes = int(sys.argv[3]) if len(sys.argv) > 3 else 45
now = datetime.now()
state = load_state(state_path)
spec_map = load_surface_specs(SURFACE_SPEC_MANIFEST)
previous_section = state.get("section")
section = choose_next_section(previous_section, active_focus)
metadata = SECTION_METADATA[section]
payload = {
"section": section,
"label": metadata["label"],
"goal": metadata["goal"],
"target": "85%",
"started_at": now.strftime("%Y-%m-%d %H:%M:%S"),
"expires_at": (now + timedelta(minutes=duration_minutes)).strftime("%Y-%m-%d %H:%M:%S"),
"active_codex_focus": active_focus or "none",
**build_surface_spec_payload(section, spec_map),
}
state_path.write_texload_json function · python · L62-L69 (8 LOC)04. Resources/automation-scripts/automation/update-agent-state.py
def load_json(path: Path, default: dict) -> dict:
    """Load a JSON object from ``path``, or return a fresh copy of ``default``.

    The copy of ``default`` is returned when the file is missing, is not valid
    UTF-8, is not valid JSON, or holds a non-object JSON value. The copy is
    made with a JSON round-trip, so ``default`` must be JSON-serializable and
    the caller may mutate the result without touching ``default``.
    """

    def fresh_default() -> dict:
        # JSON round-trip gives a copy that shares nothing with ``default``.
        return json.loads(json.dumps(default))

    if not path.exists():
        return fresh_default()
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (FileNotFoundError, ValueError):
        # FileNotFoundError: file vanished after the exists() check.
        # ValueError: json.JSONDecodeError or UnicodeDecodeError.
        return fresh_default()
    return data if isinstance(data, dict) else fresh_default()

# find_task function · python · L72-L78 (7 LOC)04. Resources/automation-scripts/automation/update-agent-state.py
def find_task(queue: dict, task_id: str | None) -> dict | None:
if not task_id:
return None
for task in queue.get("tasks", []):
if isinstance(task, dict) and task.get("id") == task_id:
return task
return Nonechoose_next_task function · python · L81-L87 (7 LOC)04. Resources/automation-scripts/automation/update-agent-state.py
def choose_next_task(queue: dict) -> dict | None:
for task in queue.get("tasks", []):
if not isinstance(task, dict):
continue
if task.get("status") in {"pending", "queued", "in_progress"}:
return task
return Noneload_surface_specs function · python · L90-L107 (18 LOC)04. Resources/automation-scripts/automation/update-agent-state.py
def load_surface_specs(path: Path) -> dict[str, dict]:
    """Read the surface manifest at ``path`` and index its entries by ``id``.

    Returns ``{}`` when the file is missing, is not valid UTF-8, is not valid
    JSON, or has no ``surfaces`` list. Ids are stringified and stripped;
    non-dict entries and empty ids are skipped, and a duplicate id keeps the
    last entry.
    """
    if not path.exists():
        return {}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (FileNotFoundError, ValueError):
        # FileNotFoundError: file vanished after the exists() check.
        # ValueError: json.JSONDecodeError or UnicodeDecodeError.
        return {}
    surfaces = data.get("surfaces") if isinstance(data, dict) else None
    if not isinstance(surfaces, list):
        return {}
    spec_map: dict[str, dict] = {}
    for item in surfaces:
        if not isinstance(item, dict):
            continue
        surface_id = str(item.get("id", "")).strip()
        if surface_id:
            spec_map[surface_id] = item
    return spec_map

# normalize_queue_tasks function · python · L110-L125 (16 LOC)04. Resources/automation-scripts/automation/update-agent-state.py
def normalize_queue_tasks(queue: dict, spec_map: dict[str, dict]) -> dict:
    """Assign each task's ``section`` from its source surface or id, in place.

    Valid sections are ``UI_SECTIONS`` plus ``automation`` and ``server``.
    ``source_surface`` is used when valid; otherwise the task ``id`` is used
    when it is both in ``spec_map`` and valid. Returns the same ``queue``.
    """
    entries = queue.get("tasks", [])
    if isinstance(entries, list):
        allowed = UI_SECTIONS | {"automation", "server"}
        for entry in entries:
            if not isinstance(entry, dict):
                continue
            surface = str(entry.get("source_surface", "")).strip()
            identifier = str(entry.get("id", "")).strip()
            if surface and surface in allowed:
                entry["section"] = surface
            elif identifier in spec_map and identifier in allowed:
                entry["section"] = identifier
    return queue

# maybe_append_followup_tasks function · python · L128-L166 (39 LOC)04. Resources/automation-scripts/automation/update-agent-state.py
def maybe_append_followup_tasks(queue: dict, spec_map: dict[str, dict], now: str) -> dict:
tasks = [task for task in queue.get("tasks", []) if isinstance(task, dict)]
if any(task.get("status") in {"in_progress", "pending", "queued"} for task in tasks):
return queue
existing_ids = {str(task.get("id", "")).strip() for task in tasks}
appended = []
for surface_id in FOLLOWUP_PRIORITY:
spec = spec_map.get(surface_id, {})
if not spec:
continue
if str(spec.get("current_state", "")).strip() == "blocked":
continue
next_step = str(spec.get("next_step", "")).strip()
if not next_step:
continue
task_id = f"{surface_id}-followup"
if task_id in existing_ids:
continue
section = surface_id
appended.append(
{
"id": task_id,
"title": f"{surface_id.replace('-', ' ').title()} follow-up",
"section": sensure_agent_state function · python · L169-L184 (16 LOC)04. Resources/automation-scripts/automation/update-agent-state.py
def ensure_agent_state(queue: dict, state: dict) -> dict:
    """Make sure ``state`` has a tracking record for every queued task.

    Adds a default record under ``state["tasks"]`` for each dict task in
    ``queue["tasks"]`` that has an ``id`` and no record yet, and makes sure
    ``state["history"]`` exists. Existing records are left untouched.
    ``state`` is mutated in place and returned.
    """
    task_state = state.setdefault("tasks", {})
    for task in queue.get("tasks", []):
        if not isinstance(task, dict):
            continue
        task_id = task.get("id")
        if task_id is None:
            # A task without an id cannot be tracked; skip it instead of
            # aborting the whole update with a KeyError.
            continue
        task_state.setdefault(
            task_id,
            {
                "success_streak": 0,
                "blocked_streak": 0,
                "last_status": "pending",
                "last_run_id": None,
            },
        )
    state.setdefault("history", [])
    return state

# Powered by Repobility — scan your code at https://repobility.com
derive_result_status function · python · L187-L212 (26 LOC)04. Resources/automation-scripts/automation/update-agent-state.py
def derive_result_status(result: dict, task: dict | None) -> str:
run_state = str(result.get("state", "unknown"))
exit_code = int(result.get("exit_code", 0) or 0)
host_qa_status = str(result.get("host_qa_status", "skipped"))
design_result_status = str(result.get("design_result_status", "skipped"))
section = str(task.get("section", "")) if task else ""
if run_state == "success" and exit_code == 0:
if section in {"automation", "server"}:
return "pass"
if section in UI_SECTIONS:
if design_result_status == "pass":
return "pass"
if design_result_status == "blocked":
return "blocked"
if host_qa_status in {"pass", "completed", "success"}:
return "partial"
return "blocked"
if host_qa_status in {"pass", "completed", "success", "skipped"}:
return "pass"
return "partial"
if host_qa_status in {"blocked", "failure"} or render_next_action function · python · L215-L260 (46 LOC)04. Resources/automation-scripts/automation/update-agent-state.py
def render_next_action(queue: dict, state: dict) -> str:
active_task = find_task(queue, queue.get("active_task_id"))
pending = [
task
for task in queue.get("tasks", [])
if isinstance(task, dict) and task.get("status") in {"pending", "queued", "in_progress"}
]
next_pending = next(
(
task
for task in pending
if not active_task or task.get("id") != active_task.get("id")
),
None,
)
lines = ["# Next Action", ""]
if active_task:
task_state = state.get("tasks", {}).get(active_task.get("id"), {})
lines.extend(
[
f"- active_task_id: {active_task.get('id', 'none')}",
f"- title: {active_task.get('title', 'none')}",
f"- section: {active_task.get('section', 'none')}",
f"- queue_status: {active_task.get('status', 'unknown')}",
f"- success_streak: {task_state.get('success_streak', 0)main function · python · L263-L379 (117 LOC)04. Resources/automation-scripts/automation/update-agent-state.py
def main() -> int:
if len(sys.argv) != 5:
raise SystemExit(
"usage: update-agent-state.py <task-queue.json> <agent-state.json> <next-action.md> <result.json>"
)
queue_path = Path(sys.argv[1])
state_path = Path(sys.argv[2])
next_action_path = Path(sys.argv[3])
result_path = Path(sys.argv[4])
queue = load_json(queue_path, DEFAULT_QUEUE)
state = ensure_agent_state(queue, load_json(state_path, {}))
result = load_json(result_path, {})
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
spec_map = load_surface_specs(SURFACE_SPEC_MANIFEST)
queue = normalize_queue_tasks(queue, spec_map)
has_result = result_path.exists() and bool(result)
task = find_task(queue, result.get("task_id")) if has_result else None
if has_result and not task and queue.get("active_task_id"):
task = find_task(queue, queue.get("active_task_id"))
if has_result and task:
task_state = state["tasks"].setdefault(task["idload_json function · python · L15-L22 (8 LOC)04. Resources/automation-scripts/automation/write-design-result.py
def load_json(path: Path) -> dict:
    """Return the JSON object stored at ``path``, or ``{}`` when unavailable.

    ``{}`` is returned for a missing file, bytes that are not valid UTF-8,
    invalid JSON, or a JSON value that is not an object.
    """
    if not path.exists():
        return {}
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # Deleted between the exists() check and the read.
        return {}
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
        return {}
    return payload if isinstance(payload, dict) else {}

# load_surface_spec function · python · L25-L31 (7 LOC)04. Resources/automation-scripts/automation/write-design-result.py
def load_surface_spec(section: str) -> dict[str, object]:
    """Return the manifest entry whose stripped ``id`` equals ``section``.

    The manifest is read from ``SURFACE_SPEC_MANIFEST``. Returns ``{}`` when
    the manifest has no ``surfaces`` list or no entry matches.
    """
    manifest = load_json(SURFACE_SPEC_MANIFEST)
    listed = manifest.get("surfaces")
    if not isinstance(listed, list):
        listed = []
    matches = (
        entry
        for entry in listed
        if isinstance(entry, dict) and str(entry.get("id", "")).strip() == section
    )
    return next(matches, {})

# parse_summary_status function · python · L34-L41 (8 LOC)04. Resources/automation-scripts/automation/write-design-result.py
def parse_summary_status(path: Path) -> str:
    """Extract the value of the first ``- status:`` line in a summary file.

    Returns ``"missing"`` when the file does not exist or the status value is
    empty, and ``"unknown"`` when no status line is present.
    """
    if not path.exists():
        return "missing"
    marker = "- status:"
    for raw in path.read_text(encoding="utf-8").splitlines():
        text = raw.strip()
        if text.startswith(marker):
            # Everything after the marker's colon is the status value.
            found = text[len(marker):].strip()
            return found or "missing"
    return "unknown"

# collect_manual_reference_info function · python · L48-L64 (17 LOC)04. Resources/automation-scripts/automation/write-design-result.py
def collect_manual_reference_info(manual_ref_root: Path, section: str) -> dict[str, object]:
    """Describe the manual reference material available for ``section``.

    Reports the catalog path, the section directory (``__missing__`` when the
    normalized section name is empty), whether each exists, how many reference
    files (png/jpg/jpeg/webp/pdf, searched recursively) the directory holds,
    and up to eight of their paths in sorted order.
    """
    reference_suffixes = {".png", ".jpg", ".jpeg", ".webp", ".pdf"}
    catalog = manual_ref_root / "catalog.md"
    normalized = normalize_section_name(section)
    folder = manual_ref_root / (normalized if normalized else "__missing__")

    found: list[Path] = []
    if folder.exists():
        for candidate in folder.rglob("*"):
            if candidate.is_file() and candidate.suffix.lower() in reference_suffixes:
                found.append(candidate)
        found.sort()

    samples = [str(item) for item in found[:8]]
    return {
        "catalog_path": str(catalog),
        "catalog_exists": catalog.exists(),
        "section_dir": str(folder),
        "section_exists": folder.exists(),
        "image_count": len(found),
        "sample_images": samples,
    }

# collect_host_qa_info function · python · L67-L98 (32 LOC)04. Resources/automation-scripts/automation/write-design-result.py
def collect_host_qa_info(host_qa_result_path: Path, host_qa_summary_path: Path, run_dir: Path) -> dict[str, object]:
payload = load_json(host_qa_result_path)
host_status = str(payload.get("overall_status", "missing")) if payload else "missing"
artifacts = payload.get("artifacts", {}) if isinstance(payload.get("artifacts"), dict) else {}
existing_artifacts: dict[str, str] = {}
for key, raw_value in artifacts.items():
value = str(raw_value).strip()
if not value:
continue
candidate = Path(value)
if not candidate.is_absolute():
candidate = (run_dir / candidate).resolve()
if candidate.exists():
existing_artifacts[str(key)] = str(candidate)
release_clean_artifacts = {
key: value
for key, value in existing_artifacts.items()
if "/artifacts/manual-review/" in value
}
return {
"result_path": str(host_qa_result_path),
"result_exists": host_qa_result_pIf a scraper extracted this row, it came from Repobility (https://repobility.com)
collect_figma_bundle_info function · python · L101-L106 (6 LOC)04. Resources/automation-scripts/automation/write-design-result.py
def collect_figma_bundle_info(summary_path: Path) -> dict[str, object]:
    """Report the figma bundle summary's path, existence and parsed status."""
    info: dict[str, object] = {}
    info["summary_path"] = str(summary_path)
    info["summary_exists"] = summary_path.exists()
    info["status"] = parse_summary_status(summary_path)
    return info

# requires_release_clean_capture function · python · L109-L112 (4 LOC)04. Resources/automation-scripts/automation/write-design-result.py
def requires_release_clean_capture(surface_spec: dict[str, object]) -> bool:
    """Tell whether the surface spec calls for a release clean capture.

    True when ``next_step`` mentions "release", or any entry of ``blockers``
    mentions "release clean capture" (both case-insensitive).

    ``blockers`` may be a list/tuple of entries, a single string (treated as
    one entry), or missing/``None``/any other type (treated as no blockers).
    """
    next_step = str(surface_spec.get("next_step", "")).lower()
    if "release" in next_step:
        return True

    raw_blockers = surface_spec.get("blockers")
    if isinstance(raw_blockers, str):
        # A bare string is one blocker, not a sequence of characters.
        raw_blockers = [raw_blockers]
    elif not isinstance(raw_blockers, (list, tuple)):
        # Covers a missing key and an explicit JSON null.
        raw_blockers = []

    return any("release clean capture" in str(item).lower() for item in raw_blockers if item)

# derive_overall_status function · python · L115-L152 (38 LOC)04. Resources/automation-scripts/automation/write-design-result.py
def derive_overall_status(
section: str,
surface_spec: dict[str, object],
manual_info: dict[str, object],
host_qa_info: dict[str, object],
figma_info: dict[str, object],
) -> tuple[str, list[str]]:
notes: list[str] = []
if section not in UI_SECTIONS:
notes.append("non-ui section; design gating skipped")
return "skipped", notes
references_present = bool(manual_info.get("section_exists")) or bool(figma_info.get("summary_exists"))
if not references_present:
notes.append("missing manual section references and figma bundle summary")
return "blocked", notes
host_status = str(host_qa_info.get("overall_status", "missing"))
if host_status == "blocked":
notes.append("host QA reported blocked")
return "blocked", notes
if host_status in PASS_HOST_QA:
if int(host_qa_info.get("artifact_count", 0) or 0) > 0 or bool(host_qa_info.get("summary_exists")):
if requires_release_clean_capturmain function · python · L155-L188 (34 LOC)04. Resources/automation-scripts/automation/write-design-result.py
def main() -> int:
if len(sys.argv) != 8:
raise SystemExit(
"usage: write-design-result.py <focus_section> <run_dir> <host_qa_result.json> <host_qa_summary.md> <figma_bundle_summary.md> <manual_ref_root> <output_path>"
)
focus_section = sys.argv[1].strip()
run_dir = Path(sys.argv[2]).resolve()
host_qa_result_path = Path(sys.argv[3]) if sys.argv[3] else Path()
host_qa_summary_path = Path(sys.argv[4]) if sys.argv[4] else Path()
figma_bundle_summary_path = Path(sys.argv[5]) if sys.argv[5] else Path()
manual_ref_root = Path(sys.argv[6]).resolve()
output_path = Path(sys.argv[7]).resolve()
surface_spec = load_surface_spec(focus_section)
manual_info = collect_manual_reference_info(manual_ref_root, focus_section)
host_qa_info = collect_host_qa_info(host_qa_result_path, host_qa_summary_path, run_dir)
figma_info = collect_figma_bundle_info(figma_bundle_summary_path)
overall_status, notes = derive_overall_status(focload_json function · python · L9-L16 (8 LOC)04. Resources/automation-scripts/automation/write-docs-note.py
def load_json(path: Path) -> dict:
    """Read a JSON object from ``path``; fall back to ``{}`` on any bad input.

    The fallback applies when the file is missing, is not valid UTF-8, is not
    valid JSON, or holds a JSON value other than an object.
    """
    if not path.exists():
        return {}
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (FileNotFoundError, ValueError):
        # FileNotFoundError: file vanished after the exists() check.
        # ValueError: json.JSONDecodeError or UnicodeDecodeError.
        return {}
    return payload if isinstance(payload, dict) else {}

# main function · python · L19-L92 (74 LOC)04. Resources/automation-scripts/automation/write-docs-note.py
def main() -> int:
if len(sys.argv) != 6:
print(
"usage: write-zzirit-docs-note.py <run_result.json> <run_summary.md> <run_dir> <out_md> <out_json>",
file=sys.stderr,
)
return 2
run_result_path = Path(sys.argv[1])
run_summary_path = Path(sys.argv[2])
run_dir = Path(sys.argv[3])
out_md = Path(sys.argv[4])
out_json = Path(sys.argv[5])
run_result = load_json(run_result_path)
design_result = load_json(run_dir / "design-result.json")
host_qa_result = load_json(run_dir / "focus-host-qa-result.json")
payload = {
"run_id": run_result.get("run_id") or run_dir.name,
"state": run_result.get("state", "unknown"),
"task_id": run_result.get("task_id"),
"task_title": run_result.get("task_title"),
"focus_section": run_result.get("focus_section"),
"focus_label": run_result.get("focus_label"),
"host_qa_status": run_result.get("host_qa_status", "skipped"),
getElementId function · javascript · L22-L24 (3 LOC)04. Resources/automation-scripts/e2e/ios-appium-onboarding.mjs
/**
 * Extract the element id from a WebDriver element reference.
 *
 * Prefers the `ELEMENT` key and falls back to the
 * `element-6066-11e4-a52e-4f735466cecf` key when `ELEMENT` is falsy.
 */
function getElementId(element) {
  const legacyId = element.ELEMENT;
  if (legacyId) {
    return legacyId;
  }
  return element['element-6066-11e4-a52e-4f735466cecf'];
}

// request function · javascript · L26-L40 (15 LOC)04. Resources/automation-scripts/e2e/ios-appium-onboarding.mjs
/**
 * Send a JSON request to the Appium server and return the decoded response.
 *
 * @param {string} method   HTTP method.
 * @param {string} pathname Path appended to the module-level `appiumServerUrl`.
 * @param {*} [body]        Payload sent as JSON; omitted when falsy.
 * @returns {Promise<*>}    The parsed JSON response body.
 * @throws {Error} When the response status is not ok, or when a successful
 *   response does not contain valid JSON.
 */
async function request(method, pathname, body) {
  const response = await fetch(`${appiumServerUrl}${pathname}`, {
    method,
    headers: {
      'Content-Type': 'application/json',
    },
    body: body ? JSON.stringify(body) : undefined,
  });

  // Read the body as text first: an error response may not be JSON, and
  // parsing it blindly would replace the HTTP failure with a SyntaxError.
  const rawText = await response.text();
  let payload;
  let parsed = true;
  try {
    payload = JSON.parse(rawText);
  } catch {
    parsed = false;
  }

  if (!response.ok) {
    const detail = parsed ? JSON.stringify(payload) : `HTTP ${response.status} ${rawText}`;
    throw new Error(`${method} ${pathname} failed: ${detail}`);
  }
  if (!parsed) {
    throw new Error(`${method} ${pathname} returned a non-JSON response: ${rawText}`);
  }
  return payload;
}

// Repobility analyzer · published findings · https://repobility.com
sleep function · javascript · L42-L44 (3 LOC)04. Resources/automation-scripts/e2e/ios-appium-onboarding.mjs
/** Resolve after roughly `ms` milliseconds. */
async function sleep(ms) {
  await new Promise((wake) => {
    setTimeout(wake, ms);
  });
}

// openSimulatorUrl function · javascript · L46-L53 (8 LOC)04. Resources/automation-scripts/e2e/ios-appium-onboarding.mjs
/**
 * Open `url` on the simulator identified by the module-level `udid` via
 * `xcrun simctl openurl`, then wait four seconds.
 *
 * @throws {Error} When no simulator UDID is configured.
 */
async function openSimulatorUrl(url) {
  if (udid) {
    const simctlArgs = ['simctl', 'openurl', udid, url];
    await execFileAsync('xcrun', simctlArgs);
    await sleep(4000);
    return;
  }
  throw new Error('Simulator UDID is required to open deeplinks during Appium onboarding');
}

// isSignupSource function · javascript · L55-L63 (9 LOC)04. Resources/automation-scripts/e2e/ios-appium-onboarding.mjs
/**
 * Tell whether a page-source string matches the signup screen markers.
 *
 * The source must contain '다음', plus either '이메일 주소를' or the combination
 * of '이미 계정이 있으신가요? 로그인', an XCUIElementTypeTextField and no
 * XCUIElementTypeSecureTextField.
 */
function isSignupSource(source) {
  const has = (needle) => source.includes(needle);

  if (!has('다음')) {
    return false;
  }
  if (has('이메일 주소를')) {
    return true;
  }
  return (
    has('이미 계정이 있으신가요? 로그인') &&
    has('XCUIElementTypeTextField') &&
    !has('XCUIElementTypeSecureTextField')
  );
}

// page 1 / 14next ›