← back to krivamsh007__fabric-agent

Function bodies 807 total

All specs Real LLM only Function bodies
_parse_block function · python · L245-L270 (26 LOC)
fabric_agent/api/llm_providers.py
def _parse_block(block: Any):
    """
    Parse a content block into (type, text, tool_id, tool_name, tool_input).
    Handles both dict format and Anthropic API object format.
    """
    if isinstance(block, dict):
        btype = block.get("type", "")
        return (
            btype,
            block.get("text", "") if btype == "text" else "",
            block.get("id", ""),
            block.get("name", ""),
            block.get("input", {}),
        )
    elif hasattr(block, "type"):
        btype = block.type
        if btype == "text":
            return btype, getattr(block, "text", ""), "", "", {}
        elif btype == "tool_use":
            return (
                btype, "",
                getattr(block, "id", ""),
                getattr(block, "name", ""),
                getattr(block, "input", {}),
            )
    return ("", "", "", "", {})
OllamaLLMClient.__init__ method · python · L316-L326 (11 LOC)
fabric_agent/api/llm_providers.py
    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        model: str = "llama3.1",
        timeout: float = 120.0,
    ):
        """
        Record connection settings and derive the chat-completions endpoint.

        Args:
            base_url: Server root URL; trailing slashes are stripped before use.
            model:    Model name, stored unchanged.
            timeout:  Stored unchanged on ``self.timeout`` (presumably seconds —
                      confirm against the request code).
        """
        root = base_url.rstrip("/")
        self.base_url = root
        self.model = model
        self.timeout = timeout
        # Endpoint is built from the normalized root so no double slash appears.
        self._endpoint = root + "/v1/chat/completions"
        logger.debug(f"OllamaLLMClient created: model={model}, endpoint={self._endpoint}")
