← back to koboriakira__screen-times

Function bodies 51 total

All specs Real LLM only Function bodies
Colors class · python · L21-L26 (6 LOC)
src/screen_times/cli.py
class Colors:
    RED = "\033[0;31m"
    GREEN = "\033[0;32m"
    YELLOW = "\033[1;33m"
    BLUE = "\033[0;34m"
    NC = "\033[0m"  # No Color
log_info function · python · L29-L31 (3 LOC)
src/screen_times/cli.py
def log_info(message: str):
    """情報メッセージを出力"""
    print(f"{Colors.GREEN}[INFO]{Colors.NC} {message}")
log_warn function · python · L34-L36 (3 LOC)
src/screen_times/cli.py
def log_warn(message: str):
    """警告メッセージを出力"""
    print(f"{Colors.YELLOW}[WARN]{Colors.NC} {message}")
log_error function · python · L39-L41 (3 LOC)
src/screen_times/cli.py
def log_error(message: str):
    """エラーメッセージを出力"""
    print(f"{Colors.RED}[ERROR]{Colors.NC} {message}", file=sys.stderr)
get_project_root function · python · L44-L65 (22 LOC)
src/screen_times/cli.py
def get_project_root() -> Path:
    """プロジェクトルートディレクトリを取得

    開発環境: src/screen_times/cli.py -> プロジェクトルート
    インストール済み: site-packages/screen_times/cli.py -> ホームディレクトリにフォールバック
    """
    # パッケージのディレクトリ
    package_dir = Path(__file__).parent.absolute()

    # 開発環境かチェック(src/screen_times/cli.pyの場合)
    if package_dir.parent.name == "src":
        # 開発環境: src/screen_times -> src -> プロジェクトルート
        return package_dir.parent.parent

    # インストール済み環境: ホームディレクトリ配下のプロジェクトを探す
    # フォールバック: カレントディレクトリまたはホームディレクトリ
    cwd = Path.cwd()
    if (cwd / ".venv").exists() and (cwd / "pyproject.toml").exists():
        return cwd

    # 最終フォールバック
    return Path.home() / "git" / "screen-times"
get_plist_path function · python · L68-L70 (3 LOC)
src/screen_times/cli.py
def get_plist_path() -> Path:
    """plistファイルのパスを取得"""
    return Path.home() / "Library" / "LaunchAgents" / "com.screenocr.logger.plist"
get_launchd_label function · python · L73-L75 (3 LOC)
src/screen_times/cli.py
def get_launchd_label() -> str:
    """launchdラベルを取得"""
    return "com.screenocr.logger"
