Function bodies 807 total
ImpactAgent._calculate_risk_score method · python · L337-L427 (91 LOC)fabric_agent/agents/specialized.py
async def _calculate_risk_score(
self,
total_impact: int,
has_production_reports: bool,
has_downstream_dependencies: bool,
proposed_change: str = "",
) -> Dict:
"""
Calculate composite risk score — data-driven when history is available.
STEP 1: Algorithmic base score (always runs):
base = total_impact × 2
× 1.5 if production reports are affected
× 1.3 if downstream dependencies exist
STEP 2: Historical calibration (when operation_memory is wired in):
Retrieves top-K similar past operations via vector similarity.
If historical_failure_rate > 30%: score × (1 + failure_rate)
Also surfaces past failure reasons as recommendations.
WHY THIS MATTERS:
A "rename Revenue → Gross Revenue" might score medium_risk by
algorithm alone. But if history shows that 5 of the last 7 similar
renames rolled back due to DAX formulaImpactAgent.analyze_change method · python · L429-L478 (50 LOC)fabric_agent/agents/specialized.py
async def analyze_change(
self,
change_type: str,
workspace_id: str,
**kwargs,
) -> AgentResult:
"""
Analyze a proposed change.
Args:
change_type: "measure_rename", "notebook_change", etc.
workspace_id: Target workspace
**kwargs: Change-specific parameters
"""
import time
start = time.time()
try:
if change_type == "measure_rename":
impact = await self._analyze_measure_rename(
workspace_id=workspace_id,
model_id=kwargs.get("model_id", ""),
measure_name=kwargs.get("measure_name", ""),
new_name=kwargs.get("new_name", ""),
)
elif change_type == "notebook_change":
impact = await self._analyze_notebook_change(
workspace_id=workspace_id,
notebook_id=kwargs.RefactorAgent.__init__ method · python · L496-L532 (37 LOC)fabric_agent/agents/specialized.py
def __init__(
self,
memory: SharedMemory,
workspace_name: str,
):
self.workspace_name = workspace_name
self._executor = None
tools = [
ToolDefinition(
name="rename_measure",
description="Rename a measure with automatic reference updates",
parameters={
"model_name": {"type": "string"},
"old_name": {"type": "string"},
"new_name": {"type": "string"},
"dry_run": {"type": "boolean"},
},
handler=self._rename_measure,
),
ToolDefinition(
name="rollback",
description="Rollback to a checkpoint",
parameters={
"checkpoint_id": {"type": "string"},
"dry_run": {"type": "boolean"},
},
handler=self._rollback,
),
RefactorAgent._get_executor method · python · L534-L539 (6 LOC)fabric_agent/agents/specialized.py
def _get_executor(self):
    """Return the cached RefactorExecutor, building it on first use.

    The import is deferred to the first call, so
    ``fabric_agent.refactor.executor`` is only loaded when an executor is
    actually requested.

    Returns:
        The executor stored on ``self._executor``.
    """
    if self._executor is None:
        from fabric_agent.refactor.executor import RefactorExecutor

        self._executor = RefactorExecutor(workspace_name=self.workspace_name)
    return self._executor
# RefactorAgent._rename_measure method · python · L541-L557 (17 LOC)fabric_agent/agents/specialized.py
async def _rename_measure(
    self,
    model_name: str,
    old_name: str,
    new_name: str,
    dry_run: bool = True,
) -> Dict:
    """Rename a measure through the executor and return the result as a dict.

    References are always updated (``update_references=True``).

    Args:
        model_name: Model that owns the measure.
        old_name: Current measure name.
        new_name: Replacement measure name.
        dry_run: Forwarded to the executor; defaults to True.

    Returns:
        ``to_dict()`` of the executor's result.
    """
    outcome = await self._get_executor().rename_measure(
        model_name=model_name,
        old_name=old_name,
        new_name=new_name,
        update_references=True,
        dry_run=dry_run,
    )
    return outcome.to_dict()
# RefactorAgent._rollback method · python · L559-L570 (12 LOC)fabric_agent/agents/specialized.py
async def _rollback(
    self,
    checkpoint_id: str,
    dry_run: bool = True,
) -> Dict:
    """Roll back to a checkpoint through the executor.

    Args:
        checkpoint_id: Identifier of the checkpoint to restore.
        dry_run: Forwarded to the executor; defaults to True.

    Returns:
        ``to_dict()`` of the executor's result.
    """
    outcome = await self._get_executor().rollback(
        checkpoint_id=checkpoint_id,
        dry_run=dry_run,
    )
    return outcome.to_dict()
# RefactorAgent.execute_refactor method · python · L572-L625 (54 LOC)fabric_agent/agents/specialized.py
async def execute_refactor(
self,
refactor_type: str,
dry_run: bool = True,
**kwargs,
) -> AgentResult:
"""
Execute a refactoring operation.
Args:
refactor_type: "rename_measure", "rollback"
dry_run: If True, only simulate
**kwargs: Refactor-specific parameters
"""
import time
start = time.time()
try:
if refactor_type == "rename_measure":
result = await self._rename_measure(
model_name=kwargs.get("model_name", ""),
old_name=kwargs.get("old_name", ""),
new_name=kwargs.get("new_name", ""),
dry_run=dry_run,
)
elif refactor_type == "rollback":
result = await self._rollback(
checkpoint_id=kwargs.get("checkpoint_id", ""),
dry_run=dry_run,
)Repobility — the code-quality scanner for AI-generated software · https://repobility.com
HealerAgent.__init__ method · python · L665-L723 (59 LOC)fabric_agent/agents/specialized.py
def __init__(
self,
memory: SharedMemory,
fabric_client: Any, # FabricApiClient
memory_manager: Optional[Any] = None, # MemoryManager
contracts_dir: Optional[str] = None,
stale_hours: int = 24,
):
self.fabric_client = fabric_client
self.memory_manager = memory_manager
tools = [
ToolDefinition(
name="scan_for_anomalies",
description="Scan workspace(s) for infrastructure anomalies",
parameters={
"workspace_ids": {"type": "array", "items": {"type": "string"}},
},
handler=self._scan_for_anomalies,
),
ToolDefinition(
name="build_healing_plan",
description="Build a healing plan from detected anomalies",
parameters={
"workspace_ids": {"type": "array", "items": {"type": "string"}},
}HealerAgent._get_monitor method · python · L725-L735 (11 LOC)fabric_agent/agents/specialized.py
def _get_monitor(self):
    """Return the cached SelfHealingMonitor, constructing it on first use.

    The monitor is built from ``self.fabric_client``, ``self.memory_manager``,
    ``self._contracts_dir`` and ``self._stale_hours``; the import is deferred
    until the first call.

    Returns:
        The monitor stored on ``self._monitor``.
    """
    if self._monitor is None:
        from fabric_agent.healing.monitor import SelfHealingMonitor

        self._monitor = SelfHealingMonitor(
            client=self.fabric_client,
            memory=self.memory_manager,
            contracts_dir=self._contracts_dir,
            stale_hours=self._stale_hours,
        )
    return self._monitor
# HealerAgent.run method · python · L737-L785 (49 LOC)fabric_agent/agents/specialized.py
async def run(self, task: str, context: Dict[str, Any]) -> AgentResult:
"""
Execute a healing task.
Args:
task: One of "health_scan", "scan_anomalies", "build_plan", "execute_plan".
context: Must include "workspace_ids". Optional "dry_run" (default True).
Returns:
AgentResult with result=HealthReport (or List[Anomaly] / HealingPlan).
"""
import time
start = time.time()
workspace_ids = context.get("workspace_ids", [])
dry_run = context.get("dry_run", True)
try:
if task in ("health_scan", "get_health_report"):
result = await self._get_health_report(
workspace_ids=workspace_ids, dry_run=dry_run
)
elif task == "scan_anomalies":
result = await self._scan_for_anomalies(workspace_ids=workspace_ids)
elif task == "build_plan":
result = await self._build_heHealerAgent._build_healing_plan method · python · L793-L798 (6 LOC)fabric_agent/agents/specialized.py
async def _build_healing_plan(self, workspace_ids: List[str], **_) -> Dict:
    """Scan the workspaces and build a healing plan from what was found.

    Args:
        workspace_ids: Workspaces handed to the monitor's detector.
        **_: Extra keyword arguments are accepted and ignored.

    Returns:
        ``to_dict()`` of the plan built by the monitor's healer.
    """
    monitor = self._get_monitor()
    found = await monitor.detector.scan(workspace_ids)
    return monitor.healer.build_plan(found).to_dict()
# HealerAgent._execute_healing_plan method · python · L800-L808 (9 LOC)fabric_agent/agents/specialized.py
async def _execute_healing_plan(
    self, workspace_ids: List[str], dry_run: bool = True, **_
) -> Dict:
    """Scan, build a healing plan, execute it, and return the outcome.

    Args:
        workspace_ids: Workspaces handed to the monitor's detector.
        dry_run: Forwarded to the healer's ``execute_healing_plan``.
        **_: Extra keyword arguments are accepted and ignored.

    Returns:
        ``to_dict()`` of the execution result.
    """
    monitor = self._get_monitor()
    found = await monitor.detector.scan(workspace_ids)
    plan = monitor.healer.build_plan(found)
    outcome = await monitor.healer.execute_healing_plan(plan, dry_run=dry_run)
    return outcome.to_dict()
# HealerAgent._get_health_report method · python · L810-L816 (7 LOC)fabric_agent/agents/specialized.py
async def _get_health_report(
    self, workspace_ids: List[str], dry_run: bool = True, **_
) -> Dict:
    """Run one monitor cycle and return its report as a dict.

    Args:
        workspace_ids: Workspaces passed to the monitor's ``run_once``.
        dry_run: Forwarded to ``run_once``.
        **_: Extra keyword arguments are accepted and ignored.

    Returns:
        ``to_dict()`` of the report produced by ``run_once``.
    """
    report = await self._get_monitor().run_once(workspace_ids, dry_run=dry_run)
    return report.to_dict()
# QueryResult.compute_checksum method · python · L38-L43 (6 LOC)fabric_agent/agents/validation.py
def compute_checksum(self) -> str:
    """Compute a short checksum of ``self.data``.

    Returns:
        An empty string when ``self.data`` is None; otherwise the first 16 hex
        characters of the SHA-256 digest of the data serialised as JSON with
        sorted keys (values JSON cannot encode are passed through ``str``).
    """
    if self.data is None:
        return ""
    serialised = json.dumps(self.data, sort_keys=True, default=str)
    digest = hashlib.sha256(serialised.encode()).hexdigest()
    return digest[:16]
# _try_import_sempy function · python · L84-L90 (7 LOC)fabric_agent/agents/validation.py
def _try_import_sempy():
    """Import ``sempy.fabric`` if it is installed.

    Returns:
        The ``sempy.fabric`` module, or None when the import fails with
        ImportError.
    """
    try:
        import sempy.fabric as fabric
    except ImportError:
        return None
    return fabric
# Repobility analyzer · published findings · https://repobility.com
ValidationAgent._ensure_sempy method · python · L120-L126 (7 LOC)fabric_agent/agents/validation.py
def _ensure_sempy(self) -> Any:
    """Return the SemPy fabric module, importing it on first use.

    The module is cached on ``self._fabric`` once the import succeeds.

    Returns:
        The object stored on ``self._fabric``.

    Raises:
        RuntimeError: If SemPy cannot be imported.
    """
    if self._fabric is None:
        self._fabric = _try_import_sempy()
        if self._fabric is None:
            raise RuntimeError("SemPy required for validation")
    return self._fabric
# ValidationAgent.generate_tests_for_measure method · python · L132-L189 (58 LOC)fabric_agent/agents/validation.py
def generate_tests_for_measure(
self,
measure_name: str,
table_name: Optional[str] = None,
) -> List[ValidationTest]:
"""
Generate standard test queries for a measure.
Args:
measure_name: The measure to test.
table_name: Optional table context.
Returns:
List of validation tests.
"""
tests = []
# Test 1: Simple evaluation
tests.append(ValidationTest(
test_id=str(uuid4()),
name=f"Evaluate {measure_name}",
description=f"Simple evaluation of [{measure_name}]",
dax_query=f'EVALUATE ROW("Value", [{measure_name}])',
expected_behavior="results_match",
))
# Test 2: Evaluate with table context (if provided)
if table_name:
tests.append(ValidationTest(
test_id=str(uuid4()),
name=f"{measure_name} by {table_nValidationAgent.generate_tests_for_refactor method · python · L191-L217 (27 LOC)fabric_agent/agents/validation.py
def generate_tests_for_refactor(
    self,
    old_name: str,
    new_name: str,
    affected_measures: List[str],
    *,
    max_affected: Optional[int] = 10,
) -> List[ValidationTest]:
    """
    Generate comprehensive tests for a refactoring operation.

    Tests are produced for the renamed measure (under its new name) and then
    for each affected measure, in the order given.

    Args:
        old_name: Original measure name. Not used when building the tests.
        new_name: New measure name.
        affected_measures: List of measures that reference the target.
        max_affected: Maximum number of affected measures to cover
            (non-negative). Defaults to 10; pass None to cover all of them.

    Returns:
        List of validation tests.
    """
    # Test the renamed measure itself (using new name after refactor)
    tests = list(self.generate_tests_for_measure(new_name))
    # Test each affected measure, capped so large dependency sets stay cheap
    for measure in affected_measures[:max_affected]:
        tests.extend(self.generate_tests_for_measure(measure))
    return tests
# ValidationAgent.execute_query method · python · L223-L275 (53 LOC)fabric_agent/agents/validation.py
async def execute_query(
self,
model_name: str,
query: str,
) -> QueryResult:
"""
Execute a DAX query and return results.
Args:
model_name: Semantic model name.
query: DAX query to execute.
Returns:
QueryResult with data or error.
"""
import time
start_time = time.time()
try:
fabric = self._ensure_sempy()
# Execute using SemPy evaluate_dax
df = fabric.evaluate_dax(
dataset=model_name,
dax_string=query,
workspace=self.workspace_name,
)
execution_time = (time.time() - start_time) * 1000
# Convert DataFrame to list of dicts
data = df.to_dict(orient='records') if df is not None else []
result = QueryResult(
query=query,
ValidationAgent.capture_state method · python · L277-L299 (23 LOC)fabric_agent/agents/validation.py
async def capture_state(
    self,
    model_name: str,
    tests: List[ValidationTest],
) -> Dict[str, QueryResult]:
    """
    Capture the current state by running all tests.

    Queries are executed one after another, in the order of ``tests``.

    Args:
        model_name: Semantic model name.
        tests: List of tests to run.

    Returns:
        Dictionary mapping test_id to QueryResult.
    """
    results = {}
    for test in tests:
        # Lazy %-formatting: the message is only built when DEBUG is enabled.
        logger.debug("Capturing: %s", test.name)
        results[test.test_id] = await self.execute_query(model_name, test.dax_query)
    return results
# ValidationAgent.validate method · python · L305-L375 (71 LOC)fabric_agent/agents/validation.py
async def validate(
self,
model_name: str,
tests: List[ValidationTest],
before_results: Dict[str, QueryResult],
operation_id: str = "",
) -> ValidationReport:
"""
Validate by comparing before and after results.
Args:
model_name: Semantic model name.
tests: List of tests to run.
before_results: Results captured before refactoring.
operation_id: ID of the refactoring operation.
Returns:
ValidationReport with comparison details.
"""
import time
start_time = time.time()
comparisons: List[TestComparison] = []
tests_passed = 0
tests_failed = 0
for test in tests:
before = before_results.get(test.test_id)
if not before:
logger.warning(f"No before result for test: {test.name}")
continue
ValidationAgent._compare_results method · python · L377-L421 (45 LOC)fabric_agent/agents/validation.py
def _compare_results(
self,
test: ValidationTest,
before: QueryResult,
after: QueryResult,
) -> Tuple[bool, str]:
"""
Compare before and after results.
Returns:
Tuple of (passed, difference_summary).
"""
# If either query failed, compare error states
if not before.success and not after.success:
return True, "Both queries failed (expected if measure was removed)"
if before.success != after.success:
if not after.success:
return False, f"Query failed after refactor: {after.error_message}"
else:
return False, f"Query started working (was failing before)"
# Both succeeded - compare based on expected behavior
if test.expected_behavior == "no_error":
return True, "Query executed without error"
elif test.expected_behavior == "row_count_match":
ValidationAgent._detailed_comparison method · python · L423-L461 (39 LOC)fabric_agent/agents/validation.py
def _detailed_comparison(
self,
before_data: Optional[List[Dict]],
after_data: Optional[List[Dict]],
tolerance: float,
) -> Optional[str]:
"""
Perform detailed comparison of result data.
Returns:
Difference description or None if equal.
"""
if before_data is None and after_data is None:
return None
if before_data is None or after_data is None:
return "One result is None"
if len(before_data) != len(after_data):
return f"Row count differs: {len(before_data)} vs {len(after_data)}"
# Compare row by row
for i, (before_row, after_row) in enumerate(zip(before_data, after_data)):
if set(before_row.keys()) != set(after_row.keys()):
return f"Row {i}: Column mismatch"
for key in before_row:
before_val = before_row[key]
aRepobility · code-quality intelligence · https://repobility.com
ValidationAgent._generate_recommendations method · python · L463-L489 (27 LOC)fabric_agent/agents/validation.py
def _generate_recommendations(
self,
comparisons: List[TestComparison],
) -> List[str]:
"""Generate recommendations based on test results."""
recommendations = []
failed = [c for c in comparisons if not c.passed]
if not failed:
recommendations.append("✅ All validation tests passed. Safe to proceed.")
else:
recommendations.append(f"⚠️ {len(failed)} test(s) failed. Review before proceeding.")
for comp in failed[:3]: # Show first 3
recommendations.append(f" - {comp.test.name}: {comp.difference_summary}")
if len(failed) > 3:
recommendations.append(f" ... and {len(failed) - 3} more failures")
recommendations.append("")
recommendations.append("Recommended actions:")
recommendations.append("1. Review the DAX expressions that changed")
recommendavalidate_refactor function · python · L496-L521 (26 LOC)fabric_agent/agents/validation.py
async def validate_refactor(
    workspace_name: str,
    model_name: str,
    measure_name: str,
    before_state: Dict[str, QueryResult],
) -> ValidationReport:
    """
    Quick validation after a refactor.

    The tests are regenerated here, and every generated test receives a fresh
    random ``test_id``. The ids used as keys in ``before_state`` therefore do
    not match the regenerated tests, so the baseline is re-aligned by DAX
    query text before validating. Entries whose key already matches a
    regenerated test are left untouched.

    Args:
        workspace_name: Workspace passed to ``ValidationAgent``.
        model_name: Semantic model name.
        measure_name: Measure to generate tests for.
        before_state: Results captured before the refactor.

    Returns:
        ValidationReport from ``ValidationAgent.validate``.

    Example:
        >>> # Before refactor
        >>> validator = ValidationAgent(workspace_name)
        >>> tests = validator.generate_tests_for_measure("Revenue")
        >>> before = await validator.capture_state(model_name, tests)
        >>>
        >>> # ... do refactor ...
        >>>
        >>> # Validate
        >>> report = await validate_refactor(
        ...     workspace_name, model_name, "Revenue", before
        ... )
        >>> print(f"Passed: {report.overall_passed}")
    """
    validator = ValidationAgent(workspace_name)
    tests = validator.generate_tests_for_measure(measure_name)

    # Index the captured baseline by the query that produced it; the first
    # capture wins if the same query appears more than once.
    baseline_by_query = {}
    for captured in before_state.values():
        query_text = getattr(captured, "query", None)
        if query_text is not None:
            baseline_by_query.setdefault(query_text, captured)

    # Attach each regenerated test to its baseline without mutating the
    # caller's dictionary.
    aligned = dict(before_state)
    for test in tests:
        if test.test_id not in aligned and test.dax_query in baseline_by_query:
            aligned[test.test_id] = baseline_by_query[test.dax_query]

    return await validator.validate(model_name, tests, aligned)
# FabricApiError.__init__ method · python · L39-L47 (9 LOC)fabric_agent/api/fabric_client.py
def __init__(
    self,
    message: str,
    status_code: Optional[int] = None,
    response_body: Optional[Any] = None,
):
    """Create the error and keep the HTTP details for callers to inspect.

    Args:
        message: Error text passed to the base-class initializer.
        status_code: HTTP status code, if one is available.
        response_body: Response body associated with the failure, if any.
    """
    super().__init__(message)
    self.status_code = status_code
    self.response_body = response_body
# FabricResponse.__init__ method · python · L62-L81 (20 LOC)fabric_agent/api/fabric_client.py
def __init__(self, raw: httpx.Response):
    """Wrap an HTTP response and normalise its JSON body to a dict.

    The dict handed to the base-class initializer is:
      - ``{}`` for an empty body, a body that fails to parse, or JSON ``null``;
      - the parsed object itself when it is a JSON object;
      - ``{"value": parsed}`` for any other JSON value (lists, scalars).

    Args:
        raw: The underlying response; kept on ``self.raw``.
    """
    self.raw = raw
    data: Dict[str, Any] = {}
    if raw.content:
        try:
            parsed = raw.json()
        except Exception:
            # Best effort: an unparseable body is treated like an empty one.
            parsed = None
        if isinstance(parsed, dict):
            data = parsed
        elif parsed is not None:
            data = {"value": parsed}
    super().__init__(data)
# FabricApiClient.__init__ method · python · L115-L145 (31 LOC)fabric_agent/api/fabric_client.py
def __init__(
self,
auth_config: FabricAuthConfig,
base_url: str = "https://api.fabric.microsoft.com",
api_version: str = "/v1",
timeout_seconds: int = 30,
max_retries: int = 3,
circuit_breaker_threshold: int = 5,
circuit_breaker_reset_seconds: float = 60.0,
):
self.auth_config = auth_config
self.base_url = base_url.rstrip("/") + api_version
self.timeout = httpx.Timeout(timeout_seconds)
self._token: Optional[str] = None
self._token_expiry: float = 0.0
self._credential: Any = None
self._client: Optional[httpx.AsyncClient] = None
self._initialized = False
# Resilience primitives — every HTTP call goes through both.
# RetryPolicy: backs off on 429/5xx with full jitter.
# CircuitBreaker: stops hammering a degraded Fabric API after
# `circuit_breaker_threshold` consecutive failures.
self._retry_poFabricApiClient.initialize method · python · L147-L160 (14 LOC)fabric_agent/api/fabric_client.py
async def initialize(self) -> None:
    """Acquire a credential and token, then create the HTTP client.

    Calling this again after a successful initialization is a no-op. If the
    token refresh or client construction fails, the credential acquired here
    is closed and cleared before the error propagates, so a failed start-up
    does not leave it open.
    """
    if self._initialized:
        return
    logger.info("Initializing Fabric API client")
    self._credential = await self._get_credential()
    try:
        await self._refresh_token()
        self._client = httpx.AsyncClient(
            timeout=self.timeout,
            headers={"Content-Type": "application/json"},
        )
    except Exception:
        credential, self._credential = self._credential, None
        if credential is not None and hasattr(credential, "close"):
            await credential.close()
        raise
    self._initialized = True
    logger.info("Fabric API client initialized successfully")
# FabricApiClient._get_credential method · python · L162-L178 (17 LOC)fabric_agent/api/fabric_client.py
async def _get_credential(self):
    """Pick the Azure credential that matches ``self.auth_config``.

    Selection order:
      1. ``ClientSecretCredential`` when a client secret is configured.
      2. ``AzureCliCredential`` when the auth mode is ``AuthMode.INTERACTIVE``
         (relies on ``az login``).
      3. ``DefaultAzureCredential`` with the interactive browser credential
         excluded.

    Returns:
        An async credential from ``azure.identity.aio``.
    """
    from azure.identity.aio import AzureCliCredential, ClientSecretCredential, DefaultAzureCredential

    config = self.auth_config
    # Service Principal
    if config.client_secret:
        return ClientSecretCredential(
            tenant_id=config.tenant_id,
            client_id=config.client_id,
            client_secret=config.client_secret.get_secret_value(),
        )
    # Interactive mode -> rely on `az login`
    if config.auth_mode == AuthMode.INTERACTIVE:
        return AzureCliCredential()
    # Default chain (exclude browser)
    return DefaultAzureCredential(exclude_interactive_browser_credential=True)
# FabricApiClient._refresh_token method · python · L180-L190 (11 LOC)fabric_agent/api/fabric_client.py
async def _refresh_token(self) -> None:
    """Fetch a new access token unless the cached one is still usable.

    The cached token is reused while it has more than 60 seconds left before
    ``self._token_expiry``. Otherwise a token is requested from
    ``self._credential`` and both ``self._token`` and ``self._token_expiry``
    are updated.
    """
    # refresh if expiring within 60 seconds
    if self._token and time.time() < self._token_expiry - 60:
        return
    logger.debug("Refreshing access token")
    scopes = ["https://api.fabric.microsoft.com/.default"]
    token_response = await self._credential.get_token(*scopes)
    self._token = token_response.token
    self._token_expiry = float(token_response.expires_on)
    logger.debug("Token refreshed, expires at %s", self._token_expiry)
# Repobility · MCP-ready · https://repobility.com
FabricApiClient._get_headers method · python · L196-L201 (6 LOC)fabric_agent/api/fabric_client.py
async def _get_headers(self) -> Dict[str, str]:
    """Build request headers carrying the current bearer token.

    ``_refresh_token`` is awaited first, so the token placed in the header is
    the one held after that call.

    Returns:
        ``Authorization`` and ``Content-Type`` headers.
    """
    await self._refresh_token()
    return {
        "Authorization": f"Bearer {self._token}",
        "Content-Type": "application/json",
    }
# FabricApiClient.request method · python · L203-L263 (61 LOC)fabric_agent/api/fabric_client.py
async def request(
self,
method: str,
path: str,
json_data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
) -> httpx.Response:
"""
Execute an HTTP request with automatic retry + circuit breaking.
All GET/POST/PUT/DELETE calls route through here, so resilience is
applied uniformly — no per-call retry logic needed at call sites.
RetryPolicy: Retries 429/5xx with exponential backoff + full jitter.
CircuitBreaker: Fast-fails once `failure_threshold` consecutive failures
occur, giving the Fabric API time to recover.
FAANG PARALLEL:
AWS SDK: every API call goes through the retry handler middleware.
Google gRPC: retry interceptor wraps all unary calls.
Stripe API client: automatic retry with idempotency keys.
"""
self._ensure_initialized()
url = self.base_url + (path if path.starFabricApiClient.post_raw method · python · L272-L281 (10 LOC)fabric_agent/api/fabric_client.py
async def post_raw(
    self,
    path: str,
    json_data: Optional[Dict[str, Any]] = None,
    params: Optional[Dict[str, Any]] = None,
    json: Optional[Dict[str, Any]] = None,  # alias
) -> httpx.Response:
    """Send a POST through ``request`` and return the raw response.

    Args:
        path: Request path.
        json_data: JSON payload.
        params: Query parameters.
        json: Alias for ``json_data``; used only when ``json_data`` is None.

    Returns:
        Whatever ``request`` returns; the status code is not checked here.
    """
    payload = json if (json_data is None and json is not None) else json_data
    return await self.request("POST", path, json_data=payload, params=params)
# FabricApiClient.put_raw method · python · L283-L291 (9 LOC)fabric_agent/api/fabric_client.py
async def put_raw(
    self,
    path: str,
    json_data: Optional[Dict[str, Any]] = None,
    json: Optional[Dict[str, Any]] = None,  # alias
) -> httpx.Response:
    """Send a PUT through ``request`` and return the raw response.

    Args:
        path: Request path.
        json_data: JSON payload.
        json: Alias for ``json_data``; used only when ``json_data`` is None.

    Returns:
        Whatever ``request`` returns; the status code is not checked here.
    """
    payload = json if (json_data is None and json is not None) else json_data
    return await self.request("PUT", path, json_data=payload)
# FabricApiClient.get method · python · L300-L308 (9 LOC)fabric_agent/api/fabric_client.py
async def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> FabricResponse:
    """GET ``path`` and wrap a 200 response in a FabricResponse.

    Args:
        path: Request path.
        params: Query parameters.

    Returns:
        FabricResponse wrapping the raw response.

    Raises:
        FabricApiError: For any status other than 200. ``response_body`` holds
            the parsed JSON body, the raw text when the body is not valid
            JSON, or None when the body is empty.
    """
    resp = await self.get_raw(path, params=params)
    if resp.status_code != 200:
        body = None
        if resp.content:
            try:
                body = resp.json()
            except ValueError:
                # Non-JSON error page (e.g. from a proxy): keep the text
                # instead of letting the decode error mask the API error.
                body = resp.text
        raise FabricApiError(
            f"GET {path} failed with status {resp.status_code}",
            status_code=resp.status_code,
            response_body=body,
        )
    return FabricResponse(resp)
# FabricApiClient.post method · python · L310-L324 (15 LOC)fabric_agent/api/fabric_client.py
async def post(
    self,
    path: str,
    json_data: Optional[Dict[str, Any]] = None,
    params: Optional[Dict[str, Any]] = None,
    json: Optional[Dict[str, Any]] = None,  # alias
) -> FabricResponse:
    """POST to ``path`` and wrap a 200/201/202 response in a FabricResponse.

    Args:
        path: Request path.
        json_data: JSON payload.
        params: Query parameters.
        json: Alias for ``json_data``, forwarded to ``post_raw``.

    Returns:
        FabricResponse wrapping the raw response.

    Raises:
        FabricApiError: For any status other than 200, 201 or 202.
            ``response_body`` holds the parsed JSON body, the raw text when
            the body is not valid JSON, or None when the body is empty.
    """
    resp = await self.post_raw(path, json_data=json_data, params=params, json=json)
    if resp.status_code not in (200, 201, 202):
        body = None
        if resp.content:
            try:
                body = resp.json()
            except ValueError:
                # Non-JSON error page: keep the text rather than letting the
                # decode error mask the API error.
                body = resp.text
        raise FabricApiError(
            f"POST {path} failed with status {resp.status_code}",
            status_code=resp.status_code,
            response_body=body,
        )
    return FabricResponse(resp)
# FabricApiClient.put method · python · L326-L339 (14 LOC)fabric_agent/api/fabric_client.py
async def put(
    self,
    path: str,
    json_data: Optional[Dict[str, Any]] = None,
    json: Optional[Dict[str, Any]] = None,  # alias
) -> FabricResponse:
    """PUT to ``path`` and wrap a 200/201/202 response in a FabricResponse.

    Args:
        path: Request path.
        json_data: JSON payload.
        json: Alias for ``json_data``, forwarded to ``put_raw``.

    Returns:
        FabricResponse wrapping the raw response.

    Raises:
        FabricApiError: For any status other than 200, 201 or 202.
            ``response_body`` holds the parsed JSON body, the raw text when
            the body is not valid JSON, or None when the body is empty.
    """
    resp = await self.put_raw(path, json_data=json_data, json=json)
    if resp.status_code not in (200, 201, 202):
        body = None
        if resp.content:
            try:
                body = resp.json()
            except ValueError:
                # Non-JSON error page: keep the text rather than letting the
                # decode error mask the API error.
                body = resp.text
        raise FabricApiError(
            f"PUT {path} failed with status {resp.status_code}",
            status_code=resp.status_code,
            response_body=body,
        )
    return FabricResponse(resp)
# FabricApiClient.put_binary method · python · L341-L390 (50 LOC)fabric_agent/api/fabric_client.py
async def put_binary(
self,
path: str,
data: bytes,
content_type: str = "application/octet-stream",
) -> httpx.Response:
"""
Upload raw bytes to a Fabric endpoint (e.g., OneLake Files API).
Used by build_and_upload.py and bootstrap to push .whl files into
Lakehouse Files/wheels/ so Notebooks can %pip install them.
Standard REST pattern: PUT with Content-Type: application/octet-stream.
The regular put() method serialises JSON — this bypasses that and
streams raw bytes instead.
FAANG PATTERN:
Same as AWS S3 PutObject — direct binary streaming upload.
Compute reads from where data lives (OneLake), not from the
developer's machine.
Args:
path: API path, e.g. /workspaces/{ws}/lakehouses/{lh}/files/...
data: Raw bytes to upload (e.g., wheel file contents).
content_type: MIME type (default: application/octetRepobility — the code-quality scanner for AI-generated software · https://repobility.com
FabricApiClient.delete method · python · L392-L400 (9 LOC)fabric_agent/api/fabric_client.py
async def delete(self, path: str) -> FabricResponse:
    """DELETE ``path`` and wrap a 200/202/204 response in a FabricResponse.

    Args:
        path: Request path.

    Returns:
        FabricResponse wrapping the raw response.

    Raises:
        FabricApiError: For any status other than 200, 202 or 204.
            ``response_body`` holds the parsed JSON body, the raw text when
            the body is not valid JSON, or None when the body is empty.
    """
    resp = await self.delete_raw(path)
    if resp.status_code not in (200, 202, 204):
        body = None
        if resp.content:
            try:
                body = resp.json()
            except ValueError:
                # Non-JSON error page: keep the text rather than letting the
                # decode error mask the API error.
                body = resp.text
        raise FabricApiError(
            f"DELETE {path} failed with status {resp.status_code}",
            status_code=resp.status_code,
            response_body=body,
        )
    return FabricResponse(resp)
# FabricApiClient.wait_for_lro method · python · L413-L524 (112 LOC)fabric_agent/api/fabric_client.py
async def wait_for_lro(
self,
initial_response: Union[httpx.Response, FabricResponse],
poll_interval: float = 1.5,
timeout: int = 300,
) -> Dict[str, Any]:
raw = initial_response.raw if isinstance(initial_response, FabricResponse) else initial_response
# Not an LRO
if raw.status_code in (200, 201):
return raw.json() if raw.content else {}
# Unexpected status
if raw.status_code != 202:
raw.raise_for_status()
return raw.json() if raw.content else {}
# LRO headers
location = raw.headers.get("Location") or raw.headers.get("location")
op_id = raw.headers.get("x-ms-operation-id") or raw.headers.get("x-ms-operationid")
# Try to extract from Location if missing
if not op_id:
op_id = self._extract_operation_id(location)
# Prefer documented operations endpoint on api.fabric.microsoft.com
op_url = f"{self.base_FabricApiClient.post_with_lro method · python · L526-L554 (29 LOC)fabric_agent/api/fabric_client.py
async def post_with_lro(
self,
path: str,
json_data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
json: Optional[Dict[str, Any]] = None, # alias
*,
lro_poll_seconds: float = 1.5,
max_polls: int = 200,
timeout: Optional[int] = None,
) -> Dict[str, Any]:
"""
POST and wait for Long Running Operation completion.
- lro_poll_seconds: poll interval
- max_polls: used to derive timeout if timeout is not provided
"""
resp = await self.post_raw(path, json_data=json_data, params=params, json=json)
poll_interval = float(lro_poll_seconds)
if timeout is None:
timeout = int(max(30.0, poll_interval * float(max_polls)))
if resp.status_code == 202:
return await self.wait_for_lro(resp, poll_interval=poll_interval, timeout=timeout)
# Non-LRO create/update responses
resp.raise_for_statFabricApiClient.close method · python · L556-L565 (10 LOC)fabric_agent/api/fabric_client.py
async def close(self) -> None:
    """Close the HTTP client and the credential, then mark the client closed.

    Each step runs even if the previous one raises: a failure while closing
    the HTTP client no longer skips closing the credential, and
    ``_initialized`` is always reset. The first error still propagates.
    """
    try:
        if self._client:
            await self._client.aclose()
    finally:
        self._client = None
        try:
            if self._credential and hasattr(self._credential, "close"):
                await self._credential.close()
        finally:
            self._initialized = False
    logger.info("Fabric API client closed")
# FabricApiClient.__aenter__ method · python · L571-L584 (14 LOC)fabric_agent/api/fabric_client.py
async def __aenter__(self) -> "FabricApiClient":
    """
    Support ``async with FabricApiClient(...) as client:`` usage.

    Awaits ``initialize()`` so callers don't need a separate await step, then
    returns the client itself as the ``as`` target.

    FAANG PATTERN:
        boto3 Session, google-cloud clients, and httpx.AsyncClient all
        expose async context managers for the same reason — guaranteed
        cleanup without try/finally boilerplate at every call site.
    """
    await self.initialize()
    return self
# SyncFabricApiClient.__init__ method · python · L600-L615 (16 LOC)fabric_agent/api/fabric_client.py
def __init__(
    self,
    auth_config: FabricAuthConfig,
    base_url: str = "https://api.fabric.microsoft.com",
    api_version: str = "/v1",
    timeout_seconds: int = 30,
):
    """Set up a synchronous client backed by a ``requests.Session``.

    Args:
        auth_config: Authentication settings, stored as-is.
        base_url: API host; a trailing slash is stripped before
            ``api_version`` is appended.
        api_version: Version path suffix appended to ``base_url``.
        timeout_seconds: Stored on ``self.timeout``.
    """
    import requests

    self.auth_config = auth_config
    self.base_url = base_url.rstrip("/") + api_version
    self.timeout = timeout_seconds
    self._token: Optional[str] = None
    # One session for the client's lifetime, defaulting to JSON content.
    self._session = requests.Session()
    self._session.headers.update({"Content-Type": "application/json"})
# SyncFabricApiClient._get_token_sync method · python · L617-L633 (17 LOC)fabric_agent/api/fabric_client.py
def _get_token_sync(self) -> str:
    """Acquire a Fabric API access token synchronously.

    A credential is created on every call, chosen in this order: client
    secret, Azure CLI (when the auth mode is ``AuthMode.INTERACTIVE``), then
    the default chain with the interactive browser credential excluded.

    Returns:
        The token string for the ``https://api.fabric.microsoft.com/.default``
        scope.
    """
    from azure.identity import AzureCliCredential, ClientSecretCredential, DefaultAzureCredential

    config = self.auth_config
    if config.client_secret:
        credential = ClientSecretCredential(
            tenant_id=config.tenant_id,
            client_id=config.client_id,
            client_secret=config.client_secret.get_secret_value(),
        )
    elif config.auth_mode == AuthMode.INTERACTIVE:
        credential = AzureCliCredential()
    else:
        credential = DefaultAzureCredential(exclude_interactive_browser_credential=True)
    scopes = ["https://api.fabric.microsoft.com/.default"]
    return credential.get_token(*scopes).token
# _to_openai_messages function · python · L74-L152 (79 LOC)fabric_agent/api/llm_providers.py
def _to_openai_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Convert Anthropic-format message history to OpenAI format.
Anthropic format (what BaseAgent stores):
assistant: {"role": "assistant", "content": [TextBlock|dict, ToolUseBlock|dict, ...]}
tool results: {"role": "user", "content": [{"type": "tool_result", "tool_use_id": "...", "content": "..."}]}
OpenAI format (what Ollama/Azure OpenAI expect):
assistant: {"role": "assistant", "content": "text", "tool_calls": [...]}
tool results: {"role": "tool", "tool_call_id": "...", "content": "..."}
"""
result: List[Dict[str, Any]] = []
for msg in messages:
role = msg.get("role", "user")
content = msg.get("content", "")
# Simple string content — compatible as-is
if isinstance(content, str):
result.append({"role": role, "content": content})
continue
# List of content blocks
if isinstance(conRepobility analyzer · published findings · https://repobility.com
_to_openai_tools function · python · L155-L172 (18 LOC)fabric_agent/api/llm_providers.py
def _to_openai_tools(tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Convert Anthropic tool schemas to OpenAI function calling format.

    Anthropic: {"name": "...", "description": "...", "input_schema": {...}}
    OpenAI:    {"type": "function", "function": {"name": "...", "description": "...", "parameters": {...}}}

    A missing ``description`` becomes ``""`` and a missing ``input_schema``
    becomes an empty object schema; ``name`` is required.

    Raises:
        KeyError: If a tool has no ``"name"`` key.
    """
    return [
        {
            "type": "function",
            "function": {
                "name": tool["name"],
                "description": tool.get("description", ""),
                "parameters": tool.get("input_schema", {"type": "object", "properties": {}}),
            },
        }
        for tool in tools
    ]
# _from_openai_response function · python · L175-L233 (59 LOC)fabric_agent/api/llm_providers.py
def _from_openai_response(response: Dict[str, Any]) -> Dict[str, Any]:
"""
Convert an OpenAI-format chat completion response to Anthropic format.
OpenAI response:
choices[0].message.content: "text" | None
choices[0].message.tool_calls: [{"id": "...", "function": {"name": "...", "arguments": "{...}"}}]
choices[0].finish_reason: "stop" | "tool_calls" | "length"
usage.prompt_tokens / completion_tokens
Anthropic format (what BaseAgent expects):
content: [{"type": "text", "text": "..."}, {"type": "tool_use", "id": "...", "name": "...", "input": {...}}]
stop_reason: "end_turn" | "tool_use" | "max_tokens"
usage: {"input_tokens": N, "output_tokens": N}
"""
choices = response.get("choices", [{}])
choice = choices[0] if choices else {}
message = choice.get("message", {})
finish_reason = choice.get("finish_reason", "stop")
usage = response.get("usage", {})
content: List[Dict[str, Any]] = []
# Text conten_extract_text function · python · L236-L242 (7 LOC)fabric_agent/api/llm_providers.py
def _extract_text(block: Any) -> str:
    """Extract text string from a content block (dict or Anthropic object).

    Args:
        block: Either a dict with ``"type"``/``"text"`` keys or an object with
            ``type``/``text`` attributes.

    Returns:
        The block's text when its type is ``"text"`` (``""`` if the text is
        missing); ``""`` for every other kind of block.
    """
    if isinstance(block, dict):
        if block.get("type") != "text":
            return ""
        return block.get("text", "")
    if getattr(block, "type", None) == "text":
        return getattr(block, "text", "")
    return ""