Function bodies 51 total
Colors class · python · L21-L26 (6 LOC)src/screen_times/cli.py
class Colors:
RED = "\033[0;31m"
GREEN = "\033[0;32m"
YELLOW = "\033[1;33m"
BLUE = "\033[0;34m"
NC = "\033[0m" # No Colorlog_info function · python · L29-L31 (3 LOC)src/screen_times/cli.py
def log_info(message: str):
"""情報メッセージを出力"""
print(f"{Colors.GREEN}[INFO]{Colors.NC} {message}")log_warn function · python · L34-L36 (3 LOC)src/screen_times/cli.py
def log_warn(message: str):
"""警告メッセージを出力"""
print(f"{Colors.YELLOW}[WARN]{Colors.NC} {message}")log_error function · python · L39-L41 (3 LOC)src/screen_times/cli.py
def log_error(message: str):
"""エラーメッセージを出力"""
print(f"{Colors.RED}[ERROR]{Colors.NC} {message}", file=sys.stderr)get_project_root function · python · L44-L65 (22 LOC)src/screen_times/cli.py
def get_project_root() -> Path:
"""プロジェクトルートディレクトリを取得
開発環境: src/screen_times/cli.py -> プロジェクトルート
インストール済み: site-packages/screen_times/cli.py -> ホームディレクトリにフォールバック
"""
# パッケージのディレクトリ
package_dir = Path(__file__).parent.absolute()
# 開発環境かチェック(src/screen_times/cli.pyの場合)
if package_dir.parent.name == "src":
# 開発環境: src/screen_times -> src -> プロジェクトルート
return package_dir.parent.parent
# インストール済み環境: ホームディレクトリ配下のプロジェクトを探す
# フォールバック: カレントディレクトリまたはホームディレクトリ
cwd = Path.cwd()
if (cwd / ".venv").exists() and (cwd / "pyproject.toml").exists():
return cwd
# 最終フォールバック
return Path.home() / "git" / "screen-times"get_plist_path function · python · L68-L70 (3 LOC)src/screen_times/cli.py
def get_plist_path() -> Path:
"""plistファイルのパスを取得"""
return Path.home() / "Library" / "LaunchAgents" / "com.screenocr.logger.plist"get_launchd_label function · python · L73-L75 (3 LOC)src/screen_times/cli.py
def get_launchd_label() -> str:
"""launchdラベルを取得"""
return "com.screenocr.logger"If a scraper extracted this row, it came from Repobility (https://repobility.com)
check_launchd_status function · python · L78-L84 (7 LOC)src/screen_times/cli.py
def check_launchd_status() -> bool:
"""launchdエージェントが実行中かチェック"""
try:
result = subprocess.run(["launchctl", "list"], capture_output=True, text=True, check=True)
return get_launchd_label() in result.stdout
except subprocess.CalledProcessError:
return Falsestart_agent function · python · L87-L160 (74 LOC)src/screen_times/cli.py
def start_agent():
"""launchdエージェントを開始"""
log_info("ScreenOCR Logger を起動します...")
project_root = get_project_root()
plist_template = project_root / "config" / "com.screenocr.logger.plist"
plist_dest = get_plist_path()
main_script = project_root / "src" / "screen_times" / "screenshot_ocr.py"
python_path = project_root / ".venv" / "bin" / "python"
# 前提条件チェック
if not plist_template.exists():
log_error(f"plistテンプレートが見つかりません: {plist_template}")
sys.exit(1)
if not main_script.exists():
log_error(f"メインスクリプトが見つかりません: {main_script}")
sys.exit(1)
if not python_path.exists():
log_error(f"Pythonの仮想環境が見つかりません: {python_path}")
log_info("まず 'pipenv install' を実行してください")
sys.exit(1)
# LaunchAgentsディレクトリを作成
plist_dest.parent.mkdir(parents=True, exist_ok=True)
# 既存のエージェントをアンロード
if check_launchd_status():
log_warn("既存のエージェントを停止します...")
try:
subprocess.run(
stop_agent function · python · L163-L185 (23 LOC)src/screen_times/cli.py
def stop_agent():
"""launchdエージェントを停止"""
log_info("ScreenOCR Logger を停止します...")
plist_dest = get_plist_path()
if not plist_dest.exists():
log_warn("plistファイルが見つかりません。エージェントは登録されていません。")
return
if not check_launchd_status():
log_warn("エージェントは実行されていません。")
return
# エージェントをアンロード
try:
subprocess.run(
["launchctl", "unload", str(plist_dest)], check=True, capture_output=True, text=True
)
log_info("✓ ScreenOCR Logger を停止しました")
except subprocess.CalledProcessError as e:
log_error(f"エージェントの停止に失敗しました: {e.stderr}")
sys.exit(1)split_task function · python · L188-L251 (64 LOC)src/screen_times/cli.py
def split_task(description: Optional[str] = None, clear: bool = False):
"""タスク別にJSONLファイルを分割"""
try:
# エージェントが停止している場合は自動起動
if not check_launchd_status():
log_warn("エージェントが停止しています。自動的に起動します...")
try:
start_agent()
print() # 空行を追加して読みやすく
except SystemExit:
# start_agent()がsys.exit()を呼ぶ場合があるので捕捉
log_error("エージェントの起動に失敗しました。タスク分割を中断します。")
sys.exit(1)
# JSONLマネージャーの初期化
jsonl_manager = JsonlManager()
# --clear オプションまたは説明なしの場合は日付ベースに戻す
if clear or not description:
jsonl_manager._clear_current_task_file()
timestamp = datetime.now()
effective_date = jsonl_manager.get_effective_date(timestamp)
current_path = jsonl_manager.get_current_jsonl_path(timestamp)
log_info(f"日付ベースのファイルに戻しました: {current_path}")
print(f" 実効日付: {effective_date.strftime('%Y-%m-%d')dry_run function · python · L254-L322 (69 LOC)src/screen_times/cli.py
def dry_run(merge_threshold: Optional[float] = None):
"""dry-run実行(5秒待機後にOCR処理を実行し、結果を標準出力)
Args:
merge_threshold: マージのしきい値(0.0~1.0)
"""
import json
from .screen_ocr_logger import ScreenOCRLogger, ScreenOCRConfig
log_info("Dry-runモードで実行します")
if merge_threshold is not None:
log_info(f"マージしきい値: {merge_threshold}")
print()
print("5秒後にスクリーンショットとOCR処理を実行します...")
print("任意のウィンドウをアクティブにしてお待ちください。")
print()
# カウントダウン
for i in range(5, 0, -1):
print(f" {i}...", flush=True)
import time
time.sleep(1)
print("\n実行中...\n")
# dry-runモードで実行
config = ScreenOCRConfig(dry_run=True, verbose=True, merge_threshold=merge_threshold)
logger = ScreenOCRLogger(config)
result = logger.run()
# 結果を表示
print("\n" + "=" * 60)
print("DRY-RUN 実行結果")
print("=" * 60)
if result.success:
# JSON形式で表示するデータを構築
output_data = {
"timestamp": result.timestamp.isofoshow_status function · python · L325-L369 (45 LOC)src/screen_times/cli.py
def show_status():
"""現在の状態を表示"""
log_info("=== ScreenOCR Logger ステータス ===")
print()
# launchdの状態
is_running = check_launchd_status()
status_color = Colors.GREEN if is_running else Colors.YELLOW
status_text = "実行中" if is_running else "停止中"
print(f" launchdエージェント: {status_color}{status_text}{Colors.NC}")
# plistファイルの存在
plist_dest = get_plist_path()
plist_exists = plist_dest.exists()
print(f" plistファイル: {'存在' if plist_exists else '未作成'}")
if plist_exists:
print(f" -> {plist_dest}")
# ログディレクトリ
jsonl_manager = JsonlManager()
log_dir = jsonl_manager.logs_dir
if log_dir.exists():
log_files = list(log_dir.glob("*.jsonl"))
print(f" ログファイル: {len(log_files)} 個")
print(f" -> {log_dir}")
# 今日のログファイル
timestamp = datetime.now()
current_path = jsonl_manager.get_current_jsonl_path(timestamp)
if current_path.exists():
size_kb = current_path.stat().main function · python · L372-L439 (68 LOC)src/screen_times/cli.py
def main():
"""メイン処理"""
parser = argparse.ArgumentParser(
description="ScreenOCR Logger の統合管理ツール",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
使用例:
screenocr start # エージェントを開始
screenocr stop # エージェントを停止
screenocr split "新機能の実装" # タスクを分割
screenocr split --clear # 日付ベースに戻す
screenocr status # 現在の状態を表示
screenocr dry-run # テスト実行(JSONLに保存せず結果表示)
""",
)
subparsers = parser.add_subparsers(dest="command", help="実行するコマンド")
# start コマンド
subparsers.add_parser("start", help="launchdエージェントを開始")
# stop コマンド
subparsers.add_parser("stop", help="launchdエージェントを停止")
# split コマンド
split_parser = subparsers.add_parser("split", help="タスク別にJSONLファイルを分割")
split_parser.add_argument(
"description", nargs="?", help="タスクの説明(例: '〇〇機能の実装作業')"
)
split_parser.add_argument(
"--clear",
action="store_true",
get_default_logs_dir function · python · L23-L39 (17 LOC)src/screen_times/jsonl_manager.py
def get_default_logs_dir() -> Path:
"""デフォルトのログディレクトリパスを取得
環境変数 OBSIDIAN_VAULT_PATH が設定されていればそれを使用し、
未設定時は macOS の Obsidian Vault デフォルトパスを使用する。
ユーザー名は getpass.getuser() で自動取得する。
Returns:
logs_dir の Path
"""
vault_path_str = os.environ.get("OBSIDIAN_VAULT_PATH")
if vault_path_str:
vault_path = Path(vault_path_str)
else:
vault_path = DEFAULT_VAULT_PATH
username = getpass.getuser()
return vault_path / "screenocr_logs" / usernameSame scanner, your repo: https://repobility.com — Repobility
JsonlManager class · python · L42-L347 (306 LOC)src/screen_times/jsonl_manager.py
class JsonlManager:
"""JSONLファイルの管理を行うクラス"""
# ファイルサイズの上限(100KB = 約50Kトークン)
MAX_FILE_SIZE_BYTES = 100 * 1024 # 100KB
def __init__(self, base_dir: Optional[Path] = None, merge_threshold: Optional[float] = None):
"""
初期化
Args:
base_dir: JSONLファイルを保存するベースディレクトリ
Noneの場合は get_default_logs_dir() を使用
merge_threshold: 類似レコードをマージするしきい値(0.0~1.0)
Noneの場合はマージを行わない
"""
if base_dir is None:
self.logs_dir = get_default_logs_dir()
else:
self.logs_dir = base_dir / "screenocr_logs"
self.logs_dir.mkdir(parents=True, exist_ok=True)
self.state_file = self.logs_dir / ".current_jsonl"
self.merge_threshold = merge_threshold
self.merger: Optional[RecordMerger] = None
if merge_threshold is not None:
self.merger = RecordMerger(threshold=merge_threshold)
def get_effective_date(self, timestamp: da__init__ method · python · L48-L67 (20 LOC)src/screen_times/jsonl_manager.py
def __init__(self, base_dir: Optional[Path] = None, merge_threshold: Optional[float] = None):
"""
初期化
Args:
base_dir: JSONLファイルを保存するベースディレクトリ
Noneの場合は get_default_logs_dir() を使用
merge_threshold: 類似レコードをマージするしきい値(0.0~1.0)
Noneの場合はマージを行わない
"""
if base_dir is None:
self.logs_dir = get_default_logs_dir()
else:
self.logs_dir = base_dir / "screenocr_logs"
self.logs_dir.mkdir(parents=True, exist_ok=True)
self.state_file = self.logs_dir / ".current_jsonl"
self.merge_threshold = merge_threshold
self.merger: Optional[RecordMerger] = None
if merge_threshold is not None:
self.merger = RecordMerger(threshold=merge_threshold)get_effective_date method · python · L69-L90 (22 LOC)src/screen_times/jsonl_manager.py
def get_effective_date(self, timestamp: datetime) -> datetime:
"""
朝5時を基準とした実効日付を取得
5時より前の時刻は前日として扱う。
例: 2025-12-28 04:59 → 2025-12-27
2025-12-28 05:00 → 2025-12-28
Args:
timestamp: 判定対象のタイムスタンプ
Returns:
実効日付(datetime)
"""
if timestamp.hour < 5:
# 5時より前なら前日とする
return (timestamp - timedelta(days=1)).replace(
hour=0, minute=0, second=0, microsecond=0
)
else:
# 5時以降は当日
return timestamp.replace(hour=0, minute=0, second=0, microsecond=0)get_jsonl_path method · python · L92-L127 (36 LOC)src/screen_times/jsonl_manager.py
def get_jsonl_path(
self,
timestamp: Optional[datetime] = None,
task_id: Optional[str] = None,
include_time: bool = False,
) -> Path:
"""
JSONLファイルのパスを取得
Args:
timestamp: タイムスタンプ(Noneの場合は現在時刻)
task_id: タスクID(手動分割時に指定)
include_time: Trueの場合、ファイル名に時刻を含める(サイズベース分割用)
Returns:
JSONLファイルのPath
"""
if timestamp is None:
timestamp = datetime.now()
effective_date = self.get_effective_date(timestamp)
date_str = effective_date.strftime("%Y-%m-%d")
if task_id:
# 手動分割: 日付 + タスクID + タイムスタンプ
time_str = timestamp.strftime("%H%M%S")
filename = f"{date_str}_{task_id}_{time_str}.jsonl"
elif include_time:
# サイズベース分割: 日付 + 時刻
time_str = timestamp.strftime("%H%M%S")
filename = f"{date_str}_{time_str}.jsonl"
else:
# 自動分割: 日付のみ(下位互換性のため残す)write_metadata method · python · L129-L161 (33 LOC)src/screen_times/jsonl_manager.py
def write_metadata(
self, filepath: Path, description: str, timestamp: Optional[datetime] = None
) -> None:
"""
メタデータをJSONLファイルの1行目に書き込む
Args:
filepath: JSONLファイルのパス
description: タスクの説明
timestamp: タイムスタンプ(Noneの場合は現在時刻)
"""
if timestamp is None:
timestamp = datetime.now()
metadata = {
"type": "task_metadata",
"timestamp": timestamp.isoformat(),
"description": description,
"effective_date": self.get_effective_date(timestamp).strftime("%Y-%m-%d"),
}
# ファイルが存在する場合は既存の内容を読み込む
existing_lines = []
if filepath.exists():
with open(filepath, "r", encoding="utf-8") as f:
existing_lines = f.readlines()
# メタデータを先頭に書き込み、その後に既存の内容を追加
with open(filepath, "w", encoding="utf-8") as f:
json.dump(metadata, f, ensure_ascii=False)
f.write("\n")
append_record method · python · L163-L236 (74 LOC)src/screen_times/jsonl_manager.py
def append_record(
self, filepath: Path, timestamp: datetime, window: str, text: str, status: str = "normal"
) -> Path:
"""
レコードをJSONLファイルに追記
マージが有効な場合は、類似レコードを自動的にマージする。
レコード追記後にファイルサイズが上限(100KB)を超えた場合は、次回から新しいファイルを使用する。
Args:
filepath: JSONLファイルのパス
timestamp: タイムスタンプ
window: ウィンドウ名
text: OCRテキスト
status: 状態("normal", "sleep", "error"など)
Returns:
実際に書き込んだファイルのPath(常に現在のファイルパスを返す)
"""
# レコードサイズの警告(10KB以上の場合)
record_size = len(text.encode("utf-8"))
if record_size > 10 * 1024: # 10KB
import sys
print(
f"Warning: Large record detected ({record_size / 1024:.1f} KB) "
f"in window '{window}'. This may cause frequent file splits.",
file=sys.stderr,
)
record = {
"timestamp": timestamp.isoformat(),
"window": wi_write_record method · python · L238-L248 (11 LOC)src/screen_times/jsonl_manager.py
def _write_record(self, filepath: Path, record: dict) -> None:
"""
レコードをJSONLファイルに書き込む
Args:
filepath: JSONLファイルのパス
record: 書き込むレコード
"""
with open(filepath, "a", encoding="utf-8") as f:
json.dump(record, f, ensure_ascii=False)
f.write("\n")flush_merger method · python · L250-L262 (13 LOC)src/screen_times/jsonl_manager.py
def flush_merger(self, filepath: Path) -> None:
"""
マージャーのバッファをフラッシュして書き込む
プログラム終了時などに呼び出す。
Args:
filepath: JSONLファイルのパス
"""
if self.merger:
buffered_record = self.merger.flush()
if buffered_record:
self._write_record(filepath, buffered_record)Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
get_current_jsonl_path method · python · L264-L310 (47 LOC)src/screen_times/jsonl_manager.py
def get_current_jsonl_path(self, timestamp: Optional[datetime] = None) -> Path:
"""
現在使用すべきJSONLファイルのパスを取得
手動分割で設定されたタスクファイルがある場合はそれを使用し、
日付が変わっていたら自動的に日付ベースのファイルに切り替える。
同じ日付のファイルが複数ある場合は、最新のファイルを返す。
Args:
timestamp: タイムスタンプ(Noneの場合は現在時刻)
Returns:
JSONLファイルのPath
"""
if timestamp is None:
timestamp = datetime.now()
current_effective_date = self.get_effective_date(timestamp)
# 状態ファイルから現在のタスクファイル情報を取得
task_file_info = self._get_current_task_file()
if task_file_info:
task_file_path = Path(task_file_info["path"])
task_effective_date_str = task_file_info.get("effective_date")
# タスクファイルが存在し、かつ日付が変わっていない場合はそのファイルを使用
if (
task_file_path.exists()
and task_effective_date_str == current_effective_date.strftime("%Y-%m-%d")
):
return task_file_p_get_current_task_file method · python · L312-L327 (16 LOC)src/screen_times/jsonl_manager.py
def _get_current_task_file(self) -> Optional[dict]:
"""
状態ファイルから現在のタスクファイル情報を取得
Returns:
タスクファイル情報の辞書、または None
"""
if not self.state_file.exists():
return None
try:
with open(self.state_file, "r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, dict) else None
except (json.JSONDecodeError, IOError):
return None_set_current_task_file method · python · L329-L340 (12 LOC)src/screen_times/jsonl_manager.py
def _set_current_task_file(self, filepath: Path, effective_date: str) -> None:
"""
状態ファイルに現在のタスクファイル情報を保存
Args:
filepath: タスクファイルのパス
effective_date: 実効日付(YYYY-MM-DD形式)
"""
task_info = {"path": str(filepath), "effective_date": effective_date}
with open(self.state_file, "w", encoding="utf-8") as f:
json.dump(task_info, f, ensure_ascii=False)_clear_current_task_file method · python · L342-L347 (6 LOC)src/screen_times/jsonl_manager.py
def _clear_current_task_file(self) -> None:
"""
状態ファイルをクリア(日付ベースのファイルに戻る)
"""
if self.state_file.exists():
self.state_file.unlink()TimeoutError class · python · L13-L16 (4 LOC)src/screen_times/ocr.py
class TimeoutError(Exception):
"""タイムアウトエラー"""
passtimeout_handler function · python · L19-L21 (3 LOC)src/screen_times/ocr.py
def timeout_handler(signum, frame):
"""タイムアウトハンドラ"""
raise TimeoutError("OCR processing timeout")perform_ocr function · python · L24-L108 (85 LOC)src/screen_times/ocr.py
def perform_ocr(image_path: Path, timeout_seconds: int = 5) -> str:
"""
Vision FrameworkでOCR処理を実行
Args:
image_path: 画像ファイルのパス
timeout_seconds: タイムアウト時間(秒)
Returns:
認識されたテキスト
"""
# pyobjc imports (遅延インポート)
try:
from Cocoa import NSURL
from Quartz import CGImageSourceCreateWithURL, CGImageSourceCreateImageAtIndex
from Vision import (
VNImageRequestHandler,
VNRecognizeTextRequest,
VNRequestTextRecognitionLevelAccurate,
)
except ImportError as import_error:
print(f"Error: pyobjc frameworks not found: {import_error}", file=sys.stderr)
print("Install with: pip install -r requirements.txt", file=sys.stderr)
return ""
# タイムアウト設定
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(timeout_seconds)
try:
# 画像URLを作成
url = NSURL.fileURLWithPath_(str(image_path))
# CGImageを読み込み
image_source = Cshould_merge function · python · L13-L46 (34 LOC)src/screen_times/record_merger.py
def should_merge(prev: Dict[str, Any], curr: Dict[str, Any], threshold: float = 0.90) -> bool:
"""
マージすべきかどうかの判定
以下の両方を満たす場合にマージ対象とする:
1. window が同一であること
2. テキスト類似度が指定のしきい値以上であること
Args:
prev: 前のレコード
curr: 現在のレコード
threshold: 類似度のしきい値(0.0~1.0)
Returns:
マージすべき場合はTrue
"""
# windowが異なる場合はマージしない
if prev.get("window") != curr.get("window"):
return False
prev_text = prev.get("text", "")
curr_text = curr.get("text", "")
# 両方とも空の場合もマージする
if not prev_text and not curr_text:
return True
# どちらか一方だけが空の場合はマージしない
if not prev_text or not curr_text:
return False
# テキスト類似度を計算(0-100の範囲なので100で割る)
similarity: float = fuzz.ratio(prev_text, curr_text) / 100.0
return similarity >= thresholdRepobility · MCP-ready · https://repobility.com
merge_records function · python · L49-L77 (29 LOC)src/screen_times/record_merger.py
def merge_records(prev: Dict[str, Any], curr: Dict[str, Any]) -> Dict[str, Any]:
"""
2つのレコードをマージ
マージ時は以下のルールで統合する:
- timestamp: 最初のレコードの値を保持
- timestamp_end: 最後のレコードのtimestampを設定
- window: 保持
- text: 最初のレコードの値を保持
- text_length: 最初のレコードの値を保持
- merged_count: マージされたレコード数(既存の値に+1)
Args:
prev: マージ先のレコード
curr: マージ元のレコード
Returns:
マージされたレコード
"""
# prevをベースにコピー
merged = prev.copy()
# timestamp_endを更新
merged["timestamp_end"] = curr["timestamp"]
# merged_countを更新(prevに既にある場合は+1、ない場合は2)
merged["merged_count"] = prev.get("merged_count", 1) + 1
return mergedRecordMerger class · python · L80-L135 (56 LOC)src/screen_times/record_merger.py
class RecordMerger:
"""
レコードのマージを管理するクラス
連続するレコードをバッファリングして、類似度に基づいてマージする。
"""
def __init__(self, threshold: float = 0.90):
"""
初期化
Args:
threshold: 類似度のしきい値(0.0~1.0、デフォルト0.90)
"""
self.threshold = threshold
self.buffer: Optional[Dict[str, Any]] = None
def add_record(self, record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
レコードを追加
前回のレコードとマージすべきかを判定し、マージしない場合は
前回のレコードを返す。マージする場合はNoneを返す。
Args:
record: 追加するレコード
Returns:
出力すべきレコード(マージした場合はNone)
"""
# 最初のレコードの場合はバッファに保存
if self.buffer is None:
self.buffer = record
return None
# マージすべきか判定
if should_merge(self.buffer, record, self.threshold):
# マージしてバッファを更新
self.buffer = merge_records(self.buffer, record)
return None
else:
# マージしない場合は、バッファを出力して新しいレコードを保存
__init__ method · python · L87-L95 (9 LOC)src/screen_times/record_merger.py
def __init__(self, threshold: float = 0.90):
"""
初期化
Args:
threshold: 類似度のしきい値(0.0~1.0、デフォルト0.90)
"""
self.threshold = threshold
self.buffer: Optional[Dict[str, Any]] = Noneadd_record method · python · L97-L124 (28 LOC)src/screen_times/record_merger.py
def add_record(self, record: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""
レコードを追加
前回のレコードとマージすべきかを判定し、マージしない場合は
前回のレコードを返す。マージする場合はNoneを返す。
Args:
record: 追加するレコード
Returns:
出力すべきレコード(マージした場合はNone)
"""
# 最初のレコードの場合はバッファに保存
if self.buffer is None:
self.buffer = record
return None
# マージすべきか判定
if should_merge(self.buffer, record, self.threshold):
# マージしてバッファを更新
self.buffer = merge_records(self.buffer, record)
return None
else:
# マージしない場合は、バッファを出力して新しいレコードを保存
output = self.buffer
self.buffer = record
return outputflush method · python · L126-L135 (10 LOC)src/screen_times/record_merger.py
def flush(self) -> Optional[Dict[str, Any]]:
"""
バッファに残っているレコードを取得
Returns:
バッファに残っているレコード(ない場合はNone)
"""
output = self.buffer
self.buffer = None
return outputScreenOCRConfig class · python · L21-L29 (9 LOC)src/screen_times/screen_ocr_logger.py
class ScreenOCRConfig:
"""ScreenOCRの設定"""
screenshot_dir: Path = Path("/tmp/screen-times")
timeout_seconds: int = 30
screenshot_retention_hours: int = 72
verbose: bool = False
dry_run: bool = False
merge_threshold: Optional[float] = NoneScreenOCRResult class · python · L33-L55 (23 LOC)src/screen_times/screen_ocr_logger.py
class ScreenOCRResult:
"""ScreenOCR実行結果"""
success: bool
timestamp: datetime
window_name: str
screenshot_path: Optional[Path]
text: str
text_length: int
jsonl_path: Optional[Path]
status: str = "normal"
error: Optional[str] = None
def __str__(self) -> str:
"""結果の文字列表現"""
if self.success:
return (
f"Success: {self.window_name} | "
f"{self.text_length} chars | "
f"Saved to {self.jsonl_path}"
)
else:
return f"Failed: {self.error}"__str__ method · python · L46-L55 (10 LOC)src/screen_times/screen_ocr_logger.py
def __str__(self) -> str:
"""結果の文字列表現"""
if self.success:
return (
f"Success: {self.window_name} | "
f"{self.text_length} chars | "
f"Saved to {self.jsonl_path}"
)
else:
return f"Failed: {self.error}"If a scraper extracted this row, it came from Repobility (https://repobility.com)
ScreenOCRLogger class · python · L58-L293 (236 LOC)src/screen_times/screen_ocr_logger.py
class ScreenOCRLogger:
"""
ScreenOCRシステムのファサード
複雑な一連の処理(スクリーンショット取得、OCR、ログ記録)を
単一のシンプルなインターフェースで提供する。
使用例:
>>> logger = ScreenOCRLogger()
>>> result = logger.run()
>>> print(result)
Success: Chrome | 1234 chars | Saved to ~/.screenocr_logs/2025-12-28.jsonl
>>> # カスタム設定で使用
>>> config = ScreenOCRConfig(
... screenshot_dir=Path("/custom/path"),
... timeout_seconds=10,
... verbose=True
... )
>>> logger = ScreenOCRLogger(config)
>>> result = logger.run()
"""
def __init__(self, config: Optional[ScreenOCRConfig] = None):
"""
初期化
Args:
config: 設定オブジェクト(Noneの場合はデフォルト設定)
"""
self.config = config or ScreenOCRConfig()
self.jsonl_manager = JsonlManager(merge_threshold=self.config.merge_threshold)
# スリープ状態検出用の状態
self._last_screenshot_size: Optional[int] = None
self._consecutive___init__ method · python · L81-L92 (12 LOC)src/screen_times/screen_ocr_logger.py
def __init__(self, config: Optional[ScreenOCRConfig] = None):
"""
初期化
Args:
config: 設定オブジェクト(Noneの場合はデフォルト設定)
"""
self.config = config or ScreenOCRConfig()
self.jsonl_manager = JsonlManager(merge_threshold=self.config.merge_threshold)
# スリープ状態検出用の状態
self._last_screenshot_size: Optional[int] = None
self._consecutive_empty_count: int = 0run method · python · L94-L172 (79 LOC)src/screen_times/screen_ocr_logger.py
def run(self) -> ScreenOCRResult:
"""
メイン処理を実行
スクリーンショット取得 → OCR → JSONL保存の一連の処理を実行する。
Returns:
実行結果(ScreenOCRResult)
"""
timestamp = datetime.now()
window_name = "Unknown"
screenshot_path = None
text = ""
jsonl_path = None
error = None
try:
# 1. アクティブウィンドウ取得
window_name, window_bounds = get_active_window()
if self.config.verbose:
print(f"Active window: {window_name}")
if window_bounds:
print(f"Window bounds: {window_bounds}")
# 2. スクリーンショット取得
screenshot_path = take_screenshot(self.config.screenshot_dir, window_bounds)
if self.config.verbose:
print(f"Screenshot saved: {screenshot_path}")
# 3. OCR処理
text = perform_ocr(screenshot_path, self.config.timeout_seconds)
if self.config.verbose:
cleanup method · python · L174-L214 (41 LOC)src/screen_times/screen_ocr_logger.py
def cleanup(self) -> int:
"""
古いスクリーンショットを削除
設定で指定された保持期間を超えたスクリーンショットファイルを削除する。
Returns:
削除したファイル数
"""
try:
cutoff_time = time.time() - (self.config.screenshot_retention_hours * 3600)
pattern = "screenshot_*.png"
deleted_count = 0
# ディレクトリが存在しない場合は0を返す
if not self.config.screenshot_dir.exists():
return 0
for screenshot in self.config.screenshot_dir.glob(pattern):
try:
# ファイルの最終更新時刻を確認
if screenshot.stat().st_mtime < cutoff_time:
screenshot.unlink()
deleted_count += 1
except Exception as file_error:
# 個別のファイル削除エラーは無視して続行
if self.config.verbose:
print(
f"Warning: Failed to delete {screenshot}: {file_error}", file=sys.stderr
_detect_sleep_state method · python · L216-L263 (48 LOC)src/screen_times/screen_ocr_logger.py
def _detect_sleep_state(self, text: str, screenshot_path: Path) -> str:
"""
スリープ/ロック状態を検出する
以下の条件でスリープ状態を判定:
- テキストが空(text_length = 0)
- スクリーンショットのファイルサイズが前回と同じ
- 連続して空のテキストが3回以上記録される
Args:
text: OCRで抽出されたテキスト
screenshot_path: スクリーンショットファイルのパス
Returns:
状態を表す文字列("normal", "sleep", "lock")
"""
# テキストが空でない場合は通常状態
if text.strip():
self._consecutive_empty_count = 0
self._last_screenshot_size = None
return "normal"
# テキストが空の場合、連続カウントを増加
self._consecutive_empty_count += 1
# スクリーンショットのファイルサイズを確認
try:
current_size = screenshot_path.stat().st_size
except (OSError, FileNotFoundError):
# ファイルアクセスエラーの場合は通常状態として扱う
return "normal"
# 初回の場合はサイズを記録して通常状態とする
if self._last_screenshot_size is None:
self._last_screenshot_size = current_si_save_to_jsonl method · python · L265-L293 (29 LOC)src/screen_times/screen_ocr_logger.py
def _save_to_jsonl(
self, timestamp: datetime, window: str, text: str, status: str = "normal"
) -> Path:
"""
JSONL形式でログを保存(日付ベースで自動分割)
Args:
timestamp: タイムスタンプ
window: ウィンドウ名
text: OCRテキスト
status: 状態("normal", "sleep", "error"など)
Returns:
保存先のJSONLファイルパス
Raises:
Exception: JSONL保存に失敗した場合
"""
try:
jsonl_path = self.jsonl_manager.get_current_jsonl_path(timestamp)
# append_recordは実際に書き込んだファイルパスを返す(サイズ超過時は新ファイル)
actual_path = self.jsonl_manager.append_record(
jsonl_path, timestamp, window, text, status
)
return actual_path
except Exception as e:
if self.config.verbose:
print(f"Error: Failed to write to JSONL: {e}", file=sys.stderr)
raisemain function · python · L296-L306 (11 LOC)src/screen_times/screen_ocr_logger.py
def main():
"""モジュールとして実行された時のエントリーポイント"""
logger = ScreenOCRLogger()
result = logger.run()
# マージャーをフラッシュ(バッファに残っているレコードを書き込む)
# これは最後の実行時に必要だが、定期実行では次の実行で処理されるため問題ない
# 念のため、明示的にフラッシュする
if not result.success:
sys.exit(1)main function · python · L23-L52 (30 LOC)src/screen_times/screenshot_ocr.py
def main():
"""メイン処理"""
try:
# 設定を準備
config = ScreenOCRConfig(
screenshot_dir=SCREENSHOT_DIR,
timeout_seconds=TIMEOUT_SECONDS,
screenshot_retention_hours=SCREENSHOT_RETENTION_HOURS,
verbose=True, # 詳細ログを出力
)
# ファサードを初期化
logger = ScreenOCRLogger(config)
# メイン処理を実行
result = logger.run()
# 結果を表示
if result.success:
print(f"Screenshot will be kept for {SCREENSHOT_RETENTION_HOURS} hours")
else:
print(f"Fatal error: {result.error}", file=sys.stderr)
sys.exit(1)
# 古いスクリーンショットをクリーンアップ
logger.cleanup()
except Exception as main_error:
print(f"Fatal error: {main_error}", file=sys.stderr)
sys.exit(1)Same scanner, your repo: https://repobility.com — Repobility
get_active_window function · python · L15-L86 (72 LOC)src/screen_times/screenshot.py
def get_active_window() -> tuple[str, Optional[tuple[int, int, int, int]]]:
"""
AppleScript経由でアクティブウィンドウ名と位置を取得
Returns:
(アプリケーション名, ウィンドウ位置 (x, y, width, height) または None)
"""
script_path = Path(__file__).parent / "resources" / "screenshot_window.applescript"
try:
result = subprocess.run(
["osascript", str(script_path)], capture_output=True, text=True, timeout=3, check=True
)
app_name = result.stdout.strip() or "Unknown"
# PyObjCでウィンドウ情報を取得
try:
from Quartz import (
CGWindowListCopyWindowInfo,
kCGWindowListOptionOnScreenOnly,
kCGNullWindowID,
)
# 画面上の全ウィンドウ情報を取得
window_list = CGWindowListCopyWindowInfo(
kCGWindowListOptionOnScreenOnly, kCGNullWindowID
)
# アクティブなアプリのウィンドウを探す
# 正規化して比較用の文字列を作成
normalized_app_name = app_name.lower().replace("-take_screenshot function · python · L89-L129 (41 LOC)src/screen_times/screenshot.py
def take_screenshot(
screenshot_dir: Path, window_bounds: Optional[tuple[int, int, int, int]] = None
) -> Path:
"""
スクリーンショットを取得
Args:
screenshot_dir: スクリーンショット保存先ディレクトリ
window_bounds: ウィンドウの位置とサイズ (x, y, w, h)。Noneの場合は画面全体
Returns:
スクリーンショットのパス
"""
# ディレクトリが存在しない場合は作成
screenshot_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
screenshot_path = screenshot_dir / f"screenshot_{timestamp}.png"
try:
if window_bounds:
x, y, w, h = window_bounds
# -R オプションで特定の領域をキャプチャ
cmd = ["screencapture", "-x", "-R", f"{x},{y},{w},{h}", str(screenshot_path)]
else:
# 画面全体をキャプチャ(フォールバック)
cmd = ["screencapture", "-x", str(screenshot_path)]
subprocess.run(cmd, check=True, timeout=5)
if not screenshot_path.exists():
raise FileNotFoundError(f"Screenshot was not created: {screenshot_path}")
generate_task_id function · python · L17-L32 (16 LOC)src/screen_times/split_jsonl.py
def generate_task_id(description: str) -> str:
"""
タスク説明からタスクIDを生成
Args:
description: タスクの説明
Returns:
タスクID(英数字とハイフン)
"""
# 簡易的な実装: 最初の20文字を使用し、スペースをハイフンに変換
task_id = description[:20].replace(" ", "-").replace(" ", "-")
# 使用できない文字を除去
allowed_chars = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_")
task_id = "".join(c for c in task_id if c in allowed_chars)
return task_id or "task"page 1 / 2next ›