-
Notifications
You must be signed in to change notification settings - Fork 6
feat: Add security group tools #86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
aea9490
986e03b
5bbca59
bc33054
14bfd70
7044f55
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ | |
| Port, | ||
| Router, | ||
| RouterInterface, | ||
| SecurityGroup, | ||
| Subnet, | ||
| ) | ||
|
|
||
|
|
@@ -56,6 +57,11 @@ def register_tools(self, mcp: FastMCP): | |
| mcp.tool()(self.add_router_interface) | ||
| mcp.tool()(self.get_router_interfaces) | ||
| mcp.tool()(self.remove_router_interface) | ||
| mcp.tool()(self.get_security_groups) | ||
| mcp.tool()(self.create_security_group) | ||
| mcp.tool()(self.get_security_group_detail) | ||
| mcp.tool()(self.update_security_group) | ||
| mcp.tool()(self.delete_security_group) | ||
|
|
||
| def get_networks( | ||
| self, | ||
|
|
@@ -1161,6 +1167,135 @@ def _sanitize_server_filters(self, filters: dict) -> dict: | |
| if not filters: | ||
| return {} | ||
| attrs = dict(filters) | ||
| # Remove client-only or unsupported filters | ||
| attrs.pop("status", None) | ||
| return attrs | ||
|
|
||
| def get_security_groups( | ||
| self, | ||
| project_id: str | None = None, | ||
| name: str | None = None, | ||
| ) -> list[SecurityGroup]: | ||
| """ | ||
| Get the list of Security Groups with optional filtering. | ||
|
|
||
| :param project_id: Filter by project ID | ||
| :param name: Filter by security group name | ||
| :return: List of SecurityGroup objects | ||
| """ | ||
| conn = get_openstack_conn() | ||
| filters: dict = {} | ||
| if project_id: | ||
| filters["project_id"] = project_id | ||
| if name: | ||
| filters["name"] = name | ||
| security_groups = conn.network.security_groups(**filters) | ||
| return [ | ||
| self._convert_to_security_group_model(sg) for sg in security_groups | ||
| ] | ||
|
|
||
| def create_security_group( | ||
| self, | ||
| name: str, | ||
| description: str | None = None, | ||
| project_id: str | None = None, | ||
| ) -> SecurityGroup: | ||
| """ | ||
| Create a new Security Group. | ||
|
|
||
| :param name: Security group name | ||
| :param description: Security group description | ||
| :param project_id: Project ID to assign ownership | ||
| :return: Created SecurityGroup object | ||
| """ | ||
| conn = get_openstack_conn() | ||
| args: dict = {"name": name} | ||
| if description is not None: | ||
| args["description"] = description | ||
| if project_id is not None: | ||
halucinor marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| args["project_id"] = project_id | ||
| sg = conn.network.create_security_group(**args) | ||
| return self._convert_to_security_group_model(sg) | ||
|
|
||
| def get_security_group_detail( | ||
| self, security_group_id: str | ||
| ) -> SecurityGroup: | ||
| """ | ||
| Get detailed information about a specific Security Group. | ||
|
|
||
| :param security_group_id: ID of the security group to retrieve | ||
| :return: SecurityGroup details | ||
| """ | ||
| conn = get_openstack_conn() | ||
| sg = conn.network.get_security_group(security_group_id) | ||
| return self._convert_to_security_group_model(sg) | ||
|
|
||
| def update_security_group( | ||
| self, | ||
| security_group_id: str, | ||
| name: str | None = None, | ||
| description: str | None = None, | ||
| ) -> SecurityGroup: | ||
| """ | ||
| Update an existing Security Group. | ||
|
|
||
| :param security_group_id: ID of the security group to update | ||
| :param name: New security group name | ||
| :param description: New security group description | ||
| :return: Updated SecurityGroup object | ||
| """ | ||
| conn = get_openstack_conn() | ||
| update_args: dict = {} | ||
| if name is not None: | ||
| update_args["name"] = name | ||
| if description is not None: | ||
| update_args["description"] = description | ||
halucinor marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if not update_args: | ||
| current = conn.network.get_security_group(security_group_id) | ||
| return self._convert_to_security_group_model(current) | ||
| sg = conn.network.update_security_group( | ||
| security_group_id, **update_args | ||
| ) | ||
| return self._convert_to_security_group_model(sg) | ||
|
|
||
| def delete_security_group(self, security_group_id: str) -> None: | ||
| """ | ||
| Delete a Security Group. | ||
|
|
||
| :param security_group_id: ID of the security group to delete | ||
| :return: None | ||
| """ | ||
| conn = get_openstack_conn() | ||
| conn.network.delete_security_group( | ||
| security_group_id, ignore_missing=False | ||
| ) | ||
| return None | ||
|
|
||
| def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup: | ||
| """ | ||
| Convert an OpenStack Security Group object to a SecurityGroup pydantic model. | ||
|
|
||
| :param openstack_sg: OpenStack security group object | ||
| :return: Pydantic SecurityGroup model | ||
| """ | ||
| rule_ids: list[str] | None = None | ||
| rules = getattr(openstack_sg, "security_group_rules", None) | ||
| if rules is not None: | ||
| extracted: list[str] = [] | ||
| for r in rules: | ||
| rid = None | ||
| if isinstance(r, dict): | ||
| rid = r.get("id") | ||
| else: | ||
| rid = getattr(r, "id", None) | ||
|
||
| if rid: | ||
| extracted.append(str(rid)) | ||
| rule_ids = extracted | ||
|
|
||
| return SecurityGroup( | ||
| id=openstack_sg.id, | ||
| name=getattr(openstack_sg, "name", None), | ||
| status=getattr(openstack_sg, "status", None), | ||
| description=getattr(openstack_sg, "description", None), | ||
| project_id=getattr(openstack_sg, "project_id", None), | ||
| security_group_rule_ids=rule_ids, | ||
| ) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
이미 훌륭한 코드에 한 가지 제 생각을 덧붙여보자면, id필드가 있으면 더 좋지 않을까 생각이 듭니다.
id가 필요없을까? 생각에서 뻗어나갔는데요,
security group은 name을 중복값으로 생성할 수 있을텐데,
만약 조금 복잡하게 질문해서
라고 했을 때,
기존 프로젝트의 compute에서는 return은 id와 name 모두를 반환하므로 get_security_group_detail 함수에 id를 통해 정확히 조회해준다면 아름답겠지만, 자칫 get_security_groups의 name으로 조회하게된다면, 결과를 보장하기 힘들 것 같습니다.
그런데 id필드가 있으면 적어도 이 케이스에 대해서는 안전할 것 같습니다
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
그러네요 ID 필드 추가하도록 하겠습니다!