← back to krivamsh007__fabric-agent

Function bodies 807 total

All specs Real LLM only Function bodies
AgentMessage.to_dict method · python · L71-L80 (10 LOC)
fabric_agent/agents/base.py
    def to_dict(self) -> Dict[str, Any]:
        """Serialize this message into a plain dictionary.

        ``message_type`` is reduced to its ``.value``; every other field is
        copied through unchanged.
        """
        payload: Dict[str, Any] = {}
        payload["id"] = self.id
        payload["from_agent"] = self.from_agent
        payload["to_agent"] = self.to_agent
        payload["message_type"] = self.message_type.value
        payload["content"] = self.content
        payload["timestamp"] = self.timestamp
        payload["correlation_id"] = self.correlation_id
        return payload
ToolDefinition.to_llm_format method · python · L91-L100 (10 LOC)
fabric_agent/agents/base.py
    def to_llm_format(self) -> Dict[str, Any]:
        """Render this tool in the dictionary layout passed to LLM APIs.

        The result carries ``name`` and ``description`` plus an
        ``input_schema`` of type ``"object"`` whose ``properties`` are this
        tool's ``parameters``.
        """
        tool_name = self.name
        summary = self.description
        schema: Dict[str, Any] = {"type": "object"}
        schema["properties"] = self.parameters
        return {"name": tool_name, "description": summary, "input_schema": schema}
AgentResult.to_dict method · python · L133-L143 (11 LOC)
fabric_agent/agents/base.py
    def to_dict(self) -> Dict[str, Any]:
        """Serialize this result into a plain dictionary.

        ``status`` is reduced to its ``.value``; each entry of ``thoughts``
        and ``tool_calls`` is converted with ``vars()``. The remaining fields
        are copied as-is.
        """
        data: Dict[str, Any] = {
            "agent_name": self.agent_name,
            "task": self.task,
            "status": self.status.value,
            "result": self.result,
        }
        data["thoughts"] = list(map(vars, self.thoughts))
        data["tool_calls"] = list(map(vars, self.tool_calls))
        data["error"] = self.error
        data["execution_time_ms"] = self.execution_time_ms
        return data
