Function bodies 688 total
apply_variance function · python · L193-L229 (37 LOC)src/pinwheel/core/archetypes.py
def apply_variance(
base: PlayerAttributes,
rng_seed: int,
variance: int = 10,
) -> PlayerAttributes:
"""Apply random variance to base archetype attributes.
After applying per-attribute jitter, the total is normalized back to
the original base total so the result stays within the 360-point
budget (+/- VARIANCE enforced by the model validator).
"""
import random
rng = random.Random(rng_seed)
data = base.model_dump()
target_total = base.total()
for key in data:
delta = rng.randint(-variance, variance)
data[key] = max(1, min(100, data[key] + delta))
# Normalize: redistribute the drift so total matches the base budget.
current_total = sum(data.values())
drift = current_total - target_total
keys = list(data.keys())
rng.shuffle(keys)
for key in keys:
if drift == 0:
break
if drift > 0:
reduction = min(drift, data[key] - 1)
data[key] -= reduction
GameContext.meta_get method · python · L96-L102 (7 LOC)src/pinwheel/core/codegen.py
def meta_get(
    self,
    entity_type: str,
    entity_id: str,
    field_name: str,
    default: object = None,
) -> object:
    """Declare the metadata read accessor; this variant has no implementation.

    The body is a bare ellipsis, so calling it directly returns ``None``
    regardless of ``default``.
    NOTE(review): presumably a Protocol/abstract declaration whose concrete
    behaviour lives in an implementing class — confirm against the class
    header, which is not visible in this excerpt.

    Args:
        entity_type: Kind of entity the field belongs to.
        entity_id: Identifier of the entity.
        field_name: Name of the metadata field to read.
        default: Value an implementation should fall back to.
    """
    ...

# SandboxedGameContext.meta_get method · python · L162-L172 (11 LOC)src/pinwheel/core/codegen.py
def meta_get(
    self,
    entity_type: str,
    entity_id: str,
    field_name: str,
    default: object = None,
) -> object:
    """Read from MetaStore if available (trust level STATE+).

    Args:
        entity_type: Kind of entity the field belongs to.
        entity_id: Identifier of the entity.
        field_name: Name of the metadata field to read.
        default: Returned when no store is attached; also forwarded to the
            store as its own ``default``.

    Returns:
        ``default`` when ``self._meta_store_ref`` is ``None``, otherwise
        whatever the store's ``get`` returns.
    """
    store = self._meta_store_ref
    if store is None:
        # No store attached to this context: behave as if the field is unset.
        return default
    return store.get(entity_type, entity_id, field_name, default=default)

# clamp_result function · python · L213-L243 (31 LOC)src/pinwheel/core/codegen.py
def clamp_result(result: CodegenHookResult) -> CodegenHookResult:
"""Enforce bounds on generated code output.
Even if the code tries to return score_modifier=9999, it gets clamped.
"""
for field_name, (lo, hi) in RESULT_BOUNDS.items():
val = getattr(result, field_name)
if isinstance(val, (int, float)):
clamped = max(lo, min(hi, val))
# Preserve type: int fields stay int, float fields stay float
if isinstance(lo, int) and isinstance(hi, int):
setattr(result, field_name, int(clamped))
else:
setattr(result, field_name, float(clamped))
# Clamp narrative
if len(result.narrative_note) > MAX_NARRATIVE_LENGTH:
result.narrative_note = result.narrative_note[:MAX_NARRATIVE_LENGTH]
# Clamp meta_writes
if result.meta_writes:
if len(result.meta_writes) > MAX_META_WRITES:
trimmed = dict(list(result.meta_writes.items())[:MAX_META_WRITES])
enforce_trust_level function · python · L293-L303 (11 LOC)src/pinwheel/core/codegen.py
def enforce_trust_level(
    result: CodegenHookResult,
    trust_level: CodegenTrustLevel,
) -> CodegenHookResult:
    """Zero out any fields the code shouldn't be writing to.

    Every dataclass field of ``result`` whose name is not listed in
    ``TRUST_LEVEL_ALLOWED_RESULT_FIELDS[trust_level]`` is overwritten with
    the value a freshly constructed ``CodegenHookResult()`` has for it.

    Args:
        result: Hook result to sanitise; it is mutated in place.
        trust_level: Key into ``TRUST_LEVEL_ALLOWED_RESULT_FIELDS``.

    Returns:
        The same ``result`` object, after the reset.

    Raises:
        KeyError: If ``trust_level`` has no entry in the allow-list mapping.
    """
    allowed = TRUST_LEVEL_ALLOWED_RESULT_FIELDS[trust_level]
    # A fresh instance supplies pristine defaults, so nothing is shared with
    # a previously returned result.
    pristine = CodegenHookResult()
    for field in dataclasses.fields(result):
        if field.name in allowed:
            continue
        setattr(result, field.name, getattr(pristine, field.name))
    return result

