diff --git a/panther_analysis_tool/analysis_utils.py b/panther_analysis_tool/analysis_utils.py index 8f267319..0816759d 100644 --- a/panther_analysis_tool/analysis_utils.py +++ b/panther_analysis_tool/analysis_utils.py @@ -106,28 +106,32 @@ def filter_analysis( logging.debug("auto-adding data model file %s", os.path.join(file_name)) filtered_analysis.append(ClassifiedAnalysis(file_name, dir_name, analysis_spec)) continue - match = True - for filt in filters: - key, values = filt.key, filt.values - spec_value = analysis_spec.get(key, "") - spec_value = spec_value if isinstance(spec_value, list) else [spec_value] - if not set(spec_value).intersection(values): - match = False - break - for filt in filters_inverted: - key, values = filt.key, filt.values - spec_value = analysis_spec.get(key, "") - spec_value = spec_value if isinstance(spec_value, list) else [spec_value] - if set(spec_value).intersection(values): - match = False - break - - if match: + + if filter_analysis_spec(analysis_spec, filters, filters_inverted): filtered_analysis.append(ClassifiedAnalysis(file_name, dir_name, analysis_spec)) + continue return filtered_analysis +def filter_analysis_spec( + analysis_spec: Dict[str, Any], filters: List[Filter], filters_inverted: List[Filter] +) -> bool: + for filt in filters: + key, values = filt.key, filt.values + spec_value = analysis_spec.get(key, "") + spec_value = spec_value if isinstance(spec_value, list) else [spec_value] + if not set(spec_value).intersection(values): + return False + for filt in filters_inverted: + key, values = filt.key, filt.values + spec_value = analysis_spec.get(key, "") + spec_value = spec_value if isinstance(spec_value, list) else [spec_value] + if set(spec_value).intersection(values): + return False + return True + + def load_analysis_specs( directories: List[str], ignore_files: List[str] ) -> Iterator[Tuple[str, str, Any, Any]]: @@ -178,6 +182,7 @@ class LoadAnalysisSpecsResult: analysis_spec: Any yaml_ctx: YAML error: Optional[Exception] + raw_file_content: Optional[bytes] def __eq__(self, other: object) -> bool: if not isinstance(other, LoadAnalysisSpecsResult): @@ -275,25 +280,24 @@ def load_analysis_specs_ex( for file in ignore_files: ignored_normalized.append(os.path.normpath(file)) + yaml = get_yaml_loader(roundtrip=roundtrip_yaml) loaded_specs: List[Any] = [] for directory in directories: - for relative_path, _, file_list in os.walk(directory): - # Skip hidden folders - if ( - relative_path.split("/")[-1].startswith(".") - and relative_path != "./" - and relative_path != "." - ): - continue + for dirpath, dirnames, filenames in os.walk(directory): + dirnames[:] = [ + dirname + for dirname in dirnames + if not dirname.startswith(".") and dirname != "__pycache__" + ] # If the user runs with no path args, filter to make sure # we only run folders with valid analysis files. Ensure we test # files in the current directory by not skipping this iteration # when relative_path is the current dir - if directory in [".", "./"] and relative_path not in [".", "./"]: + if directory in [".", "./"] and dirpath not in [".", "./"]: if not any( ( - fnmatch(relative_path, path_pattern) + fnmatch(dirpath, path_pattern) for path_pattern in ( DATA_MODEL_PATH_PATTERN, HELPERS_PATH_PATTERN, @@ -305,42 +309,44 @@ def load_analysis_specs_ex( ) ) ): - logging.debug("Skipping path %s", relative_path) + logging.debug("Skipping path %s", dirpath) continue - for filename in sorted(file_list): + for filename in sorted(filenames): # Skip hidden files if filename.startswith("."): continue - spec_filename = os.path.abspath(os.path.join(relative_path, filename)) + spec_filename = os.path.abspath(os.path.join(dirpath, filename)) # skip loading files that have already been imported if spec_filename in loaded_specs: continue # Dont load files that are explictly ignored - relative_name = os.path.normpath(os.path.join(relative_path, filename)) + relative_name = os.path.normpath(os.path.join(dirpath, filename)) if relative_name in ignored_normalized: logging.info("ignoring file %s", relative_name) continue loaded_specs.append(spec_filename) # setup yaml object - yaml = get_yaml_loader(roundtrip=roundtrip_yaml) if fnmatch(filename, "*.y*ml"): - with open(spec_filename, "r", encoding="utf-8") as spec_file_obj: + with open(spec_filename, "rb") as spec_file_obj: try: + file_content = spec_file_obj.read() yield LoadAnalysisSpecsResult( spec_filename=spec_filename, - relative_path=relative_path, - analysis_spec=yaml.load(spec_file_obj), + relative_path=dirpath, + analysis_spec=yaml.load(io.BytesIO(file_content)), yaml_ctx=yaml, error=None, + raw_file_content=file_content, ) except (YAMLParser.ParserError, YAMLScanner.ScannerError) as err: # recreate the yaml object and yield the error yield LoadAnalysisSpecsResult( spec_filename=spec_filename, - relative_path=relative_path, + relative_path=dirpath, analysis_spec=None, yaml_ctx=yaml, error=err, + raw_file_content=None, ) @@ -509,7 +515,7 @@ def load_analysis( valid_table_names: List[str], ignore_files: List[str], ignore_extra_keys: bool, -) -> Tuple[Any, List[Any]]: +) -> Tuple[ClassifiedAnalysisContainer, List[Any]]: """Loads each policy or rule into memory. Args: @@ -601,7 +607,7 @@ def classify_analysis( analysis_schema.validate(analysis_spec) # lookup the analysis type id and validate there aren't any conflicts - analysis_id = lookup_analysis_id(analysis_spec, analysis_type) + analysis_id = lookup_analysis_id(analysis_spec) if analysis_id in analysis_ids: raise AnalysisIDConflictException(analysis_id) # check for duplicates where panther expects a unique set @@ -696,7 +702,8 @@ def handle_wrong_key_error(err: schema.SchemaWrongKeyError, keys: list) -> Excep return exc -def lookup_analysis_id(analysis_spec: Any, analysis_type: str) -> str: +def lookup_analysis_id(analysis_spec: Any) -> str: + analysis_type = analysis_spec["AnalysisType"] analysis_id = "UNKNOWN_ID" if analysis_type == AnalysisTypes.DATA_MODEL: analysis_id = analysis_spec["DataModelID"] diff --git a/panther_analysis_tool/command/clone.py b/panther_analysis_tool/command/clone.py new file mode 100644 index 00000000..6c7aaeb6 --- /dev/null +++ b/panther_analysis_tool/command/clone.py @@ -0,0 +1,175 @@ +import io +import logging +import pathlib +from dataclasses import dataclass +from typing import Any, Callable, Iterator, List, Optional, Tuple + +from panther_analysis_tool.analysis_utils import ( + LoadAnalysisSpecsResult, + filter_analysis_spec, + get_yaml_loader, + load_analysis_specs_ex, + lookup_analysis_id, +) +from panther_analysis_tool.constants import AnalysisTypes +from panther_analysis_tool.core import analysis_cache +from panther_analysis_tool.core.formatter import analysis_spec_dump +from panther_analysis_tool.core.parse import parse_filter + + +def run(analysis_id: Optional[str], filters: List[str]) -> Tuple[int, str]: + clone_analysis(analysis_id, filters, lambda x: x) + return 0, "" + + +@dataclass +class CloneAnalysisResult: + analysis_spec: dict[str, Any] + file_bytes: Optional[bytes] + relative_path: str + + +def clone_analysis( + analysis_id: Optional[str], filters: List[str], mutator: Callable[[Any], Any] +) -> None: + yaml = get_yaml_loader(roundtrip=False) + cache = analysis_cache.AnalysisCache() + all_specs = [] + for _id in cache.list_spec_ids(): + analysis_spec = cache.get_latest_spec(_id) + if analysis_spec is None: + raise ValueError(f"Analysis spec not found for id: {_id}") + + loaded = yaml.load(io.BytesIO(analysis_spec.spec)) + all_specs.append( + CloneAnalysisResult( + relative_path=analysis_spec.file_path, + analysis_spec=loaded, + file_bytes=cache.get_file_for_spec(analysis_spec.id), + ) + ) + + if not all_specs: + logging.info("Nothing to clone") + return None + + existing_specs = load_analysis_specs_ex(["."], [], False) + + if analysis_id is not None: + return _clone_analysis_id(analysis_id, existing_specs, all_specs, mutator) + + return _clone_analysis_filters(filters, all_specs, mutator) + + +def _clone_analysis_filters( + filters: List[str], + all_specs: List[CloneAnalysisResult], + mutator: Optional[Callable[[Any], Any]], +) -> None: + _filters, _filters_inverted = parse_filter(filters) + filtered_specs = [ + spec + for spec in all_specs + if filter_analysis_spec(spec.analysis_spec, _filters, _filters_inverted) + ] + + if not filtered_specs: + logging.info("Nothing to clone") + + # Apply the filters as needed + for clone_spec in filtered_specs: + match clone_spec.analysis_spec["AnalysisType"]: + case ( + AnalysisTypes.RULE + | AnalysisTypes.SCHEDULED_RULE + | AnalysisTypes.SAVED_QUERY + | AnalysisTypes.SCHEDULED_QUERY + | AnalysisTypes.GLOBAL + | AnalysisTypes.CORRELATION_RULE + | AnalysisTypes.DATA_MODEL + | AnalysisTypes.LOOKUP_TABLE + | AnalysisTypes.POLICY + | AnalysisTypes.DERIVED + | AnalysisTypes.SIMPLE_DETECTION + ): + _create_clone(clone_spec, mutator, clone_spec.file_bytes) + case AnalysisTypes.PACK: + # ignore packs + pass + case _: + raise ValueError( + f"Unsupported analysis type: {clone_spec.analysis_spec['AnalysisType']}" + ) + + +def _clone_analysis_id( + analysis_id: str, + existing_specs: Iterator[LoadAnalysisSpecsResult], + all_specs: List[CloneAnalysisResult], + mutator: Callable[[Any], Any], +) -> None: + for existing_spec in existing_specs: + if lookup_analysis_id(existing_spec.analysis_spec) == analysis_id: + new_spec = mutator(existing_spec.analysis_spec) + with open(existing_spec.spec_filename, "wb") as updated_spec: + updated_spec.write(analysis_spec_dump(new_spec)) + logging.info("Updated existing %s in %s", analysis_id, existing_spec.spec_filename) + return + + for clone_spec in all_specs: + if lookup_analysis_id(clone_spec.analysis_spec) == analysis_id: + _create_clone(clone_spec, mutator, clone_spec.file_bytes) + return + + +def _create_clone( + spec: CloneAnalysisResult, + mutator: Optional[Callable[[Any], Any]], + file_bytes: Optional[bytes], +) -> None: + # create a copy of the spec, with the BaseVersion set to the current spec + new_spec = spec.analysis_spec + new_spec["BaseVersion"] = spec.analysis_spec.get("Version", 1) + if mutator is not None: + new_spec = mutator(new_spec) + + # create a new file + new_file_path = pathlib.Path(spec.relative_path) + + new_file_path.parent.mkdir(parents=True, exist_ok=True) + + with open(new_file_path, "wb") as new_file: + new_file.write(analysis_spec_dump(new_spec)) + + match spec.analysis_spec["AnalysisType"]: + case AnalysisTypes.SCHEDULED_RULE | AnalysisTypes.GLOBAL | AnalysisTypes.POLICY: + if file_bytes is None: + analysis_id = lookup_analysis_id(spec.analysis_spec) + analysis_type = spec.analysis_spec["AnalysisType"] + raise ValueError(f"File bytes are required for {analysis_type} {analysis_id}") + + spec_filename = spec.analysis_spec.get("Filename") + if spec_filename is None: + raise ValueError("Filename is required for rules") + # clone the .py file + filename = new_file_path.parent / spec_filename + with open(filename, "wb") as new_file: + new_file.write(file_bytes) + case ( + AnalysisTypes.RULE + | AnalysisTypes.SAVED_QUERY + | AnalysisTypes.SCHEDULED_QUERY + | AnalysisTypes.DATA_MODEL + ): + spec_filename = spec.analysis_spec.get("Filename") + if spec_filename is None: + return + if file_bytes is None: + raise ValueError("File bytes are required for rules") + + filename = new_file_path.parent / spec_filename + with open(filename, "wb") as new_file: + new_file.write(file_bytes) + + case _: + pass diff --git a/panther_analysis_tool/command/enable.py b/panther_analysis_tool/command/enable.py new file mode 100644 index 00000000..d653d428 --- /dev/null +++ b/panther_analysis_tool/command/enable.py @@ -0,0 +1,14 @@ +from typing import Any, List, Optional, Tuple + +from panther_analysis_tool.command.clone import clone_analysis +from panther_analysis_tool.constants import AnalysisTypes + + +def run(analysis_id: Optional[str], filters: List[str]) -> Tuple[int, str]: + def mutator(spec: Any) -> Any: + if spec["AnalysisType"] in [AnalysisTypes.RULE, AnalysisTypes.SCHEDULED_RULE]: + spec["Enabled"] = True + return spec + + clone_analysis(analysis_id, filters=filters, mutator=mutator) + return 0, "" diff --git a/panther_analysis_tool/command/fetch.py b/panther_analysis_tool/command/fetch.py new file mode 100644 index 00000000..54ecea33 --- /dev/null +++ b/panther_analysis_tool/command/fetch.py @@ -0,0 +1,128 @@ +import io +import json +import os +import pathlib +import shutil +import subprocess # nosec:B404 +import zipfile +from typing import Tuple + +import requests + +from panther_analysis_tool.analysis_utils import load_analysis_specs_ex +from panther_analysis_tool.constants import ( + CACHE_DIR, + PANTHER_ANALYSIS_URL, + AnalysisTypes, +) +from panther_analysis_tool.core import analysis_cache + +PANTHER_ANALYSIS_GITHUB_BRANCH = "dmiller-next" + + +def run() -> Tuple[int, str]: + # git clone latest panther-analysis + import_from_github_branch() + # import_from_github_release() + return 0, "Fetched" + + +def import_from_github_branch() -> None: + shutil.rmtree(os.path.join(CACHE_DIR), ignore_errors=True) + os.makedirs(CACHE_DIR, exist_ok=True) + + subprocess.run( # nosec:B607 B603 + [ + "git", + "clone", + "-b", + PANTHER_ANALYSIS_GITHUB_BRANCH, + "https://github.com/panther-labs/panther-analysis.git", + os.path.join(CACHE_DIR, "panther-analysis"), + ], + check=True, + ) + + shutil.rmtree(os.path.join(CACHE_DIR, "panther-analysis", "templates")) + shutil.rmtree(os.path.join(CACHE_DIR, "panther-analysis", "test_scenarios")) + import_sqlite() + + +def import_from_github_release() -> None: + response = requests.get(PANTHER_ANALYSIS_URL, timeout=10) + pa_zip_asset = None + for asset in response.json()["assets"]: + if asset["name"] == "panther-analysis-all.zip": + pa_zip_asset = asset + break + if pa_zip_asset is None: + raise ValueError("No panther-analysis-all.zip asset found") + + response = requests.get(pa_zip_asset["browser_download_url"], timeout=10) + + # unzip to CACHE_DIR + os.makedirs(CACHE_DIR, exist_ok=True) + with zipfile.ZipFile(io.BytesIO(response.content), "r") as zip_ref: + zip_ref.extractall(os.path.join(CACHE_DIR, "panther-analysis")) + + import_sqlite() + + +def import_sqlite() -> None: + cache = analysis_cache.AnalysisCache() + cache.create_tables() + + versions = {} + with open(os.path.join(CACHE_DIR, "panther-analysis", "version.json"), "rb") as version_file: + versions = json.load(version_file)["versions"] + + # load all analysis specs + for spec in load_analysis_specs_ex([CACHE_DIR], [], False): + if spec.error is not None: + continue + + if "AnalysisType" not in spec.analysis_spec: + raise ValueError(f"Analysis type not found in spec: {spec.analysis_spec}") + + content = None + match spec.analysis_type(): + case AnalysisTypes.RULE | AnalysisTypes.SCHEDULED_RULE | AnalysisTypes.CORRELATION_RULE: + id_field = "RuleID" + case AnalysisTypes.DATA_MODEL: + id_field = "DataModelID" + case AnalysisTypes.POLICY: + id_field = "PolicyID" + case AnalysisTypes.GLOBAL: + id_field = "GlobalID" + case AnalysisTypes.SCHEDULED_QUERY | AnalysisTypes.SAVED_QUERY: + id_field = "QueryName" + case AnalysisTypes.PACK: + continue + case AnalysisTypes.LOOKUP_TABLE: + id_field = "LookupName" + case _: + raise ValueError(f"Unsupported analysis type: {spec.analysis_type()}") + + filename = spec.analysis_spec.get("Filename") + if filename is not None: + file_path = pathlib.Path(spec.spec_filename).parent / filename + with open(file_path, "rb") as spec_file: + content = spec_file.read() + + file_id = None + if content is not None: + file_id = cache.insert_file(content) + + id_value = spec.analysis_spec.get(id_field) + relpath = pathlib.Path(spec.spec_filename).relative_to( + pathlib.Path(CACHE_DIR).absolute() / "panther-analysis" + ) + spec_version = versions[id_value]["version"] + spec_id = cache.insert_spec( + id_field, id_value, spec.raw_file_content, str(relpath), spec_version + ) + + if file_id is not None: + cache.insert_file_mapping(spec_id, file_id) + cache.conn.commit() + cache.conn.close() diff --git a/panther_analysis_tool/command/fmt.py b/panther_analysis_tool/command/fmt.py new file mode 100644 index 00000000..27f1472a --- /dev/null +++ b/panther_analysis_tool/command/fmt.py @@ -0,0 +1,19 @@ +from typing import Tuple + +from panther_analysis_tool.analysis_utils import load_analysis_specs_ex +from panther_analysis_tool.core.formatter import analysis_spec_dump + + +def run() -> Tuple[int, str]: + return format_specs() + + +def format_specs() -> Tuple[int, str]: + specs = load_analysis_specs_ex(["."], [], False) + + for spec in specs: + spec.analysis_spec = analysis_spec_dump(spec.analysis_spec, sort=True) + with open(spec.spec_filename, "wb") as spec_file: + spec_file.write(spec.analysis_spec) + + return 0, "" diff --git a/panther_analysis_tool/command/init_project.py b/panther_analysis_tool/command/init_project.py new file mode 100644 index 00000000..c2f65335 --- /dev/null +++ b/panther_analysis_tool/command/init_project.py @@ -0,0 +1,29 @@ +import os +from typing import Tuple + + +def run() -> Tuple[int, str]: + setup_folder_structure() + return 0, "Project initialized" + + +def setup_folder_structure() -> None: + if not os.path.exists(".gitignore"): + with open(".gitignore", "w", encoding="utf-8") as gitignore_file: + + gitignore_file.write("# Cache values\n") + gitignore_file.write(".cache/\n") + + gitignore_file.write("\n# Panther settings\n") + gitignore_file.write(".panther_settings.*\n") + + gitignore_file.write("\n# Python\n") + gitignore_file.write("__pycache__/\n") + gitignore_file.write("*.pyc\n") + + gitignore_file.write("\n# Panther\n") + gitignore_file.write("panther-analysis-*.zip\n") + + gitignore_file.write("\n# IDEs\n") + gitignore_file.write(".vscode/\n") + gitignore_file.write(".idea/\n") diff --git a/panther_analysis_tool/command/merge.py b/panther_analysis_tool/command/merge.py new file mode 100644 index 00000000..9885c4c3 --- /dev/null +++ b/panther_analysis_tool/command/merge.py @@ -0,0 +1,414 @@ +import ast +import io +import logging +import os +import pathlib +import sqlite3 +import subprocess # nosec:B404 +import tempfile +from dataclasses import dataclass +from typing import Any, Dict, Optional, Tuple + +import ruamel + +from panther_analysis_tool.analysis_utils import ( + LoadAnalysisSpecsResult, + get_yaml_loader, + load_analysis_specs_ex, + lookup_analysis_id, +) +from panther_analysis_tool.core import analysis_cache, editor, git +from panther_analysis_tool.core.formatter import analysis_spec_dump + + +class MergeError(Exception): + pass + + +def run(analysis_id: Optional[str], migrate: bool) -> Tuple[int, str]: + return merge_analysis(analysis_id, migrate) + + +def merge_analysis(analysis_id: Optional[str], migrate: bool) -> Tuple[int, str]: + yaml = get_yaml_loader(True) + + # load all analysis specs + existing_specs = list(load_analysis_specs_ex(["."], [], False)) + if not existing_specs: + print("Nothing to merge") + return 0, "" + + do_interactive_merge = analysis_id is None + update_count = 0 + merge_conflicts = [] + cache = analysis_cache.AnalysisCache() + cursor = cache.cursor + git_manager = git.GitManager() + + loader = SpecLoader(cursor, cache, git_manager, yaml) + + for user_spec in existing_specs: + base_analysis_id = lookup_analysis_id(user_spec.analysis_spec) + if analysis_id is not None and analysis_id != base_analysis_id: + # user specified an analysis id, only merge that one + continue + + load_specs = _load_specs(loader, user_spec, migrate) + if load_specs is None: + continue + + user_spec_bytes, base_version = strip_base_version(yaml, user_spec.analysis_spec.copy()) + + if base_version == load_specs.latest_version: + # already up to date + continue + + spec_conflict, spec_output = merge_yaml( + load_specs.base_spec_bytes, load_specs.latest_spec_bytes, user_spec_bytes + ) + file_conflict, file_output = False, bytes() + + # next check for conflicts in the file + file_content = load_file_of_spec(user_spec) + if file_content is not None: + if load_specs.base_file_content is None: + print(f"Base file for {base_analysis_id}@{base_version} not found, skipping") + continue + if load_specs.latest_file_content is None: + print(f"Latest file for {base_analysis_id}@{base_version} not found, skipping") + continue + + file_conflict, file_output = merge_bytes( + load_specs.base_file_content, load_specs.latest_file_content, file_content + ) + + if spec_conflict or file_conflict: + if not do_interactive_merge: + merge_conflicts.append(lookup_analysis_id(user_spec.analysis_spec)) + continue + + if spec_conflict: + output = resolve_yaml_conflict(spec_output) + if output is None: + continue + spec_output = output + if file_conflict: + output = resolve_python_conflict(file_output) + if output is None: + continue + file_output = output + + # update the base spec + merged_spec = yaml.load(spec_output.decode()) + merged_spec["BaseVersion"] = load_specs.latest_version + with open(user_spec.spec_filename, "wb") as spec_file: + spec_file.write(analysis_spec_dump(merged_spec, True)) + + # update the file + if file_output != bytes(): + update_file_of_spec(user_spec, file_output) + + update_count += 1 + + if update_count != 0: + print(f"{update_count} spec(s) updated with latest panther version") + print("run `git diff` to see the changes") + + if len(merge_conflicts) != 0: + print(f"{len(merge_conflicts)} merge conflict(s) found") + print("run `pat merge ` to resolve each conflict") + for conflict in merge_conflicts: + print(f" {conflict}") + + return 0, "" + + +@dataclass +class LoadSpecsResult: + base_spec_bytes: bytes + base_file_content: Optional[bytes] + latest_spec_bytes: bytes + latest_file_content: Optional[bytes] + latest_version: int + + +def _load_specs( + loader: "SpecLoader", user_spec: LoadAnalysisSpecsResult, migrate: bool +) -> Optional[LoadSpecsResult]: + base_spec_bytes, base_file_content = loader.load_base_spec(user_spec, migrate) + if base_spec_bytes is None: + logging.warning("Base version not found for %s, skipping", user_spec.spec_filename) + return None + + # find latest version of the spec + latest_base_spec_bytes, latest_file_content, latest_version = loader.load_latest_spec(user_spec) + if latest_base_spec_bytes is None: + if migrate: + _migrate_file(user_spec, loader.git_manager) + return None + base_analysis_id = lookup_analysis_id(user_spec.analysis_spec) + logging.warning("Latest version of %s not found, skipping", base_analysis_id) + return None + if latest_version is None: + logging.warning("Latest version of %s not found, skipping", base_analysis_id) + return None + return LoadSpecsResult( + base_spec_bytes, + base_file_content, + latest_base_spec_bytes, + latest_file_content, + latest_version, + ) + + +class SpecLoader: + def __init__( + self, + cursor: sqlite3.Cursor, + cache: analysis_cache.AnalysisCache, + git_manager: git.GitManager, + yaml: ruamel.yaml.YAML, + ): + self.cursor = cursor + self.cache = cache + self.git_manager = git_manager + self.yaml = yaml + + def load_base_spec( + self, spec: LoadAnalysisSpecsResult, use_git: bool + ) -> Tuple[Optional[bytes], Optional[bytes]]: + base_version = spec.analysis_spec.get("BaseVersion") + base_spec_bytes: Optional[bytes] = None + base_file_content: Optional[bytes] = None + if base_version is None: + if not use_git: + return None, None + + base_spec_bytes = get_base_file_from_git( + self.git_manager, pathlib.Path(spec.spec_filename) + ) + if base_spec_bytes is None: + return None, None + + base_spec = self.yaml.load(base_spec_bytes) + filename = base_spec.get("Filename") + if filename is None: + return base_spec_bytes, None + + # now fetch that file from git + filename = pathlib.Path(spec.spec_filename).parent / filename + base_file_content = get_base_file_from_git(self.git_manager, filename) + return base_spec_bytes, base_file_content + + # otherwise load the base spec from the database + analysis_id = lookup_analysis_id(spec.analysis_spec) + base_analysis_spec = self.cache.get_spec_for_version(analysis_id, base_version) + if base_analysis_spec is None: + return None, None + base_file_content = self.cache.get_file_for_spec(base_analysis_spec.id) + + return base_analysis_spec.spec, base_file_content + + def load_latest_spec( + self, spec: LoadAnalysisSpecsResult + ) -> Tuple[Optional[bytes], Optional[bytes], Optional[int]]: + analysis_id = lookup_analysis_id(spec.analysis_spec) + analysis_spec = self.cache.get_latest_spec(analysis_id) + if analysis_spec is None: + return None, None, None + + latest_file_content = self.cache.get_file_for_spec(analysis_spec.id) + + return analysis_spec.spec, latest_file_content, analysis_spec.version + + +def was_deleted_by_panther(git_manager: git.GitManager, filename: pathlib.Path) -> bool: + # see if the spec was deleted by panther + merge_base = git_manager.merge_base("HEAD") + + # normalize the filename for the git repo + if filename.is_absolute(): + filename = filename.relative_to(git_manager.git_root()) + + # get the panther commit + panther_commit = git_manager.panther_latest_release_commit() + + proc = subprocess.run( # nosec:B607 B603 + [ + "git", + "log", + "--diff-filter=D", + "--oneline", + "-1", + f"{merge_base}..{panther_commit}", + "--", + filename, + ], + check=False, + capture_output=True, + ) + return proc.stdout.decode().strip() != "" + + +def still_exists_in_panther(git_manager: git.GitManager, filename: pathlib.Path) -> Optional[bytes]: + # see if the spec still exists in panther + # normalize the filename for the git repo + if filename.is_absolute(): + filename = filename.relative_to(git_manager.git_root()) + + # get the panther commit + panther_commit = git_manager.panther_latest_release_commit() + + proc = subprocess.run( # nosec:B607 B603 + ["git", "show", f"{panther_commit}:{filename}"], check=False, capture_output=True + ) + if proc.returncode != 0: + return None + return proc.stdout.strip() + + +def _migrate_file(user_spec: LoadAnalysisSpecsResult, git_manager: git.GitManager) -> None: + spec_path = pathlib.Path(user_spec.spec_filename) + if user_spec.analysis_spec.get("AnalysisType") == "pack": + # just delete packs + os.remove(user_spec.spec_filename) + return + if was_deleted_by_panther(git_manager, spec_path): + # the spec was deleted by panther + os.remove(user_spec.spec_filename) + filename = user_spec.analysis_spec.get("Filename") + if filename is not None: + filename = pathlib.Path(user_spec.spec_filename).parent / filename + if was_deleted_by_panther(git_manager, filename): + os.remove(filename) + logging.info("Deleted %s because it was deleted by panther", user_spec.spec_filename) + return + if still_exists_in_panther(git_manager, spec_path): + # the spec still exists in panther. It is likely the ID changed, + # do the diff on it with the current content + # TODO: handle this + logging.info("Spec changed IDs :%s", user_spec.spec_filename) + + +def load_file_of_spec(spec: LoadAnalysisSpecsResult) -> Optional[bytes]: + file_name = spec.analysis_spec.get("Filename") + if file_name is not None: + path = pathlib.Path(spec.spec_filename).parent / file_name + with open(path, "rb") as spec_file: + return spec_file.read() + return None + + +def update_file_of_spec(spec: LoadAnalysisSpecsResult, file_content: bytes) -> None: + file_name = spec.analysis_spec.get("Filename") + if file_name is None: + raise ValueError(f"No file name found for spec {lookup_analysis_id(spec.analysis_spec)}") + path = pathlib.Path(spec.spec_filename).parent / file_name + with open(path, "wb") as spec_file: + spec_file.write(file_content) + + +def strip_base_version(yaml: ruamel.yaml.YAML, spec: Dict[str, Any]) -> Tuple[bytes, Optional[int]]: + version = spec.pop("BaseVersion", None) + bytes_io = io.BytesIO() + yaml.dump(spec, bytes_io) + return bytes_io.getvalue(), version + + +def merge_yaml(base: bytes, latest: bytes, user: bytes) -> Tuple[bool, bytes]: + base = analysis_spec_dump(base) + latest = analysis_spec_dump(latest) + user = analysis_spec_dump(user) + return merge_bytes(base, latest, user) + + +def merge_bytes(base: bytes, latest: bytes, user: bytes) -> Tuple[bool, bytes]: + # create a temp file for each + with ( + tempfile.NamedTemporaryFile(delete=False) as temp_file_base, + tempfile.NamedTemporaryFile(delete=False) as temp_file_latest, + tempfile.NamedTemporaryFile(delete=False) as temp_file_user, + ): + temp_file_base.write(base) + temp_file_base.flush() + + temp_file_latest.write(latest) + temp_file_latest.flush() + + temp_file_user.write(user) + temp_file_user.flush() + + proc = subprocess.run( # nosec:B607 B603 + [ + "git", + "merge-file", + "-p", + "-L", + "ours", + "-L", + "base", + "-L", + "panther", + temp_file_user.name, + temp_file_base.name, + temp_file_latest.name, + ], + check=False, + capture_output=True, + ) + + return proc.returncode != 0, proc.stdout + + +def resolve_yaml_conflict(merge_result: bytes) -> Optional[bytes]: + temp_spec = editor.edit_file(merge_result) + + if temp_spec == merge_result: + print("No changes made") + return None + + # todo validate the spec + yaml = get_yaml_loader(True) + _ = yaml.load(temp_spec.decode()) + + return temp_spec + + +def resolve_python_conflict(merge_result: bytes) -> Optional[bytes]: + output = editor.edit_file(merge_result) + + if output == merge_result: + print("No changes made") + return None + + # todo validate the file + try: + ast.parse(output.decode()) + except SyntaxError as error: + print("Invalid python file") + print(f' File "{error.filename}", line {error.lineno}') + print(f" {error.text.strip()}" if error.text else "") + print(f' {" " * (error.offset - 1)}^' if error.offset is not None else "") + print(f"SyntaxError: {error.msg}") + return None + + return output + + +def get_base_file_from_git(git_manager: git.GitManager, filename: pathlib.Path) -> bytes: + # get the base version from the git history + merge_base = git_manager.merge_base("HEAD") + + # normalize the filename for the git repo + if filename.is_absolute(): + filename = filename.relative_to(git_manager.git_root()) + + # now fetch that file from git + proc = subprocess.run( + ["git", "show", f"{merge_base}:{filename}"], + capture_output=True, + check=False, + ) # nosec:B607 B603 + if proc.returncode != 0: + raise MergeError(f"Failed to get base file from git: {proc.stderr.decode().strip()}") + return proc.stdout diff --git a/panther_analysis_tool/command/rev.py b/panther_analysis_tool/command/rev.py new file mode 100644 index 00000000..97744222 --- /dev/null +++ b/panther_analysis_tool/command/rev.py @@ -0,0 +1,71 @@ +from typing import Tuple + +from panther_analysis_tool.analysis_utils import get_yaml_loader +from panther_analysis_tool.core import analysis_cache, editor + + +def run(analysis_id: str) -> Tuple[int, str]: + return rev_analysis(analysis_id) + + +# pylint: disable=too-many-locals +def rev_analysis(analysis_id: str) -> Tuple[int, str]: + conn = analysis_cache.connect_to_cache() + cursor = conn.cursor() + cursor.execute( + "SELECT id, id_field, id_value, spec, file_path, version FROM analysis_specs WHERE UPPER(id_value) = ?" + " ORDER BY version DESC LIMIT 1", + (analysis_id.upper(),), + ) + spec_id, id_field, id_value, spec, file_path, version = cursor.fetchone() + if not id_field: + return 1, f"No spec found for {analysis_id}" + + # read the temp file and compare it to the original spec + temp_spec = editor.edit_file(spec) + + if temp_spec == spec: + print("No changes made") + # return 0, "No changes made" + + # now check for a file attachement + file_content = None + temp_file_content = None + cursor.execute("SELECT file_id FROM file_mappings WHERE spec_id = ? LIMIT 1", (spec_id,)) + row = cursor.fetchone() + if row is not None: + file_id = row[0] + cursor.execute("SELECT content FROM files WHERE id = ?", (file_id,)) + row = cursor.fetchone() + if row is not None: + file_content = row[0] + temp_file_content = editor.edit_file(file_content) + + if temp_spec == spec and (file_id is None or file_content == temp_file_content): + print("No changes made") + return 0, "No changes made" + + # bump the yaml version and write it back to the db + spec_version = version + 1 + + yaml = get_yaml_loader(False) + # must be valid yaml + _ = yaml.load(temp_spec) + new_spec_id = cursor.execute( + "INSERT INTO analysis_specs (id_field, id_value, spec, file_path, version) VALUES (?, ?, ?, ?, ?)", + (id_field, id_value, temp_spec, file_path, spec_version), + ).lastrowid + + if file_id is not None: + if temp_file_content != file_content: + file_id = cursor.execute( + "INSERT INTO files (content) VALUES (?)", (temp_file_content,) + ).lastrowid + cursor.execute( + "INSERT INTO file_mappings (spec_id, file_id) VALUES (?, ?)", (new_spec_id, file_id) + ) + + conn.commit() + conn.close() + print(f"Reved {id_field} = {id_value} version {spec_version}") + return 0, "Reved" diff --git a/panther_analysis_tool/constants.py b/panther_analysis_tool/constants.py index 3619e697..9a0b90fe 100644 --- a/panther_analysis_tool/constants.py +++ b/panther_analysis_tool/constants.py @@ -91,3 +91,10 @@ class ReplayStatus: ENABLE_CORRELATION_RULES_FLAG = "EnableCorrelationRules" + +CACHE_DIR = ".cache" + +DEFAULT_EDITOR = "vi" + +PANTHER_ANALYSIS_URL = "https://api.github.com/repos/panther-labs/panther-analysis/releases/latest" +PANTHER_ANALYSIS_SQLITE_FILE = "panther-analysis.sqlite" diff --git a/panther_analysis_tool/core/analysis_cache.py b/panther_analysis_tool/core/analysis_cache.py new file mode 100644 index 00000000..2cddcf0a --- /dev/null +++ b/panther_analysis_tool/core/analysis_cache.py @@ -0,0 +1,146 @@ +import dataclasses +import pathlib +import sqlite3 +from typing import Optional, Tuple + +from panther_analysis_tool.constants import CACHE_DIR, PANTHER_ANALYSIS_SQLITE_FILE + + +def connect_to_cache() -> sqlite3.Connection: + """ + Connect to the analysis cache database. + """ + SQLITE_FILE = pathlib.Path(CACHE_DIR) / PANTHER_ANALYSIS_SQLITE_FILE + return sqlite3.connect(SQLITE_FILE) + + +@dataclasses.dataclass +class AnalysisSpec: + id: int + spec: bytes + version: int + file_path: str + id_field: str + id_value: str + + +class AnalysisCache: + def __init__(self) -> None: + self.conn = connect_to_cache() + self.cursor = self.conn.cursor() + + def create_tables(self) -> None: + # get all tables + self.cursor.execute( + "CREATE TABLE IF NOT EXISTS analysis_specs (id INTEGER PRIMARY KEY AUTOINCREMENT, id_field TEXT," + " id_value TEXT, spec BLOB, file_path TEXT, version INTEGER);" + ) + # unique constrain on id_field, id_value and version + self.cursor.execute( + "CREATE UNIQUE INDEX IF NOT EXISTS idx_analysis_specs_unique ON analysis_specs (id_field, id_value, version);" + ) + + self.cursor.execute( + "CREATE TABLE IF NOT EXISTS files (id INTEGER PRIMARY KEY AUTOINCREMENT, content BLOB UNIQUE);" + ) + + self.cursor.execute( + "CREATE TABLE IF NOT EXISTS file_mappings (id INTEGER PRIMARY KEY AUTOINCREMENT, spec_id INTEGER," + " file_id INTEGER, FOREIGN KEY (spec_id) REFERENCES analysis_specs(id), FOREIGN KEY (file_id) REFERENCES files(id));" + ) + self.cursor.execute( + "CREATE UNIQUE INDEX IF NOT EXISTS idx_file_mappings_unique ON file_mappings (spec_id, file_id);" + ) + + def list_spec_ids(self) -> list[str]: + """ + List all analysis specs in the cache directory. + """ + self.cursor.execute("SELECT DISTINCT id_value FROM analysis_specs") + return [row[0] for row in self.cursor.fetchall()] + + def get_file_for_spec(self, analysis_spec_id: int) -> Optional[bytes]: + """ + Get the file for a spec. + """ + row = self.cursor.execute( + "SELECT file_id FROM file_mappings WHERE spec_id = ?", (analysis_spec_id,) + ).fetchone() + if row is None: + return None + return self.get_file_by_id(row[0]) + + def get_file_by_id(self, file_id: int) -> Optional[bytes]: + """ + Get the file by id. + """ + row = self.cursor.execute("SELECT content FROM files WHERE id = ?", (file_id,)).fetchone() + if row is None: + return None + return row[0] + + def get_spec_for_version(self, analysis_id: str, base_version: int) -> Optional[AnalysisSpec]: + self.cursor.execute( + "SELECT id, spec, version, file_path, id_field, id_value FROM analysis_specs WHERE id_value = ? AND version = ?", + (analysis_id, base_version), + ) + row = self.cursor.fetchone() + if row is None: + return None + return AnalysisSpec( + id=row[0], + spec=row[1], + version=row[2], + file_path=row[3], + id_field=row[4], + id_value=row[5], + ) + + def get_latest_spec(self, analysis_id: str) -> Optional[AnalysisSpec]: + self.cursor.execute( + "SELECT id, spec, version, file_path, id_field, id_value FROM analysis_specs WHERE id_value = ? ORDER BY version DESC LIMIT 1", + (analysis_id,), + ) + row = self.cursor.fetchone() + if row is None: + return None + return AnalysisSpec( + id=row[0], + spec=row[1], + version=row[2], + file_path=row[3], + id_field=row[4], + id_value=row[5], + ) + + def insert_file(self, content: bytes) -> Optional[int]: + try: + file_id = self.cursor.execute( + "INSERT INTO files (content) VALUES (?);", (content,) + ).lastrowid + except sqlite3.IntegrityError: + file_id = self.cursor.execute( + "SELECT id FROM files WHERE content = ?;", (content,) + ).fetchone()[0] + return file_id + + def insert_spec( + self, + id_field: str, + id_value: str, + spec_content: Optional[bytes], + file_path: str, + spec_version: int, + ) -> int: + spec_id = self.cursor.execute( + "INSERT INTO analysis_specs (id_field, id_value, spec, file_path, version) VALUES (?, ?, ?, ?, ?);", + (id_field, id_value, spec_content, file_path, spec_version), + ).lastrowid + # make typechecker happy + assert spec_id is not None # nosec: B101 + return spec_id + + def insert_file_mapping(self, spec_id: int, file_id: int) -> None: + self.cursor.execute( + "INSERT INTO file_mappings (spec_id, file_id) VALUES (?, ?);", (spec_id, file_id) + ) diff --git a/panther_analysis_tool/core/editor.py b/panther_analysis_tool/core/editor.py new file mode 100644 index 00000000..742203fb --- /dev/null +++ b/panther_analysis_tool/core/editor.py @@ -0,0 +1,20 @@ +import os +import subprocess # nosec:B404 +import tempfile + +from panther_analysis_tool.constants import DEFAULT_EDITOR + + +def edit_file(contents: bytes) -> bytes: + # create a temp file and write the spec to it + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + temp_file.write(contents) + temp_file.flush() + + # launch the editor with the temp file and wait for it to finish + editor = os.getenv("EDITOR", DEFAULT_EDITOR) + subprocess.run([editor, temp_file.name], check=True) # nosec:B603 + + # read the temp file and compare it to the original spec + with open(temp_file.name, "rb") as f: + return f.read() diff --git a/panther_analysis_tool/core/formatter.py b/panther_analysis_tool/core/formatter.py new file mode 100644 index 00000000..720abec9 --- /dev/null +++ b/panther_analysis_tool/core/formatter.py @@ -0,0 +1,151 @@ +import io +from typing import Any, Iterable + +from ruamel.yaml import YAML +from ruamel.yaml.comments import CommentedMap, CommentedSeq +from ruamel.yaml.scalarstring import DoubleQuotedScalarString, FoldedScalarString + +from panther_analysis_tool.constants import AnalysisTypes + +_yaml = YAML(typ="safe") +_yaml.indent(mapping=2, sequence=4, offset=2) + +_rt_yaml = YAML(typ="rt") +_rt_yaml.indent(mapping=2, sequence=4, offset=2) +_rt_yaml.width = 80 + + +def analysis_spec_dump(data: Any, sort: bool = True) -> bytes: + if isinstance(data, bytes): + data = _yaml.load(io.BytesIO(data)) + elif isinstance(data, str): + data = _yaml.load(io.StringIO(data)) + if sort: + data = sort_yaml(data) + data = {k: v.strip() if isinstance(v, str) else v for k, v in data.items()} + data = analysis_spec_to_ruamel(data) + dumped = io.BytesIO() + _rt_yaml.dump(data, dumped) + return dumped.getvalue() + + +def analysis_spec_to_ruamel(data: Any) -> Any: + if not isinstance(data, dict): + return data + + match data["AnalysisType"]: + case AnalysisTypes.RULE | AnalysisTypes.CORRELATION_RULE | AnalysisTypes.SCHEDULED_RULE: + return rule_to_ruamel(data) + case AnalysisTypes.GLOBAL: + return global_to_ruamel(data) + case AnalysisTypes.DATA_MODEL: + return datamodel_to_ruamel(data) + case AnalysisTypes.POLICY: + return policy_to_ruamel(data) + case AnalysisTypes.SAVED_QUERY | AnalysisTypes.SCHEDULED_QUERY: + return query_to_ruamel(data) + case AnalysisTypes.LOOKUP_TABLE: + return lookuptable_to_ruamel(data) + case _: + return data + + +def rule_to_ruamel(data: Any) -> Any: + for key, value in data.items(): + if key in ["Description", "Runbook"]: + if isinstance(value, str): + data[key] = FoldedScalarString(value) + if key in ["Tests"]: + format_tests(value) + if key in ["DisplayName", "RuleID"]: + data[key] = DoubleQuotedScalarString(value) + return data + + +def global_to_ruamel(data: Any) -> Any: + for key, value in data.items(): + if key in ["Description"]: + if isinstance(value, str): + data[key] = FoldedScalarString(value) + if key in ["GlobalID"]: + data[key] = DoubleQuotedScalarString(value) + return data + + +def datamodel_to_ruamel(data: Any) -> Any: + for key, value in data.items(): + if key in ["DataModelID", "DisplayName"]: + data[key] = DoubleQuotedScalarString(value) + return data + + +def policy_to_ruamel(data: Any) -> Any: + for key, value in data.items(): + if key in ["Description", "Runbook"]: + if isinstance(value, str): + data[key] = FoldedScalarString(value) + if key in ["Tests"]: + format_tests(value) + if key in ["PolicyID", "DisplayName"]: + data[key] = DoubleQuotedScalarString(value) + return data + + +def query_to_ruamel(data: Any) -> Any: + for key, value in data.items(): + if key in ["Query"]: + if isinstance(value, str): + data[key] = FoldedScalarString(value) + if key in ["Tests"]: + format_tests(value) + if key in ["QueryName"]: + data[key] = DoubleQuotedScalarString(value) + return data + + +def lookuptable_to_ruamel(data: Any) -> Any: + for key, value in data.items(): + if key in ["LookupName"]: + data[key] = DoubleQuotedScalarString(value) + return data + + +def format_tests(tests: Iterable[Any]) -> Any: + for test in tests: + if isinstance(test, dict): + for key, value in test.items(): + if key in ["Log", "Resource"]: + test[key] = to_inline_map(value) + + format_tests(test) + elif isinstance(test, list): + format_tests(test) + + +def to_inline_map(data: Any) -> Any: + if isinstance(data, dict): + m = CommentedMap() + m.fa.set_flow_style() + m.fa.set_block_style() + for k, v in data.items(): + m[to_inline_map(k)] = to_inline_map(v) + return m + elif isinstance(data, list): + s = CommentedSeq() + for v in data: + s.append(to_inline_map(v)) + return s + elif isinstance(data, str): + return DoubleQuotedScalarString(data) + return data + + +def sort_yaml(data: Any) -> Any: + if isinstance(data, dict): + keys = sorted(data.keys()) + return {k: sort_yaml(data[k]) for k in keys} + elif isinstance(data, list): + return [sort_yaml(v) for v in data] + elif isinstance(data, tuple): + return tuple(sort_yaml(v) for v in data) + return data diff --git a/panther_analysis_tool/core/git.py b/panther_analysis_tool/core/git.py new file mode 100644 index 00000000..5691e264 --- /dev/null +++ b/panther_analysis_tool/core/git.py @@ -0,0 +1,84 @@ +import subprocess # nosec:B404 +from typing import Optional + +PANTHER_HTTPS_PATH = "https://github.com/panther-labs/panther-analysis.git" +PANTHER_SSH_PATH = "git@github.com:panther-labs/panther-analysis.git" +PANTHER_PRIMARY_BRANCH = "main" + + +class GitManager: + def __init__(self) -> None: + self._panther_remote: Optional[str] = None + self._git_root: Optional[str] = None + + def panther_latest_release_commit(self) -> str: + """ + Get the panther commit hash + """ + return f"{self.panther_remote()}/{PANTHER_PRIMARY_BRANCH}" + + def panther_remote(self) -> str: + if self._panther_remote is not None: + return self._panther_remote + + panther_remote_output = subprocess.run( # nosec:B607 B603 + ["git", "remote", "-v"], capture_output=True, text=True + ) + if panther_remote_output.returncode != 0: + raise Exception(f"Failed to get panther remote: {panther_remote_output.stderr}") + panther_remote_stdout = panther_remote_output.stdout + for line in panther_remote_stdout.split("\n"): + name, url, operation = line.strip().split() + if url in [PANTHER_HTTPS_PATH, PANTHER_SSH_PATH] and operation == "(fetch)": + self._panther_remote = name + return name + raise Exception("Panther remote not found") + + def merge_base(self, branch: str) -> str: + """ + Get the merge base between the panther remote and the current branch + + Args: + branch: The branch to get the merge base from + + Returns: + The merge base commit hash + """ + panther_commit = self.panther_latest_release_commit() + merge_base_output = subprocess.run( # nosec:B607 B603 + ["git", "merge-base", panther_commit, branch], capture_output=True, text=True + ) + if merge_base_output.returncode != 0: + if "Not a valid object name" in merge_base_output.stderr: + # need to fetch the panther remote + subprocess.run( + ["git", "fetch", self.panther_remote()], check=True + ) # nosec:B607 B603 + elif merge_base_output.stderr == "": + raise Exception(f"Failed to find common ancestor:") + else: + raise Exception(f"Failed to get merge base: {merge_base_output.stderr}") + output = merge_base_output.stdout.strip() + if output == "": + raise Exception(f"No merge base found for {branch} and {panther_commit}") + return output + + def git_root(self) -> str: + """ + Get the root of the git repo + """ + if self._git_root is not None: + return self._git_root + + rev_parse = subprocess.run( # nosec:B607 B603 + ["git", "rev-parse", "--show-toplevel"], + capture_output=True, + text=True, + check=True, + ) + if rev_parse.returncode != 0: + raise Exception(f"Failed to get git root: {rev_parse.stderr}") + if rev_parse.stdout is None: + raise Exception("Failed to get git root") + self._git_root = rev_parse.stdout.strip() + return self._git_root diff --git a/panther_analysis_tool/main.py b/panther_analysis_tool/main.py index 3336806f..b68590d2 100644 --- a/panther_analysis_tool/main.py +++ b/panther_analysis_tool/main.py @@ -9,6 +9,7 @@ import mimetypes import os import shutil +import sqlite3 import subprocess # nosec import sys import time @@ -45,6 +46,7 @@ from typing_extensions import Annotated from panther_analysis_tool import analysis_utils +from panther_analysis_tool.core import analysis_cache from panther_analysis_tool.directory import setup_temp # this is needed at this location so each process can have its own temp directory @@ -98,6 +100,13 @@ benchmark, bulk_delete, check_connection, + clone, + enable, + fetch, + fmt, + init_project, + merge, + rev, validate, ) from panther_analysis_tool.command.standard_args import ( @@ -2396,6 +2405,84 @@ def check_packs_command( return check_packs(path) +@app_command_with_config(name="init", help="Initialize a new panther project") +def init_command() -> Tuple[int, str]: + return init_project.run() + + +def complete_id(_ctx: typer.Context, _args: List[str], incomplete: str) -> List[str]: + try: + cache = analysis_cache.AnalysisCache() + return [ + spec for spec in cache.list_spec_ids() if spec.lower().startswith(incomplete.lower()) + ] + except sqlite3.Error: + return [] + + +@app_command_with_config(name="enable", help="Enable a detection") +def enable_command( + filters: FilterType = None, + analysis_id: Annotated[ + Optional[str], + typer.Argument( + help="The ID of the analysis item to enable.", + autocompletion=complete_id, + ), + ] = None, +) -> Tuple[int, str]: + if filters is None: + filters = [] + return enable.run(analysis_id, filters) + + +@app_command_with_config(name="fetch", help="Fetch a detection") +def fetch_command() -> Tuple[int, str]: + return fetch.run() + + +@app_command_with_config(name="merge", help="Merge a detection") +def merge_command( + analysis_id: Annotated[ + Optional[str], + typer.Argument(help="The ID of the analysis item to merge.", autocompletion=complete_id), + ] = None, + migrate: Annotated[ + bool, typer.Option(help="Migrate the analysis item to the latest panther version.") + ] = False, +) -> Tuple[int, str]: + return merge.run(analysis_id, migrate) + + +@app_command_with_config(name="clone", help="Clone a detection") +def clone_command( + analysis_id: Annotated[ + Optional[str], + typer.Argument(help="The ID of the analysis item to clone.", autocompletion=complete_id), + ] = None, + filters: FilterType = None, +) -> Tuple[int, str]: + if filters is None: + filters = [] + return clone.run(analysis_id, filters) + + +# TODO: remove this +@app_command_with_config(name="rev", help="Rev a detection") +def rev_command( + analysis_id: Annotated[ + str, + typer.Argument(help="The ID of the analysis item to rev.", autocompletion=complete_id), + ], +) -> Tuple[int, str]: + return rev.run(analysis_id) + + +@app_command_with_config(name="fmt", help="Format a detection") +def fmt_command() -> Tuple[int, str]: + return fmt.run() + + # pylint: disable=too-many-statements def run() -> None: # setup logger and print version info as necessary diff --git a/panther_analysis_tool/schemas.py b/panther_analysis_tool/schemas.py index 20ab0068..f3da1593 100644 --- a/panther_analysis_tool/schemas.py +++ b/panther_analysis_tool/schemas.py @@ -75,6 +75,7 @@ def validate( ], Optional("DisplayName"): And(str, NAME_ID_VALIDATION_REGEX), Optional("Filename"): str, + Optional("BaseVersion"): int, }, ignore_extra_keys=False, ) @@ -86,6 +87,7 @@ def validate( "GlobalID": And(str, NAME_ID_VALIDATION_REGEX), Optional("Description"): str, Optional("Tags"): [str], + Optional("BaseVersion"): int, }, ignore_extra_keys=False, ) @@ -134,6 +136,7 @@ def validate( Optional("Mocks"): [MOCK_SCHEMA], } ], + Optional("BaseVersion"): int, }, ignore_extra_keys=False, ) # Prevent user typos on optional fields @@ -175,6 +178,7 @@ def validate( Optional("AlertContext"): object, Optional("GroupBy"): object, Optional("CreateAlert"): bool, + Optional("BaseVersion"): int, }, ignore_extra_keys=False, ) # Prevent user typos on optional fields @@ -204,6 +208,7 @@ def validate( Optional("GroupBy"): object, Optional("Tests"): object, Optional("CreateAlert"): bool, + Optional("BaseVersion"): int, }, ignore_extra_keys=False, ) @@ -238,6 +243,7 @@ def validate( ], } ], + Optional("BaseVersion"): int, }, ignore_extra_keys=False, ) @@ -251,6 +257,7 @@ def validate( Optional("Tags"): [str], Optional("Lookback"): bool, Optional("LookbackWindowSeconds"): int, + Optional("BaseVersion"): int, }, ignore_extra_keys=False, ) # Prevent user typos on optional fields @@ -271,6 +278,7 @@ def validate( Optional("Tags"): [str], Optional("Lookback"): bool, Optional("LookbackWindowSeconds"): int, + Optional("BaseVersion"): int, Optional("EmailConfig"): { "Recipients": [str], Optional("SendEmpty"): bool, @@ -305,6 +313,7 @@ def validate( }, Optional("Description"): str, Optional("Reference"): str, + Optional("BaseVersion"): int, }, ignore_extra_keys=False, ) # Prevent user typos on optional fields