Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Security Analysis API for ECS MCP Server.

This module provides comprehensive security analysis for ECS clusters.
Minimal implementation for PR #1 - basic structure and imports.
"""

import logging
from datetime import datetime
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class DataAdapter:
"""
Adapter for collecting ECS data for security analysis.
Minimal implementation for PR #1.
"""

def __init__(self) -> None:
"""Initialize the DataAdapter."""
self.logger = logger


class SecurityAnalyzer:
"""
Basic ECS security analyzer.
Minimal implementation for PR #1.
"""

def __init__(self) -> None:
"""Initialize the SecurityAnalyzer."""
self.logger = logger


async def analyze_ecs_security(
cluster_names: Optional[List[str]] = None,
regions: Optional[List[str]] = None,
analysis_scope: Optional[str] = "basic",
) -> Dict[str, Any]:
"""
Perform comprehensive security analysis of ECS deployments.
Minimal implementation for PR #1.

Args:
cluster_names: Optional list of cluster names to analyze
regions: Optional list of regions to analyze
analysis_scope: Scope of analysis

Returns:
Dictionary containing security analysis results
"""
try:
logger.info("ECS security analysis - minimal implementation")

return {
"recommendations": [],
"total_issues": 0,
"analysis_summary": {
"severity_breakdown": {"Critical": 0, "High": 0, "Medium": 0, "Low": 0},
"category_breakdown": {},
"total_recommendations": 0,
},
"timestamp": datetime.utcnow().isoformat(),
}

except Exception as e:
logger.error(f"Error in analyze_ecs_security: {e}")
return {
"error": str(e),
"recommendations": [],
"total_issues": 0,
"analysis_summary": {},
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Security analysis module for ECS MCP Server.
This module provides tools for comprehensive ECS security analysis.
"""

import logging
from typing import Any, Dict, List, Optional

from fastmcp import FastMCP

logger = logging.getLogger(__name__)


def register_module(mcp: FastMCP) -> None:
"""Register security analysis module tools with the MCP server."""

@mcp.tool(name="analyze_ecs_security")
async def mcp_analyze_ecs_security(
cluster_names: Optional[List[str]] = None,
regions: Optional[List[str]] = None,
analysis_scope: Optional[str] = "basic",
) -> Dict[str, Any]:
"""
Perform comprehensive security analysis of ECS deployments.

This tool analyzes ECS clusters, services, task definitions, and network configurations
to identify security vulnerabilities, misconfigurations, and compliance issues.
It provides actionable recommendations following AWS security best practices.

Args:
cluster_names: Optional list of specific cluster names to analyze
regions: Optional list of AWS regions to analyze (defaults to us-east-1)
analysis_scope: Scope of analysis (currently 'basic' only)

Returns:
Dictionary containing security analysis results with findings and recommendations
"""
try:
logger.info("Starting ECS security analysis (minimal implementation)")

# Minimal placeholder implementation for PR #1
return {
"recommendations": [],
"total_issues": 0,
"analysis_summary": {
"severity_breakdown": {"Critical": 0, "High": 0, "Medium": 0, "Low": 0},
"category_breakdown": {},
"total_recommendations": 0,
},
"timestamp": "2024-01-01T00:00:00Z",
"status": "success",
"message": "Security analysis module registered successfully",
}