# CodegenASTValidator.validate method · python · L346-L438 (93 LOC)src/pinwheel/core/codegen.py
def validate(self, code: str) -> list[str]:
"""Return list of violations. Empty list = code is valid."""
violations: list[str] = []
if len(code) > MAX_CODE_LENGTH:
violations.append(
f"Code exceeds max length ({len(code)} > {MAX_CODE_LENGTH})"
)
return violations
try:
tree = ast.parse(code, mode="exec")
except SyntaxError as e:
violations.append(f"Syntax error: {e}")
return violations
# Check AST depth
depth = self._max_depth(tree)
if depth > MAX_AST_DEPTH:
violations.append(f"AST depth {depth} exceeds max {MAX_AST_DEPTH}")
for node in ast.walk(tree):
# No imports
if isinstance(node, ast.Import | ast.ImportFrom):
violations.append(
f"Import statement at line {node.lineno}"
)
# No forbidden function calls
if (
CodegenASTValidator._is_bounded_for method · python · L440-L473 (34 LOC)src/pinwheel/core/codegen.py
def _is_bounded_for(self, node: ast.For) -> bool:
"""Check that a for-loop uses range() with bounded arg, or dict iteration."""
if not isinstance(node.iter, ast.Call):
return False
func = node.iter.func
# range(N) where N is a literal <= MAX_LOOP_BOUND
if isinstance(func, ast.Name) and func.id == "range":
args = node.iter.args
if not args:
return False
stop_arg = args[0] if len(args) == 1 else args[1]
if isinstance(stop_arg, ast.Constant) and isinstance(stop_arg.value, int):
return stop_arg.value <= MAX_LOOP_BOUND
return False
# .items(), .values(), .keys() calls on any object
if isinstance(func, ast.Attribute) and func.attr in ("items", "values", "keys"):
return True
# enumerate() wrapping a bounded iterable — allow if wrapping dict method
if isinstance(func, ast.Name) and func.id == "enumeraRepobility — same analyzer, your code, free for public repos · /scan/
CodegenASTValidator._max_depth method · python · L475-L482 (8 LOC)src/pinwheel/core/codegen.py
def _max_depth(self, node: ast.AST, current: int = 0) -> int:
    """Compute maximum AST depth.

    The tree is walked with an explicit stack instead of recursion, so a
    deeply nested tree is measured rather than raising ``RecursionError``
    before the caller can compare the result with its depth limit.

    Args:
        node: Root of the (sub)tree to measure.
        current: Depth assigned to ``node`` itself.

    Returns:
        The largest depth reached by any node below ``node``; ``current``
        when ``node`` has no children.
    """
    deepest = current
    pending: list[tuple[ast.AST, int]] = [(node, current)]
    while pending:
        item, depth = pending.pop()
        if depth > deepest:
            deepest = depth
        pending.extend(
            (child, depth + 1) for child in ast.iter_child_nodes(item)
        )
    return deepest

# execute_codegen_effect function · python · L519-L584 (66 LOC)src/pinwheel/core/codegen.py
def execute_codegen_effect(
code: str,
ctx: GameContext,
rng: object, # random.Random
) -> CodegenHookResult:
"""Execute generated code in a sandboxed environment.
The code string is a function body that was previously validated
by CodegenASTValidator and approved by the council.
"""
import math
import signal
# Wrap the code in a function definition
lines = code.strip().split("\n")
indented = "\n".join(f" {line}" for line in lines)
wrapped = f"def _codegen_execute(ctx, rng, math, HookResult):\n{indented}\n"
# Build restricted globals
sandbox_globals: dict[str, object] = {
"__builtins__": SANDBOX_BUILTINS,
}
# Compile
try:
compiled = compile(wrapped, "<codegen-effect>", "exec")
except SyntaxError as e:
raise SandboxViolation(
violation_type="syntax_error",
detail=str(e),
) from e
# Execute the function definition (defines _codegen_execute in saselect_scheme function · python · L43-L111 (69 LOC)src/pinwheel/core/defense.py
def select_scheme(
offense: list[HooperState],
defense: list[HooperState],
game_state: GameState,
rules: RuleSet,
rng: random.Random,
strategy: TeamStrategy | None = None,
) -> DefensiveScheme:
"""Select defensive scheme based on team attributes, game state, and strategy.
When a TeamStrategy is provided, ``defensive_intensity`` nudges scheme weights:
high intensity (> 0.2) favours man-tight and press (aggressive schemes),
while low intensity (< -0.1) favours zone (passive/conserve energy).
"""
if not defense:
return "zone"
avg_def_iq = sum(d.current_attributes.iq for d in defense) / len(defense)
avg_def_stamina = sum(d.current_stamina for d in defense) / len(defense)
avg_off_speed = sum(o.current_attributes.speed for o in offense) / len(offense)
# Score-based tendencies
score_diff = game_state.score_diff
if not game_state.home_has_ball:
score_diff = -score_diff # from defense's perspective
assign_matchups function · python · L114-L151 (38 LOC)src/pinwheel/core/defense.py
def assign_matchups(
offense: list[HooperState],
defense: list[HooperState],
scheme: DefensiveScheme,
rng: random.Random,
) -> dict[str, str]:
"""Assign defensive matchups. Returns {defender_id: attacker_id}.
Man schemes: match by role (best defender on best scorer).
Zone: distributed assignment.
"""
if not offense or not defense:
return {}
if scheme in ("man_tight", "man_switch"):
# Sort offense by scoring (desc), defense by defense (desc)
off_sorted = sorted(offense, key=lambda a: a.current_attributes.scoring, reverse=True)
def_sorted = sorted(defense, key=lambda a: a.current_attributes.defense, reverse=True)
matchups = {}
for i, d in enumerate(def_sorted):
opp = off_sorted[i % len(off_sorted)]
matchups[d.hooper.id] = opp.hooper.id
return matchups
if scheme == "zone":
# Zone: each defender covers a zone, roughly rotational assignment
matchups get_primary_defender function · python · L154-L166 (13 LOC)src/pinwheel/core/defense.py
def get_primary_defender(
    ball_handler: HooperState,
    matchups: dict[str, str],
    defense: list[HooperState],
) -> HooperState:
    """Find the defender assigned to guard the ball handler.

    Args:
        ball_handler: The offensive player to look up.
        matchups: Mapping of defender id to the attacker id they guard.
        defense: Defenders to choose from.

    Returns:
        The first defender in ``defense`` whose id is matched to the ball
        handler, taking ``matchups`` in iteration order. If no such defender
        is in ``defense``, the first defender is returned; if ``defense`` is
        empty, ``ball_handler`` itself is returned.
    """
    handler_id = ball_handler.hooper.id

    # Index defenders once; setdefault keeps the first occurrence of an id,
    # which is the one a front-to-back scan of ``defense`` would find.
    defenders_by_id: dict[str, HooperState] = {}
    for defender in defense:
        defenders_by_id.setdefault(defender.hooper.id, defender)

    for def_id, off_id in matchups.items():
        if off_id == handler_id and def_id in defenders_by_id:
            return defenders_by_id[def_id]

    # Fallback: first available defender
    return defense[0] if defense else ball_handler

# get_pending_interpretations function · python · L35-L60 (26 LOC)src/pinwheel/core/deferred_interpreter.py
async def get_pending_interpretations(
    repo: Repository,
    season_id: str,
) -> list[GovernanceEvent]:
    """Find pending_interpretation events without a corresponding ready or expired event.

    An event counts as resolved when a ``proposal.interpretation_ready`` or
    ``proposal.interpretation_expired`` event in the same season carries the
    same ``aggregate_id``.

    Args:
        repo: Event source; only ``get_events_by_type`` is used.
        season_id: Season whose events are inspected.

    Returns:
        The unresolved pending events, in the order the repository returned
        them.
    """
    pending_events = await repo.get_events_by_type(
        season_id=season_id,
        event_types=["proposal.pending_interpretation"],
    )
    if not pending_events:
        # Nothing can be unresolved, so the two resolution lookups would be
        # wasted round trips.
        return []

    ready_events = await repo.get_events_by_type(
        season_id=season_id,
        event_types=["proposal.interpretation_ready"],
    )
    expired_events = await repo.get_events_by_type(
        season_id=season_id,
        event_types=["proposal.interpretation_expired"],
    )

    # Aggregate ids that already reached a terminal state.
    resolved_ids: set[str] = {
        ev.aggregate_id for ev in (*ready_events, *expired_events)
    }
    return [ev for ev in pending_events if ev.aggregate_id not in resolved_ids]

# retry_pending_interpretation function · python · L63-L166 (104 LOC)src/pinwheel/core/deferred_interpreter.py
async def retry_pending_interpretation(
repo: Repository,
pending: GovernanceEvent,
api_key: str,
) -> bool:
"""Retry interpretation for a single pending event.
Returns True if the retry succeeded and the interpretation_ready event was appended.
Returns False if the retry failed or was expired after MAX_RETRIES.
"""
from pinwheel.ai.interpreter import interpret_proposal_v2
from pinwheel.models.governance import ProposalInterpretation
from pinwheel.models.rules import RuleSet
payload = pending.payload
raw_text = str(payload.get("raw_text", ""))
ruleset_data = payload.get("ruleset", {})
if not raw_text:
return False
# Count prior failed retries — expire after MAX_RETRIES
retry_failed_events = await repo.get_events_by_type(
season_id=pending.season_id,
event_types=["proposal.interpretation_retry_failed"],
)
retry_count = sum(
1 for e in retry_failed_events if e.aggregate_id == pendin_expire_and_refund function · python · L169-L195 (27 LOC)src/pinwheel/core/deferred_interpreter.py
async def _expire_and_refund(repo: Repository, pending: GovernanceEvent) -> None:
"""Expire a pending interpretation and refund the governor's token."""
await repo.append_event(
event_type="proposal.interpretation_expired",
aggregate_id=pending.aggregate_id,
aggregate_type="proposal",
season_id=pending.season_id,
governor_id=pending.governor_id,
payload={
"reason": "max_retries_exceeded",
"raw_text": pending.payload.get("raw_text", ""),
},
)
token_cost = pending.payload.get("token_cost", 1)
if isinstance(token_cost, (int, float)):
await repo.append_event(
event_type="token.regenerated",
aggregate_id=pending.governor_id,
aggregate_type="token",
season_id=pending.season_id,
governor_id=pending.governor_id,
payload={
"token_type": "propose",
"amount": int(token_cost),
Repobility · MCP-ready · https://repobility.com
expire_stale_pending function · python · L198-L251 (54 LOC)src/pinwheel/core/deferred_interpreter.py
async def expire_stale_pending(
repo: Repository,
season_id: str,
max_age_hours: float = MAX_PENDING_AGE_HOURS,
) -> list[str]:
"""Expire pending interpretations older than max_age_hours. Refund tokens.
Returns list of expired aggregate IDs.
"""
from datetime import UTC, datetime, timedelta
pending = await get_pending_interpretations(repo, season_id)
now = datetime.now(UTC)
cutoff = now - timedelta(hours=max_age_hours)
expired_ids: list[str] = []
for ev in pending:
ev_time = getattr(ev, "created_at", None) or getattr(ev, "timestamp", None)
if ev_time is None or ev_time < cutoff:
# Expire it
await repo.append_event(
event_type="proposal.interpretation_expired",
aggregate_id=ev.aggregate_id,
aggregate_type="proposal",
season_id=season_id,
governor_id=ev.governor_id,
payload={
"reason"_dm_player_with_interpretation function · python · L254-L342 (89 LOC)src/pinwheel/core/deferred_interpreter.py
async def _dm_player_with_interpretation(
bot: discord.Client,
ready_event: GovernanceEvent,
engine: object,
settings: object,
) -> None:
"""DM the player with their deferred interpretation result.
Sends an embed with Confirm/Revise/Cancel buttons. If the player
can't be reached (left server, DMs closed), the interpretation and
token are expired.
"""
from pinwheel.discord.embeds import build_interpretation_embed
from pinwheel.discord.helpers import GovernorInfo
from pinwheel.discord.views import ProposalConfirmView
from pinwheel.models.governance import ProposalInterpretation
payload = ready_event.payload
discord_user_id = payload.get("discord_user_id")
if not discord_user_id:
return
interp_data = payload.get("interpretation", {})
if not isinstance(interp_data, dict):
return
try:
interpretation_v2 = ProposalInterpretation(**interp_data)
except (ValidationError, TypeError):
tick_deferred_interpretations function · python · L345-L417 (73 LOC)src/pinwheel/core/deferred_interpreter.py
async def tick_deferred_interpretations(
engine: AsyncEngine,
api_key: str,
bot: discord.Client | None = None,
settings: Settings | None = None,
) -> None:
"""Scheduler entry point — called every 60 seconds.
1. Find pending interpretations
2. Retry each one
3. DM players for successful retries
4. Expire stale ones
"""
from pinwheel.db.engine import get_session
from pinwheel.db.repository import Repository
if not api_key:
return
try:
async with get_session(engine) as session:
repo = Repository(session)
# Check ALL seasons for pending interpretations — proposals can
# be submitted in a season that later completes while still pending.
all_seasons = await repo.get_all_seasons()
if not all_seasons:
return
total_expired: list[str] = []
all_pending: list[tuple[str, GovernanceEvent]] = []
for season in all_annotate_drama function · python · L42-L176 (135 LOC)src/pinwheel/core/drama.py
def annotate_drama(game_result: GameResult) -> list[DramaAnnotation]:
"""Pre-classify every possession in a game for dramatic pacing.
Because the presenter has the full GameResult, this runs once before
streaming begins. The annotations drive both pacing and visual treatment.
Detection rules (in order of evaluation):
- Lead changes and tie-breaking
- Scoring runs (momentum detection)
- Move activations (signature plays)
- Elam Ending approach (target score proximity)
- Game-winning shot (final scoring possession)
- Elam period transition
- Close game in late quarters
Returns a list of DramaAnnotation, one per possession, in order.
"""
annotations: list[DramaAnnotation] = []
possessions = game_result.possession_log
if not possessions:
return annotations
# Pre-compute game-level context
elam_target = game_result.elam_target_score
total_possessions = len(possessions)
lasnormalize_delays function · python · L179-L207 (29 LOC)src/pinwheel/core/drama.py
def normalize_delays(
annotations: list[DramaAnnotation],
quarter_seconds: float,
) -> list[float]:
"""Convert drama annotations into actual delay values (seconds).
Normalizes so total delay across the quarter equals ``quarter_seconds``.
Dramatic moments get more time, routine moments get less, but the total
quarter duration stays the same.
Args:
annotations: DramaAnnotations for possessions in a single quarter.
quarter_seconds: Wall-clock budget for the quarter.
Returns:
A list of delay values (seconds), one per annotation, summing to
approximately ``quarter_seconds``.
"""
if not annotations:
return []
raw_multipliers = [a.delay_multiplier for a in annotations]
total_raw = sum(raw_multipliers)
if total_raw == 0:
base_delay = quarter_seconds / max(len(annotations), 1)
return [base_delay] * len(annotations)
base_delay = quarter_seconds / total_raw
return [m * base_delcompute_drama_score function · python · L210-L291 (82 LOC)src/pinwheel/core/drama.py
def compute_drama_score(
game_result: GameResult,
*,
is_playoff: bool = False,
) -> float:
"""Compute a single game-level drama score from 0.0 (boring) to 1.0 (incredible).
The score is a weighted combination of four factors:
1. **Score differential** (weight 0.35) -- Close finishes are dramatic.
A 1-point margin scores 1.0; a 30+ point blowout scores 0.0.
2. **Lead changes** (weight 0.25) -- Games with many lead changes are dramatic.
6+ lead changes scores 1.0; 0 scores 0.0.
3. **Elam ending** (weight 0.20) -- Games that activated the Elam ending
get a drama boost. Elam itself scores 0.6; if the Elam margin was
also close (target reached by <= 3 points), scores 1.0.
4. **Playoff context** (weight 0.20) -- Playoff games are inherently
more dramatic. Playoff scores 1.0; regular season scores 0.0.
Args:
game_result: The completed game's result.
is_playoff: Whether this game is a playoff game.
get_drama_summary function · python · L294-L302 (9 LOC)src/pinwheel/core/drama.py
def get_drama_summary(annotations: list[DramaAnnotation]) -> dict[str, int]:
    """Summarize drama level counts for logging/debugging.

    Returns a dict like {"routine": 40, "elevated": 5, "high": 8, "peak": 2}.
    The four standard levels are always present (zero when unseen); any other
    ``level`` value found on an annotation is added as an extra key.
    """
    counts: dict[str, int] = dict.fromkeys(
        ("routine", "elevated", "high", "peak"), 0
    )
    for annotation in annotations:
        level = annotation.level
        counts[level] = counts.get(level, 0) + 1
    return counts

# EffectRegistry.register method · python · L37-L46 (10 LOC)src/pinwheel/core/effects.py
def register(self, effect: RegisteredEffect) -> None:
    """Register a new effect.

    The effect is stored in ``self._effects`` under its ``effect_id``; an
    existing entry with the same id is replaced. The registration is logged
    at INFO level.
    """
    self._effects[effect.effect_id] = effect
    logger.info(
        "effect_registered id=%s type=%s hooks=%s lifetime=%s",
        effect.effect_id,
        effect.effect_type,
        effect.hook_points,
        effect.lifetime.value,
    )
EffectRegistry.deregister method · python · L48-L53 (6 LOC)src/pinwheel/core/effects.py
def deregister(self, effect_id: str) -> RegisteredEffect | None:
    """Remove an effect from the registry. Returns the removed effect or None.

    The removal is logged whenever an entry was actually present; presence
    is decided by ``is not None`` so that an effect object that happens to be
    falsy is still reported.
    """
    removed = self._effects.pop(effect_id, None)
    if removed is not None:
        logger.info("effect_deregistered id=%s", effect_id)
    return removed

# EffectRegistry.get_effects_for_hook method · python · L55-L60 (6 LOC)src/pinwheel/core/effects.py
def get_effects_for_hook(self, hook: str) -> list[RegisteredEffect]:
    """Get all registered effects that listen on a specific hook point.

    Args:
        hook: Hook point name to look for in each effect's ``hook_points``.

    Returns:
        Matching effects in the registry's iteration order; an empty list
        when none match.
    """
    return [
        effect
        for effect in self._effects.values()
        if hook in effect.hook_points
    ]

# EffectRegistry.get_narrative_effects method · python · L66-L71 (6 LOC)src/pinwheel/core/effects.py
def get_narrative_effects(self) -> list[RegisteredEffect]:
    """Get all registered effects whose ``effect_type`` is ``"narrative"``.

    Returns:
        Matching effects in the registry's iteration order; an empty list
        when there are none.
    """
    return [
        effect
        for effect in self._effects.values()
        if effect.effect_type == "narrative"
    ]

# EffectRegistry.tick_round method · python · L73-L88 (16 LOC)src/pinwheel/core/effects.py
def tick_round(self, current_round: int) -> list[str]:
    """Advance round counters for all effects.

    Each registered effect's own ``tick_round()`` is called once; effects for
    which it returns a truthy value are removed from the registry and logged.

    Args:
        current_round: Round number, used only in the log message.

    Returns:
        List of effect IDs that have expired, in registry order.
    """
    expired_ids: list[str] = []
    # Iterate over a snapshot: entries are popped from the dict in the loop.
    for effect in list(self._effects.values()):
        if not effect.tick_round():
            continue
        expired_ids.append(effect.effect_id)
        self._effects.pop(effect.effect_id, None)
        logger.info(
            "effect_expired id=%s round=%d",
            effect.effect_id,
            current_round,
        )
    return expired_ids

# EffectRegistry.get_effects_for_proposal method · python · L94-L99 (6 LOC)src/pinwheel/core/effects.py
def get_effects_for_proposal(self, proposal_id: str) -> list[RegisteredEffect]:
    """Get all registered effects that came from a specific proposal.

    Args:
        proposal_id: Value compared against each effect's ``proposal_id``.

    Returns:
        Matching effects in the registry's iteration order; an empty list
        when none match.
    """
    return [
        effect
        for effect in self._effects.values()
        if effect.proposal_id == proposal_id
    ]

# EffectRegistry.remove_effect method · python · L101-L107 (7 LOC)src/pinwheel/core/effects.py
def remove_effect(self, effect_id: str) -> bool:
    """Remove an effect by ID. Returns True if removed, False if not found.

    The result reflects whether an entry was actually popped (``is None``
    check), so it stays accurate even if a stored effect object is falsy.
    """
    effect = self._effects.pop(effect_id, None)
    if effect is None:
        return False
    logger.info("effect_removed id=%s type=%s", effect_id, effect.effect_type)
    return True

# EffectRegistry.build_effects_summary method · python · L114-L149 (36 LOC)src/pinwheel/core/effects.py
def build_effects_summary(self) -> str:
"""Build a human-readable summary of active effects for report context."""
if not self._effects:
return "No active effects."
lines: list[str] = []
for effect in self._effects.values():
desc = (
effect.description
or effect.narrative_instruction
or effect.effect_type
)
lifetime_str = effect.lifetime.value
if effect.rounds_remaining is not None:
lifetime_str = f"{effect.rounds_remaining} rounds remaining"
type_label = (
"PENDING MECHANIC"
if effect.effect_type == "custom_mechanic"
else effect.effect_type
)
line = f"- [{type_label}] {desc} ({lifetime_str})"
# Codegen metadata
if effect.effect_type == "codegen":
status = (
"enabled" if effect.codegeneffect_spec_to_registered function · python · L152-L242 (91 LOC)src/pinwheel/core/effects.py
def effect_spec_to_registered(
spec: EffectSpec,
proposal_id: str,
current_round: int,
) -> RegisteredEffect:
"""Convert an EffectSpec from AI interpretation into a RegisteredEffect.
Maps the structured spec into the runtime effect object that the
registry manages.
"""
effect_id = str(uuid.uuid4())
# Determine lifetime
if spec.duration == "permanent":
lifetime = EffectLifetime.PERMANENT
elif spec.duration == "n_rounds":
lifetime = EffectLifetime.N_ROUNDS
elif spec.duration == "one_game":
lifetime = EffectLifetime.ONE_GAME
elif spec.duration == "until_repealed":
lifetime = EffectLifetime.UNTIL_REPEALED
else:
lifetime = EffectLifetime.PERMANENT
# Determine hook points based on effect type
hook_points: list[str] = []
if spec.effect_type == "codegen" and spec.codegen:
# Codegen effects use hook points from the CodegenEffectSpec
hook_points = list(spec.codegen.hook_Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
collect_game_def_patches function · python · L245-L270 (26 LOC)src/pinwheel/core/effects.py
def collect_game_def_patches(
effects: list[RegisteredEffect],
) -> list[dict[str, object]]:
"""Extract game definition patch dicts from active effects.
Returns a list of raw patch dicts (suitable for constructing
``GameDefinitionPatch`` objects) from effects whose type is
``modify_game_definition``. Patches are returned in registration
order (by ``registered_at_round``, then by ``effect_id``).
Args:
effects: All active effects (typically from ``EffectRegistry.get_all_active()``).
Returns:
List of patch dicts, each deserializable via ``GameDefinitionPatch(**d)``.
"""
game_def_effects = [
e for e in effects
if e.effect_type == "modify_game_definition"
and e.action_code
and isinstance(e.action_code.get("patch"), dict)
]
# Sort by registration order for deterministic application
game_def_effects.sort(key=lambda e: (e.registered_at_round, e.effect_id))
return [e.action_code["patch"] register_effects_for_proposal function · python · L273-L306 (34 LOC)src/pinwheel/core/effects.py
async def register_effects_for_proposal(
repo: Repository,
registry: EffectRegistry,
proposal_id: str,
effects: list[EffectSpec],
season_id: str,
current_round: int,
) -> list[RegisteredEffect]:
"""Register effects for a passing proposal.
Creates RegisteredEffect objects, adds them to the registry,
and persists them via effect.registered events.
"""
registered: list[RegisteredEffect] = []
for spec in effects:
# Skip parameter_change effects — those go through the existing RuleSet path
if spec.effect_type == "parameter_change":
continue
effect = effect_spec_to_registered(spec, proposal_id, current_round)
registry.register(effect)
registered.append(effect)
# Persist to event store
await repo.append_event(
event_type="effect.registered",
aggregate_id=effect.effect_id,
aggregate_type="effect",
season_id=season_id,
load_effect_registry function · python · L309-L357 (49 LOC)src/pinwheel/core/effects.py
async def load_effect_registry(
repo: Repository,
season_id: str,
) -> EffectRegistry:
"""Rebuild the effect registry from the event store.
Replays effect.registered events and removes any effects that
have been expired or repealed.
"""
registry = EffectRegistry()
# Load all effect events
registered_events = await repo.get_events_by_type(
season_id=season_id,
event_types=["effect.registered"],
)
expired_events = await repo.get_events_by_type(
season_id=season_id,
event_types=["effect.expired"],
)
repealed_events = await repo.get_events_by_type(
season_id=season_id,
event_types=["effect.repealed"],
)
# Build set of dead effect IDs
dead_ids: set[str] = set()
for ev in expired_events:
dead_ids.add(str(ev.payload.get("effect_id", ev.aggregate_id)))
for ev in repealed_events:
dead_ids.add(str(ev.payload.get("effect_id", ev.aggregate_id)))
# Registerpersist_expired_effects function · python · L360-L373 (14 LOC)src/pinwheel/core/effects.py
async def persist_expired_effects(
    repo: Repository,
    season_id: str,
    expired_ids: list[str],
) -> None:
    """Persist effect expiration events.

    Appends one ``effect.expired`` event per id, in the order given, each
    awaited before the next is written.

    Args:
        repo: Event store; only ``append_event`` is used.
        season_id: Season the events are recorded under.
        expired_ids: Ids of the effects that expired.
    """
    for effect_id in expired_ids:
        await repo.append_event(
            event_type="effect.expired",
            aggregate_id=effect_id,
            aggregate_type="effect",
            season_id=season_id,
            payload={"effect_id": effect_id, "reason": "lifetime_expired"},
        )

# activate_custom_mechanic function · python · L376-L421 (46 LOC)src/pinwheel/core/effects.py
async def activate_custom_mechanic(
repo: Repository,
registry: EffectRegistry,
effect_id: str,
season_id: str,
hook_point: str | None = None,
action_code: dict[str, object] | None = None,
) -> bool:
"""Activate a pending custom_mechanic effect with real hook/action implementation.
Called by admin via /activate-mechanic. If hook_point and action_code
are provided, the effect becomes a real hook_callback. If not, the
approximation (already live) is confirmed as good enough.
Returns True if the effect was found and activated.
"""
effect = registry.get_effect(effect_id)
if effect is None or effect.effect_type != "custom_mechanic":
return False
if hook_point and action_code:
# Upgrade to a real hook_callback
effect.effect_type = "hook_callback"
effect._hook_points = [hook_point]
effect.action_code = action_code
# Persist activation event
await repo.append_event(
event_typrepeal_effect function · python · L424-L460 (37 LOC)src/pinwheel/core/effects.py
async def repeal_effect(
repo: Repository,
registry: EffectRegistry,
effect_id: str,
season_id: str,
proposal_id: str,
) -> bool:
"""Repeal an active effect via governance.
Writes an effect.repealed event to the event store and removes the
effect from the in-memory registry. Returns True if the effect was
found and removed, False if it was not in the registry.
"""
removed = registry.remove_effect(effect_id)
# Always write the repeal event — even if the effect was already
# expired in-memory, the event store needs the record so that
# load_effect_registry() excludes it on future reloads.
await repo.append_event(
event_type="effect.repealed",
aggregate_id=effect_id,
aggregate_type="effect",
season_id=season_id,
payload={
"effect_id": effect_id,
"reason": "governance_repeal",
"proposal_id": proposal_id,
},
)
logger.info(
"effectEventBus.publish method · python · L37-L59 (23 LOC)src/pinwheel/core/event_bus.py
async def publish(self, event_type: str, data: dict[str, Any]) -> int:
    """Publish an event to all subscribers of this type + wildcard subscribers.

    The same envelope dict ``{"type": event_type, "data": data}`` is offered
    to every queue without waiting. A full queue does not block the
    publisher: the event is dropped for that subscriber and a warning is
    logged.

    Args:
        event_type: Event name used to select typed subscribers.
        data: Payload placed under the envelope's ``"data"`` key.

    Returns:
        The number of subscribers that received the event.
    """
    envelope = {"type": event_type, "data": data}
    # Typed subscribers are served first, then wildcard ones; each group
    # has its own drop message.
    groups = (
        (
            self._subscribers.get(event_type, []),
            "Dropping event %s for slow subscriber",
        ),
        (
            self._wildcard_subscribers,
            "Dropping wildcard event %s for slow subscriber",
        ),
    )
    delivered = 0
    for queues, drop_message in groups:
        for queue in queues:
            try:
                queue.put_nowait(envelope)
            except asyncio.QueueFull:
                logger.warning(drop_message, event_type)
            else:
                delivered += 1
    return delivered

# EventBus.subscribe method · python · L61-L68 (8 LOC)src/pinwheel/core/event_bus.py
def subscribe(self, event_type: str | None = None, max_size: int = 100) -> Subscription:
    """Create a subscription for a specific event type (or all events if None).

    Returns a Subscription that works as an async iterator.
    Must be used as an async context manager to ensure cleanup.

    Args:
        event_type: Event name to listen for; ``None`` subscribes to all.
        max_size: Capacity of the subscription's bounded queue.
    """
    queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=max_size)
    return Subscription(self, queue, event_type)
EventBus._unregister method · python · L76-L82 (7 LOC)src/pinwheel/core/event_bus.py
def _unregister(self, queue: asyncio.Queue[dict[str, Any]], event_type: str | None) -> None:
    """Detach ``queue`` from the subscriber list it was registered in.

    Unregistering is idempotent: a queue that is not (or no longer) listed,
    or an ``event_type`` that has no subscriber list at all, is ignored.

    Args:
        queue: The subscription queue to remove.
        event_type: Event name the queue was subscribed to; ``None`` selects
            the wildcard subscriber list.
    """
    if event_type is None:
        listeners = self._wildcard_subscribers
    else:
        # .get() instead of indexing: an unknown event type must neither
        # raise KeyError (plain dict) nor create an empty entry as a side
        # effect (defaultdict). Which of the two ``_subscribers`` is cannot
        # be seen from this excerpt.
        listeners = self._subscribers.get(event_type)
        if listeners is None:
            return
    with contextlib.suppress(ValueError):
        listeners.remove(queue)

# Subscription.__init__ method · python · L94-L103 (10 LOC)src/pinwheel/core/event_bus.py
def __init__(
    self,
    bus: EventBus,
    queue: asyncio.Queue[dict[str, Any]],
    event_type: str | None,
) -> None:
    """Store the subscription's wiring; the subscription starts inactive.

    Args:
        bus: The event bus this subscription belongs to.
        queue: Queue from which this subscription's events are read.
        event_type: Event name subscribed to, or ``None``.
    """
    self._bus = bus
    self._queue = queue
    self._event_type = event_type
    # Starts False; this constructor does not activate the subscription.
    self._active = False

# Subscription.__anext__ method · python · L117-L123 (7 LOC)src/pinwheel/core/event_bus.py
async def __anext__(self) -> dict[str, Any]:
    """Return the next queued event, waiting until one is available.

    Raises:
        StopAsyncIteration: If the subscription is not active, or if the
            wait on the queue is cancelled.
    """
    if not self._active:
        raise StopAsyncIteration
    try:
        return await self._queue.get()
    except asyncio.CancelledError:
        # NOTE(review): cancellation is converted into a normal end of
        # iteration, so the surrounding ``async for`` finishes instead of
        # propagating CancelledError. Looks deliberate — confirm that
        # callers do not rely on cancellation reaching them.
        raise StopAsyncIteration from None

# Subscription.get method · python · L125-L130 (6 LOC)src/pinwheel/core/event_bus.py
async def get(self, timeout: float | None = None) -> dict[str, Any] | None:
"""Get next event with optional timeout. Returns None on timeout."""
try:
return await asyncio.wait_for(self._queue.get(), timeout=timeout)
except TimeoutError:
return None_row_to_team function · python · L88-L120 (33 LOC)src/pinwheel/core/game_loop.py
def _row_to_team(team_row: TeamRow) -> Team:
"""Convert a TeamRow + HooperRows to domain Team model."""
# suppress_budget_check() bypasses budget validation for DB-persisted
# hoopers that may predate the budget enforcement rule.
with suppress_budget_check():
hoopers = []
for idx, a in enumerate(team_row.hoopers):
attrs = PlayerAttributes(**a.attributes)
raw_moves = a.moves if hasattr(a, "moves") and a.moves else []
moves = [Move(**m) if isinstance(m, dict) else m for m in raw_moves]
hoopers.append(
Hooper(
id=a.id,
name=a.name,
team_id=a.team_id,
archetype=a.archetype,
attributes=attrs,
moves=moves,
is_starter=idx < 3,
)
)
venue_data = team_row.venue
venue = Venue(**(venue_data or {"name": "Default Arena"}))
ret_check_earned_moves function · python · L123-L183 (61 LOC)src/pinwheel/core/game_loop.py
async def _check_earned_moves(
repo: Repository,
season_id: str,
round_number: int,
teams_cache: dict[str, Team],
event_bus: EventBus | None = None,
) -> list[dict]:
"""Check all hoopers for newly earned moves via milestone thresholds.
Iterates every hooper in teams_cache, aggregates their season stats,
and grants any moves whose milestone thresholds have been crossed.
Returns a list of grant dicts for narrative integration.
"""
grants: list[dict] = []
for team in teams_cache.values():
for hooper in team.hoopers:
season_stats = await repo.get_hooper_season_stats(hooper.id, season_id)
existing_move_names = {m.name for m in hooper.moves}
new_moves = check_milestones(season_stats, existing_move_names)
for move in new_moves:
await repo.add_hooper_move(hooper.id, move.model_dump())
grant = {
"hooper_id": hooper.id,
_check_season_complete function · python · L186-L199 (14 LOC)src/pinwheel/core/game_loop.py
async def _check_season_complete(repo: Repository, season_id: str) -> bool:
    """Check if all scheduled regular-season games have been played.

    Compares the round numbers in the regular-season schedule against the
    round numbers that have game results stored. Returns True only when every
    scheduled round has at least one played game.

    The game lookup is not filtered by phase here, so any stored game with a
    matching round number counts towards that round.

    Args:
        repo: Data source; ``get_full_schedule`` and ``get_all_games`` are
            used.
        season_id: Season to inspect.

    Returns:
        False when the season has no regular-season schedule; otherwise
        whether every scheduled round number appears among the played games.
    """
    schedule = await repo.get_full_schedule(season_id, phase="regular")
    if not schedule:
        # Nothing is scheduled, so the season cannot be "complete".
        return False
    games = await repo.get_all_games(season_id)
    played_rounds = {game.round_number for game in games}
    return all(entry.round_number in played_rounds for entry in schedule)

# _get_playoff_series_record function · python · L207-L251 (45 LOC)src/pinwheel/core/game_loop.py
async def _get_playoff_series_record(
repo: Repository,
season_id: str,
team_a_id: str,
team_b_id: str,
before_round: int | None = None,
) -> tuple[int, int, int]:
"""Get win counts for a playoff series between two teams.
Returns (team_a_wins, team_b_wins, games_played).
Counts games where this specific team pair was scheduled to play,
regardless of round number — so manually-inserted or late-committed
schedule entries are always included.
Args:
before_round: If set, only count games with round_number < before_round.
Used by the display layer to show the pre-game series state for
each historical game rather than the current overall series record.
"""
playoff_schedule = await repo.get_full_schedule(season_id, phase="playoff")
pair = frozenset({team_a_id, team_b_id})
# Rounds where this specific matchup was scheduled (not all playoff rounds)
scheduled_rounds = {
s.round_number
Repobility · MCP-ready · https://repobility.com
_schedule_next_series_game function · python · L254-L297 (44 LOC)src/pinwheel/core/game_loop.py
async def _schedule_next_series_game(
repo: Repository,
season_id: str,
higher_seed_id: str,
lower_seed_id: str,
games_played: int,
round_number: int,
matchup_index: int,
phase: str = "semifinal",
) -> None:
"""Schedule the next game in a playoff series with alternating home court.
Higher seed has home court in odd-numbered games (1, 3, 5, ...).
Args:
phase: Precise playoff phase — ``"semifinal"`` or ``"finals"``.
"""
if games_played % 2 == 0:
home_team_id = higher_seed_id
away_team_id = lower_seed_id
else:
home_team_id = lower_seed_id
away_team_id = higher_seed_id
try:
await repo.create_schedule_entry(
season_id=season_id,
round_number=round_number,
matchup_index=matchup_index,
home_team_id=home_team_id,
away_team_id=away_team_id,
phase=phase,
)
except IntegrityError:
# Entry already ecompute_standings_from_repo function · python · L594-L619 (26 LOC)src/pinwheel/core/game_loop.py
async def compute_standings_from_repo(repo: Repository, season_id: str) -> list[dict]:
    """Compute W-L standings from game results stored in the database.

    Every stored game of the season is reduced to a plain result dict and
    passed to ``compute_standings``; each returned entry is then enriched
    with the team's name when the team can be loaded.
    NOTE(review): the ordering of the result (wins, then point differential,
    per the original description) comes from ``compute_standings``, which is
    not visible here — confirm there.

    Args:
        repo: Data source; ``get_all_games`` and ``get_team`` are used.
        season_id: Season whose games are tallied.

    Returns:
        The list produced by ``compute_standings``, with ``"team_name"``
        added to entries whose team was found.
    """
    games = await repo.get_all_games(season_id)
    results: list[dict] = [
        {
            "home_team_id": game.home_team_id,
            "away_team_id": game.away_team_id,
            "home_score": game.home_score,
            "away_score": game.away_score,
            "winner_team_id": game.winner_team_id,
        }
        for game in games
    ]
    standings = compute_standings(results)
    # Enrich with team names; entries whose team cannot be loaded are left
    # without a "team_name" key.
    for entry in standings:
        team = await repo.get_team(entry["team_id"])
        if team:
            entry["team_name"] = team.name
    return standings

# generate_playoff_bracket function · python · L622-L709 (88 LOC)src/pinwheel/core/game_loop.py
async def generate_playoff_bracket(
repo: Repository,
season_id: str,
num_playoff_teams: int = 4,
) -> list[dict]:
"""Generate playoff matchups from final standings.
Standard bracket: #1 vs #4, #2 vs #3 (semis), winners play finals.
Stores playoff schedule entries in the database and returns the bracket
as a list of matchup dicts.
If fewer teams than ``num_playoff_teams`` exist, the bracket shrinks
accordingly. Returns an empty list if fewer than 2 teams qualify.
"""
standings = await compute_standings_from_repo(repo, season_id)
playoff_teams = standings[:num_playoff_teams]
if len(playoff_teams) < 2:
return []
# Determine first available round number for playoffs
full_schedule = await repo.get_full_schedule(season_id)
max_round = max((s.round_number for s in full_schedule), default=0) if full_schedule else 0
games = await repo.get_all_games(season_id)
max_played = max((g.round_number for g in games), d