Function bodies 807 total
_parse_block function · python · L245-L270 (26 LOC)fabric_agent/api/llm_providers.py
def _parse_block(block: Any):
"""
Parse a content block into (type, text, tool_id, tool_name, tool_input).
Handles both dict format and Anthropic API object format.
"""
if isinstance(block, dict):
btype = block.get("type", "")
return (
btype,
block.get("text", "") if btype == "text" else "",
block.get("id", ""),
block.get("name", ""),
block.get("input", {}),
)
elif hasattr(block, "type"):
btype = block.type
if btype == "text":
return btype, getattr(block, "text", ""), "", "", {}
elif btype == "tool_use":
return (
btype, "",
getattr(block, "id", ""),
getattr(block, "name", ""),
getattr(block, "input", {}),
)
return ("", "", "", "", {})OllamaLLMClient.__init__ method · python · L316-L326 (11 LOC)fabric_agent/api/llm_providers.py
def __init__(
    self,
    base_url: str = "http://localhost:11434",
    model: str = "llama3.1",
    timeout: float = 120.0,
):
    """
    Record the connection settings for an Ollama server.

    Args:
        base_url: Server root URL; trailing slashes are stripped before storing.
        model: Model name, stored unchanged.
        timeout: Timeout value, stored unchanged (units are not shown in
            this block — presumably seconds; confirm against the caller).
    """
    trimmed_url = base_url.rstrip("/")
    self.base_url = trimmed_url
    self.model = model
    self.timeout = timeout
    # The chat-completions URL is derived once from the trimmed base URL.
    self._endpoint = f"{self.base_url}/v1/chat/completions"
    logger.debug(f"OllamaLLMClient created: model={model}, endpoint={self._endpoint}")

