← back to djacobs__Pinwheel

Function bodies 688 total

All specs Real LLM only Function bodies
apply_variance function · python · L193-L229 (37 LOC)
src/pinwheel/core/archetypes.py
def apply_variance(
    base: PlayerAttributes,
    rng_seed: int,
    variance: int = 10,
) -> PlayerAttributes:
    """Apply random variance to base archetype attributes.

    After applying per-attribute jitter, the total is normalized back to
    the original base total so the result stays within the 360-point
    budget (+/- VARIANCE enforced by the model validator).
    """
    import random

    rng = random.Random(rng_seed)
    data = base.model_dump()
    target_total = base.total()
    for key in data:
        delta = rng.randint(-variance, variance)
        data[key] = max(1, min(100, data[key] + delta))

    # Normalize: redistribute the drift so total matches the base budget.
    current_total = sum(data.values())
    drift = current_total - target_total
    keys = list(data.keys())
    rng.shuffle(keys)
    for key in keys:
        if drift == 0:
            break
        if drift > 0:
            reduction = min(drift, data[key] - 1)
            data[key] -= reduction
  
GameContext.meta_get method · python · L96-L102 (7 LOC)
src/pinwheel/core/codegen.py
    def meta_get(
        self,
        entity_type: str,
        entity_id: str,
        field_name: str,
        default: object = None,
    ) -> object: ...
SandboxedGameContext.meta_get method · python · L162-L172 (11 LOC)
src/pinwheel/core/codegen.py
    def meta_get(
        self,
        entity_type: str,
        entity_id: str,
        field_name: str,
        default: object = None,
    ) -> object:
        """Read from MetaStore if available (trust level STATE+)."""
        if self._meta_store_ref is None:
            return default
        return self._meta_store_ref.get(entity_type, entity_id, field_name, default=default)
clamp_result function · python · L213-L243 (31 LOC)
src/pinwheel/core/codegen.py
def clamp_result(result: CodegenHookResult) -> CodegenHookResult:
    """Enforce bounds on generated code output.

    Even if the code tries to return score_modifier=9999, it gets clamped.
    """
    for field_name, (lo, hi) in RESULT_BOUNDS.items():
        val = getattr(result, field_name)
        if isinstance(val, (int, float)):
            clamped = max(lo, min(hi, val))
            # Preserve type: int fields stay int, float fields stay float
            if isinstance(lo, int) and isinstance(hi, int):
                setattr(result, field_name, int(clamped))
            else:
                setattr(result, field_name, float(clamped))

    # Clamp narrative
    if len(result.narrative_note) > MAX_NARRATIVE_LENGTH:
        result.narrative_note = result.narrative_note[:MAX_NARRATIVE_LENGTH]

    # Clamp meta_writes
    if result.meta_writes:
        if len(result.meta_writes) > MAX_META_WRITES:
            trimmed = dict(list(result.meta_writes.items())[:MAX_META_WRITES])
      
enforce_trust_level function · python · L293-L303 (11 LOC)
src/pinwheel/core/codegen.py
def enforce_trust_level(
    result: CodegenHookResult,
    trust_level: CodegenTrustLevel,
) -> CodegenHookResult:
    """Zero out any fields the code shouldn't be writing to."""
    allowed = TRUST_LEVEL_ALLOWED_RESULT_FIELDS[trust_level]
    defaults = CodegenHookResult()
    for f in dataclasses.fields(result):
        if f.name not in allowed:
            setattr(result, f.name, getattr(defaults, f.name))
    return result
