Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 136 additions & 1 deletion src/openstack_mcp_server/tools/network_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Port,
Router,
RouterInterface,
SecurityGroup,
Subnet,
)

Expand Down Expand Up @@ -56,6 +57,11 @@ def register_tools(self, mcp: FastMCP):
mcp.tool()(self.add_router_interface)
mcp.tool()(self.get_router_interfaces)
mcp.tool()(self.remove_router_interface)
mcp.tool()(self.get_security_groups)
mcp.tool()(self.create_security_group)
mcp.tool()(self.get_security_group_detail)
mcp.tool()(self.update_security_group)
mcp.tool()(self.delete_security_group)

def get_networks(
self,
Expand Down Expand Up @@ -1161,6 +1167,135 @@ def _sanitize_server_filters(self, filters: dict) -> dict:
if not filters:
return {}
attrs = dict(filters)
# Remove client-only or unsupported filters
attrs.pop("status", None)
return attrs

def get_security_groups(
self,
project_id: str | None = None,
name: str | None = None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

이미 훌륭한 코드에 한 가지 제 생각을 덧붙여보자면, id필드가 있으면 더 좋지 않을까 생각이 듭니다.
id가 필요없을까? 생각에서 뻗어나갔는데요,
security group은 name을 중복값으로 생성할 수 있을텐데,
만약 조금 복잡하게 질문해서

  • 전제: 중복된 security name이 많음
  • 프롬프트: "instance중에 tag가 prod인 인스턴스들을 조회하고, 그 인스턴스들이 어떤 security group을 쓰는지 조회해줘"
    라고 했을 때,
    기존 프로젝트의 compute에서는 return은 id와 name 모두를 반환하므로 get_security_group_detail 함수에 id를 통해 정확히 조회해준다면 아름답겠지만, 자칫 get_security_groups의 name으로 조회하게된다면, 결과를 보장하기 힘들 것 같습니다.
    그런데 id필드가 있으면 적어도 이 케이스에 대해서는 안전할 것 같습니다

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

그러네요 ID 필드 추가하도록 하겠습니다!

) -> list[SecurityGroup]:
"""
Get the list of Security Groups with optional filtering.

:param project_id: Filter by project ID
:param name: Filter by security group name
:return: List of SecurityGroup objects
"""
conn = get_openstack_conn()
filters: dict = {}
if project_id:
filters["project_id"] = project_id
if name:
filters["name"] = name
security_groups = conn.network.security_groups(**filters)
return [
self._convert_to_security_group_model(sg) for sg in security_groups
]

def create_security_group(
self,
name: str,
description: str | None = None,
project_id: str | None = None,
) -> SecurityGroup:
"""
Create a new Security Group.

:param name: Security group name
:param description: Security group description
:param project_id: Project ID to assign ownership
:return: Created SecurityGroup object
"""
conn = get_openstack_conn()
args: dict = {"name": name}
if description is not None:
args["description"] = description
if project_id is not None:
args["project_id"] = project_id
sg = conn.network.create_security_group(**args)
return self._convert_to_security_group_model(sg)

def get_security_group_detail(
self, security_group_id: str
) -> SecurityGroup:
"""
Get detailed information about a specific Security Group.

:param security_group_id: ID of the security group to retrieve
:return: SecurityGroup details
"""
conn = get_openstack_conn()
sg = conn.network.get_security_group(security_group_id)
return self._convert_to_security_group_model(sg)

def update_security_group(
self,
security_group_id: str,
name: str | None = None,
description: str | None = None,
) -> SecurityGroup:
"""
Update an existing Security Group.

:param security_group_id: ID of the security group to update
:param name: New security group name
:param description: New security group description
:return: Updated SecurityGroup object
"""
conn = get_openstack_conn()
update_args: dict = {}
if name is not None:
update_args["name"] = name
if description is not None:
update_args["description"] = description
if not update_args:
current = conn.network.get_security_group(security_group_id)
return self._convert_to_security_group_model(current)
sg = conn.network.update_security_group(
security_group_id, **update_args
)
return self._convert_to_security_group_model(sg)

def delete_security_group(self, security_group_id: str) -> None:
"""
Delete a Security Group.

:param security_group_id: ID of the security group to delete
:return: None
"""
conn = get_openstack_conn()
conn.network.delete_security_group(
security_group_id, ignore_missing=False
)
return None

def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup:
"""
Convert an OpenStack Security Group object to a SecurityGroup pydantic model.

:param openstack_sg: OpenStack security group object
:return: Pydantic SecurityGroup model
"""
rule_ids: list[str] | None = None
rules = getattr(openstack_sg, "security_group_rules", None)
if rules is not None:
extracted: list[str] = []
for r in rules:
rid = None
if isinstance(r, dict):
rid = r.get("id")
else:
rid = getattr(r, "id", None)
Copy link
Collaborator

@S0okJu S0okJu Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getattr 으로 속성을 확인하신 이유가 궁금합니다.

Copy link
Collaborator Author

@platanus-kr platanus-kr Oct 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security group에 rule이 없을때 None을 반환하는 로직입니다~

아래 승주님 의견받아 rule 객체로 변환하면서 해당 로직은 사라졌습니다!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private 함수 없이 Pydantic 객체로 변환하는게 불가능한가요?, 복잡한 변환이 아니여서 함수없이도 충분히 가능해보입니다.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

문자열 리스트가 아닌 보안그룹 규칙 객체로 반환하도록 개선하겠습니다!

if rid:
extracted.append(str(rid))
rule_ids = extracted

return SecurityGroup(
id=openstack_sg.id,
name=getattr(openstack_sg, "name", None),
status=getattr(openstack_sg, "status", None),
description=getattr(openstack_sg, "description", None),
project_id=getattr(openstack_sg, "project_id", None),
security_group_rule_ids=rule_ids,
)
118 changes: 118 additions & 0 deletions tests/tools/test_network_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Port,
Router,
RouterInterface,
SecurityGroup,
Subnet,
)