# OllamaLLMClient.complete method · python · L328-L401 (74 LOC)fabric_agent/api/llm_providers.py
async def complete(
self,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]] = None,
system_prompt: Optional[str] = None,
) -> Dict[str, Any]:
"""
Send a chat completion request to Ollama.
Converts Anthropic-format messages/tools → OpenAI format → sends to Ollama
→ converts response back to Anthropic format for BaseAgent compatibility.
Args:
messages: Chat history in Anthropic format.
tools: Tool schemas in Anthropic format (input_schema key).
system_prompt: System instruction injected as the first message.
Returns:
Response dict in Anthropic format:
{"content": [...], "stop_reason": "end_turn"|"tool_use", "usage": {...}}
Raises:
RuntimeError: If Ollama is not running or returns an error.
"""
# Build message list in OpenAI format
openai_messages: List[Dict[strAzureOpenAILLMClient.__init__ method · python · L438-L450 (13 LOC)fabric_agent/api/llm_providers.py
def __init__(
    self,
    endpoint: str,
    deployment: str = "gpt-4o",
    api_version: str = "2024-10-01-preview",
    api_key: Optional[str] = None,
):
    """
    Record Azure OpenAI connection settings; no client is built here.

    Args:
        endpoint: Resource endpoint URL; trailing slashes are stripped
            before storing.
        deployment: Deployment name, stored unchanged.
        api_version: API version string, stored unchanged.
        api_key: Optional API key, stored unchanged (may be None).
    """
    trimmed_endpoint = endpoint.rstrip("/")
    self.endpoint = trimmed_endpoint
    self.deployment = deployment
    self.api_version = api_version
    self.api_key = api_key
    # The underlying client starts unset; this block never creates it.
    self._client = None
    # The log line reports the endpoint exactly as passed in (not the trimmed one).
    logger.debug(f"AzureOpenAILLMClient: endpoint={endpoint}, deployment={deployment}")

# AzureOpenAILLMClient._get_client method · python · L452-L490 (39 LOC)fabric_agent/api/llm_providers.py
def _get_client(self):
"""Lazy-initialize the AzureOpenAI client."""
if self._client is not None:
return self._client
try:
from openai import AsyncAzureOpenAI
except ImportError:
raise RuntimeError(
"openai package not installed. "
"Install: pip install openai"
)
if self.api_key:
self._client = AsyncAzureOpenAI(
azure_endpoint=self.endpoint,
api_key=self.api_key,
api_version=self.api_version,
)
else:
# Use Azure AD token via DefaultAzureCredential
try:
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
token_provider = get_bearer_token_provider(
DefaultAzureCredential(),
"https://cognitiveservices.azure.com/.default",
)
self._clientAzureOpenAILLMClient.complete method · python · L492-L544 (53 LOC)fabric_agent/api/llm_providers.py
async def complete(
self,
messages: List[Dict[str, Any]],
tools: Optional[List[Dict[str, Any]]] = None,
system_prompt: Optional[str] = None,
) -> Dict[str, Any]:
"""
Send a chat completion request to Azure OpenAI.
Same message normalization as OllamaLLMClient — Anthropic format in,
Anthropic format out. The caller never knows the difference.
Args:
messages: Chat history in Anthropic format.
tools: Tool schemas in Anthropic format.
system_prompt: System instruction.
Returns:
Response dict in Anthropic format.
Raises:
RuntimeError: If openai package is missing or API call fails.
"""
client = self._get_client()
openai_messages: List[Dict[str, Any]] = []
if system_prompt:
openai_messages.append({"role": "system", "content": system_prompt})
openai_messages.extend(_to_ocreate_llm_client function · python · L551-L639 (89 LOC)fabric_agent/api/llm_providers.py
def create_llm_client(config: Any) -> LLMClient:
"""
Factory function — returns the correct LLMClient based on config.
WHAT: Single entry point for all LLM provider construction. Callers
never import OllamaLLMClient or ClaudeLLMClient directly.
WHY FACTORY PATTERN:
- Config is the single source of truth for which provider to use
- Adding a new provider requires one elif here — zero changes to callers
- Unit tests can inject MockLLMClient without touching factory at all
FAANG PARALLEL:
- AWS SDK provider chain (S3, DynamoDB adapters via same interface)
- Google Cloud ADC (Application Default Credentials) factory
- Spring's @Bean factory methods for dependency injection
PROVIDER SELECTION (via LLM_PROVIDER env var):
"ollama" → OllamaLLMClient (DEFAULT — no API key needed)
"claude" → ClaudeLLMClient (best reasoning, needs ANTHROPIC_API_KEY)
"azure_openai" → AzureOpenAILLMClient (enterRepobility (the analyzer behind this table) · https://repobility.com
_infer_items_from_saferefactor_plan function · python · L44-L67 (24 LOC)fabric_agent/cicd/change_packet.py
def _infer_items_from_saferefactor_plan(plan: dict[str, Any]) -> list[dict[str, str]]:
"""Infer deployment items from SafeRefactor plan impact.
Deployment Pipelines expects: { "sourceItemId": "<uuid>", "itemType": "Report|SemanticModel|..." }
"""
items: list[dict[str, str]] = []
sm_id = plan.get("semantic_model_id")
if sm_id:
items.append({"sourceItemId": sm_id, "itemType": "SemanticModel"})
impact = plan.get("impact") or {}
reports_changed = impact.get("reports_changed") or {}
for report_id in reports_changed.keys():
items.append({"sourceItemId": report_id, "itemType": "Report"})
# de-dupe while preserving order
seen: set[tuple[str, str]] = set()
out: list[dict[str, str]] = []
for it in items:
key = (it["sourceItemId"], it["itemType"])
if key not in seen:
seen.add(key)
out.append(it)
return outcreate_change_packet_from_plan function · python · L70-L175 (106 LOC)fabric_agent/cicd/change_packet.py
def create_change_packet_from_plan(
plan_path: str | Path,
*,
out_dir: str | Path = "change_packets",
deployment_pipeline_id: str,
source_stage_id: str,
target_stage_id: str,
note: str | None = None,
include_items: bool = True,
) -> ChangePacketPaths:
"""Create a PR-friendly change packet folder.
This is Option 2's core artifact:
- reviewers approve via PR
- CI reads deployment_request.json and deploys stage content
"""
plan_path = Path(plan_path)
plan = _read_json(plan_path)
op_id = str(plan.get("operation_id") or plan_path.stem)
packet_dir = Path(out_dir) / op_id
packet_dir.mkdir(parents=True, exist_ok=True)
# 1) Copy plan.json
plan_json = packet_dir / "plan.json"
_write_json(plan_json, plan)
# 2) Approval record (even if pending)
approval = plan.get("approval") or {"status": "PENDING"}
approval_json = packet_dir / "approval.json"
_write_json(approval_json, approval)
# 3) HTMLgit_commit_change_packet function · python · L185-L207 (23 LOC)fabric_agent/cicd/change_packet.py
def git_commit_change_packet(
    *,
    repo_root: str | Path,
    packet_dir: str | Path,
    branch: str,
    message: str,
    push: bool = False,
    remote: str = "origin",
) -> None:
    """Put a change packet on a new branch as a single commit.

    Runs, in order and with ``repo_root`` as the working directory:
    ``git checkout -b <branch>``, ``git add <packet_dir>``,
    ``git commit -m <message>`` and, when ``push`` is true,
    ``git push -u <remote> <branch>``. Each step goes through ``_run_git``.

    Args:
        repo_root: Directory expected to contain a ``.git`` entry.
        packet_dir: Path handed to ``git add`` in POSIX form.
        branch: Name of the branch to create.
        message: Commit message.
        push: Whether to push the branch after committing.
        remote: Remote name used for the push.

    Raises:
        RuntimeError: If ``repo_root`` has no ``.git`` entry.
    """
    root = Path(repo_root)
    packet = Path(packet_dir)

    if not (root / ".git").exists():
        raise RuntimeError("Not a git repo. Run: git init && git add . && git commit -m 'init'")

    git_steps = [
        ["checkout", "-b", branch],
        ["add", packet.as_posix()],
        ["commit", "-m", message],
    ]
    if push:
        git_steps.append(["push", "-u", remote, branch])

    for argv in git_steps:
        _run_git(argv, cwd=root)

# deploy_stage_content_and_wait function · python · L25-L83 (59 LOC)fabric_agent/cicd/deploy_pipeline.py
async def deploy_stage_content_and_wait(
client: FabricApiClient,
*,
deployment_pipeline_id: str,
source_stage_id: str,
target_stage_id: str,
items: list[dict[str, str]] | None = None,
note: str | None = None,
allow_cross_region_deployment: bool = False,
timeout_s: int = 1800,
poll_s: float = 5.0,
) -> DeployResult:
"""Deploy between Fabric deployment pipeline stages and wait for completion.
Uses POST /v1/deploymentPipelines/{id}/deploy and then polls:
GET /v1/deploymentPipelines/{id}/operations/{deploymentId}
This is the "Option 2" CI step after PR approval.
"""
body: dict[str, Any] = {
"sourceStageId": source_stage_id,
"targetStageId": target_stage_id,
}
if note:
body["note"] = note
if items:
# Deploy Stage Content expects: items: [{sourceItemId, itemType}, ...]
body["items"] = items
if allow_cross_region_deployment:
body["options"] = {"allowCrossRegionDlist_stage_items_all function · python · L86-L101 (16 LOC)fabric_agent/cicd/deploy_pipeline.py
async def list_stage_items_all(
    client: FabricApiClient, *, deployment_pipeline_id: str, stage_id: str
) -> list[dict[str, Any]]:
    """Collect every stage item by following continuation tokens.

    Calls ``client.list_deployment_pipeline_stage_items`` repeatedly, passing
    the previous response's ``continuationToken`` (``None`` on the first
    call), and stops at the first response without a truthy token.

    Returns:
        The concatenated ``value`` lists of all responses; a missing or
        falsy ``value`` contributes nothing.
    """
    collected: list[dict[str, Any]] = []
    next_token: str | None = None
    more_pages = True
    while more_pages:
        page = await client.list_deployment_pipeline_stage_items(
            deployment_pipeline_id, stage_id, continuation_token=next_token
        )
        collected.extend(page.get("value") or [])
        next_token = page.get("continuationToken")
        more_pages = bool(next_token)
    return collected

# get_agent function · python · L48-L53 (6 LOC)fabric_agent/cli.py
def get_agent() -> FabricAgent:
    """Return the module-wide agent, building it on first use."""
    global _agent
    if _agent is not None:
        return _agent
    # First call: create the shared instance with WARNING-level logging.
    _agent = FabricAgent(log_level="WARNING")
    return _agent

# ensure_initialized function · python · L56-L61 (6 LOC)fabric_agent/cli.py
async def ensure_initialized() -> FabricAgent:
    """Return the shared agent, awaiting ``initialize()`` first if needed."""
    shared = get_agent()
    if shared._initialized:
        return shared
    await shared.initialize()
    return shared

# _exit_code_from_system_exit function · python · L64-L73 (10 LOC)fabric_agent/cli.py
def _exit_code_from_system_exit(code: object) -> int:
"""Normalize SystemExit.code to an integer exit code."""
if code is None:
return 0
if isinstance(code, int):
return code
try:
return int(code)
except (TypeError, ValueError):
return 1Open data scored by Repobility · https://repobility.com
_close_agent_async function · python · L76-L87 (12 LOC)fabric_agent/cli.py
async def _close_agent_async() -> None:
    """Detach the global agent and close it, ignoring errors from ``close()``."""
    global _agent
    # Clear the global before closing so it is reset even when close() fails.
    agent, _agent = _agent, None
    if agent is not None:
        try:
            await agent.close()
        except Exception:
            # Deliberate best-effort: a cleanup problem must not mask the
            # outcome of the command that just ran.
            pass

# _run_async function · python · L90-L104 (15 LOC)fabric_agent/cli.py
def _run_async(coro_factory: Callable[[], Awaitable[_T]]) -> _T:
    """Drive an async command to completion and keep non-zero exits visible.

    The awaitable produced by ``coro_factory`` runs under ``asyncio.run``;
    ``_close_agent_async`` is awaited afterwards whether it succeeded or not.
    A ``SystemExit`` whose normalized code is 0 is re-raised unchanged; any
    other code is re-raised as ``typer.Exit`` carrying that code.
    """
    async def _with_cleanup() -> _T:
        try:
            return await coro_factory()
        finally:
            await _close_agent_async()

    try:
        outcome = asyncio.run(_with_cleanup())
    except SystemExit as exc:
        exit_code = _exit_code_from_system_exit(exc.code)
        if exit_code != 0:
            raise typer.Exit(code=exit_code) from exc
        raise
    return outcome

# list_workspaces function · python · L108-L125 (18 LOC)fabric_agent/cli.py
def list_workspaces():
    """List all accessible Fabric workspaces."""
    # The docstring above is left untouched: it looks like Typer help text
    # for this command (decorator not visible here).
    async def _show():
        fabric = await ensure_initialized()
        listing = await fabric.list_workspaces()

        grid = Table(title="Fabric Workspaces")
        column_specs = (
            ("Name", {"style": "cyan"}),
            ("ID", {"style": "dim"}),
            ("Type", {}),
        )
        for header, options in column_specs:
            grid.add_column(header, **options)
        for workspace in listing.workspaces:
            grid.add_row(workspace.name, workspace.id, workspace.type)

        console.print(grid)
        console.print(f"\nTotal: {listing.count} workspaces")

    _run_async(_show)

# set_workspace function · python · L130-L142 (13 LOC)fabric_agent/cli.py
def set_workspace(name: str = typer.Argument(..., help="Workspace name")):
    """Set the active workspace."""
    # The docstring above is left untouched: it looks like Typer help text
    # for this command (decorator not visible here).
    async def _apply():
        fabric = await ensure_initialized()
        outcome = await fabric.set_workspace(name)
        # Build the panel text first, then render it in one call.
        summary = (
            f"[green]✓[/green] Active workspace: [cyan]{outcome.workspace_name}[/cyan]\n"
            f"ID: {outcome.workspace_id}"
        )
        console.print(Panel(summary, title="Workspace Set"))

    _run_async(_apply)

# list_items function · python · L147-L169 (23 LOC)fabric_agent/cli.py
def list_items(
    item_type: Optional[str] = typer.Option(
        None, "--type", "-t",
        help="Filter by type (Report, SemanticModel, etc.)"
    )
):
    """List items in the current workspace."""
    # The docstring above is left untouched: it looks like Typer help text
    # for this command (decorator not visible here).
    async def _show():
        fabric = await ensure_initialized()
        listing = await fabric.list_items(item_type)

        grid = Table(title="Workspace Items")
        grid.add_column("Name", style="cyan")
        grid.add_column("Type", style="magenta")
        grid.add_column("ID", style="dim")
        for entry in listing.items:
            row = (entry.name, entry.type, entry.id)
            grid.add_row(*row)

        console.print(grid)
        console.print(f"\nTotal: {listing.count} items")

    _run_async(_show)

# get_measures function · python · L174-L190 (17 LOC)fabric_agent/cli.py
def get_measures(model: str = typer.Argument(..., help="Semantic model name")):
    """Get measures from a semantic model."""
    # The docstring above is left untouched: it looks like Typer help text
    # for this command (decorator not visible here).
    async def _show():
        fabric = await ensure_initialized()
        found = await fabric.get_measures(model)

        grid = Table(title=f"Measures in {found.model_name}")
        grid.add_column("Name", style="cyan")
        grid.add_column("Table")
        for measure in found.measures:
            # A measure without a table is shown as a dash.
            owner = measure.table or "-"
            grid.add_row(measure.name, owner)

        console.print(grid)
        console.print(f"\nTotal: {found.count} measures")

    _run_async(_show)

# analyze_impact function · python · L195-L256 (62 LOC)fabric_agent/cli.py
def analyze_impact(
target_type: str = typer.Option(
..., "--type", "-t",
help="Target type: measure or column"
),
name: str = typer.Option(..., "--name", "-n", help="Target name"),
model: Optional[str] = typer.Option(
None, "--model", "-m",
help="Semantic model name (for DAX analysis)"
),
):
"""Analyze impact of renaming a measure or column."""
async def _run():
agent = await ensure_initialized()
result = await agent.analyze_impact(target_type, name, model)
# Summary panel
severity_colors = {
"none": "green",
"low": "yellow",
"medium": "orange1",
"high": "red",
"critical": "bold red",
}
color = severity_colors.get(result.severity.value, "white")
console.print(Panel(
f"Target: [cyan]{result.target_name}[/cyan] ({result.target_type})\n"
f"Severity: [{color}]{result.severithistory function · python · L261-L297 (37 LOC)fabric_agent/cli.py
def history(
limit: int = typer.Option(10, "--limit", "-l", help="Max events"),
operation: Optional[str] = typer.Option(
None, "--operation", "-o",
help="Filter by operation type"
),
):
"""Show refactoring history from audit trail."""
async def _run():
agent = await ensure_initialized()
result = await agent.get_history(limit, operation)
table = Table(title="Refactoring History")
table.add_column("Timestamp", style="dim")
table.add_column("Operation", style="cyan")
table.add_column("Target")
table.add_column("Change")
table.add_column("Status")
for event in result.events:
change = ""
if event.old_value and event.new_value:
change = f"{event.old_value} → {event.new_value}"
status_style = "green" if event.status == "success" else "yellow"
table.add_row(
event.tCitation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
status function · python · L302-L322 (21 LOC)fabric_agent/cli.py
def status():
    """Check connection status."""
    # The docstring above is left untouched: it looks like Typer help text
    # for this command (decorator not visible here).
    async def _report():
        fabric = await ensure_initialized()
        state = await fabric.get_status()

        # Choose the panel text by connection state; the title is the same
        # in both cases.
        if not state.connected:
            summary = (
                f"[red]✗ Not Connected[/red]\n"
                f"Error: {state.error}"
            )
        else:
            summary = (
                f"[green]✓ Connected[/green]\n"
                f"Workspace: {state.workspace_name or '(not set)'}\n"
                f"Auth Mode: {state.auth_mode}"
            )
        console.print(Panel(summary, title="Connection Status"))

    _run_async(_report)

# health_scan function · python · L339-L394 (56 LOC)fabric_agent/cli.py
def health_scan(
workspace_ids: str = typer.Argument(
...,
help="Comma-separated workspace IDs to scan (e.g. ws-abc,ws-def)",
),
stale_hours: int = typer.Option(
24, "--stale-hours", help="Flag tables not refreshed within N hours"
),
):
"""Scan workspace(s) for infrastructure anomalies (broken shortcuts, schema drift, etc.)."""
async def _run():
from fabric_agent.tools.models import ScanWorkspaceHealthInput
ids = [w.strip() for w in workspace_ids.split(",") if w.strip()]
agent = await ensure_initialized()
if not agent.tools:
console.print("[red]Agent tools not initialized.[/red]")
return
with console.status("[bold cyan]Scanning workspace health...[/bold cyan]"):
result = await agent.tools.scan_workspace_health(
ScanWorkspaceHealthInput(workspace_ids=ids, stale_hours=stale_hours)
)
severity_colors = {
"critical": "heal function · python · L399-L440 (42 LOC)fabric_agent/cli.py
def heal(
workspace_ids: str = typer.Argument(
...,
help="Comma-separated workspace IDs (e.g. ws-abc,ws-def)",
),
dry_run: bool = typer.Option(
True, "--dry-run/--apply",
help="Simulate (default) or apply healing actions",
),
):
"""Execute self-healing for workspace anomalies. Use --apply to actually fix issues."""
async def _run():
from fabric_agent.tools.models import ExecuteHealingPlanInput
ids = [w.strip() for w in workspace_ids.split(",") if w.strip()]
agent = await ensure_initialized()
if not agent.tools:
console.print("[red]Agent tools not initialized.[/red]")
return
mode = "[dim](dry run)[/dim]" if dry_run else "[bold red](LIVE)[/bold red]"
with console.status(f"[bold cyan]Running self-healing {mode}...[/bold cyan]"):
result = await agent.tools.execute_healing_plan(
ExecuteHealingPlanInput(workspace_ids=ids, dry_run=dry_rushortcut_scan function · python · L450-L530 (81 LOC)fabric_agent/cli.py
def shortcut_scan(
workspace_ids: str = typer.Argument(
...,
help="Comma-separated workspace IDs to scan (e.g. ws-abc,ws-def)",
),
):
"""
Discover broken OneLake shortcuts + their cascade of impacted assets.
Queries the Fabric REST API for actual shortcut definitions, verifies each
source is accessible, then shows which downstream tables / semantic models /
pipelines / reports are affected.
Run this before shortcut-heal to review what's broken.
"""
async def _run():
from fabric_agent.tools.models import ScanShortcutCascadeInput
from rich.table import Table
ids = [w.strip() for w in workspace_ids.split(",") if w.strip()]
agent = await ensure_initialized()
if not agent.tools:
console.print("[red]Agent tools not initialized.[/red]")
return
with console.status("[bold cyan]Scanning for broken shortcuts...[/bold cyan]"):
result = await agent.tools.scshortcut_heal function · python · L535-L658 (124 LOC)fabric_agent/cli.py
def shortcut_heal(
workspace_ids: str = typer.Argument(
...,
help="Comma-separated workspace IDs (e.g. ws-abc,ws-def)",
),
dry_run: bool = typer.Option(
True, "--dry-run/--apply",
help="Simulate (default) or apply healing actions",
),
approved_by: str = typer.Option(
"cli-user", "--approved-by", help="Approver name for audit trail",
),
):
"""
Build a shortcut cascade healing plan and optionally execute it.
WORKFLOW:
1. Builds cascade plan (auto vs manual action split)
2. Shows the plan for review
3. If --apply: asks for confirmation, approves, then executes
Always run without --apply first to review what will happen.
"""
async def _run():
from fabric_agent.tools.models import (
BuildShortcutHealingPlanInput,
ApproveShortcutHealingInput,
ExecuteShortcutHealingInput,
)
from rich.table import Table
ids = [w.strip()memory_stats function · python · L667-L691 (25 LOC)fabric_agent/cli.py
def memory_stats():
"""Show vector store statistics (total indexed, collection size, embedding model)."""
async def _run():
from fabric_agent.tools.models import MemoryStatsInput
agent = await ensure_initialized()
if not agent.tools:
console.print("[red]Agent tools not initialized.[/red]")
return
result = await agent.tools.memory_stats(MemoryStatsInput())
table = Table(title="Operation Memory Statistics")
table.add_column("Metric", style="cyan")
table.add_column("Value")
table.add_row("Total Indexed", str(result.total_indexed))
table.add_row("Vector Store Size", str(result.collection_size))
table.add_row("Last Indexed", result.last_indexed or "never")
table.add_row("Embedding Model", result.embedding_model)
table.add_row("Store Path", result.vector_store_path)
console.print(table)
console.print(f"\n[dim]{result.message}[/dim]")
_run_asmemory_search function · python · L696-L747 (52 LOC)fabric_agent/cli.py
def memory_search(
query: str = typer.Argument(..., help="Natural language description of the proposed change"),
top_k: int = typer.Option(5, "--top-k", "-k", help="Number of similar operations to return"),
change_type: Optional[str] = typer.Option(
None, "--type", "-t", help="Filter by change type (rename_measure, etc.)"
),
):
"""Find past operations semantically similar to a proposed change (vector RAG search)."""
async def _run():
from fabric_agent.tools.models import FindSimilarOperationsInput
agent = await ensure_initialized()
if not agent.tools:
console.print("[red]Agent tools not initialized.[/red]")
return
result = await agent.tools.find_similar_operations(
FindSimilarOperationsInput(
proposed_change=query,
top_k=top_k,
change_type=change_type,
)
)
if not result.similar_operations:
console.memory_reindex function · python · L752-L770 (19 LOC)fabric_agent/cli.py
def memory_reindex():
    """Rebuild the vector index from the SQLite audit trail (run after first setup)."""
    # The docstring above is left untouched: it looks like Typer help text
    # for this command (decorator not visible here).
    async def _rebuild():
        from fabric_agent.tools.models import ReindexMemoryInput

        fabric = await ensure_initialized()
        if fabric.tools:
            # Show a spinner only while the reindex call is in flight.
            with console.status("[bold cyan]Reindexing operation memory...[/bold cyan]"):
                report = await fabric.tools.reindex_operation_memory(ReindexMemoryInput())
            summary = f"[green]✓[/green] {report.message}"
            console.print(Panel(summary, title="Memory Reindex"))
        else:
            console.print("[red]Agent tools not initialized.[/red]")

    _run_async(_rebuild)

# Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
session_summary function · python · L775-L804 (30 LOC)fabric_agent/cli.py
def session_summary(
workspace_id: Optional[str] = typer.Option(
None, "--workspace", "-w", help="Workspace ID (defaults to active workspace)"
),
):
"""Show what the agent did in the last session (cross-session memory)."""
async def _run():
from fabric_agent.tools.models import GetSessionSummaryInput
agent = await ensure_initialized()
if not agent.tools:
console.print("[red]Agent tools not initialized.[/red]")
return
result = await agent.tools.get_session_summary(
GetSessionSummaryInput(workspace_id=workspace_id)
)
if result.has_history:
console.print(Panel(
result.summary,
title=f"Last Session Summary (workspace: {result.workspace_id or 'default'})",
))
else:
console.print(Panel(
"[dim]No previous sessions found.[/dim]",
title="Session Summary",
))
_runblast_radius function · python · L814-L966 (153 LOC)fabric_agent/cli.py
def blast_radius(
source_table: str = typer.Option(
..., "--source-table", "-s",
help="Name of the source asset that changed (e.g. 'fact_sales')",
),
workspace_ids: List[str] = typer.Option(
..., "--workspace-id", "-w",
help="Workspace ID to scan (repeat for multiple workspaces)",
),
change_description: str = typer.Option(
"schema change", "--change",
help="Description of the change (e.g. 'column dropped: customer_tier')",
),
json_out: Optional[str] = typer.Option(
None, "--json-out",
help="Write full blast-radius JSON to this file path (consumed by execute-cascade-plan)",
),
fail_on_risk: Optional[str] = typer.Option(
None, "--fail-on-risk",
help="Exit with code 1 if risk_level >= this level (high|critical). Used by CI to block PRs.",
),
):
"""
Compute the full cross-workspace cascade blast radius for a source asset change.
Shows which assets across all execute_cascade_plan function · python · L975-L1097 (123 LOC)fabric_agent/cli.py
def execute_cascade_plan(
plan_json: str = typer.Option(
..., "--plan-json", "-p",
help="Path to blast-radius JSON file produced by `blast-radius --json-out`",
),
dry_run: bool = typer.Option(
False, "--dry-run",
help="Print what would be executed without calling Fabric APIs",
),
skip_advisory: bool = typer.Option(
True, "--skip-advisory/--include-advisory",
help="Skip advisory (notify_owner) steps that don't require API calls (default: skip)",
),
):
"""
Execute the ordered cascade healing plan produced by `blast-radius --json-out`.
Reads the blast-radius JSON, iterates the topologically sorted healing steps,
and triggers pipelines / refreshes semantic models in the correct dependency order.
This command is the GitOps Stage 2: it runs ONLY after a human reviewer approves
the `fabric-production` GitHub Environment in the fabric-auto-heal workflow.
Healing order (mirrors data flow, sourfreshness_scan function · python · L1101-L1191 (91 LOC)fabric_agent/cli.py
def freshness_scan(
workspace_ids: List[str] = typer.Option(
..., "--workspace-id", "-w",
help="Workspace ID to scan (repeat for multiple workspaces)",
),
sla_fact: float = typer.Option(
1.0, "--sla-fact",
help="SLA threshold in hours for fact_* tables",
),
sla_dim: float = typer.Option(
24.0, "--sla-dim",
help="SLA threshold in hours for dim_* tables",
),
sla_default: float = typer.Option(
24.0, "--sla-default",
help="Default SLA threshold in hours for unmatched tables",
),
lro_timeout: int = typer.Option(
300, "--lro-timeout",
help="Max seconds to wait for refreshMetadata LRO per endpoint",
),
):
"""
Scan SQL Endpoint sync freshness across workspaces.
Detects tables silently stale with no Fabric alert by comparing each
table's lastSuccessfulSyncDateTime against SLA thresholds.
Example:
fabric-agent freshness-scan -w ws-dataplatform -w wtable_maintenance function · python · L1195-L1277 (83 LOC)fabric_agent/cli.py
def table_maintenance(
workspace_ids: List[str] = typer.Option(
..., "--workspace-id", "-w",
help="Workspace ID to process (repeat for multiple workspaces)",
),
dry_run: bool = typer.Option(
True, "--dry-run/--no-dry-run",
help="True = validate only (default). False = submit jobs.",
),
table_filter: Optional[List[str]] = typer.Option(
None, "--table", "-t",
help="Specific table name to maintain (repeat for multiple). Omit = all tables.",
),
queue_threshold: int = typer.Option(
3, "--queue-threshold",
help="Skip submission if active jobs >= this (Trial capacity protection)",
),
):
"""
Validate and optionally submit TableMaintenance jobs.
Pre-validates table names before submission to prevent silent Spark failures.
ALWAYS run with --dry-run first (default) to preview what would be submitted.
Example:
fabric-agent table-maintenance -w ws-dataplatform --dry-run
_configure_logging function · python · L73-L113 (41 LOC)fabric_agent/core/agent.py
def _configure_logging(
level: str = "INFO",
log_file: Optional[Path] = None,
) -> None:
"""
Configure loguru logging.
Args:
level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
log_file: Optional file path for log output.
"""
# Remove default handler
logger.remove()
# Add stderr handler with formatting
logger.add(
sys.stderr,
level=level,
format=(
"<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
"<level>{level: <8}</level> | "
"<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | "
"<level>{message}</level>"
),
colorize=True,
)
# Add file handler if specified
if log_file:
log_file.parent.mkdir(parents=True, exist_ok=True)
logger.add(
str(log_file),
level=level,
format=(
"{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | "
FabricAgent.__init__ method · python · L150-L212 (63 LOC)fabric_agent/core/agent.py
def __init__(
self,
config: Optional[AgentConfig] = None,
auth_config: Optional[FabricAuthConfig] = None,
storage_path: Optional[Path] = None,
storage_backend: str = "sqlite",
log_level: str = "INFO",
log_file: Optional[Path] = None,
):
"""
Initialize the FabricAgent.
Args:
config: Full agent configuration. If not provided, will be
created from other parameters.
auth_config: Authentication configuration. If not provided,
will be loaded from environment variables.
storage_path: Path for audit trail storage.
storage_backend: "sqlite" or "json".
log_level: Logging level.
log_file: Optional file for logs.
Example:
>>> # Default (loads auth from environment)
>>> agent = FabricAgent()
>>>
>>> # With explicit configuratFabricAgent.initialize method · python · L214-L286 (73 LOC)fabric_agent/core/agent.py
async def initialize(self) -> "FabricAgent":
"""
Initialize the agent and all its components.
This must be called before using any agent methods.
Alternatively, use the async context manager.
Returns:
Self for method chaining.
Raises:
ValueError: If authentication configuration is missing.
Example:
>>> agent = FabricAgent()
>>> await agent.initialize()
>>> # Now ready to use
"""
if self._initialized:
logger.debug("Agent already initialized")
return self
logger.info("Starting agent initialization")
if not self.config.auth:
raise ValueError(
"Authentication configuration required. "
"Set AZURE_TENANT_ID and AZURE_CLIENT_ID environment variables, "
"or provide auth_config parameter."
)
Repobility (the analyzer behind this table) · https://repobility.com
FabricAgent.close method · python · L288-L303 (16 LOC)fabric_agent/core/agent.py
async def close(self) -> None:
    """
    Shut the agent down and reset it to the uninitialized state.

    If a client is set it is closed and cleared; ``tools`` is then cleared
    and the initialized flag is reset.
    """
    logger.info("Closing FabricAgent")

    active_client = self.client
    if active_client:
        await active_client.close()
        self.client = None

    self.tools = None
    self._initialized = False

    logger.info("FabricAgent closed")

# FabricAgent._ensure_initialized method · python · L314-L320 (7 LOC)fabric_agent/core/agent.py
def _ensure_initialized(self) -> None:
"""Ensure the agent is initialized."""
if not self._initialized or not self.tools:
raise RuntimeError(
"FabricAgent not initialized. "
"Call initialize() or use async context manager."
)FabricAgent.list_workspaces method · python · L326-L339 (14 LOC)fabric_agent/core/agent.py
async def list_workspaces(self) -> ListWorkspacesOutput:
    """
    Fetch the Fabric workspaces this agent can access.

    Returns:
        ListWorkspacesOutput with the workspaces and their count.
    Raises:
        RuntimeError: If the agent has not been initialized.
    Example:
        >>> listing = await agent.list_workspaces()
        >>> for ws in listing.workspaces:
        ...     print(f"{ws.name} ({ws.id})")
    """
    self._ensure_initialized()
    request = ListWorkspacesInput()
    return await self.tools.list_workspaces(request)

# FabricAgent.set_workspace method · python · L341-L357 (17 LOC)fabric_agent/core/agent.py
async def set_workspace(self, workspace_name: str) -> SetWorkspaceOutput:
    """
    Select the workspace that later operations will act on.

    Args:
        workspace_name: Name of the workspace to activate.
    Returns:
        SetWorkspaceOutput with workspace_id and workspace_name.
    Raises:
        RuntimeError: If the agent has not been initialized.
    Example:
        >>> await agent.set_workspace("Sales Analytics")
    """
    self._ensure_initialized()
    request = SetWorkspaceInput(workspace_name=workspace_name)
    return await self.tools.set_workspace(request)

# FabricAgent.list_items method · python · L359-L376 (18 LOC)fabric_agent/core/agent.py
async def list_items(
    self,
    item_type: Optional[str] = None,
) -> ListItemsOutput:
    """
    Fetch the items of the current workspace, optionally filtered by type.

    Args:
        item_type: Optional type filter (Report, SemanticModel, etc.);
            None is passed through unchanged.
    Returns:
        ListItemsOutput with the items and their count.
    Raises:
        RuntimeError: If the agent has not been initialized.
    Example:
        >>> reports = await agent.list_items("Report")
    """
    self._ensure_initialized()
    request = ListItemsInput(item_type=item_type)
    return await self.tools.list_items(request)

# FabricAgent.get_semantic_model method · python · L382-L402 (21 LOC)fabric_agent/core/agent.py
async def get_semantic_model(
    self,
    model_name: str,
) -> GetSemanticModelOutput:
    """
    Look up a semantic model by name.

    Args:
        model_name: Name of the semantic model.
    Returns:
        GetSemanticModelOutput describing the model.
    Raises:
        RuntimeError: If the agent has not been initialized.
    Example:
        >>> model = await agent.get_semantic_model("Sales Analytics")
        >>> print(f"Measures: {model.measure_count}")
    """
    self._ensure_initialized()
    request = GetSemanticModelInput(model_name=model_name)
    return await self.tools.get_semantic_model(request)

# FabricAgent.get_measures method · python · L404-L422 (19 LOC)fabric_agent/core/agent.py
async def get_measures(self, model_name: str) -> GetMeasuresOutput:
    """
    Fetch the measures defined in a semantic model.

    Args:
        model_name: Name of the semantic model.
    Returns:
        GetMeasuresOutput with the measures.
    Raises:
        RuntimeError: If the agent has not been initialized.
    Example:
        >>> found = await agent.get_measures("Sales Analytics")
        >>> for m in found.measures:
        ...     print(m.name)
    """
    self._ensure_initialized()
    request = GetMeasuresInput(model_name=model_name)
    return await self.tools.get_measures(request)

# FabricAgent.get_report_definition method · python · L428-L448 (21 LOC)fabric_agent/core/agent.py
async def get_report_definition(
    self,
    report_name: str,
) -> GetReportDefinitionOutput:
    """
    Fetch the definition of a report by name.

    Args:
        report_name: Name of the report.
    Returns:
        GetReportDefinitionOutput with the report structure.
    Raises:
        RuntimeError: If the agent has not been initialized.
    Example:
        >>> report = await agent.get_report_definition("Sales Dashboard")
        >>> print(f"Pages: {report.pages}")
    """
    self._ensure_initialized()
    request = GetReportDefinitionInput(report_name=report_name)
    return await self.tools.get_report_definition(request)

# Open data scored by Repobility · https://repobility.com
FabricAgent.analyze_impact method · python · L454-L485 (32 LOC)fabric_agent/core/agent.py
async def analyze_impact(
self,
target_type: str,
target_name: str,
model_name: Optional[str] = None,
) -> AnalyzeImpactOutput:
"""
Analyze the impact of renaming a measure or column.
ALWAYS call this before any rename operation!
Args:
target_type: "measure" or "column".
target_name: Name of the target.
model_name: Semantic model name (for DAX analysis).
Returns:
AnalyzeImpactOutput with affected items.
Example:
>>> impact = await agent.analyze_impact("measure", "Sales", "Analytics")
>>> if not impact.safe_to_rename:
... print(f"Would affect {impact.total_impact} items!")
"""
self._ensure_initialized()
return await self.tools.analyze_impact(
AnalyzeImpactInput(
target_type=TargetType(target_type),
target_name=tFabricAgent.rename_measure method · python · L491-L532 (42 LOC)fabric_agent/core/agent.py
async def rename_measure(
self,
model_name: str,
old_name: str,
new_name: str,
dry_run: bool = True,
) -> RenameMeasureOutput:
"""
Rename a measure across semantic model and reports.
Use dry_run=True first to preview changes!
Args:
model_name: Semantic model containing the measure.
old_name: Current measure name.
new_name: New measure name.
dry_run: If True, only simulate (default True).
Returns:
RenameMeasureOutput with operation results.
Example:
>>> # Preview first
>>> result = await agent.rename_measure(
... "Analytics", "Sales", "Net_Revenue", dry_run=True
... )
>>> print(f"Would affect {result.impact.total_impact} items")
>>>
>>> # Then execute
>>> result = await agent.rename_measure(
FabricAgent.smart_rename_measure method · python · L535-L572 (38 LOC)fabric_agent/core/agent.py
async def smart_rename_measure(
self,
model_name: str,
old_name: str,
new_name: str,
dry_run: bool = True,
reasoning: Optional[str] = None,
) -> SmartRenameMeasureOutput:
"""
Hero refactor: rename a measure and update dependent measures safely.
This is the recommended API for measure renames because it:
- Uses SemPy dependency mapping where available
- Updates referencing measures' DAX so nothing breaks
- Persists an append-only log to memory/refactor_log.json
- Produces a Markdown safety UI
Args:
model_name: Semantic model containing the measure.
old_name: Current measure name.
new_name: New measure name.
dry_run: If True, only preview (default True).
reasoning: Reason for this change (stored in log).
Returns:
SmartRenameMeasureOutput with operation_id for rollback.
"""