OllamaLLMClient.complete method · python · L328-L401 (74 LOC)
fabric_agent/api/llm_providers.py
    async def complete(
        self,
        messages: List[Dict[str, Any]],
        tools: Optional[List[Dict[str, Any]]] = None,
        system_prompt: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Send a chat completion request to Ollama.

        Converts Anthropic-format messages/tools → OpenAI format → sends to Ollama
        → converts response back to Anthropic format for BaseAgent compatibility.

        Args:
            messages:      Chat history in Anthropic format.
            tools:         Tool schemas in Anthropic format (input_schema key).
            system_prompt: System instruction injected as the first message.

        Returns:
            Response dict in Anthropic format:
              {"content": [...], "stop_reason": "end_turn"|"tool_use", "usage": {...}}

        Raises:
            RuntimeError: If Ollama is not running or returns an error.
        """
        # Build message list in OpenAI format
        openai_messages: List[Dict[str
AzureOpenAILLMClient.__init__ method · python · L438-L450 (13 LOC)
fabric_agent/api/llm_providers.py
    def __init__(
        self,
        endpoint: str,
        deployment: str = "gpt-4o",
        api_version: str = "2024-10-01-preview",
        api_key: Optional[str] = None,
    ):
        """
        Record Azure OpenAI connection settings; no client is built here.

        Args:
            endpoint:    Resource endpoint URL; trailing slashes are stripped.
            deployment:  Deployment name, stored unchanged.
            api_version: API version string, stored unchanged.
            api_key:     Optional key, stored unchanged (may be None).
        """
        normalized_endpoint = endpoint.rstrip("/")
        self.endpoint = normalized_endpoint
        self.deployment = deployment
        self.api_version = api_version
        self.api_key = api_key
        # Placeholder for the client object; starts out unset.
        self._client = None
        # Note: the log line reports the endpoint exactly as passed in.
        logger.debug(f"AzureOpenAILLMClient: endpoint={endpoint}, deployment={deployment}")
AzureOpenAILLMClient._get_client method · python · L452-L490 (39 LOC)
fabric_agent/api/llm_providers.py
    def _get_client(self):
        """Lazy-initialize the AzureOpenAI client."""
        if self._client is not None:
            return self._client

        try:
            from openai import AsyncAzureOpenAI
        except ImportError:
            raise RuntimeError(
                "openai package not installed. "
                "Install: pip install openai"
            )

        if self.api_key:
            self._client = AsyncAzureOpenAI(
                azure_endpoint=self.endpoint,
                api_key=self.api_key,
                api_version=self.api_version,
            )
        else:
            # Use Azure AD token via DefaultAzureCredential
            try:
                from azure.identity import DefaultAzureCredential, get_bearer_token_provider
                token_provider = get_bearer_token_provider(
                    DefaultAzureCredential(),
                    "https://cognitiveservices.azure.com/.default",
                )
                self._client
AzureOpenAILLMClient.complete method · python · L492-L544 (53 LOC)
fabric_agent/api/llm_providers.py
    async def complete(
        self,
        messages: List[Dict[str, Any]],
        tools: Optional[List[Dict[str, Any]]] = None,
        system_prompt: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Send a chat completion request to Azure OpenAI.

        Same message normalization as OllamaLLMClient — Anthropic format in,
        Anthropic format out. The caller never knows the difference.

        Args:
            messages:      Chat history in Anthropic format.
            tools:         Tool schemas in Anthropic format.
            system_prompt: System instruction.

        Returns:
            Response dict in Anthropic format.

        Raises:
            RuntimeError: If openai package is missing or API call fails.
        """
        client = self._get_client()

        openai_messages: List[Dict[str, Any]] = []
        if system_prompt:
            openai_messages.append({"role": "system", "content": system_prompt})
        openai_messages.extend(_to_o
create_llm_client function · python · L551-L639 (89 LOC)
fabric_agent/api/llm_providers.py
def create_llm_client(config: Any) -> LLMClient:
    """
    Factory function — returns the correct LLMClient based on config.

    WHAT: Single entry point for all LLM provider construction. Callers
          never import OllamaLLMClient or ClaudeLLMClient directly.

    WHY FACTORY PATTERN:
      - Config is the single source of truth for which provider to use
      - Adding a new provider requires one elif here — zero changes to callers
      - Unit tests can inject MockLLMClient without touching factory at all

    FAANG PARALLEL:
      - AWS SDK provider chain (S3, DynamoDB adapters via same interface)
      - Google Cloud ADC (Application Default Credentials) factory
      - Spring's @Bean factory methods for dependency injection

    PROVIDER SELECTION (via LLM_PROVIDER env var):
      "ollama"       → OllamaLLMClient  (DEFAULT — no API key needed)
      "claude"       → ClaudeLLMClient  (best reasoning, needs ANTHROPIC_API_KEY)
      "azure_openai" → AzureOpenAILLMClient (enter
Repobility (the analyzer behind this table) · https://repobility.com
_infer_items_from_saferefactor_plan function · python · L44-L67 (24 LOC)
fabric_agent/cicd/change_packet.py
def _infer_items_from_saferefactor_plan(plan: dict[str, Any]) -> list[dict[str, str]]:
    """Infer deployment items from SafeRefactor plan impact.

    Deployment Pipelines expects: { "sourceItemId": "<uuid>", "itemType": "Report|SemanticModel|..." }
    """
    items: list[dict[str, str]] = []
    sm_id = plan.get("semantic_model_id")
    if sm_id:
        items.append({"sourceItemId": sm_id, "itemType": "SemanticModel"})

    impact = plan.get("impact") or {}
    reports_changed = impact.get("reports_changed") or {}
    for report_id in reports_changed.keys():
        items.append({"sourceItemId": report_id, "itemType": "Report"})

    # de-dupe while preserving order
    seen: set[tuple[str, str]] = set()
    out: list[dict[str, str]] = []
    for it in items:
        key = (it["sourceItemId"], it["itemType"])
        if key not in seen:
            seen.add(key)
            out.append(it)
    return out
create_change_packet_from_plan function · python · L70-L175 (106 LOC)
fabric_agent/cicd/change_packet.py
def create_change_packet_from_plan(
    plan_path: str | Path,
    *,
    out_dir: str | Path = "change_packets",
    deployment_pipeline_id: str,
    source_stage_id: str,
    target_stage_id: str,
    note: str | None = None,
    include_items: bool = True,
) -> ChangePacketPaths:
    """Create a PR-friendly change packet folder.

    This is Option 2's core artifact:
    - reviewers approve via PR
    - CI reads deployment_request.json and deploys stage content
    """
    plan_path = Path(plan_path)
    plan = _read_json(plan_path)
    op_id = str(plan.get("operation_id") or plan_path.stem)

    packet_dir = Path(out_dir) / op_id
    packet_dir.mkdir(parents=True, exist_ok=True)

    # 1) Copy plan.json
    plan_json = packet_dir / "plan.json"
    _write_json(plan_json, plan)

    # 2) Approval record (even if pending)
    approval = plan.get("approval") or {"status": "PENDING"}
    approval_json = packet_dir / "approval.json"
    _write_json(approval_json, approval)

    # 3) HTML
git_commit_change_packet function · python · L185-L207 (23 LOC)
fabric_agent/cicd/change_packet.py
def git_commit_change_packet(
    *,
    repo_root: str | Path,
    packet_dir: str | Path,
    branch: str,
    message: str,
    push: bool = False,
    remote: str = "origin",
) -> None:
    """Create a PR branch and commit the change packet.

    Review + approval happens in GitHub/Azure DevOps PR. After merge, CI deploys.
    """
    repo_root = Path(repo_root)
    packet_dir = Path(packet_dir)
    if not (repo_root / ".git").exists():
        raise RuntimeError("Not a git repo. Run: git init && git add . && git commit -m 'init'")

    _run_git(["checkout", "-b", branch], cwd=repo_root)
    _run_git(["add", packet_dir.as_posix()], cwd=repo_root)
    _run_git(["commit", "-m", message], cwd=repo_root)
    if push:
        _run_git(["push", "-u", remote, branch], cwd=repo_root)
deploy_stage_content_and_wait function · python · L25-L83 (59 LOC)
fabric_agent/cicd/deploy_pipeline.py
async def deploy_stage_content_and_wait(
    client: FabricApiClient,
    *,
    deployment_pipeline_id: str,
    source_stage_id: str,
    target_stage_id: str,
    items: list[dict[str, str]] | None = None,
    note: str | None = None,
    allow_cross_region_deployment: bool = False,
    timeout_s: int = 1800,
    poll_s: float = 5.0,
) -> DeployResult:
    """Deploy between Fabric deployment pipeline stages and wait for completion.

    Uses POST /v1/deploymentPipelines/{id}/deploy and then polls:
      GET /v1/deploymentPipelines/{id}/operations/{deploymentId}

    This is the "Option 2" CI step after PR approval.
    """
    body: dict[str, Any] = {
        "sourceStageId": source_stage_id,
        "targetStageId": target_stage_id,
    }
    if note:
        body["note"] = note
    if items:
        # Deploy Stage Content expects: items: [{sourceItemId, itemType}, ...]
        body["items"] = items
    if allow_cross_region_deployment:
        body["options"] = {"allowCrossRegionD
list_stage_items_all function · python · L86-L101 (16 LOC)
fabric_agent/cicd/deploy_pipeline.py
async def list_stage_items_all(
    client: FabricApiClient, *, deployment_pipeline_id: str, stage_id: str
) -> list[dict[str, Any]]:
    """Collect every stage item by following continuation tokens to the end.

    Calls ``client.list_deployment_pipeline_stage_items`` repeatedly, starting
    with no token, appending each page's ``value`` list (missing/falsy treated
    as empty) and stopping once a page carries no ``continuationToken``.
    """
    collected: list[dict[str, Any]] = []
    continuation: str | None = None
    has_more = True
    while has_more:
        page = await client.list_deployment_pipeline_stage_items(
            deployment_pipeline_id, stage_id, continuation_token=continuation
        )
        collected.extend(page.get("value") or [])
        continuation = page.get("continuationToken")
        # A missing or empty token marks the final page.
        has_more = bool(continuation)
    return collected
get_agent function · python · L48-L53 (6 LOC)
fabric_agent/cli.py
def get_agent() -> FabricAgent:
    """Return the module-level agent, constructing it on first use.

    The agent is created with ``log_level="WARNING"`` and cached in the
    ``_agent`` global so later calls return the same instance.
    """
    global _agent
    if _agent is not None:
        return _agent
    _agent = FabricAgent(log_level="WARNING")
    return _agent
ensure_initialized function · python · L56-L61 (6 LOC)
fabric_agent/cli.py
async def ensure_initialized() -> FabricAgent:
    """Return the global agent, awaiting ``initialize()`` first if it has not run yet."""
    agent = get_agent()
    if agent._initialized:
        return agent
    await agent.initialize()
    return agent
_exit_code_from_system_exit function · python · L64-L73 (10 LOC)
fabric_agent/cli.py
def _exit_code_from_system_exit(code: object) -> int:
    """Normalize SystemExit.code to an integer exit code."""
    if code is None:
        return 0
    if isinstance(code, int):
        return code
    try:
        return int(code)
    except (TypeError, ValueError):
        return 1
Open data scored by Repobility · https://repobility.com
_close_agent_async function · python · L76-L87 (12 LOC)
fabric_agent/cli.py
async def _close_agent_async() -> None:
    """Best-effort shutdown of the global agent once a command has finished.

    The ``_agent`` global is cleared before ``close()`` is awaited, so the
    slot is empty regardless of whether closing succeeds.
    """
    global _agent
    # Detach the instance from the global before attempting to close it.
    agent, _agent = _agent, None
    if agent is not None:
        try:
            await agent.close()
        except Exception:
            # Don't hide command failures if shutdown has a cleanup issue.
            pass
_run_async function · python · L90-L104 (15 LOC)
fabric_agent/cli.py
def _run_async(coro_factory: Callable[[], Awaitable[_T]]) -> _T:
    """Run an async command to completion and preserve non-zero exits.

    The coroutine produced by ``coro_factory`` runs under ``asyncio.run``; the
    global agent is closed afterwards whether the command succeeded or raised.
    A ``SystemExit`` with a non-zero normalized code is re-raised as
    ``typer.Exit`` with that code; a zero-code ``SystemExit`` propagates as-is.
    """
    async def _with_cleanup() -> _T:
        try:
            return await coro_factory()
        finally:
            # Always release the agent, even when the command fails.
            await _close_agent_async()

    try:
        outcome = asyncio.run(_with_cleanup())
    except SystemExit as exc:
        exit_code = _exit_code_from_system_exit(exc.code)
        if exit_code != 0:
            raise typer.Exit(code=exit_code) from exc
        raise
    return outcome
list_workspaces function · python · L108-L125 (18 LOC)
fabric_agent/cli.py
def list_workspaces():
    """List all accessible Fabric workspaces."""
    async def _show_workspaces():
        # Fetch the listing first, then render one row per workspace.
        agent = await ensure_initialized()
        listing = await agent.list_workspaces()

        table = Table(title="Fabric Workspaces")
        column_specs = (
            ("Name", {"style": "cyan"}),
            ("ID", {"style": "dim"}),
            ("Type", {}),
        )
        for header, options in column_specs:
            table.add_column(header, **options)

        for workspace in listing.workspaces:
            table.add_row(workspace.name, workspace.id, workspace.type)

        console.print(table)
        console.print(f"\nTotal: {listing.count} workspaces")

    _run_async(_show_workspaces)
set_workspace function · python · L130-L142 (13 LOC)
fabric_agent/cli.py
def set_workspace(name: str = typer.Argument(..., help="Workspace name")):
    """Set the active workspace."""
    async def _activate():
        agent = await ensure_initialized()
        outcome = await agent.set_workspace(name)

        # Confirmation panel shows the resolved workspace name and its ID.
        body = (
            f"[green]✓[/green] Active workspace: [cyan]{outcome.workspace_name}[/cyan]\n"
            f"ID: {outcome.workspace_id}"
        )
        console.print(Panel(body, title="Workspace Set"))

    _run_async(_activate)
list_items function · python · L147-L169 (23 LOC)
fabric_agent/cli.py
def list_items(
    item_type: Optional[str] = typer.Option(
        None, "--type", "-t",
        help="Filter by type (Report, SemanticModel, etc.)"
    )
):
    """List items in the current workspace."""
    async def _show_items():
        agent = await ensure_initialized()
        # item_type is passed straight through; None means no --type was given.
        listing = await agent.list_items(item_type)

        table = Table(title="Workspace Items")
        column_styles = (("Name", "cyan"), ("Type", "magenta"), ("ID", "dim"))
        for header, style in column_styles:
            table.add_column(header, style=style)

        for entry in listing.items:
            table.add_row(entry.name, entry.type, entry.id)

        console.print(table)
        console.print(f"\nTotal: {listing.count} items")

    _run_async(_show_items)
get_measures function · python · L174-L190 (17 LOC)
fabric_agent/cli.py
def get_measures(model: str = typer.Argument(..., help="Semantic model name")):
    """Get measures from a semantic model."""
    async def _show_measures():
        agent = await ensure_initialized()
        found = await agent.get_measures(model)

        # Title uses the model name reported back by the agent.
        table = Table(title=f"Measures in {found.model_name}")
        table.add_column("Name", style="cyan")
        table.add_column("Table")

        for measure in found.measures:
            # Measures without a table are shown with a dash placeholder.
            owning_table = measure.table or "-"
            table.add_row(measure.name, owning_table)

        console.print(table)
        console.print(f"\nTotal: {found.count} measures")

    _run_async(_show_measures)
analyze_impact function · python · L195-L256 (62 LOC)
fabric_agent/cli.py
def analyze_impact(
    target_type: str = typer.Option(
        ..., "--type", "-t",
        help="Target type: measure or column"
    ),
    name: str = typer.Option(..., "--name", "-n", help="Target name"),
    model: Optional[str] = typer.Option(
        None, "--model", "-m",
        help="Semantic model name (for DAX analysis)"
    ),
):
    """Analyze impact of renaming a measure or column."""
    async def _run():
        agent = await ensure_initialized()
        result = await agent.analyze_impact(target_type, name, model)
        
        # Summary panel
        severity_colors = {
            "none": "green",
            "low": "yellow",
            "medium": "orange1",
            "high": "red",
            "critical": "bold red",
        }
        color = severity_colors.get(result.severity.value, "white")
        
        console.print(Panel(
            f"Target: [cyan]{result.target_name}[/cyan] ({result.target_type})\n"
            f"Severity: [{color}]{result.severit
history function · python · L261-L297 (37 LOC)
fabric_agent/cli.py
def history(
    limit: int = typer.Option(10, "--limit", "-l", help="Max events"),
    operation: Optional[str] = typer.Option(
        None, "--operation", "-o",
        help="Filter by operation type"
    ),
):
    """Show refactoring history from audit trail."""
    async def _run():
        agent = await ensure_initialized()
        result = await agent.get_history(limit, operation)
        
        table = Table(title="Refactoring History")
        table.add_column("Timestamp", style="dim")
        table.add_column("Operation", style="cyan")
        table.add_column("Target")
        table.add_column("Change")
        table.add_column("Status")
        
        for event in result.events:
            change = ""
            if event.old_value and event.new_value:
                change = f"{event.old_value} → {event.new_value}"
            
            status_style = "green" if event.status == "success" else "yellow"
            
            table.add_row(
                event.t
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
status function · python · L302-L322 (21 LOC)
fabric_agent/cli.py
def status():
    """Check connection status."""
    async def _report():
        agent = await ensure_initialized()
        info = await agent.get_status()

        # Failure case first: show the reported error and stop.
        if not info.connected:
            failure_body = (
                f"[red]✗ Not Connected[/red]\n"
                f"Error: {info.error}"
            )
            console.print(Panel(failure_body, title="Connection Status"))
            return

        success_body = (
            f"[green]✓ Connected[/green]\n"
            f"Workspace: {info.workspace_name or '(not set)'}\n"
            f"Auth Mode: {info.auth_mode}"
        )
        console.print(Panel(success_body, title="Connection Status"))

    _run_async(_report)
health_scan function · python · L339-L394 (56 LOC)
fabric_agent/cli.py
def health_scan(
    workspace_ids: str = typer.Argument(
        ...,
        help="Comma-separated workspace IDs to scan (e.g. ws-abc,ws-def)",
    ),
    stale_hours: int = typer.Option(
        24, "--stale-hours", help="Flag tables not refreshed within N hours"
    ),
):
    """Scan workspace(s) for infrastructure anomalies (broken shortcuts, schema drift, etc.)."""
    async def _run():
        from fabric_agent.tools.models import ScanWorkspaceHealthInput

        ids = [w.strip() for w in workspace_ids.split(",") if w.strip()]
        agent = await ensure_initialized()
        if not agent.tools:
            console.print("[red]Agent tools not initialized.[/red]")
            return

        with console.status("[bold cyan]Scanning workspace health...[/bold cyan]"):
            result = await agent.tools.scan_workspace_health(
                ScanWorkspaceHealthInput(workspace_ids=ids, stale_hours=stale_hours)
            )

        severity_colors = {
            "critical": "
heal function · python · L399-L440 (42 LOC)
fabric_agent/cli.py
def heal(
    workspace_ids: str = typer.Argument(
        ...,
        help="Comma-separated workspace IDs (e.g. ws-abc,ws-def)",
    ),
    dry_run: bool = typer.Option(
        True, "--dry-run/--apply",
        help="Simulate (default) or apply healing actions",
    ),
):
    """Execute self-healing for workspace anomalies. Use --apply to actually fix issues."""
    async def _run():
        from fabric_agent.tools.models import ExecuteHealingPlanInput

        ids = [w.strip() for w in workspace_ids.split(",") if w.strip()]
        agent = await ensure_initialized()
        if not agent.tools:
            console.print("[red]Agent tools not initialized.[/red]")
            return

        mode = "[dim](dry run)[/dim]" if dry_run else "[bold red](LIVE)[/bold red]"
        with console.status(f"[bold cyan]Running self-healing {mode}...[/bold cyan]"):
            result = await agent.tools.execute_healing_plan(
                ExecuteHealingPlanInput(workspace_ids=ids, dry_run=dry_ru
shortcut_scan function · python · L450-L530 (81 LOC)
fabric_agent/cli.py
def shortcut_scan(
    workspace_ids: str = typer.Argument(
        ...,
        help="Comma-separated workspace IDs to scan (e.g. ws-abc,ws-def)",
    ),
):
    """
    Discover broken OneLake shortcuts + their cascade of impacted assets.

    Queries the Fabric REST API for actual shortcut definitions, verifies each
    source is accessible, then shows which downstream tables / semantic models /
    pipelines / reports are affected.

    Run this before shortcut-heal to review what's broken.
    """
    async def _run():
        from fabric_agent.tools.models import ScanShortcutCascadeInput
        from rich.table import Table

        ids = [w.strip() for w in workspace_ids.split(",") if w.strip()]
        agent = await ensure_initialized()
        if not agent.tools:
            console.print("[red]Agent tools not initialized.[/red]")
            return

        with console.status("[bold cyan]Scanning for broken shortcuts...[/bold cyan]"):
            result = await agent.tools.sc
shortcut_heal function · python · L535-L658 (124 LOC)
fabric_agent/cli.py
def shortcut_heal(
    workspace_ids: str = typer.Argument(
        ...,
        help="Comma-separated workspace IDs (e.g. ws-abc,ws-def)",
    ),
    dry_run: bool = typer.Option(
        True, "--dry-run/--apply",
        help="Simulate (default) or apply healing actions",
    ),
    approved_by: str = typer.Option(
        "cli-user", "--approved-by", help="Approver name for audit trail",
    ),
):
    """
    Build a shortcut cascade healing plan and optionally execute it.

    WORKFLOW:
      1. Builds cascade plan (auto vs manual action split)
      2. Shows the plan for review
      3. If --apply: asks for confirmation, approves, then executes

    Always run without --apply first to review what will happen.
    """
    async def _run():
        from fabric_agent.tools.models import (
            BuildShortcutHealingPlanInput,
            ApproveShortcutHealingInput,
            ExecuteShortcutHealingInput,
        )
        from rich.table import Table

        ids = [w.strip()
memory_stats function · python · L667-L691 (25 LOC)
fabric_agent/cli.py
def memory_stats():
    """Show vector store statistics (total indexed, collection size, embedding model)."""
    async def _run():
        from fabric_agent.tools.models import MemoryStatsInput

        agent = await ensure_initialized()
        if not agent.tools:
            console.print("[red]Agent tools not initialized.[/red]")
            return

        result = await agent.tools.memory_stats(MemoryStatsInput())

        table = Table(title="Operation Memory Statistics")
        table.add_column("Metric", style="cyan")
        table.add_column("Value")
        table.add_row("Total Indexed", str(result.total_indexed))
        table.add_row("Vector Store Size", str(result.collection_size))
        table.add_row("Last Indexed", result.last_indexed or "never")
        table.add_row("Embedding Model", result.embedding_model)
        table.add_row("Store Path", result.vector_store_path)

        console.print(table)
        console.print(f"\n[dim]{result.message}[/dim]")

    _run_as
memory_search function · python · L696-L747 (52 LOC)
fabric_agent/cli.py
def memory_search(
    query: str = typer.Argument(..., help="Natural language description of the proposed change"),
    top_k: int = typer.Option(5, "--top-k", "-k", help="Number of similar operations to return"),
    change_type: Optional[str] = typer.Option(
        None, "--type", "-t", help="Filter by change type (rename_measure, etc.)"
    ),
):
    """Find past operations semantically similar to a proposed change (vector RAG search)."""
    async def _run():
        from fabric_agent.tools.models import FindSimilarOperationsInput

        agent = await ensure_initialized()
        if not agent.tools:
            console.print("[red]Agent tools not initialized.[/red]")
            return

        result = await agent.tools.find_similar_operations(
            FindSimilarOperationsInput(
                proposed_change=query,
                top_k=top_k,
                change_type=change_type,
            )
        )

        if not result.similar_operations:
            console.
memory_reindex function · python · L752-L770 (19 LOC)
fabric_agent/cli.py
def memory_reindex():
    """Rebuild the vector index from the SQLite audit trail (run after first setup)."""
    async def _reindex():
        from fabric_agent.tools.models import ReindexMemoryInput

        agent = await ensure_initialized()
        # Without a tools object there is nothing to call; report and stop.
        if not agent.tools:
            console.print("[red]Agent tools not initialized.[/red]")
            return

        # Show a status indicator while the reindex call is awaited.
        with console.status("[bold cyan]Reindexing operation memory...[/bold cyan]"):
            request = ReindexMemoryInput()
            outcome = await agent.tools.reindex_operation_memory(request)

        body = f"[green]✓[/green] {outcome.message}"
        console.print(Panel(body, title="Memory Reindex"))

    _run_async(_reindex)
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
session_summary function · python · L775-L804 (30 LOC)
fabric_agent/cli.py
def session_summary(
    workspace_id: Optional[str] = typer.Option(
        None, "--workspace", "-w", help="Workspace ID (defaults to active workspace)"
    ),
):
    """Show what the agent did in the last session (cross-session memory)."""
    async def _run():
        from fabric_agent.tools.models import GetSessionSummaryInput

        agent = await ensure_initialized()
        if not agent.tools:
            console.print("[red]Agent tools not initialized.[/red]")
            return

        result = await agent.tools.get_session_summary(
            GetSessionSummaryInput(workspace_id=workspace_id)
        )

        if result.has_history:
            console.print(Panel(
                result.summary,
                title=f"Last Session Summary (workspace: {result.workspace_id or 'default'})",
            ))
        else:
            console.print(Panel(
                "[dim]No previous sessions found.[/dim]",
                title="Session Summary",
            ))

    _run
blast_radius function · python · L814-L966 (153 LOC)
fabric_agent/cli.py
def blast_radius(
    source_table: str = typer.Option(
        ..., "--source-table", "-s",
        help="Name of the source asset that changed (e.g. 'fact_sales')",
    ),
    workspace_ids: List[str] = typer.Option(
        ..., "--workspace-id", "-w",
        help="Workspace ID to scan (repeat for multiple workspaces)",
    ),
    change_description: str = typer.Option(
        "schema change", "--change",
        help="Description of the change (e.g. 'column dropped: customer_tier')",
    ),
    json_out: Optional[str] = typer.Option(
        None, "--json-out",
        help="Write full blast-radius JSON to this file path (consumed by execute-cascade-plan)",
    ),
    fail_on_risk: Optional[str] = typer.Option(
        None, "--fail-on-risk",
        help="Exit with code 1 if risk_level >= this level (high|critical). Used by CI to block PRs.",
    ),
):
    """
    Compute the full cross-workspace cascade blast radius for a source asset change.

    Shows which assets across all 
execute_cascade_plan function · python · L975-L1097 (123 LOC)
fabric_agent/cli.py
def execute_cascade_plan(
    plan_json: str = typer.Option(
        ..., "--plan-json", "-p",
        help="Path to blast-radius JSON file produced by `blast-radius --json-out`",
    ),
    dry_run: bool = typer.Option(
        False, "--dry-run",
        help="Print what would be executed without calling Fabric APIs",
    ),
    skip_advisory: bool = typer.Option(
        True, "--skip-advisory/--include-advisory",
        help="Skip advisory (notify_owner) steps that don't require API calls (default: skip)",
    ),
):
    """
    Execute the ordered cascade healing plan produced by `blast-radius --json-out`.

    Reads the blast-radius JSON, iterates the topologically sorted healing steps,
    and triggers pipelines / refreshes semantic models in the correct dependency order.

    This command is the GitOps Stage 2: it runs ONLY after a human reviewer approves
    the `fabric-production` GitHub Environment in the fabric-auto-heal workflow.

    Healing order (mirrors data flow, sour
freshness_scan function · python · L1101-L1191 (91 LOC)
fabric_agent/cli.py
def freshness_scan(
    workspace_ids: List[str] = typer.Option(
        ..., "--workspace-id", "-w",
        help="Workspace ID to scan (repeat for multiple workspaces)",
    ),
    sla_fact: float = typer.Option(
        1.0, "--sla-fact",
        help="SLA threshold in hours for fact_* tables",
    ),
    sla_dim: float = typer.Option(
        24.0, "--sla-dim",
        help="SLA threshold in hours for dim_* tables",
    ),
    sla_default: float = typer.Option(
        24.0, "--sla-default",
        help="Default SLA threshold in hours for unmatched tables",
    ),
    lro_timeout: int = typer.Option(
        300, "--lro-timeout",
        help="Max seconds to wait for refreshMetadata LRO per endpoint",
    ),
):
    """
    Scan SQL Endpoint sync freshness across workspaces.

    Detects tables silently stale with no Fabric alert by comparing each
    table's lastSuccessfulSyncDateTime against SLA thresholds.

    Example:
        fabric-agent freshness-scan -w ws-dataplatform -w w
table_maintenance function · python · L1195-L1277 (83 LOC)
fabric_agent/cli.py
def table_maintenance(
    workspace_ids: List[str] = typer.Option(
        ..., "--workspace-id", "-w",
        help="Workspace ID to process (repeat for multiple workspaces)",
    ),
    dry_run: bool = typer.Option(
        True, "--dry-run/--no-dry-run",
        help="True = validate only (default). False = submit jobs.",
    ),
    table_filter: Optional[List[str]] = typer.Option(
        None, "--table", "-t",
        help="Specific table name to maintain (repeat for multiple). Omit = all tables.",
    ),
    queue_threshold: int = typer.Option(
        3, "--queue-threshold",
        help="Skip submission if active jobs >= this (Trial capacity protection)",
    ),
):
    """
    Validate and optionally submit TableMaintenance jobs.

    Pre-validates table names before submission to prevent silent Spark failures.
    ALWAYS run with --dry-run first (default) to preview what would be submitted.

    Example:
        fabric-agent table-maintenance -w ws-dataplatform --dry-run
    
_configure_logging function · python · L73-L113 (41 LOC)
fabric_agent/core/agent.py
def _configure_logging(
    level: str = "INFO",
    log_file: Optional[Path] = None,
) -> None:
    """
    Configure loguru logging.
    
    Args:
        level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
        log_file: Optional file path for log output.
    """
    # Remove default handler
    logger.remove()
    
    # Add stderr handler with formatting
    logger.add(
        sys.stderr,
        level=level,
        format=(
            "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
            "<level>{level: <8}</level> | "
            "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | "
            "<level>{message}</level>"
        ),
        colorize=True,
    )
    
    # Add file handler if specified
    if log_file:
        log_file.parent.mkdir(parents=True, exist_ok=True)
        logger.add(
            str(log_file),
            level=level,
            format=(
                "{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | "
               
FabricAgent.__init__ method · python · L150-L212 (63 LOC)
fabric_agent/core/agent.py
    def __init__(
        self,
        config: Optional[AgentConfig] = None,
        auth_config: Optional[FabricAuthConfig] = None,
        storage_path: Optional[Path] = None,
        storage_backend: str = "sqlite",
        log_level: str = "INFO",
        log_file: Optional[Path] = None,
    ):
        """
        Initialize the FabricAgent.
        
        Args:
            config: Full agent configuration. If not provided, will be
                   created from other parameters.
            auth_config: Authentication configuration. If not provided,
                        will be loaded from environment variables.
            storage_path: Path for audit trail storage.
            storage_backend: "sqlite" or "json".
            log_level: Logging level.
            log_file: Optional file for logs.
        
        Example:
            >>> # Default (loads auth from environment)
            >>> agent = FabricAgent()
            >>> 
            >>> # With explicit configurat
FabricAgent.initialize method · python · L214-L286 (73 LOC)
fabric_agent/core/agent.py
    async def initialize(self) -> "FabricAgent":
        """
        Initialize the agent and all its components.
        
        This must be called before using any agent methods.
        Alternatively, use the async context manager.
        
        Returns:
            Self for method chaining.
        
        Raises:
            ValueError: If authentication configuration is missing.
        
        Example:
            >>> agent = FabricAgent()
            >>> await agent.initialize()
            >>> # Now ready to use
        """
        if self._initialized:
            logger.debug("Agent already initialized")
            return self
        
        logger.info("Starting agent initialization")
        
        if not self.config.auth:
            raise ValueError(
                "Authentication configuration required. "
                "Set AZURE_TENANT_ID and AZURE_CLIENT_ID environment variables, "
                "or provide auth_config parameter."
            )
      
Repobility (the analyzer behind this table) · https://repobility.com
FabricAgent.close method · python · L288-L303 (16 LOC)
fabric_agent/core/agent.py
    async def close(self) -> None:
        """
        Shut the agent down and release the resources it holds.

        Closes the underlying client when one is set, then drops the
        client and tools references and marks the agent as no longer
        initialized.

        Always call this when done, or use the async context manager.
        """
        logger.info("Closing FabricAgent")

        # Only a truthy client is closed and cleared; an unset client is
        # left untouched.
        active_client = self.client
        if active_client:
            await active_client.close()
            self.client = None

        # After this point _ensure_initialized() will reject further calls.
        self.tools, self._initialized = None, False

        logger.info("FabricAgent closed")
FabricAgent._ensure_initialized method · python · L314-L320 (7 LOC)
fabric_agent/core/agent.py
    def _ensure_initialized(self) -> None:
        """
        Guard that the agent is ready before a public method runs.

        Raises:
            RuntimeError: If the initialized flag is not set, or the
                tools attribute is unset/falsy.
        """
        # Ready only when both the flag and the tools object are truthy.
        if self._initialized and self.tools:
            return

        raise RuntimeError(
            "FabricAgent not initialized. "
            "Call initialize() or use async context manager."
        )
FabricAgent.list_workspaces method · python · L326-L339 (14 LOC)
fabric_agent/core/agent.py
    async def list_workspaces(self) -> ListWorkspacesOutput:
        """
        Return every Fabric workspace the agent can access.

        Returns:
            ListWorkspacesOutput with workspaces and count.

        Raises:
            RuntimeError: If the agent has not been initialized.

        Example:
            >>> result = await agent.list_workspaces()
            >>> for ws in result.workspaces:
            ...     print(f"{ws.name} ({ws.id})")
        """
        self._ensure_initialized()
        # Delegate to the tools layer with an empty request object.
        fetch = self.tools.list_workspaces
        request = ListWorkspacesInput()
        return await fetch(request)
FabricAgent.set_workspace method · python · L341-L357 (17 LOC)
fabric_agent/core/agent.py
    async def set_workspace(self, workspace_name: str) -> SetWorkspaceOutput:
        """
        Select the workspace that subsequent operations act on.

        Args:
            workspace_name: Name of the workspace.

        Returns:
            SetWorkspaceOutput with workspace_id and workspace_name.

        Raises:
            RuntimeError: If the agent has not been initialized.

        Example:
            >>> await agent.set_workspace("Sales Analytics")
        """
        self._ensure_initialized()
        # Delegate to the tools layer, passing the name in a request object.
        select = self.tools.set_workspace
        request = SetWorkspaceInput(workspace_name=workspace_name)
        return await select(request)
FabricAgent.list_items method · python · L359-L376 (18 LOC)
fabric_agent/core/agent.py
    async def list_items(
        self,
        item_type: Optional[str] = None,
    ) -> ListItemsOutput:
        """
        Enumerate the items in the current workspace.

        Args:
            item_type: Optional filter (Report, SemanticModel, etc.)

        Returns:
            ListItemsOutput with items and count.

        Raises:
            RuntimeError: If the agent has not been initialized.

        Example:
            >>> reports = await agent.list_items("Report")
        """
        self._ensure_initialized()
        # The filter is forwarded as given; None is passed through unchanged.
        fetch = self.tools.list_items
        request = ListItemsInput(item_type=item_type)
        return await fetch(request)
FabricAgent.get_semantic_model method · python · L382-L402 (21 LOC)
fabric_agent/core/agent.py
    async def get_semantic_model(
        self,
        model_name: str,
    ) -> GetSemanticModelOutput:
        """
        Fetch the details of a semantic model by name.

        Args:
            model_name: Name of the semantic model.

        Returns:
            GetSemanticModelOutput with model details.

        Raises:
            RuntimeError: If the agent has not been initialized.

        Example:
            >>> model = await agent.get_semantic_model("Sales Analytics")
            >>> print(f"Measures: {model.measure_count}")
        """
        self._ensure_initialized()
        # Delegate to the tools layer, passing the name in a request object.
        fetch = self.tools.get_semantic_model
        request = GetSemanticModelInput(model_name=model_name)
        return await fetch(request)
FabricAgent.get_measures method · python · L404-L422 (19 LOC)
fabric_agent/core/agent.py
    async def get_measures(self, model_name: str) -> GetMeasuresOutput:
        """
        Retrieve all measures defined in a semantic model.

        Args:
            model_name: Name of the semantic model.

        Returns:
            GetMeasuresOutput with measures.

        Raises:
            RuntimeError: If the agent has not been initialized.

        Example:
            >>> measures = await agent.get_measures("Sales Analytics")
            >>> for m in measures.measures:
            ...     print(m.name)
        """
        self._ensure_initialized()
        # Delegate to the tools layer, passing the name in a request object.
        fetch = self.tools.get_measures
        request = GetMeasuresInput(model_name=model_name)
        return await fetch(request)
FabricAgent.get_report_definition method · python · L428-L448 (21 LOC)
fabric_agent/core/agent.py
    async def get_report_definition(
        self,
        report_name: str,
    ) -> GetReportDefinitionOutput:
        """
        Fetch the definition of a report by name.

        Args:
            report_name: Name of the report.

        Returns:
            GetReportDefinitionOutput with report structure.

        Raises:
            RuntimeError: If the agent has not been initialized.

        Example:
            >>> report = await agent.get_report_definition("Sales Dashboard")
            >>> print(f"Pages: {report.pages}")
        """
        self._ensure_initialized()
        # Delegate to the tools layer, passing the name in a request object.
        fetch = self.tools.get_report_definition
        request = GetReportDefinitionInput(report_name=report_name)
        return await fetch(request)
Open data scored by Repobility · https://repobility.com
FabricAgent.analyze_impact method · python · L454-L485 (32 LOC)
fabric_agent/core/agent.py
    async def analyze_impact(
        self,
        target_type: str,
        target_name: str,
        model_name: Optional[str] = None,
    ) -> AnalyzeImpactOutput:
        """
        Analyze the impact of renaming a measure or column.
        
        ALWAYS call this before any rename operation!
        
        Args:
            target_type: "measure" or "column".
            target_name: Name of the target.
            model_name: Semantic model name (for DAX analysis).
        
        Returns:
            AnalyzeImpactOutput with affected items.
        
        Example:
            >>> impact = await agent.analyze_impact("measure", "Sales", "Analytics")
            >>> if not impact.safe_to_rename:
            ...     print(f"Would affect {impact.total_impact} items!")
        """
        self._ensure_initialized()
        return await self.tools.analyze_impact(
            AnalyzeImpactInput(
                target_type=TargetType(target_type),
                target_name=t
FabricAgent.rename_measure method · python · L491-L532 (42 LOC)
fabric_agent/core/agent.py
    async def rename_measure(
        self,
        model_name: str,
        old_name: str,
        new_name: str,
        dry_run: bool = True,
    ) -> RenameMeasureOutput:
        """
        Rename a measure across semantic model and reports.
        
        Use dry_run=True first to preview changes!
        
        Args:
            model_name: Semantic model containing the measure.
            old_name: Current measure name.
            new_name: New measure name.
            dry_run: If True, only simulate (default True).
        
        Returns:
            RenameMeasureOutput with operation results.
        
        Example:
            >>> # Preview first
            >>> result = await agent.rename_measure(
            ...     "Analytics", "Sales", "Net_Revenue", dry_run=True
            ... )
            >>> print(f"Would affect {result.impact.total_impact} items")
            >>> 
            >>> # Then execute
            >>> result = await agent.rename_measure(
       
FabricAgent.smart_rename_measure method · python · L535-L572 (38 LOC)
fabric_agent/core/agent.py
    async def smart_rename_measure(
        self,
        model_name: str,
        old_name: str,
        new_name: str,
        dry_run: bool = True,
        reasoning: Optional[str] = None,
    ) -> SmartRenameMeasureOutput:
        """
        Hero refactor: rename a measure and update dependent measures safely.

        This is the recommended API for measure renames because it:
        - Uses SemPy dependency mapping where available
        - Updates referencing measures' DAX so nothing breaks
        - Persists an append-only log to memory/refactor_log.json
        - Produces a Markdown safety UI

        Args:
            model_name: Semantic model containing the measure.
            old_name: Current measure name.
            new_name: New measure name.
            dry_run: If True, only preview (default True).
            reasoning: Reason for this change (stored in log).

        Returns:
            SmartRenameMeasureOutput with operation_id for rollback.
        """
       
‹ prevpage 3 / 17next ›