except Exception as e:
logger.error(f"Error in security analysis: {e}")
return {
"error": f"Security analysis failed: {str(e)}",
"recommendations": [],
"total_issues": 0,
"analysis_summary": {},
}
173 changes: 173 additions & 0 deletions src/ecs-mcp-server/tests/unit/test_security_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Unit tests for security analysis API - minimal implementation for PR #1.
"""

from unittest.mock import MagicMock, patch

import pytest

from awslabs.ecs_mcp_server.api.security_analysis import (
DataAdapter,
SecurityAnalyzer,
analyze_ecs_security,
)
from awslabs.ecs_mcp_server.modules.security_analysis import register_module


class TestDataAdapter:
"""Test cases for DataAdapter class - minimal implementation."""

def test_init(self) -> None:
"""Test DataAdapter initialization."""
adapter = DataAdapter()
assert adapter.logger is not None


class TestSecurityAnalyzer:
"""Test cases for SecurityAnalyzer class - minimal implementation."""

def test_init(self) -> None:
"""Test SecurityAnalyzer initialization."""
analyzer = SecurityAnalyzer()
assert analyzer.logger is not None


class TestAnalyzeEcsSecurity:
"""Test cases for analyze_ecs_security function - minimal implementation."""

@pytest.mark.anyio
async def test_analyze_ecs_security_basic(self) -> None:
"""Test basic security analysis functionality."""
result = await analyze_ecs_security()

assert "recommendations" in result
assert "total_issues" in result
assert "analysis_summary" in result
assert "timestamp" in result

assert result["recommendations"] == []
assert result["total_issues"] == 0
assert isinstance(result["analysis_summary"], dict)

@pytest.mark.anyio
async def test_analyze_ecs_security_with_params(self) -> None:
"""Test security analysis with parameters."""
result = await analyze_ecs_security(
cluster_names=["test-cluster"], regions=["us-east-1"], analysis_scope="basic"
)

assert "recommendations" in result
assert result["total_issues"] == 0

@pytest.mark.anyio
async def test_analyze_ecs_security_error_handling(self) -> None:
"""Test security analysis error handling."""
# Mock datetime to raise an exception
with patch("awslabs.ecs_mcp_server.api.security_analysis.datetime") as mock_datetime:
mock_datetime.utcnow.side_effect = Exception("Test exception")

result = await analyze_ecs_security()

assert "error" in result
assert "Test exception" in result["error"]
assert result["recommendations"] == []
assert result["total_issues"] == 0


class TestModuleRegistration:
"""Test cases for module registration."""

@pytest.mark.anyio
async def test_register_module(self) -> None:
"""Test module registration with MCP."""
mock_mcp = MagicMock()
mock_tool_decorator = MagicMock()
mock_mcp.tool.return_value = mock_tool_decorator

# Register the module
register_module(mock_mcp)

# Verify tool was registered
mock_mcp.tool.assert_called_once_with(name="analyze_ecs_security")

@pytest.mark.anyio
async def test_mcp_tool_functionality(self) -> None:
"""Test the MCP tool functionality."""
mock_mcp = MagicMock()
registered_tool = None

def capture_tool(name):
def decorator(func):
nonlocal registered_tool
registered_tool = func
return func

return decorator

mock_mcp.tool.side_effect = capture_tool

# Register the module
register_module(mock_mcp)

# Test the registered tool
assert registered_tool is not None
result = await registered_tool()

assert "recommendations" in result
assert "status" in result
assert result["status"] == "success"

@pytest.mark.anyio
async def test_mcp_tool_error_handling(self) -> None:
"""Test MCP tool error handling."""
mock_mcp = MagicMock()
registered_tool = None

def capture_tool(name):
def decorator(func):
nonlocal registered_tool
registered_tool = func
return func

return decorator

mock_mcp.tool.side_effect = capture_tool

# Register the module
register_module(mock_mcp)

# Mock logger to raise exception
with patch("awslabs.ecs_mcp_server.modules.security_analysis.logger") as mock_logger:
mock_logger.info.side_effect = Exception("Test exception")

result = await registered_tool()

assert "error" in result
assert "Test exception" in result["error"]


# Integration test placeholder
class TestIntegration:
"""Integration tests - minimal implementation."""

def test_module_imports(self) -> None:
"""Test that all modules import correctly."""
from awslabs.ecs_mcp_server.api import security_analysis
from awslabs.ecs_mcp_server.modules import security_analysis as security_module

assert security_analysis is not None
assert security_module is not None