-
Notifications
You must be signed in to change notification settings - Fork 1.9k
feat(aws): CloudTrail timeline for findings #9101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
Please add an entry to the corresponding |
|
✅ Conflict Markers Resolved All conflict markers have been successfully resolved in this pull request. |
prowler/providers/aws/services/cloudtrail/lib/enrichment/cloudtrail_enricher.py
Fixed
Show fixed
Hide fixed
🔒 Container Security ScanImage: 📊 Vulnerability Summary
2 package(s) affected
|
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #9101 +/- ##
===========================================
+ Coverage 58.94% 90.12% +31.18%
===========================================
Files 8 837 +829
Lines 397 25077 +24680
===========================================
+ Hits 234 22601 +22367
- Misses 163 2476 +2313
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
prowler/lib/check/check.py
Outdated
| ) | ||
|
|
||
|
|
||
| def _enrich_findings_post_scan(findings: list, output_options: Any) -> dict: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this name? Is this included in a class?
prowler/lib/scan/scan.py
Outdated
| from types import SimpleNamespace | ||
| from typing import Generator | ||
|
|
||
| from alive_progress import alive_bar |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't add terminal UI things to the SDK. This is a CLI responsibility.
prowler/lib/scan/scan.py
Outdated
| _enrich_findings: bool = False | ||
| _enrichment_config: dict = None | ||
| _enricher = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure about adding this to the Scan SDK. We need to think about it to have it as something external, executed afterwards.
We need to talk about the whole logic in this class. We need to move it somewhere else.
| RESOURCE_MODIFIED = "resource_modified" | ||
| RESOURCE_DELETED = "resource_deleted" | ||
|
|
||
| # EC2 Instance events |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not to add classes for each service?
| f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" | ||
| ) | ||
|
|
||
| def lookup_events_for_resource( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we move the logic out of the Scan, maybe we'll need to create a new class for this part. We can't re-init the CloudTrail class. To me this would fit best in the enricher class.
prowler/providers/common/models.py
Outdated
| output_filename: str | ||
| only_logs: bool | ||
| unix_timestamp: bool | ||
| enrich_findings: bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to find a way not to add more data to this object. The idea is to get rid of it as it makes some things dependant in between.
prowler/CHANGELOG.md
Outdated
| - `codepipeline_project_repo_private` check for AWS provider [(#5915)](https://github.com/prowler-cloud/prowler/pull/5915) | ||
| - `cloudstorage_bucket_versioning_enabled` check for GCP provider [(#9014)](https://github.com/prowler-cloud/prowler/pull/9014) | ||
| - `cloudstorage_bucket_soft_delete_enabled` check for GCP provider [(#9028)](https://github.com/prowler-cloud/prowler/pull/9028) | ||
| - CloudTrail enrichment for AWS provider [(#9101)](https://github.com/prowler-cloud/prowler/pull/9101) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd not add this until the feature is implemented as for now is only an SDK thing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
puchy22
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As general comment the current AWS-specific design would require complete duplication for Azure (Activity Log) and GCP (Audit Logs). I've recommended an abstraction layer with provider-agnostic base classes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the best place to put the code? You are not really using nothing for the service and it is not expected to be used by checks or services, for me it should be at provider level like other integrations like S3 or SecurityHub in prowler/providers/aws/lib
| ) | ||
| return None | ||
|
|
||
| def _create_enrichment( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe that this method is never used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missed this one when refactoring.
|
|
||
| assert "1234abcd-12ab-34cd-56ef-1234567890ab" in message | ||
| assert "30" in message | ||
| assert "⚠️" in message or "DELETION SCHEDULED" in message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if we should add this kind of emojis in the tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed!
| @@ -0,0 +1,1269 @@ | |||
| """CloudTrail enrichment service for Prowler findings. | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am concerned about the sustainability of this file. I think it should be split into different files for each service and we should try to use another more sustainable way of adding new events that does not involve modifying a file with 1200 lines of code
| @@ -0,0 +1,1882 @@ | |||
| """Event message formatters for CloudTrail events. | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as with cloudtrail_enricher.py, it is not sustainable having a class that is 1800 lines of code. It's so difficult to know what is the class exactly doing. Is there any way to divide it into other parts to make it more sustainable?
|
|
||
|
|
||
| # Union type for all event types | ||
| TimelineEventType = ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are generating this union at module level and then only using it in the type hint of the TimelineEvent class.
Besides, this won't be a type as such, but rather a union. It seems like a strange way to do it, especially since it will only be used in one place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100%, changed!
| ) | ||
|
|
||
|
|
||
| @dataclass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why using dataclass instead of using pydantic BaseModel?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, good catch.
| last_modified_at: datetime | None = None | ||
| related_resources: list[dict[str, str]] = field(default_factory=list) | ||
|
|
||
| def to_dict(self) -> dict[str, Any]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using BaseModel saves you this method: https://docs.pydantic.dev/1.10/usage/exporting_models/#modeldict
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, good catch!
| if not self.created_at: | ||
| return None | ||
|
|
||
| from datetime import timezone |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this here and not in the top of the file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, I did it inside the function because we only need to import it if we use this function.
| if not self.last_modified_at: | ||
| return None | ||
|
|
||
| from datetime import timezone |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done!
🔒 Container Security ScanImage: 📊 Vulnerability Summary
3 package(s) affected
|
AdriiiPRodri
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to add dedicated timeline serializers and hook them into the viewset so the timeline action follows our current response schema, because right now the endpoint isn’t aligned with it.
You also need to clean up the timeline action: validate filter[lookback_days], use the existing provider session initialization, remove the ad-hoc JSON body handling, and return errors through the usual DRF validation flow. Make sure the endpoint is documented with extend_schema, and if there are constants involved, they should live in the view as class-level attributes. Remove the global query-parameter override since it’s no longer needed.
Finally, add the required unittests: provider validation, lookback validation, and the happy path using mocked AWS components
api/src/backend/api/v1/views.py
Outdated
| from api.utils import initialize_prowler_provider | ||
| from prowler.providers.aws.lib.cloudtrail_timeline.cloudtrail_timeline import ( | ||
| CloudTrailTimeline, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we importing this here? I mean, this code runs on every call to this endpoint. This should be at the top of the file
api/src/backend/api/v1/views.py
Outdated
| url_name="timeline", | ||
| url_path=r"timeline(?:/(?P<lookback_days>[0-9]+))?", | ||
| ) | ||
| def timeline(self, request, pk=None, lookback_days=None): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@MrCloudSec based on your tests, how long can this take?
@AdriiiPRodri @vicferpoy @josemazo @rakan-pro I'm not sure about having a synchronous endpoint to retrieve this data. From a design standpoint I think we'd have an async endpoint to enqueue a task to fetch that data. Then, another endpoint to get the status/result. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't currently know how long it takes us to retrieve that data, but if it can block, then I completely agree with you, this should be a task, and we should have an endpoint to fetch the results
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It takes one second or less to retrieve the timelin for each resource.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even if the timeline is more than 90 days? Since this depends on CloudTrail and the user's events, we need to make this async.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the API side or in the SDK?
20e243d to
8e5b28a
Compare
Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
Context
Add CloudTrail timeline to AWS findings as an API-only programmatic utility that provides timeline context showing who created/modified resources and when.
Description
Introduces CloudTrail timeline capability that queries AWS CloudTrail Event History to add lifecycle context to security findings.
Key Features:
list[dict]for direct REST consumptionboto3.Sessionfor flexible credential managementAPI Usage:
Checklist
API
License
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.