-
Notifications
You must be signed in to change notification settings - Fork 49
Fixes bug in sandbox_client that assumed pod name matched the sandbox name #150
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,8 +32,12 @@ | |
| SANDBOX_API_GROUP = "agents.x-k8s.io" | ||
| SANDBOX_API_VERSION = "v1alpha1" | ||
| SANDBOX_PLURAL_NAME = "sandboxes" | ||
| POD_NAME_ANNOTATION = "agents.x-k8s.io/pod-name" | ||
|
|
||
| logging.basicConfig(level=logging.INFO, | ||
| format='%(asctime)s - %(levelname)s - %(message)s', | ||
| stream=sys.stdout) | ||
|
|
||
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', stream=sys.stdout) | ||
|
|
||
| @dataclass | ||
| class ExecutionResult: | ||
|
|
@@ -42,26 +46,31 @@ class ExecutionResult: | |
| stderr: str | ||
| exit_code: int | ||
|
|
||
|
|
||
| class SandboxClient: | ||
| """ | ||
| The main client for creating and interacting with a stateful Sandbox (now named SandboxClient). | ||
| This class is a context manager, designed to be used with a `with` statement. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| template_name: str, | ||
| namespace: str = "default", | ||
| self, | ||
| template_name: str, | ||
| namespace: str = "default", | ||
| server_port: int = 8888, | ||
| sandbox_ready_timeout: int = 180, | ||
| port_forward_ready_timeout: int = 30 | ||
| port_forward_ready_timeout: int = 30, | ||
| pod_name_ready_timeout: int = 1 | ||
| ): | ||
| self.template_name = template_name | ||
| self.namespace = namespace | ||
| self.server_port = server_port | ||
| self.sandbox_ready_timeout = sandbox_ready_timeout | ||
| self.port_forward_ready_timeout = port_forward_ready_timeout | ||
| self.pod_name_ready_timeout = pod_name_ready_timeout | ||
| self.claim_name: str | None = None | ||
| self.sandbox_name: str | None = None | ||
| self.pod_name: str | None = None | ||
| self.base_url = f"http://127.0.0.1:{self.server_port}" | ||
| self.port_forward_process: subprocess.Popen | None = None | ||
|
|
||
|
|
@@ -101,7 +110,9 @@ def _wait_for_sandbox_ready(self): | |
| This indicates that the underlying pod is running and has passed its checks. | ||
| """ | ||
| if not self.claim_name: | ||
| raise RuntimeError("Cannot wait for sandbox, claim has not been created.") | ||
| raise RuntimeError( | ||
| "Cannot wait for sandbox, claim has not been created.") | ||
|
|
||
| w = watch.Watch() | ||
| logging.info("Watching for Sandbox to become ready...") | ||
| for event in w.stream( | ||
|
|
@@ -113,37 +124,86 @@ def _wait_for_sandbox_ready(self): | |
| field_selector=f"metadata.name={self.claim_name}", | ||
| timeout_seconds=self.sandbox_ready_timeout | ||
| ): | ||
| sandbox_object = event['object'] | ||
| status = sandbox_object.get('status', {}) | ||
| conditions = status.get('conditions', []) | ||
| is_ready = False | ||
| for cond in conditions: | ||
| if cond.get('type') == 'Ready' and cond.get('status') == 'True': | ||
| is_ready = True | ||
| break | ||
|
|
||
| if is_ready: | ||
| self.sandbox_name = sandbox_object['metadata']['name'] | ||
| w.stop() | ||
| logging.info(f"Sandbox {self.sandbox_name} is ready.") | ||
| break | ||
|
|
||
| if not self.sandbox_name: | ||
| self.__exit__(None, None, None) | ||
| raise TimeoutError(f"Sandbox did not become ready within {self.sandbox_ready_timeout} seconds.") | ||
| if event["type"] in ["ADDED", "MODIFIED"]: | ||
| sandbox_object = event['object'] | ||
| status = sandbox_object.get('status', {}) | ||
| conditions = status.get('conditions', []) | ||
| is_ready = False | ||
| for cond in conditions: | ||
| if cond.get('type') == 'Ready' and cond.get('status') == 'True': | ||
| is_ready = True | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The logic here relies on the […]
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a wait to account for the possible race condition. |
||
| break | ||
|
|
||
| if is_ready: | ||
igooch marked this conversation as resolved.
Show resolved
Hide resolved
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This new logic for discovering the pod name is critical for warm pool functionality but is not covered by automated tests. Please add unit tests to […]
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is tracked under #151.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is generally best practice to include tests in the same PR as the bug fix, so that they verify the fix and prevent future regressions. Please add the necessary tests to validate the new behavior. These tests should cover the cases where the pod-name annotation is present and where it is absent.
igooch marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| metadata = sandbox_object.get( | ||
| "metadata", {}) | ||
| self.sandbox_name = metadata.get( | ||
| "name") | ||
| if not self.sandbox_name: | ||
| raise RuntimeError( | ||
| "Could not determine sandbox name from sandbox object.") | ||
|
|
||
| logging.info(f"Sandbox {self.sandbox_name} is ready.") | ||
| self._wait_for_pod_name() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. From the controller code (https://github.com/kubernetes-sigs/agent-sandbox/blob/main/extensions/controllers/sandboxclaim_controller.go): the pod-name annotation is present from the moment the Sandbox is created, and the Ready status is only set after the pod is running, so there should not be a scenario where the sandbox is "Ready" before the annotation is present. While adding a wait can seem like a safe precaution, a short and arbitrary timeout of 1 second can introduce flakiness and make the system harder to debug if there are other, unrelated timing issues. A more thorough approach would be to investigate the controller logic to confirm whether a race condition is possible before implementing a fix. In this case, it appears the wait is unnecessary. My recommendation is to remove the _wait_for_pod_name function and the associated pod_name_ready_timeout. This would simplify the client code and remove a potential source of intermittent failures. To prevent this from regressing, please also add an e2e test to the controller to guarantee the pod-name annotation is present when the sandbox becomes Ready.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. +1 — I missed this. |
||
| w.stop() | ||
| return | ||
|
|
||
| self.__exit__(None, None, None) | ||
| raise TimeoutError( | ||
| f"Sandbox did not become ready within {self.sandbox_ready_timeout} seconds.") | ||
|
|
||
| def _wait_for_pod_name(self, timeout: int = 30): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| """ | ||
| Waits for the pod-name annotation to be present on the sandbox object. | ||
| This wait is only necessary when using SandboxWarmPool. | ||
| """ | ||
| if self.pod_name_ready_timeout <= 0: | ||
| logging.info( | ||
| f"pod_name_ready_timeout {self.pod_name_ready_timeout} is <= 0. Defaulting pod to sandbox name {self.sandbox_name}.") | ||
| self.pod_name = self.sandbox_name | ||
| return | ||
| w = watch.Watch() | ||
| logging.info( | ||
| f"Waiting for pod name annotation on sandbox {self.sandbox_name}...") | ||
| for event in w.stream( | ||
| func=self.custom_objects_api.list_namespaced_custom_object, | ||
| namespace=self.namespace, | ||
| group=SANDBOX_API_GROUP, | ||
| version=SANDBOX_API_VERSION, | ||
| plural=SANDBOX_PLURAL_NAME, | ||
| field_selector=f"metadata.name={self.sandbox_name}", | ||
| timeout_seconds=self.pod_name_ready_timeout | ||
| ): | ||
| if event["type"] in ["ADDED", "MODIFIED"]: | ||
| sandbox_object = event['object'] | ||
| annotations = sandbox_object.get( | ||
| 'metadata', {}).get('annotations', {}) | ||
| pod_name = annotations.get(POD_NAME_ANNOTATION) | ||
| if pod_name: | ||
| self.pod_name = pod_name | ||
| logging.info( | ||
| f"Found pod name from annotation: {self.pod_name}") | ||
| w.stop() | ||
| return | ||
|
|
||
| logging.warning( | ||
| f"Pod name annotation not found after {self.pod_name_ready_timeout} seconds. Defaulting to sandbox name {self.sandbox_name}.") | ||
| self.pod_name = self.sandbox_name | ||
|
|
||
| def _start_and_wait_for_port_forward(self): | ||
igooch marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """ | ||
| Starts the 'kubectl port-forward' subprocess and waits for the local port | ||
| to be open and listening, ensuring the tunnel is ready for traffic. | ||
| """ | ||
| if not self.sandbox_name: | ||
| raise RuntimeError("Cannot start port-forwarding, sandbox name is not known.") | ||
| logging.info(f"Starting port-forwarding for sandbox {self.sandbox_name}...") | ||
| if not self.pod_name: | ||
| raise RuntimeError( | ||
| "Cannot start port-forwarding, sandbox pod name is not known.") | ||
| logging.info( | ||
| f"Starting port-forwarding for sandbox {self.sandbox_name} in namespace {self.namespace} with sandbox pod {self.pod_name}...") | ||
| self.port_forward_process = subprocess.Popen( | ||
| [ | ||
| "kubectl", "port-forward", | ||
| f"pod/{self.sandbox_name}", | ||
| f"pod/{self.pod_name}", | ||
| f"{self.server_port}:{self.server_port}", | ||
| "-n", self.namespace | ||
| ], | ||
|
|
@@ -165,14 +225,16 @@ def _start_and_wait_for_port_forward(self): | |
|
|
||
| try: | ||
| with socket.create_connection(("127.0.0.1", self.server_port), timeout=0.1): | ||
| logging.info(f"Port-forwarding is ready on port {self.server_port}.") | ||
| logging.info( | ||
| f"Port-forwarding is ready on port {self.server_port}.") | ||
| return | ||
| except (socket.timeout, ConnectionRefusedError): | ||
| time.sleep(0.2) # Wait before retrying | ||
|
|
||
| # If the loop finishes, it timed out | ||
| self.__exit__(None, None, None) | ||
| raise TimeoutError(f"Port-forwarding did not become ready within {self.port_forward_ready_timeout} seconds.") | ||
| raise TimeoutError( | ||
| f"Port-forwarding did not become ready within {self.port_forward_ready_timeout} seconds.") | ||
|
|
||
| def __enter__(self) -> 'SandboxClient': | ||
| """Creates the SandboxClaim resource and waits for the Sandbox to become ready.""" | ||
|
|
@@ -200,7 +262,8 @@ def __exit__(self, exc_type, exc_val, exc_tb): | |
| ) | ||
| except client.ApiException as e: | ||
| if e.status != 404: | ||
| logging.error(f"Error deleting sandbox claim: {e}", exc_info=True) | ||
| logging.error( | ||
| f"Error deleting sandbox claim: {e}", exc_info=True) | ||
|
|
||
| def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response: | ||
| """ | ||
|
|
@@ -210,22 +273,24 @@ def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response: | |
| if not self.is_ready(): | ||
| raise RuntimeError("Sandbox is not ready. Cannot send requests.") | ||
|
|
||
| url = f"http://127.0.0.1:{self.server_port}/{endpoint}" | ||
| url = f"{self.base_url}/{endpoint}" | ||
| try: | ||
| response = requests.request(method, url, **kwargs) | ||
| response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx) | ||
| return response | ||
| except requests.exceptions.RequestException as e: | ||
| logging.error(f"Request to sandbox failed: {e}") | ||
| raise RuntimeError(f"Failed to communicate with the sandbox at {url}.") from e | ||
| raise RuntimeError( | ||
| f"Failed to communicate with the sandbox at {url}.") from e | ||
|
|
||
| def run(self, command: str, timeout: int = 60) -> ExecutionResult: | ||
| """ | ||
| Executes a shell command inside the running sandbox. | ||
| """ | ||
| payload = {"command": command} | ||
| response = self._request("POST", "execute", json=payload, timeout=timeout) | ||
|
|
||
| response = self._request( | ||
| "POST", "execute", json=payload, timeout=timeout) | ||
|
|
||
| response_data = response.json() | ||
| return ExecutionResult( | ||
| stdout=response_data['stdout'], | ||
|
|
@@ -253,4 +318,4 @@ def read(self, path: str) -> bytes: | |
| The base path for the download is the root of the sandbox's filesystem. | ||
| """ | ||
| response = self._request("GET", f"download/{path}") | ||
| return response.content | ||
| return response.content | ||
Uh oh!
There was an error while loading. Please reload this page.