Function bodies 807 total
AgentMessage.to_dict method · python · L71-L80 (10 LOC)fabric_agent/agents/base.py
def to_dict(self) -> Dict[str, Any]:
    """Serialize this message into a plain dictionary.

    Returns:
        A dict with the keys ``id``, ``from_agent``, ``to_agent``,
        ``message_type``, ``content``, ``timestamp`` and
        ``correlation_id``. ``message_type`` holds the ``.value`` of
        ``self.message_type``; every other entry is the attribute as-is.
    """
    serialized: Dict[str, Any] = {}
    for attr in ("id", "from_agent", "to_agent"):
        serialized[attr] = getattr(self, attr)
    # Only the message type is unwrapped through ``.value``.
    serialized["message_type"] = self.message_type.value
    for attr in ("content", "timestamp", "correlation_id"):
        serialized[attr] = getattr(self, attr)
    return serialized

# ToolDefinition.to_llm_format method · python · L91-L100 (10 LOC)fabric_agent/agents/base.py
def to_llm_format(self) -> Dict[str, Any]:
    """Render this tool definition in the dict shape passed to LLM APIs.

    Returns:
        A dict holding ``name``, ``description`` and an ``input_schema``
        of type ``"object"`` whose ``properties`` are ``self.parameters``.
    """
    llm_tool: Dict[str, Any] = {
        "name": self.name,
        "description": self.description,
    }
    llm_tool["input_schema"] = {"type": "object", "properties": self.parameters}
    return llm_tool

# AgentResult.to_dict method · python · L133-L143 (11 LOC)fabric_agent/agents/base.py
def to_dict(self) -> Dict[str, Any]:
    """Serialize this result into a plain dictionary.

    Returns:
        A dict with ``agent_name``, ``task``, ``status`` (the ``.value`` of
        ``self.status``), ``result``, ``thoughts``, ``tool_calls``,
        ``error`` and ``execution_time_ms``. Each thought and tool call is
        represented by a dict of its instance attributes.
    """
    return {
        "agent_name": self.agent_name,
        "task": self.task,
        "status": self.status.value,
        "result": self.result,
        # vars() hands back the object's live __dict__; copy it so callers
        # that edit the serialized form cannot alter the original objects.
        "thoughts": [dict(vars(thought)) for thought in self.thoughts],
        "tool_calls": [dict(vars(call)) for call in self.tool_calls],
        "error": self.error,
        "execution_time_ms": self.execution_time_ms,
    }

# LLMClient.complete method · python · L209-L216 (8 LOC)fabric_agent/agents/base.py
async def complete(
    self,
    messages: List[Dict[str, str]],
    tools: Optional[List[Dict]] = None,
    system_prompt: Optional[str] = None,
) -> Dict[str, Any]:
    """Send a completion request to the LLM.

    This implementation is a placeholder with no behaviour of its own:
    awaiting it always yields ``None``.

    Args:
        messages: List of string-to-string dicts to send.
        tools: Optional list of tool dicts.
        system_prompt: Optional system prompt text.
    """
    return None

