From 3a61f133eb656414a468de02b50620d41c3dd0d4 Mon Sep 17 00:00:00 2001 From: mavrick-1 Date: Thu, 13 Nov 2025 10:52:19 +0530 Subject: [PATCH] Refactor and enhance type hints across multiple functions - Added type hints to constructors and methods in various classes for better type checking and code clarity. - Improved function signatures in `base_functions.py`, `binding_policy_management.py`, `deploy_to.py`, `get_cluster_labels.py`, `kubeconfig.py`, and others. - Cleaned up code formatting by removing unnecessary blank lines and ensuring consistent indentation. - Updated error handling and response structures in functions like `kubestellar_management.py`, `namespace_utils.py`, and `multicluster_logs.py`. - Enhanced readability by breaking long lines and restructuring complex conditions. - Added missing type hints for dictionaries and lists in several functions to improve code maintainability. --- src/agent.py | 99 ++++++++++++------- src/llm_providers/base.py | 10 +- src/llm_providers/config.py | 5 +- src/llm_providers/gemini.py | 76 +++++++++----- src/shared/base_functions.py | 10 +- src/shared/functions/__init__.py | 2 +- .../functions/binding_policy_management.py | 97 ++++++++++++------ .../functions/check_cluster_upgrades.py | 4 +- src/shared/functions/deploy_to.py | 20 ++-- src/shared/functions/describe_resource.py | 4 +- src/shared/functions/get_cluster_labels.py | 81 ++++++++------- src/shared/functions/gvrc_discovery.py | 8 +- src/shared/functions/helm/list.py | 20 +++- src/shared/functions/helm_deploy.py | 49 +++++---- src/shared/functions/kubeconfig.py | 12 +-- .../functions/kubestellar_management.py | 53 +++++----- src/shared/functions/multicluster_create.py | 4 +- src/shared/functions/multicluster_logs.py | 32 +++--- src/shared/functions/namespace_utils.py | 33 +++++-- tests/test_kubestellar_management.py | 7 +- tests/test_namespace_utils.py | 12 ++- 21 files changed, 407 insertions(+), 231 deletions(-) diff --git a/src/agent.py b/src/agent.py index 43cbf05..151a94e 100644 --- a/src/agent.py +++ b/src/agent.py @@ -25,7 +25,8 @@ from src.shared.base_functions import function_registry from src.shared.functions import initialize_functions -T = TypeVar('T') +T = TypeVar("T") + class AgentChat: """Interactive agent chat interface.""" @@ -40,6 +41,7 @@ def __init__(self, provider_name: Optional[str] = None): if sys.stdin and sys.stdin.isatty(): import termios import tty + self._old_tty_settings = termios.tcgetattr(sys.stdin) tty.setcbreak(sys.stdin.fileno()) @@ -74,17 +76,17 @@ async def _wait_for_escape(self) -> None: other coroutines are running. """ if not sys.stdin.isatty(): - await asyncio.Future() # block indefinitely if not a TTY + await asyncio.Future() # block indefinitely if not a TTY return loop = asyncio.get_running_loop() fut: asyncio.Future[None] = loop.create_future() - def _on_key_press() -> None: # called by add_reader + def _on_key_press() -> None: # called by add_reader # Non-blocking read try: - ch = sys.stdin.read(1) # read one raw byte - if ch == "\x1b": # ESC + ch = sys.stdin.read(1) # read one raw byte + if ch == "\x1b": # ESC if not fut.done(): fut.set_result(None) except OSError: @@ -93,7 +95,7 @@ def _on_key_press() -> None: # called by add_reader loop.add_reader(sys.stdin.fileno(), _on_key_press) try: - await fut # wait until ESC pressed + await fut # wait until ESC pressed finally: loop.remove_reader(sys.stdin.fileno()) @@ -104,20 +106,19 @@ async def _run_with_cancel(self, coro: Awaitable[T]) -> Optional[T]: Otherwise return the coroutine’s result. """ task = asyncio.create_task(coro) - esc = asyncio.create_task(self._wait_for_escape()) + esc = asyncio.create_task(self._wait_for_escape()) - done, _ = await asyncio.wait({task, esc}, - return_when=asyncio.FIRST_COMPLETED) + done, _ = await asyncio.wait({task, esc}, return_when=asyncio.FIRST_COMPLETED) - if esc in done: # user hit ESC + if esc in done: # user hit ESC task.cancel() self.console.print("[yellow]⏹ Operation cancelled (ESC)[/yellow]") try: - await task # swallow CancelledError + await task # swallow CancelledError except asyncio.CancelledError: pass return None - else: # task finished normally + else: # task finished normally esc.cancel() return await task @@ -167,11 +168,13 @@ def _format_prompt(self) -> List[tuple]: ] ) - async def _execute_function(self, function_name: str, args: Dict[str, Any]) -> tuple[str, float]: + async def _execute_function( + self, function_name: str, args: Dict[str, Any] + ) -> tuple[str, float]: """Execute a KubeStellar function.""" function = function_registry.get(function_name) if not function: - return f"Error: Unknown function '{function_name}'",0.0 + return f"Error: Unknown function '{function_name}'", 0.0 try: start = time.perf_counter() @@ -181,7 +184,6 @@ async def _execute_function(self, function_name: str, args: Dict[str, Any]) -> t except Exception as e: return f"Error executing {function_name}: {str(e)}", 0.0 - def _prepare_tools(self) -> List[Dict[str, Any]]: """Prepare available tools for the LLM.""" tools = [] @@ -274,8 +276,8 @@ async def _handle_message(self, user_input: str): response = await self._run_with_cancel( self.provider.generate( messages=conversation, - tools=tools, - stream=False, + tools=tools, + stream=False, ) ) if response is None: @@ -298,12 +300,14 @@ async def _handle_message(self, user_input: str): with self.console.status( f"[dim]⚙️ Executing: {tool_call.name}[/dim]", spinner="dots" ): - result,elapsed = await self._run_with_cancel( - self._execute_function(tool_call.name, tool_call.arguments) + result, elapsed = await self._run_with_cancel( + self._execute_function( + tool_call.name, tool_call.arguments + ) ) - if result is None: + if result is None: return - + tool_results.append( {"call_id": tool_call.id, "content": result} ) @@ -546,17 +550,39 @@ async def run(self): """Run the interactive chat loop.""" # ASCII art for KubeStellar with proper formatting self.console.print() - self.console.print("[cyan]╭─────────────────────────────────────────────────────────────────────────────────────────────╮[/cyan]") - self.console.print("[cyan]│[/cyan] [cyan]│[/cyan]") - self.console.print("[cyan]│[/cyan] [bold cyan]██╗ ██╗██╗ ██╗██████╗ ███████╗███████╗████████╗███████╗██╗ ██╗ █████╗ ██████╗[/bold cyan] [cyan]│[/cyan]") - self.console.print("[cyan]│[/cyan] [bold cyan]██║ ██╔╝██║ ██║██╔══██╗██╔════╝██╔════╝╚══██╔══╝██╔════╝██║ ██║ ██╔══██╗██╔══██╗[/bold cyan] [cyan]│[/cyan]") - self.console.print("[cyan]│[/cyan] [bold cyan]█████╔╝ ██║ ██║██████╔╝█████╗ ███████╗ ██║ █████╗ ██║ ██║ ███████║██████╔╝[/bold cyan] [cyan]│[/cyan]") - self.console.print("[cyan]│[/cyan] [bold cyan]██╔═██╗ ██║ ██║██╔══██╗██╔══╝ ╚════██║ ██║ ██╔══╝ ██║ ██║ ██╔══██║██╔══██╗[/bold cyan] [cyan]│[/cyan]") - self.console.print("[cyan]│[/cyan] [bold cyan]██║ ██╗╚██████╔╝██████╔╝███████╗███████║ ██║ ███████╗███████╗███████╗██║ ██║██║ ██║[/bold cyan] [cyan]│[/cyan]") - self.console.print("[cyan]│[/cyan] [bold cyan]╚═╝ ╚═╝ ╚═════╝ ╚═════╝ ╚══════╝╚══════╝ ╚═╝ ╚══════╝╚══════╝╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝[/bold cyan] [cyan]│[/cyan]") - self.console.print("[cyan]│[/cyan] [cyan]│[/cyan]") - self.console.print("[cyan]│[/cyan] [dim]🌟 Multi-Cluster Kubernetes Management Agent 🌟[/dim] [cyan]│[/cyan]") - self.console.print("[cyan]╰─────────────────────────────────────────────────────────────────────────────────────────────╯[/cyan]") + self.console.print( + "[cyan]╭─────────────────────────────────────────────────────────────────────────────────────────────╮[/cyan]" + ) + self.console.print( + "[cyan]│[/cyan] [cyan]│[/cyan]" + ) + self.console.print( + "[cyan]│[/cyan] [bold cyan]██╗ ██╗██╗ ██╗██████╗ ███████╗███████╗████████╗███████╗██╗ ██╗ █████╗ ██████╗[/bold cyan] [cyan]│[/cyan]" + ) + self.console.print( + "[cyan]│[/cyan] [bold cyan]██║ ██╔╝██║ ██║██╔══██╗██╔════╝██╔════╝╚══██╔══╝██╔════╝██║ ██║ ██╔══██╗██╔══██╗[/bold cyan] [cyan]│[/cyan]" + ) + self.console.print( + "[cyan]│[/cyan] [bold cyan]█████╔╝ ██║ ██║██████╔╝█████╗ ███████╗ ██║ █████╗ ██║ ██║ ███████║██████╔╝[/bold cyan] [cyan]│[/cyan]" + ) + self.console.print( + "[cyan]│[/cyan] [bold cyan]██╔═██╗ ██║ ██║██╔══██╗██╔══╝ ╚════██║ ██║ ██╔══╝ ██║ ██║ ██╔══██║██╔══██╗[/bold cyan] [cyan]│[/cyan]" + ) + self.console.print( + "[cyan]│[/cyan] [bold cyan]██║ ██╗╚██████╔╝██████╔╝███████╗███████║ ██║ ███████╗███████╗███████╗██║ ██║██║ ██║[/bold cyan] [cyan]│[/cyan]" + ) + self.console.print( + "[cyan]│[/cyan] [bold cyan]╚═╝ ╚═╝ ╚═════╝ ╚═════╝ ╚══════╝╚══════╝ ╚═╝ ╚══════╝╚══════╝╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝[/bold cyan] [cyan]│[/cyan]" + ) + self.console.print( + "[cyan]│[/cyan] [cyan]│[/cyan]" + ) + self.console.print( + "[cyan]│[/cyan] [dim]🌟 Multi-Cluster Kubernetes Management Agent 🌟[/dim] [cyan]│[/cyan]" + ) + self.console.print( + "[cyan]╰─────────────────────────────────────────────────────────────────────────────────────────────╯[/cyan]" + ) self.console.print() # Welcome message @@ -616,7 +642,10 @@ async def run(self): if sys.stdin and sys.stdin.isatty(): import termios - termios.tcsetattr(sys.stdin.fileno(), termios.TCSADRAIN, self._old_tty_settings) + + termios.tcsetattr( + sys.stdin.fileno(), termios.TCSADRAIN, self._old_tty_settings + ) # Goodbye self.console.print("\n[dim]Goodbye![/dim]") @@ -649,14 +678,14 @@ def _switch_provider(self, provider_name: str): async def _summarize_result(self, function_name: str, result: str) -> str: """Summarize the result of a tool execution using the LLM.""" try: - prompt = f'''Please summarize the following JSON output from the `{function_name}` tool. + prompt = f"""Please summarize the following JSON output from the `{function_name}` tool. Focus on the most important information for the user, such as success or failure, names of created resources, or key data points. Keep the summary concise and easy to read. Tool Output: ```json {result} -```''' +```""" messages = [LLMMessage(role=MessageRole.USER, content=prompt)] # We don't want the summarizer to call tools, so pass an empty list. diff --git a/src/llm_providers/base.py b/src/llm_providers/base.py index fb12d0a..88fa5f9 100644 --- a/src/llm_providers/base.py +++ b/src/llm_providers/base.py @@ -1,7 +1,7 @@ """Base LLM Provider interface.""" from abc import ABC, abstractmethod -from dataclasses import dataclass +from dataclasses import dataclass, field from enum import Enum from typing import Any, AsyncIterator, Dict, List, Optional, Union @@ -64,7 +64,7 @@ class LLMResponse: raw_response: Optional[Dict[str, Any]] = None usage: Optional[Dict[str, int]] = None - def __post_init__(self): + def __post_init__(self) -> None: if self.thinking_blocks is None: self.thinking_blocks = [] if self.tool_calls is None: @@ -80,9 +80,9 @@ class ProviderConfig: temperature: float = 0.7 max_tokens: Optional[int] = None timeout: int = 60 - extra_params: Dict[str, Any] = None + extra_params: Dict[str, Any] = field(default_factory=dict) - def __post_init__(self): + def __post_init__(self) -> None: if self.extra_params is None: self.extra_params = {} @@ -95,7 +95,7 @@ def __init__(self, config: ProviderConfig): self.config = config self._validate_config() - def _validate_config(self): + def _validate_config(self) -> None: """Validate provider configuration.""" if not self.config.api_key: raise ValueError(f"{self.__class__.__name__} requires an API key") diff --git a/src/llm_providers/config.py b/src/llm_providers/config.py index 3076cca..084e1c5 100644 --- a/src/llm_providers/config.py +++ b/src/llm_providers/config.py @@ -124,7 +124,10 @@ def _load_api_keys(self) -> Dict[str, str]: try: with open(self.keys_file, "r") as f: - return json.load(f) + keys = json.load(f) + if not isinstance(keys, dict): + return {} + return {str(k): str(v) for k, v in keys.items()} except Exception: return {} diff --git a/src/llm_providers/gemini.py b/src/llm_providers/gemini.py index eb2290b..91ba3c3 100644 --- a/src/llm_providers/gemini.py +++ b/src/llm_providers/gemini.py @@ -5,6 +5,7 @@ try: import google.generativeai as genai + HAS_GEMINI = True except ImportError: genai = None @@ -51,9 +52,11 @@ def __init__(self, config: ProviderConfig): # Initialize Gemini model self.model = genai.GenerativeModel(config.model) - def _convert_messages(self, messages: List[LLMMessage]) -> tuple[List[Dict[str, Any]], Optional[str]]: + def _convert_messages( + self, messages: List[LLMMessage] + ) -> tuple[List[Dict[str, Any]], Optional[str]]: """Convert messages to Gemini format. - + Returns: Tuple of (gemini_messages, system_instruction) """ @@ -69,18 +72,17 @@ def _convert_messages(self, messages: List[LLMMessage]) -> tuple[List[Dict[str, # Multiple system messages - combine them system_instruction += f"\n\n{msg.content}" elif msg.role == MessageRole.USER: - gemini_messages.append({"role": "user", "parts": [{"text": msg.content}]}) + gemini_messages.append( + {"role": "user", "parts": [{"text": msg.content}]} + ) elif msg.role == MessageRole.ASSISTANT: message = {"role": "model", "parts": [{"text": msg.content}]} if msg.tool_calls: # Convert tool calls to Gemini format for tc in msg.tool_calls: - message["parts"].append({ - "function_call": { - "name": tc.name, - "args": tc.arguments - } - }) + message["parts"].append( + {"function_call": {"name": tc.name, "args": tc.arguments}} + ) gemini_messages.append(message) elif msg.role == MessageRole.TOOL: # Gemini doesn't support tool role, convert to user message with tool result format @@ -90,11 +92,15 @@ def _convert_messages(self, messages: List[LLMMessage]) -> tuple[List[Dict[str, elif msg.role == MessageRole.THINKING: # Gemini does not have native thinking, append as annotation if gemini_messages and gemini_messages[-1]["role"] == "model": - gemini_messages[-1]["parts"].append({"text": f"\n{msg.content}\n"}) + gemini_messages[-1]["parts"].append( + {"text": f"\n{msg.content}\n"} + ) return gemini_messages, system_instruction - def _convert_tools_to_gemini(self, tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + def _convert_tools_to_gemini( + self, tools: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: """Convert tools to Gemini format.""" gemini_tools = [] @@ -104,11 +110,13 @@ def _convert_tools_to_gemini(self, tools: List[Dict[str, Any]]) -> List[Dict[str # Gemini expects tools in a specific format gemini_tool = { - "function_declarations": [{ - "name": tool["name"], - "description": tool.get("description", ""), - "parameters": cleaned_schema, - }] + "function_declarations": [ + { + "name": tool["name"], + "description": tool.get("description", ""), + "parameters": cleaned_schema, + } + ] } gemini_tools.append(gemini_tool) @@ -230,8 +238,7 @@ async def generate( model = self.model if system_instruction: model = genai.GenerativeModel( - self.config.model, - system_instruction=system_instruction + self.config.model, system_instruction=system_instruction ) # Prepare generation config @@ -246,7 +253,9 @@ async def generate( gemini_tools = self._convert_tools_to_gemini(tools) if stream: - return self._stream_response(gemini_messages, generation_config, gemini_tools, model, **kwargs) + return self._stream_response( + gemini_messages, generation_config, gemini_tools, model, **kwargs + ) else: try: # Try with tools first @@ -263,7 +272,10 @@ async def generate( ) except Exception as e: # If tools fail, fall back to no tools - if any(keyword in str(e).lower() for keyword in ["tools", "object", "system", "role", "function"]): + if any( + keyword in str(e).lower() + for keyword in ["tools", "object", "system", "role", "function"] + ): response = await model.generate_content_async( gemini_messages, generation_config=generation_config, @@ -284,14 +296,23 @@ async def generate( thinking_blocks=thinking_blocks, tool_calls=self._parse_tool_calls(response), usage=self._parse_usage(response), - raw_response=response.to_dict() if hasattr(response, "to_dict") else None, + raw_response=( + response.to_dict() if hasattr(response, "to_dict") else None + ), ) - async def _stream_response(self, messages: List[Dict[str, Any]], generation_config: Dict[str, Any], tools: Optional[List[Dict[str, Any]]] = None, model = None, **kwargs) -> AsyncIterator[LLMResponse]: + async def _stream_response( + self, + messages: List[Dict[str, Any]], + generation_config: Dict[str, Any], + tools: Optional[List[Dict[str, Any]]] = None, + model=None, + **kwargs, + ) -> AsyncIterator[LLMResponse]: """Stream response from Gemini.""" if model is None: model = self.model - + try: # Try with tools first if tools: @@ -309,7 +330,10 @@ async def _stream_response(self, messages: List[Dict[str, Any]], generation_conf ) except Exception as e: # If tools fail, fall back to no tools - if any(keyword in str(e).lower() for keyword in ["tools", "object", "system", "role", "function"]): + if any( + keyword in str(e).lower() + for keyword in ["tools", "object", "system", "role", "function"] + ): stream = await model.generate_content_async( messages, stream=True, @@ -324,7 +348,9 @@ async def _stream_response(self, messages: List[Dict[str, Any]], generation_conf delta = chunk.candidates[0].content.parts[0].text accumulated_content += delta - content, thinking_blocks = self.parse_thinking_blocks(accumulated_content) + content, thinking_blocks = self.parse_thinking_blocks( + accumulated_content + ) yield LLMResponse( content=content, diff --git a/src/shared/base_functions.py b/src/shared/base_functions.py index bc0b861..240936d 100644 --- a/src/shared/base_functions.py +++ b/src/shared/base_functions.py @@ -27,7 +27,7 @@ def get_schema(self) -> Dict[str, Any]: class CreatePlanFunction(BaseFunction): """A special function to create a plan of execution.""" - def __init__(self): + def __init__(self) -> None: super().__init__( "create_plan", "Creates a plan of steps to execute to fulfill the user's request. Use this when multiple steps are required.", @@ -61,7 +61,7 @@ def get_schema(self) -> Dict[str, Any]: class FunctionRegistry: """Registry to manage all available functions.""" - def __init__(self): + def __init__(self) -> None: self._functions: Dict[str, BaseFunction] = {} def register(self, function: BaseFunction) -> None: @@ -81,11 +81,11 @@ def get_schemas(self) -> Dict[str, Dict[str, Any]]: return {name: func.get_schema() for name, func in self._functions.items()} -def async_to_sync(func: Callable) -> Callable: +def async_to_sync(func: Callable[..., Any]) -> Callable[..., Any]: """Convert async function to sync for CLI usage.""" @functools.wraps(func) - def wrapper(*args, **kwargs): + def wrapper(*args, **kwargs) -> Any: try: loop = asyncio.get_running_loop() except RuntimeError: @@ -100,4 +100,4 @@ def wrapper(*args, **kwargs): function_registry = FunctionRegistry() # Register the create_plan function -function_registry.register(CreatePlanFunction()) \ No newline at end of file +function_registry.register(CreatePlanFunction()) diff --git a/src/shared/functions/__init__.py b/src/shared/functions/__init__.py index 51842cb..97940e4 100644 --- a/src/shared/functions/__init__.py +++ b/src/shared/functions/__init__.py @@ -42,7 +42,7 @@ def initialize_functions(): # Register Helm deployment function function_registry.register(HelmDeployFunction()) - + # Register cluster labels helper function function_registry.register(GetClusterLabelsFunction()) diff --git a/src/shared/functions/binding_policy_management.py b/src/shared/functions/binding_policy_management.py index d99b95b..ccd3fc7 100644 --- a/src/shared/functions/binding_policy_management.py +++ b/src/shared/functions/binding_policy_management.py @@ -39,12 +39,12 @@ def __init__(self) -> None: super().__init__( name="binding_policy_management", description="Fast operations on KubeStellar BindingPolicy objects " - "(list, create, delete, quick-create) against a single WDS." + "(list, create, delete, quick-create) against a single WDS.", ) async def execute( self, - operation: str = "list", # list | create | delete | quick_create + operation: str = "list", # list | create | delete | quick_create wds_context: str = "wds1", kubeconfig: str = "", # create / delete @@ -53,7 +53,7 @@ async def execute( policy_name: str = "", # quick-create selector_labels: Dict[str, str] | None = None, - resources: List[str] | None = None, # "apps/deployments", "core/namespaces" + resources: List[str] | None = None, # "apps/deployments", "core/namespaces" namespaces: List[str] | None = None, specific_workloads: List[Dict[str, str]] | None = None, **_: Any, @@ -75,8 +75,10 @@ async def execute( namespaces = json.loads(namespaces) except json.JSONDecodeError: namespaces = [s.strip() for s in namespaces.split(",") if s.strip()] + def _as_dict(obj): return dict(obj) if not isinstance(obj, dict) else obj + def _as_list(obj): return list(obj) if not isinstance(obj, list) else obj @@ -87,8 +89,12 @@ def _as_list(obj): if namespaces not in (None, "", []): namespaces = _as_list(namespaces) - if operation == "create" and not (policy_yaml or policy_json) \ - and selector_labels and resources: + if ( + operation == "create" + and not (policy_yaml or policy_json) + and selector_labels + and resources + ): operation = "quick_create" if operation == "list": @@ -96,10 +102,13 @@ def _as_list(obj): if operation == "create": if not (policy_yaml or policy_json): - return {"status": "error", "error": "policy_yaml or policy_json required"} + return { + "status": "error", + "error": "policy_yaml or policy_json required", + } manifest = policy_yaml or yaml.safe_dump(policy_json, sort_keys=False) return await self._kubectl_apply(manifest, wds_context, kubeconfig) - + if operation == "delete": if not policy_name: return {"status": "error", "error": "policy_name required"} @@ -120,8 +129,15 @@ def _as_list(obj): return {"status": "error", "error": f"unknown operation {operation}"} async def _op_list(self, ctx: str, kubeconfig: str) -> Dict[str, Any]: - cmd = ["kubectl", "--context", ctx, "get", - "bindingpolicies.control.kubestellar.io", "-o", "json"] + cmd = [ + "kubectl", + "--context", + ctx, + "get", + "bindingpolicies.control.kubestellar.io", + "-o", + "json", + ] if kubeconfig: cmd += ["--kubeconfig", kubeconfig] ret = await self._run(cmd) @@ -175,40 +191,60 @@ def _build_quick_manifest( if specific_wl: wl = specific_wl[0] - bp["metadata"].setdefault("annotations", {})["specificWorkloads"] = ",".join( - [wl.get("apiVersion", ""), wl.get("kind", ""), - wl.get("name", ""), wl.get("namespace", "")] + bp["metadata"].setdefault("annotations", {})["specificWorkloads"] = ( + ",".join( + [ + wl.get("apiVersion", ""), + wl.get("kind", ""), + wl.get("name", ""), + wl.get("namespace", ""), + ] + ) ) return yaml.safe_dump(bp, sort_keys=False), None # ───────────────────────── kubectl helpers ───────────────────────── - async def _kubectl_apply(self, yaml_body: str, ctx: str, kubeconfig: str) -> Dict[str, Any]: + async def _kubectl_apply( + self, yaml_body: str, ctx: str, kubeconfig: str + ) -> Dict[str, Any]: cmd = ["kubectl", "--context", ctx, "apply", "-f", "-"] if kubeconfig: cmd += ["--kubeconfig", kubeconfig] ret = await self._run(cmd, stdin_data=yaml_body.encode()) if ret["returncode"] != 0: - # include both stdout & stderr in the response so the LLM can show it - return { - "status": "error", - "operation": "apply", - "stderr": ret["stderr"], - "stdout": ret["stdout"], - "cmd": " ".join(cmd[:4]) + " -f -", # truncated for readability - } + # include both stdout & stderr in the response so the LLM can show it + return { + "status": "error", + "operation": "apply", + "stderr": ret["stderr"], + "stdout": ret["stdout"], + "cmd": " ".join(cmd[:4]) + " -f -", # truncated for readability + } status = "success" if ret["returncode"] == 0 else "error" - return {"status": status, "operation": "apply", "output": ret["stdout"] or ret["stderr"]} + return { + "status": status, + "operation": "apply", + "output": ret["stdout"] or ret["stderr"], + } - async def _kubectl_delete(self, name: str, ctx: str, kubeconfig: str) -> Dict[str, Any]: + async def _kubectl_delete( + self, name: str, ctx: str, kubeconfig: str + ) -> Dict[str, Any]: cmd = ["kubectl", "--context", ctx, "delete", "bindingpolicy", name] if kubeconfig: cmd += ["--kubeconfig", kubeconfig] ret = await self._run(cmd) status = "success" if ret["returncode"] == 0 else "error" - return {"status": status, "operation": "delete", "output": ret["stdout"] or ret["stderr"]} + return { + "status": status, + "operation": "delete", + "output": ret["stdout"] or ret["stderr"], + } - async def _run(self, cmd: List[str], stdin_data: bytes | None = None) -> Dict[str, str]: + async def _run( + self, cmd: List[str], stdin_data: bytes | None = None + ) -> Dict[str, str]: proc = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.PIPE if stdin_data else None, @@ -226,9 +262,12 @@ async def _run(self, cmd: List[str], stdin_data: bytes | None = None) -> Dict[st def _make_result(self, item: Dict[str, Any]) -> BPResult: meta = item["metadata"] spec = item.get("spec", {}) - status = "active" if meta.get("generation") == item.get("status", {}).get( - "observedGeneration" - ) else "inactive" + status = ( + "active" + if meta.get("generation") + == item.get("status", {}).get("observedGeneration") + else "inactive" + ) clusters: List[str] = [] for sel in spec.get("clusterSelectors", []): @@ -280,4 +319,4 @@ def get_schema(self) -> Dict[str, Any]: "specific_workloads": {"type": "array", "items": {"type": "object"}}, }, "required": [], - } \ No newline at end of file + } diff --git a/src/shared/functions/check_cluster_upgrades.py b/src/shared/functions/check_cluster_upgrades.py index 1faa80d..7e8f955 100644 --- a/src/shared/functions/check_cluster_upgrades.py +++ b/src/shared/functions/check_cluster_upgrades.py @@ -68,7 +68,9 @@ async def execute(self, kubeconfig: str = "") -> Dict[str, Any]: upgrade_statuses = await asyncio.gather( *[ - self._get_cluster_upgrade_status(cluster, latest_version, kubeconfig) + self._get_cluster_upgrade_status( + cluster, latest_version, kubeconfig + ) for cluster in clusters ] ) diff --git a/src/shared/functions/deploy_to.py b/src/shared/functions/deploy_to.py index 9b1bf11..e678eb5 100644 --- a/src/shared/functions/deploy_to.py +++ b/src/shared/functions/deploy_to.py @@ -73,12 +73,16 @@ async def execute(self, **kwargs: Any) -> Dict[str, Any]: remote_context = params.remote_context dry_run = params.dry_run list_clusters = params.list_clusters - + try: # Handle list clusters request if list_clusters: - list_resp = await self._list_available_clusters(kubeconfig, remote_context) - return asdict(DeployToOutput(status=list_resp["status"], details=list_resp)) + list_resp = await self._list_available_clusters( + kubeconfig, remote_context + ) + return asdict( + DeployToOutput(status=list_resp["status"], details=list_resp) + ) # Validate inputs if not target_clusters and not cluster_labels: @@ -111,7 +115,8 @@ async def execute(self, **kwargs: Any) -> Dict[str, Any]: "status": "error", "error": "No clusters match the selection criteria", "available_clusters": [ - {"name": c["name"], "context": c["context"]} for c in all_clusters + {"name": c["name"], "context": c["context"]} + for c in all_clusters ], } return asdict(DeployToOutput(status="error", details=err)) @@ -176,7 +181,10 @@ async def execute(self, **kwargs: Any) -> Dict[str, Any]: return asdict(DeployToOutput(status=status, details=final_resp)) except Exception as e: - err = {"status": "error", "error": f"Failed to deploy to clusters: {str(e)}"} + err = { + "status": "error", + "error": f"Failed to deploy to clusters: {str(e)}", + } return asdict(DeployToOutput(status="error", details=err)) async def _list_available_clusters( @@ -616,4 +624,4 @@ def get_schema(self) -> Dict[str, Any]: ] }, ], - } \ No newline at end of file + } diff --git a/src/shared/functions/describe_resource.py b/src/shared/functions/describe_resource.py index fb5d678..f8a8988 100644 --- a/src/shared/functions/describe_resource.py +++ b/src/shared/functions/describe_resource.py @@ -66,7 +66,9 @@ async def execute(self, **kwargs: Any) -> Dict[str, Any]: # Return the result if result["returncode"] == 0: response = {"status": "success", "description": result["stdout"]} - return asdict(DescribeResourceOutput(status="success", details=response)) + return asdict( + DescribeResourceOutput(status="success", details=response) + ) else: err = {"status": "error", "error": result["stderr"] or result["stdout"]} return asdict(DescribeResourceOutput(status="error", details=err)) diff --git a/src/shared/functions/get_cluster_labels.py b/src/shared/functions/get_cluster_labels.py index 4a541ff..5a3a5bd 100644 --- a/src/shared/functions/get_cluster_labels.py +++ b/src/shared/functions/get_cluster_labels.py @@ -29,35 +29,35 @@ async def execute( ) -> Dict[str, Any]: """ Get ManagedCluster labels for KubeStellar deployments. - + Args: context: Kubernetes context to use (default: current context) kubeconfig: Path to kubeconfig file output_format: Output format (table, json, yaml) - + Returns: Dictionary with cluster information and labels """ try: # Build kubectl command cmd = ["kubectl", "get", "managedclusters", "-A"] - + if output_format == "json": cmd.extend(["-o", "json"]) elif output_format == "yaml": cmd.extend(["-o", "yaml"]) else: cmd.append("--show-labels") - + if context: cmd.extend(["--context", context]) - + if kubeconfig: cmd.extend(["--kubeconfig", kubeconfig]) - + # Execute command result = await self._run_command(cmd) - + if result["returncode"] != 0: # Try without -A flag (some setups don't support it) cmd = ["kubectl", "get", "managedclusters"] @@ -71,30 +71,34 @@ async def execute( cmd.extend(["--context", context]) if kubeconfig: cmd.extend(["--kubeconfig", kubeconfig]) - + result = await self._run_command(cmd) - + if result["returncode"] != 0: return { "status": "error", "error": result["stderr"] or "Failed to get ManagedClusters", - "suggestion": "Make sure you're connected to a hub/ITS cluster with OCM installed" + "suggestion": "Make sure you're connected to a hub/ITS cluster with OCM installed", } - + # Parse results based on format if output_format == "json": data = json.loads(result["stdout"]) clusters = [] - + for item in data.get("items", []): cluster_info = { "name": item["metadata"]["name"], "labels": item["metadata"].get("labels", {}), - "status": item["status"].get("conditions", [{}])[0].get("type", "Unknown"), - "accepted": item["spec"].get("hubAcceptedManagedCluster", False) + "status": item["status"] + .get("conditions", [{}])[0] + .get("type", "Unknown"), + "accepted": item["spec"].get( + "hubAcceptedManagedCluster", False + ), } clusters.append(cluster_info) - + # Extract unique labels all_labels = {} for cluster in clusters: @@ -102,41 +106,42 @@ async def execute( if key not in all_labels: all_labels[key] = set() all_labels[key].add(value) - + # Convert sets to lists for JSON serialization unique_labels = {k: list(v) for k, v in all_labels.items()} - + return { "status": "success", "clusters": clusters, "unique_labels": unique_labels, "total_clusters": len(clusters), - "example_selectors": self._generate_example_selectors(unique_labels) + "example_selectors": self._generate_example_selectors( + unique_labels + ), } else: # Return raw output for table/yaml format return { "status": "success", "output": result["stdout"], - "format": output_format + "format": output_format, } - + except Exception as e: - return { - "status": "error", - "error": str(e) - } - - def _generate_example_selectors(self, labels: Dict[str, List[str]]) -> List[Dict[str, str]]: + return {"status": "error", "error": str(e)} + + def _generate_example_selectors( + self, labels: Dict[str, List[str]] + ) -> List[Dict[str, str]]: """Generate example cluster selector labels for BindingPolicy.""" examples = [] - + # Single label examples for key, values in labels.items(): if key != "name": # Skip name label as it's too specific for value in values[:2]: # Limit to 2 examples per key examples.append({key: value}) - + # Multi-label example if we have multiple keys if len(labels) > 1: multi_example = {} @@ -147,9 +152,9 @@ def _generate_example_selectors(self, labels: Dict[str, List[str]]) -> List[Dict multi_example[key] = values[0] if multi_example: examples.append(multi_example) - + return examples[:5] # Return max 5 examples - + async def _run_command(self, cmd: List[str]) -> Dict[str, Any]: """Execute command and return results.""" try: @@ -157,7 +162,7 @@ async def _run_command(self, cmd: List[str]) -> Dict[str, Any]: *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() - + return { "returncode": process.returncode, "stdout": stdout.decode("utf-8"), @@ -177,18 +182,18 @@ def get_schema(self) -> Dict[str, Any]: "properties": { "context": { "type": "string", - "description": "Kubernetes context to use" + "description": "Kubernetes context to use", }, "kubeconfig": { - "type": "string", - "description": "Path to kubeconfig file" + "type": "string", + "description": "Path to kubeconfig file", }, "output_format": { "type": "string", "enum": ["table", "json", "yaml"], "description": "Output format", - "default": "table" - } + "default": "table", + }, }, - "required": [] - } \ No newline at end of file + "required": [], + } diff --git a/src/shared/functions/gvrc_discovery.py b/src/shared/functions/gvrc_discovery.py index a1112a8..23d63b5 100644 --- a/src/shared/functions/gvrc_discovery.py +++ b/src/shared/functions/gvrc_discovery.py @@ -55,7 +55,7 @@ class GVRCDiscoveryOutput: class GVRCDiscoveryFunction(BaseFunction): """Function to discover Group, Version, Resource, Category information across clusters.""" - def __init__(self): + def __init__(self) -> None: super().__init__( name="gvrc_discovery", description="Discover and inventory all available Kubernetes API resources (pods, services, CRDs, etc.) across clusters. Shows Group/Version/Resource/Category (GVRC) information, API versions, and resource capabilities. Use this to understand what resources are available in your clusters, find custom resources, or check API compatibility across your fleet.", @@ -359,8 +359,8 @@ async def _get_namespace_details( # Parse the output (labels;annotations) parts = result["stdout"].split(";") - labels = {} - annotations = {} + labels: Dict[str, Any] = {} + annotations: Dict[str, Any] = {} if len(parts) > 0 and parts[0].strip(): # Parse labels (basic parsing) @@ -557,4 +557,4 @@ def get_schema(self) -> Dict[str, Any]: }, }, "required": [], - } \ No newline at end of file + } diff --git a/src/shared/functions/helm/list.py b/src/shared/functions/helm/list.py index 0274511..817813d 100644 --- a/src/shared/functions/helm/list.py +++ b/src/shared/functions/helm/list.py @@ -82,9 +82,21 @@ def get_schema(self) -> Dict[str, Any]: return { "type": "object", "properties": { - "namespace": {"type": "string", "description": "The namespace to list releases in."}, - "all_namespaces": {"type": "boolean", "description": "List releases in all namespaces."}, - "kubeconfig": {"type": "string", "description": "Path to the kubeconfig file."}, - "target_cluster": {"type": "string", "description": "The name of the cluster context."}, + "namespace": { + "type": "string", + "description": "The namespace to list releases in.", + }, + "all_namespaces": { + "type": "boolean", + "description": "List releases in all namespaces.", + }, + "kubeconfig": { + "type": "string", + "description": "Path to the kubeconfig file.", + }, + "target_cluster": { + "type": "string", + "description": "The name of the cluster context.", + }, }, } diff --git a/src/shared/functions/helm_deploy.py b/src/shared/functions/helm_deploy.py index 3894b66..e87bb97 100644 --- a/src/shared/functions/helm_deploy.py +++ b/src/shared/functions/helm_deploy.py @@ -17,7 +17,7 @@ 2. Create BindingPolicy in WDS cluster (e.g., wds1) with cluster selectors 3. KubeStellar automatically propagates workloads to WECs matching the selectors -Example for KubeStellar deployment: +Example for KubeStellar deployment: helm_deploy( chart_name="nginx", repository_url="https://charts.bitnami.com/bitnami", @@ -28,7 +28,7 @@ wds_context="wds1", # WDS cluster for policy cluster_selector_labels={"location-group": "edge"} # Selects WECs ) - + To check cluster labels: kubectl get managedclusters -A --show-labels """ @@ -45,7 +45,7 @@ class HelmDeployFunction(BaseFunction): """Multi-cluster Helm deployment with KubeStellar integration. - + Features: - Parallel deployment across multiple clusters - Automatic KubeStellar resource labeling and BindingPolicy creation @@ -97,7 +97,7 @@ async def execute( ) -> Dict[str, Any]: """ Deploy Helm charts across multiple clusters with KubeStellar integration. - + Process: discover clusters → prepare labels → execute Helm operation → create BindingPolicy Supports cluster-specific configurations and automatic KubeStellar resource management. @@ -178,22 +178,30 @@ async def execute( for c in all_clusters ], } - + # Validate KubeStellar deployment pattern if create_binding_policy and wds_context: # Check if deploying to ITS cluster - its_clusters = [c for c in selected_clusters if self._is_its_cluster(c["name"])] - wec_clusters = [c for c in selected_clusters if self._is_wec_cluster(c["name"])] - + its_clusters = [ + c for c in selected_clusters if self._is_its_cluster(c["name"]) + ] + wec_clusters = [ + c for c in selected_clusters if self._is_wec_cluster(c["name"]) + ] + if wec_clusters and not its_clusters: return { "status": "error", "error": "KubeStellar deployments should target ITS cluster (e.g., its1) not WEC clusters directly", "suggestion": "Use target_clusters=['its1'] and cluster_selector_labels to select WECs", - "its_clusters": [c["name"] for c in all_clusters if self._is_its_cluster(c["name"])], - "wec_clusters": [c["name"] for c in wec_clusters] + "its_clusters": [ + c["name"] + for c in all_clusters + if self._is_its_cluster(c["name"]) + ], + "wec_clusters": [c["name"] for c in wec_clusters], } - + if len(selected_clusters) > 1 and operation in ["install", "upgrade"]: # Warn if deploying to multiple clusters with KubeStellar self._log_warning( @@ -553,11 +561,13 @@ async def _deploy_to_cluster( result_status = "success" else: result_status = "error" - + # Create result dict without overwriting status - result_info = {k: v for k, v in release_info.items() if k != "status"} + result_info = { + k: v for k, v in release_info.items() if k != "status" + } result_info["helm_status"] = release_info.get("status", "unknown") - + namespace_results[namespace] = { "status": result_status, "output": result["stdout"], @@ -1209,7 +1219,7 @@ def _is_wds_cluster(self, cluster_name: str) -> bool: or "-wds-" in lower_name or "_wds_" in lower_name ) - + def _is_its_cluster(self, cluster_name: str) -> bool: """Check if cluster is an ITS (Inventory & Template Space) cluster.""" lower_name = cluster_name.lower() @@ -1218,16 +1228,19 @@ def _is_its_cluster(self, cluster_name: str) -> bool: or "-its-" in lower_name or "_its_" in lower_name ) - + def _is_wec_cluster(self, cluster_name: str) -> bool: """Check if cluster is a WEC (Workload Execution Cluster).""" # WEC clusters are typically named cluster1, cluster2, etc. # They are not WDS or ITS clusters - return not (self._is_wds_cluster(cluster_name) or self._is_its_cluster(cluster_name)) - + return not ( + self._is_wds_cluster(cluster_name) or self._is_its_cluster(cluster_name) + ) + def _log_warning(self, message: str) -> None: """Log a warning message.""" import sys + print(f"WARNING: {message}", file=sys.stderr) async def _resolve_target_namespaces( diff --git a/src/shared/functions/kubeconfig.py b/src/shared/functions/kubeconfig.py index 5f2d8f1..04e5590 100644 --- a/src/shared/functions/kubeconfig.py +++ b/src/shared/functions/kubeconfig.py @@ -34,7 +34,7 @@ class KubeconfigOutput: class KubeconfigFunction(BaseFunction): """Function to get details from kubeconfig file.""" - def __init__(self): + def __init__(self) -> None: super().__init__( name="get_kubeconfig", description="Get comprehensive details from kubeconfig file including available contexts, clusters, and users. Use this to understand your Kubernetes setup and available clusters for multi-cluster operations.", @@ -121,7 +121,7 @@ async def execute(self, **kwargs: Any) -> Dict[str, Any]: out = KubeconfigOutput(status="error", details=err) return {"status": out.status, **out.details} - def _get_context_details(self, kubeconfig: Dict, context: Dict) -> Dict[str, Any]: + def _get_context_details(self, kubeconfig: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: """Get details for a specific context.""" context_info = context.get("context", {}) return { @@ -131,7 +131,7 @@ def _get_context_details(self, kubeconfig: Dict, context: Dict) -> Dict[str, Any "namespace": context_info.get("namespace", "default"), } - def _get_clusters(self, kubeconfig: Dict) -> List[Dict[str, Any]]: + def _get_clusters(self, kubeconfig: Dict[str, Any]) -> List[Dict[str, Any]]: """Get cluster information.""" clusters = [] for cluster in kubeconfig.get("clusters", []): @@ -147,12 +147,12 @@ def _get_clusters(self, kubeconfig: Dict) -> List[Dict[str, Any]]: ) return clusters - def _get_users(self, kubeconfig: Dict) -> List[Dict[str, Any]]: + def _get_users(self, kubeconfig: Dict[str, Any]) -> List[Dict[str, Any]]: """Get user information (sanitized).""" users = [] for user in kubeconfig.get("users", []): user_data = user.get("user", {}) - user_info = {"name": user["name"], "auth_type": []} + user_info: Dict[str, Any] = {"name": user["name"], "auth_type": []} # Determine auth type without exposing sensitive data if ( @@ -190,4 +190,4 @@ def get_schema(self) -> Dict[str, Any]: }, }, "required": [], - } \ No newline at end of file + } diff --git a/src/shared/functions/kubestellar_management.py b/src/shared/functions/kubestellar_management.py index ddff804..e65ae89 100644 --- a/src/shared/functions/kubestellar_management.py +++ b/src/shared/functions/kubestellar_management.py @@ -39,9 +39,9 @@ class BindingPolicy: cluster_selectors: List[Dict[str, Any]] workload_transformations: Optional[Dict[str, Any]] = None singleton_status_return: bool = False - status: Dict[str, Any] = None + status: Dict[str, Any] = field(default_factory=dict) created: str = "" - binding_objects: List[str] = None + binding_objects: List[str] = field(default_factory=list) @dataclass @@ -105,7 +105,7 @@ class KubeStellarManagementOutput: class KubeStellarManagementFunction(BaseFunction): """Enhanced KubeStellar multi-cluster management with deep search and binding policy integration.""" - def __init__(self): + def __init__(self) -> None: super().__init__( name="kubestellar_management", description="Advanced KubeStellar multi-cluster resource management with deep search capabilities, binding policy integration, work status tracking, and comprehensive cluster topology analysis. Provides detailed insights into resource distribution, policy compliance, and cross-cluster relationships.", @@ -155,7 +155,6 @@ async def execute(self, **kwargs: Any) -> Dict[str, Any]: kubeconfig = p.kubeconfig output_format = p.output_format - # Discover KubeStellar cluster topology clusters = await self._discover_kubestellar_topology( kubeconfig, include_wds @@ -185,14 +184,18 @@ async def execute(self, **kwargs: Any) -> Dict[str, Any]: output_format, ) return asdict( - KubeStellarManagementOutput(status=result.get("status", "success"), details=result) + KubeStellarManagementOutput( + status=result.get("status", "success"), details=result + ) ) elif operation == "policy_analysis": result = await self._analyze_binding_policies( clusters, kubeconfig, output_format ) return asdict( - KubeStellarManagementOutput(status=result.get("status", "success"), details=result) + KubeStellarManagementOutput( + status=result.get("status", "success"), details=result + ) ) elif operation == "resource_inventory": result = await self._create_resource_inventory( @@ -204,14 +207,18 @@ async def execute(self, **kwargs: Any) -> Dict[str, Any]: output_format, ) return asdict( - KubeStellarManagementOutput(status=result.get("status", "success"), details=result) + KubeStellarManagementOutput( + status=result.get("status", "success"), details=result + ) ) elif operation == "topology_map": result = await self._create_topology_map( clusters, kubeconfig, output_format ) return asdict( - KubeStellarManagementOutput(status=result.get("status", "success"), details=result) + KubeStellarManagementOutput( + status=result.get("status", "success"), details=result + ) ) else: err = {"error": f"Unsupported operation: {operation}"} @@ -272,7 +279,7 @@ async def _classify_kubestellar_space( ) -> Dict[str, Any]: """Classify KubeStellar 2024 space type (WDS, ITS, WEC) and gather metadata.""" try: - space_info = { + space_info: Dict[str, Any] = { "name": context, "type": "unknown", "cluster": context, @@ -446,7 +453,7 @@ async def _get_kubestellar_info( ) -> Dict[str, Any]: """Get KubeStellar-specific information for a cluster.""" try: - info = { + info: Dict[str, Any] = { "version": None, "components": [], "api_resources": [], @@ -529,7 +536,7 @@ async def _perform_deep_search( ) -> Dict[str, Any]: """Perform deep search across KubeStellar clusters.""" try: - results = { + results: Dict[str, Any] = { "status": "success", "operation": "deep_search", "clusters_analyzed": len(clusters), @@ -696,7 +703,7 @@ async def _get_target_namespaces( if result["returncode"] != 0: return ["default"] - namespaces = [] + namespaces: List[str] = [] for line in result["stdout"].strip().split("\n"): if line.startswith("namespace/"): ns_name = line.replace("namespace/", "") @@ -813,7 +820,7 @@ def _aggregate_resource_summary( self, cluster_results: Dict[str, Any] ) -> Dict[str, Any]: """Aggregate resource summary across all clusters.""" - summary = { + summary: Dict[str, Any] = { "total_clusters": len(cluster_results), "total_resources": 0, "resources_by_type": {}, @@ -857,7 +864,7 @@ async def _aggregate_binding_policies( ) -> Dict[str, Any]: """Aggregate binding policy information across clusters.""" try: - policies = { + policies: Dict[str, Any] = { "total_policies": 0, "policies_by_cluster": {}, "policy_details": [], @@ -879,7 +886,7 @@ async def _get_binding_policies( ) -> List[Dict[str, Any]]: """Get binding policies from a cluster.""" try: - policies = [] + policies: List[Dict[str, Any]] = [] # Try to get binding policies (KubeStellar specific) cmd = [ @@ -919,7 +926,7 @@ async def _aggregate_work_statuses( ) -> Dict[str, Any]: """Aggregate work status information across clusters.""" try: - statuses = { + statuses: Dict[str, Any] = { "total_work_statuses": 0, "statuses_by_cluster": {}, "status_details": [], @@ -941,7 +948,7 @@ async def _get_work_statuses( ) -> List[Dict[str, Any]]: """Get work statuses from a cluster.""" try: - statuses = [] + statuses: List[Dict[str, Any]] = [] cmd = [ "kubectl", @@ -979,7 +986,7 @@ def _analyze_resource_placement( self, cluster_results: Dict[str, Any] ) -> Dict[str, Any]: """Analyze resource placement patterns across clusters.""" - placement_analysis = { + placement_analysis: Dict[str, Any] = { "distribution_patterns": {}, "cross_cluster_resources": {}, "placement_efficiency": {}, @@ -987,7 +994,7 @@ def _analyze_resource_placement( } # Analyze resource distribution patterns - resource_distribution = {} + resource_distribution: Dict[str, Any] = {} for cluster_name, cluster_result in cluster_results.items(): if cluster_result.get("status") != "success": continue @@ -1022,7 +1029,7 @@ def _analyze_resource_placement( def _create_dependency_map(self, cluster_results: Dict[str, Any]) -> Dict[str, Any]: """Create a dependency map of resources across clusters.""" - dependency_map = { + dependency_map: Dict[str, Any] = { "cross_cluster_references": {}, "resource_relationships": {}, "orphaned_resources": [], @@ -1090,7 +1097,7 @@ async def _create_resource_inventory( """Create comprehensive resource inventory.""" try: # Implementation for resource inventory - inventory = { + inventory: Dict[str, Any] = { "status": "success", "operation": "resource_inventory", "clusters": len(clusters), @@ -1124,7 +1131,7 @@ async def _create_topology_map( ) -> Dict[str, Any]: """Create KubeStellar topology map.""" try: - topology = { + topology: Dict[str, Any] = { "status": "success", "operation": "topology_map", "control_planes": [], @@ -1261,4 +1268,4 @@ def get_schema(self) -> Dict[str, Any]: }, }, "required": [], - } \ No newline at end of file + } diff --git a/src/shared/functions/multicluster_create.py b/src/shared/functions/multicluster_create.py index d46b409..66e3060 100644 --- a/src/shared/functions/multicluster_create.py +++ b/src/shared/functions/multicluster_create.py @@ -40,7 +40,7 @@ class MultiClusterCreateOutput: class MultiClusterCreateFunction(BaseFunction): """Function to create resources across multiple Kubernetes clusters.""" - def __init__(self): + def __init__(self) -> None: super().__init__( name="multicluster_create", description="Create and deploy Kubernetes workloads (deployments, services, configmaps) across all clusters simultaneously. Use this for global resource creation that should appear on every cluster in your KubeStellar fleet. For targeted deployment to specific clusters, use deploy_to instead.", @@ -462,4 +462,4 @@ def get_schema(self) -> Dict[str, Any]: {"required": ["filename"]}, {"required": ["resource_type", "resource_name"]}, ], - } \ No newline at end of file + } diff --git a/src/shared/functions/multicluster_logs.py b/src/shared/functions/multicluster_logs.py index 516ac83..2948a5b 100644 --- a/src/shared/functions/multicluster_logs.py +++ b/src/shared/functions/multicluster_logs.py @@ -2,7 +2,7 @@ import asyncio from dataclasses import asdict, dataclass, field -from typing import Any, Dict, List, Optional +from typing import Any, AsyncIterator, Dict, List, Optional from src.shared.base_functions import BaseFunction @@ -44,7 +44,7 @@ class MultiClusterLogsOutput: class MultiClusterLogsFunction(BaseFunction): """Function to aggregate logs from containers across multiple Kubernetes clusters.""" - def __init__(self): + def __init__(self) -> None: super().__init__( name="multicluster_logs", description="Retrieve and aggregate container logs from pods across multiple clusters. Use this to troubleshoot applications, monitor workloads, or gather logs from distributed services. Can target specific pods by name, label selectors, or resource types (deployment/nginx). Essential for multi-cluster debugging and observability.", @@ -94,7 +94,6 @@ async def execute(self, **kwargs: Any) -> Dict[str, Any]: remote_context = params.remote_context max_log_requests = params.max_log_requests - if ( not pod_name and not resource_selector @@ -106,8 +105,6 @@ async def execute(self, **kwargs: Any) -> Dict[str, Any]: } return asdict(MultiClusterLogsOutput(status="error", details=err)) - - # Discover clusters clusters = await self._discover_clusters(kubeconfig, remote_context) if not clusters: @@ -142,7 +139,9 @@ async def execute(self, **kwargs: Any) -> Dict[str, Any]: max_log_requests, ) return asdict( - MultiClusterLogsOutput(status=resp.get("status", "success"), details=resp) + MultiClusterLogsOutput( + status=resp.get("status", "success"), details=resp + ) ) else: resp = await self._get_logs_from_clusters( @@ -162,7 +161,9 @@ async def execute(self, **kwargs: Any) -> Dict[str, Any]: kubeconfig, ) return asdict( - MultiClusterLogsOutput(status=resp.get("status", "success"), details=resp) + MultiClusterLogsOutput( + status=resp.get("status", "success"), details=resp + ) ) except Exception as e: @@ -284,7 +285,7 @@ async def _follow_logs_from_clusters( # Limit concurrent requests to avoid overwhelming the system semaphore = asyncio.Semaphore(max_requests) - async def follow_cluster_logs(cluster): + async def follow_cluster_logs(cluster: Dict[str, Any]) -> Dict[str, Any]: async with semaphore: return await self._follow_logs_from_cluster( cluster, @@ -456,10 +457,11 @@ async def _follow_logs_from_cluster( # Stream output with cluster prefix lines_processed = 0 - async for line in self._stream_output(process.stdout, cluster["name"]): - lines_processed += 1 - # In a real implementation, you'd yield or emit these lines - # For now, we'll just count them + if process.stdout: + async for line in self._stream_output(process.stdout, cluster["name"]): + lines_processed += 1 + # In a real implementation, you'd yield or emit these lines + # For now, we'll just count them await process.wait() @@ -477,7 +479,9 @@ async def _follow_logs_from_cluster( "cluster": cluster["name"], } - async def _stream_output(self, stdout, cluster_name: str): + async def _stream_output( + self, stdout: asyncio.StreamReader, cluster_name: str + ) -> AsyncIterator[str]: """Stream output lines with cluster prefix.""" while True: line = await stdout.readline() @@ -660,4 +664,4 @@ def get_schema(self) -> Dict[str, Any]: {"required": ["resource_selector"]}, {"required": ["label_selector"]}, ], - } \ No newline at end of file + } diff --git a/src/shared/functions/namespace_utils.py b/src/shared/functions/namespace_utils.py index 13c4cf4..89b7f7f 100644 --- a/src/shared/functions/namespace_utils.py +++ b/src/shared/functions/namespace_utils.py @@ -21,8 +21,6 @@ class NamespaceResource: created: str - - @dataclass class NamespaceUtilsInput: """All parameters accepted by namespace_utils.execute in one bundle.""" @@ -51,7 +49,7 @@ class NamespaceUtilsOutput: class NamespaceUtilsFunction(BaseFunction): """Function to manage and discover namespace-related operations across clusters.""" - def __init__(self): + def __init__(self) -> None: super().__init__( name="namespace_utils", description="List and count pods, services, deployments and other resources across namespaces and clusters. Use operation='list' to get pod counts and resource information.", @@ -92,7 +90,6 @@ async def execute(self, **kwargs: Any) -> Dict[str, Any]: remote_context = params.remote_context output_format = params.output_format - # Discover clusters clusters = await self._discover_clusters(kubeconfig, remote_context) if not clusters: @@ -249,7 +246,12 @@ async def _list_namespaces( # Include resources if requested if include_resources: resources = await self._get_namespace_resources( - cluster, ns_name, resource_types, label_selector, resource_name, kubeconfig + cluster, + ns_name, + resource_types, + label_selector, + resource_name, + kubeconfig, ) namespace_info["resources"] = resources @@ -377,7 +379,12 @@ async def _list_namespace_resources( # Get resources from each namespace for namespace in target_namespaces: ns_resources = await self._get_namespace_resources( - cluster, namespace, resource_types, label_selector, resource_name, kubeconfig + cluster, + namespace, + resource_types, + label_selector, + resource_name, + kubeconfig, ) resources.extend(ns_resources) @@ -440,7 +447,7 @@ async def _get_namespace_resources( if label_selector: cmd.extend(["-l", label_selector]) - + if resource_name: cmd.append(resource_name) else: @@ -496,7 +503,10 @@ async def _get_resource_quotas( import json quota_data = json.loads(result["stdout"]) - return quota_data.get("items", []) + items = quota_data.get("items", []) + if isinstance(items, list): + return items + return [] return [] except Exception: @@ -527,7 +537,10 @@ async def _get_limit_ranges( import json limit_data = json.loads(result["stdout"]) - return limit_data.get("items", []) + items = limit_data.get("items", []) + if isinstance(items, list): + return items + return [] return [] except Exception: @@ -660,4 +673,4 @@ def get_schema(self) -> Dict[str, Any]: }, }, "required": [], - } \ No newline at end of file + } diff --git a/tests/test_kubestellar_management.py b/tests/test_kubestellar_management.py index 286a3bd..ccbe971 100644 --- a/tests/test_kubestellar_management.py +++ b/tests/test_kubestellar_management.py @@ -53,7 +53,10 @@ async def test_execute_no_clusters(self, kubestellar_function): result = await kubestellar_function.execute() assert result["status"] == "error" - assert "no kubestellar clusters discovered" in result["details"]["error"].lower() + assert ( + "no kubestellar clusters discovered" + in result["details"]["error"].lower() + ) @pytest.mark.asyncio async def test_topology_map_operation(self, kubestellar_function): @@ -87,7 +90,7 @@ async def test_topology_map_operation(self, kubestellar_function): mock_discover.return_value = mock_clusters result = await kubestellar_function.execute(operation="topology_map") - + assert result["status"] == "success" details = result["details"] assert details["operation"] == "topology_map" diff --git a/tests/test_namespace_utils.py b/tests/test_namespace_utils.py index 85aca0d..0e9392f 100644 --- a/tests/test_namespace_utils.py +++ b/tests/test_namespace_utils.py @@ -345,7 +345,17 @@ async def test_execute_namespace_operation_error( ): """Test error handling in namespace operations.""" result = await namespace_function._execute_namespace_operation( - mock_clusters[0], "invalid", None, False, "", "", None, False, "", "", "table" + mock_clusters[0], + "invalid", + None, + False, + "", + "", + None, + False, + "", + "", + "table", ) assert result["status"] == "error"