LLMClient.complete method · python · L209-L216 (8 LOC)
fabric_agent/agents/base.py
    async def complete(
        self,
        messages: List[Dict[str, str]],
        tools: Optional[List[Dict]] = None,
        system_prompt: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Send completion request to LLM.

        This base version does no work and yields ``None``. The concrete
        clients shown alongside it share this exact signature and supply the
        real behaviour.
        NOTE(review): presumably declared abstract; the decorator is not
        visible in this excerpt.

        Args:
            messages: Conversation messages as role/content dictionaries.
            tools: Optional tool definitions to expose to the model.
            system_prompt: Optional system prompt.
        """
        return None
ClaudeLLMClient.complete method · python · L227-L265 (39 LOC)
fabric_agent/agents/base.py
    async def complete(
        self,
        messages: List[Dict[str, str]],
        tools: Optional[List[Dict]] = None,
        system_prompt: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Send completion request to Claude."""
        try:
            import anthropic
            
            if self._client is None:
                self._client = anthropic.AsyncAnthropic(api_key=self.api_key)
            
            kwargs = {
                "model": self.model,
                "max_tokens": 4096,
                "messages": messages,
            }
            
            if system_prompt:
                kwargs["system"] = system_prompt
            
            if tools:
                kwargs["tools"] = tools
            
            response = await self._client.messages.create(**kwargs)
            
            return {
                "content": response.content,
                "stop_reason": response.stop_reason,
                "usage": {
                    "
MockLLMClient.complete method · python · L275-L292 (18 LOC)
fabric_agent/agents/base.py
    async def complete(
        self,
        messages: List[Dict[str, str]],
        tools: Optional[List[Dict]] = None,
        system_prompt: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Return the next scripted response, or a canned default.

        While ``self.call_count`` still indexes into ``self.responses`` the
        entry at that index is returned and the counter advances by one.
        Otherwise a fixed "Mock response" payload is returned and the counter
        is left alone. The arguments are accepted but not inspected.
        """
        scripted = self.responses
        cursor = self.call_count

        if not scripted or cursor >= len(scripted):
            # Nothing (left) to replay: fall back to the stock reply.
            return {
                "content": [{"type": "text", "text": "Mock response"}],
                "stop_reason": "end_turn",
                "usage": {"input_tokens": 100, "output_tokens": 50}
            }

        reply = scripted[cursor]
        self.call_count = cursor + 1
        return reply
BaseAgent.__init__ method · python · L316-L343 (28 LOC)
fabric_agent/agents/base.py
    def __init__(
        self,
        name: str,
        role: AgentRole,
        llm: LLMClient,
        memory: SharedMemory,
        max_iterations: int = 10,
    ):
        """
        Store the agent's identity and collaborators, then build its tool registry.

        Args:
            name: Unique agent name
            role: Agent's role
            llm: LLM client for reasoning
            memory: Shared memory
            max_iterations: Max think-act cycles
        """
        self.name, self.role = name, role
        self.llm, self.memory = llm, memory
        self.max_iterations = max_iterations

        # The registry is attached (empty) first and only then filled from
        # get_tools(), keyed by each tool's name.
        registry: Dict[str, ToolDefinition] = {}
        self._tools = registry
        registry.update((tool.name, tool) for tool in self.get_tools())
About: code-quality intelligence by Repobility · https://repobility.com
BaseAgent.run method · python · L355-L516 (162 LOC)
fabric_agent/agents/base.py
    async def run(
        self,
        task: str,
        context: Optional[Dict[str, Any]] = None,
    ) -> AgentResult:
        """
        Execute the agent on a task.
        
        Args:
            task: The task to perform
            context: Additional context
        
        Returns:
            AgentResult with outcome
        """
        import time
        start_time = time.time()
        
        logger.info(f"[{self.name}] Starting task: {task[:100]}...")
        
        thoughts: List[AgentThought] = []
        tool_calls: List[ToolCall] = []
        
        # Build initial messages
        messages = [
            {"role": "user", "content": self._build_task_prompt(task, context)}
        ]
        
        # Get tools in LLM format
        llm_tools = [t.to_llm_format() for t in self._tools.values()] if self._tools else None
        
        try:
            for iteration in range(self.max_iterations):
                # Call LLM
                response = await
BaseAgent._call_tool method · python · L518-L529 (12 LOC)
fabric_agent/agents/base.py
    async def _call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
        """Execute a tool."""
        tool = self._tools.get(tool_name)
        if not tool:
            raise ValueError(f"Unknown tool: {tool_name}")
        
        # Call handler (may be sync or async)
        result = tool.handler(**arguments)
        if asyncio.iscoroutine(result):
            result = await result
        
        return result
BaseAgent._build_task_prompt method · python · L531-L538 (8 LOC)
fabric_agent/agents/base.py
    def _build_task_prompt(self, task: str, context: Optional[Dict]) -> str:
        """Build the task prompt with context."""
        prompt = f"Task: {task}"
        
        if context:
            prompt += f"\n\nContext:\n{json.dumps(context, indent=2)}"
        
        return prompt
BaseAgent.send_message method · python · L544-L554 (11 LOC)
fabric_agent/agents/base.py
    def send_message(self, to_agent: str, content: Dict[str, Any]) -> AgentMessage:
        """Create a REQUEST message addressed to ``to_agent`` and record it.

        The message gets a fresh UUID4 string as its id and this agent's name
        as the sender. It is passed to ``self.memory.add_message`` and then
        returned.
        """
        fields = {
            "id": str(uuid4()),
            "from_agent": self.name,
            "to_agent": to_agent,
            "message_type": MessageType.REQUEST,
            "content": content,
        }
        outgoing = AgentMessage(**fields)
        self.memory.add_message(outgoing)
        return outgoing
SimpleAgent.__init__ method · python · L568-L593 (26 LOC)
fabric_agent/agents/base.py
    def __init__(
        self,
        name: str,
        role: AgentRole,
        memory: SharedMemory,
        tools: List[ToolDefinition],
    ):
        """Create an agent around a fixed list of tools and a mock LLM.

        Args:
            name: Agent name, forwarded to the base initializer.
            role: Agent role, forwarded to the base initializer.
            memory: Shared memory, forwarded to the base initializer.
            tools: Tool definitions this agent exposes.
        """
        # Ordering matters: the base initializer registers tools through
        # self.get_tools(), and get_tools() here reads self._simple_tools,
        # so the attribute has to exist before super().__init__ runs.
        self._simple_tools = tools

        # A mock client is supplied because this agent does not use the LLM.
        mock_llm = MockLLMClient()
        super().__init__(name=name, role=role, llm=mock_llm, memory=memory, max_iterations=1)

        # Deliberately redundant: the base class has already registered these
        # via get_tools(); repeating it keeps the tools present even if that
        # base behaviour changes.
        self._tools.update({tool.name: tool for tool in tools})
OrchestratorLLMAgent.__init__ method · python · L200-L236 (37 LOC)
fabric_agent/agents/llm_orchestrator.py
    def __init__(
        self,
        llm: LLMClient,
        fabric_client: Any,
        memory: Optional[SharedMemory] = None,
        workspace_id: Optional[str] = None,
        workspace_name: Optional[str] = None,
        operation_memory: Optional[Any] = None,
        max_iterations: int = 20,
    ):
        """
        Args:
            llm:              LLMClient (OllamaLLMClient, ClaudeLLMClient, etc.)
            fabric_client:    FabricApiClient for Fabric REST API calls.
            memory:           Shared memory (created fresh if not provided).
            workspace_id:     Default workspace ID (can be None — LLM discovers it).
            workspace_name:   Default workspace name.
            operation_memory: Optional OperationMemory for historical RAG context.
            max_iterations:   Max LLM → tool call cycles before stopping.
        """
        super().__init__(
            name="OrchestratorLLMAgent",
            role=AgentRole.ORCHESTRATOR,
            llm=l
OrchestratorLLMAgent._get_discovery method · python · L238-L245 (8 LOC)
fabric_agent/agents/llm_orchestrator.py
    def _get_discovery(self):
        if self._discovery_agent is None:
            from fabric_agent.agents.specialized import DiscoveryAgent
            self._discovery_agent = DiscoveryAgent(
                memory=self.memory,
                fabric_client=self._fabric_client,
            )
        return self._discovery_agent
OrchestratorLLMAgent._get_impact method · python · L247-L254 (8 LOC)
fabric_agent/agents/llm_orchestrator.py
    def _get_impact(self):
        if self._impact_agent is None:
            from fabric_agent.agents.specialized import ImpactAgent
            self._impact_agent = ImpactAgent(
                memory=self.memory,
                operation_memory=self._operation_memory,
            )
        return self._impact_agent
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
OrchestratorLLMAgent._get_refactor method · python · L256-L263 (8 LOC)
fabric_agent/agents/llm_orchestrator.py
    def _get_refactor(self):
        if self._refactor_agent is None:
            from fabric_agent.agents.specialized import RefactorAgent
            self._refactor_agent = RefactorAgent(
                memory=self.memory,
                workspace_name=self._workspace_name or "",
            )
        return self._refactor_agent
OrchestratorLLMAgent.get_system_prompt method · python · L265-L275 (11 LOC)
fabric_agent/agents/llm_orchestrator.py
    def get_system_prompt(self) -> str:
        """Compose the system prompt: the base prompt plus a context section.

        The context section lists the workspace ID and workspace name when
        each is set, and always ends with a "Session started" line carrying
        the current UTC time in ISO format (computed on every call).
        """
        workspace_id = self._workspace_id
        workspace_name = self._workspace_name

        pieces = []
        if workspace_id:
            pieces += [f"Current workspace ID: {workspace_id}"]
        if workspace_name:
            pieces += [f"Current workspace name: {workspace_name}"]

        started_at = datetime.now(timezone.utc).isoformat()
        pieces += [f"Session started: {started_at}"]

        joined = "\n".join(pieces)
        return f"{_SYSTEM_PROMPT}\n\nCURRENT CONTEXT:\n{joined}"
OrchestratorLLMAgent.get_tools method · python · L277-L392 (116 LOC)
fabric_agent/agents/llm_orchestrator.py
    def get_tools(self) -> List[ToolDefinition]:
        """Define all tools exposed to the LLM."""
        return [
            ToolDefinition(
                name="discover_workspace",
                description=(
                    "Scan a Fabric workspace to build a dependency graph of all "
                    "semantic models, measures, reports, and their relationships. "
                    "MUST be called before analyze_refactor_impact."
                ),
                parameters={
                    "workspace_id": {
                        "type": "string",
                        "description": "Fabric workspace ID (GUID). Use the default if known.",
                    },
                    "workspace_name": {
                        "type": "string",
                        "description": "Human-readable workspace name for logging.",
                    },
                },
                handler=self._tool_discover_workspace,
            ),
            ToolDefin
OrchestratorLLMAgent._tool_discover_workspace method · python · L398-L436 (39 LOC)
fabric_agent/agents/llm_orchestrator.py
    async def _tool_discover_workspace(
        self,
        workspace_id: Optional[str] = None,
        workspace_name: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Delegate to DiscoveryAgent."""
        ws_id = workspace_id or self._workspace_id
        ws_name = workspace_name or self._workspace_name or ""

        if not ws_id:
            return {"error": "workspace_id is required. Provide it or set workspace_id in config."}

        logger.info(f"[OrchestratorLLM] discover_workspace: {ws_name} ({ws_id})")
        agent = self._get_discovery()

        try:
            result = await agent.discover_workspace(
                workspace_id=ws_id,
                workspace_name=ws_name,
                include_measures=True,
            )
            if result.status == TaskStatus.FAILED:
                return {"error": result.error, "status": "failed"}

            # Update default workspace context for subsequent tool calls
            if not self._workspace_id:
    
OrchestratorLLMAgent._tool_find_historical_context method · python · L438-L481 (44 LOC)
fabric_agent/agents/llm_orchestrator.py
    async def _tool_find_historical_context(
        self,
        proposed_change_description: str,
    ) -> Dict[str, Any]:
        """Query OperationMemory for similar past operations."""
        if not self._operation_memory:
            return {
                "status": "no_history",
                "message": (
                    "OperationMemory not configured. Proceeding without historical context. "
                    "This is normal on a fresh deployment."
                ),
                "historical_failure_rate": 0.0,
                "recommendations": [],
            }

        logger.info(f"[OrchestratorLLM] find_historical_context: {proposed_change_description[:80]}")

        try:
            ctx = await self._operation_memory.get_risk_context(proposed_change_description)
            return {
                "status": "success",
                "historical_failure_rate": ctx.historical_failure_rate,
                "similar_operations_count": len(ctx.similar_operat
OrchestratorLLMAgent._tool_analyze_impact method · python · L483-L508 (26 LOC)
fabric_agent/agents/llm_orchestrator.py
    async def _tool_analyze_impact(
        self,
        workspace_id: str,
        model_id: str,
        measure_name: str,
        new_name: str,
    ) -> Dict[str, Any]:
        """Run a ``rename_measure`` impact analysis through the ImpactAgent.

        Returns ``{"status": "success", "impact": ...}`` on success. When the
        agent reports a FAILED status, or anything in the call raises, a dict
        with ``"error"`` and ``"status": "failed"`` is returned instead; no
        exception escapes this method's ``try`` block.
        """
        logger.info(f"[OrchestratorLLM] analyze_impact: [{measure_name}] → [{new_name}]")
        impact_agent = self._get_impact()

        request = dict(
            change_type="rename_measure",
            workspace_id=workspace_id,
            model_id=model_id,
            measure_name=measure_name,
            new_name=new_name,
        )
        try:
            outcome = await impact_agent.analyze_change(**request)
            if outcome.status != TaskStatus.FAILED:
                return {"status": "success", "impact": outcome.result}
            return {"error": outcome.error, "status": "failed"}
        except Exception as exc:
            logger.error(f"analyze_impact failed: {exc}")
            return {"error": str(exc), "status": "failed"}
OrchestratorLLMAgent._tool_execute_refactor method · python · L510-L541 (32 LOC)
fabric_agent/agents/llm_orchestrator.py
    async def _tool_execute_refactor(
        self,
        model_name: str,
        old_name: str,
        new_name: str,
        dry_run: bool = True,
    ) -> Dict[str, Any]:
        """Delegate to RefactorAgent."""
        mode = "DRY RUN" if dry_run else "LIVE"
        logger.info(f"[OrchestratorLLM] execute_refactor ({mode}): [{old_name}] → [{new_name}]")
        agent = self._get_refactor()

        try:
            result = await agent.execute_refactor(
                refactor_type="rename_measure",
                model_name=model_name,
                old_name=old_name,
                new_name=new_name,
                dry_run=dry_run,
            )
            if result.status == TaskStatus.FAILED:
                return {"error": result.error, "status": "failed"}

            return {
                "status": "success",
                "dry_run": dry_run,
                "result": result.result,
                "checkpoint_id": result.result.get("checkpoint_id") if resul
OrchestratorLLMAgent._tool_rollback method · python · L543-L561 (19 LOC)
fabric_agent/agents/llm_orchestrator.py
    async def _tool_rollback(self, checkpoint_id: str) -> Dict[str, Any]:
        """Ask the RefactorAgent to roll back to ``checkpoint_id``.

        The rollback is issued with ``dry_run=False``. The returned dict
        reports ``"success"`` only when the agent's status is COMPLETED, and
        carries the agent's ``result`` and ``error``. If the call raises, a
        dict with the error text and ``"status": "failed"`` is returned.
        """
        logger.warning(f"[OrchestratorLLM] rollback: checkpoint={checkpoint_id}")
        refactor_agent = self._get_refactor()

        try:
            outcome = await refactor_agent.execute_refactor(
                refactor_type="rollback",
                checkpoint_id=checkpoint_id,
                dry_run=False,
            )
            if outcome.status == TaskStatus.COMPLETED:
                verdict = "success"
            else:
                verdict = "failed"
            return {"status": verdict, "result": outcome.result, "error": outcome.error}
        except Exception as exc:
            logger.error(f"rollback failed: {exc}")
            return {"error": str(exc), "status": "failed"}
Repobility · code-quality intelligence · https://repobility.com
OrchestratorLLMAgent._tool_scan_health method · python · L563-L592 (30 LOC)
fabric_agent/agents/llm_orchestrator.py
    async def _tool_scan_health(
        self,
        workspace_ids: Optional[List[str]] = None,
        dry_run: bool = True,
    ) -> Dict[str, Any]:
        """Delegate to SelfHealingMonitor."""
        ws_ids = workspace_ids or ([self._workspace_id] if self._workspace_id else [])
        if not ws_ids:
            return {"error": "workspace_ids required for health scan"}

        logger.info(f"[OrchestratorLLM] scan_health: {ws_ids}")

        try:
            from fabric_agent.healing.monitor import SelfHealingMonitor
            monitor = SelfHealingMonitor(
                fabric_client=self._fabric_client,
                memory_manager=None,
            )
            report = await monitor.run_once(workspace_ids=ws_ids, dry_run=dry_run)
            return {
                "status": "success",
                "total_assets": report.total_assets,
                "anomalies_found": report.anomalies_found,
                "auto_healed": report.auto_healed,
                "manu
OrchestratorLLMAgent._tool_get_session_summary method · python · L594-L605 (12 LOC)
fabric_agent/agents/llm_orchestrator.py
    async def _tool_get_session_summary(self) -> Dict[str, Any]:
        """Return a summary of this session's activity."""
        tool_calls = self.memory.get("session_tool_calls", [])
        return {
            "status": "success",
            "session_id": self.memory.get("session_id", str(uuid4())),
            "tool_calls_made": len(tool_calls),
            "tool_sequence": [tc.get("name") for tc in tool_calls],
            "workspace_id": self._workspace_id,
            "workspace_name": self._workspace_name,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
OrchestratorLLMAgent._extract_result method · python · L611-L632 (22 LOC)
fabric_agent/agents/llm_orchestrator.py
    def _extract_result(self, text_content: str, context: Optional[Dict]) -> Any:
        """
        Check if the LLM output contains the APPROVAL_REQUIRED signal.

        If the LLM output starts with "APPROVAL_REQUIRED:", extract the risk
        summary and return a structured dict with approval_required=True.
        The caller (MCP tool, CLI, tests) can check this flag and pause.
        """
        if text_content and text_content.strip().startswith("APPROVAL_REQUIRED:"):
            risk_summary = text_content.strip()[len("APPROVAL_REQUIRED:"):].strip()
            logger.warning(f"[OrchestratorLLM] APPROVAL_REQUIRED: {risk_summary[:200]}")
            return {
                "approval_required": True,
                "risk_summary": risk_summary,
                "status": "awaiting_approval",
            }

        return {
            "approval_required": False,
            "summary": text_content.strip() if text_content else "Completed.",
            "status": "completed",
create_llm_orchestrator function · python · L639-L681 (43 LOC)
fabric_agent/agents/llm_orchestrator.py
def create_llm_orchestrator(
    config: Any,
    fabric_client: Any,
    memory: Optional[SharedMemory] = None,
    workspace_id: Optional[str] = None,
    workspace_name: Optional[str] = None,
    operation_memory: Optional[Any] = None,
) -> OrchestratorLLMAgent:
    """
    Factory: build an OrchestratorLLMAgent from config.

    Reads llm_provider from config to select the right LLM client,
    then wires everything into OrchestratorLLMAgent.

    Args:
        config:           AgentConfig with llm_provider + provider-specific fields.
        fabric_client:    Initialized FabricApiClient.
        memory:           Shared memory (created fresh if None).
        workspace_id:     Default workspace ID.
        workspace_name:   Default workspace name.
        operation_memory: Optional OperationMemory for RAG context.

    Returns:
        OrchestratorLLMAgent ready to call .run(task).

    Example:
        config = AgentConfig()
        orchestrator = create_llm_orchestrator(config,
ApprovalRequest.to_dict method · python · L81-L93 (13 LOC)
fabric_agent/agents/orchestrator.py
    def to_dict(self) -> Dict[str, Any]:
        """Serialize this approval request into a plain dictionary.

        ``status`` is reduced to its ``.value``; every other field is copied
        through unchanged.
        """
        record: Dict[str, Any] = {}
        record["id"] = self.id
        record["workflow_id"] = self.workflow_id
        record["change_summary"] = self.change_summary
        record["risk_level"] = self.risk_level
        record["impact_report"] = self.impact_report
        record["requested_at"] = self.requested_at
        record["status"] = self.status.value
        record["approved_by"] = self.approved_by
        record["approved_at"] = self.approved_at
        record["rejection_reason"] = self.rejection_reason
        return record
WorkflowResult.to_dict method · python · L109-L120 (12 LOC)
fabric_agent/agents/orchestrator.py
    def to_dict(self) -> Dict[str, Any]:
        """Serialize this workflow result into a plain dictionary.

        ``status`` is reduced to its ``.value``. Each value in
        ``agent_results`` is serialized through its own ``to_dict()``, as is
        ``approval_request`` when one is present (otherwise ``None``).
        """
        serialized: Dict[str, Any] = {
            "workflow_id": self.workflow_id,
            "workflow_type": self.workflow_type,
            "status": self.status.value,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
        }

        per_agent = {}
        for agent_name, agent_result in self.agent_results.items():
            per_agent[agent_name] = agent_result.to_dict()
        serialized["agent_results"] = per_agent

        if self.approval_request:
            serialized["approval_request"] = self.approval_request.to_dict()
        else:
            serialized["approval_request"] = None

        serialized["final_result"] = self.final_result
        serialized["error"] = self.error
        return serialized
DeploymentPipelineManager.get_or_create_pipeline method · python · L142-L192 (51 LOC)
fabric_agent/agents/orchestrator.py
    async def get_or_create_pipeline(
        self,
        name: str,
        source_workspace_id: str,
        target_workspace_id: str,
    ) -> Dict:
        """Get existing or create new deployment pipeline."""
        
        # List existing pipelines
        try:
            response = await self.client.get("/deploymentPipelines")
            pipelines = response.get("value", [])
            
            for pipeline in pipelines:
                if pipeline.get("displayName") == name:
                    self._pipelines[name] = pipeline
                    return pipeline
        except Exception as e:
            logger.warning(f"Could not list pipelines: {e}")
        
        # Create new pipeline
        try:
            pipeline = await self.client.post(
                "/deploymentPipelines",
                json={
                    "displayName": name,
                    "description": f"Deployment pipeline for refactoring operations",
                }
            )
DeploymentPipelineManager.create_deployment_request method · python · L194-L240 (47 LOC)
fabric_agent/agents/orchestrator.py
    async def create_deployment_request(
        self,
        pipeline_id: str,
        source_stage: int = 0,
        target_stage: int = 1,
        items: Optional[List[Dict]] = None,
        note: str = "",
    ) -> Dict:
        """
        Create a deployment request (triggers approval flow).
        
        Args:
            pipeline_id: Deployment pipeline ID
            source_stage: Source stage order (0=DEV)
            target_stage: Target stage order (1=PROD)
            items: Specific items to deploy (None = all)
            note: Deployment note
        """
        payload = {
            "sourceStageOrder": source_stage,
            "isBackwardDeployment": False,
            "newWorkspace": {
                "name": None,  # Use existing
            },
            "options": {
                "allowCreateArtifact": False,
                "allowOverwriteArtifact": True,
                "allowOverwriteTargetArtifactLabel": False,
                "allowPurgeData": False,
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
DeploymentPipelineManager.get_deployment_status method · python · L242-L255 (14 LOC)
fabric_agent/agents/orchestrator.py
    async def get_deployment_status(
        self,
        pipeline_id: str,
        operation_id: str,
    ) -> Dict:
        """Fetch the status of one deployment operation.

        Issues a GET for the operation under the given pipeline via
        ``self.client`` and returns the response unchanged. Any failure is
        logged and then re-raised to the caller.
        """
        try:
            endpoint = f"/deploymentPipelines/{pipeline_id}/operations/{operation_id}"
            return await self.client.get(endpoint)
        except Exception as e:
            logger.error(f"Could not get deployment status: {e}")
            raise
DeploymentPipelineManager.wait_for_deployment method · python · L257-L287 (31 LOC)
fabric_agent/agents/orchestrator.py
    async def wait_for_deployment(
        self,
        pipeline_id: str,
        operation_id: str,
        timeout_seconds: int = 3600,
        poll_interval: int = 30,
    ) -> Dict:
        """
        Wait for deployment to complete.
        
        Note: Fabric Deployment Pipelines handle approval in their UI.
        This polls until the deployment is approved and completed.
        """
        import time
        start_time = time.time()
        
        while time.time() - start_time < timeout_seconds:
            status = await self.get_deployment_status(pipeline_id, operation_id)
            state = status.get("status", "Unknown")
            
            if state == "Succeeded":
                return {"status": "approved", "result": status}
            elif state == "Failed":
                return {"status": "failed", "result": status}
            elif state == "Cancelled":
                return {"status": "rejected", "result": status}
            
            logger.i
SimpleApprovalManager.create_request method · python · L307-L330 (24 LOC)
fabric_agent/agents/orchestrator.py
    async def create_request(
        self,
        workflow_id: str,
        change_summary: str,
        risk_level: str,
        impact_report: Dict,
    ) -> ApprovalRequest:
        """Create, store and log a new approval request.

        The request gets a fresh UUID4 string id and a UTC ISO timestamp in
        ``requested_at``, and is kept in ``self._requests`` under its id.

        Args:
            workflow_id: Workflow the request belongs to.
            change_summary: Description of the proposed change.
            risk_level: Risk label for the change.
            impact_report: Impact analysis attached to the request.

        Returns:
            The newly created ApprovalRequest.
        """
        new_id = str(uuid4())
        created_at = datetime.now(timezone.utc).isoformat()
        approval = ApprovalRequest(
            id=new_id,
            workflow_id=workflow_id,
            change_summary=change_summary,
            risk_level=risk_level,
            impact_report=impact_report,
            requested_at=created_at,
        )
        self._requests[approval.id] = approval

        log_lines = (
            f"Approval request created: {approval.id}",
            f"  Change: {change_summary}",
            f"  Risk: {risk_level}",
        )
        for line in log_lines:
            logger.info(line)

        return approval
SimpleApprovalManager.approve method · python · L332-L348 (17 LOC)
fabric_agent/agents/orchestrator.py
    async def approve(
        self,
        request_id: str,
        approved_by: str = "user",
    ) -> ApprovalRequest:
        """Mark a stored request as approved.

        Sets the status to APPROVED and records who approved it and when
        (UTC, ISO format).

        Raises:
            ValueError: If ``request_id`` has no stored request.
        """
        pending = self._requests.get(request_id)
        if pending:
            decided_at = datetime.now(timezone.utc).isoformat()
            pending.status = ApprovalStatus.APPROVED
            pending.approved_by = approved_by
            pending.approved_at = decided_at

            logger.info(f"Request approved: {request_id} by {approved_by}")
            return pending

        raise ValueError(f"Request not found: {request_id}")
SimpleApprovalManager.reject method · python · L350-L368 (19 LOC)
fabric_agent/agents/orchestrator.py
    async def reject(
        self,
        request_id: str,
        rejected_by: str = "user",
        reason: str = "",
    ) -> ApprovalRequest:
        """Mark a stored request as rejected.

        Sets the status to REJECTED and stores the reason. The rejecting
        user and the decision time (UTC, ISO format) are written to the
        ``approved_by`` / ``approved_at`` fields.

        Raises:
            ValueError: If ``request_id`` has no stored request.
        """
        pending = self._requests.get(request_id)
        if pending:
            decided_at = datetime.now(timezone.utc).isoformat()
            pending.status = ApprovalStatus.REJECTED
            # The decision-maker and time share the approved_* fields.
            pending.approved_by = rejected_by
            pending.approved_at = decided_at
            pending.rejection_reason = reason

            logger.info(f"Request rejected: {request_id} by {rejected_by}")
            return pending

        raise ValueError(f"Request not found: {request_id}")
SimpleApprovalManager.get_status method · python · L370-L375 (6 LOC)
fabric_agent/agents/orchestrator.py
    async def get_status(self, request_id: str) -> ApprovalStatus:
        """Return the current status of a stored request.

        Raises:
            ValueError: If ``request_id`` has no stored request.
        """
        found = self._requests.get(request_id)
        if found:
            return found.status
        raise ValueError(f"Request not found: {request_id}")
AgentOrchestrator.__init__ method · python · L397-L448 (52 LOC)
fabric_agent/agents/orchestrator.py
    def __init__(
        self,
        fabric_client: Any,
        workspace_name: str,
        workspace_id: Optional[str] = None,
        use_deployment_pipelines: bool = False,
    ):
        """
        Initialize the orchestrator.
        
        Args:
            fabric_client: FabricApiClient instance
            workspace_name: Default workspace name
            workspace_id: Default workspace ID
            use_deployment_pipelines: If True, use Fabric pipelines for approval
        """
        self.fabric_client = fabric_client
        self.workspace_name = workspace_name
        self.workspace_id = workspace_id
        
        # Shared memory for all agents
        self.memory = SharedMemory()
        
        # Initialize agents
        self.discovery = DiscoveryAgent(
            memory=self.memory,
            fabric_client=fabric_client,
        )
        
        self.impact = ImpactAgent(memory=self.memory)
        
        self.refactor = RefactorAgent(
           
AgentOrchestrator._execute_steps_4_and_5 method · python · L696-L817 (122 LOC)
fabric_agent/agents/orchestrator.py
    async def _execute_steps_4_and_5(
        self,
        workflow: "WorkflowResult",
        params: Dict[str, Any],
    ) -> "WorkflowResult":
        """
        Execute Steps 4 (Refactor) and 5 (Validation) of safe_refactor_workflow.

        Extracted so that approve_workflow() can resume a paused workflow without
        re-running discovery or impact analysis (Steps 1+2 already completed).

        WHY THIS MATTERS:
          Before this fix, approve_workflow() set status=APPROVED and returned —
          the actual refactoring never happened. High-risk workflows were permanently
          stuck in AWAITING_APPROVAL after approval.

        Args:
            workflow: The WorkflowResult object (already in APPROVED state).
            params:   Dict with refactor_type, model_name, old_name, new_name,
                      dry_run, model_id — all captured in safe_refactor_workflow().

        Returns:
            Updated WorkflowResult with COMPLETED, ROLLED_BACK, or FAILED stat
About: code-quality intelligence by Repobility · https://repobility.com
AgentOrchestrator.approve_workflow method · python · L819-L862 (44 LOC)
fabric_agent/agents/orchestrator.py
    async def approve_workflow(
        self,
        request_id: str,
        approved_by: str = "user",
    ) -> WorkflowResult:
        """
        Approve a pending workflow and continue execution.

        Call this after receiving approval for a high-risk change.
        Retrieves the saved continuation params and resumes Steps 4+5.
        """
        if isinstance(self.approval, SimpleApprovalManager):
            request = await self.approval.approve(request_id, approved_by)
            workflow_id = request.workflow_id

            workflow = self._workflows.get(workflow_id)
            if not workflow:
                raise ValueError(f"Workflow not found: {workflow_id}")

            if workflow.status != WorkflowStatus.AWAITING_APPROVAL:
                raise ValueError(f"Workflow not awaiting approval: {workflow.status}")

            workflow.status = WorkflowStatus.APPROVED
            logger.info(f"Workflow approved by {approved_by!r} — resuming execution: {workflow_id
AgentOrchestrator.reject_workflow method · python · L864-L884 (21 LOC)
fabric_agent/agents/orchestrator.py
    async def reject_workflow(
        self,
        request_id: str,
        rejected_by: str = "user",
        reason: str = "",
    ) -> WorkflowResult:
        """
        Reject a workflow that is waiting on an approval request.

        Args:
            request_id: Identifier of the approval request to reject.
            rejected_by: Name recorded in the workflow's error message.
            reason: Free-text explanation appended to the error message.

        Returns:
            The workflow looked up in ``self._workflows`` after it has been
            marked REJECTED. Note that ``None`` is returned when the approval
            manager is not a SimpleApprovalManager, and the (falsy) lookup
            result is returned untouched when no workflow is found.
        """
        # Any other approval manager type is a logged no-op.
        if not isinstance(self.approval, SimpleApprovalManager):
            logger.warning("reject_workflow called with non-SimpleApprovalManager — no-op")
            return None

        rejection = await self.approval.reject(request_id, rejected_by, reason)
        target = self._workflows.get(rejection.workflow_id)

        if target:
            # Close the workflow out: final status, completion time, reason.
            target.status = WorkflowStatus.REJECTED
            target.completed_at = datetime.now(timezone.utc).isoformat()
            target.error = f"Rejected by {rejected_by}: {reason}"

        return target
DiscoveryAgent.__init__ method · python · L48-L96 (49 LOC)
fabric_agent/agents/specialized.py
    def __init__(
        self,
        memory: SharedMemory,
        fabric_client: Any,  # FabricApiClient
    ):
        self.fabric_client = fabric_client
        
        tools = [
            ToolDefinition(
                name="list_workspaces",
                description="List all accessible workspaces",
                parameters={},
                handler=self._list_workspaces,
            ),
            ToolDefinition(
                name="scan_workspace",
                description="Scan a workspace and list all items",
                parameters={
                    "workspace_id": {"type": "string", "description": "Workspace ID"},
                },
                handler=self._scan_workspace,
            ),
            ToolDefinition(
                name="build_dependency_graph",
                description="Build dependency graph for a workspace",
                parameters={
                    "workspace_id": {"type": "string", "description": "Workspace ID"},
DiscoveryAgent._list_workspaces method · python · L98-L106 (9 LOC)
fabric_agent/agents/specialized.py
    async def _list_workspaces(self) -> List[Dict]:
        """
        Fetch the workspaces returned by the ``/workspaces`` endpoint.

        The raw workspace records are stored in memory under the
        ``"workspaces"`` key before the trimmed summaries are built.

        Returns:
            One ``{"id", "name"}`` dict per workspace; ``name`` falls back to
            an empty string when ``displayName`` is absent.
        """
        payload = await self.fabric_client.get("/workspaces")
        records = payload.get("value", [])

        # Cache the untrimmed records.
        self.memory.set("workspaces", records)

        return [
            {
                "id": record["id"],
                "name": record.get("displayName", ""),
            }
            for record in records
        ]
DiscoveryAgent._scan_workspace method · python · L108-L134 (27 LOC)
fabric_agent/agents/specialized.py
    async def _scan_workspace(self, workspace_id: str) -> Dict:
        """
        List a workspace's items and group them by item type.

        Args:
            workspace_id: ID of the workspace whose items are requested.

        Returns:
            Dict with ``workspace_id``, ``total_items`` and ``by_type``
            (item type -> list of ``{"id", "name", "type"}`` dicts). Items
            without a ``type`` are grouped under ``"Unknown"``. The same dict
            is stored in memory under ``workspace_items_<workspace_id>``.
        """
        payload = await self.fabric_client.get(f"/workspaces/{workspace_id}/items")
        entries = payload.get("value", [])

        grouped = {}
        for entry in entries:
            kind = entry.get("type", "Unknown")
            # setdefault opens the bucket the first time a type is seen.
            grouped.setdefault(kind, []).append({
                "id": entry["id"],
                "name": entry.get("displayName", ""),
                "type": kind,
            })

        summary = {
            "workspace_id": workspace_id,
            "total_items": len(entries),
            "by_type": grouped,
        }

        # Cache the summary before handing it back.
        self.memory.set(f"workspace_items_{workspace_id}", summary)
        return summary
DiscoveryAgent._build_graph method · python · L136-L160 (25 LOC)
fabric_agent/agents/specialized.py
    async def _build_graph(
        self,
        workspace_id: str,
        workspace_name: str,
        include_measures: bool = True,
    ) -> Dict:
        """
        Build the dependency graph for a workspace and cache it.

        Args:
            workspace_id: ID of the workspace to build the graph for.
            workspace_name: Name passed through to the graph builder.
            include_measures: Forwarded to the builder as
                ``include_measure_graph``.

        Returns:
            Summary dict with ``workspace_id``, the ``nodes`` and ``edges``
            counts, and the graph's ``stats`` entry (``{}`` when absent).
            The full graph is stored in memory under
            ``dependency_graph_<workspace_id>``.
        """
        from fabric_agent.tools.workspace_graph import WorkspaceGraphBuilder

        graph = await WorkspaceGraphBuilder(
            include_measure_graph=include_measures,
        ).build(
            client=self.fabric_client,
            workspace_id=workspace_id,
            workspace_name=workspace_name,
        )

        # Store the complete graph; only a compact summary is returned.
        self.memory.set(f"dependency_graph_{workspace_id}", graph)

        node_count = len(graph.get("nodes", []))
        edge_count = len(graph.get("edges", []))
        return {
            "workspace_id": workspace_id,
            "nodes": node_count,
            "edges": edge_count,
            "stats": graph.get("stats", {}),
        }
DiscoveryAgent._get_model_definition method · python · L162-L168 (7 LOC)
fabric_agent/agents/specialized.py
    async def _get_model_definition(self, workspace_id: str, model_id: str) -> Dict:
        """
        Request a semantic model's definition in TMDL format.

        Args:
            workspace_id: ID of the workspace used in the request path.
            model_id: ID of the semantic model used in the request path.

        Returns:
            Whatever the client's ``post`` call returns, unmodified.
        """
        endpoint = f"/workspaces/{workspace_id}/semanticModels/{model_id}/getDefinition"
        return await self.fabric_client.post(endpoint, json={"format": "TMDL"})
DiscoveryAgent.discover_workspace method · python · L170-L214 (45 LOC)
fabric_agent/agents/specialized.py
    async def discover_workspace(
        self,
        workspace_id: str,
        workspace_name: str,
        include_measures: bool = True,
    ) -> AgentResult:
        """
        Full discovery of a workspace.
        
        Combines scanning + graph building.
        """
        import time
        start = time.time()
        
        try:
            # Scan items
            items = await self._scan_workspace(workspace_id)
            
            # Build graph
            graph = await self._build_graph(workspace_id, workspace_name, include_measures)
            
            result = {
                "workspace_id": workspace_id,
                "workspace_name": workspace_name,
                "items": items,
                "graph": graph,
            }
            
            return AgentResult(
                agent_name=self.name,
                task="discover_workspace",
                status=TaskStatus.COMPLETED,
                result=result,
                exec
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
ImpactAgent.__init__ method · python · L248-L294 (47 LOC)
fabric_agent/agents/specialized.py
    def __init__(self, memory: SharedMemory, operation_memory=None):
        """
        Args:
            memory:           Shared in-memory state (workspace graphs, etc.).
            operation_memory: Optional OperationMemory instance for historical
                              risk calibration. If None, base score only.
        """
        self._operation_memory = operation_memory
        tools = [
            ToolDefinition(
                name="analyze_measure_rename",
                description="Analyze impact of renaming a measure",
                parameters={
                    "workspace_id": {"type": "string"},
                    "model_id": {"type": "string"},
                    "measure_name": {"type": "string"},
                    "new_name": {"type": "string"},
                },
                handler=self._analyze_measure_rename,
            ),
            ToolDefinition(
                name="analyze_notebook_change",
                description="Analyze impa
ImpactAgent._analyze_measure_rename method · python · L296-L319 (24 LOC)
fabric_agent/agents/specialized.py
    async def _analyze_measure_rename(
        self,
        workspace_id: str,
        model_id: str,
        measure_name: str,
        new_name: str,
    ) -> Dict:
        """
        Assess the impact of renaming a measure using the cached graph.

        Args:
            workspace_id: Selects the ``dependency_graph_<workspace_id>``
                entry read from memory.
            model_id: Model ID passed to the analyzer.
            measure_name: Current measure name passed to the analyzer.
            new_name: Proposed name, recorded on the result only.

        Returns:
            The analyzer's impact dict with ``proposed_new_name`` and
            ``requires_approval`` added, or an ``{"error": ...}`` dict when
            no graph is cached for the workspace.
        """
        from fabric_agent.tools.workspace_graph import GraphImpactAnalyzer

        cached_graph = self.memory.get(f"dependency_graph_{workspace_id}")
        if not cached_graph:
            return {"error": "No cached graph. Run DiscoveryAgent first."}

        report = GraphImpactAnalyzer(cached_graph).analyze_measure_rename_impact(
            model_id,
            measure_name,
        )

        # Annotate the analyzer output with the rename-specific fields.
        report["proposed_new_name"] = new_name
        risk = report["risk_level"]
        report["requires_approval"] = risk in ("high_risk", "critical")

        return report
ImpactAgent._analyze_notebook_change method · python · L321-L335 (15 LOC)
fabric_agent/agents/specialized.py
    async def _analyze_notebook_change(
        self,
        workspace_id: str,
        notebook_id: str,
    ) -> Dict:
        """
        Assess the impact of changing a notebook using the cached graph.

        Args:
            workspace_id: Selects the ``dependency_graph_<workspace_id>``
                entry read from memory.
            notebook_id: Notebook ID passed to the analyzer.

        Returns:
            The analyzer's result, or an ``{"error": ...}`` dict when no
            graph is cached for the workspace.
        """
        from fabric_agent.tools.workspace_graph import GraphImpactAnalyzer

        cached_graph = self.memory.get(f"dependency_graph_{workspace_id}")
        if cached_graph:
            return GraphImpactAnalyzer(cached_graph).analyze_notebook_change_impact(notebook_id)

        # Nothing cached for this workspace.
        return {"error": "No cached graph. Run DiscoveryAgent first."}
page 1 / 17next ›