Expand Down Expand Up @@ -1368,6 +1369,123 @@ def test_update_reassign_bulk_and_auto_assign_floating_ip(
auto = tools.assign_first_available_floating_ip("ext-net", "port-9")
assert isinstance(auto, FloatingIP)

def test_get_security_groups_filters(self, mock_openstack_connect_network):
mock_conn = mock_openstack_connect_network

sg = Mock()
sg.id = "sg-1"
sg.name = "default"
sg.status = None
sg.description = "desc"
sg.project_id = "proj-1"
sg.security_group_rules = [
{"id": "r-1"},
{"id": "r-2"},
]
mock_conn.network.security_groups.return_value = [sg]

tools = self.get_network_tools()
res = tools.get_security_groups(project_id="proj-1", name="default")
assert res == [
SecurityGroup(
id="sg-1",
name="default",
status=None,
description="desc",
project_id="proj-1",
security_group_rule_ids=["r-1", "r-2"],
)
]
mock_conn.network.security_groups.assert_called_once_with(
project_id="proj-1", name="default"
)

def test_create_security_group(self, mock_openstack_connect_network):
mock_conn = mock_openstack_connect_network
sg = Mock()
sg.id = "sg-2"
sg.name = "web"
sg.status = None
sg.description = "for web"
sg.project_id = "proj-1"
sg.security_group_rules = []
mock_conn.network.create_security_group.return_value = sg

tools = self.get_network_tools()
res = tools.create_security_group(
name="web", description="for web", project_id="proj-1"
)
assert res == SecurityGroup(
id="sg-2",
name="web",
status=None,
description="for web",
project_id="proj-1",
security_group_rule_ids=[],
)
mock_conn.network.create_security_group.assert_called_once_with(
name="web", description="for web", project_id="proj-1"
)

def test_get_security_group_detail(self, mock_openstack_connect_network):
mock_conn = mock_openstack_connect_network
sg = Mock()
sg.id = "sg-3"
sg.name = "db"
sg.status = None
sg.description = None
sg.project_id = None
sg.security_group_rules = None
mock_conn.network.get_security_group.return_value = sg

tools = self.get_network_tools()
res = tools.get_security_group_detail("sg-3")
assert res.id == "sg-3"
mock_conn.network.get_security_group.assert_called_once_with("sg-3")

def test_update_security_group(self, mock_openstack_connect_network):
mock_conn = mock_openstack_connect_network
sg = Mock()
sg.id = "sg-4"
sg.name = "new-name"
sg.status = None
sg.description = "new-desc"
sg.project_id = None
sg.security_group_rules = []
mock_conn.network.update_security_group.return_value = sg

tools = self.get_network_tools()
res = tools.update_security_group(
security_group_id="sg-4", name="new-name", description="new-desc"
)
assert res.name == "new-name"
mock_conn.network.update_security_group.assert_called_once_with(
"sg-4", name="new-name", description="new-desc"
)

# No fields -> returns current
current = Mock()
current.id = "sg-5"
current.name = "cur"
current.status = None
current.description = None
current.project_id = None
current.security_group_rules = None
mock_conn.network.get_security_group.return_value = current
res2 = tools.update_security_group("sg-5")
assert res2.id == "sg-5"

def test_delete_security_group(self, mock_openstack_connect_network):
mock_conn = mock_openstack_connect_network
mock_conn.network.delete_security_group.return_value = None

tools = self.get_network_tools()
res = tools.delete_security_group("sg-6")
assert res is None
mock_conn.network.delete_security_group.assert_called_once_with(
"sg-6", ignore_missing=False
)

def test_get_routers_with_filters(self, mock_openstack_connect_network):
mock_conn = mock_openstack_connect_network

Expand Down