CodegenASTValidator.validate method · python · L346-L438 (93 LOC)
src/pinwheel/core/codegen.py
    def validate(self, code: str) -> list[str]:
        """Return list of violations. Empty list = code is valid."""
        violations: list[str] = []

        if len(code) > MAX_CODE_LENGTH:
            violations.append(
                f"Code exceeds max length ({len(code)} > {MAX_CODE_LENGTH})"
            )
            return violations

        try:
            tree = ast.parse(code, mode="exec")
        except SyntaxError as e:
            violations.append(f"Syntax error: {e}")
            return violations

        # Check AST depth
        depth = self._max_depth(tree)
        if depth > MAX_AST_DEPTH:
            violations.append(f"AST depth {depth} exceeds max {MAX_AST_DEPTH}")

        for node in ast.walk(tree):
            # No imports
            if isinstance(node, ast.Import | ast.ImportFrom):
                violations.append(
                    f"Import statement at line {node.lineno}"
                )

            # No forbidden function calls
            if (
CodegenASTValidator._is_bounded_for method · python · L440-L473 (34 LOC)
src/pinwheel/core/codegen.py
    def _is_bounded_for(self, node: ast.For) -> bool:
        """Check that a for-loop uses range() with bounded arg, or dict iteration."""
        if not isinstance(node.iter, ast.Call):
            return False

        func = node.iter.func

        # range(N) where N is a literal <= MAX_LOOP_BOUND
        if isinstance(func, ast.Name) and func.id == "range":
            args = node.iter.args
            if not args:
                return False
            stop_arg = args[0] if len(args) == 1 else args[1]
            if isinstance(stop_arg, ast.Constant) and isinstance(stop_arg.value, int):
                return stop_arg.value <= MAX_LOOP_BOUND
            return False

        # .items(), .values(), .keys() calls on any object
        if isinstance(func, ast.Attribute) and func.attr in ("items", "values", "keys"):
            return True

        # enumerate() wrapping a bounded iterable — allow if wrapping dict method
        if isinstance(func, ast.Name) and func.id == "enumera
Repobility — same analyzer, your code, free for public repos · /scan/
CodegenASTValidator._max_depth method · python · L475-L482 (8 LOC)
src/pinwheel/core/codegen.py
    def _max_depth(self, node: ast.AST, current: int = 0) -> int:
        """Compute maximum AST depth."""
        max_d = current
        for child in ast.iter_child_nodes(node):
            child_depth = self._max_depth(child, current + 1)
            if child_depth > max_d:
                max_d = child_depth
        return max_d
execute_codegen_effect function · python · L519-L584 (66 LOC)
src/pinwheel/core/codegen.py
def execute_codegen_effect(
    code: str,
    ctx: GameContext,
    rng: object,  # random.Random
) -> CodegenHookResult:
    """Execute generated code in a sandboxed environment.

    The code string is a function body that was previously validated
    by CodegenASTValidator and approved by the council.
    """
    import math
    import signal

    # Wrap the code in a function definition
    lines = code.strip().split("\n")
    indented = "\n".join(f"    {line}" for line in lines)
    wrapped = f"def _codegen_execute(ctx, rng, math, HookResult):\n{indented}\n"

    # Build restricted globals
    sandbox_globals: dict[str, object] = {
        "__builtins__": SANDBOX_BUILTINS,
    }

    # Compile
    try:
        compiled = compile(wrapped, "<codegen-effect>", "exec")
    except SyntaxError as e:
        raise SandboxViolation(
            violation_type="syntax_error",
            detail=str(e),
        ) from e

    # Execute the function definition (defines _codegen_execute in sa
select_scheme function · python · L43-L111 (69 LOC)
src/pinwheel/core/defense.py
def select_scheme(
    offense: list[HooperState],
    defense: list[HooperState],
    game_state: GameState,
    rules: RuleSet,
    rng: random.Random,
    strategy: TeamStrategy | None = None,
) -> DefensiveScheme:
    """Select defensive scheme based on team attributes, game state, and strategy.

    When a TeamStrategy is provided, ``defensive_intensity`` nudges scheme weights:
    high intensity (> 0.2) favours man-tight and press (aggressive schemes),
    while low intensity (< -0.1) favours zone (passive/conserve energy).
    """
    if not defense:
        return "zone"

    avg_def_iq = sum(d.current_attributes.iq for d in defense) / len(defense)
    avg_def_stamina = sum(d.current_stamina for d in defense) / len(defense)
    avg_off_speed = sum(o.current_attributes.speed for o in offense) / len(offense)

    # Score-based tendencies
    score_diff = game_state.score_diff
    if not game_state.home_has_ball:
        score_diff = -score_diff  # from defense's perspective

    
assign_matchups function · python · L114-L151 (38 LOC)
src/pinwheel/core/defense.py
def assign_matchups(
    offense: list[HooperState],
    defense: list[HooperState],
    scheme: DefensiveScheme,
    rng: random.Random,
) -> dict[str, str]:
    """Assign defensive matchups. Returns {defender_id: attacker_id}.

    Man schemes: match by role (best defender on best scorer).
    Zone: distributed assignment.
    """
    if not offense or not defense:
        return {}

    if scheme in ("man_tight", "man_switch"):
        # Sort offense by scoring (desc), defense by defense (desc)
        off_sorted = sorted(offense, key=lambda a: a.current_attributes.scoring, reverse=True)
        def_sorted = sorted(defense, key=lambda a: a.current_attributes.defense, reverse=True)
        matchups = {}
        for i, d in enumerate(def_sorted):
            opp = off_sorted[i % len(off_sorted)]
            matchups[d.hooper.id] = opp.hooper.id
        return matchups

    if scheme == "zone":
        # Zone: each defender covers a zone, roughly rotational assignment
        matchups 
get_primary_defender function · python · L154-L166 (13 LOC)
src/pinwheel/core/defense.py
def get_primary_defender(
    ball_handler: HooperState,
    matchups: dict[str, str],
    defense: list[HooperState],
) -> HooperState:
    """Find the defender assigned to guard the ball handler."""
    for def_id, off_id in matchups.items():
        if off_id == ball_handler.hooper.id:
            for d in defense:
                if d.hooper.id == def_id:
                    return d
    # Fallback: first available defender
    return defense[0] if defense else ball_handler
get_pending_interpretations function · python · L35-L60 (26 LOC)
src/pinwheel/core/deferred_interpreter.py
async def get_pending_interpretations(
    repo: Repository,
    season_id: str,
) -> list[GovernanceEvent]:
    """Find pending_interpretation events without a corresponding ready or expired event."""
    pending_events = await repo.get_events_by_type(
        season_id=season_id,
        event_types=["proposal.pending_interpretation"],
    )
    ready_events = await repo.get_events_by_type(
        season_id=season_id,
        event_types=["proposal.interpretation_ready"],
    )
    expired_events = await repo.get_events_by_type(
        season_id=season_id,
        event_types=["proposal.interpretation_expired"],
    )

    # Build set of resolved aggregate IDs
    resolved_ids: set[str] = set()
    for ev in ready_events:
        resolved_ids.add(ev.aggregate_id)
    for ev in expired_events:
        resolved_ids.add(ev.aggregate_id)

    return [ev for ev in pending_events if ev.aggregate_id not in resolved_ids]
retry_pending_interpretation function · python · L63-L166 (104 LOC)
src/pinwheel/core/deferred_interpreter.py
async def retry_pending_interpretation(
    repo: Repository,
    pending: GovernanceEvent,
    api_key: str,
) -> bool:
    """Retry interpretation for a single pending event.

    Returns True if the retry succeeded and the interpretation_ready event was appended.
    Returns False if the retry failed or was expired after MAX_RETRIES.
    """
    from pinwheel.ai.interpreter import interpret_proposal_v2
    from pinwheel.models.governance import ProposalInterpretation
    from pinwheel.models.rules import RuleSet

    payload = pending.payload
    raw_text = str(payload.get("raw_text", ""))
    ruleset_data = payload.get("ruleset", {})
    if not raw_text:
        return False

    # Count prior failed retries — expire after MAX_RETRIES
    retry_failed_events = await repo.get_events_by_type(
        season_id=pending.season_id,
        event_types=["proposal.interpretation_retry_failed"],
    )
    retry_count = sum(
        1 for e in retry_failed_events if e.aggregate_id == pendin
_expire_and_refund function · python · L169-L195 (27 LOC)
src/pinwheel/core/deferred_interpreter.py
async def _expire_and_refund(repo: Repository, pending: GovernanceEvent) -> None:
    """Expire a pending interpretation and refund the governor's token."""
    await repo.append_event(
        event_type="proposal.interpretation_expired",
        aggregate_id=pending.aggregate_id,
        aggregate_type="proposal",
        season_id=pending.season_id,
        governor_id=pending.governor_id,
        payload={
            "reason": "max_retries_exceeded",
            "raw_text": pending.payload.get("raw_text", ""),
        },
    )
    token_cost = pending.payload.get("token_cost", 1)
    if isinstance(token_cost, (int, float)):
        await repo.append_event(
            event_type="token.regenerated",
            aggregate_id=pending.governor_id,
            aggregate_type="token",
            season_id=pending.season_id,
            governor_id=pending.governor_id,
            payload={
                "token_type": "propose",
                "amount": int(token_cost),
            
Repobility · MCP-ready · https://repobility.com
expire_stale_pending function · python · L198-L251 (54 LOC)
src/pinwheel/core/deferred_interpreter.py
async def expire_stale_pending(
    repo: Repository,
    season_id: str,
    max_age_hours: float = MAX_PENDING_AGE_HOURS,
) -> list[str]:
    """Expire pending interpretations older than max_age_hours. Refund tokens.

    Returns list of expired aggregate IDs.
    """
    from datetime import UTC, datetime, timedelta

    pending = await get_pending_interpretations(repo, season_id)
    now = datetime.now(UTC)
    cutoff = now - timedelta(hours=max_age_hours)
    expired_ids: list[str] = []

    for ev in pending:
        ev_time = getattr(ev, "created_at", None) or getattr(ev, "timestamp", None)
        if ev_time is None or ev_time < cutoff:
            # Expire it
            await repo.append_event(
                event_type="proposal.interpretation_expired",
                aggregate_id=ev.aggregate_id,
                aggregate_type="proposal",
                season_id=season_id,
                governor_id=ev.governor_id,
                payload={
                    "reason"
_dm_player_with_interpretation function · python · L254-L342 (89 LOC)
src/pinwheel/core/deferred_interpreter.py
async def _dm_player_with_interpretation(
    bot: discord.Client,
    ready_event: GovernanceEvent,
    engine: object,
    settings: object,
) -> None:
    """DM the player with their deferred interpretation result.

    Sends an embed with Confirm/Revise/Cancel buttons. If the player
    can't be reached (left server, DMs closed), the interpretation and
    token are expired.
    """
    from pinwheel.discord.embeds import build_interpretation_embed
    from pinwheel.discord.helpers import GovernorInfo
    from pinwheel.discord.views import ProposalConfirmView
    from pinwheel.models.governance import ProposalInterpretation

    payload = ready_event.payload
    discord_user_id = payload.get("discord_user_id")
    if not discord_user_id:
        return

    interp_data = payload.get("interpretation", {})
    if not isinstance(interp_data, dict):
        return

    try:
        interpretation_v2 = ProposalInterpretation(**interp_data)
    except (ValidationError, TypeError):
      
tick_deferred_interpretations function · python · L345-L417 (73 LOC)
src/pinwheel/core/deferred_interpreter.py
async def tick_deferred_interpretations(
    engine: AsyncEngine,
    api_key: str,
    bot: discord.Client | None = None,
    settings: Settings | None = None,
) -> None:
    """Scheduler entry point — called every 60 seconds.

    1. Find pending interpretations
    2. Retry each one
    3. DM players for successful retries
    4. Expire stale ones
    """
    from pinwheel.db.engine import get_session
    from pinwheel.db.repository import Repository

    if not api_key:
        return

    try:
        async with get_session(engine) as session:
            repo = Repository(session)

            # Check ALL seasons for pending interpretations — proposals can
            # be submitted in a season that later completes while still pending.
            all_seasons = await repo.get_all_seasons()
            if not all_seasons:
                return

            total_expired: list[str] = []
            all_pending: list[tuple[str, GovernanceEvent]] = []

            for season in all_
annotate_drama function · python · L42-L176 (135 LOC)
src/pinwheel/core/drama.py
def annotate_drama(game_result: GameResult) -> list[DramaAnnotation]:
    """Pre-classify every possession in a game for dramatic pacing.

    Because the presenter has the full GameResult, this runs once before
    streaming begins. The annotations drive both pacing and visual treatment.

    Detection rules (in order of evaluation):
        - Lead changes and tie-breaking
        - Scoring runs (momentum detection)
        - Move activations (signature plays)
        - Elam Ending approach (target score proximity)
        - Game-winning shot (final scoring possession)
        - Elam period transition
        - Close game in late quarters

    Returns a list of DramaAnnotation, one per possession, in order.
    """
    annotations: list[DramaAnnotation] = []
    possessions = game_result.possession_log
    if not possessions:
        return annotations

    # Pre-compute game-level context
    elam_target = game_result.elam_target_score
    total_possessions = len(possessions)
    las
normalize_delays function · python · L179-L207 (29 LOC)
src/pinwheel/core/drama.py
def normalize_delays(
    annotations: list[DramaAnnotation],
    quarter_seconds: float,
) -> list[float]:
    """Convert drama annotations into actual delay values (seconds).

    Normalizes so total delay across the quarter equals ``quarter_seconds``.
    Dramatic moments get more time, routine moments get less, but the total
    quarter duration stays the same.

    Args:
        annotations: DramaAnnotations for possessions in a single quarter.
        quarter_seconds: Wall-clock budget for the quarter.

    Returns:
        A list of delay values (seconds), one per annotation, summing to
        approximately ``quarter_seconds``.
    """
    if not annotations:
        return []

    raw_multipliers = [a.delay_multiplier for a in annotations]
    total_raw = sum(raw_multipliers)
    if total_raw == 0:
        base_delay = quarter_seconds / max(len(annotations), 1)
        return [base_delay] * len(annotations)

    base_delay = quarter_seconds / total_raw
    return [m * base_del
compute_drama_score function · python · L210-L291 (82 LOC)
src/pinwheel/core/drama.py
def compute_drama_score(
    game_result: GameResult,
    *,
    is_playoff: bool = False,
) -> float:
    """Compute a single game-level drama score from 0.0 (boring) to 1.0 (incredible).

    The score is a weighted combination of four factors:

    1. **Score differential** (weight 0.35) -- Close finishes are dramatic.
       A 1-point margin scores 1.0; a 30+ point blowout scores 0.0.
    2. **Lead changes** (weight 0.25) -- Games with many lead changes are dramatic.
       6+ lead changes scores 1.0; 0 scores 0.0.
    3. **Elam ending** (weight 0.20) -- Games that activated the Elam ending
       get a drama boost. Elam itself scores 0.6; if the Elam margin was
       also close (target reached by <= 3 points), scores 1.0.
    4. **Playoff context** (weight 0.20) -- Playoff games are inherently
       more dramatic. Playoff scores 1.0; regular season scores 0.0.

    Args:
        game_result: The completed game's result.
        is_playoff: Whether this game is a playoff game.

 
get_drama_summary function · python · L294-L302 (9 LOC)
src/pinwheel/core/drama.py
def get_drama_summary(annotations: list[DramaAnnotation]) -> dict[str, int]:
    """Summarize drama level counts for logging/debugging.

    Returns a dict like {"routine": 40, "elevated": 5, "high": 8, "peak": 2}.
    """
    counts: dict[str, int] = {"routine": 0, "elevated": 0, "high": 0, "peak": 0}
    for a in annotations:
        counts[a.level] = counts.get(a.level, 0) + 1
    return counts
EffectRegistry.register method · python · L37-L46 (10 LOC)
src/pinwheel/core/effects.py
    def register(self, effect: RegisteredEffect) -> None:
        """Register a new effect."""
        self._effects[effect.effect_id] = effect
        logger.info(
            "effect_registered id=%s type=%s hooks=%s lifetime=%s",
            effect.effect_id,
            effect.effect_type,
            effect.hook_points,
            effect.lifetime.value,
        )
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
EffectRegistry.deregister method · python · L48-L53 (6 LOC)
src/pinwheel/core/effects.py
    def deregister(self, effect_id: str) -> RegisteredEffect | None:
        """Remove an effect from the registry. Returns the removed effect or None."""
        effect = self._effects.pop(effect_id, None)
        if effect:
            logger.info("effect_deregistered id=%s", effect_id)
        return effect
EffectRegistry.get_effects_for_hook method · python · L55-L60 (6 LOC)
src/pinwheel/core/effects.py
    def get_effects_for_hook(self, hook: str) -> list[RegisteredEffect]:
        """Get all active effects that listen on a specific hook point."""
        return [
            e for e in self._effects.values()
            if hook in e.hook_points
        ]
EffectRegistry.get_narrative_effects method · python · L66-L71 (6 LOC)
src/pinwheel/core/effects.py
    def get_narrative_effects(self) -> list[RegisteredEffect]:
        """Get all active narrative effects."""
        return [
            e for e in self._effects.values()
            if e.effect_type == "narrative"
        ]
EffectRegistry.tick_round method · python · L73-L88 (16 LOC)
src/pinwheel/core/effects.py
    def tick_round(self, current_round: int) -> list[str]:
        """Advance round counters for all effects.

        Returns list of effect IDs that have expired.
        """
        expired_ids: list[str] = []
        for effect in list(self._effects.values()):
            if effect.tick_round():
                expired_ids.append(effect.effect_id)
                self._effects.pop(effect.effect_id, None)
                logger.info(
                    "effect_expired id=%s round=%d",
                    effect.effect_id,
                    current_round,
                )
        return expired_ids
EffectRegistry.get_effects_for_proposal method · python · L94-L99 (6 LOC)
src/pinwheel/core/effects.py
    def get_effects_for_proposal(self, proposal_id: str) -> list[RegisteredEffect]:
        """Get all effects from a specific proposal."""
        return [
            e for e in self._effects.values()
            if e.proposal_id == proposal_id
        ]
EffectRegistry.remove_effect method · python · L101-L107 (7 LOC)
src/pinwheel/core/effects.py
    def remove_effect(self, effect_id: str) -> bool:
        """Remove an effect by ID. Returns True if removed, False if not found."""
        effect = self._effects.pop(effect_id, None)
        if effect:
            logger.info("effect_removed id=%s type=%s", effect_id, effect.effect_type)
            return True
        return False
EffectRegistry.build_effects_summary method · python · L114-L149 (36 LOC)
src/pinwheel/core/effects.py
    def build_effects_summary(self) -> str:
        """Build a human-readable summary of active effects for report context."""
        if not self._effects:
            return "No active effects."

        lines: list[str] = []
        for effect in self._effects.values():
            desc = (
                effect.description
                or effect.narrative_instruction
                or effect.effect_type
            )
            lifetime_str = effect.lifetime.value
            if effect.rounds_remaining is not None:
                lifetime_str = f"{effect.rounds_remaining} rounds remaining"
            type_label = (
                "PENDING MECHANIC"
                if effect.effect_type == "custom_mechanic"
                else effect.effect_type
            )
            line = f"- [{type_label}] {desc} ({lifetime_str})"

            # Codegen metadata
            if effect.effect_type == "codegen":
                status = (
                    "enabled" if effect.codegen
effect_spec_to_registered function · python · L152-L242 (91 LOC)
src/pinwheel/core/effects.py
def effect_spec_to_registered(
    spec: EffectSpec,
    proposal_id: str,
    current_round: int,
) -> RegisteredEffect:
    """Convert an EffectSpec from AI interpretation into a RegisteredEffect.

    Maps the structured spec into the runtime effect object that the
    registry manages.
    """
    effect_id = str(uuid.uuid4())

    # Determine lifetime
    if spec.duration == "permanent":
        lifetime = EffectLifetime.PERMANENT
    elif spec.duration == "n_rounds":
        lifetime = EffectLifetime.N_ROUNDS
    elif spec.duration == "one_game":
        lifetime = EffectLifetime.ONE_GAME
    elif spec.duration == "until_repealed":
        lifetime = EffectLifetime.UNTIL_REPEALED
    else:
        lifetime = EffectLifetime.PERMANENT

    # Determine hook points based on effect type
    hook_points: list[str] = []
    if spec.effect_type == "codegen" and spec.codegen:
        # Codegen effects use hook points from the CodegenEffectSpec
        hook_points = list(spec.codegen.hook_
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
collect_game_def_patches function · python · L245-L270 (26 LOC)
src/pinwheel/core/effects.py
def collect_game_def_patches(
    effects: list[RegisteredEffect],
) -> list[dict[str, object]]:
    """Extract game definition patch dicts from active effects.

    Returns a list of raw patch dicts (suitable for constructing
    ``GameDefinitionPatch`` objects) from effects whose type is
    ``modify_game_definition``. Patches are returned in registration
    order (by ``registered_at_round``, then by ``effect_id``).

    Args:
        effects: All active effects (typically from ``EffectRegistry.get_all_active()``).

    Returns:
        List of patch dicts, each deserializable via ``GameDefinitionPatch(**d)``.
    """
    game_def_effects = [
        e for e in effects
        if e.effect_type == "modify_game_definition"
        and e.action_code
        and isinstance(e.action_code.get("patch"), dict)
    ]
    # Sort by registration order for deterministic application
    game_def_effects.sort(key=lambda e: (e.registered_at_round, e.effect_id))

    return [e.action_code["patch"] 
register_effects_for_proposal function · python · L273-L306 (34 LOC)
src/pinwheel/core/effects.py
async def register_effects_for_proposal(
    repo: Repository,
    registry: EffectRegistry,
    proposal_id: str,
    effects: list[EffectSpec],
    season_id: str,
    current_round: int,
) -> list[RegisteredEffect]:
    """Register effects for a passing proposal.

    Creates RegisteredEffect objects, adds them to the registry,
    and persists them via effect.registered events.
    """
    registered: list[RegisteredEffect] = []

    for spec in effects:
        # Skip parameter_change effects — those go through the existing RuleSet path
        if spec.effect_type == "parameter_change":
            continue

        effect = effect_spec_to_registered(spec, proposal_id, current_round)
        registry.register(effect)
        registered.append(effect)

        # Persist to event store
        await repo.append_event(
            event_type="effect.registered",
            aggregate_id=effect.effect_id,
            aggregate_type="effect",
            season_id=season_id,
          
load_effect_registry function · python · L309-L357 (49 LOC)
src/pinwheel/core/effects.py
async def load_effect_registry(
    repo: Repository,
    season_id: str,
) -> EffectRegistry:
    """Rebuild the effect registry from the event store.

    Replays effect.registered events and removes any effects that
    have been expired or repealed.
    """
    registry = EffectRegistry()

    # Load all effect events
    registered_events = await repo.get_events_by_type(
        season_id=season_id,
        event_types=["effect.registered"],
    )
    expired_events = await repo.get_events_by_type(
        season_id=season_id,
        event_types=["effect.expired"],
    )
    repealed_events = await repo.get_events_by_type(
        season_id=season_id,
        event_types=["effect.repealed"],
    )

    # Build set of dead effect IDs
    dead_ids: set[str] = set()
    for ev in expired_events:
        dead_ids.add(str(ev.payload.get("effect_id", ev.aggregate_id)))
    for ev in repealed_events:
        dead_ids.add(str(ev.payload.get("effect_id", ev.aggregate_id)))

    # Register
persist_expired_effects function · python · L360-L373 (14 LOC)
src/pinwheel/core/effects.py
async def persist_expired_effects(
    repo: Repository,
    season_id: str,
    expired_ids: list[str],
) -> None:
    """Persist effect expiration events."""
    for effect_id in expired_ids:
        await repo.append_event(
            event_type="effect.expired",
            aggregate_id=effect_id,
            aggregate_type="effect",
            season_id=season_id,
            payload={"effect_id": effect_id, "reason": "lifetime_expired"},
        )
activate_custom_mechanic function · python · L376-L421 (46 LOC)
src/pinwheel/core/effects.py
async def activate_custom_mechanic(
    repo: Repository,
    registry: EffectRegistry,
    effect_id: str,
    season_id: str,
    hook_point: str | None = None,
    action_code: dict[str, object] | None = None,
) -> bool:
    """Activate a pending custom_mechanic effect with real hook/action implementation.

    Called by admin via /activate-mechanic. If hook_point and action_code
    are provided, the effect becomes a real hook_callback. If not, the
    approximation (already live) is confirmed as good enough.

    Returns True if the effect was found and activated.
    """
    effect = registry.get_effect(effect_id)
    if effect is None or effect.effect_type != "custom_mechanic":
        return False

    if hook_point and action_code:
        # Upgrade to a real hook_callback
        effect.effect_type = "hook_callback"
        effect._hook_points = [hook_point]
        effect.action_code = action_code

    # Persist activation event
    await repo.append_event(
        event_typ
repeal_effect function · python · L424-L460 (37 LOC)
src/pinwheel/core/effects.py
async def repeal_effect(
    repo: Repository,
    registry: EffectRegistry,
    effect_id: str,
    season_id: str,
    proposal_id: str,
) -> bool:
    """Repeal an active effect via governance.

    Writes an effect.repealed event to the event store and removes the
    effect from the in-memory registry. Returns True if the effect was
    found and removed, False if it was not in the registry.
    """
    removed = registry.remove_effect(effect_id)

    # Always write the repeal event — even if the effect was already
    # expired in-memory, the event store needs the record so that
    # load_effect_registry() excludes it on future reloads.
    await repo.append_event(
        event_type="effect.repealed",
        aggregate_id=effect_id,
        aggregate_type="effect",
        season_id=season_id,
        payload={
            "effect_id": effect_id,
            "reason": "governance_repeal",
            "proposal_id": proposal_id,
        },
    )

    logger.info(
        "effect
EventBus.publish method · python · L37-L59 (23 LOC)
src/pinwheel/core/event_bus.py
    async def publish(self, event_type: str, data: dict[str, Any]) -> int:
        """Publish an event to all subscribers of this type + wildcard subscribers.

        Returns the number of subscribers that received the event.
        """
        envelope = {"type": event_type, "data": data}
        count = 0

        for queue in self._subscribers.get(event_type, []):
            try:
                queue.put_nowait(envelope)
                count += 1
            except asyncio.QueueFull:
                logger.warning("Dropping event %s for slow subscriber", event_type)

        for queue in self._wildcard_subscribers:
            try:
                queue.put_nowait(envelope)
                count += 1
            except asyncio.QueueFull:
                logger.warning("Dropping wildcard event %s for slow subscriber", event_type)

        return count
EventBus.subscribe method · python · L61-L68 (8 LOC)
src/pinwheel/core/event_bus.py
    def subscribe(self, event_type: str | None = None, max_size: int = 100) -> Subscription:
        """Create a subscription for a specific event type (or all events if None).

        Returns a Subscription that works as an async iterator.
        Must be used as an async context manager to ensure cleanup.
        """
        queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=max_size)
        return Subscription(self, queue, event_type)
Repobility — same analyzer, your code, free for public repos · /scan/
EventBus._unregister method · python · L76-L82 (7 LOC)
src/pinwheel/core/event_bus.py
    def _unregister(self, queue: asyncio.Queue[dict[str, Any]], event_type: str | None) -> None:
        if event_type is None:
            with contextlib.suppress(ValueError):
                self._wildcard_subscribers.remove(queue)
        else:
            with contextlib.suppress(ValueError):
                self._subscribers[event_type].remove(queue)
Subscription.__init__ method · python · L94-L103 (10 LOC)
src/pinwheel/core/event_bus.py
    def __init__(
        self,
        bus: EventBus,
        queue: asyncio.Queue[dict[str, Any]],
        event_type: str | None,
    ) -> None:
        self._bus = bus
        self._queue = queue
        self._event_type = event_type
        self._active = False
Subscription.__anext__ method · python · L117-L123 (7 LOC)
src/pinwheel/core/event_bus.py
    async def __anext__(self) -> dict[str, Any]:
        if not self._active:
            raise StopAsyncIteration
        try:
            return await self._queue.get()
        except asyncio.CancelledError:
            raise StopAsyncIteration from None
Subscription.get method · python · L125-L130 (6 LOC)
src/pinwheel/core/event_bus.py
    async def get(self, timeout: float | None = None) -> dict[str, Any] | None:
        """Get next event with optional timeout. Returns None on timeout."""
        try:
            return await asyncio.wait_for(self._queue.get(), timeout=timeout)
        except TimeoutError:
            return None
_row_to_team function · python · L88-L120 (33 LOC)
src/pinwheel/core/game_loop.py
def _row_to_team(team_row: TeamRow) -> Team:
    """Convert a TeamRow + HooperRows to domain Team model."""
    # suppress_budget_check() bypasses budget validation for DB-persisted
    # hoopers that may predate the budget enforcement rule.
    with suppress_budget_check():
        hoopers = []
        for idx, a in enumerate(team_row.hoopers):
            attrs = PlayerAttributes(**a.attributes)
            raw_moves = a.moves if hasattr(a, "moves") and a.moves else []
            moves = [Move(**m) if isinstance(m, dict) else m for m in raw_moves]
            hoopers.append(
                Hooper(
                    id=a.id,
                    name=a.name,
                    team_id=a.team_id,
                    archetype=a.archetype,
                    attributes=attrs,
                    moves=moves,
                    is_starter=idx < 3,
                )
            )

    venue_data = team_row.venue
    venue = Venue(**(venue_data or {"name": "Default Arena"}))

    ret
_check_earned_moves function · python · L123-L183 (61 LOC)
src/pinwheel/core/game_loop.py
async def _check_earned_moves(
    repo: Repository,
    season_id: str,
    round_number: int,
    teams_cache: dict[str, Team],
    event_bus: EventBus | None = None,
) -> list[dict]:
    """Check all hoopers for newly earned moves via milestone thresholds.

    Iterates every hooper in teams_cache, aggregates their season stats,
    and grants any moves whose milestone thresholds have been crossed.
    Returns a list of grant dicts for narrative integration.
    """
    grants: list[dict] = []
    for team in teams_cache.values():
        for hooper in team.hoopers:
            season_stats = await repo.get_hooper_season_stats(hooper.id, season_id)
            existing_move_names = {m.name for m in hooper.moves}

            new_moves = check_milestones(season_stats, existing_move_names)
            for move in new_moves:
                await repo.add_hooper_move(hooper.id, move.model_dump())
                grant = {
                    "hooper_id": hooper.id,
                    
_check_season_complete function · python · L186-L199 (14 LOC)
src/pinwheel/core/game_loop.py
async def _check_season_complete(repo: Repository, season_id: str) -> bool:
    """Check if all scheduled regular-season games have been played.

    Compares the set of round numbers in the regular-season schedule against
    the set of round numbers that have game results stored.  Returns True only
    when every scheduled round has at least one played game.
    """
    schedule = await repo.get_full_schedule(season_id, phase="regular")
    if not schedule:
        return False
    games = await repo.get_all_games(season_id)
    played_rounds = {g.round_number for g in games}
    scheduled_rounds = {s.round_number for s in schedule}
    return scheduled_rounds.issubset(played_rounds)
_get_playoff_series_record function · python · L207-L251 (45 LOC)
src/pinwheel/core/game_loop.py
async def _get_playoff_series_record(
    repo: Repository,
    season_id: str,
    team_a_id: str,
    team_b_id: str,
    before_round: int | None = None,
) -> tuple[int, int, int]:
    """Get win counts for a playoff series between two teams.

    Returns (team_a_wins, team_b_wins, games_played).
    Counts games where this specific team pair was scheduled to play,
    regardless of round number — so manually-inserted or late-committed
    schedule entries are always included.

    Args:
        before_round: If set, only count games with round_number < before_round.
            Used by the display layer to show the pre-game series state for
            each historical game rather than the current overall series record.
    """
    playoff_schedule = await repo.get_full_schedule(season_id, phase="playoff")
    pair = frozenset({team_a_id, team_b_id})
    # Rounds where this specific matchup was scheduled (not all playoff rounds)
    scheduled_rounds = {
        s.round_number
      
Repobility · MCP-ready · https://repobility.com
_schedule_next_series_game function · python · L254-L297 (44 LOC)
src/pinwheel/core/game_loop.py
async def _schedule_next_series_game(
    repo: Repository,
    season_id: str,
    higher_seed_id: str,
    lower_seed_id: str,
    games_played: int,
    round_number: int,
    matchup_index: int,
    phase: str = "semifinal",
) -> None:
    """Schedule the next game in a playoff series with alternating home court.

    Higher seed has home court in odd-numbered games (1, 3, 5, ...).

    Args:
        phase: Precise playoff phase — ``"semifinal"`` or ``"finals"``.
    """
    if games_played % 2 == 0:
        home_team_id = higher_seed_id
        away_team_id = lower_seed_id
    else:
        home_team_id = lower_seed_id
        away_team_id = higher_seed_id

    try:
        await repo.create_schedule_entry(
            season_id=season_id,
            round_number=round_number,
            matchup_index=matchup_index,
            home_team_id=home_team_id,
            away_team_id=away_team_id,
            phase=phase,
        )
    except IntegrityError:
        # Entry already e
compute_standings_from_repo function · python · L594-L619 (26 LOC)
src/pinwheel/core/game_loop.py
async def compute_standings_from_repo(repo: Repository, season_id: str) -> list[dict]:
    """Compute W-L standings from game results stored in the database.

    Reuses the existing ``compute_standings`` function from ``scheduler.py``,
    enriching each entry with the team name.  Results are sorted by wins
    descending, then point differential descending.
    """
    games = await repo.get_all_games(season_id)
    results: list[dict] = []
    for g in games:
        results.append(
            {
                "home_team_id": g.home_team_id,
                "away_team_id": g.away_team_id,
                "home_score": g.home_score,
                "away_score": g.away_score,
                "winner_team_id": g.winner_team_id,
            }
        )
    standings = compute_standings(results)
    # Enrich with team names
    for s in standings:
        team = await repo.get_team(s["team_id"])
        if team:
            s["team_name"] = team.name
    return standings
generate_playoff_bracket function · python · L622-L709 (88 LOC)
src/pinwheel/core/game_loop.py
async def generate_playoff_bracket(
    repo: Repository,
    season_id: str,
    num_playoff_teams: int = 4,
) -> list[dict]:
    """Generate playoff matchups from final standings.

    Standard bracket: #1 vs #4, #2 vs #3 (semis), winners play finals.
    Stores playoff schedule entries in the database and returns the bracket
    as a list of matchup dicts.

    If fewer teams than ``num_playoff_teams`` exist, the bracket shrinks
    accordingly.  Returns an empty list if fewer than 2 teams qualify.
    """
    standings = await compute_standings_from_repo(repo, season_id)
    playoff_teams = standings[:num_playoff_teams]

    if len(playoff_teams) < 2:
        return []

    # Determine first available round number for playoffs
    full_schedule = await repo.get_full_schedule(season_id)
    max_round = max((s.round_number for s in full_schedule), default=0) if full_schedule else 0
    games = await repo.get_all_games(season_id)
    max_played = max((g.round_number for g in games), d
‹ prevpage 5 / 14next ›