# ClaudeLLMClient.complete method · python · L227-L265 (39 LOC)fabric_agent/agents/base.py
async def complete(
self,
messages: List[Dict[str, str]],
tools: Optional[List[Dict]] = None,
system_prompt: Optional[str] = None,
) -> Dict[str, Any]:
"""Send completion request to Claude."""
try:
import anthropic
if self._client is None:
self._client = anthropic.AsyncAnthropic(api_key=self.api_key)
kwargs = {
"model": self.model,
"max_tokens": 4096,
"messages": messages,
}
if system_prompt:
kwargs["system"] = system_prompt
if tools:
kwargs["tools"] = tools
response = await self._client.messages.create(**kwargs)
return {
"content": response.content,
"stop_reason": response.stop_reason,
"usage": {
"MockLLMClient.complete method · python · L275-L292 (18 LOC)fabric_agent/agents/base.py
async def complete(
    self,
    messages: List[Dict[str, str]],
    tools: Optional[List[Dict]] = None,
    system_prompt: Optional[str] = None,
) -> Dict[str, Any]:
    """Return the next scripted response, or a canned default.

    Scripted entries in ``self.responses`` are handed out in order, with
    ``self.call_count`` advanced once per entry served. When there are no
    scripted entries (or all have been served) a fixed text response is
    returned and the counter is left alone. The arguments are not read.
    """
    scripted = self.responses
    if not scripted or self.call_count >= len(scripted):
        # Nothing (left) to replay: fall back to the default mock response.
        return {
            "content": [{"type": "text", "text": "Mock response"}],
            "stop_reason": "end_turn",
            "usage": {"input_tokens": 100, "output_tokens": 50}
        }
    reply = scripted[self.call_count]
    self.call_count += 1
    return reply

# BaseAgent.__init__ method · python · L316-L343 (28 LOC)fabric_agent/agents/base.py
def __init__(
    self,
    name: str,
    role: AgentRole,
    llm: LLMClient,
    memory: SharedMemory,
    max_iterations: int = 10,
):
    """
    Store the agent's configuration and register its tools.

    Args:
        name: Unique agent name
        role: Agent's role
        llm: LLM client for reasoning
        memory: Shared memory
        max_iterations: Max think-act cycles
    """
    self.name, self.role = name, role
    self.llm, self.memory = llm, memory
    self.max_iterations = max_iterations

    # Create the empty registry first, then fill it from get_tools(),
    # keyed by each tool's name.
    self._tools: Dict[str, ToolDefinition] = {}
    self._tools.update((tool.name, tool) for tool in self.get_tools())

# About: code-quality intelligence by Repobility · https://repobility.com
BaseAgent.run method · python · L355-L516 (162 LOC)fabric_agent/agents/base.py
async def run(
self,
task: str,
context: Optional[Dict[str, Any]] = None,
) -> AgentResult:
"""
Execute the agent on a task.
Args:
task: The task to perform
context: Additional context
Returns:
AgentResult with outcome
"""
import time
start_time = time.time()
logger.info(f"[{self.name}] Starting task: {task[:100]}...")
thoughts: List[AgentThought] = []
tool_calls: List[ToolCall] = []
# Build initial messages
messages = [
{"role": "user", "content": self._build_task_prompt(task, context)}
]
# Get tools in LLM format
llm_tools = [t.to_llm_format() for t in self._tools.values()] if self._tools else None
try:
for iteration in range(self.max_iterations):
# Call LLM
response = awaitBaseAgent._call_tool method · python · L518-L529 (12 LOC)fabric_agent/agents/base.py
async def _call_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Any:
    """Look up a registered tool by name and run its handler.

    Args:
        tool_name: Key of the tool in ``self._tools``.
        arguments: Keyword arguments forwarded to the tool's handler.

    Returns:
        The handler's result. If the handler returns an awaitable
        (coroutine, Future, Task or any object with ``__await__``), it is
        awaited and the awaited value is returned instead.

    Raises:
        ValueError: If no tool is registered under ``tool_name``.
    """
    # Imported locally so this method does not depend on a module-level
    # import being present.
    import inspect

    tool = self._tools.get(tool_name)
    if not tool:
        raise ValueError(f"Unknown tool: {tool_name}")

    # Handlers may be sync or async. isawaitable() also covers handlers that
    # hand back a Future/Task, which iscoroutine() would leave un-awaited.
    outcome = tool.handler(**arguments)
    if inspect.isawaitable(outcome):
        outcome = await outcome
    return outcome

# BaseAgent._build_task_prompt method · python · L531-L538 (8 LOC)fabric_agent/agents/base.py
def _build_task_prompt(self, task: str, context: Optional[Dict]) -> str:
    """Compose the user prompt from the task and optional context.

    Args:
        task: Task text, placed after a ``Task: `` prefix.
        context: Optional dict; when truthy it is appended as a
            ``Context:`` section rendered as indented JSON.

    Returns:
        The prompt string.
    """
    sections = [f"Task: {task}"]
    if context:
        sections.append(f"Context:\n{json.dumps(context, indent=2)}")
    # Sections are separated by one blank line.
    return "\n\n".join(sections)

# BaseAgent.send_message method · python · L544-L554 (11 LOC)fabric_agent/agents/base.py
def send_message(self, to_agent: str, content: Dict[str, Any]) -> AgentMessage:
    """Create a REQUEST message for another agent and record it in memory.

    Args:
        to_agent: Name of the receiving agent.
        content: Message payload.

    Returns:
        The ``AgentMessage`` that was added to ``self.memory``. It carries a
        freshly generated UUID as ``id`` and this agent's name as sender.
    """
    envelope = dict(
        id=str(uuid4()),
        from_agent=self.name,
        to_agent=to_agent,
        message_type=MessageType.REQUEST,
        content=content,
    )
    outgoing = AgentMessage(**envelope)
    self.memory.add_message(outgoing)
    return outgoing

# SimpleAgent.__init__ method · python · L568-L593 (26 LOC)fabric_agent/agents/base.py
def __init__(
    self,
    name: str,
    role: AgentRole,
    memory: SharedMemory,
    tools: List[ToolDefinition],
):
    """Build an agent around a fixed tool list, using a mock LLM client.

    Args:
        name: Agent name, forwarded to ``BaseAgent.__init__``.
        role: Agent role, forwarded to ``BaseAgent.__init__``.
        memory: Shared memory, forwarded to ``BaseAgent.__init__``.
        tools: Tool definitions this agent exposes.
    """
    # Ordering matters: BaseAgent.__init__ registers tools by calling
    # self.get_tools(), and SimpleAgent.get_tools() returns
    # self._simple_tools, so the attribute has to exist before
    # super().__init__ runs.
    self._simple_tools = tools

    # The LLM is not used by this agent, so a mock client is passed.
    super().__init__(
        name=name,
        role=role,
        llm=MockLLMClient(),
        memory=memory,
        max_iterations=1,
    )

    # BaseAgent has already registered the tools through get_tools();
    # registering again keeps them present even if that behaviour changes.
    self._tools.update((tool.name, tool) for tool in tools)

# OrchestratorLLMAgent.__init__ method · python · L200-L236 (37 LOC)fabric_agent/agents/llm_orchestrator.py
def __init__(
self,
llm: LLMClient,
fabric_client: Any,
memory: Optional[SharedMemory] = None,
workspace_id: Optional[str] = None,
workspace_name: Optional[str] = None,
operation_memory: Optional[Any] = None,
max_iterations: int = 20,
):
"""
Args:
llm: LLMClient (OllamaLLMClient, ClaudeLLMClient, etc.)
fabric_client: FabricApiClient for Fabric REST API calls.
memory: Shared memory (created fresh if not provided).
workspace_id: Default workspace ID (can be None — LLM discovers it).
workspace_name: Default workspace name.
operation_memory: Optional OperationMemory for historical RAG context.
max_iterations: Max LLM → tool call cycles before stopping.
"""
super().__init__(
name="OrchestratorLLMAgent",
role=AgentRole.ORCHESTRATOR,
llm=lOrchestratorLLMAgent._get_discovery method · python · L238-L245 (8 LOC)fabric_agent/agents/llm_orchestrator.py
def _get_discovery(self):
    """Return the DiscoveryAgent, creating and caching it on first use.

    The agent is built with this orchestrator's shared memory and Fabric
    client and stored in ``self._discovery_agent``.
    """
    if self._discovery_agent is not None:
        return self._discovery_agent

    # Imported only when an instance actually has to be built.
    from fabric_agent.agents.specialized import DiscoveryAgent

    self._discovery_agent = DiscoveryAgent(
        memory=self.memory,
        fabric_client=self._fabric_client,
    )
    return self._discovery_agent

# OrchestratorLLMAgent._get_impact method · python · L247-L254 (8 LOC)fabric_agent/agents/llm_orchestrator.py
def _get_impact(self):
    """Return the ImpactAgent, creating and caching it on first use.

    The agent is built with this orchestrator's shared memory and
    operation memory and stored in ``self._impact_agent``.
    """
    if self._impact_agent is not None:
        return self._impact_agent

    # Imported only when an instance actually has to be built.
    from fabric_agent.agents.specialized import ImpactAgent

    self._impact_agent = ImpactAgent(
        memory=self.memory,
        operation_memory=self._operation_memory,
    )
    return self._impact_agent

# Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
OrchestratorLLMAgent._get_refactor method · python · L256-L263 (8 LOC)fabric_agent/agents/llm_orchestrator.py
def _get_refactor(self):
    """Return the RefactorAgent, creating and caching it on first use.

    The agent is built with this orchestrator's shared memory and its
    workspace name (an empty string when no name is set) and stored in
    ``self._refactor_agent``.
    """
    if self._refactor_agent is not None:
        return self._refactor_agent

    # Imported only when an instance actually has to be built.
    from fabric_agent.agents.specialized import RefactorAgent

    self._refactor_agent = RefactorAgent(
        memory=self.memory,
        workspace_name=self._workspace_name or "",
    )
    return self._refactor_agent

# OrchestratorLLMAgent.get_system_prompt method · python · L265-L275 (11 LOC)fabric_agent/agents/llm_orchestrator.py
def get_system_prompt(self) -> str:
    """Build the orchestrator system prompt with the current context appended.

    Returns:
        ``_SYSTEM_PROMPT`` followed by a ``CURRENT CONTEXT:`` section. The
        section lists the workspace ID and workspace name when they are
        set, and always ends with the current UTC time as the session
        start line.
    """
    id_lines = (
        [f"Current workspace ID: {self._workspace_id}"] if self._workspace_id else []
    )
    name_lines = (
        [f"Current workspace name: {self._workspace_name}"]
        if self._workspace_name
        else []
    )
    started_line = f"Session started: {datetime.now(timezone.utc).isoformat()}"
    context_block = "\n".join([*id_lines, *name_lines, started_line])
    return f"{_SYSTEM_PROMPT}\n\nCURRENT CONTEXT:\n{context_block}"

# OrchestratorLLMAgent.get_tools method · python · L277-L392 (116 LOC)fabric_agent/agents/llm_orchestrator.py
def get_tools(self) -> List[ToolDefinition]:
"""Define all tools exposed to the LLM."""
return [
ToolDefinition(
name="discover_workspace",
description=(
"Scan a Fabric workspace to build a dependency graph of all "
"semantic models, measures, reports, and their relationships. "
"MUST be called before analyze_refactor_impact."
),
parameters={
"workspace_id": {
"type": "string",
"description": "Fabric workspace ID (GUID). Use the default if known.",
},
"workspace_name": {
"type": "string",
"description": "Human-readable workspace name for logging.",
},
},
handler=self._tool_discover_workspace,
),
ToolDefinOrchestratorLLMAgent._tool_discover_workspace method · python · L398-L436 (39 LOC)fabric_agent/agents/llm_orchestrator.py
async def _tool_discover_workspace(
self,
workspace_id: Optional[str] = None,
workspace_name: Optional[str] = None,
) -> Dict[str, Any]:
"""Delegate to DiscoveryAgent."""
ws_id = workspace_id or self._workspace_id
ws_name = workspace_name or self._workspace_name or ""
if not ws_id:
return {"error": "workspace_id is required. Provide it or set workspace_id in config."}
logger.info(f"[OrchestratorLLM] discover_workspace: {ws_name} ({ws_id})")
agent = self._get_discovery()
try:
result = await agent.discover_workspace(
workspace_id=ws_id,
workspace_name=ws_name,
include_measures=True,
)
if result.status == TaskStatus.FAILED:
return {"error": result.error, "status": "failed"}
# Update default workspace context for subsequent tool calls
if not self._workspace_id:
OrchestratorLLMAgent._tool_find_historical_context method · python · L438-L481 (44 LOC)fabric_agent/agents/llm_orchestrator.py
async def _tool_find_historical_context(
self,
proposed_change_description: str,
) -> Dict[str, Any]:
"""Query OperationMemory for similar past operations."""
if not self._operation_memory:
return {
"status": "no_history",
"message": (
"OperationMemory not configured. Proceeding without historical context. "
"This is normal on a fresh deployment."
),
"historical_failure_rate": 0.0,
"recommendations": [],
}
logger.info(f"[OrchestratorLLM] find_historical_context: {proposed_change_description[:80]}")
try:
ctx = await self._operation_memory.get_risk_context(proposed_change_description)
return {
"status": "success",
"historical_failure_rate": ctx.historical_failure_rate,
"similar_operations_count": len(ctx.similar_operatOrchestratorLLMAgent._tool_analyze_impact method · python · L483-L508 (26 LOC)fabric_agent/agents/llm_orchestrator.py
async def _tool_analyze_impact(
    self,
    workspace_id: str,
    model_id: str,
    measure_name: str,
    new_name: str,
) -> Dict[str, Any]:
    """Run a measure-rename impact analysis through the ImpactAgent.

    Args:
        workspace_id: Workspace ID passed to the analysis.
        model_id: Model ID passed to the analysis.
        measure_name: Current measure name.
        new_name: Proposed measure name.

    Returns:
        ``{"status": "success", "impact": ...}`` on success, otherwise
        ``{"error": ..., "status": "failed"}``. Exceptions are logged and
        reported in the failed form rather than propagated.
    """
    logger.info(f"[OrchestratorLLM] analyze_impact: [{measure_name}] → [{new_name}]")
    impact_agent = self._get_impact()
    request = dict(
        change_type="rename_measure",
        workspace_id=workspace_id,
        model_id=model_id,
        measure_name=measure_name,
        new_name=new_name,
    )
    try:
        outcome = await impact_agent.analyze_change(**request)
        return (
            {"error": outcome.error, "status": "failed"}
            if outcome.status == TaskStatus.FAILED
            else {"status": "success", "impact": outcome.result}
        )
    except Exception as exc:
        logger.error(f"analyze_impact failed: {exc}")
        return {"error": str(exc), "status": "failed"}

# OrchestratorLLMAgent._tool_execute_refactor method · python · L510-L541 (32 LOC)fabric_agent/agents/llm_orchestrator.py
async def _tool_execute_refactor(
self,
model_name: str,
old_name: str,
new_name: str,
dry_run: bool = True,
) -> Dict[str, Any]:
"""Delegate to RefactorAgent."""
mode = "DRY RUN" if dry_run else "LIVE"
logger.info(f"[OrchestratorLLM] execute_refactor ({mode}): [{old_name}] → [{new_name}]")
agent = self._get_refactor()
try:
result = await agent.execute_refactor(
refactor_type="rename_measure",
model_name=model_name,
old_name=old_name,
new_name=new_name,
dry_run=dry_run,
)
if result.status == TaskStatus.FAILED:
return {"error": result.error, "status": "failed"}
return {
"status": "success",
"dry_run": dry_run,
"result": result.result,
"checkpoint_id": result.result.get("checkpoint_id") if resulOrchestratorLLMAgent._tool_rollback method · python · L543-L561 (19 LOC)fabric_agent/agents/llm_orchestrator.py
async def _tool_rollback(self, checkpoint_id: str) -> Dict[str, Any]:
    """Ask the RefactorAgent to roll back to ``checkpoint_id``.

    The rollback is always requested with ``dry_run=False``.

    Returns:
        A dict with ``status`` (``"success"`` only when the agent reports
        ``TaskStatus.COMPLETED``, else ``"failed"``), ``result`` and
        ``error``. If an exception is raised it is logged and
        ``{"error": ..., "status": "failed"}`` is returned instead.
    """
    logger.warning(f"[OrchestratorLLM] rollback: checkpoint={checkpoint_id}")
    refactor_agent = self._get_refactor()
    try:
        outcome = await refactor_agent.execute_refactor(
            refactor_type="rollback",
            checkpoint_id=checkpoint_id,
            dry_run=False,
        )
        if outcome.status == TaskStatus.COMPLETED:
            verdict = "success"
        else:
            verdict = "failed"
        return {
            "status": verdict,
            "result": outcome.result,
            "error": outcome.error,
        }
    except Exception as exc:
        logger.error(f"rollback failed: {exc}")
        return {"error": str(exc), "status": "failed"}

# Repobility · code-quality intelligence · https://repobility.com
OrchestratorLLMAgent._tool_scan_health method · python · L563-L592 (30 LOC)fabric_agent/agents/llm_orchestrator.py
async def _tool_scan_health(
self,
workspace_ids: Optional[List[str]] = None,
dry_run: bool = True,
) -> Dict[str, Any]:
"""Delegate to SelfHealingMonitor."""
ws_ids = workspace_ids or ([self._workspace_id] if self._workspace_id else [])
if not ws_ids:
return {"error": "workspace_ids required for health scan"}
logger.info(f"[OrchestratorLLM] scan_health: {ws_ids}")
try:
from fabric_agent.healing.monitor import SelfHealingMonitor
monitor = SelfHealingMonitor(
fabric_client=self._fabric_client,
memory_manager=None,
)
report = await monitor.run_once(workspace_ids=ws_ids, dry_run=dry_run)
return {
"status": "success",
"total_assets": report.total_assets,
"anomalies_found": report.anomalies_found,
"auto_healed": report.auto_healed,
"manuOrchestratorLLMAgent._tool_get_session_summary method · python · L594-L605 (12 LOC)fabric_agent/agents/llm_orchestrator.py
async def _tool_get_session_summary(self) -> Dict[str, Any]:
    """Summarize the tool activity recorded in shared memory.

    Returns:
        A dict with a ``"success"`` status, the session ID, the number and
        ordered names of the entries stored under ``session_tool_calls``,
        the current workspace ID and name, and a UTC ISO timestamp.
    """
    recorded_calls = self.memory.get("session_tool_calls", [])
    summary: Dict[str, Any] = {"status": "success"}
    # NOTE(review): when memory holds no "session_id" the fallback is a new
    # random UUID on every call, so repeated summaries can report different
    # IDs. Confirm this is intended.
    summary["session_id"] = self.memory.get("session_id", str(uuid4()))
    summary["tool_calls_made"] = len(recorded_calls)
    summary["tool_sequence"] = [entry.get("name") for entry in recorded_calls]
    summary["workspace_id"] = self._workspace_id
    summary["workspace_name"] = self._workspace_name
    summary["timestamp"] = datetime.now(timezone.utc).isoformat()
    return summary

# OrchestratorLLMAgent._extract_result method · python · L611-L632 (22 LOC)fabric_agent/agents/llm_orchestrator.py
def _extract_result(self, text_content: str, context: Optional[Dict]) -> Any:
"""
Check if the LLM output contains the APPROVAL_REQUIRED signal.
If the LLM output starts with "APPROVAL_REQUIRED:", extract the risk
summary and return a structured dict with approval_required=True.
The caller (MCP tool, CLI, tests) can check this flag and pause.
"""
if text_content and text_content.strip().startswith("APPROVAL_REQUIRED:"):
risk_summary = text_content.strip()[len("APPROVAL_REQUIRED:"):].strip()
logger.warning(f"[OrchestratorLLM] APPROVAL_REQUIRED: {risk_summary[:200]}")
return {
"approval_required": True,
"risk_summary": risk_summary,
"status": "awaiting_approval",
}
return {
"approval_required": False,
"summary": text_content.strip() if text_content else "Completed.",
"status": "completed",create_llm_orchestrator function · python · L639-L681 (43 LOC)fabric_agent/agents/llm_orchestrator.py
def create_llm_orchestrator(
config: Any,
fabric_client: Any,
memory: Optional[SharedMemory] = None,
workspace_id: Optional[str] = None,
workspace_name: Optional[str] = None,
operation_memory: Optional[Any] = None,
) -> OrchestratorLLMAgent:
"""
Factory: build an OrchestratorLLMAgent from config.
Reads llm_provider from config to select the right LLM client,
then wires everything into OrchestratorLLMAgent.
Args:
config: AgentConfig with llm_provider + provider-specific fields.
fabric_client: Initialized FabricApiClient.
memory: Shared memory (created fresh if None).
workspace_id: Default workspace ID.
workspace_name: Default workspace name.
operation_memory: Optional OperationMemory for RAG context.
Returns:
OrchestratorLLMAgent ready to call .run(task).
Example:
config = AgentConfig()
orchestrator = create_llm_orchestrator(config,ApprovalRequest.to_dict method · python · L81-L93 (13 LOC)fabric_agent/agents/orchestrator.py
def to_dict(self) -> Dict[str, Any]:
    """Serialize this approval request into a plain dictionary.

    Returns:
        A dict of the request's fields. ``status`` holds the ``.value`` of
        ``self.status``; all other entries are the attributes unchanged.
    """
    record: Dict[str, Any] = {}
    for attr in (
        "id",
        "workflow_id",
        "change_summary",
        "risk_level",
        "impact_report",
        "requested_at",
    ):
        record[attr] = getattr(self, attr)
    # Only the status is unwrapped through ``.value``.
    record["status"] = self.status.value
    for attr in ("approved_by", "approved_at", "rejection_reason"):
        record[attr] = getattr(self, attr)
    return record

# WorkflowResult.to_dict method · python · L109-L120 (12 LOC)fabric_agent/agents/orchestrator.py
def to_dict(self) -> Dict[str, Any]:
    """Serialize this workflow result into a plain dictionary.

    Returns:
        A dict of the workflow's fields. ``status`` holds the ``.value`` of
        ``self.status``, each entry of ``agent_results`` is converted with
        its own ``to_dict()``, and ``approval_request`` is converted the
        same way or is ``None`` when no request is attached.
    """
    serialized: Dict[str, Any] = {
        "workflow_id": self.workflow_id,
        "workflow_type": self.workflow_type,
        "status": self.status.value,
        "started_at": self.started_at,
        "completed_at": self.completed_at,
    }
    serialized["agent_results"] = {
        agent_key: agent_result.to_dict()
        for agent_key, agent_result in self.agent_results.items()
    }
    if self.approval_request:
        serialized["approval_request"] = self.approval_request.to_dict()
    else:
        serialized["approval_request"] = None
    serialized["final_result"] = self.final_result
    serialized["error"] = self.error
    return serialized

# DeploymentPipelineManager.get_or_create_pipeline method · python · L142-L192 (51 LOC)fabric_agent/agents/orchestrator.py
async def get_or_create_pipeline(
self,
name: str,
source_workspace_id: str,
target_workspace_id: str,
) -> Dict:
"""Get existing or create new deployment pipeline."""
# List existing pipelines
try:
response = await self.client.get("/deploymentPipelines")
pipelines = response.get("value", [])
for pipeline in pipelines:
if pipeline.get("displayName") == name:
self._pipelines[name] = pipeline
return pipeline
except Exception as e:
logger.warning(f"Could not list pipelines: {e}")
# Create new pipeline
try:
pipeline = await self.client.post(
"/deploymentPipelines",
json={
"displayName": name,
"description": f"Deployment pipeline for refactoring operations",
}
)DeploymentPipelineManager.create_deployment_request method · python · L194-L240 (47 LOC)fabric_agent/agents/orchestrator.py
async def create_deployment_request(
self,
pipeline_id: str,
source_stage: int = 0,
target_stage: int = 1,
items: Optional[List[Dict]] = None,
note: str = "",
) -> Dict:
"""
Create a deployment request (triggers approval flow).
Args:
pipeline_id: Deployment pipeline ID
source_stage: Source stage order (0=DEV)
target_stage: Target stage order (1=PROD)
items: Specific items to deploy (None = all)
note: Deployment note
"""
payload = {
"sourceStageOrder": source_stage,
"isBackwardDeployment": False,
"newWorkspace": {
"name": None, # Use existing
},
"options": {
"allowCreateArtifact": False,
"allowOverwriteArtifact": True,
"allowOverwriteTargetArtifactLabel": False,
"allowPurgeData": False,Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
DeploymentPipelineManager.get_deployment_status method · python · L242-L255 (14 LOC)fabric_agent/agents/orchestrator.py
async def get_deployment_status(
    self,
    pipeline_id: str,
    operation_id: str,
) -> Dict:
    """Fetch the status of one deployment operation.

    Args:
        pipeline_id: Deployment pipeline ID used in the request path.
        operation_id: Operation ID used in the request path.

    Returns:
        The response returned by ``self.client.get``.

    Raises:
        Exception: Any error from the client call is logged and re-raised.
    """
    endpoint = f"/deploymentPipelines/{pipeline_id}/operations/{operation_id}"
    try:
        return await self.client.get(endpoint)
    except Exception as exc:
        logger.error(f"Could not get deployment status: {exc}")
        raise

# DeploymentPipelineManager.wait_for_deployment method · python · L257-L287 (31 LOC)fabric_agent/agents/orchestrator.py
async def wait_for_deployment(
self,
pipeline_id: str,
operation_id: str,
timeout_seconds: int = 3600,
poll_interval: int = 30,
) -> Dict:
"""
Wait for deployment to complete.
Note: Fabric Deployment Pipelines handle approval in their UI.
This polls until the deployment is approved and completed.
"""
import time
start_time = time.time()
while time.time() - start_time < timeout_seconds:
status = await self.get_deployment_status(pipeline_id, operation_id)
state = status.get("status", "Unknown")
if state == "Succeeded":
return {"status": "approved", "result": status}
elif state == "Failed":
return {"status": "failed", "result": status}
elif state == "Cancelled":
return {"status": "rejected", "result": status}
logger.iSimpleApprovalManager.create_request method · python · L307-L330 (24 LOC)fabric_agent/agents/orchestrator.py
async def create_request(
    self,
    workflow_id: str,
    change_summary: str,
    risk_level: str,
    impact_report: Dict,
) -> ApprovalRequest:
    """Create an approval request and remember it by its ID.

    Args:
        workflow_id: ID of the workflow the request belongs to.
        change_summary: Text describing the change.
        risk_level: Risk level label.
        impact_report: Impact report dict stored on the request.

    Returns:
        The new ``ApprovalRequest``, stamped with a fresh UUID and the
        current UTC time and stored in ``self._requests``.
    """
    pending = ApprovalRequest(
        id=str(uuid4()),
        workflow_id=workflow_id,
        change_summary=change_summary,
        risk_level=risk_level,
        impact_report=impact_report,
        requested_at=datetime.now(timezone.utc).isoformat(),
    )
    self._requests[pending.id] = pending

    for log_line in (
        f"Approval request created: {pending.id}",
        f" Change: {change_summary}",
        f" Risk: {risk_level}",
    ):
        logger.info(log_line)
    return pending

# SimpleApprovalManager.approve method · python · L332-L348 (17 LOC)fabric_agent/agents/orchestrator.py
async def approve(
    self,
    request_id: str,
    approved_by: str = "user",
) -> ApprovalRequest:
    """Mark a stored request as approved.

    Args:
        request_id: ID of the request in ``self._requests``.
        approved_by: Name recorded as the approver.

    Returns:
        The updated request, with its status set to APPROVED and the
        approver and current UTC time recorded.

    Raises:
        ValueError: If no request is stored under ``request_id``.
    """
    pending = self._requests.get(request_id)
    if not pending:
        raise ValueError(f"Request not found: {request_id}")

    decided_at = datetime.now(timezone.utc).isoformat()
    pending.status = ApprovalStatus.APPROVED
    pending.approved_by = approved_by
    pending.approved_at = decided_at

    logger.info(f"Request approved: {request_id} by {approved_by}")
    return pending

# SimpleApprovalManager.reject method · python · L350-L368 (19 LOC)fabric_agent/agents/orchestrator.py
async def reject(
    self,
    request_id: str,
    rejected_by: str = "user",
    reason: str = "",
) -> ApprovalRequest:
    """Mark a stored request as rejected.

    Args:
        request_id: ID of the request in ``self._requests``.
        rejected_by: Name recorded as the person rejecting.
        reason: Text stored as the rejection reason.

    Returns:
        The updated request, with its status set to REJECTED.

    Raises:
        ValueError: If no request is stored under ``request_id``.
    """
    pending = self._requests.get(request_id)
    if not pending:
        raise ValueError(f"Request not found: {request_id}")

    decided_at = datetime.now(timezone.utc).isoformat()
    pending.status = ApprovalStatus.REJECTED
    # The approved_by / approved_at fields are reused to record who
    # rejected the request and when.
    pending.approved_by = rejected_by
    pending.approved_at = decided_at
    pending.rejection_reason = reason

    logger.info(f"Request rejected: {request_id} by {rejected_by}")
    return pending

# SimpleApprovalManager.get_status method · python · L370-L375 (6 LOC)fabric_agent/agents/orchestrator.py
async def get_status(self, request_id: str) -> ApprovalStatus:
    """Return the status of a stored approval request.

    Args:
        request_id: ID of the request in ``self._requests``.

    Raises:
        ValueError: If no request is stored under ``request_id``.
    """
    entry = self._requests.get(request_id)
    if entry:
        return entry.status
    raise ValueError(f"Request not found: {request_id}")

# AgentOrchestrator.__init__ method · python · L397-L448 (52 LOC)fabric_agent/agents/orchestrator.py
def __init__(
self,
fabric_client: Any,
workspace_name: str,
workspace_id: Optional[str] = None,
use_deployment_pipelines: bool = False,
):
"""
Initialize the orchestrator.
Args:
fabric_client: FabricApiClient instance
workspace_name: Default workspace name
workspace_id: Default workspace ID
use_deployment_pipelines: If True, use Fabric pipelines for approval
"""
self.fabric_client = fabric_client
self.workspace_name = workspace_name
self.workspace_id = workspace_id
# Shared memory for all agents
self.memory = SharedMemory()
# Initialize agents
self.discovery = DiscoveryAgent(
memory=self.memory,
fabric_client=fabric_client,
)
self.impact = ImpactAgent(memory=self.memory)
self.refactor = RefactorAgent(
AgentOrchestrator._execute_steps_4_and_5 method · python · L696-L817 (122 LOC)fabric_agent/agents/orchestrator.py
async def _execute_steps_4_and_5(
self,
workflow: "WorkflowResult",
params: Dict[str, Any],
) -> "WorkflowResult":
"""
Execute Steps 4 (Refactor) and 5 (Validation) of safe_refactor_workflow.
Extracted so that approve_workflow() can resume a paused workflow without
re-running discovery or impact analysis (Steps 1+2 already completed).
WHY THIS MATTERS:
Before this fix, approve_workflow() set status=APPROVED and returned —
the actual refactoring never happened. High-risk workflows were permanently
stuck in AWAITING_APPROVAL after approval.
Args:
workflow: The WorkflowResult object (already in APPROVED state).
params: Dict with refactor_type, model_name, old_name, new_name,
dry_run, model_id — all captured in safe_refactor_workflow().
Returns:
Updated WorkflowResult with COMPLETED, ROLLED_BACK, or FAILED statAbout: code-quality intelligence by Repobility · https://repobility.com
AgentOrchestrator.approve_workflow method · python · L819-L862 (44 LOC)fabric_agent/agents/orchestrator.py
async def approve_workflow(
self,
request_id: str,
approved_by: str = "user",
) -> WorkflowResult:
"""
Approve a pending workflow and continue execution.
Call this after receiving approval for a high-risk change.
Retrieves the saved continuation params and resumes Steps 4+5.
"""
if isinstance(self.approval, SimpleApprovalManager):
request = await self.approval.approve(request_id, approved_by)
workflow_id = request.workflow_id
workflow = self._workflows.get(workflow_id)
if not workflow:
raise ValueError(f"Workflow not found: {workflow_id}")
if workflow.status != WorkflowStatus.AWAITING_APPROVAL:
raise ValueError(f"Workflow not awaiting approval: {workflow.status}")
workflow.status = WorkflowStatus.APPROVED
logger.info(f"Workflow approved by {approved_by!r} — resuming execution: {workflow_idAgentOrchestrator.reject_workflow method · python · L864-L884 (21 LOC)fabric_agent/agents/orchestrator.py
async def reject_workflow(
    self,
    request_id: str,
    rejected_by: str = "user",
    reason: str = "",
) -> Optional[WorkflowResult]:
    """Reject a pending workflow.

    Args:
        request_id: ID of the approval request to reject.
        rejected_by: Name recorded as the person rejecting.
        reason: Text recorded as the rejection reason.

    Returns:
        The workflow linked to the request, marked REJECTED with a
        completion time and an error message. ``None`` is returned when the
        approval manager is not a ``SimpleApprovalManager`` (nothing is
        done) or when no workflow is stored for the request's workflow ID.
    """
    if not isinstance(self.approval, SimpleApprovalManager):
        logger.warning("reject_workflow called with non-SimpleApprovalManager — no-op")
        return None

    request = await self.approval.reject(request_id, rejected_by, reason)
    workflow = self._workflows.get(request.workflow_id)
    if workflow:
        workflow.status = WorkflowStatus.REJECTED
        workflow.completed_at = datetime.now(timezone.utc).isoformat()
        workflow.error = f"Rejected by {rejected_by}: {reason}"
    # May be None: the request is rejected even if its workflow is unknown.
    return workflow

# DiscoveryAgent.__init__ method · python · L48-L96 (49 LOC)fabric_agent/agents/specialized.py
def __init__(
self,
memory: SharedMemory,
fabric_client: Any, # FabricApiClient
):
self.fabric_client = fabric_client
tools = [
ToolDefinition(
name="list_workspaces",
description="List all accessible workspaces",
parameters={},
handler=self._list_workspaces,
),
ToolDefinition(
name="scan_workspace",
description="Scan a workspace and list all items",
parameters={
"workspace_id": {"type": "string", "description": "Workspace ID"},
},
handler=self._scan_workspace,
),
ToolDefinition(
name="build_dependency_graph",
description="Build dependency graph for a workspace",
parameters={
"workspace_id": {"type": "string", "description": "Workspace ID"},
DiscoveryAgent._list_workspaces method · python · L98-L106 (9 LOC)fabric_agent/agents/specialized.py
async def _list_workspaces(self) -> List[Dict]:
    """Fetch the workspaces from the Fabric client and cache them.

    The raw entries from the response's ``value`` list are stored in shared
    memory under ``"workspaces"``.

    Returns:
        One ``{"id", "name"}`` dict per workspace; ``name`` comes from
        ``displayName`` and is an empty string when that key is absent.
    """
    payload = await self.fabric_client.get("/workspaces")
    found = payload.get("value", [])

    # Keep the untrimmed entries in memory.
    self.memory.set("workspaces", found)

    return [
        dict(id=entry["id"], name=entry.get("displayName", ""))
        for entry in found
    ]

# DiscoveryAgent._scan_workspace method · python · L108-L134 (27 LOC)fabric_agent/agents/specialized.py
async def _scan_workspace(self, workspace_id: str) -> Dict:
    """List a workspace's items and group them by type.

    Args:
        workspace_id: Workspace ID used in the request path.

    Returns:
        A dict with ``workspace_id``, ``total_items`` and ``by_type``.
        ``by_type`` maps each item type (``"Unknown"`` when the item has no
        ``type``) to a list of ``{"id", "name", "type"}`` dicts. The same
        dict is cached in shared memory under
        ``workspace_items_<workspace_id>``.
    """
    payload = await self.fabric_client.get(f"/workspaces/{workspace_id}/items")
    entries = payload.get("value", [])

    grouped = {}
    for entry in entries:
        kind = entry.get("type", "Unknown")
        grouped.setdefault(kind, []).append({
            "id": entry["id"],
            "name": entry.get("displayName", ""),
            "type": kind,
        })

    scan = {
        "workspace_id": workspace_id,
        "total_items": len(entries),
        "by_type": grouped,
    }
    self.memory.set(f"workspace_items_{workspace_id}", scan)
    return scan

# DiscoveryAgent._build_graph method · python · L136-L160 (25 LOC)fabric_agent/agents/specialized.py
async def _build_graph(
    self,
    workspace_id: str,
    workspace_name: str,
    include_measures: bool = True,
) -> Dict:
    """Build the workspace dependency graph and return a short summary.

    The full graph from ``WorkspaceGraphBuilder.build`` is cached in shared
    memory under ``dependency_graph_<workspace_id>``.

    Args:
        workspace_id: Workspace ID passed to the builder.
        workspace_name: Workspace name passed to the builder.
        include_measures: Passed to the builder as ``include_measure_graph``.

    Returns:
        A dict with ``workspace_id``, the counts of the graph's ``nodes``
        and ``edges``, and the graph's ``stats`` (empty dict when absent).
    """
    from fabric_agent.tools.workspace_graph import WorkspaceGraphBuilder

    graph = await WorkspaceGraphBuilder(include_measure_graph=include_measures).build(
        client=self.fabric_client,
        workspace_id=workspace_id,
        workspace_name=workspace_name,
    )
    self.memory.set(f"dependency_graph_{workspace_id}", graph)

    summary = {"workspace_id": workspace_id}
    # Only the sizes are reported; the full lists stay in the cached graph.
    for section in ("nodes", "edges"):
        summary[section] = len(graph.get(section, []))
    summary["stats"] = graph.get("stats", {})
    return summary

# DiscoveryAgent._get_model_definition method · python · L162-L168 (7 LOC)fabric_agent/agents/specialized.py
async def _get_model_definition(self, workspace_id: str, model_id: str) -> Dict:
    """Request a semantic model's definition in TMDL format.

    Args:
        workspace_id: Workspace ID used in the request path.
        model_id: Semantic model ID used in the request path.

    Returns:
        The response from the Fabric client's ``post`` call, unchanged.
    """
    endpoint = f"/workspaces/{workspace_id}/semanticModels/{model_id}/getDefinition"
    return await self.fabric_client.post(endpoint, json={"format": "TMDL"})

# DiscoveryAgent.discover_workspace method · python · L170-L214 (45 LOC)fabric_agent/agents/specialized.py
async def discover_workspace(
self,
workspace_id: str,
workspace_name: str,
include_measures: bool = True,
) -> AgentResult:
"""
Full discovery of a workspace.
Combines scanning + graph building.
"""
import time
start = time.time()
try:
# Scan items
items = await self._scan_workspace(workspace_id)
# Build graph
graph = await self._build_graph(workspace_id, workspace_name, include_measures)
result = {
"workspace_id": workspace_id,
"workspace_name": workspace_name,
"items": items,
"graph": graph,
}
return AgentResult(
agent_name=self.name,
task="discover_workspace",
status=TaskStatus.COMPLETED,
result=result,
execRepobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
ImpactAgent.__init__ method · python · L248-L294 (47 LOC)fabric_agent/agents/specialized.py
def __init__(self, memory: SharedMemory, operation_memory=None):
"""
Args:
memory: Shared in-memory state (workspace graphs, etc.).
operation_memory: Optional OperationMemory instance for historical
risk calibration. If None, base score only.
"""
self._operation_memory = operation_memory
tools = [
ToolDefinition(
name="analyze_measure_rename",
description="Analyze impact of renaming a measure",
parameters={
"workspace_id": {"type": "string"},
"model_id": {"type": "string"},
"measure_name": {"type": "string"},
"new_name": {"type": "string"},
},
handler=self._analyze_measure_rename,
),
ToolDefinition(
name="analyze_notebook_change",
description="Analyze impaImpactAgent._analyze_measure_rename method · python · L296-L319 (24 LOC)fabric_agent/agents/specialized.py
async def _analyze_measure_rename(
    self,
    workspace_id: str,
    model_id: str,
    measure_name: str,
    new_name: str,
) -> Dict:
    """Analyze the impact of renaming a measure, using the cached graph.

    Args:
        workspace_id: Workspace whose cached dependency graph is used.
        model_id: Model ID passed to the analyzer.
        measure_name: Current measure name passed to the analyzer.
        new_name: Proposed name, recorded on the result.

    Returns:
        The analyzer's impact dict with ``proposed_new_name`` and
        ``requires_approval`` added, or an ``{"error": ...}`` dict when no
        graph is cached for the workspace.
    """
    from fabric_agent.tools.workspace_graph import GraphImpactAnalyzer

    cached_graph = self.memory.get(f"dependency_graph_{workspace_id}")
    if not cached_graph:
        return {"error": "No cached graph. Run DiscoveryAgent first."}

    impact = GraphImpactAnalyzer(cached_graph).analyze_measure_rename_impact(
        model_id, measure_name
    )

    # Record the proposed name and flag the risk levels that need approval.
    approval_levels = ("high_risk", "critical")
    impact["proposed_new_name"] = new_name
    impact["requires_approval"] = impact["risk_level"] in approval_levels
    return impact

# ImpactAgent._analyze_notebook_change method · python · L321-L335 (15 LOC)fabric_agent/agents/specialized.py
async def _analyze_notebook_change(
    self,
    workspace_id: str,
    notebook_id: str,
) -> Dict:
    """Analyze the impact of changing a notebook, using the cached graph.

    Args:
        workspace_id: Workspace whose cached dependency graph is used.
        notebook_id: Notebook ID passed to the analyzer.

    Returns:
        The analyzer's result, or an ``{"error": ...}`` dict when no graph
        is cached for the workspace.
    """
    from fabric_agent.tools.workspace_graph import GraphImpactAnalyzer

    cached_graph = self.memory.get(f"dependency_graph_{workspace_id}")
    if cached_graph:
        return GraphImpactAnalyzer(cached_graph).analyze_notebook_change_impact(notebook_id)
    return {"error": "No cached graph. Run DiscoveryAgent first."}

# page 1 / 17next ›