Skip to content

Commit 853f6aa

Browse files
author
Suresh Natarajan
committed
feat: Add ECS Security Analysis Tool - Core Infrastructure (PR #1)
Add foundational security analysis tool with cluster listing functionality. This PR establishes the core infrastructure and module registration. Features: - Region validation and selection - ECS cluster listing with metadata - Interactive workflow for cluster selection - Comprehensive error handling - Module registration in main.py Files Added: - awslabs/ecs_mcp_server/api/security_analysis.py (223 lines) - awslabs/ecs_mcp_server/modules/security_analysis.py (155 lines) - tests/unit/test_security_analysis_api.py (168 lines) - tests/unit/test_security_analysis_module.py (147 lines) Files Modified: - awslabs/ecs_mcp_server/main.py (module registration) Testing: - 14 unit tests with 91% coverage (with branch coverage) - All tests pass - Follows all coding patterns from existing codebase Usage: analyze_ecs_security() # List clusters in default region analyze_ecs_security(region='us-west-2') # List in specific region Total: 693 lines (378 production + 315 tests)
1 parent 1512d91 commit 853f6aa

File tree

5 files changed

+695
-0
lines changed

5 files changed

+695
-0
lines changed
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
API for ECS security analysis operations.
17+
18+
This module provides functions for analyzing ECS cluster security configurations
19+
and generating security recommendations.
20+
"""
21+
22+
import logging
23+
import os
24+
from typing import Any, Dict
25+
26+
import boto3
27+
28+
from awslabs.ecs_mcp_server.api.resource_management import ecs_api_operation
29+
30+
logger = logging.getLogger(__name__)
31+
32+
33+
class RegionValidationError(Exception):
34+
"""Raised when an invalid AWS region is provided."""
35+
36+
pass
37+
38+
39+
def validate_region(region: str) -> None:
40+
"""
41+
Validate that the provided region is a valid AWS region for ECS.
42+
43+
Args:
44+
region: AWS region name to validate
45+
46+
Raises:
47+
RegionValidationError: If the region is not valid for ECS
48+
"""
49+
try:
50+
# Get list of available regions for ECS service
51+
available_regions = boto3.Session().get_available_regions("ecs")
52+
53+
if region not in available_regions:
54+
regions_list = ", ".join(sorted(available_regions))
55+
raise RegionValidationError(
56+
f"Invalid AWS region '{region}'. Must be one of: {regions_list}"
57+
)
58+
59+
logger.info(f"Region '{region}' validated successfully")
60+
except Exception as e:
61+
if isinstance(e, RegionValidationError):
62+
raise
63+
logger.error(f"Error validating region '{region}': {e}")
64+
raise RegionValidationError(f"Failed to validate region '{region}': {str(e)}") from e
65+
66+
67+
def get_target_region(region: str | None = None) -> str:
68+
"""
69+
Get the target AWS region for security analysis.
70+
71+
If region is provided, validates it and returns it.
72+
If region is None, uses AWS_REGION environment variable (defaults to 'us-east-1').
73+
74+
Args:
75+
region: Optional AWS region name
76+
77+
Returns:
78+
Validated AWS region name
79+
80+
Raises:
81+
RegionValidationError: If the region is invalid
82+
"""
83+
if region is None:
84+
# Get region from environment variable, default to us-east-1
85+
region = os.environ.get("AWS_REGION", "us-east-1")
86+
logger.info(f"No region specified, using region from environment: {region}")
87+
else:
88+
logger.info(f"Using specified region: {region}")
89+
90+
# Validate the region
91+
validate_region(region)
92+
93+
return region
94+
95+
96+
async def list_clusters_in_region(region: str) -> list[Dict[str, Any]]:
97+
"""
98+
List all ECS clusters in the specified region with their metadata.
99+
100+
Args:
101+
region: AWS region to list clusters from
102+
103+
Returns:
104+
List of cluster dictionaries with metadata
105+
106+
Raises:
107+
Exception: If listing clusters fails
108+
"""
109+
logger.info(f"Listing ECS clusters in region: {region}")
110+
111+
try:
112+
# List cluster ARNs
113+
list_response = await ecs_api_operation(api_operation="ListClusters", api_params={})
114+
115+
cluster_arns = list_response.get("clusterArns", [])
116+
117+
if not cluster_arns:
118+
logger.info(f"No clusters found in region {region}")
119+
return []
120+
121+
logger.info(f"Found {len(cluster_arns)} cluster(s) in region {region}")
122+
123+
# Describe clusters to get metadata
124+
describe_response = await ecs_api_operation(
125+
api_operation="DescribeClusters",
126+
api_params={
127+
"clusters": cluster_arns,
128+
"include": ["ATTACHMENTS", "SETTINGS", "STATISTICS", "TAGS"],
129+
},
130+
)
131+
132+
clusters = describe_response.get("clusters", [])
133+
134+
# Format cluster information
135+
all_clusters = []
136+
for cluster in clusters:
137+
cluster_info = {
138+
"cluster_name": cluster.get("clusterName"),
139+
"cluster_arn": cluster.get("clusterArn"),
140+
"status": cluster.get("status"),
141+
"running_tasks_count": cluster.get("runningTasksCount", 0),
142+
"pending_tasks_count": cluster.get("pendingTasksCount", 0),
143+
"active_services_count": cluster.get("activeServicesCount", 0),
144+
"registered_container_instances_count": cluster.get(
145+
"registeredContainerInstancesCount", 0
146+
),
147+
"tags": {tag["key"]: tag["value"] for tag in cluster.get("tags", [])},
148+
}
149+
all_clusters.append(cluster_info)
150+
151+
logger.info(f"Successfully retrieved metadata for {len(all_clusters)} cluster(s)")
152+
return all_clusters
153+
154+
except Exception as e:
155+
logger.error(f"Error listing clusters in region {region}: {e}")
156+
raise Exception(f"Failed to list clusters in region '{region}': {str(e)}") from e
157+
158+
159+
def format_cluster_list(clusters: list[Dict[str, Any]], region: str) -> str:
160+
"""
161+
Format a list of clusters for user selection.
162+
163+
Args:
164+
clusters: List of cluster dictionaries
165+
region: AWS region name
166+
167+
Returns:
168+
Formatted string with cluster information
169+
"""
170+
if not clusters:
171+
return f"""
172+
No ECS clusters found in region: {region}
173+
174+
To create a cluster, you can use the AWS CLI:
175+
```bash
176+
aws ecs create-cluster --cluster-name my-cluster --region {region}
177+
```
178+
179+
Or use the AWS Console to create a cluster in the ECS service.
180+
"""
181+
182+
# Build the formatted output
183+
lines = [
184+
"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━",
185+
f"📋 ECS CLUSTERS IN REGION: {region}",
186+
"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━",
187+
"",
188+
f"Found {len(clusters)} cluster(s):",
189+
"",
190+
]
191+
192+
for i, cluster in enumerate(clusters, 1):
193+
cluster_name = cluster.get("cluster_name", "Unknown")
194+
status = cluster.get("status", "Unknown")
195+
running_tasks = cluster.get("running_tasks_count", 0)
196+
active_services = cluster.get("active_services_count", 0)
197+
198+
lines.extend(
199+
[
200+
f"{i}. {cluster_name}",
201+
f" Status: {status}",
202+
f" Running Tasks: {running_tasks}",
203+
f" Active Services: {active_services}",
204+
"",
205+
]
206+
)
207+
208+
lines.extend(
209+
[
210+
"━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━",
211+
"",
212+
"To analyze a specific cluster, call this tool again with:",
213+
" cluster_names=['cluster-name']",
214+
"",
215+
"Example:",
216+
f" analyze_ecs_security("
217+
f"cluster_names=['{clusters[0].get('cluster_name')}'], "
218+
f"region='{region}')",
219+
"",
220+
]
221+
)
222+
223+
return "\n".join(lines)

src/ecs-mcp-server/awslabs/ecs_mcp_server/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
deployment_status,
3232
infrastructure,
3333
resource_management,
34+
security_analysis,
3435
troubleshooting,
3536
)
3637
from awslabs.ecs_mcp_server.utils.config import get_config
@@ -134,6 +135,7 @@ def _create_ecs_mcp_server() -> Tuple[FastMCP, Dict[str, Any]]:
134135
resource_management.register_module(mcp)
135136
troubleshooting.register_module(mcp)
136137
delete.register_module(mcp)
138+
security_analysis.register_module(mcp)
137139

138140
# Register all proxies
139141
aws_knowledge_proxy.register_proxy(mcp)
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""
16+
Security Analysis module for ECS MCP Server.
17+
This module provides tools and prompts for analyzing ECS security configurations.
18+
"""
19+
20+
import logging
21+
22+
from fastmcp import FastMCP
23+
from pydantic import Field
24+
25+
from awslabs.ecs_mcp_server.api.security_analysis import (
26+
format_cluster_list,
27+
get_target_region,
28+
list_clusters_in_region,
29+
)
30+
31+
logger = logging.getLogger(__name__)
32+
33+
34+
def register_module(mcp: FastMCP) -> None:
35+
"""Register security analysis module tools and prompts with the MCP server."""
36+
37+
# Define pydantic Field descriptions for all parameters
38+
region_field = Field(
39+
default=None,
40+
description=(
41+
"Optional AWS region to analyze. "
42+
"If not provided, uses the AWS_REGION environment variable (defaults to 'us-east-1'). "
43+
"Must be a valid AWS region where ECS is available. "
44+
"Example: 'us-west-2', 'eu-west-1'"
45+
),
46+
)
47+
48+
@mcp.tool(name="analyze_ecs_security", annotations=None)
49+
async def mcp_analyze_ecs_security(
50+
region: str | None = region_field,
51+
) -> str:
52+
"""
53+
List ECS clusters available for security analysis.
54+
55+
This tool lists all ECS clusters in the specified AWS region, providing
56+
an overview of available clusters that can be analyzed for security issues.
57+
58+
Interactive Workflow:
59+
60+
Step 1: List Available Clusters
61+
- Call with NO parameters to list clusters in default region
62+
- Call with region parameter to list clusters in specific region
63+
- Returns formatted list of available clusters with metadata
64+
65+
Step 2: User Selects Clusters (Future)
66+
- User reviews the list and selects which clusters to analyze
67+
- Future versions will support cluster_names parameter for analysis
68+
69+
Usage Examples:
70+
71+
Example 1 - List clusters in default region:
72+
analyze_ecs_security()
73+
# Returns list of clusters in AWS_REGION for selection
74+
75+
Example 2 - List clusters in specific region:
76+
analyze_ecs_security(region="us-west-2")
77+
# Returns list of clusters in us-west-2 for selection
78+
79+
Cluster Information Provided:
80+
- Cluster name and ARN
81+
- Current status (ACTIVE, INACTIVE, etc.)
82+
- Running tasks count
83+
- Active services count
84+
- Registered container instances count
85+
- Resource tags
86+
87+
Parameters:
88+
region: Optional AWS region. If None, uses AWS_REGION environment variable.
89+
90+
Returns:
91+
Formatted list of available clusters for selection
92+
93+
Error Handling:
94+
- Invalid region: Returns error message with list of valid regions
95+
- No clusters found: Returns helpful message with cluster creation guidance
96+
"""
97+
try:
98+
# Step 1: Determine target region
99+
logger.info("Step 1: Determining target region")
100+
target_region = get_target_region(region)
101+
102+
# Step 2: List clusters for user selection
103+
logger.info(f"Step 2: Listing clusters in region '{target_region}' for user selection")
104+
clusters = await list_clusters_in_region(target_region)
105+
return format_cluster_list(clusters, target_region)
106+
107+
except Exception as e:
108+
import traceback
109+
110+
error_msg = f"Error during security analysis: {str(e)}"
111+
logger.error(error_msg)
112+
logger.error(f"Traceback: {traceback.format_exc()}")
113+
return f"❌ {error_msg}\n\nDetailed error:\n{traceback.format_exc()}"
114+
115+
# Register prompt patterns for security analysis
116+
117+
@mcp.prompt("analyze ecs security")
118+
def analyze_ecs_security_prompt():
119+
"""User wants to analyze ECS security"""
120+
return ["analyze_ecs_security"]
121+
122+
@mcp.prompt("check ecs security")
123+
def check_ecs_security_prompt():
124+
"""User wants to check ECS security"""
125+
return ["analyze_ecs_security"]
126+
127+
@mcp.prompt("ecs security audit")
128+
def ecs_security_audit_prompt():
129+
"""User wants to perform an ECS security audit"""
130+
return ["analyze_ecs_security"]
131+
132+
@mcp.prompt("security best practices")
133+
def security_best_practices_prompt():
134+
"""User wants to check security best practices"""
135+
return ["analyze_ecs_security"]
136+
137+
@mcp.prompt("security recommendations")
138+
def security_recommendations_prompt():
139+
"""User wants security recommendations"""
140+
return ["analyze_ecs_security"]
141+
142+
@mcp.prompt("scan ecs clusters")
143+
def scan_ecs_clusters_prompt():
144+
"""User wants to scan ECS clusters for security issues"""
145+
return ["analyze_ecs_security"]
146+
147+
@mcp.prompt("ecs security scan")
148+
def ecs_security_scan_prompt():
149+
"""User wants to perform an ECS security scan"""
150+
return ["analyze_ecs_security"]
151+
152+
@mcp.prompt("list ecs clusters")
153+
def list_ecs_clusters_prompt():
154+
"""User wants to list ECS clusters"""
155+
return ["analyze_ecs_security"]

0 commit comments

Comments
 (0)