Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 167 additions & 32 deletions src/openstack_mcp_server/tools/network_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
Port,
Router,
RouterInterface,
SecurityGroup,
SecurityGroupRule,
Subnet,
)

Expand Down Expand Up @@ -56,6 +58,11 @@ def register_tools(self, mcp: FastMCP):
mcp.tool()(self.add_router_interface)
mcp.tool()(self.get_router_interfaces)
mcp.tool()(self.remove_router_interface)
mcp.tool()(self.get_security_groups)
mcp.tool()(self.create_security_group)
mcp.tool()(self.get_security_group_detail)
mcp.tool()(self.update_security_group)
mcp.tool()(self.delete_security_group)

def get_networks(
self,
Expand Down Expand Up @@ -171,9 +178,9 @@ def update_network(

update_args = {}

if name is not None:
if name:
update_args["name"] = name
if description is not None:
if description:
update_args["description"] = description
if is_admin_state_up is not None:
update_args["admin_state_up"] = is_admin_state_up
Expand Down Expand Up @@ -305,11 +312,11 @@ def create_subnet(
"ip_version": ip_version,
"enable_dhcp": is_dhcp_enabled,
}
if name is not None:
if name:
subnet_args["name"] = name
if description is not None:
if description:
subnet_args["description"] = description
if gateway_ip is not None:
if gateway_ip:
subnet_args["gateway_ip"] = gateway_ip
if dns_nameservers is not None:
subnet_args["dns_nameservers"] = dns_nameservers
Expand Down Expand Up @@ -378,13 +385,13 @@ def update_subnet(
"""
conn = get_openstack_conn()
update_args: dict = {}
if name is not None:
if name:
update_args["name"] = name
if description is not None:
if description:
update_args["description"] = description
if clear_gateway:
update_args["gateway_ip"] = None
elif gateway_ip is not None:
elif gateway_ip:
update_args["gateway_ip"] = gateway_ip
if is_dhcp_enabled is not None:
update_args["enable_dhcp"] = is_dhcp_enabled
Expand Down Expand Up @@ -492,9 +499,9 @@ def set_port_binding(
"""
conn = get_openstack_conn()
update_args: dict = {}
if host_id is not None:
if host_id:
update_args["binding_host_id"] = host_id
if vnic_type is not None:
if vnic_type:
update_args["binding_vnic_type"] = vnic_type
if profile is not None:
update_args["binding_profile"] = profile
Expand Down Expand Up @@ -531,11 +538,11 @@ def create_port(
"network_id": network_id,
"admin_state_up": is_admin_state_up,
}
if name is not None:
if name:
port_args["name"] = name
if description is not None:
if description:
port_args["description"] = description
if device_id is not None:
if device_id:
port_args["device_id"] = device_id
if fixed_ips is not None:
port_args["fixed_ips"] = fixed_ips
Expand Down Expand Up @@ -604,13 +611,13 @@ def update_port(
"""
conn = get_openstack_conn()
update_args: dict = {}
if name is not None:
if name:
update_args["name"] = name
if description is not None:
if description:
update_args["description"] = description
if is_admin_state_up is not None:
update_args["admin_state_up"] = is_admin_state_up
if device_id is not None:
if device_id:
update_args["device_id"] = device_id
if security_group_ids is not None:
update_args["security_groups"] = security_group_ids
Expand Down Expand Up @@ -717,13 +724,13 @@ def create_floating_ip(
"""
conn = get_openstack_conn()
ip_args: dict = {"floating_network_id": floating_network_id}
if description is not None:
if description:
ip_args["description"] = description
if fixed_ip_address is not None:
if fixed_ip_address:
ip_args["fixed_ip_address"] = fixed_ip_address
if port_id is not None:
if port_id:
ip_args["port_id"] = port_id
if project_id is not None:
if project_id:
ip_args["project_id"] = project_id
ip = conn.network.create_ip(**ip_args)
return self._convert_to_floating_ip_model(ip)
Expand All @@ -744,7 +751,7 @@ def attach_floating_ip_to_port(
"""
conn = get_openstack_conn()
update_args: dict = {"port_id": port_id}
if fixed_ip_address is not None:
if fixed_ip_address:
update_args["fixed_ip_address"] = fixed_ip_address
ip = conn.network.update_ip(floating_ip_id, **update_args)
return self._convert_to_floating_ip_model(ip)
Expand Down Expand Up @@ -783,11 +790,11 @@ def update_floating_ip(
"""
conn = get_openstack_conn()
update_args: dict = {}
if description is not None:
if description:
update_args["description"] = description
if port_id is not None:
if port_id:
update_args["port_id"] = port_id
if fixed_ip_address is not None:
if fixed_ip_address:
update_args["fixed_ip_address"] = fixed_ip_address
else:
if clear_port:
Expand Down Expand Up @@ -947,13 +954,13 @@ def create_router(
"""
conn = get_openstack_conn()
router_args: dict = {"admin_state_up": is_admin_state_up}
if name is not None:
if name:
router_args["name"] = name
if description is not None:
if description:
router_args["description"] = description
if is_distributed is not None:
router_args["distributed"] = is_distributed
if project_id is not None:
if project_id:
router_args["project_id"] = project_id
if external_gateway_info is not None:
router_args["external_gateway_info"] = (
Expand Down Expand Up @@ -1008,9 +1015,9 @@ def update_router(
"""
conn = get_openstack_conn()
update_args: dict = {}
if name is not None:
if name:
update_args["name"] = name
if description is not None:
if description:
update_args["description"] = description
if is_admin_state_up is not None:
update_args["admin_state_up"] = is_admin_state_up
Expand Down Expand Up @@ -1114,9 +1121,9 @@ def remove_router_interface(
"""
conn = get_openstack_conn()
args: dict = {}
if subnet_id is not None:
if subnet_id:
args["subnet_id"] = subnet_id
if port_id is not None:
if port_id:
args["port_id"] = port_id
res = conn.network.remove_interface_from_router(router_id, **args)
return RouterInterface(
Expand Down Expand Up @@ -1161,6 +1168,134 @@ def _sanitize_server_filters(self, filters: dict) -> dict:
if not filters:
return {}
attrs = dict(filters)
# Remove client-only or unsupported filters
attrs.pop("status", None)
return attrs

def get_security_groups(
self,
project_id: str | None = None,
name: str | None = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이미 훌륭한 코드에 한 가지 제 생각을 덧붙여보자면, id필드가 있으면 더 좋지 않을까 생각이 듭니다.
id가 필요없을까? 생각에서 뻗어나갔는데요,
security group은 name을 중복값으로 생성할 수 있을텐데,
만약 조금 복잡하게 질문해서

  • 전제: 중복된 security name이 많음
  • 프롬프트: "instance중에 tag가 prod인 인스턴스들을 조회하고, 그 인스턴스들이 어떤 security group을 쓰는지 조회해줘"
    라고 했을 때,
    기존 프로젝트의 compute에서는 return은 id와 name 모두를 반환하므로 get_security_group_detail 함수에 id를 통해 정확히 조회해준다면 아름답겠지만, 자칫 get_security_groups의 name으로 조회하게된다면, 결과를 보장하기 힘들 것 같습니다.
    그런데 id필드가 있으면 적어도 이 케이스에 대해서는 안전할 것 같습니다

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

그러네요 ID 필드 추가하도록 하겠습니다!

id: str | None = None,
) -> list[SecurityGroup]:
"""
Get the list of Security Groups with optional filtering.

:param project_id: Filter by project ID
:param name: Filter by security group name
:param id: Filter by security group ID
:return: List of SecurityGroup objects
"""
conn = get_openstack_conn()
filters: dict = {}
if project_id:
filters["project_id"] = project_id
if name:
filters["name"] = name
if id:
filters["id"] = id
security_groups = conn.network.security_groups(**filters)
return [
self._convert_to_security_group_model(sg) for sg in security_groups
]

def create_security_group(
self,
name: str,
description: str | None = None,
project_id: str | None = None,
) -> SecurityGroup:
"""
Create a new Security Group.

:param name: Security group name
:param description: Security group description
:param project_id: Project ID to assign ownership
:return: Created SecurityGroup object
"""
conn = get_openstack_conn()
args: dict = {"name": name}
if description:
args["description"] = description
if project_id:
args["project_id"] = project_id
sg = conn.network.create_security_group(**args)
return self._convert_to_security_group_model(sg)

def get_security_group_detail(
self, security_group_id: str
) -> SecurityGroup:
"""
Get detailed information about a specific Security Group.

:param security_group_id: ID of the security group to retrieve
:return: SecurityGroup details
"""
conn = get_openstack_conn()
sg = conn.network.get_security_group(security_group_id)
return self._convert_to_security_group_model(sg)

def update_security_group(
self,
security_group_id: str,
name: str | None = None,
description: str | None = None,
) -> SecurityGroup:
"""
Update an existing Security Group.

:param security_group_id: ID of the security group to update
:param name: New security group name
:param description: New security group description
:return: Updated SecurityGroup object
"""
conn = get_openstack_conn()
update_args: dict = {}
if name:
update_args["name"] = name
if description:
update_args["description"] = description
if not update_args:
current = conn.network.get_security_group(security_group_id)
return self._convert_to_security_group_model(current)
sg = conn.network.update_security_group(
security_group_id, **update_args
)
return self._convert_to_security_group_model(sg)

def delete_security_group(self, security_group_id: str) -> None:
"""
Delete a Security Group.

:param security_group_id: ID of the security group to delete
:return: None
"""
conn = get_openstack_conn()
conn.network.delete_security_group(
security_group_id, ignore_missing=False
)
return None

def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup:
"""
Convert an OpenStack Security Group object to a SecurityGroup pydantic model.

:param openstack_sg: OpenStack security group object
:return: Pydantic SecurityGroup model
"""
rule_ids: list[str] | None = None
rules = getattr(openstack_sg, "security_group_rules", None)
if rules is not None:
dto_rules = [
SecurityGroupRule.model_validate(r, from_attributes=True)
for r in rules
]
rule_ids = [str(r.id) for r in dto_rules if getattr(r, "id", None)]

return SecurityGroup(
id=openstack_sg.id,
name=getattr(openstack_sg, "name", None),
status=getattr(openstack_sg, "status", None),
description=getattr(openstack_sg, "description", None),
project_id=getattr(openstack_sg, "project_id", None),
security_group_rule_ids=rule_ids,
)
Loading