If a scraper extracted this row, it came from Repobility (https://repobility.com)
check_launchd_status function · python · L78-L84 (7 LOC)
src/screen_times/cli.py
def check_launchd_status() -> bool:
    """launchdエージェントが実行中かチェック"""
    try:
        result = subprocess.run(["launchctl", "list"], capture_output=True, text=True, check=True)
        return get_launchd_label() in result.stdout
    except subprocess.CalledProcessError:
        return False
start_agent function · python · L87-L160 (74 LOC)
src/screen_times/cli.py
def start_agent():
    """launchdエージェントを開始"""
    log_info("ScreenOCR Logger を起動します...")

    project_root = get_project_root()
    plist_template = project_root / "config" / "com.screenocr.logger.plist"
    plist_dest = get_plist_path()
    main_script = project_root / "src" / "screen_times" / "screenshot_ocr.py"
    python_path = project_root / ".venv" / "bin" / "python"

    # 前提条件チェック
    if not plist_template.exists():
        log_error(f"plistテンプレートが見つかりません: {plist_template}")
        sys.exit(1)

    if not main_script.exists():
        log_error(f"メインスクリプトが見つかりません: {main_script}")
        sys.exit(1)

    if not python_path.exists():
        log_error(f"Pythonの仮想環境が見つかりません: {python_path}")
        log_info("まず 'pipenv install' を実行してください")
        sys.exit(1)

    # LaunchAgentsディレクトリを作成
    plist_dest.parent.mkdir(parents=True, exist_ok=True)

    # 既存のエージェントをアンロード
    if check_launchd_status():
        log_warn("既存のエージェントを停止します...")
        try:
            subprocess.run(
   
stop_agent function · python · L163-L185 (23 LOC)
src/screen_times/cli.py
def stop_agent():
    """launchdエージェントを停止"""
    log_info("ScreenOCR Logger を停止します...")

    plist_dest = get_plist_path()

    if not plist_dest.exists():
        log_warn("plistファイルが見つかりません。エージェントは登録されていません。")
        return

    if not check_launchd_status():
        log_warn("エージェントは実行されていません。")
        return

    # エージェントをアンロード
    try:
        subprocess.run(
            ["launchctl", "unload", str(plist_dest)], check=True, capture_output=True, text=True
        )
        log_info("✓ ScreenOCR Logger を停止しました")
    except subprocess.CalledProcessError as e:
        log_error(f"エージェントの停止に失敗しました: {e.stderr}")
        sys.exit(1)
split_task function · python · L188-L251 (64 LOC)
src/screen_times/cli.py
def split_task(description: Optional[str] = None, clear: bool = False):
    """タスク別にJSONLファイルを分割"""
    try:
        # エージェントが停止している場合は自動起動
        if not check_launchd_status():
            log_warn("エージェントが停止しています。自動的に起動します...")
            try:
                start_agent()
                print()  # 空行を追加して読みやすく
            except SystemExit:
                # start_agent()がsys.exit()を呼ぶ場合があるので捕捉
                log_error("エージェントの起動に失敗しました。タスク分割を中断します。")
                sys.exit(1)

        # JSONLマネージャーの初期化
        jsonl_manager = JsonlManager()

        # --clear オプションまたは説明なしの場合は日付ベースに戻す
        if clear or not description:
            jsonl_manager._clear_current_task_file()
            timestamp = datetime.now()
            effective_date = jsonl_manager.get_effective_date(timestamp)
            current_path = jsonl_manager.get_current_jsonl_path(timestamp)
            log_info(f"日付ベースのファイルに戻しました: {current_path}")
            print(f"  実効日付: {effective_date.strftime('%Y-%m-%d')
dry_run function · python · L254-L322 (69 LOC)
src/screen_times/cli.py
def dry_run(merge_threshold: Optional[float] = None):
    """dry-run実行(5秒待機後にOCR処理を実行し、結果を標準出力)

    Args:
        merge_threshold: マージのしきい値(0.0~1.0)
    """
    import json
    from .screen_ocr_logger import ScreenOCRLogger, ScreenOCRConfig

    log_info("Dry-runモードで実行します")
    if merge_threshold is not None:
        log_info(f"マージしきい値: {merge_threshold}")
    print()
    print("5秒後にスクリーンショットとOCR処理を実行します...")
    print("任意のウィンドウをアクティブにしてお待ちください。")
    print()

    # カウントダウン
    for i in range(5, 0, -1):
        print(f"  {i}...", flush=True)
        import time

        time.sleep(1)

    print("\n実行中...\n")

    # dry-runモードで実行
    config = ScreenOCRConfig(dry_run=True, verbose=True, merge_threshold=merge_threshold)
    logger = ScreenOCRLogger(config)
    result = logger.run()

    # 結果を表示
    print("\n" + "=" * 60)
    print("DRY-RUN 実行結果")
    print("=" * 60)

    if result.success:
        # JSON形式で表示するデータを構築
        output_data = {
            "timestamp": result.timestamp.isofo
show_status function · python · L325-L369 (45 LOC)
src/screen_times/cli.py
def show_status():
    """現在の状態を表示"""
    log_info("=== ScreenOCR Logger ステータス ===")
    print()

    # launchdの状態
    is_running = check_launchd_status()
    status_color = Colors.GREEN if is_running else Colors.YELLOW
    status_text = "実行中" if is_running else "停止中"
    print(f"  launchdエージェント: {status_color}{status_text}{Colors.NC}")

    # plistファイルの存在
    plist_dest = get_plist_path()
    plist_exists = plist_dest.exists()
    print(f"  plistファイル: {'存在' if plist_exists else '未作成'}")
    if plist_exists:
        print(f"    -> {plist_dest}")

    # ログディレクトリ
    jsonl_manager = JsonlManager()
    log_dir = jsonl_manager.logs_dir
    if log_dir.exists():
        log_files = list(log_dir.glob("*.jsonl"))
        print(f"  ログファイル: {len(log_files)} 個")
        print(f"    -> {log_dir}")

        # 今日のログファイル
        timestamp = datetime.now()
        current_path = jsonl_manager.get_current_jsonl_path(timestamp)
        if current_path.exists():
            size_kb = current_path.stat().
main function · python · L372-L439 (68 LOC)
src/screen_times/cli.py
def main():
    """メイン処理"""
    parser = argparse.ArgumentParser(
        description="ScreenOCR Logger の統合管理ツール",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
使用例:
  screenocr start                 # エージェントを開始
  screenocr stop                  # エージェントを停止
  screenocr split "新機能の実装"   # タスクを分割
  screenocr split --clear         # 日付ベースに戻す
  screenocr status                # 現在の状態を表示
  screenocr dry-run               # テスト実行(JSONLに保存せず結果表示)
        """,
    )

    subparsers = parser.add_subparsers(dest="command", help="実行するコマンド")

    # start コマンド
    subparsers.add_parser("start", help="launchdエージェントを開始")

    # stop コマンド
    subparsers.add_parser("stop", help="launchdエージェントを停止")

    # split コマンド
    split_parser = subparsers.add_parser("split", help="タスク別にJSONLファイルを分割")
    split_parser.add_argument(
        "description", nargs="?", help="タスクの説明(例: '〇〇機能の実装作業')"
    )
    split_parser.add_argument(
        "--clear",
        action="store_true",
 
get_default_logs_dir function · python · L23-L39 (17 LOC)
src/screen_times/jsonl_manager.py
def get_default_logs_dir() -> Path:
    """デフォルトのログディレクトリパスを取得

    環境変数 OBSIDIAN_VAULT_PATH が設定されていればそれを使用し、
    未設定時は macOS の Obsidian Vault デフォルトパスを使用する。
    ユーザー名は getpass.getuser() で自動取得する。

    Returns:
        logs_dir の Path
    """
    vault_path_str = os.environ.get("OBSIDIAN_VAULT_PATH")
    if vault_path_str:
        vault_path = Path(vault_path_str)
    else:
        vault_path = DEFAULT_VAULT_PATH
    username = getpass.getuser()
    return vault_path / "screenocr_logs" / username
Same scanner, your repo: https://repobility.com — Repobility
JsonlManager class · python · L42-L347 (306 LOC)
src/screen_times/jsonl_manager.py
class JsonlManager:
    """JSONLファイルの管理を行うクラス"""

    # ファイルサイズの上限(100KB = 約50Kトークン)
    MAX_FILE_SIZE_BYTES = 100 * 1024  # 100KB

    def __init__(self, base_dir: Optional[Path] = None, merge_threshold: Optional[float] = None):
        """
        初期化

        Args:
            base_dir: JSONLファイルを保存するベースディレクトリ
                     Noneの場合は get_default_logs_dir() を使用
            merge_threshold: 類似レコードをマージするしきい値(0.0~1.0)
                           Noneの場合はマージを行わない
        """
        if base_dir is None:
            self.logs_dir = get_default_logs_dir()
        else:
            self.logs_dir = base_dir / "screenocr_logs"
        self.logs_dir.mkdir(parents=True, exist_ok=True)
        self.state_file = self.logs_dir / ".current_jsonl"
        self.merge_threshold = merge_threshold
        self.merger: Optional[RecordMerger] = None
        if merge_threshold is not None:
            self.merger = RecordMerger(threshold=merge_threshold)

    def get_effective_date(self, timestamp: da
__init__ method · python · L48-L67 (20 LOC)
src/screen_times/jsonl_manager.py
    def __init__(self, base_dir: Optional[Path] = None, merge_threshold: Optional[float] = None):
        """
        初期化

        Args:
            base_dir: JSONLファイルを保存するベースディレクトリ
                     Noneの場合は get_default_logs_dir() を使用
            merge_threshold: 類似レコードをマージするしきい値(0.0~1.0)
                           Noneの場合はマージを行わない
        """
        if base_dir is None:
            self.logs_dir = get_default_logs_dir()
        else:
            self.logs_dir = base_dir / "screenocr_logs"
        self.logs_dir.mkdir(parents=True, exist_ok=True)
        self.state_file = self.logs_dir / ".current_jsonl"
        self.merge_threshold = merge_threshold
        self.merger: Optional[RecordMerger] = None
        if merge_threshold is not None:
            self.merger = RecordMerger(threshold=merge_threshold)
get_effective_date method · python · L69-L90 (22 LOC)
src/screen_times/jsonl_manager.py
    def get_effective_date(self, timestamp: datetime) -> datetime:
        """
        朝5時を基準とした実効日付を取得

        5時より前の時刻は前日として扱う。
        例: 2025-12-28 04:59 → 2025-12-27
            2025-12-28 05:00 → 2025-12-28

        Args:
            timestamp: 判定対象のタイムスタンプ

        Returns:
            実効日付(datetime)
        """
        if timestamp.hour < 5:
            # 5時より前なら前日とする
            return (timestamp - timedelta(days=1)).replace(
                hour=0, minute=0, second=0, microsecond=0
            )
        else:
            # 5時以降は当日
            return timestamp.replace(hour=0, minute=0, second=0, microsecond=0)
get_jsonl_path method · python · L92-L127 (36 LOC)
src/screen_times/jsonl_manager.py
    def get_jsonl_path(
        self,
        timestamp: Optional[datetime] = None,
        task_id: Optional[str] = None,
        include_time: bool = False,
    ) -> Path:
        """
        JSONLファイルのパスを取得

        Args:
            timestamp: タイムスタンプ(Noneの場合は現在時刻)
            task_id: タスクID(手動分割時に指定)
            include_time: Trueの場合、ファイル名に時刻を含める(サイズベース分割用)

        Returns:
            JSONLファイルのPath
        """
        if timestamp is None:
            timestamp = datetime.now()

        effective_date = self.get_effective_date(timestamp)
        date_str = effective_date.strftime("%Y-%m-%d")

        if task_id:
            # 手動分割: 日付 + タスクID + タイムスタンプ
            time_str = timestamp.strftime("%H%M%S")
            filename = f"{date_str}_{task_id}_{time_str}.jsonl"
        elif include_time:
            # サイズベース分割: 日付 + 時刻
            time_str = timestamp.strftime("%H%M%S")
            filename = f"{date_str}_{time_str}.jsonl"
        else:
            # 自動分割: 日付のみ(下位互換性のため残す)
write_metadata method · python · L129-L161 (33 LOC)
src/screen_times/jsonl_manager.py
    def write_metadata(
        self, filepath: Path, description: str, timestamp: Optional[datetime] = None
    ) -> None:
        """
        メタデータをJSONLファイルの1行目に書き込む

        Args:
            filepath: JSONLファイルのパス
            description: タスクの説明
            timestamp: タイムスタンプ(Noneの場合は現在時刻)
        """
        if timestamp is None:
            timestamp = datetime.now()

        metadata = {
            "type": "task_metadata",
            "timestamp": timestamp.isoformat(),
            "description": description,
            "effective_date": self.get_effective_date(timestamp).strftime("%Y-%m-%d"),
        }

        # ファイルが存在する場合は既存の内容を読み込む
        existing_lines = []
        if filepath.exists():
            with open(filepath, "r", encoding="utf-8") as f:
                existing_lines = f.readlines()

        # メタデータを先頭に書き込み、その後に既存の内容を追加
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(metadata, f, ensure_ascii=False)
            f.write("\n")
   
append_record method · python · L163-L236 (74 LOC)
src/screen_times/jsonl_manager.py
    def append_record(
        self, filepath: Path, timestamp: datetime, window: str, text: str, status: str = "normal"
    ) -> Path:
        """
        レコードをJSONLファイルに追記

        マージが有効な場合は、類似レコードを自動的にマージする。
        レコード追記後にファイルサイズが上限(100KB)を超えた場合は、次回から新しいファイルを使用する。

        Args:
            filepath: JSONLファイルのパス
            timestamp: タイムスタンプ
            window: ウィンドウ名
            text: OCRテキスト
            status: 状態("normal", "sleep", "error"など)

        Returns:
            実際に書き込んだファイルのPath(常に現在のファイルパスを返す)
        """
        # レコードサイズの警告(10KB以上の場合)
        record_size = len(text.encode("utf-8"))
        if record_size > 10 * 1024:  # 10KB
            import sys

            print(
                f"Warning: Large record detected ({record_size / 1024:.1f} KB) "
                f"in window '{window}'. This may cause frequent file splits.",
                file=sys.stderr,
            )

        record = {
            "timestamp": timestamp.isoformat(),
            "window": wi
_write_record method · python · L238-L248 (11 LOC)
src/screen_times/jsonl_manager.py
    def _write_record(self, filepath: Path, record: dict) -> None:
        """
        レコードをJSONLファイルに書き込む

        Args:
            filepath: JSONLファイルのパス
            record: 書き込むレコード
        """
        with open(filepath, "a", encoding="utf-8") as f:
            json.dump(record, f, ensure_ascii=False)
            f.write("\n")
flush_merger method · python · L250-L262 (13 LOC)
src/screen_times/jsonl_manager.py
    def flush_merger(self, filepath: Path) -> None:
        """
        マージャーのバッファをフラッシュして書き込む

        プログラム終了時などに呼び出す。

        Args:
            filepath: JSONLファイルのパス
        """
        if self.merger:
            buffered_record = self.merger.flush()
            if buffered_record:
                self._write_record(filepath, buffered_record)
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
get_current_jsonl_path method · python · L264-L310 (47 LOC)
src/screen_times/jsonl_manager.py
    def get_current_jsonl_path(self, timestamp: Optional[datetime] = None) -> Path:
        """
        現在使用すべきJSONLファイルのパスを取得

        手動分割で設定されたタスクファイルがある場合はそれを使用し、
        日付が変わっていたら自動的に日付ベースのファイルに切り替える。
        同じ日付のファイルが複数ある場合は、最新のファイルを返す。

        Args:
            timestamp: タイムスタンプ(Noneの場合は現在時刻)

        Returns:
            JSONLファイルのPath
        """
        if timestamp is None:
            timestamp = datetime.now()

        current_effective_date = self.get_effective_date(timestamp)

        # 状態ファイルから現在のタスクファイル情報を取得
        task_file_info = self._get_current_task_file()

        if task_file_info:
            task_file_path = Path(task_file_info["path"])
            task_effective_date_str = task_file_info.get("effective_date")

            # タスクファイルが存在し、かつ日付が変わっていない場合はそのファイルを使用
            if (
                task_file_path.exists()
                and task_effective_date_str == current_effective_date.strftime("%Y-%m-%d")
            ):
                return task_file_p
_get_current_task_file method · python · L312-L327 (16 LOC)
src/screen_times/jsonl_manager.py
    def _get_current_task_file(self) -> Optional[dict]:
        """
        状態ファイルから現在のタスクファイル情報を取得

        Returns:
            タスクファイル情報の辞書、または None
        """
        if not self.state_file.exists():
            return None

        try:
            with open(self.state_file, "r", encoding="utf-8") as f:
                data = json.load(f)
                return data if isinstance(data, dict) else None
        except (json.JSONDecodeError, IOError):
            return None
_set_current_task_file method · python · L329-L340 (12 LOC)
src/screen_times/jsonl_manager.py
    def _set_current_task_file(self, filepath: Path, effective_date: str) -> None:
        """
        状態ファイルに現在のタスクファイル情報を保存

        Args:
            filepath: タスクファイルのパス
            effective_date: 実効日付(YYYY-MM-DD形式)
        """
        task_info = {"path": str(filepath), "effective_date": effective_date}

        with open(self.state_file, "w", encoding="utf-8") as f:
            json.dump(task_info, f, ensure_ascii=False)
_clear_current_task_file method · python · L342-L347 (6 LOC)
src/screen_times/jsonl_manager.py
    def _clear_current_task_file(self) -> None:
        """
        状態ファイルをクリア(日付ベースのファイルに戻る)
        """
        if self.state_file.exists():
            self.state_file.unlink()
TimeoutError class · python · L13-L16 (4 LOC)
src/screen_times/ocr.py
class TimeoutError(Exception):
    """タイムアウトエラー"""

    pass
timeout_handler function · python · L19-L21 (3 LOC)
src/screen_times/ocr.py
def timeout_handler(signum, frame):
    """タイムアウトハンドラ"""
    raise TimeoutError("OCR processing timeout")
perform_ocr function · python · L24-L108 (85 LOC)
src/screen_times/ocr.py
def perform_ocr(image_path: Path, timeout_seconds: int = 5) -> str:
    """
    Vision FrameworkでOCR処理を実行

    Args:
        image_path: 画像ファイルのパス
        timeout_seconds: タイムアウト時間(秒)

    Returns:
        認識されたテキスト
    """
    # pyobjc imports (遅延インポート)
    try:
        from Cocoa import NSURL
        from Quartz import CGImageSourceCreateWithURL, CGImageSourceCreateImageAtIndex
        from Vision import (
            VNImageRequestHandler,
            VNRecognizeTextRequest,
            VNRequestTextRecognitionLevelAccurate,
        )
    except ImportError as import_error:
        print(f"Error: pyobjc frameworks not found: {import_error}", file=sys.stderr)
        print("Install with: pip install -r requirements.txt", file=sys.stderr)
        return ""

    # タイムアウト設定
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(timeout_seconds)

    try:
        # 画像URLを作成
        url = NSURL.fileURLWithPath_(str(image_path))

        # CGImageを読み込み
        image_source = C
should_merge function · python · L13-L46 (34 LOC)
src/screen_times/record_merger.py
def should_merge(prev: Dict[str, Any], curr: Dict[str, Any], threshold: float = 0.90) -> bool:
    """
    マージすべきかどうかの判定

    以下の両方を満たす場合にマージ対象とする:
    1. window が同一であること
    2. テキスト類似度が指定のしきい値以上であること

    Args:
        prev: 前のレコード
        curr: 現在のレコード
        threshold: 類似度のしきい値(0.0~1.0)

    Returns:
        マージすべき場合はTrue
    """
    # windowが異なる場合はマージしない
    if prev.get("window") != curr.get("window"):
        return False

    prev_text = prev.get("text", "")
    curr_text = curr.get("text", "")

    # 両方とも空の場合もマージする
    if not prev_text and not curr_text:
        return True

    # どちらか一方だけが空の場合はマージしない
    if not prev_text or not curr_text:
        return False

    # テキスト類似度を計算(0-100の範囲なので100で割る)
    similarity: float = fuzz.ratio(prev_text, curr_text) / 100.0
    return similarity >= threshold
Repobility · MCP-ready · https://repobility.com
merge_records function · python · L49-L77 (29 LOC)
src/screen_times/record_merger.py
def merge_records(prev: Dict[str, Any], curr: Dict[str, Any]) -> Dict[str, Any]:
    """
    2つのレコードをマージ

    マージ時は以下のルールで統合する:
    - timestamp: 最初のレコードの値を保持
    - timestamp_end: 最後のレコードのtimestampを設定
    - window: 保持
    - text: 最初のレコードの値を保持
    - text_length: 最初のレコードの値を保持
    - merged_count: マージされたレコード数(既存の値に+1)

    Args:
        prev: マージ先のレコード
        curr: マージ元のレコード

    Returns:
        マージされたレコード
    """
    # prevをベースにコピー
    merged = prev.copy()

    # timestamp_endを更新
    merged["timestamp_end"] = curr["timestamp"]

    # merged_countを更新(prevに既にある場合は+1、ない場合は2)
    merged["merged_count"] = prev.get("merged_count", 1) + 1

    return merged
RecordMerger class · python · L80-L135 (56 LOC)
src/screen_times/record_merger.py
class RecordMerger:
    """
    レコードのマージを管理するクラス

    連続するレコードをバッファリングして、類似度に基づいてマージする。
    """

    def __init__(self, threshold: float = 0.90):
        """
        初期化

        Args:
            threshold: 類似度のしきい値(0.0~1.0、デフォルト0.90)
        """
        self.threshold = threshold
        self.buffer: Optional[Dict[str, Any]] = None

    def add_record(self, record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        レコードを追加

        前回のレコードとマージすべきかを判定し、マージしない場合は
        前回のレコードを返す。マージする場合はNoneを返す。

        Args:
            record: 追加するレコード

        Returns:
            出力すべきレコード(マージした場合はNone)
        """
        # 最初のレコードの場合はバッファに保存
        if self.buffer is None:
            self.buffer = record
            return None

        # マージすべきか判定
        if should_merge(self.buffer, record, self.threshold):
            # マージしてバッファを更新
            self.buffer = merge_records(self.buffer, record)
            return None
        else:
            # マージしない場合は、バッファを出力して新しいレコードを保存
 
__init__ method · python · L87-L95 (9 LOC)
src/screen_times/record_merger.py
    def __init__(self, threshold: float = 0.90):
        """
        初期化

        Args:
            threshold: 類似度のしきい値(0.0~1.0、デフォルト0.90)
        """
        self.threshold = threshold
        self.buffer: Optional[Dict[str, Any]] = None
add_record method · python · L97-L124 (28 LOC)
src/screen_times/record_merger.py
    def add_record(self, record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        レコードを追加

        前回のレコードとマージすべきかを判定し、マージしない場合は
        前回のレコードを返す。マージする場合はNoneを返す。

        Args:
            record: 追加するレコード

        Returns:
            出力すべきレコード(マージした場合はNone)
        """
        # 最初のレコードの場合はバッファに保存
        if self.buffer is None:
            self.buffer = record
            return None

        # マージすべきか判定
        if should_merge(self.buffer, record, self.threshold):
            # マージしてバッファを更新
            self.buffer = merge_records(self.buffer, record)
            return None
        else:
            # マージしない場合は、バッファを出力して新しいレコードを保存
            output = self.buffer
            self.buffer = record
            return output
flush method · python · L126-L135 (10 LOC)
src/screen_times/record_merger.py
    def flush(self) -> Optional[Dict[str, Any]]:
        """
        バッファに残っているレコードを取得

        Returns:
            バッファに残っているレコード(ない場合はNone)
        """
        output = self.buffer
        self.buffer = None
        return output
ScreenOCRConfig class · python · L21-L29 (9 LOC)
src/screen_times/screen_ocr_logger.py
class ScreenOCRConfig:
    """ScreenOCRの設定"""

    screenshot_dir: Path = Path("/tmp/screen-times")
    timeout_seconds: int = 30
    screenshot_retention_hours: int = 72
    verbose: bool = False
    dry_run: bool = False
    merge_threshold: Optional[float] = None
ScreenOCRResult class · python · L33-L55 (23 LOC)
src/screen_times/screen_ocr_logger.py
class ScreenOCRResult:
    """ScreenOCR実行結果"""

    success: bool
    timestamp: datetime
    window_name: str
    screenshot_path: Optional[Path]
    text: str
    text_length: int
    jsonl_path: Optional[Path]
    status: str = "normal"
    error: Optional[str] = None

    def __str__(self) -> str:
        """結果の文字列表現"""
        if self.success:
            return (
                f"Success: {self.window_name} | "
                f"{self.text_length} chars | "
                f"Saved to {self.jsonl_path}"
            )
        else:
            return f"Failed: {self.error}"
__str__ method · python · L46-L55 (10 LOC)
src/screen_times/screen_ocr_logger.py
    def __str__(self) -> str:
        """結果の文字列表現"""
        if self.success:
            return (
                f"Success: {self.window_name} | "
                f"{self.text_length} chars | "
                f"Saved to {self.jsonl_path}"
            )
        else:
            return f"Failed: {self.error}"
If a scraper extracted this row, it came from Repobility (https://repobility.com)
ScreenOCRLogger class · python · L58-L293 (236 LOC)
src/screen_times/screen_ocr_logger.py
class ScreenOCRLogger:
    """
    ScreenOCRシステムのファサード

    複雑な一連の処理(スクリーンショット取得、OCR、ログ記録)を
    単一のシンプルなインターフェースで提供する。

    使用例:
        >>> logger = ScreenOCRLogger()
        >>> result = logger.run()
        >>> print(result)
        Success: Chrome | 1234 chars | Saved to ~/.screenocr_logs/2025-12-28.jsonl

        >>> # カスタム設定で使用
        >>> config = ScreenOCRConfig(
        ...     screenshot_dir=Path("/custom/path"),
        ...     timeout_seconds=10,
        ...     verbose=True
        ... )
        >>> logger = ScreenOCRLogger(config)
        >>> result = logger.run()
    """

    def __init__(self, config: Optional[ScreenOCRConfig] = None):
        """
        初期化

        Args:
            config: 設定オブジェクト(Noneの場合はデフォルト設定)
        """
        self.config = config or ScreenOCRConfig()
        self.jsonl_manager = JsonlManager(merge_threshold=self.config.merge_threshold)
        # スリープ状態検出用の状態
        self._last_screenshot_size: Optional[int] = None
        self._consecutive_
__init__ method · python · L81-L92 (12 LOC)
src/screen_times/screen_ocr_logger.py
    def __init__(self, config: Optional[ScreenOCRConfig] = None):
        """
        初期化

        Args:
            config: 設定オブジェクト(Noneの場合はデフォルト設定)
        """
        self.config = config or ScreenOCRConfig()
        self.jsonl_manager = JsonlManager(merge_threshold=self.config.merge_threshold)
        # スリープ状態検出用の状態
        self._last_screenshot_size: Optional[int] = None
        self._consecutive_empty_count: int = 0
run method · python · L94-L172 (79 LOC)
src/screen_times/screen_ocr_logger.py
    def run(self) -> ScreenOCRResult:
        """
        メイン処理を実行

        スクリーンショット取得 → OCR → JSONL保存の一連の処理を実行する。

        Returns:
            実行結果(ScreenOCRResult)
        """
        timestamp = datetime.now()
        window_name = "Unknown"
        screenshot_path = None
        text = ""
        jsonl_path = None
        error = None

        try:
            # 1. アクティブウィンドウ取得
            window_name, window_bounds = get_active_window()
            if self.config.verbose:
                print(f"Active window: {window_name}")
                if window_bounds:
                    print(f"Window bounds: {window_bounds}")

            # 2. スクリーンショット取得
            screenshot_path = take_screenshot(self.config.screenshot_dir, window_bounds)
            if self.config.verbose:
                print(f"Screenshot saved: {screenshot_path}")

            # 3. OCR処理
            text = perform_ocr(screenshot_path, self.config.timeout_seconds)
            if self.config.verbose:
            
cleanup method · python · L174-L214 (41 LOC)
src/screen_times/screen_ocr_logger.py
    def cleanup(self) -> int:
        """
        古いスクリーンショットを削除

        設定で指定された保持期間を超えたスクリーンショットファイルを削除する。

        Returns:
            削除したファイル数
        """
        try:
            cutoff_time = time.time() - (self.config.screenshot_retention_hours * 3600)
            pattern = "screenshot_*.png"
            deleted_count = 0

            # ディレクトリが存在しない場合は0を返す
            if not self.config.screenshot_dir.exists():
                return 0

            for screenshot in self.config.screenshot_dir.glob(pattern):
                try:
                    # ファイルの最終更新時刻を確認
                    if screenshot.stat().st_mtime < cutoff_time:
                        screenshot.unlink()
                        deleted_count += 1
                except Exception as file_error:
                    # 個別のファイル削除エラーは無視して続行
                    if self.config.verbose:
                        print(
                            f"Warning: Failed to delete {screenshot}: {file_error}", file=sys.stderr
 
_detect_sleep_state method · python · L216-L263 (48 LOC)
src/screen_times/screen_ocr_logger.py
    def _detect_sleep_state(self, text: str, screenshot_path: Path) -> str:
        """
        スリープ/ロック状態を検出する

        以下の条件でスリープ状態を判定:
        - テキストが空(text_length = 0)
        - スクリーンショットのファイルサイズが前回と同じ
        - 連続して空のテキストが3回以上記録される

        Args:
            text: OCRで抽出されたテキスト
            screenshot_path: スクリーンショットファイルのパス

        Returns:
            状態を表す文字列("normal", "sleep", "lock")
        """
        # テキストが空でない場合は通常状態
        if text.strip():
            self._consecutive_empty_count = 0
            self._last_screenshot_size = None
            return "normal"

        # テキストが空の場合、連続カウントを増加
        self._consecutive_empty_count += 1

        # スクリーンショットのファイルサイズを確認
        try:
            current_size = screenshot_path.stat().st_size
        except (OSError, FileNotFoundError):
            # ファイルアクセスエラーの場合は通常状態として扱う
            return "normal"

        # 初回の場合はサイズを記録して通常状態とする
        if self._last_screenshot_size is None:
            self._last_screenshot_size = current_si
_save_to_jsonl method · python · L265-L293 (29 LOC)
src/screen_times/screen_ocr_logger.py
    def _save_to_jsonl(
        self, timestamp: datetime, window: str, text: str, status: str = "normal"
    ) -> Path:
        """
        JSONL形式でログを保存(日付ベースで自動分割)

        Args:
            timestamp: タイムスタンプ
            window: ウィンドウ名
            text: OCRテキスト
            status: 状態("normal", "sleep", "error"など)

        Returns:
            保存先のJSONLファイルパス

        Raises:
            Exception: JSONL保存に失敗した場合
        """
        try:
            jsonl_path = self.jsonl_manager.get_current_jsonl_path(timestamp)
            # append_recordは実際に書き込んだファイルパスを返す(サイズ超過時は新ファイル)
            actual_path = self.jsonl_manager.append_record(
                jsonl_path, timestamp, window, text, status
            )
            return actual_path
        except Exception as e:
            if self.config.verbose:
                print(f"Error: Failed to write to JSONL: {e}", file=sys.stderr)
            raise
main function · python · L296-L306 (11 LOC)
src/screen_times/screen_ocr_logger.py
def main():
    """モジュールとして実行された時のエントリーポイント"""
    logger = ScreenOCRLogger()
    result = logger.run()

    # マージャーをフラッシュ(バッファに残っているレコードを書き込む)
    # これは最後の実行時に必要だが、定期実行では次の実行で処理されるため問題ない
    # 念のため、明示的にフラッシュする

    if not result.success:
        sys.exit(1)
main function · python · L23-L52 (30 LOC)
src/screen_times/screenshot_ocr.py
def main():
    """メイン処理"""
    try:
        # 設定を準備
        config = ScreenOCRConfig(
            screenshot_dir=SCREENSHOT_DIR,
            timeout_seconds=TIMEOUT_SECONDS,
            screenshot_retention_hours=SCREENSHOT_RETENTION_HOURS,
            verbose=True,  # 詳細ログを出力
        )

        # ファサードを初期化
        logger = ScreenOCRLogger(config)

        # メイン処理を実行
        result = logger.run()

        # 結果を表示
        if result.success:
            print(f"Screenshot will be kept for {SCREENSHOT_RETENTION_HOURS} hours")
        else:
            print(f"Fatal error: {result.error}", file=sys.stderr)
            sys.exit(1)

        # 古いスクリーンショットをクリーンアップ
        logger.cleanup()

    except Exception as main_error:
        print(f"Fatal error: {main_error}", file=sys.stderr)
        sys.exit(1)
Same scanner, your repo: https://repobility.com — Repobility
get_active_window function · python · L15-L86 (72 LOC)
src/screen_times/screenshot.py
def get_active_window() -> tuple[str, Optional[tuple[int, int, int, int]]]:
    """
    AppleScript経由でアクティブウィンドウ名と位置を取得

    Returns:
        (アプリケーション名, ウィンドウ位置 (x, y, width, height) または None)
    """
    script_path = Path(__file__).parent / "resources" / "screenshot_window.applescript"

    try:
        result = subprocess.run(
            ["osascript", str(script_path)], capture_output=True, text=True, timeout=3, check=True
        )
        app_name = result.stdout.strip() or "Unknown"

        # PyObjCでウィンドウ情報を取得
        try:
            from Quartz import (
                CGWindowListCopyWindowInfo,
                kCGWindowListOptionOnScreenOnly,
                kCGNullWindowID,
            )

            # 画面上の全ウィンドウ情報を取得
            window_list = CGWindowListCopyWindowInfo(
                kCGWindowListOptionOnScreenOnly, kCGNullWindowID
            )

            # アクティブなアプリのウィンドウを探す
            # 正規化して比較用の文字列を作成
            normalized_app_name = app_name.lower().replace("-
take_screenshot function · python · L89-L129 (41 LOC)
src/screen_times/screenshot.py
def take_screenshot(
    screenshot_dir: Path, window_bounds: Optional[tuple[int, int, int, int]] = None
) -> Path:
    """
    スクリーンショットを取得

    Args:
        screenshot_dir: スクリーンショット保存先ディレクトリ
        window_bounds: ウィンドウの位置とサイズ (x, y, w, h)。Noneの場合は画面全体

    Returns:
        スクリーンショットのパス
    """
    # ディレクトリが存在しない場合は作成
    screenshot_dir.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    screenshot_path = screenshot_dir / f"screenshot_{timestamp}.png"

    try:
        if window_bounds:
            x, y, w, h = window_bounds
            # -R オプションで特定の領域をキャプチャ
            cmd = ["screencapture", "-x", "-R", f"{x},{y},{w},{h}", str(screenshot_path)]
        else:
            # 画面全体をキャプチャ(フォールバック)
            cmd = ["screencapture", "-x", str(screenshot_path)]

        subprocess.run(cmd, check=True, timeout=5)

        if not screenshot_path.exists():
            raise FileNotFoundError(f"Screenshot was not created: {screenshot_path}")

  
generate_task_id function · python · L17-L32 (16 LOC)
src/screen_times/split_jsonl.py
def generate_task_id(description: str) -> str:
    """
    タスク説明からタスクIDを生成

    Args:
        description: タスクの説明

    Returns:
        タスクID(英数字とハイフン)
    """
    # 簡易的な実装: 最初の20文字を使用し、スペースをハイフンに変換
    task_id = description[:20].replace(" ", "-").replace(" ", "-")
    # 使用できない文字を除去
    allowed_chars = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_")
    task_id = "".join(c for c in task_id if c in allowed_chars)
    return task_id or "task"
page 1 / 2next ›